乐趣区

阿里sentinel源码研究深入

1. 阿里 sentinel 源码研究深入

1.1. 前言

  • 昨天已经把 sentinel 成功部署到线上环境,可参考我上篇博文,该走的坑也都走了一遍,已经可以初步使用它的限流和降级功能,根据我目前的实践,限流和降级规则似乎不能一同起效,还不知道原因,下面继续探索

1.2. 源码

1.2.1. 流控降级监控等的构建

  • 首先客户端而言,我关注的是我写的代码SphU.entry,这明显是很关键的方法,下图的内容就是这里构建的

-Sentinel 工作主流程就包含在上面一个方法里,通过链式调用的方式,经过了建立树状结构,保存统计簇点,异常日志记录,实时数据统计,负载保护,权限认证,流量控制,熔断降级等 Slot

  • 进入链式方法的入口为 CtSph 类,try 方法大括号内
Entry e = new CtEntry(resourceWrapper, chain, context);
        try {chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }

1.2.2. 修改控制台规则是如何通知客户端的?

  • 看 sentinel-transport-simple-http 包中的 HttpEventTask 类,它开启了一个线程,转么用来做为 socket 连接,控制台通过 socket 请求通知客户端,从而更新客户端规则,更改规则核心代码如下
// Find the matching command handler.
            CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
            if (commandHandler != null) {CommandResponse<?> response = commandHandler.handle(request);
                handleResponse(response, printWriter, outputStream);
            } else {
                // No matching command handler.
                badRequest(printWriter, "Unknown command `" + commandName + '`');
            }

通过命令模式,commandName 为 setRules 时,更新规则

1.2.3. 既然它建立连接用的 socket,为什么不用 netty 呢?

  • 带着这个疑问,我本想在 issues 里找下,突然发现它的源码中有个 sentinel-transport-netty-http 这个包和 sentinel-transport-simple-http 处于同级,官方的例子用的 simple-http,但明显它也准备了 netty-http,于是我替换成了 netty-http,运行后效果和原先一样,至于效率上有没有提升,我就不清楚了 ^_^

1.2.4. 流量规则如何检查?

  • 该规则检查类为FlowRuleChecker,在 core 核心包中,核心检查方法如下
    private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        if (selectedNode == null) {return true;}

        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }

1.2.5. 熔断降级如何判断?

  • 判断类为DegradeRuleManager,在 core 核心包,核心内容如下,再深入就是它判断的算法了,感兴趣的自己去看如下的passCheck
    public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count)
        throws BlockException {Set<DegradeRule> rules = degradeRules.get(resource.getName());
        if (rules == null) {return;}

        for (DegradeRule rule : rules) {if (!rule.passCheck(context, node, count)) {throw new DegradeException(rule.getLimitApp(), rule);
            }
        }
    }

1.2.6. 默认的链条构建在哪?

  • 核心类为DefaultSlotChainBuilder,构建了如下的 slot
public class DefaultSlotChainBuilder implements SlotChainBuilder {

    @Override
    public ProcessorSlotChain build() {ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new FlowSlot());
        chain.addLast(new DegradeSlot());

        return chain;
    }

}

1.2.7. 既然已经知道了它是如何构建链式的处理节点的,我们是否何可自己重新构建?

  • 发现类 SlotChainProvider 中的构建方法如下
private static void resolveSlotChainBuilder() {List<SlotChainBuilder> list = new ArrayList<SlotChainBuilder>();
        boolean hasOther = false;
        for (SlotChainBuilder builder : LOADER) {if (builder.getClass() != DefaultSlotChainBuilder.class) {
                hasOther = true;
                list.add(builder);
            }
        }
        if (hasOther) {builder = list.get(0);
        } else {
            // No custom builder, using default.
            builder = new DefaultSlotChainBuilder();}

        RecordLog.info("[SlotChainProvider] Global slot chain builder resolved:"
            + builder.getClass().getCanonicalName());
    }
  • 也就是说,我们如果在 LOADER 中加入了其他的非默认实现就可以替代原来的 DefaultSlotChainBuilder,那LOADER 怎么来的?看代码,如下的全局变量,也就是需要自定义实现 SlotChainBuilder 接口的实现类
private static final ServiceLoader<SlotChainBuilder> LOADER = ServiceLoader.load(SlotChainBuilder.class);

1.2.8. 如何实现 SlotChainBuilder 接口呢?

  • 这里要注意的是它使用了ServiceLoader,也就是SPI,全称Service Provider Interface,加载它需要特定的配合,比如我自定义实现一个Slot
/**
 * @author laoliangliang
 * @date 2019/7/25 14:13
 */
public class MySlotChainBuilder implements SlotChainBuilder {
    @Override
    public ProcessorSlotChain build() {ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new FlowSlot());
        chain.addLast(new DegradeSlot());
        // 自定义的
        chain.addLast(new CarerSlot());

        return chain;
    }
}
/**
 * @author laoliangliang
 * @date 2019/7/25 14:15
 */
@Slf4j
public class CarerSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {log.info(JSON.toJSONString(resourceWrapper));
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {fireExit(context, resourceWrapper, count, args);
    }
}
  • 这里我自定义了 CarerSlot,那是否能被加载到呢?事实上还不够,需要在META-INF/services/com.alibaba.csp.sentinel.slotchain.SlotChainBuilder 建这样一个文件,内容如下

  • 好了,这样配置过后,它就能读到我们自定义的实现类代替它原先的类了

1.2.9. 该命令模式最初的初始化阶段在哪?

  • 用过 sentinel 的都会感受到,只有当有第一个 sentinel 监控的请求过来时,sentinel 客户端才会正式初始化,这样看来,这个初始化步骤应该在哪呢?
  • 我通过不断反向跟踪上述的命令模式最初的初始化,找到了最初初始化的地方如下
public class Env {public static final Sph sph = new CtSph();

    static {
        // If init fails, the process will exit.
        InitExecutor.doInit();}

}
  • 有没有觉得很熟悉?doInit 就是很多初始化的起点,当 Env 被调用时会运行 static 代码块,那么只有可能是 sph 被调用时
  • 只要你 debug 过我上述第一条 SphU.entry 的源码,就会发现,如下,该方法一进入不就是先获取 Env 的 sph,再调用的 entry 吗,所以初始化的地方也就找到了,第一次调用 SphU.entry 的地方,或者你不用这个,使用的注解,里面同样有这个方法
    public static Entry entry(String name) throws BlockException {return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
    }

1.2.10. 注解是如何实现熔断降级的?

  • 这个其实是比较容易理解的,既然通过 SphU.entry 包裹可以实现熔断降级,通过注解的形式包裹代码方法应该是比较容易的,那么在哪里实现和配置的呢
  • 看过我前一篇文章的应该看到了,有存在如下配置
    @Bean
    public SentinelResourceAspect sentinelResourceAspect() {pushlish();
        return new SentinelResourceAspect();}
  • 很明显的注解切面,通过 spring 注解的形式注入,我觉得这还是比较优雅的注入方式了,点进入就可以看到如下
    @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
    public void sentinelResourceAnnotationPointcut() {}

@SentinelResource 注解进行了处理

1.2.11. 什么是直接失败?

  • 这个很好理解,qps 超过设置的值,直接失败

1.2.12. 什么是排队等待?

  • 这个似乎看字面意思很好理解,但是一旦你点了这个选项,下面还有个参数的

  • 所以这个排队等待是有超时时间的,达到峰值后匀速通过,采用的漏桶算法,流控图

1.2.13. 什么是慢启动模式?

  • 以下是核心算法,Warm Up模式不看算法细节,看它的中文说明应该就能理解是怎么回事了吧;所谓慢启动模式,要求系统的 QPS 请求增速不能超过一定的速率,否则会被压制超过部分请求失败,应该是为了避免一启动就有大流量的请求进入导致系统一下子就宕机卡主或直接进入了熔断
    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {long passQps = (long) node.passQps();

        long previousQps = (long) node.previousPassQps();
        syncToken(previousQps);

        // 开始计算它的斜率
        // 如果进入了警戒线,开始调整他的 qps
        long restToken = storedTokens.get();
        if (restToken >= warningToken) {
            long aboveToken = restToken - warningToken;
            // 消耗的速度要比 warning 快,但是要比慢
            // current interval = restToken*slope+1/count
            double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            if (passQps + acquireCount <= warningQps) {return true;}
        } else {if (passQps + acquireCount <= count) {return true;}
        }

        return false;
    }
  • 配置如下时,测试流控

  • 流控图

1.2.14. 模式总结

  • 你会发现直接失败和排队等待的区别在流控图上并不明显,那差别在哪呢?我重庆给个请求参数,5 秒内模拟 100 个人轮流请求 10 次

  • sentinel 控制台设置

  • 流控图

  • 总结:我设置了超时时间是 5 秒,而 100 个线程 10 次轮询也就是 1000 个请求,可以看出,它并不是一定要在 5 秒内解决这些请求,有了延时后,代表只要响应时间在 5 秒以内,不管多少请求都不会拒绝;
  • 几个模式有利有弊,默认的快速失败使我们可以最大程度的控制系统的 QPS,避免造成系统压力过大,但同时可能造成用于的体验效果变差
  • 慢启动上面说过了
  • 排队等待在设置合理的超时时间后可以最大程度的避免求情的失败,但同时可能造成线程压力过大
  • 综上,在我看来排队等待模式是比较适合线上运行的,只是需要设置合理的超时时间,大公司机器不愁那就设小点,业界一般标准是 200ms 用户无感知,中小型可以设 500ms 甚至更大,看机器情况动态调整了

1.2.15. 提醒

  • 像我是用 apollo 来持久化规则的,你也可以用 nacos,redis,zookeeper 等,当控制台未启动时,你启动客户端 规则也会生效,只是没了控制台实时监控数据

今日视频:微服务架构实战 160 讲


欢迎关注公众号,一起学习进步

退出移动版