序本文主要研究一下flink的KvStateRegistryGatewayKvStateRegistryGatewayflink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/KvStateRegistryGateway.javapublic interface KvStateRegistryGateway { /** * Notifies that queryable state has been registered. * * @param jobId identifying the job for which to register a key value state * @param jobVertexId JobVertexID the KvState instance belongs to. * @param keyGroupRange Key group range the KvState instance belongs to. * @param registrationName Name under which the KvState has been registered. * @param kvStateId ID of the registered KvState instance. * @param kvStateServerAddress Server address where to find the KvState instance. * @return Future acknowledge if the key-value state has been registered / CompletableFuture<Acknowledge> notifyKvStateRegistered( final JobID jobId, final JobVertexID jobVertexId, final KeyGroupRange keyGroupRange, final String registrationName, final KvStateID kvStateId, final InetSocketAddress kvStateServerAddress); /* * Notifies that queryable state has been unregistered. * * @param jobId identifying the job for which to unregister a key value state * @param jobVertexId JobVertexID the KvState instance belongs to. * @param keyGroupRange Key group index the KvState instance belongs to. * @param registrationName Name under which the KvState has been registered. * @return Future acknowledge if the key-value state has been unregistered / CompletableFuture<Acknowledge> notifyKvStateUnregistered( final JobID jobId, final JobVertexID jobVertexId, final KeyGroupRange keyGroupRange, final String registrationName);}KvStateRegistryGateway接口定义了notifyKvStateRegistered、notifyKvStateUnregistered两个方法;JobMaster实现了这两个方法JobMasterflink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.javapublic class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway { /* Default names for Flink’s distributed components. / public static final String JOB_MANAGER_NAME = “jobmanager”; public static final String ARCHIVE_NAME = “archive”; // ———————————————————————— private final JobMasterConfiguration jobMasterConfiguration; private final ResourceID resourceId; private final JobGraph jobGraph; private final Time rpcTimeout; private final HighAvailabilityServices highAvailabilityServices; private final BlobServer blobServer; private final JobManagerJobMetricGroupFactory jobMetricGroupFactory; private final HeartbeatManager<AccumulatorReport, Void> taskManagerHeartbeatManager; private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager; private final ScheduledExecutorService scheduledExecutorService; private final OnCompletionActions jobCompletionActions; private final FatalErrorHandler fatalErrorHandler; private final ClassLoader userCodeLoader; private final SlotPool slotPool; private final SlotPoolGateway slotPoolGateway; private final RestartStrategy restartStrategy; // ——— BackPressure ——– private final BackPressureStatsTracker backPressureStatsTracker; // ——— ResourceManager ——– private final LeaderRetrievalService resourceManagerLeaderRetriever; // ——— TaskManagers ——– private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers; // ——– Mutable fields ——— private ExecutionGraph executionGraph; @Nullable private JobManagerJobStatusListener jobStatusListener; @Nullable private JobManagerJobMetricGroup jobManagerJobMetricGroup; @Nullable private String lastInternalSavepoint; @Nullable private ResourceManagerAddress resourceManagerAddress; @Nullable private ResourceManagerConnection resourceManagerConnection; @Nullable private EstablishedResourceManagerConnection establishedResourceManagerConnection; //…… @Override public CompletableFuture<Acknowledge> notifyKvStateRegistered( final JobID jobId, final JobVertexID jobVertexId, final KeyGroupRange keyGroupRange, final String registrationName, final KvStateID kvStateId, final InetSocketAddress kvStateServerAddress) { if (jobGraph.getJobID().equals(jobId)) { if (log.isDebugEnabled()) { log.debug(“Key value state registered for job {} under name {}.”, jobGraph.getJobID(), registrationName); } try { executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered( jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress); return CompletableFuture.completedFuture(Acknowledge.get()); } catch (Exception e) { log.error(“Failed to notify KvStateRegistry about registration {}.”, registrationName); return FutureUtils.completedExceptionally(e); } } else { if (log.isDebugEnabled()) { log.debug(“Notification about key-value state registration for unknown job {} received.”, jobId); } return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); } } @Override public CompletableFuture<Acknowledge> notifyKvStateUnregistered( JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName) { if (jobGraph.getJobID().equals(jobId)) { if (log.isDebugEnabled()) { log.debug(“Key value state unregistered for job {} under name {}.”, jobGraph.getJobID(), registrationName); } try { executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered( jobVertexId, keyGroupRange, registrationName); return CompletableFuture.completedFuture(Acknowledge.get()); } catch (Exception e) { log.error(“Failed to notify KvStateRegistry about registration {}.”, registrationName, e); return FutureUtils.completedExceptionally(e); } } else { if (log.isDebugEnabled()) { log.debug(“Notification about key-value state deregistration for unknown job {} received.”, jobId); } return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); } } //……}JobMaster的notifyKvStateRegistered方法主要是触发executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered;notifyKvStateUnregistered方法主要是触发executionGraph.getKvStateLocationRegistry().notifyKvStateUnregisteredKvStateLocationRegistryflink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.javapublic class KvStateLocationRegistry { /* JobID this coordinator belongs to. / private final JobID jobId; /* Job vertices for determining parallelism per key. / private final Map<JobVertexID, ExecutionJobVertex> jobVertices; /* * Location info keyed by registration name. The name needs to be unique * per JobID, i.e. two operators cannot register KvState with the same * name. / private final Map<String, KvStateLocation> lookupTable = new HashMap<>(); /* * Creates the registry for the job. * * @param jobId JobID this coordinator belongs to. * @param jobVertices Job vertices map of all vertices of this job. / public KvStateLocationRegistry(JobID jobId, Map<JobVertexID, ExecutionJobVertex> jobVertices) { this.jobId = Preconditions.checkNotNull(jobId, “JobID”); this.jobVertices = Preconditions.checkNotNull(jobVertices, “Job vertices”); } /* * Returns the {@link KvStateLocation} for the registered KvState instance * or <code>null</code> if no location information is available. * * @param registrationName Name under which the KvState instance is registered. * @return Location information or <code>null</code>. / public KvStateLocation getKvStateLocation(String registrationName) { return lookupTable.get(registrationName); } /* * Notifies the registry about a registered KvState instance. * * @param jobVertexId JobVertexID the KvState instance belongs to * @param keyGroupRange Key group range the KvState instance belongs to * @param registrationName Name under which the KvState has been registered * @param kvStateId ID of the registered KvState instance * @param kvStateServerAddress Server address where to find the KvState instance * * @throws IllegalArgumentException If JobVertexID does not belong to job * @throws IllegalArgumentException If state has been registered with same * name by another operator. * @throws IndexOutOfBoundsException If key group index is out of bounds. / public void notifyKvStateRegistered( JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId, InetSocketAddress kvStateServerAddress) { KvStateLocation location = lookupTable.get(registrationName); if (location == null) { // First registration for this operator, create the location info ExecutionJobVertex vertex = jobVertices.get(jobVertexId); if (vertex != null) { int parallelism = vertex.getMaxParallelism(); location = new KvStateLocation(jobId, jobVertexId, parallelism, registrationName); lookupTable.put(registrationName, location); } else { throw new IllegalArgumentException(“Unknown JobVertexID " + jobVertexId); } } // Duplicated name if vertex IDs don’t match if (!location.getJobVertexId().equals(jobVertexId)) { IllegalStateException duplicate = new IllegalStateException( “Registration name clash. KvState with name ‘” + registrationName + “’ has already been registered by another operator (” + location.getJobVertexId() + “).”); ExecutionJobVertex vertex = jobVertices.get(jobVertexId); if (vertex != null) { vertex.fail(new SuppressRestartsException(duplicate)); } throw duplicate; } location.registerKvState(keyGroupRange, kvStateId, kvStateServerAddress); } /* * Notifies the registry about an unregistered KvState instance. * * @param jobVertexId JobVertexID the KvState instance belongs to * @param keyGroupRange Key group index the KvState instance belongs to * @param registrationName Name under which the KvState has been registered * @throws IllegalArgumentException If another operator registered the state instance * @throws IllegalArgumentException If the registration name is not known */ public void notifyKvStateUnregistered( JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName) { KvStateLocation location = lookupTable.get(registrationName); if (location != null) { // Duplicate name if vertex IDs don’t match if (!location.getJobVertexId().equals(jobVertexId)) { throw new IllegalArgumentException(“Another operator (” + location.getJobVertexId() + “) registered the KvState " + “under ‘” + registrationName + “’.”); } location.unregisterKvState(keyGroupRange); if (location.getNumRegisteredKeyGroups() == 0) { lookupTable.remove(registrationName); } } else { throw new IllegalArgumentException(“Unknown registration name ‘” + registrationName + “’. " + “Probably registration/unregistration race.”); } }}KvStateLocationRegistry的构造器要求传入jobId及jobVertices;它有一个属性为lookupTable,存储了registrationName与KvStateLocation的映射关系notifyKvStateRegistered方法在lookupTable查找不到对应的KvStateLocation的时候会创建一个KvStateLocation并存放入lookupTable,最后调用location.registerKvState方法notifyKvStateUnregistere方法在lookupTable查找对应KvStateLocation的时候会触发location.unregisterKvState,然后将该KvStateLocation从lookupTable中移除小结KvStateRegistryGateway接口定义了notifyKvStateRegistered、notifyKvStateUnregistered两个方法;JobMaster实现了这两个方法JobMaster的notifyKvStateRegistered方法主要是触发executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered;notifyKvStateUnregistered方法主要是触发executionGraph.getKvStateLocationRegistry().notifyKvStateUnregisteredKvStateLocationRegistry的构造器要求传入jobId及jobVertices;它有一个属性为lookupTable,存储了registrationName与KvStateLocation的映射关系;notifyKvStateRegistered方法在lookupTable查找不到对应的KvStateLocation的时候会创建一个KvStateLocation并存放入lookupTable,最后调用location.registerKvState方法;notifyKvStateUnregistere方法在lookupTable查找对应KvStateLocation的时候会触发location.unregisterKvState,然后将该KvStateLocation从lookupTable中移除docKvStateRegistryGateway