0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

如何用Worker pool解决异步任务的问题

Linux爱好者 来源:blog.xizhibei.me 作者:blog.xizhibei.me 2022-06-08 14:58 次阅读

【导读】本文介绍了 Go 移步任务队列的实现。

在一些常见的场景中,如果遇到了某些请求特别耗时间,为了不影响其它用户的请求以及节约服务器资源,我们通常会考虑使用异步任务队列去解决,这样可以快速地处理请求、只返回给用户任务创建结果,等待任务完成之后,我们再告知用户任务的完成情况。

对于 Golang,我们可以通过 Worker pool 异步处理任务,在大多数情况下,如果不在意数据丢失以及服务器性能足够,我们就没有必要考虑别的方案,毕竟这样实现非常简单。

接下来我们先来说说如何用 Worker pool 解决异步任务的问题。

Worker pool

Worker pool,也有叫做 Goroutine pool 的,都是指用 Go channel 以及 Goroutine 来实现的任务队列。Go channel 本质上就是一个队列,因此用作任务队列是很自然的。

在我们不用 Go channel 的时候,我们也许会使用这样的方式来处理异步任务:

fori:=0;i< 100;i++{
gofunc(){
//processjob
}()
}

这样的方式是不推荐的,因为在请求量到达一定程度,系统处理不过来的时候,会造成 Goroutine 的爆炸,拖慢整个系统的运行,甚至程序的崩溃,数据也就完全丢失了。

如果我们用简单的方式,可以看看接下来的例子:一个发送者(也叫做生产者),一个接受者(也叫做消费者,或者 Worker):

typeJobstruct{...}
jobChan:=make(chanJob)
quit:=make(chanbool)
gofunc(){
select{
casejob:=<-jobChan:
   case<- quit:
   return
}
}()

fori:=0;i< 100;i++{
jobChan<- Job{...}
}
close(jobChan)
quit<- true

如果 Worker 不够,我们可以增加,这样可以并行处理任务:

fori:=0;i< 10;i++{
gofunc(){
forjob:=rangejobChan{
//processjob
}
}()
}

这样,一个非常简单的 Worker pool 就完成了,只是,它对任务的处理还会有问题,比如无法设置超时、无法处理 panic 错误等。

实际上,目前已经有很多的开源库可以帮你实现了,以worker pool为关键词在 GitHub 上可以搜到一大堆:

  • GitHub - Jeffail/tunny: A goroutine pool for Go
  • GitHub - gammazero/workerpool: Concurrency limiting goroutine pool

那么,它们的缺点呢?

很明显,它们的缺点就在于缺乏管理,可以说是完全不管任务的结果,即使我们加日志输出也只是为了简单监控,更要命的就是进程重启的时候,比如进程挂了,或者程序更新,都会导致数据丢失,毕竟生产者与消费者在一个进程中的时候,会互相影响(抢占 CPU 与内存资源)。因此前面我也说了,在不管这两个问题的时候,可以考虑用。

如果数据很重要(实际上,我认为用户上传的业务数据都重要,不能丢失),为了解决这些问题,我们必须换一种解决方案。

分布式异步任务队列

接下来再说说异步的分布式任务队列,要用到这个工具的时候,我们大致有以下几个需求:

  • 分布式:生产者与消费者隔离;
  • 数据持久化:在程序重启的时候,不丢失已有的数据;
  • 任务重试:会有任务偶然失败的场景,重试是最简单的方式,但需要保证任务的执行时是冪等的;
  • 任务延时:延迟执行,比如 5 分钟后给用户发红包;
  • 任务结果的临时存储,可用于储存;
  • 任务处理情况监控:及时发现任务执行出错情况;

对于 Python 来说,有个大名鼎鼎的 Celery(https://github.com/celery/celery),它完全包含上面的功能。它包含两个比较重要的组件:一个是消息队列,比如 Redis/RabbitMQ 等,Celery中叫做Broker,然后还需要有数据库,用于存储任务状态,叫做Result Backend

显然对于 Go 也有很多不错的开源库,其中一个学 Celery 的是 Machinery(github.com/RichardKnop/machinery),它目前能满足大部分需求,而且一直在积极维护,也是我们团队目前在用的。

它目前支持的 Broker 有 AMQP(RabbitMQ)、Redis、AWS SQS、GCP Pub/Sub,目前对国内同行来说,RabbitMQ 或者 Redis 会相对比较合适。

另外它还支持几个高级用法:

  1. Groups:允许你定义多个并行的任务,在最后取任务结果的时候,可以一起返回;
  2. Chords:允许你定义一个回调任务,在 Group 任务执行完毕后执行对应的回调任务;
  3. Chains:允许你定义串行执行的任务,任务将会被串行执行;

说了优点,再说说它的缺点:

  1. 任务监控支持不够,目前只有分布式追踪 opentracing 的支持,假如我要使用 prometheus,会比较困难,它的自定义错误处理过于简单,连上下文都不给你;
  2. 传入的参数目前只支持非常简单的参数,不支持 struct、map,还得定义参数的类型,这样的方式会将这个库限制在 Golang 世界中,而无法拓展适用于其它语言;

P.S.

其实对于 Goroutine 的方案,在以下两种情况下,可以考虑使用:

  1. 必须同步返回给用户请求结果;
  2. 服务器资源足够,仅仅用 Worker pool 就能降低请求的响应时长到可接受范围;

这两种方案都会返回请求结果,失败的情况下靠客户端重新请求来解决数据丢失的问题。

原文标题:Golang 中的异步任务队列

文章出处:【微信公众号:Linux爱好者】欢迎添加关注!文章转载请注明出处。

审核编辑:汤梓红
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • Channel
    +关注

    关注

    0

    文章

    31

    浏览量

    11773
  • 异步
    +关注

    关注

    0

    文章

    62

    浏览量

    18043
  • Worker
    +关注

    关注

    0

    文章

    8

    浏览量

    6457

原文标题:Golang 中的异步任务队列

文章出处:【微信号:LinuxHub,微信公众号:Linux爱好者】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    Spring Boot如何实现异步任务

    Spring Boot 提供了多种方式来实现异步任务,这里介绍三种主要实现方式。 1、基于注解 @Async @Async 注解是 Spring 提供的一种轻量级异步方法实现方式,它可以标记在方法上
    的头像 发表于 09-30 10:32 1436次阅读

    鸿蒙原生应用开发-ArkTS语言基础类库多线程TaskPool和Worker的对比(一)

    TaskPool(任务池)和Worker的作用是为应用程序提供一个多线程的运行环境,用于处理耗时的计算任务或其他密集型任务。可以有效地避免这些任务
    发表于 03-25 14:11

    鸿蒙原生应用开发-ArkTS语言基础类库多线程TaskPool和Worker的对比(二)

    易用,支持任务的执行、取消。工作线程数量上限为4。 Worker运作机制 图2 Worker运作机制示意图 创建Worker的线程称为宿主线程(不一定是主线程,工作线程也支持创建
    发表于 03-26 15:25

    鸿蒙原生应用开发-ArkTS语言基础类库多线程TaskPool和Worker的对比(三)

    一、TaskPool注意事项 实现任务的函数需要使用装饰器@Concurrent标注,且仅支持在.ets文件中使用。 实现任务的函数入参需满足序列化支持的类型。 由于不同线程中上下文对象
    发表于 03-27 16:26

    HarmonyOS CPU与I/O密集型任务开发指导

    。 基于多线程并发机制处理CPU密集型任务可以提高CPU利用率,提升应用程序响应速度。 当进行一系列同步任务时,推荐使用Worker;而进行大量或调度点较为分散的独立任务时,不方便使用
    发表于 09-26 16:29

    何用VxWorks的信号量机制实现任务同步

    何用VxWorks的信号量机制实现任务同步
    发表于 03-29 12:25 16次下载

    面向对象嵌入式实时操作系统Worker1.0

    Worker1.0继承图1 Worker1.0主要类的简介3 Worker1.0 API4 Worker1.0移植9 Worker1.0例程
    发表于 04-29 18:01 39次下载
    面向对象嵌入式实时操作系统<b class='flag-5'>Worker</b>1.0

    Android异步任务处理

    移动护理系统开发采用异步处理的方式,可以缩短执行操作的时间,避免UI线程阻塞。笔者介绍了采用异步处理方式开发移动护理程序的方法,并以移动护理中的病人列表异步任务处理为
    发表于 12-30 10:39 3696次阅读

    详解移动通信领域里的组POOL

    在移动通信领域,我们经常会提到Pool的概念。Pool,通常译为水塘、水池。在移动通信中POOL通称为“池”
    的头像 发表于 03-19 16:15 7884次阅读
    详解移动通信领域里的组<b class='flag-5'>POOL</b>

    normal worker_pool详细的创建过程代码分析

    默认 work 是在 normal worker_pool 中处理的。系统的规划是每个 CPU 创建两个 normal worker_pool:一个 normal 优先级 (nice=0)、一个高
    的头像 发表于 04-08 14:35 7457次阅读
    normal <b class='flag-5'>worker_pool</b>详细的创建过程代码分析

    为何需要CMWQ?CMWQ如何解决问题的呢?

    基于这样的思考,在CMWQ中,将这种固定的关系被打破,提出了worker pool这样的概念(其实就是一种thread pool的概念),也就是说,系统中存在若干worker
    的头像 发表于 08-20 14:47 5307次阅读

    介绍一种基于任务异步模式TAP

    TAP是基于任务异步模式,在 .NET Framework 4 中引入。TAP是 APM 和 EAP,是推荐的模式模式。
    的头像 发表于 08-19 11:45 2627次阅读

    ModBus Pool下载

    ModBus Pool下载
    发表于 10-08 09:41 6次下载

    Tokio中hang死所有worker的方法

    原因是 tokio 里的待执行 task 不是简单的放到一个 queue 里,除了 runtime 内共享的,可被每个 worker 消费的run_queue[2],每个 worker 还有一个自己的 lifo_slot[3],只存储一个最后被放入的 task (目的是减
    的头像 发表于 02-03 16:26 975次阅读

    鸿蒙语言基础类库:ohos.worker 启动一个Worker

    Worker是与主线程并行的独立线程。创建Worker的线程称之为宿主线程,Worker自身的线程称之为Worker线程。创建Worker
    的头像 发表于 07-11 17:03 462次阅读
    鸿蒙语言基础类库:ohos.<b class='flag-5'>worker</b> 启动一个<b class='flag-5'>Worker</b>