0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

一个开源的ETL工具Airflow

lviY_AI_shequ 来源:未知 作者:李倩 2018-09-14 16:37 次阅读

最近在弄画像标签每天ETL的调度事情,这篇文章分享一下一个开源的ETL工具Airflow。

一、基础概念

Airflow是Airbnb内部发起并开源的一个ETL管理平台,使用Python编写实现的任务管理、调度、监控工作流平台。这是其官方文档地址:Apache Airflow (incubating) Documentation,关于airflow产品的使用,里面有详细的介绍。

Airflow的调度依赖于crontab命令,与crontab相比airflow可以直观的看到任务执行情况、任务之间的逻辑依赖关系、可以设定任务出错时邮件提醒、可以查看任务执行日志。

而crontab命令管理的方式存在以下几方面的弊端:

1、在多任务调度执行的情况下,难以理清任务之间的依赖关系;

2、不便于查看当前执行到哪一个任务;

3、任务执行失败时不便于查看执行日志,也即不方便定位报错的任务和错误原因;

4、不便于查看调度流下每个任务执行的起止消耗时间,这对于优化task作业是非常重要的;

5、不便于记录历史调度任务的执行情况,而这对于优化作业和错误排查是很重要的;

Airflow中有两个最基本的概念:DAG和task,下面主要介绍一下。

DAG是什么:

DAG是Directed Acyclic Graph的缩写,即有向无环图。是所有要执行任务脚本(即task)的集合,在这个DAG中定义了各个task的依赖关系、调度时间、失败重启机制等。通过DAGid来标识每个DAG任务

每个DAG是由1到多个task组成

task是什么:

task是具体执行的任务脚本,可以是一个命令行(BashOperator),也可以是python脚本等。

二、主要功能键介绍

1、DAG管理

在airflow的主页,可以看到当前所有的DAG列表(通俗点说就是所有的调度任务列表),中间“Task by State”那一列显示任务的执行状态。深绿色的表示已执行成功的task,浅绿色的表示当前正在执行的task。

右侧“Links”那一列可以链接查看当前DAG任务的依赖关系、执行时间、执行脚本等情况。

当点击具体某一个DAG任务时,就可以进去查看该DAG的调度依赖、执行时长、调度脚本等具体执行情况

2、调度依赖查看

通过“Graph View”选项可以查看当前调度任务的依赖关系,当调度作业较为复杂时,这种图形化方式展示的依赖关系可以帮助用户迅速理清。

在用户画像的调度管理中,每天需要执行cookieid和userid两个维度的画像脚本,因此可以设定并行执行任务,让cookieid和userid的脚本同时执行调度作业

3、执行状态

通过“Tree View”选项可以查看当前任务的执行状态,包括当前执行到哪一个task,还有哪些task未执行。哪些task执行成功,哪些task执行失败。

也可以查看历史上该DAG下面各task的执行情况。

4、各task执行时间

通过“Gantt”选项可以查看各task任务的执行起止时间的甘特图。

了解各task执行的时间可以有针对性地优化执行时间长的task对应脚本。

5、DAG调度脚本

通过“Code”选项,可以查看当前DAG调度的脚本。脚本里面定义了需要执行的task、执行顺序及依赖、调度时间、失败发送邮件或重调机制等方法

三、脚本实例

在开发过程中,task脚本是需要被调度的脚本,在Airflow中主要需要开发的是DAG脚本,即管理task任务的脚本。通过一个DAG脚本,将各个调度作业脚本串起来,按照业务逻辑去执行。

1、DAG脚本

下面通过一个具体DAG脚本实例来了解一下:

from airflow.operators.bash_operator import BashOperator import airflow from airflow.models import DAG from airflow import operators from airflow.contrib.hooks import SSHHook from airflow.models import BaseOperator from airflow.contrib.operators import SSHExecuteOperator from airflow.operators.latest_only_operator import LatestOnlyOperator import os import sys from datetime import timedelta,date,datetime import pendulum from airflow.utils.trigger_rule import TriggerRule default_args = { 'owner': 'superuserprofile', 'depends_on_past': False, 'start_date': datetime(2018, 06, 01), 'email': ['administer@testemail.com'], 'email_on_failure': True , 'email_on_retry': True, 'retries': 1, 'retry_delay': timedelta(minutes=1), } os.environ['SPARK_HOME'] = '/usr/local/spark-2.1.1-bin-hadoop2.6' sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))

该段脚本定义了需要引入的包,以及默认的DAG参数配置,包括task是否依赖上游任务,首次调度时间、任务失败接收邮箱、任务失败是否重新调起等

dag = DAG( 'superuserprofile', default_args=default_args, description='A userprofile test', schedule_interval='00 08 * * *' )

该段脚本实例化了DAG,设置了DAGid,调度执行时间

gender_task = BashOperator( task_id='gender', bash_command=' sudo -E -H -u userprofile spark-submit --master yarn --deploy-mode client --driver-memory 1g --executor-memory 8g --executor-cores 2 --num-executors 200 /airflow/userprofile_gender.py {{ ds_nodash }} ', dag=dag, trigger_rule=TriggerRule.ALL_DONE ) country_task = BashOperator( task_id='country', bash_command=' sudo -E -H -u userprofile spark-submit --master yarn --deploy-mode client --driver-memory 1g --executor-memory 4g --executor-cores 2 --num-executors 200 /airflow/userprofile_country.py {{ ds_nodash }} ', dag=dag, trigger_rule=TriggerRule.ALL_DONE )

该段脚本设置了两个需要执行的task任务(userprofile_gender.py和userprofile_country.py)的实例化。

task直接的调度依赖关系可以通过set_upstream、set_downstream命令或符号>> 、<<来建立。

gender_task .set_upstream(country_task) 命令指gender_task 任务将依赖country_task任务;反之同理

gender_task >> country_task 命令指country_task 任务将依赖gender_task 任务先执行完,反之同理

2、命令行执行

Airflow通过可视化界面的方式实现了调度管理的界面操作,但在测试脚本或界面操作失败的时候,可通过命令行的方式调起任务。下面介绍几个常用命令

命令1:airflow list_tasksuserprofile

该命令用于查看当前DAG任务下的所有task的列表

其中userprofile是DAGid,加粗的airflow list_tasks是关键字命令

-----------------------------------------------------------------------

命令2:airflow testuserprofile gender_task 20180601

该命令用于单独执行DAG下面的某个task

其中userprofile是DAGid,gender_task是要具体某个taskid,20180601是执行日期。加粗部分是关键字命令

-----------------------------------------------------------------------

命令3:airflow backfill -s2018-06-01-e2018-06-02 userprofile

该命令用于调起整个DAG脚本执行

其中2018-06-01是执行脚本的开始日期, 2018-06-02是结束日期,userprofile是DAGid,加粗部分是关键字命令。

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 开源
    +关注

    关注

    3

    文章

    3235

    浏览量

    42354
  • python
    +关注

    关注

    55

    文章

    4777

    浏览量

    84401

原文标题:用户画像—Airflow作业调度(ETL)

文章出处:【微信号:AI_shequ,微信公众号:人工智能爱好者社区】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    【美洲认证系列】美国ETL认证介绍

    美国发明家爱迪生在1896年手创立的,在美国及世界范围内享有极高的声誉。同UL、CSA样,ETL可根据UL标准或美国国家标准测试核发ETL认证标志,也可同时按照UL标准或美国国家标
    发表于 08-07 15:40

    美国ETL认证简介

    美国ETL认证简介  ETL是美国电子测试实验室(ElectricalTestingLaboratories)的简称。ETL试验室是由美国发明家爱迪生在1896年手创立的,在美国及世
    发表于 04-25 18:33

    UL和ETL的区别是什么?

    ETL认证和UL认证具有同样的北美市场准入效力,但ETL认证的费用比UL认证低的多,般只有UL认证的半,而且,ETL认证的产品检测可以通
    发表于 08-10 21:41

    BI软件的ETL开源的好,还是自研的好?

    时,需要将问题反馈到社区。由于开源社区无人负责,需要被动等待对方响应并解决问题,这就又有不可预估的时间成本。自研ETL:BI软件里的自研ETL
    发表于 08-27 09:44

    路灯ETL认证

    目前取得ETL证书有两种方式,种通过CB测试报告转,种是直接申请,所需提交资料如下:1、申请表;2、CB测试证书复印件(直接申请不需要);3、CB测试报告复印件(直接申请不需要);4、样品(4-8
    发表于 06-10 22:02

    如何使用openssl制作开源C签名工具

    为了将签名与多维数据集程序员分离,我使用 openssl 制作了开源 C 签名工具。在各种构建环境中应该相当容易使用。
    发表于 12-06 07:52

    种金融系统专用ETL工具的研究与实现

    实现了种基于Spring框架的商业银行专用ETL程序。该程序利用Spring框架的控制反转技术,基于Ibatis的数据访问对象技术和Spring JDBC,以及Spring对Timer的支持,解决了ETL过程中的数据转
    发表于 04-13 09:04 24次下载

    基于元数据的ETL工具集成研究

    针对现有的ETL工具在大型信息集成系统中的应用缺陷,结合元数据技术,通过对ETL工具的集成,设计实现了基于元数据的ETL
    发表于 12-25 14:53 10次下载

    基于数据质量监管的ETL设计

    设计具有灵活性和可扩展性的ETL 架构,对ETL 架构进行灵活改进,把数据质量管理引入到ETL 架构中,尝试以此提高数据仓库的数据质量,
    发表于 01-15 16:30 9次下载

    ETL工具Kettle用户手册

    ETL工具Kettle用户手册
    发表于 04-05 11:02 0次下载

    用于数据分析的各类主流ETL 工具比较,哪种最适合你

    数据提取、转换与加载(ETL,Extract-Transform-Load)工具能够使组织内的不同数据更易于访问、更有意义、也更能被其他数据系统所使用。通常情况下,面对由此产生的自写代码、自建系统的相关成本和复杂性时,企业会直接去选择购置
    的头像 发表于 06-23 11:25 2.4w次阅读

    5顶级的人工智能开源工具

    DMTK 是分布式机器学习工具Distributed Machine Learning Toolkit的缩写,和 CNTK 样,是微软的开源人工智能工具
    的头像 发表于 06-01 10:20 6176次阅读
    5<b class='flag-5'>个</b>顶级的人工智能<b class='flag-5'>开源</b><b class='flag-5'>工具</b>

    款用于Windows的开源反rookit (ARK)工具

    OpenArk 是款用于 Windows 的开源反 rookit (ARK) 工具。Ark是Anti-Rootkit的缩写,它是款逆向/编程帮手,也是用户发现操作系统中隐藏恶意软件
    的头像 发表于 07-19 15:08 3462次阅读
    <b class='flag-5'>一</b>款用于Windows的<b class='flag-5'>开源</b>反rookit (ARK)<b class='flag-5'>工具</b>

    多数据源数据转换和同步的ETL工具推荐

    有许多支持多数据源数据转换和同步的ETL工具可供选择。以下是些常见的ETL工具和它们支持多数据源数据转换和同步的特点: Apache Ni
    的头像 发表于 07-28 16:32 1051次阅读

    上线 Airflow 官方!DolphinDB 带来数据管理新体验

    DolphinDB 正式登陆 Apache Airflow 官方,成为 Airflow 官方认可的第三方插件及工具供应商。Apache Airflow
    的头像 发表于 08-23 17:00 235次阅读
    上线 <b class='flag-5'>Airflow</b> 官方!DolphinDB 带来数据管理新体验