简介:Flink+Hologres亿级用户实时UV准确去重最佳实际UV、PV计算,因为业务需要不同,通常会分为两种场景:
离线计算场景:以T+1为主,计算历史数据实时计算场景:实时计算日常新增的数据,对用户标签去重针对离线计算场景,Hologres基于RoaringBitmap,提供超高基数的UV计算,只需进行一次最细粒度的预聚合计算,也只生成一份最细粒度的预聚合后果表,就能达到亚秒级查问。具体详情能够参见往期文章>>Hologres如何反对超高基数UV计算(基于RoaringBitmap实现)
对于实时计算场景,能够应用Flink+Hologres形式,并基于RoaringBitmap,实时对用户标签去重。这样的形式,能够较细粒度的实时失去用户UV、PV数据,同时便于依据需要调整最小统计窗口(如最近5分钟的UV),实现相似实时监控的成果,更好的在大屏等BI展现。相较于以天、周、月等为单位的去重,更适宜在流动日期进行更细粒度的统计,并且通过简略的聚合,也能够失去较大工夫单位的统计后果。
主体思维Flink将流式数据转化为表与维表进行JOIN操作,再转化为流式数据。此举能够利用Hologres维表的_insertIfNotExists_个性联合自增字段实现高效的uid映射。Flink把关联的后果数据依照工夫窗口进行解决,依据查问维度应用RoaringBitmap进行聚合,并将查问维度以及聚合的uid寄存在聚合后果表,其中聚合出的uid后果放入Hologres的RoaringBitmap类型的字段中。查问时,与离线形式类似,间接依照查问条件查问聚合后果表,并对其中要害的RoaringBitmap字段做or运算后并统计基数,即可得出对应用户数。解决流程如下图所示
计划最佳实际1.创立相干根底表1)创立表uid\_mapping为uid映射表,用于映射uid到32位int类型。
RoaringBitmap类型要求用户ID必须是32位int类型且越浓密越好(即用户ID最好间断)。常见的业务零碎或者埋点中的用户ID很多是字符串类型或Long类型,因而须要应用uid\_mapping类型构建一张映射表。映射表利用Hologres的SERIAL类型(自增的32位int)来实现用户映射的主动治理和稳固映射。因为是实时数据, 设置该表为行存表,以进步Flink维表实时JOIN的QPS。BEGIN;CREATE TABLE public.uid_mapping (uid text NOT NULL,uid_int32 serial,PRIMARY KEY (uid));--将uid设为clustering_key和distribution_key便于疾速查找其对应的int32值CALL set_table_property('public.uid_mapping', 'clustering_key', 'uid');CALL set_table_property('public.uid_mapping', 'distribution_key', 'uid');CALL set_table_property('public.uid_mapping', 'orientation', 'row');COMMIT;2)创立表dws\_app为根底聚合表,用于寄存在根底维度上聚合后的后果。
应用RoaringBitmap前须要创立RoaringBitmap extention,同时也须要Hologres实例为0.10版本CREATE EXTENSION IF NOT EXISTS roaringbitmap;为了更好性能,倡议依据根底聚合表数据量正当的设置Shard数,但倡议根底聚合表的Shard数设置不超过计算资源的Core数。举荐应用以下形式通过Table Group来设置Shard数--新建shard数为16的Table Group,--因为测试数据量百万级,其中后端计算资源为100core,设置shard数为16BEGIN;CREATE TABLE tg16 (a int); --Table Group哨兵表call set_table_property('tg16', 'shard_count', '16'); COMMIT;相比离线后果表,此后果表减少了工夫戳字段,用于实现以Flink窗口周期为单位的统计。后果表DDL如下:BEGIN;create table dws_app( country text, prov text, city text, ymd text NOT NULL, --日期字段 timetz TIMESTAMPTZ, --统计工夫戳,能够实现以Flink窗口周期为单位的统计 uid32_bitmap roaringbitmap, -- 应用roaringbitmap记录uv primary key(country, prov, city, ymd, timetz)--查问维度和工夫作为主键,避免反复插入数据);CALL set_table_property('public.dws_app', 'orientation', 'column');--日期字段设为clustering_key和event_time_column,便于过滤CALL set_table_property('public.dws_app', 'clustering_key', 'ymd');CALL set_table_property('public.dws_app', 'event_time_column', 'ymd');--等价于将表放在shard数为16的table groupcall set_table_property('public.dws_app', 'colocate_with', 'tg16');--group by字段设为distribution_keyCALL set_table_property('public.dws_app', 'distribution_key', 'country,prov,city');COMMIT;2.Flink实时读取数据并更新dws\_app根底聚合表残缺示例源码请见alibabacloud-hologres-connectors examples
...