【JUC】线程池
线程池
连接池是创建和管理连接 Connection 的缓冲池的技术,这些连接准备好被任何需要它们的线程使用
为什么要使用线程池?在高并发场景下,如果有 1000 万个请求同时访问服务器,则服务器需要新建 1000 万个线程,这样无疑是非常浪费资源的。需要一种技术,能够控制和管理线程的数量,只能同时工作一定数量的线程,使得后来的请求阻塞等待。线程池可以解决这个问题
线程池(ThreadPool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。其特点:
- 线程复用
- 管理线程
- 控制最大并发数
线程池不仅能够保证内核的充分利用,还能防止过分调度。优势:
- 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
- 提高响应速度:当任务到达时,任务可以不需要等待线程创建就能立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
Java 中的线程池是通过 Executor
框架实现的,该框架中用到了 Executor
,Executors
,ExecutorService
,ThreadPoolExecutor
这几个类( Executors
为工具类,用于创建几种基础线程池):
线程池状态
ThreadPoolExecutor
使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量,ThreadPoolExecutor
类中的线程状态变量如下:
1 | // Integer.SIZE 值为 32 |
线程池状态和线程池中线程的数量由一个原子整型 ctl
来共同表示。使用一个数来表示两个值的主要原因是:可以通过一次 CAS 操作同时更改两个属性的值
1 | // 原子整数,前 3 位保存了线程池的状态,剩余位保存的是线程数量 |
获取线程池状态、线程数量以及合并两个值的操作:
1 | // Packing and unpacking ctl |
shutdown()
当调用线程池的 shutdown()
方法时,线程池将不再接受新的请求创建线程,但其也不会立即停止工作,而是等待正在运行的线程执行完毕后(包括阻塞队列里的)才会整体关闭。
线程池中工作的线程都不是守护线程,即使主线程运行完毕也会继续执行。所以调用 shutdown()
方法后仍然会等待之前正在工作的线程执行完毕。
Tomcat 中的线程都是守护线程,一旦 shutdown() 后就都会立即停止
线程池属性
线程池的主要属性:
1 | // 工作线程,内部封装了Thread |
阻塞队列(BlockingQueue)是 java util.concurrent
包下重要的数据结构,BlockingQueue
提供了线程安全的队列访问方式:
- 当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;
- 从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。
并发包下很多高级同步类的实现都是基于 BlockingQueue
实现的。
阻塞队列的阻塞功能是通过 lock 锁的多条件(Condition)阻塞控制实现的,队列满或空时使用 condition.await()
方法 + while(true)
阻塞等待;使用 condition.signal()
唤醒阻塞线程
使用阻塞队列可以实现消息队列(生产者-消费者模型)
线程池种类
Executors
类提供工厂方法来创建线程池:
Executors.newFixedThreadPool(int)
:一池 N 线程Executors.newSingleThreadExecutor()
:一池一线程Executors.newCachedThreadPool()
:一池可扩容根据需求创建线程
但三者底层都是 new 的 ThreadPoolExecutor
,只不过传入的参数不同而已:
特点:
Executors.newFixedThreadPool(int)
:一池 N 线程- 核心线程数等于最大线程数(没有救急线程)
- 阻塞队列是无界的,可以放任意数量的任务
- 适用于任务量已知,相对耗时的任务
Executors.newCachedThreadPool()
:一池可扩容,根据需求创建线程- 核心线程数是 0, 最大线程数是
Integer.MAX_VALUE
,救急线程的空闲生存时间是 60s,意味着全部都是救急线程(60s 后可以回收),救急线程可以无限创建 - 队列没有容量,没有线程来取是放不进去的(相当于必须一手交钱一手交货)
- 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1 分钟后释放线程。
- 适合任务数比较密集,但每个任务执行时间较短的情况
- 核心线程数是 0, 最大线程数是
Executors.newSingleThreadExecutor()
:一池一线程。- 线程数固定为 1,任务数多于 1 时,会放入无界队列排队
- 任务执行完毕,这唯一的线程也不会被释放
- 和自己创建单线程执行任务的区别:自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证线程池的正常工作
Executors.newSingleThreadExecutor()
线程个数始终为1,不能修改FinalizableDelegatedExecutorService
应用的是装饰器模式,只对外暴露了ExecutorService
接口,因此不能调用ThreadPoolExecutor
中特有的方法- 和
Executors.newFixedThreadPool(1)
初始时为1时的区别:Executors.newFixedThreadPool(1)
初始时为1,以后还可以修改,对外暴露的是ThreadPoolExecutor
对象,可以强转后调用setCorePoolSize()
等方法进行修改
提供的基础线程池的缺点:
FixedThreadPool
和SingleThreadExecutor
: 允许请求的队列长度为Integer.MAX_VALUE
,可能堆积大量的请求,从而导致 OOMCachedThreadPool
和ScheduledThreadPool
: 允许创建的线程数量为Integer.MAX_VALUE
,可能会创建大量线程,从而导致 OOM
因此实际开发场景都需要根据自己机器和业务需求自定义创建线程池。
一个线程池中的任务有相互依赖就可能引发线程饥饿,前提是只有固定线程数,没有救急线程,线程数不足导致的线程饥饿。
案例代码:
1 | // 演示线程池三种常用分类 |
任务调度线程池
在『任务调度线程池』功能加入之前,可以使用 java.util.Timer
来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。
使用 ScheduledExecutorService
改写:
- 整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线程也不会被释放,用来执行延迟或反复执行的任务。
ScheduledExecutorService
中scheduleAtFixedRate()
方法可以执行定时任务ScheduledExecutorService
中scheduleWithFixedDelay
方法的使用
处理执行任务异常
如果线程池中的线程执行任务时,如果任务抛出了异常,默认是中断执行该任务而不是抛出异常或者打印异常信息。若想捕获该异常:
方法1:主动捉异常
1 | ExecutorService pool = Executors.newFixedThreadPool(1); |
方法2:使用 Future
,错误信息都被封装进 submit
方法的返回方法中
1 | ExecutorService pool = Executors.newFixedThreadPool(1); |
线程池底层原理
ThreadPoolExecutor
的构造方法需要传入7个参数:
1 | public ThreadPoolExecutor(int corePoolSize, |
7 个参数解读:
int corePoolSize
:常驻线程数量(核心)int maximumPoolSize
:线程池中能够容纳同时执行的最大线程数量long keepAliveTime,TimeUnit unit
:多余的空闲线程(救急线程、扩容线程)的存活时间。当前线程池中线程数量超过corePoolSize
时,且空闲时间达到keepAliveTime
的扩容线程会被销毁BlockingQueue<Runnable> workQueue
:阻塞队列(已提交但是尚未执行的线程将放入其中)ThreadFactory threadFactory
:线程工厂,用于创建线程,一般用默认工厂即可RejectedExecutionHandler handler
:拒绝策略(线程与阻塞队列都满了会执行拒绝策略)
线程池具体工作流程
线程池具体工作流程:
- 在创建了线程池后,线程池中的线程数为零
- 当调用
execute()
方法添加一个请求任务时,线程池会做出如下判断:- 如果正在运行的线程数量小于
corePoolSize
,那么**马上创建线程(核心线程)**运行这个任务; - 如果正在运行的线程数量大于或等于
corePoolSize
,那么将这个任务放入阻塞队列等待空余线程; - 如果这个时候队列满了且正在运行的线程数量还小于
maximumPoolSize
,那么还是要创建非核心线程(扩容线程)立刻运行这个任务; - 如果队列满了且正在运行的线程数量大于或等于
maximumPoolSize
,那么线程池会启动饱和拒绝策略来执行。
- 如果正在运行的线程数量小于
- 当一个线程完成任务时,它会从阻塞队列中取下一个任务来执行
- 当一个线程无事可做(空闲)超过一定的时间(
keepAliveTime
)时(即过期时),线程池会判断:- 如果当前运行的线程数大于
corePoolSize
,那么这个线程(扩容线程)就被停掉。 - 所以线程池的所有任务完成后,它最终会收缩到
corePoolSize
的大小(核心线程不会被停掉,会一直存活等待任务)。
- 如果当前运行的线程数大于
细节
- 当核心线程都被占用时,再来新的线程并不是立即使用扩容线程,而是直接进入阻塞队列等待(所谓的懒加载线程)。只有阻塞队列也满了,再新来的线程才会使用扩容线程
- 如果扩容线程也满了,则再来的新线程就要使用拒绝策略进行拒绝了
- 阻塞队列里阻塞的请求会在所有线程(包括核心线程和扩容线程)处理完自己的任务后依次取出执行。若扩容线程空闲一定时间(存活时间)后仍无任务可以执行时就被销毁
- 核心线程运行完任务也不会停止,会一直存活等待新任务,直到线程池被关闭
为什么先进入阻塞队列等待而不是直接创建非核心线程?
如果先增加非核心线程再加入队列,那么有可能会出现频繁的创建和销毁线程的情况,线程是稀有资源,频繁的创建和销毁线程正是线程池所避讳的。如果先阻塞队列满了再增加非核心线程,则可以表明一段时间内任务数是在稳步提升并且大于核心线程的处理速度的,这种情况下适合创建非核心线程。
拒绝策略
CallerRunsPolicy
:回退,线程请求从哪个线程来的回哪个线程去执行。当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大AbortPolicy
:(默认配置)丢弃任务,并抛出拒绝执行RejectedExecutionException
异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。DiscardPolicy
:直接丢弃,不抛异常DiscardOldestPolicy
:当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列workQueue
中最老的一个任务,并将新任务加入
其它著名框架也提供了自己线程池的实现:
- Dubbo 的实现,在抛出
RejectedExecutionException
异常之前会记录日志,并 dump 线程栈信息,方便定位问题 - Netty 的实现,是创建一个新线程来执行任务
- ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
- PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
- Tomcat 的实现,如果总线程数达到
maximumPoolSize
,这时不会立刻抛RejectedExecutionException
异常,而是再次尝试将任务放入队列,如果还失败,才抛出RejectedExecutionException
异常
案例:一个线程池,核心线程数为 7,最大线程数为 20,阻塞队列的大小为 50。假设同时有 100 个并发请求访问服务器。则线程池将如何分配线程?
线程池将为首先来的 7 个请求分配 7 个核心线程。然后再来的 50 个请求都将进入阻塞队列等待。在接下来的 13 个请求将会使用剩余 13 个扩容线程。最后来的 30 个请求就使用拒绝策略进行拒绝。
阻塞队列里阻塞的请求会在所有线程(包括核心线程和扩容线程)处理完自己的任务后依次取出执行。若扩容线程空闲一定时间(存活时间)后仍无任务可以执行时就被销毁
自定义线程池
实际在开发中不允许使用Executors
创建,而是通过ThreadPoolExecutor
的方式自定义线程数参数,规避资源耗尽风险。原因是 Executors
创建的线程池允许的请求队列长度为 Integer.MAX_VALUE
,这可能会堆积大量的请求,从而导致OOM:
自定义线程池:
1 | ExecutorService threadPool = new ThreadPoolExecutor( |
Web 场景属于 I/O 密集型任务,通常可以将线程数设置的大一些。参考公式:CPU 核数 / (1 - 阻塞系数)。阻塞系数在 0.8 ~ 0.9 之间比如 8 核 CPU:8/(1-0.9)=80
个线程数
Spring Boot 配置自定义线程池
- 自定义线程池,并注入到 Spring 容器中:
1 | /** |
- 配置文件绑定类:
1 | // 绑定配置文件中的 yunmall.thread 前缀 |
- 配置文件:
1 | # 自定义线程池配置 |
- 开启配置文件自动提示自定义的前缀:
1 | <!-- 加了以后,配置文件中就会有自定义的提示了 --> |
手写线程池
主程序:
1 |
|
自定义拒绝策略接口:
1 |
|
自定义线程池:
1 |
|
自定义阻塞队列:
1 | // 实现阻塞队列 |
工作思路:
- 当调用线程池的
executor()
方法时:- 如果正在工作的线程数小于核心线程数,就为该任务创建一个
Worker
类对象(代表一个核心工作线程,继承自Thread
类),然后立即调用该工作线程的worker.start()
方法 - 如果正在工作的线程数大于等于核心线程数,就将该任务
task
放入到阻塞队列taskQueue.tryPut(rejectPolicy, task)
,并且唤醒一次emptyWaitSet
,通知阻塞着的核心工作线程现在有新线程了
- 如果正在工作的线程数小于核心线程数,就为该任务创建一个
- 每个核心工作线程
worker
重写的run()
方法将不断尝试从阻塞队列中获取新任务taskQueue.take()
:- 如果队列中有任务
task
(Runnbale
接口实现类)就获取到,并执行其start()
方法。然后继续去队列中获取下一个任务 - 如果队列为空就阻塞等待队列中被添加新的任务
emptyWaitSet.await()
(使用Condition
的方式阻塞)
- 如果队列中有任务
- 当 main 方法调用线程池的
executor()
方法时发现正在工作的线程数大于等于核心线程数,就将该任务task
放入到阻塞队列taskQueue.tryPut(rejectPolicy, task)
,并且唤醒一次emptyWaitSet
,通知阻塞着的核心工作线程现在有新线程了(emptyWaitSet.signal()
),这样前面阻塞着的工作线程就可以干活了
线程池是通过 Condition 机制,与阻塞队列进行通信的,从而不断从阻塞队列中获取任务。队列为空时阻塞等待,一有新任务就会被唤醒。
Tomcat 线程池
Tomcat 中连接器组件 Connector 与线程池相关的部分结构:
单 Reactor 多线程模型
上图中共有三类线程:
Acceptor
:唯一一个线程,只负责接收 Socket 连接Poller
:唯一一个线程,Acceptor
接收到的 Socket 连接交给Poller
,其负责判断该Socket
是否有可读的 I/OExecutors
:如果有可读 I/O,就会从线程池里取一个线程处理该请求
Tomcat 扩展线程池的拒绝策略为:如果总线程数达到 maximumPoolSize
,这时不会立刻抛 RejectedExecutionException
异常,而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException
异常。
上述提到的 Tomcat 中的线程都是守护线程,一旦 Tomcat 关闭就会立即销毁这些线程。
Connector 配置
Executor 线程配置
默认是懒惰创建线程。
线程池配置合理线程数
CPU 密集型
CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行。常为大量科学计算的任务。CPU密集任务只有在真正的多核CPU上才可能得到加速(通过多线程),而在单核CPU上,无论你开几个模拟的多线程该任务都不可能得到加速,因为CPU总的运算能力就那些。
CPU密集型任务配置尽可能少的线程数量,避免上下文切换,一般公式:(CPU核数 + 1)个线程的线程池
加一的原因是保证当线程由于页缺失故障(操作系统)或其它原因导致而暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费
l/O 密集型
I/O 密集型,即该任务需要大量的 I/O,即大量的阻塞。此情景下 CPU 不总是处于繁忙状态。
Web 应用场景通常为 I/O 密集型。例如,当执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程 RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了(因为这些线程因为 I/O 而阻塞了,无法使用系统资源,因此失去 CPU 使用权),完全可以多创建一些线程来提高 CPU 的利用率。
在单线程上运行I/O密集型的任务会导致浪费大量的CPU运算能力浪费在等待。所以在I/O密集型任务中使用多线程可以大大加速程序运行,即使在单核CPU上,这种加速主要就是利用了被浪费掉的阻塞时间。
I/O 密集型时,大部分线程都阻塞,故需要多配置线程数,参考公式:CPU核数/ (1-阻塞系数)。阻塞系数在 0.8 ~ 0.9 之间比如 8 核 CPU:8/(1-0.9)=80
个线程数
Fork/Join
Fork/Join 也是线程池技术中的一种(Fork:分支,Join:合并)
Fork/Join 可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。可以理解为并行版本的递归调用,例如在二叉树查找中令两个分支并行的递归执行。
Fork/Join 框架要完成两件事情:
- 任务分割:首先Fork/Join框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割
- 执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据。
在Java的 Fork/Join 框架中,使用两个类完成上述操作
ForkJoinTask
:我们要使用 Fork/Join 框架,首先需要创建一个 ForkJoin 任务。该类提供了在任务中执行 fork 和 join 的机制。通常情况下我们不需要直接集成ForkJoinTask
类,只需要继承它的子类,Fork/Join 框架提供了两个子类:RecursiveAction
:用于没有返回结果的任务RecursiveTask
:用于有返回结果的任务
ForkJoinPool
:线程池的一种,实现了Executor
接口。ForkJoinTask
需要通过ForkJoinPool
来执行RecursiveTask
:继承后可以实现递归调用的任务
示例:
1 | class Fibonacci extends RecursiveTask<Integer> { |
Fork/Join 框架的实现原理
ForkJoinPool
由 ForkJoinTask
数组和 ForkJoinWorkerThread
数组组成,ForkJoinTask
数组负责存放以及将程序提交给 ForkJoinPool
,而 ForkJoinWorkerThread
负责执行这些任务。
Fork/Join 框架的实现原理见文档 JUC并发编程。
使用案例
具体案例:1加到100,相加两个数值不能大于10
1 | class MyTask extends RecursiveTask<Integer> { |