关于Flink:基于-Flink-和-Drools-的实时日志处理

4次阅读

共计 2747 个字符,预计需要花费 7 分钟才能阅读完成。

背景

日志零碎接入的日志品种多、格局简单多样,支流的有以下几种日志:

  • filebeat 采集到的文本日志,格局多样
  • winbeat 采集到的操作系统日志
  • 设施上报到 logstash 的 syslog 日志
  • 接入到 kafka 的业务日志

以上通过各种渠道接入的日志,存在 2 个次要的问题:

  • 格局不对立、不标准、标准化不够
  • 如何从各类日志中提取出用户关怀的指标,开掘更多的业务价值

为了解决下面 2 个问题,咱们基于 flink 和 drools 规定引擎做了实时的日志解决服务。

零碎架构

架构比较简单,架构图如下:

各类日志都是通过 kafka 汇总,做日志直达。

flink 生产 kafka 的数据,同时通过 API 调用拉取 drools 规定引擎,对日志做解析解决后,将解析后的数据存储到 Elasticsearch 中,用于日志的搜寻和剖析等业务。

为了监控日志解析的实时状态,flink 会将日志解决的统计数据,如每分钟解决的日志量,每种日志从各个机器 IP 来的日志量写到 Redis 中,用于监控统计。

模块介绍

零碎我的项目命名为 eagle。

eagle-api:基于 springboot,作为 drools 规定引擎的写入和读取 API 服务。

eagle-common:通用类模块。

eagle-log:基于 flink 的日志解决服务。

重点讲一下 eagle-log:

对接 kafka、ES 和 Redis

对接 kafka 和 ES 都比较简单,用的官网的 connector(flink-connector-kafka-0.10 和 flink-connector-elasticsearch6),详见代码。

对接 Redis,最开始用的是 org.apache.bahir 提供的 redis connector,起初发现灵便度不够,就应用了 Jedis。

在将统计数据写入 redis 的时候,大数据培训最开始用的 keyby 分组后缓存了分组数据,在 sink 中做统计解决后写入,参考代码如下:

String name = "redis-agg-log";
        DataStream<Tuple2<String, List<LogEntry>>> keyedStream = dataSource.keyBy((KeySelector<LogEntry, String>) log -> log.getIndex())
                .timeWindow(Time.seconds(windowTime)).trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime))
                .process(new ProcessWindowFunction<LogEntry, Tuple2<String, List<LogEntry>>, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<LogEntry> iterable, Collector<Tuple2<String, List<LogEntry>>> collector) {ArrayList<LogEntry> logs = Lists.newArrayList(iterable);
                        if (logs.size() > 0) {collector.collect(new Tuple2(s, logs));
                        }
                    }
                }).setParallelism(redisSinkParallelism).name(name).uid(name);

起初发现这样做对内存耗费比拟大,其实不须要缓存整个分组的原始数据,只须要一个统计数据就 OK 了,优化后:

String name = "redis-agg-log";
        DataStream<LogStatWindowResult> keyedStream = dataSource.keyBy((KeySelector<LogEntry, String>) log -> log.getIndex())
                .timeWindow(Time.seconds(windowTime))
                .trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime))
                .aggregate(new LogStatAggregateFunction(), new LogStatWindowFunction())
                .setParallelism(redisSinkParallelism).name(name).uid(name);

这里应用了 flink 的聚合函数和 Accumulator,通过 flink 的 agg 操作做统计,加重了内存耗费的压力。

应用 broadcast 播送 drools 规定引擎

1、drools 规定流通过 broadcast map state 播送进来。

2、kafka 的数据流 connect 规定流解决日志。

// 播送规定流
env.addSource(new RuleSourceFunction(ruleUrl)).name(ruleName).uid(ruleName).setParallelism(1)
                .broadcast(ruleStateDescriptor);

//kafka 数据流
FlinkKafkaConsumer010<LogEntry> source = new FlinkKafkaConsumer010<>(kafkaTopic, new LogSchema(), properties);env.addSource(source).name(kafkaTopic).uid(kafkaTopic).setParallelism(kafkaParallelism);

// 数据流 connect 规定流解决日志
BroadcastConnectedStream<LogEntry, RuleBase> connectedStreams = dataSource.connect(ruleSource);
connectedStreams.process(new LogProcessFunction(ruleStateDescriptor, ruleBase)).setParallelism(processParallelism).name(name).uid(name);

具体细节参考开源代码。

小结
本零碎提供了一个基于 flink 的实时数据处理参考,对接了 kafka、redis 和 elasticsearch,通过可配置的 drools 规定引擎,将数据处理逻辑配置化和动态化。

对于解决后的数据,也能够对接到其余 Fink,为其余各类业务平台提供数据的解析、荡涤和标准化服务。

正文完
 0