import numpy as np
from typing import Any, Callable, List, Optional, Union
from alibi_detect.base import DriftConfigMixin
from alibi_detect.cd.base_online import BaseUniDriftOnline
from alibi_detect.utils.misc import quantile
import numba as nb
from tqdm import tqdm
import warnings
[docs]class CVMDriftOnline(BaseUniDriftOnline, DriftConfigMixin):
online_state_keys = ('t', 'test_stats', 'drift_preds', 'xs', 'ids_ref_wins', 'ids_wins_ref', 'ids_wins_wins')
[docs] def __init__(
self,
x_ref: Union[np.ndarray, list],
ert: float,
window_sizes: List[int],
preprocess_fn: Optional[Callable] = None,
x_ref_preprocessed: bool = False,
n_bootstraps: int = 10000,
batch_size: int = 64,
n_features: Optional[int] = None,
verbose: bool = True,
input_shape: Optional[tuple] = None,
data_type: Optional[str] = None
) -> None:
"""
Online Cramer-von Mises (CVM) data drift detector using preconfigured thresholds, which tests for
any change in the distribution of continuous univariate data. This detector is an adaption of that
proposed by :cite:t:`Ross2012a`.
For multivariate data, the detector makes a correction similar to the Bonferroni correction used for
the offline detector. Given :math:`d` features, the detector configures thresholds by
targeting the :math:`1-\\beta` quantile of test statistics over the simulated streams, where
:math:`\\beta = 1 - (1-(1/ERT))^{(1/d)}`. For the univariate case, this simplifies to
:math:`\\beta = 1/ERT`. At prediction time, drift is flagged if the test statistic of any feature stream
exceed the thresholds.
Note
----
In the multivariate case, for the ERT to be accurately targeted the feature streams must be independent.
Parameters
----------
x_ref
Data used as reference distribution.
ert
The expected run-time (ERT) in the absence of drift. For the univariate detectors, the ERT is defined
as the expected run-time after the smallest window is full i.e. the run-time from t=min(windows_sizes).
window_sizes
window sizes for the sliding test-windows used to compute the test-statistic.
Smaller windows focus on responding quickly to severe drift, larger windows focus on
ability to detect slight drift.
preprocess_fn
Function to preprocess the data before computing the data drift metrics.
x_ref_preprocessed
Whether the given reference data `x_ref` has been preprocessed yet. If `x_ref_preprocessed=True`, only
the test data `x` will be preprocessed at prediction time. If `x_ref_preprocessed=False`, the reference
data will also be preprocessed.
n_bootstraps
The number of bootstrap simulations used to configure the thresholds. The larger this is the
more accurately the desired ERT will be targeted. Should ideally be at least an order of magnitude
larger than the ERT.
batch_size
The maximum number of bootstrap simulations to compute in each batch when configuring thresholds.
A smaller batch size reduces memory requirements, but can result in a longer configuration run time.
n_features
Number of features used in the statistical test. No need to pass it if no preprocessing takes place.
In case of a preprocessing step, this can also be inferred automatically but could be more
expensive to compute.
verbose
Whether or not to print progress during configuration.
input_shape
Shape of input data.
data_type
Optionally specify the data type (tabular, image or time-series). Added to metadata.
"""
super().__init__(
x_ref=x_ref,
ert=ert,
window_sizes=window_sizes,
preprocess_fn=preprocess_fn,
x_ref_preprocessed=x_ref_preprocessed,
n_bootstraps=n_bootstraps,
n_features=n_features,
verbose=verbose,
input_shape=input_shape,
data_type=data_type
)
# Set config
self._set_config(locals())
self.batch_size = n_bootstraps if batch_size is None else batch_size
# Configure thresholds and initialise detector
self._initialise_state()
self._configure_thresholds()
self._configure_ref()
def _configure_ref(self) -> None:
"""
Configure the reference data.
"""
ids_ref_ref = self.x_ref[None, :, :] >= self.x_ref[:, None, :]
self.ref_cdf_ref = np.sum(ids_ref_ref, axis=0) / self.n
def _configure_thresholds(self) -> None:
"""
Private method to simulate trajectories of the Cramer-von Mises statistic for the desired reference set
size and window sizes under the null distribution, where both the reference set and deployment stream
follow the same distribution. It then uses these simulated trajectories to estimate thresholds.
As the test statistics are rank based and independent of the underlying distribution, we may use any
continuous distribution -- we use Gaussian.
The thresholds should stop changing after t=(2*max-window-size - 1) and therefore we need only simulate
trajectories and estimate thresholds up to this point.
"""
if self.verbose:
print("Using %d bootstrap simulations to configure thresholds..." % self.n_bootstraps)
# Assuming independent features, calibrate to beta = 1 - (1-FPR)^(1/n_features)
beta = 1 - (1 - self.fpr) ** (1 / self.n_features)
# Compute test statistic at each t_max number of t's, for each of the n_bootstrap number of streams
# Only need to simulate streams for a single feature here.
t_max = 2 * self.max_ws - 1
stats = self._simulate_streams(t_max)
# At each t for each stream, find max stats. over window sizes
with warnings.catch_warnings():
warnings.filterwarnings(action='ignore', message='All-NaN slice encountered')
max_stats = np.nanmax(stats, -1)
# Now loop through each t and find threshold (at each t) that satisfies eqn. (2) in Ross et al.
thresholds = np.full((t_max, 1), np.nan)
for t in range(np.min(self.window_sizes)-1, t_max):
# Compute (1-beta) quantile of max_stats at a given t, over all streams
threshold = quantile(max_stats[:, t], 1 - beta)
# Remove streams for which a change point has already been detected
max_stats = max_stats[max_stats[:, t] <= threshold]
thresholds[t, 0] = threshold
self.thresholds = thresholds
def _simulate_streams(self, t_max: int) -> np.ndarray:
"""
Private method to simulate streams. _ids_to_stats is a decorated function that is vectorised
over the parallel streams. Not sufficient just to write a normal vectorised numpy implementation as this
can lead to OOM errors (when trying to store (n+t_max) x (n+t_max) x n_bootstraps matrices of floats).
However, we will store the boolean matrix of this size as it faster to compute this way (and 64x smaller).
To further reduce memory requirements, _ids_to_stats can be called for batches of streams, so that
the ids array is of shape batch_size x (n+t_max) x (n+t_max).
"""
n_windows = len(self.window_sizes)
stats = np.zeros((self.n_bootstraps, t_max, n_windows))
n_batches = int(np.ceil(self.n_bootstraps / self.batch_size))
idxs = np.array_split(np.arange(self.n_bootstraps), n_batches)
batches = enumerate(tqdm(idxs, "Computing thresholds over %d batches" % n_batches)) if self.verbose \
else enumerate(idxs)
for b, idx in batches:
xs = np.random.randn(len(idx), self.n + t_max)
ids = xs[:, None, :] >= xs[:, :, None]
stats[idx, :, :] = _ids_to_stats(ids[:, :self.n, :], ids[:, self.n:, :], np.asarray(self.window_sizes))
# Remove stats prior to windows being full
for k, ws in enumerate(self.window_sizes):
stats[:, :ws-1, k] = np.nan
return stats
def _update_state(self, x_t: np.ndarray):
"""
Update online state based on the provided test instance.
Parameters
----------
x_t
The test instance.
"""
self.t += 1
if self.t == 1:
# Initialise stream
self.xs = x_t
self.ids_ref_wins = (x_t >= self.x_ref)[:, None, :]
self.ids_wins_ref = (x_t <= self.x_ref)[None, :, :]
self.ids_wins_wins = np.full((1, 1, self.n_features), 1)
else:
# Update stream
self.xs = np.concatenate([self.xs, x_t])
self.ids_ref_wins = np.concatenate(
[self.ids_ref_wins[:, -(self.max_ws - 1):, :], (x_t >= self.x_ref)[:, None, :]], 1
)
self.ids_wins_ref = np.concatenate(
[self.ids_wins_ref[-(self.max_ws - 1):, :, :], (x_t <= self.x_ref)[None, :, :]], 0
)
self.ids_wins_wins = np.concatenate(
[self.ids_wins_wins[-(self.max_ws - 1):, -(self.max_ws - 1):, :],
(x_t >= self.xs[-self.max_ws:-1, :])[:, None, :]], 1
)
self.ids_wins_wins = np.concatenate(
[self.ids_wins_wins, (x_t <= self.xs[-self.max_ws:, :])[None, :, :]], 0
)
def _initialise_state(self) -> None:
"""
Initialise online state (the stateful attributes updated by `score` and `predict`).
"""
super()._initialise_state()
self.ids_ref_wins = np.array([])
self.ids_wins_ref = np.array([])
self.ids_wins_wins = np.array([])
[docs] def score(self, x_t: Union[np.ndarray, Any]) -> np.ndarray:
"""
Compute the test-statistic (CVM) between the reference window(s) and test window.
If a given test-window is not yet full then a test-statistic of np.nan is returned for that window.
Parameters
----------
x_t
A single instance.
Returns
-------
Estimated CVM test statistics between reference window and test window(s).
"""
x_t = super()._preprocess_xt(x_t)
self._update_state(x_t)
stats = np.zeros((len(self.window_sizes), self.n_features), dtype=np.float32)
for k, ws in enumerate(self.window_sizes):
if self.t >= ws:
ref_cdf_win = np.sum(self.ids_ref_wins[:, -ws:], axis=0) / self.n
win_cdf_ref = np.sum(self.ids_wins_ref[-ws:], axis=0) / ws
win_cdf_win = np.sum(self.ids_wins_wins[-ws:, -ws:], axis=0) / ws
ref_cdf_diffs = self.ref_cdf_ref - win_cdf_ref
win_cdf_diffs = ref_cdf_win - win_cdf_win
sum_diffs_2 = np.sum(ref_cdf_diffs * ref_cdf_diffs, axis=0) \
+ np.sum(win_cdf_diffs * win_cdf_diffs, axis=0)
stats[k, :] = _normalise_stats(sum_diffs_2, self.n, ws)
else:
stats[k, :] = np.nan
return stats
def _check_drift(self, test_stats: np.ndarray, thresholds: np.ndarray) -> int:
"""
Private method to compare test stats to thresholds. The max stats over all windows are compute for each
feature. Drift is flagged if `max_stats` for any feature exceeds the single `thresholds` set.
Parameters
----------
test_stats
Array of test statistics with shape (n_windows, n_features)
thresholds
Array of thresholds with shape (t_max, 1).
Returns
-------
An int equal to 1 if drift, 0 otherwise.
"""
with warnings.catch_warnings():
warnings.filterwarnings(action='ignore', message='All-NaN slice encountered')
max_stats = np.nanmax(test_stats, axis=0)
drift_pred = int((max_stats > thresholds).any())
return drift_pred
@nb.njit(parallel=False, cache=True)
def _normalise_stats(stats: np.ndarray, n: int, ws: int) -> np.ndarray:
"""
See Eqns 3 & 14 of https://www.projecteuclid.org/euclid.aoms/1177704477.
"""
mu = 1 / 6 + 1 / (6 * (n + ws))
var_num = (n + ws + 1) * (4 * n * ws * (n + ws) - 3 * (n * n + ws * ws) - 2 * n * ws)
var_denom = 45 * (n + ws) * (n + ws) * 4 * n * ws
prod = n * ws / ((n + ws) * (n + ws))
return (stats * prod - mu) / np.sqrt(var_num / var_denom)
@nb.njit(parallel=True, cache=True)
def _ids_to_stats(
ids_ref_all: np.ndarray,
ids_stream_all: np.ndarray,
window_sizes: np.ndarray
) -> np.ndarray:
n_bootstraps = ids_ref_all.shape[0]
n = ids_ref_all.shape[1]
t_max = ids_stream_all.shape[1]
n_all = ids_ref_all.shape[-1]
n_windows = window_sizes.shape[0]
stats = np.zeros((n_bootstraps, t_max, n_windows))
for b in nb.prange(n_bootstraps):
ref_cdf_all = np.sum(ids_ref_all[b], axis=0) / n
cumsums = np.zeros((t_max+1, n_all))
for i in range(n_all):
cumsums[1:, i] = np.cumsum(ids_stream_all[b, :, i])
for k in range(n_windows):
ws = window_sizes[k]
win_cdf_ref = (cumsums[ws:, :n] - cumsums[:-ws, :n]) / ws
cdf_diffs_on_ref = np.empty_like(win_cdf_ref)
for j in range(win_cdf_ref.shape[0]): # Need to loop through as can't broadcast in njit parallel
cdf_diffs_on_ref[j, :] = ref_cdf_all[:n] - win_cdf_ref[j, :]
stats[b, (ws-1):, k] = np.sum(cdf_diffs_on_ref * cdf_diffs_on_ref, axis=-1)
for t in range(ws-1, t_max):
win_cdf_win = (cumsums[t + 1, n + t - ws:n + t] -
cumsums[t + 1 - ws, n + t - ws:n + t]) / ws
cdf_diffs_on_win = ref_cdf_all[n + t - ws:n + t] - win_cdf_win
stats[b, t, k] += np.sum(cdf_diffs_on_win * cdf_diffs_on_win)
stats[b, :, k] = _normalise_stats(stats[b, :, k], n, ws)
return stats