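Optimistic Transactions and Pessimistic Transactions

Put simply, the optimistic transaction model commits directly and rolls back when it encounters a conflict. The pessimistic transaction model first tries to lock the resources it needs to modify, and starts committing only once it is sure the transaction can succeed.

The optimistic transaction model suits scenarios with a low conflict rate: committing directly is likely to succeed, and conflicts are rare. Once a conflict does occur, however, rolling back is relatively expensive.

The advantage of pessimistic transactions is that, in scenarios with a high conflict rate, locking up front costs less than rolling back afterwards. Pessimistic transactions also resolve, at relatively low cost, the situation where several concurrent transactions conflict with each other so that none of them can succeed. In scenarios with a low conflict rate, however, pessimistic transactions are less efficient than optimistic ones.

In terms of application-side complexity, pessimistic transactions are more intuitive and easier to implement, whereas optimistic transactions need a complex application-side retry mechanism.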
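The contrast between the two models can be sketched in memory, without a database. The block below is only an illustration under stated assumptions: `Stock`, `ConflictError`, `buy_optimistic`, and `buy_pessimistic` are hypothetical names, and the version counter stands in for the conflict detection that the database performs at commit time.

```python
import threading


class ConflictError(Exception):
    """Raised when an optimistic commit detects a concurrent change."""


class Stock:
    """In-memory stand-in for one row of stock (hypothetical, not a TiDB API)."""

    def __init__(self, quantity):
        self.quantity = quantity
        self.version = 0
        self._guard = threading.Lock()    # makes read and commit atomic
        self.row_lock = threading.Lock()  # held by a pessimistic buyer

    def read(self):
        with self._guard:
            return self.quantity, self.version

    def commit_if_unchanged(self, seen_version, new_quantity):
        """Apply the write only if nobody committed since `seen_version`."""
        with self._guard:
            if self.version != seen_version:
                raise ConflictError("row changed since it was read")
            self.quantity = new_quantity
            self.version += 1


def buy_optimistic(stock, amount, max_retries=5):
    """Work without locks; detect conflicts at commit time and retry."""
    for _ in range(max_retries + 1):
        quantity, version = stock.read()
        if quantity < amount:
            return False
        try:
            stock.commit_if_unchanged(version, quantity - amount)
            return True
        except ConflictError:
            continue  # another buyer committed first: re-read and try again
    return False


def buy_pessimistic(stock, amount):
    """Take the row lock first, so the commit cannot hit a conflict."""
    with stock.row_lock:
        quantity, version = stock.read()
        if quantity < amount:
            return False
        stock.commit_if_unchanged(version, quantity - amount)
        return True
```

The optimistic path needs the retry loop in application code, while the pessimistic path pays for the lock on every purchase, which mirrors the trade-off described above.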

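The following example uses the tables in the bookshop database to implement a book purchase, demonstrating the differences between optimistic and pessimistic transactions and their respective pros and cons. The purchase flow mainly consists of:

  1. Update the stock quantity
  2. Create the order
  3. Make the payment

These three operations must either all succeed or all fail, and under concurrency the book must never be oversold.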
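The all-or-nothing requirement can be tried out locally before running anything against TiDB. The sketch below is an assumption-laden stand-in: it uses Python's built-in `sqlite3` module with an in-memory database instead of TiDB, a simplified schema, and hypothetical helper names `open_shop` and `buy`. It shows only the atomicity of the three steps and the conditional stock update, not TiDB's locking behavior.

```python
import sqlite3


def open_shop():
    """Create a tiny in-memory bookshop (simplified schema, assumed for this sketch)."""
    # isolation_level=None: no implicit transactions, BEGIN/COMMIT are issued explicitly
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.executescript("""
        CREATE TABLE books (id INTEGER PRIMARY KEY, price INTEGER, stock INTEGER);
        CREATE TABLE users (id INTEGER PRIMARY KEY, balance INTEGER);
        CREATE TABLE orders (id INTEGER PRIMARY KEY, book_id INTEGER,
                             user_id INTEGER, quality INTEGER);
        INSERT INTO books VALUES (1, 100, 10);
        INSERT INTO users VALUES (1, 10000), (2, 10000);
    """)
    return conn


def buy(conn, order_id, book_id, user_id, amount):
    """Run the three purchase steps in one transaction; roll back on any failure."""
    conn.execute("BEGIN")
    try:
        # Step 1: update stock, but only if enough copies remain
        cur = conn.execute(
            "UPDATE books SET stock = stock - ? WHERE id = ? AND stock - ? >= 0",
            (amount, book_id, amount))
        if cur.rowcount == 0:
            raise ValueError("stock not enough")
        # Step 2: create the order
        conn.execute(
            "INSERT INTO orders (id, book_id, user_id, quality) VALUES (?, ?, ?, ?)",
            (order_id, book_id, user_id, amount))
        # Step 3: pay
        conn.execute(
            "UPDATE users SET balance = balance - "
            "(SELECT price FROM books WHERE id = ?) * ? WHERE id = ?",
            (book_id, amount, user_id))
        conn.execute("COMMIT")
        return True
    except Exception:
        conn.execute("ROLLBACK")
        return False
```

If step 2 fails, for example because the order ID already exists, the rollback also undoes the stock deduction from step 1, so a failed purchase leaves no partial changes behind.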

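Pessimistic transactions

The following code uses pessimistic transactions, with two threads simulating two users buying the same book concurrently. The bookshop has 10 copies left; Bob buys 6 and Alice buys 4. They complete their orders at almost the same time, and in the end the remaining stock of the book is zero.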

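When multiple threads simulate multiple users inserting data at the same time, a thread-safe connection object is needed. The Java example uses HikariCP, a currently popular Java connection pool.

Golang's sql.DB is safe for concurrent use, so no external package is needed.

For Golang, wrap a utility package util that adapts TiDB transactions, and write the following code for later use: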

    package util

    import (
        "context"
        "database/sql"
    )

    // TiDBSqlTx wraps a *sql.Tx together with the dedicated connection it runs on.
    type TiDBSqlTx struct {
        *sql.Tx
        conn        *sql.Conn
        pessimistic bool
    }

    // TiDBSqlBegin pins a connection, sets its transaction mode, and begins a transaction.
    func TiDBSqlBegin(db *sql.DB, pessimistic bool) (*TiDBSqlTx, error) {
        ctx := context.Background()
        conn, err := db.Conn(ctx)
        if err != nil {
            return nil, err
        }

        mode := "optimistic"
        if pessimistic {
            mode = "pessimistic"
        }
        if _, err = conn.ExecContext(ctx, "set @@tidb_txn_mode=?", mode); err != nil {
            conn.Close() // return the connection to the pool on failure
            return nil, err
        }

        tx, err := conn.BeginTx(ctx, nil)
        if err != nil {
            conn.Close() // return the connection to the pool on failure
            return nil, err
        }

        return &TiDBSqlTx{
            conn:        conn,
            Tx:          tx,
            pessimistic: pessimistic,
        }, nil
    }

    // Commit commits the transaction and releases the connection.
    func (tx *TiDBSqlTx) Commit() error {
        defer tx.conn.Close()
        return tx.Tx.Commit()
    }

    // Rollback rolls back the transaction and releases the connection.
    func (tx *TiDBSqlTx) Rollback() error {
        defer tx.conn.Close()
        return tx.Tx.Rollback()
    }

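The Python example uses the mysqlclient driver and opens multiple connection objects. Connections are not shared between threads, which keeps them thread-safe.

1. Write a pessimistic transaction example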

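Configuration file

In Java, if you use Maven for package management, add the following dependencies to the <dependencies> node in pom.xml to bring in HikariCP, and set the packaging target and the main class that the JAR starts with. The complete pom.xml is as follows: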

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>

      <groupId>com.pingcap</groupId>
      <artifactId>plain-java-txn</artifactId>
      <version>0.0.1</version>

      <name>plain-java-jdbc</name>

      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
      </properties>

      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.13.2</version>
          <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>8.0.28</version>
        </dependency>

        <dependency>
          <groupId>com.zaxxer</groupId>
          <artifactId>HikariCP</artifactId>
          <version>5.0.1</version>
        </dependency>
      </dependencies>

      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.3.0</version>
            <configuration>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
              <archive>
                <manifest>
                  <mainClass>com.pingcap.txn.TxnExample</mainClass>
                </manifest>
              </archive>
            </configuration>
            <executions>
              <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                  <goal>single</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </project>

Code

Then write the code:

    package com.pingcap.txn;

    import com.zaxxer.hikari.HikariDataSource;

    import java.math.BigDecimal;
    import java.sql.*;
    import java.util.Arrays;
    import java.util.concurrent.*;

    public class TxnExample {
        public static void main(String[] args) throws SQLException, InterruptedException {
            System.out.println(Arrays.toString(args));
            int aliceQuantity = 0;
            int bobQuantity = 0;

            for (String arg : args) {
                if (arg.startsWith("ALICE_NUM")) {
                    aliceQuantity = Integer.parseInt(arg.replace("ALICE_NUM=", ""));
                }

                if (arg.startsWith("BOB_NUM")) {
                    bobQuantity = Integer.parseInt(arg.replace("BOB_NUM=", ""));
                }
            }

            HikariDataSource ds = new HikariDataSource();
            ds.setJdbcUrl("jdbc:mysql://localhost:4000/bookshop?useServerPrepStmts=true&cachePrepStmts=true");
            ds.setUsername("root");
            ds.setPassword("");

            // prepare data; the connection goes back to the pool when the block ends
            try (Connection connection = ds.getConnection()) {
                createBook(connection, 1L, "Designing Data-Intensive Application", "Science & Technology",
                        Timestamp.valueOf("2018-09-01 00:00:00"), new BigDecimal(100), 10);
                createUser(connection, 1L, "Bob", new BigDecimal(10000));
                createUser(connection, 2L, "Alice", new BigDecimal(10000));
            }

            CountDownLatch countDownLatch = new CountDownLatch(2);
            ExecutorService threadPool = Executors.newFixedThreadPool(2);

            final int finalBobQuantity = bobQuantity;
            threadPool.execute(() -> {
                buy(ds, 1, 1000L, 1L, 1L, finalBobQuantity);
                countDownLatch.countDown();
            });
            final int finalAliceQuantity = aliceQuantity;
            threadPool.execute(() -> {
                buy(ds, 2, 1001L, 1L, 2L, finalAliceQuantity);
                countDownLatch.countDown();
            });

            countDownLatch.await(5, TimeUnit.SECONDS);
            // stop the worker threads once their tasks finish, so the JVM can exit
            threadPool.shutdown();
        }

        public static void createUser(Connection connection, Long id, String nickname, BigDecimal balance) throws SQLException {
            PreparedStatement insert = connection.prepareStatement(
                    "INSERT INTO `users` (`id`, `nickname`, `balance`) VALUES (?, ?, ?)");
            insert.setLong(1, id);
            insert.setString(2, nickname);
            insert.setBigDecimal(3, balance);
            insert.executeUpdate();
        }

        public static void createBook(Connection connection, Long id, String title, String type, Timestamp publishedAt, BigDecimal price, Integer stock) throws SQLException {
            PreparedStatement insert = connection.prepareStatement(
                    "INSERT INTO `books` (`id`, `title`, `type`, `published_at`, `price`, `stock`) values (?, ?, ?, ?, ?, ?)");
            insert.setLong(1, id);
            insert.setString(2, title);
            insert.setString(3, type);
            insert.setTimestamp(4, publishedAt);
            insert.setBigDecimal(5, price);
            insert.setInt(6, stock);
            insert.executeUpdate();
        }

        public static void buy (HikariDataSource ds, Integer threadID,
                                Long orderID, Long bookID, Long userID, Integer quantity) {
            String txnComment = "/* txn " + threadID + " */ ";

            try (Connection connection = ds.getConnection()) {
                try {
                    connection.setAutoCommit(false);
                    connection.createStatement().executeUpdate(txnComment + "begin pessimistic");

                    // wait until the other thread has also run its 'begin pessimistic' statement
                    TimeUnit.SECONDS.sleep(1);

                    BigDecimal price = null;

                    // read the price of the book and lock the row
                    PreparedStatement selectBook = connection.prepareStatement(txnComment + "select price from books where id = ? for update");
                    selectBook.setLong(1, bookID);
                    ResultSet res = selectBook.executeQuery();
                    if (!res.next()) {
                        throw new RuntimeException("book not exist");
                    } else {
                        price = res.getBigDecimal("price");
                    }

                    // update book: the WHERE clause refuses to drive stock below zero
                    String updateBookSQL = "update `books` set stock = stock - ? where id = ? and stock - ? >= 0";
                    PreparedStatement updateBook = connection.prepareStatement(txnComment + updateBookSQL);
                    updateBook.setInt(1, quantity);
                    updateBook.setLong(2, bookID);
                    updateBook.setInt(3, quantity);
                    int affectedRows = updateBook.executeUpdate();

                    if (affectedRows == 0) {
                        // stock not enough, rollback
                        connection.createStatement().executeUpdate(txnComment + "rollback");
                        return;
                    }

                    // insert order
                    String insertOrderSQL = "insert into `orders` (`id`, `book_id`, `user_id`, `quality`) values (?, ?, ?, ?)";
                    PreparedStatement insertOrder = connection.prepareStatement(txnComment + insertOrderSQL);
                    insertOrder.setLong(1, orderID);
                    insertOrder.setLong(2, bookID);
                    insertOrder.setLong(3, userID);
                    insertOrder.setInt(4, quantity);
                    insertOrder.executeUpdate();

                    // update user
                    String updateUserSQL = "update `users` set `balance` = `balance` - ? where id = ?";
                    PreparedStatement updateUser = connection.prepareStatement(txnComment + updateUserSQL);
                    updateUser.setBigDecimal(1, price.multiply(new BigDecimal(quantity)));
                    updateUser.setLong(2, userID);
                    updateUser.executeUpdate();

                    connection.createStatement().executeUpdate(txnComment + "commit");
                } catch (Exception e) {
                    connection.createStatement().executeUpdate(txnComment + "rollback");
                    e.printStackTrace();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

In Golang, first write a helper.go file that wraps the required database operations:

    package main

    import (
        "context"
        "database/sql"
        "fmt"
        "time"

        "github.com/go-sql-driver/mysql"
        "github.com/pingcap-inc/tidb-example-golang/util"
        "github.com/shopspring/decimal"
    )

    type TxnFunc func(txn *util.TiDBSqlTx) error

    const (
        ErrWriteConflict      = 9007 // Transactions in TiKV encounter write conflicts.
        ErrInfoSchemaChanged  = 8028 // table schema changes
        ErrForUpdateCantRetry = 8002 // "SELECT FOR UPDATE" commit conflict
        ErrTxnRetryable       = 8022 // The transaction commit fails and has been rolled back
    )

    const retryTimes = 5

    var retryErrorCodeSet = map[uint16]struct{}{
        ErrWriteConflict:      {},
        ErrInfoSchemaChanged:  {},
        ErrForUpdateCantRetry: {},
        ErrTxnRetryable:       {},
    }

    // shouldRetry reports whether err is a retryable TiDB error for an
    // optimistic transaction that still has retries left.
    func shouldRetry(err error, optimistic bool, retriesLeft int) bool {
        mysqlErr, ok := err.(*mysql.MySQLError)
        if !ok || !optimistic || retriesLeft == 0 {
            return false
        }
        _, retryable := retryErrorCodeSet[mysqlErr.Number]
        return retryable
    }

    func runTxn(db *sql.DB, optimistic bool, optimisticRetryTimes int, txnFunc TxnFunc) {
        txn, err := util.TiDBSqlBegin(db, !optimistic)
        if err != nil {
            panic(err)
        }

        err = txnFunc(txn)
        if err != nil {
            txn.Rollback()
            if shouldRetry(err, optimistic, optimisticRetryTimes) {
                fmt.Printf("[runTxn] got a retryable error, retries left: %d\n", optimisticRetryTimes-1)
                runTxn(db, optimistic, optimisticRetryTimes-1, txnFunc)
                return
            }
            fmt.Printf("[runTxn] got an error, rollback: %+v\n", err)
            return
        }

        err = txn.Commit()
        if err == nil {
            fmt.Println("[runTxn] commit success")
            return
        }
        if shouldRetry(err, optimistic, optimisticRetryTimes) {
            fmt.Printf("[runTxn] got a retryable error, retries left: %d\n", optimisticRetryTimes-1)
            runTxn(db, optimistic, optimisticRetryTimes-1, txnFunc)
            return
        }
        // previously a non-retryable commit error was dropped silently
        fmt.Printf("[runTxn] commit failed: %+v\n", err)
    }

    func prepareData(db *sql.DB, optimistic bool) {
        runTxn(db, optimistic, retryTimes, func(txn *util.TiDBSqlTx) error {
            publishedAt, err := time.Parse("2006-01-02 15:04:05", "2018-09-01 00:00:00")
            if err != nil {
                return err
            }

            if err = createBook(txn, 1, "Designing Data-Intensive Application",
                "Science & Technology", publishedAt, decimal.NewFromInt(100), 10); err != nil {
                return err
            }

            if err = createUser(txn, 1, "Bob", decimal.NewFromInt(10000)); err != nil {
                return err
            }

            if err = createUser(txn, 2, "Alice", decimal.NewFromInt(10000)); err != nil {
                return err
            }

            return nil
        })
    }

    func buyPessimistic(db *sql.DB, goroutineID, orderID, bookID, userID, amount int) {
        txnComment := fmt.Sprintf("/* txn %d */ ", goroutineID)
        if goroutineID != 1 {
            txnComment = "\t" + txnComment
        }

        fmt.Printf("\nuser %d try to buy %d books(id: %d)\n", userID, amount, bookID)

        runTxn(db, false, retryTimes, func(txn *util.TiDBSqlTx) error {
            time.Sleep(time.Second)

            // read the price of book
            selectBookForUpdate := "select `price` from books where id = ? for update"
            bookRows, err := txn.Query(selectBookForUpdate, bookID)
            if err != nil {
                return err
            }
            fmt.Println(txnComment + selectBookForUpdate + " successful")
            defer bookRows.Close()

            price := decimal.NewFromInt(0)
            if bookRows.Next() {
                err = bookRows.Scan(&price)
                if err != nil {
                    return err
                }
            } else {
                return fmt.Errorf("book ID not exist")
            }
            // close the result set before running the next statement on this connection
            bookRows.Close()

            // update book
            updateStock := "update `books` set stock = stock - ? where id = ? and stock - ? >= 0"
            result, err := txn.Exec(updateStock, amount, bookID, amount)
            if err != nil {
                return err
            }
            fmt.Println(txnComment + updateStock + " successful")

            affected, err := result.RowsAffected()
            if err != nil {
                return err
            }

            if affected == 0 {
                return fmt.Errorf("stock not enough, rollback")
            }

            // insert order
            insertOrder := "insert into `orders` (`id`, `book_id`, `user_id`, `quality`) values (?, ?, ?, ?)"
            if _, err := txn.Exec(insertOrder,
                orderID, bookID, userID, amount); err != nil {
                return err
            }
            fmt.Println(txnComment + insertOrder + " successful")

            // update user
            updateUser := "update `users` set `balance` = `balance` - ? where id = ?"
            if _, err := txn.Exec(updateUser,
                price.Mul(decimal.NewFromInt(int64(amount))), userID); err != nil {
                return err
            }
            fmt.Println(txnComment + updateUser + " successful")

            return nil
        })
    }

    func buyOptimistic(db *sql.DB, goroutineID, orderID, bookID, userID, amount int) {
        txnComment := fmt.Sprintf("/* txn %d */ ", goroutineID)
        if goroutineID != 1 {
            txnComment = "\t" + txnComment
        }

        fmt.Printf("\nuser %d try to buy %d books(id: %d)\n", userID, amount, bookID)

        runTxn(db, true, retryTimes, func(txn *util.TiDBSqlTx) error {
            time.Sleep(time.Second)

            // read the price and stock of book
            selectBookForUpdate := "select `price`, `stock` from books where id = ? for update"
            bookRows, err := txn.Query(selectBookForUpdate, bookID)
            if err != nil {
                return err
            }
            fmt.Println(txnComment + selectBookForUpdate + " successful")
            defer bookRows.Close()

            price, stock := decimal.NewFromInt(0), 0
            if bookRows.Next() {
                err = bookRows.Scan(&price, &stock)
                if err != nil {
                    return err
                }
            } else {
                return fmt.Errorf("book ID not exist")
            }
            // close the result set before running the next statement on this connection
            bookRows.Close()

            if stock < amount {
                return fmt.Errorf("book not enough")
            }

            // update book
            updateStock := "update `books` set stock = stock - ? where id = ? and stock - ? >= 0"
            result, err := txn.Exec(updateStock, amount, bookID, amount)
            if err != nil {
                return err
            }
            fmt.Println(txnComment + updateStock + " successful")

            affected, err := result.RowsAffected()
            if err != nil {
                return err
            }

            if affected == 0 {
                return fmt.Errorf("stock not enough, rollback")
            }

            // insert order
            insertOrder := "insert into `orders` (`id`, `book_id`, `user_id`, `quality`) values (?, ?, ?, ?)"
            if _, err := txn.Exec(insertOrder,
                orderID, bookID, userID, amount); err != nil {
                return err
            }
            fmt.Println(txnComment + insertOrder + " successful")

            // update user
            updateUser := "update `users` set `balance` = `balance` - ? where id = ?"
            if _, err := txn.Exec(updateUser,
                price.Mul(decimal.NewFromInt(int64(amount))), userID); err != nil {
                return err
            }
            fmt.Println(txnComment + updateUser + " successful")

            return nil
        })
    }

    func createBook(txn *util.TiDBSqlTx, id int, title, bookType string,
        publishedAt time.Time, price decimal.Decimal, stock int) error {
        _, err := txn.ExecContext(context.Background(),
            "INSERT INTO `books` (`id`, `title`, `type`, `published_at`, `price`, `stock`) values (?, ?, ?, ?, ?, ?)",
            id, title, bookType, publishedAt, price, stock)
        return err
    }

    func createUser(txn *util.TiDBSqlTx, id int, nickname string, balance decimal.Decimal) error {
        _, err := txn.ExecContext(context.Background(),
            "INSERT INTO `users` (`id`, `nickname`, `balance`) VALUES (?, ?, ?)",
            id, nickname, balance)
        return err
    }

Then write txn.go, which contains the main function, calls helper.go, and handles the command-line arguments:

    package main

    import (
        "database/sql"
        "flag"
        "fmt"
        "sync"
    )

    func main() {
        optimistic, alice, bob := parseParams()

        openDB("mysql", "root:@tcp(127.0.0.1:4000)/bookshop?charset=utf8mb4", func(db *sql.DB) {
            prepareData(db, optimistic)
            buy(db, optimistic, alice, bob)
        })
    }

    func buy(db *sql.DB, optimistic bool, alice, bob int) {
        buyFunc := buyOptimistic
        if !optimistic {
            buyFunc = buyPessimistic
        }

        wg := sync.WaitGroup{}
        wg.Add(1)
        go func() {
            defer wg.Done()
            buyFunc(db, 1, 1000, 1, 1, bob)
        }()

        wg.Add(1)
        go func() {
            defer wg.Done()
            buyFunc(db, 2, 1001, 1, 2, alice)
        }()

        wg.Wait()
    }

    func openDB(driverName, dataSourceName string, runnable func(db *sql.DB)) {
        db, err := sql.Open(driverName, dataSourceName)
        if err != nil {
            panic(err)
        }
        defer db.Close()

        runnable(db)
    }

    func parseParams() (optimistic bool, alice, bob int) {
        flag.BoolVar(&optimistic, "o", false, "transaction is optimistic")
        flag.IntVar(&alice, "a", 4, "Alice bought num")
        flag.IntVar(&bob, "b", 6, "Bob bought num")

        flag.Parse()

        fmt.Println(optimistic, alice, bob)

        return optimistic, alice, bob
    }

The Golang example already includes the optimistic transaction.

In Python, write txn_example.py:

    import time
    import MySQLdb
    import os
    import datetime
    from threading import Thread

    REPEATABLE_ERROR_CODE_SET = {
        9007,  # Transactions in TiKV encounter write conflicts.
        8028,  # table schema changes
        8002,  # "SELECT FOR UPDATE" commit conflict
        8022   # The transaction commit fails and has been rolled back
    }


    def create_connection():
        return MySQLdb.connect(
            host="127.0.0.1",
            port=4000,
            user="root",
            password="",
            database="bookshop",
            autocommit=False
        )


    def prepare_data() -> None:
        connection = create_connection()
        with connection:
            with connection.cursor() as cursor:
                cursor.execute("INSERT INTO `books` (`id`, `title`, `type`, `published_at`, `price`, `stock`) "
                               "values (%s, %s, %s, %s, %s, %s)",
                               (1, "Designing Data-Intensive Application", "Science & Technology",
                                datetime.datetime(2018, 9, 1), 100, 10))

                cursor.executemany("INSERT INTO `users` (`id`, `nickname`, `balance`) VALUES (%s, %s, %s)",
                                   [(1, "Bob", 10000), (2, "Alice", 10000)])
                connection.commit()


    def buy_optimistic(thread_id: int, order_id: int, book_id: int, user_id: int, amount: int,
                       optimistic_retry_times: int = 5) -> None:
        connection = create_connection()

        txn_log_header = f"/* txn {thread_id} */"
        if thread_id != 1:
            txn_log_header = "\t" + txn_log_header

        with connection:
            with connection.cursor() as cursor:
                cursor.execute("BEGIN OPTIMISTIC")
                print(f'{txn_log_header} BEGIN OPTIMISTIC')
                time.sleep(1)

                try:
                    # read the price and stock of the book
                    select_book_for_update = "SELECT `price`, `stock` FROM books WHERE id = %s FOR UPDATE"
                    cursor.execute(select_book_for_update, (book_id,))
                    book = cursor.fetchone()
                    if book is None:
                        raise Exception("book_id not exist")
                    price, stock = book
                    print(f'{txn_log_header} {select_book_for_update} successful')

                    if stock < amount:
                        raise Exception("book not enough, rollback")

                    # update book
                    update_stock = "update `books` set stock = stock - %s where id = %s and stock - %s >= 0"
                    rows_affected = cursor.execute(update_stock, (amount, book_id, amount))
                    print(f'{txn_log_header} {update_stock} successful')

                    if rows_affected == 0:
                        raise Exception("stock not enough, rollback")

                    # insert order
                    insert_order = "insert into `orders` (`id`, `book_id`, `user_id`, `quality`) values (%s, %s, %s, %s)"
                    cursor.execute(insert_order, (order_id, book_id, user_id, amount))
                    print(f'{txn_log_header} {insert_order} successful')

                    # update user
                    update_user = "update `users` set `balance` = `balance` - %s where id = %s"
                    cursor.execute(update_user, (amount * price, user_id))
                    print(f'{txn_log_header} {update_user} successful')

                except Exception as err:
                    connection.rollback()
                    print(f'something went wrong: {err}')
                else:
                    # Important: in an optimistic transaction the commit itself can fail,
                    # so the exception returned by TiDB must be handled here.
                    try:
                        connection.commit()
                    except MySQLdb.MySQLError as db_err:
                        code, desc = db_err.args
                        if code in REPEATABLE_ERROR_CODE_SET and optimistic_retry_times > 0:
                            print(f'retry, rest {optimistic_retry_times - 1} times, for {code} {desc}')
                            buy_optimistic(thread_id, order_id, book_id, user_id, amount, optimistic_retry_times - 1)
                        else:
                            # report non-retryable commit errors instead of dropping them
                            print(f'commit failed: {code} {desc}')


    def buy_pessimistic(thread_id: int, order_id: int, book_id: int, user_id: int, amount: int) -> None:
        connection = create_connection()

        txn_log_header = f"/* txn {thread_id} */"
        if thread_id != 1:
            txn_log_header = "\t" + txn_log_header

        with connection:
            with connection.cursor() as cursor:
                cursor.execute("BEGIN PESSIMISTIC")
                print(f'{txn_log_header} BEGIN PESSIMISTIC')
                time.sleep(1)

                try:
                    # read the price of book
                    select_book_for_update = "SELECT `price` FROM books WHERE id = %s FOR UPDATE"
                    cursor.execute(select_book_for_update, (book_id,))
                    book = cursor.fetchone()
                    if book is None:
                        raise Exception("book_id not exist")
                    price = book[0]
                    print(f'{txn_log_header} {select_book_for_update} successful')

                    # update book
                    update_stock = "update `books` set stock = stock - %s where id = %s and stock - %s >= 0"
                    rows_affected = cursor.execute(update_stock, (amount, book_id, amount))
                    print(f'{txn_log_header} {update_stock} successful')

                    if rows_affected == 0:
                        raise Exception("stock not enough, rollback")

                    # insert order
                    insert_order = "insert into `orders` (`id`, `book_id`, `user_id`, `quality`) values (%s, %s, %s, %s)"
                    cursor.execute(insert_order, (order_id, book_id, user_id, amount))
                    print(f'{txn_log_header} {insert_order} successful')

                    # update user
                    update_user = "update `users` set `balance` = `balance` - %s where id = %s"
                    cursor.execute(update_user, (amount * price, user_id))
                    print(f'{txn_log_header} {update_user} successful')

                except Exception as err:
                    connection.rollback()
                    print(f'something went wrong: {err}')
                else:
                    connection.commit()


    optimistic = os.environ.get('OPTIMISTIC')
    alice = os.environ.get('ALICE')
    bob = os.environ.get('BOB')

    if not (optimistic and alice and bob):
        raise Exception("please use \"OPTIMISTIC=<is_optimistic> ALICE=<alice_num> "
                        "BOB=<bob_num> python3 txn_example.py\" to start this script")

    prepare_data()

    # bool("False") is True for any non-empty string, so compare the text instead
    if optimistic.strip().lower() in ("1", "true", "yes"):
        buy_func = buy_optimistic
    else:
        buy_func = buy_pessimistic

    bob_thread = Thread(target=buy_func, kwargs={
        "thread_id": 1, "order_id": 1000, "book_id": 1, "user_id": 1, "amount": int(bob)})
    alice_thread = Thread(target=buy_func, kwargs={
        "thread_id": 2, "order_id": 1001, "book_id": 1, "user_id": 2, "amount": int(alice)})

    bob_thread.start()
    alice_thread.start()
    bob_thread.join(timeout=10)
    alice_thread.join(timeout=10)

The Python example already includes the optimistic transaction.
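The retry logic used for optimistic transactions in the Golang and Python examples can be separated from the business code. The following is a minimal sketch under stated assumptions, not the method of any driver: `run_with_retry` and `FakeDBError` are hypothetical names, the error class only imitates a driver exception whose first argument is the error code, and the linear delay between attempts is an addition that the examples above do not have. The error codes are the ones listed in those examples.

```python
import time

# Retryable TiDB error codes, as listed in the examples above
RETRYABLE_CODES = {9007, 8028, 8002, 8022}


class FakeDBError(Exception):
    """Hypothetical stand-in for a driver error whose args are (code, message)."""

    def __init__(self, code, message):
        super().__init__(code, message)


def run_with_retry(txn_func, max_retries=5, base_delay=0.01):
    """Call txn_func until it succeeds, retrying only on retryable error codes.

    txn_func is expected to run one whole transaction (begin, statements,
    commit), so that each retry starts again from fresh reads.
    """
    attempt = 0
    while True:
        try:
            return txn_func()
        except FakeDBError as err:
            code = err.args[0]
            if code not in RETRYABLE_CODES or attempt >= max_retries:
                raise  # not retryable, or the retry budget is used up
            attempt += 1
            time.sleep(base_delay * attempt)  # wait a little longer each time
```

A loop is used instead of recursion so that the retry budget is easy to read and no connection from a failed attempt stays open while the next attempt runs.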

2. Run an example that does not involve overselling

Run the sample program:

Run the sample program in Java:

    mvn clean package
    java -jar target/plain-java-txn-0.0.1-jar-with-dependencies.jar ALICE_NUM=4 BOB_NUM=6

Run the sample program in Golang:

    go build -o bin/txn
    ./bin/txn -a 4 -b 6

Run the sample program in Python:

    OPTIMISTIC=False ALICE=4 BOB=6 python3 txn_example.py

SQL log:

    /* txn 1 */ BEGIN PESSIMISTIC
    /* txn 2 */ BEGIN PESSIMISTIC
    /* txn 2 */ SELECT * FROM `books` WHERE `id` = 1 FOR UPDATE
    /* txn 2 */ UPDATE `books` SET `stock` = `stock` - 4 WHERE `id` = 1 AND `stock` - 4 >= 0
    /* txn 2 */ INSERT INTO `orders` (`id`, `book_id`, `user_id`, `quality`) VALUES (1001, 1, 2, 4)
    /* txn 2 */ UPDATE `users` SET `balance` = `balance` - 400.0 WHERE `id` = 2
    /* txn 2 */ COMMIT
    /* txn 1 */ SELECT * FROM `books` WHERE `id` = 1 FOR UPDATE
    /* txn 1 */ UPDATE `books` SET `stock` = `stock` - 6 WHERE `id` = 1 AND `stock` - 6 >= 0
    /* txn 1 */ INSERT INTO `orders` (`id`, `book_id`, `user_id`, `quality`) VALUES (1000, 1, 1, 6)
    /* txn 1 */ UPDATE `users` SET `balance` = `balance` - 600.0 WHERE `id` = 1
    /* txn 1 */ COMMIT

Finally, check the created orders, the deducted user balances, and the deducted book stock. All are as expected.

    mysql> SELECT * FROM `books`;
    +----+--------------------------------------+----------------------+---------------------+-------+--------+
    | id | title                                | type                 | published_at        | stock | price  |
    +----+--------------------------------------+----------------------+---------------------+-------+--------+
    |  1 | Designing Data-Intensive Application | Science & Technology | 2018-09-01 00:00:00 |     0 | 100.00 |
    +----+--------------------------------------+----------------------+---------------------+-------+--------+
    1 row in set (0.00 sec)

    mysql> SELECT * FROM orders;
    +------+---------+---------+---------+---------------------+
    | id   | book_id | user_id | quality | ordered_at          |
    +------+---------+---------+---------+---------------------+
    | 1000 |       1 |       1 |       6 | 2022-04-19 10:58:12 |
    | 1001 |       1 |       2 |       4 | 2022-04-19 10:58:11 |
    +------+---------+---------+---------+---------------------+
    2 rows in set (0.01 sec)

    mysql> SELECT * FROM users;
    +----+---------+----------+
    | id | balance | nickname |
    +----+---------+----------+
    |  1 | 9400.00 | Bob      |
    |  2 | 9600.00 | Alice    |
    +----+---------+----------+
    2 rows in set (0.00 sec)

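3. Run an example that prevents overselling

Now raise the difficulty. If 10 copies of the book remain, Bob buys 7 and Alice buys 4, and both place their orders at almost the same time, what happens? Reuse the code from the previous example for this scenario, changing only Bob's purchase quantity from 6 to 7:

Run the sample program: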

Run the sample program in Java:

    mvn clean package
    java -jar target/plain-java-txn-0.0.1-jar-with-dependencies.jar ALICE_NUM=4 BOB_NUM=7

Run the sample program in Golang:

    go build -o bin/txn
    ./bin/txn -a 4 -b 7

Run the sample program in Python:

    OPTIMISTIC=False ALICE=4 BOB=7 python3 txn_example.py

SQL log:

    /* txn 1 */ BEGIN PESSIMISTIC
    /* txn 2 */ BEGIN PESSIMISTIC
    /* txn 2 */ SELECT * FROM `books` WHERE `id` = 1 FOR UPDATE
    /* txn 2 */ UPDATE `books` SET `stock` = `stock` - 4 WHERE `id` = 1 AND `stock` - 4 >= 0
    /* txn 2 */ INSERT INTO `orders` (`id`, `book_id`, `user_id`, `quality`) values (1001, 1, 2, 4)
    /* txn 2 */ UPDATE `users` SET `balance` = `balance` - 400.0 WHERE `id` = 2
    /* txn 2 */ COMMIT
    /* txn 1 */ SELECT * FROM `books` WHERE `id` = 1 FOR UPDATE
    /* txn 1 */ UPDATE `books` SET `stock` = `stock` - 7 WHERE `id` = 1 AND `stock` - 7 >= 0
    /* txn 1 */ ROLLBACK

Because txn 2 acquired the lock first and updated stock, the UPDATE in txn 1 returned an affected_rows value of 0, and txn 1 went into the rollback flow.
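Because the row lock forces the two purchases to run one after the other, the final state follows from simple arithmetic over the commit order. The helper below is a hypothetical model written for this explanation (`apply_in_commit_order` is not part of the example programs); it applies the same `stock - amount >= 0` condition as the UPDATE statement and skips any purchase that fails it.

```python
from decimal import Decimal


def apply_in_commit_order(stock, price, balances, purchases):
    """Apply purchases one by one, rejecting any that would oversell.

    purchases is a list of (user, amount) pairs in the order in which the
    transactions obtained the row lock.
    """
    balances = dict(balances)  # work on a copy, leave the input untouched
    accepted = []
    for user, amount in purchases:
        if stock - amount >= 0:  # same condition as the UPDATE's WHERE clause
            stock -= amount
            balances[user] -= price * amount
            accepted.append(user)
        # otherwise the transaction rolls back and changes nothing
    return stock, balances, accepted


start = {"Bob": Decimal("10000.00"), "Alice": Decimal("10000.00")}
# Alice (txn 2) gets the lock first and buys 4; Bob (txn 1) then asks for 7
stock, balances, accepted = apply_in_commit_order(
    10, Decimal("100.00"), start, [("Alice", 4), ("Bob", 7)])
print(stock, balances["Alice"], balances["Bob"], accepted)
```

In this model, 6 copies remain, Alice's balance is 9600.00, and Bob's balance stays at 10000.00. Had Bob obtained the lock first, his order for 7 would have succeeded and Alice's order for 4 would have been the one rolled back.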

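Check the created orders, the deducted user balances, and the deducted book stock again. Alice's order for 4 books succeeded, Bob's order for 7 books failed, and the remaining stock of 6 copies is as expected.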

    mysql> SELECT * FROM books;
    +----+--------------------------------------+----------------------+---------------------+-------+--------+
    | id | title                                | type                 | published_at        | stock | price  |
    +----+--------------------------------------+----------------------+---------------------+-------+--------+
    |  1 | Designing Data-Intensive Application | Science & Technology | 2018-09-01 00:00:00 |     6 | 100.00 |
    +----+--------------------------------------+----------------------+---------------------+-------+--------+
    1 row in set (0.00 sec)

    mysql> SELECT * FROM orders;
    +------+---------+---------+---------+---------------------+
    | id   | book_id | user_id | quality | ordered_at          |
    +------+---------+---------+---------+---------------------+
    | 1001 |       1 |       2 |       4 | 2022-04-19 11:03:03 |
    +------+---------+---------+---------+---------------------+
    1 row in set (0.00 sec)

    mysql> SELECT * FROM users;
    +----+----------+----------+
    | id | balance  | nickname |
    +----+----------+----------+
    |  1 | 10000.00 | Bob      |
    |  2 |  9600.00 | Alice    |
    +----+----------+----------+
    2 rows in set (0.01 sec)

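Optimistic transactions

The following code uses optimistic transactions, with two threads simulating two users buying the same book concurrently, just as in the pessimistic transaction example. The bookshop has 10 copies left; Bob buys 6 and Alice buys 4. They complete their orders at almost the same time, and in the end the remaining stock of the book is zero.

1. Write an optimistic transaction example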

Write an optimistic transaction example in Java:

Code

  1. package com.pingcap.txn.optimistic;
  2. import com.zaxxer.hikari.HikariDataSource;
  3. import java.math.BigDecimal;
  4. import java.sql.*;
  5. import java.util.Arrays;
  6. import java.util.concurrent.*;
  7. public class TxnExample {
  8. public static void main(String[] args) throws SQLException, InterruptedException {
  9. System.out.println(Arrays.toString(args));
  10. int aliceQuantity = 0;
  11. int bobQuantity = 0;
  12. for (String arg: args) {
  13. if (arg.startsWith("ALICE_NUM")) {
  14. aliceQuantity = Integer.parseInt(arg.replace("ALICE_NUM=", ""));
  15. }
  16. if (arg.startsWith("BOB_NUM")) {
  17. bobQuantity = Integer.parseInt(arg.replace("BOB_NUM=", ""));
  18. }
  19. }
  20. HikariDataSource ds = new HikariDataSource();
  21. ds.setJdbcUrl("jdbc:mysql://localhost:4000/bookshop?useServerPrepStmts=true&cachePrepStmts=true");
  22. ds.setUsername("root");
  23. ds.setPassword("");
  24. // prepare data
  25. Connection connection = ds.getConnection();
  26. createBook(connection, 1L, "Designing Data-Intensive Application", "Science & Technology",
  27. Timestamp.valueOf("2018-09-01 00:00:00"), new BigDecimal(100), 10);
  28. createUser(connection, 1L, "Bob", new BigDecimal(10000));
  29. createUser(connection, 2L, "Alice", new BigDecimal(10000));
  30. CountDownLatch countDownLatch = new CountDownLatch(2);
  31. ExecutorService threadPool = Executors.newFixedThreadPool(2);
  32. final int finalBobQuantity = bobQuantity;
  33. threadPool.execute(() -> {
  34. buy(ds, 1, 1000L, 1L, 1L, finalBobQuantity, 5);
  35. countDownLatch.countDown();
  36. });
  37. final int finalAliceQuantity = aliceQuantity;
  38. threadPool.execute(() -> {
  39. buy(ds, 2, 1001L, 1L, 2L, finalAliceQuantity, 5);
  40. countDownLatch.countDown();
  41. });
  42. countDownLatch.await(5, TimeUnit.SECONDS);
  43. }

    // Inserts one row into the users table.
    public static void createUser(Connection connection, Long id, String nickname, BigDecimal balance) throws SQLException {
        try (PreparedStatement insert = connection.prepareStatement(
                "INSERT INTO `users` (`id`, `nickname`, `balance`) VALUES (?, ?, ?)")) {
            insert.setLong(1, id);
            insert.setString(2, nickname);
            insert.setBigDecimal(3, balance);
            insert.executeUpdate();
        }
    }

    // Inserts one row into the books table.
    public static void createBook(Connection connection, Long id, String title, String type, Timestamp publishedAt, BigDecimal price, Integer stock) throws SQLException {
        try (PreparedStatement insert = connection.prepareStatement(
                "INSERT INTO `books` (`id`, `title`, `type`, `published_at`, `price`, `stock`) values (?, ?, ?, ?, ?, ?)")) {
            insert.setLong(1, id);
            insert.setString(2, title);
            insert.setString(3, type);
            insert.setTimestamp(4, publishedAt);
            insert.setBigDecimal(5, price);
            insert.setInt(6, stock);
            insert.executeUpdate();
        }
    }

    public static void buy(HikariDataSource ds, Integer threadID, Long orderID, Long bookID,
                           Long userID, Integer quantity, Integer retryTimes) {
        // Prefix every statement with the transaction ID so that it can be traced in the SQL log.
        String txnComment = "/* txn " + threadID + " */ ";

        try (Connection connection = ds.getConnection()) {
            try {
                connection.setAutoCommit(false);
                connection.createStatement().executeUpdate(txnComment + "begin optimistic");

                // Wait for the other thread to run its 'begin optimistic' statement.
                TimeUnit.SECONDS.sleep(1);

                BigDecimal price = null;

                // Read the price and stock of the book.
                PreparedStatement selectBook = connection.prepareStatement(txnComment + "SELECT * FROM books where id = ? for update");
                selectBook.setLong(1, bookID);
                ResultSet res = selectBook.executeQuery();
                if (!res.next()) {
                    throw new RuntimeException("book not exist");
                } else {
                    price = res.getBigDecimal("price");
                    int stock = res.getInt("stock");
                    if (stock < quantity) {
                        throw new RuntimeException("book not enough");
                    }
                }

                // Decrease the stock of the book.
                String updateBookSQL = "update `books` set stock = stock - ? where id = ? and stock - ? >= 0";
                PreparedStatement updateBook = connection.prepareStatement(txnComment + updateBookSQL);
                updateBook.setInt(1, quantity);
                updateBook.setLong(2, bookID);
                updateBook.setInt(3, quantity);
                updateBook.executeUpdate();

                // Insert the order.
                String insertOrderSQL = "insert into `orders` (`id`, `book_id`, `user_id`, `quality`) values (?, ?, ?, ?)";
                PreparedStatement insertOrder = connection.prepareStatement(txnComment + insertOrderSQL);
                insertOrder.setLong(1, orderID);
                insertOrder.setLong(2, bookID);
                insertOrder.setLong(3, userID);
                insertOrder.setInt(4, quantity);
                insertOrder.executeUpdate();

                // Deduct the payment from the user's balance.
                String updateUserSQL = "update `users` set `balance` = `balance` - ? where id = ?";
                PreparedStatement updateUser = connection.prepareStatement(txnComment + updateUserSQL);
                updateUser.setBigDecimal(1, price.multiply(new BigDecimal(quantity)));
                updateUser.setLong(2, userID);
                updateUser.executeUpdate();

                // In optimistic mode, a write conflict is reported here, at commit time.
                connection.createStatement().executeUpdate(txnComment + "commit");
            } catch (Exception e) {
                connection.createStatement().executeUpdate(txnComment + "rollback");
                System.out.println("error occurred: " + e.getMessage());

                // Retry only for error codes that are safe to retry.
                if (e instanceof SQLException sqlException) {
                    switch (sqlException.getErrorCode()) {
                        // You can get all error codes at https://docs.pingcap.com/tidb/stable/error-codes
                        case 9007: // Transactions in TiKV encounter write conflicts.
                        case 8028: // table schema changes
                        case 8002: // "SELECT FOR UPDATE" commit conflict
                        case 8022: // The transaction commit fails and has been rolled back
                            if (retryTimes != 0) {
                                System.out.println(retryTimes + " retries left. retry for " + e.getMessage());
                                buy(ds, threadID, orderID, bookID, userID, quantity, retryTimes - 1);
                            }
                    }
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
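
The retry logic in the catch block above can also be written as a loop instead of a recursive call. The following is a minimal, language-neutral sketch in Python, not the code used by the sample program. `FakeDBError` and `run_with_retry` are hypothetical names introduced only for illustration; the set of error codes is the same one the Java example treats as retryable. Unlike the Java example, which silently gives up when the retry budget is exhausted, this sketch re-raises the last error.

```python
import time

# TiDB error codes that the Java example above treats as retryable.
RETRYABLE_CODES = {9007, 8028, 8002, 8022}


class FakeDBError(Exception):
    """Hypothetical stand-in for a driver exception that carries an error code."""

    def __init__(self, code, message):
        super().__init__(message)
        self.code = code


def run_with_retry(txn_func, max_retries=5, backoff_seconds=0.0):
    """Run txn_func until it succeeds or the retry budget is used up.

    txn_func must execute one whole transaction (begin ... commit) and raise
    on failure, so that every attempt starts from a fresh snapshot.
    """
    attempt = 0
    while True:
        try:
            return txn_func()
        except FakeDBError as err:
            # Non-retryable errors and an exhausted budget are re-raised.
            if err.code not in RETRYABLE_CODES or attempt >= max_retries:
                raise
            attempt += 1
            time.sleep(backoff_seconds * attempt)
```

The important design point is that the whole transaction is retried as a unit. Retrying only the failed statement would reuse values read from a stale snapshot.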

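Configuration change

In pom.xml, change the main class from:

  <mainClass>com.pingcap.txn.TxnExample</mainClass>

to:

  <mainClass>com.pingcap.txn.optimistic.TxnExample</mainClass>

so that it points to the optimistic transaction example.

For Golang, the example in the "Write a pessimistic transaction example" section already supports optimistic transactions, so you can use it without any changes.

For Python, the example in the "Write a pessimistic transaction example" section already supports optimistic transactions, so you can use it without any changes.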

2. Run the example that does not involve overselling

Run the sample program:

  • Java
  • Golang
  • Python

Run the sample program in Java:

  mvn clean package
  java -jar target/plain-java-txn-0.0.1-jar-with-dependencies.jar ALICE_NUM=4 BOB_NUM=6

Run the sample program in Golang:

  go build -o bin/txn
  ./bin/txn -a 4 -b 6 -o true

Run the sample program in Python:

  OPTIMISTIC=True ALICE=4 BOB=6 python3 txn_example.py

SQL statement execution process:

  /* txn 2 */ BEGIN OPTIMISTIC
  /* txn 1 */ BEGIN OPTIMISTIC
  /* txn 2 */ SELECT * FROM `books` WHERE `id` = 1 FOR UPDATE
  /* txn 2 */ UPDATE `books` SET `stock` = `stock` - 4 WHERE `id` = 1 AND `stock` - 4 >= 0
  /* txn 2 */ INSERT INTO `orders` (`id`, `book_id`, `user_id`, `quality`) VALUES (1001, 1, 1, 4)
  /* txn 2 */ UPDATE `users` SET `balance` = `balance` - 400.0 WHERE `id` = 2
  /* txn 2 */ COMMIT
  /* txn 1 */ SELECT * FROM `books` WHERE `id` = 1 FOR UPDATE
  /* txn 1 */ UPDATE `books` SET `stock` = `stock` - 6 WHERE `id` = 1 AND `stock` - 6 >= 0
  /* txn 1 */ INSERT INTO `orders` (`id`, `book_id`, `user_id`, `quality`) VALUES (1000, 1, 1, 6)
  /* txn 1 */ UPDATE `users` SET `balance` = `balance` - 600.0 WHERE `id` = 1
  retry 1 times for 9007 Write conflict, txnStartTS=432618733006225412, conflictStartTS=432618733006225411, conflictCommitTS=432618733006225414, key={tableID=126, handle=1} primary={tableID=114, indexID=1, indexValues={1, 1000, }} [try again later]
  /* txn 1 */ BEGIN OPTIMISTIC
  /* txn 1 */ SELECT * FROM `books` WHERE `id` = 1 FOR UPDATE
  /* txn 1 */ UPDATE `books` SET `stock` = `stock` - 6 WHERE `id` = 1 AND `stock` - 6 >= 0
  /* txn 1 */ INSERT INTO `orders` (`id`, `book_id`, `user_id`, `quality`) VALUES (1000, 1, 1, 6)
  /* txn 1 */ UPDATE `users` SET `balance` = `balance` - 600.0 WHERE `id` = 1
  /* txn 1 */ COMMIT

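In optimistic transaction mode, intermediate states are not necessarily correct, so you cannot use affected_rows to determine whether a statement succeeded, as you can in pessimistic transaction mode. Instead, treat the transaction as a whole and use the result of the final COMMIT statement: if COMMIT returns an exception, the transaction encountered a write conflict.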
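
To illustrate why only COMMIT is a reliable signal, here is a deliberately simplified, in-memory model of optimistic concurrency control. It is a teaching sketch under stated assumptions, not a description of how TiDB is implemented: `ToyStore`, `ToyOptimisticTxn`, and `WriteConflict` are hypothetical names, and the conflict rule is reduced to "another transaction committed this key after my start timestamp".

```python
class WriteConflict(Exception):
    """Raised at commit when another transaction changed a key first."""


class ToyStore:
    """In-memory key-value store that tracks a commit timestamp per key."""

    def __init__(self, data):
        self.data = dict(data)
        self.commit_ts = {key: 0 for key in data}
        self.clock = 0

    def next_ts(self):
        self.clock += 1
        return self.clock


class ToyOptimisticTxn:
    def __init__(self, store):
        self.store = store
        self.start_ts = store.next_ts()
        self.snapshot = dict(store.data)  # reads see the data as of start_ts
        self.writes = {}                  # writes stay local until commit

    def read(self, key):
        return self.writes.get(key, self.snapshot[key])

    def write(self, key, value):
        # No conflict check here: the statement "succeeds" even if another
        # transaction has already committed a newer value for this key.
        self.writes[key] = value

    def commit(self):
        # Conflicts are detected only now, for every key in the write set.
        for key in self.writes:
            if self.store.commit_ts[key] > self.start_ts:
                raise WriteConflict(key)
        ts = self.store.next_ts()
        for key, value in self.writes.items():
            self.store.data[key] = value
            self.store.commit_ts[key] = ts


if __name__ == "__main__":
    store = ToyStore({"stock": 10})
    txn1 = ToyOptimisticTxn(store)  # Bob
    txn2 = ToyOptimisticTxn(store)  # Alice
    txn2.write("stock", txn2.read("stock") - 4)
    txn2.commit()
    txn1.write("stock", txn1.read("stock") - 6)  # still appears to succeed
    try:
        txn1.commit()
    except WriteConflict:
        print("conflict detected at commit")
```

In this model, Bob's write is accepted even though it is based on a stale stock value of 10, and the problem surfaces only when he commits. That mirrors the ordering in the SQL log, where the conflict for txn 1 is reported after all of its statements have run.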

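As the SQL log above shows, the two transactions ran concurrently and modified the same record, so a 9007 Write conflict exception was thrown when txn 1 committed. The application can safely retry after an optimistic write conflict. Here, the commit succeeds after one retry, and the final result is as expected: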

  mysql> SELECT * FROM books;
  +----+--------------------------------------+----------------------+---------------------+-------+--------+
  | id | title                                | type                 | published_at        | stock | price  |
  +----+--------------------------------------+----------------------+---------------------+-------+--------+
  |  1 | Designing Data-Intensive Application | Science & Technology | 2018-09-01 00:00:00 |     0 | 100.00 |
  +----+--------------------------------------+----------------------+---------------------+-------+--------+
  1 row in set (0.01 sec)

  mysql> SELECT * FROM orders;
  +------+---------+---------+---------+---------------------+
  | id   | book_id | user_id | quality | ordered_at          |
  +------+---------+---------+---------+---------------------+
  | 1000 |       1 |       1 |       6 | 2022-04-19 03:18:19 |
  | 1001 |       1 |       1 |       4 | 2022-04-19 03:18:17 |
  +------+---------+---------+---------+---------------------+
  2 rows in set (0.01 sec)

  mysql> SELECT * FROM users;
  +----+---------+----------+
  | id | balance | nickname |
  +----+---------+----------+
  |  1 | 9400.00 | Bob      |
  |  2 | 9600.00 | Alice    |
  +----+---------+----------+
  2 rows in set (0.00 sec)

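3. Run the example that prevents overselling

Now consider how an optimistic transaction prevents overselling. Suppose 10 copies of the book remain in stock, Bob buys 7 copies, Alice buys 4 copies, and both place their orders at almost the same time. What happens? This example reuses the optimistic transaction code above and only changes Bob's purchase quantity from 6 to 7.

Run the sample program: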

  • Java
  • Golang
  • Python

Run the sample program in Java:

  mvn clean package
  java -jar target/plain-java-txn-0.0.1-jar-with-dependencies.jar ALICE_NUM=4 BOB_NUM=7

Run the sample program in Golang:

  go build -o bin/txn
  ./bin/txn -a 4 -b 7 -o true

Run the sample program in Python:

  OPTIMISTIC=True ALICE=4 BOB=7 python3 txn_example.py

SQL statement execution process:

  /* txn 1 */ BEGIN OPTIMISTIC
  /* txn 2 */ BEGIN OPTIMISTIC
  /* txn 2 */ SELECT * FROM `books` WHERE `id` = 1 FOR UPDATE
  /* txn 2 */ UPDATE `books` SET `stock` = `stock` - 4 WHERE `id` = 1 AND `stock` - 4 >= 0
  /* txn 2 */ INSERT INTO `orders` (`id`, `book_id`, `user_id`, `quality`) VALUES (1001, 1, 1, 4)
  /* txn 2 */ UPDATE `users` SET `balance` = `balance` - 400.0 WHERE `id` = 2
  /* txn 2 */ COMMIT
  /* txn 1 */ SELECT * FROM `books` WHERE `id` = 1 FOR UPDATE
  /* txn 1 */ UPDATE `books` SET `stock` = `stock` - 7 WHERE `id` = 1 AND `stock` - 7 >= 0
  /* txn 1 */ INSERT INTO `orders` (`id`, `book_id`, `user_id`, `quality`) VALUES (1000, 1, 1, 7)
  /* txn 1 */ UPDATE `users` SET `balance` = `balance` - 700.0 WHERE `id` = 1
  retry 1 times for 9007 Write conflict, txnStartTS=432619094333980675, conflictStartTS=432619094333980676, conflictCommitTS=432619094333980678, key={tableID=126, handle=1} primary={tableID=114, indexID=1, indexValues={1, 1000, }} [try again later]
  /* txn 1 */ BEGIN OPTIMISTIC
  /* txn 1 */ SELECT * FROM `books` WHERE `id` = 1 FOR UPDATE
  Fail -> out of stock
  /* txn 1 */ ROLLBACK

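As the SQL log above shows, txn 1 hit a write conflict on its first attempt and was retried by the application. On the retry, the latest snapshot showed that the remaining stock was insufficient, so the application threw an out of stock exception and ended the transaction. The resulting data shows that only Alice's order was placed: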

  mysql> SELECT * FROM books;
  +----+--------------------------------------+----------------------+---------------------+-------+--------+
  | id | title                                | type                 | published_at        | stock | price  |
  +----+--------------------------------------+----------------------+---------------------+-------+--------+
  |  1 | Designing Data-Intensive Application | Science & Technology | 2018-09-01 00:00:00 |     6 | 100.00 |
  +----+--------------------------------------+----------------------+---------------------+-------+--------+
  1 row in set (0.00 sec)

  mysql> SELECT * FROM orders;
  +------+---------+---------+---------+---------------------+
  | id   | book_id | user_id | quality | ordered_at          |
  +------+---------+---------+---------+---------------------+
  | 1001 |       1 |       1 |       4 | 2022-04-19 03:41:16 |
  +------+---------+---------+---------+---------------------+
  1 row in set (0.00 sec)

  mysql> SELECT * FROM users;
  +----+----------+----------+
  | id | balance  | nickname |
  +----+----------+----------+
  |  1 | 10000.00 | Bob      |
  |  2 |  9600.00 | Alice    |
  +----+----------+----------+
  2 rows in set (0.00 sec)
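
One way to sanity-check the tables in both runs: for this example, once the conflicting transaction has been retried, the outcome should match applying the purchases one at a time in the order in which they finally commit or give up. The sketch below does only that arithmetic and does not talk to a database. `replay_in_commit_order` is a hypothetical helper written for this check; the starting stock of 10, the price of 100, and the balances of 10000 come from the sample data.

```python
from decimal import Decimal


def replay_in_commit_order(stock, price, balances, purchases):
    """Apply purchases one by one, skipping any that would oversell.

    purchases is a list of (user, quantity) pairs in the order in which the
    transactions finally commit or give up.
    Returns (remaining_stock, balances, accepted_orders).
    """
    balances = dict(balances)
    orders = []
    for user, quantity in purchases:
        if stock < quantity:
            continue  # "out of stock": this transaction rolls back
        stock -= quantity
        balances[user] -= price * quantity
        orders.append((user, quantity))
    return stock, balances, orders


start = {"Bob": Decimal("10000"), "Alice": Decimal("10000")}
price = Decimal("100")

# In both SQL logs Alice commits first and Bob's retried transaction comes second.
print(replay_in_commit_order(10, price, start, [("Alice", 4), ("Bob", 6)]))
print(replay_in_commit_order(10, price, start, [("Alice", 4), ("Bob", 7)]))
```

The first call ends with a stock of 0 and balances of 9400 for Bob and 9600 for Alice. The second ends with a stock of 6, Bob's balance unchanged at 10000, and only Alice's order accepted, which agrees with the query results above.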