0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

代码改成多线程调用之后带来的9大问题

jf_ro2CN3Fa 来源:苏三说技术 2023-04-17 10:19 次阅读

前言

很多时候,我们为了提升接口的性能,会把之前单线程同步执行的代码,改成多线程异步执行。

比如:查询用户信息接口,需要返回用户基本信息、积分信息、成长值信息,而用户、积分和成长值,需要调用不同的接口获取数据。

如果查询用户信息接口,同步调用三个接口获取数据,会非常耗时。

这就非常有必要把三个接口调用,改成异步调用,最后汇总结果。

再比如:注册用户接口,该接口主要包含:写用户表,分配权限,配置用户导航页,发通知消息等功能。

该用户注册接口包含的业务逻辑比较多,如果在接口中同步执行这些代码,该接口响应时间会非常慢。

这时就需要把业务逻辑梳理一下,划分:核心逻辑和非核心逻辑。这个例子中的核心逻辑是:写用户表和分配权限,非核心逻辑是:配置用户导航页和发通知消息。

显然核心逻辑必须在接口中同步执行,而非核心逻辑可以多线程异步执行。

等等。

需要使用多线程的业务场景太多了,使用多线程异步执行的好处不言而喻。

但我要说的是,如果多线程没有使用好,它也会给我们带来很多意想不到的问题,不信往后继续看。

今天跟大家一起聊聊,代码改成多线程调用之后,带来的9大问题。

1.获取不到返回值

如果你通过直接继承Thread类,或者实现Runnable接口的方式去创建线程。

那么,恭喜你,你将没法获取该线程方法的返回值。

使用线程的场景有两种:

不需要关注线程方法的返回值。

需要关注线程方法的返回值。

大部分业务场景是不需要关注线程方法返回值的,但如果我们有些业务需要关注线程方法的返回值该怎么处理呢?

查询用户信息接口,需要返回用户基本信息、积分信息、成长值信息,而用户、积分和成长值,需要调用不同的接口获取数据。

如下图所示:

d4001c3c-dcc0-11ed-bfe3-dac502259ad0.png

Java8之前可以通过实现Callable接口,获取线程返回结果。

Java8以后通过CompleteFuture类实现该功能。我们这里以CompleteFuture为例:

publicUserInfogetUserInfo(Longid)throwsInterruptedException,ExecutionException{
finalUserInfouserInfo=newUserInfo();
CompletableFutureuserFuture=CompletableFuture.supplyAsync(()->{
getRemoteUserAndFill(id,userInfo);
returnBoolean.TRUE;
},executor);

CompletableFuturebonusFuture=CompletableFuture.supplyAsync(()->{
getRemoteBonusAndFill(id,userInfo);
returnBoolean.TRUE;
},executor);

CompletableFuturegrowthFuture=CompletableFuture.supplyAsync(()->{
getRemoteGrowthAndFill(id,userInfo);
returnBoolean.TRUE;
},executor);
CompletableFuture.allOf(userFuture,bonusFuture,growthFuture).join();

userFuture.get();
bonusFuture.get();
growthFuture.get();

returnuserInfo;
}

温馨提醒一下,这两种方式别忘了使用线程池。示例中我用到了executor,表示自定义的线程池,为了防止高并发场景下,出现线程过多的问题。

此外,Fork/join框架也提供了执行任务并返回结果的能力。

2.数据丢失

我们还是以注册用户接口为例,该接口主要包含:写用户表,分配权限,配置用户导航页,发通知消息等功能。

其中:写用户表和分配权限功能,需要在一个事务中同步执行。而剩余的配置用户导航页和发通知消息功能,使用多线程异步执行。

表面上看起来没问题。

但如果前面的写用户表和分配权限功能成功了,用户注册接口就直接返回成功了。

但如果后面异步执行的配置用户导航页,或发通知消息功能失败了,怎么办?

如下图所示:

d4064a9e-dcc0-11ed-bfe3-dac502259ad0.png

该接口前面明明已经提示用户成功了,但结果后面又有一部分功能在多线程异步执行中失败了。

这时该如何处理呢?

没错,你可以做失败重试。

但如果重试了一定的次数,还是没有成功,这条请求数据该如何处理呢?如果不做任何处理,该数据是不是就丢掉了?

为了防止数据丢失,可以用如下方案:

使用mq异步处理。在分配权限之后,发送一条mq消息,到mq服务器,然后在mq的消费者中使用多线程,去配置用户导航页和发通知消息。如果mq消费者中处理失败了,可以自己重试。

使用job异步处理。在分配权限之后,往任务表中写一条数据。然后有个job定时扫描该表,然后配置用户导航页和发通知消息。如果job处理某条数据失败了,可以在表中记录一个重试次数,然后不断重试。但该方案有个缺点,就是实时性可能不太高。

3.顺序问题

如果你使用了多线程,就必须接受一个非常现实的问题,即顺序问题。

假如之前代码的执行顺序是:a,b,c,改成多线程执行之后,代码的执行顺序可能变成了:a,c,b。(这个跟cpu调度算法有关)

例如:

publicstaticvoidmain(String[]args){
Threadthread1=newThread(()->System.out.println("a"));
Threadthread2=newThread(()->System.out.println("b"));
Threadthread3=newThread(()->System.out.println("c"));

thread1.start();
thread2.start();
thread3.start();
}

执行结果:

a
c
b

那么,来自灵魂的一问:如何保证线程的顺序呢?

即线程启动的顺序是:a,b,c,执行的顺序也是:a,b,c。

如下图所示:

d413f73e-dcc0-11ed-bfe3-dac502259ad0.png

3.1 join

Thread类的join方法它会让主线程等待子线程运行结束后,才能继续运行。

列如:

publicstaticvoidmain(String[]args)throwsInterruptedException{
Threadthread1=newThread(()->System.out.println("a"));
Threadthread2=newThread(()->System.out.println("b"));
Threadthread3=newThread(()->System.out.println("c"));

thread1.start();
thread1.join();
thread2.start();
thread2.join();
thread3.start();
}

执行结果永远都是:

a
b
c

3.2 newSingleThreadExecutor

我们可以使用JDK自带的Excutors类的newSingleThreadExecutor方法,创建一个单线程的线程池。

例如:

publicstaticvoidmain(String[]args){
ExecutorServiceexecutorService=Executors.newSingleThreadExecutor();

Threadthread1=newThread(()->System.out.println("a"));
Threadthread2=newThread(()->System.out.println("b"));
Threadthread3=newThread(()->System.out.println("c"));

executorService.submit(thread1);
executorService.submit(thread2);
executorService.submit(thread3);

executorService.shutdown();
}

执行结果永远都是:

a
b
c

使用Excutors类的newSingleThreadExecutor方法创建的单线程的线程池,使用了LinkedBlockingQueue作为队列,而此队列按 FIFO(先进先出)排序元素。

添加到队列的顺序是a,b,c,则执行的顺序也是a,b,c。

3.3 CountDownLatch

CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行。

例如:

publicclassThreadTest{

publicstaticvoidmain(String[]args)throwsInterruptedException{
CountDownLatchlatch1=newCountDownLatch(0);
CountDownLatchlatch2=newCountDownLatch(1);
CountDownLatchlatch3=newCountDownLatch(1);

Threadthread1=newThread(newTestRunnable(latch1,latch2,"a"));
Threadthread2=newThread(newTestRunnable(latch2,latch3,"b"));
Threadthread3=newThread(newTestRunnable(latch3,latch3,"c"));

thread1.start();
thread2.start();
thread3.start();
}
}

classTestRunnableimplementsRunnable{

privateCountDownLatchlatch1;
privateCountDownLatchlatch2;
privateStringmessage;

TestRunnable(CountDownLatchlatch1,CountDownLatchlatch2,Stringmessage){
this.latch1=latch1;
this.latch2=latch2;
this.message=message;
}

@Override
publicvoidrun(){
try{
latch1.await();
System.out.println(message);
}catch(InterruptedExceptione){
e.printStackTrace();
}
latch2.countDown();
}
}

执行结果永远都是:

a
b
c

此外,使用CompletableFuture的thenRun方法,也能多线程的执行顺序,在这里就不一一介绍了。

4.线程安全问题

既然使用了线程,伴随而来的还会有线程安全问题。

假如现在有这样一个需求:用多线程执行查询方法,然后把执行结果添加到一个list集合中。

代码如下:

Listlist=Lists.newArrayList();
dataList.stream()
.map(data->CompletableFuture
.supplyAsync(()->query(list,data),asyncExecutor)
));
CompletableFuture.allOf(futureArray).join();

使用CompletableFuture异步多线程执行query方法:

publicvoidquery(Listlist,UserEntitycondition){
Useruser=queryByCondition(condition);
if(Objects.isNull(user)){
return;
}
list.add(user);
UserExtenduserExtend=queryByOther(condition);
if(Objects.nonNull(userExtend)){
user.setExtend(userExtend.getInfo());
}
}

在query方法中,将获取的查询结果添加到list集合中。

结果list会出现线程安全问题,有时候会少数据,当然也不一定是必现的。

这是因为ArrayList是非线程安全的,没有使用synchronized等关键字修饰。

如何解决这个问题呢?

答:使用CopyOnWriteArrayList集合,代替普通的ArrayList集合,CopyOnWriteArrayList是一个线程安全的机会。

只需一行小小的改动即可:

ListlistLists.newCopyOnWriteArrayList();

温馨的提醒一下,这里创建集合的方式,用了google的collect包。

5.ThreadLocal获取数据异常

我们都知道JDK为了解决线程安全问题,提供了一种用空间换时间的新思路:ThreadLocal。

它的核心思想是:共享变量在每个线程都有一个副本,每个线程操作的都是自己的副本,对另外的线程没有影响。

例如:

@Service
publicclassThreadLocalService{
privatestaticfinalThreadLocalthreadLocal=newThreadLocal<>();

publicvoidadd(){
threadLocal.set(1);
doSamething();
Integerinteger=threadLocal.get();
}
}

ThreadLocal在普通中线程中,的确能够获取正确的数据。

但在真实的业务场景中,一般很少用单独的线程,绝大多数,都是用的线程池。

那么,在线程池中如何获取ThreadLocal对象生成的数据呢?

如果直接使用普通ThreadLocal,显然是获取不到正确数据的。

我们先试试InheritableThreadLocal,具体代码如下:

privatestaticvoidfun1(){
InheritableThreadLocalthreadLocal=newInheritableThreadLocal<>();
threadLocal.set(6);
System.out.println("父线程获取数据:"+threadLocal.get());

ExecutorServiceexecutorService=Executors.newSingleThreadExecutor();

threadLocal.set(6);
executorService.submit(()->{
System.out.println("第一次从线程池中获取数据:"+threadLocal.get());
});

threadLocal.set(7);
executorService.submit(()->{
System.out.println("第二次从线程池中获取数据:"+threadLocal.get());
});
}

执行结果:

父线程获取数据:6
第一次从线程池中获取数据:6
第二次从线程池中获取数据:6

由于这个例子中使用了单例线程池,固定线程数是1。

第一次submit任务的时候,该线程池会自动创建一个线程。因为使用了InheritableThreadLocal,所以创建线程时,会调用它的init方法,将父线程中的inheritableThreadLocals数据复制到子线程中。所以我们看到,在主线程中将数据设置成6,第一次从线程池中获取了正确的数据6。

之后,在主线程中又将数据改成7,但在第二次从线程池中获取数据却依然是6。

因为第二次submit任务的时候,线程池中已经有一个线程了,就直接拿过来复用,不会再重新创建线程了。所以不会再调用线程的init方法,所以第二次其实没有获取到最新的数据7,还是获取的老数据6。

那么,这该怎么办呢?

答:使用TransmittableThreadLocal,它并非JDK自带的类,而是阿里巴巴开源jar包中的类。

可以通过如下pom文件引入该jar包:


com.alibaba
transmittable-thread-local
2.11.0
compile

代码调整如下:

privatestaticvoidfun2()throwsException{
TransmittableThreadLocalthreadLocal=newTransmittableThreadLocal<>();
threadLocal.set(6);
System.out.println("父线程获取数据:"+threadLocal.get());

ExecutorServicettlExecutorService=TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(1));

threadLocal.set(6);
ttlExecutorService.submit(()->{
System.out.println("第一次从线程池中获取数据:"+threadLocal.get());
});

threadLocal.set(7);
ttlExecutorService.submit(()->{
System.out.println("第二次从线程池中获取数据:"+threadLocal.get());
});

}

执行结果:

父线程获取数据:6
第一次从线程池中获取数据:6
第二次从线程池中获取数据:7

我们看到,使用了TransmittableThreadLocal之后,第二次从线程中也能正确获取最新的数据7了。

nice。

如果你仔细观察这个例子,你可能会发现,代码中除了使用TransmittableThreadLocal类之外,还使用了TtlExecutors.getTtlExecutorService方法,去创建ExecutorService对象。

这是非常重要的地方,如果没有这一步,TransmittableThreadLocal在线程池中共享数据将不会起作用。

创建ExecutorService对象,底层的submit方法会TtlRunnable或TtlCallable对象。

以TtlRunnable类为例,它实现了Runnable接口,同时还实现了它的run方法:

publicvoidrun(){
Map,Object>copied=(Map)this.copiedRef.get();
if(copied!=null&&(!this.releaseTtlValueReferenceAfterRun||this.copiedRef.compareAndSet(copied,(Object)null))){
Mapbackup=TransmittableThreadLocal.backupAndSetToCopied(copied);

try{
this.runnable.run();
}finally{
TransmittableThreadLocal.restoreBackup(backup);
}
}else{
thrownewIllegalStateException("TTLvaluereferenceisreleasedafterrun!");
}
}

这段代码的主要逻辑如下:

把当时的ThreadLocal做个备份,然后将父类的ThreadLocal拷贝过来。

执行真正的run方法,可以获取到父类最新的ThreadLocal数据。

从备份的数据中,恢复当时的ThreadLocal数据。

6.OOM问题

众所周知,使用多线程可以提升代码执行效率,但也不是绝对的。

对于一些耗时的操作,使用多线程,确实可以提升代码执行效率。

但线程不是创建越多越好,如果线程创建多了,也可能会导致OOM异常。

例如:

Causedby:
java.lang.OutOfMemoryError:unabletocreatenewnativethread

在JVM中创建一个线程,默认需要占用1M的内存空间。

如果创建了过多的线程,必然会导致内存空间不足,从而出现OOM异常。

除此之外,如果使用线程池的话,特别是使用固定大小线程池,即使用Executors.newFixedThreadPool方法创建的线程池。

该线程池的核心线程数和最大线程数是一样的,是一个固定值,而存放消息的队列是LinkedBlockingQueue。

该队列的最大容量是Integer.MAX_VALUE,也就是说如果使用固定大小线程池,存放了太多的任务,有可能也会导致OOM异常。

java.lang.OutOfMemeryError:Javaheapspace

7.CPU使用率飙高

不知道你有没有做过excel数据导入功能,需要将一批excel的数据导入到系统中。

每条数据都有些业务逻辑,如果单线程导入所有的数据,导入效率会非常低。

于是改成了多线程导入。

如果excel中有大量的数据,很可能会出现CPU使用率飙高的问题。

我们都知道,如果代码出现死循环,cpu使用率会飚的很多高。因为代码一直在某个线程中循环,没法切换到其他线程,cpu一直被占用着,所以会导致cpu使用率一直高居不下。

而多线程导入大量的数据,虽说没有死循环代码,但由于多个线程一直在不停的处理数据,导致占用了cpu很长的时间。

也会出现cpu使用率很高的问题。

那么,如何解决这个问题呢?

答:使用Thread.sleep休眠一下。

在线程中处理完一条数据,休眠10毫秒。

当然CPU使用率飙高的原因很多,多线程处理数据和死循环只是其中两种,还有比如:频繁GC、正则匹配、频繁序列化和反序列化等。

后面我会写一篇介绍CPU使用率飙高的原因的专题文章,感兴趣的小伙伴,可以关注一下我后续的文章。

8.事务问题

在实际项目开发中,多线程的使用场景还是挺多的。如果spring事务用在多线程场景中,会有问题吗?

例如:

@Slf4j
@Service
publicclassUserService{

@Autowired
privateUserMapperuserMapper;
@Autowired
privateRoleServiceroleService;

@Transactional
publicvoidadd(UserModeluserModel)throwsException{
userMapper.insertUser(userModel);
newThread(()->{
roleService.doOtherThing();
}).start();
}
}

@Service
publicclassRoleService{

@Transactional
publicvoiddoOtherThing(){
System.out.println("保存role表数据");
}
}

从上面的例子中,我们可以看到事务方法add中,调用了事务方法doOtherThing,但是事务方法doOtherThing是在另外一个线程中调用的。

这样会导致两个方法不在同一个线程中,获取到的数据库连接不一样,从而是两个不同的事务。如果想doOtherThing方法中抛了异常,add方法也回滚是不可能的。

如果看过spring事务源码的朋友,可能会知道spring的事务是通过数据库连接来实现的。当前线程中保存了一个map,key是数据源,value是数据库连接。

privatestaticfinalThreadLocal>resources=

newNamedThreadLocal<>("Transactionalresources");

我们说的同一个事务,其实是指同一个数据库连接,只有拥有同一个数据库连接才能同时提交和回滚。如果在不同的线程,拿到的数据库连接肯定是不一样的,所以是不同的事务。

所以不要在事务中开启另外的线程,去处理业务逻辑,这样会导致事务失效。

9.导致服务挂掉

使用多线程会导致服务挂掉,这不是危言耸听,而是确有其事。

假设现在有这样一种业务场景:在mq的消费者中需要调用订单查询接口,查到数据之后,写入业务表中。

本来是没啥问题的。

突然有一天,mq生产者跑了一个批量数据处理的job,导致mq服务器上堆积了大量的消息。

此时,mq消费者的处理速度,远远跟不上mq消息的生产速度,导致的结果是出现了大量的消息堆积,对用户有很大的影响。

为了解决这个问题,mq消费者改成多线程处理,直接使用了线程池,并且最大线程数配置成了20。

这样调整之后,消息堆积问题确实得到了解决。

但带来了另外一个更严重的问题:订单查询接口并发量太大了,有点扛不住压力,导致部分节点的服务直接挂掉。

d41cac9e-dcc0-11ed-bfe3-dac502259ad0.png

为了解决问题,不得不临时加服务节点。

在mq的消费者中使用多线程,调用接口时,一定要评估好接口能够承受的最大访问量,防止因为压力过大,而导致服务挂掉的问题。





审核编辑:刘清

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • cpu
    cpu
    +关注

    关注

    68

    文章

    10882

    浏览量

    212224
  • 服务器
    +关注

    关注

    12

    文章

    9237

    浏览量

    85666
  • JAVA
    +关注

    关注

    19

    文章

    2972

    浏览量

    104866

原文标题:麻了,代码改成多线程,竟有9大问题

文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    socket 多线程编程实现方法

    是指在同一个进程中运行多个线程,每个线程可以独立执行任务。线程共享进程的资源,如内存空间和文件句柄,但每个线程有自己的程序计数器、寄存器集合和堆栈。
    的头像 发表于 11-12 14:16 398次阅读

    Python中多线程和多进程的区别

    Python作为一种高级编程语言,提供了多种并发编程的方式,其中多线程与多进程是最常见的两种方式之一。在本文中,我们将探讨Python中多线程与多进程的概念、区别以及如何使用线程池与进程池来提高并发执行效率。
    的头像 发表于 10-23 11:48 424次阅读
    Python中<b class='flag-5'>多线程</b>和多进程的区别

    ESP32会不会有多线程问题,需要加锁吗?

    ESP32会不会有多线程问题,需要加锁吗
    发表于 07-19 08:05

    多线程设计模式到对 CompletableFuture 的应用

    最近在开发 延保服务 频道页时,为了提高查询效率,使用到了多线程技术。为了对多线程方案设计有更加充分的了解,在业余时间读完了《图解 Java 多线程设计模式》这本书,觉得收获良多。本篇文章将介绍其中
    的头像 发表于 06-26 14:18 373次阅读
    从<b class='flag-5'>多线程</b>设计模式到对 CompletableFuture 的应用

    LWIP中udp_send(upcb, p); 之后马上调用pbuf_free(p);可以吗 ?

    如题:LWIP中udp_send(upcb, p); 之后马上调用 pbuf_free(p); 行么 ? udp_send(upcb, p);调用之后, 数据不一定就发出了,至少在申请 arp
    发表于 04-26 07:07

    bootloader开多线程做引导程序,跳app初始化后直接进hardfualt,为什么?

    如标题,想做一个远程升级的项目,bootloader引导区域和app都是开多线程跑的,就是自己写了个小的任务调度器,没什么功能主要是想让程序快速的响应,延时不会对其他程序造成堵塞,程序测试
    发表于 04-18 06:07

    鸿蒙OS开发实例:【ArkTS类库多线程CPU密集型任务TaskPool】

    CPU密集型任务是指需要占用系统资源处理大量计算能力的任务,需要长时间运行,这段时间会阻塞线程其它事件的处理,不适宜放在主线程进行。例如图像处理、视频编码、数据分析等。 基于多线程并发机制处理CPU密集型任务可以提高CPU
    的头像 发表于 04-01 22:25 859次阅读
    鸿蒙OS开发实例:【ArkTS类库<b class='flag-5'>多线程</b>CPU密集型任务TaskPool】

    鸿蒙APP开发:【ArkTS类库多线程】TaskPool和Worker的对比(2)

    创建Worker的线程称为宿主线程(不一定是主线程,工作线程也支持创建Worker子线程),Worker自身的
    的头像 发表于 03-27 15:44 547次阅读
    鸿蒙APP开发:【ArkTS类库<b class='flag-5'>多线程</b>】TaskPool和Worker的对比(2)

    鸿蒙APP开发:【ArkTS类库多线程】TaskPool和Worker的对比

    TaskPool(任务池)和Worker的作用是为应用程序提供一个多线程的运行环境,用于处理耗时的计算任务或其他密集型任务。可以有效地避免这些任务阻塞主线程,从而最大化系统的利用率,降低整体资源消耗,并提高系统的整体性能。
    的头像 发表于 03-26 22:09 670次阅读
    鸿蒙APP开发:【ArkTS类库<b class='flag-5'>多线程</b>】TaskPool和Worker的对比

    鸿蒙原生应用开发-ArkTS语言基础类库多线程TaskPool和Worker的对比(一)

    TaskPool(任务池)和Worker的作用是为应用程序提供一个多线程的运行环境,用于处理耗时的计算任务或其他密集型任务。可以有效地避免这些任务阻塞主线程,从而最大化系统的利用率,降低整体资源消耗
    发表于 03-25 14:11

    鸿蒙原生应用开发-ArkTS语言基础类库多线程I/O密集型任务开发

    使用异步并发可以解决单次I/O任务阻塞的问题,但是如果遇到I/O密集型任务,同样会阻塞线程中其它任务的执行,这时需要使用多线程并发能力来进行解决。 I/O密集型任务的性能重点通常不在于CPU的处理
    发表于 03-21 14:57

    java实现多线程的几种方式

    Java实现多线程的几种方式 多线程是指程序中包含了两个或以上的线程,每个线程都可以并行执行不同的任务或操作。Java中的多线程可以提高程序
    的头像 发表于 03-14 16:55 754次阅读

    python中5种线程锁盘点

    线程安全是多线程或多进程编程中的一个概念,在拥有共享数据的多条线程并行执行的程序中,线程安全的代码会通过同步机制保证各个
    发表于 03-07 11:08 1623次阅读
    python中5种<b class='flag-5'>线程</b>锁盘点

    AT socket可以多线程调用吗?

    请问AT socket 可以多线程调用吗? 有互锁机制吗,还是要自己做互锁。
    发表于 03-01 08:22

    linux多线程编程实例

    linux线程
    的头像 发表于 02-15 21:16 491次阅读
    linux<b class='flag-5'>多线程</b>编程实例