0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

延迟队列的实现方式

科技绿洲 来源:Java技术指北 作者:Java技术指北 2023-09-30 11:17 次阅读

延迟任务

最近有一个需求,基于消息队列对数据消费,并根据多次消费的结果对数据进行重新组装,如果在指定时间内,需要的数据全部到达,则进行数据组装以及后续逻辑。简单的说,设置一个超时时间,如果在该时间内由MQ中消费到完整的数据则直接处理,否则进入其他流程。

针对这种场景使用了延迟任务来实现,以此为契机对延迟任务相关的技术做了个简单了解...

简介

延迟任务是一种指定任务在未来某个时间点或一定时间后执行的方式。通常情况下,延迟任务可以通过设置任务的执行时间或延迟时间来实现。

延迟任务可以用于异步操作、定时任务和任务调度等场景。例如,在用户注册后发送欢迎邮件或者在用户下单后发送订单确认短信,可以通过延迟任务来实现异步操作。定时检查服务器状态、定时备份数据等任务,也可以通过延迟任务来实现定时任务。在某个时间点触发某个任务、在某个时间段内重复执行某个任务等,可以通过延迟任务来实现任务调度。

延迟任务通常使用队列或者定时器来实现。在队列中,任务会被添加到一个等待队列中,等待队列中的任务会在指定的时间点或延迟时间后被取出执行。在定时器中,任务会被添加到一个定时器中,定时器会在指定的时间点触发任务执行。

总之,延迟任务是一种非常实用的技术,可以帮助我们更好地管理系统中的异步操作、定时任务和任务调度等场景。

使用场景

异步操作:延迟任务可以用于异步操作,例如在用户注册后发送欢迎邮件或者在用户下单后发送订单确认短信。通过使用延迟任务,可以将这些操作推迟到后台处理,从而提高系统的响应速度和并发能力。

定时任务:延迟任务可以用于定时任务,例如定时检查服务器状态、定时备份数据等。通过使用延迟任务,可以在指定的时间点自动触发任务,避免手动操作的繁琐和容易出错。

任务调度:延迟任务可以用于任务调度,例如在某个时间点触发某个任务、在某个时间段内重复执行某个任务等。通过使用延迟任务,可以方便地进行任务调度,提高系统的可靠性和稳定性。

技术实现

  • Timer 基于java基础类库java.util.Timer实现
  • DelayQueue
    基于延时队列实现
  1. 基于内存,应用重启(或宕机)会导致任务丢失
  2. 基于内存存放队列,不支持集群
  3. 依据compareTo方法排列队列,调用take阻塞式的取出第一个任务(不调用则不取出),比较不灵活,会影响时间的准确性
  • ScheduledThreadPoolExecutor
    1. 基于内存,应用重启(或宕机)会导致任务丢失
    2. 基于内存存放任务,不支持集群
    3. 一个任务就要新建一个线程绑定任务的执行,容易造成资源浪费
  • Redis过期监听 基于Redis过期订阅
    1. 客户端断开后重连会导致所有事件丢失
    2. 高并发场景下,存在大量的失效key场景会导出失效时间存在延迟
    3. 若有多个监听器监听该key,是会重复消费这个过期事件的,需要特定逻辑判断
  • MQ延迟队列 基于消息死信队列实现 支持集群,分布式,高并发场景;缺点:引入额外的消息队列,增加项目的部署和维护的复杂度。
  • HashedWheelTimer 基于Netty提供的工具类HashedWheelTimer HashedWheelTimer 是使用定时轮实现的,定时轮其实就是一种环型的数据结构,可以把它想象成一个时钟, 分成了许多格子,每个格子代表一定的时间,在这个格子上用一个链表来保存要执行的超时任务,同时有一个指针一格一格的走,走到那个格子时就执行格子对应的延迟任务,

其中前三种Timer、DelayQueue、ScheduledThreadPoolExecutor实现比较简单,只不过只适用于单体应用,任务数据都在内存中,在系统崩溃后数据丢失;后两张实现相对复杂,并且需要依赖于第三方应用,在系统整体结构上更加复杂且消耗更多资源,但能支持分布式系统,且有较高的容错性。

示例

定义延迟任务对象:

@Getter
public class DelayTask implements Serializable{

    private static final long serialVersionUID = -5062977578344039366L;
    
    private long delaySeconds;
    private TaskExecute taskExecute;

    public DelayTask(long delaySeconds, TaskExecute taskExecute) {
        this.delaySeconds = delaySeconds;
        this.taskExecute = taskExecute;
    }

    /**
     *
     */
    public void execute(){
        taskExecute.run();
    }

    public interface TaskExecute extends Runnable, Serializable {

    }
}

调度器:

public interface ScheduleTrigger {

    /**
     * 延迟任务调度
     * @param delayTask
     */
    void schedule(DelayTask delayTask);
}
  1. Timer
public class JavaTrigger implements ScheduleTrigger{

    private Timer timer;

    public JavaTimer(){
        this.timer = new Timer();
    }
    
    /**
     *
     * @param delayTask
     */
    public void schedule(DelayTask delayTask){
        timer.schedule(buildTimerTask(delayTask.getTaskExecute()), toMillis(delayTask.getDelaySeconds()));
    }

    private TimerTask buildTimerTask(Runnable runnable){
        return new TimerTask() {
            @Override
            public void run() {
                runnable.run();
            }
        };
    }

}
  1. DelayQueue
public class DelayQueueTrigger implements ScheduleTrigger{

    private DelayQueue< Task > queue = new DelayQueue<  >();

    public DelayQueueTrigger() {
        Thread thread = new Thread(() - > {
            while (true) {
                try {
                    Task task = queue.take();
                    if(task != null)
                        task.execute();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * @param delayTask
     */
    public void schedule(DelayTask delayTask){
        if( delayTask instanceof Task ){
            queue.put((Task) delayTask);
        }
    }

}

class Task extends DelayTask implements Delayed{

    private long execTime;

    public Task(long delaySeconds, TaskExecute taskExecute) {
        super(delaySeconds, taskExecute);
        this.execTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(delaySeconds);
    }

    /**
     * 轮询执行该方法判断是否满足执行条件(<=0)
     * 同时该返回作为等待时长
     * @param unit the time unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return this.execTime - System.currentTimeMillis(); // ms
    }

    public long getExecTime() {
        return execTime;
    }

    @Override
    public int compareTo(Delayed other) {
        if(this.getExecTime() == ((Task)other).getExecTime()){
            return 0;
        }
        return this.getExecTime() > ((Task)other).getExecTime() ? 1: -1;
    }
}
  1. ScheduledThreadPoolExecutor
    ScheduledThreadPoolExecutor实现也是基于延迟队列BlockingQueue实现
public class ScheduledExecutorTrigger implements ScheduleTrigger{

    private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
    
    public void schedule(DelayTask delayTask){
        executorService.schedule(delayTask.getTaskExecute(), delayTask.getDelaySeconds(), TimeUnit.SECONDS);
    }

}
  1. Redis过期监听
    需要修改redis配置文件:notify-keyspace-events Ex
public class RedisTimer{

    private static final String EXPIRATION_KEY = "REDIS_EXPIRATION_KEY";

    @Configuration
    @Import(RedisAutoConfiguration.class)
    public static class Config{

        @Bean(name = "redisTemplate")
        public RedisTemplate< Object, Object > redisTemplate(RedisConnectionFactory factory) {
            RedisTemplate< Object, Object > template = new RedisTemplate<  >();
            RedisSerializer< String > keySerializer = new StringRedisSerializer();
            RedisSerializer< Object > valueSerializer = new ObjectRedisSerializer();
            template.setConnectionFactory(factory);
            template.setKeySerializer(keySerializer);
            template.setValueSerializer(valueSerializer);
            return template;
        }

        /**
         * 消息监听器容器bean
         * @param connectionFactory
         * @return
         */
        @Bean
        public RedisMessageListenerContainer container(LettuceConnectionFactory connectionFactory) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            return container;
        }

        @Bean
        public RedisKeyExpirationListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer){
            RedisKeyExpirationListener redisKeyExpirationListener = new RedisKeyExpirationListener(redisMessageListenerContainer);
            redisKeyExpirationListener.setContext(context());
            return redisKeyExpirationListener;
        }

        @Bean
        public Context context(){
            return new Context();
        }

        @Bean
        public RedisTrigger redisTrigger(RedisTemplate redisTemplate){
            return new RedisTrigger(redisTemplate, context());
        }


        class ObjectRedisSerializer implements RedisSerializer{

            @Override
            public byte[] serialize(Object o) throws SerializationException {
                return SerializeUtils.serialize(o);
            }

            @Override
            public Object deserialize(byte[] bytes) throws SerializationException {
                return SerializeUtils.deserialize(bytes);
            }
        }
    }

    public static class RedisTrigger implements ScheduleTrigger{

        private RedisTemplate redisTemplate;
        private Context context;

        public RedisTrigger(RedisTemplate redisTemplate, Context context){
            this.redisTemplate = redisTemplate;
            this.context = context;
        }
        
        public void schedule(DelayTask delayTask){
            context.put(EXPIRATION_KEY, delayTask);
            redisTemplate.opsForValue().set(EXPIRATION_KEY, delayTask, delayTask.getDelaySeconds(), TimeUnit.SECONDS);
        }
    }

    @Slf4j
    public static class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

        private Context context;

        public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
            super(listenerContainer);
        }

        /**
         * 这里没法拿到过期值
         * @param message never {@literal null}.
         */
        @SneakyThrows
        @Override
        public void doHandleMessage(Message message) {
            try {
                String topic = new String(message.getChannel(), "utf-8");
                String key = new String(message.getBody(), "utf-8");
                if (EXPIRATION_KEY.equals(key)) {
                    Object object = context.get(EXPIRATION_KEY);
                    if( object instanceof DelayTask ){
                        log.info("redis key[{}] 过期回调", key);
                        ((DelayTask) object).execute();
                    }
                }
            } catch (Exception e) {
                log.error("处理Redis延迟任务异常:{}", e.getMessage() ,e);
            }
        }

        public void setContext(Context context) {
            this.context = context;
        }
    }

    public static class Context{
        private Map< String,Object > context = new ConcurrentHashMap<  >();

        public void put(String key, Object value){
            context.put(key, value);
        }

        public Object get(String key){
            return context.get(key);
        }
    }
}
  1. MQ延迟队列
    这里MQ选择的是RabbitMq,要知道在RabbitMq中是没有延迟队列的,但可以通过延迟消息插件rabbitmq_delayed_message_exchange实现,另外一种是基于死信来实现。

什么时候消息进入死信?

  • 1)消息消费方调用了basicNack() 或 basicReject(),并且参数都是 requeue = false,则消息会路由进死信队列
  • 2)消息消费过期,过了TTL(消息、或队列设置超时时间) 存活时间,就是消费方在 TTL 时间之内没有消费,则消息会路由进死信队列
  • 3)队列设置了x-max-length 最大消息数量且当前队列中的消息已经达到了这个数量,再次投递,消息将被挤掉,被挤掉的消息会路由进死信队列
public class RabbitTimer{

    @Configuration
    @Import(RabbitAutoConfiguration.class)
    public static class Config{

        static final String TTL_EXCHANGE_FOR_SCHEDULE = "TTL_EXCHANGE_FOR_SCHEDULE";
        static final String TTL_QUEUE_FOR_SCHEDULE = "TTL_QUEUE_FOR_SCHEDULE";
        static final String TTL_ROUTING_KEY_FOR_SCHEDULE = "TTL_ROUTING_KEY_FOR_SCHEDULE";
        static final String COMMON_QUEUE_FOR_SCHEDULE = "COMMON_QUEUE_FOR_SCHEDULE";

        @Bean
        public Queue ttlQueue(){
            return QueueBuilder.durable(TTL_QUEUE_FOR_SCHEDULE).build();
        }

        @Bean
        public Exchange ttlExchange(){
            return ExchangeBuilder.directExchange(TTL_EXCHANGE_FOR_SCHEDULE).build();
        }

        @Bean
        public Binding ttlBinding(){
            return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with(TTL_ROUTING_KEY_FOR_SCHEDULE).noargs();
        }

        @Bean
        public Queue commonQueue(){
            return QueueBuilder.durable(COMMON_QUEUE_FOR_SCHEDULE)
                    .deadLetterExchange(TTL_EXCHANGE_FOR_SCHEDULE)
                    .deadLetterRoutingKey(TTL_ROUTING_KEY_FOR_SCHEDULE)
                    .build();
        }

        @Bean
        public TtlMessageConsumer ttlMessageConsumer(){
            return new TtlMessageConsumer();
        }
        
        @Bean
        public RabbitTrigger rabbitTrigger(RabbitTemplate rabbitTemplate){
            return new RabbitTrigger(rabbitTemplate);
        }
    }

    @Slf4j
    @RabbitListener(queues=TTL_QUEUE_FOR_SCHEDULE)
    public static class TtlMessageConsumer{

        @RabbitHandler
        public void handle(byte [] message){
            Object deserialize = SerializeUtils.deserialize(message);
            if( deserialize instanceof DelayTask ){
                ((DelayTask) deserialize).execute();
            }
        }

    }
    
    public static class RabbitTrigger implements ScheduleTrigger{

        @Autowired
        private RabbitTemplate rabbitTemplate;

        public RabbitTrigger(RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
        }
        
        public void schedule(DelayTask delayTask){
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setExpiration( String.valueOf(TimeUnit.SECONDS.toMillis(delayTask.getDelaySeconds())));
            Message message = new Message(SerializeUtils.serialize(delayTask), messageProperties);
            rabbitTemplate.send(COMMON_QUEUE_FOR_SCHEDULE, message);
        }

    }

}
  1. HashedWheelTimer
public class NettyTrigger implements ScheduleTrigger {

    HashedWheelTimer timer = new HashedWheelTimer(200,
            TimeUnit.MILLISECONDS,
            100); // 时间轮中的槽数

    /**
     *
     */
    @Override
    public void schedule(DelayTask delayTask){
        TimerTask task = timeout - > delayTask.execute();
        //
        timer.newTimeout(task, delayTask.getDelaySeconds(), TimeUnit.SECONDS);
    }

}

测试:

ScheduleTrigger.schedule(DelayTask delayTask);

结束语

通过几个简单的示例了解延迟队列的实现方式,可以根据实际业务场景以及应用架构做出合理的选择。

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 数据
    +关注

    关注

    8

    文章

    6867

    浏览量

    88800
  • 服务器
    +关注

    关注

    12

    文章

    9010

    浏览量

    85163
  • 内存
    +关注

    关注

    8

    文章

    2996

    浏览量

    73870
  • 定时器
    +关注

    关注

    23

    文章

    3237

    浏览量

    114435
  • 延迟
    +关注

    关注

    1

    文章

    70

    浏览量

    13511
收藏 人收藏

    评论

    相关推荐

    利用CAS技术实现无锁队列

    【 导读 】:本文 主要讲解利用CAS技术实现无锁队列。 关于无锁队列实现,网上有很多文章,虽然本文可能和那些文章有所重复,但是我还是想以我自己的
    的头像 发表于 01-11 10:52 2251次阅读
    利用CAS技术<b class='flag-5'>实现</b>无锁<b class='flag-5'>队列</b>

    深度解析数据结构与算法篇之队列及环形队列实现

    的位置。 02 — 环形队列实现 要想将元素放入队列我们必须知道对头和队尾,在队列长度不能无限大的条件下我们还要知道队列的最大容量,我们还
    的头像 发表于 06-18 10:07 1893次阅读

    TencentOS-tiny中环形队列实现

    ; 队尾指针(可变):永远指向此队列的最后一个数据元素; 队列中的数据存储方式有两种: ① 基于静态连续内存(数组)存储,如图:② 基于动态内存(链表节点)存储,如图: ❝ 后续都使用基于静态内存存储的
    的头像 发表于 10-08 16:30 1359次阅读

    QueueForMcu 基于单片机实现队列功能模块

    QueueForMcu基于单片机实现队列功能模块,主要用于8位、16位、32位非运行RTOS的单片机应用,兼容大多数单片机平台。一、特性动态创建队列对象动态设置队列数据缓冲区静态指定
    发表于 12-31 19:35 1次下载
    QueueForMcu 基于单片机<b class='flag-5'>实现</b>的<b class='flag-5'>队列</b>功能模块

    RTOS消息队列的多种用途

      消息队列可以以多种不同的方式使用。事实上,您可以编写可能只使用消息队列的相当复杂的应用程序。仅使用消息队列可以减少代码的大小(即占用空间),因为可以模拟许多其他服务(信号量、时间
    的头像 发表于 06-29 14:57 2501次阅读
    RTOS消息<b class='flag-5'>队列</b>的多种用途

    实现一个双端队列的步骤简析

    队列是非常基础且重要的数据结构,双端队列属于队列的升级。很多的算法都是基于队列实现,例如搜索中的bfs,图论中的spfa,计算几何中的me
    的头像 发表于 10-27 18:11 1403次阅读

    什么是消息队列?消息队列中间件重要吗?

    应用解耦:消息队列减少了服务之间的耦合性,不同的服务可以通过消息队列进行通信,而不用关心彼此的实现细节。
    的头像 发表于 11-07 14:55 1371次阅读

    如何用Redis实现延迟队列呢?

    前段时间有个小项目需要使用延迟任务,谈到延迟任务,我脑子第一时间一闪而过的就是使用消息队列来做,比如RabbitMQ的死信队列又或者RocketMQ的
    的头像 发表于 03-16 14:28 644次阅读

    一种异步延迟队列实现方式调研

    目前系统中有很多需要用到延时处理的功能:支付超时取消、排队超时、短信、微信等提醒延迟发送、token刷新、会员卡过期等等。
    的头像 发表于 03-31 10:10 590次阅读

    嵌入式环形队列和消息队列实现

    嵌入式环形队列和消息队列实现数据缓存和通信的常见数据结构,广泛应用于嵌入式系统中的通信协议和领域。
    的头像 发表于 04-14 11:52 1513次阅读

    嵌入式环形队列和消息队列是如何去实现的?

    嵌入式环形队列和消息队列实现数据缓存和通信的常见数据结构,广泛应用于嵌入式系统中的通信协议和领域。
    发表于 05-20 14:55 1093次阅读

    单片机消息队列实现原理和机制

    单片机开发过程中通常会用到“消息队列”,一般实现的方法有多种。 本文给大家分享一下队列实现的原理和机制。
    的头像 发表于 05-26 09:50 1470次阅读
    单片机消息<b class='flag-5'>队列</b>的<b class='flag-5'>实现</b>原理和机制

    RTOS消息队列的应用

    基于RTOS的应用中,通常使用队列机制实现任务间的数据交互,一个应用程序可以有任意数量的消息队列,每个消息队列都有自己的用途。
    发表于 05-29 10:49 613次阅读
    RTOS消息<b class='flag-5'>队列</b>的应用

    Disruptor高性能队列的原理

    许多应用程序依靠队列在处理阶段之间交换数据。我们的性能测试表明,当以这种方式使用队列时,其延迟成本与磁盘(基于RAID或SSD的磁盘系统)的IO操作成本处于同一数量级都很慢。如果在一个
    的头像 发表于 07-26 10:47 652次阅读
    Disruptor高性能<b class='flag-5'>队列</b>的原理

    嵌入式环形队列与消息队列实现原理

    嵌入式环形队列,也称为环形缓冲区或循环队列,是一种先进先出(FIFO)的数据结构,用于在固定大小的存储区域中高效地存储和访问数据。其主要特点包括固定大小的数组和两个指针(头指针和尾指针),分别指向队列的起始位置和结束位置。
    的头像 发表于 09-02 15:29 323次阅读