共计 4642 个字符,预计需要花费 12 分钟才能阅读完成。
作者:京东科技 贾世闻
对象存储是云的根底组件之一,各大云厂商都有相干产品。这里跟大家介绍一下 rust 与对象存储交到的根本套路和其中的一些技巧。
根本连贯
咱们以 [S3 sdk](
https://github.com/awslabs/aws-sdk-rust) 为例来说说根本的连贯与操作,作者验证过 aws、京东云、阿里云。次要的增删改查性能没有什么差异。
- 建设客户端
let shared_config = SdkConfig::builder()
.credentials_provider(SharedCredentialsProvider::new(Credentials::new(
"LTAI5t7NPuPKsXm6UeSa1",
"DGHuK03ESXQYqQ83buKMHs9NAwz",
None,
None,
"Static",
)))
.endpoint_url("http://oss-cn-beijing.aliyuncs.com")
.region(Region::new("oss-cn-beijing"))
.build();
let s3_config_builder = aws_sdk_s3::config::Builder::from(&shared_config);
let client = aws_sdk_s3::Client::from_conf(s3_config_builder.build());
建设 Client 所须要的参数次要有你须要拜访的 oss 的 AK、SK,endpoint url 以及服务所在的区域。以上信息都能够在服务商的帮忙文档查问到。
- 对象列表
let mut obj_list = client
.list_objects_v2()
.bucket(bucket)
.max_keys(max_keys)
.prefix(prefix_str)
.continuation_token(token_str);
let list = obj_list.send().await.unwrap();
println!("{:?}",list.contents());
println!("{:?}",list.next_continuation_token());
应用 list\_objects\_v2 函数返回对象列表,相比 list\_objects 函数,list\_objects\_v2 能够通过 continuation\_token 和 max_keys 管制返回列表的长度。list.contents() 返回对象列表数组,
list.next\_continuation\_token() 返回持续查问的 token。
- 上传文件
let content = ByteStream::from("content in file".as_bytes());
let exp = aws_smithy_types::DateTime::from_secs(100);
let upload = client
.put_object()
.bucket("bucket")
.key("/test/key")
.expires(exp)
.body(content);
upload.send().await.unwrap();
指定 bucket 及对象门路,body 承受 ByteStream 类型作为文件内容,最初设置过期工夫 expires,无过期工夫时不指定该配置即可。
- 下载文件
let key = "/tmp/test/key".to_string();
let resp = client
.get_object()
.bucket("bucket")
.key(&key)
.send()
.await.unwrap();
let data = resp.body.collect().await.unwrap();
let bytes = data.into_bytes();
let path = std::path::Path::new("/tmp/key")
if let Some(p) = path.parent() {std::fs::create_dir_all(p).unwrap();}
let mut file = OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(path).unwrap();
let _ = file.write(&*bytes);
file.flush().unwrap();
通过 get_object() 函数获取 GetObjectOutput。返回值的 body 就是文件内容,将 body 转换为 bytes,最初关上文件写入即可。
- 删除文件
let mut keys = vec![];
let key1 = ObjectIdentifier::builder()
.set_key(Some("/tmp/key1".to_string()))
.build();
let key2 = ObjectIdentifier::builder()
.set_key(Some("/tmp/key2".to_string()))
.build()
keys.push(key1);
keys.push(key2)
client
.delete_objects()
.bucket(bucket)
.delete(Delete::builder().set_objects(Some(keys)).build())
.send()
.await
.unwrap();
delete_objects 批量删除对象。首先构建 keys vector,定义要删除的对象,而后通过 Delete::builder(),构建 Delete model。
大文件上传
let mut file = fs::File::open("/tmp/file_name").unwrap();
let chunk_size = 1024*1024;
let mut part_number = 0;
let mut upload_parts: Vec = Vec::new();
// 获取上传 id
let multipart_upload_res: CreateMultipartUploadOutput = self
.client
.create_multipart_upload()
.bucket("bucket")
.key("/tmp/key")
.send()
.await.unwrap();
let upload_id = match multipart_upload_res.upload_id() {Some(id) => id,
None => {return Err(anyhow!("upload id is None"));
}
};
// 分段上传文件并记录 completer_part
loop {let mut buf = vec![0; chuck_size];
let read_count = file.read(&mut buf)?;
part_number += 1;
if read_count == 0 {break;}
let body = &buf[..read_count];
let stream = ByteStream::from(body.to_vec());
let upload_part_res = self
.client
.upload_part()
.key(key)
.bucket(bucket)
.upload_id(upload_id)
.body(stream)
.part_number(part_number)
.send()
.await.unwrap();
let completer_part = CompletedPart::builder()
.e_tag(upload_part_res.e_tag.unwrap_or_default())
.part_number(part_number)
.build();
upload_parts.push(completer_part);
if read_count != chuck_size {break;}
}
// 实现上传文件合并
let completed_multipart_upload: CompletedMultipartUpload =
CompletedMultipartUpload::builder()
.set_parts(Some(upload_parts))
.build();
let _complete_multipart_upload_res = self
.client
.complete_multipart_upload()
.bucket("bucket")
.key(key)
.multipart_upload(completed_multipart_upload)
.upload_id(upload_id)
.send()
.await.unwrap();
有时候面对大文件,比方几百兆甚至几个 G 的文件,为了节约带宽和内存,我才采取分段上传的计划,而后在对象存储的服务端做合并。根本流程是:指定 bucket 和 key,获取一个上传 id;按流读取文件,分段上传字节流,并记录 CompletedPart; 告诉服务器依照 CompletedPart 汇合来合并文件。具体过程代码已加正文,这里不再累述。
大文件下载
let mut file = match OpenOptions::new()
.truncate(true)
.create(true)
.write(true)
.open("/tmp/target_file");
let key = "/tmp/test/key".to_string();
let resp = client
.get_object()
.bucket("bucket")
.key(&key)
.send()
.await.unwrap();
let content_len = resp.content_length();
let mut byte_stream_async_reader = resp.body.into_async_read();
let mut content_len_usize: usize = content_len.try_into().unwrap();
loop {
if content_len_usize > chunk_size {let mut buffer = vec![0; chunk_size];
let _ = byte_stream_async_reader.read_exact(&mut buffer).await.unwrap();
file.write_all(&buffer).unwrap();
content_len_usize -= chunk_size;
continue;
} else {let mut buffer = vec![0; content_len_usize];
let _ = byte_stream_async_reader.read_exact(&mut buffer).await.unwrap();
file.write_all(&buffer).unwrap();
break;
}
}
file.flush().unwrap();
在从对象存储服务端下载文件的过程中也会遇到大文件问题。为了节约带宽和内存,咱们采取读取字节流的形式分段写入文件。首先 get\_object() 函数获取 ByteStream,通过 async\_reader 流式读取对象字节,分段写入文件。
对象存储的相干话题明天先聊到这儿,下期见。