关于数据湖:数据湖存储的安全写入之道

3次阅读

共计 12403 个字符,预计需要花费 32 分钟才能阅读完成。

背景

数据湖的衰亡,给数据存储带来了一轮新的反动。越来越多的公司抉择将存储切换到云上对象存储。因为云上对象存储往往意味着大容量、低成本、易扩容。

说到对象存储,必然波及到 S3 协定,S3 协定曾经事实上成为对象存储的通用协定。不过,市面上不少数据平台公司,也会抉择基于 S3 协定又兼顾 Hadoop 应用习惯的 S3A Connector,比方 Databricks 在对象存储上提供的表数据结构 Delta Lake。

咱们就以 Hadoop 社区中的 S3A Connector 的实现为切入,来剖析一下数据湖写入门路的安全性。

Hadoop S3 的写入反对

因为 S3 协定自身不反对增量写入,因而 S3A 实现时默认的写入形式是先通过缓存到本地,最初在文件 close 后再上传到对象存储。然而,这种默认的形式并不一定高效,对大文件来说,在 close 被调用前,本地曾经缓存大量的数据,这样会造成 close 操作十分耗时,文件写入整体看也不高效。

从 Hadoop 2.8.5 版本开始,能够通过设置 fs.s3a.fast.upload 为 true,关上疾速上传来优化写入门路。关上后,能够边写本地缓存块,边将满足大小的块异步上传(默认 100M 一个块)。这样也满足了对象存储中分阶段上传接口的一些限度,比方单个块不能小于 5M,分块总数不能大于 10000。

通过浏览 Hadoop 2.8.5 相干源码,咱们能够发现关上 fs.s3a.fast.upload 后,S3AFileSystem 在创立文件时会关上 S3ABlockOutputStream(Hadoop 3.x 也有相似的 S3AFastOutputStream)随后,S3ABlockOutputStream 在解决 write、flush 等操作时,则会调用一个形象的 S3ADataBlock 来执行。

而 S3ADataBlock 则可由三种工厂办法来创立,别离创立基于堆内存的 ArrayBlock、基于磁盘的 DiskBlock,或者基于堆外内存的 ByteBufferBlock。抉择哪种工厂,由 fs.s3a.fast.upload.buffer 这个配置项管制,默认为磁盘(disk)。其余两种可选配置为堆内存(array)和 堆外内存(bytebuffer)。

磁盘的问题

通过理解 Hadoop 社区中 S3A 的实现,咱们发现借助磁盘缓存数据是常见甚至默认的行为。因为这样能够缩小内存占用,缓存更多的数据。然而,这样也带来了磁盘自身的阿喀琉斯之踵 — 磁盘的稳定性问题。

在数据存储畛域,磁盘的问题往往十分令人头疼。比方磁盘写满,磁盘坏道问题,还有偶现的磁盘数据比特反转导致的数据安全性问题。

哪怕单块磁盘的可靠性十分高,但因为磁盘呈现问题的概率会随着磁盘数的晋升而变大,这会使数据安全性蒙上一层暗影。

对于 R 个正本的状况,设磁盘的年故障率为 P,磁盘数为 N,则整个机群有 C (N, R) = N! / (R! * ( N- R)! ) 种 R 正本的组合形式。机群数据总量为 M,分片大小为 T,那么有 R 个磁盘同时损坏造成数据失落的概率是:

援用于《磁盘故障与存储系统的年失效率估算》

因而,要保障写门路的数据安全型,咱们不能齐全依赖底层存储介质的保障。仍须要咱们在数据写入时就做一些致力。咱们先来做一些试验来看看 S3AFileSystem 在这些问题上的体现。

模仿磁盘 IO 问题

1. 批改 core-sites.xml 中的 fs.s3a.buffer.dir 指向 /dev/vdc 所在的门路,比方我机器上的 /data2/

<property>
      <name>fs.s3a.fast.upload</name>
      <value>true</value>
    </property>
    <property>
      <!-- 本地 buffer 缓存目录,不存在会创立  -->
      <name>fs.s3a.buffer.dir</name>
      <value>/data2/tmp/</value>
    </property>

2. 创立并运行 stap 脚本,对所有在 /dev/vdc 上写操作的返回 IO Error

#!/usr/bin/stap

probe vfs.write.return {if (devname == "vdc")  {$return = -5}
}
$ stap-gio_errno.stp

3. 执行写入程序 demo,验证 stap 脚本无效

$ dd if=/dev/zero of=test-1G-stap bs=1G count=1
$ hadoop fs -put test-1G s3a://<your-bucket>/

返回后果:

put: 输出 / 输入谬误 

能够发现相干操作能正确抛出 IO 谬误。

模仿磁盘比特反转

1. 魔改 libfuse passthrough 中的 write 办法,并将 /data2/ 通过 fuse 挂载到 /mnt/passthrough

$ mkdir -p /mnt/passthrough/
$ ./passthrough /mnt/passthrough/ -omodules=subdir -osubdir=/data2/ -oauto_unmount

2. 批改 core-sites.xml 中的 hadoop.tmp.dir 指向 /mnt/passthrough

<property>
      <name>fs.s3a.fast.upload</name>
      <value>true</value>
    </property>
    <property>
      <!-- 本地 buffer 缓存目录,不存在会创立  -->
      <name>fs.s3a.buffer.dir</name>
      <value>/mnt/passthrough/</value>
    </property>

3. 执行写入程序 demo,验证上传内容的正确性。

$ mkdir -p input output
$ dd if=/dev/zero of=input/test-1G-fuse bs=1G count=1
$ hadoop fs -put input/test-1G-fuse s3a://<your-bucket>/
$ hadoop fs -get s3a://<your-bucket>/test-1G-fuse output/
$ md5sum input/test-1G-fuse output/test-1G-fuse

返回后果:

cd573cfaace07e7949bc0c46028904ff  input/test-1G-fuse
37eb6e664e706ea48281acbd4676569e  output/test-1G-fuse

能够发现,输出和输入的数据并不统一。

综上,通过 Hadoop S3AFileSystem 写入能够发现磁盘 IO 问题并正确抛出异样,但无奈发现磁盘比特反转问题。

网络的问题

既然磁盘写入有问题,那咱们应用内存写入是否就肯定能够防止踩坑呢?答案是不能,还可能有网络问题。

Amazon S3 在 2008 年就曾因为网络问题导致的比特位反转引发过重大事故。起初,大家剖析这种问题多产生于两端距离多个路由器的状况,路由器可能因为硬件 / 内存故障导致单 / 多比特位反转或双字节替换,这种反转如果产生在 payload 区,则无奈通过链路层、网络层、传输层的 checksum 查看进去。

因而 Amazon S3 在这次事变中汲取的教训是,要通过在应用层给所有货色都增加 checksum 来保证数据正确性。

让咱们来做一个试验,来看看 S3 是怎么做到 Checksum all of the things 的,又是否能防止网络比特反转或者网络丢包呢?

模仿网络比特反转

1. 装置 mitmproxy

$ pip3 install mitmproxy
$ mitmproxy --version
Mitmproxy: 5.3.0
Python:    3.6.8
OpenSSL:   OpenSSL 1.1.1h  22 Sep 2020
Platform:  Linux-3.10.0-1160.71.1.el7.x86_64-x86_64-with-centos-7.9.2009-Core

2. 利用 mitmdump 反向代理 s3a endpoint,并篡改其中的写申请。

编写 addons.py

from mitmproxy import ctx, http
import json
import time
import os

class HookOssRequest:

    def request(self, flow: http.HTTPFlow):
        print("")
        print("="*50)
        print("FOR:" + flow.request.url)
        print(flow.request.method + "" + flow.request.path +" " + flow.request.http_version)

        print("-"*50 + "request headers:")
        for k, v in flow.request.headers.items():
            print("%-20s: %s" % (k.upper(), v))

        if flow.request.host == "<your-bucket>.oss-cn-shanghai-internal.aliyuncs.com" and flow.request.method == "PUT":
            clen = len(flow.request.content)
            rbit = ord('a')
            clist = list(flow.request.content)
            origin = clist[clen - 1]
            clist[clen - 1] = rbit
            updated = clist[clen - 1]
            flow.request.content = bytes(clist)
            ctx.log.info("updated requesting content pos(" + str(clen - 1) + ") from" + str(chr(origin)) + "to" + str(chr(updated)))

    def response(self, flow: http.HTTPFlow):
        pass


addons = [HookOssRequest()
]

反向代理:http://.oss-cn-shanghai-internal.aliyuncs.com 到 http://localhost:8765

$ mitmdump -s addons.py -p 8765 --set block_global=false --mode 
reverse:http://<your-bucket>.oss-cn-shanghai-internal.aliyuncs.com

3. 批改 core-sites.xml 中的 fs.s3a.endpoint 指向 localhost:8765,并敞开 ssl。

<property>
      <name>fs.s3a.connection.ssl.enabled</name>
      <value>false</value>
    </property>
    <property>
      <name>fs.s3a.fast.upload</name>
      <value>true</value>
    </property>

4. 执行写入程序 demo,验证上传内容的正确性

$ mkdir -p input output
$ dd if=/dev/zero of=input/test-100M-proxy bs=$((100*1024*1024 + 1)) count=1
$ hadoop fs -put input/test-100M-proxy s3a://<your-bucket>/

返回后果:

xx/xx/xx xx:xx:xx WARN s3a.S3ABlockOutputStream: Transfer failure of block FileBlock{index=2, destFile=/data/hadoop/hadoop-2.8.5/tmp/s3a/s3ablock-0002-6832685202941984333.tmp, state=Upload, dataSize=1, limit=104857600}
xx/xx/xx xx:xx:xx WARN s3a.S3ABlockOutputStream: Transfer failure of block FileBlock{index=1, destFile=/data/hadoop/hadoop-2.8.5/tmp/s3a/s3ablock-0001-635596269039598032.tmp, state=Closed, dataSize=104857600, limit=104857600}
put: Multi-part upload with id '14ABE04E57114D0D9D8DBCFE4CB9366E' to test-100M-proxy._COPYING_ on test-100M-proxy._COPYING_: com.amazonaws.AmazonClientException: Unable to verify integrity of data upload.  Client calculated content hash (contentMD5: 93B885ADFE0DA089CDF634904FD59F71 in hex) didn't match hash (etag: 0CC175B9C0F1B6A831C399E269772661 in hex) calculated by Amazon S3.  You may need to delete the data stored in Amazon S3. (bucketName: <your-bucket>, key: test-100M-proxy._COPYING_, uploadId: 14ABE04E57114D0D9D8DBCFE4CB9366E, partNumber: 2, partSize: 1): Unable to verify integrity of data upload.  Client calculated content hash (contentMD5: 93B885ADFE0DA089CDF634904FD59F71 in hex) didn't match hash (etag: 0CC175B9C0F1B6A831C399E269772661 in hex) calculated by Amazon S3.  You may need to delete the data stored in Amazon S3. (bucketName: <your-bucket>, key: test-100M-proxy._COPYING_, uploadId: 14ABE04E57114D0D9D8DBCFE4CB9366E, partNumber: 2, partSize: 1)

可见,Amazon S3 在 header 签名中强制对每个 upload part 的 payload 做了 Content-MD5 的校验,可能无效检测出网络比特反转。

模仿网络丢包

之前的测试验证了,S3 应用 Content-MD5 的校验能够保障单个申请的正确性,但在写一些大文件,或是波及 JobCommitter 的作业中,往往会应用 multipart upload 来进行并发上传。而网络丢包也是一种常见的问题。于是,接下来咱们来验证下,如果上传过程中其中一个 part 失落,是否会给上传后果造成影响。

  1. 同样应用 mitmproxy 来模仿丢包
  2. 利用 mitmdump 反向代理 s3a endpoint,并抛弃其中 part2 的申请。

编写 addons.py

from mitmproxy import ctx, http
import json
import time
import os

class HookOssRequest:
    def request(self, flow: http.HTTPFlow):
        print("")
        print("="*50)
        print("FOR:" + flow.request.url)
        print(flow.request.method + "" + flow.request.path +" " + flow.request.http_version)

        print("-"*50 + "request headers:")
        for k, v in flow.request.headers.items():
            print("%-20s: %s" % (k.upper(), v))

        if flow.request.host == "<your-bucket>.oss-cn-shanghai-internal.aliyuncs.com" and flow.request.method == "PUT":
            if "partNumber=2" in flow.request.path:
                flow.response = http.HTTPResponse.make(200,  # (optional) status code
                    b"Hello World",  # (optional) content
                    {"Content-Type": "text/html"},  # (optional) headers
                )
                ctx.log.info("drop part-2 request!")
            ctx.log.info("requesting length:" + str(len(flow.request.content)))

    def response(self, flow: http.HTTPFlow):
        pass

addons = [HookOssRequest()
]

反向代理:http://.oss-cn-shanghai-internal.aliyuncs.com 到 http://localhost:8765

$ mitmdump-saddons.py-p8765--setblock_global=false--moder
everse:http://<your-bucket>.oss-cn-shanghai-internal.aliyuncs.com

3. 同样批改 core-sites.xml 中的 fs.s3a.endpoint 指向 localhost:8765,并敞开 ssl。

<property>
      <name>fs.s3a.connection.ssl.enabled</name>
      <value>false</value>
    </property>
    <property>
      <name>fs.s3a.fast.upload</name>
      <value>true</value>
    </property>

4. 执行写入程序 demo,验证上传内容的正确性

$ mkdir -p input output
$ dd if=/dev/zero of=input/test-100M-proxy bs=$((100*1024*1024 + 1)) count=1
$ hadoop fs -put input/test-100M-proxy s3a://<your-bucket>/

xx/xx/xx xx:xx:x WARN s3a.S3ABlockOutputStream: Transfer failure of block FileBlock{index=2, destFile=/data/hadoop/hadoop-2.8.5/tmp/s3a/s3ablock-0002-2063629354855241099.tmp, state=Upload, dataSize=1, limit=104857600}
put: Multi-part upload with id 'D58303E74A5F4E6D8A27DD112297D0BE' to test-100M-proxy._COPYING_ on test-100M-proxy._COPYING_: com.amazonaws.AmazonClientException: Unable to verify integrity of data upload.  Client calculated content hash (contentMD5: 93B885ADFE0DA089CDF634904FD59F71 in hex) didn't match hash (etag: null in hex) calculated by Amazon S3.  You may need to delete the data stored in Amazon S3. (bucketName: <your-bucket>, key: test-100M-proxy._COPYING_, uploadId: D58303E74A5F4E6D8A27DD112297D0BE, partNumber: 2, partSize: 1): Unable to verify integrity of data upload.  Client calculated content hash (contentMD5: 93B885ADFE0DA089CDF634904FD59F71 in hex) didn't match hash (etag: null in hex) calculated by Amazon S3.  You may need to delete the data stored in Amazon S3. (bucketName: <your-bucket>, key: test-100M-proxy._COPYING_, uploadId: D58303E74A5F4E6D8A27DD112297D0BE, partNumber: 2, partSize: 1)

可见,Amazon S3 在 close 申请中通过 CompleteMultipartUpload 对每个上传的 Part 做了查看,可能发现失落的申请。

校验算法的抉择

上文曾经证实了校验码的不可或缺性,而且能够看到 Amazon S3 默认采纳了 MD5 作为校验码。那就是最优的抉择了吗?让咱们来看看还有没有别的抉择。

数据摘要算法

MD5、SHA-1、SHA-256、SHA-512 都是数据摘要算法,均被宽泛作为明码的散列函数。

但因为 MD5、SHA- 1 曾经被证实为不平安的算法,目前倡议应用较新的 SHA-256 和 SHA-512。

所有算法的输出均能够是不定长的数据。MD5 输入是 16 字节(128 位),SHA- 1 输入为 20 字节(160 位),SHA-256 为 32 字节(256 位),SHA-512 为 64 字节(512 位)。

能够看到,SHA 算法的输入长度更长,因而更难产生碰撞,数据也更为平安。但运算速度与 MD5 相比,也更慢。

循环冗余校验

循环冗余校验又称 CRC(Cyclic redundancy check),将待发送的比特串看做是系数为 0 或者 1 的多项式。

M = 1001010
M(x) = 1x^6 + 0x^5 + 0x^4 + 1x^3 + 0x^2 + 1x^1 + 0*x^0
M(x) = x^6 + x^3 + x
CRC 编码时,发送方和接管方必须事后约定一个生成多项式 G(x)。

发送方将比特串和生成多项式 G(x) 进行运算失去校验码,在比特串尾附加校验码,使得带校验码的比特串的多项式能被 G(x) 整除。接管方接管到后,除以 G(x),若有余数,则传输有错。

校验算法的开销

CRC 算法的长处是算法实现绝对简略、运算速度较快。而且谬误检错能力很强,因而被广泛应用于通信数据校验。

咱们做了一些简略的 benchmark 以供参考:

CRC32 > CRC64 > MD5 > SHA-1 > SHA-512 > SHA-256

而 OSS 反对的校验算法有 MD5 和 CRC64,那么同样的场景下,咱们会优先选择 CRC64 代替 MD5。

阿里云 EMR JindoSDK 的最佳实际

在总结了 S3AFileSystem 做法中的优缺点,并联合 OSS 本身提供的一些性能舍短取长后,阿里云 EMR JindoSDK 得出了本人的最佳实际。

JindoSDK 实现的 JindoOutputStream 反对了两种校验形式,一种是申请级别的校验,一种是文件块级别的校验。

申请级别的校验,默认敞开。须要关上时,配置 fs.oss.checksum.md5.enable 为 true 即可。配置好之后,客户端会在块级别的申请(PutObject/MultipartUpload)Header 中增加 Payload 的 Content-MD5。如果服务端计算 Payload 的 md5 与 客户端提供的不符,则客户端会重试。

文件块级别的校验,默认关上。须要敞开时,须要配置 fs.oss.checksum.crc64.enable 为 false。则是在写入流一开始就在内存中同步计算传入 Buffer 的 CRC64,并在文件块落盘时和服务端计算返回的 CRC64 进行比拟。

应用最新的 jindosdk-4.6.2 版本与 S3AFileSystem 在数据湖写入门路上,综合比照的后果如下:

能够看到 EMR JindoSDK 在写 OSS 时,不仅有着相比 S3AFileSystem 更欠缺的谬误查看,性能也更为优异。

总结与瞻望

数据湖存储的平安写入,必须要能思考到内存、磁盘、网络的不可靠性。同时,也要联合存储介质自身的个性,抉择适合的校验算法。相熟数据写入残缺链路,全面地思考各种可能遇到的问题,并提供欠缺的测试计划验证可行性,才算善始善终。

阿里云 EMR JindoSDK 通过以上形式造成了本人的最佳实际,不仅保障了对象存储写入链路的安全性,同样也反对了 EMR JindoFS 服务(OSS-HDFS)的写入链路。尽管 OSS-HDFS 中的一个文件能够对应 OSS 上的多个对象,然而在写入 OSS 时,底层复用了同一套实现。因而,在应用时也不须要做额定的适配,齐全能够共用雷同的配置项。

将来咱们还将联合 OSS-HDFS,提供在数据随机读场景的安全性校验,而这是对象存储自身目前无奈做到的。

附录一:测试 S3A 的配置形式

core-sites.xml

<property>
      <name>fs.s3a.impl</name>
      <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>
    <property>
      <name>fs.AbstractFileSystem.s3a.impl</name>
      <value>org.apache.hadoop.fs.s3a.S3A</value>
    </property>
    <property>
      <name>fs.s3a.access.key</name>
      <value>xxx</value>
    </property>
    <property>
      <name>fs.s3a.secret.key</name>
      <value>xx</value>
    </property>
    <property>
      <name>fs.s3a.endpoint</name>
      <value>localhost:8765</value>
    </property>
    <property>
      <name>fs.s3a.connection.ssl.enabled</name>
      <value>false</value>
    </property>
    <property>
      <name>fs.s3a.fast.upload</name>
      <value>true</value>
    </property>
    <property>
      <!-- 本地 buffer 缓存目录,不存在会创立  -->
      <name>fs.s3a.buffer.dir</name>
      <value>/mnt/passthrough/</value>
    </property>

附录二:测试 EMR JindoSDK 的配置形式

core-sites.xml

<property>
        <name>fs.AbstractFileSystem.oss.impl</name>
        <value>com.aliyun.jindodata.oss.OSS</value>
    </property>
    <property>
        <name>fs.oss.impl</name>
        <value>com.aliyun.jindodata.oss.JindoOssFileSystem</value>
    </property>
    <property>
        <name>fs.oss.accessKeyId</name>
        <value>xxx</value>
    </property>
    <property>
        <name>fs.oss.accessKeySecret</name>
        <value>xxx</value>
    </property>
    <property>
        <name>fs.oss.endpoint</name>
        <!-- 阿里云 ECS 环境下举荐应用内网 OSS Endpoint,即 oss-cn-xxx-internal.aliyuncs.com -->
        <value>oss-cn-xxx.aliyuncs.com</value>
    </property>
    <property>
        <!-- 客户端写入时的长期文件目录,可配置多个(逗号隔开),会轮流写入,多用户环境需配置可读写权限 -->
        <name>fs.oss.tmp.data.dirs</name>
        <value>/data2/tmp/</value>
    </property>
    <property>
        <!-- 是否应用二级域名写入
             关上后 <your-bucket>.oss-cn-xxx-internal.aliyuncs.com/<your-dir>
             会变为 oss-cn-xxx-internal.aliyuncs.com/<your-bucket>/<your-dir> -->
        <name>fs.oss.second.level.domain.enable</name>
        <value>true</value>
    </property>

log4j.properties

log4j.logger.com.aliyun.jindodata=INFO
log4j.logger.com.aliyun.jindodata.common.FsStats=INFO

mitmproxy

获取 endpoint ip

ping oss-cn-shanghai-internal.aliyuncs.com
64 bytes from xxx.xxx.xxx.xx (xxx.xxx.xxx.xx): icmp_seq=1 ttl=102 time=0.937 ms

将 addons.py 中应用 ip 代替 .http://oss-cn-shanghai-internal.aliyuncs.com

if flow.request.host == "xxx.xxx.xxx.xx" and flow.request.method == "PUT":

反向代理时也应用 ip 代替 .http://oss-cn-shanghai-internal.aliyuncs.com

mitmdump-saddons.py-p8765--setblock_global=false--mode
reverse:http://xxx.xxx.xxx.xx:80

作者:焱冰 @阿里云

原文链接

本文为阿里云原创内容,未经容许不得转载。

正文完
 0