简介:Flink+Hologres亿级用户实时UV准确去重最佳实际
UV、PV计算,因为业务需要不同,通常会分为两种场景:
• 离线计算场景:以T+1为主,计算历史数据
• 实时计算场景:实时计算日常新增的数据,对用户标签去重
针对离线计算场景,Hologres基于RoaringBitmap,提供超高基数的UV计算,只需进行一次最细粒度的预聚合计算,也只生成一份最细粒度的预聚合后果表,就能达到亚秒级查问。具体详情能够参见往期文章>>Hologres如何反对超高基数UV计算(基于RoaringBitmap实现)
对于实时计算场景,能够应用Flink+Hologres形式,并基于RoaringBitmap,实时对用户标签去重。这样的形式,能够较细粒度的实时失去用户UV、PV数据,同时便于依据需要调整最小统计窗口(如最近5分钟的UV),实现相似实时监控的成果,更好的在大屏等BI展现。相较于以天、周、月等为单位的去重,更适宜在流动日期进行更细粒度的统计,并且通过简略的聚合,也能够失去较大工夫单位的统计后果。
主体思维
- Flink将流式数据转化为表与维表进行JOIN操作,再转化为流式数据。此举能够利用Hologres维表的insertIfNotExists个性联合自增字段实现高效的uid映射。
- Flink把关联的后果数据依照工夫窗口进行解决,依据查问维度应用RoaringBitmap进行聚合,并将查问维度以及聚合的uid寄存在聚合后果表,其中聚合出的uid后果放入Hologres的RoaringBitmap类型的字段中。
- 查问时,与离线形式类似,间接依照查问条件查问聚合后果表,并对其中要害的RoaringBitmap字段做or运算后并统计基数,即可得出对应用户数。
- 解决流程如下图所示
计划最佳实际
1.创立相干根底表
1)创立表uid_mapping为uid映射表,用于映射uid到32位int类型。
• RoaringBitmap类型要求用户ID必须是32位int类型且越浓密越好(即用户ID最好间断)。常见的业务零碎或者埋点中的用户ID很多是字符串类型或Long类型,因而须要应用uid_mapping类型构建一张映射表。映射表利用Hologres的SERIAL类型(自增的32位int)来实现用户映射的主动治理和稳固映射。
• 因为是实时数据, 设置该表为行存表,以进步Flink维表实时JOIN的QPS。
BEGIN;
CREATE TABLE public.uid_mapping (
uid text NOT NULL,
uid_int32 serial,
PRIMARY KEY (uid)
);
--将uid设为clustering_key和distribution_key便于疾速查找其对应的int32值
CALL set_table_property('public.uid_mapping', 'clustering_key', 'uid');
CALL set_table_property('public.uid_mapping', 'distribution_key', 'uid');
CALL set_table_property('public.uid_mapping', 'orientation', 'row');
COMMIT;
2)创立表dws_app为根底聚合表,用于寄存在根底维度上聚合后的后果。
• 应用RoaringBitmap前须要创立RoaringBitmap extention,同时也须要Hologres实例为0.10版本
CREATE EXTENSION IF NOT EXISTS roaringbitmap;
• 为了更好性能,倡议依据根底聚合表数据量正当的设置Shard数,但倡议根底聚合表的Shard数设置不超过计算资源的Core数。举荐应用以下形式通过Table Group来设置Shard数
--新建shard数为16的Table Group,
--因为测试数据量百万级,其中后端计算资源为100core,设置shard数为16
BEGIN;
CREATE TABLE tg16 (a int); --Table Group哨兵表
call set_table_property('tg16', 'shard_count', '16');
COMMIT;
• 相比离线后果表,此后果表减少了工夫戳字段,用于实现以Flink窗口周期为单位的统计。后果表DDL如下:
BEGIN;
create table dws_app(
country text,
prov text,
city text,
ymd text NOT NULL, --日期字段
timetz TIMESTAMPTZ, --统计工夫戳,能够实现以Flink窗口周期为单位的统计
uid32_bitmap roaringbitmap, -- 应用roaringbitmap记录uv
primary key(country, prov, city, ymd, timetz)--查问维度和工夫作为主键,避免反复插入数据
);
CALL set_table_property('public.dws_app', 'orientation', 'column');
--日期字段设为clustering_key和event_time_column,便于过滤
CALL set_table_property('public.dws_app', 'clustering_key', 'ymd');
CALL set_table_property('public.dws_app', 'event_time_column', 'ymd');
--等价于将表放在shard数为16的table group
call set_table_property('public.dws_app', 'colocate_with', 'tg16');
--group by字段设为distribution_key
CALL set_table_property('public.dws_app', 'distribution_key', 'country,prov,city');
COMMIT;
2.Flink实时读取数据并更新dws_app根底聚合表
残缺示例源码请见alibabacloud-hologres-connectors examples
1)Flink 流式读取数据源(DataStream),并转化为源表(Table)
//此处应用csv文件作为数据源,也能够是kafka等
DataStreamSource odsStream = env.createInput(csvInput, typeInfo);
// 与维表join须要增加proctime字段,详见https://help.aliyun.com/document_detail/62506.html
Table odsTable =
tableEnv.fromDataStream(
odsStream,
$("uid"),
$("country"),
$("prov"),
$("city"),
$("ymd"),
$("proctime").proctime());
// 注册到catalog环境
tableEnv.createTemporaryView("odsTable", odsTable);
2)将源表与Hologres维表(uid_mapping)进行关联
其中维表应用insertIfNotExists参数,即查问不到数据时自行插入,uid_int32字段便能够利用Hologres的serial类型自增创立。
// 创立Hologres维表,其中nsertIfNotExists示意查问不到则自行插入
String createUidMappingTable =
String.format(
"create table uid_mapping_dim("
+ " uid string,"
+ " uid_int32 INT"
+ ") with ("
+ " 'connector'='hologres',"
+ " 'dbname' = '%s'," //Hologres DB名
+ " 'tablename' = '%s',"//Hologres 表名
+ " 'username' = '%s'," //以后账号access id
+ " 'password' = '%s'," //以后账号access key
+ " 'endpoint' = '%s'," //Hologres endpoint
+ " 'insertifnotexists'='true'"
+ ")",
database, dimTableName, username, password, endpoint);
tableEnv.executeSql(createUidMappingTable);
// 源表与维表join
String odsJoinDim =
"SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32"
+ " FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim"
+ " ON ods.uid = dim.uid";
Table joinRes = tableEnv.sqlQuery(odsJoinDim);
3)将关联后果转化为DataStream,通过Flink工夫窗口解决,联合RoaringBitmap进行聚合
DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource =
source
// 筛选须要统计的维度(country, prov, city, ymd)
.keyBy(0, 1, 2, 3)
// 滚动工夫窗口;此处因为应用读取csv模仿输出流,采纳ProcessingTime,理论应用中可应用EventTime
.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
// 触发器,能够在窗口未完结时获取聚合后果
.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))
.aggregate(
// 聚合函数,依据key By筛选的维度,进行聚合
new AggregateFunction<
Tuple5<String, String, String, String, Integer>,
RoaringBitmap,
RoaringBitmap>() {
@Override
public RoaringBitmap createAccumulator() {
return new RoaringBitmap();
}
@Override
public RoaringBitmap add(
Tuple5<String, String, String, String, Integer> in,
RoaringBitmap acc) {
// 将32位的uid增加到RoaringBitmap进行去重
acc.add(in.f4);
return acc;
}
@Override
public RoaringBitmap getResult(RoaringBitmap acc) {
return acc;
}
@Override
public RoaringBitmap merge(
RoaringBitmap acc1, RoaringBitmap acc2) {
return RoaringBitmap.or(acc1, acc2);
}
},
//窗口函数,输入聚合后果
new WindowFunction<
RoaringBitmap,
Tuple6<String, String, String, String, Timestamp, byte[]>,
Tuple,
TimeWindow>() {
@Override
public void apply(
Tuple keys,
TimeWindow timeWindow,
Iterable<RoaringBitmap> iterable,
Collector<
Tuple6<String, String, String, String, Timestamp, byte[]>> out)
throws Exception {
RoaringBitmap result = iterable.iterator().next();
// 优化RoaringBitmap
result.runOptimize();
// 将RoaringBitmap转化为字节数组以存入Holo中
byte[] byteArray = new byte[result.serializedSizeInBytes()];
result.serialize(ByteBuffer.wrap(byteArray));
// 其中 Tuple6.f4(Timestamp) 字段示意以窗口长度为周期进行统计,以秒为单位
out.collect(
new Tuple6<>(
keys.getField(0),
keys.getField(1),
keys.getField(2),
keys.getField(3),
new Timestamp(
timeWindow.getEnd() / 1000 * 1000),
byteArray));
}
});
4)写入后果表
须要留神的是,Hologres中RoaringBitmap类型在Flink中对应Byte数组类型
// 计算结果转换为表
Table resTable =
tableEnv.fromDataStream(
processedSource,
$("country"),
$("prov"),
$("city"),
$("ymd"),
$("timest"),
$("uid32_bitmap"));
// 创立Hologres后果表, 其中Hologres的RoaringBitmap类型通过Byte数组存入
String createHologresTable =
String.format(
"create table sink("
+ " country string,"
+ " prov string,"
+ " city string,"
+ " ymd string,"
+ " timetz timestamp,"
+ " uid32_bitmap BYTES"
+ ") with ("
+ " 'connector'='hologres',"
+ " 'dbname' = '%s',"
+ " 'tablename' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'endpoint' = '%s',"
+ " 'connectionSize' = '%s',"
+ " 'mutatetype' = 'insertOrReplace'"
+ ")",
database, dwsTableName, username, password, endpoint, connectionSize);
tableEnv.executeSql(createHologresTable);
// 写入计算结果到dws表
tableEnv.executeSql("insert into sink select * from " + resTable);
3.数据查问
查问时,从根底聚合表(dws_app)中依照查问维度做聚合计算,查问bitmap基数,得出group by条件下的用户数
• 查问某天内各个城市的uv
--运行上面RB_AGG运算查问,可执行参数先敞开三阶段聚合开关(默认敞开),性能更好
set hg_experimental_enable_force_three_stage_agg=off
SELECT country
,prov
,city
,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM dws_app
WHERE ymd = '20210329'
GROUP BY country
,prov
,city
;
• 查问某段时间内各个省份的uv
--运行上面RB_AGG运算查问,可执行参数先敞开三阶段聚合开关(默认敞开),性能更好
set hg_experimental_enable_force_three_stage_agg=off
SELECT country
,prov
,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM dws_app
WHERE time > '2021-04-19 18:00:00+08' and time < '2021-04-19 19:00:00+08'
GROUP BY country

原文链接
本文为阿里云原创内容,未经容许不得转载。
发表回复