Getting started with the Dapr Python gRPC service extension

How to get up and running with the Dapr Python gRPC extension package

The Dapr Python SDK provides a built in gRPC server extension module, dapr.ext.grpc, for creating Dapr services.

Installation

You can download and install the Dapr gRPC server extension module with:

  1. pip install dapr-ext-grpc

Note

  1. The development package will contain features and behavior that will be compatible with the pre-release version of the Dapr runtime. Make sure to uninstall any stable versions of the Python SDK extension before installing the dapr-dev package.
  1. pip3 install dapr-ext-grpc-dev

Examples

The App object can be used to create a server.

Listen for service invocation requests

The InvokeMethodReqest and InvokeMethodResponse objects can be used to handle incoming requests.

A simple service that will listen and respond to requests will look like:

  1. from dapr.ext.grpc import App, InvokeMethodRequest, InvokeMethodResponse
  2. app = App()
  3. @app.method(name='my-method')
  4. def mymethod(request: InvokeMethodRequest) -> InvokeMethodResponse:
  5. print(request.metadata, flush=True)
  6. print(request.text(), flush=True)
  7. return InvokeMethodResponse(b'INVOKE_RECEIVED', "text/plain; charset=UTF-8")
  8. app.run(50051)

A full sample can be found here.

Subscribe to a topic

When subscribing to a topic, you can instruct dapr whether the event delivered has been accepted, or whether it should be dropped, or retried later.

  1. from typing import Optional
  2. from cloudevents.sdk.event import v1
  3. from dapr.ext.grpc import App
  4. from dapr.clients.grpc._response import TopicEventResponse
  5. app = App()
  6. # Default subscription for a topic
  7. @app.subscribe(pubsub_name='pubsub', topic='TOPIC_A')
  8. def mytopic(event: v1.Event) -> Optional[TopicEventResponse]:
  9. print(event.Data(),flush=True)
  10. # Returning None (or not doing a return explicitly) is equivalent
  11. # to returning a TopicEventResponse("success").
  12. # You can also return TopicEventResponse("retry") for dapr to log
  13. # the message and retry delivery later, or TopicEventResponse("drop")
  14. # for it to drop the message
  15. return TopicEventResponse("success")
  16. # Specific handler using Pub/Sub routing
  17. @app.subscribe(pubsub_name='pubsub', topic='TOPIC_A',
  18. rule=Rule("event.type == \"important\"", 1))
  19. def mytopic_important(event: v1.Event) -> None:
  20. print(event.Data(),flush=True)
  21. # Handler with disabled topic validation
  22. @app.subscribe(pubsub_name='pubsub-mqtt', topic='topic/#', disable_topic_validation=True,)
  23. def mytopic_wildcard(event: v1.Event) -> None:
  24. print(event.Data(),flush=True)
  25. app.run(50051)

A full sample can be found here.

Setup input binding trigger

  1. from dapr.ext.grpc import App, BindingRequest
  2. app = App()
  3. @app.binding('kafkaBinding')
  4. def binding(request: BindingRequest):
  5. print(request.text(), flush=True)
  6. app.run(50051)

A full sample can be found here.