How-To: Route messages to different event handlers

Learn how to route messages from a topic to different event handlers based on CloudEvent fields

Preview feature

Pub/Sub message routing is currently in preview.

Introduction

Content-based routing is a messaging pattern that utilizes a DSL instead of imperative application code. Pub/Sub routing is an implementation of this pattern that lets developers use expressions to route CloudEvents, based on their contents, to different URIs/paths and event handlers in an application. If no route matches, an optional default route is used. This becomes useful as your application expands to support multiple event versions or special cases. Routing can be implemented in code; however, keeping routing rules external to the application can improve portability.
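
To make the pattern concrete, here is a minimal sketch of rule-based routing in plain Python. It is not how Dapr implements routing: it uses Python predicates in place of expressions, and it assumes rules are tried in order with the first match winning. The function name select_path and the event type "rebels.retreating" are hypothetical.

```python
from typing import Callable, List, Optional, Tuple

# A CloudEvent is modelled as a plain dict; a rule pairs a predicate with a path.
Rule = Tuple[Callable[[dict], bool], str]


def select_path(event: dict, rules: List[Rule], default: Optional[str] = None) -> Optional[str]:
    """Return the path of the first matching rule, or the default if none match.

    Assumption: rules are evaluated in order and the first match wins.
    """
    for matches, path in rules:
        if matches(event):
            return path
    return default


rules = [
    (lambda e: e.get("type") == "rebels.attacking.v3", "/dsstatus.v3"),
    (lambda e: e.get("type") == "rebels.attacking.v2", "/dsstatus.v2"),
]

print(select_path({"type": "rebels.attacking.v2"}, rules, "/dsstatus"))  # /dsstatus.v2
print(select_path({"type": "rebels.retreating"}, rules, "/dsstatus"))    # /dsstatus
```

Keeping the rules in a data structure rather than in if/else branches is what allows them to live outside the application code.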

This feature is available to both the declarative and programmatic subscription approaches.

Enable message routing

This is a preview feature. To enable it, add the PubSub.Routing feature entry to your application configuration like so:

    apiVersion: dapr.io/v1alpha1
    kind: Configuration
    metadata:
      name: pubsubroutingconfig
    spec:
      features:
        - name: PubSub.Routing
          enabled: true

Learn more about enabling preview features.

Declarative subscription

For declarative subscriptions, you must use dapr.io/v2alpha1 as the apiVersion. Here is an example of subscriptions.yaml using routing.

    apiVersion: dapr.io/v2alpha1
    kind: Subscription
    metadata:
      name: myevent-subscription
    spec:
      pubsubname: pubsub
      topic: deathStarStatus
      routes:
        rules:
          - match: event.type == "rebels.attacking.v3"
            path: /dsstatus.v3
          - match: event.type == "rebels.attacking.v2"
            path: /dsstatus.v2
        default: /dsstatus
    scopes:
      - app1
      - app2

Programmatic subscription

Alternatively, the programmatic approach varies slightly in that the subscription returned to Dapr contains the routes structure instead of route. The JSON structure matches the declarative YAML.

Python:

    import flask
    from flask import request, jsonify
    from flask_cors import CORS
    import json

    app = flask.Flask(__name__)
    CORS(app)

    # Dapr calls this endpoint to discover the app's subscriptions.
    @app.route('/dapr/subscribe', methods=['GET'])
    def subscribe():
        subscriptions = [
            {
                'pubsubname': 'pubsub',
                'topic': 'deathStarStatus',
                'routes': {
                    'rules': [
                        {
                            'match': 'event.type == "rebels.attacking.v3"',
                            'path': '/dsstatus.v3'
                        },
                        {
                            'match': 'event.type == "rebels.attacking.v2"',
                            'path': '/dsstatus.v2'
                        },
                    ],
                    'default': '/dsstatus'
                }
            }]
        return jsonify(subscriptions)

    # Handler for the default route.
    @app.route('/dsstatus', methods=['POST'])
    def ds_subscriber():
        print(request.json, flush=True)
        return json.dumps({'success': True}), 200, {'Content-Type': 'application/json'}

    app.run()

JavaScript:

    const express = require('express')
    const bodyParser = require('body-parser')

    const app = express()
    app.use(bodyParser.json({ type: 'application/*+json' }));

    const port = 3000

    app.get('/dapr/subscribe', (req, res) => {
      res.json([
        {
          pubsubname: "pubsub",
          topic: "deathStarStatus",
          routes: {
            rules: [
              {
                match: 'event.type == "rebels.attacking.v3"',
                path: '/dsstatus.v3'
              },
              {
                match: 'event.type == "rebels.attacking.v2"',
                path: '/dsstatus.v2'
              },
            ],
            default: '/dsstatus'
          }
        }
      ]);
    })

    app.post('/dsstatus', (req, res) => {
      console.log(req.body);
      res.sendStatus(200);
    });

    app.listen(port, () => console.log(`consumer app listening on port ${port}!`))

Go:

    package main

    import (
        "encoding/json"
        "fmt"
        "log"
        "net/http"

        "github.com/gorilla/mux"
    )

    const appPort = 3000

    type subscription struct {
        PubsubName string            `json:"pubsubname"`
        Topic      string            `json:"topic"`
        Metadata   map[string]string `json:"metadata,omitempty"`
        Routes     routes            `json:"routes"`
    }

    type routes struct {
        Rules   []rule `json:"rules,omitempty"`
        Default string `json:"default,omitempty"`
    }

    type rule struct {
        Match string `json:"match"`
        Path  string `json:"path"`
    }

    // This handles /dapr/subscribe
    func configureSubscribeHandler(w http.ResponseWriter, _ *http.Request) {
        t := []subscription{
            {
                PubsubName: "pubsub",
                Topic:      "deathStarStatus",
                Routes: routes{
                    Rules: []rule{
                        {
                            Match: `event.type == "rebels.attacking.v3"`,
                            Path:  "/dsstatus.v3",
                        },
                        {
                            Match: `event.type == "rebels.attacking.v2"`,
                            Path:  "/dsstatus.v2",
                        },
                    },
                    Default: "/dsstatus",
                },
            },
        }

        w.WriteHeader(http.StatusOK)
        json.NewEncoder(w).Encode(t)
    }

    func main() {
        router := mux.NewRouter().StrictSlash(true)
        router.HandleFunc("/dapr/subscribe", configureSubscribeHandler).Methods("GET")
        log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", appPort), router))
    }

PHP:

    <?php

    require_once __DIR__.'/vendor/autoload.php';

    $app = \Dapr\App::create(configure: fn(\DI\ContainerBuilder $builder) => $builder->addDefinitions(['dapr.subscriptions' => [
        // Assumption: routes is passed as an array mirroring the JSON structure in the other examples.
        new \Dapr\PubSub\Subscription(pubsubname: 'pubsub', topic: 'deathStarStatus', routes: [
            'rules' => [
                ['match' => 'event.type == "rebels.attacking.v3"', 'path' => '/dsstatus.v3'],
                ['match' => 'event.type == "rebels.attacking.v2"', 'path' => '/dsstatus.v2'],
            ],
            'default' => '/dsstatus',
        ]),
    ]]));

    // Handler for the default route.
    $app->post('/dsstatus', function(
        #[\Dapr\Attributes\FromBody]
        \Dapr\PubSub\CloudEvent $cloudEvent,
        \Psr\Log\LoggerInterface $logger
        ) {
            $logger->alert('Received event: {event}', ['event' => $cloudEvent]);
            return ['status' => 'SUCCESS'];
        }
    );

    $app->start();
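
The examples above register a handler for the default path only. As a rough sketch of handling each routed path separately, the following uses only the Python standard library rather than Flask, and leaves out the /dapr/subscribe endpoint. The handler names, the make_server helper, and port 5000 are hypothetical choices for illustration.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def handle_v3(event: dict) -> str:
    print("v3 handler received:", event, flush=True)
    return "v3"


def handle_v2(event: dict) -> str:
    print("v2 handler received:", event, flush=True)
    return "v2"


def handle_default(event: dict) -> str:
    print("default handler received:", event, flush=True)
    return "default"


# One entry per path named in the subscription's routes.
HANDLERS = {
    "/dsstatus.v3": handle_v3,
    "/dsstatus.v2": handle_v2,
    "/dsstatus": handle_default,
}


class EventRequestHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        handler = HANDLERS.get(self.path)
        if handler is None:
            self.send_error(404, "no handler for this path")
            return
        # Read and decode the JSON body of the delivered event.
        length = int(self.headers.get("Content-Length", 0))
        event = json.loads(self.rfile.read(length) or b"{}")
        handler(event)
        body = json.dumps({"status": "SUCCESS"}).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)


def make_server(port: int = 5000) -> HTTPServer:
    """Build (but do not start) the server; the port is an arbitrary choice."""
    return HTTPServer(("127.0.0.1", port), EventRequestHandler)
```

To try it, call make_server().serve_forever() and POST a JSON event to one of the three paths.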

In these examples, depending on the type of the event (event.type), the application is called on /dsstatus.v3, /dsstatus.v2, or /dsstatus. The expressions are written in Common Expression Language (CEL), where event represents the CloudEvent. Any of the attributes from the CloudEvents core specification can be referenced in the expression. One caveat is that attributes inside event.data can only be accessed if the data is nested JSON.
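
The caveat about event.data can be illustrated with two event payloads. This is only a sketch in plain Python, not a CEL evaluation: the helper data_fields_reachable and the sector field are hypothetical, and the check simply distinguishes data that is a nested JSON object from data carried as a string.

```python
import json


def data_fields_reachable(event: dict) -> bool:
    """True when event['data'] is a nested JSON object, so its fields can be addressed."""
    return isinstance(event.get("data"), dict)


# data is nested JSON: an expression could refer to a field such as event.data.sector.
nested = {
    "type": "rebels.attacking.v3",
    "datacontenttype": "application/json",
    "data": {"sector": "hoth"},
}

# data is a string that happens to contain JSON: its fields are not addressable.
opaque = {
    "type": "rebels.attacking.v3",
    "datacontenttype": "text/plain",
    "data": json.dumps({"sector": "hoth"}),
}

print(data_fields_reachable(nested))  # True
print(data_fields_reachable(opaque))  # False
```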

Last modified September 20, 2021 : Merge pull request #1800 from greenie-msft/gRPC_proxying_video (36dff3c)