Publish and subscribe to bulk messages

Learn how to use the bulk publish and subscribe APIs in Dapr.

alpha

The bulk publish and subscribe APIs are in alpha stage.

With the bulk publish and subscribe APIs, you can publish and subscribe to multiple messages in a single request. When writing applications that need to send or receive a large number of messages, using bulk operations helps you achieve high throughput by reducing the overall number of requests between the Dapr sidecar, the application, and the underlying pub/sub broker.

Publishing messages in bulk

Restrictions when publishing messages in bulk

The bulk publish API allows you to publish multiple messages to a topic in a single request. It is non-transactional, i.e., from a single bulk request, some messages can succeed and some can fail. If any of the messages fail to publish, the bulk publish operation returns a list of failed messages.

The bulk publish operation also does not guarantee any ordering of messages.

Example

  1. import io.dapr.client.DaprClientBuilder;
  2. import io.dapr.client.DaprPreviewClient;
  3. import io.dapr.client.domain.BulkPublishResponse;
  4. import io.dapr.client.domain.BulkPublishResponseFailedEntry;
  5. import java.util.ArrayList;
  6. import java.util.List;
  7. class BulkPublisher {
  8. private static final String PUBSUB_NAME = "my-pubsub-name";
  9. private static final String TOPIC_NAME = "topic-a";
  10. public void publishMessages() {
  11. try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
  12. // Create a list of messages to publish
  13. List<String> messages = new ArrayList<>();
  14. for (int i = 0; i < 10; i++) {
  15. String message = String.format("This is message #%d", i);
  16. messages.add(message);
  17. }
  18. // Publish list of messages using the bulk publish API
  19. BulkPublishResponse<String> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages).block();
  20. }
  21. }
  22. }
  1. import { DaprClient } from "@dapr/dapr";
  2. const pubSubName = "my-pubsub-name";
  3. const topic = "topic-a";
  4. async function start() {
  5. const client = new DaprClient();
  6. // Publish multiple messages to a topic.
  7. await client.pubsub.publishBulk(pubSubName, topic, ["message 1", "message 2", "message 3"]);
  8. // Publish multiple messages to a topic with explicit bulk publish messages.
  9. const bulkPublishMessages = [
  10. {
  11. entryID: "entry-1",
  12. contentType: "application/json",
  13. event: { hello: "foo message 1" },
  14. },
  15. {
  16. entryID: "entry-2",
  17. contentType: "application/cloudevents+json",
  18. event: {
  19. specversion: "1.0",
  20. source: "/some/source",
  21. type: "example",
  22. id: "1234",
  23. data: "foo message 2",
  24. datacontenttype: "text/plain"
  25. },
  26. },
  27. {
  28. entryID: "entry-3",
  29. contentType: "text/plain",
  30. event: "foo message 3",
  31. },
  32. ];
  33. await client.pubsub.publishBulk(pubSubName, topic, bulkPublishMessages);
  34. }
  35. start().catch((e) => {
  36. console.error(e);
  37. process.exit(1);
  38. });
  1. using System;
  2. using System.Collections.Generic;
  3. using Dapr.Client;
  4. const string PubsubName = "my-pubsub-name";
  5. const string TopicName = "topic-a";
  6. IReadOnlyList<object> BulkPublishData = new List<object>() {
  7. new { Id = "17", Amount = 10m },
  8. new { Id = "18", Amount = 20m },
  9. new { Id = "19", Amount = 30m }
  10. };
  11. using var client = new DaprClientBuilder().Build();
  12. var res = await client.BulkPublishEventAsync(PubsubName, TopicName, BulkPublishData);
  13. if (res == null) {
  14. throw new Exception("null response from dapr");
  15. }
  16. if (res.FailedEntries.Count > 0)
  17. {
  18. Console.WriteLine("Some events failed to be published!");
  19. foreach (var failedEntry in res.FailedEntries)
  20. {
  21. Console.WriteLine("EntryId: " + failedEntry.Entry.EntryId + " Error message: " +
  22. failedEntry.ErrorMessage);
  23. }
  24. }
  25. else
  26. {
  27. Console.WriteLine("Published all events!");
  28. }
  1. import requests
  2. import json
  3. base_url = "http://localhost:3500/v1.0-alpha1/publish/bulk/{}/{}"
  4. pubsub_name = "my-pubsub-name"
  5. topic_name = "topic-a"
  6. payload = [
  7. {
  8. "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
  9. "event": "first text message",
  10. "contentType": "text/plain"
  11. },
  12. {
  13. "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
  14. "event": {
  15. "message": "second JSON message"
  16. },
  17. "contentType": "application/json"
  18. }
  19. ]
  20. response = requests.post(base_url.format(pubsub_name, topic_name), json=payload)
  21. print(response.status_code)
  1. package main
  2. import (
  3. "fmt"
  4. "strings"
  5. "net/http"
  6. "io/ioutil"
  7. )
  8. const (
  9. pubsubName = "my-pubsub-name"
  10. topicName = "topic-a"
  11. baseUrl = "http://localhost:3500/v1.0-alpha1/publish/bulk/%s/%s"
  12. )
  13. func main() {
  14. url := fmt.Sprintf(baseUrl, pubsubName, topicName)
  15. method := "POST"
  16. payload := strings.NewReader(`[
  17. {
  18. "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
  19. "event": "first text message",
  20. "contentType": "text/plain"
  21. },
  22. {
  23. "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
  24. "event": {
  25. "message": "second JSON message"
  26. },
  27. "contentType": "application/json"
  28. }
  29. ]`)
  30. client := &http.Client {}
  31. req, _ := http.NewRequest(method, url, payload)
  32. req.Header.Add("Content-Type", "application/json")
  33. res, err := client.Do(req)
  34. // ...
  35. }
  1. curl -X POST http://localhost:3500/v1.0-alpha1/publish/bulk/my-pubsub-name/topic-a \
  2. -H 'Content-Type: application/json' \
  3. -d '[
  4. {
  5. "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
  6. "event": "first text message",
  7. "contentType": "text/plain"
  8. },
  9. {
  10. "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
  11. "event": {
  12. "message": "second JSON message"
  13. },
  14. "contentType": "application/json"
  15. },
  16. ]'
  1. Invoke-RestMethod -Method Post -ContentType 'application/json' -Uri 'http://localhost:3500/v1.0-alpha1/publish/bulk/my-pubsub-name/topic-a' `
  2. -Body '[
  3. {
  4. "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
  5. "event": "first text message",
  6. "contentType": "text/plain"
  7. },
  8. {
  9. "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
  10. "event": {
  11. "message": "second JSON message"
  12. },
  13. "contentType": "application/json"
  14. },
  15. ]'

Subscribing to messages in bulk

The bulk subscribe API allows you to receive multiple messages from a topic in a single request. As described in How to: Publish & Subscribe to topics, there are two ways to subscribe to topic(s):

  • Declaratively - subscriptions are defined in an external file.
  • Programmatically - subscriptions are defined in code.

To bulk subscribe to topic(s), add the bulkSubscribe attribute to the subscription spec, as in the following example:

  1. apiVersion: dapr.io/v2alpha1
  2. kind: Subscription
  3. metadata:
  4. name: order-pub-sub
  5. spec:
  6. topic: orders
  7. routes:
  8. default: /checkout
  9. pubsubname: order-pub-sub
  10. bulkSubscribe:
  11. enabled: true
  12. maxMessagesCount: 100
  13. maxAwaitDurationMs: 40
  14. scopes:
  15. - orderprocessing
  16. - checkout

In the example above, bulkSubscribe is optional. If you use bulkSubscribe, then:

  • enabled is mandatory and enables or disables bulk subscriptions on this topic
  • You can optionally configure the max number of messages (maxMessagesCount) delivered in a bulk message. For components that do not support bulk subscribe, the default value of maxMessagesCount is 100; that is, the default used for bulk events between the app and Dapr. Refer to How components handle publishing and subscribing to bulk messages. If a component supports bulk subscribe, the default value for this parameter can be found in that component's documentation. Refer to Supported components.
  • You can optionally provide the max duration to wait (maxAwaitDurationMs) before a bulk message is sent to the app. For components that do not support bulk subscribe, the default value of maxAwaitDurationMs is 1000; that is, the default used for bulk events between the app and Dapr. Refer to How components handle publishing and subscribing to bulk messages. If a component supports bulk subscribe, the default value for this parameter can be found in that component's documentation. Refer to Supported components.

The application receives an EntryId associated with each entry (individual message) in the bulk message. This EntryId must be used by the app to communicate the status of that particular entry. If the app fails to notify on an EntryId status, it’s considered a RETRY.

A JSON-encoded payload body with the processing status against each entry needs to be sent:

  1. {
  2. "statuses": {
  3. "entryId": "<entryId>",
  4. "status": "<status>"
  5. }
  6. }

Possible status values:

Status | Description
SUCCESS | Message is processed successfully
RETRY | Message to be retried by Dapr
DROP | Warning is logged and message is dropped

Refer to Expected HTTP Response for Bulk Subscribe for further details on the response.

Example

Refer to the following code samples for how to use bulk subscribe:

  1. import io.dapr.Topic;
  2. import io.dapr.client.domain.BulkSubscribeAppResponse;
  3. import io.dapr.client.domain.BulkSubscribeAppResponseEntry;
  4. import io.dapr.client.domain.BulkSubscribeAppResponseStatus;
  5. import io.dapr.client.domain.BulkSubscribeMessage;
  6. import io.dapr.client.domain.BulkSubscribeMessageEntry;
  7. import io.dapr.client.domain.CloudEvent;
  8. import io.dapr.springboot.annotations.BulkSubscribe;
  9. import org.springframework.web.bind.annotation.PostMapping;
  10. import org.springframework.web.bind.annotation.RequestBody;
  11. import reactor.core.publisher.Mono;
  12. class BulkSubscriber {
  13. @BulkSubscribe()
  14. // @BulkSubscribe(maxMessagesCount = 100, maxAwaitDurationMs = 40)
  15. @Topic(name = "topicbulk", pubsubName = "orderPubSub")
  16. @PostMapping(path = "/topicbulk")
  17. public Mono<BulkSubscribeAppResponse> handleBulkMessage(
  18. @RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
  19. return Mono.fromCallable(() -> {
  20. List<BulkSubscribeAppResponseEntry> entries = new ArrayList<BulkSubscribeAppResponseEntry>();
  21. for (BulkSubscribeMessageEntry<?> entry : bulkMessage.getEntries()) {
  22. try {
  23. CloudEvent<?> cloudEvent = (CloudEvent<?>) entry.getEvent();
  24. System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData());
  25. entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
  26. } catch (Exception e) {
  27. e.printStackTrace();
  28. entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
  29. }
  30. }
  31. return new BulkSubscribeAppResponse(entries);
  32. });
  33. }
  34. }
  1. import { DaprServer } from "@dapr/dapr";
  2. const pubSubName = "orderPubSub";
  3. const topic = "topicbulk";
  4. const DAPR_HOST = process.env.DAPR_HOST || "127.0.0.1";
  5. const DAPR_HTTP_PORT = process.env.DAPR_HTTP_PORT || "3502";
  6. const SERVER_HOST = process.env.SERVER_HOST || "127.0.0.1";
  7. const SERVER_PORT = process.env.APP_PORT || 5001;
  8. async function start() {
  9. const server = new DaprServer(SERVER_HOST, SERVER_PORT, DAPR_HOST, DAPR_HTTP_PORT);
  10. // Publish multiple messages to a topic with default config.
  11. await client.pubsub.bulkSubscribeWithDefaultConfig(pubSubName, topic, (data) => console.log("Subscriber received: " + JSON.stringify(data)));
  12. // Publish multiple messages to a topic with specific maxMessagesCount and maxAwaitDurationMs.
  13. await client.pubsub.bulkSubscribeWithConfig(pubSubName, topic, (data) => console.log("Subscriber received: " + JSON.stringify(data)), 100, 40);
  14. }

How components handle publishing and subscribing to bulk messages

Some pub/sub brokers support sending and receiving multiple messages in a single request. When a component supports bulk publish or subscribe operations, Dapr runtime uses them to further optimize the communication between the Dapr sidecar and the underlying pub/sub broker.

For components that do not have bulk publish or subscribe support, Dapr runtime uses the regular publish and subscribe APIs to send and receive messages one by one. This is still more efficient than directly using the regular publish or subscribe APIs, because applications can still send/receive multiple messages in a single request to/from Dapr.

Watch the demo

Watch this video for a demo on bulk pub/sub:

Last modified February 16, 2023: document all brokers support bulk pub/sub (#3179) (914e1c53)