共计 9060 个字符,预计需要花费 23 分钟才能阅读完成。
明天咱们来聊一个比拟实用的话题,动静可监控可观测的线程池实际。
这是个全新的开源我的项目,作者提供了一种十分好的思路解决了线程池的可观测问题。
这个开源我的项目叫:DynamicTp
地址在文章开端。
写在后面
略微有些 Java 编程教训的小伙伴都晓得,Java 的精华在 juc 包,这是赫赫有名的 Doug Lea 老爷子的杰作,评估一个程序员 Java 程度怎么样,肯定水平上看他对 juc 包下的一些技术把握的怎么样,这也是面试中的基本上必问的一些技术点之一。
juc 包次要包含:
1. 原子类(AtomicXXX)
2. 锁类(XXXLock)
3. 线程同步类(AQS、CountDownLatch、CyclicBarrier、Semaphore、Exchanger)
4. 工作执行器类(Executor 体系类,包含明天的配角 ThreadPoolExecutor)
5. 并发汇合类(ConcurrentXXX、CopyOnWriteXXX)相干汇合类
6. 阻塞队列类(BlockingQueue 继承体系类)
7.Future 相干类
8. 其余一些辅助工具类
多线程编程场景下,这些类都是必备技能,会这些能够帮忙咱们写出高质量、高性能、少 bug 的代码,同时这些也是 Java 中比拟难啃的一些技术,须要坚持不懈,学以致用,在应用中感触他们带来的奥秘。
上边简略列举了下 juc 包下性能分类,这篇文章咱们次要来介绍动静可监控线程池的,所以具体内容也就不开展讲了,当前有工夫独自来聊吧。看这篇文章前,心愿读者最好有肯定的线程池 ThreadPoolExecutor 应用教训,不然看起来会有点懵。
如果你对 ThreadPoolExecutor 不是很相熟,举荐浏览上面两篇文章
javadoop: https://www.javadoop.com/post/java-thread-pool
美团技术博客: https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
背景
应用 ThreadPoolExecutor 过程中你是否有以下痛点呢?
1. 代码中创立了一个 ThreadPoolExecutor,然而不晓得那几个外围参数设置多少比拟适合
2. 凭教训设置参数值,上线后发现须要调整,改代码重启服务,十分麻烦
3. 线程池绝对开发人员来说是个黑盒,运行状况不能感知到,直到呈现问题
如果你有以上痛点,这篇文章要介绍的动静可监控线程池(DynamicTp)或者能帮忙到你。
如果看过 ThreadPoolExecutor 的源码,大略能够晓得其实它有提供一些 set 办法,能够在运行时动静去批改相应的值,这些办法有:
public void setCorePoolSize(int corePoolSize);
public void setMaximumPoolSize(int maximumPoolSize);
public void setKeepAliveTime(long time, TimeUnit unit);
public void setThreadFactory(ThreadFactory threadFactory);
public void setRejectedExecutionHandler(RejectedExecutionHandler handler);
当初大多数的互联网我的项目其实都会微服务化部署,有一套本人的服务治理体系,微服务组件中的分布式配置核心表演的就是动静批改配置,实时失效的角色。那么咱们是否能够联合配置核心来做运行时线程池参数的动静调整呢?答案是必定的,而且配置核心绝对都是高可用的,应用它也不必过于放心配置推送呈现问题这类事儿,而且也能缩小研发动静线程池组件的难度和工作量。
综上,咱们总结出以下的背景
- 广泛性:在 Java 开发中,想要进步零碎性能,线程池曾经是一个 90% 以上的人都会抉择应用的根底工具
- 不确定性:我的项目中可能会创立很多线程池,既有 IO 密集型的,也有 CPU 密集型的,但线程池的参数并不好确定;须要有套机制在运行过程中动静去调整参数
- 无感知性,线程池运行过程中的各项指标个别感知不到;须要有套监控报警机制在事先、事中就能让开发人员感知到线程池的运行状况,及时处理
- 高可用性,配置变更须要及时推送到客户端;须要有高可用的配置管理推送服务,配置核心是当初大多数互联网零碎都会应用的组件,与之联合能够大幅度缩小开发量及接入难度
简介
咱们基于配置核心对线程池 ThreadPoolExecutor 做一些扩大,实现对运行中线程池参数的动静批改,实时失效;以及实时监控线程池的运行状态,触发设置的报警策略时报警,报警信息会推送办公平台(钉钉、企微等)。报警维度包含(队列容量、线程池活性、回绝触发等);同时也会定时采集线程池指标数据供监控平台可视化应用。使咱们能时刻感知到线程池的负载,依据状况及时调整,避免出现问题影响线上业务。
| __ \ (_) |__ __|
| | | |_ _ _ __ __ _ _ __ ___ _ ___| |_ __
| | | | | | | '_ \ / _` |'_ ` _ | |/ __| | '_ \
| |__| | |_| | | | | (_| | | | | | | | (__| | |_) |
|_____/ __, |_| |_|__,_|_| |_| |_|_|___|_| .__/
__/ | | |
|___/ |_|
:: Dynamic Thread Pool ::
个性
- 参考美团线程池实际,对线程池参数动态化治理,减少监控、报警性能
- 基于 Spring 框架,现只反对 SpringBoot 我的项目应用,轻量级,引入 starter 即可食用
- 基于配置核心实现线程池参数动静调整,实时失效;集成支流配置核心,默认反对 Nacos、Apollo,同时也提供 SPI 接口可自定义扩大实现
- 内置告诉报警性能,提供多种报警维度(配置变更告诉、活性报警、容量阈值报警、回绝策略触发报警),默认反对企业微信、钉钉报警,同时提供 SPI 接口可自定义扩大实现
- 内置线程池指标采集性能,反对通过 MicroMeter、JsonLog 日志输入、Endpoint 三种形式,可通过 SPI 接口自定义扩大实现
架构设计
次要分四大模块
-
配置变更监听模块:
1. 监听特定配置核心的指定配置文件(默认实现 Nacos、Apollo), 可通过外部提供的 SPI 接口扩大其余实现
2. 解析配置文件内容,内置实现 yml、properties 配置文件的解析,可通过外部提供的 SPI 接口扩大其余实现
3. 告诉线程池治理模块实现刷新
-
线程池治理模块:
1. 服务启动时从配置核心拉取配置信息,生成线程池实例注册到外部线程池注册核心中
2. 监听模块监听到配置变更时,将变更信息传递给治理模块,实现线程池参数的刷新
3. 代码中通过 getExecutor() 办法依据线程池名称来获取线程池对象实例
-
监控模块:
实现监控指标采集以及输入,默认提供以下三种形式,也可通过外部提供的 SPI 接口扩大其余实现
1. 默认实现 Json log 输入到磁盘
2.MicroMeter 采集,引入 MicroMeter 相干依赖
3. 暴雷 Endpoint 端点,可通过 http 形式拜访
-
告诉告警模块:
对接办公平台,实现通告告警性能,默认实现钉钉、企微,可通过外部提供的 SPI 接口扩大其余实现,告诉告警类型如下
1. 线程池参数变更告诉
2. 阻塞队列容量达到设置阈值告警
3. 线程池活性达到设置阈值告警
4. 触发回绝策略告警
应用
-
maven 依赖
<dependency> <groupId>io.github.lyh200</groupId> <artifactId>dynamic-tp-spring-cloud-starter</artifactId> <version>1.0.2-RELEASE</version> </dependency>
-
线程池配置
spring: dynamic: tp: enabled: true enabledBanner: true # 是否开启 banner 打印,默认 true enabledCollect: false # 是否开启监控指标采集,默认 false collectorType: logging # 监控数据采集器类型(JsonLog | MicroMeter),默认 logging logPath: /home/logs # 监控日志数据门路,默认 ${user.home}/logs monitorInterval: 5 # 监控工夫距离(报警判断、指标采集),默认 5s nacos: # nacos 配置,不配置有默认值(规定 name-dev.yml 这样)dataId: dynamic-tp-demo-dev.yml group: DEFAULT_GROUP apollo: # apollo 配置,不配置默认拿 apollo 配置第一个 namespace namespace: dynamic-tp-demo-dev.yml configType: yml # 配置文件类型 platforms: # 告诉报警平台配置 - platform: wechat urlKey: 3a7500-1287-4bd-a798-c5c3d8b69c # 替换 receivers: test1,test2 # 承受人企微名称 - platform: ding urlKey: f80dad441fcd655438f4a08dcd6a # 替换 secret: SECb5441fa6f375d5b9d21 # 替换,非 sign 模式能够没有此值 receivers: 15810119805 # 钉钉账号手机号 executors: # 动静线程池配置 - threadPoolName: dynamic-tp-test-1 corePoolSize: 6 maximumPoolSize: 8 queueCapacity: 200 queueType: VariableLinkedBlockingQueue # 工作队列,查看源码 QueueTypeEnum 枚举类 rejectedHandlerType: CallerRunsPolicy # 回绝策略,查看 RejectedTypeEnum 枚举类 keepAliveTime: 50 allowCoreThreadTimeOut: false threadNamePrefix: test # 线程名前缀 notifyItems: # 报警项,不配置主动会配置(变更告诉、容量报警、活性报警、回绝报警)- type: capacity # 报警项类型,查看源码 NotifyTypeEnum 枚举类 enabled: true threshold: 80 # 报警阈值 platforms: [ding,wechat] # 可选配置,不配置默认拿下层 platforms 配置的所以平台 interval: 120 # 报警距离(单位:s)- type: change enabled: true - type: liveness enabled: true threshold: 80 - type: reject enabled: true threshold: 1
-
代码形式生成,服务启动会主动注册
@Configuration public class DtpConfig { @Bean public DtpExecutor demo1Executor() {return DtpCreator.createDynamicFast("demo1-executor"); } @Bean public ThreadPoolExecutor demo2Executor() {return ThreadPoolBuilder.newBuilder() .threadPoolName("demo2-executor") .corePoolSize(8) .maximumPoolSize(16) .keepAliveTime(50) .allowCoreThreadTimeOut(true) .workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false) .rejectedExecutionHandler(RejectedTypeEnum.CALLER_RUNS_POLICY.getName()) .buildDynamic();} }
-
代码调用,依据线程池名称获取
public static void main(String[] args) {DtpExecutor dtpExecutor = DtpRegistry.getExecutor("dynamic-tp-test-1"); dtpExecutor.execute(() -> System.out.println("test")); }
注意事项
- 配置文件配置的参数会笼罩通过代码生成形式配置的参数
- 阻塞队列只有 VariableLinkedBlockingQueue 类型能够批改 capacity,该类型性能和 LinkedBlockingQueue 类似,只是 capacity 不是 final 类型,能够批改,
VariableLinkedBlockingQueue 参考 RabbitMq 的实现 -
启动看到如下日志输入证实接入胜利
| __ \ (_) |__ __| | | | |_ _ _ __ __ _ _ __ ___ _ ___| |_ __ | | | | | | | '_ \ / _` |'_ ` _ | |/ __| | '_ \ | |__| | |_| | | | | (_| | | | | | | | (__| | |_) | |_____/ __, |_| |_|__,_|_| |_| |_|_|___|_| .__/ __/ | | | |___/ |_| :: Dynamic Thread Pool :: DynamicTp register, executor: DtpMainPropWrapper(dtpName=dynamic-tp-test-1, corePoolSize=6, maxPoolSize=8, keepAliveTime=50, queueType=VariableLinkedBlockingQueue, queueCapacity=200, rejectType=RejectedCountableCallerRunsPolicy, allowCoreThreadTimeOut=false)
-
配置变更会推送告诉音讯,且会高亮变更的字段
DynamicTp [dynamic-tp-test-1] refresh end, changed keys: [corePoolSize, queueCapacity], corePoolSize: [6 => 4], maxPoolSize: [8 => 8], queueType: [VariableLinkedBlockingQueue => VariableLinkedBlockingQueue], queueCapacity: [200 => 2000], keepAliveTime: [50s => 50s], rejectedType: [CallerRunsPolicy => CallerRunsPolicy], allowsCoreThreadTimeOut: [false => false]
告诉报警
触发报警阈值会推送相应报警音讯(活性、容量、回绝),且会高亮显示相应字段
配置变更会推送告诉音讯,且会高亮变更的字段
监控日志
通过 collectType 属性配置监控指标采集类型,默认 logging
- MicroMeter:通过引入相干 MicroMeter 依赖采集到相应的平台
(如 Prometheus,InfluxDb…) -
Logging:定时采集指标数据以 Json 日志格局输入磁盘,地址 ${logPath}/dy
namictp/${appName}.monitor.log2022-01-11 00:25:20.599 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":0,"queueSize":0,"largestPoolSize":0,"poolSize":0,"rejectHandlerName":"RejectedCountableCallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":0,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":0,"dtpName":"remoting-call","maximumPoolSize":8} 2022-01-11 00:25:25.603 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":0,"queueSize":0,"largestPoolSize":0,"poolSize":0,"rejectHandlerName":"RejectedCountableCallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":0,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":0,"dtpName":"remoting-call","maximumPoolSize":8} 2022-01-11 00:25:30.609 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":0,"queueSize":0,"largestPoolSize":0,"poolSize":0,"rejectHandlerName":"RejectedCountableCallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":0,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":0,"dtpName":"remoting-call","maximumPoolSize":8} 2022-01-11 00:25:35.613 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":0,"queueSize":0,"largestPoolSize":0,"poolSize":0,"rejectHandlerName":"RejectedCountableCallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":0,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":0,"dtpName":"remoting-call","maximumPoolSize":8} 2022-01-11 00:25:40.616 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":0,"queueSize":0,"largestPoolSize":0,"poolSize":0,"rejectHandlerName":"RejectedCountableCallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":0,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":0,"dtpName":"remoting-call","maximumPoolSize":8}
-
裸露 EndPoint 端点 (dynamic-tp),能够通过 http 形式申请
[ { "dtp_name": "remoting-call", "core_pool_size": 6, "maximum_pool_size": 12, "queue_type": "SynchronousQueue", "queue_capacity": 0, "queue_size": 0, "fair": false, "queue_remaining_capacity": 0, "active_count": 0, "task_count": 21760, "completed_task_count": 21760, "largest_pool_size": 12, "pool_size": 6, "wait_task_count": 0, "reject_count": 124662, "reject_handler_name": "CallerRunsPolicy" }, { "max_memory": "228 MB", "total_memory": "147 MB", "free_memory": "44.07 MB", "usable_memory": "125.07 MB" } ]
我的项目地址
gitee 地址: https://gitee.com/yanhom/dynamic-tp-spring-cloud-starter
github 地址 :https://github.com/lyh200/dynamic-tp-spring-cloud-starter
分割作者
对我的项目有什么想法或者倡议,能够在上述地址中加到作者微信进行交换,或者创立 issues,一起欠缺我的项目!
最初,反对的话还望大家去点个 star 哦。