【JUC】CAS 原理

CAS

CAS 是无锁实现线程安全的方式,是一种乐观锁

CAS是指Compare And Swap比较并交换,它是一条CPU并发原语,是一种很重要的同步思想,是乐观锁思想的一种实现。如果主内存的值跟期望值一样,那么就进行修改,否则一直重试,直到一致为止。

CAS并发原语体现在Java语言中就是sun.misc.Unsafe类中的各个方法(由C++实现)。调用UnSafe类中的CAS方法,JVM会帮我们实现出CAS汇编指令。这是一种完全依赖于硬件的功能,通过它实现了原子操作。再次强调,由于CAS是一种系统原语,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成所谓的数据不一致问题。CAS的好处是其能在保证数据一致性的同时,也保证并发性。

Java中CAS操作的执行依赖于Unsafe类的 native本地方法(由C++实现,其调用操作系统底层原语,能保证原子性)。

CAS 常和 volatile 配合使用。前者不断自旋比较共享变量在主存中的最新值是否和预期值一直,后者负责保证共享变量的值始终为主存中的最新值(可见性)。

为什么无锁效率高

  • 无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。打个比喻:线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火,等被唤醒又得重新打火、启动、加速… 恢复到高速运行,代价比较大
  • 但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。

但是特别频繁的 CAS 也会导致性能降低,因此 CAS 适合短时间内的并发控制,不适合长时间的自旋

CAS 的特点

结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。

  • CAS 是基于乐观锁的思想
  • synchronized 是基于悲观锁的思想
  • CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思
    • 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
    • 但如果竞争激烈(写操作多),可以想到重试必然频繁发生,反而效率会受影响

原子类型工具类

java.util.concurrent.atomic 并发包提供了一些并发工具类,这里把它分成五类:

  • 原子基本类型
    • AtomicInteger:整型原子类
    • AtomicLong:长整型原子类
    • AtomicBoolean :布尔型原子类
  • 原子引用 AtomicReference
  • 原子数组
    • AtomicIntegerArray:整形数组原子类
    • AtomicLongArray:长整形数组原子类
    • AtomicReferenceArray :引用类型数组原子类
  • 字段更新器 AtomicReferenceFieldUpdater
  • 原子累加器 LongAdder

原子整型 AtomicInteger

常用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger(0);
// 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++
System.out.println(i.getAndIncrement());
// 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
System.out.println(i.incrementAndGet());
// 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
System.out.println(i.decrementAndGet());
// 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
System.out.println(i.getAndDecrement());
// 获取并加值(i = 0, 结果 i = 5, 返回 0)
System.out.println(i.getAndAdd(5));
// 加值并获取(i = 5, 结果 i = 0, 返回 0)
System.out.println(i.addAndGet(-5));
// 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)
// 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.getAndUpdate(p -> p - 2));
// 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
// 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.updateAndGet(p -> p + 2));
// 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
// 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
// getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
// getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
// 计算并获取(i = 10, p 为 i 的当前值, x 为参数1值, 结果 i = 0, 返回 0)
// 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));
}

原子整型类AtomicInteger类的方法底层即使用了CAS思想:

1
2
3
4
5
6
7
8
public class CASDemo {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(5);
System.out.println(atomicInteger.compareAndSet(5, 2019)+"\t current data : "+ atomicInteger.get());
//修改失败
System.out.println(atomicInteger.compareAndSet(5, 1024)+"\t current data : "+ atomicInteger.get());
}
}

第一次修改,期望值为5,主内存也为5,修改成功,为2019。第二次修改,期望值为5,主内存为2019,修改失败。

AtomicInteger类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;

// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

private volatile int value;

/**
* Creates a new AtomicInteger with the given initial value.
*
* @param initialValue the initial value
*/
public AtomicInteger(int initialValue) {
value = initialValue;
}

/**
* Creates a new AtomicInteger with initial value {@code 0}.
*/
public AtomicInteger() {
}

...

/**
* Atomically increments by one the current value.
*
* @return the previous value
*/
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}

...
}

其内还有许多采用了CAS思想的方法,例如AtomicInteger.getAndIncrement()方法,发现其没有加synchronized也实现了同步。这是为什么?

1
2
3
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}

CAS 底层原理

AtomicInteger内部维护了三个比较重要的参数:

1
2
3
4
5
6
7
8
9
10
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
private volatile int value;

static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
  • unsafeAtomicInteger类通过调用 Unsafe类的 native 本地方法保证修改操作的原子性
  • valueOffset:表示该变量值在内存中的偏移地址,因为Unsafe就是根据内存偏移地址获取数据的
  • value:用volatile修饰,保证了多线程之间的内存可见性

Unsafe 类

Java中CAS操作的执行依赖于Unsafe类的 native本地方法。

Unsafe是CAS的核心类,由于Java方法无法直接访问底层系统,需要通过本地(native)方法来访问,Unsafe相当于一个后门,基于该类可以直接操作特定内存的数据。Unsafe类存在于sun.misc包中,其内部方法操作可以像C的指针一样直接操作内存。

注意Unsafe类中的大多方法都是native修饰的,也就是说Unsafe类中的方法可以直接调用操作系统底层资源执行相应任务

AtomicInteger.getAndIncrement()调用了Unsafe.getAndAddInt()方法:

1
2
3
public final int getAndIncrement(){
return unsafe.getAndAddInt(this,valueOffset,1);
}

Unsafe类的 getAnddAddInt() 方法:

1
2
3
4
5
6
7
8
9
10
public final int getAnddAddInt(Object var1,long var2,int var4){
int var5;
do{
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}

public native int getIntVolatile(Object var1, long var2);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

其中,getIntVolatile() 方法和 compareAndSwapInt() 方法都是 native 本地方法,其由底层操作系统执行,能保证操作的原子性

UnSafe.getAndAddInt()源码解释:

  • var1AtomicInteger对象本身。
  • var2:该对象值得引用地址。
  • var4:需要变动的数量。
  • var5:是用var1var2找出的主内存中真实的值。
  • 用该对象当前的值与var5比较:
    • 如果相同,更新var5+var4并且返回true,
    • 如果不同,继续取值然后再比较,直到更新完成。

这个方法的var1和var2,就是根据对象偏移量得到在主内存的快照值var5。然后compareAndSwapInt方法通过var1和var2得到当前主内存的实际值。如果这个实际值快照值相等,那么就更新主内存的值为var5+var4。如果不等,那么就一直循环,一直获取快照,一直对比,直到实际值和快照值相等为止。

比如有A、B两个线程,一开始都从主内存中拷贝了原值为3,A线程执行到var5=this.getIntVolatile,即var5=3。此时A线程挂起,B修改原值为4,B线程执行完毕,由于加了volatile,所以这个修改是立即可见的。A线程被唤醒,执行this.compareAndSwapInt()方法,发现这个时候主内存的值不等于快照值3,所以继续循环,重新从主内存获取。

底层汇编

Unsafe类中的compareAndSwapInt()是一个本地方法,该方法的实现位于unsafe.cpp中:

1
2
3
4
5
6
7
8
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)
UnsafeWrapper("Unsafe_CompareAndSwaplnt");
oop p = JNlHandles::resolve(obj);
jint* addr = (jint *)index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e))== e;
UNSAFE_END
//先想办法拿到变量value在内存中的地址。
//通过Atomic::cmpxchg实现比较替换,其中参数x是即将更新的值,参数e是原内存的值。

ABA 问题

CAS 缺点

CAS实际上是一种自旋锁,其缺点是:

  • 一直循环,开销比较大(尤其是长时间的自旋对 CPU 资源占用较多)。
  • 只能保证一个变量的原子操作,多个变量依然要加锁。
  • 引出了ABA问题

所谓ABA问题,就是比较并交换的循环,存在一个时间差,而这个时间差可能带来意想不到的问题。比如线程T1将值从A改为B,然后又从B改为A。线程T2看到的就是A,但是却不知道这个A发生了更改。尽管线程T2 CAS操作成功,但不代表就没有问题。

有的需求,比如CAS,只注重头和尾,只要首尾一致就接受。但是有的需求,还看重过程,中间不能发生任何修改,这就引出了AtomicReference原子引用。

ABA 主要是指引用(指针)值没变,但实际对象变化了(新生成的),之前那个已经被回收了,如果这时再对新生成的这个引用变量操作,无疑是错误的,因此需要判断版本号是否一致,如果不一致则不能删除。设置版本号严格递增,就知道这个改变是否发生过了。

ABA 问题的解决

使用AtomicStampedReference类可以解决ABA问题。这个类维护了一个“版本号Stamp,在进行CAS操作的时候,不仅要比较当前值,还要比较版本号。只有两者都相等,才执行更新操作。即乐观锁思想。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class ABAProblem {
static AtomicReference<Integer> atomicReference = new AtomicReference<>(100);
static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100, 1);

public static void main(String[] args) {
System.out.println("======ABA问题的产生======");

new Thread(() -> {
atomicReference.compareAndSet(100, 101);
atomicReference.compareAndSet(101, 100);
}, "t1").start();

new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();

}
System.out.println(atomicReference.compareAndSet(100, 2019) + "\t" + atomicReference.get().toString());
}, "t2").start();

try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("======ABA问题的解决======");
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t第一次版本号: " + stamp);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(100, 101, atomicStampedReference.getStamp(),
atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t第二次版本号: " + atomicStampedReference.getStamp());
atomicStampedReference.compareAndSet(101, 100, atomicStampedReference.getStamp(),
atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t第三次版本号: " + atomicStampedReference.getStamp());
}, "t3").start();

new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t第一次版本号: " + stamp);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean result = atomicStampedReference.compareAndSet(100, 2019, stamp, stamp + 1);
System.out.println(Thread.currentThread().getName() + "\t修改成功与否:" + result + " 当前最新版本号"
+ atomicStampedReference.getStamp());
System.out.println(Thread.currentThread().getName() + "\t当前实际值:" + atomicStampedReference.getReference());
}, "t4").start();
}
}

输出结果:

1
2
3
4
5
6
7
8
9
============以下是ABA问题的产生==========
true 2019
============以下是ABA问题的解决==========
t3 第一次版本号1
t4 第一次版本号1
t3 第二次版本号2
t3 第三次版本号3
t4 修改成功否:false 当前最新实际版本号:3
t4 当前实际最新值100

流程

https://blog.csdn.net/chenzengnian123/article/details/122700255

img

AtomicMarkableReference

AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如:A -> B -> A ->C,通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了AtomicMarkableReference

原子引用 AtomicReference

AtomicInteger对整数进行原子操作,如果是一个POJO呢?可以用AtomicReference来包装这个POJO,使其操作原子化。

原子引用判断的是 == ,判断引用地址是否修改。场景是引用对象地址会改变,例如 BigDecimal

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class User {
String name;
int age;
User(String name, int age) {
this.name = name;
this.age = age;
}
}

public class AtomReferenceDemo {
public static void main(String[] args) {
User user1 = new User("Jack", 25);
User user2 = new User("Lucy", 21);
AtomicReference<User> atomicReference = new AtomicReference<>();
atomicReference.set(user1);
System.out.println(atomicReference.compareAndSet(user1, user2)); // true
// 此时atomicReference变为user2,不再是user1,故原值不匹配,return false
System.out.println(atomicReference.compareAndSet(user1, user2)); // false
}
}

面试题:BigDecimal 类一定能保证数据的计算正确吗?

不一定:因为该类在高并发下不安全,需要使用 AtomicReference 包装,这样才是线程安全的


原子数组 AtomicIntegerArray

使用原子的方式更新数组里的某个元素

  • AtomicIntegerArray:整形数组原子类
  • AtomicLongArray:长整形数组原子类
  • AtomicReferenceArray:引用类型数组原子类

字段更新器 AtomicReferenceFieldUpdater

利用字段更新器 AtomicReferenceFieldUpdater,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常:

1
Exception in thread "main" java.lang.IllegalArgumentException: Must be volatile type

原子累加器 LongAdder

使用原子累加器进行累加时性能更高。原因:在有竞争时,设置多个累加单元(但不会超过cpu的核心数),Therad-0 累加 Cell[0],而 Thread-1 累加 Cell[1],以此类推。最后查询值的时候再将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。

细节:使用 LongAdder 时,则是在内部维护多个 Cell 变量,每个 Cell 里面有一个初始值为 0 的 long 型变量,这样,在同等并发量的情况下,争夺单个变量更新操作的线程量会减少,这变相地减少了争夺共享资源的并发量。另外,多个线程在争夺同一个 Cell 原子变量时如果失败了,它并不是在当前 Cell 变量上一直自旋 CAS 重试,而是尝试在其他 Cell 的变量上进行CAS 尝试,这个改变增加了当前线程重试 CAS 成功的可能性。最后,在获取 LongAdder 当前值时,是把所有 Cell 变量的value值累加后再加上base返回的。

LongAdder 累加器的使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
demo(() -> new LongAdder(), adder -> adder.increment());
}
for (int i = 0; i < 5; i++) {
demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
}

}

private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
T adder = adderSupplier.get();
long start = System.nanoTime();
List<Thread> ts = new ArrayList<>();
// 4 个线程,每人累加 50 万
for (int i = 0; i < 40; i++) {
ts.add(new Thread(() -> {
for (int j = 0; j < 500000; j++) {
action.accept(adder);
}
}));
}
ts.forEach(t -> t.start());
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(adder + " cost:" + (end - start)/1000_000);
}

源码分析

具体分析见视频 https://www.bilibili.com/video/BV16J411h7Rd?p=179