Counter 例子详解

本文档主要介绍一个基于 jraft 的分布式计数器的例子。

场景

在多个节点(机器)组成的一个 raft group 中保存一个分布式计数器,该计数器可以递增和获取,并且在所有节点之间保持一致,任何少数节点宕机都不会影响对外提供的两个服务:

  • incrementAndGet(delta) 递增 delta 数值并返回递增后的值。
  • get() 获取最新的值

RPC 请求

jraft 底层使用 bolt 作为通信框架,这里定义两个请求:

  • IncrementAndGetRequest,用于递增:
  /**
   * Request asking the leader to add {@code delta} to the counter and return
   * the updated value.
   */
  public class IncrementAndGetRequest implements Serializable {

      private static final long serialVersionUID = -5623664785560971849L;

      /** Amount to add to the counter. */
      private long delta;

      /** Returns the amount to add to the counter. */
      public long getDelta() {
          return delta;
      }

      /** Sets the amount to add to the counter. */
      public void setDelta(final long delta) {
          this.delta = delta;
      }
  }
  • GetValueRequest,用于获取最新值:
  /**
   * Request for the latest counter value. It carries no payload; its type alone
   * identifies the operation.
   */
  public class GetValueRequest implements Serializable {

      private static final long serialVersionUID = 9218253805003988802L;

      /** Creates an empty request. */
      public GetValueRequest() {
          // nothing to initialize
      }
  }

应答结果 ValueResponse,包括:

  • success 是否成功
  • value 成功情况下返回的最新值
  • errorMsg 失败情况下的错误信息
  • redirect 当前节点不是 leader 时(例如发生了重新选举),客户端需要跳转到的 leader 节点地址。
  /**
   * Response returned by the counter server.
   *
   * <p>{@code value} is meaningful only when {@code success} is true. When the
   * request reached a node that is not the leader, {@code redirect} holds the
   * current leader id if one is known. On failure, {@code errorMsg} explains why.
   */
  public class ValueResponse implements Serializable {

      private static final long serialVersionUID = -4220017686727146773L;

      /** Latest counter value; valid when {@link #isSuccess()} is true. */
      private long value;
      /** Whether the request succeeded. */
      private boolean success;
      /** Redirect peer id: the leader the client should retry against. */
      private String redirect;
      /** Error message describing a failure. */
      private String errorMsg;

      public String getErrorMsg() {
          return this.errorMsg;
      }

      public void setErrorMsg(String errorMsg) {
          this.errorMsg = errorMsg;
      }

      public long getValue() {
          return this.value;
      }

      public void setValue(long value) {
          this.value = value;
      }

      public boolean isSuccess() {
          return this.success;
      }

      public void setSuccess(boolean success) {
          this.success = success;
      }

      public String getRedirect() {
          return this.redirect;
      }

      public void setRedirect(String redirect) {
          this.redirect = redirect;
      }

      /** Human-readable form, used when the client prints a response. */
      @Override
      public String toString() {
          return "ValueResponse [value=" + this.value + ", success=" + this.success + ", redirect="
                 + this.redirect + ", errorMsg=" + this.errorMsg + "]";
      }
  }

IncrementAndAddClosure 用于 Leader 服务端接收IncrementAndGetRequest 请求后的回调处理:

  1. public class IncrementAndAddClosure implements Closure {
  2. private CounterServer counterServer;
  3. private IncrementAndGetRequest request;
  4. private ValueResponse response;
  5. private Closure done; // 网络应答callback
  6.  
  7. public IncrementAndAddClosure(CounterServer counterServer, IncrementAndGetRequest request, ValueResponse response,
  8. Closure done) {
  9. super();
  10. this.counterServer = counterServer;
  11. this.request = request;
  12. this.response = response;
  13. this.done = done;
  14. }
  15.  
  16. @Override
  17. public void run(Status status) {
  18. // 返回应答给客户端
  19. if (this.done != null) {
  20. done.run(status);
  21. }
  22. }
  23.  
  24. public IncrementAndGetRequest getRequest() {
  25. return this.request;
  26. }
  27.  
  28. public void setRequest(IncrementAndGetRequest request) {
  29. this.request = request;
  30. }
  31.  
  32. public ValueResponse getResponse() {
  33. return this.response;
  34. }
  35.  
  36. }

服务端

状态机 CounterStateMachine

状态机首先持有计数器的当前值,初始为 0:

  1. public class CounterStateMachine extends StateMachineAdapter {
  2. /**
  3. * counter value
  4. */
  5. private AtomicLong value = new AtomicLong(0);

实现核心的 onApply(iterator) 方法,应用用户提交的请求到状态机:

  1. @Override
  2. public void onApply(Iterator iter) {
  3. // 遍历日志
  4. while (iter.hasNext()) {
  5. long delta = 0;
  6.  
  7. IncrementAndAddClosure closure = null;
  8. // done 回调不为null,必须在应用日志后调用,如果不为 null,说明当前是leader。
  9. if (iter.done() != null) {
  10. // 当前是leader,可以直接从 IncrementAndAddClosure 中获取 delta,避免反序列化
  11. closure = (IncrementAndAddClosure) iter.done();
  12. delta = closure.getRequest().getDelta();
  13. } else {
  14. // 其他节点应用此日志,需要反序列化 IncrementAndGetRequest,获取 delta
  15. ByteBuffer data = iter.getData();
  16. try {
  17. IncrementAndGetRequest request = Codecs.getSerializer(Codecs.Hessian2).decode(data.array(),
  18. IncrementAndGetRequest.class.getName());
  19. delta = request.getDelta();
  20. } catch (CodecException e) {
  21. LOG.error("Fail to decode IncrementAndGetRequest", e);
  22. }
  23. }
  24. long prev = this.value.get();
  25. // 更新状态机
  26. long updated = value.addAndGet(delta);
  27. // 更新后,确保调用 done,返回应答给客户端。
  28. if (closure != null) {
  29. closure.getResponse().setValue(updated);
  30. closure.getResponse().setSuccess(true);
  31. closure.run(Status.OK());
  32. }
  33. LOG.info("Added value={} by delta={} at logIndex={}", prev, delta, iter.getIndex());
  34. iter.next();
  35. }
  36. }

CounterServer

启动一个 raft 节点(node),提供分布式计数器服务,内部使用 jraft 提供的 RaftGroupService 服务框架:

  1. public class CounterServer {
  2. // jraft 服务端服务框架
  3. private RaftGroupService raftGroupService;
  4. // raft 节点
  5. private Node node;
  6. // 业务状态机
  7. private CounterStateMachine fsm;
  8.  
  9. public CounterServer(String dataPath, String groupId, PeerId serverId, NodeOptions nodeOptions) throws IOException {
  10. // 初始化路径
  11. FileUtils.forceMkdir(new File(dataPath));
  12. // 初始化全局定时器
  13. TimerManager.init(50);
  14.  
  15. // 这里让 raft RPC 和业务 RPC 使用同一个 RPC server, 通常也可以分开.
  16. RpcServer rpcServer = new RpcServer(serverId.getPort());
  17. RaftRpcServerFactory.addRaftRequestProcessors(rpcServer);
  18. // 注册业务处理器
  19. rpcServer.registerUserProcessor(new GetValueRequestProcessor(this));
  20. rpcServer.registerUserProcessor(new IncrementAndGetRequestProcessor(this));
  21. // 初始化状态机
  22. this.fsm = new CounterStateMachine();
  23. // 设置状态机到启动参数
  24. nodeOptions.setFsm(this.fsm);
  25. // 设置存储路径
  26. // 日志, 必须
  27. nodeOptions.setLogUri(dataPath + File.separator + "log");
  28. // 元信息, 必须
  29. nodeOptions.setRaftMetaUri(dataPath + File.separator + "raft_meta");
  30. // snapshot, 可选, 一般都推荐
  31. nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
  32. // 初始化 raft group 服务框架
  33. this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer);
  34. // 启动
  35. this.node = this.raftGroupService.start();
  36. }
  37.  
  38. public CounterStateMachine getFsm() {
  39. return this.fsm;
  40. }
  41.  
  42. public Node getNode() {
  43. return this.node;
  44. }
  45.  
  46. public RaftGroupService RaftGroupService() {
  47. return this.raftGroupService;
  48. }
  49.  
  50. /**
  51. * 生成重定向请求
  52. */
  53. public ValueResponse redirect() {
  54. ValueResponse response = new ValueResponse();
  55. response.setSuccess(false);
  56. if (node != null) {
  57. PeerId leader = node.getLeaderId();
  58. if (leader != null) {
  59. response.setRedirect(leader.toString());
  60. }
  61. }
  62.  
  63. return response;
  64. }
  65.  
  66. public static void main(String[] args) throws IOException {
  67. if (args.length != 4) {
  68. System.out
  69. .println("Useage : java com.alipay.jraft.example.counter.CounterServer {dataPath} {groupId} {serverId} {initConf}");
  70. System.out
  71. .println("Example: java com.alipay.jraft.example.counter.CounterServer /tmp/server1 counter 127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083");
  72. System.exit(1);
  73. }
  74. String dataPath = args[0];
  75. String groupId = args[1];
  76. String serverIdStr = args[2];
  77. String initConfStr = args[3];
  78.  
  79. NodeOptions nodeOptions = new NodeOptions();
  80. // 为了测试, 调整 snapshot 间隔等参数
  81. nodeOptions.setElectionTimeoutMs(5000);
  82. nodeOptions.setDisableCli(false);
  83. nodeOptions.setSnapshotIntervalSecs(30);
  84. // 解析参数
  85. PeerId serverId = new PeerId();
  86. if (!serverId.parse(serverIdStr)) {
  87. throw new IllegalArgumentException("Fail to parse serverId:" + serverIdStr);
  88. }
  89. Configuration initConf = new Configuration();
  90. if (!initConf.parse(initConfStr)) {
  91. throw new IllegalArgumentException("Fail to parse initConf:" + initConfStr);
  92. }
  93. // 设置初始集群配置
  94. nodeOptions.setInitialConf(initConf);
  95.  
  96. // 启动
  97. CounterServer counterServer = new CounterServer(dataPath, groupId, serverId, nodeOptions);
  98. System.out.println("Started counter server at port:"
  99. + counterServer.getNode().getNodeId().getPeerId().getPort());
  100. }
  101. }

启动三个节点的参数类似:

Windows 用户请注意将第一个参数(数据目录)改为 Windows 下的合法路径。

  1. /tmp/server1 counter 127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
  2. /tmp/server2 counter 127.0.0.1:8082 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
  3. /tmp/server3 counter 127.0.0.1:8083 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083

数据目录分别为 server1/server2/server3,raft group 名称为 counter,三个节点的地址分别为

  1. 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083

再来看注册的网络请求处理器,以 IncrementAndGetRequestProcessor 为例,它是一个普通的 bolt processor:

  1. public class IncrementAndGetRequestProcessor extends AsyncUserProcessor<IncrementAndGetRequest> {
  2. private static final Logger LOG = LoggerFactory.getLogger(IncrementAndGetRequestProcessor.class);
  3.  
  4. private CounterServer counterServer;
  5.  
  6. public IncrementAndGetRequestProcessor(CounterServer counterServer) {
  7. super();
  8. this.counterServer = counterServer;
  9. }
  10.  
  11. @Override
  12. public void handleRequest(BizContext bizCtx, AsyncContext asyncCtx, IncrementAndGetRequest request) {
  13.  
  14.      // 非leader,生成跳转请求
  15. if (!counterServer.getFsm().isLeader()) {
  16. asyncCtx.sendResponse(counterServer.redirect());
  17. return;
  18. }
  19.  
  20. // 构建应答回调
  21. ValueResponse response = new ValueResponse();
  22. IncrementAndAddClosure closure = new IncrementAndAddClosure(counterServer, request, response, new Closure() {
  23.  
  24. @Override
  25. public void run(Status status) {
  26. // 提交后处理
  27. if (!status.isOk()) {
  28. // 提交失败,返回错误信息
  29. response.setErrorMsg(status.getErrorMsg());
  30. response.setSuccess(false);
  31. }
  32. // 成功,返回ValueResponse应答
  33. asyncCtx.sendResponse(response);
  34.  
  35. }
  36. });
  37.  
  38. try {
  39. // 构建提交任务
  40. Task task = new Task();
  41. task.setDone(closure); // 设置回调
  42. // 填充数据,将请求用 hessian2序列化到 data 字段
  43. task.setData(ByteBuffer.wrap(Codecs.getSerializer(Codecs.Hessian2).encode(request)));
  44.  
  45. // 提交到 raft group
  46. counterServer.getNode().apply(task);
  47. } catch (CodecException e) {
  48. // 处理序列化异常
  49. LOG.error("Fail to encode IncrementAndGetRequest", e);
  50. ValueResponse responseObject = response;
  51. responseObject.setSuccess(false);
  52. responseObject.setErrorMsg(e.getMessage());
  53. asyncCtx.sendResponse(responseObject);
  54. }
  55. }
  56.  
  57. @Override
  58. public String interest() {
  59. return IncrementAndGetRequest.class.getName();
  60. }
  61.  
  62. }

客户端

客户端 CounterClient 比较简单,主要使用 jraft 提供的 RouteTable 来刷新获取最新的 leader 节点,然后发送请求到 leader节点即可:

  1. public class CounterClient {
  2. public static void main(String[] args) throws Exception {
  3. if (args.length != 2) {
  4. System.out.println("Useage : java com.alipay.jraft.example.counter.CounterClient {groupId} {conf}");
  5. System.out
  6. .println("Example: java com.alipay.jraft.example.counter.CounterClient counter 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083");
  7. System.exit(1);
  8. }
  9. String groupId = args[0];
  10. String confStr = args[1];
  11.  
  12. Configuration conf = new Configuration();
  13. if (!conf.parse(confStr)) {
  14. throw new IllegalArgumentException("Fail to parse conf:" + confStr);
  15. }
  16. // 更新raft group配置
  17. RouteTable.getInstance().updateConfiguration(groupId, conf);

接下来初始化 RPC 客户端并更新路由表:

  1. BoltCliClientService cliClientService = new BoltCliClientService();
  2. cliClientService.init(new CliOptions());
  3.  
  4. if (!RouteTable.getInstance().refreshLeader(cliClientService, groupId, 1000).isOk()) {
  5. throw new IllegalStateException("Refresh leader failed");
  6. }

获取 leader 后发送请求:

  1. PeerId leader = RouteTable.getInstance().selectLeader(groupId);
  2. System.out.println("Leader is " + leader);
  3. int n = 1000;
  4. CountDownLatch latch = new CountDownLatch(n);
  5. long start = System.currentTimeMillis();
  6. for (int i = 0; i < n; i++) {
  7. incrementAndGet(cliClientService, leader, i, latch);
  8. }
  9. latch.await();
  10. System.out.println(n + " ops, cost : " + (System.currentTimeMillis() - start) + " ms.");
  11. System.exit(0);

incrementAndGet 方法的实现比较简单:

  1. private static void incrementAndGet(BoltCliClientService cliClientService, PeerId leader, long delta,
  2. CountDownLatch latch) throws RemotingException, InterruptedException {
  3. // 构建 IncrementAndGetRequest 请求并发送到 leader
  4. IncrementAndGetRequest request = new IncrementAndGetRequest();
  5. request.setDelta(delta);
  6. cliClientService.getRpcClient().invokeWithCallback(leader.getEndpoint().toString(), request,
  7. new InvokeCallback() {
  8.  
  9. @Override
  10. public void onResponse(Object result) {
  11. latch.countDown();
  12. System.out.println("incrementAndGet result:" + result);
  13. }
  14.  
  15. @Override
  16. public void onException(Throwable e) {
  17. e.printStackTrace();
  18. latch.countDown();
  19.  
  20. }
  21.  
  22. @Override
  23. public Executor getExecutor() {
  24. return null;
  25. }
  26. }, 5000);
  27. }

Snapshot 实现

为了避免每次节点重启的时候,重新应用一遍所有的日志,并且避免保存所有的日志,可以使用 snapshot 机制,也就是为状态机做一个 checkpoint,保存当时状态机的状态,删除在此之前的所有日志,核心是实现 StateMachine的两个方法:

  • onSnapshotLoad,启动或者安装 snapshot 后加载 snapshot
  • onSnapshotSave ,定期保存 snapshot
    我们先为 Counter 实现一个 snapshot 数据文件:
  1. public class CounterSnapshotFile {
  2. private static final Logger LOG = LoggerFactory.getLogger(CounterSnapshotFile.class);
  3. private String path;
  4.  
  5. public CounterSnapshotFile(String path) {
  6. super();
  7. this.path = path;
  8. }
  9.  
  10. public String getPath() {
  11. return this.path;
  12. }
  13.  
  14. /**
  15. * Save value to snapshot file.
  16. * @param value
  17. * @return
  18. */
  19. public boolean save(long value) {
  20. try {
  21. FileUtils.writeStringToFile(new File(path), String.valueOf(value));
  22. return true;
  23. } catch (IOException e) {
  24. LOG.error("Fail to save snapshot", e);
  25. return false;
  26. }
  27. }
  28.  
  29. public long load() throws IOException {
  30. String s = FileUtils.readFileToString(new File(path));
  31. if (!StringUtils.isBlank(s)) {
  32. return Long.parseLong(s);
  33. }
  34. throw new IOException("Fail to load snapshot from " + path + ",content: " + s);
  35. }
  36. }

save 将计数值以字符串形式写入构造时指定的 path 文件,load 再从该文件读回。

然后实现 StateMachine的两个方法:

  1. public boolean onSnapshotLoad(SnapshotReader reader) {
  2. // leader不用从 snapshot 加载,他不会接受 snapshot 安装请求
  3. if (isLeader()) {
  4. LOG.warn("Leader is not supposed to load snapshot");
  5. return false;
  6. }
  7. // 未找到数据文件,忽略
  8. if (reader.getFileMeta("data") == null) {
  9. LOG.error("Fail to find data file in {}", reader.getPath());
  10. return false;
  11. }
  12. // 将 snapshot 保存在 reader.getPath()/data 文件里
  13. CounterSnapshotFile snapshot = new CounterSnapshotFile(reader.getPath() + File.separator + "data");
  14. try {
  15. this.value.set(snapshot.load());
  16. return true;
  17. } catch (IOException e) {
  18. LOG.error("Fail to load snapshot from {}", snapshot.getPath());
  19. return false;
  20. }
  21.  
  22. }
  23.  
  24. public void onSnapshotSave(final SnapshotWriter writer, final Closure done) {
  25. // 获取此刻状态机状态
  26. final long currVal = this.value.get();
  27. // 异步保存,避免阻塞状态机
  28. Utils.runInThread(new Runnable() {
  29.  
  30. @Override
  31. public void run() {
  32. CounterSnapshotFile snapshot = new CounterSnapshotFile(writer.getPath() + File.separator + "data");
  33. if (snapshot.save(currVal)) {
  34. if (writer.addFile("data")) {
  35. done.run(Status.OK());
  36. } else {
  37. done.run(new Status(RaftError.EIO, "Fail to add file to writer"));
  38. }
  39. } else {
  40. done.run(new Status(RaftError.EIO, "Fail to save counter snapshot %s", snapshot.getPath()));
  41. }
  42. }
  43. });
  44. }

snapshot 的间隔可以通过 NodeOptions 的 snapshotIntervalSecs 控制,默认一个小时(上面的 CounterServer 示例为了测试将其设置为 30 秒)。