乐趣区

关于消息中间件:Apache-Pulsar-在能源互联网领域的落地实践

对于 Apache Pulsar

Apache Pulsar 是 Apache 软件基金会顶级我的项目,是下一代云原生分布式音讯流平台,集音讯、存储、轻量化函数式计算为一体,采纳计算与存储拆散架构设计,反对多租户、长久化存储、多机房跨区域数据复制,具备强一致性、高吞吐、低延时及高可扩展性等流数据存储个性。
GitHub 地址:http://github.com/apache/pulsar/

案例导读:本案例介绍了清华大学能源互联网翻新研究院将 Apache Pulsar 落地能源互联网方向的实际。Pulsar 的云原生架构、Schema、Functions 等个性满足了相干业务需要,也加重了他们开发和运维累赘。

浏览本文须要大概 8 分钟。

团队及业务简介

能源互联网是电力与能源工业倒退的方向。随着信息、通信和互联网技术的飞速发展,可获取的数据量正以爆炸式形式迅猛增长,传统的数据处理办法已难以应答这些海量且增长极快的信息资产,大数据实践正是在这样的状态下应运而生。大数据处理技术能帮忙咱们透过海量数据疾速分辨其运行状态及发展趋势,在纷纷的世界中独具洞察力。

清华大学能源互联网翻新研究院能源大数据与凋谢生态钻研核心会集了国内外能源及电力大数据畛域的多位专家,致力于推动大数据基础理论和实际利用的全面翻新。能源大数据与凋谢生态钻研核心将大数据技术利用于能源互联网、智能电网和智慧用能等工程场景,联合高性能优化、并行计算和人工智能等先进技术,研发实用于能源电力行业特点的大数据 / 云计算平台,和基于数据驱动的能源电力系统的高级利用,从而实现大数据产业的倒退,造成以数据为外围的新型产业链,推动我国能源产业的转型与降级。

挑 战

咱们团队的业务次要是与电力相干的物联网场景,旨在实现用户对传感器等设施数据的需要开发。咱们团队规模较小,但工作繁冗,心愿能更快更稳地实现客户的需要。

在整顿业务需要后,咱们提出当前端即服务(BaaS)为主、基于音讯的服务计划。在物联网畛域内,基于这样的解决方案,咱们能够共用更多基础设施服务,同时能够疾速应答不同需要进行业务开发。思考到非凡的业务需要,咱们的平台须要具备以下个性:

  • 多租户:平台要实现业务拆散,服务不拆散,又能够确保安全审核,满足客户对数据安全性的敏感需要,就必须反对多租户。此外,还能够在通信、数据、业务这三方面提供一些根底服务,比方自定义数据结构的 Schema Registry,自定义数据归属的 ACL 权限治理(减少删改的 API 接口),以及实现各种业务的自定义函数引擎。
  • Schema Registry:满足不同需要和利用场景下设施多变的数据结构,提供容许自定义数据结构的 Schema Registry。
  • 通用 API:提供蕴含减少删改的 HTTP RESTful APIs 和相应的 WebSocket 接口,确保在通信上提供根底服务,并基于这一根底服务进行扩大。
  • ACL 权限治理:可自定义数据的 ACL 权限管制服务,保障数据安全。
  • 时序数据库:少数状况下,物联网场景都在和时序数据打交道,所以咱们抉择了基于 PostgreSQL 的开源 TimeScaleDB,并且依靠 TimeScaleDB 做了一系列时序数据的聚合查问接口。
  • 用户自定义 functions:实现各种业务的自定义函数引擎。

之前咱们应用基于 RabbitMQ 和 Celery 的计划来实现用户自定义 functions 的函数引擎。这一计划的最后应用成果良好,但随着业务的增长,问题越来越多。咱们的小团队不得不花更多工夫来解决问题和优化整体计划。当 Celery 作为工作队列时,这些问题尤为重大。

咱们破费大量的工夫和精力解决的问题次要有两个:

  • 须要认真配置 Celery 的 worker 和 task,防止执行工夫长的工作阻塞其余工作;
  • Worker 更新时须要中断服务,更新工夫也绝对较长。

此外,在非凡场景中,如果单个音讯比拟大且音讯解决工夫长时,Celery 和 RabbitMQ 的内存累赘都比拟大。

随着客户数量和我的项目数量的减少,这些问题变得日益突出,咱们决定找一个新产品代替原有计划。

为什么抉择 Apache Pulsar?

如上所述,咱们心愿消息中间件能够提供以下个性:

  • 多租户
  • 可靠性和高可用
  • 反对多协定,尤其能够很不便地转换协定:在物联网畛域,咱们须要应答不同的通信协议,把不同通信协议的数据全副导入到消息中间件中。
  • 反对多语言:咱们团队次要应用 Go 语言,但咱们会和很多应用其余语言的团队单干,所以消息中间件最好能够反对其余语言。
  • 作为轻量级计算引擎实现简略的音讯解决。

在调研不同的消息中间件时,咱们很快发现了 Pulsar。通过 Pulsar 的文档和公布日志,咱们理解到 Pulsar 有很多优良的个性,所以决定对 Pulsar 进行测试和评估。通过深入研究、学习,咱们发现 Pulsar 的云原生架构、Schema、Functions 等非常适合咱们的业务需要。

  • 云原生:Pulsar 反对云原生,领有诸多优良的个性,如计算与存储拆散,能够很好地利用云的弹性伸缩能力,保障扩容和容错。此外,Pulsar 对 Kubernetes 的良好反对也在肯定水平上帮忙咱们将一部分业务轻松迁徙到了 Kubernetes 上。
  • Pulsar Functions:Pulsar Functions 是一个优良的轻量级计算引擎,能够很好地取代 Celery 计划。咱们能够更多地尝试应用 Pulsar Functions 来解决业务,这是咱们抉择 Pulsar 的次要起因。
  • 分层存储:这一个性可能节约存储老本。咱们的应用场景会产生很多传感器的原始数据,须要作为冷数据存储。借助分层存储,咱们能够间接将这些冷数据存储在价格更低的存储服务中,也无需开发额定的服务来存储数据。
  • MQTT/MoP:Pulsar 对各种协定的兼容展现了社区的开放性。在 MoP 公布前,咱们开发了 MQTT 协定的转发工具,把 MQTT 协定上的数据转发到 Pulsar 中。
  • Pulsar Schema:咱们的平台通过 JSON 来形容数据 schema,通过对接 Pulsar Schema 和咱们本人的 Schema Registry,能够实现音讯序列化的工作。目前 Pulsar 在 Go Schema 的性能仍处于起步阶段,咱们也会尝试做一些实际与奉献。
  • 多语言:咱们很看重多语言反对,尤其是 Go 语言。Pulsar 有 Go 语言相应的客户端、Go function runtime、基于 Go 语言实现的 Pulsarctl 等。咱们也心愿 Pulsar 将来能够反对更多语言,因为咱们不能预感客户的需要,反对多语言可能帮忙咱们更轻松地解决问题。
  • Pulsar Manager & Dashboard:Pulsar 在各个层级都启动了接口来获取 Metrics。Pulsar 的其余工具(如 Prometheus、Grafana、Pulsar Manager)可能帮忙咱们加重运维、优化、排错的投入。
  • 开源:Pulsar 社区凋谢、沉闷、敌对。有 StreamNative 这样的公司做撑持,用户能够释怀地抉择 Pulsar,把业务迁徙到 Pulsar 上。

深刻理解 Pulsar 后,咱们决定对 Pulsar 进行测试,并尝试迁徙一个生产环境的利用。

迁徙试验:楼宇智慧用电

楼宇智慧用电是咱们在用电剖析和预测畛域做的一次尝试,咱们心愿采集到办公室中每一个用电点的用电信息。在研究院新办公楼装修初期,咱们进行了技术评估,将应用 zigbee 协定的智能插座列入了装修计划。整个部署蕴含三层楼,约 700 个智能插座和 50 个 zigbee 网关。插座部署在办公场合的所有用电点,蕴含工位插座、墙壁插座以及中央空调风机插座。所有数据通过智能插座厂商提供的局域网播送计划,将播送数据转发到 Pulsar 中实现数据点的采集和预处理。目前用电量数据每 10 秒钟上送一次,其余与用户相干的操作(包含开关插座、插拔用电设施)则实时上送。针对这些数据,咱们做了一些数据可视化的尝试,并把数据奉献给研究院的其余团队进行剖析,或用作开发算法的参考信息和原始数据。

基于智能插座设施厂商提供的 MQTT 计划,咱们尝试将 MQTT 协定的数据都转发到 Pulsar 中。在转发过程中,咱们遇到的次要问题是 MQTT topic 和 Pulsar topic 的映射。咱们的解决方案是间接把所有的 MQTT 数据转发到同一个 Pulsar topic 中,同时把局部元数据包装在转发的音讯中,再通过 Pulsar Functions 做音讯路由,把音讯转发到不同的业务 topic 中。下图展现了如何将传感器产生的数据传送至平台并最终入库。

在从 MQTT 转发数据到 Pulsar 的过程中,咱们默认把所有设施的数据都转发到同一个 topic 中,并通过 verificate function 进行验证(包含解密和内容查看),保障数据的合法性。非法的数据会被转发到一个两头 topic 期待音讯路由散发,音讯散发的 function 会从数据中解析出设施类型和音讯类型,再转发到对应业务 topic 中,期待被对应业务 topic 绑定的 ETL function 做解决。在应用 ETL function 解决时,咱们也会依据设施类型提取不同的数据,对网关设施提取网关状态、设施信息,对插座提取用电数据和插座的状态信息。这些信息会匹配咱们平台的 Schema Registry 数据结构,咱们再把生成的数据做 Schema Mapping(通过 Functions 实现),最初对立转发这些结构化的数据到 sink topic 中,由 sink function 写入到数据库。

楼宇智慧用电的迁徙测试无力验证了 Pulsar 合乎咱们的需要。在迁徙过程中,咱们查阅了 Pulsar 文档,从社区取得了大力支持和帮忙,迁徙过程高效、顺利。借助 Functions 的凋谢与便当,咱们很快实现了流程图中所有 function 的开发和调试,上线了整个业务零碎。

在业务迁徙过程中,Pulsar 运行状态良好,团队统一认为 Pulsar 能够帮忙咱们加重开发和运维累赘,所以咱们抉择 Pulsar 作为钻研核心惟一的消息中间件服务,咱们的小团队也开始追随 Pulsar 一起进行一系列云原生迁徙和优化工作。

决定计划后,咱们将 Apache Pulsar 进一步利用到电网智能传感和智能变电所的场景,这些场景都与物联网、能源和电力相干。下文将具体介绍咱们如何应用 Pulsar 和 Pulsar Functions,以及如何通过 Pulsar Functions 简化传感器数据流的相干解决。

Pulsar x 电网智能传感

电网智能传感场景次要基于清华大学能源互联网翻新研究院与电网公司单干的输电线路智能多参数传感器集成钻研我的项目。该项目标传感器来自不同的厂家,散布在输电线路的各个地位,传感器类型因而也不尽相同,包含杆塔、杆塔上、输电线路侧等十多种。整个零碎目前接入总长度约六百公里,蕴含六百多个杆塔的输电线路传感器。这一场景次要负责对各种传感器的数据进行在线监测和告警,同时,咱们也独自针对电压传感器做了暂态电压剖析。

这个利用场景有两个难点:一是来自不同厂商的传感器没有对立的通信协议,有的应用电力相干的 IEC104 规约,有的应用 protobuf 或其余厂商自定义协定;二是我的项目数据量比拟大,有些传感器可能会单次产生 20 MB 甚至更大的音讯,有些传感器则每秒上传一次数据。

借助 Pulsar,咱们抉择在 producer 端不做任何数据处理,间接将数据转发到 Pulsar 中,再通过 Pulsar Functions 做进一步的数据预处理和其余业务操作。以电压传感器为例,电压传感器会产生三类数据,别离是心跳数据、稳态波形数据和暂态波形数据。其中心跳数据和稳态波形数据通过 protobuf 协定传输,暂态数据则通过 zip 压缩文件的模式传输。接管到 protobuf 的数据后,借助 Pulsar Functions 进行一系列的数据处理,包含通过解密 function 实现数据解密和 protobuf 的反序列化,再对数据进行路由,通过对应的 ETL function 做数据处理和解析,最初通过 Schema Mapping 将数据入库。咱们把这个流程的每一步都封装成独立的 Pulsar function,这样做出于三点思考:

  • 咱们心愿监控到整个数据流过程中每一个环节的状态,采集每个过程的 metrics,并且观测一些重点指标,比方是否存在 backlog 积压。状态监测不便咱们调整每个环节 function 的并行数量。
  • 使整个数据流更加灵便,便于咱们在不同流程中新增和删除 function。
  • 更大程度地保障了咱们能够重用本人保护的 function。

这个计划也遇到了一些小艰难,比方因为 function 比拟多,咱们须要花更多工夫部署、保护每一个过程的两头 topic。目前,咱们的解决方案是间接写对应的代码一次性实现部署和保护。尽管须要投入更多精力,但咱们认为这种 function 的开发和部署模式是值得的。上文提到电压传感器除了会产生 protobuf 的两种数据外,还会产生一种暂态数据。暂态数据个别在电网产生故障或异样时产生,相似电力系统的快照,记录故障产生前到产生时,再到产生后的波形状态。在电力系统中,暂态数据通常有规范的存储计划和特定的解析接口。绝对于传感器产生的其余数据来说,这类数据的特点是比拟大,动辄几十兆。咱们应答暂态数据的计划是先解压缩这些数据,再剖析数据文件。这里咱们借助了 Pulsar Functions 多语言反对的个性,流程图中的蓝色局部应用 Go function 实现,黄色局部应用 Python 实现,Python 有一个解析电网暂态数据的库,能够调用,就免去了咱们本人花工夫实现一套 Go 版本解析接口的工作。

Pulsar x 智能变电所

智能变电所是咱们在变电零碎中变电环节的一些尝试,这个我的项目基于咱们单干的智能输变电设施厂商,心愿基于开关柜等变电所设施实现变电所的数据接入。这个我的项目的次要指标是实现实时监测、故障诊断和异样监测这三大性能。

在智能变电所的场景中,通常由设施生产厂商提供设施的故障诊断算法或诊断利用,咱们须要将不同性质的算法或利用集成到现有计划中。客户提供的算法可能间接在 Pulsar Functions 中调用,也可能是曾经编译好的可执行文件,甚至可能是其余语言的实现,比方 R 语言。针对这一系列问题,咱们先把客户提供的实现封装在 Docker 容器中,在容器中实现一个最小的 Pulsar function runtime,再通过 Docker proxy function 和 Docker endpoint 沟通,在触发 function 时创立对应算法的容器实现计算,最初将后果回传到 Pulsar 对应的 topic 中。

另外,在这一场景中咱们也遇到了一些利用层面的需要,比方音讯推送。咱们借助 Pulsar Functions 实现了一些业务性能,在 Functions 中能够很不便地调用不同服务商的接口,实现音讯推送,比方短信、邮件、应用程序的推送服务。此外,通过 Pulsar Functions,咱们得以把音讯推送的业务需要从平台中解藕进去,把服务做成 function,便于后续在有同样需要的场景中间接应用。

应用 Pulsar 遇到的问题及解决方案

咱们在应用 Pulsar 的过程中遇到了一些问题,下文会分享解决这些问题的一些教训,心愿能够对筹备或者曾经在应用 Pulsar 的同学提供一些帮忙。

第一个是对于 Pulsar 默认音讯大小的问题。在默认配置下,Pulsar 反对的最大音讯是 5 MB,在上文提到的智慧电网案例中,单条音讯有时会超过 20 MB。咱们依据文档批改了 broker 配置文件中的 MaxMessageSize 参数,但批改的配置并没有失效,超过 5 MB 的音讯仍然不能失常传递到 Pulsar 中。于是咱们在 Pulsar 社区寻求帮忙,失去了社区的迅速回应。这个问题的次要起因是 Pulsar 2.4.0 中 MaxMessageSize 没有同步到 BookKeeper,所以即便 broker 能够接管更大的音讯,broker 依然不能把消息传递到负责存储的 BookKeeper 中。因而除了批改 MaxMessageSize 值外,还须要批改 broker 和 BookKeeper 中 nettyFrameSizeBytes 相干配置,这些配置保持一致,Pulsar 就能够解决更大的单条音讯。

第二个问题是咱们在应用 Pulsar Functions 解决数据时,topic 中可能会呈现 backlog 积压越来越多的状况。Backlog 包含没有发送给 Functions(consumer)的数据,也包含已发送但未被 Functions(consumer)ack 的数据。依据咱们的教训,在 Functions 场景下,音讯积压可能是因为 function 解决单条音讯的速度慢,解决工夫长,或者 function 解体。如果是因为 function 解决音讯慢,一种解决方案是减少 function 的并行数量,再具体分析执行速度慢的起因并进行优化;另一种计划是把简单的 function 分成多个简略的 function,也就是在智能电网场景中提到的把一个简单的 function 拆成多个 function,通过 function 的链式模式把整个流程链接起来。这样咱们能够很不便地观测每一个 function 的状态,也能够针对某个 function 做进一步的优化。如果因为 function 解体造成 backlog 积压,则须要保障 function 的稳定性,并借助 function 的 log topic 进行调试。

第三个问题是当 producer 数量减少时,很难对立治理和观测每个 producer 的状态,即 producer 与 broker 之间的通信状态和 producer 与数据源之间的通信状态。针对这个问题,咱们目前的解决方案是给 producer 减少心跳音讯到对应的心跳 topic 做整体监控,同时,监控 producer 和 broker 的状态连贯。通过这些改变,咱们能够较好地聚合观测 producer 的运行状态。咱们留神到 GitHub 上也在探讨相似问题,期待和社区一起提出更优良的解决方案。

期 待

咱们期待 Pulsar 能改善或减少以下性能。

  • Pulsar Functions Mesh 实现了对 function 进行相似于 Kubernetes 的服务编排,咱们期待该性能的公布。上文提到咱们实现了链式 function 的解决方案,但这种形式在保护上遇到很大挑战,心愿 Functions Mesh 能够解决这个问题。
  • 心愿 Pulsar functions 反对更多语言的 runtime。咱们用 function 做 Docker proxy function,这个计划尽管可行,但心愿有更优良的解决方案。
  • IoT 场景很重视边缘计算,咱们心愿 Pulsar 能够在边缘计算上做一些尝试。咱们关注到 Pulsar 容许将 Functions 的音讯推送到另一个 Pulsar 集群中,容许 Functions 与内部 Pulsar 集群通信。通过这一改变,能够尝试将 Pulsar 部署到边缘设施上,并应用 Pulsar Functions 在这些设施上进行计算。部署 Pulsar 对内存的需要较大,在一些运算能力较弱的边缘设施上部署 Pulsar 比拟艰难,心愿 Pulsar 能在后续版本中优化或提供其余计划解决这一困扰。

结 语

作为一个开源我的项目,Pulsar 正在疾速倒退,文档更新迅速,社区响应及时,社区规模一直壮大。咱们心愿深刻理解 Pulsar,参加 Pulsar 开发奉献,和社区分享咱们的实践经验,与 Pulsar 社区独特倒退。

在应用 Pulsar 的过程中,咱们遇到一些困惑,感激 StreamNative 团队小伙伴们的大力支持,帮忙咱们顺利将 Pulsar 利用到上述业务场景中。将来,咱们会踊跃尝试 Pulsar 的各种新性能,并将 Pulsar 利用于更多的能源互联网场景中。

作者简介

胡军,清华大学电机系副教授,清华大学能源互联网翻新研究院能源大数据与凋谢生态钻研核心执行主任,IEEE Member,CIGRE Member。

相干浏览

  • 案例 | Apache Pulsar 助力江苏挪动重塑 5G 时代计费撑持零碎
  • 案例 | Apache Pulsar 在腾讯 Angel PowerFL 联邦学习平台上的实际
  • Apache Pulsar 在 BIGO 的性能调优实战(上)
  • 新性能详解:Pulsar Function Mesh

点击 链接,获取 Apache Pulsar 硬核干货合集

退出移动版