目录

HBase数据迁移实战

概述

本文简单介绍如何做 HBase 的数据迁移。

前期准备

确认集群使用的版本

源 HBase 集群(以下称旧集群)和目的 HBase 集群(以下称新集群)的版本可能并不是一致的,特别是其底层所使用的 HDFS 版本信息。譬如这样一个数据迁移场景: 业务希望从低版本的 HBase 集群(0.94.x)迁移到当前稳定的 HBase 集群(1.2.x),因为新版本的 HBase 有新特性,Bug 更少,稳定性和可运维行更优。理论上新版本会兼容老版本的 API,但是如果彼此版本差距过大,可能会出现 HDFS RPC 版本不一致(一般来说是 Protobuf 的版本),那么集群之间的迁移就会因为彼此与对方的 NameNode 无法进行通信而无法进行。 这种情况下,需要先考虑升级低版本的 HDFS。

确认集群是否开启Kerberos认证

这里会有三种可能情况:一是都未开启认证,二是都开启认证,三是一个开了,另一个没开。前两种情况按照正常的认证(或非认证)的配置操作即可,第三种情况则需要在开启了认证的集群上开启 ipc.client.fallback-to-simple-auth-allowed 参数,意即在使用 Kerberos 的方式访问非 Kerberos 的集群时,系统自动转换成简单认证,否则会出现认证问题。Kerberos 的配置和使用本文不做展开。

确认操作账号的读写权限问题

要在不同的 HBase 集群之间做数据迁移,必然要涉及到不同集群的读写权限问题。HBase 使用 ACL 来管理不同数据表的读写权限,在开启了 Kerberos 认证的环境下,还需要验证 Kerberos;而 HBase 使用的 HDFS 自身也有一套类似的权限管理规则,当两个集群配置不同时(如部署账号不一致),极易出现冲突。在迁移前需要确认两个集群的管理员账号(HDFS 和 HBase 账号)是否一致,如果不一致,需要开通权限。

开启YARN服务

数据迁移任务本质上是一个 MapRedcue 任务,故需要在一个集群上开启 YARN 服务。如何选择在哪个集群上开呢?建议是在新集群上开,因为旧集群上可能还需要继续跑线上业务,在上面起大量 Map 任务并把数据远程写入到新集群,会对线上业务带来较大的性能影响;而新集群较大可能是一个独立集群,尚没有业务运行,在其上运行 Map 任务通过网络从旧集群中拉数据到本地写入,性价比更高,且对线上业务的侵入性更低。

在 HBase 集群上配置 YARN 服务可以查阅其安装部署文档,这里不做展开。

确认数据迁移的SLA

数据迁移是否是在线迁移,即业务不能中断。若业务允许做离线迁移,可以先将该表 Disable 后再做迁移,然后在新集群上重新 clone 成新表即可;但若需要在线进行迁移,则需要提前新集群上生成对应的 HBase 表,开启 ACL 权限等操作,并让业务开启数据双写,确保两个集群的数据在迁移时刻之后的数据是一致的。因为数据迁移和后续的数据合并耗时都很长,如果不开启双写,是无法达到数据一致性要求的。所以大多数情况下,业务都是要求数据迁移以在线方式进行。

源集群开启Snapshot

HBase 的 Snapshot 是从0.94.6之后才引入的特性,开启 Snapshot 特性需要开启 hbase.snapshot.enabled (默认已开启)。如果没有开启该特性,则需要重启服务以开启该特性;如果版本过低,就只能使用其他对业务影响较大的 CopyTable/ExportTable(需要 Disable 表)才能操作。

生成HBase表和Region

完成前期的验证和准备工作后,就可以在新集群中创建待迁移的目的表和域(以下称 Region)了。 因为迁移过程中业务需要开启双写,所以目的表结构必须和源表是一致;同时源数据表可能已经存在多个 Region 了,那么目的表也必须提前规划好这些 Region,以免双写期间出现 Region 数量不足出现热点或者 Region 内文件数过多频繁 Compact 导致线上业务出现性能问题。 这里详细说明下如何正确创建一个带有多个 Region 的表。

使用RegionSplitter生成表

如果新建一个自带多个 Region 的表,可以使用以下命令:

例1. 生成一个表 t1, 有30个 region,且表有一个列族"d",则使用

1
bin/hbase org.apache.hadoop.hbase.util.RegionSplitter t1 UniformSplit -c 30 -f d

例2. 生成一个表 t2, 有10个 region,有两个列族 d1,d2, 其起始 rowkey 是'0'

1
bin/hbase org.apache.hadoop.hbase.util.RegionSplitter t2 UniformSplit -c 10 -f d1:d2 --firstrow '0'

使用HBase Shell生成表

使用 HBase shell 中的 create 命令也可以直接生成多个 Regions,前提是必须要指定 split keys

例3. 生成一个表 t3,按照'10’,‘20’,‘30’,‘40’为Regions的split keys

1
create 't3', 'f1', SPLITS => ['10', '20', '30', '40']

整个表划分成5个Region,其起始和结束key分别是 [‘0’,‘10’],[‘10’,‘20’],[‘20’,‘30’],[‘30’,‘40’],[‘40’,-]

将已有的表重新切分或合并

如果一个表的 Region 范围过大,可以使用 split 来将其切分成两个子 Region

1
split 't1', '1' split '110e80fecae753e848eaaa08843a3e87', '\x001'

同理,如果表的 Region 过于零散,可以使用 merge_region 来进行合并

1
hbase> merge_region 'ENCODED_REGIONNAME', 'ENCODED_REGIONNAME'  hbase> merge_region 'ENCODED_REGIONNAME', 'ENCODED_REGIONNAME', true

具体的操作命令用户可以自己查看 HBase 的相关文档来了解。

注:在为迁移的目的表划分多个 Region 时,其 StartEndKey 最好和旧 HBase 集群中源表分布一致。这样后续文件加载时,不需要额外进行过多的拆分,可以节省载入时间。另外有一点需要说明,使用 splitKey 时,系统不支持 Hex 字符串。 如果想用 HexString 来作为自己 Region 的 StartKey,则需要对 HBase 的客户端代码进行简单的修改。简单来说需要能支持 Bytes.toBinaryString() 方法, 但系统读取后都是直接用 Byets.toBytes() 方法。 具体的代码实现,可以私聊。

Snapshot机制与使用

HBase 快照是一份指向多个 HFile 文件的元数据文件。在执行 snapshot 命令时,不会触发任何的 HBase 数据操作,所以这个命令非常高效。使用快照来恢复或克隆一个表也非常快,因为它只需要引用已有的 HFile 文件即可。所以使用 Snapshot 进行数据迁移的优势就是备份和拷贝数据对线上服务没有影响,或者影响极低。其流程如下:

  1. 执行 snapshot 命令时,Master 会从自己管理的 meta 信息中,找到该表所在 RegionServer,然后下发该命令到相应的一个或多个 RegionServer(RS)
  2. RS 负责生成 HFile 文件引用,同时会获取其 Region 的 HFile 文件信息,将当前文件的大小写入到 manifest 文件中。
  3. HFile 文件是使用 Append 方式来添加的,所以某一个时刻的文件大小相当于记录了一份当前时刻的文件偏移量。 恢复时,系统也只会读到该偏移量的位置。 如果想再次对该表做快照,那么文件引用的偏移量会正确设置为当前 HFile 的大小。
  4. Snapshot 命令有一个 skipFlush 参数,设置为 true 时,会强制将 RS 的 MemStore 里内容刷到磁盘中,可能会造成 RS 短暂的中止服务。时间长短视内存中的数据量而定。 在这里我们不需要靠强制刷新出内存中的数据来保证数据完整性,理由如下:

如果我们使用的是停服迁移的方式,那么使用快照时内存中是没有数据写入的。 如果使用的双写迁移的方案,则快照时存在于内存中的那部分数据实际上会被双写到另外的集群中,同样不会有数据丢失的问题。

创建快照

1
hbase> snapshot 'sourceTable', 'snapshotName'  hbase> snapshot 'namespace:sourceTable', 'snapshotName', {SKIP_FLUSH => true}

查看快照

1
2
hbase> list_snapshots
hbase> list_snapshots 'abc.*'

克隆快照

1
hbase> clone_snapshot 'snapshotName', 'tableName'  hbase> clone_snapshot 'snapshotName', 'namespace:tableName'

生成 snapshot 后,可以通过 hadoop 的 Shell 命令来查看到对应的 snapshot 目录

1
2
3
4
# bin/hadoop fs -ls /hbase/.hbase-snapshot/newSnapshot
 Found 2 items
 -rw-r--r--   3 xxx xxx         35 2017-04-24 21:58 /hbase/.hbase-snapshot/newSnapshot/.snapshotinfo
 -rw-r--r--   3 xxx xxx        486 2017-04-24 21:58 /hbase/.hbase-snapshot/newSnapshot/data.manifest

使用ExportSnapshot工具迁移快照数据

ExportSnapshot 是 HBase 提供的 Snapshot 迁移工具,其使用方法见下图:

可以看出,这个工具的参数列表和 HDFS 的 DistCp 工具很类似。其简要流程如下:

首先通过 HDFS 的 cp 方法,将 /.hbase-snapshot/newSnapshot 目录拷贝至新集群上 然后将 /hbase/data/<table> 下面的数据文件通过 MapReduce 的方式(DistCp)拷贝至 新集群的 /hbase/archive/data/<tablename>

最后检查 snapshot 相关文件的完整性

数据合并方式

一旦数据迁移到了新集群,我们可以通过 clone_snapshot 命令重新生成该表,如果业务是一个可以支持离线迁移的,那迁移工作也就算完成了。更多的情况是,业务开了双写,即老集群和新集群同时在更新数据,我们需要把迁移后的数据进行合并。这里有三种方法。

使用Phoenix SQL导入

需要在新集群上开启 Phoenix 支持(如何安装 Phoenix 见相关文档)

假设双写的新表为A’,A’表必须使用 Phoenix 的接口来创建。其使用方法与常规的 SQL 语法类似,但是要注意其 splitKey 的用法:

例1. 创建一个表 t1,只有一个列族 f1,有1个修饰字段 body, splitKey 为 [‘a’,‘b’,‘c’]

1
2
3
CREATE TABLE IF NOT EXISTS t1
    ( "id" char(10) not null primary key, "f1".body varchar)
    DATA_BLOCK_ENCODING='NONE',VERSIONS=5,MAX_FILESIZE=2000000 split on ('a', 'b', 'c')

使用 clone_snapshot 命令将迁移的数据重新生成一个 HBase 表B,然后再使用 Phoenix 的 DDL 重新生成B表(和实际的表B不会冲突,Phoenix 的元数据存在另外的目录下),最后使用 UPSERT SELECT 命令将B表中的数据插入到双写的A’表即可

1
UPSERT INTO A'("id","f1".body) SELECT "id","f1".body FROM B;

注: 使用Phoenix的问题在于,原来的业务模式需要做较大改动以适应新的JDBC访问HBase方式。

使用MapReduce导入

使用 MapReduce 导入需要有 YARN 服务支持,同样需要先使用 clone_snapshot 命令将迁移的数据重新生成一个 HBase 表。

用户需要使用 HBase 的 API 从表中读取记录,然后插入到新的表,这个方法实际上就是上面 Phoenix 的底层实现。如果集群没有安装 Phoenix 插件,可以用这个方法。但这个方法的缺点也是显而易见,需要自己写代码来实现在一个 Map 中实现上述操作,且如何切分 RowKey 到各个 Map 任务中也是一个不小的难题。

使用IncrementLoadHFile工具

顾名思义,这个工具可以实现往 HBase 表中添加 HFile 来实现数据的批量写入,其使用方法如下:

1
2
3
4
bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
usage: completebulkload /path/to/hfileoutputformat-output tablename
 -Dcreate.table=no - can be used to avoid creation of table by this tool
  Note: if you set this to 'no', then the target table must already exist in HBase

这个工具的使用说明很简单,只需要提供一个 HFile 文件所在的 Hdfs 路径名和所需要写入的 HBase Table 名即可。

例1. 将 /tmp/hbase/archive/data/test/test/f8510124151cabf704bc02c9c7e687f6 目录下的 HFile 文件加载到 test:test 表中

1
bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -Dcreate.table=no /tmp/hbase/archive/data/test/test/f8510124151cabf704bc02c9c7e687f6 test:test

其实现原理说明如下:

  1. 首先确认目录下的 HFile 文件是否合法,得到一个文件列表。
  2. 从列表中获取某一个 HFile,获取文件的起始和结束 rowKey
  3. 查找要导入到新表的各个 Region,得到其 StartEndKeys(每次都是重新重启)
  4. 根据 HFile 的 startKey,判断该 HFile 应该插入的 Region 位置
  5. 以该 HFile 的起始 rowkey 所要插入的 Region 的 EndKey 将文件切成 top 和 bottom 两部分,将这2个文件加入到待加载的文件列表中
  6. 使用 SecureBulkLoadHFile 方法一次性批量加载这些文件,如果其中有文件加载失败,则方法失败,返回异常的文件列表。
  7. 将返回的异常文件,加入到循环加载的文件列表里。

继续重复2-7整个流程,直至完成加载或达到重试阈值而异常退出。

SecureLoadHFile 的原理也很简单,它是一个原子操作,所以操作过程中会有短暂的卡顿。

检查待加载的 HFiles 是否来自多个列族,需要对多个列族同时加锁以保障一致性。 检查该次操作是否满足相关权限要求,同时也会将对应的 HFile 文件进行权限变更操作 完成 HFile 文件加载,新的 HFile 引用被加入到 Region 的 StoreFile 列表中。 对这块逻辑感兴趣的同学,可以自行查阅 SecureBulkLoadEndpoint,HRegion,HStore 等类。

关于如何减少批量加载的时间,有以下几点需要注意:

如果新集群上的 regions 的起始和结束 rowkey 分布正好和旧集群一致,那么使用批量加载 HFile 的方式可以最快的方式来合并到线上表中。否则就需要针对新的 Region 来拆分 HFile。

调整 hbase.hregion.max.filesize 参数,该参数用于控制一个 Region 下的最大 HFile 的文件大小,超过该值后,系统会强制拆分这个文件。新旧集群上这个参数配置可能会不一致,为了尽快完成加载,可以考虑将其设置成一致,或者新集群上的配置更大,这样也能减少加载时间 该工具默认的重试次数是10次,即一个 Hfile 如果拆分次数超过10次,就会放弃本次批量加载。 需要注意留意日志

数据验证

HFile 文件增量加载更新完毕之后,进行数据验证流程。因为数据量实在太大,不可能对两边的 HBase 表中的记录做一一比对,故可进行抽样验证。按照 Snapshot 和双写的机制来讲,数据可能有重复,但不可能存在丢失的情况。验证算法描述如下:

按照迁移过程中不同阶段,划分成不同的时间区域。 对于每一个时间区间,选一个子区间作为样本 选定旧集群中的 A 表,获取其 Region 信息,得到每个 Region 的起始 rowkey

按照每个 Region 的起始 rowkey, 顺序查找 N 条该区间中的 rowkey 记录

根据上一步拿到的 rowkey 到相应的测试表中(A’)中查找是否能找到匹配的记录。

找到记录后,对比相应 Column+Cell 信息,能够都完全匹配即为一个匹配的记录(因为双写的原因,时间戳会有先后,故不判断时间戳。rowkey 已能满足要求)

使用 Java 连接 Kerberized HBase 的需要的配置如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://test1.163.org:8020</value>
    </property>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://test1.163.org:8020/hbase</value>
    </property>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>test1.163.org,test2.163.org,test3.163.org</value>
    </property>
    <property>
        <name>zookeeper.znode.parent</name>
        <value>/hbase</value>
    </property>
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <property>
        <name>hadoop.security.authorization</name>
        <value>true</value>
    </property>
    <property>
        <name>hadoop.security.authentication</name>
        <value>kerberos</value>
    </property>
    <property>
        <name>hbase.rpc.timeout</name>
        <value>180000</value>
    </property>
    <property>
        <name>hbase.client.operation.timeout</name>
        <value>120000</value>
    </property>
    <property>
        <name>hbase.security.authentication</name>
        <value>kerberos</value>
    </property>
    <property>
        <name>hbase.security.authorization</name>
        <value>true</value>
    </property>
    <property>
        <name>dfs.namenode.principal</name>
        <value>hdfs/_HOST@HADOOP.HZ.NETEASE.COM</value>
    </property>
    <property>
        <name>hbase.master.kerberos.principal</name>
        <value>hbase/_HOST@HADOOP.HZ.NETASE.COM</value>
    </property>
    <property>
        <name>hbase.regionserver.kerberos.principal</name>
        <value>hbase/_HOST@HADOOP.HZ.NETASE.COM</value>
    </property>
    <property>
        <name>hbase.client.scanner.caching</name>
        <value>100000</value>
    </property>
</configuration>

认证模块的代码片段如下(仅作参考)

1
2
3
4
5
6
7
8
Configuration configuration = HBaseConfiguration.create();
configuration.addResource("hbase-site.xml");
UserGroupInformation.setConfiguration(configuration);
UserGroupInformation.loginUserFromKeytab("principal", "keytab.path");

TableName tableName = TableName.valueOf("hbase.table.name"));
Connection connection = ConnectionFactory.createConnection(configuration);
HTable table = (HTable) connection.getTable(tableName);

事后操作

因为批量加载操作会对原 HFile 文件进行多次拷贝,拆分等操作,会消耗大量的 HDFS 存储资源和物理机磁盘空间。 在数据合并完成并验证后,可以清理掉这些临时结果。此外如果在加载过程中出现了较多的自动 Region 切分,也可在此时重新将小 Region 进行合并。 最后在新集群上专门为数据迁移开启的 YARN 服务也可以停掉了,减少对 HBase 服务的影响。

警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。