集成 EMQX

本节将演示如何通过 Neuron 南向采集设备实际点位数据,通过北向 Sparkplug B 插件将数据上报到 EMQX, 再通过EMQX编解码功能解码后得到正确完整的数据结果,流程如图:

Sparkplug B

配置 Neuron

添加南向设备

通过南向驱动采集 Modbus TCP 模拟器点位值去模拟实际设备点位值,配置如下:

添加设备

image-20230419133807028

设备配置

image-20230419134424414

image-20230419134500446

创建组

image-20230419134630777

添加点位

image-20230419134741736

添加北向应用

添加应用

image-20230419134941116

应用配置

image-20230419135022571

image-20230419135247025

添加订阅

image-20230419135416517

配置 EMQX

我们可以通过安装 MQTTX 桌面客户端查看由 Neuron 转发到 EMQX 的数据。但如果直接订阅 Neuron 北向 SparkPlugB 上报到 EMQX 的数据,则会出现字符串乱码的情况,如图:

image-20230419140026304

所以可以通过EMQX规则引擎编解码的能力,编写一个相应的proto文件结合规则引擎将上报的数据进行解码后得到正确完整的数据结果。

创建编解码

image-20230419140540482

image-20230419140724126

bash

  1. ## 完整proto文件
  2. syntax = "proto2";
  3. //
  4. // To compile:
  5. // cd client_libraries/java
  6. // protoc --proto_path=../../ --java_out=src/main/java ../../sparkplug_b.proto
  7. //
  8. message Payload {
  9. /*
  10. // Indexes of Data Types
  11. // Unknown placeholder for future expansion.
  12. Unknown = 0;
  13. // Basic Types
  14. Int8 = 1;
  15. Int16 = 2;
  16. Int32 = 3;
  17. Int64 = 4;
  18. UInt8 = 5;
  19. UInt16 = 6;
  20. UInt32 = 7;
  21. UInt64 = 8;
  22. Float = 9;
  23. Double = 10;
  24. Boolean = 11;
  25. String = 12;
  26. DateTime = 13;
  27. Text = 14;
  28. // Additional Metric Types
  29. UUID = 15;
  30. DataSet = 16;
  31. Bytes = 17;
  32. File = 18;
  33. Template = 19;
  34. // Additional PropertyValue Types
  35. PropertySet = 20;
  36. PropertySetList = 21;
  37. */
  38. message Template {
  39. message Parameter {
  40. optional string name = 1;
  41. optional uint32 type = 2;
  42. oneof value {
  43. uint32 int_value = 3;
  44. uint64 long_value = 4;
  45. float float_value = 5;
  46. double double_value = 6;
  47. bool boolean_value = 7;
  48. string string_value = 8;
  49. ParameterValueExtension extension_value = 9;
  50. }
  51. message ParameterValueExtension {
  52. extensions 1 to max;
  53. }
  54. }
  55. optional string version = 1; // The version of the Template to prevent mismatches
  56. repeated Metric metrics = 2; // Each metric is the name of the metric and the datatype of the member but does not contain a value
  57. repeated Parameter parameters = 3;
  58. optional string template_ref = 4; // Reference to a template if this is extending a Template or an instance - must exist if an instance
  59. optional bool is_definition = 5;
  60. extensions 6 to max;
  61. }
  62. message DataSet {
  63. message DataSetValue {
  64. oneof value {
  65. uint32 int_value = 1;
  66. uint64 long_value = 2;
  67. float float_value = 3;
  68. double double_value = 4;
  69. bool boolean_value = 5;
  70. string string_value = 6;
  71. DataSetValueExtension extension_value = 7;
  72. }
  73. message DataSetValueExtension {
  74. extensions 1 to max;
  75. }
  76. }
  77. message Row {
  78. repeated DataSetValue elements = 1;
  79. extensions 2 to max; // For third party extensions
  80. }
  81. optional uint64 num_of_columns = 1;
  82. repeated string columns = 2;
  83. repeated uint32 types = 3;
  84. repeated Row rows = 4;
  85. extensions 5 to max; // For third party extensions
  86. }
  87. message PropertyValue {
  88. optional uint32 type = 1;
  89. optional bool is_null = 2;
  90. oneof value {
  91. uint32 int_value = 3;
  92. uint64 long_value = 4;
  93. float float_value = 5;
  94. double double_value = 6;
  95. bool boolean_value = 7;
  96. string string_value = 8;
  97. PropertySet propertyset_value = 9;
  98. PropertySetList propertysets_value = 10; // List of Property Values
  99. PropertyValueExtension extension_value = 11;
  100. }
  101. message PropertyValueExtension {
  102. extensions 1 to max;
  103. }
  104. }
  105. message PropertySet {
  106. repeated string keys = 1; // Names of the properties
  107. repeated PropertyValue values = 2;
  108. extensions 3 to max;
  109. }
  110. message PropertySetList {
  111. repeated PropertySet propertyset = 1;
  112. extensions 2 to max;
  113. }
  114. message MetaData {
  115. // Bytes specific metadata
  116. optional bool is_multi_part = 1;
  117. // General metadata
  118. optional string content_type = 2; // Content/Media type
  119. optional uint64 size = 3; // File size, String size, Multi-part size, etc
  120. optional uint64 seq = 4; // Sequence number for multi-part messages
  121. // File metadata
  122. optional string file_name = 5; // File name
  123. optional string file_type = 6; // File type (i.e. xml, json, txt, cpp, etc)
  124. optional string md5 = 7; // md5 of data
  125. // Catchalls and future expansion
  126. optional string description = 8; // Could be anything such as json or xml of custom properties
  127. extensions 9 to max;
  128. }
  129. message Metric {
  130. optional string name = 1; // Metric name - should only be included on birth
  131. optional uint64 alias = 2; // Metric alias - tied to name on birth and included in all later DATA messages
  132. optional uint64 timestamp = 3; // Timestamp associated with data acquisition time
  133. optional uint32 datatype = 4; // DataType of the metric/tag value
  134. optional bool is_historical = 5; // If this is historical data and should not update real time tag
  135. optional bool is_transient = 6; // Tells consuming clients such as MQTT Engine to not store this as a tag
  136. optional bool is_null = 7; // If this is null - explicitly say so rather than using -1, false, etc for some datatypes.
  137. optional MetaData metadata = 8; // Metadata for the payload
  138. optional PropertySet properties = 9;
  139. oneof value {
  140. uint32 int_value = 10;
  141. uint64 long_value = 11;
  142. float float_value = 12;
  143. double double_value = 13;
  144. bool boolean_value = 14;
  145. string string_value = 15;
  146. bytes bytes_value = 16; // Bytes, File
  147. DataSet dataset_value = 17;
  148. Template template_value = 18;
  149. MetricValueExtension extension_value = 19;
  150. }
  151. message MetricValueExtension {
  152. extensions 1 to max;
  153. }
  154. }
  155. optional uint64 timestamp = 1; // Timestamp at message sending time
  156. repeated Metric metrics = 2; // Repeated forever - no limit in Google Protobufs
  157. optional uint64 seq = 3; // Sequence number
  158. optional string uuid = 4; // UUID to track message type in terms of schema definitions
  159. optional bytes body = 5; // To optionally bypass the whole definition above
  160. extensions 6 to max; // For third party extensions
  161. }

创建规则

SQL 语句

bash

  1. SELECT
  2. schema_decode('neuron', payload, 'Payload') as SparkPlugB
  3. FROM
  4. "spBv1.0/group1/DDATA/node1/modbus"

这里的关键点在于 schema_decode('neuron', payload, 'Payload')

  • schema_decode 函数将 payload 字段的内容按照 protobuf_person Schema 来做解码;
  • as SparkPlugB 将解码后的值保存到变量 “SparkPlugB” 里;
  • 最后一个参数 Payload 指明了 payload 中的消息的类型是 protobuf schema 里定义的 ‘Payload’ 类型。

image-20230419141423179

然后使用以下参数添加动作:

  • 动作类型:消息重新发布
  • 目的主题:SparkPlugB/test

这个动作将解码之后的 “Payload” 以 JSON 的格式发送到 SparkPlugB/test 这个主题。

image-20230419141513303

通过 MQTTX 验证

再次通过 MQTTX 工具去订阅相关主题,经过 EMQX 规则引擎的编解码后,我们可以在 MQTTX 客户端看到正确解码后的数据,如图:

image-20230419141831882

如上图,可以看到解码前的原数据是乱码的,解码后得到完整正确的数据结果;至此,通过Neuron南向采集设备点位值,北向 SparkPlugB 上报到 EMQX,通过编解码功能解码得到完整的数据结果已完成。

拓展阅读:Sparkplug 协议

Neuron 上报数据到 EMQX 的 Topic 是根据 Sparkplug B 协议规范定义的,格式如下:

namespace/group_id/DDATA/edge_node_id/device_id

协议内容如图:

image-20230419143059088

至于更多 Neuron 北向 Sparkplug B 插件相关标准的定义,可以参考 SparkPlug B 协议规范