关于clickhouse:ClickHouse集群数据均衡方案分享

106次阅读

共计 5848 个字符,预计需要花费 15 分钟才能阅读完成。

导语

ClickHouse集群数据在写入时,尽管能够通过 Distributed 引擎的 sharding_key 指定策略,从而保障肯定水平的数据平衡,但这并不是最终解决方案。

比方 rand() 平衡策略尽管能够保证数据的绝对平衡,然而可能会毁坏数据的外在业务逻辑。举个简略的例子,咱们想要将 kafka 的数据写入 clickhouse 集群,如果采纳 rand() 的策略,则可能将同一个 partition 的数据拆分到 clickhouse 集群不同的 shard 中,为后续的数据分析等造成了肯定的麻烦。

尽管有相似 clickhouse-sinker 之类的数据导入工具,能够做到数据导入时的平衡,然而一旦集群扩大了节点,依然无奈将存量数据平衡到新减少的节点中去。这样就造成了存量节点的数据依然很多,新增节点的数据绝对较少,并不能起到很好的负载平衡的作用。

数据平衡计划探讨

咱们在探讨数据平衡计划的时候,首先须要明确两个前提:

  • 针对 clickhouse 集群,而不是单点
  • 针对 MergeTree 家族的引擎数据(其余引擎的数据表因为无奈通过分布式表去读写,因而也不具备数据平衡的意义)

咱们晓得,clickhouse存储数据是完完全全的列式存储,这也就意味着,在同一个 partition 下,数据很难再一条条的进行拆分(尽管能够做到,但比拟麻烦)。因而,数据平衡最迷信的计划是以 partition 为单位,整个 partition 进行搬迁。这也就意味着,分区的粒度越小,最终的数据越靠近平衡。

另一个咱们要思考的问题就是,如果其中某一个分区咱们始终在写入数据,咱们是无奈获取该分区的理论大小的(因为始终在变动)。那么,如果该分区数据也参数数据平衡的话,可能参加平衡的 partition 并不是一个残缺的分区,就会导致分区数据被拆散,从而造成不可预知的问题。所以,咱们心愿最新的一个分区,不参加数据平衡的运算。

如何能获取到最新的分区呢?其实能够通过 SQL 查问到:

SELECT argMax(partition, modification_time) FROM system.parts WHERE database='?' AND table='?'

以上 SQL 查问进去的,就是指定的数据库表最新的分区。将这个分区排除在外,那么剩下的分区就是都能够参加数据平衡的分区了。

另一个外围问题是,如何将 partition 数据在不同节点之间进行挪动?咱们很容易想到 attachdetach,但attachdetach的前提是,咱们必须要设置配置文件中以后操作用户 allow_drop_detached 标记为 1。对于带正本的集群,咱们总能通过zookeeper 的门路十分不便地将分区数据在不同节点间 fetch 过去。

 -- 在指标节点执行
 ALTER TABLE {{.tbname}} FETCH PARTITION '{{.partition}}' FROM '{{.zoopath}}'
 ALTER TABLE {{.tbname}} ATTACH PARTITION '{{.partition}}'
 
 -- 在原始节点执行
 ALTER TABLE {{.tbname}} DROP PARTITION '{{.partition}}'

然而对于非正本模式的集群则没那么简略了。因为咱们无奈晓得 zoopath,所以不能通过fetch 的形式获取到数据,因而,只能应用物理的形式将数据进行传输(比方 scp, rsync 等)到指定节点上。思考到传输效率,这里咱们应用 rsync 的形式。

-- 原始节点执行
ALTER TABLE {{.tbname}} DETACH PARTITION '{{.partition}}'
# 原始节点执行
rsync -e "ssh -o StrictHostKeyChecking=false" -avp /{{.datapath}}/clickhouse/data/{{.database}}/{{.table}}/detached dstHost:/{{.datapath}}/clickhouse/data/{{.database}}/{{.table}}/detached
rm -fr /{{.datapath}}/clickhouse/data/{{.database}}/{{.table}}/detached
-- 指标节点执行
ALTER TABLE {{.tbname}} ATTACH PARTITION '{{.partition}}'
-- 原始节点执行
ALTER TABLE {{.tbname}} DROP DETACHED PARTITION '{{.partition}}'

然而,通过 rsync 的形式须要有前提,那就是首先必须在各个节点上曾经装置过 rsync 工具了,如果没有装置,可通过上面的命令装置:

yum install -y rsync

其次,须要配置各节点之间的互信(次要是 moveout 的节点到 movein 节点之间的互信,但其实咱们并不知道数据在节点间数如何挪动的,因而最好全副配置)。

以上问题解决后,那么就剩下最外围的一个问题了。数据如何平衡?

这里须要阐明的是,因为是整个 partition 的挪动,因而,无奈做到相对的平衡,而是只能做到绝对的数据平衡。partition的粒度越小,平衡越准确。

一种比拟迷信的计划是,将各个节点的分区数据按大小排列之后,将最大的节点数据挪动到最小的节点中去,次大的节点移到次小的节点,以此类推,一直向两头聚拢,直到满足某一个阈值,则不再挪动。

这一段的代码实现咱们曾经通过 ckman 我的项目开源进去了,如果感兴趣的敌人能够通过上面的链接浏览源码:ckman:rebalancer。

因而,不难发现,数据平衡的过程中,分区数据因为可能曾经被detach,然而还没来得及在新的节点上attach,这时候去做查问,可能存在肯定几率的不精确。

所以,在做数据平衡的过程中,最好不要有查问操作。

插入操作反而不受影响,因为咱们曾经排除了最新的分区不参加平衡运算。

ckman 如何实现数据平衡

ckman 作为一款治理和监控 ClickHouse 集群的可视化工具,人造集成了数据平衡的性能。只须要点击集群治理页面的 ” 平衡集群 ” 按钮,即可实现数据平衡的操作。

与此同时,ckman还提供了命令行形式的数据平衡工具rebalancer,其参数如下:

  • -ch-data-dir

    • clickhouse集群数据目录
  • -ch-hosts

    • 节点列表(每个 shard 只需列出一个,如果 shard 有多个正本,无需全副列出)
  • -ch-password

    • clickhouse用户明码
  • -ch-port

    • clickhouseTCP 端口,默认9000
  • -ch-user

    • clickhouse的用户,界面操作时,应用 default 用户
  • -os-password

    • 节点的 ssh 登录明码(非正本模式时须要)
  • -os-port

    • 节点的 ssh 端口,默认22(非正本模式时须要)
  • -os-user

    • 节点的 ssh 用户(非正本模式时须要)

如:

rebalancer -ch-data-dir=/var/lib/ --ch-hosts=192.168.0.1,192.168.0.2,192.168.0.3 --ch-password=123123 --ch-port=9000 --ch-user=default --os-password=123456 --os-port=22 --os-user=root

实操案例

咱们在 ckman 中筹备了一个名为 eoi 的集群,该集群有三个节点,别离为192.168.21.73,192.168.21.74,192.168.21.75,集群为非正本模式。

咱们从官网文档给出的数据集中导入如下数据:https://clickhouse.com/docs/e…

该数据是从 2019 年 1 月到 2021 年 5 月,共计 30 个月的航空数据,为了更直观地展现数据平衡,本文将官网的建表语句做了微调,依照月进行分区,并且在集群各个节点都创立表:

CREATE TABLE opensky ON CLUSTER eoi
(
    callsign String,
    number String,
    icao24 String,
    registration String,
    typecode String,
    origin String,
    destination String,
    firstseen DateTime,
    lastseen DateTime,
    day DateTime,
    latitude_1 Float64,
    longitude_1 Float64,
    altitude_1 Float64,
    latitude_2 Float64,
    longitude_2 Float64,
    altitude_2 Float64
) ENGINE = MergeTree 
PARTITION BY toYYYYMM(day)
ORDER BY (origin, destination, callsign);

并创立分布式表:

CREATE TABLE dist_opensky ON CLUSTER eoi AS opensky
ENGINE = Distributed(eoi, default, opensky, rand())

下载数据:

wget -O- https://zenodo.org/record/5092942 | grep -oP 'https://zenodo.org/record/5092942/files/flightlist_\d+_\d+\.csv\.gz' | xargs wget

数据下载实现大概4.3G

应用上面的脚本将数据导入到其中一个节点:

for file in flightlist_*.csv.gz; do gzip -c -d "$file" | clickhouse-client --password 123123 --date_time_input_format best_effort --query "INSERT INTO opensky FORMAT CSVWithNames"; done

导入实现后,别离查看各节点数据如下:

-- 总数据
master :) select  count() from dist_opensky;

SELECT count()
FROM dist_opensky

Query id: b7bf794b-086b-4986-b616-aef1d40963e3

┌──count()─┐
│ 66010819 │
└──────────┘

1 rows in set. Elapsed: 0.024 sec. 

-- node 21.73
master :) select  count() from opensky;

SELECT count()
FROM opensky

Query id: 5339e93c-b2ed-4085-9f58-da099a641f8f

┌──count()─┐
│ 66010819 │
└──────────┘

1 rows in set. Elapsed: 0.002 sec. 


-- node 21.74
worker-1 :) select  count() from opensky;

SELECT count()
FROM opensky

Query id: 60155715-064e-4c4a-9103-4fd6bf9b7667

┌─count()─┐
│       0 │
└─────────┘

1 rows in set. Elapsed: 0.002 sec. 

-- node 21.75
worker-2 :) select count() from opensky;

SELECT count()
FROM opensky

Query id: d04f42df-d1a4-4d90-ad47-f944b7a32a3d

┌─count()─┐
│       0 │
└─────────┘

1 rows in set. Elapsed: 0.002 sec. 

从以上信息,咱们能够晓得,原始数据 6600 万条全副在 21.73 这个节点上,另外两个节点 21.7421.75没有数据。

ckman 界面能够看到如下信息:

而后点击数据平衡,期待一段时间后,会看到界面提醒数据平衡胜利,再次查看各节点数据:

-- 总数据
master :) select  count() from dist_opensky;

SELECT count()
FROM dist_opensky

Query id: bc4d27a9-12bf-4993-b37c-9f332ed958c9

┌──count()─┐
│ 66010819 │
└──────────┘

1 rows in set. Elapsed: 0.006 sec. 


-- node 21.73
master :) select  count() from opensky;

SELECT count()
FROM opensky

Query id: a4da9246-190c-4663-8091-d09b2a9a2ea3

┌──count()─┐
│ 24304792 │
└──────────┘

1 rows in set. Elapsed: 0.002 sec.

-- node 21.74
worker-1 :) select  count() from opensky;

SELECT count()
FROM opensky

Query id: 5f6a8c89-c21a-4ae1-b69f-2755246ca5d7

┌──count()─┐
│ 20529143 │
└──────────┘

1 rows in set. Elapsed: 0.002 sec. 

-- node 21.75
worker-2 :) select count() from opensky;

SELECT count()
FROM opensky

Query id: 569d7c63-5279-48ad-a296-013dc1df6756

┌──count()─┐
│ 21176884 │
└──────────┘

1 rows in set. Elapsed: 0.002 sec.

通过上述操作,简略演示了数据平衡在 ckman 中的实现,原始数据 6600 万条全副在 node1,通过平衡之后,其中node1 数据为 2400 万条,node22000 万条,node32100 万条,实现了大抵的数据平衡。

结语

尽管咱们能够通过 ckman 之类的工具能够实现数据的大抵平衡,大大改善了操作的便利性,但数据平衡自身就是一个非常复杂的命题,一旦波及到存储策略(如数据存储在远端 HDFS 上),那么又会减少数据平衡的复杂性,这些都是 ckman 目前所不能反对的操作(近程数据做数据平衡没有意义,然而能够平衡其元数据,这样能够在查问时充分利用各节点的 CPU 性能)。因而,数据平衡想要做得迷信而准确,依然须要更多的致力。

正文完
 0