0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

效率加倍,高并发场景下的接口请求合并方案

jf_ro2CN3Fa 来源:芋道源码 2023-01-13 10:09 次阅读


前言

请求合并到底有什么意义呢?我们来看下图。

281af012-92e3-11ed-bfe3-dac502259ad0.png

假设我们3个用户(用户id分别是1、2、3),现在他们都要查询自己的基本信息,请求到服务器,服务器端请求数据库,发出3次请求。我们都知道数据库连接资源是相当宝贵的,那么我们怎么尽可能节省连接资源呢?

这里把数据库换成被调用的远程服务,也是同样的道理。

我们改变下思路,如下图所示。

28311838-92e3-11ed-bfe3-dac502259ad0.png

我们在服务器端把请求合并,只发出一条SQL查询数据库,数据库返回后,服务器端处理返回数据,根据一个唯一请求ID,把数据分组,返回给对应用户。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

技术手段

  • LinkedBlockQueue 阻塞队列
  • ScheduledThreadPoolExecutor 定时任务线程池
  • CompleteableFuture future 阻塞机制(Java 8 的 CompletableFuture 并没有 timeout 机制,后面优化,使用了队列替代)

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/yudao-cloud
  • 视频教程:https://doc.iocoder.cn/video/

代码实现

查询用户的代码

publicinterfaceUserService{

MapqueryUserByIdBatch(ListuserReqs);
}
@Service
publicclassUserServiceImplimplementsUserService{

@Resource
privateUsersMapperusersMapper;

@Override
publicMapqueryUserByIdBatch(ListuserReqs){
//全部参数
ListuserIds=userReqs.stream().map(UserWrapBatchService.Request::getUserId).collect(Collectors.toList());
QueryWrapperqueryWrapper=newQueryWrapper<>();
//用in语句合并成一条SQL,避免多次请求数据库的IO
queryWrapper.in("id",userIds);
Listusers=usersMapper.selectList(queryWrapper);
Map>userGroup=users.stream().collect(Collectors.groupingBy(Users::getId));
HashMapresult=newHashMap<>();
userReqs.forEach(val->{
ListusersList=userGroup.get(val.getUserId());
if(!CollectionUtils.isEmpty(usersList)){
result.put(val.getRequestId(),usersList.get(0));
}else{
//表示没数据
result.put(val.getRequestId(),null);
}
});
returnresult;
}
}

合并请求的实现

packagecom.springboot.sample.service.impl;

importcom.springboot.sample.bean.Users;
importcom.springboot.sample.service.UserService;
importorg.springframework.stereotype.Service;

importjavax.annotation.PostConstruct;
importjavax.annotation.Resource;
importjava.util.*;
importjava.util.concurrent.*;

/***
*zzq
*包装成批量执行的地方
**/
@Service
publicclassUserWrapBatchService{
@Resource
privateUserServiceuserService;

/**
*最大任务数
**/
publicstaticintMAX_TASK_NUM=100;


/**
*请求类,code为查询的共同特征,例如查询商品,通过不同id的来区分
*CompletableFuture将处理结果返回
*/
publicclassRequest{
//请求id唯一
StringrequestId;
//参数
LonguserId;
//TODOJava8的CompletableFuture并没有timeout机制
CompletableFuturecompletableFuture;

publicStringgetRequestId(){
returnrequestId;
}

publicvoidsetRequestId(StringrequestId){
this.requestId=requestId;
}

publicLonggetUserId(){
returnuserId;
}

publicvoidsetUserId(LonguserId){
this.userId=userId;
}

publicCompletableFuturegetCompletableFuture(){
returncompletableFuture;
}

publicvoidsetCompletableFuture(CompletableFuturecompletableFuture){
this.completableFuture=completableFuture;
}
}

/*
LinkedBlockingQueue是一个阻塞的队列,内部采用链表的结果,通过两个ReenTrantLock来保证线程安全
LinkedBlockingQueue与ArrayBlockingQueue的区别
ArrayBlockingQueue默认指定了长度,而LinkedBlockingQueue的默认长度是Integer.MAX_VALUE,也就是无界队列,在移除的速度小于添加的速度时,容易造成OOM。
ArrayBlockingQueue的存储容器是数组,而LinkedBlockingQueue是存储容器是链表
两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,
而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,
也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
*/
privatefinalQueuequeue=newLinkedBlockingQueue();

@PostConstruct
publicvoidinit(){
//定时任务线程池,创建一个支持定时、周期性或延时任务的限定线程数目(这里传入的是1)的线程池
ScheduledExecutorServicescheduledExecutorService=Executors.newScheduledThreadPool(1);

scheduledExecutorService.scheduleAtFixedRate(()->{
intsize=queue.size();
//如果队列没数据,表示这段时间没有请求,直接返回
if(size==0){
return;
}
Listlist=newArrayList<>();
System.out.println("合并了["+size+"]个请求");
//将队列的请求消费到一个集合保存
for(inti=0;i< size; i++) {
                //后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行
if(i< MAX_TASK_NUM) {
                    list.add(queue.poll());
                }
            }
            //拿到我们需要去数据库查询的特征,保存为集合
ListuserReqs=newArrayList<>();
for(Requestrequest:list){
userReqs.add(request);
}
//将参数传入service处理,这里是本地服务,也可以把userService看成RPC之类的远程调用
Mapresponse=userService.queryUserByIdBatch(userReqs);
//将处理结果返回各自的请求
for(Requestrequest:list){
Usersresult=response.get(request.requestId);
request.completableFuture.complete(result);//completableFuture.complete方法完成赋值,这一步执行完毕,下面future.get()阻塞的请求可以继续执行了
}
},100,10,TimeUnit.MILLISECONDS);
//scheduleAtFixedRate是周期性执行schedule是延迟执行initialDelay是初始延迟period是周期间隔后面是单位
//这里我写的是初始化后100毫秒后执行,周期性执行10毫秒执行一次
}

publicUsersqueryUser(LonguserId){
Requestrequest=newRequest();
//这里用UUID做请求id
request.requestId=UUID.randomUUID().toString().replace("-","");
request.userId=userId;
CompletableFuturefuture=newCompletableFuture<>();
request.completableFuture=future;
//将对象传入队列
queue.offer(request);
//如果这时候没完成赋值,那么就会阻塞,直到能够拿到值
try{
returnfuture.get();
}catch(InterruptedExceptione){
e.printStackTrace();
}catch(ExecutionExceptione){
e.printStackTrace();
}
returnnull;
}
}

控制层调用

/***
*请求合并
**/
@RequestMapping("/merge")
publicCallablemerge(LonguserId){
returnnewCallable(){
@Override
publicUserscall()throwsException{
returnuserBatchService.queryUser(userId);
}
};
}

Callable是什么可以参考:

  • https://blog.csdn.net/baidu_19473529/article/details/123596792

模拟高并发查询的代码

packagecom.springboot.sample;

importorg.springframework.web.client.RestTemplate;

importjava.util.Random;
importjava.util.concurrent.CountDownLatch;

publicclassTestBatch{
privatestaticintthreadCount=30;

privatefinalstaticCountDownLatchCOUNT_DOWN_LATCH=newCountDownLatch(threadCount);//为保证30个线程同时并发运行

privatestaticfinalRestTemplaterestTemplate=newRestTemplate();

publicstaticvoidmain(String[]args){


for(inti=0;i< threadCount; i++) {//循环开30个线程
newThread(newRunnable(){
publicvoidrun(){
COUNT_DOWN_LATCH.countDown();//每次减一
try{
COUNT_DOWN_LATCH.await();//此处等待状态,为了让30个线程同时进行
}catch(InterruptedExceptione){
e.printStackTrace();
}

for(intj=1;j<= 3;j++){
intparam=newRandom().nextInt(4);
if(param<=0){
param++;
}
StringresponseBody=restTemplate.getForObject("http://localhost:8080/asyncAndMerge/merge?userId="+param,String.class);
System.out.println(Thread.currentThread().getName()+"参数"+param+"返回值"+responseBody);
}
}
}).start();

}
}
}

测试效果

283d32f8-92e3-11ed-bfe3-dac502259ad0.png284d3d4c-92e3-11ed-bfe3-dac502259ad0.png

要注意的问题

  • Java 8 的 CompletableFuture 并没有 timeout 机制
  • 后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行(本例中加了MAX_TASK_NUM判断)

使用队列的超时解决Java 8 的 CompletableFuture 并没有 timeout 机制

核心代码

packagecom.springboot.sample.service.impl;

importcom.springboot.sample.bean.Users;
importcom.springboot.sample.service.UserService;
importorg.springframework.stereotype.Service;

importjavax.annotation.PostConstruct;
importjavax.annotation.Resource;
importjava.util.*;
importjava.util.concurrent.*;

/***
*zzq
*包装成批量执行的地方,使用queue解决超时问题
**/
@Service
publicclassUserWrapBatchQueueService{
@Resource
privateUserServiceuserService;

/**
*最大任务数
**/
publicstaticintMAX_TASK_NUM=100;


/**
*请求类,code为查询的共同特征,例如查询商品,通过不同id的来区分
*CompletableFuture将处理结果返回
*/
publicclassRequest{
//请求id
StringrequestId;

//参数
LonguserId;
//队列,这个有超时机制
LinkedBlockingQueueusersQueue;


publicStringgetRequestId(){
returnrequestId;
}

publicvoidsetRequestId(StringrequestId){
this.requestId=requestId;
}

publicLonggetUserId(){
returnuserId;
}

publicvoidsetUserId(LonguserId){
this.userId=userId;
}

publicLinkedBlockingQueuegetUsersQueue(){
returnusersQueue;
}

publicvoidsetUsersQueue(LinkedBlockingQueueusersQueue){
this.usersQueue=usersQueue;
}
}

/*
LinkedBlockingQueue是一个阻塞的队列,内部采用链表的结果,通过两个ReenTrantLock来保证线程安全
LinkedBlockingQueue与ArrayBlockingQueue的区别
ArrayBlockingQueue默认指定了长度,而LinkedBlockingQueue的默认长度是Integer.MAX_VALUE,也就是无界队列,在移除的速度小于添加的速度时,容易造成OOM。
ArrayBlockingQueue的存储容器是数组,而LinkedBlockingQueue是存储容器是链表
两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,
而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,
也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
*/
privatefinalQueuequeue=newLinkedBlockingQueue();

@PostConstruct
publicvoidinit(){
//定时任务线程池,创建一个支持定时、周期性或延时任务的限定线程数目(这里传入的是1)的线程池
ScheduledExecutorServicescheduledExecutorService=Executors.newScheduledThreadPool(1);

scheduledExecutorService.scheduleAtFixedRate(()->{
intsize=queue.size();
//如果队列没数据,表示这段时间没有请求,直接返回
if(size==0){
return;
}
Listlist=newArrayList<>();
System.out.println("合并了["+size+"]个请求");
//将队列的请求消费到一个集合保存
for(inti=0;i< size; i++) {
                //后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行
if(i< MAX_TASK_NUM) {
                    list.add(queue.poll());
                }
            }
            //拿到我们需要去数据库查询的特征,保存为集合
ListuserReqs=newArrayList<>();
for(Requestrequest:list){
userReqs.add(request);
}
//将参数传入service处理,这里是本地服务,也可以把userService看成RPC之类的远程调用
Mapresponse=userService.queryUserByIdBatchQueue(userReqs);
for(RequestuserReq:userReqs){
//这里再把结果放到队列里
Usersusers=response.get(userReq.getRequestId());
userReq.usersQueue.offer(users);
}

},100,10,TimeUnit.MILLISECONDS);
//scheduleAtFixedRate是周期性执行schedule是延迟执行initialDelay是初始延迟period是周期间隔后面是单位
//这里我写的是初始化后100毫秒后执行,周期性执行10毫秒执行一次
}

publicUsersqueryUser(LonguserId){
Requestrequest=newRequest();
//这里用UUID做请求id
request.requestId=UUID.randomUUID().toString().replace("-","");
request.userId=userId;
LinkedBlockingQueueusersQueue=newLinkedBlockingQueue<>();
request.usersQueue=usersQueue;
//将对象传入队列
queue.offer(request);
//取出元素时,如果队列为空,给定阻塞多少毫秒再队列取值,这里是3秒
try{
returnusersQueue.poll(3000,TimeUnit.MILLISECONDS);
}catch(InterruptedExceptione){
e.printStackTrace();
}
returnnull;
}
}
...省略..

@Override
publicMapqueryUserByIdBatchQueue(ListuserReqs){
//全部参数
ListuserIds=userReqs.stream().map(UserWrapBatchQueueService.Request::getUserId).collect(Collectors.toList());
QueryWrapperqueryWrapper=newQueryWrapper<>();
//用in语句合并成一条SQL,避免多次请求数据库的IO
queryWrapper.in("id",userIds);
Listusers=usersMapper.selectList(queryWrapper);
Map>userGroup=users.stream().collect(Collectors.groupingBy(Users::getId));
HashMapresult=newHashMap<>();
//数据分组
userReqs.forEach(val->{
ListusersList=userGroup.get(val.getUserId());
if(!CollectionUtils.isEmpty(usersList)){
result.put(val.getRequestId(),usersList.get(0));
}else{
//表示没数据,这里要new,不然加入队列会空指针
result.put(val.getRequestId(),newUsers());
}
});
returnresult;
}

...省略...

小结

请求合并,批量的办法能大幅节省被调用系统的连接资源,本例是以数据库为例,其他RPC调用也是类似的道理。缺点就是请求的时间在执行实际的逻辑之前增加了等待时间,不适合低并发的场景。

代码地址

  • https://gitee.com/apple_1030907690/spring-boot-kubernetes/tree/v1.0.5

参考

  • https://www.cnblogs.com/oyjg/p/13099998.html


审核编辑 :李倩


声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 接口
    +关注

    关注

    33

    文章

    8284

    浏览量

    150080
  • 服务器
    +关注

    关注

    12

    文章

    8744

    浏览量

    84644
  • 数据库
    +关注

    关注

    7

    文章

    3717

    浏览量

    64057

原文标题:效率加倍,高并发场景下的接口请求合并方案

文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    并发物联网云平台是什么

    并发物联网云平台是一种能够处理大量设备同时连接并进行数据交换的云计算平台。这种平台通常被设计用来应对来自数以万计甚至数十亿计的物联网设备的并发请求,保证系统的稳定性和响应速度。 首先
    的头像 发表于 08-13 13:50 148次阅读

    并发系统的艺术:如何在流量洪峰中游刃有余

    尤为重要。用户对在线服务的需求和期望不断提高,系统的并发处理能力成为衡量其性能和用户体验的关键指标之一。并发系统不仅仅是大型互联网企业的专利,对于任何希望在市场中占据一席之地的公司来说,能够处理大量
    的头像 发表于 08-05 13:43 126次阅读
    <b class='flag-5'>高</b><b class='flag-5'>并发</b>系统的艺术:如何在流量洪峰中游刃有余

    为何什么risc-v芯片比arm的效率高

    并不是绝对的,而是取决于具体的应用场景、设计优化等因素。 综上所述,RISC-V芯片在某些情况可能相对于ARM架构芯片表现出更高的效率,这主要得益于RISC-V设计的模块化、开源特性和在寄存器数目、指令数目等方面的优势。然而,
    发表于 04-28 09:38

    鸿蒙OS开发实例:【工具类封装-http请求

    ;@ohos.promptAction';** **封装HTTP接口请求类,提供格式化的响应信息输出功能。 使用 DevEco Studio 3.1.1 Release 及以上版本,API 版本为 api 9 及以上
    的头像 发表于 03-27 22:32 1175次阅读
    鸿蒙OS开发实例:【工具类封装-http<b class='flag-5'>请求</b>】

    Golang根据job数量动态控制每秒协程的最大创建数量方法简析

    需求:第三方的接口,限制接口请求的QPS,每秒5次
    的头像 发表于 12-24 14:21 581次阅读

    什么场景需要jvm调优

    JVM调优是指对Java虚拟机进行性能优化和资源管理,以提高应用程序的运行效率和吞吐量。JVM调优的场景有很多,下面将详细介绍各种不同的场景
    的头像 发表于 12-05 11:14 1121次阅读

    redis并发能力直接相关概念有哪些

    Redis是一种高性能的开源内存数据库,具有出色的并发能力。为了实现并发,需要有一些相关概念和技术。下面是关于Redis并发能力的详细解
    的头像 发表于 12-05 10:34 683次阅读

    多线程并发查询oracle数据库

    数据库的原理、使用场景、实现方法以及可能遇到的问题和解决方案。 一、多线程并发查询的原理 在传统的单线程查询方式中,当一个查询请求发起时,数据库会按照顺序执行查询语句并返回结果。如果查
    的头像 发表于 11-17 14:22 3177次阅读

    Linux场景数据包是如何在协议层传输的

    所有互联网服务,均依赖于TCP/IP协议栈。懂得数据是如何在协议栈传输的,将会帮助你提升互联网程序的性能和解决TCP相关问题的能力。 我们讲述在Linux场景数据包是如何在协议层传输的。 1、发送
    的头像 发表于 11-11 11:33 983次阅读
    Linux<b class='flag-5'>场景</b><b class='flag-5'>下</b>数据包是如何在协议层传输的

    api网关 kong 教程入门

    统一权限控制、接口请求访问日志统计 安全,是保护内部服务而设计的一道屏障 开源-最大好处 当然也有一个很大的缺点,api-gw很可能成为性能瓶颈,因为所有的请求都经过这里,可以通过横向扩展和限流解决这个问题。 在众多API GATEWAY框架中,Mashape开源的高性
    的头像 发表于 11-10 11:39 590次阅读
    api网关 kong 教程入门

    服务器并发的概念

    自己调整系统的相关参数 并发的概念是什么?什么是并发? 对于服务器并发的概念,下面几点是错误的定义 ①服务器处理客户端请求的数量:没有时间、空间等限制,因此不能作为
    的头像 发表于 11-10 10:05 2866次阅读
    服务器<b class='flag-5'>并发</b>的概念

    辐射骚扰整改思路及方法:方案合并与原理探究 ?

    辐射骚扰整改思路及方法:方案合并与原理探究 ?|深圳比创达电子EMC
    的头像 发表于 11-09 11:22 536次阅读
    辐射骚扰整改思路及方法:<b class='flag-5'>方案</b><b class='flag-5'>合并</b>与原理探究 ?

    并发内存池项目实现

    本项目实现了一个并发内存池,参考了Google的开源项目tcmalloc实现的简易版;其功能就是实现高效的多线程内存管理。由功能可知,并发指的是高效的多线程,而内存池则是实现内存管
    的头像 发表于 11-09 11:16 588次阅读
    <b class='flag-5'>高</b><b class='flag-5'>并发</b>内存池项目实现

    并发场景请求合并

    我们在服务器端把请求合并,只发出一条SQL查询数据库,数据库返回后,服务器端处理返回数据,根据一个唯一请求ID,把数据分组,返回给对应用户。
    的头像 发表于 10-09 16:05 323次阅读
    <b class='flag-5'>高</b><b class='flag-5'>并发</b><b class='flag-5'>场景</b><b class='flag-5'>下</b><b class='flag-5'>请求</b><b class='flag-5'>合并</b>

    有源pfc效率高还是无源效pfc效率高

    有源pfc效率高还是无源效pfc效率高
    发表于 10-07 09:01