共计 5153 个字符,预计需要花费 13 分钟才能阅读完成。
作者:闻乃松
将 Flink 流式计算引擎作为实时计算作业的标配带来的重大挑战之一是资源占用问题,因为大量的 Flink 作业一旦启动,将始终占用调配的 CPU 和内存资源,而不会主动开释(除非是批处理模式),并且不会随着解决负载主动伸缩。本文探讨如何在计算资源无限的状况下,如何最大化利用计算资源。基本思路是将流式计算工作中基于工夫触发的子过程转化为离线定时调度计算工作。上面列举几种常见的场景,姑且当做验证。
一、Kafka 数据入湖(仓)作业
比方这样一个 kafka topic 数据导入示例,将拉取的数据不加解决地写入到 Iceberg 表:
CREATE TABLE KafkaTable (`event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, -- from Debezium format
`origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL, -- from Kafka connector
`offset` BIGINT METADATA VIRTUAL, -- from Kafka connector
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'debezium-json'
);
INSERT INTO `hive_catalog`.`default`.`sample` SELECT * from KafkaTable; -- insert into iceberg
如果对拉取的频率要求不高,这种状况是能够转换为定时拉取,比方 1 分钟拉取一批,写入到 iceberg,然而须要本人实现拉取的并发管制、状态保留、提交 Iceberg 的事务性、生产负载平衡等性能。
二、S3 对象告诉事件处理作业
基于 S3 对象存储,用户可能实时监听对象的 CREATE、PUT、Remove、Expiration 或者 Replication 等事件告诉,通过应用形如上面的流程能够省去基于 Flink FileSystem 加载数据的过程。下图是基于 SQS 和 Lambda 函数的一种架构案例,实际上 SQS 和 Lambda 都能够替换为自定义的处理程序,因为每个事件处理都是小处理单元,能够短时间疾速实现,这时候事件告诉很适宜间接对接基于事件触发的调度零碎(如 Celery),而不须要启动一个实时 Flink 作业,但因为 S3 事件告诉只投递一次,用户程序须要保障音讯解决的服务等级。
三、CDC 近实时作业入湖(仓)作业
CDC 数据同 Kafka 数据类似,对容许高提早(比方分钟级)的场景也能够转为定时拉取的形式,用户程序定期向服务器发送 Dump 指令,承受 binlog 数据,而后再存储到其余介质,比方 Canal Server 模仿数据库 Slave 同数据库 Master 交互,将收到的 binlog 暂存内存 RingBuffer 或者路由到指定的 Kafka topic,Canal Client 定期从 Canal Server 或者 Kafka 拉取数据,如下图所示:
这里的 canal client 就能够依据须要替换 Flink,比方间接将生产到的数据提交到数据湖 Iceberg,用户程序须要实现的是并发管制、异样复原、生产负载平衡、数据合并与去重等性能。Canal Server 通过多实例部署,加强了 HA 性能,然而整体架构过于简单,而 Debezium 则更为简洁,它提供 Embeded 的形式,用户能够基于 Debezium API 间接读取并生产 binlog,开发自由度大大提高,也不须要依赖 Kafka、Server 等三方服务。
四、JDBC 增量数据入湖(仓)作业
因为 JDBC 的数据读取形式对业务数据库影响较大,不适宜做高频低提早的批量拉取,因而人造适宜低频、增量拉取,在这种场合下,很适宜代替 Flink JDBC Connector 的作业形式。增量拉取能够基于数据库的批改工夫字段、自增 ID 字段。这种用法在离线数仓开发过程中广泛应用,比方每天拉取前一天的增量数据,并保留到 Hive 数仓中的一个分区,因为同一条记录可能存在多个版本,因而须要上游实现基于主键的数据去重。另外,上次拉取地位等状态也须要用户本人实现。
五、Flink 批处理或有界流解决作业
Flink 批处理因为自身会完结并开释资源,所以不必放心资源占用问题,然而基于纯 SQL 形式的作业无奈自适应资源弹性伸缩,如果可能转为离线 Hive SQL,可能在 YARN 等资源管理框架中最大化利用计算资源。比方上面的有界流解决上的一个示例 SQL 作业:
select a,count(1) from t group by a
因为其计算结果随着数据的到来实时触发,并更新后果,其实在容许高提早的场景下,上述过程 能够等数据全副到来一次性计算一次,或者以肯定的工夫距离增量计算。这种形式适宜任意数据的规模计算,增量形式尤其适宜大规模数据集,然而须要用户保障每次增量计算结果的幂等性。
六、Flink 无界流解决作业
无界流解决因为数据集大小有限,因而常基于部分窗口进行计算,防止内存移除问题,这里举两个示例阐明。
A. Flink 简略窗口计算作业
Flink 简略窗口计算只解决一个流,不波及到跨流计算,在单流窗口计算过程中,当一个新的窗口开启,数据先被缓存到内存,当窗口工夫达到右边际,触发窗口内的计算,而后再依据策略要求清理窗口内的缓存数据,如果窗口工夫范畴过长,会占用较大存储空间,此时就能够将实时计算转为定时增量计算,比方上面的窗口计算,每 10 分钟计算以后窗口的记录数量:
select count(a) from t group by tumble(ts, interval '10' minute)
该过程同以下计算等价:
select count(a) from t where ts between ($scheduleTime -'10' minute) and $scheduleTime
其中 $scheduleTime 为调度工夫,离线计算相比之前的实时计算,可能无效缩小作业的累计持续时间。
B. Flink window join 计算作业
window join 计算作业是无界数据流中罕用模式,基于 Window 的 Flink 双流 Join 的 API 语法为:
stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)
比方上面将 Orders 订单表和 Shipments 运输单表根据订单 id 和运输单的订单 id Join 的查问案例:
shipStream.join(orderStream)
.where(ship=>ship.orderId)
.equalTo(order=> order.id)
.window(TumblingEventTimeWindows.of(Time.hours(4)))
.apply((s,o)=>{(s.orderId,o.id,...)
})
其中 join 条件是 ship.orderId=order.id,窗口为 4 hour 长度的滚动窗口:这个示例用 SQL 形容就是:
select * from Orders o,Shipments s
where o.id=s.orderId and s.shipTime between o.orderTime
and o.orderTime+interval '4' hour
查问为 Orders 表设置了 o.ordertime > s.shiptime- INTERVAL‘4’HOUR 的工夫下界:
上述 SQL 跟以下基于离线调度的形式等价:
select *
from Orders o,Shipments s
where o.id=s.orderId
and s.shipTime between $scheduleTime and $scheduleTime+4 hour
and o.orderTime between $scheduleTime and $scheduleTime+4 hour
通过离线解决转换后的工作就能够依照增量计算了,此时也须要用户去解决数据的合并、更新与幂等性等问题。
七、Iceberg 湖数据增量生产作业
形如上面的增量生产 Iceberg 湖数据的作业,如果 monitor-interval 达到分钟级以上的提早,也是能够转为离线增量计算作业:
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='10m', 'start-snapshot-id'='3821550127947089987')*/ ;
转换形式能够通过批改 OPTIONS,减少计算条件等,以上述示例其转换后的示例能够是:
SELECT * FROM sample where processing_time between $scheduleTime and ($scheduleTime+10 minute)
/*+ OPTIONS('streaming'='false', 'start-snapshot-id'='3821550127947089987')*/ ;
其中的工夫过滤字段能够依据应用上下文的设置决定应用 processing_time 还是 Event_time。这里为放弃 SQL 语法兼容性,持续应用 FLinkSQL,如果没有非凡语法,也能够转为其余计算引擎反对的 SQL。
小结
本文梳理了实时计算中,可能将实时计算转为离线计算的几个场景,通过转换后的比照可见,离线计算基于有界数据计算实现之后开释资源,尽管累计计算资源不会少,然而开释了大量闲暇期间的资源占用比例,增大了资源的整体利用率,同时转为离线计算有了更多的计算引擎选型空间,另外在可抢占资源分配形式的资源管理框架中(如 YARN),可能解决 Flink 实时作业应用资源无奈伸缩的问题。
将实时作业转为离线作业时,咱们除了须要解决离线调度的资源隔离与调配问题,还须要解决工作依赖问题,比方再来看下面基于窗口 Join 的案例:
shipStream.join(orderStream)
.where(ship=>ship.orderId)
.equalTo(order=> order.id)
.window(TumblingEventTimeWindows.of(Time.hours(4)))
.apply((s,o)=>{(s.orderId,o.id,...)
})
上述实时计算作业转为离线作业时,其实存在着这样的依赖 T 拓扑构造:
假如 Shipments 增量更新频率为每小时一次,Orders 更新频率为 4 小时一次,那么 Join 计算工作的更新频率设置多少适合呢?实际上 Join 的频率实践上以被依赖工作中较小者为准比拟适合,然而在理论面向用户的利用上,很难做到精确,除非零碎把实时转离线的过程以及依赖的确立工作齐全自动化。尽管如此,咱们依然须要解决不同调度周期依赖的问题:
- OIN 工作计算频率跟 Shipments 雷同,也是 1 小时一次。那么 JOIN 齐全依赖 Shipments,即当以后周期的 Shipments 计算实现才可能计算 JOIN。而 JOIN 频率高于 Orders,只须要利用最新计算结果数据即可,可看做 JOIN 弱依赖 Orders。
- JOIN 工作计算频率跟 Orders 雷同,也是 4 小时一次。那么 JOIN 齐全依赖 Orders,必须等以后周期的 Orders 计算实现才可能执行 JOIN,然而 JOIN 频率低于 Shipments,实践上能够不依赖 Shipments,然而理论中可能存在业务上的依赖关系,比方一个日周期类型的工作必须等小时工作执行满 24 周期才能够执行进行小时级别的数据后果汇总,那么此时能够将高频工作依照低频工作周期对齐。
将上述问题利用到数仓开发中,就是上面常见的依赖模型:
其中视图工作能够看做业务数据到数仓 Staging area 的映射,ODS 是基于视图数据进行贴源层的计算,DWD 再基于 ODS 做数据明细建模等,上述每个节点的工作计算频率都不一样,参照下面的两种状况很容易实现这种依赖调度。