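In the previous three posts on asynchronous tasks in Spring Boot, we learned how to create asynchronous tasks with @Async, how to configure a thread pool for them, and how to use multiple thread pools to isolate different asynchronous tasks. In today's post, we continue to refine and build on that knowledge.

If you have read those posts and are comfortable with the material, consider the following question:

Suppose the thread pool is configured with a core pool size of 2, a maximum pool size of 2, and a queue capacity of 2. If 5 asynchronous tasks are submitted at the same time, what happens?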

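Reproducing the scenario

Let's first turn the assumption above into code:

Step 1: Create a Spring Boot application and write the thread pool configuration according to the assumption above.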

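    import java.util.concurrent.Executor;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

    @EnableAsync
    @SpringBootApplication
    public class Chapter78Application {

        public static void main(String[] args) {
            SpringApplication.run(Chapter78Application.class, args);
        }

        @EnableAsync
        @Configuration
        class TaskPoolConfig {

            // The bean name "taskExecutor1" is what @Async("taskExecutor1") refers to.
            @Bean
            public Executor taskExecutor1() {
                ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
                executor.setCorePoolSize(2);      // core threads
                executor.setMaxPoolSize(2);       // maximum threads
                executor.setQueueCapacity(2);     // waiting queue capacity
                executor.setKeepAliveSeconds(60);
                executor.setThreadNamePrefix("executor-1-");
                return executor;
            }
        }
    }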

Step 2: Implement a task with the @Async annotation.

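    import java.util.Random;
    import java.util.concurrent.CompletableFuture;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Component;

    @Slf4j
    @Component
    public class AsyncTasks {

        public static Random random = new Random();

        // Runs on the "taskExecutor1" pool and sleeps for a random time of up to 10 seconds.
        @Async("taskExecutor1")
        public CompletableFuture<String> doTaskOne(String taskNo) throws Exception {
            log.info("Start task: {}", taskNo);
            long start = System.currentTimeMillis();
            Thread.sleep(random.nextInt(10000));
            long end = System.currentTimeMillis();
            log.info("Finished task: {}, elapsed: {} ms", taskNo, end - start);
            return CompletableFuture.completedFuture("Task finished");
        }
    }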

Step 3: Write a test case.

    import java.util.concurrent.CompletableFuture;

    import lombok.extern.slf4j.Slf4j;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;

    @Slf4j
    @SpringBootTest
    public class Chapter78ApplicationTests {

        @Autowired
        private AsyncTasks asyncTasks;

        @Test
        public void test2() throws Exception {
            // Thread pool config: core-2, max-2, queue=2. With 5 tasks at once, the following exception occurs:
            // org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@59901c4d[Running, pool size = 2,
            // active threads = 0, queued tasks = 2, completed tasks = 4]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@408e96d9
            long start = System.currentTimeMillis();

            // Thread pool 1
            CompletableFuture<String> task1 = asyncTasks.doTaskOne("1");
            CompletableFuture<String> task2 = asyncTasks.doTaskOne("2");
            CompletableFuture<String> task3 = asyncTasks.doTaskOne("3");
            CompletableFuture<String> task4 = asyncTasks.doTaskOne("4");
            CompletableFuture<String> task5 = asyncTasks.doTaskOne("5");

            // Wait for all of them to complete
            CompletableFuture.allOf(task1, task2, task3, task4, task5).join();

            long end = System.currentTimeMillis();
            log.info("All tasks finished, total elapsed: " + (end - start) + " ms");
        }
    }

Run it, and you will see log output similar to the following:

    2021-09-22 17:33:08.159  INFO 21119 --- [   executor-1-2] com.didispace.chapter78.AsyncTasks       : Start task: 2
    2021-09-22 17:33:08.159  INFO 21119 --- [   executor-1-1] com.didispace.chapter78.AsyncTasks       : Start task: 1

    org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@64968732
        at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:324)
        at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
        at java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
        at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:274)
        at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:129)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
        at com.didispace.chapter78.AsyncTasks$$EnhancerBySpringCGLIB$$c7e8d57b.doTaskOne(<generated>)
        at com.didispace.chapter78.Chapter78ApplicationTests.test2(Chapter78ApplicationTests.java:51)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
        at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
        at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
        at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
        at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
        at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
        at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
        at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
        at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
        at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
        at java.util.ArrayList.forEach(ArrayList.java:1255)
        at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
        at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
        at java.util.ArrayList.forEach(ArrayList.java:1255)
        at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
        at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
        at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
        at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
        at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
        at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
        at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
        at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
        at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
        at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
        at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
        at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
        at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
        at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
        at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
        at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
    Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@64968732 rejected from java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
        at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:321)
        ... 74 more

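The exception message org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task: makes the cause clear: the 5th task was rejected because it exceeded the number of executing threads (2) plus the queue capacity (2).

So, by default, the thread pool's rejection policy is: when all threads are busy and the queue is full, the new task is discarded and an exception is thrown.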
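The arithmetic behind this (2 threads + 2 queue slots = 4 accepted tasks) can be checked without Spring. The following is a minimal sketch that assumes a plain java.util.concurrent.ThreadPoolExecutor with a bounded LinkedBlockingQueue as a stand-in for the pool that ThreadPoolTaskExecutor manages; the class name PoolCapacityDemo and its helper methods are hypothetical and not part of the article's project.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: counts how many tasks a 2/2/2 pool accepts.
class PoolCapacityDemo {

    /** Submits taskCount blocking tasks and returns {accepted, rejected}. */
    static int[] submit(int taskCount) {
        // Same numbers as the article: core 2, max 2, queue capacity 2.
        // No handler is passed, so the default AbortPolicy applies.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2));
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        int rejected = 0;
        try {
            for (int i = 0; i < taskCount; i++) {
                try {
                    // Every task blocks until released, so no slot frees up early.
                    pool.execute(() -> awaitQuietly(release));
                    accepted++;
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
        return new int[] {accepted, rejected};
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] result = submit(5);
        // prints "accepted=4, rejected=1"
        System.out.println("accepted=" + result[0] + ", rejected=" + result[1]);
    }
}
```

The tasks block on a latch so the outcome does not depend on timing: two tasks occupy the threads, two wait in the queue, and the fifth has nowhere to go.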

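Configuring the rejection policy

Although the thread pool has a default rejection policy, in real-world development outright rejection is often not appropriate for the business scenario. Sometimes we would rather drop the task that has been waiting in the queue the longest, or quietly drop the newly submitted task, or apply some other policy that is closer to what the business needs. So configuring a different rejection policy, or writing a custom one, is a very common requirement. How do we do that?

Let's get to today's main topic: how to configure a rejection policy for a thread pool, and how to write a custom one.

Look at the last line of the code below. setRejectedExecutionHandler is the method that sets the thread pool's rejection policy: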

    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    // ... other thread pool settings
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

ThreadPoolExecutor provides four built-in rejection policies that developers can use directly. You only need to set one like this:

    // AbortPolicy
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

    // DiscardPolicy
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

    // DiscardOldestPolicy
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());

    // CallerRunsPolicy
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

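These four policies mean the following:

  • AbortPolicy: the default. If the pool cannot accept the task (all threads busy and the queue full), the task is dropped and a RejectedExecutionException is thrown (Spring wraps it in a TaskRejectedException, as in the log above).
  • DiscardPolicy: if the pool cannot accept the task, the task is dropped silently, with no exception.
  • DiscardOldestPolicy: if the queue is full, the task that entered the queue earliest (and has not started yet) is removed to make room, and the new task is submitted again.
  • CallerRunsPolicy: if the pool cannot accept the task, the thread that submitted it runs the task itself instead of waiting for a pool thread to do so.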
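To make the difference between the four built-in policies concrete, here is a minimal sketch that runs the same five tasks through the same 2/2/2 pool under each policy and reports which tasks ran. It assumes a plain java.util.concurrent.ThreadPoolExecutor rather than Spring's ThreadPoolTaskExecutor; the class name RejectionPolicyDemo and the "(caller)" marker are hypothetical, introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class comparing the four built-in rejection policies.
class RejectionPolicyDemo {

    /** Submits tasks 1..5 to a core 2 / max 2 / queue 2 pool and summarizes the result. */
    static String run(RejectedExecutionHandler handler) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), handler);
        CountDownLatch release = new CountDownLatch(1);
        List<String> ran = new CopyOnWriteArrayList<>();
        Thread caller = Thread.currentThread();
        boolean threw = false;
        for (int i = 1; i <= 5; i++) {
            final int taskNo = i;
            try {
                pool.execute(() -> {
                    // Mark tasks that end up running on the submitting thread.
                    ran.add(taskNo + (Thread.currentThread() == caller ? "(caller)" : ""));
                    if (taskNo <= 2) {
                        awaitQuietly(release); // tasks 1 and 2 keep both threads busy
                    }
                });
            } catch (RejectedExecutionException e) {
                threw = true;
            }
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<String> sorted = new ArrayList<>(ran);
        Collections.sort(sorted);
        return "ran=" + sorted + ", exception=" + threw;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("Abort:         " + run(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println("Discard:       " + run(new ThreadPoolExecutor.DiscardPolicy()));
        System.out.println("DiscardOldest: " + run(new ThreadPoolExecutor.DiscardOldestPolicy()));
        System.out.println("CallerRuns:    " + run(new ThreadPoolExecutor.CallerRunsPolicy()));
    }
}
```

Under these assumptions, AbortPolicy and DiscardPolicy both run tasks 1 to 4 and differ only in whether the caller sees an exception; DiscardOldestPolicy runs 1, 2, 4, and 5 because task 3 is evicted from the head of the queue; CallerRunsPolicy runs all five, with task 5 executed on the submitting thread.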

And if you want to define a custom rejection policy, you can write it like this:

    executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // rejection policy logic
        }
    });

Of course, if you prefer lambda expressions, you can also write it like this:

    executor.setRejectedExecutionHandler((r, executor1) -> {
        // rejection policy logic
    });
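What goes inside a custom policy depends entirely on the business. As one possible illustration, the sketch below parks rejected tasks in a side queue so they can be retried later instead of being lost. This is an assumed design, not something taken from the article's project: the names OverflowHandlerDemo, OverflowHandler, and overflow are hypothetical, and a plain ThreadPoolExecutor stands in for the Spring executor. Because the handler implements java.util.concurrent.RejectedExecutionHandler, an instance of it could be passed to setRejectedExecutionHandler in the same way as the built-in policies.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of a custom rejection policy.
class OverflowHandlerDemo {

    /** Assumed custom policy: keep rejected tasks in a side queue instead of dropping them. */
    static class OverflowHandler implements RejectedExecutionHandler {
        final Queue<Runnable> overflow = new ConcurrentLinkedQueue<>();

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            overflow.add(r);
        }
    }

    /** Returns {tasks parked by the handler, tasks completed in total}. */
    static int[] demo() {
        OverflowHandler handler = new OverflowHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), handler);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                awaitQuietly(release);
                completed.incrementAndGet();
            });
        }
        int parked = handler.overflow.size(); // the 5th task did not fit

        release.countDown();
        // Retry step: for simplicity, run the parked tasks on the calling thread.
        Runnable task;
        while ((task = handler.overflow.poll()) != null) {
            task.run();
        }

        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {parked, completed.get()};
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] result = demo();
        // prints "parked=1, completed=5"
        System.out.println("parked=" + result[0] + ", completed=" + result[1]);
    }
}
```

Unlike AbortPolicy, the caller sees no exception and no work is lost, at the cost of having to decide when and where the parked tasks are retried.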

That's all for today!


Code example

The complete project for this post is the chapter7-8 project under the 2.x directory of the repositories below:

  • GitHub: https://github.com/dyc87112/SpringBoot-Learning/
  • Gitee: https://gitee.com/didispace/SpringBoot-Learning/
