开始使用 Dapr Workflow Python SDK

如何使用 Dapr Python SDK 启动和运行工作流

注意

Dapr工作流目前处于alpha阶段。

让我们创建一个 Dapr 工作流,并使用控制台调用它。 通过提供的hello world工作流示例,您将:

这个示例使用dapr init中的默认配置,在自托管模式下。

在Python示例项目中,app.py文件包含了应用程序的设置,包括:

  • 工作流定义
  • 工作流活动定义
  • 工作流和工作流活动的注册

前期准备

设置环境

运行以下命令使用 Dapr Python SDK 安装运行此工作流示例所需的依赖项。

  1. pip3 install -r demo_workflow/requirements.txt

克隆[Python SDK存储库]。

  1. git clone https://github.com/dapr/python-sdk.git

从 Python SDK 根目录中,导航到 Dapr Workflow 示例。

  1. cd examples/demo_workflow

在本地运行应用程序

要运行 Dapr 应用程序,您需要启动 Python 程序和 Dapr sidecar。 在终端中运行:

  1. dapr run --app-id orderapp --app-protocol grpc --dapr-grpc-port 50001 --resources-path components --placement-host-address localhost:50005 -- python3 app.py

**注意:**由于Python3.exe在Windows中未定义,您可能需要使用python app.py替代python3 app.py

预期输出

  1. == APP == ==========Start Counter Increase as per Input:==========
  2. == APP == start_resp exampleInstanceID
  3. == APP == Hi Counter!
  4. == APP == New counter value is: 1!
  5. == APP == Hi Counter!
  6. == APP == New counter value is: 11!
  7. == APP == Hi Counter!
  8. == APP == Hi Counter!
  9. == APP == Get response from hello_world_wf after pause call: Suspended
  10. == APP == Hi Counter!
  11. == APP == Get response from hello_world_wf after resume call: Running
  12. == APP == Hi Counter!
  13. == APP == New counter value is: 111!
  14. == APP == Hi Counter!
  15. == APP == Instance Successfully Purged
  16. == APP == start_resp exampleInstanceID
  17. == APP == Hi Counter!
  18. == APP == New counter value is: 1112!
  19. == APP == Hi Counter!
  20. == APP == New counter value is: 1122!
  21. == APP == Get response from hello_world_wf after terminate call: Terminated
  22. == APP == Get response from child_wf after terminate call: Terminated
  23. == APP == Instance Successfully Purged

发生了什么?

当你运行 dapr run,Dapr 客户端:

  1. 注册了工作流(hello_world_wf)及其活动(hello_act
  2. 启动工作流引擎
  1. def main():
  2. with DaprClient() as d:
  3. host = settings.DAPR_RUNTIME_HOST
  4. port = settings.DAPR_GRPC_PORT
  5. workflowRuntime = WorkflowRuntime(host, port)
  6. workflowRuntime = WorkflowRuntime()
  7. workflowRuntime.register_workflow(hello_world_wf)
  8. workflowRuntime.register_activity(hello_act)
  9. workflowRuntime.start()
  10. print("==========Start Counter Increase as per Input:==========")
  11. start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
  12. workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
  13. print(f"start_resp {start_resp.instance_id}")

Dapr 然后暂停并恢复工作流:

  1. # Pause
  2. d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  3. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  4. print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}")
  5. # Resume
  6. d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  7. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  8. print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}")

一旦工作流恢复,Dapr会触发一个工作流事件并打印新的计数器值:

  1. # Raise event
  2. d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
  3. event_name=eventName, event_data=eventData)

为了清除状态存储中的工作流状态,Dapr已经清除了工作流:

  1. # Purge
  2. d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  3. try:
  4. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  5. except DaprInternalError as err:
  6. if nonExistentIDError in err._message:
  7. print("Instance Successfully Purged")

然后示范了通过以下方式终止工作流程:

  • 使用与清除的工作流相同的instanceId启动新的工作流程。
  • 在关闭工作流之前终止工作流并清除。
  1. # Kick off another workflow
  2. start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
  3. workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
  4. print(f"start_resp {start_resp.instance_id}")
  5. # Terminate
  6. d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  7. sleep(1)
  8. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  9. print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}")
  10. # Purge
  11. d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  12. try:
  13. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  14. except DaprInternalError as err:
  15. if nonExistentIDError in err._message:
  16. print("Instance Successfully Purged")

下一步