关于akka:Flink之Akka-RPC通信

52次阅读

共计 4150 个字符,预计需要花费 11 分钟才能阅读完成。

1、简说 Akka

Flink 外部节点之间的通信是用 Akka,比方 JobManager 和 TaskManager 之间的通信。而 operator 之间的数据传输是利用 Netty,所以是不是有必要说一下 Akka ?

Akka 和 Actor

并发问题的外围就是存在 数据共享 同时有多个线程对一份数据进行批改,就会呈现错乱的状况

解决该问题个别有两种形式 :
1、基于 JVM 内存模型的设计,通常须要通过加锁等同步机制保障共享数据的一致性。然而加锁在高并发的场景下,往往性能不是很好
2、应用消息传递的形式
Actor 的根底就是消息传递 一个 Actor 能够认为是一个根本的计算单元,它能接管音讯并基于其执行运算,它也能够发送音讯给其余 Actor。Actors 之间互相隔离,它们之间并不共享内存

Actor 自身封装了状态和行为,在进行并发编程时,Actor 只须要关注音讯和它自身。而音讯是一个不可变对象,所以 Actor 不须要去关注锁和内存原子性等一系列多线程常见的问题。

所以 Actor 是由 状态(State)、行为(Behavior)和邮箱(MailBox,能够认为是一个音讯队列)三局部组成

状态:Actor 中的状态指 Actor 对象的变量信息,状态由 Actor 本人治理,防止了并发环境下的锁和内存原子性等问题
行为:Actor 中的计算逻辑,通过 Actor 接管到的音讯来扭转 Actor 的状态
邮箱:邮箱是 Actor 和 Actor 之间的通信桥梁,邮箱外部通过 FIFO(先入先出)音讯队列来存储发送方 Actor 音讯,接受方 Actor 从邮箱队列中获取音讯

任意一个 Actor 即可发送音讯,也能够承受音讯

Akka 是一个构建在 JVM 上,基于 Actor 模型的的并发框架,反对 Java 和 Scala 两种 API

2、Akka 详解

应用 Akka 能够让你从为 Actor 零碎创立基础设施和编写管制根本行为所需的高级代码中解脱进去。为了了解这一点,让咱们看看在代码中创立的 Actor 与 Akka 在外部创立和治理的 Actor 之间的关系,Actor 的生命周期和失败解决

Akka 的 Actor 层级

  • Akka 的 Actor 总是属于父 Actor。通常,你能够通过调用 getContext().actorOf() 来创立 Actor。与创立一个“独立的”Actor 不同,这会将新 Actor 作为一个子节点注入到曾经存在的树中,创立 Actor 的 Actor 成为新创建的子 Actor 的父级。你可能会问,你发明的第一个 Actor 的父节点是谁?
  • 如下图所示,所有的 Actor 都有一个独特的父节点,即用户守护者。能够应用 system.actorOf() 在以后 Actor 下创立新的 Actor 实例。创立 Actor 将返回一个无效的 URL 援用。例如,如果咱们用 system.actorOf(…, “someActor”) 创立一个名为 someActor 的 Actor,它的援用将包含门路 /user/someActor
  • 事实上,在你在代码中创立 Actor 之前,Akka 曾经在零碎中创立了三个 Actor。这些内置的 Actor 的名字蕴含 guardian,因为他们守护他们所在门路下的每一个子 Actor。守护者 Actor 包含 :

    • /,根守护者(root guardian)。这是零碎中所有 Actor 的父 Actor,也是零碎自身终止时要进行的最初一个 Actor
    • /user,守护者(guardian)。这是用户创立的所有 Actor 的父 Actor。不要让用户名混同,它与最终用户和用户解决无关。应用 Akka 库创立的每个 Actor 都将有一个当时筹备的固定门路 /user/
    • /system,零碎守护者(system guardian)。这是除上述三个 Actor 外,零碎创立的所有 Actor 的父 Actor

示例:

public class HierarchyActorTest {public static void main(String[] args) throws Exception {ActorSystem system = ActorSystem.create("testSystem");
        ActorRef firstActor = system.actorOf(MyPrintActor.props(), "firstActor");
        System.out.println("firstActor :" + firstActor);
        firstActor.tell("print_info", ActorRef.noSender());

        System.out.println(">>> Press ENTER to exit <<<");
        try {System.in.read(); }
        finally {system.terminate();
        }
    }

    // 创立 Actor
   static class MyPrintActor extends AbstractActor {static Props props() {return Props.create(MyPrintActor.class, MyPrintActor::new);
        }

        @Override
        public Receive createReceive() {return receiveBuilder()
                    .matchEquals("print_info", p -> {ActorRef secondActorRef = getContext().actorOf(Props.empty());
                        System.out.println("secondActorRef :" + secondActorRef);
                    }).build();}
    }
}

输入后果 :

firstActor : Actor[akka://testSystem/user/firstActor#-1802697549]
>>> Press ENTER to exit <<<
secondActorRef : Actor[akka://testSystem/user/firstActor/$a#-1282757800]
  • 两条门路都以 akka://testSystem/ 结尾。因为所有 Actor 的援用都是无效的 URL,akka:// 是协定字段的值
  • ActorSystem 名为 testSystem,但它能够是任何其余名称。如果启用了多个零碎之间的近程通 信,则 URL 的这一部分包含主机名和端口,以便其余零碎能够在网络上找到它,上面会有案例
  • 因为第二个 Actor 的援用蕴含门路 /firstActor/,这个标识它为第一个 Actor 的子 Actor
  • Actor 援用的最初一部分,即 #-1802697549 和#-1282757800 是惟一标识符

Actor 的生命周期

  • 既然理解了 Actor 层次结构的样子,你可能会想 : 为什么咱们须要这个层次结构? 它是用来干什么的?
  • 层次结构的一个重要作用是平安地治理 Actor 的生命周期。接下来,咱们来考虑一下,这些常识如何帮忙咱们编写更好的代码
  • Actor 在被创立时就会呈现,而后在用户申请时被进行。每当一个 Actor 被进行时,它的所有子 Actor 也会被递归地进行。这种行为大大简化了资源清理,并有助于防止诸如由关上的套接字和文件引起的资源透露
  • 要进行 Actor,倡议的模式是调用 Actor 外部的 getContext().stop(getSelf()) 来进行本身,通常是对某些用户定义的进行音讯的响应,或者当 Actor 实现其工作时
  • Akka Actor 的 API 裸露了许多生命周期的钩子,你能够在 Actor 的实现中笼罩这些钩子。最罕用的是 preStart()postStop() 办法

    • preStart() 在 Actor 启动之后但在解决其第一条音讯之前调用
    • postStop() 在 Actor 进行之前调用,在此时之后将不再解决任何音讯

示例:

public class Actor1 extends AbstractActor {static Props props() {return Props.create(Actor1.class, Actor1:: new);
    }

    @Override
    public void preStart() throws Exception {System.out.println("first actor started");
        getContext().actorOf(Actor2.props(), "second");
    }

    @Override
    public void postStop() throws Exception {System.out.println("first actor stopped");
    }

    @Override
    public Receive createReceive() {return receiveBuilder()
                .matchEquals("stop", s -> {getContext().stop(getSelf());
                }).build();}
}

public class Actor2 extends AbstractActor {static Props props() {return Props.create(Actor2.class, Actor2::new);
    }

    @Override
    public void preStart() throws Exception {System.out.println("second actor started");
    }

    @Override
    public void postStop() throws Exception {System.out.println("second actor stopped");
    }

    @Override
    public Receive createReceive() {return receiveBuilder().build();}
}

public class LifeCycleMain {public static void main(String[] args) {ActorSystem system = ActorSystem.create("testSystem");
        ActorRef first = system.actorOf(Actor1.props(), "first");
        first.tell("stop", ActorRef.noSender());
    }
}

待更新。。。,写文章不易,如感兴趣点赞关注,谢谢!

正文完
 0