Debezium Connector for MongoDB

Overview

MongoDB’s replication mechanism provides redundancy and high availability, and is the preferred way to run MongoDB in production. The MongoDB connector captures the changes in a replica set or sharded cluster.

A MongoDB replica set consists of a set of servers that all have copies of the same data, and replication ensures that all changes made by clients to documents on the replica set’s primary are correctly applied to the replica set’s other servers, called secondaries. MongoDB replication works by having the primary record the changes in its oplog (or operation log); each of the secondaries then reads the primary’s oplog and applies all of the operations, in order, to its own documents. When a new server is added to a replica set, that server first performs an initial sync of all of the databases and collections on the primary, and then reads the primary’s oplog to apply all changes that might have been made since it began the initial sync. This new server becomes a secondary (and able to handle queries) when it catches up to the tail of the primary’s oplog.

The MongoDB connector uses this same replication mechanism, though it does not actually become a member of the replica set. Just like MongoDB secondaries, however, the connector always reads the oplog of the replica set’s primary. When the connector sees a replica set for the first time, it looks at the oplog to get the last recorded transaction and then performs an initial sync of the primary’s databases and collections. When all the data is copied, the connector starts reading the oplog from the position it read earlier. Operations in the MongoDB oplog are idempotent, so no matter how many times the operations are applied, they result in the same end state.

As the MongoDB connector processes the oplog, it periodically records the position in the oplog where the event originated. When the MongoDB connector stops, it records the last oplog position that it processed, so that upon restart it simply begins reading the oplog from that position. In other words, the connector can be stopped, upgraded or maintained, and restarted some time later, and it will pick up exactly where it left off without losing a single event. Of course, MongoDB’s oplogs are usually capped at a maximum size, which means that the connector should not be stopped for too long, or else some of the operations in the oplog might be purged before the connector has a chance to read them. In this case, upon restart the connector will detect the missing oplog operations, perform an initial sync, and then proceed to tail the oplog.

The MongoDB connector is also quite tolerant of changes in membership and leadership of the replica sets, of additions or removals of shards within a sharded cluster, and of network problems that might cause communication failures. The connector always uses the replica set’s primary node to tail the oplog, so when the replica set undergoes an election and a different node becomes primary, the connector will immediately stop tailing the oplog, connect to the new primary, and start tailing the oplog using the new primary node. Likewise, if the connector experiences any problems communicating with the replica set primary, it will try to reconnect (using exponential backoff so as to not overwhelm the network or replica set) and continue tailing the oplog from where it last left off. In this way the connector is able to dynamically adjust to changes in replica set membership and to automatically handle communication failures.
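
The reconnection behavior described above can be illustrated with a small sketch of exponential backoff. This is not the connector's implementation: the function names, the initial delay, the growth factor, and the maximum delay are all assumptions chosen for illustration.

```python
import time
from typing import Callable, Iterator


def backoff_delays(initial: float = 1.0, maximum: float = 120.0,
                   factor: float = 2.0) -> Iterator[float]:
    """Yield an endless sequence of delays that grows by `factor` up to `maximum`."""
    delay = initial
    while True:
        yield min(delay, maximum)
        delay = min(delay * factor, maximum)


def connect_with_backoff(connect: Callable[[], object], max_attempts: int = 5,
                         sleep: Callable[[float], None] = time.sleep) -> object:
    """Call `connect` until it succeeds, waiting longer after each failure.

    `connect` is a hypothetical callable that raises ConnectionError when the
    primary cannot be reached. The last failure is re-raised to the caller.
    """
    delays = backoff_delays()
    for attempt in range(1, max_attempts + 1):
        try:
            return connect()
        except ConnectionError:
            if attempt == max_attempts:
                raise
            sleep(next(delays))
```

Passing `sleep` as a parameter keeps the sketch testable without real waiting; a production client would typically also add random jitter to the delays.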

Setting up MongoDB

The MongoDB connector uses MongoDB’s oplog to capture the changes, so the connector works only with MongoDB replica sets or with sharded clusters where each shard is a separate replica set. See the MongoDB documentation for setting up a replica set or sharded cluster. Also, be sure to understand how to enable access control and authentication with replica sets.

You must also have a MongoDB user that has the appropriate roles to read the admin database where the oplog can be read. In a sharded cluster, the user must additionally be able to read the config database in the configuration server.

Supported MongoDB topologies

The MongoDB connector can be used with a variety of MongoDB topologies.

MongoDB replica set

The MongoDB connector can capture changes from a single MongoDB replica set. Production replica sets require at least three members.

To use the MongoDB connector with a replica set, provide the addresses of one or more replica set servers as seed addresses through the connector’s mongodb.hosts property. The connector will use these seeds to connect to the replica set, and then once connected will get from the replica set the complete set of members and which member is primary. The connector will start a task to connect to the primary and capture the changes from the primary’s oplog. When the replica set elects a new primary, the task will automatically switch over to the new primary.

When MongoDB is fronted by a proxy (such as with Docker on OS X or Windows), then when a client connects to the replica set and discovers the members, the MongoDB client will exclude the proxy as a valid member and will attempt and fail to connect directly to the members rather than go through the proxy.

In such a case, set the connector’s optional mongodb.members.auto.discover configuration property to false to instruct the connector to forgo membership discovery and instead simply use the first seed address (specified via the mongodb.hosts property) as the primary node. This may work, but may still cause issues when an election occurs.

MongoDB sharded cluster

A MongoDB sharded cluster consists of:

  • One or more shards, each deployed as a replica set

  • A separate replica set that acts as the cluster’s configuration server

  • One or more routers (also called mongos) to which clients connect and that route requests to the appropriate shards

To use the MongoDB connector with a sharded cluster, configure the connector with the host addresses of the configuration server replica set. When the connector connects to this replica set, it discovers that it is acting as the configuration server for a sharded cluster, discovers the information about each replica set used as a shard in the cluster, and will then start up a separate task to capture the changes from each replica set. If new shards are added to the cluster or existing shards removed, the connector will automatically adjust its tasks accordingly.

MongoDB standalone server

The MongoDB connector is not capable of monitoring the changes of a standalone MongoDB server, since standalone servers do not have an oplog. The connector will work if the standalone server is converted to a replica set with one member.

MongoDB does not recommend running a standalone server in production.

How the MongoDB connector works

When a MongoDB connector is configured and deployed, it starts by connecting to the MongoDB servers at the seed addresses, and determines the details about each of the available replica sets. Since each replica set has its own independent oplog, the connector will try to use a separate task for each replica set. The connector can limit the maximum number of tasks it will use, and if not enough tasks are available the connector will assign multiple replica sets to each task, although the task will still use a separate thread for each replica set.

When running the connector against a sharded cluster, use a value of tasks.max that is greater than the number of replica sets. This will allow the connector to create one task for each replica set, and will let Kafka Connect coordinate, distribute, and manage the tasks across all of the available worker processes.
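
To make the relationship between replica sets and tasks.max concrete, here is a minimal sketch that spreads replica sets over a limited number of tasks. The round-robin strategy and the function name are assumptions for illustration; the text above does not say how the connector actually groups replica sets when tasks are scarce.

```python
from typing import Dict, List


def assign_replica_sets(replica_sets: List[str], tasks_max: int) -> Dict[int, List[str]]:
    """Distribute replica set names over at most `tasks_max` tasks (round-robin)."""
    if tasks_max < 1:
        raise ValueError("tasks_max must be at least 1")
    # Never create more tasks than there are replica sets to read.
    task_count = min(tasks_max, len(replica_sets))
    assignment: Dict[int, List[str]] = {task: [] for task in range(task_count)}
    for index, name in enumerate(replica_sets):
        assignment[index % task_count].append(name)
    return assignment
```

With three shards and tasks_max set to 2, one task ends up reading two replica sets; with tasks_max of 3 or more, each replica set gets its own task.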

Logical connector name

The connector configuration property mongodb.name serves as a logical name for the MongoDB replica set or sharded cluster. The connector uses the logical name in a number of ways: as the prefix for all topic names, and as a unique identifier when recording the oplog position of each replica set.

You should give each MongoDB connector a unique logical name that meaningfully describes the source MongoDB system. We recommend that logical names begin with an alphabetic or underscore character, and that the remaining characters be alphanumeric or underscore.
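
The naming recommendation can be checked before a connector is registered. The following is a small sketch; the helper name is hypothetical, and it assumes "alphabetic" means ASCII letters.

```python
import re

# First character: letter or underscore; remaining characters: letters, digits, underscore.
LOGICAL_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_recommended_logical_name(name: str) -> bool:
    """Return True when `name` follows the recommended logical-name pattern."""
    return LOGICAL_NAME.fullmatch(name) is not None
```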

Initial sync

When a task starts up using a replica set, it uses the connector’s logical name and the replica set name to find an offset that describes the position in the replica set’s oplog where the connector previously stopped reading. If an offset can be found and it is still in the oplog, then the task immediately proceeds with tailing the oplog, starting at the recorded offset position.

However, if no offset is found or if the oplog no longer contains that position, the task must first obtain the current state of the replica set contents by performing an initial sync. This process starts by recording the current position of the oplog and recording that as the offset (along with a flag that denotes an initial sync has been started). The task will then proceed to copy each collection, spawning as many threads as possible (up to the value of the initial.sync.max.threads configuration property) to perform this work in parallel. The connector will record a separate read event for each document it sees, and that read event will contain the object’s identifier, the complete state of the object, and source information about the MongoDB replica set where the object was found. The source information will also include a flag that denotes the event was produced during an initial sync.

This initial sync will continue until it has copied all collections that match the connector’s filters. If the connector is stopped before the tasks’ initial syncs are completed, upon restart the connector begins the initial sync again.
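
The startup decision described in this section can be summarized in a few lines. This is a simplified sketch, not the connector's code: oplog positions are modeled as plain integers (real positions combine a timestamp and an ordinal), and the function and parameter names are hypothetical.

```python
from typing import Optional


def startup_action(recorded_offset: Optional[int], oldest_oplog_position: int,
                   initial_sync_in_progress: bool = False) -> str:
    """Decide whether a task should run an initial sync or tail the oplog."""
    if recorded_offset is None:
        # No offset was ever recorded for this replica set.
        return "initial_sync"
    if initial_sync_in_progress:
        # A previous initial sync was interrupted, so it starts over.
        return "initial_sync"
    if recorded_offset < oldest_oplog_position:
        # The recorded position has already been purged from the capped oplog.
        return "initial_sync"
    return "tail_oplog"
```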

Try to avoid task reassignment and reconfiguration while the connector is performing an initial sync of any replica sets. The connector logs messages that report the progress of the initial sync. For utmost control, run a separate cluster of Kafka Connect for each connector.

Tailing the oplog

Once the connector task for a replica set has an offset, it uses the offset to determine the position in the oplog where it should start reading. The task will then connect to the replica set’s primary node and start reading the oplog from that position, processing all of the insert, update, and delete operations and converting them into Debezium change events. Each change event includes the position in the oplog where the operation was found, and the connector periodically records this as its most recent offset. (The interval at which the offset is recorded is governed by the offset.flush.interval.ms Kafka Connect worker configuration property.)

When the connector is stopped gracefully, the last offset processed is recorded so that, upon restart, the connector will continue exactly where it left off. If the connector’s tasks terminate unexpectedly, however, the tasks may have processed and generated events after the offset was last recorded but before the final offset could be recorded; upon restart, the connector begins at the last recorded offset, possibly generating some of the same events that were generated just prior to the crash.

When everything is operating nominally, Kafka consumers will actually see every message exactly once. However, when things go wrong Kafka can only guarantee consumers will see every message at least once. Therefore, your consumers need to anticipate seeing messages more than once.
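
One way a consumer can anticipate redelivery is to remember the last oplog position it handled for each document and skip anything at or before that position. The sketch below assumes each event has already been reduced to a dictionary with hypothetical `key`, `rs`, and `position` entries, where `position` is a (timestamp, ordinal) pair, and that events for one document arrive in order. It keeps one entry per document, so a real consumer would need to bound or persist that state.

```python
from typing import Dict, Iterable, Iterator, Tuple

Position = Tuple[int, int]  # (timestamp, ordinal within that timestamp)


def drop_redelivered(events: Iterable[dict]) -> Iterator[dict]:
    """Yield each event once, skipping events that were already seen."""
    last_seen: Dict[Tuple[str, str], Position] = {}
    for event in events:
        identity = (event["rs"], event["key"])
        position = event["position"]
        if identity in last_seen and position <= last_seen[identity]:
            continue  # same or older position for this document: a redelivery
        last_seen[identity] = position
        yield event
```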

As mentioned above, the connector tasks always use the replica set’s primary node to tail the oplog, ensuring that the connector sees the most up-to-date operations possible and can capture the changes with lower latency than if secondaries were used instead. When the replica set elects a new primary, the connector will immediately stop tailing the oplog, connect to the new primary, and start tailing the new primary’s oplog at the same position. Likewise, if the connector experiences any problems communicating with the replica set members, it will try to reconnect (using exponential backoff so as to not overwhelm the replica set) and, once connected, continue tailing the oplog from where it last left off. In this way the connector is able to dynamically adjust to changes in replica set membership and to automatically handle communication failures.

The bottom line is that the MongoDB connector will continue running under most situations, though communication problems may cause the connector to wait until the problems are resolved.

Topic names

The MongoDB connector writes events for all insert, update, and delete operations on documents in each collection to a single Kafka topic. The name of each Kafka topic always takes the form logicalName.databaseName.collectionName, where logicalName is the logical name of the connector as specified with the mongodb.name configuration property, databaseName is the name of the database where the operation occurred, and collectionName is the name of the MongoDB collection in which the affected document existed.

For example, consider a MongoDB replica set with an inventory database that contains four collections: products, products_on_hand, customers, and orders. If the connector monitoring this database were given a logical name of fulfillment, then the connector would produce events on these four Kafka topics:

  • fulfillment.inventory.products

  • fulfillment.inventory.products_on_hand

  • fulfillment.inventory.customers

  • fulfillment.inventory.orders

Notice that the topic names do not incorporate the replica set name or shard name. As a result, all changes to a sharded collection (where each shard contains a subset of the collection’s documents) go to the same Kafka topic.
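
The naming rule is simple enough to express directly. The helper below is a hypothetical illustration of the logicalName.databaseName.collectionName form, applied to the example collections above.

```python
def topic_name(logical_name: str, database: str, collection: str) -> str:
    """Build the Kafka topic name for a collection."""
    return f"{logical_name}.{database}.{collection}"


collections = ["products", "products_on_hand", "customers", "orders"]
topics = [topic_name("fulfillment", "inventory", name) for name in collections]
```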

You can set up Kafka to auto-create the topics as they are needed. If not, then you must use Kafka administration tools to create the topics before starting the connector.

Partitions

The MongoDB connector does not make any explicit determination of the topic partitions for events. Instead, it allows Kafka to determine the partition based upon the key. You can change Kafka’s partitioning logic by defining in the Kafka Connect worker configuration the name of the Partitioner implementation.

Be aware that Kafka only maintains total order for events written to a single topic partition. Partitioning the events by key does mean that all events with the same key will always go to the same partition, ensuring that all events for a specific document are always totally ordered.

Events

All data change events produced by the MongoDB connector have a key and a value.

Starting with Kafka 0.10, Kafka can optionally record with the message key and value the timestamp at which the message was created (recorded by the producer) or written to the log by Kafka.

Debezium and Kafka Connect are designed around continuous streams of event messages, and the structure of these events could potentially change over time if the source of those events changed in structure or if the connector is improved or changed. This could be difficult for consumers to deal with, so to make it very easy Kafka Connect makes each event self-contained. Every message key and value has two parts: a schema and payload. The schema describes the structure of the payload, while the payload contains the actual data.

Change event’s key

For a given collection, the change event’s key contains a single id field. Its value is the document’s identifier represented as a string, which is derived from the MongoDB extended JSON serialization in strict mode. Consider a connector with a logical name of fulfillment, a replica set containing an inventory database with a customers collection containing documents such as:

    {
      "_id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    }

Every change event for the customers collection will feature the same key structure, which in JSON looks like this:

    {
      "schema": {
        "type": "struct",
        "name": "fulfillment.inventory.customers.Key",
        "optional": false,
        "fields": [
          {
            "field": "id",
            "type": "string",
            "optional": false
          }
        ]
      },
      "payload": {
        "id": "1004"
      }
    }

The schema portion of the key contains a Kafka Connect schema describing what is in the payload portion, and in our case that means that the payload value is not optional, is a structure defined by a schema named fulfillment.inventory.customers.Key, and has one required field named id of type string. If we look at the value of the key’s payload field, we’ll see that it is indeed a structure (which in JSON is just an object) with a single id field, whose value is a string containing the integer 1004.

This example used a document with an integer identifier, but any valid MongoDB document identifier (including documents) will work. The value of the id field in the payload will simply be a string representing a MongoDB extended JSON serialization (strict mode) of the original document’s _id field. Find below a few examples showing how _id fields of different types will get encoded as the event key’s payload:

| Type | MongoDB _id value | Key’s payload |
|---|---|---|
| Integer | 1234 | { "id" : "1234" } |
| Float | 12.34 | { "id" : "12.34" } |
| String | "1234" | { "id" : "\"1234\"" } |
| Document | { "hi" : "kafka", "nums" : [10.0, 100.0, 1000.0] } | { "id" : "{\"hi\" : \"kafka\", \"nums\" : [10.0, 100.0, 1000.0]}" } |
| ObjectId | ObjectId("596e275826f08b2730779e1f") | { "id" : "{\"$oid\" : \"596e275826f08b2730779e1f\"}" } |
| Binary | BinData("a2Fma2E=",0) | { "id" : "{\"$binary\" : \"a2Fma2E=\", \"$type\" : \"00\"}" } |
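
Because the id field holds a JSON serialization of the original _id, a consumer can recover the value by parsing that string. The sketch below uses only Python's standard json module; the function name is hypothetical. Extended JSON wrappers such as $oid are returned as plain dictionaries rather than converted to driver types.

```python
import json


def decode_key_id(key_payload: dict):
    """Parse the JSON string stored in the key payload's id field."""
    return json.loads(key_payload["id"])


integer_id = decode_key_id({"id": "1234"})
string_id = decode_key_id({"id": "\"1234\""})
object_id = decode_key_id({"id": "{\"$oid\" : \"596e275826f08b2730779e1f\"}"})
```

Note how the integer 1234 and the string "1234" stay distinguishable after decoding, which is the reason the string form carries its own escaped quotes.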

The MongoDB connector ensures that all Kafka Connect schema names are valid Avro schema names. This means that the logical server name must start with a Latin letter or an underscore (e.g., [a-zA-Z_]), and the remaining characters in the logical server name and all characters in the database and collection names must be Latin letters, digits, or an underscore (e.g., [a-zA-Z0-9_]). If not, then all invalid characters will automatically be replaced with an underscore character.

This can lead to unexpected conflicts in schema names when the logical server name, database names, and collection names contain other characters, and the only distinguishing characters between full collection names are invalid and thus replaced with underscores. The connector attempts to produce an exception in such cases, but only when the conflicts exist between schemas used within a single connector.
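
To see how such a conflict arises, consider this sketch of the replacement rule. It is an approximation written from the description above, not the connector's code, and the function names are hypothetical.

```python
import re
from collections import defaultdict
from typing import Dict, Iterable, List

_INVALID = re.compile(r"[^A-Za-z0-9_]")


def schema_name(logical_name: str, database: str, collection: str) -> str:
    """Replace every invalid character with an underscore, part by part."""
    parts = [_INVALID.sub("_", part) for part in (logical_name, database, collection)]
    if parts[0][:1].isdigit():
        # A digit is not a valid first character of the logical name.
        parts[0] = "_" + parts[0][1:]
    return ".".join(parts)


def find_conflicts(logical_name: str, namespaces: Iterable[str]) -> Dict[str, List[str]]:
    """Group namespaces (database.collection) that map to the same schema name."""
    by_schema: Dict[str, List[str]] = defaultdict(list)
    for namespace in namespaces:
        database, collection = namespace.split(".", 1)
        by_schema[schema_name(logical_name, database, collection)].append(namespace)
    return {name: found for name, found in by_schema.items() if len(found) > 1}
```

For example, collections named order-items and order_items in the same database differ only by a character that gets replaced, so both map to the same schema name.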

Change event’s value

The value of the change event message is a bit more complicated. Like the key message, it has a schema section and payload section. The payload section of every change event value produced by the MongoDB connector has an envelope structure with the following fields:

  • op is a mandatory field that contains a string value describing the type of operation. Values for the MongoDB connector are c for create (or insert), u for update, d for delete, and r for read (in the case of an initial sync).

  • after is an optional field that if present contains the state of the document after the event occurred. MongoDB’s oplog entries only contain the full state of a document for create events, so these are the only events that contain an after field.

  • source is a mandatory field that contains a structure describing the source metadata for the event, which in the case of MongoDB contains several fields: the Debezium version, the logical name, the replica set’s name, the namespace of the collection, the MongoDB timestamp (and ordinal of the event within the timestamp) at which the event occurred, the identifier of the MongoDB operation (e.g., the h field in the oplog event), and the initial sync flag if the event resulted during an initial sync.

  • ts_ms is optional and if present contains the time (using the system clock in the JVM running the Kafka Connect task) at which the connector processed the event.

And of course, the schema portion of the event message’s value contains a schema that describes this envelope structure and the nested fields within it.

Let’s look at what a create/read event value might look like for our customers collection:

    {
      "schema": {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Json",
            "version": 1,
            "field": "after"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Json",
            "version": 1,
            "field": "patch"
          },
          {
            "type": "struct",
            "fields": [
              {
                "type": "string",
                "optional": false,
                "field": "version"
              },
              {
                "type": "string",
                "optional": false,
                "field": "connector"
              },
              {
                "type": "string",
                "optional": false,
                "field": "name"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
              },
              {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
              },
              {
                "type": "string",
                "optional": false,
                "field": "db"
              },
              {
                "type": "string",
                "optional": false,
                "field": "rs"
              },
              {
                "type": "string",
                "optional": false,
                "field": "collection"
              },
              {
                "type": "int32",
                "optional": false,
                "field": "ord"
              },
              {
                "type": "int64",
                "optional": true,
                "field": "h"
              }
            ],
            "optional": false,
            "name": "io.debezium.connector.mongo.Source",
            "field": "source"
          },
          {
            "type": "string",
            "optional": true,
            "field": "op"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "ts_ms"
          }
        ],
        "optional": false,
        "name": "fulfillment.inventory.customers.Envelope"
      },
      "payload": {
        "after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}",
        "patch": null,
        "source": {
          "version": "1.0.3.Final",
          "connector": "mongodb",
          "name": "fulfillment",
          "ts_ms": 1558965508000,
          "snapshot": true,
          "db": "inventory",
          "rs": "rs0",
          "collection": "customers",
          "ord": 31,
          "h": 1546547425148721999
        },
        "op": "r",
        "ts_ms": 1558965515240
      }
    }

If we look at the schema portion of this event’s value, we can see the schema for the envelope, which is specific to the collection, and the schema for the source structure, which is specific to the MongoDB connector and reused across all events. Also note that the after value is always a string, and that by convention it will contain a JSON representation of the document.

If we look at the payload portion of this event’s value, we can see the information in the event, namely that it describes a document that was read as part of an initial sync (since op=r and snapshot=true), and that the after field value contains the JSON string representation of the document.

It may appear that the JSON representations of the events are much larger than the documents they describe. This is true, because the JSON representation must include the schema and the payload portions of the message. It is possible and even recommended to use the Avro Converter to dramatically decrease the size of the actual messages written to the Kafka topics.

The value of an update change event on this collection will actually have the exact same schema, and its payload is structured the same but will hold different values. Specifically, an update event will not have an after value and will instead have a patch string containing the JSON representation of the idempotent update operation. Here’s an example:

    {
      "schema": { ... },
      "payload": {
        "op": "u",
        "ts_ms": 1465491461815,
        "patch": "{\"$set\":{\"first_name\":\"Anne Marie\"}}",
        "source": {
          "version": "1.0.3.Final",
          "connector": "mongodb",
          "name": "fulfillment",
          "ts_ms": 1558965508000,
          "snapshot": true,
          "db": "inventory",
          "rs": "rs0",
          "collection": "customers",
          "ord": 6,
          "h": 1546547425148721999
        }
      }
    }

When we compare this to the value in the insert event, we see a couple of differences in the payload section:

  • The op field value is now u, signifying that this document changed because of an update

  • The patch field appears and has the stringified JSON representation of the actual MongoDB idempotent change to the document, which in this example involves setting the first_name field to a new value

  • The after field no longer appears

  • The source field structure has the same fields as before, but the values are different since this event is from a different position in the oplog

  • The ts_ms shows the timestamp at which Debezium processed this event

The content of the patch field is provided by MongoDB itself and its exact format depends on the specific database version. You should therefore be prepared for potential changes to the format when upgrading the MongoDB instance to a new version.

All examples in this document were obtained from MongoDB 3.4 and might differ if you use a different version.

Update events in MongoDB’s oplog don’t have the before or after states of the changed document, so there’s no way for the connector to provide this information. However, because create or read events do contain the starting state, downstream consumers of the stream can fully reconstruct the state by keeping the latest state for each document and applying each event to that state. Debezium connectors are not able to keep such state, so they cannot do this themselves.

So far we’ve seen samples of create/read and update events. Now, let’s look at the value of a delete event for the same collection. The value of a delete event on this collection will also have the exact same schema, and its payload is structured the same but will hold different values. In particular, a delete event will not have an after value or a patch value:

    {
      "schema": { ... },
      "payload": {
        "op": "d",
        "ts_ms": 1465495462115,
        "source": {
          "version": "1.0.3.Final",
          "connector": "mongodb",
          "name": "fulfillment",
          "ts_ms": 1558965508000,
          "snapshot": true,
          "db": "inventory",
          "rs": "rs0",
          "collection": "customers",
          "ord": 6,
          "h": 1546547425148721999
        }
      }
    }

When we compare this to the value in the other events, we see a couple of differences in the payload section:

  • The op field value is now d, signifying that this document was deleted

  • The patch field does not appear

  • The after field does not appear

  • The source field structure has the same fields as before, but the values are different since this event is from a different position in the oplog

  • The ts_ms shows the timestamp at which Debezium processed this event

The MongoDB connector actually provides one other kind of event. Each delete event is followed by a tombstone event that has the same key but a null value, giving Kafka enough information to know that its Kafka log compaction mechanism can remove all messages with that key.

All MongoDB connector events are designed to work with Kafka log compaction, which allows for the removal of older messages as long as at least the most recent message for every key is kept. This is how Kafka can reclaim storage space while ensuring the topic contains a complete dataset and can be used for reloading key-based state.

All MongoDB connector events for a uniquely identified document will have exactly the same key, signaling to Kafka that only the latest event needs to be kept. A tombstone event, in turn, informs Kafka that all messages with that same key can be removed.
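
Putting the event types together, a downstream consumer can maintain the latest state of each document by applying events in order. The following is a minimal sketch under several assumptions: the function name is hypothetical, events arrive as already-decoded payload dictionaries, and only top-level $set and $unset operators in the patch are handled. Since the patch format depends on the MongoDB version, a real consumer would need broader handling.

```python
import json
from typing import Dict, Optional


def apply_event(state: Dict[str, dict], key_id: str, value: Optional[dict]) -> None:
    """Apply one change event payload to an in-memory map of documents."""
    if value is None:
        # Tombstone: the document was already deleted, nothing to keep.
        state.pop(key_id, None)
        return
    op = value["op"]
    if op in ("c", "r"):
        # Create and read events carry the full document in `after`.
        state[key_id] = json.loads(value["after"])
    elif op == "u":
        patch = json.loads(value["patch"])
        document = state.setdefault(key_id, {})
        for field, new_value in patch.get("$set", {}).items():
            document[field] = new_value
        for field in patch.get("$unset", {}):
            document.pop(field, None)
        # Other operators are ignored in this sketch.
    elif op == "d":
        state.pop(key_id, None)
```

Applying the read, update, and delete examples from this section in sequence leaves the document first with its original fields, then with first_name set to "Anne Marie", and finally removed from the map.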

Deploying the MongoDB connector

If you’ve already installed Zookeeper, Kafka, and Kafka Connect, then using Debezium’s MongoDB connector is easy. Simply download the connector’s plugin archive, extract the JARs into your Kafka Connect environment, and add the directory with the JARs to Kafka Connect’s plugin path using the plugin.path configuration property. Restart your Kafka Connect process to pick up the new JARs.

If immutable containers are your thing, then check out Debezium’s Docker images for Zookeeper, Kafka, and Kafka Connect with the MongoDB connector already pre-installed and ready to go. Our tutorial even walks you through using these images, and this is a great way to learn what Debezium is all about. You can even run Debezium on Kubernetes and OpenShift.

Example configuration

To use the connector to produce change events for a particular MongoDB replica set or sharded cluster, create a configuration file in JSON. When the connector starts, it will perform an initial sync of the collections in your MongoDB replica sets and start reading the replica sets’ oplogs, producing events for every inserted, updated, and deleted document. Optionally filter out collections that are not needed.

Following is an example of the configuration for a MongoDB connector that monitors a MongoDB replica set rs0 at port 27017 on 192.168.99.100, which we logically name fulfillment. Typically, you configure the Debezium MongoDB connector in a .json file using the configuration properties available for the connector.

    {
      "name": "inventory-connector", (1)
      "config": {
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", (2)
        "mongodb.hosts": "rs0/192.168.99.100:27017", (3)
        "mongodb.name": "fulfillment", (4)
        "collection.whitelist": "inventory[.]*" (5)
      }
    }

(1) The name of our connector when we register it with a Kafka Connect service.
(2) The name of the MongoDB connector class.
(3) The host addresses to use to connect to the MongoDB replica set.
(4) The logical name of the MongoDB replica set, which forms a namespace for generated events and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Converter is used.
(5) A list of regular expressions that match the collection namespaces (for example, <dbName>.<collectionName>) of all collections to be monitored. This is optional.

See the complete list of connector properties that can be specified in these configurations.

This configuration can be sent via POST to a running Kafka Connect service, which will then record the configuration and start up the one connector task that will connect to the MongoDB replica set or sharded cluster, assign tasks for each replica set, perform an initial sync if necessary, read the oplog, and record events to Kafka topics.
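
As a rough illustration of that POST, the sketch below builds the request with Python's standard library. The Kafka Connect address http://localhost:8083 and the helper name are assumptions; adjust them to your environment. The request is only constructed here, and sending it is left as a commented-out line.

```python
import json
import urllib.request


def build_register_request(connect_url: str, connector: dict) -> urllib.request.Request:
    """Build a POST request that registers a connector with Kafka Connect."""
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


connector = {
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
        "mongodb.hosts": "rs0/192.168.99.100:27017",
        "mongodb.name": "fulfillment",
    },
}
request = build_register_request("http://localhost:8083", connector)
# To submit the configuration to a running service:
# urllib.request.urlopen(request)
```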

Connector properties

The following configuration properties are required unless a default value is available.

PropertyDefaultDescription

name

Unique name for the connector. Attempting to register again with the same name will fail. (This property is required by all Kafka Connect connectors.)

connector.class

The name of the Java class for the connector. Always use a value of io.debezium.connector.mongodb.MongoDbConnector for the MongoDB connector.

mongodb.hosts

The comma-separated list of hostname and port pairs (in the form ‘host’ or ‘host:port’) of the MongoDB servers in the replica set. The list can contain a single hostname and port pair. If mongodb.members.auto.discover is set to false, then the host and port pair should be prefixed with the replica set name (e.g., rs0/localhost:27017).

mongodb.name

A unique name that identifies the connector and/or MongoDB replica set or sharded cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster. Only alphanumeric characters and underscores should be used.

mongodb.user

Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.

mongodb.password

Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.

mongodb.ssl.enabled

false

Connector will use SSL to connect to MongoDB instances.

mongodb.ssl.invalid.hostname.allowed

false

When SSL is enabled this setting controls whether strict hostname checking is disabled during connection phase. If true the connection will not prevent man-in-the-middle attacks.

database.whitelist

empty string

An optional comma-separated list of regular expressions that match database names to be monitored; any database name not included in the whitelist is excluded from monitoring. By default all databases are monitored. May not be used with database.blacklist.

database.blacklist

empty string

An optional comma-separated list of regular expressions that match database names to be excluded from monitoring; any database name not included in the blacklist is monitored. May not be used with database.whitelist.

collection.whitelist

empty string

An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be monitored; any collection not included in the whitelist is excluded from monitoring. Each identifier is of the form databaseName.collectionName. By default the connector will monitor all collections except those in the local and admin databases. May not be used with collection.blacklist.

collection.blacklist

empty string

An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be excluded from monitoring; any collection not included in the blacklist is monitored. Each identifier is of the form databaseName.collectionName. May not be used with collection.whitelist.

snapshot.mode

initial

Specifies the criteria for running a snapshot (i.e., an initial sync) upon startup of the connector. The default is initial, which specifies that the connector reads a snapshot when either no offset is found or the oplog no longer contains the previous offset. The never option specifies that the connector should never use snapshots and should instead proceed directly to tailing the oplog.

field.blacklist

empty string

An optional comma-separated list of the fully-qualified names of fields that should be excluded from change event message values. Fully-qualified names for fields are of the form databaseName.collectionName.fieldName.nestedFieldName, where databaseName and collectionName may contain the wildcard (*) which matches any characters.

field.renames

empty string

An optional comma-separated list of the fully-qualified replacements of fields that should be used to rename fields in change event message values. Fully-qualified replacements for fields are of the form databaseName.collectionName.fieldName.nestedFieldName:newNestedFieldName, where databaseName and collectionName may contain the wildcard (*) which matches any characters, and the colon character (:) separates the existing field from its new name. Each field replacement is applied to the result of the previous field replacement in the list, so keep this in mind when renaming multiple fields that are in the same path.

tasks.max

1

The maximum number of tasks that should be created for this connector. The MongoDB connector will attempt to use a separate task for each replica set, so the default is acceptable when using the connector with a single MongoDB replica set. When using the connector with a MongoDB sharded cluster, we recommend specifying a value that is equal to or more than the number of shards in the cluster, so that the work for each replica set can be distributed by Kafka Connect.

initial.sync.max.threads

1

Positive integer value that specifies the maximum number of threads used to perform an initial sync of the collections in a replica set. Defaults to 1.

tombstones.on.delete

true

Controls whether a tombstone event should be generated after a delete event.
When true, delete operations are represented by a delete event and a subsequent tombstone event. When false, only a delete event is sent.
Emitting the tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record has been deleted.

snapshot.delay.ms

An interval in milliseconds that the connector should wait before taking a snapshot after starting up.
This can be used to avoid snapshot interruptions when starting multiple connectors in a cluster, which may cause re-balancing of connectors.

snapshot.fetch.size

0

Specifies the maximum number of documents that should be read in one go from each collection while taking a snapshot. The connector will read the collection contents in multiple batches of this size.
Defaults to 0, which indicates that the server chooses an appropriate fetch size.
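The whitelist and blacklist semantics described above can be sketched as a small Python filter. This is an illustration only, not the connector's implementation: it assumes each regular expression must match the whole namespace, that the local and admin databases are always skipped, and it uses a hypothetical helper name (`make_collection_filter`). It also splits on every comma, so expressions that themselves contain commas are not handled.

```python
import re


def make_collection_filter(whitelist="", blacklist=""):
    """Return a predicate deciding whether a 'db.collection' namespace is monitored."""
    if whitelist and blacklist:
        raise ValueError("collection.whitelist may not be used with collection.blacklist")

    def compile_list(value):
        # Split the comma-separated property value into compiled patterns.
        return [re.compile(p.strip()) for p in value.split(",") if p.strip()]

    include = compile_list(whitelist)
    exclude = compile_list(blacklist)

    def is_monitored(namespace):
        database = namespace.split(".", 1)[0]
        if database in ("local", "admin"):
            # Assumption: these databases are never monitored.
            return False
        if include:
            return any(p.fullmatch(namespace) for p in include)
        if exclude:
            return not any(p.fullmatch(namespace) for p in exclude)
        return True

    return is_monitored


only_inventory = make_collection_filter(whitelist=r"inventory\..*")
print(only_inventory("inventory.customers"))  # monitored
print(only_inventory("sales.orders"))         # not monitored
```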

The following advanced configuration properties have good defaults that will work in most situations and therefore rarely need to be specified in the connector’s configuration.

Property | Default | Description

max.queue.size

8192

Positive integer value that specifies the maximum size of the blocking queue into which change events read from the database log are placed before they are written to Kafka. This queue can provide backpressure to the oplog reader when, for example, writes to Kafka are slower or if Kafka is not available. Events that appear in the queue are not included in the offsets periodically recorded by this connector. Defaults to 8192, and should always be larger than the maximum batch size specified in the max.batch.size property.

max.batch.size

2048

Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector. Defaults to 2048.

poll.interval.ms

1000

Positive integer value that specifies the number of milliseconds the connector should wait during each iteration for new change events to appear. Defaults to 1000 milliseconds, or 1 second.

connect.backoff.initial.delay.ms

1000

Positive integer value that specifies the initial delay when trying to reconnect to a primary after the first failed connection attempt or when no primary is available. Defaults to 1 second (1000 ms).

connect.backoff.max.delay.ms

120000

Positive integer value that specifies the maximum delay when trying to reconnect to a primary after repeated failed connection attempts or when no primary is available. Defaults to 120 seconds (120,000 ms).

connect.max.attempts

16

Positive integer value that specifies the maximum number of failed connection attempts to a replica set primary before an exception occurs and the task is aborted. Defaults to 16, which with the defaults for connect.backoff.initial.delay.ms and connect.backoff.max.delay.ms results in just over 20 minutes of attempts before failing.

mongodb.members.auto.discover

true

Boolean value that specifies whether the addresses in ‘mongodb.hosts’ are seeds that should be used to discover all members of the cluster or replica set (true), or whether the address(es) in mongodb.hosts should be used as is (false). The default is true and should be used in all cases except where MongoDB is fronted by a proxy.

source.struct.version

v2

Schema version for the source block in CDC events. Debezium 0.10 introduced a few breaking
changes to the structure of the source block in order to unify the exposed structure across all the connectors.
By setting this option to v1 the structure used in earlier versions can be produced. Note that this setting is not recommended and is planned for removal in a future Debezium version.

heartbeat.interval.ms

0

Controls how frequently heartbeat messages are sent.
This property contains an interval in milliseconds that defines how frequently the connector sends messages to a heartbeat topic. This can be used to monitor whether the connector is still receiving change events from the database. You should also use heartbeat messages in cases where only records in non-captured collections are changed for a longer period of time. In such a situation the connector continues to read the oplog from the database but never emits any change messages into Kafka, which in turn means that no offset updates are committed to Kafka. As a result, oplog entries may be purged without the connector noticing, so that on restart some events are no longer available and the initial snapshot must be re-executed.

Set this parameter to 0 to not send heartbeat messages at all.
Disabled by default.

heartbeat.topics.prefix

__debezium-heartbeat

Controls the naming of the topic to which heartbeat messages are sent.
The topic is named according to the pattern <heartbeat.topics.prefix>.<server.name>.

sanitize.field.names

true when connector configuration explicitly specifies the key.converter or value.converter parameters to use Avro, otherwise defaults to false.

Whether field names are sanitized to adhere to Avro naming requirements. See Avro naming for more details.
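Several constraints stated in the property tables above can be checked before a configuration is submitted. The sketch below is a hypothetical pre-flight check (the function name `validate_config` is not part of Debezium); it covers only the rules spelled out in this section: whitelists and blacklists are mutually exclusive, certain properties must be positive integers, and max.queue.size should be larger than max.batch.size.

```python
def validate_config(config):
    """Return a list of human-readable problems found in a connector config dict."""
    problems = []

    # Whitelist and blacklist of the same kind may not be combined.
    for kind in ("database", "collection"):
        if config.get(f"{kind}.whitelist") and config.get(f"{kind}.blacklist"):
            problems.append(f"{kind}.whitelist may not be used with {kind}.blacklist")

    # Properties documented as positive integers, with their documented defaults.
    positive = {
        "max.queue.size": 8192,
        "max.batch.size": 2048,
        "poll.interval.ms": 1000,
        "connect.backoff.initial.delay.ms": 1000,
        "connect.backoff.max.delay.ms": 120000,
        "connect.max.attempts": 16,
    }
    values = {}
    for key, default in positive.items():
        values[key] = int(config.get(key, default))
        if values[key] <= 0:
            problems.append(f"{key} must be a positive integer")

    # The queue should always be larger than a single batch.
    if values["max.queue.size"] <= values["max.batch.size"]:
        problems.append("max.queue.size should be larger than max.batch.size")

    return problems


print(validate_config({"max.queue.size": "1000"}))
```

With the documented defaults an empty configuration passes, while the example call reports that a queue of 1000 is not larger than the default batch size of 2048.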

MongoDB connector common issues

Debezium is a distributed system that captures all changes in multiple upstream databases, and will never miss or lose an event. Of course, when the system is operating nominally or being administered carefully, then Debezium provides exactly once delivery of every change event. However, if a fault does happen then the system will still not lose any events, although while it is recovering from the fault it may repeat some change events. Thus, in these abnormal situations Debezium (like Kafka) provides at least once delivery of change events.

The rest of this section describes how Debezium handles various kinds of faults and problems.

Configuration and startup errors

The connector will fail upon startup, report an error/exception in the log, and stop running when the connector’s configuration is invalid, or when the connector repeatedly fails to connect to MongoDB using the specified connectivity parameters. Reconnection is done using exponential backoff, and the maximum number of attempts is configurable.

In these cases, the error will have more details about the problem and possibly a suggested workaround. The connector can be restarted when the configuration has been corrected or the MongoDB problem has been addressed.

MongoDB becomes unavailable

Once the connector is running, if the primary node of any of the MongoDB replica sets becomes unavailable or unreachable, the connector will repeatedly attempt to reconnect to the primary node, using exponential backoff to prevent saturating the network or servers. If the primary remains unavailable after the configurable number of connection attempts, the connector will fail.

The attempts to reconnect are controlled by three properties:

  • connect.backoff.initial.delay.ms - The delay before attempting to reconnect for the first time, with a default of 1 second (1000 milliseconds).

  • connect.backoff.max.delay.ms - The maximum delay before attempting to reconnect, with a default of 120 seconds (120,000 milliseconds).

  • connect.max.attempts - The maximum number of attempts before an error is produced, with a default of 16.

Each delay is double that of the prior delay, up to the maximum delay. Given the default values, the following table shows the delay for each failed connection attempt and the total accumulated time before failure.

Reconnection attempt number    Delay before attempt, in seconds    Total delay before attempt, in minutes and seconds
1                              1                                   00:01
2                              2                                   00:03
3                              4                                   00:07
4                              8                                   00:15
5                              16                                  00:31
6                              32                                  01:03
7                              64                                  02:07
8                              120                                 04:07
9                              120                                 06:07
10                             120                                 08:07
11                             120                                 10:07
12                             120                                 12:07
13                             120                                 14:07
14                             120                                 16:07
15                             120                                 18:07
16                             120                                 20:07
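The doubling rule and the figures in the table can be reproduced with a few lines of Python. This is a sketch of the arithmetic only, under the assumption that each delay doubles the previous one and is capped at the maximum; the function names are hypothetical and the connector's internal implementation may differ.

```python
def backoff_schedule(initial_ms=1000, max_ms=120000, max_attempts=16):
    """Return (attempt, delay_seconds, total_seconds) for each reconnection attempt."""
    schedule = []
    delay_ms = initial_ms
    total_ms = 0
    for attempt in range(1, max_attempts + 1):
        total_ms += delay_ms
        schedule.append((attempt, delay_ms // 1000, total_ms // 1000))
        # Double the delay for the next attempt, capped at the maximum.
        delay_ms = min(delay_ms * 2, max_ms)
    return schedule


def format_mmss(seconds):
    """Format a number of seconds as MM:SS."""
    return f"{seconds // 60:02d}:{seconds % 60:02d}"


for attempt, delay_s, total_s in backoff_schedule():
    print(f"{attempt:>2}  {delay_s:>3}  {format_mmss(total_s)}")
```

Passing different values for the three parameters shows how long the connector would keep retrying under a non-default configuration.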

Kafka Connect process stops gracefully

If Kafka Connect is being run in distributed mode, and a Kafka Connect process is stopped gracefully, then prior to shutdown of that process Kafka Connect will migrate all of the process’ connector tasks to another Kafka Connect process in that group, and the new connector tasks will pick up exactly where the prior tasks left off. There is a short delay in processing while the connector tasks are stopped gracefully and restarted on the new processes.

If the group contains only one process and that process is stopped gracefully, then Kafka Connect will stop the connector and record the last offset for each replica set. Upon restart, the replica set tasks will continue exactly where they left off.

Kafka Connect process crashes

If the Kafka Connect process stops unexpectedly, then any connector tasks it was running will terminate without recording their most recently processed offsets. When Kafka Connect is being run in distributed mode, it will restart those connector tasks on other processes. However, the MongoDB connectors will resume from the last offset recorded by the earlier processes, which means that the new replacement tasks may generate some of the same change events that were processed just prior to the crash. The number of duplicate events depends on the offset flush period and the volume of data changes just before the crash.

Because there is a chance that some events may be duplicated during a recovery from failure, consumers should always anticipate some events may be duplicated. Debezium changes are idempotent, so a sequence of events always results in the same state.

Debezium also includes with each change event message the source-specific information about the origin of the event, including the MongoDB event’s unique transaction identifier (h) and timestamp (sec and ord). Consumers can keep track of either of these values to know whether they have already seen a particular event.
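A consumer-side duplicate check based on these values might look like the following Python sketch. It is an illustration under several assumptions: the source information is available as a dictionary with the keys rs (replica set name), sec, ord, and h; that combination identifies an event uniquely; and the class name `SeenEvents` is hypothetical. The set grows without bound here, so a real consumer would need to expire old entries or persist them.

```python
class SeenEvents:
    """Remember which change events were already handled, keyed by source info."""

    def __init__(self):
        self._seen = set()

    def is_duplicate(self, source):
        # Assumed layout of the source block: replica set, timestamp parts, and h.
        key = (source["rs"], source["sec"], source["ord"], source["h"])
        if key in self._seen:
            return True
        self._seen.add(key)
        return False


seen = SeenEvents()
# Hypothetical source blocks; the second one repeats the first after a restart.
events = [
    {"rs": "rs0", "sec": 1558965508, "ord": 1, "h": 1111},
    {"rs": "rs0", "sec": 1558965508, "ord": 1, "h": 1111},
    {"rs": "rs0", "sec": 1558965508, "ord": 2, "h": 2222},
]
for source in events:
    if seen.is_duplicate(source):
        continue  # already processed, skip it
    print("processing", source["sec"], source["ord"])
```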

Kafka becomes unavailable

As the connector generates change events, the Kafka Connect framework records those events in Kafka using the Kafka producer API. Kafka Connect will also periodically record the latest offset that appears in those change events, at a frequency you’ve specified in the Kafka Connect worker configuration. If the Kafka brokers become unavailable, the Kafka Connect worker process running the connectors will simply repeatedly attempt to reconnect to the Kafka brokers. In other words, the connector tasks will simply pause until a connection can be reestablished, at which point the connectors will resume exactly where they left off.

Connector is stopped for a duration

If the connector is gracefully stopped, the replica sets can continue to be used and any new changes are recorded in MongoDB’s oplog. When the connector is restarted, it will resume reading the oplog for each replica set where it last left off, recording change events for all of the changes that were made while the connector was stopped. If the connector is stopped long enough such that MongoDB purges from its oplog some operations that the connector has not read, then upon startup the connector will perform an initial sync.

A properly configured Kafka cluster is capable of massive throughput. Kafka Connect is written with Kafka best practices, and given enough resources will also be able to handle very large numbers of database change events. Because of this, when a connector has been restarted after a while, it is very likely to catch up with the database, though how quickly will depend upon the capabilities and performance of Kafka and the volume of changes being made to the data in MongoDB.

If the connector remains stopped for long enough, MongoDB might purge older oplog files and the connector’s last position may be lost. In this case, when the connector configured with initial snapshot mode (the default) is finally restarted, the MongoDB server will no longer have the starting point and the connector will fail with an error.

MongoDB loses writes

It is possible for MongoDB to lose commits in specific failure situations. For example, if the primary applies a change and records it in its oplog before it then crashes unexpectedly, the secondary nodes may not have had a chance to read those changes from the primary’s oplog before the primary crashed. If one such secondary is then elected as primary, its oplog is missing the last changes that the old primary had recorded, and those changes are lost.

In these cases where MongoDB loses changes recorded in a primary’s oplog, the MongoDB connector may or may not capture these lost changes. At this time, there is no way to prevent this side effect of MongoDB.