乐趣区

关于大数据:开源交流丨批流一体数据集成工具ChunJun同步Hive事务表原理详解及实战分享

原文链接:批流一体数据集成工具 ChunJun 同步 Hive 事务表原理详解及实战分享

课件获取:关注公众号__“数栈研习社”,__后盾私信__“ChengYing”__ 取得直播课件

视频回放:点击这里

ChengYing 开源我的项目地址:github 丨 gitee 喜爱咱们的我的项目给咱们点个__ STAR!STAR!!STAR!!!(重要的事件说三遍)__

技术交换钉钉 qun:30537511

本期咱们带大家回顾一下无倦同学的直播分享《Chunjun 同步 Hive 事务表详解》

一、Hive 事务表的构造及原理

Hive 是基于 Hadoop 的一个数据仓库工具,用来进行数据提取、转化、加载,这是一种能够存储、查问和剖析存储在 Hadoop 中的大规模数据的机制。Hive 数据仓库工具能将结构化的数据文件映射为一张数据库表,并提供 SQL 查问性能,能将 SQL 语句转变成 MapReduce 工作来执行。

在分享 Hive 事务表的具体内容前,咱们先来理解下 HIve 事务表在 HDFS 存储上的一些限度。

Hive 尽管反对了具备 ACID 语义的事务,然而没有像在 MySQL 中应用那样不便,有很多局限性,具体限度如下:

  • 尚不反对 BEGIN,COMMIT 和 ROLLBACK,所有语言操作都是主动提交的
  • 仅反对 ORC 文件格式(STORED AS ORC)

    • 默认状况下事务配置为敞开,须要配置参数开启应用
  • 表必须是分桶表(Bucketed)才能够应用事务性能
  • 表必须外部表,内部表无奈创立事务表
  • 表参数 transactional 必须为 true
  • 内部表不能成为 ACID 表,不容许从非 ACID 会话读取 / 写入 ACID 表

以下矩阵包含能够应用 Hive 创立的表的类型、是否反对 ACID 属性、所需的存储格局以及要害的 SQL 操作。

理解完 Hive 事务表的限度,当初咱们具体理解下 Hive 事务表的内容。

1、事务表文件名字详解

  • 根底目录:

$partition/base_$wid/$bucket

  • 增量目录:

$partition/delta_$wid_$wid_$stid/$bucket

  • 参数目录:

$partition/delete_delta_$wid_$wid_$stid/$bucket

2、事务表文件内容详解

$ orc-tools data bucket_00000

{“operation”:0,”originalTransaction”:1,”bucket”:536870912,”rowId”:0,”currentTransaction”:1,”row”:{“id”:1,”name”:”Jerry”,”age”:18}}

{“operation”:0,”originalTransaction”:1,”bucket”:536870912,”rowId”:1,”currentTransaction”:1,”row”:{“id”:2,”name”:”Tom”,”age”:19}}

{“operation”:0,”originalTransaction”:1,”bucket”:536870912,”rowId”:2,”currentTransaction”:1,”row”:{“id”:3,”name”:”Kate”,”age”:20}}

  • operation 0 示意插入、1 示意更新,2 示意删除。因为应用了 split-update,UPDATE 是不会呈现的。
  • originalTransaction 是该条记录的原始写事务 ID:

a、对于 INSERT 操作,该值和 currentTransaction 是统一的;

b、对于 DELETE,则是该条记录第一次插入时的写事务 ID。

  • bucket 是一个 32 位整型,由 BucketCodec 编码,各个二进制位的含意为:

a、1-3 位:编码版本,以后是 001;

b、4 位:保留;

c、5-16 位:分桶 ID,由 0 开始。分桶 ID 是由 CLUSTERED BY 子句所指定的字段、以及分桶的数量决定的。该值和 bucket_N 中的 N 统一;

d、17-20 位:保留;

e、21-32 位:语句 ID;

举例来说,整型 536936448 的二进制格局为 00100000000000010000000000000000,即它是按版本 1 的格局编码的,分桶 ID 为 1。

  • rowId 是一个自增的惟一 ID,在写事务和分桶的组合中惟一;
  • currentTransaction 以后的写事务 ID;
  • row 具体数据。对于 DELETE 语句,则为 null。

3、更新 Hive 事务表数据

UPDATE employee SET age = 21 WHERE id = 2;

这条语句会先查问出所有符合条件的记录,获取它们的 row_id 信息,而后别离创立 delete 和 delta 目录:

/user/hive/warehouse/employee/delta_0000001_0000001_0000/bucket_00000

/user/hive/warehouse/employee/delete_delta_0000002_0000002_0000/bucket_00000 (update)

/user/hive/warehouse/employee/delta_0000002_0000002_0000/bucket_00000 (update)

delete_delta_0000002_0000002_0000/bucket_00000

蕴含了删除的记录:

{“operation”:2,”originalTransaction”:1,”bucket”:536870912,”rowId”:1,”currentTransaction”:2,”row”:null}

delta_0000002_0000002_0000/bucket_00000

蕴含更新后的数据:

{“operation”:0,”originalTransaction”:2,”bucket”:536870912,”rowId”:0,”currentTransaction”:2,”row”:{“id”:2,”name”:”Tom”,”salary”:21}}

4、Row_ID 信息怎么查?

5、事务表压缩(Compact)

随着写操作的积攒,表中的 delta 和 delete 文件会越来越多,事务表的读取过程中须要合并所有文件,数量一多势必会影响效率,此外小文件对 HDFS 这样的文件系统也不够敌对,因而 Hive 引入了压缩(Compaction)的概念,分为 Minor 和 Major 两类。

● Minor

Minor Compaction 会将所有的 delta 文件压缩为一个文件,delete 也压缩为一个。压缩后的后果文件名中会蕴含写事务 ID 范畴,同时省略掉语句 ID。

压缩过程是在 Hive Metastore 中运行的,会依据肯定阈值主动触发。咱们也能够应用如下语句人工触发:

ALTER TABLE dtstack COMPACT ‘MINOR’。

● Major

Major Compaction 会将所有的 delta 文件,delete 文件压缩到一个 base 文件。压缩后的后果文件名中会蕴含所有写事务 ID 的最大事务 ID。

压缩过程是在 Hive Metastore 中运行的,会依据肯定阈值主动触发。咱们也能够应用如下语句人工触发:

ALTER TABLE dtstack COMPACT ‘MAJOR’。

6、文件内容详解

ALTER TABLE employee COMPACT ‘minor’;

语句执行前:

/user/hive/warehouse/employee/delta_0000001_0000001_0000

/user/hive/warehouse/employee/delta_0000002_0000002_0000 (insert 创立, mary 的数据)

/user/hive/warehouse/employee/delete_delta_0000002_0000002_0001 (update)

/user/hive/warehouse/employee/delta_0000002_0000002_0001 (update)

语句执行后:

/user/hive/warehouse/employee/delete_delta_0000001_0000002

/user/hive/warehouse/employee/delta_0000001_0000002

7、读 Hive 事务表

咱们能够看到 ACID 事务表中会蕴含三类文件,别离是 base、delta、以及 delete。文件中的每一行数据都会以 row_id 作为标识并排序。从 ACID 事务表中读取数据就是对这些文件进行合并,从而失去最新事务的后果。这一过程是在 OrcInputFormat 和 OrcRawRecordMerger 类中实现的,实质上是一个合并排序的算法。

以下列文件为例,产生这些文件的操作为:

  1. 插入三条记录
  2. 进行一次 Major Compaction
  3. 而后更新两条记录。

1-0-0-1 是对 originalTransaction – bucketId – rowId – currentTra

8、合并算法

对所有数据行依照 (originalTransaction, bucketId, rowId) 正序排列,(currentTransaction) 倒序排列,即:

originalTransaction-bucketId-rowId-currentTransaction

(base_1)1-0-0-1

(delete_2)1-0-1-2# 被跳过(DELETE)

(base_1)1-0-1-1 # 被跳过(以后记录的 row_id(1) 和上条数据一样)

(delete_2)1-0-2-2 # 被跳过(DELETE)

(base_1)1-0-2-1 # 被跳过(以后记录的 row_id(2) 和上条数据一样)

(delta_2)2-0-0-2

(delta_2)2-0-1-2

获取第一条记录;

  1. 如果以后记录的 row_id 和上条数据一样,则跳过;
  2. 如果以后记录的操作类型为 DELETE,也跳过;

通过以上两条规定,对于 1-0-1-2 和 1-0-1-1,这条记录会被跳过;

如果没有跳过,记录将被输入给上游;

反复以上过程。

合并过程是流式的,即 Hive 会将所有文件关上,预读第一条记录,并将 row_id 信息存入到 ReaderKey 类型中。

三、ChunJun 读写 Hive 事务表实战

理解完 Hive 事务表的基本原理后,咱们来为大家分享如何在 ChunJun 中读写 Hive 事务表。

1、事务表数据筹备

— 创立事务表

create table dtstack(

id int,

name string,

age int

)

stored as orc

TBLPROPERTIES(‘transactional’=’true’);

— 插入 10 条测试数据

insert into dtstack (id, name, age)

values (1, “aa”, 11), (2, “bb”, 12), (3, “cc”, 13), (4, “dd”, 14), (5, “ee”, 15),

   (6, "ff", 16), (7, "gg", 17), (8, "hh", 18), (9, "ii", 19), (10, "jj", 20);


2、配置 ChunJun json 脚本

3、提交工作(读写事务表)

启动 Session

/root/wujuan/flink-1.12.7/bin/yarn-session.sh -t $ChunJun_HOME -d

提交 Yarn Session 工作

读取事务表

/root/wujuan/ChunJun/bin/ChunJun-yarn-session.sh -job /root/wujuan/ChunJun/ChunJun-examples/json/hive3/hive3_transaction_stream.json -confProp {\”yarn.application.id\”:\”application_1650792512832_0134\”}

写入事务表

/root/wujuan/ChunJun/bin/ChunJun-yarn-session.sh -job /root/wujuan/ChunJun/ChunJun-examples/json/hive3/stream_hive3_transaction.json -confProp {\”yarn.application.id\”:\”application_1650792512832_0134\”}

依据上一行后果替换 yarn.application.id

三、ChunJun 读写 Hive 事务表源码剖析

压缩器是在 Metastore 境内运行的一组后台程序,用于反对 ACID 零碎。它由 Initiator、Worker、Cleaner、AcidHouseKeeperService 和其余一些组成。

1、Compactor

● Delta File Compaction

在一直的对表批改中,会创立越来越多的 delta 文件,须要这些文件须要被压缩以保障性能。有两种类型的压缩,即 (minor) 小压缩和 (major) 大压缩:

minor 须要一组现有的 delta 文件,并将它们重写为每个桶的一个 delta 文件

major 须要一个或多个 delta 文件和桶的根底文件,并将它们改写成每个桶的新根底文件。major 须要更久,然而成果更好

所有的压缩工作都是在后盾进行的,并不障碍对数据的并发读写。在压缩之后零碎会期待,直到所有旧文件的读都完结,而后删除旧文件。

●Initiator

这个模块负责发现哪些表或分区要进行压缩。这应该在元存储中应用 hive.compactor.initiator.on 来启用。每个 Compact 工作解决一个分区(如果表是未分区的,则解决整个表)。如果某个分区的间断压实失败次数超过 hive.compactor.initiator.failed.compacts.threshold,这个分区的主动压缩调度将进行。

● Worker

每个 Worker 解决一个压缩工作。一个压缩是一个 MapReduce 作业,其名称为以下模式。<hostname>-compactor-<db>.<table>.<partition>。每个 Worker 将作业提交给集群(如果定义了 hive.compactor.job.queue),并期待作业实现。hive.compactor.worker.threads 决定了每个 Metastore 中 Worker 的数量。Hive 仓库中的 Worker 总数决定了并发压缩的最大数量。

● Cleaner

这个过程是在压缩后,在确定不再须要 delta 文件后,将其删除。

● AcidHouseKeeperService

这个过程寻找那些在 hive.txn.timeout 工夫内没有心跳的事务并停止它们。零碎假设发动交易的客户端进行心跳后解体了,它锁定的资源应该被开释。

● SHOW COMPACTIONS

该命令显示以后运行的压实和最近的压实历史(可配置保留期)的信息。这个历史显示从 HIVE-12353 开始可用。

● Compact 重点配置

2、如何 debug Hive

  1. debug hive client

hive –debug

  1. debug hive metastore

hive –service metastore –debug:port=8881,mainSuspend=y,childSuspend=n –hiveconf hive.root.logger=DEBUG,console

  1. debug hive mr 工作

3、读写过滤和 CompactorMR 排序的要害代码

4、Minor&Major 合并源码(CompactorMR Map 类)

四、ChunJun 文件系统将来布局

最初为大家介绍 ChunJun 文件系统将来布局:

● 基于 FLIP-27 优化文件系统

批流对立实现,简略的线程模型,分片和读数据拆散。

● Hive 的分片优化

分片更精细化,粒度更细,充分发挥并发能力

● 欠缺 Exactly Once 语义

增强异常情况健壮性。

● HDFS 文件系统的断点续传

依据分区,文件个数,文件行数等确定端点地位,状态存储在 checkpoint 外面。

● 实时采集文件

实时监控目录下的多个追加文件。

● 文件系统格局的通用性

JSON、CSV、Text、XM、EXCELL 对立抽取公共包。

袋鼠云开源框架钉钉技术交换 qun(30537511),欢送对大数据开源我的项目有趣味的同学退出交换最新技术信息,开源我的项目库地址:https://github.com/DTStack

退出移动版