0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

基于非常简单的Python代码就能完成流水线开发

Linux爱好者 来源:Python实用宝典 作者:Ckend 2021-11-16 18:20 次阅读

Mara-pipelines 是一个轻量级的数据转换框架,具有透明和低复杂性的特点。其他特点如下:

基于非常简单的Python代码就能完成流水线开发。

使用 PostgreSQL 作为数据处理引擎。

有Web界面可视化分析流水线执行过程。

基于 Python 的 multiprocessing 单机流水线执行。不需要分布式任务队列。轻松调试和输出日志。

基于成本的优先队列:首先运行具有较高成本(基于记录的运行时间)的节点。

此外,在Mara-pipelines的Web界面中,你不仅可以查看和管理流水线及其任务节点,你还可以直接触发这些流水线和节点,非常好用:

1.安装

由于使用了大量的依赖,Mara-pipelines 并不适用于 Windows,如果你需要在 Windows 上使用 Mara-pipelines,请使用 Docker 或者 Windows 下的 linux 子系统。

使用pip安装Mara-pipelines:

pip install mara-pipelines

或者:

pip install git+https://github.com/mara/mara-pipelines.git

2.使用示例

这是一个基础的流水线演示,由三个相互依赖的节点组成,包括 任务1(ping_localhost), 子流水线(sub_pipeline), 任务2(sleep):

# 注意,这个示例中使用了部分国外的网站,如果无法访问,请变更为国内网站。
frommara_pipelines.commands.bash importRunBash
frommara_pipelines.pipelines importPipeline, Task
frommara_pipelines.ui.cli importrun_pipeline, run_interactively

pipeline = Pipeline(
id='demo',
description='A small pipeline that demonstrates the interplay between pipelines, tasks and commands')

pipeline.add(Task(id='ping_localhost', description='Pings localhost',
commands=[RunBash('ping -c 3 localhost')]))

sub_pipeline = Pipeline(id='sub_pipeline', description='Pings a number of hosts')

forhost in['google', 'amazon', 'facebook']:
sub_pipeline.add(Task(id=f'ping_{host}', description=f'Pings {host}',
commands=[RunBash(f'ping -c 3 {host}.com')]))

sub_pipeline.add_dependency('ping_amazon', 'ping_facebook')
sub_pipeline.add(Task(id='ping_foo', description='Pings foo',
commands=[RunBash('ping foo')]), ['ping_amazon'])

pipeline.add(sub_pipeline, ['ping_localhost'])

pipeline.add(Task(id='sleep', description='Sleeps for 2 seconds',
commands=[RunBash('sleep 2')]), ['sub_pipeline'])

可以看到,Task包含了多个commands,这些 command s会用于真正地执行动作。

而 pipeline.add 的参数中,第一个参数是其节点,第二个参数是此节点的上游。如:

pipeline.add(sub_pipeline, ['ping_localhost'])

则表明必须执行完 ping_localhost 才会执行 sub_pipeline.

为了运行这个流水线,需要配置一个 PostgreSQL 数据库来存储运行时信息、运行输出和增量处理状态:

importmara_db.auto_migration
importmara_db.config
importmara_db.dbs

mara_db.config.databases 
= lambda: {'mara': mara_db.dbs.PostgreSQLDB(host='localhost', user='root', database='example_etl_mara')}

mara_db.auto_migration.auto_discover_models_and_migrate()

如果 PostgresSQL 正在运行并且账号密码正确,输出如下所示(创建了一个包含多个表的数据库):

Created database "postgresql+psycopg2://root@localhost/example_etl_mara"

CREATETABLEdata_integration_file_dependency (
node_path TEXT[] NOTNULL, 
dependency_type VARCHARNOTNULL, 
hashVARCHAR, 
timestampTIMESTAMPWITHOUTTIMEZONE, 
PRIMARY KEY(node_path, dependency_type)
);

.. more tables

为了运行这个流水线,你需要:

frommara_pipelines.ui.cli importrun_pipeline

run_pipeline(pipeline)

这将运行单个流水线节点及其 (sub_pipeline) 所依赖的所有节点:

run_pipeline(sub_pipeline, nodes=[sub_pipeline.nodes['ping_amazon']], with_upstreams=True)

3.Web 界面

我认为 mara-pipelines 最有用的是他们提供了基于Flask管控流水线的Web界面。

对于每条流水线,他们都有一个页面显示:

所有子节点的图以及它们之间的依赖关系

流水线的总体运行时间图表以及过去 30 天内最昂贵的节点(可配置)

所有流水线节点及其平均运行时间和由此产生的排队优先级的表

流水线最后一次运行的输出和时间线

对于每个任务,都有一个页面显示

流水线中任务的上游和下游

最近 30 天内任务的运行时间

任务的所有命令

任务最后运行的输出

此外,流水线和任务可以直接从网页端调用运行,这是非常棒的特点。

责任编辑:haq

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 数据
    +关注

    关注

    8

    文章

    6790

    浏览量

    88719
  • python
    +关注

    关注

    54

    文章

    4763

    浏览量

    84349

原文标题:超级方便的轻量级 Python 流水线工具,还有漂亮的可视化界面!

文章出处:【微信号:LinuxHub,微信公众号:Linux爱好者】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    行云流水线 满足你对工作流编排的一切幻想~skr

    流水线模型 众所周知,DevOps流水线(DevOps pipeline)的本质是实现自动化工作流程,用于支持软件开发、测试和部署的连续集成、交付和部署(CI/CD)实践。它是DevOps方法论
    的头像 发表于 08-05 13:42 217次阅读

    ADS900高速流水线模数转换器(ADC)数据表

    电子发烧友网站提供《ADS900高速流水线模数转换器(ADC)数据表.pdf》资料免费下载
    发表于 07-30 14:11 0次下载
    ADS900高速<b class='flag-5'>流水线</b>模数转换器(ADC)数据表

    ADS930高速流水线模数转换器(ADC)数据表

    电子发烧友网站提供《ADS930高速流水线模数转换器(ADC)数据表.pdf》资料免费下载
    发表于 07-30 14:10 0次下载
    ADS930高速<b class='flag-5'>流水线</b>模数转换器(ADC)数据表

    ADS901高速流水线模数转换器数据表

    电子发烧友网站提供《ADS901高速流水线模数转换器数据表.pdf》资料免费下载
    发表于 07-30 11:43 0次下载
    ADS901高速<b class='flag-5'>流水线</b>模数转换器数据表

    ADS5421流水线式模数转换器(ADC)数据表

    电子发烧友网站提供《ADS5421流水线式模数转换器(ADC)数据表.pdf》资料免费下载
    发表于 07-30 11:16 0次下载
    ADS5421<b class='flag-5'>流水线</b>式模数转换器(ADC)数据表

    ADS5413 CMOS流水线模数转换器(ADC)数据表

    电子发烧友网站提供《ADS5413 CMOS流水线模数转换器(ADC)数据表.pdf》资料免费下载
    发表于 07-29 13:21 0次下载
    ADS5413 CMOS<b class='flag-5'>流水线</b>模数转换器(ADC)数据表

    ADS5237流水线式模数转换器(ADC)数据表

    电子发烧友网站提供《ADS5237流水线式模数转换器(ADC)数据表.pdf》资料免费下载
    发表于 07-29 11:46 0次下载
    ADS5237<b class='flag-5'>流水线</b>式模数转换器(ADC)数据表

    ADS828流水线式CMOS模数转换器数据表

    电子发烧友网站提供《ADS828流水线式CMOS模数转换器数据表.pdf》资料免费下载
    发表于 07-23 09:17 0次下载
    ADS828<b class='flag-5'>流水线</b>式CMOS模数转换器数据表

    ADS805流水线模数转换器ADC数据表

    电子发烧友网站提供《ADS805流水线模数转换器ADC数据表.pdf》资料免费下载
    发表于 07-16 11:28 0次下载
    ADS805<b class='flag-5'>流水线</b>模数转换器ADC数据表

    固定式的扫码器在SMT流水线中的使用

    新大陆固定式扫码器作为一种高效的条码扫描设备,广泛应用于各个行业中,尤其是在SMT(表面贴装技术)流水线中有重要的作用。以下是新大陆固定式扫码器在SMT流水线中的具体使用情况。提高生产效率和质量
    的头像 发表于 07-03 10:18 414次阅读
    固定式的扫码器在SMT<b class='flag-5'>流水线</b>中的使用

    新大陆工业读码器Soldier100,用于流水线上数据码采集

    非常适合在制造业中进行数据码的采集。高速流动条码的识别能力:在制造业的流水线上,尤其是在大企业和大产品生产的过程中,数据收集非常重要。传统的手工收集方法不仅效率低下
    的头像 发表于 06-26 14:12 259次阅读
    新大陆工业读码器Soldier100,用于<b class='flag-5'>流水线</b>上数据码采集

    RISC-V架构的多级流水线处理

    有的单核RISC-V MCU支持四级流水线,有的只支持三级流水线,是不是级数越多,带来的开销越大,功耗也越高呢?
    发表于 05-20 16:01

    具有3态输出的多级流水线寄存器数据表

    电子发烧友网站提供《具有3态输出的多级流水线寄存器数据表.pdf》资料免费下载
    发表于 05-16 09:39 0次下载
    具有3态输出的多级<b class='flag-5'>流水线</b>寄存器数据表

    固定式安装工业读码器,助力提高流水线人工上料效率

    上的条码信息,无需手动操作即可完成数据采集。这种自动化的条码扫描技术大大缩短了人工上料的时间,提高了生产效率。而工业读码器的稳定性和准确度保证了流水线上料过程中的顺畅
    的头像 发表于 02-27 14:43 267次阅读
    固定式安装工业读码器,助力提高<b class='flag-5'>流水线</b>人工上料效率

    牵引机和挖掘机装配流水线自动互锁防呆系统无线通讯应用

    在挖掘机装配工序中,液压系统检测、调试是其生产工艺中的重要环节。液压检测过程中需要操作铲斗、斗杆、动臂动作,这一过程中流水线挖掘机因带动偏移易发生安全事故及机械损伤故障等情况,需要采用牵引机链条牵引
    的头像 发表于 02-26 08:52 315次阅读
    牵引机和挖掘机装配<b class='flag-5'>流水线</b>自动互锁防呆系统无线通讯应用