自定义协议开发

环境准备和开发工具

  • JDK:1.8+
  • MAVEN:3.1+

注意

maven不要使用全局仓库配置,可能导致依赖无法下载

  • 开发工具:idea

第一步 通过idea创建maven工程:demo-protocol

第二步 修改pom文件,添加依赖

  1. <dependencies>
  2. <!-- jetlinks 核心依赖 -->
  3. <dependency>
  4. <groupId>org.jetlinks</groupId>
  5. <artifactId>jetlinks-core</artifactId>
  6. <version>1.0.2-BUILD-SNAPSHOT</version>
  7. </dependency>
  8. <!-- jetlinks 协议解析接口包 -->
  9. <dependency>
  10. <groupId>org.jetlinks</groupId>
  11. <artifactId>jetlinks-supports</artifactId>
  12. <version>1.0.2-BUILD-SNAPSHOT</version>
  13. </dependency>
  14. <!-- lombok,需要idea安装lombok插件,否则去掉 -->
  15. <dependency>
  16. <groupId>org.projectlombok</groupId>
  17. <artifactId>lombok</artifactId>
  18. <version>1.18.10</version>
  19. </dependency>
  20. <!-- vertx核心包,可以用来进行网络模拟测试 -->
  21. <dependency>
  22. <groupId>io.vertx</groupId>
  23. <artifactId>vertx-core</artifactId>
  24. <version>3.8.3</version>
  25. <scope>test</scope>
  26. </dependency>
  27. <!-- 单元测试包 -->
  28. <dependency>
  29. <groupId>org.junit.jupiter</groupId>
  30. <artifactId>junit-jupiter</artifactId>
  31. <version>5.5.2</version>
  32. <scope>test</scope>
  33. </dependency>
  34. <!-- logback日志 -->
  35. <dependency>
  36. <groupId>ch.qos.logback</groupId>
  37. <artifactId>logback-classic</artifactId>
  38. <version>1.2.3</version>
  39. </dependency>
  40. </dependencies>
  41. <!-- netty组件 -->
  42. <dependencyManagement>
  43. <dependencies>
  44. <dependency>
  45. <groupId>io.netty</groupId>
  46. <artifactId>netty-bom</artifactId>
  47. <version>${netty.version}</version>
  48. <type>pom</type>
  49. <scope>import</scope>
  50. </dependency>
  51. </dependencies>
  52. </dependencyManagement>
  • 修改maven pom文件properties
  1. <properties>
  2. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  3. <project.build.locales>zh_CN</project.build.locales>
  4. <java.version>1.8</java.version>
  5. <project.build.jdk>${java.version}</project.build.jdk>
  6. <netty.version>4.1.45.Final</netty.version>
  7. </properties>
  • 添加maven编译规则
  1. <build>
  2. <plugins>
  3. <plugin>
  4. <groupId>org.apache.maven.plugins</groupId>
  5. <artifactId>maven-compiler-plugin</artifactId>
  6. <version>3.1</version>
  7. <configuration>
  8. <source>${project.build.jdk}</source>
  9. <target>${project.build.jdk}</target>
  10. <encoding>${project.build.sourceEncoding}</encoding>
  11. </configuration>
  12. </plugin>
  13. </plugins>
  14. </build>
  • 添加hsweb私服和阿里云仓库
  1. <repositories>
  2. <repository>
  3. <id>hsweb-nexus</id>
  4. <name>Nexus Release Repository</name>
  5. <url>http://nexus.hsweb.me/content/groups/public/</url>
  6. <snapshots>
  7. <enabled>true</enabled>
  8. <updatePolicy>always</updatePolicy>
  9. </snapshots>
  10. </repository>
  11. <repository>
  12. <id>aliyun-nexus</id>
  13. <name>aliyun</name>
  14. <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
  15. </repository>
  16. </repositories>

第三步 协议开发

  • 新建package:org.jetlinks.demo.protocol

  • 创建协议编码解码类:DemoDeviceMessageCodec

  1. import io.netty.buffer.ByteBuf;
  2. import io.netty.buffer.ByteBufUtil;
  3. import io.netty.buffer.Unpooled;
  4. import lombok.AllArgsConstructor;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.apache.commons.codec.DecoderException;
  7. import org.apache.commons.codec.binary.Hex;
  8. import org.jetlinks.core.message.DeviceMessage;
  9. import org.jetlinks.core.message.DeviceOnlineMessage;
  10. import org.jetlinks.core.message.Message;
  11. import org.jetlinks.core.message.codec.*;
  12. import org.jetlinks.core.message.function.FunctionInvokeMessage;
  13. import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
  14. import org.jetlinks.core.server.session.DeviceSession;
  15. import org.jetlinks.wt.protocol.message.NbIotMessage;
  16. import org.jetlinks.wt.protocol.message.data.enums.DataIdEnum;
  17. import org.jetlinks.wt.protocol.message.enums.ControlEnum;
  18. import org.reactivestreams.Publisher;
  19. import reactor.core.publisher.Flux;
  20. import reactor.core.publisher.Mono;
  21. /**
  22. * Skeleton codec for the demo protocol. Both decode and encode are placeholders
  23. * that emit nothing; later steps of this guide fill them in.
  24. */
  25. @AllArgsConstructor
  26. @Slf4j
  27. public class DemoDeviceMessageCodec implements DeviceMessageCodec {
  28. // Transport this codec is bound to: TCP.
  29. @Override
  30. public Transport getSupportTransport() {
  31. return DefaultTransport.TCP;
  32. }
  33. // Decodes a TCP message into a platform message (typically device -> platform reports).
  34. // Placeholder: completes empty without emitting a message.
  35. @Override
  36. public Mono<? extends Message> decode(MessageDecodeContext context) {
  37. return Mono.empty();
  38. }
  39. // Encodes a platform message into a transport message (typically platform -> device commands).
  40. // Placeholder: completes empty without emitting a message.
  41. @Override
  42. public Publisher<? extends EncodedMessage> encode(MessageEncodeContext context) {
  43. return Mono.empty();
  44. }
  45. }
  • 创建协议入口类: DemoProtocolSupportProvider
  1. import org.jetlinks.core.ProtocolSupport;
  2. import org.jetlinks.core.Value;
  3. import org.jetlinks.core.defaults.CompositeProtocolSupport;
  4. import org.jetlinks.core.device.AuthenticationResponse;
  5. import org.jetlinks.core.device.DeviceRegistry;
  6. import org.jetlinks.core.device.MqttAuthenticationRequest;
  7. import org.jetlinks.core.message.codec.DefaultTransport;
  8. import org.jetlinks.core.metadata.DefaultConfigMetadata;
  9. import org.jetlinks.core.metadata.types.PasswordType;
  10. import org.jetlinks.core.metadata.types.StringType;
  11. import org.jetlinks.core.spi.ProtocolSupportProvider;
  12. import org.jetlinks.core.spi.ServiceContext;
  13. import org.jetlinks.demo.protocol.tcp.DemoTcpMessageCodec;
  14. import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
  15. import reactor.core.publisher.Mono;
  16. /**
  17. * Entry point of the demo protocol package: builds the protocol descriptor
  18. * and registers the codec that handles TCP messages.
  19. */
  20. public class DemoProtocolSupportProvider implements ProtocolSupportProvider {
  21. @Override
  22. public Mono<? extends ProtocolSupport> create(ServiceContext context) {
  23. CompositeProtocolSupport protocol = new CompositeProtocolSupport();
  24. // Protocol ID, display name and description.
  25. protocol.setId("demo-v1");
  26. protocol.setName("演示协议v1");
  27. protocol.setDescription("演示协议");
  28. // Thing-model (metadata) codec; this guide fixes it to JetLinksDeviceMetadataCodec.
  29. protocol.setMetadataCodec(new JetLinksDeviceMetadataCodec());
  30. // Codec for TCP messages.
  31. DemoDeviceMessageCodec tcpCodec = new DemoDeviceMessageCodec();
  32. // First argument: the transport, which must match the one declared by DemoDeviceMessageCodec.
  33. // Second argument: supplier of the codec used for that transport.
  34. protocol.addMessageCodecSupport(DefaultTransport.TCP, () -> Mono.just(tcpCodec));
  35. return Mono.just(protocol);
  36. }
  37. }

第四步 设备上报消息解码

  1. // NOTE(review): step 3 of this guide names the codec DemoDeviceMessageCodec and the
  2. // provider instantiates that name; confirm which class name is intended.
  3. @AllArgsConstructor
  4. @Slf4j
  5. public class DemoTcpMessageCodec implements DeviceMessageCodec {
  6. ....
  7. // Decodes a TCP payload sent by a device into platform messages (device -> platform).
  8. // The whole payload is read as UTF-8 text and used both as the device ID and as the
  9. // value of the reported "text" property. When the session has no operator yet, an
  10. // online message is emitted in addition to the property report.
  11. @Override
  12. public Publisher<? extends Message> decode(MessageDecodeContext context) {
  13. return Flux.defer(() -> {
  14. // Device-side message context; it exposes the device session.
  15. FromDeviceMessageContext ctx = ((FromDeviceMessageContext) context);
  16. // Extract the readable bytes, starting at the reader index rather than absolute index 0.
  17. ByteBuf byteBuf = context.getMessage().getPayload();
  18. byte[] payload = ByteBufUtil.getBytes(byteBuf, byteBuf.readerIndex(), byteBuf.readableBytes(), false);
  19. // Interpret the bytes as text; a real protocol would parse its own frame format here.
  20. // The charset is explicit so the result does not depend on the platform default.
  21. String text = new String(payload, java.nio.charset.StandardCharsets.UTF_8);
  22. ReportPropertyMessage message = new ReportPropertyMessage();
  23. // In this demo the payload text doubles as the device ID.
  24. message.setDeviceId(text);
  25. // Epoch milliseconds are zone-independent, so no time-zone conversion is applied.
  26. message.setTimestamp(System.currentTimeMillis());
  27. // Reported properties: a single "text" property carrying the payload.
  28. Map<String, Object> properties = new HashMap<>();
  29. properties.put("text", text);
  30. message.setProperties(properties);
  31. // Look up the device session.
  32. DeviceSession session = ctx.getSession();
  33. // No operator on the session means this is the device's first connection.
  34. if (session.getOperator() == null) {
  35. DeviceOnlineMessage onlineMessage = new DeviceOnlineMessage();
  36. onlineMessage.setDeviceId(text);
  37. onlineMessage.setTimestamp(System.currentTimeMillis());
  38. // Send the property report together with the online message.
  39. return Flux.just(message, onlineMessage);
  40. }
  41. // Otherwise send only the property report.
  42. return Mono.just(message);
  43. });
  44. }
  45. .....
  46. }

第五步 平台发送消息到设备(消息编码)

  1. // NOTE(review): step 3 of this guide names the codec DemoDeviceMessageCodec and the
  2. // provider instantiates that name; confirm which class name is intended.
  3. @AllArgsConstructor
  4. @Slf4j
  5. public class DemoTcpMessageCodec implements DeviceMessageCodec {
  6. ..........
  7. // Encodes a platform message into the bytes sent to the device (platform -> device).
  8. // The demo wire format is the UTF-8 bytes of the message's toString().
  9. @Override
  10. public Publisher<? extends EncodedMessage> encode(MessageEncodeContext context) {
  11. // Message taken from the platform-side encode context.
  12. CommonDeviceMessage message = (CommonDeviceMessage) context.getMessage();
  13. byte[] bytes;
  14. // Dispatch on the message type so each type can build its own frame.
  15. if (message instanceof ReadPropertyMessage) {
  16. ReadPropertyMessage readPropertyMessage = (ReadPropertyMessage) message;
  17. // Bytes to transmit for a read-property command.
  18. bytes = readPropertyMessage.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
  19. } else {
  20. // Fallback for every other message type.
  21. bytes = message.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
  22. }
  23. // Wrap the bytes as the message body delivered to the device.
  24. return Mono.just(EncodedMessage.simple(Unpooled.wrappedBuffer(bytes)));
  25. }
  26. }

第六步 打成jar包上传到平台,并进行调试

以TCP服务网关接入设备为例。

进行协议上传和配置,并使用TCP工具进行调试。