聊聊flink taskmanager的data.port与rpc.port

30次阅读

共计 10034 个字符,预计需要花费 26 分钟才能阅读完成。


本文主要研究一下 flink taskmanager 的 data.port 与 rpc.port
TaskManagerServices
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
public class TaskManagerServices {
//……

public static TaskManagerServices fromConfiguration(
TaskManagerServicesConfiguration taskManagerServicesConfiguration,
ResourceID resourceID,
Executor taskIOExecutor,
long freeHeapMemoryWithDefrag,
long maxJvmHeapMemory) throws Exception {

// pre-start checks
checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());

final NetworkEnvironment network = createNetworkEnvironment(taskManagerServicesConfiguration, maxJvmHeapMemory);
network.start();

final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
resourceID,
taskManagerServicesConfiguration.getTaskManagerAddress(),
network.getConnectionManager().getDataPort());

// this call has to happen strictly after the network stack has been initialized
final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration, freeHeapMemoryWithDefrag, maxJvmHeapMemory);

// start the I/O manager, it will create some temp directories.
final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());

final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();

final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots());

for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
resourceProfiles.add(ResourceProfile.ANY);
}

final TimerService<AllocationID> timerService = new TimerService<>(
new ScheduledThreadPoolExecutor(1),
taskManagerServicesConfiguration.getTimerServiceShutdownTimeout());

final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService);

final JobManagerTable jobManagerTable = new JobManagerTable();

final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);

final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();

final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];

for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
}

final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
stateRootDirectoryFiles,
taskIOExecutor);

return new TaskManagerServices(
taskManagerLocation,
memoryManager,
ioManager,
network,
broadcastVariableManager,
taskSlotTable,
jobManagerTable,
jobLeaderService,
taskStateManager);
}

private static NetworkEnvironment createNetworkEnvironment(
TaskManagerServicesConfiguration taskManagerServicesConfiguration,
long maxJvmHeapMemory) {

NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskManagerServicesConfiguration.getNetworkConfig();

final long networkBuf = calculateNetworkBufferMemory(taskManagerServicesConfiguration, maxJvmHeapMemory);
int segmentSize = networkEnvironmentConfiguration.networkBufferSize();

// tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory)
final long numNetBuffersLong = networkBuf / segmentSize;
if (numNetBuffersLong > Integer.MAX_VALUE) {
throw new IllegalArgumentException(“The given number of memory bytes (” + networkBuf
+ “) corresponds to more than MAX_INT pages.”);
}

NetworkBufferPool networkBufferPool = new NetworkBufferPool(
(int) numNetBuffersLong,
segmentSize);

ConnectionManager connectionManager;
boolean enableCreditBased = false;
NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig();
if (nettyConfig != null) {
connectionManager = new NettyConnectionManager(nettyConfig);
enableCreditBased = nettyConfig.isCreditBasedEnabled();
} else {
connectionManager = new LocalConnectionManager();
}

ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();

KvStateRegistry kvStateRegistry = new KvStateRegistry();

QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig();

int numProxyServerNetworkThreads = qsConfig.numProxyServerThreads() == 0 ?
taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyServerThreads();

int numProxyServerQueryThreads = qsConfig.numProxyQueryThreads() == 0 ?
taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyQueryThreads();

final KvStateClientProxy kvClientProxy = QueryableStateUtils.createKvStateClientProxy(
taskManagerServicesConfiguration.getTaskManagerAddress(),
qsConfig.getProxyPortRange(),
numProxyServerNetworkThreads,
numProxyServerQueryThreads,
new DisabledKvStateRequestStats());

int numStateServerNetworkThreads = qsConfig.numStateServerThreads() == 0 ?
taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateServerThreads();

int numStateServerQueryThreads = qsConfig.numStateQueryThreads() == 0 ?
taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateQueryThreads();

final KvStateServer kvStateServer = QueryableStateUtils.createKvStateServer(
taskManagerServicesConfiguration.getTaskManagerAddress(),
qsConfig.getStateServerPortRange(),
numStateServerNetworkThreads,
numStateServerQueryThreads,
kvStateRegistry,
new DisabledKvStateRequestStats());

// we start the network first, to make sure it can allocate its buffers first
return new NetworkEnvironment(
networkBufferPool,
connectionManager,
resultPartitionManager,
taskEventDispatcher,
kvStateRegistry,
kvStateServer,
kvClientProxy,
networkEnvironmentConfiguration.ioMode(),
networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
networkEnvironmentConfiguration.partitionRequestMaxBackoff(),
networkEnvironmentConfiguration.networkBuffersPerChannel(),
networkEnvironmentConfiguration.floatingNetworkBuffersPerGate(),
enableCreditBased);
}

//……
}

TaskManagerServices 的 fromConfiguration 方法从 taskManagerServicesConfiguration 读取配置,然后创建 NetworkEnvironment,之后创建 TaskManagerLocation 用到了 NetworkEnvironment.getConnectionManager().getDataPort()
TaskExecutorToResourceManagerConnection 及 ConnectionID 均从 TaskManagerLocation 获取了 dataPort 信息
createNetworkEnvironment 方法从 taskManagerServicesConfiguration 获取 NetworkEnvironmentConfiguration(它从配置文件读取 taskmanager.data.port),如果它的 nettyConfig 不为 null,则根据它创建了 NettyConnectionManager

NettyConnectionManager
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
public class NettyConnectionManager implements ConnectionManager {

private final NettyServer server;

private final NettyClient client;

private final NettyBufferPool bufferPool;

private final PartitionRequestClientFactory partitionRequestClientFactory;

public NettyConnectionManager(NettyConfig nettyConfig) {
this.server = new NettyServer(nettyConfig);
this.client = new NettyClient(nettyConfig);
this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());

this.partitionRequestClientFactory = new PartitionRequestClientFactory(client);
}

@Override
public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException {
NettyProtocol partitionRequestProtocol = new NettyProtocol(
partitionProvider,
taskEventDispatcher,
client.getConfig().isCreditBasedEnabled());

client.init(partitionRequestProtocol, bufferPool);
server.init(partitionRequestProtocol, bufferPool);
}

@Override
public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
throws IOException, InterruptedException {
return partitionRequestClientFactory.createPartitionRequestClient(connectionId);
}

@Override
public void closeOpenChannelConnections(ConnectionID connectionId) {
partitionRequestClientFactory.closeOpenChannelConnections(connectionId);
}

@Override
public int getNumberOfActiveConnections() {
return partitionRequestClientFactory.getNumberOfActiveClients();
}

@Override
public int getDataPort() {
if (server != null && server.getLocalAddress() != null) {
return server.getLocalAddress().getPort();
} else {
return -1;
}
}

@Override
public void shutdown() {
client.shutdown();
server.shutdown();
}

NettyClient getClient() {
return client;
}

NettyServer getServer() {
return server;
}

NettyBufferPool getBufferPool() {
return bufferPool;
}
}
NettyConnectionManager 的构造器根据 NettyConfig 构造了 NettyServer,而 getDataPort 则取的是 server.getLocalAddress().getPort()
TaskManagerRunner
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync {
//……

public static RpcService createRpcService(
final Configuration configuration,
final HighAvailabilityServices haServices) throws Exception {

checkNotNull(configuration);
checkNotNull(haServices);

String taskManagerHostname = configuration.getString(TaskManagerOptions.HOST);

if (taskManagerHostname != null) {
LOG.info(“Using configured hostname/address for TaskManager: {}.”, taskManagerHostname);
} else {
Time lookupTimeout = Time.milliseconds(AkkaUtils.getLookupTimeout(configuration).toMillis());

InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(
haServices.getResourceManagerLeaderRetriever(),
lookupTimeout);

taskManagerHostname = taskManagerAddress.getHostName();

LOG.info(“TaskManager will use hostname/address ‘{}’ ({}) for communication.”,
taskManagerHostname, taskManagerAddress.getHostAddress());
}

final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT);
return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, portRangeDefinition, configuration);
}

//……
}
TaskManagerRunner 提供了 createRpcService 方法,其从配置文件读取 taskmanager.rpc.port,然后调用 AkkaRpcServiceUtils.createRpcService 来创建 RpcService
小结

TaskManagerServices 的 fromConfiguration 方法从 taskManagerServicesConfiguration 读取配置,然后创建 NetworkEnvironment,之后创建 TaskManagerLocation 用到了 NetworkEnvironment.getConnectionManager().getDataPort();TaskExecutorToResourceManagerConnection 及 ConnectionID 均从 TaskManagerLocation 获取了 dataPort 信息
TaskManagerServices 的 createNetworkEnvironment 方法从 taskManagerServicesConfiguration 获取 NetworkEnvironmentConfiguration(它从配置文件读取 taskmanager.data.port),如果它的 nettyConfig 不为 null,则根据它创建了 NettyConnectionManager;NettyConnectionManager 的构造器根据 NettyConfig 构造了 NettyServer,而 getDataPort 则取的是 server.getLocalAddress().getPort()
TaskManagerRunner 提供了 createRpcService 方法,其从配置文件读取 taskmanager.rpc.port,然后调用 AkkaRpcServiceUtils.createRpcService 来创建 RpcService

doc

taskmanager-data-port
taskmanager-rpc-port

正文完
 0