背景
日志零碎接入的日志品种多、格局简单多样,支流的有以下几种日志:
- filebeat 采集到的文本日志,格局多样
- winbeat 采集到的操作系统日志
- 设施上报到 logstash 的 syslog 日志
- 接入到 kafka 的业务日志
以上通过各种渠道接入的日志,存在 2 个次要的问题:
- 格局不对立、不标准、标准化不够
- 如何从各类日志中提取出用户关怀的指标,开掘更多的业务价值
为了解决下面 2 个问题,咱们基于 flink 和 drools 规定引擎做了实时的日志解决服务。
零碎架构
架构比较简单,架构图如下:
各类日志都是通过 kafka 汇总,做日志直达。
flink 生产 kafka 的数据,同时通过 API 调用拉取 drools 规定引擎,对日志做解析解决后,将解析后的数据存储到 Elasticsearch 中,用于日志的搜寻和剖析等业务。
为了监控日志解析的实时状态,flink 会将日志解决的统计数据,如每分钟解决的日志量,每种日志从各个机器 IP 来的日志量写到 Redis 中,用于监控统计。
模块介绍
零碎我的项目命名为 eagle。
eagle-api:基于 springboot,作为 drools 规定引擎的写入和读取 API 服务。
eagle-common:通用类模块。
eagle-log:基于 flink 的日志解决服务。
重点讲一下 eagle-log:
对接 kafka、ES 和 Redis
对接 kafka 和 ES 都比较简单,用的官网的 connector(flink-connector-kafka-0.10 和 flink-connector-elasticsearch6),详见代码。
对接 Redis,最开始用的是 org.apache.bahir 提供的 redis connector,起初发现灵便度不够,就应用了 Jedis。
在将统计数据写入 redis 的时候,大数据培训最开始用的 keyby 分组后缓存了分组数据,在 sink 中做统计解决后写入,参考代码如下:
String name = "redis-agg-log";
DataStream<Tuple2<String, List<LogEntry>>> keyedStream = dataSource.keyBy((KeySelector<LogEntry, String>) log -> log.getIndex())
.timeWindow(Time.seconds(windowTime)).trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime))
.process(new ProcessWindowFunction<LogEntry, Tuple2<String, List<LogEntry>>, String, TimeWindow>() {
@Override
public void process(String s, Context context, Iterable<LogEntry> iterable, Collector<Tuple2<String, List<LogEntry>>> collector) {ArrayList<LogEntry> logs = Lists.newArrayList(iterable);
if (logs.size() > 0) {collector.collect(new Tuple2(s, logs));
}
}
}).setParallelism(redisSinkParallelism).name(name).uid(name);
起初发现这样做对内存耗费比拟大,其实不须要缓存整个分组的原始数据,只须要一个统计数据就 OK 了,优化后:
String name = "redis-agg-log";
DataStream<LogStatWindowResult> keyedStream = dataSource.keyBy((KeySelector<LogEntry, String>) log -> log.getIndex())
.timeWindow(Time.seconds(windowTime))
.trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime))
.aggregate(new LogStatAggregateFunction(), new LogStatWindowFunction())
.setParallelism(redisSinkParallelism).name(name).uid(name);
这里应用了 flink 的聚合函数和 Accumulator,通过 flink 的 agg 操作做统计,加重了内存耗费的压力。
应用 broadcast 播送 drools 规定引擎
1、drools 规定流通过 broadcast map state 播送进来。
2、kafka 的数据流 connect 规定流解决日志。
// 播送规定流
env.addSource(new RuleSourceFunction(ruleUrl)).name(ruleName).uid(ruleName).setParallelism(1)
.broadcast(ruleStateDescriptor);
//kafka 数据流
FlinkKafkaConsumer010<LogEntry> source = new FlinkKafkaConsumer010<>(kafkaTopic, new LogSchema(), properties);env.addSource(source).name(kafkaTopic).uid(kafkaTopic).setParallelism(kafkaParallelism);
// 数据流 connect 规定流解决日志
BroadcastConnectedStream<LogEntry, RuleBase> connectedStreams = dataSource.connect(ruleSource);
connectedStreams.process(new LogProcessFunction(ruleStateDescriptor, ruleBase)).setParallelism(processParallelism).name(name).uid(name);
具体细节参考开源代码。
小结
本零碎提供了一个基于 flink 的实时数据处理参考,对接了 kafka、redis 和 elasticsearch,通过可配置的 drools 规定引擎,将数据处理逻辑配置化和动态化。
对于解决后的数据,也能够对接到其余 Fink,为其余各类业务平台提供数据的解析、荡涤和标准化服务。