死磕以太坊源码剖析之p2p网络启动
p2p源码目录
discover/ 基于UDP的节点发现V4协定 discv5/ 节点发现V5协定 enode/ 节点信息 enr/ 以太坊节点记录(ethereum node records) nat/ 网络地址转换,用于内网穿透 netutil/ protocol/ simulations/ 本地p2p网络的模拟器 dial.go 建设连贯申请,以工作的模式 message.go 定义了读写的接口 metrics.go 计时器和计量器工具 peer.go 节点 protocol.go 子协定 rlpx.go 加密传输协定 server.go 底层p2p网络的函数入口
启动p2p网络
启动p2p网络次要会做以下几件事:
- 发现近程节点,建设相邻节点列表
- 监听近程节点发过来的建设TCP申请
- 向近程节点发送建设TCP连贯申请
首先找到p2p网络启动的入口:
Start()
start
函数次要做了以下6件事:
- 初始化server的字段
- 设置本地节点setupLocalNode
- 设置监听TCP连贯申请setupListening
- 设置节点发现(setupDiscovery)V4版本
- 设置最大能够被动发动的连贯为50/3
- srv.run(dialer) 发动建设TCP连贯申请
其中setupLocalNode、setupListening、setupDiscovery、newDialState、srv.run(dialer)是咱们要重点剖析的函数。
设置本地节点
进入到setupLocalNode中:
①:创立devp2p握手
pubkey := crypto.FromECDSAPub(&srv.PrivateKey.PublicKey) srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: pubkey[1:]} for _, p := range srv.Protocols { srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap()) }sort.Sort(capsByNameAndVersion(srv.ourHandshake.Caps))
握手协定包含协定版本号,节点名称和节点的公钥,存入到Caps中要依据名称和协定排序。
②:创立本地节点
db, err := enode.OpenDB(srv.Config.NodeDatabase) if err != nil { return err } srv.nodedb = db srv.localnode = enode.NewLocalNode(db, srv.PrivateKey) srv.localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) // TODO: check conflicts for _, p := range srv.Protocols { for _, e := range p.Attributes { srv.localnode.Set(e) } }
首先从节点数据库中去获取节点信息,如果不存在则新建本地节点并设置默认IP,同时将节点记录的协定特定信息存入到本地节点中。
设置监听
进入到setupListening
:
①:启动监听器
②:如果配置了NAT,则更新本地节点记录并映射TCP监听端口
if tcp, ok := listener.Addr().(*net.TCPAddr); ok { srv.localnode.Set(enr.TCP(tcp.Port)) if !tcp.IP.IsLoopback() && srv.NAT != nil { srv.loopWG.Add(1) go func() { nat.Map(srv.NAT, srv.quit, "tcp", tcp.Port, tcp.Port, "ethereum p2p") srv.loopWG.Done() }() } }
③:开启P2P监听,接管inbound
连贯
srv.listenLoop()
这个函数须要进一步剖析:
次要有以下逻辑:
- 首先
defaultMaxPendingPeers
这个字段指的是inbound
和outbound
连贯,默认最大值为50 将监听的连贯返回给
listener
fd, err = srv.listener.Accept()
获取监听的连贯的地址并查看这个连贯
remoteIP := netutil.AddrIP(fd.RemoteAddr())if err := srv.checkInboundConn(fd, remoteIP); err != nil { .....}
checkInboundConn
次要是做了以下的判断:- 回绝不合乎NetRestrict的连贯(NetRestrict是指曾经限定了某些连贯,除此之外会回绝)
- 回绝尝试过多的节点
最初真正建设连贯
go func() { srv.SetupConn(fd, inboundConn, nil)// 连贯建设过程(将连贯增加为peer) slots <- struct{}{} }()
要留神setupConn的第三个字段传入的是nil,示意还没有拨号,如果正在拨号的话须要节点公钥。
var dialPubkey *ecdsa.PublicKey if dialDest != nil { dialPubkey = new(ecdsa.PublicKey) if err := dialDest.Load((*enode.Secp256k1)(dialPubkey)); err != nil { return errors.New("dial destination doesn't have a secp256k1 public key") } }
之后就是进行RLPX(RLPX会独自讲)握手
remotePubkey, err := c.doEncHandshake(srv.PrivateKey, dialPubkey)
如果dialDest 不为nil,查看公钥是否匹配,如果为nil,就从连贯中返回一个node进去
if dialDest != nil { // For dialed connections, check that the remote public key matches. //对于拨号连贯,请查看近程公钥是否匹配 if dialPubkey.X.Cmp(remotePubkey.X) != 0 || dialPubkey.Y.Cmp(remotePubkey.Y) != 0 { return DiscUnexpectedIdentity } c.node = dialDest } else { c.node = nodeFromConn(remotePubkey, c.fd) }
接下来就是真正执行握手了 ,这部分也属于RLPX,跳过
phs, err := c.doProtoHandshake(srv.ourHandshake)
之后要进行查看,如果胜利了的话,连贯就会作为节点被增加,并且启动了runPeer.
到此为止,整个listenLoop 就实现了。
设置节点发现
进入到srv.setupDiscovery()
①:增加特定于协定的发现源
added := make(map[string]bool) for _, proto := range srv.Protocols { if proto.DialCandidates != nil && !added[proto.Name] { srv.discmix.AddSource(proto.DialCandidates) added[proto.Name] = true } }
②:如果DHT禁用的话,就不要在UDP上监听
if srv.NoDiscovery && !srv.DiscoveryV5 { return nil }
③:监听给定的socket 上的发现的包
ntab, err := discover.ListenUDP(conn, srv.localnode, cfg)
创立DialState
dialstate负责拨号和查找发现。
①:初始化dialstate
s := &dialstate{ maxDynDials: maxdyn, self: self, netrestrict: cfg.NetRestrict, log: cfg.Logger, static: make(map[enode.ID]*dialTask), dialing: make(map[enode.ID]connFlag), bootnodes: make([]*enode.Node, len(cfg.BootstrapNodes)),}
②:退出初始疏导节点
copy(s.bootnodes, cfg.BootstrapNodes) if s.log == nil { s.log = log.Root() }
③: 退出动态节点
for _, n := range cfg.StaticNodes { s.addStatic(n) }
bootnodes
是初始疏导节点,在节点没有接管到任何节点的连贯申请,也没有节点能够给咱们街坊节点的时候,就去连贯bootnodes
,它硬编码在了以太坊的源码中。
static
是动态节点,如果咱们想和某些节点放弃长期的连贯,就把它们退出到动态节点的列表中
接下来就是到了运行p2p网络的时候了,次要的函数是:go srv.run(dialer)
运行p2p网络
srv.run(dialer)
在p2p网络启动时候,咱们会监听近程节点发送过去的TCP申请,到了运行p2p网络的时候,咱们则会向近程节点发动TCP的连贯申请。首先咱们要晓得咱们所说的发动TCP连贯申请能够形容成拨号,每个拨号都是以工作的模式存在,进入到srv.run(dialer)
剖析
整个函数就是一个循环,介绍下它的次要性能:
发动TCP连贯工作
scheduleTasks()
scheduleTasks
次要是从queued task 中去获取工作,通过查问dialer以查找新工作并立刻启动尽可能多的工作,咱们这里要留神个变量maxActiveDialTasks
,它的默认值为16 ,而安顿工作的外围办法是:
nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
次要做了以下几件事:
①:为没有连贯的动态节点创立拨号工作
for id, t := range s.static { err := s.checkDial(t.dest, peers) switch err { case errNotWhitelisted, errSelf: s.log.Warn("Removing static dial candidate", "id", t.dest.ID, "addr", &net.TCPAddr{IP: t.dest.IP(), Port: t.dest.TCP()}, "err", err) delete(s.static, t.dest.ID()) case nil: s.dialing[id] = t.flags newtasks = append(newtasks, t) } }
首先对拨号节点进行校验:正在连接,曾经连贯,是自身,不在白名单中,最近连贯过的都会报错,并且不是在白名单中的和本身的节点会间接从动态节点列表中删除,校验通过的创立工作。
②:计算所需的动静拨号数
needDynDials := s.maxDynDials for _, p := range peers { if p.rw.is(dynDialedConn) { needDynDials-- } } for _, flag := range s.dialing { if flag&dynDialedConn != 0 { needDynDials-- } }
咱们被动发动的TCP连贯申请是由节点最大连接数除以拨号比率得出的,即maxPeers/radio
,同时咱们会判断节点中是否曾经有建设了连贯的节点和正在拨号的节点,有的话会needDynDials会减去。
③:如果找不到任何的peers,就去随机找bootnode,发动连贯
不过这个个别实用在测试网或者私链。
if len(peers) == 0 && len(s.bootnodes) > 0 && needDynDials > 0 && now.Sub(s.start) > fallbackInterval { bootnode := s.bootnodes[0] s.bootnodes = append(s.bootnodes[:0], s.bootnodes[1:]...) s.bootnodes = append(s.bootnodes, bootnode) if addDial(dynDialedConn, bootnode) { needDynDials-- } }
④:从节点发现后果中创立动静拨号工作
如果不满足最大工作数量的话,就去s.lookupBuf
中寻找,lookupBuf
通过KAD算法获取的节点。
for ; i < len(s.lookupBuf) && needDynDials > 0; i++ { if addDial(dynDialedConn, s.lookupBuf[i]) { needDynDials-- } } s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])]if len(s.lookupBuf) < needDynDials && !s.lookupRunning { s.lookupRunning = true newtasks = append(newtasks, &discoverTask{want: needDynDials - len(s.lookupBuf)}) }
⑤:没有须要执行的工作,放弃拨号逻辑持续运行
if nRunning == 0 && len(newtasks) == 0 && s.hist.Len() > 0 { t := &waitExpireTask{s.hist.nextExpiry().Sub(now)} newtasks = append(newtasks, t) }
到此创立新工作完结,返回newTasks
执行TCP连贯工作
直到满足最大流动工作数才开始工作执行,具体的执行过程在以下代码:
startTasks := func(ts []task) (rest []task) { i := 0 for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ { t := ts[i] srv.log.Trace("New dial task", "task", t) go func() { t.Do(srv); taskdone <- t }() runningTasks = append(runningTasks, t) } return ts[i:] }
t.Do(srv);
执行的次要工作包含上面几种:
- dialTask
- discoverTask
- waitExpireTask
最要害的就是dialTask
func (t *dialTask) Do(srv *Server) { if t.dest.Incomplete() { if !t.resolve(srv) { return } } err := t.dial(srv, t.dest) if err != nil { srv.log.Trace("Dial error", "task", t, "err", err) // Try resolving the ID of static nodes if dialing failed. if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 { if t.resolve(srv) { t.dial(srv, t.dest) } } }}
真正的连贯是在t.dail
中做的:
// 理论的网络连接操作func (t *dialTask) dial(srv *Server, dest *enode.Node) error { fd, err := srv.Dialer.Dial(dest) if err != nil { return &dialError{err} } mfd := newMeteredConn(fd, false, &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()}) return srv.SetupConn(mfd, t.flags, dest)}
再往下面就没必要深究了,理论的网络连接操作到此为止了。
治理TCP连贯工作
在TCP连贯工作实现后,会对连贯有各种解决,如下:
①:进行p2p服务
case <-srv.quit:break running
②:增加动态节点到peer列表
case n := <-srv.addstatic:srv.log.Trace("Adding static node", "node", n)dialstate.addStatic(n)
③:发送断开连接申请,并断开连接
case n := <-srv.removestatic:dialstate.removeStatic(n) if p, ok := peers[n.ID()]; ok { p.Disconnect(DiscRequested) }
断开连接会立刻返回,并且不会等连贯敞开。
④:标记可信节点
case n := <-srv.addtrusted:trusted[n.ID()] = true
⑤:从信赖节点中删除一个节点
case n := <-srv.removetrusted:delete(trusted, n.ID())
⑥:拨号工作实现
case t := <-taskdone:newTasksdialstate.taskDone(t, time.Now())delTask(t)
⑦:连贯已通过加密握手,近程身份是已知的(但尚未通过验证)
case c := <-srv.checkpointPostHandshake:c.cont <- srv.postHandshakeChecks(peers, inboundCount, c)
⑧:连贯已通过协定握手,已知其性能并验证了近程身份
err := srv.addPeerChecks(peers, inboundCount, c) if err == nil { // 握手实现,所有查看结束 p := newPeer(srv.log, c, srv.Protocols) //启用了音讯事件就把peerfeed传给peer if srv.EnableMsgEvents { p.events = &srv.peerFeed } name := truncateName(c.name) p.RemoteAddr(), "peers", len(peers)+1, "name", name) go srv.runPeer(p) // 重点 peers[c.node.ID()] = p if p.Inbound() { inboundCount++ } if conn, ok := c.fd.(*meteredConn); ok { conn.handshakeDone(p)// } }
addPeerChecks
会删除没有匹配协定的连贯,并且会反复握手后查看,因为自执行这些查看后可能已更改。连贯通过握手后,将调用handshakeDone
⑨:Peer断开连接
case pd := <-srv.delpeer:d := common.PrettyDuration(mclock.Now() - pd.created) pd.log.Debug("Removing p2p peer", "addr", pd.RemoteAddr(), "peers", len(peers)-1, "duration", d, "req", pd.requested, "err", pd.err) delete(peers, pd.ID()) if pd.Inbound() { inboundCount-- }
到此为止整个次要的解决TCP连贯的循环解说完结。
总结
- 开启p2p网络次要包含:设置本地节点,监听TCP连贯以及设置节点发现
- 运行P2P网络之后次要包含:发动TCP连贯并执行连贯,以及相干的连贯解决。