平台从第三方或者设备主动拉取数据

场景:设备无法推送数据到平台,需要平台主动去调用第三方平台接口或者去调用设备的tcp服务等。

流程

  1. 通过实现自定义协议的DeviceStateChecker来自定义处理设备状态获取逻辑,比如通过调用第三方平台获取设备信息.
  2. 通过实现自定义协议的DeviceMessageSenderInterceptor.afterSent来拦截消息发送,替换掉默认处理方式.在这里使用WebClient或者Vertx请求第三方或者设备.
  3. 请求后解析数据为对应的消息,调用DecodedClientMessageHandler.handleMessage(device,message)完成默认消息处理之后,返回消息.

例子一,通过http到第三方平台获取数据.

第一步定义消息编码解码器

  1. public class HttpMessageCodec implements DeviceMessageCodec {
  2. // 定义一个通用的响应,用于收到请求后响应
  3. private static final SimpleHttpResponseMessage response = SimpleHttpResponseMessage
  4. .builder()
  5. .payload(Unpooled.wrappedBuffer("{success:true}".getBytes()))
  6. .contentType(MediaType.APPLICATION_JSON)
  7. .status(200)
  8. .build();
  9. @Override
  10. public Transport getSupportTransport() {
  11. return DefaultTransport.HTTP;
  12. }
  13. @Nonnull
  14. @Override
  15. public Publisher<? extends Message> decode(@Nonnull MessageDecodeContext context){
  16. // 这里用于别的平台请求/通知jetlinks的请求处理
  17. // 把消息转换为http消息
  18. HttpExchangeMessage message = (HttpExchangeMessage) context.getMessage();
  19. String url = message.getUrl();
  20. // 这里通常需要判断是不是自己需要的请求,如果不是直接返回/响应,防止非法请求
  21. if (!url.endsWith("/eventRcv")) {
  22. return message.response(response).then(Mono.empty());
  23. }
  24. // 获取具体消息类型
  25. ByteBuf payload = message.getPayload();
  26. String string = payload.toString(StandardCharsets.UTF_8);
  27. // 通常来说,云平台通知的定义为事件消息(也可以定义成别的消息)
  28. EventMessage eventMessage = new EventMessage();
  29. eventMessage.setEvent("test");
  30. eventMessage.setDeviceId(string);
  31. eventMessage.setData(string);
  32. eventMessage.setMessageId(String.valueOf(System.currentTimeMillis()));
  33. eventMessage.setTimestamp(System.currentTimeMillis());
  34. return message.response(response).thenMany(Flux.just(eventMessage));
  35. }
  36. @Nonnull
  37. @Override
  38. public Publisher<? extends EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
  39. // 对接其他云平台,命令发起不在这里处理,所以这里返回空就可以了
  40. return Mono.empty();
  41. }
  42. }

第二步 定义一个消息拦截器

  1. @Slf4j
  2. @AllArgsConstructor
  3. @Getter
  4. @Setter
  5. public class HttpMessageSenderInterceptor implements DeviceMessageSenderInterceptor{
  6. // 通过构造器注入一个编码消息处理器,用于消息的持久化
  7. private DecodedClientMessageHandler handler;
  8. private static final WebClient webclient=WebClient.builder().build();
  9. /**
  10. * 在消息发送后触发.
  11. *
  12. * @param device 设备操作接口
  13. * @param message 源消息
  14. * @param reply 回复的消息
  15. * @param <R> 回复的消息类型
  16. * @return 新的回复结果
  17. */
  18. public <R extends DeviceMessage> Flux<R> afterSent(DeviceOperator device, DeviceMessage message, Flux<R> reply) {
  19. return Flux.from(
  20. // 从配置中获取url等各种请求所需参数
  21. device.getConfigs("url")
  22. .flatMap(values->{
  23. String url=values.getValue("url").map(Value::asString).orElse(null);
  24. // 通常发起请求都是通过方法调用
  25. FunctionInvokeMessage invokeMessage = (FunctionInvokeMessage) message;
  26. // 从命令发起的上下文中获取消息体
  27. List<FunctionParameter> inputs = invokeMessage.getInputs();
  28. Map< String, Object> body=iputs
  29. .stream()
  30. .collect(Collectors
  31. .toMap(FunctionParameter::getName, FunctionParameter::getValue));
  32. return webclient // 构造WebClient
  33. .post() // 指定请求类型
  34. .uri(url) // 请求路径
  35. .bodyValue(body) // 请求参数
  36. .retrieve() // 发起请求
  37. .bodyToMono(String.class) // 响应参数
  38. .flatMap(s -> {
  39. // 响应参数包装为功能回复参数
  40. FunctionInvokeMessageReply reply1 = new FunctionInvokeMessageReply();
  41. reply1.setSuccess(true);
  42. reply1.setMessage(s);
  43. reply1.setDeviceId(message.getDeviceId());
  44. reply1.setMessageId(message.getMessageId());
  45. reply1.setTimestamp(System.currentTimeMillis());
  46. reply1.setOutput(s);
  47. reply1.setFunctionId(((FunctionInvokeMessage) message).getFunctionId());
  48. return Mono.just(reply1)
  49. .map(deviceMessage->(R)deviceMessage);
  50. })
  51. // 消息持久化
  52. .flatMap(msg->handler.handleMessage(device,msg)
  53. .thenReturn(msg));
  54. })
  55. );
  56. }
  57. }

第三步 定义一个设备状态检测器

  1. /**
  2. * 这个接口会在进入设备详情页面和刷新设备状态时调用
  3. */
  4. @Slf4j
  5. public class HttpDeviceStateChecker implements DeviceStateChecker {
  6. @Override
  7. public @NotNull Mono<Byte> checkState(@NotNull DeviceOperator device) {
  8. // 如果第三方平台有提供设备状态查询接口,则调用接口确定设备状态,否则设置为设备在线,方便发起功能或者属性查询
  9. return Mono.just(DeviceState.online);
  10. }
  11. }

第四步 定义协议处理器

  1. public class HttpProtocolSupportProvider implements ProtocolSupportProvider{
  2. private static final DefaultConfigMetadata httpRequest = new DefaultConfigMetadata(
  3. "Http请求配置"
  4. , "")
  5. .add("url", "url", " http请求地址", new StringType());
  6. @Override
  7. public Mono<? extends ProtocolSupport> create(ServiceContext serviceContext){
  8. CompositeProtocolSupport support = new CompositeProtocolSupport();
  9. support.setId("http-demo-v1");
  10. support.setName("http调用第三方接口DEMO");
  11. support.setDescription("http调用第三方接口DEMO");
  12. support.setMetadataCodec(new JetLinksDeviceMetadataCodec());
  13. // 设置一个编解码入口
  14. HttpMessageCodec codec = new HttpMessageCodec();
  15. support.addMessageCodecSupport(DefaultTransport.HTTP, () -> Mono.just(codec));
  16. // 添加配置项定义
  17. support.addConfigMetadata(DefaultTransport.HTTP, httpRequest);
  18. HttpDeviceStateChecker httpDeviceStateChecker = new HttpDeviceStateChecker();
  19. // 设置设备状态检查接口
  20. support.setDeviceStateChecker(httpDeviceStateChecker);
  21. // 设置HTTP消息拦截器,用于发送HTTP消息
  22. serviceContext.getService(DecodedClientMessageHandler.class)
  23. .ifPresent(handler -> support.addMessageSenderInterceptor(new HttpMessageSenderInterceptor(handler)));
  24. return Mono.just(support);
  25. }
  26. }

例子二,通过tcp短链接从设备拉取数据.

TODO