Outbox Event Router

Example

To understand the configuration options and terms used by this SMT, let's look at its parts, starting from an expected outbox message:

    # Kafka Topic: outbox.event.order
    # Kafka Message key: "1"
    # Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
    # Kafka Message Timestamp: 1556890294484
    {
      "eventType": "OrderCreated",
      "payload": "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
    }

This message is generated by transforming a raw Debezium change event, which looks like this:

    # Kafka Message key: "406c07f3-26f0-4eea-a50c-109940064b8f"
    # Kafka Message Headers: ""
    # Kafka Message Timestamp: 1556890294484
    {
      "before": null,
      "after": {
        "id": "406c07f3-26f0-4eea-a50c-109940064b8f",
        "aggregateid": "1",
        "aggregatetype": "Order",
        "payload": "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}",
        "timestamp": 1556890294344,
        "type": "OrderCreated"
      },
      "source": {
        "version": "0.9.3.Final",
        "connector": "postgresql",
        "name": "dbserver1-bare",
        "db": "orderdb",
        "ts_usec": 1556890294448870,
        "txId": 584,
        "lsn": 24064704,
        "schema": "inventory",
        "table": "outboxevent",
        "snapshot": false,
        "last_snapshot_record": null,
        "xmin": null
      },
      "op": "c",
      "ts_ms": 1556890294484
    }

This result was achieved with the default configuration, which assumes a particular table structure and event routing based on aggregates. If you want custom behavior, the SMT is fully configurable; see the available configuration options below.

Configuration

Configuration options

Property: table.field.event.id
Default: id
Group: Table
Description: The column which contains the event ID within the outbox table.

Property: table.field.event.key
Default: aggregateid
Group: Table
Description: The column which contains the event key within the outbox table; when set, this column's value is used as the Kafka message key.

Property: table.field.event.type
Default: type
Group: Table
Description: The column which contains the event type within the outbox table.

Property: table.field.event.timestamp
Default: (none)
Group: Table
Description: Optionally overrides the Kafka message timestamp with the value from the chosen column; otherwise the timestamp at which Debezium processed the event is used.

Property: table.field.event.payload
Default: payload
Group: Table
Description: The column which contains the event payload within the outbox table.

Property: table.field.event.payload.id
Default: aggregateid
Group: Table
Description: The column which contains the payload ID within the outbox table.

Property: table.fields.additional.placement
Default: (none)
Group: Table, Envelope
Description: Extra fields can be added to the event envelope or as message headers; the format is a comma-separated list of colon-delimited pairs, or triples when an alias is desired, e.g. id:header,field_name:envelope:alias. Placement options are header and envelope. The eventType field can also be configured among the additional fields; in that case it can be placed either in the envelope or among the other headers.

Property: table.field.event.schema.version
Default: (none)
Group: Table, Schema
Description: When set, it is used as the schema version, as described in the Kafka Connect Schema javadoc.

Property: route.by.field
Default: aggregatetype
Group: Router
Description: The column which determines how the events are routed; its value becomes part of the topic name.

Property: route.topic.regex
Default: (?<routedByValue>.*)
Group: Router
Description: The regex applied within the RegexRouter; the default capture group allows the routed column's value to be substituted into the topic name defined by route.topic.replacement.

Property: route.topic.replacement
Default: outbox.event.${routedByValue}
Group: Router
Description: The name of the topic to which the events are routed; the ${routedByValue} placeholder expands to the value of the column configured via route.by.field.

Property: route.tombstone.on.empty.payload
Default: false
Group: Router
Description: Whether an empty or null payload should cause a tombstone event.

Property: debezium.op.invalid.behavior
Default: warn
Group: Debezium
Description: While Debezium is monitoring the outbox table, it does not expect to see 'update' row events; if one occurs, this transform can log a warning or an error, or stop processing. Options are warn, error, and fatal.
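
As an illustration, a configuration combining several of these options might look as follows; the option names are the ones documented above, while the category column and the topic pattern are hypothetical choices made for this example:

    transforms=outbox,...
    transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
    # route by a hypothetical "category" column instead of the default "aggregatetype"
    transforms.outbox.route.by.field=category
    # expand the routed value into a custom topic name
    transforms.outbox.route.topic.replacement=app.events.${routedByValue}
    # emit a tombstone when the payload column is empty or null
    transforms.outbox.route.tombstone.on.empty.payload=true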

Default table columns

    Column        | Type                   | Modifiers
    --------------+------------------------+-----------
    id            | uuid                   | not null
    aggregatetype | character varying(255) | not null
    aggregateid   | character varying(255) | not null
    type          | character varying(255) | not null
    payload       | jsonb                  |
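
For reference, a minimal DDL sketch that yields this structure could look as follows (assuming PostgreSQL; the table name matches the outboxevent table seen in the raw message above):

    -- outbox table matching the default column expectations of the SMT
    CREATE TABLE outboxevent (
        id            uuid                   NOT NULL,
        aggregatetype character varying(255) NOT NULL,
        aggregateid   character varying(255) NOT NULL,
        type          character varying(255) NOT NULL,
        payload       jsonb
    );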

Default configuration values explained

Having seen all these pieces, we can now look at what the default configuration does:

Column: id
Effect: Shows up as a header in the Kafka message; this is the unique ID of the event and can be used, for instance, for consumer-side deduplication.

Column: aggregatetype
Effect: The default field for routing; it is appended to the topic name (see the route.topic.replacement configuration option).

Column: aggregateid
Effect: Becomes the Kafka message key, which is important for preserving ordering within Kafka partitions.

Column: type
Effect: Becomes either part of the message envelope as eventType or a message header. See the table.fields.additional.placement option for more details.

Column: payload
Effect: The JSON representation of the event itself; it becomes either part of the message envelope as payload or, if other metadata including eventType are delivered as headers, the message value itself, without encapsulation in an envelope.
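
For example, an illustrative setting that delivers the event type as a header and additionally places a hypothetical eventVersion column into the envelope under the alias version could look like this:

    transforms.outbox.table.fields.additional.placement=type:header:eventType,eventVersion:envelope:version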

Basic configuration

    transforms=outbox,...
    transforms.outbox.type=io.debezium.transforms.outbox.EventRouter

Using Avro as the payload format

The outbox routing SMT supports arbitrary payload formats, as the payload column value is passed on transparently. As an alternative to working with JSON as shown above, it is therefore also possible to use Avro. This can be beneficial for message format governance and for making sure outbox event schemas evolve in a backwards-compatible way.

How a source application produces Avro messages as an outbox event payload is outside the scope of this documentation. One possibility is to leverage the KafkaAvroSerializer class to serialize GenericRecord instances. To ensure that the Kafka message value is the exact Avro binary data, apply the following configuration to the connector:

    transforms=outbox,...
    transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
    transforms.outbox.table.fields.additional.placement=type:header:eventType
    value.converter=io.debezium.converters.ByteBufferConverter

This moves the eventType value into a Kafka message header, leaving only the payload column value (the Avro data) as the sole message value. Using ByteBufferConverter as the value converter will propagate that value as-is into the Kafka message value.