概述
之前文章介绍了 Flink session cluster on Kubernetes,需要注意,这种部署方式,可以在同一个 Cluster 上多次提交 Flink Job,而本文介绍的,是一种将任务和镜像绑定的部署方式,即 Flink 集群是不共享的,其组件是单独属于一个 Job。
Creating the job-specific image
可以参考之前另一篇文章。上一篇文章打出来的镜像是一个用了官方提供的 WordCount 例子,在部署 Flink job cluster 的重点在于把用户要运行的 Job 代码放入镜像,此处不赘述了。
Deploy Flink job cluster on Kubernetes
官方提供了 Service 和 Job 的模板用于在 K8S 集群上运行 Flink 任务。这里需要注意,模板里面有两个变量,需要用户自行填写。
${FLINK_IMAGE_NAME}
: Flink 镜像文件的名字
${FLINK_JOB_PARALLELISM}
: Flink Job 的并行度,可以理解成需要的 TaskManager 的个数(Pod 数量)
然后官网也提供了一个命令。
1
|
FLINK_IMAGE_NAME=<IMAGE_NAME> FLINK_JOB_PARALLELISM=<PARALLELISM> envsubst < job-cluster-job.yaml.template | kubectl create -f -
|
当然,直接运行会报错的,除了填充变量以外,envsubst
也是个坑爹的工具,他其实就是用来替换文件中的占位变量的工具,Mac 用户可以通过 brew install gettext
,安装完部分用户会发现 envsubst
命令还是用不了,如果不行的用户可以尝试运行下面的命令,直接去找到 envsusbt
运行文件的路径。配置好之后,这个命令就可以运行了。
1
|
alias envsubst=/usr/local/Cellar/gettext/0.19.8.1/bin/envsubst
|
开始在 K8S 中部署 Flink 的组件。
首先部署 Service,基本上就是暴露各服务组件的端口。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
# 省略 License
apiVersion: v1
kind: Service
metadata:
name: flink-job-cluster
labels:
app: flink
component: job-cluster
spec:
ports:
- name: rpc
port: 6123
- name: blob
port: 6124
- name: query
port: 6125
nodePort: 30025
- name: ui
port: 8081
nodePort: 30081
type: NodePort
selector:
app: flink
component: job-cluster
|
然后部署 Job Cluster,通过以下 YAML 文件。
1
|
FLINK_IMAGE_NAME=flink-job:latest FLINK_JOB_PARALLELISM=1 envsubst < job-cluster-job.yaml.template | kubectl create -f -
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
# 省略了 License
apiVersion: batch/v1
kind: Job
metadata:
name: flink-job-cluster
spec:
template:
metadata:
labels:
app: flink
component: job-cluster
spec:
restartPolicy: OnFailure
containers:
- name: flink-job-cluster
image: ${FLINK_IMAGE_NAME}
# 这是笔者加的,因为需要拉本地的 Docker 镜像
imagePullPolicy: IfNotPresent
# job-cluster 进程启动的命令
args: ["job-cluster", "-Djobmanager.rpc.address=flink-job-cluster",
"-Dparallelism.default=${FLINK_JOB_PARALLELISM}", "-Dblob.server.port=6124", "-Dqueryable-state.server.ports=6125"]
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob
- containerPort: 6125
name: query
- containerPort: 8081
name: ui
|
然后部署 TaskManager。
1
|
FLINK_IMAGE_NAME=flink-job:latest FLINK_JOB_PARALLELISM=1 envsubst < task-manager-deployment.yaml.template | kubectl create -f -
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
# 省略 License
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: flink-task-manager
spec:
replicas: ${FLINK_JOB_PARALLELISM}
template:
metadata:
labels:
app: flink
component: task-manager
spec:
containers:
- name: flink-task-manager
image: ${FLINK_IMAGE_NAME}
# 先找本地的镜像
imagePullPolicy: IfNotPresent
# task-manager 启动命令
args: ["task-manager", "-Djobmanager.rpc.address=flink-job-cluster"]
|
上图可以看到 Job Cluster 作为 K8S Job 的资源类型,已经运行结束了。
总结
在 K8S 上运行 Flink,目前看来还没有 Spark on K8S 这么成熟,原因是 K8S 并没有作为原生的 Flink 资源调度器(Spark 已经开发了),当然了,我看社区也有人提 PR 了,但是社区似乎还没有十分迫切要把 Flink 和 K8S 结合起来,所以进度上,稍微比 Spark on K8S 要慢一些。
警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。