序本文主要研究一下flink的ActorGatewayActorGatewayflink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.javapublic interface ActorGateway extends Serializable { /** * Sends a message asynchronously and returns its response. The response to the message is * returned as a future. * * @param message Message to be sent * @param timeout Timeout until the Future is completed with an AskTimeoutException * @return Future which contains the response to the sent message / Future<Object> ask(Object message, FiniteDuration timeout); /* * Sends a message asynchronously without a result. * * @param message Message to be sent / void tell(Object message); /* * Sends a message asynchronously without a result with sender being the sender. * * @param message Message to be sent * @param sender Sender of the message / void tell(Object message, ActorGateway sender); /* * Forwards a message. For the receiver of this message it looks as if sender has sent the * message. * * @param message Message to be sent * @param sender Sender of the forwarded message / void forward(Object message, ActorGateway sender); /* * Retries to send asynchronously a message up to numberRetries times. The response to this * message is returned as a future. The message is re-sent if the number of retries is not yet * exceeded and if an exception occurred while sending it. * * @param message Message to be sent * @param numberRetries Number of times to retry sending the message * @param timeout Timeout for each sending attempt * @param executionContext ExecutionContext which is used to send the message multiple times * @return Future of the response to the sent message / Future<Object> retry( Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext); /* * Returns the path of the remote instance. * * @return Path of the remote instance. / String path(); /* * Returns the underlying actor with which is communicated * * @return ActorRef of the target actor / ActorRef actor(); /* * Returns the leaderSessionID associated with the remote actor or null. * * @return Leader session ID if its associated with this gateway, otherwise null / UUID leaderSessionID();}ActorGateway接口定义了ask、tell、forward、retry、path、actor、leaderSessionID方法;它有一个实现类为AkkaActorGatewayAkkaActorGatewayflink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.javapublic class AkkaActorGateway implements ActorGateway, Serializable { private static final long serialVersionUID = 42L; // ActorRef of the remote instance private final ActorRef actor; // Associated leader session ID, which is used for RequiresLeaderSessionID messages private final UUID leaderSessionID; // Decorator for messages private final MessageDecorator decorator; public AkkaActorGateway(ActorRef actor, UUID leaderSessionID) { this.actor = Preconditions.checkNotNull(actor); this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID); // we want to wrap RequiresLeaderSessionID messages in a LeaderSessionMessage this.decorator = new LeaderSessionMessageDecorator(leaderSessionID); } /* * Sends a message asynchronously and returns its response. The response to the message is * returned as a future. * * @param message Message to be sent * @param timeout Timeout until the Future is completed with an AskTimeoutException * @return Future which contains the response to the sent message / @Override public Future<Object> ask(Object message, FiniteDuration timeout) { Object newMessage = decorator.decorate(message); return Patterns.ask(actor, newMessage, new Timeout(timeout)); } /* * Sends a message asynchronously without a result. * * @param message Message to be sent / @Override public void tell(Object message) { Object newMessage = decorator.decorate(message); actor.tell(newMessage, ActorRef.noSender()); } /* * Sends a message asynchronously without a result with sender being the sender. * * @param message Message to be sent * @param sender Sender of the message / @Override public void tell(Object message, ActorGateway sender) { Object newMessage = decorator.decorate(message); actor.tell(newMessage, sender.actor()); } /* * Forwards a message. For the receiver of this message it looks as if sender has sent the * message. * * @param message Message to be sent * @param sender Sender of the forwarded message / @Override public void forward(Object message, ActorGateway sender) { Object newMessage = decorator.decorate(message); actor.tell(newMessage, sender.actor()); } /* * Retries to send asynchronously a message up to numberRetries times. The response to this * message is returned as a future. The message is re-sent if the number of retries is not yet * exceeded and if an exception occurred while sending it. * * @param message Message to be sent * @param numberRetries Number of times to retry sending the message * @param timeout Timeout for each sending attempt * @param executionContext ExecutionContext which is used to send the message multiple times * @return Future of the response to the sent message / @Override public Future<Object> retry( Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) { Object newMessage = decorator.decorate(message); return AkkaUtils.retry( actor, newMessage, numberRetries, executionContext, timeout); } /* * Returns the ActorPath of the remote instance. * * @return ActorPath of the remote instance. / @Override public String path() { return actor.path().toString(); } /* * Returns {@link ActorRef} of the target actor * * @return ActorRef of the target actor / @Override public ActorRef actor() { return actor; } @Override public UUID leaderSessionID() { return leaderSessionID; } @Override public String toString() { return String.format(“AkkaActorGateway(%s, %s)”, actor.path(), leaderSessionID); }}AkkaActorGateway实现了ActorGateway接口,它的构造器要求输入ActorRef及leaderSessionID,同时基于leaderSessionID创建了LeaderSessionMessageDecorator;ask、tell、forward、retry方法均首先调用LeaderSessionMessageDecorator.decorate方法包装message参数,然后再去调用ActorRef的相应方法MessageDecoratorflink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/messages/MessageDecorator.javapublic interface MessageDecorator extends java.io.Serializable { /* * Decorates a message * * @param message Message to decorate * @return Decorated message / Object decorate(Object message);}MessageDecorator接口定义了decorate方法用于包装message,它有一个实现类为LeaderSessionMessageDecoratorLeaderSessionMessageDecoratorflink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/messages/LeaderSessionMessageDecorator.javapublic class LeaderSessionMessageDecorator implements MessageDecorator { private static final long serialVersionUID = 5359618147408392706L; /* Leader session ID with which the RequiresLeaderSessionID messages will be decorated / private final UUID leaderSessionID; /* * Sets the leader session ID with which the messages will be decorated. * * @param leaderSessionID Leader session ID to be used for decoration / public LeaderSessionMessageDecorator(UUID leaderSessionID) { this.leaderSessionID = leaderSessionID; } @Override public Object decorate(Object message) { if (message instanceof RequiresLeaderSessionID) { return new JobManagerMessages.LeaderSessionMessage(leaderSessionID, message); } else { return message; } }}LeaderSessionMessageDecorator实现了MessageDecorator接口,其decorate方法判断message是RequiresLeaderSessionID类型的话,则返回JobManagerMessages.LeaderSessionMessage,否则返回原始的messageJobManagerMessages.LeaderSessionMessageflink-1.7.2/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scalaobject JobManagerMessages { /* Wrapper class for leader session messages. Leader session messages implement the * [[RequiresLeaderSessionID]] interface and have to be wrapped in a [[LeaderSessionMessage]], * which also contains the current leader session ID. * * @param leaderSessionID Current leader session ID * @param message [[RequiresLeaderSessionID]] message to be wrapped in a [[LeaderSessionMessage]] */ case class LeaderSessionMessage(leaderSessionID: UUID, message: Any) //……}JobManagerMessages.LeaderSessionMessage是一个case class,它有两个属性,分别是leaderSessionID及message小结ActorGateway接口定义了ask、tell、forward、retry、path、actor、leaderSessionID方法;它有一个实现类为AkkaActorGatewayAkkaActorGateway实现了ActorGateway接口,它的构造器要求输入ActorRef及leaderSessionID,同时基于leaderSessionID创建了LeaderSessionMessageDecorator;ask、tell、forward、retry方法均首先调用LeaderSessionMessageDecorator.decorate方法包装message参数,然后再去调用ActorRef的相应方法MessageDecorator接口定义了decorate方法用于包装message,它有一个实现类为LeaderSessionMessageDecorator;LeaderSessionMessageDecorator实现了MessageDecorator接口,其decorate方法判断message是RequiresLeaderSessionID类型的话,则返回JobManagerMessages.LeaderSessionMessage,否则返回原始的message;JobManagerMessages.LeaderSessionMessage是一个case class,它有两个属性,分别是leaderSessionID及messagedocActorGateway