关于rust:从-generator-的角度看-Rust-异步代码

45次阅读

共计 6580 个字符,预计需要花费 17 分钟才能阅读完成。

文|Ruihang Xia

目前参加边缘时序数据存储引擎我的项目

本文 6992 字 浏览 18 分钟

前 言

作为 2018 edition 一个比拟重要的个性 Rust 的异步编程当初曾经失去了宽泛的应用。应用的时候难免会好奇它是如何运作的,这篇文章尝试从 generator 以及变量捕捉的方面进行摸索,而后介绍了在嵌入式时序存储引擎 ceresdb-helix 的研发过程中遇到的一个场景。

囿于作者程度内容不免存在一些错漏之处,还烦请留言告知。

PART. 1 async/.await, coroutine and generator

async/.await 语法在 1.39 版本 [1] 进入 stable channel,它可能很不便地编写异步代码:

、、、java
async fn asynchronous() {

// snipped

}

async fn foo() {

let x: usize = 233;
asynchronous().await;
println!("{}", x);

、、、

在下面的示例中,局部变量 x 可能间接在一次异步过程(fn asynchoronous)之后应用,和写同步代码一样。而在这之前,异步代码个别是通过相似 futures 0.1[2] 模式的组合子来应用,想要给接下来 (如 and_then()) 的异步过程的应用的局部变量须要被显式手动地以闭包出入参的形式链式解决,体验不是特地好。

async/.await 所做的实际上就是将代码变换一下,变成 generator/coroutine[3] 的模式去执行。一个 coroutine 过程能够被挂起,去做一些别的事件而后再持续复原执行,目前用起来就是 .await 的样子。以下面的代码为例,在异步过程 foo()中调用了另一个异步过程 asynchronous(),在第七行的 .await 时以后过程的执行被挂起,等到能够继续执行的时候再被复原。

而复原执行可能须要之前的一些信息,如在 foo()中咱们在第八行用到了之前的信息 x。也就是说 async 过程要有能力保留一些外部部分状态,使得它们可能在 .await 之后被持续应用。换句话说要在 generator state 外面保留可能在 yield 之后被应用的局部变量。这里须要引入 pin[4] 机制解决可能呈现的自援用问题,这部分不再赘述。

PART. 2 visualize generator via MIR

咱们能够透过 MIR[5]来看一下后面提到的 generator 是什么样子的。MIR 是 Rust 的一个两头示意,基于控制流图 CFG[6]示意。CFG 可能比拟直观地展现程序执行起来大略是什么样子,MIR 在有时不分明你的 Rust 代码到底变成了什么样子的时候可能起到一些帮忙。

想要失去代码的 MIR 示意有几种办法,如果当初手边有一个可用的 Rust toolchain,能够像这样传递一个环境变量给 rustc,再应用 cargo 进行构建来产生 MIR:

RUSTFLAGS="--emit mir" cargo build

构建胜利的话会在 target/debug/deps/ 目录下生成一个 .mir 的文件。或者也能通过 https://play.rust-lang.org/ 来获取 MIR,在 Run 旁边的溢出菜单上抉择 MIR 就能够。

由 2021-08 nightly 的 toolchain 所产生的 MIR 大略是这个样子的,有许多不意识的货色能够不必管,大略晓得一下。

  • _0, _1 这些是变量
  • 有许多语法和 Rust 差不多,如类型注解,函数定义及调用和正文等就行了。
fn future_1() -> impl Future {
    let mut _0: impl std::future::Future; // return place in scope 0 at src/anchored.rs:27:21: 27:21
    let mut _1: [static generator@src/anchored.rs:27:21: 27:23]; // in scope 0 at src/anchored.rs:27:21: 27:23

    bb0: {discriminant(_1) = 0; // scope 0 at src/anchored.rs:27:21: 27:23
        _0 = from_generator::<[static generator@src/anchored.rs:27:21: 27:23]>(move _1) -> bb1; // scope 0 at src/anchored.rs:27:21: 27:23
                                         // mir::Constant
                                         // + span: src/anchored.rs:27:21: 27:23
                                         // + literal: Const {ty: fn([static generator@src/anchored.rs:27:21: 27:23]) -> impl std::future::Future {std::future::from_generator::<[static generator@src/anchored.rs:27:21: 27:23]>}, val: Value(Scalar(<ZST>)) }
    }

    bb1: {return; // scope 0 at src/anchored.rs:27:23: 27:23}
}

fn future_1::{closure#0}(_1: Pin<&mut [static generator@src/anchored.rs:27:21: 27:23]>, _2: ResumeTy) -> GeneratorState<(), ()> {
    debug _task_context => _4; // in scope 0 at src/anchored.rs:27:21: 27:23
    let mut _0: std::ops::GeneratorState<(), ()>; // return place in scope 0 at src/anchored.rs:27:21: 27:23
    let mut _3: (); // in scope 0 at src/anchored.rs:27:21: 27:23
    let mut _4: std::future::ResumeTy; // in scope 0 at src/anchored.rs:27:21: 27:23
    let mut _5: u32; // in scope 0 at src/anchored.rs:27:21: 27:23

    bb0: {_5 = discriminant((*(_1.0: &mut [static generator@src/anchored.rs:27:21: 27:23]))); // scope 0 at src/anchored.rs:27:21: 27:23
        switchInt(move _5) -> [0_u32: bb1, 1_u32: bb2, otherwise: bb3]; // scope 0 at src/anchored.rs:27:21: 27:23
    }

    bb1: {
        _4 = move _2; // scope 0 at src/anchored.rs:27:21: 27:23
        _3 = const (); // scope 0 at src/anchored.rs:27:21: 27:23
        ((_0 as Complete).0: ()) = move _3; // scope 0 at src/anchored.rs:27:23: 27:23
        discriminant(_0) = 1; // scope 0 at src/anchored.rs:27:23: 27:23
        discriminant((*(_1.0: &mut [static generator@src/anchored.rs:27:21: 27:23]))) = 1; // scope 0 at src/anchored.rs:27:23: 27:23
        return; // scope 0 at src/anchored.rs:27:23: 27:23
    }

    bb2: {assert(const false, "`async fn` resumed after completion") -> bb2; // scope 0 at src/anchored.rs:27:21: 27:23
    }

    bb3: {unreachable; // scope 0 at src/anchored.rs:27:21: 27:23}
}

这个 demo crate 中还有一些别的代码,不过对应下面的 MIR 的源码比较简单:

async fn future_1() {}

只是一个简略的空的异步函数,能够看到生成的 MIR 会收缩很多,如果内容略微多一点的话通过文本模式不太好看。咱们能够指定一下生成的 MIR 的格局,而后将它可视化。

步骤大略如下:

RUSTFLAGS="--emit mir -Z dump-mir=F -Z dump-mir-dataflow -Z unpretty=mir-cfg" cargo build > mir.dot
dot -T svg -o mir.svg mir.dot

可能在当前目录下找到 mir.svg,关上之后能够看到一个像流程图的货色(另一幅差不多的图省略掉了,有趣味的能够尝试通过下面的办法本人生成一份)。

这里将 MIR 依照根本单位 basic block (bb) 组织,本来的信息都在,并且将各个 basic block 之间的跳转关系画了进去。从下面的图中咱们能够看到四个 basic blocks,其中一个是终点,另外三个是起点。首先终点的 bb0 switch(match in rust)了一个变量 _5,依照不同的值分支到不同的 blocks。能大略设想一下这样的代码:

match _5 {0: jump(bb1),
    1: jump(bb2),
    _ => unreachable()}

而 generator 的 state 能够当成就是那个 _5,不同的值就是这个 generator 的各个状态。future_1 的状态写进去大略是这样

enum Future1State {
    Start,
    Finished,
}

如果是 §1 中的 async fn foo(),可能还会多一个枚举值来示意那一次 yield。此时再想之前的问题,就可能很天然地想到要逾越 generator 不同阶段的变量须要如何保留了。

enum FooState {
    Start,
    Yield(usize),
    Finished,
}

PART. 3 generator captured

让咱们把保留在 generator state 中,可能逾越 .await/yield 被后续阶段应用的变量称为被捕捉的变量。那么能不能晓得到底哪些变量实际上被捕捉了呢?让咱们试一试,首先写一个略微简单一点的异步函数:

async fn complex() {
    let x = 0;
    future_1().await;
    let y = 1;
    future_1().await;
    println!("{}, {}", x, y);
}

生成的 MIR 及 svg 比较复杂,截取了一段放在了附录中,能够尝试本人生成一份残缺的内容。

略微浏览一下生成的内容,咱们能够看到一个很长的类型总是呈现,像是这样子的货色:

[static generator@src/anchored.rs:27:20: 33:2]
// or
(((*(_1.0: &mut [static generator@src/anchored.rs:27:20: 33:2])) as variant#3).0: i32)

对照咱们代码的地位能够发现这个类型中所带的两个文件地位就是咱们异步函数 complex()的首尾两个大括号,这个类型是一个跟咱们这整个异步函数相干的类型。

通过更进一步的摸索咱们大略能猜一下,下面代码片段中第一行的是一个实现了 Generator trait[7] 的匿名类型(struct),而 “as variant#3” 是 MIR 中的一个操作,Projection 的 Projection::Downcast,大略在这里 [8] 生成。在这个 downcast 之后所做的 projection 的到的类型是咱们意识的 i32。综合其余相似的片段咱们可能揣测这个匿名类型和后面形容的 generator state 是差不多的货色,而各个 variant 是不同的状态元组,投影这个 N 元组可能拿到被捕捉的局部变量。

PART. 4 anchored

晓得哪些变量会被捕捉可能帮忙咱们了解本人的代码,也可能基于这些信息进行一些利用。

先提一下 Rust 类型零碎中非凡的一种货色 auto trait[9]。最常见的就是 Send 和 Sync,这种 auto trait 会主动为所有的类型实现,除非显式地用 negative impl opt-out,并且 negative impl 会传递,如蕴含了 !Send 的 Rc 构造也是 !Send 的。通过 auto trait 和 negative impl 咱们管制一些构造的类型,并让编译器帮忙查看。

比方 anchored[10] crate 就是提供了通过 auto trait 和 generator 捕捉机制所实现的一个小工具,它可能阻止异步函数中指定的变量穿过 .await 点。比拟有用的一个场景就是异步过程中对于变量外部可变性的获取。

通常来说,咱们会通过异步锁如 tokio::sync::Mutex 来提供变量的外部可变性;如果这个变量不会穿过 .await point 即被 generator state 捕捉,那么 std::sync::Mutex 这种同步锁或者 RefCell 也能应用;如果想要更高的性能,防止这两者运行时的开销,那也可能思考 UnsafeCell 或其余 unsafe 伎俩,然而就有一点危险了。而通过 anchored 咱们能够在这种场景下管制不平安因素,实现一个平安的办法来提供外部可变性,只有将变量通过 anchored::Anchored 这个 ZST 进行标记,再给整个 async fn 带上一个 attribute 就可能让编译器帮咱们确认没有货色谬误地被捕捉并穿梭了 .await、而后导致灾难性的数据竞争。

就像这样:

#[unanchored]
async fn foo(){
    {let bar = Anchored::new(Bar {});
    }
    async_fn().await;}

而这种就会导致编译谬误:

#[unanchored]
async fn foo(){let bar = Anchored::new(Bar {});
    async_fn().await;
    drop(bar);
}

对于 std 的 Mutex, Ref 和 RefMut 等常见类型,clippy 提供了两个 lints[11],它们也是通过剖析 generator 的类型来实现的。并且与 anchored 一样都有一个毛病,在除了像下面那样明确应用独自的 block 搁置变量外,都会呈现 false positive 的状况[12]。因为局部变量在其余的模式下都会被记录下来[13],导致信息被净化。

anchored 目前还短少一些 ergonomic 的接口,attribute macro 和 ecosystem 的其余工具交互的时候也存在一点问题,欢送感兴趣的小伙伴来理解一下 https://github.com/waynexia/a…

文档:https://docs.rs/anchored/0.1….

「参 考」

[1]https://blog.rust-lang.org/20…

[2]https://docs.rs/futures/0.1.2…

[3]https://github.com/rust-lang/…

[4]https://doc.rust-lang.org/std…

[5]https://blog.rust-lang.org/20…

[6]https://en.wikipedia.org/wiki…

[7]https://doc.rust-lang.org/std…

[8]https://github.com/rust-lang/…

[9]https://doc.rust-lang.org/bet…

[10]https://crates.io/crates/anch…

[11]https://rust-lang.github.io/r…

[12]https://github.com/rust-lang/…

[13]https://doc.rust-lang.org/sta…

本周举荐浏览

Prometheus on CeresDB 演进之路

深刻 HTTP/3(一)|从 QUIC 链接的建设与敞开看协定的演进

降本提效!注册核心在蚂蚁团体的变质之路

蚂蚁大规模 Sigma 集群 Etcd 拆分实际

正文完
 0