关于数据仓库:一招教你数据仓库如何高效批量导入与更新数据

9次阅读

共计 10803 个字符,预计需要花费 28 分钟才能阅读完成。

摘要:GaussDB(DWS) 反对的 MERGE INTO 性能,能够同时进行大数据量的更新与插入。对于数据仓库是一项十分重要的技术。

本文分享自华为云社区《一招教你如何高效批量导入与更新数据》,原文作者:acydy。

前言

如果有一张表,咱们既想对它更新,又想对它插入应该如何操作?能够应用 UPDATE 和 INSERT 实现你的指标。

如果你的数据量很大,想尽快实现工作执行,可否有其余计划?那肯定不要错过 GaussDB(DWS) 的 MERGE INTO 性能。

MERGE INTO 概念

MERGE INTO 是 SQL 2003 引入的规范。

If a table T, as well as being updatable, is insertable-into, then rows can be inserted into it (subject to applicable Access Rules and Conformance Rules). The primary effect of an <insert statement> on T is to insert into T each of the zero or more rows contained in a specified table. The primary effect of a <merge statement> on T is to replace zero or more rows in T with specified rows and/or to insert into T zero or more specified rows, depending on the result of a <search condition> and on whether one or both of <merge when matched clause> and <merge when not matched clause> are specified.

一张表在一条语句外面既能够被更新,也能够被插入。是否被更新还是插入取决于 search condition 的后果和指定的 merge when matched clause(当 condition 匹配时做什么操作)和 merge when not matched clause(当 condition 不匹配时做什么操作)语法。

SQL 2008 进行了扩大,能够应用多个 MATCHED 和 NOT MATCHED。

MERGE has been extended to support multiple MATCHED and NOT MATCHED clauses, each accompanied by a search condition, that gives much greater flexibility in the coding of complex MERGE statements to handle update conflicts.

MERGE INTO 命令波及到两张表。指标表:被插入或者更新的表。源表:用于跟指标表进行匹配的表,指标表的数据起源。

MERGE INTO 语句将指标表和源表中数据针对关联条件进行匹配,若关联条件匹配时对指标表进行 UPDATE,无奈匹配时对指标表执行 INSERT。

应用场景:当业务中须要将一个表中大量数据增加到现有表时,应用 MERGE INTO 能够高效地将数据导入,防止屡次 INSERT+UPDATE 操作。

MERGE INTO 语法

GaussDB(DWS) MERGE INTO 语法如下:

MERGE INTO table_name [[ AS] alias ]
USING {{ table_name | view_name} | subquery } [[ AS] alias ]
ON (condition)
[
  WHEN MATCHED THEN
  UPDATE SET {column_name = { expression | DEFAULT} |
          (column_name [, ...] ) = ({ expression | DEFAULT} [, ...] ) } [, ...]
  [WHERE condition]
]
[
  WHEN NOT MATCHED THEN
  INSERT { DEFAULT VALUES |
  [( column_name [, ...] ) ] VALUES ({ expression | DEFAULT} [, ...] ) [, ...] [WHERE condition] }
];
  • INTO 指定指标表。
  • USING 指定源表。源表能够是一般表,也能够是子查问。
  • ON 关联条件,用于指定指标表和源表的关联条件。
  • WHEN MATCHED 当源表和指标表中数据能够匹配关联条件时,抉择 WHEN MATCHED 子句执行 UPDATE 操作。
  • WHEN NOT MATCHED 当源表和指标表中数据无奈匹配关联条件时,抉择 WHEN NOT MATCHED 子句执行 INSERT 操作。

    • WHEN MATCHED,WHEN NOT MATCHED 能够缺省一个,不能指定多个。
    • WHEN MATCHED,WHEN NOT MATCHED 能够应用 WHERE 进行条件过滤。
    • WHEN MATCHED,WHEN NOT MATCHED 程序能够替换。

实战利用

首先创立好上面几张表,用于执行 MREGE INTO 操作。

gaussdb=# CREATE TABLE dst (
  product_id INT,
  product_name VARCHAR(20),
  category VARCHAR(20),
  total INT
) DISTRIBUTE BY HASH(product_id);

gaussdb=# CREATE TABLE dst_data (
  product_id INT,
  product_name VARCHAR(20),
  category VARCHAR(20),
  total INT
) DISTRIBUTE BY HASH(product_id);

gaussdb=# CREATE TABLE src (
  product_id INT,
  product_name VARCHAR(20),
  category VARCHAR(20),
  total INT
) DISTRIBUTE BY HASH(product_id);

gaussdb=# INSERT INTO dst_data VALUES(1601,'lamaze','toys',100),(1600,'play gym','toys',100),(1502,'olympus','electrncs',100),(1501,'vivitar','electrnc',100),(1666,'harry potter','dvd',100);
gaussdb=# INSERT INTO src VALUES(1700,'wait interface','books',200),(1666,'harry potter','toys',200),(1601,'lamaze','toys',200),(1502,'olympus camera','electrncs',200);
gaussdb=# INSERT INTO dst SELECT * FROM dst_data;

同时指定 WHEN MATCHED 与 WHEN NOT MATCHED

  • 查看打算,看下 MERGE INTO 是如何执行的。

MERGE INTO 转化成 JOIN 将两个表进行关联解决,关联条件就是 ON 后指定的条件。

gaussdb=# EXPLAIN (COSTS off)
MERGE INTO dst x
USING src y
ON x.product_id = y.product_id
WHEN MATCHED THEN
  UPDATE SET product_name = y.product_name, category = y.category, total = y.total
WHEN NOT MATCHED THEN
  INSERT VALUES (y.product_id, y.product_name, y.category, y.total);
 
                    QUERY PLAN
--------------------------------------------------
  id |                operation
-----+--------------------------------------------
   1 | ->  Streaming (type: GATHER)
   2 |    ->  Merge on dst x
   3 |       ->  Streaming(type: REDISTRIBUTE)
   4 |          ->  Hash Left Join (5, 6)
   5 |             ->  Seq Scan on src y
   6 |             ->  Hash
   7 |                ->  Seq Scan on dst x

  Predicate Information (identified by plan id)
 ------------------------------------------------
   4 --Hash Left Join (5, 6)
         Hash Cond: (y.product_id = x.product_id)
(14 rows)

为什么这里转化成了 LEFT JOIN?

因为须要在指标表与源表匹配时更新指标表,不匹配时向指标表插入数据。也就是源表的一部分数据用于更新指标表,另一部分用于向指标表插入。与 LEFT JOIN 语义是类似的。

 5 --Seq Scan on public.src y
         Output: y.product_id, y.product_name, y.category, y.total, y.ctid
         Distribute Key: y.product_id
   6 --Hash
         Output: x.product_id, x.product_name, x.category, x.total, x.ctid, x.xc_node_id
   7 --Seq Scan on public.dst x
         Output: x.product_id, x.product_name, x.category, x.total, x.ctid, x.xc_node_id
         Distribute Key: x.product_id
  • 执行 MERGE INTO,查看后果。

两张表在 product_id 是 1502,1601,1666 时能够关联,所以这三条记录被更新。src 表 product_id 是 1700 时未匹配,插入此条记录。其余未修改。

gaussdb=# SELECT * FROM dst ORDER BY 1;
 product_id | product_name | category  | total
------------+--------------+-----------+-------
       1501 | vivitar      | electrnc  |   100
       1502 | olympus      | electrncs |   100
       1600 | play gym     | toys      |   100       
       1601 | lamaze       | toys      |   100
       1666 | harry potter | dvd       |   100      
(5 rows)

gaussdb=# SELECT * FROM src ORDER BY 1;
 product_id |  product_name  | category  | total
------------+----------------+-----------+-------
       1502 | olympus camera | electrncs |   200
       1601 | lamaze         | toys      |   200       
       1666 | harry potter   | toys      |   200
       1700 | wait interface | books     |   200       
(4 rows)

gaussdb=# MERGE INTO dst x
USING src y
ON x.product_id = y.product_id
WHEN MATCHED THEN
  UPDATE SET product_name = y.product_name, category = y.category, total = y.total
WHEN NOT MATCHED THEN
  INSERT VALUES (y.product_id, y.product_name, y.category, y.total);
MERGE 4
gaussdb=# SELECT * FROM dst ORDER BY 1;
 product_id |  product_name  | category  | total
------------+----------------+-----------+-------
       1501 | vivitar        | electrnc  |   100  -- 未修改
       1502 | olympus camera | electrncs |   200  -- 更新
       1600 | play gym       | toys      |   100  -- 未修改
       1601 | lamaze         | toys      |   200  -- 更新
       1666 | harry potter   | toys      |   200  -- 更新
       1700 | wait interface | books     |   200  -- 插入
(6 rows)
  • 查看具体 UPDATE、INSERT 个数

能够通过 EXPLAIN PERFORMANCE 或者 EXPLAIN ANALYZE 查看 UPDATE、INSERT 各自个数。(这里仅显示必要局部)

在 Predicate Information 局部能够看到总共插入一条,更新三条。

在 Datanode Information 局部能够看到每个节点的信息。datanode1 上更新 2 条,datanode2 上插入一条,更新 1 条。

gaussdb=# EXPLAIN PERFORMANCE
MERGE INTO dst x
USING src y
ON x.product_id = y.product_id
WHEN MATCHED THEN
  UPDATE SET product_name = y.product_name, category = y.category, total = y.total
WHEN NOT MATCHED THEN
  INSERT VALUES (y.product_id, y.product_name, y.category, y.total);
 
  Predicate Information (identified by plan id)
 ------------------------------------------------
   2 --Merge on public.dst x
         Merge Inserted: 1
         Merge Updated: 3
 
                      Datanode Information (identified by plan id)
 ---------------------------------------------------------------------------------------
   2 --Merge on public.dst x
         datanode1 (Tuple Inserted 0, Tuple Updated 2)
         datanode2 (Tuple Inserted 1, Tuple Updated 1)  

省略 WHEN NOT MATCHED 局部。

  • 这里因为没有 WHEN NOT MATCHED 局部,在两个表不匹配时不须要执行任何操作,也就不须要源表这部分的数据,所有只须要 inner join 即可。
gaussdb=# EXPLAIN (COSTS off)
MERGE INTO dst x
USING src y
ON x.product_id = y.product_id
WHEN MATCHED THEN
  UPDATE SET product_name = y.product_name, category = y.category, total = y.total;
                    QUERY PLAN
--------------------------------------------------
  id |             operation
 ----+-----------------------------------
   1 | ->  Streaming (type: GATHER)
   2 |    ->  Merge on dst x
   3 |       ->  Hash Join (4,5)
   4 |          ->  Seq Scan on dst x
   5 |          ->  Hash
   6 |             ->  Seq Scan on src y

  Predicate Information (identified by plan id)
 ------------------------------------------------
   3 --Hash Join (4,5)
         Hash Cond: (x.product_id = y.product_id)
(13 rows)
  • 执行后查看后果。MERGE INTO 只操作了 3 条数据。
gaussdb=# truncate dst;
gaussdb=# INSERT INTO dst SELECT * FROM dst_data;
gaussdb=# MERGE INTO dst x
USING src y
ON x.product_id = y.product_id
WHEN MATCHED THEN
  UPDATE SET product_name = y.product_name, category = y.category, total = y.total;
MERGE 3
gaussdb=# SELECT * FROM dst;
 product_id |  product_name  | category  | total
------------+----------------+-----------+-------
       1501 | vivitar        | electrnc  |   100  -- 未修改
       1502 | olympus camera | electrncs |   200  -- 更新
       1600 | play gym       | toys      |   100  -- 未修改
       1601 | lamaze         | toys      |   200  -- 更新
       1666 | harry potter   | toys      |   200  -- 更新
(5 rows)

省略 WHEN NOT MATCHED

  • 只有在不匹配时进行插入。后果中没有数据被更新。
gaussdb=# EXPLAIN (COSTS off)
MERGE INTO dst x
USING src y
ON x.product_id = y.product_id
WHEN NOT MATCHED THEN
  INSERT VALUES (y.product_id, y.product_name, y.category, y.total);
                    QUERY PLAN
--------------------------------------------------
  id |                operation
 ----+-----------------------------------------
   1 | ->  Streaming (type: GATHER)
   2 |    ->  Merge on dst x
   3 |       ->  Streaming(type: REDISTRIBUTE)
   4 |          ->  Hash Left Join (5, 6)
   5 |             ->  Seq Scan on src y
   6 |             ->  Hash
   7 |                ->  Seq Scan on dst x

  Predicate Information (identified by plan id)
 ------------------------------------------------
   4 --Hash Left Join (5, 6)
         Hash Cond: (y.product_id = x.product_id)
(14 rows)

gaussdb=# truncate dst;
gaussdb=# INSERT INTO dst SELECT * FROM dst_data;
gaussdb=# MERGE INTO dst x
USING src y
ON x.product_id = y.product_id
WHEN NOT MATCHED THEN
  INSERT VALUES (y.product_id, y.product_name, y.category, y.total);
MERGE 1
gaussdb=# SELECT * FROM dst ORDER BY 1;
 product_id |  product_name  | category  | total
------------+----------------+-----------+-------
       1501 | vivitar        | electrnc  |   100  -- 未修改
       1502 | olympus        | electrncs |   100  -- 未修改
       1600 | play gym       | toys      |   100  -- 未修改
       1601 | lamaze         | toys      |   100  -- 未修改
       1666 | harry potter   | dvd       |   100  -- 未修改
       1700 | wait interface | books     |   200  -- 插入
(6 rows)

WHERE 过滤条件

语义是在进行更新或者插入前判断以后行是否满足过滤条件,如果不满足,就不进行更新或者插入。如果对于字段不想被更新,须要指定过滤条件。

上面例子在两表可关联时,只会更新 product_name = ‘olympus’的行。在两表无奈关联时且源表的 product_id != 1700 时才会进行插入。

gaussdb=# truncate dst;
gaussdb=# INSERT INTO dst SELECT * FROM dst_data;
gaussdb=# MERGE INTO dst x
USING src y
ON x.product_id = y.product_id
WHEN MATCHED THEN
  UPDATE SET product_name = y.product_name, category = y.category, total = y.total
  WHERE x.product_name = 'olympus'
WHEN NOT MATCHED THEN
  INSERT VALUES (y.product_id, y.product_name, y.category, y.total) WHERE y.product_id != 1700;
MERGE 1
gaussdb=# SELECT * FROM dst ORDER BY 1;
SELECT * FROM dst ORDER BY 1;
 product_id |  product_name  | category  | total
------------+----------------+-----------+-------
       1501 | vivitar        | electrnc  |   100
       1502 | olympus camera | electrncs |   200
       1600 | play gym       | toys      |   100
       1601 | lamaze         | toys      |   100
       1666 | harry potter   | dvd       |   100
(5 rows)

子查问

在 USING 局部能够应用子查问,进行更简单的关联操作。

  • 对源表进行聚合操作的后果再与指标表匹配
MERGE INTO dst x
USING (SELECT product_id, product_name, category, sum(total) AS total FROM src group by product_id, product_name, category
) y
ON x.product_id = y.product_id
WHEN MATCHED THEN
    UPDATE SET product_name = x.product_name, category = x.category, total = x.total
WHEN NOT MATCHED THEN
    INSERT VALUES (y.product_id, y.product_name, y.category, y.total + 200);
  • 多个表 UNION 后的后果再与指标表匹配
MERGE INTO dst x
USING (
  SELECT 1501 AS product_id, 'vivitar 35mm' AS product_name, 'electrncs' AS category, 100 AS total UNION ALL
  SELECT 1666 AS product_id, 'harry potter' AS product_name, 'dvd' AS category, 100 AS total
) y
ON x.product_id = y.product_id
WHEN MATCHED THEN
    UPDATE SET product_name = x.product_name, category = x.category, total = x.total
WHEN NOT MATCHED THEN
    INSERT VALUES (y.product_id, y.product_name, y.category, y.total + 200);

存储过程

gaussdb=# CREATE OR REPLACE PROCEDURE store_procedure1()
AS
BEGIN
  MERGE INTO dst x
  USING src y
  ON x.product_id = y.product_id
  WHEN MATCHED THEN
    UPDATE SET product_name = y.product_name, category = y.category, total = y.total;
END;
/

CREATE PROCEDURE
gaussdb=# CALL store_procedure1();

MERGE INTO 背地原理

上文提到了 MREGE INTO 转化成 LEFT JOIN 或者 INNER JOIN 将指标表和源表进行关联。那么如何晓得某一行要进行更新还是插入?

通过 EXPLAIN VERBOSE 查看算子的输入。扫描两张表时都输入了 ctid 列。那么 ctid 列有什么作用呢?

  5 --Seq Scan on public.src y
         Output: y.product_id, y.product_name, y.category, y.total, y.ctid
         Distribute Key: y.product_id
   6 --Hash
         Output: x.product_id, x.product_name, x.category, x.total, x.ctid, x.xc_node_id
   7 --Seq Scan on public.dst x
         Output: x.product_id, x.product_name, x.category, x.total, x.ctid, x.xc_node_id
         Distribute Key: x.product_id

ctid 标识了这一行在存储上具体位置,晓得了这个地位就能够对这个地位的数据进行更新。GaussDB(DWS) 作为 MPP 分布式数据库,还须要晓得节点的信息 (xc_node_id)。UPDATE 操作须要这两个值。

在 MREGE INTO 这里 ctid 还另有妙用。当指标表匹配时须要更新,这是就保留本行 ctid 值。如果无奈匹配,插入即可。就不须要 ctid,此时可意识 ctid 值是 NULL。依据 LEFT JOIN 输入的 ctid 后果是否为 NULL,最终决定本行该被更新还是插入。

这样在两张表做完 JOIN 操作后,依据 JOIN 后输入的 ctid 列,更新或者插入某一行。

注意事项

应用 MERGE INTO 时要留神匹配条件是否适合。如果不留神,容易造成数据被非预期更新,可能整张表被更新。

总结

GAUSSDB(DWS) 提供了高效的数据导入的性能 MERGE INTO,对于数据仓库是一项十分要害的性能。能够应用 MERGE INTO 同时更新和插入一张表,在数据量十分大的状况下也能很快实现地数据导入。

想理解 GuassDB(DWS) 更多信息,欢送微信搜寻“GaussDB DWS”关注微信公众号,和您分享最新最全的 PB 级数仓黑科技,后盾还可获取泛滥学习材料哦~

点击关注,第一工夫理解华为云陈腐技术~

正文完
 0