原文作者为 Luk Perkins,来自 Splunk 团队。
文章翻译已取得原作者受权。
音讯队列是大多数大规模数据架构的次要组件。如果 必须 对数据进行实时处理,那么应用音讯队列是很好的抉择。
数据处理管道会产生各种故障,数据 consumer 可能会受到提早或齐全不能工作,网络分区可能会临时切断整个 consumer 组与数据管道的连贯等。
有些状况必须应用音讯队列,例如:
- 开发拼车应用程序,不思考顶峰时段的应用峰值,须要确保每个乘车申请最终只匹配到一位司机
- 金融级事务交易管道须要同步申请解决,以避免数据失落
- 搭建基于微服务的解决管道,前端为具备多个写入端点的 REST API(每秒进行数千次运算),须要确保即便后端微服务呈现故障,所有的工作对象都保留在零碎中
音讯队列如何工作
下图为音讯队列 常见 的工作形式(并对故障做出响应)的示意图:
在上图中,producer 1、2、3 和 4 通过音讯 broker 将音讯发送到管道,而 consumer 1、2、3 和 4 解决(而后确认)这些音讯。在本示例中,当 consumer 1 呈现故障时,会呈现十分重大的问题 。Producer 会持续将数据传送到零碎中,但 consumer 1 不能持续解决音讯。Broker 应该 开始存储 所有本来将会用于 consumer 1 的音讯数据,直到 consumer 1 可能持续解决音讯。
从这个示例能够看出,对于堆栈中任何重要的音讯队列而言,稳固的存储组件都 必不可少。侥幸的是,音讯队列与反对音讯队列的存储系统一样性能良好。如果存储组件易发故障、受到损坏,或运行迟缓,因此即使仅有一个组件呈现故障,也不能很好地应答,那么强烈建议大家更换存储部件。
引入 Apache Pulsar
一般而言,由不同的零碎解决订阅 - 公布音讯和音讯队列。例如,典型的技术栈可能应用 Apache Kafka 解决公布 - 订阅音讯,应用 RabbitMQ 解决音讯队列。在这种状况下,尽管零碎工作良好,然而你须要同时部署、治理多个音讯零碎。
我最喜爱 Apache Pulsar 的一点就是,它能够轻松连贯订阅 - 公布音讯和音讯队列。Pulsar 是 第一个 为了 同时解决订阅 - 公布音讯和音讯队列 而开源的音讯零碎。
因为应用 Apache BookKeeper 分布式日志存储数据库作为存储组件,Pulsar 能够轻松地同时反对订阅 - 公布音讯和音讯队列。BookKeeper 作为日志存储系统,基于音讯 topic 数据结构而构建,反对程度扩大(减少“bookie”数量即可扩大容量),且运行迅速。
Pulsar 反对两种根本的 topic 类型:长久 topic 与非长久 topic。用户能够依据名称分别 topic 类型,因为类型即为 topic 名称的“schema”(相似于 https 是 URL https://google.com 的 schema)。
长久 topic 的名称格局为:persistent://public/default/some-topic,而非长久 topic 的名称格局为:non-persistent://public/default/some-topic。
用户应用长久 topic 时,Pulsar 将所有未确认音讯(即未解决音讯)存储在 BookKeeper 中的多个“bookie”服务器上。
Pulsar 确实反对非长久 topic,然而咱们倡议用户只在能够承受失落音讯的用例中,应用非长久音讯。对于具备音讯队列性能的 topic,绝不应该应用非长久 topic。与将音讯数据存储在内存中相比,这种存储形式具备很多劣势。
如何将 Apache Pulsar 用作音讯队列
Pulsar 无需非凡配置或调整,即可反对两种用例,因而在应用方面具备肯定的劣势。重点在于如何应用 Pulsar,如下图所示:
公布 - 订阅 producer 和 consumer 通过公布 - 订阅 topic 进行通信,而队列 producer 和 consumer 通过队列 topic 进行通信。不须要“标记”topic,也不须要预先指定 topic 为实时 topic 或队列 topic。
音讯队列 topic 须要 consumer 应用共享订阅,而不能是独占订阅(exclusive)或灾备订阅(failover)。另外,所有 consumer 必须应用雷同的订阅名称,否则就不是同一订阅。当 consumer 在 topic 上创立共享订阅后,Pulsar 会主动在接管音讯的 consumer 之间进行负载平衡,对于音讯队列来说,这是最现实的状态。
以下代码展现了五个 Java consumer 应用共享订阅监听同一 topic 的场景:
String PULSAR_SERVICE_URL = "pulsar://localhost:6650";
String MQ_TOPIC = "persistent://public/default/message-queue-topic";
String SUBSCRIPTION = "sub-1";
// Pulsar client
PulsarClient client = PulsarClient.builder()
.serviceUrl(PULSAR_SERVICE_URL)
.build();
// Base consumer builder for instantiating multiple consumers
ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer()
.topic(MQ_TOPIC)
.subscriptionName(SUBSCRIPTION)
.subscriptionType(SubscriptionType.Shared)
.messageListener(messageCallback);
// Create five consumers (mq-consumer-0, mq-consumer-1, etc.)
IntStream.range(0, 4).forEach(i -> {String name = String.format("mq-consumer-%d", i);
consumerBuilder
.consumerName(name)
.subscribe();});
管制音讯调度
吞吐量在音讯队列中尤为重要。如果音讯队列没有足够的吞吐量来解决四周数据管道所须要的内容,那么音讯队列可能不仅性能不够好,甚至会产生一些负面影响。如果应用 Pulsar 作为音讯队列,则能够通过 调整 consumer 的配置来微调解决吞吐量。
默认状况下,Apache Pulsar consumer 有一个接管队列,用于一次解决多条音讯。用户能够自行配置单个 consumer 接管队列的大小(默认值为 1000 条音讯)。
现实状况下,应该依据 consumer 解决音讯的速度来设置接管队列的大小。如果能够十分疾速地解决音讯(只需几毫秒),那么倡议将接管队列的大小设置为较大的值,因为这样有助于最大化 consumer 的解决吞吐量。
然而如果解决音讯须要较长时间,最好将接管队列的大小设置为较小的值。如果 consumer 正在执行的工作属于 CPU 密集型,也就是说工作解决须要几秒钟甚至更久,则倡议将接管队列的大小设置为个位数或 1,这样负载平衡器可能在 consumer 之间正当地散发音讯。
在上面这段代码中,consumer 接管队列比拟小(Java):
Consumer<byte[]> consumer = client.newConsumer()
.topic("slow-processing-topic")
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("sub-1")
.receiverQueueSize(5)
.messageListener(messageCallback)
.subscribe();
接管队列的默认值实用于很多用例。然而倡议用户略微注意一下接管队列,免得在后续工作中须要进行调优。
一个音讯平台,两种用例场景
如果想在不同用例场景中同时运行多个音讯平台,大家能够思考应用 Pulsar。Pulsar 同时反对两种次要的音讯用例——公布 - 订阅音讯(尤其是长久音讯)和音讯队列,并且运行速度快、可扩大,还能够加重运维管理负担。