Writing an event source using JavaScript

This tutorial provides instructions to build an event source in JavaScript and implement it with a ContainerSource or SinkBinding.

ContainerSource and SinkBinding both work by injecting environment variables into an application. At minimum, the injected environment variables contain the URL of the sink that receives events.
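Because the sink URL arrives as an environment variable, an event source can validate it once at startup. The following is a minimal sketch of that check, not part of the tutorial's source; `resolveSink` is a hypothetical helper name, and it takes the environment as a parameter only so that it can be exercised without a cluster.

```javascript
// Sketch: read and validate the injected sink URL.
// `resolveSink` is a hypothetical helper, not part of any Knative SDK.
function resolveSink(env = process.env) {
  const sink = env.K_SINK;
  if (!sink) {
    throw new Error('K_SINK environment variable is not defined');
  }
  // The URL constructor throws a TypeError if the value is not a valid URL.
  return new URL(sink).toString();
}

// Example with an explicit environment object instead of process.env.
const sinkUrl = resolveSink({ K_SINK: 'http://event-display.default.svc.cluster.local' });
console.log(sinkUrl);
```

Failing fast on a missing or malformed value keeps the error close to its cause, rather than surfacing it later when the first event is posted.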

Bootstrapping

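Create the project and add the dependencies. The example loads got with require(), so pin got to version 11; got 12 and later are published as ES modules only:

  npm init -y
  npm install cloudevents got@11 --save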

Using ContainerSource

A ContainerSource creates a container for your event source image and manages this container.

The ContainerSource makes the sink URL, where events are posted, available to the application through the K_SINK environment variable.

Example

The following example event source emits an event to the sink every second:

  // File - index.js
  const got = require('got');
  const { CloudEvent, Emitter, emitterFor } = require('cloudevents');

  const K_SINK = process.env['K_SINK'];
  K_SINK || logExit('Error: K_SINK Environment variable is not defined');
  console.log(`Sink URL is ${K_SINK}`);

  const source = 'urn:event:from:heartbeat/example';
  const type = 'heartbeat.example';

  let eventIndex = 0;
  setInterval(() => {
    console.log(`Emitting event # ${++eventIndex}`);

    // Create a new CloudEvent each second
    const event = new CloudEvent({ source, type, data: {'hello': `World # ${eventIndex}`} });

    // Emits the 'cloudevent' Node.js event application-wide
    event.emit();
  }, 1000);

  // Create a function that can post an event.
  // The callback receives the serialized message (headers and body), not the CloudEvent itself.
  const emit = emitterFor(message => {
    got.post(K_SINK, { headers: message.headers, body: message.body })
      .then(response => {
        console.log('Event posted successfully');
        // got exposes the response payload as `body`
        console.log(response.body);
      })
      .catch(err => {
        console.log('Error during event post');
        console.error(err);
      });
  });

  // Send the CloudEvent any time a Node.js 'cloudevent' event is emitted
  Emitter.on('cloudevent', emit);

  registerGracefulExit();

  function registerGracefulExit() {
    process.on('exit', logExit);
    // Catches Ctrl+C
    process.on('SIGINT', logExit);
    process.on('SIGTERM', logExit);
    // Catches 'kill pid' (for example: nodemon restart)
    process.on('SIGUSR1', logExit);
    process.on('SIGUSR2', logExit);
  }

  function logExit(message = 'Exiting...') {
    // Handle graceful exit
    console.log(message);
    process.exit();
  }

  # File - Dockerfile
  FROM node:16
  WORKDIR /usr/src/app
  COPY package*.json ./
  RUN npm install
  COPY . .
  EXPOSE 8080
  CMD [ "node", "index.js" ]
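To make clearer what index.js sends to the sink, the sketch below builds a binary-mode HTTP message by hand, following the CloudEvents HTTP protocol binding: context attributes travel as `ce-` prefixed headers and the data travels as the request body. This is an illustration only. In the example source the cloudevents SDK does this serialization, and `toBinaryMessage` is a hypothetical name that covers just the attributes used in this tutorial.

```javascript
const { randomBytes } = require('crypto');

// Sketch: map a CloudEvent-like plain object to a binary-mode HTTP message.
// Only the attributes used in this tutorial are handled.
function toBinaryMessage(event) {
  const headers = {
    'content-type': 'application/json',
    'ce-specversion': '1.0',
    // Fall back to a random identifier when the caller supplies none.
    'ce-id': event.id || randomBytes(16).toString('hex'),
    'ce-source': event.source,
    'ce-type': event.type,
  };
  return { headers, body: JSON.stringify(event.data) };
}

const message = toBinaryMessage({
  id: 'example-id-1', // fixed id, chosen here for illustration
  source: 'urn:event:from:heartbeat/example',
  type: 'heartbeat.example',
  data: { hello: 'World # 1' },
});
console.log(message.headers);
console.log(message.body);
```

The resulting `headers` and `body` pair has the same shape as the message that the `emitterFor` callback hands to the HTTP client.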

Procedure

Before publishing the ContainerSource, you must build the application image, and push it to a container registry that your cluster can access.

  1. Build and push the image:

    export REGISTRY=docker.io/myregistry
    docker build . -t $REGISTRY/node-heartbeat-source:v1
    docker push $REGISTRY/node-heartbeat-source:v1
  2. Create the event display service which logs any CloudEvents posted to it:

    apiVersion: serving.knative.dev/v1
    kind: Service
    metadata:
      name: event-display
    spec:
      template:
        spec:
          containers:
            - image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/event_display
  3. Create the ContainerSource object:

    apiVersion: sources.knative.dev/v1
    kind: ContainerSource
    metadata:
      name: heartbeat-source
    spec:
      template:
        spec:
          containers:
            - image: docker.io/myregistry/node-heartbeat-source:v1
              name: heartbeats
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
  4. Check the logs of the event-display Service. You can observe that a new message is pushed every second:

    $ kubectl logs -l serving.knative.dev/service=event-display -c user-container

    ☁️ cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: heartbeat.example
      source: urn:event:from:heartbeat/example
      id: 47e69d34-def7-449b-8382-3652495f9163
      datacontenttype: application/json
    Data,
      {
        "hello": "World 1"
      }
  5. Optional: To see the value that is injected into the event source as K_SINK, check the logs of the event source pod:

    $ kubectl logs heartbeat-source-7575c888c7-85w5t

    Sink URL is http://event-display.default.svc.cluster.local
    Emitting event #1
    Emitting event #2
    Event posted successfully
    Event posted successfully
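For trying the source without a cluster, a local stand-in for the event-display service can be sketched with Node's built-in http module. This is an assumption-laden sketch, not the real event_display implementation: `parseBinaryEvent` and `createDisplayServer` are hypothetical names, only binary-mode events with a JSON body are handled, and the response status is chosen for illustration.

```javascript
const http = require('http');

// Collect `ce-` prefixed headers into an attribute map (binary content mode).
// Header names are expected in lowercase, as Node's http module provides them.
function parseBinaryEvent(headers, body) {
  const attributes = {};
  for (const [name, value] of Object.entries(headers)) {
    if (name.startsWith('ce-')) {
      attributes[name.slice(3)] = value;
    }
  }
  const contentType = headers['content-type'] || '';
  const data = contentType.includes('json') && body ? JSON.parse(body) : body;
  return { attributes, data };
}

// Build (but do not start) a server that logs every event posted to it.
function createDisplayServer(log = console.log) {
  return http.createServer((req, res) => {
    let body = '';
    req.setEncoding('utf8');
    req.on('data', chunk => { body += chunk; });
    req.on('end', () => {
      try {
        log(JSON.stringify(parseBinaryEvent(req.headers, body), null, 2));
        res.statusCode = 202;
      } catch (err) {
        // A body that claims to be JSON but does not parse is rejected.
        res.statusCode = 400;
      }
      res.end();
    });
  });
}

// Parsing a hand-written request, without any network traffic:
const parsed = parseBinaryEvent(
  { 'content-type': 'application/json', 'ce-type': 'heartbeat.example', 'ce-specversion': '1.0' },
  '{"hello":"World # 1"}'
);
console.log(parsed);
```

Calling `createDisplayServer().listen(8080)` and then starting the source with `K_SINK=http://localhost:8080 node index.js` should print each heartbeat locally, assuming port 8080 is free.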

Using SinkBinding

SinkBinding does not create any containers. It injects the sink information into existing Kubernetes resources. This approach is flexible because you can use any Kubernetes PodSpecable object as an event source, such as a Deployment, a Job, or a Knative Service.

Procedure

  1. Create the event-display service:

    apiVersion: serving.knative.dev/v1
    kind: Service
    metadata:
      name: event-display
    spec:
      template:
        spec:
          containers:
            - image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/event_display
  2. Create a Kubernetes Deployment that runs the event source:

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: node-heartbeats-deployment
      labels:
        app: node-heartbeats
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: node-heartbeats
      template:
        metadata:
          labels:
            app: node-heartbeats
        spec:
          containers:
            - name: node-heartbeats
              image: docker.io/myregistry/node-heartbeat-source:v1
              ports:
                - containerPort: 8080
  3. Check the logs of one of the Deployment's pods. Because the SinkBinding has not been created yet, the K_SINK environment variable is not injected and the source reports an error:

    $ kubectl logs node-heartbeats-deployment-9ffbb644b-llkzk

    Sink URL is undefined
    Emitting event #1
    Error during event post
    TypeError [ERR_INVALID_ARG_TYPE]: The "url" argument must be of type string. Received type undefined
  4. Create the SinkBinding object:

    apiVersion: sources.knative.dev/v1
    kind: SinkBinding
    metadata:
      name: bind-node-heartbeat
    spec:
      subject:
        apiVersion: apps/v1
        kind: Deployment
        selector:
          matchLabels:
            app: node-heartbeats
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display

    The Pods are recreated, this time with the K_SINK environment variable injected.

    Because replicas is set to 2, two pods post events to the sink:

    $ kubectl logs event-display-dpplv-deployment-67c9949cf9-bvjvk -c user-container

    ☁️ cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: heartbeat.example
      source: urn:event:from:heartbeat/example
      id: 47e69d34-def7-449b-8382-3652495f9163
      datacontenttype: application/json
    Data,
      {
        "hello": "World 1"
      }
    ☁️ cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: heartbeat.example
      source: urn:event:from:heartbeat/example
      id: 47e69d34-def7-449b-8382-3652495f9163
      datacontenttype: application/json
    Data,
      {
        "hello": "World 1"
      }
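With two replicas posting the same source and type, the events in the sink's log cannot be told apart by pod. One possible way to distinguish them, which the tutorial's source does not do, is to include the pod name in the event data. The sketch below assumes the Kubernetes default in which a pod's hostname equals its name; `heartbeatData` and the `pod` field are hypothetical.

```javascript
const os = require('os');

// Hypothetical helper: build the heartbeat payload and tag it with the pod name.
// Inside a pod, os.hostname() returns the pod name unless the pod spec overrides it.
function heartbeatData(index, podName = os.hostname()) {
  return { hello: `World # ${index}`, pod: podName };
}

// Example with an explicit pod name, as it might appear for one replica.
const sample = heartbeatData(1, 'node-heartbeats-deployment-9ffbb644b-llkzk');
console.log(sample);
```

In index.js, the result could be passed as the `data` field when constructing each CloudEvent.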