异步调用

在 Dubbo 中发起异步调用

特性说明

背景

从 2.7.0 开始,Dubbo 的所有异步编程接口开始以 CompletableFuture 为基础

基于 NIO 的非阻塞实现并行调用,客户端不需要启动多线程即可完成并行调用多个远程服务,相对多线程开销较小。

/user-guide/images/future.jpg

参考用例

https://github.com/apache/dubbo-samples/tree/master/dubbo-samples-async

使用场景

使用方式

使用 CompletableFuture 签名的接口

需要服务提供者事先定义 CompletableFuture 签名的服务,接口定义指南如下:

Provider端异步执行将阻塞的业务从Dubbo内部线程池切换到业务自定义线程,避免Dubbo线程池的过度占用,有助于避免不同服务间的互相影响。异步执行无异于节省资源或提升RPC响应性能,因为如果业务执行需要阻塞,则始终还是要有线程来负责执行。

Provider 端异步执行和 Consumer 端异步调用是相互独立的,任意正交组合两端配置

  • Consumer同步 - Provider同步
  • Consumer异步 - Provider同步
  • Consumer同步 - Provider异步
  • Consumer异步 - Provider异步

定义 CompletableFuture 签名的接口

服务接口定义

  1. public interface AsyncService {
  2. CompletableFuture<String> sayHello(String name);
  3. }

服务实现

  1. public class AsyncServiceImpl implements AsyncService {
  2. @Override
  3. public CompletableFuture<String> sayHello(String name) {
  4. return CompletableFuture.supplyAsync(() -> {
  5. System.out.println(name);
  6. try {
  7. Thread.sleep(5000);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. return "async response from provider.";
  12. });
  13. }
  14. }

通过 return CompletableFuture.supplyAsync() ,业务执行已从 Dubbo 线程切换到业务线程,避免了对 Dubbo 线程池的阻塞。

使用 AsyncContext

Dubbo 提供了一个类似 Servlet 3.0 的异步接口AsyncContext,在没有 CompletableFuture 签名接口的情况下,也可以实现 Provider 端的异步执行。

服务接口定义

  1. public interface AsyncService {
  2. String sayHello(String name);
  3. }

服务暴露,和普通服务完全一致

  1. <bean id="asyncService" class="org.apache.dubbo.samples.governance.impl.AsyncServiceImpl"/>
  2. <dubbo:service interface="org.apache.dubbo.samples.governance.api.AsyncService" ref="asyncService"/>

服务实现

  1. public class AsyncServiceImpl implements AsyncService {
  2. public String sayHello(String name) {
  3. final AsyncContext asyncContext = RpcContext.startAsync();
  4. new Thread(() -> {
  5. // 如果要使用上下文,则必须要放在第一句执行
  6. asyncContext.signalContextSwitch();
  7. try {
  8. Thread.sleep(500);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. // 写回响应
  13. asyncContext.write("Hello " + name + ", response from provider.");
  14. }).start();
  15. return null;
  16. }
  17. }

注意接口的返回类型是 CompletableFuture<String>

XML 引用服务

  1. <dubbo:reference id="asyncService" timeout="10000" interface="com.alibaba.dubbo.samples.async.api.AsyncService"/>

调用远程服务

  1. // 调用直接返回CompletableFuture
  2. CompletableFuture<String> future = asyncService.sayHello("async call request");
  3. // 增加回调
  4. future.whenComplete((v, t) -> {
  5. if (t != null) {
  6. t.printStackTrace();
  7. } else {
  8. System.out.println("Response: " + v);
  9. }
  10. });
  11. // 早于结果输出
  12. System.out.println("Executed before response return.");

使用 RpcContext

在 consumer.xml 中配置

  1. <dubbo:reference id="asyncService" interface="org.apache.dubbo.samples.governance.api.AsyncService">
  2. <dubbo:method name="sayHello" async="true" />
  3. </dubbo:reference>

调用代码

  1. // 此调用会立即返回null
  2. asyncService.sayHello("world");
  3. // 拿到调用的Future引用,当结果返回后,会被通知和设置到此Future
  4. CompletableFuture<String> helloFuture = RpcContext.getServiceContext().getCompletableFuture();
  5. // 为Future添加回调
  6. helloFuture.whenComplete((retValue, exception) -> {
  7. if (exception == null) {
  8. System.out.println(retValue);
  9. } else {
  10. exception.printStackTrace();
  11. }
  12. });

或者,也可以这样做异步调用

  1. CompletableFuture<String> future = RpcContext.getServiceContext().asyncCall(
  2. () -> {
  3. asyncService.sayHello("oneway call request1");
  4. }
  5. );
  6. future.get();

异步总是不等待返回,你也可以设置是否等待消息发出

  • sent="true" 等待消息发出,消息发送失败将抛出异常。
  • sent="false" 不等待消息发出,将消息放入 IO 队列,即刻返回。
  1. <dubbo:method name="findFoo" async="true" sent="true" />

如果你只是想异步,完全忽略返回值,可以配置 return="false",以减少 Future 对象的创建和管理成本

  1. <dubbo:method name="findFoo" async="true" return="false" />

最后修改 December 16, 2022: Fix check (#1736) (97972c1)