Source code for alibi_detect.od.vae

import logging
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
from typing import Dict, Tuple
from alibi_detect.models.tensorflow.autoencoder import VAE
from alibi_detect.models.tensorflow.trainer import trainer
from alibi_detect.models.tensorflow.losses import elbo
from alibi_detect.base import BaseDetector, FitMixin, ThresholdMixin, outlier_prediction_dict
from alibi_detect.utils.tensorflow.prediction import predict_batch
from alibi_detect.utils._types import OptimizerTF

logger = logging.getLogger(__name__)


class OutlierVAE(BaseDetector, FitMixin, ThresholdMixin):

    def __init__(self,
                 threshold: Optional[float] = None,
                 score_type: str = 'mse',  # TODO: reconstruction proba; make sure to infer correct distribution
                 vae: Optional[tf.keras.Model] = None,
                 encoder_net: Optional[tf.keras.Sequential] = None,
                 decoder_net: Optional[tf.keras.Sequential] = None,
                 latent_dim: Optional[int] = None,
                 samples: int = 10,
                 beta: float = 1.,
                 data_type: Optional[str] = None
                 ) -> None:
        """
        VAE-based outlier detector.

        Parameters
        ----------
        threshold
            Threshold used for outlier score to determine outliers.
        score_type
            Metric used for outlier scores. Either 'mse' (mean squared error) or
            'proba' (reconstruction probabilities) supported.
        vae
            A trained tf.keras model if available.
        encoder_net
            Layers for the encoder wrapped in a tf.keras.Sequential class if no 'vae' is specified.
        decoder_net
            Layers for the decoder wrapped in a tf.keras.Sequential class if no 'vae' is specified.
        latent_dim
            Dimensionality of the latent space.
        samples
            Number of samples sampled to evaluate each instance.
        beta
            Beta parameter for KL-divergence loss term.
        data_type
            Optionally specify the data type (tabular, image or time-series). Added to metadata.
        """
        super().__init__()

        # a detector without a threshold can still score, but not predict outliers
        if threshold is None:
            logger.warning('No threshold level set. Need to infer threshold using `infer_threshold`.')

        self.threshold = threshold
        self.score_type = score_type
        self.samples = samples

        # check if model can be loaded, otherwise initialize VAE model
        if isinstance(vae, tf.keras.Model):
            self.vae = vae
        elif isinstance(encoder_net, tf.keras.Sequential) and isinstance(decoder_net, tf.keras.Sequential):
            self.vae = VAE(encoder_net, decoder_net, latent_dim, beta=beta)  # define VAE model
        else:
            raise TypeError('No valid format detected for `vae` (tf.keras.Model) '
                            'or `encoder_net` and `decoder_net` (tf.keras.Sequential).')

        # set metadata
        self.meta['detector_type'] = 'outlier'
        self.meta['data_type'] = data_type
        self.meta['online'] = False
[docs] def fit(self, X: np.ndarray, loss_fn: tf.keras.losses = elbo, optimizer: OptimizerTF = tf.keras.optimizers.Adam, cov_elbo: dict = dict(sim=.05), epochs: int = 20, batch_size: int = 64, verbose: bool = True, log_metric: Tuple[str, "tf.keras.metrics"] = None, callbacks: tf.keras.callbacks = None, ) -> None: """ Train VAE model. Parameters ---------- X Training batch. loss_fn Loss function used for training. optimizer Optimizer used for training. cov_elbo Dictionary with covariance matrix options in case the elbo loss function is used. Either use the full covariance matrix inferred from X (dict(cov_full=None)), only the variance (dict(cov_diag=None)) or a float representing the same standard deviation for each feature (e.g. dict(sim=.05)). epochs Number of training epochs. batch_size Batch size used for training. verbose Whether to print training progress. log_metric Additional metrics whose progress will be displayed if verbose equals True. callbacks Callbacks used during training. """ # train arguments args = [self.vae, loss_fn, X] optimizer = optimizer() if isinstance(optimizer, type) else optimizer kwargs = {'optimizer': optimizer, 'epochs': epochs, 'batch_size': batch_size, 'verbose': verbose, 'log_metric': log_metric, 'callbacks': callbacks} # initialize covariance matrix if elbo loss fn is used use_elbo = loss_fn.__name__ == 'elbo' cov_elbo_type, cov = [*cov_elbo][0], [*cov_elbo.values()][0] if use_elbo and cov_elbo_type in ['cov_full', 'cov_diag']: cov = tfp.stats.covariance(X.reshape(X.shape[0], -1)) if cov_elbo_type == 'cov_diag': # infer standard deviation from covariance matrix cov = tf.math.sqrt(tf.linalg.diag_part(cov)) if use_elbo: kwargs['loss_fn_kwargs'] = {cov_elbo_type: tf.dtypes.cast(cov, tf.float32)} # train trainer(*args, **kwargs)
[docs] def infer_threshold(self, X: np.ndarray, outlier_type: str = 'instance', outlier_perc: float = 100., threshold_perc: float = 95., batch_size: int = int(1e10) ) -> None: """ Update threshold by a value inferred from the percentage of instances considered to be outliers in a sample of the dataset. Parameters ---------- X Batch of instances. outlier_type Predict outliers at the 'feature' or 'instance' level. outlier_perc Percentage of sorted feature level outlier scores used to predict instance level outlier. threshold_perc Percentage of X considered to be normal based on the outlier score. batch_size Batch size used when making predictions with the VAE. """ # compute outlier scores fscore, iscore = self.score(X, outlier_perc=outlier_perc, batch_size=batch_size) if outlier_type == 'feature': outlier_score = fscore elif outlier_type == 'instance': outlier_score = iscore else: raise ValueError('`outlier_score` needs to be either `feature` or `instance`.') # update threshold self.threshold = np.percentile(outlier_score, threshold_perc)
[docs] def feature_score(self, X_orig: np.ndarray, X_recon: np.ndarray) -> np.ndarray: """ Compute feature level outlier scores. Parameters ---------- X_orig Batch of original instances. X_recon Batch of reconstructed instances. Returns ------- Feature level outlier scores. """ if self.score_type == 'mse': fscore = np.power(X_orig - X_recon, 2) fscore = fscore.reshape((-1, self.samples) + X_orig.shape[1:]) fscore = np.mean(fscore, axis=1) elif self.score_type == 'proba': pass return fscore
[docs] def instance_score(self, fscore: np.ndarray, outlier_perc: float = 100.) -> np.ndarray: """ Compute instance level outlier scores. Parameters ---------- fscore Feature level outlier scores. outlier_perc Percentage of sorted feature level outlier scores used to predict instance level outlier. Returns ------- Instance level outlier scores. """ fscore_flat = fscore.reshape(fscore.shape[0], -1).copy() n_score_features = int(np.ceil(.01 * outlier_perc * fscore_flat.shape[1])) sorted_fscore = np.sort(fscore_flat, axis=1) sorted_fscore_perc = sorted_fscore[:, -n_score_features:] iscore = np.mean(sorted_fscore_perc, axis=1) return iscore
[docs] def score(self, X: np.ndarray, outlier_perc: float = 100., batch_size: int = int(1e10)) \ -> Tuple[np.ndarray, np.ndarray]: """ Compute feature and instance level outlier scores. Parameters ---------- X Batch of instances. outlier_perc Percentage of sorted feature level outlier scores used to predict instance level outlier. batch_size Batch size used when making predictions with the VAE. Returns ------- Feature and instance level outlier scores. """ # sample reconstructed instances X_samples = np.repeat(X, self.samples, axis=0) X_recon = predict_batch(X_samples, self.vae, batch_size=batch_size) # compute feature and instance level scores fscore = self.feature_score(X_samples, X_recon) # type: ignore[arg-type] iscore = self.instance_score(fscore, outlier_perc=outlier_perc) return fscore, iscore
[docs] def predict(self, X: np.ndarray, outlier_type: str = 'instance', outlier_perc: float = 100., batch_size: int = int(1e10), return_feature_score: bool = True, return_instance_score: bool = True) \ -> Dict[Dict[str, str], Dict[np.ndarray, np.ndarray]]: """ Predict whether instances are outliers or not. Parameters ---------- X Batch of instances. outlier_type Predict outliers at the 'feature' or 'instance' level. outlier_perc Percentage of sorted feature level outlier scores used to predict instance level outlier. batch_size Batch size used when making predictions with the VAE. return_feature_score Whether to return feature level outlier scores. return_instance_score Whether to return instance level outlier scores. Returns ------- Dictionary containing ``'meta'`` and ``'data'`` dictionaries. - ``'meta'`` has the model's metadata. - ``'data'`` contains the outlier predictions and both feature and instance level outlier scores. """ # compute outlier scores fscore, iscore = self.score(X, outlier_perc=outlier_perc, batch_size=batch_size) if outlier_type == 'feature': outlier_score = fscore elif outlier_type == 'instance': outlier_score = iscore else: raise ValueError('`outlier_score` needs to be either `feature` or `instance`.') # values above threshold are outliers outlier_pred = (outlier_score > self.threshold).astype(int) # populate output dict od = outlier_prediction_dict() od['meta'] = self.meta od['data']['is_outlier'] = outlier_pred if return_feature_score: od['data']['feature_score'] = fscore if return_instance_score: od['data']['instance_score'] = iscore return od