乐趣区

关于grafana:Flink-实时-metrics

Flink 实时 metrics
目前咱们的 flink 工作跑在 yarn 集群上,在面对以下问题时

  1. 常驻实时 job 是否在稳固运行?
  2. 实时数据的解决能力如何?生产过慢?是否须要申请更多资源晋升生产能力?
  3. 实时数据品质牢靠?是否有丢数据的危险?
  4. 实时工作现有的资源是否足够撑持现有的数据量?资源是否闲置节约?

尽管 flink web ui 提供了一些监控信息,然而对开发还是不够敌对,所以咱们利用 flink metrics + prometheus + grafana 搭建了一套实时监控看板,有利于收集 flink 工作的实时状态。

首先介绍下 Flink Metric

  1. Metric Types

    1. Counter: 示意收集的数据是依照某个趋势(减少/缩小)始终变动的
    2. Gauge: 示意收集的数据是一个刹时的值,与工夫没有关系,能够任意变高变低,往往能够用来记录内存使用率、磁盘使用率等。
    3. Histogram: 统计数据的散布状况。
    4. Meter: 度量一系列事件产生的速率 (rate)。
  2. Metric Reporters

    1. Metrics 信息能够通过 flink-conf.yaml 配置,在 job 启动的时候实时上报到内部零碎上。
  3. System Metrics

    1. Flink 外部会预约义一些 Metrics 指标信息,蕴含 CPU,Memory,IO,Thread,Network,JVM GarbageCollection 等信息
  4. User Defined Metrics

    1. 用户能够本人依据本人的业务须要,自定义一些监控指标
val counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter")

Metric 监控搭建

  1. 梳理监控指标
    a. 零碎指标

    1. job 数量的监控

      1. 常驻 job 数量的监控
      2. 及时发现 job 运行过程中的重启,失败问题
    2. 算子音讯解决的 numRecordsIn 和 numRecordsOut

      1. 线图趋势把握工作解决的负载量
      2. 及时发现 job 资源分配是否正当,尽量避免音讯稳定带来的零碎提早增高
    3. 音讯提早监控

      1. Flink 算子之间消息传递的最大,最小,均匀提早。
      2. 及时发现工作音讯的解决效率稳定
    4. 内存,JVM GC 的状态

      1. taskmanager 的内存,GC 状态的线图稳定。
      2. 及时发现零碎中资源的利用率,正当调配集群资源。

    b. 自定义监控指标

    1. Source 端咱们采纳 kafka 作为数据的输出源
      a. 通过监控 kafka consumer group 的 lagOffset 来发现 flow 的数据生产能力是否有升高。
    2. Sink 端咱们本人实现了 clickhouse,hbase,hive,kafka 等多端输入,为了防止 Flink 的流式解决对 Sink 终端造成过大的写入压力,咱们形象了一个批次的 buffer cache,当数据的批次达到了阀值,或者 buffer cache 肯定的工夫距离,就将 buffer cache 内的数据一次性 doFlush 到各端存储, 各个 sink 实例 只需实现 BucketBufferedSink.doFlush 办法

因为 Sink 过程中,可能面临局部 buffer cache 中的数据在 flush 过程中因为某种原因失败而导致数据失落,所以必须要及时发现数据不统一,以便重跑工作复原数据。咱们在 BucketBufferedSink 之上形象了 SinkMetric, 并在 BucketBufferedSink.addBuffer() 做了 sinkPushCounter.inc 埋点计数 BucketBufferedSink.flush() 做了 sinkFlushCounter.inc()

  1. sinkPushCounter 统计进入到 buffercache 的数据条数
  2. sinkFlushCounter 统计 buffercache flush 进来的数据条数 *
  3. 施行搭建监控零碎
    a. 零碎部署图

    b. 在 flink-conf.yml 中 配置 flink metrics reporter,可让 flink 主动的上报 metric 信息

    # metrics configuration
    metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
    metrics.reporter.grph.host: ${host}
    metrics.reporter.grph.port: ${port}
    metrics.reporter.grph.protocol: TCP
    # 运行时指定
    metrics.reporter.grph.prefix="flink.${JOB_NAME}"
    
    metrics.latency.interval: 30000
    1. 通过 flink run -yD metrics.reporter.grph.prefix=”${JOB_NAME}” 的形式可动静指定各个实时工作的监控进行分组。
    2. 通过 metrics.latency.interval: 30000 设置每 30s flink 主动上报算子之间的提早信息。

    c. Graphite-exporter 作为 prometheus 收集零碎的网关,是所有 metric 信息的上报入口

    1. 通过配置 mapping.yml 转化为有 label 维度的 Prometheus 数据,推送给 Prometheus
mappings:
- match: 'flink\.([\w-]+)\.(.*)\.taskmanager\.(\w+)\.Status\.(\w+)\.(\w+)\.([\w-]+)\.(\w+)'
  match_type: regex
  name: flink_taskmanager_Status_${4}_${5}_${6}_${7}
  labels:
    host: $2
    container: $3
    job_name: $1
- match: 'flink\.([\w-]+)\.(.*)\.taskmanager\.(\w+)\.([\w-]+)\.(.+)\.(\d+)\.Shuffle\.Netty\.(.*)'
  match_type: regex
  action: drop
  name: dropped
- match: 'flink\.([\w-]+)\.(.*)\.taskmanager\.(\w+)\.([\w-]+)\.(.+)\.(\d+)\.(.*)\.(Buffers|buffers)\.(.*)$'
  match_type: regex
  action: drop
  name: dropped
- match: 'flink\.([\w-]+)\.(.*)\.taskmanager\.(\w+)\.([\w-]+)\.(.+)\.(\d+)\.([\w]+)\-([\w]+)\.(\w+)'
  match_type: regex
  name: flink_taskmanager_operator_${7}_${9}
  labels:
    host: $2
    container: $3
    job_name: $1
    operator: $5
    task: $6
    custom_metric: $7
    sink_instance: $8
- match: 'flink\.([\w-]+)\.(.*)\.taskmanager\.(\w+)\.([\w-]+)\.(.+)\.(\d+)\.([\w-]+)\.(\w+)'
  match_type: regex
  name: flink_taskmanager_operator_${7}_${8}
  labels:
    host: $2
    container: $3
    job_name: $1
    operator: $5
    task: $6
- match: 'flink\.([\w-]+)\.(.*)\.jobmanager\.Status\.(.*)'
  match_type: regex
  name: flink_jobmanager_Status_$3
  labels:
    host: $2
    job_name: $1
- match: 'flink\.([\w-]+)\.(.*)\.jobmanager.(\w+)$'
  match_type: regex
  name: flink_jobmanager_${3}
  labels:
    host: $2
    job_name: $1
- match: 'flink\.([\w-]+)\.(.*)\.jobmanager.(.*)\.(\w+)'
  match_type: regex
  name: flink_jobmanager_${4}
  labels:
    host: $2
    job_name: $1
- match: 'flink\.([\w-]+)\.(.*)\.jobmanager.(.*)\.(.*)\.(.*)'
  match_type: regex
  name: flink_jobmanager_${4}_${5}
  labels:
    host: $2
    job_name: $1
- match: "."
  match_type: regex
  action: drop
  name: "dropped"

实时监控看板展现

  1. 通过 kafka lag 及时发现数据沉积导致的生产提早。
  2. 通过检测在线运行 Job 数量,及时发现 Job 运行失败的问题。
  3. 通过统计 Source 端 和 Sink 端的音讯处理速度,及时反馈当前任务的解决能力。
  4. 通过音讯的延时指标,发现 Job 的流解决的响应提早。
  5. 通过 Jvm 内存及 GC 状态,正当调配系统资源。
  6. 通过 Sink 算子的 Push to BufferCache 数量与 BufferCache Flush 到各端存储数量的比照,及时发现数据失落问题。

援用文章

  • https://ci.apache.org/project…
  • https://github.com/prometheus…
  • https://prometheus.io/docs/in…
  • https://grafana.com/docs/?plc…
退出移动版