BlockingQueue
在所有的并发容器中,BlockingQueue是最常见的一种。BlockingQueue是一个带阻塞功能的队列,当入队列时,若队列已满,则阻塞调用者;当出队列时,若队列为空,则阻塞调用者。
在Concurrent包中,BlockingQueue是一个接口,有许多不同的实现类:
ArrayBlockingQueue
ArrayBlockingQueue是一个用数组实现的环形队列,在构造函数中,会要求传入数组的容量。
/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and the specified access policy.
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
LinkedBlockingQueue
LinkedBlockingQueue是一种基于单向链表的阻塞队列。因为队头和队尾是两个指针分开操作的。所以用了2把锁 + 2个条件,同时有1个AtomicInteger的原子变量记录count数。
/**
* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
在其构造函数中,也可以指定队列的总容量。如果不指定,默认为Integer.MAX_VALUE。
LinkedBlockingQueue和ArrayBlockingQueue的实现有一些差异,有几点要特别说明:
- 为了提高并发度,用2把锁,分别控制队头、队尾的操作。意味着在put和put之间take和take之间是互斥的,put和take之间并不互斥。但对于count变量,双方都需要操作,所以必须是原子类型。
- 因为各自拿了一把锁,所以当需要调用对方的condition的signal时,还必须加上对方的锁,就是signalNotEmpty和signalNotFull函数。
- 不仅put会通知take,take也会通知put,当put发现非满的时候,也会通知其他的put线程,当take发现非空的时候,也会通知其他take线程。
PriorityBlockingQueue
队列通常是先进先出的,而PriorityQueue是按照元素的优先级从小到大出队列的。正因为如此,PriorityQueue中的2个元素之间需要可以比较大小,并实现Comparable接口。
其构造函数如下所示,如果不指定初始大小,内部会设定一个默认值11,当元素个数超过这个大小之后,会自动扩容。
/**
* Creates a {@code PriorityBlockingQueue} with the specified initial
* capacity that orders its elements according to the specified
* comparator.
*
* @param initialCapacity the initial capacity for this priority queue
* @param comparator the comparator that will be used to order this
* priority queue. If {@code null}, the {@linkplain Comparable
* natural ordering} of the elements will be used.
* @throws IllegalArgumentException if {@code initialCapacity} is less
* than 1
*/
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
在阻塞的实现方面,和ArrayBlockingQeuue的机制相似,主要区别是用数组实现了一个二叉堆,从而实现按优先级从小到大出队列。另一个区别是没有notFull条件,当元素个数超出数组长度时,执行扩容操作。
DelayQueue
DelayQueue即延迟队列,也就是一个按延迟时间从小到大出队的PriorityQueue。所谓延迟时间,就是未来将要执行的时间 - 当前时间。为此,放入DelayQueue中的元素,必须实现DelayQueue接口。
关于该接口,有两点说明:
- 如果getDelay的返回值小于或等于0,则说明该元素到期,需要从队列中拿出来执行。
- 该接口首先继承了Comparable接口,所以要实现该接口,必须实现Comparable接口。具体来说,就是基于getDelay的返回值比较两个元素的大小。
关于take函数,有两点需要说明:
- 不同于一般的阻塞队列,只在队列为空的时候,才阻塞。如果栈顶元素的延迟时间没到,也会阻塞。
- 在代码中使用了一个优化技术,用一个Thread leader变量记录了等待堆顶元素的第1个线程。通过getDelay可以知道堆顶元素何时到期,不必无限期等待,可以使用condition.awaitNanos()等待一个有限的时间,只有当发现还有其他线程也在等待堆顶元素时,才需要无限期等待。
对于put方法,有一点需要说明,不是每放入一个元素,都需要通知等待的线程。放入的元素,如果其延迟时间大于当前堆顶的元素延迟时间,就没必要通知等待的线程;只有当延迟时间是最小的,在堆顶时,才有必要通知等待的线程。
SynchronousQueue
SynchronousQueue是一种特殊的BlockingQueue,它本身没有容量。先调put,线程会阻塞;直到另外一个线程调用了take,两个线程才同时解锁,反之亦然。对于多个线程而言,例如多个线程,调用3次put,3个线程都会阻塞;直到另外的线程调用3次take,6个线程才能同时解锁,反之亦然。
BlockingDeque
BlockingDeque定义了一个阻塞的双端队列接口。
CopyOnWrite
CopyOnWrite指在写的时候,不是直接写源数据,而是把数据拷贝一份进行修改,而通过悲观锁或者乐观锁的方式写回。那为什么不直接修改,而是要拷贝一份修改呢?这是为了在读的时候不加锁。
CopyOnWriteArrayList
CopyOnWriteArrayList的核心数据结构是一个数组。
/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
CopyOnWriteArraySet
CopyOnWriteArraySet就是用Array实现的一个Set,保证所有元素都不重复。其内部是封装的一个CopyOnWriteArrayList.
ConcurrentHashMap
HashMap通常的实现方式是数组 + 链表,这种方式被称为拉链法。ConcurrentHashMap在这个基本原理之上进行了各种优化,在JDK7和JDK8中的实现方式有很大差异。
JDK7中的实现方式
为了提高并发度,在JDK1.7中,一个HashMap被拆分为多个子HashMap。每一个子HashMap称作一个Segment,多个线程操作多个Segment相互独立。
具体来说,每个Segment都继承自ReentrantLock,Segment的数量等于锁的数量,这些锁彼此之间相互独立,即所谓的分段锁。
构造函数中第三个参数concurrentLevel,是并发度,也就是Segment数组的大小。这个值一旦在构造函数中设定,之后不能再扩容,为了提升hash的计算性能,会保证数组的大小始终是2的整数次方。初始的时候,如果不能指定任何参数,就会使用默认值,默认的Segment数组大小是16.
扩容
关于扩容,有几点需要说明:
- 函数的参数,也就是将要加入的最新节点。再扩容完成之后,把该节点加入新的Hash表。
- 整个数组的长度是2的整数次方,每次按二倍扩容,而hash函数就是对数组长度取模,即node.hash & sizeMask。因此,如果元素之前处于第i个位置,当再次hash时,必然处于第i个或者第i + oldCapacity个位置。
- 上面的扩容进行了一次优化,并没有对元素依次拷贝,而是先找到lastRun位置,也就是for循环。lastRun到链表末尾的所有元素,其hash值没有改变,所以不需要依次重新拷贝。
get实现分析
整个get过程也就是两次hash:
第一次hash,函数为(h >>> segmentShift)& segmentMask,计算出所在的Segment;
第二次hash,函数为h & (tab.length - 1),即h对数组长度取模,找到Segment里面对应的HashEntry数组下标,然后遍历该位置的链表。
整个读的过程完全没有加锁,而是使用了UNSAFE.getObjectVolatile函数。
JDK8中的实现方式
JDK8中的实现有很大变化,首先是没有了分段锁,所有的数据都放在了一个大的HaskMap中;其次是引入了红黑树。
如果头节点是Node类型,则尾随它的就是一个普通的链表,如果头节点是TreeNode类型,它的后面就是一颗红黑树,TreeNode是Node的子类。
链表和红黑树之间可以相互转换:初始的时候是链表,当链表中的元素超过某个阈值时,把链表转换成红黑树;反之,当红黑树中的元素个数小于某个阈值时,再转换为链表。
那么为什么JDK8要做这种改变呢?在JDK7中的分段锁,有三个好处:
(1)、减少Hash冲突,避免一个槽里有太多元素。
(2)、提高读和写的并发度。段与段之间相互独立。
(3)、提供扩容的并发度。扩容的时候,不是整个ConcurrentHashMap一起扩容,而是每个Segment独立扩容。
针对这三个好处,我们来看下在JDK8中相应的处理方式:
(1)、使用红黑树,当一个槽里有很多元素时,其查询和更新速度会比链表快很多,Hash冲突的问题由此得到较好的解决。
(2)、加锁的粒度,并非整个ConcurrentHashMap,而是对每个头节点分别加锁,即并发度,就是Node数组的长度,初始长度为16,和在JDK中初始的Segment的个数相同。
(3)、并发扩容,这是难度最大的。在JDK7中,一旦Segment的个数在初始化的时候确立,不能再修改,并发度被固定,之后只是在每个Segment内部扩容,这意味着每个Segment独立扩容,互不影响,不存在并发扩容的问题。但在JDK8中,相当于只有1个Segment,当一个线程要扩容Node数组的时候,其他线程还要读写,因此处理过程很复杂,后面会详细分析。
扩容
MIN_TREE_CAPACITY = 64,意味着当数组的长度没有超过64的时候,数组的每个节点里都是链表,只会扩容,不会转红黑树。只有当数组长度大于或等于64时,才考虑把链表转换成红黑树。
扩容的过程如下:
- 首先建一个新的HashMap,其数组长度是旧数组长度的2被,然后把旧的元素逐个迁移过来。所以函数中的参数有2个,第一个参数tab是扩容之前的HashMap,第2个参数nextTab是扩容之后的HashMap。当nextTab = null的时候,函数最初会对nextTab进行初始化。这里有个关键的说明,该函数会被多个线程调用,所以每个线程只是扩容旧的HashMap部分,这就涉及了如何划分任务的问题。
- 旧数组的长度是N,每个线程扩容一段,一段的长度用变量stride步长来表示,transferIndex表示了整个数组扩容的进度。
stride的计算公式,即在单核模式下直接等于n,因为在单核模式下没有办法多个线程并行扩容,只需要1个线程来扩容整个数组;在多核模式下为(n>>>3)/CPU,并且保证步长的最小值是16.显然,需要的线程个数约为n/stride。
transferIndex是ConcurrentHashMap的一个成员变量,记录了扩容的进度。初始值为n,从大到小扩容,每次减stride个位置,最终减至n<=0,表示整个扩容完成。因此从[0, transferIndex - 1]的位置表示还没有分配到线程扩容的部分,从[transferIndex, n - 1]的位置表示已经分配给某个线程进行扩容,当前正在扩容中,或者已经扩容成功。
因为transferIndex会被多个线程并发修改,每次减stride,所以需要通过CAS进行操作。 - 在扩容未完成之前,有的数组下标对应的槽已经迁移到了新的HashMap里面,有的还在旧的HashMap里面。这个时候,所有调用get的线程还是会访问旧HashMap,怎么处理呢?当Node[0]已经迁移成功,而其他Node还在迁移过程中时,如果有线程要读取Node[0]的数据,就会访问失败。为此,新键一个ForwardingNode,即转发节点,在这个节点里面记录的是新的ConcurrentHashMap的引用。这样,当线程访问到ForwardingNode之后,会去查询新的ConcurrentHashMap。
- 因为数组的长度tab.length是2的整数次方,每次扩容又是2倍。而Hash函数是Hashcode % tab.length,等价于hashCode & (tab.length - 1)。这意味着:处于第i个位置的元素,在新的Hash表的数组中一定处于第i个或者第i+n个位置。