关于程序员:rust-版本的-peerstore-落地实践

4次阅读

共计 2486 个字符,预计需要花费 7 分钟才能阅读完成。

模块地址:https://github.com/netwarps/l…

在 rust-libp2p 中,当协定想要获取 peer_id 所对应的地址时,须要实现 NetworkBehaviour 的 addresses_of_peer 办法。与之不同的是,go-libp2p 应用 peerstore 来存储了 peer_id 与 address 之间的关系,因而咱们能够参考它来实现一个 rust 版本的 peerstore。

实现构想

首先,因为咱们的启动外围是 swarm,那 peerstore 会作为其中的一个属性。其次,在 go-libp2p-peerstore 中,peerstore 次要寄存的数据为三块:地址信息的 AddrBook,公私钥信息的 KeyBook,协定信息的 ProtoBook;咱们能够将三者联合在一起组成一个新的 struct,取名为 PeerRecord,放在以 peer_id 为 key 的 Hashmap 中。

垃圾回收机制

GC 存在的意义就是避免 hashmap 适度收缩。因为网络状态时时刻刻都在发生变化,peer 之间的连贯也可能随之变动,而 peerstore 有一个重要的作用就是寄存 peer 的地址信息。如果不对曾经生效的 peer 信息进行清理,就会影响到 peerstore 的工作效率。

目前实现的成果是,在 swarm 的 start 办法中应用 task::spawn 启动一个工作,创立一个 mpsc 的 channel,应用 select 语法期待管道传来的音讯或者 task 期待 10 分钟的逻辑实现,针对某些地址,如果曾经超出 ttl 的限度,清理以后地址;同时,如果以后 peer_id 的地址汇合中不存在任何的地址信息,就将其从 Hashmap 中移除。

Pinned

尽管 GC 机制的存在,使 Hashmap 不会无限度扩容,良好地帮忙了零碎的运行。然而依然有些有余的中央,思考如下这种状况:

对于 KAD 协定来说,peer 须要不停地进行迭代查问已知节点,逐渐填充本人的 KBucket,这是一个耗时的过程。如果在 gc 时,将较早查问到的节点地址信息从 peerstore 中移除了,那么又须要从新启用迭代查问去获取地址,因而咱们在 PeerRecord 中增加了一个 bool 值 pinned。GC 时会判断这条记录的类别,如果 pinned 为 true,就会跳过清理的步骤。

序列化与长久化

对每一个 peer 来说,因为某些起因须要下线或停机时,寄存在 peerstore 里的节点信息是不应该被抛弃的。而对于须要长久化的数据,也须要进行序列化操作,便于寄存。

在 libp2p-rs 中,主循环的调用也是通过 task::spawn 启动的。当 swarm 接管到 close 的音讯时,将会退出事件处理的循环,并向运行 peerstore 的 gc 线程发送一个 close() 的事件,完结 gc 的过程。接下来调用 peerstore 的 save_data() 办法,将数据应用 serde 序列化成 json 格局,并应用 std::io 将序列化后的数据寄存到根目录的 txt 文件中。

办法剖析

以 GC 办法进行解析:

  1. swarm 主循环 spawn 运行 task,每十分钟触发一次 select。
  2. Hashmap 被 Arc<Mutex> 包裹,能够通过 lock() 获取,保障并发平安。
  3. 如果该 peer 的信息不是通过 kad 获取的,调用 retain 筛选未超出 ttl 时限的地址。
  4. 如果以后 peer 的地址数据曾经清空,从 hashmap 中移除这个 peer。
    // swarm/lib.rs
    // The GC task is to remove all expired addresses from the peer store
        task::spawn(async move {log::info!("starting Peerstore GC...");
            loop {let either = future::select(rx.next(), task::sleep(PEERSTORE_GC_PURGE_INTERVAL).boxed()).await;
                match either {Either::Left((_, _)) => break,
                    Either::Right((_, _)) => peer_store.remove_expired_addrs(),}
            }
            log::info!("quitting Peerstore GC...");
        });
        
    // core/peerstore.rs
    /// Removes all expired address.
    pub fn remove_expired_addrs(&self) {let mut to_remove = vec![];
        let mut guard = self.inner.lock().unwrap();
        for (peer, pr) in guard.iter_mut() {
            if !pr.pinned {log::debug!("GC attempt for {:?}", peer);
                pr.addrs.retain(|record| record.expiry.elapsed() < record.ttl);
                // delete this peer if no addr at all
                if pr.addrs.is_empty() {log::debug!("remove {:?} from peerstore", peer);
                    to_remove.push(peer.clone());
                }
            }
        }

        for peer in to_remove {guard.remove(&peer);
        }
    }

Netwarps 由国内资深的云计算和分布式技术开发团队组成,该团队在金融、电力、通信及互联网行业有十分丰盛的落地教训。Netwarps 目前在深圳、北京均设立了研发核心,团队规模 30+,其中大部分为具备十年以上开发教训的技术人员,别离来自互联网、金融、云计算、区块链以及科研机构等业余畛域。
Netwarps 专一于平安存储技术产品的研发与利用,次要产品有去中心化文件系统(DFS)、去中心化计算平台(DCP),致力于提供基于去中心化网络技术实现的分布式存储和分布式计算平台,具备高可用、低功耗和低网络的技术特点,实用于物联网、工业互联网等场景。
公众号:Netwarps

正文完
 0