关于pulsar:实现可扩展的流处理Pulsar-KeyShared-订阅模式

7次阅读

共计 8003 个字符,预计需要花费 21 分钟才能阅读完成。

本文翻译自 StreamNative 博客《Scalable Stream Processing with Pulsar’s Key_Shared Subscription》[1],作者:David Kjerrumgaard。
译者:刘梓霖、段嘉

摘要

1. 传统的音讯零碎通过一个 topic 上的多个并发消费者实现了高吞吐量、无状态解决。
2. 流零碎为单个消费者提供有状态的解决,但在吞吐量上有所保留。
3.Pulsar 的 Key_Shared 订阅类型容许对单个 topic 进行高吞吐量和有状态解决。
4.Pulsar 的 Key_Shared 订阅类型适宜须要对大量数据进行有状态解决的用户场景,例如个性化、实时营销、微定向广告和网络安全。

在建设 Pulsar 的 Key_Shared 订阅前,用户在应用传统流零碎框架时须决定是在一个 topic 上领有多个消费者以取得高吞吐量,还是领有一个消费者以取得有状态的解决。本博客中将介绍如何应用 Pulsar 的 Key_Shared 订阅对点击流数据进行行为剖析。

音讯零碎和流零碎之区别

很多开发者认为音讯零碎和流零碎实质上是一样的,因而常常混用这两个术语。然而,音讯零碎和流零碎是截然不同的,理解它们之间的区别能够让用户依据本人的用户场景抉择适合的零碎。

本节内容比拟了各自的音讯生产和解决语义,帮忙大家了解为什么有时独自的音讯零碎和流零碎都不能满足你的场景,以及为什么有些场景须要对立的音讯和流零碎。

音讯零碎

应用音讯零碎的外围数据结构是音讯队列。传入的音讯以先进先出(FIFO)的顺序存储。音讯被保留在队列内,直到被生产。一旦音讯被生产,音讯就会被删除,以便为新传入的音讯腾出空间。

从消费者解决的角度来看,消息传递是齐全无状态的,因为每条音讯都蕴含执行解决所需的所有信息。因而能够在不须要来自先前音讯的任何信息的状况下进行操作,容许用户在多个消费者之间调配音讯解决,缩小解决提早。

音讯零碎非常适合用户心愿扩充某个 topic 的并发消费者数量以减少解决吞吐量的场景。很好的例子是传统的工作队列,即须要由一个订单执行的微服务来解决传入的电子商务订单。因为每个订单都是独立于其余订单的,通过减少从队列中生产的微服务实例的数量来满足需要。

Pulsar 的共享订阅就是为此类型的场景设计。如图 1 所示,它通过确保每条音讯精确地传递给附加订阅的一个消费者来提供消息传递语义。


图示 1:Pulsar 的共享订阅类型反对多个消费者。

流零碎

在流解决中,核心数据结构是日志,它是一个按工夫排序的追加记录序列。音讯被追加到日志的结尾,读取程序顺次从最早到最新。音讯生产是一种非破坏性的流解决操作,因为消费者只是更新它在流中的地位。

从解决的角度来看,流是有状态的,因为流解决是在一连串的音讯上进行的,这些音讯通常依据工夫或大小被分组为固定大小的“窗口”(例如:每 5 分钟)。流解决依赖于窗口中所有音讯的信息以产生正确的后果。

流零碎非常适合聚合操作,例如计算传感器读数的简略挪动平均值,因为所有传感器读数必须由同一个消费者组合解决,以便计算正确数值。

Pulsar 的独占订阅为这种类型的场景提供了正确的流解决语义。如图 2 所示,独占订阅模式确保所有音讯都依照接管的工夫程序传递给单个消费者。


图示 2:Pulsar 的独占订阅模式反对繁多消费者。

比照与取舍

如你所见,音讯队列和流提供了不同的解决语义。音讯零碎通过反对多个并发消费者来达到反对高扩大。在解决须要疾速解决的大量数据时,应该应用音讯零碎,这样每个音讯从产生到被解决之间的提早都很低。

流零碎领有更为简单的剖析解决能力,但以就义每个 topic 分区的可扩展性为代价。为了失去准确后果,只容许单个消费者解决数据,因而解决数据的速度会受到重大的限度,这导致流零碎场景中呈现更高的提早。

只管能够通过应用分片和分区来缩小提早,但可扩展性依然无限。将解决的可扩展性与分区的数量做绑定会升高架构的灵活性。更改分区数量也会影响数据公布到 topic 的形式。因而,只有当你须要有状态的解决并且可能容忍较慢的解决时才应该应用流解决。

然而,如果你的场景是既须要低提早又须要有状态解决应如何抉择?如果你在应用 Apache Pulsar,那么你应该思考 Key_Shared 订阅模式,它提供的解决语义将消息传递和流解决的合二为一。

Apache Pulsar’s Key_Shared 订阅模式

音讯是 Pulsar 的根本单元,它们不仅仅包含生产者和消费者之间发送的原始字节,还包含一些元数据字段。如图 3 所示,每个 Pulsar 音讯中的一个元数据字段是“key”字段,能够包容一个字符串值。这就是 Key_Shared 订阅用来进行分组的字段。


图示 3:一条 Pulsar 音讯蕴含可选的元数据字段,其中蕴含一个名为”key”的字段,是 Key_Shared 订阅用于分组的字段。

Pulsar Key_Shared 订阅模式反对多个并发消费者,因而你能够通过减少消费者的数量来轻松升高解决提早。在此方面,它提供了音讯队列类型的语义,因为每个音讯都能够独立于其余音讯进行解决。

然而,这种订阅类型与传统的 Shared 订阅类型的不同之处就在于它在消费者之间散发数据的形式。与任何消费者都能够解决任意音讯的传统消息传递不同,在 Pulsar 的 Key_Shared 订阅中,音讯被调配到消费者中,并保障具备雷同 key 的音讯被发送到同一个消费者。


图示 4:Pulsar 的 Key_Shared 订阅类型确保具备雷同 key 的音讯依照收到的程序发送到同一个消费者。

Pulsar 通过对传入的 key 值进行哈希并将哈希值平均分配给订阅的所有消费者来实现此等保障。因而,咱们晓得具备雷同 key 的音讯将产生雷同的哈希值,并被发送到与先前具备雷同 key 的同一个消费者。

通过确保所有具备雷同 key 的音讯都被发送到同一个消费者中,且消费者能够保障依照收到的程序接管特定 key 的所有音讯,这合乎流式生产语义。让咱们来摸索一个能够无效应用 Pulsar 的 Key_Shared 订阅的实在用例。

场景案例:点击流数据(Clickstream Data)的行为剖析

基于点击流数据在电子商务网站上提供实时有针对性的举荐是一个很好的案例。因为它须要低提早地解决大量数据,咱们通过点击流数据(Clickstream Data)的行为剖析解释阐明 Key_Shared 订阅模式。

点击流数据

点击流数据是指单个用户在与网站交互时执行的点击程序。点击流蕴含了用户的所有交互,例如点击的地位、拜访的页面以及在每个页面上破费的工夫。


图示 5:点击流数据是代表集体与网站交互事件的工夫序列。

该类数据可用于剖析并报告特定网站上的用户行为,例如路由、粘性和通过网站的常见用户门路的跟踪。点击风行为基本上是用户与特定网站互动的序列。

数据跟踪

为了接管点击流数据,须要将一些跟踪软件嵌入到网站中,以便收集点击流事件并将其转发到剖析零碎中。这些标签通常是一小段 JavaScript 代码,捕捉集体级别用户行数据(例如 IP 地址和 cookie)。每次用户点击标记的网站时,跟踪软件都会检测到该事件,并通过 HTTP POST 申请以 JSON 格局将信息收集并转发到服务端。

列表 1 是一个跟踪库生成的 JSON 对象示例,这些点击流事件在生产剖析之前就包含了做聚合、过滤和填充等解决用到的信息。

{
   "app_id":"gottaeat.com",
   "platform":"web",
   "collector_tstamp":"2021-08-17T23:46:46.818Z",
   "dvce_created_tstamp":"2021-08-17T23:46:45.894Z",
   "event":"page_view",
   "event_id":"933b4974-ffbd-11eb-9a03-0242ac130003",
   "user_ipaddress":"206.10.136.123",
   "domain_userid":"8bf27e62-ffbd-11eb-9a03-0242ac130003",
   "session_id":"7",
   "page_url":"http://gottaeat.com/menu/shinjuku-ramen/zNiq_Q8-TaCZij1Prj9GGA"
   ...
}

列表 1:蕴含个人用户辨认信息的点击流事件示例。

在任何工夫都可能有数百万沉闷的 JavaScript 跟踪器,每个跟踪器都在收集公司网站上单个访问者的点击流事件。这些事件被转发到单个标签收集器,该标签收集器将它们间接公布到 Pulsar topic 中。


图 6:跟踪器收集单个用户的点击流事件,将其转发给单个收集器。一旦收到事件就会公布,导致来自多个用户的数据穿插在该 topic 中。

从图 6 中能够看出问题:因为这些 JavaScript 标签彼此不协调,来自多个用户的点击流数据最终会混合在 Pulsar 主题中。起因是咱们只会对单个用户的点击流数据做行为剖析。

身份拼接

为了正确剖析数据,首先须要将每个用户的原始点击流事件组合在一起,以确保能够依照它们产生的程序残缺地理解他们的交互旅程。这种从混合数据重构每个用户点击流的过程称为身份拼接。它是通过基于尽可能多的用户惟一标识符将点击流事件关联在一起来实现的。

这就是 Key_Shared 订阅模式完满的用例:须要依照事件产生的程序解决每个独自用户的残缺事件流,因而须要流数据处理语义,并且须要扩大此解决以匹配公司网站上的流量。Pulsar 的 Key_Shared 订阅容许您同时进行这两个解决。

为了重建每个用户的点击流,在点击流事件中应用 domain_userid 字段,它是由 JavaScript 标记生成的惟一标识符。此字段是随机生成的通用惟一标识符 (UUID),用于惟一标识每个用户。因而所有具备雷同 domain_userid 值的点击流事件都属于同一个用户。应用此值让 Pulsar 的 Key_Shared 订阅将所有用户的事件组合在一起。

应用 Key_Shared 订阅模式

为实现行为剖析,须要全面理解用户与网站的交互状况,因而须要确保将单个用户的所有点击组合在一起,并将它们传递给同一个消费者。正如上一节中探讨的,每个点击流事件中的 domain_userid 字段都蕴含用户的惟一标识符。通过应用这个值作为音讯键,当咱们应用 Key_Shared 订阅时,Pulsar 能够保障将所有雷同用户的事件传递给同一个消费者。

数据填充

这个从 JavaScript 标签收集并转发的 JSON 对象,蕴含原始 JSON 字节(key 段为空)。因而,为了利用 Key_Shared 订阅,首先须要用每个 JSON 对象内的 domain_userid 字段的值填充音讯键来丰盛该音讯。

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.manning.pulsar.chapter4.types.TrackingTag;
import org.apache.pulsar.client.impl.schema.JSONSchema;
public class WebTagEnricher implements Function<String, Void> {
    static final String TOPIC = "persistent://tracking/web-activity/tags";
    @Override
    public Void process(String json, Context ctx) throws Exception {ObjectMapper objectMapper = new ObjectMapper();
    TrackingTag tag = objectMapper.readValue(json, TrackingTag.class);
        
    ctx.newOutputMessage(TOPIC, JSONSchema.of(TrackingTag.class))
        .key(tag.getDomainUserId())
        .value(tag)
        .send();
        
    return null;
    }
}

列表 2:Pulsar Function 将原始标签字节转换为 JSON 对象,并将 domain_userid 字段的值复制到传出音讯的 key 字段中。

这能够通过一段绝对简略的代码来实现,如列表 2 所示,它解析 JSON 对象、获取 domain_userid 字段的值,并输入一条蕴含原始点击流事件的新音讯,该事件的键填充为用户的 UUID。这种类型的逻辑解决是 Pulsar Functions 的完满场景案例。此外,因为逻辑是无状态的,因而能够应用共享订阅类型并行执行,这将最大限度地缩小执行此工作所需的解决耗时。

应用 Key_Shared 订阅进行身份拼接

一旦应用正确的键值将蕴含点击流事件的音讯空虚切当,下一步就是确认 Key_Shared 订阅解决执行身份拼接。列表 3 中的代码在 Key_Shared 订阅上启动了总共五个消费者。

public class ClickstreamAggregator {
  static final String PULSAR_SERVICE_URL = "pulsar://localhost:6650";
  static final String MY_TOPIC = "persistent://tracking/web-activity/tags\"";
  static final String SUBSCRIPTION = "aggregator-sub";
  public static void main() throws PulsarClientException {PulsarClient client = PulsarClient.builder()
          .serviceUrl(PULSAR_SERVICE_URL)
          .build();
    ConsumerBuilder<TrackingTag> consumerBuilder = 
       client.newConsumer(JSONSchema.of(TrackingTag.class))
            .topic(MY_TOPIC)
            .subscriptionName(SUBSCRIPTION)
            .subscriptionType(SubscriptionType.Key_Shared)
            .messageListener(new TagMessageListener());
    
       IntStream.range(0, 4).forEach(i -> {String name = String.format("mq-consumer-%d", i);
          try {
            consumerBuilder
                .consumerName(name)
                .subscribe();} catch (PulsarClientException e) {e.printStackTrace();
           }
       });
    }
}

列表 3:主类应用 MessageListener 接口在同一个 Key_Shared 订阅上启动消费者,该接口运行在外部线程池中。

新事件达到 TagMessageListener 类时,其解决逻辑为如下所示。因为消费者很可能会被调配多个键,因而传入的点击流事件须要存储在外部映射中,该映射应用每个网页访问者的 UUID 作为键。因而,通过应用 Apache Commons 库中的最近起码应用(LRU) 映射实现来实现,通过在事件变满时删除事件中最旧的元素来确保映射放弃固定大小。

import org.apache.commons.collections4.map.LRUMap;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
public class TagMessageListener implements MessageListener<TrackingTag> {
  private LRUMap<String, List<TrackingTag>> userActivity = 
    new LRUMap<String, List<TrackingTag>>(100);
  @Override
  public void received(Consumer<TrackingTag> consumer, 
    Message<TrackingTag> msg) {
    try {recordEvent(msg.getValue());
      invokeML(msg.getValue().getDomainUserId());
      consumer.acknowledge(msg);
    } catch (PulsarClientException e) {e.printStackTrace();
    }
  }
  private void recordEvent(TrackingTag event) {if (!userActivity.containsKey(event.getDomainUserId())) {userActivity.put(event.getDomainUserId(), 
           new ArrayList<TrackingTag> ());
    }       
    userActivity.get(event.getDomainUserId()).add(event);
  }
  // Invokes the ML model with the collected events for the user    
  private void invokeML(String domainUserId) {. . .} 
}

列表 4:负责聚合点击流事件的类应用 LRU 映射按用户 ID 对事件进行排序。每个新事件都会追加到以前的事件列表中。而后能够通过机器学习模型输出这些列表以生成举荐数据。

当新事件达到时,它会被增加到相应用户的点击流中,从而为已调配给消费者的用户 key 重构点击流。

实时行为剖析

当初既已重构了点击流,能够将它们提供给机器学习模型,该模型将为公司网站的每个访问者提供有针对性的举荐,比方依据购物车中的商品、最近查看的商品或优惠券倡议将商品增加到购物车。通过实时行为剖析,可能通过个性化举荐来改善用户体验,有助于进步转化率和均匀订单规模。

总结

传统音讯队列通过多个并发消费者对一个 topic 进行解决。典型的场景是订单微服务解决生产订单的传统工作队列。对于此类场景,能够应用 Pulsar 的共享订阅。

传统的流零碎进行有状态的数据处理,一个主题上只有一个消费者,但在吞吐量上有限度。流零碎可用于更简单的剖析解决能力。Pulsar 的 Exclusive 和 Failover 订阅模式旨在反对这种语义。

Pulsar 的 Key_Shared 订阅类型容许对单个 topic 进行高吞吐量和有状态解决。它非常适合须要对大量数据进行有状态解决的场景,例如个性化、实时营销、微定向广告和网络安全。

更多无关 Pulsar 的 Key_Shared 订阅的信息,可浏览 Apache Pulsar 文档[2]。

致谢
感激 Apache Pulsar 社区志愿者刘梓霖 @珏衫、段嘉 @Janusjia 对本文的翻译。

援用链接
[1]《Scalable Stream Processing with Pulsar’s Key_Shared Subscription》: https://streamnative.io/en/bl…
[2] Apache Pulsar 文档: https://pulsar.apache.org/doc…

点击进入取得更多技术信息~~

正文完
 0