MQTT Protocol

MQTT is a machine-to-machine (M2M)/“Internet of Things” connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.

IoTDB supports the MQTT v3.1 (an OASIS Standard) protocol. The IoTDB server includes a built-in MQTT service that allows remote devices to send messages directly to the IoTDB server.

Built-in MQTT Service

The built-in MQTT service provides a direct connection to IoTDB through MQTT. It listens for publish messages from MQTT clients and writes the data to storage immediately. The MQTT topic corresponds to an IoTDB timeseries. The message payload is converted into events by a PayloadFormatter, which is loaded through Java SPI; the default implementation is JSONPayloadFormatter. The default JSON formatter supports two JSON formats. The following is an example MQTT message payload:

    {
      "device":"root.sg.d1",
      "timestamp":1586076045524,
      "measurements":["s1","s2"],
      "values":[0.530635,0.530635]
    }

or

    {
      "device":"root.sg.d1",
      "timestamps":[1586076045524,1586076065526],
      "measurements":["s1","s2"],
      "values":[[0.530635,0.530635], [0.530655,0.530695]]
    }
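
The two payload shapes can be produced programmatically. The following is a minimal sketch, not part of IoTDB: the class MqttPayloads and its methods are hypothetical names introduced for illustration. It assumes that, in the batch format, each inner array of "values" holds the readings for the timestamp at the same index, and that device and measurement names contain no characters that need JSON escaping.

```java
import java.util.StringJoiner;

/** Hypothetical helper (not an IoTDB class) that builds the two default JSON payload shapes. */
public class MqttPayloads {

    /** Single-point format: one timestamp and one value per measurement. */
    public static String single(String device, long timestamp,
                                String[] measurements, double[] values) {
        return "{\"device\":\"" + device + "\","
            + "\"timestamp\":" + timestamp + ","
            + "\"measurements\":" + names(measurements) + ","
            + "\"values\":" + row(values) + "}";
    }

    /**
     * Batch format: several timestamps in one message.
     * Assumption: values[i] is the row of readings taken at timestamps[i].
     */
    public static String batch(String device, long[] timestamps,
                               String[] measurements, double[][] values) {
        if (timestamps.length != values.length) {
            throw new IllegalArgumentException("one row of values is needed per timestamp");
        }
        StringJoiner ts = new StringJoiner(",", "[", "]");
        StringJoiner rows = new StringJoiner(",", "[", "]");
        for (int i = 0; i < timestamps.length; i++) {
            ts.add(Long.toString(timestamps[i]));
            rows.add(row(values[i]));
        }
        return "{\"device\":\"" + device + "\","
            + "\"timestamps\":" + ts + ","
            + "\"measurements\":" + names(measurements) + ","
            + "\"values\":" + rows + "}";
    }

    /** Renders measurement names as a JSON array of strings (no escaping is applied). */
    private static String names(String[] measurements) {
        StringJoiner joiner = new StringJoiner(",", "[", "]");
        for (String m : measurements) {
            joiner.add("\"" + m + "\"");
        }
        return joiner.toString();
    }

    /** Renders one row of finite double values as a JSON array of numbers. */
    private static String row(double[] values) {
        StringJoiner joiner = new StringJoiner(",", "[", "]");
        for (double v : values) {
            joiner.add(Double.toString(v));
        }
        return joiner.toString();
    }
}
```

Calling MqttPayloads.single("root.sg.d1", 1586076045524L, new String[] {"s1", "s2"}, new double[] {0.530635, 0.530635}) yields the first payload shown above on a single line, without the line breaks.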

MQTT Configurations

The IoTDB MQTT service loads its configuration from ${IOTDB_HOME}/${IOTDB_CONF}/iotdb-engine.properties by default.

The configuration options are as follows:

| NAME                   | DESCRIPTION                                          | DEFAULT |
|------------------------|------------------------------------------------------|---------|
| enable_mqtt_service    | whether to enable the MQTT service                   | false   |
| mqtt_host              | the MQTT service binding host                        | 0.0.0.0 |
| mqtt_port              | the MQTT service binding port                        | 1883    |
| mqtt_handler_pool_size | the handler pool size for handling the MQTT messages | 1       |
| mqtt_payload_formatter | the MQTT message payload formatter                   | json    |
| mqtt_max_message_size  | the maximum MQTT message size in bytes               | 1048576 |
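
As an illustration, a fragment of iotdb-engine.properties that turns the service on might look like the following. Only enable_mqtt_service differs from the defaults listed above; the other lines simply restate those defaults and could presumably be omitted.

```properties
# Turn on the built-in MQTT service (it is disabled by default).
enable_mqtt_service=true
# The remaining entries repeat the documented defaults.
mqtt_host=0.0.0.0
mqtt_port=1883
mqtt_handler_pool_size=1
mqtt_payload_formatter=json
mqtt_max_message_size=1048576
```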

Coding Examples

The following example shows an MQTT client sending messages to the IoTDB server.

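    import java.nio.charset.StandardCharsets;
    import java.util.Locale;
    import java.util.Random;
    import org.fusesource.mqtt.client.BlockingConnection;
    import org.fusesource.mqtt.client.MQTT;
    import org.fusesource.mqtt.client.QoS;

    public class MQTTClient {
      public static void main(String[] args) throws Exception {
        // Connect to the built-in MQTT service on its default port.
        MQTT mqtt = new MQTT();
        mqtt.setHost("127.0.0.1", 1883);
        mqtt.setUserName("root");
        mqtt.setPassword("root");
        BlockingConnection connection = mqtt.blockingConnection();
        connection.connect();
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
          // Locale.ROOT keeps '.' as the decimal separator so the payload stays valid JSON.
          String payload = String.format(Locale.ROOT, "{\n" +
              "\"device\":\"root.sg.d1\",\n" +
              "\"timestamp\":%d,\n" +
              "\"measurements\":[\"s1\"],\n" +
              "\"values\":[%f]\n" +
              "}", System.currentTimeMillis(), random.nextDouble());
          connection.publish("root.sg.d1.s1", payload.getBytes(StandardCharsets.UTF_8), QoS.AT_LEAST_ONCE, false);
        }
        connection.disconnect();
      }
    }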
