关于rust:如何在-Rust-中使用-MQTT

57次阅读

共计 7048 个字符,预计需要花费 18 分钟才能阅读完成。

Rust 是由 Mozilla 主导开发的通用、编译型编程语言。该语言的设计准则为:平安、并发、实用,反对 函数式、并发式、过程式以及面向对象的编程格调。Rust 速度惊人且内存利用率极高。因为没有运行时和垃圾回收,它可能胜任对性能要求特地高的服务,能够在嵌入式设施上运行,还能轻松和其余语言集成。Rust 丰盛的类型零碎和所有权模型保障了内存平安和线程平安,让您在编译期就可能打消各种各样的谬误。

MQTT 是一种基于公布 / 订阅模式的 轻量级物联网音讯传输协定 ,能够用极少的代码和带宽为联网设施提供实时牢靠的音讯服务,它广泛应用于物联网、挪动互联网、智能硬件、车联网、电力能源等行业。

本文次要介绍如何在 Rust 我的项目中应用 paho-mqtt 客户端库,实现客户端与 MQTT 服务器的连贯、订阅、勾销订阅、收发音讯等性能。

我的项目初始化

本我的项目应用 Rust 1.44.0 进行开发测试,并应用 Cargo 1.44.0 包管理工具进行项目管理,读者可用如下命令查看以后的 Rust 版本。

~ rustc --version
rustc 1.44.0 (49cae5576 2020-06-01)

抉择 MQTT 客户端库

paho-mqtt 是目前 Rust 中,功能完善且应用较多的 MQTT 客户端,最新的 0.7.1 版本反对 MQTT v5、3.1.1、3.1,反对通过规范 TCP、SSL / TLS、WebSockets 传输数据,QoS 反对 0、1、2 等。

初始化我的项目

执行以下命令创立名为 mqtt-example 的 Rust 新我的项目。

~ cargo new mqtt-example
    Created binary (application) `mqtt-example` package

编辑我的项目中的 Cargo.toml 文件,在 dependencies 中增加 paho-mqtt 库的地址,以及指定订阅、公布代码文件对应的二进制文件。

[dependencies]
paho-mqtt = {git = "https://github.com/eclipse/paho.mqtt.rust.git", branch = "master"}

[[bin]]
name = "sub"
path = "src/sub/main.rs"

[[bin]]
name = "pub"
path = "src/pub/main.rs"

Rust MQTT 的应用

创立客户端连贯

本文将应用 EMQ X 提供的 收费公共 MQTT 服务器 作为测试连贯的 MQTT 服务器,该服务基于 EMQ X 的 MQTT 物联网云平台 创立。服务器接入信息如下:

  • Broker: broker.emqx.io
  • TCP Port: 1883
  • Websocket Port: 8083

配置 MQTT Broker 连贯参数

配置 MQTT Broker 连贯地址 (包含端口)、topic (这里咱们配置了两个 topic),以及客户端 id。

const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_publish";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];

编写 MQTT 连贯代码

编写 MQTT 连贯代码,为了晋升应用体验,可在执行二进制文件时通过命令行参数的模式传入连贯地址。通常咱们须要先创立一个客户端,而后将该客户端连贯到 broker.emqx.io

let host = env::args().nth(1).unwrap_or_else(||
    DFLT_BROKER.to_string());

// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = mqtt::CreateOptionsBuilder::new()
    .server_uri(host)
    .client_id(DFLT_CLIENT.to_string())
    .finalize();

// Create a client.
let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {println!("Error creating the client: {:?}", err);
    process::exit(1);
});

// Define the set of options for the connection.
let conn_opts = mqtt::ConnectOptionsBuilder::new()
    .keep_alive_interval(Duration::from_secs(20))
    .clean_session(true)
    .finalize();

// Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts) {println!("Unable to connect:\n\t{:?}", e);
    process::exit(1);
}

公布音讯

这里咱们总共公布五条音讯,依据循环的奇偶性,别离向 rust/mqttrust/test 这两个主题公布。

for num in 0..5 {let content =  "Hello world!".to_string() + &num.to_string();
    let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);
    if num % 2 == 0 {println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);
        msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);
    } else {println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);
    }
    let tok = cli.publish(msg);

            if let Err(e) = tok {println!("Error sending message: {:?}", e);
                    break;
            }
}

订阅音讯

在客户端连贯之前,须要先初始化消费者。这里咱们会循环解决消费者中的音讯队列,并打印出订阅的 topic 名称及接管到的音讯内容。

fn subscribe_topics(cli: &mqtt::Client) {if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {println!("Error subscribes topics: {:?}", e);
        process::exit(1);
    }
}

fn main() {
      ...
    // Initialize the consumer before connecting.
    let rx = cli.start_consuming();
      ...
    // Subscribe topics.
    subscribe_topics(&cli);

    println!("Processing requests...");
    for msg in rx.iter() {if let Some(msg) = msg {println!("{}", msg);
        }
        else if !cli.is_connected() {if try_reconnect(&cli) {println!("Resubscribe topics...");
                subscribe_topics(&cli);
            } else {break;}
        }
    }
      ...
}

残缺代码

音讯公布代码

use std::{
    env,
    process,
    time::Duration
};

extern crate paho_mqtt as mqtt;

const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_publish";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// Define the qos.
const QOS:i32 = 1;

fn main() {let host = env::args().nth(1).unwrap_or_else(||
        DFLT_BROKER.to_string());

    // Define the set of options for the create.
    // Use an ID for a persistent session.
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id(DFLT_CLIENT.to_string())
        .finalize();

    // Create a client.
    let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {println!("Error creating the client: {:?}", err);
        process::exit(1);
    });

    // Define the set of options for the connection.
    let conn_opts = mqtt::ConnectOptionsBuilder::new()
        .keep_alive_interval(Duration::from_secs(20))
        .clean_session(true)
        .finalize();

    // Connect and wait for it to complete or fail.
    if let Err(e) = cli.connect(conn_opts) {println!("Unable to connect:\n\t{:?}", e);
        process::exit(1);
    }

    // Create a message and publish it.
    // Publish message to 'test' and 'hello' topics.
    for num in 0..5 {let content =  "Hello world!".to_string() + &num.to_string();
        let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);
        if num % 2 == 0 {println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);
            msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);
        } else {println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);
        }
        let tok = cli.publish(msg);

                if let Err(e) = tok {println!("Error sending message: {:?}", e);
                        break;
                }
    }


    // Disconnect from the broker.
    let tok = cli.disconnect(None);
    println!("Disconnect from the broker");
    tok.unwrap();}

音讯订阅代码

为了晋升应用体验,音讯订阅做了断开重连的解决,并在从新建设连贯后对主题进行从新订阅。

use std::{
    env,
    process,
    thread,
    time::Duration
};

extern crate paho_mqtt as mqtt;

const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_subscribe";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// The qos list that match topics above.
const DFLT_QOS:&[i32] = &[0, 1];

// Reconnect to the broker when connection is lost.
fn try_reconnect(cli: &mqtt::Client) -> bool
{println!("Connection lost. Waiting to retry connection");
    for _ in 0..12 {thread::sleep(Duration::from_millis(5000));
        if cli.reconnect().is_ok() {println!("Successfully reconnected");
            return true;
        }
    }
    println!("Unable to reconnect after several attempts.");
    false
}

// Subscribes to multiple topics.
fn subscribe_topics(cli: &mqtt::Client) {if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {println!("Error subscribes topics: {:?}", e);
        process::exit(1);
    }
}

fn main() {let host = env::args().nth(1).unwrap_or_else(||
        DFLT_BROKER.to_string());

    // Define the set of options for the create.
    // Use an ID for a persistent session.
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id(DFLT_CLIENT.to_string())
        .finalize();

    // Create a client.
    let mut cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {println!("Error creating the client: {:?}", err);
        process::exit(1);
    });

    // Initialize the consumer before connecting.
    let rx = cli.start_consuming();

    // Define the set of options for the connection.
    let lwt = mqtt::MessageBuilder::new()
        .topic("test")
        .payload("Consumer lost connection")
        .finalize();
    let conn_opts = mqtt::ConnectOptionsBuilder::new()
        .keep_alive_interval(Duration::from_secs(20))
        .clean_session(false)
        .will_message(lwt)
        .finalize();

    // Connect and wait for it to complete or fail.
    if let Err(e) = cli.connect(conn_opts) {println!("Unable to connect:\n\t{:?}", e);
        process::exit(1);
    }

    // Subscribe topics.
    subscribe_topics(&cli);

    println!("Processing requests...");
    for msg in rx.iter() {if let Some(msg) = msg {println!("{}", msg);
        }
        else if !cli.is_connected() {if try_reconnect(&cli) {println!("Resubscribe topics...");
                subscribe_topics(&cli);
            } else {break;}
        }
    }

    // If still connected, then disconnect now.
    if cli.is_connected() {println!("Disconnecting");
        cli.unsubscribe_many(DFLT_TOPICS).unwrap();
        cli.disconnect(None).unwrap();}
    println!("Exiting");
}

运行与测试

编译二进制文件

执行以下命令,会在 mqtt-example/target/debug 目录下生成音讯订阅、公布对应的 subpub 二进制文件。

cargo build

音讯订阅

执行 sub 二进制文件,期待生产公布。

音讯公布

执行 pub 二进制文件,能够看到别离往 rust/testrust/mqtt 这两个主题公布了音讯。


同时在音讯订阅中可看到公布的音讯

至此,咱们实现了应用 paho-mqtt 客户端连贯到 公共 MQTT 服务器,并实现了测试客户端与 MQTT 服务器的连贯、音讯公布和订阅。

版权申明:本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.io/cn/blog/h…

正文完
 0