概述
摘抄一段 ZooKeeper 官网的一句话。大意就是 ZooKeeper 为分布式应用提供了高效可靠的分布式协调服务,提供了统一命名服务、配置管理和分布式锁等分布式的基础服务。
ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.
那么 Spark 在哪里有用到 ZooKeeper,并且用在什么地方呢?不要脸的在 IDE 上全局搜一下(其实是笔者对 ZooKeeper 相对熟一点,而且自己公司的项目也用 ZooKeeper 做些小功能,所以想看看 Spark 里是怎么用的。
Spark 对 ZooKeeper 的使用主要在以下几个类中。
1
2
|
org.apache.spark.deploy.master.ZooKeeperPersistenceEngine
org.apache.spark.deploy.master.ZooKeeperLeaderElectionAgent
|
基于ZooKeeper的持久化引擎
ZooKeeperPersistenceEngine
这个类主要是用于 Spark Master 高可用的持久化工作。其实很容易理解,作为一个分布式内存计算框架,每个环节都有「崩溃」的可能性,那么能不能减少诸如宕机、网络问题带来的影响呢?那么就是将运行中的各个环节,或者说有助于恢复这个集群状态的信息持久化下来。了解 Spark 高可用的实现的同学都知道,Spark 中有个参数 spark.deploy.recoveryMode
,是当 Master 有问题的时候,重新启动恢复的时候用到的。有几个恢复的方式,从本地文件中恢复,又或者是本文介绍的,利用 ZooKeeper 存储状态,并从中恢复。
仔细看看代码,其实 Spark 里用到 ZooKeeper 的地方不多,而且用的相对都不复杂,所以跟着源码走是很好理清楚逻辑的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
// 篇幅有限,节选了部分代码,但是应该不影响文意
private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer: Serializer)
extends PersistenceEngine
with Logging {
private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)
SparkCuratorUtil.mkdir(zk, WORKING_DIR)
// 重载的 persist 方法,调用的是 serializeIntoFile
// 明明是写到 ZooKeeper,为什么还叫 IntoFile?其实无伤大雅
override def persist(name: String, obj: Object): Unit = {
serializeIntoFile(WORKING_DIR + "/" + name, obj)
}
// 写入 ZooKeeper 实际调用的是这个方法
// 可以看到,创建的是一个持久性的节点
// 序列化完 value 就可以写入指定的路径
private def serializeIntoFile(path: String, value: AnyRef) {
val serialized = serializer.newInstance().serialize(value)
val bytes = new Array[Byte](serialized.remaining())
serialized.get(bytes)
zk.create().withMode(CreateMode.PERSISTENT).forPath(path, bytes)
}
}
|
顺藤摸瓜,我还想看看 Master 和 Worker 等这些信息是如何写入的,那再往深看看。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// 发现基于 ZooKeeper 实现的恢复模式的工厂类
private[master] class ZooKeeperRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
extends StandaloneRecoveryModeFactory(conf, serializer) {
// 从这里看看是哪里调用了这个方法创建了 ZooKeeperPersistenceEngine
def createPersistenceEngine(): PersistenceEngine = {
new ZooKeeperPersistenceEngine(conf, serializer)
}
def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
new ZooKeeperLeaderElectionAgent(master, conf)
}
}
|
然后在 org.apache.spark.deploy.master.Master
中发现了持久化模式的选择。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
// 如果选择 ZooKeeper 的恢复模式,那么就用上述的工厂方法创建 ZooKeeper 的持久化方法
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory =
new ZooKeeperRecoveryModeFactory(conf, serializer)
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
case "FILESYSTEM" =>
val fsFactory =
new FileSystemRecoveryModeFactory(conf, serializer)
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
.newInstance(conf, serializer)
.asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
case _ =>
(new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
|
根据变量 persistenceEngine
可以找出以下几处调用,依照命名来看,应该就是处理持久化的组件信息的方法了,其中挑 addWorker
看看是如何记录 Worker 信息。
1
2
3
|
final def addWorker(worker: WorkerInfo): Unit = {
persist("worker_" + worker.id, worker)
}
|
看到这个方法,激发了我的好奇心,WorkInfo
究竟藏着什么信息呢。其实就是 Worker 的 id, host, cores 之类的信息,因为进入恢复模式的时候,是需要让新的 Master 恢复得到这些 Worker 的信息才能实现恢复和接管的。
1
2
3
4
5
6
7
8
|
private[spark] class WorkerInfo(
val id: String,
val host: String,
val port: Int,
val cores: Int,
val memory: Int,
val endpoint: RpcEndpointRef,
val webUiAddress: String)
|
基于ZooKeeper的领导选举代理
领导选举机制可以保证集群虽然存在多个 Master,但是只有一个 Master 是激活的,其他都会处于 standby 状态。
这里简单提一下 ZooKeeper 是如何选主的。ZooKeeper 创建节点是强一致性的,可以保证在分布式高并发的情况下创建的节点一定是全局唯一的,因此只要 Spark 的一个 Master 被选为 Leader,那么在相应目录下,就一定只有这个 Master 是创建成功的,而其他 Master 会创建一个子节点的 Watcher,用于监控当前 Master 是否还存货,一旦他挂了,那么就会重新启动选主过程。Spark 在这里利用了 ZooKeeper 的这一特性,个人猜测也是因为这个原因,类名叫做 xxxAgent
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderElectable,
conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging {
val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
private var zk: CuratorFramework = _
private var leaderLatch: LeaderLatch = _
private var status = LeadershipStatus.NOT_LEADER
start()
// 获得 ZooKeeper 的客户端,创建用于选主的 LeaderLatch 实例
private def start() {
logInfo("Starting ZooKeeper LeaderElection agent")
zk = SparkCuratorUtil.newClient(conf)
leaderLatch = new LeaderLatch(zk, WORKING_DIR)
// 增加一个监听器,监控主节点的存活
leaderLatch.addListener(this)
leaderLatch.start()
}
override def stop() {
leaderLatch.close()
zk.close()
}
// override 了 ZooKeeper 的 LeaderLatchListener 接口的方法
// 用于判断当前节点是否为 Leader 节点
override def isLeader() {
synchronized {
// could have lost leadership by now.
if (!leaderLatch.hasLeadership) {
return
}
logInfo("We have gained leadership")
updateLeadershipStatus(true)
}
}
override def notLeader() {
synchronized {
// could have gained leadership by now.
if (leaderLatch.hasLeadership) {
return
}
logInfo("We have lost leadership")
updateLeadershipStatus(false)
}
}
private def updateLeadershipStatus(isLeader: Boolean) {
if (isLeader && status == LeadershipStatus.NOT_LEADER) {
status = LeadershipStatus.LEADER
masterInstance.electedLeader()
} else if (!isLeader && status == LeadershipStatus.LEADER) {
status = LeadershipStatus.NOT_LEADER
masterInstance.revokedLeadership()
}
}
private object LeadershipStatus extends Enumeration {
type LeadershipStatus = Value
val LEADER, NOT_LEADER = Value
}
}
|
介绍完 ZooKeeperLeaderElectionAgent
的实现,可以看看 Master 是怎么使用的,下面的代码在上面也出现过。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory =
new ZooKeeperRecoveryModeFactory(conf, serializer)
// 如果 RECOVERY_MODE 是 ZOOKEEPER 的话,就会创建基于 ZooKeeper 的选主代理
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
case "FILESYSTEM" =>
val fsFactory =
new FileSystemRecoveryModeFactory(conf, serializer)
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
.newInstance(conf, serializer)
.asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
case _ =>
(new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
|
总结
总结下,ZooKeeper 作为一个分布式应用中优秀的组件,在 Spark 中的应用并不复杂(也是源于 Spark 的优秀吧),而包括持久化和选主的实现,我们完全可以参考 Spark 的实现方式,应用在自己的项目中。
参考资料
- Spark内核设计的技术-架构设计与实现
- 从Paxos到ZooKeeper
警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。