LongAdder源码

Scroll Down

前言

并发场景下通常会使用 AtomicLong 原子类进行计数等操作,在 JDK1.8 中提供了 LongAdder 来实现类似功能。相对于 AtomicLong ,LongAdder 有着更高的性能,可以完全替代 AtomicLong 的计数操作。

下面我们先对 AtomicLong 简单介绍,然后重点分析 LongAdder 的实现。
AtomicLong 关键的代码如下:

public class AtomicLong extends Number implements java.io.Serializable {
        // setup to use Unsafe.compareAndSwapLong for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicLong.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    // 计数值
    private volatile long value;

    // 累计计数
    public final long incrementAndGet() {
        return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
    }
}

AtomicLong 底层是使用 Unsafe 的 CAS 来实现的,当操作失败时会以自旋的方式重试,直到成功才会退出自旋。因此在高并发场景下,可能会有很多线程处于重试状态,导致性能下降。具体表现在源码中的部分如下:

+--- Unsafe
    public final long getAndAddLong(Object var1, long var2, long var4) {
        long var6;
        do {
            var6 = this.getLongVolatile(var1, var2);
        } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));

        return var6;
    }

可以看到,使用 AtomicLong 进行计数操作时,多个线程竞争修改共享资源 value 值时是通过自旋来完成的。在高并发环境下,同一时刻只有一个线程可以 CAS 操作成功,其它线程都会 CAS 失败,从而会不断重试直到成功才会退出自旋,这就是对性能造成影响的原因。

LongAdder

LongAdder 本质上是使用空间换时间的思想来进行设计的,不过消耗的内存空间可以忽略不计。它维护了一个基础变量 base 作为基础计数器,在竞争比较小的情况下 CAS 操作该数据即可;为了应对竞争激烈的场景,它里面还维护了一组按需分配的计数单元数组 Cell[],处理计数请求时,不同线程可以映射到不同的计算单元上进行计数,采用分而治之的思想减小了线程竞争,将操作单个共享资源的压力分摊到多个计算单元上,提高了并发度。

通过将请求计数压力分摊,LongAdder 可以提供比 AtomicLong 更好的性能。获取计数值时,只要将 base 与 Cell[] 数组中的计数单元累加即可。当然,在并发很低的情况,使用 AtomicLong 更简单直接一些,并且效率稍微高一些。

abstract class Striped64 extends Number {
    /**
     * CPU 核数,用于限制 cells 数组的大小
     */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * 计算单位数组,大小是 2^n;
     * 核心思想是,每个线程会映射自己的 Cell 进行计数,从而减少竞争。
     */
    transient volatile Cell[] cells;

    /**
     * 基础计算器,用于最初无竞争的情况下以及初始化计数单元失败的情况
     */
    transient volatile long base;


    /**
     * 通过 CAS 更新该值,标记当前是否有线程在创建或扩容 cells或创建 Cell行为
     */
    transient volatile int cellsBusy;
}

可以看到,LongAdder 本身中没有相关属性,它是通过直接继承 Striped64 类中属性。以上四个属性已经详细注释,在对应的场景下再详细介绍。

计数单元

+--- Striped64
    @sun.misc.Contended
    static final class Cell {
        /**
         * 计数属性,使用 volatile 修饰保证可见性
         */
        volatile long value;

        Cell(long x) {
            value = x;
        }

        /**
         * CAS 更新计数值
         *
         * @param cmp 原始值
         * @param val 新值
         * @return
         */
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        // Unsafe 实例
        private static final sun.misc.Unsafe UNSAFE;
        // value 字段的偏移量
        private static final long valueOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                        (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

计数单元 Cell 是 Striped64 的内部类,用于某个/某些线程处理计数请求的,也就是一个 Cell 可以对应多个线程。多个线程操作 Cell 数组的原理如下:
image-1690462001248
通过定义一个 volatile 类型的 value 属性作为计数值,使用 Unsafe 的 CAS 来修改它的值。LongAdder 计数器的值就是所有 Cell 的值加上 base 的值。

注意:该类使用了 @sun.misc.Contended 注解,这个注解的作用是用来解决伪共享问题的

计数方法

public class LongAdder extends Striped64 implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;
    /**
     * 创建一个初始和为零的新加法器。
     */
    public LongAdder() {
    }

    /**
     * 原子累加 1
     */
    public void increment() {
        add(1L);
    }

    /**
     * 原子递减 1
     */
    public void decrement() {
        add(-1L);
    }
}

可以看到,计数时无论是递增还是递减,都是调用了 add() 方法,它是计数的核心。

+--- LongAdder
    /**
     * 累加给定的值
     * @param x the value to add
     */
    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;

        // 1 cells 不为空,说明出现过竞争,cells 已经创建
        // 2 CAS 操作基础计数器 base 失败,说明出现了竞争
        // PS: cells 数组为延迟加载,只有在 CAS 更新 base 失败的情况下才会初始化;即没有竞争时操作的是 base 值,发生竞争时 cells 才起作用
        if ((as = cells) != null || !casBase(b = base, b + x)) {

            // true 表示当前没有竞争;false 表示有竞争
            boolean uncontended = true;

            // 1 cells 为空,说明出现竞争,是由于 CAS 操作基础计数器 base 失败才执行到这里
            // 2 当前线程映射的 Cell 为空,说明当前线程还没有对应的 Cell,初始化一个 Cell
            // 3 更新当前线程映射的 Cell 失败,说明其它线程也映射到了这个 Cell,表明存在竞争
            if (as == null || (m = as.length - 1) < 0 ||
                    // getProbe() 方法返回的是线程中的 threadLocalRandomProbe 字段,它是通过随机数生成的一个值,对于一个确定的线程,这个值是固定的
                    (a = as[getProbe() & m]) == null ||
                    !(uncontended = a.cas(v = a.value, v + x)))

                longAccumulate(x, null, uncontended);
        }
    }

上述方法的分支逻辑有点多,不过总体上是根据是否出现过竞争来处理的。下面我们进行详细分析:

没有出现竞争的情况:没有出现竞争就意味着 cells 数组为空,此时线程处理计数请求只需要 CAS 操作 base 值即可,操作成功直接结束;操作失败说明存在竞争,此时需要通过计数单元分摊计数请求,此时 as == null 成立,会进入到 longAccumulate 方法;
出现过竞争的情况:已经出现过竞争就意味着 cells 数组已经存在,此时线程处理计算请求需要映射寻找对应的计数单元完成计数,也就不会操作 base 值了。此时会通过 a = as[getProbe() & m]) 来映射对应的计数单元,如果当前线程没有对应的计数单元,或者 CAS 操作计算单元失败「有冲突」,都会进入到 longAccumulate 方法。
需要说明的是,线程在映射计数单元时,和 HashMap 中定位哈希桶一样的方式,只是这个 hash 值是线程相关的值。

关于以上两种情况的详细说明,已经在代码中详细注释,就不再展开说明。

longAccumulate

通过前文我们知道,进入 longAccumulate 方法的情况总得来说有两种,CAS 操作 base 值失败,需要初始化 Cell 计数单元数组;线程没有映射到计数单元或 CAS 操作计数单元失败。也就是该方法集计数单元数组初始化、创建新的计数单元、竞争更新、扩容计数数组操作于一体,总体比较复杂。下面我们将该方法分成已初始化计数单元数组和未初始化计数单元数组进行分析。

已初始化计数单元数组

+--- Striped64
    /**
     *
     * @param x              the value 待更新的值
     * @param fn             更新函数,或 null 用于添加(此约定避免了 LongAdder 中需要额外的字段或函数)。
     * @param wasUncontended 进入该方法前,是否 CAS 操作失败,也就是是否有竞争。true 表示没有竞争
     */
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {

        // 存储线程的 probe 值
        int h;
        // 如果 getProbe() 方法返回 0 ,说明随机数未初始化
        if ((h = getProbe()) == 0) {
            // 强制初始化
            ThreadLocalRandom.current(); // force initialization
            // 重新获取 probe 值
            h = getProbe();

            // 都未初始化,肯定还不存在竞争的情况
            wasUncontended = true;
        }

        // 是否发生碰撞,即多个线程映射到同一个 Cell 元素
        boolean collide = false;                // True if last slot nonempty
        for (; ; ) {
            Cell[] as;
            Cell a;
            int n;
            long v;

            // cell 已经初始化过的情况
            if ((as = cells) != null && (n = as.length) > 0) {
                // 当前线程映射的 Cell 为空,那么尝试创建一个 Cell
                if ((a = as[(n - 1) & h]) == null) {

                    // 当前无其它线程在创建或扩容 cells,也没有线程在创建 Cell
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        // 创建一个 Cell,值为当前需要增加的值
                        Cell r = new Cell(x);   // Optimistically create

                        // 再次检测 cellsBusy 的状态,如果没有其它线程忙碌,那么当前线程 CAS 设置值为 1,相当于获取了锁
                        // spin lock
                        if (cellsBusy == 0 && casCellsBusy()) {
                            // 标记是否创建成功
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs;
                                int m, j;
                                // 找到当前线程映射到 cells 数组中的位置
                                if ((rs = cells) != null &&
                                        (m = rs.length) > 0 &&
                                        rs[j = (m - 1) & h] == null) {
                                    // 将创建的 Cell 放在 cells
                                    rs[j] = r;
                                    // 标记成功
                                    created = true;
                                }

                            } finally {
                                // 相当于释放锁
                                cellsBusy = 0;
                            }

                            // 创建成功后直接返回
                            if (created)
                                break;

                            // 创建不成功,下一轮循环重试
                            continue;           // Slot is now non-empty
                        }
                    }

                    // 当前线程映射的位置为空,自然是没有发生碰撞
                    collide = false;

                    // 当前线程映射的 Cell 不为空且更新失败了,表示有竞争。这里重置为 true ,重置线程的 probe 并自旋重试。
                    // 这里对应调用方法 add() 中的 Cell 冲突更新的情况,设置 wasUncontended 为 true 
                } else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash

                    // 执行到这里,至少进行了 1 轮重试
                    // 尝试 CAS 更新当前线程映射的 Cell 的值,如果成功返回即可;失败重置线程的 probe 继续重试。
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                        fn.applyAsLong(v, x))))
                    break;

                    // 如果 cells 数组的长度达到了 CPU 核心数,或者 cells 扩容了,那么重试即可,不进行扩容。
                    // 设置 collide 为 false ,重置线程的 probe 并自旋重试。
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale

                    // 执行到这里说明发生了碰撞,且竞争失败了,则设置 collide 为 true 表示发生碰撞竞争失败,然后进行自旋重试
                else if (!collide)
                    collide = true;

                    // 只有重试一定次数后仍然失败才会扩容
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        // 检查是否有其它线程已经扩容过了
                        if (cells == as) {      // Expand table unless stale

                            // 新数组大小是原来数组的两倍
                            Cell[] rs = new Cell[n << 1];

                            // 把旧数组元素直接拷贝到新数组
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];

                            // 设置 cells 为新数组
                            cells = rs;
                        }
                    } finally {
                        // 释放资格
                        cellsBusy = 0;
                    }

                    // 扩容成功后,重置 collide,解除冲突标志
                    collide = false;

                    // 使用扩容后的新数组,重新尝试
                    continue;                   // Retry with expanded table
                }

                // 更新失败或者达到了CPU核心数,重新生成probe,并重试
                h = advanceProbe(h);

                // 未初始化过 cells 数组,尝试获取资格并初始化 cells 数组
            } else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
               //... 省略
            }
        }
    }

上述方法总体上还是比较复杂的,下面对主要的分支逻辑进行说明:

当前线程映射的位置没有对应的计数单元 Cell,则创建 Cell 并加入到对应位置上。加入的过程使用 spin lock;
当前线程调用 add() 方法 CAS 操作映射的 Cell 失败,那么通过重置当前线程 probe 然后自旋重试。注意:此时不着急扩容 Cell 数组,这种情况下继续重试;
使用 CAS 尝试更新当前线程映射的 Cell 的值,执行到这里,至少已经经历过了第 2 步。更新成功则结束,更新失败则继续下一步判断;
CAS 尝试更新当前线程映射的 Cell 的值失败,说明多个线程映射到了同一个 Cell 导致冲突,按理说此时最好扩容,但实际上并没有扩容,而是判断 cells 数组的长度是否达到了 CPU 核心数,或者 cells 数组是否已经扩容了。针对这种情况,还是不着急扩容,继续自旋重试。这种情况是合理的,下文会重点分析下;
如果是发生碰撞竞争失败,即多个线程映射到了同一个 Cell 导致更新失败,那么解除碰撞标志,这种情况再给个机会重试下;
最多重试三次后仍然失败,那么就需要扩容 Cell 数组了。扩容的时候使用 CAS 抢占扩容资格 cellsBusy,抢占成功进行扩容;注意:扩容并没有完成计数操作,扩容完成后需要继续自旋完成计数,由于扩容了,自旋一次就能成功的可性能很大;
特别说明:如果 cells 数组的长度达到了 CPU 核心数就不会扩容了,即使竞争激烈。我么知道 cells 数组的大小是 2^n ,这就意味着 cells 数组最大只能达到 >= NCPU 的最小2次方;比如服务器是 8 核的,那么 cells 数组的最大只会到 8 ,达到 8 就不会扩容了。因为同一个 CPU 核心同时只能运行一个线程,并发执行的时候,服务器同一时刻能运行的线程数是固定的。

未初始化计数单元数组

/**
 *
 * @param x              the value 待更新的值
 * @param fn             更新函数,或 null 用于添加(此约定避免了 LongAdder 中需要额外的字段或函数)。
 * @param wasUncontended 进入该方法前,是否 CAS 操作失败,也就是是否有竞争。true 表示没有竞争
 */
final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {

    // 存储线程的 probe 值
    int h;
    // 如果 getProbe() 方法返回 0 ,说明随机数未初始化
    if ((h = getProbe()) == 0) {
        // 强制初始化
        ThreadLocalRandom.current(); // force initialization
        // 重新获取 probe 值
        h = getProbe();

        // 都未初始化,肯定还不存在竞争的情况
        wasUncontended = true;
    }

    // 是否发生碰撞,即多个线程映射到同一个 Cell 元素
    boolean collide = false;                // True if last slot nonempty
    for (; ; ) {
        Cell[] as;
        Cell a;
        int n;
        long v;

        // cell 已经初始化过的情况
        if ((as = cells) != null && (n = as.length) > 0) {
           
         //... 省略

            // 未初始化过 cells 数组,尝试获取资格并初始化 cells 数组
        } else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            // 标记初始化成功
            boolean init = false;
            try {                           // Initialize table
                // 检测是否有其它线程初始化过
                if (cells == as) {
                    // 新建一个大小为 2 的 Cell 数组
                    Cell[] rs = new Cell[2];

                    // 找到当前线程映射到的位置,并创建对应的 Cell
                    rs[h & 1] = new Cell(x);

                    // 将创建的数组赋值给 cells
                    cells = rs;

                    // 初始化成功
                    init = true;
                }

            } finally {
                // 释放资格
                cellsBusy = 0;
            }

            // 初始化成功直接返回,因为增加的值已经同时创建到Cell中了
            if (init)
                break;

            // 如果初始化 cells 数组有竞争,就尝试更新基础计数器 base,成功返回即可;失败重试;
        } else if (casBase(v = base, ((fn == null) ? v + x :
                fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

这种情况比较简单,具体流程如上,代码中已经详细注释。需要说明的是,如果初始化的时候也有竞争,那么竞争失败的线程会尝试更新基础计数器 base,成功直接返回,失败了才会继续重试。

sum() 方法

+--- LongAdder
    public long sum() {
        Cell[] as = cells; Cell a;

        // 初始值为基础计数器的值
        long sum = base;

        // 如果 cells 不为空,则统计每个计数单元中的值
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }

        // 返回 sum
        return sum;
    }

LongAdder 加法器的值包含两部分,将 base 和所有 Cell 的值相加就是结果。

这里需要注意,如果前面已经累加过的 Cell 的 value 有修改,那么就无法计算到了。由此看出,LongAdder 不是强一致性的。

LongAdder 小结

JDK 不仅提供了支持 long 类型计数器,也提供了 double 类型的 DoubleAdder 计数器,它们都继承了 Striped64,原理是类似的。

小结

LongAdder 通过 base 和 Cell 数组来存储值。当没有竞争的情况下直接 CAS 操作 base 即可;当存在竞争时,采用分而治之思想将不同线程处理计数请求映射到不同的 Cell 上以增加并发度。

在并发较小时,使用 AtomicLong 更简单高效;在并发较大时,LongAdder 性能更高,随着扩容 Cell 数组,最终可以达到一种无竞争的状态。