CompletableFuture是jdk8的新特性。CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步会点、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
一、创建异步任务
1. supplyAsync
supplyAsync是创建带有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是带有自定义线程池的重载方法
//带返回值异步请求,默认线程池
publicstaticCompletableFuturesupplyAsync(Suppliersupplier)
//带返回值的异步请求,可以自定义线程池
publicstaticCompletableFuturesupplyAsync(Suppliersupplier,Executorexecutor)
测试代码:
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf=CompletableFuture.supplyAsync(()->{
System.out.println("dosomething....");
return"result";
});
//等待任务执行完成
System.out.println("结果->"+cf.get());
}
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
//自定义线程池
ExecutorServiceexecutorService=Executors.newSingleThreadExecutor();
CompletableFuturecf=CompletableFuture.supplyAsync(()->{
System.out.println("dosomething....");
return"result";
},executorService);
//等待子任务执行完成
System.out.println("结果->"+cf.get());
}
测试结果:
2. runAsync
runAsync是创建没有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是带有自定义线程池的重载方法
//不带返回值的异步请求,默认线程池
publicstaticCompletableFuturerunAsync(Runnablerunnable)
//不带返回值的异步请求,可以自定义线程池
publicstaticCompletableFuturerunAsync(Runnablerunnable,Executorexecutor)
测试代码:
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf=CompletableFuture.runAsync(()->{
System.out.println("dosomething....");
});
//等待任务执行完成
System.out.println("结果->"+cf.get());
}
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
//自定义线程池
ExecutorServiceexecutorService=Executors.newSingleThreadExecutor();
CompletableFuturecf=CompletableFuture.runAsync(()->{
System.out.println("dosomething....");
},executorService);
//等待任务执行完成
System.out.println("结果->"+cf.get());
}
测试结果:
3.获取任务结果的方法
//如果完成则返回结果,否则就抛出具体的异常
publicTget()throwsInterruptedException,ExecutionException
//最大时间等待返回结果,否则就抛出具体异常
publicTget(longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException
//完成时返回结果值,否则抛出unchecked异常。为了更好地符合通用函数形式的使用,如果完成此CompletableFuture所涉及的计算引发异常,则此方法将引发unchecked异常并将底层异常作为其原因
publicTjoin()
//如果完成则返回结果值(或抛出任何遇到的异常),否则返回给定的valueIfAbsent。
publicTgetNow(TvalueIfAbsent)
//如果任务没有完成,返回的值设置为给定值
publicbooleancomplete(Tvalue)
//如果任务没有完成,就抛出给定异常
publicbooleancompleteExceptionally(Throwableex)
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
- 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
- 视频教程:https://doc.iocoder.cn/video/
二、异步回调处理
1. thenApply和thenApplyAsync
thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,带有返回值。
测试代码:
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf1dosomething....");
return1;
});
CompletableFuturecf2=cf1.thenApplyAsync((result)->{
System.out.println(Thread.currentThread()+"cf2dosomething....");
result+=2;
returnresult;
});
//等待任务1执行完成
System.out.println("cf1结果->"+cf1.get());
//等待任务2执行完成
System.out.println("cf2结果->"+cf2.get());
}
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf1dosomething....");
return1;
});
CompletableFuturecf2=cf1.thenApply((result)->{
System.out.println(Thread.currentThread()+"cf2dosomething....");
result+=2;
returnresult;
});
//等待任务1执行完成
System.out.println("cf1结果->"+cf1.get());
//等待任务2执行完成
System.out.println("cf2结果->"+cf2.get());
}
测试结果:
从上面代码和测试结果我们发现thenApply和thenApplyAsync区别在于,使用thenApply方法时子任务与父任务使用的是同一个线程,而thenApplyAsync在子任务中是另起一个线程执行任务,并且thenApplyAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。
2. thenAccept和thenAcceptAsync
thenAccep表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,无返回值。
测试代码
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf1dosomething....");
return1;
});
CompletableFuturecf2=cf1.thenAccept((result)->{
System.out.println(Thread.currentThread()+"cf2dosomething....");
});
//等待任务1执行完成
System.out.println("cf1结果->"+cf1.get());
//等待任务2执行完成
System.out.println("cf2结果->"+cf2.get());
}
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf1dosomething....");
return1;
});
CompletableFuturecf2=cf1.thenAcceptAsync((result)->{
System.out.println(Thread.currentThread()+"cf2dosomething....");
});
//等待任务1执行完成
System.out.println("cf1结果->"+cf1.get());
//等待任务2执行完成
System.out.println("cf2结果->"+cf2.get());
}
测试结果:
从上面代码和测试结果我们发现thenAccep和thenAccepAsync区别在于,使用thenAccep方法时子任务与父任务使用的是同一个线程,而thenAccepAsync在子任务中可能是另起一个线程执行任务,并且thenAccepAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。
3.thenRun和thenRunAsync
thenRun表示某个任务执行完成后执行的动作,即回调方法,无入参,无返回值。
测试代码:
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf1dosomething....");
return1;
});
CompletableFuturecf2=cf1.thenRun(()->{
System.out.println(Thread.currentThread()+"cf2dosomething....");
});
//等待任务1执行完成
System.out.println("cf1结果->"+cf1.get());
//等待任务2执行完成
System.out.println("cf2结果->"+cf2.get());
}
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf1dosomething....");
return1;
});
CompletableFuturecf2=cf1.thenRunAsync(()->{
System.out.println(Thread.currentThread()+"cf2dosomething....");
});
//等待任务1执行完成
System.out.println("cf1结果->"+cf1.get());
//等待任务2执行完成
System.out.println("cf2结果->"+cf2.get());
}
测试结果:
从上面代码和测试结果我们发现thenRun和thenRunAsync区别在于,使用thenRun方法时子任务与父任务使用的是同一个线程,而thenRunAsync在子任务中可能是另起一个线程执行任务,并且thenRunAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。
4.whenComplete和whenCompleteAsync
whenComplete是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。
测试代码:
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf1dosomething....");
inta=1/0;
return1;
});
CompletableFuturecf2=cf1.whenComplete((result,e)->{
System.out.println("上个任务结果:"+result);
System.out.println("上个任务抛出异常:"+e);
System.out.println(Thread.currentThread()+"cf2dosomething....");
});
////等待任务1执行完成
//System.out.println("cf1结果->"+cf1.get());
////等待任务2执行完成
System.out.println("cf2结果->"+cf2.get());
}
测试结果:
whenCompleteAsync和whenComplete区别也是whenCompleteAsync可能会另起一个线程执行任务,并且thenRunAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。
5.handle和handleAsync
跟whenComplete基本一致,区别在于handle的回调方法有返回值。
测试代码:
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf1dosomething....");
//inta=1/0;
return1;
});
CompletableFuturecf2=cf1.handle((result,e)->{
System.out.println(Thread.currentThread()+"cf2dosomething....");
System.out.println("上个任务结果:"+result);
System.out.println("上个任务抛出异常:"+e);
returnresult+2;
});
//等待任务2执行完成
System.out.println("cf2结果->"+cf2.get());
}
测试结果 :
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
三、多任务组合处理
1. thenCombine、thenAcceptBoth 和runAfterBoth
这三个方法都是将两个CompletableFuture组合起来处理,只有两个任务都正常完成时,才进行下阶段任务。
区别:thenCombine会将两个任务的执行结果作为所提供函数的参数,且该方法有返回值;thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值;runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。
测试代码:
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf1dosomething....");
return1;
});
CompletableFuturecf2=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf2dosomething....");
return2;
});
CompletableFuturecf3=cf1.thenCombine(cf2,(a,b)->{
System.out.println(Thread.currentThread()+"cf3dosomething....");
returna+b;
});
System.out.println("cf3结果->"+cf3.get());
}
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf1dosomething....");
return1;
});
CompletableFuturecf2=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf2dosomething....");
return2;
});
CompletableFuturecf3=cf1.thenAcceptBoth(cf2,(a,b)->{
System.out.println(Thread.currentThread()+"cf3dosomething....");
System.out.println(a+b);
});
System.out.println("cf3结果->"+cf3.get());
}
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf1dosomething....");
return1;
});
CompletableFuturecf2=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf2dosomething....");
return2;
});
CompletableFuturecf3=cf1.runAfterBoth(cf2,()->{
System.out.println(Thread.currentThread()+"cf3dosomething....");
});
System.out.println("cf3结果->"+cf3.get());
}
测试结果:
2.applyToEither、acceptEither和runAfterEither
这三个方法和上面一样也是将两个CompletableFuture组合起来处理,当有一个任务正常完成时,就会进行下阶段任务。
区别:applyToEither会将已经完成任务的执行结果作为所提供函数的参数,且该方法有返回值;acceptEither同样将已经完成任务的执行结果作为方法入参,但是无返回值;runAfterEither没有入参,也没有返回值。
测试代码:
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf1dosomething....");
Thread.sleep(2000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
return"cf1任务完成";
});
CompletableFuturecf2=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf2dosomething....");
Thread.sleep(5000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
return"cf2任务完成";
});
CompletableFuturecf3=cf1.applyToEither(cf2,(result)->{
System.out.println("接收到"+result);
System.out.println(Thread.currentThread()+"cf3dosomething....");
return"cf3任务完成";
});
System.out.println("cf3结果->"+cf3.get());
}
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf1dosomething....");
Thread.sleep(2000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
return"cf1任务完成";
});
CompletableFuturecf2=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf2dosomething....");
Thread.sleep(5000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
return"cf2任务完成";
});
CompletableFuturecf3=cf1.acceptEither(cf2,(result)->{
System.out.println("接收到"+result);
System.out.println(Thread.currentThread()+"cf3dosomething....");
});
System.out.println("cf3结果->"+cf3.get());
}
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf1dosomething....");
Thread.sleep(2000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
System.out.println("cf1任务完成");
return"cf1任务完成";
});
CompletableFuturecf2=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf2dosomething....");
Thread.sleep(5000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
System.out.println("cf2任务完成");
return"cf2任务完成";
});
CompletableFuturecf3=cf1.runAfterEither(cf2,()->{
System.out.println(Thread.currentThread()+"cf3dosomething....");
System.out.println("cf3任务完成");
});
System.out.println("cf3结果->"+cf3.get());
}
测试结果:
从上面可以看出cf1任务完成需要2秒,cf2任务完成需要5秒,使用applyToEither组合两个任务时,只要有其中一个任务完成时,就会执行cf3任务,显然cf1任务先完成了并且将自己任务的结果传值给了cf3任务,cf3任务中打印了接收到cf1任务完成,接着完成自己的任务,并返回cf3任务完成;acceptEither和runAfterEither类似,acceptEither会将cf1任务的结果作为cf3任务的入参,但cf3任务完成时并无返回值;runAfterEither不会将cf1任务的结果作为cf3任务的入参,它是没有任务入参,执行完自己的任务后也并无返回值。
2. allOf / anyOf
allOf:CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。
anyOf :CompletableFuture是多个任务只要有一个任务执行完成,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回执行完成任务的结果。
测试代码:
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf1dosomething....");
Thread.sleep(2000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
System.out.println("cf1任务完成");
return"cf1任务完成";
});
CompletableFuturecf2=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf2dosomething....");
inta=1/0;
Thread.sleep(5000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
System.out.println("cf2任务完成");
return"cf2任务完成";
});
CompletableFuturecf3=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf2dosomething....");
Thread.sleep(3000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
System.out.println("cf3任务完成");
return"cf3任务完成";
});
CompletableFuturecfAll=CompletableFuture.allOf(cf1,cf2,cf3);
System.out.println("cfAll结果->"+cfAll.get());
}
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf1dosomething....");
Thread.sleep(2000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
System.out.println("cf1任务完成");
return"cf1任务完成";
});
CompletableFuturecf2=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf2dosomething....");
Thread.sleep(5000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
System.out.println("cf2任务完成");
return"cf2任务完成";
});
CompletableFuturecf3=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf2dosomething....");
Thread.sleep(3000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
System.out.println("cf3任务完成");
return"cf3任务完成";
});
CompletableFuture
测试结果:
审核编辑 :李倩
-
接口
+关注
关注
33文章
8568浏览量
150996 -
线程
+关注
关注
0文章
504浏览量
19674
原文标题:一网打尽:异步神器 CompletableFuture 万字详解!
文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。
发布评论请先 登录
相关推荐
评论