0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

一种异步延迟队列的实现方式调研

OSC开源社区 来源:京东云开发者 2023-03-31 10:10 次阅读

一、应用场景

目前系统中有很多需要用到延时处理的功能:支付超时取消、排队超时、短信、微信等提醒延迟发送、token刷新、会员卡过期等等。通过延时处理,极大地节省系统的资源,不必轮询数据库处理任务。

目前大部分功能通过定时任务完成,定时任务还分使用quartz及xxljob两种类型轮询时间短,每秒执行一次,对数据库造成一定的压力,并且会有1秒的误差。轮询时间久,如30分钟一次,03:01插入一条数据,正常3:31执行过期,但是3:30执行轮询时,扫描330的数据,是扫描不到3:31的数据的,需要4:00的时候才能扫描到,相当于多延迟了29分钟!

二、演示处理方式调研

1.DelayQueue

实现方式:

jvm提供的延迟阻塞队列,通过优先级队列对不同延迟时间任务进行排序,通过condition进行阻塞、睡眠dealy时间 获取延迟任务。

当有新任务加入时,会判断新任务是否是第一个待执行的任务,若是,会解除队列睡眠,防止新加入的元素时需要执行的元素而不能正常被执行线程获取到。

存在的问题:

单机运行,系统宕机后,无法进行有效的重试

没有执行记录和备份

没有重试机制

系统重启时,会将任务清空!

不能分片消费

优势: 实现简单,无任务时阻塞,节省资源,执行时间准确

2.延迟队列mq

实现方式:依赖mq,通过设置延迟消费时间,达到延迟消费功能。像rabbitMq、jmq都可以设置延迟消费时间。RabbitMq通过将消息设置过期时间,放入私信队列进行消费实现。

存在的问题:时间设置不灵活,每个queue是固定的到期时间,每次新创建延时队列,需要创建新的消息队列

优点:依靠jmq,可以有效的监控、消费记录、重试,具备多机同时消费能力,不惧怕宕机

3.定时任务

通过定时任务轮询符合条件的数据

缺点:

必须要读业务数据库,对数据库造成一定的压力,

存在延时

一次扫描数据量过大时,占用过多的系统资源。

无法分片消费

优点:

消费失败后,下次还能继续消费,具备重试能力,

消费能力稳定

4.redis

任务存储在redis中,使用redis的 zset队列根据score进行排序,程序通过线程不断获取队列数据消费,实现延时队列

优点:

查询redis相比较数据库快,set队列长度过大,会根据跳表结构进行查询,效率高

redis可根据时间戳进行排序,只需要查询当前时间戳内的分数的任务即可

无惧机器重启

分布式消费

缺点:

受限于redis性能,并发10W

多个命令无法保证原子性,使用lua脚本会要求所有数据都在一个redis分片上。

5. 时间轮

通过时间轮实现的延迟任务执行,也是基于jvm单机运行,如kafka、netty都有实现时间轮,redisson的看门狗也是通过netty的时间轮实现的。

缺点:不适合分布式服务的使用,宕机后,会丢失任务。

d0c5de70-cf06-11ed-bfe3-dac502259ad0.jpg

三、实现目标

兼容目前在使用的异步事件组件,并提供更可靠,可重试、有记录、可监控报警、高性能的延迟组件。

消息传输可靠性:消息进入到延迟队列后,保证至少被消费一次。

Client支持丰富:支持多重语言。

高可用性:支持多实例部署。挂掉一个实例后,还有后备实例继续提供服务。

实时性:允许存在一定的时间误差。

支持消息删除:业务使用方,可以随时删除指定消息。

支持消费查询

支持手动重试

对当前异步事件的执行增加监控

四、架构设计

d0db2f8c-cf06-11ed-bfe3-dac502259ad0.png

五、延迟组件实现方式

1.实现原理

目前选择使用jimdb通过zset实现延时功能,将任务id和对应的执行时间作为score存在在zset队列中,默认会按照score排序,每次取0-当前时间内的score的任务id,

发送延迟任务时,会根据时间戳+机器ip+queueName+sequence 生成唯一的id,构造消息体,加密后放入zset队列中。

通过搬运线程,将达到执行时间的任务移动到发布队列中,等待消费者获取。

监控方通过集成ump

消费记录通过redis备份+数据库持久化完成。

通过缓存实现的方式,只是实现的一种,可以通过参数控制使用哪一种实现方式,并可通过spi自由扩展。

2.消息结构

每个Job必须包含以下几个属性:

Topic:Job类型,即QueueName

Id:Job的唯一标识。用来检索和删除指定的Job信息

Delay:Job需要延迟的时间。单位:秒。(服务端会将其转换为绝对时间)

Body:Job的内容,供消费者做具体的业务处理,以json格式存储。

traceId:发送线程的traceId,待后续pfinder支持设置traceId后,可与发送线程公用同一个traceiD,便于日志追踪

具体结构如下图表示:

d113aa24-cf06-11ed-bfe3-dac502259ad0.png

TTR的设计目的是为了保证消息传输的可靠性。

3.数据流转及流程图

d12e09be-cf06-11ed-bfe3-dac502259ad0.png

基于redis-disruptor方式进行发布、消费,可以作为消息来进行使用,消费者采用原有异步事件的disruptor无锁队列消费,不同应用、不同queue之间无锁

1)支持应用只发布,不消费,达到消息队列的功能。

2)支持分桶,针对大key问题,若事件多,可以设置延迟队列和任务队列桶的数量,减小因大key造成的redis阻塞问题。

3)通过ducc配置,进行性能的扩展,目前只支持开启消费和关闭消费。

4)支持设置超时时间配置,防止消费线程执行过久

瓶颈:消费速度慢,生产速度过快,会导致ringbuffer队列占满,当前应用既是生产者也是消费者时,生产者会休眠,性能取决于消费速度,可通过水平扩展机器,直接提升性能。监控redis队列的长度,若不断增长,可考虑增加消费者,直接提高性能。

可能出现的情况:因一个应用公用一个disruptor,拥有64个消费者线程,如果某一个事件消费过慢,导致64个线程都在消费这个事件,会导致其他事件无消费线程消费,生产者线程也被阻塞,导致所有事件的消费都被阻塞。

后期观察是否有这个性能瓶颈,可给每一个queue一个消费者线程池。

六、demo示例

增加配置文件

判断是否开启jd.event.enable:true

 com.jd.car
 senna-event
 1.0-SNAPSHOT 
配置
jd:
senna:
event:
enable: true
queue:
retryEventQueue:
bucketNum: 1
handleBean: retryHandle
消费代码:

package com.jd.car.senna.admin.event;


import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;


/**
* @author zhangluyao
* @description
* @create 2022-02-21-9:54 下午
*/
@Slf4j
@Component("retryHandle")
public class RetryQueueEvent extends EventHandler {


@Override
protected void onHandle(String key, String eventType) {
log.info("Handler开始消费:{}", key);
}


@Override
protected void onDelayHandle(String key, String eventType) {
log.info("delayHandler开始消费:{}", key);
}
}

注解形式:


package com.jd.car.senna.admin.event;


import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;


/**
* @author zhangluyao
* @description
* @create 2022-02-21-9:54 下午
*/
@Slf4j
@SennaEvent(queueName = "testQueue", bucketNum = 5,delayBucketNum = 5,delayEnable = true)
public class TestQueueEvent extends EventHandler {


@Override
protected void onHandle(String key, String eventType) {
log.info("Handler开始消费:{}", key);
}


@Override
protected void onDelayHandle(String key, String eventType) {
log.info("delayHandler开始消费:{}", key);
}
}

发送代码:


package com.jd.car.senna.admin.controller;


import com.jd.car.senna.event.queue.IEventQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;


import javax.annotation.Resource;
import java.util.concurrent.CompletableFuture;




/**
* @author zly
*/
@RestController
@Slf4j
public class DemoController {


@Lazy
@Resource(name = "testQueue")
private IEventQueue eventQueue;


@ResponseBody
@GetMapping("/api/v1/demo")
public String demo() {
log.info("发送无延迟消息");
eventQueue.push("no delay 5000 millseconds message 3");
return "ok";
}


@ResponseBody
@GetMapping("/api/v1/demo1")
public String demo1() {
log.info("发送延迟5秒消息");
eventQueue.push(" delay 5000 millseconds message,name",1000*5L);
return "ok";
}


@ResponseBody
@GetMapping("/api/v1/demo2")
public String demo2() {
log.info("发送延迟到2022-04-02 00:00:00执行的消息");
eventQueue.push(" delay message,name to 2022-04-02 00:00:00", new Date(1648828800000));
return "ok";
} 


}





审核编辑:刘清

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 看门狗
    +关注

    关注

    10

    文章

    559

    浏览量

    70746
  • SPI接口
    +关注

    关注

    0

    文章

    258

    浏览量

    34341
  • JVM
    JVM
    +关注

    关注

    0

    文章

    157

    浏览量

    12207
  • Redis
    +关注

    关注

    0

    文章

    371

    浏览量

    10846

原文标题:一种异步延迟队列的实现方式

文章出处:【微信号:OSC开源社区,微信公众号:OSC开源社区】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    用FPGA芯片实现高速异步FIFO的一种方法

    现代集成电路芯片中,随着设计规模的不断扩大。个系统中往往含有数个时钟。多时钟带来的个问题就是,如何设计异步时钟之间的接口电路。异步 FIFO(First In First Out)
    发表于 05-28 10:56 3789次阅读

    延迟队列实现方式

    由MQ中消费到完整的数据则直接处理,否则进入其他流程。 针对这种场景使用了延迟任务来实现,以此为契机对延迟任务相关的技术做了个简单了解... 简介 延迟任务是
    的头像 发表于 09-30 11:17 792次阅读

    Spring Boot如何实现异步任务

    Spring Boot 提供了多种方式实现异步任务,这里介绍三主要实现方式。 1、基于注解
    的头像 发表于 09-30 10:32 1407次阅读

    异步电路匹配延迟延迟线,如何设计能实现自适应连续地调整延迟长度

    异步电路匹配延迟延迟线如何设计能实现自适应连续地调整延迟长度。这是个新的想法,希望有大神能和
    发表于 03-19 17:19

    请问怎样去设计一种异步FIFO?

    为什么要设计一种异步FIFO?异步FIFO的设计原理是什么?怎样去设计一种异步FIFO?
    发表于 06-18 09:20

    怎样去设计一种采用覆盖机制的FIFO队列模型呢

    FIFO队列是什么?怎样去设计一种采用覆盖机制的FIFO队列模型呢?
    发表于 12-08 06:07

    实现队列环形缓冲的方法

    串口队列环形缓冲区队列串口环形缓冲的好处代码实现队列  要实现队列环形缓冲,还需要
    发表于 02-21 07:11

    如何去实现一种队列程序的设计呢

    队列的原理是什么?队列有何作用?如何去实现一种队列程序的设计呢?
    发表于 02-25 07:50

    一种改进的主动队列管理算法

    主动队列管理是实现网络拥塞控制的重要技术,但是多数主动队列管理算法如随机早期检(RED)都存在对参数依赖性强的问题。针对RED算法中平均队列长度不能完全反映网络拥塞状况的
    发表于 04-13 09:08 14次下载

    一种高效的磁盘队列I/O机制

    分析了传统磁盘队列的存储管理开销和读写性能,针对磁盘队列I/O已成为影响消息服务器性能的首要瓶颈,提出了一种高效磁盘队列I/O机制—FlashQ。FlashQ采用物理上连续的磁盘块
    发表于 05-14 19:51 32次下载

    异步传输方式的HDLC协议的实现

    研究实现一种 HDLC (High Level Data Link Contr01)协议的改进方法,该方法把HDLC协议传统的同步传榆方式改成了异步传输
    发表于 07-20 17:25 62次下载
    <b class='flag-5'>异步</b>传输<b class='flag-5'>方式</b>的HDLC协议的<b class='flag-5'>实现</b>

    一种基于信号延迟的光网络攻击方式

    针对光网络攻击易被发现的问题,提出一种基于信号延迟插入的光网络攻击方式。该方法在不改变链路光学性能的基础上,利用信号延迟在系统中引起较高的串扰,极大的降低了系统的
    发表于 03-20 15:34 27次下载
    <b class='flag-5'>一种</b>基于信号<b class='flag-5'>延迟</b>的光网络攻击<b class='flag-5'>方式</b>

    TencentOS-tiny中环形队列实现

    1. 什么是队列队列(queue)是一种只能在端插入元素、在另端删除元素的数据结构,遵循「先入先出」(FIFO)的规则。 队列中有两个基
    的头像 发表于 10-08 16:30 1364次阅读

    如何用Redis实现延迟队列呢?

    前段时间有个小项目需要使用延迟任务,谈到延迟任务,我脑子第时间闪而过的就是使用消息队列来做,比如RabbitMQ的死信
    的头像 发表于 03-16 14:28 645次阅读

    JavaWeb消息队列使用指南

    在现代的JavaWeb应用中,消息队列(Message Queue)是一种常见的技术,用于异步处理任务、解耦系统组件、提高系统性能和可靠性。 1. 消息队列的基本概念 消息
    的头像 发表于 11-25 09:27 62次阅读