延迟任务
最近有一个需求,基于消息队列对数据消费,并根据多次消费的结果对数据进行重新组装,如果在指定时间内,需要的数据全部到达,则进行数据组装以及后续逻辑。简单的说,设置一个超时时间,如果在该时间内由MQ中消费到完整的数据则直接处理,否则进入其他流程。
针对这种场景使用了延迟任务来实现,以此为契机对延迟任务相关的技术做了个简单了解...
简介
延迟任务是一种指定任务在未来某个时间点或一定时间后执行的方式。通常情况下,延迟任务可以通过设置任务的执行时间或延迟时间来实现。
延迟任务可以用于异步操作、定时任务和任务调度等场景。例如,在用户注册后发送欢迎邮件或者在用户下单后发送订单确认短信,可以通过延迟任务来实现异步操作。定时检查服务器状态、定时备份数据等任务,也可以通过延迟任务来实现定时任务。在某个时间点触发某个任务、在某个时间段内重复执行某个任务等,可以通过延迟任务来实现任务调度。
延迟任务通常使用队列或者定时器来实现。在队列中,任务会被添加到一个等待队列中,等待队列中的任务会在指定的时间点或延迟时间后被取出执行。在定时器中,任务会被添加到一个定时器中,定时器会在指定的时间点触发任务执行。
总之,延迟任务是一种非常实用的技术,可以帮助我们更好地管理系统中的异步操作、定时任务和任务调度等场景。
使用场景
异步操作:延迟任务可以用于异步操作,例如在用户注册后发送欢迎邮件或者在用户下单后发送订单确认短信。通过使用延迟任务,可以将这些操作推迟到后台处理,从而提高系统的响应速度和并发能力。
定时任务:延迟任务可以用于定时任务,例如定时检查服务器状态、定时备份数据等。通过使用延迟任务,可以在指定的时间点自动触发任务,避免手动操作的繁琐和容易出错。
任务调度:延迟任务可以用于任务调度,例如在某个时间点触发某个任务、在某个时间段内重复执行某个任务等。通过使用延迟任务,可以方便地进行任务调度,提高系统的可靠性和稳定性。
技术实现
- 基于内存,应用重启(或宕机)会导致任务丢失
- 基于内存存放队列,不支持集群
- 依据compareTo方法排列队列,调用take阻塞式的取出第一个任务(不调用则不取出),比较不灵活,会影响时间的准确性
- ScheduledThreadPoolExecutor
- 基于内存,应用重启(或宕机)会导致任务丢失
- 基于内存存放任务,不支持集群
- 一个任务就要新建一个线程绑定任务的执行,容易造成资源浪费
- Redis过期监听 基于Redis过期订阅
- 客户端断开后重连会导致所有事件丢失
- 高并发场景下,存在大量的失效key场景会导出失效时间存在延迟
- 若有多个监听器监听该key,是会重复消费这个过期事件的,需要特定逻辑判断
- MQ延迟队列 基于消息死信队列实现 支持集群,分布式,高并发场景;缺点:引入额外的消息队列,增加项目的部署和维护的复杂度。
- HashedWheelTimer 基于Netty提供的工具类HashedWheelTimer HashedWheelTimer 是使用定时轮实现的,定时轮其实就是一种环型的数据结构,可以把它想象成一个时钟, 分成了许多格子,每个格子代表一定的时间,在这个格子上用一个链表来保存要执行的超时任务,同时有一个指针一格一格的走,走到那个格子时就执行格子对应的延迟任务,
其中前三种Timer、DelayQueue、ScheduledThreadPoolExecutor实现比较简单,只不过只适用于单体应用,任务数据都在内存中,在系统崩溃后数据丢失;后两张实现相对复杂,并且需要依赖于第三方应用,在系统整体结构上更加复杂且消耗更多资源,但能支持分布式系统,且有较高的容错性。
示例
定义延迟任务对象:
@Getter
public class DelayTask implements Serializable{
private static final long serialVersionUID = -5062977578344039366L;
private long delaySeconds;
private TaskExecute taskExecute;
public DelayTask(long delaySeconds, TaskExecute taskExecute) {
this.delaySeconds = delaySeconds;
this.taskExecute = taskExecute;
}
/**
*
*/
public void execute(){
taskExecute.run();
}
public interface TaskExecute extends Runnable, Serializable {
}
}
调度器:
public interface ScheduleTrigger {
/**
* 延迟任务调度
* @param delayTask
*/
void schedule(DelayTask delayTask);
}
- Timer
public class JavaTrigger implements ScheduleTrigger{
private Timer timer;
public JavaTimer(){
this.timer = new Timer();
}
/**
*
* @param delayTask
*/
public void schedule(DelayTask delayTask){
timer.schedule(buildTimerTask(delayTask.getTaskExecute()), toMillis(delayTask.getDelaySeconds()));
}
private TimerTask buildTimerTask(Runnable runnable){
return new TimerTask() {
@Override
public void run() {
runnable.run();
}
};
}
}
- DelayQueue
public class DelayQueueTrigger implements ScheduleTrigger{
private DelayQueue< Task > queue = new DelayQueue< >();
public DelayQueueTrigger() {
Thread thread = new Thread(() - > {
while (true) {
try {
Task task = queue.take();
if(task != null)
task.execute();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
thread.setDaemon(true);
thread.start();
}
/**
* @param delayTask
*/
public void schedule(DelayTask delayTask){
if( delayTask instanceof Task ){
queue.put((Task) delayTask);
}
}
}
class Task extends DelayTask implements Delayed{
private long execTime;
public Task(long delaySeconds, TaskExecute taskExecute) {
super(delaySeconds, taskExecute);
this.execTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(delaySeconds);
}
/**
* 轮询执行该方法判断是否满足执行条件(<=0)
* 同时该返回作为等待时长
* @param unit the time unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return this.execTime - System.currentTimeMillis(); // ms
}
public long getExecTime() {
return execTime;
}
@Override
public int compareTo(Delayed other) {
if(this.getExecTime() == ((Task)other).getExecTime()){
return 0;
}
return this.getExecTime() > ((Task)other).getExecTime() ? 1: -1;
}
}
- ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor实现也是基于延迟队列BlockingQueue实现
public class ScheduledExecutorTrigger implements ScheduleTrigger{
private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
public void schedule(DelayTask delayTask){
executorService.schedule(delayTask.getTaskExecute(), delayTask.getDelaySeconds(), TimeUnit.SECONDS);
}
}
- Redis过期监听
需要修改redis配置文件:notify-keyspace-events Ex
public class RedisTimer{
private static final String EXPIRATION_KEY = "REDIS_EXPIRATION_KEY";
@Configuration
@Import(RedisAutoConfiguration.class)
public static class Config{
@Bean(name = "redisTemplate")
public RedisTemplate< Object, Object > redisTemplate(RedisConnectionFactory factory) {
RedisTemplate< Object, Object > template = new RedisTemplate< >();
RedisSerializer< String > keySerializer = new StringRedisSerializer();
RedisSerializer< Object > valueSerializer = new ObjectRedisSerializer();
template.setConnectionFactory(factory);
template.setKeySerializer(keySerializer);
template.setValueSerializer(valueSerializer);
return template;
}
/**
* 消息监听器容器bean
* @param connectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer container(LettuceConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
@Bean
public RedisKeyExpirationListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer){
RedisKeyExpirationListener redisKeyExpirationListener = new RedisKeyExpirationListener(redisMessageListenerContainer);
redisKeyExpirationListener.setContext(context());
return redisKeyExpirationListener;
}
@Bean
public Context context(){
return new Context();
}
@Bean
public RedisTrigger redisTrigger(RedisTemplate redisTemplate){
return new RedisTrigger(redisTemplate, context());
}
class ObjectRedisSerializer implements RedisSerializer{
@Override
public byte[] serialize(Object o) throws SerializationException {
return SerializeUtils.serialize(o);
}
@Override
public Object deserialize(byte[] bytes) throws SerializationException {
return SerializeUtils.deserialize(bytes);
}
}
}
public static class RedisTrigger implements ScheduleTrigger{
private RedisTemplate redisTemplate;
private Context context;
public RedisTrigger(RedisTemplate redisTemplate, Context context){
this.redisTemplate = redisTemplate;
this.context = context;
}
public void schedule(DelayTask delayTask){
context.put(EXPIRATION_KEY, delayTask);
redisTemplate.opsForValue().set(EXPIRATION_KEY, delayTask, delayTask.getDelaySeconds(), TimeUnit.SECONDS);
}
}
@Slf4j
public static class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
private Context context;
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/**
* 这里没法拿到过期值
* @param message never {@literal null}.
*/
@SneakyThrows
@Override
public void doHandleMessage(Message message) {
try {
String topic = new String(message.getChannel(), "utf-8");
String key = new String(message.getBody(), "utf-8");
if (EXPIRATION_KEY.equals(key)) {
Object object = context.get(EXPIRATION_KEY);
if( object instanceof DelayTask ){
log.info("redis key[{}] 过期回调", key);
((DelayTask) object).execute();
}
}
} catch (Exception e) {
log.error("处理Redis延迟任务异常:{}", e.getMessage() ,e);
}
}
public void setContext(Context context) {
this.context = context;
}
}
public static class Context{
private Map< String,Object > context = new ConcurrentHashMap< >();
public void put(String key, Object value){
context.put(key, value);
}
public Object get(String key){
return context.get(key);
}
}
}
- MQ延迟队列
这里MQ选择的是RabbitMq,要知道在RabbitMq中是没有延迟队列的,但可以通过延迟消息插件rabbitmq_delayed_message_exchange实现,另外一种是基于死信来实现。
什么时候消息进入死信?
- 1)消息消费方调用了basicNack() 或 basicReject(),并且参数都是 requeue = false,则消息会路由进死信队列
- 2)消息消费过期,过了TTL(消息、或队列设置超时时间) 存活时间,就是消费方在 TTL 时间之内没有消费,则消息会路由进死信队列
- 3)队列设置了x-max-length 最大消息数量且当前队列中的消息已经达到了这个数量,再次投递,消息将被挤掉,被挤掉的消息会路由进死信队列
public class RabbitTimer{
@Configuration
@Import(RabbitAutoConfiguration.class)
public static class Config{
static final String TTL_EXCHANGE_FOR_SCHEDULE = "TTL_EXCHANGE_FOR_SCHEDULE";
static final String TTL_QUEUE_FOR_SCHEDULE = "TTL_QUEUE_FOR_SCHEDULE";
static final String TTL_ROUTING_KEY_FOR_SCHEDULE = "TTL_ROUTING_KEY_FOR_SCHEDULE";
static final String COMMON_QUEUE_FOR_SCHEDULE = "COMMON_QUEUE_FOR_SCHEDULE";
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable(TTL_QUEUE_FOR_SCHEDULE).build();
}
@Bean
public Exchange ttlExchange(){
return ExchangeBuilder.directExchange(TTL_EXCHANGE_FOR_SCHEDULE).build();
}
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with(TTL_ROUTING_KEY_FOR_SCHEDULE).noargs();
}
@Bean
public Queue commonQueue(){
return QueueBuilder.durable(COMMON_QUEUE_FOR_SCHEDULE)
.deadLetterExchange(TTL_EXCHANGE_FOR_SCHEDULE)
.deadLetterRoutingKey(TTL_ROUTING_KEY_FOR_SCHEDULE)
.build();
}
@Bean
public TtlMessageConsumer ttlMessageConsumer(){
return new TtlMessageConsumer();
}
@Bean
public RabbitTrigger rabbitTrigger(RabbitTemplate rabbitTemplate){
return new RabbitTrigger(rabbitTemplate);
}
}
@Slf4j
@RabbitListener(queues=TTL_QUEUE_FOR_SCHEDULE)
public static class TtlMessageConsumer{
@RabbitHandler
public void handle(byte [] message){
Object deserialize = SerializeUtils.deserialize(message);
if( deserialize instanceof DelayTask ){
((DelayTask) deserialize).execute();
}
}
}
public static class RabbitTrigger implements ScheduleTrigger{
@Autowired
private RabbitTemplate rabbitTemplate;
public RabbitTrigger(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void schedule(DelayTask delayTask){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration( String.valueOf(TimeUnit.SECONDS.toMillis(delayTask.getDelaySeconds())));
Message message = new Message(SerializeUtils.serialize(delayTask), messageProperties);
rabbitTemplate.send(COMMON_QUEUE_FOR_SCHEDULE, message);
}
}
}
- HashedWheelTimer
public class NettyTrigger implements ScheduleTrigger {
HashedWheelTimer timer = new HashedWheelTimer(200,
TimeUnit.MILLISECONDS,
100); // 时间轮中的槽数
/**
*
*/
@Override
public void schedule(DelayTask delayTask){
TimerTask task = timeout - > delayTask.execute();
//
timer.newTimeout(task, delayTask.getDelaySeconds(), TimeUnit.SECONDS);
}
}
测试:
ScheduleTrigger.schedule(DelayTask delayTask);
结束语
通过几个简单的示例了解延迟队列的实现方式,可以根据实际业务场景以及应用架构做出合理的选择。
-
数据
+关注
关注
8文章
6867浏览量
88800 -
服务器
+关注
关注
12文章
9010浏览量
85163 -
内存
+关注
关注
8文章
2996浏览量
73870 -
定时器
+关注
关注
23文章
3237浏览量
114435 -
延迟
+关注
关注
1文章
70浏览量
13511
发布评论请先 登录
相关推荐
评论