Spring 提供了任务执行器(TaskExecutor
)来实现多线程和并发编程。使用ThreadPoolTaskExecutor
可实现一个基于线程池的TaskExecutor。
要实现任务异步执行,在配置为中添加@EnableAsync
开启对异步任务的支持,在执行Bean
的方法中使用@Async
注解来声明一个异步方法。
注意:创建本地实例(new
)调用@Async
注解的方法是,异步执行是不起效的。实例必须在@Configuration
类中创建或由@ComponentScan
扫描,意味必须是 Spring Bean 之间调用异步方法才会生效。
定义异步线程池
创建定义异步线程池的配置类,继承 AsyncConfigurer
,重写 getAsyncExecutor
方法,返回 ThreadPoolTaskExecutor
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
|
@Configuration @EnableAsync public class AsyncTaskExecutePool implements AsyncConfigurer { private Logger logger = LoggerFactory.getLogger(AsyncTaskExecutePool.class);
@Override public Executor getAsyncExecutor(){ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(50); executor.setKeepAliveSeconds(60); executor.setQueueCapacity(10000); executor.setThreadNamePrefix("...."); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; }
@Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(){ return new AsyncUncaughtExceptionHandler() { @Override public void handleUncaughtException(Throwable ex, Method method, Object... params) { logger.error(ex.getMessage() + "--------" + ex); logger.error("exception method:" + method.getName()); } };
} }
|
AsyncConfigurer:为异步执行提供配置。源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| @Configuration public abstract class AbstractAsyncConfiguration implements ImportAware {
@Nullable protected AnnotationAttributes enableAsync;
@Nullable protected Supplier<Executor> executor;
@Nullable protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;
@Override public void setImportMetadata(AnnotationMetadata importMetadata) { this.enableAsync = AnnotationAttributes.fromMap( importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false)); if (this.enableAsync == null) { throw new IllegalArgumentException( "@EnableAsync is not present on importing class " + importMetadata.getClassName()); } }
@Autowired(required = false) void setConfigurers(Collection<AsyncConfigurer> configurers) { if (CollectionUtils.isEmpty(configurers)) { return; } if (configurers.size() > 1) { throw new IllegalStateException("Only one AsyncConfigurer may exist"); } AsyncConfigurer configurer = configurers.iterator().next(); this.executor = configurer::getAsyncExecutor; this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler; }
}
|
Spring 项目,如果没有自定义异步线程池,Spring 会创建一个 SimpleAsyncTaskExecutor
并使用它。
AsyncExecutionInterceptor:AOP 实现的异步执行拦截器。源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13
|
@Override @Nullable protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) { Executor defaultExecutor = super.getDefaultExecutor(beanFactory); return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor()); }
|
Spring Boot 项目,在启动时会自动配置一个 ThreadPoolTaskExecutor
Bean,beanName 为 applicationTaskExecutor
和 taskExecutor
。源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
|
@ConditionalOnClass(ThreadPoolTaskExecutor.class) @Configuration(proxyBeanMethods = false) @EnableConfigurationProperties(TaskExecutionProperties.class) public class TaskExecutionAutoConfiguration {
public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";
@Bean @ConditionalOnMissingBean public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties, ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers, ObjectProvider<TaskDecorator> taskDecorator) { TaskExecutionProperties.Pool pool = properties.getPool(); TaskExecutorBuilder builder = new TaskExecutorBuilder(); builder = builder.queueCapacity(pool.getQueueCapacity()); builder = builder.corePoolSize(pool.getCoreSize()); builder = builder.maxPoolSize(pool.getMaxSize()); builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout()); builder = builder.keepAlive(pool.getKeepAlive()); Shutdown shutdown = properties.getShutdown(); builder = builder.awaitTermination(shutdown.isAwaitTermination()); builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod()); builder = builder.threadNamePrefix(properties.getThreadNamePrefix()); builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator); builder = builder.taskDecorator(taskDecorator.getIfUnique()); return builder; }
@Lazy @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME, AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME }) @ConditionalOnMissingBean(Executor.class) public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) { return builder.build(); }
}
|
此 ThreadPoolTaskExecutor
使用 TaskExecutionProperties 设置线程池参数,可以通过 spring.task.execution
前缀配置参数值。如下:
1 2 3 4 5 6
| spring.task.execution.pool.core-size=8 spring.task.execution.pool.max-size=200 spring.task.execution.pool.keep-alive=60s spring.task.execution.pool.queue-capacity=10240 spring.task.execution.pool.allow-core-thread-timeout=true spring.task.execution.thread-name-prefix=Async--
|
当线程数量高于线程池的处理速度时,任务会被缓存到本地的队列中,如果超过队列容量,就会执行拒绝策略。Spring Boot 自动配置的ThreadPoolTaskExecutor
的拒绝策略没有属性配置项
,使用的是默认的拒绝策略 AbortPolicy。通常会配置以下 2 种拒绝策略:
- AbortPolicy:直接抛出 RejectedExecutionException 异常。
- CallerRunsPolicy:主线程直接执行该任务(同步了),执行完后尝试将下一个任务添加到线程池,这样可以有效降低线程池内添加任务的速度。
建议使用 CallerRunsPolicy 策略,任务不会丢弃。因为当任务满后,如果直接抛异常,这个任务就会被丢弃。
@EnableAsync
在配置类添加注解 @EnableAsync 开启异步任务支持。
@Async声明异步
@Async
作用在方法上表示该方法是异步的;如果作用在类上,则表示该类下的所有方法都是异步的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service;
@Service public class AsyncTaskService { @Async public void executeAsyncTask(Integer i) { System.out.println("执行异步任务:" + i); } @Async public void executeAsyncTaskPlus(Integer i) { System.out.println("执行异步任务+1:" + (i+1)); } }
|
运行测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| import org.springframework.context.annotation.AnnotationConfigApplicationContext;
public class TaskMain { public static void main(String[] args) { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(TaskExecutorConfig.class); AsyncTaskService asyncTaskService = context.getBean(AsyncTaskService.class); for (int i = 0; i < 10; i++) { asyncTaskService.executeAsyncTask(i); asyncTaskService.executeAsyncTaskPlus(i); } context.close(); } }
|
执行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| 执行异步任务+1:1 执行异步任务+1:3 执行异步任务:0 执行异步任务+1:4 执行异步任务:4 执行异步任务+1:5 执行异步任务:5 执行异步任务+1:6 执行异步任务:6 执行异步任务+1:7 执行异步任务:7 执行异步任务:3 执行异步任务:8 执行异步任务+1:9 执行异步任务:9 执行异步任务+1:10 执行异步任务:1 执行异步任务+1:8 执行异步任务+1:2 执行异步任务:2
|
相关参考
- Sping:Creating Asynchronous Methods
- Spring Doc EnableAsync:Annotation Type EnableAsync