Source code for alibi.explainers.cfproto

import copy
import logging
import sys
from typing import Any, Callable, Dict, Optional, Sequence, Tuple, Union

import numpy as np
import tensorflow.compat.v1 as tf

from alibi.api.defaults import DEFAULT_DATA_CFP, DEFAULT_META_CFP
from alibi.api.interfaces import Explainer, Explanation, FitMixin
from alibi.confidence import TrustScore
from alibi.utils.discretizer import Discretizer
from alibi.utils.distance import abdm, multidim_scaling, mvdm
from alibi.utils.gradients import perturb
from alibi.utils.mapping import (num_to_ord, ohe_to_ord, ohe_to_ord_shape,
                                 ord_to_num, ord_to_ohe)
from alibi.utils.tf import argmax_grad, argmin_grad, one_hot_grad, round_grad

logger = logging.getLogger(__name__)


def CounterFactualProto(*args, **kwargs):
    """
    The class name `CounterFactualProto` is deprecated, please use `CounterfactualProto`.
    """
    # TODO: remove this function in an upcoming release
    warning_msg = 'The class name `CounterFactualProto` is deprecated, please use `CounterfactualProto`.'
    import warnings
    warnings.warn(warning_msg, FutureWarning)
    return CounterfactualProto(*args, **kwargs)

class CounterfactualProto(Explainer, FitMixin):
    def __init__(self,
                 predict: Union[Callable[[np.ndarray], np.ndarray], tf.keras.Model],
                 shape: tuple,
                 kappa: float = 0.,
                 beta: float = .1,
                 feature_range: Tuple[Union[float, np.ndarray], Union[float, np.ndarray]] = (-1e10, 1e10),
                 gamma: float = 0.,
                 ae_model: Optional[tf.keras.Model] = None,
                 enc_model: Optional[tf.keras.Model] = None,
                 theta: float = 0.,
                 cat_vars: Optional[Dict[int, int]] = None,
                 ohe: bool = False,
                 use_kdtree: bool = False,
                 learning_rate_init: float = 1e-2,
                 max_iterations: int = 1000,
                 c_init: float = 10.,
                 c_steps: int = 10,
                 eps: tuple = (1e-3, 1e-3),
                 clip: tuple = (-1000., 1000.),
                 update_num_grad: int = 1,
                 write_dir: Optional[str] = None,
                 sess: Optional[tf.Session] = None) -> None:
        """
        Initialize prototypical counterfactual method.

        Parameters
        ----------
        predict
            `tensorflow` model or any other model's prediction function returning class probabilities.
        shape
            Shape of input data starting with batch size.
        kappa
            Confidence parameter for the attack loss term.
        beta
            Regularization constant for L1 loss term.
        feature_range
            Tuple with `min` and `max` ranges to allow for perturbed instances. `Min` and `max` ranges can be
            `float` or `numpy` arrays with dimension (1 x nb of features) for feature-wise ranges.
        gamma
            Regularization constant for optional auto-encoder loss term.
        ae_model
            Optional auto-encoder model used for loss regularization.
        enc_model
            Optional encoder model used to guide instance perturbations towards a class prototype.
        theta
            Constant for the prototype search loss term.
        cat_vars
            Dict with as keys the categorical columns and as values the number of categories per
            categorical variable.
        ohe
            Whether the categorical variables are one-hot encoded (OHE) or not. If not OHE, they are
            assumed to have ordinal encodings.
        use_kdtree
            Whether to use k-d trees for the prototype loss term if no encoder is available.
        learning_rate_init
            Initial learning rate of optimizer.
        max_iterations
            Maximum number of iterations for finding a counterfactual.
        c_init
            Initial value to scale the attack loss term.
        c_steps
            Number of iterations to adjust the constant scaling the attack loss term.
        eps
            If numerical gradients are used to compute `dL/dx = (dL/dp) * (dp/dx)`, then `eps[0]` is used to
            calculate `dL/dp` and `eps[1]` is used for `dp/dx`. `eps[0]` and `eps[1]` can be a combination of
            `float` values and `numpy` arrays. For `eps[0]`, the array dimension should be
            (1 x nb of prediction categories) and for `eps[1]` it should be (1 x nb of features).
        clip
            Tuple with min and max clip ranges for both the numerical gradients and the gradients
            obtained from the `tensorflow` graph.
        update_num_grad
            If numerical gradients are used, they will be updated every `update_num_grad` iterations.
        write_dir
            Directory to write `tensorboard` files to.
        sess
            Optional `tensorflow` session that will be used if passed instead of creating or inferring
            one internally.
        """
        super().__init__(meta=copy.deepcopy(DEFAULT_META_CFP))
        params = locals()
        remove = ['self', 'predict', 'ae_model', 'enc_model', 'sess', '__class__']
        for key in remove:
            params.pop(key)
        self.meta['params'].update(params)

        self.predict = predict

        # check if the passed object is a model and get session
        is_model = isinstance(predict, tf.keras.Model)
        model_sess = tf.compat.v1.keras.backend.get_session()
        is_ae = isinstance(ae_model, tf.keras.Model)
        is_enc = isinstance(enc_model, tf.keras.Model)
        self.meta['params'].update(is_model=is_model, is_ae=is_ae, is_enc=is_enc)

        # if session provided, use it
        if isinstance(sess, tf.Session):
            self.sess = sess
        else:
            self.sess = model_sess

        if is_model:  # Keras or TF model
            self.model = True
            self.classes = self.predict.predict(np.zeros(shape)).shape[1]  # type: ignore
        else:  # black-box model
            self.model = False
            self.classes = self.predict(np.zeros(shape)).shape[1]

        self.enc_model = is_enc
        self.ae_model = is_ae

        if use_kdtree and self.enc_model:
            logger.warning('Both an encoder and k-d trees enabled. Using the encoder for the prototype loss term.')

        self.enc_or_kdtree = use_kdtree or self.enc_model
        self.meta['params'].update(enc_or_kdtree=self.enc_or_kdtree)

        if cat_vars:
            self.is_cat = True
        else:
            self.is_cat = False
            cat_vars = dict()  # to avoid further None checks
        self.meta['params'].update(is_cat=self.is_cat)

        self.shape = shape
        self.kappa = kappa
        self.beta = beta
        self.gamma = gamma
        self.theta = theta
        self.ae = ae_model
        self.enc = enc_model
        self.cat_vars = cat_vars
        self.ohe = ohe
        self.use_kdtree = use_kdtree
        self.batch_size = shape[0]
        self.max_iterations = max_iterations
        self.c_init = c_init
        self.c_steps = c_steps
        self.feature_range = tuple([(np.ones(shape[1:]) * feature_range[_])[None, :]
                                    if isinstance(feature_range[_], float) else np.array(feature_range[_])
                                    for _ in range(2)])
        self.update_num_grad = update_num_grad
        self.eps = eps
        self.clip = clip
        self.write_dir = write_dir

        if self.is_cat:

            # compute dimensionality after conversion from OHE to ordinal encoding
            shape = ohe_to_ord_shape(shape, cat_vars=cat_vars, is_ohe=self.ohe)

            # define ragged tensor for mapping from categorical to numerical values
            self.map_cat_to_num = tf.ragged.constant([np.zeros(v) for _, v in cat_vars.items()])

            # define placeholder for mapping which can be fed after the fit step
            max_key = max(cat_vars, key=cat_vars.get)  # type: ignore[arg-type] # feature with most categories
            self.max_cat = cat_vars[max_key]
            cat_keys = list(cat_vars.keys())
            n_cat = len(cat_keys)
            self.assign_map = tf.placeholder(tf.float32, (n_cat, self.max_cat), name='assign_map')
            self.map_var = tf.Variable(np.zeros((n_cat, self.max_cat)), dtype=tf.float32, name='map_var')

            # update ragged mapping tensor
            lengths = [v for k, v in cat_vars.items()]
            map_cat_to_num_val = tf.RaggedTensor.from_tensor(self.map_var, lengths=list(lengths))
            self.map_cat_to_num = tf.ragged.map_flat_values(tf.add, self.map_cat_to_num, map_cat_to_num_val)

            # store categorical columns assuming ordinal encoding
            # used for the mapping between numerical values and categories
            if self.ohe:
                cat_vars_ord = {}
                c, k = 0, 0
                while c < self.shape[-1]:
                    if c in cat_keys:
                        v = cat_vars[c]
                        cat_vars_ord[k] = v
                        k += 1
                        c += v
                        continue
                    k += 1
                    c += 1
                cat_keys_ord = list(cat_vars_ord.keys())
                cat_cols_ord = tf.constant(cat_keys_ord, name='cat_keys_ord')
            else:
                cat_cols_ord = tf.constant(cat_keys, name='cat_keys_ord')

            # mapping from numerical values to categories and vice versa
            # supports mapping to and from both ordinal encoding and OHE
            def is_eq(col, cat_cols):
                """
                Check if a column represents a categorical variable.

                Parameters
                ----------
                col
                    Column index to check.
                cat_cols
                    Indices of categorical variables.

                Returns
                -------
                Boolean whether the column is a categorical variable.
                """
                eq = tf.math.equal(col, cat_cols)
                eq_any = tf.reduce_any(eq)
                return tf.equal(eq_any, tf.constant(True))

            def cond_loop(icol, iohe, icat, adv_to_map, adv_map, map_cols):
                """
                Condition for while loop, only iterate over columns of instance.

                Parameters
                ----------
                icol
                    Iteration over columns of instance.
                adv_to_map
                    Instance that needs to be mapped from categories to numerical values or vice versa.
                map_cols
                    Number of columns in instance to be mapped.

                Returns
                -------
                Boolean whether condition is met.
                """
                return tf.less(icol, tf.minimum(map_cols, tf.shape(adv_to_map)[1]))

            def body_ord_to_num(icol, iohe, icat, adv_to_map, adv_map, map_cols):
                """
                Body executed in while loop when mapping ordinal categories to numerical values.

                Parameters
                ----------
                icol
                    Iteration over columns of instance.
                icat
                    Iteration over categorical variables.
                adv_to_map
                    Instance that needs to be mapped from categories to numerical values or vice versa.
                adv_map
                    Mapped instance from categories to numerical values.
                """
                # check if icol is a categorical variable
                eq_any_true = is_eq(icol, cat_cols_ord)

                # map category to its numerical value
                def true_fn():
                    try:
                        return self.map_cat_to_num[icat][adv_to_map[0, icol]]
                    except TypeError:  # the value of adv_to_map[0, icol] is a float
                        # TODO: check error type
                        idx = round_grad(adv_to_map[0, icol])
                        return self.map_cat_to_num[icat][idx]

                def false_fn():
                    return adv_to_map[0, icol]

                # write column to array
                adv_map_col = tf.cond(eq_any_true, true_fn, false_fn)
                adv_map = adv_map.write(icol, adv_map_col)

                # increment
                icol = tf.add(icol, 1)
                icat = tf.cond(eq_any_true, lambda: tf.add(icat, 1), lambda: icat)  # if categorical variable
                return [icol, iohe, icat, adv_to_map, adv_map, map_cols]

            def body_num_to_ord(icol, iohe, icat, adv_to_map, adv_map, map_cols):
                """
                Body executed in while loop when mapping numerical values to ordinal categories.

                Parameters
                ----------
                icol
                    Iteration over columns of instance.
                icat
                    Iteration over categorical variables.
                adv_to_map
                    Instance that needs to be mapped from categories to numerical values or vice versa.
                adv_map
                    Mapped instance from numerical values to categories.
                """
                # check if icol is a categorical variable
                eq_any_true = is_eq(icol, cat_cols_ord)

                # map numerical value to category
                def true_fn():
                    return argmin_grad(adv_to_map[0, icol], self.map_cat_to_num[icat])

                def false_fn():
                    return adv_to_map[0, icol]

                # write column to array
                adv_map_col = tf.cond(eq_any_true, true_fn, false_fn)
                adv_map = adv_map.write(icol, adv_map_col)

                # increment
                icol = tf.add(icol, 1)
                icat = tf.cond(eq_any_true, lambda: tf.add(icat, 1), lambda: icat)  # if categorical variable
                return [icol, iohe, icat, adv_to_map, adv_map, map_cols]

            def body_ohe_to_num(icol, iohe, icat, adv_to_map, adv_map, map_cols):
                """
                Body executed in while loop when mapping OHE categories to numerical values.

                Parameters
                ----------
                icol
                    Iteration over columns of instance.
                iohe
                    Iteration over OHE columns of instance.
                icat
                    Iteration over categorical variables.
                adv_to_map
                    Instance that needs to be mapped from categories to numerical values or vice versa.
                adv_map
                    Mapped instance from categories to numerical values.
                """
                # check if icol is a categorical variable
                eq_any_true = is_eq(icol, cat_cols_ord)

                # nb of categories
                v = tf.cond(eq_any_true,
                            lambda: tf.shape(self.map_cat_to_num[icat])[0],
                            lambda: tf.constant(1))

                # map category to its numerical value
                def true_fn():
                    adv_ord = argmax_grad(adv_to_map[0, iohe:iohe + v])  # map to ord
                    return self.map_cat_to_num[icat][adv_ord]  # map to num

                def false_fn():
                    return adv_to_map[0, iohe]

                # write column to array
                adv_map_col = tf.cond(eq_any_true, true_fn, false_fn)
                adv_map = adv_map.write(icol, adv_map_col)

                # increment
                icol = tf.add(icol, 1)
                iohe = tf.add(iohe, v)
                icat = tf.cond(eq_any_true, lambda: tf.add(icat, 1), lambda: icat)  # if categorical variable
                return [icol, iohe, icat, adv_to_map, adv_map, map_cols]

            def body_num_to_ohe(icol, iohe, icat, adv_to_map, adv_map, map_cols):
                """
                Body executed in while loop when mapping numerical values to OHE categories.

                Parameters
                ----------
                icol
                    Iteration over columns of instance.
                iohe
                    Iteration over OHE columns of instance.
                icat
                    Iteration over categorical variables.
                adv_to_map
                    Instance that needs to be mapped from categories to numerical values or vice versa.
                adv_map
                    Mapped instance from numerical values to categories.
                """
                # check if icol is a categorical variable
                eq_any_true = is_eq(icol, cat_cols_ord)

                def true_fn():
                    cat_ord = argmin_grad(adv_to_map[0, icol], self.map_cat_to_num[icat])  # map to ord
                    cat_ohe = one_hot_grad(cat_ord, self.map_cat_to_num[icat])  # map to OHE
                    return cat_ohe

                def false_fn():
                    return tf.reshape(adv_to_map[0, icol], (1,))

                # get OHE mapped columns
                adv_map_col = tf.cond(eq_any_true, true_fn, false_fn)

                def while_ohe(i_ohe, i_ohe_cat, adv_ohe):
                    return tf.less(i_ohe_cat, tf.shape(adv_map_col)[0])

                def body_ohe(i_ohe, i_ohe_cat, adv_ohe):
                    i_write = tf.add(i_ohe_cat, i_ohe)
                    adv_ohe = adv_ohe.write(i_write, adv_map_col[i_ohe_cat])
                    i_ohe_cat = tf.add(i_ohe_cat, 1)
                    return [i_ohe, i_ohe_cat, adv_ohe]

                # write OHE columns to array
                iohe, iohecat, adv_map = tf.while_loop(while_ohe, body_ohe, [iohe, tf.constant(0), adv_map])

                # increment
                icol = tf.add(icol, 1)
                iohe = tf.add(iohe, iohecat)
                icat = tf.cond(eq_any_true, lambda: tf.add(icat, 1), lambda: icat)  # if categorical variable
                return [icol, iohe, icat, adv_to_map, adv_map, map_cols]

            def apply_map(adv_to_map, to_num):
                """
                Apply mapping from numerical to ordinal or OHE categorical variables or vice versa
                for an instance.

                Parameters
                ----------
                adv_to_map
                    Instance to map.
                to_num
                    Map from categorical to numerical values if ``True``, vice versa if ``False``.

                Returns
                -------
                Mapped instance.
                """
                icol = tf.constant(0)
                iohe = tf.constant(0)
                icat = tf.constant(0)
                if self.ohe:
                    body_to_num, body_to_cat = body_ohe_to_num, body_num_to_ohe
                else:
                    body_to_num, body_to_cat = body_ord_to_num, body_num_to_ord
                if self.ohe and not to_num:
                    shape_adv_map = self.shape
                else:
                    shape_adv_map = shape
                adv_map = tf.TensorArray(dtype=tf.float32, size=shape_adv_map[1])
                loop_vars = (icol, iohe, icat, adv_to_map, adv_map, shape_adv_map[1])

                if to_num:  # map from categorical to numerical values
                    _, _, _, _, adv_map, _ = tf.while_loop(cond_loop, body_to_num, loop_vars,
                                                           parallel_iterations=1, back_prop=True)
                else:  # map from numerical to categorical values
                    _, _, _, _, adv_map, _ = tf.while_loop(cond_loop, body_to_cat, loop_vars,
                                                           parallel_iterations=1, back_prop=True)

                adv_map_stack = tf.reshape(adv_map.stack(), shape_adv_map)
                return adv_map_stack

        # define tf variables for original and perturbed instances, and target labels
        self.orig = tf.Variable(np.zeros(shape), dtype=tf.float32, name='orig')
        self.adv = tf.Variable(np.zeros(shape), dtype=tf.float32, name='adv')
        self.adv_s = tf.Variable(np.zeros(shape), dtype=tf.float32, name='adv_s')
        self.target = tf.Variable(np.zeros((self.batch_size, self.classes)), dtype=tf.float32, name='target')

        # variable for target class proto
        if self.enc_model:
            self.shape_enc = self.enc.predict(np.zeros(self.shape)).shape  # type: ignore[union-attr]
        else:
            self.shape_enc = shape

        self.target_proto = tf.Variable(np.zeros(self.shape_enc), dtype=tf.float32, name='target_proto')

        # define tf variable for constant used in FISTA optimization
        self.const = tf.Variable(np.zeros(self.batch_size), dtype=tf.float32, name='const')
        self.global_step = tf.Variable(0.0, trainable=False, name='global_step')

        # define placeholders that will be assigned to relevant variables
        self.assign_orig = tf.placeholder(tf.float32, shape, name='assign_orig')
        self.assign_adv = tf.placeholder(tf.float32, shape, name='assign_adv')
        self.assign_adv_s = tf.placeholder(tf.float32, shape, name='assign_adv_s')
        self.assign_target = tf.placeholder(tf.float32, (self.batch_size, self.classes), name='assign_target')
        self.assign_const = tf.placeholder(tf.float32, [self.batch_size], name='assign_const')
        self.assign_target_proto = tf.placeholder(tf.float32, self.shape_enc, name='assign_target_proto')

        # define conditions and values for element-wise shrinkage thresholding
        with tf.name_scope('shrinkage_thresholding') as scope:
            cond = [tf.cast(tf.greater(tf.subtract(self.adv_s, self.orig), self.beta), tf.float32),
                    tf.cast(tf.less_equal(tf.abs(tf.subtract(self.adv_s, self.orig)), self.beta), tf.float32),
                    tf.cast(tf.less(tf.subtract(self.adv_s, self.orig), tf.negative(self.beta)), tf.float32)]
            upper = tf.minimum(tf.subtract(self.adv_s, self.beta), tf.cast(feature_range[1], tf.float32))
            lower = tf.maximum(tf.add(self.adv_s, self.beta), tf.cast(feature_range[0], tf.float32))
            self.assign_adv = tf.multiply(cond[0], upper) + tf.multiply(cond[1], self.orig) + \
                tf.multiply(cond[2], lower)

        # perturbation update and vector projection on correct feature range set
        with tf.name_scope('perturbation_y') as scope:
            self.zt = tf.divide(self.global_step, self.global_step + tf.cast(3, tf.float32))
            self.assign_adv_s = self.assign_adv + tf.multiply(self.zt, self.assign_adv - self.adv)
            # map to feature space
            self.assign_adv_s = tf.minimum(self.assign_adv_s, tf.cast(feature_range[1], tf.float32))
            self.assign_adv_s = tf.maximum(self.assign_adv_s, tf.cast(feature_range[0], tf.float32))

        # assign counterfactual of step k+1 to k
        with tf.name_scope('update_adv') as scope:
            self.adv_updater = tf.assign(self.adv, self.assign_adv)
            self.adv_updater_s = tf.assign(self.adv_s, self.assign_adv_s)

        # from perturbed instance, derive deviation delta
        with tf.name_scope('update_delta') as scope:
            self.delta = self.orig - self.adv
            self.delta_s = self.orig - self.adv_s

        # define L1 and L2 loss terms; L1+L2 is later used as an optimization constraint for FISTA
        ax_sum = list(np.arange(1, len(shape)))
        with tf.name_scope('loss_l1_l2') as scope:
            self.l2 = tf.reduce_sum(tf.square(self.delta), axis=ax_sum)
            self.l2_s = tf.reduce_sum(tf.square(self.delta_s), axis=ax_sum)
            self.l1 = tf.reduce_sum(tf.abs(self.delta), axis=ax_sum)
            self.l1_s = tf.reduce_sum(tf.abs(self.delta_s), axis=ax_sum)
            self.l1_l2 = self.l2 + tf.multiply(self.l1, self.beta)
            self.l1_l2_s = self.l2_s + tf.multiply(self.l1_s, self.beta)

            # sum losses
            self.loss_l1 = tf.reduce_sum(self.l1)
            self.loss_l1_s = tf.reduce_sum(self.l1_s)
            self.loss_l2 = tf.reduce_sum(self.l2)
            self.loss_l2_s = tf.reduce_sum(self.l2_s)

        if self.is_cat:  # map adv and adv_s to categories
            self.adv_cat = apply_map(self.adv, to_num=False)
            self.adv_cat_s = apply_map(self.adv_s, to_num=False)
        else:
            self.adv_cat = self.adv
            self.adv_cat_s = self.adv_s

        with tf.name_scope('loss_ae') as scope:
            # gamma * AE loss
            if self.ae_model:
                # run autoencoder
                self.adv_ae = self.ae(self.adv_cat)  # type: ignore[misc]
                self.adv_ae_s = self.ae(self.adv_cat_s)  # type: ignore[misc]
                if self.is_cat:  # map output autoencoder back to numerical values
                    self.adv_ae = apply_map(self.adv_ae, to_num=True)
                    self.adv_ae_s = apply_map(self.adv_ae_s, to_num=True)
                # compute loss
                self.loss_ae = self.gamma * tf.square(tf.norm(self.adv_ae - self.adv))
                self.loss_ae_s = self.gamma * tf.square(tf.norm(self.adv_ae_s - self.adv_s))
            else:  # no auto-encoder available
                self.loss_ae = tf.constant(0.)
                self.loss_ae_s = tf.constant(0.)

        with tf.name_scope('loss_attack') as scope:
            if not self.model:
                self.loss_attack = tf.placeholder(tf.float32)
            elif self.c_init == 0. and self.c_steps == 1:  # prediction loss term not used
                # make predictions on perturbed instance
                self.pred_proba = self.predict(self.adv_cat)
                self.pred_proba_s = self.predict(self.adv_cat_s)

                self.loss_attack = tf.constant(0.)
                self.loss_attack_s = tf.constant(0.)
            else:
                # make predictions on perturbed instance
                self.pred_proba = self.predict(self.adv_cat)
                self.pred_proba_s = self.predict(self.adv_cat_s)

                # probability of target label prediction
                self.target_proba = tf.reduce_sum(self.target * self.pred_proba, 1)
                target_proba_s = tf.reduce_sum(self.target * self.pred_proba_s, 1)

                # max probability of non target label prediction
                self.nontarget_proba_max = tf.reduce_max((1 - self.target) * self.pred_proba -
                                                         (self.target * 10000), 1)
                nontarget_proba_max_s = tf.reduce_max((1 - self.target) * self.pred_proba_s -
                                                      (self.target * 10000), 1)

                # loss term f(x,d)
                loss_attack = tf.maximum(0.0, -self.nontarget_proba_max + self.target_proba + self.kappa)
                loss_attack_s = tf.maximum(0.0, -nontarget_proba_max_s + target_proba_s + self.kappa)

                # c * f(x,d)
                self.loss_attack = tf.reduce_sum(self.const * loss_attack)
                self.loss_attack_s = tf.reduce_sum(self.const * loss_attack_s)

        with tf.name_scope('loss_prototype') as scope:
            if self.enc_model:
                self.loss_proto = self.theta * tf.square(
                    tf.norm(self.enc(self.adv_cat) - self.target_proto))  # type: ignore[misc]
                self.loss_proto_s = self.theta * tf.square(
                    tf.norm(self.enc(self.adv_cat_s) - self.target_proto))  # type: ignore[misc]
            elif self.use_kdtree:
                self.loss_proto = self.theta * tf.square(tf.norm(self.adv - self.target_proto))
                self.loss_proto_s = self.theta * tf.square(tf.norm(self.adv_s - self.target_proto))
            else:  # no encoder available and no k-d trees used
                self.loss_proto = tf.constant(0.)
                self.loss_proto_s = tf.constant(0.)

        with tf.name_scope('loss_combined') as scope:
            # no need for L1 term in loss to optimize when using FISTA
            if self.model:
                self.loss_opt = self.loss_attack_s + self.loss_l2_s + self.loss_ae_s + self.loss_proto_s
            else:  # separate numerical computation of loss attack gradient
                self.loss_opt = self.loss_l2_s + self.loss_ae_s + self.loss_proto_s

            # add L1 term to overall loss; this is not the loss that will be directly optimized
            self.loss_total = (self.loss_attack + self.loss_l2 + self.loss_ae +
                               tf.multiply(self.beta, self.loss_l1) + self.loss_proto)

        with tf.name_scope('training') as scope:
            self.learning_rate = tf.train.polynomial_decay(learning_rate_init, self.global_step,
                                                           self.max_iterations, 0, power=0.5)
            optimizer = tf.train.GradientDescentOptimizer(self.learning_rate)
            start_vars = set(x.name for x in tf.global_variables())

            # first compute, then apply grads
            self.compute_grads = optimizer.compute_gradients(self.loss_opt, var_list=[self.adv_s])
            self.grad_ph = tf.placeholder(tf.float32, name='grad_adv_s')
            var = [tvar for tvar in tf.trainable_variables() if tvar.name.startswith('adv_s')][-1]
            # get the last in case explainer is re-initialized and a new graph is created
            grad_and_var = [(self.grad_ph, var)]
            self.apply_grads = optimizer.apply_gradients(grad_and_var, global_step=self.global_step)
            end_vars = tf.global_variables()
            new_vars = [x for x in end_vars if x.name not in start_vars]

        # variables to initialize
        self.setup: list = []
        self.setup.append(self.orig.assign(self.assign_orig))
        self.setup.append(self.target.assign(self.assign_target))
        self.setup.append(self.const.assign(self.assign_const))
        self.setup.append(self.adv.assign(self.assign_adv))
        self.setup.append(self.adv_s.assign(self.assign_adv_s))
        self.setup.append(self.target_proto.assign(self.assign_target_proto))
        if self.is_cat:
            self.setup.append(self.map_var.assign(self.assign_map))

        self.init = tf.variables_initializer(var_list=[self.global_step] + [self.adv_s] + [self.adv] + new_vars)

        if self.write_dir is not None:
            self.writer = tf.summary.FileWriter(write_dir, tf.get_default_graph())
            self.writer.add_graph(tf.get_default_graph())
        else:
            self.writer = None
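
    # The 'shrinkage_thresholding' scope above is the FISTA proximal (soft-thresholding) step for
    # the elastic-net term: each feature of the candidate counterfactual is pulled towards the
    # original instance and shrunk by `beta`, which is what makes perturbations sparse. A minimal
    # NumPy sketch of the same element-wise rule outside the TF graph (`orig`, `adv_s`, `beta` and
    # the clip range are toy assumptions, not values from this module):
    #
    #     orig = np.array([0.2, -0.5, 1.0])
    #     adv_s = np.array([0.25, -0.9, 1.0])
    #     beta, lo, hi = 0.1, -1.0, 1.0
    #     delta = adv_s - orig
    #     adv = np.where(delta > beta, np.minimum(adv_s - beta, hi),
    #                    np.where(delta < -beta, np.maximum(adv_s + beta, lo), orig))
    #     # -> array([ 0.2, -0.8,  1. ]): features within `beta` of the original snap back exactly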
    def fit(self,
            train_data: np.ndarray,
            trustscore_kwargs: Optional[dict] = None,
            d_type: str = 'abdm',
            w: Optional[float] = None,
            disc_perc: Sequence[Union[int, float]] = (25, 50, 75),
            standardize_cat_vars: bool = False,
            smooth: float = 1.,
            center: bool = True,
            update_feature_range: bool = True) -> "CounterfactualProto":
        """
        Get prototypes for each class using the encoder or k-d trees.
        The prototypes are used for the encoder loss term or to calculate the optional trust scores.

        Parameters
        ----------
        train_data
            Representative sample from the training data.
        trustscore_kwargs
            Optional arguments to initialize the trust scores method.
        d_type
            Pairwise distance metric used for categorical variables. Currently, ``'abdm'``, ``'mvdm'`` and
            ``'abdm-mvdm'`` are supported. ``'abdm'`` infers context from the other variables while
            ``'mvdm'`` uses the model predictions. ``'abdm-mvdm'`` is a weighted combination of the two
            metrics.
        w
            Weight on ``'abdm'`` (between 0. and 1.) distance if `d_type` equals ``'abdm-mvdm'``.
        disc_perc
            List with percentiles used in binning of numerical features used for the ``'abdm'``
            and ``'abdm-mvdm'`` pairwise distance measures.
        standardize_cat_vars
            Standardize numerical values of categorical variables if ``True``.
        smooth
            Smoothing exponent between 0 and 1 for the distances. Lower values will smooth the difference in
            distance metric between different features.
        center
            Whether to center the scaled distance measures. If ``False``, the min distance for each feature
            except for the feature with the highest raw max distance will be the lower bound of the feature
            range, but the upper bound will be below the max feature range.
        update_feature_range
            Update feature range with scaled values.
        """
        # get params for storage in meta
        params = locals()
        remove = ['self', 'train_data']
        for key in remove:
            params.pop(key)
        # update metadata
        self.meta['params'].update(params)

        if self.model:
            preds = np.argmax(self.predict.predict(train_data), axis=1)  # type: ignore
        else:
            preds = np.argmax(self.predict(train_data), axis=1)

        self.cat_vars_ord: dict = dict()
        if self.is_cat:  # compute distance metrics for categorical variables

            if self.ohe:  # convert OHE to ordinal encoding
                train_data_ord, self.cat_vars_ord = ohe_to_ord(train_data, self.cat_vars)
            else:
                train_data_ord, self.cat_vars_ord = train_data, self.cat_vars

            # bin numerical features to compute the pairwise distance matrices
            cat_keys = list(self.cat_vars_ord.keys())
            n_ord = train_data_ord.shape[1]
            numerical_feats = [feat for feat in range(n_ord) if feat not in cat_keys]
            if d_type in ['abdm', 'abdm-mvdm'] and len(cat_keys) != n_ord:
                fnames = [str(_) for _ in range(n_ord)]
                disc = Discretizer(train_data_ord, numerical_feats, fnames, percentiles=disc_perc)
                train_data_bin = disc.discretize(train_data_ord)
                cat_vars_bin = {k: len(disc.feature_intervals[k]) for k in range(n_ord) if k not in cat_keys}
            else:
                train_data_bin = train_data_ord
                cat_vars_bin = {}

            if d_type not in ['abdm', 'mvdm', 'abdm-mvdm']:
                raise ValueError('d_type needs to be "abdm", "mvdm" or "abdm-mvdm". '
                                 '{} is not supported.'.format(d_type))

            # pairwise distances for categorical variables
            if d_type == 'abdm':
                d_pair = abdm(train_data_bin, self.cat_vars_ord, cat_vars_bin)
            elif d_type == 'mvdm':
                d_pair = mvdm(train_data_ord, preds, self.cat_vars_ord, alpha=1)

            # combined distance measure
            if d_type == 'abdm-mvdm':
                if w is None:
                    msg = "Must specify a value for `w` if using d_type='abdm-mvdm'"
                    raise ValueError(msg)

                # pairwise distances
                d_abdm = abdm(train_data_bin, self.cat_vars_ord, cat_vars_bin)
                d_mvdm = mvdm(train_data_ord, preds, self.cat_vars_ord, alpha=1)

                # multidim scaled distances
                d_abs_abdm, _ = multidim_scaling(d_abdm, n_components=2, use_metric=True,
                                                 feature_range=self.feature_range,  # type: ignore[arg-type]
                                                 standardize_cat_vars=standardize_cat_vars,
                                                 smooth=smooth, center=center,
                                                 update_feature_range=False)
                d_abs_mvdm, _ = multidim_scaling(d_mvdm, n_components=2, use_metric=True,
                                                 feature_range=self.feature_range,  # type: ignore[arg-type]
                                                 standardize_cat_vars=standardize_cat_vars,
                                                 smooth=smooth, center=center,
                                                 update_feature_range=False)

                # combine abdm and mvdm
                self.d_abs: Dict = {}
                new_feature_range = tuple([f.copy() for f in self.feature_range])
                for k, v in d_abs_abdm.items():
                    self.d_abs[k] = v * w + d_abs_mvdm[k] * (1 - w)
                    if center:  # center the numerical feature values
                        self.d_abs[k] -= .5 * (self.d_abs[k].max() + self.d_abs[k].min())
                    if update_feature_range:
                        new_feature_range[0][0, k] = self.d_abs[k].min()
                        new_feature_range[1][0, k] = self.d_abs[k].max()

                if update_feature_range:  # assign updated feature range
                    self.feature_range = new_feature_range
            else:  # apply multidimensional scaling for the abdm or mvdm distances
                self.d_abs, self.feature_range = multidim_scaling(d_pair, n_components=2, use_metric=True,
                                                                  feature_range=self.feature_range,  # type: ignore
                                                                  standardize_cat_vars=standardize_cat_vars,
                                                                  smooth=smooth, center=center,
                                                                  update_feature_range=update_feature_range)

            # create array used for ragged tensor placeholder
            self.d_abs_ragged: Any = []
            for _, v in self.d_abs.items():
                n_pad = self.max_cat - len(v)
                v_pad = np.pad(v, (0, n_pad), 'constant')
                self.d_abs_ragged.append(v_pad)
            self.d_abs_ragged = np.array(self.d_abs_ragged)

        if self.enc_model:
            enc_data = self.enc.predict(train_data)  # type: ignore[union-attr]
            self.class_proto: dict = {}
            self.class_enc: dict = {}
            for i in range(self.classes):
                idx = np.where(preds == i)[0]
                self.class_proto[i] = np.expand_dims(np.mean(enc_data[idx], axis=0), axis=0)
                self.class_enc[i] = enc_data[idx]
        elif self.use_kdtree:
            logger.warning('No encoder specified. Using k-d trees to represent class prototypes.')
            if trustscore_kwargs is not None:
                ts = TrustScore(**trustscore_kwargs)
            else:
                ts = TrustScore()
            if self.is_cat:  # map categorical to numerical data
                train_data = ord_to_num(train_data_ord, self.d_abs)
            ts.fit(train_data, preds, classes=self.classes)
            self.kdtrees = ts.kdtrees
            self.X_by_class = ts.X_kdtree

        return self
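
    # A minimal fit sketch for tabular data with categorical columns (hypothetical objects: a
    # trained classifier `model`, training set `X_train`, and an ordinally encoded column 0 with
    # 4 categories; not part of this module):
    #
    #     cf = CounterfactualProto(model, shape=(1,) + X_train.shape[1:],
    #                              cat_vars={0: 4}, ohe=False, use_kdtree=True)
    #     cf.fit(X_train, d_type='abdm')  # embeds categories numerically, then builds k-d trees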
    def loss_fn(self, pred_proba: np.ndarray, Y: np.ndarray) -> np.ndarray:
        """
        Compute the attack loss.

        Parameters
        ----------
        pred_proba
            Prediction probabilities of an instance.
        Y
            One-hot representation of instance labels.

        Returns
        -------
        Loss of the attack.
        """
        # probability of target label prediction
        target_proba = np.sum(pred_proba * Y)

        # max probability of non target label prediction
        nontarget_proba_max = np.max((1 - Y) * pred_proba - 10000 * Y)

        # loss term f(x,d)
        loss = np.maximum(0., - nontarget_proba_max + target_proba + self.kappa)

        # c * f(x,d)
        loss_attack = np.sum(self.const.eval(session=self.sess) * loss)

        return loss_attack
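
    # Worked example of the attack loss above (toy numbers, not taken from this module): with
    # pred_proba = [0.7, 0.2, 0.1], one-hot Y = [1, 0, 0] and kappa = 0.2, target_proba = 0.7 and
    # nontarget_proba_max = 0.2, so loss = max(0, -0.2 + 0.7 + 0.2) = 0.7. The loss only reaches 0
    # once the best non-target probability exceeds the target probability by at least kappa.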
    def get_gradients(self, X: np.ndarray, Y: np.ndarray, grads_shape: tuple,
                      cat_vars_ord: dict) -> np.ndarray:
        """
        Compute numerical gradients of the attack loss term:
        `dL/dx = (dL/dP)*(dP/dx)` with `L = loss_attack_s; P = predict; x = adv_s`.

        Parameters
        ----------
        X
            Instance around which gradient is evaluated.
        Y
            One-hot representation of instance labels.
        grads_shape
            Shape of gradients.
        cat_vars_ord
            Dict with as keys the categorical columns and as values the number of categories per
            categorical variable.

        Returns
        -------
        Array with gradients.
        """
        # map back to categories to make predictions
        if self.is_cat:
            X_pred = num_to_ord(X, self.d_abs)
            if self.ohe:
                X_pred = ord_to_ohe(X_pred, cat_vars_ord)[0]
        else:
            X_pred = X

        # N = gradient batch size; F = nb of features; P = nb of prediction classes; B = instance batch size
        # dL/dP -> BxP
        preds = self.predict(X_pred)  # NxP
        preds_pert_pos, preds_pert_neg = perturb(preds, self.eps[0], proba=True)  # (N*P)xP

        def f(preds_pert):
            return np.sum(Y * preds_pert, axis=1)

        def g(preds_pert):
            return np.max((1 - Y) * preds_pert, axis=1)

        # find instances where the gradient is 0
        idx_nograd = np.where(f(preds) - g(preds) <= - self.kappa)[0]
        if len(idx_nograd) == X.shape[0]:
            return np.zeros(X.shape)

        dl_df = f(preds_pert_pos) - f(preds_pert_neg)  # N*P
        dl_dg = g(preds_pert_pos) - g(preds_pert_neg)  # N*P
        dl_dp = dl_df - dl_dg  # N*P
        dl_dp = np.reshape(dl_dp, (X.shape[0], -1)) / (2 * self.eps[0])  # NxP

        # dP/dx -> PxF
        X_pert_pos, X_pert_neg = perturb(X, self.eps[1], proba=False)  # (N*F)x(shape of X[0])
        X_pert = np.concatenate([X_pert_pos, X_pert_neg], axis=0)
        if self.is_cat:
            X_pert = num_to_ord(X_pert, self.d_abs)
            if self.ohe:
                X_pert = ord_to_ohe(X_pert, cat_vars_ord)[0]
        preds_concat = self.predict(X_pert)
        n_pert = X_pert_pos.shape[0]
        dp_dx = preds_concat[:n_pert] - preds_concat[n_pert:]  # (N*F)*P
        dp_dx = np.reshape(np.reshape(dp_dx, (X.shape[0], -1)),
                           (X.shape[0], preds.shape[1], -1), order='F') / (2 * self.eps[1])  # NxPxF

        # dL/dx -> Bx(shape of X[0])
        grads = np.einsum('ij,ijk->ik', dl_dp, dp_dx)  # NxF
        # set instances where gradient is 0 to 0
        if len(idx_nograd) > 0:
            grads[idx_nograd] = np.zeros(grads.shape[1:])
        grads = np.mean(grads, axis=0)  # B*F
        grads = np.reshape(grads, (self.batch_size,) + grads_shape)  # B*(shape of X[0])

        return grads
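
    # The chain rule `dL/dx = (dL/dp) * (dp/dx)` above is evaluated with central differences.
    # A standalone sketch for a single instance and a generic black-box `predict_fn` (a
    # hypothetical callable returning class probabilities; scalar `eps` for brevity):
    #
    #     def num_grad_preds(predict_fn, x, eps=1e-3):
    #         """dp/dx for one instance x of shape (F,); returns a (P, F) array."""
    #         p0 = predict_fn(x[None])[0]
    #         grad = np.zeros((p0.shape[0], x.shape[0]))
    #         for j in range(x.shape[0]):
    #             x_pos, x_neg = x.copy(), x.copy()
    #             x_pos[j] += eps
    #             x_neg[j] -= eps
    #             grad[:, j] = (predict_fn(x_pos[None])[0] - predict_fn(x_neg[None])[0]) / (2 * eps)
    #         return grad
    #
    # `get_gradients` vectorizes this idea over perturbation batches via `perturb` and `np.einsum`.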
    def score(self, X: np.ndarray, adv_class: int, orig_class: int, eps: float = 1e-10) -> float:
        """
        Compute the ratio of the distances between the (encoded) perturbed instance and the
        prototypes of the original and perturbed predicted classes.

        Parameters
        ----------
        X
            Instance to encode and calculate distance metrics for.
        adv_class
            Predicted class on the perturbed instance.
        orig_class
            Predicted class on the original instance.
        eps
            Small number to avoid dividing by 0.

        Returns
        -------
        Ratio between the distance to the prototype of the predicted class for the original instance
        and the prototype of the predicted class for the perturbed instance.
        """
        if self.enc_model:
            if self.is_cat:
                X = num_to_ord(X, self.d_abs)
            if self.ohe:
                X, _ = ord_to_ohe(X, self.cat_vars_ord)  # TODO: (Arnaud) is this a genuine bug?
            X_enc = self.enc.predict(X)  # type: ignore[union-attr]
            adv_proto = self.class_proto[adv_class]
            orig_proto = self.class_proto[orig_class]
            dist_adv = np.linalg.norm(X_enc - adv_proto)
            dist_orig = np.linalg.norm(X_enc - orig_proto)
        elif self.use_kdtree:
            dist_adv = self.kdtrees[adv_class].query(X, k=1)[0]
            dist_orig = self.kdtrees[orig_class].query(X, k=1)[0]
        else:
            logger.warning('Need either an encoder or the k-d trees enabled to compute distance scores.')
        return dist_orig / (dist_adv + eps)  # type: ignore[return-value]
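
    # Interpretation sketch (toy numbers): if the perturbed instance lies at distance 0.4 from the
    # prototype of its new class and 1.2 from the prototype of the original class, the score is
    # 1.2 / 0.4 = 3.0. In `attack`/`explain`, a candidate counterfactual is only accepted when this
    # score exceeds the `threshold` argument.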
    def attack(self, X: np.ndarray, Y: np.ndarray, target_class: Optional[list] = None,
               k: Optional[int] = None, k_type: str = 'mean', threshold: float = 0.,
               verbose: bool = False, print_every: int = 100, log_every: int = 100) \
            -> Tuple[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
        """
        Find a counterfactual (CF) for instance `X` using a fast iterative shrinkage-thresholding
        algorithm (FISTA).

        Parameters
        ----------
        X
            Instance to attack.
        Y
            Labels for `X` as one-hot-encoding.
        target_class
            List with target classes used to find closest prototype. If ``None``, the nearest prototype
            except for the predicted class of the instance is used.
        k
            Number of nearest instances used to define the prototype for a class. Defaults to using all
            instances belonging to the class if an encoder is used and to 1 for k-d trees.
        k_type
            Use either the average encoding of the k nearest instances in a class (``k_type='mean'``) or
            the k-nearest encoding in the class (``k_type='point'``) to define the prototype of that class.
            Only relevant if an encoder is used to define the prototypes.
        threshold
            Threshold level for the ratio between the distance of the counterfactual to the prototype of the
            predicted class for the original instance over the distance to the prototype of the predicted
            class for the counterfactual. If the trust score is below the threshold, the proposed
            counterfactual does not meet the requirements.
        verbose
            Print intermediate results of optimization if ``True``.
        print_every
            Print frequency if verbose is ``True``.
        log_every
            `tensorboard` log frequency if write directory is specified.

        Returns
        -------
        Overall best attack and gradients for that attack.
        """
        # make sure nb of instances in X equals batch size
        assert self.batch_size == X.shape[0]

        def compare(x: Union[float, int, np.ndarray], y: int) -> bool:
            """
            Compare predictions with target labels and return whether counterfactual conditions hold.

            Parameters
            ----------
            x
                Predicted class probabilities or labels.
            y
                Target or predicted labels.

            Returns
            -------
            Bool whether counterfactual conditions hold.
            """
            if not isinstance(x, (float, int, np.int64)):
                x = np.copy(x)
                x[y] += self.kappa
                x = np.argmax(x)  # type: ignore[assignment]
            return x != y

        # define target classes for prototype if not specified yet
        if target_class is None:
            target_class = list(range(self.classes))
            target_class.remove(np.argmax(Y, axis=1))
            if verbose:
                print('Predicted class: {}'.format(np.argmax(Y, axis=1)))
                print('Target classes: {}'.format(target_class))

        if self.is_cat and self.ohe:  # map categorical to numerical data
            X_ord = ohe_to_ord(X, self.cat_vars)[0]
            X_num = ord_to_num(X_ord, self.d_abs)
        elif self.is_cat:
            X_num = ord_to_num(X, self.d_abs)
        else:
            X_num = X

        # find closest prototype in the target class list
        dist_proto = {}
        if self.enc_model:
            X_enc = self.enc.predict(X)  # type: ignore[union-attr]
            class_dict = self.class_proto if k is None else self.class_enc

            for c, v in class_dict.items():
                if c not in target_class:
                    continue
                if k is None:
                    dist_proto[c] = np.linalg.norm(X_enc - v)
                elif k is not None:
                    dist_k = np.linalg.norm(X_enc.reshape(X_enc.shape[0], -1) -
                                            v.reshape(v.shape[0], -1), axis=1)
                    idx = np.argsort(dist_k)[:k]
                    if k_type == 'mean':
                        dist_proto[c] = np.mean(dist_k[idx])
                    else:
                        dist_proto[c] = dist_k[idx[-1]]
                    self.class_proto[c] = np.expand_dims(np.mean(v[idx], axis=0), axis=0)
        elif self.use_kdtree:
            if k is None:
                k = 1
            self.class_proto = {}
            for c in range(self.classes):
                if c not in target_class:
                    continue
                dist_c, idx_c = self.kdtrees[c].query(X_num, k=k)
                dist_proto[c] = dist_c[0][-1]
                self.class_proto[c] = self.X_by_class[c][idx_c[0][-1]].reshape(1, -1)

        if self.enc_or_kdtree:
            self.id_proto = min(dist_proto, key=dist_proto.get)  # type: ignore[arg-type]
            proto_val = self.class_proto[self.id_proto]
            if verbose:
                print('Prototype class: {}'.format(self.id_proto))
        else:  # no prototype loss term used
            proto_val = np.zeros(self.shape_enc)

        # set shape for perturbed instance and gradients
        pert_shape = ohe_to_ord_shape(self.shape, cat_vars=self.cat_vars, is_ohe=self.ohe)

        # set the lower and upper bounds for the constant 'c' to scale the attack loss term
        # these bounds are updated for each c_step iteration
        const_lb = np.zeros(self.batch_size)
        const = np.ones(self.batch_size) * self.c_init
        const_ub = np.ones(self.batch_size) * 1e10

        # init values for the best attack instances for each instance in the batch
        overall_best_dist = [1e10] * self.batch_size
        overall_best_attack = [np.zeros(self.shape[1:])] * self.batch_size
        overall_best_grad = (np.zeros(self.shape), np.zeros(self.shape))

        # keep track of counterfactual evolution
        self.cf_global: dict = {i: [] for i in range(self.c_steps)}

        # iterate over nb of updates for 'c'
        for _ in range(self.c_steps):

            # init variables
            self.sess.run(self.init)

            # reset current best distances and scores
            current_best_dist = [1e10] * self.batch_size
            current_best_proba = [-1] * self.batch_size

            # assign variables for the current iteration
            feed_dict = {self.assign_orig: X_num,
                         self.assign_target: Y,
                         self.assign_const: const,
                         self.assign_adv: X_num,
                         self.assign_adv_s: X_num,
                         self.assign_target_proto: proto_val}
            if self.is_cat:
                feed_dict[self.assign_map] = self.d_abs_ragged
            self.sess.run(self.setup, feed_dict=feed_dict)

            X_der_batch: Any = []
            X_der_batch_s: Any = []

            for i in range(self.max_iterations):

                # numerical gradients
                grads_num = np.zeros(pert_shape)
                grads_num_s = np.zeros(pert_shape)

                # check if numerical gradient computation is needed
                if not self.model and (self.c_init != 0. or self.c_steps > 1):
                    X_der = self.adv.eval(session=self.sess)
                    X_der_s = self.adv_s.eval(session=self.sess)
                    X_der_batch.append(X_der)
                    X_der_batch_s.append(X_der_s)

                    if i % self.update_num_grad == 0 and i > 0:  # compute numerical gradients
                        c = self.const.eval(session=self.sess)
                        X_der_batch = np.concatenate(X_der_batch)
                        X_der_batch_s = np.concatenate(X_der_batch_s)
                        grads_num = self.get_gradients(X_der_batch, Y, cat_vars_ord=self.cat_vars_ord,
                                                       grads_shape=pert_shape[1:]) * c
                        grads_num_s = self.get_gradients(X_der_batch_s, Y, cat_vars_ord=self.cat_vars_ord,
                                                         grads_shape=pert_shape[1:]) * c
                        # clip gradients
                        grads_num = np.clip(grads_num, self.clip[0], self.clip[1])
                        grads_num_s = np.clip(grads_num_s, self.clip[0], self.clip[1])
                        X_der_batch, X_der_batch_s = [], []

                # compute and clip gradients defined in graph
                grads_vars_graph = self.sess.run(self.compute_grads)
                grads_graph = [g for g, _ in grads_vars_graph][0]
                grads_graph = np.clip(grads_graph, self.clip[0], self.clip[1])

                # apply gradients
                grads = grads_graph + grads_num_s
                self.sess.run(self.apply_grads, feed_dict={self.grad_ph: grads})

                # update adv and adv_s with perturbed instances
                self.sess.run([self.adv_updater, self.adv_updater_s, self.delta, self.delta_s])

                # compute overall and attack loss, L1+L2 loss, prediction probabilities
                # on perturbed instances and new adv
                # L1+L2 and prediction probabilities used to see if adv is better than the
                # current best adv under FISTA
                if self.model:
                    loss_tot, loss_attack, loss_l1_l2, pred_proba, adv = \
                        self.sess.run([self.loss_total, self.loss_attack, self.l1_l2,
                                       self.pred_proba, self.adv])
                else:
                    X_der = self.adv.eval(session=self.sess)  # get updated perturbed instances
                    if self.is_cat:  # map back to categories to make predictions
                        X_der = num_to_ord(X_der, self.d_abs)
                        if self.ohe:
                            X_der = ord_to_ohe(X_der, self.cat_vars_ord)[0]
                    pred_proba = self.predict(X_der)

                    # compute attack, total and L1+L2 losses as well as new perturbed instance
                    loss_attack = self.loss_fn(pred_proba, Y)
                    feed_dict = {self.loss_attack: loss_attack}
                    loss_tot, loss_l1_l2, adv = self.sess.run([self.loss_total, self.l1_l2, self.adv],
                                                              feed_dict=feed_dict)

                if i % log_every == 0 or i % print_every == 0:
                    loss_l2, loss_l1, loss_ae, loss_proto = \
                        self.sess.run([self.loss_l2, self.loss_l1, self.loss_ae, self.loss_proto])
                    target_proba = np.sum(pred_proba * Y)
                    nontarget_proba_max = np.max((1 - Y) * pred_proba)
                    loss_opt = loss_l1_l2 + loss_attack + loss_ae + loss_proto

                if i % log_every == 0 and self.writer is not None:
                    lr, zt, gs = self.sess.run([self.learning_rate, self.zt, self.global_step])

                    # add values and images to tensorboard
                    summary = tf.Summary()
                    summary.value.add(tag='loss/Optimized', simple_value=loss_opt)
                    summary.value.add(tag='loss/Total', simple_value=loss_tot)
                    summary.value.add(tag='loss/L1', simple_value=loss_l1)
                    summary.value.add(tag='loss/L2', simple_value=loss_l2)
                    summary.value.add(tag='loss/AutoEncoder', simple_value=loss_ae)
                    summary.value.add(tag='loss/ClassPrototype', simple_value=loss_proto)
                    summary.value.add(tag='loss/PredScale', simple_value=const[0])
                    summary.value.add(tag='loss/PredLoss', simple_value=loss_attack)
                    summary.value.add(tag='training/lr', simple_value=lr)
                    summary.value.add(tag='training/z', simple_value=zt)
                    summary.value.add(tag='training/GlobalStep', simple_value=gs)
                    self.writer.add_summary(summary)
                    self.writer.flush()

                if verbose and i % print_every == 0:
                    print('\nIteration: {}; Const: {}'.format(i, const[0]))
                    print('Loss total: {:.3f}, loss attack: {:.3f}'.format(loss_tot, loss_attack))
                    print('L2: {:.3f}, L1: {:.3f}, loss AE: {:.3f}'.format(loss_l2, loss_l1, loss_ae))
                    print('Loss proto: {:.3f}'.format(loss_proto))
                    print('Target proba: {:.2f}, max non target proba: {:.2f}'.format(
                        target_proba, nontarget_proba_max))
                    print('Gradient graph min/max: {:.3f}/{:.3f}'.format(grads_graph.min(),
                                                                         grads_graph.max()))
                    print('Gradient graph mean/abs mean: {:.3f}/{:.3f}'
                          .format(np.mean(grads_graph), np.mean(np.abs(grads_graph))))
                    if not self.model:
                        print('Gradient numerical attack min/max: {:.3f}/{:.3f}'
                              .format(grads_num.min(), grads_num.max()))
                        print('Gradient numerical mean/abs mean: {:.3f}/{:.3f}'  # type: ignore[str-format]
                              .format(np.mean(grads_num), np.mean(np.abs(grads_num))))
                    sys.stdout.flush()

                # update best perturbation (distance) and class probabilities
                # if beta * L1 + L2 < current best and predicted label is different from the initial label:
                # update best current step or global perturbations
                for batch_idx, (dist, proba, adv_idx) in enumerate(zip(loss_l1_l2, pred_proba, adv)):
                    Y_class = np.argmax(Y[batch_idx])
                    adv_class = np.argmax(proba)
                    adv_idx = np.expand_dims(adv_idx, axis=0)

                    if self.is_cat:  # map back to categories
                        adv_idx = num_to_ord(adv_idx, self.d_abs)

                    if self.ohe:  # map back from ordinal to OHE
                        adv_idx = ord_to_ohe(adv_idx, self.cat_vars_ord)[0]

                    # calculate trust score
                    if threshold > 0.:
                        score = self.score(adv_idx, np.argmax(pred_proba), Y_class)  # type: ignore
                        above_threshold = score > threshold
                    else:
                        above_threshold = True

                    # current step
                    if (dist < current_best_dist[batch_idx] and compare(proba, Y_class)  # type: ignore
                            and above_threshold and adv_class in target_class):
                        current_best_dist[batch_idx] = dist
                        current_best_proba[batch_idx] = adv_class  # type: ignore

                    # global
                    if (dist < overall_best_dist[batch_idx] and compare(proba, Y_class)  # type: ignore
                            and above_threshold and adv_class in target_class):
                        if verbose:
                            print('\nNew best counterfactual found!')
                        overall_best_dist[batch_idx] = dist
                        overall_best_attack[batch_idx] = adv_idx
                        overall_best_grad = (grads_graph, grads_num)
                        self.best_attack = True
                        self.cf_global[_].append(adv_idx)

            # adjust the 'c' constant for the first loss term
            for batch_idx in range(self.batch_size):
                if (compare(current_best_proba[batch_idx], np.argmax(Y[batch_idx])) and  # type: ignore
                        current_best_proba[batch_idx] != -1):
                    # want to refine the current best solution by putting more emphasis on the
                    # regularization terms of the loss by reducing 'c'; aiming to find a perturbation
                    # closer to the original instance
                    const_ub[batch_idx] = min(const_ub[batch_idx], const[batch_idx])
                    if const_ub[batch_idx] < 1e9:
                        const[batch_idx] = (const_lb[batch_idx] + const_ub[batch_idx]) / 2
                else:
                    # no valid current solution; put more weight on the first loss term to try and meet
                    # the prediction constraint before finetuning the solution with the regularization terms
                    const_lb[batch_idx] = max(const_lb[batch_idx], const[batch_idx])  # update lower bound
                    if const_ub[batch_idx] < 1e9:
                        const[batch_idx] = (const_lb[batch_idx] + const_ub[batch_idx]) / 2
                    else:
                        const[batch_idx] *= 10

        # return best overall attack
        best_attack = np.concatenate(overall_best_attack, axis=0)
        if best_attack.shape != self.shape:
            best_attack = np.expand_dims(best_attack, axis=0)

        return best_attack, overall_best_grad
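
    # The `const` update at the end of `attack` is a bisection search on the attack-loss scale `c`,
    # as in Carlini-Wagner style attacks. A standalone sketch of one update for a single instance
    # (`success` is a hypothetical flag meaning "a valid counterfactual was found at this c"):
    #
    #     def update_const(c, c_lb, c_ub, success):
    #         if success:  # valid CF exists: shrink c to favour the distance/regularization terms
    #             c_ub = min(c_ub, c)
    #             if c_ub < 1e9:
    #                 c = (c_lb + c_ub) / 2
    #         else:        # no CF yet: grow c to put more weight on the prediction constraint
    #             c_lb = max(c_lb, c)
    #             c = (c_lb + c_ub) / 2 if c_ub < 1e9 else c * 10
    #         return c, c_lb, c_ub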
    def explain(self, X: np.ndarray, Y: Optional[np.ndarray] = None, target_class: Optional[list] = None,
                k: Optional[int] = None, k_type: str = 'mean', threshold: float = 0.,
                verbose: bool = False, print_every: int = 100, log_every: int = 100) -> Explanation:
        """
        Explain instance and return counterfactual with metadata.

        Parameters
        ----------
        X
            Instances to attack.
        Y
            Labels for `X` as one-hot-encoding.
        target_class
            List with target classes used to find closest prototype. If ``None``, the nearest prototype
            except for the predicted class of the instance is used.
        k
            Number of nearest instances used to define the prototype for a class. Defaults to using all
            instances belonging to the class if an encoder is used and to 1 for k-d trees.
        k_type
            Use either the average encoding of the `k` nearest instances in a class (``k_type='mean'``) or
            the k-nearest encoding in the class (``k_type='point'``) to define the prototype of that class.
            Only relevant if an encoder is used to define the prototypes.
        threshold
            Threshold level for the ratio between the distance of the counterfactual to the prototype of the
            predicted class for the original instance over the distance to the prototype of the predicted
            class for the counterfactual. If the trust score is below the threshold, the proposed
            counterfactual does not meet the requirements.
        verbose
            Print intermediate results of optimization if ``True``.
        print_every
            Print frequency if verbose is ``True``.
        log_every
            `tensorboard` log frequency if write directory is specified.

        Returns
        -------
        explanation
            `Explanation` object containing the counterfactual with additional metadata as attributes.
            See usage at `CFProto examples`_ for details.

            .. _CFProto examples:
                https://docs.seldon.io/projects/alibi/en/stable/methods/CFProto.html
        """
        # get params for storage in meta
        params = locals()
        remove = ['self', 'X', 'Y']
        for key in remove:
            params.pop(key)

        if X.shape[0] != 1:
            logger.warning('Currently only single instance explanations supported (first dim = 1), '
                           'but first dim = %s', X.shape[0])

        # output explanation dictionary
        data = copy.deepcopy(DEFAULT_DATA_CFP)

        if Y is None:
            if self.model:
                Y_proba = self.predict.predict(X)  # type: ignore
            else:
                Y_proba = self.predict(X)
            Y_ohe = np.zeros(Y_proba.shape)
            Y_class = np.argmax(Y_proba, axis=1)
            Y_ohe[np.arange(Y_proba.shape[0]), Y_class] = 1
            Y = Y_ohe.copy()
            data['orig_proba'] = Y_proba
        else:  # provided one-hot-encoding of prediction on X
            data['orig_proba'] = None
        data['orig_class'] = np.argmax(Y, axis=1)[0]

        # find best counterfactual
        self.best_attack = False
        best_attack, grads = self.attack(X, Y=Y, target_class=target_class, k=k, k_type=k_type,
                                         verbose=verbose, threshold=threshold,
                                         print_every=print_every, log_every=log_every)

        if self.enc_or_kdtree:
            data['id_proto'] = self.id_proto

        # add to explanation dict
        if not self.best_attack:
            logger.warning('No counterfactual found!')

            # create explanation object
            explanation = Explanation(meta=copy.deepcopy(self.meta), data=data)
            return explanation

        data['all'] = self.cf_global
        data['cf'] = {}
        data['cf']['X'] = best_attack
        if self.model:
            Y_pert = self.predict.predict(best_attack)  # type: ignore
        else:
            Y_pert = self.predict(best_attack)
        data['cf']['class'] = np.argmax(Y_pert, axis=1)[0]
        data['cf']['proba'] = Y_pert
        data['cf']['grads_graph'], data['cf']['grads_num'] = grads[0], grads[1]

        # create explanation object
        explanation = Explanation(meta=copy.deepcopy(self.meta), data=data)

        return explanation
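
    # End-to-end usage sketch (assumes a trained Keras classifier `model` and arrays `x_train`,
    # `x_test`; parameter values are illustrative, not defaults of this module):
    #
    #     shape = (1,) + x_train.shape[1:]
    #     cf = CounterfactualProto(model, shape, use_kdtree=True, theta=10., max_iterations=500)
    #     cf.fit(x_train)
    #     explanation = cf.explain(x_test[0:1])
    #     print(explanation.cf['class'], explanation.cf['proba'])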
    def reset_predictor(self, predictor: Union[Callable, tf.keras.Model]) -> None:
        """
        Resets the predictor function/model.

        Parameters
        ----------
        predictor
            New predictor function/model.
        """
        raise NotImplementedError('Resetting a predictor is currently not supported')