Spring(二十一):Spring EnableAsync异步方法与任务线程池TaskExecutor

Spring 提供了任务执行器(TaskExecutor)来实现多线程和并发编程。使用ThreadPoolTaskExecutor可实现一个基于线程池的TaskExecutor

要实现任务异步执行,在配置为中添加@EnableAsync开启对异步任务的支持,在执行Bean的方法中使用@Async注解来声明一个异步方法。

注意:创建本地实例(new)调用@Async注解的方法是,异步执行是不起效的。实例必须在@Configuration 类中创建或由@ComponentScan 扫描,意味必须是 Spring Bean 之间调用异步方法才会生效。

定义异步线程池

创建定义异步线程池的配置类,继承 AsyncConfigurer,重写 getAsyncExecutor 方法,返回 ThreadPoolTaskExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* @desc: 自定义异步线程池
**/
@Configuration
@EnableAsync
public class AsyncTaskExecutePool implements AsyncConfigurer {
private Logger logger = LoggerFactory.getLogger(AsyncTaskExecutePool.class);

@Override
public Executor getAsyncExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
/*核收线程数*/
executor.setCorePoolSize(5);
/*最大线程数*/
executor.setMaxPoolSize(50);
/*线程被关闭前的空闲时间*/
executor.setKeepAliveSeconds(60);
/*队列长度*/
executor.setQueueCapacity(10000);
/*设置线程名前缀*/
executor.setThreadNamePrefix("....");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(){
return new AsyncUncaughtExceptionHandler() {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
logger.error(ex.getMessage() + "--------" + ex);
logger.error("exception method:" + method.getName());
}
};

}
}

AsyncConfigurer:为异步执行提供配置。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {

@Nullable
protected AnnotationAttributes enableAsync;

@Nullable
protected Supplier<Executor> executor;

@Nullable
protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;


@Override
public void setImportMetadata(AnnotationMetadata importMetadata) {
this.enableAsync = AnnotationAttributes.fromMap(
importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
if (this.enableAsync == null) {
throw new IllegalArgumentException(
"@EnableAsync is not present on importing class " + importMetadata.getClassName());
}
}

/**
* 注入 AsyncConfigurer 集合,只能有一个存在
*/
@Autowired(required = false)
void setConfigurers(Collection<AsyncConfigurer> configurers) {
if (CollectionUtils.isEmpty(configurers)) {
return;
}
if (configurers.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
AsyncConfigurer configurer = configurers.iterator().next();
this.executor = configurer::getAsyncExecutor;
this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
}

}

Spring 项目,如果没有自定义异步线程池,Spring 会创建一个 SimpleAsyncTaskExecutor并使用它。

AsyncExecutionInterceptor:AOP 实现的异步执行拦截器。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 此实现在上下文中搜索唯一的 org.springframework.core.task.TaskExecutor bean,否则搜索名
* 为“taskExecutor”的 Executor bean。
* 如果两者都不可解析(例如,如果根本没有配置 BeanFactory),如果找不到默认值,则此实现回退到新创建的
* SimpleAsyncTaskExecutor 实例以供本地使用。
* 也可以看看:DEFAULT_TASK_EXECUTOR_BEAN_NAME
*/
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

Spring Boot 项目,在启动时会自动配置一个 ThreadPoolTaskExecutor Bean,beanName 为 applicationTaskExecutortaskExecutor。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* {@link EnableAutoConfiguration Auto-configuration} for {@link TaskExecutor}.
*
* @author Stephane Nicoll
* @author Camille Vienot
* @since 2.1.0
*/
@ConditionalOnClass(ThreadPoolTaskExecutor.class)
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(TaskExecutionProperties.class)
public class TaskExecutionAutoConfiguration {

/**
* Bean name of the application {@link TaskExecutor}.
*/
public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";

@Bean
@ConditionalOnMissingBean
public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties,
ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers,
ObjectProvider<TaskDecorator> taskDecorator) {
TaskExecutionProperties.Pool pool = properties.getPool();
TaskExecutorBuilder builder = new TaskExecutorBuilder();
builder = builder.queueCapacity(pool.getQueueCapacity());
builder = builder.corePoolSize(pool.getCoreSize());
builder = builder.maxPoolSize(pool.getMaxSize());
builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
builder = builder.keepAlive(pool.getKeepAlive());
Shutdown shutdown = properties.getShutdown();
builder = builder.awaitTermination(shutdown.isAwaitTermination());
builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);
builder = builder.taskDecorator(taskDecorator.getIfUnique());
return builder;
}

// DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";
@Lazy
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
return builder.build();
}

}

ThreadPoolTaskExecutor 使用 TaskExecutionProperties 设置线程池参数,可以通过 spring.task.execution前缀配置参数值。如下:

1
2
3
4
5
6
spring.task.execution.pool.core-size=8
spring.task.execution.pool.max-size=200
spring.task.execution.pool.keep-alive=60s
spring.task.execution.pool.queue-capacity=10240
spring.task.execution.pool.allow-core-thread-timeout=true
spring.task.execution.thread-name-prefix=Async--

当线程数量高于线程池的处理速度时,任务会被缓存到本地的队列中,如果超过队列容量,就会执行拒绝策略。Spring Boot 自动配置的ThreadPoolTaskExecutor的拒绝策略没有属性配置项,使用的是默认的拒绝策略 AbortPolicy。通常会配置以下 2 种拒绝策略:

  • AbortPolicy:直接抛出 RejectedExecutionException 异常。
  • CallerRunsPolicy:主线程直接执行该任务(同步了),执行完后尝试将下一个任务添加到线程池,这样可以有效降低线程池内添加任务的速度。
    建议使用 CallerRunsPolicy 策略,任务不会丢弃。因为当任务满后,如果直接抛异常,这个任务就会被丢弃。

@EnableAsync

在配置类添加注解 @EnableAsync 开启异步任务支持。

@Async声明异步

@Async作用在方法上表示该方法是异步的;如果作用在类上,则表示该类下的所有方法都是异步的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
* @Async作用在方法上表示该方法是异步的;
* 如果作用在类上,则表示该类下的所有方法都是异常的
*/
@Service
public class AsyncTaskService {

@Async
public void executeAsyncTask(Integer i) {
System.out.println("执行异步任务:" + i);
}

@Async
public void executeAsyncTaskPlus(Integer i) {
System.out.println("执行异步任务+1:" + (i+1));
}
}

运行测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class TaskMain {

public static void main(String[] args) {
AnnotationConfigApplicationContext context =
new AnnotationConfigApplicationContext(TaskExecutorConfig.class);

AsyncTaskService asyncTaskService = context.getBean(AsyncTaskService.class);

for (int i = 0; i < 10; i++) {
asyncTaskService.executeAsyncTask(i);
asyncTaskService.executeAsyncTaskPlus(i);
}

context.close();
}
}

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
执行异步任务+1:1
执行异步任务+1:3
执行异步任务:0
执行异步任务+1:4
执行异步任务:4
执行异步任务+1:5
执行异步任务:5
执行异步任务+1:6
执行异步任务:6
执行异步任务+1:7
执行异步任务:7
执行异步任务:3
执行异步任务:8
执行异步任务+1:9
执行异步任务:9
执行异步任务+1:10
执行异步任务:1
执行异步任务+1:8
执行异步任务+1:2
执行异步任务:2

相关参考

  1. Sping:Creating Asynchronous Methods
  2. Spring Doc EnableAsync:Annotation Type EnableAsync

Spring(二十一):Spring EnableAsync异步方法与任务线程池TaskExecutor

http://blog.gxitsky.com/2018/05/15/spring-21-EnableAsync-TaskExecutor/

作者

光星

发布于

2018-05-15

更新于

2022-08-14

许可协议

评论