
A look at Flink's KvStateRegistryGateway


This article takes a look at Flink's KvStateRegistryGateway.
KvStateRegistryGateway
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/KvStateRegistryGateway.java
public interface KvStateRegistryGateway {

    /**
     * Notifies that queryable state has been registered.
     *
     * @param jobId identifying the job for which to register a key value state
     * @param jobVertexId JobVertexID the KvState instance belongs to.
     * @param keyGroupRange Key group range the KvState instance belongs to.
     * @param registrationName Name under which the KvState has been registered.
     * @param kvStateId ID of the registered KvState instance.
     * @param kvStateServerAddress Server address where to find the KvState instance.
     * @return Future acknowledge if the key-value state has been registered
     */
    CompletableFuture<Acknowledge> notifyKvStateRegistered(
        final JobID jobId,
        final JobVertexID jobVertexId,
        final KeyGroupRange keyGroupRange,
        final String registrationName,
        final KvStateID kvStateId,
        final InetSocketAddress kvStateServerAddress);

    /**
     * Notifies that queryable state has been unregistered.
     *
     * @param jobId identifying the job for which to unregister a key value state
     * @param jobVertexId JobVertexID the KvState instance belongs to.
     * @param keyGroupRange Key group index the KvState instance belongs to.
     * @param registrationName Name under which the KvState has been registered.
     * @return Future acknowledge if the key-value state has been unregistered
     */
    CompletableFuture<Acknowledge> notifyKvStateUnregistered(
        final JobID jobId,
        final JobVertexID jobVertexId,
        final KeyGroupRange keyGroupRange,
        final String registrationName);
}
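The KvStateRegistryGateway interface defines two methods, notifyKvStateRegistered and notifyKvStateUnregistered. JobMaster implements both of them (the class itself declares JobMasterGateway in its implements clause).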
JobMaster
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway {

    /** Default names for Flink's distributed components. */
    public static final String JOB_MANAGER_NAME = "jobmanager";
    public static final String ARCHIVE_NAME = "archive";

    // ------------------------------------------------------------------------

    private final JobMasterConfiguration jobMasterConfiguration;

    private final ResourceID resourceId;

    private final JobGraph jobGraph;

    private final Time rpcTimeout;

    private final HighAvailabilityServices highAvailabilityServices;

    private final BlobServer blobServer;

    private final JobManagerJobMetricGroupFactory jobMetricGroupFactory;

    private final HeartbeatManager<AccumulatorReport, Void> taskManagerHeartbeatManager;

    private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;

    private final ScheduledExecutorService scheduledExecutorService;

    private final OnCompletionActions jobCompletionActions;

    private final FatalErrorHandler fatalErrorHandler;

    private final ClassLoader userCodeLoader;

    private final SlotPool slotPool;

    private final SlotPoolGateway slotPoolGateway;

    private final RestartStrategy restartStrategy;

    // --------- BackPressure --------

    private final BackPressureStatsTracker backPressureStatsTracker;

    // --------- ResourceManager --------

    private final LeaderRetrievalService resourceManagerLeaderRetriever;

    // --------- TaskManagers --------

    private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;

    // -------- Mutable fields ---------

    private ExecutionGraph executionGraph;

    @Nullable
    private JobManagerJobStatusListener jobStatusListener;

    @Nullable
    private JobManagerJobMetricGroup jobManagerJobMetricGroup;

    @Nullable
    private String lastInternalSavepoint;

    @Nullable
    private ResourceManagerAddress resourceManagerAddress;

    @Nullable
    private ResourceManagerConnection resourceManagerConnection;

    @Nullable
    private EstablishedResourceManagerConnection establishedResourceManagerConnection;

    // ...

    @Override
    public CompletableFuture<Acknowledge> notifyKvStateRegistered(
            final JobID jobId,
            final JobVertexID jobVertexId,
            final KeyGroupRange keyGroupRange,
            final String registrationName,
            final KvStateID kvStateId,
            final InetSocketAddress kvStateServerAddress) {
        if (jobGraph.getJobID().equals(jobId)) {
            if (log.isDebugEnabled()) {
                log.debug("Key value state registered for job {} under name {}.",
                    jobGraph.getJobID(), registrationName);
            }

            try {
                executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
                    jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);

                return CompletableFuture.completedFuture(Acknowledge.get());
            } catch (Exception e) {
                log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
                return FutureUtils.completedExceptionally(e);
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Notification about key-value state registration for unknown job {} received.", jobId);
            }
            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
        }
    }

    @Override
    public CompletableFuture<Acknowledge> notifyKvStateUnregistered(
            JobID jobId,
            JobVertexID jobVertexId,
            KeyGroupRange keyGroupRange,
            String registrationName) {
        if (jobGraph.getJobID().equals(jobId)) {
            if (log.isDebugEnabled()) {
                log.debug("Key value state unregistered for job {} under name {}.",
                    jobGraph.getJobID(), registrationName);
            }

            try {
                executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
                    jobVertexId, keyGroupRange, registrationName);

                return CompletableFuture.completedFuture(Acknowledge.get());
            } catch (Exception e) {
                log.error("Failed to notify KvStateRegistry about registration {}.", registrationName, e);
                return FutureUtils.completedExceptionally(e);
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Notification about key-value state deregistration for unknown job {} received.", jobId);
            }
            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
        }
    }

    // ...
}
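JobMaster's notifyKvStateRegistered first checks that the given jobId matches its own job and then delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered; notifyKvStateUnregistered does the same with executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered. In both methods a mismatched jobId yields a future completed exceptionally with FlinkJobNotFoundException, and an exception thrown by the registry is likewise returned through the future rather than thrown.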
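The check-then-delegate pattern above can be sketched without any Flink dependency. This is only a minimal illustration under stated assumptions, not the JobMaster code: JobMasterSketch, Ack and Registry are hypothetical stand-ins for JobMaster, Acknowledge and KvStateLocationRegistry, a plain String replaces JobID, and IllegalArgumentException replaces FlinkJobNotFoundException.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for JobMaster; not Flink code.
class JobMasterSketch {

    /** Stand-in for Flink's Acknowledge. */
    enum Ack { INSTANCE }

    /** Stand-in for the registry the notification is forwarded to. */
    interface Registry {
        void notifyRegistered(String registrationName);
    }

    private final String ownJobId;
    private final Registry registry;

    JobMasterSketch(String ownJobId, Registry registry) {
        this.ownJobId = ownJobId;
        this.registry = registry;
    }

    CompletableFuture<Ack> notifyKvStateRegistered(String jobId, String registrationName) {
        if (!ownJobId.equals(jobId)) {
            // Notification for a job this endpoint does not own.
            return failed(new IllegalArgumentException("Unknown job " + jobId));
        }
        try {
            registry.notifyRegistered(registrationName);
            return CompletableFuture.completedFuture(Ack.INSTANCE);
        } catch (Exception e) {
            // Registry failures are reported through the future, not thrown.
            return failed(e);
        }
    }

    /** Builds a future that is already completed exceptionally. */
    private static <T> CompletableFuture<T> failed(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    public static void main(String[] args) {
        JobMasterSketch master = new JobMasterSketch("job-1", name -> { });
        // Matching job ID: the future completes normally.
        System.out.println(master.notifyKvStateRegistered("job-1", "my-state").join());
        // Foreign job ID: the future is completed exceptionally.
        System.out.println(master.notifyKvStateRegistered("job-2", "my-state").isCompletedExceptionally());
    }
}
```

Because every outcome travels through the returned future, a caller never has to wrap the call in a try/catch; it only inspects or chains the future.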
KvStateLocationRegistry
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
public class KvStateLocationRegistry {

    /** JobID this coordinator belongs to. */
    private final JobID jobId;

    /** Job vertices for determining parallelism per key. */
    private final Map<JobVertexID, ExecutionJobVertex> jobVertices;

    /**
     * Location info keyed by registration name. The name needs to be unique
     * per JobID, i.e. two operators cannot register KvState with the same
     * name.
     */
    private final Map<String, KvStateLocation> lookupTable = new HashMap<>();

    /**
     * Creates the registry for the job.
     *
     * @param jobId JobID this coordinator belongs to.
     * @param jobVertices Job vertices map of all vertices of this job.
     */
    public KvStateLocationRegistry(JobID jobId, Map<JobVertexID, ExecutionJobVertex> jobVertices) {
        this.jobId = Preconditions.checkNotNull(jobId, "JobID");
        this.jobVertices = Preconditions.checkNotNull(jobVertices, "Job vertices");
    }

    /**
     * Returns the {@link KvStateLocation} for the registered KvState instance
     * or <code>null</code> if no location information is available.
     *
     * @param registrationName Name under which the KvState instance is registered.
     * @return Location information or <code>null</code>.
     */
    public KvStateLocation getKvStateLocation(String registrationName) {
        return lookupTable.get(registrationName);
    }

    /**
     * Notifies the registry about a registered KvState instance.
     *
     * @param jobVertexId JobVertexID the KvState instance belongs to
     * @param keyGroupRange Key group range the KvState instance belongs to
     * @param registrationName Name under which the KvState has been registered
     * @param kvStateId ID of the registered KvState instance
     * @param kvStateServerAddress Server address where to find the KvState instance
     *
     * @throws IllegalArgumentException If JobVertexID does not belong to job
     * @throws IllegalArgumentException If state has been registered with same
     * name by another operator.
     * @throws IndexOutOfBoundsException If key group index is out of bounds.
     */
    public void notifyKvStateRegistered(
            JobVertexID jobVertexId,
            KeyGroupRange keyGroupRange,
            String registrationName,
            KvStateID kvStateId,
            InetSocketAddress kvStateServerAddress) {

        KvStateLocation location = lookupTable.get(registrationName);

        if (location == null) {
            // First registration for this operator, create the location info
            ExecutionJobVertex vertex = jobVertices.get(jobVertexId);

            if (vertex != null) {
                int parallelism = vertex.getMaxParallelism();
                location = new KvStateLocation(jobId, jobVertexId, parallelism, registrationName);
                lookupTable.put(registrationName, location);
            } else {
                throw new IllegalArgumentException("Unknown JobVertexID " + jobVertexId);
            }
        }

        // Duplicated name if vertex IDs don't match
        if (!location.getJobVertexId().equals(jobVertexId)) {
            IllegalStateException duplicate = new IllegalStateException(
                "Registration name clash. KvState with name '" + registrationName +
                "' has already been registered by another operator (" +
                location.getJobVertexId() + ").");

            ExecutionJobVertex vertex = jobVertices.get(jobVertexId);
            if (vertex != null) {
                vertex.fail(new SuppressRestartsException(duplicate));
            }

            throw duplicate;
        }
        location.registerKvState(keyGroupRange, kvStateId, kvStateServerAddress);
    }

    /**
     * Notifies the registry about an unregistered KvState instance.
     *
     * @param jobVertexId JobVertexID the KvState instance belongs to
     * @param keyGroupRange Key group index the KvState instance belongs to
     * @param registrationName Name under which the KvState has been registered
     * @throws IllegalArgumentException If another operator registered the state instance
     * @throws IllegalArgumentException If the registration name is not known
     */
    public void notifyKvStateUnregistered(
            JobVertexID jobVertexId,
            KeyGroupRange keyGroupRange,
            String registrationName) {

        KvStateLocation location = lookupTable.get(registrationName);

        if (location != null) {
            // Duplicate name if vertex IDs don't match
            if (!location.getJobVertexId().equals(jobVertexId)) {
                throw new IllegalArgumentException("Another operator (" +
                    location.getJobVertexId() + ") registered the KvState " +
                    "under '" + registrationName + "'.");
            }

            location.unregisterKvState(keyGroupRange);

            if (location.getNumRegisteredKeyGroups() == 0) {
                lookupTable.remove(registrationName);
            }
        } else {
            throw new IllegalArgumentException("Unknown registration name '" +
                registrationName + "'. " + "Probably registration/unregistration race.");
        }
    }

}

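The KvStateLocationRegistry constructor takes a jobId and jobVertices; its lookupTable field maps each registrationName to a KvStateLocation.
notifyKvStateRegistered creates a KvStateLocation (sized by the vertex's getMaxParallelism()) and puts it into lookupTable when no entry exists for the registrationName. If the name is already held by a different JobVertexID, it fails the vertex and throws an IllegalStateException; otherwise it finishes by calling location.registerKvState.
notifyKvStateUnregistered looks up the KvStateLocation in lookupTable and calls location.unregisterKvState on it; the entry is removed from lookupTable only once getNumRegisteredKeyGroups() reaches 0. An unknown registrationName or a mismatched JobVertexID results in an IllegalArgumentException.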
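The lookup-table bookkeeping described above can be modelled in a few lines. The following is a rough sketch under simplifying assumptions, not the Flink implementation: LocationRegistrySketch and Location are hypothetical names, JobVertexID is replaced by a String, a key group range is reduced to a plain count of key groups, and the check against the job's known vertices is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the lookup-table bookkeeping; not Flink code.
class LocationRegistrySketch {

    /** Stand-in for KvStateLocation: the owning vertex plus a count of registered key groups. */
    private static final class Location {
        final String jobVertexId;
        int numRegisteredKeyGroups;

        Location(String jobVertexId) {
            this.jobVertexId = jobVertexId;
        }
    }

    private final Map<String, Location> lookupTable = new HashMap<>();

    void notifyRegistered(String jobVertexId, String registrationName, int numKeyGroups) {
        // The first registration under this name creates the entry.
        Location location =
            lookupTable.computeIfAbsent(registrationName, name -> new Location(jobVertexId));
        // A name may only be used by the vertex that registered it first.
        if (!location.jobVertexId.equals(jobVertexId)) {
            throw new IllegalStateException("Registration name clash for '" + registrationName + "'");
        }
        location.numRegisteredKeyGroups += numKeyGroups;
    }

    void notifyUnregistered(String jobVertexId, String registrationName, int numKeyGroups) {
        Location location = lookupTable.get(registrationName);
        if (location == null) {
            throw new IllegalArgumentException("Unknown registration name '" + registrationName + "'");
        }
        if (!location.jobVertexId.equals(jobVertexId)) {
            throw new IllegalArgumentException("Another operator registered '" + registrationName + "'");
        }
        if (numKeyGroups > location.numRegisteredKeyGroups) {
            throw new IllegalArgumentException("More key groups unregistered than registered");
        }
        location.numRegisteredKeyGroups -= numKeyGroups;
        // The entry disappears only when the last key group is gone.
        if (location.numRegisteredKeyGroups == 0) {
            lookupTable.remove(registrationName);
        }
    }

    boolean isRegistered(String registrationName) {
        return lookupTable.containsKey(registrationName);
    }

    public static void main(String[] args) {
        LocationRegistrySketch registry = new LocationRegistrySketch();
        registry.notifyRegistered("vertex-a", "my-state", 2);
        registry.notifyRegistered("vertex-a", "my-state", 2);
        registry.notifyUnregistered("vertex-a", "my-state", 2);
        System.out.println(registry.isRegistered("my-state")); // still registered: 2 key groups remain
        registry.notifyUnregistered("vertex-a", "my-state", 2);
        System.out.println(registry.isRegistered("my-state")); // entry removed
    }
}
```

The point of the sketch is the asymmetry between the two methods: registration creates the entry lazily on first use, while unregistration removes it only after every parallel subtask has withdrawn its key groups.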

Summary

The KvStateRegistryGateway interface defines two methods, notifyKvStateRegistered and notifyKvStateUnregistered; JobMaster implements both.
JobMaster's notifyKvStateRegistered delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered, and notifyKvStateUnregistered delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered, in each case only after the jobId has been checked against the job's own ID.
The KvStateLocationRegistry constructor takes a jobId and jobVertices, and its lookupTable field maps each registrationName to a KvStateLocation. notifyKvStateRegistered creates the KvStateLocation and stores it in lookupTable if none exists yet, then calls location.registerKvState. notifyKvStateUnregistered calls location.unregisterKvState on the entry it finds and removes that entry from lookupTable once no key groups remain registered.

doc
KvStateRegistryGateway
