Spring Boot使用@Async实现异步调用:ThreadPoolTaskScheduler线程池的优雅关闭

2021年09月15日 阅读数:1
这篇文章主要向大家介绍Spring Boot使用@Async实现异步调用:ThreadPoolTaskScheduler线程池的优雅关闭,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

上周发了一篇关于Spring Boot中使用@Async来实现异步任务和线程池控制的文章:《Spring Boot使用@Async实现异步调用:自定义线程池》。因为最近身边也发现了很多异步任务没有正确处理而致使的很多问题,因此在本文就接前面内容,继续说说线程池的优雅关闭,主要针对ThreadPoolTaskScheduler线程池。java

问题现象

在上篇文章的例子Chapter4-1-3中,咱们定义了一个线程池,而后利用@Async注解写了3个任务,并指定了这些任务执行使用的线程池。在上文的单元测试中,咱们没有具体说说shutdown相关的问题,下面咱们就来模拟一个问题现场出来。redis

第一步:如前文同样,咱们定义一个ThreadPoolTaskScheduler线程池:spring

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @EnableAsync
    @Configuration
    class TaskPoolConfig {

        @Bean("taskExecutor")
        public Executor taskExecutor() {
        //定义一个线程池,并配置线程池的一些参数
            ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
            executor.setPoolSize(20);
            executor.setThreadNamePrefix("taskExecutor-");
            return executor;
        }

    }

}

第二步:改造以前的异步任务,让它依赖一个外部资源,好比:Redisapache

@Slf4j
@Component
public class Task {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Async("taskExecutor") //指定使用名为taskExecutor的线程池来执行该任务
    public void doTaskOne() throws Exception {
        log.info("开始作任务一");
        long start = System.currentTimeMillis();
        log.info(stringRedisTemplate.randomKey());
        long end = System.currentTimeMillis();
        log.info("完成任务一,耗时:" + (end - start) + "毫秒");
    }

    @Async("taskExecutor") //指定使用名为taskExecutor的线程池来执行该任务
    public void doTaskTwo() throws Exception {
        log.info("开始作任务二");
        long start = System.currentTimeMillis();
        log.info(stringRedisTemplate.randomKey());
        long end = System.currentTimeMillis();
        log.info("完成任务二,耗时:" + (end - start) + "毫秒");
    }

    @Async("taskExecutor") //指定使用名为taskExecutor的线程池来执行该任务
    public void doTaskThree() throws Exception {
        log.info("开始作任务三");
        long start = System.currentTimeMillis();
        log.info(stringRedisTemplate.randomKey());
        long end = System.currentTimeMillis();
        log.info("完成任务三,耗时:" + (end - start) + "毫秒");
    }

}

注意:这里省略了pom.xml中引入依赖和配置redis的步骤安全

第三步:修改单元测试,模拟高并发状况下ShutDown的状况:springboot

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ApplicationTests {

    @Autowired
    private Task task; //Task类在上面已经作了定义了

    @Test
    @SneakyThrows//@SneakyThrows的用法比较简单,其实就是对于异常的一个整理,将checked exception 看作unchecked exception, 不处理,直接扔掉。 减小了处处写catch的不便利性。好比在线程中,catch全部异常,再好比在一些不太可能发生异常的地方,可是你又必须cache checked exception的地方使用这个annotation会显得代码比较规整,易读。或许也会显得高大上一点吧
    public void test() {

        for (int i = 0; i < 10000; i++) {
            task.doTaskOne(); 
            task.doTaskTwo();
            task.doTaskThree();

            if (i == 9999) {
                System.exit(0);
            }
        }
    }

}

说明:经过for循环往上面定义的线程池中提交任务,因为是异步执行,在执行过程当中,利用System.exit(0)来关闭程序,此时因为有任务在执行,就能够观察这些异步任务的销毁与Spring容器中其余资源的顺序是否安全。markdown

第四步:运行上面的单元测试,咱们将碰到下面的异常内容。并发

org.springframework.data.redis.RedisConnectionFailureException: Cannot get Jedis connection; nested exception is redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
	at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.fetchJedisConnector(JedisConnectionFactory.java:204) ~[spring-data-redis-1.8.10.RELEASE.jar:na]
	at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.getConnection(JedisConnectionFactory.java:348) ~[spring-data-redis-1.8.10.RELEASE.jar:na]
	at org.springframework.data.redis.core.RedisConnectionUtils.doGetConnection(RedisConnectionUtils.java:129) ~[spring-data-redis-1.8.10.RELEASE.jar:na]
	at org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:92) ~[spring-data-redis-1.8.10.RELEASE.jar:na]
	at org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:79) ~[spring-data-redis-1.8.10.RELEASE.jar:na]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:194) ~[spring-data-redis-1.8.10.RELEASE.jar:na]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:169) ~[spring-data-redis-1.8.10.RELEASE.jar:na]
	at org.springframework.data.redis.core.RedisTemplate.randomKey(RedisTemplate.java:781) ~[spring-data-redis-1.8.10.RELEASE.jar:na]
	at com.didispace.async.Task.doTaskOne(Task.java:26) ~[classes/:na]
	at com.didispace.async.Task$$FastClassBySpringCGLIB$$ca3ff9d6.invoke(<generated>) ~[classes/:na]
	at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) ~[spring-core-4.3.14.RELEASE.jar:4.3.14.RELEASE]
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:738) ~[spring-aop-4.3.14.RELEASE.jar:4.3.14.RELEASE]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) ~[spring-aop-4.3.14.RELEASE.jar:4.3.14.RELEASE]
	at org.springframework.aop.interceptor.AsyncExecutionInterceptor$1.call(AsyncExecutionInterceptor.java:115) ~[spring-aop-4.3.14.RELEASE.jar:4.3.14.RELEASE]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_151]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.8.0_151]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_151]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_151]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
Caused by: redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
	at redis.clients.util.Pool.getResource(Pool.java:53) ~[jedis-2.9.0.jar:na]
	at redis.clients.jedis.JedisPool.getResource(JedisPool.java:226) ~[jedis-2.9.0.jar:na]
	at redis.clients.jedis.JedisPool.getResource(JedisPool.java:16) ~[jedis-2.9.0.jar:na]
	at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.fetchJedisConnector(JedisConnectionFactory.java:194) ~[spring-data-redis-1.8.10.RELEASE.jar:na]
	... 19 common frames omitted
Caused by: java.lang.InterruptedException: null
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) ~[na:1.8.0_151]
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088) ~[na:1.8.0_151]
	at org.apache.commons.pool2.impl.LinkedBlockingDeque.pollFirst(LinkedBlockingDeque.java:635) ~[commons-pool2-2.4.3.jar:2.4.3]
	at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:442) ~[commons-pool2-2.4.3.jar:2.4.3]
	at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:361) ~[commons-pool2-2.4.3.jar:2.4.3]
	at redis.clients.util.Pool.getResource(Pool.java:49) ~[jedis-2.9.0.jar:na]
	... 22 common frames omitted

如何解决

缘由分析

从异常信息JedisConnectionException: Could not get a resource from the pool来看,咱们很容易的能够想到,在应用关闭的时候异步任务还在执行,因为Redis链接池先销毁了(解释:由于主程序执行完毕了,因此主程序里面依赖的一些bean对象,好比
@Autowired
private Task task;
都要销毁了,spring容器里的各个bean都要开始销毁了,可是异步任务它很特殊,你销毁你的,我继续执行个人,而后等到redis的bean对象也销毁了,这个时候异步任务要访问Redis的操做就报了上面的错,也许你会问,为何不先销毁 @Bean(“taskExecutor”)呢,这样的话能够那些执行的任务就能够直接终止呀? 咱们要搞清楚bean之间的依赖关系,task类里面的doTaskOne(),doTaskTwo()等等这些任务的执行都依赖于线程池,因此bean销毁顺序也应该是先销毁task对象,而后再销毁@Bean(“taskExecutor”)。因此报异常的点实际上是在“刚销毁完task对象,可是还未销毁@Bean(“taskExecutor”)对象的间隙”,还能够这样理解,主程序一旦执行完,主程序里面的task对象(注:task对象里面有redis的bean对象)立马销毁,这时候doTaskOne(),doTaskTwo()等等这些任务还在线程池(线程池是最后才销毁的)里面执行,从而访问redis的时候确定就会报异常了
),致使异步任务中要访问Redis的操做就报了上面的错。因此,咱们得出结论,上面的实现方式在应用关闭的时候是不优雅的,那么咱们要怎么作呢?dom

解决方法

要解决上面的问题很简单,Spring的ThreadPoolTaskScheduler为咱们提供了相关的配置,只须要加入以下设置便可:异步

@Bean("taskExecutor")
public Executor taskExecutor() {
    ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
    executor.setPoolSize(20);
    executor.setThreadNamePrefix("taskExecutor-");
    executor.setWaitForTasksToCompleteOnShutdown(true);
    executor.setAwaitTerminationSeconds(60);
    return executor;
}

说明:setWaitForTasksToCompleteOnShutdown(true)该方法就是这里的关键,用来设置线程池关闭的时候等待全部任务都完成再继续销毁其余的Bean,这样这些异步任务的销毁就会先于Redis线程池的销毁。同时,这里还设置了setAwaitTerminationSeconds(60),该方法用来设置线程池中任务的等待时间,若是超过这个时候尚未销毁就强制销毁,以确保应用最后可以被关闭,而不是阻塞住。

本文转载自:http://blog.didispace.com/springbootasync-3/