【JUC】线程池

线程池

连接池是创建管理连接 Connection 的缓冲池的技术,这些连接准备好被任何需要它们的线程使用

为什么要使用线程池?在高并发场景下,如果有 1000 万个请求同时访问服务器,则服务器需要新建 1000 万个线程,这样无疑是非常浪费资源的。需要一种技术,能够控制和管理线程的数量,只能同时工作一定数量的线程,使得后来的请求阻塞等待。线程池可以解决这个问题

线程池(ThreadPool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。其特点:

  • 线程复用
  • 管理线程
  • 控制最大并发数

线程池不仅能够保证内核的充分利用,还能防止过分调度。优势:

  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
  • 提高响应速度:当任务到达时,任务可以不需要等待线程创建就能立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

Java 中的线程池是通过 Executor框架实现的,该框架中用到了 ExecutorExecutorsExecutorServiceThreadPoolExecutor这几个类( Executors为工具类,用于创建几种基础线程池):

img

线程池状态

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量,ThreadPoolExecutor 类中的线程状态变量如下:

1
2
3
4
5
6
7
8
// Integer.SIZE 值为 32 
private static final int COUNT_BITS = Integer.SIZE - 3;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

image-20220212200125851

线程池状态和线程池中线程的数量由一个原子整型 ctl 来共同表示。使用一个数来表示两个值的主要原因是:可以通过一次 CAS 操作同时更改两个属性的值

1
2
3
4
5
6
7
8
9
10
11
// 原子整数,前 3 位保存了线程池的状态,剩余位保存的是线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 并不是所有平台的 int 都是 32 位。
// 去掉前三位保存线程状态的位数,剩下的用于保存线程数量
// 高3位为0,剩余位数全为1
private static final int COUNT_BITS = Integer.SIZE - 3;

// 2^COUNT_BITS次方,表示可以保存的最大线程数
// CAPACITY 的高3位为 0
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

获取线程池状态、线程数量以及合并两个值的操作:

1
2
3
4
5
6
7
8
9
10
11
// Packing and unpacking ctl
// 获取运行状态
// 该操作会让除高3位以外的数全部变为0
private static int runStateOf(int c) { return c & ~CAPACITY; }

// 获取运行线程数
// 该操作会让高3位为0
private static int workerCountOf(int c) { return c & CAPACITY; }

// 计算ctl新值
private static int ctlOf(int rs, int wc) { return rs | wc; }

shutdown()

当调用线程池的 shutdown() 方法时,线程池将不再接受新的请求创建线程,但其也不会立即停止工作,而是等待正在运行的线程执行完毕后(包括阻塞队列里的)才会整体关闭。

线程池中工作的线程都不是守护线程,即使主线程运行完毕也会继续执行。所以调用 shutdown() 方法后仍然会等待之前正在工作的线程执行完毕。

Tomcat 中的线程都是守护线程,一旦 shutdown() 后就都会立即停止

线程池属性

线程池的主要属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 工作线程,内部封装了Thread
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
...
}

// 阻塞队列,用于存放来不及被核心线程执行的任务
private final BlockingQueue<Runnable> workQueue;

// 锁
private final ReentrantLock mainLock = new ReentrantLock();

// 用于存放核心线程的容器,只有当持有锁时才能够获取其中的元素(核心线程)
private final HashSet<Worker> workers = new HashSet<Worker>();

阻塞队列(BlockingQueue)是 java util.concurrent 包下重要的数据结构,BlockingQueue 提供了线程安全的队列访问方式:

  • 当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;
  • 从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。

并发包下很多高级同步类的实现都是基于 BlockingQueue 实现的。

阻塞队列的阻塞功能是通过 lock 锁的多条件(Condition)阻塞控制实现的,队列满或空时使用 condition.await() 方法 + while(true) 阻塞等待;使用 condition.signal() 唤醒阻塞线程

使用阻塞队列可以实现消息队列(生产者-消费者模型)


线程池种类

Executors 类提供工厂方法来创建线程池:

  • Executors.newFixedThreadPool(int):一池 N 线程
  • Executors.newSingleThreadExecutor():一池一线程
  • Executors.newCachedThreadPool():一池可扩容根据需求创建线程

但三者底层都是 new 的 ThreadPoolExecutor,只不过传入的参数不同而已:

image-20210918192102967

image-20210918192119156

image-20210918192130791

特点:

  • Executors.newFixedThreadPool(int):一池 N 线程
    • 核心线程数等于最大线程数(没有救急线程
    • 阻塞队列是无界的,可以放任意数量的任务
    • 适用于任务量已知相对耗时的任务
  • Executors.newCachedThreadPool():一池可扩容,根据需求创建线程
    • 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着全部都是救急线程(60s 后可以回收),救急线程可以无限创建
    • 队列没有容量,没有线程来取是放不进去的(相当于必须一手交钱一手交货)
    • 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1 分钟后释放线程。
    • 适合任务数比较密集,但每个任务执行时间较短的情况
  • Executors.newSingleThreadExecutor():一池一线程。
    • 线程数固定为 1,任务数多于 1 时,会放入无界队列排队
    • 任务执行完毕,这唯一的线程也不会被释放
    • 和自己创建单线程执行任务的区别:自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证线程池的正常工作
    • Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改
      • FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法
      • Executors.newFixedThreadPool(1) 初始时为1时的区别:Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改,对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize() 等方法进行修改

提供的基础线程池的缺点:

  • FixedThreadPoolSingleThreadExecutor : 允许请求的队列长度Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM
  • CachedThreadPoolScheduledThreadPool : 允许创建的线程数量Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM

因此实际开发场景都需要根据自己机器和业务需求自定义创建线程池。

一个线程池中的任务有相互依赖就可能引发线程饥饿,前提是只有固定线程数,没有救急线程,线程数不足导致的线程饥饿。

案例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 演示线程池三种常用分类
public class ThreadPoolDemo1 {
public static void main(String[] args) {
// 一池五线程
ExecutorService threadPool1 = Executors.newFixedThreadPool(5); // 5个窗口
// 一池一线程
ExecutorService threadPool2 = Executors.newSingleThreadExecutor(); // 一个窗口
// 一池可扩容线程
ExecutorService threadPool3 = Executors.newCachedThreadPool();

// 10个顾客请求
try {
for (int i = 1; i <=10; i++) {
//执行
threadPool3.execute(()->{
System.out.println(Thread.currentThread().getName()+" 办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭线程池
threadPool3.shutdown();
}
}
}

任务调度线程池

在『任务调度线程池』功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。

使用 ScheduledExecutorService 改写:

  • 整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线程也不会被释放,用来执行延迟或反复执行的任务。
  • ScheduledExecutorServicescheduleAtFixedRate() 方法可以执行定时任务
  • ScheduledExecutorServicescheduleWithFixedDelay 方法的使用

处理执行任务异常

如果线程池中的线程执行任务时,如果任务抛出了异常,默认是中断执行该任务而不是抛出异常或者打印异常信息。若想捕获该异常:

方法1:主动捉异常

1
2
3
4
5
6
7
8
9
ExecutorService pool = Executors.newFixedThreadPool(1);
pool.submit(() -> {
try {
log.debug("task1");
int i = 1 / 0;
} catch (Exception e) {
log.error("error:", e);
}
});

方法2:使用 Future,错误信息都被封装进 submit 方法的返回方法中

1
2
3
4
5
6
7
ExecutorService pool = Executors.newFixedThreadPool(1);
Future<Boolean> f = pool.submit(() -> {
log.debug("task1");
int i = 1 / 0;
return true;
});
log.debug("result:{}", f.get());

线程池底层原理

ThreadPoolExecutor 的构造方法需要传入7个参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

7 个参数解读:

  • int corePoolSize:常驻线程数量(核心)
  • int maximumPoolSize:线程池中能够容纳同时执行的最大线程数量
  • long keepAliveTime,TimeUnit unit:多余的空闲线程(救急线程、扩容线程)的存活时间。当前线程池中线程数量超过 corePoolSize时,且空闲时间达到 keepAliveTime 的扩容线程会被销毁
  • BlockingQueue<Runnable> workQueue:阻塞队列(已提交但是尚未执行的线程将放入其中)
  • ThreadFactory threadFactory:线程工厂,用于创建线程,一般用默认工厂即可
  • RejectedExecutionHandler handler:拒绝策略(线程与阻塞队列都满了会执行拒绝策略)

线程池具体工作流程

线程池具体工作流程:

  1. 在创建了线程池后,线程池中的线程数为零
  2. 当调用 execute() 方法添加一个请求任务时,线程池会做出如下判断:
    1. 如果正在运行的线程数量小于corePoolSize,那么**马上创建线程(核心线程)**运行这个任务;
    2. 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入阻塞队列等待空余线程;
    3. 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程(扩容线程)立刻运行这个任务;
    4. 如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
  3. 当一个线程完成任务时,它会从阻塞队列中取下一个任务来执行
  4. 当一个线程无事可做(空闲)超过一定的时间(keepAliveTime)时(即过期时),线程池会判断:
    1. 如果当前运行的线程数大于corePoolSize,那么这个线程(扩容线程)就被停掉。
    2. 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小(核心线程不会被停掉,会一直存活等待任务)。

img

img

细节

  • 当核心线程都被占用时,再来新的线程并不是立即使用扩容线程,而是直接进入阻塞队列等待(所谓的懒加载线程)。只有阻塞队列也满了,再新来的线程才会使用扩容线程
  • 如果扩容线程也满了,则再来的新线程就要使用拒绝策略进行拒绝了
  • 阻塞队列里阻塞的请求会在所有线程(包括核心线程和扩容线程)处理完自己的任务后依次取出执行。若扩容线程空闲一定时间(存活时间)后仍无任务可以执行时就被销毁
  • 核心线程运行完任务也不会停止,会一直存活等待新任务,直到线程池被关闭

为什么先进入阻塞队列等待而不是直接创建非核心线程?

如果先增加非核心线程再加入队列,那么有可能会出现频繁的创建和销毁线程的情况,线程是稀有资源,频繁的创建和销毁线程正是线程池所避讳的。如果先阻塞队列满了再增加非核心线程,则可以表明一段时间内任务数是在稳步提升并且大于核心线程的处理速度的,这种情况下适合创建非核心线程。

拒绝策略

  • CallerRunsPolicy:回退,线程请求从哪个线程来的回哪个线程去执行。当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
  • AbortPolicy(默认配置)丢弃任务,并抛出拒绝执行 RejectedExecutionException异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
  • DiscardPolicy:直接丢弃,不抛异常
  • DiscardOldestPolicy:当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue最老的一个任务,并将新任务加入

img

img

其它著名框架也提供了自己线程池的实现:

  • Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方便定位问题
  • Netty 的实现,是创建一个新线程来执行任务
  • ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
  • PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
  • Tomcat 的实现,如果总线程数达到 maximumPoolSize,这时不会立刻抛 RejectedExecutionException 异常,而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常

案例:一个线程池,核心线程数为 7,最大线程数为 20,阻塞队列的大小为 50。假设同时有 100 个并发请求访问服务器。则线程池将如何分配线程?

线程池将为首先来的 7 个请求分配 7 个核心线程。然后再来的 50 个请求都将进入阻塞队列等待。在接下来的 13 个请求将会使用剩余 13 个扩容线程。最后来的 30 个请求就使用拒绝策略进行拒绝。

阻塞队列里阻塞的请求会在所有线程(包括核心线程和扩容线程)处理完自己的任务后依次取出执行。若扩容线程空闲一定时间(存活时间)后仍无任务可以执行时就被销毁


自定义线程池

实际在开发中不允许使用Executors创建,而是通过ThreadPoolExecutor的方式自定义线程数参数,规避资源耗尽风险。原因是 Executors 创建的线程池允许的请求队列长度为 Integer.MAX_VALUE,这可能会堆积大量的请求,从而导致OOM:

在这里插入图片描述

自定义线程池:

1
2
3
4
5
6
7
8
9
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
2L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);

Web 场景属于 I/O 密集型任务,通常可以将线程数设置的大一些。参考公式:CPU 核数 / (1 - 阻塞系数)。阻塞系数在 0.8 ~ 0.9 之间比如 8 核 CPU:8/(1-0.9)=80 个线程数

Spring Boot 配置自定义线程池

  1. 自定义线程池,并注入到 Spring 容器中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 加上下面的注解就不需要再在ConfigProperties类上加@Component注解了(加了反而会导致报错:重复注入同一个组件)
* @EnableConfigurationProperties(ThreadPoolConfigProperties.class)
* 如果不加,也可以在入参中自动注入
*/
@Configuration
public class MyThreadConfig {
/**
* 向容器中注入一个自定义线程池,并使用配置文件中配置的参数
* @param pool 自动注入 ThreadPoolConfigProperties,其绑定了配置文件中的相关参数
* @return
*/
@Bean
public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) {
return new ThreadPoolExecutor(pool.getMaxSize(), pool.getMaxSize(), pool.getKeepAliveTime(),
TimeUnit.SECONDS, new LinkedBlockingQueue<>(10000),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
}
}
  1. 配置文件绑定类:
1
2
3
4
5
6
7
8
9
// 绑定配置文件中的 yunmall.thread 前缀
@Data
@Component // 如果上面加了 @EnableConfigurationProperties(xxx),这里就不能再加 @Component,否则会重复注入该组件
@ConfigurationProperties(prefix = "yunmall.thread")
public class ThreadPoolConfigProperties {
private Integer coreSize;
private Integer maxSize;
private Integer keepAliveTime;
}
  1. 配置文件:
1
2
3
4
5
6
# 自定义线程池配置
yunmall:
thread:
core-size: 50
max-size: 200
keep-alive-time: 10
  1. 开启配置文件自动提示自定义的前缀:
1
2
3
4
5
6
<!--  加了以后,配置文件中就会有自定义的提示了 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>

手写线程池

主程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j(topic = "c.ThreadPoolTest")
public class ThreadPoolTest {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(
1, 1000, TimeUnit.MILLISECONDS, 1,
(queue, task) -> {
// 自定义具体的拒绝策略
// 1. 阻塞等待。
// queue.put(task);
// 2. 带超时的等待
// queue.offer(task, 500, TimeUnit.MILLISECONDS);
// 3. 调用者放弃
// log.info("放弃 {}", task);
// 4. 调用者抛出异常
// throw new RuntimeException("任务执行失败" + task);
// 5. 调用者自己执行任务
task.run();
});
// 从线程池中获取四个线程进行执行
for (int i = 0; i < 4; i++) {
int j = i;
threadPool.executor(() ->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("{}", j);
});
}
}
}

自定义拒绝策略接口:

1
2
3
4
@FunctionalInterface
interface RejectPolicy<T> {
void reject(BlockingQueue<T> queue, T task);
}

自定义线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@Slf4j(topic = "c.ThreadPool")
class ThreadPool {
// 线程集合
private Set<Worker> works = new HashSet<Worker>();
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 线程池的核心数
private int coreSize;
// 获取任务的超时时间
private long timeout;
private TimeUnit unit;
// 使用策略模式
private RejectPolicy<Runnable> rejectPolicy;

public ThreadPool(int coreSize, long timeout, TimeUnit unit, int queueCapacity,
RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.unit = unit;
taskQueue = new BlockingQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}

// 执行任务
public void executor(Runnable task) {
// 如果线程池满了. 就将任务加入到任务队列, 否则执行任务
synchronized (works) {
if (works.size() < coreSize) {
// 如果工作线程数没满,就可以立即执行该任务
Worker worker = new Worker(task);
log.info("新增 worker {} ,任务 {}", worker, task);
works.add(worker);
worker.start();
} else {
// taskQueue.put(task);
// 如果核心线程都在工作,就将该任务添加到阻塞队列
// 可以设置一直阻塞等待添加到队列,也可以设置等待一定时间后就放弃添加
taskQueue.tryPut(rejectPolicy, task);
}
}
}

class Worker extends Thread {
// 当前工作线程的 Runnable 对象
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
// 执行任务
// 1)当 task 不为空,执行任务
// 2)当 task 执行完毕,再接着从任务队列获取任务并执行,
@Override
public void run() {
// while (task != null || (task = taskQueue.poll(timeout, unit)) != null) {
while (task != null || (task = taskQueue.take()) != null) {
try {
log.info("正在执行 {}", task);
task.run();
} catch (Exception e) {

} finally {
task = null;
}
}
synchronized (works) {
log.info("worker 被移除 {}", this);
works.remove(this);
}
}
}
}

自定义阻塞队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// 实现阻塞队列
@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T> {
// 阻塞队列的容量
private int capacity;
// 双端链表, 从头取, 从尾加
private Deque<T> queue;
// 定义锁
private ReentrantLock lock;
// 当阻塞队列满了时候, 去 fullWaitSet 休息, 生产者条件变量
private Condition fullWaitSet;
// 当阻塞队列空了时候,去 emptyWaitSet 休息, 消费者条件变量
private Condition emptyWaitSet;

public BlockingQueue(int capacity) {
queue = new ArrayDeque<>(capacity);
lock = new ReentrantLock();
fullWaitSet = lock.newCondition();
emptyWaitSet = lock.newCondition();
this.capacity = capacity;
}

// 线程池从阻塞队列中获取任务。如果队列为空就阻塞等待直到有任务
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}

// 带有超时时间的获取。如果超时队列还是空的就不再获取任务
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
// 同一时间单位
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
if(nanos <= 0) {
return null;
}
// 防止虚假唤醒, 返回的是所剩时间
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}

// 添加。如果队列满了就阻塞等待,直到添加成功
public void put(T task) {
lock.lock();
try {
while (queue.size() == capacity) {
try {
log.info("等待加入任务队列 {}", task);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.info("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}
// 带有超时时间的添加,超过时间就不再添加
public boolean offer(T task, long timeout, TimeUnit unit) {
lock.lock();
try {
long nanos = unit.toNanos(timeout);
while (queue.size() == capacity) {
try {
if(nanos <= 0) {
return false;
}
log.info("等待加入任务队列 {}", task);
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.info("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}

// 如果队列已满就使用拒绝策略
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
// 判断判断是否满
if(queue.size() == capacity) {
rejectPolicy.reject(this, task);
} else { // 有空闲
log.info("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}

public int getSize() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
}

工作思路:

  • 当调用线程池的 executor() 方法时:
    • 如果正在工作的线程数小于核心线程数,就为该任务创建一个 Worker 类对象(代表一个核心工作线程,继承自 Thread 类),然后立即调用该工作线程的 worker.start() 方法
    • 如果正在工作的线程数大于等于核心线程数,就将该任务 task 放入到阻塞队列 taskQueue.tryPut(rejectPolicy, task),并且唤醒一次 emptyWaitSet,通知阻塞着的核心工作线程现在有新线程了
  • 每个核心工作线程 worker 重写的 run() 方法将不断尝试从阻塞队列中获取新任务 taskQueue.take()
    • 如果队列中有任务 taskRunnbale 接口实现类)就获取到,并执行其 start() 方法。然后继续去队列中获取下一个任务
    • 如果队列为空就阻塞等待队列中被添加新的任务 emptyWaitSet.await()(使用 Condition 的方式阻塞)
  • 当 main 方法调用线程池的 executor() 方法时发现正在工作的线程数大于等于核心线程数,就将该任务 task 放入到阻塞队列 taskQueue.tryPut(rejectPolicy, task),并且唤醒一次 emptyWaitSet,通知阻塞着的核心工作线程现在有新线程了(emptyWaitSet.signal()),这样前面阻塞着的工作线程就可以干活了

线程池是通过 Condition 机制,与阻塞队列进行通信的,从而不断从阻塞队列中获取任务。队列为空时阻塞等待,一有新任务就会被唤醒。

image-20220214160018087

Tomcat 线程池

Tomcat 中连接器组件 Connector 与线程池相关的部分结构:

image-20220212222743311

单 Reactor 多线程模型

上图中共有三类线程:

  • Acceptor:唯一一个线程,只负责接收 Socket 连接
  • Poller:唯一一个线程,Acceptor 接收到的 Socket 连接交给 Poller,其负责判断该 Socket 是否有可读的 I/O
  • Executors:如果有可读 I/O,就会从线程池里取一个线程处理该请求

Tomcat 扩展线程池的拒绝策略为:如果总线程数达到 maximumPoolSize,这时不会立刻抛 RejectedExecutionException 异常,而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常。

上述提到的 Tomcat 中的线程都是守护线程,一旦 Tomcat 关闭就会立即销毁这些线程。

Connector 配置

img

Executor 线程配置

img

默认是懒惰创建线程

线程池配置合理线程数

CPU 密集型

CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行。常为大量科学计算的任务。CPU密集任务只有在真正的多核CPU上才可能得到加速(通过多线程),而在单核CPU上,无论你开几个模拟的多线程该任务都不可能得到加速,因为CPU总的运算能力就那些。

CPU密集型任务配置尽可能少的线程数量,避免上下文切换,一般公式:(CPU核数 + 1)个线程的线程池

加一的原因是保证当线程由于页缺失故障(操作系统)或其它原因导致而暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费

l/O 密集型

I/O 密集型,即该任务需要大量的 I/O,即大量的阻塞。此情景下 CPU 不总是处于繁忙状态。

Web 应用场景通常为 I/O 密集型。例如,当执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程 RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了(因为这些线程因为 I/O 而阻塞了,无法使用系统资源,因此失去 CPU 使用权),完全可以多创建一些线程来提高 CPU 的利用率。

在单线程上运行I/O密集型的任务会导致浪费大量的CPU运算能力浪费在等待。所以在I/O密集型任务中使用多线程可以大大加速程序运行,即使在单核CPU上,这种加速主要就是利用了被浪费掉的阻塞时间。

I/O 密集型时,大部分线程都阻塞,故需要多配置线程数,参考公式:CPU核数/ (1-阻塞系数)。阻塞系数在 0.8 ~ 0.9 之间比如 8 核 CPU:8/(1-0.9)=80 个线程数

Fork/Join

Fork/Join 也是线程池技术中的一种(Fork:分支,Join:合并)

Fork/Join 可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。可以理解为并行版本的递归调用,例如在二叉树查找中令两个分支并行的递归执行。

Fork/Join 框架要完成两件事情:

  • 任务分割:首先Fork/Join框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割
  • 执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据。

在Java的 Fork/Join 框架中,使用两个类完成上述操作

  • ForkJoinTask:我们要使用 Fork/Join 框架,首先需要创建一个 ForkJoin 任务。该类提供了在任务中执行 fork 和 join 的机制。通常情况下我们不需要直接集成 ForkJoinTask 类,只需要继承它的子类,Fork/Join 框架提供了两个子类:
    • RecursiveAction:用于没有返回结果的任务
    • RecursiveTask:用于有返回结果的任务
  • ForkJoinPool:线程池的一种,实现了 Executor接口。ForkJoinTask 需要通过 ForkJoinPool来执行
  • RecursiveTask:继承后可以实现递归调用的任务

示例:

1
2
3
4
5
6
7
8
9
10
11
12
class Fibonacci extends RecursiveTask<Integer> {
final int n;
Fibonacci(int n) { this.n = n; }
Integer compute() {
if (n <= 1)
return n;
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
}

Fork/Join 框架的实现原理

ForkJoinPoolForkJoinTask数组和 ForkJoinWorkerThread数组组成,ForkJoinTask数组负责存放以及将程序提交给 ForkJoinPool,而 ForkJoinWorkerThread 负责执行这些任务。

image-20210917181012212

image-20210917181020781

Fork/Join 框架的实现原理见文档 JUC并发编程

示意图:https://blog.csdn.net/tyrroo/article/details/81483608

img

使用案例

具体案例:1加到100,相加两个数值不能大于10

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class MyTask extends RecursiveTask<Integer> {

// 拆分差值不能超过10,计算10以内运算
private static final Integer VALUE = 10;
private int begin ;// 拆分开始值
private int end;// 拆分结束值
private int result ; // 返回结果

// 创建有参数构造
public MyTask(int begin,int end) {
this.begin = begin;
this.end = end;
}

// 拆分和合并过程
@Override
protected Integer compute() {
// 判断相加两个数值是否大于10
if((end-begin)<=VALUE) {
// 相加操作
for (int i = begin; i <=end; i++) {
result = result+i;
}
} else {
// 进一步拆分
// 获取中间值
int middle = (begin + end) / 2;
// 拆分左边
MyTask task01 = new MyTask(begin, middle);
// 拆分右边
MyTask task02 = new MyTask(middle + 1, end);
// 调用方法拆分
task01.fork();
task02.fork();
// 合并结果
result = task01.join() + task02.join();
}
return result;
}
}

public class ForkJoinDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建MyTask对象
MyTask myTask = new MyTask(0,100);
// 创建分支合并池对象
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);
// 获取最终合并之后结果
Integer result = forkJoinTask.get();
System.out.println(result);
// 关闭池对象
forkJoinPool.shutdown();
}
}