Semaphore
Semaphore也是信号量,提供了资源数量的并发访问控制,其使用代码也很简单。如下所示:
Semaphore semaphore = new Semaphore(10, true);
semaphore.acquire();
semaphore.release();
假设有n个线程来获取Semaphore里面的资源(n > 10),n个线程中只有10个线程能获取到,其他线程都会阻塞。直到有线程释放了资源,其他线程才能获取到。
当初始化的资源个数为1的时候,Semaphore退化为排他锁。
总资源数即state的初始值,在acquire里对state变量进行CAS减操作,减到0之后,线程阻塞;在release里对state变量进行CAS加操作。
CountDownLatch
一个主线程要等待10个Worker线程工作完毕才退出,就能使用CountDownLatch来实现。
CountDownLatch countDownLatch = new CountDownLatch(10);
countDownLatch.await();
countDownLatch.countDown();
调用await()方法的线程会被放入AQS的阻塞队列,进入阻塞状态。
调用countDown()方法会将state值减一,直到state等于0时,一次性唤醒所有阻塞的线程。
最后做下小结,因为是基于AQS阻塞队列来实现的,所以可以让多个线程都阻塞在state = 0条件上,通过countDown()一直减state,减到0后一次性唤醒所有线程。
CyclicBarrier
CyclicBarrier基于ReentrantLock + Condition实现。
构造函数如下:
/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @param barrierAction the command to execute when the barrier is
* tripped, or {@code null} if there is no action
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
- CyclicBarrier是可以被重用的。
- CyclicBarrier会响应中断。
- 回调函数只会被完成后执行一次。
Exchanger
Exchanger用于线程之间交换数据。
实现原理
Exchange的核心机制和Lock一样,也是CAS + park/unpark。首先,在Exchange内部,有两个内部类:Slot和Node。每个线程在调用exchange()函数交换数据的时候,会先创建一个Node对象,这个Node对象就是对该线程的包装,里面包含了两个字段:一个是该线程要交互的数据,另一个是该线程自身。这里有个关键点:Node本身是继承自AtomicReference的,所以除了这两个字段,Node还有第三个字段,记录的是对方所要交换的数据,初始为NULL。
Slot的AtomicReference就是指向的一个Node,通过Slot和Node相结合,实现了两个线程之间的数据交换。一个Slot只支持两个线程之间交换数据,要实现多个线程并行地交换数据,需要多个Slot,因此在Exchange中定义了Slot数组。
- 当一个线程调用exchange准备和其他线程交换数据的时候,无外乎两种情况,一种是没有其他线程要交换的数据,自己只能自旋或者阻塞,等待;另一种是恰好有其他线程在Slot里面等着,要么和对方交换。
- 由于Slot不止一个,而是多个。如果运气好,根据自己的线程id找到对应的Slot里面恰好有别的线程在等待,就和对方交换。交换的办法是,取出Slot指向的Node,也就是对方的Node,然后把这个Node指向自己的item,唤醒对方,同时返回对方的item。
- 如果运气不好,Slot是空的,如何处理,当前Slot为空,不代表其他Slot没有线程在等待。因此如果当前Slot的index = 0,自己就阻塞,如果index != 0,则需要遍历所有的Slot,看其他的Slot里面是否有线程在等待。最好是遍历一遍发现没有其他线程,自己再在index = 0的位置等待。
Phaser
用Phaser代替CountDownLatch和CyclicBarrier
从JDK1.7开始,新增了一个同步工具类Phaser。
用Phaser代替CountDownLatch
Phaser phaser = new Phaser(10); // 初始为10
phaser.awaitAdvance(phaser.getPhase()); // 主线程调用该方法,阻塞在这。表示等待当前的phase完成
phaser.arrive(); // 10个worker线程,每个线程工作完成之后,调用一次arrive()
用Phaser代替CyclicBarrier
arriveAndAwaitAdance()就是arrive()与awaitAdvance()的组合,表示我自己已到达这个同步点,同时要等待所有人都到达这个同步点,然后再一起前行。
Phaser新特性
- 动态调整线程个数:CyclicBarrier所要同步的线程个数是在构造函数中指定的,之后不能更改,而Phaser可以再运行期间动态的调整要同步的线程个数。
register() // 注册一个
bulkRegister(int parties) // 注册多个
arriveAndDeregister() // 解注册
- 层次Phaser
在Phaser内部结构中,每个Phaser记录了自己的父节点,但并没有记录自己的子节点列表。所以,每个Phaser知道自己的父节点是谁,但父节点并不知道自己有多少个子节点,对父节点的操作,是通过子节点来实现的。
Phaser root = new Phaser(10);
Phaser c1 = new Phaser(root, 10);
Phaser c2 = new Phaser(root, 10);
Phaser c3 = new Phaser(c1, 10);
对于树状Phaser上的每个节点来说,可以当作一个独立的Phaser来看待,其运作机制和一个单独的Phaser是一样的。具体来讲:父Phaser并不用感知子Phaser的存在,当子Phaser中注册的参与者数量大于0时,会把自己向父节点注册;当子Phaser中注册的参与者数量等于0时,会自动向父节点解注册。父Phaser把子Phaser当作一个正常的参与的线程就可以了。
state变量解析
Phaser没有基于AQS来实现,但具备了AQS的核心特性:state变量、CAS操作、阻塞队列。先从state变量说起。
这个64位的state变量被拆成4部分:最高位0表示未同步完成,1表示同步完成,初始最高位为0。
阻塞与唤醒
基于上述的state变量,对其执行CAS操作,并进行相应的阻塞与唤醒。如图,右边的主线程会调用awaitAdvance()进行阻塞;左边的arrive()会对state进行CAS的累减操作,当未到达的线程数减到0时,唤醒右边阻塞的主线程。