什么是 Reactor?

现在你要了解下 Reactor,不妨在你喜欢的搜索引擎里输入 Reactive,Spring+Reactive,Asynchronous+Java 之类的关键词,或者直接输入 Reactor是什么货?。简单说,Reactor 是一个轻量级 JVM 基础库,帮助你的服务或应用高效,异步地传递消息。

"高效"是指什么?

  • 消息从A传递到B时,产生很少的内存垃圾,甚至不产生。
  • 解决消费者处理消息的效率低于生产者时带来的溢出问题。
  • 尽可能提供非阻塞异步流

从经验可知(主要是 #rage#drunk 的推特),异步编程很难,而像 JVM 这类提供众多可选参数的平台则尤其困难。 Reactor 旨在帮助大多数用例真正非阻塞地运行。我们提供的 API 比 JDK 的 java.util.concurrent 库低级原语更高效。Reactor 提供了下列功能的替代函数 (并建议不使用 JDK 原生语句):

  • 阻塞等待:如 Future.get()

  • 不安全的数据访问:如 ReentrantLock.lock()

  • 异常冒泡:如 try…​catch…​finally

  • 同步阻塞:如 synchronized{ }

  • Wrapper分配(GC 压力):如 new Wrapper(event) 当消息传递效率成为系统性能瓶颈的时候(10k msg/s,100k msg/s,1M…),非阻塞机制就显得尤为重要。虽然这个有理论支持 (参见 Amdahl’s Law),但读起来太无聊了。我们举例说明,比如你用了个 Executor 方法:

  1. private ExecutorService threadPool = Executors.newFixedThreadPool(8);
  2. final List<T> batches = new ArrayList<T>();
  3. Callable<T> t = new Callable<T>() { // *1
  4. public T run() {
  5. synchronized(batches) { // *2
  6. T result = callDatabase(msg); // *3
  7. batches.add(result);
  8. return result;
  9. }
  10. }
  11. };
  12. Future<T> f = threadPool.submit(t); // *4
  13. T result = f.get() // *5
  • Callable 分配 — 可能导致 GC 压力。
  • 同步过程强制每个线程执行停-检查操作。
  • 消息的消费可能比生产慢。
  • 使用线程池(ThreadPool)将任务传递给目标线程 — 通过 FutureTask 方式肯定会产生 GC 压力。
  • 阻塞直至 callDatabase() 回调。 在这个简单的例子中,很容易指出为什么扩容是很有限的:

  • 分配对象可能产生GC压力,特别是当任务运行时间过长。

    • 每次 GC 暂停都会影响全局性能。
  • 默认,队列是无界的,任务会因为数据库调用而堆积。

    • 积压虽然不会直接导致内存泄漏,但会带来严重副作用:GC 暂停时要扫描更多的对象;有丢失重要数据位的风险;等等 …

    • 典型链式队列节点分配时会产生大量内存压力。

  • 阻塞回调容易产生恶性循环。

    • 阻塞回调会降低消息生产者的效率。在实践中,任务提交后需要等待结果返回,此时流式过程几乎演变为同步的了。

    • 会话过程抛出的任何带数据存储的异常都会以不受控的方式被传递给生产者,否定了任何通常在线程边界附近可用的容错性。

要实现完全非阻塞是很难办到的,尤其是在有着类似微服务架构这样时髦绰号的分布式系统的世界里。因此 Reactor 做了部分妥协,尝试利用最优的可用模式,使开发者觉得他们是在写异步纳米服务,而不是什么数学论文。

没有什么传播得比光快(除了绯闻和网红猫的视频),正如到了某个阶段,延迟是每一个系统到都要面对的实实在在的问题。为此:

Reactor 提供的框架可以帮助减轻应用中由延迟产生的副作用,只需要增加一点点开销:

  • 使用了一些聪明的结构,通过启动预分配策略解决运行时分配问题

  • 通过确定信息传递主结构的边界,避免任务的无限堆叠;

  • 采用主流的响应与事件驱动构架模式,提供包含反馈在内的非阻塞端对端流

  • 引入新的 Reactive Streams标准,拒绝超过当前容量请求,从而保证限制结构的有效性;

  • IPC上也使用了类似理念,提供对流控制友好的非阻塞 IO 驱动

  • 开放了帮助开发者们以零副作用方式组织他们代码的函数接口,借助这些函数来处理容错性和线程安全。