背景

日志零碎接入的日志品种多、格局简单多样,支流的有以下几种日志:

  • filebeat采集到的文本日志,格局多样
  • winbeat采集到的操作系统日志
  • 设施上报到logstash的syslog日志
  • 接入到kafka的业务日志

以上通过各种渠道接入的日志,存在2个次要的问题:

  • 格局不对立、不标准、标准化不够
  • 如何从各类日志中提取出用户关怀的指标,开掘更多的业务价值

为了解决下面2个问题,咱们基于flink和drools规定引擎做了实时的日志解决服务。

零碎架构

架构比较简单,架构图如下:

各类日志都是通过kafka汇总,做日志直达。

flink生产kafka的数据,同时通过API调用拉取drools规定引擎,对日志做解析解决后,将解析后的数据存储到Elasticsearch中,用于日志的搜寻和剖析等业务。

为了监控日志解析的实时状态,flink会将日志解决的统计数据,如每分钟解决的日志量,每种日志从各个机器IP来的日志量写到Redis中,用于监控统计。

模块介绍

零碎我的项目命名为eagle。

eagle-api:基于springboot,作为drools规定引擎的写入和读取API服务。

eagle-common:通用类模块。

eagle-log:基于flink的日志解决服务。

重点讲一下eagle-log:

对接kafka、ES和Redis

对接kafka和ES都比较简单,用的官网的connector(flink-connector-kafka-0.10和flink-connector-elasticsearch6),详见代码。

对接Redis,最开始用的是org.apache.bahir提供的redis connector,起初发现灵便度不够,就应用了Jedis。

在将统计数据写入redis的时候,大数据培训最开始用的keyby分组后缓存了分组数据,在sink中做统计解决后写入,参考代码如下:

String name = "redis-agg-log";        DataStream<Tuple2<String, List<LogEntry>>> keyedStream = dataSource.keyBy((KeySelector<LogEntry, String>) log -> log.getIndex())                .timeWindow(Time.seconds(windowTime)).trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime))                .process(new ProcessWindowFunction<LogEntry, Tuple2<String, List<LogEntry>>, String, TimeWindow>() {                    @Override                    public void process(String s, Context context, Iterable<LogEntry> iterable, Collector<Tuple2<String, List<LogEntry>>> collector) {                        ArrayList<LogEntry> logs = Lists.newArrayList(iterable);                        if (logs.size() > 0) {                            collector.collect(new Tuple2(s, logs));                        }                    }                }).setParallelism(redisSinkParallelism).name(name).uid(name);

起初发现这样做对内存耗费比拟大,其实不须要缓存整个分组的原始数据,只须要一个统计数据就OK了,优化后:

String name = "redis-agg-log";        DataStream<LogStatWindowResult> keyedStream = dataSource.keyBy((KeySelector<LogEntry, String>) log -> log.getIndex())                .timeWindow(Time.seconds(windowTime))                .trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime))                .aggregate(new LogStatAggregateFunction(), new LogStatWindowFunction())                .setParallelism(redisSinkParallelism).name(name).uid(name);

这里应用了flink的聚合函数和Accumulator,通过flink的agg操作做统计,加重了内存耗费的压力。

应用broadcast播送drools规定引擎

1、drools规定流通过broadcast map state播送进来。

2、kafka的数据流connect规定流解决日志。

//播送规定流env.addSource(new RuleSourceFunction(ruleUrl)).name(ruleName).uid(ruleName).setParallelism(1)                .broadcast(ruleStateDescriptor);//kafka数据流FlinkKafkaConsumer010<LogEntry> source = new FlinkKafkaConsumer010<>(kafkaTopic, new LogSchema(), properties);env.addSource(source).name(kafkaTopic).uid(kafkaTopic).setParallelism(kafkaParallelism);//数据流connect规定流解决日志BroadcastConnectedStream<LogEntry, RuleBase> connectedStreams = dataSource.connect(ruleSource);connectedStreams.process(new LogProcessFunction(ruleStateDescriptor, ruleBase)).setParallelism(processParallelism).name(name).uid(name);

具体细节参考开源代码。

小结
本零碎提供了一个基于flink的实时数据处理参考,对接了kafka、redis和elasticsearch,通过可配置的drools规定引擎,将数据处理逻辑配置化和动态化。

对于解决后的数据,也能够对接到其余Fink,为其余各类业务平台提供数据的解析、荡涤和标准化服务。