This article takes a look at the ParameterValuesProvider in Flink JDBC.

ParameterValuesProvider

flink-jdbc_2.11-1.8.0-sources.jar!/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java

/**
 * This interface is used by the {@link JDBCInputFormat} to compute the list of parallel query to run (i.e. splits).
 * Each query will be parameterized using a row of the matrix provided by each {@link ParameterValuesProvider}
 * implementation.
 */
public interface ParameterValuesProvider {

    /** Returns the necessary parameters array to use for query in parallel a table. */
    Serializable[][] getParameterValues();
}
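  • The ParameterValuesProvider interface defines a single method, getParameterValues, which returns the parameters needed to query a table in parallel. Each row of the returned matrix parameterizes one query, so one large SQL query is split into several smaller queries that can run in parallel. It has two implementations: GenericParameterValuesProvider and NumericBetweenParametersProvider.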

GenericParameterValuesProvider

flink-jdbc_2.11-1.8.0-sources.jar!/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java

/**
 * This splits generator actually does nothing but wrapping the query parameters
 * computed by the user before creating the {@link JDBCInputFormat} instance.
 */
public class GenericParameterValuesProvider implements ParameterValuesProvider {

    private final Serializable[][] parameters;

    public GenericParameterValuesProvider(Serializable[][] parameters) {
        this.parameters = parameters;
    }

    @Override
    public Serializable[][] getParameterValues(){
        //do nothing...precomputed externally
        return parameters;
    }
}
  • GenericParameterValuesProvider does no work of its own: getParameterValues simply returns the matrix that was passed to the constructor.
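
Since GenericParameterValuesProvider only wraps a matrix computed by the caller, the interesting part is building that matrix. The following is a minimal standalone sketch of that step and does not depend on Flink. The class PrecomputedParamsSketch, the method buildParameters, and the category and year columns are all hypothetical; only the BOOKS table name is borrowed from the Javadoc quoted in this article.

```java
import java.io.Serializable;
import java.util.Arrays;

// Standalone sketch: shows the shape of the matrix a caller would precompute.
class PrecomputedParamsSketch {

    // Hypothetical query with two placeholders; the column names are made up.
    static final String QUERY = "SELECT * FROM BOOKS WHERE category = ? AND year >= ?";

    // One row per parallel query, one column per "?" placeholder.
    static Serializable[][] buildParameters(String[] categories, int minYear) {
        Serializable[][] params = new Serializable[categories.length][2];
        for (int i = 0; i < categories.length; i++) {
            // The int is boxed to an Integer, which is Serializable.
            params[i] = new Serializable[] {categories[i], minYear};
        }
        return params;
    }

    public static void main(String[] args) {
        Serializable[][] params = buildParameters(new String[] {"history", "scifi", "poetry"}, 2000);
        System.out.println(QUERY);
        System.out.println(Arrays.deepToString(params));
    }
}
```

In a real job the resulting matrix would be handed to new GenericParameterValuesProvider(params), the constructor shown above. String and Integer are both among the parameter types that JDBCInputFormat.open knows how to bind.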

NumericBetweenParametersProvider

flink-jdbc_2.11-1.8.0-sources.jar!/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java

/**
 * This query parameters generator is an helper class to parameterize from/to queries on a numeric column.
 * The generated array of from/to values will be equally sized to fetchSize (apart from the last one),
 * ranging from minVal up to maxVal.
 *
 * <p>For example, if there's a table <CODE>BOOKS</CODE> with a numeric PK <CODE>id</CODE>, using a query like:
 * <PRE>
 *   SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
 * </PRE>
 *
 * <p>You can take advantage of this class to automatically generate the parameters of the BETWEEN clause,
 * based on the passed constructor parameters.
 *
 */
public class NumericBetweenParametersProvider implements ParameterValuesProvider {

    private final long fetchSize;
    private final long minVal;
    private final long maxVal;

    /**
     * NumericBetweenParametersProvider constructor.
     *
     * @param fetchSize the max distance between the produced from/to pairs
     * @param minVal the lower bound of the produced "from" values
     * @param maxVal the upper bound of the produced "to" values
     */
    public NumericBetweenParametersProvider(long fetchSize, long minVal, long maxVal) {
        checkArgument(fetchSize > 0, "Fetch size must be greater than 0.");
        checkArgument(minVal <= maxVal, "Min value cannot be greater than max value.");
        this.fetchSize = fetchSize;
        this.minVal = minVal;
        this.maxVal = maxVal;
    }

    @Override
    public Serializable[][] getParameterValues() {
        double maxElemCount = (maxVal - minVal) + 1;
        int numBatches = new Double(Math.ceil(maxElemCount / fetchSize)).intValue();
        Serializable[][] parameters = new Serializable[numBatches][2];
        int batchIndex = 0;
        for (long start = minVal; start <= maxVal; start += fetchSize, batchIndex++) {
            long end = start + fetchSize - 1;
            if (end > maxVal) {
                end = maxVal;
            }
            parameters[batchIndex] = new Long[]{start, end};
        }
        return parameters;
    }
}
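  • NumericBetweenParametersProvider generates the range parameters for a query on a numeric primary key (WHERE id BETWEEN ? AND ?). Its constructor takes fetchSize (the size of each range), the minimum value minVal and the maximum value maxVal; getParameterValues computes numBatches from these values and then fills in the from/to pair for each batch.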
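
To make the batching arithmetic concrete, here is a minimal standalone sketch of the same loop. It does not use Flink: the class name BetweenSplitsSketch and the method compute are made up for illustration, it returns long[][] rather than Serializable[][], and it uses integer ceiling division where the original calls Math.ceil on a double.

```java
import java.util.Arrays;

// Standalone sketch of the from/to range computation; not the Flink class itself.
class BetweenSplitsSketch {

    // Mirrors the loop in NumericBetweenParametersProvider.getParameterValues().
    static long[][] compute(long fetchSize, long minVal, long maxVal) {
        long elemCount = maxVal - minVal + 1;
        // Integer ceiling division: number of ranges needed to cover elemCount values.
        int numBatches = (int) ((elemCount + fetchSize - 1) / fetchSize);
        long[][] ranges = new long[numBatches][2];
        int batchIndex = 0;
        for (long start = minVal; start <= maxVal; start += fetchSize, batchIndex++) {
            // The last range is clipped so it never exceeds maxVal.
            long end = Math.min(start + fetchSize - 1, maxVal);
            ranges[batchIndex] = new long[] {start, end};
        }
        return ranges;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.deepToString(compute(10, 1, 25)));
    }
}
```

With fetchSize 10, minVal 1 and maxVal 25 this yields three ranges, 1 to 10, 11 to 20 and 21 to 25, so only the last range is shorter than fetchSize. Because BETWEEN is inclusive on both ends, the ranges do not overlap.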

JDBCInputFormat

flink-jdbc_2.11-1.8.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java

public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements ResultTypeQueryable<Row> {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class);

    private String username;
    private String password;
    private String drivername;
    private String dbURL;
    private String queryTemplate;
    private int resultSetType;
    private int resultSetConcurrency;
    private RowTypeInfo rowTypeInfo;

    private transient Connection dbConn;
    private transient PreparedStatement statement;
    private transient ResultSet resultSet;
    private int fetchSize;

    private boolean hasNext;
    private Object[][] parameterValues;

    public JDBCInputFormat() {
    }

    @Override
    public RowTypeInfo getProducedType() {
        return rowTypeInfo;
    }

    @Override
    public void configure(Configuration parameters) {
        //do nothing here
    }

    @Override
    public void openInputFormat() {
        //called once per inputFormat (on open)
        try {
            Class.forName(drivername);
            if (username == null) {
                dbConn = DriverManager.getConnection(dbURL);
            } else {
                dbConn = DriverManager.getConnection(dbURL, username, password);
            }
            statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
            if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
                statement.setFetchSize(fetchSize);
            }
        } catch (SQLException se) {
            throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
        } catch (ClassNotFoundException cnfe) {
            throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe);
        }
    }

    @Override
    public void closeInputFormat() {
        //called once per inputFormat (on close)
        try {
            if (statement != null) {
                statement.close();
            }
        } catch (SQLException se) {
            LOG.info("Inputformat Statement couldn't be closed - " + se.getMessage());
        } finally {
            statement = null;
        }

        try {
            if (dbConn != null) {
                dbConn.close();
            }
        } catch (SQLException se) {
            LOG.info("Inputformat couldn't be closed - " + se.getMessage());
        } finally {
            dbConn = null;
        }

        parameterValues = null;
    }

    /**
     * Connects to the source database and executes the query in a <b>parallel
     * fashion</b> if
     * this {@link InputFormat} is built using a parameterized query (i.e. using
     * a {@link PreparedStatement})
     * and a proper {@link ParameterValuesProvider}, in a <b>non-parallel
     * fashion</b> otherwise.
     *
     * @param inputSplit which is ignored if this InputFormat is executed as a
     *        non-parallel source,
     *        a "hook" to the query parameters otherwise (using its
     *        <i>splitNumber</i>)
     * @throws IOException if there's an error during the execution of the query
     */
    @Override
    public void open(InputSplit inputSplit) throws IOException {
        try {
            if (inputSplit != null && parameterValues != null) {
                for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) {
                    Object param = parameterValues[inputSplit.getSplitNumber()][i];
                    if (param instanceof String) {
                        statement.setString(i + 1, (String) param);
                    } else if (param instanceof Long) {
                        statement.setLong(i + 1, (Long) param);
                    } else if (param instanceof Integer) {
                        statement.setInt(i + 1, (Integer) param);
                    } else if (param instanceof Double) {
                        statement.setDouble(i + 1, (Double) param);
                    } else if (param instanceof Boolean) {
                        statement.setBoolean(i + 1, (Boolean) param);
                    } else if (param instanceof Float) {
                        statement.setFloat(i + 1, (Float) param);
                    } else if (param instanceof BigDecimal) {
                        statement.setBigDecimal(i + 1, (BigDecimal) param);
                    } else if (param instanceof Byte) {
                        statement.setByte(i + 1, (Byte) param);
                    } else if (param instanceof Short) {
                        statement.setShort(i + 1, (Short) param);
                    } else if (param instanceof Date) {
                        statement.setDate(i + 1, (Date) param);
                    } else if (param instanceof Time) {
                        statement.setTime(i + 1, (Time) param);
                    } else if (param instanceof Timestamp) {
                        statement.setTimestamp(i + 1, (Timestamp) param);
                    } else if (param instanceof Array) {
                        statement.setArray(i + 1, (Array) param);
                    } else {
                        //extends with other types if needed
                        throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet).");
                    }
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()])));
                }
            }
            resultSet = statement.executeQuery();
            hasNext = resultSet.next();
        } catch (SQLException se) {
            throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
        }
    }

    /**
     * Closes all resources used.
     *
     * @throws IOException Indicates that a resource could not be closed.
     */
    @Override
    public void close() throws IOException {
        if (resultSet == null) {
            return;
        }
        try {
            resultSet.close();
        } catch (SQLException se) {
            LOG.info("Inputformat ResultSet couldn't be closed - " + se.getMessage());
        }
    }

    /**
     * Checks whether all data has been read.
     *
     * @return boolean value indication whether all data has been read.
     * @throws IOException
     */
    @Override
    public boolean reachedEnd() throws IOException {
        return !hasNext;
    }

    /**
     * Stores the next resultSet row in a tuple.
     *
     * @param row row to be reused.
     * @return row containing next {@link Row}
     * @throws java.io.IOException
     */
    @Override
    public Row nextRecord(Row row) throws IOException {
        try {
            if (!hasNext) {
                return null;
            }
            for (int pos = 0; pos < row.getArity(); pos++) {
                row.setField(pos, resultSet.getObject(pos + 1));
            }
            //update hasNext after we've read the record
            hasNext = resultSet.next();
            return row;
        } catch (SQLException se) {
            throw new IOException("Couldn't read data - " + se.getMessage(), se);
        } catch (NullPointerException npe) {
            throw new IOException("Couldn't access resultSet", npe);
        }
    }

    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
        return cachedStatistics;
    }

    @Override
    public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
        if (parameterValues == null) {
            return new GenericInputSplit[]{new GenericInputSplit(0, 1)};
        }
        GenericInputSplit[] ret = new GenericInputSplit[parameterValues.length];
        for (int i = 0; i < ret.length; i++) {
            ret[i] = new GenericInputSplit(i, ret.length);
        }
        return ret;
    }

    @Override
    public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
        return new DefaultInputSplitAssigner(inputSplits);
    }

    @VisibleForTesting
    PreparedStatement getStatement() {
        return statement;
    }

    //......
}
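  • JDBCInputFormat extends RichInputFormat and also implements the ResultTypeQueryable interface.
  • createInputSplits builds a GenericInputSplit array from parameterValues, one split per row; if parameterValues is null it creates a single split whose totalNumberOfPartitions is 1.
  • getInputSplitAssigner wraps the InputSplit array in a DefaultInputSplitAssigner; getStatistics returns its cachedStatistics argument unchanged.
  • openInputFormat obtains the database connection and prepares the statement; closeInputFormat closes the statement and then the database connection.
  • open receives an inputSplit, uses its split number to pick the matching row of parameterValues, binds those values to the statement, and then calls statement.executeQuery() to obtain the resultSet; nextRecord reads one row from the resultSet per call; close closes the resultSet.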
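
The link between a split and its query parameters is just an array index: split number n selects row n of parameterValues, and column i of that row is bound to placeholder i + 1. Below is a minimal sketch of that mapping without a database. The class SplitBindingSketch and its methods are hypothetical, and the type dispatch covers only three of the types that open handles.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of how a split number selects and binds one parameter row.
class SplitBindingSketch {

    // Mirrors createInputSplits: one split per parameter row, or a single split without a matrix.
    static int splitCount(Object[][] parameterValues) {
        return parameterValues == null ? 1 : parameterValues.length;
    }

    // Mirrors part of the instanceof chain in open(): names the setter that would be chosen.
    static String setterFor(Object param) {
        if (param instanceof String) {
            return "setString";
        } else if (param instanceof Long) {
            return "setLong";
        } else if (param instanceof Integer) {
            return "setInt";
        }
        throw new IllegalArgumentException("type " + param.getClass() + " is not handled in this sketch");
    }

    // Describes the PreparedStatement calls that open() would make for the given split.
    static List<String> describeBindings(Object[][] parameterValues, int splitNumber) {
        List<String> calls = new ArrayList<>();
        Object[] row = parameterValues[splitNumber];
        for (int i = 0; i < row.length; i++) {
            // JDBC placeholders are 1-based, hence i + 1.
            calls.add(setterFor(row[i]) + "(" + (i + 1) + ", " + row[i] + ")");
        }
        return calls;
    }

    public static void main(String[] args) {
        Object[][] params = {{1L, 10L}, {11L, 20L}};
        for (int split = 0; split < splitCount(params); split++) {
            System.out.println("split " + split + " -> " + describeBindings(params, split));
        }
    }
}
```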

InputSplit

flink-core-1.8.0-sources.jar!/org/apache/flink/core/io/InputSplit.java

/**
 * This interface must be implemented by all kind of input splits that can be assigned to input formats.
 *
 * <p>Input splits are transferred in serialized form via the messages, so they need to be serializable
 * as defined by {@link java.io.Serializable}.</p>
 */
@Public
public interface InputSplit extends Serializable {

    /**
     * Returns the number of this input split.
     *
     * @return the number of this input split
     */
    int getSplitNumber();
}
  • The InputSplit interface defines getSplitNumber, which returns the number of this input split.

GenericInputSplit

flink-core-1.8.0-sources.jar!/org/apache/flink/core/io/GenericInputSplit.java

/**
 * A generic input split that has only a partition number.
 */
@Public
public class GenericInputSplit implements InputSplit, java.io.Serializable {

    private static final long serialVersionUID = 1L;

    /** The number of this split. */
    private final int partitionNumber;

    /** The total number of partitions */
    private final int totalNumberOfPartitions;

    // --------------------------------------------------------------------------------------------

    /**
     * Creates a generic input split with the given split number.
     *
     * @param partitionNumber The number of the split's partition.
     * @param totalNumberOfPartitions The total number of the splits (partitions).
     */
    public GenericInputSplit(int partitionNumber, int totalNumberOfPartitions) {
        this.partitionNumber = partitionNumber;
        this.totalNumberOfPartitions = totalNumberOfPartitions;
    }

    // --------------------------------------------------------------------------------------------

    @Override
    public int getSplitNumber() {
        return this.partitionNumber;
    }

    public int getTotalNumberOfSplits() {
        return this.totalNumberOfPartitions;
    }

    // --------------------------------------------------------------------------------------------

    @Override
    public int hashCode() {
        return this.partitionNumber ^ this.totalNumberOfPartitions;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof GenericInputSplit) {
            GenericInputSplit other = (GenericInputSplit) obj;
            return this.partitionNumber == other.partitionNumber &&
                    this.totalNumberOfPartitions == other.totalNumberOfPartitions;
        } else {
            return false;
        }
    }

    public String toString() {
        return "GenericSplit (" + this.partitionNumber + '/' + this.totalNumberOfPartitions + ')';
    }
}
  • GenericInputSplit implements the InputSplit interface; its getSplitNumber method returns partitionNumber.

InputSplitAssigner

flink-core-1.8.0-sources.jar!/org/apache/flink/core/io/InputSplitAssigner.java

/**
 * An input split assigner distributes the {@link InputSplit}s among the instances on which a
 * data source exists.
 */
@PublicEvolving
public interface InputSplitAssigner {

    /**
     * Returns the next input split that shall be consumed. The consumer's host is passed as a parameter
     * to allow localized assignments.
     *
     * @param host The host address of split requesting task.
     * @param taskId The id of the split requesting task.
     * @return the next input split to be consumed, or <code>null</code> if no more splits remain.
     */
    InputSplit getNextInputSplit(String host, int taskId);
}
  • The InputSplitAssigner interface defines getNextInputSplit, which takes two parameters, host and taskId, and returns the next inputSplit, or null if no splits remain.

DefaultInputSplitAssigner

flink-core-1.8.0-sources.jar!/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java

/**
 * This is the default implementation of the {@link InputSplitAssigner} interface. The default input split assigner
 * simply returns all input splits of an input vertex in the order they were originally computed.
 */
@Internal
public class DefaultInputSplitAssigner implements InputSplitAssigner {

    /** The logging object used to report information and errors. */
    private static final Logger LOG = LoggerFactory.getLogger(DefaultInputSplitAssigner.class);

    /** The list of all splits */
    private final List<InputSplit> splits = new ArrayList<InputSplit>();

    public DefaultInputSplitAssigner(InputSplit[] splits) {
        Collections.addAll(this.splits, splits);
    }

    public DefaultInputSplitAssigner(Collection<? extends InputSplit> splits) {
        this.splits.addAll(splits);
    }

    @Override
    public InputSplit getNextInputSplit(String host, int taskId) {
        InputSplit next = null;

        // keep the synchronized part short
        synchronized (this.splits) {
            if (this.splits.size() > 0) {
                next = this.splits.remove(this.splits.size() - 1);
            }
        }

        if (LOG.isDebugEnabled()) {
            if (next == null) {
                LOG.debug("No more input splits available");
            } else {
                LOG.debug("Assigning split " + next + " to " + host);
            }
        }
        return next;
    }
}
  • DefaultInputSplitAssigner is the default implementation of InputSplitAssigner; its getNextInputSplit method removes and returns the last element of splits inside a synchronized block.
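
One consequence of removing the last element is that, in the code quoted above, the split added last is the first one handed out. A minimal sketch of that behaviour, using plain integers in place of InputSplit objects (the class LifoAssignerSketch is hypothetical and ignores the host and taskId arguments, as the default assigner does apart from logging):

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the "remove the last element under a lock" hand-out strategy.
class LifoAssignerSketch {

    private final List<Integer> splits = new ArrayList<>();

    // Registers split numbers 0 .. count-1, in that order.
    LifoAssignerSketch(int count) {
        for (int i = 0; i < count; i++) {
            splits.add(i);
        }
    }

    // Returns the next split number, or null when none remain.
    Integer next() {
        // Several tasks may ask concurrently, so the list is guarded by a lock.
        synchronized (splits) {
            return splits.isEmpty() ? null : splits.remove(splits.size() - 1);
        }
    }

    public static void main(String[] args) {
        LifoAssignerSketch assigner = new LifoAssignerSketch(3);
        Integer split;
        while ((split = assigner.next()) != null) {
            System.out.println("assigned split " + split);
        }
    }
}
```

Removing from the tail of an ArrayList avoids shifting the remaining elements, which keeps the synchronized section short.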

InputFormatSourceFunction

flink-streaming-java_2.11-1.8.0-sources.jar!/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java

@Internal
public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {

    private static final long serialVersionUID = 1L;

    private TypeInformation<OUT> typeInfo;
    private transient TypeSerializer<OUT> serializer;

    private InputFormat<OUT, InputSplit> format;

    private transient InputSplitProvider provider;
    private transient Iterator<InputSplit> splitIterator;

    private volatile boolean isRunning = true;

    @SuppressWarnings("unchecked")
    public InputFormatSourceFunction(InputFormat<OUT, ?> format, TypeInformation<OUT> typeInfo) {
        this.format = (InputFormat<OUT, InputSplit>) format;
        this.typeInfo = typeInfo;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void open(Configuration parameters) throws Exception {
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();

        if (format instanceof RichInputFormat) {
            ((RichInputFormat) format).setRuntimeContext(context);
        }
        format.configure(parameters);

        provider = context.getInputSplitProvider();
        serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
        splitIterator = getInputSplits();
        isRunning = splitIterator.hasNext();
    }

    @Override
    public void run(SourceContext<OUT> ctx) throws Exception {
        try {

            Counter completedSplitsCounter = getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
            if (isRunning && format instanceof RichInputFormat) {
                ((RichInputFormat) format).openInputFormat();
            }

            OUT nextElement = serializer.createInstance();
            while (isRunning) {
                format.open(splitIterator.next());

                // for each element we also check if cancel
                // was called by checking the isRunning flag

                while (isRunning && !format.reachedEnd()) {
                    nextElement = format.nextRecord(nextElement);
                    if (nextElement != null) {
                        ctx.collect(nextElement);
                    } else {
                        break;
                    }
                }
                format.close();
                completedSplitsCounter.inc();

                if (isRunning) {
                    isRunning = splitIterator.hasNext();
                }
            }
        } finally {
            format.close();
            if (format instanceof RichInputFormat) {
                ((RichInputFormat) format).closeInputFormat();
            }
            isRunning = false;
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void close() throws Exception {
        format.close();
        if (format instanceof RichInputFormat) {
            ((RichInputFormat) format).closeInputFormat();
        }
    }

    /**
     * Returns the {@code InputFormat}. This is only needed because we need to set the input
     * split assigner on the {@code StreamGraph}.
     */
    public InputFormat<OUT, InputSplit> getFormat() {
        return format;
    }

    private Iterator<InputSplit> getInputSplits() {

        return new Iterator<InputSplit>() {

            private InputSplit nextSplit;

            private boolean exhausted;

            @Override
            public boolean hasNext() {
                if (exhausted) {
                    return false;
                }

                if (nextSplit != null) {
                    return true;
                }

                final InputSplit split;
                try {
                    split = provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader());
                } catch (InputSplitProviderException e) {
                    throw new RuntimeException("Could not retrieve next input split.", e);
                }

                if (split != null) {
                    this.nextSplit = split;
                    return true;
                } else {
                    exhausted = true;
                    return false;
                }
            }

            @Override
            public InputSplit next() {
                if (this.nextSplit == null && !hasNext()) {
                    throw new NoSuchElementException();
                }

                final InputSplit tmp = this.nextSplit;
                this.nextSplit = null;
                return tmp;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}
  • The hasNext() method of InputFormatSourceFunction's splitIterator calls provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader()) to fetch the next input split; the provider here is an RpcInputSplitProvider.
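
The splitIterator is a look-ahead iterator: hasNext() fetches one element from the provider and caches it, and next() hands the cached element out. The sketch below reproduces that pattern in isolation. The class LookAheadSketch is hypothetical, and a java.util.function.Supplier that returns null when exhausted stands in for the InputSplitProvider.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

// Standalone sketch of the look-ahead iterator pattern used by splitIterator.
class LookAheadSketch {

    // Wraps a "pull" source that signals exhaustion by returning null.
    static <T> Iterator<T> fromProvider(Supplier<T> provider) {
        return new Iterator<T>() {
            private T nextItem;        // element fetched by hasNext() but not yet returned
            private boolean exhausted; // set once the provider has returned null

            @Override
            public boolean hasNext() {
                if (exhausted) {
                    return false;
                }
                if (nextItem != null) {
                    return true; // already fetched; do not ask the provider again
                }
                nextItem = provider.get();
                if (nextItem == null) {
                    exhausted = true;
                }
                return nextItem != null;
            }

            @Override
            public T next() {
                if (nextItem == null && !hasNext()) {
                    throw new NoSuchElementException();
                }
                T tmp = nextItem;
                nextItem = null;
                return tmp;
            }
        };
    }

    public static void main(String[] args) {
        Deque<String> remote = new ArrayDeque<>(Arrays.asList("split-2", "split-1", "split-0"));
        // ArrayDeque.poll() returns null when the queue is empty.
        Iterator<String> it = fromProvider(remote::poll);
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
```

Caching the fetched element matters because each provider call is a remote request in the real source function; calling hasNext() twice in a row must not consume two splits.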

InputSplitProvider

flink-runtime_2.11-1.8.0-sources.jar!/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java

/**
 * An input split provider can be successively queried to provide a series of {@link InputSplit} objects a
 * task is supposed to consume in the course of its execution.
 */
@Public
public interface InputSplitProvider {

    /**
     * Requests the next input split to be consumed by the calling task.
     *
     * @param userCodeClassLoader used to deserialize input splits
     * @return the next input split to be consumed by the calling task or <code>null</code> if the
     *         task shall not consume any further input splits.
     * @throws InputSplitProviderException if fetching the next input split fails
     */
    InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException;
}
  • The InputSplitProvider interface defines getNextInputSplit, which each task calls to obtain the next inputSplit it should process.

RpcInputSplitProvider

flink-runtime_2.11-1.8.0-sources.jar!/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java

public class RpcInputSplitProvider implements InputSplitProvider {

    private final JobMasterGateway jobMasterGateway;
    private final JobVertexID jobVertexID;
    private final ExecutionAttemptID executionAttemptID;
    private final Time timeout;

    public RpcInputSplitProvider(
            JobMasterGateway jobMasterGateway,
            JobVertexID jobVertexID,
            ExecutionAttemptID executionAttemptID,
            Time timeout) {
        this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
        this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
        this.executionAttemptID = Preconditions.checkNotNull(executionAttemptID);
        this.timeout = Preconditions.checkNotNull(timeout);
    }

    @Override
    public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
        Preconditions.checkNotNull(userCodeClassLoader);

        CompletableFuture<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(
            jobVertexID,
            executionAttemptID);

        try {
            SerializedInputSplit serializedInputSplit = futureInputSplit.get(timeout.getSize(), timeout.getUnit());

            if (serializedInputSplit.isEmpty()) {
                return null;
            } else {
                return InstantiationUtil.deserializeObject(serializedInputSplit.getInputSplitData(), userCodeClassLoader);
            }
        } catch (Exception e) {
            throw new InputSplitProviderException("Requesting the next input split failed.", e);
        }
    }
}
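  • RpcInputSplitProvider's getNextInputSplit method calls jobMasterGateway.requestNextInputSplit to request the next input split from the JobMaster, then deserializes the returned bytes with the user-code class loader.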

JobMaster.requestNextInputSplit

flink-runtime_2.11-1.8.0-sources.jar!/org/apache/flink/runtime/jobmaster/JobMaster.java

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {

    //......

    @Override
    public CompletableFuture<SerializedInputSplit> requestNextInputSplit(
            final JobVertexID vertexID,
            final ExecutionAttemptID executionAttempt) {

        final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
        if (execution == null) {
            // can happen when JobManager had already unregistered this execution upon on task failure,
            // but TaskManager get some delay to aware of that situation
            if (log.isDebugEnabled()) {
                log.debug("Can not find Execution for attempt {}.", executionAttempt);
            }
            // but we should TaskManager be aware of this
            return FutureUtils.completedExceptionally(new Exception("Can not find Execution for attempt " + executionAttempt));
        }

        final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
        if (vertex == null) {
            log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
            return FutureUtils.completedExceptionally(new Exception("Cannot find execution vertex for vertex ID " + vertexID));
        }

        final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
        if (splitAssigner == null) {
            log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
            return FutureUtils.completedExceptionally(new Exception("No InputSplitAssigner for vertex ID " + vertexID));
        }

        final LogicalSlot slot = execution.getAssignedResource();
        final int taskId = execution.getVertex().getParallelSubtaskIndex();
        final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
        final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);

        if (log.isDebugEnabled()) {
            log.debug("Send next input split {}.", nextInputSplit);
        }

        try {
            final byte[] serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
            return CompletableFuture.completedFuture(new SerializedInputSplit(serializedInputSplit));
        } catch (Exception ex) {
            log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
            IOException reason = new IOException("Could not serialize the next input split of class " +
                    nextInputSplit.getClass() + ".", ex);
            vertex.fail(reason);
            return FutureUtils.completedExceptionally(reason);
        }
    }

    //......
}
  • JobMaster's requestNextInputSplit method calls splitAssigner.getNextInputSplit(host, taskId) to obtain the next input split, serializes it, and returns it to the requesting RpcInputSplitProvider.
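
Between JobMaster and RpcInputSplitProvider the split travels as a byte array, which is why InputSplit extends Serializable. The round trip can be sketched with plain java.io serialization. This is only a stand-in: Flink's InstantiationUtil and SerializedInputSplit are not used here, and the classes SplitRoundTripSketch and DemoSplit are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Standalone sketch: serialize a split on the "master" side, deserialize it on the "task" side.
class SplitRoundTripSketch {

    // Hypothetical split type carrying the same two fields as GenericInputSplit.
    static class DemoSplit implements Serializable {
        private static final long serialVersionUID = 1L;
        final int number;
        final int total;

        DemoSplit(int number, int total) {
            this.number = number;
            this.total = total;
        }
    }

    // What the sending side does: object -> bytes.
    static byte[] serialize(DemoSplit split) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(split);
        } catch (IOException e) {
            throw new IllegalStateException("could not serialize split", e);
        }
        return bytes.toByteArray();
    }

    // What the receiving side does: bytes -> object.
    static DemoSplit deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (DemoSplit) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("could not deserialize split", e);
        }
    }

    public static void main(String[] args) {
        DemoSplit copy = deserialize(serialize(new DemoSplit(2, 3)));
        System.out.println("received split " + copy.number + "/" + copy.total);
    }
}
```

The real provider additionally passes the user-code class loader to the deserializer, so that split classes shipped with the job can be resolved on the task side.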

Summary

  • The ParameterValuesProvider interface defines getParameterValues, which returns the parameters needed to query a table in parallel; each row of the matrix parameterizes one query, so one large SQL query is split into several smaller queries that can run in parallel. It has two implementations: GenericParameterValuesProvider and NumericBetweenParametersProvider.
  • GenericParameterValuesProvider does no work of its own: getParameterValues returns the matrix passed to the constructor. NumericBetweenParametersProvider generates the range parameters for a query on a numeric primary key (WHERE id BETWEEN ? AND ?); its constructor takes fetchSize, minVal and maxVal, and getParameterValues computes numBatches from these values and fills in the from/to pair for each batch.
  • JDBCInputFormat extends RichInputFormat and implements ResultTypeQueryable. createInputSplits builds a GenericInputSplit array from parameterValues, or a single split with totalNumberOfPartitions 1 if parameterValues is null; getInputSplitAssigner wraps the InputSplit array in a DefaultInputSplitAssigner; getStatistics returns its cachedStatistics argument. openInputFormat obtains the database connection and prepares the statement; closeInputFormat closes the statement and the connection. open uses the inputSplit to pick the query parameters from parameterValues, binds them to the statement and calls statement.executeQuery() to obtain the resultSet; nextRecord reads rows from the resultSet; close closes the resultSet.
  • The InputSplit interface defines getSplitNumber, which returns the number of this input split; GenericInputSplit implements it by returning partitionNumber. The InputSplitAssigner interface defines getNextInputSplit, which takes host and taskId and returns the next inputSplit; DefaultInputSplitAssigner, its default implementation, removes and returns the last element of splits inside a synchronized block.
  • The hasNext() method of InputFormatSourceFunction's splitIterator calls provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader()) to fetch the next input split, where the provider is an RpcInputSplitProvider. The InputSplitProvider interface defines getNextInputSplit, which each task calls to obtain the inputSplit it should process. RpcInputSplitProvider's getNextInputSplit requests the next input split from the JobMaster through jobMasterGateway.requestNextInputSplit; JobMaster's requestNextInputSplit obtains it from splitAssigner.getNextInputSplit(host, taskId) and returns it to the requesting RpcInputSplitProvider.

doc

  • InputSplit
  • InputSplitProvider
  • RpcInputSplitProvider
  • InputSplitAssigner
  • DefaultInputSplitAssigner
  • ParameterValuesProvider
  • GenericParameterValuesProvider
  • NumericBetweenParametersProvider