乐趣区

关于golang:libp2prs-swarm-拨号设计与实现

后面咱们对 go-libp2p 中 swarm 拨号源码进行了剖析(【go-libp2p 源码分析】Swarm 拨号),参考 go-libp2p,咱们在 libp2p-rs 上实现 swarm 拨号性能的开发。性能基本上和 go-libp2p 保持一致,略微做了精简,去掉了 go-libp2p 拨号的局部性能,如 DialSync 中的同步拨号限度。上面对 libp2p-rs swarm 拨号性能的实现做一个具体的阐明。

代码组织构造

仓库地址:https://github.com/netwarps/libp2p-rs.git
拨号相干代码次要散布在 swarm/src/lib.rsswarm/src/dial.rs两个文件中

类图如下:

  • 拨号实现次要围绕 AsyncDialer 开展,它组合了 DialLimiter 和 Backoff 的性能,AsyncDialer 实现了拨号的重试,拨号工作的启动及拨号后果的收集和反馈。拨号默认不重试,能够通过批改环境变量 LIBP2P_SWARM_DIAL_ATTEMPTS 对重试次数做批改。
  • DialParam 包装了多个拨号须要的参数,在 AsyncDialer 的办法之间传递
  • Transports 能够依据拨号地址匹配适合的 Transport 去拨号(比方是 TCP 还是 WebSocket)
  • DialBackoff 对 Peer 的拨号失败的地址做了标记,防止频繁拨号
  • DialLimiter 对并发拨号数做了限度,默认 100,也能够通过批改环境变量 LIBP2P_SWARM_DIAL_LIMIT 对并发拨号数做批改

工作流程

时序图如下:

  1. 通过 control 发送一个命令给 swarm,能够调用 new_connection 创立一个新的连贯,再创立 stream,也能够间接调用 open_stream 创立 stream。Swarm 接管到命令后,调用 on_new_connection 或 on_new_stream。在 on_new_stream 中如果 connection 存在间接拿出 connection 创立 stream,如果不存在则去拨号创立一个新的 connection 再创立 stream。最初调用 dial_peer 对 peer 进行拨号,在这里会将拨号要用到的参数从 Swarm 复制到 DialParam。

注:咱们拨号没有将 connection 间接返回(因为只有在 open_stream 时才用到了 connection,如果将值返回显得有点多余,返回可变援用又会有生命周期相干问题)。这里会结构一个闭包 (次要用来关上流并返回流),最终在 ConnectionEstablished 事件或 OutgoingConnectionError 事件处理函数中执行这个闭包。
因为拨号须要启动多个 task,如果一路传递上来的话,闭包须要反对 clone 才行,闭包捕捉了内部的 oneshot::Sender,它不反对 clone,所以为求不便咱们将闭包暂存在 Swarm 里的 dial_transactions 中,它是一个 hashmap 数据结构,key 值是每次操作生成的惟一值,咱们命名为 TransactionId。这个 TransactionId 最终会带到 ConnectionEstablished 事件或 OutgoingConnectionError 事件对应的处理函数,最初咱们能够依据 TransactionId 将闭包 remove 进去执行。

局部代码片段

type DialCallback = Box<dyn FnOnce(Result<&mut Connection>) + Send>;
fn on_new_stream(&mut self, peer_id: PeerId, pids: Vec<ProtocolId>, reply: oneshot::Sender<Result<Substream>>) -> Result<()> {if let Some(connection) = self.get_best_conn(&peer_id) {......} else {
            // dialing peer, and opening a new stream in the post-processing callback
            self.dial_peer(peer_id.clone(), |r: Result<&mut Connection>| {
                match r {Ok(connection) => {
                        connection.open_stream(pids, |r| {let _ = reply.send(r.map_err(|e| e.into()));
                        });
                    }
                    Err(e) => {let _ = reply.send(Err(e));
                    }
                }
            });
        }
        Ok(())
    }
 fn dial_peer<F: FnOnce(Result<&mut Connection>) + Send + 'static>(&mut self, peer_id: PeerId, f: F) {
        ......
        
           // allocate transaction id and push box::f into hashmap for post-processing
        let tid = self.assign_tid();
        self.dial_transactions.insert(tid, Box::new(f));
        self.dialer
            .dial(peer_id, self.transports.clone(), addrs, self.event_sender.clone(), tid);
    }
 fn handle_connection_opened(&mut self, stream_muxer: IStreamMuxer, dir: Direction, tid: Option<TransactionId>) -> Result<()> {
 ......
    // dial callback for post-processing
        // note that it must cleanup the tid entry
        if let Some(id) = tid {
            // the entry must be there
            let callback = self.dial_transactions.remove(&id).expect("no match tid found");
            callback(Ok(&mut connection));
        }
   ......
 }
  1. Swarm 拨号时会调用 AsyncDialer 的 dial 办法。这里首先启动一个新的 task,再调用 start_dialing 办法。start_dialing 办法实现了对拨号的重试性能,它会期待拨号后果,将拨号后果返回给 dial,胜利则发送 ConnectionEstablished 事件,失败则发送 OutgoingConnectionError 事件,在事件处理函数中会间接间接第一步传入的闭包。
pub(crate) fn dial(
        &self,
        peer_id: PeerId,
        transports: Transports,
        addrs: EitherDialAddr,
        mut event_sender: mpsc::UnboundedSender<SwarmEvent>,
        tid: TransactionId,
    ) {
        let dial_param = DialParam {
            transports,
            addrs,
            peer_id,
            tid,
            limiter: self.limiter.clone(),
            backoff: self.backoff.clone(),
            attempts: self.attempts,
        };

        task::spawn(async move {
            let tid = dial_param.tid;
            let peer_id = dial_param.peer_id.clone();

            let r = AsyncDialer::start_dialing(dial_param).await;
            match r {Ok(stream_muxer) => {
                    let _ = event_sender
                        .send(SwarmEvent::ConnectionEstablished {
                            stream_muxer,
                            direction: Direction::Outbound,
                            tid: Some(tid),
                        })
                        .await;
                }
                Err(err) => {
                    let _ = event_sender
                        .send(SwarmEvent::OutgoingConnectionError { tid, peer_id, error: err})
                        .await;
                }
            }
        });
    }
async fn start_dialing(dial_param: DialParam) -> Result<IStreamMuxer> {
        let mut dial_count: u32 = 0;
        loop {
            dial_count += 1;

            let active_param = dial_param.clone();
            let r = AsyncDialer::dial_addrs(active_param).await;
            if let Err(e) = r {log::info!("[Dialer] dialer failed at attempt={} error={:?}", dial_count, e);
                if dial_count < dial_param.attempts {
                    log::info!("[Dialer] All addresses of {:?} cannot be dialed successfully. Now try dialing again, attempts={}",
                        dial_param.peer_id,
                        dial_count
                    );
                    //TODO:
                    task::sleep(BACKOFF_BASE).await;
                } else if dial_param.attempts > 1 {break Err(SwarmError::MaxDialAttempts(dial_param.attempts));
                } else {break Err(e);
                }
            } else {break r;}
        }
    }
  1. start 外部调用了 dial_addrs,即对 peer 的多个地址同时进行拨号。首先查看 backoff,如果刚拨号失败过,则间接返回谬误。而后针对每个地址结构一个 DialJob,每个 DialJob 启动一个 task 调用 limiter 的 do_dial_job 做拨号检查和拨号操作,因为不晓得 task 啥时候能拨号实现,这里传了一个 channel tx 进去,只有拨号实现就会发回一个音讯,再在里面接管,启动几个 task 就接管几次 channel rx 的音讯,一旦发现有胜利的拨号,就将后果间接返回。那些前面再拨号胜利的,咱们不关怀,让它们主动销毁;对那些拨号失败的增加 backoff,防止对失败地址频繁拨号。
       let (tx, rx) = mpsc::unbounded::<(Result<IStreamMuxer>, Multiaddr)>();
        let mut num_jobs = 0;
        for addr in addrs_rank {
            // first of all, check the transport
            let r = param.transports.lookup_by_addr(addr.clone());
            if r.is_err() {log::info!("[Dialer] no transport found for {:?}", addr);
                continue;
            }

            num_jobs += 1;

            let dj = DialJob {
                addr,
                peer: peer_id.clone(),
                tx: tx.clone(),
                transport: r.unwrap(),};
            // spawn a task to dial
            let limiter = self.limiter.clone();
            task::spawn(async move {limiter.do_dial_job(dj).await;
            });
        }
         log::trace!("total {} dialing jobs started, collecting...", num_jobs);
        self.collect_dialing_result(rx, num_jobs, param).await
 async fn collect_dialing_result(&self, mut rx: UnboundedReceiver<(Result<IStreamMuxer>, Multiaddr)>, jobs: u32, param: DialParam) -> Result<IStreamMuxer> {
        for i in 0..jobs {let peer_id = param.peer_id.clone();
            log::trace!("[Dialer] receiving dial result, finished jobs={} ...", i);
            let r = rx.next().await;
            match r {Some((Ok(stream_muxer), addr)) => {let reported_pid = stream_muxer.remote_peer();
                    if peer_id == reported_pid {return Ok(stream_muxer);
                    } else {self.backoff.add_peer(peer_id, addr).await;
                    }
                }
                Some((Err(err), addr)) => {if let SwarmError::Transport(_) = err {self.backoff.add_peer(peer_id, addr).await;
                    }
                }
                None => {log::warn!("[Dialer] should not happen");
                }
            }
        }
        return Err(SwarmError::AllDialsFailed);
    }
  1. 绝对 go 的实现 DialLimiter 做了精简,去掉了期待列表,失败的咱们不会放到 waiting 列表里做拨号,而是间接返回谬误。AsyncDialer 的 dial_addrs 会调用 do_dial_job。do_dial_job 中会判断以后正在拨号的数量,如果数量超过咱们的限度,则间接返回 ConcurrentDialLimit 谬误。否则给并发数加 1,并调用 execute_dial 做理论的拨号操作,拨号实现并发数减 1。这里对 transport 的拨号加了一个超时的封装(本地地址默认 5 秒超时,内部地址默认 60s 超时),如果超时则间接返回 DialTimeout 谬误。不论拨号胜利与否都通过 channel 将音讯送回给 AsyncDialer。
 async fn do_dial_job(&self, mut dj: DialJob) {if self.dial_consuming.load(Ordering::SeqCst) >= self.dial_limit {let _ = dj.tx.send((Err(SwarmError::ConcurrentDialLimit(self.dial_limit)), dj.addr)).await;
            return;
        }
        self.dial_consuming.fetch_add(1, Ordering::SeqCst);
        self.execute_dial(dj).await;
    }
 fn dial_timeout(&self, ma: &Multiaddr) -> Duration {
        let mut timeout: Duration = DIAL_TIMEOUT;
        if ma.is_private_addr() {timeout = DIAL_TIMEOUT_LOCAL;}
        timeout
    }
  async fn execute_dial(&self, mut dj: DialJob) {let timeout = self.dial_timeout(&dj.addr);

        let dial_r = future::timeout(timeout, dj.transport.dial(dj.addr.clone())).await;
        if let Ok(r) = dial_r {let _ = dj.tx.send((r.map_err(|e|e.into()), dj.addr)).await;
        } else {let _ = dj.tx.send((Err(SwarmError::DialTimeout(dj.addr.clone(), timeout.as_secs())), dj.addr)).await;
        }
        self.dial_consuming.fetch_sub(1, Ordering::SeqCst);
    }

Netwarps 由国内资深的云计算和分布式技术开发团队组成,该团队在金融、电力、通信及互联网行业有十分丰盛的落地教训。Netwarps 目前在深圳、北京均设立了研发核心,团队规模 30+,其中大部分为具备十年以上开发教训的技术人员,别离来自互联网、金融、云计算、区块链以及科研机构等业余畛域。
Netwarps 专一于平安存储技术产品的研发与利用,次要产品有去中心化文件系统(DFS)、去中心化计算平台(DCP),致力于提供基于去中心化网络技术实现的分布式存储和分布式计算平台,具备高可用、低功耗和低网络的技术特点,实用于物联网、工业互联网等场景。
公众号:Netwarps

退出移动版