
A look at Flink's slot.idle.timeout configuration


This article takes a look at Flink's slot.idle.timeout configuration.
JobManagerOptions
flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@PublicEvolving
public class JobManagerOptions {
    //……

    /**
     * The timeout in milliseconds for a idle slot in Slot Pool.
     */
    public static final ConfigOption<Long> SLOT_IDLE_TIMEOUT =
        key("slot.idle.timeout")
            // default matches heartbeat.timeout so that sticky allocation is not lost on timeouts for local recovery
            .defaultValue(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue())
            .withDescription("The timeout in milliseconds for a idle slot in Slot Pool.");

    //……
}
slot.idle.timeout defaults to HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue(), which is 50000L milliseconds.
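To make the fallback concrete, here is a minimal sketch of how an unset slot.idle.timeout resolves to the heartbeat timeout default. It uses only the JDK: the class SlotIdleTimeoutDefault and the helper slotIdleTimeoutMs are hypothetical names for illustration, not Flink's Configuration API, and the 50000 value is the default quoted above.

```java
import java.util.HashMap;
import java.util.Map;

public class SlotIdleTimeoutDefault {
    // Default of heartbeat.timeout as quoted above, in milliseconds.
    public static final long HEARTBEAT_TIMEOUT_DEFAULT_MS = 50_000L;

    // Hypothetical helper: resolve slot.idle.timeout from a plain key/value map,
    // falling back to the heartbeat timeout default when the key is absent.
    public static long slotIdleTimeoutMs(Map<String, String> conf) {
        String raw = conf.get("slot.idle.timeout");
        return raw == null ? HEARTBEAT_TIMEOUT_DEFAULT_MS : Long.parseLong(raw.trim());
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(slotIdleTimeoutMs(conf)); // key absent: prints 50000

        conf.put("slot.idle.timeout", "120000");
        System.out.println(slotIdleTimeoutMs(conf)); // explicit value wins: prints 120000
    }
}
```

Because the default is tied to the heartbeat timeout, changing heartbeat.timeout alone does not change slot.idle.timeout: the option reads the heartbeat option's default value, not its configured value.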
SlotPool
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedSlotActions {

    /** The interval (in milliseconds) in which the SlotPool writes its slot distribution on debug level. */
    private static final int STATUS_LOG_INTERVAL_MS = 60_000;

    private final JobID jobId;

    private final SchedulingStrategy schedulingStrategy;

    private final ProviderAndOwner providerAndOwner;

    /** All registered TaskManagers, slots will be accepted and used only if the resource is registered. */
    private final HashSet<ResourceID> registeredTaskManagers;

    /** The book-keeping of all allocated slots. */
    private final AllocatedSlots allocatedSlots;

    /** The book-keeping of all available slots. */
    private final AvailableSlots availableSlots;

    /** All pending requests waiting for slots. */
    private final DualKeyMap<SlotRequestId, AllocationID, PendingRequest> pendingRequests;

    /** The requests that are waiting for the resource manager to be connected. */
    private final HashMap<SlotRequestId, PendingRequest> waitingForResourceManager;

    /** Timeout for external request calls (e.g. to the ResourceManager or the TaskExecutor). */
    private final Time rpcTimeout;

    /** Timeout for releasing idle slots. */
    private final Time idleSlotTimeout;

    private final Clock clock;

    /** Managers for the different slot sharing groups. */
    protected final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagers;

    /** the fencing token of the job manager. */
    private JobMasterId jobMasterId;

    /** The gateway to communicate with resource manager. */
    private ResourceManagerGateway resourceManagerGateway;

    private String jobManagerAddress;

    //……

    /**
     * Start the slot pool to accept RPC calls.
     *
     * @param jobMasterId The necessary leader id for running the job.
     * @param newJobManagerAddress for the slot requests which are sent to the resource manager
     */
    public void start(JobMasterId jobMasterId, String newJobManagerAddress) throws Exception {
        this.jobMasterId = checkNotNull(jobMasterId);
        this.jobManagerAddress = checkNotNull(newJobManagerAddress);

        // TODO - start should not throw an exception
        try {
            super.start();
        } catch (Exception e) {
            throw new RuntimeException("This should never happen", e);
        }

        scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);

        if (log.isDebugEnabled()) {
            scheduleRunAsync(this::scheduledLogStatus, STATUS_LOG_INTERVAL_MS, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Check the available slots, release the slot that is idle for a long time.
     */
    private void checkIdleSlot() {

        // The timestamp in SlotAndTimestamp is relative
        final long currentRelativeTimeMillis = clock.relativeTimeMillis();

        final List<AllocatedSlot> expiredSlots = new ArrayList<>(availableSlots.size());

        for (SlotAndTimestamp slotAndTimestamp : availableSlots.availableSlots.values()) {
            if (currentRelativeTimeMillis - slotAndTimestamp.timestamp > idleSlotTimeout.toMilliseconds()) {
                expiredSlots.add(slotAndTimestamp.slot);
            }
        }

        final FlinkException cause = new FlinkException("Releasing idle slot.");

        for (AllocatedSlot expiredSlot : expiredSlots) {
            final AllocationID allocationID = expiredSlot.getAllocationId();
            if (availableSlots.tryRemove(allocationID) != null) {

                log.info("Releasing idle slot [{}].", allocationID);
                final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
                    allocationID,
                    cause,
                    rpcTimeout);

                freeSlotFuture.whenCompleteAsync(
                    (Acknowledge ignored, Throwable throwable) -> {
                        if (throwable != null) {
                            if (registeredTaskManagers.contains(expiredSlot.getTaskManagerId())) {
                                log.debug("Releasing slot [{}] of registered TaskExecutor {} failed. " +
                                    "Trying to fulfill a different slot request.", allocationID, expiredSlot.getTaskManagerId(),
                                    throwable);
                                tryFulfillSlotRequestOrMakeAvailable(expiredSlot);
                            } else {
                                log.debug("Releasing slot [{}] failed and owning TaskExecutor {} is no " +
                                    "longer registered. Discarding slot.", allocationID, expiredSlot.getTaskManagerId());
                            }
                        }
                    },
                    getMainThreadExecutor());
            }
        }

        scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
    }

    //……
}
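In its start method, SlotPool calls scheduleRunAsync to run checkIdleSlot after a delay of idleSlotTimeout. checkIdleSlot walks the SlotAndTimestamp entries in availableSlots and checks whether the difference between the current time and slotAndTimestamp.timestamp exceeds idleSlotTimeout; any slot that does is added to expiredSlots. Each expired slot is then removed with availableSlots.tryRemove and released through TaskManagerGateway.freeSlot. If the release fails while the owning TaskExecutor is still registered, the slot is handed back via tryFulfillSlotRequestOrMakeAvailable; otherwise it is discarded. Finally, checkIdleSlot calls scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout) again to schedule the next check.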
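The check itself can be modeled without any Flink classes. The sketch below is a simplified, JDK-only stand-in: IdleSlotCheckSketch, markAvailable and checkIdleSlots are hypothetical names, slots are plain strings, and collection and removal happen in a single pass, whereas the quoted code collects expired slots first and then calls tryRemove. It keeps the strict "greater than" comparison from the source.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class IdleSlotCheckSketch {
    private final long idleTimeoutMs;
    // allocation id -> relative time (ms) at which the slot became available
    private final Map<String, Long> availableSlots = new LinkedHashMap<>();

    public IdleSlotCheckSketch(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    public void markAvailable(String allocationId, long nowMs) {
        availableSlots.put(allocationId, nowMs);
    }

    // One pass of the check: remove and return every slot that has been idle
    // for strictly more than the timeout.
    public List<String> checkIdleSlots(long nowMs) {
        List<String> released = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = availableSlots.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > idleTimeoutMs) {
                released.add(entry.getKey());
                it.remove();
            }
        }
        return released;
    }

    public static void main(String[] args) {
        IdleSlotCheckSketch pool = new IdleSlotCheckSketch(50_000L);
        pool.markAvailable("slot-a", 1L);

        // Check at t = 50000: slot-a has been idle for 49999 ms, so it is kept.
        System.out.println(pool.checkIdleSlots(50_000L));  // prints []

        // Check one timeout later: idle for 99999 ms, so it is released.
        System.out.println(pool.checkIdleSlots(100_000L)); // prints [slot-a]
    }
}
```

The example also shows a consequence of using the same value for the timeout and the check interval: a slot that becomes idle just after one check is only released at the check after next, so it can stay idle for almost twice idleSlotTimeout.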
RpcEndpoint
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
public abstract class RpcEndpoint implements RpcGateway {
    //……

    /**
     * Execute the runnable in the main thread of the underlying RPC endpoint, with
     * a delay of the given number of milliseconds.
     *
     * @param runnable Runnable to be executed
     * @param delay The delay after which the runnable will be executed
     */
    protected void scheduleRunAsync(Runnable runnable, Time delay) {
        scheduleRunAsync(runnable, delay.getSize(), delay.getUnit());
    }

    /**
     * Execute the runnable in the main thread of the underlying RPC endpoint, with
     * a delay of the given number of milliseconds.
     *
     * @param runnable Runnable to be executed
     * @param delay The delay after which the runnable will be executed
     */
    protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {
        rpcServer.scheduleRunAsync(runnable, unit.toMillis(delay));
    }

    //……
}
RpcEndpoint provides scheduleRunAsync; both overloads end up calling rpcServer.scheduleRunAsync with the delay converted to milliseconds.
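The delegation chain is short enough to mimic with the JDK alone. In the sketch below, ScheduleRunAsyncSketch and RecordingServer are hypothetical stand-ins for the endpoint and its rpcServer; the server does not run anything, it only records the delay it was asked for, which is enough to show the unit conversion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ScheduleRunAsyncSketch {
    // Hypothetical stand-in for the RPC server: records requested delays
    // instead of actually scheduling the runnable.
    public static final class RecordingServer {
        public final List<Long> delaysMs = new ArrayList<>();

        public void scheduleRunAsync(Runnable runnable, long delayMs) {
            delaysMs.add(delayMs);
        }
    }

    private final RecordingServer server;

    public ScheduleRunAsyncSketch(RecordingServer server) {
        this.server = server;
    }

    // Mirrors the (delay, unit) overload: normalize to milliseconds, then delegate.
    public void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {
        server.scheduleRunAsync(runnable, unit.toMillis(delay));
    }

    public static void main(String[] args) {
        RecordingServer server = new RecordingServer();
        ScheduleRunAsyncSketch endpoint = new ScheduleRunAsyncSketch(server);

        endpoint.scheduleRunAsync(() -> { }, 50, TimeUnit.SECONDS);
        endpoint.scheduleRunAsync(() -> { }, 60_000, TimeUnit.MILLISECONDS);

        System.out.println(server.delaysMs); // prints [50000, 60000]
    }
}
```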
Summary

slot.idle.timeout defaults to HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue(), which is 50000L milliseconds.
SlotPool schedules checkIdleSlot from its start method with a delay of idleSlotTimeout. checkIdleSlot collects every available slot whose idle time exceeds idleSlotTimeout into expiredSlots, removes each one with availableSlots.tryRemove, releases it through TaskManagerGateway.freeSlot, and then reschedules itself with scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout).
RpcEndpoint provides scheduleRunAsync, which ultimately calls rpcServer.scheduleRunAsync.

doc
slot-idle-timeout
