关联已存在的 Ckafka Topic

该例子中,我们通过 serverless.yml 创建了一个名为 ckafka_trigger 的 CKafka 触发器,并且关联了一个名为 test 的 CKafka Topic,每次这个 Topic 收到消息后,函数都会被调用。详情可以参考 CKafka 触发器概述

  1. functions:
  2. hello_world:
  3. handler: index.main_handler
  4. runtime: Nodejs8.9
  5. events:
  6. - ckafka:
  7. name: ckafka_trigger
  8. parameters:
  9. name: ckafka-2o10hua5 # ckafka-id
  10. topic: test
  11. maxMsgNum: 999
  12. offset: latest
  13. enable: true

注: CKafka 触发器默认开启,SCF 的后台模块作为消费者,连接 CKafka 实例并消费消息。

CKafka 触发器的事件消息结构

在指定的 CKafka Topic 接收到消息时,云函数的后台消费者模块会消费到消息,并将消息组装为类似以下的 JSON 格式事件,触发绑定的函数并将数据内容作为入参传递给函数。

  1. {
  2. "Records": [
  3. {
  4. "Ckafka": {
  5. "topic": "test-topic",
  6. "partition": 1,
  7. "offset": 36,
  8. "msgKey": "None",
  9. "msgBody": "Hello from Ckafka!"
  10. }
  11. },
  12. {
  13. "Ckafka": {
  14. "topic": "test-topic",
  15. "partition": 1,
  16. "offset": 37,
  17. "msgKey": "None",
  18. "msgBody": "Hello from Ckafka again!"
  19. }
  20. }
  21. ]
  22. }