乐趣区

关于程序员:Clickhouse如何实现数据更新

Clickhouse 作为一个 OLAP 数据库,它对事务的反对十分无限。Clickhouse 提供了 MUTATION 操作(通过 ALTER TABLE 语句)来实现数据的更新、删除,但这是一种“较重”的操作,它与规范 SQL 语法中的 UPDATE、DELETE 不同,是异步执行的,对于批量数据不频繁的更新或删除比拟有用。除了 MUTATION 操作,Clickhouse 还能够通过 CollapsingMergeTree、VersionedCollapsingMergeTree、ReplacingMergeTree 联合具体业务数据结构来实现数据的更新、删除,这三种形式都通过 INSERT 语句插入最新的数据,新数据会“对消”或“替换”掉老数据,然而“对消”或“替换”都是产生在数据文件后盾 Merge 时,也就是说,在 Merge 之前,新数据和老数据会同时存在。因而,咱们须要在查问时做一些解决,防止查问到老数据。Clickhouse 官网文档提供了应用 CollapsingMergeTree、VersionedCollapsingMergeTree 的领导。相比于 CollapsingMergeTree、VersionedCollapsingMergeTree 须要标记位字段、版本字段,用 ReplacingMergeTree 来实现数据的更新删除会更加不便,这里着重介绍一下如何用 ReplacingMergeTree 来实现数据的更新删除。

咱们假如一个须要频繁数据更新的场景,如某市用户用电量的统计,咱们晓得,用户的用电量每分每秒都有可能发生变化,所以会波及到数据频繁的更新。首先,创立一张表来记录某市所有用户的用电量。

CREATE TABLE IF NOT EXISTS default.PowerConsumption_local ON CLUSTER default_cluster
(
    User_ID             UInt64                              COMMENT '用户 ID',
    Record_Time         DateTime    DEFAULT toDateTime(0)   COMMENT '电量记录时间',
    District_Code       UInt8                               COMMENT '用户所在行政区编码',
    Address             String                              COMMENT '用户地址',
    Power               UInt64                              COMMENT '用电量',
    Deleted             BOOLEAN     DEFAULT 0               COMMENT '数据是否被删除'
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/default.PowerConsumption_local/{shard}', '{replica}', Record_Time)
ORDER BY (User_ID, Address)
PARTITION BY District_Code;
CREATE TABLE default.PowerConsumption ON CLUSTER default_cluster AS default.PowerConsumption_local
ENGINE = Distributed(default_cluster, default, PowerConsumption_local, rand());

PowerConsumption_local 为本地表,PowerConsumption 为对应的分布式表。其中 PowerConsumption_local 应用 ReplicatedReplacingMergeTree 表引擎,第三个参数‘Record_Time’示意雷同主键的多条数据,只会保留 Record_Time 最大的一条,咱们正是利用 ReplacingMergeTree 的这一个性来实现数据的更新删除。因而,在抉择主键时,咱们须要确保主键惟一。这里咱们抉择 (User_ID, Address) 来作为主键,因为用户 ID 加上用户的地址能够确定惟一的一个电表,不会呈现第二个雷同的电表,所以对于某个电表多条数据,只会保留电量记录时间最新的一条。

而后咱们向表中插入 10 条数据:

INSERT INTO default.PowerConsumption VALUES (0, '2021-10-30 12:00:00', 3, 'Yanta', rand64() % 1000 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (1, '2021-10-30 12:10:00', 2, 'Beilin', rand64() % 1000 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (2, '2021-10-30 12:15:00', 1, 'Weiyang', rand64() % 1000 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (3, '2021-10-30 12:18:00', 1, 'Gaoxin', rand64() % 1000 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (4, '2021-10-30 12:23:00', 2, 'Qujiang', rand64() % 1000 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (5, '2021-10-30 12:43:00', 3, 'Baqiao', rand64() % 1000 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (6, '2021-10-30 12:45:00', 1, 'Lianhu', rand64() % 1000 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (7, '2021-10-30 12:46:00', 3, 'Changan', rand64() % 1000 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (8, '2021-10-30 12:55:00', 1, 'Qianhan', rand64() % 1000 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (9, '2021-10-30 12:57:00', 4, 'Fengdong', rand64() % 1000 + 1, 0);

表中数据如图所示:

如果当初咱们要行政区编码为 1 的所有用户数据都须要更新,咱们插入最新的数据:

INSERT INTO default.PowerConsumption VALUES (2, now(), 1, 'Weiyang', rand64() % 100 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (3, now(), 1, 'Gaoxin', rand64() % 100 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (6, now(), 1, 'Lianhu', rand64() % 100 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (8, now(), 1, 'Qianhan', rand64() % 100 + 1, 0);

插入最新数据后,表中数据如图所示:

能够看到,此时新插入的数据与老数据同时存在于表中,因为后盾数据文件还没有进行 Merge,“替换”还没有产生,这时就须要对查问语句做一些解决来过滤掉老数据,函数 argMax(a, b)能够依照 b 的最大值取 a 的值,所以通过如下查问语句就能够只获取到最新数据:

SELECT
    User_ID,
    max(Record_Time) AS R_Time,
    District_Code,
    Address,
    argMax(Power, Record_Time) AS Power,
    argMax(Deleted, Record_Time) AS Deleted
FROM default.PowerConsumption
GROUP BY
    User_ID,
    Address,
    District_Code
HAVING Deleted = 0;

查问后果如下图:

为了更不便咱们查问,这里能够创立一个视图:

CREATE VIEW PowerConsumption_view ON CLUSTER default_cluster AS
SELECT
    User_ID,
    max(Record_Time) AS R_Time,
    District_Code,
    Address,
    argMax(Power, Record_Time) AS Power,
    argMax(Deleted, Record_Time) AS Deleted
FROM default.PowerConsumption
GROUP BY
    User_ID,
    Address,
    District_Code
HAVING Deleted = 0;

通过该视图,能够查问到最新的数据:

如果当初咱们又须要删除用户 ID 为 0 的数据,咱们须要插入一条 User_ID 字段为 0,Deleted 字段为 1 的数据:

INSERT INTO default.PowerConsumption VALUES (0, now(), 3, 'Yanta', null, 1);

查问视图,发现 User_ID 为 0 的数据曾经查问不到了:

通过如上办法,咱们能够实现 Clickhouse 数据的更新、删除,就如同在应用 OLTP 数据库一样,但咱们应该分明,实际上老数据真正的删除是在数据文件 Merge 时产生的,只有在 Merge 后,老数据才会真正物理意义上的删除掉。

本文由华为云公布

退出移动版