Writing an event source using JavaScript

This tutorial shows how to build an event source in JavaScript and deploy it with either a ContainerSource or a SinkBinding.

  • Using a ContainerSource is a simple way to turn any dispatcher container into a Knative event source.
  • Using SinkBinding provides a framework for injecting environment variables into any Kubernetes resource that has a spec.template and is PodSpecable.

ContainerSource and SinkBinding both work by injecting environment variables into an application. At minimum, the injected environment variables contain the URL of the sink that receives the events.

Bootstrapping

Create the project and add the dependencies:

    npm init
    npm install cloudevents-sdk@2.0.1 --save

NOTE: Due to this bug, you must use version 2.0.1 or newer of the JavaScript SDK.

Using ContainerSource

A ContainerSource creates a container for your event source image and manages this container.

The ContainerSource makes the sink URL, to which the events are posted, available to the application through the K_SINK environment variable.
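As a minimal sketch of how an application might read the injected value, the snippet below validates K_SINK once at startup instead of failing on the first send. The helper name resolveSinkUrl is hypothetical and not part of the CloudEvents SDK; it relies only on Node.js built-ins.

```javascript
// Hypothetical helper (not part of cloudevents-sdk): read and validate the
// sink URL that a ContainerSource or SinkBinding injects as K_SINK.
function resolveSinkUrl(env = process.env) {
  const raw = env.K_SINK;
  if (typeof raw !== "string" || raw.trim() === "") {
    throw new Error("K_SINK is not set; is a ContainerSource or SinkBinding configured?");
  }
  // new URL() throws a TypeError if the value is not an absolute URL.
  const url = new URL(raw);
  if (url.protocol !== "http:" && url.protocol !== "https:") {
    throw new Error("K_SINK must be an http(s) URL, got: " + raw);
  }
  // The returned string is normalized, so a bare host gains a trailing slash.
  return url.toString();
}
```

Under these assumptions, the emitter could be created with new HTTPEmitter({ url: resolveSinkUrl() }), so that a missing variable stops the process with a clear message.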

Example

The following example event source emits an event to the sink every 1000 milliseconds:

    // File - index.js
    const { CloudEvent, HTTPEmitter } = require("cloudevents-sdk");

    // The sink URL is injected through the K_SINK environment variable.
    let sinkUrl = process.env['K_SINK'];
    console.log("Sink URL is " + sinkUrl);

    let emitter = new HTTPEmitter({
      url: sinkUrl
    });

    let eventIndex = 0;
    setInterval(function () {
      console.log("Emitting event #" + ++eventIndex);

      let myevent = new CloudEvent({
        source: "urn:event:from:your-api/resource/123",
        type: "your.event.source.type",
        id: "your-event-id",
        dataContentType: "application/json",
        data: {"hello": "World " + eventIndex},
      });

      // Emit the event
      emitter.send(myevent)
        .then(response => {
          // Treat the response
          console.log("Event posted successfully");
          console.log(response.data);
        })
        .catch(err => {
          // Deal with errors
          console.log("Error during event post");
          console.error(err);
        });
    }, 1000);

    # File - Dockerfile
    FROM node:10
    WORKDIR /usr/src/app
    COPY package*.json ./
    RUN npm install
    COPY . .
    EXPOSE 8080
    CMD [ "node", "index.js" ]

The example code uses binary mode for CloudEvents, which is the default for HTTPEmitter. To use structured mode instead, pass the mode as an option when sending the event: change emitter.send(myevent) to emitter.send(myevent, { mode: "structured" }).

Binary mode is used in most cases because:

  • It is faster in terms of serialization and deserialization.
  • It works better with CloudEvent-aware proxies, such as Knative Channels, which can simply check the headers instead of parsing the payload.
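To make the difference between the two modes concrete, here is a rough sketch of how one event maps to an HTTP request in each mode, following the CloudEvents HTTP protocol binding. The functions toBinary and toStructured are hypothetical illustrations written for this tutorial; they are not the SDK's implementation and handle only JSON data.

```javascript
// Illustrative only: hypothetical helpers, not the cloudevents-sdk implementation.
const event = {
  specversion: "1.0",
  type: "your.event.source.type",
  source: "urn:event:from:your-api/resource/123",
  id: "your-event-id",
  datacontenttype: "application/json",
  data: { hello: "World 1" },
};

// Binary mode: context attributes travel as ce-* headers, the data is the body.
function toBinary(ev) {
  const { data, datacontenttype, ...attributes } = ev;
  const headers = { "content-type": datacontenttype };
  for (const [name, value] of Object.entries(attributes)) {
    headers["ce-" + name] = String(value);
  }
  return { headers, body: JSON.stringify(data) };
}

// Structured mode: the whole event, attributes and data, is one JSON document.
function toStructured(ev) {
  return {
    headers: { "content-type": "application/cloudevents+json; charset=utf-8" },
    body: JSON.stringify(ev),
  };
}

console.log(toBinary(event));
console.log(toStructured(event));
```

In this sketch, a proxy that routes on the event type only needs to read the ce-type header of a binary-mode request, whereas for a structured-mode request it has to parse the body first.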

Procedure

  1. Build and push the image:

        docker build . -t path/to/image/registry/node-knative-heartbeat-source:v1
        docker push path/to/image/registry/node-knative-heartbeat-source:v1

  2. Create the event display service which logs any CloudEvents posted to it:

        apiVersion: serving.knative.dev/v1
        kind: Service
        metadata:
          name: event-display
        spec:
          template:
            spec:
              containers:
                - image: docker.io/aliok/event_display-864884f202126ec3150c5fcef437d90c@sha256:93cb4dcda8fee80a1f68662ae6bf20301471b046ede628f3c3f94f39752fbe08

  3. Create the ContainerSource object:

        apiVersion: sources.knative.dev/v1
        kind: ContainerSource
        metadata:
          name: test-heartbeats
        spec:
          template:
            spec:
              containers:
                - image: path/to/image/registry/node-knative-heartbeat-source:v1
                  name: heartbeats
          sink:
            ref:
              apiVersion: serving.knative.dev/v1
              kind: Service
              name: event-display

  4. Check the logs of the event display service. A new event is logged every second:

        $ kubectl logs -l serving.knative.dev/service=event-display -c user-container
        ☁️ cloudevents.Event
        Validation: valid
        Context Attributes,
          specversion: 1.0
          type: your.event.source.type
          source: urn:event:from:your-api/resource/123
          id: your-event-id
          datacontenttype: application/json
        Data,
          {
            "hello": "World 1"
          }

  5. Optional: To see the value that is injected into the event source as K_SINK, check the logs of the event source pod:

        $ kubectl logs test-heartbeats-deployment-7575c888c7-85w5t
        Sink URL is http://event-display.default.svc.cluster.local
        Emitting event #1
        Emitting event #2
        Event posted successfully
        Event posted successfully

Using SinkBinding

SinkBinding does not create any containers. It injects the sink information into an existing Kubernetes resource. This is a flexible approach, because you can use any PodSpecable Kubernetes object as an event source, such as a Deployment, a Job, or a Knative Service.

Procedure

  1. Create an event display service:

        apiVersion: serving.knative.dev/v1
        kind: Service
        metadata:
          name: event-display
        spec:
          template:
            spec:
              containers:
                - image: docker.io/aliok/event_display-864884f202126ec3150c5fcef437d90c@sha256:93cb4dcda8fee80a1f68662ae6bf20301471b046ede628f3c3f94f39752fbe08

  2. Create a Kubernetes deployment that runs the event source:

        apiVersion: apps/v1
        kind: Deployment
        metadata:
          name: node-heartbeats-deployment
          labels:
            app: node-heartbeats
        spec:
          replicas: 2
          selector:
            matchLabels:
              app: node-heartbeats
          template:
            metadata:
              labels:
                app: node-heartbeats
            spec:
              containers:
                - name: node-heartbeats
                  image: path/to/image/registry/node-knative-heartbeat-source:v1
                  ports:
                    - containerPort: 8080

  3. Check the logs of one of the deployment's pods. Because the SinkBinding has not been created yet, the K_SINK environment variable is not injected and the logs show an error:

        $ kubectl logs node-heartbeats-deployment-9ffbb644b-llkzk
        Sink URL is undefined
        Emitting event #1
        Error during event post
        TypeError [ERR_INVALID_ARG_TYPE]: The "url" argument must be of type string. Received type undefined

  4. Create the SinkBinding object:

        apiVersion: sources.knative.dev/v1
        kind: SinkBinding
        metadata:
          name: bind-node-heartbeat
        spec:
          subject:
            apiVersion: apps/v1
            kind: Deployment
            selector:
              matchLabels:
                app: node-heartbeats
          sink:
            ref:
              apiVersion: serving.knative.dev/v1
              kind: Service
              name: event-display

    The pods are recreated, and this time the K_SINK environment variable is injected.

    Because replicas is set to 2, two pods post events to the sink:

        $ kubectl logs event-display-dpplv-deployment-67c9949cf9-bvjvk -c user-container
        ☁️ cloudevents.Event
        Validation: valid
        Context Attributes,
          specversion: 1.0
          type: your.event.source.type
          source: urn:event:from:your-api/resource/123
          id: your-event-id
          datacontenttype: application/json
        Data,
          {
            "hello": "World 1"
          }
        ☁️ cloudevents.Event
        Validation: valid
        Context Attributes,
          specversion: 1.0
          type: your.event.source.type
          source: urn:event:from:your-api/resource/123
          id: your-event-id
          datacontenttype: application/json
        Data,
          {
            "hello": "World 1"
          }