This page was generated from examples/kafka/sklearn_spacy/README.ipynb.

Seldon Kafka Integration Example with SpaCy Reddit Model

In this example we build upon the code from the Seldon SpaCy NLP example for text classification.

You will learn how to deploy the model so that it consumes requests from, and publishes predictions to, Kafka topics.

Requirements

Command-line tools:
* helm 3.x
* kubectl

In the cluster:
* Install Seldon
* Install Kafka as per the instructions below

Set up Kafka

Ensure that Helm has access to the Strimzi Kafka charts, which we’ll use to install Kafka.

[23]:
!helm repo add strimzi https://strimzi.io/charts/
"strimzi" has been added to your repositories

Install the Strimzi Kafka operator in your cluster:

[30]:
!helm install my-release strimzi/strimzi-kafka-operator
NAME: my-release
LAST DEPLOYED: Tue Sep 29 08:31:41 2020
NAMESPACE: default
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Thank you for installing strimzi-kafka-operator-0.19.0

To create a Kafka cluster refer to the following documentation.

https://strimzi.io/docs/operators/0.19.0/using.html#deploying-cluster-operator-helm-chart-str

Next, we create a Kafka cluster with a minimal setup suitable for testing: a single Kafka broker and a single ZooKeeper node, both using ephemeral storage.

[38]:
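%%writefile kafka-cluster-config.yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 1
    listeners:
      plain: {}        # in-cluster plaintext listener (port 9092)
      tls: {}          # in-cluster TLS listener (port 9093)
      external:        # NodePort listener (port 9094), TLS disabled
        type: nodeport
        tls: false
    storage:
      type: ephemeral  # emptyDir volume; data does not survive pod deletion
    config:
      # A single broker cannot hold more than one replica,
      # so the internal topics use a replication factor of 1.
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}  # reconciles KafkaTopic resources
    userOperator: {}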
Overwriting kafka-cluster-config.yaml
[39]:
!kubectl apply -f kafka-cluster-config.yaml
kafka.kafka.strimzi.io/my-cluster created

We can now check that Kafka was installed correctly. The entity operator, Kafka broker, and ZooKeeper pods should all be Running:

[105]:
!kubectl get pods | grep my-cluster
my-cluster-entity-operator-df58f8b9f-s9wx5               3/3     Running   0          148m
my-cluster-kafka-0                                       2/2     Running   0          149m
my-cluster-zookeeper-0                                   1/1     Running   0          149m
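
If you want to script this check instead of reading the output by eye, a minimal Python sketch could parse the kubectl get pods text and confirm that every my-cluster pod is Running with all of its containers ready. The function name pods_ready is hypothetical, and the sketch assumes the default column layout (NAME, READY, STATUS, RESTARTS, AGE) shown above.

```python
def pods_ready(get_pods_output: str, prefix: str = "my-cluster") -> bool:
    """Return True if at least one pod name starts with `prefix` and every
    such pod is Running with all containers ready (READY column "n/n")."""
    found = False
    for line in get_pods_output.strip().splitlines():
        cols = line.split()
        # Skip the header row, short lines, and unrelated pods.
        if len(cols) < 3 or not cols[0].startswith(prefix):
            continue
        found = True
        ready, total = cols[1].split("/")
        if cols[2] != "Running" or ready != total:
            return False
    return found


# Output copied from the cell above.
sample = """\
my-cluster-entity-operator-df58f8b9f-s9wx5   3/3   Running   0   148m
my-cluster-kafka-0                           2/2   Running   0   149m
my-cluster-zookeeper-0                       1/1   Running   0   149m
"""
print(pods_ready(sample))  # → True
```

In a notebook you could feed it live output, for example subprocess.run(["kubectl", "get", "pods"], capture_output=True, text=True).stdout.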

Create topics

We now need to create the input and output topics for our Reddit classifier.

[49]:
%%writefile topics.yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: reddit-classifier-input
  labels:
    strimzi.io/cluster: "my-cluster"
spec:
  partitions: 2
  replicas: 1
---
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: reddit-classifier-output
  labels:
    strimzi.io/cluster: "my-cluster"
spec:
  partitions: 2
  replicas: 1
Writing topics.yaml
[50]:
!kubectl apply -f topics.yaml
kafkatopic.kafka.strimzi.io/reddit-classifier-input created
kafkatopic.kafka.strimzi.io/reddit-classifier-output created
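
The same two topics can also be generated programmatically, which may help if you later add more topics. The sketch below is a hypothetical alternative to topics.yaml, not part of the original example: it builds equivalent KafkaTopic resources with Python's standard json module and wraps them in a Kubernetes List, a form that kubectl apply -f accepts as JSON. The helper name kafka_topic is illustrative.

```python
import json


def kafka_topic(name: str, cluster: str = "my-cluster",
                partitions: int = 2, replicas: int = 1) -> dict:
    """Build a Strimzi KafkaTopic resource like the ones in topics.yaml."""
    return {
        "apiVersion": "kafka.strimzi.io/v1beta1",
        "kind": "KafkaTopic",
        "metadata": {
            "name": name,
            # Tells the Topic Operator which Kafka cluster owns this topic.
            "labels": {"strimzi.io/cluster": cluster},
        },
        "spec": {"partitions": partitions, "replicas": replicas},
    }


# A Kubernetes List lets a single `kubectl apply -f` create both topics.
manifest = {
    "apiVersion": "v1",
    "kind": "List",
    "items": [
        kafka_topic("reddit-classifier-input"),
        kafka_topic("reddit-classifier-output"),
    ],
}

print(json.dumps(manifest, indent=2))
```

Saving the printed JSON to a file (topics.json is an illustrative name) and running kubectl apply -f on it should create the same resources as topics.yaml.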

Train the SpaCy Sklearn Model

To train the SpaCy Sklearn model yourself, follow the instructions in the Sklearn SpaCy Model Example.

Alternatively, you can use the prebuilt image seldonio/reddit-classifier:0.1 from the Seldon Docker Hub repository, which is the image referenced in the deployment below.

Deploy SpaCy Text Classifier

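Now we can define the SeldonDeployment YAML. Setting serverType to kafka makes the Seldon service orchestrator consume requests from the topic named in KAFKA_INPUT_TOPIC and publish responses to the topic named in KAFKA_OUTPUT_TOPIC, connecting to the broker address in KAFKA_BROKER (here the in-cluster plain listener on port 9092).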

[96]:
%%writefile sdep_reddit_kafka.yaml
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: reddit-kafka
spec:
  serverType: kafka
  predictors:
  - componentSpecs:
    - spec:
        containers:
        - image: seldonio/reddit-classifier:0.1
          name: classifier
    svcOrchSpec:
      env:
      - name: KAFKA_BROKER
        value: my-cluster-kafka-brokers.default.svc.cluster.local:9092
      - name: KAFKA_INPUT_TOPIC
        value: reddit-classifier-input
      - name: KAFKA_OUTPUT_TOPIC
        value: reddit-classifier-output
    graph:
      name: classifier
      type: MODEL
    name: default
    replicas: 1
Overwriting sdep_reddit_kafka.yaml
[97]:
!kubectl apply -f sdep_reddit_kafka.yaml
seldondeployment.machinelearning.seldon.io/reddit-kafka configured

We can now confirm that the model is running as expected. The pod runs two containers, the classifier and the Seldon service orchestrator:

[106]:
!kubectl get pods | grep reddit
reddit-kafka-default-0-classifier-c6ccdd66f-vmf4v        2/2     Running   0          45m

Send real time data for stream processing

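We can now send data for stream processing.

The command below starts a temporary pod from the bitnami/kafka:2.6.0 image and uses kafka-console-producer.sh to publish a single request containing the text “This is an input” to the reddit-classifier-input topic, from which the model consumes it.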

[ ]:
!kubectl run --quiet=true -it --rm kafkaproducer --image=bitnami/kafka:2.6.0 --restart=Never --command -- \
    bash -c "echo '{\"data\": {\"ndarray\": [\"This is an input\"]}}' \
    | kafka-console-producer.sh --broker-list my-cluster-kafka-external-bootstrap.default:9094 --topic reddit-classifier-input"
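
The shell escaping above makes the request easy to misread. Unescaped, the message is a Seldon protocol request whose data.ndarray field holds the input text. A minimal Python sketch that builds the same JSON string (the helper name seldon_text_request is hypothetical):

```python
import json
from typing import Iterable


def seldon_text_request(texts: Iterable[str]) -> str:
    """Serialise texts as a Seldon protocol request {"data": {"ndarray": [...]}}."""
    # json.dumps without indent emits a single line, as the console producer needs.
    return json.dumps({"data": {"ndarray": list(texts)}})


print(seldon_text_request(["This is an input"]))
# → {"data": {"ndarray": ["This is an input"]}}
```

Because kafka-console-producer.sh publishes each input line as a separate record, each serialised request must stay on one line; to send several independent requests, write one such JSON document per line.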

Check the data processed

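We can now read all the messages that have been published to the output topic reddit-classifier-output.

Because the consumer starts --from-beginning, it prints every prediction on the topic, including those for inputs sent in earlier runs; here the topic holds nine identical predictions. The --timeout-ms 10000 option makes the consumer exit once no new message has arrived for 10 seconds, so the TimeoutException logged at the end of the output is the expected way the command terminates, not a processing failure.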

[103]:
!kubectl run --quiet=true -it --rm kafkaconsumer --image=bitnami/kafka:2.6.0 --restart=Never --command -- \
    kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-external-bootstrap.default:9094 --topic reddit-classifier-output \
        --from-beginning --timeout-ms 10000
{"data":{"names":["t:0","t:1"],"ndarray":[[0.6758450844706712,0.32415491552932885]]},"meta":{}}

{"data":{"names":["t:0","t:1"],"ndarray":[[0.6758450844706712,0.32415491552932885]]},"meta":{}}

{"data":{"names":["t:0","t:1"],"ndarray":[[0.6758450844706712,0.32415491552932885]]},"meta":{}}

{"data":{"names":["t:0","t:1"],"ndarray":[[0.6758450844706712,0.32415491552932885]]},"meta":{}}

{"data":{"names":["t:0","t:1"],"ndarray":[[0.6758450844706712,0.32415491552932885]]},"meta":{}}

{"data":{"names":["t:0","t:1"],"ndarray":[[0.6758450844706712,0.32415491552932885]]},"meta":{}}

{"data":{"names":["t:0","t:1"],"ndarray":[[0.6758450844706712,0.32415491552932885]]},"meta":{}}

{"data":{"names":["t:0","t:1"],"ndarray":[[0.6758450844706712,0.32415491552932885]]},"meta":{}}

{"data":{"names":["t:0","t:1"],"ndarray":[[0.6758450844706712,0.32415491552932885]]},"meta":{}}

[2020-09-29 11:45:06,366] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 9 messages
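
Each output message is a Seldon protocol response: data.names labels the columns (t:0 and t:1) and data.ndarray holds one row per input. The two values in each row sum to 1, which suggests they are per-class probabilities; which label each column stands for depends on how the model was trained. A minimal Python sketch, with hypothetical helper names, that extracts the predictions from the consumer output while skipping blank lines and the consumer's log lines:

```python
import json
from typing import Dict, List


def parse_seldon_response(message: str) -> List[Dict[str, float]]:
    """Map each ndarray row of a Seldon protocol response to {column name: value}."""
    data = json.loads(message)["data"]
    return [dict(zip(data["names"], row)) for row in data["ndarray"]]


def predictions_from_consumer_output(text: str) -> List[List[Dict[str, float]]]:
    """Parse every JSON message line, skipping blank lines and consumer log lines."""
    return [parse_seldon_response(line) for line in text.splitlines()
            if line.startswith("{")]


line = ('{"data":{"names":["t:0","t:1"],'
        '"ndarray":[[0.6758450844706712,0.32415491552932885]]},"meta":{}}')
for row in parse_seldon_response(line):
    # Pick the column with the highest value as the predicted class.
    predicted = max(row, key=row.get)
    print(predicted, round(row[predicted], 3))  # → t:0 0.676
```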