聊聊flink的NetworkEnvironmentConfiguration

28次阅读

共计 21453 个字符,预计需要花费 54 分钟才能阅读完成。


本文主要研究一下 flink 的 NetworkEnvironmentConfiguration
NetworkEnvironmentConfiguration
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
public class NetworkEnvironmentConfiguration {

private final float networkBufFraction;

private final long networkBufMin;

private final long networkBufMax;

private final int networkBufferSize;

private final IOMode ioMode;

private final int partitionRequestInitialBackoff;

private final int partitionRequestMaxBackoff;

private final int networkBuffersPerChannel;

private final int floatingNetworkBuffersPerGate;

private final NettyConfig nettyConfig;

/**
* Constructor for a setup with purely local communication (no netty).
*/
public NetworkEnvironmentConfiguration(
float networkBufFraction,
long networkBufMin,
long networkBufMax,
int networkBufferSize,
IOMode ioMode,
int partitionRequestInitialBackoff,
int partitionRequestMaxBackoff,
int networkBuffersPerChannel,
int floatingNetworkBuffersPerGate) {

this(networkBufFraction, networkBufMin, networkBufMax, networkBufferSize,
ioMode,
partitionRequestInitialBackoff, partitionRequestMaxBackoff,
networkBuffersPerChannel, floatingNetworkBuffersPerGate,
null);

}

public NetworkEnvironmentConfiguration(
float networkBufFraction,
long networkBufMin,
long networkBufMax,
int networkBufferSize,
IOMode ioMode,
int partitionRequestInitialBackoff,
int partitionRequestMaxBackoff,
int networkBuffersPerChannel,
int floatingNetworkBuffersPerGate,
@Nullable NettyConfig nettyConfig) {

this.networkBufFraction = networkBufFraction;
this.networkBufMin = networkBufMin;
this.networkBufMax = networkBufMax;
this.networkBufferSize = networkBufferSize;
this.ioMode = ioMode;
this.partitionRequestInitialBackoff = partitionRequestInitialBackoff;
this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
this.networkBuffersPerChannel = networkBuffersPerChannel;
this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
this.nettyConfig = nettyConfig;
}

// ————————————————————————

public float networkBufFraction() {
return networkBufFraction;
}

public long networkBufMin() {
return networkBufMin;
}

public long networkBufMax() {
return networkBufMax;
}

public int networkBufferSize() {
return networkBufferSize;
}

public IOMode ioMode() {
return ioMode;
}

public int partitionRequestInitialBackoff() {
return partitionRequestInitialBackoff;
}

public int partitionRequestMaxBackoff() {
return partitionRequestMaxBackoff;
}

public int networkBuffersPerChannel() {
return networkBuffersPerChannel;
}

public int floatingNetworkBuffersPerGate() {
return floatingNetworkBuffersPerGate;
}

public NettyConfig nettyConfig() {
return nettyConfig;
}

// ————————————————————————

@Override
public int hashCode() {
int result = 1;
result = 31 * result + networkBufferSize;
result = 31 * result + ioMode.hashCode();
result = 31 * result + partitionRequestInitialBackoff;
result = 31 * result + partitionRequestMaxBackoff;
result = 31 * result + networkBuffersPerChannel;
result = 31 * result + floatingNetworkBuffersPerGate;
result = 31 * result + (nettyConfig != null ? nettyConfig.hashCode() : 0);
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
else if (obj == null || getClass() != obj.getClass()) {
return false;
}
else {
final NetworkEnvironmentConfiguration that = (NetworkEnvironmentConfiguration) obj;

return this.networkBufFraction == that.networkBufFraction &&
this.networkBufMin == that.networkBufMin &&
this.networkBufMax == that.networkBufMax &&
this.networkBufferSize == that.networkBufferSize &&
this.partitionRequestInitialBackoff == that.partitionRequestInitialBackoff &&
this.partitionRequestMaxBackoff == that.partitionRequestMaxBackoff &&
this.networkBuffersPerChannel == that.networkBuffersPerChannel &&
this.floatingNetworkBuffersPerGate == that.floatingNetworkBuffersPerGate &&
this.ioMode == that.ioMode &&
(nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null);
}
}

@Override
public String toString() {
return “NetworkEnvironmentConfiguration{” +
“networkBufFraction=” + networkBufFraction +
“, networkBufMin=” + networkBufMin +
“, networkBufMax=” + networkBufMax +
“, networkBufferSize=” + networkBufferSize +
“, ioMode=” + ioMode +
“, partitionRequestInitialBackoff=” + partitionRequestInitialBackoff +
“, partitionRequestMaxBackoff=” + partitionRequestMaxBackoff +
“, networkBuffersPerChannel=” + networkBuffersPerChannel +
“, floatingNetworkBuffersPerGate=” + floatingNetworkBuffersPerGate +
“, nettyConfig=” + nettyConfig +
‘}’;
}
}
NetworkEnvironmentConfiguration 主要是 flink network 的相关配置,里头有 networkBufFraction、networkBufMin、networkBufMax、networkBufferSize、ioMode、partitionRequestInitialBackoff、partitionRequestMaxBackoff、networkBuffersPerChannel、floatingNetworkBuffersPerGate、nettyConfig 属性
TaskManagerServicesConfiguration
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
public class TaskManagerServicesConfiguration {

//……

/**
* Creates the {@link NetworkEnvironmentConfiguration} from the given {@link Configuration}.
*
* @param configuration to create the network environment configuration from
* @param localTaskManagerCommunication true if task manager communication is local
* @param taskManagerAddress address of the task manager
* @param slots to start the task manager with
* @return Network environment configuration
*/
@SuppressWarnings(“deprecation”)
private static NetworkEnvironmentConfiguration parseNetworkEnvironmentConfiguration(
Configuration configuration,
boolean localTaskManagerCommunication,
InetAddress taskManagerAddress,
int slots) throws Exception {

// —-> hosts / ports for communication and data exchange

int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT);

checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
“Leave config parameter empty or use 0 to let the system choose a port automatically.”);

checkConfigParameter(slots >= 1, slots, TaskManagerOptions.NUM_TASK_SLOTS.key(),
“Number of task slots must be at least one.”);

final int pageSize = checkedDownCast(MemorySize.parse(configuration.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes());

// check page size of for minimum size
checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
“Minimum memory segment size is ” + MemoryManager.MIN_PAGE_SIZE);

// check page size for power of two
checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
“Memory segment size must be a power of 2.”);

// network buffer memory fraction

float networkBufFraction = configuration.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
long networkBufMin = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
long networkBufMax = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();
checkNetworkBufferConfig(pageSize, networkBufFraction, networkBufMin, networkBufMax);

// fallback: number of network buffers
final int numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
checkNetworkConfigOld(numNetworkBuffers);

if (!hasNewNetworkBufConf(configuration)) {
// map old config to new one:
networkBufMin = networkBufMax = ((long) numNetworkBuffers) * pageSize;
} else {
if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
LOG.info(“Ignoring old (but still present) network buffer configuration via {}.”,
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
}
}

final NettyConfig nettyConfig;
if (!localTaskManagerCommunication) {
final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);

nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(),
taskManagerInetSocketAddress.getPort(), pageSize, slots, configuration);
} else {
nettyConfig = null;
}

// Default spill I/O mode for intermediate results
final String syncOrAsync = configuration.getString(
ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);

final IOManager.IOMode ioMode;
if (syncOrAsync.equals(“async”)) {
ioMode = IOManager.IOMode.ASYNC;
} else {
ioMode = IOManager.IOMode.SYNC;
}

int initialRequestBackoff = configuration.getInteger(
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
int maxRequestBackoff = configuration.getInteger(
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);

int buffersPerChannel = configuration.getInteger(
TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
int extraBuffersPerGate = configuration.getInteger(
TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);

return new NetworkEnvironmentConfiguration(
networkBufFraction,
networkBufMin,
networkBufMax,
pageSize,
ioMode,
initialRequestBackoff,
maxRequestBackoff,
buffersPerChannel,
extraBuffersPerGate,
nettyConfig);
}

//……
}
TaskManagerServicesConfiguration 有个私有方法 parseNetworkEnvironmentConfiguration,用于创建 NetworkEnvironmentConfiguration;它会读取 TaskManagerOptions.MEMORY_SEGMENT_SIZE、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX、TaskManagerOptions.NETWORK_NUM_BUFFERS、ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX、TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL、TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE 等配置
TaskManagerOptions
flink-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@PublicEvolving
public class TaskManagerOptions {
//……

/**
* Size of memory buffers used by the network stack and the memory manager.
*/
public static final ConfigOption<String> MEMORY_SEGMENT_SIZE =
key(“taskmanager.memory.segment-size”)
.defaultValue(“32kb”)
.withDescription(“Size of memory buffers used by the network stack and the memory manager.”);

/**
* Fraction of JVM memory to use for network buffers.
*/
public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION =
key(“taskmanager.network.memory.fraction”)
.defaultValue(0.1f)
.withDescription(“Fraction of JVM memory to use for network buffers. This determines how many streaming” +
” data exchange channels a TaskManager can have at the same time and how well buffered the channels” +
” are. If a job is rejected or you get a warning that the system has not enough buffers available,” +
” increase this value or the min/max values below. Also note, that \”taskmanager.network.memory.min\”” +
“` and \”taskmanager.network.memory.max\” may override this fraction.”);

/**
* Minimum memory size for network buffers.
*/
public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MIN =
key(“taskmanager.network.memory.min”)
.defaultValue(“64mb”)
.withDescription(“Minimum memory size for network buffers.”);

/**
* Maximum memory size for network buffers.
*/
public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MAX =
key(“taskmanager.network.memory.max”)
.defaultValue(“1gb”)
.withDescription(“Maximum memory size for network buffers.”);

/**
* Number of buffers used in the network stack. This defines the number of possible tasks and
* shuffles.
*
* @deprecated use {@link #NETWORK_BUFFERS_MEMORY_FRACTION}, {@link #NETWORK_BUFFERS_MEMORY_MIN},
* and {@link #NETWORK_BUFFERS_MEMORY_MAX} instead
*/
@Deprecated
public static final ConfigOption<Integer> NETWORK_NUM_BUFFERS =
key(“taskmanager.network.numberOfBuffers”)
.defaultValue(2048);

/**
* Minimum backoff for partition requests of input channels.
*/
public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
key(“taskmanager.network.request-backoff.initial”)
.defaultValue(100)
.withDeprecatedKeys(“taskmanager.net.request-backoff.initial”)
.withDescription(“Minimum backoff in milliseconds for partition requests of input channels.”);

/**
* Maximum backoff for partition requests of input channels.
*/
public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
key(“taskmanager.network.request-backoff.max”)
.defaultValue(10000)
.withDeprecatedKeys(“taskmanager.net.request-backoff.max”)
.withDescription(“Maximum backoff in milliseconds for partition requests of input channels.”);

/**
* Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).
*
* <p>Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization.
*/
public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
key(“taskmanager.network.memory.buffers-per-channel”)
.defaultValue(2)
.withDescription(“Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).” +
“In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be” +
” configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is” +
” for parallel serialization.”);

/**
* Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate).
*/
public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
key(“taskmanager.network.memory.floating-buffers-per-gate”)
.defaultValue(8)
.withDescription(“Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate).” +
” In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels.” +
” The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can” +
” help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be” +
” increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.”);
//……
}

taskmanager.memory.segment-size 指定 memory segment 的大小,默认为 32kb;taskmanager.network.memory.fraction 指定 network buffers 使用的 memory 的比例,默认为 0.1;taskmanager.network.memory.min 指定 network buffers 使用的最小内存,默认为 64mb;taskmanager.network.memory.max 指定 network buffers 使用的最大内存,默认为 1gb;taskmanager.network.numberOfBuffers 指定 network 使用的 buffers 数量,默认为 2048,该配置已经被废弃,使用 taskmanager.network.memory.fraction、taskmanager.network.memory.min、taskmanager.network.memory.max 这几个配置来替代
taskmanager.network.request-backoff.initial 指定 input channels 的 partition requests 的最小 backoff 时间 (毫秒),默认为 100;taskmanager.network.request-backoff.max 指定 input channels 的 partition requests 的最大 backoff 时间 (毫秒),默认为 10000
taskmanager.network.memory.buffers-per-channel 指定每个 outgoing/incoming channel(subpartition/input channel)最多使用的 network buffers 数量,默认为 2;taskmanager.network.memory.floating-buffers-per-gate 指定每个 outgoing/incoming gate(result partition/input gate)额外使用的 (floating) network buffers 数量,默认为 8

NettyConfig
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
public class NettyConfig {

private static final Logger LOG = LoggerFactory.getLogger(NettyConfig.class);

// – Config keys ———————————————————-

public static final ConfigOption<Integer> NUM_ARENAS = ConfigOptions
.key(“taskmanager.network.netty.num-arenas”)
.defaultValue(-1)
.withDeprecatedKeys(“taskmanager.net.num-arenas”)
.withDescription(“The number of Netty arenas.”);

public static final ConfigOption<Integer> NUM_THREADS_SERVER = ConfigOptions
.key(“taskmanager.network.netty.server.numThreads”)
.defaultValue(-1)
.withDeprecatedKeys(“taskmanager.net.server.numThreads”)
.withDescription(“The number of Netty server threads.”);

public static final ConfigOption<Integer> NUM_THREADS_CLIENT = ConfigOptions
.key(“taskmanager.network.netty.client.numThreads”)
.defaultValue(-1)
.withDeprecatedKeys(“taskmanager.net.client.numThreads”)
.withDescription(“The number of Netty client threads.”);

public static final ConfigOption<Integer> CONNECT_BACKLOG = ConfigOptions
.key(“taskmanager.network.netty.server.backlog”)
.defaultValue(0) // default: 0 => Netty’s default
.withDeprecatedKeys(“taskmanager.net.server.backlog”)
.withDescription(“The netty server connection backlog.”);

public static final ConfigOption<Integer> CLIENT_CONNECT_TIMEOUT_SECONDS = ConfigOptions
.key(“taskmanager.network.netty.client.connectTimeoutSec”)
.defaultValue(120) // default: 120s = 2min
.withDeprecatedKeys(“taskmanager.net.client.connectTimeoutSec”)
.withDescription(“The Netty client connection timeout.”);

public static final ConfigOption<Integer> SEND_RECEIVE_BUFFER_SIZE = ConfigOptions
.key(“taskmanager.network.netty.sendReceiveBufferSize”)
.defaultValue(0) // default: 0 => Netty’s default
.withDeprecatedKeys(“taskmanager.net.sendReceiveBufferSize”)
.withDescription(“The Netty send and receive buffer size. This defaults to the system buffer size” +
” (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MiB in modern Linux.”);

public static final ConfigOption<String> TRANSPORT_TYPE = ConfigOptions
.key(“taskmanager.network.netty.transport”)
.defaultValue(“nio”)
.withDeprecatedKeys(“taskmanager.net.transport”)
.withDescription(“The Netty transport type, either \”nio\” or \”epoll\””);

// ————————————————————————

enum TransportType {
NIO, EPOLL, AUTO
}

static final String SERVER_THREAD_GROUP_NAME = “Flink Netty Server”;

static final String CLIENT_THREAD_GROUP_NAME = “Flink Netty Client”;

private final InetAddress serverAddress;

private final int serverPort;

private final int memorySegmentSize;

private final int numberOfSlots;

private final Configuration config; // optional configuration

public NettyConfig(
InetAddress serverAddress,
int serverPort,
int memorySegmentSize,
int numberOfSlots,
Configuration config) {

this.serverAddress = checkNotNull(serverAddress);

checkArgument(serverPort >= 0 && serverPort <= 65536, “Invalid port number.”);
this.serverPort = serverPort;

checkArgument(memorySegmentSize > 0, “Invalid memory segment size.”);
this.memorySegmentSize = memorySegmentSize;

checkArgument(numberOfSlots > 0, “Number of slots”);
this.numberOfSlots = numberOfSlots;

this.config = checkNotNull(config);

LOG.info(this.toString());
}

InetAddress getServerAddress() {
return serverAddress;
}

int getServerPort() {
return serverPort;
}

int getMemorySegmentSize() {
return memorySegmentSize;
}

public int getNumberOfSlots() {
return numberOfSlots;
}

// ————————————————————————
// Getters
// ————————————————————————

public int getServerConnectBacklog() {
return config.getInteger(CONNECT_BACKLOG);
}

public int getNumberOfArenas() {
// default: number of slots
final int configValue = config.getInteger(NUM_ARENAS);
return configValue == -1 ? numberOfSlots : configValue;
}

public int getServerNumThreads() {
// default: number of task slots
final int configValue = config.getInteger(NUM_THREADS_SERVER);
return configValue == -1 ? numberOfSlots : configValue;
}

public int getClientNumThreads() {
// default: number of task slots
final int configValue = config.getInteger(NUM_THREADS_CLIENT);
return configValue == -1 ? numberOfSlots : configValue;
}

public int getClientConnectTimeoutSeconds() {
return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS);
}

public int getSendAndReceiveBufferSize() {
return config.getInteger(SEND_RECEIVE_BUFFER_SIZE);
}

public TransportType getTransportType() {
String transport = config.getString(TRANSPORT_TYPE);

switch (transport) {
case “nio”:
return TransportType.NIO;
case “epoll”:
return TransportType.EPOLL;
default:
return TransportType.AUTO;
}
}

@Nullable
public SSLHandlerFactory createClientSSLEngineFactory() throws Exception {
return getSSLEnabled() ?
SSLUtils.createInternalClientSSLEngineFactory(config) :
null;
}

@Nullable
public SSLHandlerFactory createServerSSLEngineFactory() throws Exception {
return getSSLEnabled() ?
SSLUtils.createInternalServerSSLEngineFactory(config) :
null;
}

public boolean getSSLEnabled() {
return config.getBoolean(TaskManagerOptions.DATA_SSL_ENABLED)
&& SSLUtils.isInternalSSLEnabled(config);
}

public boolean isCreditBasedEnabled() {
return config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
}

public Configuration getConfig() {
return config;
}

@Override
public String toString() {
String format = “NettyConfig [” +
“server address: %s, ” +
“server port: %d, ” +
“ssl enabled: %s, ” +
“memory segment size (bytes): %d, ” +
“transport type: %s, ” +
“number of server threads: %d (%s), ” +
“number of client threads: %d (%s), ” +
“server connect backlog: %d (%s), ” +
“client connect timeout (sec): %d, ” +
“send/receive buffer size (bytes): %d (%s)]”;

String def = “use Netty’s default”;
String man = “manual”;

return String.format(format, serverAddress, serverPort, getSSLEnabled() ? “true” : “false”,
memorySegmentSize, getTransportType(), getServerNumThreads(),
getServerNumThreads() == 0 ? def : man,
getClientNumThreads(), getClientNumThreads() == 0 ? def : man,
getServerConnectBacklog(), getServerConnectBacklog() == 0 ? def : man,
getClientConnectTimeoutSeconds(), getSendAndReceiveBufferSize(),
getSendAndReceiveBufferSize() == 0 ? def : man);
}
}

NettyConfig 的构造器接收 serverAddress、serverPort、memorySegmentSize、numberOfSlots、config 这几个参数;它还提供了 getServerConnectBacklog、getNumberOfArenas、getServerNumThreads、getClientNumThreads、getClientConnectTimeoutSeconds、getSendAndReceiveBufferSize、getTransportType 等方法用于从 config 读取配置
taskmanager.network.netty.server.backlog 用于指定 netty server 的 connection backlog,默认值为 0 即使用 netty 默认的配置;taskmanager.network.netty.client.connectTimeoutSec 指定 netty client 的 connection timeout,默认为 120(单位秒);taskmanager.network.netty.sendReceiveBufferSize 指定 netty send/receive buffer 大小,默认为 0 即使用 netty 的默认配置,默认是使用 system buffer size,即 /proc/sys/net/ipv4/tcp_[rw]mem 的配置;taskmanager.network.netty.transport 指定的是 netty transport 的类型,默认是 nio,可选值为 nio 或 epoll,getTransportType 方法对其它取值返回 TransportType.AUTO
taskmanager.network.netty.num-arenas 指定的是 netty arenas 的数量,默认为 -1;taskmanager.network.netty.server.numThreads 指定的是 netty server 的 threads 数量,默认为 -1;taskmanager.network.netty.client.numThreads 指定的是 netty client 的 threads 数量,默认为 -1;这几个配置当配置值为 -1 的时候,对应的 get 方法返回的是 numberOfSlots 值

小结

NetworkEnvironmentConfiguration 主要是 flink network 的相关配置,里头有 networkBufFraction、networkBufMin、networkBufMax、networkBufferSize、ioMode、partitionRequestInitialBackoff、partitionRequestMaxBackoff、networkBuffersPerChannel、floatingNetworkBuffersPerGate、nettyConfig 属性
TaskManagerServicesConfiguration 有个私有方法 parseNetworkEnvironmentConfiguration,用于创建 NetworkEnvironmentConfiguration;它会读取 TaskManagerOptions.MEMORY_SEGMENT_SIZE、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX、TaskManagerOptions.NETWORK_NUM_BUFFERS、ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX、TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL、TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE 等配置
NettyConfig 的构造器接收 serverAddress、serverPort、memorySegmentSize、numberOfSlots、config 这几个参数;它还提供了 getServerConnectBacklog、getNumberOfArenas、getServerNumThreads、getClientNumThreads、getClientConnectTimeoutSeconds、getSendAndReceiveBufferSize、getTransportType 等方法用于从 config 读取配置

doc
taskmanager-network-memory-fraction

正文完
 0