聊聊flink的RpcService

23次阅读

共计 11364 个字符,预计需要花费 29 分钟才能阅读完成。


本文主要研究一下 flink 的 RpcService
RpcService
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
public interface RpcService {

String getAddress();

int getPort();

<C extends RpcGateway> CompletableFuture<C> connect(
String address,
Class<C> clazz);

<F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(
String address,
F fencingToken,
Class<C> clazz);

<C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint);

<F extends Serializable> RpcServer fenceRpcServer(RpcServer rpcServer, F fencingToken);

void stopServer(RpcServer selfGateway);

CompletableFuture<Void> stopService();

CompletableFuture<Void> getTerminationFuture();

Executor getExecutor();

ScheduledExecutor getScheduledExecutor();

ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);

void execute(Runnable runnable);

<T> CompletableFuture<T> execute(Callable<T> callable);
}
RpcService 用于连接到一个远程的 rpc server,或者启动一个 rpc server 来转发远程调用到 rpcEndpoint;它提供了 connect、startServer、fenceRpcServer、stopServer、stopService、getTerminationFuture、scheduleRunnable、execute 等方法
AkkaRpcService
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@ThreadSafe
public class AkkaRpcService implements RpcService {

private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcService.class);

static final int VERSION = 1;

static final String MAXIMUM_FRAME_SIZE_PATH = “akka.remote.netty.tcp.maximum-frame-size”;

private final Object lock = new Object();

private final ActorSystem actorSystem;
private final Time timeout;

@GuardedBy(“lock”)
private final Map<ActorRef, RpcEndpoint> actors = new HashMap<>(4);

private final long maximumFramesize;

private final String address;
private final int port;

private final ScheduledExecutor internalScheduledExecutor;

private final CompletableFuture<Void> terminationFuture;

private volatile boolean stopped;

public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) {
this.actorSystem = checkNotNull(actorSystem, “actor system”);
this.timeout = checkNotNull(timeout, “timeout”);

if (actorSystem.settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {
maximumFramesize = actorSystem.settings().config().getBytes(MAXIMUM_FRAME_SIZE_PATH);
} else {
// only local communication
maximumFramesize = Long.MAX_VALUE;
}

Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);

if (actorSystemAddress.host().isDefined()) {
address = actorSystemAddress.host().get();
} else {
address = “”;
}

if (actorSystemAddress.port().isDefined()) {
port = (Integer) actorSystemAddress.port().get();
} else {
port = -1;
}

internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem);

terminationFuture = new CompletableFuture<>();

stopped = false;
}

public ActorSystem getActorSystem() {
return actorSystem;
}

protected int getVersion() {
return VERSION;
}

@Override
public String getAddress() {
return address;
}

@Override
public int getPort() {
return port;
}

// this method does not mutate state and is thus thread-safe
@Override
public <C extends RpcGateway> CompletableFuture<C> connect(
final String address,
final Class<C> clazz) {

return connectInternal(
address,
clazz,
(ActorRef actorRef) -> {
Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);

return new AkkaInvocationHandler(
addressHostname.f0,
addressHostname.f1,
actorRef,
timeout,
maximumFramesize,
null);
});
}

// this method does not mutate state and is thus thread-safe
@Override
public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(String address, F fencingToken, Class<C> clazz) {
return connectInternal(
address,
clazz,
(ActorRef actorRef) -> {
Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);

return new FencedAkkaInvocationHandler<>(
addressHostname.f0,
addressHostname.f1,
actorRef,
timeout,
maximumFramesize,
null,
() -> fencingToken);
});
}

@Override
public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
checkNotNull(rpcEndpoint, “rpc endpoint”);

CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
final Props akkaRpcActorProps;

if (rpcEndpoint instanceof FencedRpcEndpoint) {
akkaRpcActorProps = Props.create(FencedAkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion());
} else {
akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion());
}

ActorRef actorRef;

synchronized (lock) {
checkState(!stopped, “RpcService is stopped”);
actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId());
actors.put(actorRef, rpcEndpoint);
}

LOG.info(“Starting RPC endpoint for {} at {} .”, rpcEndpoint.getClass().getName(), actorRef.path());

final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
final String hostname;
Option<String> host = actorRef.path().address().host();
if (host.isEmpty()) {
hostname = “localhost”;
} else {
hostname = host.get();
}

Set<Class<?>> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));

implementedRpcGateways.add(RpcServer.class);
implementedRpcGateways.add(AkkaBasedEndpoint.class);

final InvocationHandler akkaInvocationHandler;

if (rpcEndpoint instanceof FencedRpcEndpoint) {
// a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
akkaInvocationHandler = new FencedAkkaInvocationHandler<>(
akkaAddress,
hostname,
actorRef,
timeout,
maximumFramesize,
terminationFuture,
((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);

implementedRpcGateways.add(FencedMainThreadExecutable.class);
} else {
akkaInvocationHandler = new AkkaInvocationHandler(
akkaAddress,
hostname,
actorRef,
timeout,
maximumFramesize,
terminationFuture);
}

// Rather than using the System ClassLoader directly, we derive the ClassLoader
// from this class . That works better in cases where Flink runs embedded and all Flink
// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
ClassLoader classLoader = getClass().getClassLoader();

@SuppressWarnings(“unchecked”)
RpcServer server = (RpcServer) Proxy.newProxyInstance(
classLoader,
implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]),
akkaInvocationHandler);

return server;
}

@Override
public <F extends Serializable> RpcServer fenceRpcServer(RpcServer rpcServer, F fencingToken) {
if (rpcServer instanceof AkkaBasedEndpoint) {

InvocationHandler fencedInvocationHandler = new FencedAkkaInvocationHandler<>(
rpcServer.getAddress(),
rpcServer.getHostname(),
((AkkaBasedEndpoint) rpcServer).getActorRef(),
timeout,
maximumFramesize,
null,
() -> fencingToken);

// Rather than using the System ClassLoader directly, we derive the ClassLoader
// from this class . That works better in cases where Flink runs embedded and all Flink
// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
ClassLoader classLoader = getClass().getClassLoader();

return (RpcServer) Proxy.newProxyInstance(
classLoader,
new Class<?>[]{RpcServer.class, AkkaBasedEndpoint.class},
fencedInvocationHandler);
} else {
throw new RuntimeException(“The given RpcServer must implement the AkkaGateway in order to fence it.”);
}
}

@Override
public void stopServer(RpcServer selfGateway) {
if (selfGateway instanceof AkkaBasedEndpoint) {
final AkkaBasedEndpoint akkaClient = (AkkaBasedEndpoint) selfGateway;
final RpcEndpoint rpcEndpoint;

synchronized (lock) {
if (stopped) {
return;
} else {
rpcEndpoint = actors.remove(akkaClient.getActorRef());
}
}

if (rpcEndpoint != null) {
akkaClient.getActorRef().tell(PoisonPill.getInstance(), ActorRef.noSender());
} else {
LOG.debug(“RPC endpoint {} already stopped or from different RPC service”, selfGateway.getAddress());
}
}
}

@Override
public CompletableFuture<Void> stopService() {
synchronized (lock) {
if (stopped) {
return terminationFuture;
}

stopped = true;
}

LOG.info(“Stopping Akka RPC service.”);

final CompletableFuture<Terminated> actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate());

actorSystemTerminationFuture.whenComplete(
(Terminated ignored, Throwable throwable) -> {
synchronized (lock) {
actors.clear();
}

if (throwable != null) {
terminationFuture.completeExceptionally(throwable);
} else {
terminationFuture.complete(null);
}

LOG.info(“Stopped Akka RPC service.”);
});

return terminationFuture;
}

@Override
public CompletableFuture<Void> getTerminationFuture() {
return terminationFuture;
}

@Override
public Executor getExecutor() {
return actorSystem.dispatcher();
}

@Override
public ScheduledExecutor getScheduledExecutor() {
return internalScheduledExecutor;
}

@Override
public ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) {
checkNotNull(runnable, “runnable”);
checkNotNull(unit, “unit”);
checkArgument(delay >= 0L, “delay must be zero or larger”);

return internalScheduledExecutor.schedule(runnable, delay, unit);
}

@Override
public void execute(Runnable runnable) {
actorSystem.dispatcher().execute(runnable);
}

@Override
public <T> CompletableFuture<T> execute(Callable<T> callable) {
Future<T> scalaFuture = Futures.<T>future(callable, actorSystem.dispatcher());

return FutureUtils.toJava(scalaFuture);
}

private <C extends RpcGateway> CompletableFuture<C> connectInternal(
final String address,
final Class<C> clazz,
Function<ActorRef, InvocationHandler> invocationHandlerFactory) {
checkState(!stopped, “RpcService is stopped”);

LOG.debug(“Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.”,
address, clazz.getName());

final ActorSelection actorSel = actorSystem.actorSelection(address);

final Future<ActorIdentity> identify = Patterns
.ask(actorSel, new Identify(42), timeout.toMilliseconds())
.<ActorIdentity>mapTo(ClassTag$.MODULE$.<ActorIdentity>apply(ActorIdentity.class));

final CompletableFuture<ActorIdentity> identifyFuture = FutureUtils.toJava(identify);

final CompletableFuture<ActorRef> actorRefFuture = identifyFuture.thenApply(
(ActorIdentity actorIdentity) -> {
if (actorIdentity.getRef() == null) {
throw new CompletionException(new RpcConnectionException(“Could not connect to rpc endpoint under address ” + address + ‘.’));
} else {
return actorIdentity.getRef();
}
});

final CompletableFuture<HandshakeSuccessMessage> handshakeFuture = actorRefFuture.thenCompose(
(ActorRef actorRef) -> FutureUtils.toJava(
Patterns
.ask(actorRef, new RemoteHandshakeMessage(clazz, getVersion()), timeout.toMilliseconds())
.<HandshakeSuccessMessage>mapTo(ClassTag$.MODULE$.<HandshakeSuccessMessage>apply(HandshakeSuccessMessage.class))));

return actorRefFuture.thenCombineAsync(
handshakeFuture,
(ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);

// Rather than using the System ClassLoader directly, we derive the ClassLoader
// from this class . That works better in cases where Flink runs embedded and all Flink
// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
ClassLoader classLoader = getClass().getClassLoader();

@SuppressWarnings(“unchecked”)
C proxy = (C) Proxy.newProxyInstance(
classLoader,
new Class<?>[]{clazz},
invocationHandler);

return proxy;
},
actorSystem.dispatcher());
}

//……
}

AkkaRpcService 实现了 RpcService 接口,其构造器要求传入 actorSystem 及 timeout 参数;connect 方法会创建一个 AkkaInvocationHandler 或者 FencedAkkaInvocationHandler,然后调用 connectInternal 方法使用 akka 进行连接
startServer 方法会利用 actorSystem 创建 ActorRef,然后创建 AkkaInvocationHandler 或者 FencedAkkaInvocationHandler,最后使用 Proxy.newProxyInstance 创建 RpcServer;stopServer 方法会使用 PoisonPill 来终止 actor;stopService 用于终止当前的 RpcService,它会执行 actorSystem.terminate()
fenceRpcServer 方法用于根据指定的 fencingToken 重新使用代理创建新的 RpcServer;execute 方法使用的是 actorSystem.dispatcher() 来调度执行;scheduleRunnable 方法则使用的是 ActorSystemScheduledExecutorAdapter 来进行调度

小结

RpcService 用于连接到一个远程的 rpc server,或者启动一个 rpc server 来转发远程调用到 rpcEndpoint;它提供了 connect、startServer、fenceRpcServer、stopServer、stopService、getTerminationFuture、scheduleRunnable、execute 等方法
AkkaRpcService 实现了 RpcService 接口,它的 connect 方法会创建一个 AkkaInvocationHandler 或者 FencedAkkaInvocationHandler,然后调用 connectInternal 方法使用 akka 进行连接
AkkaRpcService 的 startServer 方法会利用 actorSystem 创建 ActorRef,然后创建 AkkaInvocationHandler 或者 FencedAkkaInvocationHandler,最后使用 Proxy.newProxyInstance 创建 RpcServer;stopServer 方法会使用 PoisonPill 来终止 actor;stopService 用于终止当前的 RpcService,它会执行 actorSystem.terminate()

doc
RpcService

正文完
 0