A Look at Flink's RestClusterClientConfiguration



This post looks at Flink's RestClusterClientConfiguration.
RestClusterClientConfiguration
flink-release-1.7.2/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
public final class RestClusterClientConfiguration {

    private final RestClientConfiguration restClientConfiguration;

    private final long awaitLeaderTimeout;

    private final int retryMaxAttempts;

    private final long retryDelay;

    private RestClusterClientConfiguration(
            final RestClientConfiguration endpointConfiguration,
            final long awaitLeaderTimeout,
            final int retryMaxAttempts,
            final long retryDelay) {
        checkArgument(awaitLeaderTimeout >= 0, "awaitLeaderTimeout must be equal to or greater than 0");
        checkArgument(retryMaxAttempts >= 0, "retryMaxAttempts must be equal to or greater than 0");
        checkArgument(retryDelay >= 0, "retryDelay must be equal to or greater than 0");

        this.restClientConfiguration = Preconditions.checkNotNull(endpointConfiguration);
        this.awaitLeaderTimeout = awaitLeaderTimeout;
        this.retryMaxAttempts = retryMaxAttempts;
        this.retryDelay = retryDelay;
    }

    public RestClientConfiguration getRestClientConfiguration() {
        return restClientConfiguration;
    }

    /**
     * @see RestOptions#AWAIT_LEADER_TIMEOUT
     */
    public long getAwaitLeaderTimeout() {
        return awaitLeaderTimeout;
    }

    /**
     * @see RestOptions#RETRY_MAX_ATTEMPTS
     */
    public int getRetryMaxAttempts() {
        return retryMaxAttempts;
    }

    /**
     * @see RestOptions#RETRY_DELAY
     */
    public long getRetryDelay() {
        return retryDelay;
    }

    public static RestClusterClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
        RestClientConfiguration restClientConfiguration = RestClientConfiguration.fromConfiguration(config);

        final long awaitLeaderTimeout = config.getLong(RestOptions.AWAIT_LEADER_TIMEOUT);
        final int retryMaxAttempts = config.getInteger(RestOptions.RETRY_MAX_ATTEMPTS);
        final long retryDelay = config.getLong(RestOptions.RETRY_DELAY);

        return new RestClusterClientConfiguration(restClientConfiguration, awaitLeaderTimeout, retryMaxAttempts, retryDelay);
    }
}
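Besides the RestClientConfiguration, RestClusterClientConfiguration holds three properties: awaitLeaderTimeout, retryMaxAttempts, and retryDelay. awaitLeaderTimeout is read from rest.await-leader-timeout and defaults to 30 seconds (30000 ms); retryMaxAttempts is read from rest.retry.max-attempts and defaults to 20; retryDelay is read from rest.retry.delay and defaults to 3 seconds (3000 ms). The private constructor rejects negative values for all three.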
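To make the "read with a default, then validate" pattern concrete, here is a minimal JDK-only sketch. It is not Flink code: the class RestClusterSettingsSketch and its fromMap method are hypothetical names, and a plain Map stands in for Flink's Configuration. The keys and default values are the ones described above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for illustration only; a Map replaces Flink's Configuration.
final class RestClusterSettingsSketch {
    static final String AWAIT_LEADER_TIMEOUT_KEY = "rest.await-leader-timeout";
    static final String RETRY_MAX_ATTEMPTS_KEY = "rest.retry.max-attempts";
    static final String RETRY_DELAY_KEY = "rest.retry.delay";

    final long awaitLeaderTimeoutMs;
    final int retryMaxAttempts;
    final long retryDelayMs;

    private RestClusterSettingsSketch(long awaitLeaderTimeoutMs, int retryMaxAttempts, long retryDelayMs) {
        // Same non-negativity checks as the constructor quoted above.
        checkArgument(awaitLeaderTimeoutMs >= 0, "awaitLeaderTimeout must be equal to or greater than 0");
        checkArgument(retryMaxAttempts >= 0, "retryMaxAttempts must be equal to or greater than 0");
        checkArgument(retryDelayMs >= 0, "retryDelay must be equal to or greater than 0");
        this.awaitLeaderTimeoutMs = awaitLeaderTimeoutMs;
        this.retryMaxAttempts = retryMaxAttempts;
        this.retryDelayMs = retryDelayMs;
    }

    // Reads each key, falling back to the defaults described in the text (in milliseconds).
    static RestClusterSettingsSketch fromMap(Map<String, String> config) {
        long timeout = Long.parseLong(config.getOrDefault(AWAIT_LEADER_TIMEOUT_KEY, "30000"));
        int attempts = Integer.parseInt(config.getOrDefault(RETRY_MAX_ATTEMPTS_KEY, "20"));
        long delay = Long.parseLong(config.getOrDefault(RETRY_DELAY_KEY, "3000"));
        return new RestClusterSettingsSketch(timeout, attempts, delay);
    }

    private static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(RETRY_MAX_ATTEMPTS_KEY, "5"); // override one key, keep the other defaults
        RestClusterSettingsSketch s = fromMap(config);
        // prints: 30000 5 3000
        System.out.println(s.awaitLeaderTimeoutMs + " " + s.retryMaxAttempts + " " + s.retryDelayMs);
    }
}
```

With the defaults, a request that keeps failing with a retryable error spends roughly 20 × 3 s = 60 s in retry delays alone before giving up, which is worth keeping in mind when tuning these keys.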
RestClusterClient
flink-release-1.7.2/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
public class RestClusterClient<T> extends ClusterClient<T> implements NewClusterClient {

    private final RestClusterClientConfiguration restClusterClientConfiguration;

    private final RestClient restClient;

    private final ExecutorService executorService = Executors.newFixedThreadPool(4, new ExecutorThreadFactory("Flink-RestClusterClient-IO"));

    private final WaitStrategy waitStrategy;

    private final T clusterId;

    private final LeaderRetrievalService webMonitorRetrievalService;

    private final LeaderRetrievalService dispatcherRetrievalService;

    private final LeaderRetriever webMonitorLeaderRetriever = new LeaderRetriever();

    private final LeaderRetriever dispatcherLeaderRetriever = new LeaderRetriever();

    /** ExecutorService to run operations that can be retried on exceptions. */
    private ScheduledExecutorService retryExecutorService;

    //……

    private <C> CompletableFuture<C> retry(
            CheckedSupplier<CompletableFuture<C>> operation,
            Predicate<Throwable> retryPredicate) {
        return FutureUtils.retryWithDelay(
            CheckedSupplier.unchecked(operation),
            restClusterClientConfiguration.getRetryMaxAttempts(),
            Time.milliseconds(restClusterClientConfiguration.getRetryDelay()),
            retryPredicate,
            new ScheduledExecutorServiceAdapter(retryExecutorService));
    }

    @VisibleForTesting
    CompletableFuture<URL> getWebMonitorBaseUrl() {
        return FutureUtils.orTimeout(
                webMonitorLeaderRetriever.getLeaderFuture(),
                restClusterClientConfiguration.getAwaitLeaderTimeout(),
                TimeUnit.MILLISECONDS)
            .thenApplyAsync(leaderAddressSessionId -> {
                final String url = leaderAddressSessionId.f0;
                try {
                    return new URL(url);
                } catch (MalformedURLException e) {
                    throw new IllegalArgumentException("Could not parse URL from " + url, e);
                }
            }, executorService);
    }

    //……
}

The RestClusterClient constructor builds its RestClusterClientConfiguration from the Configuration by calling RestClusterClientConfiguration.fromConfiguration(configuration).
The retry method delegates to FutureUtils.retryWithDelay, passing restClusterClientConfiguration.getRetryMaxAttempts() as the retries argument and Time.milliseconds(restClusterClientConfiguration.getRetryDelay()) as the retryDelay argument.
The getWebMonitorBaseUrl method delegates to FutureUtils.orTimeout, passing restClusterClientConfiguration.getAwaitLeaderTimeout() as the timeout argument, interpreted in milliseconds.
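The two behaviours above (retry with a fixed delay, and failing a future that takes too long) can be sketched with nothing but the JDK. This is an illustration of the idea, not Flink's FutureUtils: the class RetryTimeoutSketch and the helper flaky are hypothetical, and the sketch assumes that retries counts the re-attempts made after the first try.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical JDK-only sketch; these are NOT Flink's FutureUtils methods.
final class RetryTimeoutSketch {

    // Runs the operation once, then re-runs it up to `retries` more times,
    // waiting delayMs between attempts, as long as retryPredicate accepts the failure.
    static <T> CompletableFuture<T> retryWithDelay(
            Supplier<CompletableFuture<T>> operation, int retries, long delayMs,
            Predicate<Throwable> retryPredicate, ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, retries, delayMs, retryPredicate, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation, int retriesLeft, long delayMs,
            Predicate<Throwable> retryPredicate, ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        operation.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (retriesLeft > 0 && retryPredicate.test(error)) {
                // Schedule the next attempt after the configured delay.
                scheduler.schedule(() -> {
                    attempt(operation, retriesLeft - 1, delayMs, retryPredicate, scheduler, result);
                }, delayMs, TimeUnit.MILLISECONDS);
            } else {
                // Retries exhausted or failure not retryable: surface the last error.
                result.completeExceptionally(error);
            }
        });
    }

    // Fails the future with a TimeoutException if it is still incomplete after timeoutMs.
    static <T> CompletableFuture<T> orTimeout(
            CompletableFuture<T> future, long timeoutMs, ScheduledExecutorService scheduler) {
        scheduler.schedule(() -> {
            future.completeExceptionally(new TimeoutException("no result within " + timeoutMs + " ms"));
        }, timeoutMs, TimeUnit.MILLISECONDS);
        return future;
    }

    // Test helper: an operation that fails `failures` times, then succeeds with "ok".
    static Supplier<CompletableFuture<String>> flaky(AtomicInteger calls, int failures) {
        return () -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (calls.incrementAndGet() <= failures) {
                f.completeExceptionally(new IllegalStateException("not ready"));
            } else {
                f.complete("ok");
            }
            return f;
        };
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            AtomicInteger calls = new AtomicInteger();
            String value = retryWithDelay(flaky(calls, 2), 5, 10L, t -> true, scheduler).join();
            System.out.println(value + " after " + calls.get() + " attempts"); // ok after 3 attempts

            CompletableFuture<String> never = new CompletableFuture<>();
            try {
                orTimeout(never, 50L, scheduler).join();
            } catch (CompletionException e) {
                System.out.println("timed out: " + (e.getCause() instanceof TimeoutException)); // timed out: true
            }
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

In RestClusterClient the corresponding inputs come from the configuration object: getRetryMaxAttempts() and getRetryDelay() feed the retry call, and getAwaitLeaderTimeout() bounds how long the client waits for the web monitor leader address.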

Summary

Besides the RestClientConfiguration, RestClusterClientConfiguration holds three properties: awaitLeaderTimeout, retryMaxAttempts, and retryDelay.
awaitLeaderTimeout is read from rest.await-leader-timeout and defaults to 30 seconds; retryMaxAttempts is read from rest.retry.max-attempts and defaults to 20; retryDelay is read from rest.retry.delay and defaults to 3 seconds.
RestClusterClient's retry method delegates to FutureUtils.retryWithDelay, passing restClusterClientConfiguration.getRetryMaxAttempts() as the retries argument and Time.milliseconds(restClusterClientConfiguration.getRetryDelay()) as the retryDelay argument; its getWebMonitorBaseUrl method delegates to FutureUtils.orTimeout, passing restClusterClientConfiguration.getAwaitLeaderTimeout() as the timeout argument.

