发布和订阅使用Cloudevents的消息

了解 Dapr 为何使用 CloudEvents,它们如何在 Dapr 发布/订阅中工作,以及如何创建 CloudEvents。

为了启用消息路由并为每个消息提供附加上下文,Dapr使用CloudEvents 1.0规范作为其消息格式。 任何通过 Dapr 发送到 topic 的应用程序消息都会自动被包装在 CloudEvents 信封中,使用 Content-Type 头部的值 作为 datacontenttype 属性。

Dapr 使用 CloudEvents 为事件负载提供额外的上下文,从而启用以下功能:

  • 追踪
  • 用于正确反序列化事件数据的 Content-type
  • 发送方应用程序的验证

您可以选择三种方法之一通过发布/订阅来发布 CloudEvent:

  1. 发送一个发布/订阅事件,然后由 Dapr 封装在 CloudEvent 包裹中。
  2. 通过覆盖标准 CloudEvent 属性,替换 Dapr 提供的特定 CloudEvents 属性。
  3. 作为发布/订阅事件的一部分,编写自己的CloudEvent信封。

Dapr生成的CloudEvents示例

自动将发布操作发送到 Dapr 时,会自动将其包装在一个包含以下字段的 CloudEvent 包裹中:

  • id
  • source
  • specversion
  • 类型
  • traceparent
  • traceid
  • tracestate
  • topic
  • pubsubname
  • time
  • datacontenttype (可选)

以下示例演示了由Dapr生成的CloudEvent,用于发布操作到包含 orders 的主题:

  • 一种唯一标识消息的 W3C traceid
  • data 以及云事件的字段,其中数据内容被序列化为JSON
  1. {
  2. "topic": "orders",
  3. "pubsubname": "order_pub_sub",
  4. "traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
  5. "tracestate": "",
  6. "data": {
  7. "orderId": 1
  8. },
  9. "id": "5929aaac-a5e2-4ca1-859c-edfe73f11565",
  10. "specversion": "1.0",
  11. "datacontenttype": "application/json; charset=utf-8",
  12. "source": "checkout",
  13. "type": "com.dapr.event.sent",
  14. "time": "2020-09-23T06:23:21Z",
  15. "traceparent": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01"
  16. }

作为v1.0 CloudEvent的另一个示例,以下展示了将数据作为XML内容以JSON序列化的CloudEvent消息:

  1. {
  2. "topic": "orders",
  3. "pubsubname": "order_pub_sub",
  4. "traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
  5. "tracestate": "",
  6. "data" : "<note><to></to><from>user2</from><message>Order</message></note>",
  7. "id" : "id-1234-5678-9101",
  8. "specversion" : "1.0",
  9. "datacontenttype" : "text/xml",
  10. "subject" : "Test XML Message",
  11. "source" : "https://example.com/message",
  12. "type" : "xml.message",
  13. "time" : "2020-09-23T06:23:21Z"
  14. }

替换 Dapr 生成的 CloudEvents 值

Dapr 会自动生成多个 CloudEvent 属性。 您可以通过提供以下可选的元数据键/值来替换这些生成的 CloudEvent 属性:

  • cloudevent.id: 重写 id
  • cloudevent.source: 重写 source
  • cloudevent.type: 重写 type
  • cloudevent.traceid: 重写 traceid
  • cloudevent.tracestate: 覆盖 tracestate
  • cloudevent.traceparent: 覆盖 traceparent

使用这些元数据属性替换CloudEvents属性的能力适用于所有发布/订阅组件。

如何使用Dapr扩展来开发和运行Dapr应用程序

例如,要在代码中替换上面的CloudEvent示例中的sourceid值:

  1. with DaprClient() as client:
  2. order = {'orderId': i}
  3. # Publish an event/message using Dapr PubSub
  4. result = client.publish_event(
  5. pubsub_name='order_pub_sub',
  6. topic_name='orders',
  7. publish_metadata={'cloudevent.id': 'd99b228f-6c73-4e78-8c4d-3f80a043d317', 'cloudevent.source': 'payment'}
  8. )
  1. var order = new Order(i);
  2. using var client = new DaprClientBuilder().Build();
  3. // Override cloudevent metadata
  4. var metadata = new Dictionary<string,string>() {
  5. { "cloudevent.source", "payment" },
  6. { "cloudevent.id", "d99b228f-6c73-4e78-8c4d-3f80a043d317" }
  7. }
  8. // Publish an event/message using Dapr PubSub
  9. await client.PublishEventAsync("order_pub_sub", "orders", order, metadata);
  10. Console.WriteLine("Published data: " + order);
  11. await Task.Delay(TimeSpan.FromSeconds(1));

然后,JSON有效负载会反映新的sourceid值:

  1. {
  2. "topic": "orders",
  3. "pubsubname": "order_pub_sub",
  4. "traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
  5. "tracestate": "",
  6. "data": {
  7. "orderId": 1
  8. },
  9. "id": "d99b228f-6c73-4e78-8c4d-3f80a043d317",
  10. "specversion": "1.0",
  11. "datacontenttype": "application/json; charset=utf-8",
  12. "source": "payment",
  13. "type": "com.dapr.event.sent",
  14. "time": "2020-09-23T06:23:21Z",
  15. "traceparent": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01"
  16. }

重要

虽然你可以替换 traceid/traceparenttracestate,否则可能会干扰跟踪事件,并在跟踪工具中报告不一致的结果。 推荐使用Open Telemetry进行分布式跟踪。 了解更多关于分布式追踪。

发布您自己的 CloudEvent

如果您想使用自己的CloudEvent,请确保将datacontenttype指定为application/cloudevents+json

如果由应用程序创建的CloudEvent不包含CloudEvent规范中的 个最低要求字段 ,则该消息将被拒绝。 如果缺少以下字段,Dapr将添加到CloudEvent中:

  • time
  • traceid
  • traceparent
  • tracestate
  • topic
  • pubsubname
  • source
  • 类型
  • specversion

您可以向自定义 CloudEvent 添加不属于官方 CloudEvent 规范的附加字段。 Dapr 将按原样传递这些字段。

如何使用Dapr扩展来开发和运行Dapr应用程序

将一个 CloudEvent 发布到 orders 主题:

  1. dapr publish --publish-app-id orderprocessing --pubsub order-pub-sub --topic orders --data '{\"orderId\": \"100\"}'

将一个 CloudEvent 发布到 orders 主题:

  1. curl -X POST http://localhost:3601/v1.0/publish/order-pub-sub/orders -H "Content-Type: application/cloudevents+json" -d '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}'

将一个 CloudEvent 发布到 orders 主题:

  1. Invoke-RestMethod -Method Post -ContentType 'application/cloudevents+json' -Body '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}' -Uri 'http://localhost:3601/v1.0/publish/order-pub-sub/orders'

事件去重

当使用由Dapr创建的Cloud Events时,信封中包含一个id字段,应用程序可以使用该字段执行消息去重操作。 Dapr 不会自动处理去重。 Dapr 支持使用原生支持消息去重的消息代理。

下一步