0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

效率加倍,高并发场景下的接口请求合并方案

jf_ro2CN3Fa 来源:芋道源码 2023-01-13 10:09 次阅读


前言

请求合并到底有什么意义呢?我们来看下图。

281af012-92e3-11ed-bfe3-dac502259ad0.png

假设我们3个用户(用户id分别是1、2、3),现在他们都要查询自己的基本信息,请求到服务器,服务器端请求数据库,发出3次请求。我们都知道数据库连接资源是相当宝贵的,那么我们怎么尽可能节省连接资源呢?

这里把数据库换成被调用的远程服务,也是同样的道理。

我们改变下思路,如下图所示。

28311838-92e3-11ed-bfe3-dac502259ad0.png

我们在服务器端把请求合并,只发出一条SQL查询数据库,数据库返回后,服务器端处理返回数据,根据一个唯一请求ID,把数据分组,返回给对应用户。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

技术手段

  • LinkedBlockQueue 阻塞队列
  • ScheduledThreadPoolExecutor 定时任务线程池
  • CompleteableFuture future 阻塞机制(Java 8 的 CompletableFuture 并没有 timeout 机制,后面优化,使用了队列替代)

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/yudao-cloud
  • 视频教程:https://doc.iocoder.cn/video/

代码实现

查询用户的代码

publicinterfaceUserService{

MapqueryUserByIdBatch(ListuserReqs);
}
@Service
publicclassUserServiceImplimplementsUserService{

@Resource
privateUsersMapperusersMapper;

@Override
publicMapqueryUserByIdBatch(ListuserReqs){
//全部参数
ListuserIds=userReqs.stream().map(UserWrapBatchService.Request::getUserId).collect(Collectors.toList());
QueryWrapperqueryWrapper=newQueryWrapper<>();
//用in语句合并成一条SQL,避免多次请求数据库的IO
queryWrapper.in("id",userIds);
Listusers=usersMapper.selectList(queryWrapper);
Map>userGroup=users.stream().collect(Collectors.groupingBy(Users::getId));
HashMapresult=newHashMap<>();
userReqs.forEach(val->{
ListusersList=userGroup.get(val.getUserId());
if(!CollectionUtils.isEmpty(usersList)){
result.put(val.getRequestId(),usersList.get(0));
}else{
//表示没数据
result.put(val.getRequestId(),null);
}
});
returnresult;
}
}

合并请求的实现

packagecom.springboot.sample.service.impl;

importcom.springboot.sample.bean.Users;
importcom.springboot.sample.service.UserService;
importorg.springframework.stereotype.Service;

importjavax.annotation.PostConstruct;
importjavax.annotation.Resource;
importjava.util.*;
importjava.util.concurrent.*;

/***
*zzq
*包装成批量执行的地方
**/
@Service
publicclassUserWrapBatchService{
@Resource
privateUserServiceuserService;

/**
*最大任务数
**/
publicstaticintMAX_TASK_NUM=100;


/**
*请求类,code为查询的共同特征,例如查询商品,通过不同id的来区分
*CompletableFuture将处理结果返回
*/
publicclassRequest{
//请求id唯一
StringrequestId;
//参数
LonguserId;
//TODOJava8的CompletableFuture并没有timeout机制
CompletableFuturecompletableFuture;

publicStringgetRequestId(){
returnrequestId;
}

publicvoidsetRequestId(StringrequestId){
this.requestId=requestId;
}

publicLonggetUserId(){
returnuserId;
}

publicvoidsetUserId(LonguserId){
this.userId=userId;
}

publicCompletableFuturegetCompletableFuture(){
returncompletableFuture;
}

publicvoidsetCompletableFuture(CompletableFuturecompletableFuture){
this.completableFuture=completableFuture;
}
}

/*
LinkedBlockingQueue是一个阻塞的队列,内部采用链表的结果,通过两个ReenTrantLock来保证线程安全
LinkedBlockingQueue与ArrayBlockingQueue的区别
ArrayBlockingQueue默认指定了长度,而LinkedBlockingQueue的默认长度是Integer.MAX_VALUE,也就是无界队列,在移除的速度小于添加的速度时,容易造成OOM。
ArrayBlockingQueue的存储容器是数组,而LinkedBlockingQueue是存储容器是链表
两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,
而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,
也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
*/
privatefinalQueuequeue=newLinkedBlockingQueue();

@PostConstruct
publicvoidinit(){
//定时任务线程池,创建一个支持定时、周期性或延时任务的限定线程数目(这里传入的是1)的线程池
ScheduledExecutorServicescheduledExecutorService=Executors.newScheduledThreadPool(1);

scheduledExecutorService.scheduleAtFixedRate(()->{
intsize=queue.size();
//如果队列没数据,表示这段时间没有请求,直接返回
if(size==0){
return;
}
Listlist=newArrayList<>();
System.out.println("合并了["+size+"]个请求");
//将队列的请求消费到一个集合保存
for(inti=0;i< size; i++) {
                //后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行
if(i< MAX_TASK_NUM) {
                    list.add(queue.poll());
                }
            }
            //拿到我们需要去数据库查询的特征,保存为集合
ListuserReqs=newArrayList<>();
for(Requestrequest:list){
userReqs.add(request);
}
//将参数传入service处理,这里是本地服务,也可以把userService看成RPC之类的远程调用
Mapresponse=userService.queryUserByIdBatch(userReqs);
//将处理结果返回各自的请求
for(Requestrequest:list){
Usersresult=response.get(request.requestId);
request.completableFuture.complete(result);//completableFuture.complete方法完成赋值,这一步执行完毕,下面future.get()阻塞的请求可以继续执行了
}
},100,10,TimeUnit.MILLISECONDS);
//scheduleAtFixedRate是周期性执行schedule是延迟执行initialDelay是初始延迟period是周期间隔后面是单位
//这里我写的是初始化后100毫秒后执行,周期性执行10毫秒执行一次
}

publicUsersqueryUser(LonguserId){
Requestrequest=newRequest();
//这里用UUID做请求id
request.requestId=UUID.randomUUID().toString().replace("-","");
request.userId=userId;
CompletableFuturefuture=newCompletableFuture<>();
request.completableFuture=future;
//将对象传入队列
queue.offer(request);
//如果这时候没完成赋值,那么就会阻塞,直到能够拿到值
try{
returnfuture.get();
}catch(InterruptedExceptione){
e.printStackTrace();
}catch(ExecutionExceptione){
e.printStackTrace();
}
returnnull;
}
}

控制层调用

/***
*请求合并
**/
@RequestMapping("/merge")
publicCallablemerge(LonguserId){
returnnewCallable(){
@Override
publicUserscall()throwsException{
returnuserBatchService.queryUser(userId);
}
};
}

Callable是什么可以参考:

  • https://blog.csdn.net/baidu_19473529/article/details/123596792

模拟高并发查询的代码

packagecom.springboot.sample;

importorg.springframework.web.client.RestTemplate;

importjava.util.Random;
importjava.util.concurrent.CountDownLatch;

publicclassTestBatch{
privatestaticintthreadCount=30;

privatefinalstaticCountDownLatchCOUNT_DOWN_LATCH=newCountDownLatch(threadCount);//为保证30个线程同时并发运行

privatestaticfinalRestTemplaterestTemplate=newRestTemplate();

publicstaticvoidmain(String[]args){


for(inti=0;i< threadCount; i++) {//循环开30个线程
newThread(newRunnable(){
publicvoidrun(){
COUNT_DOWN_LATCH.countDown();//每次减一
try{
COUNT_DOWN_LATCH.await();//此处等待状态,为了让30个线程同时进行
}catch(InterruptedExceptione){
e.printStackTrace();
}

for(intj=1;j<= 3;j++){
intparam=newRandom().nextInt(4);
if(param<=0){
param++;
}
StringresponseBody=restTemplate.getForObject("http://localhost:8080/asyncAndMerge/merge?userId="+param,String.class);
System.out.println(Thread.currentThread().getName()+"参数"+param+"返回值"+responseBody);
}
}
}).start();

}
}
}

测试效果

283d32f8-92e3-11ed-bfe3-dac502259ad0.png284d3d4c-92e3-11ed-bfe3-dac502259ad0.png

要注意的问题

  • Java 8 的 CompletableFuture 并没有 timeout 机制
  • 后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行(本例中加了MAX_TASK_NUM判断)

使用队列的超时解决Java 8 的 CompletableFuture 并没有 timeout 机制

核心代码

packagecom.springboot.sample.service.impl;

importcom.springboot.sample.bean.Users;
importcom.springboot.sample.service.UserService;
importorg.springframework.stereotype.Service;

importjavax.annotation.PostConstruct;
importjavax.annotation.Resource;
importjava.util.*;
importjava.util.concurrent.*;

/***
*zzq
*包装成批量执行的地方,使用queue解决超时问题
**/
@Service
publicclassUserWrapBatchQueueService{
@Resource
privateUserServiceuserService;

/**
*最大任务数
**/
publicstaticintMAX_TASK_NUM=100;


/**
*请求类,code为查询的共同特征,例如查询商品,通过不同id的来区分
*CompletableFuture将处理结果返回
*/
publicclassRequest{
//请求id
StringrequestId;

//参数
LonguserId;
//队列,这个有超时机制
LinkedBlockingQueueusersQueue;


publicStringgetRequestId(){
returnrequestId;
}

publicvoidsetRequestId(StringrequestId){
this.requestId=requestId;
}

publicLonggetUserId(){
returnuserId;
}

publicvoidsetUserId(LonguserId){
this.userId=userId;
}

publicLinkedBlockingQueuegetUsersQueue(){
returnusersQueue;
}

publicvoidsetUsersQueue(LinkedBlockingQueueusersQueue){
this.usersQueue=usersQueue;
}
}

/*
LinkedBlockingQueue是一个阻塞的队列,内部采用链表的结果,通过两个ReenTrantLock来保证线程安全
LinkedBlockingQueue与ArrayBlockingQueue的区别
ArrayBlockingQueue默认指定了长度,而LinkedBlockingQueue的默认长度是Integer.MAX_VALUE,也就是无界队列,在移除的速度小于添加的速度时,容易造成OOM。
ArrayBlockingQueue的存储容器是数组,而LinkedBlockingQueue是存储容器是链表
两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,
而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,
也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
*/
privatefinalQueuequeue=newLinkedBlockingQueue();

@PostConstruct
publicvoidinit(){
//定时任务线程池,创建一个支持定时、周期性或延时任务的限定线程数目(这里传入的是1)的线程池
ScheduledExecutorServicescheduledExecutorService=Executors.newScheduledThreadPool(1);

scheduledExecutorService.scheduleAtFixedRate(()->{
intsize=queue.size();
//如果队列没数据,表示这段时间没有请求,直接返回
if(size==0){
return;
}
Listlist=newArrayList<>();
System.out.println("合并了["+size+"]个请求");
//将队列的请求消费到一个集合保存
for(inti=0;i< size; i++) {
                //后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行
if(i< MAX_TASK_NUM) {
                    list.add(queue.poll());
                }
            }
            //拿到我们需要去数据库查询的特征,保存为集合
ListuserReqs=newArrayList<>();
for(Requestrequest:list){
userReqs.add(request);
}
//将参数传入service处理,这里是本地服务,也可以把userService看成RPC之类的远程调用
Mapresponse=userService.queryUserByIdBatchQueue(userReqs);
for(RequestuserReq:userReqs){
//这里再把结果放到队列里
Usersusers=response.get(userReq.getRequestId());
userReq.usersQueue.offer(users);
}

},100,10,TimeUnit.MILLISECONDS);
//scheduleAtFixedRate是周期性执行schedule是延迟执行initialDelay是初始延迟period是周期间隔后面是单位
//这里我写的是初始化后100毫秒后执行,周期性执行10毫秒执行一次
}

publicUsersqueryUser(LonguserId){
Requestrequest=newRequest();
//这里用UUID做请求id
request.requestId=UUID.randomUUID().toString().replace("-","");
request.userId=userId;
LinkedBlockingQueueusersQueue=newLinkedBlockingQueue<>();
request.usersQueue=usersQueue;
//将对象传入队列
queue.offer(request);
//取出元素时,如果队列为空,给定阻塞多少毫秒再队列取值,这里是3秒
try{
returnusersQueue.poll(3000,TimeUnit.MILLISECONDS);
}catch(InterruptedExceptione){
e.printStackTrace();
}
returnnull;
}
}
...省略..

@Override
publicMapqueryUserByIdBatchQueue(ListuserReqs){
//全部参数
ListuserIds=userReqs.stream().map(UserWrapBatchQueueService.Request::getUserId).collect(Collectors.toList());
QueryWrapperqueryWrapper=newQueryWrapper<>();
//用in语句合并成一条SQL,避免多次请求数据库的IO
queryWrapper.in("id",userIds);
Listusers=usersMapper.selectList(queryWrapper);
Map>userGroup=users.stream().collect(Collectors.groupingBy(Users::getId));
HashMapresult=newHashMap<>();
//数据分组
userReqs.forEach(val->{
ListusersList=userGroup.get(val.getUserId());
if(!CollectionUtils.isEmpty(usersList)){
result.put(val.getRequestId(),usersList.get(0));
}else{
//表示没数据,这里要new,不然加入队列会空指针
result.put(val.getRequestId(),newUsers());
}
});
returnresult;
}

...省略...

小结

请求合并,批量的办法能大幅节省被调用系统的连接资源,本例是以数据库为例,其他RPC调用也是类似的道理。缺点就是请求的时间在执行实际的逻辑之前增加了等待时间,不适合低并发的场景。

代码地址

  • https://gitee.com/apple_1030907690/spring-boot-kubernetes/tree/v1.0.5

参考

  • https://www.cnblogs.com/oyjg/p/13099998.html


审核编辑 :李倩


声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 接口
    +关注

    关注

    33

    文章

    8775

    浏览量

    152394
  • 服务器
    +关注

    关注

    12

    文章

    9422

    浏览量

    86484
  • 数据库
    +关注

    关注

    7

    文章

    3868

    浏览量

    65004

原文标题:效率加倍,高并发场景下的接口请求合并方案

文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    RCA接口转换为其他接口方案

    RCA接口转换为其他接口方案多种多样,具体取决于需要转换的目标接口类型以及应用场景。以下是一些常见的转换
    的头像 发表于 02-17 16:52 311次阅读

    华为支付-商户基础支付场景开发步骤

    :orderStr中sign字段签名规则是将除sign外的参数都做排序拼接后再签名,签名值赋值给sign字段。 以下为开放API接口请求及orderStr构建示例代码片段: import
    发表于 02-12 17:12

    华为支付-平台类商户合单支付场景准备

    PayMercAuth对象内的入参排序拼接进行签名。请参考排序拼接和签名示例代码。 2. 3.构建合单订单信息参数orderStr并返回给客户端。业务接口请求示例代码可参考业务接口请求。 (二)拉起
    发表于 02-11 10:40

    华为支付-免密支付接入支付并签约场景

    参考排序拼接和签名示例代码。 2. 3.构建订单信息参数orderStr返回给客户端,业务接口请求示例代码可参考业务接口请求。 (二)拉起华为支付收银台(端侧开发) 使用orderStr调用
    发表于 02-10 09:55

    华为支付-(可选)特定场景配置操作

    如涉及以下场景,需提前完成相关产品的开通或配置操作。如不涉及,则不需要配置。 场景一:产品开通操作 部分支付场景接入涉及产品开通,未开通产品直接接入,商户请求华为支付开放的API
    发表于 01-21 10:30

    低成本解决方案,RK3506的应用场景分析!

    RK3506 是瑞芯微推出的MPU产品,芯片制程为22nm,定位于轻量级、低成本解决方案。该MPU具有低功耗、外设接口丰富、实时性的特点,适合用多种工商业场景。本文将基于RK3506
    的头像 发表于 12-11 15:26 712次阅读
    低成本解决<b class='flag-5'>方案</b>,RK3506的应用<b class='flag-5'>场景</b>分析!

    存储、高效率、更灵活,拆解联核科技“前拣后存”解决方案

    为了解决传统仓库低矮、空间小储量小,库位不足等行业痛点,联核科技重磅推出四向穿梭车,向密集存储场景的拓展结合无人叉车,打造存储、高效率的创新四向车前拣后存解决方案
    的头像 发表于 12-02 16:01 189次阅读

    NVIDIA助力丽蟾科技打造AI训练与推理加速解决方案

    丽蟾科技通过 Leaper 资源管理平台集成 NVIDIA AI Enterprise,为企业和科研机构提供了一套高效、灵活的 AI 训练与推理加速解决方案。无论是在复杂的 AI 开发任务中,还是在并发推理
    的头像 发表于 10-27 10:03 402次阅读
    NVIDIA助力丽蟾科技打造AI训练与推理加速解决<b class='flag-5'>方案</b>

    测试聊并发-入门篇

    服务端接口的性能测试中,我们面临的挑战不仅仅是处理单个请求效率,更在于如何在多用户同时访问时保持系统的稳定性和响应速度。并发编程和测试,作为性能测试的核心,对于评估系统在
    的头像 发表于 10-15 15:23 270次阅读
    测试聊<b class='flag-5'>并发</b>-入门篇

    使用DM365的DC/DC转换器的Vin、高效率电源解决方案

    电子发烧友网站提供《使用DM365的DC/DC转换器的Vin、高效率电源解决方案.pdf》资料免费下载
    发表于 10-11 10:47 0次下载
    使用DM365的DC/DC转换器的<b class='flag-5'>高</b>Vin、高<b class='flag-5'>效率</b>电源解决<b class='flag-5'>方案</b>

    使用带有DVFS的DC/DC转换器的Vin、高效率电源解决方案

    电子发烧友网站提供《使用带有DVFS的DC/DC转换器的Vin、高效率电源解决方案.pdf》资料免费下载
    发表于 10-10 10:28 0次下载
    使用带有DVFS的DC/DC转换器的<b class='flag-5'>高</b>Vin、高<b class='flag-5'>效率</b>电源解决<b class='flag-5'>方案</b>

    并发物联网云平台是什么

    并发物联网云平台是一种能够处理大量设备同时连接并进行数据交换的云计算平台。这种平台通常被设计用来应对来自数以万计甚至数十亿计的物联网设备的并发请求,保证系统的稳定性和响应速度。 首先
    的头像 发表于 08-13 13:50 336次阅读

    并发系统的艺术:如何在流量洪峰中游刃有余

    尤为重要。用户对在线服务的需求和期望不断提高,系统的并发处理能力成为衡量其性能和用户体验的关键指标之一。并发系统不仅仅是大型互联网企业的专利,对于任何希望在市场中占据一席之地的公司来说,能够处理大量
    的头像 发表于 08-05 13:43 366次阅读
    <b class='flag-5'>高</b><b class='flag-5'>并发</b>系统的艺术:如何在流量洪峰中游刃有余

    为何什么risc-v芯片比arm的效率高

    并不是绝对的,而是取决于具体的应用场景、设计优化等因素。 综上所述,RISC-V芯片在某些情况可能相对于ARM架构芯片表现出更高的效率,这主要得益于RISC-V设计的模块化、开源特性和在寄存器数目、指令数目等方面的优势。然而,
    发表于 04-28 09:38

    鸿蒙OS开发实例:【工具类封装-http请求

    ;@ohos.promptAction';** **封装HTTP接口请求类,提供格式化的响应信息输出功能。 使用 DevEco Studio 3.1.1 Release 及以上版本,API 版本为 api 9 及以上
    的头像 发表于 03-27 22:32 1567次阅读
    鸿蒙OS开发实例:【工具类封装-http<b class='flag-5'>请求</b>】