客户端AioQuickClient

在过去我不曾觉得框架对于提供客户端API有多重要,因为即便采用最传统的BIO技术也能满足我们的业务所需。直到从事互联网开行业后才意识到一个问题,随着业务的发展会在系统中存在大量的RPC服务,所以服务调用方可能会创建几十甚至上百个客户端连接。如果继续采用BIO技术,单单通信所需的线程资源都会对系统运行造成巨大负担。而运用AIO技术可以实现客户端通信线程资源共享,仅需少量线程便可支持几百上千的客户端连接。

smart-socket提供的客户端服务AioQuickClient在代码实现上比较简单,并且尽量让必要的事情都在构造方法中完成,核心的几个要素只有host、port、protocol、messageProcessor。如仅需以下几行代码便完成客户端的创建、通信、关闭。

  1. public class IntegerClient {
  2. public static void main(String[] args) throws Exception {
  3. AioQuickClient<Integer> aioQuickClient = new AioQuickClient<Integer>("localhost", 8888, new IntegerProtocol(), new IntegerClientProcessor());
  4. AioSession<Integer> session = aioQuickClient.start();
  5. session.writeBuffer().writeInt(1);
  6. Thread.sleep(1000);
  7. aioQuickClient.shutdown();
  8. }
  9. }

在smart-socket中实现客户端AioQuickClient功能的代码量非常少,最复杂的逻辑也就是start方法。在客户端中存在两个start方法,两者的区别在于线程资源是独享还是共享的。值得注意的是,执行start后会返回当前客户端的AioSession对象,如此一来建立创建成功后可直接进行数据输出。

  1. public final AioSession<T> start() throws IOException, ExecutionException, InterruptedException {
  2. this.asynchronousChannelGroup = AsynchronousChannelGroup.withFixedThreadPool(2, new ThreadFactory() {
  3. @Override
  4. public Thread newThread(Runnable r) {
  5. return new Thread(r);
  6. }
  7. });
  8. return start(asynchronousChannelGroup);
  9. }
  10. public AioSession<T> start(AsynchronousChannelGroup asynchronousChannelGroup) throws IOException, ExecutionException, InterruptedException {
  11. AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open(asynchronousChannelGroup);
  12. if (bufferPool == null) {
  13. bufferPool = new BufferPagePool(IoServerConfig.getIntProperty(IoServerConfig.Property.CLIENT_PAGE_SIZE, 1024 * 256), 1, IoServerConfig.getBoolProperty(IoServerConfig.Property.CLIENT_PAGE_IS_DIRECT, true));
  14. }
  15. //set socket options
  16. if (config.getSocketOptions() != null) {
  17. for (Map.Entry<SocketOption<Object>, Object> entry : config.getSocketOptions().entrySet()) {
  18. socketChannel.setOption(entry.getKey(), entry.getValue());
  19. }
  20. }
  21. //bind host
  22. if (localAddress != null) {
  23. socketChannel.bind(localAddress);
  24. }
  25. socketChannel.connect(new InetSocketAddress(config.getHost(), config.getPort())).get();
  26. //连接成功则构造AIOSession对象
  27. session = new AioSession<T>(socketChannel, config, new ReadCompletionHandler<T>(), new WriteCompletionHandler<T>(), bufferPool.allocateBufferPage());
  28. session.initSession();
  29. return session;
  30. }