JDK源码阅读:Atomic类

Scroll Down

AtomicInteger和AtomicLong

悲观锁与乐观锁

对于悲观锁,作者认为数据发生并发冲突的概率很大,所以在读操作之前就上锁。synchronized关键字,以及后面的ReentrantLock都是悲观锁的典型例子。
对于乐观锁,作者认为数据发生并发冲突的概率比较小,所以在读操作之前不上锁。等到写操作的时候,再判断数据在此期间是否被其他线程修改了。如果被其他线程修改了,就把数据重新读取出来,重复该过程;如果没有被修改,就写回去。判断数据是否被修改,同时写回新值,这两个操作要合成一个原子操作,也就是CAS(Compare And Set)。
AtomicInteger的实现就是典型的乐观锁,在mysql和redis中有类似的思路。

Unsafe的CAS详解

上面调用的CAS函数,其实是封装Unsafe类中的一个native函数,如下所示:

public final boolean compareAndSet(int expect, int update) {
  return unsafe.compareAndSwapInt(this, valueOffSet, expect, update);
}

AtomicInteger封装过的compareAndSet有两个参数。第一个参数expect是指变量的旧值(是读出来的值,写回去的时候,希望没有被其他线程修改,所以称为expect);第二个参数update是指变量的新值(修改过的,希望写入的值)。当expect等于变量的当前的值时,说明在修改的期间,没有其他线程对此变量进行过修改,所以可以成功写入,变量被更新为update,返回true;否则返回false。
Unsafe类是Concurrent包的基础,里面所有函数都是native的。具体到compareAndSwapInt函数如下所示:

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

该函数有四个参数。在前两个参数中,第一个是对象(也就是AtomicInteger对象),第二个是对象的成员变量(也就是AtomicInteger里面包的int变量value),后面两个参数保持不变。
要特别说明下第二个参数,它是一个long型的整数,经常被称为xxxOffset,意思是某个成员变量在对应的类中的内存偏移量(该变量在内存中的位置),表示该成员变量的本身。在Unsafe中专门有一个函数,把成员变量转化成偏移量,如下所示:

public native long objectFieldOffset(Field var1);

所有调用CAS的地方,都会先通过这个函数的成员变量转换成一个offset。以AtomicInteger为例。

// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
    try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
    } catch (Exception ex) { throw new Error(ex); }
}

从上面的代码可以看出,无论是Unsafe还是valueOffset,都是静态的,也就是类级别的,所有对象都是共用的。
在转化的时候,先通过反射获取value成员变量对应的Field对象,再通过objectFieldOffset函数转化成valueOffset。此处valueOffset就代表了value变量本身,后面执行CAS操作的时候,不是直接操作value,而是操作valueOffset。

自旋与阻塞

当一个线程拿不到锁的时候,有以下两种基本的等待策略。
策略1:放弃CPU,进入阻塞状态,等待后续被唤醒,再重新被操作系统调度。
策略2:不放弃CPU,空转,不断重试,也就是所谓的自旋。
很显然,如果是单核CPU,只能用策略1。因为如果不放弃CPU,那么其他线程无法运行,也就无法释放锁。但是对于多核CPU来说,策略2就很有用了,因为没有线程切换的开销。
AtomicInteger的实现就用的是自旋策略,如果拿不到锁,就会一直重试。
有一点说明,这两种策略并不是互斥的,可以结合使用。如果拿不到锁,先自旋几圈;如果自旋还拿不到锁,再阻塞,synchronized关键字就是这样的实现策略。
除了AtomicInteger,AtomicLong也是同样的原理。

AtomicBoolean

为什么需要AtomicBoolean

对于int或者long型变量,需要进行加减操作,所以需要加锁,但是对于一个Boolean类型来说,true和false的赋值和取值操作,加上volatile关键字就足够了,为什么还需要AtomicBoolean呢?

public final boolean compareAndSet(V expect, V update) {
  return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}

其中,expect是旧的引用,update为新的引用。

如何支持Boolean和double类型

在Unsafe类中,只提供了三种类型的CAS操作:int、long、Object(也就是引用类型)。

public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

第一个参数是要修改的对象,第一个参数是对象的成员变量在内存中的位置,第三个参数是该变量的旧值,第四个参数是该变量的新值。
AtomicBoolean的实现如下:

/**
 * Atomically sets the value to the given updated value
 * if the current value {@code ==} the expected value.
 *
 * @param expect the expected value
 * @param update the new value
 * @return {@code true} if successful. False return indicates that
 * the actual value was not equal to the expected value.
 */
public final boolean compareAndSet(boolean expect, boolean update) {
    int e = expect ? 1 : 0;
    int u = update ? 1 : 0;
    return unsafe.compareAndSwapInt(this, valueOffset, e, u);
}

AtomicStampedReference和AtomicMarkableReference

ABA问题解决办法

到目前为止,CAS都是基于值来做比较的。但如果另外一个线程把变量的值从A值改为B,再从B改回A,那么尽管修改过两次,可是在当前线程做CAS操作的时候,却会因为值没改变而认为数据没有被其他线程修改过,这就是所谓的ABA问题。
要解决ABA的问题,不仅要比较值,还要比较版本号,而这正是AtomicStampedRefence做的事情,其对应的CAS函数如下:

/**
 * Atomically sets the value of both the reference and stamp
 * to the given update values if the
 * current reference is {@code ==} to the expected reference
 * and the current stamp is equal to the expected stamp.
 *
 * @param expectedReference the expected value of the reference
 * @param newReference the new value for the reference
 * @param expectedStamp the expected value of the stamp
 * @param newStamp the new value for the stamp
 * @return {@code true} if successful
 */
public boolean compareAndSet(V   expectedReference,
                             V   newReference,
                             int expectedStamp,
                             int newStamp) {
    Pair<V> current = pair;
    return
        expectedReference == current.reference &&
        expectedStamp == current.stamp &&
        ((newReference == current.reference &&
          newStamp == current.stamp) ||
         casPair(current, Pair.of(newReference, newStamp)));
}

之前的CAS只有两个参数,这里的CAS有四个参数,后两个参数就是版本号的旧值和新值。
当expectedReference != 对象当前的reference时,说明该数据肯定被其他线程修改过;
当expectedReference == 对象当前的reference时,再进一步比较expectedStamp是否等于对象当前的版本号,以此判断数据是否被其他线程修改过。

AtomicIntegerFieldUpdater、AtomicLongFieldUpdater和AtomicReferenceFieldUpdater

为什么需要AtomicXXXFieldUpdater

如果一个类是自己编写的,则可以在编写的时候把成员变量定义为Atomic类型,但如果是一个已经有的类,在不能更改其源代码的情况下,要想实现对其成员变量的原子操作,就需要AtomicIntegerFieldUpdater、AtomicLongFieldUpdater和AtomicReferenceFieldUpdater。
首先,其构造函数是protected,不能直接构造其对象,必须通过它提供的一个静态函数来创建,如下所示:

/**
 * Creates and returns an updater for objects with the given field.
 * The Class argument is needed to check that reflective types and
 * generic types match.
 *
 * @param tclass the class of the objects holding the field
 * @param fieldName the name of the field to be updated
 * @param <U> the type of instances of tclass
 * @return the updater
 * @throws IllegalArgumentException if the field is not a
 * volatile integer type
 * @throws RuntimeException with a nested reflection-based
 * exception if the class does not hold field or is the wrong type,
 * or the field is inaccessible to the caller according to Java language
 * access control
 */
@CallerSensitive
public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
                                                          String fieldName) {
    return new AtomicIntegerFieldUpdaterImpl<U>
        (tclass, fieldName, Reflection.getCallerClass());
}

newUpdater静态函数传入的是要修改的类和对应的成员变量的名字,内部通过反射拿到这个类的成员变量,然后包装成一个AtomicIntegerFieldUpdater对象。所以,这个对象表示的是类的某个成员,而不是对象的成员变量。

限制条件

要想使用AtomicIntegerFieldUpdater修改成员变量,成员变量必须是volatile的int类型不能是Integer类型。

AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray

Concurrent包提供了AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray三个数组元素的原子操作。注意,这里并不是说对整个数组的操作是原子的,而是针对数组中一个元素的原子操作而言。

使用方式

public final int getAndIncrement(int i) {
  return getAndAdd(i, 1);
}

实现原理

其底层的CAS函数用的还是compareAndSwapInt,但是把数组下标i转化成对应的内存偏移量,所用的方法和之前的AtomicInteger不太一样。

private static long byteOffset(int i) {
    return ((long) i << shift) + base;
}

把下标i转换成对应的内存地址,用到了shift和base两个变量。这两个变量都是AtomicIntegerArray的静态成员变量,用了Unsafe类的arrayBaseOffset和arrayIndexScale两个函数来获取。

private static final int base = unsafe.arrayBaseOffset(int[].class);

static {
    int scale = unsafe.arrayIndexScale(int[].class);
    if ((scale & (scale - 1)) != 0)
        throw new Error("data type scale not a power of two");
    shift = 31 - Integer.numberOfLeadingZeros(scale);
}

q其中,base表示数组的首地址的位置,scale表示一个数组元素的大小,i的偏移量则等于:iscale+base。
但为了优化性能,使用了位移操作,shift表示scale中1的位置所以,偏移量的计算变成上面的代码中的:i<<shift+base,表达的是:i
scale+base。

Striped64与LongAdder

从JDK8开始,针对long型的原子操作,Java又提供了LongAdder、LongAccumulator;针对Double类型,Java提供了DoubleAdder、DoubleAccumulator。

LongAdder原理

AtomicLong内部是一个volatile long型变量,由多个线程对这个变量进行CAS操作。多个线程同时对一个变量进行CAS操作,在高并发的场景下仍不够快,如果再要提高性能,该怎么做呢?
把一个变量拆成多份,变为多个变量,有些类似于ConcurrentHashMap的分段锁的例子。把一个long类型拆分成一个base变量外加多个cell,每个cell包装成一个Long型变量。当多个线程并发累加的时候,如果并发度较低,就直接加到base变量上,如果并发程度高,冲突大,平摊到这些cell上。在最后取值的时候,再把base和这些cell求sum运算。

伪共享与缓存行填充

在Cell类的定义中,用了一个独特的注解@sun.misc.Contented,这是JDK8之后才有的,背后涉及一个很重要的优化原理,伪共享和缓存行填充。
每个CPU都有自己的缓存。缓存与主内存进行数据交换的基本单位叫Cache Line(缓存行)。在64位X86架构中,缓存行是64字节,也就是8个Long型的大小。这也意味着当缓存失效,要刷新到主内存的时候,最少要刷新64字节。
当主内存中有变量X、Y、Z,被CPU1和CPU2分别读入自己的缓存,放在了同一行CacheLine里面,当CPU1修改了X变量,它要失效整行的Cache Line,也就是往总线上发消息,通知CPU2对应的CacheLine失效。由于Cache Line是数据交换的基本单位,无法只失效X,要失效就会失效整行的Cache Line,这会导致Y、Z变量的缓存也失效。
虽然只修改了X变量,本应该只失效X变量的缓存,但Y、Z变量也随之失效。Y、Z变量的数据没有修改,本应该很好地被CPU1和CPU2共享,却没做到,这就是所谓的伪共享问题。
问题的原因是,Y、Z和X变量处在了同一行CacheLine里面。要解决这个问题,需要用到所谓的缓存行填充,分别在X、Y、Z后面加上7个无用的Long型,填充整个缓存行,让X、Y、Z处在三行不同的缓存行中。

LongAdder核心实现

/**
 * Adds the given value.
 *
 * @param x the value to add
 */
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

当一个线程调用add的时候,首先会尝试使用casBase把x加到Base变量上。如果不成功,则再用a.cas()函数尝试把x加到Cell数组的某个元素上。如果还不成功,最后再调用longAccumulate()函数。

LongAccumulator

LongAdder只能进行累加操作,并且初始值默认为0;LongAccumulator可以自己定义一个二元操作符,并且可以传入一个初始值。

DoubleAdder与DoubleAccumulator

与LongAdder和LongAccumulator原理一样,只不过是将Double转化成Long。
到此为止,Concurrent包的所有原子类都介绍完了。