0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

Bamboo-pipeline:Python高效流程编排引擎

科技绿洲 来源:Python实用宝典 作者:Python实用宝典 2023-10-31 16:39 次阅读

Bamboo-pipeline 是蓝鲸智云旗下SaaS标准运维的流程编排引擎。其具备以下特点:

  1. 多种流程模式 :支持串行、并行,支持子流程,可以根据全局参数自动选择分支执行,节点失败处理机制可配置。
  2. 参数引擎 :支持参数共享,支持参数替换。
  3. 可交互的任务执行 :任务执行中可以随时暂停、继续、撤销,节点失败后可以重试、跳过。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,可以访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器,它有许多的优点:Python 编程的最好搭档—VSCode 详细指南

请选择以下任一种方式输入命令安装依赖

  1. Windows 环境 打开 Cmd (开始-运行-CMD)。
  2. MacOS 环境 打开 Terminal (command+空格输入Terminal)。
  3. 如果你用的是 VSCode编辑器 或 Pycharm,可以直接使用界面下方的Terminal.
pip install bamboo-engine
pip install bamboo-pipeline
pip install django
pip install celery

2. 项目初始化

(选项一:无Django项目) 如果你没有任何的现成Django项目,请按下面的流程初始化

由于 ** bamboo-pipeline ** 运行时基于 Django 实现,所以需要新建一个 Django 项目:

django-admin startproject easy_pipeline
cd easy_pipeline

在 ** easy_pipeline.settings.py ** 下添加如下配置:

from pipeline.eri.celery.queues import *
from celery import Celery

app = Celery("proj")

app.config_from_object("django.conf:settings")

INSTALLED_APPS = [
    ...
    "pipeline",
    "pipeline.engine",
    "pipeline.component_framework",
    "pipeline.eri",
    ...
]

在 ** easy_pipeline **目录下初始化数据库:

python manage.py migrate

(选项二:有Django项目需要使用流程引擎) 如果你有现成的PipeLine项目需要使用此流程引擎,请在项目的** settings.py **下添加如下配置:

from pipeline.eri.celery.queues import *
from celery import Celery

app = Celery("proj")

app.config_from_object("django.conf:settings")

INSTALLED_APPS = [
    ...
    "pipeline",
    "pipeline.engine",
    "pipeline.component_framework",
    "pipeline.eri",
    ...
]

然后重新执行migrate,生成pipeline相关的流程模型:

python manage.py migrate

migrate 执行完毕后会如下图所示:

图片

由于是在原有项目上使用流程引擎,可能会遇到一些版本不匹配的问题,如果遇到报错,请排查解决或到蓝鲸官网上进行询问。

3. 简单的流程例子

首先在项目目录下启动 celery worker:

python manage.py celery worker -Q er_execute,er_schedule --pool=solo -l info

启动成功类似下图所示:

图片

(注意) 如果你是在你的原有Django项目上做改造,它并不一定能够顺利地启动成功,这是因为Pipeline使用了 Django 2.2.24,会存在许多版本不兼容的情况。如果遇到报错,请排查解决或到蓝鲸官网上进行询问。

在下面的例子中,我们将会创建并执行一个简单的流程:

图片

3.1 创建流程APP

在 bamboo_pipeline 中,一个流程由多个组件组成,官方推荐使用APP统一管控组件:

python manage.py create_plugins_app big_calculator

该命令会在 Django 工程根目录下生成拥有以下目录结构的 APP:

big_calculator
├── __init__.py
├── components
│   ├── __init__.py
│   └── collections
│   ├── __init__.py
│   └── plugins.py
├── migrations
│   └── __init__.py
└── static
    └── big_calculator
        └── plugins.js

别忘了把新创建的这个插件添加到 Django 配置的 **INSTALLED_APPS **中:

INSTALLED_APPS = (
    ...
    'big_calculator',
    ...
)

3.2 编写流程的Service原子

组件服务 ** Service是组件的核心,Service ** 定义了组件被调用时执行的逻辑,下面让我们实现一个计算传入的参数n的阶乘,并把结果写到输出中的 ** Service ** ,在 **big_calculator/components/collections/plugins.py ** 中输入以下代码:

import math
from pipeline.core.flow.activity import Service


class FactorialCalculateService(Service):

    def execute(self, data, parent_data):
        """
        组件被调用时的执行逻辑
        :param data: 当前节点的数据对象
        :param parent_data: 该节点所属流程的数据对象
        :return:
        """
        n = data.get_one_of_inputs('n')
        if not isinstance(n, int):
            data.outputs.ex_data = 'n must be a integer!'
            return False

        data.outputs.factorial_of_n = math.factorial(n)
        return True

    def inputs_format(self):
        """
        组件所需的输入字段,每个字段都包含字段名、字段键、字段类型及是否必填的说明。
        :return:必须返回一个 InputItem 的数组,返回的这些信息能够用于确认该组件需要获取什么样的输入数据。
        """
        return [
            Service.InputItem(name='integer n', key='n', type='int', required=True)
        ]

    def outputs_format(self):
        """
        组件执行成功时输出的字段,每个字段都包含字段名、字段键及字段类型的说明
        :return: 必须返回一个 OutputItem 的数组, 便于在流程上下文或后续节点中进行引用
        """
        return [
            Service.OutputItem(name='factorial of n', key='factorial_of_n', type='int')
        ]

首先我们继承了 ** Service基类,并实现了execute() ** 和 **outputs_format() ** 这两个方法,他们的作用如下:

  1. execute :组件被调用时执行的逻辑。接收 data 和 parent_data 两个参数。
    其中,data 是当前节点的数据对象,这个数据对象存储了用户传递给当前节点的参数的值以及当前节点输出的值。parent_data 则是该节点所属流程的数据对象,通常会将一些全局使用的常量存储在该对象中,如当前流程的执行者、流程的开始时间等。
  2. outputs_format :组件执行成功时输出的字段,每个字段都包含字段名、字段键及字段类型的说明。这个方法必须返回一个 OutputItem 的数组,返回的这些信息能够用于确认某个组件在执行成功时输出的数据,便于在流程上下文或后续节点中进行引用。
  3. inputs_format :组件所需的输入字段,每个字段都包含字段名、字段键、字段类型及是否必填的说明。这个方法必须返回一个 InputItem 的数组,返回的这些信息能够用于确认某个组件需要获取什么样的输入数据。

下面我们来看一下 **execute() **方法内部执行的逻辑,首先我们尝试从当前节点数据对象的输出中获取输入参数n,如果获取到的参数不是一个 int 实例,那么我们会将异常信息写入到当前节点输出的 **ex_data **字段中, 这个字段是引擎内部的保留字段,节点执行失败时产生的异常信息都应该写入到该字段中 。随后我们返回 ** False ** 代表组件本次执行失败,随后节点会进入失败状态:

n = data.get_one_of_inputs('n')
if not isinstance(n, int):
    data.outputs.ex_data = 'n must be a integer!'
    return False

若获取到的 n 是一个正常的 int,我们就调用 **math.factorial() **函数来计算 n 的阶乘,计算完成后,我们会将结果写入到输出的 factorial_of_n 字段中,以供流程中的其他节点使用:

data.outputs.factorial_of_n = math.factorial(n)
return True

3.3 编写流程组件,绑定Service原子

完成 Service 的编写后,我们需要将其与一个 Component 绑定起来,才能够注册到组件库中,在**big_calculatorcomponents__init__.py **文件下添加如下的代码:

import logging
from pipeline.component_framework.component import Component
from big_calculator.components.collections.plugins import FactorialCalculateService

logger = logging.getLogger('celery')


class FactorialCalculateComponent(Component):
    name = 'FactorialCalculateComponent'
    code = 'fac_cal_comp'
    bound_service = FactorialCalculateService

我们定义了一个继承自基类 **Component **的类 FactorialCalculateComponent ,他拥有以下属性:

1.name:组件名。
2.code:组件代码,这个代码必须是全局唯一的。
3.bound_service:与该组件绑定的 Service。

这样一来,我们就完成了一个流程原子的开发。

3.4 生成流程,测试刚编写的组件

在 big_calculatortest.py 写入以下内容,生成一个流程,测试刚刚编写的组件:

# Python 实用宝典
# 2021/06/20

import time

from bamboo_engine.builder import *
from big_calculator.components import FactorialCalculateComponent
from pipeline.eri.runtime import BambooDjangoRuntime
from bamboo_engine import api
from bamboo_engine import builder


def bamboo_playground():
    """
    测试流程引擎
    """
    # 使用 builder 构造出流程描述结构
    start = EmptyStartEvent()
    # 这里使用 我们刚创建好的n阶乘组件
    act = ServiceActivity(component_code=FactorialCalculateComponent.code)
    # 传入参数
    act.component.inputs.n = Var(type=Var.PLAIN, value=4)
    end = EmptyEndEvent()

    start.extend(act).extend(end)

    pipeline = builder.build_tree(start)
    api.run_pipeline(runtime=BambooDjangoRuntime(), pipeline=pipeline)

    # 等待 1s 后获取流程执行结果
    time.sleep(1)

    result = api.get_execution_data_outputs(BambooDjangoRuntime(), act.id).data

    print(result)

随后,在命令行输入:

python manage.py shell

打开 django console, 输入以下命令,执行此流程:

from big_calculator.test import bamboo_playground
bamboo_playground()

流程运行完后,获取节点的执行结果,可以看到,该节点输出了 factorial_of_n,并且值为 24(4 * 3 * 2 *1),这正是我们需要的效果:

{'_loop': 0, '_result': True, 'factorial_of_n': 24}

恭喜你,你已经成功的创建了一个流程并把它运行起来了!在这期间你可能会遇到不少的坑,建议尝试先自行解决,如果实在无法解决,可以前往 标准运维 仓库提 issues,或者前往蓝鲸智云官网提问。

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • WINDOWS
    +关注

    关注

    3

    文章

    3524

    浏览量

    88437
  • 参数
    +关注

    关注

    11

    文章

    1786

    浏览量

    32090
  • 编辑器
    +关注

    关注

    1

    文章

    801

    浏览量

    31120
  • python
    +关注

    关注

    56

    文章

    4782

    浏览量

    84456
收藏 人收藏

    评论

    相关推荐

    Python中的流程控制

    流程控制无非就是if else之类的控制语句,今天我们来看一下Python中的流程控制会有什么不太一样的地方。
    发表于 06-28 08:54

    python基础语法及流程控制

    爬虫复习1.python基础python基础语法 流程控制 函数封装2.防爬措施整体防爬User-AgentrefererIP代理池Cookie代理池 各自防爬数据内部动态加载网页设置有干扰项标签
    发表于 08-31 07:41

    什么是Python中的流程控制?

    什么是Python中的流程控制?
    发表于 10-09 07:24

    Pipeline ADCs Come of Age

    Pipeline ADCs Come of Age Abstract: In the mid 1970s, a new data converter architecture
    发表于 04-16 16:21 1096次阅读
    <b class='flag-5'>Pipeline</b> ADCs Come of Age

    Pipeline ADCs Come of Age

    and mixed-signal community, called pipeline ADCs. The following article takes the knowledge of advantages and disadvantages of the pipeline
    发表于 04-25 10:22 1078次阅读
    <b class='flag-5'>Pipeline</b> ADCs Come of Age

    LCD汉字编排软件

    LCD汉字编排软件LCD汉字编排软件LCD汉字编排软件LCD汉字编排软件
    发表于 12-28 14:31 7次下载

    python爬虫入门教程之python爬虫视频教程分布式爬虫打造搜索引擎

    本文档的主要内容详细介绍的是python爬虫入门教程之python爬虫视频教程分布式爬虫打造搜索引擎
    发表于 08-28 15:32 29次下载

    Vivado综合引擎的增量综合流程

    从 Vivado 2019.1 版本开始,Vivado 综合引擎就已经可以支持增量流程了。这使用户能够在设计变化较小时减少总的综合运行时间。
    发表于 07-21 11:02 1645次阅读

    什么是流程/规则编排

    以上两种基本代表了传统的编排思想,在简单的例子下,看起来也是非常直观,但,当变动发生时,尤其是需要灵活调整的场景,他们的表现又如何呢?
    的头像 发表于 09-21 10:04 1929次阅读

    Python一键转化代码为流程

    而今天我们要介绍的项目,就是基于Python和Graphviz开发的,能将源代码转化为流程图的工具:pycallgraph。
    的头像 发表于 02-24 11:19 3966次阅读
    <b class='flag-5'>Python</b>一键转化代码为<b class='flag-5'>流程</b>图

    一图读懂CodeArts Pipeline全新升级,5大特性使能企业研发治理

    2023 年2月27日,华为云正式发布流水线服务 CodeArts Pipeline ,旨在提升编排体验,开放插件平台,并提供标准化的DevOps企业治理模型,将华为公司内的优秀研发实践赋能给伙伴
    的头像 发表于 03-25 07:50 884次阅读
    一图读懂CodeArts <b class='flag-5'>Pipeline</b>全新升级,5大特性使能企业研发治理

    开箱即用!教你如何正确使用华为云CodeArts Pipeline

    软件持续交付流水线是一个可视化的自动化任务编排调度平台,串联编译构建、代码检查、自动化测试、部署发布等任务,承载软件从代码提交到发布上线全自动化流程。一次配置后即可重复触发执行,避免频繁低效
    的头像 发表于 08-30 11:20 1311次阅读
    开箱即用!教你如何正确使用华为云CodeArts <b class='flag-5'>Pipeline</b>!

    Python 如何一键转化代码为流程

    Graphviz是一个可以对图进行自动布局的绘图工具,由贝尔实验室开源。我们在上次 Python 快速绘制画出漂亮的系统架构图 提到的diagrams,其内部的编排逻辑就用到了这个开源工具包。 而今
    的头像 发表于 11-01 10:39 2133次阅读
    <b class='flag-5'>Python</b> 如何一键转化代码为<b class='flag-5'>流程</b>图

    什么是pipeline?Go中构建流数据pipeline的技术

    本文介绍了在 Go 中构建流数据pipeline的技术。 处理此类pipeline中的故障很棘手,因为pipeline中的每个阶段可能会阻止尝试向下游发送值,并且下游阶段可能不再关心传入的数据。
    的头像 发表于 03-11 10:16 554次阅读

    行云流水线 满足你对工作流编排的一切幻想~skr

    流水线模型 众所周知,DevOps流水线(DevOps pipeline)的本质是实现自动化工作流程,用于支持软件开发、测试和部署的连续集成、交付和部署(CI/CD)实践。它是DevOps方法论
    的头像 发表于 08-05 13:42 244次阅读