A look at Flink's RichParallelSourceFunction


This article takes a look at Flink's RichParallelSourceFunction.
RichParallelSourceFunction
/**
* Base class for implementing a parallel data source. Upon execution, the runtime will
* execute as many parallel instances of this function function as configured parallelism
* of the source.
*
* <p>The data source has access to context information (such as the number of parallel
* instances of the source, and which parallel instance the current instance is)
* via {@link #getRuntimeContext()}. It also provides additional life-cycle methods
* ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.</p>
*
* @param <OUT> The type of the records produced by this source.
*/
@Public
public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
implements ParallelSourceFunction<OUT> {

private static final long serialVersionUID = 1L;
}
RichParallelSourceFunction implements the ParallelSourceFunction interface and extends AbstractRichFunction.
ParallelSourceFunction
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
/**
* A stream data source that is executed in parallel. Upon execution, the runtime will
* execute as many parallel instances of this function function as configured parallelism
* of the source.
*
* <p>This interface acts only as a marker to tell the system that this source may
* be executed in parallel. When different parallel instances are required to perform
* different tasks, use the {@link RichParallelSourceFunction} to get access to the runtime
* context, which reveals information like the number of parallel tasks, and which parallel
* task the current instance is.
*
* @param <OUT> The type of the records produced by this source.
*/
@Public
public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
}
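ParallelSourceFunction extends the SourceFunction interface without declaring any additional methods. It is purely a marker interface whose name expresses the intent: a stream data source that may be executed in parallel.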
AbstractRichFunction
flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/functions/AbstractRichFunction.java
/**
* An abstract stub implementation for rich user-defined functions.
* Rich functions have additional methods for initialization ({@link #open(Configuration)}) and
* teardown ({@link #close()}), as well as access to their runtime execution context via
* {@link #getRuntimeContext()}.
*/
@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable {

private static final long serialVersionUID = 1L;

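// --------------------------------------------------------------------------------------------
// Runtime context access
// --------------------------------------------------------------------------------------------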

private transient RuntimeContext runtimeContext;

@Override
public void setRuntimeContext(RuntimeContext t) {
this.runtimeContext = t;
}

@Override
public RuntimeContext getRuntimeContext() {
if (this.runtimeContext != null) {
return this.runtimeContext;
} else {
throw new IllegalStateException("The runtime context has not been initialized.");
}
}

@Override
public IterationRuntimeContext getIterationRuntimeContext() {
if (this.runtimeContext == null) {
throw new IllegalStateException("The runtime context has not been initialized.");
} else if (this.runtimeContext instanceof IterationRuntimeContext) {
return (IterationRuntimeContext) this.runtimeContext;
} else {
throw new IllegalStateException("This stub is not part of an iteration step function.");
}
}

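// --------------------------------------------------------------------------------------------
// Default life cycle methods
// --------------------------------------------------------------------------------------------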

@Override
public void open(Configuration parameters) throws Exception {}

@Override
public void close() throws Exception {}
}
AbstractRichFunction mainly implements the setRuntimeContext, getRuntimeContext, and getIterationRuntimeContext methods of the RichFunction interface; its open and close methods are no-ops.
RuntimeContext
flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/functions/RuntimeContext.java
/**
* A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
* of the function will have a context through which it can access static contextual information (such as
* the current parallelism) and other constructs like accumulators and broadcast variables.
*
* <p>A function can, during runtime, obtain the RuntimeContext via a call to
* {@link AbstractRichFunction#getRuntimeContext()}.
*/
@Public
public interface RuntimeContext {

/**
* Returns the name of the task in which the UDF runs, as assigned during plan construction.
*
* @return The name of the task in which the UDF runs.
*/
String getTaskName();

/**
* Returns the metric group for this parallel subtask.
*
* @return The metric group for this parallel subtask.
*/
@PublicEvolving
MetricGroup getMetricGroup();

/**
* Gets the parallelism with which the parallel task runs.
*
* @return The parallelism with which the parallel task runs.
*/
int getNumberOfParallelSubtasks();

/**
* Gets the number of max-parallelism with which the parallel task runs.
*
* @return The max-parallelism with which the parallel task runs.
*/
@PublicEvolving
int getMaxNumberOfParallelSubtasks();

/**
* Gets the number of this parallel subtask. The numbering starts from 0 and goes up to
* parallelism-1 (parallelism as returned by {@link #getNumberOfParallelSubtasks()}).
*
* @return The index of the parallel subtask.
*/
int getIndexOfThisSubtask();

/**
* Gets the attempt number of this parallel subtask. First attempt is numbered 0.
*
* @return Attempt number of the subtask.
*/
int getAttemptNumber();

/**
* Returns the name of the task, appended with the subtask indicator, such as "MyTask (3/6)",
* where 3 would be ({@link #getIndexOfThisSubtask()} + 1), and 6 would be
* {@link #getNumberOfParallelSubtasks()}.
*
* @return The name of the task, with subtask indicator.
*/
String getTaskNameWithSubtasks();

/**
* Returns the {@link org.apache.flink.api.common.ExecutionConfig} for the currently executing
* job.
*/
ExecutionConfig getExecutionConfig();

//…….
}
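RuntimeContext declares many methods. The two of interest here are getNumberOfParallelSubtasks, which returns the parallelism of the current task, and getIndexOfThisSubtask, which returns the index of the current parallel subtask. With these two values you can write a ParallelSourceFunction whose instances run in parallel yet emit disjoint data.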
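To make the "parallel but non-overlapping" idea concrete, here is a minimal sketch in plain Java with no Flink dependency. StridedRangeSource and its emit method are hypothetical names used only for illustration. In a real RichParallelSourceFunction the two constructor arguments would presumably be read from getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks(), and the values would be handed to the source context rather than collected in a list.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of one parallel source instance; not a Flink class. */
final class StridedRangeSource {
    private final int subtaskIndex; // assumption: would come from getIndexOfThisSubtask()
    private final int parallelism;  // assumption: would come from getNumberOfParallelSubtasks()

    StridedRangeSource(int subtaskIndex, int parallelism) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("invalid subtask index or parallelism");
        }
        this.subtaskIndex = subtaskIndex;
        this.parallelism = parallelism;
    }

    /** Returns every value v in [0, total) with v % parallelism == subtaskIndex. */
    List<Long> emit(long total) {
        List<Long> out = new ArrayList<>();
        for (long v = subtaskIndex; v < total; v += parallelism) {
            out.add(v);
        }
        return out;
    }

    public static void main(String[] args) {
        int parallelism = 3;
        // Each "subtask" gets a disjoint slice; together they cover 0..9 exactly once.
        for (int i = 0; i < parallelism; i++) {
            System.out.println("subtask " + i + " -> " + new StridedRangeSource(i, parallelism).emit(10));
        }
    }
}
```

Striding by the parallelism is only one possible split; contiguous ranges per subtask would work equally well, as long as the slices are derived from the subtask index alone.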
JobMaster.startJobExecution
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/jobmaster/JobMaster.java
private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
validateRunsInMainThread();

checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

if (Objects.equals(getFencingToken(), newJobMasterId)) {
log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);

return Acknowledge.get();
}

setNewFencingToken(newJobMasterId);

startJobMasterServices();

log.info("Starting execution of job {} ({})", jobGraph.getName(), jobGraph.getJobID());

resetAndScheduleExecutionGraph();

return Acknowledge.get();
}

private void resetAndScheduleExecutionGraph() throws Exception {
validateRunsInMainThread();

final CompletableFuture<Void> executionGraphAssignedFuture;

if (executionGraph.getState() == JobStatus.CREATED) {
executionGraphAssignedFuture = CompletableFuture.completedFuture(null);
} else {
suspendAndClearExecutionGraphFields(new FlinkException("ExecutionGraph is being reset in order to be rescheduled."));
final JobManagerJobMetricGroup newJobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
final ExecutionGraph newExecutionGraph = createAndRestoreExecutionGraph(newJobManagerJobMetricGroup);

executionGraphAssignedFuture = executionGraph.getTerminationFuture().handleAsync(
(JobStatus ignored, Throwable throwable) -> {
assignExecutionGraph(newExecutionGraph, newJobManagerJobMetricGroup);
return null;
},
getMainThreadExecutor());
}

executionGraphAssignedFuture.thenRun(this::scheduleExecutionGraph);
}

private void scheduleExecutionGraph() {
checkState(jobStatusListener == null);
// register self as job status change listener
jobStatusListener = new JobManagerJobStatusListener();
executionGraph.registerJobStatusListener(jobStatusListener);

try {
executionGraph.scheduleForExecution();
}
catch (Throwable t) {
executionGraph.failGlobal(t);
}
}
startJobExecution calls resetAndScheduleExecutionGraph, which in turn chains scheduleExecutionGraph behind the graph-assignment future via thenRun; scheduleExecutionGraph then calls executionGraph.scheduleForExecution() to schedule the job.
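The future chaining in resetAndScheduleExecutionGraph can be modeled in plain Java as follows. This is a sketch under assumptions, not Flink code: ResetAndScheduleSketch, its Status enum, and the events list are hypothetical, and handle is used in place of handleAsync so that the demo runs deterministically on one thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Plain-Java model of the chaining in resetAndScheduleExecutionGraph; not Flink code. */
final class ResetAndScheduleSketch {
    enum Status { CREATED, RUNNING, FAILED }

    /** Records what happened, in order, so the sequencing is visible. */
    final List<String> events = new ArrayList<>();

    CompletableFuture<Void> resetAndSchedule(Status current, CompletableFuture<Status> terminationFuture) {
        final CompletableFuture<Void> graphAssigned;
        if (current == Status.CREATED) {
            // Fresh graph: nothing to tear down, so the "assigned" future is already complete.
            graphAssigned = CompletableFuture.completedFuture(null);
        } else {
            // The old graph must terminate first; only then is the new graph assigned.
            // The original uses handleAsync on the main-thread executor.
            graphAssigned = terminationFuture.handle((ignored, throwable) -> {
                events.add("assign new graph");
                return null;
            });
        }
        // Scheduling is chained behind the assignment in both branches.
        return graphAssigned.thenRun(() -> events.add("schedule"));
    }

    public static void main(String[] args) {
        ResetAndScheduleSketch fresh = new ResetAndScheduleSketch();
        fresh.resetAndSchedule(Status.CREATED, new CompletableFuture<>()).join();
        System.out.println(fresh.events);

        ResetAndScheduleSketch restarted = new ResetAndScheduleSketch();
        CompletableFuture<Status> termination = new CompletableFuture<>();
        CompletableFuture<Void> done = restarted.resetAndSchedule(Status.RUNNING, termination);
        System.out.println(restarted.events); // still empty: the old graph has not terminated
        termination.complete(Status.FAILED);
        done.join();
        System.out.println(restarted.events);
    }
}
```

The point of the pattern is that scheduling never runs before the graph it should schedule has been put in place, regardless of which branch was taken.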
ExecutionGraph.scheduleForExecution
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
public void scheduleForExecution() throws JobException {

final long currentGlobalModVersion = globalModVersion;

if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {

final CompletableFuture<Void> newSchedulingFuture;

switch (scheduleMode) {

case LAZY_FROM_SOURCES:
newSchedulingFuture = scheduleLazy(slotProvider);
break;

case EAGER:
newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);
break;

default:
throw new JobException("Schedule mode is invalid.");
}

if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) {
schedulingFuture = newSchedulingFuture;

newSchedulingFuture.whenCompleteAsync(
(Void ignored, Throwable throwable) -> {
if (throwable != null && !(throwable instanceof CancellationException)) {
// only fail if the scheduling future was not canceled
failGlobal(ExceptionUtils.stripCompletionException(throwable));
}
},
futureExecutor);
} else {
newSchedulingFuture.cancel(false);
}
}
else {
throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
}
}
This path uses EAGER mode, so scheduleEager is called.
ExecutionGraph.scheduleEager
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
/**
*
*
* @param slotProvider The resource provider from which the slots are allocated
* @param timeout The maximum time that the deployment may take, before a
* TimeoutException is thrown.
* @returns Future which is completed once the {@link ExecutionGraph} has been scheduled.
* The future can also be completed exceptionally if an error happened.
*/
private CompletableFuture<Void> scheduleEager(SlotProvider slotProvider, final Time timeout) {
checkState(state == JobStatus.RUNNING, "job is not running currently");

// Important: reserve all the space we need up front.
// that way we do not have any operation that can fail between allocating the slots
// and adding them to the list. If we had a failure in between there, that would
// cause the slots to get lost
final boolean queued = allowQueuedScheduling;

// collecting all the slots may resize and fail in that operation without slots getting lost
final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());

// allocate the slots (obtain all their futures
for (ExecutionJobVertex ejv : getVerticesTopologically()) {
// these calls are not blocking, they only return futures
Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(
slotProvider,
queued,
LocationPreferenceConstraint.ALL,
allocationTimeout);

allAllocationFutures.addAll(allocationFutures);
}

// this future is complete once all slot futures are complete.
// the future fails once one slot future fails.
final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);

final CompletableFuture<Void> currentSchedulingFuture = allAllocationsFuture
.thenAccept(
(Collection<Execution> executionsToDeploy) -> {
for (Execution execution : executionsToDeploy) {
try {
execution.deploy();
} catch (Throwable t) {
throw new CompletionException(
new FlinkException(
String.format("Could not deploy execution %s.", execution),
t));
}
}
})
// Generate a more specific failure message for the eager scheduling
.exceptionally(
(Throwable throwable) -> {
final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
final Throwable resultThrowable;

if (strippedThrowable instanceof TimeoutException) {
int numTotal = allAllocationsFuture.getNumFuturesTotal();
int numComplete = allAllocationsFuture.getNumFuturesCompleted();
String message = "Could not allocate all requires slots within timeout of " +
timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete;

resultThrowable = new NoResourceAvailableException(message);
} else {
resultThrowable = strippedThrowable;
}

throw new CompletionException(resultThrowable);
});

return currentSchedulingFuture;
}

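scheduleEager first calls getVerticesTopologically to obtain the ExecutionJobVertex instances
It then calls ExecutionJobVertex.allocateResourcesForAll on each of them to allocate resources, yielding a Collection<CompletableFuture<Execution>>
Finally it waits on this batch of futures via FutureUtils.combineAll(allAllocationFutures) and then calls execution.deploy() on each execution in turn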
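The three steps above can be sketched with the JDK's CompletableFuture alone. The names here are hypothetical: EagerScheduleSketch stands in for the scheduling logic, strings stand in for Execution objects, CompletableFuture.allOf stands in for FutureUtils.combineAll, and IllegalStateException stands in for NoResourceAvailableException. One difference worth noting: allOf only completes after every input future has completed, whereas the comment in the original says the combined future fails as soon as one slot future fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;

/** Plain-Java model of eager scheduling: wait for all slots, then deploy everything. */
final class EagerScheduleSketch {

    static CompletableFuture<Void> scheduleEager(
            List<CompletableFuture<String>> allocationFutures, List<String> deployed) {
        final int total = allocationFutures.size();
        // Stand-in for FutureUtils.combineAll: done once all slot futures are done.
        CompletableFuture<Void> all =
                CompletableFuture.allOf(allocationFutures.toArray(new CompletableFuture<?>[0]));
        return all
            .thenAccept(ignored -> {
                // Every slot is available, so "deploy" each execution in order.
                for (CompletableFuture<String> f : allocationFutures) {
                    deployed.add(f.join());
                }
            })
            .exceptionally(throwable -> {
                // Strip the CompletionException wrapper added by dependent stages.
                Throwable cause = (throwable instanceof CompletionException && throwable.getCause() != null)
                        ? throwable.getCause() : throwable;
                if (cause instanceof TimeoutException) {
                    long allocated = allocationFutures.stream()
                            .filter(f -> f.isDone() && !f.isCompletedExceptionally()).count();
                    // Replace the bare timeout with a more specific message.
                    throw new CompletionException(new IllegalStateException(
                            "Slots required: " + total + ", slots allocated: " + allocated));
                }
                throw new CompletionException(cause);
            });
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> slots = new ArrayList<>();
        slots.add(CompletableFuture.completedFuture("exec-0"));
        slots.add(CompletableFuture.completedFuture("exec-1"));
        List<String> deployed = new ArrayList<>();
        scheduleEager(slots, deployed).join();
        System.out.println("deployed: " + deployed);
    }
}
```

Nothing is deployed unless every slot request succeeds, which is the defining property of the eager mode shown in the source.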

ExecutionJobVertex.allocateResourcesForAll
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
/**
* Acquires a slot for all the execution vertices of this ExecutionJobVertex. The method returns
* pairs of the slots and execution attempts, to ease correlation between vertices and execution
* attempts.
*
* <p>If this method throws an exception, it makes sure to release all so far requested slots.
*
* @param resourceProvider The resource provider from whom the slots are requested.
* @param queued if the allocation can be queued
* @param locationPreferenceConstraint constraint for the location preferences
* @param allocationTimeout timeout for allocating the individual slots
*/
public Collection<CompletableFuture<Execution>> allocateResourcesForAll(
SlotProvider resourceProvider,
boolean queued,
LocationPreferenceConstraint locationPreferenceConstraint,
Time allocationTimeout) {
final ExecutionVertex[] vertices = this.taskVertices;
final CompletableFuture<Execution>[] slots = new CompletableFuture[vertices.length];

// try to acquire a slot future for each execution.
// we store the execution with the future just to be on the safe side
for (int i = 0; i < vertices.length; i++) {
// allocate the next slot (future)
final Execution exec = vertices[i].getCurrentExecutionAttempt();
final CompletableFuture<Execution> allocationFuture = exec.allocateAndAssignSlotForExecution(
resourceProvider,
queued,
locationPreferenceConstraint,
allocationTimeout);
slots[i] = allocationFuture;
}

// all good, we acquired all slots
return Arrays.asList(slots);
}
Here exec.allocateAndAssignSlotForExecution is called once for each entry in the ExecutionJobVertex's taskVertices, so the effective parallelism is determined by the length of taskVertices.
Execution.deploy
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/Execution.java
/**
* Deploys the execution to the previously assigned resource.
*
* @throws JobException if the execution cannot be deployed to the assigned resource
*/
public void deploy() throws JobException {
final LogicalSlot slot = assignedResource;

checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");

//……

try {

//……

final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
attemptId,
slot,
taskRestore,
attemptNumber);

// null taskRestore to let it be GC'ed
taskRestore = null;

final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, rpcTimeout);

submitResultFuture.whenCompleteAsync(
(ack, failure) -> {
// only respond to the failure case
if (failure != null) {
if (failure instanceof TimeoutException) {
String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';

markFailed(new Exception(
"Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
+ ") not responding after a rpcTimeout of " + rpcTimeout, failure));
} else {
markFailed(failure);
}
}
},
executor);
}
catch (Throwable t) {
markFailed(t);
ExceptionUtils.rethrow(t);
}
}
Execution.deploy creates a TaskDeploymentDescriptor and submits it via taskManagerGateway.submitTask; this in turn causes the TaskExecutor to trigger the Task's run method.
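The failure handling around the submit call follows a common pattern: react only to the failure case, and give a timeout a more descriptive message. Below is a minimal plain-Java sketch of that pattern. DeploySketch, its failureCause field, and the string acknowledgement are hypothetical stand-ins, and whenComplete is used instead of whenCompleteAsync to keep the demo single-threaded.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

/** Plain-Java model of the submit-and-watch step in Execution.deploy; not Flink code. */
final class DeploySketch {
    /** Set by markFailed; stays null while the deployment is healthy. */
    volatile Throwable failureCause;

    void markFailed(Throwable t) {
        failureCause = t;
    }

    void deploy(String taskName, CompletableFuture<String> submitResultFuture) {
        submitResultFuture.whenComplete((ack, failure) -> {
            // Only respond to the failure case; a successful acknowledgement needs no action.
            if (failure != null) {
                if (failure instanceof TimeoutException) {
                    // Wrap the timeout so the message names the task that could not be deployed.
                    markFailed(new Exception(
                            "Cannot deploy task " + taskName + " - TaskManager not responding", failure));
                } else {
                    markFailed(failure);
                }
            }
        });
    }

    public static void main(String[] args) {
        DeploySketch sketch = new DeploySketch();
        CompletableFuture<String> submit = new CompletableFuture<>();
        sketch.deploy("MyTask (1/4)", submit);
        submit.completeExceptionally(new TimeoutException("rpc timeout"));
        System.out.println(sketch.failureCause.getMessage());
    }
}
```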
ExecutionJobVertex
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
private final ExecutionVertex[] taskVertices;

public ExecutionJobVertex(
ExecutionGraph graph,
JobVertex jobVertex,
int defaultParallelism,
Time timeout,
long initialGlobalModVersion,
long createTimestamp) throws JobException {

if (graph == null || jobVertex == null) {
throw new NullPointerException();
}

this.graph = graph;
this.jobVertex = jobVertex;

int vertexParallelism = jobVertex.getParallelism();
int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;

final int configuredMaxParallelism = jobVertex.getMaxParallelism();

this.maxParallelismConfigured = (VALUE_NOT_SET != configuredMaxParallelism);

// if no max parallelism was configured by the user, we calculate and set a default
setMaxParallelismInternal(maxParallelismConfigured ?
configuredMaxParallelism : KeyGroupRangeAssignment.computeDefaultMaxParallelism(numTaskVertices));

// verify that our parallelism is not higher than the maximum parallelism
if (numTaskVertices > maxParallelism) {
throw new JobException(
String.format("Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.",
jobVertex.getName(),
numTaskVertices,
maxParallelism));
}

this.parallelism = numTaskVertices;

this.serializedTaskInformation = null;

this.taskVertices = new ExecutionVertex[numTaskVertices];
//……

// create all task vertices
for (int i = 0; i < numTaskVertices; i++) {
ExecutionVertex vertex = new ExecutionVertex(
this,
i,
producedDataSets,
timeout,
initialGlobalModVersion,
createTimestamp,
maxPriorAttemptsHistoryLength);

this.taskVertices[i] = vertex;
}

//……
}

taskVertices is an ExecutionVertex[] whose size is determined by numTaskVertices
The ExecutionJobVertex constructor first checks whether jobVertex.getParallelism() is greater than 0 (it usually is); if so, numTaskVertices takes that value, otherwise it falls back to defaultParallelism (when ExecutionGraph.attachJobGraph creates the ExecutionJobVertex, the defaultParallelism it passes is 1)
It then creates numTaskVertices ExecutionVertex instances one by one and stores them in the taskVertices array
The jobVertex's parallelism is set by StreamingJobGraphGenerator in its createJobVertex method from streamNode.getParallelism() (provided that value is greater than 0)
If a streamNode's parallelism is not set explicitly, it defaults to the parallelism of the StreamExecutionEnvironment (see the DataStreamSource constructor, the DataStream.transform method, and the DataStreamSink constructor; DataStreamSource resets the parallelism of a source that is not of a parallel type to 1); for a LocalEnvironment, the default is Runtime.getRuntime().availableProcessors()
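The resolution rules in the list above can be condensed into two small functions. This is a plain-Java sketch under assumptions: ParallelismSketch, Source, and ParallelSource are hypothetical stand-ins for the Flink types, IllegalArgumentException replaces JobException, and sourceParallelism covers only the case where the user has not set a parallelism on the source.

```java
/** Plain-Java model of how the parallelism of a source is resolved; not Flink code. */
final class ParallelismSketch {
    /** Marker interfaces standing in for SourceFunction and ParallelSourceFunction. */
    interface Source {}
    interface ParallelSource extends Source {}

    /** Stream-node level: a source not marked as parallel is forced to parallelism 1. */
    static int sourceParallelism(Source source, int environmentParallelism) {
        return (source instanceof ParallelSource) ? environmentParallelism : 1;
    }

    /** Vertex level: mirrors the numTaskVertices computation in the ExecutionJobVertex constructor. */
    static int numTaskVertices(int vertexParallelism, int defaultParallelism, int maxParallelism) {
        int n = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;
        if (n > maxParallelism) {
            throw new IllegalArgumentException(
                    "parallelism (" + n + ") is higher than the max parallelism (" + maxParallelism + ")");
        }
        return n;
    }

    public static void main(String[] args) {
        int envParallelism = Runtime.getRuntime().availableProcessors(); // LocalEnvironment-style default
        Source parallel = new ParallelSource() {};
        Source nonParallel = new Source() {};
        System.out.println("parallel source:     " + sourceParallelism(parallel, envParallelism));
        System.out.println("non-parallel source: " + sourceParallelism(nonParallel, envParallelism));
        System.out.println("unset vertex:        " + numTaskVertices(-1, 1, 128));
    }
}
```

The instanceof check is the reason the otherwise empty ParallelSourceFunction interface matters: implementing it is what allows a source to keep a parallelism greater than 1.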

Summary

RichParallelSourceFunction implements the ParallelSourceFunction interface and extends AbstractRichFunction. AbstractRichFunction mainly implements the setRuntimeContext, getRuntimeContext, and getIterationRuntimeContext methods of RichFunction. The getNumberOfParallelSubtasks method (the parallelism of the current task) and the getIndexOfThisSubtask method (the index of the current parallel subtask) declared by RuntimeContext make it easy to write a ParallelSourceFunction whose instances run in parallel without emitting duplicate data
In startJobExecution, the JobMaster calls executionGraph.scheduleForExecution() to schedule the job. Along the way, ExecutionJobVertex.allocateResourcesForAll allocates resources and yields a Collection<CompletableFuture<Execution>>, after which execution.deploy() is called on each execution. Execution.deploy creates a TaskDeploymentDescriptor and submits it via taskManagerGateway.submitTask, which in turn causes the TaskExecutor to trigger the Task's run method
ExecutionJobVertex.allocateResourcesForAll calls exec.allocateAndAssignSlotForExecution once per entry in the ExecutionJobVertex's taskVertices, so the effective parallelism is determined by taskVertices. taskVertices is initialized in the ExecutionJobVertex constructor: if jobVertex.getParallelism() is greater than 0 that value is used, otherwise defaultParallelism, which is 1. The jobVertex's parallelism is set by StreamingJobGraphGenerator in createJobVertex from streamNode.getParallelism() (provided that value is greater than 0); if the user has not set one, it defaults to the parallelism of the StreamExecutionEnvironment, which for a LocalEnvironment defaults to Runtime.getRuntime().availableProcessors()
