Distributed Local Writes for Insert Only Workloads

MongoDB Tag Aware Sharding allows administrators to control data distribution in a sharded cluster by defining ranges of the shard key and tagging them to one or more shards.

This tutorial uses Zones along with a multi-datacenter sharded cluster deployment and application-side logic to support distributed local writes, as well as high write availability in the event of a replica set election or datacenter failure.

Tip

Changed in version 4.0.3: By defining the zones and the zone ranges before sharding an empty or a non-existing collection, the shard collection operation creates chunks for the defined zone ranges as well as any additional chunks to cover the entire range of the shard key values, and performs an initial chunk distribution based on the zone ranges. This initial creation and distribution of chunks allows for faster setup of zoned sharding. After the initial distribution, the balancer manages the chunk distribution going forward.

See Pre-Define Zones and Zone Ranges for an Empty or Non-Existing Collection for an example.
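As a minimal sketch of that pre-definition workflow, assuming a hypothetical test.messages namespace and the shard, zone, and shard key names used later in this tutorial:

   sh.addShardToZone("shard0000", "alfa")
   sh.addShardToZone("shard0001", "bravo")

   sh.updateZoneKeyRange(
      "test.messages",
      { "datacenter" : "alfa", "userid" : MinKey },
      { "datacenter" : "alfa", "userid" : MaxKey },
      "alfa"
   )
   sh.updateZoneKeyRange(
      "test.messages",
      { "datacenter" : "bravo", "userid" : MinKey },
      { "datacenter" : "bravo", "userid" : MaxKey },
      "bravo"
   )

   // Sharding the still-empty collection now creates and distributes
   // chunks according to the zone ranges defined above.
   sh.shardCollection("test.messages", { "datacenter" : 1, "userid" : 1 })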

Important

The concepts discussed in this tutorial require a specific deployment architecture, as well as application-level logic.

These concepts require familiarity with MongoDB sharded clusters, replica sets, and the general behavior of zones.

This tutorial assumes an insert-only or insert-intensive workload. The concepts and strategies discussed in this tutorial are not well suited for use cases that require fast reads or updates.

Scenario

Consider an insert-intensive application, where reads are infrequent and low priority compared to writes. The application writes documents to a sharded collection, and requires near-constant uptime from the database to support its SLAs or SLOs.

The following represents a partial view of the format of documents the application writes to the database:

   {
      "_id" : ObjectId("56f08c447fe58b2e96f595fa"),
      "message_id" : 329620,
      "datacenter" : "alfa",
      "userid" : 123,
      ...
   }
   {
      "_id" : ObjectId("56f08c447fe58b2e96f595fb"),
      "message_id" : 578494,
      "datacenter" : "bravo",
      "userid" : 456,
      ...
   }
   {
      "_id" : ObjectId("56f08c447fe58b2e96f595fc"),
      "message_id" : 689979,
      "datacenter" : "bravo",
      "userid" : 789,
      ...
   }

Shard Key

The collection uses the { datacenter : 1, userid : 1 } compound index as the shard key.

The datacenter field in each document allows for creating a tag range on each distinct datacenter value. Without the datacenter field, it would not be possible to associate a document with a specific datacenter.

The userid field provides a high cardinality and low frequency component to the shard key relative to datacenter.

See Choosing a Shard Key for more general instructions on selecting a shard key.
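For instance, assuming a hypothetical test.messages namespace, the collection could be sharded on this key from mongosh as follows:

   sh.enableSharding("test")
   sh.shardCollection("test.messages", { "datacenter" : 1, "userid" : 1 })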

Architecture

The deployment consists of two datacenters, alfa and bravo. There are two shards, shard0000 and shard0001. Each shard is a replica set with three members. shard0000 has two members on alfa and one priority 0 member on bravo. shard0001 has two members on bravo and one priority 0 member on alfa.

Diagram of sharded cluster architecture for high availability

Tags

This application requires one tag per datacenter. Each shard has one tag assigned to it based on the datacenter containing the majority of its replica set members. There are two tag ranges, one for each datacenter.

alfa Datacenter

Tag shards with a majority of members on this datacenter as alfa.

Create a tag range with:

  • a lower bound of { "datacenter" : "alfa", "userid" : MinKey },
  • an upper bound of { "datacenter" : "alfa", "userid" : MaxKey }, and
  • the tag alfa

bravo Datacenter

Tag shards with a majority of members on this datacenter as bravo.

Create a tag range with:

  • a lower bound of { "datacenter" : "bravo", "userid" : MinKey },
  • an upper bound of { "datacenter" : "bravo", "userid" : MaxKey }, and
  • the tag bravo

Note

The MinKey and MaxKey values are reserved special values for comparisons.

Based on the configured tags and tag ranges, mongos routes documents with datacenter : alfa to the alfa datacenter, and documents with datacenter : bravo to the bravo datacenter.

Write Operations

If an inserted or updated document matches a configured tag range, it can only be written to a shard with the related tag.

MongoDB can write documents that do not match a configured tag range to any shard in the cluster.

Note

The behavior described above requires the cluster to be in a steady state with no chunks violating a configured tag range. See the following section on the balancer for more information.

Balancer

The balancer migrates the tagged chunks to the appropriate shard. Until the migration, shards may contain chunks that violate configured tag ranges and tags. Once balancing completes, shards should only contain chunks whose ranges do not violate their assigned tags and tag ranges.

Adding or removing tags or tag ranges can result in chunk migrations. Depending on the size of your data set and the number of chunks a tag range affects, these migrations may impact cluster performance. Consider running your balancer during specific scheduled windows. See Schedule the Balancing Window for a tutorial on how to set a scheduling window.
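As an illustrative sketch of setting such a window (the times here are placeholders), connect to a mongos and update the balancer settings in the config database:

   use config
   db.settings.updateOne(
      { _id : "balancer" },
      { $set : { activeWindow : { start : "23:00", stop : "06:00" } } },
      { upsert : true }
   )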

Application Behavior

By default, the application writes to the nearest datacenter. If the local datacenter is down, or if writes to that datacenter are not acknowledged within a set time period, the application switches to the other available datacenter by changing the value of the datacenter field before attempting to write the document to the database.

The application supports write timeouts. The application uses Write Concern to set a timeout for each write operation.
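For example, a single write with a five-second timeout might look like the following in mongosh (the w value and timeout are illustrative choices, not requirements of this tutorial):

   db.collection.insertOne(
      { "message_id" : 329620, "datacenter" : "alfa", "userid" : 123 },
      { writeConcern : { w : "majority", wtimeout : 5000 } }
   )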

If the application encounters a write or timeout error, it modifies the datacenter field in each document and performs the write. This routes the document to the other datacenter. If both datacenters are down, then writes cannot succeed. See Resolve Write Failure.

The application periodically checks connectivity to any datacenters marked as “down”. If connectivity is restored, the application can continue performing normal write operations.

Given the switching logic, as well as any load balancers or similar mechanisms in place to handle client traffic between datacenters, the application cannot predict which of the two datacenters a given document was written to. To ensure that no documents are missed as a part of read operations, the application must perform broadcast queries by not including the datacenter field as a part of any query.

The application performs reads using a read preference of nearest to reduce latency.
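As an illustrative mongosh example of such a broadcast read with the nearest read preference:

   db.collection.find( { "userid" : 123 } ).readPref("nearest")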

It is possible for a write operation to succeed despite a reported timeout error. The application responds to the error by attempting to re-write the document to the other datacenter; this can result in a document being duplicated across both datacenters. The application resolves duplicates as a part of the read logic.

Switching Logic

The application has logic to switch datacenters if one or more writes fail, or if writes are not acknowledged within a set time period. The application modifies the datacenter field based on the target datacenter’s tag to direct the document towards that datacenter.

For example, an application attempting to write to the alfa datacenter might follow this general procedure (a code sketch follows the list):

  • Attempt to write document, specifying datacenter : alfa.
  • On write timeout or error, log alfa as momentarily down.
  • Attempt to write same document, modifying datacenter : bravo.
  • On write timeout or error, log bravo as momentarily down.
  • If both alfa and bravo are down, log and report errors. See Resolve Write Failure.
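A minimal mongosh sketch of this switching logic, using a hypothetical helper name writeWithFallback and an illustrative five-second write timeout:

   // Hypothetical sketch; not part of the tutorial's tested code.
   function writeWithFallback(coll, doc) {
      for (const dc of [ "alfa", "bravo" ]) {
         try {
            doc.datacenter = dc;  // route the document to this datacenter's tag range
            coll.insertOne(doc, { writeConcern : { w : "majority", wtimeout : 5000 } });
            return true;
         } catch (e) {
            // log the datacenter as momentarily down, then try the other one
            print("write to " + dc + " failed or timed out: " + e.message);
         }
      }
      return false;  // both datacenters down; see Resolve Write Failure
   }

   writeWithFallback(db.collection, { "message_id" : 329620, "userid" : 123 })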

Procedure

Configure Shard Tags

You must be connected to a mongos associated with the target sharded cluster in order to proceed. You cannot create tags by connecting directly to a shard replica set member.

Tag each shard.

Tag each shard in the alfa data center with the alfa tag.

  1. sh.addShardTag("shard0000", "alfa")

Tag each shard in the bravo data center with the bravo tag.

  1. sh.addShardTag("shard0001", "bravo")

You can review the tags assigned to any given shard by running sh.status().

Define ranges for each tag.

Define the range for the alfa datacenter and associate it to the alfa tag using the sh.addTagRange() method. This method requires:

  • The full namespace of the target collection.
  • The inclusive lower bound of the range.
  • The exclusive upper bound of the range.
  • The name of the tag.

   sh.addTagRange(
      "<database>.<collection>",
      { "datacenter" : "alfa", "userid" : MinKey },
      { "datacenter" : "alfa", "userid" : MaxKey },
      "alfa"
   )

Define the range for the bravo datacenter and associate it to the bravo tag using the sh.addTagRange() method. This method requires:

  • The full namespace of the target collection.
  • The inclusive lower bound of the range.
  • The exclusive upper bound of the range.
  • The name of the tag.

   sh.addTagRange(
      "<database>.<collection>",
      { "datacenter" : "bravo", "userid" : MinKey },
      { "datacenter" : "bravo", "userid" : MaxKey },
      "bravo"
   )

The MinKey and MaxKey values are reserved special values for comparisons. MinKey always compares as less than every other possible value, while MaxKey always compares as greater than every other possible value. The configured ranges capture every user for each datacenter.

Review the changes.

The next time the balancer runs, it splits and migrates chunks across the shards respecting the tag ranges and tags.

Once balancing finishes, the shards tagged as alfa should only contain documents with datacenter : alfa, while shards tagged as bravo should only contain documents with datacenter : bravo.

You can review the chunk distribution by running sh.status().

Resolve Write Failure

When the application’s default datacenter is down or inaccessible, the application changes the datacenter field to the other datacenter.

For example, the application attempts to write the following document to the alfa datacenter by default:

   {
      "_id" : ObjectId("56f08c447fe58b2e96f595fa"),
      "message_id" : 329620,
      "datacenter" : "alfa",
      "userid" : 123,
      ...
   }

If the application receives an error on attempted write, or if the write acknowledgement takes too long, the application logs the datacenter as unavailable and alters the datacenter field to point to the bravo datacenter.

   {
      "_id" : ObjectId("56f08c457fe58b2e96f595fb"),
      "message_id" : 329620,
      "datacenter" : "bravo",
      "userid" : 123,
      ...
   }

The application periodically checks the alfa datacenter for connectivity. If the datacenter is reachable again, the application can resume normal writes.

Note

It is possible that the original write to datacenter : alfa succeeded, especially if the error was related to a timeout. If so, the document with message_id : 329620 may now be duplicated across both datacenters. Applications must resolve duplicates as a part of read operations.

Resolve Duplicate Documents on Reads

The application’s switching logic allows for potential document duplication. When performing reads, the application resolves any duplicate documents at the application layer.

The following query searches for documents where the userid is 123. Note that while userid is part of the shard key, the query does not include the datacenter field, and therefore does not perform a targeted read operation.

   db.collection.find( { "userid" : 123 } )

The results show that the document with message_id of 329620 has been inserted into MongoDB twice, probably as a result of a delayed write acknowledgement.

   {
      "_id" : ObjectId("56f08c447fe58b2e96f595fa"),
      "message_id" : 329620,
      "datacenter" : "alfa",
      "userid" : 123,
      "data" : {...}
   }
   {
      "_id" : ObjectId("56f08c457fe58b2e96f595fb"),
      "message_id" : 329620,
      "datacenter" : "bravo",
      "userid" : 123,
      ...
   }

The application can either ignore the duplicates, taking one of the two documents, or it can attempt to trim the duplicates until only a single document remains.

One method for trimming duplicates is to use the ObjectId.getTimestamp() method to extract the timestamp from the _id field. The application can then keep either the first document inserted, or the last document inserted. This assumes the _id field uses the MongoDB ObjectId.

For example, using getTimestamp() on the document with ObjectId("56f08c447fe58b2e96f595fa") returns:

   ISODate("2016-03-22T00:05:24Z")

Using getTimestamp() on the document with ObjectId("56f08c457fe58b2e96f595fb") returns:

   ISODate("2016-03-22T00:05:25Z")
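A minimal mongosh sketch of one way to trim such duplicates, keeping the earliest insert per message_id (the aggregation shape and the userid scope are illustrative assumptions, not part of the tutorial):

   // Hypothetical sketch: group by message_id, keep the earliest _id, delete the rest.
   db.collection.aggregate([
      { $match : { "userid" : 123 } },
      { $sort : { "_id" : 1 } },   // ObjectIds sort by their embedded creation timestamp
      { $group : { _id : "$message_id", keep : { $first : "$_id" }, all : { $push : "$_id" } } }
   ]).forEach(function(group) {
      var extras = group.all.filter(function(id) {
         return String(id) !== String(group.keep);
      });
      if (extras.length > 0) {
         db.collection.deleteMany({ _id : { $in : extras } });
      }
   });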