A look at Flink's ConnectionManager


This article takes a look at Flink's ConnectionManager, based on the flink-release-1.7.2 source code.
ConnectionManager
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
public interface ConnectionManager {

    void start(ResultPartitionProvider partitionProvider,
            TaskEventDispatcher taskEventDispatcher) throws IOException;

    /**
     * Creates a {@link PartitionRequestClient} instance for the given {@link ConnectionID}.
     */
    PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException;

    /**
     * Closes opened ChannelConnections in case of a resource release.
     */
    void closeOpenChannelConnections(ConnectionID connectionId);

    int getNumberOfActiveConnections();

    int getDataPort();

    void shutdown() throws IOException;

}
ConnectionManager defines methods such as start, shutdown and closeOpenChannelConnections for managing physical connections. It has two implementations: LocalConnectionManager and NettyConnectionManager.
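To make the intended call order concrete, here is a toy sketch of the lifecycle a caller would follow: start once, create (and reuse) one client per connection id, close connections on resource release, then shut down. MiniConnectionManager and InMemoryConnectionManager are hypothetical names, Strings stand in for ConnectionID and PartitionRequestClient, and nothing here touches the network. Unlike the Netty-based version, the toy closeOpenChannelConnections simply removes any entry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConnectionManagerLifecycleSketch {

    // Hypothetical, trimmed-down version of the interface; Strings stand in for
    // ConnectionID and PartitionRequestClient.
    interface MiniConnectionManager {
        void start();
        String createClient(String connectionId);
        void closeOpenChannelConnections(String connectionId);
        int getNumberOfActiveConnections();
        void shutdown();
    }

    // Toy implementation that only does bookkeeping, no real networking.
    static final class InMemoryConnectionManager implements MiniConnectionManager {
        private final Map<String, String> clients = new ConcurrentHashMap<>();
        private volatile boolean started;

        public void start() {
            started = true;
        }

        public String createClient(String connectionId) {
            if (!started) {
                throw new IllegalStateException("start() must be called first");
            }
            // One client per connection id; later calls reuse the same instance.
            return clients.computeIfAbsent(connectionId, id -> "client-for-" + id);
        }

        public void closeOpenChannelConnections(String connectionId) {
            clients.remove(connectionId);
        }

        public int getNumberOfActiveConnections() {
            return clients.size();
        }

        public void shutdown() {
            clients.clear();
            started = false;
        }
    }

    public static void main(String[] args) {
        MiniConnectionManager manager = new InMemoryConnectionManager();
        manager.start();
        String a = manager.createClient("tm-1:6121");
        String b = manager.createClient("tm-1:6121");
        System.out.println(a == b); // true: the client is reused
        System.out.println(manager.getNumberOfActiveConnections()); // 1
        manager.closeOpenChannelConnections("tm-1:6121");
        System.out.println(manager.getNumberOfActiveConnections()); // 0
        manager.shutdown();
    }
}
```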
LocalConnectionManager
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
public class LocalConnectionManager implements ConnectionManager {

    @Override
    public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) {
    }

    @Override
    public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) {
        return null;
    }

    @Override
    public void closeOpenChannelConnections(ConnectionID connectionId) {}

    @Override
    public int getNumberOfActiveConnections() {
        return 0;
    }

    @Override
    public int getDataPort() {
        return -1;
    }

    @Override
    public void shutdown() {}
}
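LocalConnectionManager implements the ConnectionManager interface, but its methods are essentially no-ops: createPartitionRequestClient returns null, getNumberOfActiveConnections returns 0, getDataPort returns -1, and start, closeOpenChannelConnections and shutdown do nothing.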
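A no-op implementation like this is essentially the null object pattern: callers can invoke every method unconditionally instead of checking whether networking is available. The sketch below illustrates that idea with hypothetical classes (MiniConnectionManager, LocalMini, NetworkedMini) and a hypothetical selection rule that picks the networked variant only when a configuration is present; it is not Flink's actual wiring code.

```java
public class LocalManagerSketch {

    // Hypothetical trimmed interface mirroring a few ConnectionManager methods.
    interface MiniConnectionManager {
        int getDataPort();
        int getNumberOfActiveConnections();
        void shutdown();
    }

    // Null-object style implementation, like LocalConnectionManager: nothing to open or close.
    static final class LocalMini implements MiniConnectionManager {
        public int getDataPort() { return -1; } // -1 signals "no data port"
        public int getNumberOfActiveConnections() { return 0; }
        public void shutdown() { } // nothing to release
    }

    // Stand-in for the networked variant; the port is passed in rather than bound.
    static final class NetworkedMini implements MiniConnectionManager {
        private final int port;
        NetworkedMini(int port) { this.port = port; }
        public int getDataPort() { return port; }
        public int getNumberOfActiveConnections() { return 0; }
        public void shutdown() { }
    }

    // Hypothetical selection rule: networked variant only when a config (here, a port) is present.
    static MiniConnectionManager create(Integer configuredPort) {
        return configuredPort != null ? new NetworkedMini(configuredPort) : new LocalMini();
    }

    public static void main(String[] args) {
        MiniConnectionManager local = create(null);
        // No null checks or special cases needed; shutdown() is always safe to call.
        System.out.println(local.getDataPort()); // -1
        local.shutdown();
        System.out.println(create(6121).getDataPort()); // 6121
    }
}
```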
NettyConnectionManager
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
public class NettyConnectionManager implements ConnectionManager {

    private final NettyServer server;

    private final NettyClient client;

    private final NettyBufferPool bufferPool;

    private final PartitionRequestClientFactory partitionRequestClientFactory;

    public NettyConnectionManager(NettyConfig nettyConfig) {
        this.server = new NettyServer(nettyConfig);
        this.client = new NettyClient(nettyConfig);
        this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());

        this.partitionRequestClientFactory = new PartitionRequestClientFactory(client);
    }

    @Override
    public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException {
        NettyProtocol partitionRequestProtocol = new NettyProtocol(
                partitionProvider,
                taskEventDispatcher,
                client.getConfig().isCreditBasedEnabled());

        client.init(partitionRequestProtocol, bufferPool);
        server.init(partitionRequestProtocol, bufferPool);
    }

    @Override
    public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
            throws IOException, InterruptedException {
        return partitionRequestClientFactory.createPartitionRequestClient(connectionId);
    }

    @Override
    public void closeOpenChannelConnections(ConnectionID connectionId) {
        partitionRequestClientFactory.closeOpenChannelConnections(connectionId);
    }

    @Override
    public int getNumberOfActiveConnections() {
        return partitionRequestClientFactory.getNumberOfActiveClients();
    }

    @Override
    public int getDataPort() {
        if (server != null && server.getLocalAddress() != null) {
            return server.getLocalAddress().getPort();
        } else {
            return -1;
        }
    }

    @Override
    public void shutdown() {
        client.shutdown();
        server.shutdown();
    }

    NettyClient getClient() {
        return client;
    }

    NettyServer getServer() {
        return server;
    }

    NettyBufferPool getBufferPool() {
        return bufferPool;
    }
}

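NettyConnectionManager implements the ConnectionManager interface. Its constructor uses the NettyConfig to create a NettyServer, a NettyClient and a NettyBufferPool, and builds a PartitionRequestClientFactory on top of the NettyClient.
The start method creates a NettyProtocol (from the ResultPartitionProvider, the TaskEventDispatcher and the credit-based flag from the client config) and initializes both the NettyClient and the NettyServer with that protocol and the shared buffer pool. The shutdown method shuts down the NettyClient and then the NettyServer. closeOpenChannelConnections delegates to partitionRequestClientFactory.closeOpenChannelConnections for the given connectionId, and getDataPort returns the server's local port, or -1 if the server has no local address.
The createPartitionRequestClient method delegates to partitionRequestClientFactory.createPartitionRequestClient to obtain a PartitionRequestClient, and getNumberOfActiveConnections returns partitionRequestClientFactory.getNumberOfActiveClients().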
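The wiring described above (one protocol object shared by client and server, client initialized before server, shutdown in the same order, connection handling delegated to the factory) can be sketched with plain stand-in classes. Protocol, Client, Server, ClientFactory and Manager are hypothetical simplifications of NettyProtocol, NettyClient, NettyServer, PartitionRequestClientFactory and NettyConnectionManager; the buffer pool and all real Netty setup are omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NettyManagerWiringSketch {

    // Records calls so the initialization and shutdown order is visible.
    static final List<String> log = new ArrayList<>();

    // Hypothetical stand-ins for NettyProtocol, NettyClient and NettyServer.
    static final class Protocol { }

    static final class Client {
        Protocol protocol;
        void init(Protocol p) { protocol = p; log.add("client.init"); }
        void shutdown() { log.add("client.shutdown"); }
    }

    static final class Server {
        Protocol protocol;
        void init(Protocol p) { protocol = p; log.add("server.init"); }
        void shutdown() { log.add("server.shutdown"); }
    }

    // Hypothetical stand-in for PartitionRequestClientFactory: one entry per connection id.
    static final class ClientFactory {
        final Client client; // the real factory connects through the NettyClient
        final Map<String, String> clients = new HashMap<>();
        ClientFactory(Client client) { this.client = client; }
        String create(String id) { return clients.computeIfAbsent(id, k -> "request-client->" + k); }
        int getNumberOfActiveClients() { return clients.size(); }
    }

    static final class Manager {
        final Server server = new Server();
        final Client client = new Client();
        final ClientFactory factory = new ClientFactory(client); // built on top of the client

        void start() {
            Protocol protocol = new Protocol(); // one protocol instance shared by both sides
            client.init(protocol);
            server.init(protocol);
        }

        // Pure delegation to the factory, as in NettyConnectionManager.
        String createPartitionRequestClient(String id) { return factory.create(id); }
        int getNumberOfActiveConnections() { return factory.getNumberOfActiveClients(); }
        void shutdown() { client.shutdown(); server.shutdown(); }
    }

    public static void main(String[] args) {
        Manager manager = new Manager();
        manager.start();
        manager.createPartitionRequestClient("tm-2:6121");
        System.out.println(manager.getNumberOfActiveConnections()); // 1
        manager.shutdown();
        System.out.println(log); // [client.init, server.init, client.shutdown, server.shutdown]
        System.out.println(manager.client.protocol == manager.server.protocol); // true
    }
}
```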

PartitionRequestClientFactory
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
class PartitionRequestClientFactory {

    private final NettyClient nettyClient;

    private final ConcurrentMap<ConnectionID, Object> clients = new ConcurrentHashMap<ConnectionID, Object>();

    PartitionRequestClientFactory(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
    }

    /**
     * Atomically establishes a TCP connection to the given remote address and
     * creates a {@link PartitionRequestClient} instance for this connection.
     */
    PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException {
        Object entry;
        PartitionRequestClient client = null;

        while (client == null) {
            entry = clients.get(connectionId);

            if (entry != null) {
                // Existing channel or connecting channel
                if (entry instanceof PartitionRequestClient) {
                    client = (PartitionRequestClient) entry;
                }
                else {
                    ConnectingChannel future = (ConnectingChannel) entry;
                    client = future.waitForChannel();

                    clients.replace(connectionId, future, client);
                }
            }
            else {
                // No channel yet. Create one, but watch out for a race.
                // We create a "connecting future" and atomically add it to the map.
                // Only the thread that really added it establishes the channel.
                // The others need to wait on that original establisher's future.
                ConnectingChannel connectingChannel = new ConnectingChannel(connectionId, this);
                Object old = clients.putIfAbsent(connectionId, connectingChannel);

                if (old == null) {
                    nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);

                    client = connectingChannel.waitForChannel();

                    clients.replace(connectionId, connectingChannel, client);
                }
                else if (old instanceof ConnectingChannel) {
                    client = ((ConnectingChannel) old).waitForChannel();

                    clients.replace(connectionId, old, client);
                }
                else {
                    client = (PartitionRequestClient) old;
                }
            }

            // Make sure to increment the reference count before handing a client
            // out to ensure correct bookkeeping for channel closing.
            if (!client.incrementReferenceCounter()) {
                destroyPartitionRequestClient(connectionId, client);
                client = null;
            }
        }

        return client;
    }

    public void closeOpenChannelConnections(ConnectionID connectionId) {
        Object entry = clients.get(connectionId);

        if (entry instanceof ConnectingChannel) {
            ConnectingChannel channel = (ConnectingChannel) entry;

            if (channel.dispose()) {
                clients.remove(connectionId, channel);
            }
        }
    }

    int getNumberOfActiveClients() {
        return clients.size();
    }

    /**
     * Removes the client for the given {@link ConnectionID}.
     */
    void destroyPartitionRequestClient(ConnectionID connectionId, PartitionRequestClient client) {
        clients.remove(connectionId, client);
    }

    //……
}

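The PartitionRequestClientFactory constructor takes a NettyClient. The factory keeps an in-memory ConcurrentHashMap that maps each ConnectionID either to a PartitionRequestClient (the connection is established) or to a ConnectingChannel (the connection is still being established).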
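The createPartitionRequestClient method loops until it holds a usable client. It first looks up the ConnectionID in the map. If the entry is a PartitionRequestClient, that client is used directly. If it is a ConnectingChannel, the thread calls ConnectingChannel.waitForChannel to wait for the PartitionRequestClient and then replaces the map entry for that ConnectionID with the client.
If the map has no entry for the ConnectionID, the method creates a new ConnectingChannel and registers it with putIfAbsent, keeping the previous value (old). If old is null, this thread won the race: it calls nettyClient.connect, registers the ConnectingChannel as a listener on the resulting future, waits for the PartitionRequestClient, and replaces the map entry with it. If old is a ConnectingChannel, another thread is already connecting, so this thread waits on that channel via waitForChannel and then replaces the map entry. If old is already a PartitionRequestClient, it is used directly.
Before handing out the PartitionRequestClient, the method calls client.incrementReferenceCounter() to keep the reference bookkeeping correct for channel closing. If the increment fails (the client has already been disposed), destroyPartitionRequestClient removes it from the map and client is set to null, so the loop runs again and obtains a fresh client instead of returning null. If the increment succeeds, the PartitionRequestClient is returned.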
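The "only the thread that wins putIfAbsent connects, everyone else waits on its placeholder" pattern can be tried out without Netty. This sketch swaps in a CompletableFuture as the placeholder instead of ConnectingChannel, and a synchronous Function (the hypothetical connector) instead of the asynchronous nettyClient.connect; Client and its reference counter are also hypothetical. It keeps the same retry loop: a client whose reference count can no longer be incremented is removed and the lookup starts over. Removing the placeholder on a failed connect is this sketch's own choice, not something the listing above does.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class SingleConnectSketch {

    // Hypothetical client with a reference count; a negative count means "disposed".
    static final class Client {
        final AtomicInteger refs = new AtomicInteger();

        boolean incrementReferenceCounter() {
            while (true) {
                int current = refs.get();
                if (current < 0) {
                    return false; // already disposed, caller must retry
                }
                if (refs.compareAndSet(current, current + 1)) {
                    return true;
                }
            }
        }
    }

    // Value is either a CompletableFuture<Client> (still connecting) or a Client.
    private final ConcurrentMap<String, Object> clients = new ConcurrentHashMap<>();
    private final Function<String, Client> connector; // hypothetical stand-in for connecting
    final AtomicInteger connectCalls = new AtomicInteger();

    SingleConnectSketch(Function<String, Client> connector) {
        this.connector = connector;
    }

    @SuppressWarnings("unchecked")
    Client create(String id) throws InterruptedException, ExecutionException {
        Client client = null;
        while (client == null) {
            Object entry = clients.get(id);
            if (entry == null) {
                CompletableFuture<Client> connecting = new CompletableFuture<>();
                Object old = clients.putIfAbsent(id, connecting);
                if (old == null) {
                    // This thread won the race, so it is the only one that connects.
                    connectCalls.incrementAndGet();
                    try {
                        connecting.complete(connector.apply(id));
                    } catch (RuntimeException e) {
                        connecting.completeExceptionally(e); // wake up the waiting threads
                        clients.remove(id, connecting);
                        throw e;
                    }
                    entry = connecting;
                } else {
                    entry = old; // another thread registered first
                }
            }
            if (entry instanceof CompletableFuture) {
                CompletableFuture<Client> future = (CompletableFuture<Client>) entry;
                client = future.get(); // wait for whichever thread is connecting
                clients.replace(id, future, client); // swap the placeholder for the client
            } else {
                client = (Client) entry;
            }
            if (!client.incrementReferenceCounter()) {
                clients.remove(id, client); // disposed in the meantime: drop it and loop again
                client = null;
            }
        }
        return client;
    }

    public static void main(String[] args) throws Exception {
        SingleConnectSketch factory = new SingleConnectSketch(id -> new Client());
        ExecutorService pool = Executors.newFixedThreadPool(8);
        List<Future<Client>> results = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            results.add(pool.submit(() -> factory.create("tm-3:6121")));
        }
        Set<Client> distinct = new HashSet<>();
        for (Future<Client> f : results) {
            distinct.add(f.get());
        }
        pool.shutdown();
        System.out.println(factory.connectCalls.get()); // 1
        System.out.println(distinct.size()); // 1
    }
}
```

All eight concurrent callers end up sharing one client and only one connect happens, because the placeholder is inserted atomically and is later replaced in place rather than removed.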
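The closeOpenChannelConnections method only acts on entries that are still ConnectingChannel instances: it calls ConnectingChannel.dispose and, if that succeeds, removes the entry with clients.remove(connectionId, channel), which removes it only if the ConnectionID still maps to that channel. Entries that have already been replaced by a PartitionRequestClient are left untouched.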
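Disposal and reference counting work together here: ConnectingChannel.dispose calls partitionRequestClient.disposeIfNotUsed, and once a client is disposed, incrementReferenceCounter fails, which is why createPartitionRequestClient loops. PartitionRequestClient's internals are not shown in this article, so the counter below is a hypothetical sketch of those semantics (counts of zero or more are live references, -1 marks "disposed"), not Flink's own class.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class DisposableCounterSketch {

    // Hypothetical counter: values >= 0 are live reference counts, -1 marks "disposed".
    static final class DisposableCounter {
        private final AtomicInteger count = new AtomicInteger();

        // Fails once the counter has been disposed, so the caller knows to get a fresh client.
        boolean increment() {
            while (true) {
                int current = count.get();
                if (current < 0) {
                    return false;
                }
                if (count.compareAndSet(current, current + 1)) {
                    return true;
                }
            }
        }

        // Releases one reference; never moves a disposed counter.
        void decrement() {
            count.updateAndGet(c -> c > 0 ? c - 1 : c);
        }

        // Disposes only when nobody holds a reference; an already-disposed counter reports true.
        boolean disposeIfNotUsed() {
            return count.compareAndSet(0, -1) || count.get() < 0;
        }
    }

    public static void main(String[] args) {
        DisposableCounter counter = new DisposableCounter();
        System.out.println(counter.increment()); // true: one user
        System.out.println(counter.disposeIfNotUsed()); // false: still in use
        counter.decrement();
        System.out.println(counter.disposeIfNotUsed()); // true: nobody uses it
        System.out.println(counter.increment()); // false: the caller must retry with a new client
    }
}
```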

ConnectingChannel
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
private static final class ConnectingChannel implements ChannelFutureListener {

    private final Object connectLock = new Object();

    private final ConnectionID connectionId;

    private final PartitionRequestClientFactory clientFactory;

    private boolean disposeRequestClient = false;

    public ConnectingChannel(ConnectionID connectionId, PartitionRequestClientFactory clientFactory) {
        this.connectionId = connectionId;
        this.clientFactory = clientFactory;
    }

    private boolean dispose() {
        boolean result;
        synchronized (connectLock) {
            if (partitionRequestClient != null) {
                result = partitionRequestClient.disposeIfNotUsed();
            }
            else {
                disposeRequestClient = true;
                result = true;
            }

            connectLock.notifyAll();
        }

        return result;
    }

    private void handInChannel(Channel channel) {
        synchronized (connectLock) {
            try {
                NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class);
                partitionRequestClient = new PartitionRequestClient(
                        channel, clientHandler, connectionId, clientFactory);

                if (disposeRequestClient) {
                    partitionRequestClient.disposeIfNotUsed();
                }

                connectLock.notifyAll();
            }
            catch (Throwable t) {
                notifyOfError(t);
            }
        }
    }

    private volatile PartitionRequestClient partitionRequestClient;

    private volatile Throwable error;

    private PartitionRequestClient waitForChannel() throws IOException, InterruptedException {
        synchronized (connectLock) {
            while (error == null && partitionRequestClient == null) {
                connectLock.wait(2000);
            }
        }

        if (error != null) {
            throw new IOException("Connecting the channel failed: " + error.getMessage(), error);
        }

        return partitionRequestClient;
    }

    private void notifyOfError(Throwable error) {
        synchronized (connectLock) {
            this.error = error;
            connectLock.notifyAll();
        }
    }

    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isSuccess()) {
            handInChannel(future.channel());
        }
        else if (future.cause() != null) {
            notifyOfError(new RemoteTransportException(
                    "Connecting to remote task manager + '" + connectionId.getAddress() +
                            "' has failed. This might indicate that the remote task " +
                            "manager has been lost.",
                    connectionId.getAddress(), future.cause()));
        }
        else {
            notifyOfError(new LocalTransportException(
                    String.format(
                            "Connecting to remote task manager '%s' has been cancelled.",
                            connectionId.getAddress()),
                    null));
        }
    }
}
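ConnectingChannel implements Netty's ChannelFutureListener interface. When the connect ChannelFuture completes, operationComplete calls handInChannel on success; handInChannel looks up the NetworkClientHandler in the channel pipeline and creates the PartitionRequestClient. If the connect failed, operationComplete records a RemoteTransportException, and if it was cancelled, a LocalTransportException, both through notifyOfError. The waitForChannel method blocks on connectLock (re-checking at least every 2 seconds) until either the PartitionRequestClient has been created or an error has been recorded; in the error case it throws an IOException, otherwise it returns the client. The dispose method disposes the client if it already exists and is unused; if the client does not exist yet, it sets disposeRequestClient so that handInChannel disposes the client as soon as it is created.
Summary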
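The hand-off between the Netty I/O thread and the requesting threads is a classic guarded wait with synchronized/wait/notifyAll. The sketch below reproduces just that logic without Netty: ConnectingSlot is a hypothetical class, a String stands in for PartitionRequestClient, and plain threads play the role of operationComplete calling handInChannel or notifyOfError.

```java
import java.io.IOException;

public class ConnectingSlotSketch {

    // Hypothetical, Netty-free version of ConnectingChannel's hand-off logic.
    static final class ConnectingSlot {
        private final Object lock = new Object();
        private volatile String client; // stands in for PartitionRequestClient
        private volatile Throwable error;

        // Called by the I/O thread when the connect succeeded.
        void handIn(String channel) {
            synchronized (lock) {
                client = "client(" + channel + ")";
                lock.notifyAll();
            }
        }

        // Called by the I/O thread when the connect failed or was cancelled.
        void notifyOfError(Throwable t) {
            synchronized (lock) {
                error = t;
                lock.notifyAll();
            }
        }

        // Called by requesting threads; the loop re-checks the condition after every wake-up,
        // which covers spurious wake-ups and the 2-second timed wait expiring.
        String waitForChannel() throws IOException, InterruptedException {
            synchronized (lock) {
                while (error == null && client == null) {
                    lock.wait(2000);
                }
            }
            if (error != null) {
                throw new IOException("Connecting the channel failed: " + error.getMessage(), error);
            }
            return client;
        }
    }

    public static void main(String[] args) throws Exception {
        ConnectingSlot ok = new ConnectingSlot();
        new Thread(() -> ok.handIn("tm-4:6121")).start();
        System.out.println(ok.waitForChannel()); // client(tm-4:6121)

        ConnectingSlot failed = new ConnectingSlot();
        new Thread(() -> failed.notifyOfError(new RuntimeException("connection refused"))).start();
        try {
            failed.waitForChannel();
        } catch (IOException e) {
            System.out.println(e.getMessage()); // Connecting the channel failed: connection refused
        }
    }
}
```

Because every state change happens under the lock and is followed by notifyAll, a waiter can never miss the signal: either it sees the new state before waiting, or it is woken after the change.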

ConnectionManager defines methods such as start, shutdown and closeOpenChannelConnections for managing physical connections; it has two implementations, LocalConnectionManager and NettyConnectionManager.
LocalConnectionManager implements the ConnectionManager interface, but its methods are essentially no-ops. NettyConnectionManager's constructor uses the NettyConfig to create a NettyServer, a NettyClient and a NettyBufferPool, and builds a PartitionRequestClientFactory on top of the NettyClient. Its start method creates a NettyProtocol and initializes the NettyClient and NettyServer with it, shutdown shuts both down, and closeOpenChannelConnections and createPartitionRequestClient delegate to the corresponding methods of partitionRequestClientFactory.
PartitionRequestClientFactory takes a NettyClient in its constructor and uses an in-memory ConcurrentHashMap to map each ConnectionID to either a PartitionRequestClient or a ConnectingChannel, so that only one thread establishes the connection for a given ConnectionID while the others wait for it. ConnectingChannel implements Netty's ChannelFutureListener interface: its operationComplete method calls handInChannel when the ChannelFuture succeeds, which creates the PartitionRequestClient, and waitForChannel blocks until the PartitionRequestClient has been created (or throws an IOException if connecting failed).

