共计 3480 个字符,预计需要花费 9 分钟才能阅读完成。
模块地址:https://github.com/netwarps/l…
libp2p-rs 作为一个 p2p 网络我的项目,有时候咱们可能须要察看网络数据的收发状况,并对其进行收集和汇总。基于这个前提,设计了一个 metric 模块去实现相干内容。
metric 实现构想
因为 libp2p 反对连贯多个 peer,而每个 peer 反对的 protocol 类型也不尽相同。咱们岂但须要汇总收发包的数据,同时也须要依据 peer_id 和 protocol,去分类记录相应的网络流量状况。很显著,这是一个 key-value 构造,天然会想到应用 HashMap 去存储相干数据,然而 HashMap 不是一个线程平安的数据结构,那咱们就须要思考实现一个反对多线程平安并发的 HashMap。
平安并发
在设计的初始,首先思考到的就是应用 Arc 包裹 Mutex 的形式去保障线程平安,但因为目前的应用场景是统计网络收发包状况,如果频繁进行 lock 的操作,会导致性能极其低下。于是我参考了 go-libp2p 的相干 metric 实现,Go 的底层是应用了一个 sync.Map 的构造,通过 Atomic+Mutex 保障了多线程并发平安。因而设计的逻辑就变成了,是否应用 CAS 之类的原子操作,实现一个 lock-free 的 HashMap。
垃圾回收
除了线程平安,还有一种状况也须要思考。在 Java 和 Go 中,变量应用完后,GC 会主动帮咱们执行开释内存的操作。在 Rust 中,裸指针是指向内存地址的指针,只能通过手动开释的形式去回收内存;同时,在手动回收的时候,还须要思考是否有其余线程正在通过裸指针应用某块内存地址。而 AtomicPtr 的 compare_and_swap() 办法返回的恰好是一个可变的裸指针(即 *mut T),这无疑是一个辣手的问题。
crossbeam-epoch
针对上述两种状况,咱们能够应用 Crossbeam-Epoch 来解决遇到的问题。它提供了 Atomic 的相干原子操作和一个提早删除的性能。正如其名,epoch 应用世代和提早队列的形式,当 local epoch 与 global epoch 相差两代时,代表能够平安回收队列中两代前的内存地址,补救了前文提到的裸指针开释操作带来的破绽。crossbeam 通过 epoch 这个机制,保障了所有的对象只有在未被援用的状况下才会被删除,防止了呈现野指针的状况。
MetricMap
MetricMap 作为 Metric 的外围,外部实现是一个包裹了 crossbeam_epoch::Atomic 的 HashMap。通过 crossbeam_epoch 提供的 pin(),load(),defer_destroy() 等一系列办法,实现了 lock-free 的 HashMap。
MetricMap 的实现与 go-libp2p 中的 DeepCopyMap 类似,都是通过深拷贝的形式实现 map 构造的替换。Clone() 操作在 map 的数据量较大时,对性能的影响较为显著,后续思考优化相干构造。
以 store_or_modify() 办法举例:
- 首先应用 pin() 办法 ”pin” 住以后 thread,避免全局 epoch 降级导致以后线程的 drop() 办法被调用;
- 而后起一个 loop,循环加载 Atomic 中的 HashMap;
- 对 HashMap 解援用,因为在 rust 中解裸指针的援用是不平安的,因而须要用 unsafe 办法包裹;
- as_ref() 办法返回的是不可变援用,须要通过 clone() 失去一份新的 HashMap。如果 key 值存在,通过向闭包传值获取新的返回值,更新 value;否则插入新的 key-value;
- 调用 Owned::new 为新的 HashMap 调配一个在堆上的内存地址,执行 CAS 操作;
- 如果 CAS 胜利,将旧的 HashMap 地址增加到待革除的列表中,这个列表就是前文提到的提早删除的队列。
/// If map contains key, replaces original value with the result that return by F.
/// Otherwise, create a new key-value and insert.
pub fn store_or_modify<F: Fn(&K, &V) -> V>(&self, key: &K, value: V, on_modify: F) {let guard = crossbeam_epoch::pin();
loop {let shared = self.data.load(SeqCst, &guard);
let mut new_hash = HashMap::new();
match unsafe {shared.as_ref() } {Some(old_hash) => {new_hash = old_hash.clone();
if let Some(old_value) = new_hash.get(key) {let new_value = on_modify(key, old_value);
new_hash.insert(key.clone(), new_value.clone());
} else {new_hash.insert(key.clone(), value.clone());
}
}
None => {new_hash.insert(key.clone(), value.clone());
}
}
let owned = Owned::new(new_hash);
match self.data.compare_and_set(shared, owned, SeqCst, &guard) {Ok(_) => {
unsafe {guard.defer_destroy(shared);
break;
}
// break;
}
Err(_e) => {}}
}
}
Metric
Metric 的主体实现如下,能够看到与 peer 和 protocol 相干的数据结构都是基于 MetricMap 的。总数据包的个数和字节数大小不须要辨别,所以间接应用 std 的 AtomicUize 即可:
pub struct Metric {
/// The accumulative counter of packets sent.
pkt_sent: AtomicUsize,
/// The accumulative counter of packets received.
pkt_recv: AtomicUsize,
/// The accumulative counter of bytes sent.
byte_sent: AtomicUsize,
/// The accumulative counter of bytes received.
byte_recv: AtomicUsize,
/// A hashmap that key is protocol name and value is a counter of bytes received.
protocol_in: MetricMap<ProtocolId, usize>,
/// A hashmap that key is protocol name and value is a counter of bytes sent.
protocol_out: MetricMap<ProtocolId, usize>,
/// A hashmap that key is peer_id and value is a counter of bytes received.
peer_in: MetricMap<PeerId, usize>,
/// A hashmap that key is peer_id and value is a counter of bytes sent.
peer_out: MetricMap<PeerId, usize>,
}
总结
以上是 Metric 相干构造从实现到竣工,两头若有了解上的谬误,还请各位不吝赐教。目前而言,MetricMap 的设计适宜于一次新增屡次批改的状况。后续思考通过起一个 Web Server 的形式,通过 Restful API 的形式裸露相干监控数据,不便在内部查看。
Netwarps 由国内资深的云计算和分布式技术开发团队组成,该团队在金融、电力、通信及互联网行业有十分丰盛的落地教训。Netwarps 目前在深圳、北京均设立了研发核心,团队规模 30+,其中大部分为具备十年以上开发教训的技术人员,别离来自互联网、金融、云计算、区块链以及科研机构等业余畛域。
Netwarps 专一于平安存储技术产品的研发与利用,次要产品有去中心化文件系统(DFS)、去中心化计算平台(DCP),致力于提供基于去中心化网络技术实现的分布式存储和分布式计算平台,具备高可用、低功耗和低网络的技术特点,实用于物联网、工业互联网等场景。
公众号:Netwarps