作者:京东物流 杨建民

一、Sentinel简介

Sentinel 以流量为切入点,从流量管制熔断降级零碎负载爱护等多个维度爱护服务的稳定性。

Sentinel 具备以下特色:

  • 丰盛的利用场景:秒杀(即突发流量管制在零碎容量能够接受的范畴)、音讯削峰填谷、集群流量管制、实时熔断上游不可用利用等。
  • 齐备的实时监控:Sentinel 同时提供实时的监控性能。您能够在控制台中看到接入利用的单台机器秒级数据,甚至 500 台以下规模的集群的汇总运行状况。
  • 宽泛的开源生态:Sentinel 提供开箱即用的与其它开源框架/库的整合模块,例如与 Spring Cloud、Apache Dubbo、gRPC、Quarkus 的整合。您只须要引入相应的依赖并进行简略的配置即可疾速地接入 Sentinel。同时 Sentinel 提供 Java/Go/C++ 等多语言的原生实现。
  • 欠缺的 SPI 扩大机制:Sentinel 提供简略易用、欠缺的 SPI 扩大接口。您能够通过实现扩大接口来疾速地定制逻辑。例如定制规定治理、适配动静数据源等

无关Sentinel的具体介绍以及和Hystrix的区别能够自行网上检索,举荐一篇文章:https://mp.weixin.qq.com/s/Q7Xv8cypQFrrOQhbd9BOXw

本次次要应用了Sentinel的降级、限流、零碎负载爱护性能

二、Sentinel关键技术源码解析

无论是限流、降级、负载等管制伎俩,大抵流程如下:

•StatisticSlot 则用于记录、统计不同维度的 runtime 指标监控信息

•责任链顺次触发后续 slot 的 entry 办法,如 SystemSlot、FlowSlot、DegradeSlot 等的规定校验;

•当后续的 slot 通过,没有抛出 BlockException 异样,阐明该资源被胜利调用,则减少执行线程数和通过的申请数等信息。

对于数据统计,次要会牵扯到 ArrayMetric、BucketLeapArray、MetricBucket、WindowWrap 等类。

我的项目构造

以下次要剖析core包里的内容

2.1注解入口

2.1.1 Entry、Context、Node

SphU门面类的办法出参都是Entry,Entry能够了解为每次进入资源的一个凭证,如果调用SphO.entry()或者SphU.entry()能获取Entry对象,代表获取了凭证,没有被限流,否则抛出一个BlockException。

Entry中持有本次对资源调用的相干信息:

•createTime:创立该Entry的工夫戳。

•curNode:Entry以后是在哪个节点。

•orginNode:Entry的调用源节点。

•resourceWrapper:Entry关联的资源信息。

Entry是一个抽象类,CtEntry是Entry的实现,CtEntry持有Context和调用链的信息

Context的源码正文如下,

This class holds metadata of current invocation

Node的源码正文

Holds real-time statistics for resources

Node中保留了对资源的实时数据的统计,Sentinel中的限流或者降级等性能就是通过Node中的数据进行判断的。Node是一个接口,外面定义了各种操作request、exception、rt、qps、thread的办法。

在细看Node实现时,不难发现LongAddr的应用,对于LongAddr和DoubleAddr都是java8 java.util.concurrent.atomic里的内容,感兴趣的小伙伴能够再深入研究一下,这两个是高并发下计数性能十分优良的数据结构,理论利用场景里须要计数时能够思考应用。

对于Node的介绍后续还会深刻,此处大抵先提一下这个概念。

2.2 初始化

2.2.1 Context初始化

在初始化slot责任链局部前,还执行了context的初始化,外面波及几个重要概念,须要解释一下:

能够发现在Context初始化的过程中,会把EntranceNode退出到Root子节点中(理论Root自身是一个非凡的EntranceNode),并把EntranceNode放到contextNameNodeMap中。

之前简略提到过Node,是用来统计数据用的,不同Node性能如下:

•Node:用于实现数据统计的接口

•StatisticNode:统计节点,是Node接口的实现类,用于实现数据统计

•EntranceNode:入口节点,一个Context会有一个入口节点,用于统计以后Context的总体流量数据

•DefaultNode:默认节点,用于统计一个资源在以后Context中的流量数据

•ClusterNode:集群节点,用于统计一个资源在所有Context中的总体流量数据

protected static Context trueEnter(String name, String origin) {        Context context = contextHolder.get();        if (context == null) {            Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;            DefaultNode node = localCacheNameMap.get(name);            if (node == null) {                if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {                    setNullContext();                    return NULL_CONTEXT;                } else {                    LOCK.lock();                    try {                        node = contextNameNodeMap.get(name);                        if (node == null) {                            if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {                                setNullContext();                                return NULL_CONTEXT;                            } else {                                node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);                                // Add entrance node.                                Constants.ROOT.addChild(node);                                Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);                                newMap.putAll(contextNameNodeMap);                                newMap.put(name, node);                                contextNameNodeMap = newMap;                            }                        }                    } finally {                        LOCK.unlock();                    }                }            }            context = new Context(node, name);            context.setOrigin(origin);            contextHolder.set(context);        }        return context;    }

2.2.2 通过SpiLoader默认初始化8个slot

每个slot的主要职责如下:

•NodeSelectorSlot 负责收集资源的门路,并将这些资源的调用门路,以树状构造存储起来,用于依据调用门路来限流降级

•ClusterBuilderSlot 则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的根据

•StatisticSlot 则用于记录、统计不同纬度的 runtime 指标监控信息

•FlowSlot 则用于依据预设的限流规定以及后面 slot 统计的状态,来进行流量管制

•AuthoritySlot 则依据配置的黑白名单和调用起源信息,来做黑白名单管制

•DegradeSlot 则通过统计信息以及预设的规定,来做熔断降级

•SystemSlot 则通过零碎的状态,例如 集群QPS、线程数、RT、负载 等,来管制总的入口流量

2.3 StatisticSlot

2.3.1 Node

深刻看一下Node,因为统计信息都在外面,前面不论是限流、熔断、负载爱护等都是联合规定+统计信息判断是否要执行

从Node的源码正文看,它会持有资源维度的实时统计数据,以下是接口里的办法定义,能够看到totalRequest、totalPass、totalSuccess、blockRequest、totalException、passQps等很多request、qps、thread的相干办法:

/** * Holds real-time statistics for resources. * * @author qinan.qn * @author leyou * @author Eric Zhao */public interface Node extends OccupySupport, DebugSupport {    long totalRequest();    long totalPass();    long totalSuccess();    long blockRequest();    long totalException();    double passQps();    double blockQps();    double totalQps();    double successQps();    ……}

2.3.2 StatisticNode

咱们先从最根底的StatisticNode开始看,源码给出的定位是:

The statistic node keep three kinds of real-time statistics metrics:metrics in second level ({@code rollingCounterInSecond})metrics in minute level ({@code rollingCounterInMinute})thread count

StatisticNode只有四个属性,除了之前提到过的LongAddr类型的curThreadNum外,还有两个属性是Metric对象,通过入参曾经属性命名能够看出,一个用于秒级,一个用于分钟级统计。接下来咱们就要看看Metric

// StatisticNode持有两个Metric,一个秒级一个分钟级,由入参可知,秒级统计划分了两个工夫窗口,窗口水平是500msprivate transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,    IntervalProperty.INTERVAL);// 分钟级统计划分了60个工夫窗口,窗口长度是1000msprivate transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);/** * The counter for thread count. */private LongAdder curThreadNum = new LongAdder();/** * The last timestamp when metrics were fetched. */private long lastFetchTime = -1;

ArrayMetric只有一个属性LeapArray<MetricBucket>,其余都是用于统计的办法,LeapArray是sentinel中统计最根本的数据结构,这里有必要具体看一下,总体就是依据timeMillis去获取一个bucket,分为:没有创立、有间接返回、被废除后的reset三种场景。

//以分钟级的统计属性为例,看一下工夫窗口初始化过程private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);public LeapArray(int sampleCount, int intervalInMs) {        AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);        AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");        AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");        // windowLengthInMs = 60*1000 / 60 = 1000 滑动窗口工夫长度,可见sentinel默认将单位工夫分为了60个滑动窗口进行数据统计        this.windowLengthInMs = intervalInMs / sampleCount;        // 60*1000        this.intervalInMs = intervalInMs;        // 60        this.intervalInSecond = intervalInMs / 1000.0;        // 60        this.sampleCount = sampleCount;        // 数组长度60        this.array = new AtomicReferenceArray<>(sampleCount);    }/**     * Get bucket item at provided timestamp.     *     * @param timeMillis a valid timestamp in milliseconds     * @return current bucket item at provided timestamp if the time is valid; null if time is invalid     */    public WindowWrap<T> currentWindow(long timeMillis) {        if (timeMillis < 0) {            return null;        }        // 依据以后工夫戳算一个数组索引        int idx = calculateTimeIdx(timeMillis);        // Calculate current bucket start time.        // timeMillis % 1000        long windowStart = calculateWindowStart(timeMillis);        /*         * Get bucket item at given time from the array.         *         * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.         * (2) Bucket is up-to-date, then just return the bucket.         * (3) Bucket is deprecated, then reset current bucket.         */        while (true) {            WindowWrap<T> old = array.get(idx);            if (old == null) {                /*                 *     B0       B1      B2    NULL      B4                 * ||_______|_______|_______|_______|_______||___                 * 200     400     600     800     1000    1200  timestamp                 *                             ^                 *                          time=888                 *            bucket is empty, so create new and update                 *                 * If the old bucket is absent, then we create a new bucket at {@code windowStart},                 * then try to update circular array via a CAS operation. Only one thread can                 * succeed to update, while other threads yield its time slice.                 */                // newEmptyBucket 办法重写,秒级和分钟级统计对象实现不同                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));                if (array.compareAndSet(idx, null, window)) {                    // Successfully updated, return the created bucket.                    return window;                } else {                    // Contention failed, the thread will yield its time slice to wait for bucket available.                    Thread.yield();                }            } else if (windowStart == old.windowStart()) {                /*                 *     B0       B1      B2     B3      B4                 * ||_______|_______|_______|_______|_______||___                 * 200     400     600     800     1000    1200  timestamp                 *                             ^                 *                          time=888                 *            startTime of Bucket 3: 800, so it's up-to-date                 *                 * If current {@code windowStart} is equal to the start timestamp of old bucket,                 * that means the time is within the bucket, so directly return the bucket.                 */                return old;            } else if (windowStart > old.windowStart()) {                /*                 *   (old)                 *             B0       B1      B2    NULL      B4                 * |_______||_______|_______|_______|_______|_______||___                 * ...    1200     1400    1600    1800    2000    2200  timestamp                 *                              ^                 *                           time=1676                 *          startTime of Bucket 2: 400, deprecated, should be reset                 *                 * If the start timestamp of old bucket is behind provided time, that means                 * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.                 * Note that the reset and clean-up operations are hard to be atomic,                 * so we need a update lock to guarantee the correctness of bucket update.                 *                 * The update lock is conditional (tiny scope) and will take effect only when                 * bucket is deprecated, so in most cases it won't lead to performance loss.                 */                if (updateLock.tryLock()) {                    try {                        // Successfully get the update lock, now we reset the bucket.                        return resetWindowTo(old, windowStart);                    } finally {                        updateLock.unlock();                    }                } else {                    // Contention failed, the thread will yield its time slice to wait for bucket available.                    Thread.yield();                }            } else if (windowStart < old.windowStart()) {                // Should not go through here, as the provided time is already behind.                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));            }        }    }// 持有一个工夫窗口对象的数据,会依据以后工夫戳除以工夫窗口长度而后散列到数组中private int calculateTimeIdx(/*@Valid*/ long timeMillis) {        long timeId = timeMillis / windowLengthInMs;        // Calculate current index so we can map the timestamp to the leap array.        return (int)(timeId % array.length());    }

WindowWrap持有了windowLengthInMs, windowStart和LeapArray(分钟统计实现是BucketLeapArray,秒级统计实现是OccupiableBucketLeapArray),对于分钟级别的统计,MetricBucket保护了一个longAddr数组和一个配置的minRT

/** * The fundamental data structure for metric statistics in a time span. * * @author jialiang.linjl * @author Eric Zhao * @see LeapArray */public class BucketLeapArray extends LeapArray<MetricBucket> {    public BucketLeapArray(int sampleCount, int intervalInMs) {        super(sampleCount, intervalInMs);    }    @Override    public MetricBucket newEmptyBucket(long time) {        return new MetricBucket();    }    @Override    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {        // Update the start time and reset value.        w.resetTo(startTime);        w.value().reset();        return w;    }}

对于秒级统计,QPS=20场景下,如何精确统计的问题,此处用到了另外一个LeapArry实现FutureBucketLeapArray,至于秒级统计如何保障没有统计误差,读者能够再钻研一下FutureBucketLeapArray的上下文就好。

2.4 FlowSlot

2.4.1 常见限流算法

介绍sentinel限流实现前,先介绍一下常见限流算法,根本分为三种:计数器、漏斗、令牌桶。

计数器算法

顾名思义,计数器算法就是统计某个时间段内的申请,每单位工夫加1,而后与配置的限流值(最大QPS)进行比拟,如果超出则触发限流。然而这种算法不能做到“平滑限流”,以1s为单位工夫,100QPS为限流值为例,如下图,会呈现某时段超出限流值的状况

因而在单纯计数器算法上,又呈现了滑动窗口计数器算法,咱们将统计工夫细分,比方将1s统计时长分为5个工夫窗口,通过滚动统计所有工夫窗口的QPS作为零碎理论的QPS的形式,就能解决上述临界统计问题,后续咱们看sentinel源码时也能看到相似操作。

漏斗算法

不管流量有多大都会先到漏桶中,而后以平均的速度流出。如何在代码中实现这个匀速呢?比方咱们想让匀速为100q/s,那么咱们能够失去每流出一个流量须要耗费10ms,相似一个队列,每隔10ms从队列头部取出流量进行放行,而咱们的队列也就是漏桶,当流量大于队列的长度的时候,咱们就能够回绝超出的局部。

漏斗算法同样的也有肯定的毛病:无奈应答突发流量。比方一瞬间来了100个申请,在漏桶算法中只能一个一个的过来,当最初一个申请流出的时候工夫曾经过了一秒了,所以漏斗算法比拟适宜申请达到比拟平均,须要严格控制申请速率的场景。

令牌桶算法

令牌桶算法和漏斗算法比拟相似,区别是令牌桶寄存的是令牌数量不是申请数量,令牌桶能够依据本身需要多样性得治理令牌的生产和耗费,能够解决突发流量的问题。

2.4.2 单机限流模式

接下来咱们看一下Sentinel中的限流实现,相比上述根本限流算法,Sentinel限流的第一个个性就是引入“资源”的概念,能够细粒度多样性的反对特定资源、关联资源、指定链路的限流。

FlowSlot的次要逻辑都在FlowRuleChecker里,介绍之前,咱们先看一下Sentinel对于规定的模型形容,下图别离是限流、访问控制规定、零碎爱护规定(Linux负载)、降级规定

    /**     * 流量管制两种模式      *   0: thread count(当调用该api的线程数达到阈值的时候,进行限流)     *   1: QPS(当调用该api的QPS达到阈值的时候,进行限流)     */    private int grade = RuleConstant.FLOW_GRADE_QPS;    /**     * 流量管制阈值,值含意与grade无关     */    private double count;    /**     * 调用关系限流策略(能够反对关联资源或指定链路的多样性限流需要)     *  间接(api 达到限流条件时,间接限流)     *  关联(当关联的资源达到限流阈值时,就限流本人)     *  链路(只记录指定链路上的流量)     * {@link RuleConstant#STRATEGY_DIRECT} for direct flow control (by origin);     * {@link RuleConstant#STRATEGY_RELATE} for relevant flow control (with relevant resource);     * {@link RuleConstant#STRATEGY_CHAIN} for chain flow control (by entrance resource).     */    private int strategy = RuleConstant.STRATEGY_DIRECT;    /**     * Reference resource in flow control with relevant resource or context.     */    private String refResource;    /**     * 流控成果:     * 0. default(reject directly),间接回绝,抛异样FlowException     * 1. warm up, 慢启动模式(依据coldFactor(冷加载因子,默认3)的值,从阈值/coldFactor,通过预热时长,才达到设置的QPS阈值)     * 2. rate limiter  排队期待     * 3. warm up + rate limiter     */    private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;    private int warmUpPeriodSec = 10;    /**     * Max queueing time in rate limiter behavior.     */    private int maxQueueingTimeMs = 500;    /**    *  是否集群限流,默认为否    */    private boolean clusterMode;    /**     * Flow rule config for cluster mode.     */    private ClusterFlowConfig clusterConfig;    /**     * The traffic shaping (throttling) controller.     */    private TrafficShapingController controller;

接着咱们持续剖析FlowRuleChecker

canPassCheck第一步会难看limitApp,这个是联合拜访受权限度规定应用的,默认是所有。

private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,                                          boolean prioritized) {        // 依据策略抉择Node来进行统计(能够是自身Node、关联的Node、指定的链路)        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);        if (selectedNode == null) {            return true;        }        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);    }static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {        // limitApp是访问控制应用的,默认是default,不限度起源        String limitApp = rule.getLimitApp();        // 拿到限流策略        int strategy = rule.getStrategy();        String origin = context.getOrigin();        // 基于调用起源做鉴权        if (limitApp.equals(origin) && filterOrigin(origin)) {            if (strategy == RuleConstant.STRATEGY_DIRECT) {                // Matches limit origin, return origin statistic node.                return context.getOriginNode();            }            //             return selectReferenceNode(rule, context, node);        } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {            if (strategy == RuleConstant.STRATEGY_DIRECT) {                // Return the cluster node.                return node.getClusterNode();            }            return selectReferenceNode(rule, context, node);        } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)            && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {            if (strategy == RuleConstant.STRATEGY_DIRECT) {                return context.getOriginNode();            }            return selectReferenceNode(rule, context, node);        }        return null;    }static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {        String refResource = rule.getRefResource();        int strategy = rule.getStrategy();        if (StringUtil.isEmpty(refResource)) {            return null;        }        if (strategy == RuleConstant.STRATEGY_RELATE) {            return ClusterBuilderSlot.getClusterNode(refResource);        }        if (strategy == RuleConstant.STRATEGY_CHAIN) {            if (!refResource.equals(context.getName())) {                return null;            }            return node;        }        // No node.        return null;    }// 此代码是load限流规定时依据规定初始化流量整形控制器的逻辑,rule.getRater()返回TrafficShapingControllerprivate static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {        if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {            switch (rule.getControlBehavior()) {                // 预热模式返回WarmUpController                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:                    return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),                            ColdFactorProperty.coldFactor);                // 排队模式返回ThrottlingController                case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:                    return new ThrottlingController(rule.getMaxQueueingTimeMs(), rule.getCount());                // 预热+排队模式返回WarmUpRateLimiterController                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:                    return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),                            rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);                case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:                default:                    // Default mode or unknown mode: default traffic shaping controller (fast-reject).            }        }        // 默认是DefaultController        return new DefaultController(rule.getCount(), rule.getGrade());    }
Sentinel单机限流算法

下面咱们看到依据限流规定controlBehavior属性(流控成果),会初始化以下实现:

•DefaultController:是一个十分典型的滑动窗口计数器算法实现,将以后统计的qps和申请进来的qps进行求和,小于限流值则通过,大于则计算一个等待时间,稍后再试

•ThrottlingController:是漏斗算法的实现,实现思路曾经在源码片段中加了备注

•WarmUpController:实现参考了Guava的带预热的RateLimiter,区别是Guava侧重于申请距离,相似后面提到的令牌桶,而Sentinel更关注于申请数,和令牌桶算法有点相似

•WarmUpRateLimiterController:低水位应用预热算法,高水位应用滑动窗口计数器算法排队。

DefaultController
    @Override    public boolean canPass(Node node, int acquireCount, boolean prioritized) {        int curCount = avgUsedTokens(node);        if (curCount + acquireCount > count) {            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {                long currentTime;                long waitInMs;                currentTime = TimeUtil.currentTimeMillis();                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);                if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);                    node.addOccupiedPass(acquireCount);                    sleep(waitInMs);                    // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.                    throw new PriorityWaitException(waitInMs);                }            }            return false;        }        return true;    }
ThrottlingController
 public ThrottlingController(int queueingTimeoutMs, double maxCountPerStat) {        this(queueingTimeoutMs, maxCountPerStat, 1000);    }    public ThrottlingController(int queueingTimeoutMs, double maxCountPerStat, int statDurationMs) {        AssertUtil.assertTrue(statDurationMs > 0, "statDurationMs should be positive");        AssertUtil.assertTrue(maxCountPerStat >= 0, "maxCountPerStat should be >= 0");        AssertUtil.assertTrue(queueingTimeoutMs >= 0, "queueingTimeoutMs should be >= 0");        this.maxQueueingTimeMs = queueingTimeoutMs;        this.count = maxCountPerStat;        this.statDurationMs = statDurationMs;        // Use nanoSeconds when durationMs%count != 0 or count/durationMs> 1 (to be accurate)        // 可见配置限流值count大于1000时useNanoSeconds会是true否则是false        if (maxCountPerStat > 0) {            this.useNanoSeconds = statDurationMs % Math.round(maxCountPerStat) != 0 || maxCountPerStat / statDurationMs > 1;        } else {            this.useNanoSeconds = false;        }    }    @Override    public boolean canPass(Node node, int acquireCount) {        return canPass(node, acquireCount, false);    }    private boolean checkPassUsingNanoSeconds(int acquireCount, double maxCountPerStat) {        final long maxQueueingTimeNs = maxQueueingTimeMs * MS_TO_NS_OFFSET;        long currentTime = System.nanoTime();        // Calculate the interval between every two requests.        final long costTimeNs = Math.round(1.0d * MS_TO_NS_OFFSET * statDurationMs * acquireCount / maxCountPerStat);        // Expected pass time of this request.        long expectedTime = costTimeNs + latestPassedTime.get();        if (expectedTime <= currentTime) {            // Contention may exist here, but it's okay.            latestPassedTime.set(currentTime);            return true;        } else {            final long curNanos = System.nanoTime();            // Calculate the time to wait.            long waitTime = costTimeNs + latestPassedTime.get() - curNanos;            if (waitTime > maxQueueingTimeNs) {                return false;            }            long oldTime = latestPassedTime.addAndGet(costTimeNs);            waitTime = oldTime - curNanos;            if (waitTime > maxQueueingTimeNs) {                latestPassedTime.addAndGet(-costTimeNs);                return false;            }            // in race condition waitTime may <= 0            if (waitTime > 0) {                sleepNanos(waitTime);            }            return true;        }    }        // 漏斗算法具体实现    private boolean checkPassUsingCachedMs(int acquireCount, double maxCountPerStat) {        long currentTime = TimeUtil.currentTimeMillis();        // 计算两次申请的距离(分为秒级和纳秒级)        long costTime = Math.round(1.0d * statDurationMs * acquireCount / maxCountPerStat);        // 申请的冀望的工夫        long expectedTime = costTime + latestPassedTime.get();        if (expectedTime <= currentTime) {            // latestPassedTime是AtomicLong类型,反对volatile语义            latestPassedTime.set(currentTime);            return true;        } else {            // 计算等待时间            long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();            // 如果大于最大排队工夫,则触发限流            if (waitTime > maxQueueingTimeMs) {                return false;            }                        long oldTime = latestPassedTime.addAndGet(costTime);            waitTime = oldTime - TimeUtil.currentTimeMillis();            if (waitTime > maxQueueingTimeMs) {                latestPassedTime.addAndGet(-costTime);                return false;            }            // in race condition waitTime may <= 0            if (waitTime > 0) {                sleepMs(waitTime);            }            return true;        }    }    @Override    public boolean canPass(Node node, int acquireCount, boolean prioritized) {        // Pass when acquire count is less or equal than 0.        if (acquireCount <= 0) {            return true;        }        // Reject when count is less or equal than 0.        // Otherwise, the costTime will be max of long and waitTime will overflow in some cases.        if (count <= 0) {            return false;        }        if (useNanoSeconds) {            return checkPassUsingNanoSeconds(acquireCount, this.count);        } else {            return checkPassUsingCachedMs(acquireCount, this.count);        }    }    private void sleepMs(long ms) {        try {            Thread.sleep(ms);        } catch (InterruptedException e) {        }    }    private void sleepNanos(long ns) {        LockSupport.parkNanos(ns);    }
long costTime = Math.round(1.0d * statDurationMs * acquireCount / maxCountPerStat);

由上述计算两次申请距离的公式咱们能够发现,当maxCountPerStat(规定配置的限流值QPS)超过1000后,就无奈精确计算出匀速排队模式下的申请距离时长,因而对应后面介绍的,当规定配置限流值超过1000QPS后,会采纳checkPassUsingNanoSeconds,小于1000QPS会采纳checkPassUsingCachedMs,比照一下checkPassUsingNanoSeconds和checkPassUsingCachedMs,能够发现主体思路没变,只是统计维度从毫秒换算成了纳秒,因而只看checkPassUsingCachedMs实现就能够

WarmUpController
 @Override    public boolean canPass(Node node, int acquireCount, boolean prioritized) {        long passQps = (long) node.passQps();        long previousQps = (long) node.previousPassQps();        syncToken(previousQps);        // 开始计算它的斜率        // 如果进入了警戒线,开始调整他的qps        long restToken = storedTokens.get();        if (restToken >= warningToken) {            long aboveToken = restToken - warningToken;            // 耗费的速度要比warning快,然而要比慢            // current interval = restToken*slope+1/count            double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));            if (passQps + acquireCount <= warningQps) {                return true;            }        } else {            if (passQps + acquireCount <= count) {                return true;            }        }        return false;    }protected void syncToken(long passQps) {        long currentTime = TimeUtil.currentTimeMillis();        currentTime = currentTime - currentTime % 1000;        long oldLastFillTime = lastFilledTime.get();        if (currentTime <= oldLastFillTime) {            return;        }        long oldValue = storedTokens.get();        long newValue = coolDownTokens(currentTime, passQps);        if (storedTokens.compareAndSet(oldValue, newValue)) {            long currentValue = storedTokens.addAndGet(0 - passQps);            if (currentValue < 0) {                storedTokens.set(0L);            }            lastFilledTime.set(currentTime);        }    }private long coolDownTokens(long currentTime, long passQps) {        long oldValue = storedTokens.get();        long newValue = oldValue;        // 增加令牌的判断前提条件:        // 当令牌的耗费水平远远低于警戒线的时候        if (oldValue < warningToken) {            newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);        } else if (oldValue > warningToken) {            if (passQps < (int)count / coldFactor) {                newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);            }        }        return Math.min(newValue, maxToken);    }

2.4.3 集群限流

passClusterCheck办法(因为clusterService找不到会降级到非集群限流)

private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,                                            boolean prioritized) {        try {            // 获取以后节点是Token Client还是Token Server            TokenService clusterService = pickClusterService();            if (clusterService == null) {                return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);            }            long flowId = rule.getClusterConfig().getFlowId();            // 依据获取的flowId通过TokenService进行申请token。从下面可知,它可能是TokenClient调用的,也可能是ToeknServer调用的。别离对应的类是DefaultClusterTokenClient和DefaultTokenService            TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);            return applyTokenResult(result, rule, context, node, acquireCount, prioritized);            // If client is absent, then fallback to local mode.        } catch (Throwable ex) {            RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);        }        // Fallback to local flow control when token client or server for this rule is not available.        // If fallback is not enabled, then directly pass.        return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);    }//获取以后节点是Token Client还是Token Server。//1) 如果以后节点的角色是Client,返回的TokenService为DefaultClusterTokenClient;//2)如果以后节点的角色是Server,则默认返回的TokenService为DefaultTokenService。private static TokenService pickClusterService() {        if (ClusterStateManager.isClient()) {            return TokenClientProvider.getClient();        }        if (ClusterStateManager.isServer()) {            return EmbeddedClusterTokenServerProvider.getServer();        }        return null;    }
集群限流模式

Sentinel 集群限流服务端有两种启动形式:

•嵌入模式(Embedded)适宜利用级别的限流,部署简略,但对利用性能有影响

•独立模式(Alone)适宜全局限流,须要独立部署

思考到文章篇幅,集群限流有机会再开展具体介绍。

集群限流模式降级
private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,                                            boolean prioritized) {        try {            TokenService clusterService = pickClusterService();            if (clusterService == null) {                return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);            }            long flowId = rule.getClusterConfig().getFlowId();            TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);            return applyTokenResult(result, rule, context, node, acquireCount, prioritized);            // If client is absent, then fallback to local mode.        } catch (Throwable ex) {            RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);        }        // Fallback to local flow control when token client or server for this rule is not available.        // If fallback is not enabled, then directly pass.        // 能够看到如果集群限流有异样,会降级到单机限流模式,如果配置不容许降级,那么间接会跳过此次校验        return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);    }

2.5 DegradeSlot

CircuitBreaker

大神对断路器的解释:https://martinfowler.com/bliki/CircuitBreaker.html

首先就看到了依据资源名称获取断路器列表,Sentinel的断路器有两个实现:RT模式应用ResponseTimeCircuitBreaker、异样模式应用ExceptionCircuitBreaker

public interface CircuitBreaker {    /**     * Get the associated circuit breaking rule.     *     * @return associated circuit breaking rule     */    DegradeRule getRule();    /**     * Acquires permission of an invocation only if it is available at the time of invoking.     *     * @param context context of current invocation     * @return {@code true} if permission was acquired and {@code false} otherwise     */    boolean tryPass(Context context);    /**     * Get current state of the circuit breaker.     *     * @return current state of the circuit breaker     */    State currentState();    /**     * <p>Record a completed request with the context and handle state transformation of the circuit breaker.</p>     * <p>Called when a <strong>passed</strong> invocation finished.</p>     *     * @param context context of current invocation     */    void onRequestComplete(Context context);    /**     * Circuit breaker state.     */    enum State {        /**         * In {@code OPEN} state, all requests will be rejected until the next recovery time point.         */        OPEN,        /**         * In {@code HALF_OPEN} state, the circuit breaker will allow a "probe" invocation.         * If the invocation is abnormal according to the strategy (e.g. it's slow), the circuit breaker         * will re-transform to the {@code OPEN} state and wait for the next recovery time point;         * otherwise the resource will be regarded as "recovered" and the circuit breaker         * will cease cutting off requests and transform to {@code CLOSED} state.         */        HALF_OPEN,        /**         * In {@code CLOSED} state, all requests are permitted. When current metric value exceeds the threshold,         * the circuit breaker will transform to {@code OPEN} state.         */        CLOSED    }}

以ExceptionCircuitBreaker为例看一下具体实现

public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {        // 异样模式有两种,异样率和异样数    private final int strategy;    // 最小申请数    private final int minRequestAmount;    // 阈值    private final double threshold;        // LeapArray是sentinel统计数据十分重要的一个构造,次要封装了工夫窗口相干的操作    private final LeapArray<SimpleErrorCounter> stat;    public ExceptionCircuitBreaker(DegradeRule rule) {        this(rule, new SimpleErrorCounterLeapArray(1, rule.getStatIntervalMs()));    }    ExceptionCircuitBreaker(DegradeRule rule, LeapArray<SimpleErrorCounter> stat) {        super(rule);        this.strategy = rule.getGrade();        boolean modeOk = strategy == DEGRADE_GRADE_EXCEPTION_RATIO || strategy == DEGRADE_GRADE_EXCEPTION_COUNT;        AssertUtil.isTrue(modeOk, "rule strategy should be error-ratio or error-count");        AssertUtil.notNull(stat, "stat cannot be null");        this.minRequestAmount = rule.getMinRequestAmount();        this.threshold = rule.getCount();        this.stat = stat;    }    @Override    protected void resetStat() {        // Reset current bucket (bucket count = 1).        stat.currentWindow().value().reset();    }        @Override    public void onRequestComplete(Context context) {        Entry entry = context.getCurEntry();        if (entry == null) {            return;        }        Throwable error = entry.getError();        SimpleErrorCounter counter = stat.currentWindow().value();        if (error != null) {            counter.getErrorCount().add(1);        }        counter.getTotalCount().add(1);        handleStateChangeWhenThresholdExceeded(error);    }    private void handleStateChangeWhenThresholdExceeded(Throwable error) {        if (currentState.get() == State.OPEN) {            return;        }                if (currentState.get() == State.HALF_OPEN) {            // In detecting request            if (error == null) {                fromHalfOpenToClose();            } else {                fromHalfOpenToOpen(1.0d);            }            return;        }                List<SimpleErrorCounter> counters = stat.values();        long errCount = 0;        long totalCount = 0;        for (SimpleErrorCounter counter : counters) {             += counter.errorCount.sum();            totalCount += counter.totalCount.sum();        }        if (totalCount < minRequestAmount) {            return;        }        double curCount = errCount;        if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {            // Use errorRatio            curCount = errCount * 1.0d / totalCount;        }        if (curCount > threshold) {            transformToOpen(curCount);        }    }    static class SimpleErrorCounter {        private LongAdder errorCount;        private LongAdder totalCount;        public SimpleErrorCounter() {            this.errorCount = new LongAdder();            this.totalCount = new LongAdder();        }        public LongAdder getErrorCount() {            return errorCount;        }        public LongAdder getTotalCount() {            return totalCount;        }        public SimpleErrorCounter reset() {            errorCount.reset();            totalCount.reset();            return this;        }        @Override        public String toString() {            return "SimpleErrorCounter{" +                "errorCount=" + errorCount +                ", totalCount=" + totalCount +                '}';        }    }    static class SimpleErrorCounterLeapArray extends LeapArray<SimpleErrorCounter> {        public SimpleErrorCounterLeapArray(int sampleCount, int intervalInMs) {            super(sampleCount, intervalInMs);        }        @Override        public SimpleErrorCounter newEmptyBucket(long timeMillis) {            return new SimpleErrorCounter();        }        @Override        protected WindowWrap<SimpleErrorCounter> resetWindowTo(WindowWrap<SimpleErrorCounter> w, long startTime) {            // Update the start time and reset value.            w.resetTo(startTime);            w.value().reset();            return w;        }    }}

2.6 SystemSlot

校验逻辑次要集中在com.alibaba.csp.sentinel.slots.system.SystemRuleManager#checkSystem,以下是片段,能够看到,作为负载爱护规定校验,实现了集群的QPS、线程、RT(响应工夫)、零碎负载的管制,除零碎负载以外,其余统计都是依赖StatisticSlot实现,零碎负载是通过SystemRuleManager定时调度SystemStatusListener,通过OperatingSystemMXBean去获取

/**     * Apply {@link SystemRule} to the resource. Only inbound traffic will be checked.     *     * @param resourceWrapper the resource.     * @throws BlockException when any system rule's threshold is exceeded.     */    public static void checkSystem(ResourceWrapper resourceWrapper, int count) throws BlockException {        if (resourceWrapper == null) {            return;        }        // Ensure the checking switch is on.        if (!checkSystemStatus.get()) {            return;        }        // for inbound traffic only        if (resourceWrapper.getEntryType() != EntryType.IN) {            return;        }        // total qps 此处是拿到某个资源在集群中的QPS总和,相干概念能够会看初始化对于Node的介绍        double currentQps = Constants.ENTRY_NODE.passQps();        if (currentQps + count > qps) {            throw new SystemBlockException(resourceWrapper.getName(), "qps");        }        // total thread         int currentThread = Constants.ENTRY_NODE.curThreadNum();        if (currentThread > maxThread) {            throw new SystemBlockException(resourceWrapper.getName(), "thread");        }        double rt = Constants.ENTRY_NODE.avgRt();        if (rt > maxRt) {            throw new SystemBlockException(resourceWrapper.getName(), "rt");        }        // load. BBR algorithm.        if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {            if (!checkBbr(currentThread)) {                throw new SystemBlockException(resourceWrapper.getName(), "load");            }        }        // cpu usage        if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {            throw new SystemBlockException(resourceWrapper.getName(), "cpu");        }    }    private static boolean checkBbr(int currentThread) {        if (currentThread > 1 &&            currentThread > Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) {            return false;        }        return true;    }    public static double getCurrentSystemAvgLoad() {        return statusListener.getSystemAverageLoad();    }    public static double getCurrentCpuUsage() {        return statusListener.getCpuUsage();    }
public class SystemStatusListener implements Runnable {    volatile double currentLoad = -1;    volatile double currentCpuUsage = -1;    volatile String reason = StringUtil.EMPTY;    volatile long processCpuTime = 0;    volatile long processUpTime = 0;    public double getSystemAverageLoad() {        return currentLoad;    }    public double getCpuUsage() {        return currentCpuUsage;    }    @Override    public void run() {        try {            OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);            currentLoad = osBean.getSystemLoadAverage();            /*             * Java Doc copied from {@link OperatingSystemMXBean#getSystemCpuLoad()}:</br>             * Returns the "recent cpu usage" for the whole system. This value is a double in the [0.0,1.0] interval.             * A value of 0.0 means that all CPUs were idle during the recent period of time observed, while a value             * of 1.0 means that all CPUs were actively running 100% of the time during the recent period being             * observed. All values between 0.0 and 1.0 are possible depending of the activities going on in the             * system. If the system recent cpu usage is not available, the method returns a negative value.             */            double systemCpuUsage = osBean.getSystemCpuLoad();            // calculate process cpu usage to support application running in container environment            RuntimeMXBean runtimeBean = ManagementFactory.getPlatformMXBean(RuntimeMXBean.class);            long newProcessCpuTime = osBean.getProcessCpuTime();            long newProcessUpTime = runtimeBean.getUptime();            int cpuCores = osBean.getAvailableProcessors();            long processCpuTimeDiffInMs = TimeUnit.NANOSECONDS                    .toMillis(newProcessCpuTime - processCpuTime);            long processUpTimeDiffInMs = newProcessUpTime - processUpTime;            double processCpuUsage = (double) processCpuTimeDiffInMs / processUpTimeDiffInMs / cpuCores;            processCpuTime = newProcessCpuTime;            processUpTime = newProcessUpTime;            currentCpuUsage = Math.max(processCpuUsage, systemCpuUsage);            if (currentLoad > SystemRuleManager.getSystemLoadThreshold()) {                writeSystemStatusLog();            }        } catch (Throwable e) {            RecordLog.warn("[SystemStatusListener] Failed to get system metrics from JMX", e);        }    }    private void writeSystemStatusLog() {        StringBuilder sb = new StringBuilder();        sb.append("Load exceeds the threshold: ");        sb.append("load:").append(String.format("%.4f", currentLoad)).append("; ");        sb.append("cpuUsage:").append(String.format("%.4f", currentCpuUsage)).append("; ");        sb.append("qps:").append(String.format("%.4f", Constants.ENTRY_NODE.passQps())).append("; ");        sb.append("rt:").append(String.format("%.4f", Constants.ENTRY_NODE.avgRt())).append("; ");        sb.append("thread:").append(Constants.ENTRY_NODE.curThreadNum()).append("; ");        sb.append("success:").append(String.format("%.4f", Constants.ENTRY_NODE.successQps())).append("; ");        sb.append("minRt:").append(String.format("%.2f", Constants.ENTRY_NODE.minRt())).append("; ");        sb.append("maxSuccess:").append(String.format("%.2f", Constants.ENTRY_NODE.maxSuccessQps())).append("; ");        RecordLog.info(sb.toString());    }}

三、京东版最佳实际

3.1 应用形式

Sentinel应用形式自身非常简单,就是一个注解,然而要思考规定加载和规定长久化的形式,现有的形式有:

•应用Sentinel-dashboard性能:应用面板接入须要保护一个配置规定的治理端,思考到偏后端的零碎须要额定保护一个面板老本较大,如果是像RPC框架这种自身有治理端的接入能够思考次计划。

•中间件(如:zookepper、nacos、eureka、redis等):Sentinel源码extension包里提供了相似的实现,如下图

联合京东理论,我实现了一个规定热部署的Sentinel组件,实现形式相似zookeeper的形式,将规定记录到ducc的一个key上,在spring容器启动时做第一次规定加载和监听器注册,组件也做一了一些规定读取,校验、实例化不同规定对象的工作

插件应用形式:注解+配置

第一步 引入组件

<dependency>    <groupId>com.jd.ldop.tools</groupId>    <artifactId>sentinel-tools</artifactId>    <version>1.0.0-SNAPSHOT</version></dependency>

第二步 初始化sentinelProcess

反对ducc、本地文件读取、间接写入三种形式规定写入形式

目前反对限流规定、熔断降级规定两种模式,零碎负载保护模式待开发和验证

<!-- 基于sentinel的降级、限流、熔断组件 -->    <bean id="sentinelProcess" class="com.jd.ldop.sentinel.SentinelProcess">        <property name="ruleResourceWrappers">            <list>                <ref bean="degradeRule"/>            </list>        </property>    </bean>    <!-- 降级或限流规定配置 -->    <bean id="degradeRule" class="com.jd.ldop.sentinel.dto.RuleResourceWrapper">        <constructor-arg index="0" value="ducc.degradeRule"/>        <constructor-arg index="1" value="0"/>        <constructor-arg index="2" value="0"/>    </bean>

ducc上配置如下:

第三步 定义资源和关联类型

通过@SentinelResource能够间接在任意地位定义资源名以及对应的熔断降级或者限流形式、回调办法等,同时也能够指定关联类型,反对间接、关联、指定链路三种

    @Override    @SentinelResource(value = "modifyGetWaybillState", fallback = "executeDegrade")    public ExecutionResult<List<Integer>> execute(@NotNull Model imodel) {        // 业务逻辑解决    }    public ExecutionResult<List<Integer>> executeDegrade(@NotNull Model imodel) {        // 降级业务逻辑解决    }

3.2 利用场景

组件反对任意的业务降级、限流、负载爱护

四、Sentinel压测数据

4.1 压测指标

调用量:1.2W/m

利用机器内存稳固在50%以内

机器规格: 8C16G50G磁盘*2

Sentinel降级规定:

count=350-------慢调用临界阈值350ms

timeWindow=180------熔断工夫窗口180s

grade=0-----降级模式 慢调用

statIntervalMs=60000------统计时长1min

4.2 压测后果

利用机器监控:

压测分为了两个阶段,别离是组件开启和组件敞开两次,前半部分是组件开启的状况,后半局部是组件敞开的状况

利用过程内存剖析,和sentinel无关的前三对象是

com.alibaba.csp.sentinel.node.metric.MetricNode

com.alibaba.csp.sentinel.CtEntry

com.alibaba.csp.sentinel.context.Context

4.3 压测论断

使Sentinel组件实现零碎服务主动降级或限流,因为sentinel会依照滑动窗口周期性统计数据,因而会占用肯定的机器内存,应用时应设置正当的规定,如:正当的统计时长、防止过多的Sentinel资源创立等。

总体来说,应用sentinel组件对利用cpu和内存影响不大。