线程池 ThreadPoolExecutor 详解与应用
Java 应用中对宝贵的稀缺资源池化是保障系统稳定运行,优化系统响应速度的重要手段。
线程池的运用场景非常广,几乎所有需要异步或并发执行任务的程序都可以使用线程池。
合理使用线程池能带来 3 个好处:
降低资源消耗:通过重复利用已创建的线程,降低创建线程和销毁线程产生的消耗。
提高响应效率:当任务到达时,不需要等待线程创建就能立即执行。
提高线程的可管理性:线程是稀缺资源,不能无限创建,否则会消耗系统资源,降低系统稳定性。
使用线程池可以进行统一分配、调优、监控。
线程池处理流程
当提交一个新任务时,线程池的处理流程如上图:
- 判断核心线程池是否都在执行任务,如果否,则创建一个线程执行任务;如果是,进入下个流程。
- 线程池判断工作队列是否已满,如果否,则将新任务加入到工作队列中;如果是,进入下个流程。
- 判断线程池是否都处理工作状态,如果否,则创建一个线程执行任务;如果是,则交给饱和策略来处理此任务。
ThreadPoolExecutor
ThreadPoolExecutor 执行 execute() 方法的示意图:
ThreadPoolExecutor 遇到下面 4 种情况会执行 execute() 方法:
- 如果正在运行的线程数小于 corePoolSize,则创建新线程执行任务(注意,要先获得全局锁)。
- 如果运行的线程等于或大小 corePoolSize,则将任务加入 BlockingQueue。
- 如果 BlockingQueue 已满,则创建新的线程来处理任务(注意,要先获得全局锁)。
- 如果创建新线程将使当前运行的线程数超过 maximumPoolSize,任务将被拒绝,并调用 RejectExecutionHandler.rejectedExecution() 方法。
采用此设计的思路是在调用 **execute()**方法时,尽可能避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。
源码简析:
1 | public void execute(Runnable command) { |
- 在
execute()
方法中创建一个线程时,会让这个线程执行当前任务。 - 当前任务执行完后,会反复从 BlockingQueue 获取任务来执行。
线程池的使用
可以通过 ThreadPoolExecutor 的构造方法创建一个线程池。
1 | public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, |
构造方法参数
参数名 | 描述 |
---|---|
corePoolSize | 核心线程池数量,新提交任务将创建一个新线程执行任务, 即使其它核心线程空闲能够执行新任务也会创建线程, 等到需要执行的任务数大于核心线程数时就不再创建 |
maximumPoolSize | 最大线程池数量,允许创建的最大线程数。 如果队列满了,且已创建的线程数,不超过该值时, 则会创建新的线程执行任务。 |
keepAliveTime | 超过 corePoolSize 的线程的空闲时间,超过该时间则被销毁 即线程活动保持时间 |
TimeUnit | keepAliveTime 时间单位,TimeUnit 是个枚举类。 |
workQueue | 任务队列,保存等待执行的任务的阻塞队列 |
threadFactory | 线程工厂,用于创建线程,可以给线程设置更有意义的名字。 使用 Guava 提供的 ThreadFactoryBuilder 可以快速给线程池里的线程设置名称。 |
RejectedExecutionHandler | 拒绝策略(饱和策略),当队列和线程池已满,必须采取的处理策略。 |
任务队列类型
BlockingQueue(任务队列):用于保存等待执行的任务的阻塞队列(Runnable 对象),可以选择以类型的阻塞队列。
ArrayBlockingQueue:基于数组结构的
有界阻塞队列
,此队列按FIFO
(先进先出)原则对元数排序。若有新的任务需要执行,线程池创建新的线程直到线程数量达到
corePoolSize
,则将新的任务加入到等待队列中。
若等待队列已满,则创建线程执行,直到线程数量达到maximumPoolSize
,若大于maximumPoolSize
,则执行拒绝策略。超出corePoolSize
的线程是有空闲存话时间的。注意,通过构造函数创建线程池, 构建
ArrayBlockingQueue
需要要传入容量
,若容量足够大或没有达到超负荷的状态,线程数将一直维持在corePoolSize
以下,若队列已满,则以maximumPoolSize
为上限。LinkedBlockingDeque:基于链表结构的
无界阻塞队列
,此队列按FIFO
排序元素,吞吐量通常高于ArrayBlockingQueue
。构建LinkedBlockingDeque
最好是传入容量。使用此队列,maximumPoolSize
参数将无效,即使队列中缓存了很多待执行的任务,当线程数达到corePoolSize
后就不再增加;若后续有新的任务进入,则直接加入队列等待。静态工厂方法
Executors.newFixedThreadPool()
使用这个队列。队列默认容量为Integer.MAX_VALUE
。如果任务提交速度持续大于处理速度,会造成队列大量阻塞,因队列很大,很可能在执行拒绝策略之前就内存溢出
,所以需要注意任务提交与处理之间的协调控制。SynchronousQueue:
不保存元素的阻塞队列
,或称为直接提交队列,会马上提交执行。
吞吐量通常要高于LinkedBlockingQueue
,静态工厂Executors.newCachedThreadPool
使用此队列。每执行一个插入操作就会阻塞,直到另一个线程调用删除操作才会被唤醒;反之每一个删除操作也都要等待一个插入操作。
PriorityBlockingQueue:具有
优先级
的无限阻塞队列
。除了第一个任务直接创建线程执行外,其他的任务都会被放入了优先队列,按优先级进行重新排列执行,线程池创建的线程数不会超过
corePoolSize
的数量。其它队列按照先进先出的规则处理任务,而优先队列可以自定义规则根据任务的优先级顺序先后执行。
拒绝策略类型
RejectedExecutionHandler(拒绝执行策略),当队列和线程池都满时,说明线程池处于饱和状态,则必须采用一种策略处理提交的新任务。这个策略默认情况下是 AbortPolicy,表示无法处理新任务时抛出异常。
- AbortPolicy:直接抛出异常,阻止系统下一步工作。
- CallerRunsPolicy:如果线程数达到上线,该策略会把任务队列中的任务放在调用者线程中运行。
- DiscardPolicy:丢弃无法处理的任务,不处理。使用此策略,需要注意业务场景中是否允许任务丢失。
- DiscardOldestPolicy:丢弃队列中最老的一个任务(最先被添加进队列,即最近要执行的任务),并尝试再次提交。
可以根据应用场景需要来实现 RejectedExecutionHandler 接口自定义策略。如记录日志或持久化存储不能处理的任务。
向线程池提交任务
可以使用 execute()
和 submit()
方法向线程池提交任务。
execute() 方法
该方法提交的是不需要返回值的任务,所以也无法判断任务是否被线程池执行成功。如下示例:
1
2
3
4
5
6poolExecutor.execute(new Runnable() {
public void run() {
//
}
});submit() 方法
提交的任务需要返回值时使用该方法。submit() 方法的实现由 ThreadPoolExecutor 的抽象父类 AbstractExecutorService 提供。
使用该方法,线程池会返回一个 Future 类型的对象,通过该对象可以判断线程是否执行成功。
通过 future 的 get() 方法来获取返回值, get() 方法会阻塞当前线程直到任务完成。
还有 get(long timeout, TimeUnit unit) 方法,会阻塞当前线程一段时间后立即返回,有可能任务还没执行完。
使用示例如下:
1
2
3
4
5
6
7
8
9
10
11
12Future<Object> future = poolExecutor.submit(needReturnValuTask);
try {
Object o = future.get();
} catch (InterruptedException e) {
//处理中断异常
} catch (ExecutionException e) {
//处理无法执行任务异常
} finally {
//关闭线程池
poolExecutor.shutdown();
}
关闭线程池
线程池类 ThreadPoolExecutor
提供了 shutdown()
和 shutdownNow()
两个方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt
方法来中断线程,所以无法响应中断的任务可能永远无法终止,但两者还是有一定的区别。
shutdown()
调用该方法后,先设置线程池的状态为
SHUTDOWN
,线程池会继续执行中的任务,中止正在待待的任务,但不接受新的任务,如果有新的任务进入,会抛出 RejectedExecutionHandler 类型错误。当调用 shutdown() 方法,会立即返回,而不会阻塞等待先前提交的任务完成执行。若要等待任务完成执行可以使用 awaitTermination() 方法。
shutdownNow()
调用该方法后,先设置线程池的状态为
STOP
,线程池会尝试停止所有正在执行的任务,中止正在等待的任务,并返回正在等待执行的任务列表。从此方法返回后,将从任务队列中清空(删除)这些任务。当调用 shutdownNow() 方法后, 线程池会尽最大努力(best-effort) 尝试停止正在执行的任务,但是没有保证的。
这两个方法任意一个被调用,isShutdown()
方法返回 true
。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminating()
方法返回 true
。
合理配置线程池
要合理配置线程池,需要对任务特性从以下几个角度进行分析
- 性质:CPU密集性、IO密集性、混合型。
- 优先级:高,中,低。
- 执行时间:长,中,短。
- 依赖性:是否依赖其他系统资源,如数据库连接池。
性质
性质不同的任务可以创建不同规模的线程池分开处理。
CPU 密集型任务应配置尽可能小的线程,如配置
N(cpu) + 1
个线程的线程池。IO 密集型任务并不是一直在执行,则应配置尽可能多的线程,如
2 * N(cpu)
。混合性任务可以拆分成一个 CPU密集型任务和一个 IO密集型任务,只要这两个任务执行的时间相关不是太大,那么拆分后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相关太大,则没必要进行折分。
可以通过
Runtime.getRuntime().availableProcessors()
来获取当前服务嘎嘎的 CPU 个数。优先级
优先级不同的任务可以使用 优先级队列(PriorityBlockingQueue)来处理,让任务按优先顺序执行。
注意:如果一直有优先级高的任务提交到队列,则低优先级的任务可能永远不能被执行。
执行时间
执行时间不同的任务可以交给不同规模的线程池来处理,或者使用优先级队列,让执行时间短的任务先执行。
依赖性
依赖外部资源返回结果的,等待时间越长,则 CPU 空闲时间越长,那么线程数应设置得越大,这样才能更好地利用 CPU。
建议使用 有界阻塞队列(ArrayBlockingQueue),能增加系统的稳定性和预警能力,可以根据需要设置大一点,如几千。特别注意因依赖外部资源(如数据库操作)响应缓慢,而导致工作线程全部阻塞,线程队列和线程池全满的问题。若设置 无界限阻塞队列,使用的是默认容量,线程队列则可能持续堆积耗尽内存,导致整个系统不可用。
线程池工作具类:Utils-线程池和线程数管理工具类
线程池的监控
系统中使用大量的线程池,有必要对线程池进行监控,方便在出现问题时可以快速定位。
可以使用线程池提供的参数属性进行监控。
- taskCount:需要执行的总任务数。正要执行的任务数 + 队列中的任务数 + 已完成的任务数,是个近似值,因线程状态可能被动态改变。
- comletedTaskCount:已完成的任务数,小于或等于 taskCount。
- largestPoolSize:曾经创建过的最大线程数。通过这个数据可以知道线程池是否有满过。
- getPoolSize:获取当前线程池中的线程数。
- getActiveCount:获取正在执行任务的线程数,是个近似值,因为线程状态是动态改变的。
线程池监控:可以通过继承线程池来自定义线程池,重写线程池的 beforeExecute(Thread t, Runnable r),afterExecute(Runnable r, Throwable t),terminated() 方法,这三个方法都是 ThreadPoolExecutor 的空方法,可在子类中对其进行自定义。
beforeExecute(Thread t, Runnable r)
在执行给定的线程的 Runnable 之前调用该方法。
由执行任务的线程调用,可用于重新初始化 ThreadLocal 或执行日志记录。
注意:若有多级子类重写,子类应该在这个方法的未尾调用
super.beforeExecute()
。afterExecute(Runnable r, Throwable t)
给定的 Runnable 执行完成时调用的方法。
由执行任务的线程调用。如果不为 null,说明出现了未捕获的 RuntimeException 或 Error 导致执行突然被终止。
注意:如果将任务动作显式或通过诸如
Submit
之类的方法包含在任务(如 FutureTask)中,则这些任务对象会捕获并维护计算异常,因此它们不会导致突然终止,并且内部异常不会传递给此方法。如果想使用此方法捕获两种类型的失败,可在子类中打印直接原因或潜在异常,如下示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19class ExtendedExecutor extends ThreadPoolExecutor {
// ...
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Object result = ((Future<?>) r).get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null)
System.out.println(t);
}
}}terminated()
当线程池被关闭时调用。
注意:若有多级子类重写,子类应该在此方法内调用
super.terminated()
。
其它参数
线程池 ThreadPoolExecutor 详解与应用
http://blog.gxitsky.com/2020/02/12/Java-Thread-07-ThreadPoolExecutor/