Spark-Kubernetes源码分析系列-动态资源分配
概述
由于 Spark 正在就 SPIP: Use remote storage for persisting shuffle data 在改造 External Shuffle Service,所以目前 Spark on Kubernetes 还未能通过 External Shuffle Service 进行动态资源分配(在之前的文章也已经介绍过),所以本文主要就一个已经合并进行 Master 的 commit,来分析一下 Spark on Kubernetes 下,目前 TenC 弹性计算 Spark 任务是如何实现动态资源分配的。
Executor增加和移除的过程
要分析动态资源分配,首先要理解 Spark 在以 Kubernetes 作为 Cluster Manager 下,是如何创建和移除 Executor 的。
KubernetesClusterManager
继承自 ExternalClusterManager
,作为 Spark 的集群管理者,都需要继承这个类。其主要作用是创建 SchedulerBackend
和 TaskScheduler
。
ExecutorPodsAllocator
ExecutorPodsAllocator
是 KubernetesClusterManager
的成员之一,其作用就是将 Spark 作业的实际 Executor 数目通过方法 setTotalExpectedExecutors
不断调整到目标数量。
Spark 创建 Executor 是按批次 batch 一轮一轮来的。spark.kubernetes.allocation.batch.delay
是每一轮分配 Executor 的等待时间。其作用是定时去查看一个维护着 applicationId
和对应 Executor 快照 SnapShot 的队列。
每过 spark.kubernetes.allocation.batch.delay
的秒数,就会去查看一下以下三种情况。
- 队列的Executor Pod中处于
Pending
和Running
的Executor数量 - 已经被请求Requested,但是还没有入列的Executor
newlyCreatedExecutors
刚刚创建成功的Executor数量
系统会统计上述三者的和是否等于目标数量 totalExpectedExecutors
。下面是描述上述关系的简单公式。
currentRunningCount + currentPendingExecutors.size + newlyCreatedExecutors.size = totalExpectedExecutors
接下来再讲一下维护 Executor Pod 数量的关键角色 ExecutorPodsSnapshot
。
ExecutorPodsSnapshot
顾名思义,ExecutorPodsSnapshot
就是对应的 Spark 作业在 Kubernetes 中存在的 Executor Pod 状态的快照,其本质上就是一个以 Executor Id 为 key,状态为 value 的 Map[Long, ExecutorPodState]
。
我们知道在 Kubernetes 里 Pod 的状态 State 是通过阶段 Phase 来表示的,在 Kubernetes 中有以下几种 Phase。
- Pending
- Running
- Succeeded
- Failed
- Unknown
通过 toStatesByExecutorId
方法获取 Executor Pod 的状态,并且通过 withUpdate
来更新 Pod 的状态。
ExecutorPodsPollingSnapshotSource
ExecutorPodsPollingSnapshotSource
这个类的作用是主动去请求 Kubernetes ApiServer,对应的 Application Id 下的所有 Executor Pod 的状态,用于个更新 SnapShot 的状态。可以通过 spark.kubernetes.executor.apiPollingInterval
参数来控制主动同步 Executor Pod 状态的时间间隔。
ExecutorPodsWatchSnapshotSource
ExecutorPodsWatchSnapshotSource
是用来 Watch 对应的 Application Id 的 Executor Pod 状态。当 Executor Pod 状态发生变化的时候,就会接收到由 Kubernetes 集群发送的事件,然后就会去更新 Executor Pod 在 Snapshot 的状态。
ExecutorPodsSnapshotsStoreImpl
ExecutorPodsSnapshotsStoreImpl
这个类主要的功能是依照发布-订阅者模式,生产者将 Executor Pod 的状态发送给订阅者,订阅者根据 Executor Pod 的状态来进行反应。
任意一个 Executor Pod 的状态变化的时候,会通过 updatePod()
方法来将更新信息发送出去,而方法 replaceSnapshot()
则可以同步某个 Application 对应的所有 Executor Pod 的最新状态。
ExecutorPodsLifecycleManager
ExecutorPodsLifecycleManager
主要作用是根据 SnapShot 中 Executor Pod 的状态,再同步 Kubernetes 里和 Spark 的 Exeucutor 状态。
非External Shuffle Service下的动态资源分配
前文提到,在 on Yarn 模式下,如果需要开启动态资源分配,就需要在 Yarn 的节点上部署一个 External Shuffle Service 服务。而 on Kubernetes 上,暂时是没有开箱即用的选项的,所以研究一下这个 commit,看看如何不通过 External Shuffle Service 来实现动态资源分配。
shuffleId
在 Shuffle 过程中是非常重要的,shuffle write 的文件名也是以 shuffleId
作为命名的一部分,所以如果要记录哪个 Stage 产生了 shuffle 文件,根据 shuffleId 是可以找到对应的 Stage,和 Stage 对应的 Job,所以可以在该 Job 还没结束之前一直 hold 住 Executor。
工作负荷是怎么计算的
动态资源分配里,常说需要根据工作负荷来动态调整 Executor 的数目,那工作负荷具体是怎么衡量的呢?
maxNumExecutorsNeeded()
方法是用来计算当前最大需要的 Executor 数量,而他其实就是由当前运行的 Task 数量和还没成功调度的 Pending 的 Task 的数量,乘以一个分配的比率 spark.dynamicAllocation.executorAllocationRatio
,然后除以每个 Executor 可以运行的 Task 数量(默认为1)。
|
|
总结
本文主要从 Spark 的 Kubernetes 模块的源码分析了,如何在不使用 External Shuffle Service 的情况下开启动态资源分配的特性。需要注意的是,这个方法并不是完美替代 External Shuffle Service 的方案,不过目前在我们平台上使用也表现出足够的稳定性了。