Spark-Kubernetes Source Code Analysis Series: submit

Overview

Kubernetes is integrated into Spark as a new resource manager, and the integration follows much the same idea as the YARN integration; Spark's built-in Standalone resource management mode is, of course, not enough on its own.

The way Kubernetes is integrated is easy to understand: Spark runs an HTTP client that talks to the Kubernetes ApiServer, and through it submits everything related to the Application, such as creating the Driver and Executor Pods, as well as watching those Pods.
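
To make that concrete, here is a minimal, stand-alone sketch of the idea using the fabric8 Kubernetes client, which is the client library Spark wraps in SparkKubernetesClientFactory. The namespace and the listing call below are purely illustrative, not Spark's actual code:

import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClient}

object ApiServerSketch {
  def main(args: Array[String]): Unit = {
    // picks up kubeconfig / in-cluster service-account credentials by default
    val client: KubernetesClient = new DefaultKubernetesClient()
    try {
      // the same kind of client is what Spark uses to create the Driver Pod,
      // ConfigMaps, Secrets, and to watch Pod status
      val pods = client.pods().inNamespace("default").list().getItems
      println(s"found ${pods.size()} pods in namespace default")
    } finally {
      client.close()
    }
  }
}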

Source Code Analysis

The Spark Kubernetes module does not actually contain much code; it is worth running tree on the following directory to get a quick overview.

# path
path/to/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark
# tree -d -L 3
.
├── deploy
│   └── k8s
│       ├── features // feature steps that configure the Driver/Executor Pods, ConfigMaps, Secrets, etc.
│       └── submit // everything related to submit
└── scheduler
    └── cluster
        └── k8s // executor Pod scheduling, status tracking, etc.

The code structure is quite clear: one part deals with deploy, the other with the scheduler.

This article focuses on the submit-related code under the following directory.

/path/to/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit
├── K8sSubmitOps.scala // spark-submit operations such as kill/status
├── KubernetesClientApplication.scala // the wrapper invoked by spark-submit in cluster mode
├── KubernetesDriverBuilder.scala // Driver builder
├── LoggingPodStatusWatcher.scala // status watcher for the Spark Driver Pod
└── MainAppResource.scala // resource definitions for Java/Python/R apps

Let's start with the entry class for Spark's K8S mode.

private[spark] class KubernetesClientApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val parsedArguments = ClientArguments.fromCommandLineArgs(args)
    run(parsedArguments, conf)
  }

  private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {
    val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
    val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
    val kubernetesConf = KubernetesConf.createDriverConf(
      sparkConf,
      kubernetesAppId,
      clientArguments.mainAppResource,
      clientArguments.mainClass,
      clientArguments.driverArgs)
    val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
    val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None

    val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)

    Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
      master,
      Some(kubernetesConf.namespace),
      KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
      SparkKubernetesClientFactory.ClientType.Submission,
      sparkConf, None, None)) { kubernetesClient =>
        val client = new Client(
          kubernetesConf,
          new KubernetesDriverBuilder(),
          kubernetesClient,
          waitForAppCompletion,
          watcher)
        client.run()
    }
  }
}

This is the main entry class for Spark on K8S; the part to focus on is the run() method. It first generates a kubernetesAppId. Why not just use the Spark app name? Because this identifier is attached, as a Label, to every resource belonging to the App, including the Driver/Executor Pods, ConfigMaps, Secrets, and so on.
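
For illustration, here is a hedged sketch of what that label buys you: one selector finds every Pod that belongs to the App. The label key spark-app-selector is the one Spark uses for this purpose, but treat the exact key and the appId value below as assumptions made for the example:

import scala.collection.JavaConverters._
import io.fabric8.kubernetes.client.DefaultKubernetesClient

object ListAppPods {
  def main(args: Array[String]): Unit = {
    val appId = "spark-0123456789abcdef"  // a hypothetical kubernetesAppId
    val client = new DefaultKubernetesClient()
    try {
      // Driver and Executor Pods carry the same app-id label,
      // so a single selector finds everything belonging to this App
      val pods = client.pods().inNamespace("default")
        .withLabel("spark-app-selector", appId)
        .list().getItems.asScala
      pods.foreach(p => println(p.getMetadata.getName))
    } finally client.close()
  }
}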

Note the WAIT_FOR_APP_COMPLETION setting, which defaults to true. In cluster mode it controls whether the launcher process waits for the App to finish before exiting; if set to false, the launcher process exits right away.
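
For example, to have the launcher return immediately after submitting, pass the key behind WAIT_FOR_APP_COMPLETION, spark.kubernetes.submission.waitAppCompletion, on the command line (the rest of the command is illustrative only):

spark-submit --deploy-mode cluster --master k8s://https://kubernetes.default.svc --conf spark.kubernetes.submission.waitAppCompletion=false ...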

private[spark] class Client(
    conf: KubernetesDriverConf, builder: KubernetesDriverBuilder,
    kubernetesClient: KubernetesClient, waitForAppCompletion: Boolean,
    watcher: LoggingPodStatusWatcher) extends Logging {
  def run(): Unit = {
    // the driver Pod spec is built here
    val resolvedDriverSpec = builder.buildFromFeatures(conf, kubernetesClient)

    // the ConfigMap comes from here
    val configMapName = s"${conf.resourceNamePrefix}-driver-conf-map"
    val configMap = buildConfigMap(configMapName, resolvedDriverSpec.systemProperties)

    // configuration of the Driver container
    val resolvedDriverContainer = new ContainerBuilder(resolvedDriverSpec.pod.container).addNewEnv()
        .withName(ENV_SPARK_CONF_DIR)
        .withValue(SPARK_CONF_DIR_INTERNAL)
        .endEnv()
      .addNewVolumeMount()
        .withName(SPARK_CONF_VOLUME)
        .withMountPath(SPARK_CONF_DIR_INTERNAL)
        .endVolumeMount()
      .build()

    // configuration of the Driver Pod
    val resolvedDriverPod = new PodBuilder(resolvedDriverSpec.pod.pod)
      .editSpec()
        .addToContainers(resolvedDriverContainer)
        .addNewVolume()
          .withName(SPARK_CONF_VOLUME)
          .withNewConfigMap()
            .withName(configMapName)
            .endConfigMap()
          .endVolume()
        .endSpec()
      .build()

    Utils.tryWithResource(
      kubernetesClient
        .pods()
        .withName(resolvedDriverPod.getMetadata.getName)
        .watch(watcher)) { _ =>
      val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)

      try {
        // including configMap
        val otherKubernetesResources =
          resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
        addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
        kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
      } catch {
        case NonFatal(e) =>
          kubernetesClient.pods().delete(createdDriverPod)
          throw e
      }

      val sId = s"${Option(conf.namespace).map(_ + ":").getOrElse("")}" +
        s"${resolvedDriverPod.getMetadata.getName}"
      if (waitForAppCompletion) {
        logInfo(s"Waiting for application ${conf.appName} with submission ID ${sId} to finish...")
        // watcher
        watcher.awaitCompletion()
        logInfo(s"Application ${conf.appName} with submission ID ${sId} finished.")
      } else {
        logInfo(s"Deployed Spark application ${conf.appName} with " +
          s"submission ID ${sId} into Kubernetes.")
      }
    }
  }

  // A Kubernetes feature: the ConfigMaps, Secrets, executor Pods, etc. are owned by the driver Pod,
  // so if the driver goes away, all of them get cleaned up as well
  private def addDriverOwnerReference(driverPod: Pod, resources: Seq[HasMetadata]): Unit = {
    val driverPodOwnerReference = new OwnerReferenceBuilder()
      .withName(driverPod.getMetadata.getName)
      .withApiVersion(driverPod.getApiVersion)
      .withUid(driverPod.getMetadata.getUid)
      .withKind(driverPod.getKind)
      .withController(true)
      .build()
    // set the driver Pod as the owner reference on every resource
    resources.foreach { resource =>
      val originalMetadata = resource.getMetadata
      originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference))
    }
  }

  // Build the ConfigMap, mainly to carry configuration such as the Hadoop/Spark conf
  private def buildConfigMap(configMapName: String, conf: Map[String, String]): ConfigMap = {
    // plain java.util.Properties
    val properties = new Properties()

    conf.foreach { case (k, v) =>
      properties.setProperty(k, v)
    }
    val propertiesWriter = new StringWriter()
    properties.store(propertiesWriter,
      s"Java properties built from Kubernetes config map with name: $configMapName")
    new ConfigMapBuilder()
      .withNewMetadata()
        .withName(configMapName)
        .endMetadata()
      .addToData(SPARK_CONF_FILE_NAME, propertiesWriter.toString)
      .build()
  }
}
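
A small aside on buildConfigMap above: the ConfigMap value is nothing more than java.util.Properties text, which the driver roughly reads back from the file mounted under SPARK_CONF_DIR. A minimal stand-alone sketch of that round trip (not Spark code, names are illustrative):

import java.io.{StringReader, StringWriter}
import java.util.Properties

object SparkConfRoundTrip {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("spark.app.name", "demo")
    props.setProperty("spark.kubernetes.namespace", "default")

    // roughly what buildConfigMap stores as the ConfigMap data
    val writer = new StringWriter()
    props.store(writer, "properties for the driver")
    println(writer.toString)

    // roughly what the driver does with the mounted properties file
    val loaded = new Properties()
    loaded.load(new StringReader(writer.toString))
    println(loaded.getProperty("spark.app.name"))
  }
}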

Next, let's look at what needs to be configured on the Driver Pod.

private[spark] class KubernetesDriverBuilder {

  def buildFromFeatures(
      conf: KubernetesDriverConf,
      client: KubernetesClient): KubernetesDriverSpec = {

    val initialPod = conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE)
      .map { file =>
        // Since Spark 3.0 a Pod template file can be supplied; it is loaded as the initial Pod,
        // and the feature steps below are then applied on top of it
        KubernetesUtils.loadPodFromTemplate(
          client,
          new File(file),
          conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME))
      }
      .getOrElse(SparkPod.initialPod())

    // The key part: the list of feature steps that configure the Pod
    val features: Seq[KubernetesFeatureConfigStep] = Seq(
      new BasicDriverFeatureStep(conf),
      new DriverKubernetesCredentialsFeatureStep(conf),
      new DriverServiceFeatureStep(conf),
      new MountSecretsFeatureStep(conf),
      new EnvSecretsFeatureStep(conf),
      new LocalDirsFeatureStep(conf),
      new MountVolumesFeatureStep(conf),
      new DriverCommandFeatureStep(conf),
      new HadoopConfDriverFeatureStep(conf),
      new KerberosConfDriverFeatureStep(conf),
      new PodTemplateConfigMapStep(conf))

    val spec = KubernetesDriverSpec(
      initialPod,
      driverKubernetesResources = Seq.empty,
      conf.sparkConf.getAll.toMap)

    features.foldLeft(spec) { case (spec, feature) =>
      val configuredPod = feature.configurePod(spec.pod)
      val addedSystemProperties = feature.getAdditionalPodSystemProperties()
      val addedResources = feature.getAdditionalKubernetesResources()

      KubernetesDriverSpec(
        configuredPod,
        spec.driverKubernetesResources ++ addedResources,
        spec.systemProperties ++ addedSystemProperties)
    }
  }
}
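
To make the foldLeft at the end concrete, here is a self-contained toy version of the feature-step pattern (simplified stand-in types, not Spark's real KubernetesFeatureConfigStep interface): each step refines the Pod and may contribute extra resources and system properties, and the steps are folded over the initial spec in order.

object FeatureStepSketch {
  // simplified stand-ins for SparkPod / KubernetesDriverSpec
  case class ToyPod(labels: Map[String, String], containers: Seq[String])
  case class ToySpec(pod: ToyPod, resources: Seq[String], props: Map[String, String])

  trait ToyFeatureStep {
    def configurePod(pod: ToyPod): ToyPod
    def additionalProps: Map[String, String] = Map.empty
    def additionalResources: Seq[String] = Nil
  }

  // e.g. in the spirit of BasicDriverFeatureStep: stamp labels onto the Pod
  object LabelStep extends ToyFeatureStep {
    def configurePod(pod: ToyPod): ToyPod =
      pod.copy(labels = pod.labels + ("spark-role" -> "driver"))
  }

  // e.g. in the spirit of DriverServiceFeatureStep: add a resource and a system property
  object ServiceStep extends ToyFeatureStep {
    def configurePod(pod: ToyPod): ToyPod = pod
    override def additionalResources: Seq[String] = Seq("driver-svc")
    override def additionalProps: Map[String, String] =
      Map("spark.driver.host" -> "driver-svc.default.svc")
  }

  def main(args: Array[String]): Unit = {
    val initial = ToySpec(ToyPod(Map.empty, Seq("driver-container")), Nil, Map.empty)
    // same shape as KubernetesDriverBuilder.buildFromFeatures
    val result = Seq(LabelStep, ServiceStep).foldLeft(initial) { (spec, step) =>
      ToySpec(
        step.configurePod(spec.pod),
        spec.resources ++ step.additionalResources,
        spec.props ++ step.additionalProps)
    }
    println(result)
  }
}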

Next, the Pod status watcher. It runs a daemon scheduler thread that logs the Pod's status at the configured interval, while watch events arrive asynchronously through eventReceived.

private[k8s] class LoggingPodStatusWatcherImpl(
  appId: String,
  maybeLoggingInterval: Option[Long]) extends LoggingPodStatusWatcher with Logging {

  private val podCompletedFuture = new CountDownLatch(1)
  // start timer for periodic logging
  private val scheduler =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
  private val logRunnable: Runnable = () => logShortStatus()

  private var pod = Option.empty[Pod]

  private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")

  def start(): Unit = {
    maybeLoggingInterval.foreach { interval =>
      scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
    }
  }

  // When an event arrives asynchronously, check whether the Pod has completed (Succeeded or Failed)
  override def eventReceived(action: Action, pod: Pod): Unit = {
    this.pod = Option(pod)
    action match {
      case Action.DELETED | Action.ERROR =>
        closeWatch()

      case _ =>
        logLongStatus()
        if (hasCompleted()) {
          closeWatch()
        }
    }
  }

  // ... (logShortStatus/logLongStatus/hasCompleted/closeWatch/awaitCompletion elided in this excerpt)
}
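
The completion hand-off is built on the CountDownLatch declared at the top: the watch callback counts it down once the Pod terminates, and awaitCompletion() (called from Client.run()) blocks on it. A simplified stand-alone sketch of that mechanism, not the exact Spark code:

import java.util.concurrent.CountDownLatch

// isolated illustration of the latch-based hand-off used by the watcher
class CompletionSignal {
  private val podCompletedFuture = new CountDownLatch(1)

  // called from the watch callback once the Pod is Succeeded/Failed or deleted
  def closeWatch(): Unit = podCompletedFuture.countDown()

  // called by the launcher when waitForAppCompletion is true; blocks until the Pod terminates
  def awaitCompletion(): Unit = podCompletedFuture.await()
}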

Finally, a look at some newer features, for example using spark-submit to kill an entire App.

spark-submit --kill runzhliu:spark-hdfs-* --master k8s://https://kubernetes.default.svc --conf spark.kubernetes.namespace=runzhliu
private[spark] class K8SSparkSubmitOperation extends SparkSubmitOperation
  with CommandLineLoggingUtils {

  private def isGlob(name: String): Boolean = {
    name.last == '*'
  }

  def execute(submissionId: String, sparkConf: SparkConf, op: K8sSubmitOp): Unit = {
    val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
    submissionId.split(":", 2) match {
      case Array(part1, part2@_*) =>
        val namespace = if (part2.isEmpty) None else Some(part1)
        val pName = if (part2.isEmpty) part1 else part2.headOption.get
        Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
          master,
          namespace,
          KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
          SparkKubernetesClientFactory.ClientType.Submission,
          sparkConf,
          None,
          None)
        ) { kubernetesClient =>
          implicit val client: KubernetesClient = kubernetesClient
          if (isGlob(pName)) {
            val ops = namespace match {
              case Some(ns) =>
                kubernetesClient
                  .pods
                  .inNamespace(ns)
              case None =>
                kubernetesClient
                  .pods
            }
            val pods = ops
              .list()
              .getItems
              .asScala
              .filter { pod =>
                val meta = pod.getMetadata
                meta.getName.startsWith(pName.stripSuffix("*")) &&
                  meta.getLabels.get(SPARK_ROLE_LABEL) == SPARK_POD_DRIVER_ROLE
              }.toList
            op.executeOnGlob(pods, namespace, sparkConf)
          } else {
            op.executeOnPod(pName, namespace, sparkConf)
          }
        }
      case _ =>
        printErrorAndExit(s"Submission ID: {$submissionId} is invalid.")
    }
  }

  // the method that kills a Spark App
  override def kill(submissionId: String, conf: SparkConf): Unit = {
    printMessage(s"Submitting a request to kill submission " +
      s"${submissionId} in ${conf.get("spark.master")}. " +
      s"Grace period in secs: ${getGracePeriod(conf).getOrElse("not set.")}")
    execute(submissionId, conf, new KillApplication)
  }

  // the method that reports the status of an App
  override def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit = {
    printMessage(s"Submitting a request for the status of submission" +
      s" ${submissionId} in ${conf.get("spark.master")}.")
    execute(submissionId, conf, new ListStatus)
  }

  override def supports(master: String): Boolean = {
    master.startsWith("k8s://")
  }
}
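
Symmetrically, printSubmissionStatus backs spark-submit --status, which uses the same namespace:driver-pod-name (optionally glob) format as --kill; for example (illustrative, reusing the namespace from the kill command above):

spark-submit --status runzhliu:spark-hdfs-* --master k8s://https://kubernetes.default.svc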

Summary

This concludes the analysis of how, when Spark uses K8S as its resource manager, the spark-submit flow creates the Driver and Executor, together with resources such as Secrets and ConfigMaps.

Warning
This article was last updated on October 9, 2019; its content may be out of date, so treat it with caution.