0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

高并发场景下请求合并

jf_ro2CN3Fa 来源:CSDN 2023-10-09 16:05 次阅读

前言

请求合并到底有什么意义呢?我们来看下图。

4c4d5f44-666c-11ee-939d-92fbcf53809c.png

假设我们3个用户(用户id分别是1、2、3),现在他们都要查询自己的基本信息,请求到服务器,服务器端请求数据库,发出3次请求。我们都知道数据库连接资源是相当宝贵的,那么我们怎么尽可能节省连接资源呢?

这里把数据库换成被调用的远程服务,也是同样的道理。

我们改变下思路,如下图所示。

4c5f409c-666c-11ee-939d-92fbcf53809c.png

我们在服务器端把请求合并,只发出一条SQL查询数据库,数据库返回后,服务器端处理返回数据,根据一个唯一请求ID,把数据分组,返回给对应用户。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

技术手段

  • LinkedBlockQueue 阻塞队列
  • ScheduledThreadPoolExecutor 定时任务线程池
  • CompleteableFuture future 阻塞机制(Java 8 的 CompletableFuture 并没有 timeout 机制,后面优化,使用了队列替代)

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/yudao-cloud
  • 视频教程:https://doc.iocoder.cn/video/

代码实现

查询用户的代码

publicinterfaceUserService{

MapqueryUserByIdBatch(ListuserReqs);
}
@Service
publicclassUserServiceImplimplementsUserService{

@Resource
privateUsersMapperusersMapper;

@Override
publicMapqueryUserByIdBatch(ListuserReqs){
//全部参数
ListuserIds=userReqs.stream().map(UserWrapBatchService.Request::getUserId).collect(Collectors.toList());
QueryWrapperqueryWrapper=newQueryWrapper<>();
//用in语句合并成一条SQL,避免多次请求数据库的IO
queryWrapper.in("id",userIds);
Listusers=usersMapper.selectList(queryWrapper);
Map>userGroup=users.stream().collect(Collectors.groupingBy(Users::getId));
HashMapresult=newHashMap<>();
userReqs.forEach(val->{
ListusersList=userGroup.get(val.getUserId());
if(!CollectionUtils.isEmpty(usersList)){
result.put(val.getRequestId(),usersList.get(0));
}else{
//表示没数据
result.put(val.getRequestId(),null);
}
});
returnresult;
}
}

合并请求的实现

packagecom.springboot.sample.service.impl;

importcom.springboot.sample.bean.Users;
importcom.springboot.sample.service.UserService;
importorg.springframework.stereotype.Service;

importjavax.annotation.PostConstruct;
importjavax.annotation.Resource;
importjava.util.*;
importjava.util.concurrent.*;

/***
*zzq
*包装成批量执行的地方
**/
@Service
publicclassUserWrapBatchService{
@Resource
privateUserServiceuserService;

/**
*最大任务数
**/
publicstaticintMAX_TASK_NUM=100;


/**
*请求类,code为查询的共同特征,例如查询商品,通过不同id的来区分
*CompletableFuture将处理结果返回
*/
publicclassRequest{
//请求id唯一
StringrequestId;
//参数
LonguserId;
//TODOJava8的CompletableFuture并没有timeout机制
CompletableFuturecompletableFuture;

publicStringgetRequestId(){
returnrequestId;
}

publicvoidsetRequestId(StringrequestId){
this.requestId=requestId;
}

publicLonggetUserId(){
returnuserId;
}

publicvoidsetUserId(LonguserId){
this.userId=userId;
}

publicCompletableFuturegetCompletableFuture(){
returncompletableFuture;
}

publicvoidsetCompletableFuture(CompletableFuturecompletableFuture){
this.completableFuture=completableFuture;
}
}

/*
LinkedBlockingQueue是一个阻塞的队列,内部采用链表的结果,通过两个ReenTrantLock来保证线程安全
LinkedBlockingQueue与ArrayBlockingQueue的区别
ArrayBlockingQueue默认指定了长度,而LinkedBlockingQueue的默认长度是Integer.MAX_VALUE,也就是无界队列,在移除的速度小于添加的速度时,容易造成OOM。
ArrayBlockingQueue的存储容器是数组,而LinkedBlockingQueue是存储容器是链表
两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,
而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,
也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
*/
privatefinalQueuequeue=newLinkedBlockingQueue();

@PostConstruct
publicvoidinit(){
//定时任务线程池,创建一个支持定时、周期性或延时任务的限定线程数目(这里传入的是1)的线程池
ScheduledExecutorServicescheduledExecutorService=Executors.newScheduledThreadPool(1);

scheduledExecutorService.scheduleAtFixedRate(()->{
intsize=queue.size();
//如果队列没数据,表示这段时间没有请求,直接返回
if(size==0){
return;
}
Listlist=newArrayList<>();
System.out.println("合并了["+size+"]个请求");
//将队列的请求消费到一个集合保存
for(inti=0;i< size; i++) {
                //后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行
if(i< MAX_TASK_NUM) {
                    list.add(queue.poll());
                }
            }
            //拿到我们需要去数据库查询的特征,保存为集合
ListuserReqs=newArrayList<>();
for(Requestrequest:list){
userReqs.add(request);
}
//将参数传入service处理,这里是本地服务,也可以把userService看成RPC之类的远程调用
Mapresponse=userService.queryUserByIdBatch(userReqs);
//将处理结果返回各自的请求
for(Requestrequest:list){
Usersresult=response.get(request.requestId);
request.completableFuture.complete(result);//completableFuture.complete方法完成赋值,这一步执行完毕,下面future.get()阻塞的请求可以继续执行了
}
},100,10,TimeUnit.MILLISECONDS);
//scheduleAtFixedRate是周期性执行schedule是延迟执行initialDelay是初始延迟period是周期间隔后面是单位
//这里我写的是初始化后100毫秒后执行,周期性执行10毫秒执行一次
}

publicUsersqueryUser(LonguserId){
Requestrequest=newRequest();
//这里用UUID做请求id
request.requestId=UUID.randomUUID().toString().replace("-","");
request.userId=userId;
CompletableFuturefuture=newCompletableFuture<>();
request.completableFuture=future;
//将对象传入队列
queue.offer(request);
//如果这时候没完成赋值,那么就会阻塞,直到能够拿到值
try{
returnfuture.get();
}catch(InterruptedExceptione){
e.printStackTrace();
}catch(ExecutionExceptione){
e.printStackTrace();
}
returnnull;
}
}

控制层调用

/***
*请求合并
**/
@RequestMapping("/merge")
publicCallablemerge(LonguserId){
returnnewCallable(){
@Override
publicUserscall()throwsException{
returnuserBatchService.queryUser(userId);
}
};
}

Callable是什么可以参考:

  • https://blog.csdn.net/baidu_19473529/article/details/123596792

模拟高并发查询的代码

packagecom.springboot.sample;

importorg.springframework.web.client.RestTemplate;

importjava.util.Random;
importjava.util.concurrent.CountDownLatch;

publicclassTestBatch{
privatestaticintthreadCount=30;

privatefinalstaticCountDownLatchCOUNT_DOWN_LATCH=newCountDownLatch(threadCount);//为保证30个线程同时并发运行

privatestaticfinalRestTemplaterestTemplate=newRestTemplate();

publicstaticvoidmain(String[]args){


for(inti=0;i< threadCount; i++) {//循环开30个线程
newThread(newRunnable(){
publicvoidrun(){
COUNT_DOWN_LATCH.countDown();//每次减一
try{
COUNT_DOWN_LATCH.await();//此处等待状态,为了让30个线程同时进行
}catch(InterruptedExceptione){
e.printStackTrace();
}

for(intj=1;j<= 3;j++){
intparam=newRandom().nextInt(4);
if(param<=0){
param++;
}
StringresponseBody=restTemplate.getForObject("http://localhost:8080/asyncAndMerge/merge?userId="+param,String.class);
System.out.println(Thread.currentThread().getName()+"参数"+param+"返回值"+responseBody);
}
}
}).start();

}
}
}

测试效果

4c6bfaa8-666c-11ee-939d-92fbcf53809c.png4c7c9dfe-666c-11ee-939d-92fbcf53809c.png

要注意的问题

  • Java 8 的 CompletableFuture 并没有 timeout 机制
  • 后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行(本例中加了MAX_TASK_NUM判断)

使用队列的超时解决Java 8 的 CompletableFuture 并没有 timeout 机制

核心代码

packagecom.springboot.sample.service.impl;

importcom.springboot.sample.bean.Users;
importcom.springboot.sample.service.UserService;
importorg.springframework.stereotype.Service;

importjavax.annotation.PostConstruct;
importjavax.annotation.Resource;
importjava.util.*;
importjava.util.concurrent.*;

/***
*zzq
*包装成批量执行的地方,使用queue解决超时问题
**/
@Service
publicclassUserWrapBatchQueueService{
@Resource
privateUserServiceuserService;

/**
*最大任务数
**/
publicstaticintMAX_TASK_NUM=100;


/**
*请求类,code为查询的共同特征,例如查询商品,通过不同id的来区分
*CompletableFuture将处理结果返回
*/
publicclassRequest{
//请求id
StringrequestId;

//参数
LonguserId;
//队列,这个有超时机制
LinkedBlockingQueueusersQueue;


publicStringgetRequestId(){
returnrequestId;
}

publicvoidsetRequestId(StringrequestId){
this.requestId=requestId;
}

publicLonggetUserId(){
returnuserId;
}

publicvoidsetUserId(LonguserId){
this.userId=userId;
}

publicLinkedBlockingQueuegetUsersQueue(){
returnusersQueue;
}

publicvoidsetUsersQueue(LinkedBlockingQueueusersQueue){
this.usersQueue=usersQueue;
}
}

/*
LinkedBlockingQueue是一个阻塞的队列,内部采用链表的结果,通过两个ReenTrantLock来保证线程安全
LinkedBlockingQueue与ArrayBlockingQueue的区别
ArrayBlockingQueue默认指定了长度,而LinkedBlockingQueue的默认长度是Integer.MAX_VALUE,也就是无界队列,在移除的速度小于添加的速度时,容易造成OOM。
ArrayBlockingQueue的存储容器是数组,而LinkedBlockingQueue是存储容器是链表
两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,
而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,
也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
*/
privatefinalQueuequeue=newLinkedBlockingQueue();

@PostConstruct
publicvoidinit(){
//定时任务线程池,创建一个支持定时、周期性或延时任务的限定线程数目(这里传入的是1)的线程池
ScheduledExecutorServicescheduledExecutorService=Executors.newScheduledThreadPool(1);

scheduledExecutorService.scheduleAtFixedRate(()->{
intsize=queue.size();
//如果队列没数据,表示这段时间没有请求,直接返回
if(size==0){
return;
}
Listlist=newArrayList<>();
System.out.println("合并了["+size+"]个请求");
//将队列的请求消费到一个集合保存
for(inti=0;i< size; i++) {
                //后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行
if(i< MAX_TASK_NUM) {
                    list.add(queue.poll());
                }
            }
            //拿到我们需要去数据库查询的特征,保存为集合
ListuserReqs=newArrayList<>();
for(Requestrequest:list){
userReqs.add(request);
}
//将参数传入service处理,这里是本地服务,也可以把userService看成RPC之类的远程调用
Mapresponse=userService.queryUserByIdBatchQueue(userReqs);
for(RequestuserReq:userReqs){
//这里再把结果放到队列里
Usersusers=response.get(userReq.getRequestId());
userReq.usersQueue.offer(users);
}

},100,10,TimeUnit.MILLISECONDS);
//scheduleAtFixedRate是周期性执行schedule是延迟执行initialDelay是初始延迟period是周期间隔后面是单位
//这里我写的是初始化后100毫秒后执行,周期性执行10毫秒执行一次
}

publicUsersqueryUser(LonguserId){
Requestrequest=newRequest();
//这里用UUID做请求id
request.requestId=UUID.randomUUID().toString().replace("-","");
request.userId=userId;
LinkedBlockingQueueusersQueue=newLinkedBlockingQueue<>();
request.usersQueue=usersQueue;
//将对象传入队列
queue.offer(request);
//取出元素时,如果队列为空,给定阻塞多少毫秒再队列取值,这里是3秒
try{
returnusersQueue.poll(3000,TimeUnit.MILLISECONDS);
}catch(InterruptedExceptione){
e.printStackTrace();
}
returnnull;
}
}
...省略..

@Override
publicMapqueryUserByIdBatchQueue(ListuserReqs){
//全部参数
ListuserIds=userReqs.stream().map(UserWrapBatchQueueService.Request::getUserId).collect(Collectors.toList());
QueryWrapperqueryWrapper=newQueryWrapper<>();
//用in语句合并成一条SQL,避免多次请求数据库的IO
queryWrapper.in("id",userIds);
Listusers=usersMapper.selectList(queryWrapper);
Map>userGroup=users.stream().collect(Collectors.groupingBy(Users::getId));
HashMapresult=newHashMap<>();
//数据分组
userReqs.forEach(val->{
ListusersList=userGroup.get(val.getUserId());
if(!CollectionUtils.isEmpty(usersList)){
result.put(val.getRequestId(),usersList.get(0));
}else{
//表示没数据,这里要new,不然加入队列会空指针
result.put(val.getRequestId(),newUsers());
}
});
returnresult;
}

...省略...

小结

请求合并,批量的办法能大幅节省被调用系统的连接资源,本例是以数据库为例,其他RPC调用也是类似的道理。缺点就是请求的时间在执行实际的逻辑之前增加了等待时间,不适合低并发的场景。

代码地址

  • https://gitee.com/apple_1030907690/spring-boot-kubernetes/tree/v1.0.5

参考

  • https://www.cnblogs.com/oyjg/p/13099998.html


声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 服务器
    +关注

    关注

    12

    文章

    9295

    浏览量

    85954
  • JAVA
    +关注

    关注

    19

    文章

    2974

    浏览量

    105048
  • 数据库
    +关注

    关注

    7

    文章

    3845

    浏览量

    64630

原文标题:高并发场景下请求合并

文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    从服务端视角看并发难题

    `所谓服务器大流量并发指的是:在同时或极短时间内,有大量的请求到达服务端,每个请求都需要服务端耗费资源进行处理,并做出相应的反馈。 从服务端视角看
    发表于 11-02 15:11

    如何去实现一种基于SpringMVC的电商并发秒杀系统设计

    参考博客Java并发秒杀系统API目录业务场景要解决的问题Redis的使用业务场景首页倒计时秒杀活动,抢购商品要解决的问题
    发表于 01-03 07:50

    P2P流媒体系统中并发请求的数据分发算法

    大规模并发请求是流媒体直播系统面临的一个挑战,也是视频点播系统中亟待解决的一个问题。本文针对相同数据的并发请求问题,提出了一种高效,低带宽消耗、低延迟的数据
    发表于 12-30 14:21 14次下载

    缓存一致性问题及缓存并发问题

    并发场景,如果某一个key被并发访问,没有被命中,出于对容错性考虑,会尝试去从后端数据库
    的头像 发表于 08-09 15:52 5289次阅读
    缓存一致性问题及缓存<b class='flag-5'>并发</b>问题

    大型网站如何解决并发带来的问题

    在不使用消息队列服务器的时候,用户的请求数据直接写入数据库,在并发的情况数据库压力剧增,使得响应速度变慢。
    发表于 06-28 17:07 2470次阅读
    大型网站如何解决<b class='flag-5'>高</b><b class='flag-5'>并发</b>带来的问题

    解密并发业务场景典型的秒杀系统的架构

    中,就更别提如何构建并发系统了! 究竟什么样的系统算是并发系统?今天,我们就一起解密并发
    的头像 发表于 11-17 10:32 2286次阅读
    解密<b class='flag-5'>高</b><b class='flag-5'>并发</b>业务<b class='flag-5'>场景</b><b class='flag-5'>下</b>典型的秒杀系统的架构

    效率加倍,并发场景的接口请求合并方案

    假设我们3个用户(用户id分别是1、2、3),现在他们都要查询自己的基本信息,请求到服务器,服务器端请求数据库,发出3次请求。我们都知道数据库连接资源是相当宝贵的,那么我们怎么尽可能节省连接资源呢?
    的头像 发表于 01-13 10:09 900次阅读

    通过秒杀商品来模拟并发场景

    并发场景在现场的日常工作中很常见,特别是在互联网公司中,这篇文章就来通过秒杀商品来模拟并发场景
    的头像 发表于 02-07 10:47 459次阅读

    为什么Nginx可以支持并发

    先说答案,Nginx之所以支持并发,是因为它是基于epoll的异步及非阻塞的事件驱动模型。 在这个模型,Nginx服务端可以同一时间接收成千上万个客户端请求而不阻塞。这是因为Ngi
    的头像 发表于 02-13 10:48 1645次阅读

    Linux 6.3内核的合并窗口已开启

    Linus 认为,如果你不能解释清楚一个合并请求,那么就不要提交,这是很简单的道理。如果不解释提交合并请求的原因,那就是在生产垃圾。在这种情况,Linus 觉得这种合并请求根本就不应
    的头像 发表于 02-27 10:04 615次阅读

    服务器的并发能力如何提升?

    服务器的并发能力如何提升? 服务器并发能力体现着服务器在单位时间内的很强数据处理能力,一般来说,如果企业的互联网业务需要面对大量的同时在线请求
    的头像 发表于 03-17 17:07 1047次阅读

    工业物联网平台如何应对并发应用场景

    面对的巨大挑战。对此,数之能提供并发、官翻机接入的工业物联网平台,可以适应并发场景应用需求。
    的头像 发表于 09-06 14:21 661次阅读

    redis并发能力直接相关概念有哪些

    Redis是一种高性能的开源内存数据库,具有出色的并发能力。为了实现并发,需要有一些相关概念和技术。下面是关于Redis并发能力的详细解
    的头像 发表于 12-05 10:34 1252次阅读

    什么场景需要jvm调优

    JVM调优是指对Java虚拟机进行性能优化和资源管理,以提高应用程序的运行效率和吞吐量。JVM调优的场景有很多,下面将详细介绍各种不同的场景并发
    的头像 发表于 12-05 11:14 1501次阅读

    并发物联网云平台是什么

    并发物联网云平台是一种能够处理大量设备同时连接并进行数据交换的云计算平台。这种平台通常被设计用来应对来自数以万计甚至数十亿计的物联网设备的并发请求,保证系统的稳定性和响应速度。 首先
    的头像 发表于 08-13 13:50 293次阅读