Java 线程池的认识和使用

多线程编程很难,难点在于多线程代码的执行不是按照我们直觉上的执行顺序。所以多线程编程必须要建立起一个宏观的认识。

线程池是多线程编程中的一个重要概念。为了能够更好地使用多线程,学习好线程池当然是必须的。

为什么要使用线程池?

平时我们在使用多线程的时候,通常都是架构师配置好了线程池的 Bean,我们需要使用的时候,提交一个线程即可,不需要过多关注其内部原理。

在学习一门新的技术之前,我们还是先了解下为什么要使用它,使用它能够解决什么问题:

  1. 创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处理效率

例如:

记创建线程消耗时间T1,执行任务消耗时间T2,销毁线程消耗时间T3

如果T1+T3>T2,那么是不是说开启一个线程来执行这个任务太不划算了!

正好,线程池缓存线程,可用已有的闲置线程来执行新任务,避免了T1+T3带来的系统开销

  1. 线程并发数量过多,抢占系统资源从而导致阻塞

我们知道线程能共享系统资源,如果同时执行的线程过多,就有可能导致系统资源不足而产生阻塞的情况

运用线程池能有效的控制线程最大并发数,避免以上的问题

  1. 对线程进行一些简单的管理

比如:延时执行、定时循环执行的策略等

运用线程池都能进行很好的实现

创建一个线程池

在 Java 中,新建一个线程池对象非常简单,Java 本身提供了工具类java.util.concurrent.Executors,可以使用如下代码创建一个固定数量线程的线程池:

1
ExecutorService service = Executors.newFixedThreadPool(10);

注意:以上代码用来测试还可以,实际使用中最好能够显示地指定相关参数。

我们可以看下其内部源码实现:

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

在阿里巴巴代码规范中,建议我们自己指定线程池的相关参数,为的是让开发人员能够自行理解线程池创建中的每个参数,根据实际情况,创建出合理的线程池。接下来,我们来剖析下java.util.concurrent.ThreadPoolExecutor的构造方法参数。

ThreadPoolExecutor 浅析

java.util.concurrent.ThreadPoolExecutor有多个构造方法,我们拿参数最多的构造方法来举例,以下是阿里巴巴代码规范中给出的创建线程池的范例:

1
2
3
4
5
ThreadPoolExecutor service = new ThreadPoolExecutor(5, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(),
new ThreadPoolExecutor.AbortPolicy());

首先最重要的几个参数,可能就是:corePoolSizemaximumPoolSizeworkQueue了,先看下这几个参数的解释:

  • corePoolSize

    用于设定 thread pool 需要时刻保持的最小 core threads 的数量,即便这些 core threads 处于空闲状态啥事都不做也不会将它们回收掉,当然前提是你没有设置 allowCoreThreadTimeOut 为 true。至于 pool 是如何做到保持这些个 threads 不死的,我们稍后再说。

  • maximumPoolSize

    用于限定 pool 中线程数的最大值。如果你自己构造了 pool 且传入了一个 Unbounded 的 queue 且没有设置它的 capacity,那么不好意思,最大线程数会永远 <= corePoolSize,maximumPoolSize 变成了无效的。

  • workQueue

    该线程池中的任务队列:维护着等待执行的 Runnable 对象。当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务

由于本文是初步了解线程池,所以先理解这几个参数,上文对于这三个参数的解释,基本上跟JDK源码中的注释一致(java.util.concurrent.ThreadPoolExecutor#execute里的代码)。

我们编写个程序来方便理解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 创建线程池
ThreadPoolExecutor service = new ThreadPoolExecutor(5, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(),
new ThreadPoolExecutor.AbortPolicy());
// 等待执行的runnable
Runnable runnable = () -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
};

// 启动的任务数量
int counts = 1224;
for (int i = 0; i < counts; i++) {
service.execute(runnable);
}

// 监控线程池执行情况的代码
ThreadPoolExecutor tpe = ((ThreadPoolExecutor) service);
while (true) {
System.out.println();

int queueSize = tpe.getQueue().size();
System.out.println("当前排队线程数:" + queueSize);

int activeCount = tpe.getActiveCount();
System.out.println("当前活动线程数:" + activeCount);

long completedTaskCount = tpe.getCompletedTaskCount();
System.out.println("执行完成线程数:" + completedTaskCount);

long taskCount = tpe.getTaskCount();
System.out.println("总线程数:" + taskCount);

try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

线程池的容量与我们启动的任务数量息息相关。

已知:

  • corePoolSize = 5
  • maximumPoolSize = 200
  • workQueue.size() = 1024

我们修改同时 execute 添加到线程池的 Runnable 数量 counts:

  • counts <= corePoolSize:所有的任务均为核心线程执行,没有任何 Runnable 被添加到 workQueue中
1
2
3
4
当前排队线程数:0
当前活动线程数:3
执行完成线程数:0
总线程数:3
  • corePoolSize < counts <= corePoolSize + workQueue.size():所有任务均为核心线程执行,当核心线程处于繁忙状态,则将任务添加到 workQueue 中等待
1
2
3
4
当前排队线程数:15
当前活动线程数:5
执行完成线程数:0
总线程数:20
  • corePoolSize + workQueue.size() < counts <= maximumPoolSize + workQueue.size():corePoolSize 个线程由核心线程执行,超出队列长度 workQueue.size() 的任务,将另启动非核心线程执行
1
2
3
4
当前排队线程数:1024
当前活动线程数:105
执行完成线程数:0
总线程数:1129
  • counts > maximumPoolSize + workQueue.size():将会报异常java.util.concurrent.RejectedExecutionException
1
java.util.concurrent.RejectedExecutionException: Task com.bwjava.util.ExecutorServiceUtilTest$$Lambda$1/314265080@725bef66 rejected from java.util.concurrent.ThreadPoolExecutor@2aaf7cc2[Running, pool size = 200, active threads = 200, queued tasks = 1024, completed tasks = 0]

线程池踩坑:线程嵌套导致阻塞

这次的踩坑才是我写这篇文章的初衷,借此机会好好了解下线程池的各个概念。本身这段时间在研究爬虫,为了尽量提高爬虫的效率,用到了多线程处理。由于代码写得比较随性,所以遇到了一个阻塞的问题,研究了一下才搞明白,模拟的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
ThreadPoolExecutor service = new ThreadPoolExecutor(5, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(),
new ThreadPoolExecutor.AbortPolicy());

@Test
public void testBlock() {
Runnable runnableOuter = () -> {
try {
Runnable runnableInner1 = () -> {
try {
TimeUnit.SECONDS.sleep(3); // 模拟比较耗时的爬虫操作
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Future<?> submit = service.submit(runnableInner1);

submit.get(); // 实际业务中,runnableInner2需要用到此处返回的参数,所以必须get

Runnable runnableInner2 = () -> {
try {
TimeUnit.SECONDS.sleep(5); // 模拟比较耗时的爬虫操作
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Future<?> submit2 = service.submit(runnableInner2);
submit2.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
};

for (int i = 0; i < 20; i++) {
service.execute(runnableOuter);
}

ThreadPoolExecutor tpe = ((ThreadPoolExecutor) service);

while (true) {
System.out.println();

int queueSize = tpe.getQueue().size();
System.out.println("当前排队线程数:" + queueSize);

int activeCount = tpe.getActiveCount();
System.out.println("当前活动线程数:" + activeCount);

long completedTaskCount = tpe.getCompletedTaskCount();
System.out.println("执行完成线程数:" + completedTaskCount);

long taskCount = tpe.getTaskCount();
System.out.println("总线程数:" + taskCount);

try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

线程池是前文的线程池,参数完全不变。线程的监控代码也一致。当我们运行这个单元测试的时候,会发现打印出来的结果一直是如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
当前排队线程数:15
当前活动线程数:5
执行完成线程数:0
总线程数:20

当前排队线程数:20
当前活动线程数:5
执行完成线程数:0
总线程数:25

当前排队线程数:20
当前活动线程数:5
执行完成线程数:0
总线程数:25

……略

根本问题是 Runnable 内部还嵌套了 Runnable ,且他们都提交到了一个线程池。下面分步骤说明问题:

  1. runnableOuter 被提交到了线程池
  2. runnableOuter 开始执行,runnableInner1 被提交到线程池,对 runnableInner1 的结果进行 get,导致runnableOuter 被阻塞
    于此同时,更多的 runnableOuter 被提交到线程池,核心线程被 runnableOuter 和 runnableInner1 占满,多余的线程 runnableInner2 被加入 workQueue 中等待执行
  3. runnableInner2 被提交到线程池,但是因为核心线程已满,被提交到了 workQueue ,也处于阻塞状态,此时对 runnableInner2 的结果进行 get,导致 runnableOuter 被阻塞
  4. runnableOuter 被阻塞,无法释放核心线程资源,而 runnableInner2 又因为无法得到核心线程资源,只能呆在 workQueue 里,导致整个程序卡死,无法返回。(有点类似死锁,互相占有了资源,对方不释放,我也不释放)

用图表示大概为:

既然明白了出错的原因,那么解决起来就简单了。这个案例告诉我们,设计一个多线程程序,一定要自顶向下有一个良好的设计,然后再开始编码,不能够盲目地使用多线程、线程池,这样只会导致程序出现莫名其妙的错误。

动态修改 corePoolSize & maximumPoolSize

其实这个我没怎么关注过,曾经在一次面试中被问到过。很简单,java.util.concurrent.ThreadPoolExecutor提供了Setter方法,可以直接设置相关参数。按我目前的实践经验,几乎没有用到过,但是知道这个聊胜于无吧。特定的复杂场景下应该很有用。

线程池和消息队列

笔者在实际工程应用中,使用过多线程和消息队列处理过异步任务。很多新手工程师往往弄不清楚这两者的区别。按笔者的浅见:

多线程是用来充分利用多核 CPU 以提高程序性能的一种开发技术,线程池可以维持一个队列保存等待处理的多线程任务,但是由于此队列是内存控制的,所以断电或系统故障后未执行的任务会丢失。

消息队列是为消息处理而生的一门技术。其根据消费者的自身消费能力进行消费的特性使其广泛用于削峰的高并发任务处理。此外利用其去耦合的特性也可以实现代码上的解耦。消息队列大多可以对其消息进行持久化,即使断电也能够恢复未被消费的任务并继续处理。

以上是笔者在学习实践之后对于多线程和消息队列的粗浅认识,初学者切莫混淆两者的作用。