Streaming 通信模式

具体用例可以参考:dubbo-samples-triple/pojo;

开启 Triple 新特性 —— Stream (流)

Stream 是 Dubbo3 新提供的一种调用类型,在以下场景时建议使用流的方式:

  • 接口需要发送大量数据,这些数据无法被放在一个 RPC 的请求或响应中,需要分批发送,但应用层如果按照传统的多次 RPC 方式无法解决顺序和性能的问题,如果需要保证有序,则只能串行发送
  • 流式场景,数据需要按照发送顺序处理, 数据本身是没有确定边界的
  • 推送类场景,多个消息在同一个调用的上下文中被发送和处理

Stream 分为以下三种:

  • SERVER_STREAM(服务端流) SERVER_STREAM
  • CLIENT_STREAM(客户端流) CLIENT_STREAM
  • BIDIRECTIONAL_STREAM(双向流) BIDIRECTIONAL_STREAM

由于 java 语言的限制,BIDIRECTIONAL_STREAM 和 CLIENT_STREAM 的实现是一样的。

在 Dubbo3 中,流式接口以 SteamObserver 声明和使用,用户可以通过使用和实现这个接口来发送和处理流的数据、异常和结束。

对于 Dubbo2 用户来说,可能会对StreamObserver感到陌生,这是Dubbo3定义的一种流类型,Dubbo2 中并不存在 Stream 的类型,所以对于迁移场景没有任何影响。

流的语义保证

  • 提供消息边界,可以方便地对消息单独处理
  • 严格有序,发送端的顺序和接收端顺序一致
  • 全双工,发送不需要等待
  • 支持取消和超时

非 PB 序列化的流

  1. api
  1. public interface IWrapperGreeter {
  2. StreamObserver<String> sayHelloStream(StreamObserver<String> response);
  3. void sayHelloServerStream(String request, StreamObserver<String> response);
  4. }

Stream 方法的方法入参和返回值是严格约定的,为防止写错而导致问题,Dubbo3 框架侧做了对参数的检查, 如果出错则会抛出异常。 对于 双向流(BIDIRECTIONAL_STREAM), 需要注意参数中的 StreamObserver 是响应流,返回参数中的 StreamObserver 为请求流。

  1. 实现类
  1. public class WrapGreeterImpl implements WrapGreeter {
  2. //...
  3. @Override
  4. public StreamObserver<String> sayHelloStream(StreamObserver<String> response) {
  5. return new StreamObserver<String>() {
  6. @Override
  7. public void onNext(String data) {
  8. System.out.println(data);
  9. response.onNext("hello,"+data);
  10. }
  11. @Override
  12. public void onError(Throwable throwable) {
  13. throwable.printStackTrace();
  14. }
  15. @Override
  16. public void onCompleted() {
  17. System.out.println("onCompleted");
  18. response.onCompleted();
  19. }
  20. };
  21. }
  22. @Override
  23. public void sayHelloServerStream(String request, StreamObserver<String> response) {
  24. for (int i = 0; i < 10; i++) {
  25. response.onNext("hello," + request);
  26. }
  27. response.onCompleted();
  28. }
  29. }
  1. 调用方式
  1. delegate.sayHelloServerStream("server stream", new StreamObserver<String>() {
  2. @Override
  3. public void onNext(String data) {
  4. System.out.println(data);
  5. }
  6. @Override
  7. public void onError(Throwable throwable) {
  8. throwable.printStackTrace();
  9. }
  10. @Override
  11. public void onCompleted() {
  12. System.out.println("onCompleted");
  13. }
  14. });
  15. StreamObserver<String> request = delegate.sayHelloStream(new StreamObserver<String>() {
  16. @Override
  17. public void onNext(String data) {
  18. System.out.println(data);
  19. }
  20. @Override
  21. public void onError(Throwable throwable) {
  22. throwable.printStackTrace();
  23. }
  24. @Override
  25. public void onCompleted() {
  26. System.out.println("onCompleted");
  27. }
  28. });
  29. for (int i = 0; i < n; i++) {
  30. request.onNext("stream request" + i);
  31. }
  32. request.onCompleted();

使用 Protobuf 序列化的流

对于 Protobuf 序列化方式,推荐编写 IDL 使用 compiler 插件进行编译生成。生成的代码大致如下:

  1. public interface PbGreeter {
  2. static final String JAVA_SERVICE_NAME = "org.apache.dubbo.sample.tri.PbGreeter";
  3. static final String SERVICE_NAME = "org.apache.dubbo.sample.tri.PbGreeter";
  4. static final boolean inited = PbGreeterDubbo.init();
  5. //...
  6. void greetServerStream(org.apache.dubbo.sample.tri.GreeterRequest request, org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterReply> responseObserver);
  7. org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterRequest> greetStream(org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterReply> responseObserver);
  8. }

完整用例

  1. 编写 Java 接口

    1. import org.apache.dubbo.common.stream.StreamObserver;
    2. import org.apache.dubbo.hello.HelloReply;
    3. import org.apache.dubbo.hello.HelloRequest;
    4. public interface IGreeter {
    5. /**
    6. * <pre>
    7. * Sends greeting by stream
    8. * </pre>
    9. */
    10. StreamObserver<HelloRequest> sayHello(StreamObserver<HelloReply> replyObserver);
    11. }
  2. 编写实现类

    1. public class IStreamGreeterImpl implements IStreamGreeter {
    2. @Override
    3. public StreamObserver<HelloRequest> sayHello(StreamObserver<HelloReply> replyObserver) {
    4. return new StreamObserver<HelloRequest>() {
    5. private List<HelloReply> replyList = new ArrayList<>();
    6. @Override
    7. public void onNext(HelloRequest helloRequest) {
    8. System.out.println("onNext receive request name:" + helloRequest.getName());
    9. replyList.add(HelloReply.newBuilder()
    10. .setMessage("receive name:" + helloRequest.getName())
    11. .build());
    12. }
    13. @Override
    14. public void onError(Throwable cause) {
    15. System.out.println("onError");
    16. replyObserver.onError(cause);
    17. }
    18. @Override
    19. public void onCompleted() {
    20. System.out.println("onComplete receive request size:" + replyList.size());
    21. for (HelloReply reply : replyList) {
    22. replyObserver.onNext(reply);
    23. }
    24. replyObserver.onCompleted();
    25. }
    26. };
    27. }
    28. }
  3. 创建 Provider

    1. public class StreamProvider {
    2. public static void main(String[] args) throws InterruptedException {
    3. ServiceConfig<IStreamGreeter> service = new ServiceConfig<>();
    4. service.setInterface(IStreamGreeter.class);
    5. service.setRef(new IStreamGreeterImpl());
    6. service.setProtocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051));
    7. service.setApplication(new ApplicationConfig("stream-provider"));
    8. service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
    9. service.export();
    10. System.out.println("dubbo service started");
    11. new CountDownLatch(1).await();
    12. }
    13. }
  4. 创建 Consumer

    1. public class StreamConsumer {
    2. public static void main(String[] args) throws InterruptedException, IOException {
    3. ReferenceConfig<IStreamGreeter> ref = new ReferenceConfig<>();
    4. ref.setInterface(IStreamGreeter.class);
    5. ref.setCheck(false);
    6. ref.setProtocol(CommonConstants.TRIPLE);
    7. ref.setLazy(true);
    8. ref.setTimeout(100000);
    9. ref.setApplication(new ApplicationConfig("stream-consumer"));
    10. ref.setRegistry(new RegistryConfig("zookeeper://mse-6e9fda00-p.zk.mse.aliyuncs.com:2181"));
    11. final IStreamGreeter iStreamGreeter = ref.get();
    12. System.out.println("dubbo ref started");
    13. try {
    14. StreamObserver<HelloRequest> streamObserver = iStreamGreeter.sayHello(new StreamObserver<HelloReply>() {
    15. @Override
    16. public void onNext(HelloReply reply) {
    17. System.out.println("onNext");
    18. System.out.println(reply.getMessage());
    19. }
    20. @Override
    21. public void onError(Throwable throwable) {
    22. System.out.println("onError:" + throwable.getMessage());
    23. }
    24. @Override
    25. public void onCompleted() {
    26. System.out.println("onCompleted");
    27. }
    28. });
    29. streamObserver.onNext(HelloRequest.newBuilder()
    30. .setName("tony")
    31. .build());
    32. streamObserver.onNext(HelloRequest.newBuilder()
    33. .setName("nick")
    34. .build());
    35. streamObserver.onCompleted();
    36. } catch (Throwable t) {
    37. t.printStackTrace();
    38. }
    39. System.in.read();
    40. }
    41. }
  5. 运行 Provider 和 Consumer ,可以看到请求正常返回了

    onNext
    receive name:tony
    onNext
    receive name:nick
    onCompleted

常见问题

  1. protobuf 类找不到

由于 Triple 协议底层需要依赖 protobuf 协议进行传输,即使定义的服务接口不使用 protobuf 也需要在环境中引入 protobuf 的依赖。

  1. <dependency>
  2. <groupId>com.google.protobuf</groupId>
  3. <artifactId>protobuf-java</artifactId>
  4. <version>3.19.4</version>
  5. </dependency>

最后修改 December 16, 2022: Fix check (#1736) (97972c1)