乐趣区

关于后端:Rust中channel的使用

对于 Rust 中的 channel

Rust 的 channel 是一种用于在不同线程间传递信息的通信机制,它实现了线程间的消息传递。

Channel 容许在 Rust 中创立一个消息传递渠道,它返回一个元组构造体,其中蕴含发送和接收端。发送端用于向通道发送数据,而接收端则用于从通道接收数据。

每个 channel 由两局部组成:发送端(Sender)和接收端(Receiver)。

发送端用于向 channel 发送音讯,而接收端则用于接管这些音讯。这种机制容许线程之间的平安通信,防止了共享内存的复杂性和潜在的数据竞争问题。(通过通信来共享内存, 而非通过共享内存来通信)

Rust 的 channel 为线程间通信提供了一种平安、简略的形式,是构建并发利用的根底工具之一。

channel是 Rust 规范库的一部分,自 Rust 1.0 版本以来就蕴含了这个性能。随着 Rust 语言和规范库的倒退,channel的实现和 API 可能会有所改进,但其基本概念和用法保持一致。

应用形式

根本步骤如下:

  1. 创立 : 应用std::sync::mpsc::channel() 函数创立一个新的 channel,这个函数返回一个蕴含发送端(Sender) 和接收端 (Receiver) 的元组。
  2. 发送 : 应用发送端的send 办法发送音讯。send办法承受一个音讯值,如果接收端曾经被抛弃,会返回一个谬误。
  3. 接管 : 应用接收端的recv 办法接管音讯。recv会阻塞以后线程直到一个音讯可用,或者 channel 被敞开。

示例

以下是一个应用 channel 在两个线程间发送和接管音讯的简略例子:

use std::sync::mpsc;
use std::thread;

fn main() {
    // 创立一个 channel
    let (tx, rx) = mpsc::channel();

    // 创立一个新线程,并向其中发送一个音讯
    thread::spawn(move || {
        let msg = "Hello from the thread";
        tx.send(msg).unwrap();
        println!("Sent message: {}", msg);
    });

    // 在主线程中接管音讯
    let received = rx.recv().unwrap();
    println!("Received message: {}", received);
}

下面例子展现了 channel 的根本办法:先创立一个channel,而后在一个新线程中发送一个字符串音讯,并在主线程中接管这个音讯。

留神: 发送端 tx 通过 move 关键字挪动到新线程中,这是因为 Rust 的所有权规定要求确保应用数据的线程领有该数据的所有权。

对于 MPSC

其中 mpsc 是 Multi producer, Single consumer FIFO queue 的缩写, 即 多生产者单消费者先入先出队列

Rust 规范库提供的 channel 是 MPSC(多生产者,单消费者)模型,这意味着能够有多个发送端(Sender)向同一个接收端(Receiver)发送音讯。这种模式十分实用于工作队列模型,其中多个生产者线程生成工作,而单个消费者线程解决这些工作。

除了 MPSC 之外, 还有如下几种模型:

  • SPSC(Single Producer Single Consumer): 单生产者单消费者。
  • SPMC(Single Producer Multiple Consumer): 单生产者多消费者。
  • MPSC(Multi Producer Single Consumer): 多生产者单消费者, Rust 中规范的 mpsc 模型。
  • MPMC(Multi Producer Multi Consumer)*: 多生产者多消费者。

MPSC 是规范库中应用的模型

不须要阻塞吗?

主线程是否会立马完结退出程序?

在下面的示例中,如果主线程执行得太快,有可能在接管到 子线程发送音讯之前就完结了,没打印出接管到的内容程序就退出了.

但事实上, 并没有产生这种景象. 即使在新过程段增加休眠 3s 的代码,thread::sleep(std::time::Duration::from_secs(3));, 程序也不会提前退出.

对于 Rust 中程序的休眠, 可参考 Rust 中程序休眠的几种形式

这是因为,recv办法是阻塞的,即 它会阻塞以后线程, 直到从通道中接管到音讯。

因而,在下面例子中,主线程在调用 rx.recv().unwrap() 时会阻塞 期待音讯的到来。一旦子线程通过 tx.send(msg).unwrap(); 发送了音讯,主线程会接管到这个音讯并继续执行,之后程序才会失常退出。

摸索更多阻塞形式

能够应用 join 办法,来确保主线程期待一个或多个子线程实现执行。这在解决多个线程时特地有用。

use std::sync::mpsc;
use std::thread;

fn main() {let (tx, rx) = mpsc::channel();

    // 创立一个新线程,并保留其句柄
    let handle = thread::spawn(move || {
        let msg = "Hello from the thread";
        tx.send(msg).unwrap();
        println!("Sent message: {}", msg);
    });

    // 在主线程中接管音讯
    let received = rx.recv().unwrap();
    println!("Received message: {}", received);

    // 应用 join 期待子线程实现
    handle.join().unwrap();
}

thread::spawn返回一个 JoinHandle,通过调用这个句柄的join 办法来确保主线程在子线程实现其执行之后才继续执行

然而因为 recv 办法 自身就是阻塞的,曾经确保了主线程会期待至多一个音讯的到来,这时再应用 join 看起来没有太大必要。

但当有多个线程执行独立工作,且这些工作不肯定波及到主线程立刻须要的通道通信时,join 的作用就变得非常明显了, 如下示例展现了如何创立多个线程,并应用 join 确保它们都实现了工作:

use std::thread;
use std::time::Duration;

fn main() {
    // 创立一个向量来存储子线程的句柄
    let mut handles = vec![];

    for i in 0..10 {
        // 创立 10 个子线程
        let handle = thread::spawn(move || {println!("Thread {} is starting", i);
            println!("--------------");
            // 模仿工作负载, 耗时 1s
            thread::sleep(Duration::from_secs(1));
            println!("Thread {} has finished", i);
            println!("~~~~~~~~~~~~~~");
        });
        handles.push(handle);
    }


    // 期待所有子线程实现
    for handle in handles {handle.join().unwrap();}

    println!("All threads have finished");
}

输入:

Thread 0 is starting
--------------
Thread 1 is starting
--------------
Thread 3 is starting
--------------
Thread 2 is starting
--------------
Thread 4 is starting
--------------
Thread 5 is starting
--------------
Thread 6 is starting
--------------
Thread 7 is starting
--------------
Thread 9 is starting
--------------
Thread 8 is starting
-------------- (到此都是立即打印进去; 上面的输入等 1s 后一股脑打印进去)
Thread 0 has finished
~~~~~~~~~~~~~~
Thread 1 has finished
Thread 2 has finished
Thread 5 has finished
~~~~~~~~~~~~~~
~~~~~~~~~~~~~~
Thread 4 has finished
~~~~~~~~~~~~~~
Thread 6 has finished
~~~~~~~~~~~~~~
Thread 3 has finished
~~~~~~~~~~~~~~
~~~~~~~~~~~~~~
Thread 7 has finished
~~~~~~~~~~~~~~
Thread 8 has finished
~~~~~~~~~~~~~~
Thread 9 has finished
~~~~~~~~~~~~~~
All threads have finished

在这个例子中创立了 10 个子线程,每个子线程都模仿执行一些操作,而后在主线程中应用一个循环来 join 这些线程。

通过这种形式,即便这些子线程并没有向主线程发送任何音讯,依然可能确保它们都实现了各自的工作,而后程序才会退出。这就是 join 在解决多个线程时的劣势所在。

应用 join 确保主线程期待所有子线程实现其工作,这在解决并行计算、执行多个独立工作时特地重要,因为这些工作可能不会立刻或基本不会向主线程报告其实现状态。在这种状况下,如果没有应用join,主线程可能会在子线程实现它们的工作之前完结,导致程序提前退出,而且可能留下未实现的后盾工作。

Rust channel 的更多高阶用法

Rust 中的 channel 不仅仅反对简略的消息传递,还能够用于实现更简单的并发模式和高级用法。这些用法能够减少程序的灵活性和性能,特地是在解决大量数据、多线程工作或须要高度并行的场景中。

选择性接管(Select)

在解决多个 channel 时,可能心愿可能选择性地接管多个起源的音讯。

Rust 的规范库目前并没有间接反对 select 机制,然而 crossbeam-channel 库提供了这样的性能,使得能够从多个 channel 中选择性地接管音讯。

use crossbeam_channel::{select, unbounded};
use std::thread;

fn main() {let (tx1, rx1) = unbounded();
    let (tx2, rx2) = unbounded();

    thread::spawn(move || {tx1.send(1).unwrap();});

    thread::spawn(move || {tx2.send(2).unwrap();});

    select! {recv(rx1) -> msg => println!("Received {} from rx1", msg.unwrap()),
        recv(rx2) -> msg => println!("Received {} from rx2", msg.unwrap()),
    }
}

cargo add crossbeam_channel 增加依赖库,

而后屡次 cargo run, 能够发现, 会在 Received 1 from rx1
和 Received 2 from rx2中随机打印其中一个

如上代码演示了如何在 Rust 中应用 crossbeam-channel 库实现选择性接管(select)机制。该机制容许程序从多个不同的 channel 中接管音讯,而不是被限度在繁多的 channel 上期待。这是通过 select! 宏来实现的,它能够监听多个 channel,并在任一 channel 接管到音讯时立刻响应。

具体来说,代码的性能如下:

  1. 引入库 :首先,引入了crossbeam_channelselectunbounded,以及std::threadcrossbeam_channel 是一个提供了高性能 channel 实现的内部库,包含了 select 机制。unbounded用于创立一个无界(unbounded)的 channel,即没有容量限度的 channel。
  2. 创立无界 channel:通过调用 unbounded() 函数,创立了两个无界 channel,别离是 tx1/rx1tx2/rx2。这里,tx1tx2 是发送端(Sender),而 rx1rx2是接收端(Receiver)。
  3. 发送音讯 :接下来,创立了两个线程,每个线程向各自的 channel 发送一个整数音讯,第一个线程通过tx1 发送 1,第二个线程通过tx2 发送2。这两个线程是并行执行的,因而发送操作是异步的。
  4. 选择性接管音讯 select! 宏用于同时监听 rx1rx2这两个接收端。当任一 channel 接管到音讯时,select!宏会立刻匹配到相应的分支并执行。这里有两个 recv 调用,别离对应两个接收端。一旦任一接收端接管到音讯,对应的代码块就会执行,并打印出接管到的音讯及其起源。msg.unwrap()用于获取 Result 类型中的音讯值,前提是没有产生谬误。

代码中的 select! 宏使得程序不用在繁多的 channel 上阻塞期待,而是能够灵便地解决来自多个源的音讯。这种模式在须要解决多个异步事件源时十分有用,例如在网络服务器或并发零碎中解决来自不同客户端或工作的输出。

有点相似 Go 的 select 语句

迭代器接管

Receiver实现了Iterator,这意味着能够应用迭代器的形式接管所有可用的音讯,直到 channel 被敞开。这种形式简化了接收端的代码,特地是当须要解决所有音讯而不用关怀接管的具体机会时。

use std::sync::mpsc;
use std::thread;

fn main() {let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        for i in 1..=5 {tx.send(i).unwrap();}
    });

    // 通过迭代器接管音讯
    for received in rx {println!("Received: {}", received);
    }
}

输入:

Received: 1
Received: 2
Received: 3
Received: 4
Received: 5

退出移动版