目录

ZooKeeper监听器介绍

概述

最近频繁需要利用 ZK 来做工具,这里顺便整理一下最近学到的东西。需要知道,如果要使用 Zookeeper,Curator 库是个好东西。在使用原生的 ZooKeeper 的时候,是可以使用 Watcher 对节点进行监听的,但是唯一不方便的是一个 Watcher 只能生效一次,也就是说每次进行监听回调之后我们需要自己重新的设置监听才能达到永久监听的效果。而 Curator 则在这方面进行了封装,使得我们不需要手动的重复监听了。本文主要讲监听器的部分。

1
2
3
4
.
├── NodeCacheListener.java
├── PathChildrenCacheListener.java
└── TreeCacheListener.java

Curator 实现了三个监听器,作用分别是:

  1. NodeCache: 用来监控一个 znode, 当节点的数据修改或者删除时,Node Cache 能更新它的状态包含最新的改变
  2. PathChildrenCache: 用来监控一个 znode 的子节点。当一个子节点增加、更新、删除时, Path Cache会改变它的状态,会包含最新的子节点,子节点的数据和状态。(此处需要注意,他只会监听一级子节点,不会循环监听子节点下面的 child)
  3. TreeCache: 可以用来监控节点的状态,还监控节点的子节点的状态,类似上面两种 cache 的组合。这也就是 Tree 的概念。它监控整个树中节点的状态。(只要是所监听的路径下的所有叶子节点都会监听)

需要注意,早期的 Curator 是没有提供 TreeCache 相关的 API 的,也就是说如果需要监听这种树状结构的话,需要自行实现,Spark 的依赖 Jar 包中就有用到没有实现 TreeCache 的 2.6.0 版本。运行以下代码之前,肯定需要在本地开启一个 ZK 的服务,可以是单点的,也就是本地。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/**
 * @author runzhliu
 */
public class CuratorPathChildrenListenerExample {

    /**
     * zookeeper地址
     */
    private static final String CONNECT_ADDR = "localhost:2181";
    /**
     * session超时时间
     */
    private static final int SESSION_OUTTIME = 5000;

    public static void main(String[] args) throws Exception {

        // 1 重试策略:初试时间为1s 重试10次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);

        // 2 通过工厂创建连接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_ADDR)
                .sessionTimeoutMs(SESSION_OUTTIME)
                .retryPolicy(retryPolicy)
                .build();

        // 3 建立连接
        cf.start();

        // 4 建立一个PathChildrenCache缓存,第三个参数为是否接受节点数据内容 如果为false则不接受
        PathChildrenCache cache = new PathChildrenCache(cf, "/super", true);

        //5 在初始化的时候就进行缓存监听
        cache.start(StartMode.POST_INITIALIZED_EVENT);
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            /**
             * 方法名称:监听子节点变更,包括新建、修改、删除
             */
            @Override
            public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println("新增子节点:" + event.getData().getPath());
                        break;
                    case CHILD_UPDATED:
                        System.out.println("修改子节点:" + event.getData().getPath());
                        break;
                    case CHILD_REMOVED:
                        System.out.println("删除子节点:" + event.getData().getPath());
                        break;
                    default:
                        break;
                }
            }
        });

        // 创建本身节点不发生变化,也没有日志输出
        cf.create().forPath("/super", "init".getBytes());

        // 添加子节点,这时候会触发监听器
        Thread.sleep(1000);
        cf.create().forPath("/super/c1", "c1内容".getBytes());
        Thread.sleep(1000);
        cf.create().forPath("/super/c2", "c2内容".getBytes());

        // 修改子节点,这时候会触发监听器
        Thread.sleep(1000);
        cf.setData().forPath("/super/c1", "c1更新内容".getBytes());

        // 删除子节,这时候会触发监听器
        Thread.sleep(1000);
        cf.delete().forPath("/super/c2");

        // 删除本身节点,是不会触发监听器的
        Thread.sleep(1000);
        cf.delete().deletingChildrenIfNeeded().forPath("/super");

        Thread.sleep(Integer.MAX_VALUE);
    }
}

一个 TreeCache Listener 的例子。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
object TreeCacheExample extends Logging {
  /**
   * zookeeper地址
   */
  private val CONNECT_ADDR = "localhost:2181"
  /**
   * session超时时间
   */
  private val SESSION_OUTTIME = 5000

  def main(args: Array[String]): Unit = {

    val _client = CuratorFrameworkFactory.newClient(
      CONNECT_ADDR,
      new RetryNTimes(10, SESSION_OUTTIME)
    )
    _client.start()

    Thread.sleep(3000)

    val treeCache = new TreeCache(_client, "/config-service")

    val listener = new TreeCacheListener {
      override def childEvent(client: CuratorFramework, event: TreeCacheEvent): Unit = {
        val data = event.getData
        if (data != null) {
          logger.info("get path: [ " + data.getPath + s" ], and event type: [ ${event.getType} ].")
          event.getType match {
            // scalastyle:off println
            case Type.NODE_ADDED => logger.info(s"add node for: ${data.getPath}")
            case Type.NODE_REMOVED => logger.info(s"remove node for: ${data.getPath}")
            case Type.NODE_UPDATED => logger.info(s"update node for: ${data.getPath}")
            // never happen!
            case Type.INITIALIZED => logger.info(s"initialized")
            case Type.CONNECTION_RECONNECTED => logger.info("reconnect")
            // sync config
            case Type.CONNECTION_SUSPENDED =>
            case Type.CONNECTION_LOST =>
              logger.error("lost zookeeper connection.")
          }
        } else {
          logger.warn(s"got an null data child event. envet type: [ ${event.getType} ].")
        }
      }
    }

    treeCache.start()
    treeCache.getListenable.addListener(listener)

    Thread.sleep(Integer.MAX_VALUE)
  }
}

参考资料

  1. https://blog.csdn.net/Leafage_M/article/details/78735485
  2. http://colobu.com/2014/12/15/zookeeper-recipes-by-example-5/
警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。