事务用户手册

Copyright 2016, Baidu, Inc.

单行事务

功能

  • 提供单行的read-modify-write原子语义
    下面的例子中,Session B在事务进行过程中,value被Session A所修改,因此B的提交将失败,否则将出现A的更新被覆盖的问题:

    1. -------------------
    2. | Wang | 100 |
    3. -------------------
    4. Session A Session B
    5. time
    6. | Begin
    7. | Begin
    8. | value=Get(Wang);
    9. | 100
    10. | value=Get(Wang);
    11. | 100
    12. |
    13. v value+=20;
    14. 120
    15. Put(Wang, value);
    16. value-=20;
    17. 80
    18. Put(Wang, value);
    19. Commit
    20. success
    21. Commit
    22. fail
  • 两个事务修改同一行,但读操作所涉及到的CF或者Colum无任何交集时,两个事务不会冲突
    下面的例子中,Session A和Session B分别对同一行的Cf1和Cf2进行read-modify-write操作,后提交的事务不会因先提交的事务而失败:

    1. ---------------------------
    2. | ROW | CF1 | CF2 |
    3. ---------------------------
    4. | Wang | 100 | 100 |
    5. ---------------------------
    6. Session A Session B
    7. time
    8. | Begin
    9. | Begin
    10. | value=Get(Wang, CF1);
    11. | 100
    12. | value=Get(Wang, CF2);
    13. | 100
    14. |
    15. v value+=20;
    16. 120
    17. Put(Wang, CF1, value);
    18. value-=20;
    19. 80
    20. Put(Wang, CF2, value);
    21. Commit
    22. success
    23. Commit
    24. success
  • 能够避免“幻影读”现象
    下面的例子中,Session A在事务中读取了CF1的[C1 ~ C5)区间,区间内只有C2和C4两个列;Session B插入了一个新的列C3,导致Session A的事务提交失败:

    1. ----------------------------------
    2. | ROW | CF1 | CF2 |
    3. | | C2 | C4 | C1 |
    4. | Wang | 100 | 200 | 300 |
    5. ----------------------------------
    6. Session A Session B
    7. time
    8. | Begin;
    9. | Begin;
    10. | column_list=[C1, C5);
    11. | value_list=Get(Wang, CF1, column_list);
    12. | 100, 200
    13. | value=150;
    14. | Put(Wang, CF1, C3, value);
    15. | value_list+=20;
    16. v 120, 220
    17. Put(Wang, CF1, column_list, value_list);
    18. Commit;
    19. success
    20. Commit;
    21. fail

约束

  • 不支持多版本语义
    • 写操作的时间戳不能由用户指定,而是由Tera分配,Tera保证一行内的修改时间戳单调递增,也就是说,只支持以下四种操作:
      • 增加最新版本
      • 删除整行
      • 删除整个CF
      • 删除整个Column
    • 读操作的时间戳不能由用户指定,只能读到第一次Get时快照视图上每个Column的最新版本
    • 数据的历史版本只能由非事务操作修改,历史版本不能参与到事务过程中

API

  1. class Table {
  2. /// 创建事务
  3. virtual Transaction* StartRowTransaction(const std::string& row_key) = 0;
  4. /// 提交事务
  5. virtual void CommitRowTransaction(Transaction* transaction) = 0;
  6. /// 回滚事务
  7. virtual void RollbackRowTransaction(Transaction* transaction) = 0;
  8. };
  9. class Transaction {
  10. /// 提交一个修改操作
  11. virtual void ApplyMutation(RowMutation* row_mu) = 0;
  12. /// 读取操作
  13. virtual void Get(RowReader* row_reader) = 0;
  14. /// 回调函数原型
  15. typedef void (*Callback)(Transaction* transaction);
  16. /// 设置提交回调, 提交操作会异步返回
  17. virtual void SetCommitCallback(Callback callback) = 0;
  18. /// 获取提交回调
  19. virtual Callback GetCommitCallback() = 0;
  20. /// 设置用户上下文,可在回调函数中获取
  21. virtual void SetContext(void* context) = 0;
  22. /// 获取用户上下文
  23. virtual void* GetContext() = 0;
  24. /// 获得结果错误码
  25. virtual const ErrorCode& GetError() = 0;
  26. };

使用示例

  1. #include "tera.h"
  2. int main() {
  3. tera::ErrorCode error_code;
  4. // Get a client instance
  5. tera::Client* client = tera::Client::NewClient("./tera.flag", "txn_sample", &error_code);
  6. assert(client);
  7. // Create table
  8. tera::TableDescriptor schema("employee");
  9. schema.EnableTxn();
  10. schema.AddLocalityGroup("lg0");
  11. schema.AddColumnFamily("title", "lg0");
  12. schema.AddColumnFamily("salary", "lg0");
  13. client->CreateTable(schema, &error_code);
  14. assert(error_code.GetType() == tera::ErrorCode::kOK);
  15. // Open table
  16. tera::Table* table = client->OpenTable("employee", &error_code);
  17. assert(table);
  18. // init a row
  19. tera::RowMutation* init = table->NewRowMutation("Amy");
  20. init->Put("title", "", "junior");
  21. init->Put("salary", "", "100");
  22. table->ApplyMutation(init);
  23. assert(init->GetError().GetType() == tera::ErrorCode::kOK);
  24. delete init;
  25. // txn read the row
  26. tera::Transaction* txn = table->StartRowTransaction("Amy");
  27. tera::RowReader* reader = table->NewRowReader("Amy");
  28. reader->AddColumnFamily("title");
  29. txn->Get(reader);
  30. assert(reader->GetError().GetType() == tera::ErrorCode::kOK);
  31. // get title
  32. std::string title;
  33. while (!reader->Done()) {
  34. if (reader->Family() == "title") {
  35. title = reader->Value();
  36. break;
  37. }
  38. reader->Next();
  39. }
  40. delete reader;
  41. // txn write the row
  42. tera::RowMutation* mutation = table->NewRowMutation("Amy");
  43. if (title == "junior") {
  44. mutation->Put("title", "", "senior");
  45. mutation->Put("salary", "", "200");
  46. } else if (title == "senior") {
  47. mutation->Put("title", "", "director");
  48. mutation->Put("salary", "", "300");
  49. }
  50. txn->ApplyMutation(mutation);
  51. assert(mutation->GetError().GetType() == tera::ErrorCode::kOK);
  52. delete mutation;
  53. // txn commit
  54. table->CommitRowTransaction(txn);
  55. printf("Transaction commit result %s\n", txn->GetError().ToString().c_str());
  56. delete txn;
  57. // Close
  58. delete table;
  59. delete client;
  60. return 0;
  61. }