Transactions
Amazon DocumentDB(与 MongoDB 兼容)现在支持 MongoDB 4.0 兼容性,包括事务。您可以跨多个文档、语句、集合和数据库执行事务。利用事务,您可以对 Amazon DocumentDB 集群中的一个或多个文档执行原子、一致、隔离和持久 (ACID) 操作,从而简化应用程序开发。交易的常见使用案例包括财务处理、履行和管理订单以及构建多人游戏。
交易不会产生额外费用。您只需为作为事务一部分使用的读取和写入 IOs 付费。
Requirements
要使用事务功能,您需要满足以下要求:
您必须使用 Amazon DocumentDB 4.0 引擎。
您必须使用与 MongoDB 4.0 或更高版本兼容的驱动程序。
最佳实践
下面是一些最佳做法,以便您能充分利用与 Amazon DocumentDB 的事务。
在事务完成后,始终提交或中止事务。使事务保持在不完整的状态会联系数据库资源,并可能导致写冲突。
建议将事务保持在所需的最小命令数。如果您的事务具有多个可拆分为多个较小事务的语句,则建议这样做以降低超时的可能性。始终旨在创建短事务,而不是长时间运行的读取。
Limitations
Amazon DocumentDB 不支持在事务中使用游标。
Amazon DocumentDB 无法在事务中创建新集合,并且无法针对非现有集合进行查询/更新。
文档级写入锁定受 1 分钟超时的约束,用户无法配置该超时。
不支持可重试写入、可重试提交和可重试中止。
每个 Amazon DocumentDB 实例对于同时在实例上打开的并发事务的数量都有一个上限。有关限制,请参阅实例限制。
对于给定事务,事务日志大小必须小于 32MB。
Amazon DocumentDB 在事务中支持
count(),但并非所有驱动程序都支持此功能。一种替代方法是使用countDocuments()API,它将计数查询转换为客户端的聚合查询。事务具有一分钟执行限制,会话超时为 30 分钟。如果事务超时,则该事务将中止,并且现有事务的会话中发出的任何后续命令将生成以下错误:
WriteCommandError({"ok" : 0,"operationTime" : Timestamp(1603491424, 627726),"code" : 251,"errmsg" : "Given transaction number 0 does not match any in-progress transactions."})
监控和诊断
在 Amazon DocumentDB 4.0 中支持事务时,添加了其他 CloudWatch 指标,以帮助您监控事务。
新 CloudWatch 指标
DatabaseTransactions:在一分钟期间内进行的打开事务数。DatabaseTransactionsAborted:在一分钟周期内执行的已中止事务数。DatabaseTransactionsMax:一分钟周期内打开的事务的最大数量。TransactionsAborted:一分钟内在实例上中止的事务数。TransactionsCommitted:一分钟内在实例上提交的事务数。TransactionsOpen:在以一分钟为间隔在实例上打开的事务数。TransactionsOpenMax:在一分钟内在实例上打开的事务的最大数量。TransactionsStarted:在一分钟内在实例上启动的事务数。
注意
有关 CloudWatch 的更多 Amazon DocumentDB 指标,请转到 使用 Amazon DocumentDB 监控 CloudWatch。
此外,新字段已添加到 currentOp lsid、transactionThreadId 和“idle transaction”和 serverStatus 事务的新状态:currentActive、currentInactive、currentOpen、totalAborted、totalCommitted 和 totalStarted。
事务隔离级别
在启动事务时,您能够同时指定 readConcern 和 writeConcern,如以下示例所示:
mySession.startTransaction({readConcern: {level: 'snapshot'}, writeConcern: {w: 'majority'}});
对于 readConcern,Amazon DocumentDB 默认情况下支持快照隔离。如果指定了本地、可用或大多数 readConcern,Amazon DocumentDB 会将 readConcern 级别升级到快照。Amazon DocumentDB 不支持可线性的 readConcern,并且指定此类读取问题会导致错误。
对于 writeConcern,默认情况下,Amazon DocumentDB 支持大多数写入 quorum,并在跨三个 AZs 保留四个数据副本时实现写入 quorum。 如果指定了较低的 writeConcern,Amazon DocumentDB 会将 writeConcern 升级到大多数。此外,会记录所有 Amazon DocumentDB 写入,并且无法禁用日志。
使用案例
在本节中,我们将演练事务的两个使用案例:多语句和多集合。
多语句事务
Amazon DocumentDB 事务是多语句,这意味着您可以编写一个跨多个语句以及显式提交或回滚的事务。您可以将 insert、update、update 和 findAndModify 操作分组为单个原子操作。
多语句交易的一种常见使用案例是借记积分交易。例如:您需要为 Debty 支付好友款项。因此,您需要从您的账户借记 (提取) 500 USD,并向您的好友账户缴纳 500 USD(存储库)。要执行该操作,您可以在单个交易中同时执行余额和积分操作以确保原子性。这样做可以防止出现从您的账户借记 500 USD 但不会利用您好友的账户。以下是此使用案例的形式:
// *** Transfer $500 from Alice to Bob inside a transaction: Success Scenario***// Setup bank account for Alice and Bob. Each have $1000 in their accountvar databaseName = "bank";var collectionName = "account";var amountToTransfer = 500;var session = db.getMongo().startSession({causalConsistency: false});var bankDB = session.getDatabase(databaseName);var accountColl = bankDB[collectionName];accountColl.drop();accountColl.insert({name: "Alice", balance: 1000});accountColl.insert({name: "Bob", balance: 1000});session.startTransaction();// deduct $500 from Alice's accountvar aliceBalance = accountColl.find({"name": "Alice"}).next().balance;var newAliceBalance = aliceBalance - amountToTransfer;accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;// add $500 to Bob's accountvar bobBalance = accountColl.find({"name": "Bob"}).next().balance;var newBobBalance = bobBalance + amountToTransfer;accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});var findBobBalance = accountColl.find({"name": "Bob"}).next().balance;session.commitTransaction();accountColl.find();// *** Transfer $500 from Alice to Bob inside a transaction: Failure Scenario***// Setup bank account for Alice and Bob. Each have $1000 in their accountvar databaseName = "bank";var collectionName = "account";var amountToTransfer = 500;var session = db.getMongo().startSession({causalConsistency: false});var bankDB = session.getDatabase(databaseName);var accountColl = bankDB[collectionName];accountColl.drop();accountColl.insert({name: "Alice", balance: 1000});accountColl.insert({name: "Bob", balance: 1000});session.startTransaction();// deduct $500 from Alice's accountvar aliceBalance = accountColl.find({"name": "Alice"}).next().balance;var newAliceBalance = aliceBalance - amountToTransfer;accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;session.abortTransaction();
多集合事务
我们的事务也是多集合事务,这意味着它们可用于在单个事务内以及跨多个集合执行多个操作。这将提供一致的数据视图并维护数据的完整性。当您将命令提交为单个 <> 时,事务是“要么全有要么全无”执行—,因为前者要么全部成功,要么全部失败。
下面是使用同一个方案的多收集事务的示例,以及来自多语句事务示例的数据。
// *** Transfer $500 from Alice to Bob inside a transaction: Success Scenario***// Setup bank account for Alice and Bob. Each have $1000 in their accountvar amountToTransfer = 500;var collectionName = "account";var session = db.getMongo().startSession({causalConsistency: false});var accountCollInBankA = session.getDatabase("bankA")[collectionName];var accountCollInBankB = session.getDatabase("bankB")[collectionName];accountCollInBankA.drop();accountCollInBankB.drop();accountCollInBankA.insert({name: "Alice", balance: 1000});accountCollInBankB.insert({name: "Bob", balance: 1000});session.startTransaction();// deduct $500 from Alice's accountvar aliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;var newAliceBalance = aliceBalance - amountToTransfer;accountCollInBankA.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});var findAliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;// add $500 to Bob's accountvar bobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;var newBobBalance = bobBalance + amountToTransfer;accountCollInBankB.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});var findBobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;session.commitTransaction();accountCollInBankA.find(); // Alice holds $500 in bankAaccountCollInBankB.find(); // Bob holds $1500 in bankB// *** Transfer $500 from Alice to Bob inside a transaction: Failure Scenario***// Setup bank account for Alice and Bob. Each have $1000 in their accountvar collectionName = "account";var amountToTransfer = 500;var session = db.getMongo().startSession({causalConsistency: false});var accountCollInBankA = session.getDatabase("bankA")[collectionName];var accountCollInBankB = session.getDatabase("bankB")[collectionName];accountCollInBankA.drop();accountCollInBankB.drop();accountCollInBankA.insert({name: "Alice", balance: 1000});accountCollInBankB.insert({name: "Bob", balance: 1000});session.startTransaction();// deduct $500 from Alice's accountvar aliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;var newAliceBalance = aliceBalance - amountToTransfer;accountCollInBankA.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});var findAliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;// add $500 to Bob's accountvar bobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;var newBobBalance = bobBalance + amountToTransfer;accountCollInBankB.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});var findBobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;session.abortTransaction();accountCollInBankA.find(); // Alice holds $1000 in bankAaccountCollInBankB.find(); // Bob holds $1000 in bankB
回调 API 的事务 API 示例
回调 API 仅适用于 4.2 版以上的驱动程序。
Javascript
以下代码演示如何将 Amazon DocumentDB 事务 API 与 Javascript 结合使用。
// *** Transfer $500 from Alice to Bob inside a transaction: Success ***// Setup bank account for Alice and Bob. Each have $1000 in their accountvar databaseName = "bank";var collectionName = "account";var amountToTransfer = 500;var session = db.getMongo().startSession({causalConsistency: false});var bankDB = session.getDatabase(databaseName);var accountColl = bankDB[collectionName];accountColl.drop();accountColl.insert({name: "Alice", balance: 1000});accountColl.insert({name: "Bob", balance: 1000});session.startTransaction();// deduct $500 from Alice's accountvar aliceBalance = accountColl.find({"name": "Alice"}).next().balance;assert(aliceBalance >= amountToTransfer);var newAliceBalance = aliceBalance - amountToTransfer;accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;assert.eq(newAliceBalance, findAliceBalance);// add $500 to Bob's accountvar bobBalance = accountColl.find({"name": "Bob"}).next().balance;var newBobBalance = bobBalance + amountToTransfer;accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});var findBobBalance = accountColl.find({"name": "Bob"}).next().balance;assert.eq(newBobBalance, findBobBalance);session.commitTransaction();accountColl.find();
Node.js
以下代码演示了如何将 Amazon DocumentDB 事务 API 与 Node.js 结合使用。
// Node.js callback API:const bankDB = await mongoclient.db("bank");var accountColl = await bankDB.createCollection("account");var amountToTransfer = 500;const session = mongoclient.startSession({causalConsistency: false});await accountColl.drop();await accountColl.insertOne({name: "Alice", balance: 1000}, { session });await accountColl.insertOne({name: "Bob", balance: 1000}, { session });const transactionOptions = {readConcern: { level: 'snapshot' },writeConcern: { w: 'majority' }};// deduct $500 from Alice's accountvar aliceBalance = await accountColl.findOne({name: "Alice"}, {session});assert(aliceBalance.balance >= amountToTransfer);var newAliceBalance = aliceBalance - amountToTransfer;session.startTransaction(transactionOptions);await accountColl.updateOne({name: "Alice"}, {$set: {balance: newAliceBalance}}, {session });await session.commitTransaction();aliceBalance = await accountColl.findOne({name: "Alice"}, {session});assert(newAliceBalance == aliceBalance.balance);// add $500 to Bob's accountvar bobBalance = await accountColl.findOne({name: "Bob"}, {session});var newBobBalance = bobBalance.balance + amountToTransfer;session.startTransaction(transactionOptions);await accountColl.updateOne({name: "Bob"}, {$set: {balance: newBobBalance}}, {session });await session.commitTransaction();bobBalance = await accountColl.findOne({name: "Bob"}, {session});assert(newBobBalance == bobBalance.balance);
C#
以下代码演示了如何将 Amazon DocumentDB 事务 API 与 C# 结合使用。
// C# Callback APIvar dbName = "bank";var collName = "account";var amountToTransfer = 500;using (var session = client.StartSession(new ClientSessionOptions{CausalConsistency = false})){var bankDB = client.GetDatabase(dbName);var accountColl = bankDB.GetCollection<BsonDocument>(collName);bankDB.DropCollection(collName);accountColl.InsertOne(session, new BsonDocument { {"name", "Alice"}, {"balance", 1000 } });accountColl.InsertOne(session, new BsonDocument { {"name", "Bob"}, {"balance", 1000 } });// start transactionvar transactionOptions = new TransactionOptions(readConcern: ReadConcern.Snapshot,writeConcern: WriteConcern.WMajority);var result = session.WithTransaction((sess, cancellationtoken) =>{// deduct $500 from Alice's accountvar aliceBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");Debug.Assert(aliceBalance >= amountToTransfer);var newAliceBalance = aliceBalance.AsInt32 - amountToTransfer;accountColl.UpdateOne(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice"),Builders<BsonDocument>.Update.Set("balance", newAliceBalance));aliceBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");Debug.Assert(aliceBalance == newAliceBalance);// add $500 from Bob's accountvar bobBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");var newBobBalance = bobBalance.AsInt32 + amountToTransfer;accountColl.UpdateOne(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob"),Builders<BsonDocument>.Update.Set("balance", newBobBalance));bobBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");Debug.Assert(bobBalance == newBobBalance);return "Transaction committed";}, transactionOptions);// check values outside of transactionvar aliceNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");var bobNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");Debug.Assert(aliceNewBalance == 500);Debug.Assert(bobNewBalance == 1500);}
Ruby
以下代码演示了如何将 Amazon DocumentDB 事务 API 与 Ruby 结合使用。
// Ruby Callback APIdbName = "bank"collName = "account"amountToTransfer = 500session = client.start_session(:causal_consistency=> false)bankDB = Mongo::Database.new(client, dbName)accountColl = bankDB[collName]accountColl.drop()accountColl.insert_one({"name"=>"Alice", "balance"=>1000})accountColl.insert_one({"name"=>"Bob", "balance"=>1000})# start transactionsession.with_transaction(read_concern: {level: :snapshot}, write_concern: {w: :majority}) do# deduct $500 from Alice's accountaliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']assert aliceBalance >= amountToTransfernewAliceBalance = aliceBalance - amountToTransferaccountColl.update_one({"name"=>"Alice"}, { "$set" => {"balance"=>newAliceBalance} }, :session=> session)aliceBalance = accountColl.find({"name"=>>"Alice"}, :session=> session).first['balance']assert_equal(newAliceBalance, aliceBalance)# add $500 from Bob's accountbobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']newBobBalance = bobBalance + amountToTransferaccountColl.update_one({"name"=>"Bob"}, { "$set" => {"balance"=>newBobBalance} }, :session=> session)bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']assert_equal(newBobBalance, bobBalance)end# check results outside of transactionaliceBalance = accountColl.find({"name"=>"Alice"}).first['balance']bobBalance = accountColl.find({"name"=>"Bob"}).first['balance']assert_equal(aliceBalance, 500)assert_equal(bobBalance, 1500)session.end_session
Go
以下代码演示了如何将 Amazon DocumentDB 事务 API 与 Go 结合使用。
// Go - Callback APItype Account struct {Name stringBalance int}ctx := context.TODO()dbName := "bank"collName := "account"amountToTransfer := 500session, err := client.StartSession(options.Session().SetCausalConsistency(false))assert.NilError(t, err)defer session.EndSession(ctx)bankDB := client.Database(dbName)accountColl := bankDB.Collection(collName)accountColl.Drop(ctx)_, err = accountColl.InsertOne(ctx, bson.M{"name" : "Alice", "balance":1000})_, err = accountColl.InsertOne(ctx, bson.M{"name" : "Bob", "balance":1000})transactionOptions := options.Transaction().SetReadConcern(readconcern.Snapshot()).SetWriteConcern(writeconcern.New(writeconcern.WMajority()))_, err = session.WithTransaction(ctx, func(sessionCtx mongo.SessionContext) (interface{}, error) {var result Account// deduct $500 from Alice's accounterr = accountColl.FindOne(sessionCtx, bson.M{"name": "Alice"}).Decode(&result)aliceBalance := result.BalancenewAliceBalance := aliceBalance - amountToTransfer_, err = accountColl.UpdateOne(sessionCtx, bson.M{"name": "Alice"}, bson.M{"$set": bson.M{"balance": newAliceBalance}})err = accountColl.FindOne(sessionCtx, bson.M{"name": "Alice"}).Decode(&result)aliceBalance = result.Balanceassert.Equal(t, aliceBalance, newAliceBalance)// add $500 to Bob's accounterr = accountColl.FindOne(sessionCtx, bson.M{"name": "Bob"}).Decode(&result)bobBalance := result.BalancenewBobBalance := bobBalance + amountToTransfer_, err = accountColl.UpdateOne(sessionCtx, bson.M{"name": "Bob"}, bson.M{"$set": bson.M{"balance": newBobBalance}})err = accountColl.FindOne(sessionCtx, bson.M{"name": "Bob"}).Decode(&result)bobBalance = result.Balanceassert.Equal(t, bobBalance, newBobBalance)if err != nil {return nil, err}return "transaction committed", err}, transactionOptions)// check results outside of transactionvar result Accounterr = accountColl.FindOne(ctx, bson.M{"name": "Alice"}).Decode(&result)aliceNewBalance := result.Balanceerr = accountColl.FindOne(ctx, bson.M{"name": "Bob"}).Decode(&result)bobNewBalance := result.Balanceassert.Equal(t, aliceNewBalance, 500)assert.Equal(t, bobNewBalance, 1500)// Go - Core APItype Account struct {Name stringBalance int}func transferMoneyWithRetry(sessionContext mongo.SessionContext, accountColl *mongo.Collection, t *testing.T) error {amountToTransfer := 500transactionOptions := options.Transaction().SetReadConcern(readconcern.Snapshot()).SetWriteConcern(writeconcern.New(writeconcern.WMajority()))if err := sessionContext.StartTransaction(transactionOptions); err != nil {panic(err)}var result Account// deduct $500 from Alice's accounterr := accountColl.FindOne(sessionContext, bson.M{"name": "Alice"}).Decode(&result)aliceBalance := result.BalancenewAliceBalance := aliceBalance - amountToTransfer_, err = accountColl.UpdateOne(sessionContext, bson.M{"name": "Alice"}, bson.M{"$set": bson.M{"balance": newAliceBalance}})if err != nil {sessionContext.AbortTransaction(sessionContext)}err = accountColl.FindOne(sessionContext, bson.M{"name": "Alice"}).Decode(&result)aliceBalance = result.Balanceassert.Equal(t, aliceBalance, newAliceBalance)// add $500 to Bob's accounterr = accountColl.FindOne(sessionContext, bson.M{"name": "Bob"}).Decode(&result)bobBalance := result.BalancenewBobBalance := bobBalance + amountToTransfer_, err = accountColl.UpdateOne(sessionContext, bson.M{"name": "Bob"}, bson.M{"$set": bson.M{"balance": newBobBalance}})if err != nil {sessionContext.AbortTransaction(sessionContext)}err = accountColl.FindOne(sessionContext, bson.M{"name": "Bob"}).Decode(&result)bobBalance = result.Balanceassert.Equal(t, bobBalance, newBobBalance)err = sessionContext.CommitTransaction(sessionContext)return err}func doTransactionWithRetry(t *testing.T) {ctx := context.TODO()dbName := "bank"collName := "account"bankDB := client.Database(dbName)accountColl := bankDB.Collection(collName)client.UseSessionWithOptions(ctx, options.Session().SetCausalConsistency(false), func(sessionContext mongo.SessionContext) error {accountColl.Drop(ctx)accountColl.InsertOne(sessionContext, bson.M{"name" : "Alice", "balance":1000})accountColl.InsertOne(sessionContext, bson.M{"name" : "Bob", "balance":1000})for {err := transferMoneyWithRetry(sessionContext, accountColl, t)if err == nil {println("transaction committed")return nil}if mongoErr := err.(mongo.CommandError); mongoErr.HasErrorLabel("TransientTransactionError") {continue}println("transaction failed")return err}})// check results outside of transactionvar result AccountaccountColl.FindOne(ctx, bson.M{"name": "Alice"}).Decode(&esult)aliceBalance := result.Balanceassert.Equal(t, aliceBalance, 500)accountColl.FindOne(ctx, bson.M{"name": "Bob"}).Decode(&result)bobBalance := result.Balanceassert.Equal(t, bobBalance, 1500)}
Java
以下代码演示如何将 Amazon DocumentDB 事务 API 与 Java 结合使用。
// Java (sync) - Callback APIMongoDatabase bankDB = mongoClient.getDatabase("bank");MongoCollection accountColl = bankDB.getCollection("account");accountColl.drop();int amountToTransfer = 500;// add sample dataaccountColl.insertOne(new Document("name", "Alice").append("balance", 1000));accountColl.insertOne(new Document("name", "Bob").append("balance", 1000));TransactionOptions txnOptions = TransactionOptions.builder().readConcern(ReadConcern.SNAPSHOT).writeConcern(WriteConcern.MAJORITY).build();ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build();try ( ClientSession clientSession = mongoClient.startSession(sessionOptions) ) {clientSession.withTransaction(new TransactionBody<Void>() {@Overridepublic Void execute() {// deduct $500 from Alice's accountList<Document> documentList = new ArrayList<>();accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);int aliceBalance = (int) documentList.get(0).get("balance");int newAliceBalance = aliceBalance - amountToTransfer;accountColl.updateOne(clientSession, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance)));// check Alice's new balancedocumentList = new ArrayList<>();accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);int updatedBalance = (int) documentList.get(0).get("balance");Assert.assertEquals(updatedBalance, newAliceBalance);// add $500 to Bob's accountdocumentList = new ArrayList<>();accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);int bobBalance = (int) documentList.get(0).get("balance");int newBobBalance = bobBalance + amountToTransfer;accountColl.updateOne(clientSession, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance)));// check Bob's new balancedocumentList = new ArrayList<>();accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);updatedBalance = (int) documentList.get(0).get("balance");Assert.assertEquals(updatedBalance, newBobBalance);return null;}}, txnOptions);}
C
以下代码演示了如何将 Amazon DocumentDB 事务 API 与 C 结合使用。
// Sample Code for C with Callback#include <bson.h>#include <mongoc.h>#include <stdio.h>#include <string.h>#include <assert.h>typedef struct {int64_t balance;bson_t *account;bson_t *opts;mongoc_collection_t *collection;} ctx_t;bool callback_session (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error){bool r = true;ctx_t *data = (ctx_t *) ctx;bson_t local_reply;bson_t *selector = data->account;bson_t *update = BCON_NEW ("$set", "{", "balance", BCON_INT64 (data->balance), "}");mongoc_collection_update_one (data->collection, selector, update, data->opts, &local_reply, error);*reply = bson_copy (&local_reply);bson_destroy (&local_reply);bson_destroy (update);return r;}void test_callback_money_transfer(mongoc_client_t* client, mongoc_collection_t* collection, int amount_to_transfer){bson_t reply;bool r = true;const bson_t *doc;bson_iter_t iter;ctx_t alice_ctx;ctx_t bob_ctx;bson_error_t error;// find querybson_t *alice_query = bson_new ();BSON_APPEND_UTF8(alice_query, "name", "Alice");bson_t *bob_query = bson_new ();BSON_APPEND_UTF8(bob_query, "name", "Bob");// create session// set causal consistency to falsemongoc_session_opt_t *session_opts = mongoc_session_opts_new ();mongoc_session_opts_set_causal_consistency (session_opts, false);// start the sessionmongoc_client_session_t *client_session = mongoc_client_start_session (client, session_opts, &error);// add session to optionsbson_t *opts = bson_new();mongoc_client_session_append (client_session, opts, &error);// deduct 500 from Alice// find account balance of Alicemongoc_cursor_t *cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);mongoc_cursor_next (cursor, &doc);bson_iter_init (&iter, doc);bson_iter_find (&iter, "balance");int64_t alice_balance = (bson_iter_value (&iter))->value.v_int64;assert(alice_balance >= amount_to_transfer);int64_t new_alice_balance = alice_balance - amount_to_transfer;// set variables which will be used by callback functionalice_ctx.collection = collection;alice_ctx.opts = opts;alice_ctx.balance = new_alice_balance;alice_ctx.account = alice_query;// callbackr = mongoc_client_session_with_transaction (client_session, &callback_session, NULL, &alice_ctx, &reply, &error);assert(r);// find account balance of Alice after transactioncursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);mongoc_cursor_next (cursor, &doc);bson_iter_init (&iter, doc);bson_iter_find (&iter, "balance");alice_balance = (bson_iter_value (&iter))->value.v_int64;assert(alice_balance == new_alice_balance);assert(alice_balance == 500);// add 500 to bob's balance// find account balance of Bobcursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);mongoc_cursor_next (cursor, &doc);bson_iter_init (&iter, doc);bson_iter_find (&iter, "balance");int64_t bob_balance = (bson_iter_value (&iter))->value.v_int64;int64_t new_bob_balance = bob_balance + amount_to_transfer;bob_ctx.collection = collection;bob_ctx.opts = opts;bob_ctx.balance = new_bob_balance;bob_ctx.account = bob_query;// set read & write concernmongoc_read_concern_t *read_concern = mongoc_read_concern_new ();mongoc_write_concern_t *write_concern = mongoc_write_concern_new ();mongoc_transaction_opt_t *txn_opts = mongoc_transaction_opts_new ();mongoc_write_concern_set_w(write_concern, MONGOC_WRITE_CONCERN_W_MAJORITY);mongoc_read_concern_set_level(read_concern, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT);mongoc_transaction_opts_set_write_concern (txn_opts, write_concern);mongoc_transaction_opts_set_read_concern (txn_opts, read_concern);// callbackr = mongoc_client_session_with_transaction (client_session, &callback_session, txn_opts, &bob_ctx, &reply, &error);assert(r);// find account balance of Bob after transactioncursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);mongoc_cursor_next (cursor, &doc);bson_iter_init (&iter, doc);bson_iter_find (&iter, "balance");bob_balance = (bson_iter_value (&iter))->value.v_int64;assert(bob_balance == new_bob_balance);assert(bob_balance == 1500);// cleanupbson_destroy(alice_query);bson_destroy(bob_query);mongoc_client_session_destroy(client_session);bson_destroy(opts);mongoc_transaction_opts_destroy(txn_opts);mongoc_read_concern_destroy(read_concern);mongoc_write_concern_destroy(write_concern);mongoc_cursor_destroy(cursor);bson_destroy(doc);}int main(int argc, char* argv[]) {mongoc_init ();mongoc_client_t* client = mongoc_client_new (<connection uri>);bson_error_t error;// connect to bank dbmongoc_database_t *database = mongoc_client_get_database (client, "bank");// access account collectionmongoc_collection_t* collection = mongoc_client_get_collection(client, "bank", "account");// set amount to transferint64_t amount_to_transfer = 500;// delete the collection if already existingmongoc_collection_drop(collection, &error);// open Alice accountbson_t *alice_account = bson_new ();BSON_APPEND_UTF8(alice_account, "name", "Alice");BSON_APPEND_INT64(alice_account, "balance", 1000);// open Bob accountbson_t *bob_account = bson_new ();BSON_APPEND_UTF8(bob_account, "name", "Bob");BSON_APPEND_INT64(bob_account, "balance", 1000);bool r = true;r = mongoc_collection_insert_one(collection, alice_account, NULL, NULL, &error);if (!r) {printf("Error encountered:%s", error.message);}r = mongoc_collection_insert_one(collection, bob_account, NULL, NULL, &error);if (!r) {printf("Error encountered:%s", error.message);}test_callback_money_transfer(client, collection, amount_to_transfer);}
Python
以下代码演示了如何将 Amazon DocumentDB 事务 API 与 Python 结合使用。
// Sample Python code with callback apiimport pymongodef callback(session, balance, query):collection.update_one(query, {'$set': {"balance": balance}}, session=session)client = pymongo.MongoClient(<connection uri>)rc_snapshot = pymongo.read_concern.ReadConcern('snapshot')wc_majority = pymongo.write_concern.WriteConcern('majority')# To start, drop and create an account collection and insert balances for both Alice and Bobcollection = client.get_database("bank").get_collection("account")collection.drop()collection.insert_one({"_id": 1, "name": "Alice", "balance": 1000})collection.insert_one({"_id": 2, "name": "Bob", "balance": 1000})amount_to_transfer = 500# deduct 500 from Alice's accountalice_balance = collection.find_one({"name": "Alice"}).get("balance")assert alice_balance >= amount_to_transfernew_alice_balance = alice_balance - amount_to_transferwith client.start_session({'causalConsistency':False}) as session:session.with_transaction(lambda s: callback(s, new_alice_balance, {"name": "Alice"}), read_concern=rc_snapshot, write_concern=wc_majority)updated_alice_balance = collection.find_one({"name": "Alice"}).get("balance")assert updated_alice_balance == new_alice_balance# add 500 to Bob's accountbob_balance = collection.find_one({"name": "Bob"}).get("balance")assert bob_balance >= amount_to_transfernew_bob_balance = bob_balance + amount_to_transferwith client.start_session({'causalConsistency':False}) as session:session.with_transaction(lambda s: callback(s, new_bob_balance, {"name": "Bob"}), read_concern=rc_snapshot, write_concern=wc_majority)updated_bob_balance = collection.find_one({"name": "Bob"}).get("balance")assert updated_bob_balance == new_bob_balanceSample Python code with Core apiimport pymongoclient = pymongo.MongoClient(<connection_string>)rc_snapshot = pymongo.read_concern.ReadConcern('snapshot')wc_majority = pymongo.write_concern.WriteConcern('majority')# To start, drop and create an account collection and insert balances for both Alice and Bobcollection = client.get_database("bank").get_collection("account")collection.drop()collection.insert_one({"_id": 1, "name": "Alice", "balance": 1000})collection.insert_one({"_id": 2, "name": "Bob", "balance": 1000})amount_to_transfer = 500# deduct 500 from Alice's accountalice_balance = collection.find_one({"name": "Alice"}).get("balance")assert alice_balance >= amount_to_transfernew_alice_balance = alice_balance - amount_to_transferwith client.start_session({'causalConsistency':False}) as session:session.start_transaction(read_concern=rc_snapshot, write_concern=wc_majority)collection.update_one({"name": "Alice"}, {'$set': {"balance": new_alice_balance}}, session=session)session.commit_transaction()updated_alice_balance = collection.find_one({"name": "Alice"}).get("balance")assert updated_alice_balance == new_alice_balance# add 500 to Bob's accountbob_balance = collection.find_one({"name": "Bob"}).get("balance")assert bob_balance >= amount_to_transfernew_bob_balance = bob_balance + amount_to_transferwith client.start_session({'causalConsistency':False}) as session:session.start_transaction(read_concern=rc_snapshot, write_concern=wc_majority)collection.update_one({"name": "Bob"}, {'$set': {"balance": new_bob_balance}}, session=session)session.commit_transaction()updated_bob_balance = collection.find_one({"name": "Bob"}).get("balance")assert updated_bob_balance == new_bob_balance
核心 API 的事务 API 示例
Javascript
以下代码演示如何将 Amazon DocumentDB 事务 API 与 Javascript 结合使用。
// *** Transfer $500 from Alice to Bob inside a transaction: Success ***// Setup bank account for Alice and Bob. Each have $1000 in their accountvar databaseName = "bank";var collectionName = "account";var amountToTransfer = 500;var session = db.getMongo().startSession({causalConsistency: false});var bankDB = session.getDatabase(databaseName);var accountColl = bankDB[collectionName];accountColl.drop();accountColl.insert({name: "Alice", balance: 1000});accountColl.insert({name: "Bob", balance: 1000});session.startTransaction();// deduct $500 from Alice's accountvar aliceBalance = accountColl.find({"name": "Alice"}).next().balance;assert(aliceBalance >= amountToTransfer);var newAliceBalance = aliceBalance - amountToTransfer;accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;assert.eq(newAliceBalance, findAliceBalance);// add $500 to Bob's accountvar bobBalance = accountColl.find({"name": "Bob"}).next().balance;var newBobBalance = bobBalance + amountToTransfer;accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});var findBobBalance = accountColl.find({"name": "Bob"}).next().balance;assert.eq(newBobBalance, findBobBalance);session.commitTransaction();accountColl.find();
C#
以下代码演示了如何将 Amazon DocumentDB 事务 API 与 C# 结合使用。
// C# Core APIpublic void TransferMoneyWithRetry(IMongoCollection<bSondocument> accountColl, IClientSessionHandle session){var amountToTransfer = 500;// start transactionvar transactionOptions = new TransactionOptions(readConcern: ReadConcern.Snapshot,writeConcern: WriteConcern.WMajority);session.StartTransaction(transactionOptions);try{// deduct $500 from Alice's accountvar aliceBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");Debug.Assert(aliceBalance >= amountToTransfer);var newAliceBalance = aliceBalance.AsInt32 - amountToTransfer;accountColl.UpdateOne(session, Builders<bSondocument>.Filter.Eq("name", "Alice"),Builders<bSondocument>.Update.Set("balance", newAliceBalance));aliceBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");Debug.Assert(aliceBalance == newAliceBalance);// add $500 from Bob's accountvar bobBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");var newBobBalance = bobBalance.AsInt32 + amountToTransfer;accountColl.UpdateOne(session, Builders<bSondocument>.Filter.Eq("name", "Bob"),Builders<bSondocument>.Update.Set("balance", newBobBalance));bobBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");Debug.Assert(bobBalance == newBobBalance);}catch (Exception e){session.AbortTransaction();throw;}session.CommitTransaction();}}public void DoTransactionWithRetry(MongoClient client){var dbName = "bank";var collName = "account";using (var session = client.StartSession(new ClientSessionOptions{CausalConsistency = false})){try{var bankDB = client.GetDatabase(dbName);var accountColl = bankDB.GetCollection<bSondocument>(collName);bankDB.DropCollection(collName);accountColl.InsertOne(session, new BsonDocument { {"name", "Alice"}, {"balance", 1000 } });accountColl.InsertOne(session, new BsonDocument { {"name", "Bob"}, {"balance", 1000 } });while(true) {try{TransferMoneyWithRetry(accountColl, session);break;}catch (MongoException e){if(e.HasErrorLabel("TransientTransactionError")){continue;}else{throw;}}}// check values outside of transactionvar aliceNewBalance = accountColl.Find(Builders<bSondocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");var bobNewBalance = accountColl.Find(Builders<bSondocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");Debug.Assert(aliceNewBalance == 500);Debug.Assert(bobNewBalance == 1500);}catch (Exception e){Console.WriteLine("Error running transaction: " + e.Message);}}}
Ruby
以下代码演示如何将 Amazon DocumentDB 事务 API 与 Ruby 结合使用。
# Ruby Core APIdef transfer_money_w_retry(session, accountColl)amountToTransfer = 500session.start_transaction(read_concern: {level: :snapshot}, write_concern: {w: :majority})# deduct $500 from Alice's accountaliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']assert aliceBalance >= amountToTransfernewAliceBalance = aliceBalance - amountToTransferaccountColl.update_one({"name"=>"Alice"}, { "$set" => {"balance"=>newAliceBalance} }, :session=> session)aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']assert_equal(newAliceBalance, aliceBalance)# add $500 to Bob's accountbobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']newBobBalance = bobBalance + amountToTransferaccountColl.update_one({"name"=>"Bob"}, { "$set" => {"balance"=>newBobBalance} }, :session=> session)bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']assert_equal(newBobBalance, bobBalance)session.commit_transactionenddef do_txn_w_retry(client)dbName = "bank"collName = "account"session = client.start_session(:causal_consistency=> false)bankDB = Mongo::Database.new(client, dbName)accountColl = bankDB[collName]accountColl.drop()accountColl.insert_one({"name"=>"Alice", "balance"=>1000})accountColl.insert_one({"name"=>"Bob", "balance"=>1000})begintransferMoneyWithRetry(session, accountColl)puts "transaction committed"rescue Mongo::Error => eif e.label?('TransientTransactionError')retryelseputs "transaction failed"raiseendend# check results outside of transactionaliceBalance = accountColl.find({"name"=>"Alice"}).first['balance']bobBalance = accountColl.find({"name"=>"Bob"}).first['balance']assert_equal(aliceBalance, 500)assert_equal(bobBalance, 1500)end
Java
以下代码演示如何将 Amazon DocumentDB 事务 API 与 Java 结合使用。
// Java (sync) - Core APIpublic void transferMoneyWithRetry() {// connect to serverMongoClientURI mongoURI = new MongoClientURI(uri);MongoClient mongoClient = new MongoClient(mongoURI);MongoDatabase bankDB = mongoClient.getDatabase("bank");MongoCollection accountColl = bankDB.getCollection("account");accountColl.drop();// insert some sample dataaccountColl.insertOne(new Document("name", "Alice").append("balance", 1000));accountColl.insertOne(new Document("name", "Bob").append("balance", 1000));while (true) {try {doTransferMoneyWithRetry(accountColl, mongoClient);break;} catch (MongoException e) {if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {continue;} else {throw e;}}}}public void doTransferMoneyWithRetry(MongoCollection accountColl, MongoClient mongoClient) {int amountToTransfer = 500;TransactionOptions txnOptions = TransactionOptions.builder().readConcern(ReadConcern.SNAPSHOT).writeConcern(WriteConcern.MAJORITY).build();ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build();try ( ClientSession clientSession = mongoClient.startSession(sessionOptions) ) {clientSession.startTransaction(txnOptions);// deduct $500 from Alice's accountList<Document> documentList = new ArrayList<>();accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);int aliceBalance = (int) documentList.get(0).get("balance");Assert.assertTrue(aliceBalance >= amountToTransfer);int newAliceBalance = aliceBalance - amountToTransfer;accountColl.updateOne(clientSession, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance)));// check Alice's new balancedocumentList = new ArrayList<>();accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);int updatedBalance = (int) documentList.get(0).get("balance");Assert.assertEquals(updatedBalance, newAliceBalance);// add $500 to Bob's accountdocumentList = new ArrayList<>();accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);int bobBalance = (int) documentList.get(0).get("balance");int newBobBalance = bobBalance + amountToTransfer;accountColl.updateOne(clientSession, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance)));// check Bob's new balancedocumentList = new ArrayList<>();accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);updatedBalance = (int) documentList.get(0).get("balance");Assert.assertEquals(updatedBalance, newBobBalance);// commit transactionclientSession.commitTransaction();}}// Java (async) -- Core APIpublic void transferMoneyWithRetry() {// connect to the serverMongoClient mongoClient = MongoClients.create(uri);MongoDatabase bankDB = mongoClient.getDatabase("bank");MongoCollection accountColl = bankDB.getCollection("account");SubscriberLatchWrapper<Void> dropCallback = new SubscriberLatchWrapper<>();mongoClient.getDatabase("bank").drop().subscribe(dropCallback);dropCallback.await();// insert some sample dataSubscriberLatchWrapper<InsertOneResult> insertionCallback = new SubscriberLatchWrapper<>();accountColl.insertOne(new Document("name", "Alice").append("balance", 1000)).subscribe(insertionCallback);insertionCallback.await();insertionCallback = new SubscriberLatchWrapper<>();accountColl.insertOne(new Document("name", "Bob").append("balance", 1000)).subscribe(insertionCallback);;insertionCallback.await();while (true) {try {doTransferMoneyWithRetry(accountColl, mongoClient);break;} catch (MongoException e) {if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {continue;} else {throw e;}}}}public void doTransferMoneyWithRetry(MongoCollection accountColl, MongoClient mongoClient) {int amountToTransfer = 500;// start the transactionTransactionOptions txnOptions = TransactionOptions.builder().readConcern(ReadConcern.SNAPSHOT).writeConcern(WriteConcern.MAJORITY).build();ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build();SubscriberLatchWrapper<ClientSession> sessionCallback = new SubscriberLatchWrapper<>();mongoClient.startSession(sessionOptions).subscribe(sessionCallback);ClientSession session = sessionCallback.get().get(0);session.startTransaction(txnOptions);// deduct $500 from Alice's accountSubscriberLatchWrapper<Document> findCallback = new SubscriberLatchWrapper<>();accountColl.find(session, new Document("name", "Alice")).first().subscribe(findCallback);Document documentFound = findCallback.get().get(0);int aliceBalance = (int) documentFound.get("balance");int newAliceBalance = aliceBalance - amountToTransfer;SubscriberLatchWrapper<UpdateResult> updateCallback = new SubscriberLatchWrapper<>();accountColl.updateOne(session, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance))).subscribe(updateCallback);updateCallback.await();// check Alice's new balancefindCallback = new SubscriberLatchWrapper<>();accountColl.find(session, new Document("name", "Alice")).first().subscribe(findCallback);documentFound = findCallback.get().get(0);int updatedBalance = (int) documentFound.get("balance");Assert.assertEquals(updatedBalance, newAliceBalance);// add $500 to Bob's accountfindCallback = new SubscriberLatchWrapper<>();accountColl.find(session, new Document("name", "Bob")).first().subscribe(findCallback);documentFound = findCallback.get().get(0);int bobBalance = (int) documentFound.get("balance");int newBobBalance = bobBalance + amountToTransfer;updateCallback = new SubscriberLatchWrapper<>();accountColl.updateOne(session, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance))).subscribe(updateCallback);updateCallback.await();// check Bob's new balancefindCallback = new SubscriberLatchWrapper<>();accountColl.find(session, new Document("name", "Bob")).first().subscribe(findCallback);documentFound = findCallback.get().get(0);updatedBalance = (int) documentFound.get("balance");Assert.assertEquals(updatedBalance, newBobBalance);// commit the transactionSubscriberLatchWrapper<Void> transactionCallback = new SubscriberLatchWrapper<>();session.commitTransaction().subscribe(transactionCallback);transactionCallback.await();}public class SubscriberLatchWrapper<T> implements Subscriber<T> {/*** A Subscriber that stores the publishers results and provides a latch so can block on completion.** @param <T> The publishers result type*/private final List<T> received;private final List<RuntimeException> errors;private final CountDownLatch latch;private volatile Subscription subscription;private volatile boolean completed;/*** Construct an instance*/public SubscriberLatchWrapper() {this.received = new ArrayList<>();this.errors = new ArrayList<>();this.latch = new CountDownLatch(1);}@Overridepublic void onSubscribe(final Subscription s) {subscription = s;subscription.request(Integer.MAX_VALUE);}@Overridepublic void onNext(final T t) {received.add(t);}@Overridepublic void onError(final Throwable t) {if (t instanceof RuntimeException) {errors.add((RuntimeException) t);} else {errors.add(new RuntimeException("Unexpected exception", t));}onComplete();}@Overridepublic void onComplete() {completed = true;subscription.cancel();latch.countDown();}/*** Get received elements** @return the list of received elements*/public List<T> getReceived() {return received;}/*** Get received elements.** @return the list of receive elements*/public List<T> get() {return await().getReceived();}/*** Await completion or error** @return this*/public SubscriberLatchWrapper<T> await() {subscription.request(Integer.MAX_VALUE);try {if (!latch.await(300, TimeUnit.SECONDS)) {throw new MongoTimeoutException("Publisher onComplete timed out for 300 seconds");}} catch (InterruptedException e) {throw new MongoInterruptedException("Interrupted waiting for observeration", e);}if (!errors.isEmpty()) {throw errors.get(0);}return this;}public boolean getCompleted() {return this.completed;}public void close() {subscription.cancel();received.clear();}}
C
以下代码演示了如何将 Amazon DocumentDB 事务 API 与 C 结合使用。
// Sample C code with core sessionbool core_session(mongoc_client_session_t *client_session, mongoc_collection_t* collection, bson_t *selector, int64_t balance){bool r = true;bson_error_t error;bson_t *opts = bson_new();bson_t *update = BCON_NEW ("$set", "{", "balance", BCON_INT64 (balance), "}");// set read & write concernmongoc_read_concern_t *read_concern = mongoc_read_concern_new ();mongoc_write_concern_t *write_concern = mongoc_write_concern_new ();mongoc_transaction_opt_t *txn_opts = mongoc_transaction_opts_new ();mongoc_write_concern_set_w(write_concern, MONGOC_WRITE_CONCERN_W_MAJORITY);mongoc_read_concern_set_level(read_concern, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT);mongoc_transaction_opts_set_write_concern (txn_opts, write_concern);mongoc_transaction_opts_set_read_concern (txn_opts, read_concern);mongoc_client_session_start_transaction (client_session, txn_opts, &error);mongoc_client_session_append (client_session, opts, &error);r = mongoc_collection_update_one (collection, selector, update, opts, NULL, &error);mongoc_client_session_commit_transaction (client_session, NULL, &error);bson_destroy (opts);mongoc_transaction_opts_destroy(txn_opts);mongoc_read_concern_destroy(read_concern);mongoc_write_concern_destroy(write_concern);bson_destroy (update);return r;}void test_core_money_transfer(mongoc_client_t* client, mongoc_collection_t* collection, int amount_to_transfer){bson_t reply;bool r = true;const bson_t *doc;bson_iter_t iter;bson_error_t error;// find querybson_t *alice_query = bson_new ();BSON_APPEND_UTF8(alice_query, "name", "Alice");bson_t *bob_query = bson_new ();BSON_APPEND_UTF8(bob_query, "name", "Bob");// create session// set causal consistency to falsemongoc_session_opt_t *session_opts = mongoc_session_opts_new ();mongoc_session_opts_set_causal_consistency (session_opts, false);// start the sessionmongoc_client_session_t *client_session = mongoc_client_start_session (client, session_opts, &error);// add session to optionsbson_t *opts = bson_new();mongoc_client_session_append (client_session, opts, &error);// deduct 500 from Alice// find account balance of Alicemongoc_cursor_t *cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);mongoc_cursor_next (cursor, &doc);bson_iter_init (&iter, doc);bson_iter_find (&iter, "balance");int64_t alice_balance = (bson_iter_value (&iter))->value.v_int64;assert(alice_balance >= amount_to_transfer);int64_t new_alice_balance = alice_balance - amount_to_transfer;// corer = core_session (client_session, collection, alice_query, new_alice_balance);assert(r);// find account balance of Alice after transactioncursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);mongoc_cursor_next (cursor, &doc);bson_iter_init (&iter, doc);bson_iter_find (&iter, "balance");alice_balance = (bson_iter_value (&iter))->value.v_int64;assert(alice_balance == new_alice_balance);assert(alice_balance == 500);// add 500 to Bob's balance// find account balance of Bobcursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);mongoc_cursor_next (cursor, &doc);bson_iter_init (&iter, doc);bson_iter_find (&iter, "balance");int64_t bob_balance = (bson_iter_value (&iter))->value.v_int64;int64_t new_bob_balance = bob_balance + amount_to_transfer;//corer = core_session (client_session, collection, bob_query, new_bob_balance);assert(r);// find account balance of Bob after transactioncursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);mongoc_cursor_next (cursor, &doc);bson_iter_init (&iter, doc);bson_iter_find (&iter, "balance");bob_balance = (bson_iter_value (&iter))->value.v_int64;assert(bob_balance == new_bob_balance);assert(bob_balance == 1500);// cleanupbson_destroy(alice_query);bson_destroy(bob_query);mongoc_client_session_destroy(client_session);bson_destroy(opts);mongoc_cursor_destroy(cursor);bson_destroy(doc);}int main(int argc, char* argv[]) {mongoc_init ();mongoc_client_t* client = mongoc_client_new (<connection uri>);bson_error_t error;// connect to bank dbmongoc_database_t *database = mongoc_client_get_database (client, "bank");// access account collectionmongoc_collection_t* collection = mongoc_client_get_collection(client, "bank", "account");// set amount to transferint64_t amount_to_transfer = 500;// delete the collection if already existingmongoc_collection_drop(collection, &error);// open Alice accountbson_t *alice_account = bson_new ();BSON_APPEND_UTF8(alice_account, "name", "Alice");BSON_APPEND_INT64(alice_account, "balance", 1000);// open Bob accountbson_t *bob_account = bson_new ();BSON_APPEND_UTF8(bob_account, "name", "Bob");BSON_APPEND_INT64(bob_account, "balance", 1000);bool r = true;r = mongoc_collection_insert_one(collection, alice_account, NULL, NULL, &error);if (!r) {printf("Error encountered:%s", error.message);}r = mongoc_collection_insert_one(collection, bob_account, NULL, NULL, &error);if (!r) {printf("Error encountered:%s", error.message);}test_core_money_transfer(client, collection, amount_to_transfer);}
Scala
以下代码演示了如何将 Amazon DocumentDB 事务 API 与 Scala 结合使用。
// Scala Core APIdef transferMoneyWithRetry(sessionObservable: SingleObservable[ClientSession] , database: MongoDatabase ): Unit = {val accountColl = database.getCollection("account")var amountToTransfer = 500var transactionObservable: Observable[ClientSession] = sessionObservable.map(clientSession => {clientSession.startTransaction()// deduct $500 from Alice's accountvar aliceBalance = accountColl.find(clientSession, Document("name" -> "Alice")).await().head.getInteger("balance")assert(aliceBalance >= amountToTransfer)var newAliceBalance = aliceBalance - amountToTransferaccountColl.updateOne(clientSession, Document("name" -> "Alice"), Document("$set" -> Document("balance" -> newAliceBalance))).await()aliceBalance = accountColl.find(clientSession, Document("name" -> "Alice")).await().head.getInteger("balance")assert(aliceBalance == newAliceBalance)// add $500 to Bob's accountvar bobBalance = accountColl.find(clientSession, Document("name" -> "Bob")).await().head.getInteger("balance")var newBobBalance = bobBalance + amountToTransferaccountColl.updateOne(clientSession, Document("name" -> "Bob"), Document("$set" -> Document("balance" -> newBobBalance))).await()bobBalance = accountColl.find(clientSession, Document("name" -> "Bob")).await().head.getInteger("balance")assert(bobBalance == newBobBalance)clientSession})transactionObservable.flatMap(clientSession => clientSession.commitTransaction()).await()}def doTransactionWithRetry(): Unit = {val client: MongoClient = MongoClientWrapper.getMongoClient()val database: MongoDatabase = client.getDatabase("bank")val accountColl = database.getCollection("account")accountColl.drop().await()val sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build()var sessionObservable: SingleObservable[ClientSession] = client.startSession(sessionOptions)accountColl.insertOne(Document("name" -> "Alice", "balance" -> 1000)).await()accountColl.insertOne(Document("name" -> "Bob", "balance" -> 1000)).await()var retry = truewhile (retry) {try {transferMoneyWithRetry(sessionObservable, database)println("transaction committed")retry = false}catch {case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {println("retrying transaction")}case other: Throwable => {println("transaction failed")retry = falsethrow other}}}// check results outside of transactionassert(accountColl.find(Document("name" -> "Alice")).results().head.getInteger("balance") == 500)assert(accountColl.find(Document("name" -> "Bob")).results().head.getInteger("balance") == 1500)accountColl.drop().await()}
支持的 命令
| Command | 支持 |
|---|---|
| 是 |
| 是 |
| 是 |
| 是 |
| 是 |
| 否 |
| 否 |
| 是 |
不支持的功能
| Methods | 阶段或命令 |
|---|---|
|
|
|
|
|
|
Sessions
MongoDB 会话是一个框架,用于跨分片支持可重试写入、因果一致性、事务和管理操作。创建会话时,客户端会生成一个逻辑会话标识符 (lsid),用于在向服务器发送命令时标记该会话中的所有操作。
Amazon DocumentDB 支持使用会话启用事务,但不支持因果一致性或可重试写入。
在 Amazon DocumentDB 中使用事务时,事务将从使用 session.startTransaction() API 的会话中发起,并且会话一次支持单个事务。同样,事务使用提交 (session.commitTransaction()) 或中止 (session.abortTransaction()) APIs 完成。
焦散一致性
原因一致性保证在单个客户端会话中,客户端将观察先写后读一致性、 Monatomical 读/写,并且写入操作将遵循读取并且这些保证适用于集群中的所有实例,而不仅仅是主实例。Amazon DocumentDB 不支持因果一致性,以下语句将导致错误。
var mySession = db.getMongo().startSession();var mySessionObject = mySession.getDatabase('test').getCollection('account');mySessionObject.updateOne({"_id": 2}, {"$inc": {"balance": 400}});//Result:{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 }mySessionObject.find()//Error: error: {// "ok" : 0,// "code" : 303,// "errmsg" : "Feature not supported: 'causal consistency'",// "operationTime" : Timestamp(1603461817, 493214)//}mySession.endSession()
您可以在会话中禁用因果一致性。请注意,这样做将使您能够利用会话框架,但不会对读取提供因果一致性保证。使用 Amazon DocumentDB 时,从主集群进行的读取将具有先写后读一致性,从副本实例进行的读取将具有最终一致性。事务是使用会话的主要使用案例。
var mySession = db.getMongo().startSession({causalConsistency: false});var mySessionObject = mySession.getDatabase('test').getCollection('account');mySessionObject.updateOne({"_id": 2}, {"$inc": {"balance": 400}});//Result:{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 }mySessionObject.find()//{ "_id" : 1, "name" : "Bob", "balance" : 100 }//{ "_id" : 2, "name" : "Alice", "balance" : 1700 }
可重试写入
可重试写入是一种功能,在该功能中,客户端尝试重试写入操作(一次、网络出现错误或客户端找不到主集群)。在 Amazon DocumentDB 中,不支持可重试写入,必须禁用。您可以使用连接字符串中的命令 (retryWrites=false) 将其禁用。以下是示例:
mongodb://chimera:<insertYourPassword>@docdb-2019-01-29-02-57-28.cluster-ccuszbx3pn5e.us-east-1.docdb.amazonaws.com:27017/?ssl=true&ssl_ca_certs=rds-combined-ca-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false
事务错误
使用事务时,在某些情况下, 可能会产生一个错误,指出事务编号与任何进行中的事务都不匹配。
错误可在至少两个不同的场景中生成:
- After the one-minute transaction timeout.
- After an instance restart (due to patching, crash recovery, etc.), it is possible to receive this error even in cases where the transaction successfully committed. During an instance restart, the database can’t tell the difference between a transaction that successfully completed versus a transaction that aborted. In other words, the transaction completion state is ambiguous.
处理此错误的最佳方式是使事务更新具有静态 - 例如,通过使用 $set 排列器而不是递增/递减操作。请参阅以下内容:
{ "ok" : 0,"operationTime" : Timestamp(1603938167, 1),"code" : 251,"errmsg" : "Given transaction number 1 does not match any in-progress transactions."}
