目前在我的项目的主线上曾经实现了kad协定,而kBucket作为存储节点的一环,值得进行一次剖析。
Kbucket简要介绍
在kad中,peer每获取到一个节点的信息,会将其寄存到本人的KBucket中。每个peer_id由公钥通过sha2_256运算之后失去,长度为32个字节。每个节点都能够与另外的节点通过异或运算失去最长前缀,即从第一位开始的间断0的个数。0越多,代表两个节点越靠近,最多能够有32*8个间断的0。所以对KBucket而言,桶的最大个数为256个。
KBucket构造
在rust-libp2p中,每个KBucket外部保护了一个Node类型的ArrayVector,大小为20,其中Node构造应用key寄存peer_id,value寄存地址信息;first_connected_pos作为KBucket的连贯标记位,记录可被清理的节点下标;同时还提供了apply_pending的属性,存储准备插入的节点
与rust-libp2p不同的是,libp2p-rs应用了一个新的设计办法。因为peerstore的存在,咱们不须要在KBucket外面存储peer所对应的地址信息,绝对应的,peer相干的一些连贯信息,如最初连接时间,就能够作为新的value被寄存到node中。这样设计还有一个益处,就是也不须要apply_pending和connect_pos这些属性了,每个peer能够独自保护一个本人的连贯状态信息,KBucket清理时,能够间接用filter的形式执行相干操作。综合以上状况,咱们设计了一个构造体PeerInfo,用来作为新的Value类型
PeerInfo中记录了三个属性
/// The information of a peer in Kad routing table.#[derive(Clone, Debug)]pub struct PeerInfo { /// The time instant at which we talk to the remote peer. /// Sets to `Some` if it is deemed to be alive. Otherwise, /// it is set to `None` aliveness: Option<Instant>, /// The time this peer was added to the routing table. added_at: Instant, /// reserved for future use? replaceable: bool,}
aliveness示意最初通信时刻,added_at记录该节点被增加到路由表的时刻,replaceable标记这条信息是否能够被替换(目前未启用)。通过这种形式,在对KBucket进行增删操作时,可能更加容易判断相干peer的状态。
代码剖析
上面以KBucketTable的try_add_peer()进行剖析:
- 首先判断是否为已存在节点。如果存在,且属于迭代查问过程中调用的办法,那就须要更新peer的最初通信工夫
如果不是已存在节点,须要分状况探讨
- 如果调用insert办法可能胜利增加,就须要在peerstore中将该节点的GC标记位设置为false,避免因为GC导致地址信息被清理,进而反复进行迭代查问。
- 如果增加失败,阐明KBucket满了,须要进行清理。首先通过filter和min_by找出最久未通信的节点,将其从KBucket中驱赶,同时peerstore中批改为可被GC。之后再将新的节点插入到KBucket中,peerstore标记不进行GC
fn try_add_peer(&mut self, peer: PeerId, queried: bool) { let timeout = self.check_kad_peer_interval; let now = Instant::now(); let key = kbucket::Key::new(peer.clone()); log::debug!( "trying to add a peer: {:?} bucket-index={:?}, query={}", peer, self.kbuckets.bucket_index(&key), queried ); match self.kbuckets.entry(&key) { kbucket::Entry::Present(mut entry) => { // already in RT, update the node's aliveness if queried is true if queried { entry.value().set_aliveness(Some(Instant::now())); log::debug!("{:?} updated: {:?}", peer, entry.value()); } } kbucket::Entry::Absent(mut entry) => { let info = PeerInfo::new(queried); if entry.insert(info.clone()) { log::debug!("Peer added to routing table: {} {:?}", peer, info); // pin this peer in PeerStore to prevent GC from recycling multiaddr if let Some(s) = self.swarm.as_ref() { s.pin(&peer) } } else { log::debug!("Bucket full, trying to replace an old node for {}", peer); // try replacing an 'old' peer let bucket = entry.bucket(); let candidate = bucket .iter() .filter(|n| n.value.get_aliveness().map_or(true, |a| now.duration_since(a) > timeout)) .min_by(|x, y| x.value.get_aliveness().cmp(&y.value.get_aliveness())); if let Some(candidate) = candidate { let key = candidate.key.clone(); let evicted = bucket.remove(&key); log::debug!("Bucket full. Peer node added, {} replacing {:?}", peer, evicted); // unpin the evicted peer if let Some(s) = self.swarm.as_ref() { s.unpin(key.preimage()) } // now try to insert the value again let _ = entry.insert(info); // pin this peer in PeerStore to prevent GC from recycling multiaddr if let Some(s) = self.swarm.as_ref() { s.pin(&peer) } } else { log::debug!("Bucket full, but can't find an replaced node, give up {}", peer); } } } _ => {} } }
Netwarps 由国内资深的云计算和分布式技术开发团队组成,该团队在金融、电力、通信及互联网行业有十分丰盛的落地教训。Netwarps 目前在深圳、北京均设立了研发核心,团队规模30+,其中大部分为具备十年以上开发教训的技术人员,别离来自互联网、金融、云计算、区块链以及科研机构等业余畛域。
Netwarps 专一于平安存储技术产品的研发与利用,次要产品有去中心化文件系统(DFS)、去中心化计算平台(DCP),致力于提供基于去中心化网络技术实现的分布式存储和分布式计算平台,具备高可用、低功耗和低网络的技术特点,实用于物联网、工业互联网等场景。
公众号:Netwarps