Getting started with the Dapr client Python SDK

How to get up and running with the Dapr Python SDK

The Dapr client package allows you to interact with other Dapr applications from a Python application.

Note

If you haven’t already, try out one of the quickstarts for a quick walk-through on how to use the Dapr Python SDK with an API building block.

Prerequisites

Install the Dapr Python package before getting started.

Import the client package

The dapr.clients module contains DaprClient, the class you instantiate to create a client and call the Dapr APIs.

  1. from dapr.clients import DaprClient

Initialising the client

You can initialise a Dapr client in multiple ways:

Default values:

When you initialise the client without any parameters, it uses the default values for a Dapr sidecar instance (127.0.0.1:50001).

  1. from dapr.clients import DaprClient
  2. with DaprClient() as d:
  3. # use the client

Specifying an endpoint on initialisation:

When passed as an argument in the constructor, the endpoint takes precedence over any configuration or environment variable.

  1. from dapr.clients import DaprClient
  2. with DaprClient("https://mydomain:4443") as d:
  3. # use the client

Specifying an endpoint in an environment variable:

You can use the standardised DAPR_GRPC_ENDPOINT and/or DAPR_HTTP_ENDPOINT environment variables to specify the endpoint. When these environment variables are set, the client can be initialised without any arguments.

  1. export DAPR_GRPC_ENDPOINT="https://mydomain:50051"
  2. export DAPR_HTTP_ENDPOINT="https://mydomain:443"

The legacy environment variables DAPR_RUNTIME_HOST, DAPR_HTTP_PORT and DAPR_GRPC_PORT are also supported, but DAPR_GRPC_ENDPOINT and DAPR_HTTP_ENDPOINT take precedence.

Building blocks

The Python SDK allows you to interface with all of the Dapr building blocks.

Invoke a service

  1. from dapr.clients import DaprClient
  2. with DaprClient() as d:
  3. # invoke a method (gRPC or HTTP GET)
  4. resp = d.invoke_method('service-to-invoke', 'method-to-invoke', data='{"message":"Hello World"}')
  5. # for other HTTP verbs the verb must be specified
  6. # invoke a 'POST' method (HTTP only)
  7. resp = d.invoke_method('service-to-invoke', 'method-to-invoke', data='{"id":"100", "FirstName":"Value", "LastName":"Value"}', http_verb='post')

Save & get application state

  1. from dapr.clients import DaprClient
  2. with DaprClient() as d:
  3. # Save state
  4. d.save_state(store_name="statestore", key="key1", value="value1")
  5. # Get state
  6. data = d.get_state(store_name="statestore", key="key1").data
  7. # Delete state
  8. d.delete_state(store_name="statestore", key="key1")

Query application state (Alpha)

  1. from dapr.clients import DaprClient
  2. query = '''
  3. {
  4. "filter": {
  5. "EQ": { "state": "CA" }
  6. },
  7. "sort": [
  8. {
  9. "key": "person.id",
  10. "order": "DESC"
  11. }
  12. ]
  13. }
  14. '''
  15. with DaprClient() as d:
  16. resp = d.query_state(
  17. store_name='state_store',
  18. query=query,
  19. states_metadata={"metakey": "metavalue"}, # optional
  20. )

Publish & subscribe to messages

Publish messages

  1. from dapr.clients import DaprClient
  2. with DaprClient() as d:
  3. resp = d.publish_event(pubsub_name='pubsub', topic_name='TOPIC_A', data='{"message":"Hello World"}')

Subscribe to messages

  1. from cloudevents.sdk.event import v1
  2. from dapr.ext.grpc import App
  3. import json
  4. app = App()
  5. # Default subscription for a topic
  6. @app.subscribe(pubsub_name='pubsub', topic='TOPIC_A')
  7. def mytopic(event: v1.Event) -> None:
  8. data = json.loads(event.Data())
  9. print(f'Received: id={data["id"]}, message="{data ["message"]}"'
  10. f' content_type="{event.content_type}"',flush=True)
  11. # Specific handler using Pub/Sub routing
  12. @app.subscribe(pubsub_name='pubsub', topic='TOPIC_A',
  13. rule=Rule("event.type == \"important\"", 1))
  14. def mytopic_important(event: v1.Event) -> None:
  15. data = json.loads(event.Data())
  16. print(f'Received: id={data["id"]}, message="{data ["message"]}"'
  17. f' content_type="{event.content_type}"',flush=True)

Interact with output bindings

  1. from dapr.clients import DaprClient
  2. with DaprClient() as d:
  3. resp = d.invoke_binding(binding_name='kafkaBinding', operation='create', data='{"message":"Hello World"}')

Retrieve secrets

  1. from dapr.clients import DaprClient
  2. with DaprClient() as d:
  3. resp = d.get_secret(store_name='localsecretstore', key='secretKey')

Configuration

Get configuration

  1. from dapr.clients import DaprClient
  2. with DaprClient() as d:
  3. # Get Configuration
  4. configuration = d.get_configuration(store_name='configurationstore', keys=['orderId'], config_metadata={})

Subscribe to configuration

  1. import asyncio
  2. from time import sleep
  3. from dapr.clients import DaprClient
  4. async def executeConfiguration():
  5. with DaprClient() as d:
  6. storeName = 'configurationstore'
  7. key = 'orderId'
  8. # Wait for sidecar to be up within 20 seconds.
  9. d.wait(20)
  10. # Subscribe to configuration by key.
  11. configuration = await d.subscribe_configuration(store_name=storeName, keys=[key], config_metadata={})
  12. while True:
  13. if configuration != None:
  14. items = configuration.get_items()
  15. for key, item in items:
  16. print(f"Subscribe key={key} value={item.value} version={item.version}", flush=True)
  17. else:
  18. print("Nothing yet")
  19. sleep(5)
  20. asyncio.run(executeConfiguration())

Distributed Lock

  1. from dapr.clients import DaprClient
  2. def main():
  3. # Lock parameters
  4. store_name = 'lockstore' # as defined in components/lockstore.yaml
  5. resource_id = 'example-lock-resource'
  6. client_id = 'example-client-id'
  7. expiry_in_seconds = 60
  8. with DaprClient() as dapr:
  9. print('Will try to acquire a lock from lock store named [%s]' % store_name)
  10. print('The lock is for a resource named [%s]' % resource_id)
  11. print('The client identifier is [%s]' % client_id)
  12. print('The lock will expire in %s seconds.' % expiry_in_seconds)
  13. with dapr.try_lock(store_name, resource_id, client_id, expiry_in_seconds) as lock_result:
  14. assert lock_result.success, 'Failed to acquire the lock. Aborting.'
  15. print('Lock acquired successfully!!!')
  16. # At this point the lock was released - by magic of the `with` clause ;)
  17. unlock_result = dapr.unlock(store_name, resource_id, client_id)
  18. print('We already released the lock so unlocking will not work.')
  19. print('We tried to unlock it anyway and got back [%s]' % unlock_result.status)

Workflow

  1. from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
  2. from dapr.clients import DaprClient
  3. instanceId = "exampleInstanceID"
  4. workflowComponent = "dapr"
  5. workflowName = "hello_world_wf"
  6. eventName = "event1"
  7. eventData = "eventData"
  8. def main():
  9. with DaprClient() as d:
  10. host = settings.DAPR_RUNTIME_HOST
  11. port = settings.DAPR_GRPC_PORT
  12. workflowRuntime = WorkflowRuntime(host, port)
  13. workflowRuntime = WorkflowRuntime()
  14. workflowRuntime.register_workflow(hello_world_wf)
  15. workflowRuntime.register_activity(hello_act)
  16. workflowRuntime.start()
  17. # Start the workflow
  18. start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
  19. workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
  20. print(f"start_resp {start_resp.instance_id}")
  21. # ...
  22. # Pause Test
  23. d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  24. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  25. print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}")
  26. # Resume Test
  27. d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  28. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  29. print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}")
  30. sleep(1)
  31. # Raise event
  32. d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
  33. event_name=eventName, event_data=eventData)
  34. sleep(5)
  35. # Purge Test
  36. d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  37. try:
  38. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  39. except DaprInternalError as err:
  40. if nonExistentIDError in err._message:
  41. print("Instance Successfully Purged")
  42. # Kick off another workflow for termination purposes
  43. # This will also test using the same instance ID on a new workflow after
  44. # the old instance was purged
  45. start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
  46. workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
  47. print(f"start_resp {start_resp.instance_id}")
  48. # Terminate Test
  49. d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  50. sleep(1)
  51. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  52. print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}")
  53. # Purge Test
  54. d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  55. try:
  56. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  57. except DaprInternalError as err:
  58. if nonExistentIDError in err._message:
  59. print("Instance Successfully Purged")
  60. workflowRuntime.shutdown()

Python SDK examples