更多技术交换、求职机会,欢送关注字节跳动数据平台微信公众号,回复【1】进入官网交换群
背景
火山引擎增长剖析DataFinder基于ClickHouse来进行行为日志的剖析,ClickHouse的次要版本是基于社区版改良开发的字节外部版本。次要的表构造:
事件表:存储用户行为数据,以用户ID分shard存储。
--列出了次要的字段信息CREATE TABLE tob_apps_all( `tea_app_id` UInt32, --利用ID `device_id` String DEFAULT '', --设施ID `time` UInt64,--事件日志承受工夫 `event` String,--事件名称 `user_unique_id` String,--用户ID `event_date` Date,--事件日志日期,由time转换而来 `hash_uid` UInt64 --用户ID hash过后的id,用来join升高内存耗费)│
用户表:存储用户的属性数据,以用户ID分shard存储。
--列出了次要的字段信息CREATE TABLE users_unique_all( `tea_app_id` UInt32, --利用ID `user_unique_id` String DEFAULT '', -- 用户ID `device_id` String DEFAULT '', -- 用户最近的设施ID `hash_uid` UInt64,--用户ID hash过后的id,用来join升高内存耗费 `update_time` UInt64,--最近一次更新工夫 `last_active_date` Date --用户最初沉闷日期)
设施表:存储设备相干的数据,以设施ID分shard存储。
--列出了次要的字段信息CREATE TABLE devices_all( `tea_app_id` UInt32, --利用ID `device_id` String DEFAULT '', --设施ID `update_time` UInt64, --最近一次更新工夫 `last_active_date` Date --用户最初沉闷日期)
业务对象表:存储业务对象相干的数据,每个shard存储全量的数据
--列出了次要的字段信息CREATE TABLE rangers.items_all( `tea_app_id` UInt32, `hash_item_id` Int64, `item_name` String, --业务对象名称。比方商品 `item_id` String, --业务对象ID。比方商品id 1000001 `last_active_date` Date)
业务挑战
随着接入利用以及利用的DAU日益减少,ClickHouse表的事件量增长迅速;并且基于行为数据须要剖析的业务指标越来越简单,须要JOIN的表增多;咱们遇到有一些波及到JOIN的简单SQL执行效率低,内存和CPU资源占用高,导致剖析接口响应时延和错误率减少。
对于Clickhouse的JOIN
在介绍优化之前,先介绍一下根本的ClickHouse JOIN的类型和实现形式
1. 分布式JOIN
SELECT et.os_name, ut.device_id AS user_device_idFROM tob_apps_all AS et ANY LEFT JOIN ( SELECT device_id, hash_uid FROM users_unique_all WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')) AS ut ON et.hash_uid = ut.hash_uidWHERE (tea_app_id = 268411) AND (event = 'app_launch') AND (event_date = '2022-08-06')
根本执行过程:
- 一个Clickhouse节点作为Coordinator节点,给每个节点散发子查问,子查问sql(tob_apps_all替换老本地表,users_unique_all放弃不变仍然是分布式表)
每个节点执行Coordinator散发的sql时,发现users_unique_all是分布式表,就会去所有节点下来查问以下SQL(一共有N*N。N为shard数量)
- SELECT device_id, hash_uid FROM users_unique WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
- 每个节点从其余N-1个节点拉取2中子查问的全副数据,全量存储(内存or文件),进行本地JOIN
- Coordinator节点从每个节点拉取3中的后果集,而后做解决返回给client
存在的问题: - 子查问数量放大
- 每个节点都全量存储全量的数据
2. 分布式Global JOIN
SELECT et.os_name, ut.device_id AS user_device_idFROM tob_apps_all AS et GLOBAL ANY LEFT JOIN ( SELECT device_id, hash_uid FROM users_unique_all WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')) AS ut ON et.hash_uid = ut.hash_uidWHERE (tea_app_id = 268411) AND (event = 'app_launch') AND (event_date = '2022-08-06')
根本执行过程:
- 一个Clickhouse节点作为Coordinator节点,散发查问。在每个节点上执行sql(tob_apps_all替换老本地表,右表子查问替换成别名ut)
- Coordinator节点去其余节点拉取users_unique_all的全副数据,而后散发到全副节点(作为1中别名表ut的数据)
- 每个节点都会存储全量的2中散发的数据(内存or文件),进行本地local join
- Coordinator节点从每个节点拉取3中的后果集,而后做解决返回给client
存在的问题: - 每个节点都全量存储数据
- 如果右表较大,散发的数据较大,会占用网络带宽资源
3. 本地JOIN
SQL外面只有本地表的JOIN,只会在以后节点执行
SELECT et.os_name,ut.device_id AS user_device_idFROM tob_apps et any LEFT JOIN (SELECT device_id, hash_uid FROM rangers.users_unique WHERE tea_app_id = 268411 AND last_active_date>='2022-08-06') ut ON et.hash_uid=ut.hash_uidWHERE tea_app_id = 268411 AND event='app_launch' AND event_date='2022-08-06'
3.1 Hash join
- 右表全副数据加载到内存,再在内存构建hash table。key为joinkey
- 从左表分批读取数据,从右表hash table匹配数据
- 长处是:速度快 毛病是:右表数据量大的状况下占用内存
3.2 Merge join - 对右表排序,外部 block 切分,超出内存局部 flush 到磁盘上,内存大小通过参数设定
- 左表基于 block 排序,依照每个 block 顺次与右表 merge
- 长处是:能无效管制内存 毛病是:大数据状况下速度会慢
优先应用hash join当内存达到肯定阈值后再应用merge join,优先满足性能要求
解决方案
1. 防止JOIN
1.1 数据预生成
数据预生成(由Spark/Flink或者Clickhouse物化视图产出数据),造成大宽表,基于单表的查问是ClickHouse最为善于的场景
咱们有个指标,实现的SQL比较复杂(如下),每次实时查问很耗时,咱们独自建了一个表table,由Spark每日构建出这个指标,查问时间接基于table查问
SELECT event_date,count(distinct uc1) AS uv,sum(value) AS sum_value, ......FROM (SELECT event_date,hash_uid AS uc1,sum(et.float_params{'amount'}) AS value, count(1) AS cnt, value*cnt AS multiple FROM tob_apps_all et GLOBAL ANY LEFT JOIN (SELECT hash_uid AS join_key,int_profiles{'$ab_time_34'}*1000 AS first_time FROM users_unique_all WHERE app_id = 10000000 AND last_active_date >= '2022-07-19' AND first_time is NOT null) upt ON et.hash_uid=upt.join_key WHERE (查问条件) GROUP BY uc1,event_date)GROUP BY event_date;
数据量2300W,查问工夫由7秒->0.008秒。当然这种形式,须要保护额定的数据构建工作。总的思路就是不要让ClickHouse实时去JOIN
1.2 应用IN代替JOIN
JOIN须要基于内存构建hash table且须要存储右表全副的数据,而后再去匹配左表的数据。而IN查问会对右表的全副数据构建hash set,然而不须要匹配左表的数据,且不须要回写数据到block
比方
SELECT event_date, count()FROM tob_apps_all et global any INNER JOIN (SELECT hash_uid AS join_key FROM users_unique_all WHERE app_id = 10000000 AND last_active_date >= '2022-01-01') upt ON et.hash_uid = upt.join_keyWHERE app_id = 10000000 AND event_date >= '2022-01-01' AND event_date <= '2022-08-02'GROUP BY event_date
能够改成如下模式:
SELECT event_date, count()FROM tob_apps_allWHERE app_id = 10000000 AND event_date >= '2022-01-01' AND event_date <= '2022-08-02' AND hash_uid global IN (SELECT hash_uid FROM users_unique_all WHERE (tea_app_id = 10000000) AND (last_active_date >= '2022-01-01') ) GROUP BY event_date
如果须要从右表提取出属性到外层进行计算,则不能应用IN来代替JOIN雷同的条件下,下面的测试SQL,由JOIN时的16秒优化到了IN查问时的11秒
2. 更快的JOIN
2.1 优先本地JOIN
2.1.1 数据事后雷同规定分区
也就是Colocate JOIN。优先将须要关联的表依照雷同的规定进行散布,查问时就不须要分布式的JOIN
SELECT et.os_name, ut.device_id AS user_device_idFROM tob_apps_all AS et ANY LEFT JOIN ( SELECT device_id, hash_uid FROM users_unique_all WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')) AS ut ON et.hash_uid = ut.hash_uidWHERE (tea_app_id = 268411) AND (event = 'app_launch') AND (event_date = '2022-08-06')settings distributed_perfect_shard=1
比方事件表tob_apps_all和用户表users_unique_all都是依照用户ID来分shard存储的,雷同的用户的两个表的数据都在同一个shard上,因而这两个表的JOIN就不须要分布式JOIN了
distributed_perfect_shard这个settings key是字节外部ClickHouse反对的,设置过这个参数,指定执行打算时就不会再执行分布式JOIN了
根本执行过程:
- 一个ClickHouse节点作为Coordinator节点,散发查问。在每个节点上执行sql(tob_apps_all、users_unique_all替换老本地表)
- 每个节点都执行1中散发的本地表join的SQL(这一步不再散发右表全量的数据)
- 数据再回传到coordinator节点,而后返回给client
2.1.2 数据冗余存储
如果一个表的数据量比拟小,能够不分shard存储,每个shard都存储全量的数据,例如咱们的业务对象表。查问时,不须要分布式JOIN,间接在本地进行JOIN即可
SELECT count()FROM tob_apps_all AS et ANY LEFT JOIN ( SELECT item_id FROM items_all WHERE (tea_app_id = 268411)) AS it ON et.item_id = it.item_idWHERE (tea_app_id = 268411) AND (event = 'app_launch') AND (event_date = '2022-08-06')settings distributed_perfect_shard=1
例如这个SQL,items_all表每个shard都存储同样的数据,这样也能够防止分布式JOIN带来的查问放大和全表数据散发问题
2.2 更少的数据
不论是分布式JOIN还是本地JOIN,都须要尽量让少的数据参加JOIN,既能晋升查问速度也能缩小资源耗费
2.2.1 SQL下推
ClickHouse对SQL的下推做的不太好,有些简单的SQL下推会生效。因而,咱们手动对SQL做了下推,目前正在测试基于查问优化器来帮忙实现下推优化,以便让SQL更加简洁
下推的SQL:
SELECT et.os_name, ut.device_id AS user_device_idFROM tob_apps_all AS et ANY LEFT JOIN ( SELECT device_id, hash_uid FROM users_unique_all WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06' AND 用户属性条件1 OR 用户属性条件2)) AS ut ON et.hash_uid = ut.hash_uidWHERE (tea_app_id = 268411) AND (event = 'app_launch') AND (event_date = '2022-08-06')settings distributed_perfect_shard=1
对应的不下推的SQL:
SELECT et.os_name, ut.device_id AS user_device_idFROM tob_apps_all AS et ANY LEFT JOIN ( SELECT device_id, hash_uid FROM rangers.users_unique_all WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')) AS ut ON et.hash_uid = ut.hash_uidWHERE (tea_app_id = 268411) AND (event = 'app_launch') AND (event_date = '2022-08-06')AND (ut.用户属性条件1 OR ut.用户属性条件2)settings distributed_perfect_shard=1
能够看到,不下推的SQL更加简洁,间接基于JOIN过后的宽表进行过滤。然而ClickHouse可能会将不满足条件的users_unique_all数据也进行JOIN咱们应用中有一个简单的case,用户表过滤条件不下推有1千万+,SQL执行了3000秒仍然执行超时,而做了下推之后60秒内就执行胜利了
2.3Clickhouse引擎层优化
一个SQL理论在Clickhouse如何执行,对SQL的执行工夫和资源耗费至关重要。社区版的Clickhouse在执行模型和SQL优化器上还要改良的空间,尤其是简单SQL以及多JOIN的场景下
执行模型优化社区版的Clickhouse
目前还是一个两阶段执行的执行模型。第一阶段,Coordinator在收到查问后,将申请发送给对应的Worker节点。第二阶段,Worker节点实现计算,Coordinator在收到各Worker节点的数据后进行汇聚和解决,并将解决后的后果返回。
有以下几个问题:
- 第二阶段的计算比较复杂时,Coordinator的节点计算压力大,容易成为瓶颈
- 不反对shuffle join,hash join时右表为大表时构建慢,容易OOM
- 对简单查问的反对不敌对
字节跳动ClickHouse团队为了解决上述问题,改良了执行模型,参考其余的分布式数据库引擎(例如Presto等),将一个简单的Query按数据交换状况切分成多个 Stage,各Stage之间则通过Exchange实现数据交换。依据Stage依赖关系定义拓扑构造,产生DAG图,并依据DAG图调度Stage。例如两表Join,会先调度左右表读取Stage,之后再调度Join这个Stage,Join的Stage依赖于左右表的Stage。
举个例子
SELECT et.os_name, ut.device_id AS user_device_id, dt.hash_did AS device_hashidFROM tob_apps_all AS et GLOBAL ANY LEFT JOIN ( SELECT device_id, hash_uid FROM users_unique_all WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')) AS ut ON et.hash_uid = ut.hash_uidGLOBAL ANY LEFT JOIN ( SELECT device_id, hash_did FROM devices_all WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')) AS dt ON et.device_id = dt.device_idWHERE (tea_app_id = 268411) AND (event = 'app_launch') AND (event_date = '2022-08-06')LIMIT 10
Stage执行模型根本过程(可能的):
- 读取tob_apps_all数据,依照join key(hash_uid)进行shuffle,数据散发到每个节点。这是一个Stage
- 读取users_unique_all数据,依照join key(hash_uid)进行shuffle,数据散发到每个节点。这是一个Stage
- 上述两个表的数据,在每个节点上的数据进行本地join,而后再依照join key(device_id)进行shuffle。这是一个Stage
- 读取devices_all数据,依照join key(device_id)进行shuffle,这是一个Stage
- 第3步、第4步的数据,雷同join key(device_id)的数据都在同一个节点上,而后进行本地JOIN,这是一个Stage
- 汇总数据,返回limit 10的数据。这是一个Stage
统计成果如下:
查问优化器
有了下面的stage的执行模型,能够灵便调整SQL的执行程序,字节跳动Clickhouse团队自研了查问优化器,依据优化规定(基于规定和代价预估)对SQL的执行打算进行转换,一个执行打算通过优化规定后会变成另外一个执行打算,可能精确的抉择出一条效率最高的执行门路,而后构建Stage的DAG图,大幅度降低查问工夫
下图形容了整个查问的执行流程,从 SQL parse 到执行期间所有内容全副进行了从新实现(其中紫色模块),构建了一套残缺的且标准的查问优化器。
还是下面的三表JOIN的例子,可能的一个执行过程是: - 查问优化器发现users_unique_all表与tob_apps_all表的分shard规定一样(基于用户ID),所以就不会先对表按 join key 进行 shuffle,users_unique与tob_apps间接基于本地表JOIN,而后再依照join key(device_id)进行shuffle。这是一个Stage
- 查问优化器依据规定或者代价预估决定设施表devices_all是须要broadcast join还是shuffle join 如果broadcast join:在一个节点查到全副的device数据,而后散发到其余节点。这是一个Stage 如果shuffle join:在每个节点对device数据依照join key(device_id)进行shuffle。这是一个Stage
- 汇总数据,返回limit 10的数据。这是一个Stage
成果:能够看到,查问优化器能优化典型的简单的SQL的执行效率,缩短执行工夫
总结
ClickHouse最为善于的畛域是一个大宽表来进行查问,多表JOIN时Clickhouse性能体现不佳。作为业内当先的用户剖析与经营平台,火山引擎增长剖析DataFinder基于海量数据做到了简单指标可能秒级查问。本文介绍了咱们是如何优化Clickhouse JOIN查问的。
次要有以下几个方面:
- 缩小参加JOIN的表以及数据量
- 优先应用本地JOIN,防止分布式JOIN带来的性能损耗
- 优化本地JOIN,优先应用内存进行JOIN
- 优化分布式JOIN的执行逻辑,依靠于字节跳动对ClickHouse的深度定制化
立刻跳转火山引擎DataFinder官网理解详情