组件间的近程通信、组件内的本地通信以及组件内的状态在并发状况下的保护,都是基于Akka Actor来实现的
1、Akka与Actor模型
Akka是构建高并发、分布式、可扩大利用的框架。Akka让开发者只须要关注业务逻辑,不须要写底层代码来反对可靠性、容错和高性能。Akka带来了诸多益处,比方 :
- 提供新的多线程模型,不须要应用低级的锁和原子性的操作来解决内存可见性问题
- 提供通明的近程通信,不再须要编写和保护简单的网络代码
- 提供集群式、高可用的架构,不便构建真正的响应式模式的利用
- Akka是基于Actor模型实现的,Actor模型相似于Erlang的并行模型,能使实现并发、并行和分布式应用更加简略
- Actor是Actor模型中最重要的形成局部,作为最根本的计算单位,能接管音讯并基于其执行计算。 每个Actor都有本人的邮箱,用来存储接管到的音讯。每个Actor维持公有的状态,来实现Actor之间的隔离
- 每个Actor都是由单个线程负责从各自的邮箱拉取音讯,并间断解决接管到的音讯。对于接管到的音讯,Actor能够更改其外部的状态,或者将其传给其余Actor,或者创立新的Actor
- ActorSystem是Actor的工厂和管理者。ActorSystem会为其Actor提供调度、配置和日志等公共服务。多个ActorSystem能够共存于同一台机器中。如果一个ActorSystem是以RemoteActorRefProvider的形式启动的,则它能够被近程ActorSystem拜访到。ActorSystem能主动判断Actor音讯是出自同一个ActorSystem的Actor还是来自近程ActorSystem的Actor。本地的Actor间通信,音讯通过共享内存传递;而近程的Actor间通信,音讯通过网络栈传递
2、理解Actor的创立、启动以及音讯的发送
Server端
RemoteServerActor通过继承AbstractActor来实现Actor,并实现AbstractActor的 createReceive办法,来实现接管到音讯的解决逻辑 : 接管到String的音讯,将音讯内容打印到控制台。
public class RemoteServerActor extends AbstractActor { @Override public Receive createReceive() { return receiveBuilder() .match(String.class, message-> { System.out.println(message); }) .build(); }}
RemoteServerActorLauncher类首先通过加载remote.conf的配置(其中配置的provider为 RemoteActorRefProvider),来创立名为remote、反对近程通信的ActorSystem;再通过 actorSystem调用actorOf办法创立并启动名为remoteServerActor的Actor实例,并返回Actor地址 (ActorRef);最初通过ActorRef调用tell办法向RemoteServerActor发送音讯
RemoteServerActorLauncher启动Actor和发送音讯
public class RemoteServerActorLauncher { public static void main(String[] args) { Config config = ConfigFactory.load("remote.conf"); ActorSystem actorSystem = ActorSystem.create("remote", config); ActorRef actor = actorSystem.actorOf(Props.create(RemoteServerActor.class), "remoteServerActor"); actor.tell("hello!", ActorRef.noSender()); } }
remote.conf配置 :
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 50050 } } }
在这个示例中,整个Actor创立与音讯发送的流程具体步骤如下 :
1) 创立近程的ActorSystem
2) 创立并启动RemoteServerActorLauncher实例,返回ActorRef,创立音讯并通过ActorRef发送音讯
3) ActorRef将音讯委托给Dispatcher发送到Actor
4) Dispatcher把音讯暂存在邮箱中,Dispatcher中封装了一个线程池,用于音讯派发,实现异步音讯发送的成果
5) 从邮箱中取出音讯,委派给RemoteServerActorLauncher中通过createReceive办法创立的Receive实例来解决
Client端
在LocalClient中,通过Akka URL与RemoteServerActor进行近程通信。首先通过加载client.conf来 配置近程Actor的地址状况。client.conf的配置与remote.conf一样,在理论生产中,client.conf中 的hostName为近程Actor对应的机器或者虚拟机的实在IP。再依据加载的配置创立local的 ActorSystem,而后actorSystem调用actorSelection的办法失去近程Actor的地址。最初获取近程 Actor地址并发送给近程Actor
public class LocalClient { public static void main(String[] args) { Config config = ConfigFactory.load("client.conf"); ActorSystem actorSystem = ActorSystem.create("local", config); ActorSelection toFind = actorSystem.actorSelection("akka.tcp://remote@127.0.0.1:50050/user/remoteServerActor"); toFind.tell("I am from local.", ActorRef.noSender()); } }
client.conf配置内容:
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 50050 } } }
在下面的示例中,发送音讯应用了tell模式。Actor的发送音讯模式有ask、tell和forward,三者的特点如下 :
ask模式 : 发送音讯异步,并返回一个Future来代表可能的音讯回应
tell模式 : 一种fire-and-forget(发后即忘)的形式,发送音讯异步并立刻返回,无返回信息
forward模式 : 相似邮件的转发,将收到的音讯由一个Actor转发到另一个Actor
3、组件内通信
- 对于这些组件内的多线程拜访,没有锁和算子操作来保障状态,而次要通过runAsync办法、 callAsync办法、scheduleRunAsync办法,以及通过getMainThreadExecutor调度来执行Future的回调办法,来实现对组件 状态的平安操作
- 组件间通过RpcGateway子类的办法实现近程的办法调用
- 组件外部的平安状态操作是基于本地Actor实现的,而组件间的通信是通过近程Actor实现的
组件内的本地通信与组件间通信的设计与实现,首先来看下组件通信的整体状况。组件通信相干的类位于flink-runtime模块下的org.apache.flink.runtime.rpc包中,组件通信的次要局部如下 :
- RpcEndpoint : 近程过程调用端点(rpc)根底类,提供近程过程调用的分布式组件须要继承这个根底类。运行时组件Dispatcher、TaskExecutor、ResourceManager和JobMaster组件都继承了RpcEndpoint
- AkkaRpcActor : 接管RpcInvocation、RunAsync、CallAsync和ControlMessages的音讯来实现运行时组件中状态的平安操作
- AkkaInvocationHandler : 作为RpcAkka调用的Handler,AkkaRpcActor接管到的RunAsync、CallAsync和RpcInvocation音讯都由AkkaInvocationHandler发送
- AkkaRpcService : 实现RpcService接口,负责启动AkkaRpcActor和连贯到RpcEndpoint。连贯到一个RpcEndpoint,会返回RpcGateway,供近程过程调用
- 与不带Fenced结尾的类相比,以Fenced结尾的类只是多了对FencingToken的解决逻辑
AkkaRpcActor
- 首先来看解决音讯的AkkaRpcActor。除REST以外,其余运行时组件(Dispatcher、
TaskExecutor、ResourceManager和JobMaster)都有一个AkkaRpcActor对象
- akkaRpcActor负责接管音讯,并对音讯进行解决,以操作RpcEndpoint(Dispatcher、 TaskExecutor、ResourceManager和JobMaster是RpcEnpoint类中的子类)的状态,实现对 RpcEndpoint实现类对象的生命周期管制和状态操作
- AkkaRpcActor解决的音讯分为近程握手音讯(RemoteHandshakeMessage)、管制音讯和一般音讯。近程握手音讯次要用于在RpcEndpoint之间的近程通信建设连贯之前,查看RpcEndpoint之间版本是否兼容
管制音讯分START音讯、STOP音讯和TERMINATE音讯。AkkaRpcActor接管到不同管制音讯的场景与解决逻辑各不相同,具体如下 :
- 当AkkaRpcActor接管到START音讯时,只有AkkaRpcActor的状态设置为开始状态,才能够解决流入的一般音讯。在AkkaRpcActor对应的RpcEndpoint启动时,会发送START音讯给AkkaRpcActor
- 当AkkaRpcActor接管到STOP音讯时,AkkaRpcActor处于不再解决流入的一般音讯且将接管到的一般音讯抛弃的状态。此时只会产生JobMaster失去领袖角色的状况。在这种 状况下,JobMaster会将作业设置为暂停状态(Suspended),同时向与其对应的 AkkaRpcActor发送STOP音讯
- 当AkkaRpcActor接管到TERMINATE音讯时,会调用对应RpcEndpoint的退出(onStop办法)逻辑。只有在Master或Worker过程失常退出或者过程中的组件产生致命谬误 (Fatal Error)而退出时,才会接管到TERMINATE音讯
一般音讯有RunAsync、CallAsync和RpcInvocation音讯三种类型。一般音讯在组件外部与组件间的应用场景各不相同,具体如下 :
- RunAsync音讯蕴含所需执行的Runnable和待执行的工夫点,不须要返回执行后果。组件中的runAsync和scheduleRunAsync办法最终会将RunAsync音讯发送给 AkkaRpcActor,从而线程平安地执行Runnable的run办法,批改RpcEndpoint实现类对象的状态
- CallAsync音讯蕴含所需执行的Callable,须要返回执行后果。调用callAsync办法会触发 客户端以ask模式将CallAsync音讯发送给AkkaRpcActor
RpcInvocation音讯分LocalRpcInvocation音讯和RemoteRpcInvocation音讯二者的区别是:
- LocalRpcInvocation用于本地Actor之间的RPC,不须要音讯的序列化和反序列 化,用于Master上运行时组件间的通信(如ResourceManager与JobMaster的通 信);
- RemoteRpcInvocation用于Actor近程通信中的RPC,须要序列化与反序列化,用 于Master组件与Worker组件的近程通信(如JobMaster与TaskExecutor的通信)
如感兴趣,点赞加关注,非常感谢!!!