共计 8147 个字符,预计需要花费 21 分钟才能阅读完成。
1、SortMergeResultPartition 的创立应用
首先是一个读过程的一个调用链
PartitionRequestServerHandler.channelRead0()
->CreditBasedSequenceNumberingViewReader.requestSubpartitionView()
->ResultPartitionManager.createSubpartitionView()
->SortMergeResultPartition.createSubpartitionView()
->SortMergeResultPartitionReadScheduler.crateSubpartitionReader()
->createFileReader()->new PartitionedFileReader()
SortMergeResultPartition 的创立,由上一篇写出篇可知,SortMergeResultPartition 是在 ResultPartitionFactory 创立的。首先 SortMergeResultPartition 对象的创立调用链:
new Task()
->NettyShuffleEnvironment.createResultPartitionWriters()
->ResultPartitionFactory.create()
之后调用 ConsumableNotifyingResultPartitionWriterDecorator.decorate() 封装进 Task 的成员 consumableNotifyingPartitionWriters,再之后是注册治理这个 SortMergeResultPartition:
Task.doRun()
->setupPartitionsAndGates()
->consumableNotifyingPartitionWriters: SortMergeResultPartition.setup()
->ResultPartition.setup()
->ResultPartitionManager.registerResultPartition(this)
->registeredPartitions.put()
以上注册进了 registeredPartitions 的列表当中(registeredPartitions 是 ResultPartitionManager 的成员变量),再依据第一个调用链,在 createSubpartitionView() 的时候从列表获取应用
有一点须要留神的是,shuffle 文件在工作完结的时候才会实现全副写出(次要是 index 文件),也就是 PartitionedFile 在 Task 完结才会创立,之后文件追随 TaskManager 的对立治理,也就是 ResultPartitionManager。也就是说,这里的读过程并不是上游来上游工作读的过程,而是对上游输入的读的一个解决。
整个治理相干的链路如下:
TaskManagerServices.createShuffleEnvironment()
->NettyShuffleServiceFactory.createShuffleEnvironment()
->createNettyShuffleEnvironment()-> new ResultPartitionManager()
->new NettyConnectionManager()
->new NettyProtocol() -> 成员 partitionProvider
->new PartitionRequestServerHandler(partitionProvider,...)
最初跟第一个链路关联上了,partitionProvider 即第一个链路中的 ResultPartitionManager
PartitionedFile 是在工作完结的时候实现对象的创立的,如下在 Task.doRun() 中,会调用实现 ResultPartition 的输入
// finish the produced partitions. if this fails, we consider the execution failed.
for (ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) {if (partitionWriter != null) {partitionWriter.finish();
}
}
最终调用到 PartitionedFileWriter 的 finish() 接口,实现 PartitionedFile 对象的创立
public PartitionedFile finish() throws IOException {
......
......
return new PartitionedFile(
numRegions,
numSubpartitions,
dataFilePath,
indexFilePath,
dataFileSize,
indexFileSize,
numBuffers,
indexEntryCache);
}
2、PartitionedFileReader
这个类读取的原理能够解释原理章节形容的信息,即 Flink 一个分区的文件写在多个 region 中,写完之后并没有再消耗资源从新进行排序将分区数据聚合,而是在读取的时候,通过伎俩将跨 region 的数据一起读出来。
这个类是 sort shuffle 文件最上层的文件阅读器,负责从 shuffle 文件中读取数据返回下层,次要有三个办法。几个重要的读取应用的标记位成员变量如下。留神其中的 targetSubpartition 成员,该变量是 final 的,在初始化赋值当前只读不扭转,也就是说,每个 PartitionedFileReader 对应读取一个分区的数据
/** Target subpartition to read. */
private final int targetSubpartition;
/** Next data region to be read. */
private int nextRegionToRead;
/** Next file offset to be read. */
private long nextOffsetToRead;
/** Number of remaining buffers in the current data region read. */
private int currentRegionRemainingBuffers;
2.1、moveToNextReadableRegion
性能是将阅读器的各项指标设置到下一个可读的 region。应用这个类的时候,第一次读取会有一轮空读,而后调用到这个接口,实现各项指标的指向,之后才开始读取数据。
while (currentRegionRemainingBuffers <= 0
&& nextRegionToRead < partitionedFile.getNumRegions()) {
partitionedFile.getIndexEntry(indexFileChannel, indexEntryBuf, nextRegionToRead, targetSubpartition);
nextOffsetToRead = indexEntryBuf.getLong();
currentRegionRemainingBuffers = indexEntryBuf.getInt();
++nextRegionToRead;
}
while 的循环条件是两个:1、以后 region 读完;2、未达到最初的 region。
getIndexEntry 办法用于获取索引,依据写流程的章节,只有 buffer 有余时才会将 index 写出到文件,也就是说,buffer 没有用完的话,index 是存储在 buffer 中的,不须要去文件中读。如下,依据 cache 条件,别离从内存或文件获取 index
/**
* Gets the index entry of the target region and subpartition either from the index data cache
* or the index data file.
*/
void getIndexEntry(FileChannel indexFile, ByteBuffer target, int region, int subpartition)
throws IOException {checkArgument(target.capacity() == INDEX_ENTRY_SIZE, "Illegal target buffer size.");
target.clear();
long indexEntryOffset = getIndexEntryOffset(region, subpartition);
if (indexEntryCache != null) {for (int i = 0; i < INDEX_ENTRY_SIZE; ++i) {target.put(indexEntryCache.get((int) indexEntryOffset + i));
}
} else {indexFile.position(indexEntryOffset);
BufferReaderWriterUtil.readByteBufferFully(indexFile, target);
}
target.flip();}
缓存读取依据 index 占位数,循环从缓存中读取对应的字节数;文件读取,先跳转到文件指定地位,而后因为提供的读数据的 buffer 大小为 index 的大小,所以 buffer 大小用完即示意读取了一个 index
getIndexEntryOffset 办法用于获取以后须要的数据的 index 的地位,依据 index 存储规定,间接计算取得,如下依据 region 号、partition 号以及 index 占位数间接获取后果
private long getIndexEntryOffset(int region, int subpartition) {checkArgument(region >= 0 && region < getNumRegions(), "Illegal target region.");
checkArgument(
subpartition >= 0 && subpartition < numSubpartitions,
"Subpartition index out of bound.");
return (((long) region) * numSubpartitions + subpartition) * INDEX_ENTRY_SIZE;
}
index 获取实现当前依据 index 的内容,更新相干读取指标:1、读取地位;2、读取数量
2.2、readCurrentRegion
性能是从 shuffle 文件中读取对应分区的数据。依据相应的指标,定位到文件的具体位置,接着先解析元数据头,获取数据的相干信息,之后依据元数据中表明的数据大小,读取数据。
Buffer readCurrentRegion(MemorySegment target, BufferRecycler recycler) throws IOException {if (currentRegionRemainingBuffers == 0) {return null;}
dataFileChannel.position(nextOffsetToRead);
Buffer buffer = readFromByteChannel(dataFileChannel, headerBuf, target, recycler);
nextOffsetToRead = dataFileChannel.position();
--currentRegionRemainingBuffers;
return buffer;
}
其中的 headerBuf 是一个固定大小的 ByteBuffer,大小是元数据 head 的大小,8 bytes。
readFromByteChannel 具体读数据的时候,首先获取元数据,而后解析出对应的元数据信息,之后正式读数据
isEvent = headerBuffer.getShort() == HEADER_VALUE_IS_EVENT;
isCompressed = headerBuffer.getShort() == BUFFER_IS_COMPRESSED;
size = headerBuffer.getInt();
targetBuf = memorySegment.wrap(0, size);
Buffer.DataType dataType =
isEvent ? Buffer.DataType.EVENT_BUFFER : Buffer.DataType.DATA_BUFFER;
return new NetworkBuffer(memorySegment, bufferRecycler, dataType, isCompressed, size);
2.3、hasRemaining
这个办法的性能就是判断是否曾经将分区数据读取完了,同时会调用 2.1 的办法更新相应的指标。
boolean hasRemaining() throws IOException {moveToNextReadableRegion();
return currentRegionRemainingBuffers > 0;
}
3、读操作的调用
读取的调用链如下:
SortMergeResultPartitionReadScheduler.run()
->readData()
->SortMergeSubpartitionReader.readBuffers()
->PartitionedFileReader.readCurrentRegion()
其中,SortMergeResultPartitionReadScheduler 实现了 Runnable 类,也就是说,它是一个线程类,run 办法就是按线程的调度形式。SortMergeResultPartitionReadScheduler 有一个 Executor 成员,是一个线程执行类,SortMergeResultPartitionReadScheduler 的执行基于这个成员
/** Executor to run the shuffle data reading task. */
private final Executor ioExecutor;
在 mayTriggerReading() 接口中,Executor 将 SortMergeResultPartitionReadScheduler 退出了执行
private void mayTriggerReading() {assert Thread.holdsLock(lock);
if (!isRunning
&& !allReaders.isEmpty()
&& numRequestedBuffers + bufferPool.getNumBuffersPerRequest()
<= maxRequestedBuffers) {
isRunning = true;
ioExecutor.execute(this);
}
}
mayTriggerReading() 接口的调用在第一章所述调用链的 SortMergeResultPartitionReadScheduler.crateSubpartitionReader() 当中,其中还包含了 PartitionedFileReader 的创立
PartitionedFileReader fileReader = createFileReader(resultFile, targetSubpartition);
SortMergeSubpartitionReader subpartitionReader =
new SortMergeSubpartitionReader(availabilityListener, fileReader);
allReaders.add(subpartitionReader);
subpartitionReader
.getReleaseFuture()
.thenRun(() -> releaseSubpartitionReader(subpartitionReader));
mayTriggerReading();
ioExecutor 的成员变量最终起源是在在 NettyShuffleServiceFactory 当中,创立了一个 batchShuffleReadIOExecutor 的 IO 执行线程池,这个最终被接口层层传递到了 SortMergeResultPartitionReadScheduler 当中
// we create a separated IO executor pool here for batch shuffle instead of reusing the
// TaskManager IO executor pool directly to avoid the potential side effects of execution
// contention, for example, too long IO or waiting time leading to starvation or timeout
ExecutorService batchShuffleReadIOExecutor =
Executors.newFixedThreadPool(
Math.max(
1,
Math.min(batchShuffleReadBufferPool.getMaxConcurrentRequests(),
4 * Hardware.getNumberCPUCores())),
new ExecutorThreadFactory("blocking-shuffle-io"));
4、数据返回
第二章 PartitionedFileReader 简析了从文件读出数据的操作,第三章简析了读操作的触发,此章简析数据如何返回。
4.1、读入缓存
在读操作的调用链中,留神 SortMergeSubpartitionReader.readBuffers() 接口,此接口调用 PartitionedFileReader.readCurrentRegion() 实现 shuffle 数据读入 buffer,之后将该 buffer 放入一个 buffer 列表。对应如下两项调用
((buffer = fileReader.readCurrentRegion(segment, recycler)) == null) {buffers.add(segment);
break;
}
addBuffer(buffer);
在 addBuffer() 接口中,实现了数据 buffer 退出列表的操作
buffersRead.add(buffer);
4.2、buffersRead 读取
由 4.1 可知,buffersRead 寄存了读入内存的 shuffle 数据,这一步放入操作是由 blocking-shuffle-io 线程实现的,此处简析 buffersRead 读取如何被上游获取。
PartitionRequestQueue.writeAndFlushNextMessageIfPossible()
->CreditBasedSequenceNumberingViewReader.getNextBuffer()
->SortMergeSubpartitionReader.getNextBuffer()
->buffersRead.poll()
PartitionRequestQueue.writeAndFlushNextMessageIfPossible() 的调用有多个下层分支,其中一个分支是在收到上游的 AddCredit 或者 ResumeConsumption 音讯时会调用到,这两个音讯都是示意凋谢上游传输的。