事务是关系型数据库的重要特性,Drogon通过Transaction类提供了对事务的支持。

Transaction类的对象由DbClient创建,事务有关的操作很多是自动进行的:

  • Transaction对象创建之初,自动执行了begin语句开始事务;
  • Transaction对象析构时,自动执行commit语句结束事务;
  • 如果中间出现导致事务失败的异常,自动执行rollback语句回滚事务;
  • 如果事务已经回滚,则之后的sql语句都会返回异常(抛异常或者执行异常回调);

事务的创建

事务创建接口由DbClient提供,如下:

  1. std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)> &commitCallback = std::function<void(bool)>());

这个接口很简单,它返回一个Transaction对象的智能指针,显然,这个智能指针失去所有持有者而析构事务对象时,事务也就结束了。参数commitCallback用于返回事务提交是否成功,需要注意的是,这个回调只用来指示commit命令是否成功,如果,事务在执行过程中自动或手动回滚了,这个回调就不会执行了。通常,commit命令都会成功,这个回调的bool类型参数为true,只有某些特殊情况,比如commit执行过程中连接断了,才会导致commitCallback通知用户commit失败,这时,事务在服务端的状态是不确定的,用户需特殊处理,当然,考虑到这种情况极少发生,非关键业务也可以选择忽略这种事件,那么用户只需要在创建事务时忽略这个参数即可(默认的空回调会传入这个接口)。

事务要独占一个数据库连接,因此事务创建过程中,DbClient要从自己的连接池中选一个空闲连接交给事务对象管理,这就有一个问题,如果DbClient的所有连接都在执行sql或者其他事务,那么,这个接口会阻塞直到有空闲连接为止。

框架同时提供了一个异步接口创建事务,如下:

  1. void newTransactionAsync(const std::function<void(const std::shared_ptr<Transaction> &)> &callback);

这个接口通过回调函数返回事务对象,不会阻塞当前线程,保证了应用的高并发,缺点是多了一次回调函数的嵌套。用户可根据实际情况自行选用。

事务的接口

Transaction的接口和DbClient几乎完全一致,除了下面这两点区别:

  • Transaction提供了rollback()接口,使用户可以在任何情况下回滚事务,比如并未有任何错误发生,只是业务逻辑要求回滚事务的情况,有时候事务已经自动回滚了,这时候再次调用rollback()接口,没有任何负作用,因此,显示的使用rollback()接口是个不错的策略,至少可以保证不会错误提交。
  • 用户不能调用事务的newTransaction()接口,这是容易理解的,虽然数据库有子事务的概念,但目前框架并不支持。

事实上,Transaction被设计成DbClient的子类,也是为了保持这些接口的一致性,同时,也为后文ORM的使用创造了方便条件。

框架目前没有提供控制事务隔离等级的接口,也就是说,隔离等级就是当前数据库服务的默认等级。

事务的生命周期

事务对象的智能指针除了被用户持有外,当它有未执行的sql时,框架也会持有,所以不用担心在还有未执行sql的情况下事务对象被析构。另外,事务对象智能指针常常在它的某个接口的结果回调里被捕获并使用,这也是正常的使用方式,不要担心由此引发循环引用导致事务对象永远不会析构,框架会通过回调完毕后清空事务内部的回调函数对象来打破循环引用;

一个例子

举个最简单的例子,假设有一个任务表,用户从中选取一个未处理状态的任务,把它改成正在处理中的状态,为了防止并发的竞态条件,我们使用Transaction,程序如下:

  1. {
  2. auto transPtr = clientPtr->newTransaction();
  3. transPtr->execSqlAsync( "select * from tasks where status=$1 for update order by time",
  4. "none",
  5. [=](const Result &r) {
  6. if (r.size() > 0)
  7. {
  8. std::cout << "Got a task!" << std::endl;
  9. *transPtr << "update tasks set status=$1 where task_id=$2"
  10. << "handling"
  11. << r[0]["task_id"].as<int64_t>()
  12. >> [](const Result &r)
  13. {
  14. std::cout << "Updated!";
  15. ... do something about the task;
  16. }
  17. >> [](const DrogonDbException &e)
  18. {
  19. std::cerr << "err:" << e.base().what() << std::end;
  20. };
  21. }
  22. else
  23. {
  24. std::cout << "No new tasks found!" << std::endl;
  25. }
  26. },
  27. [](const DrogonDbException &e) {
  28. std::cerr << "err:" << e.base().what() << std::end;
  29. });
  30. }

本例中使用select for update避免并发修改,update语句在select语句的结果回调中完成,进行了一次嵌套,最外层的大括号是为了限定transPtr的作用范围,使之在执行完sql后及时的销毁从而结束事务。

08.3 ORM