Machine Learning

Parameters

Config Files

tomlimport toml

with open("example.toml", "r") as f:
    data = toml.load(f)

print(data["title"],"\n",
        data["database"]["data"],"\n",
        data["database"]["temp_targets"],"\n",
        data["owner"]["dob"])
# TOML Example 
# [['delta', 'phi'], [3.14]] 
# {'cpu': 79.5, 'case': 72.0} 
# 1979-05-27 07:32:00-08:00    
Example TOML Filetitle = "TOML Example" [owner] name = "Tom Preston-Werner" dob = 1979-05-27T07:32:00-08:00 [database] enabled = true ports = [ 8000, 8001, 8002 ] data = [ ["delta", "phi"], [3.14] ] temp_targets = { cpu = 79.5, case = 72.0 } [servers] [servers.alpha] ip = "10.0.0.1" role = "frontend" [servers.beta] ip = "10.0.0.2" role = "backend"
hydraimport hydra
from omegaconf import DictConfig

# import yaml
# config = yaml.load(
# open('conf/model/specialmodel.yaml', 'r'), 
#       Loader=yaml.SafeLoader)


@hydra.main(version_base=None, config_path="../conf")
def main(config: DictConfig):
    ... ...

python  src/models/predict.py +model=specialmodel
Example YAML File# This is a YAML file example name: John Doe age: 30 email: johndoe@example.com address: street: 123 Main St city: "{formated_city}" state: CA postal_code: 12345 phone_numbers: # list of dicts - type: home number: (555) 555-5555 - type: work number: (555) 123-4567 formatted_text: | This is a formatted text example. It can span multiple paragraphs. unformatted_text: > This is an unformatted text example. It can span multiple paragraphs. Structure is only visible in code.

Grid Search

optunaimport optuna
optuna.logging.set_verbosity(optuna.logging.WARNING)

f = lambda X : X[0]**2 + X[1]**3 - 2*X[0] + X[1] - X[2]

def objective(trial):
    X = [trial.suggest_float(f"X[{i}]", 1e-5, 1e-0, log=True) for i in range(2)]
    X += [trial.suggest_float("X[2]", -5, 5, step=0.1)]
    return f(X)

sampler = optuna.samplers.TPESampler(seed=10)
study = optuna.create_study(sampler=sampler, direction="maximize")
study.optimize(objective, n_trials=100, timeout=60,
                show_progress_bar=True, n_jobs=-1,)
print(study.best_params, study.best_value,sep="\n")
optuna.visualization.plot_parallel_coordinate(study).write_html("parallel.html")
optuna.visualization.plot_optimization_history(study).write_html("history.html")
optuna.visualization.plot_contour(study).write_html("contour.html")
optuna.visualization.plot_slice(study).write_html("slice.html")

Toy Pipeline

We give two examples of toy pipelines, one in scikit-learn and the other in pytorch.
Then we will get into each framework in detail.

Scikit-learn Pipeline

StratifiedGroupKFoldCV Splitimport pandas as pd
import numpy as np
from sklearn.model_selection import StratifiedGroupKFold

df = pd.DataFrame({
    "datetime_lcl": pd.date_range("2020-01-01", periods=100, freq="D"),
    "x": np.random.randn(100),
    "a": np.random.randint(0, 5, 100),
    "b": np.random.randint(0, 3, 100),
    "target": np.random.randint(0, 2, 100)
})


levels = df["datetime_lcl"].dt.year
group = df["datetime_lcl"].dt.date
skf = StratifiedGroupKFold(n_splits=5).split(df, levels, group)

for train_index, test_index in skf:
    print("TRAIN:", train_index, "TEST:", test_index)
    print("TRAIN:", df.iloc[train_index], "TEST:", df.iloc[test_index])

ColumnTransformerTransformerMixinfrom sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer


class MaxMinTransformer(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        row_max = X.max(axis=1)
        row_min = X.min(axis=1)
        return np.c_[row_max, row_min]


CT = ColumnTransformer([
    ("max_min", MaxMinTransformer(), ["a", "b"]),
    ("x", "passthrough", ["x"])
])

CT.get_params().keys()
FeatureUnionfrom sklearn.pipeline import FeatureUnion

class ColumnSquared(BaseEstimator, TransformerMixin):
    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return X[self.columns] ** 2


FU = FeatureUnion([
    ("max_min_idx", CT),
    ("x", ColumnSquared(["x"]))
])

FU.get_params().keys()
RegressorMixinPipelinefrom sklearn.base import RegressorMixin
from sklearn.pipeline import Pipeline


class DummyRegressor(RegressorMixin):
    def __init__(self, multi_factor=1):
        self.multi_factor = multi_factor

    def fit(self, X, y, ):
        return self

    def predict(self, X):
        return np.random.randn(X.shape[0])+X[:, 0]*self.multi_factor


pipe = Pipeline([
    ("fu", FU),
    ("model", DummyRegressor(multi_factor=2))
])

pipe.fit(df, df["target"])
pipe.predict(df)

Pytorch Pipeline

transformsfrom torchvision import datasets, transforms
import torch

transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])
train_dataset = datasets.MNIST(root='./data', 
            train=True, download=True, transform=transform)
test_dataset = datasets.MNIST(root='./data', train=False, download=True, transform=transform)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                            batch_size=64, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=64, shuffle=False)CNNimport torch.nn as nn
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        # 1 channel for grayscale, 32 filters, 3x3 kernel
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.fc1 = nn.Linear(64 * 7 * 7, 128)
        self.fc2 = nn.Linear(128, 10)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        x = x.view(-1, 64 * 7 * 7)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x

    
    
optimimport torch.optim as optim

model = CNN()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 5
for epoch in range(num_epochs):
    model.train()
    for images, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

# Test the model
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_loader:
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f'Accuracy of the model on the test images: {100 * correct / total} %')

Neural Networks

Torch

Regression

Regressionfrom scratch# linear regression from scratch with torch
import torch as t
from sklearn.datasets import load_diabetes

# get data
X, y = load_diabetes(return_X_y=True, as_frame=True)


# convert to tensor
X = t.tensor(X.values, dtype=t.float32)
y = t.tensor(y.values, dtype=t.float32)


class LinearRegressionScratch():
    def __init__(self, num_inputs, lr, sigma=0.01):
        self.w = t.normal(0, sigma, (num_inputs, 1), requires_grad=True)
        self.b = t.zeros(1, requires_grad=True)

    def forward(self, X):
        return X@self.w + self.b

    def loss(self, y_hat, y):
        l = (y_hat - y) ** 2 
        return l.mean()

    # calculate gradient
    def gradient(self, X, y):
        y_hat = self.forward(X)
        l = self.loss(y_hat, y)
        l.backward()
    
    # update parameters
    def update(self, lr):
        self.w.data -= lr * self.w.grad
        self.b.data -= lr * self.b.grad
        self.w.grad.zero_()
        self.b.grad.zero_()
    
    # train model
    def train(self, X, y, lr, epochs):
        for epoch in range(epochs):
            self.gradient(X, y)
            self.update(lr)
            if (epoch + 1) % 50 == 0:
                print(f'epoch {epoch + 1}, loss: {self.loss(self.forward(X), y).item():.4f}')


Model = LinearRegressionScratch(X.shape[1], 0.01)
Model.train(X, y, 0.01, 500)
Classification from scratchfrom sklearn.datasets import load_iris
import torch as t
X, y = load_iris(return_X_y = True, as_frame=True)
# to tensor
X = t.tensor(X.to_numpy(), dtype=t.float32)
y = t.tensor(y.to_numpy(), dtype=t.long)


def softmax(X):
    X_exp = t.exp(X)
    partition = X_exp.sum(1, keepdims=True)
    return X_exp / partition


def cross_entropy(y_hat, y):
    return -t.log(y_hat[range(len(y_hat)), y]).mean()


class ClassifierScratch:
    def __init__(self, num_inputs, num_outputs, lr, sigma=0.01):
        self.W = t.normal(0, sigma, size=(
            num_inputs, num_outputs), requires_grad=True)
        self.b = t.zeros(num_outputs, requires_grad=True)

    def forward(self, X):
        return softmax(t.matmul(X.reshape(-1, self.W.shape[0]), self.W) + self.b)

    def loss(self, Y_hat, Y):
        return cross_entropy(Y_hat, Y)

    def gradient(self, X, Y):
        Y_hat = self.forward(X)
        l = self.loss(Y_hat, Y)
        l.backward()

    def update(self, lr):
        with t.no_grad():
            self.W -= lr * self.W.grad
            self.b -= lr * self.b.grad
            self.W.grad.zero_()
            self.b.grad.zero_()

    def accuracy(self, X, Y):
        Y_hat = self.forward(X)
        preds = Y_hat.argmax(axis=1).type(Y.dtype)
        compare = (preds == Y.reshape(-1)).type(t.float32)
        return compare.mean()

    def train(self, X, Y, lr, epochs):
        for epoch in range(epochs):
            self.gradient(X, Y)
            self.update(lr)
            if epoch % 100 == 0:
                print(f'epoch {epoch}, loss {self.loss(self.forward(X), Y)},\
                accuracy {self.accuracy(X, Y)}')


Model = ClassifierScratch(X.shape[1], 3, 0.1)
Model.train(X, y, 0.1, 1000)

AutoEncoder

AutoEncoderimport torch

class AE(torch.nn.Module):
    def __init__(self, input_dim, output_dim):
        super().__init__()
        
        self.encoder = torch.nn.Sequential(
            torch.nn.Linear(input_dim, input_dim // 2),
            torch.nn.ReLU(),
            torch.nn.Linear(input_dim // 2, input_dim // 4),
            torch.nn.ReLU(),
            torch.nn.Linear(input_dim // 4, input_dim // 8),
            torch.nn.ReLU(),
            torch.nn.Linear(input_dim // 8, output_dim)
        )
        
        self.decoder = torch.nn.Sequential(
            torch.nn.Linear(output_dim, input_dim // 8),
            torch.nn.ReLU(),
            torch.nn.Linear(input_dim // 8, input_dim // 4),
            torch.nn.ReLU(),
            torch.nn.Linear(input_dim // 4, input_dim // 2),
            torch.nn.ReLU(),
            torch.nn.Linear(input_dim // 2, input_dim),
            torch.nn.Sigmoid()
        )

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

TensorFlow

importsset upsimport tensorflow as tf
import keras

def make_features(X):
    return tf.stack([tf.pow(X,i) for i in range(5)],axis=1)
X = tf.constant(np.linspace(0, 2, 1000), dtype=tf.float32)
Y = X * tf.exp(-X**2)
Xf = make_features(X)

Basics

constantvariableGradientTapeSTEPS = 2000
LEARNING_RATE = .02
n_weights = Xf.shape[1]
W = tf.Variable(np.zeros((n_weights, 1)), dtype=tf.float32)
losses = []

def predict(X, W):
    return tf.squeeze(X @ W, -1)

def loss_mse(X, Y, W):
    Y_hat = predict(X, W)
    errors = (Y_hat - Y)**2
    return tf.reduce_mean(errors)

def compute_gradients(X, Y, W):
    with tf.GradientTape() as tape:
        loss = loss_mse(Xf, Y, W)
    return tape.gradient(loss, W)

optimizer = keras.optimizers.Adam(learning_rate=0.01)
for step in range(1, STEPS + 1):

    dW = compute_gradients(X, Y, W)
    optimizer.apply_gradients(zip([dW], [W]))

    if step % 100 == 0:
        loss = loss_mse(Xf, Y, W)
        losses.append(loss)
        plt.clf()
        plt.plot(losses)


print("STEP: {} MSE: {}".format(STEPS, loss_mse(Xf, Y, W)))

plt.figure()
plt.plot(X, Y, label='actual')
plt.plot(X, predict(Xf, W), label='predicted')
plt.legend()
optimizers# function to minimize
square = lambda: x**2  + y**2 + 3
# initialize variable 
x = tf.Variable(1.0)
y = tf.Variable(2.0)

# set optimizer and learning rate
optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)

# train
for i in range(1000): 
    optimizer.minimize(square, [x,y])
    if i%100==0:
      print(f'{i}-th step : ',x.numpy(),y.numpy())
# 0-th step :  0.98 1.96
# 100-th step :  0.1299672 0.2599344
# 200-th step :  0.0172362 0.034472395
# 300-th step :  0.002285857 0.004571714
# 400-th step :  0.0003031494 0.0006062988
# 500-th step :  4.0203555e-05 8.04071e-05
# 600-th step :  5.3317785e-06 1.0663557e-05
# 700-th step :  7.0709825e-07 1.4141964e-06
# 800-th step :  9.3775085e-08 1.8755017e-07
# 900-th step :  1.243641e-08 2.487282e-08

Different Ways to Create Model

build-in datasetimport numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import \
Input,Flatten,Dense,BatchNormalization
seed = 2048
np.random.seed(seed)
tf.random.set_seed(seed)

# load data
(X_train, y_train), (X_test, y_test) = keras.datasets.mnist.load_data()
X_train, X_test = X_train / 255.0, X_test / 255.0

# data dimensions
input_dim = (28, 28)
output_dim = len(np.unique(y_train))
Sequential# Sequential
layers = [Flatten(input_shape=input_dim),
    Dense(units=128, activation='relu'),
    BatchNormalization(),
    Dense(units=128, activation='relu'),
    BatchNormalization(),
    Dense(units=output_dim, activation='softmax')]

model = keras.Sequential(layers)Sequential.add# Sequential.add method
model = keras.Sequential()
model.add(Flatten(input_shape=input_dim))
model.add(Dense(units=128, activation='relu'))
model.add(BatchNormalization())
model.add(Dense(units=128, activation='relu'))
model.add(BatchNormalization())
model.add(Dense(units=output_dim, activation='softmax'))Functional# Functional API
def create_model(input_dim, output_dim):
    inputs = Input(shape=input_dim)
    x = Flatten()(inputs)
    x = Dense(units=128, activation='relu')(x)
    x = BatchNormalization()(x)
    x = Dense(units=128, activation='relu')(x)
    x = BatchNormalization()(x)
    outputs = Dense(units=output_dim, activation='softmax')(x)
return keras.Model(inputs=inputs, outputs=outputs)

model = create_model(input_dim, output_dim)
Subclassingadd_loss# define layer for fully connected NN
hidden_layer = [128, 128]

# define custom layer
class myDenseBlock(layers.Layer):
    def __init__(self, units, activation='relu'):
        super(myDenseBlock, self).__init__()
        self.dense = Dense(units, activation)
        self.bn = BatchNormalization()

    def call(self, inputs):
        x = self.dense(inputs)
        x = self.bn(x)
        self.add_loss(0.000001 * tf.reduce_sum(tf.square(x)))
        return x

# define custom model
class myModel(keras.Model):
    def __init__(self, hidden_layer, output_dim, activation='relu'):
        super(myModel,self).__init__()
        self.hidden_layer = [myDenseBlock(units) for units in hidden_layer]
        self.flatten = Flatten()
        self.softmax = Dense(units=output_dim, activation='softmax')

    def call(self, inputs):
        x = self.flatten(inputs)
        for layer in self.hidden_layer:
            x = layer(x)
        x = self.softmax(x)
        return x

model = myModel(hidden_layer, output_dim)
compilefitmodel.compile(
optimizer="Adam",
loss="sparse_categorical_crossentropy",
metrics=['acc']
)

model.fit(
X_train, y_train,
epochs=2,
batch_size=128,
validation_data=(X_test,y_test),
)

Computer Vision

OpenCV

Image

importsimport cv2
from google.colab.patches import cv2_imshow
import numpy as np
            
image downloads!wget -q --show-progress 'https://github.com/weiyang2048/Garage/blob/main/Data%20Sets/sky.jpg?raw=true' -O sky.jpg \
'https://github.com/weiyang2048/Garage/blob/main/Data%20Sets/stone.jpg?raw=true' -O stone.jpg
model downloads!wget -q --show-progress https://cdn.rawgit.com/opencv/opencv/3.4.0/samples/data/dnn/synset_words.txt\
https://cdn.rawgit.com/opencv/opencv/3.4.0/samples/data/dnn/bvlc_googlenet.prototxt\
http://dl.caffe.berkeleyvision.org/bvlc_googlenet.caffemodel
CannymatchTemplatesky = cv2.imread('sky.jpg')
stone = cv2.imread('stone.jpg')
# edge detection
edges = cv2.Canny(sky, 50,200, 10)
template = cv2.Canny(stone, 50,200, 10)
cv2_imshow(edges)
cv2_imshow(template)

result = cv2.matchTemplate(edges, template, cv2.TM_CCOEFF)
(_, maxVal, _, maxLoc) = cv2.minMaxLoc(result)
(_, maxVal, _, maxLoc)

Image Classification# pretrained model
all_rows = open('synset_words.txt').read().strip().split("\n")
classes = [r[r.find(' ') + 1:] for r in all_rows]
print(classes[0:3])
net = cv2.dnn.readNetFromCaffe('./bvlc_googlenet.prototxt','./bvlc_googlenet.caffemodel')

blob = cv2.dnn.blobFromImage(img, 1, (224,224))
net.setInput(blob)
outp = net.forward()

#top five predictions
idx = np.argsort(outp[0])[::-1][:5]
for (i,id) in enumerate(idx):
    print('{}. {} ({}): Probability {:.3}%'.format(i+1, classes[id], id, outp[0][id]*100))
Outputs : ['tench, Tinca tinca', 'goldfish, Carassius auratus', 'great white shark, white shark, man-eater, man-eating shark, Carcharodon carcharias'] 1. projectile, missile (744): Probability 23.6% 2. torch (862): Probability 10.4% 3. missile (657): Probability 9.45% 4. cannon (471): Probability 9.1% 5. rifle (764): Probability 4.28%

Video

Videoimport cv2
import numpy as np
from google.colab.patches import cv2_imshow
from matplotlib import pyplot as plt

cap = cv2.VideoCapture(
    "https://github.com/weiyang2048/Garage/raw/main/Data%20Sets/sample.mov")

if cap.isOpened() == False:
    print('Cannot open file or video stream')

for i in range(2):
    ret, frame = cap.read()
    if ret == True:
        plt.imshow(frame)
Video Classification!wget -q --show-progress https://cdn.rawgit.com/opencv/opencv/3.4.0/samples/data/dnn/synset_words.txt\
https://cdn.rawgit.com/opencv/opencv/3.4.0/samples/data/dnn/bvlc_googlenet.prototxt\
http://dl.caffe.berkeleyvision.org/bvlc_googlenet.caffemodel

# pretrained model
all_rows = open('synset_words.txt').read().strip().split("\n")
classes = [r[r.find(' ') + 1:] for r in all_rows]
net = cv2.dnn.readNetFromCaffe('./bvlc_googlenet.prototxt','./bvlc_googlenet.caffemodel')

if cap.isOpened() == False : print('Cannot open file or video stream')

for i in range(2):
    ret, frame = cap.read()
    blob = cv2.dnn.blobFromImage(frame, 1, (224,224))
    net.setInput(blob)
    outp = net.forward()

r=1
for i in np.argsort(outp[0])[::-1][:5]:
    txt = ' "%s" probability "%.3f" ' % (classes[i], outp[0][i] * 100)
    cv2.putText(frame, txt, (0, 25 + 40*r), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,0,0), 2)
    r+=1

if ret == True:
    cv2_imshow(frame)
else:
    break
Outputs :

YOLO

# download YOLOv3 pretrained model
!gdown -q --id 1i9WN6SEBgpkXI6kOHXV7Th8eAgbYMTWu
!unrar e YOLOv3.rar -idq
!rm YOLOv3.rar

# image and video
!wget -q https://github.com/weiyang2048/Garage/raw/main/Data%20Sets/sky.jpg
!wget -q https://github.com/weiyang2048/Garage/raw/main/Data%20Sets/sample.mov
import cv2
import numpy as np
from google.colab.patches import cv2_imshow

confThreshold = 0.5  #Confidence threshold
nmsThreshold = 0.4   #Non-maximum suppression threshold


classes = open('coco.names').read().strip().split('\n')
net = cv2.dnn.readNetFromDarknet("yolov3.cfg", "yolov3.weights")
net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)

# Get the names of the output layers
def getOutputsNames(net):
# Get the names of all the layers in the network
layersNames = net.getLayerNames()
# Get the names of the output layers, i.e. the layers with unconnected outputs
return [layersNames[i[0] - 1] for i in net.getUnconnectedOutLayers()]

# Draw the predicted bounding box
def drawPred(classId, conf, left, top, right, bottom):
# Draw a bounding box.
cv2.rectangle(frame, (left, top), (right, bottom), (255, 178, 50), 3)

label = '%.2f' % conf

# Get the label for the class name and its confidence
if classes:
    assert(classId < len(classes))
    label = '%s:%s' % (classes[classId], label)

#Display the label at the top of the bounding box
labelSize, baseLine = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
top = max(top, labelSize[1])
cv2.rectangle(frame, (left, top - round(1.5*labelSize[1])), (left + round(1.5*labelSize[0]), top + baseLine), (255, 255, 255), cv2.FILLED)
cv2.putText(frame, label, (left, top), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0,0,0), 1)

# Remove the bounding boxes with low confidence using non-maxima suppression
def postprocess(frame, outp):
frameHeight = frame.shape[0]
frameWidth = frame.shape[1]

# Scan through all the bounding boxes output from the network and keep only the
# ones with high confidence scores. Assign the box's class label as the class with the highest score.
classIds = []
confidences = []
boxes = []
for out in outp:
    for detection in out:
        scores = detection[5:]
        classId = np.argmax(scores)
        confidence = scores[classId]
        if confidence > confThreshold:
            center_x = int(detection[0] * frameWidth)
            center_y = int(detection[1] * frameHeight)
            width = int(detection[2] * frameWidth)
            height = int(detection[3] * frameHeight)
            left = int(center_x - width / 2)
            top = int(center_y - height / 2)
            classIds.append(classId)
            confidences.append(float(confidence))
            boxes.append([left, top, width, height])

# Perform non maximum suppression to eliminate redundant overlapping boxes with
# lower confidences.
indices = cv2.dnn.NMSBoxes(boxes, confidences, confThreshold, nmsThreshold)
for i in indices:
    i = i[0]
    box = boxes[i]
    left = box[0]
    top = box[1]
    width = box[2]
    height = box[3]
    drawPred(classIds[i], confidences[i], left, top, left + width, top + height)
image
image = "./sky.jpg"

cap = cv2.VideoCapture(image)
outputFile = image[:-4]+'_YOLOv3_output.jpg'

hasFrame, frame = cap.read()

# Create a 4D blob from a frame.
blob = cv2.dnn.blobFromImage(frame, 1/255, (416,416), [0,0,0], 1, crop=False)

# Sets the input to the network
net.setInput(blob)

# Runs the forward pass to get output of the output layers
outp = net.forward(getOutputsNames(net))

# Remove the bounding boxes with low confidence
postprocess(frame, outp)

cv2.imwrite(outputFile, frame.astype(np.uint8))

cv2_imshow(frame)
Outputs :
Videovideo = "./sample.mov"
cap = cv2.VideoCapture(video)
outputFile = video[:-4]+'_YOLOv3_output.avi'

# Get the video writer initialized to save the output video
vid_writer = cv2.VideoWriter(outputFile, cv2.VideoWriter_fourcc('M','J','P','G'), 30, (round(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),round(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))))

for i in range(3):

hasFrame, frame = cap.read()

# Create a 4D blob from a frame.
blob = cv2.dnn.blobFromImage(frame, 1/255, (416,416), [0,0,0], 1, crop=False)

# Sets the input to the network
net.setInput(blob)

# Runs the forward pass to get output of the output layers
outp = net.forward(getOutputsNames(net))

# Remove the bounding boxes with low confidence
postprocess(frame, outp)

# Write the frame with the detection boxes
vid_writer.write(frame.astype(np.uint8))

cv2_imshow(frame)
Outputs :

Automated Machine Learning

Caret

importpip install -q pycaret scipy
from pycaret.datasets import get_data
Classificationsetupcompare_modelsevaluate_modelpredict_modelfrom pycaret.classification import setup, compare_models,evaluate_model,predict_model,save_model

data = get_data('juice')
s = setup(data, target = 'Purchase', session_id = 123,
            normalize = True,
            polynomial_features=True, polynomial_degree=2,
            remove_multicollinearity = True, multicollinearity_threshold = 0.9,
            feature_selection=True, n_features_to_select=13 # 0.3
            )
best = compare_models()
evaluate_model(best)
predict_model(best) # predictions on hold out
# predictions = predict_model(best,data=new_data)
save_model(best,"best_model")
time_series blend_modelsplot_modelimport pycaret.time_series as pts
from sktime.transformations.series.summarize import WindowSummarizer

airline = get_data('airline')
kwargs = {"lag_feature": {"lag": [36, 24, 13, 12, 11, 9, 6, 3, 2, 1]}}
fe_target_rr = [WindowSummarizer(n_jobs=1, truncate="bfill", **kwargs)]

exp = pts.setup(data=airline, fh=12, fold=3, session_id=42)
model1 = exp.create_model("lr_cds_dt")
exp = pts.setup(
    data=airline, fh=12, fold=3, fe_target_rr=fe_target_rr, session_id=42
)
model2 = exp.create_model("lr_cds_dt")
exp_name = pts.setup(data = airline,  fh = 12,fe_target_rr=fe_target_rr)
top3 = pts.compare_models(n_select = 3,)
blender = pts.blend_models(top3)
pts.plot_model([model1, model2,blender], data_kwargs={"labels": ["Baseline", "With FE","blender"]})

BQML

Create Model 🔗CREATE OR REPLACE MODEL project_id.model_uri
OPTIONS (model_type='linear_reg',
         input_label_cols=['fare_amount']) AS
SELECT
    *
FROM
    project_id.data_set.table
ML.PREDICTML.EXPLAIN_PREDICTSELECT * FROM 
# or ML.EXPLAIN_PREDICT
ML.PREDICT(MODEL `{model_uri}`, (
    {inputs_query}
))
ORDER BY {ORDER_BY}
GLOBAL_EXPLAINFEATURE_IMPORTANCE WITH global_explain AS ( SELECT * FROM
    ML.GLOBAL_EXPLAIN(MODEL `aes-analytics-0001.us_solar_generation_fcst_ads.{plant_id}_BQML_NEBULA`) ),
feature_importance AS ( SELECT * FROM
    ML.FEATURE_IMPORTANCE(MODEL `aes-analytics-0001.us_solar_generation_fcst_ads.{plant_id}_BQML_NEBULA`) )

SELECT * FROM feature_importance JOIN global_explain USING (feature)

Tools

MLflow

set_experimentlog_figureend_runimport mlflow
import matplotlib.pyplot as plt

mlflow.set_experiment("test13")
fig,ax = plt.subplots()
plt.plot([1,2,3])
mlflow.log_figure(fig,"12.png")
mlflow.end_run()
start_runlog_paramlog_metricmlflow.start_run(run_name="my_fancy_run")
mlflow.log_param("my", "param")
mlflow.log_metric("score", 100)
mlflow.end_run()
create_experimentexp_id = mlflow.create_experiment(
    "test-1111",
    tags={"version": "v1", "priority": "P1"},
)

with mlflow.start_run(experiment_id=exp_id):
    mlflow.log_param("hello", "mlflow")

Explainable AI

LIME : Local Interpretable Model-agnostic Explanations

  1. Instance Selection Choose target instance $x_0 \in \mathbb{R}^d$ to explain (e.g., a specific loan application).
  2. Perturbation Create perturbed samples $\{x'_i\}$ near $x_0$
  3. Weight $$\text{RBF kernel: } \pi_{x_0}(x'_i) = \exp\left(-\frac{\|x'_i - x_0\|^2}{\sigma^2}\right)$$
  4. Surrogate Training $$\arg \min_g \sum_i \pi_{x_0}(x'_i) \left(f(x'_i) - g(x'_i)\right)^2 + \Omega(g)$$
  5. Explanation For linear $$g(x) = w^T x + b$$ feature importance = $|w_i|$.
LIMEimport lime
import lime.lime_tabular
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris

# Load data and train a model
iris = load_iris()
X_train, X_test = iris.data[:100], iris.data[100:]
y_train, y_test = iris.target[:100], iris.target[100:]
model = RandomForestClassifier().fit(X_train, y_train)

# Create the LIME explainer
explainer = lime.lime_tabular.LimeTabularExplainer(
    X_train,
    feature_names=iris.feature_names,
    class_names=iris.target_names,
    discretize_continuous=True
)

# Explain a prediction
idx = 0
exp = explainer.explain_instance(X_test[idx], model.predict_proba, num_features=4)
exp.show_in_notebook()

SHAP : Shapley Additive Explanations

For a model $f$ and feature $i$, the Shapley value is:

$$ \phi_i = \sum_{S \subseteq F \setminus \{i\}} \frac{|S|!(|F| - |S| - 1)!}{|F|!} \left[ f(S \cup \{i\}) - f(S) \right] $$

where:

  • $F$ = set of all features
  • $S$ = subset of features (excluding $i$)
  • $f(S)$ = model prediction using only features in $S$
$$\sum \phi_i = f(x) - \mathbb{E}[f]$$
SHAPimport shap

# For XGBoost models
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X)

# For any model (KernelSHAP)
explainer = shap.KernelExplainer(model.predict, background_data)
shap_values = explainer.shap_values(X)