Zookeeper Java 客户端 ——Apache Curator

一、基本依赖

Curator 是 Netflix 公司开源的一个 Zookeeper 客户端,目前由 Apache 进行维护。与 Zookeeper 原生客户端相比,Curator 的抽象层次更高,功能也更加丰富,是目前 Zookeeper 使用范围最广的 Java 客户端。本篇文章主要讲解其基本使用,项目采用 Maven 构建,以单元测试的方法进行讲解,相关依赖如下:

  1. <dependencies>
  2. <!--Curator 相关依赖-->
  3. <dependency>
  4. <groupId>org.apache.curator</groupId>
  5. <artifactId>curator-framework</artifactId>
  6. <version>4.0.0</version>
  7. </dependency>
  8. <dependency>
  9. <groupId>org.apache.curator</groupId>
  10. <artifactId>curator-recipes</artifactId>
  11. <version>4.0.0</version>
  12. </dependency>
  13. <dependency>
  14. <groupId>org.apache.zookeeper</groupId>
  15. <artifactId>zookeeper</artifactId>
  16. <version>3.4.13</version>
  17. </dependency>
  18. <!--单元测试相关依赖-->
  19. <dependency>
  20. <groupId>junit</groupId>
  21. <artifactId>junit</artifactId>
  22. <version>4.12</version>
  23. </dependency>
  24. </dependencies>

完整源码见本仓库: https://github.com/heibaiying/BigData-Notes/tree/master/code/Zookeeper/curator

二、客户端相关操作

2.1 创建客户端实例

这里使用 @Before 在单元测试执行前创建客户端实例,并使用 @After 在单元测试后关闭客户端连接。

  1. public class BasicOperation {
  2. private CuratorFramework client = null;
  3. private static final String zkServerPath = "192.168.0.226:2181";
  4. private static final String nodePath = "/hadoop/yarn";
  5. @Before
  6. public void prepare() {
  7. // 重试策略
  8. RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
  9. client = CuratorFrameworkFactory.builder()
  10. .connectString(zkServerPath)
  11. .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
  12. .namespace("workspace").build(); //指定命名空间后,client 的所有路径操作都会以/workspace 开头
  13. client.start();
  14. }
  15. @After
  16. public void destroy() {
  17. if (client != null) {
  18. client.close();
  19. }
  20. }
  21. }

2.2 重试策略

在连接 Zookeeper 时,Curator 提供了多种重试策略以满足各种需求,所有重试策略均继承自 RetryPolicy 接口,如下图:

Zookeeper Java 客户端 —— Apache Curator - 图1

这些重试策略类主要分为以下两类:

  • RetryForever :代表一直重试,直到连接成功;
  • SleepingRetry : 基于一定间隔时间的重试。这里以其子类 ExponentialBackoffRetry 为例说明,其构造器如下:
  1. /**
  2. * @param baseSleepTimeMs 重试之间等待的初始时间
  3. * @param maxRetries 最大重试次数
  4. * @param maxSleepMs 每次重试间隔的最长睡眠时间(毫秒)
  5. */
  6. ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

2.3 判断服务状态

  1. @Test
  2. public void getStatus() {
  3. CuratorFrameworkState state = client.getState();
  4. System.out.println("服务是否已经启动:" + (state == CuratorFrameworkState.STARTED));
  5. }

三、节点增删改查

3.1 创建节点

  1. @Test
  2. public void createNodes() throws Exception {
  3. byte[] data = "abc".getBytes();
  4. client.create().creatingParentsIfNeeded()
  5. .withMode(CreateMode.PERSISTENT) //节点类型
  6. .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
  7. .forPath(nodePath, data);
  8. }

创建时可以指定节点类型,这里的节点类型和 Zookeeper 原生的一致,全部类型定义在枚举类 CreateMode 中:

  1. public enum CreateMode {
  2. // 永久节点
  3. PERSISTENT (0, false, false),
  4. //永久有序节点
  5. PERSISTENT_SEQUENTIAL (2, false, true),
  6. // 临时节点
  7. EPHEMERAL (1, true, false),
  8. // 临时有序节点
  9. EPHEMERAL_SEQUENTIAL (3, true, true);
  10. ....
  11. }

2.2 获取节点信息

  1. @Test
  2. public void getNode() throws Exception {
  3. Stat stat = new Stat();
  4. byte[] data = client.getData().storingStatIn(stat).forPath(nodePath);
  5. System.out.println("节点数据:" + new String(data));
  6. System.out.println("节点信息:" + stat.toString());
  7. }

如上所示,节点信息被封装在 Stat 类中,其主要属性如下:

  1. public class Stat implements Record {
  2. private long czxid;
  3. private long mzxid;
  4. private long ctime;
  5. private long mtime;
  6. private int version;
  7. private int cversion;
  8. private int aversion;
  9. private long ephemeralOwner;
  10. private int dataLength;
  11. private int numChildren;
  12. private long pzxid;
  13. ...
  14. }

每个属性的含义如下:

状态属性 说明
czxid 数据节点创建时的事务 ID
ctime 数据节点创建时的时间
mzxid 数据节点最后一次更新时的事务 ID
mtime 数据节点最后一次更新时的时间
pzxid 数据节点的子节点最后一次被修改时的事务 ID
cversion 子节点的更改次数
version 节点数据的更改次数
aversion 节点的 ACL 的更改次数
ephemeralOwner 如果节点是临时节点,则表示创建该节点的会话的 SessionID;如果节点是持久节点,则该属性值为 0
dataLength 数据内容的长度
numChildren 数据节点当前的子节点个数

2.3 获取子节点列表

  1. @Test
  2. public void getChildrenNodes() throws Exception {
  3. List<String> childNodes = client.getChildren().forPath("/hadoop");
  4. for (String s : childNodes) {
  5. System.out.println(s);
  6. }
  7. }

2.4 更新节点

更新时可以传入版本号也可以不传入,如果传入则类似于乐观锁机制,只有在版本号正确的时候才会被更新。

  1. @Test
  2. public void updateNode() throws Exception {
  3. byte[] newData = "defg".getBytes();
  4. client.setData().withVersion(0) // 传入版本号,如果版本号错误则拒绝更新操作,并抛出 BadVersion 异常
  5. .forPath(nodePath, newData);
  6. }

2.5 删除节点

  1. @Test
  2. public void deleteNodes() throws Exception {
  3. client.delete()
  4. .guaranteed() // 如果删除失败,那么在会继续执行,直到成功
  5. .deletingChildrenIfNeeded() // 如果有子节点,则递归删除
  6. .withVersion(0) // 传入版本号,如果版本号错误则拒绝删除操作,并抛出 BadVersion 异常
  7. .forPath(nodePath);
  8. }

2.6 判断节点是否存在

  1. @Test
  2. public void existNode() throws Exception {
  3. // 如果节点存在则返回其状态信息如果不存在则为 null
  4. Stat stat = client.checkExists().forPath(nodePath + "aa/bb/cc");
  5. System.out.println("节点是否存在:" + !(stat == null));
  6. }

三、监听事件

3.1 创建一次性监听

和 Zookeeper 原生监听一样,使用 usingWatcher 注册的监听是一次性的,即监听只会触发一次,触发后就销毁。示例如下:

  1. @Test
  2. public void DisposableWatch() throws Exception {
  3. client.getData().usingWatcher(new CuratorWatcher() {
  4. public void process(WatchedEvent event) {
  5. System.out.println("节点" + event.getPath() + "发生了事件:" + event.getType());
  6. }
  7. }).forPath(nodePath);
  8. Thread.sleep(1000 * 1000); //休眠以观察测试效果
  9. }

3.2 创建永久监听

Curator 还提供了创建永久监听的 API,其使用方式如下:

  1. @Test
  2. public void permanentWatch() throws Exception {
  3. // 使用 NodeCache 包装节点,对其注册的监听作用于节点,且是永久性的
  4. NodeCache nodeCache = new NodeCache(client, nodePath);
  5. // 通常设置为 true, 代表创建 nodeCache 时,就去获取对应节点的值并缓存
  6. nodeCache.start(true);
  7. nodeCache.getListenable().addListener(new NodeCacheListener() {
  8. public void nodeChanged() {
  9. ChildData currentData = nodeCache.getCurrentData();
  10. if (currentData != null) {
  11. System.out.println("节点路径:" + currentData.getPath() +
  12. "数据:" + new String(currentData.getData()));
  13. }
  14. }
  15. });
  16. Thread.sleep(1000 * 1000); //休眠以观察测试效果
  17. }

3.3 监听子节点

这里以监听 /hadoop 下所有子节点为例,实现方式如下:

  1. @Test
  2. public void permanentChildrenNodesWatch() throws Exception {
  3. // 第三个参数代表除了节点状态外,是否还缓存节点内容
  4. PathChildrenCache childrenCache = new PathChildrenCache(client, "/hadoop", true);
  5. /*
  6. * StartMode 代表初始化方式:
  7. * NORMAL: 异步初始化
  8. * BUILD_INITIAL_CACHE: 同步初始化
  9. * POST_INITIALIZED_EVENT: 异步并通知,初始化之后会触发 INITIALIZED 事件
  10. */
  11. childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
  12. List<ChildData> childDataList = childrenCache.getCurrentData();
  13. System.out.println("当前数据节点的子节点列表:");
  14. childDataList.forEach(x -> System.out.println(x.getPath()));
  15. childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
  16. public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
  17. switch (event.getType()) {
  18. case INITIALIZED:
  19. System.out.println("childrenCache 初始化完成");
  20. break;
  21. case CHILD_ADDED:
  22. // 需要注意的是: 即使是之前已经存在的子节点,也会触发该监听,因为会把该子节点加入 childrenCache 缓存中
  23. System.out.println("增加子节点:" + event.getData().getPath());
  24. break;
  25. case CHILD_REMOVED:
  26. System.out.println("删除子节点:" + event.getData().getPath());
  27. break;
  28. case CHILD_UPDATED:
  29. System.out.println("被修改的子节点的路径:" + event.getData().getPath());
  30. System.out.println("修改后的数据:" + new String(event.getData().getData()));
  31. break;
  32. }
  33. }
  34. });
  35. Thread.sleep(1000 * 1000); //休眠以观察测试效果
  36. }