Preface

This article takes a look at Flink's CheckpointScheduler.

CheckpointCoordinatorDeActivator

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java

    /**
     * This actor listens to changes in the JobStatus and activates or deactivates the periodic
     * checkpoint scheduler.
     */
    public class CheckpointCoordinatorDeActivator implements JobStatusListener {

        private final CheckpointCoordinator coordinator;

        public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
            this.coordinator = checkNotNull(coordinator);
        }

        @Override
        public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
            if (newJobStatus == JobStatus.RUNNING) {
                // start the checkpoint scheduler
                coordinator.startCheckpointScheduler();
            } else {
                // anything else should stop the trigger for now
                coordinator.stopCheckpointScheduler();
            }
        }
    }

CheckpointCoordinatorDeActivator implements the JobStatusListener interface. In jobStatusChanges it calls coordinator.startCheckpointScheduler when the new status is RUNNING, and coordinator.stopCheckpointScheduler for any other status.

CheckpointCoordinator.ScheduledTrigger

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java

    /**
     * The checkpoint coordinator coordinates the distributed snapshots of operators and state.
     * It triggers the checkpoint by sending the messages to the relevant tasks and collects the
     * checkpoint acknowledgements. It also collects and maintains the overview of the state handles
     * reported by the tasks that acknowledge the checkpoint.
     */
    public class CheckpointCoordinator {

        /** Map from checkpoint ID to the pending checkpoint */
        private final Map<Long, PendingCheckpoint> pendingCheckpoints;

        /** The number of consecutive failed trigger attempts */
        private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);

        //……

        public void startCheckpointScheduler() {
            synchronized (lock) {
                if (shutdown) {
                    throw new IllegalArgumentException("Checkpoint coordinator is shut down");
                }

                // make sure all prior timers are cancelled
                stopCheckpointScheduler();

                periodicScheduling = true;
                long initialDelay = ThreadLocalRandom.current().nextLong(
                    minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
                currentPeriodicTrigger = timer.scheduleAtFixedRate(
                        new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);
            }
        }

        public void stopCheckpointScheduler() {
            synchronized (lock) {
                triggerRequestQueued = false;
                periodicScheduling = false;

                if (currentPeriodicTrigger != null) {
                    currentPeriodicTrigger.cancel(false);
                    currentPeriodicTrigger = null;
                }

                for (PendingCheckpoint p : pendingCheckpoints.values()) {
                    p.abortError(new Exception("Checkpoint Coordinator is suspending."));
                }

                pendingCheckpoints.clear();
                numUnsuccessfulCheckpointsTriggers.set(0);
            }
        }

        private final class ScheduledTrigger implements Runnable {

            @Override
            public void run() {
                try {
                    triggerCheckpoint(System.currentTimeMillis(), true);
                } catch (Exception e) {
                    LOG.error("Exception while triggering checkpoint for job {}.", job, e);
                }
            }
        }

        //……
    }

CheckpointCoordinator's startCheckpointScheduler first calls stopCheckpointScheduler to cancel any prior timer and abort the pending checkpoints, then uses timer.scheduleAtFixedRate to schedule a new ScheduledTrigger. The initial delay is a random value between minPauseBetweenCheckpointsNanos / 1_000_000 and baseInterval, both in milliseconds (the upper bound baseInterval + 1 passed to nextLong is exclusive).

stopCheckpointScheduler calls PendingCheckpoint.abortError on each entry of pendingCheckpoints, then clears pendingCheckpoints (Map<Long,
PendingCheckpoint>) and resets numUnsuccessfulCheckpointsTriggers (AtomicInteger) to 0.

ScheduledTrigger implements Runnable; its run method simply calls triggerCheckpoint with the isPeriodic argument set to true.

CheckpointCoordinator.triggerCheckpoint

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java

    /**
     * The checkpoint coordinator coordinates the distributed snapshots of operators and state.
     * It triggers the checkpoint by sending the messages to the relevant tasks and collects the
     * checkpoint acknowledgements. It also collects and maintains the overview of the state handles
     * reported by the tasks that acknowledge the checkpoint.
     */
    public class CheckpointCoordinator {

        /** Tasks who need to be sent a message when a checkpoint is started */
        private final ExecutionVertex[] tasksToTrigger;

        /** Tasks who need to acknowledge a checkpoint before it succeeds */
        private final ExecutionVertex[] tasksToWaitFor;

        /** Map from checkpoint ID to the pending checkpoint */
        private final Map<Long, PendingCheckpoint> pendingCheckpoints;

        /** The maximum number of checkpoints that may be in progress at the same time */
        private final int maxConcurrentCheckpointAttempts;

        /** The min time(in ns) to delay after a checkpoint could be triggered. Allows to
         * enforce minimum processing time between checkpoint attempts */
        private final long minPauseBetweenCheckpointsNanos;

        /**
         * Triggers a new standard checkpoint and uses the given timestamp as the checkpoint
         * timestamp.
         *
         * @param timestamp The timestamp for the checkpoint.
         * @param isPeriodic Flag indicating whether this triggered checkpoint is
         * periodic. If this flag is true, but the periodic scheduler is disabled,
         * the checkpoint will be declined.
         * @return <code>true</code> if triggering the checkpoint succeeded.
         */
        public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
            return triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic).isSuccess();
        }

        @VisibleForTesting
        public CheckpointTriggerResult triggerCheckpoint(
                long timestamp,
                CheckpointProperties props,
                @Nullable String externalSavepointLocation,
                boolean isPeriodic) {

            // make some eager pre-checks
            synchronized (lock) {
                // abort if the coordinator has been shutdown in the meantime
                if (shutdown) {
                    return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
                }

                // Don't allow periodic checkpoint if scheduling has been disabled
                if (isPeriodic && !periodicScheduling) {
                    return new CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN);
                }

                // validate whether the checkpoint can be triggered, with respect to the limit of
                // concurrent checkpoints, and the minimum time between checkpoints.
                // these checks are not relevant for savepoints
                if (!props.forceCheckpoint()) {
                    // sanity check: there should never be more than one trigger request queued
                    if (triggerRequestQueued) {
                        LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
                        return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
                    }

                    // if too many checkpoints are currently in progress, we need to mark that a request is queued
                    if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
                        triggerRequestQueued = true;
                        if (currentPeriodicTrigger != null) {
                            currentPeriodicTrigger.cancel(false);
                            currentPeriodicTrigger = null;
                        }
                        return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
                    }

                    // make sure the minimum interval between checkpoints has passed
                    final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
                    final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;

                    if (durationTillNextMillis > 0) {
                        if (currentPeriodicTrigger != null) {
                            currentPeriodicTrigger.cancel(false);
                            currentPeriodicTrigger = null;
                        }
                        // Reassign the new trigger to the currentPeriodicTrigger
                        currentPeriodicTrigger = timer.scheduleAtFixedRate(
                                new ScheduledTrigger(),
                                durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);

                        return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                    }
                }
            }

            // check if all tasks that we need to trigger are running.
            // if not, abort the checkpoint
            Execution[] executions = new Execution[tasksToTrigger.length];
            for (int i = 0; i < tasksToTrigger.length; i++) {
                Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
                if (ee == null) {
                    LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                            tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                            job);
                    return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
                } else if (ee.getState() == ExecutionState.RUNNING) {
                    executions[i] = ee;
                } else {
                    LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
                            tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                            job,
                            ExecutionState.RUNNING,
                            ee.getState());
                    return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
                }
            }

            // next, check if all tasks that need to acknowledge the checkpoint are running.
            // if not, abort the checkpoint
            Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);

            for (ExecutionVertex ev : tasksToWaitFor) {
                Execution ee = ev.getCurrentExecutionAttempt();
                if (ee != null) {
                    ackTasks.put(ee.getAttemptId(), ev);
                } else {
                    LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                            ev.getTaskNameWithSubtaskIndex(),
                            job);
                    return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
                }
            }

            // we will actually trigger this checkpoint!

            // we lock with a special lock to make sure that trigger requests do not overtake each other.
            // this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'
            // may issue blocking operations. Using a different lock than the coordinator-wide lock,
            // we avoid blocking the processing of 'acknowledge/decline' messages during that time.
            synchronized (triggerLock) {

                final CheckpointStorageLocation checkpointStorageLocation;
                final long checkpointID;

                try {
                    // this must happen outside the coordinator-wide lock, because it communicates
                    // with external services (in HA mode) and may block for a while.
                    checkpointID = checkpointIdCounter.getAndIncrement();

                    checkpointStorageLocation = props.isSavepoint() ?
                            checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
                            checkpointStorage.initializeLocationForCheckpoint(checkpointID);
                } catch (Throwable t) {
                    int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
                    LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",
                            job,
                            numUnsuccessful,
                            t);
                    return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
                }

                final PendingCheckpoint checkpoint = new PendingCheckpoint(
                    job,
                    checkpointID,
                    timestamp,
                    ackTasks,
                    props,
                    checkpointStorageLocation,
                    executor);

                if (statsTracker != null) {
                    PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
                        checkpointID,
                        timestamp,
                        props);

                    checkpoint.setStatsCallback(callback);
                }

                // schedule the timer that will clean up the expired checkpoints
                final Runnable canceller = () -> {
                    synchronized (lock) {
                        // only do the work if the checkpoint is not discarded anyways
                        // note that checkpoint completion discards the pending checkpoint object
                        if (!checkpoint.isDiscarded()) {
                            LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);

                            checkpoint.abortExpired();
                            pendingCheckpoints.remove(checkpointID);
                            rememberRecentCheckpointId(checkpointID);

                            triggerQueuedRequests();
                        }
                    }
                };

                try {
                    // re-acquire the coordinator-wide lock
                    synchronized (lock) {
                        // since we released the lock in the meantime, we need to re-check
                        // that the conditions still hold.
                        if (shutdown) {
                            return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
                        } else if (!props.forceCheckpoint()) {
                            if (triggerRequestQueued) {
                                LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
                                return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
                            }

                            if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
                                triggerRequestQueued = true;
                                if (currentPeriodicTrigger != null) {
                                    currentPeriodicTrigger.cancel(false);
                                    currentPeriodicTrigger = null;
                                }
                                return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
                            }

                            // make sure the minimum interval between checkpoints has passed
                            final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
                            final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;

                            if (durationTillNextMillis > 0) {
                                if (currentPeriodicTrigger != null) {
                                    currentPeriodicTrigger.cancel(false);
                                    currentPeriodicTrigger = null;
                                }

                                // Reassign the new trigger to the currentPeriodicTrigger
                                currentPeriodicTrigger = timer.scheduleAtFixedRate(
                                        new ScheduledTrigger(),
                                        durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);

                                return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                            }
                        }

                        LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);

                        pendingCheckpoints.put(checkpointID, checkpoint);

                        ScheduledFuture<?> cancellerHandle = timer.schedule(
                                canceller,
                                checkpointTimeout, TimeUnit.MILLISECONDS);

                        if (!checkpoint.setCancellerHandle(cancellerHandle)) {
                            // checkpoint is already disposed!
                            cancellerHandle.cancel(false);
                        }

                        // trigger the master hooks for the checkpoint
                        final List<MasterState> masterStates = MasterHooks.triggerMasterHooks(masterHooks.values(),
                                checkpointID, timestamp, executor, Time.milliseconds(checkpointTimeout));
                        for (MasterState s : masterStates) {
                            checkpoint.addMasterState(s);
                        }
                    }
                    // end of lock scope

                    final CheckpointOptions checkpointOptions = new CheckpointOptions(
                            props.getCheckpointType(),
                            checkpointStorageLocation.getLocationReference());

                    // send the messages to the tasks that trigger their checkpoint
                    for (Execution execution: executions) {
                        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
                    }

                    numUnsuccessfulCheckpointsTriggers.set(0);
                    return new CheckpointTriggerResult(checkpoint);
                } catch (Throwable t) {
                    // guard the map against concurrent modifications
                    synchronized (lock) {
                        pendingCheckpoints.remove(checkpointID);
                    }

                    int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
                    LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
                            checkpointID, job, numUnsuccessful, t);

                    if (!checkpoint.isDiscarded()) {
                        checkpoint.abortError(new Exception("Failed to trigger checkpoint", t));
                    }

                    try {
                        checkpointStorageLocation.disposeOnFailure();
                    } catch (Throwable t2) {
                        LOG.warn("Cannot dispose failed checkpoint storage location {}", checkpointStorageLocation, t2);
                    }

                    return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
                }

            } // end trigger lock
        }

        //……
    }

After the shutdown and periodic-scheduler checks, and only when the checkpoint is not a forceCheckpoint, the method first declines with ALREADY_QUEUED if a trigger request is already queued. It then checks whether pendingCheckpoints.size() has reached maxConcurrentCheckpointAttempts; if so it marks the request as queued, cancels the periodic trigger and fails fast with CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS). Next it checks whether at least minPauseBetweenCheckpointsNanos has elapsed since lastCheckpointCompletionNanos; if not, it reschedules the ScheduledTrigger for the remaining time and fails fast with CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS), so that checkpoints are not triggered too frequently.

It then checks that every task in tasksToTrigger (the tasks that must be notified when a checkpoint is triggered) has a current execution attempt in state RUNNING; otherwise it fails fast with CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING).

It then checks that every task in tasksToWaitFor (the tasks that must acknowledge the checkpoint before it succeeds) has a current execution attempt. Note that the code only tests the attempt for null here, not for the RUNNING state. Otherwise it fails fast with CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING).

Only after these checks pass does the actual trigger begin. Under triggerLock it allocates a checkpointID and initializes checkpointStorageLocation, returning CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION) if that throws. It then creates the PendingCheckpoint and prepares the canceller (which aborts the checkpoint once checkpointTimeout expires). For non-forced checkpoints it re-acquires the coordinator-wide lock and repeats the shutdown, ALREADY_QUEUED, TOO_MANY_CONCURRENT_CHECKPOINTS and MINIMUM_TIME_BETWEEN_CHECKPOINTS checks.

Finally it calls triggerCheckpoint on each Execution in turn. On success it returns CheckpointTriggerResult(checkpoint); on an exception it returns CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION).

Execution.triggerCheckpoint

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/executiongraph/Execution.java

    public class Execution implements AccessExecution, Archiveable<ArchivedExecution>, LogicalSlot.Payload {

        /**
         * Trigger a new checkpoint on the task of this execution.
         *
         * @param checkpointId of the checkpoint to trigger
         * @param timestamp of the checkpoint to trigger
         * @param checkpointOptions of the checkpoint to trigger
         */
        public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
            final LogicalSlot slot = assignedResource;

            if (slot != null) {
                final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

                taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
            } else {
                LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
                    "no longer running.");
            }
        }

        //……
    }

Execution.triggerCheckpoint mainly calls taskManagerGateway.triggerCheckpoint; the taskManagerGateway here is an RpcTaskManagerGateway.

RpcTaskManagerGateway

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java

    /**
     * Implementation of the {@link TaskManagerGateway} for Flink's RPC system.
     */
    public class RpcTaskManagerGateway implements TaskManagerGateway {

        private final TaskExecutorGateway taskExecutorGateway;

        public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
            taskExecutorGateway.triggerCheckpoint(
                executionAttemptID,
                checkpointId,
                timestamp,
                checkpointOptions);
        }

        //……
    }

RpcTaskManagerGateway's triggerCheckpoint calls taskExecutorGateway.triggerCheckpoint. The taskExecutorGateway here is a proxy backed by AkkaInvocationHandler, which notifies the TaskExecutor over RPC.

TaskExecutor.triggerCheckpoint

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskExecutor.java

    /**
     * TaskExecutor implementation. The task executor is responsible for the execution of multiple
     * {@link Task}.
     */
    public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {

        public CompletableFuture<Acknowledge> triggerCheckpoint(
                ExecutionAttemptID executionAttemptID,
                long checkpointId,
                long checkpointTimestamp,
                CheckpointOptions checkpointOptions) {
            log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

            final Task task = taskSlotTable.getTask(executionAttemptID);

            if (task != null) {
                task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);

                return CompletableFuture.completedFuture(Acknowledge.get());
            } else {
                final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';

                log.debug(message);
                return FutureUtils.completedExceptionally(new CheckpointException(message));
            }
        }

        //……
    }

TaskExecutor's triggerCheckpoint looks up the Task by executionAttemptID and calls task.triggerCheckpointBarrier; if the task is unknown, it completes the returned future exceptionally with a CheckpointException.

Task.triggerCheckpointBarrier

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

    public class Task implements Runnable, TaskActions, CheckpointListener {

        /** The invokable of this task, if initialized. All accesses must copy the reference and
         * check for null, as this field is cleared as part of the disposal logic. */
        @Nullable
        private volatile AbstractInvokable invokable;

        /**
         * Calls the invokable to trigger a checkpoint.
         *
         * @param checkpointID The ID identifying the checkpoint.
         * @param checkpointTimestamp The timestamp associated with the checkpoint.
         * @param checkpointOptions Options for performing this checkpoint.
         */
        public void triggerCheckpointBarrier(
                final long checkpointID,
                long checkpointTimestamp,
                final CheckpointOptions checkpointOptions) {

            final AbstractInvokable invokable = this.invokable;
            final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

            if (executionState == ExecutionState.RUNNING && invokable != null) {

                // build a local closure
                final String taskName = taskNameWithSubtask;
                final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
                    FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();

                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        // set safety net from the task's context for checkpointing thread
                        LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                        FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

                        try {
                            boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                            if (!success) {
                                checkpointResponder.declineCheckpoint(
                                        getJobID(), getExecutionId(), checkpointID,
                                        new CheckpointDeclineTaskNotReadyException(taskName));
                            }
                        } catch (Throwable t) {
                            if (getExecutionState() == ExecutionState.RUNNING) {
                                failExternally(new Exception(
                                    "Error while triggering checkpoint " + checkpointID + " for " +
                                        taskNameWithSubtask, t));
                            } else {
                                LOG.debug("Encountered error while triggering checkpoint {} for " +
                                    "{} ({}) while being not in state running.", checkpointID,
                                    taskNameWithSubtask, executionId, t);
                            }
                        } finally {
                            FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                        }
                    }
                };
                executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
            } else {
                LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);

                // send back a message that we did not do the checkpoint
                checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID, new
                        CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
            }
        }

        //……
    }

Task's triggerCheckpointBarrier first checks whether executionState is RUNNING and invokable is non-null; if either condition fails, it calls checkpointResponder.declineCheckpoint.

If both hold, it calls executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId)).

The runnable calls invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions); the invokable here is a SourceStreamTask.

SourceStreamTask.triggerCheckpoint

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java

    @Internal
    public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
        extends StreamTask<OUT, OP> {

        private volatile boolean externallyInducedCheckpoints;

        @Override
        public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
            if (!externallyInducedCheckpoints) {
                return super.triggerCheckpoint(checkpointMetaData, checkpointOptions);
            } else {
                // we do not trigger checkpoints here, we simply state whether we can trigger them
                synchronized (getCheckpointLock()) {
                    return isRunning();
                }
            }
        }

        //……
    }

SourceStreamTask's triggerCheckpoint first checks externallyInducedCheckpoints. If it is false, it delegates to the parent StreamTask's triggerCheckpoint; otherwise it only reports whether the task is running.

StreamTask.triggerCheckpoint

    @Internal
    public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
            extends AbstractInvokable
            implements AsyncExceptionHandler {

        @Override
        public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
            try {
                // No alignment if we inject a checkpoint
                CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
                        .setBytesBufferedInAlignment(0L)
                        .setAlignmentDurationNanos(0L);

                return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
            } catch (Exception e) {
                // propagate exceptions only if the task is still in "running" state
                if (isRunning) {
                    throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() +
                        " for operator " + getName() + '.', e);
                } else {
                    LOG.debug("Could not perform checkpoint {} for operator {} while the " +
                        "invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e);
                    return false;
                }
            }
        }

        private boolean performCheckpoint(
                CheckpointMetaData checkpointMetaData,
                CheckpointOptions checkpointOptions,
                CheckpointMetrics checkpointMetrics) throws Exception {

            LOG.debug("Starting checkpoint ({}) {} on task {}",
                checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

            synchronized (lock) {
                if (isRunning) {
                    // we can do a checkpoint

                    // All of the following steps happen as an atomic step from the perspective of barriers and
                    // records/watermarks/timers/callbacks.
                    // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
                    // checkpoint alignments

                    // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
                    //           The pre-barrier work should be nothing or minimal in the common case.
                    operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());

                    // Step (2): Send the checkpoint barrier downstream
                    operatorChain.broadcastCheckpointBarrier(
                            checkpointMetaData.getCheckpointId(),
                            checkpointMetaData.getTimestamp(),
                            checkpointOptions);

                    // Step (3): Take the state snapshot.
                    // This should be largely asynchronous, to not
                    // impact progress of the streaming topology
                    checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
                    return true;
                } else {
                    // we cannot perform our checkpoint - let the downstream operators know that they
                    // should not wait for any input from this operator

                    // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
                    // yet be created
                    final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
                    Exception exception = null;

                    for (StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter : streamRecordWriters) {
                        try {
                            streamRecordWriter.broadcastEvent(message);
                        } catch (Exception e) {
                            exception = ExceptionUtils.firstOrSuppressed(
                                new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
                                exception);
                        }
                    }

                    if (exception != null) {
                        throw exception;
                    }

                    return false;
                }
            }
        }

        private void checkpointState(
                CheckpointMetaData checkpointMetaData,
                CheckpointOptions checkpointOptions,
                CheckpointMetrics checkpointMetrics) throws Exception {

            CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
                    checkpointMetaData.getCheckpointId(),
                    checkpointOptions.getTargetLocation());

            CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
                this,
                checkpointMetaData,
                checkpointOptions,
                storage,
                checkpointMetrics);

            checkpointingOperation.executeCheckpointing();
        }

        //……
    }

The main logic of StreamTask's triggerCheckpoint lives in performCheckpoint, which branches on the task's isRunning flag.

When isRunning is true, it proceeds in three steps: first operatorChain.prepareSnapshotPreBarrier, then operatorChain.broadcastCheckpointBarrier, and finally checkpointState, which creates a CheckpointingOperation and calls checkpointingOperation.executeCheckpointing().

When isRunning is false, it calls streamRecordWriter.broadcastEvent(message) on every writer in streamRecordWriters, where message is a CancelCheckpointMarker, and returns false.

OperatorChain.prepareSnapshotPreBarrier

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/OperatorChain.java

    @Internal
    public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {

        public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
            // go forward through the operator chain and tell each operator
            // to prepare the checkpoint
            final StreamOperator<?>[] operators = this.allOperators;
            for (int i = operators.length - 1; i >= 0; --i) {
                final StreamOperator<?> op = operators[i];
                if (op != null) {
                    op.prepareSnapshotPreBarrier(checkpointId);
                }
            }
        }

        //……
    }

OperatorChain's prepareSnapshotPreBarrier iterates over allOperators, from the last index down to 0, and calls prepareSnapshotPreBarrier on each non-null StreamOperator.

OperatorChain.broadcastCheckpointBarrier

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/OperatorChain.java

    @Internal
    public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {

        public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
            CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
            for (RecordWriterOutput<?> streamOutput : streamOutputs) {
                streamOutput.broadcastEvent(barrier);
            }
        }

        //……
    }

OperatorChain's broadcastCheckpointBarrier creates a CheckpointBarrier and calls broadcastEvent with it on each entry of streamOutputs.

CheckpointingOperation.executeCheckpointing

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

    private static final class
            CheckpointingOperation {

        private final StreamTask<?, ?> owner;

        private final CheckpointMetaData checkpointMetaData;
        private final CheckpointOptions checkpointOptions;
        private final CheckpointMetrics checkpointMetrics;
        private final CheckpointStreamFactory storageLocation;

        private final StreamOperator<?>[] allOperators;

        private long startSyncPartNano;
        private long startAsyncPartNano;

        // ------------------------

        private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;

        public CheckpointingOperation(
                StreamTask<?, ?> owner,
                CheckpointMetaData checkpointMetaData,
                CheckpointOptions checkpointOptions,
                CheckpointStreamFactory checkpointStorageLocation,
                CheckpointMetrics checkpointMetrics) {

            this.owner = Preconditions.checkNotNull(owner);
            this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
            this.checkpointOptions = Preconditions.checkNotNull(checkpointOptions);
            this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
            this.storageLocation = Preconditions.checkNotNull(checkpointStorageLocation);
            this.allOperators = owner.operatorChain.getAllOperators();
            this.operatorSnapshotsInProgress = new HashMap<>(allOperators.length);
        }

        public void executeCheckpointing() throws Exception {
            startSyncPartNano = System.nanoTime();

            try {
                for (StreamOperator<?> op : allOperators) {
                    checkpointStreamOperator(op);
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
                        checkpointMetaData.getCheckpointId(), owner.getName());
                }

                startAsyncPartNano = System.nanoTime();

                checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);

                // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
                AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
                    owner,
                    operatorSnapshotsInProgress,
                    checkpointMetaData,
                    checkpointMetrics,
                    startAsyncPartNano);

                owner.cancelables.registerCloseable(asyncCheckpointRunnable);
                owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);

                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} - finished synchronous part of checkpoint {}. " +
                            "Alignment duration: {} ms, snapshot duration {} ms",
                        owner.getName(), checkpointMetaData.getCheckpointId(),
                        checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                        checkpointMetrics.getSyncDurationMillis());
                }
            } catch (Exception ex) {
                // Cleanup to release resources
                for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
                    if (null != operatorSnapshotResult) {
                        try {
                            operatorSnapshotResult.cancel();
                        } catch (Exception e) {
                            LOG.warn("Could not properly cancel an operator snapshot result.", e);
                        }
                    }
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " +
                            "Alignment duration: {} ms, snapshot duration {} ms",
                        owner.getName(), checkpointMetaData.getCheckpointId(),
                        checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                        checkpointMetrics.getSyncDurationMillis());
                }

                owner.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, ex);
            }
        }

        @SuppressWarnings("deprecation")
        private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
            if (null != op) {

                OperatorSnapshotFutures snapshotInProgress = op.snapshotState(
                        checkpointMetaData.getCheckpointId(),
                        checkpointMetaData.getTimestamp(),
                        checkpointOptions,
                        storageLocation);
                operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
            }
        }

        private enum AsyncCheckpointState {
            RUNNING,
            DISCARDED,
            COMPLETED
        }
    }

CheckpointingOperation is defined inside the StreamTask class. Its executeCheckpointing method first runs checkpointStreamOperator on every StreamOperator, which calls the operator's snapshotState method and records the returned OperatorSnapshotFutures. It then creates an AsyncCheckpointRunnable and submits it to asyncOperationsThreadPool to run asynchronously.

AbstractStreamOperator.snapshotState

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java

    @PublicEvolving
    public abstract class AbstractStreamOperator<OUT> implements
            StreamOperator<OUT>, Serializable {

        @Override
        public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
                CheckpointStreamFactory factory) throws Exception {

            KeyGroupRange keyGroupRange = null != keyedStateBackend ?
                    keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

            OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();

            try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
                    checkpointId,
                    timestamp,
                    factory,
                    keyGroupRange,
                    getContainingTask().getCancelables())) {

                snapshotState(snapshotContext);

                snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
                snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

                if (null != operatorStateBackend) {
                    snapshotInProgress.setOperatorStateManagedFuture(
                        operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
                }

                if (null != keyedStateBackend) {
                    snapshotInProgress.setKeyedStateManagedFuture(
                        keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
                }
            } catch (Exception snapshotException) {
                try {
                    snapshotInProgress.cancel();
                } catch (Exception e) {
                    snapshotException.addSuppressed(e);
                }

                String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " +
                    getOperatorName() + ".";

                if (!getContainingTask().isCanceled()) {
                    LOG.info(snapshotFailMessage, snapshotException);
                }
                throw new Exception(snapshotFailMessage, snapshotException);
            }

            return snapshotInProgress;
        }

        /**
         * Stream operators with state, which want to participate in a snapshot need to override this hook method.
     *
     * @param context context that provides information and means required for taking a snapshot
     */
    public void snapshotState(StateSnapshotContext context) throws Exception {
        final KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
        //TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshots
        if (keyedStateBackend instanceof AbstractKeyedStateBackend &&
            ((AbstractKeyedStateBackend<?>) keyedStateBackend).requiresLegacySynchronousTimerSnapshots()) {

            KeyedStateCheckpointOutputStream out;

            try {
                out = context.getRawKeyedOperatorStateOutput();
            } catch (Exception exception) {
                throw new Exception("Could not open raw keyed operator state stream for " +
                    getOperatorName() + '.', exception);
            }

            try {
                KeyGroupsList allKeyGroups = out.getKeyGroupList();
                for (int keyGroupIdx : allKeyGroups) {
                    out.startNewKeyGroup(keyGroupIdx);

                    timeServiceManager.snapshotStateForKeyGroup(
                        new DataOutputViewStreamWrapper(out), keyGroupIdx);
                }
            } catch (Exception exception) {
                throw new Exception("Could not write timer service of " + getOperatorName() +
                    " to checkpoint state stream.", exception);
            } finally {
                try {
                    out.close();
                } catch (Exception closeException) {
                    LOG.warn("Could not close raw keyed operator state stream for {}. This " +
                        "might have prevented deleting some state data.", getOperatorName(), closeException);
                }
            }
        }
    }

    //……
}

The snapshotState(StateSnapshotContext) hook in AbstractStreamOperator only does any work when keyedStateBackend is an AbstractKeyedStateBackend and requiresLegacySynchronousTimerSnapshots returns true. In that case it calls timeServiceManager.snapshotStateForKeyGroup(new DataOutputViewStreamWrapper(out), keyGroupIdx) for each key group. Subclasses may override this hook, however; AbstractUdfStreamOperator is one example.

AbstractUdfStreamOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java

@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
        extends AbstractStreamOperator<OUT>
        implements OutputTypeConfigurable<OUT> {

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
    }

    //……
}

AbstractUdfStreamOperator overrides the snapshotState method of its parent class AbstractStreamOperator and adds a call to StreamingFunctionUtils.snapshotFunctionState.

StreamingFunctionUtils.snapshotFunctionState

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java

@Internal
public final class StreamingFunctionUtils {

    public static void snapshotFunctionState(
            StateSnapshotContext context,
            OperatorStateBackend backend,
            Function userFunction) throws Exception {

        Preconditions.checkNotNull(context);
        Preconditions.checkNotNull(backend);

        while (true) {

            if (trySnapshotFunctionState(context, backend, userFunction)) {
                break;
            }

            // inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function
            if (userFunction instanceof WrappingFunction) {
                userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
            } else {
                break;
            }
        }
    }

    private static boolean trySnapshotFunctionState(
            StateSnapshotContext context,
            OperatorStateBackend backend,
            Function userFunction) throws Exception {

        if (userFunction instanceof
                CheckpointedFunction) {
            ((CheckpointedFunction) userFunction).snapshotState(context);

            return true;
        }

        if (userFunction instanceof ListCheckpointed) {
            @SuppressWarnings("unchecked")
            List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).
                snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());

            ListState<Serializable> listState = backend.
                getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

            listState.clear();

            if (null != partitionableState) {
                try {
                    for (Serializable statePartition : partitionableState) {
                        listState.add(statePartition);
                    }
                } catch (Exception e) {
                    listState.clear();

                    throw new Exception("Could not write partitionable state to operator " +
                        "state backend.", e);
                }
            }

            return true;
        }

        return false;
    }

    //……
}

snapshotFunctionState calls trySnapshotFunctionState, which dispatches on the type of userFunction. If the function implements CheckpointedFunction, CheckpointedFunction.snapshotState is called. If it implements ListCheckpointed, ListCheckpointed.snapshotState is called; note that in this branch the ListState is cleared first, and each element of the returned List is then added to it with ListState.add. If neither interface matches and the function is a WrappingFunction, it is unwrapped and the check is repeated on the inner function.

Summary

Flink's CheckpointCoordinatorDeActivator calls CheckpointCoordinator.startCheckpointScheduler when the job status becomes RUNNING, and CheckpointCoordinator.stopCheckpointScheduler for any other status.

startCheckpointScheduler mainly registers the ScheduledTrigger task, whose run method calls triggerCheckpoint. Before actually triggering a checkpoint, triggerCheckpoint runs a series of checks and fails fast if any of them is not satisfied. Possible reasons include CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS, CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS, and NOT_ALL_REQUIRED_TASKS_RUNNING. If the checks pass, it iterates over the executions and calls Execution.triggerCheckpoint on each, which uses taskManagerGateway.triggerCheckpoint to invoke the triggerCheckpoint method of TaskExecutor over RPC.

TaskExecutor.triggerCheckpoint mainly calls Task.triggerCheckpointBarrier, which asynchronously executes a runnable whose run method calls invokable.triggerCheckpoint. The invokable here is a SourceStreamTask, which mostly delegates to the triggerCheckpoint method of its parent class StreamTask; the core logic of that method is in performCheckpoint. When isRunning is true, performCheckpoint proceeds in three steps: first operatorChain.prepareSnapshotPreBarrier, then operatorChain.broadcastCheckpointBarrier, and finally checkpointState, which creates a CheckpointingOperation and calls checkpointingOperation.executeCheckpointing().

CheckpointingOperation.executeCheckpointing runs checkpointStreamOperator on every StreamOperator, and checkpointStreamOperator calls the operator's snapshotState method. The snapshotState(StateSnapshotContext) hook in AbstractStreamOperator only does work when keyedStateBackend is an AbstractKeyedStateBackend and requiresLegacySynchronousTimerSnapshots is true.

AbstractUdfStreamOperator overrides the snapshotState method of its parent class AbstractStreamOperator and adds a call to StreamingFunctionUtils.snapshotFunctionState, which calls the method matching the type of userFunction: CheckpointedFunction.snapshotState if the function implements CheckpointedFunction, or ListCheckpointed.snapshotState if it implements ListCheckpointed.

doc

Working with State
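
To make the dispatch order in StreamingFunctionUtils.snapshotFunctionState concrete, here is a minimal, self-contained sketch of the same unwrap-and-retry loop. It does not use Flink at all. The types Function, SelfSnapshotting, ListSnapshotting, and Wrapper are hypothetical stand-ins for Flink's Function, CheckpointedFunction, ListCheckpointed, and WrappingFunction. A plain List<String> stands in for ListState, and the method returns a boolean (the Flink method returns void) only so that the outcome can be observed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: every type below is a hypothetical stand-in, not a Flink class.
class SnapshotDispatchSketch {

    /** Stand-in for Flink's Function marker interface. */
    interface Function {}

    /** Stand-in for CheckpointedFunction: the function writes its own state. */
    interface SelfSnapshotting extends Function {
        void snapshotState(long checkpointId);
    }

    /** Stand-in for ListCheckpointed: the function returns the state to store. */
    interface ListSnapshotting extends Function {
        List<String> snapshotState(long checkpointId);
    }

    /** Stand-in for WrappingFunction: holds an inner function. */
    static final class Wrapper implements Function {
        final Function wrapped;

        Wrapper(Function wrapped) {
            this.wrapped = wrapped;
        }
    }

    /** Tries the function itself, then unwraps layer by layer until one matches. */
    static boolean snapshotFunctionState(long checkpointId, List<String> listState, Function fn) {
        while (true) {
            if (trySnapshot(checkpointId, listState, fn)) {
                return true;
            }
            if (fn instanceof Wrapper) {
                fn = ((Wrapper) fn).wrapped; // unwrap and retry on the inner function
            } else {
                return false; // neither interface is implemented at any level
            }
        }
    }

    private static boolean trySnapshot(long checkpointId, List<String> listState, Function fn) {
        if (fn instanceof SelfSnapshotting) {
            ((SelfSnapshotting) fn).snapshotState(checkpointId);
            return true;
        }
        if (fn instanceof ListSnapshotting) {
            List<String> parts = ((ListSnapshotting) fn).snapshotState(checkpointId);
            listState.clear(); // old contents are dropped before the new ones are added
            if (parts != null) {
                listState.addAll(parts);
            }
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> listState = new ArrayList<>(Arrays.asList("stale"));
        ListSnapshotting counter = id -> Arrays.asList("count=3", "checkpoint=" + id);

        // The function is wrapped twice; the loop unwraps until it finds it.
        boolean handled = snapshotFunctionState(7L, listState, new Wrapper(new Wrapper(counter)));
        System.out.println(handled + " " + listState); // prints "true [count=3, checkpoint=7]"
    }
}
```

The "stale" entry disappears because the list is cleared before the new elements are added, which mirrors the clear-then-add behavior noted for the ListCheckpointed branch.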