第五章 高级发布-订阅模式

第三章和第四章讲述了ZMQ中请求-应答模式的一些高级用法。如果你已经能够彻底理解了,那我要说声恭喜。这一章我们会关注发布-订阅模式,使用上层模式封装,提升ZMQ发布-订阅模式的性能、可靠性、状态同步及安全机制。

本章涉及的内容有:

  • 处理慢订阅者(自杀的蜗牛模式)
  • 高速订阅者(黑箱模式)
  • 构建一个共享键值缓存(克隆模式)

检测慢订阅者(自杀的蜗牛模式)

在使用发布-订阅模式的时候,最常见的问题之一是如何处理响应较慢的订阅者。理想状况下,发布者能以全速发送消息给订阅者,但现实中,订阅者会需要对消息做较长时间的处理,或者写得不够好,无法跟上发布者的脚步。

如何处理慢订阅者?最好的方法当然是让订阅者高效起来,不过这需要额外的工作。以下是一些处理慢订阅者的方法:

  • 在发布者中贮存消息。这是Gmail的做法,如果过去的几小时里没有阅读邮件的话,它会把邮件保存起来。但在高吞吐量的应用中,发布者堆积消息往往会导致内存溢出,最终崩溃。特别是当同是有多个订阅者时,或者无法用磁盘来做一个缓冲,情况就会变得更为复杂。

  • 在订阅者中贮存消息。这种做法要好的多,其实ZMQ默认的行为就是这样的。如果非得有一个人会因为内存溢出而崩溃,那也只会是订阅者,而非发布者,这挺公平的。然而,这种做法只对瞬间消息量很大的应用才合理,订阅者只是一时处理不过来,但最终会赶上进度。但是,这还是没有解决订阅者速度过慢的问题。

  • 暂停发送消息。这也是Gmail的做法,当我的邮箱容量超过7.554GB时,新的邮件就会被Gmail拒收或丢弃。这种做法对发布者来说很有益,ZMQ中若设置了阈值(HWM),其默认行为也就是这样的。但是,我们仍不能解决慢订阅者的问题,我们只是让消息变得断断续续而已。

  • 断开与满订阅者的连接。这是hotmail的做法,如果连续两周没有登录,它就会断开,这也是为什么我正在使用第十五个hotmail邮箱。不过这种方案在ZMQ里是行不通的,因为对于发布者而言,订阅者是不可见的,无法做相应处理。

看来没有一种经典的方式可以满足我们的需求,所以我们就要进行创新了。我们可以让订阅者自杀,而不仅仅是断开连接。这就是“自杀的蜗牛”模式。当订阅者发现自身运行得过慢时(对于慢速的定义应该是一个配置项,当达到这个标准时就大声地喊出来吧,让程序员知道),它会哀嚎一声,然后自杀。

订阅者如何检测自身速度过慢呢?一种方式是为消息进行编号,并在发布者端设置阈值。当订阅者发现消息编号不连续时,它就知道事情不对劲了。这里的阈值就是订阅者自杀的值。

这种方案有两个问题:一、如果我们连接的多个发布者,我们要如何为消息进行编号呢?解决方法是为每一个发布者设定一个唯一的编号,作为消息编号的一部分。二、如果订阅者使用ZMQ_SUBSRIBE选项对消息进行了过滤,那么我们精心设计的消息编号机制就毫无用处了。

有些情形不会进行消息的过滤,所以消息编号还是行得通的。不过更为普遍的解决方案是,发布者为消息标注时间戳,当订阅者收到消息时会检测这个时间戳,如果其差别达到某一个值,就发出警报并自杀。

当订阅者有自身的客户端或服务协议,需要保证最大延迟时间时,自杀的蜗牛模式会很合适。撤销一个订阅者也许并不是最周全的方案,但至少不会引发后续的问题。如果订阅者收到了过时的消息,那可能会对数据造成进一步的破坏,而且很难被发现。

以下是自杀的蜗牛模式的最简实现:

suisnail: Suicidal Snail in C

  1. //
  2. // 自杀的蜗牛模式
  3. //
  4. #include "czmq.h"
  5. // ---------------------------------------------------------------------
  6. // 该订阅者会连接至发布者,接收所有的消息,
  7. // 运行过程中它会暂停一会儿,模拟复杂的运算过程,
  8. // 当发现收到的消息超过1秒的延迟时,就自杀。
  9. #define MAX_ALLOWED_DELAY 1000 // 毫秒
  10. static void
  11. subscriber (void *args, zctx_t *ctx, void *pipe)
  12. {
  13. // 订阅所有消息
  14. void *subscriber = zsocket_new (ctx, ZMQ_SUB);
  15. zsocket_connect (subscriber, "tcp://localhost:5556");
  16. // 获取并处理消息
  17. while (1) {
  18. char *string = zstr_recv (subscriber);
  19. int64_t clock;
  20. int terms = sscanf (string, "%" PRId64, &clock);
  21. assert (terms == 1);
  22. free (string);
  23. // 自杀逻辑
  24. if (zclock_time () - clock > MAX_ALLOWED_DELAY) {
  25. fprintf (stderr, "E: 订阅者无法跟进, 取消中\n");
  26. break;
  27. }
  28. // 工作一定时间
  29. zclock_sleep (1 + randof (2));
  30. }
  31. zstr_send (pipe, "订阅者中止");
  32. }
  33. // ---------------------------------------------------------------------
  34. // 发布者每毫秒发送一条用时间戳标记的消息
  35. static void
  36. publisher (void *args, zctx_t *ctx, void *pipe)
  37. {
  38. // 准备发布者
  39. void *publisher = zsocket_new (ctx, ZMQ_PUB);
  40. zsocket_bind (publisher, "tcp://*:5556");
  41. while (1) {
  42. // 发送当前时间(毫秒)给订阅者
  43. char string [20];
  44. sprintf (string, "%" PRId64, zclock_time ());
  45. zstr_send (publisher, string);
  46. char *signal = zstr_recv_nowait (pipe);
  47. if (signal) {
  48. free (signal);
  49. break;
  50. }
  51. zclock_sleep (1); // 等待1毫秒
  52. }
  53. }
  54. // 下面的代码会启动一个订阅者和一个发布者,当订阅者死亡时停止运行
  55. //
  56. int main (void)
  57. {
  58. zctx_t *ctx = zctx_new ();
  59. void *pubpipe = zthread_fork (ctx, publisher, NULL);
  60. void *subpipe = zthread_fork (ctx, subscriber, NULL);
  61. free (zstr_recv (subpipe));
  62. zstr_send (pubpipe, "break");
  63. zclock_sleep (100);
  64. zctx_destroy (&ctx);
  65. return 0;
  66. }

几点说明:

  • 示例程序中的消息包含了系统当前的时间戳(毫秒)。在现实应用中,你应该使用时间戳作为消息头,并提供消息内容。
  • 示例程序中的发布者和订阅者是同一个进程的两个线程。在现实应用中,他们应该是两个不同的进程。示例中这么做只是为了演示的方便

高速订阅者(黑箱模式)

发布-订阅模式的一个典型应用场景是大规模分布式数据处理。如要处理从证券市场上收集到的数据,可以在证券交易系统上设置一个发布者,获取价格信息,并发送给一组订阅者。如果我们有很多订阅者,我们可以使用TCP。如果订阅者到达一定的量,那我们就应该使用可靠的广播协议,如pgm。

假设我们的发布者每秒产生10万条100个字节的消息。在剔除了不需要的市场信息后,这个比率还是比较合理的。现在我们需要记录一天的数据(8小时约有250GB),再将其传入一个模拟网络,即一组订阅者。虽然10万条数据对ZMQ来说很容易处理,但我们需要更高的速度。

假设我们有多台机器,一台做发布者,其他的做订阅者。这些机器都是8核的,发布者那台有12核。

在我们开始发布消息时,有两点需要注意:

  1. 即便只是处理很少的数据,订阅者仍有可能跟不上发布者的速度;
  2. 当处理到6M/s的数据量时,发布者和订阅者都有可能达到极限。

首先,我们需要将订阅者设计为一种多线程的处理程序,这样我们就能在一个线程中读取消息,使用其他线程来处理消息。一般来说,我们对每种消息的处理方式都是不同的。这样一来,订阅者可以对收到的消息进行一次过滤,如根据头信息来判别。当消息满足某些条件,订阅者会将消息交给worker处理。用ZMQ的语言来说,订阅者会将消息转发给worker来处理。

这样一来,订阅者看上去就像是一个队列装置,我们可以用各种方式去连接队列装置和worker。如我们建立单向的通信,每个worker都是相同的,可以使用PUSH和PULL套接字,分发的工作就交给ZMQ吧。这是最简单也是最快速的方式:

1

订阅者和发布者之间的通信使用TCP或PGM协议,订阅者和worker的通信由于是在同一个进程中完成的,所以使用inproc协议。

下面我们看看如何突破瓶颈。由于订阅者是单线程的,当它的CPU占用率达到100%时,它无法使用其他的核心。单线程程序总是会遇到瓶颈的,不管是2M、6M还是更多。我们需要将工作量分配到不同的线程中去,并发地执行。

很多高性能产品使用的方案是分片,就是将工作量拆分成独立并行的流。如,一半的专题数据由一个流媒体传输,另一半由另一个流媒体传输。我们可以建立更多的流媒体,但如果CPU核心数不变,那就没有必要了。
让我们看看如何将工作量分片为两个流:

2

要让两个流全速工作,需要这样配置ZMQ:

  • 使用两个I/O线程,而不是一个;
  • 使用两个独立的网络接口;
  • 每个I/O线程绑定至一个网络接口;
  • 两个订阅者线程,分别绑定至一个核心;
  • 使用两个SUB套接字;
  • 剩余的核心供worker使用;
  • worker线程同时绑定至两个订阅者线程的PUSH套接字。

创建的线程数量应和CPU核心数一致,如果我们建立的线程数量超过核心数,那其处理速度只会减少。另外,开放多个I/O线程也是没有必要的。

共享键值缓存(克隆模式)

发布-订阅模式和无线电广播有些类似,在你收听之前发送的消息你将无从得知,收到消息的多少又会取决于你的接收能力。让人吃惊的是,对于那些追求完美的工程师来说,这种机器恰恰符合他们的需求,且广为传播,成为现实生活中分发消息的最佳机制。想想非死不可、推特、BBS新闻、体育新闻等应用就知道了。

但是,在很多情形下,可靠的发布-订阅模式同样是有价值的。正如我们讨论请求-应答模式一样,我们会根据“故障”来定义“可靠性”,下面几项便是发布-订阅模式中可能发生的故障:

  • 订阅者连接太慢,因此没有收到发布者最初发送的消息;
  • 订阅者速度太慢,同样会丢失消息;
  • 订阅者可能会断开,其间的消息也会丢失。

还有一些情况我们碰到的比较少,但不是没有:

  • 订阅者崩溃、重启,从而丢失了所有已收到的消息;
  • 订阅者处理消息的速度过慢,导致消息在队列中堆砌并溢出;
  • 因网络过载而丢失消息(特别是PGM协议下的连接);
  • 网速过慢,消息在发布者处溢出,从而崩溃。

其实还会有其他出错的情况,只是以上这些在现实应用中是比较典型的。

我们已经有方法解决上面的某些问题了,比如对于慢速订阅者可以使用自杀的蜗牛模式。但是,对于其他的问题,我们最后能有一个可复用的框架来编写可靠的发布-订阅模式。

难点在于,我们并不知道目标应用程序会怎样处理这些数据。它们会进行过滤、只处理一部分消息吗?它们是否会将消息记录起来供日后使用?它们是否会将消息转发给其下的worker进行处理?需要考虑的情况实在太多了,每种情况都有其所谓的可靠性。

所以,我们将问题抽象出来,供多种应用程序使用。这种抽象应用我们称之为共享的键值缓存,它的功能是通过唯一的键名存储二进制数据块。

不要将这个抽象应用和分布式哈希表混淆起来,它是用来解决节点在分布式网络中相连接的问题的;也不要和分布式键值表混淆,它更像是一个NoSQL数据库。我们要建立的应用是将内存中的状态可靠地传递给一组客户端,它要做到的是:

  • 客户端可以随时加入网络,并获得服务端当前的状态;
  • 任何客户端都可以改变键值缓存(插入、更新、删除);
  • 将这种变化以最短的延迟可靠地传达给所有的客户端;
  • 能够处理大量的客户端,成百上千。

克隆模式的要点在于客户端会反过来和服务端进行通信,这在简单的发布-订阅模式中并不常见。所以我这里使用“服务端”、“客户端”而不是“发布者”、“订阅者”这两个词。我们会使用发布-订阅模式作为核心消息模式,不过还需要夹杂其他模式。

分发键值更新事件

我们会分阶段实施克隆模式。首先,我们看看如何从服务器发送键值更新事件给所有的客户端。我们将第一章中使用的天气服务模型进行改造,以键值对的方式发送信息,并让客户端使用哈希表来保存:

3

以下是服务端代码:

clonesrv1: Clone server, Model One in C

  1. //
  2. // 克隆模式服务端模型1
  3. //
  4. // 让我们直接编译,不生成类库
  5. #include "kvsimple.c"
  6. int main (void)
  7. {
  8. // 准备上下文和PUB套接字
  9. zctx_t *ctx = zctx_new ();
  10. void *publisher = zsocket_new (ctx, ZMQ_PUB);
  11. zsocket_bind (publisher, "tcp://*:5556");
  12. zclock_sleep (200);
  13. zhash_t *kvmap = zhash_new ();
  14. int64_t sequence = 0;
  15. srandom ((unsigned) time (NULL));
  16. while (!zctx_interrupted) {
  17. // 使用键值对分发消息
  18. kvmsg_t *kvmsg = kvmsg_new (++sequence);
  19. kvmsg_fmt_key (kvmsg, "%d", randof (10000));
  20. kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
  21. kvmsg_send (kvmsg, publisher);
  22. kvmsg_store (&kvmsg, kvmap);
  23. }
  24. printf (" 已中止\n已发送 %d 条消息\n", (int) sequence);
  25. zhash_destroy (&kvmap);
  26. zctx_destroy (&ctx);
  27. return 0;
  28. }

以下是客户端代码:

clonecli1: Clone client, Model One in C

  1. //
  2. // 克隆模式客户端模型1
  3. //
  4. // 让我们直接编译,不生成类库
  5. #include "kvsimple.c"
  6. int main (void)
  7. {
  8. // 准备上下文和SUB套接字
  9. zctx_t *ctx = zctx_new ();
  10. void *updates = zsocket_new (ctx, ZMQ_SUB);
  11. zsocket_connect (updates, "tcp://localhost:5556");
  12. zhash_t *kvmap = zhash_new ();
  13. int64_t sequence = 0;
  14. while (TRUE) {
  15. kvmsg_t *kvmsg = kvmsg_recv (updates);
  16. if (!kvmsg)
  17. break; // 中断
  18. kvmsg_store (&kvmsg, kvmap);
  19. sequence++;
  20. }
  21. printf (" 已中断\n收到 %d 条消息\n", (int) sequence);
  22. zhash_destroy (&kvmap);
  23. zctx_destroy (&ctx);
  24. return 0;
  25. }

几点说明:

  • 所有复杂的工作都在kvmsg类中完成了,这个类能够处理键值对类型的消息对象,其实质上是一个ZMQ多帧消息,共有三帧:键(ZMQ字符串)、编号(64位,按字节顺序排列)、二进制体(保存所有附加信息)。
  • 服务端随机生成消息,使用四位数作为键,这样可以模拟大量而不是过量的哈希表(1万个条目)。
  • 服务端绑定套接字后会等待200毫秒,以避免订阅者连接延迟而丢失数据的问题。我们会在后面的模型中解决这一点。
  • 我们使用“发布者”和“订阅者”来命名程序中使用的套接字,这样可以避免和后续模型中的其他套接字发生混淆。

以下是kvmsg的代码,已经经过了精简:
kvsimple: Key-value message class in C

  1. /* =====================================================================
  2. kvsimple - simple key-value message class for example applications
  3. ---------------------------------------------------------------------
  4. Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
  5. Copyright other contributors as noted in the AUTHORS file.
  6. This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
  7. This is free software; you can redistribute it and/or modify it under
  8. the terms of the GNU Lesser General Public License as published by
  9. the Free Software Foundation; either version 3 of the License, or (at
  10. your option) any later version.
  11. This software is distributed in the hope that it will be useful, but
  12. WITHOUT ANY WARRANTY; without even the implied warranty of
  13. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  14. Lesser General Public License for more details.
  15. You should have received a copy of the GNU Lesser General Public
  16. License along with this program. If not, see
  17. <http://www.gnu.org/licenses/>.
  18. =====================================================================
  19. */
  20. #include "kvsimple.h"
  21. #include "zlist.h"
  22. // 键是一个短字符串
  23. #define KVMSG_KEY_MAX 255
  24. // 消息被格式化成三帧
  25. // frame 0: 键(ZMQ字符串)
  26. // frame 1: 编号(8个字节,按顺序排列)
  27. // frame 2: 内容(二进制数据块)
  28. #define FRAME_KEY 0
  29. #define FRAME_SEQ 1
  30. #define FRAME_BODY 2
  31. #define KVMSG_FRAMES 3
  32. // 类结构
  33. struct _kvmsg {
  34. // 消息中某帧是否存在
  35. int present [KVMSG_FRAMES];
  36. // 对应的ZMQ消息帧
  37. zmq_msg_t frame [KVMSG_FRAMES];
  38. // 将键转换为C语言字符串
  39. char key [KVMSG_KEY_MAX + 1];
  40. };
  41. // ---------------------------------------------------------------------
  42. // 构造函数,设置编号
  43. kvmsg_t *
  44. kvmsg_new (int64_t sequence)
  45. {
  46. kvmsg_t
  47. *self;
  48. self = (kvmsg_t *) zmalloc (sizeof (kvmsg_t));
  49. kvmsg_set_sequence (self, sequence);
  50. return self;
  51. }
  52. // ---------------------------------------------------------------------
  53. // 析构函数
  54. // 释放消息中的帧,可供zhash_freefn()函数调用
  55. void
  56. kvmsg_free (void *ptr)
  57. {
  58. if (ptr) {
  59. kvmsg_t *self = (kvmsg_t *) ptr;
  60. // 销毁消息中的帧
  61. int frame_nbr;
  62. for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++)
  63. if (self->present [frame_nbr])
  64. zmq_msg_close (&self->frame [frame_nbr]);
  65. // 释放对象本身
  66. free (self);
  67. }
  68. }
  69. void
  70. kvmsg_destroy (kvmsg_t **self_p)
  71. {
  72. assert (self_p);
  73. if (*self_p) {
  74. kvmsg_free (*self_p);
  75. *self_p = NULL;
  76. }
  77. }
  78. // ---------------------------------------------------------------------
  79. // 从套接字中读取键值消息,返回kvmsg实例
  80. kvmsg_t *
  81. kvmsg_recv (void *socket)
  82. {
  83. assert (socket);
  84. kvmsg_t *self = kvmsg_new (0);
  85. // 读取所有帧,出错则销毁对象
  86. int frame_nbr;
  87. for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
  88. if (self->present [frame_nbr])
  89. zmq_msg_close (&self->frame [frame_nbr]);
  90. zmq_msg_init (&self->frame [frame_nbr]);
  91. self->present [frame_nbr] = 1;
  92. if (zmq_recvmsg (socket, &self->frame [frame_nbr], 0) == -1) {
  93. kvmsg_destroy (&self);
  94. break;
  95. }
  96. // 验证多帧消息
  97. int rcvmore = (frame_nbr < KVMSG_FRAMES - 1)? 1: 0;
  98. if (zsockopt_rcvmore (socket) != rcvmore) {
  99. kvmsg_destroy (&self);
  100. break;
  101. }
  102. }
  103. return self;
  104. }
  105. // ---------------------------------------------------------------------
  106. // 向套接字发送键值对消息,不检验消息帧的内容
  107. void
  108. kvmsg_send (kvmsg_t *self, void *socket)
  109. {
  110. assert (self);
  111. assert (socket);
  112. int frame_nbr;
  113. for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
  114. zmq_msg_t copy;
  115. zmq_msg_init (&copy);
  116. if (self->present [frame_nbr])
  117. zmq_msg_copy (&copy, &self->frame [frame_nbr]);
  118. zmq_sendmsg (socket, &copy,
  119. (frame_nbr < KVMSG_FRAMES - 1)? ZMQ_SNDMORE: 0);
  120. zmq_msg_close (&copy);
  121. }
  122. }
  123. // ---------------------------------------------------------------------
  124. // 从消息中获取键值,不存在则返回NULL
  125. char *
  126. kvmsg_key (kvmsg_t *self)
  127. {
  128. assert (self);
  129. if (self->present [FRAME_KEY]) {
  130. if (!*self->key) {
  131. size_t size = zmq_msg_size (&self->frame [FRAME_KEY]);
  132. if (size > KVMSG_KEY_MAX)
  133. size = KVMSG_KEY_MAX;
  134. memcpy (self->key,
  135. zmq_msg_data (&self->frame [FRAME_KEY]), size);
  136. self->key [size] = 0;
  137. }
  138. return self->key;
  139. }
  140. else
  141. return NULL;
  142. }
  143. // ---------------------------------------------------------------------
  144. // 返回消息的编号
  145. int64_t
  146. kvmsg_sequence (kvmsg_t *self)
  147. {
  148. assert (self);
  149. if (self->present [FRAME_SEQ]) {
  150. assert (zmq_msg_size (&self->frame [FRAME_SEQ]) == 8);
  151. byte *source = zmq_msg_data (&self->frame [FRAME_SEQ]);
  152. int64_t sequence = ((int64_t) (source [0]) << 56)
  153. + ((int64_t) (source [1]) << 48)
  154. + ((int64_t) (source [2]) << 40)
  155. + ((int64_t) (source [3]) << 32)
  156. + ((int64_t) (source [4]) << 24)
  157. + ((int64_t) (source [5]) << 16)
  158. + ((int64_t) (source [6]) << 8)
  159. + (int64_t) (source [7]);
  160. return sequence;
  161. }
  162. else
  163. return 0;
  164. }
  165. // ---------------------------------------------------------------------
  166. // 返回消息内容,不存在则返回NULL
  167. byte *
  168. kvmsg_body (kvmsg_t *self)
  169. {
  170. assert (self);
  171. if (self->present [FRAME_BODY])
  172. return (byte *) zmq_msg_data (&self->frame [FRAME_BODY]);
  173. else
  174. return NULL;
  175. }
  176. // ---------------------------------------------------------------------
  177. // 返回消息内容的大小
  178. size_t
  179. kvmsg_size (kvmsg_t *self)
  180. {
  181. assert (self);
  182. if (self->present [FRAME_BODY])
  183. return zmq_msg_size (&self->frame [FRAME_BODY]);
  184. else
  185. return 0;
  186. }
  187. // ---------------------------------------------------------------------
  188. // 设置消息的键
  189. void
  190. kvmsg_set_key (kvmsg_t *self, char *key)
  191. {
  192. assert (self);
  193. zmq_msg_t *msg = &self->frame [FRAME_KEY];
  194. if (self->present [FRAME_KEY])
  195. zmq_msg_close (msg);
  196. zmq_msg_init_size (msg, strlen (key));
  197. memcpy (zmq_msg_data (msg), key, strlen (key));
  198. self->present [FRAME_KEY] = 1;
  199. }
  200. // ---------------------------------------------------------------------
  201. // 设置消息的编号
  202. void
  203. kvmsg_set_sequence (kvmsg_t *self, int64_t sequence)
  204. {
  205. assert (self);
  206. zmq_msg_t *msg = &self->frame [FRAME_SEQ];
  207. if (self->present [FRAME_SEQ])
  208. zmq_msg_close (msg);
  209. zmq_msg_init_size (msg, 8);
  210. byte *source = zmq_msg_data (msg);
  211. source [0] = (byte) ((sequence >> 56) & 255);
  212. source [1] = (byte) ((sequence >> 48) & 255);
  213. source [2] = (byte) ((sequence >> 40) & 255);
  214. source [3] = (byte) ((sequence >> 32) & 255);
  215. source [4] = (byte) ((sequence >> 24) & 255);
  216. source [5] = (byte) ((sequence >> 16) & 255);
  217. source [6] = (byte) ((sequence >> 8) & 255);
  218. source [7] = (byte) ((sequence) & 255);
  219. self->present [FRAME_SEQ] = 1;
  220. }
  221. // ---------------------------------------------------------------------
  222. // 设置消息内容
  223. void
  224. kvmsg_set_body (kvmsg_t *self, byte *body, size_t size)
  225. {
  226. assert (self);
  227. zmq_msg_t *msg = &self->frame [FRAME_BODY];
  228. if (self->present [FRAME_BODY])
  229. zmq_msg_close (msg);
  230. self->present [FRAME_BODY] = 1;
  231. zmq_msg_init_size (msg, size);
  232. memcpy (zmq_msg_data (msg), body, size);
  233. }
  234. // ---------------------------------------------------------------------
  235. // 使用printf()格式设置消息键
  236. void
  237. kvmsg_fmt_key (kvmsg_t *self, char *format, ...)
  238. {
  239. char value [KVMSG_KEY_MAX + 1];
  240. va_list args;
  241. assert (self);
  242. va_start (args, format);
  243. vsnprintf (value, KVMSG_KEY_MAX, format, args);
  244. va_end (args);
  245. kvmsg_set_key (self, value);
  246. }
  247. // ---------------------------------------------------------------------
  248. // 使用springf()格式设置消息内容
  249. void
  250. kvmsg_fmt_body (kvmsg_t *self, char *format, ...)
  251. {
  252. char value [255 + 1];
  253. va_list args;
  254. assert (self);
  255. va_start (args, format);
  256. vsnprintf (value, 255, format, args);
  257. va_end (args);
  258. kvmsg_set_body (self, (byte *) value, strlen (value));
  259. }
  260. // ---------------------------------------------------------------------
  261. // 若kvmsg结构的键值均存在,则存入哈希表;
  262. // 如果kvmsg结构已没有引用,则自动销毁和释放。
  263. void
  264. kvmsg_store (kvmsg_t **self_p, zhash_t *hash)
  265. {
  266. assert (self_p);
  267. if (*self_p) {
  268. kvmsg_t *self = *self_p;
  269. assert (self);
  270. if (self->present [FRAME_KEY]
  271. && self->present [FRAME_BODY]) {
  272. zhash_update (hash, kvmsg_key (self), self);
  273. zhash_freefn (hash, kvmsg_key (self), kvmsg_free);
  274. }
  275. *self_p = NULL;
  276. }
  277. }
  278. // ---------------------------------------------------------------------
  279. // 将消息内容打印至标准错误输出,用以调试和跟踪
  280. void
  281. kvmsg_dump (kvmsg_t *self)
  282. {
  283. if (self) {
  284. if (!self) {
  285. fprintf (stderr, "NULL");
  286. return;
  287. }
  288. size_t size = kvmsg_size (self);
  289. byte *body = kvmsg_body (self);
  290. fprintf (stderr, "[seq:%" PRId64 "]", kvmsg_sequence (self));
  291. fprintf (stderr, "[key:%s]", kvmsg_key (self));
  292. fprintf (stderr, "[size:%zd] ", size);
  293. int char_nbr;
  294. for (char_nbr = 0; char_nbr < size; char_nbr++)
  295. fprintf (stderr, "%02X", body [char_nbr]);
  296. fprintf (stderr, "\n");
  297. }
  298. else
  299. fprintf (stderr, "NULL message\n");
  300. }
  301. // ---------------------------------------------------------------------
  302. // 测试用例
  303. int
  304. kvmsg_test (int verbose)
  305. {
  306. kvmsg_t
  307. *kvmsg;
  308. printf (" * kvmsg: ");
  309. // 准备上下文和套接字
  310. zctx_t *ctx = zctx_new ();
  311. void *output = zsocket_new (ctx, ZMQ_DEALER);
  312. int rc = zmq_bind (output, "ipc://kvmsg_selftest.ipc");
  313. assert (rc == 0);
  314. void *input = zsocket_new (ctx, ZMQ_DEALER);
  315. rc = zmq_connect (input, "ipc://kvmsg_selftest.ipc");
  316. assert (rc == 0);
  317. zhash_t *kvmap = zhash_new ();
  318. // 测试简单消息的发送和接受
  319. kvmsg = kvmsg_new (1);
  320. kvmsg_set_key (kvmsg, "key");
  321. kvmsg_set_body (kvmsg, (byte *) "body", 4);
  322. if (verbose)
  323. kvmsg_dump (kvmsg);
  324. kvmsg_send (kvmsg, output);
  325. kvmsg_store (&kvmsg, kvmap);
  326. kvmsg = kvmsg_recv (input);
  327. if (verbose)
  328. kvmsg_dump (kvmsg);
  329. assert (streq (kvmsg_key (kvmsg), "key"));
  330. kvmsg_store (&kvmsg, kvmap);
  331. // 关闭并销毁所有对象
  332. zhash_destroy (&kvmap);
  333. zctx_destroy (&ctx);
  334. printf ("OK\n");
  335. return 0;
  336. }

我们会在下文编写一个更为完整的kvmsg类,可以用到现实环境中。

客户端和服务端都会维护一个哈希表,但这个模型需要所有的客户端都比服务端启动得早,而且不能崩溃,这显然不能满足可靠性的要求。

创建快照

为了让后续连接的(或从故障中恢复的)客户端能够获取服务器上的状态信息,需要让它在连接时获取一份快照。正如我们将“消息”的概念简化为“已编号的键值对”,我们也可以将“状态”简化为“一个哈希表”。为获取服务端状态,客户端会打开一个REQ套接字进行请求:

4

我们需要考虑时间的问题,因为生成快照是需要一定时间的,我们需要知道应从哪个更新事件开始更新快照,服务端是不知道何时有更新事件的。一种方法是先开始订阅消息,收到第一个消息之后向服务端请求“将该条更新之前的所有内容发送给”。这样一来,服务器需要为每一次更新保存一份快照,这显然是不现实的。

所以,我们会在客户端用以下方式进行同步:

  • 客户端开始订阅服务器的更新事件,然后请求一份快照。这样就能保证这份快照是在上一次更新事件之后产生的。

  • 客户端开始等待服务器的快照,并将更新事件保存在队列中,做法很简单,不要从套接字中读取消息就可以了,ZMQ会自动将这些消息保存起来,这时不应设置阈值(HWM)。

  • 当客户端获取到快照后,它将再次开始读取更新事件,但是需要丢弃那些早于快照生成时间的事件。如快照生成时包含了200次更新,那客户端会从第201次更新开始读取。

  • 随后,客户端就会用更新事件去更新自身的状态了。

这是一个比较简单的模型,因为它用到了ZMQ消息队列的机制。服务端代码如下:

clonesrv2: Clone server, Model Two in C

  1. //
  2. // 克隆模式 - 服务端 - 模型2
  3. //
  4. // 让我们直接编译,不创建类库
  5. #include "kvsimple.c"
  6. static int s_send_single (char *key, void *data, void *args);
  7. static void state_manager (void *args, zctx_t *ctx, void *pipe);
  8. int main (void)
  9. {
  10. // 准备套接字和上下文
  11. zctx_t *ctx = zctx_new ();
  12. void *publisher = zsocket_new (ctx, ZMQ_PUB);
  13. zsocket_bind (publisher, "tcp://*:5557");
  14. int64_t sequence = 0;
  15. srandom ((unsigned) time (NULL));
  16. // 开启状态管理器,并等待同步信号
  17. void *updates = zthread_fork (ctx, state_manager, NULL);
  18. free (zstr_recv (updates));
  19. while (!zctx_interrupted) {
  20. // 分发键值消息
  21. kvmsg_t *kvmsg = kvmsg_new (++sequence);
  22. kvmsg_fmt_key (kvmsg, "%d", randof (10000));
  23. kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
  24. kvmsg_send (kvmsg, publisher);
  25. kvmsg_send (kvmsg, updates);
  26. kvmsg_destroy (&kvmsg);
  27. }
  28. printf (" 已中断\n已发送 %d 条消息\n", (int) sequence);
  29. zctx_destroy (&ctx);
  30. return 0;
  31. }
  32. // 快照请求方信息
  33. typedef struct {
  34. void *socket; // 用于发送快照的ROUTER套接字
  35. zframe_t *identity; // 请求方的标识
  36. } kvroute_t;
  37. // 发送快照中单个键值对
  38. // 使用kvmsg对象作为载体
  39. static int
  40. s_send_single (char *key, void *data, void *args)
  41. {
  42. kvroute_t *kvroute = (kvroute_t *) args;
  43. // 先发送接收方标识
  44. zframe_send (&kvroute->identity,
  45. kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
  46. kvmsg_t *kvmsg = (kvmsg_t *) data;
  47. kvmsg_send (kvmsg, kvroute->socket);
  48. return 0;
  49. }
  50. // 该线程维护服务端状态,并处理快照请求。
  51. //
  52. static void
  53. state_manager (void *args, zctx_t *ctx, void *pipe)
  54. {
  55. zhash_t *kvmap = zhash_new ();
  56. zstr_send (pipe, "READY");
  57. void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);
  58. zsocket_bind (snapshot, "tcp://*:5556");
  59. zmq_pollitem_t items [] = {
  60. { pipe, 0, ZMQ_POLLIN, 0 },
  61. { snapshot, 0, ZMQ_POLLIN, 0 }
  62. };
  63. int64_t sequence = 0; // 当前快照版本
  64. while (!zctx_interrupted) {
  65. int rc = zmq_poll (items, 2, -1);
  66. if (rc == -1 && errno == ETERM)
  67. break; // 上下文异常
  68. // 等待主线程的更新事件
  69. if (items [0].revents & ZMQ_POLLIN) {
  70. kvmsg_t *kvmsg = kvmsg_recv (pipe);
  71. if (!kvmsg)
  72. break; // 中断
  73. sequence = kvmsg_sequence (kvmsg);
  74. kvmsg_store (&kvmsg, kvmap);
  75. }
  76. // 执行快照请求
  77. if (items [1].revents & ZMQ_POLLIN) {
  78. zframe_t *identity = zframe_recv (snapshot);
  79. if (!identity)
  80. break; // 中断
  81. // 请求内容在第二帧中
  82. char *request = zstr_recv (snapshot);
  83. if (streq (request, "ICANHAZ?"))
  84. free (request);
  85. else {
  86. printf ("E: 错误的请求,程序中止\n");
  87. break;
  88. }
  89. // 发送快照给客户端
  90. kvroute_t routing = { snapshot, identity };
  91. // 逐项发送
  92. zhash_foreach (kvmap, s_send_single, &routing);
  93. // 发送结束标识,内含快照版本号
  94. printf ("正在发送快照,版本号 %d\n", (int) sequence);
  95. zframe_send (&identity, snapshot, ZFRAME_MORE);
  96. kvmsg_t *kvmsg = kvmsg_new (sequence);
  97. kvmsg_set_key (kvmsg, "KTHXBAI");
  98. kvmsg_set_body (kvmsg, (byte *) "", 0);
  99. kvmsg_send (kvmsg, snapshot);
  100. kvmsg_destroy (&kvmsg);
  101. }
  102. }
  103. zhash_destroy (&kvmap);
  104. }

以下是客户端代码:

clonecli2: Clone client, Model Two in C

  1. //
  2. // 克隆模式 - 客户端 - 模型2
  3. //
  4. // 让我们直接编译,不生成类库
  5. #include "kvsimple.c"
  6. int main (void)
  7. {
  8. // 准备上下文和SUB套接字
  9. zctx_t *ctx = zctx_new ();
  10. void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
  11. zsocket_connect (snapshot, "tcp://localhost:5556");
  12. void *subscriber = zsocket_new (ctx, ZMQ_SUB);
  13. zsocket_connect (subscriber, "tcp://localhost:5557");
  14. zhash_t *kvmap = zhash_new ();
  15. // 获取快照
  16. int64_t sequence = 0;
  17. zstr_send (snapshot, "ICANHAZ?");
  18. while (TRUE) {
  19. kvmsg_t *kvmsg = kvmsg_recv (snapshot);
  20. if (!kvmsg)
  21. break; // 中断
  22. if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
  23. sequence = kvmsg_sequence (kvmsg);
  24. printf ("已获取快照,版本号=%d\n", (int) sequence);
  25. kvmsg_destroy (&kvmsg);
  26. break; // 完成
  27. }
  28. kvmsg_store (&kvmsg, kvmap);
  29. }
  30. // 应用队列中的更新事件,丢弃过时事件
  31. while (!zctx_interrupted) {
  32. kvmsg_t *kvmsg = kvmsg_recv (subscriber);
  33. if (!kvmsg)
  34. break; // 中断
  35. if (kvmsg_sequence (kvmsg) > sequence) {
  36. sequence = kvmsg_sequence (kvmsg);
  37. kvmsg_store (&kvmsg, kvmap);
  38. }
  39. else
  40. kvmsg_destroy (&kvmsg);
  41. }
  42. zhash_destroy (&kvmap);
  43. zctx_destroy (&ctx);
  44. return 0;
  45. }

几点说明:

  • 客户端使用两个线程,一个用来生成随机的更新事件,另一个用来管理状态。两者之间使用PAIR套接字通信。可能你会考虑使用SUB套接字,但是“慢连接”的问题会影响到程序运行。PAIR套接字会让两个线程严格同步的。

  • 我们在updates套接字上设置了阈值(HWM),避免更新服务内存溢出。在inproc协议的连接中,阈值是两端套接字阈值的加和,所以要分别设置。

  • 客户端比较简单,用C语言编写,大约60行代码。大多数工作都在kvmsg类中完成了,不过总的来说,克隆模式实现起来还是比较简单的。

  • 我们没有用特别的方式来序列化状态内容。键值对用kvmsg对象表示,保存在一个哈希表中。在不同的时间请求状态时会得到不同的快照。

  • 我们假设客户端只和一个服务进行通信,而且服务必须是正常运行的。我们暂不考虑如何从服务崩溃的情形中恢复过来。

现在,这两段程序都还没有真正地工作起来,但已经能够正确地同步状态了。这是一个多种消息模式的混合体:进程内的PAIR、发布-订阅、ROUTER-DEALER等。

重发键值更新事件

第二个模型中,键值更新事件都来自于服务器,构成了一个中心化的模型。但是我们需要的是一个能够在客户端进行更新的缓存,并能同步到其他客户端中。这时,服务端只是一个无状态的中间件,带来的好处有:

  • 我们不用太过关心服务端的可靠性,因为即使它崩溃了,我们仍能从客户端中获取完整的数据。
  • 我们可以使用键值缓存在动态节点之间分享数据。

客户端的键值更新事件会通过PUSH-PULL套接字传达给服务端:

5

我们为什么不让客户端直接将更新信息发送给其他客户端呢?虽然这样做可以减少延迟,但是就无法为更新事件添加自增的唯一编号了。很多应用程序都需要更新事件以某种方式排序,只有将消息发给服务端,由服务端分发更新消息,才能保证更新事件的顺序。

有了唯一的编号后,客户端还能检测到更多的故障:网络堵塞或队列溢出。如果客户端发现消息输入流有一段空白,它能采取措施。可能你会觉得此时让客户端通知服务端,让它重新发送丢失的信息,可以解决问题。但事实上没有必要这么做。消息流的空挡表示网络状况不好,如果再进行这样的请求,只会让事情变得更糟。所以一般的做法是由客户端发出警告,并停止运行,等到有专人来维护后再继续工作。
我们开始创建在客户端进行状态更新的模型。以下是客户端代码:

clonesrv3: Clone server, Model Three in C

  1. //
  2. // 克隆模式 服务端 模型3
  3. //
  4. // 直接编译,不创建类库
  5. #include "kvsimple.c"
  6. static int s_send_single (char *key, void *data, void *args);
  7. // 快照请求方信息
  8. typedef struct {
  9. void *socket; // ROUTER套接字
  10. zframe_t *identity; // 请求方标识
  11. } kvroute_t;
  12. int main (void)
  13. {
  14. // 准备上下文和套接字
  15. zctx_t *ctx = zctx_new ();
  16. void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);
  17. zsocket_bind (snapshot, "tcp://*:5556");
  18. void *publisher = zsocket_new (ctx, ZMQ_PUB);
  19. zsocket_bind (publisher, "tcp://*:5557");
  20. void *collector = zsocket_new (ctx, ZMQ_PULL);
  21. zsocket_bind (collector, "tcp://*:5558");
  22. int64_t sequence = 0;
  23. zhash_t *kvmap = zhash_new ();
  24. zmq_pollitem_t items [] = {
  25. { collector, 0, ZMQ_POLLIN, 0 },
  26. { snapshot, 0, ZMQ_POLLIN, 0 }
  27. };
  28. while (!zctx_interrupted) {
  29. int rc = zmq_poll (items, 2, 1000 * ZMQ_POLL_MSEC);
  30. // 执行来自客户端的更新事件
  31. if (items [0].revents & ZMQ_POLLIN) {
  32. kvmsg_t *kvmsg = kvmsg_recv (collector);
  33. if (!kvmsg)
  34. break; // 中断
  35. kvmsg_set_sequence (kvmsg, ++sequence);
  36. kvmsg_send (kvmsg, publisher);
  37. kvmsg_store (&kvmsg, kvmap);
  38. printf ("I: 发布更新事件 %5d\n", (int) sequence);
  39. }
  40. // 响应快照请求
  41. if (items [1].revents & ZMQ_POLLIN) {
  42. zframe_t *identity = zframe_recv (snapshot);
  43. if (!identity)
  44. break; // 中断
  45. // 请求内容在消息的第二帧中
  46. char *request = zstr_recv (snapshot);
  47. if (streq (request, "ICANHAZ?"))
  48. free (request);
  49. else {
  50. printf ("E: 错误的请求,程序中止\n");
  51. break;
  52. }
  53. // 发送快照
  54. kvroute_t routing = { snapshot, identity };
  55. // 逐条发送
  56. zhash_foreach (kvmap, s_send_single, &routing);
  57. // 发送结束标识和编号
  58. printf ("I: 正在发送快照,版本号:%d\n", (int) sequence);
  59. zframe_send (&identity, snapshot, ZFRAME_MORE);
  60. kvmsg_t *kvmsg = kvmsg_new (sequence);
  61. kvmsg_set_key (kvmsg, "KTHXBAI");
  62. kvmsg_set_body (kvmsg, (byte *) "", 0);
  63. kvmsg_send (kvmsg, snapshot);
  64. kvmsg_destroy (&kvmsg);
  65. }
  66. }
  67. printf (" 已中断\n已处理 %d 条消息\n", (int) sequence);
  68. zhash_destroy (&kvmap);
  69. zctx_destroy (&ctx);
  70. return 0;
  71. }
  72. // 发送一条键值对状态给套接字,使用kvmsg对象保存键值对
  73. static int
  74. s_send_single (char *key, void *data, void *args)
  75. {
  76. kvroute_t *kvroute = (kvroute_t *) args;
  77. // Send identity of recipient first
  78. zframe_send (&kvroute->identity,
  79. kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
  80. kvmsg_t *kvmsg = (kvmsg_t *) data;
  81. kvmsg_send (kvmsg, kvroute->socket);
  82. return 0;
  83. }

以下是客户端代码:

clonecli3: Clone client, Model Three in C

  1. //
  2. // 克隆模式 - 客户端 - 模型3
  3. //
  4. // 直接编译,不创建类库
  5. #include "kvsimple.c"
  6. int main (void)
  7. {
  8. // 准备上下文和SUB套接字
  9. zctx_t *ctx = zctx_new ();
  10. void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
  11. zsocket_connect (snapshot, "tcp://localhost:5556");
  12. void *subscriber = zsocket_new (ctx, ZMQ_SUB);
  13. zsocket_connect (subscriber, "tcp://localhost:5557");
  14. void *publisher = zsocket_new (ctx, ZMQ_PUSH);
  15. zsocket_connect (publisher, "tcp://localhost:5558");
  16. zhash_t *kvmap = zhash_new ();
  17. srandom ((unsigned) time (NULL));
  18. // 获取状态快照
  19. int64_t sequence = 0;
  20. zstr_send (snapshot, "ICANHAZ?");
  21. while (TRUE) {
  22. kvmsg_t *kvmsg = kvmsg_recv (snapshot);
  23. if (!kvmsg)
  24. break; // 中断
  25. if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
  26. sequence = kvmsg_sequence (kvmsg);
  27. printf ("I: 已收到快照,版本号:%d\n", (int) sequence);
  28. kvmsg_destroy (&kvmsg);
  29. break; // 完成
  30. }
  31. kvmsg_store (&kvmsg, kvmap);
  32. }
  33. int64_t alarm = zclock_time () + 1000;
  34. while (!zctx_interrupted) {
  35. zmq_pollitem_t items [] = { { subscriber, 0, ZMQ_POLLIN, 0 } };
  36. int tickless = (int) ((alarm - zclock_time ()));
  37. if (tickless < 0)
  38. tickless = 0;
  39. int rc = zmq_poll (items, 1, tickless * ZMQ_POLL_MSEC);
  40. if (rc == -1)
  41. break; // 上下文被关闭
  42. if (items [0].revents & ZMQ_POLLIN) {
  43. kvmsg_t *kvmsg = kvmsg_recv (subscriber);
  44. if (!kvmsg)
  45. break; // 中断
  46. // 丢弃过时消息,包括心跳
  47. if (kvmsg_sequence (kvmsg) > sequence) {
  48. sequence = kvmsg_sequence (kvmsg);
  49. kvmsg_store (&kvmsg, kvmap);
  50. printf ("I: 收到更新事件:%d\n", (int) sequence);
  51. }
  52. else
  53. kvmsg_destroy (&kvmsg);
  54. }
  55. // 创建一个随机的更新事件
  56. if (zclock_time () >= alarm) {
  57. kvmsg_t *kvmsg = kvmsg_new (0);
  58. kvmsg_fmt_key (kvmsg, "%d", randof (10000));
  59. kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
  60. kvmsg_send (kvmsg, publisher);
  61. kvmsg_destroy (&kvmsg);
  62. alarm = zclock_time () + 1000;
  63. }
  64. }
  65. printf (" 已准备\n收到 %d 条消息\n", (int) sequence);
  66. zhash_destroy (&kvmap);
  67. zctx_destroy (&ctx);
  68. return 0;
  69. }

几点说明:

  • 服务端整合为一个线程,负责收集来自客户端的更新事件并转发给其他客户端。它使用PULL套接字获取更新事件,ROUTER套接字处理快照请求,以及PUB套接字发布更新事件。

  • 客户端会每隔1秒左右发送随机的更新事件给服务端,现实中这一动作由应用程序触发。

子树克隆

现实中的键值缓存会越变越多,而客户端可能只会需要部分缓存。我们可以使用子树的方式来实现:客户端在发送快照请求时告诉服务端它需要的子树,在订阅更新事件时也指明子树。

关于子树的语法有很多,一种是“分层路径”结构,另一种是“主题树”:

  • 分层路径:/some/list/of/paths
    • 主题树:some.list.of.topics

这里我们会使用分层路径结构,以此扩展服务端和客户端,进行子树操作。维护多个子树其实并不太困难,因此我们不在这里演示。

下面是服务端代码,由模型3衍化而来:

clonesrv4: Clone server, Model Four in C

  1. //
  2. // 克隆模式 服务端 模型4
  3. //
  4. // 直接编译,不创建类库
  5. #include "kvsimple.c"
  6. static int s_send_single (char *key, void *data, void *args);
  7. // 快照请求方信息
  8. typedef struct {
  9. void *socket; // ROUTER套接字
  10. zframe_t *identity; // 请求方标识
  11. char *subtree; // 指定的子树
  12. } kvroute_t;
  13. int main (void)
  14. {
  15. // 准备上下文和套接字
  16. zctx_t *ctx = zctx_new ();
  17. void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);
  18. zsocket_bind (snapshot, "tcp://*:5556");
  19. void *publisher = zsocket_new (ctx, ZMQ_PUB);
  20. zsocket_bind (publisher, "tcp://*:5557");
  21. void *collector = zsocket_new (ctx, ZMQ_PULL);
  22. zsocket_bind (collector, "tcp://*:5558");
  23. int64_t sequence = 0;
  24. zhash_t *kvmap = zhash_new ();
  25. zmq_pollitem_t items [] = {
  26. { collector, 0, ZMQ_POLLIN, 0 },
  27. { snapshot, 0, ZMQ_POLLIN, 0 }
  28. };
  29. while (!zctx_interrupted) {
  30. int rc = zmq_poll (items, 2, 1000 * ZMQ_POLL_MSEC);
  31. // 执行来自客户端的更新事件
  32. if (items [0].revents & ZMQ_POLLIN) {
  33. kvmsg_t *kvmsg = kvmsg_recv (collector);
  34. if (!kvmsg)
  35. break; // Interrupted
  36. kvmsg_set_sequence (kvmsg, ++sequence);
  37. kvmsg_send (kvmsg, publisher);
  38. kvmsg_store (&kvmsg, kvmap);
  39. printf ("I: 发布更新事件 %5d\n", (int) sequence);
  40. }
  41. // 响应快照请求
  42. if (items [1].revents & ZMQ_POLLIN) {
  43. zframe_t *identity = zframe_recv (snapshot);
  44. if (!identity)
  45. break; // Interrupted
  46. // 请求内容在消息的第二帧中
  47. char *request = zstr_recv (snapshot);
  48. char *subtree = NULL;
  49. if (streq (request, "ICANHAZ?")) {
  50. free (request);
  51. subtree = zstr_recv (snapshot);
  52. }
  53. else {
  54. printf ("E: 错误的请求,程序中止\n");
  55. break;
  56. }
  57. // 发送快照
  58. kvroute_t routing = { snapshot, identity, subtree };
  59. // 逐条发送
  60. zhash_foreach (kvmap, s_send_single, &routing);
  61. // 发送结束标识和编号
  62. printf ("I: 正在发送快照,版本号:%d\n", (int) sequence);
  63. zframe_send (&identity, snapshot, ZFRAME_MORE);
  64. kvmsg_t *kvmsg = kvmsg_new (sequence);
  65. kvmsg_set_key (kvmsg, "KTHXBAI");
  66. kvmsg_set_body (kvmsg, (byte *) subtree, 0);
  67. kvmsg_send (kvmsg, snapshot);
  68. kvmsg_destroy (&kvmsg);
  69. free (subtree);
  70. }
  71. }
  72. printf (" 已中断\n已处理 %d 条消息\n", (int) sequence);
  73. zhash_destroy (&kvmap);
  74. zctx_destroy (&ctx);
  75. return 0;
  76. }
  77. // 发送一条键值对状态给套接字,使用kvmsg对象保存键值对
  78. static int
  79. s_send_single (char *key, void *data, void *args)
  80. {
  81. kvroute_t *kvroute = (kvroute_t *) args;
  82. kvmsg_t *kvmsg = (kvmsg_t *) data;
  83. if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
  84. && memcmp (kvroute->subtree,
  85. kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
  86. // 先发送接收方的标识
  87. zframe_send (&kvroute->identity,
  88. kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
  89. kvmsg_send (kvmsg, kvroute->socket);
  90. }
  91. return 0;
  92. }

下面是客户端代码:

clonecli4: Clone client, Model Four in C

  1. //
  2. // 克隆模式 - 客户端 - 模型4
  3. //
  4. // 直接编译,不创建类库
  5. #include "kvsimple.c"
  6. #define SUBTREE "/client/"
  7. int main (void)
  8. {
  9. // 准备上下文和SUB套接字
  10. zctx_t *ctx = zctx_new ();
  11. void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
  12. zsocket_connect (snapshot, "tcp://localhost:5556");
  13. void *subscriber = zsocket_new (ctx, ZMQ_SUB);
  14. zsocket_connect (subscriber, "tcp://localhost:5557");
  15. zsockopt_set_subscribe (subscriber, SUBTREE);
  16. void *publisher = zsocket_new (ctx, ZMQ_PUSH);
  17. zsocket_connect (publisher, "tcp://localhost:5558");
  18. zhash_t *kvmap = zhash_new ();
  19. srandom ((unsigned) time (NULL));
  20. // 获取状态快照
  21. int64_t sequence = 0;
  22. zstr_sendm (snapshot, "ICANHAZ?");
  23. zstr_send (snapshot, SUBTREE);
  24. while (TRUE) {
  25. kvmsg_t *kvmsg = kvmsg_recv (snapshot);
  26. if (!kvmsg)
  27. break; // Interrupted
  28. if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
  29. sequence = kvmsg_sequence (kvmsg);
  30. printf ("I: 已收到快照,版本号:%d\n", (int) sequence);
  31. kvmsg_destroy (&kvmsg);
  32. break; // Done
  33. }
  34. kvmsg_store (&kvmsg, kvmap);
  35. }
  36. int64_t alarm = zclock_time () + 1000;
  37. while (!zctx_interrupted) {
  38. zmq_pollitem_t items [] = { { subscriber, 0, ZMQ_POLLIN, 0 } };
  39. int tickless = (int) ((alarm - zclock_time ()));
  40. if (tickless < 0)
  41. tickless = 0;
  42. int rc = zmq_poll (items, 1, tickless * ZMQ_POLL_MSEC);
  43. if (rc == -1)
  44. break; // 上下文被关闭
  45. if (items [0].revents & ZMQ_POLLIN) {
  46. kvmsg_t *kvmsg = kvmsg_recv (subscriber);
  47. if (!kvmsg)
  48. break; // 中断
  49. // 丢弃过时消息,包括心跳
  50. if (kvmsg_sequence (kvmsg) > sequence) {
  51. sequence = kvmsg_sequence (kvmsg);
  52. kvmsg_store (&kvmsg, kvmap);
  53. printf ("I: 收到更新事件:%d\n", (int) sequence);
  54. }
  55. else
  56. kvmsg_destroy (&kvmsg);
  57. }
  58. // 创建一个随机的更新事件
  59. if (zclock_time () >= alarm) {
  60. kvmsg_t *kvmsg = kvmsg_new (0);
  61. kvmsg_fmt_key (kvmsg, "%s%d", SUBTREE, randof (10000));
  62. kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
  63. kvmsg_send (kvmsg, publisher);
  64. kvmsg_destroy (&kvmsg);
  65. alarm = zclock_time () + 1000;
  66. }
  67. }
  68. printf (" 已准备\n收到 %d 条消息\n", (int) sequence);
  69. zhash_destroy (&kvmap);
  70. zctx_destroy (&ctx);
  71. return 0;
  72. }

瞬间值

瞬间值指的是那些会立刻过期的值。如果你用克隆模式搭建一个类似DNS的服务时,就可以用瞬间值来模拟动态DNS解析了。当节点连接网络,对外发布它的地址,并不断地更新地址。如果节点断开连接,则它的地址也会失效。

瞬间值可以和会话(session)联系起来,当会话结束时,瞬间值也就失效了。克隆模式中,会话是由客户端定义的,并会在客户端断开连接时消亡。

更简单的方法是为每一个瞬间值设定一个过期时间,客户端会不断延长这个时间,当断开连接时这个时间将得不到更新,服务器就会自动将其删除。

我们会用这种简单的方法来实现瞬间值,因为太过复杂的方法可能不值当,它们的差别仅在性能上体现。如果客户端有很多瞬间值,那为每个值设定过期时间是恰当的;如果瞬间值到达一定的量,那最好还是将其和会话相关联,统一进行过期处理。

首先,我们需要设法在键值对消息中加入过期时间。我们可以增加一个消息帧,但这样一来每当我们需要增加消息内容时就需要修改kvmsg类库了,这并不合适。所以,我们一次性增加一个“属性”消息帧,用于添加不同的消息属性。

其次,我们需要设法删除这条数据。目前为止服务端和客户端会盲目地增改哈希表中的数据,我们可以这样定义:当消息的值是空的,则表示删除这个键的数据。

下面是一个更为完整的kvmsg类代码,它实现了“属性”帧,以及一个UUID帧,我们后面会用到。该类还会负责处理值为空的消息,达到删除的目的:

kvmsg: Key-value message class - full in C

  1. /* =====================================================================
  2. kvmsg - key-value message class for example applications
  3. ---------------------------------------------------------------------
  4. Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
  5. Copyright other contributors as noted in the AUTHORS file.
  6. This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
  7. This is free software; you can redistribute it and/or modify it under
  8. the terms of the GNU Lesser General Public License as published by
  9. the Free Software Foundation; either version 3 of the License, or (at
  10. your option) any later version.
  11. This software is distributed in the hope that it will be useful, but
  12. WITHOUT ANY WARRANTY; without even the implied warranty of
  13. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  14. Lesser General Public License for more details.
  15. You should have received a copy of the GNU Lesser General Public
  16. License along with this program. If not, see
  17. <http://www.gnu.org/licenses/>.
  18. =====================================================================
  19. */
  20. #include "kvmsg.h"
  21. #include <uuid/uuid.h>
  22. #include "zlist.h"
  23. // 键是短字符串
  24. #define KVMSG_KEY_MAX 255
  25. // 消息包含五帧
  26. // frame 0: 键(ZMQ字符串)
  27. // frame 1: 编号(8个字节,按顺序排列)
  28. // frame 2: UUID(二进制块,16个字节)
  29. // frame 3: 属性(ZMQ字符串)
  30. // frame 4: 值(二进制块)
  31. #define FRAME_KEY 0
  32. #define FRAME_SEQ 1
  33. #define FRAME_UUID 2
  34. #define FRAME_PROPS 3
  35. #define FRAME_BODY 4
  36. #define KVMSG_FRAMES 5
  37. // 类结构
  38. struct _kvmsg {
  39. // 帧是否存在
  40. int present [KVMSG_FRAMES];
  41. // 对应消息帧
  42. zmq_msg_t frame [KVMSG_FRAMES];
  43. // 键,C语言字符串格式
  44. char key [KVMSG_KEY_MAX + 1];
  45. // 属性列表,key=value形式
  46. zlist_t *props;
  47. size_t props_size;
  48. };
  49. // 将属性列表序列化为字符串
  50. static void
  51. s_encode_props (kvmsg_t *self)
  52. {
  53. zmq_msg_t *msg = &self->frame [FRAME_PROPS];
  54. if (self->present [FRAME_PROPS])
  55. zmq_msg_close (msg);
  56. zmq_msg_init_size (msg, self->props_size);
  57. char *prop = zlist_first (self->props);
  58. char *dest = (char *) zmq_msg_data (msg);
  59. while (prop) {
  60. strcpy (dest, prop);
  61. dest += strlen (prop);
  62. *dest++ = '\n';
  63. prop = zlist_next (self->props);
  64. }
  65. self->present [FRAME_PROPS] = 1;
  66. }
  67. // 从字符串中解析属性列表
  68. static void
  69. s_decode_props (kvmsg_t *self)
  70. {
  71. zmq_msg_t *msg = &self->frame [FRAME_PROPS];
  72. self->props_size = 0;
  73. while (zlist_size (self->props))
  74. free (zlist_pop (self->props));
  75. size_t remainder = zmq_msg_size (msg);
  76. char *prop = (char *) zmq_msg_data (msg);
  77. char *eoln = memchr (prop, '\n', remainder);
  78. while (eoln) {
  79. *eoln = 0;
  80. zlist_append (self->props, strdup (prop));
  81. self->props_size += strlen (prop) + 1;
  82. remainder -= strlen (prop) + 1;
  83. prop = eoln + 1;
  84. eoln = memchr (prop, '\n', remainder);
  85. }
  86. }
  87. // ---------------------------------------------------------------------
  88. // 构造函数,指定消息编号
  89. kvmsg_t *
  90. kvmsg_new (int64_t sequence)
  91. {
  92. kvmsg_t
  93. *self;
  94. self = (kvmsg_t *) zmalloc (sizeof (kvmsg_t));
  95. self->props = zlist_new ();
  96. kvmsg_set_sequence (self, sequence);
  97. return self;
  98. }
  99. // ---------------------------------------------------------------------
  100. // 析构函数
  101. // 释放内存函数,供zhash_free_fn()调用
  102. void
  103. kvmsg_free (void *ptr)
  104. {
  105. if (ptr) {
  106. kvmsg_t *self = (kvmsg_t *) ptr;
  107. // 释放所有消息帧
  108. int frame_nbr;
  109. for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++)
  110. if (self->present [frame_nbr])
  111. zmq_msg_close (&self->frame [frame_nbr]);
  112. // 释放属性列表
  113. while (zlist_size (self->props))
  114. free (zlist_pop (self->props));
  115. zlist_destroy (&self->props);
  116. // 释放对象本身
  117. free (self);
  118. }
  119. }
  120. void
  121. kvmsg_destroy (kvmsg_t **self_p)
  122. {
  123. assert (self_p);
  124. if (*self_p) {
  125. kvmsg_free (*self_p);
  126. *self_p = NULL;
  127. }
  128. }
  129. // ---------------------------------------------------------------------
  130. // 复制kvmsg对象
  131. kvmsg_t *
  132. kvmsg_dup (kvmsg_t *self)
  133. {
  134. kvmsg_t *kvmsg = kvmsg_new (0);
  135. int frame_nbr;
  136. for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
  137. if (self->present [frame_nbr]) {
  138. zmq_msg_t *src = &self->frame [frame_nbr];
  139. zmq_msg_t *dst = &kvmsg->frame [frame_nbr];
  140. zmq_msg_init_size (dst, zmq_msg_size (src));
  141. memcpy (zmq_msg_data (dst),
  142. zmq_msg_data (src), zmq_msg_size (src));
  143. kvmsg->present [frame_nbr] = 1;
  144. }
  145. }
  146. kvmsg->props = zlist_copy (self->props);
  147. return kvmsg;
  148. }
  149. // ---------------------------------------------------------------------
  150. // 从套接字总读取键值对,返回kvmsg实例
  151. kvmsg_t *
  152. kvmsg_recv (void *socket)
  153. {
  154. assert (socket);
  155. kvmsg_t *self = kvmsg_new (0);
  156. // 读取所有帧,若有异常则直接返回空
  157. int frame_nbr;
  158. for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
  159. if (self->present [frame_nbr])
  160. zmq_msg_close (&self->frame [frame_nbr]);
  161. zmq_msg_init (&self->frame [frame_nbr]);
  162. self->present [frame_nbr] = 1;
  163. if (zmq_recvmsg (socket, &self->frame [frame_nbr], 0) == -1) {
  164. kvmsg_destroy (&self);
  165. break;
  166. }
  167. // 验证多帧消息
  168. int rcvmore = (frame_nbr < KVMSG_FRAMES - 1)? 1: 0;
  169. if (zsockopt_rcvmore (socket) != rcvmore) {
  170. kvmsg_destroy (&self);
  171. break;
  172. }
  173. }
  174. if (self)
  175. s_decode_props (self);
  176. return self;
  177. }
  178. // ---------------------------------------------------------------------
  179. // 向套接字发送键值对消息,空消息也发送
  180. void
  181. kvmsg_send (kvmsg_t *self, void *socket)
  182. {
  183. assert (self);
  184. assert (socket);
  185. s_encode_props (self);
  186. int frame_nbr;
  187. for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
  188. zmq_msg_t copy;
  189. zmq_msg_init (&copy);
  190. if (self->present [frame_nbr])
  191. zmq_msg_copy (&copy, &self->frame [frame_nbr]);
  192. zmq_sendmsg (socket, &copy,
  193. (frame_nbr < KVMSG_FRAMES - 1)? ZMQ_SNDMORE: 0);
  194. zmq_msg_close (&copy);
  195. }
  196. }
  197. // ---------------------------------------------------------------------
  198. // 返回消息的键
  199. char *
  200. kvmsg_key (kvmsg_t *self)
  201. {
  202. assert (self);
  203. if (self->present [FRAME_KEY]) {
  204. if (!*self->key) {
  205. size_t size = zmq_msg_size (&self->frame [FRAME_KEY]);
  206. if (size > KVMSG_KEY_MAX)
  207. size = KVMSG_KEY_MAX;
  208. memcpy (self->key,
  209. zmq_msg_data (&self->frame [FRAME_KEY]), size);
  210. self->key [size] = 0;
  211. }
  212. return self->key;
  213. }
  214. else
  215. return NULL;
  216. }
  217. // ---------------------------------------------------------------------
  218. // 返回消息的编号
  219. int64_t
  220. kvmsg_sequence (kvmsg_t *self)
  221. {
  222. assert (self);
  223. if (self->present [FRAME_SEQ]) {
  224. assert (zmq_msg_size (&self->frame [FRAME_SEQ]) == 8);
  225. byte *source = zmq_msg_data (&self->frame [FRAME_SEQ]);
  226. int64_t sequence = ((int64_t) (source [0]) << 56)
  227. + ((int64_t) (source [1]) << 48)
  228. + ((int64_t) (source [2]) << 40)
  229. + ((int64_t) (source [3]) << 32)
  230. + ((int64_t) (source [4]) << 24)
  231. + ((int64_t) (source [5]) << 16)
  232. + ((int64_t) (source [6]) << 8)
  233. + (int64_t) (source [7]);
  234. return sequence;
  235. }
  236. else
  237. return 0;
  238. }
  239. // ---------------------------------------------------------------------
  240. // 返回消息的UUID
  241. byte *
  242. kvmsg_uuid (kvmsg_t *self)
  243. {
  244. assert (self);
  245. if (self->present [FRAME_UUID]
  246. && zmq_msg_size (&self->frame [FRAME_UUID]) == sizeof (uuid_t))
  247. return (byte *) zmq_msg_data (&self->frame [FRAME_UUID]);
  248. else
  249. return NULL;
  250. }
  251. // ---------------------------------------------------------------------
  252. // 返回消息的内容
  253. byte *
  254. kvmsg_body (kvmsg_t *self)
  255. {
  256. assert (self);
  257. if (self->present [FRAME_BODY])
  258. return (byte *) zmq_msg_data (&self->frame [FRAME_BODY]);
  259. else
  260. return NULL;
  261. }
  262. // ---------------------------------------------------------------------
  263. // 返回消息内容的长度
  264. size_t
  265. kvmsg_size (kvmsg_t *self)
  266. {
  267. assert (self);
  268. if (self->present [FRAME_BODY])
  269. return zmq_msg_size (&self->frame [FRAME_BODY]);
  270. else
  271. return 0;
  272. }
  273. // ---------------------------------------------------------------------
  274. // 设置消息的键
  275. void
  276. kvmsg_set_key (kvmsg_t *self, char *key)
  277. {
  278. assert (self);
  279. zmq_msg_t *msg = &self->frame [FRAME_KEY];
  280. if (self->present [FRAME_KEY])
  281. zmq_msg_close (msg);
  282. zmq_msg_init_size (msg, strlen (key));
  283. memcpy (zmq_msg_data (msg), key, strlen (key));
  284. self->present [FRAME_KEY] = 1;
  285. }
  286. // ---------------------------------------------------------------------
  287. // 设置消息的编号
  288. void
  289. kvmsg_set_sequence (kvmsg_t *self, int64_t sequence)
  290. {
  291. assert (self);
  292. zmq_msg_t *msg = &self->frame [FRAME_SEQ];
  293. if (self->present [FRAME_SEQ])
  294. zmq_msg_close (msg);
  295. zmq_msg_init_size (msg, 8);
  296. byte *source = zmq_msg_data (msg);
  297. source [0] = (byte) ((sequence >> 56) & 255);
  298. source [1] = (byte) ((sequence >> 48) & 255);
  299. source [2] = (byte) ((sequence >> 40) & 255);
  300. source [3] = (byte) ((sequence >> 32) & 255);
  301. source [4] = (byte) ((sequence >> 24) & 255);
  302. source [5] = (byte) ((sequence >> 16) & 255);
  303. source [6] = (byte) ((sequence >> 8) & 255);
  304. source [7] = (byte) ((sequence) & 255);
  305. self->present [FRAME_SEQ] = 1;
  306. }
  307. // ---------------------------------------------------------------------
  308. // 生成并设置消息的UUID
  309. void
  310. kvmsg_set_uuid (kvmsg_t *self)
  311. {
  312. assert (self);
  313. zmq_msg_t *msg = &self->frame [FRAME_UUID];
  314. uuid_t uuid;
  315. uuid_generate (uuid);
  316. if (self->present [FRAME_UUID])
  317. zmq_msg_close (msg);
  318. zmq_msg_init_size (msg, sizeof (uuid));
  319. memcpy (zmq_msg_data (msg), uuid, sizeof (uuid));
  320. self->present [FRAME_UUID] = 1;
  321. }
  322. // ---------------------------------------------------------------------
  323. // 设置消息的内容
  324. void
  325. kvmsg_set_body (kvmsg_t *self, byte *body, size_t size)
  326. {
  327. assert (self);
  328. zmq_msg_t *msg = &self->frame [FRAME_BODY];
  329. if (self->present [FRAME_BODY])
  330. zmq_msg_close (msg);
  331. self->present [FRAME_BODY] = 1;
  332. zmq_msg_init_size (msg, size);
  333. memcpy (zmq_msg_data (msg), body, size);
  334. }
  335. // ---------------------------------------------------------------------
  336. // 使用printf()格式设置消息的键
  337. void
  338. kvmsg_fmt_key (kvmsg_t *self, char *format, ...)
  339. {
  340. char value [KVMSG_KEY_MAX + 1];
  341. va_list args;
  342. assert (self);
  343. va_start (args, format);
  344. vsnprintf (value, KVMSG_KEY_MAX, format, args);
  345. va_end (args);
  346. kvmsg_set_key (self, value);
  347. }
  348. // ---------------------------------------------------------------------
  349. // 使用printf()格式设置消息内容
  350. void
  351. kvmsg_fmt_body (kvmsg_t *self, char *format, ...)
  352. {
  353. char value [255 + 1];
  354. va_list args;
  355. assert (self);
  356. va_start (args, format);
  357. vsnprintf (value, 255, format, args);
  358. va_end (args);
  359. kvmsg_set_body (self, (byte *) value, strlen (value));
  360. }
  361. // ---------------------------------------------------------------------
  362. // 获取消息属性,无则返回空字符串
  363. char *
  364. kvmsg_get_prop (kvmsg_t *self, char *name)
  365. {
  366. assert (strchr (name, '=') == NULL);
  367. char *prop = zlist_first (self->props);
  368. size_t namelen = strlen (name);
  369. while (prop) {
  370. if (strlen (prop) > namelen
  371. && memcmp (prop, name, namelen) == 0
  372. && prop [namelen] == '=')
  373. return prop + namelen + 1;
  374. prop = zlist_next (self->props);
  375. }
  376. return "";
  377. }
  378. // ---------------------------------------------------------------------
  379. // 设置消息属性
  380. // 属性名称不能包含=号,值的最大长度是255
  381. void
  382. kvmsg_set_prop (kvmsg_t *self, char *name, char *format, ...)
  383. {
  384. assert (strchr (name, '=') == NULL);
  385. char value [255 + 1];
  386. va_list args;
  387. assert (self);
  388. va_start (args, format);
  389. vsnprintf (value, 255, format, args);
  390. va_end (args);
  391. // 分配空间
  392. char *prop = malloc (strlen (name) + strlen (value) + 2);
  393. // 删除已存在的属性
  394. sprintf (prop, "%s=", name);
  395. char *existing = zlist_first (self->props);
  396. while (existing) {
  397. if (memcmp (prop, existing, strlen (prop)) == 0) {
  398. self->props_size -= strlen (existing) + 1;
  399. zlist_remove (self->props, existing);
  400. free (existing);
  401. break;
  402. }
  403. existing = zlist_next (self->props);
  404. }
  405. // 添加新属性
  406. strcat (prop, value);
  407. zlist_append (self->props, prop);
  408. self->props_size += strlen (prop) + 1;
  409. }
  410. // ---------------------------------------------------------------------
  411. // 在哈希表中保存kvmsg对象
  412. // 当kvmsg对象不再被使用时进行释放操作;
  413. // 若传入的值为空,则删除该对象。
  414. void
  415. kvmsg_store (kvmsg_t **self_p, zhash_t *hash)
  416. {
  417. assert (self_p);
  418. if (*self_p) {
  419. kvmsg_t *self = *self_p;
  420. assert (self);
  421. if (kvmsg_size (self)) {
  422. if (self->present [FRAME_KEY]
  423. && self->present [FRAME_BODY]) {
  424. zhash_update (hash, kvmsg_key (self), self);
  425. zhash_freefn (hash, kvmsg_key (self), kvmsg_free);
  426. }
  427. }
  428. else
  429. zhash_delete (hash, kvmsg_key (self));
  430. *self_p = NULL;
  431. }
  432. }
  433. // ---------------------------------------------------------------------
  434. // 将消息内容输出到标准错误输出
  435. void
  436. kvmsg_dump (kvmsg_t *self)
  437. {
  438. if (self) {
  439. if (!self) {
  440. fprintf (stderr, "NULL");
  441. return;
  442. }
  443. size_t size = kvmsg_size (self);
  444. byte *body = kvmsg_body (self);
  445. fprintf (stderr, "[seq:%" PRId64 "]", kvmsg_sequence (self));
  446. fprintf (stderr, "[key:%s]", kvmsg_key (self));
  447. fprintf (stderr, "[size:%zd] ", size);
  448. if (zlist_size (self->props)) {
  449. fprintf (stderr, "[");
  450. char *prop = zlist_first (self->props);
  451. while (prop) {
  452. fprintf (stderr, "%s;", prop);
  453. prop = zlist_next (self->props);
  454. }
  455. fprintf (stderr, "]");
  456. }
  457. int char_nbr;
  458. for (char_nbr = 0; char_nbr < size; char_nbr++)
  459. fprintf (stderr, "%02X", body [char_nbr]);
  460. fprintf (stderr, "\n");
  461. }
  462. else
  463. fprintf (stderr, "NULL message\n");
  464. }
  465. // ---------------------------------------------------------------------
  466. // 测试用例
  467. int
  468. kvmsg_test (int verbose)
  469. {
  470. kvmsg_t
  471. *kvmsg;
  472. printf (" * kvmsg: ");
  473. // 准备上下文和套接字
  474. zctx_t *ctx = zctx_new ();
  475. void *output = zsocket_new (ctx, ZMQ_DEALER);
  476. int rc = zmq_bind (output, "ipc://kvmsg_selftest.ipc");
  477. assert (rc == 0);
  478. void *input = zsocket_new (ctx, ZMQ_DEALER);
  479. rc = zmq_connect (input, "ipc://kvmsg_selftest.ipc");
  480. assert (rc == 0);
  481. zhash_t *kvmap = zhash_new ();
  482. // 测试简单消息的收发
  483. kvmsg = kvmsg_new (1);
  484. kvmsg_set_key (kvmsg, "key");
  485. kvmsg_set_uuid (kvmsg);
  486. kvmsg_set_body (kvmsg, (byte *) "body", 4);
  487. if (verbose)
  488. kvmsg_dump (kvmsg);
  489. kvmsg_send (kvmsg, output);
  490. kvmsg_store (&kvmsg, kvmap);
  491. kvmsg = kvmsg_recv (input);
  492. if (verbose)
  493. kvmsg_dump (kvmsg);
  494. assert (streq (kvmsg_key (kvmsg), "key"));
  495. kvmsg_store (&kvmsg, kvmap);
  496. // 测试带有属性的消息的收发
  497. kvmsg = kvmsg_new (2);
  498. kvmsg_set_prop (kvmsg, "prop1", "value1");
  499. kvmsg_set_prop (kvmsg, "prop2", "value1");
  500. kvmsg_set_prop (kvmsg, "prop2", "value2");
  501. kvmsg_set_key (kvmsg, "key");
  502. kvmsg_set_uuid (kvmsg);
  503. kvmsg_set_body (kvmsg, (byte *) "body", 4);
  504. assert (streq (kvmsg_get_prop (kvmsg, "prop2"), "value2"));
  505. if (verbose)
  506. kvmsg_dump (kvmsg);
  507. kvmsg_send (kvmsg, output);
  508. kvmsg_destroy (&kvmsg);
  509. kvmsg = kvmsg_recv (input);
  510. if (verbose)
  511. kvmsg_dump (kvmsg);
  512. assert (streq (kvmsg_key (kvmsg), "key"));
  513. assert (streq (kvmsg_get_prop (kvmsg, "prop2"), "value2"));
  514. kvmsg_destroy (&kvmsg);
  515. // 关闭并销毁所有对象
  516. zhash_destroy (&kvmap);
  517. zctx_destroy (&ctx);
  518. printf ("OK\n");
  519. return 0;
  520. }

客户端模型5和模型4没有太大区别,只是kvmsg类库变了。在更新消息的时候还需要添加一个过期时间的属性:

  1. kvmsg_set_prop (kvmsg, "ttl", "%d", randof (30));

服务端模型5有较大的变化,我们会用反应堆来代替轮询,这样就能混合处理定时事件和套接字事件了,只是在C语言中是比较麻烦的。下面是代码:

clonesrv5: Clone server, Model Five in C

  1. //
  2. // 克隆模式 - 服务端 - 模型5
  3. //
  4. // 直接编译,不建类库
  5. #include "kvmsg.c"
  6. // 反应堆处理器
  7. static int s_snapshots (zloop_t *loop, void *socket, void *args);
  8. static int s_collector (zloop_t *loop, void *socket, void *args);
  9. static int s_flush_ttl (zloop_t *loop, void *socket, void *args);
  10. // 服务器属性
  11. typedef struct {
  12. zctx_t *ctx; // 上下文
  13. zhash_t *kvmap; // 键值对存储
  14. zloop_t *loop; // zloop反应堆
  15. int port; // 主端口
  16. int64_t sequence; // 更新事件编号
  17. void *snapshot; // 处理快照请求
  18. void *publisher; // 发布更新事件
  19. void *collector; // 从客户端收集接收更新事件
  20. } clonesrv_t;
  21. int main (void)
  22. {
  23. clonesrv_t *self = (clonesrv_t *) zmalloc (sizeof (clonesrv_t));
  24. self->port = 5556;
  25. self->ctx = zctx_new ();
  26. self->kvmap = zhash_new ();
  27. self->loop = zloop_new ();
  28. zloop_set_verbose (self->loop, FALSE);
  29. // 打开克隆模式服务端套接字
  30. self->snapshot = zsocket_new (self->ctx, ZMQ_ROUTER);
  31. self->publisher = zsocket_new (self->ctx, ZMQ_PUB);
  32. self->collector = zsocket_new (self->ctx, ZMQ_PULL);
  33. zsocket_bind (self->snapshot, "tcp://*:%d", self->port);
  34. zsocket_bind (self->publisher, "tcp://*:%d", self->port + 1);
  35. zsocket_bind (self->collector, "tcp://*:%d", self->port + 2);
  36. // 注册反应堆处理程序
  37. zloop_reader (self->loop, self->snapshot, s_snapshots, self);
  38. zloop_reader (self->loop, self->collector, s_collector, self);
  39. zloop_timer (self->loop, 1000, 0, s_flush_ttl, self);
  40. // 运行反应堆,直至中断
  41. zloop_start (self->loop);
  42. zloop_destroy (&self->loop);
  43. zhash_destroy (&self->kvmap);
  44. zctx_destroy (&self->ctx);
  45. free (self);
  46. return 0;
  47. }
  48. // ---------------------------------------------------------------------
  49. // 发送快照内容
  50. static int s_send_single (char *key, void *data, void *args);
  51. // 请求方信息
  52. typedef struct {
  53. void *socket; // ROUTER套接字
  54. zframe_t *identity; // 请求方标识
  55. char *subtree; // 子树信息
  56. } kvroute_t;
  57. static int
  58. s_snapshots (zloop_t *loop, void *snapshot, void *args)
  59. {
  60. clonesrv_t *self = (clonesrv_t *) args;
  61. zframe_t *identity = zframe_recv (snapshot);
  62. if (identity) {
  63. // 请求位于消息第二帧
  64. char *request = zstr_recv (snapshot);
  65. char *subtree = NULL;
  66. if (streq (request, "ICANHAZ?")) {
  67. free (request);
  68. subtree = zstr_recv (snapshot);
  69. }
  70. else
  71. printf ("E: 错误的请求,程序中止\n");
  72. if (subtree) {
  73. // 发送状态快照
  74. kvroute_t routing = { snapshot, identity, subtree };
  75. zhash_foreach (self->kvmap, s_send_single, &routing);
  76. // 发送结束符和版本号
  77. zclock_log ("I: 正在发送快照,版本号:%d", (int) self->sequence);
  78. zframe_send (&identity, snapshot, ZFRAME_MORE);
  79. kvmsg_t *kvmsg = kvmsg_new (self->sequence);
  80. kvmsg_set_key (kvmsg, "KTHXBAI");
  81. kvmsg_set_body (kvmsg, (byte *) subtree, 0);
  82. kvmsg_send (kvmsg, snapshot);
  83. kvmsg_destroy (&kvmsg);
  84. free (subtree);
  85. }
  86. }
  87. return 0;
  88. }
  89. // 每次发送一个快照键值对
  90. static int
  91. s_send_single (char *key, void *data, void *args)
  92. {
  93. kvroute_t *kvroute = (kvroute_t *) args;
  94. kvmsg_t *kvmsg = (kvmsg_t *) data;
  95. if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
  96. && memcmp (kvroute->subtree,
  97. kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
  98. // 先发送接收方标识
  99. zframe_send (&kvroute->identity,
  100. kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
  101. kvmsg_send (kvmsg, kvroute->socket);
  102. }
  103. return 0;
  104. }
  105. // ---------------------------------------------------------------------
  106. // 收集更新事件
  107. static int
  108. s_collector (zloop_t *loop, void *collector, void *args)
  109. {
  110. clonesrv_t *self = (clonesrv_t *) args;
  111. kvmsg_t *kvmsg = kvmsg_recv (collector);
  112. if (kvmsg) {
  113. kvmsg_set_sequence (kvmsg, ++self->sequence);
  114. kvmsg_send (kvmsg, self->publisher);
  115. int ttl = atoi (kvmsg_get_prop (kvmsg, "ttl"));
  116. if (ttl)
  117. kvmsg_set_prop (kvmsg, "ttl",
  118. "%" PRId64, zclock_time () + ttl * 1000);
  119. kvmsg_store (&kvmsg, self->kvmap);
  120. zclock_log ("I: 正在发布更新事件 %d", (int) self->sequence);
  121. }
  122. return 0;
  123. }
  124. // ---------------------------------------------------------------------
  125. // 删除过期的瞬间值
  126. static int s_flush_single (char *key, void *data, void *args);
  127. static int
  128. s_flush_ttl (zloop_t *loop, void *unused, void *args)
  129. {
  130. clonesrv_t *self = (clonesrv_t *) args;
  131. zhash_foreach (self->kvmap, s_flush_single, args);
  132. return 0;
  133. }
  134. // 删除过期的键值对,并广播该事件
  135. static int
  136. s_flush_single (char *key, void *data, void *args)
  137. {
  138. clonesrv_t *self = (clonesrv_t *) args;
  139. kvmsg_t *kvmsg = (kvmsg_t *) data;
  140. int64_t ttl;
  141. sscanf (kvmsg_get_prop (kvmsg, "ttl"), "%" PRId64, &ttl);
  142. if (ttl && zclock_time () >= ttl) {
  143. kvmsg_set_sequence (kvmsg, ++self->sequence);
  144. kvmsg_set_body (kvmsg, (byte *) "", 0);
  145. kvmsg_send (kvmsg, self->publisher);
  146. kvmsg_store (&kvmsg, self->kvmap);
  147. zclock_log ("I: 发布删除事件 %d", (int) self->sequence);
  148. }
  149. return 0;
  150. }

克隆服务器的可靠性

克隆模型1至5相对比较简单,下面我们会探讨一个非常复杂的模型。可以发现,为了构建可靠的消息队列,我们需要花费非常多的精力。所以我们经常会问:有必要这么做吗?如果说你能够接受可靠性不够高的、或者说已经足够好的架构,那恭喜你,你在成本和收益之间找到了平衡。虽然我们会偶尔丢失一些消息,但从经济的角度来说还是合理的。不管怎样,下面我们就来介绍这个复杂的模型。

在模型3中,你会关闭和重启服务,这会导致数据的丢失。任何后续加入的客户端只能得到重启之后的那些数据,而非所有的。下面就让我们想办法让克隆模式能够承担服务器重启的故障。

以下列举我们需要处理的问题:

  • 克隆服务器进程崩溃并自动或手工重启。进程丢失了所有数据,所以必须从别处进行恢复。

  • 克隆服务器硬件故障,长时间不能恢复。客户端需要切换至另一个可用的服务端。

  • 克隆服务器从网络上断开,如交换机发生故障等。它会在某个时点重连,但期间的数据就需要替代的服务器负责处理。

第一步我们需要增加一个服务器。我们可以使用第四章中提到的双子星模式,它是一个反应堆,而我们的程序经过整理后也是一个反应堆,因此可以互相协作。

我们需要保证更新事件在主服务器崩溃时仍能保留,最简单的机制就是同时发送给两台服务器。

备机就可以当做一台客户端来运行,像其他客户端一样从主机获取更新事件。同时它又能从客户端获取更新事件——虽然不应该以此更新数据,但可以先暂存起来。

所以,相较于模型5,模型6中引入了以下特性:

  • 客户端发送更新事件改用PUB-SUB套接字,而非PUSH-PULL。原因是PUSH套接字会在没有接收方时阻塞,且会进行负载均衡——我们需要两台服务器都接收到消息。我们会在服务器端绑定SUB套接字,在客户端连接PUB套接字。

  • 我们在服务器发送给客户端的更新事件中加入心跳,这样客户端可以知道主机是否已死,然后切换至备机。

  • 我们使用双子星模式的bstar反应堆类来创建主机和备机。双子星模式中需要有一个“投票”套接字,来协助判定对方节点是否已死。这里我们使用快照请求来作为“投票”。

  • 我们将为所有的更新事件添加UUID属性,它由客户端生成,服务端会将其发布给所有客户端。

  • 备机将维护一个“待处理列表”,保存来自客户端、尚未由服务端发布的更新事件;或者反过来,来自服务端、尚未从客户端收到的更新事件。这个列表从旧到新排列,这样就能方便地从顶部删除消息。

我们可以为客户端设计一个有限状态机,它有三种状态:

  • 客户端打开并连接了套接字,然后向服务端发送快照请求。为了避免消息风暴,它只会请求两次。

  • 客户端等待快照应答,如果获得了则保存它;如果没有获得,则向第二个服务器发送请求。

  • 客户端收到快照,便开始等待更新事件。如果在一定时间内没有收到服务端响应,则会连接第二个服务端。

客户端会一直循环下去,可能在程序刚启动时,部分客户端会试图连接主机,部分连接备机,相信双子星模式会很好地处理这一情况的。

我们可以将客户端状态图绘制出来:

6

故障恢复的步骤如下:

  • 客户端检测到主机不再发送心跳,因此转而连接备机,并请求一份新的快照;
  • 备机开始接收快照请求,并检测到主机死亡,于是开始作为主机运行;
  • 备机将待处理列表中的更新事件写入自身状态中,然后开始处理快照请求。

当主机恢复连接时:

  • 启动为slave状态,并作为克隆模式客户端连接备机;
  • 同时,使用SUB套接字从客户端接收更新事件。

我们做两点假设:

  • 至少有一台主机会继续运行。如果两台主机都崩溃了,那我们将丢失所有的服务端数据,无法恢复。
  • 不同的客户端不会同时更新同一个键值对。客户端的更新事件会先后到达两个服务器,因此更新的顺序可能会不一致。单个客户端的更新事件到达两台服务器的顺序是相同的,所以不用担心。

下面是整体架构图:

7

开始编程之前,我们需要将客户端重构成一个可复用的类。在ZMQ中写异步类有时是为了练习如何写出优雅的代码,但这里我们确实是希望克隆模式可以成为一种易于使用的程序。上述架构的伸缩性来源于客户端的正确行为,因此有必要将其封装成一份API。要在客户端中进行故障恢复还是比较复杂的,试想一下自由者模式和克隆模式结合起来会是什么样的吧。

按照我的习惯,我会先写出一份API的列表,然后加以实现。让我们假想一个名为clone的API,在其基础之上编写克隆模式客户端API。将代码封装为API显然会提升代码的稳定性,就以模型5为例,客户端需要打开三个套接字,端点名称直接写在了代码里。我们可以创建这样一组API:

  1. // 为每个套接字指定端点
  2. clone_subscribe (clone, "tcp://localhost:5556");
  3. clone_snapshot (clone, "tcp://localhost:5557");
  4. clone_updates (clone, "tcp://localhost:5558");
  5. // 由于有两个服务端,因此再执行一次
  6. clone_subscribe (clone, "tcp://localhost:5566");
  7. clone_snapshot (clone, "tcp://localhost:5567");
  8. clone_updates (clone, "tcp://localhost:5568");

但这种写法还是比较啰嗦的,因为没有必要将API内部的一些设计暴露给编程人员。现在我们会使用三个套接字,而将来可能就会使用两个,或者四个。我们不可能让所有的应用程序都相应地修改吧?让我们把这些信息包装到API中:

  1. // 指定主备服务器端点
  2. clone_connect (clone, "tcp://localhost:5551");
  3. clone_connect (clone, "tcp://localhost:5561");

这样一来代码就变得非常简洁,不过也会对现有代码的内部就够造成影响。我们需要从一个端点中推算出三个端点。一种方法是假设客户端和服务端使用三个连续的端点通信,并将这个规则写入协议;另一个方法是向服务器索取缺少的端点信息。我们使用第一种较为简单的方法:

  • 服务器状态ROUTER在端点P;
  • 服务器更新事件PUB在端点P + 1;
  • 服务器更新事件SUB在端点P + 2。

clone类和第四章的flcliapi类很类似,由两部分组成:

  • 一个在后台运行的异步克隆模式代理。该代理处理所有的I/O操作,实时地和服务器进行通信;
  • 一个在前台应用程序中同步运行的clone类。当你创建了一个clone对象后,它会自动创建后台的clone线程;当你销毁clone对象,该后台线程也会被销毁。

前台的clone类会使用inproc管道和后台的代理进行通信。C语言中,czmq线程会自动为我们创建这个管道。这也是ZMQ多线程编程的常规方式。

如果没有ZMQ,这种异步的设计将很难处理高压工作,而ZMQ会让其变得简单。编写出来额代码会相对比较复杂。我们可以用反应堆的模式来编写,但这会进一步增加复杂度,且影响应用程序的使用。因此,我们的设计的API将更像是一个能够和服务器进行通信的键值表:

  1. clone_t *clone_new (void);
  2. void clone_destroy (clone_t **self_p);
  3. void clone_connect (clone_t *self, char *address, char *service);
  4. void clone_set (clone_t *self, char *key, char *value);
  5. char *clone_get (clone_t *self, char *key);

下面就是克隆模式客户端模型6的代码,因为调用了API,所以非常简短:
clonecli6: Clone client, Model Six in C

  1. //
  2. // 克隆模式 - 客户端 - 模型6
  3. //
  4. // 直接编译,不建类库
  5. #include "clone.c"
  6. #define SUBTREE "/client/"
  7. int main (void)
  8. {
  9. // 创建分布式哈希表
  10. clone_t *clone = clone_new ();
  11. // 配置
  12. clone_subtree (clone, SUBTREE);
  13. clone_connect (clone, "tcp://localhost", "5556");
  14. clone_connect (clone, "tcp://localhost", "5566");
  15. // 插入随机键值
  16. while (!zctx_interrupted) {
  17. // 生成随机值
  18. char key [255];
  19. char value [10];
  20. sprintf (key, "%s%d", SUBTREE, randof (10000));
  21. sprintf (value, "%d", randof (1000000));
  22. clone_set (clone, key, value, randof (30));
  23. sleep (1);
  24. }
  25. clone_destroy (&clone);
  26. return 0;
  27. }

以下是clone类的实现:
clone: Clone class in C

  1. /* =====================================================================
  2. clone - client-side Clone Pattern class
  3. ---------------------------------------------------------------------
  4. Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
  5. Copyright other contributors as noted in the AUTHORS file.
  6. This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
  7. This is free software; you can redistribute it and/or modify it under
  8. the terms of the GNU Lesser General Public License as published by
  9. the Free Software Foundation; either version 3 of the License, or (at
  10. your option) any later version.
  11. This software is distributed in the hope that it will be useful, but
  12. WITHOUT ANY WARRANTY; without even the implied warranty of
  13. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  14. Lesser General Public License for more details.
  15. You should have received a copy of the GNU Lesser General Public
  16. License along with this program. If not, see
  17. <http://www.gnu.org/licenses/>.
  18. =====================================================================
  19. */
  20. #include "clone.h"
  21. // 请求超时时间
  22. #define GLOBAL_TIMEOUT 4000 // msecs
  23. // 判定服务器死亡的时间
  24. #define SERVER_TTL 5000 // msecs
  25. // 服务器数量
  26. #define SERVER_MAX 2
  27. // =====================================================================
  28. // 同步部分,在应用程序线程中工作
  29. // ---------------------------------------------------------------------
  30. // 类结构
  31. struct _clone_t {
  32. zctx_t *ctx; // 上下文
  33. void *pipe; // 和后台代理间的通信套接字
  34. };
  35. // 该线程用于处理真正的clone类
  36. static void clone_agent (void *args, zctx_t *ctx, void *pipe);
  37. // ---------------------------------------------------------------------
  38. // 构造函数
  39. clone_t *
  40. clone_new (void)
  41. {
  42. clone_t
  43. *self;
  44. self = (clone_t *) zmalloc (sizeof (clone_t));
  45. self->ctx = zctx_new ();
  46. self->pipe = zthread_fork (self->ctx, clone_agent, NULL);
  47. return self;
  48. }
  49. // ---------------------------------------------------------------------
  50. // 析构函数
  51. void
  52. clone_destroy (clone_t **self_p)
  53. {
  54. assert (self_p);
  55. if (*self_p) {
  56. clone_t *self = *self_p;
  57. zctx_destroy (&self->ctx);
  58. free (self);
  59. *self_p = NULL;
  60. }
  61. }
  62. // ---------------------------------------------------------------------
  63. // 在链接之前指定快照和更新事件的子树
  64. // 发送给后台代理的消息内容为[SUBTREE][subtree]
  65. void clone_subtree (clone_t *self, char *subtree)
  66. {
  67. assert (self);
  68. zmsg_t *msg = zmsg_new ();
  69. zmsg_addstr (msg, "SUBTREE");
  70. zmsg_addstr (msg, subtree);
  71. zmsg_send (&msg, self->pipe);
  72. }
  73. // ---------------------------------------------------------------------
  74. // 连接至新的服务器端点
  75. // 消息内容:[CONNECT][endpoint][service]
  76. void
  77. clone_connect (clone_t *self, char *address, char *service)
  78. {
  79. assert (self);
  80. zmsg_t *msg = zmsg_new ();
  81. zmsg_addstr (msg, "CONNECT");
  82. zmsg_addstr (msg, address);
  83. zmsg_addstr (msg, service);
  84. zmsg_send (&msg, self->pipe);
  85. }
  86. // ---------------------------------------------------------------------
  87. // 设置新值
  88. // 消息内容:[SET][key][value][ttl]
  89. void
  90. clone_set (clone_t *self, char *key, char *value, int ttl)
  91. {
  92. char ttlstr [10];
  93. sprintf (ttlstr, "%d", ttl);
  94. assert (self);
  95. zmsg_t *msg = zmsg_new ();
  96. zmsg_addstr (msg, "SET");
  97. zmsg_addstr (msg, key);
  98. zmsg_addstr (msg, value);
  99. zmsg_addstr (msg, ttlstr);
  100. zmsg_send (&msg, self->pipe);
  101. }
  102. // ---------------------------------------------------------------------
  103. // 取值
  104. // 消息内容:[GET][key]
  105. // 如果没有clone可用,会返回NULL
  106. char *
  107. clone_get (clone_t *self, char *key)
  108. {
  109. assert (self);
  110. assert (key);
  111. zmsg_t *msg = zmsg_new ();
  112. zmsg_addstr (msg, "GET");
  113. zmsg_addstr (msg, key);
  114. zmsg_send (&msg, self->pipe);
  115. zmsg_t *reply = zmsg_recv (self->pipe);
  116. if (reply) {
  117. char *value = zmsg_popstr (reply);
  118. zmsg_destroy (&reply);
  119. return value;
  120. }
  121. return NULL;
  122. }
  123. // =====================================================================
  124. // 异步部分,在后台运行
  125. // ---------------------------------------------------------------------
  126. // 单个服务端信息
  127. typedef struct {
  128. char *address; // 服务端地址
  129. int port; // 端口
  130. void *snapshot; // 快照套接字
  131. void *subscriber; // 接收更新事件的套接字
  132. uint64_t expiry; // 服务器过期时间
  133. uint requests; // 收到的快照请求数
  134. } server_t;
  135. static server_t *
  136. server_new (zctx_t *ctx, char *address, int port, char *subtree)
  137. {
  138. server_t *self = (server_t *) zmalloc (sizeof (server_t));
  139. zclock_log ("I: adding server %s:%d...", address, port);
  140. self->address = strdup (address);
  141. self->port = port;
  142. self->snapshot = zsocket_new (ctx, ZMQ_DEALER);
  143. zsocket_connect (self->snapshot, "%s:%d", address, port);
  144. self->subscriber = zsocket_new (ctx, ZMQ_SUB);
  145. zsocket_connect (self->subscriber, "%s:%d", address, port + 1);
  146. zsockopt_set_subscribe (self->subscriber, subtree);
  147. return self;
  148. }
  149. static void
  150. server_destroy (server_t **self_p)
  151. {
  152. assert (self_p);
  153. if (*self_p) {
  154. server_t *self = *self_p;
  155. free (self->address);
  156. free (self);
  157. *self_p = NULL;
  158. }
  159. }
  160. // ---------------------------------------------------------------------
  161. // 后台代理类
  162. // 状态
  163. #define STATE_INITIAL 0 // 连接之前
  164. #define STATE_SYNCING 1 // 正在同步
  165. #define STATE_ACTIVE 2 // 正在更新
  166. typedef struct {
  167. zctx_t *ctx; // 上下文
  168. void *pipe; // 与主线程通信的套接字
  169. zhash_t *kvmap; // 键值表
  170. char *subtree; // 子树
  171. server_t *server [SERVER_MAX];
  172. uint nbr_servers; // 范围:0 - SERVER_MAX
  173. uint state; // 当前状态
  174. uint cur_server; // 当前master,0/1
  175. int64_t sequence; // 键值对编号
  176. void *publisher; // 发布更新事件的套接字
  177. } agent_t;
  178. static agent_t *
  179. agent_new (zctx_t *ctx, void *pipe)
  180. {
  181. agent_t *self = (agent_t *) zmalloc (sizeof (agent_t));
  182. self->ctx = ctx;
  183. self->pipe = pipe;
  184. self->kvmap = zhash_new ();
  185. self->subtree = strdup ("");
  186. self->state = STATE_INITIAL;
  187. self->publisher = zsocket_new (self->ctx, ZMQ_PUB);
  188. return self;
  189. }
  190. static void
  191. agent_destroy (agent_t **self_p)
  192. {
  193. assert (self_p);
  194. if (*self_p) {
  195. agent_t *self = *self_p;
  196. int server_nbr;
  197. for (server_nbr = 0; server_nbr < self->nbr_servers; server_nbr++)
  198. server_destroy (&self->server [server_nbr]);
  199. zhash_destroy (&self->kvmap);
  200. free (self->subtree);
  201. free (self);
  202. *self_p = NULL;
  203. }
  204. }
  205. // 若线程被中断则返回-1
  206. static int
  207. agent_control_message (agent_t *self)
  208. {
  209. zmsg_t *msg = zmsg_recv (self->pipe);
  210. char *command = zmsg_popstr (msg);
  211. if (command == NULL)
  212. return -1;
  213. if (streq (command, "SUBTREE")) {
  214. free (self->subtree);
  215. self->subtree = zmsg_popstr (msg);
  216. }
  217. else
  218. if (streq (command, "CONNECT")) {
  219. char *address = zmsg_popstr (msg);
  220. char *service = zmsg_popstr (msg);
  221. if (self->nbr_servers < SERVER_MAX) {
  222. self->server [self->nbr_servers++] = server_new (
  223. self->ctx, address, atoi (service), self->subtree);
  224. // 广播更新事件
  225. zsocket_connect (self->publisher, "%s:%d",
  226. address, atoi (service) + 2);
  227. }
  228. else
  229. zclock_log ("E: too many servers (max. %d)", SERVER_MAX);
  230. free (address);
  231. free (service);
  232. }
  233. else
  234. if (streq (command, "SET")) {
  235. char *key = zmsg_popstr (msg);
  236. char *value = zmsg_popstr (msg);
  237. char *ttl = zmsg_popstr (msg);
  238. zhash_update (self->kvmap, key, (byte *) value);
  239. zhash_freefn (self->kvmap, key, free);
  240. // 向服务端发送键值对
  241. kvmsg_t *kvmsg = kvmsg_new (0);
  242. kvmsg_set_key (kvmsg, key);
  243. kvmsg_set_uuid (kvmsg);
  244. kvmsg_fmt_body (kvmsg, "%s", value);
  245. kvmsg_set_prop (kvmsg, "ttl", ttl);
  246. kvmsg_send (kvmsg, self->publisher);
  247. kvmsg_destroy (&kvmsg);
  248. puts (key);
  249. free (ttl);
  250. free (key); // 键值对实际由哈希表对象控制
  251. }
  252. else
  253. if (streq (command, "GET")) {
  254. char *key = zmsg_popstr (msg);
  255. char *value = zhash_lookup (self->kvmap, key);
  256. if (value)
  257. zstr_send (self->pipe, value);
  258. else
  259. zstr_send (self->pipe, "");
  260. free (key);
  261. free (value);
  262. }
  263. free (command);
  264. zmsg_destroy (&msg);
  265. return 0;
  266. }
  267. // ---------------------------------------------------------------------
  268. // 异步的后台代理会维护一个服务端池,并处理来自应用程序的请求或应答。
  269. static void
  270. clone_agent (void *args, zctx_t *ctx, void *pipe)
  271. {
  272. agent_t *self = agent_new (ctx, pipe);
  273. while (TRUE) {
  274. zmq_pollitem_t poll_set [] = {
  275. { pipe, 0, ZMQ_POLLIN, 0 },
  276. { 0, 0, ZMQ_POLLIN, 0 }
  277. };
  278. int poll_timer = -1;
  279. int poll_size = 2;
  280. server_t *server = self->server [self->cur_server];
  281. switch (self->state) {
  282. case STATE_INITIAL:
  283. // 该状态下,如果有可用服务,会发送快照请求
  284. if (self->nbr_servers > 0) {
  285. zclock_log ("I: 正在等待服务器 %s:%d...",
  286. server->address, server->port);
  287. if (server->requests < 2) {
  288. zstr_sendm (server->snapshot, "ICANHAZ?");
  289. zstr_send (server->snapshot, self->subtree);
  290. server->requests++;
  291. }
  292. server->expiry = zclock_time () + SERVER_TTL;
  293. self->state = STATE_SYNCING;
  294. poll_set [1].socket = server->snapshot;
  295. }
  296. else
  297. poll_size = 1;
  298. break;
  299. case STATE_SYNCING:
  300. // 该状态下我们从服务器端接收快照内容,若失败则尝试其他服务器
  301. poll_set [1].socket = server->snapshot;
  302. break;
  303. case STATE_ACTIVE:
  304. // 该状态下我们从服务器获取更新事件,失败则尝试其他服务器
  305. poll_set [1].socket = server->subscriber;
  306. break;
  307. }
  308. if (server) {
  309. poll_timer = (server->expiry - zclock_time ())
  310. * ZMQ_POLL_MSEC;
  311. if (poll_timer < 0)
  312. poll_timer = 0;
  313. }
  314. // ------------------------------------------------------------
  315. // poll循环
  316. int rc = zmq_poll (poll_set, poll_size, poll_timer);
  317. if (rc == -1)
  318. break; // 上下文已被关闭
  319. if (poll_set [0].revents & ZMQ_POLLIN) {
  320. if (agent_control_message (self))
  321. break; // 中断
  322. }
  323. else
  324. if (poll_set [1].revents & ZMQ_POLLIN) {
  325. kvmsg_t *kvmsg = kvmsg_recv (poll_set [1].socket);
  326. if (!kvmsg)
  327. break; // 中断
  328. // 任何服务端的消息将重置它的过期时间
  329. server->expiry = zclock_time () + SERVER_TTL;
  330. if (self->state == STATE_SYNCING) {
  331. // 保存快照内容
  332. server->requests = 0;
  333. if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
  334. self->sequence = kvmsg_sequence (kvmsg);
  335. self->state = STATE_ACTIVE;
  336. zclock_log ("I: received from %s:%d snapshot=%d",
  337. server->address, server->port,
  338. (int) self->sequence);
  339. kvmsg_destroy (&kvmsg);
  340. }
  341. else
  342. kvmsg_store (&kvmsg, self->kvmap);
  343. }
  344. else
  345. if (self->state == STATE_ACTIVE) {
  346. // 丢弃过期的更新事件
  347. if (kvmsg_sequence (kvmsg) > self->sequence) {
  348. self->sequence = kvmsg_sequence (kvmsg);
  349. kvmsg_store (&kvmsg, self->kvmap);
  350. zclock_log ("I: received from %s:%d update=%d",
  351. server->address, server->port,
  352. (int) self->sequence);
  353. }
  354. else
  355. kvmsg_destroy (&kvmsg);
  356. }
  357. }
  358. else {
  359. // 服务端已死,尝试其他服务器
  360. zclock_log ("I: 服务器 %s:%d 无响应",
  361. server->address, server->port);
  362. self->cur_server = (self->cur_server + 1) % self->nbr_servers;
  363. self->state = STATE_INITIAL;
  364. }
  365. }
  366. agent_destroy (&self);
  367. }

最后是克隆服务器的模型6代码:

clonesrv6: Clone server, Model Six in C

  1. //
  2. // 克隆模式 - 服务端 - 模型6
  3. //
  4. // 直接编译,不建类库
  5. #include "bstar.c"
  6. #include "kvmsg.c"
  7. // bstar反应堆API
  8. static int s_snapshots (zloop_t *loop, void *socket, void *args);
  9. static int s_collector (zloop_t *loop, void *socket, void *args);
  10. static int s_flush_ttl (zloop_t *loop, void *socket, void *args);
  11. static int s_send_hugz (zloop_t *loop, void *socket, void *args);
  12. static int s_new_master (zloop_t *loop, void *unused, void *args);
  13. static int s_new_slave (zloop_t *loop, void *unused, void *args);
  14. static int s_subscriber (zloop_t *loop, void *socket, void *args);
  15. // 服务端属性
  16. typedef struct {
  17. zctx_t *ctx; // 上下文
  18. zhash_t *kvmap; // 存放键值对
  19. bstar_t *bstar; // bstar反应堆核心
  20. int64_t sequence; // 更新事件编号
  21. int port; // 主端口
  22. int peer; // 同伴端口
  23. void *publisher; // 发布更新事件的端口
  24. void *collector; // 接收客户端更新事件的端口
  25. void *subscriber; // 接受同伴更新事件的端口
  26. zlist_t *pending; // 延迟的更新事件
  27. Bool primary; // 是否为主机
  28. Bool master; // 是否为master
  29. Bool slave; // 是否为slave
  30. } clonesrv_t;
  31. int main (int argc, char *argv [])
  32. {
  33. clonesrv_t *self = (clonesrv_t *) zmalloc (sizeof (clonesrv_t));
  34. if (argc == 2 && streq (argv [1], "-p")) {
  35. zclock_log ("I: 作为主机master运行,正在等待备机slave连接。");
  36. self->bstar = bstar_new (BSTAR_PRIMARY, "tcp://*:5003",
  37. "tcp://localhost:5004");
  38. bstar_voter (self->bstar, "tcp://*:5556", ZMQ_ROUTER,
  39. s_snapshots, self);
  40. self->port = 5556;
  41. self->peer = 5566;
  42. self->primary = TRUE;
  43. }
  44. else
  45. if (argc == 2 && streq (argv [1], "-b")) {
  46. zclock_log ("I: 作为备机slave运行,正在等待主机master连接。");
  47. self->bstar = bstar_new (BSTAR_BACKUP, "tcp://*:5004",
  48. "tcp://localhost:5003");
  49. bstar_voter (self->bstar, "tcp://*:5566", ZMQ_ROUTER,
  50. s_snapshots, self);
  51. self->port = 5566;
  52. self->peer = 5556;
  53. self->primary = FALSE;
  54. }
  55. else {
  56. printf ("Usage: clonesrv4 { -p | -b }\n");
  57. free (self);
  58. exit (0);
  59. }
  60. // 主机将成为master
  61. if (self->primary)
  62. self->kvmap = zhash_new ();
  63. self->ctx = zctx_new ();
  64. self->pending = zlist_new ();
  65. bstar_set_verbose (self->bstar, TRUE);
  66. // 设置克隆服务端套接字
  67. self->publisher = zsocket_new (self->ctx, ZMQ_PUB);
  68. self->collector = zsocket_new (self->ctx, ZMQ_SUB);
  69. zsocket_bind (self->publisher, "tcp://*:%d", self->port + 1);
  70. zsocket_bind (self->collector, "tcp://*:%d", self->port + 2);
  71. // 作为克隆客户端连接同伴
  72. self->subscriber = zsocket_new (self->ctx, ZMQ_SUB);
  73. zsocket_connect (self->subscriber, "tcp://localhost:%d", self->peer + 1);
  74. // 注册状态事件处理器
  75. bstar_new_master (self->bstar, s_new_master, self);
  76. bstar_new_slave (self->bstar, s_new_slave, self);
  77. // 注册bstar反应堆其他事件处理器
  78. zloop_reader (bstar_zloop (self->bstar), self->collector, s_collector, self);
  79. zloop_timer (bstar_zloop (self->bstar), 1000, 0, s_flush_ttl, self);
  80. zloop_timer (bstar_zloop (self->bstar), 1000, 0, s_send_hugz, self);
  81. // 开启bstar反应堆
  82. bstar_start (self->bstar);
  83. // 中断,终止。
  84. while (zlist_size (self->pending)) {
  85. kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop (self->pending);
  86. kvmsg_destroy (&kvmsg);
  87. }
  88. zlist_destroy (&self->pending);
  89. bstar_destroy (&self->bstar);
  90. zhash_destroy (&self->kvmap);
  91. zctx_destroy (&self->ctx);
  92. free (self);
  93. return 0;
  94. }
  95. // ---------------------------------------------------------------------
  96. // 发送快照内容
  97. static int s_send_single (char *key, void *data, void *args);
  98. // 请求方信息
  99. typedef struct {
  100. void *socket; // ROUTER套接字
  101. zframe_t *identity; // 请求放标识
  102. char *subtree; // 子树
  103. } kvroute_t;
  104. static int
  105. s_snapshots (zloop_t *loop, void *snapshot, void *args)
  106. {
  107. clonesrv_t *self = (clonesrv_t *) args;
  108. zframe_t *identity = zframe_recv (snapshot);
  109. if (identity) {
  110. // 请求在消息的第二帧中
  111. char *request = zstr_recv (snapshot);
  112. char *subtree = NULL;
  113. if (streq (request, "ICANHAZ?")) {
  114. free (request);
  115. subtree = zstr_recv (snapshot);
  116. }
  117. else
  118. printf ("E: 错误的请求,正在退出……\n");
  119. if (subtree) {
  120. // 发送状态快照
  121. kvroute_t routing = { snapshot, identity, subtree };
  122. zhash_foreach (self->kvmap, s_send_single, &routing);
  123. // 发送终止消息,以及消息编号
  124. zclock_log ("I: 正在发送快照,版本号:%d", (int) self->sequence);
  125. zframe_send (&identity, snapshot, ZFRAME_MORE);
  126. kvmsg_t *kvmsg = kvmsg_new (self->sequence);
  127. kvmsg_set_key (kvmsg, "KTHXBAI");
  128. kvmsg_set_body (kvmsg, (byte *) subtree, 0);
  129. kvmsg_send (kvmsg, snapshot);
  130. kvmsg_destroy (&kvmsg);
  131. free (subtree);
  132. }
  133. }
  134. return 0;
  135. }
  136. // 每次发送一个快照键值对
  137. static int
  138. s_send_single (char *key, void *data, void *args)
  139. {
  140. kvroute_t *kvroute = (kvroute_t *) args;
  141. kvmsg_t *kvmsg = (kvmsg_t *) data;
  142. if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
  143. && memcmp (kvroute->subtree,
  144. kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
  145. // 先发送接收方的地址
  146. zframe_send (&kvroute->identity,
  147. kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
  148. kvmsg_send (kvmsg, kvroute->socket);
  149. }
  150. return 0;
  151. }
  152. // ---------------------------------------------------------------------
  153. // 从客户端收集更新事件
  154. // 如果我们是master,则将该事件写入kvmap对象;
  155. // 如果我们是slave,则将其写入延迟队列
  156. static int s_was_pending (clonesrv_t *self, kvmsg_t *kvmsg);
  157. static int
  158. s_collector (zloop_t *loop, void *collector, void *args)
  159. {
  160. clonesrv_t *self = (clonesrv_t *) args;
  161. kvmsg_t *kvmsg = kvmsg_recv (collector);
  162. kvmsg_dump (kvmsg);
  163. if (kvmsg) {
  164. if (self->master) {
  165. kvmsg_set_sequence (kvmsg, ++self->sequence);
  166. kvmsg_send (kvmsg, self->publisher);
  167. int ttl = atoi (kvmsg_get_prop (kvmsg, "ttl"));
  168. if (ttl)
  169. kvmsg_set_prop (kvmsg, "ttl",
  170. "%" PRId64, zclock_time () + ttl * 1000);
  171. kvmsg_store (&kvmsg, self->kvmap);
  172. zclock_log ("I: 正在发布更新事件:%d", (int) self->sequence);
  173. }
  174. else {
  175. // 如果我们已经从master中获得了该事件,则丢弃该消息
  176. if (s_was_pending (self, kvmsg))
  177. kvmsg_destroy (&kvmsg);
  178. else
  179. zlist_append (self->pending, kvmsg);
  180. }
  181. }
  182. return 0;
  183. }
  184. // 如果消息已在延迟队列中,则删除它并返回TRUE
  185. static int
  186. s_was_pending (clonesrv_t *self, kvmsg_t *kvmsg)
  187. {
  188. kvmsg_t *held = (kvmsg_t *) zlist_first (self->pending);
  189. while (held) {
  190. if (memcmp (kvmsg_uuid (kvmsg),
  191. kvmsg_uuid (held), sizeof (uuid_t)) == 0) {
  192. zlist_remove (self->pending, held);
  193. return TRUE;
  194. }
  195. held = (kvmsg_t *) zlist_next (self->pending);
  196. }
  197. return FALSE;
  198. }
  199. // ---------------------------------------------------------------------
  200. // 删除带有过期时间的瞬间值
  201. static int s_flush_single (char *key, void *data, void *args);
  202. static int
  203. s_flush_ttl (zloop_t *loop, void *unused, void *args)
  204. {
  205. clonesrv_t *self = (clonesrv_t *) args;
  206. zhash_foreach (self->kvmap, s_flush_single, args);
  207. return 0;
  208. }
  209. // 如果键值对过期,则进行删除操作,并广播该事件
  210. static int
  211. s_flush_single (char *key, void *data, void *args)
  212. {
  213. clonesrv_t *self = (clonesrv_t *) args;
  214. kvmsg_t *kvmsg = (kvmsg_t *) data;
  215. int64_t ttl;
  216. sscanf (kvmsg_get_prop (kvmsg, "ttl"), "%" PRId64, &ttl);
  217. if (ttl && zclock_time () >= ttl) {
  218. kvmsg_set_sequence (kvmsg, ++self->sequence);
  219. kvmsg_set_body (kvmsg, (byte *) "", 0);
  220. kvmsg_send (kvmsg, self->publisher);
  221. kvmsg_store (&kvmsg, self->kvmap);
  222. zclock_log ("I: 正在发布删除事件:%d", (int) self->sequence);
  223. }
  224. return 0;
  225. }
  226. // ---------------------------------------------------------------------
  227. // 发送心跳
  228. static int
  229. s_send_hugz (zloop_t *loop, void *unused, void *args)
  230. {
  231. clonesrv_t *self = (clonesrv_t *) args;
  232. kvmsg_t *kvmsg = kvmsg_new (self->sequence);
  233. kvmsg_set_key (kvmsg, "HUGZ");
  234. kvmsg_set_body (kvmsg, (byte *) "", 0);
  235. kvmsg_send (kvmsg, self->publisher);
  236. kvmsg_destroy (&kvmsg);
  237. return 0;
  238. }
  239. // ---------------------------------------------------------------------
  240. // 状态改变事件处理函数
  241. // 我们将转变为master
  242. //
  243. // 备机先将延迟列表中的事件更新到自己的快照中,
  244. // 并开始接收客户端发来的快照请求。
  245. static int
  246. s_new_master (zloop_t *loop, void *unused, void *args)
  247. {
  248. clonesrv_t *self = (clonesrv_t *) args;
  249. self->master = TRUE;
  250. self->slave = FALSE;
  251. zloop_cancel (bstar_zloop (self->bstar), self->subscriber);
  252. // 应用延迟列表中的事件
  253. while (zlist_size (self->pending)) {
  254. kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop (self->pending);
  255. kvmsg_set_sequence (kvmsg, ++self->sequence);
  256. kvmsg_send (kvmsg, self->publisher);
  257. kvmsg_store (&kvmsg, self->kvmap);
  258. zclock_log ("I: 正在发布延迟列表中的更新事件:%d", (int) self->sequence);
  259. }
  260. return 0;
  261. }
  262. // ---------------------------------------------------------------------
  263. // 正在切换为slave
  264. static int
  265. s_new_slave (zloop_t *loop, void *unused, void *args)
  266. {
  267. clonesrv_t *self = (clonesrv_t *) args;
  268. zhash_destroy (&self->kvmap);
  269. self->master = FALSE;
  270. self->slave = TRUE;
  271. zloop_reader (bstar_zloop (self->bstar), self->subscriber,
  272. s_subscriber, self);
  273. return 0;
  274. }
  275. // ---------------------------------------------------------------------
  276. // 从同伴主机(master)接收更新事件;
  277. // 接收该类更新事件时,我们一定是slave。
  278. static int
  279. s_subscriber (zloop_t *loop, void *subscriber, void *args)
  280. {
  281. clonesrv_t *self = (clonesrv_t *) args;
  282. // 获取快照,如果需要的话。
  283. if (self->kvmap == NULL) {
  284. self->kvmap = zhash_new ();
  285. void *snapshot = zsocket_new (self->ctx, ZMQ_DEALER);
  286. zsocket_connect (snapshot, "tcp://localhost:%d", self->peer);
  287. zclock_log ("I: 正在请求快照:tcp://localhost:%d",
  288. self->peer);
  289. zstr_send (snapshot, "ICANHAZ?");
  290. while (TRUE) {
  291. kvmsg_t *kvmsg = kvmsg_recv (snapshot);
  292. if (!kvmsg)
  293. break; // 中断
  294. if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
  295. self->sequence = kvmsg_sequence (kvmsg);
  296. kvmsg_destroy (&kvmsg);
  297. break; // 完成
  298. }
  299. kvmsg_store (&kvmsg, self->kvmap);
  300. }
  301. zclock_log ("I: 收到快照,版本号:%d", (int) self->sequence);
  302. zsocket_destroy (self->ctx, snapshot);
  303. }
  304. // 查找并删除
  305. kvmsg_t *kvmsg = kvmsg_recv (subscriber);
  306. if (!kvmsg)
  307. return 0;
  308. if (strneq (kvmsg_key (kvmsg), "HUGZ")) {
  309. if (!s_was_pending (self, kvmsg)) {
  310. // 如果master的更新事件比客户端的事件早到,则将master的事件存入延迟列表,
  311. // 当收到客户端更新事件时会将其从列表中清除。
  312. zlist_append (self->pending, kvmsg_dup (kvmsg));
  313. }
  314. // 如果更新事件比kvmap版本高,则应用它
  315. if (kvmsg_sequence (kvmsg) > self->sequence) {
  316. self->sequence = kvmsg_sequence (kvmsg);
  317. kvmsg_store (&kvmsg, self->kvmap);
  318. zclock_log ("I: 收到更新事件:%d", (int) self->sequence);
  319. }
  320. else
  321. kvmsg_destroy (&kvmsg);
  322. }
  323. else
  324. kvmsg_destroy (&kvmsg);
  325. return 0;
  326. }

这段程序只有几百行,但还是花了一些时间来进行调通的。这个模型中包含了故障恢复,瞬间值,子树等等。虽然我们前期设计得很完备,但要在多个套接字之间进行调试还是很困难的。以下是我的工作方式:

  • 由于使用了反应堆(bstar,建立在zloop之上),我们节省了大量的代码,让程序变得简洁明了。整个服务以一个线程运行,因此不会出现跨线程的问题。只需将结构指针(self)传递给所有的处理器即可。此外,使用发应堆后可以让代码更为模块化,易于重用。

  • 我们逐个模块进行调试,只有某个模块能够正常运行时才会进入下一步。由于使用了四五个套接字,因此调试的工作量是很大的。我直接将调试信息输出到了屏幕上,因为实在没有必要专门开一个调试器来工作。

  • 因为一直在使用valgrind工具进行测试,因此我能确定程序没有内存泄漏的问题。在C语言中,内存泄漏是我们非常关心的问题,因为没有什么垃圾回收机制可以帮你完成。正确地使用像kvmsg、czmq之类的抽象层可以很好地避免内存泄漏。

这段程序肯定还会存在一些BUG,部分读者可能会帮助我调试和修复,我在此表示感谢。

测试模型6时,先开启主机和备机,再打开一组客户端,顺序随意。随机地中止某个服务进程,如果程序设计得是正确的,那客户端获得的数据应该都是一致的。

克隆模式协议

花费了那么多精力来开发一套可靠的发布-订阅模式机制,我们当然希望将来能够方便地在其基础之上进行扩展。较好的方法是将其编写为一个协议,这样就能让各种语言来实现它了。

我们将其称为“集群化哈希表协议”,这是一个能够跨集群地进行键值哈希表管理,提供了多客户端的通信机制;客户端可以只操作一个子树的数据,包括更新和定义瞬间值。