乐趣区

关于apache:博文推荐|使用-Apache-Pulsar-和-Scala-进行事件流处理

本文翻译自《Event Streaming with Apache Pulsar and Scala》,作者 Giannis。

译者信息:姚余钱 @深圳觉行科技有限公司,致力于医疗大数据畛域。热衷开源,沉闷于 Apache Pulsar 社区。

本文作者 Giannis Polyzos,StreamNative 高级工程师,主攻 Apache Pulsar 方向。Apache Pulsar 是云原生音讯流平台,领有广大前景。在本文中,他将介绍 Pulsar 是什么以及它杰出的性能,而后通过疾速教程以帮忙读者入门 Scala 语言运行 Pulsar。

文章摘要

在古代数据时代,对尽可能快地提供数据洞察的需要一直减少。“以后正在”产生的事件在几分钟甚至几秒钟后就可能变得无关紧要,因而越来越须要尽可能快地接管和处理事件——无论是为了改善业务使其在要求刻薄的市场中更具竞争力,还是为了使一个零碎能依据其所受到的环境刺激而自我成长和适应。

随着容器和云基础设施的倒退,公司在寻求利用和采纳云原生的方法。迁徙到云端并在零碎中采纳容器意味着咱们很可能会利用 Kubernetes 等技术来实现其所有惊人的性能。将基础架构搁置云端并采纳云原生解决方案意味着很多用户也心愿其消息传递和流解决方案合乎这些准则。

在这篇文章中,咱们将介绍如何应用 Apache Pulsar 和 Scala 实现云原生事件流解决。咱们将回顾 Apache Pulsar 在这个古代数据时代须具备的能力,是什么让它怀才不遇,以及如何通过应用 Scala 和 pulsar4s 库创立一些简略的生产者和消费者来运行它。

1. 什么是 Apache Pulsar

如文档中所述,

Apache Pulsar 是一个云原生、分布式音讯和流平台,每天治理数千亿个事件。

它最后是在 2013 年由 Yahoo 创立的,以满足其微小的扩大需要 – 工程团队过后也审查了相似 Apache Kafka 等解决方案(只管这些零碎尔后有了很大的倒退),但并没有齐全满足他们的需要。

其它零碎短少跨地区复制、多租户和偏移量治理等个性,以及解决音讯积压状况下的性能,因而 Apache Pulsar 诞生了。

让咱们认真看看是什么让它怀才不遇:

  1. 对立音讯和流两种场景:对于 Apache Pulsar,您应该留神的第一件事是,它是音讯和流的对立平台。音讯和流这两个术语常常被一概而论,理论存在基本差别。例如在消息传递的场景中,用户心愿音讯一到就立即生产它,而后将该音讯删除;然而,对于流解决的场景,用户可能心愿保留音讯并可能重现它们。
  2. 多租户:Apache Pulsar 从一开始就被设计成一个多租户零碎。您能够将多租户视为不同的用户组,每个用户组都在本人的隔离环境中运行。Pulsar 的逻辑架构由租户、命名空间和主题组成。命名空间是租户内主题的逻辑分组。您能够应用定义的层次结构轻松映射组织的需要,并提供隔离、身份验证、受权、配额以及在命名空间和主题级别利用不同的策略。电子商务业务的多租户示例如下,将 WebBanking 和 Marketing 等不同部门作为租户,而后这些部门成员能够在租户内进行操作。

  1. 跨地区复制:跨地区复制就是通过在跨地区上散布的不同集群的数据中心提供数据副原本保障容灾能力。Apache Pulsar 提供开箱即用的跨地区复制,无需内部工具。相似 Apache Kafka 的代替计划依赖于第三方解决方案 —— 即 MirrorMaker – 来解决此类已知存在问题的场景。应用 Pulsar,您能够通过弱小的内置跨地区复制克服这些问题,并设计满足您需要的劫难复原解决方案。
  2. 程度扩大:Apache Pulsar 的架构由三个组件组成。Pulsar Broker 是无状态服务层,Apache BookKeeper(bookie 服务器)作为存储层,最初 Apache ZooKeeper 作为元数据层 —— 只管 2.8.0 版本引入了元数据层作为代替(参考 PIP 45)。所有层都彼此拆散,这意味着您能够依据须要独立地扩大每个组件。Apache BookKeeper 应用分布式 ledger 的概念而不是基于日志的形象,这使得它非常容易扩大,无需从新均衡。这也使得 Apache Pulsar 非常适合云原生环境。
  3. 分层存储:当您解决大量数据时,主题可能会无限度地变大,随着工夫的推移,(存储老本)可能会变得十分低廉。Apache Pulsar 提供了分层存储,因而随着主题的增长,您能够将旧数据卸载到一些更便宜的存储中(例如 Amazon S3),而您的客户端依然能够拜访数据并持续提供服务,就如同什么都没有产生过一样。
  4. Pulsar Functions:Pulsar Functions 是一个轻量级的无服务器计算框架,容许您以非常简单的形式部署本人的流解决逻辑。轻量级也使其成为物联网边缘剖析用例的绝佳抉择。

Apache Pulsar 还有很多个性,比方内置的 Schema Registry、对事务和 Pulsar SQL 的反对,但当初让咱们看看如何真正启动和运行 Pulsar,并用 Scala 创立咱们的第一个生产者和消费者。

2. 场景和集群设置示例

以简略的场景作为例子,咱们创立一个读取模仿传感器事件的生产者,将事件发送到一个主题,而后在另一端创立订阅该主题并仅读取传入事件的消费者。咱们将应用 pulsar4s client 库进行实现,同时应用 docker 运行 Pulsar 集群。为了以单机模式启动 Pulsar 集群,请在终端中运行以下命令:

docker run -it \ 
    -p 6650:6650 \ 
    -p 8080:8080 \ 
    --name pulsar \ 
    apachepulsar/pulsar:2.8.0 \ 
    bin/pulsar standalone

该命令将启动 Pulsar 并将必要的端口绑定到本地计算机。随着集群的启动运行,能够开始创立生产者和消费者。

3. Apache Pulsar 生产者

首先,须要有 pulsar4s-core 和 pulsar4s-circe 依赖——所以咱们须要将以下内容增加到 build.sbt 文件中:

val pulsar4sVersion = "2.7.3"

lazy val pulsar4s       = "com.sksamuel.pulsar4s" %% "pulsar4s-core"  % pulsar4sVersion
lazy val pulsar4sCirce  = "com.sksamuel.pulsar4s" %% "pulsar4s-circe" % pulsar4sVersion

libraryDependencies ++= Seq(pulsar4s, pulsar4sCirce)
``
Then we will define the message payload for a sensor event as follows:

而后咱们为传感器事件定义音讯负载,如下所示:


case class SensorEvent(sensorId: String,
                         status: String,
                         startupTime: Long,
                         eventTime: Long,
                         reading: Double)

咱们还须要引入以下范畴内容:

import com.sksamuel.pulsar4s.{DefaultProducerMessage, EventTime, ProducerConfig, PulsarClient, Topic}
import io.ipolyzos.models.SensorDomain
import io.ipolyzos.models.SensorDomain.SensorEvent
import io.circe.generic.auto._
import com.sksamuel.pulsar4s.circe._
import scala.concurrent.ExecutionContext.Implicits.global

所有应用程序的生产和生产的次要入口点是 Pulsar Client,它解决与 broker 的连贯。在 Pulsar 客户端中,您还能够设置集群的身份验证或者调整其余重要的配置,例如超时设置和连接池。您能够通过提供要连贯的 service url 来简略地实例化客户端。

val pulsarClient = PulsarClient("pulsar://localhost:6650")

有了客户端,让咱们看看生产者的初始化和循环。

val topic = Topic("sensor-events")

// create the producer
val eventProducer = pulsarClient.producer[SensorEvent](ProducerConfig(
  topic, 
  producerName = Some("sensor-producer"), 
  enableBatching = Some(true),
  blockIfQueueFull = Some(true))
)

// sent 100 messages 
(0 until 100) foreach { _ =>
   val sensorEvent = SensorDomain.generate()
   val message = DefaultProducerMessage(Some(sensorEvent.sensorId), 
      sensorEvent, 
      eventTime = Some(EventTime(sensorEvent.eventTime)))
  
   eventProducer.sendAsync(message) // use the async method to sent the message
}

这里有几点须要留神:

  • 咱们通过提供必要的配置来创立生产者 —— 生产者和消费者都是高度可配的,能够依据所需场景进行配置。
  • 咱们在此处提供生产者的主题名称,启用批处理并使生产者在队列已满时阻塞操作。
  • 通过启用批处理,Pulsar 将应用外部队列来保留音讯(默认值为 1000),并在队列满后将批次发送音讯给 broker。
  • 正如您在示例代码中看到的,咱们应用 .sendAsync() 办法将音讯发送到 Pulsar。这是在不期待确认的状况下发送音讯,同时因为咱们将音讯缓冲到队列中,这可能会使客户端不堪重负。
  • 通过选项 blockIfQueueFull 应用反压机制并告诉生产者在发送更多音讯之前期待。
  • 最初,咱们创立要发送的音讯。这里咱们指定 sensorId 作为音讯的键、sensorEvent 的值,咱们还提供 eventTime,即事件产生的工夫。

此时,咱们的生产者就位,开始向 Pulsar 发送音讯。残缺的实现详见此处。

4. Apache Pulsar 消费者

当初让咱们把注意力转移到生产方面。就像咱们对生产端所做的那样,生产端须要配置一个 Pulsar Client。

val consumerConfig = ConsumerConfig(Subscription("sensor-event-subscription"), 
  Seq(Topic("sensor-events")), 
  consumerName = Some("sensor-event-consumer"), 
  subscriptionInitialPosition = Some(SubscriptionInitialPosition.Earliest), 
  subscriptionType = Some(SubscriptionType.Exclusive)
)

val consumerFn = pulsarClient.consumer[SensorEvent](consumerConfig) 

var totalMessageCount = 0 
while (true) {   
  consumerFn.receive match {case Success(message) => 
         consumerFn.acknowledge(message.messageId)
         totalMessageCount += 1
         println(s"Total Messages'$totalMessageCount - Acked Message: ${message.messageId}")
      case Failure(exception) => 
         println(s"Failed to receive message: ${exception.getMessage}")
  }
}

步骤顺次如下:

  • 同样的,首先创立消费者配置。在这里咱们指定一个订阅名称,要订阅的主题、消费者的名称,以及咱们想要消费者开始生产音讯的地位 —— 在这里咱们指定 Earliest – 这意味着订阅将在其确认的最初一条音讯之后开始读取。
  • 最初,咱们指定 SubscriptionType – 在这里它是 Exclusive(独占)类型,这也是默认的订阅类型(稍后会具体介绍订阅类型)。
  • 配置就绪后,应用创立的配置设置一个新的消费者,而后咱们就有了一个简略的生产循环 —— 咱们要做的就是应用接管办法读取一条新音讯,该办法会阻塞直到有音讯可用,而后咱们确认音讯,最初获取到目前为止收到的音讯总数以及已确认的 messageId。
  • 请留神:当收到新音讯时,如果一切顺利,您须要向客户端确认,否则须要应用 negativeAcknowledge() 办法否定确认音讯。
  • 有了生产实现形式,咱们就有了一个在运行的公布 - 订阅应用程序,它为 Pulsar 主题生产传感器事件,以及一个订阅并生产这些音讯的消费者。
  • 消费者的残缺实现详见此处。

5. Apache Pulsar 订阅类型

正如文章中提到的,Apache Pulsar 通过提供不同的订阅类型做到了提供音讯和流模式的对立。

Pulsar 有以下订阅类型:

  • 独占订阅:任何工夫点都只容许一个消费者应用订阅读取音讯
  • 灾备订阅:在任何工夫点只容许一个消费者应用订阅读取音讯,但您能够有多个备用消费者来接手工作,以防沉闷的消费者失败
  • 共享订阅:能够将多个消费者附加到订阅上,并以轮询形式在它们之间共享工作。
  • 键共享订阅:能够将多个消费者附加到订阅上,并且每个消费者都调配有一组惟一的键。该消费者负责解决调配给它的一组键。如果失败,将为另一个消费者调配该组键。

不同的订阅类型用于不同的场景。例如,为了实现典型的扇出消息传递模式,您能够应用独占或灾备订阅类型。对于音讯队列和工作队列,可抉择共享订阅,以便在多个消费者之间共享工作;而对于流场景或基于键的流解决,灾备和键共享订阅不失为很好的抉择,它们能够容许有序生产或基于某些键扩大您的解决。

6. 总结与浏览延长

在这篇文章中,咱们简要介绍了 Apache Pulsar 是什么、它作为对立的音讯流平台与怀才不遇的能力、如何创立一些简略的生产和生产应用程序,最初咱们重点介绍了 Pulsar 是如何通过不同的订阅模式对立音讯和流。

延长浏览:

  • Pulsar IO:轻松地将数据移入和移出 Pulsar
  • Pulsar Functions(Pulsar 的无服务器和轻量级的计算框架):用户能够应用它们在 Pulsar 主题上解决逻辑,缩小生产和生产应用程序所需的所有样板代码
  • Function Mesh:通过利用 Kubernetes 原生性能(如部署和主动缩放)使您的事件流真正成为云原生。

关注 公众号「Apache Pulsar」,获取更多技术干货

退出 Apache Pulsar 中文交换群👇🏻

点击 链接 浏览英文原文

退出移动版