Source code for alibi_detect.od.pytorch.ensemble

from abc import ABC, abstractmethod
from typing import Optional
from typing_extensions import Self

import torch
import numpy as np
from torch.nn import Module

from alibi_detect.exceptions import NotFittedError


class BaseTransformTorch(Module):
    def __init__(self):
        """Base Transform class.

        Provides abstract methods for transform objects that map `torch` tensors.
        """
        super().__init__()

    def transform(self, x: torch.Tensor):
        """Public transform method.

        Parameters
        ----------
        x
            `torch.Tensor` array to be transformed.
        """
        raise NotImplementedError()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.transform(x)

class FitMixinTorch(ABC):
    fitted = False

    @abstractmethod
    def fit(self, x_ref: torch.Tensor) -> Self:
        """Abstract fit method.

        Parameters
        ----------
        x_ref
            `torch.Tensor` to fit object on.
        """
        pass

    def _set_fitted(self) -> Self:
        """Sets the fitted attribute to ``True``.

        Should be called within each fit method.
        """
        self.fitted = True
        return self

    def check_fitted(self):
        """Checks to make sure object has been fitted.

        Raises
        ------
        NotFittedError
            Raised if method called and object has not been fit.
        """
        if not torch.jit.is_scripting():
            self._check_fitted()

    @torch.jit.unused
    def _check_fitted(self):
        """Raises error if parent object instance has not been fit."""
        if not self.fitted:
            raise NotFittedError(self.__class__.__name__)

class PValNormalizer(BaseTransformTorch, FitMixinTorch):
    def __init__(self):
        """Maps scores to their p-values.

        Needs to be fit (see :py:obj:`~alibi_detect.od.pytorch.ensemble.FitMixinTorch`).
        Returns the proportion of scores in the reference dataset that are greater than the score of
        interest. Output is between ``0`` and ``1``. Small values are likely to be outliers.
        """
        super().__init__()
        self.val_scores = None

    def fit(self, val_scores: torch.Tensor) -> Self:
        """Fit transform on scores.

        Parameters
        ----------
        val_scores
            Score outputs of ensemble of detectors applied to reference data.
        """
        self.val_scores = val_scores
        return self._set_fitted()

    def transform(self, scores: torch.Tensor) -> torch.Tensor:
        """Transform scores to 1 - p-values.

        Parameters
        ----------
        scores
            `torch.Tensor` of scores from ensemble of detectors.

        Returns
        -------
        `torch.Tensor` of 1 - p-values.
        """
        self.check_fitted()
        less_than_val_scores = scores[:, None, :] < self.val_scores[None, :, :]
        p_vals = (1 + less_than_val_scores.sum(1)) / (len(self.val_scores) + 1)
        return 1 - p_vals
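
# Illustrative usage sketch (not part of the original module; tensors are made up).
# Given reference scores of shape ``(n_ref, n_detectors)``, ``transform`` returns
# ``1 - p`` for each test score, where ``p`` is the (smoothed) proportion of reference
# scores exceeding it, so outputs close to ``1`` are outlier-like:
#
#     normalizer = PValNormalizer().fit(torch.randn(128, 3))   # reference scores from 3 detectors
#     out = normalizer.transform(torch.randn(10, 3))           # shape (10, 3), values in (0, 1)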

class ShiftAndScaleNormalizer(BaseTransformTorch, FitMixinTorch):
    def __init__(self):
        """Maps scores to their normalized values.

        Needs to be fit (see :py:obj:`~alibi_detect.od.pytorch.ensemble.FitMixinTorch`).
        Subtracts the dataset mean and scales by the standard deviation.
        """
        super().__init__()
        self.val_means = None
        self.val_scales = None

    def fit(self, val_scores: torch.Tensor) -> Self:
        """Computes the mean and standard deviation of the scores and stores them.

        Parameters
        ----------
        val_scores
            `torch.Tensor` of scores from ensemble of detectors.
        """
        self.val_means = val_scores.mean(0)[None, :]
        self.val_scales = val_scores.std(0)[None, :]
        return self._set_fitted()

    def transform(self, scores: torch.Tensor) -> torch.Tensor:
        """Transform scores to normalized values.

        Subtracts the mean and scales by the standard deviation.

        Parameters
        ----------
        scores
            `torch.Tensor` of scores from ensemble of detectors.

        Returns
        -------
        `torch.Tensor` of normalized scores.
        """
        self.check_fitted()
        return (scores - self.val_means) / self.val_scales

class TopKAggregator(BaseTransformTorch):
    def __init__(self, k: Optional[int] = None):
        """Takes the mean of the top `k` scores.

        Parameters
        ----------
        k
            Number of scores to take the mean of. If `k` is left ``None`` then it will be set to
            half the number of scores passed in the forward call.
        """
        super().__init__()
        self.k = k

    def transform(self, scores: torch.Tensor) -> torch.Tensor:
        """Takes the mean of the top `k` scores.

        Parameters
        ----------
        scores
            `torch.Tensor` of scores from ensemble of detectors.

        Returns
        -------
        `torch.Tensor` of mean of top `k` scores.
        """
        if self.k is None:
            self.k = int(np.ceil(scores.shape[1] / 2))
        sorted_scores, _ = torch.sort(scores, 1)
        return sorted_scores[:, -self.k:].mean(-1)
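
# Illustrative sketch (made-up values, not part of the original module): with ``k=2``
# each row keeps the mean of its two largest detector scores.
#
#     agg = TopKAggregator(k=2)
#     agg(torch.tensor([[1., 5., 3.]]))   # -> tensor([4.]), the mean of 5. and 3.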

class AverageAggregator(BaseTransformTorch):
    def __init__(self, weights: Optional[torch.Tensor] = None):
        """Averages the scores of the detectors in an ensemble.

        Parameters
        ----------
        weights
            Optional parameter to weight the scores. If `weights` is left ``None`` then uniform
            weights are used.

        Raises
        ------
        ValueError
            If `weights` does not sum to ``1``.
        """
        super().__init__()
        if weights is not None and not np.isclose(weights.sum(), 1):
            raise ValueError("Weights must sum to 1.")
        self.weights = weights

    def transform(self, scores: torch.Tensor) -> torch.Tensor:
        """Averages the scores of the detectors in an ensemble.

        If weights were passed in the `__init__` then these are used to weight the scores.

        Parameters
        ----------
        scores
            `torch.Tensor` of scores from ensemble of detectors.

        Returns
        -------
        `torch.Tensor` of mean of scores.
        """
        if self.weights is None:
            m = scores.shape[-1]
            self.weights = torch.ones(m, device=scores.device) / m
        return scores @ self.weights

class MaxAggregator(BaseTransformTorch):
    def __init__(self):
        """Takes the maximum of the scores of the detectors in an ensemble."""
        super().__init__()

    def transform(self, scores: torch.Tensor) -> torch.Tensor:
        """Takes the maximum score of a set of detectors in an ensemble.

        Parameters
        ----------
        scores
            `torch.Tensor` of scores from ensemble of detectors.

        Returns
        -------
        `torch.Tensor` of maximum scores.
        """
        vals, _ = torch.max(scores, dim=-1)
        return vals

class MinAggregator(BaseTransformTorch):
    def __init__(self):
        """Takes the minimum score of a set of detectors in an ensemble."""
        super().__init__()

    def transform(self, scores: torch.Tensor) -> torch.Tensor:
        """Takes the minimum score of a set of detectors in an ensemble.

        Parameters
        ----------
        scores
            `torch.Tensor` of scores from ensemble of detectors.

        Returns
        -------
        `torch.Tensor` of minimum scores.
        """
        vals, _ = torch.min(scores, dim=-1)
        return vals

class Ensembler(BaseTransformTorch, FitMixinTorch):
    def __init__(self,
                 normalizer: Optional[BaseTransformTorch] = None,
                 aggregator: Optional[BaseTransformTorch] = None):
        """An Ensembler applies normalization and aggregation operations to the scores of an ensemble of detectors.

        Parameters
        ----------
        normalizer
            `BaseTransformTorch` object to normalize the scores. If ``None`` then no normalization
            is applied.
        aggregator
            `BaseTransformTorch` object to aggregate the scores. If ``None`` defaults to `AverageAggregator`.
        """
        super().__init__()
        self.normalizer = normalizer
        if self.normalizer is None:
            self.fitted = True
        if aggregator is None:
            aggregator = AverageAggregator()
        self.aggregator = aggregator

    def transform(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the normalizer and aggregator to the scores.

        Parameters
        ----------
        x
            `torch.Tensor` of scores from ensemble of detectors.

        Returns
        -------
        `torch.Tensor` of aggregated and normalized scores.
        """
        if self.normalizer is not None:
            x = self.normalizer(x)
        x = self.aggregator(x)
        return x

    def fit(self, x: torch.Tensor) -> Self:
        """Fit the normalizer to the scores.

        Parameters
        ----------
        x
            `torch.Tensor` of scores from ensemble of detectors.
        """
        if self.normalizer is not None:
            self.normalizer.fit(x)  # type: ignore
        return self._set_fitted()
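
# End-to-end usage sketch (illustrative, with made-up tensors): normalize per-detector
# scores to 1 - p-values, then average them into a single outlier score per instance.
#
#     ensembler = Ensembler(normalizer=PValNormalizer(), aggregator=AverageAggregator())
#     ensembler.fit(torch.randn(128, 3))        # reference scores from 3 detectors
#     combined = ensembler(torch.randn(10, 3))  # -> tensor of shape (10,)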