Server

介绍

Dapr 服务器将允许您从 Dapr Sidecar 接收通信,并访问其面向服务器的功能,例如:订阅事件、接收输入绑定等等。

先决条件

安装和导入 Dapr 的 JS SDK

  1. 使用 npm 安装 SDK:
  1. npm i @dapr/dapr --save
  1. 导入类库:
  1. import { DaprServer, CommunicationProtocolEnum } from "@dapr/dapr";
  2. const daprHost = "127.0.0.1"; // Dapr Sidecar Host
  3. const daprPort = "3500"; // Dapr Sidecar Port of this Example Server
  4. const serverHost = "127.0.0.1"; // App Host of this Example Server
  5. const serverPort = "50051"; // App Port of this Example Server
  6. // HTTP Example
  7. const server = new DaprServer({
  8. serverHost,
  9. serverPort,
  10. communicationProtocol: CommunicationProtocolEnum.HTTP, // DaprClient to use same communication protocol as DaprServer, in case DaprClient protocol not mentioned explicitly
  11. clientOptions: {
  12. daprHost,
  13. daprPort,
  14. },
  15. });
  16. // GRPC Example
  17. const server = new DaprServer({
  18. serverHost,
  19. serverPort,
  20. communicationProtocol: CommunicationProtocolEnum.GRPC,
  21. clientOptions: {
  22. daprHost,
  23. daprPort,
  24. },
  25. });

运行

要运行这些示例,您可以使用两种不同的协议与 Dapr Sidecar 进行交互:HTTP(默认)或 gRPC。

使用 HTTP(内置 express webserver)

  1. import { DaprServer } from "@dapr/dapr";
  2. const server = new DaprServer({
  3. serverHost: appHost,
  4. serverPort: appPort,
  5. clientOptions: {
  6. daprHost,
  7. daprPort,
  8. },
  9. });
  10. // initialize subscribtions, ... before server start
  11. // the dapr sidecar relies on these
  12. await server.start();
  1. # Using dapr run
  2. dapr run --app-id example-sdk --app-port 50051 --app-protocol http -- npm run start
  3. # or, using npm script
  4. npm run start:dapr-http

ℹ️ 注意: 在这里需要填写app-port,因为这是我们的服务器需要绑定的地方。 Dapr 在完成启动之前,将检查是否有应用程序绑定到该端口。

使用 HTTP(自带的 express webserver)

除了使用内置的 Web 服务器进行 Dapr sidecar 与应用程序的通信之外,您还可以自带实例。 这在诸如构建 REST API 后端并希望直接集成 Dapr 的情况下非常有帮助。

注意,目前仅适用于express

💡 注意:当使用自定义的Web服务器时,SDK将配置服务器属性,如最大body大小,并添加新的路由到其中。 这些路由本身是唯一的,以避免与您的应用程序发生冲突,但不能保证不会发生碰撞。

  1. import { DaprServer, CommunicationProtocolEnum } from "@dapr/dapr";
  2. import express from "express";
  3. const myApp = express();
  4. myApp.get("/my-custom-endpoint", (req, res) => {
  5. res.send({ msg: "My own express app!" });
  6. });
  7. const daprServer = new DaprServer({
  8. serverHost: "127.0.0.1", // App Host
  9. serverPort: "50002", // App Port
  10. serverHttp: myApp,
  11. clientOptions: {
  12. daprHost
  13. daprPort
  14. }
  15. });
  16. // Initialize subscriptions before the server starts, the Dapr sidecar uses it.
  17. // This will also initialize the app server itself (removing the need for `app.listen` to be called).
  18. await daprServer.start();

在配置完上述内容后,您可以像平常一样调用您的自定义终端点:

  1. const res = await fetch(`http://127.0.0.1:50002/my-custom-endpoint`);
  2. const json = await res.json();

使用 gRPC

由于 HTTP 是默认设置,因此必须调整通信协议才能使用 gRPC。 您可以通过向客户端或服务器构造函数传递一个额外的参数来做到这一点。

  1. import { DaprServer, CommunicationProtocol } from "@dapr/dapr";
  2. const server = new DaprServer({
  3. serverHost: appHost,
  4. serverPort: appPort,
  5. communicationProtocol: CommunicationProtocolEnum.GRPC,
  6. clientOptions: {
  7. daprHost,
  8. daprPort,
  9. },
  10. });
  11. // initialize subscribtions, ... before server start
  12. // the dapr sidecar relies on these
  13. await server.start();
  1. # Using dapr run
  2. dapr run --app-id example-sdk --app-port 50051 --app-protocol grpc -- npm run start
  3. # or, using npm script
  4. npm run start:dapr-grpc

ℹ️ 注意: 在这里需要填写app-port,因为这是我们的服务器需要绑定的地方。 Dapr 在完成启动之前,将检查是否有应用程序绑定到该端口。

构建块

JavaScript Server SDK 允许您与所有Dapr构建块专注于Sidecar到App功能。

调用 API

侦听调用

  1. import { DaprServer, DaprInvokerCallbackContent } from "@dapr/dapr";
  2. const daprHost = "127.0.0.1"; // Dapr Sidecar Host
  3. const daprPort = "3500"; // Dapr Sidecar Port of this Example Server
  4. const serverHost = "127.0.0.1"; // App Host of this Example Server
  5. const serverPort = "50051"; // App Port of this Example Server "
  6. async function start() {
  7. const server = new DaprServer({
  8. serverHost,
  9. serverPort,
  10. clientOptions: {
  11. daprHost,
  12. daprPort,
  13. },
  14. });
  15. const callbackFunction = (data: DaprInvokerCallbackContent) => {
  16. console.log("Received body: ", data.body);
  17. console.log("Received metadata: ", data.metadata);
  18. console.log("Received query: ", data.query);
  19. console.log("Received headers: ", data.headers); // only available in HTTP
  20. };
  21. await server.invoker.listen("hello-world", callbackFunction, { method: HttpMethod.GET });
  22. // You can now invoke the service with your app id and method "hello-world"
  23. await server.start();
  24. }
  25. start().catch((e) => {
  26. console.error(e);
  27. process.exit(1);
  28. });

有关服务调用的完整指南,请访问操作方法: 调用服务

Pub/Sub API

订阅消息

订阅消息可以通过多种方式进行,以提供在您的主题上灵活接收消息的能力:

  • 通过subscribe方法直接订阅
  • 通过subscribeWithOptions方法以选项直接订阅
  • 之后通过susbcribeOnEvent方法订阅

每次事件到达时,我们将其正文作为data传递,并将头部作为headers,其中可以包含事件发布者的属性(例如来自IoT Hub的设备ID)

Dapr 在启动时需要设置订阅,但在 JS SDK 中我们也允许在之后添加事件处理程序,为您提供编程的灵活性。

下面提供了一个示例

  1. import { DaprServer } from "@dapr/dapr";
  2. const daprHost = "127.0.0.1"; // Dapr Sidecar Host
  3. const daprPort = "3500"; // Dapr Sidecar Port of this Example Server
  4. const serverHost = "127.0.0.1"; // App Host of this Example Server
  5. const serverPort = "50051"; // App Port of this Example Server "
  6. async function start() {
  7. const server = new DaprServer({
  8. serverHost,
  9. serverPort,
  10. clientOptions: {
  11. daprHost,
  12. daprPort,
  13. },
  14. });
  15. const pubSubName = "my-pubsub-name";
  16. const topic = "topic-a";
  17. // Configure Subscriber for a Topic
  18. // Method 1: Direct subscription through the `subscribe` method
  19. await server.pubsub.subscribe(pubSubName, topic, async (data: any, headers: object) =>
  20. console.log(`Received Data: ${JSON.stringify(data)} with headers: ${JSON.stringify(headers)}`),
  21. );
  22. // Method 2: Direct susbcription with options through the `subscribeWithOptions` method
  23. await server.pubsub.subscribeWithOptions(pubSubName, topic, {
  24. callback: async (data: any, headers: object) =>
  25. console.log(`Received Data: ${JSON.stringify(data)} with headers: ${JSON.stringify(headers)}`),
  26. });
  27. // Method 3: Subscription afterwards through the `susbcribeOnEvent` method
  28. // Note: we use default, since if no route was passed (empty options) we utilize "default" as the route name
  29. await server.pubsub.subscribeWithOptions("pubsub-redis", "topic-options-1", {});
  30. server.pubsub.subscribeToRoute("pubsub-redis", "topic-options-1", "default", async (data: any, headers: object) => {
  31. console.log(`Received Data: ${JSON.stringify(data)} with headers: ${JSON.stringify(headers)}`);
  32. });
  33. // Start the server
  34. await server.start();
  35. }

有关状态操作的完整列表,请访问操作方法: 发布 & 订阅

以 SUCCESS/RETRY/DROP 状态订阅

Dapr支持重试逻辑的状态码,用于指定消息处理后应该发生什么。

⚠️ JS SDK允许在同一主题上进行多个回调,我们根据RETRY > DROP > SUCCESS的优先级处理状态,并默认为SUCCESS

⚠️ 确保在您的应用程序中配置弹性来处理RETRY消息

在JS SDK中,我们通过DaprPubSubStatusEnum枚举来支持这些消息。 为了确保 Dapr 会进行重试,我们需要配置一个弹性策略。

components/resiliency.yaml

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Resiliency
  3. metadata:
  4. name: myresiliency
  5. spec:
  6. policies:
  7. retries:
  8. # Global Retry Policy for Inbound Component operations
  9. DefaultComponentInboundRetryPolicy:
  10. policy: constant
  11. duration: 500ms
  12. maxRetries: 10
  13. targets:
  14. components:
  15. messagebus:
  16. inbound:
  17. retry: DefaultComponentInboundRetryPolicy

src/index.ts

  1. import { DaprServer, DaprPubSubStatusEnum } from "@dapr/dapr";
  2. const daprHost = "127.0.0.1"; // Dapr Sidecar Host
  3. const daprPort = "3500"; // Dapr Sidecar Port of this Example Server
  4. const serverHost = "127.0.0.1"; // App Host of this Example Server
  5. const serverPort = "50051"; // App Port of this Example Server "
  6. async function start() {
  7. const server = new DaprServer({
  8. serverHost,
  9. serverPort,
  10. clientOptions: {
  11. daprHost,
  12. daprPort,
  13. },
  14. });
  15. const pubSubName = "my-pubsub-name";
  16. const topic = "topic-a";
  17. // Process a message successfully
  18. await server.pubsub.subscribe(pubSubName, topic, async (data: any, headers: object) => {
  19. return DaprPubSubStatusEnum.SUCCESS;
  20. });
  21. // Retry a message
  22. // Note: this example will keep on retrying to deliver the message
  23. // Note 2: each component can have their own retry configuration
  24. // e.g., https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-redis-pubsub/
  25. await server.pubsub.subscribe(pubSubName, topic, async (data: any, headers: object) => {
  26. return DaprPubSubStatusEnum.RETRY;
  27. });
  28. // Drop a message
  29. await server.pubsub.subscribe(pubSubName, topic, async (data: any, headers: object) => {
  30. return DaprPubSubStatusEnum.DROP;
  31. });
  32. // Start the server
  33. await server.start();
  34. }

订阅基于规则的消息

Dapr 支持根据规则将消息路由到不同的处理程序(路由)。

例如,您正在编写一个应用程序,需要根据其”type”使用Dapr处理消息,您可以将它们发送到不同的路由handlerType1handlerType2,默认路由为handlerDefault

  1. import { DaprServer } from "@dapr/dapr";
  2. const daprHost = "127.0.0.1"; // Dapr Sidecar Host
  3. const daprPort = "3500"; // Dapr Sidecar Port of this Example Server
  4. const serverHost = "127.0.0.1"; // App Host of this Example Server
  5. const serverPort = "50051"; // App Port of this Example Server "
  6. async function start() {
  7. const server = new DaprServer({
  8. serverHost,
  9. serverPort,
  10. clientOptions: {
  11. daprHost,
  12. daprPort,
  13. },
  14. });
  15. const pubSubName = "my-pubsub-name";
  16. const topic = "topic-a";
  17. // Configure Subscriber for a Topic with rule set
  18. // Note: the default route and match patterns are optional
  19. await server.pubsub.subscribe("pubsub-redis", "topic-1", {
  20. default: "/default",
  21. rules: [
  22. {
  23. match: `event.type == "my-type-1"`,
  24. path: "/type-1",
  25. },
  26. {
  27. match: `event.type == "my-type-2"`,
  28. path: "/type-2",
  29. },
  30. ],
  31. });
  32. // Add handlers for each route
  33. server.pubsub.subscribeToRoute("pubsub-redis", "topic-1", "default", async (data) => {
  34. console.log(`Handling Default`);
  35. });
  36. server.pubsub.subscribeToRoute("pubsub-redis", "topic-1", "type-1", async (data) => {
  37. console.log(`Handling Type 1`);
  38. });
  39. server.pubsub.subscribeToRoute("pubsub-redis", "topic-1", "type-2", async (data) => {
  40. console.log(`Handling Type 2`);
  41. });
  42. // Start the server
  43. await server.start();
  44. }

使用通配符订阅

支持常用的通配符 *+ (请确保验证 pubsub 组件是否支持),可以按如下方式订阅:

  1. import { DaprServer } from "@dapr/dapr";
  2. const daprHost = "127.0.0.1"; // Dapr Sidecar Host
  3. const daprPort = "3500"; // Dapr Sidecar Port of this Example Server
  4. const serverHost = "127.0.0.1"; // App Host of this Example Server
  5. const serverPort = "50051"; // App Port of this Example Server "
  6. async function start() {
  7. const server = new DaprServer({
  8. serverHost,
  9. serverPort,
  10. clientOptions: {
  11. daprHost,
  12. daprPort,
  13. },
  14. });
  15. const pubSubName = "my-pubsub-name";
  16. // * Wildcard
  17. await server.pubsub.subscribe(pubSubName, "/events/*", async (data: any, headers: object) =>
  18. console.log(`Received Data: ${JSON.stringify(data)}`),
  19. );
  20. // + Wildcard
  21. await server.pubsub.subscribe(pubSubName, "/events/+/temperature", async (data: any, headers: object) =>
  22. console.log(`Received Data: ${JSON.stringify(data)}`),
  23. );
  24. // Start the server
  25. await server.start();
  26. }

批量订阅消息

批量订阅得到支持,并可通过以下API使用:

  • 通过subscribeBulk方法进行批量订阅:maxMessagesCountmaxAwaitDurationMs是可选的;如果不提供,将使用相关组件的默认值。

在监听消息时,应用程序会批量接收来自 Dapr 的消息。 然而,就像普通的订阅一样,回调函数每次只接收一条消息,用户可以选择返回一个 DaprPubSubStatusEnum 值来确认成功、重试或丢弃消息。 默认行为是返回成功响应。

请参考此文档了解更多详情。

  1. import { DaprServer } from "@dapr/dapr";
  2. const pubSubName = "orderPubSub";
  3. const topic = "topicbulk";
  4. const daprHost = process.env.DAPR_HOST || "127.0.0.1";
  5. const daprHttpPort = process.env.DAPR_HTTP_PORT || "3502";
  6. const serverHost = process.env.SERVER_HOST || "127.0.0.1";
  7. const serverPort = process.env.APP_PORT || 5001;
  8. async function start() {
  9. const server = new DaprServer({
  10. serverHost,
  11. serverPort,
  12. clientOptions: {
  13. daprHost,
  14. daprPort: daprHttpPort,
  15. },
  16. });
  17. // Publish multiple messages to a topic with default config.
  18. await client.pubsub.subscribeBulk(pubSubName, topic, (data) =>
  19. console.log("Subscriber received: " + JSON.stringify(data)),
  20. );
  21. // Publish multiple messages to a topic with specific maxMessagesCount and maxAwaitDurationMs.
  22. await client.pubsub.subscribeBulk(
  23. pubSubName,
  24. topic,
  25. (data) => {
  26. console.log("Subscriber received: " + JSON.stringify(data));
  27. return DaprPubSubStatusEnum.SUCCESS; // If App doesn't return anything, the default is SUCCESS. App can also return RETRY or DROP based on the incoming message.
  28. },
  29. {
  30. maxMessagesCount: 100,
  31. maxAwaitDurationMs: 40,
  32. },
  33. );
  34. }

死信主题

Dapr支持死信主题。 这意味着当消息处理失败时,它会被发送到死信队列。 例如,当一个消息无法在 /my-queue 上处理时,它将被发送到 /my-queue-failed。 例如,当一个消息无法在 /my-queue 上处理时,它将被发送到 /my-queue-failed

您可以使用以下选项与subscribeWithOptions方法一起使用:

  • deadletterTopic: 指定一个死信主题名称(注意:如果没有提供,则创建一个名为deadletter的主题)
  • deadletterCallback: 触发我们的死信处理程序的方法

通过以下两种方式可以在JS SDK中实现死信支持

  • deadletterCallback作为选项传递
  • 通过使用subscribeToRoute手动订阅路由

下面提供了一个示例

  1. import { DaprServer } from "@dapr/dapr";
  2. const daprHost = "127.0.0.1"; // Dapr Sidecar Host
  3. const daprPort = "3500"; // Dapr Sidecar Port of this Example Server
  4. const serverHost = "127.0.0.1"; // App Host of this Example Server
  5. const serverPort = "50051"; // App Port of this Example Server "
  6. async function start() {
  7. const server = new DaprServer({
  8. serverHost,
  9. serverPort,
  10. clientOptions: {
  11. daprHost,
  12. daprPort,
  13. },
  14. });
  15. const pubSubName = "my-pubsub-name";
  16. // Method 1 (direct subscribing through subscribeWithOptions)
  17. await server.pubsub.subscribeWithOptions("pubsub-redis", "topic-options-5", {
  18. callback: async (data: any) => {
  19. throw new Error("Triggering Deadletter");
  20. },
  21. deadLetterCallback: async (data: any) => {
  22. console.log("Handling Deadletter message");
  23. },
  24. });
  25. // Method 2 (subscribe afterwards)
  26. await server.pubsub.subscribeWithOptions("pubsub-redis", "topic-options-1", {
  27. deadletterTopic: "my-deadletter-topic",
  28. });
  29. server.pubsub.subscribeToRoute("pubsub-redis", "topic-options-1", "default", async () => {
  30. throw new Error("Triggering Deadletter");
  31. });
  32. server.pubsub.subscribeToRoute("pubsub-redis", "topic-options-1", "my-deadletter-topic", async () => {
  33. console.log("Handling Deadletter message");
  34. });
  35. // Start server
  36. await server.start();
  37. }

Bindings API

接收输入绑定

  1. import { DaprServer } from "@dapr/dapr";
  2. const daprHost = "127.0.0.1";
  3. const daprPort = "3500";
  4. const serverHost = "127.0.0.1";
  5. const serverPort = "5051";
  6. async function start() {
  7. const server = new DaprServer({
  8. serverHost,
  9. serverPort,
  10. clientOptions: {
  11. daprHost,
  12. daprPort,
  13. },
  14. });
  15. const bindingName = "my-binding-name";
  16. const response = await server.binding.receive(bindingName, async (data: any) =>
  17. console.log(`Got Data: ${JSON.stringify(data)}`),
  18. );
  19. await server.start();
  20. }
  21. start().catch((e) => {
  22. console.error(e);
  23. process.exit(1);
  24. });

有关输出绑定的完整指南,请访问操作方法:使用绑定

配置 API

💡 配置 API 目前只能通过 gRPC 获得

获取配置值

  1. import { DaprServer } from "@dapr/dapr";
  2. const daprHost = "127.0.0.1";
  3. const daprPort = "3500";
  4. const serverHost = "127.0.0.1";
  5. const serverPort = "5051";
  6. async function start() {
  7. const client = new DaprClient({
  8. daprHost,
  9. daprPort,
  10. communicationProtocol: CommunicationProtocolEnum.GRPC,
  11. });
  12. const config = await client.configuration.get("config-redis", ["myconfigkey1", "myconfigkey2"]);
  13. }
  14. start().catch((e) => {
  15. console.error(e);
  16. process.exit(1);
  17. });

订阅关键变更

  1. import { DaprServer } from "@dapr/dapr";
  2. const daprHost = "127.0.0.1";
  3. const daprPort = "3500";
  4. const serverHost = "127.0.0.1";
  5. const serverPort = "5051";
  6. async function start() {
  7. const client = new DaprClient({
  8. daprHost,
  9. daprPort,
  10. communicationProtocol: CommunicationProtocolEnum.GRPC,
  11. });
  12. const stream = await client.configuration.subscribeWithKeys("config-redis", ["myconfigkey1", "myconfigkey2"], () => {
  13. // Received a key update
  14. });
  15. // When you are ready to stop listening, call the following
  16. await stream.close();
  17. }
  18. start().catch((e) => {
  19. console.error(e);
  20. process.exit(1);
  21. });

相关链接