

本文以 floodsub 为例,探讨如何在 libp2p-rs 上开发新协定,具体代码请查看源码。

实现两个 trait

在 libp2p-rs 中,swarm 提供了两个 trait:

  • Notifiee 用于接管 swarm 的告诉,当有新的连贯创立或者连贯敞开时,swarm 会调用 connected() 或者 disconnected();
  • ProtocolHandler 用于读写协定的数据,协定协商胜利后,swarm 会调用 handle()。
/// Notifiee is an trait for an object wishing to receive notifications from swarm
pub trait Notifiee {
    /// called when a connection opened
    fn connected(&mut self, _conn: &mut Connection) {}
    /// called when a connection closed
    fn disconnected(&mut self, _conn: &mut Connection) {}}

/// Common trait for upgrades that can be applied on inbound substreams, outbound substreams,
/// or both.
/// Possible upgrade on a connection or substream.
pub trait ProtocolHandler: UpgradeInfo + Notifiee {
    /// After we have determined that the remote supports one of the protocols we support, this
    /// method is called to start handling the inbound. Swarm will start invoking this method
    /// in a newly spawned task.
    /// The `info` is the identifier of the protocol, as produced by `protocol_info`.
    async fn handle(&mut self, stream: Substream, info: <Self as UpgradeInfo>::Info) -> Result<(), Box<dyn Error>>;
    /// This is to provide a clone method for the trait object.
    fn box_clone(&self) -> IProtocolHandler;

floodsub handler 实现 Notifiee 和 ProtocolHandler

pub struct Handler {
    incoming_tx: mpsc::UnboundedSender<RPC>,
    new_peer: mpsc::UnboundedSender<PeerEvent>,

impl Handler {pub(crate) fn new(incoming_tx: mpsc::UnboundedSender<RPC>, new_peer: mpsc::UnboundedSender<PeerEvent>) -> Self {Handler { incoming_tx, new_peer}

impl UpgradeInfo for Handler {type Info = &'static [u8];

    fn protocol_info(&self) -> Vec<Self::Info> {vec![FLOOD_SUB_ID]

impl Notifiee for Handler {fn connected(&mut self, conn: &mut Connection) {let peer_id = conn.remote_peer();
        let mut new_peers = self.new_peer.clone();
        task::spawn(async move {let _ = new_peers.send(PeerEvent::NewPeer(peer_id)).await;

impl ProtocolHandler for Handler {async fn handle(&mut self, mut stream: Substream, _info: <Self as UpgradeInfo>::Info) -> Result<(), Box<dyn Error>> {
        loop {
            /* recv, decode and send to msg process mainloop */
            self.incoming_tx.send(rpc).await.map_err(|_| FloodsubDecodeError::ProtocolExit)?;

    fn box_clone(&self) -> IProtocolHandler {Box::new(self.clone())

注册到 swarm

let floodsub = FloodSub::new(FloodsubConfig::new(local_peer_id));
let handler = floodsub.handler();

let mut swarm = Swarm::new(local_key.public()).with_protocol(Box::new(handler))


简略的协定,比方 echo,那么所有事件都在 ProtocolHandler.handle() 中解决即可,到这里就完结了。

略微简单的协定,比方 floodsub,最好将 swarm 的告诉和收到的数据,发送到音讯解决主循环进行解决,实时更新状态;

impl floodsub {pub fn start(mut self, control: Swarm_Control) {self.control = Some(control);
        // well, self 'move' explicitly,
        let mut floodsub = self;
        task::spawn(async move {let _ = floodsub.process_loop().await;

    /// Message Process Loop.
    pub async fn process_loop(&mut self) -> Result<()> {
        loop {
            select! {cmd = self.peer_rx.next() => {self.handle_peer_event(cmd).await;
                rpc = self.incoming_rx.next() => {self.handle_incoming_rpc(rpc).await?;
                cmd = self.control_rx.next() => {self.on_control_command(cmd).await?;
                sub = self.cancel_rx.next() => {self.un_subscribe(sub).await?;

从下面能够看到,floodsub 音讯解决主循环运行在一个 task 外面,start() 时须要将 self 传递进去,因而后续的公布订阅等操作只能通过 channel 发消息,这就是 control 和 handler 包裹 channel 的起因。

pub struct Control {
    config: FloodsubConfig,
    control_sender: mpsc::UnboundedSender<ControlCommand>,

impl Control {
    /// Subscribe to messages on a given topic.
    pub async fn subscribe(&mut self, topic: Topic) -> Option<Subscription> {let (tx, rx) = oneshot::channel();
            .send(ControlCommand::Subscribe(topic, tx))
            .expect("control send subscribe");

当新的连贯创立时,floodsub 会被动创立流,协商通过后向对方发送本节点感兴趣的 topic。因而这里须要 swarm 的 control。

self.control.as_mut().unwrap().new_stream(pid, vec![FLOOD_SUB_ID]).await;


在 libp2p-rs 下面开发简略的协定,只须要两步,对于略微简单的协定,须要 handler 和 control 这类包裹 channel 的构造,将音讯发送到协定音讯解决主循环,以驱动整个协定的运行,实现特定的性能。

