【JUC】JUC 常用锁

JUC 简介

JUC 就是 java.util .concurrent 工具包的简称。这是一个处理线程的工具包,JDK 1.5开始出现的。

可重入锁

synchronizedLockReentrantLock)都是可重入锁(又称为递归锁)。synchronized是隐式锁,不用手工上锁与解锁,而 LockReentrantLock)为显式锁,需要手工上锁与解锁。

可重入锁作用:避免某一对象递归调用同一方法时产生死锁,一个线程拿到了外层的锁就可以无视内部的锁。前提:必须是同一把锁

有了可重入锁之后,破解第一把锁之后就可以一直进入到内层结构,直接无视内部的其他锁。也就是说,线程可以进入任何一个它已经拥有的锁所同步着的代码块(只能进入同一把锁的同步代码块)。

可重入的原理:AQS 里每次获取锁的时候,看下当前维护的那个线程和当前请求的线程是否一样,一样就可重入了

使用案例

下面分别为synchronizedLock锁的演示。注意:他们加的是同一把锁。若锁不同,还是无法进入。

1
2
3
4
5
6
7
8
9
10
11
12
Object o = new Object();
new Thread(()->{
synchronized(o) {
System.out.println(Thread.currentThread().getName() + " 外层");
synchronized (o) {
System.out.println(Thread.currentThread().getName() + " 中层");
synchronized (o) {
System.out.println(Thread.currentThread().getName() + " 内层");
}
}
}
},"t1").start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class SyncLockDemo {
public synchronized void add() {
add();
}

public static void main(String[] args) {
// Lock演示可重入锁
Lock lock = new ReentrantLock();
// 创建线程
new Thread(()->{
try {
// 上锁
lock.lock();
System.out.println(Thread.currentThread().getName()+" 外层");
try {
// 上锁
lock.lock();
System.out.println(Thread.currentThread().getName()+" 内层");
}finally {
// 释放锁
lock.unlock();
}
}finally {
// 释放锁
lock.unlock();
}
},"t1").start();
// 创建新线程
new Thread(()->{
lock.lock();
System.out.println("aaaa");
lock.unlock();
},"aa").start();
}
}

锁的配对

锁之间要配对,加了几把锁,最后就得解开几把锁,下面的代码编译和运行都没有任何问题。但锁的数量不匹配会导致死循环。

1
2
3
4
5
6
7
8
lock.lock();
lock.lock();

try{
someAction();
}finally{
lock.unlock();
}

锁超时

使用 tryLock() 方法将在加锁失败时放弃继续加锁,因此可以避免死锁的发生。使用 tryLock() 方法可以解决哲学家进餐问题。

超时等待模式的 tryLock() 使用保护性暂停模式,在其他线程调用 lock.unlock() 时会立刻通知调用 tryLock() 阻塞的线程,使其再次尝试加锁(此时该线程还在等待时间范围内阻塞着)。

  1. 尝试加锁失败时立即停止加锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("启动...");
if (!lock.tryLock()) {
log.debug("获取立刻失败,返回");
return;
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(2);
} finally {
lock.unlock();
}
  1. 尝试加锁失败时等待一段时间,若仍失败则再停止
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("启动...");
try {
if (!lock.tryLock(1, TimeUnit.SECONDS)) {
log.debug("获取等待 1s 后失败,返回");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(2);
} finally {
lock.unlock();
}

synchronized 和 Lock 的区别

  • 原始构成sync是JVM层面的,底层通过monitorentermonitorexit来实现的。Lock是JDK API层面的。(sync一个enter会有两个exit,一个是正常退出,一个是异常退出)
  • 使用方法sync不需要手动释放锁(若发生异常也会自动释放),而Lock需要手动释放unlock()(通常写在finally代码块中防止发生异常无法释放同步监视器)
  • 是否可中断sync不可中断,除非抛出异常或者正常运行完成。Lock是可中断的(lock.lockInterruptbily()),通过调用interrupt()方法可以打断锁,会抛出异常。
  • 是否为公平锁sync只能是非公平锁,而Lock既能是公平锁,又能是非公平锁。
  • 绑定多个条件sync不能,只能随机唤醒。而Lock可以通过Condition来绑定多个条件,精确唤醒。
  • 是否可以设置超时时间sync不能,而Lock可以设置超时时间,使用tryLock()方法尝试加锁失败后就放弃加锁

Lock 的优势:

  • 可设置为可以被打断 lock.lockInterruptbily()
  • 可以设置超时时间
  • 可以设置条件变量Condition,精确唤醒

优先使用顺序:Lock——> 同步代码块(已经进入了方法体,分配了相应资源)——> 同步方法(在方法体之外)

  • 在高并发场景下,Lock 的性能远优于 synchronized
  • 在并发度不高的场景下,sync 效率更高(得益于偏向锁和轻量级锁的设计)

公平锁与非公平锁

概念:所谓公平锁,就是多个线程按照申请锁的顺序来获取锁,类似排队,先到先得。而非公平锁,则是多个线程抢夺锁,会导致优先级反转饥饿现象

区别:公平锁在获取锁时先查看此锁维护的等待队列为空或者当前线程是等待队列的队首,则直接占有锁,否则插入到等待队列,FIFO原则。非公平锁比较粗鲁,上来直接先尝试占有锁,失败则采用公平锁方式。非公平锁的优点是吞吐量比公平锁更大。

  • 公平锁:效率相对低,线程不会饿死
  • 非公平锁:效率高,但线程可能饿死

非公平锁可能造成其他线程饿死的情况,即某一个线程拿着锁一直工作,其他线程一直拿不到锁。

synchronizedReentrantLock默认都是非公平锁ReentrantLock在构造的时候传入true则是公平锁

1
ReentrantLock lock = new ReentrantLock(true);

不传参时,使用的是非公平锁;传入true时使用的是公平锁:

image-20210917094529785

FairSync 内:

image-20210917094646045

一般没必要设置为公平锁,因为其会降低并发度

自旋锁

SpinLock:所谓自旋锁,就是尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取。自己在那儿一直循环获取,就像“自旋”一样。这样的好处是减少线程切换的上下文开销,缺点是会消耗CPU。CAS底层的getAndAddInt就是自旋锁思想。

提到了互斥同步对性能最大的影响阻塞的实现,挂起线程和恢复线程的操作都需要转入内核态完成,这些操作给系统的并发性能带来了很大的压力。同时,虚拟机的开发团队也注意到在许多应用上,共享数据的锁定状态只会持续很短的一段时间,为了这段时间去挂起和恢复线程并不值得。如果物理机器有一个以上的处理器,能让两个或以上的线程同时并行执行,我们就可以让后面请求锁的那个线程 “稍等一下”,但不放弃处理器的执行时间,看看持有锁的线程是否很快就会释放锁。为了让线程等待,我们只需让线程执行一个忙循环(自旋),这项技术就是所谓的自旋锁。

《深入理解JVM.2nd》Page 398

1
2
// 跟CAS类似,一直循环比较。
while (!atomicReference.compareAndSet(null, thread)) { }

基于原子引用手写一个自旋锁 AtomicReference<Thread>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class SpinLockDemo {
// 现在的泛型装的是Thread,原子引用线程
AtomicReference<Thread> atomicReference = new AtomicReference<>();

public void myLock() {
// 获取当前进来的线程
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "\t come in ");

// 开始自旋,期望值是null,更新值是当前线程,如果是null,则更新为当前线程,否者自旋
while(!atomicReference.compareAndSet(null, thread)) {
//摸鱼
}
}

public void myUnLock() {
// 获取当前进来的线程
Thread thread = Thread.currentThread();

// 自己用完了后,把atomicReference变成null
atomicReference.compareAndSet(thread, null);

System.out.println(Thread.currentThread().getName() + "\t invoked myUnlock()");
}

public static void main(String[] args) {
SpinLockDemo spinLockDemo = new SpinLockDemo();

// 启动t1线程,开始操作
new Thread(() -> {

// 开始占有锁
spinLockDemo.myLock();

try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}

// 开始释放锁
spinLockDemo.myUnLock();

}, "t1").start();


// 让main线程暂停1秒,使得t1线程,先执行
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}

// 1秒后,启动t2线程,开始占用这个锁
new Thread(() -> {

// 开始占有锁
spinLockDemo.myLock();
// 开始释放锁
spinLockDemo.myUnLock();

}, "t2").start();
}
}

自旋锁:竞争锁的失败的线程,并不会真实的在操作系统层面挂起等待,而是JVM会让线程做几个空循环(基于预测在不久的将来就能获得),在经过若干次循环后,如果可以获得锁,那么进入临界区,如果还不能获得锁,才会真实的将线程在操作系统层面进行挂起。

适用场景:自旋锁可以减少线程的阻塞,这对于锁竞争不激烈,且占用锁时间非常短的代码块来说,有较大的性能提升,因为自旋的消耗会小于线程阻塞挂起操作的消耗。
如果锁的竞争激烈,或者持有锁的线程需要长时间占用锁执行同步块,就不适合使用自旋锁了,因为自旋锁在获取锁前一直都是占用cpu做无用功,线程自旋的消耗大于线程阻塞挂起操作的消耗,造成cpu的浪费。

作者回复: 不错,自旋是种乐观情况的优化

读写锁

读写锁设计主要解决什么问题?用于读多写少情况下读操作不会阻塞等待提高并发性

读锁共享的写锁独占的ReentrantLocksynchronized都是独占锁,独占锁就是一个锁只能被一个线程所持有。有的时候,需要读写分离,那么就要引入读写锁,即ReentrantReadWriteLock

读写锁 ReentrantReadWriteLock:一个资源可以被多个读线程同时访问,也可以单独被一个写线程访问,但不能同时存在读写线程,读写互斥,读读共享。

读写锁用于保证所有线程一定能读取到最新数据,用法是:在改数据时加写锁,在读数据时加读锁。修改数据期间:

  • 写锁是一个独占锁(互斥锁/排他锁)
  • 读锁是一个共享锁

写锁如果没有释放,读锁就得一直阻塞等待。

  • 创建读锁:ReentrantReadWriteLock.readLock()
  • 创建写锁:ReentrantReadWriteLock.writeLock()

读写锁有以下三个重要的特性:

  • 公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
  • 重进入:读锁和写锁都支持线程重进入。
  • 锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。

具体案例:模拟多线程在 Map 中取数据和读数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// 资源类
class MyCache {
// 创建map集合
private volatile Map<String,Object> map = new HashMap<>();

// 创建读写锁对象
private ReadWriteLock rwLock = new ReentrantReadWriteLock();

// 放数据
public void put(String key,Object value) {
// 添加写锁
rwLock.writeLock().lock();

try {
System.out.println(Thread.currentThread().getName()+" 正在写操作"+key);
// 暂停一会
TimeUnit.MICROSECONDS.sleep(300);
// 放数据
map.put(key,value);
System.out.println(Thread.currentThread().getName()+" 写完了"+key);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放写锁
rwLock.writeLock().unlock();
}
}

// 取数据
public Object get(String key) {
// 添加读锁
rwLock.readLock().lock();
Object result = null;
try {
System.out.println(Thread.currentThread().getName()+" 正在读取操作"+key);
// 暂停一会
TimeUnit.MICROSECONDS.sleep(300);
result = map.get(key);
System.out.println(Thread.currentThread().getName()+" 取完了"+key);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放读锁
rwLock.readLock().unlock();
}
return result;
}
}

public class ReadWriteLockDemo {
public static void main(String[] args) throws InterruptedException {
MyCache myCache = new MyCache();
// 创建线程放数据
for (int i = 1; i <=5; i++) {
final int num = i;
new Thread(()->{
myCache.put(num+"",num+"");
}, String.valueOf(i)).start();
}

TimeUnit.MICROSECONDS.sleep(300);

// 创建线程取数据
for (int i = 1; i <=5; i++) {
final int num = i;
new Thread(()->{
myCache.get(num+"");
}, String.valueOf(i)).start();
}
}
}

四种不同情况下的锁(两个线程并发情况下):

  • 读 + 读:相当于无锁,并发读。
  • 写 + 读:读锁阻塞等待写锁释放才能读
  • 写 + 写:后来的写锁阻塞等待前一个写锁释放才能写
  • 读 + 写:后来的写锁阻塞等待其他读锁释放才能写(防止前一个线程还没读到数据就被后来的线程修改了)

只要有写锁的存在,其他线程无论是先写还是后写,都必须阻塞等待。

读写锁的缺点:

  • 可能造成锁饥饿:大量的读操作会让写操作一直阻塞等待
  • 锁降级:某个线程先上写锁,再上读锁,执行完业务后先释放写锁,再释放读锁。这样外部看来就是写锁降级退化成了读锁。反过来却不行,先加读锁后不能再加写锁。

总结

  • 在线程持有读锁的情况下,该线程和其他线程均不能取得写锁(因为获取写锁的时候,如果发现当前的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)
  • 在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败),但其他线程获取读锁

原因:当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读锁。

倒计时锁 CountDownLatch

CountDownLatch 常用来进行线程间的同步协作,例如主线程等待所有线程都完成任务后再执行

CountDownLatch类可以设置一个计数器,然后通过 countDown() 方法来进行减 1 的操作,使用 await() 方法等待计数器不大于 0,然后继续执行 await() 方法之后的语句。具体步骤可以演化为定义一个类,减1操作,并等待到0,为0执行结果。其缺点是,只能在构造方法里设置 count,一旦用完后不能再重置了

该类的构造方法为:CountDownLatch(int count)

两个常用的主要方法:

  • await():使当前线程在锁存器倒计数至零之前一直在等待,除非线程被中断
  • countDown():递减锁存器的计数,如果计数达到零,将释放所有等待的线程

其应用场景:

  • 同步等待多个线程准备完毕:例如游戏开始准备。主线程调用 await() 方法阻塞等待其他线程(所有玩家线程)都准备完毕,各个线程在准备完毕后调用 countDown() 方法。主线程等到所有线程都调用完毕后才被唤醒
  • 同步等待多个远程调用完毕:例如 RPC 调用。主线程调用 await() 方法阻塞等待其他线程(RPC 调用线程)都完成远程调用后再执行。异步编排的 CompletableFuture 类就采用这种思想。

该类的缺点:只能得知其他线程都执行完毕,但无法得到这些线程的执行结果,若必须获得返回结果,可以使用 FutureCompletableFutrue

具体案例:6个同学陆续离开教室之后,班长才能锁门。如果不加 CountDownLatch 类,会出现线程混乱执行,同学还未离开教室班长就已经锁门了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class CountDownLatchDemo {
//6 个同学陆续离开教室之后,班长锁门
public static void main(String[] args) throws InterruptedException {
// 创建CountDownLatch对象,设置初始值
CountDownLatch countDownLatch = new CountDownLatch(6);

// 6个同学陆续离开教室之后
for (int i = 1; i <=6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" 号同学离开了教室");
// 计数-1
countDownLatch.countDown();
}, String.valueOf(i)).start();
}

// 阻塞等待直到计数器中的数为0
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+" 班长锁门走人了");
}
}

循环栅栏 CyclicBarrier

CyclicBarrier 也是用来进行线程间的同步协作,作用与 CountDownLatch 非常相似,区别在于:

  • CyclicBarriercount 值是可以重置的,而 CountDownLatch 则不行,因此得名为循环栅栏
  • CyclicBarriercount 是从零开始增加直到临界点,而 CountDownLatchcount 是从最大值减小到零

二者的侧重点不同:

  • CountDownLatch 一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行
  • CyclicBarrier 一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行
  • CountDownLatch 是不能够重用的,而 CyclicBarrier 是可以重用的

CyclicBarrier 可以被比喻为『人满发车』

CyclicBarrier 类是一个同步辅助类,允许一组线程互相等待,直至到达某个公共屏障点,在设计一组固定大小的线程的程序中,这些线程必须互相等待直至达到公共屏障点。这个类很有用,因为barrier在释放等待线程后可以重用,所以称为循环barrier。

创建一个新的CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动barrier时执行给定的屏障操作,该操作由最后一个进入barrier的线程操作。

常用的构造方法:CyclicBarrier(int parties,Runnable barrierAction)

常用的方法有:await():在所有的线程都已经在此barrier上调用 await() 方法之前一直等待

案例

案例一:集齐7颗龙珠就可以召唤神龙

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 集齐7颗龙珠就可以召唤神龙
public class CyclicBarrierDemo {
// 创建固定值
private static final int NUMBER = 7;

public static void main(String[] args) {
// 创建CyclicBarrier,只有在7个线程都调用了await()方法才会执行其事先设置好的代码
CyclicBarrier cyclicBarrier =
new CyclicBarrier(NUMBER, ()->{
System.out.println("***** 集齐7颗龙珠了!召唤神龙! *****");
});

// 集齐七颗龙珠过程
for (int i = 1; i <=7; i++) {
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+" 星龙被收集到了");
// 等待
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}

上述代码中,只有在7个线程都调用了 await() 方法才会执行 CyclicBarrier对象事先设置好的代码“召唤神龙”。

案例二:来自《Java编程思想》的例子,展现CyclicBarrier的可循环性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import java.util.concurrent.*;
import java.util.*;

class Horse implements Runnable {
private static int counter = 0;
private final int id = counter++;
private int strides = 0;
private static Random rand = new Random(47);
private static CyclicBarrier barrier;

public Horse(CyclicBarrier b) {
barrier = b;
}

public synchronized int getStrides() {
return strides;
}

public void run() {
try {
while (!Thread.interrupted()) {//没有中断,就不断循环
synchronized (this) {
//模拟马单位时间的移动距离
strides += rand.nextInt(3); // Produces 0, 1 or 2
}
barrier.await();//<---等待其他马到齐到循环屏障
}
} catch (InterruptedException e) {
// A legitimate way to exit
} catch (BrokenBarrierException e) {
// This one we want to know about
throw new RuntimeException(e);
}
}

public String toString() {
return "Horse " + id + " ";
}

public String tracks() {
StringBuilder s = new StringBuilder();
for (int i = 0; i < getStrides(); i++)
s.append("*");
s.append(id);
return s.toString();
}
}

public class HorseRace {
static final int FINISH_LINE = 75;
private List<Horse> horses = new ArrayList<Horse>();
private ExecutorService exec = Executors.newCachedThreadPool();
private CyclicBarrier barrier;

public HorseRace(int nHorses, final int pause) {
//初始化循环屏障
barrier = new CyclicBarrier(nHorses, new Runnable() {
// 循环多次执行的任务
public void run() {

// The fence on the racetrack
StringBuilder s = new StringBuilder();
for (int i = 0; i < FINISH_LINE; i++)
s.append("=");
System.out.println(s);

//打印马移动距离
for (Horse horse : horses)
System.out.println(horse.tracks());

//判断有没有马到终点了
for (Horse horse : horses)
if (horse.getStrides() >= FINISH_LINE) {
System.out.println(horse + "won!");
exec.shutdownNow();// 有只马跑赢了,所有任务都结束了
return;
}

try {
TimeUnit.MILLISECONDS.sleep(pause);
} catch (InterruptedException e) {
System.out.println("barrier-action sleep interrupted");
}
}
});
// 开跑!
for (int i = 0; i < nHorses; i++) {
Horse horse = new Horse(barrier);
horses.add(horse);
exec.execute(horse);
}
}

public static void main(String[] args) {
int nHorses = 7;
int pause = 200;
new HorseRace(nHorses, pause);
}
}

总结CyclicBarrier的构造方法第一个参数是目标障碍数,每执行一次 await() 方法障碍数会加一,如果达到了目标障碍数,才会执行事先设置好的代码“召唤神龙”。

信号量 Semaphore

Semaphore 本质上也是使用 AQS 实现的,可以理解为是一个支持多个线程同时占有锁ReentrantLock

信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。正常的锁在任何时刻都只允许一个任务访问一项资源,而 Semaphore 允许n个任务同时访问这个资源。

使用信号量可以实现:限制线程数量限流,保证同一时刻只能有有限个线程工作。

从概念上讲,信号量维护了一个许可集(车位集),如有必要,在许可可用前会阻塞每一个acquire()(没车位了就阻塞等待别人释放车位),然后再获取该许可。每个 release() 添加一个许可(释放一个车位),从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。

具体常用的构造方法:new Semaphore(3) 设置许可数量为3(3个车位)

具体常用的方法有:

  • acquire():从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,除非线程被中断
  • release():释放一个许可,将其返回给信号量

一般 acquire() 都会抛出异常,release()在 finally 代码块中执行。

CountDownLatch的问题是不能复用。比如count=3,那么加到3,就不能继续操作了。而Semaphore可以解决这个问题,比如6辆车3个停车位,对于CountDownLatch只能停3辆车,而Semaphore可以停6辆车,车位空出来后,其它车可以占有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 6辆汽车,停3个车位
public class SemaphoreDemo {
public static void main(String[] args) {
// 创建Semaphore,设置许可数量3(3个车位)
Semaphore semaphore = new Semaphore(3);

// 模拟6辆汽车
for (int i = 1; i <=6; i++) {
new Thread(()->{
try {
// 抢占一个许可(车位)
semaphore.acquire();

System.out.println(Thread.currentThread().getName()+" 抢到了车位");
// 设置随机停车时间
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
System.out.println(Thread.currentThread().getName()+" ------离开了车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放这个许可(车位)
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
}

当前共有三个车位(三个许可)。之后三个线程先后获取许可0,许可量为0,代表没有多余的许可给其他线程了。此后再想来获取许可的线程会阻塞等待其他线程释放许可,才能继续获取到许可。

Semaphore 原理

Semaphore 本质上也是使用 AQS 实现的,可以理解为是一个支持多个线程同时占有锁ReentrantLock。该类内部包含一个 AQS 的同步器类:

image-20220216163034024

其实现原理为:将构造方法里设置的信号量设置到 AQS 同步器类 Syncstate 属性中,使得最多只能有 N 个线程能抢占到锁(此时 state 减少为 0),其余的线程都使用 AQS 中的阻塞机制进入双向链表阻塞队列中等待,直到这些占有锁的线程释放锁,使得 state 大于 0,并唤醒阻塞队列中的线程,重新竞争锁。因此该流程和 AQS 非常相似。

  1. 初始状态:state = 3,代表可以有三个线程同时占有锁,其他线程都得进入阻塞队列

image-20220216162630701

  1. 其他线程都得进入阻塞队列

image-20220216163737770

  1. 某个时刻 Thread-4 释放了 permits,令 state = 1,并唤醒队列中的 Thread-0

image-20220216163817049

  1. Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark()
    下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态 (该过程属于 AQS 原理)

image-20220216163940159

总结

  • CountDownLatchCyclicBarrier 都能够实现线程之间的等待,只不过它们侧重点不同:
    • CountDownLatch 一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行
    • CyclicBarrier 一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行
    • CountDownLatch 是不能够重用的,而 CyclicBarrier 是可以重用的
  • Semaphore 其实和锁有点类似,它一般用于控制对某组资源的访问权限,使得同时只能有有限个线程访问资源

阻塞队列 BlockingQueue

img

BlockingQueue 简介

JUC 线程池的底层应用了阻塞队列存储排队的线程

JUC 包中,BlockingQueue 很好地解决了多线程中如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。

阻塞队列是共享队列(多线程操作),一端输入,一端输出。不能无限放队列,满了之后就会进入阻塞,取出也同理

image-20210917175644768

  • 当队列是空的,从队列中获取元素的操作将会被阻塞
  • 当队列是满的,从队列中添加元素的操作将会被阻塞
  • 试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素
  • 试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增

在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起。

多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。

  • 当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列
  • 当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒

阻塞队列(BlockingQueue)是 java util.concurrent 包下重要的数据结构,BlockingQueue 提供了线程安全的队列访问方式:

  • 当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;
  • 从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。

并发包下很多高级同步类的实现都是基于 BlockingQueue 实现的。

阻塞队列的阻塞功能是通过 lock 锁的**条件变量(Condition)**阻塞控制实现的

  • 生产者在队列满或消费者在队列空时使用 condition.await() 方法 + while(true) 阻塞等待
  • 生产者在消费数据后消费者在在添加数据后使用 condition.signal() 唤醒阻塞线程

使用阻塞队列可以实现消息队列(生产者-消费者模型)


为什么需要 BlockingQueue?

在JUC包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。使用后我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue 都给你一手包办了。

体系CollectionQueueBlockingQueue → 七个阻塞队列实现类。

种类

类名 作用
ArrayBlockingQueue 数组构成的有界阻塞队列
LinkedBlockingQueue 链表构成的有界阻塞队列
PriorityBlockingQueue 支持优先级排序的无界阻塞队列
DelayQueue 支持优先级的延迟无界阻塞队列
SynchronousQueue 单个元素的阻塞队列
LinkedTransferQueue 由链表构成的无界阻塞队列
LinkedBlockingDeque 由链表构成的双向阻塞队列

粗体标记的三个用得比较多,许多消息中间件底层就是用它们实现的

需要注意的是LinkedBlockingQueue虽然是有界的,但有个巨坑,其默认大小是Integer.MAX_VALUE,高达21亿,一般情况下内存早爆了(在线程池的ThreadPoolExecutor有体现)。

1. ArrayBlockingQueue

数组结构组成的有界阻塞队列。ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是共用同一个锁对象,无法并行。

2. LinkedBlockingQueue

链表结构组成的有界阻塞队列(但大小默认值为integer.MAX_VALUE)。

之所以能够高效地处理并发数据,是因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

3. DelayQueue

使用优先级队列实现的延迟无界阻塞队列。

DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞

4. PriorityBlockingQueue

支持优先级排序无界阻塞队列。不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。

5. SynchronousQueue

不存储元素的阻塞队列,也即单个元素的队列。一种无缓冲的等待队列。相对于有缓冲的 BlockingQueue 来说,少了一个中间经销商的环节(缓冲区)。

声明一个 SynchronousQueue 有两种不同的方式,它们之间有着不太一样的行为。

公平模式和非公平模式的区别:

  • 公平模式:SynchronousQueue会采用公平锁,并配合一个 FIFO 队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
  • 非公平模式(默认):SynchronousQueue采用非公平锁,同时配合一个 LIFO 队列来管理多余的生产者和消费者

后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现即饥饿的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理

6. LinkedTransferQueue

链表结构组成的无界阻塞队列。

预占模式:意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,生成一个节点(节点元素为 null)入队,消费者线程被等待在这个节点上,生产者线程入队时发现有一个元素为 null 的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。

7. LinkedBlockingDeque

链表结构组成的双向阻塞队列。阻塞有两种情况:

  • 插入元素时:如果当前队列已满将会进入阻塞状态,一直等到队列有空的位置时再该元素插入,该操作可以通过设置超时参数,超时后返回 false 表示操作失败,也可以不设置超时参数一直阻塞,中断后抛出 InterruptedException 异常
  • 读取元素时:如果当前队列为空会阻塞住直到队列不为空然后返回元素,同样可以通过设置超时参数

常用方法

img

创建阻塞队列 BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);

抛出异常类型

执行该类型方法时,若阻塞队列已满/空,再向队列中插入/移除时将抛出异常

  • 加入元素:blockingQueue.add("a")
  • 检查元素:blockingQueue.element()
  • 取出元素:blockingQueue.remove()

特殊值类型

执行该类型方法时,若阻塞队列已满/空,再向队列中插入/移除时,成功返回true,失败返回false

  • 加入元素:blockingQueue.offer("a")
  • 检查元素:blockingQueue.peek()
  • 取出元素:blockingQueue.poll()

阻塞类型

执行该类型方法时,若阻塞队列已满/空,再向队列中插入/移除时,线程进入阻塞状态

  • 加入元素:blockingQueue.put("a");
  • 取出元素:blockingQueue.take()

超时类型

执行该类方法时,若阻塞队列已满/空,再向队列中插入/移除时,线程进入阻塞状态,但等待一段时间后还无法插入/移除数据时,线程将退出

  • 加入元素:blockingQueue.offer("w", 3L, TimeUnit.SECONDS)
  • 取出元素:blockingQueue.poll( 3L, TimeUnit.SECONDS)

SynchronousQueue

队列只有一个元素,如果想插入多个,必须等队列元素取出后,才能插入,只能有一个“坑位”,用一个插一个:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package concurrent.blockingqueue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<String>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "\t put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName() + "\t put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName() + "\t put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "AAA").start();
new Thread(() -> {
try {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t take " + blockingQueue.take());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t take " + blockingQueue.take());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t take" + blockingQueue.take());
} catch (Exception e) {
e.printStackTrace();
}
}, "BBB").start();
}
}

阻塞队列的应用——生产者消费者

传统模式

传统模式使用Lock来进行操作,需要手动加锁、解锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class ProdConsTradiDemo {
public static void main(String[] args) {
ShareData shareData = new ShareData();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.increment();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "Producer").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "Consumer").start();
}
}

class ShareData {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();

public void increment() throws InterruptedException {
lock.lock();
try {
//1 判断
while (number != 0) {
//等待,不能生产
condition.await();
}
//2 干活
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
//3 通知唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void decrement() throws InterruptedException {
lock.lock();
try {
//1 判断
while (number == 0) {
//等待,不能消费
condition.await();
}
//2 干活
number--;
System.out.println(Thread.currentThread().getName() + "\t" + number);
//3 通知唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

阻塞队列模式

使用阻塞队列就不需要手动加锁了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package concurrent.blockingqueue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class MyResource {
// 默认开启,进行生产消费
// 这里用到了volatile是为了保持数据的可见性,也就是当TLAG修改时,要马上通知其它线程进行修改
private volatile boolean FLAG = true;

// 使用原子包装类,而不用number++
private AtomicInteger atomicInteger = new AtomicInteger();

// 这里不能为了满足条件,而实例化一个具体的SynchronousBlockingQueue
BlockingQueue<String> blockingQueue = null;

// 而应该采用依赖注入里面的,构造注入方法传入
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
// 查询出传入的class是什么
System.out.println(blockingQueue.getClass().getName());
}

/**
* 生产
* @throws Exception
*/
public void myProd() throws Exception{
String data = null;
boolean retValue;
// 多线程环境的判断,一定要使用while进行,防止出现虚假唤醒
// 当FLAG为true的时候,开始生产
while(FLAG) {
data = atomicInteger.incrementAndGet() + "";

// 2秒存入1个data
retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if(retValue) {
System.out.println(Thread.currentThread().getName() + "\t 插入队列:" + data + "成功" );
} else {
System.out.println(Thread.currentThread().getName() + "\t 插入队列:" + data + "失败" );
}

try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

System.out.println(Thread.currentThread().getName() + "\t 停止生产,表示FLAG=false,生产介绍");
}

/**
* 消费
* @throws Exception
*/
public void myConsumer() throws Exception{
String retValue;
// 多线程环境的判断,一定要使用while进行,防止出现虚假唤醒
// 当FLAG为true的时候,开始生产
while(FLAG) {
// 2秒消耗1个data
retValue = blockingQueue.poll(2L, TimeUnit.SECONDS);
if(retValue != null && retValue != "") {
System.out.println(Thread.currentThread().getName() + "\t 消费队列:" + retValue + "成功" );
} else {
FLAG = false;
System.out.println(Thread.currentThread().getName() + "\t 消费失败,队列中已为空,退出" );

// 退出消费队列
return;
}
}
}

/**
* 停止生产的判断
*/
public void stop() {
this.FLAG = false;
}

}
public class ProdConsumerBlockingQueueDemo {

public static void main(String[] args) {
// 传入具体的实现类, ArrayBlockingQueue
MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10));

new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 生产线程启动");
System.out.println("");
System.out.println("");
try {
myResource.myProd();
System.out.println("");
System.out.println("");
} catch (Exception e) {
e.printStackTrace();
}
}, "prod").start();


new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 消费线程启动");

try {
myResource.myConsumer();
} catch (Exception e) {
e.printStackTrace();
}
}, "consumer").start();

// 5秒后,停止生产和消费
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("5秒中后,生产和消费线程停止,线程结束");
myResource.stop();
}
}

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java.util.concurrent.ArrayBlockingQueue
producer 生产线程启动


consumer 消费线程启动
producer 插入队列:1成功
consumer 消费队列:1成功
producer 插入队列:2成功
consumer 消费队列:2成功
producer 插入队列:3成功
consumer 消费队列:3成功
producer 插入队列:4成功
consumer 消费队列:4成功
producer 插入队列:5成功
consumer 消费队列:5成功

5秒后,生产和消费线程停止,线程结束
producer 停止生产,表示FLAG=false,生产结束

consumer 消费失败,队列中已为空,退出