Use EventSource

This document gives an example of how to use an event source to trigger a synchronous function.

In this example, an EventSource is defined for synchronous invocation, using the event source (a Kafka server) as an input binding of a function (a Knative service). When the event source generates an event, it invokes the function and gets a synchronous response through the spec.sink configuration.

Create a Function

Use the following content to create a function as the EventSource Sink. For more information about how to create a function, see Your First Function: Go.

  apiVersion: core.openfunction.io/v1beta1
  kind: Function
  metadata:
    name: sink
  spec:
    version: "v1.0.0"
    image: "openfunction/sink-sample:latest"
    port: 8080
    serving:
      runtime: "knative"
      template:
        containers:
          - name: function
            imagePullPolicy: Always
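
  Save the manifest to a file and apply it with kubectl. The file name sink.yaml below is just an example:

  # Apply the sink Function manifest saved above
  kubectl apply -f sink.yaml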

After the function is created, run the following command to get the URL of the function.

Note

In the URL of the function, openfunction is the name of the Kubernetes Service and io is the namespace where the Kubernetes Service runs. For more information, see Namespaces of Services.

  $ kubectl get functions.core.openfunction.io
  NAME   BUILDSTATE   SERVINGSTATE   BUILDER   SERVING         URL                                   AGE
  sink   Skipped      Running                  serving-4x5wh   http://openfunction.io/default/sink   13s
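
Optionally, you can verify that the sink responds before wiring up the event source. This is a minimal check, assuming the sink accepts an HTTP POST with a JSON body; the pod name curl-test and the curlimages/curl image are arbitrary choices:

  # Run a temporary pod in the cluster and POST a test payload to the sink URL
  kubectl run curl-test --rm -i --restart=Never --image=curlimages/curl --command -- \
    curl -s -X POST -H "Content-Type: application/json" \
    -d '{"data":"hello"}' \
    http://openfunction.io.svc.cluster.local/default/sink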

Create a Kafka Cluster

  1. Run the following commands to install strimzi-kafka-operator in the default namespace.

     helm repo add strimzi https://strimzi.io/charts/
     helm install kafka-operator -n default strimzi/strimzi-kafka-operator
  2. Use the following content to create a file kafka.yaml.

     apiVersion: kafka.strimzi.io/v1beta2
     kind: Kafka
     metadata:
       name: kafka-server
       namespace: default
     spec:
       kafka:
         version: 3.1.0
         replicas: 1
         listeners:
           - name: plain
             port: 9092
             type: internal
             tls: false
           - name: tls
             port: 9093
             type: internal
             tls: true
         config:
           offsets.topic.replication.factor: 1
           transaction.state.log.replication.factor: 1
           transaction.state.log.min.isr: 1
           default.replication.factor: 1
           min.insync.replicas: 1
           inter.broker.protocol.version: "3.1"
         storage:
           type: ephemeral
       zookeeper:
         replicas: 1
         storage:
           type: ephemeral
       entityOperator:
         topicOperator: {}
         userOperator: {}
     ---
     apiVersion: kafka.strimzi.io/v1beta2
     kind: KafkaTopic
     metadata:
       name: events-sample
       namespace: default
       labels:
         strimzi.io/cluster: kafka-server
     spec:
       partitions: 10
       replicas: 1
       config:
         retention.ms: 7200000
         segment.bytes: 1073741824
  3. Run the following command to deploy a 1-replica Kafka server named kafka-server and a 1-replica Kafka topic named events-sample in the default namespace. The Kafka and Zookeeper clusters created by this command use ephemeral storage (emptyDir volumes) and are intended for demonstration only.

     kubectl apply -f kafka.yaml
  4. Run the following command to check pod status and wait for Kafka and Zookeeper to be up and running.

     $ kubectl get po
     NAME                                            READY   STATUS    RESTARTS   AGE
     kafka-server-entity-operator-568957ff84-nmtlw   3/3     Running   0          8m42s
     kafka-server-kafka-0                            1/1     Running   0          9m13s
     kafka-server-zookeeper-0                        1/1     Running   0          9m46s
     strimzi-cluster-operator-687fdd6f77-cwmgm       1/1     Running   0          11m
  5. Run the following command to view the metadata of the Kafka cluster.

     kafkacat -L -b kafka-server-kafka-brokers:9092
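
The broker address above only resolves inside the cluster. If kafkacat is not available in a pod already, one way to run the same check is from a temporary pod; a sketch, assuming the public edenhill/kcat:1.7.1 image (the pod name kcat-metadata is arbitrary):

  # List cluster metadata from a throwaway pod; the image's entrypoint is kcat
  kubectl run kcat-metadata --rm -i --restart=Never --image=edenhill/kcat:1.7.1 -- -L -b kafka-server-kafka-brokers:9092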

Trigger a Synchronous Function

Create an EventSource

  1. Use the following content to create an EventSource configuration file (for example, eventsource-sink.yaml).

    Note

     • The following example defines an event source named my-eventsource and marks the events generated by the specified Kafka server as sample-one events.
     • spec.sink references the target function (the Knative service) created in Create a Function above.
     apiVersion: events.openfunction.io/v1alpha1
     kind: EventSource
     metadata:
       name: my-eventsource
     spec:
       logLevel: "2"
       kafka:
         sample-one:
           brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092"
           topic: "events-sample"
           authRequired: false
       sink:
         uri: "http://openfunction.io.svc.cluster.local/default/sink"
  2. Run the following command to apply the configuration file.

     kubectl apply -f eventsource-sink.yaml
  3. Run the following commands to check the results.

     $ kubectl get eventsources.events.openfunction.io
     NAME             EVENTBUS   SINK   STATUS
     my-eventsource                     Ready
     $ kubectl get components
     NAME                                                      AGE
     serving-8f6md-component-esc-kafka-sample-one-r527t        68m
     serving-8f6md-component-ts-my-eventsource-default-wz8jt   68m
     $ kubectl get deployments.apps
     NAME                                  READY   UP-TO-DATE   AVAILABLE   AGE
     serving-8f6md-deployment-v100-pg9sd   1/1     1            1           68m

    Note

     In this example of triggering a synchronous function, the workflow of the EventSource controller is as follows:

    1. Create an EventSource custom resource named my-eventsource.
    2. Create a Dapr component named serving-xxxxx-component-esc-kafka-sample-one-xxxxx to enable the EventSource to associate with the event source.
     3. Create a Dapr component named serving-xxxxx-component-ts-my-eventsource-default-xxxxx to enable the EventSource to associate with the sink function.
    4. Create a Deployment named serving-xxxxx-deployment-v100-xxxxx-xxxxxxxxxx-xxxxx for processing events.
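
If my-eventsource does not reach the Ready status, you can inspect its status conditions and the resources generated by the controller directly:

  # Show the full EventSource object, including status conditions
  kubectl get eventsources.events.openfunction.io my-eventsource -o yaml
  # Show events and conditions in a human-readable form
  kubectl describe eventsources.events.openfunction.io my-eventsource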

Create an Event Producer

To trigger the target function, you need to create an event producer that keeps generating events.
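
Before deploying the cron-driven producer defined below, you can optionally publish a single test message by hand to confirm that the events-sample topic is reachable. This is a minimal sketch, again assuming the edenhill/kcat:1.7.1 image; the pod name kcat-producer is arbitrary:

  # Pipe one message into kcat's producer mode (-P) from a throwaway pod
  echo "hello" | kubectl run kcat-producer --rm -i --restart=Never --image=edenhill/kcat:1.7.1 -- -P -b kafka-server-kafka-brokers:9092 -t events-sample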

  1. Use the following content to create an event producer configuration file (for example, events-producer.yaml).

     apiVersion: core.openfunction.io/v1beta1
     kind: Function
     metadata:
       name: events-producer
     spec:
       version: "v1.0.0"
       image: openfunctiondev/v1beta1-bindings:latest
       serving:
         template:
           containers:
             - name: function
               imagePullPolicy: Always
         runtime: "async"
         inputs:
           - name: cron
             component: cron
         outputs:
           - name: target
             component: kafka-server
             operation: "create"
         bindings:
           cron:
             type: bindings.cron
             version: v1
             metadata:
               - name: schedule
                 value: "@every 2s"
           kafka-server:
             type: bindings.kafka
             version: v1
             metadata:
               - name: brokers
                 value: "kafka-server-kafka-brokers:9092"
               - name: topics
                 value: "events-sample"
               - name: consumerGroup
                 value: "bindings-with-output"
               - name: publishTopic
                 value: "events-sample"
               - name: authRequired
                 value: "false"
  2. Run the following command to apply the configuration file.

     kubectl apply -f events-producer.yaml
  3. Run the following command to check the results in real time.

     $ kubectl get po --watch
     NAME                                                        READY   STATUS              RESTARTS   AGE
     serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh         0/2     ContainerCreating   0          1s
     serving-8f6md-deployment-v100-pg9sd-6666c5577f-4rpdg        2/2     Running             0          23m
     serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh         0/2     ContainerCreating   0          1s
     serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh         1/2     Running             0          5s
     serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh         2/2     Running             0          8s
     serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk   0/2     Pending             0          0s
     serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk   0/2     Pending             0          0s
     serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk   0/2     ContainerCreating   0          0s
     serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk   0/2     ContainerCreating   0          2s
     serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk   1/2     Running             0          4s
     serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk   1/2     Running             0          4s
     serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk   2/2     Running             0          4s
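
To confirm that events actually reach the sink, you can tail the logs of the sink's Knative pod. The pod name below comes from the output above and will differ in your cluster:

  # Stream logs from all containers in the sink pod
  kubectl logs -f serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk --all-containers=true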
