0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

CyclicBarrier 任务实践

科技绿洲 来源:Java技术指北 作者:Java技术指北 2023-10-13 11:10 次阅读

下面用一个小 demo,对 CyclicBarrier 有一个初步的印象。

public class Test implements Runnable{
    
    //定义一个循环栅栏
    private CyclicBarrier cyclicBarrier = new CyclicBarrier(3,this);
    //存放每个线程的数据
    private BlockingQueue< Integer > list = new ArrayBlockingQueue<  >(3);

    public void count() {
        //运行 3 个线程
        for(int i = 0;i< 3;i++){
            new Thread(new Runnable() {

                public void run() {
                    //这里可以存放更复杂的操作,比如查询 SQL、写入 Excel 等等
                    Random r = new Random();
                    int a = r.nextInt(100);
                    System.out.println("线程 " + Thread.currentThread().getName() + "获得数字:" + a);
                    list.add(a);
                    try {
                        //线程到达栅栏后,等待
                        cyclicBarrier.await();
                    }catch (Exception e){
                        e.printStackTrace();
                    }

                }
            }).start();
        }
    }
    //这里就是线程一起跨过栅栏后执行的任务
    public void run() {
        int result = 0;
        for (Integer i: list) {
            result += i;
        }
        System.out.println(result);
    }

    public static void main(String[] args){
        Test test = new Test();
        test.count();
    }
}

这个 demo 中一共 3 个线程,每个线程都随机获取一个数字(在实际生产代码中会有更复杂的操作),最后将每个线程获取的数字相加后打印最后的结果。

源码分析

内部类

CyclicBarrier 的一个内部类,Generation 被翻译成为“代”。当这一代的所有线程都到达栅栏后可以开启下一代,所以才被成为循环栅栏。broken 属性表示栅栏是否被打破了。

private static class Generation {
    boolean broken = false;
}

属性与构造函数

//可重入的 ReentrantLock 锁,非公平锁
private final ReentrantLock lock = new ReentrantLock();
//lock 的条件队列
private final Condition trip = lock.newCondition();
//线程的数量
private final int parties;
//所有线程到达后,可执行的方法
private final Runnable barrierCommand;
//当前代
private Generation generation = new Generation();
//当前代还需要等待线程的数量
private int count;

public CyclicBarrier (int parties) {
    //调用 2 个参数的构造函数
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    //检查线程的数量是否小于 0
    if (parties <= 0) throw new IllegalArgumentException();
    //设置线程数、需要等待的线程数量
    this.parties = parties;
    this.count = parties;
    //设置所有线程都到达的时候需要运行的方法
    this.barrierCommand = barrierAction;
}

从上面的内容和 demo 粗略的可以看出,CyclicBarrier 在初始化时设置了线程数量 parties,必须等待所有的线程都到栅栏处 cyclicBarrier.await() 时才可以运行 barrierCommand 方法。

如果还有线程没有到达栅栏处,会将先到达栅栏处的线程放入 trip 条件队列中等待最后一个线程到达。

图片

await()

//CyclicBarrier.await()
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        //调用了 dowait()
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe);
    }
}

外部调用 await() 方法,等待线程到达栅栏后一起执行后续的操作。await() 可以被复用,每多调用一次 await() 就表示多增加一代,第一次调用是一代、第二次调用是二代、第三次调用是三代...。

dowait()

dowait() 是 CyclicBarrier 的核心方法。

//CyclicBarrier.dowait()
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
            TimeoutException {
    //加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        
        final Generation g = generation;
        //判断当前代的栅栏是否被打断了
        if (g.broken)
            throw new BrokenBarrierException();
        //线程是否被中断
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        //到达一个线程,数量就减去 1,直到最后一个线程
        int index = --count;
        if (index == 0) {
            //是否执行了 barrierCommand 的标识位
            boolean ranAction = false;
            try {
                //所有线程到达后,一起执行的方法
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                //进入下一代
                nextGeneration();
                return 0;
            } finally {
                //barrierCommand 没被执行,打破栅栏
                if (!ranAction)
                    breakBarrier();
            }
        }

        //无限循环,这里只有最后一个线程不进入循环
        for (;;) {
            try {
                //没有设置需要等待的时间
                if (!timed)
                    //到达栅栏的线程进入 trip 条件队列等待被唤醒
                    trip.await();
                //等待的时间还没有超时
                else if (nanos > 0L)
                    //等待指定的时间
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                //出了异常且当前代没有打破栅栏,那么打破栅栏并且抛出异常
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    //中断当前线程
                    Thread.currentThread().interrupt();
                }
            }
            
            //检查
            if (g.broken)
                throw new BrokenBarrierException();
            //不是当前的代
            if (g != generation)
                return index;
            //等待超时了
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        //解锁,出队
        lock.unlock();
    }
}

dowait() 的运行被分成了 2 部分:

  1. 最后一个线程的时候,进入运行 barrierCommand 方法的流程,并且进入下一代。
  2. 前面的其他线程都进入循环中,将线程添加到 trip 的条件队列中,等待最后一个线程将它们唤醒。

图片

nextGeneration()

//CyclicBarrier.nextGeneration()
 private void nextGeneration() {
    trip.signalAll();
    count = parties;
    generation = new Generation();
}

CyclicBarrier 为什么会一代结束后可以开始下一代,就靠这个 nextGeneration() 方法,它干了三件事:

  1. trip.signalAll() 方法将 trip() 条件队列中的线程全部转移到 AQS 队列中去。AQS 队列中出队是在 lock.unlock() 的时候。
  2. 将线程的数量重置。
  3. 初始化一个新的代。

总结

CyclicBarrier 使用了两个队列,一个条件队列,一个 AQS 队列,在 trip.await() 出进入条件队列。当最后一个线程到达栅栏出的时候,条件队列中的线程全部移动到 AQS 队列中,要注意的是最后一个线程并没有进入 AQS 队列中。在 lock.unlock() 的时候 AQS 队列中的线程出队。

CyclicBarrier 基于 ReentrantLock 和 Condition 实现同步线程的逻辑。

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
收藏 人收藏

    评论

    相关推荐

    请教:有SPC560B的多任务实时系统的例程吗(freertos)

    请教:有SPC560B的多任务实时系统的例程吗(freertos)
    发表于 01-15 17:04

    请问微型抢占式多任务实时内核怎样去设计?

    微型抢占式多任务实时内核怎样去设计?
    发表于 04-27 06:04

    如何通过在GPS多任务实时操作系统成功移植uip0.6协议栈?

    本系统通过在GPS多任务实时操作系统中增加一个通信任务,并成功移植了uip0.6协议栈,完成了链路层PPP协议及底层硬件的驱动调用,使得GPS定位数据能够实时、准确传输。经过实践证明,该系统传输过程稳定,数据传输无误。
    发表于 06-04 06:08

    AT89S51单片机实验及实践教程

    技术拉幕式数码显示技术 1 1 1 1 实验任务实任务 用 AT89S51 单片机的 P0 0 AD0 P0 7 AD7 端口接数码管的a h 端 8 位数码管的 S1 S8 通过 74LS138 译码...
    发表于 07-15 07:50

    STM32裸板多任务实

    对于大部分的项目来讲,STM32基本都是处于“裸奔”的状态,将程序全部放在一个while(1)中顺序执行,但也会有部分任务要求几个任务“同时进行”,这个时候就需要一个RTOS了,但是有的时候就是这样
    发表于 08-24 06:37

    什么是多任务系统?FreeRTOS任务与协程简析

    功能,初学者必须先掌握——任务的创建、删除、挂起和恢复等操作。本章节分为如下几部分:*什么是多任务系统*FreeRTOS任务与协程*初次使用*任务状态*
    发表于 02-18 06:38

    黑客任务实战服务器攻防篇

    黑客任务实战服务器攻防篇网站服务器的黑客攻防一直是网络安全中最重要的一部分。本书作者在经过数月的努力之后,终于将网站服务器的黑客攻防以深入浅出、简单易懂的方
    发表于 10-22 11:03 0次下载
    黑客<b class='flag-5'>任务实</b>战服务器攻防篇

    电子商务实践教学

    电子商务实践教学 电子商务实践教学的重要性 以项目为依托进行实践教学 专业实验教学总体架构 实践教学应用实例 电子商务应用研究
    发表于 04-28 16:53 0次下载

    基于TMS320LF2407的多任务实时处理系统设计

    基于TMS320LF2407的多任务实时处理系统设计
    发表于 05-06 15:39 5次下载

    基于LabVIEW平台的多任务实时测控系统的实现_刘为玉

    基于LabVIEW平台的多任务实时测控系统的实现_刘为玉
    发表于 03-18 09:26 9次下载

    python定时任务实践

    由于程序需求,监测配置变化需要设置定时任务,每分钟执行一次,对任务持久化要求不高,不需要时可以关闭定时任务
    的头像 发表于 05-20 17:53 960次阅读
    python定时<b class='flag-5'>任务实践</b>

    FreeRTOS任务实现和控制块

    任务实现 FreeRTOS 官方给出的任务函数模板如下: void vATaskFunction (void *pvParameters){ for ( ; ; ) { -- 任务应用程序
    的头像 发表于 09-28 11:28 584次阅读

    基于RTX51嵌入式多任务实时操作系统的智能卡电能表设计

    电子发烧友网站提供《基于RTX51嵌入式多任务实时操作系统的智能卡电能表设计.pdf》资料免费下载
    发表于 10-26 09:12 2次下载
    基于RTX51嵌入式多<b class='flag-5'>任务实</b>时操作系统的智能卡电能表设计

    SAP半导体及光伏行业最佳业务实践

    电子发烧友网站提供《SAP半导体及光伏行业最佳业务实践.ppt》资料免费下载
    发表于 11-02 14:32 0次下载
    SAP半导体及光伏行业最佳业<b class='flag-5'>务实践</b>

    电气控制及PLC综合实践任务

    电气控制及PLC综合实践_任务
    发表于 10-17 13:47 0次下载