指南:发布消息并订阅主题

了解如何使用一个服务向主题发送消息,并在另一个服务中订阅该主题

现在,你已了解 Dapr 发布/订阅 构建块提供的功能,请了解它如何在你的服务中工作。 下面的示例代码粗略地描述了一个使用两个服务处理订单的应用程序,每个服务都使用 Dapr sidecars:

  • 使用 Dapr 订阅消息队列中主题的结帐服务。
  • 使用 Dapr 向 RabbitMQ 发布消息的订单处理服务。

Diagram showing state management of example service

Dapr 将在符合 CloudEvents v1.0 的信封中自动包装用户有效负载,对 Content-Type 头值使用 datacontenttype 属性。 了解更多关于使用CloudEvents的消息。

下面的示例演示了您的应用程序如何发布和订阅名为orders的主题。

注意

如果你还没有,请尝试使用发布/订阅快速入门快速了解如何使用发布/订阅。

设置 发布/订阅 组件

第一步是设置 发布/订阅 组件:

When you run dapr init, Dapr creates a default Redis pubsub.yaml and runs a Redis container on your local machine, located:

  • 在Windows上,在 %UserProfile%\.dapr\components\pubsub.yaml
  • 在Linux/MacOS上,在~/.dapr/components/pubsub.yaml

使用 pubsub.yaml 组件,您可以轻松更换底层组件而无需更改应用程序代码。 在此示例中,我们使用 RabbitMQ。

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: order-pub-sub
  5. spec:
  6. type: pubsub.rabbitmq
  7. version: v1
  8. metadata:
  9. - name: host
  10. value: "amqp://localhost:5672"
  11. - name: durable
  12. value: "false"
  13. - name: deletedWhenUnused
  14. value: "false"
  15. - name: autoAck
  16. value: "false"
  17. - name: reconnectWait
  18. value: "0"
  19. - name: concurrency
  20. value: parallel
  21. scopes:
  22. - orderprocessing
  23. - checkout

您可以通过创建一个包含该文件的组件目录(在此示例中为myComponents),并在使用dapr run命令行界面时使用--resources-path标志,来使用另一个pubsub component来覆盖此文件。

  1. dapr run --app-id myapp --resources-path ./myComponents -- dotnet run
  1. dapr run --app-id myapp --resources-path ./myComponents -- mvn spring-boot:run
  1. dapr run --app-id myapp --resources-path ./myComponents -- python3 app.py
  1. dapr run --app-id myapp --resources-path ./myComponents -- go run app.go
  1. dapr run --app-id myapp --resources-path ./myComponents -- npm start

To deploy this into a Kubernetes cluster, fill in the metadata connection details of the pub/sub component in the YAML below, save as pubsub.yaml, and run kubectl apply -f pubsub.yaml.

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: order-pub-sub
  5. spec:
  6. type: pubsub.rabbitmq
  7. version: v1
  8. metadata:
  9. - name: connectionString
  10. value: "amqp://localhost:5672"
  11. - name: protocol
  12. value: amqp
  13. - name: hostname
  14. value: localhost
  15. - name: username
  16. value: username
  17. - name: password
  18. value: password
  19. - name: durable
  20. value: "false"
  21. - name: deletedWhenUnused
  22. value: "false"
  23. - name: autoAck
  24. value: "false"
  25. - name: reconnectWait
  26. value: "0"
  27. - name: concurrency
  28. value: parallel
  29. scopes:
  30. - orderprocessing
  31. - checkout

订阅主题

Dapr 允许您的应用程序有两种方法来订阅 topics:

  • 声明,其中订阅是在外部文件中定义的。
  • 编程式,订阅在用户代码中定义。

详细了解声明式和程序化订阅方法文档。 这个示例演示了一个声明式订阅。

创建一个名为subscription.yaml的文件,并粘贴以下内容:

  1. apiVersion: dapr.io/v2alpha1
  2. kind: Subscription
  3. metadata:
  4. name: order-pub-sub
  5. spec:
  6. topic: orders
  7. routes:
  8. default: /checkout
  9. pubsubname: order-pub-sub
  10. scopes:
  11. - orderprocessing
  12. - checkout

上面的示例显示了对主题 orders 的事件订阅,用于 pubsub 组件 order-pub-sub

  • route字段告诉Dapr将所有主题消息发送到应用程序中的/checkout端点。
  • scopes 字段使得此订阅适用于具有 orderprocessingcheckout 的应用程序。

subscription.yaml 放在与你的 pubsub.yaml 组件相同的目录中。 当 Dapr 启动时,它将加载组件和订阅。

下面是利用 Dapr SDK 订阅你在 subscription.yaml 中定义的主题的代码示例。

  1. //dependencies
  2. using System.Collections.Generic;
  3. using System.Threading.Tasks;
  4. using System;
  5. using Microsoft.AspNetCore.Mvc;
  6. using Dapr;
  7. using Dapr.Client;
  8. //code
  9. namespace CheckoutService.controller
  10. {
  11. [ApiController]
  12. public class CheckoutServiceController : Controller
  13. {
  14. //Subscribe to a topic
  15. [Topic("order-pub-sub", "orders")]
  16. [HttpPost("checkout")]
  17. public void getCheckout([FromBody] int orderId)
  18. {
  19. Console.WriteLine("Subscriber received : " + orderId);
  20. }
  21. }
  22. }

导航到包含上述代码的目录,然后运行以下命令启动 Dapr sidecar 和订阅程序:

  1. dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 --app-protocol https dotnet run
  1. //dependencies
  2. import io.dapr.Topic;
  3. import io.dapr.client.domain.CloudEvent;
  4. import org.springframework.web.bind.annotation.*;
  5. import com.fasterxml.jackson.databind.ObjectMapper;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import reactor.core.publisher.Mono;
  9. //code
  10. @RestController
  11. public class CheckoutServiceController {
  12. private static final Logger log = LoggerFactory.getLogger(CheckoutServiceController.class);
  13. //Subscribe to a topic
  14. @Topic(name = "orders", pubsubName = "order-pub-sub")
  15. @PostMapping(path = "/checkout")
  16. public Mono<Void> getCheckout(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
  17. return Mono.fromRunnable(() -> {
  18. try {
  19. log.info("Subscriber received: " + cloudEvent.getData());
  20. } catch (Exception e) {
  21. throw new RuntimeException(e);
  22. }
  23. });
  24. }
  25. }

导航到包含上述代码的目录,然后运行以下命令启动 Dapr sidecar 和订阅程序:

  1. dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 mvn spring-boot:run
  1. #dependencies
  2. from cloudevents.sdk.event import v1
  3. from dapr.ext.grpc import App
  4. import logging
  5. import json
  6. #code
  7. app = App()
  8. logging.basicConfig(level = logging.INFO)
  9. #Subscribe to a topic
  10. @app.subscribe(pubsub_name='order-pub-sub', topic='orders')
  11. def mytopic(event: v1.Event) -> None:
  12. data = json.loads(event.Data())
  13. logging.info('Subscriber received: ' + str(data))
  14. app.run(6002)

导航到包含上述代码的目录,然后运行以下命令启动 Dapr sidecar 和订阅程序:

  1. dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --app-protocol grpc -- python3 CheckoutService.py
  1. //dependencies
  2. import (
  3. "log"
  4. "net/http"
  5. "context"
  6. "github.com/dapr/go-sdk/service/common"
  7. daprd "github.com/dapr/go-sdk/service/http"
  8. )
  9. //code
  10. var sub = &common.Subscription{
  11. PubsubName: "order-pub-sub",
  12. Topic: "orders",
  13. Route: "/checkout",
  14. }
  15. func main() {
  16. s := daprd.NewService(":6002")
  17. //Subscribe to a topic
  18. if err := s.AddTopicEventHandler(sub, eventHandler); err != nil {
  19. log.Fatalf("error adding topic subscription: %v", err)
  20. }
  21. if err := s.Start(); err != nil && err != http.ErrServerClosed {
  22. log.Fatalf("error listenning: %v", err)
  23. }
  24. }
  25. func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
  26. log.Printf("Subscriber received: %s", e.Data)
  27. return false, nil
  28. }

导航到包含上述代码的目录,然后运行以下命令启动 Dapr sidecar 和订阅程序:

  1. dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 go run CheckoutService.go
  1. //dependencies
  2. import { DaprServer, CommunicationProtocolEnum } from '@dapr/dapr';
  3. //code
  4. const daprHost = "127.0.0.1";
  5. const serverHost = "127.0.0.1";
  6. const serverPort = "6002";
  7. start().catch((e) => {
  8. console.error(e);
  9. process.exit(1);
  10. });
  11. async function start(orderId) {
  12. const server = new DaprServer({
  13. serverHost,
  14. serverPort,
  15. communicationProtocol: CommunicationProtocolEnum.HTTP,
  16. clientOptions: {
  17. daprHost,
  18. daprPort: process.env.DAPR_HTTP_PORT,
  19. },
  20. });
  21. //Subscribe to a topic
  22. await server.pubsub.subscribe("order-pub-sub", "orders", async (orderId) => {
  23. console.log(`Subscriber received: ${JSON.stringify(orderId)}`)
  24. });
  25. await server.start();
  26. }

导航到包含上述代码的目录,然后运行以下命令启动 Dapr sidecar 和订阅程序:

  1. dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 npm start

发布消息

用名为 orderprocessing 的 app-id 启动一个 Dapr 实例:

  1. dapr run --app-id orderprocessing --dapr-http-port 3601

然后发布一条消息给 orders 主题:

  1. dapr publish --publish-app-id orderprocessing --pubsub order-pub-sub --topic orders --data '{"orderId": "100"}'
  1. curl -X POST http://localhost:3601/v1.0/publish/order-pub-sub/orders -H "Content-Type: application/json" -d '{"orderId": "100"}'
  1. Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"orderId": "100"}' -Uri 'http://localhost:3601/v1.0/publish/order-pub-sub/orders'

下面是利用 Dapr SDK 发布主题的代码示例。

  1. //dependencies
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Net.Http;
  5. using System.Net.Http.Headers;
  6. using System.Threading.Tasks;
  7. using Dapr.Client;
  8. using Microsoft.AspNetCore.Mvc;
  9. using System.Threading;
  10. //code
  11. namespace EventService
  12. {
  13. class Program
  14. {
  15. static async Task Main(string[] args)
  16. {
  17. string PUBSUB_NAME = "order-pub-sub";
  18. string TOPIC_NAME = "orders";
  19. while(true) {
  20. System.Threading.Thread.Sleep(5000);
  21. Random random = new Random();
  22. int orderId = random.Next(1,1000);
  23. CancellationTokenSource source = new CancellationTokenSource();
  24. CancellationToken cancellationToken = source.Token;
  25. using var client = new DaprClientBuilder().Build();
  26. //Using Dapr SDK to publish a topic
  27. await client.PublishEventAsync(PUBSUB_NAME, TOPIC_NAME, orderId, cancellationToken);
  28. Console.WriteLine("Published data: " + orderId);
  29. }
  30. }
  31. }
  32. }

导航到包含上述代码的目录,然后运行以下命令启动 Dapr sidecar 和发布程序:

  1. dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 --app-protocol https dotnet run
  1. //dependencies
  2. import io.dapr.client.DaprClient;
  3. import io.dapr.client.DaprClientBuilder;
  4. import io.dapr.client.domain.Metadata;
  5. import static java.util.Collections.singletonMap;
  6. import org.springframework.boot.autoconfigure.SpringBootApplication;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import java.util.Random;
  10. import java.util.concurrent.TimeUnit;
  11. //code
  12. @SpringBootApplication
  13. public class OrderProcessingServiceApplication {
  14. private static final Logger log = LoggerFactory.getLogger(OrderProcessingServiceApplication.class);
  15. public static void main(String[] args) throws InterruptedException{
  16. String MESSAGE_TTL_IN_SECONDS = "1000";
  17. String TOPIC_NAME = "orders";
  18. String PUBSUB_NAME = "order-pub-sub";
  19. while(true) {
  20. TimeUnit.MILLISECONDS.sleep(5000);
  21. Random random = new Random();
  22. int orderId = random.nextInt(1000-1) + 1;
  23. DaprClient client = new DaprClientBuilder().build();
  24. //Using Dapr SDK to publish a topic
  25. client.publishEvent(
  26. PUBSUB_NAME,
  27. TOPIC_NAME,
  28. orderId,
  29. singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS)).block();
  30. log.info("Published data:" + orderId);
  31. }
  32. }
  33. }

导航到包含上述代码的目录,然后运行以下命令启动 Dapr sidecar 和发布程序:

  1. dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 mvn spring-boot:run
  1. #dependencies
  2. import random
  3. from time import sleep
  4. import requests
  5. import logging
  6. import json
  7. from dapr.clients import DaprClient
  8. #code
  9. logging.basicConfig(level = logging.INFO)
  10. while True:
  11. sleep(random.randrange(50, 5000) / 1000)
  12. orderId = random.randint(1, 1000)
  13. PUBSUB_NAME = 'order-pub-sub'
  14. TOPIC_NAME = 'orders'
  15. with DaprClient() as client:
  16. #Using Dapr SDK to publish a topic
  17. result = client.publish_event(
  18. pubsub_name=PUBSUB_NAME,
  19. topic_name=TOPIC_NAME,
  20. data=json.dumps(orderId),
  21. data_content_type='application/json',
  22. )
  23. logging.info('Published data: ' + str(orderId))

导航到包含上述代码的目录,然后运行以下命令启动 Dapr sidecar 和发布程序:

  1. dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --app-protocol grpc python3 OrderProcessingService.py
  1. //dependencies
  2. import (
  3. "context"
  4. "log"
  5. "math/rand"
  6. "time"
  7. "strconv"
  8. dapr "github.com/dapr/go-sdk/client"
  9. )
  10. //code
  11. var (
  12. PUBSUB_NAME = "order-pub-sub"
  13. TOPIC_NAME = "orders"
  14. )
  15. func main() {
  16. for i := 0; i < 10; i++ {
  17. time.Sleep(5000)
  18. orderId := rand.Intn(1000-1) + 1
  19. client, err := dapr.NewClient()
  20. if err != nil {
  21. panic(err)
  22. }
  23. defer client.Close()
  24. ctx := context.Background()
  25. //Using Dapr SDK to publish a topic
  26. if err := client.PublishEvent(ctx, PUBSUB_NAME, TOPIC_NAME, []byte(strconv.Itoa(orderId)));
  27. err != nil {
  28. panic(err)
  29. }
  30. log.Println("Published data: " + strconv.Itoa(orderId))
  31. }
  32. }

导航到包含上述代码的目录,然后运行以下命令启动 Dapr sidecar 和发布程序:

  1. dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 go run OrderProcessingService.go
  1. //dependencies
  2. import { DaprServer, DaprClient, CommunicationProtocolEnum } from '@dapr/dapr';
  3. const daprHost = "127.0.0.1";
  4. var main = function() {
  5. for(var i=0;i<10;i++) {
  6. sleep(5000);
  7. var orderId = Math.floor(Math.random() * (1000 - 1) + 1);
  8. start(orderId).catch((e) => {
  9. console.error(e);
  10. process.exit(1);
  11. });
  12. }
  13. }
  14. async function start(orderId) {
  15. const PUBSUB_NAME = "order-pub-sub"
  16. const TOPIC_NAME = "orders"
  17. const client = new DaprClient({
  18. daprHost,
  19. daprPort: process.env.DAPR_HTTP_PORT,
  20. communicationProtocol: CommunicationProtocolEnum.HTTP
  21. });
  22. console.log("Published data:" + orderId)
  23. //Using Dapr SDK to publish a topic
  24. await client.pubsub.publish(PUBSUB_NAME, TOPIC_NAME, orderId);
  25. }
  26. function sleep(ms) {
  27. return new Promise(resolve => setTimeout(resolve, ms));
  28. }
  29. main();

导航到包含上述代码的目录,然后运行以下命令启动 Dapr sidecar 和发布程序:

  1. dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 npm start

消息确认和重试

为了告诉Dapr消息处理成功,返回一个200 OK响应。 如果 Dapr 收到超过 200 的返回状态代码,或者你的应用崩溃,Dapr 将根据 At-Least-Once 语义尝试重新传递消息。

演示视频

观看此演示视频以了解使用Dapr进行发布/订阅消息传递的更多信息。

下一步