OpenTSDB Backend

TIP

Since EMQX 3.1, a powerful rule engine has been available as a replacement for plug-ins, and it is the recommended approach. See Save data to OpenTSDB for how to set up saving data to OpenTSDB in the rule engine.

Configure OpenTSDB Server

Config file: etc/plugins/emqx_backend_opentsdb.conf:

    ## OpenTSDB Server
    backend.opentsdb.pool1.server = 127.0.0.1:4242
    ## OpenTSDB Pool Size
    backend.opentsdb.pool1.pool_size = 8
    ## Whether to return summary info
    backend.opentsdb.pool1.summary = true
    ## Whether to return detailed info
    ##
    ## Value: true | false
    backend.opentsdb.pool1.details = false
    ## Synchronous write or not
    ##
    ## Value: true | false
    backend.opentsdb.pool1.sync = false
    ## Synchronous write timeout in milliseconds
    ##
    ## Value: Duration
    ##
    ## Default: 0
    backend.opentsdb.pool1.sync_timeout = 0
    ## Max batch size
    ##
    ## Value: Number >= 0
    ## Default: 20
    backend.opentsdb.pool1.max_batch_size = 20
    ## Store PUBLISH Messages
    backend.opentsdb.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
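The summary, details, sync and sync_timeout options share their names with query parameters of OpenTSDB's HTTP /api/put endpoint. As a rough illustration, the sketch below shows how settings like these could translate into a request URL. That the plugin maps them this way is an assumption, and build_put_url is a hypothetical helper, not part of EMQX.

```python
def build_put_url(server, summary=True, details=False, sync=False, sync_timeout=0):
    """Build an OpenTSDB /api/put URL from pool-style settings (illustrative only)."""
    params = []
    if summary:
        params.append("summary")      # ask OpenTSDB for a success/failure summary
    if details:
        params.append("details")      # ask OpenTSDB for per-datapoint error details
    if sync:
        params.append("sync")         # wait until data is flushed to storage
        if sync_timeout > 0:
            params.append(f"sync_timeout={sync_timeout}")  # milliseconds
    query = "?" + "&".join(params) if params else ""
    return f"http://{server}/api/put{query}"


# Values taken from the configuration above.
print(build_put_url("127.0.0.1:4242", summary=True, details=False, sync=False))
```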

Parameters in hook rule:

| Option | Description |
| ------ | ----------- |
| topic  | Topics for which the hook is executed |
| action | Action the hook performs; function is a built-in function that the backend provides for common operations |
| pool   | Pool name, used to connect to multiple OpenTSDB servers |

Example:

    ## Store PUBLISH message whose topic is "sensor/#"
    backend.opentsdb.hook.message.publish.1 = {"topic": "sensor/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
    ## Store PUBLISH message whose topic is "stat/#"
    backend.opentsdb.hook.message.publish.2 = {"topic": "stat/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
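To make the effect of the topic option concrete, here is a minimal sketch of standard MQTT topic-filter matching with the + and # wildcards. It is not EMQX's implementation, topic_matches is a hypothetical name, and the special handling of topics that begin with $ is left out for brevity.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic matches a filter containing + or # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below it.
            return True
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # A literal level must match exactly; "+" matches any single level.
            return False
    # Without a trailing "#", both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


for name in ["sensor/room1/temp", "stat/cpu", "other/topic"]:
    stored = topic_matches("sensor/#", name) or topic_matches("stat/#", name)
    print(name, "stored" if stored else "ignored")
```

With the two hooks from the example, messages on sensor/room1/temp and stat/cpu would be stored, while other/topic would be ignored.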

Description of OpenTSDB Persistence Hooks

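| hook            | topic | action             | Description              |
| --------------- | ----- | ------------------ | ------------------------ |
| message.publish | #     | on_message_publish | Store published messages |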

Since an MQTT message cannot be written directly to OpenTSDB, the OpenTSDB backend provides a template file, emqx_backend_opentsdb.tmpl, that converts an MQTT message into DataPoints that can be written to OpenTSDB.

The template file uses JSON format:

  • key - MQTT topic, a JSON string; wildcard characters are supported
  • value - Template, a JSON object used to convert an MQTT message into a DataPoint (measurement, tags, value and timestamp) that is written to OpenTSDB.

You can define different templates for different topics, or multiple templates for the same topic, for example:

    {
        <Topic 1>: <Template 1>,
        <Topic 2>: <Template 2>
    }

The template format is as follows:

    {
        "measurement": <Measurement>,
        "tags": {
            <Tag Key>: <Tag Value>
        },
        "value": <Value>,
        "timestamp": <Timestamp>
    }

measurement and value are required; tags and timestamp are optional.

Any value (such as <Measurement>) can be set directly in the template as a fixed value; the supported data types depend on the table you define. More realistically, you can access the data in the MQTT message through the placeholders provided.

Currently, we support placeholders as follows:

| Placeholder | Description |
| ----------- | ----------- |
| $id         | MQTT message UUID, assigned by EMQX |
| $clientid   | Client ID used by the client |
| $username   | Username used by the client |
| $peerhost   | IP address of the client |
| $qos        | QoS of the MQTT message |
| $topic      | Topic of the MQTT message |
| $payload    | Payload of the MQTT message; must be valid JSON data |
| $<Number>   | Must be used with $payload to retrieve data from a JSON array |
| $timestamp  | The timestamp EMQX sets when preparing to forward the message; precision: nanoseconds |

$payload and $<Number>:

You can use $payload directly to obtain the complete message payload, or use ["$payload", <Key>, ...] to get data inside the message payload.

For example, if the payload is {"data": {"temperature": 23.9}}, then ["$payload", "data", "temperature"] yields 23.9.

For JSON arrays, $0 and $<pos_integer> are available: $0 selects all elements of the array, and $<pos_integer> selects the <pos_integer>th element of the array.

As a simple example, ["$payload", "$0", "temp"] will get [20, 21] from [{"temp": 20}, {"temp": 21}], and ["$payload", "$1", "temp"] will get only 20.
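The lookup rules described above can be sketched in a few lines of Python. This is only an illustration of the documented behaviour, not the plugin's code; resolve and extract are hypothetical names, and the payload is assumed to be already decoded from JSON.

```python
def extract(data, path):
    """Walk a decoded JSON value, following keys and $<n> array selectors."""
    if not path:
        return data
    step, rest = path[0], path[1:]
    if step == "$0":
        # $0 selects every element of the array.
        return [extract(item, rest) for item in data]
    if step.startswith("$") and step[1:].isdigit():
        # $<n> selects the n-th element, counting from 1.
        return extract(data[int(step[1:]) - 1], rest)
    # Any other step is treated as an object key.
    return extract(data[step], rest)


def resolve(payload, expression):
    """Resolve an expression of the form ["$payload", <Key>, ...]."""
    if expression[0] != "$payload":
        raise ValueError("expression must start with $payload")
    return extract(payload, expression[1:])


print(resolve({"data": {"temperature": 23.9}}, ["$payload", "data", "temperature"]))
print(resolve([{"temp": 20}, {"temp": 21}], ["$payload", "$0", "temp"]))
print(resolve([{"temp": 20}, {"temp": 21}], ["$payload", "$1", "temp"]))
```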

Note that when you use $0 in several fields, each field must yield the same number of elements. These arrays are converted into multiple records before being written to OpenTSDB, so if one field yields three values and another yields two, there is no way to know how to combine them.
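The reason for this constraint becomes clearer with a small sketch of how array-valued fields might be turned into records. The function name expand and the choice to raise an error on mismatched lengths are assumptions made for illustration; the plugin's actual behaviour in that case is not described here.

```python
def expand(fields):
    """Turn a dict of scalars and equal-length lists into a list of records."""
    lengths = {len(v) for v in fields.values() if isinstance(v, list)}
    if len(lengths) > 1:
        # e.g. three hosts but two temperatures: no sensible pairing exists.
        raise ValueError(f"$0 fields have different lengths: {sorted(lengths)}")
    count = lengths.pop() if lengths else 1
    # Scalars are repeated in every record; lists contribute their i-th element.
    return [
        {k: (v[i] if isinstance(v, list) else v) for k, v in fields.items()}
        for i in range(count)
    ]


records = expand({"host": ["serverA", "serverB"], "temp": [1, 2], "qos": 0})
for record in records:
    print(record)
```

Fields that are not arrays, such as qos here, are simply copied into every record.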

Example

The data/templates directory provides a sample template for reference (emqx_backend_opentsdb_example.tmpl; remove the “_example” suffix from the filename before using it in production):

    {
        "sample": {
            "measurement": "$topic",
            "tags": {
                "host": ["$payload", "data", "$0", "host"],
                "region": ["$payload", "data", "$0", "region"],
                "qos": "$qos",
                "clientid": "$clientid"
            },
            "value": ["$payload", "data", "$0", "temp"],
            "timestamp": "$timestamp"
        }
    }

When an MQTT Message whose Topic is “sample” has the following Payload:

    {
        "data": [
            {
                "temp": 1,
                "host": "serverA",
                "region": "hangzhou"
            },
            {
                "temp": 2,
                "host": "serverB",
                "region": "ningbo"
            }
        ]
    }

The backend converts the MQTT message into the following data and writes it to OpenTSDB:

    [
        {
            "measurement": "sample",
            "tags": {
                "clientid": "mqttjs_ebcc36079a",
                "host": "serverA",
                "qos": "0",
                "region": "hangzhou"
            },
            "value": "1",
            "timestamp": "1560743513626681000"
        },
        {
            "measurement": "sample",
            "tags": {
                "clientid": "mqttjs_ebcc36079a",
                "host": "serverB",
                "qos": "0",
                "region": "ningbo"
            },
            "value": "2",
            "timestamp": "1560743513626681000"
        }
    ]
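OpenTSDB's HTTP API names the fields of a data point metric, timestamp, value and tags, and accepts timestamps in seconds or milliseconds. The sketch below shows one way a record like the ones above could be mapped onto that shape. How the plugin itself performs this mapping is not stated here, so the measurement-to-metric mapping and the nanosecond-to-millisecond conversion are both assumptions, and to_datapoint is a hypothetical helper.

```python
import json


def to_datapoint(record):
    """Map a converted record onto OpenTSDB's data point field names (assumed mapping)."""
    nanoseconds = int(record["timestamp"])
    return {
        "metric": record["measurement"],          # assumption: measurement becomes metric
        "timestamp": nanoseconds // 1_000_000,    # assumption: nanoseconds to milliseconds
        "value": record["value"],
        "tags": dict(record.get("tags", {})),
    }


record = {
    "measurement": "sample",
    "tags": {
        "clientid": "mqttjs_ebcc36079a",
        "host": "serverA",
        "qos": "0",
        "region": "hangzhou",
    },
    "value": "1",
    "timestamp": "1560743513626681000",
}
print(json.dumps(to_datapoint(record), indent=2))
```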

Enable OpenTSDB Backend

    ./bin/emqx_ctl plugins load emqx_backend_opentsdb