Preface
This article takes a look at Flink's RestartStrategies.
RestartStrategies
flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
@PublicEvolving
public class RestartStrategies {
/**
* Generates NoRestartStrategyConfiguration.
*
* @return NoRestartStrategyConfiguration
*/
public static RestartStrategyConfiguration noRestart() {
return new NoRestartStrategyConfiguration();
}
public static RestartStrategyConfiguration fallBackRestart() {
return new FallbackRestartStrategyConfiguration();
}
/**
* Generates a FixedDelayRestartStrategyConfiguration.
*
* @param restartAttempts Number of restart attempts for the FixedDelayRestartStrategy
* @param delayBetweenAttempts Delay in-between restart attempts for the FixedDelayRestartStrategy
* @return FixedDelayRestartStrategy
*/
public static RestartStrategyConfiguration fixedDelayRestart(int restartAttempts, long delayBetweenAttempts) {
return fixedDelayRestart(restartAttempts, Time.of(delayBetweenAttempts, TimeUnit.MILLISECONDS));
}
/**
* Generates a FixedDelayRestartStrategyConfiguration.
*
* @param restartAttempts Number of restart attempts for the FixedDelayRestartStrategy
* @param delayInterval Delay in-between restart attempts for the FixedDelayRestartStrategy
* @return FixedDelayRestartStrategy
*/
public static RestartStrategyConfiguration fixedDelayRestart(int restartAttempts, Time delayInterval) {
return new FixedDelayRestartStrategyConfiguration(restartAttempts, delayInterval);
}
/**
* Generates a FailureRateRestartStrategyConfiguration.
*
* @param failureRate Maximum number of restarts in given interval {@code failureInterval} before failing a job
* @param failureInterval Time interval for failures
* @param delayInterval Delay in-between restart attempts
*/
public static FailureRateRestartStrategyConfiguration failureRateRestart(
int failureRate, Time failureInterval, Time delayInterval) {
return new FailureRateRestartStrategyConfiguration(failureRate, failureInterval, delayInterval);
}
//……
}
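RestartStrategies provides the static methods noRestart, fallBackRestart, fixedDelayRestart, and failureRateRestart for building a RestartStrategyConfiguration. The fixedDelayRestart overload that takes a long interprets it as milliseconds and delegates to the Time-based overload.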
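The delegation between the two fixedDelayRestart overloads can be sketched without Flink on the classpath. This is only an illustration: FixedDelayConfig is a hypothetical stand-in, and java.time.Duration is assumed here in place of Flink's Time.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical stand-in, not a Flink class: mirrors the shape of the two
// fixedDelayRestart overloads, with java.time.Duration replacing Flink's Time.
final class FixedDelayConfig {
    final int restartAttempts;
    final Duration delay;

    private FixedDelayConfig(int restartAttempts, Duration delay) {
        this.restartAttempts = restartAttempts;
        this.delay = delay;
    }

    // Millisecond overload: converts the raw long and delegates to the typed overload.
    static FixedDelayConfig fixedDelayRestart(int restartAttempts, long delayMillis) {
        return fixedDelayRestart(restartAttempts, Duration.ofMillis(delayMillis));
    }

    // Typed overload: the only place where the configuration object is created.
    static FixedDelayConfig fixedDelayRestart(int restartAttempts, Duration delay) {
        return new FixedDelayConfig(restartAttempts, delay);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof FixedDelayConfig)) {
            return false;
        }
        FixedDelayConfig other = (FixedDelayConfig) o;
        return restartAttempts == other.restartAttempts && Objects.equals(delay, other.delay);
    }

    @Override
    public int hashCode() {
        return Objects.hash(restartAttempts, delay);
    }
}
```

Under these assumptions, a configuration built from 10000 milliseconds compares equal to one built from a 10-second duration, because both paths end in the same constructor.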
RestartStrategyConfiguration
flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
public abstract static class RestartStrategyConfiguration implements Serializable {
private static final long serialVersionUID = 6285853591578313960L;
private RestartStrategyConfiguration() {}
/**
* Returns a description which is shown in the web interface.
*
* @return Description of the restart strategy
*/
public abstract String getDescription();
}
RestartStrategyConfiguration is an abstract class that declares the abstract method getDescription. Its subclasses are NoRestartStrategyConfiguration, FixedDelayRestartStrategyConfiguration, FailureRateRestartStrategyConfiguration, and FallbackRestartStrategyConfiguration.
NoRestartStrategyConfiguration
flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
public static final class NoRestartStrategyConfiguration extends RestartStrategyConfiguration {
private static final long serialVersionUID = -5894362702943349962L;
@Override
public String getDescription() {
return "Restart deactivated.";
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
return o instanceof NoRestartStrategyConfiguration;
}
@Override
public int hashCode() {
return Objects.hash();
}
}
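NoRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the no-restart strategy. It carries no state, so any two instances are equal and share the same hash code.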
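The equals/hashCode pattern used by this stateless class can be tried in isolation. The following is a minimal sketch; StatelessConfig is a hypothetical stand-in and not part of Flink.

```java
import java.util.Objects;

// Hypothetical stand-in, not a Flink class: a configuration marker without fields.
final class StatelessConfig {
    @Override
    public boolean equals(Object o) {
        // No fields to compare: every instance of this type is equal to every other.
        return this == o || o instanceof StatelessConfig;
    }

    @Override
    public int hashCode() {
        // Objects.hash() with no arguments hashes an empty array, so the result
        // is the same constant for every instance.
        return Objects.hash();
    }
}
```

Because equality depends only on the type, two separately created instances behave as the same key in hash-based collections.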
FixedDelayRestartStrategyConfiguration
flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
public static final class FixedDelayRestartStrategyConfiguration extends RestartStrategyConfiguration {
private static final long serialVersionUID = 4149870149673363190L;
private final int restartAttempts;
private final Time delayBetweenAttemptsInterval;
FixedDelayRestartStrategyConfiguration(int restartAttempts, Time delayBetweenAttemptsInterval) {
this.restartAttempts = restartAttempts;
this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval;
}
public int getRestartAttempts() {
return restartAttempts;
}
public Time getDelayBetweenAttemptsInterval() {
return delayBetweenAttemptsInterval;
}
@Override
public int hashCode() {
int result = restartAttempts;
result = 31 * result + (delayBetweenAttemptsInterval != null ? delayBetweenAttemptsInterval.hashCode() : 0);
return result;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof FixedDelayRestartStrategyConfiguration) {
FixedDelayRestartStrategyConfiguration other = (FixedDelayRestartStrategyConfiguration) obj;
return restartAttempts == other.restartAttempts && delayBetweenAttemptsInterval.equals(other.delayBetweenAttemptsInterval);
} else {
return false;
}
}
@Override
public String getDescription() {
return "Restart with fixed delay (" + delayBetweenAttemptsInterval + "). #"
+ restartAttempts + " restart attempts.";
}
}
FixedDelayRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the fixed-delay restart strategy. It has two properties: restartAttempts and delayBetweenAttemptsInterval.
FailureRateRestartStrategyConfiguration
flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
public static final class FailureRateRestartStrategyConfiguration extends RestartStrategyConfiguration {
private static final long serialVersionUID = 1195028697539661739L;
private final int maxFailureRate;
private final Time failureInterval;
private final Time delayBetweenAttemptsInterval;
public FailureRateRestartStrategyConfiguration(int maxFailureRate, Time failureInterval, Time delayBetweenAttemptsInterval) {
this.maxFailureRate = maxFailureRate;
this.failureInterval = failureInterval;
this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval;
}
public int getMaxFailureRate() {
return maxFailureRate;
}
public Time getFailureInterval() {
return failureInterval;
}
public Time getDelayBetweenAttemptsInterval() {
return delayBetweenAttemptsInterval;
}
@Override
public String getDescription() {
return "Failure rate restart with maximum of " + maxFailureRate + " failures within interval " + failureInterval.toString()
+ " and fixed delay " + delayBetweenAttemptsInterval.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FailureRateRestartStrategyConfiguration that = (FailureRateRestartStrategyConfiguration) o;
return maxFailureRate == that.maxFailureRate &&
Objects.equals(failureInterval, that.failureInterval) &&
Objects.equals(delayBetweenAttemptsInterval, that.delayBetweenAttemptsInterval);
}
@Override
public int hashCode() {
return Objects.hash(maxFailureRate, failureInterval, delayBetweenAttemptsInterval);
}
}
FailureRateRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the failure-rate restart strategy. It has three properties: maxFailureRate, failureInterval, and delayBetweenAttemptsInterval.
FallbackRestartStrategyConfiguration
flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
public static final class FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration {
private static final long serialVersionUID = -4441787204284085544L;
@Override
public String getDescription() {
return "Cluster level default restart strategy";
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
return o instanceof FallbackRestartStrategyConfiguration;
}
@Override
public int hashCode() {
return Objects.hash();
}
}
FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the cluster-level default restart strategy.
RestartStrategyResolving
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java
public final class RestartStrategyResolving {
/**
* Resolves which {@link RestartStrategy} to use. It should be used only on the server side.
* The resolving strategy is as follows:
* <ol>
* <li>Strategy set within job graph.</li>
* <li>Strategy set flink-conf.yaml on the server set, unless is set to {@link NoRestartStrategy} and checkpointing
* is enabled.</li>
* <li>If no strategy was set on client and server side and checkpointing was enabled then
* {@link FixedDelayRestartStrategy} is used</li>
* </ol>
*
* @param clientConfiguration restart configuration given within the job graph
* @param serverStrategyFactory default server side strategy factory
* @param isCheckpointingEnabled if checkpointing was enabled for the job
* @return resolved strategy
*/
public static RestartStrategy resolve(
RestartStrategies.RestartStrategyConfiguration clientConfiguration,
RestartStrategyFactory serverStrategyFactory,
boolean isCheckpointingEnabled) {
final RestartStrategy clientSideRestartStrategy =
RestartStrategyFactory.createRestartStrategy(clientConfiguration);
if (clientSideRestartStrategy != null) {
return clientSideRestartStrategy;
} else {
if (serverStrategyFactory instanceof NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) {
return ((NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) serverStrategyFactory)
.createRestartStrategy(isCheckpointingEnabled);
} else {
return serverStrategyFactory.createRestartStrategy();
}
}
}
private RestartStrategyResolving() {
}
}
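RestartStrategyResolving provides a single static method, resolve. It first asks RestartStrategyFactory to create a RestartStrategy from the client-side RestartStrategies.RestartStrategyConfiguration. If that yields null, it falls back to the server-side RestartStrategyFactory; when that factory is a NoOrFixedIfCheckpointingEnabledRestartStrategyFactory, the result also depends on whether checkpointing is enabled.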
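The decision order in resolve can be sketched as a small self-contained program. Everything below is a simplified stand-in rather than Flink code: the enums replace the configuration, factory, and strategy types. Two points are assumptions not shown in the listing above: that only the fallback configuration produces no client-side strategy, and that the checkpointing-aware factory picks fixed delay when checkpointing is enabled and no restart otherwise (this follows the javadoc of resolve).

```java
// Hypothetical stand-ins, not Flink classes: enums replace the configuration,
// factory, and strategy types so the decision order can be run in isolation.
final class ResolveSketch {
    enum ClientConfig { NO_RESTART, FIXED_DELAY, FAILURE_RATE, FALLBACK }
    enum ServerFactory { NO_RESTART, FIXED_DELAY, FAILURE_RATE, NO_OR_FIXED_IF_CHECKPOINTING }
    enum Strategy { NO_RESTART, FIXED_DELAY, FAILURE_RATE }

    // Assumption: only FALLBACK means "no client-side strategy" (the null case in resolve).
    static Strategy fromClient(ClientConfig client) {
        switch (client) {
            case NO_RESTART: return Strategy.NO_RESTART;
            case FIXED_DELAY: return Strategy.FIXED_DELAY;
            case FAILURE_RATE: return Strategy.FAILURE_RATE;
            default: return null;
        }
    }

    static Strategy resolve(ClientConfig client, ServerFactory server, boolean checkpointing) {
        // Step 1: a strategy set within the job graph always wins.
        Strategy clientSide = fromClient(client);
        if (clientSide != null) {
            return clientSide;
        }
        // Step 2: otherwise the server-side factory decides.
        switch (server) {
            case NO_OR_FIXED_IF_CHECKPOINTING:
                // Assumption taken from the javadoc: fixed delay only with checkpointing.
                return checkpointing ? Strategy.FIXED_DELAY : Strategy.NO_RESTART;
            case FIXED_DELAY: return Strategy.FIXED_DELAY;
            case FAILURE_RATE: return Strategy.FAILURE_RATE;
            default: return Strategy.NO_RESTART;
        }
    }
}
```

In this sketch an explicit client-side noRestart is honored even when the server is configured with a failure-rate strategy, which matches the first branch of resolve.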
RestartStrategy
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
public interface RestartStrategy {
/**
* True if the restart strategy can be applied to restart the {@link ExecutionGraph}.
*
* @return true if restart is possible, otherwise false
*/
boolean canRestart();
/**
* Called by the ExecutionGraph to eventually trigger a full recovery.
* The recovery must be triggered on the given callback object, and may be delayed
* with the help of the given scheduled executor.
*
* <p>The thread that calls this method is not supposed to block/sleep.
*
* @param restarter The hook to restart the ExecutionGraph
* @param executor An scheduled executor to delay the restart
*/
void restart(RestartCallback restarter, ScheduledExecutor executor);
}
RestartStrategy is an interface that defines two methods, canRestart and restart. Its implementations include NoRestartStrategy, FixedDelayRestartStrategy, and FailureRateRestartStrategy.
NoRestartStrategy
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
public class NoRestartStrategy implements RestartStrategy {
@Override
public boolean canRestart() {
return false;
}
@Override
public void restart(RestartCallback restarter, ScheduledExecutor executor) {
throw new UnsupportedOperationException("NoRestartStrategy does not support restart.");
}
/**
* Creates a NoRestartStrategyFactory instance.
*
* @param configuration Configuration object which is ignored
* @return NoRestartStrategyFactory instance
*/
public static NoRestartStrategyFactory createFactory(Configuration configuration) {
return new NoRestartStrategyFactory();
}
@Override
public String toString() {
return "NoRestartStrategy";
}
public static class NoRestartStrategyFactory extends RestartStrategyFactory {
private static final long serialVersionUID = -1809462525812787862L;
@Override
public RestartStrategy createRestartStrategy() {
return new NoRestartStrategy();
}
}
}
NoRestartStrategy implements the RestartStrategy interface. Its canRestart method always returns false, and its restart method throws an UnsupportedOperationException.
FixedDelayRestartStrategy
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
public class FixedDelayRestartStrategy implements RestartStrategy {
private final int maxNumberRestartAttempts;
private final long delayBetweenRestartAttempts;
private int currentRestartAttempt;
public FixedDelayRestartStrategy(
int maxNumberRestartAttempts,
long delayBetweenRestartAttempts) {
Preconditions.checkArgument(maxNumberRestartAttempts >= 0, "Maximum number of restart attempts must be positive.");
Preconditions.checkArgument(delayBetweenRestartAttempts >= 0, "Delay between restart attempts must be positive");
this.maxNumberRestartAttempts = maxNumberRestartAttempts;
this.delayBetweenRestartAttempts = delayBetweenRestartAttempts;
currentRestartAttempt = 0;
}
public int getCurrentRestartAttempt() {
return currentRestartAttempt;
}
@Override
public boolean canRestart() {
return currentRestartAttempt < maxNumberRestartAttempts;
}
@Override
public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
currentRestartAttempt++;
executor.schedule(new Runnable() {
@Override
public void run() {
restarter.triggerFullRecovery();
}
}, delayBetweenRestartAttempts, TimeUnit.MILLISECONDS);
}
/**
* Creates a FixedDelayRestartStrategy from the given Configuration.
*
* @param configuration Configuration containing the parameter values for the restart strategy
* @return Initialized instance of FixedDelayRestartStrategy
* @throws Exception
*/
public static FixedDelayRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
int maxAttempts = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY);
long delay;
try {
delay = Duration.apply(delayString).toMillis();
} catch (NumberFormatException nfe) {
throw new Exception("Invalid config value for " +
ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString +
". Value must be a valid duration (such as '100 milli' or '10 s')");
}
return new FixedDelayRestartStrategyFactory(maxAttempts, delay);
}
@Override
public String toString() {
return "FixedDelayRestartStrategy(" +
"maxNumberRestartAttempts=" + maxNumberRestartAttempts +
", delayBetweenRestartAttempts=" + delayBetweenRestartAttempts +
')';
}
public static class FixedDelayRestartStrategyFactory extends RestartStrategyFactory {
private static final long serialVersionUID = 6642934067762271950L;
private final int maxAttempts;
private final long delay;
public FixedDelayRestartStrategyFactory(int maxAttempts, long delay) {
this.maxAttempts = maxAttempts;
this.delay = delay;
}
@Override
public RestartStrategy createRestartStrategy() {
return new FixedDelayRestartStrategy(maxAttempts, delay);
}
}
}
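FixedDelayRestartStrategy implements the RestartStrategy interface. Its canRestart method returns true as long as currentRestartAttempt is less than maxNumberRestartAttempts. Its restart method increments currentRestartAttempt and then calls ScheduledExecutor.schedule to run RestartCallback.triggerFullRecovery() after delayBetweenRestartAttempts milliseconds.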
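The counting behavior can be checked with a small self-contained sketch. FixedDelaySketch and its grantedRestarts helper are hypothetical names introduced for illustration; the delayed scheduling is deliberately left out so that only the attempt counter is exercised.

```java
// Hypothetical stand-in, not a Flink class: keeps only the attempt counter of
// the fixed-delay logic and omits the delayed scheduling.
final class FixedDelaySketch {
    private final int maxAttempts;
    private int currentAttempt = 0;

    FixedDelaySketch(int maxAttempts) {
        if (maxAttempts < 0) {
            throw new IllegalArgumentException("maxAttempts must not be negative");
        }
        this.maxAttempts = maxAttempts;
    }

    boolean canRestart() {
        return currentAttempt < maxAttempts;
    }

    void restart() {
        // The real strategy would also schedule the recovery callback here.
        currentAttempt++;
    }

    // Illustrative helper: counts how many restarts are granted before canRestart() turns false.
    static int grantedRestarts(int maxAttempts) {
        FixedDelaySketch strategy = new FixedDelaySketch(maxAttempts);
        int granted = 0;
        while (strategy.canRestart()) {
            strategy.restart();
            granted++;
        }
        return granted;
    }
}
```

Note that the counter is never reset in this sketch, which mirrors the listing above: once the attempts are used up, canRestart stays false for the lifetime of the strategy instance.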
FailureRateRestartStrategy
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
public class FailureRateRestartStrategy implements RestartStrategy {
private final Time failuresInterval;
private final Time delayInterval;
private final int maxFailuresPerInterval;
private final ArrayDeque<Long> restartTimestampsDeque;
public FailureRateRestartStrategy(int maxFailuresPerInterval, Time failuresInterval, Time delayInterval) {
Preconditions.checkNotNull(failuresInterval, "Failures interval cannot be null.");
Preconditions.checkNotNull(delayInterval, "Delay interval cannot be null.");
Preconditions.checkArgument(maxFailuresPerInterval > 0, "Maximum number of restart attempts per time unit must be greater than 0.");
Preconditions.checkArgument(failuresInterval.getSize() > 0, "Failures interval must be greater than 0 ms.");
Preconditions.checkArgument(delayInterval.getSize() >= 0, "Delay interval must be at least 0 ms.");
this.failuresInterval = failuresInterval;
this.delayInterval = delayInterval;
this.maxFailuresPerInterval = maxFailuresPerInterval;
this.restartTimestampsDeque = new ArrayDeque<>(maxFailuresPerInterval);
}
@Override
public boolean canRestart() {
if (isRestartTimestampsQueueFull()) {
Long now = System.currentTimeMillis();
Long earliestFailure = restartTimestampsDeque.peek();
return (now - earliestFailure) > failuresInterval.toMilliseconds();
} else {
return true;
}
}
@Override
public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
if (isRestartTimestampsQueueFull()) {
restartTimestampsDeque.remove();
}
restartTimestampsDeque.add(System.currentTimeMillis());
executor.schedule(new Runnable() {
@Override
public void run() {
restarter.triggerFullRecovery();
}
}, delayInterval.getSize(), delayInterval.getUnit());
}
private boolean isRestartTimestampsQueueFull() {
return restartTimestampsDeque.size() >= maxFailuresPerInterval;
}
@Override
public String toString() {
return "FailureRateRestartStrategy(" +
"failuresInterval=" + failuresInterval +
"delayInterval=" + delayInterval +
"maxFailuresPerInterval=" + maxFailuresPerInterval +
")";
}
public static FailureRateRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
int maxFailuresPerInterval = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL, 1);
String failuresIntervalString = configuration.getString(
ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, Duration.apply(1, TimeUnit.MINUTES).toString()
);
String timeoutString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL);
String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_DELAY, timeoutString);
Duration failuresInterval = Duration.apply(failuresIntervalString);
Duration delay = Duration.apply(delayString);
return new FailureRateRestartStrategyFactory(maxFailuresPerInterval, Time.milliseconds(failuresInterval.toMillis()), Time.milliseconds(delay.toMillis()));
}
public static class FailureRateRestartStrategyFactory extends RestartStrategyFactory {
private static final long serialVersionUID = -373724639430960480L;
private final int maxFailuresPerInterval;
private final Time failuresInterval;
private final Time delayInterval;
public FailureRateRestartStrategyFactory(int maxFailuresPerInterval, Time failuresInterval, Time delayInterval) {
this.maxFailuresPerInterval = maxFailuresPerInterval;
this.failuresInterval = Preconditions.checkNotNull(failuresInterval);
this.delayInterval = Preconditions.checkNotNull(delayInterval);
}
@Override
public RestartStrategy createRestartStrategy() {
return new FailureRateRestartStrategy(maxFailuresPerInterval, failuresInterval, delayInterval);
}
}
}
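FailureRateRestartStrategy implements the RestartStrategy interface. Its canRestart method returns true while restartTimestampsDeque holds fewer than maxFailuresPerInterval entries; once the deque is full, it returns true only if the time elapsed since earliestFailure (the oldest recorded timestamp) is greater than failuresInterval. Its restart method removes the oldest timestamp if the deque is full, appends the current time, and then calls ScheduledExecutor.schedule to run RestartCallback.triggerFullRecovery() after delayInterval.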
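The sliding-window check is easier to follow with concrete timestamps. The sketch below is a simplified stand-in, not Flink code: FailureRateSketch is a hypothetical name, the current time is passed in explicitly instead of being read from System.currentTimeMillis(), and the scheduling of the recovery callback is omitted.

```java
import java.util.ArrayDeque;

// Hypothetical stand-in, not a Flink class: the time is injected so that the
// window logic can be replayed deterministically.
final class FailureRateSketch {
    private final int maxFailuresPerInterval;
    private final long failuresIntervalMillis;
    private final ArrayDeque<Long> restartTimestamps;

    FailureRateSketch(int maxFailuresPerInterval, long failuresIntervalMillis) {
        if (maxFailuresPerInterval <= 0 || failuresIntervalMillis <= 0) {
            throw new IllegalArgumentException("both arguments must be greater than 0");
        }
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.failuresIntervalMillis = failuresIntervalMillis;
        this.restartTimestamps = new ArrayDeque<>(maxFailuresPerInterval);
    }

    private boolean isFull() {
        return restartTimestamps.size() >= maxFailuresPerInterval;
    }

    boolean canRestart(long nowMillis) {
        if (isFull()) {
            // Only the oldest recorded restart matters: it must have left the window.
            long earliest = restartTimestamps.peek();
            return nowMillis - earliest > failuresIntervalMillis;
        }
        return true;
    }

    void restart(long nowMillis) {
        if (isFull()) {
            restartTimestamps.remove(); // evict the oldest timestamp
        }
        restartTimestamps.add(nowMillis);
    }
}
```

With at most 2 failures per 1000 ms and restarts recorded at 0 ms and 100 ms, a third restart is refused at 200 ms and also at exactly 1000 ms, because the comparison is strictly greater-than; it is allowed again from 1001 ms onward.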
Summary
RestartStrategies provides the static methods noRestart, fallBackRestart, fixedDelayRestart, and failureRateRestart for building a RestartStrategyConfiguration.
RestartStrategyConfiguration is an abstract class that declares the abstract method getDescription. Its subclasses are NoRestartStrategyConfiguration, FixedDelayRestartStrategyConfiguration, FailureRateRestartStrategyConfiguration, and FallbackRestartStrategyConfiguration.
RestartStrategyResolving provides a static method, resolve, which resolves a RestartStrategies.RestartStrategyConfiguration and then uses a RestartStrategyFactory to create the RestartStrategy. RestartStrategy defines two methods, canRestart and restart; its implementations include NoRestartStrategy, FixedDelayRestartStrategy, and FailureRateRestartStrategy.
doc
Restart Strategies