开始使用 Dapr 客户端 Python SDK

如何使用 Dapr Python SDK 启动和运行

Dapr 客户端包允许您从 Python 应用程序中与其他 Dapr 应用程序进行交互。

注意

如果你还没有,请尝试使用一个快速入门快速了解如何使用 Dapr Python SDK 与 API 构建块。

前期准备

在开始之前,请安装 Dapr Python 包

导入客户端包

dapr 包包含 DaprClient,该工具包将用于创建和使用客户端。

  1. from dapr.clients import DaprClient

初始化客户端

您可以以多种方式初始化 Dapr 客户端:

默认值:

当您在不带任何参数的情况下初始化客户端时,它将使用默认值作为 Dapr sidecar 实例(127.0.0.1:50001)。

  1. from dapr.clients import DaprClient
  2. with DaprClient() as d:
  3. # use the client

在初始化时指定一个端点:

当作为构造函数的参数传递时,gRPC端点优先于任何配置或环境变量。

  1. from dapr.clients import DaprClient
  2. with DaprClient("mydomain:50051?tls=true") as d:
  3. # use the client

环境变量:

Dapr Sidecar 终端点

您可以使用标准化的DAPR_GRPC_ENDPOINT环境变量来指定gRPC端点。 当设置了这个变量时,客户端可以在没有任何参数的情况下进行初始化:

  1. export DAPR_GRPC_ENDPOINT="mydomain:50051?tls=true"
  1. from dapr.clients import DaprClient
  2. with DaprClient() as d:
  3. # the client will use the endpoint specified in the environment variables

遗留的环境变量 DAPR_RUNTIME_HOSTDAPR_HTTP_PORTDAPR_GRPC_PORT 也被支持,但是 DAPR_GRPC_ENDPOINT 优先级更高。

Dapr API 令牌

如果您的 Dapr 实例配置需要 DAPR_API_TOKEN 环境变量,您可以在环境中设置它,客户端将自动使用它。
您可以在此处阅读有关 Dapr API 令牌身份验证的更多信息(链接)。

健康超时

在客户端初始化时,会对Dapr sidecar(/healthz/outboud)进行健康检查。 客户端将在 sidecar 启动并运行后继续进行。

默认超时时间为60秒,但可以通过设置DAPR_HEALTH_TIMEOUT环境变量来覆盖。

错误处理

最初,Dapr中的错误遵循了标准的gRPC错误模型。 然而,为了提供更详细和信息丰富的错误消息,在版本1.13中引入了一个增强的错误模型,与gRPC的更丰富的错误模型保持一致。 作为回应,Python SDK 实现了 DaprGrpcError,一个专门设计用于改善开发者体验的自定义异常类。
需要注意的是,将所有gRPC状态异常转换为DaprGrpcError仍在进行中。 目前,SDK中并非每个API调用都已更新以利用此自定义异常。 我们正在积极推进这项改进,并欢迎社区的贡献。

使用 Dapr python-SDK 时处理 DaprGrpcError 异常的示例:

  1. try:
  2. d.save_state(store_name=storeName, key=key, value=value)
  3. except DaprGrpcError as err:
  4. print(f'Status code: {err.code()}')
  5. print(f"Message: {err.message()}")
  6. print(f"Error code: {err.error_code()}")
  7. print(f"Error info(reason): {err.error_info.reason}")
  8. print(f"Resource info (resource type): {err.resource_info.resource_type}")
  9. print(f"Resource info (resource name): {err.resource_info.resource_name}")
  10. print(f"Bad request (field): {err.bad_request.field_violations[0].field}")
  11. print(f"Bad request (description): {err.bad_request.field_violations[0].description}")

构建块

Python SDK 允许您与所有的Dapr构建块}进行接口交互。

调用服务

Dapr Python SDK 提供了一个简单的 API,用于通过 HTTP 或 gRPC(已弃用)调用服务。 可以通过设置 DAPR_API_METHOD_INVOCATION_PROTOCOL 环境变量来选择协议,默认情况下未设置时为HTTP。 在 Dapr 中,GRPC 服务调用已被弃用,推荐使用 GRPC 代理作为替代方案。

  1. from dapr.clients import DaprClient
  2. with DaprClient() as d:
  3. # invoke a method (gRPC or HTTP GET)
  4. resp = d.invoke_method('service-to-invoke', 'method-to-invoke', data='{"message":"Hello World"}')
  5. # for other HTTP verbs the verb must be specified
  6. # invoke a 'POST' method (HTTP only)
  7. resp = d.invoke_method('service-to-invoke', 'method-to-invoke', data='{"id":"100", "FirstName":"Value", "LastName":"Value"}', http_verb='post')

HTTP api调用的基本终结点在DAPR_HTTP_ENDPOINT环境变量中指定。 如果未设置此变量,则端点值将从DAPR_RUNTIME_HOSTDAPR_HTTP_PORT变量中派生,其默认值分别为127.0.0.13500

gRPC调用的基本终端点是用于客户端初始化的终端点(上面解释了)。

保存和获取应用程序状态

  1. from dapr.clients import DaprClient
  2. with DaprClient() as d:
  3. # Save state
  4. d.save_state(store_name="statestore", key="key1", value="value1")
  5. # Get state
  6. data = d.get_state(store_name="statestore", key="key1").data
  7. # Delete state
  8. d.delete_state(store_name="statestore", key="key1")

查询应用程序状态(Alpha)

  1. from dapr import DaprClient
  2. query = '''
  3. {
  4. "filter": {
  5. "EQ": { "state": "CA" }
  6. },
  7. "sort": [
  8. {
  9. "key": "person.id",
  10. "order": "DESC"
  11. }
  12. ]
  13. }
  14. '''
  15. with DaprClient() as d:
  16. resp = d.query_state(
  17. store_name='state_store',
  18. query=query,
  19. states_metadata={"metakey": "metavalue"}, # optional
  20. )

发布和订阅消息

发布消息

  1. from dapr.clients import DaprClient
  2. with DaprClient() as d:
  3. resp = d.publish_event(pubsub_name='pubsub', topic_name='TOPIC_A', data='{"message":"Hello World"}')

订阅消息

  1. from cloudevents.sdk.event import v1
  2. from dapr.ext.grpc import App
  3. import json
  4. app = App()
  5. # Default subscription for a topic
  6. @app.subscribe(pubsub_name='pubsub', topic='TOPIC_A')
  7. def mytopic(event: v1.Event) -> None:
  8. data = json.loads(event.Data())
  9. print(f'Received: id={data["id"]}, message="{data ["message"]}"'
  10. ' content_type="{event.content_type}"',flush=True)
  11. # Specific handler using Pub/Sub routing
  12. @app.subscribe(pubsub_name='pubsub', topic='TOPIC_A',
  13. rule=Rule("event.type == \"important\"", 1))
  14. def mytopic_important(event: v1.Event) -> None:
  15. data = json.loads(event.Data())
  16. print(f'Received: id={data["id"]}, message="{data ["message"]}"'
  17. ' content_type="{event.content_type}"',flush=True)

与输出绑定交互

  1. from dapr.clients import DaprClient
  2. with DaprClient() as d:
  3. resp = d.invoke_binding(binding_name='kafkaBinding', operation='create', data='{"message":"Hello World"}')

检索密钥

  1. from dapr.clients import DaprClient
  2. with DaprClient() as d:
  3. resp = d.get_secret(store_name='localsecretstore', key='secretKey')

Configuration

获取配置

  1. from dapr.clients import DaprClient
  2. with DaprClient() as d:
  3. # Get Configuration
  4. configuration = d.get_configuration(store_name='configurationstore', keys=['orderId'], config_metadata={})

订阅配置

  1. import asyncio
  2. from time import sleep
  3. from dapr.clients import DaprClient
  4. async def executeConfiguration():
  5. with DaprClient() as d:
  6. storeName = 'configurationstore'
  7. key = 'orderId'
  8. # Wait for sidecar to be up within 20 seconds.
  9. d.wait(20)
  10. # Subscribe to configuration by key.
  11. configuration = await d.subscribe_configuration(store_name=storeName, keys=[key], config_metadata={})
  12. while True:
  13. if configuration != None:
  14. items = configuration.get_items()
  15. for key, item in items:
  16. print(f"Subscribe key={key} value={item.value} version={item.version}", flush=True)
  17. else:
  18. print("Nothing yet")
  19. sleep(5)
  20. asyncio.run(executeConfiguration())

分布式锁

  1. from dapr.clients import DaprClient
  2. def main():
  3. # Lock parameters
  4. store_name = 'lockstore' # as defined in components/lockstore.yaml
  5. resource_id = 'example-lock-resource'
  6. client_id = 'example-client-id'
  7. expiry_in_seconds = 60
  8. with DaprClient() as dapr:
  9. print('Will try to acquire a lock from lock store named [%s]' % store_name)
  10. print('The lock is for a resource named [%s]' % resource_id)
  11. print('The client identifier is [%s]' % client_id)
  12. print('The lock will will expire in %s seconds.' % expiry_in_seconds)
  13. with dapr.try_lock(store_name, resource_id, client_id, expiry_in_seconds) as lock_result:
  14. assert lock_result.success, 'Failed to acquire the lock. Aborting.'
  15. print('Lock acquired successfully!!!')
  16. # At this point the lock was released - by magic of the `with` clause ;)
  17. unlock_result = dapr.unlock(store_name, resource_id, client_id)
  18. print('We already released the lock so unlocking will not work.')
  19. print('We tried to unlock it anyway and got back [%s]' % unlock_result.status)

Workflow

  1. from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
  2. from dapr.clients import DaprClient
  3. instanceId = "exampleInstanceID"
  4. workflowComponent = "dapr"
  5. workflowName = "hello_world_wf"
  6. eventName = "event1"
  7. eventData = "eventData"
  8. def main():
  9. with DaprClient() as d:
  10. host = settings.DAPR_RUNTIME_HOST
  11. port = settings.DAPR_GRPC_PORT
  12. workflowRuntime = WorkflowRuntime(host, port)
  13. workflowRuntime = WorkflowRuntime()
  14. workflowRuntime.register_workflow(hello_world_wf)
  15. workflowRuntime.register_activity(hello_act)
  16. workflowRuntime.start()
  17. # Start the workflow
  18. start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
  19. workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
  20. print(f"start_resp {start_resp.instance_id}")
  21. # ...
  22. # Pause Test
  23. d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  24. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  25. print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}")
  26. # Resume Test
  27. d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  28. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  29. print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}")
  30. sleep(1)
  31. # Raise event
  32. d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
  33. event_name=eventName, event_data=eventData)
  34. sleep(5)
  35. # Purge Test
  36. d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  37. try:
  38. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  39. except DaprInternalError as err:
  40. if nonExistentIDError in err._message:
  41. print("Instance Successfully Purged")
  42. # Kick off another workflow for termination purposes
  43. # This will also test using the same instance ID on a new workflow after
  44. # the old instance was purged
  45. start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
  46. workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
  47. print(f"start_resp {start_resp.instance_id}")
  48. # Terminate Test
  49. d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  50. sleep(1)
  51. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  52. print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}")
  53. # Purge Test
  54. d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  55. try:
  56. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  57. except DaprInternalError as err:
  58. if nonExistentIDError in err._message:
  59. print("Instance Successfully Purged")
  60. workflowRuntime.shutdown()

相关链接

Python SDK示例