目前在我的项目的主线上曾经实现了 kad 协定,而 kBucket 作为存储节点的一环,值得进行一次剖析。
Kbucket 简要介绍
在 kad 中,peer 每获取到一个节点的信息,会将其寄存到本人的 KBucket 中。每个 peer_id 由公钥通过 sha2_256 运算之后失去,长度为 32 个字节。每个节点都能够与另外的节点通过异或运算失去最长前缀,即从第一位开始的间断 0 的个数。0 越多,代表两个节点越靠近,最多能够有 32* 8 个间断的 0。所以对 KBucket 而言,桶的最大个数为 256 个。
KBucket 构造
在 rust-libp2p 中,每个 KBucket 外部保护了一个 Node 类型的 ArrayVector,大小为 20,其中 Node 构造应用 key 寄存 peer_id,value 寄存地址信息;first_connected_pos 作为 KBucket 的连贯标记位,记录可被清理的节点下标;同时还提供了 apply_pending 的属性,存储准备插入的节点
与 rust-libp2p 不同的是,libp2p-rs 应用了一个新的设计办法。因为 peerstore 的存在,咱们不须要在 KBucket 外面存储 peer 所对应的地址信息,绝对应的,peer 相干的一些连贯信息,如最初连接时间,就能够作为新的 value 被寄存到 node 中。这样设计还有一个益处,就是也不须要 apply_pending 和 connect_pos 这些属性了,每个 peer 能够独自保护一个本人的连贯状态信息,KBucket 清理时,能够间接用 filter 的形式执行相干操作。综合以上状况,咱们设计了一个构造体 PeerInfo,用来作为新的 Value 类型
PeerInfo 中记录了三个属性
/// The information of a peer in Kad routing table.
#[derive(Clone, Debug)]
pub struct PeerInfo {
/// The time instant at which we talk to the remote peer.
/// Sets to `Some` if it is deemed to be alive. Otherwise,
/// it is set to `None`
aliveness: Option<Instant>,
/// The time this peer was added to the routing table.
added_at: Instant,
/// reserved for future use?
replaceable: bool,
}
aliveness 示意最初通信时刻,added_at 记录该节点被增加到路由表的时刻,replaceable 标记这条信息是否能够被替换(目前未启用)。通过这种形式,在对 KBucket 进行增删操作时,可能更加容易判断相干 peer 的状态。
代码剖析
上面以 KBucketTable 的 try_add_peer() 进行剖析:
- 首先判断是否为已存在节点。如果存在,且属于迭代查问过程中调用的办法,那就须要更新 peer 的最初通信工夫
-
如果不是已存在节点,须要分状况探讨
- 如果调用 insert 办法可能胜利增加,就须要在 peerstore 中将该节点的 GC 标记位设置为 false,避免因为 GC 导致地址信息被清理,进而反复进行迭代查问。
- 如果增加失败,阐明 KBucket 满了,须要进行清理。首先通过 filter 和 min_by 找出最久未通信的节点,将其从 KBucket 中驱赶,同时 peerstore 中批改为可被 GC。之后再将新的节点插入到 KBucket 中,peerstore 标记不进行 GC
fn try_add_peer(&mut self, peer: PeerId, queried: bool) {
let timeout = self.check_kad_peer_interval;
let now = Instant::now();
let key = kbucket::Key::new(peer.clone());
log::debug!("trying to add a peer: {:?} bucket-index={:?}, query={}",
peer,
self.kbuckets.bucket_index(&key),
queried
);
match self.kbuckets.entry(&key) {kbucket::Entry::Present(mut entry) => {
// already in RT, update the node's aliveness if queried is true
if queried {entry.value().set_aliveness(Some(Instant::now()));
log::debug!("{:?} updated: {:?}", peer, entry.value());
}
}
kbucket::Entry::Absent(mut entry) => {let info = PeerInfo::new(queried);
if entry.insert(info.clone()) {log::debug!("Peer added to routing table: {} {:?}", peer, info);
// pin this peer in PeerStore to prevent GC from recycling multiaddr
if let Some(s) = self.swarm.as_ref() {s.pin(&peer)
}
} else {log::debug!("Bucket full, trying to replace an old node for {}", peer);
// try replacing an 'old' peer
let bucket = entry.bucket();
let candidate = bucket
.iter()
.filter(|n| n.value.get_aliveness().map_or(true, |a| now.duration_since(a) > timeout))
.min_by(|x, y| x.value.get_aliveness().cmp(&y.value.get_aliveness()));
if let Some(candidate) = candidate {let key = candidate.key.clone();
let evicted = bucket.remove(&key);
log::debug!("Bucket full. Peer node added, {} replacing {:?}", peer, evicted);
// unpin the evicted peer
if let Some(s) = self.swarm.as_ref() {s.unpin(key.preimage())
}
// now try to insert the value again
let _ = entry.insert(info);
// pin this peer in PeerStore to prevent GC from recycling multiaddr
if let Some(s) = self.swarm.as_ref() {s.pin(&peer)
}
} else {log::debug!("Bucket full, but can't find an replaced node, give up {}", peer);
}
}
}
_ => {}}
}
Netwarps 由国内资深的云计算和分布式技术开发团队组成,该团队在金融、电力、通信及互联网行业有十分丰盛的落地教训。Netwarps 目前在深圳、北京均设立了研发核心,团队规模 30+,其中大部分为具备十年以上开发教训的技术人员,别离来自互联网、金融、云计算、区块链以及科研机构等业余畛域。
Netwarps 专一于平安存储技术产品的研发与利用,次要产品有去中心化文件系统(DFS)、去中心化计算平台(DCP),致力于提供基于去中心化网络技术实现的分布式存储和分布式计算平台,具备高可用、低功耗和低网络的技术特点,实用于物联网、工业互联网等场景。
公众号:Netwarps