关于区块链:死磕以太坊源码分析之downloader同步

34次阅读

共计 23070 个字符,预计需要花费 58 分钟才能阅读完成。

死磕以太坊源码剖析之 downloader 同步

须要配合正文代码看:https://github.com/blockchain…

这篇文章篇幅较长,能看上来的是条汉子,倡议珍藏

心愿读者在浏览过程中,指出问题,给个关注,一起探讨。

概览

downloader 模块的代码位于 eth/downloader 目录下。次要的性能代码别离是:

  • downloader.go:实现了区块同步逻辑
  • peer.go:对区块各个阶段的组装,上面的各个FetchXXX 就是很依赖这个模块。
  • queue.go:对 eth/peer.go 的封装
  • statesync.go:同步 state 对象

同步模式

### full sync

full 模式会在数据库中保留所有区块数据,同步时从近程节点同步 header 和 body 数据,而 state 和 receipt 数据则是在本地计算出来的。

在 full 模式下,downloader 会同步区块的 header 和 body 数据组成一个区块,而后通过 blockchain 模块的 BlockChain.InsertChain 向数据库中插入区块。在 BlockChain.InsertChain 中,会一一计算和验证每个块的 staterecepit 等数据,如果一切正常就将区块数据以及本人计算失去的 staterecepit 数据一起写入到数据库中。

fast sync

fast 模式下,recepit 不再由本地计算,而是和区块数据一样,间接由 downloader 从其它节点中同步;state 数据并不会全副计算和下载,而是选一个较新的区块(称之为 pivot)的 state 进行下载,以这个区块为分界,之前的区块是没有 state 数据的,之后的区块会像 full 模式下一样在本地计算 state。因而在 fast 模式下,同步的数据除了 header 和 body,还有 receipt,以及 pivot 区块的 state

因而 fast 模式疏忽了大部分 state 数据,并且应用网络间接同步 receipt 数据的形式替换了 full 模式下的本地计算,所以比拟快。

light sync

light 模式也叫做轻模式,它只对区块头进行同步,而不同步其它的数据。

SyncMode:

  • FullSync: 从残缺区块同步整个区块链历史
  • FastSync: 疾速下载题目,仅在链头处齐全同步
  • LightSync: 仅下载题目,而后终止

区块下载流程

图片只是大略的形容一下,理论还是要联合代码,所有区块链相干文章合集,https://github.com/blockchain…

同时心愿结识更多区块链圈子的人,能够 star 下面我的项目,继续更新

首先依据 Synchronise 开始区块同步,通过 findAncestor 找到指定节点的独特先人,并在此高度进行同步,同时开启多个 goroutine 同步不同的数据:headerreceiptbody。如果同步高度为 100 的区块,必须先 header 同步胜利同步实现才能够唤醒 bodyreceipts的同步。

而每个局部的同步大抵都是由 FetchParts 来实现的,外面蕴含了各个 Chan 的配合,也会波及不少的回调函数,总而言之多读几遍每次都会有不同的了解。接下来就逐渐剖析这些要害内容。


synchronise

①:确保对方的 TD 高于咱们本人的 TD

currentBlock := pm.blockchain.CurrentBlock()
    td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
    pHead, pTd := peer.Head()
    if pTd.Cmp(td) <= 0 {return}

②:开启 downloader 的同步

pm.downloader.Synchronise(peer.id, pHead, pTd, mode)

进入函数:次要做了以下几件事:

  1. d.synchronise(id, head, td, mode):同步过程
  2. 谬误日志输入,并删除此peer

进入到 d.synchronise,走到最初一步d.syncWithPeer(p, hash, td) 真正开启同步。

func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
  ...
  return d.syncWithPeer(p, hash, td)
}

syncWithPeer 大略做了以下几件事:

  1. 查找先人findAncestor
  2. 开启独自 goroutine 别离运行以下几个函数:

    • fetchHeaders
    • processHeaders
    • fetchbodies
    • fetchReceipts
    • processFastSyncContent
    • processFullSyncContent

接下来的文章,以及整个 Downloader 模块次要内容就是围绕这几个局部进行开展。


findAncestor

同步首要的是 确定同步区块的区间 :顶部为近程节点的最高区块,底部为两个节点都领有的雷同区块的最高高度(先人区块)。findAncestor 就是用来找先人区块。函数剖析如下:

①:确定本地高度和近程节点的最高高度

var (floor        = int64(-1) // 底部
        localHeight  uint64  // 本地最高高度
        remoteHeight = remoteHeader.Number.Uint64() // 近程节点最高高度)
switch d.mode {
    case FullSync:
        localHeight = d.blockchain.CurrentBlock().NumberU64()
    case FastSync:
        localHeight = d.blockchain.CurrentFastBlock().NumberU64()
    default:
        localHeight = d.lightchain.CurrentHeader().Number.Uint64()
    }

②:计算同步的高度区间和距离

from, count, skip, max := calculateRequestSpan(remoteHeight, localHeight) 
  • from::示意从哪个高度开始获取区块
  • count:示意从近程节点获取多少个区块
  • skip:示意距离,比方skip 为 2,获取第一个高度为 5,则第二个就是 8
  • max:示意最大高度

③:发送获取 header 的申请

go p.peer.RequestHeadersByNumber(uint64(from), count, skip, false)

④:解决下面申请接管到的header :case packet := <-d.headerCh

  1. 抛弃掉不是来自咱们申请节的内容
  2. 确保返回的 header 数量不为空
  3. 验证返回的 headers 的高度是咱们所申请的
  4. 查看是否找到独特先人
//----①
if packet.PeerId() != p.id {log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
                break
            }
//-----②
headers := packet.(*headerPack).headers
            if len(headers) == 0 {p.log.Warn("Empty head header set")
        return 0
      }
//-----③
for i, header := range headers {expectNumber := from + int64(i)*int64(skip+1)
                if number := header.Number.Int64(); number != expectNumber { // 验证这些返回的 header 是否是咱们下面申请的 headers
                    p.log.Warn("Head headers broke chain ordering", "index", i, "requested", expectNumber, "received", number)
                    return 0, errInvalidChain
                }
            }
//-----④
// 查看是否找到独特先人
            finished = true
            // 留神这里是从 headers 最初一个元素开始查找,也就是高度最高的区块。for i := len(headers) - 1; i >= 0; i-- {
                // 跳过不在咱们申请的高度区间内的区块
                if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > max {continue}
                // // 查看咱们本地是否曾经有某个区块了,如果有就算是找到了独特先人,// 并将独特先人的哈希和高度设置在 number 和 hash 变量中。h := headers[i].Hash()
                n := headers[i].Number.Uint64()

        
        

⑤:如果通过固定距离法找到了独特先人则返回先人,会对其高度与 floor 变量进行验证, floor 变量代表的是独特先人的高度的最小值, 如果找到独特先人的高度比这个值还小,就认为是两个节点之间分叉太大了,不再容许进行同步。如果一切正常,就返回找到的独特先人的高度 number 变量。

if hash != (common.Hash{}) {if int64(number) <= floor {return 0, errInvalidAncestor}
        return number, nil
    }

⑥:如果固定距离法没有找到先人则通过二分法来查找先人,这部分能够思维跟二分法算法相似,有趣味的能够细看。


queue 详解

queue对象和 Downloader 对象是相互作用的,Downloader的很多性能离不开他,接下来咱们介绍一下这部分内容,然而本节,能够后行跳过 ,等到了浏览上面的对于Queue 调用的一些函数局部再回过来浏览这部分解说。

queue 构造体

type queue struct {
  mode SyncMode // 同步模式
  
  // header 解决相干
  headerHead      common.Hash   // 最初一个排队的标头的哈希值以验证程序
  headerTaskPool  map[uint64]*types.Header  // 待处理的标头检索工作,将起始索引映射到框架标头
  headerTaskQueue *prque.Prque  // 骨架索引的优先级队列,以获取用于的填充标头
  headerPeerMiss map[string]map[uint64]struct{} // 已知不可用的对等头批处理集
  headerPendPool map[string]*fetchRequest // 以后挂起的头检索操作
  headerResults []*types.Header // 后果缓存累积实现的头
  headerProced int // 从后果中拿进去曾经解决的 header
  headerContCh chan bool //header 下载实现时告诉的频道
  
  blockTaskPool  map[common.Hash]*types.Header // 待处理的块(body)检索工作,将哈希映射到 header
  blockTaskQueue *prque.Prque // 标头的优先级队列, 以用于获取块(bodies)blockPendPool map[string]*fetchRequest // 以后的正在解决的块(body)检索操作
  blockDonePool map[common.Hash]struct{} // 曾经实现的块(body)
  
    receiptTaskPool map[common.Hash]*types.Header // 待处理的收据检索工作,将哈希映射到 header
    receiptTaskQueue *prque.Prque // 标头的优先级队列, 以用于获取收据
    receiptPendPool map[string]*fetchRequest // 以后的正在解决的收据检索操作
    receiptDonePool map[common.Hash]struct{} // 曾经实现的收据
    
    resultCache []*fetchResult // 下载但尚未交付获取后果
    resultOffset uint64 // 区块链中第一个缓存的获取后果的偏移量
    resultSize common.StorageSize // 块的近似大小

    lock   *sync.Mutex
    active *sync.Cond
    closed bool
  
}

次要细分性能

数据下载开始安顿工作

  • ScheduleSkeleton:将一批 header 检索工作增加到队列中,以填充已检索的header skeleton
  • Schedule:用来筹备对一些 bodyreceipt 数据的下载

数据下载中的各类状态

  • pending

    pending示意待检索的 XXX 申请的数量,包含了:PendingHeadersPendingBlocksPendingReceipts,别离都是对应取 XXXTaskQueue 的长度。

  • InFlight

    InFlight示意是否有正在获取 XXX 的申请,包含:InFlightHeadersInFlightBlocksInFlightReceipts,都是通过判断len(q.receiptPendPool) > 0 来确认。

  • ShouldThrottle

    ShouldThrottle示意查看是否应该限度下载 XXX,包含:ShouldThrottleBlocksShouldThrottleReceipts,次要是为了避免下载过程中本地内存占用过大。

  • Reserve

    Reserve通过结构一个 fetchRequest 构造并返回,向调用者提供指定数量的待下载的数据的信息(queue 外部会将这些数据标记为「正在下载」)。调用者应用返回的 fetchRequest 数据向近程节点发动新的获取数据的申请。包含:ReserveHeadersReserveBodiesReserveReceipts

  • Cancel

    Cance用来吊销对 fetchRequest 构造中的数据的下载(queue 外部会将这些数据从新从「正在下载」的状态更改为「期待下载」)。包含:CancelHeadersCancelBodiesCancelReceipts

  • expire

    expire查看正在执行中的申请是否超过了超时限度,包含:ExpireHeadersExpireBodiesExpireReceipts

  • Deliver

    当有数据下载胜利时,调用者会应用 deliver 性能用来告诉 queue 对象。包含:DeliverHeadersDeliverBodiesDeliverReceipts

数据下载实现获取区块数据

  • RetrieveHeaders
    在填充 skeleton 实现后,queue.RetrieveHeaders 用来获取整个 skeleton 中的所有 header
  • Results
    queue.Results 用来获取以后的 headerbodyreceipt(只在 fast 模式下)都已下载胜利的区块(并将这些区块从 queue 外部移除)

函数实现

ScheduleSkeleton

queue.ScheduleSkeleton 次要是为了填充 skeleton,它的参数是要下载区块的起始高度和所有 skeleton 区块头,最外围的内容则是上面这段循环:

func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
    ......
    for i, header := range skeleton {index := from + uint64(i*y)
        q.headerTaskPool[index] = header
        q.headerTaskQueue.Push(index, -int64(index))
    }
}

假如已确定须要下载的区块高度区间是从 10 到 46,MaxHeaderFetch 的值为 10,那么这个高度区块就会被分成 3 组:10 – 19,20 – 29,30 – 39,而 skeleton 则别离由高度为 19、29、39 的区块头组成。循环中的 index 变量实际上是每一组区块中的第一个区块的高度(比方 10、20、30),queue.headerTaskPool 实际上是一个 每一组区块中第一个区块的高度到最初一个区块的 header 的映射

headerTaskPool = {
  10: headerOf_19,
    20: headerOf_20,
    30: headerOf_39,
}

ReserveHeaders

reserve 用来获取可下载的数据。

reserve  = func(p *peerConnection, count int) (*fetchRequest, bool, error) {return d.queue.ReserveHeaders(p, count), false, nil
        }
func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest {if _, ok := q.headerPendPool[p.id]; ok {return nil} //①
  ...
  send, skip := uint64(0), []uint64{}
    for send == 0 && !q.headerTaskQueue.Empty() {from, _ := q.headerTaskQueue.Pop()
        if q.headerPeerMiss[p.id] != nil {if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok {skip = append(skip, from.(uint64))
                continue
            }
        }
        send = from.(uint64) // ②
    }
  
 ...
  for _, from := range skip {q.headerTaskQueue.Push(from, -int64(from))
    } // ③
  ...
  request := &fetchRequest{
        Peer: p,
        From: send,
        Time: time.Now(),}
    q.headerPendPool[p.id] = request // ④
  
}

①:依据 headerPendPool 来判断近程节点是否正在下载数据信息。

②:从 headerTaskQueue 取出值作为本次申请的起始高度,赋值给 send 变量,在这个过程中会排除 headerPeerMiss 所记录的节点下载数据失败的信息。

③:将失败的工作再从新写回task queue

④:利用 send 变量结构 fetchRequest 构造,此构造是用来作为 FetchHeaders 来应用的:

fetch = func(p *peerConnection, req *fetchRequest) error {return p.FetchHeaders(req.From, MaxHeaderFetch) 
}

至此,ReserveHeaders会从工作队列里抉择最小的起始高度并结构 fetchRequest 传递给 fetch 获取数据。


DeliverHeaders

deliver = func(packet dataPack) (int, error) {pack := packet.(*headerPack)
            return d.queue.DeliverHeaders(pack.peerID, pack.headers, d.headerProcCh)
        }

①:如果发现下载数据的节点没有在 queue.headerPendPool 中,就间接返回谬误;否则就持续解决,并将节点记录从 queue.headerPendPool 中删除。

request := q.headerPendPool[id]
    if request == nil {return 0, errNoFetchesPending}
    headerReqTimer.UpdateSince(request.Time)
    delete(q.headerPendPool, id)

②:验证headers

包含三方面验证:

  1. 查看起始区块的高度和哈希
  2. 查看高度的连接性
  3. 查看哈希的连接性
if accepted {
        // 查看起始区块的高度和哈希
        if headers[0].Number.Uint64() != request.From {
            ...
            accepted = false
        } else if headers[len(headers)-1].Hash() != target {
            ...
            accepted = false
        }
    }
    if accepted {for i, header := range headers[1:] {hash := header.Hash() // 查看高度的连接性
            if want := request.From + 1 + uint64(i); header.Number.Uint64() != want {...}
            if headers[i].Hash() != header.ParentHash { // 查看哈希的连接性
                ...
            }
        }
    }

③:将有效数据存入headerPeerMiss,并将这组区块起始高度从新放入headerTaskQueue

if !accepted {
    ...
        miss := q.headerPeerMiss[id]
        if miss == nil {q.headerPeerMiss[id] = make(map[uint64]struct{})
            miss = q.headerPeerMiss[id]
        }
        miss[request.From] = struct{}{}
        q.headerTaskQueue.Push(request.From, -int64(request.From))
        return 0, errors.New("delivery not accepted")
    }

④:保留数据,并告诉 headerProcCh 解决新的header

if ready > 0 {process := make([]*types.Header, ready)
        copy(process, q.headerResults[q.headerProced:q.headerProced+ready])
        select {
        case headerProcCh <- process:
            q.headerProced += len(process)
        default:
        }
    }

⑤:发送音讯给.headerContCh,告诉skeleton 都被下载完了

if len(q.headerTaskPool) == 0 {q.headerContCh <- false}

DeliverHeaders 会对数据进行测验和保留,并发送 channel 音讯给 Downloader.processHeadersDownloader.fetchPartswakeCh 参数。


Schedule

processHeaders在解决 header 数据的时候,会调用queue.Schedule 为下载 bodyreceipt 作筹备。

inserts := d.queue.Schedule(chunk, origin)
func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {inserts := make([]*types.Header, 0, len(headers))
    for _, header := range headers {
    // 校验
    ...
        q.blockTaskPool[hash] = header
        q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))

        if q.mode == FastSync {q.receiptTaskPool[hash] = header
            q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
        }
        inserts = append(inserts, header)
        q.headerHead = hash
        from++
    }
    return inserts
}

这个函数次要就是将信息写入到 body 和 receipt 队列,期待调度。


ReserveBody&Receipt

queue 中筹备好了 bodyreceipt 相干的数据,processHeaders最初一段,是唤醒下载 Bodyies 和 Receipts 的要害代码,会告诉 fetchBodiesfetchReceipts 能够对各自的数据进行下载了。

for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
                select {
                case ch <- true:
                default:
                }
            }

fetchXXX 会调用fetchParts,逻辑相似下面的的,reserve 最终则会调用reserveHeadersdeliver 最终调用的是 queue.deliver.

先来剖析reserveHeaders

①:如果没有可解决的工作,间接返回

if taskQueue.Empty() {return nil, false, nil}

②:如果参数给定的节点正在下载数据,返回

 if _, ok := pendPool[p.id]; ok {return nil, false, nil}

③:计算 queue 对象中的缓存空间还能够包容多少条数据

space := q.resultSlots(pendPool, donePool)

④:从「task queue」中顺次取出工作进行解决

次要实现以下性能:

  • 计算以后 header 在 queue.resultCache 中的地位,而后填充 queue.resultCache 中相应地位的元素
  • 解决空区块的状况,若为空不下载。
  • 解决近程节点短少这个以后区块数据的状况,如果发现这个节点已经下载以后数据失败过,就不再让它下载了。

留神:resultCache 字段用来记录所有正在被解决的数据的处理结果,它的元素类型是 fetchResult。它的 Pending 字段代表以后区块还有几类数据须要下载。这里须要下载的数据最多有两类:body 和 receipt,full 模式下只须要下载 body 数据,而 fast 模式要多下载一个 receipt 数据。

for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ {header := taskQueue.PopItem().(*types.Header)
        hash := header.Hash()
        index := int(header.Number.Int64() - int64(q.resultOffset))
        if index >= len(q.resultCache) || index < 0 {....}
        if q.resultCache[index] == nil {
            components := 1
            if q.mode == FastSync {components = 2}
            q.resultCache[index] = &fetchResult{
                Pending: components,
                Hash:    hash,
                Header:  header,
            }
        }
  
        if isNoop(header) {donePool[hash] = struct{}{}
            delete(taskPool, hash)

            space, proc = space-1, proc-1
            q.resultCache[index].Pending--
            progress = true
            continue
        }
        if p.Lacks(hash) {skip = append(skip, header)
        } else {send = append(send, header)
        }
    }

最初就是结构 fetchRequest 构造并返回。


DeliverBodies&Receipts

bodyreceipt 数据都曾经通过 reserve 操作结构了 fetchRequest 构造并传给 fetch,接下来就是期待数据的达到, 数据下载胜利后,会调用 queue 对象的 deliver 办法进行传递,包含 queue.DeliverBodiesqueue.DeliverReceipts。这两个办法都以不同的参数调用了 queue.deliver 办法:

①:如果下载的数据数量为 0,则把所有此节点此次下载的数据标记为「缺失」

if results == 0 {
        for _, header := range request.Headers {request.Peer.MarkLacking(header.Hash())
        }
    }

②:循环解决数据,通过调用reconstruct 填充 resultCache[index] 中的相应的字段

for i, header := range request.Headers {
  ...
  if err := reconstruct(header, i, q.resultCache[index]); err != nil {
            failure = err
            break
        }
}

③:验证resultCache 中的数据,其对应的 request.Headers 中的 header 都应为 nil,若不是则阐明验证未通过,须要如果到 task queue 从新下载

for _, header := range request.Headers {
        if header != nil {taskQueue.Push(header, -int64(header.Number.Uint64()))
        }
    }

④:如果有数据被验证通过且写入 queue.resultCache 中了(accepted > 0),发送 queue.active 音讯。Results 会期待这这个信号。


Results

当 (header、body、receipt) 都下载完,就要将区块写入到数据库了,queue.Results 就是用来返回所有目前曾经下载实现的数据,它在 Downloader.processFullSyncContentDownloader.processFastSyncContent 中被调用。代码比较简单就不多说了。

到此为止 queue 对象就剖析的差不多了。


同步 headers

fetchHeaders

同步 headers 是是由函数fetchHeaders 来实现的。

fetchHeaders的大抵思维:

同步 header 的数据会被填充到skeleton,每次从近程节点获取区块数据最大为MaxHeaderFetch(192),所以要获取的区块数据如果大于 192,会被分成组,每组MaxHeaderFetch,残余的有余 192 个的不会填充进skeleton,具体步骤如下图所示:

此种形式能够 防止从同一节点下载过多谬误数据 ,如果咱们连贯到了一个歹意节点,它能够发明一个链条很长且TD 值也十分高的区块链数据。如果咱们的区块从 0 开始全副从它那同步,也就下载了一些基本不被他人抵赖的数据。如果我只从它那同步 MaxHeaderFetch 个区块,而后发现这些区块无奈正确填充我之前的 skeleton(可能是 skeleton 的数据错了,或者用来填充 skeleton 的数据错了),就会丢掉这些数据。

接下来查看下代码如何实现:

①:发动获取 header 的申请

如果是下载skeleton,则会从高度 from+MaxHeaderFetch-1 开始(包含),每隔 MaxHeaderFetch-1 的高度申请一个 header,最多申请 MaxSkeletonSize 个。如果不是的话,则要获取残缺的headers

②:期待并解决 headerCh 中的 header 数据

2.1 确保近程节点正在返回咱们须要填充 skeleton 所需的header

if packet.PeerId() != p.id {log.Debug("Received skeleton from incorrect peer", "peer", packet.PeerId())
                break
            }

2.2 如果 skeleton 曾经下载结束,则须要持续填充skeleton

if packet.Items() == 0 && skeleton {
                skeleton = false
                getHeaders(from)
                continue
            }

2.3 整个 skeleton 填充实现,并且没有要获取的 header 了,要告诉 headerProcCh 全副实现

if packet.Items() == 0 {
                // 下载 pivot 时不要停止标头的提取
                if atomic.LoadInt32(&d.committed) == 0 && pivot <= from {p.log.Debug("No headers, waiting for pivot commit")
                    select {case <-time.After(fsHeaderContCheck):
                        getHeaders(from)
                        continue
                    case <-d.cancelCh:
                        return errCanceled
                    }
                }
                // 实现 Pivot 操作(或不进行疾速同步),并且没有头文件,终止该过程
                p.log.Debug("No more headers available")
                select {
                case d.headerProcCh <- nil:
                    return nil
                case <-d.cancelCh:
                    return errCanceled
                }
            }

2.4 当 header 有数据并且是在获取 skeleton 的时候,调用 fillHeaderSkeleton 填充skeleton

if skeleton {filled, proced, err := d.fillHeaderSkeleton(from, headers)
                if err != nil {p.log.Debug("Skeleton chain invalid", "err", err)
                    return errInvalidChain
                }
                headers = filled[proced:]
                from += uint64(proced)
            }

2.5 如果以后解决的不是 skeleton,表明区块同步得差不多了,解决尾部的一些区块

判断本地的主链高度与新收到的 header 的最高高度的高度差是否在 reorgProtThreshold 以内,如果不是,就将高度最高的 reorgProtHeaderDelay 个 header 丢掉。

if head+uint64(reorgProtThreshold) < headers[n-1].Number.Uint64() {
                        delay := reorgProtHeaderDelay
                        if delay > n {delay = n}
                        headers = headers[:n-delay]
                    }

2.6 如果还有 header 未解决,发给 headerProcCh 进行解决,Downloader.processHeaders 会期待这个 channel 的音讯并进行解决;

if len(headers) > 0 {
                ...
                select {
                case d.headerProcCh <- headers:
                case <-d.cancelCh:
                    return errCanceled
                }
                from += uint64(len(headers))
  getHeaders(from)
}

2.7 如果没有发送标头,或者所有标头期待 fsHeaderContCheck 秒,再次调用 getHeaders 申请区块

p.log.Trace("All headers delayed, waiting")
                select {case <-time.After(fsHeaderContCheck):
                    getHeaders(from)
                    continue
                case <-d.cancelCh:
                    return errCanceled
                }

这段代码起初才加上的,其 commit 的记录在这里,而「pull request」在这里。从「pull request」中作者的解释咱们能够理解这段代码的逻辑和性能:这个批改次要是为了解决经常出现的「invalid hash chain」谬误,呈现这个谬误的起因是因为在咱们上一次从近程节点获取到一些区块并将它们退出到本地的主链的过程中,近程节点产生了 reorg 操作(参见这篇文章里对于「主链与侧链」的介绍);当咱们再次依据高度申请新的区块时,对方返回给咱们的是它的新的主链上的区块,而咱们没有这个链上的历史区块,因而在本地写入区块时就会返回「invalid hash chain」谬误。

要想产生「reorg」操作,就须要有新区块退出。在以太坊主网上,新产生一个区块的距离是 10 秒到 20 秒左右。个别状况下,如果仅仅是区块数据,它的同步速度还是很快的,每次下载也有最大数量的限度。所以在新产生一个区块的这段时间里,足够同步实现一组区块数据而对方节点不会产生「reorg」操作。然而留神方才说的「仅仅是区块数据」的同步较快,state 数据的同步就十分慢了。简略来说在实现同步之前可能会有多个「pivot」区块,这些区块的 state 数据会从网络上下载,这就大大拖慢了整个区块的同步速度,使得本地在同步一组区块的同时对方产生「reorg」操作的机率大大增加。

作者认为这种状况下产生的「reorg」操作是由新产生的区块的竞争引起的,所以最新的几个区块是「不稳固的」,如果本次同步的区块数量较多(也就是咱们同步时耗费的工夫比拟长)(在这里「本次同步的区数数量较多」的体现是新收到的区块的最高高度与本地数据库中的最高高度的差距大于 reorgProtThreshold),那么在同步时能够先防止同步最新区块,这就是 reorgProtThresholdreorgProtHeaderDelay 这个变量的由来。

至此,Downloader.fetchHeaders 办法就完结了,所有的区块头也就同步实现了。在下面咱们提到填充 skeleton 的时候,是由 fillHeaderSkeleton 函数来实现,接下来就要细讲填充 skeleton 的细节。


fillHeaderSkeleton

首先咱们晓得以太坊在同步区块时,先确定要下载的区块的高度区间,而后将这个区间按 MaxHeaderFetch 切分成很多组,每一组的最初一个区块组成了「skeleton」(最初一组不满 MaxHeaderFetch 个区块不算作一组)。不分明的能够查看下面的图。

①:将一批 header 检索工作增加到队列中,以填充skeleton

这个函数参照下面 queue 详解 的剖析

func (q queue) ScheduleSkeleton(from uint64, skeleton []types.Header) {}

②:调用 fetchParts 获取headers 数据

fetchParts是很外围的函数,上面的 FetchbodiesFetchReceipts都会调用。先来大抵看一下 fetchParts 的构造:

func (d *Downloader) fetchParts(...) error {
  ...
  for {
        select {
        case <-d.cancelCh:
        case packet := <-deliveryCh:
        case cont := <-wakeCh:
        case <-ticker.C:
        case <-update:
        ...
    }
}

简化下来就是这 5 个 channel 在解决,后面 4 个 channel 负责循环期待音讯,update用来期待其余 4 个 channel 的告诉来解决逻辑,先离开剖析一个个的channel

2.1 deliveryCh 传递下载的数据

deliveryCh 作用就是传递下载的数据,当有数据被真正下载下来时,就会给这个 channel 发消息将数据传递过去。这个 channel 对应的别离是:d.headerChd.bodyChd.receiptCh,而这三个 channel 别离在以下三个办法中被写入数据:DeliverHeadersDeliverBodiesDeliverReceipts。看下 deliveryCh 如何解决数据:

case packet := <-deliveryCh:
            if peer := d.peers.Peer(packet.PeerId()); peer != nil {accepted, err := deliver(packet)// 传递接管到的数据块并查看链有效性
                if err == errInvalidChain {return err}
                if err != errStaleDelivery {setIdle(peer, accepted)
                }
                switch {case err == nil && packet.Items() == 0:
                    ...
                case err == nil:
                ...
                }
            }
            select {case update <- struct{}{}:
            default:
            }

收到下载数据后判断节点是否无效,如果节点没有被移除,则会通过 deliver 传递接管到的下载数据。如果没有任何谬误,则告诉 update 解决。

要留神 deliver 是一个回调函数,它调用了 queue 对象的 Deliver 办法:queue.DeliverHeadersqueue.DeliverBodiesqueue.DeliverReceipts,在收到下载数据就会调用此回调函数(queue 相干函数剖析参照 queue 详解局部)。

在下面处理错误局部,有一个 setIdle 函数,它也是回调函数,其实现都是调用了 peerConnection 对象的相干办法:SetHeadersIdleSetBodiesIdleSetReceiptsIdle。它这个函数是指某些节点针对某类数据是闲暇的,比方headerbodiesreceipts,如果须要下载这几类数据,就能够从闲暇的节点下载这些数据。

2.2 wakeCh 唤醒fetchParts,下载新数据或下载已实现

case cont := <-wakeCh:
            if !cont {finished = true}
            select {case update <- struct{}{}:
            default:
            }

首先咱们通过调用 fetchParts 传递的参数晓得,wakeCh 的值其实是 queue.headerContCh。在 queue.DeliverHeaders 中发现所有须要下戴的 header 都下载实现了时,才会发送 false 给这个 channel。fetchParts 在收到这个音讯时,就晓得没有 header 须要下载了。代码如下:

func (q *queue) DeliverHeaders(......) (int, error) {
    ......
    if len(q.headerTaskPool) == 0 {q.headerContCh <- false}
    ......
}

同样如此,bodyreceipt 则是 bodyWakeChreceiptWakeCh,在 processHeaders 中,如果所有 header 曾经下载实现了,那么发送 false 给这两个 channel,告诉它们没有新的 header 了。bodyreceipt 的下载依赖于 header, 须要 header 先下载实现能力下载,所以对于下戴 bodyreceiptfetchParts 来说,收到这个 wakeCh 就代表不会再有告诉让本人下载数据了.

func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
    for {
        select {
        case headers := <-d.headerProcCh:
            if len(headers) == 0 {for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
                    select {
                    case ch <- false:
                    case <-d.cancelCh:
                    }
                }
                        ...
            }
            ...
            for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
                select {
                case ch <- true:
                default:
                }
            }
        }
    }
}

2.3 ticker 负责周期性的激活 update进行音讯解决

case <-ticker.C:
            select {case update <- struct{}{}:
            default:
            }

2.4 update(解决此前几个 channel 的数据)(重要)

2.4.1 判断是否无效节点,并获取超时数据的信息

获取超时数据的节点 ID 和数据数量,如果大于两个的话,就将这个节点设置为闲暇状态(setIdle),小于两个的话间接断开节点连贯。

expire 是一个回调函数,会返回以后所有的超时数据信息。这个函数的理论实现都是调用了 queue 对象的 Expire 办法:ExpireHeadersExpireBodiesExpireReceipts, 此函数会统计以后正在下载的数据中,起始工夫与以后工夫的差距超过给定阈值(downloader.requestTTL 办法的返回值)的数据,并将其返回。

if d.peers.Len() == 0 {return errNoPeers}
for pid, fails := range expire() {if peer := d.peers.Peer(pid); peer != nil {
    if fails > 2 {
                        ...
                        setIdle(peer, 0)
                    } else {
                    ...
                        if d.dropPeer == nil { } else {d.dropPeer(pid)
                            ....
                        }
                    }
  }

2.4.2 解决完超时数据,判断是否还有下载的数据

如果没有其余可下载的内容,请期待或终止,这里 pending()inFlight()都是回调函数,pending别离对应了 queue.PendingHeadersqueue.PendingBlocksqueue.PendingReceipts, 用来返回各自要下载的工作数量。inFlight() 别离对应了queue.InFlightHeadersqueue.InFlightBlocksqueue.InFlightReceipts, 用来返回正在下载的数据数量。

if pending() == 0 {if !inFlight() && finished {
                ...
                    return nil
                }
                break
            }

2.4.3 应用闲暇节点,调用 fetch 函数发送数据申请

Idle()回调函数在下面曾经提过了,throttle()回调函数则别离对 queue.ShouldThrottleBlocksqueue.ShouldThrottleReceipts, 用来示意是否应该下载bodies 或者receipts

reserve函数别离对应queue.ReserveHeadersqueue.ReserveBodiesqueue.ReserveReceipts, 用来从从下载工作中选取一些能够下载的工作,并结构一个 fetchRequest 构造。它还返回一个 process 变量,标记着是否有空的数据正在被解决。比方有可能某区块中未蕴含任何一条交易,因而它的 bodyreceipt 都是空的,这种数据其实是不须要下载的。在 queue 对象的 Reserve 办法中,会对这种状况进行辨认。如果遇到空的数据,这些数据会被间接标记为下载胜利。在办法返回时,就将是否产生过「间接标记为下载胜利」的状况返回。

capacity回调函数别离对应peerConnection.HeaderCapacitypeerConnection.BlockCapacitypeerConnection.ReceiptCapacity, 用来决定下载须要申请数据的个数。

fetch回调函数别离对应peer.FetchHeaderspeer.Fetchbodiespeer.FetchReceipts, 用来发送获取各类数据的申请。

progressed, throttled, running := false, false, inFlight()
            idles, total := idle()
            for _, peer := range idles {if throttle() {...}
                if pending() == 0 {break}
                request, progress, err := reserve(peer, capacity(peer))
                if err != nil {return err}
                if progress {progressed = true}
        if request == nil {continue}
                if request.From > 0 {...}
                ...
                if err := fetch(peer, request); err != nil {...}
            if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {return errPeersUnavailable}

简略来概括这段代码就是:应用闲暇节点下载数据,判断是否须要暂停,或者数据是否曾经下载实现;之后选取数据进行下载;最初,如果没有遇到空块须要下载、且没有暂停下载和所有无效节点都闲暇和的确有数据须要下载,但下载没有运行起来,就返回 errPeersUnavailable 谬误。

到此为止 fetchParts 函数就剖析的差不多了。外面波及的跟 queue.go 相干的一些函数都在 queue 详解 大节里介绍了。


processHeaders

通过 headerProcCh 接管 header 数据,并解决的过程是在 processHeaders 函数中实现的。整个处理过程集中在:case headers := <-d.headerProcCh 中:

①:如果 headers 的长度为 0,则会有以下操作:

1.1 告诉所有人 header 曾经处理完毕

for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
                    select {
                    case ch <- false:
                    case <-d.cancelCh:
                    }
                }

1.2 若没有检索到任何 header,阐明他们的TD 小于咱们的,或者曾经通过咱们的 fetcher 模块进行了同步。

if d.mode != LightSync {head := d.blockchain.CurrentBlock()
                    if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 {return errStallingPeer}
                }

1.3 如果是 fast 或者light 同步,确保传递了header

if d.mode == FastSync || d.mode == LightSync {head := d.lightchain.CurrentHeader()
                    if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {return errStallingPeer}
                }

②:如果 headers 的长度大于 0

2.1 如果是 fast 或者 light 同步,调用 ightchain.InsertHeaderChain() 写入 headerleveldb数据库;

if d.mode == FastSync || d.mode == LightSync {
  ....
  d.lightchain.InsertHeaderChain(chunk, frequency);
  ....
}

2.2 如果是 fast 或者 full sync 模式,则调用 d.queue.Schedule 进行内容 (body 和 receipt) 检索。

if d.mode == FullSync || d.mode == FastSync {
  ...
  inserts := d.queue.Schedule(chunk, origin)
  ...
}

③:如果找到更新的块号,则要发信号告诉新工作

if d.syncStatsChainHeight < origin {d.syncStatsChainHeight = origin - 1}
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
                select {
                case ch <- true:
                default:
                }
            }

到此解决 Headers 的剖析就实现了。


同步 bodies

同步 bodies 则是由fetchBodies 函数实现的。

fetchBodies

同步 bodies 的过程跟同步 header 相似,大抵讲下步骤:

  1. 调用fetchParts
  2. ReserveBodies()从 bodyTaskPool 中取出要同步的body
  3. 调用 fetch,也就是调用这里的FetchBodies 从节点获取 body,发送GetBlockBodiesMsg 音讯;
  4. 收到 bodyCh 的数据后,调用 deliver 函数,将 Transactions 和 Uncles 写入resultCache

同步 Receipts

fetchReceipts

同步 receipts 的过程跟同步 header 相似,大抵讲下步骤:

  1. 调用fetchParts()
  2. ReserveBodies()从 ReceiptTaskPool 中取出要同步的Receipt
  3. 调用这里的 FetchReceipts 从节点获取 receipts,发送GetReceiptsMsg 音讯;
  4. 收到 receiptCh 的数据后,调用 deliver 函数,将 Receipts 写入resultCache

同步状态

这里咱们讲两种模式下的状态同步:

  • fullSync: processFullSyncContentfull模式下 Receipts 没有缓存到 resultCache 中,间接先从缓存中取出 body 数据,而后执行交易生成状态,最初写入区块链。
  • fastSync:processFastSyncContent:fast 模式的 Receipts、Transaction、Uncles 都在 resultCache 中,所以还须要下载 ”state”,进行校验,再写入区块链。

接下来大抵的探讨下这两种形式。

processFullSyncContent

func (d *Downloader) processFullSyncContent() error {
    for {results := d.queue.Results(true)
        ...
        if err := d.importBlockResults(results); err != nil ...
    }
}
func (d *Downloader) importBlockResults(results []*fetchResult) error {
    ...
    select {
...
    blocks := make([]*types.Block, len(results))
    for i, result := range results {blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
    }
    if index, err := d.blockchain.InsertChain(blocks); err != nil {....}

间接从 result 中获取数据并生成block,直接插入区块链中,就完结了。


processFastSyncContent

fast 模式同步状态内容比拟多,大抵也就如下几局部,咱们开始简略剖析以下。

①:下载最新的区块状态

sync := d.syncState(latest.Root)

咱们间接用一张图来示意整个大抵流程:

具体的代码读者本人翻阅,大抵就是这么个简略过程。

②:计算出 pivot 块

pivotlatestHeight - 64,调用splitAroundPivot() 办法以 pivot 为核心,将 results 分为三个局部:beforePPafterP

pivot := uint64(0)
    if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) {pivot = height - uint64(fsMinFullBlocks)
    }
P, beforeP, afterP := splitAroundPivot(pivot, results)

③:对 beforeP 的局部调用 commitFastSyncData,将bodyreceipt都写入区块链

d.commitFastSyncData(beforeP, sync); 

④:对 P 的局部更新状态信息为 P block 的状态,把 P 对应的 result(蕴含bodyreceipt)调用 commitPivotBlock 插入本地区块链中,并调用 FastSyncCommitHead 记录这个 pivothash值,存在 downloader 中,标记为疾速同步的最初一个区块 hash 值;

if err := d.commitPivotBlock(P); err != nil {return err}

⑤:对 afterP 调用 d.importBlockResults,将body 插入区块链,而不插入 receipt。因为是最初 64 个区块,所以此时数据库中只有headerbody,没有 receipt 和状态,要通过 fullSync 模式进行最初的同步。

if err := d.importBlockResults(afterP); err != nil {return err}

到此为止整个 Downloader 同步实现了。

参考

https://mindcarver.cn

https://github.com/ethereum/g…

https://yangzhe.me/2019/05/09…

正文完
 0