0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

Redis工具集的实现和使用

jf_ro2CN3Fa 来源:稀土掘金技术社区 2023-12-03 17:32 次阅读

1 前言

Redis 基本上是互联网公司必备的工具了,Redis的应用场景实在太多了,但是有很多相似的功能如果每个项目都要实现一遍就显得太麻烦了,所以为了方便,我打算开发一个基于 Redis 的工具集,尽量做到开箱即用。

2 目前实现功能

这个工具集并没有开发完成,实现了部分功能,如下图

8577182e-91bd-11ee-939d-92fbcf53809c.png

简单介绍下已经实现的模块:

common : 整个项目公共模块,比如AOP工具等;

delay: Redis实现的延迟队列;

lock: Redis实现的分布式锁;

mq: Redis实现消息队列;

query: Redis实现分页模糊查询;

web: Redis实现web相关的功能;

duplicate :防止重复提交;

以上的这些模块都是已经实现的了,还有 社交、限流、幂等相关功能后面会陆续实现。

3 如何使用

1.引入 Maven 依赖

目前可以下载代码上传到自己的私服或者本地仓库,后面会推到 Maven 中央仓库


cn.org.wangchangjiu
redis-util-spring-boot-starter
1.0.0-SNAPSHOT

2.配置文件(application.yaml)开启各模块功能开关

redis:
util:
mq:
enable:true
delay:
enable:true

3.实现消息发送者

MQ消息发送:

858caa4a-91bd-11ee-939d-92fbcf53809c.png

延迟消息发送:

85a23932-91bd-11ee-939d-92fbcf53809c.png

4.实现消息监听器

MQ消息监听器:

85c4f9fe-91bd-11ee-939d-92fbcf53809c.png

延迟消息监听器:

85d4b9d4-91bd-11ee-939d-92fbcf53809c.png

4 MQ和delay实现细节

MQ实现细节

容器启动时,简单来说就是通过springboot自动装配,创建一些Bean,如下图:

85f415c2-91bd-11ee-939d-92fbcf53809c.png

值得注意的是,springboot3.X 自动装配方式有点变化,需要创建文件 META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports 文件,文件内容就直接写 自动配置类

8611a2cc-91bd-11ee-939d-92fbcf53809c.png

RedisUtilAutoConfiguration 主自动装配类会 import 各个模块的自动装配类:

8621012c-91bd-11ee-939d-92fbcf53809c.png

我们以 RedisStreamAutoConfiguration 为例:

864536d2-91bd-11ee-939d-92fbcf53809c.png

该装配类生效需要显示打开,然后就是创建各种Bean。

最主要的Bean有:

RedisMessageConsumerManager:

865c99da-91bd-11ee-939d-92fbcf53809c.png

该Bean实现了 BeanPostProcessor 接口,主要作用是,获取被注解 RedisMessageListener 修饰的方法,把信息封装在 RedisMessageConsumerContainer 对象里,方便后面反射调用。

8668f98c-91bd-11ee-939d-92fbcf53809c.png

StreamMessageListenerContainer:

这个Bean主要是做 redis MQ 的配置,比如配置:一次最多获取多少条消息、没有消息时阻塞时间、执行任务的executor、错误处理器、以及消费组、是否自动ACK等配置,具体代码如下:

@Bean(initMethod="start",destroyMethod="stop")
@DependsOn("redisMessageConsumerManager")
@ConditionalOnMissingBean
publicStreamMessageListenerContainer>streamMessageListenerContainer(@AutowiredRedisMessageConsumerManagerredisMessageConsumerManager,
@AutowiredRedisConnectionFactoryredisConnectionFactory,
@AutowiredErrorHandlererrorHandler){
MyRedisStreamProperties.Optionsoptions=myRedisStreamProperties.getOptions();
StreamMessageListenerContainer.StreamMessageListenerContainerOptions>containerOptions=
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
//一次最多获取多少条消息
.batchSize(options.getBatchSize())
//运行Stream的polltask
.executor(getStreamMessageListenerExecutor())
//Stream中没有消息时,阻塞多长时间,需要比`spring.redis.timeout`的时间小
.pollTimeout(options.getPollTimeout())
//获取消息的过程或获取到消息给具体的消息者处理的过程中,发生了异常的处理
.errorHandler(errorHandler)
.build();

StreamMessageListenerContainer>streamMessageListenerContainer=
StreamMessageListenerContainer.create(redisConnectionFactory,containerOptions);

//获取被RedisMessageListener注解修饰的bean
MapconsumerContainerGroups=
redisMessageConsumerManager.getConsumerContainerGroups();

//循环遍历,创建消费组
consumerContainerGroups.forEach((groupQueue,redisMessageConsumerContainer)->{
String[]groupQueues=groupQueue.split("#");

//创建消费组
createGroups(groupQueues);

RedisMessageListenerredisMessageListener=redisMessageConsumerContainer.getRedisMessageListener();
if(!redisMessageListener.useGroup()){
//独立消费不使用组
streamMessageListenerContainer.receive(StreamOffset.fromStart(groupQueues[1]),newDefaultGroupStreamListener(redisMessageConsumerContainer));
}else{
//消费组消费
if(redisMessageListener.autoAck()){
//自动ACK
streamMessageListenerContainer.receiveAutoAck(Consumer.from(groupQueues[0],"consumer:"+UUID.randomUUID()),
StreamOffset.create(groupQueues[1],ReadOffset.lastConsumed()),newDefaultGroupStreamListener(redisMessageConsumerContainer));
}else{
//手动ACK
streamMessageListenerContainer.receive(Consumer.from(groupQueues[0],"consumer:"+UUID.randomUUID()),
StreamOffset.create(groupQueues[1],ReadOffset.lastConsumed()),newDefaultGroupStreamListener(redisMessageConsumerContainer));
}
}
});
returnstreamMessageListenerContainer;
}

/**
*创建消费组
*@paramgroupQueues
*/
privatevoidcreateGroups(String[]groupQueues){
//判断是否存在队列Key
if(stringRedisTemplate.hasKey(groupQueues[1])){
//获取消费组没有则创建
StreamInfo.XInfoGroupsgroups=stringRedisTemplate.opsForStream().groups(groupQueues[1]);
if(groups.isEmpty()){
stringRedisTemplate.opsForStream().createGroup(groupQueues[1],groupQueues[0]);
}else{
AtomicBooleanexists=newAtomicBoolean(false);
groups.forEach(xInfoGroup->{
if(xInfoGroup.groupName().equals(groupQueues[0])){
exists.set(true);
}
});
if(!exists.get()){
stringRedisTemplate.opsForStream().createGroup(groupQueues[1],groupQueues[0]);
}
}
}else{
stringRedisTemplate.opsForStream().createGroup(groupQueues[1],groupQueues[0]);
}
}

//todo后面这个线程池也可以交由用户配置
privateExecutorgetStreamMessageListenerExecutor(){
AtomicIntegerindex=newAtomicInteger(1);
intprocessors=Runtime.getRuntime().availableProcessors();
ThreadPoolExecutorexecutor=newThreadPoolExecutor(processors,processors,0,TimeUnit.SECONDS,
newLinkedBlockingDeque<>(),r->{
Threadthread=newThread(r);
thread.setName("async-stream-consumer-"+index.getAndIncrement());
thread.setDaemon(true);
returnthread;
});
returnexecutor;
}

发送消息流程:

8680490c-91bd-11ee-939d-92fbcf53809c.png

redis 延迟队列的实现原理和这个差不多,主要是 redission延迟队列 + 自定义注解 + 反射,代码都差不多。

审核编辑:汤梓红
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 代码
    +关注

    关注

    30

    文章

    4741

    浏览量

    68324
  • 队列
    +关注

    关注

    1

    文章

    46

    浏览量

    10887
  • Redis
    +关注

    关注

    0

    文章

    371

    浏览量

    10843

原文标题:为了方便开发,我打算实现一个Redis 工具集

文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    Redis的LRU实现和应用

    在编程中,计数器是一种基本但强大的工具,用于跟踪和管理数据和资源。本文将深入探讨不同类型的计数器的应用,从Redis的LRU(最近最少使用)缓存淘汰算法的实现,到如何在内存受限的环境中有效地使用计数器,再到普通计数器的巧妙应用。
    的头像 发表于 12-15 09:24 567次阅读

    Redis Stream应用案例

    的基本使用介绍和设计理念可以看我之前的一篇文章(Redis Stream简介)。Redis Stream本质上是在Redis内核上(非Redis Module)
    发表于 06-26 17:15

    centos7 redis的安装

    centos7 redis 使用,查看Redis工具(安装、添加权限验证、添加开机自启)
    发表于 05-14 17:13

    Redis Cluster的基本原理及实现细节

    Redis Cluster的基本原理和架构 Redis Cluster是分布式Redis实现。随着Redis版本的更替,以及各种已知bug
    发表于 09-28 19:09 0次下载
    <b class='flag-5'>Redis</b> Cluster的基本原理及<b class='flag-5'>实现</b>细节

    Java 使用Redis缓存工具的详细解说

    本文是关于Java 使用Redis缓存工具的详细解说。详细步骤请看下文
    的头像 发表于 02-09 14:10 7862次阅读
    Java 使用<b class='flag-5'>Redis</b>缓存<b class='flag-5'>工具</b>的详细解说

    Windows环境下使用Redis缓存工具的图文详细方法

    Windows环境下使用Redis缓存工具的图文详细方法。Redis 是一个高性能的key-value数据库。redis的出现,很大程度补偿了memcached这类key/value存
    的头像 发表于 02-09 14:25 4743次阅读
    Windows环境下使用<b class='flag-5'>Redis</b>缓存<b class='flag-5'>工具</b>的图文详细方法

    redis设计与实现

    redis
    发表于 06-20 14:44 0次下载

    谈谈Redis怎样配置实现主从复制?

    之前总结过redis的持久化机制:深度剖析Redis持久化机制,持久化机制主要解决redis数据单机备份问题;redis的高可用需要考虑数据的多机备份,多机备份通过主从复制来
    发表于 01-31 11:31 639次阅读

    Redis实现限流的三种方式分享

    当然,限流有许多种实现的方式,Redis具有很强大的功能,我用Redis实践了三种的实现方式,可以较为简单的实现其方式。
    的头像 发表于 02-22 09:52 1032次阅读

    Redis官方可视化工具功能强大

    RedisInsight 是一个高颜值,直观高效的 Redis GUI 管理工具,它可以对 Redis 的内存、连接数、命中率以及正常运行时间进行监控
    的头像 发表于 04-23 09:55 891次阅读
    <b class='flag-5'>Redis</b>官方可视化<b class='flag-5'>工具</b>功能强大

    Java redis锁怎么实现

    在Java中实现Redis锁涉及到以下几个方面:Redis的安装配置、Redis连接池的使用、Redis数据结构的选择、
    的头像 发表于 12-04 10:47 1112次阅读

    redis集群性能测试工具有哪些

    Redis是一种高性能的内存键值存储系统,它被广泛应用于各种互联网应用和大规模的数据存储中。为了评估Redis在不同场景下的性能,我们需要使用一些性能测试工具来对Redis集群进行基准
    的头像 发表于 12-04 11:36 753次阅读

    redis hash底层实现原理

    数据结构是如何实现的呢?本文将详细介绍Redis哈希底层的实现原理。 在Redis中,每个哈希都是由一个类似于字典(Dictionary)的结构实现
    的头像 发表于 12-04 16:27 557次阅读

    redis持久化机制和如何实现持久化

    File)。 RDB是Redis默认采用的持久化方式,它通过在指定时间间隔内将内存中的数据快照写入到磁盘的二进制文件中,实现数据的持久化。RDB方式具有高效和紧凑的特点,可以周期性地将数据
    的头像 发表于 12-05 10:02 437次阅读

    redis数据结构的底层实现

    Redis是一种内存键值数据库,常用于缓存、消息队列、实时数据分析等场景。它的高性能得益于其精心设计的数据结构和底层实现。本文将详细介绍Redis常用的数据结构和它们的底层实现
    的头像 发表于 12-05 10:14 589次阅读