4.原子操作类:AtomicLong、LongAdderLong、Accumulator

JUC包中有AtomicInteger、AtomicLong和AtomicBoolean等原子性操作类,它们原理类似,下面以AtomicLong为例进行讲解。

AtomicLong

底层的操作自增自减都用Unsafe类中的getAndAddLong方法(获取本类内存偏移值)实现的,getAndAddLong底层用Unsafe类中的CAS方法,大量线程竞争只有一个线程成功,会导致大量的自旋尝试

public class AtomicLong extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 1927816293512124184L;
    // 获取Unsafe实例:AtomicLong类是通过BootStarp类加载器加载的所以可以拿到Unsafe类的实例
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    // 存放变量value的偏移量
    private static final long valueOffset;
	// 判断JVM是否支持Long类型的无锁CAS
    static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();
    private static native boolean VMSupportsCS8();
    static {
        try {
            // 获取value在AtomicLong类中的偏移量
            valueOffset = unsafe.objectFieldOffset
                (AtomicLong.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    // 实际变量值 volatile是为了多线程下保证内存的可见性 
    private volatile long value;
    public AtomicLong(long initialValue) {
        value = initialValue;
    }
    ....
}

递增和递减操作代码

public final long getAndIncrement() {
    return unsafe.getAndAddLong(this, valueOffset, 1L);
}
public final long getAndDecrement() {
    return unsafe.getAndAddLong(this, valueOffset, -1L);
}
public final long incrementAndGet() {
    return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}
public final long decrementAndGet() {
    return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L;
}
// Unsafe类中的getAndAddLong方法  
// 参数1: AtomicLong实例的引用
// 参数2: value变量在AtomicLong中的偏移值
// 参数3: 要设置的第二个变量的值
public final long getAndAddLong(Object var1, long var2, long var4) {
    long var6;
    //CAS操作设置var1对象偏移为var2处的值增加var4
    do {
        var6 = this.getLongVolatile(var1, var2);
    } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
    return var6;
}

上述代码中,valueOffset为AtomicLong在static语句块中进行初始化时通过Unsafe类获得的本类中value属性的内存偏移值

可以看到,上述四个方法都是基于Unsafe类中的getAndAddLong方法(原子性操作)实现的。

compareAndSet方法

public final boolean compareAndSet(long expect, long update) {
    return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}

内部还是调用了Unsafe类中的CAS方法。如果原子变量中的value值等于expect,则使用update值更新该值并返回true,否则false。

AtomicLong使用示例

public class AtomicLongDemo {
    private static AtomicLong al = new AtomicLong(0);
    public static long addNext() {
        return al.getAndIncrement();
    }
    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            new Thread() {
                @Override
                public void run() {
                    AtomicLongDemo.addNext();
                }
            }.start();
        }

        // 等待线程运行完
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("final result is " + AtomicLongDemo.addNext());
    }
}

AtomicLong使用CAS非阻塞算法,性能比使用synchronized等的阻塞算法实现同步好很多。但在高并发下,大量线程会同时去竞争更新同一个原子变量,由于同时只有一个线程的CAS会成功,会造成大量的自旋尝试,十分浪费CPU资源。因此,JDK8中新增了原子操作类LongAdder。

LongAdder

JDK8中新增的原子操作类LongAdder。

问题:

  1. LongAdder结构是怎样的

    将一个原子性变量分解成多个原子性变量。内部维护多个Cell(初始值为0的long类型变量),如果在一个Cell原子变量失败了会尝试在其他Cell变量上进行CAS尝试。

    在获取LongAdder当前值时,是把所有的Cell变量值的value值累加再加上base返回的。

    延迟初始化原子更新数组(Cell数组默认为null),Cells占用内存比较大,在需要时创建,惰性加载,Cell数组null并且并发线程较少时,所有的累加都是对base变量进行的。

  2. 当前线程应该访问Cell数组里面的哪一个Cell元素

    getProbe() & m

    m是当前cells数组元素个数-1

    getProbe()用于获取当前线程中变量threadLocalRandomProbe的值

  3. 如何初始化Cell数组

    cellsBusy是一个标识,0:当前cells数组没有在被初始化或扩容也没有新建Cell元素;1:cells数组在被初始化或扩容或在新建Cell元素。通过CAS操作进行0、1状态切换使用casCellsBusy函数。

    初始化cells元素个数为2,h & 1 计算当前线程应该访问cells数组的哪个位置,即当前线程的threadLocalRandomProbe变量值 & (cells数组元素个数-1)

    标识cells数组已被初始化。

    最后对cellsBusy重置标记 线程安全因为cellsBusy是volatile保证了内存可见性,且没有其他地方修改cellsBusy的值。

        // 初始化cells数组(重点)
        // cellsBusy是一个标识,0:当前cells数组没有在被初始化或扩容也没有新建Cell元素;1:cells数组在被初始化或扩容或在新建Cell元素
        // 通过CAS操作进行0、1状态切换使用casCellsBusy函数。
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           
                if (cells == as) {
                    // 初始化cells元素个数为2
                    Cell[] rs = new Cell[2];
                    // h & 1 计算当前线程应该访问cells数组的哪个位置,即当前线程的threadLocalRandomProbe变量值 & (cells数组元素个数-1)
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    // 标识cells数组已被初始化
                    init = true;
                }
            } finally {
                // 对cellsBusy重置标记 线程安全因为cellsBusy是volatile保证了内存可见性,且没有其他地方修改cellsBusy的值
                cellsBusy = 0;
            }
    
  4. Cell数组如何扩容

    扩容条件:当前cells元素个数小于CPU个数且有冲突(当前多个线程cells访问了cells中同一个元素)使其中一个线程CAS失败才会进行扩容。

    注意:每个CPU都运行一个线程时,也就是每个cell都使用一个CPU处理时性能才是最高的。

    扩容操作:

    1. 先通过CAS设置cellBusy为1(casCellsBusy),然后才能进行扩容

    2. 将容量扩充到之前的2倍,并复制Cell元素到扩容后的数组,扩容后的数组里除了包含复制过来的元素之外还包含其他新元素,新元素为null

      Cell[] rs = new Cell[n << 1];
      for (int i = 0; i < n; ++i)
         rs[i] = as[i];
      cells = rs;
      

    整个扩充逻辑:

    		 // 如果当前cells长度大于CPU个数,则不进行扩容。如果当前cells已经过时(其他线程对cells执行了扩容操作,改变了cells指向),也不会扩容。因为每个CPU都运行一个线程时,也就是每个cell都使用一个CPU处理时性能才是最高的。
                else if (n >= NCPU || cells != as)
                    collide = false; 
               // 是否有冲突,执行到此处说明a.cas()执行失败,即有冲突,将collide置为true,跳过扩容阶段,重新获取probe,到cells不同位置尝试cas,再次失败则扩容
                else if (!collide)
                    collide = true;  
                // 扩容(重点)
                // 如果当前元素个数没有达到CPU个数并且有冲突(当前多个线程访问了cells中的同一个元素,从而导致冲突使其中一个线程CAS失败)则扩容。
                // 通过CAS设置cellBusy为1
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {
                            // 将容量扩充到之前的2倍,并复制Cell元素到扩容后的数组,扩容后的数组里除了包含复制过来的元素之外还包含其他新元素,新元素为null
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue; // 扩容后再次尝试(扩容后cells长度改变,
                              // 根据(n - 1) & h计算当前线程在cells中对应元素下标会变化,减少再次冲突的可能性)
                }
                h = advanceProbe(h); // 重新计算线程probe,减小下次访问cells元素时的冲突机会
            }
    
  5. 线程访问分配的Cell元素有冲突后如何处理

    对CAS失败的线程重新计算当前线程的随机值threadLocalRandomProbe,以减少下次访问cells元素冲突的机会

    // 重新计算线程probe,减小下次访问cells元素时的冲突机会
                h = advanceProbe(h); 
    
  6. 如何保证线程操作被分配的Cell元素的原子性

    当前线程通过分配的Cell元素的cas函数来保证对Cell元素value值更新的原子性。

    a.cas(v = a.value, v + x)

由上可知,AtomicLong的性能瓶颈是多个线程同时去竞争一个变量的更新权导致的。而LongAdder通过将一个变量分解成多个变量,让同样多的线程去竞争多个资源解决了此问题。

原理

如图,LongAdder内部维护了多个Cell,每个Cell内部有一个初始值为0的long类型变量,这样,在同等并发下,对单个变量的争夺会变少。此外,多个线程争夺同一个变量失败时,会到另一个Cell上去尝试,增加了重试成功的可能性。当LongAdder要获取当前值时,将所有Cell的值于base相加返回即可

LongAdder维护了一个初始值为null的Cell数组和一个基值变量base。当一开始Cell数组为空且并发线程较少时,仅使用base进行累加当并发增大时,会动态地增加Cell数组的容量

Cell类中使用了**@sun.misc.Contented注解进行了字节填充**,解决了由于连续分布于数组中且被多个线程操作可能造成的伪共享问题(关于伪共享,可查看《伪共享(false sharing),并发编程无声的性能杀手》这篇文章)。

LongAdder

先看LongAdder的定义

public class LongAdder extends Striped64 implements Serializable

Striped64类中有如下三个变量:

transient volatile Cell[] cells;

transient volatile long base;
// 实现CAS自旋锁,状态只有0、1
transient volatile int cellsBusy;

cellsBusy用于实现自旋锁,状态值只有0和1,当创建Cell元素、扩容Cell数组或初始化Cell数组时,使用CAS操作该变量来保证同时只有一个变量可以进行其中之一的操作。

Cell

下面看Cell的定义

@sun.misc.Contended static final class Cell {
    volatile long value; // volatile确保内存可见性
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); // CAS操作保证value原子性
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

将value声明为volatile确保了内存可见性,CAS操作保证了value值的原子性,@sun.misc.Contented注解的使用解决了伪共享问题。

LongAdder方法

下面来看LongAdder中的几个方法:

  • long Sum():返回当前的值,内部操作是累加所有Cell内部的value值后再累加base。sum的结果并非一个精确值,因为计算总和时并没有对Cell数组加锁,累加过程中Cell的值可能被更改。
public long sum() {
    Cell[] as = cells; 
    Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
  • void reset():将base和Cell数组中非空元素的值置为0。
public void reset() {
    Cell[] as = cells; Cell a;
    base = 0L;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                a.value = 0L;
        }
    }
}
  • long sumThenRest():sum的改造版本。使用sum累加对应的Cell值后,把当前Cell的值重置为0,base重置为0。
public long sumThenReset() {
    Cell[] as = cells; Cell a;
    long sum = base;
    base = 0L;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null) {
                sum += a.value;
                a.value = 0L;
            }
        }
    }
    return sum;
}
  • void add(long x):判断cells数组是否为空,非空则进入内层,否则尝试直接通过CAS操作在base上进行add。内层代码中,声明了一个uncontented变量来记录调用longAccumulate方法前在相应cell上是否进行了失败的CAS操作。当前线程通过分配Cell元素的cas函数来保证对Cell元素value值更新的原子性。
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    // 判断cells是否为空,如果不为空则直接进入内层判断,
    // 否则尝试通过CAS在base上进行add操作,若CAS成功则结束,否则进入内层
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        // 记录cell上的CAS操作是否失败
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            // 计算当前线程应该访问cells数组的哪个元素:getProbe() & m (m是当前cells数组元素个数-1,getProbe()用于获取当前线程中变量threadLocalRandomProbe的值,这个值一开始为0)
            (a = as[getProbe() & m]) == null ||
            // 尝试通过CAS操作在对应cell上add
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended); // 当前线程中变量threadLocalRandomProbe的值,这个值一开始为0,如果是0就要初始化
    }
}

longAccumelate方法

longAccumulate时Striped64类中定义的,其中包含了初始化cells数组,改变cells数组长度,新建cell等逻辑。

判断cells是否为空或者长度为0:

  • 如果空或者长度为0则尝试进行cells数组初始化,初始化失败的话则尝试通过CAS操作在base上进行add,仍然失败则重走一次流程;

  • 如果cells不为空且长度大于0,则获取当前线程对应于cells中的元素,如果该元素为null则尝试创建,否则尝试通过CAS操作在上面进行add仍失败则扩容

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
    // 初始化当前线程的变量threadLocalRandomProbe的值
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current();
        h = getProbe();
        // 标记执行longAccumulate前对相应cell的CAS操作是否失败,失败为false
        wasUncontended = true; 
    }
    // 是否冲突,如果当前线程尝试访问的cell元素与其他线程冲突,则为true
    boolean collide = false; 
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        // 当前cells不为空且元素个数大于0则进入内层,否则尝试初始化
        if ((as = cells) != null && (n = as.length) > 0) {
            // 当前线程调用add方法并根据当前线程的随机数threadLocalRandomProbe和cells元素个数计算要访问的Cell元素下标 如果发现下标元素的值为null
            if ((a = as[(n - 1) & h]) == null) {//
                // 尝试添加新的cell
                if (cellsBusy == 0) {   
                    // 新增一个Cell元素到cells数组,并且将其添加到cells数组之前要竞争 设置cellsBusy为1
                    Cell r = new Cell(x);
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;
                    }
                }
                collide = false;
            }
             // 如果已经进行了失败的CAS操作
            else if (!wasUncontended) 
                // 则不调用下面的a.cas()函数(反正肯定是失败的),而是重新计算probe值来尝试
                wasUncontended = true; 
            // 当前Cell存在,则执行CAS设置
            else if (a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x))))
                break;
           // 如果当前cells长度大于CPU个数,则不进行扩容。如果当前cells已经过时(其他线程对cells执行了扩容操作,改变了cells指向),也不会扩容
           // 因为每个CPU都运行一个线程时,也就是每个cell都使用一个CPU处理时性能才是最高的。
            else if (n >= NCPU || cells != as)
                collide = false; 
           // 是否有冲突,执行到此处说明a.cas()执行失败,即有冲突,将collide置为true,跳过扩容阶段,重新获取probe,到cells不同位置尝试cas,再次失败则扩容
            else if (!collide)
                collide = true;  
            // 扩容(重点)
            // 如果当前元素个数没有达到CPU个数并且有冲突(当前多个线程访问了cells中的同一个元素,从而导致冲突使其中一个线程CAS失败)则扩容。
            // 通过CAS设置cellBusy为1
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {
                        // 将容量扩充到之前的2倍,并复制Cell元素到扩容后的数组,扩容后的数组里除了包含复制过来的元素之外还包含其他新元素,新元素为null
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue; // 扩容后再次尝试(扩容后cells长度改变,
                          // 根据(n - 1) & h计算当前线程在cells中对应元素下标会变化,减少再次冲突的可能性)
            }
            // 重新计算线程probe,减小下次访问cells元素时的冲突机会
            h = advanceProbe(h); 
        }
        // 初始化cells数组(重点)
        // cellsBusy是一个标识,0:当前cells数组没有在被初始化或扩容也没有新建Cell元素;1:cells数组在被初始化或扩容或在新建Cell元素
        // 通过CAS操作进行0、1状态切换使用casCellsBusy函数。
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           
                if (cells == as) {
                    // 初始化cells元素个数为2
                    Cell[] rs = new Cell[2];
                    // h & 1 计算当前线程应该访问cells数组的哪个位置,即当前线程的threadLocalRandomProbe变量值 & (cells数组元素个数-1)
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    // 标识cells数组已被初始化
                    init = true;
                }
            } finally {
                // 对cellsBusy重置标记 线程安全因为cellsBusy是volatile保证了内存可见性,且没有其他地方修改cellsBusy的值
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        // 尝试通过base的CAS操作进行add,成功则结束当前函数,否则再次循环
        else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))))
            break; 
    }
}

LongAccumulator

LongAdder是LongAccumulator的特例,两者都继承自Striped64。

看如下代码:

public LongAccumulator(LongBinaryOperator accumulatorFunction,
                        long identity) {
    this.function = accumulatorFunction;
    base = this.identity = identity;
}

public interface LongBinaryOperator {
    long applyAsLong(long left, long right);
}

LongAccumulator构造器允许传入一个双目运算符接口用于自定义加法规则,还允许传入一个初始值。

自定义的加法函数是如何被应用的呢?以上提到的longAccumulate()方法中有如下代码:

a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x)))

LongAdder的add()方法中调用longAccumulate()方法时传入的是null,而LongAccumulator的accumulate()方法传入的是this.function,即自定义的加法函数。