Drivers API

Callback API vs Core API

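The Callback API:

  • Starts a transaction, executes the specified operations, and commits (or aborts on error).
  • Incorporates retry logic for "TransientTransactionError" and "UnknownTransactionCommitResult" errors.

The Core API:

  • Requires explicit calls to start and commit the transaction.
  • Does not incorporate retry logic for "TransientTransactionError" or "UnknownTransactionCommitResult"; applications must add it explicitly.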
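
To make the contrast concrete, here is a minimal, driver-free sketch in Python. `ToySession` and `with_transaction_sketch` are hypothetical stand-ins, not part of any MongoDB driver; they only model the order of calls. Calling the start, commit, and abort methods yourself corresponds to the Core API style, while wrapping them around a function you supply corresponds to the Callback API style (retry handling is left out here).

```python
class ToySession:
    """Hypothetical stand-in for a driver session; it only records calls."""

    def __init__(self):
        self.log = []

    def start_transaction(self):
        self.log.append("start")

    def commit_transaction(self):
        self.log.append("commit")

    def abort_transaction(self):
        self.log.append("abort")


def with_transaction_sketch(session, callback):
    """Callback-style helper: start, run the callback, then commit or abort."""
    session.start_transaction()
    try:
        result = callback(session)
    except Exception:
        # Any error raised by the callback aborts the transaction.
        session.abort_transaction()
        raise
    session.commit_transaction()
    return result


def failing_callback(session):
    raise ValueError("simulated failure inside the transaction")


ok_session = ToySession()
with_transaction_sketch(ok_session, lambda s: "done")

bad_session = ToySession()
try:
    with_transaction_sketch(bad_session, failing_callback)
except ValueError:
    pass

print(ok_session.log)
print(bad_session.log)
```

In this toy model the successful run records a start followed by a commit, and the failing run records a start followed by an abort.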

Callback API

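The new callback API incorporates retry logic for "TransientTransactionError" and "UnknownTransactionCommitResult" commit errors. Examples are given for the following drivers: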

  • Python
  • Java (Sync)
  • Node.js
  • PHP
  • C#

Important

  • For transactions on MongoDB 4.2 deployments (replica sets and sharded clusters), clients must use MongoDB drivers updated for MongoDB 4.2.
  • When using the drivers, each operation in the transaction must be associated with the session (i.e., pass in the session to each operation).

The examples use the new callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new callback API incorporates retry logic for "TransientTransactionError" or "UnknownTransactionCommitResult" commit errors.

Python

    from pymongo import MongoClient, ReadPreference, WriteConcern
    from pymongo.read_concern import ReadConcern

    # For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
    # uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
    # For a sharded cluster, connect to the mongos instances; e.g.
    # uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'

    client = MongoClient(uriString)
    wc_majority = WriteConcern("majority", wtimeout=1000)

    # Prereq: Create collections. CRUD operations in transactions must be on existing collections.
    client.get_database(
        "mydb1", write_concern=wc_majority).foo.insert_one({'abc': 0})
    client.get_database(
        "mydb2", write_concern=wc_majority).bar.insert_one({'xyz': 0})

    # Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
    def callback(session):
        collection_one = session.client.mydb1.foo
        collection_two = session.client.mydb2.bar

        # Important: You must pass the session to the operations.
        collection_one.insert_one({'abc': 1}, session=session)
        collection_two.insert_one({'xyz': 999}, session=session)

    # Step 2: Start a client session.
    with client.start_session() as session:
        # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
        session.with_transaction(
            callback, read_concern=ReadConcern('local'),
            write_concern=wc_majority,
            read_preference=ReadPreference.PRIMARY)
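
One consequence of the built-in retry logic is that the callback may run more than once. The following driver-free sketch illustrates this under stated assumptions: `LabeledError` and `with_transaction_retry_sketch` are hypothetical stand-ins (only the `has_error_label` method name mirrors what the Python driver exposes on its exceptions), and the retry limit of five attempts is an arbitrary choice for the illustration.

```python
class LabeledError(Exception):
    """Hypothetical stand-in for a driver error that carries error labels."""

    def __init__(self, message, labels=()):
        super().__init__(message)
        self._labels = frozenset(labels)

    def has_error_label(self, label):
        return label in self._labels


def with_transaction_retry_sketch(callback, max_attempts=5):
    """Re-run the callback while it fails with a transient label (toy model)."""
    for attempt in range(1, max_attempts + 1):
        try:
            return callback()
        except LabeledError as exc:
            transient = exc.has_error_label("TransientTransactionError")
            if not transient or attempt == max_attempts:
                raise
            # Transient failure with attempts left: loop and run it again.


calls = []


def flaky_callback():
    # Fails with a transient label on the first call, succeeds afterwards.
    calls.append("run")
    if len(calls) == 1:
        raise LabeledError("simulated", labels=["TransientTransactionError"])
    return "committed"


outcome = with_transaction_retry_sketch(flaky_callback)
print(outcome, len(calls))
```

Because the callback body can execute again after a transient failure, side effects that happen outside the transaction (logging, counters, external calls) may be repeated.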

Java (Sync)

    /*
      For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
      String uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/admin?replicaSet=myRepl";
      For a sharded cluster, connect to the mongos instances; e.g.
      String uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/admin";
    */

    final MongoClient client = MongoClients.create(uri);

    /*
      Prereq: Create collections. CRUD operations in transactions must be on existing collections.
    */

    client.getDatabase("mydb1").getCollection("foo")
            .withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("abc", 0));
    client.getDatabase("mydb2").getCollection("bar")
            .withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("xyz", 0));

    /* Step 1: Start a client session. */

    final ClientSession clientSession = client.startSession();

    /* Step 2: Optional. Define options to use for the transaction. */

    TransactionOptions txnOptions = TransactionOptions.builder()
            .readPreference(ReadPreference.primary())
            .readConcern(ReadConcern.LOCAL)
            .writeConcern(WriteConcern.MAJORITY)
            .build();

    /* Step 3: Define the sequence of operations to perform inside the transactions. */

    TransactionBody<String> txnBody = new TransactionBody<String>() {
        public String execute() {
            MongoCollection<Document> coll1 = client.getDatabase("mydb1").getCollection("foo");
            MongoCollection<Document> coll2 = client.getDatabase("mydb2").getCollection("bar");

            /*
              Important: You must pass the session to the operations.
            */

            coll1.insertOne(clientSession, new Document("abc", 1));
            coll2.insertOne(clientSession, new Document("xyz", 999));

            return "Inserted into collections in different databases";
        }
    };
    try {
        /*
          Step 4: Use .withTransaction() to start a transaction,
          execute the callback, and commit (or abort on error).
        */

        clientSession.withTransaction(txnBody, txnOptions);
    } catch (RuntimeException e) {
        // some error handling
    } finally {
        clientSession.close();
    }

Node.js

    const { MongoClient } = require('mongodb');

    // For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
    // const uri = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
    // For a sharded cluster, connect to the mongos instances; e.g.
    // const uri = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'

    const client = new MongoClient(uri);
    await client.connect();

    // Prereq: Create collections. CRUD operations in transactions must be on existing collections.

    await client
      .db('mydb1')
      .collection('foo')
      .insertOne({ abc: 0 }, { w: 'majority' });

    await client
      .db('mydb2')
      .collection('bar')
      .insertOne({ xyz: 0 }, { w: 'majority' });

    // Step 1: Start a Client Session
    const session = client.startSession();

    // Step 2: Optional. Define options to use for the transaction
    const transactionOptions = {
      readPreference: 'primary',
      readConcern: { level: 'local' },
      writeConcern: { w: 'majority' }
    };

    // Step 3: Use withTransaction to start a transaction, execute the callback, and commit (or abort on error)
    // Note: The callback for withTransaction MUST be async and/or return a Promise.
    try {
      await session.withTransaction(async () => {
        const coll1 = client.db('mydb1').collection('foo');
        const coll2 = client.db('mydb2').collection('bar');

        // Important: You must pass the session to the operations

        await coll1.insertOne({ abc: 1 }, { session });
        await coll2.insertOne({ xyz: 999 }, { session });
      }, transactionOptions);
    } finally {
      await session.endSession();
      await client.close();
    }

PHP

    /*
     * For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
     * uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
     * For a sharded cluster, connect to the mongos instances; e.g.
     * uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
     */

    $client = new \MongoDB\Client($uriString);

    // Prerequisite: Create collections. CRUD operations in transactions must be on existing collections.
    $client->selectCollection(
        'mydb1',
        'foo',
        [
            'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
        ]
    )->insertOne(['abc' => 0]);

    $client->selectCollection(
        'mydb2',
        'bar',
        [
            'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
        ]
    )->insertOne(['xyz' => 0]);

    // Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.

    $callback = function (\MongoDB\Driver\Session $session) use ($client) {
        $client
            ->selectCollection('mydb1', 'foo')
            ->insertOne(['abc' => 1], ['session' => $session]);

        $client
            ->selectCollection('mydb2', 'bar')
            ->insertOne(['xyz' => 999], ['session' => $session]);
    };

    // Step 2: Start a client session.

    $session = $client->startSession();

    // Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).

    $transactionOptions = [
        'readConcern' => new \MongoDB\Driver\ReadConcern(\MongoDB\Driver\ReadConcern::LOCAL),
        'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
        'readPreference' => new \MongoDB\Driver\ReadPreference(\MongoDB\Driver\ReadPreference::RP_PRIMARY),
    ];

    \MongoDB\with_transaction($session, $callback, $transactionOptions);

C#

    // For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
    // string connectionString = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl";
    // For a sharded cluster, connect to the mongos instances; e.g.
    // string connectionString = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
    var client = new MongoClient(connectionString);

    // Prereq: Create collections. CRUD operations in transactions must be on existing collections.
    var database1 = client.GetDatabase("mydb1");
    var collection1 = database1.GetCollection<BsonDocument>("foo").WithWriteConcern(WriteConcern.WMajority);
    collection1.InsertOne(new BsonDocument("abc", 0));

    var database2 = client.GetDatabase("mydb2");
    var collection2 = database2.GetCollection<BsonDocument>("bar").WithWriteConcern(WriteConcern.WMajority);
    collection2.InsertOne(new BsonDocument("xyz", 0));

    // Step 1: Start a client session.
    using (var session = client.StartSession())
    {
        // Step 2: Optional. Define options to use for the transaction.
        var transactionOptions = new TransactionOptions(
            readPreference: ReadPreference.Primary,
            readConcern: ReadConcern.Local,
            writeConcern: WriteConcern.WMajority);

        // Step 3: Define the sequence of operations to perform inside the transactions
        var cancellationToken = CancellationToken.None; // normally a real token would be used
        var result = session.WithTransaction(
            (s, ct) =>
            {
                // Important: the session (s) is passed to each operation.
                collection1.InsertOne(s, new BsonDocument("abc", 1), cancellationToken: ct);
                collection2.InsertOne(s, new BsonDocument("xyz", 999), cancellationToken: ct);
                return "Inserted into collections in different databases";
            },
            transactionOptions,
            cancellationToken);
    }
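
All of the callback examples above pass the session to every operation. As a rough illustration of why that matters, the toy model below treats a write made with a session as part of the transaction and a write made without one as an ordinary write that an abort does not undo. `ToyCollection`, `ToySession`, and `insert_one` are hypothetical names for this sketch only; a real server does not buffer writes this way, and only the visible outcome is being modelled.

```python
class ToyCollection:
    """Hypothetical in-memory collection; not a driver class."""

    def __init__(self):
        self.docs = []


class ToySession:
    """Hypothetical session that holds transactional writes until commit."""

    def __init__(self):
        self.pending = []

    def commit_transaction(self):
        # Apply every write that was associated with this session.
        for coll, doc in self.pending:
            coll.docs.append(doc)
        self.pending.clear()

    def abort_transaction(self):
        # Discard every write that was associated with this session.
        self.pending.clear()


def insert_one(coll, doc, session=None):
    """With a session the write joins the transaction; without, it applies at once."""
    if session is None:
        coll.docs.append(doc)
    else:
        session.pending.append((coll, doc))


foo = ToyCollection()
session = ToySession()
insert_one(foo, {"abc": 1}, session=session)  # part of the transaction
insert_one(foo, {"abc": 2})                   # session omitted: outside the transaction
session.abort_transaction()

print(foo.docs)
```

After the abort, only the write that omitted the session remains in the toy collection, which is the kind of partial result the session requirement is meant to prevent.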

Core API

The core transaction API does not incorporate retry logic for errors labeled "TransientTransactionError" or "UnknownTransactionCommitResult":

  • To handle "TransientTransactionError", applications should explicitly incorporate logic to retry the whole transaction.
  • To handle "UnknownTransactionCommitResult", applications should explicitly incorporate logic to retry the commit operation.

The following example incorporates logic to retry the transaction for transient errors and retry the commit for unknown commit errors:

  • Python
  • Java (Sync)
  • Node.js
  • PHP
  • C
  • Other
    • C++11
    • C#
    • Perl
    • Ruby
    • Scala
    • Go

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

Python

    from pymongo import ReadPreference, WriteConcern
    from pymongo.errors import ConnectionFailure, OperationFailure
    from pymongo.read_concern import ReadConcern

    def run_transaction_with_retry(txn_func, session):
        while True:
            try:
                txn_func(session)  # performs transaction
                break
            except (ConnectionFailure, OperationFailure) as exc:
                # If transient error, retry the whole transaction
                if exc.has_error_label("TransientTransactionError"):
                    print("TransientTransactionError, retrying "
                          "transaction ...")
                    continue
                else:
                    raise

    def commit_with_retry(session):
        while True:
            try:
                # Commit uses write concern set at transaction start.
                session.commit_transaction()
                print("Transaction committed.")
                break
            except (ConnectionFailure, OperationFailure) as exc:
                # Can retry commit
                if exc.has_error_label("UnknownTransactionCommitResult"):
                    print("UnknownTransactionCommitResult, retrying "
                          "commit operation ...")
                    continue
                else:
                    print("Error during commit ...")
                    raise

    # Updates two collections in a transaction

    def update_employee_info(session):
        employees_coll = session.client.hr.employees
        events_coll = session.client.reporting.events

        with session.start_transaction(
                read_concern=ReadConcern("snapshot"),
                write_concern=WriteConcern(w="majority"),
                read_preference=ReadPreference.PRIMARY):
            employees_coll.update_one(
                {"employee": 3}, {"$set": {"status": "Inactive"}},
                session=session)
            events_coll.insert_one(
                {"employee": 3, "status": {
                    "new": "Inactive", "old": "Active"}},
                session=session)

            commit_with_retry(session)

    # Start a session (client is an existing MongoClient).
    with client.start_session() as session:
        try:
            run_transaction_with_retry(update_employee_info, session)
        except Exception as exc:
            # Do something with error.
            raise
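
The retry loops in the Python example above run until the operation succeeds or a non-retryable error is raised. Applications that prefer an upper bound could cap the number of attempts; the sketch below shows one way to do that. It is driver-free: `LabeledError` and `run_with_bounded_retry` are hypothetical names, and the attempt limit and delay are arbitrary values chosen for illustration.

```python
import time


class LabeledError(Exception):
    """Hypothetical stand-in for a driver error that carries error labels."""

    def __init__(self, message, labels=()):
        super().__init__(message)
        self._labels = frozenset(labels)

    def has_error_label(self, label):
        return label in self._labels


def run_with_bounded_retry(txn_func, max_attempts=3, delay_seconds=0.0):
    """Retry txn_func on transient errors, giving up after max_attempts tries."""
    for attempt in range(1, max_attempts + 1):
        try:
            return txn_func()
        except LabeledError as exc:
            if not exc.has_error_label("TransientTransactionError"):
                raise  # non-transient error: do not retry
            if attempt == max_attempts:
                raise  # retry budget exhausted: surface the last error
            time.sleep(delay_seconds)  # optional pause before the next try


attempts = []


def always_transient():
    # Simulates a transaction that keeps failing with a transient label.
    attempts.append("try")
    raise LabeledError("simulated", labels=["TransientTransactionError"])


gave_up = False
try:
    run_with_bounded_retry(always_transient, max_attempts=3)
except LabeledError:
    gave_up = True

print(len(attempts), gave_up)
```

The same cap could be applied to the commit retry loop by checking for the "UnknownTransactionCommitResult" label instead.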

Java (Sync)

    void runTransactionWithRetry(Runnable transactional) {
        while (true) {
            try {
                transactional.run();
                break;
            } catch (MongoException e) {
                System.out.println("Transaction aborted. Caught exception during transaction.");

                if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
                    System.out.println("TransientTransactionError, aborting transaction and retrying ...");
                    continue;
                } else {
                    throw e;
                }
            }
        }
    }

    void commitWithRetry(ClientSession clientSession) {
        while (true) {
            try {
                clientSession.commitTransaction();
                System.out.println("Transaction committed");
                break;
            } catch (MongoException e) {
                // can retry commit
                if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) {
                    System.out.println("UnknownTransactionCommitResult, retrying commit operation ...");
                    continue;
                } else {
                    System.out.println("Exception during commit ...");
                    throw e;
                }
            }
        }
    }

    void updateEmployeeInfo() {

        MongoCollection<Document> employeesCollection = client.getDatabase("hr").getCollection("employees");
        MongoCollection<Document> eventsCollection = client.getDatabase("reporting").getCollection("events");

        TransactionOptions txnOptions = TransactionOptions.builder()
                .readPreference(ReadPreference.primary())
                .readConcern(ReadConcern.MAJORITY)
                .writeConcern(WriteConcern.MAJORITY)
                .build();

        try (ClientSession clientSession = client.startSession()) {
            clientSession.startTransaction(txnOptions);

            employeesCollection.updateOne(clientSession,
                    Filters.eq("employee", 3),
                    Updates.set("status", "Inactive"));
            eventsCollection.insertOne(clientSession,
                    new Document("employee", 3).append("status", new Document("new", "Inactive").append("old", "Active")));

            commitWithRetry(clientSession);
        }
    }


    void updateEmployeeInfoWithRetry() {
        runTransactionWithRetry(this::updateEmployeeInfo);
    }

Node.js

    async function commitWithRetry(session) {
      try {
        await session.commitTransaction();
        console.log('Transaction committed.');
      } catch (error) {
        if (
          error.errorLabels &&
          error.errorLabels.indexOf('UnknownTransactionCommitResult') >= 0
        ) {
          console.log('UnknownTransactionCommitResult, retrying commit operation ...');
          await commitWithRetry(session);
        } else {
          console.log('Error during commit ...');
          throw error;
        }
      }
    }

    async function runTransactionWithRetry(txnFunc, client, session) {
      try {
        await txnFunc(client, session);
      } catch (error) {
        console.log('Transaction aborted. Caught exception during transaction.');

        // If transient error, retry the whole transaction
        if (error.errorLabels && error.errorLabels.indexOf('TransientTransactionError') >= 0) {
          console.log('TransientTransactionError, retrying transaction ...');
          await runTransactionWithRetry(txnFunc, client, session);
        } else {
          throw error;
        }
      }
    }

    async function updateEmployeeInfo(client, session) {
      session.startTransaction({
        readConcern: { level: 'snapshot' },
        writeConcern: { w: 'majority' },
        readPreference: 'primary'
      });

      const employeesCollection = client.db('hr').collection('employees');
      const eventsCollection = client.db('reporting').collection('events');

      await employeesCollection.updateOne(
        { employee: 3 },
        { $set: { status: 'Inactive' } },
        { session }
      );
      await eventsCollection.insertOne(
        {
          employee: 3,
          status: { new: 'Inactive', old: 'Active' }
        },
        { session }
      );

      try {
        await commitWithRetry(session);
      } catch (error) {
        await session.abortTransaction();
        throw error;
      }
    }

    return client.withSession(session =>
      runTransactionWithRetry(updateEmployeeInfo, client, session)
    );

PHP

    private function runTransactionWithRetry3(callable $txnFunc, \MongoDB\Client $client, \MongoDB\Driver\Session $session)
    {
        while (true) {
            try {
                $txnFunc($client, $session);  // performs transaction
                break;
            } catch (\MongoDB\Driver\Exception\CommandException $error) {
                $resultDoc = $error->getResultDocument();

                // If transient error, retry the whole transaction
                if (isset($resultDoc->errorLabels) && in_array('TransientTransactionError', $resultDoc->errorLabels)) {
                    continue;
                } else {
                    throw $error;
                }
            } catch (\MongoDB\Driver\Exception\Exception $error) {
                throw $error;
            }
        }
    }

    private function commitWithRetry3(\MongoDB\Driver\Session $session)
    {
        while (true) {
            try {
                $session->commitTransaction();
                echo "Transaction committed.\n";
                break;
            } catch (\MongoDB\Driver\Exception\CommandException $error) {
                $resultDoc = $error->getResultDocument();

                if (isset($resultDoc->errorLabels) && in_array('UnknownTransactionCommitResult', $resultDoc->errorLabels)) {
                    echo "UnknownTransactionCommitResult, retrying commit operation ...\n";
                    continue;
                } else {
                    echo "Error during commit ...\n";
                    throw $error;
                }
            } catch (\MongoDB\Driver\Exception\Exception $error) {
                echo "Error during commit ...\n";
                throw $error;
            }
        }
    }

    private function updateEmployeeInfo3(\MongoDB\Client $client, \MongoDB\Driver\Session $session)
    {
        $session->startTransaction([
            'readConcern' => new \MongoDB\Driver\ReadConcern("snapshot"),
            'readPreference' => new \MongoDB\Driver\ReadPreference(\MongoDB\Driver\ReadPreference::RP_PRIMARY),
            'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY),
        ]);

        try {
            $client->hr->employees->updateOne(
                ['employee' => 3],
                ['$set' => ['status' => 'Inactive']],
                ['session' => $session]
            );
            $client->reporting->events->insertOne(
                ['employee' => 3, 'status' => [ 'new' => 'Inactive', 'old' => 'Active']],
                ['session' => $session]
            );
        } catch (\MongoDB\Driver\Exception\Exception $error) {
            echo "Caught exception during transaction, aborting.\n";
            $session->abortTransaction();
            throw $error;
        }

        $this->commitWithRetry3($session);
    }

    private function doUpdateEmployeeInfo(\MongoDB\Client $client)
    {
        // Start a session.
        $session = $client->startSession();

        try {
            $this->runTransactionWithRetry3([$this, 'updateEmployeeInfo3'], $client, $session);
        } catch (\MongoDB\Driver\Exception\Exception $error) {
            // Do something with error
        }
    }

C

    /* takes a session, an out-param for server reply, and out-param for error. */
    typedef bool (*txn_func_t) (mongoc_client_session_t *,
                                bson_t *,
                                bson_error_t *);


    /* runs transactions with retry logic */
    bool
    run_transaction_with_retry (txn_func_t txn_func,
                                mongoc_client_session_t *cs,
                                bson_error_t *error)
    {
       bson_t reply;
       bool r;

       while (true) {
          /* perform transaction */
          r = txn_func (cs, &reply, error);
          if (r) {
             /* success */
             bson_destroy (&reply);
             return true;
          }

          MONGOC_WARNING ("Transaction aborted: %s", error->message);
          if (mongoc_error_has_label (&reply, "TransientTransactionError")) {
             /* on transient error, retry the whole transaction */
             MONGOC_WARNING ("TransientTransactionError, retrying transaction...");
             bson_destroy (&reply);
          } else {
             /* non-transient error */
             break;
          }
       }

       bson_destroy (&reply);
       return false;
    }


    /* commit transactions with retry logic */
    bool
    commit_with_retry (mongoc_client_session_t *cs, bson_error_t *error)
    {
       bson_t reply;
       bool r;

       while (true) {
          /* commit uses write concern set at transaction start, see
           * mongoc_transaction_opts_set_write_concern */
          r = mongoc_client_session_commit_transaction (cs, &reply, error);
          if (r) {
             MONGOC_INFO ("Transaction committed");
             break;
          }

          if (mongoc_error_has_label (&reply, "UnknownTransactionCommitResult")) {
             MONGOC_WARNING ("UnknownTransactionCommitResult, retrying commit ...");
             bson_destroy (&reply);
          } else {
             /* commit failed, cannot retry */
             break;
          }
       }

       bson_destroy (&reply);

       return r;
    }


    /* updates two collections in a transaction and calls commit_with_retry */
    bool
    update_employee_info (mongoc_client_session_t *cs,
                          bson_t *reply,
                          bson_error_t *error)
    {
       mongoc_client_t *client;
       mongoc_collection_t *employees;
       mongoc_collection_t *events;
       mongoc_read_concern_t *rc;
       mongoc_write_concern_t *wc;
       mongoc_transaction_opt_t *txn_opts;
       bson_t opts = BSON_INITIALIZER;
       bson_t *filter = NULL;
       bson_t *update = NULL;
       bson_t *event = NULL;
       bool r;

       bson_init (reply);

       client = mongoc_client_session_get_client (cs);
       employees = mongoc_client_get_collection (client, "hr", "employees");
       events = mongoc_client_get_collection (client, "reporting", "events");

       rc = mongoc_read_concern_new ();
       mongoc_read_concern_set_level (rc, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT);
       wc = mongoc_write_concern_new ();
       mongoc_write_concern_set_w (wc, MONGOC_WRITE_CONCERN_W_MAJORITY);
       txn_opts = mongoc_transaction_opts_new ();
       mongoc_transaction_opts_set_read_concern (txn_opts, rc);
       mongoc_transaction_opts_set_write_concern (txn_opts, wc);

       r = mongoc_client_session_start_transaction (cs, txn_opts, error);
       if (!r) {
          goto done;
       }

       /* append the session to opts so each operation joins the transaction */
       r = mongoc_client_session_append (cs, &opts, error);
       if (!r) {
          goto done;
       }

       filter = BCON_NEW ("employee", BCON_INT32 (3));
       update = BCON_NEW ("$set", "{", "status", "Inactive", "}");
       /* mongoc_collection_update_one will reinitialize reply */
       bson_destroy (reply);
       r = mongoc_collection_update_one (
          employees, filter, update, &opts, reply, error);

       if (!r) {
          goto abort;
       }

       event = BCON_NEW ("employee", BCON_INT32 (3));
       BCON_APPEND (event, "status", "{", "new", "Inactive", "old", "Active", "}");

       bson_destroy (reply);
       r = mongoc_collection_insert_one (events, event, &opts, reply, error);
       if (!r) {
          goto abort;
       }

       r = commit_with_retry (cs, error);

    abort:
       if (!r) {
          MONGOC_ERROR ("Aborting due to error in transaction: %s", error->message);
          mongoc_client_session_abort_transaction (cs, NULL);
       }

    done:
       mongoc_collection_destroy (employees);
       mongoc_collection_destroy (events);
       mongoc_read_concern_destroy (rc);
       mongoc_write_concern_destroy (wc);
       mongoc_transaction_opts_destroy (txn_opts);
       bson_destroy (&opts);
       bson_destroy (filter);
       bson_destroy (update);
       bson_destroy (event);

       return r;
    }


    void
    example_func (mongoc_client_t *client)
    {
       mongoc_client_session_t *cs;
       bson_error_t error;
       bool r;

       cs = mongoc_client_start_session (client, NULL, &error);
       if (!cs) {
          MONGOC_ERROR ("Could not start session: %s", error.message);
          return;
       }

       r = run_transaction_with_retry (update_employee_info, cs, &error);
       if (!r) {
          MONGOC_ERROR ("Could not update employee, permanent error: %s",
                        error.message);
       }

       mongoc_client_session_destroy (cs);
    }

C++11

    using transaction_func = std::function<void(client_session & session)>;
    auto run_transaction_with_retry = [](transaction_func txn_func, client_session& session) {
        while (true) {
            try {
                txn_func(session);  // performs transaction.
                break;
            } catch (const operation_exception& oe) {
                std::cout << "Transaction aborted. Caught exception during transaction."
                          << std::endl;
                // If transient error, retry the whole transaction.
                if (oe.has_error_label("TransientTransactionError")) {
                    std::cout << "TransientTransactionError, retrying transaction ..."
                              << std::endl;
                    continue;
                } else {
                    throw;  // rethrow the original exception without copying it
                }
            }
        }
    };

    auto commit_with_retry = [](client_session& session) {
        while (true) {
            try {
                session.commit_transaction();  // Uses write concern set at transaction start.
                std::cout << "Transaction committed." << std::endl;
                break;
            } catch (const operation_exception& oe) {
                // Can retry commit
                if (oe.has_error_label("UnknownTransactionCommitResult")) {
                    std::cout << "UnknownTransactionCommitResult, retrying commit operation ..."
                              << std::endl;
                    continue;
                } else {
                    std::cout << "Error during commit ..." << std::endl;
                    throw;
                }
            }
        }
    };

    // Updates two collections in a transaction
    auto update_employee_info = [&](client_session& session) {
        auto& client = session.client();
        auto employees = client["hr"]["employees"];
        auto events = client["reporting"]["events"];

        options::transaction txn_opts;
        read_concern rc;
        rc.acknowledge_level(read_concern::level::k_snapshot);
        txn_opts.read_concern(rc);
        write_concern wc;
        wc.acknowledge_level(write_concern::level::k_majority);
        txn_opts.write_concern(wc);

        session.start_transaction(txn_opts);

        try {
            // The session is passed to each operation so that it joins the transaction.
            employees.update_one(
                session,
                make_document(kvp("employee", 3)),
                make_document(kvp("$set", make_document(kvp("status", "Inactive")))));
            events.insert_one(
                session,
                make_document(
                    kvp("employee", 3),
                    kvp("status", make_document(kvp("new", "Inactive"), kvp("old", "Active")))));
        } catch (const operation_exception& oe) {
            std::cout << "Caught exception during transaction, aborting." << std::endl;
            session.abort_transaction();
            throw;
        }

        commit_with_retry(session);
    };

    auto session = client.start_session();
    try {
        run_transaction_with_retry(update_employee_info, session);
    } catch (const operation_exception& oe) {
        // Do something with error.
        throw;
    }

C#

    public void RunTransactionWithRetry(Action<IMongoClient, IClientSessionHandle> txnFunc, IMongoClient client, IClientSessionHandle session)
    {
        while (true)
        {
            try
            {
                txnFunc(client, session); // performs transaction
                break;
            }
            catch (MongoException exception)
            {
                // if transient error, retry the whole transaction
                if (exception.HasErrorLabel("TransientTransactionError"))
                {
                    Console.WriteLine("TransientTransactionError, retrying transaction.");
                    continue;
                }
                else
                {
                    throw;
                }
            }
        }
    }

    public void CommitWithRetry(IClientSessionHandle session)
    {
        while (true)
        {
            try
            {
                session.CommitTransaction();
                Console.WriteLine("Transaction committed.");
                break;
            }
            catch (MongoException exception)
            {
                // can retry commit
                if (exception.HasErrorLabel("UnknownTransactionCommitResult"))
                {
                    Console.WriteLine("UnknownTransactionCommitResult, retrying commit operation");
                    continue;
                }
                else
                {
                    Console.WriteLine($"Error during commit: {exception.Message}.");
                    throw;
                }
            }
        }
    }

    // updates two collections in a transaction
    public void UpdateEmployeeInfo(IMongoClient client, IClientSessionHandle session)
    {
        var employeesCollection = client.GetDatabase("hr").GetCollection<BsonDocument>("employees");
        var eventsCollection = client.GetDatabase("reporting").GetCollection<BsonDocument>("events");

        session.StartTransaction(new TransactionOptions(
            readConcern: ReadConcern.Snapshot,
            writeConcern: WriteConcern.WMajority,
            readPreference: ReadPreference.Primary));

        try
        {
            employeesCollection.UpdateOne(
                session,
                Builders<BsonDocument>.Filter.Eq("employee", 3),
                Builders<BsonDocument>.Update.Set("status", "Inactive"));
            eventsCollection.InsertOne(
                session,
                new BsonDocument
                {
                    { "employee", 3 },
                    { "status", new BsonDocument { { "new", "Inactive" }, { "old", "Active" } } }
                });
        }
        catch (Exception exception)
        {
            Console.WriteLine($"Caught exception during transaction, aborting: {exception.Message}.");
            session.AbortTransaction();
            throw;
        }

        CommitWithRetry(session);
    }

    public void UpdateEmployeeInfoWithTransactionRetry(IMongoClient client)
    {
        // start a session
        using (var session = client.StartSession())
        {
            try
            {
                RunTransactionWithRetry(UpdateEmployeeInfo, client, session);
            }
            catch (Exception exception)
            {
                // do something with error
                Console.WriteLine($"Non transient exception caught during transaction: {exception.Message}.");
            }
        }
    }

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

sub runTransactionWithRetry {
    my ( $txnFunc, $session ) = @_;

    LOOP: {
        eval {
            $txnFunc->($session);    # performs transaction
        };
        if ( my $error = $@ ) {
            print("Transaction aborted. Caught exception during transaction.\n");
            # If transient error, retry the whole transaction
            if ( $error->has_error_label("TransientTransactionError") ) {
                print("TransientTransactionError, retrying transaction ...\n");
                redo LOOP;
            }
            else {
                die $error;
            }
        }
    }

    return;
}

sub commitWithRetry {
    my ($session) = @_;

    LOOP: {
        eval {
            $session->commit_transaction();    # Uses write concern set at transaction start.
            print("Transaction committed.\n");
        };
        if ( my $error = $@ ) {
            # Can retry commit
            if ( $error->has_error_label("UnknownTransactionCommitResult") ) {
                print("UnknownTransactionCommitResult, retrying commit operation ...\n");
                redo LOOP;
            }
            else {
                print("Error during commit ...\n");
                die $error;
            }
        }
    }

    return;
}

# Updates two collections in a transaction

sub updateEmployeeInfo {
    my ($session) = @_;
    my $employeesCollection = $session->client->ns("hr.employees");
    my $eventsCollection    = $session->client->ns("reporting.events");

    $session->start_transaction(
        {
            readConcern    => { level => "snapshot" },
            writeConcern   => { w => "majority" },
            readPreference => 'primary',
        }
    );

    eval {
        $employeesCollection->update_one(
            { employee => 3 }, { '$set' => { status => "Inactive" } },
            { session => $session },
        );
        $eventsCollection->insert_one(
            { employee => 3, status => { new => "Inactive", old => "Active" } },
            { session => $session },
        );
    };
    if ( my $error = $@ ) {
        print("Caught exception during transaction, aborting.\n");
        $session->abort_transaction();
        die $error;
    }

    commitWithRetry($session);
}

# Start a session
my $session = $client->start_session();

eval {
    runTransactionWithRetry( \&updateEmployeeInfo, $session );
};
if ( my $error = $@ ) {
    # Do something with error
}

$session->end_session();

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

def run_transaction_with_retry(session)
  begin
    yield session # performs transaction
  rescue Mongo::Error => e
    puts 'Transaction aborted. Caught exception during transaction.'
    raise unless e.label?('TransientTransactionError')
    puts "TransientTransactionError, retrying transaction ..."
    retry
  end
end

def commit_with_retry(session)
  begin
    session.commit_transaction
    puts 'Transaction committed.'
  rescue Mongo::Error => e
    if e.label?('UnknownTransactionCommitResult')
      puts "UnknownTransactionCommitResult, retrying commit operation ..."
      retry
    else
      puts 'Error during commit ...'
      raise
    end
  end
end

# updates two collections in a transaction

def update_employee_info(session)
  employees_coll = session.client.use(:hr)[:employees]
  events_coll = session.client.use(:reporting)[:events]

  session.start_transaction(read_concern: { level: :snapshot },
                            write_concern: { w: :majority })
  employees_coll.update_one({ employee: 3 }, { '$set' => { status: 'Inactive' } },
                            session: session)
  events_coll.insert_one({ employee: 3, status: { new: 'Inactive', old: 'Active' } },
                         session: session)
  commit_with_retry(session)
end

session = client.start_session

begin
  run_transaction_with_retry(session) { |s| update_employee_info(s) }
rescue StandardError => e
  # Do something with error
  raise
end

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

  def updateEmployeeInfo(database: MongoDatabase, observable: SingleObservable[ClientSession]): SingleObservable[ClientSession] = {
    observable.map(clientSession => {
      val employeesCollection = database.getCollection("employees")
      val eventsCollection = database.getCollection("events")

      val transactionOptions = TransactionOptions.builder()
        .readPreference(ReadPreference.primary())
        .readConcern(ReadConcern.SNAPSHOT)
        .writeConcern(WriteConcern.MAJORITY)
        .build()
      clientSession.startTransaction(transactionOptions)
      employeesCollection.updateOne(clientSession, Filters.eq("employee", 3), Updates.set("status", "Inactive"))
        .subscribe((res: UpdateResult) => println(res))
      eventsCollection.insertOne(clientSession, Document("employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active")))
        .subscribe((res: Completed) => println(res))

      clientSession
    })
  }

  def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
    observable.recoverWith({
      case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
        println("UnknownTransactionCommitResult, retrying commit operation ...")
        commitAndRetry(observable)
      }
      case e: Exception => {
        println(s"Exception during commit ...: $e")
        throw e
      }
    })
  }

  def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
    observable.recoverWith({
      case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
        println("TransientTransactionError, aborting transaction and retrying ...")
        runTransactionAndRetry(observable)
      }
    })
  }

  def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Completed] = {

    val database = client.getDatabase("hr")
    val updateEmployeeInfoObservable: Observable[ClientSession] = updateEmployeeInfo(database, client.startSession())
    val commitTransactionObservable: SingleObservable[Completed] =
      updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction())
    val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)

    runTransactionAndRetry(commitAndRetryObservable)
  }
}

	runTransactionWithRetry := func(sctx mongo.SessionContext, txnFn func(mongo.SessionContext) error) error {
		for {
			err := txnFn(sctx) // Performs transaction.
			if err == nil {
				return nil
			}

			log.Println("Transaction aborted. Caught exception during transaction.")

			// If transient error, retry the whole transaction
			if cmdErr, ok := err.(mongo.CommandError); ok && cmdErr.HasErrorLabel("TransientTransactionError") {
				log.Println("TransientTransactionError, retrying transaction...")
				continue
			}
			return err
		}
	}

	commitWithRetry := func(sctx mongo.SessionContext) error {
		for {
			err := sctx.CommitTransaction(sctx)
			switch e := err.(type) {
			case nil:
				log.Println("Transaction committed.")
				return nil
			case mongo.CommandError:
				// Can retry commit
				if e.HasErrorLabel("UnknownTransactionCommitResult") {
					log.Println("UnknownTransactionCommitResult, retrying commit operation...")
					continue
				}
				log.Println("Error during commit...")
				return e
			default:
				log.Println("Error during commit...")
				return e
			}
		}
	}

	// Updates two collections in a transaction.
	updateEmployeeInfo := func(sctx mongo.SessionContext) error {
		employees := client.Database("hr").Collection("employees")
		events := client.Database("reporting").Collection("events")

		err := sctx.StartTransaction(options.Transaction().
			SetReadConcern(readconcern.Snapshot()).
			SetWriteConcern(writeconcern.New(writeconcern.WMajority())),
		)
		if err != nil {
			return err
		}

		_, err = employees.UpdateOne(sctx, bson.D{{"employee", 3}}, bson.D{{"$set", bson.D{{"status", "Inactive"}}}})
		if err != nil {
			sctx.AbortTransaction(sctx)
			log.Println("caught exception during transaction, aborting.")
			return err
		}
		_, err = events.InsertOne(sctx, bson.D{{"employee", 3}, {"status", bson.D{{"new", "Inactive"}, {"old", "Active"}}}})
		if err != nil {
			sctx.AbortTransaction(sctx)
			log.Println("caught exception during transaction, aborting.")
			return err
		}

		return commitWithRetry(sctx)
	}

	return client.UseSessionWithOptions(
		ctx, options.Session().SetDefaultReadPreference(readpref.Primary()),
		func(sctx mongo.SessionContext) error {
			return runTransactionWithRetry(sctx, updateEmployeeInfo)
		},
	)
}

Driver Versions

For transactions on MongoDB 4.2 deployments (replica sets and sharded clusters), clients must use MongoDB drivers updated for MongoDB 4.2:

  • C 1.15.0
  • C# 2.9.0
  • Go 1.1
  • Java 3.11.0
  • Node 3.3.0
  • Perl 2.2.0
  • Python 3.9.0
  • Ruby 2.10.0
  • Scala 2.7.0

For transactions on MongoDB 4.0 replica sets, clients require MongoDB drivers updated for MongoDB 4.0 or later.

  • Java 3.8.0
  • Python 3.7.0
  • C 1.11.0
  • C# 2.7
  • Node 3.1.0
  • Ruby 2.6.0
  • Perl 2.0.0
  • PHPC 1.5.0
  • Scala 2.4.0
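
As an illustration only, the MongoDB 4.2 minimums listed above can be held as data and checked with a numeric comparison. The names `MIN_DRIVER_FOR_4_2`, `parse_version`, and `supports_4_2_transactions` are hypothetical and not part of any driver, and the sketch assumes plain dotted numeric version strings (no pre-release suffixes).

```python
# Minimum driver versions for MongoDB 4.2 transactions, copied from the list above.
MIN_DRIVER_FOR_4_2 = {
    "C": "1.15.0", "C#": "2.9.0", "Go": "1.1", "Java": "3.11.0",
    "Node": "3.3.0", "Perl": "2.2.0", "Python": "3.9.0",
    "Ruby": "2.10.0", "Scala": "2.7.0",
}


def parse_version(text):
    """Turn '3.11.0' into (3, 11, 0). Assumes plain dotted integers."""
    return tuple(int(part) for part in text.split("."))


def supports_4_2_transactions(driver, version):
    """Return True if `version` of `driver` meets the listed 4.2 minimum."""
    minimum = parse_version(MIN_DRIVER_FOR_4_2[driver])
    installed = parse_version(version)
    # Pad the shorter tuple with zeros so (1, 1) compares like (1, 1, 0).
    width = max(len(minimum), len(installed))
    minimum += (0,) * (width - len(minimum))
    installed += (0,) * (width - len(installed))
    return installed >= minimum
```

Comparing parsed tuples rather than raw strings matters here: as strings, "2.9.0" would sort after "2.10.0".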

Transaction Error Handling

Regardless of the database system, whether MongoDB or relational databases, applications should take measures to handle errors during transaction commits and incorporate retry logic for transactions.

"TransientTransactionError"

The individual write operations inside the transaction are not retryable, regardless of the value of retryWrites. If an operation encounters an error associated with the label "TransientTransactionError", such as when the primary steps down, the transaction as a whole can be retried.

  • The callback API incorporates retry logic for "TransientTransactionError".
  • The core transaction API does not incorporate retry logic for "TransientTransactionError". To handle "TransientTransactionError", applications should explicitly incorporate retry logic for the error.
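
With the core API, the retry loop described above might look like the following Python sketch. It is not taken from any driver: it only assumes that the raised exception offers a `has_error_label(label)` method (as PyMongo's `PyMongoError` does), and the `max_attempts` cap is an added assumption, since the examples on this page retry without a limit.

```python
TRANSIENT_LABEL = "TransientTransactionError"


def run_transaction_with_retry(txn_func, session, max_attempts=5):
    """Call txn_func(session), rerunning the whole transaction on transient errors.

    max_attempts is a hypothetical safety cap, not something the drivers require.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            return txn_func(session)  # starts, runs, and commits the transaction
        except Exception as exc:
            has_label = getattr(exc, "has_error_label", None)
            transient = callable(has_label) and has_label(TRANSIENT_LABEL)
            if transient and attempt < max_attempts:
                continue  # retry the transaction as a whole, not a single write
            raise  # non-transient error, or out of attempts
```

The function passed as `txn_func` is expected to start a fresh transaction on each call, because a retry reruns every operation rather than resuming the failed one.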

"UnknownTransactionCommitResult"

The commit operations are retryable write operations. If the commit operation encounters an error, MongoDB drivers retry the commit regardless of the value of retryWrites.

If the commit operation encounters an error labeled "UnknownTransactionCommitResult", the commit can be retried.

  • The callback API incorporates retry logic for "UnknownTransactionCommitResult".
  • The core transaction API does not incorporate retry logic for "UnknownTransactionCommitResult". To handle "UnknownTransactionCommitResult", applications should explicitly incorporate retry logic for the error.
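
A commit retry for the core API could be sketched in Python as follows. The sketch assumes a session object with a `commit_transaction()` method (as on PyMongo's `ClientSession`) and exceptions that offer `has_error_label(label)`; the `max_attempts` cap and the returned attempt count are hypothetical additions for illustration.

```python
UNKNOWN_COMMIT_LABEL = "UnknownTransactionCommitResult"


def commit_with_retry(session, max_attempts=5):
    """Commit the session's transaction, retrying only the commit itself.

    Returns the number of attempts used. max_attempts is a hypothetical cap.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            session.commit_transaction()
            return attempt
        except Exception as exc:
            has_label = getattr(exc, "has_error_label", None)
            retryable = callable(has_label) and has_label(UNKNOWN_COMMIT_LABEL)
            if not retryable or attempt == max_attempts:
                raise  # a different error, or the cap was reached
```

Unlike a "TransientTransactionError", this label calls for repeating only the commit, so the operations inside the transaction are not rerun.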

Driver Version Errors

On sharded clusters with multiple mongos instances, performing transactions with drivers updated for MongoDB 4.0 (instead of MongoDB 4.2) will fail and can result in errors, including:

Note

Your driver may return a different error. Refer to your driver’s documentation for details.

Error Code    Error Message
251           cannot continue txnId -1 for session … with txnId 1
50940         cannot commit with no participants

For transactions on MongoDB 4.2 deployments (replica sets and sharded clusters), use the MongoDB drivers updated for MongoDB 4.2.
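
As a rough diagnostic aid, an application could flag the two error codes from the table above and prompt a check of the driver version. This Python sketch assumes the exception exposes a numeric `code` attribute (as PyMongo's `OperationFailure` does); the names `SUSPECT_CODES` and `may_be_driver_version_error` are hypothetical. A match is only a hint, since these codes may have other causes and a driver may report a different error.

```python
# Error codes listed in the table above.
SUSPECT_CODES = {251, 50940}


def may_be_driver_version_error(exc):
    """Return True if exc carries one of the error codes from the table above."""
    return getattr(exc, "code", None) in SUSPECT_CODES
```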

Additional Information

The following mongo shell methods are available for transactions: