Source code for seldon_core.batch_processor

import copy
import json
import logging
import time
import uuid
from queue import Empty, Queue
from threading import Event, Thread
from typing import Dict, List, Tuple

import click
import numpy as np
import requests

from seldon_core.seldon_client import (
    SeldonCallCredentials,
    SeldonChannelCredentials,
    SeldonClient,
)

# lower case as grpc really does not like upper case here and rest does not care
SELDON_PUID_HEADER = "seldon-puid"
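# Note: the per-instance UUID is sent under this header with every request so
# that each request can be traced back individually, e.g. in the Seldon
# Request Logger context (see _send_batch_predict below).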

CHOICES_GATEWAY_TYPE = ["ambassador", "istio", "seldon"]
CHOICES_TRANSPORT = ["rest", "grpc"]
CHOICES_PAYLOAD_TYPE = ["ndarray", "tensor", "tftensor"]
CHOICES_DATA_TYPE = ["data", "json", "str", "raw"]
CHOICES_METHOD = ["predict", "feedback"]
CHOICES_LOG_LEVEL = {
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "warning": logging.WARNING,
    "error": logging.ERROR,
}
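
# Illustrative examples (assumptions, not part of the module) of what a single
# input file line may look like for each data type:
#
#   data type "data" with payload type "ndarray" (one instance per line):
#       [[1.0, 2.0, 3.0]]
#   data type "raw" (a full SeldonMessage-style dict per line):
#       {"data": {"names": ["f0", "f1"], "ndarray": [[0.1, 0.2]]}}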

logger = logging.getLogger(__name__)


def setup_logging(log_level: str):
    LOG_FORMAT = (
        "%(asctime)s - batch_processor.py:%(lineno)s - %(levelname)s: %(message)s"
    )
    logging.basicConfig(level=CHOICES_LOG_LEVEL[log_level], format=LOG_FORMAT)
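

# A minimal usage sketch (illustrative): setup_logging("debug") configures the
# root logger at DEBUG level using the LOG_FORMAT defined above.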


def start_multithreaded_batch_worker(
    deployment_name: str,
    gateway_type: str,
    namespace: str,
    host: str,
    transport: str,
    data_type: str,
    payload_type: str,
    workers: int,
    retries: int,
    batch_size: int,
    input_data_path: str,
    output_data_path: str,
    method: str,
    log_level: str,
    benchmark: bool,
    batch_id: str,
    batch_interval: float,
    call_credentials_token: str,
    use_ssl: bool,
    ssl_verify: bool,
) -> None:
    """
    Starts the multithreaded batch worker, which consists of three worker types
    and two queues: the input_file_worker reads a file and puts all lines into
    an input queue, which is read by multiple request_processor_workers (the
    number of parallel workers is specified by the workers param) that put
    their output into the output queue, and the output_file_worker writes all
    the outputs into the output file in a thread-safe manner. All parameters
    are defined and explained in detail in the run_cli function.
    """
    setup_logging(log_level)

    start_time = time.time()

    out_queue_empty_event = Event()
    q_in = Queue(workers * 2)
    q_out = Queue(workers * 2)

    if method == "feedback" and data_type != "raw":
        raise RuntimeError("Feedback method is supported only with `raw` data type.")
    elif data_type not in ["data", "raw"] and batch_size > 1:
        raise RuntimeError(
            "Batch size greater than 1 is only supported for `data` data type."
        )

    # Providing call credentials sets the REST transport protocol to https,
    # so we configure credentials even without a supplied token if use_ssl is set.
    credentials = None
    channel_credentials = None
    if use_ssl or len(call_credentials_token) > 0:
        token = None
        if len(call_credentials_token) > 0:
            token = call_credentials_token
        credentials = SeldonCallCredentials(token=token)
        channel_credentials = SeldonChannelCredentials(verify=ssl_verify)

    sc = SeldonClient(
        gateway=gateway_type,
        transport=transport,
        deployment_name=deployment_name,
        payload_type=payload_type,
        gateway_endpoint=host,
        namespace=namespace,
        client_return_type="dict",
        call_credentials=credentials,
        channel_credentials=channel_credentials,
    )

    t_in = Thread(
        target=_start_input_file_worker,
        args=(q_in, input_data_path, batch_size),
        daemon=True,
    )
    t_in.start()

    for _ in range(workers):
        Thread(
            target=_start_request_worker,
            args=(
                q_in,
                q_out,
                data_type,
                sc,
                method,
                retries,
                batch_id,
                payload_type,
                batch_interval,
            ),
            daemon=True,
        ).start()

    t_out = Thread(
        target=_start_output_file_worker,
        args=(q_out, output_data_path, out_queue_empty_event),
    )
    t_out.start()

    # Make sure all data was loaded
    t_in.join()
    # Make sure all data was passed through both queues
    q_in.join()
    q_out.join()
    # Set event so output worker can close file once it's done with q_out queue
    out_queue_empty_event.set()
    # Wait for output worker to join main thread
    t_out.join()

    if benchmark:
        logger.debug(f"Elapsed time: {time.time() - start_time}")
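

# Illustrative programmatic invocation (a sketch; the argument values below
# are assumptions for a local cluster, not defaults of this module):
#
#   start_multithreaded_batch_worker(
#       deployment_name="sklearn",
#       gateway_type="istio",
#       namespace="seldon",
#       host="istio-ingressgateway.istio-system.svc.cluster.local:80",
#       transport="rest",
#       data_type="data",
#       payload_type="ndarray",
#       workers=10,
#       retries=3,
#       batch_size=1,
#       input_data_path="/assets/input-data.txt",
#       output_data_path="/assets/output-data.txt",
#       method="predict",
#       log_level="info",
#       benchmark=False,
#       batch_id=str(uuid.uuid1()),
#       batch_interval=0.0,
#       call_credentials_token="",
#       use_ssl=False,
#       ssl_verify=True,
#   )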


def _start_input_file_worker(
    q_in: Queue, input_data_path: str, batch_size: int
) -> None:
    """
    Runs logic for the input file worker, which reads the input file from the
    filestore and puts all of its lines into the input queue so they can be
    processed.

    Parameters
    ---
    q_in
        The queue to put all the data into for further processing
    input_data_path
        The local file to read the data from to be processed
    """
    input_data_file = open(input_data_path, "r")
    enum_idx = 0
    batch = []
    for line in input_data_file:
        unique_id = str(uuid.uuid1())
        batch.append((enum_idx, unique_id, line))
        # If the batch has reached the requested size then push it to the
        # queue and reset the batch
        if len(batch) == batch_size:
            q_in.put(batch)
            batch = []
        enum_idx += 1
    if batch:
        q_in.put(batch)


def _start_output_file_worker(
    q_out: Queue, output_data_path: str, stop_event: Event
) -> None:
    """
    Runs logic for the output file worker, which receives all the processed
    output from the request workers through the queue and writes it into the
    output file in a thread-safe manner.

    Parameters
    ---
    q_out
        The queue to read the results from
    output_data_path
        The local file to write the results into
    """
    counter = 0
    with open(output_data_path, "w") as output_data_file:
        while not stop_event.is_set():
            try:
                line = q_out.get(timeout=0.1)
            except Empty:
                continue
            output_data_file.write(f"{line}\n")
            q_out.task_done()
            counter += 1
            if counter % 100 == 0:
                logger.info(f"Processed instances: {counter}")
        logger.info(f"Total processed instances: {counter}")


def _start_request_worker(
    q_in: Queue,
    q_out: Queue,
    data_type: str,
    sc: SeldonClient,
    method: str,
    retries: int,
    batch_id: str,
    payload_type: str,
    batch_interval: float,
) -> None:
    """
    Runs logic for the worker that sends requests from the queue until the
    queue is completely empty. The worker marks each task as done when it
    finishes processing, to ensure that the queue keeps being populated, as it
    is currently configured with a size threshold.

    Parameters
    ---
    q_in
        Queue to read the input data from
    q_out
        Queue to put the resulting requests into
    data_type
        The json/str/data/raw type to send the requests as
    sc
        An initialised Seldon Client configured to send the requests to
    method
        Method to call: predict or feedback
    retries
        The number of attempts to try for each request
    batch_id
        The unique identifier for the batch which is passed to all requests
    payload_type
        The payload type expected by the SeldonDeployment: ndarray, tensor or tftensor
    batch_interval
        Minimum time interval (in seconds) between batch predictions made by each worker
    """
    while True:
        start_time = time.time()
        input_data = q_in.get()
        if method == "predict":
            # If we have a batch size > 1 then we wish to use the method for
            # sending multiple predictions as a single request and split the
            # response into multiple responses.
            if len(input_data) > 1:
                str_outputs = _send_batch_predict_multi_request(
                    input_data,
                    data_type,
                    sc,
                    retries,
                    batch_id,
                    payload_type,
                )
                for str_output in str_outputs:
                    q_out.put(str_output)
            else:
                batch_idx, batch_instance_id, input_raw = input_data[0]
                str_output = _send_batch_predict(
                    batch_idx,
                    batch_instance_id,
                    input_raw,
                    data_type,
                    sc,
                    retries,
                    batch_id,
                )
                q_out.put(str_output)
        elif method == "feedback":
            batch_idx, batch_instance_id, input_raw = input_data[0]
            str_output = _send_batch_feedback(
                batch_idx,
                batch_instance_id,
                input_raw,
                data_type,
                sc,
                retries,
                batch_id,
            )
            q_out.put(str_output)

        # Setting time interval before the task is marked as done
        if batch_interval > 0:
            remaining_interval = batch_interval - (time.time() - start_time)
            if remaining_interval > 0:
                time.sleep(remaining_interval)

        # Mark task as done in the queue to add space for new tasks
        q_in.task_done()


def _extract_raw_data_multi_request(
    loaded_data: List[Dict], tags: Dict
) -> Tuple[Dict, str, Dict]:
    raw_input_tags = [d.get("meta", {}).get("tags", {}) for d in loaded_data]
    first_input = loaded_data[0]
    # Raw input format in mini-batch mode only works for the "data" format
    if "data" not in first_input:
        raise ValueError(
            "raw input with predict in mini-batch mode requires data payload"
        )
    # If-block for the ndarray case
    elif "ndarray" in first_input["data"]:
        payload_type = "ndarray"
        names_list = [d["data"]["names"] for d in loaded_data]
        arrays = [np.array(d["data"]["ndarray"]) for d in loaded_data]
        if not all(names_list[0] == name for name in names_list):
            raise ValueError("All names in mini-batch must be the same.")
        for arr in arrays:
            if arr.shape[0] != 1:
                raise ValueError(
                    "When using mini-batching each row should contain a single instance."
                )
        ndarray = np.concatenate(arrays)
        raw_data = {
            "data": {"names": names_list[0], "ndarray": ndarray.tolist()},
            "meta": {"tags": tags},
        }
        return raw_data, payload_type, raw_input_tags
    # If-block for the tensor case
    elif "tensor" in first_input["data"]:
        payload_type = "tensor"
        names_list = [d["data"]["names"] for d in loaded_data]
        tensor_shapes = [d["data"]["tensor"]["shape"] for d in loaded_data]
        tensor_values = [d["data"]["tensor"]["values"] for d in loaded_data]
        if not all(names_list[0] == name for name in names_list):
            raise ValueError("All names in mini-batch must be the same.")
        dim_0 = 0
        dim_1 = tensor_shapes[0][1]
        for shape in tensor_shapes:
            if shape[0] != 1:
                raise ValueError(
                    "When using mini-batching each row should contain a single instance."
                )
            dim_0 += shape[0]
            if dim_1 != shape[1]:
                raise ValueError(
                    "All instances in mini-batch must have same number of features."
                )
        values = sum(tensor_values, [])
        shape = [dim_0, dim_1]
        raw_data = {
            "data": {
                "names": names_list[0],
                "tensor": {"shape": shape, "values": values},
            },
            "meta": {"tags": tags},
        }
        return raw_data, payload_type, raw_input_tags


def _send_batch_predict_multi_request(
    input_data: List,
    data_type: str,
    sc: SeldonClient,
    retries: int,
    batch_id: str,
    payload_type: str,
) -> List[str]:
    """
    Sends a request using the Seldon Client with batch context, including the
    unique batch ID and the enumerated batch index as metadata. This function
    also uses the unique batch instance ID as the request ID so the request
    can be traced back individually in the Seldon Request Logger context. Each
    request will be attempted for the configured number of retries, and the
    string-serialised result will be returned.

    This method is similar to _send_batch_predict, but allows multiple
    requests to be combined into a single prediction.

    Parameters
    ---
    input_data
        The input data containing the indexes, instance_ids and predictions
    data_type
        The data type to send which can be `data`
    sc
        The instance of SeldonClient to use to send the requests to the seldon model
    retries
        The number of times to retry the request
    batch_id
        The unique identifier for the batch which is passed to all requests

    Returns
    ---
    A list of string serialised results of the response (or equivalent data with error info)
    """
    indexes = [x[0] for x in input_data]
    seldon_puid = input_data[0][1]
    instance_ids = [f"{seldon_puid}-item-{n}" for n, _ in enumerate(input_data)]
    loaded_data = [json.loads(data[2]) for data in input_data]

    predict_kwargs = {}
    tags = {
        "batch_id": batch_id,
    }
    predict_kwargs["meta"] = tags
    predict_kwargs["headers"] = {SELDON_PUID_HEADER: seldon_puid}

    try:
        # Process raw input format
        if data_type == "raw":
            raw_data, payload_type, raw_input_tags = _extract_raw_data_multi_request(
                loaded_data, predict_kwargs["meta"]
            )
            predict_kwargs["raw_data"] = raw_data
        else:
            # Initialise concatenated array for data
            arrays = [np.array(arr) for arr in loaded_data]
            for arr in arrays:
                if arr.shape[0] != 1:
                    raise ValueError(
                        "When using mini-batching each row should contain a single instance."
                    )
            concat = np.concatenate(arrays)
            predict_kwargs["data"] = concat

        logger.debug(f"calling sc.predict with {predict_kwargs}")
    except Exception as e:
        error_resp = {
            "status": {"info": "FAILURE", "reason": str(e), "status": 1},
            "meta": tags,
        }
        logger.error(f"Exception: {e}")
        str_output = json.dumps(error_resp)
        return [str_output]

    try:
        for i in range(retries):
            try:
                seldon_payload = sc.predict(**predict_kwargs)
                assert seldon_payload.success
                response = seldon_payload.response
                break
            except (requests.exceptions.RequestException, AssertionError) as e:
                logger.error(
                    f"Exception: {e}, retries {i+1} / {retries} for batch_id(s)={indexes}"
                )
                if i == (retries - 1):
                    raise
    except Exception as e:
        output = []
        for batch_index, batch_instance_id in zip(indexes, instance_ids):
            error_resp = {
                "status": {"info": "FAILURE", "reason": str(e), "status": 1},
                "meta": dict(
                    batch_index=batch_index,
                    batch_instance_id=batch_instance_id,
                    **tags,
                ),
            }
            logger.error(f"Exception: {e}")
            output.append(json.dumps(error_resp))
        return output

    # Take the response and create new responses for each original request
    responses = []

    # If tensor then prepare the ndarray
    if payload_type == "tensor":
        tensor = np.array(response["data"]["tensor"]["values"])
        shape = response["data"]["tensor"]["shape"]
        tensor_ndarray = tensor.reshape(shape)

    for i in range(len(input_data)):
        try:
            new_response = copy.deepcopy(response)
            if data_type == "raw":
                # This is for tags from the model to take priority
                # (to match BATCH_SIZE: 1 behaviour)
                new_response["meta"]["tags"] = {
                    **raw_input_tags[i],
                    **new_response["meta"]["tags"],
                }
            if payload_type == "ndarray":
                # Format new responses for each original prediction request
                new_response["data"]["ndarray"] = [response["data"]["ndarray"][i]]
                new_response["meta"]["tags"]["batch_index"] = indexes[i]
                new_response["meta"]["tags"]["batch_instance_id"] = instance_ids[i]
                responses.append(json.dumps(new_response))
            elif payload_type == "tensor":
                # Format new responses for each original prediction request
                new_response["data"]["tensor"]["shape"][0] = 1
                new_response["data"]["tensor"]["values"] = np.ndarray.tolist(
                    tensor_ndarray[i]
                )
                new_response["meta"]["tags"]["batch_index"] = indexes[i]
                new_response["meta"]["tags"]["batch_instance_id"] = instance_ids[i]
                responses.append(json.dumps(new_response))
            else:
                raise RuntimeError(
                    "Only `ndarray` and `tensor` input are currently supported "
                    "for batch size greater than 1."
                )
        except Exception as e:
            error_resp = {
                "status": {"info": "FAILURE", "reason": str(e), "status": 1},
                "meta": tags,
            }
            logger.error("Exception: %s" % e)
            responses.append(json.dumps(error_resp))

    return responses


def _send_batch_predict(
    batch_idx: int,
    batch_instance_id: str,
    input_raw: str,
    data_type: str,
    sc: SeldonClient,
    retries: int,
    batch_id: str,
) -> str:
    """
    Sends a request using the Seldon Client with batch context, including the
    unique batch ID and the enumerated batch index as metadata. This function
    also uses the unique batch instance ID as the request ID so the request
    can be traced back individually in the Seldon Request Logger context. Each
    request will be attempted for the configured number of retries, and the
    string-serialised result will be returned.

    Parameters
    ---
    batch_idx
        The enumerated index given to the batch datapoint in order of the local dataset
    batch_instance_id
        The unique ID of the batch datapoint created with the python uuid function
    input_raw
        The raw input in string format to be loaded to the respective format
    data_type
        The data type to send which can be `str`, `json` and `data`
    sc
        The instance of SeldonClient to use to send the requests to the seldon model
    retries
        The number of times to retry the request
    batch_id
        The unique identifier for the batch which is passed to all requests

    Returns
    ---
    A string serialised result of the response (or equivalent data with error info)
    """
    predict_kwargs = {}
    tags = {
        "batch_id": batch_id,
        "batch_instance_id": batch_instance_id,
        "batch_index": batch_idx,
    }
    predict_kwargs["meta"] = tags
    predict_kwargs["headers"] = {SELDON_PUID_HEADER: batch_instance_id}

    try:
        data = json.loads(input_raw)
        if data_type == "data":
            data_np = np.array(data)
            predict_kwargs["data"] = data_np
        elif data_type == "str":
            predict_kwargs["str_data"] = data
        elif data_type == "json":
            predict_kwargs["json_data"] = data
        elif data_type == "raw":
            # Make sure data contains the meta.tags keys
data["meta"] = data.get("meta", {}) data["meta"]["tags"] = data["meta"].get("tags", {}) # Update them with our data["meta"]["tags"].update(tags) predict_kwargs["raw_data"] = data logger.debug(f"calling sc.predict with {predict_kwargs}") str_output = None for i in range(retries): try: seldon_payload = sc.predict(**predict_kwargs) assert seldon_payload.success str_output = json.dumps(seldon_payload.response) break except (requests.exceptions.RequestException, AssertionError) as e: logger.error( f"Exception: {e}, retries {i+1} / {retries} for batch_index={batch_idx}" ) if i == (retries - 1): raise except Exception as e: error_resp = { "status": {"info": "FAILURE", "reason": str(e), "status": 1}, "meta": tags, } logger.error("Exception: %s" % e) str_output = json.dumps(error_resp) return str_output def _send_batch_feedback( batch_idx: int, batch_instance_id: int, input_raw: str, data_type: str, sc: SeldonClient, retries: int, batch_id: str, ) -> str: """ Send an request using the Seldon Client with feedback Parameters --- batch_idx The enumerated index given to the batch datapoint in order of local dataset batch_instance_id The unique ID of the batch datapoint created with the python uuid function input_raw The raw input in string format to be loaded to the respective format data_type The data type to send which can be str, json and data sc The instance of SeldonClient to use to send the requests to the seldon model retries The number of times to retry the request batch_id The unique identifier for the batch which is passed to all requests Returns --- A string serialised result of the response (or equivalent data with error info) """ feedback_kwargs = {} meta = { "tags": { "batch_id": batch_id, "batch_instance_id": batch_instance_id, "batch_index": batch_idx, } } # Feedback protos do not support meta - defined to include in file output only. 
    try:
        data = json.loads(input_raw)
        feedback_kwargs["raw_request"] = data

        str_output = None
        for i in range(retries):
            try:
                seldon_payload = sc.feedback(**feedback_kwargs)
                assert seldon_payload.success
                # Update tags so we can track feedback instances in the output file
                tags = seldon_payload.response.get("meta", {}).get("tags", {})
                tags.update(meta["tags"])
                if "meta" not in seldon_payload.response:
                    seldon_payload.response["meta"] = {}
                seldon_payload.response["meta"]["tags"] = tags
                str_output = json.dumps(seldon_payload.response)
                break
            except (requests.exceptions.RequestException, AssertionError) as e:
                logger.error(f"Exception: {e}, retries {i+1} / {retries}")
                if i == (retries - 1):
                    raise
    except Exception as e:
        error_resp = {
            "status": {"info": "FAILURE", "reason": str(e), "status": 1},
            "meta": meta,
        }
        logger.error("Exception: %s" % e)
        str_output = json.dumps(error_resp)

    return str_output


@click.command()
@click.option(
    "--deployment-name",
    "-d",
    envvar="SELDON_BATCH_DEPLOYMENT_NAME",
    required=True,
    help="The name of the SeldonDeployment to send the requests to",
)
@click.option(
    "--gateway-type",
    "-g",
    envvar="SELDON_BATCH_GATEWAY_TYPE",
    type=click.Choice(CHOICES_GATEWAY_TYPE),
    default="istio",
    help="The gateway type for the seldon model, which can be through the ingress provider (istio/ambassador) or directly through the service (seldon)",
)
@click.option(
    "--namespace",
    "-n",
    envvar="SELDON_BATCH_NAMESPACE",
    default="default",
    help="The Kubernetes namespace where the SeldonDeployment is deployed",
)
@click.option(
    "--host",
    "-h",
    envvar="SELDON_BATCH_HOST",
    default="istio-ingressgateway.istio-system.svc.cluster.local:80",
    help="The hostname for the seldon model to send the request to, which can be the ingress of the Seldon model or the service itself",
)
@click.option(
    "--transport",
    "-t",
    envvar="SELDON_BATCH_TRANSPORT",
    type=click.Choice(CHOICES_TRANSPORT),
    default="rest",
    help="The transport type of the SeldonDeployment model which can be REST or GRPC",
)
@click.option(
    "--data-type",
    "-a",
    envvar="SELDON_BATCH_DATA_TYPE",
    type=click.Choice(CHOICES_DATA_TYPE),
    default="data",
    help="Whether to use json, strData or Seldon Data type for the payload to send to the SeldonDeployment, which aligns with the SeldonClient format",
)
@click.option(
    "--payload-type",
    "-p",
    envvar="SELDON_BATCH_PAYLOAD_TYPE",
    type=click.Choice(CHOICES_PAYLOAD_TYPE),
    default="ndarray",
    help="The payload type expected by the SeldonDeployment, and hence the expected format for the data in the input file, which can be ndarray, tensor or tftensor",
)
@click.option(
    "--workers",
    "-w",
    envvar="SELDON_BATCH_WORKERS",
    type=int,
    default=1,
    help="The number of parallel request processor workers to run",
)
@click.option(
    "--retries",
    "-r",
    envvar="SELDON_BATCH_RETRIES",
    type=int,
    default=3,
    help="The number of retries for each request before marking an error",
)
@click.option(
    "--input-data-path",
    "-i",
    envvar="SELDON_BATCH_INPUT_DATA_PATH",
    type=click.Path(),
    default="/assets/input-data.txt",
    help="The local filestore path where the input file with the data to process is located",
)
@click.option(
    "--output-data-path",
    "-o",
    envvar="SELDON_BATCH_OUTPUT_DATA_PATH",
    type=click.Path(),
    default="/assets/output-data.txt",
    help="The local filestore path where the output file should be written with the outputs of the batch processing",
)
@click.option(
    "--method",
    "-m",
    envvar="SELDON_BATCH_METHOD",
    type=click.Choice(CHOICES_METHOD),
    default="predict",
    help="The method of the SeldonDeployment to send the request to, which can be predict or feedback",
)
@click.option(
    "--log-level",
    "-l",
    envvar="SELDON_BATCH_LOG_LEVEL",
    type=click.Choice(list(CHOICES_LOG_LEVEL)),
    default="info",
    help="The log level for the batch processor",
)
@click.option(
    "--benchmark",
    "-b",
    envvar="SELDON_BATCH_BENCHMARK",
    is_flag=True,
    help="If true the batch processor will print the elapsed time taken to run the process",
)
@click.option(
    "--batch-id",
    "-u",
    envvar="SELDON_BATCH_ID",
    default=str(uuid.uuid1()),
    type=str,
    help="Unique batch ID to identify all data points processed in this batch; if not provided, one is auto-generated",
)
@click.option(
    "--batch-size",
    "-s",
    envvar="SELDON_BATCH_SIZE",
    default=1,
    type=int,
    help="Batch size greater than 1 can be used to group multiple predictions into a single request.",
)
@click.option(
    "--batch-interval",
    "-t",
    envvar="SELDON_BATCH_MIN_INTERVAL",
    default=0,
    type=float,
    help="Minimum time interval (in seconds) between batch predictions made by every worker.",
)
@click.option(
    "--use-ssl",
    envvar="SELDON_BATCH_USE_SSL",
    default=False,
    type=bool,
    help="Whether to use https rather than http as the REST transport protocol.",
)
@click.option(
    "--call-credentials-token",
    envvar="SELDON_BATCH_CALL_CREDENTIALS_TOKEN",
    default="",
    type=str,
    help="Auth token used by the Seldon Client; if supplied and using REST, the transport protocol will be https.",
)
@click.option(
    "--ssl-verify",
    envvar="SELDON_BATCH_SSL_VERIFY",
    default=True,
    type=bool,
    help="Can be set to false to avoid SSL verification in REST.",
)
def run_cli(
    deployment_name: str,
    gateway_type: str,
    namespace: str,
    host: str,
    transport: str,
    data_type: str,
    payload_type: str,
    workers: int,
    retries: int,
    batch_size: int,
    input_data_path: str,
    output_data_path: str,
    method: str,
    log_level: str,
    benchmark: bool,
    batch_id: str,
    batch_interval: float,
    call_credentials_token: str,
    use_ssl: bool,
    ssl_verify: bool,
):
    """
    Command line interface for the Seldon Batch Processor, which can be used
    to send requests to Seldon Core models through configurable parallel
    workers. It is recommended that the respective Seldon Core model is also
    scaled to a suitable number of replicas to distribute the batch processing
    work. The processor can read data from a local filestore input file in the
    various formats supported by the SeldonClient module. It is also suggested
    to integrate the batch processor with an ETL workflow manager such as
    Kubeflow, Argo Pipelines, Airflow, etc., which allows for extra setup /
    teardown steps such as downloading the data from an object store or
    starting a Seldon Core model with replicas. See the Seldon Core examples
    folder for implementations of this batch module with Seldon Core.
    """
    start_multithreaded_batch_worker(
        deployment_name,
        gateway_type,
        namespace,
        host,
        transport,
        data_type,
        payload_type,
        workers,
        retries,
        batch_size,
        input_data_path,
        output_data_path,
        method,
        log_level,
        benchmark,
        batch_id,
        batch_interval,
        call_credentials_token,
        use_ssl,
        ssl_verify,
    )


if __name__ == "__main__":
    run_cli()
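

# Illustrative CLI invocation, running the module directly via its __main__
# guard (the values mirror the programmatic example above and are assumptions
# for a local setup):
#
#   python -m seldon_core.batch_processor \
#       --deployment-name sklearn \
#       --namespace seldon \
#       --workers 10 \
#       --data-type data \
#       --payload-type ndarray \
#       --input-data-path /assets/input-data.txt \
#       --output-data-path /assets/output-data.txt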