This page was generated from components/drift-detection/cifar10/cifar10_drift.ipynb.
CIFAR10 Drift Detection
In this example we will deploy an image classification model along with a drift detector trained on the same dataset. For in-depth details on creating a drift detection model for your own dataset, see the alibi-detect project and its associated documentation. You can find details for this CIFAR10 example in their documentation as well.
Prerequisites:

- Ensure the istio-ingressgateway is exposed as a LoadBalancer (no auth in this demo).
- Ensure Seldon Core is installed with Istio enabled, e.g. by passing --set istio.enabled=true to the Helm chart (see the example command below).

Tested on GKE and Kind with Knative 0.18 and Istio 1.7.3.
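For reference, a minimal sketch of such a Helm install (the release name seldon-core and the seldon-system namespace are typical choices, not requirements):

helm upgrade --install seldon-core seldon-core-operator \
    --repo https://storage.googleapis.com/seldon-charts \
    --namespace seldon-system --create-namespace \
    --set istio.enabled=true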
[ ]:
!pip install -r requirements_notebook.txt
Ensure the Seldon Istio gateway is installed:
[ ]:
!kubectl apply -f ../../../notebooks/resources/seldon-gateway.yaml
Setup Resources
[ ]:
!kubectl create namespace cifar10drift
[ ]:
%%writefile broker.yaml
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: default
  namespace: cifar10drift
[ ]:
!kubectl create -f broker.yaml
[ ]:
%%writefile event-display.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: hello-display
  namespace: cifar10drift
spec:
  replicas: 1
  selector:
    matchLabels: &labels
      app: hello-display
  template:
    metadata:
      labels: *labels
    spec:
      containers:
      - name: event-display
        image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/event_display
---
kind: Service
apiVersion: v1
metadata:
  name: hello-display
  namespace: cifar10drift
spec:
  selector:
    app: hello-display
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8080
[ ]:
!kubectl apply -f event-display.yaml
Create the SeldonDeployment image classification model for CIFAR10. We add a logger for requests; the default destination is the namespace's Knative Broker.
[ ]:
%%writefile cifar10.yaml
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: tfserving-cifar10
  namespace: cifar10drift
spec:
  protocol: tensorflow
  transport: rest
  predictors:
  - componentSpecs:
    - spec:
        containers:
        - args:
          - --port=8500
          - --rest_api_port=8501
          - --model_name=resnet32
          - --model_base_path=gs://seldon-models/tfserving/cifar10/resnet32
          image: tensorflow/serving
          name: resnet32
          ports:
          - containerPort: 8501
            name: http
            protocol: TCP
    graph:
      name: resnet32
      type: MODEL
      endpoint:
        service_port: 8501
      logger:
        mode: all
        url: http://broker-ingress.knative-eventing.svc.cluster.local/cifar10drift/default
    name: model
    replicas: 1
[ ]:
!kubectl apply -f cifar10.yaml
Create the pretrained drift detector. We forward replies to the hello-display message dumper we started earlier. Notice the drift_batch_size argument: the drift detector waits until drift_batch_size requests have been received before making a drift prediction.

Here we configure seldonio/alibi-detect-server to use rclone for downloading the artifact. If the RCLONE_ENABLED=true environment variable is set, or any environment variable contains RCLONE_CONFIG in its name, then rclone is used to download the artifacts. If RCLONE_ENABLED=false, or no RCLONE_CONFIG variables are present, then the kfserving storage.py logic is used to download the artifacts.
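The RCLONE_CONFIG_GS_* keys in the secret below follow rclone's RCLONE_CONFIG_<REMOTE>_<PARAMETER> naming convention, where the remote name gs corresponds to the gs:// scheme used in --storage_uri. As a rough sketch only (the secret name, endpoint and credentials are placeholders, not used in this example), an equivalent secret for an S3-compatible store might look like:

apiVersion: v1
kind: Secret
metadata:
  name: my-rclone-secret        # placeholder name
  namespace: cifar10drift
type: Opaque
stringData:
  # rclone s3 backend parameters, expressed as RCLONE_CONFIG_<REMOTE>_<PARAMETER>
  RCLONE_CONFIG_S3_TYPE: s3
  RCLONE_CONFIG_S3_PROVIDER: Minio
  RCLONE_CONFIG_S3_ENDPOINT: http://minio.minio-system.svc.cluster.local:9000  # placeholder endpoint
  RCLONE_CONFIG_S3_ACCESS_KEY_ID: minioadmin        # placeholder credentials
  RCLONE_CONFIG_S3_SECRET_ACCESS_KEY: minioadmin    # placeholder credentials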
[ ]:
%%writefile cifar10cd.yaml
apiVersion: v1
kind: Secret
metadata:
  name: seldon-rclone-secret
  namespace: cifar10drift
type: Opaque
stringData:
  RCLONE_CONFIG_GS_TYPE: google cloud storage
  RCLONE_CONFIG_GS_ANONYMOUS: "true"
---
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: drift-detector
  namespace: cifar10drift
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/minScale: "1"
    spec:
      containers:
      - image: seldonio/alibi-detect-server:1.8.0-dev
        imagePullPolicy: IfNotPresent
        args:
        - --model_name
        - cifar10cd
        - --http_port
        - '8080'
        - --protocol
        - tensorflow.http
        - --storage_uri
        - gs://seldon-models/alibi-detect/cd/ks/cifar10-0_6_2
        - --reply_url
        - http://hello-display.cifar10drift
        - --event_type
        - io.seldon.serving.inference.drift
        - --event_source
        - io.seldon.serving.cifar10cd
        - DriftDetector
        - --drift_batch_size
        - '5000'
        envFrom:
        - secretRef:
            name: seldon-rclone-secret
[ ]:
!kubectl apply -f cifar10cd.yaml
Create a Knative trigger to forward logging events to our Drift Detector.
[ ]:
%%writefile trigger.yaml
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: drift-trigger
  namespace: cifar10drift
spec:
  broker: default
  filter:
    attributes:
      type: io.seldon.serving.inference.request
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: drift-detector
      namespace: cifar10drift
[ ]:
!kubectl apply -f trigger.yaml
Get the IP address of the Istio Ingress Gateway. This assumes you have installed Istio with a LoadBalancer.
[ ]:
CLUSTER_IPS = !(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
CLUSTER_IP = CLUSTER_IPS[0]
print(CLUSTER_IP)
If you are using Kind or Minikube you will need to port-forward to the istio-ingressgateway and uncomment the following:
[ ]:
# CLUSTER_IP="localhost:8004"
[ ]:
SERVICE_HOSTNAMES = !(kubectl get ksvc -n cifar10drift drift-detector -o jsonpath='{.status.url}' | cut -d "/" -f 3)
SERVICE_HOSTNAME_CD = SERVICE_HOSTNAMES[0]
print(SERVICE_HOSTNAME_CD)
[ ]:
import json

import matplotlib.pyplot as plt
import numpy as np
import requests
import tensorflow as tf

tf.keras.backend.clear_session()

# Load and normalise the CIFAR10 data.
train, test = tf.keras.datasets.cifar10.load_data()
X_train, y_train = train
X_test, y_test = test

X_train = X_train.astype("float32") / 255
X_test = X_test.astype("float32") / 255
print(X_train.shape, y_train.shape, X_test.shape, y_test.shape)

classes = (
    "plane",
    "car",
    "bird",
    "cat",
    "deer",
    "dog",
    "frog",
    "horse",
    "ship",
    "truck",
)


def show(X):
    plt.imshow(X.reshape(32, 32, 3))
    plt.axis("off")
    plt.show()


def predict(X):
    # Send a batch to the TFServing model through the Istio ingress; the Seldon
    # request logger also forwards each request to the Knative broker.
    formData = {"instances": X.tolist()}
    headers = {}
    res = requests.post(
        "http://"
        + CLUSTER_IP
        + "/seldon/cifar10drift/tfserving-cifar10/v1/models/resnet32/:predict",
        json=formData,
        headers=headers,
    )
    if res.status_code == 200:
        j = res.json()
        if len(j["predictions"]) == 1:
            return classes[np.array(j["predictions"])[0].argmax()]
    else:
        print("Failed with ", res.status_code)
        return []


def drift(X):
    # Send a batch directly to the drift detector, mimicking the CloudEvent
    # headers the request logger would normally add.
    formData = {"instances": X.tolist()}
    headers = {
        "ce-namespace": "default",
        "ce-modelid": "cifar10drift",
        "ce-type": "io.seldon.serving.inference.request",
        "ce-id": "1234",
        "ce-source": "localhost",
        "ce-specversion": "1.0",
        "Host": SERVICE_HOSTNAME_CD,
    }
    res = requests.post("http://" + CLUSTER_IP + "/", json=formData, headers=headers)
    if res.status_code == 200:
        od = res.json()
        return od
    else:
        print("Failed with ", res.status_code)
        return []
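The drift() helper is not needed for the broker-driven flow below, but it can be handy for debugging because it posts a batch straight to the drift detector's Knative Service, bypassing the broker. A minimal usage sketch (keep in mind that instances sent this way should also accumulate in the detector's batch):

resp = drift(X_train[0:100])
print(resp)  # no drift verdict is produced until drift_batch_size instances have accumulated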
Normal Prediction
[ ]:
idx = 1
X = X_train[idx : idx + 1]
show(X)
predict(X)
Test Drift
The drift detector has to accumulate drift_batch_size requests before it runs, so no drift result should have been produced yet. Check the event-display logs:
[ ]:
!kubectl logs -n cifar10drift $(kubectl get pod -n cifar10drift -l app=hello-display -o jsonpath='{.items[0].metadata.name}')
We will now send 5000 requests to the model in batches of 100. The drift detector will run at the end of this, as we set drift_batch_size to 5000 in the YAML above.
[ ]:
from tqdm import tqdm

for i in tqdm(range(0, 5000, 100)):
    X = X_train[i : i + 100]
    predict(X)
Let’s check the message dumper and extract the first drift result.
[ ]:
!kubectl logs -n cifar10drift $(kubectl get pod -n cifar10drift -l app=hello-display -o jsonpath='{.items[0].metadata.name}')
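If you prefer to inspect the result programmatically rather than reading the raw logs, a rough sketch (assuming the drift verdict appears under data.is_drift in the alibi-detect response that event-display prints) is:

import subprocess

# Pull the event-display logs and grep out the "is_drift" field of the
# drift responses that the detector posts back via --reply_url.
pod = subprocess.check_output(
    "kubectl get pod -n cifar10drift -l app=hello-display "
    "-o jsonpath='{.items[0].metadata.name}'",
    shell=True,
    text=True,
).strip()
logs = subprocess.check_output(f"kubectl logs -n cifar10drift {pod}", shell=True, text=True)
for line in logs.splitlines():
    if '"is_drift"' in line:
        print(line.strip())  # expect 0 here; 1 once drift has been detected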
Now, let’s fetch some CIFAR10 examples corrupted with motion blur.
[ ]:
from alibi_detect.datasets import corruption_types_cifar10c, fetch_cifar10c
corruption = ["motion_blur"]
X_corr, y_corr = fetch_cifar10c(corruption=corruption, severity=5, return_X_y=True)
X_corr = X_corr.astype("float32") / 255
[ ]:
show(X_corr[0])
show(X_corr[1])
show(X_corr[2])
Send these examples to the predictor.
[ ]:
for i in tqdm(range(0, 5000, 100)):
    X = X_corr[i : i + 100]
    predict(X)
Now when we check the message dumper we should find a new drift response.
[ ]:
!kubectl logs -n cifar10drift $(kubectl get pod -n cifar10drift -l app=hello-display -o jsonpath='{.items[0].metadata.name}')
Tear Down
[ ]:
!kubectl delete ns cifar10drift