关于rust:Actor并发系统说明与使用

60次阅读

共计 2106 个字符,预计需要花费 6 分钟才能阅读完成。

简介

Actor模型是一种并行计算模型,提供了一种用于构建并发、分布式系统的形象办法

Actor 模型中,计算被示意为独立的、轻量级的计算单元,称为Actor,能够发送和接管音讯并进行本地计算

作为一种通用的消息传递编程模型,被宽泛用于构建大规模可伸缩分布式系统

核心思想是独立保护隔离状态,并基于消息传递实现异步通信

Actor模型组成

  • 存储:每个 Actor 持有一个邮箱(mailbox),实质上是一个队列,用于存储音讯
  • 通信:每个 Actor 能够发送音讯至任何 Actor,应用异步消息传递,不保障音讯达到指标 Actor 时的程序
  • 计算:Actor 能够通过解决音讯来更新外部状态,对于内部而言,Actor 的状态是隔离的isolated state

劣势

  • 每个 Actor 独立运行,因而程序天然是并行的
  • 每个 Actor 都齐全独立于其余实例,不存在共享内存和竞争条件的问题,能够防止并发编程中的一些难点
  • 通过消息传递进行通信,每个 Actor 都有一个邮箱来接管音讯,每次只解决一个音讯,以确保状态的一致性和线程平安
  • 消息传递是异步的,发送方不须要期待接管方的响应,从而防止了锁和同步的开销
  • 提供高度的并发性和可扩展性,可能无效地解决多核 CPU 和分布式系统中的并发编程问题
  • 提供良好的容错性和可恢复性,因为每个 Actor 都有本人的状态和行为,能够更容易地实现零碎的容错和复原

Actor 基于线程的任务调度

为每一个 Actor 调配一个独立的执行过程(线程),独占该线程,能够是操作系统线程,协程或者虚拟机线程

如果以后 Actor 的邮箱为空,Actor 会阻塞以后线程,期待接管新的音讯

因为线程数量受到零碎的限度,因而 Actor 的数量也会受到限制

Actor基于事件驱动的任务调度

只有在事件触发(即接管音讯)时,才为 Actor 的任务分配线程并执行,当事件处理完毕,即退出线程

该形式能够应用很少的线程来执行大量 Actor 产生的工作,也是当初大部分 Actor 模型所采纳的调度形式

这种实现与 run loopevent loop 机制十分类似

rust中的 Actor 模型实现 -actix

Actix 建设在 Actor 模型的根底上,容许将应用程序编写为一组独立执行但合作的 Actor,这些Actor 通过音讯进行通信

Actor 是封装状态和行为的对象,并在 actix 库提供的Actor 零碎中运行

Actor 在特定的执行上下文 Context<A> 中运行,上下文对象仅在执行期间可用,每个 Actor 都有一个独自的执行上下文,执行上下文还管制 Actor 的生命周期。Actor 仅通过替换音讯进行通信

任何 Rust 类型都能够是 Actor,只须要实现 Actor Trait 即可

为了可能解决特定音讯,参与者必须为此音讯提供 Handler<M> 实现

根本应用

批改Cargo.toml

[dependencies]
actix = "0.11.0"
actix-rt = "2.2"

批改main.rs

use actix::prelude::*;

/// 定义音讯的类型,rtype 指定返回类型
#[derive(Message)]
#[rtype(result = "Result<bool, std::io::Error>")]
struct Ping;

// 定义 Actor
struct MyActor;

// 集成 Actor Trait
impl Actor for MyActor {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Context<Self>) {println!("Actor is alive");
    }

    fn stopped(&mut self, ctx: &mut Context<Self>) {println!("Actor is stopped");
    }
}

/// Define handler for `Ping` message
impl Handler<Ping> for MyActor {
    type Result = Result<bool, std::io::Error>;

    fn handle(&mut self, msg: Ping, ctx: &mut Context<Self>) -> Self::Result {println!("Ping received");

        Ok(true)
    }
}

#[actix_rt::main]
async fn main() {
    // Start MyActor in current thread
    let addr = MyActor.start();

    // Send Ping message.
    // send() message returns Future object, that resolves to message result
    let result = addr.send(Ping).await;

    match result {Ok(res) => println!("Got result: {}", res.unwrap()),
        Err(err) => println!("Got error: {}", err),
    }
}

运行后果

Actor is alive
Ping received
Got result: true

浏览参考

浅谈 Actor 模型

懒人专用高并发:Actor模型

Actix Actor官网文档

正文完
 0