目录

ZooKeeper在Spark的使用

概述

摘抄一段 ZooKeeper 官网的一句话。大意就是 ZooKeeper 为分布式应用提供了高效可靠的分布式协调服务,提供了统一命名服务、配置管理和分布式锁等分布式的基础服务。

ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.

那么 Spark 在哪里有用到 ZooKeeper,并且用在什么地方呢?不要脸的在 IDE 上全局搜一下(其实是笔者对 ZooKeeper 相对熟一点,而且自己公司的项目也用 ZooKeeper 做些小功能,所以想看看 Spark 里是怎么用的。

Spark 对 ZooKeeper 的使用主要在以下几个类中。

1
2
org.apache.spark.deploy.master.ZooKeeperPersistenceEngine
org.apache.spark.deploy.master.ZooKeeperLeaderElectionAgent

基于ZooKeeper的持久化引擎

ZooKeeperPersistenceEngine 这个类主要是用于 Spark Master 高可用的持久化工作。其实很容易理解,作为一个分布式内存计算框架,每个环节都有「崩溃」的可能性,那么能不能减少诸如宕机、网络问题带来的影响呢?那么就是将运行中的各个环节,或者说有助于恢复这个集群状态的信息持久化下来。了解 Spark 高可用的实现的同学都知道,Spark 中有个参数 spark.deploy.recoveryMode,是当 Master 有问题的时候,重新启动恢复的时候用到的。有几个恢复的方式,从本地文件中恢复,又或者是本文介绍的,利用 ZooKeeper 存储状态,并从中恢复。

仔细看看代码,其实 Spark 里用到 ZooKeeper 的地方不多,而且用的相对都不复杂,所以跟着源码走是很好理清楚逻辑的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 篇幅有限,节选了部分代码,但是应该不影响文意
private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer: Serializer)
  extends PersistenceEngine
  with Logging {

  private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
  private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)

  SparkCuratorUtil.mkdir(zk, WORKING_DIR)

  // 重载的 persist 方法,调用的是 serializeIntoFile
  // 明明是写到 ZooKeeper,为什么还叫 IntoFile?其实无伤大雅
  override def persist(name: String, obj: Object): Unit = {
    serializeIntoFile(WORKING_DIR + "/" + name, obj)
  }
  
  // 写入 ZooKeeper 实际调用的是这个方法
  // 可以看到,创建的是一个持久性的节点
  // 序列化完 value 就可以写入指定的路径
  private def serializeIntoFile(path: String, value: AnyRef) {
    val serialized = serializer.newInstance().serialize(value)
    val bytes = new Array[Byte](serialized.remaining())
    serialized.get(bytes)
    zk.create().withMode(CreateMode.PERSISTENT).forPath(path, bytes)
  }
}

顺藤摸瓜,我还想看看 Master 和 Worker 等这些信息是如何写入的,那再往深看看。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// 发现基于 ZooKeeper 实现的恢复模式的工厂类
private[master] class ZooKeeperRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
  extends StandaloneRecoveryModeFactory(conf, serializer) {
  
  // 从这里看看是哪里调用了这个方法创建了 ZooKeeperPersistenceEngine
  def createPersistenceEngine(): PersistenceEngine = {
    new ZooKeeperPersistenceEngine(conf, serializer)
  }

  def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
    new ZooKeeperLeaderElectionAgent(master, conf)
  }
}

然后在 org.apache.spark.deploy.master.Master 中发现了持久化模式的选择。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  // 如果选择 ZooKeeper 的恢复模式,那么就用上述的工厂方法创建 ZooKeeper 的持久化方法
  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory =
      new ZooKeeperRecoveryModeFactory(conf, serializer)
    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  case "FILESYSTEM" =>
    val fsFactory =
      new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  case "CUSTOM" =>
    val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
      .newInstance(conf, serializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  case _ =>
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}

根据变量 persistenceEngine 可以找出以下几处调用,依照命名来看,应该就是处理持久化的组件信息的方法了,其中挑 addWorker 看看是如何记录 Worker 信息。

/zookeeper%E5%9C%A8spark%E7%9A%84%E4%BD%BF%E7%94%A8/img.png
1
2
3
final def addWorker(worker: WorkerInfo): Unit = {
  persist("worker_" + worker.id, worker)
}

看到这个方法,激发了我的好奇心,WorkInfo 究竟藏着什么信息呢。其实就是 Worker 的 id, host, cores 之类的信息,因为进入恢复模式的时候,是需要让新的 Master 恢复得到这些 Worker 的信息才能实现恢复和接管的。

1
2
3
4
5
6
7
8
private[spark] class WorkerInfo(
    val id: String,
    val host: String,
    val port: Int,
    val cores: Int,
    val memory: Int,
    val endpoint: RpcEndpointRef,
    val webUiAddress: String)

基于ZooKeeper的领导选举代理

领导选举机制可以保证集群虽然存在多个 Master,但是只有一个 Master 是激活的,其他都会处于 standby 状态。

这里简单提一下 ZooKeeper 是如何选主的。ZooKeeper 创建节点是强一致性的,可以保证在分布式高并发的情况下创建的节点一定是全局唯一的,因此只要 Spark 的一个 Master 被选为 Leader,那么在相应目录下,就一定只有这个 Master 是创建成功的,而其他 Master 会创建一个子节点的 Watcher,用于监控当前 Master 是否还存货,一旦他挂了,那么就会重新启动选主过程。Spark 在这里利用了 ZooKeeper 的这一特性,个人猜测也是因为这个原因,类名叫做 xxxAgent

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderElectable,
    conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging  {

  val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"

  private var zk: CuratorFramework = _
  private var leaderLatch: LeaderLatch = _
  private var status = LeadershipStatus.NOT_LEADER

  start()

  // 获得 ZooKeeper 的客户端,创建用于选主的 LeaderLatch 实例
  private def start() {
    logInfo("Starting ZooKeeper LeaderElection agent")
    zk = SparkCuratorUtil.newClient(conf)
    leaderLatch = new LeaderLatch(zk, WORKING_DIR)
    // 增加一个监听器,监控主节点的存活
    leaderLatch.addListener(this)
    leaderLatch.start()
  }

  override def stop() {
    leaderLatch.close()
    zk.close()
  }

  // override 了 ZooKeeper 的 LeaderLatchListener 接口的方法
  // 用于判断当前节点是否为 Leader 节点
  override def isLeader() {
    synchronized {
      // could have lost leadership by now.
      if (!leaderLatch.hasLeadership) {
        return
      }

      logInfo("We have gained leadership")
      updateLeadershipStatus(true)
    }
  }

  override def notLeader() {
    synchronized {
      // could have gained leadership by now.
      if (leaderLatch.hasLeadership) {
        return
      }

      logInfo("We have lost leadership")
      updateLeadershipStatus(false)
    }
  }

  private def updateLeadershipStatus(isLeader: Boolean) {
    if (isLeader && status == LeadershipStatus.NOT_LEADER) {
      status = LeadershipStatus.LEADER
      masterInstance.electedLeader()
    } else if (!isLeader && status == LeadershipStatus.LEADER) {
      status = LeadershipStatus.NOT_LEADER
      masterInstance.revokedLeadership()
    }
  }

  private object LeadershipStatus extends Enumeration {
    type LeadershipStatus = Value
    val LEADER, NOT_LEADER = Value
  }
}

介绍完 ZooKeeperLeaderElectionAgent 的实现,可以看看 Master 是怎么使用的,下面的代码在上面也出现过。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory =
      new ZooKeeperRecoveryModeFactory(conf, serializer)
      // 如果 RECOVERY_MODE 是 ZOOKEEPER 的话,就会创建基于 ZooKeeper 的选主代理
    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  case "FILESYSTEM" =>
    val fsFactory =
      new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  case "CUSTOM" =>
    val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
      .newInstance(conf, serializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  case _ =>
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}

总结

总结下,ZooKeeper 作为一个分布式应用中优秀的组件,在 Spark 中的应用并不复杂(也是源于 Spark 的优秀吧),而包括持久化和选主的实现,我们完全可以参考 Spark 的实现方式,应用在自己的项目中。

参考资料

  1. Spark内核设计的技术-架构设计与实现
  2. 从Paxos到ZooKeeper
警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。