概要

很多应用过Async Rust的人都可能有过被其要求的束缚所困扰的经验,例如,spawned task'static的要求,MutexGuard不能逾越.await,等等。克服这些束缚须要认真地设计代码构造,很可能会导致艰涩和嵌套的代码,这对开发人员和审查人员都是一种挑战。在这篇文章中,我将首先列出我在编写async Rust代码时的一些痛点。而后,我将指出咱们真正须要异步代码的场景,并探讨为什么咱们应该把异步和非异步代码离开。最初,我将展现我是如何在最近的一次Curp重构中实际这一准则的。

痛点:

Spawned Task必须是'static

在spawn一个新的async task的时候,编译器并不知道该task会被执行多久,可能很短暂,也可能会始终执行至程序运行完结。所以,编译器会要求该task所含的所有类型都领有'static的生命周期。

这样的限度使得咱们经常能在spawn前看到不少的clone代码。当然,这些代码从某种角度来讲能够帮忙程序员更好天文清哪些变量的所有权是要被移交给新的task的,但同时,也会使得代码看上去很啰嗦,不够简洁。

let a_arc = Arc::clone(&a);let b_arc = Arc::clone(&b);tokio::spawn(async move {    // ...});

Send的变量的持有不能够逾越.await

这点限度背地的起因tokio的task并不是固定在一个线程上执行的,闲暇线程会被动“偷取”繁忙线程的task,这就须要task能够被Send

请看上面一段代码:

let mut log_l = log.lock();log_l.append(new_entry.clone());broadcast(new_entry).await;

尝试编译后,会发现报错:log_l不能逾越.await点持有。

天然,为了使得拿着锁的critical section尽量地短,咱们不须要拿着锁过.await点,所以咱们在其中加一行放锁的代码:

let mut log_l = log.lock();log_l.append(new_entry.clone());drop(log_l);broadcast(new_entry).await;

很惋惜,还是不能通过编译,这是因为编译器目前只能通过计算代码Scope的形式来判断一个task是否能够被Send。如果说上一个痛点还有肯定的益处,那么这个问题就纯正来源于编译器的限度了。所以咱们必须把代码改成这个样子:

{    let mut log_w = log.write();    log_w.append(new_entry.clone());}broadcast(new_entry).await;

如果一个函数中有须要拿多把锁,又有很多的异步调用,代码就会嵌套起来,变得复杂艰涩。
Side Note: 咱们晓得tokio本人有个异步的锁tokio::sync::Mutex,它是能够被hold过.await的。但要留神的是,大多数状况下,咱们并不会须要异步锁,因为异步锁通常意味着拿着锁的critical section是会十分长的。所以,如果咱们须要在异步代码中拿锁,不要不加思索地应用异步锁,事实上,在tokio官网文档中,也是更加倡议应用同步锁的。

应用异步Rust的场景和组织形式

如果咱们常常在我的项目开发中遇到上述问题,天然就会开始思考其产生的起因以及该怎么防止。我认为一个很重要的因素就是没有把async和非async的代码给离开,或者说,更实质的起因是咱们没有在设计我的项目架构的时候将须要async的局部和不须要async的局部离开。所以接下来,我将梳理咱们什么时候能力真正地用到Async Rust?

I/O

当咱们进行比拟耗时的I/O操作,咱们不想让这些操作block住咱们以后的线程。所以咱们用异步I/O,当运行到await 的时候,I/O就能够到后盾去做,让其它的task执行。

// .await will enable other scheduled tasks to progresslet mut file = File::create(“foo.txt”).await?;file.write(b"some bytes”).await?;

后台任务

后台任务的task通常会随同着一个channel的接收端呈现。

tokio::spawn(async move {    while let Some(job) = rx.recv().await {        // ...    }};

并发工作

并发地spawn多个task能够更高效地利用多核处理器。

let chunks = data.chunks(data.len() / N_TASKS);for chunk in chunks {  tokio::spawn(work_on(chunk));}

依赖期待

应用.await期待依赖。这种应用绝对较少一些。

// wait for some eventevent.listen().await;// barrierbarrier.wait().await;

能够看到,应用Async代码的中央,次要集中在I/O、并发与后台任务。在开发之前,咱们也无妨无意识地去拆散我的项目中的async与sync局部:放大Async局部的函数,将解决逻辑挪动至一般函数中。将这两局部拆散,不仅能够缓解文章结尾所说的痛点,更能够帮我理清代码构造。

{    let mut log_w = log.write();    log_w.append(new_entry.clone());    // ...}broadcast(new_entry).await;// move the logic to another function insteadfn update_log(log: &mut Log, new_entry: Entry) {    log.append(new_entry);    // ...}update_log(&mut log.write(), new_entry.clone());broadcast(new_entry).await;

对于Curp的一次大型重构

在重构之前,因为一次次的迭代,代码的可读性和构造变得越来越差。具体来说,因为咱们有若干个带锁构造须要在curp server的各个局部中共享,而curp server的大部分函数又是async的,async和拿锁的代码混淆在一起,就导致了咱们经常在开发过程中遇到上述痛点。

所以,咱们从新调整了curp server的构造,将其分为了async局部的CurpNode和非async局部的RawCurpCurpNode包含了异步IO(接管,发送网络申请,数据长久化),后台任务(定时查看leader活性,leader在每个节点上复制数据、校准各follower);RawCurp可被视为一个状态机,它接管来自CurpNode的调用,并更新状态。如果RawCurp想要做一些异步操作(比方播送心跳),它就能够通过返回值让CurpNode去替它发申请。

举一个tick task的例子,在未refactor之前,因为咱们不能LockGuard不能过.await点,以及有多逻辑分支的限度,不得不将代码组织成这样的一个模式:

    loop {        let _now = ticker.tick().await;        let task = {            let state_c = Arc::clone(&state);            let state_r = state.upgradable_read();            if state_r.is_leader() {                if state_r.needs_hb                {                    let resps = bcast_heartbeats(connects.clone(), state_r, rpc_timeout);                    Either::Left(handle_heartbeat_responses(                        resps,                        state_c,                        Arc::clone(&timeout),                    ))                } else {                    continue;                }            } else {                let mut state_w = RwLockUgradableReadGuard::upgrade(state_r);                // ...                let resps = bcast_votes(connects.clone(), state_r, rpc_timeout);                Either::Right(handle_vote_responses(resps, state_c))            }        };        task.await;    }

在refactor之后,解决逻辑都被放在了RawCurp中,CurpNode中的代码就清晰多了:

loop {    let _now = ticker.tick().await;    let action = curp.tick();    match action {        TickAction::Heartbeat(hbs) => {            Self::bcast_heartbeats(Arc::clone(&curp), &connects, hbs).await;        }        TickAction::Votes(votes) => {            Self::bcast_votes(Arc::clone(&curp), &connects, votes).await;        }        TickAction::Nothing => {}    }}

咱们的我的项目:Xline

Xline是一个用于元数据管理的分布式KV存储。以上为对Xline中应用的Curp共识协定的重构总结。

如果你想理解更多对于Xline的信息,请参考咱们的Github:https://github.com/datenlord/Xline

达坦科技(DatenLord)专一下一代云计算——“天空计算”的基础设施技术,致力于拓宽云计算的边界。达坦科技打造的新一代开源跨云存储平台DatenLord,通过软硬件深度交融的形式买通云云壁垒,实现无限度跨云存储、跨云联通,建设海量异地、异构数据的对立存储拜访机制,为云上利用提供高性能平安存储反对。以满足不同行业客户对海量数据跨云、跨数据中心高性能拜访的需要。

公众号:达坦科技DatenLord
知乎账号:
https://www.zhihu.com/org/da-tan-ke-ji
B站:
https://space.bilibili.com/2017027518