DynamoDB

通过 DynamoDB 数据桥接可以将 MQTT 消息和客户端事件存储到 DynamoDB 中,也可以通过事件触发对 DynamoDB 中数据的更新或删除操作,从而实现对诸如设备在线状态、上下线历史等的记录。

提示

EMQX 企业版功能。EMQX 企业版可以为您带来更全面的关键业务场景覆盖、更丰富的数据集成支持,更高的生产级可靠性保证以及 24/7 的全球技术支持,欢迎免费试用DynamoDB - 图1 (opens new window)

提示

本章节内容同样适用于 TimescaleDB 以及 MatrixDB。

前置条件

特性

快速开始教程

本节介绍如何配置 DynamoDB 数据桥接,包括如何设置 DynamoDB 服务器、创建数据桥接和规则以将数据转发到 RocketMQ、以及如何测试数据桥接和规则。

本教程假定您在本地机器上同时运行 EMQX 和 DynamoDB。如果您在远程运行 DynamoDB 和 EMQX,请相应地调整设置。

安装 DynamoDB 本地服务器

  1. 准备一份 docker-compose 文件 dynamo.yaml 来部署 DynamoDB 本地服务器。
  1. version: '3.8'
  2. services:
  3. dynamo:
  4. command: "-jar DynamoDBLocal.jar -sharedDb"
  5. image: "amazon/dynamodb-local:latest"
  6. container_name: dynamo
  7. ports:
  8. - "8000:8000"
  9. environment:
  10. AWS_ACCESS_KEY_ID: root
  11. AWS_SECRET_ACCESS_KEY: public
  12. AWS_DEFAULT_REGION: us-west-2
  1. 通过这份文件启动 DynamoDB 服务器。
  1. docker-compose -f dynamo.yaml up
  1. 准备一份数据表定义文件,将其存放到 home 目录,并取名为 mqtt_msg.json
  1. {
  2. "TableName": "mqtt_msg",
  3. "KeySchema": [
  4. { "AttributeName": "id", "KeyType": "HASH" }
  5. ],
  6. "AttributeDefinitions": [
  7. { "AttributeName": "id", "AttributeType": "S" }
  8. ],
  9. "ProvisionedThroughput": {
  10. "ReadCapacityUnits": 5,
  11. "WriteCapacityUnits": 5
  12. }
  13. }
  1. 通过这份文件创建一个新的表。
  1. docker run --rm -v ${HOME}:/dynamo_data -e AWS_ACCESS_KEY_ID=root -e AWS_SECRET_ACCESS_KEY=public -e AWS_DEFAULT_REGION=us-west-2 amazon/aws-cli dynamodb create-table --cli-input-json file:///dynamo_data/mqtt_msg.json --endpoint-url http://host.docker.internal:8000
  1. 通过下面的指令检查创建是否成功。
  1. docker run --rm -e AWS_ACCESS_KEY_ID=root -e AWS_SECRET_ACCESS_KEY=public -e AWS_DEFAULT_REGION=us-west-2 amazon/aws-cli dynamodb list-tables --endpoint-url http://host.docker.internal:8000

如果创建成功了,我们将会看到下面的输出。

  1. {
  2. "TableNames": [
  3. "mqtt_msg"
  4. ]
  5. }

创建 DynamoDB 数据桥接

  1. 转到 Dashboard 数据集成 -> 数据桥接页面。

  2. 点击页面右上角的创建

  3. 在数据桥接类型中选择 DynamoDB,点击下一步

  4. 输入数据桥接名称,要求是大小写英文字母和数字组合。

  5. 输入 DynamoDB 连接信息:

    • DynamoDB 地址:填写 http://127.0.0.1:8000
    • 表名:填写 mqtt_msg
    • 连接访问 ID: 填写 root
    • 连接访问密钥:填写 public
  6. 模版设置为默认值,即为空;模版为空时将会将整个消息转发给 RocketMQ,实际值为 JSON 模版数据。

  7. 高级配置(可选),根据情况配置同步/异步模式,队列与批量等参数,详细请参考数据桥接简介

  8. 在完成创建之前,您可以点击测试连接来测试桥接可以连接到 DynamoDB 服务器。

  9. 点击创建按钮完成数据桥接创建。

    在弹出的创建成功对话框中您可以点击创建规则,继续创建规则以指定需要写入 DynamoDB 的数据。您也可以按照创建 DynamoDB 数据桥接规则章节的步骤来创建规则。

创建 DynamoDB 数据桥接规则

至此您已经完成数据桥接创建,接下来将继续创建一条规则来指定需要写入的数据。您需要为消息转发和设备上下线记录创建两条不同的规则。

  1. 转到 Dashboard 数据集成 -> 规则页面。

  2. 点击页面右上角的创建

  3. 输入规则 ID my_rule,在 SQL 编辑器中根据业务实现需要输入规则:

    • 如需实现对指定主题消息的转发,例如将 t/# 主题的 MQTT 消息转发至 RocketMQ,输入以下 SQL 语法:

      注意:如果您希望制定自己的 SQL 语法,需要确保规则选出的字段(SELECT 部分)包含所有 SQL 模板中用到的变量。

      1. SELECT
      2. *
      3. FROM
      4. "t/#"
    • 如需实现设备上下线记录,输入以下 SQL 语法:

      1. SELECT
      2. str(event) + timestamp as id, *
      3. FROM
      4. "$events/client_connected", "$events/client_disconnected"

      提示

      为了演示方便,上下线消息也设置为复用 mqtt_msg 表。

  4. 点击添加动作,在动作下拉框中选择使用数据桥接转发选项,选择先前创建好的 DynamoDB 数据桥接。点击添加

  5. 点击最下方创建按钮完成规则创建。

至此您已经完成整个创建过程,可以前往 数据集成 -> Flows 页面查看拓扑图,此时应当看到 t/# 主题的消息经过名为 my_rule 的规则处理,处理结果交由 DynamoDB 存储。

测试数据桥接与规则

使用 MQTTX 向 t/1 主题发布消息,此操作同时会触发上下线事件:

  1. mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello DynamoDB" }'

分别查看两个数据桥接运行统计,命中、发送成功次数均 +1。

查看数据是否已经写入表中,mqtt_msg 表:

  1. docker run --rm -e AWS_ACCESS_KEY_ID=root -e AWS_SECRET_ACCESS_KEY=public -e AWS_DEFAULT_REGION=us-west-2 amazon/aws-cli dynamodb scan --table-name=mqtt_msg --endpoint-url http://host.docker.internal:8000

输出如下:

  1. {
  2. "Items": [
  3. {
  4. "metadata": {
  5. "S": "{\"rule_id\":\"90d98f59\"}"
  6. },
  7. "peerhost": {
  8. "S": "127.0.0.1"
  9. },
  10. "clientid": {
  11. "S": "emqx_c"
  12. },
  13. "flags": {
  14. "S": "{\"retain\":false,\"dup\":false}"
  15. },
  16. "node": {
  17. "S": "emqx@127.0.0.1"
  18. },
  19. "qos": {
  20. "N": "0"
  21. },
  22. "payload": {
  23. "S": "{ \"msg\": \"hello DynamoDB\" }"
  24. },
  25. "pub_props": {
  26. "S": "{\"User-Property\":{}}"
  27. },
  28. "publish_received_at": {
  29. "N": "1678263363503"
  30. },
  31. "topic": {
  32. "S": "t/1"
  33. },
  34. "id": {
  35. "S": "0005F65F239F03FEF44300000BB40002"
  36. },
  37. "event": {
  38. "S": "message.publish"
  39. },
  40. "username": {
  41. "S": "undefined"
  42. },
  43. "timestamp": {
  44. "N": "1678263363503"
  45. }
  46. },
  47. {
  48. "conn_props": {
  49. "S": "{\"User-Property\":{},\"Request-Problem-Information\":1}"
  50. },
  51. "peername": {
  52. "S": "127.0.0.1:59582"
  53. },
  54. "metadata": {
  55. "S": "{\"rule_id\":\"703890a5\"}"
  56. },
  57. "clientid": {
  58. "S": "emqx_c"
  59. },
  60. "is_bridge": {
  61. "S": "false"
  62. },
  63. "keepalive": {
  64. "N": "30"
  65. },
  66. "proto_ver": {
  67. "N": "5"
  68. },
  69. "proto_name": {
  70. "S": "MQTT"
  71. },
  72. "connected_at": {
  73. "N": "1678263363499"
  74. },
  75. "receive_maximum": {
  76. "N": "32"
  77. },
  78. "sockname": {
  79. "S": "127.0.0.1:1883"
  80. },
  81. "mountpoint": {
  82. "S": "undefined"
  83. },
  84. "node": {
  85. "S": "emqx@127.0.0.1"
  86. },
  87. "id": {
  88. "S": "client.connected1678263363499"
  89. },
  90. "expiry_interval": {
  91. "N": "0"
  92. },
  93. "event": {
  94. "S": "client.connected"
  95. },
  96. "username": {
  97. "S": "undefined"
  98. },
  99. "timestamp": {
  100. "N": "1678263363499"
  101. },
  102. "clean_start": {
  103. "S": "true"
  104. }
  105. },
  106. {
  107. "reason": {
  108. "S": "normal"
  109. },
  110. "peername": {
  111. "S": "127.0.0.1:59582"
  112. },
  113. "metadata": {
  114. "S": "{\"rule_id\":\"703890a5\"}"
  115. },
  116. "clientid": {
  117. "S": "emqx_c"
  118. },
  119. "proto_ver": {
  120. "N": "5"
  121. },
  122. "proto_name": {
  123. "S": "MQTT"
  124. },
  125. "sockname": {
  126. "S": "127.0.0.1:1883"
  127. },
  128. "disconn_props": {
  129. "S": "{\"User-Property\":{}}"
  130. },
  131. "node": {
  132. "S": "emqx@127.0.0.1"
  133. },
  134. "id": {
  135. "S": "client.disconnected1678263363503"
  136. },
  137. "event": {
  138. "S": "client.disconnected"
  139. },
  140. "disconnected_at": {
  141. "N": "1678263363503"
  142. },
  143. "username": {
  144. "S": "undefined"
  145. },
  146. "timestamp": {
  147. "N": "1678263363503"
  148. }
  149. }
  150. ],
  151. "Count": 3,
  152. "ScannedCount": 3,
  153. "ConsumedCapacity": null
  154. }