聊聊flink的slot.request.timeout配置

23次阅读

共计 9426 个字符,预计需要花费 24 分钟才能阅读完成。


本文主要研究一下 flink 的 slot.request.timeout 配置
JobManagerOptions
flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@PublicEvolving
public class JobManagerOptions {
//……

/**
* The timeout in milliseconds for requesting a slot from Slot Pool.
*/
public static final ConfigOption<Long> SLOT_REQUEST_TIMEOUT =
key(“slot.request.timeout”)
.defaultValue(5L * 60L * 1000L)
.withDescription(“The timeout in milliseconds for requesting a slot from Slot Pool.”);

//……
}
slot.request.timeout 默认为 5 分钟
SlotManagerConfiguration
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
public class SlotManagerConfiguration {

private static final Logger LOGGER = LoggerFactory.getLogger(SlotManagerConfiguration.class);

private final Time taskManagerRequestTimeout;
private final Time slotRequestTimeout;
private final Time taskManagerTimeout;

public SlotManagerConfiguration(
Time taskManagerRequestTimeout,
Time slotRequestTimeout,
Time taskManagerTimeout) {
this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);
}

public Time getTaskManagerRequestTimeout() {
return taskManagerRequestTimeout;
}

public Time getSlotRequestTimeout() {
return slotRequestTimeout;
}

public Time getTaskManagerTimeout() {
return taskManagerTimeout;
}

public static SlotManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
final String strTimeout = configuration.getString(AkkaOptions.ASK_TIMEOUT);
final Time rpcTimeout;

try {
rpcTimeout = Time.milliseconds(Duration.apply(strTimeout).toMillis());
} catch (NumberFormatException e) {
throw new ConfigurationException(“Could not parse the resource manager’s timeout ” +
“value ” + AkkaOptions.ASK_TIMEOUT + ‘.’, e);
}

final Time slotRequestTimeout = getSlotRequestTimeout(configuration);
final Time taskManagerTimeout = Time.milliseconds(
configuration.getLong(ResourceManagerOptions.TASK_MANAGER_TIMEOUT));

return new SlotManagerConfiguration(rpcTimeout, slotRequestTimeout, taskManagerTimeout);
}

private static Time getSlotRequestTimeout(final Configuration configuration) {
final long slotRequestTimeoutMs;
if (configuration.contains(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT)) {
LOGGER.warn(“Config key {} is deprecated; use {} instead.”,
ResourceManagerOptions.SLOT_REQUEST_TIMEOUT,
JobManagerOptions.SLOT_REQUEST_TIMEOUT);
slotRequestTimeoutMs = configuration.getLong(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT);
} else {
slotRequestTimeoutMs = configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT);
}
return Time.milliseconds(slotRequestTimeoutMs);
}
}
SlotManagerConfiguration 的 getSlotRequestTimeout 方法会先判断配置中是否包含已废弃的 ResourceManagerOptions.SLOT_REQUEST_TIMEOUT,有的话则打印 warn 日志并使用该值,否则从配置文件读取 JobManagerOptions.SLOT_REQUEST_TIMEOUT
SlotManager
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
public class SlotManager implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class);

/** Scheduled executor for timeouts. */
private final ScheduledExecutor scheduledExecutor;

/** Timeout for slot requests to the task manager. */
private final Time taskManagerRequestTimeout;

/** Timeout after which an allocation is discarded. */
private final Time slotRequestTimeout;

/** Timeout after which an unused TaskManager is released. */
private final Time taskManagerTimeout;

/** Map for all registered slots. */
private final HashMap<SlotID, TaskManagerSlot> slots;

/** Index of all currently free slots. */
private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;

/** All currently registered task managers. */
private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;

/** Map of fulfilled and active allocations for request deduplication purposes. */
private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;

/** Map of pending/unfulfilled slot allocation requests. */
private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;

private final HashMap<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots;

/** ResourceManager’s id. */
private ResourceManagerId resourceManagerId;

/** Executor for future callbacks which have to be “synchronized”. */
private Executor mainThreadExecutor;

/** Callbacks for resource (de-)allocations. */
private ResourceActions resourceActions;

private ScheduledFuture<?> taskManagerTimeoutCheck;

private ScheduledFuture<?> slotRequestTimeoutCheck;

/** True iff the component has been started. */
private boolean started;

public SlotManager(
ScheduledExecutor scheduledExecutor,
Time taskManagerRequestTimeout,
Time slotRequestTimeout,
Time taskManagerTimeout) {
this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);

slots = new HashMap<>(16);
freeSlots = new LinkedHashMap<>(16);
taskManagerRegistrations = new HashMap<>(4);
fulfilledSlotRequests = new HashMap<>(16);
pendingSlotRequests = new HashMap<>(16);
pendingSlots = new HashMap<>(16);

resourceManagerId = null;
resourceActions = null;
mainThreadExecutor = null;
taskManagerTimeoutCheck = null;
slotRequestTimeoutCheck = null;

started = false;
}

public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
LOG.info(“Starting the SlotManager.”);

this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
resourceActions = Preconditions.checkNotNull(newResourceActions);

started = true;

taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
() -> mainThreadExecutor.execute(
() -> checkTaskManagerTimeouts()),
0L,
taskManagerTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS);

slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
() -> mainThreadExecutor.execute(
() -> checkSlotRequestTimeouts()),
0L,
slotRequestTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS);
}

/**
* Suspends the component. This clears the internal state of the slot manager.
*/
public void suspend() {
LOG.info(“Suspending the SlotManager.”);

// stop the timeout checks for the TaskManagers and the SlotRequests
if (taskManagerTimeoutCheck != null) {
taskManagerTimeoutCheck.cancel(false);
taskManagerTimeoutCheck = null;
}

if (slotRequestTimeoutCheck != null) {
slotRequestTimeoutCheck.cancel(false);
slotRequestTimeoutCheck = null;
}

for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
cancelPendingSlotRequest(pendingSlotRequest);
}

pendingSlotRequests.clear();

ArrayList<InstanceID> registeredTaskManagers = new ArrayList<>(taskManagerRegistrations.keySet());

for (InstanceID registeredTaskManager : registeredTaskManagers) {
unregisterTaskManager(registeredTaskManager);
}

resourceManagerId = null;
resourceActions = null;
started = false;
}

public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException {
checkInit();

if (checkDuplicateRequest(slotRequest.getAllocationId())) {
LOG.debug(“Ignoring a duplicate slot request with allocation id {}.”, slotRequest.getAllocationId());

return false;
} else {
PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);

pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);

try {
internalRequestSlot(pendingSlotRequest);
} catch (ResourceManagerException e) {
// requesting the slot failed –> remove pending slot request
pendingSlotRequests.remove(slotRequest.getAllocationId());

throw new SlotManagerException(“Could not fulfill slot request ” + slotRequest.getAllocationId() + ‘.’, e);
}

return true;
}
}

private void checkSlotRequestTimeouts() {
if (!pendingSlotRequests.isEmpty()) {
long currentTime = System.currentTimeMillis();

Iterator<Map.Entry<AllocationID, PendingSlotRequest>> slotRequestIterator = pendingSlotRequests.entrySet().iterator();

while (slotRequestIterator.hasNext()) {
PendingSlotRequest slotRequest = slotRequestIterator.next().getValue();

if (currentTime – slotRequest.getCreationTimestamp() >= slotRequestTimeout.toMilliseconds()) {
slotRequestIterator.remove();

if (slotRequest.isAssigned()) {
cancelPendingSlotRequest(slotRequest);
}

resourceActions.notifyAllocationFailure(
slotRequest.getJobId(),
slotRequest.getAllocationId(),
new TimeoutException(“The allocation could not be fulfilled in time.”));
}
}
}
}

//……

}

SlotManager 的构造器接收 slotRequestTimeout 参数;它维护了 pendingSlotRequests 的 map;start 方法会注册 slotRequestTimeoutCheck,每隔 slotRequestTimeout 的时间调度一次,执行的是 checkSlotRequestTimeouts 方法;suspend 方法会 cancel 这些 pendingSlotRequest,然后清空 pendingSlotRequests 的 map
registerSlotRequest 方法会先执行 checkDuplicateRequest 判断是否有重复,没有重复的话,则将该 slotRequest 维护到 pendingSlotRequests,然后调用 internalRequestSlot 进行分配,如果出现异常则从 pendingSlotRequests 中移除,然后抛出 SlotManagerException
checkSlotRequestTimeouts 则会遍历 pendingSlotRequests,然后根据 slotRequest.getCreationTimestamp() 及当前时间判断时间差是否大于等于 slotRequestTimeout,已经超时的话,则会从 pendingSlotRequests 中移除该 slotRequest,如果该 slotRequest 已经被分配(isAssigned)则进行 cancel,同时触发 resourceActions.notifyAllocationFailure

小结

SlotManagerConfiguration 的 getSlotRequestTimeout 方法会先判断配置中是否包含已废弃的 ResourceManagerOptions.SLOT_REQUEST_TIMEOUT,有的话则打印 warn 日志并使用该值,否则从配置文件读取 JobManagerOptions.SLOT_REQUEST_TIMEOUT;slot.request.timeout 默认为 5 分钟
SlotManager 的构造器接收 slotRequestTimeout 参数;它维护了 pendingSlotRequests 的 map;start 方法会注册 slotRequestTimeoutCheck,每隔 slotRequestTimeout 的时间调度一次,执行的是 checkSlotRequestTimeouts 方法;suspend 方法会 cancel 这些 pendingSlotRequest,然后清空 pendingSlotRequests 的 map
registerSlotRequest 方法会先执行 checkDuplicateRequest 判断是否有重复,没有重复的话,则将该 slotRequest 维护到 pendingSlotRequests,然后调用 internalRequestSlot 进行分配,如果出现异常则从 pendingSlotRequests 中移除,然后抛出 SlotManagerException;checkSlotRequestTimeouts 则会遍历 pendingSlotRequests,然后根据 slotRequest.getCreationTimestamp() 及当前时间判断时间差是否大于等于 slotRequestTimeout,已经超时的话,则会从 pendingSlotRequests 中移除该 slotRequest,如果该 slotRequest 已经被分配(isAssigned)则进行 cancel,同时触发 resourceActions.notifyAllocationFailure

doc
slot-request-timeout

正文完
 0