This page was generated from examples/od_ae_cifar10.ipynb.
AE outlier detection on CIFAR10
Method
The Auto-Encoder (AE) outlier detector is first trained on a batch of unlabeled, but normal (inlier) data. Unsupervised training is desirable since labeled data is often scarce. The AE detector tries to reconstruct the input it receives. If the input data cannot be reconstructed well, the reconstruction error is high and the data can be flagged as an outlier. The reconstruction error is measured as the mean squared error (MSE) between the input and the reconstructed instance.
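To make the scoring rule concrete, here is a minimal NumPy sketch of the idea described above: compute the MSE between each instance and its reconstruction, then compare it against a threshold. The helper names `instance_mse` and `flag_outliers` and the toy values are assumptions made for illustration; they are not part of alibi_detect.

```python
import numpy as np


def instance_mse(x, x_recon):
    """Mean squared error per instance, averaged over all non-batch axes."""
    x = np.asarray(x, dtype=np.float64)
    x_recon = np.asarray(x_recon, dtype=np.float64)
    feature_axes = tuple(range(1, x.ndim))
    return np.mean((x - x_recon) ** 2, axis=feature_axes)


def flag_outliers(x, x_recon, threshold):
    """Flag instances whose reconstruction error exceeds the threshold."""
    return instance_mse(x, x_recon) > threshold


# Toy batch of two 2x2 single-channel "images" (made-up values).
x = np.array([[[[0.0], [1.0]], [[1.0], [0.0]]],
              [[[0.5], [0.5]], [[0.5], [0.5]]]])
x_recon = x.copy()
x_recon[1] += 0.5  # pretend the second image is reconstructed badly

scores = instance_mse(x, x_recon)
is_outlier = flag_outliers(x, x_recon, threshold=0.1)
print(scores, is_outlier)
```

In the notebook itself the reconstruction comes from the trained autoencoder rather than from a hand-made array.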
Dataset
CIFAR10 consists of 60,000 32 by 32 RGB images equally distributed over 10 classes.
[1]:
import logging
import matplotlib.pyplot as plt
import numpy as np
import os
import tensorflow as tf
tf.keras.backend.clear_session()
from tensorflow.keras.layers import Conv2D, Conv2DTranspose, \
    Dense, Layer, Reshape, InputLayer, Flatten
from tqdm import tqdm
from alibi_detect.od import OutlierAE
from alibi_detect.utils.fetching import fetch_detector
from alibi_detect.utils.perturbation import apply_mask
from alibi_detect.saving import save_detector, load_detector
from alibi_detect.utils.visualize import plot_instance_score, plot_feature_outlier_image
logger = tf.get_logger()
logger.setLevel(logging.ERROR)
Load CIFAR10 data
[2]:
train, test = tf.keras.datasets.cifar10.load_data()
X_train, y_train = train
X_test, y_test = test
X_train = X_train.astype('float32') / 255
X_test = X_test.astype('float32') / 255
print(X_train.shape, y_train.shape, X_test.shape, y_test.shape)
(50000, 32, 32, 3) (50000, 1) (10000, 32, 32, 3) (10000, 1)
Load or define outlier detector
The pretrained outlier and adversarial detectors used in the example notebooks can be found here. You can use the built-in fetch_detector function, which saves the pre-trained models in a local directory filepath and loads the detector. Alternatively, you can train a detector from scratch:
[3]:
load_outlier_detector = True
[4]:
filepath = 'my_path' # change to (absolute) directory where model is downloaded
detector_type = 'outlier'
dataset = 'cifar10'
detector_name = 'OutlierAE'
filepath = os.path.join(filepath, detector_name)
if load_outlier_detector:  # load pretrained outlier detector
    od = fetch_detector(filepath, detector_type, dataset, detector_name)
else:  # define model, initialize, train and save outlier detector
    encoding_dim = 1024
    encoder_net = tf.keras.Sequential(
        [
            InputLayer(input_shape=(32, 32, 3)),
            Conv2D(64, 4, strides=2, padding='same', activation=tf.nn.relu),
            Conv2D(128, 4, strides=2, padding='same', activation=tf.nn.relu),
            Conv2D(512, 4, strides=2, padding='same', activation=tf.nn.relu),
            Flatten(),
            Dense(encoding_dim,)
        ])
    decoder_net = tf.keras.Sequential(
        [
            InputLayer(input_shape=(encoding_dim,)),
            Dense(4*4*128),
            Reshape(target_shape=(4, 4, 128)),
            Conv2DTranspose(256, 4, strides=2, padding='same', activation=tf.nn.relu),
            Conv2DTranspose(64, 4, strides=2, padding='same', activation=tf.nn.relu),
            Conv2DTranspose(3, 4, strides=2, padding='same', activation='sigmoid')
        ])
    # initialize outlier detector
    od = OutlierAE(threshold=.015,  # threshold for outlier score
                   encoder_net=encoder_net,  # can also pass AE model instead
                   decoder_net=decoder_net,  # of separate encoder and decoder
                   )
    # train
    od.fit(X_train,
           epochs=50,
           verbose=True)
    # save the trained outlier detector
    save_detector(od, filepath)
Check quality of the AE model
[5]:
idx = 8
X = X_train[idx].reshape(1, 32, 32, 3)
X_recon = od.ae(X)
[6]:
plt.imshow(X.reshape(32, 32, 3))
plt.axis('off')
plt.show()

[7]:
plt.imshow(X_recon.numpy().reshape(32, 32, 3))
plt.axis('off')
plt.show()

Check outliers on original CIFAR images
[8]:
X = X_train[:500]
print(X.shape)
(500, 32, 32, 3)
[9]:
od_preds = od.predict(X,
                      outlier_type='instance',    # use 'feature' or 'instance' level
                      return_feature_score=True,  # scores used to determine outliers
                      return_instance_score=True)
print(list(od_preds['data'].keys()))
['instance_score', 'feature_score', 'is_outlier']
Plot instance level outlier scores
[10]:
target = np.zeros(X.shape[0],).astype(int) # all normal CIFAR10 training instances
labels = ['normal', 'outlier']
plot_instance_score(od_preds, target, labels, od.threshold)

Visualize predictions
[11]:
X_recon = od.ae(X).numpy()
plot_feature_outlier_image(od_preds,
                           X,
                           X_recon=X_recon,
                           instance_ids=[8, 60, 100, 330],  # pass a list with indices of instances to display
                           max_instances=5,  # max nb of instances to display
                           outliers_only=False)  # only show outlier predictions

Predict outliers on perturbed CIFAR images
We perturb CIFAR images by adding random noise to patches (masks) of the image. For each mask size in n_mask_sizes, sample n_masks and apply those to each of the n_imgs images. Then we predict outliers on the masked instances:
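As a rough illustration of what adding noise to a patch means, the sketch below perturbs one randomly placed rectangular region of a single image with Gaussian noise and clips the result. It is not the alibi_detect apply_mask implementation: the helper `add_noise_patch` is hypothetical, and reading `noise_distr` as (mean, standard deviation) is an assumption.

```python
import numpy as np


def add_noise_patch(img, mask_size, rng, noise_distr=(0.0, 1.0), clip_rng=(0.0, 1.0)):
    """Add Gaussian noise to one randomly placed patch of an (H, W, C) image.

    Hypothetical helper for illustration only. `noise_distr` is assumed to be
    (mean, standard deviation) and `clip_rng` the valid pixel range.
    """
    h, w = img.shape[:2]
    mh, mw = mask_size
    # Pick the top-left corner so the patch fits inside the image.
    top = rng.integers(0, h - mh + 1)
    left = rng.integers(0, w - mw + 1)
    mask = np.zeros((h, w), dtype=bool)
    mask[top:top + mh, left:left + mw] = True
    noise = rng.normal(noise_distr[0], noise_distr[1], size=img.shape)
    # Only pixels inside the patch receive noise; all channels are perturbed.
    out = np.where(mask[..., None], img + noise, img)
    return np.clip(out, *clip_rng), mask


rng = np.random.default_rng(0)
img = np.full((32, 32, 3), 0.5, dtype=np.float32)  # flat grey stand-in image
img_masked, mask = add_noise_patch(img, mask_size=(8, 8), rng=rng)
print(img_masked.shape, int(mask.sum()))
```

Larger patches disturb more pixels, so the reconstruction error, and with it the outlier score, would be expected to grow with the mask size.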
[12]:
# nb of predictions per image: n_masks * n_mask_sizes
n_mask_sizes = 10
n_masks = 20
n_imgs = 50
Define masks and get images:
[13]:
mask_sizes = [(2*n,2*n) for n in range(1,n_mask_sizes+1)]
print(mask_sizes)
img_ids = np.arange(n_imgs)
X_orig = X[img_ids].reshape(img_ids.shape[0], 32, 32, 3)
print(X_orig.shape)
[(2, 2), (4, 4), (6, 6), (8, 8), (10, 10), (12, 12), (14, 14), (16, 16), (18, 18), (20, 20)]
(50, 32, 32, 3)
Calculate instance level outlier scores:
[14]:
all_img_scores = []
for i in tqdm(range(X_orig.shape[0])):
    img_scores = np.zeros((len(mask_sizes),))
    for j, mask_size in enumerate(mask_sizes):
        # create masked instances
        X_mask, mask = apply_mask(X_orig[i].reshape(1, 32, 32, 3),
                                  mask_size=mask_size,
                                  n_masks=n_masks,
                                  channels=[0,1,2],
                                  mask_type='normal',
                                  noise_distr=(0,1),
                                  clip_rng=(0,1))
        # predict outliers
        od_preds_mask = od.predict(X_mask)
        score = od_preds_mask['data']['instance_score']
        # store average score over `n_masks` for a given mask size
        img_scores[j] = np.mean(score)
    all_img_scores.append(img_scores)
100%|██████████| 50/50 [00:17<00:00, 2.90it/s]
Visualize outlier scores vs. mask sizes
[15]:
x_plt = [mask[0] for mask in mask_sizes]
[16]:
for ais in all_img_scores:
    plt.plot(x_plt, ais)
plt.xticks(x_plt)
plt.title('Outlier Score All Images for Increasing Mask Size')
plt.xlabel('Mask size')
plt.ylabel('Outlier Score')
plt.show()

[17]:
ais_np = np.zeros((len(all_img_scores), all_img_scores[0].shape[0]))
for i, ais in enumerate(all_img_scores):
    ais_np[i, :] = ais
ais_mean = np.mean(ais_np, axis=0)
plt.title('Mean Outlier Score All Images for Increasing Mask Size')
plt.xlabel('Mask size')
plt.ylabel('Outlier score')
plt.plot(x_plt, ais_mean)
plt.xticks(x_plt)
plt.show()

Investigate instance level outlier
[18]:
i = 8 # index of instance to look at
[19]:
plt.plot(x_plt, all_img_scores[i])
plt.xticks(x_plt)
plt.title('Outlier Scores Image {} for Increasing Mask Size'.format(i))
plt.xlabel('Mask size')
plt.ylabel('Outlier score')
plt.show()

Reconstruction of masked images and outlier scores per channel:
[20]:
all_X_mask = []
X_i = X_orig[i].reshape(1, 32, 32, 3)
all_X_mask.append(X_i)
# apply masks
for j, mask_size in enumerate(mask_sizes):
    # create masked instances
    X_mask, mask = apply_mask(X_i,
                              mask_size=mask_size,
                              n_masks=1,  # just 1 for visualization purposes
                              channels=[0,1,2],
                              mask_type='normal',
                              noise_distr=(0,1),
                              clip_rng=(0,1))
    all_X_mask.append(X_mask)
all_X_mask = np.concatenate(all_X_mask, axis=0)
all_X_recon = od.ae(all_X_mask).numpy()
od_preds = od.predict(all_X_mask)
Visualize:
[21]:
plot_feature_outlier_image(od_preds,
                           all_X_mask,
                           X_recon=all_X_recon,
                           max_instances=all_X_mask.shape[0],
                           n_channels=3)

Predict outliers on a subset of features
The sensitivity of the outlier detector can be controlled not only via the threshold, but also by selecting the percentage of the features used for the instance level outlier score computation. For instance, we might want to flag outliers if 40% of the features (pixels for images) have an average outlier score above the threshold. This is possible via the outlier_perc argument in the predict function. It specifies the percentage of the features that are used for outlier detection, sorted in descending outlier score order.
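The effect of restricting the score to a subset of features can be sketched in plain NumPy: sort each instance's feature scores in descending order, keep the top percentage, and average them. The function `instance_score_top_perc` below is a hypothetical illustration of that description, not the library's internal code, and the toy scores are made up.

```python
import numpy as np


def instance_score_top_perc(feature_score, outlier_perc=100.0):
    """Average the top `outlier_perc` percent of feature scores per instance.

    Hypothetical helper: `feature_score` has shape (n_instances, ...).
    """
    n = feature_score.shape[0]
    flat = feature_score.reshape(n, -1)
    # Sort every row in descending order of feature score.
    sorted_desc = np.sort(flat, axis=1)[:, ::-1]
    # Number of features kept; always at least one.
    k = max(1, int(np.ceil(flat.shape[1] * outlier_perc / 100.0)))
    return sorted_desc[:, :k].mean(axis=1)


# Two toy instances with 10 feature scores each (made-up values):
# the first has a uniform error, the second a large error on 2 features.
fscore = np.array([[0.1] * 10,
                   [0.0] * 8 + [1.0] * 2])

score_all = instance_score_top_perc(fscore, outlier_perc=100)
score_top20 = instance_score_top_perc(fscore, outlier_perc=20)
print(score_all, score_top20)
```

With all features the two toy instances score 0.1 and 0.2, while with the top 20% they score 0.1 and 1.0, so a localized perturbation stands out much more clearly when only the highest-scoring features are averaged.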
[22]:
perc_list = [20, 40, 60, 80, 100]
all_perc_scores = []
for perc in perc_list:
    od_preds_perc = od.predict(all_X_mask, outlier_perc=perc)
    iscore = od_preds_perc['data']['instance_score']
    all_perc_scores.append(iscore)
Visualize outlier scores vs. mask sizes and percentage of features used:
[23]:
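# prepend 0 for the unmasked original image; built from mask_sizes so the cell can be re-run safely
x_plt_perc = [0] + [mask_size[0] for mask_size in mask_sizes]
for aps in all_perc_scores:
    plt.plot(x_plt_perc, aps)
plt.xticks(x_plt_perc)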
plt.legend(perc_list)
plt.title('Outlier Score for Increasing Mask Size and Different Feature Subsets')
plt.xlabel('Mask Size')
plt.ylabel('Outlier Score')
plt.show()

Infer outlier threshold value
Finding good threshold values can be tricky since they are typically not easy to interpret. The infer_threshold method helps find a sensible value. We need to pass a batch of instances X and specify what percentage of those we consider to be normal via threshold_perc.
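One way to picture this is as taking a percentile of the instance level outlier scores: the threshold is set so that the chosen percentage of instances falls at or below it. The sketch below shows that idea on made-up scores. Treating the inference as a plain np.percentile call is an assumption about the mechanism, and `infer_threshold_from_scores` is a hypothetical helper, not the library method.

```python
import numpy as np


def infer_threshold_from_scores(instance_scores, threshold_perc=95.0):
    """Return the score below which `threshold_perc` percent of instances fall."""
    return float(np.percentile(instance_scores, threshold_perc))


# 100 made-up instance scores: 0.001, 0.002, ..., 0.100
scores = np.arange(1, 101) / 1000.0

threshold = infer_threshold_from_scores(scores, threshold_perc=99)
flagged = scores > threshold
print(threshold, int(flagged.sum()))
```

With threshold_perc=99, only the single highest-scoring instance of the 100 toy scores ends up above the inferred threshold.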
[24]:
print('Current threshold: {}'.format(od.threshold))
od.infer_threshold(X, threshold_perc=99) # assume 1% of the training data are outliers
print('New threshold: {}'.format(od.threshold))
Current threshold: 0.015
New threshold: 0.0017021061840932791