Using Sequence with Broker and Trigger

We are going to create the following logical configuration. We create a PingSource that feeds events into the Broker, then a Trigger whose filter wires those events into a Sequence consisting of 3 steps. The reply of the Sequence feeds the newly minted events back into the Broker, and a second Trigger then displays those events.

Prerequisites

For this example, we’ll assume you have set up an InMemoryChannel as well as Knative Serving (for our functions). The examples use the default namespace; if your Broker lives in another namespace, you will need to modify the examples to reflect this.

If you want to use a different type of Channel, you will have to modify the Sequence.Spec.ChannelTemplate to create the appropriate Channel resources.

Logical Configuration

The functions used in these examples live in https://github.com/knative/eventing-contrib/blob/master/cmd/appender/main.go.

Setup

Creating the Broker

The easiest way to create a Broker is to label your namespace:

  kubectl label namespace default knative-eventing-injection=enabled

Create the Knative Services

Change default below to create the steps in the Namespace where you have configured your Broker.

  apiVersion: serving.knative.dev/v1
  kind: Service
  metadata:
    name: first
  spec:
    template:
      spec:
        containers:
          - image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/appender
            env:
              - name: MESSAGE
                value: " - Handled by 0"
  ---
  apiVersion: serving.knative.dev/v1
  kind: Service
  metadata:
    name: second
  spec:
    template:
      spec:
        containers:
          - image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/appender
            env:
              - name: MESSAGE
                value: " - Handled by 1"
  ---
  apiVersion: serving.knative.dev/v1
  kind: Service
  metadata:
    name: third
  spec:
    template:
      spec:
        containers:
          - image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/appender
            env:
              - name: MESSAGE
                value: " - Handled by 2"
  ---

  kubectl -n default create -f ./steps.yaml

Create the Sequence

The sequence.yaml file contains the specifications for creating the Sequence. If you are using a different type of Channel, you need to change the spec.channelTemplate to point to your desired Channel.

Also, change spec.reply.ref.name to point to your Broker.

  apiVersion: flows.knative.dev/v1beta1
  kind: Sequence
  metadata:
    name: sequence
  spec:
    channelTemplate:
      apiVersion: messaging.knative.dev/v1beta1
      kind: InMemoryChannel
    steps:
      - ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: first
      - ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: second
      - ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: third
    reply:
      ref:
        kind: Broker
        apiVersion: eventing.knative.dev/v1beta1
        name: default

Change default below to create the Sequence in the Namespace where you have configured your Broker.

  kubectl -n default create -f ./sequence.yaml
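
To make the behavior of the Sequence concrete, here is a minimal Python sketch of the data flow it sets up: each step receives the output of the previous step, and the final result is handed to the reply target. This is a simplified model, not the Knative implementation. The names make_appender, run_sequence, and broker_inbox are hypothetical, and the sketch assumes each step only appends its MESSAGE value to the message field of the event data.

```python
import copy

# MESSAGE values taken from steps.yaml above.
STEP_MESSAGES = [" - Handled by 0", " - Handled by 1", " - Handled by 2"]


def make_appender(suffix):
    """Return a step that appends `suffix` to data.message (simplified model of the appender)."""
    def step(event):
        out = copy.deepcopy(event)  # leave the incoming event untouched
        out["data"]["message"] += suffix
        return out
    return step


def run_sequence(steps, event, reply):
    """Pass the event through each step in order, then hand the result to `reply`."""
    for step in steps:
        event = step(event)
    reply(event)


broker_inbox = []  # hypothetical stand-in for the Broker named in spec.reply
ping_event = {
    "type": "dev.knative.sources.ping",
    "data": {"message": "Hello world!"},
}

steps = [make_appender(message) for message in STEP_MESSAGES]
run_sequence(steps, ping_event, broker_inbox.append)
print(broker_inbox[0]["data"]["message"])
```

The sketch ignores Channels, delivery retries, and any change to the event type; it only shows the order in which the steps are applied and where the result ends up.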

Create the PingSource targeting the Broker

This will create a PingSource which will send a CloudEvent with {"message": "Hello world!"} as the data payload every 2 minutes.

  apiVersion: sources.knative.dev/v1alpha2
  kind: PingSource
  metadata:
    name: ping-source
  spec:
    schedule: "*/2 * * * *"
    jsonData: '{"message": "Hello world!"}'
    sink:
      ref:
        apiVersion: eventing.knative.dev/v1beta1
        kind: Broker
        name: default

Here, if your Broker is not named default, change spec.sink.ref.name to point to your Broker.

Change default below to create the PingSource in the Namespace where you have configured your Broker.

  kubectl -n default create -f ./ping-source.yaml
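
As a quick check of what the schedule "*/2 * * * *" means, the following Python sketch lists the minutes of each hour on which the PingSource would fire. It only interprets the minute field, and only the forms "*", "*/N", and a plain number; minute_field_matches is a hypothetical helper, not part of Knative or of any cron library.

```python
def minute_field_matches(field, minute):
    """Match a cron minute field of the form '*', '*/N', or a plain number."""
    if field == "*":
        return True
    if field.startswith("*/"):
        return minute % int(field[2:]) == 0
    return int(field) == minute


schedule = "*/2 * * * *"  # value of spec.schedule in ping-source.yaml
minute_field = schedule.split()[0]

firing_minutes = [m for m in range(60) if minute_field_matches(minute_field, m)]
print(firing_minutes[:5], len(firing_minutes))
```

Under this reading the source fires on every even minute, 30 times per hour.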

Create the Trigger targeting the Sequence

  apiVersion: eventing.knative.dev/v1beta1
  kind: Trigger
  metadata:
    name: sequence-trigger
  spec:
    filter:
      attributes:
        type: dev.knative.sources.ping
    subscriber:
      ref:
        apiVersion: flows.knative.dev/v1beta1
        kind: Sequence
        name: sequence

Change default below to create the Trigger in the Namespace where you have configured your Broker.

  kubectl -n default create -f ./trigger.yaml

Create the Service and Trigger displaying the events created by Sequence

  apiVersion: serving.knative.dev/v1
  kind: Service
  metadata:
    name: sequence-display
  spec:
    template:
      spec:
        containers:
          - image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/event_display
  ---
  apiVersion: eventing.knative.dev/v1beta1
  kind: Trigger
  metadata:
    name: display-trigger
  spec:
    filter:
      attributes:
        type: samples.http.mod3
    subscriber:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: sequence-display
  ---

Change default below to create the Service and Trigger in the Namespace where you have configured your Broker.

  kubectl -n default create -f ./display-trigger.yaml
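
Because the Sequence replies to the same Broker that feeds it, the two Trigger filters are what keep events from looping. The Python sketch below models that routing under one assumption: a Trigger delivers an event only when every attribute in its filter exactly equals the corresponding event attribute. The functions matches and route are hypothetical and cover only exact matching, not the full Trigger filter semantics.

```python
def matches(trigger_filter, event):
    """True when every filter attribute equals the event's attribute (exact match only)."""
    return all(event.get(key) == value for key, value in trigger_filter.items())


# Filters copied from trigger.yaml and display-trigger.yaml.
TRIGGERS = {
    "sequence-trigger": {"type": "dev.knative.sources.ping"},
    "display-trigger": {"type": "samples.http.mod3"},
}


def route(event):
    """Return the names of the Triggers that would receive this event."""
    return [name for name, flt in TRIGGERS.items() if matches(flt, event)]


source = "/apis/v1/namespaces/default/pingsources/ping-source"
ping = {"type": "dev.knative.sources.ping", "source": source}
reply = {"type": "samples.http.mod3", "source": source}

print(route(ping))
print(route(reply))
```

In this model the ping event reaches only sequence-trigger, and the event the Sequence sends back (type samples.http.mod3) reaches only display-trigger, so it is never fed into the Sequence a second time.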

Inspecting the results

You can now see the final output by inspecting the logs of the sequence-display pods.

  kubectl -n default get pods

Then look at the logs for the sequence-display pod:

  kubectl -n default logs -l serving.knative.dev/service=sequence-display -c user-container --tail=-1

  ☁️  cloudevents.Event
  Validation: valid
  Context Attributes,
    specversion: 1.0
    type: samples.http.mod3
    source: /apis/v1/namespaces/default/pingsources/ping-source
    id: 159bba01-054a-4ae7-b7be-d4e7c5f773d2
    time: 2020-03-03T14:56:00.000652027Z
    datacontenttype: application/json
  Extensions,
    knativearrivaltime: 2020-03-03T14:56:00.018390608Z
    knativehistory: default-kne-trigger-kn-channel.default.svc.cluster.local; sequence-kn-sequence-0-kn-channel.default.svc.cluster.local; sequence-kn-sequence-1-kn-channel.default.svc.cluster.local; sequence-kn-sequence-2-kn-channel.default.svc.cluster.local; default-kne-trigger-kn-channel.default.svc.cluster.local
    traceparent: 00-e893412106ff417a90a5695e53ffd9cc-5829ae45a14ed462-00
  Data,
    {
      "id": 0,
      "message": "Hello world! - Handled by 0 - Handled by 1 - Handled by 2"
    }

And you can see that each of the steps in the Sequence has appended its own text to the initial PingSource message "Hello world!".
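
The knativehistory extension in the output above can also be read as a trace of the path the event took. The Python sketch below splits it into hops, assuming each Channel the event passed through appended its host name, separated by semicolons. The variable names are illustrative.

```python
# Value of the knativehistory extension, copied from the log output above.
history = (
    "default-kne-trigger-kn-channel.default.svc.cluster.local; "
    "sequence-kn-sequence-0-kn-channel.default.svc.cluster.local; "
    "sequence-kn-sequence-1-kn-channel.default.svc.cluster.local; "
    "sequence-kn-sequence-2-kn-channel.default.svc.cluster.local; "
    "default-kne-trigger-kn-channel.default.svc.cluster.local"
)

# Keep only the first DNS label of each host, which names the Channel.
hops = [host.strip().split(".")[0] for host in history.split(";")]

for number, hop in enumerate(hops, start=1):
    print(number, hop)
```

Read this way, the event enters through the Broker's trigger channel, passes through one channel per Sequence step, and returns to the Broker's trigger channel before it is delivered to sequence-display.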