Author: Yang Jianmin, JD Logistics
1. Introduction to Sentinel
Sentinel takes traffic as its entry point and protects service stability along multiple dimensions: flow control, circuit breaking / degradation, and system load protection.
Sentinel has the following features:
- Rich application scenarios: flash sales (keeping bursty traffic within the system's capacity), message peak shaving, cluster flow control, real-time circuit breaking of unavailable downstream applications, and so on.
- Complete real-time monitoring: Sentinel also provides real-time monitoring. In the console you can see second-level metrics of a single machine of a connected application, or even the aggregated status of a cluster of up to 500 machines.
- Broad open-source ecosystem: Sentinel offers out-of-the-box integration modules for other open-source frameworks/libraries, such as Spring Cloud, Apache Dubbo, gRPC, and Quarkus. You only need to add the corresponding dependency and a little configuration to get started. Sentinel also provides native implementations in multiple languages, including Java, Go, and C++.
- Well-developed SPI extension mechanism: Sentinel provides simple, complete SPI extension interfaces. You can quickly customize logic by implementing them, for example custom rule management or adapting dynamic data sources.
For a detailed introduction to Sentinel and its differences from Hystrix, you can search online; one recommended article: https://mp.weixin.qq.com/s/Q7Xv8cypQFrrOQhbd9BOXw
This article mainly uses Sentinel's degradation, flow control, and system load protection features.
2. Sentinel Key Source Code Analysis
Whatever the control means used, flow control, degradation, or load protection, the overall flow is roughly as follows:
•StatisticSlot records and aggregates runtime metrics across different dimensions;
•the slot chain triggers each subsequent slot's entry method in turn, e.g. the rule checks in SystemSlot, FlowSlot, DegradeSlot;
•if the subsequent slots pass without throwing a BlockException, the resource call is considered successful, and the executing thread count, passed request count, etc. are incremented.
For data statistics, the main classes involved are ArrayMetric, BucketLeapArray, MetricBucket, and WindowWrap.
Project structure
The analysis below focuses on the core package.
2.1 Annotation Entry Point
2.1.1 Entry, Context, Node
The methods of the SphU facade all return an Entry. An Entry can be understood as a ticket for each entry into a resource: if SphO.entry() or SphU.entry() returns an Entry object, the ticket was acquired and the request was not blocked; otherwise a BlockException is thrown.
An Entry holds information about this invocation of the resource:
•createTime: the timestamp when this Entry was created.
•curNode: the node the Entry currently belongs to.
•originNode: the node of the invocation origin.
•resourceWrapper: the resource information associated with this Entry.
Entry is an abstract class; CtEntry is its implementation, and it holds the Context and call-chain information.
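To ground this, here is the canonical acquire/release pattern around SphU.entry (a minimal sketch using the public Sentinel API; the resource name is invented):
import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;

public class EntryDemo {
    public void doBusiness() {
        Entry entry = null;
        try {
            // Acquire the "ticket" for the resource; throws BlockException if a rule blocks it
            entry = SphU.entry("someResource");
            // ... protected business logic ...
        } catch (BlockException ex) {
            // Blocked by a flow/degrade/system rule: fall back here
        } finally {
            if (entry != null) {
                entry.exit();
            }
        }
    }
}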
The Javadoc of Context reads:
This class holds metadata of current invocation
And the Javadoc of Node:
Holds real-time statistics for resources
A Node keeps real-time statistics for a resource; Sentinel's flow control, degradation, and other decisions are all made against the data in Node. Node is an interface defining operations on requests, exceptions, RT, QPS, and thread counts.
When reading the Node implementations, you quickly run into LongAdder. LongAdder and DoubleAdder are part of java.util.concurrent.atomic since Java 8; both are data structures with excellent counting performance under high concurrency, worth studying and worth considering whenever you need counters in practice.
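As a quick illustration (a generic sketch, not Sentinel code), LongAdder is used like this for hot counters:
import java.util.concurrent.atomic.LongAdder;

public class CounterDemo {
    private final LongAdder counter = new LongAdder();

    public void onRequest() {
        // Striped internally: threads update separate cells, avoiding CAS contention on one value
        counter.increment();
    }

    public long current() {
        // sum() aggregates all cells; it may be slightly stale under concurrency, fine for metrics
        return counter.sum();
    }
}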
We will come back to Node in more depth; this is just to introduce the concept.
2.2 Initialization
2.2.1 Context initialization
Before the slot-chain initialization, context initialization also runs; it involves several important concepts worth explaining.
During Context initialization, the EntranceNode is added as a child of the Root node (Root itself is actually a special EntranceNode), and the EntranceNode is put into contextNameNodeMap.
Node was briefly mentioned earlier as the statistics carrier; the different Node types work as follows:
•Node: the interface defining data statistics
•StatisticNode: the statistics node, the implementation of the Node interface that actually does the counting
•EntranceNode: the entrance node; each Context has one, used to aggregate the overall traffic of that Context
•DefaultNode: the default node, used to count the traffic of one resource within the current Context
•ClusterNode: the cluster node, used to aggregate the traffic of one resource across all Contexts
protected static Context trueEnter(String name, String origin) {
    Context context = contextHolder.get();
    if (context == null) {
        Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
        DefaultNode node = localCacheNameMap.get(name);
        if (node == null) {
            if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                setNullContext();
                return NULL_CONTEXT;
            } else {
                LOCK.lock();
                try {
                    node = contextNameNodeMap.get(name);
                    if (node == null) {
                        if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                            setNullContext();
                            return NULL_CONTEXT;
                        } else {
                            node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                            // Add entrance node.
                            Constants.ROOT.addChild(node);
                            Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                            newMap.putAll(contextNameNodeMap);
                            newMap.put(name, node);
                            contextNameNodeMap = newMap;
                        }
                    }
                } finally {
                    LOCK.unlock();
                }
            }
        }
        context = new Context(node, name);
        context.setOrigin(origin);
        contextHolder.set(context);
    }
    return context;
}
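For orientation, trueEnter is normally reached through ContextUtil.enter; a minimal sketch (the context and origin names here are invented):
import com.alibaba.csp.sentinel.context.ContextUtil;

public class ContextDemo {
    public void handle() {
        // Internally ends up in trueEnter: creates/reuses the EntranceNode for this
        // context name and binds the Context to the current thread
        ContextUtil.enter("sentinel_web_context", "app-A");
        try {
            // SphU.entry(...) calls made here attach to this context's node tree
        } finally {
            ContextUtil.exit();
        }
    }
}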
2.2.2 The eight default slots initialized via SpiLoader
The main responsibility of each slot:
•NodeSelectorSlot collects resource paths and stores the invocation paths as a tree, so flow control and degradation can be applied per call path;
•ClusterBuilderSlot stores the statistics of a resource and of its callers, e.g. the resource's RT, QPS, thread count; these serve as the basis for multi-dimensional flow control and degradation;
•StatisticSlot records and aggregates runtime metrics across different dimensions;
•FlowSlot performs flow control based on the preset flow rules and the state collected by the preceding slots;
•AuthoritySlot performs black/white list control based on the configured lists and the invocation origin;
•DegradeSlot performs circuit breaking / degradation based on the statistics and the preset rules;
•SystemSlot controls the total inbound traffic based on system status, e.g. aggregate QPS, thread count, RT, and load.
A simplified sketch of the default chain order follows this list.
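Below is a hand-assembled sketch of that order (in the real code the slots are discovered via SpiLoader and sorted by their @Spi order; LogSlot, not listed above, is the eighth default slot):
import com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotChain;
import com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot;
import com.alibaba.csp.sentinel.slots.block.flow.FlowSlot;
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot;
import com.alibaba.csp.sentinel.slots.logger.LogSlot;
import com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot;
import com.alibaba.csp.sentinel.slots.statistic.StatisticSlot;
import com.alibaba.csp.sentinel.slots.system.SystemSlot;

public class SlotChainSketch {
    public static ProcessorSlotChain buildDefaultStyleChain() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());   // build the invocation tree
        chain.addLast(new ClusterBuilderSlot()); // build per-resource cluster nodes
        chain.addLast(new LogSlot());            // log block exceptions
        chain.addLast(new StatisticSlot());      // record runtime statistics
        chain.addLast(new AuthoritySlot());      // black/white list check
        chain.addLast(new SystemSlot());         // system protection check
        chain.addLast(new FlowSlot());           // flow rule check
        chain.addLast(new DegradeSlot());        // circuit breaking check
        return chain;
    }
}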
2.3 StatisticSlot
2.3.1 Node
Let's look deeper into Node, since all the statistics live there; flow control, circuit breaking, and load protection all combine rules with these statistics to decide whether to act.
As its Javadoc says, Node holds real-time, resource-level statistics. Below are the methods defined on the interface; you can see totalRequest, totalPass, totalSuccess, blockRequest, totalException, passQps, and many other request/QPS/thread related methods:
/**
* Holds real-time statistics for resources.
*
* @author qinan.qn
* @author leyou
* @author Eric Zhao
*/
public interface Node extends OccupySupport, DebugSupport {
    long totalRequest();
    long totalPass();
    long totalSuccess();
    long blockRequest();
    long totalException();
    double passQps();
    double blockQps();
    double totalQps();
    double successQps();
    // ...
}
2.3.2 StatisticNode
Let's start from the most fundamental one, StatisticNode. Its Javadoc positions it as:
The statistic node keeps three kinds of real-time statistics metrics:
1. metrics in second level ({@code rollingCounterInSecond})
2. metrics in minute level ({@code rollingCounterInMinute})
3. thread count
StatisticNode has only four fields. Besides the LongAdder-typed curThreadNum mentioned earlier, two fields are Metric objects; from the constructor arguments and the field names, one is for second-level and one for minute-level statistics. Next, let's look at Metric.
// StatisticNode holds two Metrics, one second-level and one minute-level.
// From the constructor args, the second-level statistic is split into 2 time windows, each 500 ms long.
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
// The minute-level statistic is split into 60 time windows, each 1000 ms long.
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
/**
* The counter for thread count.
*/
private LongAdder curThreadNum = new LongAdder();
/**
* The last timestamp when metrics were fetched.
*/
private long lastFetchTime = -1;
ArrayMetric has a single field, a LeapArray<MetricBucket>; everything else is statistics methods. LeapArray is the most fundamental statistics structure in Sentinel, so it deserves a close look. In essence it resolves a bucket for a given timeMillis, with three cases: the bucket does not exist yet (create it), it exists and is current (return it directly), or it is deprecated (reset it).
// Take the minute-level metric as an example to see how the time windows are initialized
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

public LeapArray(int sampleCount, int intervalInMs) {
    AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
    AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
    AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
    // windowLengthInMs = 60 * 1000 / 60 = 1000: the sliding-window length;
    // by default Sentinel splits the unit interval into 60 sliding windows for statistics
    this.windowLengthInMs = intervalInMs / sampleCount;
    // 60 * 1000
    this.intervalInMs = intervalInMs;
    // 60.0
    this.intervalInSecond = intervalInMs / 1000.0;
    // 60
    this.sampleCount = sampleCount;
    // array length 60
    this.array = new AtomicReferenceArray<>(sampleCount);
}
/**
* Get bucket item at provided timestamp.
*
* @param timeMillis a valid timestamp in milliseconds
* @return current bucket item at provided timestamp if the time is valid; null if time is invalid
*/
public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }
    // Map the current timestamp to an array index
    int idx = calculateTimeIdx(timeMillis);
    // Calculate current bucket start time:
    // timeMillis - timeMillis % windowLengthInMs
    long windowStart = calculateWindowStart(timeMillis);
/*
* Get bucket item at given time from the array.
*
* (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
* (2) Bucket is up-to-date, then just return the bucket.
* (3) Bucket is deprecated, then reset current bucket.
*/
while (true) {
    WindowWrap<T> old = array.get(idx);
if (old == null) {
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* bucket is empty, so create new and update
*
* If the old bucket is absent, then we create a new bucket at {@code windowStart},
* then try to update circular array via a CAS operation. Only one thread can
* succeed to update, while other threads yield its time slice.
*/
// newEmptyBucket is overridden; the second-level and minute-level statistics use different bucket implementations
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
    // Contention failed, the thread will yield its time slice to wait for bucket available.
    Thread.yield();
}
} else if (windowStart == old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* startTime of Bucket 3: 800, so it's up-to-date
*
* If current {@code windowStart} is equal to the start timestamp of old bucket,
* that means the time is within the bucket, so directly return the bucket.
*/
return old;
} else if (windowStart > old.windowStart()) {
/*
* (old)
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* startTime of Bucket 2: 400, deprecated, should be reset
*
* If the start timestamp of old bucket is behind provided time, that means
* the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
* Note that the reset and clean-up operations are hard to be atomic,
* so we need a update lock to guarantee the correctness of bucket update.
*
* The update lock is conditional (tiny scope) and will take effect only when
* bucket is deprecated, so in most cases it won't lead to performance loss.
*/
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
    updateLock.unlock();
}
} else {
    // Contention failed, the thread will yield its time slice to wait for bucket available.
    Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
// Map a timestamp to an array slot: divide the timestamp by the window length,
// then hash (mod) the result into the array
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
    long timeId = timeMillis / windowLengthInMs;
    // Calculate current index so we can map the timestamp to the leap array.
    return (int) (timeId % array.length());
}
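To make the mapping concrete: with the second-level metric (sampleCount = 2, windowLengthInMs = 500), a timestamp of 888 ms yields timeId = 888 / 500 = 1 and idx = 1 % 2 = 1, with windowStart = 888 - 888 % 500 = 500. At 1888 ms the same slot is hit again (timeId = 3, idx = 1); the stored bucket's windowStart (500) is older than the new windowStart (1500), so the bucket is deprecated and reset.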
A WindowWrap holds windowLengthInMs, windowStart, and the bucket value. The LeapArray implementation used for minute-level statistics is BucketLeapArray, while second-level statistics use OccupiableBucketLeapArray. For minute-level statistics, each MetricBucket maintains a LongAdder array plus a configured minRt.
/**
* The fundamental data structure for metric statistics in a time span.
*
* @author jialiang.linjl
* @author Eric Zhao
* @see LeapArray
*/
public class BucketLeapArray extends LeapArray<MetricBucket> {

    public BucketLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
    }

    @Override
    public MetricBucket newEmptyBucket(long time) {
        return new MetricBucket();
    }

    @Override
    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
        // Update the start time and reset value.
        w.resetTo(startTime);
        w.value().reset();
        return w;
    }
}
As for how second-level statistics stay accurate in, say, a QPS = 20 scenario, another LeapArray implementation, FutureBucketLeapArray, comes into play; readers interested in how second-level statistics avoid counting errors can study the code around FutureBucketLeapArray.
2.4 FlowSlot
2.4.1 Common rate limiting algorithms
Before diving into Sentinel's implementation, here are the three common rate limiting algorithms: counter, leaky bucket, and token bucket.
Counter algorithm
As the name suggests, the counter algorithm counts requests within a time span, incrementing per unit of time, and compares the count with the configured limit (max QPS); if the count exceeds the limit, requests are throttled. But this cannot achieve "smooth" limiting. With 1 s as the unit and 100 QPS as the limit, traffic can exceed the limit around window boundaries: 100 requests at the end of one second plus 100 at the start of the next means 200 requests within one real second.
Hence the sliding-window counter algorithm refines plain counting: split the statistics period into finer windows, e.g. divide 1 s into 5 windows, and take the rolling sum across all windows as the system's actual QPS. This solves the boundary problem above, and we will see similar machinery in the Sentinel source later. A bare-bones sketch follows.
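The sketch below illustrates the sliding-window counter idea only; it is not Sentinel's implementation, and the window sizes are arbitrary:
import java.util.concurrent.atomic.AtomicLongArray;

public class SlidingWindowCounter {
    private final int sampleCount = 5;           // 5 windows of 200 ms cover 1 s
    private final long windowLengthMs = 200;
    private final AtomicLongArray counts = new AtomicLongArray(sampleCount);
    private final long[] windowStarts = new long[sampleCount];

    public synchronized boolean tryPass(long nowMs, long limitQps) {
        int idx = (int) ((nowMs / windowLengthMs) % sampleCount);
        long windowStart = nowMs - nowMs % windowLengthMs;
        if (windowStarts[idx] != windowStart) {  // deprecated bucket: reset it
            windowStarts[idx] = windowStart;
            counts.set(idx, 0);
        }
        long total = 0;                          // sum the buckets still inside the last second
        for (int i = 0; i < sampleCount; i++) {
            if (nowMs - windowStarts[i] < 1000) {
                total += counts.get(i);
            }
        }
        if (total + 1 > limitQps) {
            return false;                        // over the limit: reject
        }
        counts.incrementAndGet(idx);
        return true;
    }
}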
Leaky bucket algorithm
However large the traffic, it first enters the leaky bucket and then flows out at a constant rate. How do we implement this constant rate in code? Say we want 100 req/s: each outflow then takes 10 ms, like a queue from whose head one request is released every 10 ms. The queue is the bucket; when traffic exceeds the queue's capacity, the excess is rejected.
The leaky bucket has a clear drawback: it cannot handle bursts. If 100 requests arrive at once, they can only pass one by one, and by the time the last one leaves, a full second has elapsed. So the leaky bucket suits scenarios with fairly even arrival where the request rate must be strictly controlled.
Token bucket algorithm
The token bucket is similar to the leaky bucket, except the bucket holds tokens rather than requests. Token production and consumption can be managed flexibly according to need, which makes bursty traffic manageable.
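A compact token-bucket sketch of the same idea (illustrative only; capacity and refill rate are arbitrary):
public class TokenBucket {
    private final double capacity;      // burst size
    private final double refillPerMs;   // tokens added per millisecond
    private double tokens;
    private long lastRefillMs;

    public TokenBucket(double capacity, double refillPerSecond) {
        this.capacity = capacity;
        this.refillPerMs = refillPerSecond / 1000.0;
        this.tokens = capacity;
        this.lastRefillMs = System.currentTimeMillis();
    }

    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        // Lazily refill based on elapsed time, capped at capacity (this is what allows bursts)
        tokens = Math.min(capacity, tokens + (now - lastRefillMs) * refillPerMs);
        lastRefillMs = now;
        if (tokens >= 1) {
            tokens -= 1;
            return true;
        }
        return false;
    }
}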
2.4.2 Standalone flow control
Now for Sentinel's implementation. Compared with the basic algorithms above, Sentinel's first distinctive trait is the notion of a "resource", enabling fine-grained, flexible limiting of a specific resource, a related resource, or a designated call chain.
FlowSlot's main logic lives in FlowRuleChecker. Before going into it, let's look at Sentinel's rule model; the fields below describe a flow rule (there are analogous models for authority rules, system protection rules, and degrade rules):
/**
 * Two flow control modes:
 * 0: thread count (throttle when the number of threads calling the API reaches the threshold)
 * 1: QPS (throttle when the QPS of the API reaches the threshold)
 */
private int grade = RuleConstant.FLOW_GRADE_QPS;

/**
 * Flow control threshold; its meaning depends on grade.
 */
private double count;

/**
 * Flow control strategy based on invocation relations
 * (supports limiting by related resource or by designated chain):
 * direct (throttle the API itself when its threshold is reached)
 * relate (throttle this resource when the related resource reaches its threshold)
 * chain (only count traffic on the designated entrance chain)
 * {@link RuleConstant#STRATEGY_DIRECT} for direct flow control (by origin);
 * {@link RuleConstant#STRATEGY_RELATE} for relevant flow control (with relevant resource);
 * {@link RuleConstant#STRATEGY_CHAIN} for chain flow control (by entrance resource).
 */
private int strategy = RuleConstant.STRATEGY_DIRECT;

/**
 * Reference resource in flow control with relevant resource or context.
 */
private String refResource;

/**
 * Traffic shaping behavior:
 * 0. default (reject directly, throwing FlowException)
 * 1. warm up (slow start: starting from threshold / coldFactor (cold factor, default 3),
 *    the allowed QPS ramps up to the configured threshold over the warm-up period)
 * 2. rate limiter (queue and wait)
 * 3. warm up + rate limiter
 */
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;

private int warmUpPeriodSec = 10;

/**
 * Max queueing time in rate limiter behavior.
 */
private int maxQueueingTimeMs = 500;

/**
 * Whether cluster mode is enabled; default false.
 */
private boolean clusterMode;

/**
 * Flow rule config for cluster mode.
 */
private ClusterFlowConfig clusterConfig;

/**
 * The traffic shaping (throttling) controller.
 */
private TrafficShapingController controller;
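For context, such a rule is typically constructed and loaded through the standard FlowRuleManager API (the resource name and threshold here are invented):
import java.util.Collections;

import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;

public class FlowRuleDemo {
    public static void initFlowRule() {
        FlowRule rule = new FlowRule();
        rule.setResource("someResource");           // resource name used in SphU.entry(...)
        rule.setGrade(RuleConstant.FLOW_GRADE_QPS); // throttle by QPS
        rule.setCount(20);                          // threshold: 20 QPS
        rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT); // fast-reject
        FlowRuleManager.loadRules(Collections.singletonList(rule));
    }
}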
Now let's walk through FlowRuleChecker.
The first step of canPassCheck looks at limitApp, which works together with the authority (origin-based) rules; the default matches all origins.
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                      boolean prioritized) {
    // Select the Node for statistics according to the strategy
    // (it may be the resource's own node, a related node, or a node on a designated chain)
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    if (selectedNode == null) {
        return true;
    }
    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}

static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
    // limitApp is used for authority control; the default is "default", i.e. no origin restriction
    String limitApp = rule.getLimitApp();
    // Get the flow control strategy
    int strategy = rule.getStrategy();
    String origin = context.getOrigin();
    // Check against the invocation origin
    if (limitApp.equals(origin) && filterOrigin(origin)) {
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            // Matches limit origin, return origin statistic node.
            return context.getOriginNode();
        }
        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            // Return the cluster node.
            return node.getClusterNode();
        }
        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
        && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            return context.getOriginNode();
        }
        return selectReferenceNode(rule, context, node);
    }
    return null;
}

static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
    String refResource = rule.getRefResource();
    int strategy = rule.getStrategy();
    if (StringUtil.isEmpty(refResource)) {
        return null;
    }
    if (strategy == RuleConstant.STRATEGY_RELATE) {
        return ClusterBuilderSlot.getClusterNode(refResource);
    }
    if (strategy == RuleConstant.STRATEGY_CHAIN) {
        if (!refResource.equals(context.getName())) {
            return null;
        }
        return node;
    }
    // No node.
    return null;
}
// Invoked when flow rules are loaded, to build the traffic shaping controller for each rule;
// rule.getRater() then returns this TrafficShapingController
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
    if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
        switch (rule.getControlBehavior()) {
            // Warm-up mode returns WarmUpController
            case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
                return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
                    ColdFactorProperty.coldFactor);
            // Queueing mode returns ThrottlingController
            case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
                return new ThrottlingController(rule.getMaxQueueingTimeMs(), rule.getCount());
            // Warm-up + queueing mode returns WarmUpRateLimiterController
            case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
                return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
                    rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
            case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
            default:
                // Default mode or unknown mode: default traffic shaping controller (fast-reject).
        }
    }
    // The default is DefaultController
    return new DefaultController(rule.getCount(), rule.getGrade());
}
Sentinel's standalone flow control algorithms
As seen above, one of the following is instantiated based on the rule's controlBehavior (traffic shaping effect):
•DefaultController: a classic sliding-window counter implementation. It adds the incoming request count to the currently measured QPS; below the threshold the request passes, above it (for prioritized QPS requests) a wait time is computed and the request retries later, otherwise it is rejected.
•ThrottlingController: the leaky bucket implementation; the idea is annotated in the source fragments below.
•WarmUpController: modeled on Guava's RateLimiter with warm-up. The difference is that Guava focuses on the request interval, like the token bucket described earlier, while Sentinel focuses on the request count, which is also somewhat token-bucket-like.
•WarmUpRateLimiterController: combines both, warming up below the warning watermark and queueing (rate limiter) above it.
DefaultController
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    int curCount = avgUsedTokens(node);
    if (curCount + acquireCount > count) {
        if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
            long currentTime;
            long waitInMs;
            currentTime = TimeUtil.currentTimeMillis();
            waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
            if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                node.addOccupiedPass(acquireCount);
                sleep(waitInMs);
                // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                throw new PriorityWaitException(waitInMs);
            }
        }
        return false;
    }
    return true;
}
ThrottlingController
public ThrottlingController(int queueingTimeoutMs, double maxCountPerStat) {
    this(queueingTimeoutMs, maxCountPerStat, 1000);
}

public ThrottlingController(int queueingTimeoutMs, double maxCountPerStat, int statDurationMs) {
    AssertUtil.assertTrue(statDurationMs > 0, "statDurationMs should be positive");
    AssertUtil.assertTrue(maxCountPerStat >= 0, "maxCountPerStat should be >= 0");
    AssertUtil.assertTrue(queueingTimeoutMs >= 0, "queueingTimeoutMs should be >= 0");
    this.maxQueueingTimeMs = queueingTimeoutMs;
    this.count = maxCountPerStat;
    this.statDurationMs = statDurationMs;
    // Use nanoSeconds when durationMs % count != 0 or count / durationMs > 1 (to be accurate).
    // So useNanoSeconds is true when the threshold does not evenly divide the duration
    // or exceeds 1000; otherwise it is false.
    if (maxCountPerStat > 0) {
        this.useNanoSeconds = statDurationMs % Math.round(maxCountPerStat) != 0
            || maxCountPerStat / statDurationMs > 1;
    } else {
        this.useNanoSeconds = false;
    }
}

@Override
public boolean canPass(Node node, int acquireCount) {
    return canPass(node, acquireCount, false);
}

private boolean checkPassUsingNanoSeconds(int acquireCount, double maxCountPerStat) {
    final long maxQueueingTimeNs = maxQueueingTimeMs * MS_TO_NS_OFFSET;
    long currentTime = System.nanoTime();
    // Calculate the interval between every two requests.
    final long costTimeNs = Math.round(1.0d * MS_TO_NS_OFFSET * statDurationMs * acquireCount / maxCountPerStat);
    // Expected pass time of this request.
    long expectedTime = costTimeNs + latestPassedTime.get();
    if (expectedTime <= currentTime) {
        // Contention may exist here, but it's okay.
        latestPassedTime.set(currentTime);
        return true;
    } else {
        final long curNanos = System.nanoTime();
        // Calculate the time to wait.
        long waitTime = costTimeNs + latestPassedTime.get() - curNanos;
        if (waitTime > maxQueueingTimeNs) {
            return false;
        }
        long oldTime = latestPassedTime.addAndGet(costTimeNs);
        waitTime = oldTime - curNanos;
        if (waitTime > maxQueueingTimeNs) {
            latestPassedTime.addAndGet(-costTimeNs);
            return false;
        }
        // in race condition waitTime may <= 0
        if (waitTime > 0) {
            sleepNanos(waitTime);
        }
        return true;
    }
}

// The concrete leaky bucket implementation (millisecond precision)
private boolean checkPassUsingCachedMs(int acquireCount, double maxCountPerStat) {
    long currentTime = TimeUtil.currentTimeMillis();
    // Calculate the interval between every two requests
    long costTime = Math.round(1.0d * statDurationMs * acquireCount / maxCountPerStat);
    // Expected pass time of this request
    long expectedTime = costTime + latestPassedTime.get();
    if (expectedTime <= currentTime) {
        // latestPassedTime is an AtomicLong, so it has volatile semantics
        latestPassedTime.set(currentTime);
        return true;
    } else {
        // Calculate the time to wait
        long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
        // If it exceeds the max queueing time, throttle the request
        if (waitTime > maxQueueingTimeMs) {
            return false;
        }
        long oldTime = latestPassedTime.addAndGet(costTime);
        waitTime = oldTime - TimeUtil.currentTimeMillis();
        if (waitTime > maxQueueingTimeMs) {
            latestPassedTime.addAndGet(-costTime);
            return false;
        }
        // in race condition waitTime may <= 0
        if (waitTime > 0) {
            sleepMs(waitTime);
        }
        return true;
    }
}

@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    // Pass when acquire count is less or equal than 0.
    if (acquireCount <= 0) {
        return true;
    }
    // Reject when count is less or equal than 0.
    // Otherwise, the costTime will be max of long and waitTime will overflow in some cases.
    if (count <= 0) {
        return false;
    }
    if (useNanoSeconds) {
        return checkPassUsingNanoSeconds(acquireCount, this.count);
    } else {
        return checkPassUsingCachedMs(acquireCount, this.count);
    }
}

private void sleepMs(long ms) {
    try {
        Thread.sleep(ms);
    } catch (InterruptedException e) {
    }
}

private void sleepNanos(long ns) {
    LockSupport.parkNanos(ns);
}
long costTime = Math.round(1.0d * statDurationMs * acquireCount / maxCountPerStat);
From this inter-request interval formula we can see that once maxCountPerStat (the rule's configured QPS threshold) exceeds 1000, the interval for strict pacing can no longer be computed accurately in milliseconds. Accordingly, as introduced above, thresholds above 1000 QPS take checkPassUsingNanoSeconds and thresholds below 1000 QPS take checkPassUsingCachedMs. Comparing the two, the main logic is identical; only the unit changes from milliseconds to nanoseconds, so reading checkPassUsingCachedMs alone is enough.
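A quick sanity check with numbers: at a threshold of 100 QPS, costTime = round(1000 × 1 / 100) = 10 ms per request, which is exact. At 2000 QPS the true interval is 0.5 ms, but Math.round turns it into 1 ms, effectively halving the throughput; and even a threshold like 300 QPS (3.33 ms) loses precision to rounding. Both conditions are what the constructor's useNanoSeconds check guards against.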
WarmUpController
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    long passQps = (long) node.passQps();
    long previousQps = (long) node.previousPassQps();
    syncToken(previousQps);
    // Compute against the slope:
    // once past the warning line, start adjusting the allowed QPS
    long restToken = storedTokens.get();
    if (restToken >= warningToken) {
        long aboveToken = restToken - warningToken;
        // Consume faster than the warning-line rate, but slower than the full rate
        // current interval = restToken * slope + 1 / count
        double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
        if (passQps + acquireCount <= warningQps) {
            return true;
        }
    } else {
        if (passQps + acquireCount <= count) {
            return true;
        }
    }
    return false;
}

protected void syncToken(long passQps) {
    long currentTime = TimeUtil.currentTimeMillis();
    currentTime = currentTime - currentTime % 1000;
    long oldLastFillTime = lastFilledTime.get();
    if (currentTime <= oldLastFillTime) {
        return;
    }
    long oldValue = storedTokens.get();
    long newValue = coolDownTokens(currentTime, passQps);
    if (storedTokens.compareAndSet(oldValue, newValue)) {
        long currentValue = storedTokens.addAndGet(0 - passQps);
        if (currentValue < 0) {
            storedTokens.set(0L);
        }
        lastFilledTime.set(currentTime);
    }
}

private long coolDownTokens(long currentTime, long passQps) {
    long oldValue = storedTokens.get();
    long newValue = oldValue;
    // Precondition for adding tokens:
    // the token consumption level is well below the warning line
    if (oldValue < warningToken) {
        newValue = (long) (oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
    } else if (oldValue > warningToken) {
        if (passQps < (int) count / coldFactor) {
            newValue = (long) (oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
        }
    }
    return Math.min(newValue, maxToken);
}
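For reference, warningToken, maxToken, and slope come from the controller's construction. A sketch of the key formulas, following the Guava-derived math in WarmUpController (field names mirror the source; verify against your Sentinel version):
public class WarmUpMath {
    private double count;       // threshold QPS
    private int coldFactor;     // default 3
    private int warningToken;
    private int maxToken;
    private double slope;

    // count = threshold QPS, warmUpPeriodInSec = warm-up duration, coldFactor > 1
    void construct(double count, int warmUpPeriodInSec, int coldFactor) {
        this.count = count;
        this.coldFactor = coldFactor;
        // Token level at the boundary between the cold slope and the stable interval
        warningToken = (int) (warmUpPeriodInSec * count) / (coldFactor - 1);
        // Maximum tokens the bucket may hold when the system is fully cold
        maxToken = warningToken + (int) (2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
        // Slope of the interval line above warningToken: the per-request interval grows
        // from 1/count up to coldFactor/count as stored tokens approach maxToken
        slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
    }
}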
2.4.3 Cluster flow control
The passClusterCheck method (if the clusterService cannot be found, it falls back to standalone flow control):
private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                        boolean prioritized) {
    try {
        // Obtain the TokenService, depending on whether this node is a Token Client or a Token Server
        TokenService clusterService = pickClusterService();
        if (clusterService == null) {
            return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
        }
        long flowId = rule.getClusterConfig().getFlowId();
        // Request tokens from the TokenService by flowId. As shown above, the call may go through
        // either the TokenClient or the TokenServer, i.e. DefaultClusterTokenClient or DefaultTokenService.
        TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
        return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
        // If client is absent, then fallback to local mode.
    } catch (Throwable ex) {
        RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
    }
    // Fallback to local flow control when token client or server for this rule is not available.
    // If fallback is not enabled, then directly pass.
    return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}

// Determine whether the current node is a Token Client or a Token Server:
// 1) if the current node's role is Client, the returned TokenService is DefaultClusterTokenClient;
// 2) if the current node's role is Server, the default returned TokenService is DefaultTokenService.
private static TokenService pickClusterService() {
    if (ClusterStateManager.isClient()) {
        return TokenClientProvider.getClient();
    }
    if (ClusterStateManager.isServer()) {
        return EmbeddedClusterTokenServerProvider.getServer();
    }
    return null;
}
Cluster flow control modes
The Sentinel cluster token server can be started in two ways:
•Embedded mode: suitable for application-level limiting; simple to deploy, but it impacts application performance
•Alone (standalone) mode: suitable for global limiting; requires separate deployment
For reasons of length, cluster flow control deserves a dedicated write-up another time.
Cluster flow control fallback
private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                        boolean prioritized) {
    try {
        TokenService clusterService = pickClusterService();
        if (clusterService == null) {
            return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
        }
        long flowId = rule.getClusterConfig().getFlowId();
        TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
        return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
        // If client is absent, then fallback to local mode.
    } catch (Throwable ex) {
        RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
    }
    // Fallback to local flow control when token client or server for this rule is not available.
    // If fallback is not enabled, then directly pass.
    // Note: if cluster flow control fails, it degrades to standalone flow control;
    // if configuration forbids the fallback, this check is simply skipped.
    return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}
2.5 DegradeSlot
CircuitBreaker
Martin Fowler's classic explanation of the circuit breaker: https://martinfowler.com/bliki/CircuitBreaker.html
The first step retrieves the circuit breaker list by resource name. Sentinel has two circuit breaker implementations: RT (slow-call) mode uses ResponseTimeCircuitBreaker, and exception mode uses ExceptionCircuitBreaker.
public interface CircuitBreaker {
/**
* Get the associated circuit breaking rule.
*
* @return associated circuit breaking rule
*/
DegradeRule getRule();
/**
* Acquires permission of an invocation only if it is available at the time of invoking.
*
* @param context context of current invocation
* @return {@code true} if permission was acquired and {@code false} otherwise
*/
boolean tryPass(Context context);
/**
* Get current state of the circuit breaker.
*
* @return current state of the circuit breaker
*/
State currentState();
/**
* <p>Record a completed request with the context and handle state transformation of the circuit breaker.</p>
* <p>Called when a <strong>passed</strong> invocation finished.</p>
*
* @param context context of current invocation
*/
void onRequestComplete(Context context);
/**
* Circuit breaker state.
*/
enum State {
/**
* In {@code OPEN} state, all requests will be rejected until the next recovery time point.
*/
OPEN,
/**
* In {@code HALF_OPEN} state, the circuit breaker will allow a "probe" invocation.
* If the invocation is abnormal according to the strategy (e.g. it's slow), the circuit breaker
* will re-transform to the {@code OPEN} state and wait for the next recovery time point;
* otherwise the resource will be regarded as "recovered" and the circuit breaker
* will cease cutting off requests and transform to {@code CLOSED} state.
*/
HALF_OPEN,
/**
* In {@code CLOSED} state, all requests are permitted. When current metric value exceeds the threshold,
* the circuit breaker will transform to {@code OPEN} state.
*/
CLOSED
}
}
Let's take ExceptionCircuitBreaker as the concrete example:
public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {

    // Two exception modes: exception ratio and exception count
    private final int strategy;
    // Minimum number of requests before the breaker can trip
    private final int minRequestAmount;
    // Threshold
    private final double threshold;
    // LeapArray is a core data structure for statistics in Sentinel,
    // encapsulating the time-window operations
    private final LeapArray<SimpleErrorCounter> stat;

    public ExceptionCircuitBreaker(DegradeRule rule) {
        this(rule, new SimpleErrorCounterLeapArray(1, rule.getStatIntervalMs()));
    }

    ExceptionCircuitBreaker(DegradeRule rule, LeapArray<SimpleErrorCounter> stat) {
        super(rule);
        this.strategy = rule.getGrade();
        boolean modeOk = strategy == DEGRADE_GRADE_EXCEPTION_RATIO || strategy == DEGRADE_GRADE_EXCEPTION_COUNT;
        AssertUtil.isTrue(modeOk, "rule strategy should be error-ratio or error-count");
        AssertUtil.notNull(stat, "stat cannot be null");
        this.minRequestAmount = rule.getMinRequestAmount();
        this.threshold = rule.getCount();
        this.stat = stat;
    }

    @Override
    protected void resetStat() {
        // Reset current bucket (bucket count = 1).
        stat.currentWindow().value().reset();
    }

    @Override
    public void onRequestComplete(Context context) {
        Entry entry = context.getCurEntry();
        if (entry == null) {
            return;
        }
        Throwable error = entry.getError();
        SimpleErrorCounter counter = stat.currentWindow().value();
        if (error != null) {
            counter.getErrorCount().add(1);
        }
        counter.getTotalCount().add(1);
        handleStateChangeWhenThresholdExceeded(error);
    }

    private void handleStateChangeWhenThresholdExceeded(Throwable error) {
        if (currentState.get() == State.OPEN) {
            return;
        }
        if (currentState.get() == State.HALF_OPEN) {
            // In detecting request
            if (error == null) {
                fromHalfOpenToClose();
            } else {
                fromHalfOpenToOpen(1.0d);
            }
            return;
        }
        List<SimpleErrorCounter> counters = stat.values();
        long errCount = 0;
        long totalCount = 0;
        for (SimpleErrorCounter counter : counters) {
            errCount += counter.errorCount.sum();
            totalCount += counter.totalCount.sum();
        }
        if (totalCount < minRequestAmount) {
            return;
        }
        double curCount = errCount;
        if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
            // Use errorRatio
            curCount = errCount * 1.0d / totalCount;
        }
        if (curCount > threshold) {
            transformToOpen(curCount);
        }
    }

    static class SimpleErrorCounter {
        private LongAdder errorCount;
        private LongAdder totalCount;

        public SimpleErrorCounter() {
            this.errorCount = new LongAdder();
            this.totalCount = new LongAdder();
        }

        public LongAdder getErrorCount() {
            return errorCount;
        }

        public LongAdder getTotalCount() {
            return totalCount;
        }

        public SimpleErrorCounter reset() {
            errorCount.reset();
            totalCount.reset();
            return this;
        }

        @Override
        public String toString() {
            return "SimpleErrorCounter{" +
                "errorCount=" + errorCount +
                ", totalCount=" + totalCount +
                '}';
        }
    }

    static class SimpleErrorCounterLeapArray extends LeapArray<SimpleErrorCounter> {

        public SimpleErrorCounterLeapArray(int sampleCount, int intervalInMs) {
            super(sampleCount, intervalInMs);
        }

        @Override
        public SimpleErrorCounter newEmptyBucket(long timeMillis) {
            return new SimpleErrorCounter();
        }

        @Override
        protected WindowWrap<SimpleErrorCounter> resetWindowTo(WindowWrap<SimpleErrorCounter> w, long startTime) {
            // Update the start time and reset value.
            w.resetTo(startTime);
            w.value().reset();
            return w;
        }
    }
}
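For completeness, an exception-ratio breaker is wired up through the public API like this (a minimal sketch; the resource name and thresholds are invented):
import java.util.Collections;

import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;

public class DegradeRuleDemo {
    public static void initDegradeRule() {
        DegradeRule rule = new DegradeRule();
        rule.setResource("someResource");
        rule.setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO); // trip on error ratio
        rule.setCount(0.5);               // threshold: 50% of requests failing
        rule.setMinRequestAmount(20);     // need at least 20 requests in the window
        rule.setStatIntervalMs(60000);    // 1-minute statistics window
        rule.setTimeWindow(180);          // stay OPEN for 180 s before the HALF_OPEN probe
        DegradeRuleManager.loadRules(Collections.singletonList(rule));
    }
}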
2.6 SystemSlot
The check logic is concentrated in com.alibaba.csp.sentinel.slots.system.SystemRuleManager#checkSystem; a fragment follows. As the system protection rule check, it controls the instance's aggregate inbound QPS, thread count, RT (response time), and system load. Except for system load, all statistics come from StatisticSlot; the system load is obtained via OperatingSystemMXBean by SystemStatusListener, which SystemRuleManager schedules periodically.
/**
 * Apply {@link SystemRule} to the resource. Only inbound traffic will be checked.
 *
 * @param resourceWrapper the resource.
 * @throws BlockException when any system rule's threshold is exceeded.
 */
public static void checkSystem(ResourceWrapper resourceWrapper, int count) throws BlockException {
    if (resourceWrapper == null) {
        return;
    }
    // Ensure the checking switch is on.
    if (!checkSystemStatus.get()) {
        return;
    }
    // for inbound traffic only
    if (resourceWrapper.getEntryType() != EntryType.IN) {
        return;
    }
    // Total QPS: the aggregate inbound QPS of this instance
    // (see the earlier Node introduction for the related concepts)
    double currentQps = Constants.ENTRY_NODE.passQps();
    if (currentQps + count > qps) {
        throw new SystemBlockException(resourceWrapper.getName(), "qps");
    }
    // total thread
    int currentThread = Constants.ENTRY_NODE.curThreadNum();
    if (currentThread > maxThread) {
        throw new SystemBlockException(resourceWrapper.getName(), "thread");
    }
    double rt = Constants.ENTRY_NODE.avgRt();
    if (rt > maxRt) {
        throw new SystemBlockException(resourceWrapper.getName(), "rt");
    }
    // load. BBR algorithm.
    if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
        if (!checkBbr(currentThread)) {
            throw new SystemBlockException(resourceWrapper.getName(), "load");
        }
    }
    // cpu usage
    if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
        throw new SystemBlockException(resourceWrapper.getName(), "cpu");
    }
}

private static boolean checkBbr(int currentThread) {
    if (currentThread > 1 &&
        currentThread > Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) {
        return false;
    }
    return true;
}

public static double getCurrentSystemAvgLoad() {
    return statusListener.getSystemAverageLoad();
}

public static double getCurrentCpuUsage() {
    return statusListener.getCpuUsage();
}
public class SystemStatusListener implements Runnable {

    volatile double currentLoad = -1;
    volatile double currentCpuUsage = -1;
    volatile String reason = StringUtil.EMPTY;
    volatile long processCpuTime = 0;
    volatile long processUpTime = 0;

    public double getSystemAverageLoad() {
        return currentLoad;
    }

    public double getCpuUsage() {
        return currentCpuUsage;
    }

    @Override
    public void run() {
        try {
            OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
            currentLoad = osBean.getSystemLoadAverage();
            /*
             * Java Doc copied from {@link OperatingSystemMXBean#getSystemCpuLoad()}:</br>
             * Returns the "recent cpu usage" for the whole system. This value is a double in the [0.0,1.0] interval.
             * A value of 0.0 means that all CPUs were idle during the recent period of time observed, while a value
             * of 1.0 means that all CPUs were actively running 100% of the time during the recent period being
             * observed. All values between 0.0 and 1.0 are possible depending of the activities going on in the
             * system. If the system recent cpu usage is not available, the method returns a negative value.
             */
            double systemCpuUsage = osBean.getSystemCpuLoad();
            // calculate process cpu usage to support application running in container environment
            RuntimeMXBean runtimeBean = ManagementFactory.getPlatformMXBean(RuntimeMXBean.class);
            long newProcessCpuTime = osBean.getProcessCpuTime();
            long newProcessUpTime = runtimeBean.getUptime();
            int cpuCores = osBean.getAvailableProcessors();
            long processCpuTimeDiffInMs = TimeUnit.NANOSECONDS
                .toMillis(newProcessCpuTime - processCpuTime);
            long processUpTimeDiffInMs = newProcessUpTime - processUpTime;
            double processCpuUsage = (double) processCpuTimeDiffInMs / processUpTimeDiffInMs / cpuCores;
            processCpuTime = newProcessCpuTime;
            processUpTime = newProcessUpTime;
            currentCpuUsage = Math.max(processCpuUsage, systemCpuUsage);
            if (currentLoad > SystemRuleManager.getSystemLoadThreshold()) {
                writeSystemStatusLog();
            }
        } catch (Throwable e) {
            RecordLog.warn("[SystemStatusListener] Failed to get system metrics from JMX", e);
        }
    }

    private void writeSystemStatusLog() {
        StringBuilder sb = new StringBuilder();
        sb.append("Load exceeds the threshold: ");
        sb.append("load:").append(String.format("%.4f", currentLoad)).append("; ");
        sb.append("cpuUsage:").append(String.format("%.4f", currentCpuUsage)).append("; ");
        sb.append("qps:").append(String.format("%.4f", Constants.ENTRY_NODE.passQps())).append("; ");
        sb.append("rt:").append(String.format("%.4f", Constants.ENTRY_NODE.avgRt())).append("; ");
        sb.append("thread:").append(Constants.ENTRY_NODE.curThreadNum()).append("; ");
        sb.append("success:").append(String.format("%.4f", Constants.ENTRY_NODE.successQps())).append("; ");
        sb.append("minRt:").append(String.format("%.2f", Constants.ENTRY_NODE.minRt())).append("; ");
        sb.append("maxSuccess:").append(String.format("%.2f", Constants.ENTRY_NODE.maxSuccessQps())).append("; ");
        RecordLog.info(sb.toString());
    }
}
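And a minimal system-protection rule via the public API (a sketch; the thresholds are invented):
import java.util.Collections;

import com.alibaba.csp.sentinel.slots.system.SystemRule;
import com.alibaba.csp.sentinel.slots.system.SystemRuleManager;

public class SystemRuleDemo {
    public static void initSystemRule() {
        SystemRule rule = new SystemRule();
        rule.setHighestSystemLoad(3.0);  // load threshold that arms the BBR check
        rule.setHighestCpuUsage(0.8);    // reject when CPU usage exceeds 80%
        rule.setAvgRt(500);              // global average RT threshold, in ms
        rule.setMaxThread(200);          // global concurrency cap
        rule.setQps(1000);               // global inbound QPS cap
        SystemRuleManager.loadRules(Collections.singletonList(rule));
    }
}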
3. Best Practices at JD
3.1 Usage
Using Sentinel itself is as simple as an annotation, but you must decide how rules are loaded and persisted. The existing options:
•Use the Sentinel dashboard: dashboard-based onboarding requires maintaining a console for rule configuration. For backend-heavy systems the extra console is a significant cost; for things like RPC frameworks that already have a console, this option is worth considering.
•Middleware (e.g. zookeeper, nacos, eureka, redis): the extension packages in the Sentinel source provide such implementations.
For JD's environment, I built a Sentinel component with hot rule deployment, similar to the zookeeper approach: rules are stored under a key in ducc (JD's configuration center); the first rule load and the listener registration happen when the Spring container starts. The component also handles rule reading, validation, and instantiating the different rule objects.
Plugin usage: annotation + configuration
Step 1: add the dependency
<dependency>
<groupId>com.jd.ldop.tools</groupId>
<artifactId>sentinel-tools</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
Step 2: initialize sentinelProcess
Three ways of supplying rules are supported: ducc, local file, and direct write.
Flow control rules and circuit-breaking/degradation rules are currently supported; the system load protection mode is still to be developed and validated.
<!-- Degradation, flow control and circuit breaking component based on Sentinel -->
<bean id="sentinelProcess" class="com.jd.ldop.sentinel.SentinelProcess">
<property name="ruleResourceWrappers">
<list>
<ref bean="degradeRule"/>
</list>
</property>
</bean>
<!-- Degradation or flow control rule configuration -->
<bean id="degradeRule" class="com.jd.ldop.sentinel.dto.RuleResourceWrapper">
<constructor-arg index="0" value="ducc.degradeRule"/>
<constructor-arg index="1" value="0"/>
<constructor-arg index="2" value="0"/>
</bean>
The corresponding rule configuration on ducc:
Step 3: define resources and relation types
With @SentinelResource you can define, anywhere, the resource name and the corresponding circuit-breaking/degradation or flow control behavior, the fallback method, and so on. You can also specify the relation strategy: direct, related resource, or designated chain.
@Override
@SentinelResource(value = "modifyGetWaybillState", fallback = "executeDegrade")
public ExecutionResult<List<Integer>> execute(@NotNull Model imodel) {
    // business logic
}

public ExecutionResult<List<Integer>> executeDegrade(@NotNull Model imodel) {
    // degraded business logic
}
3.2 Application scenarios
The component supports arbitrary business degradation, flow control, and load protection.
4. Sentinel Load Test Data
4.1 Test targets
Call volume: 12,000/min
Application machine memory stable below 50%
Machine spec: 8 cores, 16 GB RAM, 2 × 50 GB disks
Sentinel degrade rule:
count=350 (slow-call RT threshold: 350 ms)
timeWindow=180 (circuit-open window: 180 s)
grade=0 (degrade mode: slow call)
statIntervalMs=60000 (statistics window: 1 min)
利用机器监控:
压测分为了两个阶段,别离是组件开启和组件敞开两次,前半部分是组件开启的状况,后半局部是组件敞开的状况
利用过程内存剖析, 和 sentinel 无关的前三对象是
com.alibaba.csp.sentinel.node.metric.MetricNode
com.alibaba.csp.sentinel.CtEntry
com.alibaba.csp.sentinel.context.Context
4.3 Conclusions
Using the Sentinel component for automatic service degradation and flow control consumes some machine memory, because Sentinel keeps periodic sliding-window statistics. Configure rules sensibly: choose a reasonable statistics window, avoid creating too many Sentinel resources, and so on.
Overall, the Sentinel component has little impact on application CPU and memory.