共计 3980 个字符,预计需要花费 10 分钟才能阅读完成。
Flink 实时 metrics
目前咱们的 flink 工作跑在 yarn 集群上,在面对以下问题时
- 常驻实时 job 是否在稳固运行?
- 实时数据的解决能力如何?生产过慢?是否须要申请更多资源晋升生产能力?
- 实时数据品质牢靠?是否有丢数据的危险?
- 实时工作现有的资源是否足够撑持现有的数据量?资源是否闲置节约?
尽管 flink web ui 提供了一些监控信息,然而对开发还是不够敌对,所以咱们利用 flink metrics + prometheus + grafana 搭建了一套实时监控看板,有利于收集 flink 工作的实时状态。
首先介绍下 Flink Metric
-
Metric Types
- Counter: 示意收集的数据是依照某个趋势(减少/缩小)始终变动的
- Gauge: 示意收集的数据是一个刹时的值,与工夫没有关系,能够任意变高变低,往往能够用来记录内存使用率、磁盘使用率等。
- Histogram: 统计数据的散布状况。
- Meter: 度量一系列事件产生的速率 (rate)。
-
Metric Reporters
- Metrics 信息能够通过 flink-conf.yaml 配置,在 job 启动的时候实时上报到内部零碎上。
-
System Metrics
- Flink 外部会预约义一些 Metrics 指标信息,蕴含 CPU,Memory,IO,Thread,Network,JVM GarbageCollection 等信息
-
User Defined Metrics
- 用户能够本人依据本人的业务须要,自定义一些监控指标
val counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter")
Metric 监控搭建
-
梳理监控指标
a. 零碎指标-
job 数量的监控
- 常驻 job 数量的监控
- 及时发现 job 运行过程中的重启,失败问题
-
算子音讯解决的 numRecordsIn 和 numRecordsOut
- 线图趋势把握工作解决的负载量
- 及时发现 job 资源分配是否正当,尽量避免音讯稳定带来的零碎提早增高
-
音讯提早监控
- Flink 算子之间消息传递的最大,最小,均匀提早。
- 及时发现工作音讯的解决效率稳定
-
内存,JVM GC 的状态
- taskmanager 的内存,GC 状态的线图稳定。
- 及时发现零碎中资源的利用率,正当调配集群资源。
b. 自定义监控指标
- Source 端咱们采纳 kafka 作为数据的输出源
a. 通过监控 kafka consumer group 的 lagOffset 来发现 flow 的数据生产能力是否有升高。 - Sink 端咱们本人实现了 clickhouse,hbase,hive,kafka 等多端输入,为了防止 Flink 的流式解决对 Sink 终端造成过大的写入压力,咱们形象了一个批次的 buffer cache,当数据的批次达到了阀值,或者 buffer cache 肯定的工夫距离,就将 buffer cache 内的数据一次性 doFlush 到各端存储, 各个 sink 实例 只需实现 BucketBufferedSink.doFlush 办法
-
因为 Sink 过程中,可能面临局部 buffer cache 中的数据在 flush 过程中因为某种原因失败而导致数据失落,所以必须要及时发现数据不统一,以便重跑工作复原数据。咱们在 BucketBufferedSink 之上形象了 SinkMetric, 并在 BucketBufferedSink.addBuffer() 做了 sinkPushCounter.inc 埋点计数 BucketBufferedSink.flush() 做了 sinkFlushCounter.inc()
- sinkPushCounter 统计进入到 buffercache 的数据条数
- sinkFlushCounter 统计 buffercache flush 进来的数据条数 *
-
施行搭建监控零碎
a. 零碎部署图
b. 在 flink-conf.yml 中 配置 flink metrics reporter,可让 flink 主动的上报 metric 信息# metrics configuration metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter metrics.reporter.grph.host: ${host} metrics.reporter.grph.port: ${port} metrics.reporter.grph.protocol: TCP # 运行时指定 metrics.reporter.grph.prefix="flink.${JOB_NAME}" metrics.latency.interval: 30000
- 通过 flink run -yD metrics.reporter.grph.prefix=”${JOB_NAME}” 的形式可动静指定各个实时工作的监控进行分组。
- 通过 metrics.latency.interval: 30000 设置每 30s flink 主动上报算子之间的提早信息。
c. Graphite-exporter 作为 prometheus 收集零碎的网关,是所有 metric 信息的上报入口
- 通过配置 mapping.yml 转化为有 label 维度的 Prometheus 数据,推送给 Prometheus
mappings:
- match: 'flink\.([\w-]+)\.(.*)\.taskmanager\.(\w+)\.Status\.(\w+)\.(\w+)\.([\w-]+)\.(\w+)'
match_type: regex
name: flink_taskmanager_Status_${4}_${5}_${6}_${7}
labels:
host: $2
container: $3
job_name: $1
- match: 'flink\.([\w-]+)\.(.*)\.taskmanager\.(\w+)\.([\w-]+)\.(.+)\.(\d+)\.Shuffle\.Netty\.(.*)'
match_type: regex
action: drop
name: dropped
- match: 'flink\.([\w-]+)\.(.*)\.taskmanager\.(\w+)\.([\w-]+)\.(.+)\.(\d+)\.(.*)\.(Buffers|buffers)\.(.*)$'
match_type: regex
action: drop
name: dropped
- match: 'flink\.([\w-]+)\.(.*)\.taskmanager\.(\w+)\.([\w-]+)\.(.+)\.(\d+)\.([\w]+)\-([\w]+)\.(\w+)'
match_type: regex
name: flink_taskmanager_operator_${7}_${9}
labels:
host: $2
container: $3
job_name: $1
operator: $5
task: $6
custom_metric: $7
sink_instance: $8
- match: 'flink\.([\w-]+)\.(.*)\.taskmanager\.(\w+)\.([\w-]+)\.(.+)\.(\d+)\.([\w-]+)\.(\w+)'
match_type: regex
name: flink_taskmanager_operator_${7}_${8}
labels:
host: $2
container: $3
job_name: $1
operator: $5
task: $6
- match: 'flink\.([\w-]+)\.(.*)\.jobmanager\.Status\.(.*)'
match_type: regex
name: flink_jobmanager_Status_$3
labels:
host: $2
job_name: $1
- match: 'flink\.([\w-]+)\.(.*)\.jobmanager.(\w+)$'
match_type: regex
name: flink_jobmanager_${3}
labels:
host: $2
job_name: $1
- match: 'flink\.([\w-]+)\.(.*)\.jobmanager.(.*)\.(\w+)'
match_type: regex
name: flink_jobmanager_${4}
labels:
host: $2
job_name: $1
- match: 'flink\.([\w-]+)\.(.*)\.jobmanager.(.*)\.(.*)\.(.*)'
match_type: regex
name: flink_jobmanager_${4}_${5}
labels:
host: $2
job_name: $1
- match: "."
match_type: regex
action: drop
name: "dropped"
实时监控看板展现
- 通过 kafka lag 及时发现数据沉积导致的生产提早。
- 通过检测在线运行 Job 数量,及时发现 Job 运行失败的问题。
- 通过统计 Source 端 和 Sink 端的音讯处理速度,及时反馈当前任务的解决能力。
- 通过音讯的延时指标,发现 Job 的流解决的响应提早。
- 通过 Jvm 内存及 GC 状态,正当调配系统资源。
- 通过 Sink 算子的 Push to BufferCache 数量与 BufferCache Flush 到各端存储数量的比照,及时发现数据失落问题。
援用文章
- https://ci.apache.org/project…
- https://github.com/prometheus…
- https://prometheus.io/docs/in…
- https://grafana.com/docs/?plc…