# Source code for seldon_core.microservice_tester

import argparse
import json
import logging
from typing import Dict, List, Tuple, Union

import numpy as np
from numpy.core.multiarray import ndarray

from seldon_core.seldon_client import SeldonClient


class SeldonTesterException(Exception):
    """
    Error raised by the microservice tester, e.g. when a contract entry
    uses a dtype or feature type the tester does not know.
    """

    def __init__(self, message):
        super().__init__(message)
def gen_continuous(
    f_range: Tuple[Union[float, str], Union[float, str]], n: int
) -> np.ndarray:
    """
    Draw random values for a continuous feature from the given range

    Either bound may be the string "inf" to mark that side as unbounded.

    Parameters
    ----------
    f_range
        A (lower, upper) range; each bound is a number or "inf"
    n
        The number of feature values to create (passed to numpy as ``size``)

    Returns
    -------
        Numpy array of samples: standard normal when both bounds are "inf",
        a lognormal offset from the finite bound when one bound is "inf",
        uniform between the bounds otherwise
    """
    lower = f_range[0]
    upper = f_range[1]
    lower_unbounded = lower == "inf"
    upper_unbounded = upper == "inf"

    if lower_unbounded and upper_unbounded:
        return np.random.normal(size=n)
    if lower_unbounded:
        # Only an upper limit: subtract positive lognormal noise from it.
        return upper - np.random.lognormal(size=n)
    if upper_unbounded:
        # Only a lower limit: add positive lognormal noise to it.
        return lower + np.random.lognormal(size=n)
    return np.random.uniform(lower, upper, size=n)
def reconciliate_cont_type(feature: np.ndarray, dtype: str) -> np.ndarray:
    """
    Convert generated continuous values to the dtype declared in the contract

    Arrays are always returned with a float dtype, even for "INT" features.

    Parameters
    ----------
    feature
        Feature values
    dtype
        The required type in the contract, "FLOAT" or "INT"

    Returns
    -------
        Numpy array. "FLOAT" returns the input unchanged; "INT" returns the
        values rounded to the nearest whole number (halves round up), stored
        as floats

    Raises
    ------
    SeldonTesterException
        If dtype is neither "FLOAT" nor "INT"
    """
    if dtype == "FLOAT":
        return feature
    if dtype == "INT":
        # floor(x + 0.5) rounds half up for every sign. Casting x + 0.5 to
        # int truncates toward zero instead, which mis-rounds negative values
        # (-1.7 -> -1, -0.7 -> 0).
        return np.floor(feature + 0.5).astype(float)
    raise SeldonTesterException(f"Unknown dtype in reconciliate_cont_type {dtype}")
def gen_categorical(values: List[str], n: List[int]) -> np.ndarray:
    """
    Sample a random categorical feature from the allowed values

    Parameters
    ----------
    values
        The list of categorical values to pick from
    n
        The shape of the array of picks to create

    Returns
    -------
        Numpy array of shape ``n`` whose entries are drawn from ``values``
    """
    # Draw random positions first, then map them onto the category labels.
    picked_positions = np.random.randint(len(values), size=n)
    labels = np.array(values)
    return labels[picked_positions]
def generate_batch(contract: Dict, n: int, field: str) -> np.ndarray:
    """
    Generate a random batch of data for one section of the contract

    Parameters
    ----------
    contract
        The Seldon contract
    n
        The number of rows in the batch
    field
        Key of the contract section to generate data for, e.g. "features"

    Returns
    -------
        Numpy array with the per-feature batches concatenated along axis 1.
        When the section mixes feature types the array has dtype object.

    Raises
    ------
    SeldonTesterException
        If a feature definition has an unknown "ftype" (or an unknown
        "dtype" for a continuous feature)
    """
    feature_batches = []
    ftypes = set()
    for feature_def in contract[field]:
        ftype = feature_def["ftype"]
        ftypes.add(ftype)
        if ftype == "continuous":
            # A missing range means unbounded on both sides; a missing shape
            # means a single column.
            f_range = feature_def.get("range", ["inf", "inf"])
            shape = [n] + feature_def.get("shape", [1])
            batch = gen_continuous(f_range, shape)
            batch = np.around(batch, decimals=3)
            batch = reconciliate_cont_type(batch, feature_def["dtype"])
        elif ftype == "categorical":
            batch = gen_categorical(feature_def["values"], [n, 1])
        else:
            raise SeldonTesterException(f"Unknown feature type {ftype}")
        feature_batches.append(batch)

    if len(ftypes) == 1:
        return np.concatenate(feature_batches, axis=1)

    # Mixed numeric/string columns: cast every batch to object and let
    # concatenate size the result. A pre-sized buffer based on
    # len(contract["features"]) is wrong when `field` is another section or
    # when a feature contributes more than one column.
    return np.concatenate(
        [batch.astype(object) for batch in feature_batches], axis=1
    )
def unfold_contract(contract: Dict) -> Dict:
    """
    Expand a contract so that every "repeat" entry is written out in full

    An entry with ``"repeat": k`` is replaced by k copies without the
    "repeat" key. Feature copies are named ``name1 .. namek``; target copies
    are named ``name:0 .. name:k-1``. Entries without "repeat" are reused
    as-is.

    Parameters
    ----------
    contract
        The Seldon Contract, with "features" and "targets" lists

    Returns
    -------
        Full contract with "targets" and "features" lists
    """

    def expand(entries, suffix_for):
        # Write out each repeated entry as individually named copies.
        expanded = []
        for entry in entries:
            repeat = entry.get("repeat")
            if repeat is None:
                expanded.append(entry)
                continue
            for idx in range(repeat):
                clone = dict(entry)
                clone["name"] = entry["name"] + suffix_for(idx)
                del clone["repeat"]
                expanded.append(clone)
        return expanded

    features = expand(contract["features"], lambda idx: str(idx + 1))
    targets = expand(contract["targets"], lambda idx: ":" + str(idx))
    return {"targets": targets, "features": features}
def get_class_names(contract: Dict) -> List[str]:
    """
    Collect the names of the contract's features

    Parameters
    ----------
    contract
        The Seldon contract

    Returns
    -------
        The "name" of every entry in the contract's "features" list, in order
    """
    return [feature["name"] for feature in contract["features"]]
def run_send_feedback(args):
    """
    Make a feedback call to microservice

    For each request a random batch is generated from the contract and sent
    to the predict method; the resulting request/response pair is then sent
    back as feedback with a reward of 1.0.

    Parameters
    ----------
    args
        Command line args
    """
    # Use a context manager so the contract file is closed even when the
    # JSON fails to parse.
    with open(args.contract, "r") as contract_file:
        contract = json.load(contract_file)
    contract = unfold_contract(contract)
    endpoint = f"{args.host}:{args.port}"
    sc = SeldonClient(microservice_endpoint=endpoint)

    # These depend only on the command line, so compute them once.
    transport = "grpc" if args.grpc else "rest"
    payload_type = "tensor" if args.tensor else "ndarray"

    for _ in range(args.n_requests):
        batch = generate_batch(contract, args.batch_size, "features")
        if args.prnt:
            print("-" * 40)
            print("SENDING NEW REQUEST:")

        response_predict = sc.microservice(
            data=batch, transport=transport, payload_type=payload_type, method="predict"
        )
        response_feedback = sc.microservice_feedback(
            prediction_request=response_predict.request,
            prediction_response=response_predict.response,
            reward=1.0,
            transport=transport,
        )
        if args.prnt:
            print(f"RECEIVED RESPONSE:\n{response_feedback}\n")
def run_method(args, method):
    """
    Call a microservice method with randomly generated batches

    Parameters
    ----------
    args
        Command line args
    method
        Name of the microservice method to call, e.g. "predict"
    """
    # Use a context manager so the contract file is closed even when the
    # JSON fails to parse.
    with open(args.contract, "r") as contract_file:
        contract = json.load(contract_file)
    contract = unfold_contract(contract)
    feature_names = [feature["name"] for feature in contract["features"]]
    endpoint = f"{args.host}:{args.port}"
    sc = SeldonClient(microservice_endpoint=endpoint)

    # These depend only on the command line, so compute them once.
    transport = "grpc" if args.grpc else "rest"
    payload_type = "tensor" if args.tensor else "ndarray"

    for _ in range(args.n_requests):
        batch: np.ndarray = generate_batch(contract, args.batch_size, "features")
        if args.prnt:
            print(f"{'-' * 40}\nSENDING NEW REQUEST:\n")
            print(batch)

        response = sc.microservice(
            data=batch,
            transport=transport,
            method=method,
            payload_type=payload_type,
            names=feature_names,
        )
        if args.prnt:
            print(f"RECEIVED RESPONSE:\n{response.response}\n")
def main():
    """
    Command line entry point of the microservice tester

    Parses the arguments, configures logging and runs the requested
    endpoint test.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "contract", type=str, help="File that contains the data contract"
    )
    parser.add_argument("host", type=str)
    parser.add_argument("port", type=int)
    parser.add_argument(
        "--endpoint",
        type=str,
        choices=["predict", "send-feedback", "transform-input"],
        default="predict",
    )
    parser.add_argument("-b", "--batch-size", type=int, default=1)
    parser.add_argument("-n", "--n-requests", type=int, default=1)
    parser.add_argument("--grpc", action="store_true")
    parser.add_argument("-t", "--tensor", action="store_true")
    parser.add_argument(
        "-p", "--prnt", action="store_true", help="Prints requests and responses"
    )
    parser.add_argument(
        "--log-level", type=str, choices=["DEBUG", "INFO", "ERROR"], default="ERROR"
    )
    args = parser.parse_args()

    log_format = (
        "%(asctime)s - %(name)s:%(funcName)s:%(lineno)s - %(levelname)s: %(message)s"
    )
    # Anything other than DEBUG or INFO falls back to ERROR.
    verbose_levels = {"DEBUG": logging.DEBUG, "INFO": logging.INFO}
    chosen_level = verbose_levels.get(args.log_level, logging.ERROR)
    logging.basicConfig(level=chosen_level, format=log_format)

    if args.endpoint == "send-feedback":
        run_send_feedback(args)
    elif args.endpoint in ("predict", "transform-input"):
        run_method(args, args.endpoint)
# Script entry point: run the CLI only when this module is executed directly.
if __name__ == "__main__":
    main()