Source code for alibi_detect.od.ae

import logging
import numpy as np
import tensorflow as tf
from typing import Dict, Tuple
from alibi_detect.models.tensorflow.autoencoder import AE
from alibi_detect.models.tensorflow.trainer import trainer
from alibi_detect.base import BaseDetector, FitMixin, ThresholdMixin, outlier_prediction_dict
from alibi_detect.utils.tensorflow.prediction import predict_batch
from alibi_detect.utils._types import OptimizerTF

logger = logging.getLogger(__name__)


class OutlierAE(BaseDetector, FitMixin, ThresholdMixin):
    """
    Autoencoder (AE) based outlier detector.

    Instances are reconstructed by the autoencoder. The squared reconstruction
    error of each feature is its feature level outlier score, and the mean of
    the largest feature level scores of an instance is its instance level
    outlier score. Scores strictly above `threshold` are flagged as outliers.
    """

    def __init__(self,
                 threshold: float = None,
                 ae: tf.keras.Model = None,
                 encoder_net: tf.keras.Model = None,
                 decoder_net: tf.keras.Model = None,
                 data_type: str = None
                 ) -> None:
        """
        AE-based outlier detector.

        Parameters
        ----------
        threshold
            Threshold used for outlier score to determine outliers. If ``None``, a warning is logged and
            the threshold has to be set (e.g. via `infer_threshold`) before calling `predict`.
        ae
            A trained tf.keras model if available.
        encoder_net
            Layers for the encoder wrapped in a tf.keras.Sequential class if no 'ae' is specified.
        decoder_net
            Layers for the decoder wrapped in a tf.keras.Sequential class if no 'ae' is specified.
        data_type
            Optionally specify the data type (tabular, image or time-series). Added to metadata.

        Raises
        ------
        TypeError
            If neither `ae` is a tf.keras.Model nor both `encoder_net` and `decoder_net` are
            tf.keras.Sequential instances.
        """
        super().__init__()

        if threshold is None:
            logger.warning('No threshold level set. Need to infer threshold using `infer_threshold`.')

        self.threshold = threshold

        # check if model can be loaded, otherwise initialize AE model
        if isinstance(ae, tf.keras.Model):
            self.ae = ae
        elif isinstance(encoder_net, tf.keras.Sequential) and isinstance(decoder_net, tf.keras.Sequential):
            self.ae = AE(encoder_net, decoder_net)
        else:
            raise TypeError('No valid format detected for `ae` (tf.keras.Model) '
                            'or `encoder_net`, `decoder_net` (tf.keras.Sequential).')

        # set metadata
        self.meta['detector_type'] = 'outlier'
        self.meta['data_type'] = data_type
        self.meta['online'] = False

    def fit(self,
            X: np.ndarray,
            loss_fn: tf.keras.losses = tf.keras.losses.MeanSquaredError(),
            optimizer: OptimizerTF = tf.keras.optimizers.Adam,
            epochs: int = 20,
            batch_size: int = 64,
            verbose: bool = True,
            log_metric: Tuple[str, "tf.keras.metrics"] = None,
            callbacks: tf.keras.callbacks = None,
            ) -> None:
        """
        Train AE model.

        Parameters
        ----------
        X
            Training batch.
        loss_fn
            Loss function used for training.
        optimizer
            Optimizer used for training. Either an optimizer class (instantiated here with its default
            arguments) or an already constructed optimizer instance.
        epochs
            Number of training epochs.
        batch_size
            Batch size used for training.
        verbose
            Whether to print training progress.
        log_metric
            Additional metrics whose progress will be displayed if verbose equals True.
        callbacks
            Callbacks used during training.
        """
        # a class is instantiated, an instance is passed through unchanged
        optimizer = optimizer() if isinstance(optimizer, type) else optimizer

        # train
        trainer(
            self.ae,
            loss_fn,
            X,
            optimizer=optimizer,
            epochs=epochs,
            batch_size=batch_size,
            verbose=verbose,
            log_metric=log_metric,
            callbacks=callbacks,
        )

    @staticmethod
    def _check_outlier_type(outlier_type: str) -> None:
        """Raise a ValueError unless `outlier_type` is 'feature' or 'instance'."""
        if outlier_type not in ('feature', 'instance'):
            raise ValueError('`outlier_type` needs to be either `feature` or `instance`.')

    def infer_threshold(self,
                        X: np.ndarray,
                        outlier_type: str = 'instance',
                        outlier_perc: float = 100.,
                        threshold_perc: float = 95.,
                        batch_size: int = int(1e10)
                        ) -> None:
        """
        Update threshold by a value inferred from the percentage of instances considered to be
        outliers in a sample of the dataset.

        Parameters
        ----------
        X
            Batch of instances.
        outlier_type
            Predict outliers at the 'feature' or 'instance' level.
        outlier_perc
            Percentage of sorted feature level outlier scores used to predict instance level outlier.
        threshold_perc
            Percentage of X considered to be normal based on the outlier score.
        batch_size
            Batch size used when making predictions with the autoencoder.

        Raises
        ------
        ValueError
            If `outlier_type` is neither 'feature' nor 'instance'.
        """
        # fail fast on a bad argument, before running the autoencoder over X
        self._check_outlier_type(outlier_type)

        # compute outlier scores
        fscore, iscore = self.score(X, outlier_perc=outlier_perc, batch_size=batch_size)
        outlier_score = fscore if outlier_type == 'feature' else iscore

        # update threshold
        self.threshold = np.percentile(outlier_score, threshold_perc)

    def feature_score(self, X_orig: np.ndarray, X_recon: np.ndarray) -> np.ndarray:
        """
        Compute feature level outlier scores.

        Parameters
        ----------
        X_orig
            Batch of original instances.
        X_recon
            Batch of reconstructed instances.

        Returns
        -------
        Feature level outlier scores: the elementwise squared difference between `X_orig` and `X_recon`.
        """
        return np.power(X_orig - X_recon, 2)

    def instance_score(self, fscore: np.ndarray, outlier_perc: float = 100.) -> np.ndarray:
        """
        Compute instance level outlier scores.

        Parameters
        ----------
        fscore
            Feature level outlier scores. The first axis indexes instances; all other axes are flattened.
        outlier_perc
            Percentage of sorted feature level outlier scores used to predict instance level outlier.
            The resulting number of features is clamped so that at least one and at most all
            feature scores of an instance are used.

        Returns
        -------
        Instance level outlier scores: per instance, the mean of the largest feature level scores.
        """
        # np.sort returns a sorted copy, so `fscore` is never modified
        fscore_flat = fscore.reshape(fscore.shape[0], -1)
        n_features = fscore_flat.shape[1]

        # Clamp to [1, n_features]: a count of 0 would otherwise turn the slice below into
        # `[:, -0:]`, which selects *all* features rather than the top ones.
        n_score_features = int(np.ceil(.01 * outlier_perc * n_features))
        n_score_features = min(max(n_score_features, 1), n_features)

        # ascending sort along the feature axis; the largest scores sit at the end
        sorted_fscore = np.sort(fscore_flat, axis=1)
        sorted_fscore_perc = sorted_fscore[:, n_features - n_score_features:]
        return np.mean(sorted_fscore_perc, axis=1)

    def score(self, X: np.ndarray, outlier_perc: float = 100., batch_size: int = int(1e10)) \
            -> Tuple[np.ndarray, np.ndarray]:
        """
        Compute feature and instance level outlier scores.

        Parameters
        ----------
        X
            Batch of instances.
        outlier_perc
            Percentage of sorted feature level outlier scores used to predict instance level outlier.
        batch_size
            Batch size used when making predictions with the autoencoder.

        Returns
        -------
        Feature and instance level outlier scores.
        """
        # reconstruct instances
        X_recon = predict_batch(X, self.ae, batch_size=batch_size)

        # compute feature and instance level scores
        fscore = self.feature_score(X, X_recon)  # type: ignore[arg-type]
        iscore = self.instance_score(fscore, outlier_perc=outlier_perc)
        return fscore, iscore

    def predict(self,
                X: np.ndarray,
                outlier_type: str = 'instance',
                outlier_perc: float = 100.,
                batch_size: int = int(1e10),
                return_feature_score: bool = True,
                return_instance_score: bool = True) \
            -> Dict[str, Dict]:
        """
        Predict whether instances are outliers or not.

        Parameters
        ----------
        X
            Batch of instances.
        outlier_type
            Predict outliers at the 'feature' or 'instance' level.
        outlier_perc
            Percentage of sorted feature level outlier scores used to predict instance level outlier.
        batch_size
            Batch size used when making predictions with the autoencoder.
        return_feature_score
            Whether to return feature level outlier scores.
        return_instance_score
            Whether to return instance level outlier scores.

        Returns
        -------
        Dictionary containing ``'meta'`` and ``'data'`` dictionaries.
            - ``'meta'`` has the model's metadata.
            - ``'data'`` contains the outlier predictions and both feature and instance level outlier scores.

        Raises
        ------
        ValueError
            If `outlier_type` is neither 'feature' nor 'instance', or if no threshold has been set.
        """
        # fail fast on bad arguments/state, before running the autoencoder over X
        self._check_outlier_type(outlier_type)
        if self.threshold is None:
            raise ValueError('No threshold level set. Need to infer threshold using `infer_threshold`.')

        # compute outlier scores
        fscore, iscore = self.score(X, outlier_perc=outlier_perc, batch_size=batch_size)
        outlier_score = fscore if outlier_type == 'feature' else iscore

        # values above threshold are outliers
        outlier_pred = (outlier_score > self.threshold).astype(int)

        # populate output dict
        od = outlier_prediction_dict()
        od['meta'] = self.meta
        od['data']['is_outlier'] = outlier_pred
        if return_feature_score:
            od['data']['feature_score'] = fscore
        if return_instance_score:
            od['data']['instance_score'] = iscore
        return od