核心接口与状态机

业界有句话叫“一流的卖标准、二流的卖技术、三流的卖产品”,如果说 smart-socket 的技术价值仅算二流水准的话,那么我们为其精心设计的接口期望能稍微提升一下它的档次。基于 smart-socket 进行通信开发的主要工作量基本集中在两个接口的实现:ProtocolMessageProcessor

2.1 核心接口与状态机 - 图1

两个接口分工很明确,Protocol 负责解析数据形成消息实体,之后 smart-socket 会把该消息实体传递至 MessageProcessor 进行业务处理。

当然,你也可以在 Protocol 中一次性完成解析、业务处理,又或者将 Protocol 当个摆设,所有事情集中在 MessageProcessor 完成。smart-socket 不限制你实现功能的自由性,只是提供一个更规范、更合理的建议,最终决定权还是在用户的手中。

1. Protocol

  1. public interface Protocol<T> {
  2. T decode(final ByteBuffer readBuffer, AioSession<T> session);
  3. }

Protocol 是一个泛型接口,<T>指的是业务消息实体类,smart-socket 中不少地方都运用了泛型设计,其含义都代表数据解码后封装的消息类型。Protocol 中只定义了一个方法decode

decode(消息解码),AIO 的数据传输是以 ByteBuffer 为媒介的。所有读取到的字节数据都会填充在 ByteBuffer 中并以事件回调的形式触发 Protocol#decode() 方法。所以我们实现的 decode 算法就是 ByteBuffer 对象转化为业务消息<T>的过程。

需要强调一点,读者朋友请不要把解码想的很简单,令人“深恶痛绝”的半包/粘包就是在这个环节需要应对的。处理方式也不难,遵守以下两点即可

  • 根据协议解析每一个字段前,都要先确保剩余数据满足解析所需,不满足则终止该字段解析并返回 null。
  • 当已读的数据已满足一个完整业务消息所需时,立即构造该业务对象并返回,无需关心 ByteBuffer 中是否有剩余的数据。考虑到有些读者对上述两点还不甚理解,我们通过两个示例来模拟通信过程中的半包、粘包场景。通信协议依旧是如1.1节中的定义的类型:

2.1.2_1

  • 半包
  1. public class StringProtocol implements Protocol<String> {
  2. public static void main(String[] args) {
  3. StringProtocol protocol = new StringProtocol();
  4. byte[] msgBody = "smart-socket".getBytes();
  5. byte msgHead = (byte) msgBody.length;
  6. System.out.println("完整消息长度:" + (msgBody.length + 1));
  7. ByteBuffer buffer = ByteBuffer.allocate(msgBody.length);
  8. buffer.put(msgHead);
  9. buffer.put(msgBody, 0, buffer.remaining());
  10. buffer.flip();
  11. System.out.println(protocol.decode(buffer, null));
  12. }
  13. public String decode(ByteBuffer buffer, AioSession<String> session) {
  14. buffer.mark();
  15. byte length = buffer.get();
  16. if (buffer.remaining() < length) {
  17. System.out.println("半包:期望长度:" + length + " ,实际剩余长度:" + buffer.remaining());
  18. buffer.reset();
  19. return null;
  20. }
  21. byte[] body = new byte[length];
  22. buffer.get(body);
  23. buffer.mark();
  24. return new String(body);
  25. }
  26. }

根据协议规定,完整的消息长度是字符串“smart-socket”字节数加一个字节的消息头,即13位。但因接收数据的 ByteBuffer 空间不足导致无法容纳整个消息,此时执行解码算法decode便等同于通信中的半包,运行后控制台打印如下:

  1. 完整消息长度:13
  2. 半包:期望长度:12 ,实际剩余长度:11
  3. null
  • 粘包
  1. public class StringProtocol implements Protocol<String> {
  2. public static void main(String[] args) {
  3. StringProtocol protocol = new StringProtocol();
  4. byte[] msgBody = "smart-socket".getBytes();
  5. byte msgHead = (byte) msgBody.length;
  6. System.out.println("完整消息长度:" + (msgBody.length + 1));
  7. ByteBuffer buffer = ByteBuffer.allocate((msgBody.length + 1) * 2);
  8. //第一个消息
  9. buffer.put(msgHead);
  10. buffer.put(msgBody);
  11. //第二个消息
  12. buffer.put(msgHead);
  13. buffer.put(msgBody);
  14. buffer.flip();
  15. String str = null;
  16. while ((str = protocol.decode(buffer, null)) != null) {
  17. System.out.println("消息解码成功:"+str);
  18. }
  19. }
  20. public String decode(ByteBuffer buffer, AioSession<String> session) {
  21. if (!buffer.hasRemaining()) {
  22. return null;
  23. }
  24. buffer.mark();
  25. byte length = buffer.get();
  26. if (buffer.remaining() < length) {
  27. System.out.println("半包:期望长度:" + length + " ,实际剩余长度:" + buffer.remaining());
  28. buffer.reset();
  29. return null;
  30. }
  31. byte[] body = new byte[length];
  32. buffer.get(body);
  33. buffer.mark();
  34. return new String(body);
  35. }
  36. }

粘包出现于已读数据的部分超过了一个完整的消息长度。如 demo 所示,我们在 ByteBuffer 中放入了符合协议贵的两个完整消息,按照解码算法解析出第一个消息里立即返回new String(body),待该消息处理完成后再进行下一次解码。故上述例子的控制台打印如下:

  1. 完整消息长度:13
  2. 消息解码成功:smart-socket
  3. 消息解码成功:smart-socket

至此我们已经为大家介绍了 Protocol 的特性以及对于半包粘包的处理方式,当然真实场景下我们会面临更复杂的协议,对于半包粘包的处理方式也是多种多样,在通信协议章节在详细说明。

2. MessageProcessor

  1. public interface MessageProcessor<T> {
  2. /**
  3. * 处理接收到的消息
  4. *
  5. * @param session 通信会话
  6. * @param msg 待处理的业务消息
  7. */
  8. void process(AioSession<T> session, T msg);
  9. /**
  10. * 状态机事件,当枚举事件发生时由框架触发该方法
  11. *
  12. *
  13. * @param session 本次触发状态机的AioSession对象
  14. * @param stateMachineEnum 状态枚举
  15. * @param throwable 异常对象,如果存在的话
  16. * @see StateMachineEnum
  17. */
  18. void stateEvent(AioSession<T> session, StateMachineEnum stateMachineEnum, Throwable throwable);
  19. }

Protocol 侧重于通信层的数据解析,而 MessageProcessor 则负责应用层的消息业务处理。定义了消息处理器接口,smart-socket 在通过 Protocol 完成消息解码后,会将消息对象交由 MessageProcessor 实现类进行业务处理。

  • process消息处理器,smart-socket 每接收到一个完整的业务消息,都会交由该处理器执行。
  • stateEvent执行状态机,smart-socket 内置了状态枚举StateMachineEnumMessageProcessor实现类可在此方法中处理其关注的事件。

3. 状态机StateMachineEnum

smart-socket中 引入了状态机的概念,状态机的存在不会决策 smart-socket 对于通信的事件处理,但会在特定事件发生之时通知消息处理器MessageProcessor#stateEvent。目前已有的状态枚举为:

状态枚举说明
NEW_SESSION网络连接建立时触发,连接建立时会构建传输层的AioSession,如果业务层面也需要维护一个会话,可在此状态机中处理
INPUT_SHUTDOWN数据读取完毕时触发,即传统意义中的read()==-1
INPUT_EXCEPTION读数据过程中发生异常时触发此状态机
OUTPUT_EXCEPTION写数据过程中发生异常时触发此状态机
SESSION_CLOSING触发了AioSession.close方法,但由于当前AioSession还有未完成的事件,会进入SESSION_CLOSING状态
SESSION_CLOSEDAioSesson完成close操作后触发此状态机
PROCESS_EXCEPTION业务处理异常
DECODE_EXCEPTION解码异常
REJECT_ACCEPT服务端拒绝客户端连接请求

状态机伴贯穿了通信服务的整个生命周期,在这个过程中不同事件的发生会触发不同的状态机。通信事件与状态机的关系如下图所示。

2.1 核心接口与状态机 - 图3

图2.2.2

状态机相对于整个通信环境的各个节点只是一个旁观者,它见证了各个事件的发生,却无力扭转事件的发展方向。状态机本质其实跟大家所认知的过滤器、拦截器有点类似,那为什么smart-socket要如此设计呢?想想一下如果我们按照过滤器的设计思路,其形态会如下所示:

  1. public interface Filter{
  2. void newSession(AioSesion session);
  3. void processException(AioSession session,Throwable throwable);
  4. void decodeExcepton(AioSession session,Throwable throwable);
  5. void inputException(AioSession session,Throwable throwable);
  6. void outputException(AioSession session,Throwable throwable);
  7. void sessionClosing(AioSession session);
  8. void sessionClosed(AioSession session);
  9. }

这样的设计存在以下缺陷:

  • 对实现类不友好;也许我只想处理 newSession,却不得不保留其余方法的空实现;
  • 无法平滑升级;加入新版本中加入新的事件类型,老版本代码需要全部更改;而采用状态机模式,不仅解决了上述问题,还为通信服务的多元化扩展带了便利。例如 IM 场景下,我们在 NEW_SESSION 状态机中收集 Session 集合,在消息处理时很容易就能实现消息群发;当某个用户断线后,我们及时在状态机 SESSION_CLOSED 中感知到并更新 Session 集合中的会话状态,甚至可以群发消息给所有用户“某某人掉线了”。这些通信状态和业务相结合的场景, 用状态机能很好的得以解决。最后奉上一段粗糙的伪代码,读者自行领悟。
  1. public class IMProcessor implements MessageProcessor<Message> {
  2. private LinkedBlockingQueue sessions = new LinkedBlockingQueue();
  3. public void process(AioSession<String> session, Message msg) {
  4. for(AioSession otherSession:sessions){
  5. if(otherSession==session){
  6. continue;
  7. }
  8. sendMessage(otherSession,session+"群发送消息:"+msg);
  9. }
  10. }
  11. public void stateEvent(AioSession<Message> session, StateMachineEnum state, Throwable throwable) {
  12. switch (state) {
  13. case NEW_SESSION:
  14. sessions.add(session);
  15. break;
  16. case SESSION_CLOSED:
  17. sessions.remove(session);
  18. break;
  19. }
  20. }
  21. }