import logging
import os
from io import BytesIO
from pathlib import Path
from typing import Optional, Tuple, Union, TYPE_CHECKING
import dill
import requests
from requests import RequestException
import tensorflow as tf
from tensorflow.python.keras import backend
from alibi_detect.models.tensorflow import PixelCNN
from alibi_detect.saving import load_detector
if TYPE_CHECKING:
# Import the true objects directly for typechecking. (See note in CONTRIBUTING.md in Optional Dependencies section)
from alibi_detect.ad.adversarialae import AdversarialAE # noqa
from alibi_detect.ad.model_distillation import ModelDistillation # noqa
from alibi_detect.base import BaseDetector # noqa
from alibi_detect.od.llr import LLR # noqa
from alibi_detect.od.isolationforest import IForest # noqa
from alibi_detect.od.mahalanobis import Mahalanobis # noqa
from alibi_detect.od.aegmm import OutlierAEGMM # noqa
from alibi_detect.od.ae import OutlierAE # noqa
from alibi_detect.od.prophet import OutlierProphet # noqa
from alibi_detect.od.seq2seq import OutlierSeq2Seq # noqa
from alibi_detect.od.vae import OutlierVAE # noqa
from alibi_detect.od.vaegmm import OutlierVAEGMM # noqa
from alibi_detect.od.sr import SpectralResidual # noqa
from alibi_detect.utils.url import _join_url
# do not extend pickle dispatch table so as not to change pickle behaviour
dill.extend(use_dill=False)
logger = logging.getLogger(__name__)
Data = Union[
'BaseDetector',
'AdversarialAE',
'ModelDistillation',
'IForest',
'LLR',
'Mahalanobis',
'OutlierAEGMM',
'OutlierAE',
'OutlierProphet',
'OutlierSeq2Seq',
'OutlierVAE',
'OutlierVAEGMM',
'SpectralResidual'
]
"""Number of seconds to wait for URL requests before raising an error."""
TIMEOUT = 10
def get_pixelcnn_default_kwargs():
    """
    Default PixelCNN keyword arguments used to rebuild an LLR outlier detector
    when only the distribution weights are stored in the bucket.
    """
dist = PixelCNN(
image_shape=(28, 28, 1),
num_resnet=5,
num_hierarchies=2,
num_filters=32,
num_logistic_mix=1,
receptive_field_dims=(3, 3),
dropout_p=.3,
l2_weight=0.
)
KWARGS_PIXELCNN = {
'dist_s': dist,
'dist_b': dist.copy(),
'input_shape': (28, 28, 1)
}
return KWARGS_PIXELCNN
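# A minimal sketch of how the defaults above are consumed (it mirrors the LLR
# branch of `fetch_detector` below; `filepath` is a hypothetical local
# directory holding the fetched weights):
#
#     kwargs = get_pixelcnn_default_kwargs()
#     detector = load_detector(filepath, **kwargs)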
def fetch_tf_model(dataset: str, model: str) -> tf.keras.Model:
"""
    Fetch a pretrained TensorFlow model from the Google Cloud bucket.
Parameters
----------
dataset
Dataset trained on.
model
Model name.
Returns
-------
    Pretrained TensorFlow model.
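    Examples
    --------
    A minimal usage sketch, assuming the ``cifar10``/``resnet56`` pretrained
    classifier is hosted in the bucket:

    >>> clf = fetch_tf_model('cifar10', 'resnet56')  # doctest: +SKIP
    >>> clf.summary()  # doctest: +SKIP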
"""
url = 'https://storage.googleapis.com/seldon-models/alibi-detect/classifier/'
path_model = _join_url(url, [dataset, model, 'model.h5'])
save_path = tf.keras.utils.get_file(Path(model + '.h5').resolve(), path_model)
if dataset == 'cifar10' and model == 'resnet56':
custom_objects = {'backend': backend}
else:
custom_objects = None
clf = tf.keras.models.load_model(save_path, custom_objects=custom_objects)
return clf
def fetch_enc_dec(url: str, filepath: Union[str, os.PathLike]) -> None:
"""
Download encoder and decoder networks.
Parameters
----------
url
URL to fetch detector from.
filepath
Local directory to save detector to.
"""
url_models = _join_url(url, 'model')
model_path = Path(filepath).joinpath('model').resolve()
if not model_path.is_dir():
model_path.mkdir(parents=True, exist_ok=True)
# encoder and decoder
tf.keras.utils.get_file(
model_path.joinpath('encoder_net.h5'),
_join_url(url_models, 'encoder_net.h5')
)
tf.keras.utils.get_file(
model_path.joinpath('decoder_net.h5'),
_join_url(url_models, 'decoder_net.h5')
)
def fetch_ae(url: str, filepath: Union[str, os.PathLike]) -> None:
"""
Download AE outlier detector.
Parameters
----------
url
URL to fetch detector from.
filepath
Local directory to save detector to.
"""
fetch_enc_dec(url, filepath)
url_models = _join_url(url, 'model')
model_path = Path(filepath).joinpath('model').resolve()
    # AE weights and checkpoint files
tf.keras.utils.get_file(
model_path.joinpath('checkpoint'),
_join_url(url_models, 'checkpoint')
)
tf.keras.utils.get_file(
model_path.joinpath('ae.ckpt.index'),
_join_url(url_models, 'ae.ckpt.index')
)
tf.keras.utils.get_file(
model_path.joinpath('ae.ckpt.data-00000-of-00001'),
_join_url(url_models, 'ae.ckpt.data-00000-of-00001')
)
def fetch_ad_ae(url: str, filepath: Union[str, os.PathLike], state_dict: dict) -> None:
"""
Download AE adversarial detector.
Parameters
----------
url
URL to fetch detector from.
filepath
Local directory to save detector to.
state_dict
Dictionary containing the detector's parameters.
"""
fetch_enc_dec(url, filepath)
url_models = _join_url(url, 'model')
model_path = Path(filepath).joinpath('model').resolve()
tf.keras.utils.get_file(
model_path.joinpath('model.h5'),
_join_url(url_models, 'model.h5')
)
tf.keras.utils.get_file(
model_path.joinpath('checkpoint'),
_join_url(url_models, 'checkpoint')
)
tf.keras.utils.get_file(
model_path.joinpath('ae.ckpt.index'),
_join_url(url_models, 'ae.ckpt.index')
)
tf.keras.utils.get_file(
model_path.joinpath('ae.ckpt.data-00000-of-00002'),
_join_url(url_models, 'ae.ckpt.data-00000-of-00002')
)
tf.keras.utils.get_file(
model_path.joinpath('ae.ckpt.data-00001-of-00002'),
_join_url(url_models, 'ae.ckpt.data-00001-of-00002')
)
    hidden_layer_kld = state_dict['hidden_layer_kld']
    if hidden_layer_kld:
        # fetch the additional model attached to each hidden layer used for
        # K-L divergence scoring
        for i in range(len(hidden_layer_kld)):
            hl = 'model_hl_' + str(i)
tf.keras.utils.get_file(
model_path.joinpath(hl + '.ckpt.index'),
_join_url(url_models, hl + '.ckpt.index')
)
tf.keras.utils.get_file(
model_path.joinpath(hl + '.ckpt.data-00000-of-00002'),
_join_url(url_models, hl + '.ckpt.data-00000-of-00002')
)
tf.keras.utils.get_file(
model_path.joinpath(hl + '.ckpt.data-00001-of-00002'),
_join_url(url_models, hl + '.ckpt.data-00001-of-00002')
)
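# A sketch of the `state_dict` shape `fetch_ad_ae` expects (hypothetical
# values): `hidden_layer_kld` maps hidden layer indices to output dimensions,
# and one additional model is fetched per entry, e.g.
#
#     state_dict = {'hidden_layer_kld': {0: 20, 1: 20}}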
def fetch_ad_md(url: str, filepath: Union[str, os.PathLike]) -> None:
"""
Download model and distilled model.
Parameters
----------
url
URL to fetch detector from.
filepath
Local directory to save detector to.
"""
url_models = _join_url(url, 'model')
model_path = Path(filepath).joinpath('model').resolve()
if not model_path.is_dir():
model_path.mkdir(parents=True, exist_ok=True)
    # model and distilled model
tf.keras.utils.get_file(
model_path.joinpath('model.h5'),
_join_url(url_models, 'model.h5')
)
tf.keras.utils.get_file(
model_path.joinpath('distilled_model.h5'),
_join_url(url_models, 'distilled_model.h5')
)
def fetch_aegmm(url: str, filepath: Union[str, os.PathLike]) -> None:
"""
Download AEGMM outlier detector.
Parameters
----------
url
URL to fetch detector from.
filepath
Local directory to save detector to.
"""
# save encoder and decoder
fetch_enc_dec(url, filepath)
# save GMM network
url_models = _join_url(url, 'model')
model_path = Path(filepath).joinpath('model').resolve()
tf.keras.utils.get_file(
model_path.joinpath('gmm_density_net.h5'),
_join_url(url_models, 'gmm_density_net.h5')
)
tf.keras.utils.get_file(
model_path.joinpath('checkpoint'),
_join_url(url_models, 'checkpoint')
)
tf.keras.utils.get_file(
model_path.joinpath('aegmm.ckpt.index'),
_join_url(url_models, 'aegmm.ckpt.index')
)
tf.keras.utils.get_file(
model_path.joinpath('aegmm.ckpt.data-00000-of-00001'),
_join_url(url_models, 'aegmm.ckpt.data-00000-of-00001')
)
def fetch_vae(url: str, filepath: Union[str, os.PathLike]) -> None:
"""
Download VAE outlier detector.
Parameters
----------
url
URL to fetch detector from.
filepath
Local directory to save detector to.
"""
fetch_enc_dec(url, filepath)
# save VAE weights
url_models = _join_url(url, 'model')
model_path = Path(filepath).joinpath('model').resolve()
tf.keras.utils.get_file(
model_path.joinpath('checkpoint'),
_join_url(url_models, 'checkpoint')
)
tf.keras.utils.get_file(
model_path.joinpath('vae.ckpt.index'),
_join_url(url_models, 'vae.ckpt.index')
)
tf.keras.utils.get_file(
model_path.joinpath('vae.ckpt.data-00000-of-00001'),
_join_url(url_models, 'vae.ckpt.data-00000-of-00001')
)
def fetch_vaegmm(url: str, filepath: Union[str, os.PathLike]) -> None:
"""
Download VAEGMM outlier detector.
Parameters
----------
url
URL to fetch detector from.
filepath
Local directory to save detector to.
"""
# save encoder and decoder
fetch_enc_dec(url, filepath)
# save GMM network
url_models = _join_url(url, 'model')
model_path = Path(filepath).joinpath('model').resolve()
tf.keras.utils.get_file(
model_path.joinpath('gmm_density_net.h5'),
_join_url(url_models, 'gmm_density_net.h5')
)
tf.keras.utils.get_file(
model_path.joinpath('checkpoint'),
_join_url(url_models, 'checkpoint')
)
tf.keras.utils.get_file(
model_path.joinpath('vaegmm.ckpt.index'),
_join_url(url_models, 'vaegmm.ckpt.index')
)
tf.keras.utils.get_file(
model_path.joinpath('vaegmm.ckpt.data-00000-of-00001'),
_join_url(url_models, 'vaegmm.ckpt.data-00000-of-00001')
)
def fetch_seq2seq(url: str, filepath: Union[str, os.PathLike]) -> None:
"""
Download sequence-to-sequence outlier detector.
Parameters
----------
url
URL to fetch detector from.
filepath
Local directory to save detector to.
"""
url_models = _join_url(url, 'model')
model_path = Path(filepath).joinpath('model').resolve()
if not model_path.is_dir():
model_path.mkdir(parents=True, exist_ok=True)
# save seq2seq
tf.keras.utils.get_file(
model_path.joinpath('checkpoint'),
_join_url(url_models, 'checkpoint')
)
tf.keras.utils.get_file(
model_path.joinpath('seq2seq.ckpt.index'),
_join_url(url_models, 'seq2seq.ckpt.index')
)
tf.keras.utils.get_file(
model_path.joinpath('seq2seq.ckpt.data-00000-of-00001'),
_join_url(url_models, 'seq2seq.ckpt.data-00000-of-00001')
)
# save threshold network
tf.keras.utils.get_file(
model_path.joinpath('threshold_net.h5'),
_join_url(url_models, 'threshold_net.h5')
)
def fetch_llr(url: str, filepath: Union[str, os.PathLike]) -> str:
"""
Download Likelihood Ratio outlier detector.
Parameters
----------
url
URL to fetch detector from.
filepath
        Local directory to save detector to.

    Returns
    -------
    'weights' if only the distribution weights are stored in the bucket,
    'model' if the full models are stored.
    """
url_models = _join_url(url, 'model')
model_path = Path(filepath).joinpath('model').resolve()
if not model_path.is_dir():
model_path.mkdir(parents=True, exist_ok=True)
try:
tf.keras.utils.get_file(
model_path.joinpath('model_s.h5'),
_join_url(url_models, 'model_s.h5')
)
tf.keras.utils.get_file(
model_path.joinpath('model_b.h5'),
_join_url(url_models, 'model_b.h5')
)
        return 'weights'
    except Exception:
        # weight files unavailable: fall back to the fully stored models
tf.keras.utils.get_file(
model_path.joinpath('model.h5'),
_join_url(url_models, 'model.h5')
)
tf.keras.utils.get_file(
model_path.joinpath('model_background.h5'),
_join_url(url_models, 'model_background.h5')
)
return 'model'
def fetch_state_dict(url: str, filepath: Union[str, os.PathLike],
save_state_dict: bool = True) -> Tuple[dict, dict]:
"""
Fetch the metadata and state/hyperparameter values of pre-trained detectors.
Parameters
----------
url
URL to fetch detector from.
filepath
Local directory to save detector to.
save_state_dict
Whether to save the state dict locally.
Returns
-------
Detector metadata and state.
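    Examples
    --------
    A minimal sketch, assuming the bucket path below exists (it follows the
    ``od/<detector_name>/<dataset>`` layout used by `fetch_detector`):

    >>> url = 'https://storage.googleapis.com/seldon-models/alibi-detect/od/OutlierVAE/cifar10'  # doctest: +SKIP
    >>> meta, state_dict = fetch_state_dict(url, './detector', save_state_dict=False)  # doctest: +SKIP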
"""
# Check if metadata stored as dill or pickle
try:
url_meta = _join_url(url, 'meta.dill')
resp = requests.get(url_meta, timeout=TIMEOUT)
resp.raise_for_status()
suffix = '.dill'
except RequestException:
try:
url_meta = _join_url(url, 'meta.pickle')
resp = requests.get(url_meta, timeout=TIMEOUT)
resp.raise_for_status()
suffix = '.pickle'
except RequestException:
            logger.exception('Failed to fetch meta.dill or meta.pickle from %s.', url)
raise
# Load metadata and state_dict
meta = dill.load(BytesIO(resp.content))
try:
url_state = _join_url(url, meta['name'] + suffix)
        resp = requests.get(url_state, timeout=TIMEOUT)
resp.raise_for_status()
except RequestException:
        logger.exception('Failed to fetch the corresponding state file from %s.', url)
raise
state_dict = dill.load(BytesIO(resp.content))
# Save state
if save_state_dict:
filepath = Path(filepath)
with open(filepath.joinpath('meta.dill'), 'wb') as f:
dill.dump(meta, f)
with open(filepath.joinpath(meta['name'] + '.dill'), 'wb') as f:
dill.dump(state_dict, f)
return meta, state_dict
def fetch_detector(filepath: Union[str, os.PathLike],
                   detector_type: str,
                   dataset: str,
                   detector_name: str,
                   model: Optional[str] = None) -> Data:
"""
    Fetch an outlier or adversarial detector from a Google Cloud bucket, save it locally and return
the initialised detector.
Parameters
----------
filepath
Local directory to save detector to.
detector_type
`outlier` or `adversarial`.
dataset
Dataset of pre-trained detector. E.g. `kddcup`, `cifar10` or `ecg`.
detector_name
Name of the detector in the bucket.
model
Classification model used for adversarial detection.
Returns
-------
Initialised pre-trained detector.
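    Examples
    --------
    A minimal usage sketch, assuming a pretrained ``OutlierVAE`` detector for
    ``cifar10`` is hosted in the bucket:

    >>> od = fetch_detector('./detector', 'outlier', 'cifar10', 'OutlierVAE')  # doctest: +SKIP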
"""
# create path (if needed)
filepath = Path(filepath)
if not filepath.is_dir():
filepath.mkdir(parents=True, exist_ok=True)
        logger.warning('Directory %s did not exist and has been created.', filepath)
# create url of detector
url = 'https://storage.googleapis.com/seldon-models/alibi-detect/'
if detector_type == 'adversarial':
url = _join_url(url, ['ad', dataset, model, detector_name])
    elif detector_type == 'outlier':
        url = _join_url(url, ['od', detector_name, dataset])
    else:
        raise ValueError("`detector_type` needs to be 'outlier' or 'adversarial'.")
# fetch the metadata and state dict
meta, state_dict = fetch_state_dict(url, filepath, save_state_dict=True)
# load detector
name = meta['name']
kwargs: dict = {}
if name == 'OutlierAE':
fetch_ae(url, filepath)
elif name == 'OutlierAEGMM':
fetch_aegmm(url, filepath)
elif name == 'OutlierVAE':
fetch_vae(url, filepath)
elif name == 'OutlierVAEGMM':
fetch_vaegmm(url, filepath)
elif name == 'OutlierSeq2Seq':
fetch_seq2seq(url, filepath)
elif name == 'AdversarialAE':
fetch_ad_ae(url, filepath, state_dict)
if model == 'resnet56':
kwargs = {'custom_objects': {'backend': backend}}
elif name == 'ModelDistillation':
fetch_ad_md(url, filepath)
if model == 'resnet56':
kwargs = {'custom_objects': {'backend': backend}}
elif name == 'LLR':
model_type = fetch_llr(url, filepath)
if model_type == 'weights':
kwargs = get_pixelcnn_default_kwargs()
detector = load_detector(filepath, **kwargs)
return detector # type: ignore[return-value] # load_detector returns drift detectors but `Data` doesn't inc. them
# TODO - above type ignore can be removed once all detectors use the config based approach.