ZooKeeper的应用场景
概述
ZooKeeper 是一个典型的发布/订阅模式的分布式数据管理与协调框架。
ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.
ZooKeeper 解决了什么问题?
- 高性能使得 ZooKeeper 能够应用于对系统吞吐有明确要求的大型分布式系统
- 高可用可以解决分布式的单点问题
- 具有严格的顺序访问控制能力,主要是针对写操作的严格顺序性,使得客户端可以基于 ZooKeeper 来实现一些复杂的同步原语
一些基本概念
ZooKeepr 提供基于类似于文件系统的目录节点树方式的数据存储,这是一个共享的内存中的树型结构。有几个概念需要关注一下。
- Session 会话 客户端启动会与服务端建立一个 TCP 长连接,通过这个连接可以发送请求并接受响应,以及接受服务端的 Watcher 事件通知
- Znode 数据节点
/xxxx
就是一个 Znode,会保存自己的数据内容和属性信息,分为持久和临时节点,节点有 SEQUENTIAL 属性 - Version 版本 Stat 数据结构包含 version, cversion, aversion
- Watcher 事件监听器 客户端可以在 Znode 上注册 Watcher,服务端将事件通知已注册的客户端
|
|
典型应用场景和实现
利用 ZooKeeper 可以非常方便构建一系列分布式应用中都会涉及到的核心功能。
- 数据发布/订阅
- 负载均衡
- 命名服务
- 分布式协调/通知
- 集群管理
- Master 选举
- 分布式锁
- 分布式队列
多个开源项目中都应用到了 ZooKeeper,例如 HBase, Spark, Flink, Storm, Kafka, Dubbo 等等。
数据发布/订阅
数据发布/订阅的一个常见的场景是配置中心,发布者把数据发布到 ZooKeeper 的一个或一系列的节点上,供订阅者进行数据订阅,达到动态获取数据的目的。
配置信息一般有几个特点:
- 数据量小的KV
- 数据内容在运行时会发生动态变化
- 集群机器共享,配置一致
ZooKeeper 采用的是推拉结合的方式。
- 推: 服务端会推给注册了监控节点的客户端 Wathcer 事件通知
- 拉: 客户端获得通知后,然后主动到服务端拉取最新的数据
实现的思路可以如下。
|
|
- 把配置信息写到一个 Znode 上,例如
/Configuration
- 客户端启动初始化阶段读取服务端节点的数据,并且注册一个数据变更的 Watcher
- 配置变更只需要对 Znode 数据进行 set 操作,数据变更的通知会发送到客户端,客户端重新获取新数据,完成配置动态修改
负载均衡
负载均衡是一种手段,用来把对某种资源的访问分摊给不同的设备,从而减轻单点的压力。
实现的思路:
- 首先建立 Servers 节点,并建立监听器监视 Servers 子节点的状态(用于在服务器增添时及时同步当前集群中服务器列表)
- 在每个服务器启动时,在 Servers 节点下建立临时子节点 Worker Server,并在对应的字节点下存入服务器的相关信息,包括服务的地址,IP,端口等等
- 可以自定义一个负载均衡算法,在每个请求过来时从 ZooKeeper 服务器中获取当前集群服务器列表,根据算法选出其中一个服务器来处理请求
命名服务
命名服务就是提供名称的服务。ZooKeeper 的命名服务有两个应用方面。
- 提供类 JNDI 功能,可以把系统中各种服务的名称、地址以及目录信息存放在 ZooKeeper,需要的时候去 ZooKeeper 中读取
- 制作分布式的序列号生成器
利用 ZooKeeper 顺序节点的特性,制作分布式的序列号生成器,或者叫 id 生成器。(分布式环境下使用作为数据库 id,另外一种是 UUID(缺点:没有规律)),ZooKeeper 可以生成有顺序的容易理解的同时支持分布式环境的编号。
在创建节点时,如果设置节点是有序的,则 ZooKeeper 会自动在你的节点名后面加上序号,上面说容易理解,是比如说这样,你要获得订单的 id,你可以在创建节点时指定节点名为 order_[日期]_xxxxxx,这样一看就大概知道是什么时候的订单。
|
|
分布式协调/通知
一种典型的分布式系统机器间的通信方式是心跳。心跳检测是指分布式环境中,不同机器之间需要检测彼此是否正常运行。传统的方法是通过主机之间相互 PING 来实现,又或者是建立长连接,通过 TCP 连接固有的心跳检测机制来实现上层机器的心跳检测。
如果使用 ZooKeeper,可以基于其临时节点的特性,不同机器在 ZooKeeper 的一个指定节点下创建临时子节点,不同机器之间可以根据这个临时节点来判断客户端机器是否存活。
好处就是检测系统和被检系统不需要直接相关联,而是通过 ZooKeeper 节点来关联,大大减少系统的耦合。
集群管理
集群管理主要指集群监控和集群控制两个方面。前者侧重于集群运行时的状态的收集,后者则是对集群进行操作与控制。开发和运维中,面对集群,经常有如下需求:
- 希望知道集群中究竟有多少机器在工作
- 对集群中的每台机器的运行时状态进行数据收集
- 对集群中机器进行上下线的操作
分布式集群管理体系中有一种传统的基于 Agent 的方式,就是在集群每台机器部署 Agent 来收集机器的 CPU、内存等指标。但是如果需要深入到业务状态进行监控,比如一个分布式消息中间件中,希望监控每个消费者对消息的消费状态,或者一个分布式任务调度系统中,需要对每个机器删的任务执行情况进行监控。对于这些业务紧密耦合的监控需求,统一的 Agent 是不太合适的。
利用 ZooKeeper 实现集群管理监控组件的思路:
在管理机器上线/下线的场景中,为了实现自动化的线上运维,我们必须对机器的上/下线情况有一个全局的监控。通常在新增机器的时候,需要首先将指定的 Agent 部署到这些机器上去。Agent 部署启动之后,会首先向 ZooKeeper 的指定节点进行注册,具体的做法就是在机器列表节点下面创建一个临时子节点,例如 /machine/[Hostname]
(下文我们以“主机节点”代表这个节点),如下图所示。
当 Agent 在 ZooKeeper 上创建完这个临时子节点后,对 /machines
节点关注的监控中心就会接收到“子节点变更”事件,即上线通知,于是就可以对这个新加入的机器开启相应的后台管理逻辑。另一方面,监控中心同样可以获取到机器下线的通知,这样便实现了对机器上/下线的检测,同时能够很容易的获取到在线的机器列表,对于大规模的扩容和容量评估都有很大的帮助。
Master选举
分布式系统中 Master 是用来协调集群中其他系统单元,具有对分布式系统状态更改的决定权。比如一些读写分离的应用场景,客户端写请求往往是 Master 来处理的。
利用常见关系型数据库中的主键特性来实现也是可以的,集群中所有机器都向数据库中插入一条相同主键 ID 的记录,数据库会帮助我们自动进行主键冲突检查,可以保证只有一台机器能够成功。
但是有一个问题,如果插入成功的和护短机器成为 Master 后挂了的话,如何通知集群重新选举 Master?
利用 ZooKeeper 创建节点 API 接口,提供了强一致性,能够很好保证在分布式高并发情况下节点的创建一定是全局唯一性。
集群机器都尝试创建节点,创建成功的客户端机器就会成为 Master,失败的客户端机器就在该节点上注册一个 Watcher 用于监控当前 Master 机器是否存活,一旦发现 Master 挂了,其余客户端就可以进行选举了。
分布式锁
分布式锁是控制分布式系统之间同步访问共享资源的一种方式。如果不同系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,一般需要通过一些互斥的手段来防止彼此之间的干扰,以保证一致性。
排他锁
如果事务 T1 对数据对象 O1 加上了排他锁,那么加锁期间,只允许事务 T1 对 O1 进行读取和更新操作。核心是保证当前有且仅有一个事务获得锁,并且锁释放后,所有正在等待获取锁的事务都能够被通知到。对于实现排他锁,简单地说就是多个客户端同时去竞争创建同一个临时子节点,ZooKeeper 能够保证只有一个客户端创建成功,那么这个创建成功的客户端就获得排他锁。正常情况下,这个客户端执行完业务逻辑会删除这个节点,也就是释放了锁。如果该客户端宕机了,那么这个临时节点会被自动删除,锁也会被释放。
通过 ZooKeeper 上的 Znode 可以表示一个锁,/x_lock/lock
。
- 获取锁
所有客户端都会通过调用
create()
接口尝试在/x_lock
创建临时子节点/x_lock/lock
。最终只有一个客户端创建成功,那么该客户端就获取了锁。同时没有获取到锁的其他客户端,注册一个子节点变更的 Watcher 监听。 - 释放锁 获取锁的客户端发生宕机或者正常完成业务逻辑后,就会把临时节点删除。临时子节点删除后,其他客户端又开始新的一轮获取锁的过程。
共享锁
又称为读锁。允许一个资源可以被多个读操作访问,或者被一个写操作访问,但两者不能同时进行。通过 ZooKeeper 上的 Znode 表示一个锁,/s_lock/[HOSTNAME]-请求类型-序号
。
|
|
- 获取锁
需要获得共享锁的客户端都会在
s_lock
这个节点下面创建一个临时顺序节点,如果当前是读请求,就创建类型为 R 的临时节点,如果是写请求,就创建类型为 W 的临时节点。 - 判断读写顺序
共享锁下不同事务可以同时对同一个数据对象进行读取操作,而更新操作必须在当前没有任何事务进行读写操作的情况下进行。
2.1 创建完节点后,获取
s_lock
的所有子节点,并对该节点注册子节点变更的 Watcher 监听 2.2 然后确定自己的节点序号在所有的子节点中的顺序 2.3 对于读请求,如果没有比自己小的子节点,或是所有比自己序号小的子节点都是读请求,那么表明自己已经成功获取到了共享锁,同时开始执行读取逻辑,如果有比自己序号小的写请求,那么就需要进行等待 2.4 接收到 Watcher 通知后重复 2.1 - 释放锁 获取锁的客户端发生宕机或者正常完成业务逻辑后,就会把临时节点删除。临时子节点删除后,其他客户端又开始新的一轮获取锁的过程。
羊群效应
在 2.7.2 介绍的共享锁中,在判断读写顺序的时候会出现一个问题,假如 host4 在移除自己的节点的时候,后面 host5-7 都需要接收 Watcher 事件通知,但是实际上,只有 host5 接收到事件就可以了。因此以上的实现方式会产生大量的 Watcher 通知。这样会对 ZooKeeper 服务器造成了巨大的性能影响和网络冲击,这就是羊群效应。
改进的一步在于,调用 getChildren()
接口的时候获取到所有已经创建的子节点列表,但是这个时候不要注册任何的 Watcher。当无法获取共享锁的时候,调用 exist()
来对比自己小的那个节点注册 Wathcer。而对于读写请求,会有不同的定义:
- 读请求: 在比自己序号小的最后一个写请求节点注册 Watcher。
- 写请求: 向比自己序号小的最后一个节点注册 Watcher。
分布式队列
FIFO
使用 ZooKeeper 实现 FIFO 队列,入队操作 offer就是在 queue_fifo
下创建自增序的子节点,并把数据放入节点内。出队操作 poll就是先找到 queue_fifo
下序号最小的那个节点,取出数据,然后删除此节点。
|
|
出队操作,根据以下步骤确定执行顺序:
- 通过
get_children()
接口获取/queue_fifo
节点下所有子节点 - 通过自己的节点序号在所有子节点中的顺序
- 如果不是最小的子节点,那么进入等待,同时向比自己序号小的最后一个子节点注册 Watcher 监听
- 接收到 Watcher 通知后重复 1
Barrier
Barrier就是栅栏或者屏障,适用于这样的业务场景:当有些操作需要并行执行,但后续操作又需要串行执行,此时必须等待所有并行执行的线程全部结束,才开始串行,于是就需要一个屏障,来控制所有线程同时开始,并等待所有线程全部结束。
利用 ZooKeeper 的实现,开始时 queue_barrier
节点是一个已经存在的默认节点,并且将其节点的数据内容赋值为一个数字 n 来代表 Barrier 值,比如 n=10 代表只有当 /queue_barrier
节点下的子节点个数达到10才会打开 Barrier。之后所有客户端都会在 queue_barrier
节点下创建一个临时节点,如 queue_barrier/host1
。
如何控制所有线程同时开始?
所有的线程启动时在 ZooKeeper 节点 /queue_barrier
下插入顺序临时节点,然后检查 /queue/barrier
下所有 children 节点的数量是否为所有的线程数,如果不是,则等待,如果是,则开始执行。具体的步骤如下:
getData()
获取/queue_barrier
节点的数据内容getChildren()
获取/queue_barrier
节点下的所有子节点,同时注册对子节点列表变更的 Watche 监听。- 统计子节点的个数
- 如果子节点个数不足10,那么进入等待
- 接收 Watcher 通知后,重复2
如何等待所有线程结束?
所有线程在执行完毕后,都检查 /queue/barrier
下所有 children 节点数量是否为0,若不为0,则继续等待。
用什么类型的节点? 根节点使用持久节点,子节点使用临时节点,根节点为什么要用持久节点?首先因为临时节点不能有子节点,所以根节点要用持久节点,并且在程序中要判断根节点是否存在。 子节点为什么要用临时节点?临时节点随着连接的断开而消失,在程序中,虽然会删除临时节点,但可能会出现程序在节点被删除之前就 crash了,如果是持久节点,节点不会被删除。
ZooKeeper在大型分布式系统的应用
Hadoop
Hadoop 利用 ZooKeeper 实现了高可用的功能,包括 HDFS 的 NameNode 和 YARN 的 ResourceManager。此外 YARN 的运行状态是利用 ZooKeeper 来存储的。
ResourceManager HA
- 多个 RM 在
/yarn-leader-election/pseudo-yarn-rm-cluster
竞争创建锁节点 - 注册 Watcher,创建锁成功的 RM 为 Active 节点,创建锁不成功的 RM 监听此节点,并成为 Stanby 状态
- 当 Active RM 挂了,通知 Standby RM,开始新一轮竞争
Fencing
Fencing 一般指解决脑裂这样的问题,YARN 引入了 Fencing 机制,利用的是 ZooKeeper 数据节点的 ACL 权限控制。
如果 RM1 假死,此时 RM2 成为 Active 状态,当 RM1 恢复之后,会试图去更新 ZooKeeper 里的数据,但此时会发现写上了 ACL 权限的节点无法修改,这样就可以避免了脑裂。
Spark
Spark 对 ZooKeeper 的使用主要在以下几个类中。
|
|
ZooKeeperPersistenceEngine
ZooKeeperPersistenceEngine 这个类主要是用于 Spark Master 高可用的持久化工作。其实很容易理解,作为一个分布式内存计算框架,每个环节都有「崩溃」的可能性,那么能不能减少诸如宕机、网络问题带来的影响呢?那么就是将运行中的各个环节,或者说有助于恢复这个集群状态的信息持久化下来。
Spark 中有个参数 spark.deploy.recoveryMode
,是当 Master 有问题的时候,重新启动恢复的时候用到的。有几个恢复的方式,从本地文件中恢复,又或者是本文介绍的,利用 ZooKeeper 存储状态,并从中恢复。
ZooKeeperLeaderElectionAgent
领导选举机制可以保证集群虽然存在多个 Master,但是只有一个 Master 是 Active 的,其他都会处于 Standby 状态。
应用 ZooKeeper 选主,利用 ZooKeeper 创建节点是强一致性的,可以保证在分布式高并发的情况下创建的节点一定是全局唯一的,因此只要 Spark 的一个 Master 被选为 Leader,那么在相应目录下,就一定只有这个 Master 是创建成功的,而其他 Master 会创建一个子节点的 Watcher,用于监控当前 Master 是否还存活,一旦他挂了,那么就会重新启动选主过程。
Kafka
Kafka 中大部分组件都应用了 ZooKeeper。
- Broker 注册
/broker/ids/[0...N]
记录了 Broker 服务器列表记录,这个临时节点的节点数据是 ip 端口之类的信息。 - Topic 注册
/broker/topcs
记录了 Topic 的分区信息和 Broker 的对应关系 - 生产者负载均衡 生产者需要将消息发送到对应的 Broker 上,生产者通过 Broker 和 Topic 注册的信息,以及 Broker 和 Topic 的对应关系和变化注册事件 Watcher 监听,从而实现一种动态的负载均衡机制。
- 消息消费进度 Offset 记录 消费者对指定消息分区进行消息消费的过程中,需要定时将分区消息的消费进度 Offset 记录到 ZooKeeper 上,以便消费者进行重启或者其他消费者重新阶段该消息分区的消息消费后,能够从之前的进度开始继续系消费
Dubbo
Dubbo 基于 ZooKeeper 实现了服务注册中心。哪一个服务由哪一个机器来提供必需让调用者知道,简单来说就是 ip 地址和服务名称的对应关系。ZooKeeper 通过心跳机制可以检测挂掉的机器并将挂掉机器的 ip 和服务对应关系从列表中删除。
至于支持高并发,简单来说就是横向扩展,在不更改代码的情况通过添加机器来提高运算能力。通过添加新的机器向 ZooKeeper 注册服务,服务的提供者多了能服务的客户就多了。
服务提供者在启动的时候,向 ZooKeeper 上的指定节点 /dubbo/${serviceName}/providers
目录下写入自己的 URL 地址,这个操作就完成了服务的发布。
服务消费者启动的时候,订阅 /dubbo/${serviceName}/providers
目录下的提供者 URL 地址, 并向 /dubbo/${serviceName}/consumers
目录下写入自己的URL地址。
Canal
Canal 的 HA 分为两部分,Canal server 和 Canal client 分别有对应的 HA 实现。
- Canal server: 为了减少对 Mysql dump 的请求,不同 server 上的 instance 要求同一时间只能有一个处于 running,其他的处于 Standby 状态。
- Canal client: 为了保证有序性,一份 instance 同一时间只能由一个 canal client 进行 get/ack/rollback 操作,否则客户端接收无法保证有序。
- Canal server 要启动某个 Canal instance 时都先向 Zookeeper 进行一次尝试启动判断,通过在 ZooKeeper 里创建临时节点的方式。
- 创建 ZooKeeper 节点成功后,对应的 Canal server 就启动对应的 Canal instance,没有创建成功的 Canal instance 就会处于 Standby 状态,一旦 ZooKeeper 发现 Canal server A 创建的节点消失后,立即通知其他的 Canal server 再次进行步骤1的操作,重新选出一个 Canal server 启动 instance。
- Canal client 每次进行 connect 时,会首先向 ZooKeeper 询问当前是谁启动了 Canal instance,然后和其建立连接,一旦连接不可用,会重新尝试 connect。
总结
由于 ZooKeeper 可以很好保证分布式环境中数据的强一致性,也是基于这个特点,使得越来越多的分布式系统将 ZooKeeper 作为核心组件进行封装使用。在以上提到的这些分布式系统的常见的应用场景下,利用 ZooKeeper 可以快速的实现相关的组件,而无需重新造轮子。
很多大型开源的分布式系统中应用 ZooKeeper 来实现组件的时候,一般都会考虑使用 Apache Curator 是 Netflix 公司开源的一个 ZooKeeper 客户端,提供了更高层次和更易用的操作 ZooKeeper 的接口。
参考资料
- 从Paxos到ZooKeeper分布式一致性原理与实践
- http://zookeeper.apache.org/