共计 9112 个字符,预计需要花费 23 分钟才能阅读完成。
作者:曾庆东,金地物业中级开发工程师,负责聚合营业平台实时计算开发及运维工作,从事过大数据开发,目前专一于 apache flink 实时计算,喜爱开源技术,喜爱分享。
01 我的项目背景
自己目前参加的我的项目属于公司外面数据密集、计算密集的一个重要我的项目,须要提供高效且精确的 OLAP 服务,提供灵便且实时的报表。业务数据存储在 MySQL 中,通过主从复制同步到报表库。作为团体级公司,数据增长多而且快,呈现了多个千万级、亿级的大表。为了实现各个维度的各种简单的报表业务,有些千万级大表依然须要进行 Join,计算规模十分惊人,常常不能及时响应申请。随着数据量的日益增长和实时剖析的需要越来越大,急需对系统进行流式计算、实时化革新。正是在这个背景下,开始了咱们与 Flink SQL CDC 的故事。
02 解决方案
针对平台当初存在的问题,咱们提出了把报表的数据实时化的计划。该计划次要通过 Flink SQL CDC + Elasticsearch 实现。Flink SQL 反对 CDC 模式的数据同步,将 MySQL 中的全增量数据实时地采集、预计算、并同步到 Elasticsearch 中,Elasticsearch 作为咱们的实时报表和即席剖析引擎。我的项目整体架构图如下所示:
实时报表实现具体思路是,应用 Flink CDC 读取全量数据,全量数据同步实现后,Flink CDC 会无缝切换至 MySQL 的 binlog 位点持续生产增量的变更数据,且保障不会多生产一条也不会少生产一条。读取到的账单和订单的全增量数据会与产品表做关联补全信息,并做一些预聚合,而后将聚合后果输入到 Elasticsearch,前端页面只须要到 Elasticsearch 通过精准匹配(terms)查找数据,或者再应用 agg 做高维聚合统计失去多个服务中心的报表数据。
从整体架构中,能够看到,Flink SQL 及其 CDC 性能在咱们的架构中扮演着外围角色。咱们采纳 Flink SQL CDC,而不是 Canal + Kafka 的传统架构,次要起因还是因为其依赖组件少,保护成本低,开箱即用,上手容易。具体来说 Flink SQL CDC 是一个集采集、计算、传输于一体的工具,其吸引咱们的长处有:
① 缩小保护的组件、简化实现链路;
② 缩小端到端提早;
③ 加重保护老本和开发成本;
④ 反对 Exactly Once 的读取和计算(因为咱们是账务零碎,所以数据一致性十分重要);
⑤ 数据不落地,缩小存储老本;
⑥ 反对全量和增量流式读取;
无关 Flink SQL CDC 的介绍和教程,能够观看 Apache Flink 社区公布的相干视频:https://www.bilibili.com/vide…
我的项目应用的是 flink-cdc-connectors 中提供的 mysql-cdc 组件。这是一个 Flink 数据源,反对对 MySQL 数据库的全量和增量读取。它在扫描全表前会先加一个全局读锁,而后获取此时的 binlog position,紧接着开释全局读锁。随后开始扫描全表,当全表快照读取完后,会从之前获取的 binlog position 获取增量的变更记录。因而这个读锁是十分轻量的,持锁工夫十分短,不会对线上业务造成太大影响。更多信息能够参考 flink-cdc-connectors 我的项目官网:https://github.com/ververica/…
03 我的项目运行环境与现状
咱们在生产环境搭建了 Hadoop + Flink + Elasticsearch 分布式环境,采纳的 Flink on YARN 的 per-job 模式运行,应用 RocksDB 作为 state backend,HDFS 作为 checkpoint 长久化地址,并且做好了 HDFS 的容错,保障 checkpoint 数据不失落。咱们应用 SQL Client 提交作业,所有作业对立应用纯 SQL,没有写一行 Java 代码。
目前已上线了 3 个基于 Flink CDC 的作业,已稳固在线上运行了两个星期,并且业务产生的订单实收和账单实收数据能实时聚合输入到 Elasticsearch,输入的数据准确无误。当初也正在对其余报表采纳 Flink SQL CDC 进行实时化革新,替换旧的业务零碎,让零碎数据更实时。
04 具体实现
① 进入 Flink/bin
,应用 ./sql-client.sh embedded
启动 SQL CLI 客户端。
② 应用 DDL 创立 Flink Source 和 Sink 表。这里创立的表字段个数不肯定要与 MySQL 的字段个数和程序统一,只须要筛选 MySQL 表中业务须要的字段即可,并且字段类型保持一致。
-- 在 Flink 创立账单实收 source 表
CREATE TABLE bill_info (
billCode STRING,
serviceCode STRING,
accountPeriod STRING,
subjectName STRING ,
subjectCode STRING,
occurDate TIMESTAMP,
amt DECIMAL(11,2),
status STRING,
proc_time AS PROCTIME() -–应用维表时须要指定该字段) WITH (
'connector' = 'mysql-cdc', -- 连接器
'hostname' = '******', --mysql 地址
'port' = '3307', -- mysql 端口
'username' = '******', --mysql 用户名
'password' = '******', -- mysql 明码
'database-name' = 'cdc', -- 数据库名称
'table-name' = '***'
);
-- 在 Flink 创立订单实收 source 表
CREATE TABLE order_info (
orderCode STRING,
serviceCode STRING,
accountPeriod STRING,
subjectName STRING ,
subjectCode STRING,
occurDate TIMESTAMP,
amt DECIMAL(11, 2),
status STRING,
proc_time AS PROCTIME() -–应用维表时须要指定该字段) WITH (
'connector' = 'mysql-cdc',
'hostname' = '******',
'port' = '3307',
'username' = '******',
'password' = '******',
'database-name' = 'cdc',
'table-name' = '***',
);
-- 创立科目维表
CREATE TABLE subject_info (code VARCHAR(32) NOT NULL,
name VARCHAR(64) NOT NULL,
PRIMARY KEY (code) NOT ENFORCED -- 指定主键
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xxxx:xxxx/spd?useSSL=false&autoReconnect=true',
'driver' = 'com.mysql.cj.jdbc.Driver',
'table-name' = '***',
'username' = '******',
'password' = '******',
'lookup.cache.max-rows' = '3000',
'lookup.cache.ttl' = '10s',
'lookup.max-retries' = '3'
);
-- 创立实收散布后果表,把后果写到 Elasticsearch
CREATE TABLE income_distribution (
serviceCode STRING,
accountPeriod STRING,
subjectCode STRING,
subjectName STRING,
amt DECIMAL(13,2),
PRIMARY KEY (serviceCode, accountPeriod, subjectCode) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://xxxx:9200',
'index' = 'income_distribution',
'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL'
);
以上的建表 DDL 别离创立了订单实收 source 表、账单实收 source 表、产品科目维表和 Elasticsearch 后果表。建表实现后,Flink 是不会马上去同步 mysql 的数据,而是等到用户提交了一个 insert 作业后才会执行同步数据,并且 Flink 不会存储数据。咱们的第一个作业是计算支出散布,数据来源于 bill_info
和order_info
两张 MySQL 表,并且账单实收表和订单实收表都须要关联维表数据获取应收科目的最新中文名称,依照服务中心、账期、科目代码和科目名称进行分组计算实收金额的 sum 值,实收散布具体 DML 如下:
INSERT INTO income_distribution
SELECT t1.serviceCode, t1.accountPeriod, t1.subjectCode, t1.subjectName, SUM(amt) AS amt
FROM (SELECT b.serviceCode, b.accountPeriod, b.subjectCode, s.name AS subjectName, SUM(amt) AS amt
FROM bill_info AS b
JOIN subject_info FOR SYSTEM_TIME AS OF b.proc_time s ON b.subjectCode = s.code
GROUP BY b.serviceCode, b.accountPeriod, b.subjectCode, s.name
UNION ALL
SELECT b.serviceCode, b.accountPeriod, b.subjectCode, s.name AS subjectName, SUM(amt) AS amt
FROM order_info AS b
JOIN subject_info FOR SYSTEM_TIME AS OF b.proc_time s ON b.subjectCode = s.code
GROUP BY b.serviceCode, b.accountPeriod, b.subjectCode, s.name
) AS t1
GROUP BY t1.serviceCode, t1.accountPeriod, t1.subjectCode, t1.subjectName;
Flink SQL 的维表 JOIN 和双流 JOIN 写法上不太一样,对于维表,还须要在 Flink source table 上增加一个 proctime 字段 proc_time AS PROCTIME()
,关联的时候应用 FOR SYSTEM_TIME AS OF
的 SQL 语法查问时态表,意思是关联查问最新版本的维表数据。对于维表 JOIN 的应用可参阅 https://ci.apache.org/project…。
③ 在 SQL Client 执行以上作业后,YARN 会创立一个 Flink 集群运行作业,并且用户能够在 Hadoop 上查看到执行作业的所有信息,并且能进入 Flink 的 Web UI 页面查看 Flink 作业详情,以下是 Hadoop 所有作业状况。
④ 作业提交后,Flink SQL CDC 会扫描指定的 MySQL 表,在这期间 Flink 也会进行 checkpoint,所以须要依照上文所述的配置 checkpoint 的重试策略和重试次数。当数据被读取进 Flink 后,Flink 会流式地进行作业逻辑的计算,实时统计出聚合后果输入到 Elasticsearch(sink 端)。相当于咱们应用 Flink 在 MySQL 的表上保护了一个实时的物化视图,并将这个实时物化视图的后果存在了 Elasticsearch 中。在 Elasticsearch 中应用 GET /income_distribution/_search{"query": {"match_all": {}}}
命令查看输入的实收散布后果,如下图:
通过图中的后果能够看出聚合后果被实时的计算出来,并写到了 Elasticsearch 中了。
05 踩过的坑和学到的教训
-
Flink 作业原来运行在 standalone session 模式下,提交多个 Flink 作业会导致作业失败报错。
- 起因:因为 standalone session 模式下启动多个作业会导致多个作业的 Task 共享一个 JVM,可能会导致一些不稳固的问题。并且排查问题时,多个作业的日志混再一个 TaskManager 中,减少了排查的难度。
- 解决办法:采纳 YARN 的 per-job 模式启动多个作业,能有更好的隔离性
- SELECT elasticsearch table 报以下谬误
- 起因:Elasticsearch connector 目前只反对了 sink,不反对 source。所以不能 SELECT elasticsearch table。
- 在
flink-conf.yaml
里批改默认并行度,然而在 Web UI 看到作业的并行度还是 1,并行度批改不失效。
- 解决办法:在应用 SQL Client 时
sql-client-defaults.yaml
中的并行度配置的优先级更高。在sql-client-defaults.yaml
中批改并行度,或者删除sql-client-defaults.yaml
中的并行度配置。更倡议采纳后者。
- Flink 作业在扫描 MySQL 全量数据时,checkpoint 超时,呈现作业 failover,如下图:
- 起因:Flink CDC 在 scan 全表数据(咱们的实收表有 4 千万数据)须要小时级的工夫(受上游聚合反压),而在 scan 全表过程中是没有 offset 能够记录的(意味着没法做 checkpoint),然而 Flink 框架任何时候都会依照固定间隔时间做 checkpoint,所以此处 mysql-cdc source 做了比拟取巧的形式,即在 scan 全表的过程中,会让执行中的 checkpoint 始终期待甚至超时。超时的 checkpoint 会被仍未认为是 failed checkpoint,默认配置下,这会触发 Flink 的 failover 机制,而默认的 failover 机制是不重启。所以会造成下面的景象。
- 解决办法:在
flink-conf.yaml
配置 failed checkpoint 容忍次数,以及失败重启策略,如下:
execution.checkpointing.interval: 10min # checkpoint 间隔时间
execution.checkpointing.tolerable-failed-checkpoints: 100 # checkpoint 失败容忍次数
restart-strategy: fixed-delay # 重试策略
restart-strategy.fixed-delay.attempts: 2147483647 # 重试次数
目前 Flink 社区也有一个 issue(Flink-18578)来反对 source 被动回绝 checkpoint 的机制,未来基于该机制,能比拟优雅地解决这个问题。
- Flink 怎么样开启 YARN 的 per-job 模式?
- 解决办法:在
flink-conf.yaml
中配置execution.target: yarn-per-job
。
- 进入 SQL Client 创立 table 后,在另外一个节点进入 SQL Client 查问不到 table。
- 起因:因为 SQL Client 默认的 catalog 是在 in-memory 的,不是长久化 Catalog,所以这属于失常景象,每次启动 Catalog 外面都是空的。
- 作业在运行时 Elasticsearch 报如下谬误:
Caused by: org.apache.Flink.elasticsearch7.shaded.org.elasticsearch.ElasticsearchException: Elasticsearch exception [type=illegal_argument_exception, reason=mapper [amt] cannot be changed from type [long] to [float]]
- 起因:数据库表的字段 amt 的类型是 decimal,DDL 创立输入到 es 的 amt 字段的类型也是 decimal,因为输入到 es 的第一条数据的 amt 如果是整数,比方是 10,输入到 es 的类型是 long 类型的,es client 会主动创立 es 的索引并且设置 amt 字段为 long 类型的格局,那么如果下一次输入到 es 的 amt 是非整数 10.1,那么输入到 es 的时候就会呈现类型不匹配的谬误。
- 解决办法:手动生成 es 索引和 mapping 的信息,指定好 decimal 类型的数据格式是 saclefloat,然而在 DDL 处依然能够保留该字段类型是 decimal。
- 作业在运行时 mysql cdc source 报如下谬误:
- 起因:因为数据库中别的表做了字段批改,CDC source 同步到了 ALTER DDL 语句,然而解析失败抛出的异样。
- 解决办法:在 flink-cdc-connectors 最新版本中曾经修复该问题(跳过了无奈解析的 DDL)。降级 connector jar 包到最新版本 1.1.0:flink-sql-connector-mysql-cdc-1.1.0.jar,替换 flink/lib 下的旧包。
- 扫描全表阶段慢,在 Web UI 呈现如下景象:
- 起因:扫描全表阶段慢不肯定是 cdc source 的问题,可能是上游节点解决太慢反压了。
- 解决办法:通过 Web UI 的反压工具排查发现,瓶颈次要在聚合节点上。通过在
sql-client-defaults.yaml
文件配上 MiniBatch 相干参数和开启 distinct 优化(咱们的聚合中有 count distinct),作业的 scan 效率失去了很大的晋升,从原先的 10 小时,晋升到了 1 小时。对于性能调优的参数能够参阅 https://ci.apache.org/project…
configuration:
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 2s
table.exec.mini-batch.size: 5000
table.optimizer.distinct-agg.split.enabled: true
- CDC source 扫描 MySQL 表期间,发现无奈往该表 insert 数据。
- 起因:因为应用的 mysql 用户未受权 RELOAD 权限,导致无奈获取全局读锁(FLUSH TABLES WITH READ LOCK),CDC source 就会进化成表级读锁,而应用表级读锁须要等到全表 scan 完,能力开释锁,所以会发现持锁工夫过长的景象,影响其余业务写入数据。
- 解决办法:给应用的 MySQL 用户授予 RELOAD 权限即可。所需的权限列表详见文档:https://github.com/ververica/…
。如果出于某些起因无奈授予 RELOAD 权限,也能够显式配上 'debezium.snapshot.locking.mode' = 'none'
来防止所有锁的获取,但要留神只有当快照期间表的 schema 不会变更才平安。
- 多个作业共用同一张 source table 时,没有批改 server id 导致读取进去的数据有失落。
- 起因:MySQL binlog 数据同步的原理是,CDC source 会伪装成 MySQL 集群的一个 slave(应用指定的 server id 作为惟一 id),而后从 MySQL 拉取 binlog 数据。如果一个 MySQL 集群中有多个 slave 有同样的 id,就会导致拉取数据错乱的问题。
- 解决办法:默认会随机生成一个 server id,容易有碰撞的危险。所以倡议应用动静参数(table hint)在 query 中笼罩 server id。如下所示:
SELECT *
FROM bill_info /*+ OPTIONS('server-id'='123456') */ ;
- 在启动作业时,YARN 接管了工作,但作业始终未启动:
- 起因:Queue Resource Limit for AM 超过了限度资源限度。默认的最大内存是 30G (集群内存) * 0.1 = 3G,而每个 JM 申请 2G 内存,当提交第二个工作时,资源就不够了。
- 解决办法:调大 AM 的 resource limit,在 capacity-scheduler.xml 配置
yarn.scheduler.capacity.maximum-am-resource-percent
,代表 AM 的占总资源的百分比,默认为 0.1,改成 0.3(依据服务器的性能灵便配置)。
- AM 过程启不来,始终被 kill 掉
- 起因:386.9 MB of 1 GB physical memory used; 2.1 GB of 2.1 GB virtual memory use。默认物理内存是 1GB,动静申请到了 1GB,其中应用了 386.9 MB。物理内存 x 2.1= 虚拟内存,1GBx2.1≈2.1GB,2.1GB 虚拟内存曾经耗尽,当虚拟内存不够时候,AM 的 container 就会他杀。
- 解决办法:两个解决方案,或调整
yarn.nodemanager.vmem-pmem-ratio
值大点,或yarn.nodemanager.vmem-check-enabled=false
,敞开虚拟内存查看。参考:https://blog.csdn.net/lzxlfly…
06 总结
为了晋升了实时报表服务的可用性和实时性,一开始咱们采纳了 Canal+Kafka+Flink 的计划,可是发现须要写比拟多的 Java 代码,而且还须要解决好 DataStream 和 Table 的转换以及 binlong 地位的获取,开发难度绝对较大。另外,须要保护 Kafka 和 Canal 这两个组件的稳固运行,对于咱们小团队来说老本也不小。因为咱们公司曾经有基于 Flink 的工作在线上运行,因而采纳 Flink SQL CDC 就成了牵强附会的事件。基于 Flink SQL CDC 的计划只须要编写 SQL,不必写一行 Java 代码就能实现实时链路的买通和实时报表的计算,对于咱们来说十分的简略易用,而且在线上运行的稳定性和性能体现也让咱们称心。
咱们正在公司内大力推广 Flink SQL CDC 的应用,也正在着手革新其余几个实时链路的工作。非常感谢开源社区能为咱们提供如此弱小的工具,也心愿 Flink CDC 越来越弱小,反对更多的数据库和性能。也再次云邪老师对于咱们我的项目上线的大力支持!