关于pulsar:腾讯云基于-Apache-Pulsar-跨地域复制功能实现租户跨集群迁移

4次阅读

共计 6695 个字符,预计需要花费 17 分钟才能阅读完成。

导语

本文整顿自 Pulsar Summit Asia 2022 技术峰会上腾讯云中间件高级研发工程师韩明泽的分享《基于跨地区复制实现租户跨集群迁徙》。本文次要介绍基于跨地区数据复制和订阅进度同步的实现及优化,以及腾讯云在跨集群迁徙过程中遇到的问题及租户跨集群迁徙解决方案。

作者简介

韩明泽

毕业于武汉大学,腾讯云中间件高级研发工程师,领有多年消息中间件开发与运维教训,RoP (RocketMQ-on-Pulsar) Maintainer,Apache Pulsar 贡献者。

订阅进度同步的实现及优化

跨地区复制简介

跨地区复制是 Apache Pulsar 提供的跨机房数据复制能力。其典型的应用场景有:

  • 多机房数据复制,即数据容灾备份
  • 异地读写

下图为典型的异地读写案例,假设在北京生产与写入,而在上海生产,在对从生产到生产整体链路耗时要求不高的状况下,即可采纳跨地区复制的能力。同地区写入的工夫老本绝对较低,而比拟耗时的生产能够在外部通过跨地区复制屏蔽。

跨地区复制集群复制性能实现原理

如果 Apache Pulsar 不提供跨集群复制性能,如何在运维 RocketMQ 或者 Kafka 等状况下实现跨地区数据复制、容灾者备份和集群间数据迁徙的工作?

通常状况下,服务中有生产者和消费者两个角色,消费者连贯上游集群,生产者连贯上游集群。上游集群的生产数据通过生产者发送到上游指标集群。Apache Pulsar 在跨地区复制的设计中采纳了相似思路,跨地区复制实现的流程如下图所示。

在每个主题外部设置了 Replication 模块。如果开启数据复制,此模块则会发动外部订阅(或游标进度)。在任一主题内生产音讯时,生产者向对端集群投递音讯来实现跨集群数据的复制性能,不影响本集群的生产生产。在上述的过程中,音讯的读取与发送齐全异步解决。

订阅进度同步的实现原理

数据复制与同步的实现比较简单,但在一些场景中,除了同步音讯,还须要同步订阅的生产进度。

以异地容灾为例,假如本来业务的生产生产均在北京,当北京集群业务呈现故障时,业务端想疾速将集群切换到上海集群,以持续从北京集群曾经生产到的地位开始做生产和生产。

如果没有订阅进度同步的能力,那么用户很难确定在北京集群里哪些音讯曾经生产过;如果从最新的地位开始生产,可能会导致音讯失落;如果从最早的地位开始生产,会造成大量的反复生产。在实际操作中,略微折中的办法是通过工夫回溯退回到较近的工夫点。然而,这种办法无奈从根本上解决音讯失落或者反复生产的问题。

而 Apache Pulsar 所提供的订阅进度同步的性能,则能够让用户平滑地实现异地容灾的切换,不必放心音讯的失落或者反复。Pulsar 同时反对数据同步和订阅进度同步,如下图所示。

生产进度

生产进度由 markDeletePosition 和 individuallyDeletedMessages 两局部组成。在 RocketMQ 和 Kafka 中,生产进度在分区上通过 Offset 标识。Offset 对应 Pulsar 中的概念能够了解为 markDeletePosition。

Pulsar 同时反对多种生产模式,它的音讯确认机制 / 签收机制反对单条确认。因而,在 Pulsar 中除了须要记录 markDeletePosition,还须要 Individual Acks 记录单条被确认的音讯。

如上图,在共享生产模式下有很多消费者实例。因为每个消费者的生产速度不一样,音讯的推送程序和音讯 Ack 程序并不完全相同。假设咱们须要把标号为 0 到 9 的音讯同时推送给不同的消费者实例,音讯 0、1、2、3、4、6、9 曾经确认,然而 5、7、8 并没有确认。markDeletePosition 的游标位置,即 Offset 标识的生产进度只能标识到 4 的地位,示意 4(包含 4)之前的音讯都曾经被生产。音讯 5、7、8 曾经被生产,须要独自确认。Pulsar 通过 individuallyDeletedMessages 数组对象范畴去标识哪些音讯曾经被确认过。咱们能够将上述音讯的确认了解为几个开闭区间,从中能够显著得出 5、7、8 没有被生产。

Message ID 对应关系

在 Pulsar 中,订阅进度同步的复杂性在于同一条音讯在不同集群中的 Message ID 不统一,这也是 Pulsar 相较于 Kafka 和 Rocket MQ 而言比较复杂的中央。在 Kafka 分区里只有 Offset 一个概念,而在 Pulsar 中,Message ID 由 Entry ID 和 Ledger ID 组成。同一条音讯在不同集群里存储的 Entry ID 和 Ledger ID 无奈保持一致。

如上图所示,在 A 和 B 两个集群中,音讯 1 在集群 A 中的 ID 是 1:0,而在集群 B 中的 ID 是 3:0;音讯 2 在集群 A 中的 ID 是 1:1,在集群 B 中的 ID 是 3:1。如果同一条音讯在两个集群中的 ID 完全一致,同步生产进度非常容易,比方 ID 为 1:2 的音讯在集群 A 中被生产,集群 B 同步确认音讯 1:2 即可。然而因为 Message ID 不统一,或者在不晓得 Message ID 间对应关系的状况下,没有方法间接将不同集群间的音讯对应起来。

所以关键问题就在于,如何晓得 Message ID 间的对应关系?其实这也是最为简单的中央,咱们只有分明集群间同一 Message ID 的对应关系,能力在集群 A 确认音讯 1:2 之后,同步在集群 B 确认音讯 3:2,或者更新其 markDeletePosition。

构建 Cursor Snapshot

在原生 Pulsar 里通过定期结构 Cursor Snapshot 的机制来实现 Message ID 间的彼此对应。

"ReplicatedSubscriptionSnapshotRequest":{
    "snapshot_id":"444D3632-F96C-48D7-83DB-041C32164EC1",
    "source_cluster":"a"
}

以上图为例,集群 A 确认音讯 1:2 时通过疾速构建 Snapshot 向集群 B 和 C 发送申请,申请其告知集群 A 此音讯在其集群中的地位信息。

集群 A 会定期向其余集群发送 Replicate Subscription Snapshot Request。集群 B 在收到集群 A 的申请之后会回发响应,将以后复制到的最新消息地位发送给集群 A。

"ReplicatedSubscriptionSnapshotResponse":{
    "snapshotid":"444D3632-F96C-48D7-83DB-041C32164EC1",
    "cluster":{
        "cluster":"b",
        "message_id":{
            "ledger_id":1234,
            "entry_id":45678
            }
    }
}

集群 A 收到对端集群 B 和 C 返回的以后音讯的地位后,就会结构起 Message ID 间对应关系。如下图所示,集群 A 中 Message ID 为 192.123123 的音讯,在集群 B 中对应 ID 为 1234.45678,在集群 C 中对应 ID 为 7655.13421。

代码如下:

{
    "snapshot_id":"44403632-F96C-48D7-83DB-041C32164EC1",
    "local_message_id":{
         "ledger_id":192,
         "endtry_id":123123
    },
    "clusters":[
        {
            "cluster":"b",
            "message_id":{
                "ledger_id":1234, 
        "endtry_id":45678
            }
        },
        {
            "cluster":"c",
            "message _id";{
                "ledger_id":7655,
        "endtry_id":13421
            }
        }
    ],
}

Message ID 间对应关系的构建并不简单,然而实现逻辑会绝对简单。Cursor Snapshot 结构实现之后会造成一种对应关系作为 Cursor Snapshot Maker 写入到原主题。

如上图,阿拉伯数字示意业务主题里的业务音讯,字母 S 示意不同集群间结构进去的 Cursor Snapshot 数据。当生产到 Snapshot Marker 时会把对应的 Snapshot Marker 加载到内存里。比方,咱们在集群 A 中 Mark Delete 到音讯 3 的地位时,能够依据 S 外面记录的集群 B 中的音讯地位来更新集群 B 的 markDeletePosition。

以上图为例,在集群 A 中音讯 1:2 和音讯 1:6 的地位别离有一个 Snapshot,音讯 1:1、1:3、1:4、1:5 和 1:7 是一般音讯。当集群 A 中 markDeletePosition 更新到 1:4 时,音讯 1:2 在 1:4 之前并且有 Snapshot,就能够疾速到集群 B 去确认音讯 3:4,并更新该地位的 markDeletePosition。

订阅进度同步

订阅进度同步过程中存在的问题

订阅进度同步过程中存在的问题也是租户跨集群迁徙过程中卡点的问题:

  • 只同步 markDeletePosition,不同步 individuallyDeletedMessages。

这会导致在单条音讯确认时存在很多音讯确认空洞,对存在定时音讯的场景也会产生较大的影响。假设一个主题里有定时音讯和一般音讯,定时音讯的工夫是在一天后,也就意味着定时音讯的确认工夫须要提早一天。因为 markDeletePosition 只能记录此时曾经被全副确认过的音讯的地位,因而在定时音讯被确认时,markDeletePosition 还是一天前的地位。如果用户此时切换集群,就会造成音讯反复生产,至多一天的音讯会被反复生产。

  • 音讯沉积会导致无奈同步生产进度。这与 Cursor Snapshot 的创立机制无关。

如前文提到的,通过集群 A 构建与集群 B 和 C 之间的 Snapshot 时的申请并不是通过 RPC 接口收回的,而是借由咱们此前提到的“S”带入。在订阅进度或者音讯同步的过程中,音讯沉积不可避免,导致申请也被写入到本地主题。因为对端音讯沉积,且主题外部都会设置超时机制,如果在规定工夫内收不到构建 Snapshot 的申请,Snapshot 就无奈构建胜利,进而无奈同步订阅进度,markDeletePosition 也无奈同步。

  • 定期 Cursor Snapshot 机制。

沿用后面咱们在讲同步生产进度时所提到的案例,在集群 A 和集群 B 中,集群 A 中音讯 1:2 与音讯 1:6 与集群 B 之间有 Snapshot,一般音讯间没有 Snapshot。如果此时集群 A 的 markDeletePosition 更新到 1:4,因为此地位上两个集群之间并不存在 Snapshot,所以集群 A 无奈确认该条音讯在集群 B 中对应音讯地位,这也是以后机制中存在的问题。

综上所述,订阅进度同步过程中存在的问题次要在于只同步 markDeletePosition 而不同步 individuallyDeletedMessages,有时尽管同步了 markDeletePosition,但因为自身机制的问题会影响准确性或者呈现音讯沉积的状况。

订阅进度同步优化

上述问题在租户迁徙过程中会造成大量的反复生产,常见且难解。在一些实在用户在线业务场景中,大量、短暂且可控范畴内的反复生产能够承受,大量的反复生产不容许存在。

为了解决下面的问题,咱们优化了订阅进度同步的逻辑,在集群迁徙之前须要同步 markDeletePosition 和 individuallyDeletedMessages。在同步过程中,最大的问题依然是同一音讯在不同集群中 Message ID 的对应。即便集群 A 的 markDeletePosition 和 individuallyDeletedMessages 全副都同步到集群 B,然而集群 B 依然无奈确定 individuallyDeletedMessages 对应的本集群的 Message ID。

为了解决这个问题,咱们在原始集群(集群 A)发送音讯到集群 B 时,在音讯的 Metadata 里退出了集群 A 里的 Entry Position(Message ID)和 originalClusterPosition 的属性来携带音讯写入的地位。

这样,当咱们在集群 B 进行生产时,能够快捷地从 originalClusterPosition 属性中获取到集群 A 的 Message ID,将其与集群 A 同步到集群 B 的 individuallyDeletedMessages 进行比拟。如果音讯曾经被确认过就间接跳过此条音讯,不再发送给消费者。通过这样的办法实现对已确认音讯的过滤。

具体实现逻辑如上图。在迁徙集群迁徙之前,须要先将集群 1 中 individuallyDeletedMessages 的订阅同步到集群 2。在将音讯推送给消费者之前,音讯会先通过 Filter Entries For Consumer 过滤掉集群 1 中曾经生产过的音讯,将未生产的音讯推送给集群 2 中的消费者。

上述实现逻辑只是一种思路的转换。因为在 Pulsar 中,进度同步实现在集群 1 上,集群 2 中的音讯一直同步到集群 1,通过一直构建 Snapshot 记录集群 1 和集群 2 地位对应关系,这样在集群 1 确认音讯时,能够同步确认集群 2 对应地位。咱们的优化办法是把集群 1 中音讯的地位信息放在音讯里,通过同步 individuallyDeletedMessages 和 markDeletePosition 将进度同步到集群 2,在集群 2 理论生产时过滤。通过这种形式将反复生产管制在用户可承受范畴内。

租户跨集群迁徙的实现

晚期腾讯云外部的集群是共享集群,不同业务场景的用户应用同一套物理集群。有大规模音讯队列运维教训的同学晓得,不同用户混用同一集群会使用户之间相互影响。用户对服务的要求不同,须要为对服务质量要求比拟高的用户搭建独占集群,物理资源隔离来缩小对其余用户的影响。这时须要有平滑的迁徙计划实现集群的顺利迁徙。

租户迁徙整体架构

上图为腾讯云外部实现租户跨集群迁徙的架构图,其中最外围的模块 Lookup Service 是腾讯云外部代理客户端 Lookup 申请的服务模块,保留每个租户到物理集群的映射关系。咱们依据租户将用户客户端 Lookup 申请转发到对应的物理集群,进而获取用户客户端收发音讯时所须要连贯的 Broker 节点。须要留神的是,Lookup Service 不仅仅代理 Lookup 申请,还代理 getPartitionState、getPartitionMydata 和 getSchema 等申请,但不代理蕴含数据流的申请。数据流申请通过 CLB 或 VIP 间接连到集群来收发音讯,并不通过 Lookup Service。

其实 Lookup Service 不是为了跨集群迁徙而诞生,它的次要目标是在多种网络接入拜访场景下,为云上集群提供集中处理不同网络服务路由的能力。在私有云上不只存在通过简略的 Broker IP 就能连贯的内网用户,还须要通过公网 CLB、VPC 或 VIP 服务进行转发,Lookup Service 次要利用于这方面。咱们跨集群迁徙时利用了 Lookup Service 能力来保障集群切换简略顺利地实现,同时借助于跨地区复制的同步性能把数据从原有集群迁徙到指标集群上。迁徙实现后,通过 Lookup Service 的切入能力最终实现租户跨集群迁徙。

租户跨集群迁徙的次要流程

接下来介绍跨集群迁徙的具体流程。

  1. 同步元数据。在指标集群上依照原集群的租户、命名空间、主题和订阅角色等资源,实现元数据同步。
  2. 开启跨地区复制性能,迁徙租户下的主题数据。
  3. 在集群切换前开启订阅进度同步性能,把每个订阅的 individuallyDeletedMessages 和 Mark Delete Messages 同步到指标集群上。
  4. 批改 Lookup Service 中租户与物理集群的对应关系,被动调用 Unload 触发客户端从新寻址。Lookup Service 依据新租户到物理集群的对应关系返回新物理集群的地址。
  5. 在迁徙实现后,清理原集群上的资源。

总结

实现租户跨集群迁徙的形式有很多,本文只分享一种在私有云上实现革新老本较低、复杂程度较小并且可靠性较高的计划。这种计划不须要对现有 Pulsar 客户端和服务端协定做任何改变就能够实现平滑迁徙。

正文完
 0