模块地址: https://github.com/netwarps/l...
libp2p-rs 作为一个 p2p 网络项目,有时候我们可能需要观察网络数据的收发情况,并对其进行收集和汇总。基于这个前提,设计了一个 metric 模块去实现相关内容。
metric实现构想
由于 libp2p 支持连接多个 peer,而每个 peer 支持的 protocol 类型也不尽相同。我们不但需要汇总收发包的数据,同时也需要根据 peer_id 和 protocol,去分类记录相应的网络流量情况。很明显,这是一个 key-value 结构,自然会想到使用 HashMap 去存储相关数据,但是 HashMap 不是一个线程安全的数据结构,那我们就需要考虑实现一个支持多线程安全并发的 HashMap。
安全并发
在设计的初始,首先考虑到的就是使用 Arc 包裹 Mutex 的方式去保证线程安全,但由于目前的使用场景是统计网络收发包情况,如果频繁进行 lock 的操作,会导致性能极其低下。于是我参考了go-libp2p 的相关 metric 实现,Go 的底层是使用了一个 sync.Map 的结构,通过 Atomic+Mutex 保证了多线程并发安全。因此设计的逻辑就变成了,能否使用 CAS 之类的原子操作,实现一个 lock-free 的 HashMap。
垃圾回收
除了线程安全,还有一种情况也需要考虑。在Java和Go中,变量使用完后,GC会自动帮我们执行释放内存的操作。在 Rust 中,裸指针是指向内存地址的指针,只能通过手动释放的方式去回收内存;同时,在手动回收的时候,还需要考虑是否有其他线程正在通过裸指针使用某块内存地址。而 AtomicPtr 的 compare_and_swap() 方法返回的恰好是一个可变的裸指针(即*mut T),这无疑是一个棘手的问题。
crossbeam-epoch
针对上述两种情况,我们可以使用 Crossbeam-Epoch 来解决遇到的问题。它提供了 Atomic 的相关原子操作和一个延迟删除的功能。正如其名,epoch 使用世代和延迟队列的方式,当 local epoch 与 global epoch 相差两代时,代表可以安全回收队列中两代前的内存地址,弥补了前文提到的裸指针释放操作带来的漏洞。crossbeam 通过 epoch 这个机制,保证了所有的对象只有在未被引用的情况下才会被删除,避免了出现野指针的情况。
MetricMap
MetricMap 作为 Metric 的核心,内部实现是一个包裹了crossbeam_epoch::Atomic 的 HashMap。通过 crossbeam_epoch 提供的 pin(), load(),defer_destroy() 等一系列方法,实现了 lock-free 的 HashMap。
MetricMap 的实现与 go-libp2p 中的 DeepCopyMap 相似,都是通过深拷贝的方式实现 map 结构的替换。Clone() 操作在 map 的数据量较大时,对性能的影响较为明显,后续考虑优化相关结构。
以 store_or_modify() 方法举例:
- 首先使用 pin() 方法"pin"住当前 thread,防止全局 epoch 升级导致当前线程的 drop() 方法被调用;
- 然后起一个 loop,循环加载 Atomic 中的 HashMap;
- 对 HashMap 解引用,由于在 rust 中解裸指针的引用是不安全的,因此需要用 unsafe 方法包裹;
- as_ref() 方法返回的是不可变引用,需要通过 clone() 得到一份新的 HashMap。如果 key 值存在,通过向闭包传值获取新的返回值,更新 value;否则插入新的 key-value;
- 调用 Owned::new 为新的 HashMap 分配一个在堆上的内存地址,执行 CAS 操作;
- 如果 CAS 成功,将旧的 HashMap 地址添加到待清除的列表中,这个列表就是前文提到的延迟删除的队列。
/// If map contains key, replaces original value with the result that return by F.
/// Otherwise, create a new key-value and insert.
pub fn store_or_modify<F: Fn(&K, &V) -> V>(&self, key: &K, value: V, on_modify: F) {
let guard = crossbeam_epoch::pin();
loop {
let shared = self.data.load(SeqCst, &guard);
let mut new_hash = HashMap::new();
match unsafe { shared.as_ref() } {
Some(old_hash) => {
new_hash = old_hash.clone();
if let Some(old_value) = new_hash.get(key) {
let new_value = on_modify(key, old_value);
new_hash.insert(key.clone(), new_value.clone());
} else {
new_hash.insert(key.clone(), value.clone());
}
}
None => {
new_hash.insert(key.clone(), value.clone());
}
}
let owned = Owned::new(new_hash);
match self.data.compare_and_set(shared, owned, SeqCst, &guard) {
Ok(_) => {
unsafe {
guard.defer_destroy(shared);
break;
}
// break;
}
Err(_e) => {}
}
}
}
Metric
Metric 的主体实现如下,可以看到与 peer 和 protocol 相关的数据结构都是基于 MetricMap 的。总数据包的个数和字节数大小不需要区分,所以直接使用 std 的 AtomicUize 即可:
pub struct Metric {
/// The accumulative counter of packets sent.
pkt_sent: AtomicUsize,
/// The accumulative counter of packets received.
pkt_recv: AtomicUsize,
/// The accumulative counter of bytes sent.
byte_sent: AtomicUsize,
/// The accumulative counter of bytes received.
byte_recv: AtomicUsize,
/// A hashmap that key is protocol name and value is a counter of bytes received.
protocol_in: MetricMap<ProtocolId, usize>,
/// A hashmap that key is protocol name and value is a counter of bytes sent.
protocol_out: MetricMap<ProtocolId, usize>,
/// A hashmap that key is peer_id and value is a counter of bytes received.
peer_in: MetricMap<PeerId, usize>,
/// A hashmap that key is peer_id and value is a counter of bytes sent.
peer_out: MetricMap<PeerId, usize>,
}
总结
以上是 Metric 相关结构从实现到完工,中间若有理解上的错误,还请各位不吝赐教。目前而言,MetricMap 的设计适合于一次新增多次修改的情况。后续考虑通过起一个 Web Server 的方式,通过 Restful API 的方式暴露相关监控数据,方便在外部查看。
Netwarps 由国内资深的云计算和分布式技术开发团队组成,该团队在金融、电力、通信及互联网行业有非常丰富的落地经验。Netwarps 目前在深圳、北京均设立了研发中心,团队规模30+,其中大部分为具备十年以上开发经验的技术人员,分别来自互联网、金融、云计算、区块链以及科研机构等专业领域。
Netwarps 专注于安全存储技术产品的研发与应用,主要产品有去中心化文件系统(DFS)、去中心化计算平台(DCP),致力于提供基于去中心化网络技术实现的分布式存储和分布式计算平台,具有高可用、低功耗和低网络的技术特点,适用于物联网、工业互联网等场景。
公众号:Netwarps
有疑问加站长微信联系(非本文作者)