乐趣区

关于flink:网易游戏基于-Flink-的流式-ETL-建设

网易游戏资深开发工程师林小铂为大家带来网易游戏基于 Flink 的流式 ETL 建设的介绍。内容包含:

  1. 业务背景
  2. 专用 ETL
  3. EntryX 通用 ETL
  4. 调优实际
  5. 将来布局

一. 业务背景

网易游戏 ETL 服务详情

网易游戏的根底数据次要日志形式采集,这些日志通常是非结构化或半结构化数据,须要通过数据集成 ETL 才能够入库至实时或离线的数据仓库。尔后,业务用户才能够不便地用 SQL 实现大部分数据计算,包含实时的 Flink SQL 和离线的 Hive 或 Spark。

网易游戏数据集成的数据流与大多数公司大同小异,次要有游戏客户端日志、游戏服务端日志和其余周边根底的日志,比方 Nginx access log、数据库日志等等。这些日志会被采集到对立的 Kafka 数据管道,而后经由 ETL 入库服务写入到 Hive 离线数据仓库或者 Kafka 实时数据仓库。

这是很常见的架构,但在咱们在需要方面是有一些比拟非凡的状况。

网易游戏流式 ETL 需要特点

首先,不同于互联网、金融等行业根本罕用 MySQL、Postgres 等的关系型数据库,游戏行业经常应用 MongoDB 这类 schema-free 的文档型数据库。这给咱们 ETL 服务带来的问题是并没有一个线上业务的精确的 schema 能够依赖,在理论数据处理中,多字段或少字段,甚至一个字段因为玩法迭代变更为齐全不同的格局,这样的状况都是可能产生的。这样的数据异构问题给咱们 ETL 的数据荡涤带来了比拟高的老本。

其次,也是因为数据库选型的起因,大部分业务的数据库模式都遵循了反范式设计,会刻意以简单内嵌的字段来防止表间的 join。这种状况给咱们带来的一个益处是,在数据集成阶段咱们不须要去实时地去 join 多个数据流,害处则是数据结构可能会非常复杂,多层嵌套非常常见。

而后,因为近年来实时数仓的风行,咱们也同样在逐渐建设实时数据仓库,所以复用现有的 ETL 管道,提取转换一次,加载到实时离线两个数据仓库,成为一个很天然的倒退方向。

最初,咱们的日志类型多且变更频繁,比方一个玩法简单的游戏,可能有 1,000 个以上的日志类型,每两周可能就会有一次发版。在这样的背景下 ETL 出现异常数据是不可避免的。因而咱们须要提供欠缺的异样解决,让业务能够及时得悉数据异样和通过流程修复数据。

日志分类及特点

为了更好地针对不同业务应用模式优化,咱们对不同日志类型的业务提供了不同的服务。咱们的日志通常分为三个类型:经营日志、业务日志和程序日志。

经营日志记录的是玩家行为事件,比方登录帐号、支付礼包等。这类日志是最为重要日志,有固定的格局,也就是特定 header + json 的文本格式。数据的主要用途是做数据报表、数据分析还有游戏内的举荐,比方玩家的组队匹配举荐。

业务日志记录的是玩家行为以外的业务事件,这个就比拟宽泛,比方 Nginx access log、CDN 下载日志等等,这些齐全没有固定格局,可能是二进制也可能是文本。主要用途相似于经营日志,但更加丰盛和定制化。

程序日志记录是程序的运行状况,也就是平时咱们通过日志框架打的 INFO、ERROR 这类日志。程序日志主要用途是检索定位运行问题,通常是写入 ES,但有时数量过大或者须要提取指标剖析时,也会写入数据仓库。

网易游戏 ETL 服务分析

针对这些日志分类,咱们具体提供了三类 ETL 入库的服务。首先是经营日志专用的 ETL,这会依据经营日志的模式进行定制化。而后是通用的面向文本日志的 EntryX ETL 服务,它会服务于经营日志以外的所有日志。最初是 EntryX 无奈反对的非凡 ETL 需要,比方有加密或者须要进行非凡转换的数据,这种状况下咱们就会针对性地开发 ad-hoc 作业来解决。

二. 经营日志专用 ETL

经营日志 ETL 倒退历程

经营日志 ETL 服务有着一个比拟久的历史。大略在 2013 年,网易游戏就建设了基于 Hadoop Streaming + Python 预处理 / 后处理的第一版离线 ETL 框架。这套框架是安稳运行了多年。

在 2017 年的时候,随着 Spark Streaming 的锋芒毕露,咱们开发了基于 Spark Streaming 的第二个版本,相当于一个 POC,但因为微批调优艰难且小文件多等问题没有上线利用。

工夫来到 2018 年,过后 Flink 曾经比拟成熟,咱们也决定将业务迁徙到 Flink 上,所以咱们很天然地开发了基于 Flink DataStream 的第三版经营日志 ETL 服务。这外面比拟非凡的一点就是,因为长久以来咱们业务方积攒了很多 Python 的 ETL 脚本,而后新版最重要的一点就是要反对这些 Python UDF 的无缝迁徙。

经营日志 ETL 架构

接下来看下两个版本的架构比照。

在晚期 Hadoop Streaming 的版本外面,数据首先会被 dump 到 HDFS 上,而后 Hadoop Streaming 启动 Mapper 来读取数据并通过规范输出的形式传递给 Python 脚本。Python 脚本外面会分为三个模块:首先预处理 UDF,这里通常会进行基于字符串的替换,个别用作规范化数据,比方有些海内单干厂商的工夫格局可能跟咱们不同,那么就能够在这里进行对立。预处理完的数据会进入通用的解析 / 转换模块,这里咱们会依据经营日志的格局来解析数据,并进行通用转换,比方滤掉测试服数据。通用模块之后,最初还有一个后处理模块进行针对字段的转换,比方常见的汇率转换。之后数据会通过规范输入返回给 Mapper,而后 Mapper 再将数据批量写到 Hive 目录中。

咱们用 Flink 重构后,数据源就由 HDFS 改为间接对接 Kafka,而 IO 模块则用 Flink 的 Source/Sink Operator 来代替本来的 Mapper,而后两头通用模块能够间接重写为 Java,残余的预处理和后处理则是咱们须要反对 Python UDF 的中央。

Python UDF 实现

在具体实现上,咱们在 Flink ProcessFunction 之上退出了 Runner 层,Runner 层负责跨语言的执行。技术选型上是选了 Jython,而没有抉择 Py4j,次要因为 Jython 能够间接在 JVM 外面去实现计算,不须要额定启动 Python 过程,这样开发和运维治理老本都比拟低。而 Jython 带来的限度,比方不反对 pandas 等基于 c 的库,这些对于咱们的 Python UDF 来说都是可承受的。

整个调用链是,ProcessFunction 在 TaskManager 被调用时会在 open 函数提早初始化 Runner,这是因为 Jython 是不可序列化的。Runner 初始化时会负责资源筹备,包含将依赖的模块退出 PYTHONPATH,而后依据配置反射调用 UDF 函数。

调用时,对于预处理 UDF Runner 会把字符串转化为 Jython 的 PyUnicode 类型,而对于后处理 UDF 则会把解析后的 Map 对象转为 Jython 的 PyDcitionary,别离作为两者的输出。UDF 能够调用其余模块进行计算,最终返回 PyObject,而后 Runner 再将其转换成 Java String 或者 Map,返回给 ProcessFunction 输入。

经营日志 ETL 运行时

刚刚是 UDF 模块的部分视图,咱们再来看下整体的 ETL 作业视图。首先在咱们提供了通用的 Flink jar,当咱们生成并提交 ETL 作业到作业平台时,调度器会执行通用的 main 函数构建 Flink JobGraph。这时会从咱们的配置核心,也就是 ConfigServer,拉取 ETL 配置。ETL 配置中蕴含应用到的 Python 模块,后端服务会扫描其中援用到的其余模块,把它们对立作为资源文件通过 YARN 散发性能上传到 HDFS 上。在 Flink JobManager 和 TaskManager 启动时,这些 Python 资源会被 YARN 主动同步到工作目录上备用。这就是整个作业初始化的过程。

而后因为 ETL 规定的小变更是很频繁的,比方新增一个字段或者变更一下过滤条件,如果咱们每次变更都须要重启作业,那么作业重启带来的不可用工夫会对咱们的上游用户造成比拟蹩脚的体验。因而,咱们对变更进行了分类,对于一些不影响 Flink JobGraph 的轻量级变更反对热更新。实现的形式是每个 TaskManager 启动一个热更新线程,定时轮询配置核心同步配置。

三. EntryX 通用 ETL

接下来介绍咱们的通用 ETL 服务 EntryX。这里的通用能够分为两层意义,首先是数据格式上的通用,反对非结构化到结构化的各种文本数据,其次是用户群体的通用,指标用户笼罩数据分析、数据开发等传统用户,和业务程序、策动这些数据背景较弱的用户。

EntryX 基本概念

先介绍 EntryX 的三个基本概念,Source、StreamingTable 和 Sink。用户须要别离配置这个三个模块,零碎会依据这些主动生成 ETL 作业。

Source 是 ETL 作业的输出源,通常是从业务端采集而来的原始日志 topic,或者是通过散发过滤后的 topic。这些 topic 可能只蕴含一种日志,但更多状况下会蕴含多种异构日志。

接下来 StreamingTable,一个比拟艰深的名称就是流表。流表定义了 ETL 管道的次要元数据,包含如何转换数据,还有依据转换好的数据定义的流表 schema,将数据 schema 化。流表 schema 是最为要害的概念,它相当于 Table DDL,次要包含字段名、字段数据类型、字段束缚和表属性等。为了更不便对接上下游,流表 schema 应用的是自研的 SQL-Like 的类型零碎,外面会反对咱们一些拓展的数据类型,比方 JSON 类型。

最初 Sink 负责流表到指标存储的物理表的映射,比方映射到指标 Hive 表。这里次要须要 schema 的映射关系,比方流表哪个字段映射到指标表哪个字段,流表哪个字段用作指标 Hive 表分区字段。在底层,零碎会主动依据 schema 映射关系来提取字段,并将数据转换为指标表的存储格局,加载到指标表。

EntryX ETL 管道

再来看下 EntryX ETL 管道的具体实现。蓝色局部是内部存储系统,而绿色局部则是 EnrtyX 的外部模块。

数据首先从对接采集的原始数据 Topic 流入,通过 Source 摄入到 Filter。Filter 负责依据关键词过滤数据,通常来说咱们要求过滤完的数据是有雷同 schema 的。通过这两步数据实现 Extract,来到 Transform 阶段。

Transform 第一步是解析数据,也就是这里的 Parser。Parser 反对 JSON/Regex/Csv 三种解析,根本能够笼罩所有案例。第二步是对数据进行转换,这是由 Extender 负责的。Extender 通过内置函数或 UDF 计算衍生字段,最常见的是将 JSON 对象拉平开展,提取出内嵌字段。最初是 Formatter,Formatter 会依据之前用户定义的字段逻辑类型,将字段的值转为对应的物理类型。比方一个逻辑类型为 BIGINT 的字段,咱们在这里会对立转为 Java long 的物理类型。

数据实现 Transform 之后来到最初的 Load 阶段。Load 第一步是决定数据应该加载到哪个表。Splitter 模块会依据每个表的入库条件(也就是一个表达式)来分流数据,而后再到第二步的 Loader 来负责将数据写到具体的内部存储系统。目前咱们反对 Hive/Kafka 两种存储,Hive 反对 Text/Parquet/JSON 三种格局,而 Kafka 反对 JSON 和 Avro 两种格局。

实时离线对立 Schema

在 Entryx 的设计里数据能够被写入实时和离线两个数据仓库,也就是说同一份数据,但在不同的存储系统中以不同格局示意。从 Flink SQL 的角度来说是 schema 局部雷同,但 connector 和 format 不同的两个表。而 schema 局部常常会随业务变更,而 connector 和 format(也就是存储系统和存储格局)是绝对稳固的。那么一个很天然的想法就是,能不能将 schema 局部提取进去独立保护?实际上,这个形象的 schema 曾经存在了,就是咱们在 ETL 提取的流表 schema。

在 EntryX 外面,流表 schema 是与序列化器、存储系统无关的 schema,作为 Single Source of Truth。基于流表 schema,加上存储系统信息和存储格局信息,咱们就能够衍生出具体的物理表的 DDL。目前咱们次要是反对 Hive/Kafka,如果之后要拓展至反对 ES/HBase 表也是十分不便。

实时数据仓库集成

EntryX 一个重要的定位是作为实时仓库的对立入口。刚刚其实曾经屡次提到 Kafka 表,但还没有说实时数仓是怎么做的。实时数仓的常见问题是 Kafka 并没有原生反对 schema 元数据的长久化。目前社区的支流解决方案是基于 Hive MetaStore 来保留 Kafka 表的元数据,并复用 HiveCatalog 来间接对接到 Flink SQL。

但这对于咱们来说应用 Hive MetaStore 次要有几个问题:一是在实时作业里引入 Hive 依赖并与 Hive 耦合,这是很重的依赖,导致定义的表很难被其余组件复用,包含 Flink DataStream 用户;二是咱们曾经有 Kafka SaaS 平台 Avatar 来治理物理 schema,比方 Avro schema,如果再引入 Hive MetaStore 会导致元数据的割裂。因而,咱们是拓展了 Avatar 平台的 schema 注册核心,同时反对逻辑 schema 和物理 schema。

那么实时数仓和 EntryX 的集成关系是:首先咱们有 EntryX 的流表 schema,在新建 Sink 的时候调用 Avatar 的 schema 接口,依据映射关系生成逻辑 schema,而 Avatar 再依据 Flink SQL 类型与物理类型的映射关系生成 topic 的物理 schema。

与 Avatar schema 注册核心配套的还有咱们自研的 KafkaCatalog,它负责读取 topic 的逻辑和物理 schema 来生成 Flink SQL 的 TableSource 或 TableSink。而对于一些 Flink SQL 以外的用户,比方 Flink DataStream API 的用户,他们也能够间接读取物理 schema 来享受到数据仓库的便当。

EntryX 运行时

和经营日志 ETL 相似,在 EntryX 运行时,零碎会基于通用的 jar 和配置生成 Flink 作业,但这里有两种状况须要特地解决。

首先是一个 Kafka topic 往往有几十甚至上千种日志,那么对应其实有也几十甚至上千的流表,如果每个流表都独自运行在一个作业里,那么一个 topic 会可能会被读上千遍,这是十分大的节约。因而,在作业运行时提供一个优化策略,能够将同个 source 的不同流表合并到一个作业里跑。比方图中,某个手游上传了 3 种日志到 Kafka,用户别离配置了玩家注册、玩家登录、支付礼包三个流表,那么咱们能够这三个流表合并起来到一个作业,共享同一个 Kafka Source。

另外的一个优化是,个别状况下咱们能够依照之前“提取转换一次,加载一次”的思路来将数据同时写到 Hive 和 Kafka,然而因为 Hive 或者说 HDFS 毕竟是离线零碎,实时性比拟差,写入在一些负载比拟高的 HDFS 老集群常常会呈现反压,同时阻塞上游,导致 Kafka 的写入也受到影响。在这种状况下,咱们通常要拆散加载到实时和离线的 ETL 管道,具体会取决于业务的 SLA 还有 HDFS 的性能。

四. 调优实际

接下来给大家分享下咱们在 ETL 建设中的调优实践经验。

HDFS 写入调优

首先是 HDFS 写入的调优。流式写入 HDFS 场景中陈词滥调的一个问题便是小文件过多。通常来说小文件和实时性是鱼与熊掌不可兼得。如果要提早低,那么咱们须要频繁地滚动文件来提交数据,必然导致小文件过多。

小文件过多次要造成两个问题:一从 HDFS 集群治理角度看,小文件会占用大量的文件数和 block 数,节约 NameNode 内存;二是从用户角度看,读写效率都会升高,因为写的时候要更频繁地调用 RPC 和 flush 数据,造成更多的阻塞,有时甚至造成 checkpoint 超时,而读时则须要关上更多的文件能力读完数据。

HDFS 写入调优 – 数据流预分区

咱们在优化小文件问题时做的一点调优是对数据流先做一遍预分区,具体来说,便是在 Flink 作业外部先基于指标 Hive 表进行一次 keyby 分区,让同一个表的数据尽量集中在多数的几个 subtask 上。

举个例子,假如 Flink 作业并行度为 n,而指标 Hive 分区数为 m 个。因为每个 subtask 都有可能读到任意分区的数据,在默认的各 subtask 齐全并行的状况下,每个 subtask 都会写所有分区,造成总体的写入文件数是 n * m。假如 n 是 100,m 是 1000,按 10 分钟滚一次文件算,每天会造成 14,400,000 个文件,这对于很多老集群来说是十分大的压力。

如果通过数据流分区的优化之后,咱们就能够限制住 Flink 并行度带来的增长。比方咱们 keyby hive 表字段,并退出范畴为 0-s 整数的盐来防止数据歪斜,那么分区最多会被 s 个 subtask 读写。假如 s 是 5,比起原先 n 是 100,那么咱们就将本来的文件数升高为原来 20 分之一。

基于 OperatorState 的 SLA 统计

第二个我想分享的是咱们的 SLA 统计工具。背景是咱们的用户常常会通过 Web UI 来进行调试和问题的排查,比方不同 subtask 的输入输出数目,但这些 metric 会因为作业重启或者 failover 而重置,因而咱们开发了基于 OperatorState 的 SLA-Utils 工具来统计数据的输出和分类输入。这个工具设计得十分轻量级,能够很容易集成到咱们本人的服务或者用户的作业外面。

在 SLA-Utils 外面,咱们反对了三种 metric。首先是规范的 metric,有 recordsIn/recordsOut/recordsDropped/recordsErrored,别离对应输出记录数 / 失常输入记录数 / 被过滤掉的记录数 / 解决异样的记录数。通常来说 recordsIn 就等于前面三者的总和。第二种用户能够自定义的 metric,通常能够用于记录更具体的起因,比方是 recordsEventTimeDropped 代表数据是因为 event time 被过滤的。

那么上述两种 metric 动态的,也就是说 metric key 在作业运行前就要确定,此外 SLA-Utils 还反对在运行时动静注册的 TTL metric。这种 metric 通常有动静生成的日期作为前缀,在通过 TTL 的工夫之后被主动清理。TTL metric 次要能够用于做天级别工夫窗口的统计。这里比拟特地的一点是,因为 OperatorState 是不反对 TTL 的,SLA-Utils 是在每次进行 checkpoint 快照的时候进行一次过滤,剔除掉过期的 metric,以实现 TTL 的成果。

那么在 State 保留了 SLA 指标之后要做的就是裸露给用户。咱们目前的做法是通过 Accumulater 的形式来裸露,长处是 Web UI 有反对,开箱即用,同时 Flink 能够主动合并不同的 subtask 的 metric。毛病在于没有方法利用 metric reporter 来 push 到监控零碎,同时因为 Acuumulater 是不能在运行时动静登记的,所以应用 TTL metric 会有内存透露的危险。因而,在将来咱们也思考反对 metric group 来防止这些问题。

数据容错及复原

最初再分享下咱们在数据容错和复原上的实际。

以很多最佳实际类似,咱们用 SideOutput 来收集 ETL 各环节中出错的数据,汇总到一个对立的谬误流。谬误记录中蕴含咱们预设的错误码、原始输出数据以及谬误类和错误信息。个别状况下,谬误数据会被分类写入 HDFS,用户通过监控 HDFS 目录能够得悉数据是否失常。

那么存储好异样数据后,下一步就是要复原数据。这通常有两种状况。

一是数据格式异样,比方日志被截断导致不残缺或者工夫戳不合乎约定格局,这种状况下咱们个别通过离线批作业来修复数据,从新回填到原有的数据管道。

二是 ETL 管道异样,比方数据理论的 schema 有变更但流表配置没有更新,可能会导致某个字段都是空值,这时咱们的解决方法是:首先更新线上的流表配置为最新,保障不再产生更多异样数据,这时 Hive 外面仍有局部分区是异样的。而后,咱们公布一个独立的补数作业来专门修复异样的数据,输入的数据会写到一个长期的目录,并在 hive metastore 上切换 partition 分区的 location 来替换掉原来的异样目录。因而这样的一个补数流程对离线查问的用户来说是通明的。最初咱们再在适合的工夫替换掉异样分区的数据并复原 location。

五. 将来布局

最初介绍下咱们的将来布局。

  • 第一个是数据湖的反对。目前咱们的日志绝大多数都是 append 类型,不过随着 CDC 和 Flink SQL 业务的欠缺,咱们可能会有更多的 update、delete 的需要,因而数据湖是一个很好的抉择。
  • 第二个会提供更加丰盛的附加性能,比方实时的数据去重和小文件的主动合并。这两个都是对业务方十分实用的性能。
  • 最初是一个反对 PyFlink。目前咱们的 Python 反对只笼罩到数据集成阶段,后续数据仓库的 Python 反对咱们是心愿通过 PyFlink 来实现。
退出移动版