目前在我的项目的主线上曾经实现了 kad 协定,而 kBucket 作为存储节点的一环,值得进行一次剖析。

Kbucket 简要介绍

在 kad 中,peer 每获取到一个节点的信息,会将其寄存到本人的 KBucket 中。每个 peer_id 由公钥通过 sha2_256 运算之后失去,长度为 32 个字节。每个节点都能够与另外的节点通过异或运算失去最长前缀,即从第一位开始的间断 0 的个数。0 越多,代表两个节点越靠近,最多能够有 32* 8 个间断的 0。所以对 KBucket 而言,桶的最大个数为 256 个。

KBucket 构造

在 rust-libp2p 中,每个 KBucket 外部保护了一个 Node 类型的 ArrayVector,大小为 20,其中 Node 构造应用 key 寄存 peer_id,value 寄存地址信息;first_connected_pos 作为 KBucket 的连贯标记位,记录可被清理的节点下标;同时还提供了 apply_pending 的属性,存储准备插入的节点

与 rust-libp2p 不同的是,libp2p-rs 应用了一个新的设计办法。因为 peerstore 的存在,咱们不须要在 KBucket 外面存储 peer 所对应的地址信息,绝对应的,peer 相干的一些连贯信息,如最初连接时间,就能够作为新的 value 被寄存到 node 中。这样设计还有一个益处,就是也不须要 apply_pending 和 connect_pos 这些属性了,每个 peer 能够独自保护一个本人的连贯状态信息,KBucket 清理时,能够间接用 filter 的形式执行相干操作。综合以上状况,咱们设计了一个构造体 PeerInfo,用来作为新的 Value 类型

PeerInfo 中记录了三个属性

/// The information of a peer in Kad routing table.
#[derive(Clone, Debug)]
pub struct PeerInfo {
    /// The time instant at which we talk to the remote peer.
    /// Sets to `Some` if it is deemed to be alive. Otherwise,
    /// it is set to `None`
    aliveness: Option<Instant>,

    /// The time this peer was added to the routing table.
    added_at: Instant,

    /// reserved for future use?
    replaceable: bool,

aliveness 示意最初通信时刻,added_at 记录该节点被增加到路由表的时刻,replaceable 标记这条信息是否能够被替换(目前未启用)。通过这种形式,在对 KBucket 进行增删操作时,可能更加容易判断相干 peer 的状态。


上面以 KBucketTable 的 try_add_peer() 进行剖析:

  1. 首先判断是否为已存在节点。如果存在,且属于迭代查问过程中调用的办法,那就须要更新 peer 的最初通信工夫
  2. 如果不是已存在节点,须要分状况探讨

    1. 如果调用 insert 办法可能胜利增加,就须要在 peerstore 中将该节点的 GC 标记位设置为 false,避免因为 GC 导致地址信息被清理,进而反复进行迭代查问。
    2. 如果增加失败,阐明 KBucket 满了,须要进行清理。首先通过 filter 和 min_by 找出最久未通信的节点,将其从 KBucket 中驱赶,同时 peerstore 中批改为可被 GC。之后再将新的节点插入到 KBucket 中,peerstore 标记不进行 GC
fn try_add_peer(&mut self, peer: PeerId, queried: bool) {
        let timeout = self.check_kad_peer_interval;
        let now = Instant::now();
        let key = kbucket::Key::new(peer.clone());

        log::debug!("trying to add a peer: {:?} bucket-index={:?}, query={}",

        match self.kbuckets.entry(&key) {kbucket::Entry::Present(mut entry) => {
                // already in RT, update the node's aliveness if queried is true
                if queried {entry.value().set_aliveness(Some(Instant::now()));
                    log::debug!("{:?} updated: {:?}", peer, entry.value());
            kbucket::Entry::Absent(mut entry) => {let info = PeerInfo::new(queried);
                if entry.insert(info.clone()) {log::debug!("Peer added to routing table: {} {:?}", peer, info);
                    // pin this peer in PeerStore to prevent GC from recycling multiaddr
                    if let Some(s) = self.swarm.as_ref() {s.pin(&peer)
                } else {log::debug!("Bucket full, trying to replace an old node for {}", peer);
                    // try replacing an 'old' peer
                    let bucket = entry.bucket();
                    let candidate = bucket
                        .filter(|n| n.value.get_aliveness().map_or(true, |a| now.duration_since(a) > timeout))
                        .min_by(|x, y| x.value.get_aliveness().cmp(&y.value.get_aliveness()));

                    if let Some(candidate) = candidate {let key = candidate.key.clone();
                        let evicted = bucket.remove(&key);
                        log::debug!("Bucket full. Peer node added, {} replacing {:?}", peer, evicted);
                        // unpin the evicted peer
                        if let Some(s) = self.swarm.as_ref() {s.unpin(key.preimage())
                        // now try to insert the value again
                        let _ = entry.insert(info);
                        // pin this peer in PeerStore to prevent GC from recycling multiaddr
                        if let Some(s) = self.swarm.as_ref() {s.pin(&peer)
                    } else {log::debug!("Bucket full, but can't find an replaced node, give up {}", peer);
            _ => {}}

