前言

大家好,明天开始给大家分享 — Dubbo 专题之 Dubbo 线程池模型。在后面上个章节中咱们探讨了 Dubbo SPI,理解了 Dubbo SPI 其本质是从 JDK 规范的 SPI (Service Provider Interface) 扩大点发现机制增强而来,同时解决了 Java 中 SPI 的一些缺点。以及咱们应用 Dubbo SPI 实现自定义能力的拓展。那本章节咱们要探讨的 Dubbo 线程模型也是基于 SPI 实现,那什么是线程模型呢?以及其在咱们的我的项目中有什么作用呢?那么咱们在本章节中进行探讨。上面就让咱们疾速开始吧!

1. 线程模型简介

小伙伴如果对 Servlet 相熟就晓得,从 Servlet 3.x 开始反对异步非阻塞模式。至于什么异步非阻塞后面我在后面的章节中有探讨小伙伴能够自行学习之前的文章。咱们通过一个拜访Web利用流程图简略阐明:

在下面的流程图中咱们能够看到第一个申请发动同步 Web 调用,而后 Web 再发动对第三方服务的调用,整个过程全链路是同步调用。第二个申请同样也是发动同步调用,然而在发动第三方调用的时候切换了线程(基于 Servlet 3.x 咱们不须要手动的创立线程来切换)。这么做的益处在于咱们能够用专门解决线程池去做业务解决或第三方服务的调用。那什么状况下咱们须要切换线程不应用主线程呢?如果事件处理的逻辑能迅速实现,并且不会发动新的 IO 申请,比方只是在内存中记个标识,则间接在 IO 线程上解决更快,因为缩小了线程池调度。但如果事件处理逻辑较慢,或者须要发动新的 IO 申请,比方须要查询数据库或其它服务调用时,则必须派发到线程池,否则 IO 线程阻塞,将导致不能接管其它申请。

2. 应用形式

那在 Dubbo 中给咱们提供了通过不同的派发策略和不同的线程池配置的组合来应答不同的场景。配置形式如下:

<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />

上面咱们简略形容下dispatcherthreadpool的参数阐明:

  1. Dispatcher
  • all 所有音讯都派发到线程池,包含申请,响应,连贯事件,断开事件,心跳等。(默认)
  • direct 所有音讯都不派发到线程池,全副在 IO 线程上间接执行。
  • message 只有申请响应音讯派发到线程池,其它连贯断开事件,心跳等音讯,间接在 IO 线程上执行。
  • execution 只有申请音讯派发到线程池,不含响应,响应和其它连贯断开事件,心跳等音讯,间接在 IO 线程上执行。
  • connection 在 IO 线程上,将连贯断开事件放入队列,有序一一执行,其它音讯派发到线程池。
  1. ThreadPool
  • fixed 固定大小线程池,启动时建设线程,不敞开,始终持有。(默认)
  • cached 缓存线程池,闲暇一分钟主动删除,须要时重建。
  • limited 可伸缩线程池,但池中的线程数只会增长不会膨胀。只增长不膨胀的目标是为了防止膨胀时忽然来了大流量引起的性能问题。
  • eager 优先创立Worker线程池。在工作数量大于corePoolSize然而小于maximumPoolSize时,优先创立Worker来解决工作。当工作数量大于maximumPoolSize时,将工作放入阻塞队列中。阻塞队列充斥时抛出RejectedExecutionException。(相比于cached:cached在工作数量超过maximumPoolSize时间接抛出异样而不是将工作放入阻塞队列)

3. 应用场景

通过后面的介绍咱们应该明确咱们为什么须要切换线程,遵循一个很简略的准则:如果咱们解决的工作须要操作新的 IO 或者解决工作须要很长的工夫那么咱们就能够把这部分工作放到咱们的工作线程池去解决。那么咱们简略的总结下在工作常遇到的场景:

  1. 计算型服务:在我之前的工作中遇到这样的一个需要:咱们的车机实时上报数据给服务器,服务器记录数据并且实时计算和纠正导航数据。那么这里咱们须要一个计算型的微服务,次要的工作就是计算和修改实时数据,那么这个服务就是典型的计算型服务,所有咱们计算过程中尽量减少线程的切换并尽可能的在一个线程内进行计算。这样缩小线程切换的开销提供计算速度。
  2. 网关服务:首先咱们须要理解什么是网关,简略的了解就是所有的服务入口,对每个服务的调用必须通过网关转发到对应服务上(相似 Nginx )。那这里网关次要工作就是服务转发(鉴权、限流等等),能够了解为发动申请。很显著发动申请就是开启新的 IO 所有咱们能够切换到线程池去解决。

4. 示例演示

上面咱们通过以获取图书列表为例进行演示。以下是我的项目的结构图:

因为这里咱们次要是对服务提供端的配置,所有咱们次要看dubbo-provider-xml.xml配置内容:

<?xml version="1.0" encoding="UTF-8"?><beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"       xmlns="http://www.springframework.org/schema/beans"       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">    <!-- 指定散发策略为:all 线程池:fixed 固定大小为:100 -->    <dubbo:protocol port="20880" name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />    <dubbo:application name="demo-provider" metadata-type="remote"/>    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>    <bean id="bookFacade" class="com.muke.dubbocourse.test.provider.BookFacadeImpl"/>    <!--裸露服务为Dubbo服务-->    <dubbo:service interface="com.muke.dubbocourse.common.api.BookFacade" ref="bookFacade" /></beans>

下面的 XML 配置中dispatcher="all"指定事件的散发策略、threadpool="fixed" threads="100"指定线程池固定大小为100

5. 原理剖析

这里散发策略和线程池采纳 Dubbo 中的 SPI 形式加载的小伙伴能够参考后面的 《Dubbo SPI》章节进行理解。上面咱们进入主题,首先看看在 Dubbo 中为咱们提供的5种事件散发策略:

咱们这里简略的剖析 all散发策略其它的都是相似的小伙伴自行查阅源码剖析。上面咱们看看org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler外围源码:

/*** *@className AllChannelHandler *        *@description 所有解决散发到线程池去解决 *        *@author <a href="http://youngitman.tech">青年IT男</a> *        *@date 12:50 2020-03-05 *        *@JunitTest: {@link  }      * *@version v1.0.0 *       **/public class AllChannelHandler extends WrappedChannelHandler {    public AllChannelHandler(ChannelHandler handler, URL url) {        super(handler, url);    }    /**     *     * 近程连贯事件回调     *     * @author liyong      * @date 1:34 PM 2020/12/6      * @param channel      * @exception      * @return void      **/    @Override    public void connected(Channel channel) throws RemotingException {        ExecutorService executor = getExecutorService();        try {            //连贯到近程事件放入线程池执行            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));        } catch (Throwable t) {            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);        }    }    /**     *     * 端口近程连贯     *     * @author liyong      * @date 1:34 PM 2020/12/6      * @param channel      * @exception      * @return void      **/    @Override    public void disconnected(Channel channel) throws RemotingException {        ExecutorService executor = getExecutorService();        try {            //断开连接处理事件放入线程池执行            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));        } catch (Throwable t) {            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);        }    }    /**     *     * 接管到数据回调     *     * @author liyong      * @date 1:34 PM 2020/12/6      * @param channel      * @param message      * @exception      * @return void      **/    @Override    public void received(Channel channel, Object message) throws RemotingException {        ExecutorService executor = getPreferredExecutorService(message);        try {            //接管到数据放入线程池解决            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));        } catch (Throwable t) {            if(message instanceof Request && t instanceof RejectedExecutionException){                sendFeedback(channel, (Request) message, t);                return;            }            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);        }    }    /**     *     * 产生异样回调     *     * @author liyong      * @date 1:35 PM 2020/12/6      * @param channel      * @param exception      * @exception      * @return void      **/    @Override    public void caught(Channel channel, Throwable exception) throws RemotingException {        ExecutorService executor = getExecutorService();        try {            //产生异样放入线程池解决            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));        } catch (Throwable t) {            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);        }    }}

从下面的代码正文中能够看到 all 这种解决策略就是所有音讯都派发到线程池,包含申请、响应、连贯事件、断开事件、心跳等。

接下来咱们看看线程池的解决策略次要反对4种:

咱们以fixed策略进行剖析。咱们看到org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool外围源码:

/** * 创立固定大小线程池 * * @see java.util.concurrent.Executors#newFixedThreadPool(int) */public class FixedThreadPool implements ThreadPool {    @Override    public Executor getExecutor(URL url) {        //线程池名称        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);        //线程池大小        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);        //队列大小        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,                //如果队列大小为0应用同步队列                queues == 0 ? new SynchronousQueue<Runnable>() :                        //否则应用指定大小到阻塞队列                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()                                : new LinkedBlockingQueue<Runnable>(queues)),                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));    }}

下面的源码中应用指定大小的队列创立线程池,如果队列大小为0应用同步队列。

6. 小结

在本大节中咱们次要学习了 Dubbo 中的线程池模型,在 Dubbo 中为咱们提供了两种策略调整线程池模型别离是:DispatcherThreadPool。其中Dispatcher提供了5种策略:alldirectmessageexecutionconnectionThreadPool提供了4种策略:fixedcachedlimitedeager。同时咱们别离从源码中学习了底层的实现逻辑。

本节课程的重点如下:

  1. 了解 Dubbo 中线程模型
  2. 理解什么是 Dispatcher模式
  3. 理解什么是 ThreadPool模式
  4. 理解线程模型实现原理

写在最初

本大节是 Dubbo 入门到精通系列 (《从零开始学习Dubbo》、《Dubbo高阶利用》、《Dubbo源码剖析》) 中 《从零开始学习Dubbo》根底课程最初一大节,感激大家长期的反对。因为自己工夫精力有限前面课程的相干专题更新可能比拟迟缓请多多蕴含,再次感激小伙伴的关注。如果想取得最新的专题分享请关注我的微信公众号。

作者

集体从事金融行业,就任过易极付、思建科技、某网约车平台等重庆一流技术团队,目前就任于某银行负责对立领取零碎建设。本身对金融行业有强烈的喜好。同时也实际大数据、数据存储、自动化集成和部署、散布式微服务、响应式编程、人工智能等畛域。同时也热衷于技术分享创建公众号和博客站点对常识体系进行分享。关注公众号:青年IT男 获取最新技术文章推送!

博客地址: http://youngitman.tech

微信公众号: