关于clickhouse:火山引擎在行为分析场景下的ClickHouse-JOIN优化

44次阅读

共计 9015 个字符,预计需要花费 23 分钟才能阅读完成。

更多技术交换、求职机会,欢送关注字节跳动数据平台微信公众号,回复【1】进入官网交换群

背景

火山引擎增长剖析 DataFinder 基于 ClickHouse 来进行行为日志的剖析,ClickHouse 的次要版本是基于社区版改良开发的字节外部版本。次要的表构造:

事件表:存储用户行为数据,以用户 ID 分 shard 存储。

-- 列出了次要的字段信息
CREATE TABLE tob_apps_all
(
    `tea_app_id`                UInt32,  -- 利用 ID
    `device_id`             String DEFAULT '', -- 设施 ID
    `time`                  UInt64,-- 事件日志承受工夫
    `event`                 String,-- 事件名称
    `user_unique_id`        String,-- 用户 ID
    `event_date`            Date,-- 事件日志日期,由 time 转换而来
    `hash_uid`              UInt64 -- 用户 ID hash 过后的 id,用来 join 升高内存耗费
)│

用户表:存储用户的属性数据,以用户 ID 分 shard 存储。

-- 列出了次要的字段信息
CREATE TABLE users_unique_all
(
    `tea_app_id`            UInt32,            -- 利用 ID
    `user_unique_id`        String DEFAULT '', -- 用户 ID
    `device_id`             String DEFAULT '', -- 用户最近的设施 ID
    `hash_uid`              UInt64,-- 用户 ID hash 过后的 id,用来 join 升高内存耗费
    `update_time`           UInt64,-- 最近一次更新工夫
    `last_active_date`      Date   -- 用户最初沉闷日期
)

设施表:存储设备相干的数据,以设施 ID 分 shard 存储。

-- 列出了次要的字段信息
CREATE TABLE devices_all
(
    `tea_app_id`            UInt32,            -- 利用 ID
    `device_id`             String DEFAULT '', -- 设施 ID    
    `update_time`           UInt64,            -- 最近一次更新工夫
    `last_active_date`      Date               -- 用户最初沉闷日期
)

业务对象表:存储业务对象相干的数据,每个 shard 存储全量的数据

-- 列出了次要的字段信息
CREATE TABLE rangers.items_all
(
    `tea_app_id`            UInt32,
    `hash_item_id`          Int64,
    `item_name`             String, -- 业务对象名称。比方商品
    `item_id`               String, -- 业务对象 ID。比方商品 id 1000001
    `last_active_date`      Date
) 

业务挑战

随着接入利用以及利用的 DAU 日益减少,ClickHouse 表的事件量增长迅速;并且基于行为数据须要剖析的业务指标越来越简单,须要 JOIN 的表增多;咱们遇到有一些波及到 JOIN 的简单 SQL 执行效率低,内存和 CPU 资源占用高,导致剖析接口响应时延和错误率减少。

对于 Clickhouse 的 JOIN

在介绍优化之前,先介绍一下根本的 ClickHouse JOIN 的类型和实现形式

1. 分布式 JOIN

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')

根本执行过程:

  1. 一个 Clickhouse 节点作为 Coordinator 节点,给每个节点散发子查问,子查问 sql(tob_apps_all 替换老本地表,users_unique_all 放弃不变仍然是分布式表)
  2. 每个节点执行 Coordinator 散发的 sql 时,发现 users_unique_all 是分布式表,就会去所有节点下来查问以下 SQL(一共有 N *N。N 为 shard 数量)

    • SELECT device_id, hash_uid FROM users_unique WHERE (tea_app_id = 268411) AND (last_active_date >= ‘2022-08-06’)
  3. 每个节点从其余 N - 1 个节点拉取 2 中子查问的全副数据,全量存储(内存 or 文件),进行本地 JOIN
  4. Coordinator 节点从每个节点拉取 3 中的后果集,而后做解决返回给 client
    存在的问题:
  5. 子查问数量放大
  6. 每个节点都全量存储全量的数据

2. 分布式 Global JOIN

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
GLOBAL ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')

根本执行过程:

  1. 一个 Clickhouse 节点作为 Coordinator 节点,散发查问。在每个节点上执行 sql(tob_apps_all 替换老本地表,右表子查问替换成别名 ut)
  2. Coordinator 节点去其余节点拉取 users_unique_all 的全副数据,而后散发到全副节点(作为 1 中别名表 ut 的数据)
  3. 每个节点都会存储全量的 2 中散发的数据(内存 or 文件),进行本地 local join
  4. Coordinator 节点从每个节点拉取 3 中的后果集,而后做解决返回给 client
    存在的问题:
  5. 每个节点都全量存储数据
  6. 如果右表较大,散发的数据较大,会占用网络带宽资源

3. 本地 JOIN

SQL 外面只有本地表的 JOIN,只会在以后节点执行

SELECT et.os_name,ut.device_id AS user_device_id
FROM tob_apps et any LEFT JOIN
    (SELECT device_id,
         hash_uid
    FROM rangers.users_unique
    WHERE tea_app_id = 268411
            AND last_active_date>='2022-08-06') ut
    ON et.hash_uid=ut.hash_uid
WHERE tea_app_id = 268411
        AND event='app_launch'
        AND event_date='2022-08-06' 

3.1 Hash join

  • 右表全副数据加载到内存,再在内存构建 hash table。key 为 joinkey
  • 从左表分批读取数据,从右表 hash table 匹配数据
  • 长处是:速度快 毛病是:右表数据量大的状况下占用内存
    3.2 Merge join
  • 对右表排序,外部 block 切分,超出内存局部 flush 到磁盘上,内存大小通过参数设定
  • 左表基于 block 排序,依照每个 block 顺次与右表 merge
  • 长处是:能无效管制内存 毛病是:大数据状况下速度会慢
    优先应用 hash join 当内存达到肯定阈值后再应用 merge join,优先满足性能要求

解决方案

1. 防止 JOIN

1.1 数据预生成

数据预生成 (由 Spark/Flink 或者 Clickhouse 物化视图产出数据),造成大宽表,基于单表的查问是 ClickHouse 最为善于的场景
咱们有个指标,实现的 SQL 比较复杂(如下),每次实时查问很耗时,咱们独自建了一个表 table,由 Spark 每日构建出这个指标,查问时间接基于 table 查问

SELECT event_date,count(distinct uc1) AS uv,sum(value) AS sum_value, ......
FROM
    (SELECT event_date,hash_uid AS uc1,sum(et.float_params{'amount'}) AS value, count(1) AS cnt, value*cnt AS multiple
    FROM tob_apps_all et GLOBAL ANY LEFT JOIN
        (SELECT hash_uid AS join_key,int_profiles{'$ab_time_34'}*1000 AS first_time
        FROM users_unique_all
        WHERE app_id = 10000000 AND last_active_date >= '2022-07-19' AND first_time is NOT null) upt
            ON et.hash_uid=upt.join_key
        WHERE (查问条件)
        GROUP BY  uc1,event_date)
GROUP BY event_date;

数据量 2300W,查问工夫由 7 秒 ->0.008 秒。当然这种形式,须要保护额定的数据构建工作。总的思路就是不要让 ClickHouse 实时去 JOIN

1.2 应用 IN 代替 JOIN

JOIN 须要基于内存构建 hash table 且须要存储右表全副的数据,而后再去匹配左表的数据。而 IN 查问会对右表的全副数据构建 hash set,然而不须要匹配左表的数据,且不须要回写数据到 block
比方

SELECT event_date, count()
FROM tob_apps_all et global any INNER JOIN
    (SELECT hash_uid AS join_key
    FROM users_unique_all
    WHERE app_id = 10000000
            AND last_active_date >= '2022-01-01') upt
    ON et.hash_uid = upt.join_key
WHERE app_id = 10000000
        AND event_date >= '2022-01-01'
        AND event_date <= '2022-08-02'
GROUP BY  event_date 

能够改成如下模式:

SELECT event_date,
         count()
FROM tob_apps_all
WHERE app_id = 10000000
        AND event_date >= '2022-01-01'
        AND event_date <= '2022-08-02'
        AND hash_uid global IN 
    (SELECT hash_uid
    FROM users_unique_all
    WHERE (tea_app_id = 10000000)
            AND (last_active_date >= '2022-01-01') )
 GROUP BY event_date

如果须要从右表提取出属性到外层进行计算,则不能应用 IN 来代替 JOIN 雷同的条件下,下面的测试 SQL,由 JOIN 时的 16 秒优化到了 IN 查问时的 11 秒

2. 更快的 JOIN

2.1 优先本地 JOIN

2.1.1 数据事后雷同规定分区
也就是 Colocate JOIN。优先将须要关联的表依照雷同的规定进行散布,查问时就不须要分布式的 JOIN

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1

比方事件表 tob_apps_all 和用户表 users_unique_all 都是依照用户 ID 来分 shard 存储的,雷同的用户的两个表的数据都在同一个 shard 上,因而这两个表的 JOIN 就不须要分布式 JOIN 了
distributed_perfect_shard 这个 settings key 是字节外部 ClickHouse 反对的,设置过这个参数,指定执行打算时就不会再执行分布式 JOIN 了
根本执行过程:

  1. 一个 ClickHouse 节点作为 Coordinator 节点,散发查问。在每个节点上执行 sql(tob_apps_all、users_unique_all 替换老本地表)
  2. 每个节点都执行 1 中散发的本地表 join 的 SQL(这一步不再散发右表全量的数据)
  3. 数据再回传到 coordinator 节点,而后返回给 client

2.1.2 数据冗余存储
如果一个表的数据量比拟小,能够不分 shard 存储,每个 shard 都存储全量的数据,例如咱们的业务对象表。查问时,不须要分布式 JOIN,间接在本地进行 JOIN 即可

SELECT count()
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT item_id
    FROM items_all 
    WHERE (tea_app_id = 268411)
) AS it ON et.item_id = it.item_id
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1

例如这个 SQL,items_all 表每个 shard 都存储同样的数据,这样也能够防止分布式 JOIN 带来的查问放大和全表数据散发问题

2.2 更少的数据

不论是分布式 JOIN 还是本地 JOIN,都须要尽量让少的数据参加 JOIN,既能晋升查问速度也能缩小资源耗费

2.2.1 SQL 下推

ClickHouse 对 SQL 的下推做的不太好,有些简单的 SQL 下推会生效。因而,咱们手动对 SQL 做了下推,目前正在测试基于查问优化器来帮忙实现下推优化,以便让 SQL 更加简洁
下推的 SQL:

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) 
        AND (last_active_date >= '2022-08-06'
        AND 用户属性条件 1  OR 用户属性条件 2)
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1

对应的不下推的 SQL:

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM rangers.users_unique_all 
    WHERE (tea_app_id = 268411) 
        AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')
AND (ut. 用户属性条件 1  OR ut. 用户属性条件 2)
settings distributed_perfect_shard=1

能够看到,不下推的 SQL 更加简洁,间接基于 JOIN 过后的宽表进行过滤。然而 ClickHouse 可能会将不满足条件的 users_unique_all 数据也进行 JOIN 咱们应用中有一个简单的 case,用户表过滤条件不下推有 1 千万 +,SQL 执行了 3000 秒仍然执行超时,而做了下推之后 60 秒内就执行胜利了

2.3Clickhouse 引擎层优化

一个 SQL 理论在 Clickhouse 如何执行,对 SQL 的执行工夫和资源耗费至关重要。社区版的 Clickhouse 在执行模型和 SQL 优化器上还要改良的空间,尤其是简单 SQL 以及多 JOIN 的场景下

执行模型优化社区版的 Clickhouse

目前还是一个两阶段执行的执行模型。第一阶段,Coordinator 在收到查问后,将申请发送给对应的 Worker 节点。第二阶段,Worker 节点实现计算,Coordinator 在收到各 Worker 节点的数据后进行汇聚和解决,并将解决后的后果返回。

有以下几个问题:

  1. 第二阶段的计算比较复杂时,Coordinator 的节点计算压力大,容易成为瓶颈
  2. 不反对 shuffle join,hash join 时右表为大表时构建慢,容易 OOM
  3. 对简单查问的反对不敌对

字节跳动 ClickHouse 团队为了解决上述问题,改良了执行模型,参考其余的分布式数据库引擎(例如 Presto 等),将一个简单的 Query 按数据交换状况切分成多个 Stage,各 Stage 之间则通过 Exchange 实现数据交换。依据 Stage 依赖关系定义拓扑构造,产生 DAG 图,并依据 DAG 图调度 Stage。例如两表 Join,会先调度左右表读取 Stage,之后再调度 Join 这个 Stage,Join 的 Stage 依赖于左右表的 Stage。

举个例子

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id, 
    dt.hash_did AS device_hashid
FROM tob_apps_all AS et 
GLOBAL ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
GLOBAL ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_did
    FROM devices_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS dt ON et.device_id = dt.device_id
WHERE (tea_app_id = 268411) AND (event = 'app_launch') AND (event_date = '2022-08-06')
LIMIT 10

Stage 执行模型根本过程(可能的):

  • 读取 tob_apps_all 数据,依照 join key(hash_uid)进行 shuffle,数据散发到每个节点。这是一个 Stage
  • 读取 users_unique_all 数据,依照 join key(hash_uid)进行 shuffle,数据散发到每个节点。这是一个 Stage
  • 上述两个表的数据,在每个节点上的数据进行本地 join,而后再依照 join key(device_id)进行 shuffle。这是一个 Stage
  • 读取 devices_all 数据,依照 join key(device_id)进行 shuffle,这是一个 Stage
  • 第 3 步、第 4 步的数据,雷同 join key(device_id)的数据都在同一个节点上,而后进行本地 JOIN,这是一个 Stage
  • 汇总数据,返回 limit 10 的数据。这是一个 Stage
    统计成果如下:

    查问优化器
    有了下面的 stage 的执行模型,能够灵便调整 SQL 的执行程序,字节跳动 Clickhouse 团队自研了查问优化器,依据优化规定 (基于规定和代价预估) 对 SQL 的执行打算进行转换,一个执行打算通过优化规定后会变成另外一个执行打算,可能精确的抉择出一条效率最高的执行门路,而后构建 Stage 的 DAG 图,大幅度降低查问工夫
    下图形容了整个查问的执行流程,从 SQL parse 到执行期间所有内容全副进行了从新实现(其中紫色模块),构建了一套残缺的且标准的查问优化器。

    还是下面的三表 JOIN 的例子,可能的一个执行过程是:

  • 查问优化器发现 users_unique_all 表与 tob_apps_all 表的分 shard 规定一样(基于用户 ID),所以就不会先对表按 join key 进行 shuffle,users_unique 与 tob_apps 间接基于本地表 JOIN,而后再依照 join key(device_id)进行 shuffle。这是一个 Stage
  • 查问优化器依据规定或者代价预估决定设施表 devices_all 是须要 broadcast join 还是 shuffle join 如果 broadcast join:在一个节点查到全副的 device 数据,而后散发到其余节点。这是一个 Stage 如果 shuffle join:在每个节点对 device 数据依照 join key(device_id)进行 shuffle。这是一个 Stage
  • 汇总数据,返回 limit 10 的数据。这是一个 Stage

成果:能够看到,查问优化器能优化典型的简单的 SQL 的执行效率,缩短执行工夫

总结

ClickHouse 最为善于的畛域是一个大宽表来进行查问,多表 JOIN 时 Clickhouse 性能体现不佳。作为业内当先的用户剖析与经营平台,火山引擎增长剖析 DataFinder 基于海量数据做到了简单指标可能秒级查问。本文介绍了咱们是如何优化 Clickhouse JOIN 查问的。
次要有以下几个方面:

  1. 缩小参加 JOIN 的表以及数据量
  2. 优先应用本地 JOIN,防止分布式 JOIN 带来的性能损耗
  3. 优化本地 JOIN,优先应用内存进行 JOIN
  4. 优化分布式 JOIN 的执行逻辑,依靠于字节跳动对 ClickHouse 的深度定制化

立刻跳转火山引擎 DataFinder 官网理解详情

正文完
 0