概述
features 包里的代码,主要是用于构建 Spark 在 K8S 中的各类资源所需要的特征,个人觉得可以理解成这些 features 就是帮你写各类 Kind 的 YAML 文件。
分析
看看 features 包里的代码。这里面都是 Spark 在 K8S 中构建各种资源的步骤。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
/path/to/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features
├── BasicDriverFeatureStep.scala
├── BasicExecutorFeatureStep.scala
├── DriverCommandFeatureStep.scala
├── DriverKubernetesCredentialsFeatureStep.scala
├── DriverServiceFeatureStep.scala
├── EnvSecretsFeatureStep.scala
├── ExecutorKubernetesCredentialsFeatureStep.scala
├── HadoopConfDriverFeatureStep.scala
├── KerberosConfDriverFeatureStep.scala
├── KubernetesFeatureConfigStep.scala
├── LocalDirsFeatureStep.scala
├── MountSecretsFeatureStep.scala
├── MountVolumesFeatureStep.scala
└── PodTemplateConfigMapStep.scala
|
还记得 Spark Kubernetes 的源码分析系列 - submit 文章里提到的,在 KubernetesDriverBuilder
中,有一个 features
这个变量,这里需要 new 很多配置,也就是具体的用来配置 Pod 的一些步骤。
1
2
3
4
5
6
7
8
9
10
11
12
|
val features = Seq(
new BasicDriverFeatureStep(conf),
new DriverKubernetesCredentialsFeatureStep(conf),
new DriverServiceFeatureStep(conf),
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new LocalDirsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
new DriverCommandFeatureStep(conf),
new HadoopConfDriverFeatureStep(conf),
new KerberosConfDriverFeatureStep(conf),
new PodTemplateConfigMapStep(conf))
|
下面我们按照顺序来分析一下。
BasicDriverFeatureStep
类名就告诉我们,他是干嘛用的了,就是 Driver Feature 相对 Basic 的部分 feature,那么 Baisc 的 feature 包括什么呢?
1
2
3
4
5
6
7
8
9
|
driverPodName // Driver Pod 的名字
driverContainerImage // Driver Container
driverCpuCores // Driver 需要的 Cpu Cores
driverCoresRequest // Driver 的 Request Cpu Cores(K8S相关)
driverLimitCores // Driver 的 Limit Cpu Cores(K8S相关)
driverMemoryMiB // Driver 的内存 MiB
overheadFactor // 这个稍后会讲到
memoryOverheadMiB // 这个稍后会讲到
driverMemoryWithOverheadMiB // 这个稍后会讲到
|
以上的参数,生成后,主要是用于配置 Pod 和 Container 的参数。这一块内容又长又臭,可以看看我写在里面的注释。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
# 一堆的 Builder
val driverContainer = new ContainerBuilder(pod.container)
# Container Name
.withName(Option(pod.container.getName).getOrElse(DEFAULT_DRIVER_CONTAINER_NAME))
# Image Name
.withImage(driverContainerImage)
# Image 拉取的策略
.withImagePullPolicy(conf.imagePullPolicy)
# Driver 的端口
.addNewPort()
.withName(DRIVER_PORT_NAME)
.withContainerPort(driverPort)
.withProtocol("TCP")
.endPort()
.addNewPort()
# Block Manager 的 Port 相关配置
.withName(BLOCK_MANAGER_PORT_NAME)
.withContainerPort(driverBlockManagerPort)
.withProtocol("TCP")
.endPort()
.addNewPort()
# Spark UI 的端口配置
.withName(UI_PORT_NAME)
.withContainerPort(driverUIPort)
.withProtocol("TCP")
.endPort()
.addNewEnv()
# 一些环境变量
.withName(ENV_SPARK_USER)
.withValue(Utils.getCurrentUserName())
.endEnv()
.addAllToEnv(driverCustomEnvs.asJava)
.addNewEnv()
.withName(ENV_DRIVER_BIND_ADDRESS)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef("v1", "status.podIP")
.build())
.endEnv()
.editOrNewResources()
# cpu 相关配置
.addToRequests("cpu", driverCpuQuantity)
.addToLimits(maybeCpuLimitQuantity.toMap.asJava)
.addToRequests("memory", driverMemoryQuantity)
.addToLimits("memory", driverMemoryQuantity)
.addToLimits(driverResourceQuantities.asJava)
.endResources()
# 终于 build 完
.build()
val driverPod = new PodBuilder(pod.pod)
# 如果 Pod 是存在的,表示要么修改,否则就是新增
.editOrNewMetadata()
# Pod 的名字
.withName(driverPodName)
# Pod 的 Label
.addToLabels(conf.labels.asJava)
.addToAnnotations(conf.annotations.asJava)
.endMetadata()
.editOrNewSpec()
# Pod 的重启策略
.withRestartPolicy("Never")
# Pod 的 NodeSelector 特性
.addToNodeSelector(conf.nodeSelector.asJava)
# 拉取镜像的 Repository 密码(ru
.addToImagePullSecrets(conf.imagePullSecrets: _*)
.endSpec()
.build()
|
此外 getAdditionalPodSystemProperties()
还需要这个方法是拉取其他的配置,比如说 spark.app.id
等等,不赘述了。
DriverKubernetesCredentialsFeatureStep
这个 Step 是用于配置 Driver 的安全认证相关的配置,一般认为就是 K8S 那一套安全认证的机制了。
1
2
3
4
5
6
7
8
9
10
11
|
maybeMountedOAuthTokenFile // OAuthToken 文件
maybeMountedClientKeyFile // Client Key 文件
maybeMountedClientCertFile // Cient Cert 文件
maybeMountedCaCertFile // Ca Cert 文件
driverServiceAccount // Driver 的 Service Account
oauthTokenBase64 // OauthToken Base64 编码
caCertDataBase64 // CaCert 里面的数据 Base64 编码
clientKeyDataBase64 // Client Key 数据的 Base64 编码
clientCertDataBase64 // Client Cert 数据的 Base 64 编码
shouldMountSecret // 是否需要挂载 Secret
driverCredentialsSecretName // Driver 的认证 Secret 名
|
这里有很多关于访问 ApiServer 的安全认证的细节,如果不熟悉 K8S 的同学,需要补补课。下面是这个 Step 的关键方法,也就是把这些安全相关的文件通过 secret 保存下来。
1
2
3
4
5
6
7
8
|
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
// 如果 conf 存在以上提及的一些认证文件,则会进行挂载 Secret
if (shouldMountSecret) {
Seq(createCredentialsSecret())
} else {
Seq.empty
}
}
|
DriverServiceFeatureStep
这个就是配置 Driver Service 的 Step,因为 Pod 在 K8S 集群里,创建 Executor 需要不同的 Executor Pod 访问到 Driver Pod,才能注册上,也包括 Block Manager 以及 Spark UI 的端口和服务负载配置。
1
2
3
4
5
|
preferredServiceName // Service Name
resolvedServiceName // 上面的 Service Name 超过63个字符的话需要重新配置
driverPort // Driver 的端口
driverBlockManagerPort // Block Manager 的端口
driverUIPort // Spark UI 的端口
|
上面的 Service Name 超过63个字符的话需要重新配置。
1
2
3
4
5
6
7
8
9
10
11
|
private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
preferredServiceName
} else {
// 超过63个字符,就是需要系统内部重置这个名字了
val randomServiceId = KubernetesUtils.uniqueID(clock = clock)
val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
s"$shorterServiceName as the driver service's name.")
shorterServiceName
}
|
MountSecretsFeatureStep
EnvSecretsFeatureStep
LocalDirsFeatureStep
1
2
|
resolvedLocalDirs // 本地目录
useLocalDirTmpFs // 如果 conf 配置为 true,则表示本地目录会用其他的存储系统,例如内存,具体请看 spark.kubernetes.local.dirs.tmpfs
|
MountVolumesFeatureStep
DriverCommandFeatureStep
这是关于 Driver 命令行的一些配置,具体看看注释是怎么解释的。
1
2
3
4
|
/**
* Creates the driver command for running the user app, and propagates needed configuration so
* executors can also find the app code.
*/
|
HadoopConfDriverFeatureStep
这是用于挂载 Hadoop 配置文件的 Step,例如访问 HDFS 的时候,需要 core-site.x ml,hdfs-site.xml 等等。
1
2
3
|
confDir // Hadoop 相关的环境变量 HADOOP_CONF_DIR
existingConfMap // spark.kubernetes.hadoop.configMapName 提交任务的 configMap 名字,这些可以提前生成,直接挂载
confFiles // 配置文件
|
然后具体看看 Hadoop 的配置文件是如何通过 configMap 挂载到 Driver Pod 上的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
override def configurePod(original: SparkPod): SparkPod = {
original.transform { case pod if hasHadoopConf =>
// 如果有环境变量,就从环境变量指定的路径获取
val confVolume = if (confDir.isDefined) {
val keyPaths = confFiles.map { file =>
new KeyToPathBuilder()
.withKey(file.getName())
.withPath(file.getName())
.build()
}
new VolumeBuilder()
.withName(HADOOP_CONF_VOLUME)
.withNewConfigMap()
.withName(newConfigMapName)
.withItems(keyPaths.asJava)
.endConfigMap()
.build()
} else {
// 没有环境变量的话,就直接用存在的 configMap
new VolumeBuilder()
.withName(HADOOP_CONF_VOLUME)
.withNewConfigMap()
.withName(existingConfMap.get)
.endConfigMap()
.build()
}
// 修改 Pod,通过 editSpec 方法
val podWithConf = new PodBuilder(pod.pod)
.editSpec()
.addNewVolumeLike(confVolume)
.endVolume()
.endSpec()
.build()
// Container Mount 需要的 Volume
val containerWithMount = new ContainerBuilder(pod.container)
.addNewVolumeMount()
.withName(HADOOP_CONF_VOLUME)
.withMountPath(HADOOP_CONF_DIR_PATH)
.endVolumeMount()
.addNewEnv()
.withName(ENV_HADOOP_CONF_DIR)
.withValue(HADOOP_CONF_DIR_PATH)
.endEnv()
.build()
SparkPod(podWithConf, containerWithMount)
}
}
|
KerberosConfDriverFeatureStep
这是关于 Kerberos 配置的 Step。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
/**
* Provide kerberos / service credentials to the Spark driver.
*
* There are three use cases, in order of precedence:
* Kerberos 的服务,有三种场景
*
* - keytab: if a kerberos keytab is defined, it is provided to the driver, and the driver will
* manage the kerberos login and the creation of delegation tokens.
* - existing tokens: if a secret containing delegation tokens is provided, it will be mounted
* on the driver pod, and the driver will handle distribution of those tokens to executors.
* - tgt only: if Hadoop security is enabled, the local TGT will be used to create delegation
* tokens which will be provided to the driver. The driver will handle distribution of the
* tokens to executors.
*/
|
1
2
3
4
5
6
7
8
|
principal // 指的是 KDC 中账号的 Principal
keytab // 指的是 Kerberos 生成的 Keytab
existingSecretName // 存在的 secret name
existingSecretItemKey // secret 中的 item key
krb5File // Kerberos 服务的配置文件
krb5CMap // krb5 的 configMap
hadoopConf // 多余?
delegationTokens // Hadoop 体系中的轻量级认证 DT
|
生成 token 的关键代码如下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
private lazy val delegationTokens: Array[Byte] = {
// 如果 keytab 和 secret 都是空的,就去生成 DT
if (keytab.isEmpty && existingSecretName.isEmpty) {
val tokenManager = new HadoopDelegationTokenManager(kubernetesConf.sparkConf,
SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf), null)
val creds = UserGroupInformation.getCurrentUser().getCredentials()
tokenManager.obtainDelegationTokens(creds)
// If no tokens and no secrets are stored in the credentials, make sure nothing is returned,
// to avoid creating an unnecessary secret.
if (creds.numberOfTokens() > 0 || creds.numberOfSecretKeys() > 0) {
SparkHadoopUtil.get.serialize(creds)
} else {
null
}
} else {
null
}
}
|
PodTemplateConfigMapStep
可以指定 Executor 的 Pod 的模板 spark.kubernetes.executor.podTemplateFile
。所以这个 Step 主要就是来解析这个 Pod Template 的。
总结
可以看到 Driver 的构建是通过多个 feature 的配置来组装起来的,最终都会通过 K8S 的 Java 客户端来跟 ApiServer 交互来在 K8S 集群中生成 Driver。
警告
本文最后更新于 2019年10月9日,文中内容可能已过时,请谨慎参考。