模块地址: https://github.com/netwarps/l...

在rust-libp2p中,当协定想要获取peer_id所对应的地址时,须要实现NetworkBehaviour的addresses_of_peer办法。与之不同的是,go-libp2p应用peerstore来存储了peer_id与address之间的关系,因而咱们能够参考它来实现一个rust版本的peerstore。

实现构想

首先,因为咱们的启动外围是swarm,那peerstore会作为其中的一个属性。其次,在go-libp2p-peerstore中,peerstore次要寄存的数据为三块:地址信息的AddrBook,公私钥信息的KeyBook,协定信息的ProtoBook;咱们能够将三者联合在一起组成一个新的struct,取名为PeerRecord,放在以peer_id为key的Hashmap中。

垃圾回收机制

GC存在的意义就是避免hashmap适度收缩。因为网络状态时时刻刻都在发生变化,peer之间的连贯也可能随之变动,而peerstore有一个重要的作用就是寄存peer的地址信息。如果不对曾经生效的peer信息进行清理,就会影响到peerstore的工作效率。

目前实现的成果是,在swarm的start办法中应用task::spawn启动一个工作,创立一个mpsc的channel,应用select语法期待管道传来的音讯或者 task 期待10分钟的逻辑实现,针对某些地址,如果曾经超出ttl的限度,清理以后地址;同时,如果以后peer_id的地址汇合中不存在任何的地址信息,就将其从Hashmap中移除。

Pinned

尽管GC机制的存在,使Hashmap不会无限度扩容,良好地帮忙了零碎的运行。然而依然有些有余的中央,思考如下这种状况:

对于KAD协定来说,peer须要不停地进行迭代查问已知节点,逐渐填充本人的KBucket,这是一个耗时的过程。如果在gc时,将较早查问到的节点地址信息从peerstore中移除了,那么又须要从新启用迭代查问去获取地址,因而咱们在PeerRecord中增加了一个bool值pinned。GC时会判断这条记录的类别,如果pinned为true,就会跳过清理的步骤。

序列化与长久化

对每一个peer来说,因为某些起因须要下线或停机时,寄存在peerstore里的节点信息是不应该被抛弃的。而对于须要长久化的数据,也须要进行序列化操作,便于寄存。

在libp2p-rs中,主循环的调用也是通过task::spawn启动的。当swarm接管到close的音讯时,将会退出事件处理的循环,并向运行peerstore的gc线程发送一个close()的事件,完结gc的过程。接下来调用peerstore的save_data()办法,将数据应用serde序列化成json格局,并应用std::io将序列化后的数据寄存到根目录的txt文件中。

办法剖析

以GC办法进行解析:

  1. swarm主循环spawn运行task,每十分钟触发一次select。
  2. Hashmap被Arc<Mutex>包裹,能够通过lock()获取,保障并发平安。
  3. 如果该peer的信息不是通过kad获取的,调用retain筛选未超出ttl时限的地址。
  4. 如果以后peer的地址数据曾经清空,从hashmap中移除这个peer。
    // swarm/lib.rs    // The GC task is to remove all expired addresses from the peer store        task::spawn(async move {            log::info!("starting Peerstore GC...");            loop {                let either = future::select(rx.next(), task::sleep(PEERSTORE_GC_PURGE_INTERVAL).boxed()).await;                match either {                    Either::Left((_, _)) => break,                    Either::Right((_, _)) => peer_store.remove_expired_addrs(),                }            }            log::info!("quitting Peerstore GC...");        });            // core/peerstore.rs    /// Removes all expired address.    pub fn remove_expired_addrs(&self) {        let mut to_remove = vec![];        let mut guard = self.inner.lock().unwrap();        for (peer, pr) in guard.iter_mut() {            if !pr.pinned {                log::debug!("GC attempt for {:?}", peer);                pr.addrs.retain(|record| record.expiry.elapsed() < record.ttl);                // delete this peer if no addr at all                if pr.addrs.is_empty() {                    log::debug!("remove {:?} from peerstore", peer);                    to_remove.push(peer.clone());                }            }        }        for peer in to_remove {            guard.remove(&peer);        }    }

Netwarps 由国内资深的云计算和分布式技术开发团队组成,该团队在金融、电力、通信及互联网行业有十分丰盛的落地教训。Netwarps 目前在深圳、北京均设立了研发核心,团队规模30+,其中大部分为具备十年以上开发教训的技术人员,别离来自互联网、金融、云计算、区块链以及科研机构等业余畛域。
Netwarps 专一于平安存储技术产品的研发与利用,次要产品有去中心化文件系统(DFS)、去中心化计算平台(DCP),致力于提供基于去中心化网络技术实现的分布式存储和分布式计算平台,具备高可用、低功耗和低网络的技术特点,实用于物联网、工业互联网等场景。
公众号:Netwarps