指南:如何管理工作流

管理和运行工作流

注意

Dapr工作流目前处于beta阶段。 查看已知限制 1.13.0.

现在,您已经在应用程序中编写了工作流及其活动,可以使用HTTP API调用来启动、终止和获取有关工作流的信息。 更多信息,请阅读 工作流 API 参考

在代码中管理工作流。 在编写工作流指南中的工作流示例中,工作流是使用以下API在代码中注册的:

  • start_workflow: 启动一个工作流实例
  • get_workflow: 获取工作流的状态信息
  • pause_workflow: 暂停或中止一个工作流实例,稍后可恢复该实例
  • resume_workflow: 恢复暂停的工作流实例
  • raise_workflow_event: 在工作流中引发事件
  • purge_workflow: 删除与特定工作流实例相关的所有元数据
  • terminate_workflow: 终止或停止工作流的特定实例
  1. from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
  2. from dapr.clients import DaprClient
  3. # Sane parameters
  4. instanceId = "exampleInstanceID"
  5. workflowComponent = "dapr"
  6. workflowName = "hello_world_wf"
  7. eventName = "event1"
  8. eventData = "eventData"
  9. # Start the workflow
  10. start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
  11. workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
  12. # Get info on the workflow
  13. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  14. # Pause the workflow
  15. d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  16. # Resume the workflow
  17. d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  18. # Raise an event on the workflow.
  19. d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
  20. event_name=eventName, event_data=eventData)
  21. # Purge the workflow
  22. d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  23. # Terminate the workflow
  24. d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)

在代码中管理工作流。 在编写工作流指南中的工作流示例中,工作流是使用以下API在代码中注册的:

  • client.workflow.start: 启动一个工作流实例
  • client.workflow.get: 获取工作流的状态信息
  • client.workflow.pause: 暂停或中止一个工作流实例,稍后可恢复该实例
  • client.workflow.resume: 恢复暂停的工作流实例
  • client.workflow.purge: 删除与特定工作流实例相关的所有元数据
  • client.workflow.terminate: 终止或停止工作流的特定实例
  1. import { DaprClient } from "@dapr/dapr";
  2. async function printWorkflowStatus(client: DaprClient, instanceId: string) {
  3. const workflow = await client.workflow.get(instanceId);
  4. console.log(
  5. `Workflow ${workflow.workflowName}, created at ${workflow.createdAt.toUTCString()}, has status ${
  6. workflow.runtimeStatus
  7. }`,
  8. );
  9. console.log(`Additional properties: ${JSON.stringify(workflow.properties)}`);
  10. console.log("--------------------------------------------------\n\n");
  11. }
  12. async function start() {
  13. const client = new DaprClient();
  14. // Start a new workflow instance
  15. const instanceId = await client.workflow.start("OrderProcessingWorkflow", {
  16. Name: "Paperclips",
  17. TotalCost: 99.95,
  18. Quantity: 4,
  19. });
  20. console.log(`Started workflow instance ${instanceId}`);
  21. await printWorkflowStatus(client, instanceId);
  22. // Pause a workflow instance
  23. await client.workflow.pause(instanceId);
  24. console.log(`Paused workflow instance ${instanceId}`);
  25. await printWorkflowStatus(client, instanceId);
  26. // Resume a workflow instance
  27. await client.workflow.resume(instanceId);
  28. console.log(`Resumed workflow instance ${instanceId}`);
  29. await printWorkflowStatus(client, instanceId);
  30. // Terminate a workflow instance
  31. await client.workflow.terminate(instanceId);
  32. console.log(`Terminated workflow instance ${instanceId}`);
  33. await printWorkflowStatus(client, instanceId);
  34. // Wait for the workflow to complete, 30 seconds!
  35. await new Promise((resolve) => setTimeout(resolve, 30000));
  36. await printWorkflowStatus(client, instanceId);
  37. // Purge a workflow instance
  38. await client.workflow.purge(instanceId);
  39. console.log(`Purged workflow instance ${instanceId}`);
  40. // This will throw an error because the workflow instance no longer exists.
  41. await printWorkflowStatus(client, instanceId);
  42. }
  43. start().catch((e) => {
  44. console.error(e);
  45. process.exit(1);
  46. });

在代码中管理工作流。 在OrderProcessingWorkflow示例中,来自编写工作流指南,工作流在代码中注册。 现在,您可以启动、终止正在运行的工作流程,并获取相关信息:

  1. string orderId = "exampleOrderId";
  2. string workflowComponent = "dapr";
  3. string workflowName = "OrderProcessingWorkflow";
  4. OrderPayload input = new OrderPayload("Paperclips", 99.95);
  5. Dictionary<string, string> workflowOptions; // This is an optional parameter
  6. // Start the workflow. This returns back a "StartWorkflowResponse" which contains the instance ID for the particular workflow instance.
  7. StartWorkflowResponse startResponse = await daprClient.StartWorkflowAsync(orderId, workflowComponent, workflowName, input, workflowOptions);
  8. // Get information on the workflow. This response contains information such as the status of the workflow, when it started, and more!
  9. GetWorkflowResponse getResponse = await daprClient.GetWorkflowAsync(orderId, workflowComponent, eventName);
  10. // Terminate the workflow
  11. await daprClient.TerminateWorkflowAsync(orderId, workflowComponent);
  12. // Raise an event (an incoming purchase order) that your workflow will wait for. This returns the item waiting to be purchased.
  13. await daprClient.RaiseWorkflowEventAsync(orderId, workflowComponent, workflowName, input);
  14. // Pause
  15. await daprClient.PauseWorkflowAsync(orderId, workflowComponent);
  16. // Resume
  17. await daprClient.ResumeWorkflowAsync(orderId, workflowComponent);
  18. // Purge the workflow, removing all inbox and history information from associated instance
  19. await daprClient.PurgeWorkflowAsync(orderId, workflowComponent);

在代码中管理工作流。 在Java SDK的工作流示例中,工作流是使用以下API在代码中注册的:

  • scheduleNewWorkflow: 启动新的工作流实例
  • getInstanceState: 获取工作流的状态信息
  • waitForInstanceStart: 暂停或中止一个工作流实例,稍后可恢复该实例
  • raiseEvent: 为正在运行的工作流实例触发事件/任务
  • waitForInstanceCompletion: 等待工作流完成其任务
  • purgeInstance: 删除与特定工作流实例相关的所有元数据
  • terminateWorkflow: 终止工作流程
  • purgeInstance: 删除与特定工作流相关的所有元数据
  1. package io.dapr.examples.workflows;
  2. import io.dapr.workflows.client.DaprWorkflowClient;
  3. import io.dapr.workflows.client.WorkflowInstanceStatus;
  4. // ...
  5. public class DemoWorkflowClient {
  6. // ...
  7. public static void main(String[] args) throws InterruptedException {
  8. DaprWorkflowClient client = new DaprWorkflowClient();
  9. try (client) {
  10. // Start a workflow
  11. String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class, "input data");
  12. // Get status information on the workflow
  13. WorkflowInstanceStatus workflowMetadata = client.getInstanceState(instanceId, true);
  14. // Wait or pause for the workflow instance start
  15. try {
  16. WorkflowInstanceStatus waitForInstanceStartResult =
  17. client.waitForInstanceStart(instanceId, Duration.ofSeconds(60), true);
  18. }
  19. // Raise an event for the workflow; you can raise several events in parallel
  20. client.raiseEvent(instanceId, "TestEvent", "TestEventPayload");
  21. client.raiseEvent(instanceId, "event1", "TestEvent 1 Payload");
  22. client.raiseEvent(instanceId, "event2", "TestEvent 2 Payload");
  23. client.raiseEvent(instanceId, "event3", "TestEvent 3 Payload");
  24. // Wait for workflow to complete running through tasks
  25. try {
  26. WorkflowInstanceStatus waitForInstanceCompletionResult =
  27. client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(60), true);
  28. }
  29. // Purge the workflow instance, removing all metadata associated with it
  30. boolean purgeResult = client.purgeInstance(instanceId);
  31. // Terminate the workflow instance
  32. client.terminateWorkflow(instanceToTerminateId, null);
  33. System.exit(0);
  34. }
  35. }

在代码中管理工作流。 在Go SDK的工作流示例中,工作流是使用以下API在代码中注册的:

  • StartWorkflow: 启动新的工作流实例
  • GetWorkflow: 获取工作流的状态信息
  • PauseWorkflow: 暂停或中止一个工作流实例,稍后可恢复该实例
  • RaiseEventWorkflow: 为正在运行的工作流实例触发事件/任务
  • ResumeWorkflow: 等待工作流完成其任务
  • PurgeWorkflow:删除与特定工作流实例相关的所有元数据
  • TerminateWorkflow: 终止工作流程
  1. // Start workflow
  2. type StartWorkflowRequest struct {
  3. InstanceID string // Optional instance identifier
  4. WorkflowComponent string
  5. WorkflowName string
  6. Options map[string]string // Optional metadata
  7. Input any // Optional input
  8. SendRawInput bool // Set to True in order to disable serialization on the input
  9. }
  10. type StartWorkflowResponse struct {
  11. InstanceID string
  12. }
  13. // Get the workflow status
  14. type GetWorkflowRequest struct {
  15. InstanceID string
  16. WorkflowComponent string
  17. }
  18. type GetWorkflowResponse struct {
  19. InstanceID string
  20. WorkflowName string
  21. CreatedAt time.Time
  22. LastUpdatedAt time.Time
  23. RuntimeStatus string
  24. Properties map[string]string
  25. }
  26. // Purge workflow
  27. type PurgeWorkflowRequest struct {
  28. InstanceID string
  29. WorkflowComponent string
  30. }
  31. // Terminate workflow
  32. type TerminateWorkflowRequest struct {
  33. InstanceID string
  34. WorkflowComponent string
  35. }
  36. // Pause workflow
  37. type PauseWorkflowRequest struct {
  38. InstanceID string
  39. WorkflowComponent string
  40. }
  41. // Resume workflow
  42. type ResumeWorkflowRequest struct {
  43. InstanceID string
  44. WorkflowComponent string
  45. }
  46. // Raise an event for the running workflow
  47. type RaiseEventWorkflowRequest struct {
  48. InstanceID string
  49. WorkflowComponent string
  50. EventName string
  51. EventData any
  52. SendRawData bool // Set to True in order to disable serialization on the data
  53. }

使用 HTTP 调用管理工作流。 下面的示例插入了 编写工作流示例 中的属性,并随机设置了实例 ID 号。

启动工作流

要使用 ID 12345678 启动工作流,请运行:

  1. POST http://localhost:3500/v1.0-beta1/workflows/dapr/OrderProcessingWorkflow/start?instanceID=12345678

请注意,工作流实例 ID 只能包含字母数字字符、下划线和破折号。

终止工作流程

要使用 ID 12345678 终止工作流,请运行:

  1. POST http://localhost:3500/v1.0-beta1/workflows/dapr/12345678/terminate

引发事件

对于支持订阅外部事件的工作流组件(如 Dapr 工作流引擎),您可以使用以下 “raise event” API 向特定工作流实例发送已命名的事件。

  1. POST http://localhost:3500/v1.0-beta1/workflows/<workflowComponentName>/<instanceID>/raiseEvent/<eventName>

eventName 可以是任何函数。

暂停或恢复工作流

若要规划停机时间、等待输入等,可以暂停,然后恢复工作流。 要暂停一个带有ID 12345678的工作流,直到触发恢复,请运行:

  1. POST http://localhost:3500/v1.0-beta1/workflows/dapr/12345678/pause

要恢复一个带有 ID 12345678 的工作流,请运行:

  1. POST http://localhost:3500/v1.0-beta1/workflows/dapr/12345678/resume

清除工作流

清除 API 可用于从底层状态存储中永久删除工作流元数据,包括任何已存储的输入、输出和工作流历史记录。 这通常有助于实施数据保留政策和释放资源。

只有处于 “已完成”、“已失败 “或 “已终止 “状态的工作流实例才能被清除。 如果工作流处于任何其他状态,调用清除会返回错误。

  1. POST http://localhost:3500/v1.0-beta1/workflows/dapr/12345678/purge

获取工作流程信息

要获取工作流信息(输出和输入)的 ID 为 12345678,请运行:

  1. GET http://localhost:3500/v1.0-beta1/workflows/dapr/12345678

了解有关这些HTTP调用的更多信息,请参阅工作流API参考指南

下一步