Ingest Data into DynamoDB

EMQX supports integration with DynamoDB, so you can save MQTT messages and client events to DynamoDB, or use client events to trigger the update or removal of records that track each client's online/offline status.

TIP

EMQX Enterprise Edition features. EMQX Enterprise Edition provides comprehensive coverage of key business scenarios, rich data integration, product-level reliability, and 24/7 global technical support. Experience the benefits of this enterprise-ready MQTT messaging platform today.

Prerequisites

Features List

Quick Start Tutorial

This section shows how to configure the DynamoDB data bridge: how to set up the DynamoDB server, create a data bridge and rules for forwarding data to DynamoDB, and test the data bridge and rules.

This tutorial assumes that you run both EMQX and DynamoDB on the local machine. If you have DynamoDB and EMQX running remotely, adjust the settings accordingly.

Install DynamoDB Local Server

  1. Prepare a docker-compose file, dynamo.yaml, to set up the DynamoDB local server.

       version: '3.8'
       services:
         dynamo:
           command: "-jar DynamoDBLocal.jar -sharedDb"
           image: "amazon/dynamodb-local:latest"
           container_name: dynamo
           ports:
             - "8000:8000"
           environment:
             AWS_ACCESS_KEY_ID: root
             AWS_SECRET_ACCESS_KEY: public
             AWS_DEFAULT_REGION: us-west-2

  2. Start the server.

       docker-compose -f dynamo.yaml up

  3. Prepare a table definition and save it to your home directory as mqtt_msg.json.

       {
         "TableName": "mqtt_msg",
         "KeySchema": [
           { "AttributeName": "id", "KeyType": "HASH" }
         ],
         "AttributeDefinitions": [
           { "AttributeName": "id", "AttributeType": "S" }
         ],
         "ProvisionedThroughput": {
           "ReadCapacityUnits": 5,
           "WriteCapacityUnits": 5
         }
       }

  4. Create a new table via this file.

       docker run --rm -v ${HOME}:/dynamo_data -e AWS_ACCESS_KEY_ID=root -e AWS_SECRET_ACCESS_KEY=public -e AWS_DEFAULT_REGION=us-west-2 amazon/aws-cli dynamodb create-table --cli-input-json file:///dynamo_data/mqtt_msg.json --endpoint-url http://host.docker.internal:8000

  5. Check if the table was created successfully.

       docker run --rm -e AWS_ACCESS_KEY_ID=root -e AWS_SECRET_ACCESS_KEY=public -e AWS_DEFAULT_REGION=us-west-2 amazon/aws-cli dynamodb list-tables --endpoint-url http://host.docker.internal:8000

The following JSON will be printed if the table was created successfully.

    {
      "TableNames": [
        "mqtt_msg"
      ]
    }
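
The table definition used above can be sanity-checked with a short script. The following is a minimal Python sketch, not part of EMQX or the AWS tooling; the function name check_table_definition is hypothetical. It checks two rules that DynamoDB applies to a table definition: every key attribute must be declared in AttributeDefinitions, and the key schema must contain exactly one HASH (partition) key.

```python
import json

# Same table definition as mqtt_msg.json in the steps above.
TABLE_DEFINITION = {
    "TableName": "mqtt_msg",
    "KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
    "AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"}],
    "ProvisionedThroughput": {"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
}


def check_table_definition(definition):
    """Hypothetical helper: return a list of problems (empty list means OK)."""
    problems = []
    defined = {a["AttributeName"] for a in definition.get("AttributeDefinitions", [])}
    key_schema = definition.get("KeySchema", [])

    # Every key attribute needs a matching type declaration.
    for key in key_schema:
        if key["AttributeName"] not in defined:
            problems.append(
                f"key attribute {key['AttributeName']!r} has no AttributeDefinition"
            )

    # A table needs exactly one partition (HASH) key.
    hash_keys = [k for k in key_schema if k["KeyType"] == "HASH"]
    if len(hash_keys) != 1:
        problems.append("KeySchema must contain exactly one HASH key")

    return problems


problems = check_table_definition(TABLE_DEFINITION)
if problems:
    print("\n".join(problems))
else:
    # Print the JSON that can be saved as mqtt_msg.json.
    print(json.dumps(TABLE_DEFINITION, indent=2))
```

Because id is the only key attribute, every record written by the data bridge must carry a string id field. This is why the event rule later in this tutorial builds one explicitly.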

Create a DynamoDB Data Bridge

  1. Go to EMQX Dashboard, and click Integration -> Data Bridge.

  2. Click Create on the top right corner of the page.

  3. In the Create Data Bridge page, click to select DynamoDB, and then click Next.

  4. Input a name for the data bridge. The name should be a combination of upper/lower case letters and numbers.

  5. Input the connection information:

    • Database Url: Input http://127.0.0.1:8000, or the actual URL if the DynamoDB server is running remotely.
    • Table Name: Input mqtt_msg.
    • AWS Access Key ID: Input root.
    • AWS Secret Access Key: Input public.
  6. Leave the Template empty by default.

    TIP

    When this value is empty, the whole message is stored in the database. If you set a value, it must be a JSON template.

  7. Advanced settings (optional): Choose whether to use sync or async query mode as needed. For details, see Data Integration.

  8. Before clicking Create, you can click Test Connectivity to test that the bridge can connect to the DynamoDB server.

  9. Then click Create to finish the creation of the data bridge.

    A confirmation dialog will appear and ask whether you would like to create a rule using this data bridge. You can click Create Rule to continue creating rules that specify the data to be saved into DynamoDB. You can also create rules by following the steps in Create a Rule for DynamoDB Data Bridge.

Now the data bridge should appear in the data bridge list (Integration -> Data Bridge) with Resource Status as Connected.
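
If the status is not Connected, it can be useful to confirm that the DynamoDB endpoint is reachable from the machine that runs EMQX. The following is a rough Python sketch under that assumption. It only opens a TCP connection to the Database Url, so it is a much weaker check than whatever the Test Connectivity button does; the function names can_connect and endpoint_reachable are hypothetical.

```python
import socket
from urllib.parse import urlparse


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution errors.
        return False


def endpoint_reachable(url, timeout=3.0):
    """Check the host and port taken from a URL such as http://127.0.0.1:8000."""
    parsed = urlparse(url)
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    return can_connect(parsed.hostname, port, timeout)


# The URL used in this tutorial; replace it if DynamoDB runs remotely.
url = "http://127.0.0.1:8000"
print(url, "is reachable" if endpoint_reachable(url, timeout=1.0) else "is not reachable")
```

A successful TCP connection does not verify the access key, the secret, or the table name, so a bridge can still fail to connect after this check passes.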

Create a Rule for DynamoDB Data Bridge

Now that you have successfully created the data bridge to DynamoDB, you can continue to create rules to specify the data to be saved into DynamoDB. You need to create two different rules: one for message forwarding and one for event recording.

  1. Go to EMQX Dashboard, and click Integration -> Rules.

  2. Click Create on the top right corner of the page.

  3. Input my_rule as the rule ID, and set the rules in the SQL Editor based on the feature to use:

    • To create a rule for message storage, input the following statement, which means the MQTT messages under topic t/# will be saved to DynamoDB.

      Note: If you want to specify your own SQL syntax, make sure that you have included all fields required by the data bridge in the SELECT part.

        SELECT
          *
        FROM
          "t/#"
    • To create a rule for online/offline status recording, input the following statement:

        SELECT
          str(event) + timestamp as id, *
        FROM
          "$events/client_connected", "$events/client_disconnected"

      TIP

      For convenience, the mqtt_msg table will be reused to store online/offline events.

  4. Click the Add Action button, select Forwarding with Data Bridge from the dropdown list, and then select the data bridge you just created under Data Bridge. Click the Add button.

  5. Click the Create button to finish the setup.
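
The id expression in the event rule, str(event) + timestamp, concatenates the event name and the event timestamp into the string that serves as the table's partition key. The following Python sketch mirrors that expression for illustration only; event_record_id is a hypothetical name, and the sample values are taken from the scan output shown in the test section of this tutorial.

```python
def event_record_id(event, timestamp):
    """Mirror the rule expression `str(event) + timestamp as id`."""
    return str(event) + str(timestamp)


# Values copied from the sample scan output in this tutorial.
connected_id = event_record_id("client.connected", 1678263363499)
disconnected_id = event_record_id("client.disconnected", 1678263363503)

print(connected_id)
print(disconnected_id)
```

Note that this id contains no client ID. If two clients produce the same event in the same millisecond, their records share one id, and a DynamoDB put with an existing key replaces the earlier item. Adding clientid to the expression would be one way to avoid that; this is a suggestion, not something the original rule does.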

Now you have successfully created the rule for the DynamoDB data bridge. You can click Integration -> Flows to view the topology. It shows that the messages under topic t/# are processed by rule my_rule and then sent to and saved in DynamoDB.
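
To see which topics the filter t/# selects, the following Python sketch implements MQTT topic filter matching with the + (single-level) and # (multi-level) wildcards. It is an illustration, not EMQX's implementation; topic_matches is a hypothetical name, and the sketch leaves out the special handling of topics that begin with $.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic matches a topic filter.

    Simplified: does not apply the special rules for topics starting with '$'.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' matches the parent level and any number of child levels.
            return True
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False

    # Without '#', filter and topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


for topic in ["t/1", "t/1/2", "t", "a/1"]:
    print(topic, topic_matches("t/#", topic))
```

With this rule, a message published to t/1 in the test below is selected by my_rule, while a message on a topic outside the t/ tree is not.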

Test Data Bridge and Rule

Use MQTT X to publish a message to topic t/1. The client connects, publishes, and then disconnects, so this operation also triggers the online/offline events.

    mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello DynamoDB" }'

Check the running status of the data bridge. There should be one new incoming and one new outgoing message.

Check whether the data is written into the mqtt_msg data table.

    docker run --rm -e AWS_ACCESS_KEY_ID=root -e AWS_SECRET_ACCESS_KEY=public -e AWS_DEFAULT_REGION=us-west-2 amazon/aws-cli dynamodb scan --table-name=mqtt_msg --endpoint-url http://host.docker.internal:8000

The output will be similar to the following (rule IDs, ports, and timestamps will differ):

    {
      "Items": [
        {
          "metadata": {
            "S": "{\"rule_id\":\"90d98f59\"}"
          },
          "peerhost": {
            "S": "127.0.0.1"
          },
          "clientid": {
            "S": "emqx_c"
          },
          "flags": {
            "S": "{\"retain\":false,\"dup\":false}"
          },
          "node": {
            "S": "emqx@127.0.0.1"
          },
          "qos": {
            "N": "0"
          },
          "payload": {
            "S": "{ \"msg\": \"hello DynamoDB\" }"
          },
          "pub_props": {
            "S": "{\"User-Property\":{}}"
          },
          "publish_received_at": {
            "N": "1678263363503"
          },
          "topic": {
            "S": "t/1"
          },
          "id": {
            "S": "0005F65F239F03FEF44300000BB40002"
          },
          "event": {
            "S": "message.publish"
          },
          "username": {
            "S": "undefined"
          },
          "timestamp": {
            "N": "1678263363503"
          }
        },
        {
          "conn_props": {
            "S": "{\"User-Property\":{},\"Request-Problem-Information\":1}"
          },
          "peername": {
            "S": "127.0.0.1:59582"
          },
          "metadata": {
            "S": "{\"rule_id\":\"703890a5\"}"
          },
          "clientid": {
            "S": "emqx_c"
          },
          "is_bridge": {
            "S": "false"
          },
          "keepalive": {
            "N": "30"
          },
          "proto_ver": {
            "N": "5"
          },
          "proto_name": {
            "S": "MQTT"
          },
          "connected_at": {
            "N": "1678263363499"
          },
          "receive_maximum": {
            "N": "32"
          },
          "sockname": {
            "S": "127.0.0.1:1883"
          },
          "mountpoint": {
            "S": "undefined"
          },
          "node": {
            "S": "emqx@127.0.0.1"
          },
          "id": {
            "S": "client.connected1678263363499"
          },
          "expiry_interval": {
            "N": "0"
          },
          "event": {
            "S": "client.connected"
          },
          "username": {
            "S": "undefined"
          },
          "timestamp": {
            "N": "1678263363499"
          },
          "clean_start": {
            "S": "true"
          }
        },
        {
          "reason": {
            "S": "normal"
          },
          "peername": {
            "S": "127.0.0.1:59582"
          },
          "metadata": {
            "S": "{\"rule_id\":\"703890a5\"}"
          },
          "clientid": {
            "S": "emqx_c"
          },
          "proto_ver": {
            "N": "5"
          },
          "proto_name": {
            "S": "MQTT"
          },
          "sockname": {
            "S": "127.0.0.1:1883"
          },
          "disconn_props": {
            "S": "{\"User-Property\":{}}"
          },
          "node": {
            "S": "emqx@127.0.0.1"
          },
          "id": {
            "S": "client.disconnected1678263363503"
          },
          "event": {
            "S": "client.disconnected"
          },
          "disconnected_at": {
            "N": "1678263363503"
          },
          "username": {
            "S": "undefined"
          },
          "timestamp": {
            "N": "1678263363503"
          }
        }
      ],
      "Count": 3,
      "ScannedCount": 3,
      "ConsumedCapacity": null
    }
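
In the scan output, each attribute is wrapped in a DynamoDB type tag such as "S" (string) or "N" (number, encoded as a string), and nested fields like payload and flags are stored as JSON strings. The following Python sketch shows one way to turn such output into plain values. The helper names decode_attribute and decode_item are hypothetical, only a few common type tags are handled, and the embedded excerpt is a shortened copy of the first item above.

```python
import json
from decimal import Decimal


def decode_attribute(attr):
    """Convert one DynamoDB attribute value, e.g. {"N": "0"}, to a Python value."""
    (type_tag, value), = attr.items()
    if type_tag == "S":
        return value
    if type_tag == "N":
        # Numbers arrive as strings; keep integers exact, use Decimal otherwise.
        return int(value) if value.lstrip("-").isdigit() else Decimal(value)
    if type_tag == "BOOL":
        return value
    if type_tag == "NULL":
        return None
    if type_tag == "L":
        return [decode_attribute(v) for v in value]
    if type_tag == "M":
        return {k: decode_attribute(v) for k, v in value.items()}
    raise ValueError(f"unsupported DynamoDB type tag: {type_tag}")


def decode_item(item):
    """Decode every attribute of one scanned item."""
    return {name: decode_attribute(attr) for name, attr in item.items()}


# Shortened excerpt of the scan output shown above.
SCAN_EXCERPT = r"""
{
  "Items": [
    {
      "topic": {"S": "t/1"},
      "qos": {"N": "0"},
      "payload": {"S": "{ \"msg\": \"hello DynamoDB\" }"}
    }
  ],
  "Count": 1
}
"""

items = [decode_item(i) for i in json.loads(SCAN_EXCERPT)["Items"]]
for item in items:
    # The payload is itself a JSON string, so it needs a second parse.
    print(item["topic"], item["qos"], json.loads(item["payload"]))
```

To process real output, the JSON printed by the scan command can be loaded with json.loads in place of SCAN_EXCERPT.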