关于rust:浅谈-Gossipsub

42次阅读

共计 4522 个字符,预计需要花费 12 分钟才能阅读完成。

状况概述

目前 libp2p-rs 团队在开发分支上初步实现了 Gossip 协定的相干工作,将会尽快公布。以下是 Gossip 协定的相干简略介绍:

从 Pubsub 说起

Pubsub 的意思是 publish/subscribe,即公布 / 订阅,是音讯流传一种机制。任意一个节点都能够关注感兴趣的主题,并公布主题相干的音讯,此音讯将会被发送到任何订阅了该主题的节点上。Pubsub 共有三种实现:Floodsub,Gossipsub,Randomsub。本文将次要剖析 floodsub 和 gossipsub。

Floodsub

Floodsub 顾名思义,是一种成果相似于“泛洪”的公布 / 订阅机制。在 libp2p-rs 中,每一个节点通过 floodsub 收到相干的音讯后,首先在本地进行相干的解决,而后通过再公布的模式扩散到四周的节点。

Floodsub 的长处在于可维护性较强,且在音讯的流传门路上能最小化网络提早所带来的问题。然而它依然有有余的中央,当某个节点领有大量的连贯节点时,转发音讯可能将会带来极大的带宽问题。

Gossipsub

Gossipsub 是在 floodsub 的根底上设计开发的一个协定。通过施加一些限度,以及设计一些相干的数据结构,解决了 floodsub 带来的问题。外围是保护 mesh 和 fanout。

mesh&fanout

mesh 是一个针对主题造成小范畴的节点网格,构造如下:

    /// Overlay network of connected peers - Maps topics to connected gossipsub peers.
    mesh: HashMap<TopicHash, BTreeSet<PeerId>>,

这是一个以主题哈希为 key,B 树汇合为 value 的哈希表。针对每一个主题,节点都会生成一棵 B 树汇合,须要留神的是这棵 B 树有节点个数的最大最小值限度。每收到一个相干主题的音讯,节点都会音讯转发到对应的 B 树汇合节点上,因为节点是无限的,此时将大幅度缩小带宽的应用。

fanout 则是一种比拟非凡的网格,构造与 mesh 雷同:

    /// Map of topics to list of peers that we publish to, but don't subscribe to.
    fanout: HashMap<TopicHash, BTreeSet<PeerId>>,

fanout 记录的是公布了音讯然而没有进行订阅操作的主题与节点关系。即代表一个节点能够不订阅某个主题而间接发送与该主题相干的音讯。fanout 的构建只与 publish 无关。

fanout 和 topic_peers 是构建 mesh 的要害。topic_peers 的构造如下:

    /// A map of all connected peers - A map of topic hash to a list of gossipsub peer Ids.
    topic_peers: HashMap<TopicHash, BTreeSet<PeerId>>,

topic_peers 用来记录主题与已连贯的节点 ID 对应关系。
构建 mesh 时,首先从 fanout 中查找数据,通过某些特定的条件筛选出节点增加到 mesh;如果 mesh 以后节点数小于最小值,再通过 topic_peers 找出其余已知节点,实现 mesh 的构建。同时还将构建类型为 GRAFT 的音讯并向外流传。

Control Message

Control Message 是用来保护 gossip 的音讯。共有四种类型:GRAFT,PRUNE,IHAVE,IWANT。

GRAFT,意为嫁接、移植。在 gossip 中,如果一个节点 A 订阅了某个主题,那么会向四周节点公布 GRAFT 音讯,告诉它们把 A 退出到 mesh 中。接管到音讯后,四周节点会通过一系列的条件判断来决定是否 A 节点增加到 mesh。

PRUNE 与 GRAFT 正好相同,是从 mesh 中移除节点。PRUNE 的触发场景个别是在勾销订阅时,须要告知其余节点执行 mesh 移除的操作。

IHAVE 是以后节点向外播送,通知其余节点“我”的 mesh 有哪些主题以及这个 mesh 下所有已知的音讯。如果有节点对某个主题感兴趣,就须要回复一个 IWANT 音讯,单方能力进行信息的替换。

IWANT 代表须要一个或者多个主题对应的音讯。节点接管到 IWANT 音讯后,会从本地音讯缓存中查找对应的音讯 ID 并返回。须要留神的是,为了避免歹意的屡次申请,音讯查找有肯定的次数限度,这与 GossipConfig 的设置无关。

Message Cache

针对接管到的音讯,Gossip 提供全局的 Message Cache 进行缓存操作:

/// CacheEntry stored in the history.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CacheEntry {
    mid: MessageId,
    topic: TopicHash,
}

/// MessageCache struct holding history of messages.
#[derive(Clone)]
pub struct MessageCache {
    msgs: HashMap<MessageId, RawGossipsubMessage>,
    /// For every message and peer the number of times this peer asked for the message
    iwant_counts: HashMap<MessageId, HashMap<PeerId, u32>>,
    history: Vec<Vec<CacheEntry>>,
    /// The number of indices in the cache history used for gossipping. That means that a message
    /// won't get gossipped anymore when shift got called `gossip` many times after inserting the
    /// message in the cache.
    gossip: usize,
}

msgs 记录的是音讯 ID 与具体的音讯内容;iwant_counts 是下面提到的对音讯查找的拜访次数限度;history 记录了历史的缓存信息;gossip 代表最大可拜访到的历史音讯条数。

须要留神的是,Message Cache 的缓存会在 heartbeat 执行的时候进行清理。

    /// Shift the history array down one and delete messages associated with the
    /// last entry.
    pub fn shift(&mut self) {for entry in self.history.pop().expect("history is always > 1") {if let Some(msg) = self.msgs.remove(&entry.mid) {
                if !msg.validated {
                    // If GossipsubConfig::validate_messages is true, the implementing
                    // application has to ensure that Gossipsub::validate_message gets called for
                    // each received message within the cache timeout time."debug!("The message with id {} got removed from the cache without being validated.",
                        &entry.mid
                    );
                }
            }
            debug!("Remove message from the cache: {}", &entry.mid);

            self.iwant_counts.remove(&entry.mid);
        }

        // Insert an empty vec in position 0
        self.history.insert(0, Vec::new());
    }

首先将最早的历史音讯数组从 history 中取出,其中可能蕴含了一条或多条音讯,再从 msgs 中打消对应的 Gossip Message,并重置音讯的拜访次数。最初再往 history 中新增一条空记录,确保下一次执行 put 操作时不会呈现谬误状况。

Heartbeat

Heartbeat 保护了一个定期执行的心跳连贯过程,在固定的频率下向外发送音讯。性能上实现了几个成果:维持 mesh 和 fanout 的稳固,以及向外的播送。

保护 mesh

mesh 保护比拟好了解,因为 mesh 自身有上上限的限度。mesh 少了,从 topic_peers 中补充;多了就依据评分零碎执行相干的裁剪性能。此外,对于 mesh 中的节点还须要更新评分,以确保零碎更好的运行。

保护 fanout

前文有提到,fanout 的作用是记录没有订阅然而公布了音讯的主题。对于每一个公布了音讯的主题,有一个数据结构 fanout_last_pub 记录了公布的工夫:

    /// The last publish time for fanout topics.
    fanout_last_pub: HashMap<TopicHash, Instant>,

如果超过了 Gossip 预设的过期工夫,这条主题就会从 fanout 中删除。

fanout 的保护还与 peer_topics 无关。peer_topics 是一个关联节点与主题的数据结构:

    /// A map of all connected peers to their subscribed topics.
    peer_topics: HashMap<PeerId, BTreeSet<TopicHash>>,

如果某个节点存在于 fanout 的某个主题中,然而在 peer_topics 中发现该节点曾经不再关注此主题,就会被标记为须要移除。

fanout 自身有节点的上限数限度,与 mesh 一样,都是从 topic_peers 中找出节点进行补充。

向外播送

这个机制是为了让不在 mesh 和 fanout 中,却又订阅了相干主题的节点,也有机会接管到主题相干的音讯。并不是每个节点都肯定会收到音讯,随机选取节点的算法受到 Config 中的配置和关注主题的节点数两者独特影响。

首先将 mesh 和 fanout 合并成一个迭代器,从 Message Cache 中取出主题对应的音讯 ID,向 topic_peers 中随机选出的节点发送 IHAVE 音讯。

总结

Gossip 协定的货色比拟多,上述内容只是其中的一小部分介绍,更具体的代码可参阅 libp2p-rs 后续的版本公布,感激各位浏览。


Netwarps 由国内资深的云计算和分布式技术开发团队组成,该团队在金融、电力、通信及互联网行业有十分丰盛的落地教训。Netwarps 目前在深圳、北京均设立了研发核心,团队规模 30+,其中大部分为具备十年以上开发教训的技术人员,别离来自互联网、金融、云计算、区块链以及科研机构等业余畛域。

Netwarps 专一于平安存储技术产品的研发与利用,次要产品有去中心化文件系统(DFS)、去中心化计算平台(DCP),致力于提供基于去中心化网络技术实现的分布式存储和分布式计算平台,具备高可用、低功耗和低网络的技术特点,实用于物联网、工业互联网等场景。

公众号:Netwarps

正文完
 0