乐趣区

聊聊flink的RestClientConfiguration


本文主要研究一下 flink 的 RestClientConfiguration
RestClientConfiguration
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
public final class RestClientConfiguration {

@Nullable
private final SSLHandlerFactory sslHandlerFactory;

private final long connectionTimeout;

private final long idlenessTimeout;

private final int maxContentLength;

private RestClientConfiguration(
@Nullable final SSLHandlerFactory sslHandlerFactory,
final long connectionTimeout,
final long idlenessTimeout,
final int maxContentLength) {
checkArgument(maxContentLength > 0, “maxContentLength must be positive, was: %d”, maxContentLength);
this.sslHandlerFactory = sslHandlerFactory;
this.connectionTimeout = connectionTimeout;
this.idlenessTimeout = idlenessTimeout;
this.maxContentLength = maxContentLength;
}

/**
* Returns the {@link SSLEngine} that the REST client endpoint should use.
*
* @return SSLEngine that the REST client endpoint should use, or null if SSL was disabled
*/
@Nullable
public SSLHandlerFactory getSslHandlerFactory() {
return sslHandlerFactory;
}

/**
* {@see RestOptions#CONNECTION_TIMEOUT}.
*/
public long getConnectionTimeout() {
return connectionTimeout;
}

/**
* {@see RestOptions#IDLENESS_TIMEOUT}.
*/
public long getIdlenessTimeout() {
return idlenessTimeout;
}

/**
* Returns the max content length that the REST client endpoint could handle.
*
* @return max content length that the REST client endpoint could handle
*/
public int getMaxContentLength() {
return maxContentLength;
}

/**
* Creates and returns a new {@link RestClientConfiguration} from the given {@link Configuration}.
*
* @param config configuration from which the REST client endpoint configuration should be created from
* @return REST client endpoint configuration
* @throws ConfigurationException if SSL was configured incorrectly
*/

public static RestClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
Preconditions.checkNotNull(config);

final SSLHandlerFactory sslHandlerFactory;
if (SSLUtils.isRestSSLEnabled(config)) {
try {
sslHandlerFactory = SSLUtils.createRestClientSSLEngineFactory(config);
} catch (Exception e) {
throw new ConfigurationException(“Failed to initialize SSLContext for the REST client”, e);
}
} else {
sslHandlerFactory = null;
}

final long connectionTimeout = config.getLong(RestOptions.CONNECTION_TIMEOUT);

final long idlenessTimeout = config.getLong(RestOptions.IDLENESS_TIMEOUT);

int maxContentLength = config.getInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH);

return new RestClientConfiguration(sslHandlerFactory, connectionTimeout, idlenessTimeout, maxContentLength);
}
}

RestClientConfiguration 有四个属性,分别是 sslHandlerFactory、connectionTimeout、idlenessTimeout、maxContentLength
fromConfiguration 方法在 SSLUtils.isRestSSLEnabled(config) 为 true 时从 Configuration 中创建 SSLHandlerFactory(否则 sslHandlerFactory 为 null),其读取的相关配置有 security.ssl.rest.enabled,默认为 false;security.ssl.protocol,默认为 TLSv1.2;security.ssl.algorithms,默认为 TLS_RSA_WITH_AES_128_CBC_SHA;security.ssl.rest.authentication-enabled,默认为 false
connectionTimeout 读取的是 rest.connection-timeout 配置,默认是 15000 毫秒;idlenessTimeout 读取的是 rest.idleness-timeout 配置,默认 5 分钟;maxContentLength 读取的是 rest.client.max-content-length 配置,默认是 104_857_600

RestClient
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
public class RestClient implements AutoCloseableAsync {
private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);

private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();

// used to open connections to a rest server endpoint
private final Executor executor;

private final Bootstrap bootstrap;

private final CompletableFuture<Void> terminationFuture;

private final AtomicBoolean isRunning = new AtomicBoolean(true);

public RestClient(RestClientConfiguration configuration, Executor executor) {
Preconditions.checkNotNull(configuration);
this.executor = Preconditions.checkNotNull(executor);
this.terminationFuture = new CompletableFuture<>();

final SSLHandlerFactory sslHandlerFactory = configuration.getSslHandlerFactory();
ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
try {
// SSL should be the first handler in the pipeline
if (sslHandlerFactory != null) {
socketChannel.pipeline().addLast(“ssl”, sslHandlerFactory.createNettySSLHandler());
}

socketChannel.pipeline()
.addLast(new HttpClientCodec())
.addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
.addLast(new ChunkedWriteHandler()) // required for multipart-requests
.addLast(new IdleStateHandler(configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), TimeUnit.MILLISECONDS))
.addLast(new ClientHandler());
} catch (Throwable t) {
t.printStackTrace();
ExceptionUtils.rethrow(t);
}
}
};
NioEventLoopGroup group = new NioEventLoopGroup(1, new ExecutorThreadFactory(“flink-rest-client-netty”));

bootstrap = new Bootstrap();
bootstrap
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(configuration.getConnectionTimeout()))
.group(group)
.channel(NioSocketChannel.class)
.handler(initializer);

LOG.info(“Rest client endpoint started.”);
}

@Override
public CompletableFuture<Void> closeAsync() {
return shutdownInternally(Time.seconds(10L));
}

public void shutdown(Time timeout) {
final CompletableFuture<Void> shutDownFuture = shutdownInternally(timeout);

try {
shutDownFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
LOG.info(“Rest endpoint shutdown complete.”);
} catch (Exception e) {
LOG.warn(“Rest endpoint shutdown failed.”, e);
}
}

private CompletableFuture<Void> shutdownInternally(Time timeout) {
if (isRunning.compareAndSet(true, false)) {
LOG.info(“Shutting down rest endpoint.”);

if (bootstrap != null) {
if (bootstrap.group() != null) {
bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
.addListener(finished -> {
if (finished.isSuccess()) {
terminationFuture.complete(null);
} else {
terminationFuture.completeExceptionally(finished.cause());
}
});
}
}
}
return terminationFuture;
}

//……
}
RestClient 的构造器接收 RestClientConfiguration 及 Executor 两个参数,构造器里头创建了 netty 的 Bootstrap,其中 ChannelOption.CONNECT_TIMEOUT_MILLIS 使用的是 configuration.getConnectionTimeout();IdleStateHandler 的 readerIdleTime、writerIdleTime、allIdleTime 使用的是 configuration.getIdlenessTimeout();HttpObjectAggregator 的 maxContentLength 使用的是 configuration.getMaxContentLength();SSLHandlerFactory 使用的是 configuration.getSslHandlerFactory()
小结

RestClientConfiguration 有四个属性,分别是 sslHandlerFactory、connectionTimeout、idlenessTimeout、maxContentLength;fromConfiguration 方法在 SSLUtils.isRestSSLEnabled(config) 为 true 时从 Configuration 中创建 SSLHandlerFactory(否则 sslHandlerFactory 为 null),其读取的相关配置有 security.ssl.rest.enabled,默认为 false;security.ssl.protocol,默认为 TLSv1.2;security.ssl.algorithms,默认为 TLS_RSA_WITH_AES_128_CBC_SHA;security.ssl.rest.authentication-enabled,默认为 false
connectionTimeout 读取的是 rest.connection-timeout 配置,默认是 15000 毫秒;idlenessTimeout 读取的是 rest.idleness-timeout 配置,默认 5 分钟;maxContentLength 读取的是 rest.client.max-content-length 配置,默认是 104_857_600
RestClient 的构造器接收 RestClientConfiguration 及 Executor 两个参数,构造器里头创建了 netty 的 Bootstrap,其中 ChannelOption.CONNECT_TIMEOUT_MILLIS 使用的是 configuration.getConnectionTimeout();IdleStateHandler 的 readerIdleTime、writerIdleTime、allIdleTime 使用的是 configuration.getIdlenessTimeout();HttpObjectAggregator 的 maxContentLength 使用的是 configuration.getMaxContentLength();SSLHandlerFactory 使用的是 configuration.getSslHandlerFactory()

doc
RestClientConfiguration

退出移动版