1、SortMergeResultPartition的创立应用
首先是一个读过程的一个调用链
PartitionRequestServerHandler.channelRead0() ->CreditBasedSequenceNumberingViewReader.requestSubpartitionView() ->ResultPartitionManager.createSubpartitionView() ->SortMergeResultPartition.createSubpartitionView() ->SortMergeResultPartitionReadScheduler.crateSubpartitionReader() ->createFileReader()->new PartitionedFileReader()
SortMergeResultPartition的创立,由上一篇写出篇可知,SortMergeResultPartition是在ResultPartitionFactory创立的。首先SortMergeResultPartition对象的创立调用链:
new Task() ->NettyShuffleEnvironment.createResultPartitionWriters() ->ResultPartitionFactory.create()
之后调用ConsumableNotifyingResultPartitionWriterDecorator.decorate()封装进Task的成员consumableNotifyingPartitionWriters,再之后是注册治理这个SortMergeResultPartition:
Task.doRun() ->setupPartitionsAndGates() ->consumableNotifyingPartitionWriters: SortMergeResultPartition.setup() ->ResultPartition.setup() ->ResultPartitionManager.registerResultPartition(this) ->registeredPartitions.put()
以上注册进了registeredPartitions的列表当中(registeredPartitions是ResultPartitionManager的成员变量),再依据第一个调用链,在createSubpartitionView()的时候从列表获取应用
有一点须要留神的是,shuffle文件在工作完结的时候才会实现全副写出(次要是index文件),也就是PartitionedFile在Task完结才会创立,之后文件追随TaskManager的对立治理,也就是ResultPartitionManager。也就是说,这里的读过程并不是上游来上游工作读的过程,而是对上游输入的读的一个解决。
整个治理相干的链路如下:
TaskManagerServices.createShuffleEnvironment() ->NettyShuffleServiceFactory.createShuffleEnvironment() ->createNettyShuffleEnvironment()-> new ResultPartitionManager() ->new NettyConnectionManager() ->new NettyProtocol() -> 成员 partitionProvider ->new PartitionRequestServerHandler(partitionProvider,...)
最初跟第一个链路关联上了,partitionProvider即第一个链路中的ResultPartitionManager
PartitionedFile是在工作完结的时候实现对象的创立的,如下在Task.doRun()中,会调用实现ResultPartition的输入
// finish the produced partitions. if this fails, we consider the execution failed.for (ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) { if (partitionWriter != null) { partitionWriter.finish(); }}
最终调用到PartitionedFileWriter的finish()接口,实现PartitionedFile对象的创立
public PartitionedFile finish() throws IOException { ...... ...... return new PartitionedFile( numRegions, numSubpartitions, dataFilePath, indexFilePath, dataFileSize, indexFileSize, numBuffers, indexEntryCache);}
2、PartitionedFileReader
这个类读取的原理能够解释原理章节形容的信息,即Flink一个分区的文件写在多个region中,写完之后并没有再消耗资源从新进行排序将分区数据聚合,而是在读取的时候,通过伎俩将跨region的数据一起读出来。
这个类是sort shuffle文件最上层的文件阅读器,负责从shuffle文件中读取数据返回下层,次要有三个办法。几个重要的读取应用的标记位成员变量如下。留神其中的targetSubpartition成员,该变量是final的,在初始化赋值当前只读不扭转,也就是说,每个PartitionedFileReader对应读取一个分区的数据
/** Target subpartition to read. */private final int targetSubpartition;/** Next data region to be read. */private int nextRegionToRead;/** Next file offset to be read. */private long nextOffsetToRead;/** Number of remaining buffers in the current data region read. */private int currentRegionRemainingBuffers;
2.1、moveToNextReadableRegion
性能是将阅读器的各项指标设置到下一个可读的region。应用这个类的时候,第一次读取会有一轮空读,而后调用到这个接口,实现各项指标的指向,之后才开始读取数据。
while (currentRegionRemainingBuffers <= 0 && nextRegionToRead < partitionedFile.getNumRegions()) { partitionedFile.getIndexEntry( indexFileChannel, indexEntryBuf, nextRegionToRead, targetSubpartition); nextOffsetToRead = indexEntryBuf.getLong(); currentRegionRemainingBuffers = indexEntryBuf.getInt(); ++nextRegionToRead;}
while的循环条件是两个:1、以后region读完;2、未达到最初的region。
getIndexEntry办法用于获取索引,依据写流程的章节,只有buffer有余时才会将index写出到文件,也就是说,buffer没有用完的话,index是存储在buffer中的,不须要去文件中读。如下,依据cache条件,别离从内存或文件获取index
/** * Gets the index entry of the target region and subpartition either from the index data cache * or the index data file. */void getIndexEntry(FileChannel indexFile, ByteBuffer target, int region, int subpartition) throws IOException { checkArgument(target.capacity() == INDEX_ENTRY_SIZE, "Illegal target buffer size."); target.clear(); long indexEntryOffset = getIndexEntryOffset(region, subpartition); if (indexEntryCache != null) { for (int i = 0; i < INDEX_ENTRY_SIZE; ++i) { target.put(indexEntryCache.get((int) indexEntryOffset + i)); } } else { indexFile.position(indexEntryOffset); BufferReaderWriterUtil.readByteBufferFully(indexFile, target); } target.flip();}
缓存读取依据index占位数,循环从缓存中读取对应的字节数;文件读取,先跳转到文件指定地位,而后因为提供的读数据的buffer大小为index的大小,所以buffer大小用完即示意读取了一个index
getIndexEntryOffset办法用于获取以后须要的数据的index的地位,依据index存储规定,间接计算取得,如下依据region号、partition号以及index占位数间接获取后果
private long getIndexEntryOffset(int region, int subpartition) { checkArgument(region >= 0 && region < getNumRegions(), "Illegal target region."); checkArgument( subpartition >= 0 && subpartition < numSubpartitions, "Subpartition index out of bound."); return (((long) region) * numSubpartitions + subpartition) * INDEX_ENTRY_SIZE;}
index获取实现当前依据index的内容,更新相干读取指标:1、读取地位;2、读取数量
2.2、readCurrentRegion
性能是从shuffle文件中读取对应分区的数据。依据相应的指标,定位到文件的具体位置,接着先解析元数据头,获取数据的相干信息,之后依据元数据中表明的数据大小,读取数据。
Buffer readCurrentRegion(MemorySegment target, BufferRecycler recycler) throws IOException { if (currentRegionRemainingBuffers == 0) { return null; } dataFileChannel.position(nextOffsetToRead); Buffer buffer = readFromByteChannel(dataFileChannel, headerBuf, target, recycler); nextOffsetToRead = dataFileChannel.position(); --currentRegionRemainingBuffers; return buffer;}
其中的headerBuf是一个固定大小的ByteBuffer,大小是元数据head的大小,8 bytes。
readFromByteChannel具体读数据的时候,首先获取元数据,而后解析出对应的元数据信息,之后正式读数据
isEvent = headerBuffer.getShort() == HEADER_VALUE_IS_EVENT;isCompressed = headerBuffer.getShort() == BUFFER_IS_COMPRESSED;size = headerBuffer.getInt();targetBuf = memorySegment.wrap(0, size);Buffer.DataType dataType = isEvent ? Buffer.DataType.EVENT_BUFFER : Buffer.DataType.DATA_BUFFER;return new NetworkBuffer(memorySegment, bufferRecycler, dataType, isCompressed, size);
2.3、hasRemaining
这个办法的性能就是判断是否曾经将分区数据读取完了,同时会调用2.1的办法更新相应的指标。
boolean hasRemaining() throws IOException { moveToNextReadableRegion(); return currentRegionRemainingBuffers > 0;}
3、读操作的调用
读取的调用链如下:
SortMergeResultPartitionReadScheduler.run() ->readData() ->SortMergeSubpartitionReader.readBuffers() ->PartitionedFileReader.readCurrentRegion()
其中,SortMergeResultPartitionReadScheduler实现了Runnable类,也就是说,它是一个线程类,run办法就是按线程的调度形式。SortMergeResultPartitionReadScheduler有一个Executor成员,是一个线程执行类,SortMergeResultPartitionReadScheduler的执行基于这个成员
/** Executor to run the shuffle data reading task. */private final Executor ioExecutor; 在mayTriggerReading()接口中,Executor将SortMergeResultPartitionReadScheduler退出了执行private void mayTriggerReading() { assert Thread.holdsLock(lock); if (!isRunning && !allReaders.isEmpty() && numRequestedBuffers + bufferPool.getNumBuffersPerRequest() <= maxRequestedBuffers) { isRunning = true; ioExecutor.execute(this); }}
mayTriggerReading()接口的调用在第一章所述调用链的SortMergeResultPartitionReadScheduler.crateSubpartitionReader()当中,其中还包含了PartitionedFileReader的创立
PartitionedFileReader fileReader = createFileReader(resultFile, targetSubpartition);SortMergeSubpartitionReader subpartitionReader = new SortMergeSubpartitionReader(availabilityListener, fileReader);allReaders.add(subpartitionReader);subpartitionReader .getReleaseFuture() .thenRun(() -> releaseSubpartitionReader(subpartitionReader));mayTriggerReading();
ioExecutor的成员变量最终起源是在在NettyShuffleServiceFactory当中,创立了一个batchShuffleReadIOExecutor的IO执行线程池,这个最终被接口层层传递到了SortMergeResultPartitionReadScheduler当中
// we create a separated IO executor pool here for batch shuffle instead of reusing the// TaskManager IO executor pool directly to avoid the potential side effects of execution// contention, for example, too long IO or waiting time leading to starvation or timeoutExecutorService batchShuffleReadIOExecutor = Executors.newFixedThreadPool( Math.max( 1, Math.min( batchShuffleReadBufferPool.getMaxConcurrentRequests(), 4 * Hardware.getNumberCPUCores())), new ExecutorThreadFactory("blocking-shuffle-io"));
4、数据返回
第二章PartitionedFileReader简析了从文件读出数据的操作,第三章简析了读操作的触发,此章简析数据如何返回。
4.1、读入缓存
在读操作的调用链中,留神SortMergeSubpartitionReader.readBuffers()接口,此接口调用PartitionedFileReader.readCurrentRegion()实现shuffle数据读入buffer,之后将该buffer放入一个buffer列表。对应如下两项调用
((buffer = fileReader.readCurrentRegion(segment, recycler)) == null) { buffers.add(segment); break;} addBuffer(buffer);
在addBuffer()接口中,实现了数据buffer退出列表的操作
buffersRead.add(buffer);
4.2、buffersRead读取
由4.1可知,buffersRead寄存了读入内存的shuffle数据,这一步放入操作是由blocking-shuffle-io线程实现的,此处简析buffersRead读取如何被上游获取。
PartitionRequestQueue.writeAndFlushNextMessageIfPossible() ->CreditBasedSequenceNumberingViewReader.getNextBuffer() ->SortMergeSubpartitionReader.getNextBuffer() ->buffersRead.poll()
PartitionRequestQueue.writeAndFlushNextMessageIfPossible()的调用有多个下层分支,其中一个分支是在收到上游的AddCredit或者ResumeConsumption音讯时会调用到,这两个音讯都是示意凋谢上游传输的。