目录

Spark-Kubernetes源码分析系列-features

概述

features 包里的代码,主要是用于构建 Spark 在 K8S 中的各类资源所需要的特征,个人觉得可以理解成这些 features 就是帮你写各类 Kind 的 YAML 文件。

分析

看看 features 包里的代码。这里面都是 Spark 在 K8S 中构建各种资源的步骤。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
/path/to/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features
├── BasicDriverFeatureStep.scala
├── BasicExecutorFeatureStep.scala
├── DriverCommandFeatureStep.scala
├── DriverKubernetesCredentialsFeatureStep.scala
├── DriverServiceFeatureStep.scala
├── EnvSecretsFeatureStep.scala
├── ExecutorKubernetesCredentialsFeatureStep.scala
├── HadoopConfDriverFeatureStep.scala
├── KerberosConfDriverFeatureStep.scala
├── KubernetesFeatureConfigStep.scala
├── LocalDirsFeatureStep.scala
├── MountSecretsFeatureStep.scala
├── MountVolumesFeatureStep.scala
└── PodTemplateConfigMapStep.scala

还记得 Spark Kubernetes 的源码分析系列 - submit 文章里提到的,在 KubernetesDriverBuilder 中,有一个 features 这个变量,这里需要 new 很多配置,也就是具体的用来配置 Pod 的一些步骤。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
val features = Seq(
new BasicDriverFeatureStep(conf),
new DriverKubernetesCredentialsFeatureStep(conf),
new DriverServiceFeatureStep(conf),
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new LocalDirsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
new DriverCommandFeatureStep(conf),
new HadoopConfDriverFeatureStep(conf),
new KerberosConfDriverFeatureStep(conf),
new PodTemplateConfigMapStep(conf))

下面我们按照顺序来分析一下。

BasicDriverFeatureStep

类名就告诉我们,他是干嘛用的了,就是 Driver Feature 相对 Basic 的部分 feature,那么 Baisc 的 feature 包括什么呢?

1
2
3
4
5
6
7
8
9
driverPodName // Driver Pod 的名字
driverContainerImage // Driver Container 
driverCpuCores // Driver 需要的 Cpu Cores
driverCoresRequest // Driver 的 Request Cpu Cores(K8S相关)
driverLimitCores // Driver 的 Limit Cpu Cores(K8S相关)
driverMemoryMiB // Driver 的内存 MiB
overheadFactor // 这个稍后会讲到
memoryOverheadMiB // 这个稍后会讲到
driverMemoryWithOverheadMiB // 这个稍后会讲到

以上的参数,生成后,主要是用于配置 Pod 和 Container 的参数。这一块内容又长又臭,可以看看我写在里面的注释。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# 一堆的 Builder
val driverContainer = new ContainerBuilder(pod.container)
  # Container Name
  .withName(Option(pod.container.getName).getOrElse(DEFAULT_DRIVER_CONTAINER_NAME))
  # Image Name
  .withImage(driverContainerImage)
  # Image 拉取的策略
  .withImagePullPolicy(conf.imagePullPolicy)
  # Driver 的端口
  .addNewPort()
    .withName(DRIVER_PORT_NAME)
    .withContainerPort(driverPort)
    .withProtocol("TCP")
    .endPort()
  .addNewPort()
    # Block Manager  Port 相关配置
    .withName(BLOCK_MANAGER_PORT_NAME)
    .withContainerPort(driverBlockManagerPort)
    .withProtocol("TCP")
    .endPort()
  .addNewPort()
     # Spark UI 的端口配置
    .withName(UI_PORT_NAME)
    .withContainerPort(driverUIPort)
    .withProtocol("TCP")
    .endPort()
  .addNewEnv()
    # 一些环境变量
    .withName(ENV_SPARK_USER)
    .withValue(Utils.getCurrentUserName())
    .endEnv()
  .addAllToEnv(driverCustomEnvs.asJava)
  .addNewEnv()
    .withName(ENV_DRIVER_BIND_ADDRESS)
    .withValueFrom(new EnvVarSourceBuilder()
      .withNewFieldRef("v1", "status.podIP")
      .build())
    .endEnv()
  .editOrNewResources()
     # cpu 相关配置
    .addToRequests("cpu", driverCpuQuantity)
    .addToLimits(maybeCpuLimitQuantity.toMap.asJava)
    .addToRequests("memory", driverMemoryQuantity)
    .addToLimits("memory", driverMemoryQuantity)
    .addToLimits(driverResourceQuantities.asJava)
    .endResources()
    # 终于 build 
  .build()

val driverPod = new PodBuilder(pod.pod)
   # 如果 Pod 是存在的表示要么修改否则就是新增
  .editOrNewMetadata()
    # Pod 的名字
    .withName(driverPodName)
    # Pod  Label
    .addToLabels(conf.labels.asJava)
    .addToAnnotations(conf.annotations.asJava)
    .endMetadata()
  .editOrNewSpec()
    # Pod 的重启策略
    .withRestartPolicy("Never")
    # Pod  NodeSelector 特性
    .addToNodeSelector(conf.nodeSelector.asJava)
    # 拉取镜像的 Repository 密码ru
    .addToImagePullSecrets(conf.imagePullSecrets: _*)
    .endSpec()
  .build()

此外 getAdditionalPodSystemProperties() 还需要这个方法是拉取其他的配置,比如说 spark.app.id 等等,不赘述了。

DriverKubernetesCredentialsFeatureStep

这个 Step 是用于配置 Driver 的安全认证相关的配置,一般认为就是 K8S 那一套安全认证的机制了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
maybeMountedOAuthTokenFile // OAuthToken 文件
maybeMountedClientKeyFile // Client Key 文件
maybeMountedClientCertFile // Cient Cert 文件
maybeMountedCaCertFile // Ca Cert 文件
driverServiceAccount // Driver 的 Service Account
oauthTokenBase64 // OauthToken Base64 编码
caCertDataBase64 // CaCert 里面的数据 Base64 编码
clientKeyDataBase64 // Client Key 数据的 Base64 编码
clientCertDataBase64 // Client Cert 数据的 Base 64 编码
shouldMountSecret // 是否需要挂载 Secret
driverCredentialsSecretName // Driver 的认证 Secret 名

这里有很多关于访问 ApiServer 的安全认证的细节,如果不熟悉 K8S 的同学,需要补补课。下面是这个 Step 的关键方法,也就是把这些安全相关的文件通过 secret 保存下来。

1
2
3
4
5
6
7
8
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
  // 如果 conf 存在以上提及的一些认证文件,则会进行挂载 Secret
  if (shouldMountSecret) {
    Seq(createCredentialsSecret())
  } else {
    Seq.empty
  }
}

DriverServiceFeatureStep

这个就是配置 Driver Service 的 Step,因为 Pod 在 K8S 集群里,创建 Executor 需要不同的 Executor Pod 访问到 Driver Pod,才能注册上,也包括 Block Manager 以及 Spark UI 的端口和服务负载配置。

1
2
3
4
5
preferredServiceName // Service Name
resolvedServiceName // 上面的 Service Name 超过63个字符的话需要重新配置
driverPort // Driver 的端口
driverBlockManagerPort // Block Manager 的端口
driverUIPort // Spark UI 的端口

上面的 Service Name 超过63个字符的话需要重新配置。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
  preferredServiceName
} else {
  // 超过63个字符,就是需要系统内部重置这个名字了
  val randomServiceId = KubernetesUtils.uniqueID(clock = clock)
  val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
  logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
    s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
    s"$shorterServiceName as the driver service's name.")
  shorterServiceName
}

MountSecretsFeatureStep

EnvSecretsFeatureStep

LocalDirsFeatureStep

1
2
resolvedLocalDirs // 本地目录
useLocalDirTmpFs // 如果 conf 配置为 true,则表示本地目录会用其他的存储系统,例如内存,具体请看 spark.kubernetes.local.dirs.tmpfs

MountVolumesFeatureStep

DriverCommandFeatureStep

这是关于 Driver 命令行的一些配置,具体看看注释是怎么解释的。

1
2
3
4
/**
 * Creates the driver command for running the user app, and propagates needed configuration so
 * executors can also find the app code.
 */

HadoopConfDriverFeatureStep

这是用于挂载 Hadoop 配置文件的 Step,例如访问 HDFS 的时候,需要 core-site.x ml,hdfs-site.xml 等等。

1
2
3
confDir // Hadoop 相关的环境变量 HADOOP_CONF_DIR
existingConfMap // spark.kubernetes.hadoop.configMapName 提交任务的 configMap 名字,这些可以提前生成,直接挂载
confFiles // 配置文件

然后具体看看 Hadoop 的配置文件是如何通过 configMap 挂载到 Driver Pod 上的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
  override def configurePod(original: SparkPod): SparkPod = {

    original.transform { case pod if hasHadoopConf =>

      // 如果有环境变量,就从环境变量指定的路径获取
      val confVolume = if (confDir.isDefined) {
        val keyPaths = confFiles.map { file =>
          new KeyToPathBuilder()
            .withKey(file.getName())
            .withPath(file.getName())
            .build()
        }
        new VolumeBuilder()
          .withName(HADOOP_CONF_VOLUME)
          .withNewConfigMap()
            .withName(newConfigMapName)
            .withItems(keyPaths.asJava)
            .endConfigMap()
          .build()
      } else {
        // 没有环境变量的话,就直接用存在的 configMap
        new VolumeBuilder()
          .withName(HADOOP_CONF_VOLUME)
          .withNewConfigMap()
            .withName(existingConfMap.get)
            .endConfigMap()
          .build()
      }

      // 修改 Pod,通过 editSpec 方法
      val podWithConf = new PodBuilder(pod.pod)
        .editSpec()
          .addNewVolumeLike(confVolume)
            .endVolume()
          .endSpec()
          .build()

      // Container Mount 需要的 Volume
      val containerWithMount = new ContainerBuilder(pod.container)
        .addNewVolumeMount()
          .withName(HADOOP_CONF_VOLUME)
          .withMountPath(HADOOP_CONF_DIR_PATH)
          .endVolumeMount()
        .addNewEnv()
          .withName(ENV_HADOOP_CONF_DIR)
          .withValue(HADOOP_CONF_DIR_PATH)
          .endEnv()
        .build()

      SparkPod(podWithConf, containerWithMount)
    }
  }

KerberosConfDriverFeatureStep

这是关于 Kerberos 配置的 Step。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
/**
 * Provide kerberos / service credentials to the Spark driver.
 *
 * There are three use cases, in order of precedence:
 * Kerberos 的服务,有三种场景
 *
 * - keytab: if a kerberos keytab is defined, it is provided to the driver, and the driver will
 *   manage the kerberos login and the creation of delegation tokens.
 * - existing tokens: if a secret containing delegation tokens is provided, it will be mounted
 *   on the driver pod, and the driver will handle distribution of those tokens to executors.
 * - tgt only: if Hadoop security is enabled, the local TGT will be used to create delegation
 *   tokens which will be provided to the driver. The driver will handle distribution of the
 *   tokens to executors.
 */
1
2
3
4
5
6
7
8
principal // 指的是 KDC 中账号的 Principal
keytab // 指的是 Kerberos 生成的 Keytab
existingSecretName  // 存在的 secret name
existingSecretItemKey // secret 中的 item key
krb5File // Kerberos 服务的配置文件
krb5CMap // krb5 的 configMap
hadoopConf // 多余?
delegationTokens // Hadoop 体系中的轻量级认证 DT

生成 token 的关键代码如下。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
private lazy val delegationTokens: Array[Byte] = {
  // 如果 keytab 和 secret 都是空的,就去生成 DT
  if (keytab.isEmpty && existingSecretName.isEmpty) {
    val tokenManager = new HadoopDelegationTokenManager(kubernetesConf.sparkConf,
      SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf), null)
    val creds = UserGroupInformation.getCurrentUser().getCredentials()
    tokenManager.obtainDelegationTokens(creds)
    // If no tokens and no secrets are stored in the credentials, make sure nothing is returned,
    // to avoid creating an unnecessary secret.
    if (creds.numberOfTokens() > 0 || creds.numberOfSecretKeys() > 0) {
      SparkHadoopUtil.get.serialize(creds)
    } else {
      null
    }
  } else {
    null
  }
}

PodTemplateConfigMapStep

可以指定 Executor 的 Pod 的模板 spark.kubernetes.executor.podTemplateFile。所以这个 Step 主要就是来解析这个 Pod Template 的。

总结

可以看到 Driver 的构建是通过多个 feature 的配置来组装起来的,最终都会通过 K8S 的 Java 客户端来跟 ApiServer 交互来在 K8S 集群中生成 Driver。

警告
本文最后更新于 2019年10月9日,文中内容可能已过时,请谨慎参考。