A look at the jvm-exit-on-oom configuration of the Flink TaskManager


This article examines the jvm-exit-on-oom configuration of the Flink TaskManager.
taskmanager.jvm-exit-on-oom
flink-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@PublicEvolving
public class TaskManagerOptions {
//……

/**
* Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.
*/
public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
key("taskmanager.jvm-exit-on-oom")
.defaultValue(false)
.withDescription("Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.");

//……
}
taskmanager.jvm-exit-on-oom defaults to false. It specifies whether the TaskManager should be killed when a task thread throws an OutOfMemoryError.
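To make the default concrete, here is a minimal sketch that assumes a simplified stand-in for Flink's configuration loading: flink-conf.yaml-style "key: value" lines are parsed into a map, and the flag is read with false as the fallback, mirroring defaultValue(false). OomFlagLookup, parse and exitOnOom are hypothetical names; this is not Flink's own parser, and Boolean.parseBoolean is an assumed stand-in for Flink's value conversion.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for reading taskmanager.jvm-exit-on-oom; not Flink's parser. */
final class OomFlagLookup {

    static final String KEY = "taskmanager.jvm-exit-on-oom";
    static final boolean DEFAULT_VALUE = false; // mirrors .defaultValue(false)

    /** Parses simple "key: value" lines, skipping blank lines and '#' comments. */
    static Map<String, String> parse(String text) {
        Map<String, String> result = new HashMap<>();
        for (String line : text.split("\n")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue;
            }
            int colon = trimmed.indexOf(':');
            if (colon > 0) {
                result.put(trimmed.substring(0, colon).trim(), trimmed.substring(colon + 1).trim());
            }
        }
        return result;
    }

    /** Returns the configured flag, or the default (false) when the key is absent. */
    static boolean exitOnOom(Map<String, String> conf) {
        String raw = conf.get(KEY);
        return raw == null ? DEFAULT_VALUE : Boolean.parseBoolean(raw);
    }

    public static void main(String[] args) {
        System.out.println(exitOnOom(parse("taskmanager.numberOfTaskSlots: 2")));
        System.out.println(exitOnOom(parse("taskmanager.jvm-exit-on-oom: true")));
    }
}
```

In an actual deployment, enabling the behaviour amounts to adding the line taskmanager.jvm-exit-on-oom: true to flink-conf.yaml; leaving it out keeps the default of false.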
TaskManagerConfiguration
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {

private static final Logger LOG = LoggerFactory.getLogger(TaskManagerConfiguration.class);

private final int numberSlots;

private final String[] tmpDirectories;

private final Time timeout;

// null indicates an infinite duration
@Nullable
private final Time maxRegistrationDuration;

private final Time initialRegistrationPause;
private final Time maxRegistrationPause;
private final Time refusedRegistrationPause;

private final UnmodifiableConfiguration configuration;

private final boolean exitJvmOnOutOfMemory;

private final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder;

private final String[] alwaysParentFirstLoaderPatterns;

@Nullable
private final String taskManagerLogPath;

@Nullable
private final String taskManagerStdoutPath;

public TaskManagerConfiguration(
int numberSlots,
String[] tmpDirectories,
Time timeout,
@Nullable Time maxRegistrationDuration,
Time initialRegistrationPause,
Time maxRegistrationPause,
Time refusedRegistrationPause,
Configuration configuration,
boolean exitJvmOnOutOfMemory,
FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder,
String[] alwaysParentFirstLoaderPatterns,
@Nullable String taskManagerLogPath,
@Nullable String taskManagerStdoutPath) {

this.numberSlots = numberSlots;
this.tmpDirectories = Preconditions.checkNotNull(tmpDirectories);
this.timeout = Preconditions.checkNotNull(timeout);
this.maxRegistrationDuration = maxRegistrationDuration;
this.initialRegistrationPause = Preconditions.checkNotNull(initialRegistrationPause);
this.maxRegistrationPause = Preconditions.checkNotNull(maxRegistrationPause);
this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause);
this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
this.classLoaderResolveOrder = classLoaderResolveOrder;
this.alwaysParentFirstLoaderPatterns = alwaysParentFirstLoaderPatterns;
this.taskManagerLogPath = taskManagerLogPath;
this.taskManagerStdoutPath = taskManagerStdoutPath;
}

public int getNumberSlots() {
return numberSlots;
}

public Time getTimeout() {
return timeout;
}

@Nullable
public Time getMaxRegistrationDuration() {
return maxRegistrationDuration;
}

public Time getInitialRegistrationPause() {
return initialRegistrationPause;
}

@Nullable
public Time getMaxRegistrationPause() {
return maxRegistrationPause;
}

public Time getRefusedRegistrationPause() {
return refusedRegistrationPause;
}

@Override
public Configuration getConfiguration() {
return configuration;
}

@Override
public String[] getTmpDirectories() {
return tmpDirectories;
}

@Override
public boolean shouldExitJvmOnOutOfMemoryError() {
return exitJvmOnOutOfMemory;
}

public FlinkUserCodeClassLoaders.ResolveOrder getClassLoaderResolveOrder() {
return classLoaderResolveOrder;
}

public String[] getAlwaysParentFirstLoaderPatterns() {
return alwaysParentFirstLoaderPatterns;
}

@Nullable
public String getTaskManagerLogPath() {
return taskManagerLogPath;
}

@Nullable
public String getTaskManagerStdoutPath() {
return taskManagerStdoutPath;
}

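// --------------------------------------------------------------------------------------------
// Static factory methods
// --------------------------------------------------------------------------------------------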

public static TaskManagerConfiguration fromConfiguration(Configuration configuration) {
int numberSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);

if (numberSlots == -1) {
numberSlots = 1;
}

final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(configuration);

final Time timeout;

try {
timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
} catch (Exception e) {
throw new IllegalArgumentException(
"Invalid format for '" + AkkaOptions.ASK_TIMEOUT.key() +
"'.Use formats like '50 s' or '1 min' to specify the timeout.");
}

LOG.info("Messages have a max timeout of " + timeout);

final Time finiteRegistrationDuration;

try {
Duration maxRegistrationDuration = Duration.create(configuration.getString(TaskManagerOptions.REGISTRATION_TIMEOUT));
if (maxRegistrationDuration.isFinite()) {
finiteRegistrationDuration = Time.milliseconds(maxRegistrationDuration.toMillis());
} else {
finiteRegistrationDuration = null;
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid format for parameter " +
TaskManagerOptions.REGISTRATION_TIMEOUT.key(), e);
}

final Time initialRegistrationPause;
try {
Duration pause = Duration.create(configuration.getString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF));
if (pause.isFinite()) {
initialRegistrationPause = Time.milliseconds(pause.toMillis());
} else {
throw new IllegalArgumentException("The initial registration pause must be finite: " + pause);
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid format for parameter " +
TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
}

final Time maxRegistrationPause;
try {
Duration pause = Duration.create(configuration.getString(
TaskManagerOptions.REGISTRATION_MAX_BACKOFF));
if (pause.isFinite()) {
maxRegistrationPause = Time.milliseconds(pause.toMillis());
} else {
throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause);
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid format for parameter " +
TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
}

final Time refusedRegistrationPause;
try {
Duration pause = Duration.create(configuration.getString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF));
if (pause.isFinite()) {
refusedRegistrationPause = Time.milliseconds(pause.toMillis());
} else {
throw new IllegalArgumentException("The refused registration pause must be finite: " + pause);
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid format for parameter " +
TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
}

final boolean exitOnOom = configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY);

final String classLoaderResolveOrder =
configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);

final String[] alwaysParentFirstLoaderPatterns = CoreOptions.getParentFirstLoaderPatterns(configuration);

final String taskManagerLogPath = configuration.getString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file"));
final String taskManagerStdoutPath;

if (taskManagerLogPath != null) {
final int extension = taskManagerLogPath.lastIndexOf('.');

if (extension > 0) {
taskManagerStdoutPath = taskManagerLogPath.substring(0, extension) + ".out";
} else {
taskManagerStdoutPath = null;
}
} else {
taskManagerStdoutPath = null;
}

return new TaskManagerConfiguration(
numberSlots,
tmpDirPaths,
timeout,
finiteRegistrationDuration,
initialRegistrationPause,
maxRegistrationPause,
refusedRegistrationPause,
configuration,
exitOnOom,
FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
alwaysParentFirstLoaderPatterns,
taskManagerLogPath,
taskManagerStdoutPath);
}
}
The static factory method fromConfiguration of TaskManagerConfiguration reads exitOnOom via configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY) and passes it to the constructor, which stores it in the exitJvmOnOutOfMemory field; the shouldExitJvmOnOutOfMemoryError method then exposes that field.
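The shape here is: read the raw setting once in a static factory, keep it in a final field, and expose it through a narrow getter that Task later consults through taskManagerConfig. A minimal sketch of that shape, assuming a plain Map<String, String> in place of Flink's Configuration; TmConfig, RuntimeInfo and fromMap are hypothetical names, with RuntimeInfo only loosely modelled on TaskManagerRuntimeInfo.

```java
import java.util.Map;

/** Hypothetical immutable snapshot built once from raw settings. */
final class TmConfig implements RuntimeInfo {

    private final int numberSlots;
    private final boolean exitJvmOnOutOfMemory;

    private TmConfig(int numberSlots, boolean exitJvmOnOutOfMemory) {
        this.numberSlots = numberSlots;
        this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
    }

    /** Static factory: parse and normalize values up front, then freeze them. */
    static TmConfig fromMap(Map<String, String> raw) {
        int slots = Integer.parseInt(raw.getOrDefault("taskmanager.numberOfTaskSlots", "1"));
        if (slots == -1) {
            slots = 1; // same normalization as fromConfiguration above
        }
        boolean exitOnOom = Boolean.parseBoolean(
                raw.getOrDefault("taskmanager.jvm-exit-on-oom", "false"));
        return new TmConfig(slots, exitOnOom);
    }

    int getNumberSlots() {
        return numberSlots;
    }

    @Override
    public boolean shouldExitJvmOnOutOfMemoryError() {
        return exitJvmOnOutOfMemory;
    }

    public static void main(String[] args) {
        RuntimeInfo info = TmConfig.fromMap(Map.of("taskmanager.jvm-exit-on-oom", "true"));
        System.out.println(info.shouldExitJvmOnOutOfMemoryError());
    }
}

/** Hypothetical narrow read-only view handed to consumers such as a task. */
interface RuntimeInfo {
    boolean shouldExitJvmOnOutOfMemoryError();
}
```

Because the value is frozen when the TaskManager starts, every task on that TaskManager sees the same decision, and the failure path only performs a boolean read.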
Task
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
public class Task implements Runnable, TaskActions, CheckpointListener {
//……

@Override
public void run() {

// ----------------------------
// Initial State transition
// ----------------------------
//……

// all resource acquisitions and registrations from here on
// need to be undone in the end
Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
AbstractInvokable invokable = null;

try {
//……

// ----------------------------------------------------------------
// call the user code initialization methods
// ----------------------------------------------------------------

TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());

Environment env = new RuntimeEnvironment(
jobId,
vertexId,
executionId,
executionConfig,
taskInfo,
jobConfiguration,
taskConfiguration,
userCodeClassLoader,
memoryManager,
ioManager,
broadcastVariableManager,
taskStateManager,
accumulatorRegistry,
kvStateRegistry,
inputSplitProvider,
distributedCacheEntries,
producedPartitions,
inputGates,
network.getTaskEventDispatcher(),
checkpointResponder,
taskManagerConfig,
metrics,
this);

// now load and instantiate the task's invokable code
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

// ----------------------------------------------------------------
// actual task core work
// ----------------------------------------------------------------

// we must make strictly sure that the invokable is accessible to the cancel() call
// by the time we switched to running.
this.invokable = invokable;

// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
throw new CancelTaskException();
}

// notify everyone that we switched to running
taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

// make sure the user code classloader is accessible thread-locally
executingThread.setContextClassLoader(userCodeClassLoader);

// run the invokable
invokable.invoke();

// make sure, we enter the catch block if the task leaves the invoke() method due
// to the fact that it has been canceled
if (isCanceledOrFailed()) {
throw new CancelTaskException();
}

// ----------------------------------------------------------------
// finalization of a successful execution
// ----------------------------------------------------------------

// finish the produced partitions. if this fails, we consider the execution failed.
for (ResultPartition partition : producedPartitions) {
if (partition != null) {
partition.finish();
}
}

// try to mark the task as finished
// if that fails, the task was canceled/failed in the meantime
if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
throw new CancelTaskException();
}
}
catch (Throwable t) {

// unwrap wrapped exceptions to make stack traces more compact
if (t instanceof WrappingRuntimeException) {
t = ((WrappingRuntimeException) t).unwrap();
}

// ----------------------------------------------------------------
// the execution failed. either the invokable code properly failed, or
// an exception was thrown as a side effect of cancelling
// ----------------------------------------------------------------

try {
// check if the exception is unrecoverable
if (ExceptionUtils.isJvmFatalError(t) ||
(t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError())) {

// terminate the JVM immediately
// don't attempt a clean shutdown, because we cannot expect the clean shutdown to complete
try {
LOG.error("Encountered fatal error {} - terminating the JVM", t.getClass().getName(), t);
} finally {
Runtime.getRuntime().halt(-1);
}
}

// transition into our final state. we should be either in DEPLOYING, RUNNING, CANCELING, or FAILED
// loop for multiple retries during concurrent state changes via calls to cancel() or
// to failExternally()
while (true) {
ExecutionState current = this.executionState;

if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
if (t instanceof CancelTaskException) {
if (transitionState(current, ExecutionState.CANCELED)) {
cancelInvokable(invokable);
break;
}
}
else {
if (transitionState(current, ExecutionState.FAILED, t)) {
// proper failure of the task. record the exception as the root cause
failureCause = t;
cancelInvokable(invokable);

break;
}
}
}
else if (current == ExecutionState.CANCELING) {
if (transitionState(current, ExecutionState.CANCELED)) {
break;
}
}
else if (current == ExecutionState.FAILED) {
// in state failed already, no transition necessary any more
break;
}
// unexpected state, go to failed
else if (transitionState(current, ExecutionState.FAILED, t)) {
LOG.error("Unexpected state in task {} ({}) during an exception: {}.", taskNameWithSubtask, executionId, current);
break;
}
// else fall through the loop and
}
}
catch (Throwable tt) {
String message = String.format("FATAL - exception in exception handler of task %s (%s).", taskNameWithSubtask, executionId);
LOG.error(message, tt);
notifyFatalError(message, tt);
}
}
finally {
//……
}
}

//……
}
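Task implements the Runnable interface. Its run method wraps invokable.invoke() in a try/catch block. In the catch block, if ExceptionUtils.isJvmFatalError(t) is true, or if t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError() is true, it calls Runtime.getRuntime().halt(-1) to stop the JVM immediately, without attempting a clean shutdown. Otherwise, including an OutOfMemoryError while the flag is left at false, the task is simply transitioned into CANCELED or FAILED.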
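The decision in the catch block reduces to a small predicate, and the try/finally around the log call guarantees the halt is reached even if logging itself throws (plausible while the heap is exhausted). A minimal sketch of both, assuming the halt is injected as an IntConsumer so the logic can be exercised without killing the JVM; HaltPolicy, shouldHalt and logThenHalt are hypothetical names, and the fatal-error check simply mirrors isJvmFatalError as quoted in this article.

```java
import java.util.function.IntConsumer;

/** Hypothetical sketch of the halt decision in Task.run()'s catch block. */
final class HaltPolicy {

    /** Mirrors ExceptionUtils.isJvmFatalError as quoted in the article. */
    @SuppressWarnings("removal") // ThreadDeath is deprecated for removal in newer JDKs
    static boolean isJvmFatalError(Throwable t) {
        return t instanceof InternalError || t instanceof UnknownError || t instanceof ThreadDeath;
    }

    /** Halt on JVM-fatal errors always; on OutOfMemoryError only when the flag is set. */
    static boolean shouldHalt(Throwable t, boolean exitOnOom) {
        return isJvmFatalError(t) || (t instanceof OutOfMemoryError && exitOnOom);
    }

    /** Logs first, then halts in finally so a failing log call cannot skip termination. */
    static void logThenHalt(Runnable log, IntConsumer halter) {
        try {
            log.run();
        } finally {
            halter.accept(-1); // in production this would be Runtime.getRuntime().halt(-1)
        }
    }

    public static void main(String[] args) {
        System.out.println(shouldHalt(new OutOfMemoryError(), false));
        System.out.println(shouldHalt(new OutOfMemoryError(), true));
        System.out.println(shouldHalt(new InternalError(), false));
    }
}
```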
ExceptionUtils.isJvmFatalError
flink-1.7.2/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@Internal
public final class ExceptionUtils {
//……

/**
* Checks whether the given exception indicates a situation that may leave the
* JVM in a corrupted state, meaning a state where continued normal operation can only be
* guaranteed via clean process restart.
*
* <p>Currently considered fatal exceptions are Virtual Machine errors indicating
* that the JVM is corrupted, like {@link InternalError}, {@link UnknownError},
* and {@link java.util.zip.ZipError} (a special case of InternalError).
* The {@link ThreadDeath} exception is also treated as a fatal error, because when
* a thread is forcefully stopped, there is a high chance that parts of the system
* are in an inconsistent state.
*
* @param t The exception to check.
* @return True, if the exception is considered fatal to the JVM, false otherwise.
*/
public static boolean isJvmFatalError(Throwable t) {
return (t instanceof InternalError) || (t instanceof UnknownError) || (t instanceof ThreadDeath);
}

//……
}
isJvmFatalError returns true if the Throwable is an InternalError, an UnknownError, or a ThreadDeath, and false otherwise.
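Because the check uses instanceof, subclasses count too: java.util.zip.ZipError extends InternalError and is treated as fatal, while other VirtualMachineError subclasses such as StackOverflowError and OutOfMemoryError are not, which is why an OutOfMemoryError only halts the JVM when taskmanager.jvm-exit-on-oom is enabled. The sketch below reproduces the one-line check locally (FatalErrorCheck is a hypothetical class name) and prints how a few common errors are classified.

```java
import java.util.List;
import java.util.zip.ZipError;

/** Hypothetical local copy of the isJvmFatalError check quoted above. */
final class FatalErrorCheck {

    @SuppressWarnings("removal") // ThreadDeath is deprecated for removal in newer JDKs
    static boolean isJvmFatalError(Throwable t) {
        return (t instanceof InternalError) || (t instanceof UnknownError) || (t instanceof ThreadDeath);
    }

    public static void main(String[] args) {
        List<Throwable> samples = List.of(
                new InternalError("internal"),
                new ZipError("zip"),          // subclass of InternalError
                new UnknownError("unknown"),
                new StackOverflowError(),     // a VirtualMachineError, but not listed
                new OutOfMemoryError("heap"), // a VirtualMachineError, but not listed
                new IllegalStateException("plain"));
        for (Throwable t : samples) {
            System.out.println(t.getClass().getSimpleName() + " -> " + isJvmFatalError(t));
        }
    }
}
```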
Runtime.getRuntime().halt
java.base/java/lang/Runtime.java
public class Runtime {
//……

private static final Runtime currentRuntime = new Runtime();

/**
* Returns the runtime object associated with the current Java application.
* Most of the methods of class {@code Runtime} are instance
* methods and must be invoked with respect to the current runtime object.
*
* @return the {@code Runtime} object associated with the current
* Java application.
*/
public static Runtime getRuntime() {
return currentRuntime;
}

/**
* Forcibly terminates the currently running Java virtual machine. This
* method never returns normally.
*
* <p> This method should be used with extreme caution. Unlike the
* {@link #exit exit} method, this method does not cause shutdown
* hooks to be started. If the shutdown sequence has already been
* initiated then this method does not wait for any running
* shutdown hooks to finish their work.
*
* @param status
* Termination status. By convention, a nonzero status code
* indicates abnormal termination. If the {@link Runtime#exit exit}
* (equivalently, {@link System#exit(int) System.exit}) method
* has already been invoked then this status code
* will override the status code passed to that method.
*
* @throws SecurityException
* If a security manager is present and its
* {@link SecurityManager#checkExit checkExit} method
* does not permit an exit with the specified status
*
* @see #exit
* @see #addShutdownHook
* @see #removeShutdownHook
* @since 1.3
*/
public void halt(int status) {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkExit(status);
}
Shutdown.beforeHalt();
Shutdown.halt(status);
}

//……
}
If a SecurityManager is installed, halt first calls SecurityManager.checkExit; it then calls Shutdown.beforeHalt() and Shutdown.halt(status) to stop the JVM. Unlike exit, halt does not start shutdown hooks.
Summary

taskmanager.jvm-exit-on-oom defaults to false and specifies whether the TaskManager should be killed when a task thread throws an OutOfMemoryError.
TaskManagerConfiguration.fromConfiguration reads the option via configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY) into the exitJvmOnOutOfMemory field, which shouldExitJvmOnOutOfMemoryError exposes.
In Task.run, the catch block around invokable.invoke() calls Runtime.getRuntime().halt(-1) if ExceptionUtils.isJvmFatalError(t) is true (the Throwable is an InternalError, UnknownError, or ThreadDeath), or if t is an OutOfMemoryError and shouldExitJvmOnOutOfMemoryError() returns true. halt calls SecurityManager.checkExit when a SecurityManager is present, then Shutdown.beforeHalt() and Shutdown.halt(status), terminating the JVM without running shutdown hooks.

