关于大数据:全量增量数据在HBase迁移的多种技巧实践

10次阅读

共计 5751 个字符,预计需要花费 15 分钟才能阅读完成。

作者经验了屡次基于 HBase 实现全量与增量数据的迁徙测试,总结了在应用 HBase 进行数据迁徙的多种实际,本文针对全量与增量数据迁徙的场景不同,提供了 1 + 2 的技巧分享。

HBase 全量与增量数据迁徙的办法

1. 背景

在 HBase 应用过程中,应用的 HBase 集群常常会因为某些起因须要数据迁徙。大多数状况下,能够用离线的形式进行迁徙,迁徙离线数据的形式就比拟容易了,将整个 hbase 的 data 存储目录进行搬迁就行,然而当集群数据量比拟多的时候,文件拷贝的工夫很长,对业务影响工夫也比拟长,往往在设计的工夫窗口无奈实现,本文给出一种迁徙思路,能够利用 HBase 本身的性能,对集群进行迁徙,缩小集群业务中断工夫

2. 简介

大家都晓得 HBase 有 snapshot 快照的性能,利用快照能够记录某个工夫点表的数据将其保留快照,在须要的时候能够将表数据恢复到打快照工夫时的样子。咱们利用 hbase 的 snapshot 能够导出某个工夫点的全量数据。

因为理论的业务还在不停的写入表中,除了迁徙快照工夫点之前的全量数据,咱们还须要将快照工夫点后源源不断的增量数据也迁徙走,这里如果能采纳双写的形式,将数据写入两个集群就好了,然而事实的业务不会这样做,如果这样做还得保障双写的事务一致性。于是能够利用 HBase 的 replication 性能,replication 性能自身就是保留了源集群的 WAL 日志记录,去回放写入到目标集群,这样一来用户业务端 -> 原始集群 -> 目标集群便是个串形的数据流,且由 HBase 来保证数据的正确性。

所以这个迁徙的办法就是利用 snapshot 迁徙全量数据,利用 replication 迁徙增量数据。

3. 迁徙步骤

上图给出了迁徙的整个工夫线流程,次要有这么 5 个工夫点。

T0: 配置好老集群 A 集群到新集群 B 的 Replication 关系,Replication 的数据由 A 集群同步到集群 B,将表设置成同步,从此刻开始新写入 A 集群表的数据会保留在 WAL 日志中;

T1: 生成该工夫点的全量数据,通过创立快照,以及导出快照数据的形式将该工夫点的数据导出到新集群 B;

T2: 新集群 B 将 T1 时刻的快照数据导入,此时新集群 B 中会由快照创立出表,此时老集群 A 集群上设置的 Replication 的关系会主动开始将 T0 时刻保留的 WAL 日志回放至新集群 B 的表中,开始增量数据同步。

T3: 因为从 T0-T3 之间的操作会破费一段时间,此时会积攒很多 WAL 日志文件,须要肯定的工夫来同步至新集群,这里须要去监控一下数据同步状况,等老集群 WAL 被逐步生产完,此时能够将老集群的写业务进行一下并筹备将读写业务全副切到新集群 B。

T4: T3-T4 之间应该是个很短的工夫,整个迁徙也只有这个工夫点会有肯定中断,此时是将业务齐全切到新集群 B,至此迁徙实现。

4. 操作波及的命令

一、设置集群 A 和集群 B 的 peer 关系

在源集群 hbase shell 中, 设定 peer

add_peer ‘peer_name’,’ClusterB:2181:/hbase’

二、在集群 A 的表中设置 replication 属性

假如指标表名为 Student,先获取 Family=f

进入 hbase shell 中,

alter ‘Student’,{NAME => ‘f’,REPLICATION_SCOPE => ‘1’}

三、给集群 A 的表创立快照

在 hbase shell 中

snapshot ‘Student’,’Student_table_snapshot’

四、在 A 集群中导出快照

hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot Student_table_snapshot -copy-to /snapshot-backup/Student

五、将快照数据搁置到集群 B 的对应的目录下

下面命令会导出 2 个目录,一个是快照元数据,一个是原始数据

将元数据放到 /hbase/.hbase-snapshot 中,将原始数据放到 /hbase/archive 目录中

因为 hbase 的 archive 目录会有个定时清理,这里能够提前将集群 B 的 master 的 hbase.master.cleaner.interval 值设置大点,防止拷贝过程中产生碰巧产生了数据清理。

如果集群 B 中没有对应的目录,能够提前创立

**hdfs dfs -mkdir -p /hbase/.hbase-snapshot
hdfs dfs -mkdir -p /hbase/archive/data/default/**

挪动导出的 snapshot 文件到 snapshot 目录

**hdfs dfs -mv /snapshot-backup/Student/.hbase-snapshot/Student_table_snapshot /hbase/.hbase-snapshot/
hdfs dfs -mv /snapshot-backup/Student/archive/data/default/Student /hbase/archive/data/default**

六、在新集群 B 中复原表的快照

进入 hbase shell

restore_snapshot ‘Student_table_snapshot’

复原实现后,记得将集群 B 的 hmaster 中 hbase.master.cleaner.interval 的值调整回来。

HBase 增量数据迁徙的办法

1. 概览

本章次要是想谈一下如何给 HBase 做增量数据的迁徙,也就是迁徙实时数据。上文中提到 HBase 增量数据迁徙能够应用 Replication 的形式去做,然而在理论搬迁时,要给原集群设置 Replication 可能须要重启,这样会影响业务,咱们须要做到不停机迁徙才行。

2.WAL 原理

失常状况下,HBase 新增的数据都是有日志记录的,数据在落盘成 HFile 之前,任何一个 Put 和 Delete 操作都是记录日志并存放在 WALs 目录中,日志中蕴含了所有曾经写入 Memstore 但还未 Flush 到 HFile 的更改 (edits)。

默认状况下每个 RegionServer 只会写一个日志文件,该 RS 治理的所有 region 都在向这一个日志文件写入 Put 和 Delete 记录,直到日志文件大小达到 128MB(由 hbase.regionserver.hlog.blocksize 设置) 后 roll 出一个新的日志文件,总共能够 roll 出 32 个日志文件 (由 hbase.regionserver.maxlogs 设置)。

如果日志文件未写满 128MB,RegionServer 距离 1 小时也会 roll 出新一个新日志文件(由 hbase.regionserver.logroll.period 设置)。

当日志文件中波及的所有 region 的记录都 flush 成 HFile 后,这个日志文件就会转移至 oldWals 目录下归档,Master 没距离 10 分钟(hbase.master.cleaner.interval)会查看 oldWALs 目录下的过期日志文件,当文件过期时会被 Master 清理掉,(日志过期工夫由 hbase.master.logcleaner.ttl 管制)。

RegionServer 默认距离 1 小时(由 hbase.regionserver.optionalcacheflushinterval 设置)会对它治理的 region 做一次 flush 动作,所以 WALs 目录中始终会有新的日志文件生成,并随同着老的日志文件挪动到 oldWALs 目录中。

3. 迁徙形式

一、迁徙 oldWALs 目录中的文件,应用 WALPlayer 回放

因为日志文件文件最终挪动到 oldWALs 目录下,只须要写个脚本,定时查看 oldWALs 目录下是否有新文件生成,如果有文件,则 move 至其余目录,并应用 WALPlayer 工具对这个目录进行回放。

长处:无代码开发量,仅需脚本实现

毛病:无奈做到实时,因为从数据写入到最初达到 oldWAL 目录会距离很长时间。

二、开发独立工具,解析日志文件,写入目标集群

在网上查找迁徙办法的时候理解到了阿里开发了一个专门的 HBase 迁徙工具,能够实现不停机。通过浏览其设计 BDS – HBase 数据迁徙同步计划的设计与实际理解到阿里开发了利用去读取 HBase 的 WAL 日志文件并回放数据至目标集群。

长处:能够做到实时;

毛病:须要肯定的代码开发量;

要做出这样一个工具,须要理解下面说的 WAL 文件归档的原理以及日志回放工具 WALPlayer,上面简略说一下能够怎么去实现。

独立工具实现

这里简略阐明下如何去做这样一个工具,只介绍读取 WAL 方面,工作编排就不形容了:

1、定时扫描 WALs 目录获取所有的日志文件,这里按 ServerName 去分组获取,每个分组内依据 WAL 文件上的工夫戳排序:

● 获取所有 RS 的 ServerName

ClusterStatus clusterStatus = admin.getClusterStatus();
Collection<ServerName> serverNames = clusterStatus.getServers();

● 依据 ServerName 去组成 Path 获取日志

Path rsWalPath = new Path(walPath, serverName.getServerName());
List<FileStatus> hlogs = getFiles(fs, rsWalPath, Long.MIN_VALUE, Long.MAX_VALUE);

● getFiles() 参考 HBase 源码中 WALInputFormat.java 中的实现,能够指定工夫范畴去取日志文件

private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
    throws IOException {List<FileStatus> result = new ArrayList<FileStatus>();
  LOG.debug("Scanning" + dir.toString() + "for WAL files");

  FileStatus[] files = fs.listStatus(dir);
  if (files == null) return Collections.emptyList();
  for (FileStatus file : files) {if (file.isDirectory()) {
      // recurse into sub directories
      result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
    } else {String name = file.getPath().toString();
      int idx = name.lastIndexOf('.');
      if (idx > 0) {
        try {long fileStartTime = Long.parseLong(name.substring(idx+1));
          if (fileStartTime <= endTime) {LOG.info("Found:" + name);
            result.add(file);
          }
        } catch (NumberFormatException x) {idx = 0;}
      }
      if (idx == 0) {LOG.warn("File" + name + "does not appear to be an WAL file. Skipping...");
      }
    }
  }
  return result;
}

2、对于取到的每一个 WAL 文件,当做一个工作 Task 执行迁徙,这个 task 次要有上面一些步骤:

● 应用 WALFactory 为每个日志文件创建一个 Reader

WAL.Reader walReader = WALFactory.createReader(fileSystem, curWalPath, conf);

● 通过 Reader 去读取 key 和 edit,并记录下 position,为了放慢写入速度,这里能够优化为读取多个 entry

WAL.Entry entry = walReader.next();
WALKey walKey = entry.getKey();
WALEdit walEdit = entry.getEdit();
long curPos = reader.getPosition();

● 记录 position 的目标是为了 Reader 的 reset 以及 seek,因为这个原始 WAL 文件还正在写入的时候,咱们的 Reader 速度很可能大于原 WAL 的写入速度,当 Reader 读到底的时候,须要期待一段时间 reset 而后再从新读取 entry

WAL.Entry nextEntry = reader.next();
if (nextEntry == null) {LOG.info("Next entry is null, sleep 10000ms.");
    Thread.sleep(5000);
    curPos = reader.getPosition();
    reader.reset();
    reader.seek(curPos);
    continue;
}

● 在读取 WAL 的过程中很可能会遇到日志转移到 oldWALs 目录下,这个时候捕捉到 FileNotFoundException 时,须要从新生成一个 oldWALs 目录下 Reader,而后设置 curPos 持续读取文件,这个时候如果再次读取到文件最初的时候,就能够敞开 Reader 了,因为 oldWALs 中的日志文件是固定大小的,不会再有追加数据。

这里须要留神的是这个参数 hbase.master.logcleaner.ttl 不能设置过小,否则会呈现这个在 oldWALs 目录下的日志文件还没读取完被清理掉了。

Path oldWALPath = new Path(oldWalPath, walFileName);
WAL.Reader reader = WALFactory.createReader(fileSystem, oldWALPath, conf);
reader.seek(curPos)

● 依据通过 WAL.Reader 能够读取到 walKey,walEdit 进而解析出 Cell 并写入目标集群,这个能够参考 WALPlay 的 map() 办法。

正文完
 0