本文首发于泊浮目标简书:https://www.jianshu.com/u/204...
版本日期备注
1.02021.10.8文章首发

0. 前言

前阵子笔者波及了些许监控相干的开发工作,在开发过程中也碰到过些许问题,便翻读了FLink相干局部的代码,在读代码的过程中发现了一些好的设计,因而也是写成文章整顿上来。

本文的源码基于FLink1.13.2

1. 扩大插件化

在官网中,FLink社区本人提供了一些已接入的Repoter,如果咱们有本人定制的Reporter,也能够依据它的标准去实现本人的Repoter。

在FLink的代码中,提供了反射机制实例化MetricReporter:要求MetricReporter的实现类必须是public的拜访修饰符,不能是抽象类,必须有一个无参构造函数。

外围代码为RepoterSetup#getAllReporterFactories

    private static Iterator<MetricReporterFactory> getAllReporterFactories(            @Nullable PluginManager pluginManager) {        final Iterator<MetricReporterFactory> factoryIteratorSPI =                ServiceLoader.load(MetricReporterFactory.class).iterator();        final Iterator<MetricReporterFactory> factoryIteratorPlugins =                pluginManager != null                        ? pluginManager.load(MetricReporterFactory.class)                        : Collections.emptyIterator();        return Iterators.concat(factoryIteratorPlugins, factoryIteratorSPI);    }

该代码会通过Java的SPI机制来获取MetricReporter的相干实现类,实质上是通过ClassLoder来获取。

|-- ReporterSetup     \-- fromConfiguration //当集群启动时,会从配置读取监控并初始化相干类         \-- loadAvailableReporterFactories // 加载无效的Reporter们             \-- getAllReporterFactories //  外围代码,通过SPI以及ClassLoader机制获取Repoter们

2. 内置松耦合

上文提到了社区会提供常见的一些监控Repoter。在代码中,实质是工厂模式的实现。

/** * {@link MetricReporter} factory. * * <p>Reporters that can be instantiated with a factory automatically qualify for being loaded as a * plugin, so long as the reporter jar is self-contained (excluding Flink dependencies) and contains * a {@code META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory} file * containing the qualified class name of the factory. * * <p>Reporters that previously relied on reflection for instantiation can use the {@link * InstantiateViaFactory} annotation to redirect reflection-base instantiation attempts to the * factory instead. */public interface MetricReporterFactory {    /**     * Creates a new metric reporter.     *     * @param properties configured properties for the reporter     * @return created metric reporter     */    MetricReporter createMetricReporter(final Properties properties);}

每接入一个监控,只有实现相应的工厂办法即可。目前实现的有:

  • org.apache.flink.metrics.graphite.GraphiteReporterFactory
  • org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
  • org.apache.flink.metrics.prometheus.PrometheusReporter
  • org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
  • org.apache.flink.metrics.statsd.StatsDReporterFactory
  • org.apache.flink.metrics.datadog.DatadogHttpReporterFactory
  • org.apache.flink.metrics.slf4j.Slf4jReporterFactory

每当社区须要接入新的Repoter时,仅仅须要实现MetricReporterFactory 即可,而下层能感知到的也仅仅是MetricReporter ,和任何具体实现无关,这也是典型的一种防腐设计。

3. Fail safe

在流计算业务中,如果监控这种旁路逻辑产生问题,是否应该影响到骨干逻辑呢?答案是不应该的。

MetricRegistryImpl中(顾名思义,它会将所有的Repoter注册进这个类),构造函数会将相干的MetricReporter放到线程池中,定期的让它们上报数据。

|-- MetricRegistryImpl  \-- constructor

WebMonitorEndpoint中,也有线程池的身影。这个类提供了RestAPI来便于查问Metric。对于其余组件的申请通过Akka来异步发送,并通过线程池来解决这些回调的回复。

|-- WebMonitorEndpoint  \-- start    \-- initializeHandlers      \--   new JobConfigHandler|-- AbstractExecutionGraphHandler  \-- handleRequest

这是典型Fail-safe的设计。

4. 不仅只反对Push

在FLink中,监控数据不仅反对Push,同时还实现了Pull,而实现也十分的简略。

MetricQueryService实现了MetricQueryServiceGateway,这意味着它能够被近程调用。

其监控数据来源代码追踪:

|-- AbstractMetricGroup  \-- counter    |-- MetricRegistryImpl      \-- register        |-- MetricQueryService          \-- addMetric

下面提到的WebMonitorEndpoint也是一样,不过是基于RestAPI的实现,同样提供了Pull的策略。

5. 参考资料

  • https://nightlies.apache.org/...
  • https://cwiki.apache.org/conf...