死磕以太坊源码剖析之 p2p 网络启动
p2p 源码目录
discover/ 基于 UDP 的节点发现 V4 协定
discv5/ 节点发现 V5 协定
enode/ 节点信息
enr/ 以太坊节点记录(ethereum node records)
nat/ 网络地址转换,用于内网穿透
netutil/
protocol/
simulations/ 本地 p2p 网络的模拟器
dial.go 建设连贯申请,以工作的模式
message.go 定义了读写的接口
metrics.go 计时器和计量器工具
peer.go 节点
protocol.go 子协定
rlpx.go 加密传输协定
server.go 底层 p2p 网络的函数入口
启动 p2p 网络
启动 p2p 网络次要会做以下几件事:
- 发现近程节点,建设相邻节点列表
- 监听近程节点发过来的建设 TCP 申请
- 向近程节点发送建设 TCP 连贯申请
首先找到 p2p 网络启动的入口:
Start()
start
函数次要做了以下 6 件事:
- 初始化 server 的字段
- 设置本地节点 setupLocalNode
- 设置监听 TCP 连贯申请 setupListening
- 设置节点发现(setupDiscovery)V4 版本
- 设置最大能够被动发动的连贯为 50/3
- srv.run(dialer) 发动建设 TCP 连贯申请
其中 setupLocalNode、setupListening、setupDiscovery、newDialState、srv.run(dialer)是咱们要重点剖析的函数。
设置本地节点
进入到 setupLocalNode 中:
①:创立 devp2p 握手
pubkey := crypto.FromECDSAPub(&srv.PrivateKey.PublicKey)
srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: pubkey[1:]}
for _, p := range srv.Protocols {srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
}
sort.Sort(capsByNameAndVersion(srv.ourHandshake.Caps))
握手协定包含协定版本号,节点名称和节点的公钥,存入到 Caps 中要依据名称和协定排序。
②:创立本地节点
db, err := enode.OpenDB(srv.Config.NodeDatabase)
if err != nil {return err}
srv.nodedb = db
srv.localnode = enode.NewLocalNode(db, srv.PrivateKey)
srv.localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
// TODO: check conflicts
for _, p := range srv.Protocols {
for _, e := range p.Attributes {srv.localnode.Set(e)
}
}
首先从节点数据库中去获取节点信息,如果不存在则新建本地节点并设置默认 IP,同时将节点记录的协定特定信息存入到本地节点中。
设置监听
进入到setupListening
:
①:启动监听器
②:如果配置了 NAT,则更新本地节点记录并映射 TCP 监听端口
if tcp, ok := listener.Addr().(*net.TCPAddr); ok {srv.localnode.Set(enr.TCP(tcp.Port))
if !tcp.IP.IsLoopback() && srv.NAT != nil {srv.loopWG.Add(1)
go func() {nat.Map(srv.NAT, srv.quit, "tcp", tcp.Port, tcp.Port, "ethereum p2p")
srv.loopWG.Done()}()}
}
③:开启 P2P 监听,接管 inbound
连贯
srv.listenLoop()
这个函数须要进一步剖析:
次要有以下逻辑:
- 首先
defaultMaxPendingPeers
这个字段指的是inbound
和outbound
连贯,默认最大值为 50 -
将监听的连贯返回给
listener
fd, err = srv.listener.Accept()
-
获取监听的连贯的地址并查看这个连贯
remoteIP := netutil.AddrIP(fd.RemoteAddr()) if err := srv.checkInboundConn(fd, remoteIP); err != nil {.....}
checkInboundConn
次要是做了以下的判断:- 回绝不合乎 NetRestrict 的连贯(NetRestrict 是指曾经限定了某些连贯,除此之外会回绝)
- 回绝尝试过多的节点
-
最初真正建设连贯
go func() {srv.SetupConn(fd, inboundConn, nil)// 连贯建设过程(将连贯增加为 peer)slots <- struct{}{} }()
要留神 setupConn 的第三个字段传入的是 nil,示意还没有拨号,如果正在拨号的话须要节点公钥。
var dialPubkey *ecdsa.PublicKey if dialDest != nil {dialPubkey = new(ecdsa.PublicKey) if err := dialDest.Load((*enode.Secp256k1)(dialPubkey)); err != nil {return errors.New("dial destination doesn't have a secp256k1 public key") } }
之后就是进行 RLPX(RLPX 会独自讲)握手
remotePubkey, err := c.doEncHandshake(srv.PrivateKey, dialPubkey)
如果 dialDest 不为 nil,查看公钥是否匹配,如果为 nil, 就从连贯中返回一个 node 进去
if dialDest != nil { // For dialed connections, check that the remote public key matches. // 对于拨号连贯,请查看近程公钥是否匹配 if dialPubkey.X.Cmp(remotePubkey.X) != 0 || dialPubkey.Y.Cmp(remotePubkey.Y) != 0 {return DiscUnexpectedIdentity} c.node = dialDest } else {c.node = nodeFromConn(remotePubkey, c.fd) }
接下来就是真正执行握手了 , 这部分也属于 RLPX,跳过
phs, err := c.doProtoHandshake(srv.ourHandshake)
之后要进行查看,如果胜利了的话,连贯就会作为节点被增加,并且启动了 runPeer.
到此为止,整个 listenLoop 就实现了。
设置节点发现
进入到srv.setupDiscovery()
①:增加特定于协定的发现源
added := make(map[string]bool)
for _, proto := range srv.Protocols {if proto.DialCandidates != nil && !added[proto.Name] {srv.discmix.AddSource(proto.DialCandidates)
added[proto.Name] = true
}
}
②:如果 DHT 禁用的话,就不要在 UDP 上监听
if srv.NoDiscovery && !srv.DiscoveryV5 {return nil}
③:监听给定的 socket 上的发现的包
ntab, err := discover.ListenUDP(conn, srv.localnode, cfg)
创立 DialState
dialstate 负责拨号和查找发现。
①:初始化 dialstate
s := &dialstate{
maxDynDials: maxdyn,
self: self,
netrestrict: cfg.NetRestrict,
log: cfg.Logger,
static: make(map[enode.ID]*dialTask),
dialing: make(map[enode.ID]connFlag),
bootnodes: make([]*enode.Node, len(cfg.BootstrapNodes)),
}
②:退出初始疏导节点
copy(s.bootnodes, cfg.BootstrapNodes)
if s.log == nil {s.log = log.Root()
}
③:退出动态节点
for _, n := range cfg.StaticNodes {s.addStatic(n)
}
bootnodes
是初始疏导节点,在节点没有接管到任何节点的连贯申请,也没有节点能够给咱们街坊节点的时候,就去连贯bootnodes
,它硬编码在了以太坊的源码中。
static
是动态节点,如果咱们想和某些节点放弃长期的连贯,就把它们退出到动态节点的列表中
接下来就是到了运行 p2p 网络的时候了,次要的函数是:go srv.run(dialer)
运行 p2p 网络
srv.run(dialer)
在 p2p 网络启动时候,咱们会监听近程节点发送过去的 TCP 申请,到了运行 p2p 网络的时候,咱们则会向近程节点发动 TCP 的连贯申请。首先咱们要晓得咱们所说的发动 TCP 连贯申请能够形容成拨号,每个拨号都是以工作的模式存在,进入到 srv.run(dialer)
剖析
整个函数就是一个循环,介绍下它的次要性能:
发动 TCP 连贯工作
scheduleTasks()
scheduleTasks
次要是从 queued task 中去获取工作,通过查问 dialer 以查找新工作并立刻启动尽可能多的工作,咱们这里要留神个变量maxActiveDialTasks
, 它的默认值为 16,而安顿工作的外围办法是:
nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
次要做了以下几件事:
①:为没有连贯的动态节点创立拨号工作
for id, t := range s.static {err := s.checkDial(t.dest, peers)
switch err {
case errNotWhitelisted, errSelf:
s.log.Warn("Removing static dial candidate", "id", t.dest.ID, "addr", &net.TCPAddr{IP: t.dest.IP(), Port: t.dest.TCP()}, "err", err)
delete(s.static, t.dest.ID())
case nil:
s.dialing[id] = t.flags
newtasks = append(newtasks, t)
}
}
首先对拨号节点进行校验:正在连接,曾经连贯,是自身,不在白名单中,最近连贯过的都会报错,并且不是在白名单中的和本身的节点会间接从动态节点列表中删除,校验通过的创立工作。
②:计算所需的动静拨号数
needDynDials := s.maxDynDials
for _, p := range peers {if p.rw.is(dynDialedConn) {needDynDials--}
}
for _, flag := range s.dialing {
if flag&dynDialedConn != 0 {needDynDials--}
}
咱们被动发动的 TCP 连贯申请是由节点最大连接数除以拨号比率得出的,即maxPeers/radio
,同时咱们会判断节点中是否曾经有建设了连贯的节点和正在拨号的节点,有的话会 needDynDials 会减去。
③:如果找不到任何的 peers, 就去随机找 bootnode,发动连贯
不过这个个别实用在 测试网或者私链。
if len(peers) == 0 && len(s.bootnodes) > 0 && needDynDials > 0 && now.Sub(s.start) > fallbackInterval {bootnode := s.bootnodes[0]
s.bootnodes = append(s.bootnodes[:0], s.bootnodes[1:]...)
s.bootnodes = append(s.bootnodes, bootnode)
if addDial(dynDialedConn, bootnode) {needDynDials--}
}
④:从节点发现后果中创立动静拨号工作
如果不满足最大工作数量的话,就去 s.lookupBuf
中寻找,lookupBuf
通过 KAD 算法获取的节点。
for ; i < len(s.lookupBuf) && needDynDials > 0; i++ {if addDial(dynDialedConn, s.lookupBuf[i]) {needDynDials--}
}
s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])]
if len(s.lookupBuf) < needDynDials && !s.lookupRunning {
s.lookupRunning = true
newtasks = append(newtasks, &discoverTask{want: needDynDials - len(s.lookupBuf)})
}
⑤:没有须要执行的工作,放弃拨号逻辑持续运行
if nRunning == 0 && len(newtasks) == 0 && s.hist.Len() > 0 {t := &waitExpireTask{s.hist.nextExpiry().Sub(now)}
newtasks = append(newtasks, t)
}
到此创立新工作完结,返回newTasks
执行 TCP 连贯工作
直到满足最大流动工作数才开始工作执行,具体的执行过程在以下代码:
startTasks := func(ts []task) (rest []task) {
i := 0
for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {t := ts[i]
srv.log.Trace("New dial task", "task", t)
go func() { t.Do(srv); taskdone <- t }()
runningTasks = append(runningTasks, t)
}
return ts[i:]
}
t.Do(srv);
执行的次要工作包含上面几种:
- dialTask
- discoverTask
- waitExpireTask
最要害的就是dialTask
func (t *dialTask) Do(srv *Server) {if t.dest.Incomplete() {if !t.resolve(srv) {return}
}
err := t.dial(srv, t.dest)
if err != nil {srv.log.Trace("Dial error", "task", t, "err", err)
// Try resolving the ID of static nodes if dialing failed.
if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 {if t.resolve(srv) {t.dial(srv, t.dest)
}
}
}
}
真正的连贯是在 t.dail
中做的:
// 理论的网络连接操作
func (t *dialTask) dial(srv *Server, dest *enode.Node) error {fd, err := srv.Dialer.Dial(dest)
if err != nil {return &dialError{err}
}
mfd := newMeteredConn(fd, false, &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()})
return srv.SetupConn(mfd, t.flags, dest)
}
再往下面就没必要深究了,理论的网络连接操作到此为止了。
治理 TCP 连贯工作
在 TCP 连贯工作实现后,会对连贯有各种解决,如下:
①:进行 p2p 服务
case <-srv.quit:
break running
②:增加动态节点到 peer 列表
case n := <-srv.addstatic:
srv.log.Trace("Adding static node", "node", n)
dialstate.addStatic(n)
③:发送断开连接申请,并断开连接
case n := <-srv.removestatic:
dialstate.removeStatic(n)
if p, ok := peers[n.ID()]; ok {p.Disconnect(DiscRequested)
}
断开连接会立刻返回,并且不会等连贯敞开。
④:标记可信节点
case n := <-srv.addtrusted:
trusted[n.ID()] = true
⑤:从信赖节点中删除一个节点
case n := <-srv.removetrusted:
delete(trusted, n.ID())
⑥:拨号工作实现
case t := <-taskdone:newTasks
dialstate.taskDone(t, time.Now())
delTask(t)
⑦:连贯已通过加密握手, 近程身份是已知的(但尚未通过验证)
case c := <-srv.checkpointPostHandshake:
c.cont <- srv.postHandshakeChecks(peers, inboundCount, c)
⑧:连贯已通过协定握手,已知其性能并验证了近程身份
err := srv.addPeerChecks(peers, inboundCount, c)
if err == nil {
// 握手实现,所有查看结束
p := newPeer(srv.log, c, srv.Protocols)
// 启用了音讯事件就把 peerfeed 传给 peer
if srv.EnableMsgEvents {p.events = &srv.peerFeed}
name := truncateName(c.name)
p.RemoteAddr(), "peers", len(peers)+1, "name", name)
go srv.runPeer(p) // 重点
peers[c.node.ID()] = p
if p.Inbound() {inboundCount++}
if conn, ok := c.fd.(*meteredConn); ok {conn.handshakeDone(p)//
}
}
addPeerChecks
会删除没有匹配协定的连贯,并且会反复握手后查看,因为自执行这些查看后可能已更改。连贯通过握手后,将调用handshakeDone
⑨:Peer 断开连接
case pd := <-srv.delpeer:
d := common.PrettyDuration(mclock.Now() - pd.created)
pd.log.Debug("Removing p2p peer", "addr", pd.RemoteAddr(), "peers", len(peers)-1, "duration", d, "req", pd.requested, "err", pd.err)
delete(peers, pd.ID())
if pd.Inbound() {inboundCount--}
到此为止整个次要的解决 TCP 连贯的循环解说完结。
总结
- 开启 p2p 网络次要包含:设置本地节点,监听 TCP 连贯以及设置节点发现
- 运行 P2P 网络之后次要包含:发动 TCP 连贯并执行连贯,以及相干的连贯解决。