Source code for alibi_detect.cd.mmd_online

import os
import numpy as np
from typing import Any, Callable, Dict, Optional, Union
from alibi_detect.utils.frameworks import has_pytorch, has_tensorflow, BackendValidator, Framework
from alibi_detect.base import DriftConfigMixin
from alibi_detect.utils._types import TorchDeviceType

if has_pytorch:
    from alibi_detect.cd.pytorch.mmd_online import MMDDriftOnlineTorch

if has_tensorflow:
    from alibi_detect.cd.tensorflow.mmd_online import MMDDriftOnlineTF


[docs] class MMDDriftOnline(DriftConfigMixin):
[docs] def __init__( self, x_ref: Union[np.ndarray, list], ert: float, window_size: int, backend: str = 'tensorflow', preprocess_fn: Optional[Callable] = None, x_ref_preprocessed: bool = False, kernel: Optional[Callable] = None, sigma: Optional[np.ndarray] = None, n_bootstraps: int = 1000, device: TorchDeviceType = None, verbose: bool = True, input_shape: Optional[tuple] = None, data_type: Optional[str] = None ) -> None: """ Online maximum Mean Discrepancy (MMD) data drift detector using preconfigured thresholds. Parameters ---------- x_ref Data used as reference distribution. ert The expected run-time (ERT) in the absence of drift. For the multivariate detectors, the ERT is defined as the expected run-time from t=0. window_size The size of the sliding test-window used to compute the test-statistic. Smaller windows focus on responding quickly to severe drift, larger windows focus on ability to detect slight drift. backend Backend used for the MMD implementation and configuration. preprocess_fn Function to preprocess the data before computing the data drift metrics. x_ref_preprocessed Whether the given reference data `x_ref` has been preprocessed yet. If `x_ref_preprocessed=True`, only the test data `x` will be preprocessed at prediction time. If `x_ref_preprocessed=False`, the reference data will also be preprocessed. kernel Kernel used for the MMD computation, defaults to Gaussian RBF kernel. sigma Optionally set the GaussianRBF kernel bandwidth. Can also pass multiple bandwidth values as an array. The kernel evaluation is then averaged over those bandwidths. If `sigma` is not specified, the 'median heuristic' is adopted whereby `sigma` is set as the median pairwise distance between reference samples. n_bootstraps The number of bootstrap simulations used to configure the thresholds. The larger this is the more accurately the desired ERT will be targeted. Should ideally be at least an order of magnitude larger than the ERT. device Device type used. The default tries to use the GPU and falls back on CPU if needed. Can be specified by passing either ``'cuda'``, ``'gpu'``, ``'cpu'`` or an instance of ``torch.device``. Only relevant for 'pytorch' backend. verbose Whether or not to print progress during configuration. input_shape Shape of input data. data_type Optionally specify the data type (tabular, image or time-series). Added to metadata. """ super().__init__() # Set config self._set_config(locals()) backend = backend.lower() BackendValidator( backend_options={Framework.TENSORFLOW: [Framework.TENSORFLOW], Framework.PYTORCH: [Framework.PYTORCH]}, construct_name=self.__class__.__name__ ).verify_backend(backend) kwargs = locals() args = [kwargs['x_ref'], kwargs['ert'], kwargs['window_size']] pop_kwargs = ['self', 'x_ref', 'ert', 'window_size', 'backend', '__class__'] [kwargs.pop(k, None) for k in pop_kwargs] if kernel is None: if backend == Framework.TENSORFLOW: from alibi_detect.utils.tensorflow.kernels import GaussianRBF else: from alibi_detect.utils.pytorch.kernels import GaussianRBF # type: ignore kwargs.update({'kernel': GaussianRBF}) if backend == Framework.TENSORFLOW: kwargs.pop('device', None) self._detector = MMDDriftOnlineTF(*args, **kwargs) else: self._detector = MMDDriftOnlineTorch(*args, **kwargs) # type: ignore self.meta = self._detector.meta
@property def t(self): return self._detector.t @property def test_stats(self): return self._detector.test_stats @property def thresholds(self): return [self._detector.thresholds[min(s, self._detector.window_size-1)] for s in range(self.t)]
[docs] def reset_state(self): """ Resets the detector to its initial state (`t=0`). This does not include reconfiguring thresholds. """ self._detector.reset_state()
[docs] def predict(self, x_t: Union[np.ndarray, Any], return_test_stat: bool = True) \ -> Dict[Dict[str, str], Dict[str, Union[int, float]]]: """ Predict whether the most recent window of data has drifted from the reference data. Parameters ---------- x_t A single instance to be added to the test-window. return_test_stat Whether to return the test statistic (squared MMD) and threshold. Returns ------- Dictionary containing ``'meta'`` and ``'data'`` dictionaries. - ``'meta'`` has the model's metadata. - ``'data'`` contains the drift prediction and optionally the test-statistic and threshold. """ return self._detector.predict(x_t, return_test_stat)
[docs] def score(self, x_t: Union[np.ndarray, Any]) -> float: """ Compute the test-statistic (squared MMD) between the reference window and test window. Parameters ---------- x_t A single instance to be added to the test-window. Returns ------- Squared MMD estimate between reference window and test window. """ return self._detector.score(x_t)
[docs] def save_state(self, filepath: Union[str, os.PathLike]): """ Save a detector's state to disk in order to generate a checkpoint. Parameters ---------- filepath The directory to save state to. """ self._detector.save_state(filepath)
[docs] def load_state(self, filepath: Union[str, os.PathLike]): """ Load the detector's state from disk, in order to restart from a checkpoint previously generated with `save_state`. Parameters ---------- filepath The directory to load state from. """ self._detector.load_state(filepath)
[docs] def get_config(self) -> dict: # Needed due to self.x_ref being a torch.Tensor when backend='pytorch' """ Get the detector's configuration dictionary. Returns ------- The detector's configuration dictionary. """ cfg = super().get_config() if cfg.get('backend') == 'pytorch': cfg['x_ref'] = cfg['x_ref'].detach().cpu().numpy() return cfg