1、简说Akka
Flink 外部节点之间的通信是用 Akka,比方 JobManager 和 TaskManager 之间的通信。 而 operator 之间的数据传输是利用 Netty,所以是不是有必要说一下Akka ?
Akka和Actor
并发问题的外围就是存在数据共享,同时有多个线程对一份数据进行批改,就会呈现错乱的状况
解决该问题个别有两种形式 :
1、基于JVM内存模型的设计,通常须要通过加锁等同步机制保障共享数据的一致性。然而加锁在高并发的场景下,往往性能不是很好
2、应用消息传递的形式
Actor的根底就是消息传递,一个Actor能够认为是一个根本的计算单元,它能接管音讯并基于其执行运算,它也能够发送音讯给其余Actor。Actors 之间互相隔离,它们之间并不共享内存
Actor 自身封装了状态和行为,在进行并发编程时,Actor只须要关注音讯和它自身。而音讯是一个不可变对象,所以 Actor 不须要去关注锁和内存原子性等一系列多线程常见的问题。
所以Actor是由状态(State)、行为(Behavior)和邮箱(MailBox,能够认为是一个音讯队列)三局部组成
状态:Actor 中的状态指Actor对象的变量信息,状态由Actor本人治理,防止了并发环境下的锁和内存原子性等问题
行为:Actor 中的计算逻辑,通过Actor接管到的音讯来扭转Actor的状态
邮箱:邮箱是 Actor 和 Actor 之间的通信桥梁,邮箱外部通过 FIFO(先入先出)音讯队列来存储发送方Actor音讯,接受方Actor从邮箱队列中获取音讯
任意一个Actor即可发送音讯,也能够承受音讯
Akka是一个构建在JVM上,基于Actor模型的的并发框架,反对Java和Scala两种API
2、Akka详解
应用 Akka 能够让你从为 Actor 零碎创立基础设施和编写管制根本行为所需的高级代码中解脱进去。为了了解这一点,让咱们看看在代码中创立的Actor与Akka在外部创立和治理的Actor之间的关系,Actor的生命周期和失败解决
Akka的Actor层级
- Akka的Actor总是属于父Actor。通常,你能够通过调用 getContext().actorOf() 来创立 Actor。与创立一个“独立的”Actor不同,这会将新Actor作为一个子节点注入到曾经存在的树中,创立Actor的Actor成为新创建的子Actor的父级。你可能会问,你发明的第一个Actor的父节点是谁?
- 如下图所示,所有的 Actor 都有一个独特的父节点,即用户守护者。能够应用 system.actorOf() 在以后Actor下创立新的Actor实例。创立 Actor 将返回一个无效的 URL 援用。例如,如果咱们用 system.actorOf(..., "someActor") 创立一个名为 someActor 的 Actor,它的援用将包含门路 /user/someActor
事实上,在你在代码中创立 Actor 之前,Akka 曾经在零碎中创立了三个 Actor 。这些内置的 Actor 的名字蕴含 guardian ,因为他们守护他们所在门路下的每一个子 Actor。守护者 Actor 包含 :
- / ,根守护者( root guardian )。这是零碎中所有Actor的父Actor,也是零碎自身终止时要进行的最初一个 Actor
- /user ,守护者( guardian )。这是用户创立的所有Actor的父 Actor。不要让用户名混同,它与最终用户和用户解决无关。应用Akka库创立的每个Actor都将有一个当时筹备的固定门路/user/
- /system ,零碎守护者( system guardian )。这是除上述三个Actor外,零碎创立的所有Actor的父Actor
示例:
public class HierarchyActorTest { public static void main(String[] args) throws Exception { ActorSystem system = ActorSystem.create("testSystem"); ActorRef firstActor = system.actorOf(MyPrintActor.props(), "firstActor"); System.out.println("firstActor : " + firstActor); firstActor.tell("print_info", ActorRef.noSender()); System.out.println(">>> Press ENTER to exit <<<"); try { System.in.read(); } finally { system.terminate(); } } // 创立Actor static class MyPrintActor extends AbstractActor { static Props props() { return Props.create(MyPrintActor.class, MyPrintActor::new); } @Override public Receive createReceive() { return receiveBuilder() .matchEquals("print_info", p -> { ActorRef secondActorRef = getContext().actorOf(Props.empty()); System.out.println("secondActorRef : " + secondActorRef); }).build(); } }}
输入后果 :
firstActor : Actor[akka://testSystem/user/firstActor#-1802697549]>>> Press ENTER to exit <<<secondActorRef : Actor[akka://testSystem/user/firstActor/$a#-1282757800]
- 两条门路都以akka://testSystem/结尾。因为所有 Actor的援用都是无效的URL, akka://是协定字段的值
- ActorSystem名为testSystem ,但它能够是任何其余名称。如果启用了多个零碎之间的近程通 信,则URL的这一部分包含主机名和端口,以便其余零碎能够在网络上找到它,上面会有案例
- 因为第二个 Actor的援用蕴含门路 /firstActor/ ,这个标识它为第一个Actor的子Actor
- Actor援用的最初一部分,即#-1802697549和#-1282757800是惟一标识符
Actor的生命周期
- 既然理解了Actor层次结构的样子,你可能会想 : 为什么咱们须要这个层次结构?它是用来干什么的?
- 层次结构的一个重要作用是平安地治理Actor的生命周期。接下来,咱们来考虑一下,这些常识如何帮忙咱们编写更好的代码
- Actor在被创立时就会呈现,而后在用户申请时被进行。每当一个Actor被进行时,它的所有子 Actor也会被递归地进行。这种行为大大简化了资源清理,并有助于防止诸如由关上的套接字和文件引起的资源透露
- 要进行Actor,倡议的模式是调用Actor外部的 getContext().stop(getSelf()) 来进行本身,通常是对某些用户定义的进行音讯的响应,或者当Actor实现其工作时
Akka Actor的API裸露了许多生命周期的钩子,你能够在 Actor 的实现中笼罩这些钩子。最罕用的是 preStart() 和 postStop() 办法
- preStart() 在 Actor 启动之后但在解决其第一条音讯之前调用
- postStop() 在 Actor 进行之前调用,在此时之后将不再解决任何音讯
示例:
public class Actor1 extends AbstractActor { static Props props() { return Props.create(Actor1.class, Actor1:: new); } @Override public void preStart() throws Exception { System.out.println("first actor started"); getContext().actorOf(Actor2.props(), "second"); } @Override public void postStop() throws Exception { System.out.println("first actor stopped"); } @Override public Receive createReceive() { return receiveBuilder() .matchEquals("stop", s -> { getContext().stop(getSelf()); }).build(); }}public class Actor2 extends AbstractActor { static Props props() { return Props.create(Actor2.class, Actor2::new); } @Override public void preStart() throws Exception { System.out.println("second actor started"); } @Override public void postStop() throws Exception { System.out.println("second actor stopped"); } @Override public Receive createReceive() { return receiveBuilder().build(); }}public class LifeCycleMain { public static void main(String[] args) { ActorSystem system = ActorSystem.create("testSystem"); ActorRef first = system.actorOf(Actor1.props(), "first"); first.tell("stop", ActorRef.noSender()); }}
待更新。。。,写文章不易,如感兴趣点赞关注,谢谢!