作者经验了屡次基于HBase实现全量与增量数据的迁徙测试,总结了在应用HBase进行数据迁徙的多种实际,本文针对全量与增量数据迁徙的场景不同,提供了1+2的技巧分享。

HBase全量与增量数据迁徙的办法

1.背景

在HBase应用过程中,应用的HBase集群常常会因为某些起因须要数据迁徙。大多数状况下,能够用离线的形式进行迁徙,迁徙离线数据的形式就比拟容易了,将整个hbase的data存储目录进行搬迁就行,然而当集群数据量比拟多的时候,文件拷贝的工夫很长,对业务影响工夫也比拟长,往往在设计的工夫窗口无奈实现,本文给出一种迁徙思路,能够利用HBase本身的性能,对集群进行迁徙,缩小集群业务中断工夫

2.简介

大家都晓得HBase有snapshot快照的性能,利用快照能够记录某个工夫点表的数据将其保留快照,在须要的时候能够将表数据恢复到打快照工夫时的样子。咱们利用hbase的snapshot能够导出某个工夫点的全量数据。

因为理论的业务还在不停的写入表中,除了迁徙快照工夫点之前的全量数据,咱们还须要将快照工夫点后源源不断的增量数据也迁徙走,这里如果能采纳双写的形式,将数据写入两个集群就好了,然而事实的业务不会这样做,如果这样做还得保障双写的事务一致性。于是能够利用HBase的replication性能,replication性能自身就是保留了源集群的WAL日志记录,去回放写入到目标集群,这样一来用户业务端->原始集群->目标集群便是个串形的数据流,且由HBase来保证数据的正确性。

所以这个迁徙的办法就是利用snapshot迁徙全量数据,利用replication迁徙增量数据。

3.迁徙步骤

上图给出了迁徙的整个工夫线流程,次要有这么5个工夫点。

T0: 配置好老集群A集群到新集群B的Replication关系,Replication的数据由A集群同步到集群B,将表设置成同步,从此刻开始新写入A集群表的数据会保留在WAL日志中;

T1: 生成该工夫点的全量数据,通过创立快照,以及导出快照数据的形式将该工夫点的数据导出到新集群B;

T2: 新集群B将T1时刻的快照数据导入,此时新集群B中会由快照创立出表,此时老集群A集群上设置的Replication的关系会主动开始将T0时刻保留的WAL日志回放至新集群B的表中,开始增量数据同步。

T3: 因为从T0-T3之间的操作会破费一段时间,此时会积攒很多WAL日志文件,须要肯定的工夫来同步至新集群,这里须要去监控一下数据同步状况,等老集群WAL被逐步生产完,此时能够将老集群的写业务进行一下并筹备将读写业务全副切到新集群B。

T4: T3-T4之间应该是个很短的工夫,整个迁徙也只有这个工夫点会有肯定中断,此时是将业务齐全切到新集群B,至此迁徙实现。

4.操作波及的命令

一、设置集群A和集群B的peer关系

在源集群hbase shell中, 设定peer

add_peer 'peer_name','ClusterB:2181:/hbase'

二、在集群A的表中设置replication属性

假如指标表名为Student,先获取Family=f

进入hbase shell中,

alter 'Student',{NAME => 'f',REPLICATION_SCOPE => '1'}

三、给集群A的表创立快照

在hbase shell中

snapshot 'Student','Student_table_snapshot'

四、在A集群中导出快照

hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot Student_table_snapshot -copy-to /snapshot-backup/Student

五、将快照数据搁置到集群B的对应的目录下

下面命令会导出2个目录,一个是快照元数据,一个是原始数据

将元数据放到/hbase/.hbase-snapshot中,将原始数据放到/hbase/archive目录中

因为hbase的archive目录会有个定时清理,这里能够提前将集群B的master的hbase.master.cleaner.interval值设置大点,防止拷贝过程中产生碰巧产生了数据清理。

如果集群B中没有对应的目录,能够提前创立

**hdfs dfs -mkdir -p /hbase/.hbase-snapshot
hdfs dfs -mkdir -p /hbase/archive/data/default/**

挪动导出的snapshot文件到snapshot目录

**hdfs dfs -mv /snapshot-backup/Student/.hbase-snapshot/Student_table_snapshot /hbase/.hbase-snapshot/
hdfs dfs -mv /snapshot-backup/Student/archive/data/default/Student /hbase/archive/data/default**

六、在新集群B中复原表的快照

进入hbase shell

restore_snapshot 'Student_table_snapshot'

复原实现后,记得将集群B的hmaster中hbase.master.cleaner.interval的值调整回来。

HBase增量数据迁徙的办法

1.概览

本章次要是想谈一下如何给HBase做增量数据的迁徙,也就是迁徙实时数据。上文中提到HBase增量数据迁徙能够应用Replication的形式去做,然而在理论搬迁时,要给原集群设置Replication可能须要重启,这样会影响业务,咱们须要做到不停机迁徙才行。

2.WAL原理

失常状况下,HBase新增的数据都是有日志记录的,数据在落盘成HFile之前,任何一个Put和Delete操作都是记录日志并存放在WALs目录中,日志中蕴含了所有曾经写入Memstore但还未Flush到HFile的更改(edits)。

默认状况下每个RegionServer只会写一个日志文件,该RS治理的所有region都在向这一个日志文件写入Put和Delete记录,直到日志文件大小达到128MB(由hbase.regionserver.hlog.blocksize设置)后roll出一个新的日志文件,总共能够roll出32个日志文件(由hbase.regionserver.maxlogs设置)。

如果日志文件未写满128MB,RegionServer距离1小时也会roll出新一个新日志文件(由hbase.regionserver.logroll.period设置)。

当日志文件中波及的所有region的记录都flush成HFile后,这个日志文件就会转移至oldWals目录下归档, Master没距离10分钟(hbase.master.cleaner.interval)会查看oldWALs目录下的过期日志文件,当文件过期时会被Master清理掉,(日志过期工夫由hbase.master.logcleaner.ttl管制)。

RegionServer默认距离1小时(由hbase.regionserver.optionalcacheflushinterval设置)会对它治理的region做一次flush动作,所以WALs目录中始终会有新的日志文件生成,并随同着老的日志文件挪动到oldWALs目录中。

3.迁徙形式

一、迁徙oldWALs目录中的文件,应用WALPlayer回放

因为日志文件文件最终挪动到oldWALs目录下,只须要写个脚本,定时查看oldWALs目录下是否有新文件生成,如果有文件,则move至其余目录,并应用WALPlayer工具对这个目录进行回放。

长处:无代码开发量,仅需脚本实现

毛病:无奈做到实时,因为从数据写入到最初达到oldWAL目录会距离很长时间。

二、开发独立工具,解析日志文件,写入目标集群

在网上查找迁徙办法的时候理解到了阿里开发了一个专门的HBase迁徙工具,能够实现不停机。通过浏览其设计BDS - HBase数据迁徙同步计划的设计与实际理解到阿里开发了利用去读取HBase的WAL日志文件并回放数据至目标集群。

长处:能够做到实时;

毛病:须要肯定的代码开发量;

要做出这样一个工具,须要理解下面说的WAL文件归档的原理以及日志回放工具WALPlayer,上面简略说一下能够怎么去实现。

独立工具实现

这里简略阐明下如何去做这样一个工具,只介绍读取WAL方面,工作编排就不形容了:

1、定时扫描WALs目录获取所有的日志文件,这里按ServerName去分组获取,每个分组内依据WAL文件上的工夫戳排序:

● 获取所有RS的ServerName

ClusterStatus clusterStatus = admin.getClusterStatus();Collection<ServerName> serverNames = clusterStatus.getServers();

● 依据ServerName去组成Path获取日志

Path rsWalPath = new Path(walPath, serverName.getServerName());List<FileStatus> hlogs = getFiles(fs, rsWalPath, Long.MIN_VALUE, Long.MAX_VALUE);

● getFiles()参考HBase源码中WALInputFormat.java中的实现,能够指定工夫范畴去取日志文件

private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)    throws IOException {  List<FileStatus> result = new ArrayList<FileStatus>();  LOG.debug("Scanning " + dir.toString() + " for WAL files");  FileStatus[] files = fs.listStatus(dir);  if (files == null) return Collections.emptyList();  for (FileStatus file : files) {    if (file.isDirectory()) {      // recurse into sub directories      result.addAll(getFiles(fs, file.getPath(), startTime, endTime));    } else {      String name = file.getPath().toString();      int idx = name.lastIndexOf('.');      if (idx > 0) {        try {          long fileStartTime = Long.parseLong(name.substring(idx+1));          if (fileStartTime <= endTime) {            LOG.info("Found: " + name);            result.add(file);          }        } catch (NumberFormatException x) {          idx = 0;        }      }      if (idx == 0) {        LOG.warn("File " + name + " does not appear to be an WAL file. Skipping...");      }    }  }  return result;}

2、对于取到的每一个WAL文件,当做一个工作Task执行迁徙,这个task次要有上面一些步骤:

● 应用WALFactory为每个日志文件创建一个Reader

WAL.Reader walReader = WALFactory.createReader(fileSystem, curWalPath, conf);

● 通过Reader去读取key和edit,并记录下position,为了放慢写入速度,这里能够优化为读取多个entry

WAL.Entry entry = walReader.next();WALKey walKey = entry.getKey();WALEdit walEdit = entry.getEdit();long curPos = reader.getPosition();

● 记录position的目标是为了Reader的reset以及seek,因为这个原始WAL文件还正在写入的时候,咱们的Reader速度很可能大于原WAL的写入速度,当Reader读到底的时候,须要期待一段时间reset而后再从新读取entry

WAL.Entry nextEntry = reader.next();if (nextEntry == null) {    LOG.info("Next entry is null, sleep 10000ms.");    Thread.sleep(5000);    curPos = reader.getPosition();    reader.reset();    reader.seek(curPos);    continue;}

● 在读取WAL的过程中很可能会遇到日志转移到oldWALs目录下,这个时候捕捉到FileNotFoundException时,须要从新生成一个oldWALs目录下Reader,而后设置curPos持续读取文件,这个时候如果再次读取到文件最初的时候,就能够敞开Reader了,因为oldWALs中的日志文件是固定大小的,不会再有追加数据。

这里须要留神的是这个参数hbase.master.logcleaner.ttl不能设置过小,否则会呈现这个在oldWALs目录下的日志文件还没读取完被清理掉了。

Path oldWALPath = new Path(oldWalPath, walFileName);WAL.Reader reader = WALFactory.createReader(fileSystem, oldWALPath, conf);reader.seek(curPos)

● 依据通过WAL.Reader能够读取到walKey,walEdit进而解析出Cell并写入目标集群,这个能够参考WALPlay的map()办法。