关于pulsar:译文|基于-Pulsar-的事件驱动铁路网

34次阅读

共计 10551 个字符,预计需要花费 27 分钟才能阅读完成。

对于 Apache Pulsar

Apache Pulsar 是 Apache 软件基金会顶级我的项目,是下一代云原生分布式音讯流平台,集音讯、存储、轻量化函数式计算为一体,采纳计算与存储拆散架构设计,反对多租户、长久化存储、多机房跨区域数据复制,具备强一致性、高吞吐、低延时及高可扩展性等流数据存储个性。

GitHub 地址:http://github.com/apache/pulsar/

本文译者:teng-da,作者:PAVELS SISOJEVS,文章首发于 InfoQ,原文地址:https://scala.monster/train-s…

浏览本文须要大概 10 分钟。

这张照片拍摄于瑞士的 Landwasser 高架桥。瑞士以其铁路网络闻名于世,依据维基百科,瑞士领有世界上最密集的铁路网。本文带你一起模仿瑞士的铁路网络。

咱们会用到 Apache Pulsar 和 Neutron。Apache Pulsar 是开源分布式 pub-sub 音讯零碎,最后由 Yahoo! 开发,目前属于 Apache 软件基金会。数据架构师、数据分析师、程序员等常常比照 Apache Pulsar 和 Apache Kafka,目前已有许多比照二者优劣势的文章。

Neutron 是基于 FS2 并发流解决库文件的 Pulsar 客户端。作为一款成熟的产品,Neutron 曾经用于 Chatroulette 的生产,但 Neutron 的开发并未进行。

领有一套玩具铁路网始终是我童年时的幻想。当初,我能够本人入手搭建一套虚构铁路网了。

接下来,咱们将一起开发一个事件驱动的铁路网络模拟器。

思路

咱们要搭建一套蕴含三个车站的铁路网:日内瓦、伯尔尼和苏黎世。其中日内瓦和苏黎世均与伯尔尼相连,但日内瓦与苏黎世不相连。

每个站点为一个节点,相连节点通过音讯 broker——Apache Pulsar 通信。节点生产其相连节点公布的事件。consumer 过滤传入事件后生产与特定城市相干的事件。

有两种形式能够管制模拟器的行为,一是增加可用于人工干预的 HTTP 端点。用户通过发送 HTTP 申请向零碎中增加新列车。

咱们不长久保留任何数据,无需应用数据库或缓存零碎,将所有数据保留在内存中。因而咱们能够应用相似于 Ref 的高级并发机制。

Apache Pulsar 是零碎的外围,负责节点间通信。一旦状态产生扭转,零碎应该公布形容这一动作的新事件。也就是说,每个事件都应该有一个 工夫戳 。此外,每个事件应有一个 列车 ID,代表特定列车的标识号码。初始时,有两个事件:

  • 登程(Departed)事件——列车登程时公布登程事件。
  • 达到(Arrived)事件——列车达到时公布达到事件。

这两个事件蕴含对于列车的根本信息:列车标识号码、登程城市、目的地城市、预计达到工夫和事件工夫戳。

每个城市都生产来自相连城市的事件。例如,苏黎世生产来自伯尔尼的事件,但不关注来自日内瓦的事件。苏黎世的事件 consumer 应确保可能捕捉到由伯尔尼 登程 并且苏黎世为目的地的事件。每个城市对应一个 topic,3 个城市就对应 3 个 topic。须要优化时,能够把通用的 “ 城市 topic “ 分成几个更具体的 topic。

业务逻辑通过 Neutron 连贯到 Apache Pulsar。

每个被生产的 topic 都会转换为 fs2 流,如果你不理解如何解决 fs2 流,能够参考 fs2 指南,本文代码不会波及到这部分内容。

我基于 cats 库的 Tagless Final 技术编写了这一应用程序,并以 ZIO 作为运行时 effect。

Pulsar 简介

Apache Pulsar 是分布式音讯和流平台,可用于搭建高扩展性零碎。零碎外部通过音讯进行通信,topic 数量可达数百万个。从开发者的角度来讲,Apache Pulsar 能够看作是一个黑匣子,但我倡议多理解它的底层工作原理。为了更好地了解本文中的操作,我先介绍几个概念:

  • topic——信息传输的媒介。topic 分为两种:

    1. 长久化 topic——长久存储音讯数据。
    2. 非长久化 topic——不长久存储音讯数据,将音讯保留在内存中。如果 Pulsar broker 宕机,所有传输中的音讯都会失落。
  • producer——与 topic 相连,用于公布音讯。
  • consumer——通过订阅与 topic 相连,用于接管音讯。
  • 订阅——制订向 consumer 公布音讯的配置规定。Pulsar 反对四种订阅类型:

    1. 独占——繁多 consumer,如有多个 consumer 同时订阅则会引发谬误;
    2. 故障转移——多个 consumer,但只有一个 consumer 能收到音讯;
    3. 共享——多个 consumer,以轮询形式接管音讯;
    4. Key_Shared——多个 consumer,按 key 散发音讯(一个 consumer 对应一个 key)。

音讯零碎公布事件后,由 producer 解决这些事件并公布到 topic 上,另一个零碎里的 consumer 通过 订阅 连贯到这个 topic。

理解更多对于 Apache Pulsar 的信息。

业务逻辑

上文提到铁路网中会产生的两个事件——列车的登程与达到。定义这两个事件的代码如下:

case class Departed(id: EventId, trainId: TrainId, from: From, to: To, expected: Expected, created: Timestamp) extends Event
case class Arrived(id: EventId, trainId: TrainId, from: From, to: To, expected: Expected, created: Timestamp)  extends Event

事件需蕴含零碎中已产生动作的根本信息:惟一的事件 id、列车 id、登程城市、目的地城市、预计达到工夫和理论事件工夫戳。咱们当前还能够增加站台号等信息。

为确保本文简略易懂,咱们对本零碎工作所需的数据量加以限度。为了便于辨别事件中的字段(比方目的地和登程城市),所有字段都应用强类型。

因为没有能够自动检测火车达到或登程的零碎,咱们须要手动管制铁路网。假如有一名火车调度员在通过按钮和仪表盘来管制铁路网络,咱们尽管没有炫酷的 UI,但能够搭建 API,API 的外围是两个简略的命令,用于触发车站的业务逻辑:

case class Arrival(trainId: TrainId, time: Actual)
case class Departure(id: TrainId, to: To, time: Expected, actual: Actual)

列车登程

让咱们从创立火车登程开始吧!这个命令比较简单,能够通过 cURL 发送:

curl --request POST \
  --url http://localhost:8081/departure \
  --header 'content-type: application/json' \
  --data '{"id":"153","to":"Zurich","time":"2020-12-03T10:15:30.00Z","actual":"2020-12-03T10:15:30.00Z"}'

上述命令假如伯尔尼服务节点在 8081 端口运行,每个节点都运行 HTTP 服务器,也都可能解决这一申请。咱们应用 Http4s 库作为 HTTP 服务器,第一个线路定义如下:

case req @ POST -> Root / "departure" =>
  req
    .asJsonDecode[Departure]
    .flatMap(departures.register)
    .flatMap(_.fold(handleDepartureErrors, _ => Ok()))

调用 Departures 服务仅需注册(register)一列登程的火车:

trait Departures[F[_]] {def register(departure: Departure): F[Either[DepartureError, Departed]]
}

Scala 反对多种验证数据的形式,我抉择最间接的一种——返回带有自定义谬误的 Either。如果火车注册胜利,则返回 Departed 事件;否则,返回谬误。

为确保本文简略易懂,咱们会在 Departures 服务执行过程中调用音讯 producer。首先需执行 Departures 服务,即在 Departures 伴生对象中创立 make 函数:

object Departures {def make[F[_]: Monad: UUIDGen: Logger](
      city: City,
      connectedTo: List[City],
      producer: Producer[F, Event]
  ): Departures[F] = new Departures[F] {def register(departure: Departure): F[Either[DepartureError, Departed]] = ???
  }
}

为实现 Departures 接口,咱们要给 effect F 设置边界:需有 UUIDGenLogger 实例。我曾经在程序中创立了虚构的 UUIDGenLogger 接口。

F 还应有 Monad 实例,用于连贯函数调用。

首先执行验证逻辑,查看 登程 事件是否无效。咱们只需查看目的地城市是否在相连城市列表中:

def validated(departure: Departure)(f: F[Departed]): F[Either[DepartureError, Departed]] = {
  val destination = departure.to.city
  connectedTo.find(_ === destination) match {
    case None =>
      val e: DepartureError = DepartureError.UnexpectedDestination(destination)
      F.error(s"Tried to departure to an unexpected destination: $departure")
       .as(e.asLeft)
    case _ =>
      f.map(_.asRight)
  }
}

如果目的地城市不在列表中,则生成错误信息日志并返回谬误。否则创立 Departed 事件并将其作为后果返回。

接下来须要实现 注册 性能,示例代码如下:

def register(departure: Departure): F[Either[DepartureError, Departed]] =
  validated(departure) {
    F.newEventId
      .map {
        Departed(
          _,
          departure.id,
          From(city),
          departure.to,
          departure.time,
          departure.actual.toTimestamp
        )
      }
      .flatTap(producer.send_)
  }

先验证目的地城市,若无效,生成一个 newEventId,用于创立新的 Departed 事件,该事件将通过传递给 make 函数的 producer 公布到 Pulsar 的 城市 topic。查看 Departures 事件的最终版本。

预计登程列车

咱们曾经理解了如何生成列车。如果一列火车从苏黎世开往伯尔尼,那么伯尔尼会收到相应告诉。

伯尔尼收听来自苏黎世的事件,一旦有把伯尔尼设为目的地的 Departed 事件,就将其退出预期列车表中。当初咱们只关注业务逻辑,后文会再探讨音讯生产。为预期 登程 事件定义 DepartureTracker,示例代码如下:

trait DepartureTracker[F[_]] {def save(e: Departed): F[Unit]
}

该服务会成为 Departed 事件流中的 sink,所以咱们不关注返回类型,也不心愿呈现任何验证谬误。和上文 Departures 服务一样,先创立伴生对象,定义 make 函数:

def make[F[_]: Applicative: Logger](
    city: City,
    expectedTrains: ExpectedTrains[F]
  ): DepartureTracker[F] = new DepartureTracker[F] {def save(e: Departed): F[Unit] =
      val updateExpectedTrains =
        expectedTrains.update(e.trainId, ExpectedTrain(e.from, e.expected)) *>
          F.info(s"$city is expecting ${e.trainId} from ${e.from} at ${e.expected}")
      updateExpectedTrains.whenA(e.to.city === city)
  }

咱们依赖于 ExpectedTrains 服务。ExpectedTrain 是存储进站列车的服务,咱们很快就能实现该服务。咱们实现了 save 函数,只有进站列车的目的地城市与预期城市相符时,该函数才会执行。例如,日内瓦和苏黎世均生产来自伯尔尼的事件。伯尔尼收回 Departed 事件时,其中一个城市会疏忽此音讯,而另一个城市,即目的地城市,会更新预期列车表,创立日志音讯。

预期列车存储中至多蕴含以下函数:

trait ExpectedTrains[F[_]] {def get(id: TrainId): F[Option[ExpectedTrain]]
  def remove(id: TrainId): F[Unit]
  def update(id: TrainId, expectedTrain: ExpectedTrain): F[Unit]
}

即便咱们尝试删除不存在于零碎中的列车,也不会操作失败。在某些业务状况下可能会呈现系统故障的谬误,但在这种非凡状况下,咱们会疏忽这一谬误。整个测试过程中,数据始终存储在内存中,不长久保留。

def make[F[_]: Functor](ref: Ref[F, Map[TrainId, ExpectedTrain]]
  ): ExpectedTrains[F] = new ExpectedTrains[F] {def get(id: TrainId): F[Option[ExpectedTrain]] = 
      ref.get.map(_.get(id))
    def remove(id: TrainId): F[Unit] = 
      ref.update(_.removed(id))
    def update(id: TrainId, train: ExpectedTrain): F[Unit] = 
      ref.update(_.updated(id, train))
  }

咱们在这一应用程序中应用 Ref 作为高级并发机制。

列车达到

业务逻辑三部曲的最初一部分是列车达到。与列车登程相似,先创立一个 HTTP 端点,能够用简略的 cURL POST 申请来调用:

curl --request POST \
  --url http://localhost:8081/arrival \
  --header 'Content-Type: application/json' \
  --data '{"trainId":"123","time":"2020-12-03T10:15:30.00Z"}'

再由 Http4s 路线解决申请:

case req @ POST -> Root / "arrival" =>
  req
    .asJsonDecode[Arrival]
    .flatMap(arrivals.register)
    .flatMap(_.fold(handleArrivalErrors, _ => Ok()))

Arrivals 服务相似于上文介绍的 Departures 服务。Arrivals 服务中也只有一个办法,即 register 办法:

trait Arrivals[F[_]] {def register(arrival: Arrival): F[Either[ArrivalError, Arrived]]
}

而后须要验证申请,示例代码如下:

def validated(arrival: Arrival)(f: ExpectedTrain => F[Arrived]): F[Either[ArrivalError, Arrived]] =
  expectedTrains
    .get(arrival.trainId)
    .flatMap {
      case None =>
        val e: ArrivalError = ArrivalError.UnexpectedTrain(arrival.trainId)
        F.error(s"Tried to create arrival of an unexpected train: $arrival")
         .as(e.asLeft)
      case Some(train) =>
        f(train).map(_.asRight)
    }

查看达到的列车是否与预期相符,若相符,则创立 Arrived 事件;否则,生成谬误日志。列车达到事件中 register 办法的实现中与之前 register 办法的实现相似:

def register(arrival: Arrival): F[Either[ArrivalError, Arrived]] =
  validated(arrival) { train =>
    F.newEventId
      .map {
        Arrived(
          _,
          arrival.trainId,
          train.from,
          To(city),
          train.time,
          arrival.time.toTimestamp
        )
      }
      .flatTap(a => expectedTrains.remove(a.trainId))
      .flatTap(producer.send_)
  }

Departures 相比,达到事件不仅公布了新事件,还把达到列车从预计登程列车列表中删除。

以上为全副业务逻辑,代码曾经通过单元测试(应用 ZIO Test 实现),可参考 GitHub 文件。

音讯生产

这一节次要讲音讯生产,也会把所有逻辑服务连在一起。

创立资源

首先创立所需资源。一个城市节点蕴含四个组件:配置、事件 producer、事件 consumer,以及存储 ExpectedTrainsRef。咱们能够把这四种资源在一个 case class 中组合起来,在 Main 类外创立:

final case class Resources[F[_], E](
  config: Config,
  producer: Producer[F, E],
  consumers: List[Consumer[F, E]],
  trainRef: Ref[F, Map[TrainId, ExpectedTrain]]
)

咱们应用 ciris 库从环境变量中读取 Config。对于配置,能够参考 GitHub 文件。咱们应用 Chatroulette 开发的 Neutron 库来创立 producer 和 consumer。

首先,创立一个 Pulsar 对象实例,用于与 Apache Pulsar 集群建设连贯:

Pulsar.create[F](config.pulsar.serviceUrl)

以上操作仅需 serviceUrl,咱们会失去 Resource[F, PulsarClient],能够用来创立 producer 和 consumer。创立 producer 之前,应该先创立蕴含 topic 配置的 topic 对象:

def topic(config: PulsarConfig, city: City) =
  Topic(Topic.Name(city.value.toLowerCase),
    config
  ).withType(Topic.Type.Persistent)

Topic 名就是城市名,而且是 长久化 topic,这样,任何未确认的音讯都不会失落。另外,作为配置的一部分,咱们传递了 命名空间 租户。对于命名空间和租户的更多信息,能够查阅 Pulsar 文档。

创立 producer 操作只是简略的一行:

def producer(client: Pulsar.T, config: Config): Resource[F, Producer[F, E]] =
  Producer.create[F, E](client, topic(config.pulsar, config.city))

创立 producer 的办法有很多,咱们抉择最简略的一种,只需应用之前创立的 Pulsar 客户端和一个 topic。

创立 consumer 所需操作稍多,因为还要创立 订阅

def consumer(client: PulsarClient, config: Config, city: City): Resource[F, Consumer[F, E]] = {val name         = s"${city.value}-${config.city.value}"
  val subscription =
          Subscription
            .Builder
            .withName(Subscription.Name(name))
            .withType(Subscription.Type.Failover)
            .build
  Consumer.create[F, E](client, topic(config.pulsar, city), subscription)
}

创立订阅,设置订阅名称为相连的城市名称与火车经停城市名组合。默认应用 Failover 订阅类型,并行运行 2 个实例(以防其中一个实例宕机)。

加上所需 Ref,咱们终于能够创立 Resources 了:

for {config    <- Resource.liftF(Config.load[F])
  client    <- Pulsar.create[F](config.pulsar.url)
  producer  <- producer(client, config)
  consumers <- config.connectedTo.traverse(consumer(client, config, _))
  trainRef  <- Resource.liftF(Ref.of[F, Map[TrainId, ExpectedTrain]](Map.empty))
} yield Resources(config, producer, consumers, trainRef)

请留神,咱们应用 traverse 办法在 connectedTo 城市列表中创立了一份 consumer 列表,点击 GitHub 文件查看最终后果。

启动引擎

咱们在应用程序中应用 zio.Task 作为 effect 类型。zio.Task 蕴含的类型参数起码,对于不相熟 ZIO 的人来说,zio.Task 更易了解。如果你想理解更多类型参数,能够参考 ZIO 简介。

首先,创立之前定义过的 Resources 类:

Resources
  .make[Task, Event]
  .use {case Resources(config, producer, consumers, trainRef) => ???
  }

仍然是 4 个参数。先初始化服务,为 HTTP 服务器创立 路线

val expectedTrains   = ExpectedTrains.make[Task](trainRef)
val arrivals         = Arrivals.make[Task](config.city, producer, expectedTrains)
val departures       = Departures.make[Task](config.city, config.connectedTo, producer)
val departureTracker = DepartureTracker.make[Task](config.city, expectedTrains)
val routes = new StationRoutes[F](arrivals, departures).routes.orNotFound

创立 HTTP 服务器:

val httpServer = Task.concurrentEffectWith { implicit CE =>
  BlazeServerBuilder[Task](ec)
    .bindHttp(config.httpPort.value, "0.0.0.0")
    .withHttpApp(routes)
    .serve
    .compile
    .drain
}

如果你很理解 Http4s,那么以上操作应该不难理解。若不理解,查看相干文档。开始生产传入音讯,并创立一个流:

val departureListener =
  Stream
    .emits(consumers)
    .map(_.autoSubscribe)
    .parJoinUnbounded
    .collect {case e: Departed => e}
    .evalMap(departureTracker.save)
    .compile
    .drain

简而言之,咱们应用 FS2 库创立了事件流。首先,创立 consumer 流,对每个 consumer 调用 autoSubscribe 办法,用于订阅 topic,再通过 parJoinUnbounded 把所有流合在一起,而后,用 collect 办法删除 Departed 以外的所有音讯。最初,在之前实现的 departureTracker 上调用 save 办法,编译并排出流。

当初有两个最终流:HTTP 服务器和 Pulsar 的传入音讯。此时咱们曾经解决完了所有音讯,只需运行流,即并行压缩并抛弃后果:

departureListener
  .zipPar(httpServer)
  .unit

组成 Main 类的代码块都比较简单,读取和保护也绝对容易。

结语

本文给出了事件驱动零碎示例,按步骤梳理了业务逻辑,模仿了瑞士铁路网。你能够在本文示例代码的根底上进行批改和拓展。

本文中应用到了 Apache Pulsar 的局部性能,但 Pulsar 不止于此,它操作繁难,功能强大。咱们搭建了一个简略的分布式系统,由几个节点组成,这些节点在 Apache Pulsar 上应用消息传递进行通信。本应用程序应用基于 cats 库的 Tagless Final 技术编写,其中 ZIO Task 为次要的 effect 类型。

此外,咱们还尝试了 Neutron,尽管 Neutron 已用于 Chatroulette 生产,但仍在开发中。

点击查看本程序的最终版本,操作指南可见 readme 局部。

相干浏览

  • 博文举荐|Pulsar 的音讯存储机制和 Bookie 的 GC 机制原理
  • 博文举荐|Apache Pulsar 在自研数据管道中的技术实际

正文完
 0