乐趣区

关于flink:踩坑记-Flink-天级别窗口中存在的时区问题

本系列每篇文章都是从一些理论的 case 登程,剖析一些生产环境中常常会遇到的问题,抛砖引玉,以帮忙小伙伴们解决一些理论问题。本文介绍 Flink 工夫以及时区问题,剖析了在天级别的窗口时会遇到的时区问题,如果对小伙伴有帮忙的话,欢送点赞 + 再看~
本文次要分为两局部:
第一局部(第 1 – 3 节)的剖析次要针对 flink,剖析了 flink 天级别窗口的中存在的时区问题以及解决方案。
第二局部(第 4 节)的剖析能够作为所有时区问题的剖析思路,次要以解决方案中的时区偏移量为什么是加 8 小时为案例做了通用的深度解析。
为了让读者能对本文探讨的问题有一个大抵理解,本文先给出问题 sql,以及解决方案。后文给出具体的剖析~

1. 问题以及解决方案

问题 sql

sql 很简略,用来统计当天累计 uv。

--------------- 伪代码 ---------------
INSERT INTO
 kafka_sink_tableSELECT
 -- 窗口开始工夫 CAST(TUMBLE_START(proctime, INTERVAL '1' DAY) AS bigint
 ) AS window_start, -- 以后记录解决的工夫 cast(max(proctime) AS BIGINT) AS current_ts,
 -- 每个桶内的 uv count(DISTINCT id) AS part_daily_full_uv
FROM
 kafka_source_tableGROUP BY
 mod(id, bucket_number),
 -- bucket_number 为常数,依据具体场景指定具体数值 TUMBLE(proctime, INTERVAL '1' DAY)--------------- 伪代码 ---------------

你是否能一眼看出这个 sql 所存在的问题?(PS:数据源以及数据汇时区都为东八区)
没错,天级别窗口所存在的时区问题,即这段代码统计的不是楼主所在东八区一整天数据的 uv,这段代码统计的一整天的范畴在东八区是第一天早 8 点至第二天早 8 点。

解决方案

楼主目前所处时区为东八区,解决方案如下:

--------------- 伪代码 ---------------
CREATE VIEW view_table AS
SELECT
 id, -- 通过注入工夫解决 -- 加上东八区的工夫偏移量,设置注入工夫为工夫戳列 CAST(CURRENT_TIMESTAMP AS BIGINT) * 1000 + 8 * 60 * 60 * 1000 as ingest_time
FROM 
   source_table;
INSERT INTO
 target_tableSELECT
 CAST(TUMBLE_START(ingest_time, INTERVAL '1' DAY) AS bigint
 ) AS window_start, cast(max(ingest_time) AS BIGINT) - 8 * 3600 * 1000 AS current_ts,
 count(DISTINCT id) AS part_daily_full_uv
FROM
 view_tableGROUP BY
 mod(id, 1024),
 -- 依据注入工夫划分天级别窗口 TUMBLE(ingest_time, INTERVAL '1' DAY)
--------------- 伪代码 ---------------

通过上述计划,就能够将统计的数据工夫范畴调整为东八区的今日 0 点至明日 0 点。下文具体阐明整个需要场景以及解决方案的实现和剖析过程。

2. 需要场景以及实现计划

需要场景

coming,需要场景比较简单,就是生产上游的一个埋点日志数据源,依据埋点中的 id 统计当天 0 点至以后时刻的累计 uv,依照分钟级别产出到上游 OLAP 引擎中进行简略的聚合,最初在 BI 看板进行展现,没有任何维度字段(打动到哭????)。

数据链路以及组件选型

客户端用户行为埋点日志 -> logServer -> kafka -> flink(sql)-> kafka -> druid -> BI 看板。
实现计划以及具体的实现形式很多,这次应用的是 sql API。

flink sql schema

source 和 sink 表 schema 如下(只保留关键字段):

--------------- 伪代码 ---------------
CREATE TABLE kafka_sink_table (
 -- 天级别窗口开始工夫 window_start BIGINT,
 -- 以后记录解决的工夫 current_ts BIGINT,
 -- 每个桶内的 uv(处理过程对 id 进行了分桶)part_daily_full_uv BIGINT
) WITH (-- ...);
CREATE TABLE kafka_source_table (
 -- ...  -- 须要进行 uv 计算的 id
 id BIGINT,
 -- 解决工夫 proctime AS PROCTIME()) WITH (-- ...);
--------------- 伪代码 ---------------

flink sql transform

--------------- 伪代码 ---------------
INSERT INTO
 kafka_sink_tableSELECT
 -- 窗口开始工夫 CAST(TUMBLE_START(proctime, INTERVAL '1' DAY) AS bigint
 ) AS window_start, -- 以后记录解决的工夫 cast(max(proctime) AS BIGINT) AS current_ts,
 -- 每个桶内的 uv count(DISTINCT id) AS part_daily_full_uv
FROM
 kafka_source_tableGROUP BY
 mod(id, bucket_number),
 -- bucket_number 为常数,依据具体场景指定具体数值 TUMBLE(proctime, INTERVAL '1' DAY)--------------- 伪代码 ---------------

应用 early-fire 机制(同 DataStream API 中的 ContinuousProcessingTimeTrigger),并设定触发距离为 60 s。
在上述实现 sql 中,咱们对 id 进行了分桶,那么每分钟输入的数据条数即为 bucket_number 条,最终在 druid 中依照分钟粒度将所有桶的数据进行 sum 聚合,即可失去从当天 0 点累计到以后分钟的全量 uv。

时区问题

激情场景还原:

头文字 ∩ 技术小哥哥:应用 sql,easy game,闲坐摸鱼 …

头文字 ∩ 技术小哥哥:等到 00:00 时,发现指标还在不停地往上涨,难道是 sql 逻辑错了,不应该啊,试过分钟,小时级别窗口都木有这个问题

头文字 ∩ 技术小哥哥:抠头 ing,算了,稍后再剖析这个问题吧,当初还有闲事要干????

头文字 ∩ 技术小哥哥:到了早上,瞅了一眼配置的工夫序列报表,发现在 08:00 点的时候指标归零,从新开始累计。想法一闪而过,东八区?(过后为啥没 format 下 sink 数据中的 window_start…)

3. 问题定位

问题阐明

flink 在应用工夫的这个概念的时候是基于 java 工夫纪元(即格林威治 1970/01/01 00:00:00,也即 Unix 工夫戳为 0)概念的,窗口对齐以及触发也是基于 java 工夫纪元。

问题场景复现

能够通过间接查看 sink 数据的 window_start 得出上述论断。
但为了还原整个过程,咱们依照如下 source 和 sink 数据进行整个问题的复现:
source 数据如下:

id proctime proctime UTC + 0(格林威治)格式化工夫 proctime UTC + 8(北京)格式化工夫
1 1599091140000 2020/09/02 23:59:00 2020/09/03 07:59:00
2 1599091140000 2020/09/02 23:59:00 2020/09/03 07:59:00
3 1599091140000 2020/09/02 23:59:00 2020/09/03 07:59:00
1 1599091200000 2020/09/03 00:00:00 2020/09/03 08:00:00
2 1599091200000 2020/09/03 00:00:00 2020/09/03 08:00:00
3 1599091260000 2020/09/03 00:01:00 2020/09/03 08:01:00

sink 数据(为了不便了解,间接依照 druid 聚合之后的数据展现):

window_start current_ts part_daily_full_uv window_start UTC + 8(北京)格式化工夫 current_ts UTC + 8(北京)格式化工夫
1599004800000 1599091140000 3 2020/09/02 08:00:00 2020/09/03 07:59:00
1599091200000 1599091200000 2 2020/09/03 08:00:00 2020/09/03 08:00:00
1599091200000 1599091260000 3 2020/09/03 08:00:00 2020/09/03 08:01:00

从上述数据能够发现,天级别窗口 开始工夫 在 UTC + 8(北京)的时区是每天早上 8 点,即 UTC + 0(格林威治)的凌晨 0 点。
下文先给出解决方案,而后具体解析各个工夫以及时区概念~

解决方案

  • 框架层面解决:Blink Planner 反对时区设置
  • sql 层面解决:从 sql 实现层面给出解决方案

sql 层面解决方案

--------------- 伪代码 ---------------
CREATE VIEW view_table AS
SELECT
 id, -- 通过注入工夫解决 -- 加上东八区的工夫偏移量,设置注入工夫为工夫戳列 CAST(CURRENT_TIMESTAMP AS BIGINT) * 1000 + 8 * 60 * 60 * 1000 as ingest_time
FROM 
   source_table;
INSERT INTO
 target_tableSELECT
 CAST(TUMBLE_START(ingest_time, INTERVAL '1' DAY) AS bigint
 ) AS window_start, cast(max(ingest_time) AS BIGINT) - 8 * 3600 * 1000 AS current_ts,
 count(DISTINCT id) AS part_daily_full_uv
FROM
 view_tableGROUP BY
 mod(id, 1024),
 -- 依据注入工夫划分天级别窗口 TUMBLE(ingest_time, INTERVAL '1' DAY)
--------------- 伪代码 ---------------

我目前所属的时区是东八区(北京工夫),通过上述 sql,设置注入工夫,并对注入工夫加上 8 小时的偏移量进行天级别窗口的划分,就能够对此问题进行解决(也能够在 create table 时,在 schema 中依据计算列增加对应的注入工夫戳进行解决)。如果你在 sql 层面有更好的解决方案,欢送探讨~

Notes:

  • 东 n 区的解决方案就是工夫戳 +n 3600 秒的偏移量,西 n 区的解决方案就是工夫戳 -n 3600 秒的偏移量
  • DataStream API 存在雷同的天级别窗口时区问题

这里提出一个问题,为什么东八区是须要在工夫戳上加 8 小时偏移量进行天级别窗口计算,而不是减 8 小时或是加上 32(24 + 8)小时,小伙伴们有详细分析过嘛~
根据上述问题,引出本文的第二大部分,即深度解析时区偏移量问题,这部分能够作为所有时区问题的剖析思路。

4. 为什么东八区是加 8 小时?

工夫和时区基本概念

时区 :因为世界各国家与地区经度不同,中央时也有所不同,因而会划分为不同的时区。
Unix 工夫戳(Unix timestamp)”):Unix 工夫戳(Unix timestamp),或称 Unix 工夫(Unix time)、POSIX 工夫(POSIX time),是一种工夫示意形式,定义为从格林威治工夫 1970 年 01 月 01 日 00 时 00 分 00 秒(UTC/GMT 的午夜)起至当初的总秒数。
Unix 工夫戳不仅被应用在 Unix 零碎、类 Unix 零碎中,也在许多其余操作系统中被宽泛采纳。
GMT:Greenwich Mean Time 格林威治规范工夫。这是以英国格林威治天文台观测后果得出的工夫,这是英国格林威治当地工夫,这个中央的当地工夫过来被当成世界规范的工夫。
UT:Universal Time 世界时。依据原子钟计算出来的工夫。
UTC:Coordinated Universal Time 协调世界时。因为地球自转越来越慢,每年都会比前一年多出零点几秒,每隔几年协调世界时组织都会给世界时 +1 秒,让基于原子钟的世界时和基于天文学(人类感知)的格林威治规范工夫相差不至于太大。并将失去的工夫称为 UTC,这是当初应用的世界规范工夫。
协调世界时不与任何地区地位相干,也不代表此刻某地的工夫,所以在阐明某地工夫时要加上时区也就是说 GMT 并不等于 UTC,而是等于 UTC + 0,只是格林威治刚好在 0 时区上。

文言工夫和时区

过后看完这一系列的工夫以及时区阐明之后我大脑其实是一片空白。…ojbk…,我用本人当初的一些了解,尝试将上述所有波及到工夫的概念解释一下。

  • GMT:格林威治规范工夫。
  • UTC:基于原子钟协调之后的世界规范工夫。能够认为 UTC 工夫和格林威治规范工夫统一。即 GMT = UTC + 0,其中 0 代表格林威治为 0 时区。
  • 时区:逆向思维来解释下(只从技术层面解释,不从其余简单层面解释),没有时区划分代表着全世界都是同一时区,那么同一时刻看到的外显工夫是一样的。举个????:如果全世界都依照格林威治工夫作为对立工夫,在格林威治工夫 0 点时,对于北京和加拿大的两个同学来说,这两个同学感知到的是北京是太阳刚刚升起(凌晨),加拿大是太阳刚刚落下(黄昏)。

然而因为没有时区划分,这两个同学看到的工夫都是 0 点,因而这是不合乎人类对 感知到的工夫 和本人 看到的工夫 的了解的。所以划分时区之后,能够满足北京(东八区 UTC + 8)同学看到的工夫是上午 8 点,加拿大(西四区 UTC – 4)同学看到的工夫是下午 8 点。留神时区的划分是和 UTC 绑定的。东八区即 UTC + 8。

  • flink 工夫:flink 应用的工夫基于 java 工夫纪元(GMT 1970/01/01 00:00:00,UTC + 0 1970/01/01 00:00:00)。
  • Unix 工夫戳:世界上任何一个中央,同时接管到的数据的对应的 Unix 工夫戳都是雷同的,相似时区中咱们举的不分时区的????,全世界同一时刻的 Unix 工夫戳统一。
  • Unix 工夫戳为 0:对应的格林威治工夫:1970-01-01 00:00:00,对应的北京工夫(东八区):1970-01-01 08:00:00**

概念关系如图所示:

为什么东八区是加 8 小时?

下述表格只对一些重要的工夫进行了标注:


拿第一条数据解释下,其代表在北京工夫 1970/01/01 00:00:00 时,生成的一条数据所携带的 Unix 工夫戳为 -8 * 3600。
依据需要和上图和上述表格内容,咱们能够失去如下推导过程:

  • 需要场景是统计一个终日的 uv,即天级别窗口,比方统计北京工夫 1970/01/01 00:00:00 – 1970/01/02 00:00:00 范畴的数据时,这个日期范畴内的数据所携带的 Unix 工夫戳范畴为 -8 3600 到 16 3600
  • 对于 flink 来说,默认状况下它所能统计的一个终日的 Unix 工夫戳的范畴是 0 到 24 * 3600
  • 所以当咱们想通过 flink 实现正确统计北京工夫(1970/01/01 00:00:00 – 1970/01/02 00:00:00)范畴内的数据时,即统计 Unix 工夫戳为 -8 3600 到 16 3600 的数据时,就须要对工夫戳做个映射。
  • 映射办法如下,就是将整体范畴内的工夫戳做在时间轴上做平移映射,就是把 -8 3600 映射到 0,16 3600 映射到 24 3600。相当于是对北京工夫的 Unix 工夫戳整体加 8 3600。
  • 最初在产出的工夫戳上把加上的 8 小时再减掉(因为外显工夫会主动依照时区对 Unix 工夫戳进行格式化)。

Notes:

  • 能够加 32 小时吗?答案是能够。在东八区,对于天级别窗口的划分,加 8 小时和加 8 + n * 24(其中 n 为整数)小时后进行的天级别窗口划分和计算的成果是一样的,flink 都会将东八区的整一天内的数据划分到一个天级别窗口内。所以加 32(8 + 24),56(8 + 48),-16(8 – 24)小时成果都雷同,上述例子只是抉择了时间轴平移最小的间隔,即 8 小时。留神某些零碎的 Unix 工夫戳为负值时会出现异常。
  • 此推理过程实用于所有遇到时区问题的场景,如果你也有其余利用场景有这个问题,也能够依照上述形式解决

Appendix

求输出 Unix 工夫戳对应的东八区每天 0 点的 Unix 工夫戳。

public static final long ONE_DAY_MILLS = 24 * 60 * 60 * 1000L;
public static long transform(long timestamp) {return timestamp - (timestamp + 8 * 60 * 60 * 1000) % ONE_DAY_MILLS;}

5. 总结

本文首先介绍了间接给出了咱们的问题 sql 和解决方案。
第二节从需要场景以及整个数据链路的实现计划登程,解释了咱们怎么应用 flink sql 进行了需要实现,并进而引出了 sql 中天级别窗口存在的时区问题。
第三节确认了天级别窗口时区问题起因,引出了 flink 应用了 java 工夫纪元,并针对此问题给出了引擎层面和 sql 层面的解决方案。也进而提出了一个问题:为什么咱们的解决方案是加 8 小时偏移量?
第四节针对加 8 小时偏移量的起因进行了剖析,并具体论述了时区,UTC,GMT,Unix 工夫戳之间的关系。
最初一节对本文进行了总结。
如果你有更不便的时区偏移量了解形式,欢送留言~

公众号 (mangodata) 里回复 flink 关键字能够获取 flink 的学习材料以及视频。

退出移动版