JUC 简介
JUC 就是 java.util .concurrent
工具包的简称。这是一个处理线程的工具包,JDK 1.5开始出现的。
可重入锁
synchronized
和 Lock
(ReentrantLock
)都是可重入锁(又称为递归锁)。synchronized
是隐式锁,不用手工上锁与解锁,而 Lock
(ReentrantLock
)为显式锁,需要手工上锁与解锁。
可重入锁作用:避免某一对象递归调用同一方法时产生死锁 ,一个线程拿到了外层的锁就可以无视内部的锁。前提:必须是同一把锁
有了可重入锁之后,破解第一把锁之后就可以一直进入到内层结构,直接无视内部的其他锁。也就是说,线程可以进入任何一个它已经拥有的锁所同步着的代码块(只能进入同一把锁的同步代码块)。
可重入的原理 :AQS 里每次获取锁的时候,看下当前维护的那个线程和当前请求的线程是否一样,一样就可重入了
使用案例
下面分别为synchronized
和 Lock
锁的演示。注意:他们加的是同一把锁。若锁不同,还是无法进入。
1 2 3 4 5 6 7 8 9 10 11 12 Object o = new Object(); new Thread(()->{ synchronized (o) { System.out.println(Thread.currentThread().getName() + " 外层" ); synchronized (o) { System.out.println(Thread.currentThread().getName() + " 中层" ); synchronized (o) { System.out.println(Thread.currentThread().getName() + " 内层" ); } } } },"t1" ).start();
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class SyncLockDemo { public synchronized void add () { add(); } public static void main (String[] args) { Lock lock = new ReentrantLock(); new Thread(()->{ try { lock.lock(); System.out.println(Thread.currentThread().getName()+" 外层" ); try { lock.lock(); System.out.println(Thread.currentThread().getName()+" 内层" ); }finally { lock.unlock(); } }finally { lock.unlock(); } },"t1" ).start(); new Thread(()->{ lock.lock(); System.out.println("aaaa" ); lock.unlock(); },"aa" ).start(); } }
锁的配对
锁之间要配对,加了几把锁,最后就得解开几把锁,下面的代码编译和运行都没有任何问题。但锁的数量不匹配会导致死循环。
1 2 3 4 5 6 7 8 lock.lock(); lock.lock(); try { someAction(); }finally { lock.unlock(); }
锁超时
使用 tryLock()
方法将在加锁失败时放弃继续加锁,因此可以避免死锁的发生。使用 tryLock()
方法可以解决哲学家进餐问题。
超时等待模式的 tryLock()
使用保护性暂停模式 ,在其他线程调用 lock.unlock()
时会立刻通知调用 tryLock()
阻塞的线程,使其再次尝试加锁(此时该线程还在等待时间范围内阻塞着)。
尝试加锁失败时立即停止加锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 ReentrantLock lock = new ReentrantLock(); Thread t1 = new Thread(() -> { log.debug("启动..." ); if (!lock.tryLock()) { log.debug("获取立刻失败,返回" ); return ; } try { log.debug("获得了锁" ); } finally { lock.unlock(); } }, "t1" ); lock.lock(); log.debug("获得了锁" ); t1.start(); try { sleep(2 ); } finally { lock.unlock(); }
尝试加锁失败时等待一段时间,若仍失败则再停止
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 ReentrantLock lock = new ReentrantLock(); Thread t1 = new Thread(() -> { log.debug("启动..." ); try { if (!lock.tryLock(1 , TimeUnit.SECONDS)) { log.debug("获取等待 1s 后失败,返回" ); return ; } } catch (InterruptedException e) { e.printStackTrace(); } try { log.debug("获得了锁" ); } finally { lock.unlock(); } }, "t1" ); lock.lock(); log.debug("获得了锁" ); t1.start(); try { sleep(2 ); } finally { lock.unlock(); }
synchronized 和 Lock 的区别
原始构成 :sync
是JVM层面的,底层通过monitorenter
和monitorexit
来实现的。Lock
是JDK API层面的。(sync
一个enter
会有两个exit
,一个是正常退出,一个是异常退出)
使用方法 :sync
不需要手动释放锁(若发生异常也会自动释放),而Lock
需要手动释放unlock()
(通常写在finally代码块中防止发生异常无法释放同步监视器)
是否可中断 :sync
不可中断,除非抛出异常或者正常运行完成。Lock
是可中断的(lock.lockInterruptbily()
),通过调用interrupt()
方法可以打断锁,会抛出异常。
是否为公平锁 :sync
只能是非公平锁 ,而Lock
既能是公平锁,又能是非公平锁。
绑定多个条件 :sync
不能,只能随机唤醒。而Lock
可以通过Condition
来绑定多个条件,精确唤醒。
是否可以设置超时时间 :sync
不能,而Lock
可以设置超时时间,使用tryLock()
方法尝试加锁失败后就放弃加锁
Lock
的优势:
可设置为可以被打断 lock.lockInterruptbily()
可以设置超时时间
可以设置条件变量Condition
,精确唤醒
优先使用顺序:Lock
——> 同步代码块(已经进入了方法体,分配了相应资源)——> 同步方法(在方法体之外)
在高并发场景下,Lock
的性能远优于 synchronized
在并发度不高的场景下,sync
效率更高(得益于偏向锁和轻量级锁的设计)
公平锁与非公平锁
概念 :所谓公平锁 ,就是多个线程按照申请锁的顺序 来获取锁,类似排队,先到先得。而非公平锁 ,则是多个线程抢夺锁,会导致优先级反转 或饥饿现象 。
区别 :公平锁在获取锁时先查看此锁维护的等待队列 ,为空 或者当前线程是等待队列的队首 ,则直接占有锁,否则插入到等待队列,FIFO原则。非公平锁比较粗鲁,上来直接先尝试占有锁 ,失败则采用公平锁方式。非公平锁的优点是吞吐量 比公平锁更大。
公平锁:效率相对低,线程不会饿死
非公平锁:效率高,但线程可能饿死
非公平锁可能造成其他线程饿死的情况,即某一个线程拿着锁一直工作,其他线程一直拿不到锁。
synchronized
和ReentrantLock
默认都是非公平锁 。ReentrantLock
在构造的时候传入true
则是公平锁 :
1 ReentrantLock lock = new ReentrantLock(true );
不传参时,使用的是非公平锁;传入true时使用的是公平锁:
FairSync
内:
一般没必要设置为公平锁,因为其会降低并发度
自旋锁
SpinLock :所谓自旋锁,就是尝试获取锁的线程不会立即阻塞 ,而是采用循环的方式去尝试获取 。自己在那儿一直循环获取,就像“自旋 ”一样。这样的好处是减少线程切换的上下文开销 ,缺点是会消耗CPU 。CAS底层的getAndAddInt
就是自旋锁 思想。
提到了互斥同步对性能最大的影响阻塞的实现,挂起线程和恢复线程的操作都需要转入内核态完成,这些操作给系统的并发性能带来了很大的压力。同时,虚拟机的开发团队也注意到在许多应用上,共享数据的锁定状态只会持续很短的一段时间,为了这段时间去挂起和恢复线程并不值得 。如果物理机器有一个以上的处理器,能让两个或以上的线程同时并行执行,我们就可以让后面请求锁的那个线程 “稍等一下”,但不放弃处理器的执行时间,看看持有锁的线程是否很快就会释放锁。为了让线程等待,我们只需让线程执行一个忙循环(自旋),这项技术就是所谓的自旋锁。
《深入理解JVM.2nd》Page 398
1 2 while (!atomicReference.compareAndSet(null , thread)) { }
基于原子引用手写一个自旋锁 AtomicReference<Thread>
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicReference;public class SpinLockDemo { AtomicReference<Thread> atomicReference = new AtomicReference<>(); public void myLock () { Thread thread = Thread.currentThread(); System.out.println(Thread.currentThread().getName() + "\t come in " ); while (!atomicReference.compareAndSet(null , thread)) { } } public void myUnLock () { Thread thread = Thread.currentThread(); atomicReference.compareAndSet(thread, null ); System.out.println(Thread.currentThread().getName() + "\t invoked myUnlock()" ); } public static void main (String[] args) { SpinLockDemo spinLockDemo = new SpinLockDemo(); new Thread(() -> { spinLockDemo.myLock(); try { TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { e.printStackTrace(); } spinLockDemo.myUnLock(); }, "t1" ).start(); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { spinLockDemo.myLock(); spinLockDemo.myUnLock(); }, "t2" ).start(); } }
自旋锁:竞争锁的失败的线程,并不会真实的在操作系统层面挂起等待,而是JVM会让线程做几个空循环(基于预测在不久的将来就能获得),在经过若干次循环后,如果可以获得锁,那么进入临界区,如果还不能获得锁,才会真实的将线程在操作系统层面进行挂起。
适用场景:自旋锁可以减少线程的阻塞,这对于锁竞争不激烈,且占用锁时间非常短的代码块来说,有较大的性能提升,因为自旋的消耗会小于线程阻塞挂起操作的消耗。
如果锁的竞争激烈,或者持有锁的线程需要长时间占用锁执行同步块,就不适合使用自旋锁了,因为自旋锁在获取锁前一直都是占用cpu做无用功,线程自旋的消耗大于线程阻塞挂起操作的消耗,造成cpu的浪费。
作者回复: 不错,自旋是种乐观情况的优化
读写锁
读写锁设计主要解决什么问题?用于读多写少 情况下读操作不会阻塞等待 ,提高并发性
读锁 是共享的 ,写锁 是独占的 。ReentrantLock
和synchronized
都是独占锁 ,独占锁就是一个锁 只能被一个线程 所持有。有的时候,需要读写分离 ,那么就要引入读写锁,即ReentrantReadWriteLock
。
读写锁 ReentrantReadWriteLock :一个资源可以被多个读线程同时访问,也可以单独被一个写线程访问,但不能同时存在读写线程,读写互斥,读读共享。
读写锁用于保证所有线程一定能读取到最新数据,用法是:在改数据时加写锁 ,在读数据时加读锁 。修改数据期间:
写锁是一个独占锁 (互斥锁/排他锁)
读锁是一个共享锁 。
写锁如果没有释放,读锁就得一直阻塞等待。
创建读锁:ReentrantReadWriteLock.readLock()
创建写锁:ReentrantReadWriteLock.writeLock()
读写锁有以下三个重要的特性:
公平选择性 :支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
重进入 :读锁和写锁都支持线程重进入。
锁降级 :遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。
具体案例:模拟多线程在 Map
中取数据和读数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 class MyCache { private volatile Map<String,Object> map = new HashMap<>(); private ReadWriteLock rwLock = new ReentrantReadWriteLock(); public void put (String key,Object value) { rwLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName()+" 正在写操作" +key); TimeUnit.MICROSECONDS.sleep(300 ); map.put(key,value); System.out.println(Thread.currentThread().getName()+" 写完了" +key); } catch (InterruptedException e) { e.printStackTrace(); } finally { rwLock.writeLock().unlock(); } } public Object get (String key) { rwLock.readLock().lock(); Object result = null ; try { System.out.println(Thread.currentThread().getName()+" 正在读取操作" +key); TimeUnit.MICROSECONDS.sleep(300 ); result = map.get(key); System.out.println(Thread.currentThread().getName()+" 取完了" +key); } catch (InterruptedException e) { e.printStackTrace(); } finally { rwLock.readLock().unlock(); } return result; } } public class ReadWriteLockDemo { public static void main (String[] args) throws InterruptedException { MyCache myCache = new MyCache(); for (int i = 1 ; i <=5 ; i++) { final int num = i; new Thread(()->{ myCache.put(num+"" ,num+"" ); }, String.valueOf(i)).start(); } TimeUnit.MICROSECONDS.sleep(300 ); for (int i = 1 ; i <=5 ; i++) { final int num = i; new Thread(()->{ myCache.get(num+"" ); }, String.valueOf(i)).start(); } } }
四种不同情况下的锁(两个线程并发情况下):
读 + 读 :相当于无锁,并发读。
写 + 读 :读锁阻塞等待写锁释放才能读
写 + 写 :后来的写锁阻塞等待前一个写锁释放才能写
读 + 写 :后来的写锁阻塞等待其他读锁释放才能写(防止前一个线程还没读到数据就被后来的线程修改了)
只要有写锁的存在,其他线程无论是先写还是后写,都必须阻塞等待。
读写锁的缺点:
可能造成锁饥饿:大量的读操作会让写操作一直阻塞等待
锁降级 :某个线程先上写锁,再上读锁,执行完业务后先释放写锁,再释放读锁。这样外部看来就是写锁降级退化成了读锁。反过来却不行,先加读锁后不能再加写锁。
总结
在线程持有读锁的情况下,该线程和其他线程均不能取得写锁(因为获取写锁的时候,如果发现当前的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)
在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败),但其他线程获取读锁
原因:当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读锁。
倒计时锁 CountDownLatch
CountDownLatch
常用来进行线程间的同步协作 ,例如主线程等待所有线程都完成任务后再执行 。
CountDownLatch
类可以设置一个计数器 ,然后通过 countDown()
方法来进行减 1 的操作,使用 await()
方法等待计数器不大于 0,然后继续执行 await()
方法之后的语句。具体步骤可以演化为定义一个类,减1操作,并等待到0,为0执行结果。其缺点是,只能在构造方法里设置 count
,一旦用完后不能再重置了 。
该类的构造方法为:CountDownLatch(int count)
两个常用的主要方法:
await()
:使当前线程在锁存器倒计数至零之前一直在等待,除非线程被中断
countDown()
:递减锁存器的计数,如果计数达到零,将释放所有等待的线程
其应用场景:
同步等待多个线程准备完毕 :例如游戏开始准备。主线程调用 await()
方法阻塞等待其他线程(所有玩家线程)都准备完毕,各个线程在准备完毕后调用 countDown()
方法。主线程等到所有线程都调用完毕后才被唤醒
同步等待多个远程调用完毕 :例如 RPC 调用。主线程调用 await()
方法阻塞等待其他线程(RPC 调用线程)都完成远程调用后再执行。异步编排的 CompletableFuture
类就采用这种思想。
该类的缺点:只能得知其他线程都执行完毕,但无法得到这些线程的执行结果,若必须获得返回结果,可以使用 Future
或 CompletableFutrue
。
具体案例:6个同学陆续离开教室之后,班长才能锁门。如果不加 CountDownLatch
类,会出现线程混乱执行,同学还未离开教室班长就已经锁门了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class CountDownLatchDemo { public static void main (String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(6 ); for (int i = 1 ; i <=6 ; i++) { new Thread(()->{ System.out.println(Thread.currentThread().getName()+" 号同学离开了教室" ); countDownLatch.countDown(); }, String.valueOf(i)).start(); } countDownLatch.await(); System.out.println(Thread.currentThread().getName()+" 班长锁门走人了" ); } }
循环栅栏 CyclicBarrier
CyclicBarrier
也是用来进行线程间的同步协作 ,作用与 CountDownLatch
非常相似,区别在于:
CyclicBarrier
的 count
值是可以重置的 ,而 CountDownLatch
则不行,因此得名为循环栅栏 。
CyclicBarrier
的 count
是从零开始增加直到临界点,而 CountDownLatch
的 count
是从最大值减小到零
二者的侧重点不同:
CountDownLatch
一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行
CyclicBarrier
一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行
CountDownLatch
是不能够重用的,而 CyclicBarrier
是可以重用的
CyclicBarrier
可以被比喻为『人满发车』
CyclicBarrier
类是一个同步辅助类 ,允许一组线程互相等待,直至到达某个公共屏障点 ,在设计一组固定大小的线程的程序中,这些线程必须互相等待直至达到公共屏障点。这个类很有用,因为barrier在释放等待线程后可以重用,所以称为循环barrier。
创建一个新的CyclicBarrier
,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动barrier时执行给定的屏障操作,该操作由最后一个进入barrier的线程操作。
常用的构造方法:CyclicBarrier(int parties,Runnable barrierAction)
常用的方法有:await()
:在所有的线程都已经在此barrier上调用 await()
方法之前一直等待
案例
案例一:集齐7颗龙珠就可以召唤神龙
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class CyclicBarrierDemo { private static final int NUMBER = 7 ; public static void main (String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, ()->{ System.out.println("***** 集齐7颗龙珠了!召唤神龙! *****" ); }); for (int i = 1 ; i <=7 ; i++) { new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+" 星龙被收集到了" ); cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } }, String.valueOf(i)).start(); } } }
上述代码中,只有在7个线程都调用了 await()
方法才会执行 CyclicBarrier
对象事先设置好的代码“召唤神龙”。
案例二:来自《Java编程思想》的例子,展现CyclicBarrier的可循环性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 import java.util.concurrent.*;import java.util.*;class Horse implements Runnable { private static int counter = 0 ; private final int id = counter++; private int strides = 0 ; private static Random rand = new Random(47 ); private static CyclicBarrier barrier; public Horse (CyclicBarrier b) { barrier = b; } public synchronized int getStrides () { return strides; } public void run () { try { while (!Thread.interrupted()) { synchronized (this ) { strides += rand.nextInt(3 ); } barrier.await(); } } catch (InterruptedException e) { } catch (BrokenBarrierException e) { throw new RuntimeException(e); } } public String toString () { return "Horse " + id + " " ; } public String tracks () { StringBuilder s = new StringBuilder(); for (int i = 0 ; i < getStrides(); i++) s.append("*" ); s.append(id); return s.toString(); } } public class HorseRace { static final int FINISH_LINE = 75 ; private List<Horse> horses = new ArrayList<Horse>(); private ExecutorService exec = Executors.newCachedThreadPool(); private CyclicBarrier barrier; public HorseRace (int nHorses, final int pause) { barrier = new CyclicBarrier(nHorses, new Runnable() { public void run () { StringBuilder s = new StringBuilder(); for (int i = 0 ; i < FINISH_LINE; i++) s.append("=" ); System.out.println(s); for (Horse horse : horses) System.out.println(horse.tracks()); for (Horse horse : horses) if (horse.getStrides() >= FINISH_LINE) { System.out.println(horse + "won!" ); exec.shutdownNow(); return ; } try { TimeUnit.MILLISECONDS.sleep(pause); } catch (InterruptedException e) { System.out.println("barrier-action sleep interrupted" ); } } }); for (int i = 0 ; i < nHorses; i++) { Horse horse = new Horse(barrier); horses.add(horse); exec.execute(horse); } } public static void main (String[] args) { int nHorses = 7 ; int pause = 200 ; new HorseRace(nHorses, pause); } }
总结 :CyclicBarrier
的构造方法第一个参数是目标障碍数 ,每执行一次 await()
方法障碍数会加一,如果达到了目标障碍数,才会执行事先设置好的代码“召唤神龙”。
信号量 Semaphore
Semaphore
本质上也是使用 AQS 实现的 ,可以理解为是一个支持多个线程同时占有锁 的 ReentrantLock
信号量主要用于两个目的,一个是用于多个共享资源的互斥使用 ,另一个用于并发线程数的控制 。正常的锁在任何时刻都只允许一个任务访问一项资源 ,而 Semaphore
允许n个任务 同时访问这个资源。
使用信号量可以实现:限制线程数量 、限流 ,保证同一时刻只能有有限个线程工作。
从概念上讲,信号量维护了一个许可集 (车位集),如有必要,在许可可用前会阻塞每一个acquire()
(没车位了就阻塞等待别人释放车位),然后再获取该许可。每个 release()
添加一个许可(释放一个车位),从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore
只对可用许可的号码进行计数,并采取相应的行动。
具体常用的构造方法:new Semaphore(3)
设置许可数量为3(3个车位)
具体常用的方法有:
acquire()
:从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,除非线程被中断
release()
:释放一个许可,将其返回给信号量
一般 acquire()
都会抛出异常,release()
在 finally 代码块中执行。
CountDownLatch
的问题是不能复用 。比如count=3
,那么加到3,就不能继续操作了。而Semaphore
可以解决这个问题,比如6辆车3个停车位,对于CountDownLatch
只能停3辆车 ,而Semaphore
可以停6辆车,车位空出来后,其它车可以占有:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class SemaphoreDemo { public static void main (String[] args) { Semaphore semaphore = new Semaphore(3 ); for (int i = 1 ; i <=6 ; i++) { new Thread(()->{ try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()+" 抢到了车位" ); TimeUnit.SECONDS.sleep(new Random().nextInt(5 )); System.out.println(Thread.currentThread().getName()+" ------离开了车位" ); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } }, String.valueOf(i)).start(); } } }
当前共有三个车位(三个许可)。之后三个线程先后获取许可0,许可量为0,代表没有多余的许可给其他线程了。此后再想来获取许可的线程会阻塞等待其他线程释放许可,才能继续获取到许可。
Semaphore 原理
Semaphore
本质上也是使用 AQS 实现的 ,可以理解为是一个支持多个线程同时占有锁 的 ReentrantLock
。该类内部包含一个 AQS 的同步器类:
其实现原理为:将构造方法里设置的信号量设置到 AQS 同步器类 Sync
的 state
属性中,使得最多只能有 N 个线程能抢占到锁 (此时 state
减少为 0),其余的线程都使用 AQS 中的阻塞机制进入双向链表阻塞队列中等待,直到这些占有锁的线程释放锁,使得 state
大于 0,并唤醒阻塞队列中的线程,重新竞争锁。因此该流程和 AQS 非常相似。
初始状态:state = 3
,代表可以有三个线程同时占有锁,其他线程都得进入阻塞队列
其他线程都得进入阻塞队列
某个时刻 Thread-4 释放了 permits,令 state = 1
,并唤醒队列中的 Thread-0
Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark()
接
下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态 (该过程属于 AQS 原理)
总结
CountDownLatch
和 CyclicBarrier
都能够实现线程之间的等待,只不过它们侧重点不同:
CountDownLatch
一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行
CyclicBarrier
一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行
CountDownLatch
是不能够重用的,而 CyclicBarrier
是可以重用的
Semaphore
其实和锁有点类似,它一般用于控制对某组资源的访问权限,使得同时只能有有限个线程访问资源
阻塞队列 BlockingQueue
BlockingQueue 简介
JUC 线程池的底层应用了阻塞队列存储排队的线程
JUC 包中,BlockingQueue
很好地解决了多线程中如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。
阻塞队列是共享队列(多线程操作),一端输入,一端输出。不能无限放队列,满了之后就会进入阻塞,取出也同理
当队列是空的,从队列中获取元素的操作将会被阻塞
当队列是满的,从队列中添加元素的操作将会被阻塞
试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素
试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增
在多线程领域:所谓阻塞,在某些情况下会挂起 线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起。
多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。
当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列
当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒
阻塞队列 (BlockingQueue)是 java util.concurrent
包下重要的数据结构,BlockingQueue
提供了线程安全的队列访问方式:
当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;
从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。
并发包下很多高级同步类的实现都是基于 BlockingQueue
实现的。
阻塞队列的阻塞功能是通过 lock 锁的**条件变量(Condition)**阻塞控制实现的
生产者在队列 满或消费者在队列空 时使用 condition.await()
方法 + while(true)
阻塞等待
生产者在消费数据后 或消费者在在添加数据后 使用 condition.signal()
唤醒阻塞线程
使用阻塞队列可以实现消息队列(生产者-消费者模型)
为什么需要 BlockingQueue?
在JUC包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。使用后我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue
都给你一手包办了。
体系 :Collection
→ Queue
→ BlockingQueue
→ 七个阻塞队列实现类。
种类
类名
作用
ArrayBlockingQueue
由数组 构成的有界 阻塞队列
LinkedBlockingQueue
由链表 构成的有界 阻塞队列
PriorityBlockingQueue
支持优先级排序的无界阻塞队列
DelayQueue
支持优先级的延迟无界阻塞队列
SynchronousQueue
单个元素 的阻塞队列
LinkedTransferQueue
由链表构成的无界阻塞队列
LinkedBlockingDeque
由链表构成的双向阻塞队列
粗体标记的三个用得比较多,许多消息中间件底层就是用它们实现的 。
需要注意的是LinkedBlockingQueue
虽然是有界的,但有个巨坑,其默认大小是Integer.MAX_VALUE
,高达21亿,一般情况下内存早爆了(在线程池的ThreadPoolExecutor
有体现)。
1. ArrayBlockingQueue
由数组 结构组成的有界 阻塞队列。ArrayBlockingQueue
在生产者放入数据和消费者获取数据,都是共用同一个锁对象,无法并行。
2. LinkedBlockingQueue
由链表 结构组成的有界 阻塞队列(但大小默认值为integer.MAX_VALUE
)。
之所以能够高效地处理并发数据,是因为其对于生产者端和消费者端分别采用了独立的锁 来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
3. DelayQueue
使用优先级队列 实现的延迟无界 阻塞队列。
DelayQueue
中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue
是一个没有大小限制 的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞
4. PriorityBlockingQueue
支持优先级排序 的无界 阻塞队列。不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。
5. SynchronousQueue
不存储元素的阻塞队列,也即单个元素 的队列。一种无缓冲 的等待队列。相对于有缓冲的 BlockingQueue
来说,少了一个中间经销商的环节(缓冲区)。
声明一个 SynchronousQueue
有两种不同的方式,它们之间有着不太一样的行为。
公平模式和非公平模式的区别:
公平模式:SynchronousQueue
会采用公平锁,并配合一个 FIFO 队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
非公平模式(默认):SynchronousQueue
采用非公平锁,同时配合一个 LIFO 队列来管理多余的生产者和消费者
后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现即饥饿的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理
6. LinkedTransferQueue
由链表 结构组成的无界 阻塞队列。
预占模式:意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,生成一个节点(节点元素为 null)入队,消费者线程被等待在这个节点上,生产者线程入队时发现有一个元素为 null 的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。
7. LinkedBlockingDeque
由链表 结构组成的双向 阻塞队列。阻塞有两种情况:
插入元素时:如果当前队列已满将会进入阻塞状态,一直等到队列有空的位置时再该元素插入,该操作可以通过设置超时参数,超时后返回 false 表示操作失败,也可以不设置超时参数一直阻塞,中断后抛出 InterruptedException
异常
读取元素时:如果当前队列为空会阻塞住直到队列不为空然后返回元素,同样可以通过设置超时参数
常用方法
创建阻塞队列 BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
抛出异常类型
执行该类型方法时,若阻塞队列已满/空,再向队列中插入/移除时将抛出异常
加入元素:blockingQueue.add("a")
检查元素:blockingQueue.element()
取出元素:blockingQueue.remove()
特殊值类型
执行该类型方法时,若阻塞队列已满/空,再向队列中插入/移除时,成功返回true
,失败返回false
加入元素:blockingQueue.offer("a")
检查元素:blockingQueue.peek()
取出元素:blockingQueue.poll()
阻塞类型
执行该类型方法时,若阻塞队列已满/空,再向队列中插入/移除时,线程进入阻塞状态
加入元素:blockingQueue.put("a");
取出元素:blockingQueue.take()
超时类型
执行该类方法时,若阻塞队列已满/空,再向队列中插入/移除时,线程进入阻塞状态,但等待一段时间后还无法插入/移除数据时,线程将退出
加入元素:blockingQueue.offer("w", 3L, TimeUnit.SECONDS)
取出元素:blockingQueue.poll( 3L, TimeUnit.SECONDS)
SynchronousQueue
队列只有一个元素 ,如果想插入多个,必须等队列元素取出后,才能插入,只能有一个“坑位”,用一个插一个:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 package concurrent.blockingqueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.SynchronousQueue;import java.util.concurrent.TimeUnit;public class SynchronousQueueDemo { public static void main (String[] args) { BlockingQueue<String> blockingQueue = new SynchronousQueue<String>(); new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + "\t put 1" ); blockingQueue.put("1" ); System.out.println(Thread.currentThread().getName() + "\t put 2" ); blockingQueue.put("2" ); System.out.println(Thread.currentThread().getName() + "\t put 3" ); blockingQueue.put("3" ); } catch (InterruptedException e) { e.printStackTrace(); } }, "AAA" ).start(); new Thread(() -> { try { try { TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "\t take " + blockingQueue.take()); try { TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "\t take " + blockingQueue.take()); try { TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "\t take" + blockingQueue.take()); } catch (Exception e) { e.printStackTrace(); } }, "BBB" ).start(); } }
阻塞队列的应用——生产者消费者
传统模式
传统模式使用Lock
来进行操作,需要手动加锁、解锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 public class ProdConsTradiDemo { public static void main (String[] args) { ShareData shareData = new ShareData(); new Thread(() -> { for (int i = 0 ; i < 5 ; i++) { try { shareData.increment(); } catch (Exception e) { e.printStackTrace(); } } }, "Producer" ).start(); new Thread(() -> { for (int i = 0 ; i < 5 ; i++) { try { shareData.decrement(); } catch (Exception e) { e.printStackTrace(); } } }, "Consumer" ).start(); } } class ShareData { private int number = 0 ; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment () throws InterruptedException { lock.lock(); try { while (number != 0 ) { condition.await(); } number++; System.out.println(Thread.currentThread().getName() + "\t" + number); condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void decrement () throws InterruptedException { lock.lock(); try { while (number == 0 ) { condition.await(); } number--; System.out.println(Thread.currentThread().getName() + "\t" + number); condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
阻塞队列模式
使用阻塞队列就不需要手动加锁了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 package concurrent.blockingqueue;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;class MyResource { private volatile boolean FLAG = true ; private AtomicInteger atomicInteger = new AtomicInteger(); BlockingQueue<String> blockingQueue = null ; public MyResource (BlockingQueue<String> blockingQueue) { this .blockingQueue = blockingQueue; System.out.println(blockingQueue.getClass().getName()); } public void myProd () throws Exception { String data = null ; boolean retValue; while (FLAG) { data = atomicInteger.incrementAndGet() + "" ; retValue = blockingQueue.offer(data, 2L , TimeUnit.SECONDS); if (retValue) { System.out.println(Thread.currentThread().getName() + "\t 插入队列:" + data + "成功" ); } else { System.out.println(Thread.currentThread().getName() + "\t 插入队列:" + data + "失败" ); } try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + "\t 停止生产,表示FLAG=false,生产介绍" ); } public void myConsumer () throws Exception { String retValue; while (FLAG) { retValue = blockingQueue.poll(2L , TimeUnit.SECONDS); if (retValue != null && retValue != "" ) { System.out.println(Thread.currentThread().getName() + "\t 消费队列:" + retValue + "成功" ); } else { FLAG = false ; System.out.println(Thread.currentThread().getName() + "\t 消费失败,队列中已为空,退出" ); return ; } } } public void stop () { this .FLAG = false ; } } public class ProdConsumerBlockingQueueDemo { public static void main (String[] args) { MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10 )); new Thread(() -> { System.out.println(Thread.currentThread().getName() + "\t 生产线程启动" ); System.out.println("" ); System.out.println("" ); try { myResource.myProd(); System.out.println("" ); System.out.println("" ); } catch (Exception e) { e.printStackTrace(); } }, "prod" ).start(); new Thread(() -> { System.out.println(Thread.currentThread().getName() + "\t 消费线程启动" ); try { myResource.myConsumer(); } catch (Exception e) { e.printStackTrace(); } }, "consumer" ).start(); try { TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("5秒中后,生产和消费线程停止,线程结束" ); myResource.stop(); } }
输出结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 java.util.concurrent.ArrayBlockingQueue producer 生产线程启动 consumer 消费线程启动 producer 插入队列:1成功 consumer 消费队列:1成功 producer 插入队列:2成功 consumer 消费队列:2成功 producer 插入队列:3成功 consumer 消费队列:3成功 producer 插入队列:4成功 consumer 消费队列:4成功 producer 插入队列:5成功 consumer 消费队列:5成功 5秒后,生产和消费线程停止,线程结束 producer 停止生产,表示FLAG=false,生产结束 consumer 消费失败,队列中已为空,退出