关于java:Spring-Boot中如何配置线程池拒绝策略妥善处理好溢出的任务

40次阅读

共计 12595 个字符,预计需要花费 32 分钟才能阅读完成。

通过之前三篇对于 Spring Boot 异步工作实现的博文,咱们别离学会了用 @Async 创立异步工作、为异步工作配置线程池、应用多个线程池隔离不同的异步工作。明天这篇,咱们持续对下面的常识进行欠缺和优化!

如果你曾经看过下面几篇内容并曾经把握之后,一起来思考上面这个问题:

假如,线程池配置为外围线程数 2、最大线程数 2、缓冲队列长度 2。此时,有 5 个异步工作同时开始,会产生什么?

场景重现

咱们先来把下面的假如用代码实现一下:

第一步:创立 Spring Boot 利用,依据下面的假如写好线程池配置。

@EnableAsync
@SpringBootApplication
public class Chapter78Application {public static void main(String[] args) {SpringApplication.run(Chapter78Application.class, args);
    }

    @EnableAsync
    @Configuration
    class TaskPoolConfig {

        @Bean
        public Executor taskExecutor1() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(2);
            executor.setMaxPoolSize(2);
            executor.setQueueCapacity(2);
            executor.setKeepAliveSeconds(60);
            executor.setThreadNamePrefix("executor-1-");
            return executor;
        }

    }

}

第二步 :用@Async 注解实现一个局部工作

@Slf4j
@Component
public class AsyncTasks {public static Random random = new Random();

    @Async("taskExecutor1")
    public CompletableFuture<String> doTaskOne(String taskNo) throws Exception {log.info("开始工作:{}", taskNo);
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        log.info("实现工作:{},耗时:{} 毫秒", taskNo, end - start);
        return CompletableFuture.completedFuture("工作实现");
    }

}

第三步:编写测试用例

@Slf4j
@SpringBootTest
public class Chapter78ApplicationTests {

    @Autowired
    private AsyncTasks asyncTasks;

    @Test
    public void test2() throws Exception {
        // 线程池配置:core-2,max-2,queue=2,同时有 5 个工作,呈现上面异样:// org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@59901c4d[Running, pool size = 2,
        // active threads = 0, queued tasks = 2, completed tasks = 4]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@408e96d9

        long start = System.currentTimeMillis();

        // 线程池 1
        CompletableFuture<String> task1 = asyncTasks.doTaskOne("1");
        CompletableFuture<String> task2 = asyncTasks.doTaskOne("2");
        CompletableFuture<String> task3 = asyncTasks.doTaskOne("3");
        CompletableFuture<String> task4 = asyncTasks.doTaskOne("4");
        CompletableFuture<String> task5 = asyncTasks.doTaskOne("5");

        // 一起执行
        CompletableFuture.allOf(task1, task2, task3, task4, task5).join();

        long end = System.currentTimeMillis();

        log.info("工作全副实现,总耗时:" + (end - start) + "毫秒");
    }

}

执行一下,能够相似上面这样的日志信息:

2021-09-22 17:33:08.159  INFO 21119 --- [executor-1-2] com.didispace.chapter78.AsyncTasks       : 开始工作:2
2021-09-22 17:33:08.159  INFO 21119 --- [executor-1-1] com.didispace.chapter78.AsyncTasks       : 开始工作:1

org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@64968732

    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:324)
    at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
    at java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
    at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:274)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:129)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
    at com.didispace.chapter78.AsyncTasks$$EnhancerBySpringCGLIB$$c7e8d57b.doTaskOne(<generated>)
    at com.didispace.chapter78.Chapter78ApplicationTests.test2(Chapter78ApplicationTests.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
    at java.util.ArrayList.forEach(ArrayList.java:1255)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
    at java.util.ArrayList.forEach(ArrayList.java:1255)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@64968732 rejected from java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:321)
    ... 74 more

从异样信息 org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task: 中,能够很明确的晓得,第 5 个工作因为超过了执行线程 + 缓冲队列长度,而被回绝了。

所有,默认状况下,线程池的回绝策略是:当线程池队列满了,会抛弃这个工作,并抛出异样。

配置回绝策略

尽管线程池有默认的回绝策略,但理论开发过程中,有些业务场景,间接回绝的策略往往并不实用,有时候咱们可能会抉择舍弃最早开始执行而未实现的工作、也可能会抉择舍弃刚开始执行而未实现的工作等更贴近业务须要的策略。所以,为线程池配置其余回绝策略或自定义回绝策略是很常见的需要,那么这个要怎么实现呢?

上面就来具体说说明天的正题,如何为线程池配置回绝策略、如何自定义回绝策略。

看上面这段代码的最初一行,setRejectedExecutionHandler办法就是为线程池设置回绝策略的办法:

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

//... 其余线程池配置

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

ThreadPoolExecutor 中提供了 4 种线程的策略能够供开发者间接应用,你只须要像上面这样设置即可:

// AbortPolicy 策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

// DiscardPolicy 策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

// DiscardOldestPolicy 策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());

// CallerRunsPolicy 策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

这四个策略对应的含意别离是:

  • AbortPolicy 策略:默认策略,如果线程池队列满了丢掉这个工作并且抛出 RejectedExecutionException 异样。
  • DiscardPolicy 策略:如果线程池队列满了,会间接丢掉这个工作并且不会有任何异样。
  • DiscardOldestPolicy 策略:如果队列满了,会将最早进入队列的工作删掉腾出空间,再尝试退出队列。
  • CallerRunsPolicy 策略:如果增加到线程池失败,那么主线程会本人去执行该工作,不会期待线程池中的线程去执行。

而如果你要自定义一个回绝策略,那么能够这样写:

executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {// 回绝策略的逻辑}
});

当然如果你喜爱用 Lamba 表达式,也能够这样写:

executor.setRejectedExecutionHandler((r, executor1) -> {// 回绝策略的逻辑});

好了,明天的学习就到这里!

如果您学习过程中如遇艰难?能够退出咱们超高品质的 Spring 技术交换群,参加交换与探讨,更好的学习与提高!更多 Spring Boot 教程能够点击中转!,欢送珍藏与转发反对!

代码示例

本文的残缺工程能够查看上面仓库中 2.x 目录下的 chapter7-8 工程:

  • Github:https://github.com/dyc87112/SpringBoot-Learning/
  • Gitee:https://gitee.com/didispace/SpringBoot-Learning/

如果您感觉本文不错,欢送 Star 反对,您的关注是我保持的能源!

欢送关注我的公众号:程序猿 DD,分享其余中央看不到的常识与思考

正文完
 0