事务的实现

一个事务从开始到结束通常会经历以下三个阶段:

  • 事务开始。
  • 命令入队。
  • 事务执行。

本节接下来的内容将对这三个阶段进行介绍,说明一个事务从开始到结束的整个过程。

事务开始

MULTI 命令的执行标志着事务的开始:

  1. redis> MULTI
  2. OK

MULTI 命令可以将执行该命令的客户端从非事务状态切换至事务状态,这一切换是通过在客户端状态的 flags 属性中打开 REDIS_MULTI 标识来完成的,MULTI 命令的实现可以用以下伪代码来表示:

  1. def MULTI():
  2.  
  3. # 打开事务标识
  4. client.flags |= REDIS_MULTI
  5.  
  6. # 返回 OK 回复
  7. replyOK()

命令入队

当一个客户端处于非事务状态时,这个客户端发送的命令会立即被服务器执行:

  1. redis> SET "name" "Practical Common Lisp"
  2. OK
  3.  
  4. redis> GET "name"
  5. "Practical Common Lisp"
  6.  
  7. redis> SET "author" "Peter Seibel"
  8. OK
  9.  
  10. redis> GET "author"
  11. "Peter Seibel"

与此不同的是,当一个客户端切换到事务状态之后,服务器会根据这个客户端发来的不同命令执行不同的操作:

  • 如果客户端发送的命令为 EXEC 、 DISCARD 、 WATCH 、 MULTI 四个命令的其中一个,那么服务器立即执行这个命令。
  • 与此相反,如果客户端发送的命令是 EXEC 、 DISCARD 、 WATCH 、 MULTI 四个命令以外的其他命令,那么服务器并不立即执行这个命令,而是将这个命令放入一个事务队列里面,然后向客户端返回 QUEUED 回复。

服务器判断命令是该入队还是该立即执行的过程可以用流程图 IMAGE_ENQUEUE_OR_EXEC 来描述。

digraph enque_or_execute { label = "\n图 IMAGE_ENQUEUE_OR_EXEC 服务器判断命令是该入队还是该执行的过程"; node [shape = box]; // command_in [label = "服务器接到来自客户端的命令"]; in_transaction_or_not [label = "这个客户端正处于事务状态?", shape = diamond]; not_exec_and_discard [label = "这个命令是否\nEXEC 、 DISCARD 、\nWATCH 或 MULTI ?", shape = diamond]; enqueu_command [label = "将命令放入事务队列"]; return_enqueued [label = "向客户端返回 QUEUED"]; exec_command [label = "执行这个命令"]; return_command_result [label = "向客户端返回命令的执行结果"]; // command_in -> in_transaction_or_not; in_transaction_or_not -> not_exec_and_discard [label = "是"]; not_exec_and_discard -> enqueu_command [label = "否"]; not_exec_and_discard -> exec_command [label = "是"]; in_transaction_or_not -> exec_command [label = "否"]; exec_command -> return_command_result; enqueu_command -> return_enqueued;}

事务队列

每个 Redis 客户端都有自己的事务状态,这个事务状态保存在客户端状态的 mstate 属性里面:

  1. typedef struct redisClient {
  2.  
  3. // ...
  4.  
  5. // 事务状态
  6. multiState mstate; /* MULTI/EXEC state */
  7.  
  8. // ...
  9.  
  10. } redisClient;

事务状态包含一个事务队列,以及一个已入队命令的计数器(也可以说是事务队列的长度):

  1. typedef struct multiState {
  2.  
  3. // 事务队列,FIFO 顺序
  4. multiCmd *commands;
  5.  
  6. // 已入队命令计数
  7. int count;
  8.  
  9. } multiState;

事务队列是一个 multiCmd 类型的数组,数组中的每个 multiCmd 结构都保存了一个已入队命令的相关信息,包括指向命令实现函数的指针,命令的参数,以及参数的数量:

  1. typedef struct multiCmd {
  2.  
  3. // 参数
  4. robj **argv;
  5.  
  6. // 参数数量
  7. int argc;
  8.  
  9. // 命令指针
  10. struct redisCommand *cmd;
  11.  
  12. } multiCmd;

事务队列以先进先出(FIFO)的方式保存入队的命令:较先入队的命令会被放到数组的前面,而较后入队的命令则会被放到数组的后面。

举个例子,如果客户端执行以下命令:

  1. redis> MULTI
  2. OK
  3.  
  4. redis> SET "name" "Practical Common Lisp"
  5. QUEUED
  6.  
  7. redis> GET "name"
  8. QUEUED
  9.  
  10. redis> SET "author" "Peter Seibel"
  11. QUEUED
  12.  
  13. redis> GET "author"
  14. QUEUED

那么服务器将为客户端创建图 IMAGE_TRANSACTION_STATE 所示的事务状态:

  • 最先入队的 SET 命令被放在了事务队列的索引 0 位置上。
  • 第二入队的 GET 命令被放在了事务队列的索引 1 位置上。
  • 第三入队的另一个 SET 命令被放在了事务队列的索引 2 位置上。
  • 最后入队的另一个 GET 命令被放在了事务队列的索引 3 位置上。

![digraph { label = "\n 图 IMAGE_TRANSACTION_STATE 事务状态"; rankdir = LR; node [shape = record]; //redisClient [label = " redisClient | … | mstate | … "]; multiState [label = " multiState | commands | count \n 4 "]; commands [label = " multiCmd[4] | <0> [0] | <1> [1] | <2> [2] | <3> [3] "]; multiCmd0 [label = " multiCmd | argv | argc \n 3 | cmd "]; multiCmd1 [label = " multiCmd | argv | argc \n 2 | cmd "]; multiCmd2 [label = " multiCmd | argv | argc \n 3 | cmd "]; multiCmd3 [label = " multiCmd | argv | argc \n 2 | cmd "]; //redisClient:mstate -> multiState:head; multiState:commands -> commands:head; commands:0 -> multiCmd0:head; commands:1 -> multiCmd1:head; commands:2 -> multiCmd2:head; commands:3 -> multiCmd3:head; argv0 [label = " robj[3] | { StringObject \n \"SET\" | StringObject \n \"name\" | StringObject \n \"Practical Common Lisp\" } "]; cmd0 [label = " setCommand ", shape = plaintext]; multiCmd0:argv -> argv0; multiCmd0:cmd -> cmd0; argv1 [label = " robj[2] | { StringObject \n \"GET\" | StringObject \n \"name\" } "]; cmd1 [label = " getCommand ", shape = plaintext]; multiCmd1:argv -> argv1; multiCmd1:cmd -> cmd1; argv2 [label = " robj[3] | { StringObject \n \"SET\" | StringObject \n \"author\" | StringObject \n \"Peter Seibel\" } "]; cmd2 [label = " setCommand ", shape = plaintext]; multiCmd2:argv -> argv2; multiCmd2:cmd -> cmd2; argv3 [label = " robj[2] | { StringObject \n \"GET\" | StringObject \n \"author\" } "]; cmd3 label = " getCommand ", shape = plaintext]; multiCmd3:argv -> argv3; multiCmd3:cmd -> cmd3;}

执行事务

当一个处于事务状态的客户端向服务器发送 EXEC 命令时,这个 EXEC 命令将立即被服务器执行:服务器会遍历这个客户端的事务队列,执行队列中保存的所有命令,最后将执行命令所得的结果全部返回给客户端。

举个例子,对于图 IMAGE_TRANSACTION_STATE 所示的事务队列来说,服务器首先会执行命令:

  1. SET "name" "Practical Common Lisp"

接着执行命令:

  1. GET "name"

之后执行命令:

  1. SET "author" "Peter Seibel"

再之后执行命令:

  1. GET "author"

最后,服务器会将执行这四个命令所得的回复返回给客户端:

  1. redis> EXEC
  2. 1) OK
  3. 2) "Practical Common Lisp"
  4. 3) OK
  5. 4) "Peter Seibel"

EXEC 命令的实现原理可以用以下伪代码来描述:

  1. def EXEC():
  2.  
  3. # 创建空白的回复队列
  4. reply_queue = []
  5.  
  6. # 遍历事务队列中的每个项
  7. # 读取命令的参数,参数的个数,以及要执行的命令
  8. for argv, argc, cmd in client.mstate.commands:
  9.  
  10. # 执行命令,并取得命令的返回值
  11. reply = execute_command(cmd, argv, argc)
  12.  
  13. # 将返回值追加到回复队列末尾
  14. reply_queue.append(reply)
  15.  
  16. # 移除 REDIS_MULTI 标识,让客户端回到非事务状态
  17. client.flags &= ~REDIS_MULTI
  18.  
  19. # 清空客户端的事务状态,包括:
  20. # 1)清零入队命令计数器
  21. # 2)释放事务队列
  22. client.mstate.count = 0
  23. release_transaction_queue(client.mstate.commands)
  24.  
  25. # 将事务的执行结果返回给客户端
  26. send_reply_to_client(client, reply_queue)