Getting started with the Dapr Workflow Python SDK

How to get up and running with workflows using the Dapr Python SDK

Let’s create a Dapr workflow and invoke it using the console. With the provided hello world workflow example, you will:

This example uses the default configuration from dapr init in self-hosted mode.

In the Python example project, the app.py file contains the setup of the app, including:

  • The workflow definition
  • The workflow activity definitions
  • The registration of the workflow and workflow activities

Prerequisites

Set up the environment

Run the following command to install the requirements for running this workflow sample with the Dapr Python SDK.

  1. pip3 install -r demo_workflow/requirements.txt

Clone the [Python SDK repo].

  1. git clone https://github.com/dapr/python-sdk.git

From the Python SDK root directory, navigate to the Dapr Workflow example.

  1. cd examples/demo_workflow

Run the application locally

To run the Dapr application, you need to start the Python program and a Dapr sidecar. In the terminal, run:

  1. dapr run --app-id orderapp --app-protocol grpc --dapr-grpc-port 50001 --resources-path components --placement-host-address localhost:50005 -- python3 app.py

Note: Since Python3.exe is not defined in Windows, you may need to use python app.py instead of python3 app.py.

Expected output

  1. == APP == ==========Start Counter Increase as per Input:==========
  2. == APP == start_resp exampleInstanceID
  3. == APP == Hi Counter!
  4. == APP == New counter value is: 1!
  5. == APP == Hi Counter!
  6. == APP == New counter value is: 11!
  7. == APP == Hi Counter!
  8. == APP == Hi Counter!
  9. == APP == Get response from hello_world_wf after pause call: Suspended
  10. == APP == Hi Counter!
  11. == APP == Get response from hello_world_wf after resume call: Running
  12. == APP == Hi Counter!
  13. == APP == New counter value is: 111!
  14. == APP == Hi Counter!
  15. == APP == Instance Successfully Purged
  16. == APP == start_resp exampleInstanceID
  17. == APP == Hi Counter!
  18. == APP == New counter value is: 1112!
  19. == APP == Hi Counter!
  20. == APP == New counter value is: 1122!
  21. == APP == Get response from hello_world_wf after terminate call: Terminated
  22. == APP == Instance Successfully Purged

What happened?

When you ran dapr run, the Dapr client:

  1. Registered the workflow (hello_world_wf) and its actvity (hello_act)
  2. Started the workflow engine
  1. def main():
  2. with DaprClient() as d:
  3. host = settings.DAPR_RUNTIME_HOST
  4. port = settings.DAPR_GRPC_PORT
  5. workflowRuntime = WorkflowRuntime(host, port)
  6. workflowRuntime = WorkflowRuntime()
  7. workflowRuntime.register_workflow(hello_world_wf)
  8. workflowRuntime.register_activity(hello_act)
  9. workflowRuntime.start()
  10. print("==========Start Counter Increase as per Input:==========")
  11. start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
  12. workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
  13. print(f"start_resp {start_resp.instance_id}")

Dapr then paused and resumed the workflow:

  1. # Pause
  2. d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  3. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  4. print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}")
  5. # Resume
  6. d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  7. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  8. print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}")

Once the workflow resumed, Dapr raised a workflow event and printed the new counter value:

  1. # Raise event
  2. d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
  3. event_name=eventName, event_data=eventData)

To clear out the workflow state from your state store, Dapr purged the workflow:

  1. # Purge
  2. d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  3. try:
  4. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  5. except DaprInternalError as err:
  6. if nonExistentIDError in err._message:
  7. print("Instance Successfully Purged")

The sample then demonstrated terminating a workflow by:

  • Starting a new workflow using the same instanceId as the purged workflow.
  • Terminating the workflow and purging before shutting down the workflow.
  1. # Kick off another workflow
  2. start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
  3. workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
  4. print(f"start_resp {start_resp.instance_id}")
  5. # Terminate
  6. d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  7. sleep(1)
  8. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  9. print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}")
  10. # Purge
  11. d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  12. try:
  13. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  14. except DaprInternalError as err:
  15. if nonExistentIDError in err._message:
  16. print("Instance Successfully Purged")

Next steps