Writing an Event Source using the easy way

Introduction

As stated in tutorial on writing a Source with a Receive Adapter, there are multiple ways to create event sources. The way in that tutorial is to create an independent event source that has its own CRD.

This tutorial provides a simpler mechanism to build an event source in Javascript and use it with ContainerSource and / or the SinkBinding.

ContainerSource is an easy way to turn any dispatcher container into an Event Source. Similarly, another option is using SinkBinding which provides a framework for injecting environment variables into any Kubernetes resource which has a spec.template that looks like a Pod (aka PodSpecable).

SinkBinding is a newer concept and it should be preferred over ContainerSource.

Code for this tutorial is available here.

Bootstrapping

Create the project and add the dependencies:

  1. npm init
  2. npm install cloudevents-sdk@2.0.1 --save

Please note that because of a bug, you will need at least 2.0.1 version of the Javascript SDK.

Making use of ContainerSource

ContainerSource and SinkBinding both work by injecting environment variables to an application. Injected environment variables at minimum contain the URL of a sink that will receive events.

Following example emits an event to the sink every 1000 milliseconds. The sink URL to post the events will be made available to the application via the K_SINK environment variable by ContainerSource.

  1. // File - index.js
  2. const { CloudEvent, HTTPEmitter } = require("cloudevents-sdk");
  3. let sinkUrl = process.env['K_SINK'];
  4. console.log("Sink URL is " + sinkUrl);
  5. let emitter = new HTTPEmitter({
  6. url: sinkUrl
  7. });
  8. let eventIndex = 0;
  9. setInterval(function () {
  10. console.log("Emitting event #" + ++eventIndex);
  11. let myevent = new CloudEvent({
  12. source: "urn:event:from:my-api/resource/123",
  13. type: "your.event.source.type",
  14. id: "your-event-id",
  15. dataContentType: "application/json",
  16. data: {"hello": "World " + eventIndex},
  17. });
  18. // Emit the event
  19. emitter.send(myevent)
  20. .then(response => {
  21. // Treat the response
  22. console.log("Event posted successfully");
  23. console.log(response.data);
  24. })
  25. .catch(err => {
  26. // Deal with errors
  27. console.log("Error during event post");
  28. console.error(err);
  29. });
  30. }, 1000);
  1. # File - Dockerfile
  2. FROM node:10
  3. WORKDIR /usr/src/app
  4. COPY package*.json ./
  5. RUN npm install
  6. COPY . .
  7. EXPOSE 8080
  8. CMD [ "node", "index.js" ]

Build and push the image:

  1. docker build . -t path/to/image/registry/node-knative-heartbeat-source:v1
  2. docker push path/to/image/registry/node-knative-heartbeat-source:v1

Create the event display service which simply logs any cloudevents posted to it.

  1. cat <<EOS |kubectl apply -f -
  2. ---
  3. apiVersion: serving.knative.dev/v1
  4. kind: Service
  5. metadata:
  6. name: event-display
  7. spec:
  8. template:
  9. spec:
  10. containers:
  11. - image: docker.io/aliok/event_display-864884f202126ec3150c5fcef437d90c@sha256:93cb4dcda8fee80a1f68662ae6bf20301471b046ede628f3c3f94f39752fbe08
  12. EOS

Create the ContainerSource:

  1. cat <<EOS |kubectl apply -f -
  2. ---
  3. apiVersion: sources.knative.dev/v1alpha2
  4. kind: ContainerSource
  5. metadata:
  6. name: test-heartbeats
  7. spec:
  8. template:
  9. spec:
  10. containers:
  11. - image: path/to/image/registry/node-knative-heartbeat-source:v1
  12. name: heartbeats
  13. sink:
  14. ref:
  15. apiVersion: serving.knative.dev/v1
  16. kind: Service
  17. name: event-display
  18. EOS

Check the logs of the event display service. You will see a new message is pushed every second:

  1. $ kubectl logs -l serving.knative.dev/service=event-display -c user-container
  2. ☁️ cloudevents.Event
  3. Validation: valid
  4. Context Attributes,
  5. specversion: 1.0
  6. type: your.event.source.type
  7. source: urn:event:from:your-api/resource/123
  8. id: your-event-id
  9. datacontenttype: application/json
  10. Data,
  11. {
  12. "hello": "World 1"
  13. }

If you are interested in seeing what is injected into the event source as a K_SINK, you can check the logs:

  1. $ kubectl logs test-heartbeats-deployment-7575c888c7-85w5t
  2. Sink URL is http://event-display.default.svc.cluster.local
  3. Emitting event #1
  4. Emitting event #2
  5. Event posted successfully
  6. Event posted successfully

Please note that the example code above is using Binary mode for CloudEvents. Simply change

  1. let binding = new v1.BinaryHTTPEmitter(config);

with

  1. let binding = new v1.StructuredHTTPEmitter(config);

to employ structured mode.

However, binary mode should be used in most of the cases as:

  • It is faster in terms of serialization and deserialization
  • It works better with cloudevents-aware proxies (like Knative Channels) can simply check the header instead of parsing the payload

Making use of SinkBinding

SinkBinding is a more powerful way of making any Kubernetes resource an event source.

ContainerSource will create the container for your event source’s image and it will be ContainerSource responsibility to manage the container.

SinkBinding though, will not create any containers. It will inject the sink information to the already existing Kubernetes resources. This is a more flexible approach as you can use any Kubernetes PodSpecable as an event source, such as Deployment, Job, Knative Service, DaemonSet etc.

We don’t need any code changes in our source for making it work with SinkBinding.

Create the event display as in the section before:

  1. cat <<EOS |kubectl apply -f -
  2. ---
  3. apiVersion: serving.knative.dev/v1
  4. kind: Service
  5. metadata:
  6. name: event-display
  7. spec:
  8. template:
  9. spec:
  10. containers:
  11. - image: docker.io/aliok/event_display-864884f202126ec3150c5fcef437d90c@sha256:93cb4dcda8fee80a1f68662ae6bf20301471b046ede628f3c3f94f39752fbe08
  12. EOS

Create a Kubernetes deployment that runs the event source:

  1. cat <<EOS |kubectl apply -f -
  2. ---
  3. apiVersion: apps/v1
  4. kind: Deployment
  5. metadata:
  6. name: node-heartbeats-deployment
  7. labels:
  8. app: node-heartbeats
  9. spec:
  10. replicas: 2
  11. selector:
  12. matchLabels:
  13. app: node-heartbeats
  14. template:
  15. metadata:
  16. labels:
  17. app: node-heartbeats
  18. spec:
  19. containers:
  20. - name: node-heartbeats
  21. image: path/to/image/registry/node-knative-heartbeat-source:v1
  22. ports:
  23. - containerPort: 8080
  24. EOS

As the SinkBinding is not created yet, K_SINK environment variable is not yet injected and the event source will complain about that.

  1. $ kubectl logs node-heartbeats-deployment-9ffbb644b-llkzk
  2. Sink URL is undefined
  3. Emitting event #1
  4. Error during event post
  5. TypeError [ERR_INVALID_ARG_TYPE]: The "url" argument must be of type string. Received type undefined

Create the SinkBinding:

  1. cat <<EOS |kubectl apply -f -
  2. ---
  3. apiVersion: sources.knative.dev/v1alpha1
  4. kind: SinkBinding
  5. metadata:
  6. name: bind-node-heartbeat
  7. spec:
  8. subject:
  9. apiVersion: apps/v1
  10. kind: Deployment
  11. selector:
  12. matchLabels:
  13. app: node-heartbeats
  14. sink:
  15. ref:
  16. apiVersion: serving.knative.dev/v1
  17. kind: Service
  18. name: event-display
  19. EOS

You will see the pods are recreated and this time the K_SINK environment variable is injected.

Also note that since the replicas is set to 2, there will be 2 pods that are posting events to the sink.

  1. $ kubectl logs event-display-dpplv-deployment-67c9949cf9-bvjvk -c user-container
  2. ☁️ cloudevents.Event
  3. Validation: valid
  4. Context Attributes,
  5. specversion: 1.0
  6. type: your.event.source.type
  7. source: urn:event:from:your-api/resource/123
  8. id: your-event-id
  9. datacontenttype: application/json
  10. Data,
  11. {
  12. "hello": "World 1"
  13. }
  14. ☁️ cloudevents.Event
  15. Validation: valid
  16. Context Attributes,
  17. specversion: 1.0
  18. type: your.event.source.type
  19. source: urn:event:from:your-api/resource/123
  20. id: your-event-id
  21. datacontenttype: application/json
  22. Data,
  23. {
  24. "hello": "World 1"
  25. }