链上信使协议

介绍

链上信使协议AMOP(Advanced Messages Onchain Protocol)系统旨在为联盟链提供一个安全高效的消息信道,联盟链中的各个机构,只要部署了区块链节点,无论是共识节点还是观察节点,均可使用AMOP进行通讯,AMOP有如下优势:

  • 实时:AMOP消息不依赖区块链交易和共识,消息在节点间实时传输,延时在毫秒级。
  • 可靠:AMOP消息传输时,自动寻找区块链网络中所有可行的链路进行通讯,只要收发双方至少有一个链路可用,消息就保证可达。
  • 高效:AMOP消息结构简洁、处理逻辑高效,仅需少量cpu占用,能充分利用网络带宽。
  • 安全:AMOP的所有通讯链路使用SSL加密,加密算法可配置。
  • 易用:使用AMOP时,无需在SDK做任何额外配置。

逻辑架构

../../_images/AMOP.jpg以银行典型IDC架构为例,各区域概述:

  • SF区:机构内部的业务服务区,此区域内的业务子系统使用区块链SDK,如无DMZ区,配置SDK连接到区块链节点,反之配置SDK连接到DMZ区的区块链前置。
  • DMZ区:机构内部的外网隔离区,非必须,如有,该区域部署区块链前置。
  • 区块链P2P网络:此区域部署各机构的区块链节点,此区域为逻辑区域,区块链节点也可部署在机构内部。

配置

AMOP无需任何额外配置,以下为Web3SDK的配置案例SDK配置(Spring Bean):

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
  4. xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
  5. xmlns:context="http://www.springframework.org/schema/context"
  6. xsi:schemaLocation="http://www.springframework.org/schema/beans
  7. http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
  8. http://www.springframework.org/schema/tx
  9. http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
  10. http://www.springframework.org/schema/aop
  11. http://www.springframework.org/schema/aop/spring-aop-2.5.xsd">
  12.  
  13. <!-- AMOP消息处理线程池配置,根据实际需要配置 -->
  14. <bean id="pool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
  15. <property name="corePoolSize" value="50" />
  16. <property name="maxPoolSize" value="100" />
  17. <property name="queueCapacity" value="500" />
  18. <property name="keepAliveSeconds" value="60" />
  19. <property name="rejectedExecutionHandler">
  20. <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
  21. </property>
  22. </bean>
  23.  
  24. <!-- 群组信息配置 -->
  25. <bean id="groupChannelConnectionsConfig" class="org.fisco.bcos.channel.handler.GroupChannelConnectionsConfig">
  26. <property name="allChannelConnections">
  27. <list>
  28. <bean id="group1" class="org.fisco.bcos.channel.handler.ChannelConnections">
  29. <property name="groupId" value="1" />
  30. <property name="connectionsStr">
  31. <list>
  32. <value>127.0.0.1:20200</value> <!-- 格式:IP:端口 -->
  33. <value>127.0.0.1:20201</value>
  34. </list>
  35. </property>
  36. </bean>
  37. </list>
  38. </property>
  39. </bean>
  40.  
  41. <!-- 区块链节点信息配置 -->
  42. <bean id="channelService" class="org.fisco.bcos.channel.client.Service" depends-on="groupChannelConnectionsConfig">
  43. <property name="groupId" value="1" />
  44. <property name="orgID" value="fisco" />
  45. <property name="allChannelConnections" ref="groupChannelConnectionsConfig"></property>
  46. </bean>

区块链前置配置,如有DMZ区:

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
  4. xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
  5. xmlns:context="http://www.springframework.org/schema/context"
  6. xsi:schemaLocation="http://www.springframework.org/schema/beans
  7. http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
  8. http://www.springframework.org/schema/tx
  9. http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
  10. http://www.springframework.org/schema/aop
  11. http://www.springframework.org/schema/aop/spring-aop-2.5.xsd">
  12.  
  13. <!-- 区块链节点信息配置 -->
  14. <bean id="proxyServer" class="org.fisco.bcos.channel.proxy.Server">
  15. <property name="remoteConnections">
  16. <bean class="org.fisco.bcos.channel.handler.ChannelConnections">
  17. <property name="connectionsStr">
  18. <list>
  19. <value>127.0.0.1:5051</value><!-- 格式:IP:端口 -->
  20. </list>
  21. </property>
  22. </bean>
  23. </property>
  24.  
  25. <property name="localConnections">
  26. <bean class="org.fisco.bcos.channel.handler.ChannelConnections">
  27. </bean>
  28. </property>
  29. <!-- 区块链前置监听端口配置,区块链SDK连接用 -->
  30. <property name="bindPort" value="30333"/>
  31. </bean>
  32. </beans>

SDK使用

AMOP的消息收发基于topic(主题)机制,服务端首先设置一个topic,客户端往该topic发送消息,服务端即可收到。

AMOP支持在同一个区块链网络中有多个topic收发消息,topic支持任意数量的服务端和客户端,当有多个服务端关注同一个topic时,该topic的消息将随机下发到其中一个可用的服务端。

服务端代码案例:

  1. package org.fisco.bcos.channel.test.amop;
  2.  
  3. import org.fisco.bcos.channel.client.Service;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.context.ApplicationContext;
  7. import org.springframework.context.support.ClassPathXmlApplicationContext;
  8.  
  9. import java.util.HashSet;
  10. import java.util.Set;
  11.  
  12. public class Channel2Server {
  13. static Logger logger = LoggerFactory.getLogger(Channel2Server.class);
  14.  
  15. public static void main(String[] args) throws Exception {
  16. if (args.length < 1) {
  17. System.out.println("Param: topic");
  18. return;
  19. }
  20.  
  21. String topic = args[0];
  22.  
  23. logger.debug("init Server");
  24.  
  25. ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
  26. Service service = context.getBean(Service.class);
  27.  
  28. // 设置topic,支持多个topic
  29. Set<String> topics = new HashSet<String>();
  30. topics.add(topic);
  31. service.setTopics(topics);
  32.  
  33. // 处理消息的PushCallback类,参见Callback代码
  34. PushCallback cb = new PushCallback();
  35. service.setPushCallback(cb);
  36.  
  37. System.out.println("3s...");
  38. Thread.sleep(1000);
  39. System.out.println("2s...");
  40. Thread.sleep(1000);
  41. System.out.println("1s...");
  42. Thread.sleep(1000);
  43.  
  44. System.out.println("start test");
  45. System.out.println("===================================================================");
  46.  
  47. // 启动服务
  48. service.run();
  49. }
  50. }

服务端的PushCallback类案例:

  1. package org.fisco.bcos.channel.test.amop;
  2.  
  3. import java.time.LocalDateTime;
  4. import java.time.format.DateTimeFormatter;
  5.  
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8.  
  9. import org.fisco.bcos.channel.client.ChannelPushCallback;
  10. import org.fisco.bcos.channel.dto.ChannelPush;
  11. import org.fisco.bcos.channel.dto.ChannelResponse;
  12.  
  13. class PushCallback extends ChannelPushCallback {
  14. static Logger logger = LoggerFactory.getLogger(PushCallback2.class);
  15.  
  16. // onPush方法,在收到AMOP消息时被调用
  17. @Override
  18. public void onPush(ChannelPush push) {
  19. DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  20. logger.debug("push:" + push.getContent());
  21.  
  22. System.out.println(df.format(LocalDateTime.now()) + "server:push:" + push.getContent());
  23.  
  24. // 回包消息
  25. ChannelResponse response = new ChannelResponse();
  26. response.setContent("receive request seq:" + String.valueOf(push.getMessageID()));
  27. response.setErrorCode(0);
  28.  
  29. push.sendResponse(response);
  30. }
  31. }

客户端案例:

  1. package org.fisco.bcos.channel.test.amop;
  2.  
  3. import java.time.LocalDateTime;
  4. import java.time.format.DateTimeFormatter;
  5.  
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.context.ApplicationContext;
  9. import org.springframework.context.support.ClassPathXmlApplicationContext;
  10.  
  11. import org.fisco.bcos.channel.client.Service;
  12. import org.fisco.bcos.channel.dto.ChannelRequest;
  13. import org.fisco.bcos.channel.dto.ChannelResponse;
  14.  
  15. public class Channel2Client {
  16. static Logger logger = LoggerFactory.getLogger(Channel2Client.class);
  17.  
  18. public static void main(String[] args) throws Exception {
  19. if(args.length < 2) {
  20. System.out.println("param: target topic total number of request");
  21. return;
  22. }
  23.  
  24. String topic = args[0];
  25. Integer count = Integer.parseInt(args[1]);
  26. DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  27.  
  28. ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
  29.  
  30. Service service = context.getBean(Service.class);
  31. service.run();
  32.  
  33. System.out.println("3s ...");
  34. Thread.sleep(1000);
  35. System.out.println("2s ...");
  36. Thread.sleep(1000);
  37. System.out.println("1s ...");
  38. Thread.sleep(1000);
  39.  
  40. System.out.println("start test");
  41. System.out.println("===================================================================");
  42. for (Integer i = 0; i < count; ++i) {
  43. Thread.sleep(2000); // 建立连接需要一点时间,如果立即发送消息会失败
  44.  
  45. ChannelRequest request = new ChannelRequest();
  46. request.setToTopic(topic); // 设置消息topic
  47. request.setMessageID(service.newSeq()); // 消息序列号,唯一标识某条消息,可用newSeq()随机生成
  48. request.setTimeout(5000); // 消息的超时时间
  49.  
  50. request.setContent("request seq:" + request.getMessageID()); // 发送的消息内容
  51. System.out.println(df.format(LocalDateTime.now()) + " request seq:" + String.valueOf(request.getMessageID())
  52. + ", Content:" + request.getContent());
  53.  
  54. ChannelResponse response = service.sendChannelMessage2(request); // 发送消息
  55.  
  56. System.out.println(df.format(LocalDateTime.now()) + "response seq:" + String.valueOf(response.getMessageID())
  57. + ", ErrorCode:" + response.getErrorCode() + ", Content:" + response.getContent());
  58. }
  59. }
  60. }

测试

按上述说明配置好后,用户指定一个主题:topic,执行以下两个命令可以进行测试。

启动amop服务端:

  1. java -cp 'conf/:apps/*:lib/*' org.fisco.bcos.channel.test.amop.Channel2Server [topic]

启动amop客户端:

  1. java -cp 'conf/:apps/*:lib/*' org.fisco.bcos.channel.test.amop.Channel2Client [topic] [消息条数]

错误码

  • 99:发送消息失败,AMOP经由所有链路的尝试后,消息未能发到服务端,建议使用发送时生成的seq,检查链路上各个节点的处理情况。
  • 102:消息超时,建议检查服务端是否正确处理了消息,带宽是否足够。