前面我们对go-libp2p中swarm拨号源码进行了分析(【go-libp2p源码剖析】Swarm拨号),参考go-libp2p,我们在libp2p-rs上完成swarm拨号功能的开发。功能基本上和go-libp2p保持一致,稍微做了精简,去掉了go-libp2p拨号的部分功能,如DialSync中的同步拨号限制。下面对libp2p-rs swarm拨号功能的实现做一个详细的说明。
代码组织结构
仓库地址:https://github.com/netwarps/libp2p-rs.git
拨号相关代码主要分布在swarm/src/lib.rs
和swarm/src/dial.rs
两个文件中
类图如下:
- 拨号实现主要围绕AsyncDialer展开,它组合了DialLimiter和Backoff的功能,AsyncDialer实现了拨号的重试,拨号任务的启动及拨号结果的收集和反馈。拨号默认不重试,可以通过修改环境变量
LIBP2P_SWARM_DIAL_ATTEMPTS
对重试次数做修改。 - DialParam 包装了多个拨号需要的参数,在AsyncDialer的方法之间传递
- Transports可以根据拨号地址匹配合适的Transport去拨号(比如是TCP还是WebSocket)
- DialBackoff 对Peer的拨号失败的地址做了标记,避免频繁拨号
- DialLimiter 对并发拨号数做了限制,默认100,也可以通过修改环境变量
LIBP2P_SWARM_DIAL_LIMIT
对并发拨号数做修改
工作流程
时序图如下:
- 通过control发送一个命令给swarm,可以调用new_connection创建一个新的连接,再创建stream,也可以直接调用open_stream创建stream。Swarm接收到命令后,调用on_new_connection或on_new_stream。在on_new_stream中如果connection存在直接拿出connection创建stream,如果不存在则去拨号创建一个新的connection再创建stream。最后调用dial_peer对peer进行拨号,在这里会将拨号要用到的参数从Swarm复制到DialParam。
注:我们拨号没有将connection直接返回(因为只有在open_stream时才用到了connection,如果将值返回显得有点多余,返回可变引用又会有生命周期相关问题)。这里会构造一个闭包(主要用来打开流并返回流),最终在ConnectionEstablished事件或OutgoingConnectionError事件处理函数中执行这个闭包。
由于拨号需要启动多个task,如果一路传递下去的话,闭包需要支持clone才行,闭包捕获了外部的oneshot::Sender,它不支持clone,所以为求方便我们将闭包暂存在Swarm里的dial_transactions中,它是一个hashmap数据结构,key值是每次操作生成的唯一值,我们命名为TransactionId。这个TransactionId最终会带到ConnectionEstablished事件或OutgoingConnectionError事件对应的处理函数,最后我们可以根据TransactionId将闭包remove出来执行。
部分代码片段
type DialCallback = Box<dyn FnOnce(Result<&mut Connection>) + Send>;
fn on_new_stream(&mut self, peer_id: PeerId, pids: Vec<ProtocolId>, reply: oneshot::Sender<Result<Substream>>) -> Result<()> {
if let Some(connection) = self.get_best_conn(&peer_id) {
......
} else {
// dialing peer, and opening a new stream in the post-processing callback
self.dial_peer(peer_id.clone(), |r: Result<&mut Connection>| {
match r {
Ok(connection) => {
connection.open_stream(pids, |r| {
let _ = reply.send(r.map_err(|e| e.into()));
});
}
Err(e) => {
let _ = reply.send(Err(e));
}
}
});
}
Ok(())
}
fn dial_peer<F: FnOnce(Result<&mut Connection>) + Send + 'static>(&mut self, peer_id: PeerId, f: F) {
......
// allocate transaction id and push box::f into hashmap for post-processing
let tid = self.assign_tid();
self.dial_transactions.insert(tid, Box::new(f));
self.dialer
.dial(peer_id, self.transports.clone(), addrs, self.event_sender.clone(), tid);
}
fn handle_connection_opened(&mut self, stream_muxer: IStreamMuxer, dir: Direction, tid: Option<TransactionId>) -> Result<()> {
......
// dial callback for post-processing
// note that it must cleanup the tid entry
if let Some(id) = tid {
// the entry must be there
let callback = self.dial_transactions.remove(&id).expect("no match tid found");
callback(Ok(&mut connection));
}
......
}
- Swarm拨号时会调用AsyncDialer的dial方法。这里首先启动一个新的task,再调用start_dialing方法。start_dialing方法实现了对拨号的重试功能,它会等待拨号结果,将拨号结果返回给dial,成功则发送ConnectionEstablished事件,失败则发送OutgoingConnectionError事件,在事件处理函数中会直接直接第一步传入的闭包。
pub(crate) fn dial(
&self,
peer_id: PeerId,
transports: Transports,
addrs: EitherDialAddr,
mut event_sender: mpsc::UnboundedSender<SwarmEvent>,
tid: TransactionId,
) {
let dial_param = DialParam {
transports,
addrs,
peer_id,
tid,
limiter: self.limiter.clone(),
backoff: self.backoff.clone(),
attempts: self.attempts,
};
task::spawn(async move {
let tid = dial_param.tid;
let peer_id = dial_param.peer_id.clone();
let r = AsyncDialer::start_dialing(dial_param).await;
match r {
Ok(stream_muxer) => {
let _ = event_sender
.send(SwarmEvent::ConnectionEstablished {
stream_muxer,
direction: Direction::Outbound,
tid: Some(tid),
})
.await;
}
Err(err) => {
let _ = event_sender
.send(SwarmEvent::OutgoingConnectionError { tid, peer_id, error: err })
.await;
}
}
});
}
async fn start_dialing(dial_param: DialParam) -> Result<IStreamMuxer> {
let mut dial_count: u32 = 0;
loop {
dial_count += 1;
let active_param = dial_param.clone();
let r = AsyncDialer::dial_addrs(active_param).await;
if let Err(e) = r {
log::info!("[Dialer] dialer failed at attempt={} error={:?}", dial_count, e);
if dial_count < dial_param.attempts {
log::info!(
"[Dialer] All addresses of {:?} cannot be dialed successfully. Now try dialing again, attempts={}",
dial_param.peer_id,
dial_count
);
//TODO:
task::sleep(BACKOFF_BASE).await;
} else if dial_param.attempts > 1 {
break Err(SwarmError::MaxDialAttempts(dial_param.attempts));
} else {
break Err(e);
}
} else {
break r;
}
}
}
- start内部调用了dial_addrs,即对peer的多个地址同时进行拨号。首先检查backoff,如果刚拨号失败过,则直接返回错误。然后针对每个地址构造一个DialJob,每个DialJob启动一个task调用limiter的do_dial_job做拨号检查和拨号操作,因为不知道task啥时候能拨号完成,这里传了一个channel tx进去,只要拨号完成就会发回一个消息,再在外面接收,启动几个task就接收几次channel rx的消息,一旦发现有成功的拨号,就将结果直接返回。那些后面再拨号成功的,我们不关心,让它们自动销毁;对那些拨号失败的添加backoff,避免对失败地址频繁拨号。
let (tx, rx) = mpsc::unbounded::<(Result<IStreamMuxer>, Multiaddr)>();
let mut num_jobs = 0;
for addr in addrs_rank {
// first of all, check the transport
let r = param.transports.lookup_by_addr(addr.clone());
if r.is_err() {
log::info!("[Dialer] no transport found for {:?}", addr);
continue;
}
num_jobs += 1;
let dj = DialJob {
addr,
peer: peer_id.clone(),
tx: tx.clone(),
transport: r.unwrap(),
};
// spawn a task to dial
let limiter = self.limiter.clone();
task::spawn(async move {
limiter.do_dial_job(dj).await;
});
}
log::trace!("total {} dialing jobs started, collecting...", num_jobs);
self.collect_dialing_result(rx, num_jobs, param).await
async fn collect_dialing_result(&self, mut rx: UnboundedReceiver<(Result<IStreamMuxer>, Multiaddr)>, jobs: u32, param: DialParam) -> Result<IStreamMuxer> {
for i in 0..jobs {
let peer_id = param.peer_id.clone();
log::trace!("[Dialer] receiving dial result, finished jobs={} ...", i);
let r = rx.next().await;
match r {
Some((Ok(stream_muxer), addr)) => {
let reported_pid = stream_muxer.remote_peer();
if peer_id == reported_pid {
return Ok(stream_muxer);
} else {
self.backoff.add_peer(peer_id, addr).await;
}
}
Some((Err(err), addr)) => {
if let SwarmError::Transport(_) = err {
self.backoff.add_peer(peer_id, addr).await;
}
}
None => {
log::warn!("[Dialer] should not happen");
}
}
}
return Err(SwarmError::AllDialsFailed);
}
- 相对go的实现DialLimiter做了精简,去掉了等待列表,失败的我们不会放到waiting列表里做拨号,而是直接返回错误。AsyncDialer的dial_addrs会调用do_dial_job。do_dial_job中会判断当前正在拨号的数量,如果数量超过我们的限制,则直接返回ConcurrentDialLimit错误。否则给并发数加1,并调用execute_dial做实际的拨号操作,拨号完成并发数减1。这里对transport的拨号加了一个超时的封装(本地地址默认5秒超时,外部地址默认60s超时),如果超时则直接返回DialTimeout错误。不管拨号成功与否都通过channel将消息送回给AsyncDialer。
async fn do_dial_job(&self, mut dj: DialJob) {
if self.dial_consuming.load(Ordering::SeqCst) >= self.dial_limit {
let _ = dj.tx.send((Err(SwarmError::ConcurrentDialLimit(self.dial_limit)), dj.addr)).await;
return;
}
self.dial_consuming.fetch_add(1, Ordering::SeqCst);
self.execute_dial(dj).await;
}
fn dial_timeout(&self, ma: &Multiaddr) -> Duration {
let mut timeout: Duration = DIAL_TIMEOUT;
if ma.is_private_addr() {
timeout = DIAL_TIMEOUT_LOCAL;
}
timeout
}
async fn execute_dial(&self, mut dj: DialJob) {
let timeout = self.dial_timeout(&dj.addr);
let dial_r = future::timeout(timeout, dj.transport.dial(dj.addr.clone())).await;
if let Ok(r) = dial_r {
let _ = dj.tx.send((r.map_err(|e|e.into()), dj.addr)).await;
} else {
let _ = dj.tx.send((Err(SwarmError::DialTimeout(dj.addr.clone(), timeout.as_secs())), dj.addr)).await;
}
self.dial_consuming.fetch_sub(1, Ordering::SeqCst);
}
Netwarps 由国内资深的云计算和分布式技术开发团队组成,该团队在金融、电力、通信及互联网行业有非常丰富的落地经验。Netwarps 目前在深圳、北京均设立了研发中心,团队规模30+,其中大部分为具备十年以上开发经验的技术人员,分别来自互联网、金融、云计算、区块链以及科研机构等专业领域。
Netwarps 专注于安全存储技术产品的研发与应用,主要产品有去中心化文件系统(DFS)、去中心化计算平台(DCP),致力于提供基于去中心化网络技术实现的分布式存储和分布式计算平台,具有高可用、低功耗和低网络的技术特点,适用于物联网、工业互联网等场景。
公众号:Netwarps
有疑问加站长微信联系(非本文作者)