关于热备份的基本概念和使用可以参照 wiki:跨机房同步,这里将主要描述跨机房同步的设计方案和执行细节。

背景

小米内部有些业务对服务可用性有较高要求,但又不堪每年数次机房故障的烦恼,于是向 pegasus 团队寻求帮助,希望在机房故障时,服务能够切换流量至备用机房而数据不致丢失。因为成本所限,在小米内部以双机房为主。

通常解决该问题有几种思路:

  • 由 client 将数据同步写至两机房。这种方法较为低效,容易受跨机房专线带宽影响,并且延时高,同机房 1ms 内的写延时在跨机房下通常会放大到几十毫秒,优点是一致性强,但需要 client 实现。服务端的复杂度小,客户端的复杂度大。

  • 使用 raft/paxos 协议进行 quorum write 实现机房间同步。这种做法需要至少 3 副本分别在 3 机房部署,延时较高但提供强一致性,因为要考虑跨集群的元信息管理,这是实现难度最大的一种方案。

  • 在两机房下分别部署两个 pegasus 集群,集群间进行异步复制。机房 A 的数据可能会在 1 分钟后复制到机房 B,但 client 对此无感知,只感知机房 A。在机房 A 故障时,用户可以选择写机房 B。这种方案适合 最终一致性/弱一致性 要求的场景。后面会讲解我们如何实现 “最终一致性”。

基于实际业务需求考虑,我们选择方案3。

  1. +-------+ +-------+
  2. | +---+ | | +---+ |
  3. | | P +--------> S | |
  4. | +-+-+ | | +---+ |
  5. | | | | |
  6. | +-v-+ | | |
  7. | | S | | | |
  8. | +---+ | | |
  9. +-------+ +-------+
  10. dead alive
  11. 只用两机房,使用 raft 协议进行进行跨机房同步依然无法避免机房故障时的停服。(5节点同理)
  1. +---+ +---+
  2. | A | | B |
  3. +-+-+ +-+-+
  4. | |
  5. +--------------------------------------------------+
  6. | +------v-------+ +------v-------+ |
  7. | | pegasus A <----------> pegasus B | |
  8. | +--------------+ +--------------+ |
  9. +--------------------------------------------------+
  10. 虽然是各写一个机房,但理想情况下 A B 都能读到所有的数据。

架构选择

即使同样是做方案 3 的集群间异步同步,业内的做法也有不同:

  • 各集群单副本:这种方案考虑到多集群已存在冗余的情况下,可以减少单集群内的副本数,同时既然一致性已没有保证,大可以索性脱离一致性协议,完全依赖于稳定的集群间网络,保证即使单机房宕机,损失的数据量也是仅仅几十毫秒内的请求量级。考虑机房数为 5 的时候,如果每个机房都是 3 副本,那么全量数据就是 3*5=15 副本,这时候简化为各集群单副本的方案就是几乎最自然的选择。

  • 同步工具作为外部依赖使用:工具自然是尽可能不影响服务是最好,所以同步工具可以作为外部依赖部署,单纯到节点上读日志(WAL),发日志。这个方案对日志 GC 有前提条件,即日志不可以在同步完成前被删除,否则就丢数据了,但存储服务日志的 GC 是外部工具难以控制的。所以可以把日志强行保留一周以上,但缺点是磁盘空间的成本较大。同步工具作为外部依赖的优点在于稳定性强,不影响服务,缺点在于对服务的控制能力差,很难处理一些琐碎的一致性问题(后面会讲到),难以实现最终一致性

  • 同步工具嵌入到服务内部:这种做法在工具稳定前会有一段阵痛期,即工具的稳定性影响服务的稳定性。但实现的灵活性肯定是最强的。

最初 Pegasus 的热备份方案借鉴于 HBase Replication,基本只考虑了第三种方案。而事实证明这种方案更容易保证 Pegasus 存储数据不丢的属性。

基本概念

  • duplicate_rpc
  • cluster id
  • confirmed_decree
  1. +----------+ +----------+
  2. | +------+ | | +------+ |
  3. | | app1 +---------> app1 | |
  4. | +------+ | | +------+ |
  5. | | | |
  6. | cluster1 | | cluster2 |
  7. +----------+ +----------+

pegasus 的热备份以表为粒度。支持单向和双向的复制。为了运维方便,两集群表名必须一致。为了可扩展性和易用性,两集群 partition count 可不同。

  1. +------------+
  2. | +--------+ |
  3. +------>replica1| |
  4. | | +--------+ |
  5. +------------+ | | |
  6. | +--------+ | | | +--------+ |
  7. | |replica1| | +------>replica2| |
  8. | +--------+ | | | +--------+ |
  9. | +-----------------> | |
  10. | +--------+ | | | +--------+ |
  11. | |replica2| | +------>replica3| |
  12. | +--------+ | | | +--------+ |
  13. +------------+ | | |
  14. | | +--------+ |
  15. +------>replica4| |
  16. cluster A | +--------+ |
  17. +------------+
  18. cluster B

如上图所示,每个 replica (这里特指每个分片的 primary,注意 secondary 不负责热备份复制)独自复制自己的 private log 到远端,replica 之间互不影响。复制直接通过 pegasus client 来完成。每一条写入 A 的记录(如 set / multiset)都会通过 pegasus client 回放到 B。为了将热备份的写与常规写区别开,我们这里定义 duplicate_rpc 表示热备写。

A->B 的热备写,B 也同样会经由三副本的 PacificA 协议提交,并且写入 private log 中。这里有一个问题是,在 A,B 互相同步的场景,一份写操作将形成循环:A->B->A,同样的写会无数次地被重放。为了避免循环写,我们引入 cluster id 的概念,每条 duplicate_rpc 都会标记发送者的 cluster id。

  1. [duplication-group]
  2. A=1
  3. B=2

所以当 B 重放某条 duplicate_rpc 时,发现其 cluster_id = 1,则不会将它发往 A。

热备份同时也需要容忍在 replica 主备切换下复制的进度不会丢失,例如当前 replica1 复制到日志 decree=5001,此时发生主备切换,我们不想看到 replica1 从 0 开始,所以为了能够支持 断点续传,我们引入 confirmed_decree。replica 定期向 meta 汇报当前进度(如 decree=5001),一旦 meta 将该进度持久化至 zookeeper,当 replica 恢复时即可安全地从 confirmed_decree=5001 开始热备份复制。

流程

热备份相关的元信息首先会记录至 meta server 上,replica server 通过 duplication sync 定期同步元信息,包括各个分片的 confirmed_decree。

  1. +----------+ add dup +----------+
  2. | client +-----------> meta |
  3. +----------+ +----+-----+
  4. |
  5. | duplication sync
  6. |
  7. +-----v-----+
  8. | replica |
  9. +-----------+

每个 replica 首先读取 private log,为了限制流量,每次只会读入一个日志块而非一整个日志文件。每一批日志统一传递给 mutation_duplicator 进行发送。mutation_duplicator 是一个接口类,目前只实现用 pegasus client 将日志分发至目标集群,未来如有需求也可接入 HBase 等系统。

  1. +----------------------+ 2
  2. | private_log_loader +--------------+
  3. +-----------^----------+ |
  4. | 1 +----------v----------+
  5. +----------+------+ | mutation_duplicator |
  6. | | +----=----------------+
  7. | | |
  8. | private log | |
  9. | | +------=----------------------+ pegasus client
  10. | | | pegasus_mutation_duplicator +----------------->
  11. +-----------------+ +-----------------------------+ 3

每个日志块的一批写中可能有多组 hashkey,不同的 hashkey 可以并行分发而不会影响正确性,从而可以提高热备份效率。而如果 hashkey 相同,例如 set<hashkey="h", sortkey="s1", value="v1"> 与 set<hashkey="h", sortkey="s2", value="v2"> 有先后关系,则它们必须串行依次发送。

当前我们的策略是每一个日志块的所有写发完毕后,再重复读日志块,发日志的过程。往后可能再做优化。

日志完整性

在引入热备份之前,Pegasus 的日志会定期被清理,无用的日志文件会被删除。如果有被删除的日志还没有被复制到远端集群,两集群就会数据不一致。为了避免这一问题,我们引入了几个机制来保证日志的完整性,从而实现两集群的最终一致性。

  • delay gcPegasus 认为 last_durable_decree 之后的日志即可被删除(Garbage Collected),因为它们已经被持久化至 rocksdb 的 sst files 中,即使宕机重启数据也不会丢失。但考虑如果热备份的进度较慢,我们则需要延迟 GC,保证数据只有在 confirmed_decree 之后的日志才可被 GC。

  • init info另一种导致日志丢失的情况是在 replica 重启时,原先默认 last_durable_decree 之前的日志可以被跳过。我们需要保证 confirmed_decree 后的日志不会在重启时被忽略,如下图,我们要保证 log.2 不会被跳过。

  1. +-------+rocksdb+-------+
  2. | last_committed_decree +-----+
  3. | +--------------+ + | 3 | last_durable_decree
  4. | | memtable | | +-----+
  5. | +--------------+ | | 2 |
  6. | + +-----+
  7. | +-----------------+ last_durable_decree | 1 | confirmed_decree
  8. | | sst files | + +-----+
  9. | +-----------------+ |
  10. +-----------------------+ private logs

然而在重启时我们并不知晓是否正在进行热备份,也不知道 confirmed_decree 具体值,因为此时我们还未进行 duplication_sync。所以目前的做法是在 init_info 存储 duplicating=true 这一项,表明正在进行热备份,则保证不会跳过任一日志。

  • group check虽然 primary 不会 GC 那些未被热备的日志,但 secondary 并未遵守这一约定,这些丢失日志的 secondary 有朝一日也会被提拔为 primary,从而影响日志完整性。所以 primary 需要将 confirmed_decree 通过组间心跳(group check)的方式通知 secondary,保证它们不会误删日志。
  1. +---------+ +-----------+
  2. | | | |
  3. | primary +----------->+ secondary |
  4. | | | |
  5. +---+-----+ +-----------+
  6. | confirmed=5001
  7. | +-----------+
  8. | | |
  9. +----------------->+ secondary |
  10. | |
  11. group check +-----------+
  • replica learn当一个 replica 新加入3副本组中,由于它的数据滞后于 primary,它会通过 replica learn 来拷贝新日志,跟上组员的进度。此时从何处开始拷贝日志(称为 learn_start_decree)就是一个问题:
  1. +-------+rocksdb+-------+ +----+
  2. | last_committed_decree=51 | 51 |
  3. | +-----------------+ + +----+
  4. | | sst files | | +----+
  5. | +-----------------+ | | 50 |
  6. | | +----+
  7. +-----------------------+
  8. private logs
  9. learner's state, confirmed_decree=20
  10. primary's committed_decree = 62

显然,primary 不光需要让 learner 跟上进度,拷贝 [52, 62] 的日志,也需要补齐那些未热备的日志。所以 primary 需要完整拷贝 [21-62] 的日志。

  • apply learned state原先 learner 收到 [21-60] 之间的日志后首先会放入 learn/ 目录下,然后简单地重放每一条日志并写入 rocksdb,并不会写入日志中。为了保证日志完整性,我们会将 learn/ 目录 rename 至 plog 目录,替代之前所有的日志。
  1. +----+
  2. | 60 |
  3. +----+
  4. | 59 |
  5. +----+
  6. +----+
  7. +----+ |....| +----+
  8. | 51 | +----+ | 62 |
  9. +----+ +----+ +----+
  10. | 50 | | 21 | | 61 |
  11. +----+ +----+ +----+
  12. +-----------+ +---------+--------+
  13. | plog/ | | learn/ | cache |
  14. +-----------+ +---------+--------+

在 learn 的过程中,还可能有部分日志不是以文件的形式复制到 learner,而是以内存形式拷贝到 "cache" 中(我们也将此称为 "learn cache"),如上图的 [61,62]。原先这些日志只会在写入 rocksdb 后被丢弃,现在它们还需要被写至 private log 中。

最终在这样一轮 learn 完成后,我们得到的日志集如下:

  1. +----+
  2. | 62 |
  3. +----+
  4. | 61 |
  5. +----+
  6. | 60 |
  7. +-----------+ +----+
  8. | plog/ | | 59 |
  9. +-----------+ +----+
  10. +----+
  11. |....|
  12. +----+
  13. +----+
  14. | 21 |
  15. +----+