Pub/sub

Creating a pub/sub component requires just a few basic steps.

Add pub/sub namespaces

Add using statements for the pub/sub related namespaces.

    using Dapr.PluggableComponents.Components;
    using Dapr.PluggableComponents.Components.PubSub;

Implement IPubSub

Create a class that implements the IPubSub interface.

    internal sealed class MyPubSub : IPubSub
    {
        public Task InitAsync(MetadataRequest request, CancellationToken cancellationToken = default)
        {
            // Called to initialize the component with its configured metadata...
            throw new NotImplementedException(); // Stub so the skeleton compiles
        }

        public Task PublishAsync(PubSubPublishRequest request, CancellationToken cancellationToken = default)
        {
            // Send the message to the "topic"...
            throw new NotImplementedException(); // Stub so the skeleton compiles
        }

        public Task PullMessagesAsync(PubSubPullMessagesTopic topic, MessageDeliveryHandler<string?, PubSubPullMessagesResponse> deliveryHandler, CancellationToken cancellationToken = default)
        {
            // Until canceled, check the topic for messages and deliver them to the Dapr runtime...
            throw new NotImplementedException(); // Stub so the skeleton compiles
        }
    }
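InitAsync is where a component would typically read its settings. The following is a minimal sketch of parsing a polling interval from string key/value metadata, using only the .NET base class library. The class name PubSubMetadata, the key "pollIntervalMs", and the one-second default are hypothetical names chosen for illustration; the sketch also assumes the configured metadata reaches the component as a string dictionary.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

public static class PubSubMetadata
{
    // Hypothetical metadata key; use whatever key your component documents.
    public const string PollIntervalKey = "pollIntervalMs";

    // Hypothetical fallback used when no usable value is configured.
    public static readonly TimeSpan DefaultPollInterval = TimeSpan.FromSeconds(1);

    // Returns the configured interval, or the default when the key is
    // missing, not an integer, or not positive.
    public static TimeSpan GetPollInterval(IReadOnlyDictionary<string, string> properties)
    {
        if (properties.TryGetValue(PollIntervalKey, out var raw)
            && int.TryParse(raw, NumberStyles.Integer, CultureInfo.InvariantCulture, out var milliseconds)
            && milliseconds > 0)
        {
            return TimeSpan.FromMilliseconds(milliseconds);
        }

        return DefaultPollInterval;
    }
}
```

Inside InitAsync, the result could be stored in a field and used later by PullMessagesAsync. Falling back to a default rather than throwing is a design choice of this sketch; a component that requires the setting might prefer to fail initialization instead.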

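Calls to the PullMessagesAsync() method are "long-running": the method is not expected to return until it is canceled (for example, via the cancellationToken). The "topic" from which messages should be pulled is passed via the topic argument, while delivery of messages to the Dapr runtime is performed via the deliveryHandler callback. Delivery allows the component to be notified when the application (served by the Dapr runtime) acknowledges that it has processed the message.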

    public async Task PullMessagesAsync(PubSubPullMessagesTopic topic, MessageDeliveryHandler<string?, PubSubPullMessagesResponse> deliveryHandler, CancellationToken cancellationToken = default)
    {
        TimeSpan pollInterval = // Polling interval (e.g. from initialization metadata)...

        // Poll the topic until canceled...
        while (!cancellationToken.IsCancellationRequested)
        {
            var messages = // Poll topic for messages...

            foreach (var message in messages)
            {
                // Deliver the message to the Dapr runtime...
                await deliveryHandler(
                    new PubSubPullMessagesResponse(topic.Name)
                    {
                        // Set the message content...
                    },
                    // Callback invoked when application acknowledges the message...
                    async errorMessage =>
                    {
                        // An empty message indicates the application successfully processed the message...
                        if (String.IsNullOrEmpty(errorMessage))
                        {
                            // Delete the message from the topic...
                        }
                    });
            }

            // Wait for the next poll (or cancellation)...
            await Task.Delay(pollInterval, cancellationToken);
        }
    }
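The poll, deliver, and acknowledge flow above can be tried out without the Dapr SDK. The sketch below uses only the .NET base class library and a small in-memory topic. AckHandler, DeliveryHandler, InMemoryTopic, and Poller are hypothetical stand-ins invented for this illustration; they only mirror the general shape of the SDK's delivery callback and are not part of Dapr.PluggableComponents.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for the delivery and acknowledgement callbacks.
public delegate Task AckHandler(string? errorMessage);
public delegate Task DeliveryHandler(string message, AckHandler onAcknowledgement);

// Hypothetical in-memory "topic" guarded by a lock.
public sealed class InMemoryTopic
{
    private readonly object gate = new();
    private readonly List<string> messages = new();

    public void Add(string message) { lock (gate) { messages.Add(message); } }
    public string[] Snapshot() { lock (gate) { return messages.ToArray(); } }
    public void Remove(string message) { lock (gate) { messages.Remove(message); } }
}

public static class Poller
{
    // Polls the topic until canceled. A message is deleted only after the
    // receiver acknowledges it with an empty error message.
    public static async Task PullAsync(
        InMemoryTopic topic,
        DeliveryHandler deliver,
        TimeSpan pollInterval,
        CancellationToken cancellationToken)
    {
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                foreach (var message in topic.Snapshot())
                {
                    await deliver(message, errorMessage =>
                    {
                        // An empty error means the message was processed successfully.
                        if (string.IsNullOrEmpty(errorMessage))
                        {
                            topic.Remove(message);
                        }

                        return Task.CompletedTask;
                    });
                }

                // Wait for the next poll; throws if canceled while waiting.
                await Task.Delay(pollInterval, cancellationToken);
            }
        }
        catch (OperationCanceledException)
        {
            // This sketch treats cancellation as the normal way for the loop to end.
        }
    }
}
```

Because a message stays in the topic until it is acknowledged without an error, a failed message is offered again on the next poll, which gives at-least-once delivery. Whether that is appropriate depends on the backing store; a real component might instead move failed messages to a separate queue.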

Register pub/sub component

In the main program file (for example, Program.cs), register the pub/sub component with an application service.

    using Dapr.PluggableComponents;

    var app = DaprPluggableComponentsApplication.Create();

    app.RegisterService(
        "<socket name>",
        serviceBuilder =>
        {
            serviceBuilder.RegisterPubSub<MyPubSub>();
        });

    app.Run();