This page was generated from examples/kafka/cifar10/cifar10_kafka.ipynb.

Seldon Kafka Integration Example with CIFAR10 Model

In this example we will run SeldonDeployments for a CIFAR10 TensorFlow model which take their inputs from a Kafka topic and push their outputs to another Kafka topic. We will experiment with both REST and gRPC Seldon graphs. For REST we will load our input topic with TensorFlow Serving JSON requests, and for gRPC we will load TensorFlow Serving PredictRequest protobufs.
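
For reference, a TensorFlow Serving REST request for this model wraps one or more 32x32x3 images in the standard instances payload. A minimal sketch of that format, using a random array rather than a real CIFAR10 image:

[ ]:
import json

import numpy as np

# Build one TensorFlow Serving style REST request with a random 32x32x3 "image".
# The real requests in the dataset downloaded below contain actual CIFAR10 images.
image = np.random.rand(32, 32, 3).tolist()
request = {"instances": [image]}
print(json.dumps(request)[:80], "...")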

Requirements

[ ]:
!pip install -r requirements.txt

Setup Kafka

Install Strimzi on cluster

[ ]:
!helm repo add strimzi https://strimzi.io/charts/
[ ]:
!helm install my-release strimzi/strimzi-kafka-operator

Set the following depending on whether you are running a local Kind cluster or a cloud-based cluster.

[ ]:
clusterType="kind"
#clusterType="cloud"
[ ]:
if clusterType == "kind":
    !kubectl apply -f cluster-kind.yaml
else:
    !kubectl apply -f cluster-cloud.yaml

Get broker endpoint.

[ ]:
if clusterType == "kind":
    res=!kubectl get service my-cluster-kafka-external-bootstrap -n default -o=jsonpath='{.spec.ports[0].nodePort}'
    port=res[0]
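    # 172.17.0.2 is assumed to be the Kind node's Docker network IP; adjust if your node has a different address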
    %env BROKER=172.17.0.2:$port
else:
    res=!kubectl get service my-cluster-kafka-external-bootstrap -o=jsonpath='{.status.loadBalancer.ingress[0].hostname}'
    if len(res) == 1:
        hostname=res[0]
        %env BROKER=$hostname:9094
    else:
        res=!kubectl get service my-cluster-kafka-external-bootstrap -o=jsonpath='{.status.loadBalancer.ingress[0].ip}'
        ip=res[0]
        %env BROKER=$ip:9094
[ ]:
%%writefile topics.yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: cifar10-rest-input
  labels:
    strimzi.io/cluster: "my-cluster"
spec:
  partitions: 2
  replicas: 1
---
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: cifar10-rest-output
  labels:
    strimzi.io/cluster: "my-cluster"
spec:
  partitions: 2
  replicas: 1
---
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: cifar10-grpc-input
  labels:
    strimzi.io/cluster: "my-cluster"
spec:
  partitions: 2
  replicas: 1
---
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: cifar10-grpc-output
  labels:
    strimzi.io/cluster: "my-cluster"
spec:
  partitions: 2
  replicas: 1
[ ]:
!kubectl apply -f topics.yaml
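
Optionally, verify the topics were created. A minimal sketch using the confluent-kafka admin client (this assumes the confluent-kafka package is installed and that $BROKER is reachable from where the notebook runs):

[ ]:
import os

from confluent_kafka.admin import AdminClient

# List topics on the broker and confirm the four cifar10 topics exist.
admin = AdminClient({"bootstrap.servers": os.environ["BROKER"]})
topics = admin.list_topics(timeout=10).topics
print([t for t in topics if t.startswith("cifar10")])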

Install Seldon

Download Test Request Data

We have two example datasets containing 50,000 requests in TensorFlow Serving format for CIFAR10: one in JSON format and one as length-encoded protobufs.

[ ]:
!gsutil cp gs://seldon-datasets/cifar10/requests/tensorflow/cifar10_tensorflow.json.gz cifar10_tensorflow.json.gz
!gunzip cifar10_tensorflow.json.gz
!gsutil cp gs://seldon-datasets/cifar10/requests/tensorflow/cifar10_tensorflow.proto cifar10_tensorflow.proto
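
Optionally, take a quick look at the downloaded data. A minimal sketch, assuming the JSON file stores one TensorFlow Serving request per line:

[ ]:
# Peek at the first REST request in the downloaded file (assumed to be newline delimited).
with open("cifar10_tensorflow.json") as f:
    first = f.readline()
print("first request is", len(first), "characters")
print(first[:120], "...")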

Test CIFAR10 REST Model

Upload TensorFlow Serving REST requests to Kafka. This may take some time depending on your network connection.

[ ]:
!python ../../../util/kafka/test-client.py produce $BROKER cifar10-rest-input --file cifar10_tensorflow.json
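
The test client handles batching and delivery for you, but conceptually the REST case is just a produce loop over the file. A rough sketch of that idea, assuming the confluent-kafka package and one JSON request per line (the actual script may differ):

[ ]:
import os

from confluent_kafka import Producer

# Produce each JSON request as one Kafka message on the input topic.
producer = Producer({"bootstrap.servers": os.environ["BROKER"]})
with open("cifar10_tensorflow.json") as f:
    for line in f:
        producer.produce("cifar10-rest-input", value=line.strip().encode("utf-8"))
        producer.poll(0)  # serve delivery callbacks
producer.flush()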
[ ]:
res=!kubectl get service my-cluster-kafka-external-bootstrap -o=jsonpath='{.spec.clusterIP}'
ip=res[0]
%env BROKER_CIP=$ip
[ ]:
%%writefile cifar10_rest.yaml
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: tfserving-cifar10
spec:
  protocol: tensorflow
  transport: rest
  serverType: kafka
  predictors:
  - componentSpecs:
    - spec:
        containers:
        - args:
          - --port=8500
          - --rest_api_port=8501
          - --model_name=resnet32
          - --model_base_path=gs://seldon-models/tfserving/cifar10/resnet32
          - --enable_batching
          image: tensorflow/serving
          name: resnet32
          ports:
          - containerPort: 8501
            name: http
    svcOrchSpec:
      env:
      - name: KAFKA_BROKER
        value: BROKER_IP
      - name: KAFKA_INPUT_TOPIC
        value: cifar10-rest-input
      - name: KAFKA_OUTPUT_TOPIC
        value: cifar10-rest-output
    graph:
      name: resnet32
      type: MODEL
      endpoint:
        service_port: 8501
    name: model
    replicas: 1
[ ]:
!cat cifar10_rest.yaml | sed s/BROKER_IP/$BROKER_CIP:9094/ | kubectl apply -f -

Looking at the Seldon metrics dashboard you should see the throughput we are getting. For a single replica on GKE with n1-standard-4 nodes we see roughly 150 requests per second being processed.

(Dashboard screenshot: REST throughput)
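
You can also spot-check the output topic directly. A minimal sketch with a confluent-kafka consumer (assumes the confluent-kafka package is installed and $BROKER is reachable):

[ ]:
import os

from confluent_kafka import Consumer

# Read one prediction back from the REST output topic.
consumer = Consumer({
    "bootstrap.servers": os.environ["BROKER"],
    "group.id": "cifar10-rest-check",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["cifar10-rest-output"])
msg = consumer.poll(timeout=30.0)
if msg is not None and msg.error() is None:
    print(msg.value()[:200])  # a TensorFlow Serving JSON prediction
consumer.close()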

[ ]:
!kubectl delete -f cifar10_rest.yaml

Test CIFAR10 gRPC Model

Upload TensorFlow Serving gRPC requests to Kafka. This is a file of binary tensorflow.serving.PredictRequest protobufs, each prefixed by its length in bytes. Our test-client Python script reads them and sends them to our topic. This may take some time depending on your network connection.
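
A minimal sketch of reading one length-prefixed request back from the file, assuming the tensorflow-serving-api package and a 4-byte big-endian length prefix (the exact prefix encoding is the test client's concern and may differ):

[ ]:
import struct

from tensorflow_serving.apis import predict_pb2

# Read the first length-prefixed PredictRequest from the downloaded proto file.
with open("cifar10_tensorflow.proto", "rb") as f:
    (size,) = struct.unpack(">I", f.read(4))  # assumed 4-byte big-endian prefix
    request = predict_pb2.PredictRequest()
    request.ParseFromString(f.read(size))
print(request.model_spec.model_name, request.model_spec.signature_name)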

[ ]:
!python ../../../util/kafka/test-client.py produce $BROKER cifar10-grpc-input --file cifar10_tensorflow.proto --proto_name tensorflow.serving.PredictRequest
[ ]:
res=!kubectl get service my-cluster-kafka-external-bootstrap -o=jsonpath='{.spec.clusterIP}'
ip=res[0]
%env BROKER_CIP=$ip
[ ]:
%%writefile cifar10_grpc.yaml
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: tfserving-cifar10
spec:
  protocol: tensorflow
  transport: grpc
  serverType: kafka
  predictors:
  - componentSpecs:
    - spec:
        containers:
        - args:
          - --port=8500
          - --rest_api_port=8501
          - --model_name=resnet32
          - --model_base_path=gs://seldon-models/tfserving/cifar10/resnet32
          - --enable_batching
          image: tensorflow/serving
          name: resnet32
          ports:
          - containerPort: 8500
            name: grpc
    svcOrchSpec:
      env:
      - name: KAFKA_BROKER
        value: BROKER_IP
      - name: KAFKA_INPUT_TOPIC
        value: cifar10-grpc-input
      - name: KAFKA_OUTPUT_TOPIC
        value: cifar10-grpc-output
    graph:
      name: resnet32
      type: MODEL
      endpoint:
        service_port: 8500
    name: model
    replicas: 2
[ ]:
!cat cifar10_grpc.yaml | sed s/BROKER_IP/$BROKER_CIP:9094/ | kubectl apply -f -

Looking at the Seldon metrics dashboard you should see the throughput we are getting. For a single replica on GKE with n1-standard-4 nodes we see around 220 requests per second being processed.

(Dashboard screenshot: gRPC throughput)
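
As with REST, you can spot-check the output topic. A minimal sketch, assuming the output messages are serialized tensorflow.serving.PredictResponse protos and that confluent-kafka and tensorflow-serving-api are installed:

[ ]:
import os

from confluent_kafka import Consumer
from tensorflow_serving.apis import predict_pb2

# Read one binary prediction from the gRPC output topic and decode it.
# Assumption: the orchestrator writes PredictResponse protos to the output topic.
consumer = Consumer({
    "bootstrap.servers": os.environ["BROKER"],
    "group.id": "cifar10-grpc-check",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["cifar10-grpc-output"])
msg = consumer.poll(timeout=30.0)
if msg is not None and msg.error() is None:
    response = predict_pb2.PredictResponse()
    response.ParseFromString(msg.value())
    print(list(response.outputs.keys()))
consumer.close()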

[ ]:
!kubectl delete -f cifar10_grpc.yaml