【JUC】AQS 源码分析
AQS 理论
AQS:AbstractQueuedSynchronizer
抽象的队列同步器。
AQS是用来构建锁或者其它同步器组件的重量级基础框架及整个JUC体系的基石, 通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个int类变量state
表示持有锁的状态。
AQS 中大量应用了 CAS + Unsafe 保证自旋原子性地添加节点。
- 每次在抢占锁的时候都会CAS,因为可能其他非公平锁并发地抢占了锁,那么当前线程就需要再次自旋尝试获取锁。
- 每次在修改时(例如添加修改节点时)都使用
Unsafe
类的native
方法保证修改的原子性。
CLH:Craig、Landin and Hagersten队列(三位科学家的名字简写),是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列FIFO。
JUC的locks
包下:
- AbstractOwnableSynchronizer
- AbstractQueuedLongSynchronizer
- AbstractQueuedSynchronizer
AQS是一个抽象的父类,可以将其理解为一个框架。基于AQS这个框架,我们可以实现多种同步器,比如下方图中的几个Java内置的同步器。同时我们也可以基于AQS框架实现我们自己的同步器以满足不同的业务场景需求。
AQS中使用了模板方法的设计模式,AQS父类中的 tryAcquire()
方法和 tryRelease()
方法中只有 throw new UnsupportedOperationException();
,说明如果子类不重写代码时被调用就会抛出异常:
AQS 能干嘛?
加锁会导致阻塞:有阻塞就需要排队,实现排队必然需要有某种形式的队列来进行管理
1、抢到资源的线程直接使用办理业务,抢占不到资源的线程的必然涉及一种排队等候机制,抢占资源失败的线程继续去等待(类似办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排队等候),仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)。
2、既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?
3、如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的结点(Node) ,通过CAS、自旋以及LockSuport.park()
的方式,维护state
变量的状态,使并发达到同步的效果。
AQS 结构
AQS使用一个volatile
的int
类型的成员变量state
来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,将每条要去抢占资源的线程封装成一个Node
,节点来实现锁的分配,通过CAS完成对state
值的修改。
1 | public abstract class AbstractQueuedSynchronizer |
四个比较重要的属性:
Node head
:指向哨兵节点Node tail
:指向尾结点,新来的线程将成为尾结点state
:代表当前锁的状态,0代表没有被占用,大于 0 代表有线程持有当前锁。这个值可以大于 1,是因为锁可以重入,每次重入都加上 1Thread exclusiveOwnerThread
:占有排它锁的线程,代表当前哪个线程正在占有着锁,后续抢占锁时将依据该值是否为null
来判断锁是否已经被占用;释放锁时会将该值设置为null
Node 类结构
Node 的数据结构其实就是 thread
+ waitStatus
+ pre
+ next
四个属性而已。
1 | static final class Node { |
AQS 队列基本结构
注意排队队列,不包括head(也就是后文要说的哨兵节点)。
CANCELLED 1 因为超时或中断设置为此状态,标志节点不可用
SIGNAL -1 处于此状态的节点释放资源时会唤醒后面的节点
CONDITION -2 处于条件队列里,等待条件成立(signal signalall) 条件成立后会置入获取资源的队列里
PROPAGATE -3 共享模式下使用,头节点获取资源时将后面节点设置为此状态,如果头节点获取资源后还有足够的资源,则后面节点会尝试获取,这个状态主要是为了共享状态下队列里足够多的节点同时获取资源
0 初始状态
Node 的 waitStatus 的含义
AQS 整体工作流程
本文将以 ReentrantLock
源码为例介绍AQS源码。
Lock
接口的实现类,基本都是通过聚合了一个队列同步器的子类完成线程访问控制的,即每个实现类内部都有一个AQS的子类 Sync:
同时 Sync 类有两个子类:FairSync、NonfairSync。继承关系:
加锁阶段流程示意图:
解锁阶段流程示意图:
AQS 加锁过程源码分析
1. 构造器中创建公平锁或非公平锁 sync
ReentrantLock
的对象在实例化时将根据构造器中传入参数的不同创建公平锁或非公平锁对象sync
:
公平锁与非公平锁的区别在于:公平锁在获取同步状态时多了一个限制条件:!hasQueuedPredecessors()
,其作用是在加锁时判断等待队列中前面是否已经有正在排队的线程。
hasQueuedPredecessors()
方法:
Predecessors:前辈
hasQueuedPredecessors()
中判断队列中是否前面还有正在排队的线程,导致公平锁和非公平锁的差异如下:
- 公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中,不会直接去抢占;
- 非公平锁:不管等待队列中是否有其他线程正在等待,直接尝试占有锁。
非公平锁的非公平体现在:其 lock()
方法一开始会先CAS尝试抢锁,若失败则进入队列中等待,这之后就和公平锁一样要按照顺序排队了。
2. sync.lock():外部程序尝试加锁
在主程序调用 ReentrantLock
的 lock()
方法尝试加锁后,ReentrantLock
对象将调用内聚的公平锁或非公平锁sync
对象的lock()
方法(公平锁和非公平锁的lock()
方法有所区别,体现在非公平锁会先尝试抢占一次锁):
此处以非公平锁 NonfairSync
为例分析 lock()
方法的细节:
首先,试图将状态量state
设置为1,此时是使用的CAS机制,底层调用Unsafe
类的 native
方法进行原子操作。
- 若设置成功,说明没有线程占有锁,则将当前线程设置为占有锁的线程。
- 若设置失败,说明已经有线程正在占有锁,则调用
acquire(1)
尝试再次抢占锁(因为可能有重入锁的情况发生,即当前线程再次重入该锁,自然是需要放行的),若还是抢占失败,则进入队列排队等待。
其中,compareAndSetState(0,1)
方法底层调用Unsafe
类的 native
方法进行原子操作:
3. acquire()
该方法为 AQS 父类中的方法,为公平锁和非公平锁公用
若设置失败,说明已经有线程正在占有锁,则调用 acquire(1)
尝试再次抢占锁。acquire()
方法内:
- 首先调用子类重写的
tryAcquire(arg)
方法尝试占有锁。若占有成功,则直接返回,无需执行后续操作 - 若占有失败,则调用
addWaiter()
方法将当前线程封装成Node
类型的节点,添加到等待队列(双向链表)的尾部 - 调用
acquireQueued()
方法将当前节点的前一个节点的waitStatus
设置为-1,并调用LockSupport.park()
方法阻塞当前线程
下面逐一分析上述三个方法的细节。
4. tryAcquire():尝试占有锁
首先以非公平锁为例,执行tryAcquire(arg)
方法时将调用非公平锁重写的 nonfairTryAcquire()
方法:
- 再次CAS,试图抢占锁。大多数情况还会抢占失败,但也可能在此时占有锁的线程恰好释放锁,那么当前线程就可以顺利抢占到锁,并将当前线程设置为占有锁的线程,并返回 true 代表尝试获取锁成功。这也是一种双重检验的思想(外面抢占一次失败,进来再抢占一次)。
- 如果当前是重入锁的情况(即当前线程和占有锁的线程是同一个线程,也代表了当前锁被同一个线程重入了),那么将状态量
state
大小增加,并返回 true 代表尝试获取锁成功。 - 若1、2两次判断都不成立,则返回 false 代表尝试获取锁失败。
若返回false,代表抢占锁失败,则进行下一步:addWaiter()
加入到等待队列(双向链表)中。
公平锁与非公平锁在 tryAcquire()
方法中的区别在于:公平锁在获取同步状态时多了一个限制条件:!hasQueuedPredecessors()
,其作用是在加锁时判断等待队列中前面是否已经有正在排队的线程。
因此公平锁的线程在抢占锁前会判断当前队列中是否已经有正在排队的线程,如果有就不会尝试抢占锁,直接进入队列排队等待。
5. addWaiter():添加到等待队列中
在addWaiter()
方法中,将当前线程封装成一个Node
节点,并添加到等待队列(双向链表)的队尾。将tail指向当前节点。
- 橙黄色区域的代码在
tail != null
时才会触发,代表当前队列中已经有了尾结点,说明当前线程不是第一个来排队的线程,无需再创建哨兵节点(head指向的头节点) - 粉色区域的代码在
tail = null
时触发,代表当前队列中还没有尾结点,说明当前线程是第一个来排队的线程,需要创建哨兵节点(head指向的头节点)
哨兵节点:又称为虚节点,傀儡节点。其内不存储任何信息,只是占位。真正有数据的节点是从第二个开始的。
粉色区域的 enq(node)
方法只会执行一次,用于创建哨兵节点(head指向的头节点):
其中,再次if == null
判断的目的是双重检验,防止刚进来时尾结点就恰好被其他线程设置了。else内的代码是为了在这种情况下将当前节点作为那个恰好加塞的节点的后驱节点。同时该过程采用CAS思想,一直自旋直到某一次成功将当前节点原子性地加入到队列中。
compareAndSetXxx()
:都是调用底层 Unsafe 类的 native 方法,保证修改的原子性。
该过程示意图:
订正:队列中B的
waitStatus
应该等于-1,因为在线程C排队时会将其前一个节点B的waitStatus
设置为 -1(waitStatus
的值将在下一步acquireQueued()
被设置)
6. acquireQueued():阻塞当前线程
当执行完 addWaiter()
后,当前节点就被加入到了队列中排队等待。
此时再调用 acquireQueued()
方法阻塞当前线程:
- 在 for 循环中每次都取出当前节点的前一个节点
p
- 判断
p
是否是哨兵节点,如果是,说明当前节点排在队首,那么尝试抢占一次锁。若抢占成功,取消当前的哨兵节点的引用(未来将被GC),把当前节点设为新的哨兵节点,并开始占有锁。退出 for 循环 - 若
p
不是哨兵节点或当前节点抢占失败,则调用shouldParkAfterFailedAcquire()
方法将前一个节点的waitStatus
设置为 -1,并在parkAndCheckInterrupt()
方法中调用LockSupport.park(this)
阻塞当前线程,直到其前面的线程都出队并且自己被unpark()
唤醒,才会取消阻塞再次进入 for 循环自旋尝试获取锁(因为可能其他非公平锁会在此时抢占锁,那么当前线程又要阻塞)。
上述过程将一直在 for 循环中自旋,直到当前节点前面所有的节点都出队,并且锁被释放掉,parkAndCheckInterrupt()
方法才会从阻塞状态被唤醒,从而执行橙黄色框内的方法:取消当前的哨兵节点的引用(未来将被GC),把当前节点设为新的哨兵节点,并开始占有锁(该方法在后文解锁时才会执行,具体分析见下文)。
shouldParkAfterFailedAcquire()
方法:
当经过 shouldParkAfterFailedAcquire()
方法将前一个节点的 waitStatus
设置为 Node.SIGNAL=-1
(代表等待锁释放的状态)后,再执行 parkAndCheckInterrupt()
方法阻塞当前线程,本质是使用 LockSupport.park()
方法:
调用该方法后,当前线程将阻塞在此,等待前面所有线程执行完毕后被 LockSupport.unpark()
方法唤醒。
经过上面的分析,每个后来的线程都会在队列中排队并阻塞等待锁的释放。下面开始介绍解锁时的流程:
AQS 解锁过程源码分析
1. sync.release(1):外部程序尝试解锁
在主程序调用 ReentrantLock
的 unlock()
方法释放锁后,ReentrantLock
对象将调用内聚的公平锁或非公平锁sync
对象的release()
方法,该方法在AQS父类中:
该方法为 AQS 父类中的方法,为公平锁和非公平锁公用
- 首先调用
tryRelease()
方法尝试释放锁,其先将AQS中的state减一(代表释放一个锁),再将AQS中的占有线程设置为null
- 接着获取哨兵节点,若其不为空且
waitStatus != 0
,说明其后已经有了在排队的线程,该线程将waitStatus
设置为-1,见上文 【6. acquireQueued():阻塞当前线程】分析 - 满足 if 判断后调用
unparkSuccessor(h)
方法,该方法内将调用LockSupport.unpark(s.thread)
方法将排在队首的线程唤醒。
接下来将介绍这两个方法的内容:
2. tryRelease():尝试释放锁
tryRelease()
方法内定义在AQS父类中,使用了模板方法的设计模式, tryRelease()
方法中只有 throw new UnsupportedOperationException();
,说明如果子类不重写代码时被调用就会抛出异常:
公平锁和非公平锁重写了该方法,并且内容一致(不同于tryAcquire
方法子类重写的不同):
- 首先将AQS中的
state
值减一 - 然后判断当前线程和拥有锁的线程是否是同一个线程,若不是,则抛出异常
IllegalMonitorStateException
- 若是同一个线程,则判断减一后的
state
是否为0,若是,则代表目前没有线程正占有着锁。则将占有锁的线程设置为null
该方法执行完毕后,状态量 state
减少了一,并且占有锁的线程被设置为了 null
3. unparkSuccessor(h):唤醒队首线程
接着将哨兵节点 h
传入 unparkSuccessor(h)
方法:
该方法将调用 LockSupport.unpark(s.thread)
方法将排在队首的线程唤醒,此时将分析上文中阻塞等待的方法【6. acquireQueued():阻塞当前线程】:
根据上文分析,此时队首的线程阻塞在绿色框内的 shouldParkAfterFailedAcquire()
方法:
那么在唤醒了该线程后,程序继续向下执行,程序没有被中断过,所以 return Thread.interrupted()
将返回 false。
接着再次自旋后执行橙黄色框的 if 判断,此时 p == head
条件满足,则执行 tryAcquire(arg)
尝试抢占锁。因为此时的锁已经被解开了,当前队首线程就能够抢占成功,则进入橙黄色框内的代码:
该代码块的作用是:取消当前的哨兵节点的引用,从队列中移除(未来将被GC),并把当前节点设为新的哨兵节点,并开始占有锁。
该过程示意图: