关于pulsar:Apache-Pulsar-在腾讯-Angel-PowerFL-联邦学习平台上的实践

8次阅读

共计 8536 个字符,预计需要花费 22 分钟才能阅读完成。

腾讯 Angel PowerFL 联邦学习平台

联邦学习作为新一代人工智能根底技术,通过解决数据隐衷与数据孤岛问题,重塑金融、医疗、城市安防等畛域。

腾讯 Angel PowerFL 联邦学习平台构建在 Angel 机器学习平台上,利用 Angel-­PS 反对万亿级模型训练的能力,将很多在 Worker 上的计算晋升到 PS(参数服务器)端;Angel PowerFL 为联邦学习算法提供了计算、加密、存储、状态同步等基本操作接口,通过流程调度模块协调参与方工作执行状态,而通信模块实现了工作训练过程中所有数据的传输。Angel PowerFL 联邦学习曾经在腾讯金融云、腾讯广告联结建模等业务中开始落地,并获得初步的成果。

Angel 机器学习平台:https://github.com/Angel-ML

Angel PowerFL 对联邦通信服务的要求

Angel PowerFL 联邦学习平台在训练任务过程当中,对参与方之间的音讯通信要求极高,要求音讯零碎必须稳固牢靠、放弃高性能且能保障数据安全。Angel PowerFL 的学习工作在训练过程当中,参与方之间会有大量的加密数据通过通信模块传输,Angel PowerFL 对通信服务有以下需要:

➡️ 稳固牢靠

Angel PowerFL 的学习工作时长从几分钟到几小时,算法执行对数据的准确性要求很高,不同算法的数据传输峰值也不一样,这须要通信模块的服务足够稳固,并且不能丢数据。

➡️ 高性能传输

Angel PowerFL 底层通过 Spark 进行计算,Executor 并发执行会产生很多待传输的两头数据,通信模块须要将这些加密后的数据及时传输给对方,这就要求通信服务做到低延时、高吞吐量。

➡️ 数据安全

尽管 Angel PowerFL 所有数据都通过加密模块进行了加密,但参加联邦学习的业务可能散布在不同公司;跨公网进行传输,须要通信模块足够平安,不易被攻打。

为什么抉择 Pulsar

联邦通信服务在做技术预研的时候,思考过 RPC 直连、HDFS 同步、MQ 同步三种技术计划。思考到对平安和性能的要求比拟高,排除了 RPC 直连和 HDFS 同步计划,确定采纳 MQ 同步计划。

MQ 可选的服务很多,比方 Pulsar、Kafka、RabbitMQ、TubeMQ 等。 思考到 Angel PowerFL 对稳定性、可靠性、高性能传输和数据安全有很高的需要,咱们征询了腾讯数据平台部 MQ 团队,他们向咱们举荐了 Pulsar。

随后,咱们对 Pulsar 发展了深刻调研,发现 Pulsar 内置的诸多个性,正好满足了咱们对音讯零碎的要求。Pulsar broker 和 bookie 采纳了计算存储分层架构,保障了数据稳固牢靠,性能良好;Pulsar 反对跨地区复制(geo­-replication),解决了 PowerFL 跨联邦同步 MQ 问题;而 Pulsar 的验证和受权模式也能保障传输平安。

云原生的计算与存储分层架构

Apache Pulsar 是下一代云原生分布式音讯和事件流平台,采纳了计算和存储分层的架构:在 Broker 上进行 Pub/Sub 相干的计算,在 Apache BookKeeper 上存储数据。

和传统的音讯平台(如 Kafka)相比,这种架构有显著的劣势:

  • Broker 和 bookie 互相独立,能够独立扩大和容错,晋升零碎的可用性。
  • 分区存储不受单个节点存储容量的限度,数据分布更平均。
  • BookKeeper 存储安全可靠,保障音讯不失落,同时反对批量刷盘以取得更高吞吐量。

Pulsar Geo­-replication

Pulsar 原生反对跨地区复制(Geo­-replication),能够在多个数据中心的多个 Pulsar 集群中同时同步 / 异步复制数据。还能够在音讯级别,通过 setReplicationClusters 管制音讯复制到哪些集群。

在上图中,无论 Producer P1、P2 和 P3 在什么时候别离将音讯公布给 Cluster A、Cluster B 和 Cluster C 中的 topic T1,这些音讯均会立即复制到整个集群。一旦实现复制,Consumer C1 和 C2 即可从本人所在的集群生产这些音讯。

程度扩大

因为 Pulsar 的存储设计基于分片,Pulsar 把主题分区划分为更小的块,称其为分片。每个分片都作为 Apache BookKeeper ledger 来存储,这样形成分区的分片汇合散布在 Apache BookKeeper 集群中。这样设计不便咱们治理容量和程度扩大,并且满足高吞吐量的需要。

  • 容量治理简略: 主题分区的容量能够扩大至整个 BookKeeper 集群的容量,不受单个节点容量的限度。
  • 扩容简略: 扩容无需从新均衡或复制数据。增加新存储节点时,新节点仅用于新分片或其正本,Pulsar 主动均衡分片散布和集群中的流量。
  • 高吞吐量: 写入流量散布在存储层中,不会呈现分区写入争用单个节点资源的状况。

通过深刻调研后,咱们决定在腾讯 Angel PowerFL 联邦学习平台上应用 Apache Pulsar。

基于 Apache Pulsar 的联邦通信计划

联邦学习的各个业务(Angel PowerFL 称之为 Party,每个 Party 有不同的 ID,如 10000/20000),可能散布在同个公司的不同部门(无网络隔离),也可能散布在不同公司(跨公网),各个 Party 之间通过 Pulsar 跨地区复制性能进行同步复制,总体设计计划如下:

联邦学习的每个训练任务,通过音讯的 producer 和 consumer 连贯所在 Party 的 Pulsar 集群,集群名以 fl-pulsar-[partyID] 进行辨别,训练任务产生须要传输的两头数据后,生产者将这些数据发送给本地 Pulsar 集群。

Pulsar 集群收到数据后,通过 Pulsar proxy 建设的同步复制网络通道,将数据发送给应用方 Party。而应用方 Party 的消费者,会始终监听该训练任务对应的 topic,当有数据达到后,间接生产数据进行下一步的计算。

在 Angel PowerFL 执行训练任务时,driver 和每个 partition 会创立一个 channel 类型变量,该变量和 Pulsar 当中具体的 topic 一一对应,须要替换的数据都会通过生产者发送到这个 topic。

Angel PowerFL 反对多方联邦,因而会有 2+ 个 Pulsar 集群须要同步复制数据。每个联邦学习工作通过各自的 parties 工作参数指定了参与方,生产者在发送音讯时调用 setReplicationClusters 接口,确保数据只在参加 Party 之间传输。

在 Angel PowerFL 的通信模块中,咱们充分利用了 Pulsar 的 geo-­replication、topic 限流、Token Authentication 等性能。上面我来具体介绍如何在 Angel PowerFL 联邦学习平台中应用 Pulsar。

Geo­-replication 去掉 Global ZooKeeper 依赖

在 Angel PowerFL 联邦学习平台上,部署一套残缺的 Pulsar 依赖两个 ZooKeeper 集群,别离是 Local ZooKeeper 和 Global ZooKeeper。Local ZooKeeper 和 Kafka 中的 ZooKeeper 作用相似,用来存储元数据。而 Global ZooKeeper 则在 Pulsar 多个集群间中共享配置信息。

在 Angel PowerFL 场景中,每个 Party 退出前,都要先部署一个 Global ZooKeeper 的子节点,或者共用一套跨公司或跨地区的公共 ZooKeeper,这样不仅会减少部署的难度,也会减少被攻打的危险,不利于新 Party 退出。

Global ZooKeeper 中存储的元数据,次要是集群名 / 服务地址 /namespace 权限等信息。Pulsar 反对创立和退出新集群。咱们通过以下两个步骤注册联邦 Pulsar 集群的信息到 local ZooKeeper,就去除了对 Global ZooKeeper 的依赖:

步骤 1: 注册新退出 Party 的 Pulsar 集群

# OTHER_CLUSTER_NAME 为待注册 Party 的 Pulsar 集群名
# OTHER_CLUSTER_BROKER_URL 为 Pulsar 集群对应的 broker 地址
./bin/pulsar-admin clusters create ${OTHER_CLUSTER_NAME} 
 --url http://${OTHER_CLUSTER_HTTP_URL} 
 --broker-url pulsar://${OTHER_CLUSTER_BROKER_URL}

步骤 2: 授予训练用到的 namespace 拜访集群权限

./bin/pulsar-admin namespaces set-clusters fl-tenant/${namespace} 
 -clusters ${LOCAL_CLUSTR_NAME},${OTHER_CLUSTER_NAME}

对于新退出的 Party,只用提供与其对应的 Pulsar 的集群名 / 服务地址即可实现注册,geo-replication 就能够通过注册信息同步复制数据。

Client 减少 Token 认证

Pulsar 作为 Angel PowerFL 的通信模块,没有退出用户级别的权限管制。为了进一步保障 client 生产和生产数据的平安,咱们参考 Pulsar Client authentication using tokens based on JSON Web Tokens 减少了 token 认证,Angel PowerFL 的训练任务除了配置以后 Party 应用的服务地址外,还须要配置 admin token。

https://pulsar.apache.org/doc…
因为 Angel PowerFL 整套零碎部署在 Kubernetes 上,咱们通过容器筹备 Pulsar 集群须要的 Public/Private keys 等文件,而后注册到 K8S secret 中。

# 生成 fl-private.key 和 fl-public.key
docker run --rm -v "$(pwd)":/tmp 
 apachepulsar/pulsar-all:2.5.2 
 /pulsar/bin/pulsar tokens create-key-pair --output-private-key 
 /tmp/fl-private.key --output-public-key /tmp/fl-public.key
# 生成 admin-token.txt token 文件
echo -n `docker run --rm -v 
 "$(pwd)":/tmp apachepulsar/pulsar-all:2.5.2 
 /pulsar/bin/pulsar tokens create --private-key 
 file:///tmp/fl-private.key --subject admin`
# 将认证相干的文件注册到 K8S
kubectl create secret generic token-symmetric-key 
 --from-file=TOKEN=admin-token.txt 
 --from-file=PUBLICKEY=fl-public.key -n ${PARTY_NAME}

开启多集群 topic 主动回收

Pulsar 集群开启了 geo-­replication 性能后,无奈通过命令间接删除用过的 topic,而 Angel PowerFL 训练任务每次应用的工作是一次性的,工作完结后这些 topic 就没用了,如果不及时删除会呈现大量累积。

对于通过 geo­-replication 开启复制的 topic,能够配置 brokerDeleteInactivetopicsEnabled 参数,开启 topic 主动回收。主动回收无用的 topic,需满足以下几个条件:

  • 以后 topic 没有生产者(producer)或者消费者(consumer)连贯
  • 以后 topic 没有被订阅
  • 以后 topic 没有须要保留的信息

Angel PowerFL 部署的 Pulsar 集群,通过 brokerDeleteInactivetopicsEnabled 开启 topic 主动回收。在执行训练任务的过程中,应用后对每个 topic 按回收条件进行解决。同时,咱们减少了

brokerDeleteInactivetopicsFrequencySeconds 配置,将回收的频率设置为 3 小时。

优化 topic 限流

Angel PowerFL 中的训练任务,在不同的数据集 / 算法 / 执行阶段,生产数据的流量峰值也不同。目前生产环境中单个工作最大的数据量超过 200G/ 小时。训练过程中,如果 Pulsar 连贯中断或者生产和生产过程出现异常,须要从新开始整个训练任务。

为了躲避 Pulsar 集群被单个训练任务冲垮的危险,咱们应用了 Pulsar 的限流性能。Pulsar 反对 message-rate 和 byte-rate 两种生产限流策略,前者限度每秒生产音讯的数量,后者限度每秒生产音讯的大小。Angel PowerFL 将数据切分成多个 4M 的音讯,通过 message-­rate 限度生产音讯的数量。在 Angel PowerFL 中,咱们将 namespace 的音讯限度为 30 条(小于 <30*4=120M/s):

./bin/pulsar-admin namespaces set-publish-rate fl-tenant/${namespace} -m 30

刚开始测试 message-rate 的限流性能时,呈现了限不住的状况(限流设置生效)。腾讯数据平台部 MQ 团队负责 Pulsar 的共事帮忙一起排查,发现设置 topicPublisherThrottlingTickTimeMillis 参数后,限度不能失效。

因而咱们想方法在 broker 端启用了准确的 topic 公布频率限度,优化了限流性能并奉献回社区,详情见 PR-7078: introduce precise topic publish rate limiting。
https://github.com/apache/pul…

优化 topic unloading 配置

Pulsar 依据 broker 集群负载情况,能够将 topic 动态分配到 broker 上。如果领有该 topic 的 broker 宕机,或者领有该 topic 的 broker 负载过大,则该 topic 会立刻重新分配给另一个 broker;而重新分配的过程就是 topic 的 unloading,该操作意味着敞开 topic,开释所有者(owner)。

实践上,topic unloading 由负载平衡调整,客户端将经验极小的提早抖动,通常耗时 10ms 左右。但 Angel PowerFL 初期在执行训练任务时,日志爆出大量因为 unloading topic 导致的连贯异样。日志显示 topic unloading 在一直的重试,但都不胜利:

[sub] Could not get connection to broker: topic is temporarily unavailable -- Will try again in 0.1 s

先来看 broker/namespace/bundle/topic 这四者的关系。Bundle 是 Pulsar namespace 的一个分片机制,namespace 被分片为 bundle 列表,每个 bundle 蕴含 namespace 的整个哈希范畴的一部分。Topic 不间接调配给 broker,而是通过计算 topic 的哈希码将 topic 调配给特定的 bundle;每个 bundle 相互独立,再被调配到不同的 broker 上。

Angel PowerFL 晚期的工作 topic 没有复用,一个 LR 算法训练任务创立了 2000 多个 topic,每个 topic 生产的数据负载也不同,咱们判断上述断连问题是因为短时间内(最小工作十分钟内能完结,同时会有多个工作在运行)大量创立和应用 topic,导致负载不平衡,topic unloading 频繁产生。为了升高 topic unloading 的频率,咱们调整了 Pulsar Bundle 的相干参数:

# 减少 broker 可最大调配 topic 数量
loadBalancerBrokerMaxTopics=500000
# 启用主动拆分 namespace bundle
loadBalancerAutoBundleSplitEnabled=true
# 减少触发拆分 bundle 的 topic 数量
loadBalancerNamespaceBundleMaxTopics=10000
# 减少触发拆分 bundle 的音讯数
loadBalancerNamespaceBundleMaxMsgRate=10000

同时,在创立 namespace 时,把 bundle 数量默认设置为 64。

./bin/pulsar-admin namespaces create fl-tenant/${namespace} --bundles 64

通过以上调整,Angel PowerFL 在工作执行期间没有再呈现过因为 topic unloading 导致的断连。

Pulsar on Kubernetes

Angel PowerFL 的所有服务均通过 Helm 部署在 Kubernetes 上。Pulsar 作为其中的一个 chart,能够很好的利用 K8S 的资源隔离、疾速扩缩容等个性。在 Angel PowerFL 应用 Helm 部署 Pulsar 的实际中,咱们总结了以下教训:

????️ 应用 Local Persistent Volume 作为存储

Pulsar 是 IO 敏感的服务,尤其 bookie 组件,在生产环境中倡议应用 SSD 或独立的磁盘。Angel PowerFL 在跑一些大数据集工作时,Pulsar 经常出现“No Bookies Available”的异样。这期间磁盘的 IO 使用率很高。

咱们通过 Local Persistent Volume 将 bookie 和 ZooKeeper 等其它组件挂载到独自的磁盘,减缓了磁盘 IO 竞争。咱们也测试过将 Pulsar 的 PV 存储换成 Ceph 和 NFS,性能都没有间接应用 Local Persistent Volume 好。

????️ 应用 NodeSelector

Geo-replication 同步复制数据期间,broker 须要拜访对方的 Pulsar proxy 容器。Angel PowerFL 将网关机独自打了标签,通过 NodeSelector 将 broker 装置在可拜访外网的网关机上。

????️ 配置 useHostNameAsBookieID

Bookie 是有状态的组件,为了 bookie pod 重建后服务失常,须要配置 useHostNameAsBookieID,确保向 ZooKeeper 注册的 ID 是 pod 的 hostname。

将来打算

Angel PowerFL 目前应用 Pulsar 快一年了,稳固运行工夫最长的集群曾经超过半年,将来对 Pulsar 的应用打算次要有两个。

???? 降级 Pulsar 到 2.6.x 版本

咱们目前应用的是 Pulsar 2.5.2 版本,因为最近会应用 Pulsar Key_Shared 性能做 Angel-PS 的容灾复原。2.6.0 版本刚好有加强 Key_Shared 订阅模式,所以咱们预计将来一个月降级到 Pulsar 2.6.x。
https://github.com/apache/pul…

???? Pulsar on K8S 反对多磁盘挂载

Angel PowerFL 所有服务都运行在 Kubernetes 上(除了工作应用的 YARN 计算资源),Pulsar 作为其中的一个 chart 和其它服务一起部署,应用 Local Persistent Volume 作为存储。但目前 bookie 只反对挂载一块磁盘(目录),对于多磁盘的机器没有更充沛的利用,咱们打算减少该个性。

总结

咱们介绍了在人工智能利用场景下,应用 Pulsar 作为 Angel PowerFL 通信模块的相干实际。在计划实现过程当中,咱们充沛应用了 Pulsar 诸多内置个性,并依据本身需要做了相干优化,如 geo-­replication 去掉 Global ZooKeeper 依赖,为 client 减少 token 认证,开启多集群 topic 主动回收,优化 topic 限流性能和 topic unloading 配置等。

Pulsar 作为下一代云原生分布式音讯和流平台,有泛滥吸引人的性能,在直播与短视频、批发与电子商务、媒体、金融等行业有广泛应用,期待 Pulsar 在不同的利用场景下一直有新的案例落地。

致 谢

特别感谢腾讯数据平台部 MQ 团队,在 Angel PowerFL 平台应用 Pulsar 过程中给与的技术领导。该团队在 Apache Pulsar 和 TubeMQ 上有多年的技术积攒,踊跃为 Pulsar 社区做出了巨大贡献。Pulsar 社区非常沉闷,正处于疾速成长之中。咱们会继续关注并和 Apache Pulsar 社区深刻单干,把优化的性能奉献给 Pulsar 社区,和社区其余用户一起进一步欠缺、优化 Pulsar 的个性和性能,独特建设一个更弱小欠缺的 Pulsar 社区。

作者简介

张超,腾讯数据平台部高级工程师,负责 Angel PowerFL 联邦通信 /PowerFL on K8S 等工作。他和腾讯数据平台部 MQ 团队一起将 Apache Pulsar 引入 PowerFL 联邦学习平台,开启了 Pulsar 在机器学习畛域的利用。

正文完
 0