About Sentinel: Exploring Best Practices for System Rate Limiting, Degradation, and Load Protection with an In-House Component Built on Sentinel - JD Cloud Tech Team


Author: Yang Jianmin, JD Logistics

一、Introduction to Sentinel

Sentinel takes traffic as its entry point and protects service stability across multiple dimensions, including flow control, circuit breaking and degradation, and system load protection.

Sentinel has the following features:

  • Rich application scenarios: flash sales (i.e., keeping burst traffic within the system's capacity), message peak shaving and valley filling, cluster flow control, real-time circuit breaking of unavailable downstream applications, and more.
  • Complete real-time monitoring: Sentinel also provides real-time monitoring. In the console you can see second-level metrics for a single machine of a connected application, or the aggregated status of clusters of fewer than 500 machines.
  • Broad open-source ecosystem: Sentinel ships out-of-the-box integration modules for other open-source frameworks and libraries, such as Spring Cloud, Apache Dubbo, gRPC, and Quarkus. You only need to add the corresponding dependency and a simple configuration to integrate quickly. Sentinel also provides native implementations in multiple languages, including Java, Go, and C++.
  • Complete SPI extension mechanism: Sentinel provides easy-to-use, complete SPI extension interfaces. You can quickly customize logic by implementing them, e.g., custom rule management or adapters for dynamic data sources.

For a detailed introduction to Sentinel and how it differs from Hystrix, a web search will turn up plenty; this article is recommended: https://mp.weixin.qq.com/s/Q7Xv8cypQFrrOQhbd9BOXw

This article mainly uses Sentinel's degradation, rate-limiting, and system load protection features.

二、Analysis of Sentinel's Key Source Code

Whether the control means is rate limiting, degradation, or load protection, the rough flow is as follows:

• StatisticSlot records and aggregates runtime metric data across different dimensions for monitoring;

• the slot chain then triggers the entry methods of the subsequent slots in turn, i.e., the rule checks of SystemSlot, FlowSlot, DegradeSlot, and so on;

• if all subsequent slots pass without throwing a BlockException, the resource call is considered successful, and counters such as the executing thread count and the passed request count are incremented.

The data statistics mainly involve the classes ArrayMetric, BucketLeapArray, MetricBucket, and WindowWrap.

Project structure

The following analysis focuses on the contents of the core package.

2.1 The Annotation Entry Point

2.1.1 Entry, Context, Node

The methods of the SphU facade class all return an Entry. An Entry can be understood as a ticket for each entry into a resource: if SphU.entry() succeeds, it returns an Entry object, meaning the ticket was acquired and the call was not blocked; otherwise it throws a BlockException. (The SphO.entry() variant returns a boolean instead of throwing.)

An Entry holds information about this particular resource invocation:

• createTime: the timestamp when the Entry was created.

• curNode: the node the Entry is currently on.

• originNode: the node of the invocation origin.

• resourceWrapper: the resource information associated with the Entry.

Entry is an abstract class; CtEntry is its implementation, and CtEntry holds the Context and the call-chain information.
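
To make the "ticket" semantics concrete, here is a minimal usage sketch of the SphU facade (the resource name "demoResource" is illustrative):

    Entry entry = null;
    try {
        // Acquire the ticket; throws BlockException when a rule blocks the call
        entry = SphU.entry("demoResource");
        // ... protected business logic ...
    } catch (BlockException e) {
        // Blocked by a flow/degrade/system rule: degrade or fail fast here
    } finally {
        if (entry != null) {
            // Must be called to release the ticket and record RT/thread statistics
            entry.exit();
        }
    }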

The source-code comment on Context reads:

This class holds metadata of current invocation

And the comment on Node:

Holds real-time statistics for resources

A Node stores real-time statistics for a resource; Sentinel's rate-limiting, degradation, and other features make their decisions based on the data in Node. Node is an interface that defines methods for operating on requests, exceptions, RT, QPS, and thread counts.

Looking closely at the Node implementations, you will notice the use of LongAdder. Both LongAdder and DoubleAdder live in java.util.concurrent.atomic (since Java 8). Interested readers can dig deeper; these two are data structures with excellent counting performance under high concurrency, worth considering whenever you need concurrent counters in practice.
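
As a quick illustration of why LongAdder suits hot counters (this snippet is ours, not Sentinel's): writes are striped across internal cells to reduce CAS contention, and sum() folds them together on read.

    import java.util.concurrent.atomic.LongAdder;

    public class CounterDemo {
        public static void main(String[] args) {
            LongAdder counter = new LongAdder();
            // Many threads may call increment()/add() concurrently with little contention,
            // because the count is striped across internal cells
            counter.increment();
            counter.add(5);
            // sum() folds the per-cell values into a single (possibly slightly stale) total
            System.out.println(counter.sum()); // 6
        }
    }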

Node will be covered in more depth later; for now this is just a first mention of the concept.

2.2 Initialization

2.2.1 Context Initialization

Before the slot chain is initialized, Context initialization runs; it involves several important concepts that need explaining:

During Context initialization, the EntranceNode is added as a child of the Root node (Root itself is actually a special EntranceNode), and the EntranceNode is put into contextNameNodeMap.

As briefly mentioned before, Nodes are used for statistics. The different Node types serve the following purposes:

• Node: the interface for data statistics

• StatisticNode: statistics node, the implementation of the Node interface, used to collect data

• EntranceNode: entrance node; each Context has one, used to aggregate the overall traffic of the current Context

• DefaultNode: default node, used to collect the traffic data of a resource within the current Context

• ClusterNode: cluster node, used to aggregate the traffic of a resource across all Contexts

protected static Context trueEnter(String name, String origin) {
    Context context = contextHolder.get();
    if (context == null) {
        Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
        DefaultNode node = localCacheNameMap.get(name);
        if (node == null) {
            if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                setNullContext();
                return NULL_CONTEXT;
            } else {
                LOCK.lock();
                try {
                    node = contextNameNodeMap.get(name);
                    if (node == null) {
                        if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                            setNullContext();
                            return NULL_CONTEXT;
                        } else {
                            node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                            // Add entrance node.
                            Constants.ROOT.addChild(node);

                            // Copy-on-write: replace the whole cache map so readers never block
                            Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                            newMap.putAll(contextNameNodeMap);
                            newMap.put(name, node);
                            contextNameNodeMap = newMap;
                        }
                    }
                } finally {
                    LOCK.unlock();
                }
            }
        }
        context = new Context(node, name);
        context.setOrigin(origin);
        contextHolder.set(context);
    }

    return context;
}
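
For reference, a caller can name the context (and declare its origin) explicitly before acquiring entries; trueEnter above is what ultimately runs. A minimal sketch, with illustrative names:

    // Enter a context named "my-entrance" on behalf of origin "upstream-app"
    ContextUtil.enter("my-entrance", "upstream-app");
    try {
        Entry entry = SphU.entry("demoResource");
        // ... business logic ...
        entry.exit();
    } catch (BlockException e) {
        // blocked by a rule in this context
    } finally {
        // Clear the ThreadLocal context
        ContextUtil.exit();
    }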

2.2.2 Eight Slots Initialized by Default via SpiLoader

The main responsibility of each slot is as follows (a sketch of a custom slot plugged in through the SPI mechanism follows the list):

• NodeSelectorSlot collects resource paths and stores the call paths of resources as a tree, so rate limiting and degradation can be applied per call path.

• ClusterBuilderSlot stores a resource's statistics and caller information, such as the resource's RT, QPS, and thread count; this information serves as the basis for multi-dimensional rate limiting and degradation.

• StatisticSlot records and aggregates runtime metric data across different dimensions.

• FlowSlot performs flow control based on the preset flow rules and the state collected by the preceding slots.

• AuthoritySlot enforces black/white-list control based on the configured lists and the call origin.

• DegradeSlot performs circuit breaking and degradation based on the statistics and preset rules.

• SystemSlot controls total inbound traffic according to system status, e.g., cluster QPS, thread count, RT, and load.
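
As promised above, the SPI mechanism also lets you insert your own slot into this chain. A minimal sketch, assuming Sentinel 1.8.x's @Spi-annotated slots (the class name and order value here are ours):

    @Spi(order = -1500) // smaller order runs earlier in the chain
    public class AuditSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node,
                          int count, boolean prioritized, Object... args) throws Throwable {
            // Custom logic before the remaining slots run, e.g., auditing the resource name
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }

        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            fireExit(context, resourceWrapper, count, args);
        }
    }

The class is then registered in META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot so SpiLoader can discover it.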

2.3 StatisticSlot

2.3.1 Node

Let's take a deeper look at Node, since all the statistics live there; rate limiting, circuit breaking, and load protection alike combine rules with these statistics to decide whether to act.

As the source comment says, a Node holds real-time statistics at the resource level. Below are the method definitions in the interface; you can see many request-, QPS-, and thread-related methods such as totalRequest, totalPass, totalSuccess, blockRequest, totalException, and passQps:

/**
 * Holds real-time statistics for resources.
 *
 * @author qinan.qn
 * @author leyou
 * @author Eric Zhao
 */
public interface Node extends OccupySupport, DebugSupport {

    long totalRequest();
    long totalPass();
    long totalSuccess();
    long blockRequest();
    long totalException();
    double passQps();
    double blockQps();
    double totalQps();
    double successQps();
    ……
}

2.3.2 StatisticNode

Let's start from the most fundamental class, StatisticNode. Its positioning, per the source:

The statistic node keeps three kinds of real-time statistics metrics:
metrics in second level ({@code rollingCounterInSecond})
metrics in minute level ({@code rollingCounterInMinute})
thread count

StatisticNode has only four fields. Besides curThreadNum of type LongAdder mentioned earlier (and lastFetchTime), the other two fields are Metric objects; the constructor arguments and field names show that one serves second-level and one minute-level statistics. Next, let's look at Metric.

// StatisticNode holds two Metrics, one second-level and one minute-level. From the arguments,
// the second-level statistic is split into two time windows of 500 ms each (by default)
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
    IntervalProperty.INTERVAL);

// The minute-level statistic is split into 60 time windows of 1000 ms each
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

/**
 * The counter for thread count.
 */
private LongAdder curThreadNum = new LongAdder();

/**
 * The last timestamp when metrics were fetched.
 */
private long lastFetchTime = -1;

ArrayMetric has a single field of type LeapArray<MetricBucket>; everything else is statistics methods. LeapArray is the most fundamental statistics structure in Sentinel and deserves a close look. In essence it fetches a bucket for a given timeMillis, with three cases: the bucket has not been created yet, it exists and is current (returned directly), or it is deprecated and must be reset.

// Take the minute-level metric as an example of time-window initialization
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);


public LeapArray(int sampleCount, int intervalInMs) {
    AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
    AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
    AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
    // windowLengthInMs = 60 * 1000 / 60 = 1000: by default Sentinel splits the interval
    // into 60 sliding windows for this minute-level statistic
    this.windowLengthInMs = intervalInMs / sampleCount;
    // 60 * 1000
    this.intervalInMs = intervalInMs;
    // 60.0 seconds
    this.intervalInSecond = intervalInMs / 1000.0;
    // 60 buckets
    this.sampleCount = sampleCount;
    // array length 60
    this.array = new AtomicReferenceArray<>(sampleCount);
}

/**
     * Get bucket item at provided timestamp.
     *
     * @param timeMillis a valid timestamp in milliseconds
     * @return current bucket item at provided timestamp if the time is valid; null if time is invalid
     */
    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        // Map the current timestamp to an index into the circular array
        int idx = calculateTimeIdx(timeMillis);
        // Calculate current bucket start time:
        // timeMillis - timeMillis % windowLengthInMs
        long windowStart = calculateWindowStart(timeMillis);

        /*
         * Get bucket item at given time from the array.
         *
         * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
         * (2) Bucket is up-to-date, then just return the bucket.
         * (3) Bucket is deprecated, then reset current bucket.
         */
        while (true) {
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                /*
                 *     B0       B1      B2    NULL      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            bucket is empty, so create new and update
                 *
                 * If the old bucket is absent, then we create a new bucket at {@code windowStart},
                 * then try to update circular array via a CAS operation. Only one thread can
                 * succeed to update, while other threads yield its time slice.
                 */
                // newEmptyBucket is overridden: the second-level and minute-level statistics
                // create different bucket implementations
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                /*
                 *     B0       B1      B2     B3      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            startTime of Bucket 3: 800, so it's up-to-date
                 *
                 * If current {@code windowStart} is equal to the start timestamp of old bucket,
                 * that means the time is within the bucket, so directly return the bucket.
                 */
                return old;
            } else if (windowStart > old.windowStart()) {
                /*
                 *   (old)
                 *             B0       B1      B2    NULL      B4
                 * |_______||_______|_______|_______|_______|_______||___
                 * ...    1200     1400    1600    1800    2000    2200  timestamp
                 *                              ^
                 *                           time=1676
                 *          startTime of Bucket 2: 400, deprecated, should be reset
                 *
                 * If the start timestamp of old bucket is behind provided time, that means
                 * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
                 * Note that the reset and clean-up operations are hard to be atomic,
                 * so we need a update lock to guarantee the correctness of bucket update.
                 *
                 * The update lock is conditional (tiny scope) and will take effect only when
                 * bucket is deprecated, so in most cases it won't lead to performance loss.
                 */
                if (updateLock.tryLock()) {
                    try {
                        // Successfully get the update lock, now we reset the bucket.
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // Should not go through here, as the provided time is already behind.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }
    // Each slot of the array holds one time window's data; the timestamp is divided by the
    // window length and hashed onto the array
    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
        long timeId = timeMillis / windowLengthInMs;
        // Calculate current index so we can map the timestamp to the leap array.
        return (int) (timeId % array.length());
    }

A WindowWrap holds windowLengthInMs, windowStart, and the bucket value. The LeapArray implementations differ: minute-level statistics use BucketLeapArray, while second-level statistics use OccupiableBucketLeapArray. For minute-level statistics, each MetricBucket maintains an array of LongAdders plus a configured minRT.
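
A quick worked example of the index arithmetic, using the 200 ms windows and 5 buckets from the diagrams above:

    long timeMillis = 888;
    long windowLengthInMs = 200;
    int arrayLength = 5;
    int idx = (int) ((timeMillis / windowLengthInMs) % arrayLength); // 888 / 200 = 4; 4 % 5 = 4
    long windowStart = timeMillis - timeMillis % windowLengthInMs;   // 888 - 88 = 800
    // time=888 therefore maps to array slot 4, whose window covers [800, 1000)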

/**
 * The fundamental data structure for metric statistics in a time span.
 *
 * @author jialiang.linjl
 * @author Eric Zhao
 * @see LeapArray
 */
public class BucketLeapArray extends LeapArray<MetricBucket> {

    public BucketLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
    }

    @Override
    public MetricBucket newEmptyBucket(long time) {
        return new MetricBucket();
    }

    @Override
    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
        // Update the start time and reset value.
        w.resetTo(startTime);
        w.value().reset();
        return w;
    }
}

For second-level statistics (e.g., counting accurately under QPS = 20), another LeapArray implementation, FutureBucketLeapArray, comes into play. Readers interested in how second-level statistics avoid counting errors can study the context around FutureBucketLeapArray.

2.4 FlowSlot

2.4.1 Common Rate-Limiting Algorithms

Before introducing Sentinel's implementation, let's review the common rate-limiting algorithms. There are basically three: counter, leaky bucket, and token bucket.

Counter algorithm

As the name suggests, the counter algorithm counts requests within a time span, incrementing per request, and compares the count with the configured limit (max QPS); exceeding it triggers rate limiting. But this cannot achieve "smooth" limiting. With 1 s as the unit and 100 QPS as the limit, as the figure shows, traffic can exceed the limit around window boundaries.

Hence the sliding-window counter algorithm evolved from the plain counter: the statistics period is subdivided, e.g., 1 s into 5 windows, and the QPS over all windows is aggregated on a rolling basis as the system's actual QPS. This solves the boundary problem above, and we will see similar machinery in the Sentinel source.

Leaky bucket algorithm

No matter how large the traffic is, it first enters the leaky bucket and then flows out at a constant rate. How do we implement this constant rate in code? For a rate of 100 q/s, each unit of traffic takes 10 ms to flow out, like a queue where one item is dequeued from the head every 10 ms. The queue is the bucket; when traffic exceeds the queue length, the excess can be rejected.

The leaky bucket has its drawbacks too: it cannot handle bursts. If 100 requests arrive at once, they pass one by one, and by the time the last one leaves, a full second has elapsed. So the leaky bucket suits scenarios where requests arrive fairly evenly and the rate must be strictly controlled.

Token bucket algorithm

The token bucket is similar to the leaky bucket, except the bucket holds tokens rather than requests. Token production and consumption can be managed flexibly as needed, which allows bursts of traffic to be absorbed.
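
To make the contrast concrete, here is a minimal token-bucket sketch of our own (not Sentinel code): tokens accumulate at a fixed rate up to the bucket capacity, so a burst can spend saved-up tokens at once.

    public class TokenBucket {
        private final double capacity;    // maximum tokens the bucket can hold
        private final double refillPerMs; // tokens produced per millisecond
        private double tokens;
        private long lastRefillTime = System.currentTimeMillis();

        public TokenBucket(double capacity, double refillPerSecond) {
            this.capacity = capacity;
            this.refillPerMs = refillPerSecond / 1000.0;
            this.tokens = capacity; // start full so an initial burst can pass
        }

        public synchronized boolean tryAcquire(int permits) {
            long now = System.currentTimeMillis();
            // Refill proportionally to elapsed time, capped at capacity
            tokens = Math.min(capacity, tokens + (now - lastRefillTime) * refillPerMs);
            lastRefillTime = now;
            if (tokens >= permits) {
                tokens -= permits;
                return true;
            }
            return false; // not enough tokens: reject (or queue, in other variants)
        }
    }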

2.4.2 Standalone Rate-Limiting Mode

Now let's look at Sentinel's rate-limiting implementation. Compared with the basic algorithms above, its first distinctive feature is the notion of a "resource", which enables fine-grained, flexible limiting of a specific resource, a related resource, or a specific call chain.

The main logic of FlowSlot lives in FlowRuleChecker. Before diving in, let's look at how Sentinel models rules: there are flow rules, authority (access-control) rules, system-protection rules (e.g., Linux load), and degradation rules. The FlowRule fields are listed below.

    /**
     * Two flow-control modes:
     *   0: thread count (limit when the number of threads calling the api reaches the threshold)
     *   1: QPS (limit when the QPS of the api reaches the threshold)
     */
    private int grade = RuleConstant.FLOW_GRADE_QPS;

    /**
     * Flow-control threshold; its meaning depends on grade.
     */
    private double count;

    /**
     * Relation strategy (supports limiting by related resource or a specific call chain):
     *   direct (limit the api itself once it meets the limiting condition)
     *   relate (limit this resource when the related resource reaches its threshold)
     *   chain (only count traffic on the specified entrance chain)
     * {@link RuleConstant#STRATEGY_DIRECT} for direct flow control (by origin);
     * {@link RuleConstant#STRATEGY_RELATE} for relevant flow control (with relevant resource);
     * {@link RuleConstant#STRATEGY_CHAIN} for chain flow control (by entrance resource).
     */
    private int strategy = RuleConstant.STRATEGY_DIRECT;

    /**
     * Reference resource in flow control with relevant resource or context.
     */
    private String refResource;

    /**
     * Control behavior:
     *   0. default (reject directly, throwing FlowException)
     *   1. warm up (slow start: beginning from threshold/coldFactor (cold factor, default 3),
     *      the permitted QPS ramps up to the configured threshold over the warm-up period)
     *   2. rate limiter (queue and wait)
     *   3. warm up + rate limiter
     */
    private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;

    private int warmUpPeriodSec = 10;

    /**
     * Max queueing time in rate limiter behavior.
     */
    private int maxQueueingTimeMs = 500;

    /**
     * Whether cluster mode is enabled; false by default.
     */
    private boolean clusterMode;

    /**
     * Flow rule config for cluster mode.
     */
    private ClusterFlowConfig clusterConfig;

    /**
     * The traffic shaping (throttling) controller.
     */
    private TrafficShapingController controller;
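
For orientation, this is how such a rule can be assembled and loaded in code; the APIs are Sentinel's public ones, while the resource name and threshold are illustrative:

    FlowRule rule = new FlowRule();
    rule.setResource("demoResource");
    rule.setGrade(RuleConstant.FLOW_GRADE_QPS);                     // limit by QPS
    rule.setCount(100);                                             // threshold: 100 QPS
    rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT); // fast-reject
    FlowRuleManager.loadRules(Collections.singletonList(rule));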

Let's continue with FlowRuleChecker.

The first thing canPassCheck looks at is limitApp, which is used together with the authority (origin) rules; the default applies to all origins.

private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                      boolean prioritized) {
    // Select the Node used for the check according to the strategy
    // (it can be the resource's own Node, a related Node, or a specific chain's Node)
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    if (selectedNode == null) {
        return true;
    }

    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}


static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
    // limitApp is used for origin-based access control; the default "default" means no origin restriction
    String limitApp = rule.getLimitApp();
    // Fetch the limiting strategy
    int strategy = rule.getStrategy();
    String origin = context.getOrigin();
    // Authorize based on the call origin
    if (limitApp.equals(origin) && filterOrigin(origin)) {
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            // Matches limit origin, return origin statistic node.
            return context.getOriginNode();
        }
        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            // Return the cluster node.
            return node.getClusterNode();
        }
        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
        && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            return context.getOriginNode();
        }
        return selectReferenceNode(rule, context, node);
    }

    return null;
}

static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
    String refResource = rule.getRefResource();
    int strategy = rule.getStrategy();

    if (StringUtil.isEmpty(refResource)) {
        return null;
    }

    if (strategy == RuleConstant.STRATEGY_RELATE) {
        return ClusterBuilderSlot.getClusterNode(refResource);
    }

    if (strategy == RuleConstant.STRATEGY_CHAIN) {
        if (!refResource.equals(context.getName())) {
            return null;
        }
        return node;
    }
    // No node.
    return null;
}

// This code runs when flow rules are loaded: it builds the traffic-shaping controller for a rule;
// rule.getRater() later returns this TrafficShapingController
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
    if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
        switch (rule.getControlBehavior()) {
            // Warm-up mode returns WarmUpController
            case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
                return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
                        ColdFactorProperty.coldFactor);
            // Queueing mode returns ThrottlingController
            case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
                return new ThrottlingController(rule.getMaxQueueingTimeMs(), rule.getCount());
            // Warm-up + queueing mode returns WarmUpRateLimiterController
            case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
                return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
                        rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
            case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
            default:
                // Default mode or unknown mode: default traffic shaping controller (fast-reject).
        }
    }
    // The default is DefaultController
    return new DefaultController(rule.getCount(), rule.getGrade());
}

Sentinel's standalone rate-limiting algorithms

As shown above, depending on the rule's controlBehavior (control effect), one of the following controllers is created:

• DefaultController: a fairly typical sliding-window counter implementation. It adds the incoming request count to the currently counted QPS; below the threshold it passes, otherwise it computes a wait time and retries later.

• ThrottlingController: the leaky-bucket implementation; the idea is annotated in the source fragment below.

• WarmUpController: modeled on Guava's warm-up RateLimiter. The difference is that Guava focuses on the interval between requests, like the token bucket mentioned earlier, whereas Sentinel focuses more on the request count, which also somewhat resembles the token bucket.

• WarmUpRateLimiterController: uses the warm-up algorithm at low watermarks and sliding-window counter queueing at high watermarks.

DefaultController

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        int curCount = avgUsedTokens(node);
        if (curCount + acquireCount > count) {
            // In QPS mode a prioritized request may try to occupy capacity of a future
            // window instead of being rejected immediately
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
                long currentTime;
                long waitInMs;
                currentTime = TimeUtil.currentTimeMillis();
                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
                if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    sleep(waitInMs);

                    // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                    throw new PriorityWaitException(waitInMs);
                }
            }
            return false;
        }
        return true;
    }

ThrottlingController

    public ThrottlingController(int queueingTimeoutMs, double maxCountPerStat) {
        this(queueingTimeoutMs, maxCountPerStat, 1000);
    }

    public ThrottlingController(int queueingTimeoutMs, double maxCountPerStat, int statDurationMs) {
        AssertUtil.assertTrue(statDurationMs > 0, "statDurationMs should be positive");
        AssertUtil.assertTrue(maxCountPerStat >= 0, "maxCountPerStat should be >= 0");
        AssertUtil.assertTrue(queueingTimeoutMs >= 0, "queueingTimeoutMs should be >= 0");
        this.maxQueueingTimeMs = queueingTimeoutMs;
        this.count = maxCountPerStat;
        this.statDurationMs = statDurationMs;
        // Use nanoSeconds when durationMs%count != 0 or count/durationMs > 1 (to be accurate)
        // e.g., any count above 1000, or one that does not evenly divide statDurationMs,
        // makes useNanoSeconds true; otherwise it is false
        if (maxCountPerStat > 0) {
            this.useNanoSeconds = statDurationMs % Math.round(maxCountPerStat) != 0
                || maxCountPerStat / statDurationMs > 1;
        } else {
            this.useNanoSeconds = false;
        }
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    private boolean checkPassUsingNanoSeconds(int acquireCount, double maxCountPerStat) {
        final long maxQueueingTimeNs = maxQueueingTimeMs * MS_TO_NS_OFFSET;
        long currentTime = System.nanoTime();
        // Calculate the interval between every two requests.
        final long costTimeNs = Math.round(1.0d * MS_TO_NS_OFFSET * statDurationMs * acquireCount / maxCountPerStat);

        // Expected pass time of this request.
        long expectedTime = costTimeNs + latestPassedTime.get();

        if (expectedTime <= currentTime) {
            // Contention may exist here, but it's okay.
            latestPassedTime.set(currentTime);
            return true;
        } else {
            final long curNanos = System.nanoTime();
            // Calculate the time to wait.
            long waitTime = costTimeNs + latestPassedTime.get() - curNanos;
            if (waitTime > maxQueueingTimeNs) {
                return false;
            }

            long oldTime = latestPassedTime.addAndGet(costTimeNs);
            waitTime = oldTime - curNanos;
            if (waitTime > maxQueueingTimeNs) {
                latestPassedTime.addAndGet(-costTimeNs);
                return false;
            }
            // in race condition waitTime may <= 0
            if (waitTime > 0) {
                sleepNanos(waitTime);
            }
            return true;
        }
    }

    // The leaky-bucket implementation (millisecond precision)
    private boolean checkPassUsingCachedMs(int acquireCount, double maxCountPerStat) {
        long currentTime = TimeUtil.currentTimeMillis();
        // Interval between two requests (there are millisecond and nanosecond variants)
        long costTime = Math.round(1.0d * statDurationMs * acquireCount / maxCountPerStat);

        // Expected pass time of this request
        long expectedTime = costTime + latestPassedTime.get();

        if (expectedTime <= currentTime) {
            // latestPassedTime is an AtomicLong, which provides volatile semantics
            latestPassedTime.set(currentTime);
            return true;
        } else {
            // Calculate the wait time
            long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
            // If it exceeds the max queueing time, trigger rate limiting
            if (waitTime > maxQueueingTimeMs) {
                return false;
            }

            long oldTime = latestPassedTime.addAndGet(costTime);
            waitTime = oldTime - TimeUtil.currentTimeMillis();
            if (waitTime > maxQueueingTimeMs) {
                latestPassedTime.addAndGet(-costTime);
                return false;
            }
            // in race condition waitTime may <= 0
            if (waitTime > 0) {
                sleepMs(waitTime);
            }
            return true;
        }
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // Pass when acquire count is less or equal than 0.
        if (acquireCount <= 0) {
            return true;
        }
        // Reject when count is less or equal than 0.
        // Otherwise, the costTime will be max of long and waitTime will overflow in some cases.
        if (count <= 0) {
            return false;
        }
        if (useNanoSeconds) {
            return checkPassUsingNanoSeconds(acquireCount, this.count);
        } else {
            return checkPassUsingCachedMs(acquireCount, this.count);
        }
    }

    private void sleepMs(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
        }
    }

    private void sleepNanos(long ns) {
        LockSupport.parkNanos(ns);
    }

long costTime = Math.round(1.0d * statDurationMs * acquireCount / maxCountPerStat);

From this formula for the interval between two requests, once maxCountPerStat (the configured QPS limit) exceeds 1000, the per-request interval in the queueing mode can no longer be computed accurately in milliseconds. This matches what we saw above: when the configured limit exceeds 1000 QPS (or does not evenly divide the statistics duration), checkPassUsingNanoSeconds is used; below 1000 QPS, checkPassUsingCachedMs is used. Comparing the two, the main idea is unchanged; only the precision changes from milliseconds to nanoseconds, so reading checkPassUsingCachedMs is enough.
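
A quick calculation shows the precision loss (statDurationMs = 1000, acquireCount = 1; the numbers are ours):

    // At 1000 QPS the interval is exact: round(1000 * 1 / 1000.0) = 1 ms
    // At 1500 QPS the true interval is 0.67 ms, but round(1000 * 1 / 1500.0) = 1 ms (~50% error)
    // In nanoseconds it stays accurate: round(1_000_000 * 1000 * 1 / 1500.0) = 666_667 ns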

WarmUpController

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        long passQps = (long) node.passQps();

        long previousQps = (long) node.previousPassQps();
        syncToken(previousQps);

        // Compute the slope of the warm-up curve;
        // once we are above the warning line, adjust the permitted QPS
        long restToken = storedTokens.get();
        if (restToken >= warningToken) {
            long aboveToken = restToken - warningToken;
            // Consume faster than the warning-zone rate, but slower than the full rate
            // current interval = restToken * slope + 1 / count
            double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            if (passQps + acquireCount <= warningQps) {
                return true;
            }
        } else {
            if (passQps + acquireCount <= count) {
                return true;
            }
        }

        return false;
    }

    protected void syncToken(long passQps) {
        long currentTime = TimeUtil.currentTimeMillis();
        currentTime = currentTime - currentTime % 1000;
        long oldLastFillTime = lastFilledTime.get();
        if (currentTime <= oldLastFillTime) {
            return;
        }

        long oldValue = storedTokens.get();
        long newValue = coolDownTokens(currentTime, passQps);

        if (storedTokens.compareAndSet(oldValue, newValue)) {
            long currentValue = storedTokens.addAndGet(0 - passQps);
            if (currentValue < 0) {
                storedTokens.set(0L);
            }
            lastFilledTime.set(currentTime);
        }
    }

    private long coolDownTokens(long currentTime, long passQps) {
        long oldValue = storedTokens.get();
        long newValue = oldValue;

        // Preconditions for adding tokens:
        // token consumption is running well below the warning line
        if (oldValue < warningToken) {
            newValue = (long) (oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
        } else if (oldValue > warningToken) {
            if (passQps < (int) count / coldFactor) {
                newValue = (long) (oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
            }
        }
        return Math.min(newValue, maxToken);
    }

2.4.3 Cluster Rate Limiting

The passClusterCheck method (if the clusterService cannot be found, it falls back to standalone limiting):

private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                        boolean prioritized) {
    try {
        // Obtain the TokenService; the current node may be a Token Client or a Token Server
        TokenService clusterService = pickClusterService();
        if (clusterService == null) {
            return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
        }
        long flowId = rule.getClusterConfig().getFlowId();
        // Request tokens from the TokenService by flowId. As noted above, the call may come
        // from a TokenClient or a TokenServer, backed by DefaultClusterTokenClient and
        // DefaultTokenService respectively
        TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
        return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
        // If client is absent, then fallback to local mode.
    } catch (Throwable ex) {
        RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
    }
    // Fallback to local flow control when token client or server for this rule is not available.
    // If fallback is not enabled, then directly pass.
    return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}

// Determine whether the current node is a Token Client or a Token Server:
// 1) if the node's role is Client, the returned TokenService is DefaultClusterTokenClient;
// 2) if the node's role is Server, the default TokenService is DefaultTokenService
//    (obtained here via EmbeddedClusterTokenServerProvider).
private static TokenService pickClusterService() {
    if (ClusterStateManager.isClient()) {
        return TokenClientProvider.getClient();
    }
    if (ClusterStateManager.isServer()) {
        return EmbeddedClusterTokenServerProvider.getServer();
    }
    return null;
}

Cluster limiting modes

The Sentinel cluster token server can be started in two ways:

• Embedded mode: suitable for application-level limiting; simple to deploy, but it affects application performance

• Alone (standalone) mode: suitable for global limiting; requires separate deployment

For reasons of length, cluster limiting will be covered in detail another time.

Fallback of the cluster limiting mode
private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                        boolean prioritized) {
    try {
        TokenService clusterService = pickClusterService();
        if (clusterService == null) {
            return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
        }
        long flowId = rule.getClusterConfig().getFlowId();
        TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
        return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
        // If client is absent, then fallback to local mode.
    } catch (Throwable ex) {
        RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
    }
    // Fallback to local flow control when token client or server for this rule is not available.
    // If fallback is not enabled, then directly pass.
    // Note: if cluster limiting fails, it degrades to standalone limiting; if the configuration
    // does not allow fallback, this check is simply skipped
    return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}

2.5 DegradeSlot

CircuitBreaker

Martin Fowler's classic explanation of the circuit breaker: https://martinfowler.com/bliki/CircuitBreaker.html

The first step is fetching the list of circuit breakers by resource name. Sentinel has two circuit-breaker implementations: ResponseTimeCircuitBreaker for the RT (slow-call) mode and ExceptionCircuitBreaker for the exception mode.

public interface CircuitBreaker {

    /**
     * Get the associated circuit breaking rule.
     *
     * @return associated circuit breaking rule
     */
    DegradeRule getRule();

    /**
     * Acquires permission of an invocation only if it is available at the time of invoking.
     *
     * @param context context of current invocation
     * @return {@code true} if permission was acquired and {@code false} otherwise
     */
    boolean tryPass(Context context);

    /**
     * Get current state of the circuit breaker.
     *
     * @return current state of the circuit breaker
     */
    State currentState();

    /**
     * <p>Record a completed request with the context and handle state transformation of the circuit breaker.</p>
     * <p>Called when a <strong>passed</strong> invocation finished.</p>
     *
     * @param context context of current invocation
     */
    void onRequestComplete(Context context);

    /**
     * Circuit breaker state.
     */
    enum State {
        /**
         * In {@code OPEN} state, all requests will be rejected until the next recovery time point.
         */
        OPEN,
        /**
         * In {@code HALF_OPEN} state, the circuit breaker will allow a "probe" invocation.
         * If the invocation is abnormal according to the strategy (e.g. it's slow), the circuit breaker
         * will re-transform to the {@code OPEN} state and wait for the next recovery time point;
         * otherwise the resource will be regarded as "recovered" and the circuit breaker
         * will cease cutting off requests and transform to {@code CLOSED} state.
         */
        HALF_OPEN,
        /**
         * In {@code CLOSED} state, all requests are permitted. When current metric value exceeds the threshold,
         * the circuit breaker will transform to {@code OPEN} state.
         */
        CLOSED
    }
}

Taking ExceptionCircuitBreaker as the concrete example:

public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {

    // Two exception modes: exception ratio and exception count
    private final int strategy;
    // Minimum number of requests before the rule can trigger
    private final int minRequestAmount;
    // Threshold
    private final double threshold;

    // LeapArray is a central structure for Sentinel statistics; it encapsulates
    // the time-window operations
    private final LeapArray<SimpleErrorCounter> stat;

    public ExceptionCircuitBreaker(DegradeRule rule) {
        this(rule, new SimpleErrorCounterLeapArray(1, rule.getStatIntervalMs()));
    }

    ExceptionCircuitBreaker(DegradeRule rule, LeapArray<SimpleErrorCounter> stat) {
        super(rule);
        this.strategy = rule.getGrade();
        boolean modeOk = strategy == DEGRADE_GRADE_EXCEPTION_RATIO || strategy == DEGRADE_GRADE_EXCEPTION_COUNT;
        AssertUtil.isTrue(modeOk, "rule strategy should be error-ratio or error-count");
        AssertUtil.notNull(stat, "stat cannot be null");
        this.minRequestAmount = rule.getMinRequestAmount();
        this.threshold = rule.getCount();
        this.stat = stat;
    }

    @Override
    protected void resetStat() {
        // Reset current bucket (bucket count = 1).
        stat.currentWindow().value().reset();
    }

    @Override
    public void onRequestComplete(Context context) {
        Entry entry = context.getCurEntry();
        if (entry == null) {
            return;
        }
        Throwable error = entry.getError();
        SimpleErrorCounter counter = stat.currentWindow().value();
        if (error != null) {
            counter.getErrorCount().add(1);
        }
        counter.getTotalCount().add(1);

        handleStateChangeWhenThresholdExceeded(error);
    }

    private void handleStateChangeWhenThresholdExceeded(Throwable error) {
        if (currentState.get() == State.OPEN) {
            return;
        }

        if (currentState.get() == State.HALF_OPEN) {
            // In detecting request
            if (error == null) {
                fromHalfOpenToClose();
            } else {
                fromHalfOpenToOpen(1.0d);
            }
            return;
        }

        List<SimpleErrorCounter> counters = stat.values();
        long errCount = 0;
        long totalCount = 0;
        for (SimpleErrorCounter counter : counters) {
            errCount += counter.errorCount.sum();
            totalCount += counter.totalCount.sum();
        }
        if (totalCount < minRequestAmount) {
            return;
        }
        double curCount = errCount;
        if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
            // Use errorRatio
            curCount = errCount * 1.0d / totalCount;
        }
        if (curCount > threshold) {
            transformToOpen(curCount);
        }
    }

    static class SimpleErrorCounter {
        private LongAdder errorCount;
        private LongAdder totalCount;

        public SimpleErrorCounter() {
            this.errorCount = new LongAdder();
            this.totalCount = new LongAdder();
        }

        public LongAdder getErrorCount() {
            return errorCount;
        }

        public LongAdder getTotalCount() {
            return totalCount;
        }

        public SimpleErrorCounter reset() {
            errorCount.reset();
            totalCount.reset();
            return this;
        }

        @Override
        public String toString() {
            return "SimpleErrorCounter{" +
                "errorCount=" + errorCount +
                ", totalCount=" + totalCount +
                '}';
        }
    }

    static class SimpleErrorCounterLeapArray extends LeapArray<SimpleErrorCounter> {

        public SimpleErrorCounterLeapArray(int sampleCount, int intervalInMs) {
            super(sampleCount, intervalInMs);
        }

        @Override
        public SimpleErrorCounter newEmptyBucket(long timeMillis) {
            return new SimpleErrorCounter();
        }

        @Override
        protected WindowWrap<SimpleErrorCounter> resetWindowTo(WindowWrap<SimpleErrorCounter> w, long startTime) {
            // Update the start time and reset value.
            w.resetTo(startTime);
            w.value().reset();
            return w;
        }
    }
}
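
To tie the fields back to configuration, here is a hedged sketch of loading an exception-ratio rule that this breaker would enforce (the resource name and thresholds are illustrative):

    DegradeRule rule = new DegradeRule("demoResource")
        .setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) // exception-ratio mode
        .setCount(0.5)             // open when the error ratio exceeds 50%
        .setMinRequestAmount(20)   // evaluate only after at least 20 requests
        .setStatIntervalMs(60_000) // statistics window: 1 minute
        .setTimeWindow(180);       // stay OPEN for 180 s before the HALF_OPEN probe
    DegradeRuleManager.loadRules(Collections.singletonList(rule));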

2.6 SystemSlot

The check logic is concentrated in com.alibaba.csp.sentinel.slots.system.SystemRuleManager#checkSystem; a fragment follows. As the system-protection rule check, it controls the application-wide inbound QPS, thread count, RT (response time), and system load. Apart from system load, all the statistics come from StatisticSlot; the system load is obtained by SystemRuleManager periodically scheduling SystemStatusListener, which reads it through OperatingSystemMXBean.

/**
     * Apply {@link SystemRule} to the resource. Only inbound traffic will be checked.
     *
     * @param resourceWrapper the resource.
     * @throws BlockException when any system rule's threshold is exceeded.
     */
    public static void checkSystem(ResourceWrapper resourceWrapper, int count) throws BlockException {
        if (resourceWrapper == null) {
            return;
        }
        // Ensure the checking switch is on.
        if (!checkSystemStatus.get()) {
            return;
        }

        // for inbound traffic only
        if (resourceWrapper.getEntryType() != EntryType.IN) {
            return;
        }

        // Total QPS: the aggregated inbound QPS from the global ENTRY_NODE
        // (see the Node discussion in the initialization section)
        double currentQps = Constants.ENTRY_NODE.passQps();
        if (currentQps + count > qps) {
            throw new SystemBlockException(resourceWrapper.getName(), "qps");
        }

        // total thread
        int currentThread = Constants.ENTRY_NODE.curThreadNum();
        if (currentThread > maxThread) {
            throw new SystemBlockException(resourceWrapper.getName(), "thread");
        }

        double rt = Constants.ENTRY_NODE.avgRt();
        if (rt > maxRt) {
            throw new SystemBlockException(resourceWrapper.getName(), "rt");
        }

        // load. BBR algorithm.
        if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
            if (!checkBbr(currentThread)) {
                throw new SystemBlockException(resourceWrapper.getName(), "load");
            }
        }

        // cpu usage
        if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
            throw new SystemBlockException(resourceWrapper.getName(), "cpu");
        }
    }

    private static boolean checkBbr(int currentThread) {
        if (currentThread > 1 &&
            currentThread > Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) {
            return false;
        }
        return true;
    }

    public static double getCurrentSystemAvgLoad() {
        return statusListener.getSystemAverageLoad();
    }

    public static double getCurrentCpuUsage() {
        return statusListener.getCpuUsage();
    }

public class SystemStatusListener implements Runnable {

    volatile double currentLoad = -1;
    volatile double currentCpuUsage = -1;

    volatile String reason = StringUtil.EMPTY;

    volatile long processCpuTime = 0;
    volatile long processUpTime = 0;

    public double getSystemAverageLoad() {
        return currentLoad;
    }

    public double getCpuUsage() {
        return currentCpuUsage;
    }

    @Override
    public void run() {
        try {
            OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
            currentLoad = osBean.getSystemLoadAverage();

            /*
             * Java Doc copied from {@link OperatingSystemMXBean#getSystemCpuLoad()}:</br>
             * Returns the "recent cpu usage" for the whole system. This value is a double in the [0.0,1.0] interval.
             * A value of 0.0 means that all CPUs were idle during the recent period of time observed, while a value
             * of 1.0 means that all CPUs were actively running 100% of the time during the recent period being
             * observed. All values between 0.0 and 1.0 are possible depending of the activities going on in the
             * system. If the system recent cpu usage is not available, the method returns a negative value.
             */
            double systemCpuUsage = osBean.getSystemCpuLoad();

            // calculate process cpu usage to support application running in container environment
            RuntimeMXBean runtimeBean = ManagementFactory.getPlatformMXBean(RuntimeMXBean.class);
            long newProcessCpuTime = osBean.getProcessCpuTime();
            long newProcessUpTime = runtimeBean.getUptime();
            int cpuCores = osBean.getAvailableProcessors();
            long processCpuTimeDiffInMs = TimeUnit.NANOSECONDS
                    .toMillis(newProcessCpuTime - processCpuTime);
            long processUpTimeDiffInMs = newProcessUpTime - processUpTime;
            double processCpuUsage = (double) processCpuTimeDiffInMs / processUpTimeDiffInMs / cpuCores;
            processCpuTime = newProcessCpuTime;
            processUpTime = newProcessUpTime;

            currentCpuUsage = Math.max(processCpuUsage, systemCpuUsage);

            if (currentLoad > SystemRuleManager.getSystemLoadThreshold()) {
                writeSystemStatusLog();
            }
        } catch (Throwable e) {
            RecordLog.warn("[SystemStatusListener] Failed to get system metrics from JMX", e);
        }
    }

    private void writeSystemStatusLog() {
        StringBuilder sb = new StringBuilder();
        sb.append("Load exceeds the threshold:");
        sb.append("load:").append(String.format("%.4f", currentLoad)).append(";");
        sb.append("cpuUsage:").append(String.format("%.4f", currentCpuUsage)).append(";");
        sb.append("qps:").append(String.format("%.4f", Constants.ENTRY_NODE.passQps())).append(";");
        sb.append("rt:").append(String.format("%.4f", Constants.ENTRY_NODE.avgRt())).append(";");
        sb.append("thread:").append(Constants.ENTRY_NODE.curThreadNum()).append(";");
        sb.append("success:").append(String.format("%.4f", Constants.ENTRY_NODE.successQps())).append(";");
        sb.append("minRt:").append(String.format("%.2f", Constants.ENTRY_NODE.minRt())).append(";");
        sb.append("maxSuccess:").append(String.format("%.2f", Constants.ENTRY_NODE.maxSuccessQps())).append(";");
        RecordLog.info(sb.toString());
    }
}
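
For completeness, a hedged sketch of registering a system rule whose thresholds feed the checks above (all values are illustrative):

    SystemRule rule = new SystemRule();
    rule.setHighestSystemLoad(3.0); // load1 threshold guarding the BBR check
    rule.setHighestCpuUsage(0.8);   // CPU usage in the [0, 1] range
    rule.setAvgRt(100);             // average RT threshold, in ms
    rule.setMaxThread(200);         // max concurrent inbound threads
    rule.setQps(1000);              // total inbound QPS threshold
    SystemRuleManager.loadRules(Collections.singletonList(rule));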

三、Best Practice at JD

3.1 Usage

Sentinel itself is very simple to use, just an annotation, but you must decide how rules are loaded and persisted. The existing options are:

• Use the Sentinel dashboard: dashboard-based integration requires maintaining a management console for configuring rules. For backend-leaning systems, maintaining an extra console is costly; this option suits frameworks that already have a console of their own, such as RPC frameworks.

• Middleware (e.g., ZooKeeper, Nacos, Eureka, Redis): the extension modules in the Sentinel source provide implementations of this kind

Given JD's context, I implemented a Sentinel component with hot rule deployment, similar to the ZooKeeper approach: the rules are stored under a key in DUCC (JD's configuration center), and the first rule load plus listener registration happen when the Spring container starts. The component also handles rule reading, validation, and instantiation of the different rule objects.

The plugin is used via annotation + configuration.

Step 1: import the component

<dependency>
    <groupId>com.jd.ldop.tools</groupId>
    <artifactId>sentinel-tools</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>

Step 2: initialize sentinelProcess

Three ways of supplying rules are supported: DUCC, local file reading, and direct injection.

Two rule types are currently supported, rate limiting and circuit breaking/degradation; the system load protection mode is still to be developed and validated.

<!-- Sentinel-based degradation, rate-limiting, and circuit-breaking component -->
    <bean id="sentinelProcess" class="com.jd.ldop.sentinel.SentinelProcess">
        <property name="ruleResourceWrappers">
            <list>
                <ref bean="degradeRule"/>
            </list>
        </property>
    </bean>

    <!-- Degradation or rate-limiting rule configuration -->
    <bean id="degradeRule" class="com.jd.ldop.sentinel.dto.RuleResourceWrapper">
        <constructor-arg index="0" value="ducc.degradeRule"/>
        <constructor-arg index="1" value="0"/>
        <constructor-arg index="2" value="0"/>
    </bean>

The corresponding DUCC configuration looks like this:

Step 3: define resources and relation types

With @SentinelResource you can define, anywhere, a resource name along with its circuit-breaking/degradation or rate-limiting mode, callback methods, and so on. You can also specify the relation type: direct, related, or a specific chain.

    @Override
    @SentinelResource(value = "modifyGetWaybillState", fallback = "executeDegrade")
    public ExecutionResult<List<Integer>> execute(@NotNull Model imodel) {
        // business logic
    }

    public ExecutionResult<List<Integer>> executeDegrade(@NotNull Model imodel) {
        // degraded business logic
    }

3.2 Application Scenarios

The component supports degradation, rate limiting, and load protection for arbitrary business logic.

四、Sentinel Load-Test Data

4.1 Load-Test Targets

Call volume: 12,000 requests per minute

Application machine memory steady below 50%

Machine spec: 8 cores, 16 GB RAM, 2 x 50 GB disks

Sentinel degradation rule:

count=350: slow-call RT threshold, 350 ms

timeWindow=180: circuit-open window, 180 s

grade=0: degradation mode, slow call

statIntervalMs=60000: statistics window, 1 min
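
Expressed as an equivalent DegradeRule, as a sketch (the resource name is illustrative):

    DegradeRule rule = new DegradeRule("someResource")
        .setGrade(RuleConstant.DEGRADE_GRADE_RT) // grade=0: slow-call (RT) mode
        .setCount(350)                           // slow-call RT threshold: 350 ms
        .setTimeWindow(180)                      // circuit stays open for 180 s
        .setStatIntervalMs(60_000);              // statistics window: 1 min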

4.2 Load-Test Results

Application machine monitoring:

The test ran in two phases, with the component enabled and then disabled: the first half of the charts shows the component enabled, the second half disabled.

Heap analysis of the application process shows the top three Sentinel-related objects are:

com.alibaba.csp.sentinel.node.metric.MetricNode

com.alibaba.csp.sentinel.CtEntry

com.alibaba.csp.sentinel.context.Context

4.3 Load-Test Conclusions

When using the Sentinel component for automatic service degradation or rate limiting, note that Sentinel aggregates data periodically over sliding windows and therefore consumes some machine memory. Configure rules sensibly: choose reasonable statistics windows and avoid creating too many Sentinel resources.

Overall, the Sentinel component has little impact on application CPU and memory.
