乐趣区

关于apache:KoP-280-新特性前瞻内附视频

导读:在 4 月 11 日 TGIP-CN 直播流动上,咱们邀请到 StreamNative 工程师徐昀泽,他为大家分享了 KoP 2.8.0 新个性前瞻。上面是徐昀泽分享视频的简洁文字整顿版本,供大家参考。

在 4 月 11 日 TGIP-CN 直播中,来自 StreamNative 的软件工程师徐昀泽为大家带来了《KoP 2.8.0 性能个性预览》的分享。上面是其分享视频的简洁文字整顿版本,敬请参考。

明天分享的内容是《KoP(Kafka on Pulsar)2.8.0 新个性前瞻》,首先我简略自我介绍下:我就任于 StreamNative,是 Apache Pulsar 的 Contributor,也是 KoP 的次要维护者。

对于 KoP 版本号标准

首先,咱们先聊一聊 KoP 版本号的问题。

Apache Pulsar 领有 Major release。因为 KoP 后期版本治理比拟凌乱,所以从 2.6.2.0 开始,KoP 的版本号跟 Pulsar 根本保持一致。KoP Master 分支会不定期的更新依赖的 Pulsar 版本。这样的话,如果想要有一些新的性能能够在 Pulsar 中提一个 PR,而后 KoP 去依赖这个办法就行了。KoP 2.8.0 是一个能够实用于生产的一个版本。

明天我次要从如下次要四点为大家开展本次的直播:

  • 为什么须要 KoP
  • KoP 的根本实现
  • KoP 2.8.0-SNAPSHOT 版本的近期停顿
  • 近期打算及将来瞻望

Kafka Vs Pulsar

首先说一下我对 Kafka 和 Pulsar 这两个零碎的认识,抛开一些杂七杂八的 feature 的话,这两个零碎还是很类似的,但最大的区别是它们的存储模型。

Kafka 的 Broker 是兼具计算和存储的。所谓计算是指把 Client 发过来的数据抽象成不同的 topic 和 partition,可能还会做一些 schema 等等之类的解决,解决后音讯会写到存储上。Kafka 的 Broker 解决完会间接写到这台机器上的 file system。而 Pulsar 却不同,它会写到一个 Bookie 集群,每个 Bookie 节点是对等的。

分层存储带来了很多益处,比方你想减少吞吐量能够加一台 Broker;如果想减少磁盘容量的话,能够减少 Bookie,而且因为它的每个节点是对等的,所以是无需进行 rebalance 也没有 Kafka 的 Leader 的 Follower 之分。当然这不是本次 Talk 的重点,我想表白的是它们架构上最大的差别就是 Kafka 写入本地文件,Pulsar 写入 Bookie。

另一方面尽管我认为两者没有相对的优劣,然而大家有抉择的自在。我置信有很多场景都是能够用 Pulsar 来代替 Kafka 的。

从 Kafka 迁徙到 Pulsar

如果我看中的 Pulsar 的一些长处,想从 Kafka 迁徙到 Pulsar,那么会遇到什么问题呢?

  1. 推动业务更换客户端?
  • 业务说太麻烦,不想换。
  • Pulsar adaptors?(Pulsar 推出了一个 adaptor,Kafka 的代码无需扭转要改一下 maven 的依赖即可)
  • 看起来不错,惋惜我不是用的 Java 客户端。
  • 我不嫌麻烦,但我只会 PHP。
  1. 用户间接应用了 Kafka 连接器(近百种)连贯到内部零碎怎么办?
  2. 用户应用内部零碎的连接器连贯到 Kafka 怎么办?

KoP(Kafka on Pulsar)

面对上述从 Kafka 迁徙到 Pulsar 的多种问题,KoP(Kafka on Pulsar)我的项目应运而生。KoP 将 Kafka 协定解决插件引入 Pulsar Broker,从而实现 Apache Pulsar 对原生 Apache Kafka 协定的反对。借助 KoP,用户不必批改代码就能够将现有的 Kafka 应用程序和服务迁徙到 Pulsar,从而应用 Pulsar 的弱小性能。对于 KoP 我的项目的背景,能够理解 KoP 相干材料,这里不再赘述。

如上图,从 Pulsar 2.5.0 开始引入 Protocol Handler,它运行在 Broker 的服务之上。默认的是 Pulsar Protocol Handler 其实只是一个概念,它是与 Pulsar 的客户端进行通信。Kafka Protocol Handler 是动静加载的,配置好相当于加载了一层插件,通过这个插件与 Kafka 客户端进行通信。

KoP 的应用非常简单,只需将 Protocol Handler 的 NAR 包放入 Pulsar 目录下的 protocols 子目录,对 broker.conf 或 standalone.conf 增加相应配置,启动时就会默认启动 9092 端口的服务,与 Kafka 相似。

目前来说,KoP 反对的客户端:

  • Java >= 1.0
  • C/C++: librdkafka
  • Golang: sarama
  • NodeJS:
  • 其余基于 rdkafka 的客户端

Protocol Handler

Protocol Handler 其实是一个接口,咱们能够实现本人的 Protocol Handler。Broker 的启动流程:

从目录下加载这个 Protocol Handler,再去加载 Class,用 accept 办法和 protocolName 办法来验证,而后就是循序渐进的三步:

  • initialize()
  • start()
  • newChannelInitializer()

第一步,加载 Protocol Handler 的配置。Protocol Handler 与 Broker 共用同一配置,所以这里用的也是 ServiceConfiguiation。Start 这一步就最重要的因为它传入了 BrokerService 这个参数。

BrokerService 掌控每个 Broker 的所有资源:

  • 连贯的 producers,subscriptions
  • 持有的 topic 及其对应的 managed ledgers
  • 内置的 admin 和 client

KoP 的实现

Topic & Partition

Kafka 与 Pulsar 很多中央都很类似。Kafka 外面 TopicPartition 是一个字符串和 int;Pulsar 略微简单一些,分了如下局部:

  • 是否长久化
  • 租户
  • 命名空间
  • 主题
  • 分区编号

KoP 中有这三项配置:

  • 默认租户:kafkaTenant=Public
  • 默认命名空间:kafkaNamespace=default
  • 禁止主动创立 non-partitioned topic:allowAutoTopicCreationType=partitioned

为什么要配一个禁止主动创立 non-partitioned topic 的配置呢?因为 Kafka 中只有 partitioned topic 的概念而没有这个 non-partitioned topic 的概念。如果用 Pulsar 客户端去主动创立一个 topic,可能导致 Kafka 的客户端无法访问这个 topic。在 KoP 外面做一些简略的解决,将默认的租户跟命名空间独立映射。

Produce & Fetch 申请

PRODUCE 申请:

  • 通过 topic 名字找到 PersistentTopic 对象(内含 ManagedLedger)。
  • 对音讯格局进行转换。
  • 异步写入音讯到 Bookie。

FETCH 申请:

  • 通过 topic 名字找到 PersistentTopic 对象。
  • 通过 Offset 找到对应的 ManagedCursor。
  • 从 ManagedCursor 对应地位读取 Entry。
  • 对 Entry 格局进行转换后将音讯返回给客户端。

Group Coordinator

Group Coordinator 是用来进行 rebalance,决定 partition 和 group 的映射关系。因为 Group 会有多个消费者,消费者会拜访哪些 partition,这个就是由 Group Coordinator 来决定的。

当 consumer 退出(订阅)一个 group 时:

  • 会发送 JoinGroup 申请,告诉 Broker 有新的消费者退出。
  • 会发送 SyncGroup 申请用于 partition 的调配。

还会把信息发给 Client,consumer 再发一个新的申请,拿到 Broker 的一些调配的信息。Group Coordinator 会把这些 group 相干的信息写入一个非凡的 topic。

KoP 这里也做了一些配置,这个非凡的 topic 会存在于一个默认的 namespace 下,它的 partition 数量默认是 8。Kafka group 根本等价于 Pulsar Failover subscription。如果想让 Kafka 的 Offset 被 Pulsar 客户端辨认的话,就须要 Offset 对应的 MessageId 进行 ACK。因而 KoP 外面有个组件是叫 OffsetAcker,它保护了一组 Consumer。每次 Group Coordinator 要进行 ACK 时,就会创立一个 partition 对应的 consumer 来把 group ACK。

这里会提到一个“namespace bundle”的概念。Group Coordinator 决定了 consumer 与 partition 的映射关系。

在 Apache Pulsar 中,每台 broker 都领有(own)一些 Bundle range(如上图示例);topic 会按名字哈希到其中一个 Bundle range,这个 range 的 owner broker 就是 topic 的 owner broker,那么你订阅的 topic 就连贯对应到 broker。这里大家要留神两个问题,一是 bundle 可能会决裂(你也能够配置使其禁止决裂),二是 Broker 有可能挂掉,因而导致 bundle 和 Broker 的映射关系可能产生扭转。因而为了避免这两个问题的产生,KoP 注册了一个监听器(listener),可用来感知 bundle ownership 的变动,一旦 bundle ownership 发生变化则告诉 Group Coordinator 调用处理函数进行解决。

Kafka Offset

先介绍下 Kafka Offset 与 Pulsar MessageId 这两个概念。Kafka Offset 是一个 64 位整型,用来标识音讯存储的地位,Kafka 的音讯存储在本机所以能够用整数来示意音讯的序号。Pulsar 是将音讯存储在 Bookie 上,Bookie 可能散布在多台机器,因而 Bookie 应用 Ledger ID 与 Entry ID 来示意音讯的地位。Ledger ID 能够了解对应 Kafka 中的 Segment,Entry ID 则近似等价于 Kafka Offset。Pulsar 中的 Entry 对应的不是单条音讯,而是一条打包后的音讯,因而产生了 Batch Index。由此,须要 Ledger ID、Entry ID 和 Batch Index 三个字段独特标记一条 Pulsar 的音讯。

那么,就不能单纯的将 Kafka Offset 映射为 Pulsar 的 MessageID,这样简略的解决可能会造成 Pulsar 音讯失落。在 KoP 2.8.0 之前,通过对 Pulsar LedgerID、Entry ID 和 Batch Index 别离调配 20 位、32 位、12 位拼凑成一个 Kafka Offset (如上图所示),这种调配策略在少数状况下可行,可能保障 Kafka offset 的有序性,但面对 MessageID 拆分依然难以提出「适合」的调配计划,存在以下几种情景的问题:

  • 比方,调配给 LedgerID 20 字节,在 2^20 时会产生 LedgerID 耗尽的问题,也容易造成 Batch Index 字节用光的状况;
  • 从 cursor 读取 entry 时只能一个一个读取,否则可能导致 Maximum offset delta exceeded 问题;
  • 有些第三方组件(比方 Spark)依赖于间断 Offset 的性能

鉴于上述对于 Kafka Offset 的种种问题,StreamNative 联结腾讯工程师独特提出基于 broker entry metadata 的优化计划 PIP 70: Introduce lightweight broker entry metadata,新计划可参考下图右侧示意。

上图左侧:目前 Pulsar 音讯组成包含 Metadata 与 Payload 两局部,Payload 指的是具体写入的数据,Metadata 则是元数据如公布工夫戳等。Pulsar Broker 会将音讯写入 Client,同时将音讯存到 Bookie 中。

上图右侧:右侧展现的是 PIP 70 提出的改良计划。在新计划中,Broker 依然会将音讯写入到 Client 中,但写入到 Bookie 中的是 Raw Message ── 何为 Raw Message?就是在原 Message 根底上减少了 BrokerEntryMetadata。从上图能够看到 Client 是无奈获取 Raw Message 的,只有 Broker 能够获取 Raw Message。之前提到,Protoctol handler 能够获取 Broker 全副权限,因而 Protocol Handler 也获取 Raw Message。如果在 Pulsar 中置入 offset,那么 KoP 就能够获取 Offset。

咱们做了这样的实现:在 protocol buffer 文件中有两个字段,次要的是第二字段。Index 对应 Kafka 的 Offset,相当于在 Pulsar 中将 Kafka 实现了一遍。有两个 intercepter 别离是 ManagedLedgerInterceptor

   private boolean beforeAddEntry(OpAddEntry addOperation) {
        // if no interceptor, just return true to make sure addOperation will be 
initiate()
        if (managedLedgerInterceptor == null) {return true;}
        try {
            managedLedgerInterceptor.beforeAddEntry(addOperation, 
addOperation.getNumberOfMessages());
            return true;

和 BrokerEntryMetadataInterceptor。

    public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {if (op == null || numberOfMessages <= 0) {return op;}
        op.setData(Commands.addBrokerEntryMetadata(op.getData(), 
brokerEntryMetadataInterceptors, numberOfMessages));
        return op;
    }

addOperation 蕴含了从 producer 发过来的音讯的字节和数量,由此 interceptor 能够拦挡到所有生产的音讯。而 Commands.addBrokerEntryMetadata 的作用是在 OpAddEntry data 后面加一个 BrokerEntryMetadata。加在后面的起因是为了易于解析,能够先读后面的字段是否是 magic number,是的话就能够接着读 BrokerEntryMetadata,不是的话就能够按失常的协定解析一般的 Metadata。BrokerEntryMetadataInterceptor 相当于在 Broker 端做的拦截器。

因而,在 KoP 中基于 BrokerEntryMetadata 就很容易实现间断 Offset:

  • FETCH 申请:间接读 Bookie,解析 BrokerEntryMetadata 即可;
  • PRODUCE 申请:将 ManagedLedger 传入异步写 Bookie 的上下文,从 ManagedLedger 的 interceptor 中拿到 Offset
  • COMMIT_OFFSET 申请:对于 __consumer_offsets,一成不变写入 topic,对于 Pulsar 的 cumulative acknowledgement,则对 ManagedLedger 进行二分查找。

鉴于上述改变,在 KoP 2.8.0 中必须进行如下配置,以确保 Offset 操作失常应用:

brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor

音讯的编码与解码

这也是 KoP 2.8.0 改良比拟重要的一部分。

在 KoP 2.8.0 之前,KoP 对音讯的生产和生产都须要通过对音讯的解压缩、解 batch 等操作,操作时延较重大。咱们也提出了一个问题:为什么 KoP 要兼容 Pulsar 的客户端呢?如果从 Kafka 迁徙到 Pulsar,大部分状况可能只存在 Kafka 客户端,不太可能存在 Kafka client 与 Pulsar Client 的交互,针对音讯的编解码就显得没有必要。在生产音讯时,将 MemoryRecords 外部的 ByteBuffer 间接写入 Bookie 即可。

在音讯生产时绝对不太一样,咱们是用 ManagedCursor 进行读取的,也须要将若干个 Entry 转成一个 ByteBuf,但在理论利用场景中发现这种形式开销依然比拟大,进一步排查后发现是在 appendWithOffset 对每条音讯从新计算校验和时产生的,如果 batch 数量较多则计算次数过多,产生了不必要的开销。针对该问题,BIGO 团队成员给到了相干 PR 计划,提交了一个 appendWithOffset 简化版本(如下图),去掉了非必要动作,当然该提案也是基于后面提交的间断 Offset 改良根底上进行的。

性能测试(WIP)

性能测试还处于 WIP(Work in Progress)阶段,目前发现了一些问题。首先,在下图图峰中,端到端延时为 6ms 对 4ms,该工夫在可承受范畴内。然而在后续排查中,我发现经常出现 full GC 高达 600 ms 的状况,甚至呈现延时更高的状况,咱们在排查这个问题。

以下几张图别离为 HandleProduceRequest、ProduceEncode、MessageQueuedLatency、MessagePublish 的监控。从监控来看,HandleProduceRequest(PRODUCE 申请的解决开始,到这次申请所有音讯全副胜利写入 Bookie)的延时为 4 ms 左右,与 Pulsar 客户端差不多然而少了一趟网络的往返。

咱们次要看编码 ProduceEncode(对 Kafka 音讯编码的工夫)的工夫,我的测试是用 Kafka 的 EntryFormat,能够看到只耗费不到 0.1 ms 的工夫;如果用 Pulsar 的 EntryFormat,那么监控后果在零点几 ms ~ 几 ms 之间。

其实这里的实现还存在一点问题,因为目前还在用一个队列,所以会有下图的指标 MessageQueuedLatency。MessageQueuedLatency 是从每个分区的音讯队列开始,到筹备异步发送的工夫。咱们狐疑是不是队列导致性能变差,然而从监控来看 0.1 ms 的延时影响不大。

最初是 MessagePublish 是 Bookie 的延时,即单个分区的音讯从异步发送开始,到胜利写入 Bookie 的工夫。监控后果较现实,所以近期咱们会钻研 GC 的问题起源。

KoP Authentication

在 2.8.0 版本之前

在理论生产环境中如果须要部署到云上,须要反对 Authentication。在 2.8.0 之前,KoP 对 Authentication 的反对还比较简单,反对仅限于 SASL/PLAIN 机制,它基于 Pulsar 的 JSON Web Token 认证,在 Broker 的根本配置之外,只须要额定配置 saslAllowedMechanism=Plain。用户端则须要输出 namespace 和 token 作为 JAAS 配置的用户名和明码。

security.protocol=SASL_PLAINTEXT    # or security.protocol=SASL_SSL if SSL connection is used
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
Required username=''public/default'' password=''token:xxx'';

反对 OAuth 2.0

最近 KoP 2.8.0 反对 OAuth 2.0 进行认证,也就是 SASL/OAUTHBEARER 机制。简略科普一下,它采纳了简略的第三方服务。首先 Client 从 Resource Owner 获得受权 Grant,Resource Owner 能够是相似于微信公众号的受权码,也能够是真人当时给的 Grant。而后将 Grant 发给 Authorization Server,即 OAuth 2 的 Server,通过受权码可获得 Access Token,即可拜访 Resource Server,即 Pulsar 的 Broker,Boker 会进行 Token 的验证。获取 Token 的形式是第三方验证,这个办法绝对平安。

KoP 默认的 Handler 和 Kafka 雷同。相似 Kafka,KoP 也须要在 broker 端配置 Server Callback Handler 用于 token 验证:

  • kopOauth2AuthenticateCallbackHandler:handler 类
  • kopOauth2ConfigFile:配置文件门路

这外面用 JAAS 的办法,用独自的配置文件。KoP 提供了一种实现类,它基于 Pulsar Broker 配置的 AutnticationProvider 进行验证。因为 KoP 有 Broker Service,那么即领有 Broker 的所有权限,能够去调用 Broker 配置的 provider authentication 办法进行验证,因而配置文件中仅需配置 auth.validate.method=,该 method 对应的是 provider 的 getAuthNa me 办法返回值。如果用 JWT 认证的话,这个 method 是 token;用 OAuth 2 认证的话,这个 method 可能会不同。

客户端

对于 Kafka 客户端,KoP 提供了一种 Login Callback Handler 实现。Kafka Java 客户端 OAuth 2.0 认证:

sasl.login.callback.handler.class=io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler
security.protocol=SASL_PLAINTEXT # or security.protocol=SASL_SSL if SSL connection is used sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
    required oauth.issuer.url="https://accounts.google.com"\
    oauth.credentials.url="file:///path/to/credentials_file.json"\
    oauth.audience="https://broker.example.com";

Server Callback 是用来验证 client token 的,Login Callback Handler 是从第三方的 OAuth 2 服务上获取第三方 token。我的实现是参考 Pulsar 的实现,配置是按 Kafka 的 JAAS 进行配置。次要有三个须要配置:issueUrl、credentialsUrl、audience。它的含意和 Pulsar 的 Java 客户端认证是一样的,因而能够参考 Pulsar 的文档。Pulsar Java 客户端 OAuth 2.0 认证:

String issuerUrl = "https://dev-kt-aa9ne.us.auth0.comH;
String credentialsUrl = "file:///path/to/KeyFile.json";
String audience = "https://dev-kt-aa9ne.us.auth0.com/api/v2/";
PulsarClient client = PulsarClient.builder() 
    .serviceUrl("pulsar://broker.example.com:6650/") 
    .authentication(AuthenticationFactoryOAuth2.clientcredentials(issuerUrl, credentialsUrl, audience)) .build();

因而 KoP 对 OAuth 2 的反对在于它提供了 Client 端和默认的 Server 端的 Callback Handler。在 Kafka 应用 OAuth 2 验证时,须要本人写 Handler;然而 KoP 和 Pulsar 的实现相似,不须要本人写 Handler,开箱即用。

KoP 2.8.0 其余停顿

  • 移植了 Kafka 的 Transaction Coordinator。若想启用 Transaction,须要增加如下配置:
enableTransactionCoordinator=true
brokerid=<id>
  • 基于 PrometheusRawMetricsProvider 增加了 KoP 自定义的 metrics。该个性由 BIGO 的陈航增加,即刚刚展现的监测。
  • 裸露 advertised listeners,从而反对 Envoy Kafka Filter 进行代理。在以前的 KoP 中不敌对的一点是,配置的 listener 必须和 broker 的 advertised listener 一样。在新版本中咱们将 listener 和 advertised listener 离开,能够反对代理,比方:部署在云上能够用 Envoy 代理。
  • 欠缺对 Kafka AdminClient 的反对。这是从前被疏忽的一点。大家认为用 Pulsar 的 admin 就能够了,实际上一方面用户习惯应用 Kafka AdminClient,另一方面有些用户配置的组件内置了 AdminClient,如果不反对该协定会影响应用。

近期打算

Pulsar 2.8.0 争取在 4 月底公布。在正式公布前须要排查一些性能测试的问题:

  1. 增加更具体的 metrics。
  2. 排查压测过程中内存持续增长以及 full GC 的问题。
  3. 进行更为零碎的性能测试。
  4. 解决社区近期反馈的问题。

相干浏览

  • 新性能详解 + 试用征集:Pulsar Function Mesh
  • Apache Pulsar PMC 成员翟佳:Pulsar 2021 瞻望与布局

点击 链接,获取 Apache Pulsar 硬核干货材料!

退出移动版