Ingest Data into MongoDB

EMQX supports integration with MongoDB so you can save MQTT messages and client events to MongoDB.

TIP

EMQX Enterprise Edition features. EMQX Enterprise Edition provides comprehensive coverage of key business scenarios, rich data integration, product-level reliability, and 24/7 global technical support. Experience the benefits of this enterprise-ready MQTT messaging platform today.

Prerequisites

Feature List

Quick Start Tutorial

This section shows how to use the MongoDB data bridge: setting up the MongoDB server, creating a data bridge and a rule that forwards data to MongoDB, and testing the data bridge and rule.

This tutorial assumes that you run both EMQX and MongoDB on the local machine. If you have MongoDB and EMQX running remotely, adjust the settings accordingly.

Install MongoDB Server

Install MongoDB via Docker, and then run the docker image.

  # Start the MongoDB container (this command sets no password; the user is created below)
  docker run -d --name mongodb -p 27017:27017 mongo
  # Access the container
  docker exec -it mongodb bash
  # Open the MongoDB shell in the container (on MongoDB 6.0 and later images, run mongosh instead)
  mongo
  # Create a user with the password public
  use admin
  db.createUser({ user: "admin", pwd: "public", roles: [ { role: "root", db: "admin" } ] })
  # Create database emqx_data
  use emqx_data
  # Create collection emqx_messages
  db.createCollection('emqx_messages')
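
To check the new user from a client outside the container, a connection string is usually needed. The following is a minimal Python sketch that assembles one from the values used in this tutorial. The helper name build_mongo_uri is hypothetical, and authSource=admin is an assumption based on the user being created in the admin database above.

```python
from urllib.parse import quote_plus


def build_mongo_uri(user, password, host, database, auth_source="admin"):
    """Assemble a standard mongodb:// connection string from its parts."""
    # Percent-encode the credentials so characters such as '@' or ':' stay valid.
    credentials = f"{quote_plus(user)}:{quote_plus(password)}"
    return f"mongodb://{credentials}@{host}/{database}?authSource={auth_source}"


# Values taken from this tutorial.
uri = build_mongo_uri("admin", "public", "127.0.0.1:27017", "emqx_data")
print(uri)
```

The printed string can be passed to a MongoDB client such as mongosh. The EMQX Dashboard itself takes these values as separate fields, so the string is only for checking the server independently.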

Create a MongoDB Data Bridge

  1. Go to EMQX Dashboard, click Integration -> Data Bridge.

  2. Click Create on the top right corner of the page.

  3. In the Create Data Bridge page, click to select MongoDB, and then click Next.

  4. Input a name for the data bridge. The name should be a combination of upper/lower case letters and numbers.

  5. Set MongoDB Mode and Srv Record according to your business needs, for example, single and the default deselected status.

  6. Configure the MongoDB connection information. Input emqx_data as the Database Name, 127.0.0.1:27017 as the Server Host, admin as the Username, public as the Password, and emqx_messages as the Collection. For the other fields, you can keep the default settings.

  7. Configure the Payload template to save clientid, topic, qos, timestamp, and payload to MongoDB. This template will be executed via the MongoDB insert command, and the sample code is as follows:

    {
      "clientid": "${clientid}",
      "topic": "${topic}",
      "qos": ${qos},
      "timestamp": ${timestamp},
      "payload": ${payload}
    }

    Notes when configuring the Payload template:

    • All keys must be wrapped in double quotes (");

    • The data type of a value is not inferred automatically:

      • String values must be wrapped in double quotes; otherwise, an error is reported;
      • Numeric values must not be wrapped; otherwise, they are stored as strings;
      • Timestamp, date, and time values are stored as numbers or strings unless they are processed. To store them as a date or time type, apply the mongo_date function to the field in the rule SQL. For details, see Time and date functions.

    • Nested objects are allowed. When a value is a JSON object:

      • Do not wrap the value in double quotes in the template; otherwise, execution fails;
      • The object is nested and stored according to its own structure;

    • To store an object as a JSON string, convert it with the json_encode function in the rule SQL. The corresponding value in the template must still not be wrapped in double quotes.
  8. Advanced settings (optional): Choose whether to use sync or async query mode, and whether to enable queue or batch. For details, see Configuration.

  9. Before clicking Create, you can click Test Connectivity to test that the bridge can connect to the MongoDB server.

  10. Click the Create button to finish the setup.

Now the MongoDB data bridge should appear in the data bridge list (Integration -> Data Bridge) with Resource Status as Connected.
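
The quoting notes for the Payload template in step 7 can be reasoned about with a simplified model: each placeholder is replaced by the field's text, and the result is then parsed as JSON. The Python sketch below follows that model only. It is an assumption made for illustration, not EMQX's actual implementation, and render_template is a hypothetical helper.

```python
import json
import re

TEMPLATE = """{
  "clientid": "${clientid}",
  "topic": "${topic}",
  "qos": ${qos},
  "timestamp": ${timestamp},
  "payload": ${payload}
}"""


def render_template(template: str, fields: dict) -> dict:
    """Replace each ${name} with the field's text, then parse the result as JSON."""
    def substitute(match: re.Match) -> str:
        return str(fields[match.group(1)])

    return json.loads(re.sub(r"\$\{(\w+)\}", substitute, template))


fields = {
    "clientid": "emqx_c",
    "topic": "t/1",
    "qos": 0,
    "timestamp": 1675325529070,
    "payload": '{ "msg": "hello MongoDB" }',  # raw MQTT payload text
}

# Unquoted placeholder: the payload text is parsed as a nested object.
print(render_template(TEMPLATE, fields)["payload"])

# A JSON-encoded payload is itself a quoted string literal, so it stays a string.
encoded = dict(fields, payload=json.dumps(fields["payload"]))
print(render_template(TEMPLATE, encoded)["payload"])

# Wrapping an object placeholder in double quotes yields invalid JSON.
try:
    render_template('{"payload": "${payload}"}', fields)
except json.JSONDecodeError as err:
    print("rejected:", err.msg)
```

Under this model, a string field such as clientid needs the surrounding double quotes in the template, while an object or an already encoded value must be left unwrapped.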

Create a Rule for MongoDB Data Bridge

  1. Go to EMQX Dashboard, click Integration -> Rules.

  2. Click Create on the top right corner of the page.

  3. Input my_rule as the rule ID, and set the rule in the SQL Editor. To save the MQTT messages under topic t/# to MongoDB, use the SQL syntax below.

    Note: If you want to specify your own SQL syntax, make sure that you have included all fields required by the data bridge in the SELECT part.

    SELECT
      *
    FROM
      "t/#"

    You can use the SQL syntax below to save timestamp as a date type and the JSON payload as a JSON string:

    SELECT
      *,
      mongo_date(timestamp) as timestamp,
      json_encode(payload) as payload
    FROM
      "t/#"

  4. Click the Add Action button, select Forwarding with Data Bridge from the dropdown list, and then select the data bridge you just created under Data bridge. Then, click the Add button.

  5. Click the Create button to finish the setup.

Now a rule to forward data to MongoDB via a MongoDB data bridge is created. You can go to Integration -> Flows to view the topology. Messages under topic t/# are first processed by rule my_rule and then saved in MongoDB.
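
The rule selects messages by the MQTT topic filter t/#. As a rough illustration of which topics that filter covers, here is a minimal Python sketch of standard MQTT wildcard matching. The function topic_matches is hypothetical, is not part of EMQX, and ignores the special handling of topics that start with $.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a topic filter with + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below it.
            return True
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # Without a trailing '#', both sides must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


for candidate in ["t/1", "t/a/b", "t", "other/1"]:
    print(candidate, topic_matches("t/#", candidate))
```

A message published to t/1, as in the test below, falls under t/# and is therefore processed by my_rule.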

Test the Data Bridge and Rule

Use MQTTX to send a message to topic t/1:

  mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello MongoDB" }'

Check the running status of the data bridge; there should be one new Matched and one new Sent Successfully message.

Check whether the message is written into collection emqx_messages:

  > db.emqx_messages.find().pretty()
  {
      "_id" : ObjectId("63db7059df489d01ed000009"),
      "clientid" : "emqx_c",
      "payload" : {
          "msg" : "hello MongoDB"
      },
      "qos" : 0,
      "timestamp" : NumberLong("1675325529070"),
      "topic" : "t/1"
  }

With the second rule SQL, the returned document should look like this:

  > db.emqx_messages.find().pretty()
  {
      "_id" : ObjectId("63db7535df489d01ed000013"),
      "clientid" : "emqx_c",
      "payload" : "{ \"msg\": \"hello MongoDB\" }",
      "qos" : 0,
      "timestamp" : ISODate("2023-02-02T08:33:36.715Z"),
      "topic" : "t/1"
  }
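
The two results differ in how timestamp is stored: a millisecond number in the first document and a date in the second. To relate the two forms, the Python sketch below converts the NumberLong value from the first document into a UTC date. It assumes the value is milliseconds since the Unix epoch, and millis_to_utc is a hypothetical helper rather than an EMQX or MongoDB function.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def millis_to_utc(ms: int) -> datetime:
    """Convert milliseconds since the Unix epoch to a timezone-aware UTC datetime."""
    # Integer timedelta arithmetic avoids floating-point rounding of the milliseconds.
    return EPOCH + timedelta(milliseconds=ms)


stored = 1675325529070  # NumberLong value from the first document
print(millis_to_utc(stored).isoformat(timespec="milliseconds"))
```

The two sample documents come from separate test messages, so the converted value is not expected to equal the ISODate shown in the second document.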