后面咱们对go-libp2p中swarm拨号源码进行了剖析(【go-libp2p源码分析】Swarm拨号),参考go-libp2p,咱们在libp2p-rs上实现swarm拨号性能的开发。性能基本上和go-libp2p保持一致,略微做了精简,去掉了go-libp2p拨号的局部性能,如DialSync中的同步拨号限度。上面对libp2p-rs swarm拨号性能的实现做一个具体的阐明。
代码组织构造
仓库地址:https://github.com/netwarps/libp2p-rs.git
拨号相干代码次要散布在swarm/src/lib.rs
和swarm/src/dial.rs
两个文件中
类图如下:
- 拨号实现次要围绕AsyncDialer开展,它组合了DialLimiter和Backoff的性能,AsyncDialer实现了拨号的重试,拨号工作的启动及拨号后果的收集和反馈。拨号默认不重试,能够通过批改环境变量
LIBP2P_SWARM_DIAL_ATTEMPTS
对重试次数做批改。 - DialParam 包装了多个拨号须要的参数,在AsyncDialer的办法之间传递
- Transports能够依据拨号地址匹配适合的Transport去拨号(比方是TCP还是WebSocket)
- DialBackoff 对Peer的拨号失败的地址做了标记,防止频繁拨号
- DialLimiter 对并发拨号数做了限度,默认100,也能够通过批改环境变量
LIBP2P_SWARM_DIAL_LIMIT
对并发拨号数做批改
工作流程
时序图如下:
- 通过control发送一个命令给swarm,能够调用new_connection创立一个新的连贯,再创立stream,也能够间接调用open_stream创立stream。Swarm接管到命令后,调用on_new_connection或on_new_stream。在on_new_stream中如果connection存在间接拿出connection创立stream,如果不存在则去拨号创立一个新的connection再创立stream。最初调用dial_peer对peer进行拨号,在这里会将拨号要用到的参数从Swarm复制到DialParam。
注:咱们拨号没有将connection间接返回(因为只有在open_stream时才用到了connection,如果将值返回显得有点多余,返回可变援用又会有生命周期相干问题)。这里会结构一个闭包(次要用来关上流并返回流),最终在ConnectionEstablished事件或OutgoingConnectionError事件处理函数中执行这个闭包。
因为拨号须要启动多个task,如果一路传递上来的话,闭包须要反对clone才行,闭包捕捉了内部的oneshot::Sender,它不反对clone,所以为求不便咱们将闭包暂存在Swarm里的dial_transactions中,它是一个hashmap数据结构,key值是每次操作生成的惟一值,咱们命名为TransactionId。这个TransactionId最终会带到ConnectionEstablished事件或OutgoingConnectionError事件对应的处理函数,最初咱们能够依据TransactionId将闭包remove进去执行。
局部代码片段
type DialCallback = Box<dyn FnOnce(Result<&mut Connection>) + Send>;
fn on_new_stream(&mut self, peer_id: PeerId, pids: Vec<ProtocolId>, reply: oneshot::Sender<Result<Substream>>) -> Result<()> { if let Some(connection) = self.get_best_conn(&peer_id) { ...... } else { // dialing peer, and opening a new stream in the post-processing callback self.dial_peer(peer_id.clone(), |r: Result<&mut Connection>| { match r { Ok(connection) => { connection.open_stream(pids, |r| { let _ = reply.send(r.map_err(|e| e.into())); }); } Err(e) => { let _ = reply.send(Err(e)); } } }); } Ok(()) }
fn dial_peer<F: FnOnce(Result<&mut Connection>) + Send + 'static>(&mut self, peer_id: PeerId, f: F) { ...... // allocate transaction id and push box::f into hashmap for post-processing let tid = self.assign_tid(); self.dial_transactions.insert(tid, Box::new(f)); self.dialer .dial(peer_id, self.transports.clone(), addrs, self.event_sender.clone(), tid); }
fn handle_connection_opened(&mut self, stream_muxer: IStreamMuxer, dir: Direction, tid: Option<TransactionId>) -> Result<()> { ...... // dial callback for post-processing // note that it must cleanup the tid entry if let Some(id) = tid { // the entry must be there let callback = self.dial_transactions.remove(&id).expect("no match tid found"); callback(Ok(&mut connection)); } ...... }
- Swarm拨号时会调用AsyncDialer的dial办法。这里首先启动一个新的task,再调用start_dialing办法。start_dialing办法实现了对拨号的重试性能,它会期待拨号后果,将拨号后果返回给dial,胜利则发送ConnectionEstablished事件,失败则发送OutgoingConnectionError事件,在事件处理函数中会间接间接第一步传入的闭包。
pub(crate) fn dial( &self, peer_id: PeerId, transports: Transports, addrs: EitherDialAddr, mut event_sender: mpsc::UnboundedSender<SwarmEvent>, tid: TransactionId, ) { let dial_param = DialParam { transports, addrs, peer_id, tid, limiter: self.limiter.clone(), backoff: self.backoff.clone(), attempts: self.attempts, }; task::spawn(async move { let tid = dial_param.tid; let peer_id = dial_param.peer_id.clone(); let r = AsyncDialer::start_dialing(dial_param).await; match r { Ok(stream_muxer) => { let _ = event_sender .send(SwarmEvent::ConnectionEstablished { stream_muxer, direction: Direction::Outbound, tid: Some(tid), }) .await; } Err(err) => { let _ = event_sender .send(SwarmEvent::OutgoingConnectionError { tid, peer_id, error: err }) .await; } } }); }
async fn start_dialing(dial_param: DialParam) -> Result<IStreamMuxer> { let mut dial_count: u32 = 0; loop { dial_count += 1; let active_param = dial_param.clone(); let r = AsyncDialer::dial_addrs(active_param).await; if let Err(e) = r { log::info!("[Dialer] dialer failed at attempt={} error={:?}", dial_count, e); if dial_count < dial_param.attempts { log::info!( "[Dialer] All addresses of {:?} cannot be dialed successfully. Now try dialing again, attempts={}", dial_param.peer_id, dial_count ); //TODO: task::sleep(BACKOFF_BASE).await; } else if dial_param.attempts > 1 { break Err(SwarmError::MaxDialAttempts(dial_param.attempts)); } else { break Err(e); } } else { break r; } } }
- start外部调用了dial_addrs,即对peer的多个地址同时进行拨号。首先查看backoff,如果刚拨号失败过,则间接返回谬误。而后针对每个地址结构一个DialJob,每个DialJob启动一个task调用limiter的do_dial_job做拨号检查和拨号操作,因为不晓得task啥时候能拨号实现,这里传了一个channel tx进去,只有拨号实现就会发回一个音讯,再在里面接管,启动几个task就接管几次channel rx的音讯,一旦发现有胜利的拨号,就将后果间接返回。那些前面再拨号胜利的,咱们不关怀,让它们主动销毁;对那些拨号失败的增加backoff,防止对失败地址频繁拨号。
let (tx, rx) = mpsc::unbounded::<(Result<IStreamMuxer>, Multiaddr)>(); let mut num_jobs = 0; for addr in addrs_rank { // first of all, check the transport let r = param.transports.lookup_by_addr(addr.clone()); if r.is_err() { log::info!("[Dialer] no transport found for {:?}", addr); continue; } num_jobs += 1; let dj = DialJob { addr, peer: peer_id.clone(), tx: tx.clone(), transport: r.unwrap(), }; // spawn a task to dial let limiter = self.limiter.clone(); task::spawn(async move { limiter.do_dial_job(dj).await; }); } log::trace!("total {} dialing jobs started, collecting...", num_jobs); self.collect_dialing_result(rx, num_jobs, param).await
async fn collect_dialing_result(&self, mut rx: UnboundedReceiver<(Result<IStreamMuxer>, Multiaddr)>, jobs: u32, param: DialParam) -> Result<IStreamMuxer> { for i in 0..jobs { let peer_id = param.peer_id.clone(); log::trace!("[Dialer] receiving dial result, finished jobs={} ...", i); let r = rx.next().await; match r { Some((Ok(stream_muxer), addr)) => { let reported_pid = stream_muxer.remote_peer(); if peer_id == reported_pid { return Ok(stream_muxer); } else { self.backoff.add_peer(peer_id, addr).await; } } Some((Err(err), addr)) => { if let SwarmError::Transport(_) = err { self.backoff.add_peer(peer_id, addr).await; } } None => { log::warn!("[Dialer] should not happen"); } } } return Err(SwarmError::AllDialsFailed); }
- 绝对go的实现DialLimiter做了精简,去掉了期待列表,失败的咱们不会放到waiting列表里做拨号,而是间接返回谬误。AsyncDialer的dial_addrs会调用do_dial_job。do_dial_job中会判断以后正在拨号的数量,如果数量超过咱们的限度,则间接返回ConcurrentDialLimit谬误。否则给并发数加1,并调用execute_dial做理论的拨号操作,拨号实现并发数减1。这里对transport的拨号加了一个超时的封装(本地地址默认5秒超时,内部地址默认60s超时),如果超时则间接返回DialTimeout谬误。不论拨号胜利与否都通过channel将音讯送回给AsyncDialer。
async fn do_dial_job(&self, mut dj: DialJob) { if self.dial_consuming.load(Ordering::SeqCst) >= self.dial_limit { let _ = dj.tx.send((Err(SwarmError::ConcurrentDialLimit(self.dial_limit)), dj.addr)).await; return; } self.dial_consuming.fetch_add(1, Ordering::SeqCst); self.execute_dial(dj).await; }
fn dial_timeout(&self, ma: &Multiaddr) -> Duration { let mut timeout: Duration = DIAL_TIMEOUT; if ma.is_private_addr() { timeout = DIAL_TIMEOUT_LOCAL; } timeout } async fn execute_dial(&self, mut dj: DialJob) { let timeout = self.dial_timeout(&dj.addr); let dial_r = future::timeout(timeout, dj.transport.dial(dj.addr.clone())).await; if let Ok(r) = dial_r { let _ = dj.tx.send((r.map_err(|e|e.into()), dj.addr)).await; } else { let _ = dj.tx.send((Err(SwarmError::DialTimeout(dj.addr.clone(), timeout.as_secs())), dj.addr)).await; } self.dial_consuming.fetch_sub(1, Ordering::SeqCst); }
Netwarps 由国内资深的云计算和分布式技术开发团队组成,该团队在金融、电力、通信及互联网行业有十分丰盛的落地教训。Netwarps 目前在深圳、北京均设立了研发核心,团队规模30+,其中大部分为具备十年以上开发教训的技术人员,别离来自互联网、金融、云计算、区块链以及科研机构等业余畛域。
Netwarps 专一于平安存储技术产品的研发与利用,次要产品有去中心化文件系统(DFS)、去中心化计算平台(DCP),致力于提供基于去中心化网络技术实现的分布式存储和分布式计算平台,具备高可用、低功耗和低网络的技术特点,实用于物联网、工业互联网等场景。
公众号:Netwarps