离线批量数据通道Tunnel的最佳实践及常见问题

19次阅读

共计 6004 个字符,预计需要花费 16 分钟才能阅读完成。

基本介绍及应用场景 Tunnel 是 MaxCompute 提供的离线批量数据通道服务,主要提供大批量离线数据上传和下载,仅提供每次批量大于等于 64MB 数据的场景,小批量流式数据场景请使用 DataHub 实时数据通道以获得更好的性能和体验。
SDK 上传最佳实践 import java.io.IOException;import java.util.Date;
import com.aliyun.odps.Column;import com.aliyun.odps.Odps;import com.aliyun.odps.PartitionSpec;import com.aliyun.odps.TableSchema;import com.aliyun.odps.account.Account;import com.aliyun.odps.account.AliyunAccount;import com.aliyun.odps.data.Record;import com.aliyun.odps.data.RecordWriter;import com.aliyun.odps.tunnel.TableTunnel;import com.aliyun.odps.tunnel.TunnelException;import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
public class UploadSample {private static String accessId = “<your access id>”; private static String accessKey = “<your access Key>”; private static String odpsUrl = “http://service.odps.aliyun.com/api”;
private static String project = “<your project>”; private static String table = “<your table name>”; private static String partition = “<your partition spec>”;
public static void main(String args[]) {// 准备工作,仅需做一次 Account account = new AliyunAccount(accessId, accessKey); Odps odps = new Odps(account); odps.setEndpoint(odpsUrl); odps.setDefaultProject(project); TableTunnel tunnel = new TableTunnel(odps);
try {
// 确定写入分区
PartitionSpec partitionSpec = new PartitionSpec(partition);
// 在服务端创建一个在本表本分区上有效期 24 小时的 session,24 小时内该 session 可以共计上传 20000 个 Block 数据
// 创建 Session 的时耗为秒级,会在服务端使用部分资源、创建临时目录等,操作较重,因此强烈建议同一个分区数据尽可能复用 Session 上传。
UploadSession uploadSession = tunnel.createUploadSession(project,
table, partitionSpec);
System.out.println(“Session Status is : ”
+ uploadSession.getStatus().toString());
TableSchema schema = uploadSession.getSchema();
// 准备数据后打开 Writer 开始写入数据,准备数据后写入一个 Block,每个 Block 仅能成功上传一次,不可重复上传,CloseWriter 成功代表该 Block 上传完成,失败可以重新上传该 Block,同一个 Session 下最多允许 20000 个 BlockId,即 0 -19999,若超出请 CommitSession 并且再创建一个新 Session 使用,以此类推。
// 单个 Block 内写入数据过少会产生大量小文件 严重影响计算性能,强烈建议每次写入 64MB 以上数据 (100GB 以内数据均可写入同一 Block)
// 可通过数据的平均大小与记录数量大致计算总量即 64MB < 平均记录大小 * 记录数 < 100GB

// maxBlockID 服务端限制为 20000,用户可以根据自己业务需求,每个 Session 使用一定数量的 block 例如 100 个,但是建议每个 Session 内使用 block 越多越好,因为创建 Session 是一个很重的操作
// 如果创建一个 Session 后仅仅上传少量数据,不仅会造成小文件、空目录等问题,还会严重影响上传整体性能(创建 Session 花费秒级,真正上传可能仅仅用了十几毫秒)
int maxBlockID = 20000;
for (int blockId = 0; blockId < maxBlockID; blockId++) {
// 准备好至少 64MB 以上数据,准备完成后方可写入
// 例如:读取若干文件或者从数据库中读取数据
try {
// 在该 Block 上创建一个 Writer,writer 创建后任意一段时间内,若某连续 2 分钟没有写入 4KB 以上数据,则会超时断开连接
// 因此建议在创建 writer 前在内存中准备可以直接进行写入的数据
RecordWriter recordWriter = uploadSession.openRecordWriter(blockId);

// 将读取到的所有数据转换为 Tunnel Record 格式并切入
int recordNumber = 1000000;
for (int index = 0; i < recordNumber; i++) {
// 将第 index 条原始数据转化为 odps record
Record record = uploadSession.newRecord();
for (int i = 0; i < schema.getColumns().size(); i++) {
Column column = schema.getColumn(i);
switch (column.getType()) {
case BIGINT:
record.setBigint(i, 1L);
break;
case BOOLEAN:
record.setBoolean(i, true);
break;
case DATETIME:
record.setDatetime(i, new Date());
break;
case DOUBLE:
record.setDouble(i, 0.0);
break;
case STRING:
record.setString(i, “sample”);
break;
default:
throw new RuntimeException(“Unknown column type: ”
+ column.getType());
}
}
// Write 本条数据至服务端,每写入 4KB 数据会进行一次网络传输
// 若 120s 没有网络传输服务端将会关闭连接,届时该 Writer 将不可用,需要重新写入
recordWriter.write(record);
}
// close 成功即代表该 block 上传成功,但是在整个 Session Commit 前,这些数据是在 odps 临时目录中不可见的
recordWriter.close();
} catch (TunnelException e) {
// 建议重试一定次数
e.printStackTrace();
System.out.println(“write failed:” + e.getMessage());
} catch (IOException e) {
// 建议重试一定次数
e.printStackTrace();
System.out.println(“write failed:” + e.getMessage());
}
}
// 提交所有 Block,uploadSession.getBlockList() 可以自行指定需要提交的 Block,Commit 成功后数据才会正式写入 Odps 分区,Commit 失败建议重试 10 次
for (int retry = 0; retry < 10; ++retry) {
try {
// 秒级操作,正式提交数据
uploadSession.commit(uploadSession.getBlockList());
break;
} catch (TunnelException e) {
System.out.println(“uploadSession commit failed:” + e.getMessage());
} catch (IOException e) {
System.out.println(“uploadSession commit failed:” + e.getMessage());
}
}
System.out.println(“upload success!”);

} catch (TunnelException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} }} 构造器举例说明:
PartitionSpec(String spec):通过字符串构造此类对象。
参数:
spec: 分区定义字符串,比如: pt=’1′,ds=’2’。因此程序中应该这样配置:private static String partition = “pt=’XXX’,ds=’XXX'”;
常见问题 MaxCompute Tunnel 是什么?Tunnel 是 MaxCompute 的数据通道,用户可以通过 Tunnel 向 MaxCompute 中上传或者下载数据。目前 Tunnel 仅支持表(不包括视图 View)数据的上传下载。
BlockId 是否可以重复?同一个 UploadSession 里的 blockId 不能重复。也就是说,对于同一个 UploadSession,用一个 blockId 打开 RecordWriter,写入一批数据后,调用 close,然后再 commit 完成后,写入成功后不可以重新再用该 blockId 打开另一个 RecordWriter 写入数据。Block 默认最多 20000 个,即 0 -19999。
Block 大小是否存在限制?一个 block 大小上限 100GB,强烈建议大于 64M 的数据,每一个 Block 对应一个文件,小于 64MB 的文件统称为小文件,小文件过多将会影响使用性能。使用新版 BufferedWriter 可以更简单的进行上传功能避免小文件等问题 Tunnel-SDK-BufferedWriter
Session 是否可以共享使用,存在生命周期吗?每个 Session 在服务端的生命周期为 24 小时,创建后 24 小时内均可使用,也可以跨进程 / 线程共享使用,但是必须保证同一个 BlockId 没有重复使用,分布式上传可以按照如下步骤:创建 Session-> 数据量估算 -> 分配 Block(例如线程 1 使用 0 -100,线程 2 使用 100-200)-> 准备数据 -> 上传数据 ->Commit 所有写入成功的 Block。
Session 创建后不使用是否对系统有消耗?每个 Session 在创建时会生成两个文件目录,如果大量创建而不使用,会导致临时目录增多,大量堆积时可能造成系统负担,请一定避免此类行为,尽量共享利用 session。
遇到 Write/Read 超时或 IOException 怎么处理?上传数据时,Writer 每写入 8KB 数据会触发一次网络动作,如果 120 秒内没有网络动作,服务端将主动关闭连接,届时 Writer 将不可用,请重新打开一个新的 Writer 写入。
建议使用 [Tunnel-SDK-BufferedWriter] 接口上传数据,该接口对用户屏蔽了 blockId 的细节,并且内部带有数据缓存区,会自动进行失败重试。
下载数据时,Reader 也有类似机制,若长时间没有网络 IO 会被断开连接,建议 Read 过程连续进行中间不穿插其他系统的接口。
MaxCompute Tunnel 目前有哪些语言的 SDK?MaxCompute Tunnel 目前提供 Java 版的 SDK。
MaxCompute Tunnel 是否支持多个客户端同时上传同一张表?支持。
MaxCompute Tunnel 适合批量上传还是流式上传 MaxCompute Tunnel 用于批量上传,不适合流式上传,流式上传可以使用 [DataHub 高速流式数据通道],毫秒级延时写入。
MaxCompute Tunnel 上传数据时一定要先存在分区吗?是的,Tunnel 不会自动创建分区。
Dship 与 MaxCompute Tunnel 的关系?dship 是一个工具,通过 MaxCompute Tunnel 来上传下载。
Tunnel upload 数据的行为是追加还是覆盖?追加的模式。
Tunnel 路由功能是怎么回事?路由功能指的是 Tunnel SDK 通过设置 MaxCompute 获取 Tunnel Endpoint 的功能。因此,SDK 可以只设置 MaxCompute 的 endpoint 来正常工作。
用 MaxCompute Tunnel 上传数据时,一个 block 的数据量大小多大比较合适没有一个绝对最优的答案,要综合考虑网络情况,实时性要求,数据如何使用以及集群小文件等因素。一般,如果数量较大是持续上传的模式,可以在 64M – 256M,如果是每天传一次的批量模式,可以设大一些到 1G 左右
使用 MaxCompute Tunnel 下载, 总是提示 timeout 一般是 endpoint 错误,请检查 Endpoint 配置,简单的判断方法是通过 telnet 等方法检测网络连通性。
通过 MaxCompute Tunnel 下载,抛出 You have NO privilege‘odps:Select‘on {acs:odps:*:projects/XXX/tables/XXX}. project‘XXX‘is protected 的异常该 project 开启了数据保护功能,用户操作这是从一个项目的数据导向另一个项目,这需要该 project 的 owner 操作。
Tunnel 上传抛出异常 ErrorCode=FlowExceeded, ErrorMessage=Your flow quota is exceeded.**Tunnel 对请求的并发进行了控制,默认上传和下载的并发 Quota 为 2000,任何相关的请求发出到结束过程中均会占用一个 Quota 单位。若出现类似错误,有如下几种建议的解决方案:1 sleep 一下再重试;2 将 project 的 tunnel 并发 quota 调大,需要联系管理员评估流量压力;3 报告 project owner 调查谁占用了大量并发 quota,控制一下。

正文完
 0