目录

Airflow调度Spark任务

概述

在没有统一的任务调度工具的时候,一般大家都会使用 crontab 来调度定时的批任务。但是这样会有个问题,当大家在不同的机器上运行 crontab 任务的时候,一是不知道所有任务的全貌,二是任务分布在不同机器上,三是日志收集会比较麻烦。因此就有了 Azkaban, Oozie 这些调度平台,虽然笔者对以上调度平台的使用经验比较少,并且需要配置很多 Hadoop 生态的文件以及写 XML,可视化也一般,又缺少 CLI 的工具,所以不妨跳过这些传统的任务调度工具,直接使用 Airflow。关于 Airflow 可以直接参考官网,或者看我之前写的一篇简单介绍。

Spark on Airflow

我们知道 Airflow 天然支持 BashOperator,所以要用 Airflow 来调度 Spark 任务实在太简单了,只要配置好 spark-submit 的脚本,以及定义好任务间的依赖关系(DAG),那么用 Airflow 来调度启动依赖关系复杂的 Spark 批处理任务,就显得非常轻量级了,而且 Airflow 还提供了各种日志回显,任务状态的可视化。这里举一个最简单的例子。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'ruzhliu',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 20),
    'email': ['runzhliu@163.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'bashtest', default_args=default_args, schedule_interval=timedelta(days=1))

t1 = BashOperator(
    task_id='bash',
    bash_command='bash /Users/runzhliu/airflow/bashtest.sh ',
    dag=dag)

以上可能是一个最简单的用 BashOperator 定义 Spark 定时任务的例子了。其实只要将 spark-submit 脚本封装在一个脚本文件里(同时可以用Git来进行任务代码的版本控制),这里需要注意一个坑,如果使用 bash /Users/runzhliu/airflow/bashtest.sh 类似这种格式的命令的时候,一定要在最后留一个空格,这是 jinja2 作为模板引擎导致的一个陷阱,当使用 bash 命令的时候,尾部必须加一个空格

/airflow%E8%B0%83%E5%BA%A6spark%E4%BB%BB%E5%8A%A1/img.png

总结

调度平台作为大数据平台的基础设施,大多数公司都会选择自研,原因很多也很复杂,最主要的就是需要考虑如果将大数据平台已有的一些组件组合起来,自研的话也更能开发出适合自身业务特点的调度系统和功能。但是 Airflow 作为一个开源的调度平台,也能够满足大多数中小型公司的业务需求了。

警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。