Create a Knative-based Function to Interact with Middleware

This document describes how to create a Knative-based function to interact with middleware via Dapr components.

Overview

Like asynchronous functions, functions based on the Knative runtime can interact with middleware through Dapr components. This document uses two functions, function-front and kafka-input, for demonstration.

The following diagram illustrates the relationship between these functions.

[Diagram: function-front receives HTTP requests and forwards them through a Dapr output binding to the Kafka topic sample-topic, which in turn drives kafka-input.]

Prerequisites

Create a Kafka Server and Topic

  1. Run the following commands to install strimzi-kafka-operator in the default namespace.

     ```shell
     helm repo add strimzi https://strimzi.io/charts/
     helm install kafka-operator -n default strimzi/strimzi-kafka-operator
     ```
  2. Use the following content to create a file kafka.yaml.

     ```yaml
     apiVersion: kafka.strimzi.io/v1beta2
     kind: Kafka
     metadata:
       name: kafka-server
       namespace: default
     spec:
       kafka:
         version: 3.3.1
         replicas: 1
         listeners:
           - name: plain
             port: 9092
             type: internal
             tls: false
           - name: tls
             port: 9093
             type: internal
             tls: true
         config:
           offsets.topic.replication.factor: 1
           transaction.state.log.replication.factor: 1
           transaction.state.log.min.isr: 1
           default.replication.factor: 1
           min.insync.replicas: 1
           inter.broker.protocol.version: "3.1"
         storage:
           type: ephemeral
       zookeeper:
         replicas: 1
         storage:
           type: ephemeral
       entityOperator:
         topicOperator: {}
         userOperator: {}
     ---
     apiVersion: kafka.strimzi.io/v1beta2
     kind: KafkaTopic
     metadata:
       name: sample-topic
       namespace: default
       labels:
         strimzi.io/cluster: kafka-server
     spec:
       partitions: 10
       replicas: 1
       config:
         retention.ms: 7200000
         segment.bytes: 1073741824
     ```
  3. Run the following command to deploy a 1-replica Kafka server named kafka-server and a 1-replica Kafka topic named sample-topic in the default namespace.

     ```shell
     kubectl apply -f kafka.yaml
     ```
  4. Run the following command to check pod status and wait for Kafka and Zookeeper to be up and running.

     ```shell
     $ kubectl get po
     NAME                                            READY   STATUS    RESTARTS   AGE
     kafka-server-entity-operator-568957ff84-nmtlw   3/3     Running   0          8m42s
     kafka-server-kafka-0                            1/1     Running   0          9m13s
     kafka-server-zookeeper-0                        1/1     Running   0          9m46s
     strimzi-cluster-operator-687fdd6f77-cwmgm       1/1     Running   0          11m
     ```
  5. Run the following commands to view the metadata of the Kafka cluster.

     ```shell
     # Starts a utility pod.
     $ kubectl run utils --image=arunvelsriram/utils -i --tty --rm
     # Checks metadata of the Kafka cluster.
     $ kafkacat -L -b kafka-server-kafka-brokers:9092
     ```

Create Functions

  1. Use the following example YAML file to create a manifest kafka-input.yaml, and modify the value of spec.image to set your own image registry address. The field spec.serving.triggers defines a Dapr trigger that points to a bindings component of the Kafka server, which means that the kafka-input function will be driven by events in the topic sample-topic of the Kafka server.

     ```yaml
     apiVersion: core.openfunction.io/v1beta2
     kind: Function
     metadata:
       name: kafka-input
     spec:
       version: "v1.0.0"
       image: <your registry name>/kafka-input:latest
       imageCredentials:
         name: push-secret
       build:
         builder: openfunction/builder-go:latest
         env:
           FUNC_NAME: "HandleKafkaInput"
           FUNC_CLEAR_SOURCE: "true"
         srcRepo:
           url: "https://github.com/OpenFunction/samples.git"
           sourceSubPath: "functions/async/bindings/kafka-input"
           revision: "main"
       serving:
         scaleOptions:
           minReplicas: 0
           maxReplicas: 10
           keda:
             triggers:
               - type: kafka
                 metadata:
                   topic: sample-topic
                   bootstrapServers: kafka-server-kafka-brokers.default.svc:9092
                   consumerGroup: kafka-input
                   lagThreshold: "20"
             scaledObject:
               pollingInterval: 15
               cooldownPeriod: 60
               advanced:
                 horizontalPodAutoscalerConfig:
                   behavior:
                     scaleDown:
                       stabilizationWindowSeconds: 45
                       policies:
                         - type: Percent
                           value: 50
                           periodSeconds: 15
                     scaleUp:
                       stabilizationWindowSeconds: 0
         triggers:
           dapr:
             - name: target-topic
               type: bindings.kafka
         bindings:
           target-topic:
             type: bindings.kafka
             version: v1
             metadata:
               - name: brokers
                 value: "kafka-server-kafka-brokers:9092"
               - name: topics
                 value: "sample-topic"
               - name: consumerGroup
                 value: "kafka-input"
               - name: publishTopic
                 value: "sample-topic"
               - name: authRequired
                 value: "false"
         template:
           containers:
             - name: function
               imagePullPolicy: Always
     ```
  2. Run the following command to create the function kafka-input.

     ```shell
     kubectl apply -f kafka-input.yaml
     ```
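The KEDA trigger in kafka-input.yaml scales the function on Kafka consumer lag. As a rough mental model (a simplification for illustration, not KEDA's exact algorithm), the desired replica count is the total lag divided by lagThreshold, rounded up and clamped to the minReplicas/maxReplicas bounds:

```go
package main

import (
	"fmt"
	"math"
)

// desiredReplicas approximates how a lag-based Kafka scaler sizes a workload:
// total consumer lag divided by lagThreshold, rounded up, then clamped to
// [minReplicas, maxReplicas]. This is a simplified sketch, not KEDA's code.
func desiredReplicas(totalLag, lagThreshold, minReplicas, maxReplicas int64) int64 {
	n := int64(math.Ceil(float64(totalLag) / float64(lagThreshold)))
	if n < minReplicas {
		n = minReplicas
	}
	if n > maxReplicas {
		n = maxReplicas
	}
	return n
}

func main() {
	// With lagThreshold "20", minReplicas 0, and maxReplicas 10 from kafka-input.yaml:
	fmt.Println(desiredReplicas(0, 20, 0, 10))   // no lag: scale to zero
	fmt.Println(desiredReplicas(45, 20, 0, 10))  // ceil(45/20) = 3
	fmt.Println(desiredReplicas(500, 20, 0, 10)) // capped at maxReplicas
}
```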
  3. Use the following example YAML file to create a manifest function-front.yaml and modify the value of spec.image to set your own image registry address.

     ```yaml
     apiVersion: core.openfunction.io/v1beta2
     kind: Function
     metadata:
       name: function-front
     spec:
       version: "v1.0.0"
       image: "<your registry name>/sample-knative-dapr:latest"
       imageCredentials:
         name: push-secret
       build:
         builder: openfunction/builder-go:latest
         env:
           FUNC_NAME: "ForwardToKafka"
           FUNC_CLEAR_SOURCE: "true"
         srcRepo:
           url: "https://github.com/OpenFunction/samples.git"
           sourceSubPath: "functions/knative/with-output-binding"
           revision: "main"
       serving:
         hooks:
           pre:
             - plugin-custom
             - plugin-example
           post:
             - plugin-example
             - plugin-custom
         scaleOptions:
           minReplicas: 0
           maxReplicas: 5
         outputs:
           - dapr:
               name: kafka-server
               operation: "create"
         bindings:
           kafka-server:
             type: bindings.kafka
             version: v1
             metadata:
               - name: brokers
                 value: "kafka-server-kafka-brokers:9092"
               - name: authRequired
                 value: "false"
               - name: publishTopic
                 value: "sample-topic"
               - name: topics
                 value: "sample-topic"
               - name: consumerGroup
                 value: "function-front"
         template:
           containers:
             - name: function
               imagePullPolicy: Always
     ```

Note

spec.serving.hooks.pre defines the order of plugins to be called before the user function is executed, and spec.serving.hooks.post defines the order of plugins to be called after the user function is executed. For more information about the logic of these two plugins and their effects after execution, see Plugin mechanism.

  4. In the manifest, spec.serving.outputs defines an output that points to a Dapr bindings component of the Kafka server. This allows you to send custom content to the output target in the function function-front.

     ```go
     func Sender(ctx ofctx.Context, in []byte) (ofctx.Out, error) {
     	...
     	_, err := ctx.Send("target", greeting)
     	...
     }
     ```

  5. Run the following command to create the function function-front.

     ```shell
     kubectl apply -f function-front.yaml
     ```

Check Results

  1. Run the following command to view the status of the functions.

     ```shell
     $ kubectl get functions.core.openfunction.io
     NAME             BUILDSTATE   SERVINGSTATE   BUILDER         SERVING         URL                                              AGE
     function-front   Succeeded    Running        builder-bhbtk   serving-vc6jw   https://openfunction.io/default/function-front   2m41s
     kafka-input      Succeeded    Running        builder-dprfd   serving-75vrt                                                    2m21s
     ```

     Note

     The URL, provided by the OpenFunction Domain, is the address through which the function can be accessed. To access the function through this URL, make sure that DNS can resolve the address.

  2. Run the following command to create a pod in the cluster for accessing the function.

     ```shell
     kubectl run curl --image=radial/busyboxplus:curl -i --tty --rm
     ```

  3. Run the following command to access the function through the URL.

     ```shell
     [ root@curl:/ ]$ curl -d '{"message":"Awesome OpenFunction!"}' -H "Content-Type: application/json" -X POST http://openfunction.io.svc.cluster.local/default/function-front
     ```
  4. Run the following command to view the log of function-front.

     ```shell
     kubectl logs -f \
       $(kubectl get po -l \
       openfunction.io/serving=$(kubectl get functions function-front -o jsonpath='{.status.serving.resourceRef}') \
       -o jsonpath='{.items[0].metadata.name}') \
       function
     ```

     The output looks as follows.

     ```
     dapr client initializing for: 127.0.0.1:50001
     I0125 06:51:55.584973       1 framework.go:107] Plugins for pre-hook stage:
     I0125 06:51:55.585044       1 framework.go:110] - plugin-custom
     I0125 06:51:55.585052       1 framework.go:110] - plugin-example
     I0125 06:51:55.585057       1 framework.go:115] Plugins for post-hook stage:
     I0125 06:51:55.585062       1 framework.go:118] - plugin-custom
     I0125 06:51:55.585067       1 framework.go:118] - plugin-example
     I0125 06:51:55.585179       1 knative.go:46] Knative Function serving http: listening on port 8080
     2022/01/25 06:52:02 http - Data: {"message":"Awesome OpenFunction!"}
     I0125 06:52:02.246450       1 plugin-example.go:83] the sum is: 2
     ```
  5. Run the following command to view the log of kafka-input.

     ```shell
     kubectl logs -f \
       $(kubectl get po -l \
       openfunction.io/serving=$(kubectl get functions kafka-input -o jsonpath='{.status.serving.resourceRef}') \
       -o jsonpath='{.items[0].metadata.name}') \
       function
     ```

     The output looks as follows.

     ```
     dapr client initializing for: 127.0.0.1:50001
     I0125 06:35:28.332381       1 framework.go:107] Plugins for pre-hook stage:
     I0125 06:35:28.332863       1 framework.go:115] Plugins for post-hook stage:
     I0125 06:35:28.333749       1 async.go:39] Async Function serving grpc: listening on port 8080
     message from Kafka '{Awesome OpenFunction!}'
     ```