发布和订阅批量消息

了解如何在 Dapr 中使用批量发布和订阅 API。

alpha

批量发布和订阅API目前处于alpha阶段。

使用批量发布和订阅API,您可以在一个请求中发布和订阅多个消息。 当编写需要发送或接收大量消息的应用程序时,使用批量操作可以通过减少 Dapr sidecar、应用程序和底层发布/订阅代理之间的请求总数来实现高吞吐量。

批量发布消息

批量发布消息时的限制

批量发布API允许您在单个请求中发布多条消息到一个主题。 它是_非事务性_,即从一个单一的批量请求中,一些消息可以成功,一些消息可以失败。 如果任何消息发布失败,批量发布操作将返回一个失败消息列表。

批量发布操作也不能保证消息的顺序。

如何使用Dapr扩展来开发和运行Dapr应用程序

  1. import io.dapr.client.DaprClientBuilder;
  2. import io.dapr.client.DaprPreviewClient;
  3. import io.dapr.client.domain.BulkPublishResponse;
  4. import io.dapr.client.domain.BulkPublishResponseFailedEntry;
  5. import java.util.ArrayList;
  6. import java.util.List;
  7. class BulkPublisher {
  8. private static final String PUBSUB_NAME = "my-pubsub-name";
  9. private static final String TOPIC_NAME = "topic-a";
  10. public void publishMessages() {
  11. try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
  12. // Create a list of messages to publish
  13. List<String> messages = new ArrayList<>();
  14. for (int i = 0; i < 10; i++) {
  15. String message = String.format("This is message #%d", i);
  16. messages.add(message);
  17. }
  18. // Publish list of messages using the bulk publish API
  19. BulkPublishResponse<String> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages).block();
  20. }
  21. }
  22. }
  1. import { DaprClient } from "@dapr/dapr";
  2. const pubSubName = "my-pubsub-name";
  3. const topic = "topic-a";
  4. async function start() {
  5. const client = new DaprClient();
  6. // Publish multiple messages to a topic.
  7. await client.pubsub.publishBulk(pubSubName, topic, ["message 1", "message 2", "message 3"]);
  8. // Publish multiple messages to a topic with explicit bulk publish messages.
  9. const bulkPublishMessages = [
  10. {
  11. entryID: "entry-1",
  12. contentType: "application/json",
  13. event: { hello: "foo message 1" },
  14. },
  15. {
  16. entryID: "entry-2",
  17. contentType: "application/cloudevents+json",
  18. event: {
  19. specversion: "1.0",
  20. source: "/some/source",
  21. type: "example",
  22. id: "1234",
  23. data: "foo message 2",
  24. datacontenttype: "text/plain"
  25. },
  26. },
  27. {
  28. entryID: "entry-3",
  29. contentType: "text/plain",
  30. event: "foo message 3",
  31. },
  32. ];
  33. await client.pubsub.publishBulk(pubSubName, topic, bulkPublishMessages);
  34. }
  35. start().catch((e) => {
  36. console.error(e);
  37. process.exit(1);
  38. });
  1. using System;
  2. using System.Collections.Generic;
  3. using Dapr.Client;
  4. const string PubsubName = "my-pubsub-name";
  5. const string TopicName = "topic-a";
  6. IReadOnlyList<object> BulkPublishData = new List<object>() {
  7. new { Id = "17", Amount = 10m },
  8. new { Id = "18", Amount = 20m },
  9. new { Id = "19", Amount = 30m }
  10. };
  11. using var client = new DaprClientBuilder().Build();
  12. var res = await client.BulkPublishEventAsync(PubsubName, TopicName, BulkPublishData);
  13. if (res == null) {
  14. throw new Exception("null response from dapr");
  15. }
  16. if (res.FailedEntries.Count > 0)
  17. {
  18. Console.WriteLine("Some events failed to be published!");
  19. foreach (var failedEntry in res.FailedEntries)
  20. {
  21. Console.WriteLine("EntryId: " + failedEntry.Entry.EntryId + " Error message: " +
  22. failedEntry.ErrorMessage);
  23. }
  24. }
  25. else
  26. {
  27. Console.WriteLine("Published all events!");
  28. }
  1. import requests
  2. import json
  3. base_url = "http://localhost:3500/v1.0-alpha1/publish/bulk/{}/{}"
  4. pubsub_name = "my-pubsub-name"
  5. topic_name = "topic-a"
  6. payload = [
  7. {
  8. "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
  9. "event": "first text message",
  10. "contentType": "text/plain"
  11. },
  12. {
  13. "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
  14. "event": {
  15. "message": "second JSON message"
  16. },
  17. "contentType": "application/json"
  18. }
  19. ]
  20. response = requests.post(base_url.format(pubsub_name, topic_name), json=payload)
  21. print(response.status_code)
  1. package main
  2. import (
  3. "fmt"
  4. "strings"
  5. "net/http"
  6. "io/ioutil"
  7. )
  8. const (
  9. pubsubName = "my-pubsub-name"
  10. topicName = "topic-a"
  11. baseUrl = "http://localhost:3500/v1.0-alpha1/publish/bulk/%s/%s"
  12. )
  13. func main() {
  14. url := fmt.Sprintf(baseUrl, pubsubName, topicName)
  15. method := "POST"
  16. payload := strings.NewReader(`[
  17. {
  18. "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
  19. "event": "first text message",
  20. "contentType": "text/plain"
  21. },
  22. {
  23. "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
  24. "event": {
  25. "message": "second JSON message"
  26. },
  27. "contentType": "application/json"
  28. }
  29. ]`)
  30. client := &http.Client {}
  31. req, _ := http.NewRequest(method, url, payload)
  32. req.Header.Add("Content-Type", "application/json")
  33. res, err := client.Do(req)
  34. // ...
  35. }
  1. curl -X POST http://localhost:3500/v1.0-alpha1/publish/bulk/my-pubsub-name/topic-a \
  2. -H 'Content-Type: application/json' \
  3. -d '[
  4. {
  5. "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
  6. "event": "first text message",
  7. "contentType": "text/plain"
  8. },
  9. {
  10. "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
  11. "event": {
  12. "message": "second JSON message"
  13. },
  14. "contentType": "application/json"
  15. },
  16. ]'
  1. Invoke-RestMethod -Method Post -ContentType 'application/json' -Uri 'http://localhost:3500/v1.0-alpha1/publish/bulk/my-pubsub-name/topic-a' `
  2. -Body '[
  3. {
  4. "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
  5. "event": "first text message",
  6. "contentType": "text/plain"
  7. },
  8. {
  9. "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
  10. "event": {
  11. "message": "second JSON message"
  12. },
  13. "contentType": "application/json"
  14. },
  15. ]'

批量订阅消息

批量订阅API允许您在单个请求中从一个主题订阅多条消息。 正如我们从如何:发布和订阅主题中所了解的那样,有两种订阅主题的方式:

  • 声明 - 订阅是在外部文件中定义的。
  • 编程式 - 订阅在代码中定义。

要批量订阅主题,我们只需要使用bulkSubscribe spec属性,类似以下方式:

  1. apiVersion: dapr.io/v2alpha1
  2. kind: Subscription
  3. metadata:
  4. name: order-pub-sub
  5. spec:
  6. topic: orders
  7. routes:
  8. default: /checkout
  9. pubsubname: order-pub-sub
  10. bulkSubscribe:
  11. enabled: true
  12. maxMessagesCount: 100
  13. maxAwaitDurationMs: 40
  14. scopes:
  15. - orderprocessing
  16. - checkout

在上面的示例中,bulkSubscribe是_可选的_。 如果您使用 bulkSubscribe,那么:

  • enabled 是必填的,并且在此主题上启用或禁用批量订阅
  • 您可以选择配置每个批量消息中传递的最大消息数(maxMessagesCount)。 不支持批量订阅的组件的 maxMessagesCount 的默认值为100,即默认情况下 App 和 Dapr 之间的批量事件。 请参考组件如何处理发布和订阅批量消息。 如果组件支持批量订阅,则可以在该组件文档中找到此参数的默认值。
  • 您可以选择提供最长等待时间(maxAwaitDurationMs)在批量消息发送到应用程序之前。 不支持批量订阅的组件的 maxAwaitDurationMs 的默认值为1000,即默认情况下 App 和 Dapr 之间的批量事件。 请参考组件如何处理发布和订阅批量消息。 如果组件支持批量订阅,则可以在该组件文档中找到此参数的默认值。

应用程序收到一个 EntryId 与批量消息中的每个条目(单个消息)相关联。 这 EntryId 必须由应用程序用于传达该特定条目的状态。 如果应用程序在 EntryId 的状态上未能通知,那么它将被视为 RETRY

需要发送一个带有每个条目处理状态的JSON编码的负载主体:

  1. {
  2. "statuses":
  3. [
  4. {
  5. "entryId": "<entryId1>",
  6. "status": "<status>"
  7. },
  8. {
  9. "entryId": "<entryId2>",
  10. "status": "<status>"
  11. }
  12. ]
  13. }

可能的状态值:

状态说明
SUCCESS消息已成功处理
RETRY将由 Dapr 重试的消息
DROP警告被记录下来,信息被删除

请参考预期的批量订阅HTTP响应以获取有关响应的更多见解。

如何使用Dapr扩展来开发和运行Dapr应用程序

有关如何使用批量订阅,请参阅以下代码示例:

  1. import io.dapr.Topic;
  2. import io.dapr.client.domain.BulkSubscribeAppResponse;
  3. import io.dapr.client.domain.BulkSubscribeAppResponseEntry;
  4. import io.dapr.client.domain.BulkSubscribeAppResponseStatus;
  5. import io.dapr.client.domain.BulkSubscribeMessage;
  6. import io.dapr.client.domain.BulkSubscribeMessageEntry;
  7. import io.dapr.client.domain.CloudEvent;
  8. import io.dapr.springboot.annotations.BulkSubscribe;
  9. import org.springframework.web.bind.annotation.PostMapping;
  10. import org.springframework.web.bind.annotation.RequestBody;
  11. import reactor.core.publisher.Mono;
  12. class BulkSubscriber {
  13. @BulkSubscribe()
  14. // @BulkSubscribe(maxMessagesCount = 100, maxAwaitDurationMs = 40)
  15. @Topic(name = "topicbulk", pubsubName = "orderPubSub")
  16. @PostMapping(path = "/topicbulk")
  17. public Mono<BulkSubscribeAppResponse> handleBulkMessage(
  18. @RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
  19. return Mono.fromCallable(() -> {
  20. List<BulkSubscribeAppResponseEntry> entries = new ArrayList<BulkSubscribeAppResponseEntry>();
  21. for (BulkSubscribeMessageEntry<?> entry : bulkMessage.getEntries()) {
  22. try {
  23. CloudEvent<?> cloudEvent = (CloudEvent<?>) entry.getEvent();
  24. System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData());
  25. entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
  26. } catch (Exception e) {
  27. e.printStackTrace();
  28. entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
  29. }
  30. }
  31. return new BulkSubscribeAppResponse(entries);
  32. });
  33. }
  34. }
  1. import { DaprServer } from "@dapr/dapr";
  2. const pubSubName = "orderPubSub";
  3. const topic = "topicbulk";
  4. const daprHost = process.env.DAPR_HOST || "127.0.0.1";
  5. const daprPort = process.env.DAPR_HTTP_PORT || "3502";
  6. const serverHost = process.env.SERVER_HOST || "127.0.0.1";
  7. const serverPort = process.env.APP_PORT || 5001;
  8. async function start() {
  9. const server = new DaprServer({
  10. serverHost,
  11. serverPort,
  12. clientOptions: {
  13. daprHost,
  14. daprPort,
  15. },
  16. });
  17. // Publish multiple messages to a topic with default config.
  18. await client.pubsub.bulkSubscribeWithDefaultConfig(pubSubName, topic, (data) => console.log("Subscriber received: " + JSON.stringify(data)));
  19. // Publish multiple messages to a topic with specific maxMessagesCount and maxAwaitDurationMs.
  20. await client.pubsub.bulkSubscribeWithConfig(pubSubName, topic, (data) => console.log("Subscriber received: " + JSON.stringify(data)), 100, 40);
  21. }
  1. using Microsoft.AspNetCore.Mvc;
  2. using Dapr.AspNetCore;
  3. using Dapr;
  4. namespace DemoApp.Controllers;
  5. [ApiController]
  6. [Route("[controller]")]
  7. public class BulkMessageController : ControllerBase
  8. {
  9. private readonly ILogger<BulkMessageController> logger;
  10. public BulkMessageController(ILogger<BulkMessageController> logger)
  11. {
  12. this.logger = logger;
  13. }
  14. [BulkSubscribe("messages", 10, 10)]
  15. [Topic("pubsub", "messages")]
  16. public ActionResult<BulkSubscribeAppResponse> HandleBulkMessages([FromBody] BulkSubscribeMessage<BulkMessageModel<BulkMessageModel>> bulkMessages)
  17. {
  18. List<BulkSubscribeAppResponseEntry> responseEntries = new List<BulkSubscribeAppResponseEntry>();
  19. logger.LogInformation($"Received {bulkMessages.Entries.Count()} messages");
  20. foreach (var message in bulkMessages.Entries)
  21. {
  22. try
  23. {
  24. logger.LogInformation($"Received a message with data '{message.Event.Data.MessageData}'");
  25. responseEntries.Add(new BulkSubscribeAppResponseEntry(message.EntryId, BulkSubscribeAppResponseStatus.SUCCESS));
  26. }
  27. catch (Exception e)
  28. {
  29. logger.LogError(e.Message);
  30. responseEntries.Add(new BulkSubscribeAppResponseEntry(message.EntryId, BulkSubscribeAppResponseStatus.RETRY));
  31. }
  32. }
  33. return new BulkSubscribeAppResponse(responseEntries);
  34. }
  35. public class BulkMessageModel
  36. {
  37. public string MessageData { get; set; }
  38. }
  39. }

组件如何处理发布和订阅批量消息

对于事件发布/订阅,涉及到两种网络传输方式。

  1. 从/到 App 到/从 Dapr
  2. 从/到 Dapr 到/从 Pubsub Broker

这些是可以优化的机会。 当进行优化时,会进行批量请求,从而减少总的调用次数,提高吞吐量,提供更好的延迟。

启用批量发布和/或批量订阅后,应用程序与Dapr sidecar(上述第1点)之间的通信将得到优化,以便所有组件

从 Dapr sidecar 到 pub/sub broker 的优化取决于多个因素,例如:

  • Broker必须本质上支持批量发布/订阅
  • 必须更新Dapr 组件以支持 broker 提供的批量 API

目前,以下组件已更新以支持此级别的优化:

Component批量发布批量订阅
KafkaYesYes
Azure ServicebusYesYes
Azure EventhubsYesYes

例子

观看以下关于批量发布/订阅的演示和介绍。

KubeCon Europe 2023 presentation

Dapr社区会议#77演示

相关链接