最近在用Airflow搭建我们的任务调度系统,我们的主要场景是在AWS上用airflow触发SageMaker的训练调度和Endpoint部署,以及触发ECS上的Task Defination运行某些容器,AWS提供了现成的由他们托管的Airflow集群,就不需要自己部署了,还算比较方便。如果你也有类似的任务调度需求也可以试试airflow。当然目前也有一些基于Kubernetes云原生的调度平台,也可以一起调研下,比如Argo, Tekton等。
小规模使用,直接参考官方文档Running Airflow in Docker即可。如果有大规模使用的需要,可以把里面的各个组件分别部署,搭建可用性更高的集群。
airflow的任务使用python脚本进行定义,扔到airflow的dags目录下,它会自己加载,随后你可以在它的web界面上控制DAG的启动停止,以及查看运行状态和日志。文档:Python SDK。
如果不喜欢Web界面,也可以用CLI工具。
只提一些常用的概念。
有向无环图,在airflow里面,相当于调度的“流水线(pipeline)”的概念,你可以配置它的调度周期,如每隔多久跑一次或者某个时间定时运行一次,它由多个Task组成。通常我们每个任务脚本里面就定义一个DAG。
如果不想在启动的时候自动拉起一大堆任务,可以在实例化DAG的时候传入catchup=False
,避免自己的集群爆掉
就是在某个DAG里面单个的步骤。通过对Operator传入参数进行实例化,就可以定义一个步骤。你可以通过一些方法或者运算符定义Task之间的依赖关系。文档:Setting up Dependencies
你可以通过对Operator实例化,创建一个Task。它的职责是依据你传入的参数,操作某些对象,例如PythonOperator是把你传入的python函数跑一下,BashOperator是把你传入的命令在bash里面跑一下,除了简单的执行一些本地脚本外,airflow还提供了一大批第三方系统的operator,用于对他们进行控制。文档:Providers packages
这个概念更类似于“客户端”,某些Operator需要操作外部系统时,可能需要使用它们的SDK创建一个client实例,而Hook就是相当于包装了这个client,给operator使用,通常情况下你不需要研究Hook和Operator内部的逻辑,除非现有的operator逻辑无法满足你的需要,你可以考虑继承Operator和Hook来实现你自己的逻辑。
airflow对于第三方系统的连接抽象出了一个统一的配置定义,可以在Web界面上Admin>Connections里面进行管理,简单来说,比如你想操作AWS的S3,可能需要填一些如region, access_key, secret之类的玩意,新建一个connection配置填进去,并在你的dag脚本里把这个配置的id填进去,这样它的hook在建立连接时就会使用这个配置了。我在用这个的时候发现有一个坑,在使用aws的connection的时候,如果想改变默认的region,需要新建一个connection配置,修改aws_default是无效的。
task与task之间需要用XCom来进行参数传递。你可以认为它是一个key-value的系统,各个task在运行过程中可以写入或者读取里面的数据。在Web界面Admin>XComs里面可以看得到里面已有的数据。
通常在operator的config参数,会自动进行Jinja模板渲染,其中的上下文里面就有其它task扔进XCom里面的数据。如
{{ task_instance.xcom_pull(task_ids='foo', key='return_value') }}
或者在使用PythonOperator执行python函数时,可以拿到task_instance,从而读写xcom
def do_something(**kwargs):
ti = kwargs['ti']
value = ti.xcom_pull(task_ids='upstream_task', key='return_value')
ti.xcom_push(key='hello',value="world")
在DAG的default_args
参数中可以设置on_success_callback
,on_failure_callback
,它的值是一个函数,在里面你同样可以从传入的参数获取到上下文信息,用这个玩意可以把airflow接入到你的各种IM系统的bot来发送通知,比如企业微信钉钉机器人之类的。当然通知也可以用airflow自带的邮件功能。
def failure_callback(context):
ti = context['task_instance']
execution_date = context['execution_date']
print("task fail:", ti.dag_id, ti.task_id, execution_date)