关于Flink:360-政企安全集团基于-Flink-的-PB-级数据即席查询实践

27次阅读

共计 7666 个字符,预计需要花费 20 分钟才能阅读完成。

本文整顿自 360 政企平安团体的大数据工程师苏军以及刘佳在 Flink Forward Asia 2020 分享的议题《基于 Flink 的 PB 级数据即席查问实际》,文章内容为:

  1. Threat Hunting 平台的架构与设计(苏军)
  2. 以升高 IO 为指标的优化与摸索(刘佳)
  3. 将来布局

首先做一个简略的集体以及团队介绍。咱们来自 360 政企平安团体,目前次要从事 360 平安大脑的“威逼狩猎“我的项目的开发工作。咱们团队接触 Flink 的工夫比拟早,在此期间,咱们基于 Flink 开发出了多款产品,并在 2017 年和 2019 年加入了于柏林举办的 Flink Forward 大会,别离介绍了咱们的“UEBA”以及“AutoML”两款产品。

本次分享次要分为两块内容:

  • 第一局部“Threat Hunting 平台的架构与设计”将由苏军来为大家分享;
  • 第二局部“以升高 IO 为指标的优化与摸索”将由刘佳来为大家分享。

一、Threat Hunting 平台的架构与设计 (苏军)

第一局部内容大抵分为三个局部,别离是:

  • 平台的演进
  • 架构设计
  • 深刻摸索索引构造

1. 平台的演进

咱们认为所有技术的演变和变革都须要具体的商业问题来驱动,以下是咱们团队近几年基于 Flink 开发的几款产品:

  • 2017 年咱们基于 Flink DataStream 开发了用户行为剖析零碎 UEBA,它是通过接入企业 IT 拓扑的各类行为数据,比方身份认证数据、利用零碎拜访数据、终端平安数据、网络流量解析数据等等,以用户 / 资产为外围来进行威逼行为的实时检测,最初构建出用户威逼等级和画像的零碎;
  • 2018 年基于 UEBA 的施行教训,咱们发现平安剖析人员往往须要一种伎俩来获取安全事件对应的原始日志,去进一步确认平安威逼的源头和解决形式。于是咱们基于 Spark 开发了 HQL 来解决在离线模式下的数据检索问题,其中 HQL 能够认为是表达能力比 SQL 更加丰盛的查询语言,大抵能够看作是在 SQL 能力的根底上减少了算法类算;
  • 2019 年随着离线 HQL 在客户那边的应用,咱们发现其自身就可能疾速定义平安规定,构建威逼模型,如果在离线模式下写完语句后间接公布成在线工作,会大大缩短开发周期,加上 Flink SQL 能力绝对欠缺,于是咱们基于 Flink SQL + CEP 来降级了 HQL 的能力,产生了 HQL RealTime 版本;
  • 2020 年随着客户数据量的增大,很多曾经达到了 PB 级,过往的解决方案导致离线的数据检索性能远远低于预期,平安剖析人员习惯应用 like 和全文检索等含糊匹配操作,造成查问延时十分大。于是从往年开始,咱们着重优化 HQL 的离线检索能力,并推出了全新的 Threat Hunting 平台。

通过考察发现,领有 PB 级数据规模的客户往往有以下几个商业需要:

  • 第一是低成本的云原生架构。咱们晓得目前大部分的大数据架构都是基于 hadoop 的,其特点是数据就在计算节点上,可能缩小大量网络开销,减速计算性能。然而整个集群为了做到资源平衡,往往须要雷同的资源配置,且为了可能存储尽量多的数据,集群规模会很大, 所以这类架构在后期须要投入大量硬件老本。

    而存算拆散和弹性计算则可能解决这一问题,因为磁盘的价格是远低于内存和 CPU 的,所以用便宜的磁盘存储搭配低配 CPU 和内存来存储数据,用大量高配机器来做计算,能够在很大水平上降低成本。

  • 第二是低延时的查问响应。平安剖析人员在做威逼检测时,大部分工夫是即席查问,即通过过滤、join 来做数据的检索和关联。为了可能尽快的获取查问后果,对应的技术计划是:列存 / 索引 / 缓存。

    • 列存不必多说了,是大数据畛域常见的存储计划;
    • 在列存的根底上,高效的索引计划可能大量升高 io,进步查问性能;
    • 而存算剖析带来的网络延时能够由分布式缓存来补救。
  • 第三是须要丰盛的查问能力,其中包含单行的 fields/filter/udf 等,多行的聚合 /join,甚至算法类的剖析能力,这部分咱们次要依赖于本人开发的剖析语言 HQL 来提供。

2. 架构设计

首先,数据是来自于曾经存储在 ES 中的历史数据和 kafka 里的实时数据,其中 ES 里的历史数据咱们通过本人开发的同步工具来同步,kafka 里的实时数据咱们则通过 Streaming File Sink 写 orc 文件到存储集群。在数据同步的同时,咱们会将这批数据的索引信息更新到数据库中。

平安剖析人员会从前端页面通过写交互式剖析语言 HQL 发动数据检索的申请,此时申请会进入调度零碎,一旦开始执行作业,首先会将剖析语句解析成算子列表,算子缓存算法会判断该次查问是否能够命中缓存零碎中已有的缓存数据。

  • 如果剖析语句的输出是曾经算好并且 cache 好了的两头后果,那么间接读取缓存来持续计算;
  • 如果不能命中,证实咱们必须从 orc 文件开始从新计算。

咱们会先提取出查询语言的过滤条件或者是 Join 条件来做谓词下推,进入索引数据库中取得目前合乎该查问的文件列表,随后将文件列表交给计算引擎来进行计算。计算引擎咱们采纳双引擎模式,其中复杂度高的语句咱们通过 Flink 引擎来实现,其它较为简单的工作咱们交给平台外部的“蜂鸟引擎”。“蜂鸟引擎”基于 Apache arrow 做向量化执行,加上 LLVM 编译,查问提早会十分小。

因为整个零碎的存算拆散,为了减速数据读取,咱们在计算集群节点上减少了 alluxio 来提供数据缓存服务,其中不仅缓存 remote cluster 上的数据,同时会缓存局部历史作业后果,通过算子缓存的算法来减速下次计算工作。

这里还须要强调两点:

  • 第一点是索引数据库会返回一批合乎该条件的文件列表,如果文件列表十分大的话,以后的 Flink 版本在构建 job graph 时,在获取 Filelist Statistics 逻辑这里在遍历大量文件的时候,会造成长时间无奈构建出 job graph 的问题。目前咱们对其进行了修复,前期会奉献给社区。
  • 第二点是数据缓存那一块,咱们的 HQL 之前是通过 Spark 来实现的。用过 Spark 的人可能晓得,Spark 会把一个 table 来做 cache 或 persist。咱们在迁徙到 Flink 的时候,也沿用了这个算子。Flink 这边咱们本人实现了一套,就是用户在 cache table 时, 咱们会把它注册成一个全新的 table source,前面在从新读取的时候只会用这个新的 table source 来买通整个流程。

3. 深刻摸索索引构造

数据库为了减速数据检索,咱们往往会当时为数据创立索引,再在扫描数据之前通过索引定位到数据的起始地位,从而减速数据检索。而传统数据库常见的是行索引,通过一个或若干字段创立索引,索引后果以树形构造存储,此类索引可能准确到行级别,索引效率最高。

某些大数据我的项目也反对了行索引,而它所带来的弊病就是大量的索引数据会造成写入和检索的延时。而咱们平台解决的是机器数据,例如终端 / 网络这类数据,它的特点是反复度十分高,而平安剖析的后果往往非常少,极少数的威逼行为会暗藏在海量数据里,占比往往会是 1/1000 甚至更少。

所以咱们抉择性价比更高的块索引计划,曾经可能撑持目前的利用场景。目前通过客户数据来看, 索引可能为 85% 的语句提供 90% 以上的裁剪率,根本满足延时要求。

某些大数据平台是将索引数据以文件的模式存储在磁盘上,外加一些 cache 机制来减速数据拜访,而咱们是将索引数据间接存在了数据库中。次要有以下两个方面的思考:

  • 第一是 transaction。咱们晓得列存文件往往是无奈 update 的,而咱们在定期优化文件散布时会做 Merge File 操作,为了保障查问一致性,须要数据库提供 transaction 能力。
  • 第二是性能。数据库领有较强的读写和检索能力,甚至能够将谓词下推到数据库来实现,数据库的高压缩比也能进一步节俭存储。

上图为块索引的设计。在咱们的索引数据库中,咱们把这些数据分为不同类别数据源,比方终端数据为一类数据源,网络数据为一类数据源,咱们分类数据源的逻辑是他们是否领有对立的 Schema。就单个数据源来说,它以日期作为 Partition,Partition 外部是大量的 ORC 小文件,具体到索引构造,咱们会为每一个字段建 min/max 索引,基数小于 0.001 的字段咱们建 Bloom 索引。

上文提到过,平安人员比拟喜爱用 like 和全文检索。对于 like 这一块,咱们也做了一些优化。全文检索方面,咱们会为数据来做分词,来构建倒排索引,同时也会对于单个分词过后的单个 item 来做文件散布层面的位图索引。

上图是一个索引大小的大抵的比例假如,JSON 格局的原始日志大有 50PB,转化成 ORC 大略是 1PB 左右。咱们的 Index 数据是 508GB,其中 8GB 为 Min/Max 索引,500GB 为 Bloom。加上上文提到的位图以及倒排,这个索引数据的占比会进一步加大。基于此,咱们采纳的是分布式的索引计划。

咱们晓得日志是在一直的进行变动的,对于有的数据员来说,他有时会减少字段或者缩小字段,甚至有时字段类型也会发生变化。

那么咱们采取这种 Merge Schema 模式计划,在文件增量写入的过程中,也就是在更新这批数据的索引信息的同时来做 Schema Merge 的操作。如图所示,在 block123 中,文件 3 是最初一个写入的。随着文件的一直写入,会组成一个全新的 Merge Schema。能够看到 B 字段和 C 字段其实是历史字段,而 A_V 字段是 A 字段的历史版本字段,咱们用这种形式来尽量多的让客户看到比拟全的数据。最初基于本人开发的 Input format 加 Merge Schema 来构建一个新的 table source,从而买通整个流程。

二、以升高 IO 为指标的优化与摸索 (刘佳)

上文介绍了为什么要抉择块索引,那么接下来将具体介绍如何应用块索引。块索引的外围能够落在两个字上:“裁剪”。裁剪就是在查问语句被真正执行前就将无关的文件给过滤掉,尽可能减少进入计算引擎的数据量,从数据源端进行节流。

这张图展现了整个零碎应用 IndexDB 来做裁剪流程:

  • 第一步是解析查问语句。获取到相干的 filter,能够看到最右边的 SQL 语句中有两个过滤条件, 别离是 src_address = 某个 ip,occur_time > 某个工夫戳。
  • 第二步将查问条件带入 Index DB 对应数据源的 meta 表中去进行文件筛选。src_address 是字符串类型字段,它会联结应用 min/max 和 bloom 索引进行裁剪。occur_time 是数值类型字段并且是工夫字段,咱们会优先查找 min/max 索引来进行文件裁剪。须要强调的是, 这里咱们是将用户写的 filter 封装成了 index db 的查问条件,间接将 filter pushdown 到数据库中实现。
  • 第三步在获取到文件列表后,这些文件加上后面提到的 merged schema 会独特结构成一个 TableSource 来交给 Flink 进行后续计算。

同时,构建 source 的时候,咱们在细节上做了一些优化。比方在将 filter 传给 ORC reader 的时候,革除掉曾经 pushdown 了的 filter,防止在引擎侧进行二次过滤。当然, 这里并不是将所有 filter 都革除掉了,咱们保留了 like 表达式,对于 like 的 filter pushdown 会在后文介绍。

接下来着重介绍一下四大优化点:

  • 第一点,数据在未排序的状况下,裁剪率是有实践下限的,咱们通过在数据写入的时候应用 hilbert 曲线排序原始数据来晋升裁剪率;
  • 第二点,因为平安畛域的特殊性,做威逼检测重大依赖 like 语法,所以咱们对 orc api 进行了加强,使其反对了 like 语法的下推;
  • 第三点,同样是因为应用场景重大依赖 join,所以咱们对 join 操作也做了相应的优化;
  • 第四点,咱们的零碎底层反对多种文件系统,所以咱们选取 Alluxio 这一成熟的云原生数据编排零碎来做数据缓存,进步数据的拜访局部性。

1. 裁剪率的实践下限及 Hilbert 空间填充曲线

裁剪能够形象成 N 个球扔进 M 个桶的概率问题,在这里咱们间接说论断。假如行在块中随机均匀分布,所有块的总行数固定,查问条件命中的总行数也固定,则块命中率间接与“命中的总行数 / 总块数”正相干。

论断有两个:

  • 第一点,如果命中总行数 = 总块数,即 X 轴值为 1 的时候,命中率为 2/3,也就是 2/3 的块,都蕴含命中的行,对应的块修剪率的下限是 1/ 3。1/3 是一个很低数值,然而因为它的前提是数据随机均匀分布,所以为了让数据分布更好,咱们须要在数据写入时对原始数据进行排序。
  • 第二点,假如命中总行数固定,那么大幅度缩小每块中的行数来减少总块数,也能晋升块修剪率。所以咱们放大了块大小。依据测试后果,咱们设定每个文件的大小为:16M。放大文件大小是很简略的。针对排序,咱们引入了 hilbert 空间填充曲线。

为什么应用 hilbert 曲线?次要是基于两点:

  • 首先是,以什么门路遍历 2 维空间,使门路的地址序列对其中任一维度都根本有序?为什么要对每一列或者说子集都有序?因为零碎在应用的过程中,查问条件是不固定的。数据写入时排序用到了 5 个字段,查问的时候可能只用到了其中的一个或两个字段。Hilbert 排序能让多个字段做到既整体有序,又部分有序。
  • 另外,空间填充曲线有很多,还有 Z 形曲线、蛇形曲线等等,大家能够看看左边这两张比照图。直观的看,曲线门路的长跨度跳跃越少越好,点的地位在迭代过程中越稳固越好。而 hilbert 曲线在空间填充曲线外面综合体现最好。

hilbert 用法,就是实现一个 UDF,输出列值,输入坐标值,而后依据坐标值排序。

咱们抽样了客户环境所应用的 1500 条 SQL 语句,过滤掉了其中裁剪率为分之 100% 的相干语句,也就是没有命中文件的有效语句。而后还剩下 1148 条,咱们应用这些语句做了裁剪率排序后,对裁剪率进行了比照,裁剪率 95 百分位从之前的 68% 晋升到了 87%,晋升了 19%。可能大家会感觉 19% 这个数值不是特地高,但如果咱们带上一个基数,比如说 10 万个文件,这样看的话就会很可观了。

2. 字典索引上 Like 的优化

之前也有讲到平安行业的特殊性,咱们做威逼检测的时候会重大依赖 like 查问。鉴于此,咱们也对它做了优化。

  • 首先咱们为 ORC api 增加了 like 条件表达式,保障 SQL 中的 like 能下推到 orc record reader 中。
  • 其次,重构了 orc record reader 的 row group filter 逻辑,如果发现是 like 表达式,首先读取该字段的 dict steam,判断 dict stream 是否蕴含 like 指标字符串,如果字典中不存在该值,间接跳过该 row group,不必读取 data stream 和 length steam,能大幅提高文件读取速度。前期咱们也思考构建字典索引到索引数据库中,间接将字典过滤 pushdown 到数据库中实现。

例如图上所示,最右边的 SQL 中有三个表达式。前两个在上文中曾经提到了,是将 filter 间接 pushdown 到 index db 中实现,咱们交给 orc reader 的 filter 只有最初一个 attachment_name like ‘% 招标 %’,真正须要读取的记录只是 dict 蕴含”招标“的 row group,也就是做到了 row group 级别的过滤,进一步缩小了须要进入计算引擎的数据量。

3. 基于索引对 join 的优化

威逼情报的匹配中大量应用 join 操作,如果要减速 join 的性能,仅仅是 where 条件的 filter pushdown 是远远不够的。

Flink 中曾经内置了许多 join 算法,比方 broadcast join, hash join 和 sort merge join。其中,sort merge join 对事后排好序的表 join 十分敌对,而上文有提到咱们应用 Hilbert 曲线来对多字段进行联结排序,所以 sort merge join 临时不在咱们的优化范畴之内。

另外,咱们晓得 join 的性能和左右表的大小正相干,而威逼情报 join 的稠密度十分高,所以当时对左右表做裁剪,可能大幅缩小进入 join 阶段的数据。

上文提到过咱们曾经为常见字段建设了 bloom 索引。那么利用这些曾经创立好的 bloom,来进行文件预过滤,就变得牵强附会,并且省掉了构建 bloom 的工夫开销。

对于 broadcast join,咱们间接扫描小表,将小表记录顺次进入大表所属文件的 bloom,判断该数据块是否须要, 对数据量大的表做预裁剪。

对于 hash join,正如咱们看到的,咱们能够事后对 join key 的文件级 bloom 做“预 join”操作,具体就是将左表所属的某个文件的 bloom 顺次与右表所属文件的 bloom 做“与”操作,只保留左右表能”与后后果条数不为 0“的文件,再让各表残余的文件进入引擎做后续计算。

比如说图上的这三张表,别离是 table1、table2 和 table3。咱们能够从 index DB 中获取到表的统计信息,也就是文件个数或者说是文件表的大小。图上就间接列的是文件个数:table 1 是 1000 个,而后 table 2 是 5 万个文件,table 3 是 3 万个文件。

咱们就是参照上一张图片外面的逻辑进行预 join,而后预估 join 的老本。咱们会让成本低的预 join 先进行,这样的话就可能大幅度缩小两头后果,晋升 join 的效率。

4. Alluxio 作为对象存储的缓存

因为底层文件存储系统的多种多样,所以咱们选取了 Alluxio 数据编排零碎,Alluxio 的长处是让数据更凑近计算框架,利用内存或者 SSD 多级缓存机制减速文件拜访,如果在齐全命中 cache 的状况下,可能达到内存级 IO 的文件访问速度,缩小间接从底层文件系统读文件的频次,很大水平上缓解了底层文件系统的压力。

对咱们零碎来说就是它带来了更高的并发,而且对低裁剪率的查问更敌对,因为低裁剪率的话就意味着须要读取大量的文件。

如果这些文件在之前的查问中曾经被 load 到 cache 外面,就可能大幅度的晋升查问速度。

在做完这些优化当前,咱们做了性能比照测试。咱们选取了一个规模为 249TB 的 es 集群。它应用了 20 台服务器,Flink 应用了两台服务器,为了在图标上看到更直观的比照成果,咱们选取了 16 条测试后果。

图表上红橙色的是 es,蓝色的是 HQL 优化前,绿色的是 HQL 优化后。下面的数字标签是与 es 相比,HQL 的性能差值。比方第一个标签就意味着 HQL 的性能五倍于 es,其中 6 号和 7 号比 es 慢,次要是因为 HQL 是块索引,es 是行索引,全在内存外面,所以能够做到超快的检索速度。13 号是因为 HQL 在应用 not equal 的状况下,裁剪率绝对较差。

总体说,优化成果是很显著的,大部分语句在与 es 查问速度相比是持平甚至略优的。齐全满足客户对长周期数据存储和查问的冀望。

三、将来布局

上图是将来布局。因为客户现场常常会波及到很多的 BI Dashboard 运算和长周期运算报告的需要,所以咱们下一步会思考做 BI 估算,以及苏军提到的容器化和 JVM 预热,当然还有对标 es,以及晋升多用户并发查问的能力。


更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~

正文完
 0