AtomicInteger和AtomicLong
悲观锁与乐观锁
对于悲观锁,作者认为数据发生并发冲突的概率很大,所以在读操作之前就上锁。synchronized关键字,以及后面的ReentrantLock都是悲观锁的典型例子。
对于乐观锁,作者认为数据发生并发冲突的概率比较小,所以在读操作之前不上锁。等到写操作的时候,再判断数据在此期间是否被其他线程修改了。如果被其他线程修改了,就把数据重新读取出来,重复该过程;如果没有被修改,就写回去。判断数据是否被修改,同时写回新值,这两个操作要合成一个原子操作,也就是CAS(Compare And Set)。
AtomicInteger的实现就是典型的乐观锁,在mysql和redis中有类似的思路。
Unsafe的CAS详解
上面调用的CAS函数,其实是封装Unsafe类中的一个native函数,如下所示:
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffSet, expect, update);
}
AtomicInteger封装过的compareAndSet有两个参数。第一个参数expect是指变量的旧值(是读出来的值,写回去的时候,希望没有被其他线程修改,所以称为expect);第二个参数update是指变量的新值(修改过的,希望写入的值)。当expect等于变量的当前的值时,说明在修改的期间,没有其他线程对此变量进行过修改,所以可以成功写入,变量被更新为update,返回true;否则返回false。
Unsafe类是Concurrent包的基础,里面所有函数都是native的。具体到compareAndSwapInt函数如下所示:
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
该函数有四个参数。在前两个参数中,第一个是对象(也就是AtomicInteger对象),第二个是对象的成员变量(也就是AtomicInteger里面包的int变量value),后面两个参数保持不变。
要特别说明下第二个参数,它是一个long型的整数,经常被称为xxxOffset,意思是某个成员变量在对应的类中的内存偏移量(该变量在内存中的位置),表示该成员变量的本身。在Unsafe中专门有一个函数,把成员变量转化成偏移量,如下所示:
public native long objectFieldOffset(Field var1);
所有调用CAS的地方,都会先通过这个函数的成员变量转换成一个offset。以AtomicInteger为例。
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
从上面的代码可以看出,无论是Unsafe还是valueOffset,都是静态的,也就是类级别的,所有对象都是共用的。
在转化的时候,先通过反射获取value成员变量对应的Field对象,再通过objectFieldOffset函数转化成valueOffset。此处valueOffset就代表了value变量本身,后面执行CAS操作的时候,不是直接操作value,而是操作valueOffset。
自旋与阻塞
当一个线程拿不到锁的时候,有以下两种基本的等待策略。
策略1:放弃CPU,进入阻塞状态,等待后续被唤醒,再重新被操作系统调度。
策略2:不放弃CPU,空转,不断重试,也就是所谓的自旋。
很显然,如果是单核CPU,只能用策略1。因为如果不放弃CPU,那么其他线程无法运行,也就无法释放锁。但是对于多核CPU来说,策略2就很有用了,因为没有线程切换的开销。
AtomicInteger的实现就用的是自旋策略,如果拿不到锁,就会一直重试。
有一点说明,这两种策略并不是互斥的,可以结合使用。如果拿不到锁,先自旋几圈;如果自旋还拿不到锁,再阻塞,synchronized关键字就是这样的实现策略。
除了AtomicInteger,AtomicLong也是同样的原理。
AtomicBoolean
为什么需要AtomicBoolean
对于int或者long型变量,需要进行加减操作,所以需要加锁,但是对于一个Boolean类型来说,true和false的赋值和取值操作,加上volatile关键字就足够了,为什么还需要AtomicBoolean呢?
public final boolean compareAndSet(V expect, V update) {
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}
其中,expect是旧的引用,update为新的引用。
如何支持Boolean和double类型
在Unsafe类中,只提供了三种类型的CAS操作:int、long、Object(也就是引用类型)。
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
第一个参数是要修改的对象,第一个参数是对象的成员变量在内存中的位置,第三个参数是该变量的旧值,第四个参数是该变量的新值。
AtomicBoolean的实现如下:
/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(boolean expect, boolean update) {
int e = expect ? 1 : 0;
int u = update ? 1 : 0;
return unsafe.compareAndSwapInt(this, valueOffset, e, u);
}
AtomicStampedReference和AtomicMarkableReference
ABA问题解决办法
到目前为止,CAS都是基于值来做比较的。但如果另外一个线程把变量的值从A值改为B,再从B改回A,那么尽管修改过两次,可是在当前线程做CAS操作的时候,却会因为值没改变而认为数据没有被其他线程修改过,这就是所谓的ABA问题。
要解决ABA的问题,不仅要比较值,还要比较版本号,而这正是AtomicStampedRefence做的事情,其对应的CAS函数如下:
/**
* Atomically sets the value of both the reference and stamp
* to the given update values if the
* current reference is {@code ==} to the expected reference
* and the current stamp is equal to the expected stamp.
*
* @param expectedReference the expected value of the reference
* @param newReference the new value for the reference
* @param expectedStamp the expected value of the stamp
* @param newStamp the new value for the stamp
* @return {@code true} if successful
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
之前的CAS只有两个参数,这里的CAS有四个参数,后两个参数就是版本号的旧值和新值。
当expectedReference != 对象当前的reference时,说明该数据肯定被其他线程修改过;
当expectedReference == 对象当前的reference时,再进一步比较expectedStamp是否等于对象当前的版本号,以此判断数据是否被其他线程修改过。
AtomicIntegerFieldUpdater、AtomicLongFieldUpdater和AtomicReferenceFieldUpdater
为什么需要AtomicXXXFieldUpdater
如果一个类是自己编写的,则可以在编写的时候把成员变量定义为Atomic类型,但如果是一个已经有的类,在不能更改其源代码的情况下,要想实现对其成员变量的原子操作,就需要AtomicIntegerFieldUpdater、AtomicLongFieldUpdater和AtomicReferenceFieldUpdater。
首先,其构造函数是protected,不能直接构造其对象,必须通过它提供的一个静态函数来创建,如下所示:
/**
* Creates and returns an updater for objects with the given field.
* The Class argument is needed to check that reflective types and
* generic types match.
*
* @param tclass the class of the objects holding the field
* @param fieldName the name of the field to be updated
* @param <U> the type of instances of tclass
* @return the updater
* @throws IllegalArgumentException if the field is not a
* volatile integer type
* @throws RuntimeException with a nested reflection-based
* exception if the class does not hold field or is the wrong type,
* or the field is inaccessible to the caller according to Java language
* access control
*/
@CallerSensitive
public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
String fieldName) {
return new AtomicIntegerFieldUpdaterImpl<U>
(tclass, fieldName, Reflection.getCallerClass());
}
newUpdater静态函数传入的是要修改的类和对应的成员变量的名字,内部通过反射拿到这个类的成员变量,然后包装成一个AtomicIntegerFieldUpdater对象。所以,这个对象表示的是类的某个成员,而不是对象的成员变量。
限制条件
要想使用AtomicIntegerFieldUpdater修改成员变量,成员变量必须是volatile的int类型不能是Integer类型。
AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
Concurrent包提供了AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray三个数组元素的原子操作。注意,这里并不是说对整个数组的操作是原子的,而是针对数组中一个元素的原子操作而言。
使用方式
public final int getAndIncrement(int i) {
return getAndAdd(i, 1);
}
实现原理
其底层的CAS函数用的还是compareAndSwapInt,但是把数组下标i转化成对应的内存偏移量,所用的方法和之前的AtomicInteger不太一样。
private static long byteOffset(int i) {
return ((long) i << shift) + base;
}
把下标i转换成对应的内存地址,用到了shift和base两个变量。这两个变量都是AtomicIntegerArray的静态成员变量,用了Unsafe类的arrayBaseOffset和arrayIndexScale两个函数来获取。
private static final int base = unsafe.arrayBaseOffset(int[].class);
static {
int scale = unsafe.arrayIndexScale(int[].class);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
shift = 31 - Integer.numberOfLeadingZeros(scale);
}
q其中,base表示数组的首地址的位置,scale表示一个数组元素的大小,i的偏移量则等于:iscale+base。
但为了优化性能,使用了位移操作,shift表示scale中1的位置所以,偏移量的计算变成上面的代码中的:i<<shift+base,表达的是:iscale+base。
Striped64与LongAdder
从JDK8开始,针对long型的原子操作,Java又提供了LongAdder、LongAccumulator;针对Double类型,Java提供了DoubleAdder、DoubleAccumulator。
LongAdder原理
AtomicLong内部是一个volatile long型变量,由多个线程对这个变量进行CAS操作。多个线程同时对一个变量进行CAS操作,在高并发的场景下仍不够快,如果再要提高性能,该怎么做呢?
把一个变量拆成多份,变为多个变量,有些类似于ConcurrentHashMap的分段锁的例子。把一个long类型拆分成一个base变量外加多个cell,每个cell包装成一个Long型变量。当多个线程并发累加的时候,如果并发度较低,就直接加到base变量上,如果并发程度高,冲突大,平摊到这些cell上。在最后取值的时候,再把base和这些cell求sum运算。
伪共享与缓存行填充
在Cell类的定义中,用了一个独特的注解@sun.misc.Contented,这是JDK8之后才有的,背后涉及一个很重要的优化原理,伪共享和缓存行填充。
每个CPU都有自己的缓存。缓存与主内存进行数据交换的基本单位叫Cache Line(缓存行)。在64位X86架构中,缓存行是64字节,也就是8个Long型的大小。这也意味着当缓存失效,要刷新到主内存的时候,最少要刷新64字节。
当主内存中有变量X、Y、Z,被CPU1和CPU2分别读入自己的缓存,放在了同一行CacheLine里面,当CPU1修改了X变量,它要失效整行的Cache Line,也就是往总线上发消息,通知CPU2对应的CacheLine失效。由于Cache Line是数据交换的基本单位,无法只失效X,要失效就会失效整行的Cache Line,这会导致Y、Z变量的缓存也失效。
虽然只修改了X变量,本应该只失效X变量的缓存,但Y、Z变量也随之失效。Y、Z变量的数据没有修改,本应该很好地被CPU1和CPU2共享,却没做到,这就是所谓的伪共享问题。
问题的原因是,Y、Z和X变量处在了同一行CacheLine里面。要解决这个问题,需要用到所谓的缓存行填充,分别在X、Y、Z后面加上7个无用的Long型,填充整个缓存行,让X、Y、Z处在三行不同的缓存行中。
LongAdder核心实现
/**
* Adds the given value.
*
* @param x the value to add
*/
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
当一个线程调用add的时候,首先会尝试使用casBase把x加到Base变量上。如果不成功,则再用a.cas()函数尝试把x加到Cell数组的某个元素上。如果还不成功,最后再调用longAccumulate()函数。
LongAccumulator
LongAdder只能进行累加操作,并且初始值默认为0;LongAccumulator可以自己定义一个二元操作符,并且可以传入一个初始值。
DoubleAdder与DoubleAccumulator
与LongAdder和LongAccumulator原理一样,只不过是将Double转化成Long。
到此为止,Concurrent包的所有原子类都介绍完了。