工作流模式

编写不同类型的工作流模式

Dapr 工作流简化了微服务架构中复杂的有状态协调要求。 以下部分介绍了可以从 Dapr 工作流中受益的几种应用程序模式。

任务链

在任务链模式中,工作流中的多个步骤连续运行,一个步骤的输出可以作为下一步的输入传递。 任务链工作流通常涉及创建需要对某些数据执行的操作序列,例如筛选、转换和缩减。

Diagram showing how the task chaining workflow pattern works

在某些情况下,可能需要跨多个微服务编排工作流的步骤。 为了提高可靠性和可伸缩性,还可以使用队列来触发各个步骤。

虽然模式很简单,但实现中隐藏着许多复杂性。 例如:

  • 如果其中一个微服务长时间不可用,会发生什么情况?
  • 失败的步骤可以自动重试吗?
  • 如果不行,如何在适用的情况下为回滚先前完成的步骤提供便利?
  • 撇开实施细节不谈,是否有办法将工作流程可视化,以便其他工程师了解工作流程的作用和工作方式?

Dapr 工作流解决了这些复杂问题,它允许您在自己选择的编程语言中以简单函数的形式简洁地实现任务链模式,如下例所示。

  1. import dapr.ext.workflow as wf
  2. def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
  3. try:
  4. result1 = yield ctx.call_activity(step1, input=wf_input)
  5. result2 = yield ctx.call_activity(step2, input=result1)
  6. result3 = yield ctx.call_activity(step3, input=result2)
  7. except Exception as e:
  8. yield ctx.call_activity(error_handler, input=str(e))
  9. raise
  10. return [result1, result2, result3]
  11. def step1(ctx, activity_input):
  12. print(f'Step 1: Received input: {activity_input}.')
  13. # Do some work
  14. return activity_input + 1
  15. def step2(ctx, activity_input):
  16. print(f'Step 2: Received input: {activity_input}.')
  17. # Do some work
  18. return activity_input * 2
  19. def step3(ctx, activity_input):
  20. print(f'Step 3: Received input: {activity_input}.')
  21. # Do some work
  22. return activity_input ** 2
  23. def error_handler(ctx, error):
  24. print(f'Executing error handler: {error}.')
  25. # Do some compensating work

注意 工作流重试策略将在 Python SDK 的未来版本中提供。

  1. import { DaprWorkflowClient, WorkflowActivityContext, WorkflowContext, WorkflowRuntime, TWorkflow } from "@dapr/dapr";
  2. async function start() {
  3. // Update the gRPC client and worker to use a local address and port
  4. const daprHost = "localhost";
  5. const daprPort = "50001";
  6. const workflowClient = new DaprWorkflowClient({
  7. daprHost,
  8. daprPort,
  9. });
  10. const workflowRuntime = new WorkflowRuntime({
  11. daprHost,
  12. daprPort,
  13. });
  14. const hello = async (_: WorkflowActivityContext, name: string) => {
  15. return `Hello ${name}!`;
  16. };
  17. const sequence: TWorkflow = async function* (ctx: WorkflowContext): any {
  18. const cities: string[] = [];
  19. const result1 = yield ctx.callActivity(hello, "Tokyo");
  20. cities.push(result1);
  21. const result2 = yield ctx.callActivity(hello, "Seattle");
  22. cities.push(result2);
  23. const result3 = yield ctx.callActivity(hello, "London");
  24. cities.push(result3);
  25. return cities;
  26. };
  27. workflowRuntime.registerWorkflow(sequence).registerActivity(hello);
  28. // Wrap the worker startup in a try-catch block to handle any errors during startup
  29. try {
  30. await workflowRuntime.start();
  31. console.log("Workflow runtime started successfully");
  32. } catch (error) {
  33. console.error("Error starting workflow runtime:", error);
  34. }
  35. // Schedule a new orchestration
  36. try {
  37. const id = await workflowClient.scheduleNewWorkflow(sequence);
  38. console.log(`Orchestration scheduled with ID: ${id}`);
  39. // Wait for orchestration completion
  40. const state = await workflowClient.waitForWorkflowCompletion(id, undefined, 30);
  41. console.log(`Orchestration completed! Result: ${state?.serializedOutput}`);
  42. } catch (error) {
  43. console.error("Error scheduling or waiting for orchestration:", error);
  44. }
  45. await workflowRuntime.stop();
  46. await workflowClient.stop();
  47. // stop the dapr side car
  48. process.exit(0);
  49. }
  50. start().catch((e) => {
  51. console.error(e);
  52. process.exit(1);
  53. });
  1. // Exponential backoff retry policy that survives long outages
  2. var retryOptions = new WorkflowTaskOptions
  3. {
  4. RetryPolicy = new WorkflowRetryPolicy(
  5. firstRetryInterval: TimeSpan.FromMinutes(1),
  6. backoffCoefficient: 2.0,
  7. maxRetryInterval: TimeSpan.FromHours(1),
  8. maxNumberOfAttempts: 10),
  9. };
  10. try
  11. {
  12. var result1 = await context.CallActivityAsync<string>("Step1", wfInput, retryOptions);
  13. var result2 = await context.CallActivityAsync<byte[]>("Step2", result1, retryOptions);
  14. var result3 = await context.CallActivityAsync<long[]>("Step3", result2, retryOptions);
  15. return string.Join(", ", result3);
  16. }
  17. catch (TaskFailedException) // Task failures are surfaced as TaskFailedException
  18. {
  19. // Retries expired - apply custom compensation logic
  20. await context.CallActivityAsync<long[]>("MyCompensation", options: retryOptions);
  21. throw;
  22. }

注意 在上面的示例中, "Step1", "Step2", "Step3", 和 "MyCompensation" 代表工作流活动,它们是代码中实际执行工作流步骤的函数。 为简洁起见,此示例中省略了这些活动实现。

  1. public class ChainWorkflow extends Workflow {
  2. @Override
  3. public WorkflowStub create() {
  4. return ctx -> {
  5. StringBuilder sb = new StringBuilder();
  6. String wfInput = ctx.getInput(String.class);
  7. String result1 = ctx.callActivity("Step1", wfInput, String.class).await();
  8. String result2 = ctx.callActivity("Step2", result1, String.class).await();
  9. String result3 = ctx.callActivity("Step3", result2, String.class).await();
  10. String result = sb.append(result1).append(',').append(result2).append(',').append(result3).toString();
  11. ctx.complete(result);
  12. };
  13. }
  14. }
  15. class Step1 implements WorkflowActivity {
  16. @Override
  17. public Object run(WorkflowActivityContext ctx) {
  18. Logger logger = LoggerFactory.getLogger(Step1.class);
  19. logger.info("Starting Activity: " + ctx.getName());
  20. // Do some work
  21. return null;
  22. }
  23. }
  24. class Step2 implements WorkflowActivity {
  25. @Override
  26. public Object run(WorkflowActivityContext ctx) {
  27. Logger logger = LoggerFactory.getLogger(Step2.class);
  28. logger.info("Starting Activity: " + ctx.getName());
  29. // Do some work
  30. return null;
  31. }
  32. }
  33. class Step3 implements WorkflowActivity {
  34. @Override
  35. public Object run(WorkflowActivityContext ctx) {
  36. Logger logger = LoggerFactory.getLogger(Step3.class);
  37. logger.info("Starting Activity: " + ctx.getName());
  38. // Do some work
  39. return null;
  40. }
  41. }
  1. func TaskChainWorkflow(ctx *workflow.WorkflowContext) (any, error) {
  2. var input int
  3. if err := ctx.GetInput(&input); err != nil {
  4. return "", err
  5. }
  6. var result1 int
  7. if err := ctx.CallActivity(Step1, workflow.ActivityInput(input)).Await(&result1); err != nil {
  8. return nil, err
  9. }
  10. var result2 int
  11. if err := ctx.CallActivity(Step2, workflow.ActivityInput(result1)).Await(&result2); err != nil {
  12. return nil, err
  13. }
  14. var result3 int
  15. if err := ctx.CallActivity(Step3, workflow.ActivityInput(result2)).Await(&result3); err != nil {
  16. return nil, err
  17. }
  18. return []int{result1, result2, result3}, nil
  19. }
  20. func Step1(ctx workflow.ActivityContext) (any, error) {
  21. var input int
  22. if err := ctx.GetInput(&input); err != nil {
  23. return "", err
  24. }
  25. fmt.Printf("Step 1: Received input: %d", input)
  26. return input + 1, nil
  27. }
  28. func Step2(ctx workflow.ActivityContext) (any, error) {
  29. var input int
  30. if err := ctx.GetInput(&input); err != nil {
  31. return "", err
  32. }
  33. fmt.Printf("Step 2: Received input: %d", input)
  34. return input * 2, nil
  35. }
  36. func Step3(ctx workflow.ActivityContext) (any, error) {
  37. var input int
  38. if err := ctx.GetInput(&input); err != nil {
  39. return "", err
  40. }
  41. fmt.Printf("Step 3: Received input: %d", input)
  42. return int(math.Pow(float64(input), 2)), nil
  43. }

如您所见,工作流以您选择的编程语言表示为一系列简单的语句。 这使组织中的任何工程师都可以快速了解端到端流程,而不必了解端到端系统架构。

幕后是 Dapr 工作流运行时:

  • 负责执行工作流程,确保流程顺利完成。
  • 自动保存进度。
  • 如果工作流程本身因故失败,则自动从最后完成的步骤恢复工作流程。
  • 可在目标编程语言中自然表达错误处理,让您轻松实现补偿逻辑。
  • 提供内置重试配置原语,简化了为工作流中各个步骤配置复杂重试策略的过程。

Fan-out/fan-in

在扇出/扇入(fan-out/fan-in)设计模式中,您可能会在多个 Worker 上同时执行多个任务,等待它们完成,然后对结果进行聚合。

Diagram showing how the fan-out/fan-in workflow pattern works

除了在之前的模式中提到的挑战之外,在手动实施扇出/扇入模式时还需要考虑几个重要问题:

  • 如何控制并行度?
  • 如何知道何时触发后续聚合步骤?
  • 如果并行步骤的数量是动态的,该怎么办?

Dapr 工作流提供了一种将扇出/扇入模式表达为简单函数的方法,如下例所示:

  1. import time
  2. from typing import List
  3. import dapr.ext.workflow as wf
  4. def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
  5. # get a batch of N work items to process in parallel
  6. work_batch = yield ctx.call_activity(get_work_batch, input=wf_input)
  7. # schedule N parallel tasks to process the work items and wait for all to complete
  8. parallel_tasks = [ctx.call_activity(process_work_item, input=work_item) for work_item in work_batch]
  9. outputs = yield wf.when_all(parallel_tasks)
  10. # aggregate the results and send them to another activity
  11. total = sum(outputs)
  12. yield ctx.call_activity(process_results, input=total)
  13. def get_work_batch(ctx, batch_size: int) -> List[int]:
  14. return [i + 1 for i in range(batch_size)]
  15. def process_work_item(ctx, work_item: int) -> int:
  16. print(f'Processing work item: {work_item}.')
  17. time.sleep(5)
  18. result = work_item * 2
  19. print(f'Work item {work_item} processed. Result: {result}.')
  20. return result
  21. def process_results(ctx, final_result: int):
  22. print(f'Final result: {final_result}.')
  1. import {
  2. Task,
  3. DaprWorkflowClient,
  4. WorkflowActivityContext,
  5. WorkflowContext,
  6. WorkflowRuntime,
  7. TWorkflow,
  8. } from "@dapr/dapr";
  9. // Wrap the entire code in an immediately-invoked async function
  10. async function start() {
  11. // Update the gRPC client and worker to use a local address and port
  12. const daprHost = "localhost";
  13. const daprPort = "50001";
  14. const workflowClient = new DaprWorkflowClient({
  15. daprHost,
  16. daprPort,
  17. });
  18. const workflowRuntime = new WorkflowRuntime({
  19. daprHost,
  20. daprPort,
  21. });
  22. function getRandomInt(min: number, max: number): number {
  23. return Math.floor(Math.random() * (max - min + 1)) + min;
  24. }
  25. async function getWorkItemsActivity(_: WorkflowActivityContext): Promise<string[]> {
  26. const count: number = getRandomInt(2, 10);
  27. console.log(`generating ${count} work items...`);
  28. const workItems: string[] = Array.from({ length: count }, (_, i) => `work item ${i}`);
  29. return workItems;
  30. }
  31. function sleep(ms: number): Promise<void> {
  32. return new Promise((resolve) => setTimeout(resolve, ms));
  33. }
  34. async function processWorkItemActivity(context: WorkflowActivityContext, item: string): Promise<number> {
  35. console.log(`processing work item: ${item}`);
  36. // Simulate some work that takes a variable amount of time
  37. const sleepTime = Math.random() * 5000;
  38. await sleep(sleepTime);
  39. // Return a result for the given work item, which is also a random number in this case
  40. // For more information about random numbers in workflow please check
  41. // https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-code-constraints?tabs=csharp#random-numbers
  42. return Math.floor(Math.random() * 11);
  43. }
  44. const workflow: TWorkflow = async function* (ctx: WorkflowContext): any {
  45. const tasks: Task<any>[] = [];
  46. const workItems = yield ctx.callActivity(getWorkItemsActivity);
  47. for (const workItem of workItems) {
  48. tasks.push(ctx.callActivity(processWorkItemActivity, workItem));
  49. }
  50. const results: number[] = yield ctx.whenAll(tasks);
  51. const sum: number = results.reduce((accumulator, currentValue) => accumulator + currentValue, 0);
  52. return sum;
  53. };
  54. workflowRuntime.registerWorkflow(workflow);
  55. workflowRuntime.registerActivity(getWorkItemsActivity);
  56. workflowRuntime.registerActivity(processWorkItemActivity);
  57. // Wrap the worker startup in a try-catch block to handle any errors during startup
  58. try {
  59. await workflowRuntime.start();
  60. console.log("Worker started successfully");
  61. } catch (error) {
  62. console.error("Error starting worker:", error);
  63. }
  64. // Schedule a new orchestration
  65. try {
  66. const id = await workflowClient.scheduleNewWorkflow(workflow);
  67. console.log(`Orchestration scheduled with ID: ${id}`);
  68. // Wait for orchestration completion
  69. const state = await workflowClient.waitForWorkflowCompletion(id, undefined, 30);
  70. console.log(`Orchestration completed! Result: ${state?.serializedOutput}`);
  71. } catch (error) {
  72. console.error("Error scheduling or waiting for orchestration:", error);
  73. }
  74. // stop worker and client
  75. await workflowRuntime.stop();
  76. await workflowClient.stop();
  77. // stop the dapr side car
  78. process.exit(0);
  79. }
  80. start().catch((e) => {
  81. console.error(e);
  82. process.exit(1);
  83. });
  1. // Get a list of N work items to process in parallel.
  2. object[] workBatch = await context.CallActivityAsync<object[]>("GetWorkBatch", null);
  3. // Schedule the parallel tasks, but don't wait for them to complete yet.
  4. var parallelTasks = new List<Task<int>>(workBatch.Length);
  5. for (int i = 0; i < workBatch.Length; i++)
  6. {
  7. Task<int> task = context.CallActivityAsync<int>("ProcessWorkItem", workBatch[i]);
  8. parallelTasks.Add(task);
  9. }
  10. // Everything is scheduled. Wait here until all parallel tasks have completed.
  11. await Task.WhenAll(parallelTasks);
  12. // Aggregate all N outputs and publish the result.
  13. int sum = parallelTasks.Sum(t => t.Result);
  14. await context.CallActivityAsync("PostResults", sum);
  1. public class FaninoutWorkflow extends Workflow {
  2. @Override
  3. public WorkflowStub create() {
  4. return ctx -> {
  5. // Get a list of N work items to process in parallel.
  6. Object[] workBatch = ctx.callActivity("GetWorkBatch", Object[].class).await();
  7. // Schedule the parallel tasks, but don't wait for them to complete yet.
  8. List<Task<Integer>> tasks = Arrays.stream(workBatch)
  9. .map(workItem -> ctx.callActivity("ProcessWorkItem", workItem, int.class))
  10. .collect(Collectors.toList());
  11. // Everything is scheduled. Wait here until all parallel tasks have completed.
  12. List<Integer> results = ctx.allOf(tasks).await();
  13. // Aggregate all N outputs and publish the result.
  14. int sum = results.stream().mapToInt(Integer::intValue).sum();
  15. ctx.complete(sum);
  16. };
  17. }
  18. }
  1. func BatchProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) {
  2. var input int
  3. if err := ctx.GetInput(&input); err != nil {
  4. return 0, err
  5. }
  6. var workBatch []int
  7. if err := ctx.CallActivity(GetWorkBatch, workflow.ActivityInput(input)).Await(&workBatch); err != nil {
  8. return 0, err
  9. }
  10. parallelTasks := workflow.NewTaskSlice(len(workBatch))
  11. for i, workItem := range workBatch {
  12. parallelTasks[i] = ctx.CallActivity(ProcessWorkItem, workflow.ActivityInput(workItem))
  13. }
  14. var outputs int
  15. for _, task := range parallelTasks {
  16. var output int
  17. err := task.Await(&output)
  18. if err == nil {
  19. outputs += output
  20. } else {
  21. return 0, err
  22. }
  23. }
  24. if err := ctx.CallActivity(ProcessResults, workflow.ActivityInput(outputs)).Await(nil); err != nil {
  25. return 0, err
  26. }
  27. return 0, nil
  28. }
  29. func GetWorkBatch(ctx workflow.ActivityContext) (any, error) {
  30. var batchSize int
  31. if err := ctx.GetInput(&batchSize); err != nil {
  32. return 0, err
  33. }
  34. batch := make([]int, batchSize)
  35. for i := 0; i < batchSize; i++ {
  36. batch[i] = i
  37. }
  38. return batch, nil
  39. }
  40. func ProcessWorkItem(ctx workflow.ActivityContext) (any, error) {
  41. var workItem int
  42. if err := ctx.GetInput(&workItem); err != nil {
  43. return 0, err
  44. }
  45. fmt.Printf("Processing work item: %d\n", workItem)
  46. time.Sleep(time.Second * 5)
  47. result := workItem * 2
  48. fmt.Printf("Work item %d processed. Result: %d\n", workItem, result)
  49. return result, nil
  50. }
  51. func ProcessResults(ctx workflow.ActivityContext) (any, error) {
  52. var finalResult int
  53. if err := ctx.GetInput(&finalResult); err != nil {
  54. return 0, err
  55. }
  56. fmt.Printf("Final result: %d\n", finalResult)
  57. return finalResult, nil
  58. }

此示例的关键要点是:

  • 扇出/扇入模式可以用普通编程结构表达为一个简单函数
  • 并行任务的数量可以是静态的,也可以是动态的
  • 工作流本身能够聚合并行执行的结果

虽然示例中未显示,但可以使用特定于语言的简单构造更进一步并限制并发程度。 此外,工作流的执行是持久的。 如果工作流启动 100 个并行任务执行,并且在流程崩溃之前仅完成 40 个,则工作流会自动重新启动,并且仅计划剩余的 60 个任务。

异步 HTTP API

异步HTTP API通常使用异步请求-回复模式来实现。 传统上,实现此模式涉及以下内容:

  1. 一个客户端向一个 HTTP API 端点发送请求(start API)
  2. start API 向后端队列写入信息,从而触发长期运行操作的启动
  3. 在调度后端操作后,start API 立即向客户端返回 HTTP 202 响应,其中包含可用于轮询状态的标识符
  4. status API 查询包含长期运行操作状态的数据库
  5. 客户端会反复轮询 status API,直到超时或收到“完成”响应为止

端到端流程如下图所示。

Diagram showing how the async request response pattern works

实现异步请求-回复模式的挑战在于,它涉及多个应用程序接口和状态存储的使用。 这还包括正确执行协议,以便客户端知道如何自动轮询状态,并知道操作何时完成。

Dapr 工作流 HTTP API 支持开箱即用的异步请求-回复模式,无需编写任何代码或进行任何状态管理。

以下 curl 命令说明了工作流 API 如何支持这种模式。

  1. curl -X POST http://localhost:3500/v1.0-beta1/workflows/dapr/OrderProcessingWorkflow/start?instanceID=12345678 -d '{"Name":"Paperclips","Quantity":1,"TotalCost":9.95}'

上一条命令将产生以下 JSON 响应:

  1. {"instanceID":"12345678"}

然后,HTTP 客户端可以使用工作流实例 ID 构建状态查询 URL,并反复轮询,直到在有效负载中看到 “COMPLETED”、“FAILED” 或 “TERMINATED” 状态。

  1. curl http://localhost:3500/v1.0-beta1/workflows/dapr/12345678

下面是一个正在进行的工作流程状态示例。

  1. {
  2. "instanceID": "12345678",
  3. "workflowName": "OrderProcessingWorkflow",
  4. "createdAt": "2023-05-03T23:22:11.143069826Z",
  5. "lastUpdatedAt": "2023-05-03T23:22:22.460025267Z",
  6. "runtimeStatus": "RUNNING",
  7. "properties": {
  8. "dapr.workflow.custom_status": "",
  9. "dapr.workflow.input": "{\"Name\":\"Paperclips\",\"Quantity\":1,\"TotalCost\":9.95}"
  10. }
  11. }

从前面的示例可以看出,工作流的运行状态是 RUNNING,这让客户端知道它应该继续轮询。

如果工作流已完成,则状态可能如下所示。

  1. {
  2. "instanceID": "12345678",
  3. "workflowName": "OrderProcessingWorkflow",
  4. "createdAt": "2023-05-03T23:30:11.381146313Z",
  5. "lastUpdatedAt": "2023-05-03T23:30:52.923870615Z",
  6. "runtimeStatus": "COMPLETED",
  7. "properties": {
  8. "dapr.workflow.custom_status": "",
  9. "dapr.workflow.input": "{\"Name\":\"Paperclips\",\"Quantity\":1,\"TotalCost\":9.95}",
  10. "dapr.workflow.output": "{\"Processed\":true}"
  11. }
  12. }

从上一个示例可以看出,工作流的运行状态现在是 COMPLETED,这意味着客户端可以停止轮询更新。

监控

监控模式是一种典型的循环过程:

  1. 检查系统状态
  2. 根据该状态采取一些操作 - 例如发送通知
  3. 睡眠一段时间
  4. 重复

下图提供了此模式的粗略说明。

Diagram showing how the monitor pattern works

根据业务需要,可能只有一个监控器,也可能有多个监控器,每个业务实体(如股票)一个。 此外,睡眠时间可能需要根据具体情况进行调整。 这些要求使得使用基于 cron 的调度系统变得不切实际。

Dapr 工作流原生支持这种模式,允许您实现“永恒的工作流”(eternal workflows)。 与其编写无限的 while 循环(这是一种反模式),Dapr Workflow 提供了一个 continue-as-new API,工作流作者可以使用它从头开始重新启动一个工作流函数,并使用新的输入。

  1. from dataclasses import dataclass
  2. from datetime import timedelta
  3. import random
  4. import dapr.ext.workflow as wf
  5. @dataclass
  6. class JobStatus:
  7. job_id: str
  8. is_healthy: bool
  9. def status_monitor_workflow(ctx: wf.DaprWorkflowContext, job: JobStatus):
  10. # poll a status endpoint associated with this job
  11. status = yield ctx.call_activity(check_status, input=job)
  12. if not ctx.is_replaying:
  13. print(f"Job '{job.job_id}' is {status}.")
  14. if status == "healthy":
  15. job.is_healthy = True
  16. next_sleep_interval = 60 # check less frequently when healthy
  17. else:
  18. if job.is_healthy:
  19. job.is_healthy = False
  20. yield ctx.call_activity(send_alert, input=f"Job '{job.job_id}' is unhealthy!")
  21. next_sleep_interval = 5 # check more frequently when unhealthy
  22. yield ctx.create_timer(fire_at=ctx.current_utc_datetime + timedelta(seconds=next_sleep_interval))
  23. # restart from the beginning with a new JobStatus input
  24. ctx.continue_as_new(job)
  25. def check_status(ctx, _) -> str:
  26. return random.choice(["healthy", "unhealthy"])
  27. def send_alert(ctx, message: str):
  28. print(f'*** Alert: {message}')
  1. const statusMonitorWorkflow: TWorkflow = async function* (ctx: WorkflowContext): any {
  2. let duration;
  3. const status = yield ctx.callActivity(checkStatusActivity);
  4. if (status === "healthy") {
  5. // Check less frequently when in a healthy state
  6. // set duration to 1 hour
  7. duration = 60 * 60;
  8. } else {
  9. yield ctx.callActivity(alertActivity, "job unhealthy");
  10. // Check more frequently when in an unhealthy state
  11. // set duration to 5 minutes
  12. duration = 5 * 60;
  13. }
  14. // Put the workflow to sleep until the determined time
  15. yield ctx.createTimer(duration);
  16. // Restart from the beginning with the updated state
  17. ctx.continueAsNew();
  18. };
  1. public override async Task<object> RunAsync(WorkflowContext context, MyEntityState myEntityState)
  2. {
  3. TimeSpan nextSleepInterval;
  4. var status = await context.CallActivityAsync<string>("GetStatus");
  5. if (status == "healthy")
  6. {
  7. myEntityState.IsHealthy = true;
  8. // Check less frequently when in a healthy state
  9. nextSleepInterval = TimeSpan.FromMinutes(60);
  10. }
  11. else
  12. {
  13. if (myEntityState.IsHealthy)
  14. {
  15. myEntityState.IsHealthy = false;
  16. await context.CallActivityAsync("SendAlert", myEntityState);
  17. }
  18. // Check more frequently when in an unhealthy state
  19. nextSleepInterval = TimeSpan.FromMinutes(5);
  20. }
  21. // Put the workflow to sleep until the determined time
  22. await context.CreateTimer(nextSleepInterval);
  23. // Restart from the beginning with the updated state
  24. context.ContinueAsNew(myEntityState);
  25. return null;
  26. }

这个示例假设您有一个预定义的 MyEntityState 类,其中有一个布尔 IsHealthy 属性。

  1. public class MonitorWorkflow extends Workflow {
  2. @Override
  3. public WorkflowStub create() {
  4. return ctx -> {
  5. Duration nextSleepInterval;
  6. var status = ctx.callActivity(DemoWorkflowStatusActivity.class.getName(), DemoStatusActivityOutput.class).await();
  7. var isHealthy = status.getIsHealthy();
  8. if (isHealthy) {
  9. // Check less frequently when in a healthy state
  10. nextSleepInterval = Duration.ofMinutes(60);
  11. } else {
  12. ctx.callActivity(DemoWorkflowAlertActivity.class.getName()).await();
  13. // Check more frequently when in an unhealthy state
  14. nextSleepInterval = Duration.ofMinutes(5);
  15. }
  16. // Put the workflow to sleep until the determined time
  17. try {
  18. ctx.createTimer(nextSleepInterval);
  19. } catch (InterruptedException e) {
  20. throw new RuntimeException(e);
  21. }
  22. // Restart from the beginning with the updated state
  23. ctx.continueAsNew();
  24. };
  25. }
  26. }
  1. type JobStatus struct {
  2. JobID string `json:"job_id"`
  3. IsHealthy bool `json:"is_healthy"`
  4. }
  5. func StatusMonitorWorkflow(ctx *workflow.WorkflowContext) (any, error) {
  6. var sleepInterval time.Duration
  7. var job JobStatus
  8. if err := ctx.GetInput(&job); err != nil {
  9. return "", err
  10. }
  11. var status string
  12. if err := ctx.CallActivity(CheckStatus, workflow.ActivityInput(job)).Await(&status); err != nil {
  13. return "", err
  14. }
  15. if status == "healthy" {
  16. job.IsHealthy = true
  17. sleepInterval = time.Second * 60
  18. } else {
  19. if job.IsHealthy {
  20. job.IsHealthy = false
  21. err := ctx.CallActivity(SendAlert, workflow.ActivityInput(fmt.Sprintf("Job '%s' is unhealthy!", job.JobID))).Await(nil)
  22. if err != nil {
  23. return "", err
  24. }
  25. }
  26. sleepInterval = time.Second * 5
  27. }
  28. if err := ctx.CreateTimer(sleepInterval).Await(nil); err != nil {
  29. return "", err
  30. }
  31. ctx.ContinueAsNew(job, false)
  32. return "", nil
  33. }
  34. func CheckStatus(ctx workflow.ActivityContext) (any, error) {
  35. statuses := []string{"healthy", "unhealthy"}
  36. return statuses[rand.Intn(len(statuses))], nil
  37. }
  38. func SendAlert(ctx workflow.ActivityContext) (any, error) {
  39. var message string
  40. if err := ctx.GetInput(&message); err != nil {
  41. return "", err
  42. }
  43. fmt.Printf("*** Alert: %s", message)
  44. return "", nil
  45. }

实施监控模式的工作流可以永远循环,也可以通过不调用 continue-as-new 从容终止。

注意

这个模式也可以使用actors提醒来表示。 不同之处在于,该工作流程以单个函数的形式表达,输入和状态存储在本地变量中。 必要时,工作流还可以执行一系列具有更强可靠性保证的操作。

外部系统交互

在某些情况下,工作流可能需要暂停并等待外部系统执行某些操作。 例如,工作流可能需要暂停,等待收到付款。 在这种情况下,支付系统可能会在收到付款时将事件发布到发布/订阅主题,并且该主题的监听器可以使用发起事件工作流 API 向工作流发起该事件。

另一种非常常见的情况是工作流需要暂停并等待人工操作,例如在审批采购订单时。 Dapr Workflow通过外部事件特性来支持这种事件模式。

下面是涉及人员的采购订单的示例工作流:

  1. 收到采购订单时将触发工作流。
  2. 工作流中的规则确定需要人工执行某些操作。 例如,采购订单金额超过了某个自动审批阈值。
  3. 工作流发送通知,请求人工操作。 例如,向指定的审批人发送带有审批链接的电子邮件。
  4. 工作流暂停,等待人工点击链接批准或拒绝订单。
  5. 如果在指定时间内未收到审核,工作流将恢复并执行一些补偿逻辑,例如取消订单。

下图说明了此流程。

Diagram showing how the external system interaction pattern works with a human involved

下面的示例代码展示了如何使用 Dapr 工作流实现这种模式。

  1. from dataclasses import dataclass
  2. from datetime import timedelta
  3. import dapr.ext.workflow as wf
  4. @dataclass
  5. class Order:
  6. cost: float
  7. product: str
  8. quantity: int
  9. def __str__(self):
  10. return f'{self.product} ({self.quantity})'
  11. @dataclass
  12. class Approval:
  13. approver: str
  14. @staticmethod
  15. def from_dict(dict):
  16. return Approval(**dict)
  17. def purchase_order_workflow(ctx: wf.DaprWorkflowContext, order: Order):
  18. # Orders under $1000 are auto-approved
  19. if order.cost < 1000:
  20. return "Auto-approved"
  21. # Orders of $1000 or more require manager approval
  22. yield ctx.call_activity(send_approval_request, input=order)
  23. # Approvals must be received within 24 hours or they will be canceled.
  24. approval_event = ctx.wait_for_external_event("approval_received")
  25. timeout_event = ctx.create_timer(timedelta(hours=24))
  26. winner = yield wf.when_any([approval_event, timeout_event])
  27. if winner == timeout_event:
  28. return "Cancelled"
  29. # The order was approved
  30. yield ctx.call_activity(place_order, input=order)
  31. approval_details = Approval.from_dict(approval_event.get_result())
  32. return f"Approved by '{approval_details.approver}'"
  33. def send_approval_request(_, order: Order) -> None:
  34. print(f'*** Sending approval request for order: {order}')
  35. def place_order(_, order: Order) -> None:
  36. print(f'*** Placing order: {order}')
  1. import {
  2. Task,
  3. DaprWorkflowClient,
  4. WorkflowActivityContext,
  5. WorkflowContext,
  6. WorkflowRuntime,
  7. TWorkflow,
  8. } from "@dapr/dapr";
  9. import * as readlineSync from "readline-sync";
  10. // Wrap the entire code in an immediately-invoked async function
  11. async function start() {
  12. class Order {
  13. cost: number;
  14. product: string;
  15. quantity: number;
  16. constructor(cost: number, product: string, quantity: number) {
  17. this.cost = cost;
  18. this.product = product;
  19. this.quantity = quantity;
  20. }
  21. }
  22. function sleep(ms: number): Promise<void> {
  23. return new Promise((resolve) => setTimeout(resolve, ms));
  24. }
  25. // Update the gRPC client and worker to use a local address and port
  26. const daprHost = "localhost";
  27. const daprPort = "50001";
  28. const workflowClient = new DaprWorkflowClient({
  29. daprHost,
  30. daprPort,
  31. });
  32. const workflowRuntime = new WorkflowRuntime({
  33. daprHost,
  34. daprPort,
  35. });
  36. // Activity function that sends an approval request to the manager
  37. const sendApprovalRequest = async (_: WorkflowActivityContext, order: Order) => {
  38. // Simulate some work that takes an amount of time
  39. await sleep(3000);
  40. console.log(`Sending approval request for order: ${order.product}`);
  41. };
  42. // Activity function that places an order
  43. const placeOrder = async (_: WorkflowActivityContext, order: Order) => {
  44. console.log(`Placing order: ${order.product}`);
  45. };
  46. // Orchestrator function that represents a purchase order workflow
  47. const purchaseOrderWorkflow: TWorkflow = async function* (ctx: WorkflowContext, order: Order): any {
  48. // Orders under $1000 are auto-approved
  49. if (order.cost < 1000) {
  50. return "Auto-approved";
  51. }
  52. // Orders of $1000 or more require manager approval
  53. yield ctx.callActivity(sendApprovalRequest, order);
  54. // Approvals must be received within 24 hours or they will be canceled.
  55. const tasks: Task<any>[] = [];
  56. const approvalEvent = ctx.waitForExternalEvent("approval_received");
  57. const timeoutEvent = ctx.createTimer(24 * 60 * 60);
  58. tasks.push(approvalEvent);
  59. tasks.push(timeoutEvent);
  60. const winner = yield ctx.whenAny(tasks);
  61. if (winner == timeoutEvent) {
  62. return "Cancelled";
  63. }
  64. yield ctx.callActivity(placeOrder, order);
  65. const approvalDetails = approvalEvent.getResult();
  66. return `Approved by ${approvalDetails.approver}`;
  67. };
  68. workflowRuntime
  69. .registerWorkflow(purchaseOrderWorkflow)
  70. .registerActivity(sendApprovalRequest)
  71. .registerActivity(placeOrder);
  72. // Wrap the worker startup in a try-catch block to handle any errors during startup
  73. try {
  74. await workflowRuntime.start();
  75. console.log("Worker started successfully");
  76. } catch (error) {
  77. console.error("Error starting worker:", error);
  78. }
  79. // Schedule a new orchestration
  80. try {
  81. const cost = readlineSync.questionInt("Cost of your order:");
  82. const approver = readlineSync.question("Approver of your order:");
  83. const timeout = readlineSync.questionInt("Timeout for your order in seconds:");
  84. const order = new Order(cost, "MyProduct", 1);
  85. const id = await workflowClient.scheduleNewWorkflow(purchaseOrderWorkflow, order);
  86. console.log(`Orchestration scheduled with ID: ${id}`);
  87. // prompt for approval asynchronously
  88. promptForApproval(approver, workflowClient, id);
  89. // Wait for orchestration completion
  90. const state = await workflowClient.waitForWorkflowCompletion(id, undefined, timeout + 2);
  91. console.log(`Orchestration completed! Result: ${state?.serializedOutput}`);
  92. } catch (error) {
  93. console.error("Error scheduling or waiting for orchestration:", error);
  94. }
  95. // stop worker and client
  96. await workflowRuntime.stop();
  97. await workflowClient.stop();
  98. // stop the dapr side car
  99. process.exit(0);
  100. }
  101. async function promptForApproval(approver: string, workflowClient: DaprWorkflowClient, id: string) {
  102. if (readlineSync.keyInYN("Press [Y] to approve the order... Y/yes, N/no")) {
  103. const approvalEvent = { approver: approver };
  104. await workflowClient.raiseEvent(id, "approval_received", approvalEvent);
  105. } else {
  106. return "Order rejected";
  107. }
  108. }
  109. start().catch((e) => {
  110. console.error(e);
  111. process.exit(1);
  112. });
  1. public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
  2. {
  3. // ...(other steps)...
  4. // Require orders over a certain threshold to be approved
  5. if (order.TotalCost > OrderApprovalThreshold)
  6. {
  7. try
  8. {
  9. // Request human approval for this order
  10. await context.CallActivityAsync(nameof(RequestApprovalActivity), order);
  11. // Pause and wait for a human to approve the order
  12. ApprovalResult approvalResult = await context.WaitForExternalEventAsync<ApprovalResult>(
  13. eventName: "ManagerApproval",
  14. timeout: TimeSpan.FromDays(3));
  15. if (approvalResult == ApprovalResult.Rejected)
  16. {
  17. // The order was rejected, end the workflow here
  18. return new OrderResult(Processed: false);
  19. }
  20. }
  21. catch (TaskCanceledException)
  22. {
  23. // An approval timeout results in automatic order cancellation
  24. return new OrderResult(Processed: false);
  25. }
  26. }
  27. // ...(other steps)...
  28. // End the workflow with a success result
  29. return new OrderResult(Processed: true);
  30. }

注意 在上面的示例中,RequestApprovalActivity是要调用的工作流活动的名称,ApprovalResult是工作流应用程序定义的枚举。 为简洁起见,示例代码中未包含这些定义。

  1. /**
  2.  * Workflow that pauses for manager approval when the order cost exceeds
  3.  * ORDER_APPROVAL_THRESHOLD, completing with "Process reject", "Process cancel"
  4.  * or "Process approved".
  5.  */
  6. public class ExternalSystemInteractionWorkflow extends Workflow {
  7.   @Override
  8.   public WorkflowStub create() {
  9.     return ctx -> {
  10.       // ...other steps...
  11.       Integer orderCost = ctx.getInput(int.class);
  12.       // Require orders over a certain threshold to be approved
  13.       if (orderCost > ORDER_APPROVAL_THRESHOLD) {
  14.         try {
  15.           // Request human approval for this order
  16.           ctx.callActivity("RequestApprovalActivity", orderCost, Void.class).await();
  17.           // Pause and wait for a human to approve the order
  18.           boolean approved = ctx.waitForExternalEvent("ManagerApproval", Duration.ofDays(3), boolean.class).await();
  19.           if (!approved) {
  20.             // The order was rejected, end the workflow here
  21.             ctx.complete("Process reject");
  22.             // Stop executing; otherwise control falls through to the approval path below
  23.             return;
  24.           }
  25.         } catch (TaskCanceledException e) {
  26.           // An approval timeout results in automatic order cancellation
  27.           ctx.complete("Process cancel");
  28.           // Stop executing so the cancelled order is not completed as approved
  29.           return;
  30.         }
  31.       }
  32.       // ...other steps...
  33.       // End the workflow with a success result
  34.       ctx.complete("Process approved");
  35.     };
  36.   }
  37. }
  1. // Order is the purchase order read as input by the workflow and its activities.
  2. type Order struct {
  3.     Cost     float64 `json:"cost"`
  4.     Product  string  `json:"product"`
  5.     Quantity int     `json:"quantity"`
  6. }
  7. // Approval is the payload decoded from the "approval_received" event.
  8. type Approval struct {
  9.     Approver string `json:"approver"`
  10. }
  11. // PurchaseOrderWorkflow auto-approves orders costing less than 1000. Other orders
  12. // trigger SendApprovalRequest, wait up to 24 hours for an "approval_received"
  13. // event, and are then handed to PlaceOrder.
  14. func PurchaseOrderWorkflow(ctx *workflow.WorkflowContext) (any, error) {
  15.     var order Order
  16.     if err := ctx.GetInput(&order); err != nil {
  17.         return "", err
  18.     }
  19.     // Orders under $1000 are auto-approved
  20.     if order.Cost < 1000 {
  21.         return "Auto-approved", nil
  22.     }
  23.     // Orders of $1000 or more require manager approval
  24.     if err := ctx.CallActivity(SendApprovalRequest, workflow.ActivityInput(order)).Await(nil); err != nil {
  25.         return "", err
  26.     }
  27.     // Approvals must be received within 24 hours or they will be cancelled
  28.     var approval Approval
  29.     if err := ctx.WaitForExternalEvent("approval_received", time.Hour*24).Await(&approval); err != nil {
  30.         // Assuming that a timeout has taken place - in any case; an error.
  31.         return "error/cancelled", err
  32.     }
  33.     // The order was approved
  34.     if err := ctx.CallActivity(PlaceOrder, workflow.ActivityInput(order)).Await(nil); err != nil {
  35.         return "", err
  36.     }
  37.     return fmt.Sprintf("Approved by %s", approval.Approver), nil
  38. }
  39. // SendApprovalRequest is the activity that prints the order awaiting approval.
  40. func SendApprovalRequest(ctx workflow.ActivityContext) (any, error) {
  41.     var order Order
  42.     if err := ctx.GetInput(&order); err != nil {
  43.         return "", err
  44.     }
  45.     fmt.Printf("*** Sending approval request for order: %v\n", order)
  46.     return "", nil
  47. }
  48. // PlaceOrder is the activity that prints the order being placed.
  49. func PlaceOrder(ctx workflow.ActivityContext) (any, error) {
  50.     var order Order
  51.     if err := ctx.GetInput(&order); err != nil {
  52.         return "", err
  53.     }
  54.     // Terminate the line, as SendApprovalRequest does, so later output starts on its own line
  55.     fmt.Printf("*** Placing order: %v\n", order)
  56.     return "", nil
  57. }

传递事件以恢复工作流执行的代码是工作流的外部代码。 工作流事件可通过 raise event 工作流管理 API 传递到等待中的工作流实例,如下例所示:

  1. from dapr.clients import DaprClient
  2. from dataclasses import asdict
  3. with DaprClient() as d:
  4. d.raise_workflow_event(
  5. instance_id=instance_id,
  6. workflow_component="dapr",
  7. event_name="approval_received",
  8. event_data=asdict(Approval("Jane Doe")))
  1. import { DaprClient } from "@dapr/dapr";
  2. /**
  3.  * Raises the named event on a workflow instance.
  4.  * @param workflowInstanceId ID of the workflow instance that receives the event.
  5.  * @param eventName Name of the event to raise.
  6.  * @param eventPayload Optional data passed along with the event.
  7.  */
  8. public async raiseEvent(workflowInstanceId: string, eventName: string, eventPayload?: any): Promise<void> {
  9.   // Await the inner call so this method settles only after it does and its failures reach the caller
  10.   await this._innerClient.raiseOrchestrationEvent(workflowInstanceId, eventName, eventPayload);
  11. }
  1. // Deliver the "ManagerApproval" event, carrying an approval, to the workflow instance identified by orderId
  2. const string approvalEventName = "ManagerApproval";
  3. await daprClient.RaiseWorkflowEventAsync(
  4.     instanceId: orderId,
  5.     workflowComponent: "dapr",
  6.     eventName: approvalEventName,
  7.     eventData: ApprovalResult.Approved);
  1. System.out.println("**SendExternalMessage: RestartEvent**");
  2. client.raiseEvent(restartingInstanceId, "RestartEvent", "RestartEventPayload");
  1. // raiseEvent creates a Dapr client and raises the "approval_received" event,
  2. // with an Approval payload, on the workflow instance "instance_id".
  3. // Any failure is logged together with its cause and terminates the program.
  4. func raiseEvent() {
  5.     daprClient, err := client.NewClient()
  6.     if err != nil {
  7.         // Include the underlying error so the failure can be diagnosed
  8.         log.Fatalf("failed to initialize the client: %v", err)
  9.     }
  10.     err = daprClient.RaiseEventWorkflowBeta1(context.Background(), &client.RaiseEventWorkflowRequest{
  11.         InstanceID:        "instance_id",
  12.         WorkflowComponent: "dapr",
  13.         EventName:         "approval_received",
  14.         EventData: Approval{
  15.             Approver: "Jane Doe",
  16.         },
  17.     })
  18.     if err != nil {
  19.         log.Fatalf("failed to raise event on workflow: %v", err)
  20.     }
  21.     log.Println("raised an event on specified workflow")
  22. }

外部事件不一定由人类直接触发。 它们也可以由其他系统触发。 例如,工作流可能需要暂停,等待收到付款。 在这种情况下,支付系统可能会在收到付款时将事件发布到发布/订阅主题,并且该主题的监听器可以使用发起事件(raise event)工作流 API 向工作流发起事件。

下一步

工作流模式 >>

相关链接