alibi.utils.distributed module

class alibi.utils.distributed.ActorPool(actors)[source]

Bases: object

__init__(actors)[source]

Taken from the ray repository: https://github.com/ray-project/ray/pull/5945 . Create an actor pool from a list of existing actors. An actor pool is a utility class similar to multiprocessing.Pool that lets you schedule ray tasks over a fixed pool of actors.

Parameters:

actors – List of ray actor handles to use in this pool.

Examples

>>> a1, a2 = Actor.remote(), Actor.remote()
>>> pool = ActorPool([a1, a2])
>>> print(list(pool.map(lambda a, v: a.double.remote(v), [1, 2, 3, 4])))
[2, 4, 6, 8]
get_next(timeout=None)[source]

Returns the next pending result in order. This returns the next result produced by alibi.utils.distributed.ActorPool.submit(), blocking for up to the specified timeout until it is available.

Returns:

The next result.

Raises:

TimeoutError – If the timeout is reached.

Examples

>>> pool = ActorPool(...)
>>> pool.submit(lambda a, v: a.double.remote(v), 1)
>>> print(pool.get_next())
2
get_next_unordered(timeout=None)[source]

Returns any of the next pending results. This returns some result produced by alibi.utils.distributed.ActorPool.submit(), blocking for up to the specified timeout until it is available. Unlike alibi.utils.distributed.ActorPool.get_next(), the results are not always returned in the same order as submitted, which can improve performance.

Returns:

The next result.

Raises:

TimeoutError – If the timeout is reached.

Examples

>>> pool = ActorPool(...)
>>> pool.submit(lambda a, v: a.double.remote(v), 1)
>>> pool.submit(lambda a, v: a.double.remote(v), 2)
>>> print(pool.get_next_unordered())
4
>>> print(pool.get_next_unordered())
2
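
The difference between ordered and unordered retrieval can be illustrated with a rough analogy built on the standard library's concurrent.futures rather than on ray. This is only a sketch: the double function and its delays are hypothetical, and ActorPool itself is not used. Waiting on futures in submission order plays the role of get_next(), while as_completed plays the role of get_next_unordered().

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def double(v, delay):
    """Hypothetical task: sleeps for `delay` seconds, then doubles `v`."""
    time.sleep(delay)
    return 2 * v


with ThreadPoolExecutor(max_workers=2) as executor:
    # The first task is deliberately slower than the second.
    futures = [executor.submit(double, 1, 0.5), executor.submit(double, 2, 0.0)]
    # Ordered retrieval: wait for each result in submission order.
    ordered = [f.result() for f in futures]

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(double, 1, 0.5), executor.submit(double, 2, 0.0)]
    # Unordered retrieval: take whichever result becomes available first.
    unordered = [f.result() for f in as_completed(futures)]

print(ordered)
print(unordered)
```

In the unordered case the caller is not held up by the slow first task, which is the performance benefit mentioned above.
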
has_next()[source]

Returns whether there are any pending results to return.

Returns:

True if there are any pending results not yet returned.

Examples

>>> pool = ActorPool(...)
>>> pool.submit(lambda a, v: a.double.remote(v), 1)
>>> print(pool.has_next())
True
>>> print(pool.get_next())
2
>>> print(pool.has_next())
False
map(fn, values, chunksize=1)[source]

Apply the given function in parallel over the actors and values. This returns an ordered iterator that will return results of the map as they finish. Note that you must iterate over the iterator to force the computation to finish.

Parameters:
  • fn (Callable) – Function that takes (actor, value) as argument and returns an ObjectID computing the result over the value. The actor will be considered busy until the ObjectID completes.

  • values (list) – List of values that fn(actor, value) should be applied to.

  • chunksize (int) – Splits the list of values to be submitted to the parallel process into sublists of size chunksize or less.

Returns:

Iterator over results from applying fn to the actors and values.

Examples

>>> pool = ActorPool(...)
>>> print(list(pool.map(lambda a, v: a.double.remote(v), [1, 2, 3, 4])))
[2, 4, 6, 8]
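
The effect of chunksize can be pictured with a minimal sketch. The helper chunk below is hypothetical and only illustrates splitting a list into sublists of size chunksize or less; whether ActorPool slices the values in exactly this way is an assumption.

```python
def chunk(values, chunksize):
    """Yield consecutive sublists of `values` with at most `chunksize` items."""
    for start in range(0, len(values), chunksize):
        yield values[start:start + chunksize]


pairs = list(chunk([1, 2, 3, 4, 5], 2))
singles = list(chunk([1, 2, 3], 1))
print(pairs)
print(singles)
```

The last sublist is shorter whenever the number of values is not a multiple of chunksize.
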
map_unordered(fn, values, chunksize=1)[source]

Similar to alibi.utils.distributed.ActorPool.map(), but returns an unordered iterator that yields the results of the map as they finish. This can be more efficient than alibi.utils.distributed.ActorPool.map() if some results take longer to compute than others.

Parameters:
  • fn (Callable) – Function that takes (actor, value) as argument and returns an ObjectID computing the result over the value. The actor will be considered busy until the ObjectID completes.

  • values (list) – List of values that fn(actor, value) should be applied to.

  • chunksize (int) – Splits the list of values to be submitted to the parallel process into sublists of size chunksize or less.

Returns:

Iterator over results from applying fn to the actors and values.

Examples

>>> pool = ActorPool(...)
>>> print(list(pool.map_unordered(lambda a, v: a.double.remote(v), [1, 2, 3, 4])))
[6, 2, 4, 8]
submit(fn, value)[source]

Schedule a single task to run in the pool. This has the same argument semantics as alibi.utils.distributed.ActorPool.map(), but takes a single value instead of a list of values. The result can be retrieved using alibi.utils.distributed.ActorPool.get_next() / alibi.utils.distributed.ActorPool.get_next_unordered().

Parameters:
  • fn (Callable) – Function that takes (actor, value) as argument and returns an ObjectID computing the result over the value. The actor will be considered busy until the ObjectID completes.

  • value (object) – Value to compute a result for.

Examples

>>> pool = ActorPool(...)
>>> pool.submit(lambda a, v: a.double.remote(v), 1)
>>> pool.submit(lambda a, v: a.double.remote(v), 2)
>>> print(pool.get_next(), pool.get_next())
2 4
class alibi.utils.distributed.DistributedExplainer(distributed_opts, explainer_type, explainer_init_args, explainer_init_kwargs, concatenate_results=True, return_generator=False)[source]

Bases: object

A class that orchestrates the execution of a batch of explanations in parallel.

__getattr__(item)[source]

Accesses actor attributes. Use sparingly as this involves a remote call (that is, these attributes are of an object in a different process). The intended use is for retrieving any common state across the actors at the end of the computation in order to form the response (see notes 2 & 3).

Parameters:

item (str) – The explainer attribute to be returned.

Return type:

Any

Returns:

The value of the attribute specified by item.

Raises:

ValueError – If the actor index is invalid.

Notes

  1. This method assumes that the actor implements a return_attribute method.

  2. Note that we are indexing the idle actors. This means that if a pool was initialised with 5 actors and 3 are busy, indexing with index 2 will raise an IndexError.

  3. The order of _idle_actors constantly changes - an actor is removed from it if there is a task to execute and appended back when the task is complete. Therefore, indexing at the same position as computation proceeds will result in retrieving state from different processes.

__init__(distributed_opts, explainer_type, explainer_init_args, explainer_init_kwargs, concatenate_results=True, return_generator=False)[source]

Creates a pool of actors (i.e., replicas of an instantiated explainer_type in a separate process) which can explain batches of instances in parallel via calls to get_explanation.

Parameters:
  • distributed_opts (Dict[str, Any]) –

    A dictionary with the following type (minimal signature):

    class DistributedOpts(TypedDict):
        n_cpus: Optional[int]
        batch_size: Optional[int]
    

    The dictionary may contain two additional keys:

    • 'actor_cpu_frac' : (float, <= 1.0, >0.0) - This is used to create more than one process on one CPU/GPU. This may not speed up CPU intensive tasks but it is worth experimenting with when few physical cores are available. In particular, this is highly useful when the user wants to share a GPU for multiple tasks, with the caveat that the machine learning framework itself needs to support running multiple replicas on the same GPU. See the ray documentation for details.

    • 'algorithm' : str - this is specified internally by the caller. It is used in order to register target function callbacks for the parallel pool. These should be implemented in the global scope. If not specified, its value will be 'default', which will select a default target function which expects the actor to have a get_explanation method.

  • explainer_type (Any) – Explainer class.

  • explainer_init_args (Tuple) – Positional arguments to explainer constructor.

  • explainer_init_kwargs (dict) – Keyword arguments to explainer constructor.

  • concatenate_results (bool) – If True concatenates the results. See alibi.utils.distributed.concatenate_minibatches() for more details.

  • return_generator (bool) – If True a generator that returns the results in the order the computation finishes is returned when get_explanation is called. Otherwise, the order of the results is the same as the order of the minibatches.
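
As a rough illustration of the dictionary described above, the sketch below extends the minimal signature with the two optional keys. The concrete values are arbitrary examples, not recommended settings, and 'algorithm' is left out because it is set internally by the caller.

```python
from typing import Optional, TypedDict


class DistributedOpts(TypedDict, total=False):
    n_cpus: Optional[int]
    batch_size: Optional[int]
    actor_cpu_frac: float  # optional key, expected in the interval (0.0, 1.0]
    algorithm: str         # optional key, set internally by the caller


# Illustrative values only: 4 CPUs, minibatches of 10 instances,
# and two actor processes sharing each CPU.
distributed_opts: DistributedOpts = {
    'n_cpus': 4,
    'batch_size': 10,
    'actor_cpu_frac': 0.5,
}

frac_is_valid = 0.0 < distributed_opts['actor_cpu_frac'] <= 1.0
print(frac_is_valid)
```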

Notes

When return_generator=True, the caller has to take elements from the generator (e.g., by calling next) in order to start computing the results (because the ray pool is implemented as a generator).
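
The lazy behaviour described in this note is a general property of Python generators. The sketch below uses a hypothetical stand-in, explain_batches, instead of a real DistributedExplainer, to show that no work happens until the caller asks for the first element.

```python
started = []  # records which batches have actually been processed


def explain_batches(batches):
    """Hypothetical stand-in for the generator returned by get_explanation."""
    for batch_index, batch in enumerate(batches):
        started.append(batch_index)
        yield batch_index, [2 * x for x in batch]


results = explain_batches([[1, 2], [3]])
started_before_next = list(started)  # nothing has run yet
first = next(results)                # triggers work on the first batch only
started_after_next = list(started)

print(started_before_next, first, started_after_next)
```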

property actor_index: int

Returns the index of the actor for which state is returned.

Return type:

int

concatenate: Callable
create_parallel_pool(explainer_type, explainer_init_args, explainer_init_kwargs)[source]

Creates a pool of actors that can explain the rows of a dataset in parallel.

Parameters:

See constructor documentation.

get_explanation(X, **kwargs)[source]

Performs distributed explanations of instances in X.

Parameters:
  • X (ndarray) – A batch of instances to be explained. Split into batches according to the settings passed to the constructor.

  • **kwargs – Any keyword-arguments for the explainer explain method.

Return type:

Union[Generator[Tuple[int, Any], None, None], List[Any], Any]

Returns:

The explanations are returned as

  • a generator, if the return_generator option is specified. This is used so that the caller can access the results as they are computed. This is the only case when this method is non-blocking and the caller needs to call next on the generator to trigger the parallel computation.

  • a list of objects, whose type depends on the return type of the explainer. This is returned if no custom preprocessing function is specified.

  • an object, whose type depends on the return type of the concatenation function when it is called with a list of minibatch results in the same order as the minibatches.

return_attribute(name)[source]

Returns an attribute specified by its name. Used in a distributed context where the properties cannot be accessed using the dot syntax.

Return type:

Any

set_actor_index(value)[source]

Sets the actor index. This is used when the DistributedExplainer is in a separate process because ray does not support calling property setters remotely.

class alibi.utils.distributed.PoolCollection(distributed_opts, explainer_type, explainer_init_args, explainer_init_kwargs, **kwargs)[source]

Bases: object

A wrapper object that turns a DistributedExplainer into a remote actor. This allows running multiple distributed explainers in parallel.

__getattr__(item)[source]

Accesses attributes of the distributed explainer or of the explainer it contains.

Return type:

Any

__init__(distributed_opts, explainer_type, explainer_init_args, explainer_init_kwargs, **kwargs)[source]

Initialises a list of distinct distributed explainers which can explain the same batch in parallel. It generalizes the DistributedExplainer, which contains replicas of one explainer object, speeding up the task of explaining batches of instances.

Parameters:
  • distributed_opts (Dict[str, Any]) – See alibi.utils.distributed.DistributedExplainer() constructor documentation for explanations. Each entry in the list is a different explainer configuration (e.g., CEM in PN vs PP mode, different background dataset sizes for SHAP, etc).

  • explainer_type (Any) – See alibi.utils.distributed.DistributedExplainer() constructor documentation for explanations. Each entry in the list is a different explainer configuration (e.g., CEM in PN vs PP mode, different background dataset sizes for SHAP, etc).

  • explainer_init_args (List[Tuple]) – See alibi.utils.distributed.DistributedExplainer() constructor documentation for explanations. Each entry in the list is a different explainer configuration (e.g., CEM in PN vs PP mode, different background dataset sizes for SHAP, etc).

  • explainer_init_kwargs (List[Dict]) – See alibi.utils.distributed.DistributedExplainer() constructor documentation for explanations. Each entry in the list is a different explainer configuration (e.g., CEM in PN vs PP mode, different background dataset sizes for SHAP, etc).

  • **kwargs – Any other kwargs, passed to the DistributedExplainer objects.

Raises:
  • ResourceError – If the number of CPUs specified by the user is smaller than the number of distributed explainers.

  • ValueError – If the number of entries in the explainers args/kwargs list differ.

static create_explainer_handles(distributed_opts, explainer_type, explainer_init_args, explainer_init_kwargs, **kwargs)[source]

Creates multiple actors for DistributedExplainer so that tasks can be executed in parallel. The actors are initialised with different arguments, so they represent different explainers.

Parameters:

See constructor documentation.

get_explanation(X, **kwargs)[source]

Calls a collection of distributed explainers in parallel. Each distributed explainer will explain each row in X in parallel.

Parameters:

X – Batch of instances to be explained.

Return type:

List

Returns:

A list of responses collected from each explainer.

Notes

Note that the call to ray.get is blocking.

Raises:

TypeError – If the user sets return_generator=True for the DistributedExplainer. This is because generators cannot be pickled so one cannot call ray.get.

property remote_explainer_index: int

Returns the index of the actor for which state is returned.

Return type:

int

exception alibi.utils.distributed.ResourceError[source]

Bases: Exception

alibi.utils.distributed.batch(X, batch_size=None, n_batches=4)[source]

Splits the input into sub-arrays.

Parameters:
  • X (ndarray) – Array to be split.

  • batch_size (Optional[int]) –

    The size of each batch. In particular:

    • if batch_size is not None, batches of this size are created. The sizes of the batches created might vary if the 0-th dimension of X is not divisible by batch_size. For an array of length l that should be split into n sections, it returns l % n sub-arrays of size l//n + 1 and the rest of size l//n.

    • if batch_size is None, then X is split into n_batches sub-arrays.

  • n_batches (int) – Number of batches into which to split the array. Only used if batch_size is None.

Return type:

List[ndarray]

Returns:

A list of sub-arrays of X.
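
The size rule quoted above (l % n sub-arrays of size l//n + 1 and the rest of size l//n) is the same rule documented for numpy.array_split. The helper split_sizes below is hypothetical and only works out the arithmetic; it is not the implementation of batch.

```python
def split_sizes(l, n):
    """Sizes of the n sub-arrays obtained from an array of length l."""
    larger = l % n  # number of sub-arrays that get one extra element
    return [l // n + 1] * larger + [l // n] * (n - larger)


uneven = split_sizes(10, 4)  # 10 rows split into 4 sections
even = split_sizes(8, 4)     # 8 rows split into 4 sections
print(uneven)
print(even)
```

The sizes always sum to l, so no rows are dropped when the length is not divisible by the number of sections.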

alibi.utils.distributed.concatenate_minibatches(minibatch_results)[source]

Merges the explanations computed on minibatches so that the distributed explainer returns the same output as the sequential version. If the type returned by the explainer is not supported by the function, expand this function by adding an appropriately named private function and use this function to check the input type and call it.

Parameters:

minibatch_results (Union[List[ndarray], List[List[ndarray]]]) – Explanations for each minibatch.

Return type:

Union[ndarray, List[ndarray]]

Returns:

  • If the input is List[np.ndarray], a single numpy array obtained by concatenating minibatch results along the 0th axis.

  • If the input is List[List[np.ndarray]], a list of numpy arrays obtained by concatenating the arrays at the same position in the sublists along the 0th axis.
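
The two cases can be sketched as follows. The function concatenate_sketch is a hypothetical illustration of the behaviour described above, written with numpy.concatenate; it is not the library's own code.

```python
import numpy as np


def concatenate_sketch(minibatch_results):
    """Merge minibatch results along the 0th axis (illustrative only)."""
    if isinstance(minibatch_results[0], np.ndarray):
        # List[np.ndarray]: one array covering all minibatches.
        return np.concatenate(minibatch_results, axis=0)
    # List[List[np.ndarray]]: group the arrays by their position in each
    # sublist, then concatenate every group separately.
    return [np.concatenate(group, axis=0) for group in zip(*minibatch_results)]


flat = [np.zeros((2, 3)), np.ones((1, 3))]
nested = [[np.zeros((2, 3)), np.zeros(2)], [np.ones((1, 3)), np.ones(1)]]

merged = concatenate_sketch(flat)
merged_list = concatenate_sketch(nested)
print(merged.shape, [a.shape for a in merged_list])
```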

alibi.utils.distributed.default_target_fcn(actor, instances, kwargs=None)[source]

A target function that is executed in parallel given an actor pool. Its arguments must be an actor and a batch of values to be processed by the actor. Its role is to execute distributed computations when an actor is available.

Parameters:
  • actor (Any) – A ray actor. This is typically a class decorated with the @ray.remote decorator, that has been subsequently instantiated using cls.remote(*args, **kwargs).

  • instances (tuple) – A (batch_index, batch) tuple containing the batch of instances to be explained along with a batch index.

  • kwargs (Optional[Dict]) – A dictionary of keyword arguments for the actor get_explanation method.

Returns:

A future that can be used to later retrieve the results of a distributed computation.

Notes

This function can be customized (e.g., if one does not desire to wrap the explainer such that it has a get_explanation method). The customized function should be called *_target_fcn with the wildcard being replaced by the name of the explanation method (e.g., cem, cfproto, etc). The same name should be added to the distributed_opts dictionary passed by the user prior to instantiating the DistributedExplainer.
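
A customized target function might look like the sketch below. Everything here is hypothetical: RemoteMethodStub and StubActor stand in for a real ray actor so that the example runs without ray, myalgo is an invented algorithm name, and the name-based lookup at the end only illustrates the naming convention, not how the library resolves the function.

```python
class RemoteMethodStub:
    """Hypothetical stand-in for a ray remote method handle."""

    def __init__(self, fn):
        self._fn = fn

    def remote(self, *args, **kwargs):
        # A real ray call returns a future; this stub computes the result eagerly.
        return self._fn(*args, **kwargs)


class StubActor:
    """Hypothetical actor exposing `explain` rather than `get_explanation`."""

    def __init__(self):
        self.explain = RemoteMethodStub(self._explain)

    @staticmethod
    def _explain(instances, scale=1):
        batch_index, batch = instances
        return batch_index, [scale * x for x in batch]


def myalgo_target_fcn(actor, instances, kwargs=None):
    """Custom target function following the *_target_fcn naming convention."""
    kwargs = {} if kwargs is None else kwargs
    return actor.explain.remote(instances, **kwargs)


# The name stored under 'algorithm' matches the prefix of the function name.
distributed_opts = {'n_cpus': 2, 'batch_size': 1, 'algorithm': 'myalgo'}
available = {fn.__name__: fn for fn in (myalgo_target_fcn,)}
target_fcn = available[distributed_opts['algorithm'] + '_target_fcn']

result = target_fcn(StubActor(), (0, [1, 2, 3]), {'scale': 2})
print(result)
```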

alibi.utils.distributed.invert_permutation(p)[source]

Inverts a permutation.

Parameters:

p (list) – Some permutation of 0, 1, …, len(p)-1.

Return type:

ndarray

Returns:

An array s, where s[i] gives the index of i in p.
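
A minimal sketch of the inversion, using plain lists instead of an ndarray. The function name invert_permutation_sketch is hypothetical and the code is an illustration of the definition above, not the library's implementation.

```python
def invert_permutation_sketch(p):
    """Return s such that s[i] is the index of i in p."""
    s = [0] * len(p)
    for position, value in enumerate(p):
        s[value] = position
    return s


p = [2, 0, 1]
s = invert_permutation_sketch(p)
print(s)
```

Applying the inverse to the permutation recovers the identity: p[s[i]] == i for every i.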

alibi.utils.distributed.order_result(unordered_result)[source]

Re-orders the result of a distributed explainer so that the explanations follow the same order as the input to the explainer.

Parameters:

unordered_result (Generator[Tuple[int, Any], None, None]) – Each tuple contains the batch id as the first entry and the explanations for that batch as the second.

Return type:

List

Returns:

A list with re-ordered results.

Notes

This should not be used if one wants to take advantage of the results being returned as they are calculated.
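
The re-ordering can be sketched by sorting on the batch id. The names order_result_sketch and unordered are hypothetical, and sorting is only one way to obtain the described output; the library may compute the order differently.

```python
def order_result_sketch(unordered_result):
    """Return the explanations sorted by their batch id."""
    pairs = sorted(unordered_result, key=lambda pair: pair[0])
    return [explanation for _, explanation in pairs]


def unordered():
    """Hypothetical generator yielding (batch id, explanation) as batches finish."""
    yield 2, 'explanation for batch 2'
    yield 0, 'explanation for batch 0'
    yield 1, 'explanation for batch 1'


ordered = order_result_sketch(unordered())
print(ordered)
```

Sorting consumes the whole generator before anything is returned, which is why this approach gives up the benefit of receiving results as they are calculated.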