共计 5287 个字符,预计需要花费 14 分钟才能阅读完成。
组件间的 近程通信、组件内的本地通信以及组件内的状态在并发状况下的保护,都是基于 Akka Actor 来实现的
1、Akka 与 Actor 模型
-
Akka 是构建高并发、分布式、可扩大利用的框架。Akka 让开发者只须要关注业务逻辑,不须要写底层代码来反对可靠性、容错和高性能。Akka 带来了诸多益处,比方 :
- 提供新的多线程模型,不须要应用低级的锁和原子性的操作来解决内存可见性问题
- 提供通明的近程通信,不再须要编写和保护简单的网络代码
- 提供集群式、高可用的架构,不便构建真正的响应式模式的利用
- Akka 是基于 Actor 模型实现的,Actor 模型相似于 Erlang 的并行模型,能使实现并发、并行和分布式应用更加简略
- Actor 是 Actor 模型中最重要的形成局部,作为最根本的计算单位,能接管音讯并基于其执行计算。每个 Actor 都有本人的邮箱,用来存储接管到的音讯。每个 Actor 维持公有的状态,来实现 Actor 之间的隔离
- 每个 Actor 都是由单个线程负责从各自的邮箱拉取音讯,并间断解决接管到的音讯。对于接管到的音讯,Actor 能够更改其外部的状态,或者将其传给其余 Actor,或者创立新的 Actor
- ActorSystem 是 Actor 的工厂和管理者 。ActorSystem 会为其 Actor 提供调度、配置和日志等公共服务。多个 ActorSystem 能够共存于同一台机器中。 如果一个 ActorSystem 是以 RemoteActorRefProvider 的形式启动的,则它能够被近程 ActorSystem 拜访到 。ActorSystem 能主动判断 Actor 音讯是出自同一个 ActorSystem 的 Actor 还是来自近程 ActorSystem 的 Actor。 本地的 Actor 间通信,音讯通过共享内存传递; 而近程的 Actor 间通信,音讯通过网络栈传递
2、理解Actor 的创立、启动以及音讯的发送
Server 端
RemoteServerActor通过继承 AbstractActor 来实现 Actor,并实现 AbstractActor 的 createReceive 办法,来实现接管到音讯的解决逻辑 : 接管到 String 的音讯,将音讯内容打印到控制台。
public class RemoteServerActor extends AbstractActor {
@Override
public Receive createReceive() {return receiveBuilder()
.match(String.class, message-> {System.out.println(message);
}) .build();}
}
RemoteServerActorLauncher类首先通过加载 remote.conf 的配置(其中配置的 provider 为 RemoteActorRefProvider),来创立名为 remote、反对近程通信的 ActorSystem; 再通过 actorSystem 调用 actorOf 办法创立并启动名为 remoteServerActor 的 Actor 实例,并返回 Actor 地址 (ActorRef); 最初通过 ActorRef 调用 tell 办法向 RemoteServerActor 发送音讯
RemoteServerActorLauncher 启动 Actor 和发送音讯
public class RemoteServerActorLauncher {public static void main(String[] args) {Config config = ConfigFactory.load("remote.conf");
ActorSystem actorSystem = ActorSystem.create("remote", config);
ActorRef actor = actorSystem.actorOf(Props.create(RemoteServerActor.class), "remoteServerActor");
actor.tell("hello!", ActorRef.noSender());
}
}
remote.conf 配置 :
akka {
actor {provider = "akka.remote.RemoteActorRefProvider"}
remote {enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 50050
}
}
}
在这个示例中,整个 Actor 创立与音讯发送的流程具体步骤如下 :
1) 创立近程的 ActorSystem
2) 创立并启动 RemoteServerActorLauncher 实例,返回 ActorRef,创立音讯并通过 ActorRef 发送音讯
3) ActorRef 将音讯委托给 Dispatcher 发送到 Actor
4) Dispatcher 把音讯暂存在邮箱中,Dispatcher 中封装了一个线程池,用于音讯派发,实现异步音讯发送的成果
5) 从邮箱中取出音讯,委派给 RemoteServerActorLauncher 中通过 createReceive 办法创立的 Receive 实例来解决
Client 端
在 LocalClient 中,通过 Akka URL 与 RemoteServerActor 进行近程通信 。 首先通过加载 client.conf 来 配置近程 Actor 的地址状况。client.conf 的配置与 remote.conf 一样 ,在理论生产中,client.conf 中 的 hostName 为近程 Actor 对应的机器或者虚拟机的实在 IP。再 依据加载的配置创立 local 的 ActorSystem,而后 actorSystem 调用 actorSelection 的办法失去近程 Actor 的地址。最初获取近程 Actor 地址并发送给近程 Actor
public class LocalClient {public static void main(String[] args) {Config config = ConfigFactory.load("client.conf");
ActorSystem actorSystem = ActorSystem.create("local", config);
ActorSelection toFind = actorSystem.actorSelection("akka.tcp://remote@127.0.0.1:50050/user/remoteServerActor");
toFind.tell("I am from local.", ActorRef.noSender());
}
}
client.conf 配置内容:
akka {
actor {provider = "akka.remote.RemoteActorRefProvider"}
remote {enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 50050
}
}
}
在下面的示例中,发送音讯应用了 tell 模式。Actor 的发送音讯模式有 ask、tell 和 forward,三者的特点如下 :
ask 模式 : 发送音讯异步,并返回一个 Future 来代表可能的音讯回应
tell 模式 : 一种 fire-and-forget(发后即忘)的形式,发送音讯异步并立刻返回,无返回信息
forward 模式 : 相似邮件的转发,将收到的音讯由一个 Actor 转发到另一个 Actor
3、组件内通信
- 对于这些组件内的多线程拜访,没有锁和算子操作来保障状态,而次要通过 runAsync 办法、callAsync 办法、scheduleRunAsync 办法,以及通过getMainThreadExecutor 调度来执行 Future 的回调办法,来实现对组件 状态的平安操作
- 组件间通过 RpcGateway 子类 的办法实现近程的办法调用
- 组件外部的平安状态操作是基于本地 Actor 实现的,而组件间的通信是通过近程 Actor 实现的
-
组件内的本地通信与组件间通信的设计与实现,首先来看下组件通信的整体状况。组件通信相干的类位于 flink-runtime 模块下的 org.apache.flink.runtime.rpc 包中,组件通信的次要局部如下 :
- RpcEndpoint : 近程过程调用端点 (rpc) 根底类 ,提供近程过程调用的分布式组件须要继承这个根底类。 运行时组件 Dispatcher、TaskExecutor、ResourceManager 和 JobMaster 组件都继承了 RpcEndpoint
- AkkaRpcActor : 接管 RpcInvocation、RunAsync、CallAsync 和 ControlMessages 的音讯 来实现运行时组件中状态的平安操作
- AkkaInvocationHandler : 作为 RpcAkka 调用的 Handler,AkkaRpcActor 接管到的 RunAsync、CallAsync 和 RpcInvocation 音讯都由 AkkaInvocationHandler 发送
- AkkaRpcService : 实现 RpcService 接口,负责启动 AkkaRpcActor 和连贯到 RpcEndpoint。连贯到一个 RpcEndpoint,会返回 RpcGateway,供近程过程调用
- 与不带 Fenced 结尾的类相比,以 Fenced 结尾的类只是多了对 FencingToken 的解决逻辑
-
AkkaRpcActor
- 首先来看解决音讯的 AkkaRpcActor。除 REST 以外,其余运行时组件(Dispatcher、
TaskExecutor、ResourceManager 和 JobMaster)都有一个 AkkaRpcActor 对象
- akkaRpcActor 负责接管音讯,并对音讯进行解决 ,以操作 RpcEndpoint(Dispatcher、TaskExecutor、ResourceManager 和 JobMaster 是 RpcEnpoint 类中的子类) 的状态,实现对 RpcEndpoint 实现类对象的生命周期管制和状态操作
- AkkaRpcActor 解决的音讯分为近程握手音讯(RemoteHandshakeMessage)、管制音讯和一般音讯。近程握手音讯次要用于在 RpcEndpoint 之间的近程通信建设连贯之前,查看 RpcEndpoint 之间版本是否兼容
-
管制音讯分 START 音讯、STOP 音讯和 TERMINATE 音讯。AkkaRpcActor 接管到不同管制音讯的场景与解决逻辑各不相同,具体如下 :
- 当 AkkaRpcActor 接管到 START 音讯时,只有 AkkaRpcActor 的状态设置为开始状态,才能够解决流入的一般音讯。在 AkkaRpcActor 对应的 RpcEndpoint 启动时,会发送 START 音讯给 AkkaRpcActor
- 当 AkkaRpcActor 接管到 STOP 音讯时,AkkaRpcActor 处于不再解决流入的一般音讯且将接管到的一般音讯抛弃的状态。此时只会产生 JobMaster 失去领袖角色的状况。在这种 状况下,JobMaster 会将作业设置为暂停状态(Suspended),同时向与其对应的 AkkaRpcActor 发送 STOP 音讯
- 当 AkkaRpcActor 接管到 TERMINATE 音讯时,会调用对应 RpcEndpoint 的退出 (onStop 办法) 逻辑 。只有在 Master 或 Worker 过程失常退出或者过程中的组件产生致命谬误 (Fatal Error) 而退出时,才会接管到 TERMINATE 音讯
-
一般音讯有RunAsync、CallAsync 和 RpcInvocation 音讯三种类型。一般音讯在组件外部与组件间的应用场景各不相同,具体如下 :
- RunAsync 音讯蕴含所需执行的 Runnable 和待执行的工夫点,不须要返回执行后果。组件中的 runAsync 和 scheduleRunAsync 办法最终会将 RunAsync 音讯发送给 AkkaRpcActor,从而线程平安地执行 Runnable 的 run 办法,批改 RpcEndpoint 实现类对象的状态
- CallAsync 音讯蕴含所需执行的 Callable,须要返回执行后果。调用 callAsync 办法会触发 客户端以 ask 模式将 CallAsync 音讯发送给 AkkaRpcActor
-
RpcInvocation 音讯分 LocalRpcInvocation 音讯和 RemoteRpcInvocation 音讯 二者的区别是:
- LocalRpcInvocation 用于本地 Actor 之间的 RPC,不须要音讯的序列化和反序列 化,用于 Master 上运行时组件间的通信(如 ResourceManager 与 JobMaster 的通 信);
- RemoteRpcInvocation 用于 Actor 近程通信中的 RPC,须要序列化与反序列化,用 于 Master 组件与 Worker 组件的近程通信(如 JobMaster 与 TaskExecutor 的通信)
如感兴趣,点赞加关注,非常感谢!!!