【JUC】AQS 源码分析

AQS 理论

AQS:AbstractQueuedSynchronizer 抽象的队列同步器

AQS是用来构建锁或者其它同步器组件的重量级基础框架整个JUC体系的基石, 通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个int类变量state表示持有锁的状态。

AQS 中大量应用了 CAS + Unsafe 保证自旋原子性地添加节点。

  • 每次在抢占锁的时候都会CAS,因为可能其他非公平锁并发地抢占了锁,那么当前线程就需要再次自旋尝试获取锁。
  • 每次在修改时(例如添加修改节点时)都使用 Unsafe 类的 native 方法保证修改的原子性。

img

CLH:Craig、Landin and Hagersten队列(三位科学家的名字简写),是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列FIFO

image-20210926135141194

JUC的locks包下:

  • AbstractOwnableSynchronizer
  • AbstractQueuedLongSynchronizer
  • AbstractQueuedSynchronizer

AQS是一个抽象的父类,可以将其理解为一个框架。基于AQS这个框架,我们可以实现多种同步器,比如下方图中的几个Java内置的同步器。同时我们也可以基于AQS框架实现我们自己的同步器以满足不同的业务场景需求。

img

AQS中使用了模板方法的设计模式,AQS父类中的 tryAcquire() 方法和 tryRelease() 方法中只有 throw new UnsupportedOperationException();,说明如果子类不重写代码时被调用就会抛出异常:

image-20210927102559000

AQS 能干嘛?

加锁会导致阻塞:有阻塞就需要排队,实现排队必然需要有某种形式的队列来进行管理

img

1、抢到资源的线程直接使用办理业务,抢占不到资源的线程的必然涉及一种排队等候机制,抢占资源失败的线程继续去等待(类似办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排队等候),仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)。

2、既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?

3、如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的结点(Node) ,通过CAS、自旋以及LockSuport.park()的方式,维护state变量的状态,使并发达到同步的效果。

AQS 结构

AQS使用一个volatileint类型的成员变量state来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,将每条要去抢占资源的线程封装成一个Node,节点来实现锁的分配,通过CAS完成对state值的修改。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

private static final long serialVersionUID = 7373984972572414691L;

// Creates a new {@code AbstractQueuedSynchronizer} instance
protected AbstractQueuedSynchronizer() { }

// Wait queue node class.
static final class Node {
}

// 头结点,你直接把它当做当前持有锁的线程 可能是最好理解的。实际上可能略有出入,往下看分析即可
private transient volatile Node head;

// 阻塞的尾节点,每个新的节点进来,都插入到最后,也就形成了一个链表
private transient volatile Node tail;

// 这个是最重要的,代表当前锁的状态,0代表没有被占用,大于 0 代表有线程持有当前锁
// 这个值可以大于 1,是因为锁可以重入,每次重入都加上 1
private volatile int state;

// 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入
// reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer

// Returns the current value of synchronization state.
protected final int getState() {...}

// Sets the value of synchronization state.
protected final void setState(int newState) {...}

// Atomically sets synchronization state to the given updated
protected final boolean compareAndSetState(int expect, int update) {
...
}
}

四个比较重要的属性

  • Node head:指向哨兵节点
  • Node tail:指向尾结点,新来的线程将成为尾结点
  • state:代表当前锁的状态,0代表没有被占用,大于 0 代表有线程持有当前锁。这个值可以大于 1,是因为锁可以重入,每次重入都加上 1
  • Thread exclusiveOwnerThread:占有排它锁的线程,代表当前哪个线程正在占有着锁,后续抢占锁时将依据该值是否为 null 来判断锁是否已经被占用;释放锁时会将该值设置为 null

Node 类结构

Node 的数据结构其实就是 thread+ waitStatus+ pre+ next四个属性而已。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static final class Node {
// 标识节点当前在共享模式下
static final Node SHARED = new Node();
// 标识节点当前在独占模式下
static final Node EXCLUSIVE = null;

// ======== 下面的几个int常量是给waitStatus用的 ===========
/** waitStatus value to indicate thread has cancelled */
// 代表此线程取消了争抢这个锁
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
// 官方的描述是,其表示当前node的后继节点对应的线程需要被唤醒
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
// 等待condition唤醒
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
// 共享模式同步状态获取讲会无条件的传播下去(共享模式下,该字段才会使用)
static final int PROPAGATE = -3;
// ===============-2和-3用的不多,暂时不分析======================================

// 取值为上面的1、-1、-2、-3,或者0(以后会讲到,waitStatus初始值为0)
// 这么理解,暂时只需要知道如果这个值 大于0 代表此线程取消了等待,
// ps: 半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的
volatile int waitStatus;
// 前驱节点的引用
volatile Node prev;
// 后继节点的引用
volatile Node next;
// 这个就是线程本尊
volatile Thread thread;

}

AQS 队列基本结构

img

注意排队队列,不包括head(也就是后文要说的哨兵节点)。


CANCELLED 1 因为超时或中断设置为此状态,标志节点不可用
SIGNAL -1 处于此状态的节点释放资源时会唤醒后面的节点
CONDITION -2 处于条件队列里,等待条件成立(signal signalall) 条件成立后会置入获取资源的队列里
PROPAGATE -3 共享模式下使用,头节点获取资源时将后面节点设置为此状态,如果头节点获取资源后还有足够的资源,则后面节点会尝试获取,这个状态主要是为了共享状态下队列里足够多的节点同时获取资源
0 初始状态

Node 的 waitStatus 的含义


AQS 整体工作流程

本文将以 ReentrantLock 源码为例介绍AQS源码。

Lock接口的实现类,基本都是通过聚合了一个队列同步器的子类完成线程访问控制的,即每个实现类内部都有一个AQS的子类 Sync

image-20210926154035848

同时 Sync 类有两个子类:FairSyncNonfairSync。继承关系:

img

加锁阶段流程示意图:

加锁阶段流程图

解锁阶段流程示意图:

解锁阶段流程图

AQS 加锁过程源码分析

1. 构造器中创建公平锁或非公平锁 sync

ReentrantLock 的对象在实例化时将根据构造器中传入参数的不同创建公平锁或非公平锁对象sync

image-20210926154334057

公平锁与非公平锁的区别在于:公平锁在获取同步状态时多了一个限制条件:!hasQueuedPredecessors(),其作用是在加锁时判断等待队列中前面是否已经有正在排队的线程

image-20210926154908695

hasQueuedPredecessors()方法:

image-20210926155809751

Predecessors:前辈

hasQueuedPredecessors()中判断队列中是否前面还有正在排队的线程,导致公平锁和非公平锁的差异如下:

  • 公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中,不会直接去抢占;
  • 非公平锁:不管等待队列中是否有其他线程正在等待,直接尝试占有锁。

非公平锁的非公平体现在:其 lock() 方法一开始会先CAS尝试抢锁,若失败则进入队列中等待,这之后就和公平锁一样要按照顺序排队了。

image-20210927160002837

2. sync.lock():外部程序尝试加锁

image-20210927101855258

在主程序调用 ReentrantLocklock() 方法尝试加锁后,ReentrantLock对象将调用内聚的公平锁或非公平锁sync对象的lock()方法(公平锁和非公平锁的lock()方法有所区别,体现在非公平锁会先尝试抢占一次锁):

image-20210927160006504

​ 此处以非公平锁 NonfairSync 为例分析 lock() 方法的细节:

首先,试图将状态量state设置为1,此时是使用的CAS机制,底层调用Unsafe类的 native 方法进行原子操作。

  • 若设置成功,说明没有线程占有锁,则将当前线程设置为占有锁的线程。
  • 若设置失败,说明已经有线程正在占有锁,则调用 acquire(1) 尝试再次抢占锁(因为可能有重入锁的情况发生,即当前线程再次重入该锁,自然是需要放行的),若还是抢占失败,则进入队列排队等待。

image-20210926163751633

其中,compareAndSetState(0,1) 方法底层调用Unsafe类的 native 方法进行原子操作:

image-20210926162216445

3. acquire()

该方法为 AQS 父类中的方法,为公平锁和非公平锁公用

若设置失败,说明已经有线程正在占有锁,则调用 acquire(1) 尝试再次抢占锁。acquire() 方法内:

  • 首先调用子类重写的tryAcquire(arg)方法尝试占有锁。若占有成功,则直接返回,无需执行后续操作
  • 若占有失败,则调用 addWaiter() 方法将当前线程封装成Node类型的节点,添加到等待队列(双向链表)的尾部
  • 调用 acquireQueued() 方法将当前节点的前一个节点的waitStatus设置为-1,并调用LockSupport.park() 方法阻塞当前线程

image-20210926202310361

下面逐一分析上述三个方法的细节。

4. tryAcquire():尝试占有锁

首先以非公平锁为例,执行tryAcquire(arg)方法时将调用非公平锁重写的 nonfairTryAcquire() 方法:

  1. 再次CAS,试图抢占锁。大多数情况还会抢占失败,但也可能在此时占有锁的线程恰好释放锁,那么当前线程就可以顺利抢占到锁,并将当前线程设置为占有锁的线程,并返回 true 代表尝试获取锁成功。这也是一种双重检验的思想(外面抢占一次失败,进来再抢占一次)。
  2. 如果当前是重入锁的情况(即当前线程和占有锁的线程是同一个线程,也代表了当前锁被同一个线程重入了),那么将状态量state大小增加,并返回 true 代表尝试获取锁成功。
  3. 若1、2两次判断都不成立,则返回 false 代表尝试获取锁失败。

image-20210926175021909

若返回false,代表抢占锁失败,则进行下一步:addWaiter() 加入到等待队列(双向链表)中。


公平锁与非公平锁在 tryAcquire() 方法中的区别在于:公平锁在获取同步状态时多了一个限制条件:!hasQueuedPredecessors(),其作用是在加锁时判断等待队列中前面是否已经有正在排队的线程

image-20210926154908695

因此公平锁的线程在抢占锁前会判断当前队列中是否已经有正在排队的线程,如果有就不会尝试抢占锁,直接进入队列排队等待。


5. addWaiter():添加到等待队列中

addWaiter()方法中,将当前线程封装成一个Node节点,并添加到等待队列(双向链表)的队尾。将tail指向当前节点。

image-20210926212716145

  • 橙黄色区域的代码在tail != null时才会触发,代表当前队列中已经有了尾结点,说明当前线程不是第一个来排队的线程,无需再创建哨兵节点(head指向的头节点)
  • 粉色区域的代码在tail = null时触发,代表当前队列中还没有尾结点,说明当前线程是第一个来排队的线程,需要创建哨兵节点(head指向的头节点)

哨兵节点:又称为虚节点,傀儡节点。其内不存储任何信息,只是占位。真正有数据的节点是从第二个开始的。

粉色区域的 enq(node) 方法只会执行一次,用于创建哨兵节点(head指向的头节点):

image-20210926220033922

其中,再次if == null判断的目的是双重检验,防止刚进来时尾结点就恰好被其他线程设置了。else内的代码是为了在这种情况下将当前节点作为那个恰好加塞的节点的后驱节点。同时该过程采用CAS思想,一直自旋直到某一次成功将当前节点原子性地加入到队列中

compareAndSetXxx():都是调用底层 Unsafe 类的 native 方法,保证修改的原子性。

该过程示意图:

img

订正:队列中B的waitStatus应该等于-1,因为在线程C排队时会将其前一个节点B的 waitStatus 设置为 -1(waitStatus的值将在下一步 acquireQueued() 被设置)

6. acquireQueued():阻塞当前线程

当执行完 addWaiter() 后,当前节点就被加入到了队列中排队等待。

此时再调用 acquireQueued()方法阻塞当前线程:

  • 在 for 循环中每次都取出当前节点的前一个节点 p
  • 判断 p 是否是哨兵节点,如果是,说明当前节点排在队首,那么尝试抢占一次锁。若抢占成功,取消当前的哨兵节点的引用(未来将被GC),把当前节点设为新的哨兵节点,并开始占有锁。退出 for 循环
  • p 不是哨兵节点或当前节点抢占失败,则调用 shouldParkAfterFailedAcquire() 方法将前一个节点的 waitStatus 设置为 -1,并在 parkAndCheckInterrupt() 方法中调用 LockSupport.park(this) 阻塞当前线程,直到其前面的线程都出队并且自己被 unpark() 唤醒,才会取消阻塞再次进入 for 循环自旋尝试获取锁(因为可能其他非公平锁会在此时抢占锁,那么当前线程又要阻塞)。

image-20210927154405966

上述过程将一直在 for 循环中自旋,直到当前节点前面所有的节点都出队,并且锁被释放掉,parkAndCheckInterrupt() 方法才会从阻塞状态被唤醒,从而执行橙黄色框内的方法:取消当前的哨兵节点的引用(未来将被GC),把当前节点设为新的哨兵节点,并开始占有锁(该方法在后文解锁时才会执行,具体分析见下文)。


shouldParkAfterFailedAcquire() 方法:

image-20210927115459331

当经过 shouldParkAfterFailedAcquire() 方法将前一个节点的 waitStatus 设置为 Node.SIGNAL=-1 (代表等待锁释放的状态)后,再执行 parkAndCheckInterrupt() 方法阻塞当前线程,本质是使用 LockSupport.park() 方法:

image-20210927101303519

调用该方法后,当前线程将阻塞在此,等待前面所有线程执行完毕后被 LockSupport.unpark() 方法唤醒。


经过上面的分析,每个后来的线程都会在队列中排队并阻塞等待锁的释放。下面开始介绍解锁时的流程:

AQS 解锁过程源码分析

1. sync.release(1):外部程序尝试解锁

image-20210927102025829

在主程序调用 ReentrantLockunlock() 方法释放锁后,ReentrantLock对象将调用内聚的公平锁或非公平锁sync对象的release()方法,该方法在AQS父类中:

该方法为 AQS 父类中的方法,为公平锁和非公平锁公用

image-20210927150103000

  • 首先调用 tryRelease() 方法尝试释放锁,其先将AQS中的state减一(代表释放一个锁),再将AQS中的占有线程设置为 null
  • 接着获取哨兵节点,若其不为空且 waitStatus != 0,说明其后已经有了在排队的线程,该线程将waitStatus设置为-1,见上文 【6. acquireQueued():阻塞当前线程】分析
  • 满足 if 判断后调用 unparkSuccessor(h) 方法,该方法内将调用 LockSupport.unpark(s.thread) 方法将排在队首的线程唤醒。

接下来将介绍这两个方法的内容:

2. tryRelease():尝试释放锁

tryRelease() 方法内定义在AQS父类中,使用了模板方法的设计模式, tryRelease() 方法中只有 throw new UnsupportedOperationException();,说明如果子类不重写代码时被调用就会抛出异常:

image-20210927150608798

公平锁和非公平锁重写了该方法,并且内容一致(不同于tryAcquire方法子类重写的不同):

image-20210927103216804

  • 首先将AQS中的 state 值减一
  • 然后判断当前线程和拥有锁的线程是否是同一个线程,若不是,则抛出异常IllegalMonitorStateException
  • 若是同一个线程,则判断减一后的state是否为0,若是,则代表目前没有线程正占有着锁。则将占有锁的线程设置为 null

该方法执行完毕后,状态量 state 减少了一,并且占有锁的线程被设置为了 null

3. unparkSuccessor(h):唤醒队首线程

接着将哨兵节点 h 传入 unparkSuccessor(h) 方法:

image-20210927152330860

该方法将调用 LockSupport.unpark(s.thread) 方法将排在队首的线程唤醒,此时将分析上文中阻塞等待的方法【6. acquireQueued():阻塞当前线程】:

image-20210927154354074

根据上文分析,此时队首的线程阻塞在绿色框内的 shouldParkAfterFailedAcquire() 方法:

image-20210927101303519

那么在唤醒了该线程后,程序继续向下执行,程序没有被中断过,所以 return Thread.interrupted() 将返回 false。

接着再次自旋后执行橙黄色框的 if 判断,此时 p == head 条件满足,则执行 tryAcquire(arg) 尝试抢占锁。因为此时的锁已经被解开了,当前队首线程就能够抢占成功,则进入橙黄色框内的代码:

该代码块的作用是:取消当前的哨兵节点的引用,从队列中移除(未来将被GC),并把当前节点设为新的哨兵节点,并开始占有锁。

该过程示意图:

img