End-to-end inference service example with Minio and Kafka

This example shows an end-to-end inference pipeline that processes a Kafka event and invokes the inference service to get a prediction, using the provided pre- and post-processing code.

Deploy Kafka

If you do not have an existing Kafka cluster, you can run the following commands to install an in-cluster Kafka using Helm 3 with persistence turned off.

  helm repo add confluentinc https://confluentinc.github.io/cp-helm-charts/
  helm repo update
  helm install my-kafka -f values.yaml --set cp-schema-registry.enabled=false,cp-kafka-rest.enabled=false,cp-kafka-connect.enabled=false confluentinc/cp-helm-charts

After a successful install, you should see the Kafka cluster running:

  NAME                      READY   STATUS    RESTARTS   AGE
  my-kafka-cp-kafka-0       2/2     Running   0          126m
  my-kafka-cp-kafka-1       2/2     Running   1          126m
  my-kafka-cp-kafka-2       2/2     Running   0          126m
  my-kafka-cp-zookeeper-0   2/2     Running   0          127m

Install Knative Eventing and Kafka Event Source

  • Install Knative Eventing Core >= 0.18.

    kubectl apply -f https://github.com/knative/eventing/releases/download/v0.25.0/eventing-crds.yaml
    kubectl apply -f https://github.com/knative/eventing/releases/download/v0.25.0/eventing-core.yaml

  • Install the Kafka Event Source.

    kubectl apply -f https://github.com/knative-sandbox/eventing-kafka/releases/download/v0.25.3/source.yaml

  • Install the InferenceService addressable cluster role.

    kubectl apply -f addressable-resolver.yaml

Deploy Minio

  • If you do not have Minio set up in your cluster, you can run the following command to install a Minio test instance.

    kubectl apply -f minio.yaml

  • Install the Minio client mc and configure it to reach the Minio instance.

    # Run the port forwarding command in a different terminal
    kubectl port-forward $(kubectl get pod --selector="app=minio" --output jsonpath='{.items[0].metadata.name}') 9000:9000
    mc config host add myminio http://127.0.0.1:9000 minio minio123

  • Create the bucket mnist for uploading images and the buckets digit-[0-9] for classification.

    mc mb myminio/mnist
    mc mb myminio/digit-[0-9]

  • Set up event notification to publish events to Kafka.

    # Set up bucket event notification with Kafka
    mc admin config set myminio notify_kafka:1 tls_skip_verify="off" queue_dir="" queue_limit="0" sasl="off" sasl_password="" sasl_username="" tls_client_auth="0" tls="off" client_tls_cert="" client_tls_key="" brokers="my-kafka-cp-kafka-headless:9092" topic="mnist" version=""
    # Restart Minio
    mc admin service restart myminio
    # Set up event notification for images put into the bucket
    mc event add myminio/mnist arn:minio:sqs::1:kafka -p --event put --suffix .png
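
The bucket pattern digit-[0-9] used above reads as shorthand for ten buckets, digit-0 through digit-9. Under that assumption, a minimal sketch can list the bucket names and the matching mc mb commands. The helper name digit_buckets is hypothetical, and the myminio alias is the one configured earlier. The sketch only prints commands and does not run them.

```python
def digit_buckets(prefix="digit"):
    """Return the ten classification bucket names, digit-0 through digit-9."""
    return [f"{prefix}-{d}" for d in range(10)]


# Print one `mc mb` command per bucket; nothing is executed here.
for bucket in digit_buckets():
    print(f"mc mb myminio/{bucket}")
```

Printing the commands first makes it easy to check the names before creating anything in Minio.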

Upload the mnist model to Minio

  gsutil cp -r gs://kfserving-examples/models/tensorflow/mnist .
  mc cp -r mnist myminio/

Create S3 Secret for Minio and attach to Service Account

KServe gets the secrets from your service account, so you need to add the created or existing secret to your service account’s secret list. By default KServe uses the default service account; you can use your own service account and override it in the InferenceService CRD.

Apply the secret and attach it to the service account.

  kubectl apply -f s3-secret.yaml

Build mnist transformer image

The transformer image implements the preprocess handler, which processes the Minio notification event, downloads the image from Minio, and transforms the image bytes to tensors. The postprocess handler processes the prediction and uploads the image to the classified Minio bucket digit-[0-9].

  docker build -t $USER/mnist-transformer:latest -f ./transformer.Dockerfile . --rm
  docker push $USER/mnist-transformer:latest

Create the InferenceService

Specify the built image in the Transformer spec and apply the InferenceService CRD.

  kubectl apply -f mnist-kafka.yaml

This creates the transformer and predictor pods. The request goes to the transformer first, where it invokes the preprocess handler. The transformer then calls out to the predictor to get the prediction response, which in turn invokes the postprocess handler.

  kubectl get pods -l serving.kubeflow.org/inferenceservice=mnist
  mnist-predictor-default-9t5ms-deployment-74f5cd7767-khthf     2/2   Running   0   10s
  mnist-transformer-default-jmf98-deployment-8585cbc748-ftfhd   2/2   Running   0   14m
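
The request flow described above (preprocess, then predictor call, then postprocess) can be sketched in plain Python. This is an illustration only, not the transformer's actual code. The function handle_request and the three stand-in handlers are hypothetical, and the payload shapes (an "instances" request and a "classes" field in the response) are assumptions modelled on the log output shown later in this example.

```python
def handle_request(event, preprocess, predict, postprocess):
    """Run one event through the transformer and predictor in order."""
    request = preprocess(event)      # transformer: event -> model input
    response = predict(request)      # predictor: model input -> prediction
    return postprocess(response)     # transformer: prediction -> final result


# Hypothetical stand-ins so the sketch runs without a cluster.
def fake_preprocess(event):
    return {"instances": [event["pixels"]]}


def fake_predict(request):
    return {"predictions": [{"classes": 5}]}


def fake_postprocess(response):
    return response["predictions"][0]["classes"]


result = handle_request(
    {"pixels": [0.0, 1.0]}, fake_preprocess, fake_predict, fake_postprocess
)
print(result)  # 5
```

Passing the handlers as arguments keeps the ordering visible in one place and lets each stage be swapped out for testing.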

Create Kafka event source

Apply the Kafka event source, which creates the Kafka consumer pod to pull events from Kafka and deliver them to the inference service.

  kubectl apply -f kafka-source.yaml

This creates the Kafka source pod, which consumes events from the mnist topic:

  kafkasource-kafka-source-3d809fe2-1267-11ea-99d0-42010af00zbn5h   1/1   Running   0   8h

Upload a digit image to Minio mnist bucket

The last step is to upload the image images/0.png. The image should then be moved to the classified bucket based on the prediction response.

  mc cp images/0.png myminio/mnist

After uploading an image to the mnist bucket, you should see a notification event like the following sent to the Kafka topic mnist:

  {
    "EventType":"s3:ObjectCreated:Put",
    "Key":"mnist/0.png",
    "Records":[
      {"eventVersion":"2.0",
       "eventSource":"minio:s3",
       "awsRegion":"",
       "eventTime":"2019-11-17T19:08:08Z",
       "eventName":"s3:ObjectCreated:Put",
       "userIdentity":{"principalId":"minio"},
       "requestParameters":{"sourceIPAddress":"127.0.0.1:37830"},
       "responseElements":{"x-amz-request-id":"15D808BF706E0994",
                           "x-minio-origin-endpoint":"http://10.244.0.71:9000"},
       "s3":{
         "s3SchemaVersion":"1.0",
         "configurationId":"Config",
         "bucket":{
           "name":"mnist",
           "ownerIdentity":{"principalId":"minio"},
           "arn":"arn:aws:s3:::mnist"},
         "object":{"key":"0.png","size":324,"eTag":"ebed21f6f77b0a64673a3c96b0c623ba","contentType":"image/png","userMetadata":{"content-type":"image/png"},"versionId":"1","sequencer":"15D808BF706E0994"}},
       "source":{"host":"","port":"","userAgent":""}}
    ],
    "level":"info",
    "msg":"",
    "time":"2019-11-17T19:08:08Z"
  }
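
To locate the uploaded image, a preprocess step needs the bucket name and object key from an event like the one above. The following is a minimal sketch of that extraction, not the transformer's actual code. The function name object_locations is hypothetical, and the URL-decoding step assumes keys arrive URL-encoded, as is typical for S3-style notifications.

```python
import json
from urllib.parse import unquote_plus


def object_locations(event):
    """Return a (bucket, key) pair for every record in a notification event."""
    return [
        (
            record["s3"]["bucket"]["name"],
            # Assumption: keys may be URL-encoded in S3-style notifications.
            unquote_plus(record["s3"]["object"]["key"]),
        )
        for record in event.get("Records", [])
    ]


# Trimmed copy of the notification event shown above.
raw = (
    '{"EventType":"s3:ObjectCreated:Put","Key":"mnist/0.png",'
    '"Records":[{"s3":{"bucket":{"name":"mnist"},"object":{"key":"0.png"}}}]}'
)
event = json.loads(raw)
print(object_locations(event))  # [('mnist', '0.png')]
```

Iterating over Records rather than reading only the top-level Key field handles events that carry more than one record.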

Check the transformer log. You should see a prediction response, after which the image is put into the corresponding bucket.

  kubectl logs mnist-transformer-default-rctjm-deployment-54d59c849c-2dq98 kserve-container
  [I 201128 22:32:27 kfserver:88] Registering model: mnist
  [I 201128 22:32:27 kfserver:77] Listening on port 8080
  [I 201128 22:32:27 kfserver:79] Will fork 0 workers
  [I 201128 22:32:27 process:123] Starting 6 processes
  [I 201128 22:32:44 connectionpool:203] Starting new HTTP connection (1): minio-service
  [I 201128 22:32:58 image_transformer:51] {'predictions': [{'predictions': [0.0247901566, 1.37231364e-05, 0.0202635303, 0.39037028, 0.000513458275, 0.435112566, 0.000607515569, 0.00041125578, 0.127784252, 0.000133168287], 'classes': 5}]}
  [I 201128 22:32:58 image_transformer:53] digit:5
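
The logged response can be mapped to a destination bucket as in the sketch below, which assumes the classified buckets are named digit-0 through digit-9. The function names target_bucket and most_likely_digit are hypothetical and this is not the transformer's actual postprocess code. The response dictionary is copied from the log above.

```python
def target_bucket(response):
    """Map a prediction response to the bucket named after the predicted digit."""
    digit = response["predictions"][0]["classes"]
    return f"digit-{digit}"


def most_likely_digit(response):
    """Return the index of the highest score, as a cross-check on 'classes'."""
    scores = response["predictions"][0]["predictions"]
    return max(range(len(scores)), key=scores.__getitem__)


# Response copied from the transformer log above.
response = {
    "predictions": [
        {
            "predictions": [
                0.0247901566, 1.37231364e-05, 0.0202635303, 0.39037028,
                0.000513458275, 0.435112566, 0.000607515569, 0.00041125578,
                0.127784252, 0.000133168287,
            ],
            "classes": 5,
        }
    ]
}

print(target_bucket(response))      # digit-5
print(most_likely_digit(response))  # 5
```

In this response the highest score (about 0.435) sits at index 5, which agrees with the classes field and with the digit:5 line in the log.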