Speaking in POJO instead of ByteBuf 用 POJO 代替 ByteBuf

我们回顾了迄今为止的所有例子使用 ByteBuf 作为协议消息的主要数据结构。在本节中,我们将改善的 TIME 协议客户端和服务器例子,使用 POJO 代替 ByteBuf。

ChannelHandler 使用 POJO 的好处很明显:通过从 ChannelHandler 中提取出 ByteBuf 的代码,将会使 ChannelHandler 的实现变得更加可维护和可重用。在 TIME 客户端和服务器的例子中,我们读取的仅仅是一个 32 位的整形数据,直接使用 ByteBuf 不会是一个主要的问题。然而,你会发现当你需要实现一个真实的协议,分离代码变得非常的必要。

首先,让我们定义一个新的类型叫做 UnixTime。

  1. public class UnixTime {
  2. private final long value;
  3. public UnixTime() {
  4. this(System.currentTimeMillis() / 1000L + 2208988800L);
  5. }
  6. public UnixTime(long value) {
  7. this.value = value;
  8. }
  9. public long value() {
  10. return value;
  11. }
  12. @Override
  13. public String toString() {
  14. return new Date((value() - 2208988800L) * 1000L).toString();
  15. }
  16. }

现在我们可以修改下 TimeDecoder 类,返回一个 UnixTime,以替代 ByteBuf

  1. @Override
  2. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
  3. if (in.readableBytes() < 4) {
  4. return;
  5. }
  6. out.add(new UnixTime(in.readUnsignedInt()));
  7. }

下面是修改后的解码器,TimeClientHandler 不再任何的 ByteBuf 代码了。

  1. @Override
  2. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  3. UnixTime m = (UnixTime) msg;
  4. System.out.println(m);
  5. ctx.close();
  6. }

是不是变得更加简单和优雅了?相同的技术可以被运用到服务端。让我们修改一下 TimeServerHandler 的代码。

  1. @Override
  2. public void channelActive(ChannelHandlerContext ctx) {
  3. ChannelFuture f = ctx.writeAndFlush(new UnixTime());
  4. f.addListener(ChannelFutureListener.CLOSE);
  5. }

现在,唯一缺少的功能是一个编码器,是ChannelOutboundHandler的实现,用来将 UnixTime 对象重新转化为一个 ByteBuf。这是比编写一个解码器简单得多,因为没有需要处理的数据包编码消息时拆分和组装。

  1. public class TimeEncoder extends ChannelOutboundHandlerAdapter {
  2. @Override
  3. public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
  4. UnixTime m = (UnixTime) msg;
  5. ByteBuf encoded = ctx.alloc().buffer(4);
  6. encoded.writeInt((int)m.value());
  7. ctx.write(encoded, promise); // (1)
  8. }
  9. }

1.在这几行代码里还有几个重要的事情。第一,通过 ChannelPromise,当编码后的数据被写到了通道上 Netty 可以通过这个对象标记是成功还是失败。第二, 我们不需要调用 cxt.flush()。因为处理器已经单独分离出了一个方法 void flush(ChannelHandlerContext cxt),如果像自己实现 flush() 方法内容可以自行覆盖这个方法。

进一步简化操作,你可以使用 MessageToByteEncode:

  1. public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
  2. @Override
  3. protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
  4. out.writeInt((int)msg.value());
  5. }
  6. }

最后的任务就是在 TimeServerHandler 之前把 TimeEncoder 插入到 ChannelPipeline。 但这是不那么重要的工作。