目录

Spark-Operator是如何提交Spark作业

概述

本文将 Spark 作业称为 Spark Application 或者简称为 Spark App 或者 App。目前 TenC 弹性计算平台的 Spark 作业,是通过 Spark Operator 提交给 Kubernetes 集群的,这与 Spark 原生的直接通过 spark-submit 提交 Spark App 的方式不同,所以理解 Spark Operator 中提交 Spark App 的逻辑,对于用户来说是非常有必要的。本文将就其具体的提交逻辑,介绍一下。

Spark Operator 中的 spark-submit 命令

熟悉 Spark 的同学未必对 Kubernetes 和 Operator 熟悉,所以看 Spark Operator 的逻辑的时候有可能会遇到一些问题,我的建议是先从提交 spark-submit 命令相关的逻辑开始看就会很容易理解。Spark Operator 的提交作业的逻辑主要在 pkg/controller/sparkapplication/submission.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func runSparkSubmit(submission *submission) (bool, error) {
	sparkHome, present := os.LookupEnv(sparkHomeEnvVar)
	if !present {
		glog.Error("SPARK_HOME is not specified")
	}
	// 这个就是 Spark 用户熟悉的 spark-submit 命令
	var command = filepath.Join(sparkHome, "/bin/spark-submit")

	cmd := execCommand(command, submission.args...)
	glog.V(2).Infof("spark-submit arguments: %v", cmd.Args)
	output, err := cmd.Output()
	glog.V(3).Infof("spark-submit output: %s", string(output))
	if err != nil {
		var errorMsg string
		if exitErr, ok := err.(*exec.ExitError); ok {
			errorMsg = string(exitErr.Stderr)
		}
		// The driver pod of the application already exists.
		if strings.Contains(errorMsg, podAlreadyExistsErrorCode) {
			glog.Warningf("trying to resubmit an already submitted SparkApplication %s/%s", submission.namespace, submission.name)
			return false, nil
		}
		if errorMsg != "" {
			return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %s", submission.namespace, submission.name, errorMsg)
		}
		return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %v", submission.namespace, submission.name, err)
	}

	return true, nil
}

controller 里有个 submitSparkApplication() 这个方法是用来提交 Spark Application 的。NewState 的情况就是 Controller 发现有处于这个状态下的 Spark Application ,然后就会调用这个方法。

1
2
3
4
5
6
7
8
case v1beta2.NewState:
	c.recordSparkApplicationEvent(appToUpdate)
	if err := c.validateSparkApplication(appToUpdate); err != nil {
		appToUpdate.Status.AppState.State = v1beta2.FailedState
		appToUpdate.Status.AppState.ErrorMessage = err.Error()
	} else {
		appToUpdate = c.submitSparkApplication(appToUpdate)
	}

因为将代码放在 markdown 里做注释不是特别的明显,所以这里截个图可以看看。之前的文章有提到过,在 Spark Operator 里提交 Spark 任务,spark-submit 的过程是很难 Debug 的,原因就在于下面的截图代码里,这里的 output 是执行 spark-submit 之后的输出,而这个输出是在 Spark Operator 的 Pod 里执行的,但是这部分的日志由于只能输出一次,所以用户不能像原生的 spark-submit 的方式,可以看到提交任务的日志,所以一旦是 spark-submit 过程中的问题,在 Spark Operator 中就难以体现了。

/spark-operator%E6%98%AF%E5%A6%82%E4%BD%95%E6%8F%90%E4%BA%A4spark%E4%BD%9C%E4%B8%9A/image_1dmtltc5kn431mcs12781hfr7vi9.png

下面是 Spark Operator 日志里,这个 output 输出的内容,这里的输出是曾经在通过 spark-submit 提交过 Spark 任务在 Kubernetes 的用户熟悉的提交日志,不过可以看到光凭一次 output 的内容,是无法理解提交任务哪里出了问题的。

/spark-operator%E6%98%AF%E5%A6%82%E4%BD%95%E6%8F%90%E4%BA%A4spark%E4%BD%9C%E4%B8%9A/image_1dmtml9i0v361k3fkio1scl1q8qm.png

Spark Operator 文档中说明了,默认是以 Spark 最新的 Release 版本作为 base 镜像的,所以如果需要修改 Spark 源码,那就必须在编译 Spark Operator 的镜像的是,同时将 SPARK_ARGS 修改成用户最新更改的 Spark 源码。这里必须注意到,一般上来说,base 镜像只会影响 spark-submit 的过程,如果用户修改的代码逻辑不影响 spark-submit,那么就没有必要重新编译 Spark Operator 的镜像,因为 Driver 是通过 spark-submit 传递的参数 spark.kubernetes.container.image 或者 spark.kubernetes.driver.container.image 的镜像里的 jar 包依赖影响,而 Executor 的依赖同样是来源于 spark-submit 传递的参数 spark.kubernetes.container.image 或者 spark.kubernetes.executor.container.image 里的 jars 影响,因此用户一定要注意这样的依赖关系,通过下面的图,可以更清晰的理解其中的逻辑。

/spark-operator%E6%98%AF%E5%A6%82%E4%BD%95%E6%8F%90%E4%BA%A4spark%E4%BD%9C%E4%B8%9A/image_1dmvf1jlq1g3n138svnqm5bh6n9.png

总结

本文主要介绍了 Spark Operator 中提交 Spark 作业的代码逻辑,也介绍了在 Spark Operator 中检查提交作业逻辑的问题,由于 Operator 依赖于 Spark 镜像,默认情况下,TenC 上的 Spark Operator 使用的是计算资源组定制过的 Spark 镜像,因此,如果用户对作业提交有其他定制化的需求,可以联系计算资源组的同学进行讨论!

警告
本文最后更新于 2019年10月9日,文中内容可能已过时,请谨慎参考。