关于java:ThreadPoolExecutor八种拒绝策略对的不是4种

4次阅读

共计 7605 个字符,预计需要花费 20 分钟才能阅读完成。

送大家以下 java 学习材料,文末有支付形式

前言

=====

谈到 Java 的线程池最相熟的莫过于 ExecutorService 接口了,jdk1.5 新增的 java.util.concurrent 包下的这个 api,大大的简化了多线程代码的开发。而不管你用 FixedThreadPool 还是 CachedThreadPool 其背地实现都是 ThreadPoolExecutor。

ThreadPoolExecutor 是一个典型的缓存池化设计的产物,因为池子有大小,当池子体积不够承载时,就波及到回绝策略。JDK 中曾经预设了 4 种线程池回绝策略,上面联合场景具体聊聊这些策略的应用场景,以及咱们还能扩大哪些回绝策略。

池化设计思维

池话设计应该不是一个新名词。咱们常见的如 Java 线程池、JDBC 连接池、Redis 连接池等就是这类设计的代表实现。这种设计会初始预设资源,解决的问题就是对消每次获取资源的耗费,如创立线程的开销,获取近程连贯的开销等。

就好比你去食堂打饭,打饭的大妈会先把饭盛好几份放那里,你来了就间接拿着饭盒加菜即可,不必再长期又盛饭又打菜,效率就高了。

除了初始化资源,池化设计还包含如下这些特色:池子的初始值、池子的沉闷值、池子的最大值等,这些特色能够间接映射到 Java 线程池和数据库连接池的成员属性中。

线程池触发回绝策略的机会

和数据源连接池不一样,线程池除了初始大小和池子最大值,还多了一个阻塞队列来缓冲。数据源连接池个别申请的连接数超过连接池的最大值的时候就会触发回绝策略,策略个别是阻塞期待设置的工夫或者间接抛异样。而线程池的触发机会如下图:

如图,想要理解线程池什么时候触发回绝粗略,须要明确下面三个参数的具体含意,是这三个参数总体协调的后果,而不是简略的超过最大线程数就会触发线程回绝粗略,当提交的工作数大于 corePoolSize 时,会优先放到队列缓冲区,只有填满了缓冲区后,才会判断以后运行的工作是否大于 maxPoolSize,小于时会新建线程解决。大于时就触发了回绝策略,总结就是:以后提交工作数大于(maxPoolSize + queueCapacity)时就会触发线程池的回绝策略了。

JDK 内置 4 种线程池回绝策略

回绝策略接口定义

在剖析 JDK 自带的线程池回绝策略前,先看下 JDK 定义的 回绝策略接口,如下:

1public interface RejectedExecutionHandler {2    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
3}

接口定义很明确,当触发回绝策略时,线程池会调用你设置的具体的策略,将以后提交的工作以及线程池实例自身传递给你解决,具体作何解决,不同场景会有不同的思考,上面看 JDK 为咱们内置了哪些实现:

CallerRunsPolicy(调用者运行策略)

 1    public static class CallerRunsPolicy implements RejectedExecutionHandler {
 2
 3        public CallerRunsPolicy() {}
 4
 5        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {6            if (!e.isShutdown()) {7                r.run();
 8            }
 9        }
10    }

性能:当触发回绝策略时,只有线程池没有敞开,就由提交工作的以后线程解决。

应用场景:个别在不容许失败的、对性能要求不高、并发量较小的场景下应用,因为线程池个别状况下不会敞开,也就是提交的工作肯定会被运行,然而因为是调用者线程本人执行的,当屡次提交工作时,就会阻塞后续工作执行,性能和效率天然就慢了。

AbortPolicy(停止策略)

 1    public static class AbortPolicy implements RejectedExecutionHandler {
 2
 3        public AbortPolicy() {}
 4
 5        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {6            throw new RejectedExecutionException("Task" + r.toString() +
 7                                                 "rejected from" +
 8                                                 e.toString());
 9        }
10    }

性能:当触发回绝策略时,间接抛出拒绝执行的异样,停止策略的意思也就是打断以后执行流程

应用场景:这个就没有非凡的场景了,然而一点要正确处理抛出的异样。ThreadPoolExecutor 中默认的策略就是 AbortPolicy,ExecutorService 接口的系列 ThreadPoolExecutor 因为都没有显示的设置回绝策略,所以默认的都是这个。然而请留神,ExecutorService 中的线程池实例队列都是无界的,也就是说把内存撑爆了都不会触发回绝策略。当本人自定义线程池实例时,应用这个策略肯定要解决好触发策略时抛的异样,因为他会打断以后的执行流程。

DiscardPolicy(抛弃策略)

1    public static class DiscardPolicy implements RejectedExecutionHandler {
2
3        public DiscardPolicy() {}
4
5        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {6}
7    }

性能:间接静悄悄的抛弃这个工作,不触发任何动作

应用场景:如果你提交的工作无关紧要,你就能够应用它。因为它就是个空实现,会悄无声息的吞噬你的的工作。所以这个策略基本上不必了

DiscardOldestPolicy(弃老策略)

 1    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
 2
 3        public DiscardOldestPolicy() {}
 4
 5        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {6            if (!e.isShutdown()) {7                e.getQueue().poll();
 8                e.execute(r);
 9            }
10        }
11    }

性能:如果线程池未敞开,就弹出队列头部的元素,而后尝试执行

应用场景:这个策略还是会抛弃工作,抛弃时也是毫无声息,然而特点是抛弃的是老的未执行的工作,而且是待执行优先级较高的工作。基于这个个性,我能想到的场景就是,公布音讯,和批改音讯,当音讯公布进来后,还未执行,此时更新的音讯又来了,这个时候未执行的音讯的版本比当初提交的音讯版本要低就能够被抛弃了。因为队列中还有可能存在音讯版本更低的音讯会排队执行,所以在真正解决音讯的时候肯定要做好音讯的版本比拟

第三方实现的回绝策略

Dubbo 中的线程回绝策略

 1public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
 2
 3    protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
 4
 5    private final String threadName;
 6
 7    private final URL url;
 8
 9    private static volatile long lastPrintTime = 0;
10
11    private static Semaphore guard = new Semaphore(1);
12
13    public AbortPolicyWithReport(String threadName, URL url) {
14        this.threadName = threadName;
15        this.url = url;
16    }
17
18    @Override
19    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
20        String msg = String.format("Thread pool is EXHAUSTED!" +
21                        "Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
22                        "Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
23                threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
24                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
25                url.getProtocol(), url.getIp(), url.getPort());
26        logger.warn(msg);
27        dumpJStack();
28        throw new RejectedExecutionException(msg);
29    }
30
31    private void dumpJStack() {
32       // 省略实现
33    }
34}

能够看到,当 dubbo 的工作线程触发了线程回绝后,次要做了三个事件,准则就是尽量让使用者分明触发线程回绝策略的实在起因

  • 输入了一条正告级别的日志,日志内容为线程池的具体设置参数,以及线程池以后的状态,还有以后回绝工作的一些详细信息。能够说,这条日志,应用 dubbo 的有过生产运维教训的或多或少是见过的,这个日志几乎就是日志打印的榜样,其余的日志打印的榜样还有 spring。得益于这么具体的日志,能够很容易定位到问题所在
  • 输入以后线程堆栈详情,这个太有用了,当你通过下面的日志信息还不能定位问题时,案发现场的 dump 线程上下文信息就是你发现问题的救命稻草,这个能够参考《dubbo 线程池耗尽事件 -“CyclicBarrier 惹的祸 ”》
  • 持续抛出拒绝执行异样,使本次工作失败,这个继承了 JDK 默认回绝策略的个性

Netty 中的线程池回绝策略

 1    private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {2        NewThreadRunsPolicy() {3            super();
 4        }
 5
 6        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
 7            try {8                final Thread t = new Thread(r, "Temporary task executor");
 9                t.start();
10            } catch (Throwable e) {
11                throw new RejectedExecutionException(12                        "Failed to start a new thread", e);
13            }
14        }
15    }

Netty 中的实现很像 JDK 中的 CallerRunsPolicy,舍不得抛弃工作。不同的是,CallerRunsPolicy 是间接在调用者线程执行的工作。而 Netty 是新建了一个线程来解决的。所以,Netty 的实现相较于调用者执行策略的应用面就能够扩大到反对高效率高性能的场景了。然而也要留神一点,Netty 的实现里,在创立线程时未做任何的判断束缚,也就是说只有零碎还有资源就会创立新的线程来解决,直到 new 不出新的线程了,才会抛创立线程失败的异样

ActiveMQ 中的线程池回绝策略

 1 new RejectedExecutionHandler() {
 2                @Override
 3                public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
 4                    try {5                        executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
 6                    } catch (InterruptedException e) {7                        throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
 8                    }
 9
10                    throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
11                }
12            });

activeMq 中的策略属于最大致力执行工作型,当触发回绝策略时,在尝试一分钟的工夫从新将工作塞进工作队列,当一分钟超时还没胜利时,就抛出异样

PinPoint 中的线程池回绝策略

 1public class RejectedExecutionHandlerChain implements RejectedExecutionHandler {2    private final RejectedExecutionHandler[] handlerChain;
 3
 4    public static RejectedExecutionHandler build(List<RejectedExecutionHandler> chain) {5        Objects.requireNonNull(chain, "handlerChain must not be null");
 6        RejectedExecutionHandler[] handlerChain = chain.toArray(new RejectedExecutionHandler[0]);
 7        return new RejectedExecutionHandlerChain(handlerChain);
 8    }
 9
10    private RejectedExecutionHandlerChain(RejectedExecutionHandler[] handlerChain) {11        this.handlerChain = Objects.requireNonNull(handlerChain, "handlerChain must not be null");
12    }
13
14    @Override
15    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {16        for (RejectedExecutionHandler rejectedExecutionHandler : handlerChain) {17            rejectedExecutionHandler.rejectedExecution(r, executor);
18        }
19    }
20}

pinpoint 的回绝策略实现很有特点,和其余的实现都不同。他定义了一个回绝策略链,包装了一个回绝策略列表,当触发回绝策略时,会将策略链中的 rejectedExecution 顺次执行一遍。

结语

前文从线程池设计思维,以及线程池触发回绝策略的机会引出 java 线程池回绝策略接口的定义。并辅以 JDK 内置 4 种以及四个第三方开源软件的回绝策略定义形容了线程池回绝策略实现的各种思路和应用场景。心愿浏览此文后能让你对 java 线程池回绝策略有更加粗浅的意识,可能依据不同的应用场景更加灵便的利用。

正文完
 0