sstableloader 是 cassandra 提供的 bulkload 工具,可以将 sstable 文件导入到集群中。本文详细介绍其用法和实现原理。
用法
sstableloader 工具在 cassandra 的 bin 目录下面,用法如下:
bin/sstableloader <options> <dir_path>
具体的选项可以参考官方文档的介绍,常见的选项有:
-d, –nodes 目标集群的 nodes
-u, –username 用户名
-pw, –password 密码
-t, –throttle 限速,单位 Mbits/s (默认不限制)
-cph, –connections-per-host 和每个节点建立多少连接
<dir_path>
参数指定要导入的 sstable 文件所在的目录。需要注意的是 sstableloader 会把目录名作为表名,上一级目录名作为 keyspace 名称。例如 sstableloader /whatever/path/test/t ...
这个命令会把数据导入到 test.t 这个表里面。
sstableloader 常见的使用场景包括:
- bulkload 批量写入数据
- 跨集群数据迁移
- 从备份的 snapshot 文件恢复数据
bulkload 批量写入
cassandra 中提供了 SSTableWriter 这个类来实现对 sstable 的写入,使用这个类用户可以不需要关心 sstable 的具体文件格式。需要注意的是使用这个类需要依赖 cassandra-all 而不是 cassandra 的 java driver。如下代码示意了如何使用 SSTableWriter 在本地生成 sstable 文件:
final String KS = "cql_keyspace7";
final String TABLE = "table7";
final String schema = "CREATE TABLE" + KS + "." + TABLE + "("
+ "k int,"
+ "c1 int,"
+ "c2 int,"
+ "v blob,"
+ "PRIMARY KEY (k, c1, c2)"
+ ")";
File tempdir = Files.createTempDir();
File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
assert dataDir.mkdirs();
CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory(dataDir)
.forTable(schema)
.using("INSERT INTO" + KS + "." + TABLE + "(k, c1, c2, v) VALUES (?, ?, ?, textAsBlob(?))")
.build();
writer.addRow(1, 2, 3, "abc");
writer.addRow(4, 5, 6, "efg");
writer.close();
生成文件之后,可以使用 sstableloader 将生成的文件导入到 cassandra 中。使用这种方式写入数据,减少了对服务器的请求量,而且写入本地文件会比向服务器写入数据要快,很适合大批量数据的离线导入。
集群间数据迁移
sstableloader 也可以用来做集群间的数据迁移。具体步骤如下:
1 在目标集群创建要同步的表的 schema。
2 停止源集群写入(针对停机迁移),或是开启增量数据的迁移(针对不停机迁移)。
3 在源集群的每个节点执行 flush:bin/nodetool flush
。
4 在源集群节点上执行 sstableloader 将数据文件导入到目标集群中。
原理
sstableloader 会首先通过 java 客户端与服务器建立连接,并读取 meta 信息。之后在 storage_port 通过 streaming 协议将 sstable 文件发送到各个节点上。在这个过程中,sstableloader 并不是简单的把数据文件拷贝到每个节点,而是根据 meta 中的相关信息,给每个节点发送他所管理的那一段数据。
下面简单介绍一下 cassandra 中的 streaming 协议协议。
streaming 协议
在 Cassandra 中,streaming 协议用来在两个节点之间同步 sstable 中的一段数据的过程,通常用于数据修复或移动的过程。除了 sstableloader 以外,如下场景中也可能会有 streaming 的过程:
- repair
- bootstrap 过程
- gossip 收到和本节点有关的 REMOVED_TOKEN 状态变化
- nodetool 里面会触发数据移动或修复的命令,例如 repair,rebuild,removenode,move
Streaming 过程中两个节点的网络交互如下图所示:
这个过程大致可以分为如下四个阶段:
1 建立连接
2 streaming 准备阶段
3 streaming 阶段
4 完成
1 建立连接
这个阶段主要是建立连接并把连接和 StreamSession 关联起来。
stream 的发起节点创建一个 StreamSession 对象,并建立两个到远端节点的连接,一个用于后续的发送消息,一个用于接收消息。之后会通过这两个连接向远端发送 StreamInit 消息,通知远端节点开启一次 streaming,并标明每个连接的用途。
远端收到 StreamInit 消息后,也会创建自己的 StreamSession 对象,并将收到 StreamInit 消息的两个连接和 StreamSession 关联起来。
连接建立完成后,进入准备阶段。
2 准备阶段
这个阶段主要用于协商节点之间需要传输的文件片段。
发起节点首先发送一个 PrepareMessage,其中包含当前节点会向远端节点发送哪些文件或片段,以及需要对方提供哪些表的哪些 range 的数据。
远端节点收到请求后,会根据请求的 range 查找对应的 sstable,然后向发起节点返回一个 PrepareMessage,其中包含要发送哪些 sstable 的哪些片段,之后远端节点进入 streaming 阶段。
发起节点收到 PrepareMessage 后,记录要接收的 sstable 片段,然后进入 streaming 阶段。
3 streaming 阶段
这个阶段就开始进行文件传输了。发送端和接收端会分别建立相应的任务。
发送端会针对要进行 streaming 的文件,按顺序发送 FileMessage。FileMessage 由消息头 FileMessageHeader 和文件内容的流组成。当所有文件发送完成后,StreamTransferTask 标记为完成。
接收端将收到的文件内容写入 sstable。当一个 StreamReceiveTask 中的所有文件都接收完成后,将 sstable 加入到 ColumnFamilyStore 中。
如果接收过程中发生错误,接收端会发送一个 SessionFailedMessage 给发送端,并关闭 StreamSession。
当所有发送和接收任务都完成后,进入完成阶段。
4 完成阶段
当一个节点完成所有的发送和接收任务后,如果该节点已经收到了 CompleteMessage,则会向对方发送 CompleteMessage 并关闭 session;如果还没有收到 CompleteMessage,则会向对方发送 CompleteMessage 并等待对方返回。
本文作者:_陆豪
阅读原文
本文为云栖社区原创内容,未经允许不得转载。