设备消息协议解析SDK

平台封装了网络通信,但是具体的数据由消息协议进行解析.协议(ProtocolSupport)主要由认证器(Authenticator), 消息编解码器(DeviceMessageCodec),消息发送拦截器(DeviceMessageSenderInterceptor)以及配置元数据(ConfigMetadata)组成.

认证器

认证器(Authenticator)是用于在收到设备请求(例如MQTT)时,对客户端进行认证时使用,不同的网络协议(Transport)使用不同的认证器.

接口定义:

  1. public interface Authenticator {
  2. Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request,
  3. @Nonnull DeviceOperator device);
  4. }

参数AuthenticationRequest为认证请求参数,不同的网络类型请求类型也不同,请根据实际情况转换为对应的类型,例如: MqttAuthenticationRequest mqttRequest = (MqttAuthenticationRequest)request;

参数DeviceOperator为对应的设备操作接口,可通过此接口获取设备的配置,例如:device.getConfig("mqttUsername").

返回值Mono<AuthenticationResponse>为认证结果.

例:

  1. Authenticator mqttAuthenticator = (request, device) -> {
  2. MqttAuthenticationRequest mqttRequest = ((MqttAuthenticationRequest) request);
  3. return device.getConfigs("username", "password") //获取设备的配置信息,由配置元数据定义,在设备型号中进行配置.
  4. .flatMap(values -> {
  5. String username = values.getValue("username").map(Value::asString).orElse(null);
  6. String password = values.getValue("password").map(Value::asString).orElse(null);
  7. if (mqttRequest.getUsername().equals(username) && mqttRequest.getPassword().equals(password)) {
  8. return Mono.just(AuthenticationResponse.success());
  9. } else {
  10. return Mono.just(AuthenticationResponse.error(400, "密码错误"));
  11. }
  12. });
  13. }

消息编解码器

用于将平台统一的消息(Message)设备端能处理的消息(EncodedMessage)进行相互转换.

接口(DeviceMessageCodec)定义:

  1. //此编解码器支持的网络协议,如: DefaultTransport.MQTT
  2. Transport getSupportTransport();
  3. //将平台发往设备的消息编码为设备端对消息
  4. Publisher<? extends EncodedMessage> encode(MessageEncodeContext context);
  5. //将设备发往平台的消息解码为平台统一的消息
  6. Publisher<? extends Message> decode(MessageDecodeContext context);

编码: 可以从上下文MessageEncodeContext中获取当前设备操作接口DeviceOperator以及平台统一的设备消息Message.根据设备侧定义的协议转换为对应的EncodedMessage.

注意

不同的网络协议需要转换为不同的EncodedMessage类型.比如,MQTT需要转换为MqttMessage.

大部分情况下:MessageDecodeContext可转为FromDeviceMessageContext,可获取到当前设备的连接会话DeviceSession,通过会话可以直接发送消息到设备.

解码: 可以从上下文MessageDecodeContext中获取设备操作接口DeviceOperator以及设备消息EncodedMessage,然后将消息转换为平台统一的消息.

平台统一消息定义

平台将设备抽象为由属性(property),功能(function),事件(event)组成. 平台接入设备之前,应该先将设备的属性``功能``事件设计好.

消息组成

消息主要由deviceId,messageId,headers组成.

deviceId为设备的唯一标识,messageId为消息的唯一标识,headers为消息头,通常用于对自定义消息处理的行为,如是否异步消息, 是否分片消息等.

常用的(Headers)[https://github.com/jetlinks/jetlinks-core/blob/master/src/main/java/org/jetlinks/core/message/Headers.java\]:

  1. async 是否异步,boolean类型.
  2. timeout 指定超时时间. 毫秒.
  3. frag_msg_id 分片主消息ID,为下发消息的messageId
  4. frag_num 分片总数
  5. frag_part 当前分片索引
  6. frag_last 是否为最后一个分片,当无法确定分片数量的时候,可以将分片设置到足够大,最后一个分片设置:frag_last=true来完成返回.
  7. keepOnline 与DeviceOnlineMessage配合使用,在TCP短链接,保持设备一直在线状态,连接断开不会设置设备离线.

TIP

messageId通常由平台自动生成,如果设备不支持消息id,可在自定义协议中通过Map的方式来做映射,将设备返回的消息与平台的messageId进行绑定.

属性相关消息

  1. 获取设备属性(ReadPropertyMessage)对应设备回复的消息ReadPropertyMessageReply.
  2. 修改设备属性(WritePropertyMessage)对应设备回复的消息WritePropertyMessageReply.
  3. 设备上报属性(ReportPropertyMessage) 由设备上报.

注意

设备回复的消息是通过messageId进行绑定,messageId应该注意要全局唯一,如果设备无法做到,可以在编解码时通过添加前缀等方式实现.

消息定义:

  1. ReadPropertyMessage{
  2. String deviceId;
  3. String messageId;
  4. List<String> properties;//可读取多个属性
  5. }
  6. ReadPropertyMessageReply{
  7. String deviceId;
  8. String messageId;
  9. long timestamp;
  10. boolean success;
  11. Map<String,Object> properties;//属性键值对
  12. }
  1. WritePropertyMessage{
  2. String deviceId;
  3. String messageId;
  4. Map<String,Object> properties;
  5. }
  6. WritePropertyMessageReply{
  7. String deviceId;
  8. String messageId;
  9. long timestamp;
  10. boolean success;
  11. Map<String,Object> properties; //回复被修改的属性最新值
  12. }
  1. ReportPropertyMessage{
  2. String deviceId;
  3. String messageId;
  4. long timestamp;
  5. Map<String,Object> properties;
  6. }

功能相关消息

调用设备功能到消息(FunctionInvokeMessage)由平台发往设备,对应到返回消息FunctionInvokeMessageReply.

消息定义:

  1. FunctionInvokeMessage{
  2. String functionId;//功能标识,在元数据中定义.
  3. String deviceId;
  4. String messageId;
  5. List<FunctionParameter> inputs;//输入参数
  6. }
  7. FunctionParameter{
  8. String name;
  9. Object value;
  10. }
  11. FunctionInvokeMessageReply{
  12. String deviceId;
  13. String messageId;
  14. long timestamp;
  15. boolean success;
  16. Object output; //输出值,需要与元数据定义中的类型一致
  17. }

事件消息

事件消息EventMessage由设备端发往平台.

消息定义:

  1. EventMessage{
  2. String event; //事件标识,在元数据中定义
  3. Object data; //与元数据中定义的类型一致,如果是对象类型,请转为java.util.HashMap,禁止使用自定义类型.
  4. long timestamp;
  5. }

其他消息

  1. DeviceOnlineMessage 设备上线消息,通常用于网关代理的子设备的上线操作.
  2. DeviceOfflineMessage 设备上线消息,通常用于网关代理的子设备的下线操作.
  3. ChildrenDeviceMessage 子设备消息,通常用于网关代理的子设备的消息.
  4. ChildrenDeviceMessageReply 子设备消息回复,用于平台向网关代理的子设备发送消息后设备回复给平台的结果.

消息定义:

  1. DeviceOnlineMessage{
  2. String deviceId;
  3. long timestamp;
  4. }
  5. DeviceOfflineMessage{
  6. String deviceId;
  7. long timestamp;
  8. }
  1. ChildDeviceMessage{
  2. String deviceId;
  3. String childDeviceId;
  4. Message childDeviceMessage; //子设备消息
  5. }

父子设备消息处理请看这里

消息发送拦截器

使用拦截器可以拦截消息发送和返回的动作,通过修改参数等操作实现自定义逻辑,如: 当设备离线时,将消息缓存到设备配置中,等设备上线时再重发.

  1. DeviceMessageSenderInterceptor{
  2. //发送前
  3. Mono<DeviceMessage> preSend(DeviceOperator device, DeviceMessage message);
  4. //发送后
  5. <R extends DeviceMessage> Flux<R> afterSent(DeviceOperator device, DeviceMessage message, Flux<R> reply);
  6. }

在发送前,可以将参数DeviceMessage转为其他消息.

发送后,会将返回结果流Flux<R>传入,通过对该数据流对操作以实现自定义行为,如:忽略错误等.

配置元数据

配置元数据用于告诉平台,在使用此协议的时候,需要添加一些自定义配置到设备配置(DeviceOperator.setConfig)中. 在其他地方可以通过DeviceOperator.getConfig获取这些配置.

例如:

  1. CompositeProtocolSupport support = new CompositeProtocolSupport();
  2. support.setId("demo-v1");
  3. support.setName("演示协议v1");
  4. support.setDescription("演示协议");
  5. support.setMetadataCodec(new JetLinksDeviceMetadataCodec()); //固定为JetLinksDeviceMetadataCodec,请勿修改.
  6. DefaultConfigMetadata mqttConfig = new DefaultConfigMetadata(
  7. "MQTT认证配置"
  8. , "")
  9. .add("username", "username", "MQTT用户名", new StringType())
  10. .add("password", "password", "MQTT密码", new PasswordType());
  11. //设置MQTT所需要到配置
  12. support.addConfigMetadata(DefaultTransport.MQTT, mqttConfig);

完整例子

演示协议设备消息协议解析SDK - 图1