Akka-in-Schedulerx20

62次阅读

共计 3809 个字符,预计需要花费 10 分钟才能阅读完成。

1. 前言

Schedulerx2.0 是阿里中间件自研的基于 akka 架构的新一代分布式任务调度平台,提供定时、任务编排、分布式跑批等功能,具有高可靠、海量任务、秒级调度等能力。

本篇文章以 Schedulerx2.0 为例子,介绍 akka 的应用场景,希望能给同样从事分布式系统开发的同学一些启发。这里不详细介绍 akka,初学者可以直接阅读官方文档(https://doc.akka.io/docs/akka/current/index.html?language=java)。

2. Reactive

说到近几年火热的反应式编程,谁都能说几句“异步、并发、非阻塞、高性能”等等,说到有代表性的项目,大家都知道 RxJava、Akka、Reactor。

Why Reactive?
——因为 Schedulerx2.0 作为任务调度平台,支持海量任务调度,提供任务状态机感知任务状态变化,需要 Reactive 的特性。

Why Akka?
——首先 akka 很简单,每个 actor 只需要实现一个 onReceive 方法。其次,Akka 真的非常强大!我们可以看下官方文档(https://doc.akka.io/docs/akka/current/index.html?language=java),Akka 几乎提供了一整套解决方案,使用 akka 可以很方便的实现一套高可靠、高并发、高性能的分布式系统。Schedulerx2.0 也只用到了 akka 生态圈里的一小部分功能:

  • akka-actor
  • akka-eventbus:实现高性能工作流引擎
  • akka-remoting:实现进程间通信
  • akka-persistence:实现消息的 At-Least-Once Delivery

3. Akka-actor in Schedulerx2.0

Schedulerx2.0 支持百万级别任务,一天上亿次调度,从架构上来说,主要是
server 无状态,可水平扩展
基于 akka-actor 模型,单机性能高

Schedulerx2.0 提供任务状态机,如下图

当有海量任务汇报任务状态,单线程肯定是处理不过来的。如果用线程池又会遇到并发问题,比如当前按顺序收到如下消息:
msg1: Instance=100 running
msg2: Instance=101 running
msg3: Instance=102 failed
msg4: Instance=101 success
msg5: Instance=100 failed
有可能 instance=100 先变成 failed,最后变成 running,导致状态机错误。

通过 Akka-actor 架构的模型,可以很容易处理这种场景:

如上图所示,JobInstanceRoutingActor 作为路由 actor,用来转发消息。下面挂载了很多 jobInstanceActor,用来真实处理消息。
所有 instance 状态的消息都发给 JobInstanceRoutingActor,路由 actor 会把同一个 instanceId 的消息发给同一个 jobInstanceActor,akka 能保证一个 actor 按照消息接收的顺序来处理消息,以此又能保证整个状态机消息的顺序性。

Schedulerx2.0 中,大量采用了上面这种模型,来支撑 job/workflow/instance 等消息的传递。

4. 基于 Akka-eventbus 的 Pub-Sub 模式

在异步处理场景中,当然少不了 Pub-Sub 模式。相信很多人都用过 guava 的 eventbus,可以很简单很优雅的实现一套基于事件驱动的解决方案。通过 @Subscribe 注解就能注册要订阅的事件,通过 @AllowConcurrentEvents 注解还能设置并发消费事件。但是 guava-eventbus 在实现并发消费事件的时候非常暴力,公用一个线程池。这在 Schedulerx2.0 的应用场景中不太合适,比如某个 job 触发频率特别高,可能整个线程池都被他占满了,造成其他 job 饿死。

在项目中大量使用 actor 模型之后,如果使用原生的 actor 通信会发现很困难,因为得知道 actor 的地址才能和他通信。如果有些 actor 要给多个 actor 发送消息,你的项目就会变成一个网状的结构,新增一个 actor 经常会漏掉一些通信。这个时候我们就会想到 Pub-Sub 模式,所有 actor 通信只需要给事件总线发送消息,每个 actor 只需要订阅自己的事件就好了。

如上图所示,定时调度器、工作流引擎、任务状态机等大部分模块,都由 akka-eventbus 进行管理,每个模块都是第四节定义的路由 actor+ 业务 actor 的模型。通过该模型,相同的 job 交给同一个 actor 处理,不会堵塞其他 actor,同样解决了上文提到的 guava-eventbus 公用线程池的问题。实现类图如下:

5. 两行代码实现进程间通信

Schedulerx2.0 是 Server-Worker 的架构,server 和 worker,worker 和 worker 都需要进行通信,使用 akka-remoting 可以很容易实现任意 2 个进程之间的通信。

Akka-remoting 是 peer-to-peer 的通信方式,每个节点都会暴露一个远程地址,其他节点只要知道地址,就能进行远程通信。Akka-remoting 也抽象成一个 actor,会让你的程序保持高度的一致,只不过这个 actor 的地址是远程的地址而已。Akka-remoting 支持多种协议,使用起来非常简单,以 netty-tcp 为例,首先我们在 server 端定义一个配置文件 akka-server.conf

akka {
  actor {provider = "akka.remote.RemoteActorRefProvider"}
  remote {enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {port = 52014}
  }
}

Server 只需要 2 行代码就可以起一个 remote actor

ActorSystem actorSystem = ActorSystem.create("server", akkaConfig);
actorSystem.actorOf(HelloActor.props(), "hello");

Worker 也只需要 2 行代码就能实现和 server 通信

ActorSelection helloSelection = context.actorSelection("akka.tcp://server@xx.xx.xx.xx:52014/user/hello");
helloSelection.tell("hello",getSelf());

对比 Schedulerx1.0 使用原生 netty 框架通信需要如下这么多代码

怎么样,使用 akka 进行远程通信,是不是非常简单和优雅 ^^

6. 消息 At-Least-Once Delivery

Akka 默认的消息传递是最多传递一次,即通过 tell,如果发送失败,不会重发。At-Least-Once Delivery,提供了一个消息至少传递一次的语义,即保证不丢!这在 Schedulerx2.0 中很多场景是非常需要的,比如某个实例在 worker 执行成功了,汇报成功的时候 server 正好重启了导致汇报失败,会造成工作流下游都卡住没法继续执行。

使用 At-Least-Once Delivery 要继承 UntypedPersistentActorWithAtLeastOnceDelivery(akka-2.4.x)或者 AbstractPersistentActorWithAtLeastOnceDelivery(akka-2.5.x)。Akka 在 2.5.x 为了拥抱函数式编程,只支持 java8,并用了很多 stream 的接口,所以接口和 2.4.x 已经大大不一样了。在 Schedulerx2.0 中,worker 主要是给用户用的,为了兼容低版本的 jdk,所以用了 2.4.x 版本的 UntypedPersistentActorWithAtLeastOnceDelivery。

UntypedPersistentActorWithAtLeastOnceDelivery 继承 UntypedPersistentActor 和 AtLeastOnceDelivery。

  • UntypedPersistentActor:提供了持久化的 actor,对消息持久化、恢复等能力。
  • AtLeastOnceDelivery:主要是 deliver、confirmDelivery(long deliveryId)两个接口。

AtLeastOnceDelivery 的原理非常简单,worker 向 server 汇报状态的时候,tell 改为 deliver,deliver 会自动生成一个 deliveryId,封装进 request 发送给 server,server 需要实现把 deliveryId 封装到 response 中并返回给 worker,worker 收到 response 的时候调用 confiremDelivery,会从 unconfirmed 列表中移除这个 deliveryId 的 request,否则 AtLeastOnceDelivery 会有一个 timer,定期重试这条 request。如下图


本文作者:黄晓萌

阅读原文

本文为云栖社区原创内容,未经允许不得转载。

正文完
 0