指南:发布消息并订阅主题

了解如何使用一个服务向主题发送消息,并在另一个服务中订阅该主题

介绍

Pub/Sub 是一个分布式系统中的常见模式,它有许多服务用于解偶、异步消息传递。 使用Pub/Sub,您可以在事件消费者与事件生产者解偶的场景中启用。

Dapr 提供了一个可扩展的 Pub/Sub 系统(保证消息至少传递一次),允许开发者发布和订阅主题。 Dapr 为 Pub/Sub 提供组件,使操作者能够使用他们所喜欢的基础设施,例如 Redis Streams 和 Kafka 等。

步骤 1: 设置 Pub/Sub 组件

当发布消息时,必须指定所发送数据的内容类型。 除非指定, Dapr 将假定类型为 text/plain。 当使用 Dapr 的 HTTP API时,内容类型可以设置在 Content-Type 头中。 gRPC 客户端和 SDK 有一个专用的内容类型参数。

步骤 1: 设置 Pub/Sub 组件

然后发布一条消息给 deathStarStatus 主题:

How-To: Publish & subscribe - 图1

第一步是设置 Pub/Sub 组件:

运行 dapr init 时默认在本地机器上安装 Redis 流。

在 Linux/MacOS 上打开 ~/.dapr/components/pubsub.yam 或在 Windows 上打开%UserProfile%\.dapr\components\pubsub.yaml 组件文件以验证:

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: pubsub
  5. spec:
  6. type: pubsub.redis
  7. version: v1
  8. metadata:
  9. - name: redisHost
  10. value: localhost:6379
  11. - name: redisPassword
  12. value: ""

您可以重写这个文件以使用另一个 Redis 实例或者另一个 pubsub component ,通过创建 components 文件夹(文件夹中包含重写的文件)并在 dapr run 命令行界面使用 --components-path 标志。

要将其部署到 Kubernetes 群集中,请为你想要的 pubsub 组件 在下面的 yaml metadata 中填写链接详情,保存为 pubsub.yaml,然后运行 kubectl apply -f pubsub.yaml

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: pubsub
  5. namespace: default
  6. spec:
  7. type: pubsub.redis
  8. version: v1
  9. metadata:
  10. - name: redisHost
  11. value: localhost:6379
  12. - name: redisPassword
  13. value: ""

步骤 2: 订阅主题

Dapr 允许两种方法订阅主题:

  • Declaratively, where subscriptions are defined in an external file.
  • 编程方式,订阅在用户代码中定义

Note

声明和编程方式都支持相同的功能。 声明的方式从用户代码中移除对 Dapr 的依赖性,并允许使用现有应用程序订阅主题。 编程方法在用户代码中实现订阅。

声明式订阅

您可以使用以下自定义资源定义 (CRD) 订阅主题。 创建名为 subscription.yaml 的文件并粘贴以下内容:

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Subscription
  3. metadata:
  4. name: myevent-subscription
  5. spec:
  6. topic: deathStarStatus
  7. route: /dsstatus
  8. pubsubname: pubsub
  9. scopes:
  10. - app1
  11. - app2

上面的示例显示了 deathStarStatus主题的事件订阅,对于pubsub 组件 pubsub

  • route 告诉 Dapr 将所有主题消息发送到应用程序中的 /dsstatus 端点。
  • scopesapp1app2 启用订阅。

设置组件:

将 CRD 放在 ./components 目录中。 当 Dapr 启动时,它将加载组件和订阅。

注意:默认情况下,在 MacOS/Linux 上从 $HOME/.dapr/components 加载组件,以及 %USERPROFILE%\.dapr\components 在Windows上。

还可以通过将 Dapr CLI 指向组件路径来覆盖默认目录:

  1. dapr run --app-id myapp --components-path ./myComponents -- python3 app1.py

注意:如果你将订阅置于自定义组件路径中,请确保Pub/Sub 组件也存在。

在 Kubernetes 中,将 CRD 保存到文件中并将其应用于群集:

  1. kubectl apply -f subscription.yaml

示例

创建名为 app1.py 的文件,并粘贴以下内容:

  1. import flask
  2. from flask import request, jsonify
  3. from flask_cors import CORS
  4. import json
  5. import sys
  6. app = flask.Flask(__name__)
  7. CORS(app)
  8. @app.route('/dsstatus', methods=['POST'])
  9. def ds_subscriber():
  10. print(request.json, flush=True)
  11. return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
  12. app.run()

创建名为” app1.py 的文件,并粘贴如下内容:

  1. pip install flask
  2. pip install flask_cors

创建 app1.py 后,确保 flask 和 flask_cors 已经安装了:

  1. dapr --app-id app1 --app-port 5000 run python app1.py

设置上述订阅后,将此 javascript(Node > 4.16)下载到 app2.js 文件中:

  1. const express = require('express')
  2. const bodyParser = require('body-parser')
  3. const app = express()
  4. app.use(bodyParser.json({ type: 'application/*+json' }));
  5. const port = 3000
  6. app.post('/dsstatus', (req, res) => {
  7. console.log(req.body);
  8. res.sendStatus(200);
  9. });
  10. app.listen(port, () => console.log(`consumer app listening on port ${port}!`))

设置上述订阅后,将此 javascript(Node > 4.16)下载到 app2.js 文件中:

  1. dapr --app-id app2 --app-port 3000 run node app2.js

创建名为 app1.py 的文件,并粘贴以下内容:

  1. <?php
  2. require_once __DIR__.'/vendor/autoload.php';
  3. $app = \Dapr\App::create();
  4. $app->post('/dsstatus', function(
  5. #[\Dapr\Attributes\FromBody]
  6. \Dapr\PubSub\CloudEvent $cloudEvent,
  7. \Psr\Log\LoggerInterface $logger
  8. ) {
  9. $logger->alert('Received event: {event}', ['event' => $cloudEvent]);
  10. return ['status' => 'SUCCESS'];
  11. }
  12. );
  13. $app->start();

在创建 app1.php并安装 SDK后, 继续启动应用程序:

  1. dapr --app-id app1 --app-port 3000 run -- php -S 0.0.0.0:3000 app1.php

编程方式订阅

若要订阅主题,请使用您选择的编程语言启动 Web 服务器,并监听以下 GET 终结点: /dapr/subscribe。 Dapr 实例将在启动时调用到您的应用,并期望对的订阅主题响应 JOSN:

  • pubsubname: Dapr 用到的 pub/sub 组件
  • topic: 订阅的主题
  • route:当消息来到该主题时,Dapr 需要调用哪个终结点

示例

  1. import flask
  2. from flask import request, jsonify
  3. from flask_cors import CORS
  4. import json
  5. import sys
  6. app = flask.Flask(__name__)
  7. CORS(app)
  8. @app.route('/dapr/subscribe', methods=['GET'])
  9. def subscribe():
  10. subscriptions = [{'pubsubname': 'pubsub',
  11. 'topic': 'deathStarStatus',
  12. 'route': 'dsstatus'}]
  13. return jsonify(subscriptions)
  14. @app.route('/dsstatus', methods=['POST'])
  15. def ds_subscriber():
  16. print(request.json, flush=True)
  17. return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
  18. app.run()

创建 app1.py 后,确保 flask 和 flask_cors 已经安装了:

  1. pip install flask
  2. pip install flask_cors

然后运行:

  1. dapr --app-id app1 --app-port 5000 run python app1.py
  1. const express = require('express')
  2. const bodyParser = require('body-parser')
  3. const app = express()
  4. app.use(bodyParser.json({ type: 'application/*+json' }));
  5. const port = 3000
  6. app.get('/dapr/subscribe', (req, res) => {
  7. res.json([
  8. {
  9. pubsubname: "pubsub",
  10. topic: "deathStarStatus",
  11. route: "dsstatus"
  12. }
  13. ]);
  14. })
  15. app.post('/dsstatus', (req, res) => {
  16. console.log(req.body);
  17. res.sendStatus(200);
  18. });
  19. app.listen(port, () => console.log(`consumer app listening on port ${port}!`))

运行此应用:

  1. dapr --app-id app2 --app-port 3000 run node app2.js

更新 app1.php

  1. <?php
  2. require_once __DIR__.'/vendor/autoload.php';
  3. $app = \Dapr\App::create(configure: fn(\DI\ContainerBuilder $builder) => $builder->addDefinitions(['dapr.subscriptions' => [
  4. new \Dapr\PubSub\Subscription(pubsubname: 'pubsub', topic: 'deathStarStatus', route: '/dsstatus'),
  5. ]]));
  6. $app->post('/dsstatus', function(
  7. #[\Dapr\Attributes\FromBody]
  8. \Dapr\PubSub\CloudEvent $cloudEvent,
  9. \Psr\Log\LoggerInterface $logger
  10. ) {
  11. $logger->alert('Received event: {event}', ['event' => $cloudEvent]);
  12. return ['status' => 'SUCCESS'];
  13. }
  14. );
  15. $app->start();

运行此应用:

  1. dapr --app-id app1 --app-port 3000 run -- php -S 0.0.0.0:3000 app1.php

/dsstatus 终结点与订阅中定义的 route 相匹配,这是 Dapr 将所有主题消息发送至的位置。

步骤 3: 发布主题

要发布主题,您需要运行一个 Dapr sidecar 的实例才能使用 Pub/Sub Redis 组件。 您可以使用安装在您本地环境中的默认的Redis组件。

用名为 testpubsub 的 app-id 启动一个 Dapr 实例:

  1. dapr run --app-id testpubsub --dapr-http-port 3500

然后发布一条消息给 deathStarStatus 主题:

  1. dapr publish --publish-app-id testpubsub --pubsub pubsub --topic deathStarStatus --data '{"status": "completed"}'

然后发布一条消息给 deathStarStatus 主题:

  1. curl -X POST http://localhost:3500/v1.0/publish/pubsub/deathStarStatus -H "Content-Type: application/json" -d '{"status": "completed"}'

然后发布一条消息给 deathStarStatus 主题:

  1. Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"status": "completed"}' -Uri 'http://localhost:3500/v1.0/publish/pubsub/deathStarStatus'

Dapr 将在符合 Cloud Events v1.0 的信封中自动包装用户有效负载,对 datacontenttype 属性使用 Content-Type 头值。

步骤 4: ACK-ing 消息

为了告诉Dapr 消息处理成功,返回一个 200 OK 响应。 如果 Dapr 收到超过 200 的返回状态代码,或者你的应用崩溃,Dapr 将根据 At-Least-Once 语义尝试重新传递消息。

示例

  1. @app.route('/dsstatus', methods=['POST'])
  2. def ds_subscriber():
  3. print(request.json, flush=True)
  4. return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
  1. app.post('/dsstatus', (req, res) => {
  2. res.sendStatus(200);
  3. });

(可选) 步骤5:发布带有代码的主题

如果您喜欢使用代码发布一个主题,下面就是一个例子。

  1. const express = require('express');
  2. const path = require('path');
  3. const request = require('request');
  4. const bodyParser = require('body-parser');
  5. const app = express();
  6. app.use(bodyParser.json());
  7. const daprPort = process.env.DAPR_HTTP_PORT || 3500;
  8. const daprUrl = `http://localhost:${daprPort}/v1.0`;
  9. const port = 8080;
  10. const pubsubName = 'pubsub';
  11. app.post('/publish', (req, res) => {
  12. console.log("Publishing: ", req.body);
  13. const publishUrl = `${daprUrl}/publish/${pubsubName}/deathStarStatus`;
  14. request( { uri: publishUrl, method: 'POST', json: req.body } );
  15. res.sendStatus(200);
  16. });
  17. app.listen(process.env.PORT || port, () => console.log(`Listening on port ${port}!`));

如果您喜欢使用代码发布一个主题,下面就是一个例子。

  1. <?php
  2. require_once __DIR__.'/vendor/autoload.php';
  3. $app = \Dapr\App::create();
  4. $app->run(function(\DI\FactoryInterface $factory, \Psr\Log\LoggerInterface $logger) {
  5. $publisher = $factory->make(\Dapr\PubSub\Publish::class, ['pubsub' => 'pubsub']);
  6. $publisher->topic('deathStarStatus')->publish('operational');
  7. $logger->alert('published!');
  8. });

您可以将此保存到 app2.phpapp1 正在另一个终端中运行时,执行:

  1. dapr --app-id app2 run -- php app2.php

发送自定义 CloudEvent

Dapr 自动接收发布请求上发送的数据,并将其包装在CloudEvent 1.0 信封中。 如果您想使用自己自定义的 CloudEvent,请确保指定内容类型为 application/ cloudevents+json

Read about content types here, and about the Cloud Events message format.

下一步