测试前准备

声明

- 本文测试所用设备系统为MacOS- 模拟MQTT client行为的客户端为MQTTBOX- 本文所用镜像为依赖OpenEdge源码自行编译所得,具体请查看如何从源码构建镜像

与基于本地Hub模块实现设备间消息转发不同的是,本文在Hub模块的基础上,引入Function函数计算模块及具体执行所需的Python27 runtime模块,将接收到的消息交给Python27 runtime来处理(Python27 runtime会调用具体的函数脚本来执行具体计算、分析、处理等),然后将处理结果以主题方式反馈给Hub模块,最终订阅该主题的MQTT client(要求MQTT client事先订阅该主题)将会收到该处理结果。

本地Hub模块的配置项信息不再赘述,详情查看使用Hub进行设备间消息转发,这里主要介绍新引入的Function函数计算模块相关配置(Python函数的运行环境构建参考OpenEdge设计),完整的配置参考函数计算模块配置

提示:凡是在rules消息路由配置项中出现、用到的函数,必须在functions配置项中进行函数执行具体配置,否则将不予启动。

本文将以TCP连接方式为例,展示本地Function模块的消息处理、计算功能。

操作流程

  • Step1:依据使用需求编写配置文件信息,然后以Docker容器模式启动OpenEdge可执行程序;
  • Step2:通过MQTTBOX以TCP方式与OpenEdge Hub建立连接
    • 若成功与OpenEdge Hub模块建立连接,则依据配置的主题权限信息向有权限的主题发布消息,同时向拥有订阅权限的主题订阅消息,并观察OpenEdge日志信息;
      • 若OpenEdge日志显示已经启动Python runtime模块,则表明发布的消息受到了预期的函数处理;
      • 若OpenEdge日志显示未成功启动Python runtime模块,则重复上述操作,直至看到OpenEdge主程序成功启动了Python runtime模块。
    • 若与OpenEdge Hub建立连接失败,则重复Step2操作,直至MQTTBOX与OpenEdge Hub成功建立连接为止。
  • Step3:通过MQTTBOX查看对应主题消息的收发状态。
    基于Function模块实现设备消息处理流程

消息路由测试

本文测试使用的本地Hub及Function模块的相关配置信息如下:

  1. name: localhub
  2. listen:
  3. - tcp://:1883
  4. principals:
  5. - username: 'test'
  6. password: 'be178c0543eb17f5f3043021c9e5fcf30285e557a4fc309cce97ff9ca6182912'
  7. permissions:
  8. - action: 'pub'
  9. permit: ['#']
  10. - action: 'sub'
  11. permit: ['#']
  12. # 本地Function模块配置:
  13. name: localfunc
  14. hub:
  15. address: tcp://localhub:1883
  16. username: test
  17. password: hahaha
  18. rules:
  19. - id: rule-e1iluuac1
  20. subscribe:
  21. topic: t
  22. qos: 1
  23. compute:
  24. function: sayhi
  25. publish:
  26. topic: t/hi
  27. qos: 1
  28. functions:
  29. - id: func-nyeosbbch
  30. name: 'sayhi'
  31. runtime: 'python27'
  32. handler: 'sayhi.handler'
  33. codedir: 'var/db/openedge/module/func-nyeosbbch'
  34. entry: "openedge-function-runtime-python27:build"
  35. env:
  36. USER_ID: acuiot
  37. instance:
  38. min: 0
  39. max: 10
  40. timeout: 1m

如上配置,假若MQTTBOX基于上述配置信息已与本地Hub模块建立连接,向主题“t”发送的消息将会交给“sayhi”函数处理,然后将处理结果以主题“t/hi”发布回Hub模块,这时订阅主题“t/hi”的MQTT client将会接收到这条处理后的消息。

OpenEdge 启动

Step1所述,以Docker容器模式启动OpenEdge,通过观察OpenEdge启动日志可以发现Hub、Function模块均已被成功加载,具体如下图示。

OpenEdge加载、启动日志

同样,我们也可以通过执行命令docker ps查看系统当前正在运行的docker容器列表,具体如下图示。

通过`docker ps`命令查看系统当前运行docker容器列表

经过对比,不难发现,本次OpenEdge启动时已经成功加载了Hub、Function两个容器模块。

MQTTBOX 建立连接

本次测试中,我们采用TCP连接方式对MQTTBOX进行连接信息配置,然后点击“Add subscriber”按钮订阅主题“t/hi”,该主题用于接收经python函数“sayhi”处理之后的结果数据,具体如下图示。

MQTTBOX连接配置

上图显示,MQTTBOX已经成功订阅了主题“t/hi”。

消息路由验证

根据上文所述,这里我们利用python函数“sayhi”对主题“t”的消息进行处理,并将结果反馈给主题“t/hi”。那么,首先,需要获悉的就是处理函数“sayhi.py”的具体信息,具体如下示:

  1. #!/usr/bin/env python
  2. #-*- coding:utf-8 -*-
  3. """
  4. module to say hi
  5. """
  6. import os
  7. import time
  8. import threading
  9. def handler(event, context):
  10. """
  11. function handler
  12. """
  13. event['USER_ID'] = os.environ['USER_ID']
  14. event['functionName'] = context['functionName']
  15. event['functionInvokeID'] = context['functionInvokeID']
  16. event['invokeid'] = context['invokeid']
  17. event['messageQOS'] = context['messageQOS']
  18. event['messageTopic'] = context['messageTopic']
  19. event['py'] = '你好,世界!'
  20. return event

可以发现,在接收到某Json格式的消息后,函数“sayhi.py”会对其进行一系列处理,然后将处理结果返回。返回的结果中包括:环境变量“USER_ID”、函数名称“functionName”、函数调用ID“functionInvokeID”、输入消息主题“messageTopic”、输入消息消息QoS“messageQOS”等字段。

这里,我们通过MQTTBOX将消息“{"id":10}”发布给主题“t”,然后观察主题“t/hi”的接收消息情况,具体如下图示。

MQTTBOX成功接收到经python函数处理之后的消息,且结果与上面的分析结果吻合。由此,我们完成了消息路由的处理测试。

此外,我们这时可以观察OpenEdge的日志及再次执行命令docker ps查看系统当前正在运行的容器列表,其结果如下图示。

运用python函数处理消息时OpenEdge日志

通过docker ps命令查看系统当前正在运行的容器列表

从上述两张图片中可以看出,除了OpenEdge启动时已加载的Hub、Function模块容器,在利用python函数“sayhi.py”对主题“t”消息进行处理时,系统还启动、并运行了Python Runtime模块,其主要用于对消息作运行时处理(各类模块加载、启动细节可参见OpenEdge设计)。

最后更新于 2018-12-28 10:23:09

原文: https://openedge.tech/docs/tutorials/local/Message-processing-with-function-module