How-To: Author and manage workflows

Note

Dapr Workflow is currently in beta. See the known limitations for 1.13.0.

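Let's create a Dapr workflow and invoke it from the console, using the provided workflow example.

This example runs in self-hosted mode and uses the default configuration from dapr init.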

Prerequisites

Set up the environment

Clone the JavaScript SDK repository and navigate into it.

    git clone https://github.com/dapr/js-sdk
    cd js-sdk

From the JavaScript SDK root directory, navigate to the Dapr Workflow example.

    cd examples/workflow/authoring

Run the following command to install the dependencies required to run this workflow example with the Dapr JavaScript SDK.

    npm install

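Run activity-sequence.ts

The activity-sequence file registers a workflow and an activity with the Dapr Workflow runtime. The workflow is a series of activities executed in order. We use DaprWorkflowClient to schedule a new workflow instance and wait for it to complete.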

    import { DaprWorkflowClient, WorkflowRuntime, WorkflowActivityContext, WorkflowContext, TWorkflow } from "@dapr/dapr";

    // Note: top-level await requires an ES module; otherwise wrap this code in an async function.
    const daprHost = "localhost";
    const daprPort = "50001";

    // Client used to schedule workflow instances and query their state
    const workflowClient = new DaprWorkflowClient({
      daprHost,
      daprPort,
    });

    // Runtime that hosts the registered workflows and activities
    const workflowRuntime = new WorkflowRuntime({
      daprHost,
      daprPort,
    });

    // Activity: returns a greeting for the given name
    const hello = async (_: WorkflowActivityContext, name: string) => {
      return `Hello ${name}!`;
    };

    // Workflow: calls the hello activity three times, one call after another
    const sequence: TWorkflow = async function* (ctx: WorkflowContext): any {
      const cities: string[] = [];

      const result1 = yield ctx.callActivity(hello, "Tokyo");
      cities.push(result1);
      const result2 = yield ctx.callActivity(hello, "Seattle");
      cities.push(result2);
      const result3 = yield ctx.callActivity(hello, "London");
      cities.push(result3);

      return cities;
    };

    workflowRuntime.registerWorkflow(sequence).registerActivity(hello);

    // Wrap the worker startup in a try-catch block to handle any errors during startup
    try {
      await workflowRuntime.start();
      console.log("Workflow runtime started successfully");
    } catch (error) {
      console.error("Error starting workflow runtime:", error);
    }

    // Schedule a new orchestration
    try {
      const id = await workflowClient.scheduleNewWorkflow(sequence);
      console.log(`Orchestration scheduled with ID: ${id}`);

      // Wait for orchestration completion
      const state = await workflowClient.waitForWorkflowCompletion(id, undefined, 30);

      console.log(`Orchestration completed! Result: ${state?.serializedOutput}`);
    } catch (error) {
      console.error("Error scheduling or waiting for orchestration:", error);
    }

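In the code above:

  • workflowRuntime.registerWorkflow(sequence) registers sequence as a workflow in the Dapr Workflow runtime.
  • await workflowRuntime.start(); builds and starts the engine within the Dapr Workflow runtime.
  • await workflowClient.scheduleNewWorkflow(sequence) schedules a new workflow instance with the Dapr Workflow runtime.
  • await workflowClient.waitForWorkflowCompletion(id, undefined, 30) waits for the workflow instance to complete, here with a timeout of 30 seconds.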
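
To make the role of yield in the workflow above more concrete, here is a minimal, self-contained sketch of how a runtime could drive a generator-based workflow one activity at a time. It is a simplified model and not the Dapr SDK's implementation: the Task type and the runToCompletion helper are hypothetical names introduced only for illustration, and the activities are synchronous to keep the example short.

```typescript
// Hypothetical, simplified model of a sequential workflow driver.
// None of these types or helpers come from the Dapr SDK.
type Activity = (input: string) => string;

interface Task {
  activity: Activity;
  input: string;
}

const hello: Activity = (name) => `Hello ${name}!`;

// Mirrors the shape of the `sequence` workflow: each `yield` hands one task
// to the driver and pauses until the driver resumes it with the result.
function* sequence(): Generator<Task, string[], string> {
  const results: string[] = [];
  for (const city of ["Tokyo", "Seattle", "London"]) {
    const result = yield { activity: hello, input: city };
    results.push(result);
  }
  return results;
}

// Runs a workflow to completion, executing exactly one activity per step.
function runToCompletion(workflow: () => Generator<Task, string[], string>): string[] {
  const iterator = workflow();
  let step = iterator.next(); // run until the first `yield`
  while (!step.done) {
    const task = step.value;
    const output = task.activity(task.input);
    step = iterator.next(output); // resume the workflow with the activity result
  }
  return step.value;
}

const greetings = runToCompletion(sequence);
console.log(greetings); // [ 'Hello Tokyo!', 'Hello Seattle!', 'Hello London!' ]
```

Because the workflow only describes which activity to call next, the driver decides when and where each activity actually runs.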

Run the following command in a terminal to start activity-sequence.ts:

    npm run start:dapr:activity-sequence

Expected output

    You're up and running! Both Dapr and your app logs will appear here.
    ...
    == APP == Orchestration scheduled with ID: dc040bea-6436-4051-9166-c9294f9d2201
    == APP == Waiting 30 seconds for instance dc040bea-6436-4051-9166-c9294f9d2201 to complete...
    == APP == Received "Orchestrator Request" work item with instance id 'dc040bea-6436-4051-9166-c9294f9d2201'
    == APP == dc040bea-6436-4051-9166-c9294f9d2201: Rebuilding local state with 0 history event...
    == APP == dc040bea-6436-4051-9166-c9294f9d2201: Processing 2 new history event(s): [ORCHESTRATORSTARTED=1, EXECUTIONSTARTED=1]
    == APP == dc040bea-6436-4051-9166-c9294f9d2201: Waiting for 1 task(s) and 0 event(s) to complete...
    == APP == dc040bea-6436-4051-9166-c9294f9d2201: Returning 1 action(s)
    == APP == Received "Activity Request" work item
    == APP == Activity hello completed with output "Hello Tokyo!" (14 chars)
    == APP == Received "Orchestrator Request" work item with instance id 'dc040bea-6436-4051-9166-c9294f9d2201'
    == APP == dc040bea-6436-4051-9166-c9294f9d2201: Rebuilding local state with 3 history event...
    == APP == dc040bea-6436-4051-9166-c9294f9d2201: Processing 2 new history event(s): [ORCHESTRATORSTARTED=1, TASKCOMPLETED=1]
    == APP == dc040bea-6436-4051-9166-c9294f9d2201: Waiting for 1 task(s) and 0 event(s) to complete...
    == APP == dc040bea-6436-4051-9166-c9294f9d2201: Returning 1 action(s)
    == APP == Received "Activity Request" work item
    == APP == Activity hello completed with output "Hello Seattle!" (16 chars)
    == APP == Received "Orchestrator Request" work item with instance id 'dc040bea-6436-4051-9166-c9294f9d2201'
    == APP == dc040bea-6436-4051-9166-c9294f9d2201: Rebuilding local state with 6 history event...
    == APP == dc040bea-6436-4051-9166-c9294f9d2201: Processing 2 new history event(s): [ORCHESTRATORSTARTED=1, TASKCOMPLETED=1]
    == APP == dc040bea-6436-4051-9166-c9294f9d2201: Waiting for 1 task(s) and 0 event(s) to complete...
    == APP == dc040bea-6436-4051-9166-c9294f9d2201: Returning 1 action(s)
    == APP == Received "Activity Request" work item
    == APP == Activity hello completed with output "Hello London!" (15 chars)
    == APP == Received "Orchestrator Request" work item with instance id 'dc040bea-6436-4051-9166-c9294f9d2201'
    == APP == dc040bea-6436-4051-9166-c9294f9d2201: Rebuilding local state with 9 history event...
    == APP == dc040bea-6436-4051-9166-c9294f9d2201: Processing 2 new history event(s): [ORCHESTRATORSTARTED=1, TASKCOMPLETED=1]
    == APP == dc040bea-6436-4051-9166-c9294f9d2201: Orchestration completed with status COMPLETED
    == APP == dc040bea-6436-4051-9166-c9294f9d2201: Returning 1 action(s)
    INFO[0006] dc040bea-6436-4051-9166-c9294f9d2201: 'sequence' completed with a COMPLETED status. app_id=activity-sequence-workflow instance=kaibocai-devbox scope=wfengine.backend type=log ver=1.12.3
    == APP == Instance dc040bea-6436-4051-9166-c9294f9d2201 completed
    == APP == Orchestration completed! Result: ["Hello Tokyo!","Hello Seattle!","Hello London!"]
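
The log shows four "Orchestrator Request" work items, each one rebuilding local state from a growing history. This matches the way durable workflow engines typically resume a workflow: the workflow function is re-run from the start, and previously recorded activity results are replayed instead of the activities being executed again. The sketch below models that idea under simplifying assumptions. The runOnePass helper and the flat history array are hypothetical, and the real history contains more event types than activity outputs alone.

```typescript
// Hypothetical, simplified model of history replay.
// The types and the `runOnePass` helper are illustrative, not part of the Dapr SDK.
type Task = { activityName: string; input: string };
type PassResult = { done: boolean; output?: string[] };

function* sequence(): Generator<Task, string[], string> {
  const results: string[] = [];
  for (const city of ["Tokyo", "Seattle", "London"]) {
    const result = yield { activityName: "hello", input: city };
    results.push(result);
  }
  return results;
}

const activities: Record<string, (input: string) => string> = {
  hello: (name) => `Hello ${name}!`,
};

// One pass models one work item: restart the generator, feed it the results
// already recorded in `history`, then either finish or run one new activity.
function runOnePass(history: string[]): PassResult {
  const iterator = sequence();
  let step = iterator.next();
  for (const recorded of history) {
    if (step.done) break;
    step = iterator.next(recorded); // replayed: the activity is not run again
  }
  if (step.done) {
    return { done: true, output: step.value };
  }
  const task = step.value;
  history.push(activities[task.activityName](task.input)); // the only new work in this pass
  return { done: false };
}

const history: string[] = [];
const replayedPerPass: number[] = [];
let outcome: PassResult = { done: false };
while (!outcome.done) {
  replayedPerPass.push(history.length); // results available for replay in this pass
  outcome = runOnePass(history);
}

console.log(replayedPerPass); // [ 0, 1, 2, 3 ]
console.log(outcome.output); // [ 'Hello Tokyo!', 'Hello Seattle!', 'Hello London!' ]
```

One practical consequence is that workflow code should be deterministic, so that every replay yields the same tasks in the same order.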

Next steps