乐趣区

关于程序员:异步代码的几种写法-Rust学习笔记

作者:谢敬伟,江湖人称“刀哥”,20 年 IT 老兵,数据通信网络专家,电信网络架构师,目前任 Netwarps 开发总监。刀哥在操作系统、网络编程、高并发、高吞吐、高可用性等畛域有多年的实践经验,并对网络及编程等方面的新技术有浓重的趣味。


Rust历史不长,依然处于疾速倒退的历程中。对于异步编程的模式,当初曾经倒退到 async/await 协程的高级阶段。大略是因为 async/await 呈现的工夫还不长,所以现有大多数的开源我的项目并不是或不是纯正应用 async/await 来书写的,而是前前后后有多种的写法。这样的情况给 Rust 的学习带来了一些的难度。在这里,咱们来捋一捋异步代码的几种写法。

mio

最原始的形式是应用 mio 进行开发。mio是一个底层异步 I/O 库,提供非阻塞形式的 API,具备很高的性能。实际上mio 是对于操作系统 epoll/kqueue/IOCP 的封装。在 C/C++ 中咱们应用 libevent 之类的库,mio能够了解为对应的 Rust 版本。基于 mio 的代码大抵如下:

 loop {
     // Poll Mio for events, blocking until we get an event.
     poll.poll(&mut events, None)?;

     // Process each event.
     for event in events.iter() {if event.is_writable() {// socket 可写,开始发送数据}

         if event.is_readable() {// socket 可读,开始接收数据}
         // socket 敞开,退出循环
         return Ok(());
     }
 }    

总的来说,这是齐全基于异步事件告诉的写法,和 C/C++ 区别不是很大,异步代码对于程序员是一个挑战,当代码逻辑越来越简单,增加新性能或是解决已有问题的难度也越来越大。

另外,mio实现的是一个单线程事件循环,尽管能够解决成千上万路的 I/O 操作,但没有多线程的能力,须要本人裁减。

Future Poll

为了更好地标准异步的逻辑,Rust形象出 Future 示意尚未产生的事物。这些 Future 能够用很多形式組合成一个更简单的复合 Future 来代表一系列的事件。Future须要程序被动去 poll(轮询) 能力获取到最终的后果,每一次轮询的后果可能是 Ready 或者Pending

运行库提供 ExecutorReactor来执行 Future,也就是调用Futurepoll办法循环执行一系列就绪的 Future,当Future 返回 Pending 的时候,会将 Future 转移到 Reactor 上期待唤醒。Reactor被用来负责唤醒之前无奈实现的 Future。事实上,tokioReactor是基于 mio 实现的,而 async-std/smol 则是封装了epoll/kqueue/IOCP,提供相似的性能。

手动实现 Future 是一件绝对繁琐的工作,次要的问题在于异步模式自身的个性。例如,接管网络数据,无奈臆测每次轮询会收到多少字节的数据,往往须要开拓一段接收缓冲区包容数据,协定解码也须要一个状态机拼包向下层提交;发送网络数据存在类似问题,发送数据时底层未就绪,则缓冲发送数据,待下次轮询时,须要首先查看并解决发送缓冲区。另外还有一些值得注意的中央,如果手动实现的 Future 返回 Pending,则必须本人实现唤醒机制,也就是须要将cx 克隆一份记下来,而后在适当的时侯调用 cx.wake()。因为网络相干的性能往往是分层的,因而手动的Poll 循环也会是层层重叠的,这时候,返回值 Poll::Ready(T) 就有学识了。泛型 T 可能包裹各种不同的数据,Option<T>Result<T,E>,或者两者的组合。因为最外层还有一个 Poll<T>,所有这时候的match 语句写起来会十分臃肿,粘贴复制写很多代码,实现的性能却十分无限,而且因为这些代码很类似,大大增加了出错的可能性。

规范库中仅仅定义了 Future,更多的相干性能须要援用futures-rs 类库,外面定义了一系列无关异步的操作,包含 StreamSinkAsyncReadAsyncWrite 等根底 Trait,以及对应实现了大量不便操作的组合子的Ext Trait,特地用处的fusedBoxTry 系列的扩大,诸如 join!select!pin_mut! 等一系列的宏。实践上,不应用这些扩大也能写出代码,只不过那样的代码很可能篇幅会长的可怕。值得一提的是,除了一些能够简化代码的过程宏之外,扩大 Trait 提供的组合子也会让代码精简不少。比方 Future::and_then 能够让代码写成链式调用的形式;Sink::send包装了 Sink 发送三步骤 poll_ready/start_send/poll_flush,应用 .await 一行代码间接就能够实现发送。因而,很多 poll 形式的代码实际上是精确地说是混合式的,其中也应用了不少 async 代码块。

总之,搞清楚 Future 相干的这些内容是须要破费不少工夫,更不用说用它们来写代码了。不过,即使是应用 async/await 这种更高级原语,也是有必要理解底层的工作原理和实现机制,所谓知其然知其所以然。

async/await

应用 async/await 能够将异步的代码写得相似同步的过程,更加符合人体工程学。因为 async 被翻译为一个 Future 状态机,原先在 poll 形式中须要解决的与 Pending 相干的状态当初都由 async 生成的状态机主动实现,因而大大加重了程序员的心智累赘。

如前所述,底层的 Futures 提供了很多不便的组合子扩大 Future,应用起来很简洁,能够极大地简化代码。例如,上文提到过的Sink::send 包装了发送缓冲区的实现和异步发送的三个步骤;AsyncRead::read_exact实现了读取指定字节数的性能,在解决网络协议解析时能够防止手写一个拼包状态机;AsyncWrite::write_all实现了发送全副数据以及发送缓冲,等等。正是在这些底层性能的反对下,async/await成为了更高级的书写异步代码的形式。兴许会有少许放心,这样所谓“高级”会不会在性能上有很大损失?笔者集体不这么认为。主动实现的状态机兴许未必比程序员手动实现的性能更差。状态机编程对于任何人,即使是一个有教训的程序员都是不小挑战。糟糕的状态机实现不仅可能有性能问题,更大的危险来自于实现上的破绽,以及保护上的艰难。代码写进去更多是给他人看的,实现同样的性能,简洁的代码更有可能是更高质量的代码。

以下例子是固定长度宰割的报文接管过程,应用 async/await 是很简略的。如果实现为一个Stream/poll_next,代码会简单很多。

    /// convenient method for reading a whole frame
    pub async fn recv_frame(&mut self) -> io::Result<Vec<u8>> {let mut len = [0; 4];
        let _ = self.inner.read_exact(&mut len).await?;  // inner socket, 反对 AsyncRead

        let n = u32::from_be_bytes(len) as usize;
        if n > self.max_frame_len {
            let msg = format!("data length {} exceeds allowed maximum {}",
                n, self.max_frame_len
            );
            return Err(io::Error::new(io::ErrorKind::PermissionDenied, msg));
        }

        let mut frame = vec![0; n];
        self.inner.read_exact(&mut frame).await?;

        Ok(frame)
    }

最初,齐全应用 async/await 写代码目前还有几个问题:

  • async trait

以后 Trait 不反对 async fn,无奈间接用Trait 来形象异步办法。临时解决办法是应用三方库 async-trait。如下:

use async_trait::async_trait;

#[async_trait]
trait Advertisement {async fn run(&self);
}

async_trait将代码转换为一个返回 Pin<Box<dyn Future + Send + 'async>> 的同步办法。因为装箱和动静派发的起因,性能上会有少许损失。

  • 异步析构

以后 drop 办法必须是同步调用,不能应用 await 语法。当一个 I/O 对象越过生命周期被析构,往往在敞开底层句柄之前,还须要实现某些 I/O 操作。比方,告诉网络对端连贯曾经敞开。在同步代码中,咱们只须要在 drop() 中置入这些操作,然而在异步代码中,无奈在 drop() 中做相似的事件。

解决办法,总是在异步 I/O 对象越过生命周期之前显式地执行敞开动作,或是,实现一个相似 GC 的性能,专门负责清理工作。

瞻望

笔者在学习 Rust 过程中,次要关注网络相干的并发编程。因为之前有在 Go 版本的 ipfs/libp2p 上的开发教训,故而学习钻研了 rust-libp2p 以及 nervos tentaclerust-libp2pParity实现的准官网版本,然而这个我的项目的代码及其难懂,过于强调应用泛型参数的形象,导致代码可读性十分差。求教了代码作者,他抵赖代码可能有些简单,但也强调都是有起因的 … nervos tentacle的实现在协定上不够残缺,特地是与规范 libp2p 并不兼容。两个我的项目共有的特点是次要用 poll 的形式写代码,逻辑上都是状态机的嵌套。

因而,笔者试图齐全应用 async/await 形式重构 libp2p,参考rust-libp2p 的实现,代码协程化,向下层提供纯正的异步接口,争取在 API 层面的体验靠近 go-libp2p,这是推广Rust 协程机制的一个尝试,同时也是集体的一个学习的过程。目前刚刚起步,仅实现了 secioyamux局部,待适合机会开源,冀望更多 Rust 爱好者独特来开发欠缺。

参考:
Asynchronous Destructors


深圳星链网科科技有限公司(Netwarps),专一于互联网安全存储畛域技术的研发与利用,是先进的平安存储基础设施提供商,次要产品有去中心化文件系统(DFS)、企业联盟链平台(EAC)、区块链操作系统(BOS)。
微信公众号:Netwarps

退出移动版