Save data to DolphinDB

DolphinDBSave data to DolphinDB - 图1 (opens new window) is a high-performance distributed time series database developed by Zhejiang DolphinDB Co., Ltd, which integrates powerful programming language and high-capacity and high-speed flow data analysis system, providing a one-stop solution for rapid storage, retrieval, analysis and calculation of massive structured data. It is suitable for the area of quantitative finance and industrial Internet of things.

EMQX uses Erlang to implement DolphinDB’s client API, which transmits data to DolphinDB for storage through TCP.

Set up DolphinDB

Currently, EMQX only adapts to DolphinDB 1.20.7 version.

Taking the Linux version as an example, you can go to the official website to download the latest version of the Linux64 installation package from the community: https://www.dolphindb.cn/downloads.html

Upload the server directory of the installation package to the server directory /opts/app/dolphindb, and test whether the startup is normal:

  1. chmod +x ./dolphindb
  2. ./dolphindb
  3. ## If the startup is successful, you will enter the dolphindb command line and execute 1+1
  4. >1+1
  5. 2

If the startup is successful and the correct output is obtained, it indicates that DolphinDB is successfully installed. Then use <CRTL+D> to close DolphinDB.

Now, we need to open the publish / subscribe function of streamtable in dolphin dB and create relevant data tables to realize the function of EMQ x message storage and persistence:

  1. Modify the DolphinDB’s configuration file vim dolphindb.cfg and add the following configuration items to enable the publish/subscribe function:
  1. ## Publisher for streaming
  2. maxPubConnections=10
  3. persistenceDir=/ddb/pubdata/
  4. #persistenceWorkerNum=
  5. #maxPersistenceQueueDepth=
  6. #maxMsgNumPerBlock=
  7. #maxPubQueueDepthPerSite=
  8. ## Subscriber for streaming
  9. subPort=8000
  10. #subExecutors=
  11. #maxSubConnections=
  12. #subExecutorPooling=
  13. #maxSubQueueDepth=
  1. Start the dolphin DB service from the background:
  1. ## After startup, dolphin DB will listen to port 8848 for client.
  2. nohup ./dolphindb -console 0 &
  1. Go to the official website of DolphinDB and download a suitable GUI client to connect to the DolphinDB service:

    • Go to download pageSave data to DolphinDB - 图2 (opens new window) to download DolphinDB GUI
    • DolphinDB GUI client depends on the Java environment, therefore, make sure that Java is installed at first
    • Go to the DolphinDB GUI directory and execute sh gui.sh to start the client
    • Add Server in the client and create a Project with script files.
  2. Create a distributed database and a streamtable table, and persist the data of streamtable into the distributed table
  1. // Create a distributed file database named emqx
  2. // And create a table named `msg`, partition by the hash values of `clientid` and `topic`:
  3. schema = table(1:0, `clientid`topic`qos`payload, [STRING, STRING, INT, STRING])
  4. db1 = database("", HASH, [STRING, 8])
  5. db2 = database("", HASH, [STRING, 8])
  6. db = database("dfs://emqx", COMPO, [db1, db2])
  7. db.createPartitionedTable(schema, "msg",`clientid`topic)
  8. // Create a StreamTable table named `st_msg` and persist the data to the `msg` table.
  9. share streamTable(10000:0,`clientid`topic`qos`payload, [STRING,STRING,INT,STRING]) as st_msg
  10. msg_ref= loadTable("dfs://emqx", "msg")
  11. subscribeTable(, "st_msg", "save_msg_to_dfs", 0, msg_ref, true)
  12. // Query msg_ref to check whether the creation is successful
  13. select * from msg_ref;

After that, you can see that an empty msg_ref has been created successfully:

Create DolphinDB Table

So far, the configuration of DolphinDB has been completed.

For detailed DolphinDB usage documentation, please refer to:

Configure the rules engine

Create rules:

Open EMQX DashboardSave data to DolphinDB - 图4 (opens new window) and select the “Rules” tab on the left.

Fill in the rule SQL:

  1. SELECT * FROM "t/#"

image

Related actions:

On the “Response Action” interface, select “Add”, and then select “Save Data to DolphinDB” in the “Action” drop-down box.

image

Fill in the action parameters:

The “Save data to DolphinDB” action requires two parameters:

1). SQL template. In this example, we insert a piece of data into the stream table st_msg, and the SQL template is:

  1. insert into st_msg values(${clientid}, ${topic}, ${qos}, ${payload})

2). The ID of the associated resource. Now that the resource drop-down box is empty, and you can click “New Resource” in the upper right corner to create a DolphinDB resource:

Fill in the server address corresponding to the DolphinDB server deployed above. The user name is admin and the password is 123456

image

Click the “OK” button.

Return to the response action interface and click “OK”.

image

Return to the rule creation interface and click “Create”.

image

In the rule list, click the “View” button or the rule ID connection to preview the rule just created:

image

The rule has been created. Now, send a piece of data:

  1. Topic: "t/a"
  2. QoS: 1
  3. Payload: "hello"

Then check the persistent msg_dfs table to see whether the new data is added successfully:

image