# AE outlier detection on CIFAR10

## Method

The Auto-Encoder (AE) outlier detector is first trained on a batch of unlabeled but normal (inlier) data. Unsupervised training is desirable since labeled data is often scarce. The AE detector tries to reconstruct the input it receives. If the input cannot be reconstructed well, the reconstruction error is high and the instance can be flagged as an outlier. The reconstruction error is measured as the mean squared error (MSE) between the input and the reconstructed instance.
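
As a minimal sketch of the scoring rule described above (not the library's implementation), the snippet below computes a per-instance MSE between inputs and reconstructions and compares it to a threshold. The function name `reconstruction_scores`, the toy arrays and the threshold value are assumptions made for illustration.

```python
import numpy as np

def reconstruction_scores(X, X_recon):
    """Mean squared error per instance, averaged over all non-batch axes."""
    X = np.asarray(X, dtype=np.float64)
    X_recon = np.asarray(X_recon, dtype=np.float64)
    sq_err = (X - X_recon) ** 2
    return sq_err.reshape(sq_err.shape[0], -1).mean(axis=1)

# toy batch of two 2x2 single-channel "images" (hypothetical data)
X = np.zeros((2, 2, 2, 1))
X_recon = X.copy()
X_recon[1] += 0.5  # the second instance is reconstructed poorly

scores = reconstruction_scores(X, X_recon)
threshold = 0.1  # hypothetical threshold
is_outlier = scores > threshold
print(scores, is_outlier)
```

Only the second instance has a non-zero error, so it is the only one flagged.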

### Dataset

CIFAR10 consists of 60,000 32 by 32 RGB images equally distributed over 10 classes.

[1]:

import logging
import matplotlib.pyplot as plt
import numpy as np
import os
import tensorflow as tf
tf.keras.backend.clear_session()
from tensorflow.keras.layers import Conv2D, Conv2DTranspose, \
    Dense, Layer, Reshape, InputLayer, Flatten
from tqdm import tqdm

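from alibi_detect.od import OutlierAE
from alibi_detect.utils.fetching import fetch_detector
from alibi_detect.utils.perturbation import apply_mask
from alibi_detect.utils.saving import save_detector
from alibi_detect.utils.visualize import plot_instance_score, plot_feature_outlier_image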

logger = tf.get_logger()
logger.setLevel(logging.ERROR)



[2]:

train, test = tf.keras.datasets.cifar10.load_data()
X_train, y_train = train
X_test, y_test = test

X_train = X_train.astype('float32') / 255
X_test = X_test.astype('float32') / 255
print(X_train.shape, y_train.shape, X_test.shape, y_test.shape)

(50000, 32, 32, 3) (50000, 1) (10000, 32, 32, 3) (10000, 1)


### Load or define outlier detector

Pretrained outlier and adversarial detectors for the example notebooks are available for download. The built-in fetch_detector function saves a pretrained model to the local directory filepath and loads the detector. Alternatively, you can train a detector from scratch:

[3]:

load_outlier_detector = True

[4]:

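filepath = 'my_path'  # change to (absolute) directory where model is downloaded
if load_outlier_detector:  # load pretrained outlier detector
    detector_type = 'outlier'
    dataset = 'cifar10'
    detector_name = 'OutlierAE'
    od = fetch_detector(filepath, detector_type, dataset, detector_name)
    filepath = os.path.join(filepath, detector_name)
else:  # define model, initialize, train and save outlier detector
    encoding_dim = 1024

    # assumed layer sizes: three stride-2 convolutions take 32x32 down to 4x4
    encoder_net = tf.keras.Sequential(
        [
            InputLayer(input_shape=(32, 32, 3)),
            Conv2D(64, 4, strides=2, padding='same', activation=tf.nn.relu),
            Conv2D(128, 4, strides=2, padding='same', activation=tf.nn.relu),
            Conv2D(512, 4, strides=2, padding='same', activation=tf.nn.relu),
            Flatten(),
            Dense(encoding_dim,)
        ])

    # assumed layer sizes: three stride-2 transposed convolutions restore 32x32x3
    decoder_net = tf.keras.Sequential(
        [
            InputLayer(input_shape=(encoding_dim,)),
            Dense(4*4*128),
            Reshape(target_shape=(4, 4, 128)),
            Conv2DTranspose(256, 4, strides=2, padding='same', activation=tf.nn.relu),
            Conv2DTranspose(64, 4, strides=2, padding='same', activation=tf.nn.relu),
            Conv2DTranspose(3, 4, strides=2, padding='same', activation='sigmoid')
        ])

    # initialize outlier detector
    od = OutlierAE(threshold=.015,  # threshold for outlier score
                   encoder_net=encoder_net,  # can also pass AE model instead
                   decoder_net=decoder_net,  # of separate encoder and decoder
                   )
    # train
    od.fit(X_train,
           epochs=50,
           verbose=True)

    # save the trained outlier detector
    save_detector(od, filepath)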


### Check quality of AE model

[5]:

idx = 8
X = X_train[idx].reshape(1, 32, 32, 3)
X_recon = od.ae(X)

[6]:

plt.imshow(X.reshape(32, 32, 3))
plt.axis('off')
plt.show()

[7]:

plt.imshow(X_recon.numpy().reshape(32, 32, 3))
plt.axis('off')
plt.show()


### Check outliers on original CIFAR images

[8]:

X = X_train[:500]
print(X.shape)

(500, 32, 32, 3)

[9]:

od_preds = od.predict(X,
                      outlier_type='instance',    # use 'feature' or 'instance' level
                      return_feature_score=True,  # scores used to determine outliers
                      return_instance_score=True)
print(list(od_preds['data'].keys()))

['instance_score', 'feature_score', 'is_outlier']
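
To illustrate how these three entries can be read, here is a small sketch that uses a stand-in dictionary with the same keys. The values are hypothetical, and the assumption that `feature_score` has the same shape as the input batch is ours rather than something shown in the output above.

```python
import numpy as np

# stand-in for the dictionary returned by od.predict (hypothetical values)
preds = {
    'data': {
        'instance_score': np.array([0.001, 0.020, 0.003]),
        'feature_score': np.zeros((3, 32, 32, 3)),  # assumed: same shape as the input
        'is_outlier': np.array([0, 1, 0]),
    }
}

data = preds['data']
outlier_idx = np.where(data['is_outlier'] == 1)[0]  # indices flagged as outliers
top_idx = int(np.argmax(data['instance_score']))    # highest-scoring instance
print('flagged:', outlier_idx.tolist(), 'highest score at index', top_idx)
```

With the real `od_preds`, the same indexing gives the instances to inspect first.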


## Plot instance level outlier scores

[10]:

target = np.zeros(X.shape[0],).astype(int)  # all normal CIFAR10 training instances
labels = ['normal', 'outlier']
plot_instance_score(od_preds, target, labels, od.threshold)


## Visualize predictions

[11]:

X_recon = od.ae(X).numpy()
plot_feature_outlier_image(od_preds,
                           X,
                           X_recon=X_recon,
                           instance_ids=[8, 60, 100, 330],  # pass a list with indices of instances to display
                           max_instances=5,  # max nb of instances to display
                           outliers_only=False)  # only show outlier predictions


### Predict outliers on perturbed CIFAR images

We perturb CIFAR images by adding random noise to patches (masks) of the image. For each of the n_mask_sizes mask sizes, we sample n_masks masks and apply them to each of the n_imgs images. Then we predict outliers on the masked instances:

[12]:

# nb of predictions per image: n_masks * n_mask_sizes
n_mask_sizes = 10
n_masks = 20  # assumed value; number of masks sampled per mask size
n_imgs = 50


[13]:

mask_sizes = [(2*n,2*n) for n in range(1,n_mask_sizes+1)]
print(mask_sizes)
img_ids = np.arange(n_imgs)
X_orig = X[img_ids].reshape(img_ids.shape[0], 32, 32, 3)
print(X_orig.shape)

[(2, 2), (4, 4), (6, 6), (8, 8), (10, 10), (12, 12), (14, 14), (16, 16), (18, 18), (20, 20)]
(50, 32, 32, 3)
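
The perturbation described above can be sketched in plain NumPy. This is an illustration of the idea only, not the library's apply_mask function: the helper name `add_noise_patch`, its arguments and the grey test image are hypothetical.

```python
import numpy as np

def add_noise_patch(img, mask_size, rng, noise_std=1.0):
    """Add Gaussian noise to one randomly placed patch and clip to [0, 1]."""
    h, w = mask_size
    top = int(rng.integers(0, img.shape[0] - h + 1))
    left = int(rng.integers(0, img.shape[1] - w + 1))
    out = img.astype(np.float64)  # astype returns a copy, so img is untouched
    out[top:top + h, left:left + w, :] += rng.normal(
        0.0, noise_std, size=(h, w, img.shape[2]))
    return np.clip(out, 0.0, 1.0), (top, left)

rng = np.random.default_rng(0)
img = np.full((32, 32, 3), 0.5)  # hypothetical uniform grey image
masked, (top, left) = add_noise_patch(img, (8, 8), rng)

# pixels that differ from the original all lie inside the 8x8 patch
changed = np.any(masked != img, axis=-1)
print(int(changed.sum()), 'of', 32 * 32, 'pixels changed')
```

Larger patches alter more pixels, which is why the outlier score is expected to grow with mask size.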


Calculate instance level outlier scores:

[14]:

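all_img_scores = []
for i in tqdm(range(X_orig.shape[0])):
    img_scores = np.zeros(len(mask_sizes))
    for j, mask_size in enumerate(mask_sizes):
        # create masked instances
        X_mask, mask = apply_mask(X_orig[i].reshape(1, 32, 32, 3),
                                  mask_size=mask_size,
                                  n_masks=n_masks,
                                  channels=[0,1,2],
                                  mask_type='normal',  # assumed: Gaussian noise patches
                                  noise_distr=(0,1),
                                  clip_rng=(0,1))
        # predict outliers
        od_preds_mask = od.predict(X_mask)
        score = od_preds_mask['data']['instance_score']
        # store average score over n_masks for a given mask size
        img_scores[j] = np.mean(score)
    all_img_scores.append(img_scores)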

100%|██████████| 50/50 [00:17<00:00,  2.90it/s]


## Visualize outlier scores vs. mask sizes

[15]:

x_plt = [mask[0] for mask in mask_sizes]

[16]:

for ais in all_img_scores:
    plt.plot(x_plt, ais)
plt.xticks(x_plt)
plt.title('Outlier Score All Images for Increasing Mask Size')
plt.ylabel('Outlier Score')
plt.show()

[17]:

ais_np = np.zeros((len(all_img_scores), all_img_scores[0].shape[0]))
for i, ais in enumerate(all_img_scores):
    ais_np[i, :] = ais
ais_mean = np.mean(ais_np, axis=0)
plt.title('Mean Outlier Score All Images for Increasing Mask Size')
plt.ylabel('Outlier score')
plt.plot(x_plt, ais_mean)
plt.xticks(x_plt)
plt.show()


## Investigate instance level outlier

[18]:

i = 8  # index of instance to look at

[19]:

plt.plot(x_plt, all_img_scores[i])
plt.xticks(x_plt)
plt.title('Outlier Scores Image {} for Increasing Mask Size'.format(i))
plt.ylabel('Outlier score')
plt.show()


Reconstruction of masked images and outlier scores per channel:

[20]:

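X_i = X_orig[i].reshape(1, 32, 32, 3)
all_X_mask = [X_i]  # start with the unmasked image (mask size 0)
for mask_size in mask_sizes:  # add one masked copy per mask size
    X_mask, mask = apply_mask(X_i, mask_size=mask_size,
                              n_masks=1,  # just 1 for visualization purposes
                              channels=[0,1,2], mask_type='normal',
                              noise_distr=(0,1), clip_rng=(0,1))
    all_X_mask.append(X_mask)
all_X_mask = np.concatenate(all_X_mask, axis=0)
all_X_recon = od.ae(all_X_mask).numpy()  # reconstructions of all masked images
od_preds = od.predict(all_X_mask)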


Visualize:

[21]:

plot_feature_outlier_image(od_preds,
                           all_X_mask,
                           X_recon=all_X_recon,
                           max_instances=all_X_mask.shape[0],
                           n_channels=3)


### Predict outliers on a subset of features

The sensitivity of the outlier detector can not only be controlled via the threshold, but also by selecting the percentage of the features used for the instance level outlier score computation. For instance, we might want to flag outliers if 40% of the features (pixels for images) have an average outlier score above the threshold. This is possible via the outlier_perc argument in the predict function. It specifies the percentage of the features that are used for outlier detection, sorted in descending outlier score order.
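
A rough NumPy sketch of this idea follows: sort each instance's feature scores and average only the highest outlier_perc percent. The helper name `instance_score_from_top_features` and the toy scores are hypothetical, and the library's internal computation may differ in detail.

```python
import numpy as np

def instance_score_from_top_features(feature_score, outlier_perc=100.0):
    """Average the highest `outlier_perc` percent of feature scores per instance."""
    flat = feature_score.reshape(feature_score.shape[0], -1)
    n_keep = max(1, int(np.ceil(outlier_perc / 100 * flat.shape[1])))
    top = np.sort(flat, axis=1)[:, -n_keep:]  # largest scores sit at the end
    return top.mean(axis=1)

# one instance with four feature scores (hypothetical values)
fscore = np.array([[0.0, 0.0, 0.2, 0.4]])
score_all = instance_score_from_top_features(fscore, 100)  # mean of all four scores
score_top = instance_score_from_top_features(fscore, 50)   # mean of the two largest
print(score_all, score_top)
```

Because only the largest feature scores are kept, a lower percentage can never decrease the instance score, so the detector becomes more sensitive for a fixed threshold.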

[22]:

perc_list = [20, 40, 60, 80, 100]

all_perc_scores = []
for perc in perc_list:
    od_preds_perc = od.predict(all_X_mask, outlier_perc=perc)
    iscore = od_preds_perc['data']['instance_score']
    all_perc_scores.append(iscore)


Visualize outlier scores vs. mask sizes and percentage of features used:

[23]:

x_plt = [0] + x_plt
for aps in all_perc_scores:
    plt.plot(x_plt, aps)
plt.xticks(x_plt)
plt.legend(perc_list)
plt.title('Outlier Score for Increasing Mask Size and Different Feature Subsets')
plt.ylabel('Outlier Score')
plt.show()


### Infer outlier threshold value

Finding good threshold values can be tricky since they are typically not easy to interpret. The infer_threshold method helps find a sensible value. We need to pass a batch of instances X and specify what percentage of those we consider to be normal via threshold_perc.

[24]:

print('Current threshold: {}'.format(od.threshold))
od.infer_threshold(X, threshold_perc=99)  # assume 1% of the training data are outliers
print('New threshold: {}'.format(od.threshold))

Current threshold: 0.015
New threshold: 0.0017021061840932791
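
Conceptually, choosing a threshold this way amounts to taking a percentile of the instance scores. The sketch below illustrates that with synthetic scores. It assumes, rather than confirms, that infer_threshold works along these lines, and the gamma-distributed scores are purely hypothetical.

```python
import numpy as np

rng = np.random.default_rng(0)
# hypothetical instance scores standing in for the detector's output on X
instance_scores = rng.gamma(shape=2.0, scale=0.001, size=1000)

threshold_perc = 99  # percentage of instances considered normal
threshold = np.percentile(instance_scores, threshold_perc)
frac_flagged = np.mean(instance_scores > threshold)
print('threshold:', threshold, 'fraction flagged:', frac_flagged)
```

With threshold_perc=99, about 1% of the instances end up above the threshold.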