关于rust:如何优雅地组织Rust项目中的异步代码

40次阅读

共计 4156 个字符,预计需要花费 11 分钟才能阅读完成。

概要

很多应用过 Async Rust 的人都可能有过被其要求的束缚所困扰的经验,例如,spawned task'static 的要求,MutexGuard不能逾越.await,等等。克服这些束缚须要认真地设计代码构造,很可能会导致艰涩和嵌套的代码,这对开发人员和审查人员都是一种挑战。在这篇文章中,我将首先列出我在编写 async Rust 代码时的一些痛点。而后,我将指出咱们真正须要异步代码的场景,并探讨为什么咱们应该把异步和非异步代码离开。最初,我将展现我是如何在最近的一次 Curp 重构中实际这一准则的。

痛点:

Spawned Task必须是'static

在 spawn 一个新的 async task 的时候,编译器并不知道该 task 会被执行多久,可能很短暂,也可能会始终执行至程序运行完结。所以,编译器会要求该 task 所含的所有类型都领有 'static 的生命周期。

这样的限度使得咱们经常能在 spawn 前看到不少的 clone 代码。当然,这些代码从某种角度来讲能够帮忙程序员更好天文清哪些变量的所有权是要被移交给新的 task 的,但同时,也会使得代码看上去很啰嗦,不够简洁。

let a_arc = Arc::clone(&a);
let b_arc = Arc::clone(&b);
tokio::spawn(async move {// ...});

Send 的变量的持有不能够逾越 .await

这点限度背地的起因 tokio 的 task 并不是固定在一个线程上执行的,闲暇线程会被动“偷取”繁忙线程的 task,这就须要 task 能够被Send

请看上面一段代码:

let mut log_l = log.lock();
log_l.append(new_entry.clone());
broadcast(new_entry).await;

尝试编译后,会发现报错:log_l不能逾越 .await 点持有。

天然,为了使得拿着锁的 critical section 尽量地短,咱们不须要拿着锁过 .await 点,所以咱们在其中加一行放锁的代码:

let mut log_l = log.lock();
log_l.append(new_entry.clone());
drop(log_l);
broadcast(new_entry).await;

很惋惜,还是不能通过编译,这是因为编译器目前只能通过计算代码 Scope 的形式来判断一个 task 是否能够被Send。如果说上一个痛点还有肯定的益处,那么这个问题就纯正来源于编译器的限度了。所以咱们必须把代码改成这个样子:

{let mut log_w = log.write();
    log_w.append(new_entry.clone());
}
broadcast(new_entry).await;

如果一个函数中有须要拿多把锁,又有很多的异步调用,代码就会嵌套起来,变得复杂艰涩。
Side Note: 咱们晓得 tokio 本人有个异步的锁 tokio::sync::Mutex,它是能够被 hold 过.await 的。但要留神的是,大多数状况下,咱们并不会须要异步锁,因为异步锁通常意味着拿着锁的 critical section 是会十分长的。所以,如果咱们须要在异步代码中拿锁,不要不加思索地应用异步锁,事实上,在 tokio 官网文档中,也是更加倡议应用同步锁的。

应用异步 Rust 的场景和组织形式

如果咱们常常在我的项目开发中遇到上述问题,天然就会开始思考其产生的起因以及该怎么防止。我认为一个很重要的因素就是没有把 async 和非 async 的代码给离开,或者说,更实质的起因是咱们没有在设计我的项目架构的时候将须要 async 的局部和不须要 async 的局部离开。所以接下来,我将梳理咱们什么时候能力真正地用到 Async Rust?

I/O

当咱们进行比拟耗时的 I / O 操作,咱们不想让这些操作 block 住咱们以后的线程。所以咱们用异步 I /O,当运行到 await 的时候,I/ O 就能够到后盾去做,让其它的 task 执行。

// .await will enable other scheduled tasks to progress
let mut file = File::create(“foo.txt”).await?;

file.write(b"some bytes”).await?;

后台任务

后台任务的 task 通常会随同着一个 channel 的接收端呈现。

tokio::spawn(async move {while let Some(job) = rx.recv().await {// ...}
};

并发工作

并发地 spawn 多个 task 能够更高效地利用多核处理器。

let chunks = data.chunks(data.len() / N_TASKS);
for chunk in chunks {tokio::spawn(work_on(chunk));
}

依赖期待

应用 .await 期待依赖。这种应用绝对较少一些。

// wait for some event
event.listen().await;

// barrier
barrier.wait().await;

能够看到,应用 Async 代码的中央,次要集中在 I /O、并发与后台任务。在开发之前,咱们也无妨无意识地去拆散我的项目中的 async 与 sync 局部:放大 Async 局部的函数,将解决逻辑挪动至一般函数中。将这两局部拆散,不仅能够缓解文章结尾所说的痛点,更能够帮我理清代码构造。

{let mut log_w = log.write();
    log_w.append(new_entry.clone());
    // ...
}
broadcast(new_entry).await;

// move the logic to another function instead

fn update_log(log: &mut Log, new_entry: Entry) {log.append(new_entry);
    // ...
}

update_log(&mut log.write(), new_entry.clone());
broadcast(new_entry).await;

对于 Curp 的一次大型重构

在重构之前,因为一次次的迭代,代码的可读性和构造变得越来越差。具体来说,因为咱们有若干个带锁构造须要在 curp server 的各个局部中共享,而 curp server 的大部分函数又是 async 的,async 和拿锁的代码混淆在一起,就导致了咱们经常在开发过程中遇到上述痛点。

所以,咱们从新调整了 curp server 的构造,将其分为了 async 局部的 CurpNode 和非 async 局部的 RawCurpCurpNode 包含了异步 IO(接管,发送网络申请,数据长久化),后台任务(定时查看 leader 活性,leader 在每个节点上复制数据、校准各 follower);RawCurp可被视为一个状态机,它接管来自 CurpNode 的调用,并更新状态。如果 RawCurp 想要做一些异步操作(比方播送心跳),它就能够通过返回值让 CurpNode 去替它发申请。

举一个 tick task 的例子,在未 refactor 之前,因为咱们不能 LockGuard 不能过 .await 点,以及有多逻辑分支的限度,不得不将代码组织成这样的一个模式:


    loop {let _now = ticker.tick().await;
        let task = {let state_c = Arc::clone(&state);
            let state_r = state.upgradable_read();
            if state_r.is_leader() {
                if state_r.needs_hb
                {let resps = bcast_heartbeats(connects.clone(), state_r, rpc_timeout);
                    Either::Left(handle_heartbeat_responses(
                        resps,
                        state_c,
                        Arc::clone(&timeout),
                    ))
                } else {continue;}
            } else {let mut state_w = RwLockUgradableReadGuard::upgrade(state_r);
                // ...
                let resps = bcast_votes(connects.clone(), state_r, rpc_timeout);
                Either::Right(handle_vote_responses(resps, state_c))
            }
        };
        task.await;
    }

在 refactor 之后,解决逻辑都被放在了 RawCurp 中,CurpNode 中的代码就清晰多了:


loop {let _now = ticker.tick().await;
    let action = curp.tick();
    match action {TickAction::Heartbeat(hbs) => {Self::bcast_heartbeats(Arc::clone(&curp), &connects, hbs).await;
        }
        TickAction::Votes(votes) => {Self::bcast_votes(Arc::clone(&curp), &connects, votes).await;
        }
        TickAction::Nothing => {}}
}

咱们的我的项目:Xline

Xline 是一个用于元数据管理的分布式 KV 存储。以上为对 Xline 中应用的 Curp 共识协定的重构总结。

如果你想理解更多对于 Xline 的信息,请参考咱们的 Github:https://github.com/datenlord/Xline

达坦科技(DatenLord)专一下一代云计算——“天空计算”的基础设施技术,致力于拓宽云计算的边界。达坦科技打造的新一代开源跨云存储平台 DatenLord,通过软硬件深度交融的形式买通云云壁垒,实现无限度跨云存储、跨云联通,建设海量异地、异构数据的对立存储拜访机制,为云上利用提供高性能平安存储反对。以满足不同行业客户对海量数据跨云、跨数据中心高性能拜访的需要。

公众号:达坦科技 DatenLord
知乎账号:
https://www.zhihu.com/org/da-tan-ke-ji
B 站:
https://space.bilibili.com/2017027518

正文完
 0