乐趣区

基于可视化配置的日志结构化转换实现

导读:数据总线 DBus 的总体架构中主要包括六大模块,分别是:日志抓取模块、增量转换模块、全量抽取程序、日志算子处理模块、心跳监控模块、Web 管理模块。六大模块各自的功能相互连接,构成 DBus 的工作原理:通过读取 RDBMS 增量日志的方式来实时获取增量数据日志(支持全量拉取);基于 Logstash,flume,filebeat 等抓取工具来实时获得数据,以可视化的方式对数据进行结构化输出。本文主要介绍的是 DBus 中基于可视化配置的日志结构化转换实现的部分。

一、结构化日志的原理

1.1 源端日志抓取

DBus 可以对接多种 log 数据源,例如:Logstash、Flume、Filebeat 等。上述组件都是业界比较流行的日志抓取工具,一方面便于用户和业界统一标准,方便用户技术方案的整合;另一方面也避免了无谓的重复造轮子。抓取的数据我们称为原始数据日志(raw data log),由抓取组件将其写入 Kafka 中,等待 DBus 后续处理。

1.2 可视化配置规则,使日志结构化

用户可自定义配置日志源和目标端。同一个日志源的数据可以输出到多个目标端。每一条“日志源 - 目标端”线,用户可以根据自己的需要来配置相应的过滤规则。经过规则算子处理后的日志是结构化的,即:有 schema 约束,类似于数据库中的表。

1.3 规则算子

DBus 设计了丰富易用的算子,用于对数据进行定制化操作。用户对数据的处理可分为多个步骤进行,每个步骤的数据处理结果可即时查看、验证;并且可重复使用不同算子,直到转换、裁剪出自己需要的数据。

1.4 执行引擎

将配置好的规则算子组应用到执行引擎中,对目标日志数据进行预处理,形成结构化数据,输出到 Kafka,供下游数据使用方使用。系统流程图如下所示:

根据 DBus log 设计原则,同一条原始日志,可以被提取到一个或多个表中。每个表是结构化的,满足相同的 schema 约束。

  • 每个表是一个规则算子组的集合,每个表可以拥有 1 个或多个规则算子组;
  • 每个规则算子组,由一组规则算子组合而成,每个算子具有独立性;

对于任意一条原始数据日志(raw data log),它应该属于哪张表呢?

假如用户定义了若干张逻辑表(T1,T2…),用于抽取不同类型的日志,那么,每条日志需要与规则算子组进行匹配:

  • 进入某张表 T1 的所有规则算子组的执行过程
  • 符合条件的进入规则算子组,并且被执行引擎转换为结构化的表数据
  • 不符合提取条件的日志尝试下一个规则算子组
  • 对于 T1 的所有规则算子组,如果都不满足要求,则进入下一张表 T2 的执行过程,以此类推
  • 如果该条日志不符合任何一张表的过滤规则,则进入_unknown_table_表

例如,对于同一条应用日志,其可能属于不止一个规则组或 Table,而在我们定义的规则组或 Table 中,只要其满足过滤条件,该应用日志就可以被规则组提取,即保证了同一条应用日志可以同属于不同的规则组或 Table。

规则算子是对数据进行过滤、加工、转换的基本单元。常见的规则算子如上图所示。

算子之间具有独立性,算子之间可以任意组合使用,从而可以实现许多复杂的、高级的功能,通过对算子进行迭代使用,最终可以实现对任意数据进行加工的目的。用户可以开发自定义算子,算子的开发非常容易,用户只要遵循基本接口原则,就可以开发任意的算子。

二、DBus 日志处理实例

以 DBus 集群环境为例,DBus 集群中有两台机器(即 master-slave)部署了心跳程序,用于监控、统计、预警等,心跳程序会产生一些应用日志,这些应用日志中包含各类事件信息,假如我们想要对这些日志进行分类处理并结构化到数据库中,我们就可以采用 DBus log 程序对日志进行处理。

DBus 可以接入多种数据源(Logstash、Flume、Filebeat 等),此处以 Logstash 为例来说明如何接入 DBus 的监控和报警日志数据。

由于在 dbus-n2 和 dbus-n3 两台机器上分别存在监控和预警日志,为此我们分别在两台机器上部署了 Logstash 程序。心跳数据由 Logstash 自带的心跳插件产生,其作用是便于 DBus 对数据进行统计和输出,以及对源端日志抽取端(此处为 Logstash)进行预警(对于 Flume 和 Filebeat 来说,因为它们没有心跳插件,所以需要额外为其定时产生心跳数据)。Logstash 程序写入到 Kafka 中的数据中既有普通格式的数据,同时也有心跳数据。

这里不只是局限于 2 台部署有 Logstash 程序的机器,DBus 对 Logstash 数量不做限制,比如应用日志分布在几十上百台机器上,只需要在每台机器上部署 Logstash 程序,并将数据统一抽取到同一个 Kafka Topic 中,DBus 就能够对所有主机的数据进行数据处理、监控、预警、统计等。

2.1 启动 Logstash

在启动 Logstash 程序后,我们就可以从 topic : heartbeat_log_logstash 中读取数据,数据样例如下:

1) 心跳数据

2) 普通日志数据

2.2 配置规则

接下来,我们只需要在 DBus Web 中配置相应的规则就可以对数据进行处理了。

首先新建一个逻辑表 sink_info_table,该表用来抽取 sink 事件的日志信息,然后配置该表的规则组(一个或多个,但所有的规则组过滤后的数据需要满足相同 schema 特性),heartbeat_log_logstash 作为原始数据 topic,我们可以实时的对数据进行可视化操作配置(所见即所得,即席验证)。

1) 读取原始数据日志

可以看到由 Logstash 预先提取已经包含了 log4j 的基本信息,例如 path、@timestamp、level 等。但是数据日志的详细信息在字段 log 中。由于不同的数据日志输出是不一样的,因此可以看到 log 列数据是不同的。

2) 提取感兴趣的列

假如我们对 timestamp、log 等原始信息感兴趣,那么可以添加一个 toIndex 算子,来提取这些字段:

这里需要指出,我们考虑使用数组下标方式,是有原因的:

  • 并不是所有列本身自带列名(例如 flume 抽取的原始数据,或者 split 算子处理后的数据列);
  • 下标方式可以使用数组方式指定列(类似 python 方式,例如:1:3 表示 1,2 列);

因此后续操作全部基于数组下标方式访问。

执行规则,就可以看到被提取后的字段情况:

3) 过滤需要的数据

在这个例子中,我们只对含有“Sink to influxdb OK!”的数据感兴趣。因此添加一个 filter 算子,提取第 7 列中包含”Sink to influxdb OK!”内容的行数据:

执行后,只有符合条件的日志行数据才会存在。

4) 对特定列进行提取

添加一个 select 算子,我们对第 1 和 3 列的内容感兴趣,所以对这两列进行提取。

执行 select 算子,数据中就会只含有第 1 和 3 列了。

5) 以正则表达式的方式处理数据

我们想从第 1 列的数据中提取符合特定正则表达式的值,使用 regexExtract 算子对数据进行过滤。正则表达式如下:http_code=(d*).*type=(.*),ds=(.*),schema=(.*),table=(.*)s.*errorCount=(d*),用户可以写自定义的正则表达式。

执行后,就会获取正则表达式执行后的数据。

6) 选择输出列

最后我们把感兴趣的列进行输出,使用 saveAs 算子,指定列名和类型,方便于保存在关系型数据库中。

执行 saveAs 算子后,这就是处理好的最终输出数据样本。

2.3 查看结构化输出结果

保存上一步配置好的规则组,日志数据经过 DBus 执行算子引擎,就可以生成相应的结构化数据了。目前根据项目实际,DBus 输出的数据是 UMS 格式,如果不想使用 UMS,可以经过简单的开发,实现定制化。

注:UMS 是 DBus 定义并使用的、通用的数据交换格式,是标准的 JSON。其中同时包含了 schema 和数据信息。更多 UMS 介绍请参考 DBus 开源项目主页的介绍。开源地址:https://github.com/bridata/dbus

以下是测试案例,输出的结构化 UMS 数据的样例:

2.4 日志监控

为了便于掌握数据抽取、规则匹配、监控预警等情况,我们提供了日志数据抽取的可视化实时监控界面,如下图所示,可随时了解以下信息:

  • 实时数据条数
  • 错误条数情况(错误条数是指:执行算子时出现错误的情况,帮助发现算子与数据是否匹配,用于修改算子,DBus 同时也提供了日志回读的功能,以免丢失部分数据)
  • 数据延时情况
  • 日志抽取端是否正常

监控信息中包含了来自集群内各台主机的监控信息,以主机 IP(或域名)对数据分别进行监控、统计和预警等。

监控中还有一张表叫做_unkown_table_ 表明所有没有被匹配上的数据条数。例如:Logstash 抓取的日志中有 5 种不同事件的日志数据,我们只捕获了其中 3 种事件,其它没有被匹配上的数据,全部在_unkown_table_计数中。

DBus 同样可以接入 Flume、Filebeat、UMS 等数据源,只需要稍作配置,就可以实现类似于对 Logstash 数据源同样的处理效果,更多关于 DBus 对 log 的处理说明,请参考:

  • https://bridata.github.io/DBu…
  • https://bridata.github.io/DBu…
  • https://bridata.github.io/DBu…

应用日志经过 DBus 处理后,将原始数据日志转换为了结构化数据,输出到 Kafka 中提供给下游数据使用方进行使用,比如通过 Wormhole 将数据落入数据库等。具体如何将 DBus 与 Wormhole 结合起来使用,请参考:如何设计实时数据平台(技术篇)。

作者:仲振林

退出移动版