Java Executor框架详解与应用

Java 的线程即是工作单元,也是执行机制。从 JDK 5 开始,把工作单元与执行机制分离开来。工作单元包括 RunnableCallable,而执行机制由 Executor 框架提供。

Executor 提供了一种将任务提交与每个任务如何运行的机制分离的方法(包括线程使用,调度的详细信息)。

Executor框架

在 HotSpot 的线程模型中,Java 线程会被一一映射为本地操作系统线程。Java 线程启动会创建一个本地操作系统线程;当 Java 线程终止时,这个操作系统线程也会被回收。

在 Java 多线程应用中,通过 Executor 框架将若干个任务映射为固定数量的线程;底层操作系统将这些线程映射到处理器上。Executor 控制上层的调度,下层由操作系统内核调度且不受上层应用控制。

框架结构

组成部分

Executor 框架主要由 3 大部分组成:

  • 任务:包括被执行任务需要实现的接口:Runnable 接口 或 Callable 接口。
  • 任务的执行:包括任务执行机制的核心接口 Executor 及子接口 ExecutorService。两个实现了 ExecutorService 接口的关键类 ThreadPoolExecutorScheduledThreadPoolExecutor
  • 异步计算的结果:包括 Future 及其实现类 FutureTask

类与接口

Executor 框架主要的类与接口:

  • Executor:这是个根接口,框架的基础,只有一个 execute() 方法,将任务提交与任务执行分离。
  • ThreadPoolExecutor:线程池的核心类,用于执行被提交的任务。
  • ScheduledThreadPoolExecutor:继承自 ThreadPoolExecutor,可以定时执行任务(延时任务)。
  • Future 接口及其实现类 FutureTask:表示异步计算的结果。
  • Runaable 接口和 Callable 接口的实现类:都可以被 ThreadPoolExecutorScheduledThreadPoolExecutor 执行。

Executor框架主要接口及类

使用示意

下图 Executor 框架的使用示意图。

Executor框架使用示意图

  1. 主线程首先创建实现 RunaableCallable 接口的任务对象。工具类 Executors 可以把一个 Runaable 对象封装为一个 Callable 对象。

    Executors 是个工具类,内部方法都是静态的。

    1
    2
    Executors.callable(Runnable task)
    Executors.callable(Runnable task, T result)
  2. 然后可以把 Runaable 对象 或 Callable 对象直接交给 ExecutorService 执行。

    ExecutorService 是个接口,继承自 Executor 接口,内部方法由实现类实现。

    1
    2
    3
    4
    void execute(Runnable command);	//父接口 Executor 的方法
    Future<?> submit(Runnable task);
    Future<T> submit(Runnable task, T result);
    Future<T> submit(Callable<T> task);
  3. 如果是通过 submit() 方法提交,返回 Future类型对象(目前只有一个 FutureTask 对象)。由于 FutureTask 实现了 Runnable ,也可以创建 FutureTask,然后直接交给 ExecutorService 执行。

    1
    2
    3
    4
    5
    6
    public interface RunnableFuture<V> extends Runnable, Future<V> {
    //......
    }
    public class FutureTask<V> implements RunnableFuture<V> {
    //......
    }

    最后主线程可以执行 FutureTask.get() 方法来获取任务完成的结果(如果正在执行还未完成,则等待(阻塞)),也可以执 ``FutureTask.cancel(boolean mayInterruptIfRunning)` 来取消此任务的执行。

框架成员

Executor 框架的主要成员:ThreadPoolExecutor 类,ScheduledThreadPoolExecutor 类,Future 接口,Runnable 接口,Callable 接口 和 工具类 Executors

Executors 是个工厂工具类,该类中定义了 Executor,ExecutorService,ScheduledExecutorService,ThreadFactoryCallable 类的工厂和实用方法。可以使用此工具类快速创建线程池,线程工厂。

  • ScheduledThreadPoolExecutor

    可以使用工厂工具类 Executors 快速创建 2 种类型的 ScheduledThreadPoolExecutor :ScheduledThreadPool,SingleThreadScheduledExecutor

  • Future 接口及其实现类 FutureTask

    表示异步计算的结果

ThreadPoolExecutor

可以使用工厂工具类 Executors 快速创建 3 种类型的 ThreadPoolExecutor 对象:FixedThreadPool,SingleThreadExecutor,CachedThreadPool

FixedThreadPool

创建一个使用无界阻塞队列固定线程数量(核心线程数 corePoolSize 与最大线程数 maximumPoolSize 一样)的线程池,空闲线程会被 立即 终止。

1
2
3
4
5
6
7
8
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory);
}

注意:FixedThreadPool 队列使用的是 LinkedBlockingQueue 无界队列,默认最大值是 Integer.MAX_VALUE,相当于不会拒绝任务(不会调用 RejectExecutionHandler.rejectedExecution 方法),要防止大量的等待任务导致 内存耗尽

SingleThreadExecutor

创建一个使用无界阻塞队列只有 1 个线程的线程池(核心线程数 corePoolSize 与最大线程数 maximumPoolSize 一样)的线程池,空闲线程会被 立即 终止。

1
2
3
4
5
6
7
8
9
10
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory));
}

在任何时间,活动任务不超过一个,如果该单线程在执行任务期间因执行失败而终止,则创建 1 个新的线程来执行后续新的任务。

适用于需要保证顺序地执行各个任务;且在任意的时间点,不会有多个线程是活动的应用场景。

CachedThreadPool

创建一个大小无界的线程池,核心线程数 corePoolSize0,最大线程数 maximumPoolSize 为 **Integer.MAX_VALUE;空闲时间 60 秒,即不维护核心线程,如果所有线程都空闲超过 60 秒都会被移除;使用 SynchronousQueue 队列,不保存元素的阻塞队列,即有任务进来会马上提交执行。

1
2
3
4
5
6
7
8
9
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), threadFactory);
}

CachedThreadPool 的 maximumPoolSize 是无界的。这意味着,如果主线程提交任务的速度高于池中线程的处理速度,就会不断创建线程,极端情况下 CachedThreadPool 会因为创建过多的线程而耗尽 CPU 和内存资源。

适用于执行很多的短期异步任务的小应用,或者是负载较轻的服务器。

注意

在《阿里巴巴Java开发手册》中,并发处理强制不允许使用 Executors 创建线程池,原因也是明确的。

1
2
3
4
5
6
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors 返回的线程池对象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2) CachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

所以在使用时,应显式调用 ThreadPoolExecutor 的构造方法来创建线程池,通常会自定义一个创建线程池的工作类来使用:Utils-线程池和线程数管理工具类

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor ,实现了 ScheduledExecutorService 接口,主要用来在给定的 延迟后运行定期执行

  • ThreadPoolExecutor:提供了 execute() 和 submit() 方法提交异步任务的基础功能。
  • ScheduledExecutorService:提供了延时执行或周期执行任务的功能。

ScheduledThreadPoolExecutor 类的 UML 图如下:

ScheduledThreadPoolExecutor UML 图

ScheduledExecutorService 接口:

ScheduledThreadPoolExecutor 实现了此接口的 4 个方法,定义了可 延时执行按周期执行 异步任务的特有功能。

注意:ScheduledExecutorService接口下的方法需要使用 try....catch....捕获和吞掉异常,若抛出异常会导致定时任务终断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 给定的延迟后运行或按周期执行任务
*/
public interface ScheduledExecutorService extends ExecutorService {

/**
* 创建并执行一次操作,该操作在给定的延迟后启用。
*/
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

/**
* 创建并执行ScheduledFuture,该ScheduledFuture在给定的延迟后变为启用状态。
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

/**
* 创建并执行一个周期性操作,该操作将在给定的初始延迟后首冷饮启用,然后按周期启用;
* 即将在initialDelay,(initialDelay + period),(initialDelay + 2 * period)开始执行。
* 如果任务在执行时遇到异常,则将禁止后续执行,否则将仅通过取消或终止执行程序而终止。
* 如果此任务的执行耗时超过间隔时间,则后续执行可能会开始得较晚,但不会同时执行。
*
* [period]:两次执行之间的时间间隔(本次开始时间到下次开始时间的间隔)
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
long period, TimeUnit unit);

/**
* 创建并执行一个周期性操作,该操作在给定的初始延迟之后首次启用, 然后按周期启用;
*
* [delay]:延迟时间, 即本次执行结束后到下次开始之间的时间
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
long delay, TimeUnit unit);

}

运行机制

所有的构造方法都是调用的是父类 ThreadPoolExecutor 的构造方法,传入的 maximumPoolSizeInteger.MAX_VALUE,即传入的延迟工作队列 DelayedWorkQueue 是个无界阻塞队列。

ScheduledThreadPoolExecutor 的执行主要分为两大部分。

  • 当调用 scheduleAtFixedRate() 方法或者 scheduleWithFixedDelay() 方法时,会向 DelayedWorkQueue 添加一个实现了 RunnableScheduledFuture 接口的 ScheduledFutureTask。实际是将提交的任务转换成 ScheduledFutureTask 类。源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
    long delay,
    TimeUnit unit) {
    if (callable == null || unit == null)
    throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable,
    new ScheduledFutureTask<V>(callable,
    triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
    }
  • delayedExecute() 方法是执行延迟任务和定期任务的主要方法,如果关闭了线程池,则拒绝任务。否则,将任务添加到队列中,并在 必要时 启动线程来运行任务(线程池中的线程从 DelayedWorkQueue 中获取 ScheduledFutureTask,然后执行)。

ScheduledFutureTask

ScheduledFutureTask 调度任务 3 个主要成员变量。

  • time:任务将要被执行的具体时间。
  • period:任务执行的间隔周期。
  • sequenceNumber:任务被添加到 ScheduledThreadPoolExecutor 中的序号。

ScheduledFutureTask 任务执行需要调用 run() 方法。 ScheduledFutureTask 继承了实现了 Runnable 接口的run() 方法的 FutureTask 类,重写了 run() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void run() {
//判断是否周期性任务
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
//不是,直接调用run方法执行
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
//重设下次执行该任务的时间
setNextRunTime();
//重复执行任务,将下次要执行的任务加入到DelayedWorkQueue
reExecutePeriodic(outerTask);
}
}

从上可以看出,ScheduledFutureTask最主要的功能是根据当前任务是否周期性的,来对异步任务进行封装。

如果不是周期性任务(调用schedule方法)则直接通过run()执行;若是周期性任务,则在每一次执行完后,重设下一次执行的时间,然后将下一次任务继续放入到阻塞队列中。

DelayedWorkQueue

DelayedWorkQueue 是 ScheduledThreadPoolExecutor 中非常重要的内部类。

DelayedWorkQueue 是一个基于堆的数据结构,类似于 DelayQueue 和 PriorityQueu 中的数据结构,每个ScheduledFutureTask 也将其索引记录到堆数组中(ScheduledFutureTask 和 ScheduledFutureTask 都有个堆索引属性 int heapIndex,heapIndex 具有唯一性,同一个 ScheduledFutureTask 在队列中最多只能出现一次)。

执行 延时任务 时需要从队列中取出最近要执行的任务,所以队列中的任务出队时,一定要最近要执行的队列最靠前,所以 DelayQueue 自然要使用优先级队列,按照执行时间的升序来排列,将执行时间越近的任务放在队列的队头位置。详细可参考 DelayedWorkQueue

优先级队列通常通过数组来实现,DelayedWorkQueue 属性:

1
2
3
4
5
6
7
8
9
10
// 队列初始容量
private static final int INITIAL_CAPACITY = 16;
// 创建指定初始容量,RunnableScheduledFuture类型的数组
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
// leader 线程,队列头等待的任务线程
private Thread leader = null;
//当新的任务在队列头可用或新的线程可能需要成为领导者时,会发出条件信号。
private final Condition available = lock.newCondition();

FutureTask

FutureTask 是可以取消任务,可查看计算是否完成,可获取执行结果的异步计算。

FutureTask 提供 Future的基本实现,其中包含 启动取消 计算,查询计算是否完成 及 检索计算结果的方法。只有在计算完成后才能检索结果,如果计算尚未完成,则get()方法将阻塞。一旦计算完成,就不能重新开始或取消计算(除非使用 runAndReset() 调用计算)。

FutureTask 可以用于包装 CallableRunnable 对象。由于 FutureTask 实现的 RunnableFuture 接口继承自 Runaable 接口 和 Future 接口,因此 FutureTask 可以交给 Exceutor 执行,也可由调用线程直接执行(FutureTask.run())。还可使用 ExecutorService.submit(...) 方法会返回一个 FutureTask,然后执行 FutureTast.get() 方法 或 FutureTask.cancel(...)

除了用作独立类之外,此类还提供了 protected 功能,这些功能在创建自定义任务类时可能会很有用。

FutureTask状态

  • FutureTask 有 7 种状态:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    private volatile int state;
    // 一个新的任务,初始状态
    private static final int NEW = 0;
    // 任务完成期间,正在设置结果时的状态,临时状态
    private static final int COMPLETING = 1;
    // 任务正常结束
    private static final int NORMAL = 2;
    // 任务因异常而结束
    private static final int EXCEPTIONAL = 3;
    // 任务未执行前设置 cancel(true) 方法后的状态
    private static final int CANCELLED = 4;
    // 任务完成期间,调用了cancel(true) 方法中断执行的临时状态,后转为 INTERRUPTED 状态
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED = 6;
  • FutureTask 状态迁移

    1
    2
    3
    4
    5
    # 可能的状态迁移
    NEW -> COMPLETING -> NORMAL
    NEW -> COMPLETING -> EXCEPTIONAL
    NEW -> CANCELLED
    NEW -> INTERRUPTING -> INTERRUPTED

FutureTask使用

  • FutureTask 竞争等待

    当一个线程需要等待另一个线程把某个任务执行完后它才能继承执行,此时可使用 FutureTask。当有多个线程同时执行同一个任务时,只允许一个线程执行任务,其他线程需要等待这个任务执行完后才能继续执行。

    示例代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    public class CreateThread {
    //任务缓存
    private final ConcurrentHashMap<Object, Future<String>> taskCache = new ConcurrentHashMap<>();

    private String executionTask(final String taskName) {
    while (true) {
    Future<String> future = taskCache.get(taskName);
    if (future == null) {
    // 创建任务
    Callable<String> task = new Callable<String>() {
    @Override
    public String call() throws Exception {
    return taskName;
    }
    };
    // 包装 Callable
    FutureTask<String> futureTask = new FutureTask<>(task);
    future = taskCache.putIfAbsent(taskName, futureTask);
    if(future == null){
    future = futureTask;
    // 执行任务
    futureTask.run();
    }
    }

    try {
    // 获取计算结果
    return future.get();
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (ExecutionException e) {
    e.printStackTrace();
    }

    }
    }
    }

    上面代码执行示意图:

    代码执地示意图

    当两个线程试图同时执行同一个任务时,如果Thread 1 执行 1.3 后 Thread 2 执行2.1,那么接下来 Thread 2 将在 2.2等待,直到 Thread 1 执行完 1.4 后 Thread 2 才能从 2.2(FutureTask.get())返回。

  • FutureTask 直接运行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // 子类实现Callable, 或使用内部类
    Callable,<String> task = new Callable<String>() {
    @Override
    public String call() throws Exception {
    return taskName;
    }
    };
    // 可以包装 Runaable 和 Callable 对象
    FutureTask<String> futureTask = new FutureTask<>(task);
    futureTask.get();
  1. 使用 ExecutorService 的 submit(...) 方法会返回一个 FutureTask

    示列一:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // 子类实现Callable, 或使用内部类
    Callable<String> task = new Callable<String>() {
    @Override
    public String call() throws Exception {
    return taskName;
    }
    };
    // 显式创建线程池
    ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(8, 17, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000));
    // 使用 ExecutorService.submit(...) 方法会返回一个 FutureTask
    // ExecutorService.submit(...) 方法由 AbstractExecutorService 抽象类实现
    // ThreadPoolExecutor 继承了 AbstractExecutorService
    Future<String> future = poolExecutor.submit(task);
    future.get();

    示例二:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // 子类实现Callable, 或使用内部类
    Callable<String> task = new Callable<String>() {
    @Override
    public String call() throws Exception {
    return taskName;
    }
    };
    // 使用工具类 Executors 创建线程池
    ExecutorService service = Executors.newFixedThreadPool(16);
    Future<String> future = service.submit(task);
    future.get();
  2. 使用 ExecutorService 的 execute() 执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // 子类实现Callable, 或使用内部类
    Callable<String> task = new Callable<String>() {
    @Override
    public String call() throws Exception {
    return taskName;
    }
    };
    FutureTask<String> futureTask = new FutureTask<>(task);
    ExecutorService service = Executors.newFixedThreadPool(16);
    service.execute(futureTask);
    futureTask.get();

    注意:execute() 方法没有返回值,使用 FutureTask 的 get() 方法会一直等待结果的返回,如果 get 的调用顺序在execute 之前的话,那么程序将会停止在 get 这里,调用 get() 方法必须在 execute() 方法之后。

相关参考

  1. 深入理解Java线程池:ScheduledThreadPoolExecutor
作者

光星

发布于

2020-02-13

更新于

2024-09-05

许可协议

评论