聊聊flink的BlobWriter

38次阅读

共计 14806 个字符,预计需要花费 38 分钟才能阅读完成。


本文主要研究一下 flink 的 BlobWriter
BlobWriter
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
/**
* BlobWriter is used to upload data to the BLOB store.
*/
public interface BlobWriter {

Logger LOG = LoggerFactory.getLogger(BlobWriter.class);

/**
* Uploads the data of the given byte array for the given job to the BLOB server and makes it
* a permanent BLOB.
*
* @param jobId
* the ID of the job the BLOB belongs to
* @param value
* the buffer to upload
*
* @return the computed BLOB key identifying the BLOB on the server
*
* @throws IOException
* thrown if an I/O error occurs while writing it to a local file, or uploading it to the HA
* store
*/
PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException;

/**
* Uploads the data from the given input stream for the given job to the BLOB server and makes it
* a permanent BLOB.
*
* @param jobId
* ID of the job this blob belongs to
* @param inputStream
* the input stream to read the data from
*
* @return the computed BLOB key identifying the BLOB on the server
*
* @throws IOException
* thrown if an I/O error occurs while reading the data from the input stream, writing it to a
* local file, or uploading it to the HA store
*/
PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException;

/**
* Returns the min size before data will be offloaded to the BLOB store.
*
* @return minimum offloading size
*/
int getMinOffloadingSize();

/**
* Serializes the given value and offloads it to the BlobServer if its size exceeds the minimum
* offloading size of the BlobServer.
*
* @param value to serialize
* @param jobId to which the value belongs.
* @param blobWriter to use to offload the serialized value
* @param <T> type of the value to serialize
* @return Either the serialized value or the stored blob key
* @throws IOException if the data cannot be serialized
*/
static <T> Either<SerializedValue<T>, PermanentBlobKey> serializeAndTryOffload(
T value,
JobID jobId,
BlobWriter blobWriter) throws IOException {
Preconditions.checkNotNull(value);
Preconditions.checkNotNull(jobId);
Preconditions.checkNotNull(blobWriter);

final SerializedValue<T> serializedValue = new SerializedValue<>(value);

if (serializedValue.getByteArray().length < blobWriter.getMinOffloadingSize()) {
return Either.Left(new SerializedValue<>(value));
} else {
try {
final PermanentBlobKey permanentBlobKey = blobWriter.putPermanent(jobId, serializedValue.getByteArray());

return Either.Right(permanentBlobKey);
} catch (IOException e) {
LOG.warn(“Failed to offload value {} for job {} to BLOB store.”, value, jobId, e);

return Either.Left(serializedValue);
}
}
}
}
BlobWriter 定义了 putPermanent、getMinOffloadingSize 方法,同时还提供了 serializeAndTryOffload 静态方法:它先序列化指定的 value,若序列化后的字节数小于 minimum offloading size 则直接返回序列化结果 (Either.Left),否则调用 blobWriter.putPermanent 将其上传到 BLOB store 并返回 PermanentBlobKey (Either.Right);上传失败时只记录 warn 日志,并退回为返回序列化结果
BlobServer
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
/**
* This class implements the BLOB server. The BLOB server is responsible for listening for incoming requests and
* spawning threads to handle these requests. Furthermore, it takes care of creating the directory structure to store
* the BLOBs or temporarily cache them.
*/
public class BlobServer extends Thread implements BlobService, BlobWriter, PermanentBlobService, TransientBlobService {
//……

@Override
public PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException {
checkNotNull(jobId);
return (PermanentBlobKey) putBuffer(jobId, value, PERMANENT_BLOB);
}

@Override
public PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException {
checkNotNull(jobId);
return (PermanentBlobKey) putInputStream(jobId, inputStream, PERMANENT_BLOB);
}

/**
* Returns the configuration used by the BLOB server.
*
* @return configuration
*/
@Override
public final int getMinOffloadingSize() {
return blobServiceConfiguration.getInteger(BlobServerOptions.OFFLOAD_MINSIZE);
}

/**
* Uploads the data of the given byte array for the given job to the BLOB server.
*
* @param jobId
* the ID of the job the BLOB belongs to
* @param value
* the buffer to upload
* @param blobType
* whether to make the data permanent or transient
*
* @return the computed BLOB key identifying the BLOB on the server
*
* @throws IOException
* thrown if an I/O error occurs while writing it to a local file, or uploading it to the HA
* store
*/
private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, BlobKey.BlobType blobType)
throws IOException {

if (LOG.isDebugEnabled()) {
LOG.debug(“Received PUT call for BLOB of job {}.”, jobId);
}

File incomingFile = createTemporaryFilename();
MessageDigest md = BlobUtils.createMessageDigest();
BlobKey blobKey = null;
try (FileOutputStream fos = new FileOutputStream(incomingFile)) {
md.update(value);
fos.write(value);
} catch (IOException ioe) {
// delete incomingFile from a failed download
if (!incomingFile.delete() && incomingFile.exists()) {
LOG.warn(“Could not delete the staging file {} for job {}.”,
incomingFile, jobId);
}
throw ioe;
}

try {
// persist file
blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);

return blobKey;
} finally {
// delete incomingFile from a failed download
if (!incomingFile.delete() && incomingFile.exists()) {
LOG.warn(“Could not delete the staging file {} for blob key {} and job {}.”,
incomingFile, blobKey, jobId);
}
}
}

/**
* Uploads the data from the given input stream for the given job to the BLOB server.
*
* @param jobId
* the ID of the job the BLOB belongs to
* @param inputStream
* the input stream to read the data from
* @param blobType
* whether to make the data permanent or transient
*
* @return the computed BLOB key identifying the BLOB on the server
*
* @throws IOException
* thrown if an I/O error occurs while reading the data from the input stream, writing it to a
* local file, or uploading it to the HA store
*/
private BlobKey putInputStream(
@Nullable JobID jobId, InputStream inputStream, BlobKey.BlobType blobType)
throws IOException {

if (LOG.isDebugEnabled()) {
LOG.debug(“Received PUT call for BLOB of job {}.”, jobId);
}

File incomingFile = createTemporaryFilename();
MessageDigest md = BlobUtils.createMessageDigest();
BlobKey blobKey = null;
try (FileOutputStream fos = new FileOutputStream(incomingFile)) {
// read stream
byte[] buf = new byte[BUFFER_SIZE];
while (true) {
final int bytesRead = inputStream.read(buf);
if (bytesRead == -1) {
// done
break;
}
fos.write(buf, 0, bytesRead);
md.update(buf, 0, bytesRead);
}

// persist file
blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);

return blobKey;
} finally {
// delete incomingFile from a failed download
if (!incomingFile.delete() && incomingFile.exists()) {
LOG.warn(“Could not delete the staging file {} for blob key {} and job {}.”,
incomingFile, blobKey, jobId);
}
}
}

/**
* Moves the temporary <tt>incomingFile</tt> to its permanent location where it is available for
* use.
*
* @param incomingFile
* temporary file created during transfer
* @param jobId
* ID of the job this blob belongs to or <tt>null</tt> if job-unrelated
* @param digest
* BLOB content digest, i.e. hash
* @param blobType
* whether this file is a permanent or transient BLOB
*
* @return unique BLOB key that identifies the BLOB on the server
*
* @throws IOException
* thrown if an I/O error occurs while moving the file or uploading it to the HA store
*/
BlobKey moveTempFileToStore(
File incomingFile, @Nullable JobID jobId, byte[] digest, BlobKey.BlobType blobType)
throws IOException {

int retries = 10;

int attempt = 0;
while (true) {
// add unique component independent of the BLOB content
BlobKey blobKey = BlobKey.createKey(blobType, digest);
File storageFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);

// try again until the key is unique (put the existence check into the lock!)
readWriteLock.writeLock().lock();
try {
if (!storageFile.exists()) {
BlobUtils.moveTempFileToStore(
incomingFile, jobId, blobKey, storageFile, LOG,
blobKey instanceof PermanentBlobKey ? blobStore : null);
// add TTL for transient BLOBs:
if (blobKey instanceof TransientBlobKey) {
// must be inside read or write lock to add a TTL
blobExpiryTimes
.put(Tuple2.of(jobId, (TransientBlobKey) blobKey),
System.currentTimeMillis() + cleanupInterval);
}
return blobKey;
}
} finally {
readWriteLock.writeLock().unlock();
}

++attempt;
if (attempt >= retries) {
String message = “Failed to find a unique key for BLOB of job ” + jobId + ” (last tried ” + storageFile.getAbsolutePath() + “.”;
LOG.error(message + ” No retries left.”);
throw new IOException(message);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(“Trying to find a unique key for BLOB of job {} (retry {}, last tried {})”,
jobId, attempt, storageFile.getAbsolutePath());
}
}
}
}

/**
* Returns a temporary file inside the BLOB server’s incoming directory.
*
* @return a temporary file inside the BLOB server’s incoming directory
*
* @throws IOException
* if creating the directory fails
*/
File createTemporaryFilename() throws IOException {
return new File(BlobUtils.getIncomingDirectory(storageDir),
String.format(“temp-%08d”, tempFileCounter.getAndIncrement()));
}

//……
}

BlobServer 实现了 BlobWriter 接口,putPermanent 方法分别用到了 putBuffer 及 putInputStream 方法,而 getMinOffloadingSize 方法则从 blobServiceConfiguration 获取 BlobServerOptions.OFFLOAD_MINSIZE 配置,默认是 1M
putBuffer 方法接收 byte[] 参数,它先把 byte[] 写入到临时文件,之后调用 moveTempFileToStore 方法进行持久化;putInputStream 方法接收 InputStream 参数,它也是先把 InputStream 写入到临时文件,然后调用 moveTempFileToStore 方法进行持久化
moveTempFileToStore 方法调用了 BlobUtils.moveTempFileToStore 将本地临时文件转移到 permanent location;其中 storageDir 由 BlobUtils.initLocalStorageDirectory(config) 来初始化,而 storageFile 通过 BlobUtils.getStorageLocation(storageDir, jobId, blobKey) 来获取

BlobUtils
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
/**
* Utility class to work with blob data.
*/
public class BlobUtils {
//……

/**
* Creates a local storage directory for a blob service under the configuration parameter given
* by {@link BlobServerOptions#STORAGE_DIRECTORY}. If this is <tt>null</tt> or empty, we will
* fall back to Flink’s temp directories (given by
* {@link org.apache.flink.configuration.CoreOptions#TMP_DIRS}) and choose one among them at
* random.
*
* @param config
* Flink configuration
*
* @return a new local storage directory
*
* @throws IOException
* thrown if the local file storage cannot be created or is not usable
*/
static File initLocalStorageDirectory(Configuration config) throws IOException {

String basePath = config.getString(BlobServerOptions.STORAGE_DIRECTORY);

File baseDir;
if (StringUtils.isNullOrWhitespaceOnly(basePath)) {
final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(config);
baseDir = new File(tmpDirPaths[RANDOM.nextInt(tmpDirPaths.length)]);
}
else {
baseDir = new File(basePath);
}

File storageDir;

// NOTE: although we will be using UUIDs, there may be collisions
int maxAttempts = 10;
for (int attempt = 0; attempt < maxAttempts; attempt++) {
storageDir = new File(baseDir, String.format(
“blobStore-%s”, UUID.randomUUID().toString()));

// Create the storage dir if it doesn’t exist. Only return it when the operation was
// successful.
if (storageDir.mkdirs()) {
return storageDir;
}
}

// max attempts exceeded to find a storage directory
throw new IOException(“Could not create storage directory for BLOB store in ‘” + baseDir + “‘.”);
}

/**
* Returns the (designated) physical storage location of the BLOB with the given key.
*
* @param storageDir
* storage directory used be the BLOB service
* @param key
* the key identifying the BLOB
* @param jobId
* ID of the job for the incoming files (or <tt>null</tt> if job-unrelated)
*
* @return the (designated) physical storage location of the BLOB
*
* @throws IOException
* if creating the directory fails
*/
static File getStorageLocation(
File storageDir, @Nullable JobID jobId, BlobKey key) throws IOException {
File file = new File(getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key));

Files.createDirectories(file.getParentFile().toPath());

return file;
}

/**
* Returns the path for the given blob key.
*
* <p>The returned path can be used with the (local or HA) BLOB store file system back-end for
* recovery purposes and follows the same scheme as {@link #getStorageLocation(File, JobID,
* BlobKey)}.
*
* @param storageDir
* storage directory used be the BLOB service
* @param key
* the key identifying the BLOB
* @param jobId
* ID of the job for the incoming files
*
* @return the path to the given BLOB
*/
static String getStorageLocationPath(
String storageDir, @Nullable JobID jobId, BlobKey key) {
if (jobId == null) {
// format: $base/no_job/blob_$key
return String.format(“%s/%s/%s%s”,
storageDir, NO_JOB_DIR_PREFIX, BLOB_FILE_PREFIX, key.toString());
} else {
// format: $base/job_$jobId/blob_$key
return String.format(“%s/%s%s/%s%s”,
storageDir, JOB_DIR_PREFIX, jobId.toString(), BLOB_FILE_PREFIX, key.toString());
}
}

/**
* Moves the temporary <tt>incomingFile</tt> to its permanent location where it is available for
* use (not thread-safe!).
*
* @param incomingFile
* temporary file created during transfer
* @param jobId
* ID of the job this blob belongs to or <tt>null</tt> if job-unrelated
* @param blobKey
* BLOB key identifying the file
* @param storageFile
* (local) file where the blob is/should be stored
* @param log
* logger for debug information
* @param blobStore
* HA store (or <tt>null</tt> if unavailable)
*
* @throws IOException
* thrown if an I/O error occurs while moving the file or uploading it to the HA store
*/
static void moveTempFileToStore(
File incomingFile, @Nullable JobID jobId, BlobKey blobKey, File storageFile,
Logger log, @Nullable BlobStore blobStore) throws IOException {

try {
// first check whether the file already exists
if (!storageFile.exists()) {
try {
// only move the file if it does not yet exist
Files.move(incomingFile.toPath(), storageFile.toPath());

incomingFile = null;

} catch (FileAlreadyExistsException ignored) {
log.warn(“Detected concurrent file modifications. This should only happen if multiple” +
“BlobServer use the same storage directory.”);
// we cannot be sure at this point whether the file has already been uploaded to the blob
// store or not. Even if the blobStore might shortly be in an inconsistent state, we have
// to persist the blob. Otherwise we might not be able to recover the job.
}

if (blobStore != null) {
// only the one moving the incoming file to its final destination is allowed to upload the
// file to the blob store
blobStore.put(storageFile, jobId, blobKey);
}
} else {
log.warn(“File upload for an existing file with key {} for job {}. This may indicate a duplicate upload or a hash collision. Ignoring newest upload.”, blobKey, jobId);
}
storageFile = null;
} finally {
// we failed to either create the local storage file or to upload it –> try to delete the local file
// while still having the write lock
if (storageFile != null && !storageFile.delete() && storageFile.exists()) {
log.warn(“Could not delete the storage file {}.”, storageFile);
}
if (incomingFile != null && !incomingFile.delete() && incomingFile.exists()) {
log.warn(“Could not delete the staging file {} for blob key {} and job {}.”, incomingFile, blobKey, jobId);
}
}
}

//……
}

initLocalStorageDirectory 方法从配置文件读取 BlobServerOptions.STORAGE_DIRECTORY 配置 (blob.storage.directory),如果没有配置,则通过 ConfigurationUtils.parseTempDirectories 来获取 tmpDirPaths,然后随机选一个作为 baseDir;storageDir 目录则是 baseDir 的子目录,其目录名为 blobStore- 加上一个随机 UUID (最多尝试 10 次 mkdirs,均失败则抛出 IOException)
getStorageLocation 方法则在 storageDir 的基础上根据 JobID 及 BlobKey 构造具体的存储路径,其格式为 $base/no_job/blob_$key 或者 $base/job_$jobId/blob_$key

moveTempFileToStore 方法则在目标文件不存在的场景下使用 Files.move 将 incomingFile 转移到 storageFile,如果 blobStore 不为 null,还会将 storageFile 放入到 BlobStore;如果目标文件已存在,则只记录 warn 日志并忽略本次上传

小结

BlobWriter 定义了 putPermanent、getMinOffloadingSize 方法,同时还提供了 serializeAndTryOffload 静态方法:它先序列化指定的 value,若序列化后的字节数小于 minimum offloading size 则直接返回序列化结果,否则调用 blobWriter.putPermanent 上传到 BLOB store 并返回 PermanentBlobKey;上传失败时记录 warn 日志并退回为返回序列化结果
BlobServer 实现了 BlobWriter 接口,putPermanent 方法分别用到了 putBuffer 及 putInputStream 方法,而 getMinOffloadingSize 方法则从 blobServiceConfiguration 获取 BlobServerOptions.OFFLOAD_MINSIZE 配置,默认是 1M;putBuffer 方法接收 byte[] 参数,它先把 byte[] 写入到临时文件,之后调用 moveTempFileToStore 方法进行持久化;putInputStream 方法接收 InputStream 参数,它也是先把 InputStream 写入到临时文件,然后调用 moveTempFileToStore 方法进行持久化;moveTempFileToStore 方法调用了 BlobUtils.moveTempFileToStore 将本地临时文件转移到 permanent location;其中 storageDir 由 BlobUtils.initLocalStorageDirectory(config) 来初始化,而 storageFile 通过 BlobUtils.getStorageLocation(storageDir, jobId, blobKey) 来获取
BlobUtils 的 initLocalStorageDirectory 方法从配置文件读取 BlobServerOptions.STORAGE_DIRECTORY 配置 (blob.storage.directory),如果没有配置,则通过 ConfigurationUtils.parseTempDirectories 来获取 tmpDirPaths,然后随机选一个作为 baseDir;storageDir 目录则是 baseDir 的子目录,其目录名为 blobStore- 加上一个随机 UUID;getStorageLocation 方法则在 storageDir 的基础上根据 JobID 及 BlobKey 构造具体的存储路径,其格式为 $base/no_job/blob_$key 或者 $base/job_$jobId/blob_$key;moveTempFileToStore 方法则在目标文件不存在的场景下使用 Files.move 将 incomingFile 转移到 storageFile,如果 blobStore 不为 null,还会将 storageFile 放入到 BlobStore;目标文件已存在时则只记录 warn 日志并忽略本次上传

doc
BlobWriter

正文完
 0