关于apache:博文推荐|使用-Apache-Pulsar-构建边缘应用程序

4次阅读

共计 7531 个字符,预计需要花费 19 分钟才能阅读完成。

本文由 StreamNative 组织 Apache Pulsar 中文社区志愿者翻译。原文来自 StreamNative 英文博客《Building Edge Applications With Apache Pulsar》,作者 Tim Spann,StreamNative 布道师。译者:YOLO,就任于 BSC BOMC ORP 的 bomc 团队。原文链接:https://streamnative.io/blog/…

近年来,近程连贯设施的爆炸性增长为集中式计算范式带来了挑战。受到网络和基础设施的限度,企业越来越难以在不呈现提早或性能问题的状况下,在数据中心或云中挪动和解决所有设施生成的数据。因而,边缘应用程序逐步衰亡。据 Gartner 预计,到 2025 年,企业将在数据中心或云之外创立和解决 75% 的数据。

那么什么是边缘应用程序?边缘利用程序运行在数据源上或其左近,如物联网设施、本地边缘服务器、边缘执行。边缘计算使计算、存储、缓存、治理、告警、机器学习和路由都可能在数据中心和云之外进行。批发、农业、制作、运输、医疗和电信等行业通过采纳边缘应用程序,从而实现更低的提早、更好的带宽、更低的基础设施老本和更高效的决策。

本文将为大家介绍开发边缘应用程序所面临的一些挑战,以及 Apache Pulsar 利用于边缘应用程序的解决方案。本文还将分享一个示例,逐渐展现如何应用 Pulsar 构建边缘应用程序。

要害挑战

边缘计算的分散性在带来许多益处的同时也带来了挑战,其中次要包含:

  • 边缘应用程序通常须要反对各种设施、协定、语言和数据格式。
  • 来自边缘应用程序的通信须要与来自传感器、日志和应用程序的事件流以疾速但不平均的速度进行异步。
  • 数据的边缘生产者依据设计要求须要部署不同的消息传递集群。
  • 从设计上看,边缘应用程序在天文上具备分散性和多样性的特点。

解决办法

须要一个适应性强、混合、反对天文复制且可扩大的开源解决方案,以可能解决构建边缘应用程序所面临的问题。领有泛滥用户的开源我的项目能够提供宽泛的社区反对,以及边缘应用程序所需的丰盛生态系统,包含适配器、连接器和扩大等。在过来二十年中,基于我与不同技术和开源我的项目的单干教训,我置信 Apache Pulsar 满足了边缘应用程序的需要。

Apache Pulsar 是一个开源、云原生、分布式音讯流平台。自 2018 年 Pulsar 成为 Apache 软件基金会顶级我的项目以来,它的社区参加、周边生态增长和寰球使用率都飞速增长。Pulsar 之所以可能解决边缘计算中存在的诸多挑战,归功于以下几点:

  • Apache Pulsar 反对多种 Schema 下的疾速消息传递、元数据和多种数据格式。
  • Pulsar 反对 Go、C++、Java、Node.js、Websockets 和 Python 等多语言客户端。此外,还有社区开发者提供的 Haskell、Scala、Rust 和.Net 开源客户端,以及 Apache Flink 和 Apache Spark 的流解决库。
  • Pulsar 反对多种音讯协定,包含 MQTT、Kafka、AMQP 和 JMS。
  • Pulsar 的跨地区复制性能解决了分布式设施的地位问题。
  • Pulsar 云原生的架构让其能够在多云、本地或 Kubernetes 环境中运行。它还能够适配小型边缘网关,以及像 NVIDIA Jetson Xavier NX 这样弱小的设施。

在本示例中,咱们在 NVIDIA Jetson Xavier NX 上构建边缘应用程序,它为咱们运行边缘 Apache Pulsar 单机 broker、多个 web 摄像头和深度学习边缘应用程序提供了足够的能力。我的边缘设施蕴含 384 个 NVIDIA CUDA® 内核和 48 个 Tensor 内核、6 个 64 位 ARM 内核和 8 GB 128 位 LPDDR4x RAM。在后续博客中,我将向大家展现,即便在 Raspberry PI 4s 和 NVIDIA Jetson Nano 等更为简略的设施上运行 Pulsar,依然能够满足疾速边缘事件流的须要。

架构

上文介绍了解决方案的物理构造,那么当初的问题是如何对传入数据有逻辑地搭建利用架构。对于不相熟 Pulsar 的人,首先须要理解到每个主题都属于租户和命名空间,如下图所示。

这些逻辑构造反对咱们依据各种规范(如数据的原始起源和不同的业务)将数据进行分组。一旦咱们决定了租户、命名空间和主题,咱们就须要确定收集剖析所需的额定数据所需字段。

接下来,咱们须要确定数据的格局。依据不同的架构,它能够与原始格局雷同,也能够依据传输、解决或存储的具体要求进行转换。此外,在许多状况下,咱们的设施、设施、传感器、操作系统或传输模式会要求咱们抉择特定的数据格式。

在本文中,咱们将应用 JSON 数据格式,它能够满足简直任何语言和少数人的可读性需要。此外,Apache Avro 作为一种二进制格局,也不失为一个不错的抉择,但我的系列博客会抉择最简略的格局。

选定数据格式之后,咱们可能须要在传感器、机器学习分类、日志或其余起源之外增加额定字段来丰盛原始数据。我喜爱增加 IP 地址、mac 地址、主机名、创立工夫戳、执行工夫,以及一些对于设施运行状况的字段,如磁盘空间、内存和 CPU。如果您认为没有必要,或者您的设施曾经广播设备运行状况,则能够适当增减。尤其是当领有数千台设施时,这些字段能够帮忙咱们调试程序。因而,我习惯在带宽容许的状况下增加这些数据。

咱们须要为事件记录找到主键或惟一标识符,物联网数据通常没有自带。咱们能够在创立记录时用 UUID 生成器合成一个。

领有个字段列表后,咱们须要为数据设置一个 schema,并确定字段名、类型、默认值和是否为空。一旦定义了一个 schema,咱们就能够应用 JSON schema 或应用字段构建一个类,进而应用 Pulsar SQL 查问主题中的数据。对于物联网应用程序而言,通常会用到此类事件的工夫序列主数据存储。我举荐 Aerospike、InfluxDB 或 ScyllaDB。咱们能够依据场景和需要应用 Pulsar IO Sink 连接器或其余机制。必要时,咱们还能够应用 Spark 连接器、Flink 连接器或 NiFi 连接器。

咱们的最终事件会和如下所示的 JSON 示例相似。

{"uuid": "xav_uuid_video0_lmj_20211027011044", "camera": "/dev/video0", "ipaddress": "192.168.1.70", "networktime": 4.284832000732422, "top1pct": 47.265625, "top1": "spotlight, spot", "cputemp": "29.0", "gputemp": "28.5", "gputempf": "83", "cputempf": "84", "runtime": "4", "host": "nvidia-desktop", "filename": "/home/nvidia/nvme/images/out_video0_tje_20211027011044.jpg", "imageinput": "/home/nvidia/nvme/images/img_video0_eqi_20211027011044.jpg", "host_name": "nvidia-desktop", "macaddress": "70:66:55:15:b4:a5", "te": "4.1648781299591064", "systemtime": "10/26/2021 21:10:48", "cpu": 11.7, "diskusage": "32367.5 MB", "memory": 82.1}

边缘生产者

接下来咱们在 NVIDIA Jetson Xavier NX 上测试一些库、语言和客户端,看看哪个最适宜咱们的场景。在 NVIDIA Jetson Xavier NX 版本 ARM 装置 Ubuntu 零碎运行多个库进行原型设计之后,我发现了如下技术选项,它们能够生成合乎我的应用程序所需的音讯。对于这个边缘平台来说,这些尽管不是惟一门路,但不失为十分好的抉择。

  • Go Lang Pulsar Producer
  • Python 3.x Websocket Producer
  • Python 3.x MQTT Producer
  • Java 8 Pulsar Producer
  • Go Lang Pulsar Producer

Go 语言 Pulsar 生产者

package main

import (
        "context"
        "fmt"
        "log"
        "github.com/apache/pulsar-client-go/pulsar"
        "github.com/streamnative/pulsar-examples/cloud/go/ccloud"
       "github.com/hpcloud/tail"
)

func main() {client := ccloud.CreateClient()

    producer, err := client.CreateProducer(pulsar.ProducerOptions{Topic: "jetson-iot",})
    if err != nil {log.Fatal(err)
    }
    defer producer.Close()

    t, err := tail.TailFile("demo1.log", tail.Config{Follow:true})
        for line := range t.Lines {if msgId, err := producer.Send(context.Background(),
&pulsar.ProducerMessage{Payload: []byte(line.Text),
        }); err != nil {log.Fatal(err)
        } else {
            fmt.Printf("jetson:Published message: %v-%s \n",
msgId,line.Text)
        }
    }
}

Python3 Websocket 生产者

import requests, uuid, websocket, base64, json

uuid2 = uuid.uuid4()
row = {}
row['host'] = 'nvidia-desktop'
ws = websocket.create_connection('ws://server:8080/ws/v2/producer/persistent/public/default/energy')
message = str(json.dumps(row) )
message_bytes = message.encode('ascii')
base64_bytes = base64.b64encode(message_bytes)
base64_message = base64_bytes.decode('ascii')
ws.send(json.dumps({ 'payload' : base64_message, 'properties': { 'device' : 'jetson2gb', 'protocol' : 'websockets'},'key': str(uuid2), 'context' : 5 }))
response =  json.loads(ws.recv())
if response['result'] == 'ok':
            print ('Message published successfully')
else:
            print ('Failed to publish message:', response)
ws.close()

带有 Schema 的 Java Pulsar 生产者

public static void main(String[] args) throws Exception {JCommanderPulsar jct = new JCommanderPulsar();
        JCommander jCommander = new JCommander(jct, args);
        if (jct.help) {jCommander.usage();
            return;
        }
        PulsarClient client = null;

        if (jct.issuerUrl != null && jct.issuerUrl.trim().length() >
0 ) {
            try {client = PulsarClient.builder()
                        .serviceUrl(jct.serviceUrl.toString())
                        .authentication(AuthenticationFactoryOAuth2.clientCredentials(new URL(jct.issuerUrl.toString()),new URL(jct.credentialsUrl.toString()), jct.audience.toString())).build();} catch (PulsarClientException e) {e.printStackTrace();
            } catch (MalformedURLException e) {e.printStackTrace();
            }
        }
        else {
            try {client = PulsarClient.builder().serviceUrl(jct.serviceUrl.toString()).build();} catch (PulsarClientException e) {e.printStackTrace();
            }
        }

        UUID uuidKey = UUID.randomUUID();
        String pulsarKey = uuidKey.toString();
        String OS = System.getProperty("os.name").toLowerCase();
        String message = "" + jct.message;
        IoTMessage iotMessage = parseMessage("" + jct.message);
        String topic = DEFAULT_TOPIC;
        if (jct.topic != null && jct.topic.trim().length()>0) {topic = jct.topic.trim();
        }
        ProducerBuilder<IoTMessage> producerBuilder = client.newProducer(JSONSchema.of(IoTMessage.class))
                .topic(topic)
                .producerName("jetson").
                sendTimeout(5, TimeUnit.SECONDS);

        Producer<IoTMessage> producer = producerBuilder.create();

        MessageId msgID = producer.newMessage()
                .key(iotMessage.getUuid())
                .value(iotMessage)
                .property("device", OS)
                .property("uuid2", pulsarKey)
                .send();
        producer.close();
        client.close();
        producer = null;
        client = null;
    }

   private static IoTMessage parseMessage(String message) {

        IoTMessage iotMessage = null;

        try {if ( message != null && message.trim().length() > 0) {ObjectMapper mapper = new ObjectMapper();
                iotMessage = mapper.readValue(message, IoTMessage.class);
                mapper = null;
            }
        }
        catch(Throwable t) {t.printStackTrace();
        }

        if (iotMessage == null) {iotMessage = new IoTMessage();
        }
        return iotMessage;
    }

java -jar target/IoTProducer-1.0-jar-with-dependencies.jar --serviceUrl pulsar://nvidia-desktop:6650 --topic 'iotjetsonjson' --message "...JSON…"

你能够在这里找到所有的源代码。

当初,咱们决定如何在设施上执行应用程序。能够应用零碎附带的调度器,如 cron 或一些附加组件。作为参考,我常常应用 cron、MiNiFi 代理、Shell 脚本,或者将应用程序作为服务间断运行。您须要自行配置您的设施和传感器以获得最佳调度。

验证数据并进行监控

当初,咱们有了源源不断的事件流进入咱们的 Pulsar 集群,咱们能够验证数据并监控停顿。以 StreamNative Cloud Manager 界面为例,如下图所示。咱们还能够抉择查看此处记录的 Pulsar 指标端点。

通过 REST 查看统计数据

  • http://:8080/admin/v2/persist…
  • http://:8080/admin/v2/persist…

通过 Admin CLI 查看统计信息

bin/pulsar-admin topics stats-internal persistent://public/default/mqtt-2

查找主题所在的订阅

http://nvidia-desktop:8080/ad…

通过 REST 从订阅中生产

http://nvidia-desktop:8080/ad…

通过 CLI 生产音讯

bin/pulsar-client consume "persistent://public/default/mqtt-2" -s "mqtt2" -n 5

通过 Pulsar SQL 查问主题

select * from pulsar."public/default".iotjetsonjson;

后续步骤

当初,咱们曾经构建了一个边缘应用程序,它能够以事件速度传输数据,并将数千个其余应用程序的流数据连贯到 Apache Pulsar 集群中。接下来,咱们能够应用 Flink SQL 增加丰盛的实时剖析。由此咱们能够进行高级流解决、整合事件流以及进行大规模数据处理。

延长浏览

如果您有趣味理解无关边缘利用的更多信息并构建本人的连接器,请参阅以下资源:

  • Using the FLiPN Stack for Edge AI (Flink, NiFi, Pulsar)
  • PPT 下载地址
  • Pulsar 客户端库
  • 示例源数据
  • InfluxDB Pulsar IO sink connector

关注公众号「Apache Pulsar」,获取更多技术干货

退出 Apache Pulsar 中文交换群 👇🏻

正文完
 0