Preface
This article takes a look at Flink's KvStateRegistryGateway.
KvStateRegistryGateway
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/KvStateRegistryGateway.java
public interface KvStateRegistryGateway {
/**
* Notifies that queryable state has been registered.
*
* @param jobId identifying the job for which to register a key value state
* @param jobVertexId JobVertexID the KvState instance belongs to.
* @param keyGroupRange Key group range the KvState instance belongs to.
* @param registrationName Name under which the KvState has been registered.
* @param kvStateId ID of the registered KvState instance.
* @param kvStateServerAddress Server address where to find the KvState instance.
* @return Future acknowledge if the key-value state has been registered
*/
CompletableFuture<Acknowledge> notifyKvStateRegistered(
final JobID jobId,
final JobVertexID jobVertexId,
final KeyGroupRange keyGroupRange,
final String registrationName,
final KvStateID kvStateId,
final InetSocketAddress kvStateServerAddress);
/**
* Notifies that queryable state has been unregistered.
*
* @param jobId identifying the job for which to unregister a key value state
* @param jobVertexId JobVertexID the KvState instance belongs to.
* @param keyGroupRange Key group index the KvState instance belongs to.
* @param registrationName Name under which the KvState has been registered.
* @return Future acknowledge if the key-value state has been unregistered
*/
CompletableFuture<Acknowledge> notifyKvStateUnregistered(
final JobID jobId,
final JobVertexID jobVertexId,
final KeyGroupRange keyGroupRange,
final String registrationName);
}
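The KvStateRegistryGateway interface defines two methods, notifyKvStateRegistered and notifyKvStateUnregistered. JobMaster implements both of them (it implements JobMasterGateway, which extends KvStateRegistryGateway).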
JobMaster
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway {
/** Default names for Flink's distributed components. */
public static final String JOB_MANAGER_NAME = "jobmanager";
public static final String ARCHIVE_NAME = "archive";
// ------------------------------------------------------------------------
private final JobMasterConfiguration jobMasterConfiguration;
private final ResourceID resourceId;
private final JobGraph jobGraph;
private final Time rpcTimeout;
private final HighAvailabilityServices highAvailabilityServices;
private final BlobServer blobServer;
private final JobManagerJobMetricGroupFactory jobMetricGroupFactory;
private final HeartbeatManager<AccumulatorReport, Void> taskManagerHeartbeatManager;
private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
private final ScheduledExecutorService scheduledExecutorService;
private final OnCompletionActions jobCompletionActions;
private final FatalErrorHandler fatalErrorHandler;
private final ClassLoader userCodeLoader;
private final SlotPool slotPool;
private final SlotPoolGateway slotPoolGateway;
private final RestartStrategy restartStrategy;
// --------- BackPressure --------
private final BackPressureStatsTracker backPressureStatsTracker;
// --------- ResourceManager --------
private final LeaderRetrievalService resourceManagerLeaderRetriever;
// --------- TaskManagers --------
private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;
// -------- Mutable fields ---------
private ExecutionGraph executionGraph;
@Nullable
private JobManagerJobStatusListener jobStatusListener;
@Nullable
private JobManagerJobMetricGroup jobManagerJobMetricGroup;
@Nullable
private String lastInternalSavepoint;
@Nullable
private ResourceManagerAddress resourceManagerAddress;
@Nullable
private ResourceManagerConnection resourceManagerConnection;
@Nullable
private EstablishedResourceManagerConnection establishedResourceManagerConnection;
//……
@Override
public CompletableFuture<Acknowledge> notifyKvStateRegistered(
final JobID jobId,
final JobVertexID jobVertexId,
final KeyGroupRange keyGroupRange,
final String registrationName,
final KvStateID kvStateId,
final InetSocketAddress kvStateServerAddress) {
if (jobGraph.getJobID().equals(jobId)) {
if (log.isDebugEnabled()) {
log.debug("Key value state registered for job {} under name {}.",
jobGraph.getJobID(), registrationName);
}
try {
executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);
return CompletableFuture.completedFuture(Acknowledge.get());
} catch (Exception e) {
log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
return FutureUtils.completedExceptionally(e);
}
} else {
if (log.isDebugEnabled()) {
log.debug("Notification about key-value state registration for unknown job {} received.", jobId);
}
return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
}
}
@Override
public CompletableFuture<Acknowledge> notifyKvStateUnregistered(
JobID jobId,
JobVertexID jobVertexId,
KeyGroupRange keyGroupRange,
String registrationName) {
if (jobGraph.getJobID().equals(jobId)) {
if (log.isDebugEnabled()) {
log.debug("Key value state unregistered for job {} under name {}.",
jobGraph.getJobID(), registrationName);
}
try {
executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
jobVertexId, keyGroupRange, registrationName);
return CompletableFuture.completedFuture(Acknowledge.get());
} catch (Exception e) {
log.error("Failed to notify KvStateRegistry about registration {}.", registrationName, e);
return FutureUtils.completedExceptionally(e);
}
} else {
if (log.isDebugEnabled()) {
log.debug("Notification about key-value state deregistration for unknown job {} received.", jobId);
}
return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
}
}
//……
}
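JobMaster's notifyKvStateRegistered first checks that the given jobId matches the ID of its own jobGraph and then delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered; notifyKvStateUnregistered does the same check and delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered. On success both return a completed future holding Acknowledge; if the registry throws, or if the jobId is unknown (FlinkJobNotFoundException), they return an exceptionally completed future.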
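The guard-then-delegate pattern described above can be sketched with plain JDK types. This is only an illustration, not Flink code: JobIdGuardSketch, Ack, JobNotFoundException, RegistryCall, handleNotification and describe are all hypothetical stand-ins (for JobMaster, Acknowledge, FlinkJobNotFoundException and the registry call), and job IDs are modeled as plain strings.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for the job-ID guard in JobMaster; not Flink code.
class JobIdGuardSketch {
    /** Stand-in for Flink's Acknowledge. */
    enum Ack { INSTANCE }

    /** Stand-in for FlinkJobNotFoundException. */
    static class JobNotFoundException extends Exception {
        JobNotFoundException(String jobId) {
            super("Could not find job " + jobId);
        }
    }

    /** Stand-in for the call into the location registry, which may throw. */
    interface RegistryCall {
        void run() throws Exception;
    }

    private final String ownJobId;

    JobIdGuardSketch(String ownJobId) {
        this.ownJobId = ownJobId;
    }

    /** Runs the registry call only if the notification is for our own job. */
    CompletableFuture<Ack> handleNotification(String jobId, RegistryCall call) {
        if (!ownJobId.equals(jobId)) {
            return failed(new JobNotFoundException(jobId));
        }
        try {
            call.run();
            return CompletableFuture.completedFuture(Ack.INSTANCE);
        } catch (Exception e) {
            // Errors are reported through the future instead of being thrown.
            return failed(e);
        }
    }

    private static <T> CompletableFuture<T> failed(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    /** Describes the outcome of an already completed future. */
    static String describe(CompletableFuture<Ack> future) {
        try {
            future.join();
            return "acknowledged";
        } catch (CompletionException e) {
            return "failed: " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        JobIdGuardSketch guard = new JobIdGuardSketch("job-1");
        // Matching job ID, registry call succeeds.
        System.out.println(describe(guard.handleNotification("job-1", () -> { })));
        // Matching job ID, registry call throws.
        System.out.println(describe(guard.handleNotification("job-1", () -> {
            throw new IllegalStateException("name clash");
        })));
        // Notification for a different job.
        System.out.println(describe(guard.handleNotification("job-2", () -> { })));
    }
}
```

The point of the design is that the caller never sees a thrown exception from the gateway method itself; every outcome, including the unknown-job case, arrives through the returned future.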
KvStateLocationRegistry
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
public class KvStateLocationRegistry {
/** JobID this coordinator belongs to. */
private final JobID jobId;
/** Job vertices for determining parallelism per key. */
private final Map<JobVertexID, ExecutionJobVertex> jobVertices;
/**
* Location info keyed by registration name. The name needs to be unique
* per JobID, i.e. two operators cannot register KvState with the same
* name.
*/
private final Map<String, KvStateLocation> lookupTable = new HashMap<>();
/**
* Creates the registry for the job.
*
* @param jobId JobID this coordinator belongs to.
* @param jobVertices Job vertices map of all vertices of this job.
*/
public KvStateLocationRegistry(JobID jobId, Map<JobVertexID, ExecutionJobVertex> jobVertices) {
this.jobId = Preconditions.checkNotNull(jobId, "JobID");
this.jobVertices = Preconditions.checkNotNull(jobVertices, "Job vertices");
}
/**
* Returns the {@link KvStateLocation} for the registered KvState instance
* or <code>null</code> if no location information is available.
*
* @param registrationName Name under which the KvState instance is registered.
* @return Location information or <code>null</code>.
*/
public KvStateLocation getKvStateLocation(String registrationName) {
return lookupTable.get(registrationName);
}
/**
* Notifies the registry about a registered KvState instance.
*
* @param jobVertexId JobVertexID the KvState instance belongs to
* @param keyGroupRange Key group range the KvState instance belongs to
* @param registrationName Name under which the KvState has been registered
* @param kvStateId ID of the registered KvState instance
* @param kvStateServerAddress Server address where to find the KvState instance
*
* @throws IllegalArgumentException If JobVertexID does not belong to job
* @throws IllegalArgumentException If state has been registered with same
* name by another operator.
* @throws IndexOutOfBoundsException If key group index is out of bounds.
*/
public void notifyKvStateRegistered(
JobVertexID jobVertexId,
KeyGroupRange keyGroupRange,
String registrationName,
KvStateID kvStateId,
InetSocketAddress kvStateServerAddress) {
KvStateLocation location = lookupTable.get(registrationName);
if (location == null) {
// First registration for this operator, create the location info
ExecutionJobVertex vertex = jobVertices.get(jobVertexId);
if (vertex != null) {
int parallelism = vertex.getMaxParallelism();
location = new KvStateLocation(jobId, jobVertexId, parallelism, registrationName);
lookupTable.put(registrationName, location);
} else {
throw new IllegalArgumentException("Unknown JobVertexID " + jobVertexId);
}
}
// Duplicated name if vertex IDs don't match
if (!location.getJobVertexId().equals(jobVertexId)) {
IllegalStateException duplicate = new IllegalStateException(
"Registration name clash. KvState with name '" + registrationName +
"' has already been registered by another operator (" +
location.getJobVertexId() + ").");
ExecutionJobVertex vertex = jobVertices.get(jobVertexId);
if (vertex != null) {
vertex.fail(new SuppressRestartsException(duplicate));
}
throw duplicate;
}
location.registerKvState(keyGroupRange, kvStateId, kvStateServerAddress);
}
/**
* Notifies the registry about an unregistered KvState instance.
*
* @param jobVertexId JobVertexID the KvState instance belongs to
* @param keyGroupRange Key group index the KvState instance belongs to
* @param registrationName Name under which the KvState has been registered
* @throws IllegalArgumentException If another operator registered the state instance
* @throws IllegalArgumentException If the registration name is not known
*/
public void notifyKvStateUnregistered(
JobVertexID jobVertexId,
KeyGroupRange keyGroupRange,
String registrationName) {
KvStateLocation location = lookupTable.get(registrationName);
if (location != null) {
// Duplicate name if vertex IDs don't match
if (!location.getJobVertexId().equals(jobVertexId)) {
throw new IllegalArgumentException("Another operator (" +
location.getJobVertexId() + ") registered the KvState " +
"under '" + registrationName + "'.");
}
location.unregisterKvState(keyGroupRange);
if (location.getNumRegisteredKeyGroups() == 0) {
lookupTable.remove(registrationName);
}
} else {
throw new IllegalArgumentException("Unknown registration name '" +
registrationName + "'. " + "Probably registration/unregistration race.");
}
}
}
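The KvStateLocationRegistry constructor takes a jobId and the jobVertices map; its lookupTable field stores the mapping from registrationName to KvStateLocation.
notifyKvStateRegistered creates a new KvStateLocation (sized by the vertex's max parallelism) and puts it into lookupTable when no entry exists for the name. If the existing entry belongs to a different JobVertexID, it fails the vertex and throws an IllegalStateException; otherwise it finishes by calling location.registerKvState.
notifyKvStateUnregistered looks up the KvStateLocation in lookupTable and, if it is found and belongs to the same JobVertexID, calls location.unregisterKvState; the KvStateLocation is removed from lookupTable only once getNumRegisteredKeyGroups() drops to 0. An unknown registration name, or one registered by another operator, results in an IllegalArgumentException.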
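The lifecycle of a lookupTable entry can be illustrated with a small model built only on JDK collections. This is a sketch under simplifying assumptions, not the Flink implementation: LocationRegistrySketch, Location, register, unregister and isKnown are hypothetical names, vertex IDs and server addresses are plain strings, a key group range is an inclusive pair of ints, and unlike the real KvStateLocation the sketch silently ignores key groups that were never registered.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of KvStateLocationRegistry; not Flink code.
class LocationRegistrySketch {
    /** Stand-in for KvStateLocation: one address slot per key group. */
    static class Location {
        final String vertexId;
        final String[] addresses;
        int registeredKeyGroups;

        Location(String vertexId, int numKeyGroups) {
            this.vertexId = vertexId;
            this.addresses = new String[numKeyGroups];
        }
    }

    /** Max parallelism (number of key groups) per known vertex. */
    private final Map<String, Integer> maxParallelismByVertex;
    /** Location info keyed by registration name. */
    private final Map<String, Location> lookupTable = new HashMap<>();

    LocationRegistrySketch(Map<String, Integer> maxParallelismByVertex) {
        this.maxParallelismByVertex = maxParallelismByVertex;
    }

    void register(String vertexId, int startKeyGroup, int endKeyGroup, String name, String address) {
        Location location = lookupTable.get(name);
        if (location == null) {
            // First registration under this name: create the location entry.
            Integer maxParallelism = maxParallelismByVertex.get(vertexId);
            if (maxParallelism == null) {
                throw new IllegalArgumentException("Unknown vertex " + vertexId);
            }
            location = new Location(vertexId, maxParallelism);
            lookupTable.put(name, location);
        }
        if (!location.vertexId.equals(vertexId)) {
            throw new IllegalStateException("Name '" + name + "' already used by " + location.vertexId);
        }
        for (int kg = startKeyGroup; kg <= endKeyGroup; kg++) {
            if (location.addresses[kg] == null) {
                location.registeredKeyGroups++;
            }
            location.addresses[kg] = address;
        }
    }

    void unregister(String vertexId, int startKeyGroup, int endKeyGroup, String name) {
        Location location = lookupTable.get(name);
        if (location == null) {
            throw new IllegalArgumentException("Unknown registration name '" + name + "'");
        }
        if (!location.vertexId.equals(vertexId)) {
            throw new IllegalArgumentException("Name '" + name + "' was registered by " + location.vertexId);
        }
        for (int kg = startKeyGroup; kg <= endKeyGroup; kg++) {
            if (location.addresses[kg] != null) {
                location.addresses[kg] = null;
                location.registeredKeyGroups--;
            }
        }
        // The entry is dropped only when no key group is left.
        if (location.registeredKeyGroups == 0) {
            lookupTable.remove(name);
        }
    }

    boolean isKnown(String name) {
        return lookupTable.containsKey(name);
    }

    public static void main(String[] args) {
        Map<String, Integer> vertices = new HashMap<>();
        vertices.put("vertex-a", 4);
        LocationRegistrySketch registry = new LocationRegistrySketch(vertices);
        registry.register("vertex-a", 0, 1, "counts", "host1:9069");
        registry.register("vertex-a", 2, 3, "counts", "host2:9069");
        registry.unregister("vertex-a", 0, 1, "counts");
        System.out.println(registry.isKnown("counts")); // true: key groups 2-3 remain
        registry.unregister("vertex-a", 2, 3, "counts");
        System.out.println(registry.isKnown("counts")); // false: last key group gone
    }
}
```

In this model two subtasks of the same vertex share one entry under the name "counts", which is why unregistering the first range leaves the entry in place and only the second unregistration removes it.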
Summary
The KvStateRegistryGateway interface defines two methods, notifyKvStateRegistered and notifyKvStateUnregistered; JobMaster implements both.
After checking the jobId, JobMaster's notifyKvStateRegistered delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered, and notifyKvStateUnregistered delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered.
The KvStateLocationRegistry constructor takes a jobId and jobVertices; its lookupTable field maps registrationName to KvStateLocation. notifyKvStateRegistered creates a KvStateLocation and stores it in lookupTable when none exists for the name, then calls location.registerKvState. notifyKvStateUnregistered calls location.unregisterKvState on the matching KvStateLocation and removes it from lookupTable once no key groups remain registered.
doc
KvStateRegistryGateway