明天咱们来聊一个比拟实用的话题,动静可监控可观测的线程池实际。

这是个全新的开源我的项目,作者提供了一种十分好的思路解决了线程池的可观测问题。

这个开源我的项目叫:DynamicTp

地址在文章开端。


写在后面

略微有些Java编程教训的小伙伴都晓得,Java的精华在juc包,这是赫赫有名的Doug Lea老爷子的杰作,评估一个程序员Java程度怎么样,肯定水平上看他对juc包下的一些技术把握的怎么样,这也是面试中的基本上必问的一些技术点之一。

juc包次要包含:

1.原子类(AtomicXXX)

2.锁类(XXXLock)

3.线程同步类(AQS、CountDownLatch、CyclicBarrier、Semaphore、Exchanger)

4.工作执行器类(Executor体系类,包含明天的配角ThreadPoolExecutor)

5.并发汇合类(ConcurrentXXX、CopyOnWriteXXX)相干汇合类

6.阻塞队列类(BlockingQueue继承体系类)

7.Future相干类

8.其余一些辅助工具类

多线程编程场景下,这些类都是必备技能,会这些能够帮忙咱们写出高质量、高性能、少bug的代码,同时这些也是Java中比拟难啃的一些技术,须要坚持不懈,学以致用,在应用中感触他们带来的奥秘。

上边简略列举了下juc包下性能分类,这篇文章咱们次要来介绍动静可监控线程池的,所以具体内容也就不开展讲了,当前有工夫独自来聊吧。看这篇文章前,心愿读者最好有肯定的线程池ThreadPoolExecutor应用教训,不然看起来会有点懵。

如果你对ThreadPoolExecutor不是很相熟,举荐浏览上面两篇文章

javadoop: https://www.javadoop.com/post/java-thread-pool

美团技术博客: https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html


背景

应用ThreadPoolExecutor过程中你是否有以下痛点呢?

1.代码中创立了一个ThreadPoolExecutor,然而不晓得那几个外围参数设置多少比拟适合

2.凭教训设置参数值,上线后发现须要调整,改代码重启服务,十分麻烦

3.线程池绝对开发人员来说是个黑盒,运行状况不能感知到,直到呈现问题

如果你有以上痛点,这篇文章要介绍的动静可监控线程池(DynamicTp)或者能帮忙到你。

如果看过ThreadPoolExecutor的源码,大略能够晓得其实它有提供一些set办法,能够在运行时动静去批改相应的值,这些办法有:

public void setCorePoolSize(int corePoolSize);public void setMaximumPoolSize(int maximumPoolSize);public void setKeepAliveTime(long time, TimeUnit unit);public void setThreadFactory(ThreadFactory threadFactory);public void setRejectedExecutionHandler(RejectedExecutionHandler handler);

当初大多数的互联网我的项目其实都会微服务化部署,有一套本人的服务治理体系,微服务组件中的分布式配置核心表演的就是动静批改配置,实时失效的角色。那么咱们是否能够联合配置核心来做运行时线程池参数的动静调整呢?答案是必定的,而且配置核心绝对都是高可用的,应用它也不必过于放心配置推送呈现问题这类事儿,而且也能缩小研发动静线程池组件的难度和工作量。

综上,咱们总结出以下的背景

  • 广泛性:在Java开发中,想要进步零碎性能,线程池曾经是一个90%以上的人都会抉择应用的根底工具
  • 不确定性:我的项目中可能会创立很多线程池,既有IO密集型的,也有CPU密集型的,但线程池的参数并不好确定;须要有套机制在运行过程中动静去调整参数
  • 无感知性,线程池运行过程中的各项指标个别感知不到;须要有套监控报警机制在事先、事中就能让开发人员感知到线程池的运行状况,及时处理
  • 高可用性,配置变更须要及时推送到客户端;须要有高可用的配置管理推送服务,配置核心是当初大多数互联网零碎都会应用的组件,与之联合能够大幅度缩小开发量及接入难度

简介

咱们基于配置核心对线程池ThreadPoolExecutor做一些扩大,实现对运行中线程池参数的动静批改,实时失效;以及实时监控线程池的运行状态,触发设置的报警策略时报警,报警信息会推送办公平台(钉钉、企微等)。报警维度包含(队列容量、线程池活性、回绝触发等);同时也会定时采集线程池指标数据供监控平台可视化应用。使咱们能时刻感知到线程池的负载,依据状况及时调整,避免出现问题影响线上业务。

    |  __ \                            (_) |__   __|    | |  | |_   _ _ __   __ _ _ __ ___  _  ___| |_ __      | |  | | | | | '_ \ / _` | '_ ` _ | |/ __| | '_ \     | |__| | |_| | | | | (_| | | | | | | | (__| | |_) |    |_____/ __, |_| |_|__,_|_| |_| |_|_|___|_| .__/              __/ |                              | |                |___/                               |_|         :: Dynamic Thread Pool :: 

个性

  • 参考美团线程池实际 ,对线程池参数动态化治理,减少监控、报警性能
  • 基于Spring框架,现只反对SpringBoot我的项目应用,轻量级,引入starter即可食用
  • 基于配置核心实现线程池参数动静调整,实时失效;集成支流配置核心,默认反对Nacos、Apollo,同时也提供SPI接口可自定义扩大实现
  • 内置告诉报警性能,提供多种报警维度(配置变更告诉、活性报警、容量阈值报警、回绝策略触发报警),默认反对企业微信、钉钉报警,同时提供SPI接口可自定义扩大实现
  • 内置线程池指标采集性能,反对通过MicroMeter、JsonLog日志输入、Endpoint三种形式,可通过SPI接口自定义扩大实现

架构设计

次要分四大模块

  • 配置变更监听模块:

    1.监听特定配置核心的指定配置文件(默认实现Nacos、Apollo),可通过外部提供的SPI接口扩大其余实现

    2.解析配置文件内容,内置实现yml、properties配置文件的解析,可通过外部提供的SPI接口扩大其余实现

    3.告诉线程池治理模块实现刷新

  • 线程池治理模块:

    1.服务启动时从配置核心拉取配置信息,生成线程池实例注册到外部线程池注册核心中

    2.监听模块监听到配置变更时,将变更信息传递给治理模块,实现线程池参数的刷新

    3.代码中通过getExecutor()办法依据线程池名称来获取线程池对象实例

  • 监控模块:

    实现监控指标采集以及输入,默认提供以下三种形式,也可通过外部提供的SPI接口扩大其余实现

    1.默认实现Json log输入到磁盘

    2.MicroMeter采集,引入MicroMeter相干依赖

    3.暴雷Endpoint端点,可通过http形式拜访

  • 告诉告警模块:

    对接办公平台,实现通告告警性能,默认实现钉钉、企微,可通过外部提供的SPI接口扩大其余实现,告诉告警类型如下

    1.线程池参数变更告诉

    2.阻塞队列容量达到设置阈值告警

    3.线程池活性达到设置阈值告警

    4.触发回绝策略告警


应用

  • maven依赖

    <dependency>     <groupId>io.github.lyh200</groupId>     <artifactId>dynamic-tp-spring-cloud-starter</artifactId>     <version>1.0.2-RELEASE</version></dependency>
  • 线程池配置

    spring:  dynamic:    tp:      enabled: true      enabledBanner: true        # 是否开启banner打印,默认true      enabledCollect: false      # 是否开启监控指标采集,默认false      collectorType: logging     # 监控数据采集器类型(JsonLog | MicroMeter),默认logging      logPath: /home/logs        # 监控日志数据门路,默认${user.home}/logs      monitorInterval: 5         # 监控工夫距离(报警判断、指标采集),默认5s      nacos:                     # nacos配置,不配置有默认值(规定name-dev.yml这样)        dataId: dynamic-tp-demo-dev.yml        group: DEFAULT_GROUP      apollo:                    # apollo配置,不配置默认拿apollo配置第一个namespace        namespace: dynamic-tp-demo-dev.yml      configType: yml            # 配置文件类型      platforms:                 # 告诉报警平台配置        - platform: wechat          urlKey: 3a7500-1287-4bd-a798-c5c3d8b69c  # 替换          receivers: test1,test2                   # 承受人企微名称        - platform: ding          urlKey: f80dad441fcd655438f4a08dcd6a     # 替换          secret: SECb5441fa6f375d5b9d21           # 替换,非sign模式能够没有此值          receivers: 15810119805                   # 钉钉账号手机号                executors:                                   # 动静线程池配置        - threadPoolName: dynamic-tp-test-1          corePoolSize: 6          maximumPoolSize: 8          queueCapacity: 200          queueType: VariableLinkedBlockingQueue   # 工作队列,查看源码QueueTypeEnum枚举类          rejectedHandlerType: CallerRunsPolicy    # 回绝策略,查看RejectedTypeEnum枚举类          keepAliveTime: 50          allowCoreThreadTimeOut: false          threadNamePrefix: test           # 线程名前缀          notifyItems:                     # 报警项,不配置主动会配置(变更告诉、容量报警、活性报警、回绝报警)            - type: capacity               # 报警项类型,查看源码 NotifyTypeEnum枚举类              enabled: true              threshold: 80                # 报警阈值              platforms: [ding,wechat]     # 可选配置,不配置默认拿下层platforms配置的所以平台              interval: 120                # 报警距离(单位:s)            - type: change              enabled: true            - type: liveness              enabled: true              threshold: 80            - type: reject              enabled: true              threshold: 1
  • 代码形式生成,服务启动会主动注册

    @Configurationpublic class DtpConfig {   @Bean   public DtpExecutor demo1Executor() {       return DtpCreator.createDynamicFast("demo1-executor");  }   @Bean   public ThreadPoolExecutor demo2Executor() {       return ThreadPoolBuilder.newBuilder()              .threadPoolName("demo2-executor")              .corePoolSize(8)              .maximumPoolSize(16)              .keepAliveTime(50)              .allowCoreThreadTimeOut(true)              .workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false)              .rejectedExecutionHandler(RejectedTypeEnum.CALLER_RUNS_POLICY.getName())              .buildDynamic();  }}
  • 代码调用,依据线程池名称获取

    public static void main(String[] args) {       DtpExecutor dtpExecutor = DtpRegistry.getExecutor("dynamic-tp-test-1");       dtpExecutor.execute(() -> System.out.println("test"));}

注意事项

  1. 配置文件配置的参数会笼罩通过代码生成形式配置的参数
  2. 阻塞队列只有VariableLinkedBlockingQueue类型能够批改capacity,该类型性能和LinkedBlockingQueue类似,只是capacity不是final类型,能够批改,
    VariableLinkedBlockingQueue参考RabbitMq的实现
  3. 启动看到如下日志输入证实接入胜利

    |  __ \                            (_) |__   __|   | |  | |_   _ _ __   __ _ _ __ ___  _  ___| |_ __  | |  | | | | | '_ \ / _` | '_ ` _ | |/ __| | '_ \ | |__| | |_| | | | | (_| | | | | | | | (__| | |_) ||_____/ __, |_| |_|__,_|_| |_| |_|_|___|_| .__/          __/ |                              | |            |___/                               |_|     :: Dynamic Thread Pool :: DynamicTp register, executor: DtpMainPropWrapper(dtpName=dynamic-tp-test-1, corePoolSize=6, maxPoolSize=8, keepAliveTime=50, queueType=VariableLinkedBlockingQueue, queueCapacity=200, rejectType=RejectedCountableCallerRunsPolicy, allowCoreThreadTimeOut=false)
  4. 配置变更会推送告诉音讯,且会高亮变更的字段

    DynamicTp [dynamic-tp-test-1] refresh end, changed keys: [corePoolSize, queueCapacity], corePoolSize: [6 => 4], maxPoolSize: [8 => 8], queueType: [VariableLinkedBlockingQueue => VariableLinkedBlockingQueue], queueCapacity: [200 => 2000], keepAliveTime: [50s => 50s], rejectedType: [CallerRunsPolicy => CallerRunsPolicy], allowsCoreThreadTimeOut: [false => false]

告诉报警

触发报警阈值会推送相应报警音讯(活性、容量、回绝),且会高亮显示相应字段

配置变更会推送告诉音讯,且会高亮变更的字段


监控日志

通过collectType属性配置监控指标采集类型,默认 logging

  • MicroMeter:通过引入相干MicroMeter依赖采集到相应的平台
    (如Prometheus,InfluxDb...)
  • Logging:定时采集指标数据以Json日志格局输入磁盘,地址${logPath}/dy
    namictp/${appName}.monitor.log

    2022-01-11 00:25:20.599 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":0,"queueSize":0,"largestPoolSize":0,"poolSize":0,"rejectHandlerName":"RejectedCountableCallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":0,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":0,"dtpName":"remoting-call","maximumPoolSize":8}2022-01-11 00:25:25.603 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":0,"queueSize":0,"largestPoolSize":0,"poolSize":0,"rejectHandlerName":"RejectedCountableCallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":0,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":0,"dtpName":"remoting-call","maximumPoolSize":8}2022-01-11 00:25:30.609 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":0,"queueSize":0,"largestPoolSize":0,"poolSize":0,"rejectHandlerName":"RejectedCountableCallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":0,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":0,"dtpName":"remoting-call","maximumPoolSize":8}2022-01-11 00:25:35.613 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":0,"queueSize":0,"largestPoolSize":0,"poolSize":0,"rejectHandlerName":"RejectedCountableCallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":0,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":0,"dtpName":"remoting-call","maximumPoolSize":8}2022-01-11 00:25:40.616 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":0,"queueSize":0,"largestPoolSize":0,"poolSize":0,"rejectHandlerName":"RejectedCountableCallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":0,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":0,"dtpName":"remoting-call","maximumPoolSize":8}
  • 裸露EndPoint端点(dynamic-tp),能够通过http形式申请

    [    {        "dtp_name": "remoting-call",        "core_pool_size": 6,        "maximum_pool_size": 12,        "queue_type": "SynchronousQueue",        "queue_capacity": 0,        "queue_size": 0,        "fair": false,        "queue_remaining_capacity": 0,        "active_count": 0,        "task_count": 21760,        "completed_task_count": 21760,        "largest_pool_size": 12,        "pool_size": 6,        "wait_task_count": 0,        "reject_count": 124662,        "reject_handler_name": "CallerRunsPolicy"    },    {        "max_memory": "228 MB",        "total_memory": "147 MB",        "free_memory": "44.07 MB",        "usable_memory": "125.07 MB"    }]

我的项目地址

gitee地址: https://gitee.com/yanhom/dynamic-tp-spring-cloud-starter

github地址:https://github.com/lyh200/dynamic-tp-spring-cloud-starter


分割作者

对我的项目有什么想法或者倡议,能够在上述地址中加到作者微信进行交换,或者创立issues,一起欠缺我的项目!

最初,反对的话还望大家去点个star哦。