Declarative and programmatic subscription methods
Learn more about the methods by which Dapr allows you to subscribe to topics.
Pub/sub API subscription methods
Dapr applications can subscribe to published topics via two methods that support the same features: declarative and programmatic.
| Subscription method | Description |
|---|---|
| Declarative | Subscription is defined in an external file. The declarative approach removes the Dapr dependency from your code and lets existing applications subscribe to topics without code changes. |
| Programmatic | Subscription is defined in the application code, so the application itself declares the topics and routes it handles. |
The examples below demonstrate pub/sub messaging between a checkout app and an orderprocessing app via the orders topic. The examples demonstrate the same Dapr pub/sub component used first declaratively, then programmatically.
Declarative subscriptions
You can subscribe declaratively to a topic using an external component file. This example uses a YAML component file named subscription.yaml:
    apiVersion: dapr.io/v1alpha1
    kind: Subscription
    metadata:
      name: order
    spec:
      topic: orders
      route: /checkout
      pubsubname: pubsub
    scopes:
    - orderprocessing
    - checkout
Here the subscription called order:
- Uses the pub/sub component called pubsub to subscribe to the topic called orders.
- Sets the route field to send all topic messages to the /checkout endpoint in the app.
- Sets the scopes field to scope this subscription for access only by apps with IDs orderprocessing and checkout.
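The three fields above can be read as a small lookup: which app IDs may use the subscription, and where its messages are delivered. The following is a minimal Python sketch of that reading, not Dapr's implementation. The `subscription` dict mirrors subscription.yaml, the helper names `is_in_scope` and `delivery_target` are hypothetical, and treating an empty scopes list as "all apps" is an assumption.

```python
# Illustrative model of subscription.yaml; this is not Dapr's own code.
subscription = {
    "metadata": {"name": "order"},
    "spec": {"topic": "orders", "route": "/checkout", "pubsubname": "pubsub"},
    "scopes": ["orderprocessing", "checkout"],
}


def is_in_scope(sub: dict, app_id: str) -> bool:
    """Return True if app_id may use this subscription.

    Assumption: an empty or missing scopes list means every app is in scope.
    """
    scopes = sub.get("scopes") or []
    return not scopes or app_id in scopes


def delivery_target(sub: dict) -> tuple:
    """Return (pubsub component, topic, app route) for the subscription."""
    spec = sub["spec"]
    return spec["pubsubname"], spec["topic"], spec["route"]


if __name__ == "__main__":
    for app_id in ("orderprocessing", "checkout", "myapp"):
        print(app_id, is_in_scope(subscription, app_id))
    print(delivery_target(subscription))
```

Under this reading, an app whose ID is not listed in scopes (for example, myapp) would not pick up the subscription, so the app ID passed to Dapr should be one of the scoped IDs.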
When running Dapr, set the YAML component file path to point Dapr to the component.
dapr run --app-id myapp --components-path ./myComponents -- dotnet run
dapr run --app-id myapp --components-path ./myComponents -- mvn spring-boot:run
dapr run --app-id myapp --components-path ./myComponents -- python3 app.py
dapr run --app-id myapp --components-path ./myComponents -- npm start
dapr run --app-id myapp --components-path ./myComponents -- go run app.go
In Kubernetes, apply the component to the cluster:
kubectl apply -f subscription.yaml
In your application code, subscribe to the topic specified in the Dapr pub/sub component.
    //Subscribe to a topic
    [Topic("pubsub", "orders")]
    [HttpPost("checkout")]
    public void getCheckout([FromBody] int orderId)
    {
        Console.WriteLine("Subscriber received : " + orderId);
    }

    //Subscribe to a topic
    @Topic(name = "orders", pubsubName = "pubsub")
    @PostMapping(path = "/checkout")
    public Mono<Void> getCheckout(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
        return Mono.fromRunnable(() -> {
            try {
                log.info("Subscriber received: " + cloudEvent.getData());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    #Subscribe to a topic
    @app.subscribe(pubsub_name='pubsub', topic='orders')
    def mytopic(event: v1.Event) -> None:
        data = json.loads(event.Data())
        logging.info('Subscriber received: ' + str(data))

    app.run(6002)

    //Subscribe to a topic
    await server.pubsub.subscribe("pubsub", "orders", async (orderId) => {
        console.log(`Subscriber received: ${JSON.stringify(orderId)}`)
    });
    await server.startServer();

    //Subscribe to a topic
    if err := s.AddTopicEventHandler(sub, eventHandler); err != nil {
        log.Fatalf("error adding topic subscription: %v", err)
    }
    if err := s.Start(); err != nil && err != http.ErrServerClosed {
        log.Fatalf("error listening: %v", err)
    }

    func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
        log.Printf("Subscriber received: %s", e.Data)
        return false, nil
    }
The /checkout endpoint matches the route defined in the subscription; this is where Dapr sends all topic messages.
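Whatever the language, the handler behind /checkout receives an HTTP POST whose body is a CloudEvent envelope, with the published payload under `data`. The following is a minimal sketch using only Python's standard library, assuming a JSON body and port 6002 (borrowed from the Python example above). The names `handle_checkout`, `CheckoutHandler`, and `serve` are hypothetical, and the `{"status": "SUCCESS"}` reply is the acknowledgement body described in the pub/sub API reference.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def handle_checkout(body: bytes) -> dict:
    """Parse a CloudEvent JSON body and acknowledge the message."""
    event = json.loads(body)
    # The published payload sits under "data" in the CloudEvent envelope.
    print(f"Subscriber received: {event.get('data')}")
    return {"status": "SUCCESS"}


class CheckoutHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Only the route named in the subscription is handled here.
        if self.path != "/checkout":
            self.send_error(404)
            return
        length = int(self.headers.get("Content-Length", 0))
        reply = json.dumps(handle_checkout(self.rfile.read(length))).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(reply)))
        self.end_headers()
        self.wfile.write(reply)


def serve(port: int = 6002) -> None:
    """Start the listener; the port is an assumption and must match the app port given to Dapr."""
    HTTPServer(("127.0.0.1", port), CheckoutHandler).serve_forever()
```

Calling `serve()` starts the listener. Keeping the parsing in `handle_checkout` separates the message logic from the HTTP plumbing, so it can be exercised without a running sidecar.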
Programmatic subscriptions
With the programmatic approach, the application code returns the routes as a JSON structure, rather than declaring the route in a YAML file. In the examples below, the subscription values that the declarative approach places in YAML are defined within the application code.
    [Topic("pubsub", "orders")]
    [HttpPost("/checkout")]
    public async Task<ActionResult<Order>> Checkout(Order order, [FromServices] DaprClient daprClient)
    {
        // Logic
        return order;
    }

or

    // Dapr subscription in [Topic] routes orders topic to this route
    app.MapPost("/checkout", [Topic("pubsub", "orders")] (Order order) => {
        Console.WriteLine("Subscriber received : " + order);
        return Results.Ok(order);
    });

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Topic(name = "checkout", pubsubName = "pubsub")
    @PostMapping(path = "/orders")
    public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
        return Mono.fromRunnable(() -> {
            try {
                System.out.println("Subscriber received: " + cloudEvent.getData());
                System.out.println("Subscriber received: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    @app.route('/dapr/subscribe', methods=['GET'])
    def subscribe():
        subscriptions = [
            {
                'pubsubname': 'pubsub',
                'topic': 'checkout',
                'routes': {
                    'rules': [
                        {
                            'match': 'event.type == "order"',
                            'path': '/orders'
                        },
                    ],
                    'default': '/orders'
                }
            }
        ]
        return jsonify(subscriptions)

    @app.route('/orders', methods=['POST'])
    def ds_subscriber():
        print(request.json, flush=True)
        return json.dumps({'success': True}), 200, {'Content-Type': 'application/json'}

    app.run()

    const express = require('express')
    const bodyParser = require('body-parser')
    const app = express()
    app.use(bodyParser.json({ type: 'application/*+json' }));

    const port = 3000

    app.get('/dapr/subscribe', (req, res) => {
        res.json([
            {
                pubsubname: "pubsub",
                topic: "checkout",
                routes: {
                    rules: [
                        {
                            match: 'event.type == "order"',
                            path: '/orders'
                        },
                    ],
                    // Fall back to the only route this app handles
                    default: '/orders'
                }
            }
        ]);
    })

    app.post('/orders', (req, res) => {
        console.log(req.body);
        res.sendStatus(200);
    });

    app.listen(port, () => console.log(`consumer app listening on port ${port}!`))

    package main

    import (
        "encoding/json"
        "fmt"
        "log"
        "net/http"

        "github.com/gorilla/mux"
    )

    const appPort = 3000

    type subscription struct {
        PubsubName string            `json:"pubsubname"`
        Topic      string            `json:"topic"`
        Metadata   map[string]string `json:"metadata,omitempty"`
        Routes     routes            `json:"routes"`
    }

    type routes struct {
        Rules   []rule `json:"rules,omitempty"`
        Default string `json:"default,omitempty"`
    }

    type rule struct {
        Match string `json:"match"`
        Path  string `json:"path"`
    }

    // This handles /dapr/subscribe
    func configureSubscribeHandler(w http.ResponseWriter, _ *http.Request) {
        t := []subscription{
            {
                PubsubName: "pubsub",
                Topic:      "checkout",
                Routes: routes{
                    Rules: []rule{
                        {
                            Match: `event.type == "order"`,
                            Path:  "/orders",
                        },
                    },
                    Default: "/orders",
                },
            },
        }

        w.WriteHeader(http.StatusOK)
        json.NewEncoder(w).Encode(t)
    }

    func main() {
        router := mux.NewRouter().StrictSlash(true)
        router.HandleFunc("/dapr/subscribe", configureSubscribeHandler).Methods("GET")
        log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", appPort), router))
    }
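Several of the examples above use their own topic and path values. To tie the programmatic form back to subscription.yaml, the sketch below builds a /dapr/subscribe response with the same values as the declarative example (component pubsub, topic orders, route /checkout). It uses only Python's standard library; `build_subscriptions`, `SubscribeHandler`, and `serve` are hypothetical names, port 3000 is borrowed from the examples above, and using a `routes` block with only a `default` path is an assumption based on the structures shown in those examples.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def build_subscriptions() -> list:
    """Return a subscription list that mirrors subscription.yaml."""
    return [
        {
            "pubsubname": "pubsub",
            "topic": "orders",
            # No rules: every message on the topic goes to the default path.
            "routes": {"default": "/checkout"},
        }
    ]


class SubscribeHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # The subscription list is served from this well-known path.
        if self.path != "/dapr/subscribe":
            self.send_error(404)
            return
        body = json.dumps(build_subscriptions()).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)


def serve(port: int = 3000) -> None:
    """Start the listener; the port is an assumption taken from the examples above."""
    HTTPServer(("127.0.0.1", port), SubscribeHandler).serve_forever()
```

A complete app would also handle POST requests on /checkout. The scopes list from subscription.yaml is left out because none of the programmatic examples above include it.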
Next Steps
- Try out the pub/sub Quickstart
- Follow: How-To: Configure pub/sub components with multiple namespaces
- Learn more about declarative and programmatic subscription methods.
- Learn about topic scoping
- Learn about message TTL
- Learn more about pub/sub with and without CloudEvent
- List of pub/sub components
- Read the pub/sub API reference
Last modified June 24, 2022: [Pub/sub subscriptions] Fix C# example (#2545) (b00ce037)