简介:Flink+Hologres 亿级用户实时 UV 准确去重最佳实际
UV、PV 计算,因为业务需要不同,通常会分为两种场景:
• 离线计算场景:以 T + 1 为主,计算历史数据
• 实时计算场景:实时计算日常新增的数据,对用户标签去重
针对离线计算场景,Hologres 基于 RoaringBitmap,提供超高基数的 UV 计算,只需进行一次最细粒度的预聚合计算,也只生成一份最细粒度的预聚合后果表,就能达到亚秒级查问。具体详情能够参见往期文章 >>Hologres 如何反对超高基数 UV 计算(基于 RoaringBitmap 实现)
对于实时计算场景,能够应用 Flink+Hologres 形式,并基于 RoaringBitmap,实时对用户标签去重。这样的形式,能够较细粒度的实时失去用户 UV、PV 数据,同时便于依据需要调整最小统计窗口(如最近 5 分钟的 UV),实现相似实时监控的成果,更好的在大屏等 BI 展现。相较于以天、周、月等为单位的去重,更适宜在流动日期进行更细粒度的统计,并且通过简略的聚合,也能够失去较大工夫单位的统计后果。
- Flink 将流式数据转化为表与维表进行 JOIN 操作,再转化为流式数据。此举能够利用 Hologres 维表的 insertIfNotExists 个性联合自增字段实现高效的 uid 映射。
- Flink 把关联的后果数据依照工夫窗口进行解决,依据查问维度应用 RoaringBitmap 进行聚合,并将查问维度以及聚合的 uid 寄存在聚合后果表,其中聚合出的 uid 后果放入 Hologres 的 RoaringBitmap 类型的字段中。
- 查问时,与离线形式类似,间接依照查问条件查问聚合后果表,并对其中要害的 RoaringBitmap 字段做 or 运算后并统计基数,即可得出对应用户数。
- 解决流程如下图所示
1. 创立相干根底表
1)创立表 uid_mapping 为 uid 映射表,用于映射 uid 到 32 位 int 类型。
• RoaringBitmap 类型要求用户 ID 必须是 32 位 int 类型且越浓密越好(即用户 ID 最好间断)。常见的业务零碎或者埋点中的用户 ID 很多是字符串类型或 Long 类型,因而须要应用 uid_mapping 类型构建一张映射表。映射表利用 Hologres 的 SERIAL 类型(自增的 32 位 int)来实现用户映射的主动治理和稳固映射。
• 因为是实时数据, 设置该表为行存表,以进步 Flink 维表实时 JOIN 的 QPS。
BEGIN; | |
CREATE TABLE public.uid_mapping ( | |
uid text NOT NULL, | |
uid_int32 serial, | |
PRIMARY KEY (uid) | |
); | |
-- 将 uid 设为 clustering_key 和 distribution_key 便于疾速查找其对应的 int32 值 | |
CALL set_table_property('public.uid_mapping', 'clustering_key', 'uid'); | |
CALL set_table_property('public.uid_mapping', 'distribution_key', 'uid'); | |
CALL set_table_property('public.uid_mapping', 'orientation', 'row'); | |
2)创立表 dws_app 为根底聚合表,用于寄存在根底维度上聚合后的后果。
• 应用 RoaringBitmap 前须要创立 RoaringBitmap extention,同时也须要 Hologres 实例为 0.10 版本
• 为了更好性能,倡议依据根底聚合表数据量正当的设置 Shard 数,但倡议根底聚合表的 Shard 数设置不超过计算资源的 Core 数。举荐应用以下形式通过 Table Group 来设置 Shard 数
-- 新建 shard 数为 16 的 Table Group,-- 因为测试数据量百万级,其中后端计算资源为 100core,设置 shard 数为 16 | |
BEGIN; | |
CREATE TABLE tg16 (a int); --Table Group 哨兵表 | |
call set_table_property('tg16', 'shard_count', '16'); | |
• 相比离线后果表,此后果表减少了工夫戳字段,用于实现以 Flink 窗口周期为单位的统计。后果表 DDL 如下:
BEGIN; | |
create table dws_app( | |
country text, | |
prov text, | |
city text, | |
ymd text NOT NULL, -- 日期字段 | |
timetz TIMESTAMPTZ, -- 统计工夫戳,能够实现以 Flink 窗口周期为单位的统计 | |
uid32_bitmap roaringbitmap, -- 应用 roaringbitmap 记录 uv | |
primary key(country, prov, city, ymd, timetz)-- 查问维度和工夫作为主键,避免反复插入数据 | |
); | |
CALL set_table_property('public.dws_app', 'orientation', 'column'); | |
-- 日期字段设为 clustering_key 和 event_time_column,便于过滤 | |
CALL set_table_property('public.dws_app', 'clustering_key', 'ymd'); | |
CALL set_table_property('public.dws_app', 'event_time_column', 'ymd'); | |
-- 等价于将表放在 shard 数为 16 的 table group | |
call set_table_property('public.dws_app', 'colocate_with', 'tg16'); | |
--group by 字段设为 distribution_key | |
CALL set_table_property('public.dws_app', 'distribution_key', 'country,prov,city'); | |
2.Flink 实时读取数据并更新 dws_app 根底聚合表
残缺示例源码请见 alibabacloud-hologres-connectors examples
1)Flink 流式读取数据源(DataStream),并转化为源表(Table)
// 此处应用 csv 文件作为数据源,也能够是 kafka 等 | |
DataStreamSource odsStream = env.createInput(csvInput, typeInfo); | |
// 与维表 join 须要增加 proctime 字段,详见 https://help.aliyun.com/document_detail/62506.html | |
Table odsTable = | |
tableEnv.fromDataStream( | |
odsStream, | |
$("uid"), | |
$("country"), | |
$("prov"), | |
$("city"), | |
$("ymd"), | |
$("proctime").proctime()); | |
// 注册到 catalog 环境 | |
tableEnv.createTemporaryView("odsTable", odsTable); |
2)将源表与 Hologres 维表(uid_mapping)进行关联
其中维表应用 insertIfNotExists 参数,即查问不到数据时自行插入,uid_int32 字段便能够利用 Hologres 的 serial 类型自增创立。
// 创立 Hologres 维表,其中 nsertIfNotExists 示意查问不到则自行插入 | |
String createUidMappingTable = | |
String.format( | |
"create table uid_mapping_dim(" | |
+ "uid string," | |
+ "uid_int32 INT" | |
+ ") with (" | |
+ "'connector'='hologres',"+" 'dbname' = '%s'," //Hologres DB 名 | |
+ "'tablename' = '%s',"//Hologres 表名 | |
+ "'username' = '%s'," // 以后账号 access id | |
+ "'password' = '%s'," // 以后账号 access key | |
+ "'endpoint' = '%s'," //Hologres endpoint | |
+ "'insertifnotexists'='true'" | |
+ ")", | |
database, dimTableName, username, password, endpoint); | |
tableEnv.executeSql(createUidMappingTable); | |
// 源表与维表 join | |
String odsJoinDim = | |
"SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32" | |
+ "FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim" | |
+ "ON ods.uid = dim.uid"; | |
Table joinRes = tableEnv.sqlQuery(odsJoinDim); |
3)将关联后果转化为 DataStream,通过 Flink 工夫窗口解决,联合 RoaringBitmap 进行聚合
DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource = | |
source | |
// 筛选须要统计的维度(country, prov, city, ymd).keyBy(0, 1, 2, 3) | |
// 滚动工夫窗口;此处因为应用读取 csv 模仿输出流,采纳 ProcessingTime,理论应用中可应用 EventTime | |
.window(TumblingProcessingTimeWindows.of(Time.minutes(5))) | |
// 触发器,能够在窗口未完结时获取聚合后果 | |
.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1))) | |
.aggregate( | |
// 聚合函数,依据 key By 筛选的维度,进行聚合 | |
new AggregateFunction< | |
Tuple5<String, String, String, String, Integer>, | |
RoaringBitmap, | |
RoaringBitmap>() { | |
@Override | |
public RoaringBitmap createAccumulator() {return new RoaringBitmap(); | |
} | |
@Override | |
public RoaringBitmap add( | |
Tuple5<String, String, String, String, Integer> in, | |
RoaringBitmap acc) { | |
// 将 32 位的 uid 增加到 RoaringBitmap 进行去重 | |
acc.add(in.f4); | |
return acc; | |
} | |
@Override | |
public RoaringBitmap getResult(RoaringBitmap acc) {return acc;} | |
@Override | |
public RoaringBitmap merge(RoaringBitmap acc1, RoaringBitmap acc2) {return RoaringBitmap.or(acc1, acc2); | |
} | |
}, | |
// 窗口函数,输入聚合后果 | |
new WindowFunction< | |
RoaringBitmap, | |
Tuple6<String, String, String, String, Timestamp, byte[]>, | |
Tuple, | |
TimeWindow>() { | |
@Override | |
public void apply( | |
Tuple keys, | |
TimeWindow timeWindow, | |
Iterable<RoaringBitmap> iterable, | |
Collector< | |
Tuple6<String, String, String, String, Timestamp, byte[]>> out) | |
throws Exception {RoaringBitmap result = iterable.iterator().next(); | |
// 优化 RoaringBitmap | |
result.runOptimize(); | |
// 将 RoaringBitmap 转化为字节数组以存入 Holo 中 | |
byte[] byteArray = new byte[result.serializedSizeInBytes()]; | |
result.serialize(ByteBuffer.wrap(byteArray)); | |
// 其中 Tuple6.f4(Timestamp) 字段示意以窗口长度为周期进行统计,以秒为单位 | |
out.collect( | |
new Tuple6<>(keys.getField(0), | |
keys.getField(1), | |
keys.getField(2), | |
keys.getField(3), | |
new Timestamp(timeWindow.getEnd() / 1000 * 1000), | |
byteArray)); | |
} | |
}); |
须要留神的是,Hologres 中 RoaringBitmap 类型在 Flink 中对应 Byte 数组类型 | |
// 计算结果转换为表 | |
Table resTable = | |
tableEnv.fromDataStream( | |
processedSource, | |
$("country"), | |
$("prov"), | |
$("city"), | |
$("ymd"), | |
$("timest"), | |
$("uid32_bitmap")); | |
// 创立 Hologres 后果表, 其中 Hologres 的 RoaringBitmap 类型通过 Byte 数组存入 | |
String createHologresTable = | |
String.format( | |
"create table sink(" | |
+ "country string," | |
+ "prov string," | |
+ "city string," | |
+ "ymd string," | |
+ "timetz timestamp," | |
+ "uid32_bitmap BYTES" | |
+ ") with (" | |
+ "'connector'='hologres',"+" 'dbname' = '%s',"+" 'tablename' = '%s',"+" 'username' = '%s',"+" 'password' = '%s',"+" 'endpoint' = '%s',"+" 'connectionSize' = '%s',"+" 'mutatetype' = 'insertOrReplace'" | |
+ ")", | |
database, dwsTableName, username, password, endpoint, connectionSize); | |
tableEnv.executeSql(createHologresTable); | |
// 写入计算结果到 dws 表 | |
tableEnv.executeSql("insert into sink select * from" + resTable); |
3. 数据查问
查问时,从根底聚合表 (dws_app) 中依照查问维度做聚合计算,查问 bitmap 基数,得出 group by 条件下的用户数
• 查问某天内各个城市的 uv
-- 运行上面 RB_AGG 运算查问,可执行参数先敞开三阶段聚合开关(默认敞开), 性能更好 | |
set hg_experimental_enable_force_three_stage_agg=off | |
SELECT country | |
,prov | |
,city | |
,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv | |
FROM dws_app | |
WHERE ymd = '20210329' | |
GROUP BY country | |
,prov | |
,city | |
; |
• 查问某段时间内各个省份的 uv
-- 运行上面 RB_AGG 运算查问,可执行参数先敞开三阶段聚合开关(默认敞开), 性能更好 | |
set hg_experimental_enable_force_three_stage_agg=off | |
SELECT country | |
,prov | |
,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv | |
FROM dws_app | |
WHERE time > '2021-04-19 18:00:00+08' and time < '2021-04-19 19:00:00+08' | |
GROUP BY country | |
 |