import http.client as http_client
import json
import logging
from typing import Dict, Iterable, List, Optional, Tuple, Union
import grpc
import numpy as np
import requests
from google.protobuf import any_pb2, json_format
from seldon_core.proto import prediction_pb2, prediction_pb2_grpc
from seldon_core.utils import (
array_to_grpc_datadef,
feedback_to_json,
json_to_feedback,
json_to_seldon_message,
seldon_message_to_json,
seldon_messages_to_json,
)
# Module-level logger; SeldonClient(debug=True) raises its level to DEBUG.
logger = logging.getLogger(__name__)
class SeldonClientException(Exception):
    """
    Seldon Client Exception.

    Raised for invalid client arguments (unknown gateway, transport,
    method, etc.) and unsupported operations.
    """

    # HTTP-style status code surfaced with the exception; the client only
    # raises for bad request arguments, hence 400.
    status_code = 400

    def __init__(self, message):
        # Pass the message to the base class so str(exc) carries it; the
        # original called Exception.__init__(self) with no args, which left
        # str(exc) empty.
        super().__init__(message)
        self.message = message
class SeldonChannelCredentials:
    """
    Channel (connection-level) credentials.

    Presently just denotes an SSL connection.  For gRPC you need to provide
    *either* the ``root_certificates_file`` *or* all three file paths.  The
    ``verify`` attribute is currently only used to skip SSL verification
    over REST; for gRPC it is recommended to at least supply a
    ``root_certificates_file`` path, otherwise the connection may not work
    as expected.
    """

    def __init__(
        self,
        verify: bool = True,
        root_certificates_file: str = None,
        private_key_file: str = None,
        certificate_chain_file: str = None,
    ):
        # Plain value-object: store every argument verbatim.
        self.certificate_chain_file = certificate_chain_file
        self.private_key_file = private_key_file
        self.root_certificates_file = root_certificates_file
        self.verify = verify
class SeldonCallCredentials:
    """
    Per-call credentials.

    Currently implements the ability to provide an OAuth token, which is
    sent via the ``X-Auth-Token`` header for REST and via metadata call
    credentials for gRPC.
    """

    def __init__(self, token: str = None):
        # Token is optional; None means "no per-call credentials".
        self.token = token
class SeldonClientPrediction:
    """
    Result holder returned by Seldon Client prediction calls.

    Carries the request that was sent, the response received (proto message
    or dict depending on ``client_return_type``), a success flag and an
    optional diagnostic message.
    """

    def __init__(
        self,
        request: Optional[Union[prediction_pb2.SeldonMessage, Dict]],
        response: Optional[Union[prediction_pb2.SeldonMessage, Dict]],
        success: bool = True,
        msg: str = "",
    ):
        self.msg = msg
        self.success = success
        self.response = response
        self.request = request

    def __repr__(self):
        return (
            f"Success:{self.success} message:{self.msg}\n"
            f"Request:\n{self.request}\nResponse:\n{self.response}"
        )
class SeldonClientFeedback:
    """
    Result holder returned by Seldon Client feedback calls.

    Carries the Feedback request sent, the response received, a success
    flag and an optional diagnostic message.
    """

    def __init__(
        self,
        request: Optional[prediction_pb2.Feedback],
        response: Optional[Union[prediction_pb2.SeldonMessage, Dict]],
        success: bool = True,
        msg: str = "",
    ):
        self.msg = msg
        self.success = success
        self.response = response
        self.request = request

    def __repr__(self):
        return (
            f"Success:{self.success} message:{self.msg}\n"
            f"Request:\n{self.request}\nResponse:\n{self.response}"
        )
class SeldonClientCombine:
    """
    Result holder returned by Seldon Client aggregate calls.

    Carries the SeldonMessageList request sent, the aggregated response,
    a success flag and an optional diagnostic message.
    """

    def __init__(
        self,
        request: Optional[prediction_pb2.SeldonMessageList],
        response: Optional[prediction_pb2.SeldonMessage],
        success: bool = True,
        msg: str = "",
    ):
        self.msg = msg
        self.success = success
        self.response = response
        self.request = request

    def __repr__(self):
        return (
            f"Success:{self.success} message:{self.msg}\n"
            f"Request:\n{self.request}\nResponse:\n{self.response}"
        )
class SeldonClient:
    """
    A reference Seldon API Client.

    Constructor arguments are stored as per-call defaults; any keyword
    supplied to an individual call (predict, feedback, explain, ...)
    overrides the stored default for that call only.
    """

    def __init__(
        self,
        gateway: str = "ambassador",
        transport: str = "rest",
        namespace: str = None,
        deployment_name: str = None,
        payload_type: str = "tensor",
        gateway_endpoint: str = "localhost:8003",
        microservice_endpoint: str = "localhost:5000",
        grpc_max_send_message_length: int = 4 * 1024 * 1024,
        grpc_max_receive_message_length: int = 4 * 1024 * 1024,
        channel_credentials: SeldonChannelCredentials = None,
        call_credentials: SeldonCallCredentials = None,
        debug: bool = False,
        client_return_type: str = "dict",
        ssl: bool = None,
    ):
        """
        Parameters
        ----------
        gateway
            API Gateway - either ambassador, istio or seldon
        transport
            API transport - grpc or rest
        namespace
            k8s namespace of running deployment
        deployment_name
            name of seldon deployment
        payload_type
            type of payload - tensor, ndarray or tftensor
        gateway_endpoint
            Gateway endpoint
        microservice_endpoint
            Running microservice endpoint
        grpc_max_send_message_length
            Max grpc send message size in bytes
        grpc_max_receive_message_length
            Max grpc receive message size in bytes
        channel_credentials
            Connection-level (SSL) credentials
        call_credentials
            Per-call credentials (e.g. OAuth token)
        debug
            Enable debug logging, including HTTP wire-level tracing
        client_return_type
            the return type of all functions can be either dict or proto
        ssl
            Use SSL for the connection
        """
        if debug:
            logger.setLevel(logging.DEBUG)
            # Also trace raw HTTP request/response lines on the wire.
            http_client.HTTPConnection.debuglevel = 1
        # Snapshot every constructor argument as the default configuration
        # merged into each subsequent call by _gather_args.
        self.config = locals().copy()
        del self.config["self"]
        logger.debug("Configuration:" + str(self.config))

    def _gather_args(self, **kwargs):
        # Merge per-call overrides on top of the stored defaults.  A value
        # of None means "not supplied" and never masks a stored default.
        c2 = {**self.config}
        c2.update({k: v for k, v in kwargs.items() if v is not None})
        return c2

    def _validate_args(
        self,
        gateway: str = None,
        transport: str = None,
        method: str = None,
        data: np.ndarray = None,
        client_return_type: str = "dict",
        **kwargs,
    ):
        """
        Internal method to validate parameters.

        Parameters
        ----------
        gateway
            API gateway
        transport
            API transport
        method
            The method to call
        data
            Numpy data to send
        kwargs

        Raises
        ------
        SeldonClientException
            If any argument is outside its allowed set of values.
        """
        if gateway not in ("ambassador", "seldon", "istio"):
            raise SeldonClientException(
                "Valid values for gateway are 'ambassador', 'istio', or 'seldon'"
            )
        if transport not in ("rest", "grpc"):
            raise SeldonClientException(
                "Valid values for transport are 'rest' or 'grpc'"
            )
        if method not in (
            "predict",
            "route",
            "aggregate",
            "transform-input",
            "transform-output",
            "send-feedback",
            None,
        ):
            # Bug fix: 'send-feedback' is accepted above but was missing
            # from the original error message.
            raise SeldonClientException(
                "Valid values for method are 'predict', 'route', "
                "'transform-input', 'transform-output', 'aggregate', "
                "'send-feedback' or None"
            )
        if not (data is None or isinstance(data, np.ndarray)):
            raise SeldonClientException("Valid values for data are None or numpy array")
        if client_return_type not in ("proto", "dict"):
            raise SeldonClientException(
                "Valid values for client_return_type are proto or dict"
            )

    def predict(
        self,
        gateway: str = None,
        transport: str = None,
        deployment_name: str = None,
        payload_type: str = None,
        gateway_endpoint: str = None,
        microservice_endpoint: str = None,
        method: str = None,
        shape: Tuple = (1, 1),
        namespace: str = None,
        data: np.ndarray = None,
        bin_data: Union[bytes, bytearray] = None,
        str_data: str = None,
        json_data: Union[str, List, Dict] = None,
        custom_data: any_pb2.Any = None,
        names: Iterable[str] = None,
        gateway_prefix: str = None,
        headers: Dict = None,
        http_path: str = None,
        meta: Dict = None,
        client_return_type: str = None,
        raw_data: Dict = None,
        ssl: bool = None,
    ) -> SeldonClientPrediction:
        """
        Send a prediction request via the configured gateway and transport.

        Parameters
        ----------
        gateway
            API Gateway - either ambassador, istio or seldon
        transport
            API transport - grpc or rest
        namespace
            k8s namespace of running deployment
        deployment_name
            name of seldon deployment
        payload_type
            type of payload - tensor, ndarray or tftensor
        gateway_endpoint
            Gateway endpoint
        microservice_endpoint
            Running microservice endpoint
        data
            Numpy Array Payload to send
        bin_data
            Binary payload to send - will override data
        str_data
            String payload to send - will override data
        json_data
            JSON payload to send - will override data
        custom_data
            Custom payload to send - will override data
        names
            Column names
        gateway_prefix
            prefix path for gateway URL endpoint
        headers
            Headers to add to request
        http_path
            Custom http path for predict call to use
        meta
            Custom meta map, supplied as tags
        client_return_type
            the return type of all functions can be either dict or proto
        raw_data
            Raw payload, a dictionary representing the json request or the
            raw grpc proto
        ssl
            Use SSL for the connection

        Returns
        -------
        A SeldonClientPrediction
        """
        k = self._gather_args(
            gateway=gateway,
            transport=transport,
            deployment_name=deployment_name,
            payload_type=payload_type,
            gateway_endpoint=gateway_endpoint,
            microservice_endpoint=microservice_endpoint,
            method=method,
            shape=shape,
            namespace=namespace,
            names=names,
            data=data,
            bin_data=bin_data,
            str_data=str_data,
            json_data=json_data,
            custom_data=custom_data,
            gateway_prefix=gateway_prefix,
            headers=headers,
            http_path=http_path,
            meta=meta,
            client_return_type=client_return_type,
            raw_data=raw_data,
            ssl=ssl,
        )
        self._validate_args(**k)
        # Dispatch on gateway x transport to the matching module function.
        if k["gateway"] in ("ambassador", "istio"):
            if k["transport"] == "rest":
                return rest_predict_gateway(**k)
            elif k["transport"] == "grpc":
                return grpc_predict_gateway(**k)
            else:
                raise SeldonClientException("Unknown transport " + k["transport"])
        elif k["gateway"] == "seldon":
            if k["transport"] == "rest":
                return rest_predict_seldon(**k)
            elif k["transport"] == "grpc":
                return grpc_predict_seldon(**k)
            else:
                raise SeldonClientException("Unknown transport " + k["transport"])
        else:
            raise SeldonClientException("Unknown gateway " + k["gateway"])

    def feedback(
        self,
        prediction_request: prediction_pb2.SeldonMessage = None,
        prediction_response: prediction_pb2.SeldonMessage = None,
        prediction_truth: prediction_pb2.SeldonMessage = None,
        reward: float = 0,
        gateway: str = None,
        transport: str = None,
        deployment_name: str = None,
        payload_type: str = None,
        gateway_endpoint: str = None,
        microservice_endpoint: str = None,
        method: str = None,
        shape: Tuple = (1, 1),
        namespace: str = None,
        gateway_prefix: str = None,
        client_return_type: str = None,
        raw_request: dict = None,
        ssl: bool = None,
    ) -> SeldonClientFeedback:
        """
        Send feedback about a previous prediction.

        Parameters
        ----------
        prediction_request
            Previous prediction request
        prediction_response
            Previous prediction response
        prediction_truth
            Ground-truth value for the previous prediction
        reward
            A reward to send in feedback
        gateway
            API Gateway - either ambassador, istio or seldon
        transport
            API transport - grpc or rest
        deployment_name
            name of seldon deployment
        payload_type
            payload - tensor, ndarray or tftensor
        gateway_endpoint
            Gateway endpoint
        microservice_endpoint
            Running microservice endpoint
        method
            The microservice method to call
        shape
            The shape of the data to send
        namespace
            k8s namespace of running deployment
        gateway_prefix
            prefix path for gateway URL endpoint
        client_return_type
            the return type of all functions can be either dict or proto
        raw_request
            Raw feedback request payload given by the user
        ssl
            Use SSL for the connection

        Returns
        -------
        A SeldonClientFeedback
        """
        k = self._gather_args(
            gateway=gateway,
            transport=transport,
            deployment_name=deployment_name,
            payload_type=payload_type,
            gateway_endpoint=gateway_endpoint,
            microservice_endpoint=microservice_endpoint,
            method=method,
            shape=shape,
            namespace=namespace,
            gateway_prefix=gateway_prefix,
            client_return_type=client_return_type,
            raw_request=raw_request,
            ssl=ssl,
        )
        self._validate_args(**k)
        if k["gateway"] in ("ambassador", "istio"):
            if k["transport"] == "rest":
                return rest_feedback_gateway(
                    prediction_request,
                    prediction_response,
                    prediction_truth,
                    reward,
                    **k,
                )
            elif k["transport"] == "grpc":
                return grpc_feedback_gateway(
                    prediction_request,
                    prediction_response,
                    prediction_truth,
                    reward,
                    **k,
                )
            else:
                raise SeldonClientException("Unknown transport " + k["transport"])
        elif k["gateway"] == "seldon":
            if k["transport"] == "rest":
                return rest_feedback_seldon(
                    prediction_request,
                    prediction_response,
                    prediction_truth,
                    reward,
                    **k,
                )
            elif k["transport"] == "grpc":
                return grpc_feedback_seldon(
                    prediction_request,
                    prediction_response,
                    prediction_truth,
                    reward,
                    **k,
                )
            else:
                raise SeldonClientException("Unknown transport " + k["transport"])
        else:
            raise SeldonClientException("Unknown gateway " + k["gateway"])

    def explain(
        self,
        gateway: str = None,
        transport: str = None,
        deployment_name: str = None,
        payload_type: str = None,
        gateway_endpoint: str = None,
        shape: Tuple = (1, 1),
        namespace: str = None,
        data: np.ndarray = None,
        bin_data: Union[bytes, bytearray] = None,
        str_data: str = None,
        json_data: str = None,
        names: Iterable[str] = None,
        gateway_prefix: str = None,
        headers: Dict = None,
        http_path: str = None,
        client_return_type: str = None,
        predictor: str = None,
        ssl: bool = None,
    ) -> Dict:
        """
        Request an explanation for a prediction.  Only REST over an
        ambassador/istio gateway is supported.

        Parameters
        ----------
        gateway
            API Gateway - either ambassador, istio or seldon
        transport
            API transport - must be rest (grpc is not supported for explain)
        namespace
            k8s namespace of running deployment
        deployment_name
            name of seldon deployment
        payload_type
            type of payload - tensor, ndarray or tftensor
        gateway_endpoint
            Gateway endpoint
        data
            Numpy Array Payload to send
        bin_data
            Binary payload to send - will override data
        str_data
            String payload to send - will override data
        json_data
            JSON payload to send - will override data
        names
            Column names
        gateway_prefix
            prefix path for gateway URL endpoint
        headers
            Headers to add to request
        http_path
            Custom http path for predict call to use
        client_return_type
            the return type of all functions can be either dict or proto
        predictor
            The name of the predictor to send the explanations to
        ssl
            Use SSL for the connection

        Returns
        -------
        A Dict with the explanation
        """
        k = self._gather_args(
            gateway=gateway,
            transport=transport,
            deployment_name=deployment_name,
            payload_type=payload_type,
            gateway_endpoint=gateway_endpoint,
            shape=shape,
            namespace=namespace,
            names=names,
            data=data,
            bin_data=bin_data,
            str_data=str_data,
            json_data=json_data,
            gateway_prefix=gateway_prefix,
            headers=headers,
            http_path=http_path,
            client_return_type=client_return_type,
            predictor=predictor,
            ssl=ssl,
        )
        self._validate_args(**k)
        if k["gateway"] in ("ambassador", "istio"):
            if k["transport"] == "rest":
                return explain_predict_gateway(**k)
            elif k["transport"] == "grpc":
                raise SeldonClientException("gRPC not supported for explain")
            else:
                raise SeldonClientException("Unknown transport " + k["transport"])
        else:
            raise SeldonClientException("Unknown gateway " + k["gateway"])

    def microservice(
        self,
        gateway: str = None,
        transport: str = None,
        deployment_name: str = None,
        payload_type: str = None,
        gateway_endpoint: str = None,
        microservice_endpoint: str = None,
        method: str = None,
        shape: Tuple = (1, 1),
        namespace: str = None,
        data: np.ndarray = None,
        datas: List[np.ndarray] = None,
        ndatas: int = None,
        bin_data: Union[bytes, bytearray] = None,
        str_data: str = None,
        json_data: Union[str, List, Dict] = None,
        custom_data: any_pb2.Any = None,
        names: Iterable[str] = None,
    ) -> Union[SeldonClientPrediction, SeldonClientCombine]:
        """
        Call a running microservice directly.

        Parameters
        ----------
        gateway
            API Gateway - either ambassador, istio or seldon
        transport
            API transport - grpc or rest
        deployment_name
            name of seldon deployment
        payload_type
            payload - tensor, ndarray or tftensor
        gateway_endpoint
            Gateway endpoint
        microservice_endpoint
            Running microservice endpoint
        method
            The microservice method to call
        shape
            The shape of the data to send
        namespace
            k8s namespace of running deployment
        data
            Numpy Array Payload to send
        datas
            List of Numpy array data to send (for aggregate)
        ndatas
            Multiple numpy arrays to send for aggregation
        bin_data
            Binary payload to send - will override data
        str_data
            String payload to send - will override data
        json_data
            JSON payload to send - will override data
        custom_data
            Custom payload to send - will override data
        names
            Column names

        Returns
        -------
        A prediction result
        """
        k = self._gather_args(
            gateway=gateway,
            transport=transport,
            deployment_name=deployment_name,
            payload_type=payload_type,
            gateway_endpoint=gateway_endpoint,
            microservice_endpoint=microservice_endpoint,
            method=method,
            shape=shape,
            namespace=namespace,
            datas=datas,
            ndatas=ndatas,
            names=names,
            data=data,
            bin_data=bin_data,
            str_data=str_data,
            json_data=json_data,
            custom_data=custom_data,
        )
        self._validate_args(**k)
        # "aggregate" takes a list of messages; the other methods take one.
        if k["transport"] == "rest":
            if k["method"] in (
                "predict",
                "transform-input",
                "transform-output",
                "route",
            ):
                return microservice_api_rest_seldon_message(**k)
            elif k["method"] == "aggregate":
                return microservice_api_rest_aggregate(**k)
            else:
                raise SeldonClientException("Unknown method " + k["method"])
        elif k["transport"] == "grpc":
            if k["method"] in (
                "predict",
                "transform-input",
                "transform-output",
                "route",
            ):
                return microservice_api_grpc_seldon_message(**k)
            elif k["method"] == "aggregate":
                return microservice_api_grpc_aggregate(**k)
            else:
                raise SeldonClientException("Unknown method " + k["method"])
        else:
            raise SeldonClientException("Unknown transport " + k["transport"])

    def microservice_feedback(
        self,
        prediction_request: prediction_pb2.SeldonMessage = None,
        prediction_response: prediction_pb2.SeldonMessage = None,
        reward: float = 0,
        gateway: str = None,
        transport: str = None,
        deployment_name: str = None,
        payload_type: str = None,
        gateway_endpoint: str = None,
        microservice_endpoint: str = None,
        method: str = None,
        shape: Tuple = (1, 1),
        namespace: str = None,
    ) -> SeldonClientFeedback:
        """
        Send feedback directly to a running microservice.

        Parameters
        ----------
        prediction_request
            Previous prediction request
        prediction_response
            Previous prediction response
        reward
            A reward to send in feedback
        gateway
            API Gateway - either Gateway or seldon
        transport
            API transport - grpc or rest
        deployment_name
            name of seldon deployment
        payload_type
            payload - tensor, ndarray or tftensor
        gateway_endpoint
            Gateway endpoint
        microservice_endpoint
            Running microservice endpoint
        method
            The microservice method to call
        shape
            The shape of the data to send
        namespace
            k8s namespace of running deployment

        Returns
        -------
        A client response
        """
        k = self._gather_args(
            gateway=gateway,
            transport=transport,
            deployment_name=deployment_name,
            payload_type=payload_type,
            gateway_endpoint=gateway_endpoint,
            microservice_endpoint=microservice_endpoint,
            method=method,
            shape=shape,
            namespace=namespace,
        )
        self._validate_args(**k)
        if k["transport"] == "rest":
            return microservice_api_rest_feedback(
                prediction_request, prediction_response, reward, **k
            )
        else:
            return microservice_api_grpc_feedback(
                prediction_request, prediction_response, reward, **k
            )
def microservice_api_rest_seldon_message(
    method: str = "predict",
    microservice_endpoint: str = "localhost:5000",
    shape: Tuple = (1, 1),
    data: object = None,
    payload_type: str = "tensor",
    bin_data: Union[bytes, bytearray] = None,
    str_data: str = None,
    json_data: Union[str, List, Dict] = None,
    names: Iterable[str] = None,
    **kwargs,
) -> SeldonClientPrediction:
    """
    Call a Seldon microservice over its REST API.

    Parameters
    ----------
    method
        The microservice method to call
    microservice_endpoint
        Running microservice endpoint (host:port)
    shape
        Shape of the random payload generated when no data is supplied
    data
        Numpy array data to send
    payload_type
        payload - tensor, ndarray or tftensor
    bin_data
        Binary data payload - takes precedence over data
    str_data
        String data payload - takes precedence over data
    json_data
        JSON data payload - takes precedence over data
    names
        Column names
    kwargs

    Returns
    -------
    A SeldonClientPrediction data response
    """
    # Explicit payloads win over numpy data; with nothing supplied a random
    # array of the requested shape is generated.
    if bin_data is not None:
        request = prediction_pb2.SeldonMessage(binData=bin_data)
    elif str_data is not None:
        request = prediction_pb2.SeldonMessage(strData=str_data)
    elif json_data is not None:
        request = json_to_seldon_message({"jsonData": json_data})
    else:
        array = data if data is not None else np.random.rand(*shape)
        datadef = array_to_grpc_datadef(payload_type, array, names=names)
        request = prediction_pb2.SeldonMessage(data=datadef)
    payload = seldon_message_to_json(request)
    raw = requests.post(
        f"http://{microservice_endpoint}/{method}",
        data={"json": json.dumps(payload)},
    )
    ok = raw.status_code == 200
    msg = "" if ok else raw.reason
    try:
        parsed = json_to_seldon_message(raw.json())
        return SeldonClientPrediction(request, parsed, ok, msg)
    except Exception as e:
        return SeldonClientPrediction(request, None, ok, str(e))
def microservice_api_rest_aggregate(
    microservice_endpoint: str = "localhost:5000",
    shape: Tuple = (1, 1),
    datas: List[np.ndarray] = None,
    ndatas: int = None,
    payload_type: str = "tensor",
    names: Iterable[str] = None,
    **kwargs,
) -> SeldonClientCombine:
    """
    Call a Seldon microservice REST aggregate endpoint.

    Parameters
    ----------
    microservice_endpoint
        Running microservice endpoint
    shape
        The shape of the data to send
    datas
        List of Numpy array data to send
    ndatas
        Number of random arrays to generate when datas is not supplied
    payload_type
        payload - tensor, ndarray or tftensor
    names
        Column names
    kwargs

    Returns
    -------
    A SeldonClientCombine
    """
    if datas is None:
        # Generate ndatas random arrays when no explicit payloads are given.
        datas = [np.random.rand(*shape) for _ in range(ndatas)]
    msgs = []
    for item in datas:
        # Each element may be binary, string or array data.
        if isinstance(item, (bytes, bytearray)):
            msgs.append(prediction_pb2.SeldonMessage(binData=item))
        elif isinstance(item, str):
            msgs.append(prediction_pb2.SeldonMessage(strData=item))
        else:
            datadef = array_to_grpc_datadef(payload_type, item, names)
            msgs.append(prediction_pb2.SeldonMessage(data=datadef))
    request = prediction_pb2.SeldonMessageList(seldonMessages=msgs)
    payload = seldon_messages_to_json(request)
    raw = requests.post(
        f"http://{microservice_endpoint}/aggregate",
        data={"json": json.dumps(payload)},
    )
    ok = raw.status_code == 200
    msg = "" if ok else raw.reason
    try:
        parsed = json_to_seldon_message(raw.json())
        return SeldonClientCombine(request, parsed, ok, msg)
    except Exception as e:
        return SeldonClientCombine(request, None, ok, str(e))
def microservice_api_rest_feedback(
    prediction_request: prediction_pb2.SeldonMessage = None,
    prediction_response: prediction_pb2.SeldonMessage = None,
    reward: float = 0,
    microservice_endpoint: str = None,
    **kwargs,
) -> SeldonClientFeedback:
    """
    Call a Seldon microservice REST API to send feedback.

    Parameters
    ----------
    prediction_request
        Previous prediction request
    prediction_response
        Previous prediction response
    reward
        A reward to send in feedback
    microservice_endpoint
        Running microservice endpoint
    kwargs

    Returns
    -------
    A SeldonClientFeedback
    """
    request = prediction_pb2.Feedback(
        request=prediction_request, response=prediction_response, reward=reward
    )
    raw = requests.post(
        f"http://{microservice_endpoint}/send-feedback",
        data={"json": json.dumps(feedback_to_json(request))},
    )
    ok = raw.status_code == 200
    msg = "" if ok else raw.reason
    try:
        parsed = json_to_seldon_message(raw.json())
        return SeldonClientFeedback(request, parsed, ok, msg)
    except Exception as e:
        return SeldonClientFeedback(request, None, ok, str(e))
def microservice_api_grpc_seldon_message(
    method: str = "predict",
    microservice_endpoint: str = "localhost:5000",
    shape: Tuple = (1, 1),
    data: object = None,
    payload_type: str = "tensor",
    bin_data: Union[bytes, bytearray] = None,
    str_data: str = None,
    json_data: Union[str, List, Dict] = None,
    custom_data: any_pb2.Any = None,
    grpc_max_send_message_length: int = 4 * 1024 * 1024,
    grpc_max_receive_message_length: int = 4 * 1024 * 1024,
    names: Iterable[str] = None,
    **kwargs,
) -> SeldonClientPrediction:
    """
    Call a Seldon microservice over its gRPC API.

    Parameters
    ----------
    method
        Method to call
    microservice_endpoint
        Running microservice endpoint
    shape
        The shape of the random data generated when no payload is supplied
    data
        Numpy array data to send
    payload_type
        payload - tensor, ndarray or tftensor
    bin_data
        Binary data to send - takes precedence over data
    str_data
        String data to send - takes precedence over data
    json_data
        JSON data to send - takes precedence over data
    custom_data
        Custom protobuf Any data to send - takes precedence over data
    grpc_max_send_message_length
        Max grpc send message size in bytes
    grpc_max_receive_message_length
        Max grpc receive message size in bytes
    names
        column names
    kwargs

    Returns
    -------
    SeldonClientPrediction
    """
    # Explicit payloads win over numpy data.
    if bin_data is not None:
        request = prediction_pb2.SeldonMessage(binData=bin_data)
    elif str_data is not None:
        request = prediction_pb2.SeldonMessage(strData=str_data)
    elif json_data is not None:
        request = json_to_seldon_message({"jsonData": json_data})
    elif custom_data is not None:
        request = prediction_pb2.SeldonMessage(customData=custom_data)
    else:
        array = data if data is not None else np.random.rand(*shape)
        datadef = array_to_grpc_datadef(payload_type, array, names=names)
        request = prediction_pb2.SeldonMessage(data=datadef)
    channel = grpc.insecure_channel(
        microservice_endpoint,
        options=[
            ("grpc.max_send_message_length", grpc_max_send_message_length),
            ("grpc.max_receive_message_length", grpc_max_receive_message_length),
        ],
    )
    try:
        # "predict" is served by the Model service; the other methods go
        # through the Generic service stub.
        if method == "predict":
            response = prediction_pb2_grpc.ModelStub(channel).Predict(request=request)
        elif method == "transform-input":
            response = prediction_pb2_grpc.GenericStub(channel).TransformInput(
                request=request
            )
        elif method == "transform-output":
            response = prediction_pb2_grpc.GenericStub(channel).TransformOutput(
                request=request
            )
        elif method == "route":
            response = prediction_pb2_grpc.GenericStub(channel).Route(request=request)
        else:
            raise SeldonClientException("Unknown method:" + method)
        channel.close()
        return SeldonClientPrediction(request, response, True, "")
    except Exception as e:
        channel.close()
        return SeldonClientPrediction(request, None, False, str(e))
def microservice_api_grpc_aggregate(
    microservice_endpoint: str = "localhost:5000",
    shape: Tuple = (1, 1),
    datas: List[np.ndarray] = None,
    ndatas: int = None,
    payload_type: str = "tensor",
    grpc_max_send_message_length: int = 4 * 1024 * 1024,
    grpc_max_receive_message_length: int = 4 * 1024 * 1024,
    names: Iterable[str] = None,
    **kwargs,
) -> SeldonClientCombine:
    """
    Call a Seldon microservice gRPC API aggregate endpoint.

    Parameters
    ----------
    microservice_endpoint
        Microservice API endpoint
    shape
        Shape of the data to send
    datas
        List of Numpy array data to send
    ndatas
        Number of random arrays to generate when datas is not supplied
    payload_type
        payload - tensor, ndarray or tftensor
    grpc_max_send_message_length
        Max grpc send message size in bytes
    grpc_max_receive_message_length
        Max grpc receive message size in bytes
    names
        Column names
    kwargs

    Returns
    -------
    SeldonClientCombine
    """
    if datas is None:
        datas = [np.random.rand(*shape) for _ in range(ndatas)]
    msgs = []
    for data in datas:
        # Each element may be binary, string, protobuf Any, or array data.
        if isinstance(data, (bytes, bytearray)):
            msgs.append(prediction_pb2.SeldonMessage(binData=data))
        elif isinstance(data, str):
            msgs.append(prediction_pb2.SeldonMessage(strData=data))
        elif isinstance(data, any_pb2.Any):
            msgs.append(prediction_pb2.SeldonMessage(customData=data))
        else:
            datadef = array_to_grpc_datadef(payload_type, data, names=names)
            msgs.append(prediction_pb2.SeldonMessage(data=datadef))
    request = prediction_pb2.SeldonMessageList(seldonMessages=msgs)
    channel = None
    try:
        channel = grpc.insecure_channel(
            microservice_endpoint,
            options=[
                ("grpc.max_send_message_length", grpc_max_send_message_length),
                ("grpc.max_receive_message_length", grpc_max_receive_message_length),
            ],
        )
        stub = prediction_pb2_grpc.GenericStub(channel)
        response = stub.Aggregate(request=request)
        return SeldonClientCombine(request, response, True, "")
    except Exception as e:
        return SeldonClientCombine(request, None, False, str(e))
    finally:
        # Bug fix: the original only closed the channel on success, leaking
        # it whenever the RPC raised.
        if channel is not None:
            channel.close()
def microservice_api_grpc_feedback(
    prediction_request: prediction_pb2.SeldonMessage = None,
    prediction_response: prediction_pb2.SeldonMessage = None,
    reward: float = 0,
    microservice_endpoint: str = None,
    grpc_max_send_message_length: int = 4 * 1024 * 1024,
    grpc_max_receive_message_length: int = 4 * 1024 * 1024,
    **kwargs,
) -> SeldonClientFeedback:
    """
    Send feedback to a Seldon microservice over gRPC.

    Parameters
    ----------
    prediction_request
        Previous prediction request
    prediction_response
        Previous prediction response
    reward
        A reward to send in feedback
    microservice_endpoint
        Running microservice endpoint
    grpc_max_send_message_length
        Max grpc send message size in bytes
    grpc_max_receive_message_length
        Max grpc receive message size in bytes
    kwargs

    Returns
    -------
    A SeldonClientFeedback
    """
    request = prediction_pb2.Feedback(
        request=prediction_request, response=prediction_response, reward=reward
    )
    channel = None
    try:
        channel = grpc.insecure_channel(
            microservice_endpoint,
            options=[
                ("grpc.max_send_message_length", grpc_max_send_message_length),
                ("grpc.max_receive_message_length", grpc_max_receive_message_length),
            ],
        )
        stub = prediction_pb2_grpc.GenericStub(channel)
        response = stub.SendFeedback(request=request)
        return SeldonClientFeedback(request, response, True, "")
    except Exception as e:
        return SeldonClientFeedback(request, None, False, str(e))
    finally:
        # Bug fix: the original only closed the channel on success, leaking
        # it whenever the RPC raised.
        if channel is not None:
            channel.close()
#
# External API
#
def rest_predict_seldon(
    namespace: str = None,
    gateway_endpoint: str = "localhost:8002",
    shape: Tuple = (1, 1),
    data: object = None,
    payload_type: str = "tensor",
    bin_data: Union[bytes, bytearray] = None,
    str_data: str = None,
    json_data: Union[str, List, Dict] = None,
    names: Iterable[str] = None,
    client_return_type: str = "proto",
    raw_data: Dict = None,
    meta: Dict = None,
    **kwargs,
) -> SeldonClientPrediction:
    """
    Call the Seldon API Gateway using REST.

    Parameters
    ----------
    namespace
        k8s namespace of running deployment
    gateway_endpoint
        Gateway endpoint (host:port)
    shape
        Shape of the random payload generated when no data is supplied
    data
        Data to send
    payload_type
        payload - tensor, ndarray or tftensor
    bin_data
        Binary data to send - takes precedence over data
    str_data
        String data to send - takes precedence over data
    json_data
        JSON data to send - takes precedence over data
    names
        column names
    client_return_type
        the return type of all functions can be either dict or proto
    raw_data
        Raw payload (dictionary) given by the user
    meta
        Custom meta data map, supplied as tags.  Defaults to an empty map.
        (Bug fix: was a mutable default argument ``{}`` shared across calls.)
    kwargs

    Returns
    -------
    Seldon Client Prediction
    """
    metaKV = prediction_pb2.Meta()
    json_format.ParseDict({"tags": meta or {}}, metaKV)
    if raw_data:
        request = json_to_seldon_message(raw_data)
        payload = raw_data
    else:
        if bin_data is not None:
            request = prediction_pb2.SeldonMessage(binData=bin_data, meta=metaKV)
        elif str_data is not None:
            request = prediction_pb2.SeldonMessage(strData=str_data, meta=metaKV)
        elif json_data is not None:
            # NOTE(review): meta tags are not attached on the jsonData path;
            # presumably intentional upstream - confirm before changing.
            request = json_to_seldon_message({"jsonData": json_data})
        else:
            if data is None:
                data = np.random.rand(*shape)
            datadef = array_to_grpc_datadef(payload_type, data, names=names)
            request = prediction_pb2.SeldonMessage(data=datadef, meta=metaKV)
        payload = seldon_message_to_json(request)
    response_raw = requests.post(
        "http://" + gateway_endpoint + "/api/v1.0/predictions", json=payload
    )
    success = response_raw.status_code == 200
    msg = (
        "" if success else str(response_raw.status_code) + ":" + response_raw.reason
    )
    try:
        if client_return_type not in ("proto", "dict"):
            # Bug fix: the original constructed this exception without
            # raising it, silently returning response=None instead.
            raise SeldonClientException("Invalid client_return_type")
        response = None
        if len(response_raw.text) > 0:
            try:
                if client_return_type == "proto":
                    response = json_to_seldon_message(response_raw.json())
                else:
                    response = response_raw.json()
            except Exception:
                # Body was not parseable; keep the HTTP status outcome and
                # return with response=None (matches original behaviour of
                # the bare except).
                response = None
        return SeldonClientPrediction(request, response, success, msg)
    except Exception as e:
        return SeldonClientPrediction(request, None, False, str(e))
def grpc_predict_seldon(
    namespace: str = None,
    gateway_endpoint: str = "localhost:8004",
    shape: Tuple[int, int] = (1, 1),
    data: np.ndarray = None,
    payload_type: str = "tensor",
    bin_data: Union[bytes, bytearray] = None,
    str_data: str = None,
    json_data: Union[str, List, Dict] = None,
    custom_data: any_pb2.Any = None,
    grpc_max_send_message_length: int = 4 * 1024 * 1024,
    grpc_max_receive_message_length: int = 4 * 1024 * 1024,
    names: Iterable[str] = None,
    client_return_type: str = "proto",
    raw_data: Dict = None,
    **kwargs,
) -> SeldonClientPrediction:
    """
    Call the Seldon gRPC API Gateway endpoint.

    Parameters
    ----------
    namespace
        k8s namespace of running deployment
    gateway_endpoint
        Gateway endpoint (host:port)
    shape
        Shape of the random payload generated when no data is supplied
    data
        Data to send
    payload_type
        payload - tensor, ndarray or tftensor
    bin_data
        Binary data to send - takes precedence over data
    str_data
        String data to send - takes precedence over data
    json_data
        JSON data to send - takes precedence over data
    custom_data
        Custom protobuf Any data to send - takes precedence over data
    grpc_max_send_message_length
        Max grpc send message size in bytes
    grpc_max_receive_message_length
        Max grpc receive message size in bytes
    names
        Column names
    client_return_type
        the return type of all functions can be either dict or proto
    raw_data
        Raw payload (dictionary or proto) given by the user
    kwargs

    Returns
    -------
    A SeldonClientPrediction
    """
    # A ready-made proto is used as-is; a raw dict is converted; otherwise
    # build the message from the first explicit payload, falling back to
    # (possibly random) numpy data.
    if isinstance(raw_data, prediction_pb2.SeldonMessage):
        request = raw_data
    elif raw_data:
        request = json_to_seldon_message(raw_data)
    elif bin_data is not None:
        request = prediction_pb2.SeldonMessage(binData=bin_data)
    elif str_data is not None:
        request = prediction_pb2.SeldonMessage(strData=str_data)
    elif json_data is not None:
        request = json_to_seldon_message({"jsonData": json_data})
    elif custom_data is not None:
        request = prediction_pb2.SeldonMessage(customData=custom_data)
    else:
        array = data if data is not None else np.random.rand(*shape)
        datadef = array_to_grpc_datadef(payload_type, array, names=names)
        request = prediction_pb2.SeldonMessage(data=datadef)
    channel = grpc.insecure_channel(
        gateway_endpoint,
        options=[
            ("grpc.max_send_message_length", grpc_max_send_message_length),
            ("grpc.max_receive_message_length", grpc_max_receive_message_length),
        ],
    )
    stub = prediction_pb2_grpc.SeldonStub(channel)
    try:
        response = stub.Predict(request=request)
        channel.close()
        if client_return_type == "dict":
            request = seldon_message_to_json(request)
            response = seldon_message_to_json(response)
        elif client_return_type != "proto":
            raise SeldonClientException("Invalid client_return_type")
        return SeldonClientPrediction(request, response, True, "")
    except Exception as e:
        channel.close()
        return SeldonClientPrediction(request, None, False, str(e))
def rest_predict_gateway(
    deployment_name: str,
    namespace: str = None,
    gateway_endpoint: str = "localhost:8003",
    shape: Tuple[int, int] = (1, 1),
    data: np.ndarray = None,
    headers: Dict = None,
    gateway_prefix: str = None,
    payload_type: str = "tensor",
    bin_data: Union[bytes, bytearray] = None,
    str_data: str = None,
    json_data: Union[str, Dict, List] = None,
    names: Iterable[str] = None,
    call_credentials: SeldonCallCredentials = None,
    channel_credentials: SeldonChannelCredentials = None,
    http_path: str = None,
    meta: Dict = {},
    client_return_type: str = "proto",
    raw_data: Dict = None,
    ssl: bool = None,
    **kwargs,
) -> SeldonClientPrediction:
    """
    REST request to Gateway Ingress

    Parameters
    ----------
    deployment_name
        The name of the Seldon Deployment
    namespace
        k8s namespace of running deployment
    gateway_endpoint
        The host:port of gateway
    shape
        The shape of the data to send
    data
        The numpy data to send
    headers
        Headers to add to request
    gateway_prefix
        The prefix path to add to the request
    payload_type
        payload - tensor, ndarray or tftensor
    bin_data
        Binary data to send
    str_data
        String data to send
    json_data
        JSON data to send as str, dict or list
    names
        Column names
    call_credentials
        Call credentials - see SeldonCallCredentials
    channel_credentials
        Channel credentials - see SeldonChannelCredentials
    http_path
        Custom http path
    meta
        Custom meta map, sent as tags on the message meta field
    client_return_type
        the return type of all functions can be either dict or proto
    raw_data
        Raw payload (dictionary) given by the user
    ssl
        Set to False to force plain HTTP even when channel credentials exist

    Returns
    -------
    A SeldonClientPrediction wrapping the request and response
    """
    # Custom meta tags travel on the SeldonMessage "meta" field.
    metaKV = prediction_pb2.Meta()
    metaJson = {"tags": meta}
    json_format.ParseDict(metaJson, metaKV)
    if raw_data is not None:
        request = json_to_seldon_message(raw_data)
        payload = raw_data
    else:
        if bin_data is not None:
            request = prediction_pb2.SeldonMessage(binData=bin_data, meta=metaKV)
        elif str_data is not None:
            request = prediction_pb2.SeldonMessage(strData=str_data, meta=metaKV)
        elif json_data is not None:
            # NOTE(review): this branch does not attach metaKV, unlike the
            # sibling branches -- confirm whether jsonData requests should
            # carry the meta tags too.
            request = json_to_seldon_message({"jsonData": json_data})
        else:
            if data is None:
                data = np.random.rand(*shape)
            datadef = array_to_grpc_datadef(payload_type, data, names=names)
            request = prediction_pb2.SeldonMessage(data=datadef, meta=metaKV)
        payload = seldon_message_to_json(request)
    req_headers = headers.copy() if headers is not None else {}
    # TLS is a property of the channel, so the scheme must depend on
    # channel_credentials.  (This previously tested call_credentials, which
    # was inconsistent with explain_predict_gateway and the gRPC path.)
    if channel_credentials is None or ssl is False:
        scheme = "http"
    else:
        scheme = "https"
    if call_credentials is not None and call_credentials.token is not None:
        req_headers["X-Auth-Token"] = call_credentials.token
    if http_path is not None:
        url = (
            scheme
            + "://"
            + gateway_endpoint
            + "/seldon/"
            + namespace
            + "/"
            + deployment_name
            + http_path
        )
    elif gateway_prefix is not None:
        url = (
            scheme
            + "://"
            + gateway_endpoint
            + gateway_prefix
            + "/api/v1.0/predictions"
        )
    elif namespace is None:
        url = (
            scheme
            + "://"
            + gateway_endpoint
            + "/seldon/"
            + deployment_name
            + "/api/v1.0/predictions"
        )
    else:
        url = (
            scheme
            + "://"
            + gateway_endpoint
            + "/seldon/"
            + namespace
            + "/"
            + deployment_name
            + "/api/v1.0/predictions"
        )
    verify = True
    cert = None
    if channel_credentials is not None:
        # A certificate chain file doubles as the CA bundle for verification.
        if channel_credentials.certificate_chain_file is not None:
            verify = channel_credentials.certificate_chain_file
        else:
            verify = channel_credentials.verify
        if channel_credentials.private_key_file is not None:
            # NOTE(review): requests expects (client_cert, client_key) here;
            # the root certificates file is passed as the cert -- confirm.
            cert = (
                channel_credentials.root_certificates_file,
                channel_credentials.private_key_file,
            )
    logger.debug("URL is " + url)
    response_raw = requests.post(
        url, json=payload, headers=req_headers, verify=verify, cert=cert
    )
    if response_raw.status_code == 200:
        success = True
        msg = ""
    else:
        success = False
        msg = str(response_raw.status_code) + ":" + response_raw.reason
    try:
        response = None
        if len(response_raw.text) > 0:
            try:
                logger.debug("Raw response: %s", response_raw.text)
                if client_return_type == "proto":
                    response = json_to_seldon_message(response_raw.json())
                elif client_return_type == "dict":
                    response = response_raw.json()
                else:
                    raise SeldonClientException("Invalid client_return_type")
            # Narrowed from a bare ``except:`` -- body parsing is best-effort
            # but must not swallow KeyboardInterrupt/SystemExit.
            except Exception:
                response = None
        return SeldonClientPrediction(request, response, success, msg)
    except Exception as e:
        return SeldonClientPrediction(request, None, False, str(e))
def explain_predict_gateway(
    deployment_name: str,
    namespace: str = None,
    gateway_endpoint: str = "localhost:8003",
    gateway: str = None,
    transport: str = "rest",
    shape: Tuple[int, int] = (1, 1),
    data: np.ndarray = None,
    headers: Dict = None,
    gateway_prefix: str = None,
    payload_type: str = "tensor",
    bin_data: Union[bytes, bytearray] = None,
    str_data: str = None,
    json_data: Union[str, List, Dict] = None,
    names: Iterable[str] = None,
    call_credentials: SeldonCallCredentials = None,
    channel_credentials: SeldonChannelCredentials = None,
    http_path: str = None,
    client_return_type: str = "dict",
    predictor: str = None,
    ssl: bool = None,
    **kwargs,
) -> SeldonClientPrediction:
    """
    REST explain request to Gateway Ingress.

    Parameters
    ----------
    deployment_name
        The name of the Seldon Deployment
    namespace
        k8s namespace of running deployment
    gateway_endpoint
        The host:port of gateway
    gateway
        The type of gateway which can be seldon or ambassador/istio
    transport
        The type of transport, in this case only rest is supported
    shape
        The shape of the data to send
    data
        The numpy data to send
    headers
        Headers to add to request
    gateway_prefix
        The prefix path to add to the request
    payload_type
        payload - tensor, ndarray or tftensor
    bin_data
        Binary data to send
    str_data
        String data to send
    json_data
        JSON data to send
    names
        Column names
    call_credentials
        Call credentials - see SeldonCallCredentials
    channel_credentials
        Channel credentials - see SeldonChannelCredentials
    http_path
        Custom http path
    client_return_type
        the return type of all functions; only dict is supported here
    predictor
        The predictor to target when routing through the explainer
    ssl
        Set to False to force plain HTTP even when channel credentials exist

    Returns
    -------
    A SeldonClientPrediction wrapping the explanation response
    """
    if transport != "rest":
        raise SeldonClientException("Only supported transport is REST for explanations")
    # Assemble the request message from whichever payload form was supplied.
    if bin_data is not None:
        request = prediction_pb2.SeldonMessage(binData=bin_data)
    elif str_data is not None:
        request = prediction_pb2.SeldonMessage(strData=str_data)
    elif json_data is not None:
        request = json_to_seldon_message({"jsonData": json_data})
    else:
        array = np.random.rand(*shape) if data is None else data
        datadef = array_to_grpc_datadef(payload_type, array, names=names)
        request = prediction_pb2.SeldonMessage(data=datadef)
    payload = seldon_message_to_json(request)
    req_headers = {} if headers is None else headers.copy()
    scheme = "http" if channel_credentials is None or ssl is False else "https"
    if call_credentials is not None and call_credentials.token is not None:
        req_headers["X-Auth-Token"] = call_credentials.token
    # Resolve the explainer URL: explicit path > seldon gateway > ingress.
    if http_path is not None:
        url = (
            f"{scheme}://{gateway_endpoint}/seldon/{namespace}/"
            f"{deployment_name}-explainer/{predictor}{http_path}"
        )
    elif gateway == "seldon":
        url = f"{scheme}://{gateway_endpoint}/api/v1.0/explain"
    else:
        if not predictor:
            raise SeldonClientException(
                "Predictor parameter must be provided to talk through explainer via gateway"
            )
        if gateway_prefix is not None:
            url = f"{scheme}://{gateway_endpoint}{gateway_prefix}/api/v1.0/explain"
        elif namespace is None:
            url = (
                f"{scheme}://{gateway_endpoint}/seldon/"
                f"{deployment_name}-explainer/{predictor}/api/v1.0/explain"
            )
        else:
            url = (
                f"{scheme}://{gateway_endpoint}/seldon/{namespace}/"
                f"{deployment_name}-explainer/{predictor}/api/v1.0/explain"
            )
    verify = True
    cert = None
    if channel_credentials is not None:
        if channel_credentials.certificate_chain_file is not None:
            verify = channel_credentials.certificate_chain_file
        else:
            verify = channel_credentials.verify
        if channel_credentials.private_key_file is not None:
            cert = (
                channel_credentials.root_certificates_file,
                channel_credentials.private_key_file,
            )
    logger.debug("URL is " + url)
    response_raw = requests.post(
        url, json=payload, headers=req_headers, verify=verify, cert=cert
    )
    if response_raw.status_code != 200:
        return SeldonClientPrediction(
            payload,
            response_raw,
            False,
            f"Unsuccessful request with status code: {response_raw.status_code}",
        )
    if client_return_type != "dict":
        raise SeldonClientException("Invalid client_return_type")
    return SeldonClientPrediction(payload, response_raw.json(), True, "")
def grpc_predict_gateway(
    deployment_name: str,
    namespace: str = None,
    gateway_endpoint: str = "localhost:8003",
    shape: Tuple[int, int] = (1, 1),
    data: np.ndarray = None,
    headers: Dict = None,
    payload_type: str = "tensor",
    bin_data: Union[bytes, bytearray] = None,
    str_data: str = None,
    json_data: Union[str, List, Dict] = None,
    custom_data: any_pb2.Any = None,
    grpc_max_send_message_length: int = 4 * 1024 * 1024,
    grpc_max_receive_message_length: int = 4 * 1024 * 1024,
    names: Iterable[str] = None,
    call_credentials: SeldonCallCredentials = None,
    channel_credentials: SeldonChannelCredentials = None,
    meta: Dict = {},
    client_return_type: str = "proto",
    raw_data: Dict = None,
    ssl: bool = None,
    **kwargs,
) -> SeldonClientPrediction:
    """
    gRPC request to Gateway Ingress

    Parameters
    ----------
    deployment_name
        Deployment name of Seldon Deployment
    namespace
        The namespace the Seldon Deployment is running in
    gateway_endpoint
        The endpoint for gateway
    shape
        The shape of the data
    data
        The numpy array data to send
    headers
        A Dict of key value pairs to add to gRPC HTTP Headers
    payload_type
        payload - tensor, ndarray or tftensor
    bin_data
        Binary data to send
    str_data
        String data to send
    json_data
        JSON data to send
    custom_data
        Custom data to send
    grpc_max_send_message_length
        Max grpc send message size in bytes
    grpc_max_receive_message_length
        Max grpc receive message size in bytes
    names
        Column names
    call_credentials
        Call credentials - see SeldonCallCredentials
    channel_credentials
        Channel credentials - see SeldonChannelCredentials
    meta
        Custom meta data map, supplied as tags
    client_return_type
        the return type of all functions can be either dict or proto
    raw_data
        Raw payload (dictionary or proto) given by the user
    ssl
        Set to False to use local (non-TLS) channel credentials when no
        certificate files are configured

    Returns
    -------
    A SeldonClientPrediction wrapping request and response
    """

    def _read_file(path: str) -> bytes:
        # Read a credential file, closing the handle promptly (the previous
        # implementation used open(...).read() and leaked file objects).
        with open(path, "rb") as f:
            return f.read()

    # Custom meta tags travel on the SeldonMessage "meta" field.
    metaKV = prediction_pb2.Meta()
    metaJson = {"tags": meta}
    json_format.ParseDict(metaJson, metaKV)
    if isinstance(raw_data, prediction_pb2.SeldonMessage):
        request = raw_data
    elif raw_data is not None:
        request = json_to_seldon_message(raw_data)
    else:
        if bin_data is not None:
            request = prediction_pb2.SeldonMessage(binData=bin_data, meta=metaKV)
        elif str_data is not None:
            request = prediction_pb2.SeldonMessage(strData=str_data, meta=metaKV)
        elif json_data is not None:
            request = json_to_seldon_message({"jsonData": json_data})
        elif custom_data is not None:
            request = prediction_pb2.SeldonMessage(customData=custom_data, meta=metaKV)
        else:
            if data is None:
                data = np.random.rand(*shape)
            datadef = array_to_grpc_datadef(payload_type, data, names=names)
            request = prediction_pb2.SeldonMessage(data=datadef, meta=metaKV)
    options = [
        ("grpc.max_send_message_length", grpc_max_send_message_length),
        ("grpc.max_receive_message_length", grpc_max_receive_message_length),
    ]
    if channel_credentials is None:
        channel = grpc.insecure_channel(gateway_endpoint, options)
    else:
        # If one of root cert & cert chain are provided, both must be provided
        # otherwise there is a null pointer exception in the Go underlying impl
        if (
            channel_credentials.private_key_file
            and channel_credentials.root_certificates_file
            and channel_credentials.certificate_chain_file
        ):
            grpc_channel_credentials = grpc.ssl_channel_credentials(
                root_certificates=_read_file(
                    channel_credentials.root_certificates_file
                ),
                private_key=_read_file(channel_credentials.private_key_file),
                certificate_chain=_read_file(
                    channel_credentials.certificate_chain_file
                ),
            )
        # For most usecases only providing the root cert file is enough
        elif channel_credentials.root_certificates_file:
            grpc_channel_credentials = grpc.ssl_channel_credentials(
                root_certificates=_read_file(
                    channel_credentials.root_certificates_file
                )
            )
        # This piece also allows for blank SSL Channel credentials in case this is required
        else:
            if ssl is False:
                grpc_channel_credentials = grpc.local_channel_credentials()
            else:
                grpc_channel_credentials = grpc.ssl_channel_credentials()
        if channel_credentials.verify == False:
            # If Verify is set to false then we add the SSL Target Name
            # Override option so the handshake tolerates hostname mismatch.
            options += [
                ("grpc.ssl_target_name_override", gateway_endpoint.split(":")[0])
            ]
        if call_credentials is not None:
            # Per-call auth token is forwarded via gRPC metadata call creds.
            grpc_call_credentials = grpc.metadata_call_credentials(
                lambda context, callback: callback(
                    (("x-auth-token", call_credentials.token),), None
                )
            )
            credentials = grpc.composite_channel_credentials(
                grpc_channel_credentials, grpc_call_credentials
            )
        else:
            credentials = grpc_channel_credentials
        logger.debug(f"Sending GRPC Request to endpoint: {gateway_endpoint}")
        channel = grpc.secure_channel(gateway_endpoint, credentials, options)
    stub = prediction_pb2_grpc.SeldonStub(channel)
    # Routing headers expected by the gateway/ingress.
    if namespace is None:
        metadata = [("seldon", deployment_name)]
    else:
        metadata = [("seldon", deployment_name), ("namespace", namespace)]
    if headers is not None:
        for k in headers:
            metadata.append((k, headers[k]))
    try:
        response = stub.Predict(request=request, metadata=metadata)
        channel.close()
        if client_return_type == "dict":
            request = seldon_message_to_json(request)
            response = seldon_message_to_json(response)
        elif client_return_type != "proto":
            raise SeldonClientException("Invalid client_return_type")
        return SeldonClientPrediction(request, response, True, "")
    except Exception as e:
        channel.close()
        return SeldonClientPrediction(request, None, False, str(e))
def rest_feedback_seldon(
    prediction_request: prediction_pb2.SeldonMessage = None,
    prediction_response: prediction_pb2.SeldonMessage = None,
    prediction_truth: prediction_pb2.SeldonMessage = None,
    reward: float = 0,
    namespace: str = None,
    gateway_endpoint: str = "localhost:8002",
    client_return_type: str = "proto",
    raw_request: dict = None,
    **kwargs,
) -> SeldonClientFeedback:
    """
    Send Feedback to Seldon API Gateway using REST

    Parameters
    ----------
    prediction_request
        Previous prediction request
    prediction_response
        Previous prediction response
    prediction_truth
        Optional ground truth for the previous prediction
    reward
        A reward to send in feedback
    namespace
        k8s namespace of running deployment
    gateway_endpoint
        The host:port of the Seldon API gateway
    client_return_type
        the return type of all functions can be either dict or proto
    raw_request
        Raw feedback payload (dictionary) given by the user
    kwargs

    Returns
    -------
    A SeldonClientFeedback wrapping request and response
    """
    if raw_request:
        request = json_to_feedback(raw_request)
        payload = raw_request
    else:
        request = prediction_pb2.Feedback(
            request=prediction_request,
            response=prediction_response,
            reward=reward,
            truth=prediction_truth,
        )
        payload = feedback_to_json(request)
    response_raw = requests.post(
        "http://" + gateway_endpoint + "/api/v1.0/feedback", json=payload
    )
    if response_raw.status_code == 200:
        success = True
        msg = ""
    else:
        success = False
        msg = str(response_raw.status_code) + ":" + response_raw.reason
    try:
        response = None
        if len(response_raw.text) > 0:
            try:
                if client_return_type == "proto":
                    response = json_to_seldon_message(response_raw.json())
                elif client_return_type == "dict":
                    response = response_raw.json()
                else:
                    raise SeldonClientException("Invalid client_return_type")
            # Narrowed from a bare ``except:`` -- body parsing is best-effort
            # but must not swallow KeyboardInterrupt/SystemExit.
            except Exception:
                response = None
        return SeldonClientFeedback(request, response, success, msg)
    except Exception as e:
        return SeldonClientFeedback(request, None, False, str(e))
def grpc_feedback_seldon(
    prediction_request: prediction_pb2.SeldonMessage = None,
    prediction_response: prediction_pb2.SeldonMessage = None,
    prediction_truth: prediction_pb2.SeldonMessage = None,
    reward: float = 0,
    namespace: str = None,
    gateway_endpoint: str = "localhost:8004",
    grpc_max_send_message_length: int = 4 * 1024 * 1024,
    grpc_max_receive_message_length: int = 4 * 1024 * 1024,
    client_return_type: str = "proto",
    raw_request: dict = None,
    **kwargs,
) -> SeldonClientFeedback:
    """
    Send feedback to the Seldon API gateway via gRPC.

    Parameters
    ----------
    prediction_request
        Previous prediction request
    prediction_response
        Previous prediction response
    prediction_truth
        Optional ground truth for the previous prediction
    reward
        A reward to send in feedback
    namespace
        k8s namespace of running deployment
    gateway_endpoint
        The host:port of the Seldon API gateway
    grpc_max_send_message_length
        Max grpc send message size in bytes
    grpc_max_receive_message_length
        Max grpc receive message size in bytes
    client_return_type
        the return type of all functions can be either dict or proto
    raw_request
        Raw feedback payload (proto or dictionary) given by the user
    kwargs

    Returns
    -------
    A SeldonClientFeedback wrapping request and response
    """
    # A user-supplied raw request (proto or JSON dict) takes precedence.
    if isinstance(raw_request, prediction_pb2.Feedback):
        request = raw_request
    elif raw_request:
        request = json_to_feedback(raw_request)
    else:
        request = prediction_pb2.Feedback(
            request=prediction_request,
            response=prediction_response,
            reward=reward,
            truth=prediction_truth,
        )
    channel_options = [
        ("grpc.max_send_message_length", grpc_max_send_message_length),
        ("grpc.max_receive_message_length", grpc_max_receive_message_length),
    ]
    channel = grpc.insecure_channel(gateway_endpoint, options=channel_options)
    try:
        response = prediction_pb2_grpc.SeldonStub(channel).SendFeedback(
            request=request
        )
        channel.close()
        if client_return_type == "dict":
            request = seldon_message_to_json(request)
            response = seldon_message_to_json(response)
        elif client_return_type != "proto":
            raise SeldonClientException("Invalid client_return_type")
        return SeldonClientFeedback(request, response, True, "")
    except Exception as e:
        channel.close()
        return SeldonClientFeedback(request, None, False, str(e))
def rest_feedback_gateway(
    prediction_request: prediction_pb2.SeldonMessage = None,
    prediction_response: prediction_pb2.SeldonMessage = None,
    prediction_truth: prediction_pb2.SeldonMessage = None,
    reward: float = 0,
    deployment_name: str = "",
    namespace: str = None,
    gateway_endpoint: str = "localhost:8003",
    headers: Dict = None,
    gateway_prefix: str = None,
    client_return_type: str = "proto",
    raw_request: dict = None,
    **kwargs,
) -> SeldonClientFeedback:
    """
    Send Feedback to Seldon via gateway using REST

    Parameters
    ----------
    prediction_request
        Previous prediction request
    prediction_response
        Previous prediction response
    prediction_truth
        Optional ground truth for the previous prediction
    reward
        A reward to send in feedback
    deployment_name
        The name of the running Seldon deployment
    namespace
        k8s namespace of running deployment
    gateway_endpoint
        The gateway host:port endpoint
    headers
        Headers to add to the request
    gateway_prefix
        The prefix to add to the request path for gateway
    client_return_type
        the return type of all functions can be either dict or proto
    raw_request
        Raw feedback payload (dictionary) given by the user
    kwargs

    Returns
    -------
    A SeldonClientFeedback wrapping request and response
    """
    if raw_request:
        request = json_to_feedback(raw_request)
        payload = raw_request
    else:
        request = prediction_pb2.Feedback(
            request=prediction_request,
            response=prediction_response,
            reward=reward,
            truth=prediction_truth,
        )
        payload = feedback_to_json(request)
    # Route through the gateway: explicit prefix > namespaced path > plain path.
    if gateway_prefix is None:
        if namespace is None:
            response_raw = requests.post(
                "http://"
                + gateway_endpoint
                + "/seldon/"
                + deployment_name
                + "/api/v1.0/feedback",
                json=payload,
                headers=headers,
            )
        else:
            response_raw = requests.post(
                "http://"
                + gateway_endpoint
                + "/seldon/"
                + namespace
                + "/"
                + deployment_name
                + "/api/v1.0/feedback",
                json=payload,
                headers=headers,
            )
    else:
        response_raw = requests.post(
            "http://" + gateway_endpoint + gateway_prefix + "/api/v1.0/feedback",
            json=payload,
            headers=headers,
        )
    if response_raw.status_code == 200:
        success = True
        msg = ""
    else:
        success = False
        msg = str(response_raw.status_code) + ":" + response_raw.reason
    try:
        response = None
        if len(response_raw.text) > 0:
            try:
                if client_return_type == "proto":
                    response = json_to_seldon_message(response_raw.json())
                elif client_return_type == "dict":
                    response = response_raw.json()
                else:
                    raise SeldonClientException("Invalid client_return_type")
            # Narrowed from a bare ``except:`` -- body parsing is best-effort
            # but must not swallow KeyboardInterrupt/SystemExit.
            except Exception:
                response = None
        return SeldonClientFeedback(request, response, success, msg)
    except Exception as e:
        return SeldonClientFeedback(request, None, False, str(e))
def grpc_feedback_gateway(
    prediction_request: prediction_pb2.SeldonMessage = None,
    prediction_response: prediction_pb2.SeldonMessage = None,
    prediction_truth: prediction_pb2.SeldonMessage = None,
    reward: float = 0,
    deployment_name: str = "",
    namespace: str = None,
    gateway_endpoint: str = "localhost:8003",
    headers: Dict = None,
    grpc_max_send_message_length: int = 4 * 1024 * 1024,
    grpc_max_receive_message_length: int = 4 * 1024 * 1024,
    client_return_type: str = "proto",
    raw_request: dict = None,
    **kwargs,
) -> SeldonClientFeedback:
    """
    Send feedback to a Seldon deployment through the gateway using gRPC.

    Parameters
    ----------
    prediction_request
        Previous prediction request
    prediction_response
        Previous prediction response
    prediction_truth
        Optional ground truth for the previous prediction
    reward
        A reward to send in feedback
    deployment_name
        The name of the running Seldon deployment
    namespace
        k8s namespace of running deployment
    gateway_endpoint
        The gateway host:port endpoint
    headers
        Headers to add to the request
    grpc_max_send_message_length
        Max grpc send message size in bytes
    grpc_max_receive_message_length
        Max grpc receive message size in bytes
    client_return_type
        the return type of all functions can be either dict or proto
    raw_request
        Raw feedback payload (proto or dictionary) given by the user
    kwargs

    Returns
    -------
    A SeldonClientFeedback wrapping request and response
    """
    # A user-supplied raw request (proto or JSON dict) takes precedence.
    if isinstance(raw_request, prediction_pb2.Feedback):
        request = raw_request
    elif raw_request:
        request = json_to_feedback(raw_request)
    else:
        request = prediction_pb2.Feedback(
            request=prediction_request,
            response=prediction_response,
            reward=reward,
            truth=prediction_truth,
        )
    channel = grpc.insecure_channel(
        gateway_endpoint,
        options=[
            ("grpc.max_send_message_length", grpc_max_send_message_length),
            ("grpc.max_receive_message_length", grpc_max_receive_message_length),
        ],
    )
    stub = prediction_pb2_grpc.SeldonStub(channel)
    # Routing headers expected by the gateway/ingress.
    metadata = [("seldon", deployment_name)]
    if namespace is not None:
        metadata.append(("namespace", namespace))
    if headers is not None:
        metadata.extend((k, v) for k, v in headers.items())
    try:
        response = stub.SendFeedback(request=request, metadata=metadata)
        channel.close()
        if client_return_type == "dict":
            request = seldon_message_to_json(request)
            response = seldon_message_to_json(response)
        elif client_return_type != "proto":
            raise SeldonClientException("Invalid client_return_type")
        return SeldonClientFeedback(request, response, True, "")
    except Exception as e:
        channel.close()
        return SeldonClientFeedback(request, None, False, str(e))