利用 Function 函数计算服务进行消息处理

声明

  • 本文测试所用设备系统为 Ubuntu 18.04
  • python 版本为 3.6,2.7 版本配置流程相同,但需要在 python 脚本中注意语言差异
  • 模拟 MQTT client 行为的客户端为 MQTTBox
  • 本文所用镜像为依赖 Baetyl 源码自行编译所得,具体请查看 如何从源码构建镜像提示:Darwin 系统可以通过源码安装 Baetyl,可参考 源码编译 Baetyl

与基于 Hub 服务实现设备间消息转发不同的是,本文主要介绍利用本地函数计算服务进行消息处理。其中 Hub 服务用于建立 Baetyl 与 MQTT 客户端之间的连接,Python 运行时服务用于处理 MQTT 消息,而本地函数计算服务则通过 MQTT 消息上下文衔接本地 Hub 服务与 Python 运行时服务。

本文将以 TCP 连接方式为例,展示本地函数计算服务的消息处理、计算功能。

操作流程

  • 步骤一:安装 Baetyl,并导入示例配置包。参考 快速安装 Baetyl 进行操作;
  • 步骤二:依据测试需求修改导入的配置信息,执行 sudo systemctl start baetyl 以容器模式启动 Baetyl,然后执行 sudo systemctl status baetyl 来查看 Baetyl 是否正常运行。如果 Baetyl 已经启动,执行 sudo systemctl start baetyl 重启来加载新的配置。
  • 步骤三:通过 MQTTBox 以 TCP 方式与 Baetyl Hub 服务 建立连接
    • 若成功与 Hub 服务建立连接,则依据配置的主题权限信息向有权限的主题发布消息,同时向拥有订阅权限的主题订阅消息,并观察 Baetyl 日志信息;
      • 若 Baetyl 日志显示已经启动 Python 运行时服务,则表明发布的消息受到了预期的函数处理;
      • 若 Baetyl 日志显示未成功启动 Python 运行时服务,则重复上述步骤,直至看到 Baetyl 主程序成功启动了 Python 运行时服务。
    • 若与 Baetyl Hub 建立连接失败,则重复步骤三操作,直至 MQTTBox 与 Baetyl Hub 服务成功建立连接为止。
  • 步骤四:通过 MQTTBox 查看对应主题消息的收发状态。../_images/python-flow.png基于本地函数计算服务实现设备消息处理流程

消息处理测试

依据 步骤一 导入示例配置包后,确认一下应用配置、 Hub 服务配置以及函数计算服务配置。

将 Baetyl 应用配置改成如下配置:

  1. # /usr/local/var/db/baetyl/application.yml
  2. version: v0
  3. services:
  4. - name: localhub
  5. image: hub.baidubce.com/baetyl/baetyl-hub
  6. replica: 1
  7. ports:
  8. - 1883:1883
  9. mounts:
  10. - name: localhub-conf
  11. path: etc/baetyl
  12. readonly: true
  13. - name: localhub-data
  14. path: var/db/baetyl/data
  15. - name: localhub-log
  16. path: var/log/baetyl
  17. - name: function-manager
  18. image: hub.baidubce.com/baetyl/baetyl-function-manager
  19. replica: 1
  20. mounts:
  21. - name: function-manager-conf
  22. path: etc/baetyl
  23. readonly: true
  24. - name: function-manager-log
  25. path: var/log/baetyl
  26. - name: function-python27-sayhi
  27. image: hub.baidubce.com/baetyl/baetyl-function-python27
  28. replica: 0
  29. mounts:
  30. - name: function-sayhi-conf
  31. path: etc/baetyl
  32. readonly: true
  33. - name: function-sayhi-code
  34. path: var/db/baetyl/function-sayhi
  35. readonly: true
  36. - name: function-python36-sayhi
  37. image: hub.baidubce.com/baetyl/baetyl-function-python36
  38. replica: 0
  39. mounts:
  40. - name: function-sayhi-conf
  41. path: etc/baetyl
  42. readonly: true
  43. - name: function-sayhi-code
  44. path: var/db/baetyl/function-sayhi
  45. readonly: true
  46. - name: function-node85-sayhi
  47. image: hub.baidubce.com/baetyl/baetyl-function-node85
  48. replica: 0
  49. mounts:
  50. - name: function-sayjs-conf
  51. path: etc/baetyl
  52. readonly: true
  53. - name: function-sayjs-code
  54. path: var/db/baetyl/function-sayhi
  55. readonly: true
  56. - name: function-sql-filter
  57. image: hub.baidubce.com/baetyl/baetyl-function-sql
  58. replica: 0
  59. mounts:
  60. - name: function-filter-conf
  61. path: etc/baetyl
  62. readonly: true
  63. volumes:
  64. # hub
  65. - name: localhub-conf
  66. path: var/db/baetyl/localhub-conf
  67. - name: localhub-data
  68. path: var/db/baetyl/localhub-data
  69. - name: localhub-cert
  70. path: var/db/baetyl/localhub-cert-only-for-test
  71. - name: localhub-log
  72. path: var/db/baetyl/localhub-log
  73. # function
  74. - name: function-manager-conf
  75. path: var/db/baetyl/function-manager-conf
  76. - name: function-manager-log
  77. path: var/db/baetyl/function-manager-log
  78. - name: function-sayhi-conf
  79. path: var/db/baetyl/function-sayhi-conf
  80. - name: function-sayhi-code
  81. path: var/db/baetyl/function-sayhi-code
  82. - name: function-sayjs-conf
  83. path: var/db/baetyl/function-sayjs-conf
  84. - name: function-sayjs-code
  85. path: var/db/baetyl/function-sayjs-code
  86. - name: function-filter-conf
  87. path: var/db/baetyl/function-filter-conf

Baetyl Hub 服务配置改成如下配置:

  1. # /usr/local/var/db/baetyl/localhub-conf/service.yml
  2. listen:
  3. - tcp://0.0.0.0:1883
  4. principals:
  5. - username: test
  6. password: hahaha
  7. permissions:
  8. - action: 'pub'
  9. permit: ['#']
  10. - action: 'sub'
  11. permit: ['#']
  12. subscriptions:
  13. - source:
  14. topic: 't'
  15. target:
  16. topic: 't/topic'
  17. logger:
  18. path: var/log/baetyl/service.log
  19. level: "debug"

Baetyl 本地函数计算服务相关配置无需修改,具体配置如下:

  1. # /usr/local/var/db/baetyl/function-manager-conf/service.yml
  2. hub:
  3. address: tcp://localhub:1883
  4. username: test
  5. password: hahaha
  6. rules:
  7. - clientid: func-python27-sayhi-1
  8. subscribe:
  9. topic: t
  10. function:
  11. name: python27-sayhi
  12. publish:
  13. topic: t/py2hi
  14. - clientid: func-sql-filter-1
  15. subscribe:
  16. topic: t
  17. qos: 1
  18. function:
  19. name: sql-filter
  20. publish:
  21. topic: t/sqlfilter
  22. qos: 1
  23. - clientid: func-python36-sayhi-1
  24. subscribe:
  25. topic: t
  26. function:
  27. name: python36-sayhi
  28. publish:
  29. topic: t/py3hi
  30. - clientid: func-node85-sayhi-1
  31. subscribe:
  32. topic: t
  33. function:
  34. name: node85-sayhi
  35. publish:
  36. topic: t/node8hi
  37. functions:
  38. - name: python27-sayhi
  39. service: function-python27-sayhi
  40. instance:
  41. min: 0
  42. max: 10
  43. - name: sql-filter
  44. service: function-sql-filter
  45. - name: python36-sayhi
  46. service: function-python36-sayhi
  47. - name: node85-sayhi
  48. service: function-node85-sayhi
  49. logger:
  50. path: var/log/baetyl/service.log
  51. level: "debug"
  52.  
  53. # /usr/local/var/db/baetyl/function-filter-conf/service.yml
  54. functions:
  55. - name: sql-filter
  56. handler: 'select qos() as qos, topic() as topic, * where id < 10'
  57.  
  58. # /usr/local/var/db/baetyl/function-sayhi-conf/service.yml
  59. functions:
  60. - name: 'python27-sayhi'
  61. handler: 'index.handler'
  62. codedir: 'var/db/baetyl/function-sayhi'
  63. - name: 'python36-sayhi'
  64. handler: 'index.handler'
  65. codedir: 'var/db/baetyl/function-sayhi'
  66.  
  67. # /usr/local/var/db/baetyl/function-sayjs-conf/service.yml
  68. functions:
  69. - name: 'node85-sayhi'
  70. handler: 'index.handler'
  71. codedir: 'var/db/baetyl/function-sayhi'

Python 函数代码无需修。/usr/local/var/db/baetyl/function-sayhi-code/index.py 实现如下:

  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. """
  4. function to say hi in python
  5. """
  6.  
  7. def handler(event, context):
  8. """
  9. function handler
  10. """
  11. res = {}
  12. if isinstance(event, dict):
  13. if "err" in event:
  14. raise TypeError(event['err'])
  15. res = event
  16. elif isinstance(event, bytes):
  17. res['bytes'] = event.decode("utf-8")
  18.  
  19. if 'messageQOS' in context:
  20. res['messageQOS'] = context['messageQOS']
  21. if 'messageTopic' in context:
  22. res['messageTopic'] = context['messageTopic']
  23. if 'messageTimestamp' in context:
  24. res['messageTimestamp'] = context['messageTimestamp']
  25. if 'functionName' in context:
  26. res['functionName'] = context['functionName']
  27. if 'functionInvokeID' in context:
  28. res['functionInvokeID'] = context['functionInvokeID']
  29.  
  30. res['Say'] = 'Hello Baetyl'
  31. return res

Node 函数代码无需修。/usr/local/var/db/baetyl/function-sayjs-code/index.js 实现如下:

  1. #!/usr/bin/env node
  2.  
  3. const hasAttr = (obj, attr) => {
  4. if (obj instanceof Object && !(obj instanceof Array)) {
  5. if (obj[attr] != undefined) {
  6. return true;
  7. }
  8. }
  9. return false;
  10. };
  11.  
  12. const passParameters = (event, context) => {
  13. if (hasAttr(context, 'messageQOS')) {
  14. event['messageQOS'] = context['messageQOS'];
  15. }
  16. if (hasAttr(context, 'messageTopic')) {
  17. event['messageTopic'] = context['messageTopic'];
  18. }
  19. if (hasAttr(context, 'messageTimestamp')) {
  20. event['messageTimestamp'] = context['messageTimestamp'];
  21. }
  22. if (hasAttr(context, 'functionName')) {
  23. event['functionName'] = context['functionName'];
  24. }
  25. if (hasAttr(context, 'functionInvokeID')) {
  26. event['functionInvokeID'] = context['functionInvokeID'];
  27. }
  28. };
  29.  
  30. exports.handler = (event, context, callback) => {
  31. // support Buffer & json object
  32. if (Buffer.isBuffer(event)) {
  33. const message = event.toString();
  34. event = {}
  35. event["bytes"] = message;
  36. }
  37. else if("err" in event) {
  38. return callback(new TypeError(event['err']))
  39. }
  40.  
  41. passParameters(event, context);
  42. event['Say'] = 'Hello Baetyl'
  43. callback(null, event);
  44. };

如上配置,假若 MQTTBox 基于上述配置信息已与 Hub 服务建立连接,向 Hub 发送主题为 t 的消息,函数计算服务会降消息分别路由给 python27-sayhipython36-sayhinode85-sayhisql-filter 函数处理,并分别输出主题为 t/py2hit/py3hit/node8hit/sqlfilter 的消息。这时订阅主题 # 的 MQTT client 将会接收到这这些消息,以及原消息 t 和 Hub 服务直接转主题的消息 t/topic

提示:凡是在 rules 消息路由配置项中出现、用到的函数,必须在 functions 配置项中进行函数实例的配置,否则无法正常启动函数运行时实例。

Baetyl 启动

依据 步骤二,执行 sudo systemctl start baetyl 以容器模式启动 Baetyl,如果 Baetyl 已经启动,执行 sudo systemctl restart baetyl 来重启。

提示:Darwin 系统通过源码安装 Baetyl,可执行 sudo baetyl start 以容器模式启动 Baetyl。

查看 Baetyl 主程序的日志,执行 sudo tail -f -n 40 /usr/local/var/log/baetyl/baetyl.log 显示如下:

../_images/function-start-log.pngBaetyl 加载、启动日志

同样,我们也可以通过执行命令 docker ps 查看系统当前正在运行的 docker 容器列表,具体如下图示。

../_images/docker-ps1.png通过 docker ps 命令查看系统当前运行 docker 容器列表

经过对比,不难发现,本次 Baetyl 启动时已经成功加载了 Hub 服务和函数计算服务,函数运行时服务实例并没有启动,因为函数运行时服务实例在有消息触发时才会动态创建。

MQTTBox 建立连接

本次测试中,我们采用 TCP 连接方式对 MQTTBox 进行连接信息配置,然后点击 Add subscriber 按钮订阅主题 # ,该主题用于接收所有 Hub 服务收到的消息。

消息处理验证

通过查看 /usr/local/var/db/baetyl/function-sayhi-code/index.py 代码文件可以发现,在接收到某字典类格式的消息后,函数 handler 会对其进行一系列处理,然后将处理结果返回。返回的结果中包括各种追加的上下文信息,比如 messageTopicfunctionName 等。

这里,我们通过 MQTTBox 将消息 {"id":1} 发布给主题 t ,然后观察 MQTTBox 接收到的消息如下。

../_images/mqttbox-tcp-process-success.pngMQTTBox 接收消息

发送消息后,我们快速执行命令 docker ps 查看系统当前正在运行的容器列表,所有函数运行时服务实例都被启动了,其结果如下图示。

../_images/docker-ps-after-trigger.png通过 docker ps 命令查看系统当前正在运行的容器列表

综上,我们通过 Hub 服务和函数计算服务模拟了消息在本地处理的过程,可以看出该框架非常适合用于边缘处理消息流。