EdgeX Message Bus action

The action is used for publishing output message into EdgeX message bus.

**Please notice that, if you’re using the ZeorMQ message bus, the action will create a NEW EdgeX message bus (with the address where running Kuiper service), but not by leveraging the original message bus (normally it’s the address & port exposed by application service). **

Also, you need to expose the port number to host server before running the Kuiper server if you want to have the service available to other hosts.

Property nameOptionalDescription
protocoltrueThe protocol. If it’s not specified, then use default value tcp.
hosttrueThe host of message bus. If not specified, then use default value .
porttrueThe port of message bus. If not specified, then use default value 5563.
topictrueThe topic to be published. If not specified, then use default value events.
contentTypetrueThe content type of message to be published. If not specified, then use the default value application/json.
metadatatrueThe property is a field name that allows user to specify a field name of SQL select clause, the field name should use meta() AS xxx to select all of EdgeX metadata from message.
deviceNametrueAllows user to specify the device name in the event structure that are sent from Kuiper.
typetrueThe message bus type, two types of message buses are supported, zero or mqtt, and zero is the default value.
optionaltrueIf mqtt message bus type is specified, then some optional values can be specified. Please refer to below for supported optional supported configurations.

Below optional configurations are supported, please check MQTT specification for the detailed information.

  • optional
    • ClientId
    • Username
    • Password
    • Qos
    • KeepAlive
    • Retained
    • ConnectionPayload
    • CertFile
    • KeyFile
    • CertPEMBlock
    • KeyPEMBlock
    • SkipCertVerify

Examples

Publish result to a new EdgeX message bus without keeping original metadata

In this case, the original metadata value (such as id, pushed, created, modified, origin in Events structure, and id, created, modified, origin, pushed, device in Reading structure will not be kept). Kuiper acts as another EdgeX micro service here, and it has own device name. A deviceName property is provided, and allows user to specify the device name of Kuiper. Below is one example,

  1. Data received from EdgeX message bus events topic,
  1. {
  2. "Device": "demo", "Created": 000,
  3. "readings":
  4. [
  5. {"Name": "Temperature", value: "30", "Created":123 …},
  6. {"Name": "Humidity", value: "20", "Created":456 …}
  7. ]
  8. }
  1. Use following rule, and specify deviceName with kuiper in edgex action.
  1. {
  2. "id": "rule1",
  3. "sql": "SELECT temperature * 3 AS t1, humidity FROM events",
  4. "actions": [
  5. {
  6. "edgex": {
  7. "protocol": "tcp",
  8. "host": "*",
  9. "port": 5571,
  10. "topic": "application",
  11. "deviceName": "kuiper",
  12. "contentType": "application/json"
  13. }
  14. }
  15. ]
  16. }
  1. The data sent to EdgeX message bus.
  1. {
  2. "Device": "kuiper", "Created": 0,
  3. "readings":
  4. [
  5. {"Name": "t1", value: "90" , "Created": 0 …},
  6. {"Name": "humidity", value: "20" , "Created": 0 …}
  7. ]
  8. }

Please notice that,

  • The device name of Event structure is changed to kuiper
  • All of metadata for Events and Readings structure will be updated with new value. Created field is updated to another value generated by Kuiper (here is 0).

Publish result to a new EdgeX message bus with keeping original metadata

But for some scenarios, you may want to keep some of original metadata. Such as keep the device name as original value that published to Kuiper (demo in the sample), and also other metadata of readings arrays. In such case, Kuiper is acting as a filter - to filter NOT concerned messages, but still keep original data.

Below is an example,

  1. Data received from EdgeX message bus events topic,
  1. {
  2. "Device": "demo", "Created": 000,
  3. "readings":
  4. [
  5. {"Name": "Temperature", value: "30", "Created":123 …},
  6. {"Name": "Humidity", value: "20", "Created":456 …}
  7. ]
  8. }
  1. Use following rule, and specify metadata with edgex_meta in edgex action.
  1. {
  2. "id": "rule1",
  3. "sql": "SELECT meta(*) AS edgex_meta, temperature * 3 AS t1, humidity FROM events WHERE temperature > 30",
  4. "actions": [
  5. {
  6. "edgex": {
  7. "protocol": "tcp",
  8. "host": "*",
  9. "port": 5571,
  10. "topic": "application",
  11. "metadata": "edgex_meta",
  12. "contentType": "application/json"
  13. }
  14. }
  15. ]
  16. }

Please notice that,

  • User need to add meta(*) AS edgex_meta in the SQL clause, the meta(*) returns all of metadata.
  • In edgex action, value edgex_meta is specified for metadata property. This property specifies which field contains metadata of message.
  1. The data sent to EdgeX message bus.
  1. {
  2. "Device": "demo", "Created": 000,
  3. "readings":
  4. [
  5. {"Name": "t1", value: "90" , "Created": 0 …},
  6. {"Name": "humidity", value: "20", "Created":456 …}
  7. ]
  8. }

Please notice that,

  • The metadata of Events structure is still kept, such as Device & Created.
  • For the reading that can be found in original message, the metadata will be kept. Such as humidity metadata will be the old values received from EdgeX message bus.
  • For the reading that can NOT be found in original message, the metadata will not be set. Such as metadata of t1 in the sample will fill with default value that generated by Kuiper.
  • If your SQL has aggregated function, then it does not make sense to keep these metadata, but Kuiper will still fill with metadata from a particular message in the time window. For example, with following SQL, SELECT avg(temperature) AS temperature, meta(*) AS edgex_meta FROM ... GROUP BY TUMBLINGWINDOW(ss, 10). In this case, there are possibly several messages in the window, the metadata value for temperature will be filled with value from 1st message that received from bus.

Send result to MQTT message bus

Below is a rule that send analysis result to MQTT message bus, please notice how to specify ClientId in optional configuration.

  1. {
  2. "id": "rule1",
  3. "sql": "SELECT meta(*) AS edgex_meta, temperature, humidity, humidity*2 as h1 FROM demo WHERE temperature = 20",
  4. "actions": [
  5. {
  6. "edgex": {
  7. "protocol": "tcp",
  8. "host": "127.0.0.1",
  9. "port": 1883,
  10. "topic": "result",
  11. "type": "mqtt",
  12. "metadata": "edgex_meta",
  13. "contentType": "application/json",
  14. "optional": {
  15. "ClientId": "edgex_message_bus_001"
  16. }
  17. }
  18. },
  19. {
  20. "log":{}
  21. }
  22. ]
  23. }