Source code for alibi.utils.distributed

import copy
import logging
from functools import partial
from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Union

import numpy as np
from scipy import sparse
import ray


logger = logging.getLogger(__name__)


[docs] class ActorPool(object):
    # TODO: JANIS: IF YOU DECIDE TO TAKE A DEPENDENCY ON RAY CORE, THEN THIS CLASS SHOULD INHERIT FROM
    #  RAY.UTIL.ACTORPOOL, OVERRIDE MAP AND MAP_UNORDERED AND ADD _CHUNK STATIC METHOD.

[docs]    def __init__(self, actors):
        """
        Taken from the `ray` repository: https://github.com/ray-project/ray/pull/5945 .
        Create an actor pool from a list of existing actors.
        An actor pool is a utility class similar to `multiprocessing.Pool` that lets you schedule `ray` tasks over
        a fixed pool of actors.

        Parameters
        ----------
        actors
            List of `ray` actor handles to use in this pool.

        Examples
        --------
        >>> a1, a2 = Actor.remote(), Actor.remote()
        >>> pool = ActorPool([a1, a2])
        >>> print(pool.map(lambda a, v: a.double.remote(v), [1, 2, 3, 4]))
        [2, 4, 6, 8]
        """
        self._idle_actors = list(actors)
        self._future_to_actor = {}
        self._index_to_future = {}
        self._next_task_index = 0
        self._next_return_index = 0
        self._pending_submits = []

[docs]    def map(self, fn, values, chunksize=1):
        """
        Apply the given function in parallel over the `actors` and `values`.
        This returns an ordered iterator that will return results of the map as they finish.
        Note that you must iterate over the iterator to force the computation to finish.

        Parameters
        ----------
        fn : Callable
            Function that takes `(actor, value)` as argument and returns an `ObjectID` computing the result over
            the `value`. The `actor` will be considered busy until the `ObjectID` completes.
        values : list
            List of values that `fn(actor, value)` should be applied to.
        chunksize : int
            Splits the list of values to be submitted to the parallel process into sublists of size `chunksize`
            or less.

        Returns
        -------
        Iterator over results from applying `fn` to the `actors` and `values`.

        Examples
        --------
        >>> pool = ActorPool(...)
        >>> print(pool.map(lambda a, v: a.double.remote(v), [1, 2, 3, 4]))
        [2, 4, 6, 8]
        """
        values = self._chunk(values, chunksize=chunksize)

        for v in values:
            self.submit(fn, v)
        while self.has_next():
            yield self.get_next()

[docs]    def map_unordered(self, fn, values, chunksize=1):
        """
        Similar to :py:meth:`alibi.utils.distributed.ActorPool.map`, but returning an unordered iterator.
        This returns an unordered iterator that will return results of the map as they finish. This can be more
        efficient than :py:meth:`alibi.utils.distributed.ActorPool.map` if some results take longer to compute
        than others.

        Parameters
        ----------
        fn : Callable
            Function that takes `(actor, value)` as argument and returns an `ObjectID` computing the result over
            the `value`. The `actor` will be considered busy until the `ObjectID` completes.
        values : list
            List of values that `fn(actor, value)` should be applied to.
        chunksize : int
            Splits the list of values to be submitted to the parallel process into sublists of size `chunksize`
            or less.

        Returns
        -------
        Iterator over results from applying `fn` to the `actors` and `values`.

        Examples
        --------
        >>> pool = ActorPool(...)
        >>> print(pool.map(lambda a, v: a.double.remote(v), [1, 2, 3, 4]))
        [6, 2, 4, 8]
        """
        values = self._chunk(values, chunksize=chunksize)

        for v in values:
            self.submit(fn, v)
        while self.has_next():
            yield self.get_next_unordered()

[docs]    def submit(self, fn: Callable, value: object):
        """
        Schedule a single task to run in the pool.
        This has the same argument semantics as :py:meth:`alibi.utils.distributed.ActorPool.map`, but takes in a
        single value instead of a list of values. The result can be retrieved using
        :py:meth:`alibi.utils.distributed.ActorPool.get_next` /
        :py:meth:`alibi.utils.distributed.ActorPool.get_next_unordered`.

        Parameters
        ----------
        fn
            Function that takes `(actor, value)` as argument and returns an `ObjectID` computing the result over
            the `value`. The `actor` will be considered busy until the `ObjectID` completes.
        value
            Value to compute a result for.

        Examples
        --------
        >>> pool = ActorPool(...)
        >>> pool.submit(lambda a, v: a.double.remote(v), 1)
        >>> pool.submit(lambda a, v: a.double.remote(v), 2)
        >>> print(pool.get_next(), pool.get_next())
        2, 4
        """
        if self._idle_actors:
            actor = self._idle_actors.pop()
            future = fn(actor, value)
            self._future_to_actor[future] = (self._next_task_index, actor)
            self._index_to_future[self._next_task_index] = future
            self._next_task_index += 1
        else:
            self._pending_submits.append((fn, value))

[docs]    def has_next(self):
        """
        Returns whether there are any pending results to return.

        Returns
        -------
        ``True`` if there are any pending results not yet returned.

        Examples
        --------
        >>> pool = ActorPool(...)
        >>> pool.submit(lambda a, v: a.double.remote(v), 1)
        >>> print(pool.has_next())
        True
        >>> print(pool.get_next())
        2
        >>> print(pool.has_next())
        False
        """
        return bool(self._future_to_actor)

[docs]    def get_next(self, timeout=None):
        """
        Returns the next pending result in order.
        This returns the next result produced by :py:meth:`alibi.utils.distributed.ActorPool.submit`, blocking
        for up to the specified timeout until it is available.

        Returns
        -------
        The next result.

        Raises
        ------
        TimeoutError
            If the timeout is reached.

        Examples
        --------
        >>> pool = ActorPool(...)
        >>> pool.submit(lambda a, v: a.double.remote(v), 1)
        >>> print(pool.get_next())
        2
        """
        if not self.has_next():
            raise StopIteration("No more results to get")
        if self._next_return_index >= self._next_task_index:
            raise ValueError("It is not allowed to call get_next() after "
                             "get_next_unordered().")
        future = self._index_to_future[self._next_return_index]
        if timeout is not None:
            res, _ = ray.wait([future], timeout=timeout)
            if not res:
                raise TimeoutError("Timed out waiting for result")
        del self._index_to_future[self._next_return_index]
        self._next_return_index += 1
        i, a = self._future_to_actor.pop(future)
        self._return_actor(a)
        return ray.get(future)

[docs]    def get_next_unordered(self, timeout=None):
        """
        Returns any of the next pending results.
        This returns some result produced by :py:meth:`alibi.utils.distributed.ActorPool.submit`, blocking for up
        to the specified timeout until it is available. Unlike
        :py:meth:`alibi.utils.distributed.ActorPool.get_next`, the results are not always returned in the same
        order as submitted, which can improve performance.

        Returns
        -------
        The next result.

        Raises
        ------
        TimeoutError
            If the timeout is reached.

        Examples
        --------
        >>> pool = ActorPool(...)
        >>> pool.submit(lambda a, v: a.double.remote(v), 1)
        >>> pool.submit(lambda a, v: a.double.remote(v), 2)
        >>> print(pool.get_next_unordered())
        4
        >>> print(pool.get_next_unordered())
        2
        """
        if not self.has_next():
            raise StopIteration("No more results to get")
        res, _ = ray.wait(
            list(self._future_to_actor), num_returns=1, timeout=timeout)
        if res:
            [future] = res
        else:
            raise TimeoutError("Timed out waiting for result")
        i, a = self._future_to_actor.pop(future)
        self._return_actor(a)
        del self._index_to_future[i]
        self._next_return_index = max(self._next_return_index, i + 1)
        return ray.get(future)

    def _return_actor(self, actor):
        self._idle_actors.append(actor)
        if self._pending_submits:
            self.submit(*self._pending_submits.pop(0))

    @staticmethod
    def _chunk(values: list, chunksize: int) -> Generator[List, None, None]:
        """Yield successive chunks of len=chunksize from values."""
        for i in range(0, len(values), chunksize):
            yield values[i:i + chunksize]

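# Illustrative sketch, not part of the original module: with ``chunksize > 1`` each call to `fn` in `map` /
# `map_unordered` receives a sublist of values rather than a single value, so the remote method must accept a
# batch. `_demo_chunking` is a hypothetical helper that only exercises the private `_chunk` static method.
def _demo_chunking():
    # splits five values into chunks of at most two elements
    return list(ActorPool._chunk([1, 2, 3, 4, 5], chunksize=2))  # [[1, 2], [3, 4], [5]]
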
[docs] def batch(X: np.ndarray, batch_size: Optional[int] = None, n_batches: int = 4) -> List[np.ndarray]:
    """
    Splits the input into sub-arrays.

    Parameters
    ----------
    X
        Array to be split.
    batch_size
        The size of each batch. In particular:

            - if `batch_size` is not ``None``, batches of this size are created. The sizes of the batches created \
            might vary if the 0-th dimension of `X` is not divisible by `batch_size`. For an array of length `l` \
            that should be split into `n` sections, it returns `l % n` sub-arrays of size `l//n + 1` and the rest \
            of size `l//n`

            - if `batch_size` is ``None``, then `X` is split into `n_batches` sub-arrays.

    n_batches
        Number of batches in which to split `X`. Only used if ``batch_size = None``.

    Returns
    -------
    A list of sub-arrays of `X`.
    """  # noqa W605

    n_records = X.shape[0]
    if isinstance(X, sparse.spmatrix):
        logger.warning("Batching function received sparse matrix input. Converting to dense matrix first...")
        X = X.toarray()

    if batch_size:
        n_batches = n_records // batch_size
        if n_records % batch_size != 0:
            n_batches += 1
        slices = [batch_size * i for i in range(1, n_batches)]
        batches = np.array_split(X, slices)
    else:
        batches = np.array_split(X, n_batches)
    return batches

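# Illustrative sketch, not part of the original module: how `batch` splits a 10-row array, both by batch size
# and by number of batches. `_demo_batch_splitting` is a hypothetical helper used only for demonstration.
def _demo_batch_splitting():
    X = np.arange(10).reshape(10, 1)
    by_size = batch(X, batch_size=4)    # 3 sub-arrays with 4, 4 and 2 rows
    by_count = batch(X, n_batches=3)    # 3 sub-arrays with 4, 3 and 3 rows
    return [b.shape[0] for b in by_size], [b.shape[0] for b in by_count]
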
[docs] def default_target_fcn(actor: Any, instances: tuple, kwargs: Optional[Dict] = None):
    """
    A target function that is executed in parallel given an actor pool. Its arguments must be an actor and a batch
    of values to be processed by the actor. Its role is to execute distributed computations when an actor is
    available.

    Parameters
    ----------
    actor
        A `ray` actor. This is typically a class decorated with the `@ray.remote` decorator that has been
        subsequently instantiated using ``cls.remote(*args, **kwargs)``.
    instances
        A `(batch_index, batch)` tuple containing the batch of instances to be explained along with a batch index.
    kwargs
        A list of keyword arguments for the actor `get_explanation` method.

    Returns
    -------
    A future that can be used to later retrieve the results of a distributed computation.

    Notes
    -----
    This function can be customized (e.g., if one does not desire to wrap the explainer such that it has a
    `get_explanation` method). The customized function should be called `*_target_fcn`, with the wildcard replaced
    by the name of the explanation method (e.g., `cem`, `cfproto`, etc.). The same name should be added to the
    `distributed_opts` dictionary passed by the user prior to instantiating the `DistributedExplainer`.
    """
    if kwargs is None:
        kwargs = {}
    return actor.get_explanation.remote(instances, **kwargs)

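# Illustrative sketch, not part of the original module: a custom target function of the kind described in the
# notes above. Both the name `cem_target_fcn` and the actor's `explain` method are hypothetical; such a function
# would be selected only if ``'algorithm': 'cem'`` were passed in `distributed_opts`.
def cem_target_fcn(actor: Any, instances: tuple, kwargs: Optional[Dict] = None):
    if kwargs is None:
        kwargs = {}
    # assumes the remote actor exposes an `explain` method instead of `get_explanation`
    return actor.explain.remote(instances, **kwargs)
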
[docs] def concatenate_minibatches(minibatch_results: Union[List[np.ndarray], List[List[np.ndarray]]]) -> \
        Union[np.ndarray, List[np.ndarray]]:
    """
    Merges the explanations computed on minibatches so that the distributed explainer returns the same output as
    the sequential version. If the type returned by the explainer is not supported by the function, expand this
    function by adding an appropriately named private function and use this function to check the input type and
    call it.

    Parameters
    ----------
    minibatch_results
        Explanations for each minibatch.

    Returns
    -------
    If the input is ``List[np.ndarray]``, a single `numpy` array obtained by concatenating the minibatch results \
    along the 0th axis.
    If the input is ``List[List[np.ndarray]]``, a list of `numpy` arrays obtained by concatenating arrays with \
    the same position in the sublists along the 0th axis.
    """  # noqa W605

    if isinstance(minibatch_results[0], np.ndarray):
        return np.concatenate(minibatch_results, axis=0)
    elif isinstance(minibatch_results[0], list) and isinstance(minibatch_results[0][0], np.ndarray):
        return _array_list_concatenator(minibatch_results)  # type: ignore
    else:
        raise TypeError(
            "Minibatch concatenation function is defined only for List[np.ndarray] and List[List[np.ndarray]]"
        )

def _array_list_concatenator(minibatch_results: List[List[np.ndarray]]) -> List[np.ndarray]:
    """ Concatenates the arrays with the same sublist index into a single array. """

    n_classes = len(minibatch_results[0])
    to_concatenate = [list(zip(*minibatch_results))[idx] for idx in range(n_classes)]
    concatenated = [np.concatenate(arrays, axis=0) for arrays in to_concatenate]
    return concatenated

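# Illustrative sketch, not part of the original module: merging two minibatch results where each minibatch
# returns a list of per-class arrays. `_demo_concatenate_minibatches` is a hypothetical helper.
def _demo_concatenate_minibatches():
    minibatch_1 = [np.zeros((2, 3)), np.zeros((2, 3))]  # 2 classes, 2 instances explained
    minibatch_2 = [np.ones((4, 3)), np.ones((4, 3))]    # same 2 classes, 4 instances explained
    merged = concatenate_minibatches([minibatch_1, minibatch_2])
    return [arr.shape for arr in merged]                # [(6, 3), (6, 3)]
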
[docs] def invert_permutation(p: list) -> np.ndarray:
    """
    Inverts a permutation.

    Parameters
    ----------
    p
        Some permutation of `0, 1, ..., len(p)-1`.

    Returns
    -------
    s
        An array where `s[i]` gives the index of `i` in `p`.
    """

    s = np.empty_like(p)
    s[p] = np.arange(len(p))
    return s

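# Illustrative sketch, not part of the original module: inverting a small permutation.
# `_demo_invert_permutation` is a hypothetical helper.
def _demo_invert_permutation():
    p = [2, 0, 3, 1]
    s = invert_permutation(p)  # array([1, 3, 0, 2]): element 0 sits at index 1 of p, element 1 at index 3, ...
    return s
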
[docs] def order_result(unordered_result: Generator[Tuple[int, Any], None, None]) -> List:
    """
    Re-orders the result of a distributed explainer so that the explanations follow the same order as the input to
    the explainer.

    Parameters
    ----------
    unordered_result
        Each tuple contains the batch id as the first entry and the explanations for that batch as the second.

    Returns
    -------
    A list with re-ordered results.

    Notes
    -----
    This should not be used if one wants to take advantage of the results being returned as they are calculated.
    """

    result_order, results = list(zip(*[(idx, res) for idx, res in unordered_result]))
    orig_order = invert_permutation(list(result_order))
    ordered_result = [results[idx] for idx in orig_order]

    return ordered_result

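# Illustrative sketch, not part of the original module: restoring submission order from results that arrive as
# `(batch_index, explanation)` pairs in completion order. `_demo_order_result` is a hypothetical helper.
def _demo_order_result():
    unordered = iter([(2, 'batch_2'), (0, 'batch_0'), (1, 'batch_1')])
    return order_result(unordered)  # ['batch_0', 'batch_1', 'batch_2']
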
[docs] class ResourceError(Exception):
    pass

[docs] class DistributedExplainer:
    """
    A class that orchestrates the execution of a batch of explanations in parallel.
    """
    concatenate: Callable

[docs]    def __init__(self, distributed_opts: Dict[str, Any], explainer_type: Any, explainer_init_args: Tuple,
                 explainer_init_kwargs: dict, concatenate_results: bool = True, return_generator: bool = False):
        """
        Creates a pool of actors (i.e., replicas of an instantiated `explainer_type` in a separate process) which
        can explain batches of instances in parallel via calls to `get_explanation`.

        Parameters
        ----------
        distributed_opts
            A dictionary with the following type (minimal signature)::

                class DistributedOpts(TypedDict):
                    n_cpus: Optional[int]
                    batch_size: Optional[int]

            The dictionary may contain two additional keys:

                - ``'actor_cpu_frac'`` : ``(float, <= 1.0, > 0.0)`` - This is used to create more than one process \
                on one CPU/GPU. This may not speed up CPU-intensive tasks but it is worth experimenting with when \
                few physical cores are available. In particular, this is highly useful when the user wants to share \
                a GPU for multiple tasks, with the caveat that the machine learning framework itself needs to \
                support running multiple replicas on the same GPU. See the `ray` documentation `here`_ for details.

                    .. _here: https://docs.ray.io/en/latest/ray-core/scheduling/resources.html#fractional-resource-requirements

                - ``'algorithm'`` : ``str`` - this is specified internally by the caller. It is used in order to \
                register target function callbacks for the parallel pool. These should be implemented in the global \
                scope. If not specified, its value will be ``'default'``, which will select a default target \
                function which expects the actor to have a `get_explanation` method.

        explainer_type
            Explainer class.
        explainer_init_args
            Positional arguments to explainer constructor.
        explainer_init_kwargs
            Keyword arguments to explainer constructor.
        concatenate_results
            If ``True`` concatenates the results. See :py:func:`alibi.utils.distributed.concatenate_minibatches`
            for more details.
        return_generator
            If ``True`` a generator that returns the results in the order the computation finishes is returned when
            `get_explanation` is called. Otherwise, the order of the results is the same as the order of the
            minibatches.

        Notes
        -----
        When ``return_generator=True``, the caller has to take elements from the generator (e.g., by calling `next`)
        in order to start computing the results (because the `ray` pool is implemented as a generator).
        """  # noqa W605

        self.n_processes = distributed_opts['n_cpus']
        self.batch_size = distributed_opts['batch_size']
        self.return_generator = return_generator
        self.concatenate_results = concatenate_results

        algorithm = distributed_opts.get('algorithm', 'default')
        if algorithm == 'default':
            logger.warning(
                "No algorithm specified in distributed option, default target function will be selected."
            )
        self.target_fcn = default_target_fcn
        # check global scope for any specific target function
        if concatenate_results:
            self.concatenate = concatenate_minibatches
        if f"{algorithm}_target_fcn" in globals():
            self.target_fcn = globals()[f"{algorithm}_target_fcn"]

        if not ray.is_initialized():
            logger.info(f"Initialising ray on {self.n_processes} processes!")
            ray.init(num_cpus=self.n_processes)

        # a pool is a collection of handles to different processes that can process data points in parallel
        self.pool = self.create_parallel_pool(
            explainer_type,
            explainer_init_args,
            explainer_init_kwargs
        )
        # use to retrieve state from different actors
        self._actor_index = 0

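    # Illustrative sketch, not part of the original module: a minimal `distributed_opts` dictionary matching the
    # signature documented above. The values are arbitrary examples, not library defaults.
    _example_distributed_opts = {
        'n_cpus': 2,             # number of worker processes / actors
        'batch_size': 10,        # rows per minibatch sent to each actor
        'algorithm': 'default',  # optional: selects `default_target_fcn`
    }
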
[docs]    def __getattr__(self, item: str) -> Any:
        """
        Accesses actor attributes. Use sparingly as this involves a remote call (that is, these attributes are of
        an object in a different process). *The intended use is for retrieving any common state across the actor
        at the end of the computation in order to form the response (see notes 2 & 3)*.

        Parameters
        ----------
        item
            The explainer attribute to be returned.

        Returns
        -------
        The value of the attribute specified by `item`.

        Raises
        ------
        ValueError
            If the actor index is invalid.

        Notes
        -----
        1. This method assumes that the actor implements a `return_attribute` method.

        2. Note that we are indexing the idle actors. This means that if a pool was initialised with 5 actors \
        and 3 are busy, indexing with index 2 will raise an `IndexError`.

        3. The order of `_idle_actors` constantly changes - an actor is removed from it if there is a task to \
        execute and appended back when the task is complete. Therefore, indexing at the same position as \
        computation proceeds will result in retrieving state from different processes.
        """  # noqa W605

        if self._actor_index > self.n_processes - 1:
            raise ValueError(f"Index of actor should be less than or equal to {self.n_processes - 1}!")

        actor = self.pool._idle_actors[self._actor_index]  # noqa
        return ray.get(actor.return_attribute.remote(item))

    @property
    def actor_index(self) -> int:
        """
        Returns the index of the actor for which state is returned.
        """
        return self._actor_index

    @actor_index.setter
    def actor_index(self, value: int):
        """
        Sets the actor index to allow retrieving state from a specific actor.
        """
        self._actor_index = value

[docs]    def set_actor_index(self, value: int):
        """
        Sets the actor index. This is used when the `DistributedExplainer` is in a separate process because `ray`
        does not support calling property setters remotely.
        """
        self._actor_index = value

[docs]    def return_attribute(self, name: str) -> Any:
        """
        Returns an attribute specified by its name. Used in a distributed context where the properties cannot be
        accessed using the dot syntax.
        """

        # if the attribute was requested from the PoolCollection object, return it
        try:
            return self.__getattribute__(name)
        # else, it is an attribute of one of the actors
        except AttributeError:
            return self.__getattr__(name)

[docs]    def create_parallel_pool(self, explainer_type: Any, explainer_init_args: Tuple, explainer_init_kwargs: dict):
        """
        Creates a pool of actors that can explain the rows of a dataset in parallel.

        Parameters
        ----------
        See constructor documentation.
        """

        handles = [ray.remote(explainer_type) for _ in range(self.n_processes)]
        workers = [handle.remote(*explainer_init_args, **explainer_init_kwargs) for handle in handles]
        return ray.util.ActorPool(workers)

[docs]    def get_explanation(self, X: np.ndarray, **kwargs) -> \
            Union[Generator[Tuple[int, Any], None, None], List[Any], Any]:
        """
        Performs distributed explanations of instances in `X`.

        Parameters
        ----------
        X
            A batch of instances to be explained. Split into batches according to the settings passed to the
            constructor.
        **kwargs
            Any keyword arguments for the explainer `explain` method.

        Returns
        -------
        The explanations are returned as:

            - a generator, if the `return_generator` option is specified. This is used so that the caller can \
            access the results as they are computed. This is the only case when this method is non-blocking and \
            the caller needs to call `next` on the generator to trigger the parallel computation.

            - a list of objects, whose type depends on the return type of the explainer. This is returned if no \
            custom preprocessing function is specified.

            - an object, whose type depends on the return type of the concatenation function when it is called \
            with a list of minibatch results ordered in the same way as the minibatches.
        """  # noqa E501

        if kwargs is not None:
            self.target_fcn = partial(self.target_fcn, kwargs=kwargs)
        batched_instances = batch(X, self.batch_size, self.n_processes)

        # non-blocking, as results finish
        if self.return_generator:
            # note: do not use this option inside a remote object; generators cannot be pickled
            return self.pool.map_unordered(self.target_fcn, enumerate(batched_instances))
        # blocking, submit order
        explanations = self.pool.map(self.target_fcn, batched_instances)
        results = [minibatch_explanation for minibatch_explanation in explanations]
        if self.concatenate_results:
            return self.concatenate(results)
        return results

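    # Illustrative usage sketch, not part of the original module. `MyExplainer`, `predictor` and `X_test` are
    # hypothetical; the wrapped explainer is assumed to expose the `get_explanation` and `return_attribute`
    # methods required by the default target function and by remote attribute access.
    #
    #     opts = {'n_cpus': 4, 'batch_size': 100}
    #     distributed_explainer = DistributedExplainer(opts, MyExplainer, (predictor,), {}, return_generator=False)
    #     explanations = distributed_explainer.get_explanation(X_test)  # list or concatenated result
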
[docs] class PoolCollection:
    """
    A wrapper object that turns a `DistributedExplainer` into a remote actor. This allows running multiple
    distributed explainers in parallel.
    """

[docs]    def __init__(self, distributed_opts: Dict[str, Any], explainer_type: Any, explainer_init_args: List[Tuple],
                 explainer_init_kwargs: List[Dict], **kwargs):
        """
        Initialises a list of *distinct* distributed explainers which can explain the same batch in parallel. It
        generalizes the `DistributedExplainer`, which contains replicas of one explainer object, speeding up the
        task of explaining batches of instances.

        Parameters
        ----------
        distributed_opts, explainer_type, explainer_init_args, explainer_init_kwargs
            See :py:meth:`alibi.utils.distributed.DistributedExplainer` constructor documentation for explanations.
            Each entry in the list is a different explainer configuration (e.g., CEM in PN vs PP mode, different
            background dataset sizes for SHAP, etc.).
        **kwargs
            Any other kwargs, passed to the `DistributedExplainer` objects.

        Raises
        ------
        ResourceError
            If the number of CPUs specified by the user is smaller than the number of distributed explainers.
        ValueError
            If the number of entries in the explainer args/kwargs lists differ.
        """  # noqa W605

        available_cpus = distributed_opts['n_cpus']
        if len(explainer_init_args) != len(explainer_init_kwargs):
            raise ValueError(
                "To run multiple explainers over distinct parallel pools of replicas, the lists of args and kwargs "
                "should be of equal length."
            )
        cpus_per_pool = available_cpus // len(explainer_init_args)
        if cpus_per_pool < 1:
            raise ResourceError(
                f"Running {explainer_type.__name__} requires {len(explainer_init_args)} CPUs but only "
                f"{available_cpus} were specified. Please allocate more CPUs to run this explainer in "
                f"distributed mode."
            )

        # we can allow users to experiment with CPU fractions if only 1 CPU is available per pool
        actor_cpu_fraction = distributed_opts.get('actor_cpu_fraction', 1.0)
        if cpus_per_pool == 1:
            if actor_cpu_fraction is not None:
                cpus_per_pool /= actor_cpu_fraction

        if not ray.is_initialized():
            logger.info(f"Initialising ray on {distributed_opts['n_cpus']} CPUs")
            ray.init(num_cpus=distributed_opts['n_cpus'])

        opts = copy.deepcopy(distributed_opts)
        opts.update(n_cpus=int(cpus_per_pool))
        self.distributed_explainers = self.create_explainer_handles(
            opts,
            explainer_type,
            explainer_init_args,
            explainer_init_kwargs,
            **kwargs,
        )
        self._remote_explainer_index = 0

    @property
    def remote_explainer_index(self) -> int:
        """
        Returns the index of the actor for which state is returned.
        """
        return self._remote_explainer_index

    @remote_explainer_index.setter
    def remote_explainer_index(self, value: int):
        """
        Sets the actor index to allow retrieving state from a specific actor.
        """
        self._remote_explainer_index = value

[docs]    def __getattr__(self, item: str) -> Any:
        """
        Accesses attributes of the distributed explainer or of the explainer it contains.
        """
        if self._remote_explainer_index > len(self.distributed_explainers) - 1:
            raise ValueError(
                f"Index of explainer should be less than or equal to {len(self.distributed_explainers) - 1}!"
            )
        actor = self.distributed_explainers[self._remote_explainer_index]  # noqa
        return ray.get(actor.return_attribute.remote(item))

    def __getitem__(self, item: int):
        if item > len(self.distributed_explainers) - 1:
            raise IndexError(
                f"Pool collection contains only {len(self.distributed_explainers)} distributed explainers."
            )
        return self.distributed_explainers[item]

[docs]    @staticmethod
    def create_explainer_handles(distributed_opts: Dict[str, Any],
                                 explainer_type: Any,
                                 explainer_init_args: List[Tuple],
                                 explainer_init_kwargs: List[Dict],
                                 **kwargs):
        """
        Creates multiple actors for `DistributedExplainer` so that tasks can be executed in parallel. The actors
        are initialised with different arguments, so they represent different explainers.

        Parameters
        ----------
        distributed_opts, explainer_type, explainer_init_args, explainer_init_kwargs, **kwargs
            See :py:meth:`alibi.utils.distributed.PoolCollection`.
        """
        explainer_handles = [ray.remote(DistributedExplainer) for _ in range(len(explainer_init_args))]
        distributed_explainers = []
        for handle, exp_args, exp_kwargs in zip(explainer_handles, explainer_init_args, explainer_init_kwargs):
            distributed_explainers.append(handle.remote(
                distributed_opts,
                explainer_type,
                exp_args,
                exp_kwargs,
                **kwargs)
            )
        return distributed_explainers

[docs]    def get_explanation(self, X, **kwargs) -> List:
        """
        Calls a collection of distributed explainers in parallel. Each distributed explainer will explain each row
        in `X` in parallel.

        Parameters
        ----------
        X
            Batch of instances to be explained.

        Returns
        -------
        A list of responses collected from each explainer.

        Notes
        -----
        Note that the call to `ray.get` is blocking.

        Raises
        ------
        TypeError
            If the user sets ``return_generator=True`` for the `DistributedExplainer`. This is because generators
            cannot be pickled, so one cannot call `ray.get`.
        """
        return ray.get(
            [explainer.get_explanation.remote(X, **kwargs) for explainer in self.distributed_explainers]
        )
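
    # Illustrative usage sketch, not part of the original module. `MyExplainer`, `predictor_a`, `predictor_b`
    # and `X_test` are hypothetical; each args/kwargs pair configures a distinct explainer that runs over its
    # own pool of replicas, and each entry of `results` is the response of one distributed explainer.
    #
    #     opts = {'n_cpus': 4, 'batch_size': 50}
    #     collection = PoolCollection(opts, MyExplainer, [(predictor_a,), (predictor_b,)], [{}, {}])
    #     results = collection.get_explanation(X_test)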