End-to-end Reusable ML Pipeline with Seldon and Kubeflow

In this example we showcase how to build re-usable components to build an ML pipeline that can be trained and deployed at scale.

We will automate content moderation on the Reddit comments in /r/science building a machine learning NLP model with the following components:


This tutorial is broken down into the following sections:

  1. Test and build all our reusable pipeline steps

  2. Use Kubeflow to Train the Pipeline and Deploy to Seldon

  3. Test Seldon Deployed ML REST Endpoints

  4. Visualise Seldon’s Production ML Pipelines

Before you start

Make sure you have the following components set-up and running in your Kubernetes cluster:

Let’s get started! 🚀🔥 We will be building the end-to-end pipeline below:


%%writefile requirements-dev.txt
Overwriting requirements-dev.txt
!pip install -r requirements-dev.txt

1) Test and build all our reusable pipeline steps

We will start by building each of the components in our ML pipeline.


Let’s first have a look at our clean_text step:

!ls pipeline/pipeline_steps
clean_text       lr_text_classifier  tfidf_vectorizer
data_downloader  spacy_tokenize

Like this step, all of the other steps can be found in the pipeline/pipeline_steps/ folder, and all have the following structure:

  * pipeline_step.py, which exposes the functionality through a CLI

  * Transformer.py, which transforms the data accordingly

  * requirements.txt, which states the Python dependencies to run

  * build_image.sh, which uses s2i to build the image with one line

Let’s check out the CLI for clean_text

The pipeline_step CLI is the entry point for the Kubeflow image, as it allows Kubeflow to pass in any relevant parameters.

!python pipeline/pipeline_steps/clean_text/pipeline_step.py --help
Usage: pipeline_step.py [OPTIONS]

  --in-path TEXT
  --out-path TEXT
  --help           Show this message and exit.

This is actually a very simple file, as we are using the click library to define the commands:

!cat pipeline/pipeline_steps/clean_text/pipeline_step.py
import dill
import click

try:
    # Running for tests
    from .Transformer import Transformer
except ImportError:
    # Running from CLI
    from Transformer import Transformer


@click.command()
@click.option('--in-path', default="/mnt/raw_text.data")
@click.option('--out-path', default="/mnt/clean_text.data")
def run_pipeline(in_path, out_path):
    clean_text_transformer = Transformer()
    # Load the output of the previous step
    with open(in_path, 'rb') as in_f:
        x = dill.load(in_f)
    y = clean_text_transformer.predict(x)
    # Store the result for the next step
    with open(out_path, "wb") as out_f:
        dill.dump(y, out_f)


if __name__ == "__main__":
    run_pipeline()

The Transformer is where the data munging and transformation stage comes in, which will be wrapped by the container and exposed through the Seldon Engine to ensure our pipeline can be used in production.

Seldon provides multiple different features, such as abilities to send custom metrics, pre-process / post-process data and more. In this example we will only be exposing the predict step.

!cat pipeline/pipeline_steps/clean_text/Transformer.py
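import html
import logging
import re

import numpy as np


class Transformer():
    # NOTE: both patterns are cut off in this listing. The regexes below are
    # assumptions inferred from how they are used further down: a named group
    # `digit`, and two capture groups for a markdown-style link.
    __uplus_pattern = \
        re.compile(r"<[uU]\+(?P<digit>[a-zA-Z0-9]+)>")
    __markup_link_pattern = \
        re.compile(r"\[(.*)\]\((.*)\)")

    def predict(self, X, feature_names=[]):
        # Apply the cleaning function to every element of the input array
        f = np.vectorize(Transformer.transform_clean_text)
        X_clean = f(X)
        return X_clean

    def fit(self, X, y=None, **fit_params):
        return self

    @staticmethod
    def transform_clean_text(raw_text):
        try:
            decoded = raw_text.encode("ISO-8859-1").decode("utf-8")
        except UnicodeError:
            # Fall back to cp1252 when the bytes are not valid UTF-8
            decoded = raw_text.encode("ISO-8859-1").decode("cp1252")
        # The original call is cut off here; html.unescape is an assumed
        # stand-in for the HTML-parser based unescaping.
        html_unescaped = html.unescape(decoded)
        html_unescaped = re.sub(r"\r\n", " ", html_unescaped)
        html_unescaped = re.sub(r"\r\r\n", " ", html_unescaped)
        html_unescaped = re.sub(r"\r", " ", html_unescaped)
        html_unescaped = html_unescaped.replace("&gt;", " > ")
        html_unescaped = html_unescaped.replace("&lt;", " < ")
        html_unescaped = html_unescaped.replace("--", " - ")
        # Raw strings keep \g<digit>, \1 and \2 as regex backreferences
        html_unescaped = Transformer.__uplus_pattern.sub(
            r" U\g<digit> ", html_unescaped)
        html_unescaped = Transformer.__markup_link_pattern.sub(
            r" \1 \2 ", html_unescaped)
        html_unescaped = html_unescaped.replace("\\", "")
        return html_unescaped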
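
To make the cleaning logic easier to try in isolation, here is a simplified sketch that uses only the standard library. The function name `clean_text` and the link pattern are assumptions made for illustration; this is not the Transformer shipped with the example.

```python
import html
import re

# Assumed pattern for a markdown link of the form [label](target)
MARKUP_LINK = re.compile(r"\[(.*?)\]\((.*?)\)")


def clean_text(raw_text):
    """Unescape HTML entities, flatten line breaks and expand markdown links."""
    text = html.unescape(raw_text)
    # Collapse every kind of line break into a single space
    text = re.sub(r"\r\n|\r|\n", " ", text)
    # Raw string so that \1 and \2 are backreferences, not control characters
    text = MARKUP_LINK.sub(r" \1 \2 ", text)
    return text
```

Note the raw-string prefix on the replacement: in a plain Python string, `"\1"` is an octal escape for the control character `\x01`, so the captured groups would be lost.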

If you want to understand how the CLI pipeline steps talk to each other, have a look at the end-to-end test in pipeline/pipeline_tests/:

!pytest ./pipeline/pipeline_tests/. --disable-pytest-warnings
Test session starts (platform: linux, Python 3.7.4, pytest 6.0.1, pytest-sugar 0.9.4)
rootdir: /home/alejandro/Programming/kubernetes/seldon/seldon-core/examples/kubeflow
plugins: celery-4.4.0, flaky-3.6.1, cov-2.10.0, django-3.8.0, forked-1.1.3, sugar-0.9.4, xdist-1.30.0
collecting ... 
 pipeline/pipeline_tests/test_pipeline.py ✓                      100% ██████████

Results (2.12s):
       1 passed
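
The hand-off between steps can be sketched in a few lines: each step reads a serialised object from its input path and writes its result to its output path, which the next step then reads. The sketch below uses the standard-library `pickle` module as a stand-in for `dill`, and `lowercase_step` and `split_step` are hypothetical placeholders, not the real pipeline steps.

```python
import pickle
from pathlib import Path


def run_step(transform, in_path, out_path):
    """Load a pickled input, apply `transform`, and pickle the result."""
    with open(in_path, "rb") as in_f:
        x = pickle.load(in_f)
    y = transform(x)
    with open(out_path, "wb") as out_f:
        pickle.dump(y, out_f)


# Hypothetical stand-ins for two consecutive pipeline steps
def lowercase_step(texts):
    return [t.lower() for t in texts]


def split_step(texts):
    return [t.split() for t in texts]


def run_chain(texts, workdir):
    """Run both steps in order, passing data between them through files."""
    workdir = Path(workdir)
    raw = workdir / "raw.data"
    clean = workdir / "clean.data"
    tokens = workdir / "tokens.data"
    with open(raw, "wb") as f:
        pickle.dump(texts, f)
    run_step(lowercase_step, raw, clean)
    run_step(split_step, clean, tokens)
    with open(tokens, "rb") as f:
        return pickle.load(f)
```

Because every step only depends on a file path, the same steps can run locally in a test or inside separate containers that share a mounted volume.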

To build the image we provide a build script in each of the steps that contains the instructions:

!cat pipeline/pipeline_steps/clean_text/build_image.sh

s2i build . seldonio/seldon-core-s2i-python3:1.19.0-dev clean_text_transformer:0.1

The only thing you need to make sure of is that Seldon knows which model and file to wrap.

This can be achieved with the .s2i/environment file.

As you can see, here we just tell it to use our Transformer.py file:

!cat pipeline/pipeline_steps/clean_text/.s2i/environment

Once this is defined, the only thing we need to do is to run the build_image.sh for all the reusable components.

Here we show the manual way to do it:

# we must be in the same directory
cd pipeline/pipeline_steps/clean_text/ && ./build_image.sh
cd ../data_downloader && ./build_image.sh
cd ../lr_text_classifier && ./build_image.sh
cd ../spacy_tokenize && ./build_image.sh
cd ../tfidf_vectorizer && ./build_image.sh
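
The same manual loop can be scripted. This is a minimal sketch that assumes every step folder under pipeline/pipeline_steps/ contains a build_image.sh, as described above; the helper names `find_build_scripts` and `build_all` are made up for illustration.

```python
import subprocess
from pathlib import Path


def find_build_scripts(steps_dir):
    """Return every build_image.sh found one level below `steps_dir`, sorted by path."""
    return sorted(Path(steps_dir).glob("*/build_image.sh"))


def build_all(steps_dir="pipeline/pipeline_steps"):
    """Run each build script from inside its own step directory."""
    for script in find_build_scripts(steps_dir):
        # s2i builds the current directory, so cwd must be the step folder
        subprocess.run(["./build_image.sh"], cwd=script.parent, check=True)
```

Calling `build_all()` from the example root would then build each image in turn and stop at the first failing script, since `check=True` raises on a non-zero exit code.
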
2) Train our NLP Pipeline through the Kubeflow UI

We can access the Kubeflow dashboard to train our ML pipeline via http://localhost/_/pipeline-dashboard

If you can’t access this, you need to make sure that the ambassador gateway service is accessible:

!kubectl get svc ambassador -n kubeflow
NAME         TYPE       CLUSTER-IP      EXTERNAL-IP   PORT(S)        AGE
ambassador   NodePort   <none>        80:30209/TCP   8m58s

In my case, I need to change the service type from NodePort to LoadBalancer, which can be done with the following command:

!kubectl patch svc ambassador --type='json' -p '[{"op":"replace","path":"/spec/type","value":"LoadBalancer"}]' -n kubeflow
service/ambassador patched

Now that I’ve changed it to a LoadBalancer, it has allocated localhost as the external IP, so I can access it at http://localhost/_/pipeline-dashboard

!kubectl get svc ambassador -n kubeflow
NAME         TYPE           CLUSTER-IP      EXTERNAL-IP   PORT(S)        AGE
ambassador   LoadBalancer   localhost     80:30209/TCP   9m20s

If this was successful, you should be able to access the dashboard.

Define the pipeline

Now we want to generate the pipeline. For this we can use the DSL provided by kubeflow to define the actual steps required.

The pipeline will look as follows:


!cat train_pipeline/nlp_pipeline.py

import kfp.dsl as dsl
import yaml
from kubernetes import client as k8s

  description='A pipeline demonstrating reproducible steps for NLP'
def nlp_pipeline(
    vop = dsl.VolumeOp(

    download_step = dsl.ContainerOp(
            "--labels-path", labels_path,
            "--features-path", raw_text_path,
            "--csv-url", csv_url,
            "--csv-encoding", csv_encoding,
            "--features-column", features_column,
            "--labels-column", labels_column
        pvolumes={"/mnt": vop.volume}

    clean_step = dsl.ContainerOp(
            "--in-path", raw_text_path,
            "--out-path", clean_text_path,
        pvolumes={"/mnt": download_step.pvolume}

    tokenize_step = dsl.ContainerOp(
            "--in-path", clean_text_path,
            "--out-path", spacy_tokens_path,
        pvolumes={"/mnt": clean_step.pvolume}

    vectorize_step = dsl.ContainerOp(
            "--in-path", spacy_tokens_path,
            "--out-path", tfidf_vectors_path,
            "--max-features", tfidf_max_features,
            "--ngram-range", tfidf_ngram_range,
            "--action", "train",
            "--model-path", tfidf_model_path,
        pvolumes={"/mnt": tokenize_step.pvolume}

    predict_step = dsl.ContainerOp(
            "--in-path", tfidf_vectors_path,
            "--labels-path", labels_path,
            "--out-path", lr_prediction_path,
            "--c-param", lr_c_param,
            "--action", "train",
            "--model-path", lr_model_path,
        pvolumes={"/mnt": vectorize_step.pvolume}

        seldon_config = yaml.load(open("../deploy_pipeline/seldon_production_pipeline.yaml"))
        # If this file is run from the project core directory
        seldon_config = yaml.load(open("deploy_pipeline/seldon_production_pipeline.yaml"))

    deploy_step = dsl.ResourceOp(
        attribute_outputs={"name": "{.metadata.name}"})


if __name__ == '__main__':
  import kfp.compiler as compiler
  compiler.Compiler().compile(nlp_pipeline, __file__ + '.tar.gz')

Breaking down the code

As you can see in the DSL, we have the ContainerOp - each of those is a step in the Kubeflow pipeline.
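
The listing above chains the steps by passing each step's `pvolume` to the next one. Assuming, as we understand the Kubeflow Pipelines v1 DSL, that mounting a previous step's volume also makes a step wait for it, the resulting execution order can be sketched with the standard library alone. The step names are taken from the listing; the dictionary is only an illustration and is not part of the pipeline file.

```python
from graphlib import TopologicalSorter

# Each step maps to the step whose volume it mounts (names from the listing)
mounts_volume_of = {
    "vop": [],
    "download_step": ["vop"],
    "clean_step": ["download_step"],
    "tokenize_step": ["clean_step"],
    "vectorize_step": ["tokenize_step"],
    "predict_step": ["vectorize_step"],
}

# A topological sort yields every step after the step it depends on
order = list(TopologicalSorter(mounts_volume_of).static_order())
print(" -> ".join(order))
```

Since every step has exactly one predecessor, the order is a single chain from the volume creation to the classifier training step.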

At the end we can see the seldondeploy step, which deploys the trained pipeline.

The definition of the SeldonDeployment graph is provided in the deploy_pipeline/seldon_production_pipeline.yaml file.

The seldondeployment file defines our production execution graph using the same reusable components.

!cat deploy_pipeline/seldon_production_pipeline.yaml
apiVersion: machinelearning.seldon.io/v1alpha2
kind: SeldonDeployment
metadata:
  labels:
    app: seldon
  name: "seldon-deployment-{{workflow.name}}"
  namespace: kubeflow
spec:
  annotations:
    project_name: NLP Pipeline
    deployment_version: v1
  name: "seldon-deployment-{{workflow.name}}"
  predictors:
  - componentSpecs:
    - spec:
        containers:
        - image: clean_text_transformer:0.1
          imagePullPolicy: IfNotPresent
          name: cleantext
          resources:
            requests:
              memory: 1Mi
        - image: spacy_tokenizer:0.1
          imagePullPolicy: IfNotPresent
          name: spacytokenizer
        - image: tfidf_vectorizer:0.1
          imagePullPolicy: IfNotPresent
          name: tfidfvectorizer
          volumeMounts:
          - name: mypvc
            mountPath: /mnt
        - image: lr_text_classifier:0.1
          imagePullPolicy: IfNotPresent
          name: lrclassifier
          volumeMounts:
          - name: mypvc
            mountPath: /mnt
        terminationGracePeriodSeconds: 20
        volumes:
        - name: mypvc
          persistentVolumeClaim:
            claimName: "{{workflow.name}}-my-pvc"
    graph:
      children:
      - name: spacytokenizer
        endpoint:
          type: REST
        type: MODEL
        children:
        - name: tfidfvectorizer
          endpoint:
            type: REST
          type: MODEL
          children:
          - name: lrclassifier
            endpoint:
              type: REST
            type: MODEL
            children: []
      name: cleantext
      endpoint:
        type: REST
      type: MODEL
    name: single-model
    replicas: 1
    annotations:
      predictor_version: v1

Seldon Production pipeline contents

If we look at the file we’ll be using to deploy our pipeline, we can see that it has the following key points:

  1. Reusable component definitions as componentSpecs: cleantext, spacytokenizer, tfidfvectorizer & lrclassifier

  2. DAG (directed acyclic graph) definition for REST pipeline: cleantext -> spacytokenizer -> tfidfvectorizer -> lrclassifier
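
To illustrate how the nested graph definition maps to that execution order, here is a small sketch that walks a dictionary shaped like the `graph` section of the deployment, reduced to the `name` and `children` keys. The helper `execution_order` is hypothetical and only meant to show the traversal.

```python
def execution_order(node):
    """Return node names in the order a request passes through the graph."""
    order = [node["name"]]
    for child in node.get("children", []):
        order.extend(execution_order(child))
    return order


# The graph of the deployment, reduced to names and children
graph = {
    "name": "cleantext",
    "children": [{
        "name": "spacytokenizer",
        "children": [{
            "name": "tfidfvectorizer",
            "children": [{"name": "lrclassifier", "children": []}],
        }],
    }],
}

print(" -> ".join(execution_order(graph)))
# prints "cleantext -> spacytokenizer -> tfidfvectorizer -> lrclassifier"
```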

This graph in our production deployment looks as follows:


Generate the pipeline files to upload to Kubeflow

To generate the pipeline we just have to run the pipeline file, which will output the tar.gz file that will be uploaded.

# Generating graph definition
python train_pipeline/nlp_pipeline.py
ls train_pipeline/
/home/alejandro/miniconda3/lib/python3.7/site-packages/kfp/components/_data_passing.py:168: UserWarning: Missing type name was inferred as "Float" based on the value "0.1".
  warnings.warn('Missing type name was inferred as "{}" based on the value "{}".'.format(type_name, str(value)))
/home/alejandro/miniconda3/lib/python3.7/site-packages/kfp/components/_data_passing.py:168: UserWarning: Missing type name was inferred as "Integer" based on the value "10000".
  warnings.warn('Missing type name was inferred as "{}" based on the value "{}".'.format(type_name, str(value)))
/home/alejandro/miniconda3/lib/python3.7/site-packages/kfp/components/_data_passing.py:168: UserWarning: Missing type name was inferred as "Integer" based on the value "3".
  warnings.warn('Missing type name was inferred as "{}" based on the value "{}".'.format(type_name, str(value)))
train_pipeline/nlp_pipeline.py:114: YAMLLoadWarning: calling yaml.load() without Loader=... is deprecated, as the default Loader is unsafe. Please read https://msg.pyyaml.org/load for full details.
  seldon_config = yaml.load(open("deploy_pipeline/seldon_production_pipeline.yaml"))
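
Before uploading, it can be useful to check what the compiler actually wrote. The sketch below lists and reads the members of a .tar.gz archive with the standard library; the helper names are made up for illustration, and the archive path in the usage note assumes the file was generated next to nlp_pipeline.py as described above.

```python
import tarfile


def list_archive(path):
    """Return the names of the files stored in a .tar.gz archive."""
    with tarfile.open(path, "r:gz") as archive:
        return archive.getnames()


def read_member(path, name):
    """Return the text content of one file inside the archive."""
    with tarfile.open(path, "r:gz") as archive:
        return archive.extractfile(name).read().decode("utf-8")
```

For example, `list_archive("train_pipeline/nlp_pipeline.py.tar.gz")` would show the generated workflow definition that Kubeflow receives on upload.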

Run the pipeline

You can access the Kubeflow Pipelines UI by forwarding the port with the following command:

kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8000:80

The UI should now be accessible via http://localhost:8000.

We now need to upload the generated nlp_pipeline.py.tar.gz file.

This can be done through the “Upload pipeline” button in the UI.

Once it’s uploaded, we want to create and trigger a run! You should now be able to see how each step is executed:


Inspecting the data created in the Persistent Volume

The pipeline saves the output of each step, together with the trained models, in the persistent volume claim.

The persistent volume claim is named after the Argo workflow:

!kubectl get workflow -n kubeflow
NAME        AGE
nlp-bddff   2m

Our workflow is there! So we can get its name by running:

!kubectl get workflow -n kubeflow -o jsonpath='{.items[0].metadata.name}'

And we can use good old sed to insert this workflow name into our pvc-access pod definition, which we can use to inspect the contents of the volume:

!sed "s/PVC_NAME/"$(kubectl get workflow -n kubeflow -o jsonpath='{.items[0].metadata.name}')"-my-pvc/g" deploy_pipeline/pvc-access.yaml
apiVersion: v1
kind: Pod
metadata:
  name: pvc-access-container
spec:
  containers:
  - name: pvc-access-container
    image: busybox
    command: ["/bin/sh", "-ec", "sleep 1000"]
    volumeMounts:
    - name: mypvc
      mountPath: /mnt
  volumes:
  - name: mypvc
    persistentVolumeClaim:
      claimName: nlp-b7qt8-my-pvc
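
The same substitution can be done from Python, which is handy when the manifest is generated inside a notebook. This is a sketch under the assumption that the template contains the literal placeholder PVC_NAME, as the sed command implies; both helper names are hypothetical.

```python
import subprocess


def current_workflow_name(namespace="kubeflow"):
    """Ask kubectl for the name of the first workflow in the namespace."""
    return subprocess.getoutput(
        f"kubectl get workflow -n {namespace} "
        "-o jsonpath='{.items[0].metadata.name}'"
    )


def render_pvc_manifest(template, workflow_name):
    """Replace every PVC_NAME placeholder with '<workflow_name>-my-pvc'."""
    return template.replace("PVC_NAME", f"{workflow_name}-my-pvc")
```

With a cluster available, `render_pvc_manifest(open("deploy_pipeline/pvc-access.yaml").read(), current_workflow_name())` would produce the manifest shown above.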

We just need to apply this container with our kubectl command, and we can use it to inspect the mounted folder:

!sed "s/PVC_NAME/"$(kubectl get workflow -n kubeflow -o jsonpath='{.items[0].metadata.name}')"-my-pvc/g" deploy_pipeline/pvc-access.yaml | kubectl -n kubeflow apply -f -
pod/pvc-access-container created
!kubectl get pods -n kubeflow pvc-access-container
NAME                   READY   STATUS    RESTARTS   AGE
pvc-access-container   1/1     Running   0          6s

Now we can run an ls command to see what’s inside:

!kubectl -n kubeflow exec -it pvc-access-container ls /mnt
clean.data       lr.model         text.data        tfidf.model
labels.data      prediction.data  tfidf.data       tokens.data
!kubectl delete -f deploy_pipeline/pvc-access.yaml -n kubeflow
pod "pvc-access-container" deleted

3) Test Deployed ML REST Endpoints

Now that it’s running, we have a production ML text pipeline that we can query using REST and gRPC.

First we can check if our Seldon deployment is running with

!kubectl -n kubeflow get seldondeployment
NAME                          AGE
seldon-deployment-nlp-b7qt8   57m

We will need the Seldon Pipeline Deployment name to reach the API, so we can get it using:

!kubectl -n kubeflow get seldondeployment -o jsonpath='{.items[0].metadata.name}'

Now we can interact with our API in two ways:

  1. Using curl or any client like Postman

  2. Using the Python SeldonClient

Using curl from the terminal

When using curl, the only thing we need to provide is the data in JSON format, as well as the URL, which is of the format:

curl -X POST -H 'Content-Type: application/json' \
    -d "{'data': {'names': ['text'], 'ndarray': ['Hello world this is a test']}}" \$(kubectl -n kubeflow get seldondeployment -o jsonpath='{.items[0].metadata.name}')/api/v0.1/predictions
{
  "meta": {
    "puid": "k89krp6t7tfgb386nt6vc3iftk",
    "tags": {
    },
    "routing": {
      "cleantext": -1,
      "tfidfvectorizer": -1,
      "spacytokenizer": -1
    },
    "requestPath": {
      "cleantext": "clean_text_transformer:0.1",
      "tfidfvectorizer": "tfidf_vectorizer:0.1",
      "lrclassifier": "lr_text_classifier:0.1",
      "spacytokenizer": "spacy_tokenizer:0.1"
    },
    "metrics": []
  },
  "data": {
    "names": ["t:0", "t:1"],
    "ndarray": [[0.6729318752883149, 0.3270681247116851]]
  }
}
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   599  100   527  100    72    516     70  0:00:01  0:00:01 --:--:--   588
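
For readers who prefer plain Python over curl, the request can be sketched with the standard library. The URL layout `/seldon/<namespace>/<deployment>/api/v0.1/predictions` is an assumption based on the usual Ambassador route for Seldon deployments, and both function names are made up for this example.

```python
import json
import urllib.request


def build_prediction_request(host, namespace, deployment_name, texts):
    """Build (without sending) a POST request for the predictions endpoint."""
    # Assumed Ambassador route for a Seldon deployment
    url = f"http://{host}/seldon/{namespace}/{deployment_name}/api/v0.1/predictions"
    payload = {"data": {"names": ["text"], "ndarray": list(texts)}}
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def class_probabilities(response_body):
    """Map each output name (e.g. 't:0') to its probability for the first row."""
    data = json.loads(response_body)["data"]
    return dict(zip(data["names"], data["ndarray"][0]))
```

Sending the request is then a matter of passing it to `urllib.request.urlopen` and feeding the decoded body to `class_probabilities`. Building the payload with `json.dumps` also guarantees valid, double-quoted JSON.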

Using the SeldonClient

We can also use the Python SeldonClient to interact with the pipeline we just deployed:

import subprocess

import numpy as np

from seldon_core.seldon_client import SeldonClient

host = "localhost"
port = "80"  # Make sure you use the port above
batch = np.array(["Hello world this is a test"])
payload_type = "ndarray"
# Get the deployment name
deployment_name = subprocess.getoutput(
    "kubectl -n kubeflow get seldondeployment -o jsonpath='{.items[0].metadata.name}'"
)
transport = "rest"
namespace = "kubeflow"

sc = SeldonClient(
    gateway="ambassador", ambassador_endpoint=host + ":" + port, namespace=namespace
)

# The call arguments are cut off in this listing; they are filled in here
# from the variables defined above
client_prediction = sc.predict(
    data=batch,
    deployment_name=deployment_name,
    names=["text"],
    payload_type=payload_type,
    transport=transport,
)

print(client_prediction)
Success:True message:
data {
  names: "text"
  ndarray {
    values {
      string_value: "Hello world this is a test"
    }
  }
}

meta {
  puid: "qtdca40d3s0463nn4ginhkvc6t"
  routing {
    key: "cleantext"
    value: -1
  }
  routing {
    key: "spacytokenizer"
    value: -1
  }
  routing {
    key: "tfidfvectorizer"
    value: -1
  }
  requestPath {
    key: "cleantext"
    value: "clean_text_transformer:0.1"
  }
  requestPath {
    key: "lrclassifier"
    value: "lr_text_classifier:0.1"
  }
  requestPath {
    key: "spacytokenizer"
    value: "spacy_tokenizer:0.1"
  }
  requestPath {
    key: "tfidfvectorizer"
    value: "tfidf_vectorizer:0.1"
  }
}
data {
  names: "t:0"
  names: "t:1"
  ndarray {
    values {
      list_value {
        values {
          number_value: 0.6729318752883149
        }
        values {
          number_value: 0.3270681247116851
        }
      }
    }
  }
}

4) Visualise Seldon’s Production ML Pipelines

We can visualise the performance using the SeldonAnalytics package, which we can deploy using:

!helm install seldon-core-analytics --repo https://storage.googleapis.com/seldon-charts --namespace kubeflow

In my case, similar to what I did with Ambassador, I need to make sure the service is a LoadBalancer instead of a NodePort:

!kubectl patch svc grafana-prom --type='json' -p '[{"op":"replace","path":"/spec/type","value":"LoadBalancer"}]' -n kubeflow
service/grafana-prom patched
!kubectl get svc grafana-prom -n kubeflow
NAME           TYPE           CLUSTER-IP      EXTERNAL-IP   PORT(S)        AGE
grafana-prom   LoadBalancer   localhost     80:32445/TCP   64m

Now we can access it at the port provided. In my case it is http://localhost:32445/d/3swM2iGWz/prediction-analytics?refresh=5s&orgId=1

(The initial username is admin and the password is password; you will be asked to change it on first login.)

Generate a bunch of requests and visualise:

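while True:
    # Arguments filled in from the variables defined in the SeldonClient example
    client_prediction = sc.predict(
        data=batch,
        deployment_name=deployment_name,
        names=["text"],
        payload_type=payload_type,
    )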

You now have a full end-to-end training and production NLP pipeline


