0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

DeferredResult异步请求处理 提高系统吞吐量的一把利器

jf_ro2CN3Fa 来源:程序员Sunny 作者:程序员Sunny 2022-10-10 16:55 次阅读

基础准备

ResponseMsg

TaskService

阻塞调用

Callable异步调用

DeferredResult异步调用

后记

大家都知道,Callable和DeferredResult可以用来进行异步请求处理。利用它们,我们可以异步生成返回值,在具体处理的过程中,我们直接在controller中返回相应的Callable或者DeferredResult,在这之后,servlet线程将被释放,可用于其他连接;DeferredResult另外会有线程来进行结果处理,并setResult。

基础准备

在正式开始之前,我们先做一点准备工作,在项目中新建了一个base模块。其中包含一些提供基础支持的java类,在其他模块中可能会用到。

ResponseMsg

我们定义了一个ResponseMsg的实体类来作为我们的返回值类型:

@Data
@NoArgsConstructor
@AllArgsConstructor
publicclassResponseMsg{

privateintcode;

privateStringmsg;

privateTdata;

}

非常简单,里面包含了code、msg和data三个字段,其中data为泛型类型。另外类的注解Data、NoArgsConstructor和AllArgsConstructor都是lombok提供的简化我们开发的,主要功能分别是,为我们的类生成set和get方法,生成无参构造器和生成全参构造器。

使用idea进行开发的童鞋可以装一下lombok的支持插件。另外,lombok的依赖参见:


org.projectlombok
lombok-maven
1.16.16.0
pom

TaskService

我们建立了一个TaskService,用来为阻塞调用和Callable调用提供实际结果处理的。代码如下:

@Service
publicclassTaskService{

privatestaticfinalLoggerlog=LoggerFactory.getLogger(TaskService.class);

publicResponseMsggetResult(){

log.info("任务开始执行,持续等待中...");

try{
Thread.sleep(30000L);
}catch(InterruptedExceptione){
e.printStackTrace();
}
log.info("任务处理完成");
returnnewResponseMsg(0,"操作成功","success");
}
}

可以看到,里面实际提供服务的是getResult方法,这边直接返回一个new ResponseMsg(0,“操作成功”,“success”)。但是其中又特意让它sleep了30秒,模拟一个耗时较长的请求。

阻塞调用

平时我们用的最普遍的还是阻塞调用,通常请求的处理时间较短,在并发量较小的情况下,使用阻塞调用问题也不是很大。 阻塞调用实现非常简单,我们首先新建一个模块blockingtype,里面只包含一个controller类,用来接收请求并利用TaskService来获取结果。

@RestController
publicclassBlockController{

privatestaticfinalLoggerlog=LoggerFactory.getLogger(BlockController.class);

@Autowired
privateTaskServicetaskService;

@RequestMapping(value="/get",method=RequestMethod.GET)
publicResponseMsggetResult(){

log.info("接收请求,开始处理...");
ResponseMsgresult=taskService.getResult();
log.info("接收任务线程完成并退出");

returnresult;

}
}

我们请求的是getResult方法,其中调用了taskService,这个taskService我们是注入得到的。关于怎么跨模块注入的,其实也非常简单,在本模块,加入对其他模块的依赖就可以了。比如这里我们在blockingtype的pom.xml文件中加入对base模块的依赖:


com.sunny
base
1.0-SNAPSHOT

然后我们看一下实际调用效果,这里我们设置端口号为8080,启动日志如下:

2018-06-2419:02:48.514INFO11207---[main]com.sunny.BlockApplication:StartingBlockApplicationonxdeMacBook-Pro.localwithPID11207(/Users/zsunny/IdeaProjects/asynchronoustask/blockingtype/target/classesstartedbyzsunnyin/Users/zsunny/IdeaProjects/asynchronoustask)
2018-06-2419:02:48.519INFO11207---[main]com.sunny.BlockApplication:Noactiveprofileset,fallingbacktodefaultprofiles:default
2018-06-2419:02:48.762INFO11207---[main]ationConfigEmbeddedWebApplicationContext:Refreshingorg.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@4445629:startupdate[SunJun2419:02:48CST2018];rootofcontexthierarchy
2018-06-2419:02:50.756INFO11207---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatinitializedwithport(s):8080(http)
2018-06-241950.778INFO11207---[main]o.apache.catalina.core.StandardService:Startingservice[Tomcat]
2018-06-241950.780INFO11207---[main]org.apache.catalina.core.StandardEngine:StartingServletEngine:ApacheTomcat/8.5.23
2018-06-241950.922INFO11207---[ost-startStop-1]o.a.c.c.C.[Tomcat].[localhost].[/]:InitializingSpringembeddedWebApplicationContext
2018-06-241950.922INFO11207---[ost-startStop-1]o.s.web.context.ContextLoader:RootWebApplicationContext:initializationcompletedin2200ms
2018-06-241951.156INFO11207---[ost-startStop-1]o.s.b.w.servlet.ServletRegistrationBean:Mappingservlet:'dispatcherServlet'to[/]
2018-06-241951.162INFO11207---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'characterEncodingFilter'to:[/*]
2018-06-241951.163INFO11207---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'hiddenHttpMethodFilter'to:[/*]
2018-06-241951.163INFO11207---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'httpPutFormContentFilter'to:[/*]
2018-06-241951.163INFO11207---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'requestContextFilter'to:[/*]
2018-06-241951.620INFO11207---[main]s.w.s.m.m.a.RequestMappingHandlerAdapter:Lookingfor@ControllerAdvice:org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@4445629:startupdate[SunJun241948CST2018];rootofcontexthierarchy
2018-06-241951.724INFO11207---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/get],methods=[GET]}"ontopubliccom.sunny.entity.ResponseMsgcom.sunny.controller.BlockController.getResult()
2018-06-241951.730INFO11207---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error]}"ontopublicorg.springframework.http.ResponseEntity>org.springframework.boot.autoconfigure.web.BasicErrorController.error(javax.servlet.http.HttpServletRequest)
2018-06-241951.731INFO11207---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error],produces=[text/html]}"ontopublicorg.springframework.web.servlet.ModelAndVieworg.springframework.boot.autoconfigure.web.BasicErrorController.errorHtml(javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse)
2018-06-241951.780INFO11207---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/webjars/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-241951.780INFO11207---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-241951.838INFO11207---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**/favicon.ico]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-241952.126INFO11207---[main]o.s.j.e.a.AnnotationMBeanExporter:RegisteringbeansforJMXexposureonstartup
2018-06-241952.205INFO11207---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatstartedonport(s):8080(http)
2018-06-241952.211INFO11207---[main]com.sunny.BlockApplication:StartedBlockApplicationin5.049seconds(JVMrunningfor6.118)

可以看到顺利启动了,那么我们就来访问一下:

http://localhost:8080/get

等待了大概30秒左右,得到json数据:

{"code":0,"msg":"操作成功","data":"success"}
e090108c-3eec-11ed-9e49-dac502259ad0.png

然后我们来看看控制台的日志:

2018-06-2419:04:07.315INFO11207---[nio-8080-exec-1]com.sunny.controller.BlockController:接收请求,开始处理...
2018-06-2419:04:07.316INFO11207---[nio-8080-exec-1]com.sunny.service.TaskService:任务开始执行,持续等待中...
2018-06-2419:04:37.322INFO11207---[nio-8080-exec-1]com.sunny.service.TaskService:任务处理完成
2018-06-2419:04:37.322INFO11207---[nio-8080-exec-1]com.sunny.controller.BlockController:接收任务线程完成并退出

可以看到在“ResponseMsg result = taskService.getResult();”的时候是阻塞了大约30秒钟,随后才执行它后面的打印语句“log.info(“接收任务线程完成并退出”);”。

Callable异步调用

涉及到较长时间的请求处理的话,比较好的方式是用异步调用,比如利用Callable返回结果。异步主要表现在,接收请求的servlet可以不用持续等待结果产生,而可以被释放去处理其他事情。当然,在调用者来看的话,其实还是表现在持续等待30秒。这有利于服务端提供更大的并发处理量。

这里我们新建一个callabledemo模块,在这个模块中,我们一样只包含一个TaskController,另外也是需要加入base模块的依赖。只不过这里我们的返回值不是ResponseMsg类型了,而是一个Callable类型。

@RestController
publicclassTaskController{

privatestaticfinalLoggerlog=LoggerFactory.getLogger(TaskController.class);

@Autowired
privateTaskServicetaskService;

@RequestMapping(value="/get",method=RequestMethod.GET)
publicCallable>getResult(){

log.info("接收请求,开始处理...");

Callable>result=(()->{
returntaskService.getResult();
});

log.info("接收任务线程完成并退出");

returnresult;
}

}

在里面,我们创建了一个Callable类型的变量result,并实现了它的call方法,在call方法中,我们也是调用taskService的getResult方法得到返回值并返回。

下一步我们就运行一下这个模块,这里我们在模块的application.yml中设置端口号为8081:

server:
port:8081

启动,可以看到控制台的消息:

2018-06-2419:38:14.658INFO11226---[main]com.sunny.CallableApplication:StartingCallableApplicationonxdeMacBook-Pro.localwithPID11226(/Users/zsunny/IdeaProjects/asynchronoustask/callabledemo/target/classesstartedbyzsunnyin/Users/zsunny/IdeaProjects/asynchronoustask)
2018-06-2419:38:14.672INFO11226---[main]com.sunny.CallableApplication:Noactiveprofileset,fallingbacktodefaultprofiles:default
2018-06-2419:38:14.798INFO11226---[main]ationConfigEmbeddedWebApplicationContext:Refreshingorg.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@4445629:startupdate[SunJun2419:38:14CST2018];rootofcontexthierarchy
2018-06-2419:38:16.741INFO11226---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatinitializedwithport(s):8081(http)
2018-06-241916.762INFO11226---[main]o.apache.catalina.core.StandardService:Startingservice[Tomcat]
2018-06-241916.764INFO11226---[main]org.apache.catalina.core.StandardEngine:StartingServletEngine:ApacheTomcat/8.5.23
2018-06-241916.918INFO11226---[ost-startStop-1]o.a.c.c.C.[Tomcat].[localhost].[/]:InitializingSpringembeddedWebApplicationContext
2018-06-241916.919INFO11226---[ost-startStop-1]o.s.web.context.ContextLoader:RootWebApplicationContext:initializationcompletedin2126ms
2018-06-241917.144INFO11226---[ost-startStop-1]o.s.b.w.servlet.ServletRegistrationBean:Mappingservlet:'dispatcherServlet'to[/]
2018-06-241917.149INFO11226---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'characterEncodingFilter'to:[/*]
2018-06-241917.150INFO11226---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'hiddenHttpMethodFilter'to:[/*]
2018-06-241917.150INFO11226---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'httpPutFormContentFilter'to:[/*]
2018-06-241917.150INFO11226---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'requestContextFilter'to:[/*]
2018-06-241917.632INFO11226---[main]s.w.s.m.m.a.RequestMappingHandlerAdapter:Lookingfor@ControllerAdvice:org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@4445629:startupdate[SunJun241914CST2018];rootofcontexthierarchy
2018-06-241917.726INFO11226---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/get],methods=[GET]}"ontopublicjava.util.concurrent.Callable>com.sunny.controller.TaskController.getResult()
2018-06-241917.731INFO11226---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error]}"ontopublicorg.springframework.http.ResponseEntity>org.springframework.boot.autoconfigure.web.BasicErrorController.error(javax.servlet.http.HttpServletRequest)
2018-06-241917.733INFO11226---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error],produces=[text/html]}"ontopublicorg.springframework.web.servlet.ModelAndVieworg.springframework.boot.autoconfigure.web.BasicErrorController.errorHtml(javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse)
2018-06-241917.777INFO11226---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/webjars/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-241917.777INFO11226---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-241917.825INFO11226---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**/favicon.ico]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-241918.084INFO11226---[main]o.s.j.e.a.AnnotationMBeanExporter:RegisteringbeansforJMXexposureonstartup
2018-06-241918.176INFO11226---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatstartedonport(s):8081(http)
2018-06-241918.183INFO11226---[main]com.sunny.CallableApplication:StartedCallableApplicationin4.538seconds(JVMrunningfor5.327)

完美启动了,然后我们还是一样,访问一下:

http://localhost:8081/get

在大约等待了30秒左右,我们在浏览器上得到json数据:

{"code":0,"msg":"操作成功","data":"success"}
e09eae3a-3eec-11ed-9e49-dac502259ad0.png

和阻塞调用的结果一样——当然一样啦,都是同taskService中得到的结果。

然后我们看看控制台的消息:

2018-06-2419:39:07.738INFO11226---[nio-8081-exec-1]com.sunny.controller.TaskController:接收请求,开始处理...
2018-06-2419:39:07.740INFO11226---[nio-8081-exec-1]com.sunny.controller.TaskController:接收任务线程完成并退出
2018-06-2419:39:07.753INFO11226---[MvcAsync1]com.sunny.service.TaskService:任务开始执行,持续等待中...
2018-06-2419:39:37.756INFO11226---[MvcAsync1]com.sunny.service.TaskService:任务处理完成

很显然,这里的消息出现的顺序和阻塞模式有所不同了,这里在“接收请求,开始处理…”之后直接打印了“接收任务线程完成并退出”。而不是先出现“任务处理完成”后再出现“接收任务线程完成并退出”。

这就说明,这里没有阻塞在从taskService中获得数据的地方,controller中直接执行后面的部分(这里可以做其他很多事,不仅仅是打印日志)。

DeferredResult异步调用

前面铺垫了那么多,还是主要来说DeferredResult的;和Callable一样,DeferredResult也是为了支持异步调用。两者的主要差异,Sunny觉得主要在DeferredResult需要自己用线程来处理结果setResult,而Callable的话不需要我们来维护一个结果处理线程。

总体来说,Callable的话更为简单,同样的也是因为简单,灵活性不够;相对地,DeferredResult更为复杂一些,但是又极大的灵活性。在可以用Callable的时候,直接用Callable;而遇到Callable没法解决的场景的时候,可以尝试使用DeferredResult。

这里Sunny将会设计两个DeferredResult使用场景。

场景一:

创建一个持续在随机间隔时间后从任务队列中获取任务的线程

访问controller中的方法,创建一个DeferredResult,设定超时时间和超时返回对象

设定DeferredResult的超时回调方法和完成回调方法

将DeferredResult放入任务中,并将任务放入任务队列

步骤1中的线程获取到任务队列中的任务,并产生一个随机结果返回

场景其实非常简单,接下来我们来看看具体的实现。首先,我们还是来看任务实体类是怎么样的。

/**
*任务实体类
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
publicclassTask{

privateinttaskId;

privateDeferredResult>taskResult;

@Override
publicStringtoString(){
return"Task{"+
"taskId="+taskId+
",taskResult"+"{responseMsg="+taskResult.getResult()+"}"+
'}';
}
}

看起来非常简单,成员变量又taskId和taskResult,前者是int类型,后者为我们的DeferredResult类型,它的泛型类型为ResponseMsg,注意这里用到ResponseMsg,所以也需要导入base模块的依赖。

另外注解之前已经说明了,不过这里再提一句,@Data注解也包含了toString的重写,但是这里为了知道具体的ResponseMsg的内容,Sunny特意手动重写。

看完Task类型,我们再来看看任务队列。

@Component
publicclassTaskQueue{

privatestaticfinalLoggerlog=LoggerFactory.getLogger(TaskQueue.class);

privatestaticfinalintQUEUE_LENGTH=10;

privateBlockingQueuequeue=newLinkedBlockingDeque<>(QUEUE_LENGTH);

privateinttaskId=0;


/**
*加入任务
*@paramdeferredResult
*/
publicvoidput(DeferredResult>deferredResult){

taskId++;

log.info("任务加入队列,id为:{}",taskId);

queue.offer(newTask(taskId,deferredResult));

}

/**
*获取任务
*@return
*@throwsInterruptedException
*/
publicTasktake()throwsInterruptedException{

Tasktask=queue.poll();

log.info("获得任务:{}",task);

returntask;
}
}

这里我们将它作为一个bean,之后会在其他bean中注入,这里实际的队列为成员变量queue,它是LinkedBlockingDeque类型的。还有一个成员变量为taskId,是用于自动生成任务id的,并且在加入任务的方法中实现自增,以确保每个任务的id唯一性。方法的话又put和take方法,分别用于向队列中添加任务和取出任务;其中,对queue的操作,分别用了offer和poll,这样是实现一个非阻塞的操作,并且在队列为空和队列已满的情况下不会抛出异常。

另外,大家实现的时候,可以考虑使用ConcurrentLinkedQueue来高效处理并发,因为它属于无界非阻塞队列,使用过程中需要考虑可能造成的OOM问题。Sunny这里选择阻塞队列LinkedBlockingDeque,它底层使用加锁进行了同步;但是这里使用了TaskQueue进行封装,处理过程中有一些额外操作,调用时需要加锁以防发生某些意料之外的问题。

然后我们来看步骤1中的,启动一个持续从任务队列中获取任务的线程的具体实现。

@Component
publicclassTaskExecute{

privatestaticfinalLoggerlog=LoggerFactory.getLogger(TaskExecute.class);

privatestaticfinalRandomrandom=newRandom();

//默认随机结果的长度
privatestaticfinalintDEFAULT_STR_LEN=10;

//用于生成随机结果
privatestaticfinalStringstr="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";

@Autowired
privateTaskQueuetaskQueue;


/**
*初始化启动
*/
@PostConstruct
publicvoidinit(){

log.info("开始持续处理任务");

newThread(this::execute).start();


}


/**
*持续处理
*返回执行结果
*/
privatevoidexecute(){

while(true){

try{

//取出任务
Tasktask;

synchronized(taskQueue){

task=taskQueue.take();

}

if(task!=null){

//设置返回结果
StringrandomStr=getRandomStr(DEFAULT_STR_LEN);

ResponseMsgresponseMsg=newResponseMsg(0,"success",randomStr);

log.info("返回结果:{}",responseMsg);

task.getTaskResult().setResult(responseMsg);
}

inttime=random.nextInt(10);

log.info("处理间隔:{}秒",time);

Thread.sleep(time*1000L);

}catch(InterruptedExceptione){
e.printStackTrace();
}

}

}

/**
*获取长度为len的随机串
*@paramlen
*@return
*/
privateStringgetRandomStr(intlen){

intmaxInd=str.length();

StringBuildersb=newStringBuilder();

intind;

for(inti=0;i

这里,我们注入了TaskQueue,成员变量比较简单并且有注释,不再说明,主要来看方法。先看一下最后一个方法getRandomStr,很显然,这是一个获得长度为len的随机串的方法,访问限定为private,为类中其他方法服务的。然后我们看init方法,它执行的其实就是开启了一个线程并且执行execute方法,注意一下它上面的@PostContruct注解,这个注解就是在这个bean初始化的时候就执行这个方法。

所以我们需要关注的实际逻辑在execute方法中。可以看到,在execute方法中,用了一个while(true)来保证线程持续运行。因为是并发环境下,考虑对taskQueue加锁,从中取出任务;如果任务不为空,获取用getRandomStr生成一个随机结果并用setResult方法进行返回。

最后可以看到,利用random生成来一个[0,10)的随机数,并让线程sleep相应的秒数。这里注意一下,需要设定一个时间间隔,否则,先线程持续跑会出现CPU负载过高的情况。

接下来我们就看看controller是如何处理的。

@RestController
publicclassTaskController{

privatestaticfinalLoggerlog=LoggerFactory.getLogger(TaskController.class);

//超时结果
privatestaticfinalResponseMsgOUT_OF_TIME_RESULT=newResponseMsg<>(-1,"超时","outoftime");

//超时时间
privatestaticfinallongOUT_OF_TIME=3000L;

@Autowired
privateTaskQueuetaskQueue;


@RequestMapping(value="/get",method=RequestMethod.GET)
publicDeferredResult>getResult(){

log.info("接收请求,开始处理...");

//建立DeferredResult对象,设置超时时间,以及超时返回超时结果
DeferredResult>result=newDeferredResult<>(OUT_OF_TIME,OUT_OF_TIME_RESULT);

result.onTimeout(()->{
log.info("调用超时");
});

result.onCompletion(()->{
log.info("调用完成");
});

//并发,加锁
synchronized(taskQueue){

taskQueue.put(result);

}

log.info("接收任务线程完成并退出");

returnresult;

}

}

这里我们同样注入了taskQueue。请求方法就只有一个getResult,返回值为DeferredResult。这里我们首先创建了DeferredResult对象result并且设定超时时间和超时返回结果;随后设定result的onTimeout和onCompletion方法,其实就是传入两个Runnable对象来实现回调的效果;之后就是加锁并且将result加入任务队列中。

总体来说,场景不算非常复杂,看到这里大家应该都能基本了解了。然后我们来跑一下测试一下。

我们在application.yml中设定端口为8082:

server:
port:8082

启动模块,控制台信息如下:

2018-06-2421:49:28.815INFO11322---[main]com.sunny.DeferredResultApplication:StartingDeferredResultApplicationonxdeMacBook-Pro.localwithPID11322(/Users/zsunny/IdeaProjects/asynchronoustask/deferredresultdemo/target/classesstartedbyzsunnyin/Users/zsunny/IdeaProjects/asynchronoustask)
2018-06-2421:49:28.821INFO11322---[main]com.sunny.DeferredResultApplication:Noactiveprofileset,fallingbacktodefaultprofiles:default
2018-06-2421:49:29.010INFO11322---[main]ationConfigEmbeddedWebApplicationContext:Refreshingorg.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@5ccddd20:startupdate[SunJun2421:49:28CST2018];rootofcontexthierarchy
2018-06-2421:49:30.971INFO11322---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatinitializedwithport(s):8082(http)
2018-06-242130.980INFO11322---[main]o.apache.catalina.core.StandardService:Startingservice[Tomcat]
2018-06-242130.981INFO11322---[main]org.apache.catalina.core.StandardEngine:StartingServletEngine:ApacheTomcat/8.5.23
2018-06-242131.062INFO11322---[ost-startStop-1]o.a.c.c.C.[Tomcat].[localhost].[/]:InitializingSpringembeddedWebApplicationContext
2018-06-242131.063INFO11322---[ost-startStop-1]o.s.web.context.ContextLoader:RootWebApplicationContext:initializationcompletedin2066ms
2018-06-242131.207INFO11322---[ost-startStop-1]o.s.b.w.servlet.ServletRegistrationBean:Mappingservlet:'dispatcherServlet'to[/]
2018-06-242131.212INFO11322---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'characterEncodingFilter'to:[/*]
2018-06-242131.213INFO11322---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'hiddenHttpMethodFilter'to:[/*]
2018-06-242131.213INFO11322---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'httpPutFormContentFilter'to:[/*]
2018-06-242131.213INFO11322---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'requestContextFilter'to:[/*]
2018-06-242131.247INFO11322---[main]com.sunny.bean.TaskExecute:开始持续处理任务
2018-06-242131.249INFO11322---[Thread-8]com.sunny.bean.TaskQueue:获得任务:null
2018-06-242131.250INFO11322---[Thread-8]com.sunny.bean.TaskExecute:处理间隔:6秒
2018-06-242131.498INFO11322---[main]s.w.s.m.m.a.RequestMappingHandlerAdapter:Lookingfor@ControllerAdvice:org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@5ccddd20:startupdate[SunJun242128CST2018];rootofcontexthierarchy
2018-06-242131.572INFO11322---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/get],methods=[GET]}"ontopublicorg.springframework.web.context.request.async.DeferredResult>com.sunny.controller.TaskController.getResult()
2018-06-242131.576INFO11322---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error]}"ontopublicorg.springframework.http.ResponseEntity>org.springframework.boot.autoconfigure.web.BasicErrorController.error(javax.servlet.http.HttpServletRequest)
2018-06-242131.577INFO11322---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error],produces=[text/html]}"ontopublicorg.springframework.web.servlet.ModelAndVieworg.springframework.boot.autoconfigure.web.BasicErrorController.errorHtml(javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse)
2018-06-242131.602INFO11322---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/webjars/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-242131.602INFO11322---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-242131.628INFO11322---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**/favicon.ico]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-242131.811INFO11322---[main]o.s.j.e.a.AnnotationMBeanExporter:RegisteringbeansforJMXexposureonstartup
2018-06-242131.892INFO11322---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatstartedonport(s):8082(http)
2018-06-242131.897INFO11322---[main]com.sunny.DeferredResultApplication:StartedDeferredResultApplicationin3.683seconds(JVMrunningfor4.873)
2018-06-242137.254INFO11322---[Thread-8]com.sunny.bean.TaskQueue:获得任务:null
2018-06-242137.254INFO11322---[Thread-8]com.sunny.bean.TaskExecute:处理间隔:6秒

首先程序完美启动,这没有问题,然后我们注意这几条信息:

2018-06-2421:49:31.247INFO11322---[main]com.sunny.bean.TaskExecute:开始持续处理任务
2018-06-2421:49:31.249INFO11322---[Thread-8]com.sunny.bean.TaskQueue:获得任务:null
2018-06-2421:49:31.250INFO11322---[Thread-8]com.sunny.bean.TaskExecute:处理间隔:6秒

这说明我们TaskExecute中已经成功启动了持续获取任务的线程。

接着,我们还是访问一下:

http://localhost:8082/get

这一回等待了若干秒就出现了结果:

{"code":0,"msg":"success","data":"CEUO2lmMJr"}
e0b82806-3eec-11ed-9e49-dac502259ad0.png

可以看到我们的随机串是CEUO2lmMJr。再一次请求又会出现不同的随机串。再看一下我们控制台的相关信息:

2018-06-2421:51:04.303INFO11322---[nio-8082-exec-1]com.sunny.controller.TaskController:接收请求,开始处理...
2018-06-2421:51:04.304INFO11322---[nio-8082-exec-1]com.sunny.bean.TaskQueue:任务加入队列,id为:1
2018-06-2421:51:04.304INFO11322---[nio-8082-exec-1]com.sunny.controller.TaskController:接收任务线程完成并退出
2018-06-2421:51:04.323INFO11322---[Thread-8]com.sunny.bean.TaskQueue:获得任务:Task{taskId=1,taskResult{responseMsg=null}}
2018-06-2421:51:04.323INFO11322---[Thread-8]com.sunny.bean.TaskExecute:返回结果:ResponseMsg(code=0,msg=success,data=CEUO2lmMJr)

也是符合我们的预期,请求进来进入队列中,由TaskExecute获取请求并进行处理结果返回。

场景二

用户发送请求到TaskController的getResult方法,该方法接收到请求,创建一个DeferredResult,设定超时时间和超时返回对象

设定DeferredResult的超时回调方法和完成回调方法,超时和完成都会将本次请求产生的DeferredResult从集合中remove

将DeferredResult放入集合中

另有一个TaskExecuteController,访问其中一个方法,可取出集合中的等待返回的DeferredResult对象,并将传入的参数设定为结果

首先我们来看看DeferredResult的集合类:

@Component
@Data
publicclassTaskSet{

privateSet>>set=newHashSet<>();

}

非常简单,只包含了一个HashSet的成员变量。这里可以考虑用ConcurrentHashMap来实现高效并发,Sunny这里简单实用HashSet,配合加锁实现并发处理。

然后我们看看发起调用的Controller代码:

@RestController
publicclassTaskController{

privateLoggerlog=LoggerFactory.getLogger(TaskController.class);

//超时结果
privatestaticfinalResponseMsgOUT_OF_TIME_RESULT=newResponseMsg<>(-1,"超时","outoftime");

//超时时间
privatestaticfinallongOUT_OF_TIME=60000L;

@Autowired
privateTaskSettaskSet;

@RequestMapping(value="/get",method=RequestMethod.GET)
publicDeferredResult>getResult(){

log.info("接收请求,开始处理...");

//建立DeferredResult对象,设置超时时间,以及超时返回超时结果
DeferredResult>result=newDeferredResult<>(OUT_OF_TIME,OUT_OF_TIME_RESULT);

result.onTimeout(()->{
log.info("调用超时,移除任务,此时队列长度为{}",taskSet.getSet().size());

synchronized(taskSet.getSet()){

taskSet.getSet().remove(result);
}
});

result.onCompletion(()->{
log.info("调用完成,移除任务,此时队列长度为{}",taskSet.getSet().size());

synchronized(taskSet.getSet()){

taskSet.getSet().remove(result);
}
});

//并发,加锁
synchronized(taskSet.getSet()){

taskSet.getSet().add(result);

}
log.info("加入任务集合,集合大小为:{}",taskSet.getSet().size());

log.info("接收任务线程完成并退出");

returnresult;

}
}

和场景一中有些类似,但是注意这里在onTimeout和onCompletion中都多了一个移除元素的操作,这也就是每次调用结束,需要将集合中的DeferredResult对象移除,即集合中保存的都是等待请求结果的DeferredResult对象。

然后我们看处理请求结果的Controller:

@RestController
publicclassTaskExecuteController{

privatestaticfinalLoggerlog=LoggerFactory.getLogger(TaskExecuteController.class);

@Autowired
privateTaskSettaskSet;

@RequestMapping(value="/set/{result}",method=RequestMethod.GET)
publicStringsetResult(@PathVariable("result")Stringresult){

ResponseMsgres=newResponseMsg<>(0,"success",result);

log.info("结果处理开始,得到输入结果为:{}",res);

Set>>set=taskSet.getSet();



synchronized(set){

set.forEach((deferredResult)->{deferredResult.setResult(res);});

}

return"Successfullysetresult:"+result;
}
}

看起来非常简单,只是做了两个操作,接收得到的参数并利用参数生成一个ResponseMsg对象,随后将集合中的所有DeferredResult都设定结果为根据参数生成的ResponseMsg对象。最后返回一个提示:成功设置结果…

好了,话不多说,我们来启动测试验证一下。我们说一下验证的过程,我们同时打开两个请求,然后再设定一个结果,最后两个请求都会得到这个结果。当然同时多个或者一个请求也是一样。这里有一个地方需要注意一下:

浏览器可能会对相同的url请求有缓存策略,也就是同时两个标签向同一个url发送请求,浏览器只会先发送一个请求,等一个请求结束才会再发送另外一个请求。

这样,我们考虑从两个浏览器中发送请求:

localhost:8083/get

然后随便找其中一个,发送请求来设置结果:

http://localhost:8083/set/aaa

首先我们先启动模块,可以从控制台中看到完美启动管理了:

2018-06-2500:18:44.379INFO12688---[main]com.sunny.DeferredResultApplication:StartingDeferredResultApplicationonxdeMacBook-Pro.localwithPID12688(/Users/zsunny/IdeaProjects/asynchronoustask/deferredresultdemo2/target/classesstartedbyzsunnyin/Users/zsunny/IdeaProjects/asynchronoustask)
2018-06-2500:18:44.382INFO12688---[main]com.sunny.DeferredResultApplication:Noactiveprofileset,fallingbacktodefaultprofiles:default
2018-06-2500:18:44.489INFO12688---[main]ationConfigEmbeddedWebApplicationContext:Refreshingorg.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@96def03:startupdate[MonJun2500:18:44CST2018];rootofcontexthierarchy
2018-06-2500:18:45.650INFO12688---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatinitializedwithport(s):8083(http)
2018-06-250045.658INFO12688---[main]o.apache.catalina.core.StandardService:Startingservice[Tomcat]
2018-06-250045.659INFO12688---[main]org.apache.catalina.core.StandardEngine:StartingServletEngine:ApacheTomcat/8.5.23
2018-06-250045.722INFO12688---[ost-startStop-1]o.a.c.c.C.[Tomcat].[localhost].[/]:InitializingSpringembeddedWebApplicationContext
2018-06-250045.723INFO12688---[ost-startStop-1]o.s.web.context.ContextLoader:RootWebApplicationContext:initializationcompletedin1241ms
2018-06-250045.817INFO12688---[ost-startStop-1]o.s.b.w.servlet.ServletRegistrationBean:Mappingservlet:'dispatcherServlet'to[/]
2018-06-250045.821INFO12688---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'characterEncodingFilter'to:[/*]
2018-06-250045.821INFO12688---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'hiddenHttpMethodFilter'to:[/*]
2018-06-250045.821INFO12688---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'httpPutFormContentFilter'to:[/*]
2018-06-250045.821INFO12688---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'requestContextFilter'to:[/*]
2018-06-250046.150INFO12688---[main]s.w.s.m.m.a.RequestMappingHandlerAdapter:Lookingfor@ControllerAdvice:org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@96def03:startupdate[MonJun250044CST2018];rootofcontexthierarchy
2018-06-250046.197INFO12688---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/get],methods=[GET]}"ontopublicorg.springframework.web.context.request.async.DeferredResult>com.sunny.controller.TaskController.getResult()
2018-06-250046.199INFO12688---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/set/{result}],methods=[GET]}"ontopublicjava.lang.Stringcom.sunny.controller.TaskExecuteController.setResult(java.lang.String)
2018-06-250046.202INFO12688---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error]}"ontopublicorg.springframework.http.ResponseEntity>org.springframework.boot.autoconfigure.web.BasicErrorController.error(javax.servlet.http.HttpServletRequest)
2018-06-250046.202INFO12688---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error],produces=[text/html]}"ontopublicorg.springframework.web.servlet.ModelAndVieworg.springframework.boot.autoconfigure.web.BasicErrorController.errorHtml(javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse)
2018-06-250046.237INFO12688---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/webjars/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-250046.238INFO12688---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-250046.262INFO12688---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**/favicon.ico]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-250046.362INFO12688---[main]o.s.j.e.a.AnnotationMBeanExporter:RegisteringbeansforJMXexposureonstartup
2018-06-250046.467INFO12688---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatstartedonport(s):8083(http)
2018-06-250046.472INFO12688---[main]com.sunny.DeferredResultApplication:StartedDeferredResultApplicationin2.675seconds(JVMrunningfor3.623)

完美启动,接下来Sunny在火狐中发起一个请求

e0c896fa-3eec-11ed-9e49-dac502259ad0.png

可以看到正在等待请求结果。随后我们在谷歌浏览器中发起请求

e0e28c0e-3eec-11ed-9e49-dac502259ad0.png

两个请求同时处于等待状态,这时候我们看一下控制台信息:

2018-06-2500:22:34.642INFO12688---[nio-8083-exec-6]com.sunny.controller.TaskController:接收请求,开始处理...
2018-06-2500:22:34.642INFO12688---[nio-8083-exec-6]com.sunny.controller.TaskController:加入任务集合,集合大小为:1
2018-06-2500:22:34.642INFO12688---[nio-8083-exec-6]com.sunny.controller.TaskController:接收任务线程完成并退出
2018-06-2500:22:37.332INFO12688---[nio-8083-exec-7]com.sunny.controller.TaskController:接收请求,开始处理...
2018-06-2500:22:37.332INFO12688---[nio-8083-exec-7]com.sunny.controller.TaskController:加入任务集合,集合大小为:2
2018-06-2500:22:37.332INFO12688---[nio-8083-exec-7]com.sunny.controller.TaskController:接收任务线程完成并退出

可以看到两个请求都已经接收到了,并且加入了队列。这时候,我们再发送一个设置结果的请求。

e0fdc24e-3eec-11ed-9e49-dac502259ad0.png

随后我们查看两个调用请求的页面,发现页面已经不在等待状态中了,都已经得到了结果。

e10bb70a-3eec-11ed-9e49-dac502259ad0.pnge12f56c4-3eec-11ed-9e49-dac502259ad0.png

另外,再给大家展示一下超时的结果,即我们发起调用请求,但是不发起设置结果的请求,等待时间结束。

e1469dc0-3eec-11ed-9e49-dac502259ad0.png

查看控制台信息:

2018-06-2500:26:15.898INFO12688---[nio-8083-exec-4]com.sunny.controller.TaskController:接收请求,开始处理...
2018-06-2500:26:15.898INFO12688---[nio-8083-exec-4]com.sunny.controller.TaskController:加入任务集合,集合大小为:1
2018-06-2500:26:15.898INFO12688---[nio-8083-exec-4]com.sunny.controller.TaskController:接收任务线程完成并退出
2018-06-2500:27:16.014INFO12688---[nio-8083-exec-5]com.sunny.controller.TaskController:调用超时,移除任务,此时队列长度为1
2018-06-2500:27:16.018INFO12688---[nio-8083-exec-5]com.sunny.controller.TaskController:调用完成,移除任务,此时队列长度为0

后记

想要完整代码的童鞋,点这里:

https://gitee.com/sunnymore/asynchronous_task

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 吞吐量
    +关注

    关注

    0

    文章

    47

    浏览量

    12328
  • 代码
    +关注

    关注

    30

    文章

    4779

    浏览量

    68525
  • 异步请求
    +关注

    关注

    0

    文章

    2

    浏览量

    1124

原文标题:提高系统吞吐量的一把利器:DeferredResult 到底有多强?

文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    如何提高CYBT-243053-02吞吐量

    25KB/s,这对于我们的用例来说非常低。 使用自定义固件代替 EZ-Serial 是否有助于提高吞吐量? 欢迎提出任何建议。我已经就此向英飞凌开了张罚单,但他们回来时没有提供更多信息。 因此,为了
    发表于 02-27 06:56

    提高BLE吞吐量的可行办法

    提高BLE吞吐量的可行办法如何实现更快的BLE吞吐量
    发表于 01-18 06:26

    如何利用NI LabVIEW技术提高测试系统吞吐量

    怎么可以创建出高性能的测试系统?如何利用NI LabVIEW技术提高测试系统吞吐量?如何利用NI LabVIEW技术实现并行化处理和并行化
    发表于 04-15 07:00

    如何提高VLD的吞吐量和执行效率?

    本文讨论种新型的VLD解码结构,它通过并行侦测多路码字,将Buffer中的多个可变长码次读出,这将极大地提高VLD的吞吐量和执行效率。然后采用FPGA对这种并行VLD算法的结构进行
    发表于 04-28 06:08

    如何通过触发模型提高吞吐量

    如何通过触发模型提高吞吐量
    发表于 05-11 07:00

    FF H1基于RDA的吞吐量优化算法

    为了进提高FF H1异步通信吞吐量,本文在原有优化算法[1]的基础上,提出了基于异步窗口碎片合理分布的RDA
    发表于 09-03 09:17 9次下载

    防火墙术语-吞吐量

    防火墙术语-吞吐量  术语名称:吞吐量 术语解释:网络中的数据是由个个数据包组成,防火
    发表于 02-24 11:06 1537次阅读

    如何提高无线传感器网络的吞吐量

    吞吐量是无线传感器网络(Wireless Sensor Network,WSN)的项重要性能指标,它直接反映了无线传感器网络工作运行的效率,如何提高吞吐量
    发表于 10-04 17:17 2568次阅读
    如何<b class='flag-5'>提高</b>无线传感器网络的<b class='flag-5'>吞吐量</b>

    如何提高系统设计容量和吞吐量

    和流媒体的视频容量。 然而,在更高频率范围内工作可能会带来更多的挑战,特别是 bandBoost 滤波器该如何提高系统设计容量和吞吐量呢?今天就带大家了解下,Qorvo给出的具体方案
    的头像 发表于 09-30 09:14 2162次阅读

    debug 吞吐量的办法

    Debug 网络质量的时候,我们般会关注两个因素:延迟和吞吐量(带宽)。延迟比较好验证,Ping 下或者 mtr[1] 下就能看出来。这篇文章分享
    的头像 发表于 08-23 09:17 958次阅读

    debug 吞吐量的办法

    Debug 网络质量的时候,我们般会关注两个因素:延迟和吞吐量(带宽)。延迟比较好验证,Ping 下或者 mtr[1] 下就能看出来。这篇文章分享
    的头像 发表于 09-02 09:36 864次阅读

    如何让接口吞吐量提升10多倍

    想,500/s吞吐量还不简单。Tomcat按照100个线程,那就是单线程1S内处理5个请求,200ms处理
    的头像 发表于 01-17 10:22 1814次阅读

    iperf吞吐量的测试流程

    iperf吞吐量测试指南
    发表于 04-03 15:40 2次下载

    如何显著提高ATE电源吞吐量

    作为名测试工程师,你的工作并不容易。降低成本和提高系统吞吐量的压力直存在。本文中,我们将讨论影响系统
    的头像 发表于 11-08 14:59 702次阅读
    如何显著<b class='flag-5'>提高</b>ATE电源<b class='flag-5'>吞吐量</b>?

    影响ATE电源系统吞吐量的关键因素

    从串行设备测试改变为并行设备测试可以显著地增加测试系统吞吐量。测试执行活动的大部分可能涉及使用DC电源设置条件和进行测量。配置测试系统,使其能够使用多个直流电源同时对多个设备执行测试,是显著
    发表于 11-29 12:36 410次阅读
    影响ATE电源<b class='flag-5'>系统</b><b class='flag-5'>吞吐量</b>的关键因素