乐趣区

关于rust:OpenRaft-在交易撮合引擎中的应用

前言

因为工作须要,始终对原子多播利用有十分浓重的趣味。通过一段时间的技术选型。咱们十分侥幸的失去了 Openraft 实操分享 Databend 社区的热心反对。我也想通过咱们的理论工作,对 Openraft 的将来利用尽一些微薄之力。

Openraft 是一个 Raft 的改进版(包含优化选举抵触, 解决网络抖动对 leadership 的影响), 它在 Databend 中为 db, table 等元数据提供分布式存储和强统一读写, 为 databend 的云端数据库集群提供事务性保障.

我的实际的上一篇文章反馈了咱们的选型过程,有趣味的人能够看一下。Raft in Rust (原子多播 + 撮合引擎)这篇文章更多的是想阐明咱们在应用 OpenRaft 的理论问题,并且通过咱们的实现,揭秘 OpenRaft 的一些机制。

代码仓库

大家在应用 OpenRaft 的时候,我置信很多人都查看了手册:
Getting Started – openraftThe openraft user guide.

当然,这是一个十分优良的手册。咱们从这个手册里,会学习到如何应用 OpenRaft 实现本人的利用。而且,openraft/example-raft-kv 这个例子的确可能很好的阐明如何实现一个简略的利用。然而,这个例子是应用的内存来做长久化实现。当然内存不会真正做长久化,所以很容易在节点退出后,失落状态。而咱们真正须要的示例是一个可能长久化的例子。

另外一个实例就是 databend/metasrv 而这个示例外面,咱们能够看到一个残缺的 metadata 存储服务的实现。实际上,metasrv 自身是一个相似于 etcd 的 kv 存储。存储整个 databend 集群的 metadata。这个示例外面,metasrv 应用了 sled 作为底层存储。sled 既存储 log,也存储 statemachine 的状态。这个例子,statemachine 所有的更新都间接在 sled 存储里通过 sled 的事物实现了。所以,对于如何存储 snapshot 这个问题,咱们并不太容易看清楚。所以 snapshot 的产生和传递次要是在节点间同步的时候应用。

这里,大家能够看到咱们凋谢的源代码。尽管这个示例是基于 example-raft-kv 示例,没有达到 metasrv 的生产强度。然而咱们还是十分全面的体现出了 openraft 对 log, snapshot 解决的行为和能力。

GitHub – raymondshe/matchengine-raft

利用场景

和 metasrv 的场景不同。咱们须要咱们的 statemachine 尽量在内存外面更新,尽量少落盘。尽管 sled 本地落盘的速度也很快,然而内存操作的速度会更快。所以,咱们基本上就是这样进行操作的。

 总体设计图

所以在这个图外面,大家能够看到日志是通过 sled 进行存储的。而这些日志因为通过 Raft 协定,实际上他们在每台机器上的程序是统一的。
所以,不同的 matchengine-raft 实例,在雷同的日志流状况下,对状态机的操作就是统一的。所以,不论咱们从哪一个日志开始写 snapshot,通过加载 snapshot 并且回放后续的日志,咱们都能够复原到最新状态。

依照设计图中显示,以后 StateMachine 的状态是解决了第 9 个日志里的音讯。
这时候,零碎保留了所有的音讯到 sled。并且在第 3 个音讯的时候落盘了一次 snapshot,并且在低 6 个音讯的时候落盘了一次 snapshot。
如果这台机器当机,咱们是能够从编号为 3 的 snapshot 复原状态机,并且持续解决 3,4,5,6,7,8,9 这 6 条音讯来复原以后状态。
当然,咱们也能够从编号为 6 的 snapshot 复原状态机,并且持续解决 7,8,9 这 3 条音讯来复原以后状态。

当然咱们能够抉择多少个音讯进行一次落盘。当然落盘的次数越多越牢靠,然而性能影响比拟大。好在 snapshot 的生成和落盘是异步的形式做的。

有趣味的敌人能够看一下 akka 的 EventSroucing 模式。这种模式和 Raft 单节点十分相像。不同的是 OpenRaft 强调多实例一致性,而 Akka 则提供了十分多的形式来存储 Log(Journal) 和 Snapshot.

实现细节

谈到实现细节。咱们还是回到官网文档 geting-started 来。咱们也依照这个文档的程序进行阐明。

Raft 对于从利用开发着的角度,咱们能够简化到上面的这张图里。Raft 的分布式共识就是要保障驱动状态机的指令可能在 Log 里被统一的复制到各个节点里。

Raft 有两个重要的组成部分:

  • 如何统一的在节点之间复制日志
  • 并且在状态机外面如何生产这些日志

基于 OpenRaft 实现咱们本人的 Raft 利用其实并不简单,只须要一下三局部:

  • 定义好客户端的申请和应答
  • 实现好存储 RaftStore 来长久化状态
  • 实现一个网络层,来保障 Raft 节点之间能互相传递音讯。

好,那咱们就开始吧:

1. 定义好客户端的申请和应答

申请有可能是客户端收回的驱动 Raft 状态机的数据,而应答则是 Raft 状态机会打给客户端的数据。

申请和应答须要实现 AppData 和 AppDataResponse 这里,咱们的实现如下:

#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum ExampleRequest {Set { key: String, value: String},
    Place {order: Order},
    Cancel {order: Order}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ExampleResponse {pub value: Option<String>,}

这两个类型齐全是利用相干的,而且和 RaftStrage 的状态机实现相干。

  1. 这里,Set 是 example-raft-kv 原示例就有的命令。
  2. 大家也留神到了,命令都是对状态机有驱动的命令。也就是会对状态机状态造成扭转的命令。如果咱们须要取状态机外部数据的值返回给客户端。咱们大可不必定义到这里。

2. 实现 RaftStorage

这是整个我的项目十分要害的一部分。

只有实现好 trait RaftStorage,咱们就把数据存储和生产的行为定义好。RaftStoreage 能够包装一个像 RocksDB, Sled 的本地 KV 存储或者远端的 SQL DB。

RaftStorage 定义了上面一些 API

  • 读写 Raft 状态,比方说 term,vote (term:任期,vote:投票后果)
fn save_vote(vote:&Vote)
fn read_vote() -> Result<Option<Vote>>
  • 读写日志
fn get_log_state() -> Result<LogState> fn try_get_log_entries(range) -> Result<Vec<Entry>>
fn append_to_log(entries)
fn delete_conflict_logs_since(since:LogId)
fn purge_logs_upto(upto:LogId)
  • 将日志的内容利用到状态机
fn last_applied_state() -> Result<(Option<LogId>,Option<EffectiveMembership>)>
fn apply_to_state_machine(entries) -> Result<Vec<AppResponse>>
  • 创立和装置快照(snapshot)
fn build_snapshot() -> Result<Snapshot> fn get_current_snapshot() -> Result<Option<Snapshot>>
fn begin_receiving_snapshot() -> Result<Box<SnapshotData>>
fn install_snapshot(meta, snapshot)

在 ExampleStore,
这些内存化存储行为是十分明确简略的。而咱们不是要真正落盘了吗?那咱们就看一下 matchengine-rust 是怎么实现的。

这里是 matchengine-raft/src/store

接口 实现形式
Raft 状态 sled 存储,应用专门的 key 来读写 raft 状态。
日志 sled 存储,应用 log_index 来惟一标识一个 Log Entity
利用状态机 状态机外面一部分是业务数据,然而一部分是 raft 的数据。业务数据次要是订单薄。   
快照 快照齐全是通过文件进行存储的,而且文件的名字就保留了快照的全副 meta 信息。

咱们阐明一些设计要点

ExampleStore 的数据

ExchangeStore 外面次要是蕴含上面的成员变量。

#[derive(Debug)]
pub struct ExampleStore {

    last_purged_log_id: RwLock<Option<LogId<ExampleNodeId>>>,

    /// The Raft log.

    pub log: sled::Tree,
    
    /// The Raft state machine.
    pub state_machine: RwLock<ExampleStateMachine>,

    /// The current granted vote.
    vote: sled::Tree,

    snapshot_idx: Arc<Mutex<u64>>,

    current_snapshot: RwLock<Option<ExampleSnapshot>>,

    config : Config,

    pub node_id: ExampleNodeId,

}

帮忙咱们落盘的成员次要是 log, vote。而须要产生 snapshot 进行落盘的所有内容都在 state_machine.

  1. last_purged_log_id:这是最初删除的日志 ID。删除日志自身能够节约存储,然而,对咱们来讲,我了保证数据存储的平安。在删除日志之前,咱们必须有这条日志 index 大的 snapshot 产生。否则,咱们就没有方法通过 snapshot 来复原数据。
  2. log: 这是一个 sled::Tree,也就是一个 map。如果看着局部代码的话,咱们就能够分明的明确 log 对象的构造。key 是一个 log_id_index 的 Big Endian 的字节片段。value 是通过 serd_json 进行序列化的内容。
    #[tracing::instrument(level = "trace", skip(self, entries))]
    async fn append_to_log(
        &mut self,
        entries: &[&Entry<ExampleTypeConfig>],
    ) -> Result<(), StorageError<ExampleNodeId>> {

        let log = &self.log;
        for entry in entries {log.insert(entry.log_id.index.to_be_bytes(), IVec::from(serde_json::to_vec(&*entry).unwrap())).unwrap();}
        Ok(())
    }
  1. state_machine: 这里就是通过日志驱动的所有状态的汇合。

    #[derive(Serialize, Deserialize, Debug, Default, Clone)]pub struct ExampleStateMachine {
    
        pub last_applied_log: Option<LogId<ExampleNodeId>>,
    
        // TODO: it should not be Option.
        pub last_membership: EffectiveMembership<ExampleNodeId>,
    
        /// Application data.
        pub data: BTreeMap<String, String>,
    
        // ** Orderbook
        pub orderbook: OrderBook,
    
    }

StateMachine 外面最重要的数据就是 orderbook 这部分就是撮合引擎外面重要的订单表。寄存买方和卖方的未成交订单信息。这是次要的业务逻辑。data 这部分是原来例子中的 kv 存储。咱们还在这里没有删除。

这里 last_applied_log, last_menbership 这些状态和业务逻辑没有太大关系。所以,如果您要实现本人的 StateMachine。
还是尽量和例子保持一致。次要是因为这两个状态是通过 apply_to_state_machine() 这个接口更新。也正好须要长久化。如果须要进一步暗藏 Raft 的细节,咱们还是倡议 openraft 能将这两个状态进一步进行暗藏封装。

state_machine 的落盘操作次要集中在这里:store/store.rs。有趣味的能够看一下。这外面比拟有意思的问题是 orderbook 自身无奈被默认的 serde_json 序列化 / 反序列化。所以咱们才在 matchengine/mod.rs 加了这段代码:

pub mod vectorize {use serde::{Deserialize, Deserializer, Serialize, Serializer};
    use std::iter::FromIterator;

    pub fn serialize<'a, T, K, V, S>(target: T, ser: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
        T: IntoIterator<Item = (&'a K, &'a V)>,
        K: Serialize + 'a,
        V: Serialize + 'a,
    {let container: Vec<_> = target.into_iter().collect();
        serde::Serialize::serialize(&container, ser)
    }

    pub fn deserialize<'de, T, K, V, D>(des: D) -> Result<T, D::Error>
    where
        D: Deserializer<'de>,
        T: FromIterator<(K, V)>,
        K: Deserialize<'de>,
        V: Deserialize<'de>,
    {let container: Vec<_> = serde::Deserialize::deserialize(des)?;
        Ok(T::from_iter(container.into_iter()))
    }
}

/// main OrderBook structure
#[derive(Clone, Default, Serialize, Deserialize, Debug)]
pub struct OrderBook {#[serde(with = "vectorize")]
    pub bids: BTreeMap<BidKey, Order>,
    #[serde(with = "vectorize")]
    pub asks: BTreeMap<AskKey, Order>,
    pub sequance: u64,
}
  1. vote: 就是对最初一次 vote 的存储。具体请看, 这段代码倒不是因为这段代码有多重要,只是因为代码比较简单,看能够少些一些阐明:
#[tracing::instrument(level = "trace", skip(self))]
    async fn save_vote(&mut self, vote: &Vote<ExampleNodeId>) -> Result<(), StorageError<ExampleNodeId>> {self.vote.insert(b"vote", IVec::from(serde_json::to_vec(vote).unwrap())).unwrap();
        Ok(())
    }

    async fn read_vote(&mut self) -> Result<Option<Vote<ExampleNodeId>>, StorageError<ExampleNodeId>> {let value = self.vote.get(b"vote").unwrap();
        match value {None => {Ok(None)},
            Some(val) =>  {Ok(Some(serde_json::from_slice::<Vote<ExampleNodeId>>(&*val).unwrap()))}
        }
    }

然而这儿的确有个小坑,之前我没有留神到 vote 须要长久化,开始调试的时候产生了很多问题。晓得找到 Openraft 作者 Zhang Yanpo 才解决。
也是登程我想开源这个 openraft 文件长久化实现的诱因吧。感激 Zhang Yanpo, 好样的。

  1. 其余的成员变量其实没什么太好说的了。和原例子一样。

对日志和快照的管制

日志,快照相互配合,咱们能够很好的长久化状态,并且复原最新状态。多久写一次快照,保留多少日志。在这里咱们应用了上面的代码。

let mut config = Config::default().validate().unwrap();
    config.snapshot_policy = SnapshotPolicy::LogsSinceLast(500);
    config.max_applied_log_to_keep = 20000;
    config.install_snapshot_timeout = 400;

强烈建议大家看一下 Config in openraft::config – Rust

重点看 snapshot_policy, 代码里能够分明的标识,咱们须要 500 次 log 写一次快照。也就是 openraft 会调用 build_snapshot() 函数创立 snapshot。原示例里,snapshot 只是在内存里保留在 current_snapshot 变量里。而咱们须要实在的落盘。请留神这段代码的 self.write_snapshot()

#[async_trait]
impl RaftSnapshotBuilder<ExampleTypeConfig, Cursor<Vec<u8>>> for Arc<ExampleStore> {#[tracing::instrument(level = "trace", skip(self))]
    async fn build_snapshot(&mut self,) -> Result<Snapshot<ExampleTypeConfig, Cursor<Vec<u8>>>, StorageError<ExampleNodeId>> {let (data, last_applied_log);

        {
            // Serialize the data of the state machine.
            let state_machine = self.state_machine.read().await;
            data = serde_json::to_vec(&*state_machine)
                .map_err(|e| StorageIOError::new(ErrorSubject::StateMachine, ErrorVerb::Read, AnyError::new(&e)))?;

            last_applied_log = state_machine.last_applied_log;
        }

        let last_applied_log = match last_applied_log {
            None => {panic!("can not compact empty state machine");
            }
            Some(x) => x,
        };

        let snapshot_idx = {let mut l = self.snapshot_idx.lock().unwrap();
            *l += 1;
            *l
        };

        let snapshot_id = format!("{}-{}-{}",
            last_applied_log.leader_id, last_applied_log.index, snapshot_idx
        );

        let meta = SnapshotMeta {
            last_log_id: last_applied_log,
            snapshot_id,
        };

        let snapshot = ExampleSnapshot {meta: meta.clone(),
            data: data.clone(),};

        {let mut current_snapshot = self.current_snapshot.write().await;
            *current_snapshot = Some(snapshot);
        }

        self.write_snapshot().await.unwrap();

        Ok(Snapshot {
            meta,
            snapshot: Box::new(Cursor::new(data)),
        })
    }
}

这下咱们有了 snapshot,当然 snapshot 一方面能够用来在节点之间同步状态。
另一方面就是在启动的时候复原状态。而 openraft 的实现十分好。实际上复原状态只须要回复到最新的 snapshot 就行。只有本地日志齐备,openraft 会帮忙你调用 apply_to_statemachine() 来复原到最新状态。所以咱们就有了 restore() 函数。

#[async_trait]
impl Restore for Arc<ExampleStore> {#[tracing::instrument(level = "trace", skip(self))]
    async fn restore(&mut self) {tracing::debug!("restore");
        let log = &self.log;

        let first = log.iter().rev()
        .next()
        .map(|res| res.unwrap()).map(|(_, val)|
            serde_json::from_slice::<Entry<ExampleTypeConfig>>(&*val).unwrap().log_id);

        match first {Some(x) => {tracing::debug!("restore: first log id = {:?}", x);
                let mut ld = self.last_purged_log_id.write().await;
                *ld = Some(x);
            },
            None => {}}

        let snapshot = self.get_current_snapshot().await.unwrap();

        match snapshot {Some (ss) => {self.install_snapshot(&ss.meta, ss.snapshot).await.unwrap();},
            None => {}}
    }
}

大家留神一下 snapshot 的操作。当然,在这里,咱们也复原了 last_purged_log_id。

当然 store 这个函数会在 ExampleStore 刚刚构建的时候调用。

   // Create a instance of where the Raft data will be stored.
   let es = ExampleStore::open_create(node_id);

   //es.load_latest_snapshot().await.unwrap();

   let mut store = Arc::new(es);

   store.restore().await;

如何确定 RaftStorage 是对的

请查阅 Test suite for RaftStorage, 如果通过这个测试,一般来讲,OpenRaft 就能够应用他了。

#[test]
pub fn test_mem_store() -> anyhow::Result<()> {openraft::testing::Suite::test_all(MemStore::new) }

RaftStorage 的竞争状态

在咱们的设计里,在一个时刻,最多有一个线程会写状态,然而,会有多个线程来进行读取。比方说,可能有多个复制工作在同时度日志和存储。

实现必须保证数据持久性

调用者会假如所有的写操作都被长久化了。而且 Raft 的纠错机制也是依赖于牢靠的存储。


3. 实现 RaftNetwork

为了节点之间对日志可能有共识,咱们须要可能让节点之间进行通信。trait RaftNetwork就定义了数据传输的需要。RaftNetwork的实现能够是思考调用远端的 Raft 节点的服务

pub trait RaftNetwork<D>: Send + Sync + 'static where D: AppData {async fn send_append_entries(&self, target: NodeId, node:Option<Node>, rpc: AppendEntriesRequest<D>) -> Result<AppendEntriesResponse>;
    async fn send_install_snapshot(&self, target: NodeId, node:Option<Node>, rpc: InstallSnapshotRequest,) -> Result<InstallSnapshotResponse>;
    async fn send_vote(&self, target: NodeId, node:Option<Node>, rpc: VoteRequest) -> Result<VoteResponse>;
}

ExampleNetwork 显示了如何调用传输音讯。每一个 Raft 节点都应该提供有这样一个 RPC 服务。当节点收到 raft rpc,服务会吧申请传递给 raft 实例,并且通过 raft-server-endpoint 返回应答。

在理论状况下可能应用 Tonic gRPC 是一个更好的抉择。databend-meta 里有一个十分好的参考实现。

在咱们的 matchengen-raft 实现里,咱们解决了原示例中大量重连的问题。

  1. 保护一个可服用量的 client

这段代码在:network/raft_network_impl.rs

     let clients = Arc::get_mut(&mut self.clients).unwrap();
     let client = clients.entry(url.clone()).or_insert(reqwest::Client::new());
  1. 在服务器端引入keep_alive

这段代码在:lib.rs

// Start the actix-web server.
let server = HttpServer::new(move || {App::new()
        .wrap(Logger::default())
        .wrap(Logger::new("%a %{User-Agent}i"))
        .wrap(middleware::Compress::default())
        .app_data(app.clone())
        // raft internal RPC
        .service(raft::append)
        .service(raft::snapshot)
        .service(raft::vote)
        // admin API
        .service(management::init)
        .service(management::add_learner)
        .service(management::change_membership)
        .service(management::metrics)
        // application API
        .service(api::write)
        .service(api::read)
        .service(api::consistent_read)
}).keep_alive(Duration::from_secs(5));

这样的改变的确是对性能有一些晋升。然而真的须要更快的话,咱们应用 grpc,甚至应用 reliable multicast,比方说 pgm。


4. 启动集群

因为咱们保留了之前的 key/value 实现。所以之前的脚本应该还是可能工作的。而且之前的 key/value 有了真正的存储。

为了可能运行集群:

  • 启动三个没有初始化的 raft 节点;
  • 初始化其中一台 raft 节点;
  • 把其余的 raft 节点退出到这个集群里;
  • 更新 raft 成员配置。
    example-raft-kv 的 readme 文档外面把这些步骤都介绍的比较清楚了。
  • 上面两个测试脚本是十分有用的:
    test-cluster.sh 这个脚本能够简练的拆穿如何用 curl 和 raft 集群进行交互。在脚本里,您能够看到所有 http 申请和应答。

test\_cluster.rs 这个 rust 程序显示了怎么应用 ExampleClient 操作集群,发送申请和承受应答。

这里咱们要强调的是,在初始化 raft 集群的时候。咱们须要上述的过程。如果集群曾经被初始化,并且咱们曾经长久化了相应的状态(menbership, vote, log) 当前,再某些节点退出并且重新加入,咱们就不须要再过多干涉了。

在应用 metasrv 启动 meta service 的时候,我也遇到了雷同的状况。所以还是要先启动一个 single node 以保障这个节点作为种子节点被正当初始化了。

Deploy a Databend Meta Service Cluster | Databend

为了更好的启动治理集群,咱们在我的项目里增加了 test.sh。用法如下:

./test.sh <command> <node_id>

咱们能够在不同阶段调用不同的命令。大家有趣味的话能够看一下代码。这部分是主程序局部,蕴含了咱们实现的所有命令。

echo "Run command $1"
case $1 in
"place-order")
    place_order $2
    ;;
"metrics")
    get_metrics $2
    ;;

"kill-node")
    kill_node $2
    ;;
"kill")
    kill
    ;;
"start-node")
    start_node $2
    ;;
"get-seq")
    rpc 2100$2/read  '"orderbook_sequance"'
    ;;
"build-cluster")
    build_cluster $2
    ;;
"change-membership")
    change_membership $2
    ;;

"clean")
    clean
    ;;
  *)
    "Nothing is done!"
    ;;
esac

将来的工作

以后咱们实现的 matchengine-raft 只是为了示例怎么通过 raft 利用到撮合引擎这样一个对性能,稳定性,高可用要求都十分刻薄的利用场景。通过 raft 集群来实现撮合引擎的分布式治理。咱们置信真正把这个玩具撮合引擎推向产品环境,咱们还是须要进行很多工作:

  1. 优化序列化计划,serd_json 诚然好,然而通过字符串进行编解码还是差点儿意思。至多用到 bson 或者更好的用 protobuf, avro 等,进步编解码速度,传输和存储的开销。
  2. 优化 RaftNetwork, 在能够应用 multi-cast 的状况下应用 pgm,如果不行,能够应用 grpc。
  3. 撮合后果的散发。这部分在很多状况下依赖音讯队列中间件比拟好。
  4. 减少更多的撮合算法。这部分齐全是业务需要,和 openraft 无关。咱们就不在这个文章里探讨了。
  5. 欠缺测试和客户端的调用。
  6. 欠缺压测程序,筹备进一步调优。

论断

通过这个简略的小我的项目,咱们:

  1. 实现了一个简略的玩具撮合引擎。
  2. 验证了 OpenRaft 在性能上对撮合引擎场景的反对能力。
  3. 给 OpenRaft 提供了一个基于 sled KV 存储的日志存储的参考实现。
  4. 给 OpenRaft 提供了一个基于本地文件的快照存储的参考实现。

给大家走漏一个小机密,SAP 也在应用 OpenRaft 来构建要害利用。大家想想,都用到 Raft 协定了,肯定是十分重要的利用。

对于 databend 社区的帮忙,我示意由衷的感激。
作为一个长期工作在软件行业一线的老程序猿,看到中国开源软件开始在根底构建发力,由衷的感到快慰。也心愿中国开源社群越来越好,越来越弱小,走向软件行业的顶端。

作者信息:沈勇 Decisive Density  CTO

对于 Databend

Databend 是一款开源、弹性、低成本,基于对象存储也能够做实时剖析的旧式数仓。期待您的关注,一起摸索云原生数仓解决方案,打造新一代开源 Data Cloud。

  • Databend 文档:https://databend.rs/
  • Twitter:https://twitter.com/Datafuse_…
  • Slack:https://datafusecloud.slack.com/
  • Wechat:Databend
  • GitHub:https://github.com/datafusela…

文章首发于公众号:Databend

退出移动版