序本文主要研究一下flink的ParallelIteratorInputFormat实例 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Long> dataSet = env.generateSequence(15,106) .setParallelism(3); dataSet.print();这里使用ExecutionEnvironment的generateSequence方法创建了带NumberSequenceIterator的ParallelIteratorInputFormatParallelIteratorInputFormatflink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java/** * An input format that generates data in parallel through a {@link SplittableIterator}. /@PublicEvolvingpublic class ParallelIteratorInputFormat<T> extends GenericInputFormat<T> { private static final long serialVersionUID = 1L; private final SplittableIterator<T> source; private transient Iterator<T> splitIterator; public ParallelIteratorInputFormat(SplittableIterator<T> iterator) { this.source = iterator; } @Override public void open(GenericInputSplit split) throws IOException { super.open(split); this.splitIterator = this.source.getSplit(split.getSplitNumber(), split.getTotalNumberOfSplits()); } @Override public boolean reachedEnd() { return !this.splitIterator.hasNext(); } @Override public T nextRecord(T reuse) { return this.splitIterator.next(); }}ParallelIteratorInputFormat继承了GenericInputFormat类,而GenericInputFormat类底下还有其他四个子类,分别是CRowValuesInputFormat、CollectionInputFormat、IteratorInputFormat、ValuesInputFormat,它们有一个共同的特点就是都实现了NonParallelInput接口NonParallelInputflink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/NonParallelInput.java/* * This interface acts as a marker for input formats for inputs which cannot be split. * Data sources with a non-parallel input formats are always executed with a parallelism * of one. * * @see InputFormat /@Publicpublic interface NonParallelInput {}这个接口没有定义任何方法,仅仅是一个标识,表示该InputFormat是否支持splitGenericInputFormat.createInputSplitsflink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/GenericInputFormat.java @Override public GenericInputSplit[] createInputSplits(int numSplits) throws IOException { if (numSplits < 1) { throw new IllegalArgumentException(“Number of input splits has to be at least 1.”); } numSplits = (this instanceof NonParallelInput) ? 1 : numSplits; GenericInputSplit[] splits = new GenericInputSplit[numSplits]; for (int i = 0; i < splits.length; i++) { splits[i] = new GenericInputSplit(i, numSplits); } return splits; }GenericInputFormat的createInputSplits方法对输入的numSplits进行了限制,如果小于1则抛出IllegalArgumentException异常,如果当前InputFormat有实现NonParallelInput接口,则将numSplits重置为1ExecutionEnvironment.fromParallelCollectionflink-java-1.6.2-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java /* * Creates a new data set that contains elements in the iterator. The iterator is splittable, allowing the * framework to create a parallel data source that returns the elements in the iterator. * * <p>Because the iterator will remain unmodified until the actual execution happens, the type of data * returned by the iterator must be given explicitly in the form of the type class (this is due to the * fact that the Java compiler erases the generic type information). * * @param iterator The iterator that produces the elements of the data set. * @param type The class of the data produced by the iterator. Must not be a generic class. * @return A DataSet representing the elements in the iterator. * * @see #fromParallelCollection(SplittableIterator, TypeInformation) / public <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, Class<X> type) { return fromParallelCollection(iterator, TypeExtractor.getForClass(type)); } /* * Creates a new data set that contains elements in the iterator. The iterator is splittable, allowing the * framework to create a parallel data source that returns the elements in the iterator. * * <p>Because the iterator will remain unmodified until the actual execution happens, the type of data * returned by the iterator must be given explicitly in the form of the type information. * This method is useful for cases where the type is generic. In that case, the type class * (as given in {@link #fromParallelCollection(SplittableIterator, Class)} does not supply all type information. * * @param iterator The iterator that produces the elements of the data set. * @param type The TypeInformation for the produced data set. * @return A DataSet representing the elements in the iterator. * * @see #fromParallelCollection(SplittableIterator, Class) / public <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, TypeInformation<X> type) { return fromParallelCollection(iterator, type, Utils.getCallLocationName()); } // private helper for passing different call location names private <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, TypeInformation<X> type, String callLocationName) { return new DataSource<>(this, new ParallelIteratorInputFormat<>(iterator), type, callLocationName); } /* * Creates a new data set that contains a sequence of numbers. The data set will be created in parallel, * so there is no guarantee about the order of the elements. * * @param from The number to start at (inclusive). * @param to The number to stop at (inclusive). * @return A DataSet, containing all number in the {@code [from, to]} interval. / public DataSource<Long> generateSequence(long from, long to) { return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, Utils.getCallLocationName()); }ExecutionEnvironment的fromParallelCollection方法,针对SplittableIterator类型的iterator,会创建ParallelIteratorInputFormat;generateSequence方法也调用了fromParallelCollection方法,它创建的是NumberSequenceIterator(是SplittableIterator的子类)SplittableIteratorflink-core-1.6.2-sources.jar!/org/apache/flink/util/SplittableIterator.java/* * Abstract base class for iterators that can split themselves into multiple disjoint * iterators. The union of these iterators returns the original iterator values. * * @param <T> The type of elements returned by the iterator. /@Publicpublic abstract class SplittableIterator<T> implements Iterator<T>, Serializable { private static final long serialVersionUID = 200377674313072307L; /* * Splits this iterator into a number disjoint iterators. * The union of these iterators returns the original iterator values. * * @param numPartitions The number of iterators to split into. * @return An array with the split iterators. / public abstract Iterator<T>[] split(int numPartitions); /* * Splits this iterator into <i>n</i> partitions and returns the <i>i-th</i> partition * out of those. * * @param num The partition to return (<i>i</i>). * @param numPartitions The number of partitions to split into (<i>n</i>). * @return The iterator for the partition. / public Iterator<T> getSplit(int num, int numPartitions) { if (numPartitions < 1 || num < 0 || num >= numPartitions) { throw new IllegalArgumentException(); } return split(numPartitions)[num]; } /* * The maximum number of splits into which this iterator can be split up. * * @return The maximum number of splits into which this iterator can be split up. / public abstract int getMaximumNumberOfSplits();}SplittableIterator是个抽象类,它定义了抽象方法split以及getMaximumNumberOfSplits;它有两个实现类,分别是LongValueSequenceIterator以及NumberSequenceIterator,这里我们看下NumberSequenceIteratorNumberSequenceIteratorflink-core-1.6.2-sources.jar!/org/apache/flink/util/NumberSequenceIterator.java/* * The {@code NumberSequenceIterator} is an iterator that returns a sequence of numbers (as {@code Long})s. * The iterator is splittable (as defined by {@link SplittableIterator}, i.e., it can be divided into multiple * iterators that each return a subsequence of the number sequence. /@Publicpublic class NumberSequenceIterator extends SplittableIterator<Long> { private static final long serialVersionUID = 1L; /* The last number returned by the iterator. / private final long to; /* The next number to be returned. / private long current; /* * Creates a new splittable iterator, returning the range [from, to]. * Both boundaries of the interval are inclusive. * * @param from The first number returned by the iterator. * @param to The last number returned by the iterator. */ public NumberSequenceIterator(long from, long to) { if (from > to) { throw new IllegalArgumentException(“The ’to’ value must not be smaller than the ‘from’ value.”); } this.current = from; this.to = to; } @Override public boolean hasNext() { return current <= to; } @Override public Long next() { if (current <= to) { return current++; } else { throw new NoSuchElementException(); } } @Override public NumberSequenceIterator[] split(int numPartitions) { if (numPartitions < 1) { throw new IllegalArgumentException(“The number of partitions must be at least 1.”); } if (numPartitions == 1) { return new NumberSequenceIterator[] { new NumberSequenceIterator(current, to) }; } // here, numPartitions >= 2 !!! long elementsPerSplit; if (to - current + 1 >= 0) { elementsPerSplit = (to - current + 1) / numPartitions; } else { // long overflow of the range. // we compute based on half the distance, to prevent the overflow. // in most cases it holds that: current < 0 and to > 0, except for: to == 0 and current == Long.MIN_VALUE // the later needs a special case final long halfDiff; // must be positive if (current == Long.MIN_VALUE) { // this means to >= 0 halfDiff = (Long.MAX_VALUE / 2 + 1) + to / 2; } else { long posFrom = -current; if (posFrom > to) { halfDiff = to + ((posFrom - to) / 2); } else { halfDiff = posFrom + ((to - posFrom) / 2); } } elementsPerSplit = halfDiff / numPartitions * 2; } if (elementsPerSplit < Long.MAX_VALUE) { // figure out how many get one in addition long numWithExtra = -(elementsPerSplit * numPartitions) + to - current + 1; // based on rounding errors, we may have lost one) if (numWithExtra > numPartitions) { elementsPerSplit++; numWithExtra -= numPartitions; if (numWithExtra > numPartitions) { throw new RuntimeException(“Bug in splitting logic. To much rounding loss.”); } } NumberSequenceIterator[] iters = new NumberSequenceIterator[numPartitions]; long curr = current; int i = 0; for (; i < numWithExtra; i++) { long next = curr + elementsPerSplit + 1; iters[i] = new NumberSequenceIterator(curr, next - 1); curr = next; } for (; i < numPartitions; i++) { long next = curr + elementsPerSplit; iters[i] = new NumberSequenceIterator(curr, next - 1, true); curr = next; } return iters; } else { // this can only be the case when there are two partitions if (numPartitions != 2) { throw new RuntimeException(“Bug in splitting logic.”); } return new NumberSequenceIterator[] { new NumberSequenceIterator(current, current + elementsPerSplit), new NumberSequenceIterator(current + elementsPerSplit, to) }; } } @Override public int getMaximumNumberOfSplits() { if (to >= Integer.MAX_VALUE || current <= Integer.MIN_VALUE || to - current + 1 >= Integer.MAX_VALUE) { return Integer.MAX_VALUE; } else { return (int) (to - current + 1); } } //……}NumberSequenceIterator的构造器提供了from及to两个参数,它内部有一个current值,初始的时候等于fromsplit方法首先根据numPartitions,来计算elementsPerSplit,当to - current + 1 >= 0时,计算公式为(to - current + 1) / numPartitions之后根据计算出来的elementsPerSplit来计算numWithExtra,这是因为计算elementsPerSplit的时候用的是取整操作,如果每一批都按elementsPerSplit,可能存在多余的,于是就算出这个多余的numWithExtra,如果它大于numPartitions,则对elementsPerSplit增加1,然后对numWithExtra减去numPartitions最后就是先根据numWithExtra来循环分配前numWithExtra个批次,将多余的numWithExtra平均分配给前numWithExtra个批次;numWithExtra之后到numPartitions的批次,就正常的使用from + elementsPerSplit -1来计算togetMaximumNumberOfSplits则是返回可以split的最大数量,(to >= Integer.MAX_VALUE || current <= Integer.MIN_VALUE || to - current + 1 >= Integer.MAX_VALUE)的条件下返回Integer.MAX_VALUE,否则返回(int) (to - current + 1)小结GenericInputFormat类底下有五个子类,除了ParallelIteratorInputFormat外,其他的分别是CRowValuesInputFormat、CollectionInputFormat、IteratorInputFormat、ValuesInputFormat,后面这四个子类有一个共同的特点就是都实现了NonParallelInput接口GenericInputFormat的createInputSplits会对输入的numSplits进行限制,如果是NonParallelInput类型的,则强制重置为1NumberSequenceIterator是SplittableIterator的一个实现类,在ExecutionEnvironment的fromParallelCollection方法,generateSequence方法(它创建的是NumberSequenceIterator),针对SplittableIterator类型的iterator,创建ParallelIteratorInputFormat;而NumberSequenceIterator的split方法,它先计算elementsPerSplit,然后计算numWithExtra,把numWithExtra均分到前面几个批次,最后在按elementsPerSplit均分剩余的批次docParallelIteratorInputFormatSplittableIteratorNumberSequenceIterator