作者:闻乃松

将Flink流式计算引擎作为实时计算作业的标配带来的重大挑战之一是资源占用问题,因为大量的Flink作业一旦启动,将始终占用调配的CPU和内存资源,而不会主动开释(除非是批处理模式),并且不会随着解决负载主动伸缩。本文探讨如何在计算资源无限的状况下,如何最大化利用计算资源。基本思路是将流式计算工作中基于工夫触发的子过程转化为离线定时调度计算工作。上面列举几种常见的场景,姑且当做验证。

一、Kafka数据入湖(仓)作业

比方这样一个kafka topic数据导入示例,将拉取的数据不加解决地写入到Iceberg表:

CREATE TABLE KafkaTable (  `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format  `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format  `partition_id` BIGINT METADATA FROM 'partition' VIRTUAL,  -- from Kafka connector  `offset` BIGINT METADATA VIRTUAL,  -- from Kafka connector  `user_id` BIGINT,  `item_id` BIGINT,  `behavior` STRING) WITH (  'connector' = 'kafka',  'topic' = 'user_behavior',  'properties.bootstrap.servers' = 'localhost:9092',  'properties.group.id' = 'testGroup',  'scan.startup.mode' = 'earliest-offset',  'value.format' = 'debezium-json');INSERT INTO `hive_catalog`.`default`.`sample` SELECT * from KafkaTable; -- insert into iceberg

如果对拉取的频率要求不高,这种状况是能够转换为定时拉取,比方1分钟拉取一批,写入到iceberg,然而须要本人实现拉取的并发管制、状态保留、提交Iceberg的事务性、生产负载平衡等性能。

二、S3对象告诉事件处理作业

基于S3对象存储,用户可能实时监听对象的CREATE、PUT、Remove、Expiration或者Replication等事件告诉,通过应用形如上面的流程能够省去基于Flink FileSystem 加载数据的过程。下图是基于SQS和Lambda 函数的一种架构案例,实际上 SQS和Lambda 都能够替换为自定义的处理程序,因为每个事件处理都是小处理单元,能够短时间疾速实现,这时候事件告诉很适宜间接对接基于事件触发的调度零碎(如Celery),而不须要启动一个实时Flink作业,但因为S3事件告诉只投递一次,用户程序须要保障音讯解决的服务等级。

三、CDC 近实时作业入湖(仓)作业

CDC数据同Kafka数据类似,对容许高提早(比方分钟级)的场景也能够转为定时拉取的形式,用户程序定期向服务器发送Dump指令,承受binlog数据,而后再存储到其余介质,比方Canal Server模仿数据库Slave同数据库Master交互,将收到的binlog暂存内存RingBuffer或者路由到指定的Kafka topic,Canal Client定期从Canal Server或者Kafka拉取数据,如下图所示:

这里的canal client就能够依据须要替换Flink,比方间接将生产到的数据提交到数据湖Iceberg,用户程序须要实现的是并发管制、异样复原、生产负载平衡、数据合并与去重等性能。Canal Server通过多实例部署,加强了HA性能,然而整体架构过于简单,而Debezium则更为简洁,它提供Embeded的形式,用户能够基于Debezium API间接读取并生产binlog,开发自由度大大提高,也不须要依赖Kafka、Server等三方服务。

四、JDBC 增量数据入湖(仓)作业

因为JDBC的数据读取形式对业务数据库影响较大,不适宜做高频低提早的批量拉取,因而人造适宜低频、增量拉取,在这种场合下,很适宜代替Flink JDBC Connector的作业形式。增量拉取能够基于数据库的批改工夫字段、自增ID字段。这种用法在离线数仓开发过程中广泛应用,比方每天拉取前一天的增量数据,并保留到Hive数仓中的一个分区,因为同一条记录可能存在多个版本,因而须要上游实现基于主键的数据去重。另外,上次拉取地位等状态也须要用户本人实现。

五、Flink批处理或有界流解决作业

Flink批处理因为自身会完结并开释资源,所以不必放心资源占用问题,然而基于纯SQL形式的作业无奈自适应资源弹性伸缩,如果可能转为离线Hive SQL,可能在YARN等资源管理框架中最大化利用计算资源。比方上面的有界流解决上的一个示例SQL作业:

select a,count(1) from t group by a

因为其计算结果随着数据的到来实时触发,并更新后果,其实在容许高提早的场景下,上述过程 能够等数据全副到来一次性计算一次,或者以肯定的工夫距离增量计算。这种形式适宜任意数据的规模计算,增量形式尤其适宜大规模数据集,然而须要用户保障每次增量计算结果的幂等性。

六、Flink无界流解决作业

无界流解决因为数据集大小有限,因而常基于部分窗口进行计算,防止内存移除问题,这里举两个示例阐明。

A. Flink简略窗口计算作业

Flink简略窗口计算只解决一个流,不波及到跨流计算,在单流窗口计算过程中,当一个新的窗口开启,数据先被缓存到内存,当窗口工夫达到右边际,触发窗口内的计算,而后再依据策略要求清理窗口内的缓存数据,如果窗口工夫范畴过长,会占用较大存储空间,此时就能够将实时计算转为定时增量计算,比方上面的窗口计算,每10分钟计算以后窗口的记录数量:

select count(a) from t group by tumble(ts, interval '10' minute)

该过程同以下计算等价:

select count(a) from t where ts between ($scheduleTime -'10' minute) and $scheduleTime

其中$scheduleTime为调度工夫,离线计算相比之前的实时计算,可能无效缩小作业的累计持续时间。

B. Flink window join计算作业

window join计算作业是无界数据流中罕用模式,基于Window的Flink双流Join的 API语法为:

stream.join(otherStream).where(<KeySelector>) .equalTo(<KeySelector>) .window(<WindowAssigner>) .apply(<JoinFunction>)

比方上面将 Orders 订单表和 Shipments 运输单表根据订单id和运输单的订单id Join 的查问案例:

shipStream.join(orderStream)            .where(ship=>ship.orderId)            .equalTo(order=> order.id)            .window(TumblingEventTimeWindows.of(Time.hours(4)))            .apply((s,o)=>{                (s.orderId,o.id,...)            })

其中 join条件是ship.orderId=order.id,窗口为4 hour 长度的滚动窗口:这个示例用SQL形容就是:

select *   from Orders o,Shipments s where o.id=s.orderId and s.shipTime between o.orderTimeand o.orderTime+interval '4' hour

查问为 Orders 表设置了 o.ordertime > s.shiptime- INTERVAL ‘4’ HOUR 的工夫下界:

上述SQL跟以下基于离线调度的形式等价:

select * from Orders o,Shipments s where o.id=s.orderIdand s.shipTime between $scheduleTime and $scheduleTime+4 hourand o.orderTime between $scheduleTime and $scheduleTime+4 hour

通过离线解决转换后的工作就能够依照增量计算了,此时也须要用户去解决数据的合并、更新与幂等性等问题。

七、Iceberg湖数据增量生产作业

形如上面的增量生产Iceberg湖数据的作业,如果monitor-interval达到分钟级以上的提早,也是能够转为离线增量计算作业:

SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='10m', 'start-snapshot-id'='3821550127947089987')*/ ;

转换形式能够通过批改OPTIONS,减少计算条件等,以上述示例其转换后的示例能够是:

SELECT * FROM sample where processing_time between  $scheduleTime and ($scheduleTime+10 minute) /*+ OPTIONS('streaming'='false', 'start-snapshot-id'='3821550127947089987')*/ ;

其中的工夫过滤字段能够依据应用上下文的设置决定应用processing_time还是Event_time。这里为放弃SQL语法兼容性,持续应用FLinkSQL,如果没有非凡语法,也能够转为其余计算引擎反对的SQL。

小结

本文梳理了实时计算中,可能将实时计算转为离线计算的几个场景,通过转换后的比照可见,离线计算基于有界数据计算实现之后开释资源,尽管累计计算资源不会少,然而开释了大量闲暇期间的资源占用比例,增大了资源的整体利用率,同时转为离线计算有了更多的计算引擎选型空间,另外在可抢占资源分配形式的资源管理框架中(如YARN),可能解决Flink实时作业应用资源无奈伸缩的问题。

将实时作业转为离线作业时,咱们除了须要解决离线调度的资源隔离与调配问题,还须要解决工作依赖问题,比方再来看下面基于窗口Join的案例:

shipStream.join(orderStream).where(ship=>ship.orderId) .equalTo(order=> order.id) .window(TumblingEventTimeWindows.of(Time.hours(4))).apply((s,o)=>{ (s.orderId,o.id,...)})

上述实时计算作业转为离线作业时,其实存在着这样的依赖T拓扑构造:

假如Shipments增量更新频率为每小时一次,Orders更新频率为4小时一次,那么Join计算工作的更新频率设置多少适合呢?实际上Join的频率实践上以被依赖工作中较小者为准比拟适合,然而在理论面向用户的利用上,很难做到精确,除非零碎把实时转离线的过程以及依赖的确立工作齐全自动化。尽管如此,咱们依然须要解决不同调度周期依赖的问题:

  1. OIN工作计算频率跟Shipments雷同,也是1小时一次。那么JOIN齐全依赖Shipments,即当以后周期的Shipments计算实现才可能计算JOIN。而JOIN频率高于Orders,只须要利用最新计算结果数据即可,可看做JOIN弱依赖Orders。
  2. JOIN工作计算频率跟Orders雷同,也是4小时一次。那么JOIN齐全依赖Orders,必须等以后周期的Orders计算实现才可能执行JOIN,然而JOIN频率低于Shipments,实践上能够不依赖Shipments,然而理论中可能存在业务上的依赖关系,比方一个日周期类型的工作必须等小时工作执行满24周期才能够执行进行小时级别的数据后果汇总,那么此时能够将高频工作依照低频工作周期对齐。

将上述问题利用到数仓开发中,就是上面常见的依赖模型:

其中视图工作能够看做业务数据到数仓Staging area的映射,ODS是基于视图数据进行贴源层的计算,DWD再基于ODS做数据明细建模等,上述每个节点的工作计算频率都不一样,参照下面的两种状况很容易实现这种依赖调度。