死磕以太坊源码剖析之downloader同步须要配合正文代码看:https://github.com/blockchain...
这篇文章篇幅较长,能看上来的是条汉子,倡议珍藏
心愿读者在浏览过程中,指出问题,给个关注,一起探讨。
概览
downloader
模块的代码位于 eth/downloader
目录下。次要的性能代码别离是:
downloader.go
:实现了区块同步逻辑peer.go
:对区块各个阶段的组装,上面的各个FetchXXX
就是很依赖这个模块。queue.go
:对eth/peer.go
的封装statesync.go
:同步state
对象
同步模式
### full sync
full 模式会在数据库中保留所有区块数据,同步时从近程节点同步 header 和 body 数据,而state 和 receipt 数据则是在本地计算出来的。
在 full 模式下,downloader 会同步区块的 header 和 body 数据组成一个区块,而后通过 blockchain 模块的 BlockChain.InsertChain
向数据库中插入区块。在 BlockChain.InsertChain
中,会一一计算和验证每个块的 state
和 recepit
等数据,如果一切正常就将区块数据以及本人计算失去的 state
、recepit
数据一起写入到数据库中。
fast sync
fast
模式下,recepit
不再由本地计算,而是和区块数据一样,间接由 downloader
从其它节点中同步;state
数据并不会全副计算和下载,而是选一个较新的区块(称之为 pivot
)的 state
进行下载,以这个区块为分界,之前的区块是没有 state
数据的,之后的区块会像 full
模式下一样在本地计算 state
。因而在 fast
模式下,同步的数据除了 header
和 body,还有 receipt
,以及 pivot
区块的 state
。
因而 fast
模式疏忽了大部分 state
数据,并且应用网络间接同步 receipt
数据的形式替换了 full 模式下的本地计算,所以比拟快。
light sync
light 模式也叫做轻模式,它只对区块头进行同步,而不同步其它的数据。
SyncMode:
- FullSync:从残缺区块同步整个区块链历史
- FastSync:疾速下载题目,仅在链头处齐全同步
- LightSync:仅下载题目,而后终止
区块下载流程
图片只是大略的形容一下,理论还是要联合代码,所有区块链相干文章合集,https://github.com/blockchain...同时心愿结识更多区块链圈子的人,能够star下面我的项目,继续更新
首先依据Synchronise
开始区块同步,通过findAncestor
找到指定节点的独特先人,并在此高度进行同步,同时开启多个goroutine
同步不同的数据:header
、receipt
、body
。如果同步高度为 100 的区块,必须先header
同步胜利同步实现才能够唤醒body
和receipts
的同步。
而每个局部的同步大抵都是由FetchParts
来实现的,外面蕴含了各个Chan
的配合,也会波及不少的回调函数,总而言之多读几遍每次都会有不同的了解。接下来就逐渐剖析这些要害内容。
synchronise
①:确保对方的TD高于咱们本人的TD
currentBlock := pm.blockchain.CurrentBlock() td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) pHead, pTd := peer.Head() if pTd.Cmp(td) <= 0 { return }
②:开启downloader
的同步
pm.downloader.Synchronise(peer.id, pHead, pTd, mode)
进入函数:次要做了以下几件事:
d.synchronise(id, head, td, mode)
:同步过程- 谬误日志输入, 并删除此
peer
。
进入到d.synchronise
,走到最初一步d.syncWithPeer(p, hash, td)
真正开启同步。
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error { ... return d.syncWithPeer(p, hash, td)}
syncWithPeer大略做了以下几件事:
- 查找先人
findAncestor
开启独自
goroutine
别离运行以下几个函数:- fetchHeaders
- processHeaders
- fetchbodies
- fetchReceipts
- processFastSyncContent
- processFullSyncContent
接下来的文章,以及整个Downloader
模块次要内容就是围绕这几个局部进行开展。
findAncestor
同步首要的是确定同步区块的区间:顶部为近程节点的最高区块,底部为两个节点都领有的雷同区块的最高高度(先人区块)。findAncestor
就是用来找先人区块。函数剖析如下:
①:确定本地高度和近程节点的最高高度
var ( floor = int64(-1) // 底部 localHeight uint64 // 本地最高高度 remoteHeight = remoteHeader.Number.Uint64() // 近程节点最高高度 )switch d.mode { case FullSync: localHeight = d.blockchain.CurrentBlock().NumberU64() case FastSync: localHeight = d.blockchain.CurrentFastBlock().NumberU64() default: localHeight = d.lightchain.CurrentHeader().Number.Uint64() }
②:计算同步的高度区间和距离
from, count, skip, max := calculateRequestSpan(remoteHeight, localHeight)
from
::示意从哪个高度开始获取区块count
:示意从近程节点获取多少个区块skip
:示意距离,比方skip
为 2 ,获取第一个高度为 5,则第二个就是 8max
:示意最大高度
③:发送获取header
的申请
go p.peer.RequestHeadersByNumber(uint64(from), count, skip, false)
④:解决下面申请接管到的header
:case packet := <-d.headerCh
- 抛弃掉不是来自咱们申请节的内容
- 确保返回的
header
数量不为空 - 验证返回的
headers
的高度是咱们所申请的 - 查看是否找到独特先人
//----①if packet.PeerId() != p.id { log.Debug("Received headers from incorrect peer", "peer", packet.PeerId()) break }//-----②headers := packet.(*headerPack).headers if len(headers) == 0 { p.log.Warn("Empty head header set") return 0 }//-----③for i, header := range headers { expectNumber := from + int64(i)*int64(skip+1) if number := header.Number.Int64(); number != expectNumber { // 验证这些返回的header是否是咱们下面申请的headers p.log.Warn("Head headers broke chain ordering", "index", i, "requested", expectNumber, "received", number) return 0, errInvalidChain } }//-----④// 查看是否找到独特先人 finished = true //留神这里是从headers最初一个元素开始查找,也就是高度最高的区块。 for i := len(headers) - 1; i >= 0; i-- { // 跳过不在咱们申请的高度区间内的区块 if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > max { continue } // //查看咱们本地是否曾经有某个区块了,如果有就算是找到了独特先人, //并将独特先人的哈希和高度设置在number和hash变量中。 h := headers[i].Hash() n := headers[i].Number.Uint64()
⑤:如果通过固定距离法找到了独特先人则返回先人,会对其高度与 floor
变量进行验证, floor
变量代表的是独特先人的高度的最小值,如果找到独特先人的高度比这个值还小,就认为是两个节点之间分叉太大了,不再容许进行同步。如果一切正常,就返回找到的独特先人的高度 number
变量。
if hash != (common.Hash{}) { if int64(number) <= floor { return 0, errInvalidAncestor } return number, nil }
⑥:如果固定距离法没有找到先人则通过二分法来查找先人,这部分能够思维跟二分法算法相似,有趣味的能够细看。
queue详解
queue
对象和Downloader
对象是相互作用的,Downloader
的很多性能离不开他,接下来咱们介绍一下这部分内容,然而本节,能够后行跳过,等到了浏览上面的对于Queue
调用的一些函数局部再回过来浏览这部分解说。
queue构造体
type queue struct { mode SyncMode // 同步模式 // header解决相干 headerHead common.Hash //最初一个排队的标头的哈希值以验证程序 headerTaskPool map[uint64]*types.Header //待处理的标头检索工作,将起始索引映射到框架标头 headerTaskQueue *prque.Prque //骨架索引的优先级队列,以获取用于的填充标头 headerPeerMiss map[string]map[uint64]struct{} //已知不可用的对等头批处理集 headerPendPool map[string]*fetchRequest //以后挂起的头检索操作 headerResults []*types.Header //后果缓存累积实现的头 headerProced int //从后果中拿进去曾经解决的header headerContCh chan bool //header下载实现时告诉的频道 blockTaskPool map[common.Hash]*types.Header //待处理的块(body)检索工作,将哈希映射到header blockTaskQueue *prque.Prque //标头的优先级队列,以用于获取块(bodies) blockPendPool map[string]*fetchRequest //以后的正在解决的块(body)检索操作 blockDonePool map[common.Hash]struct{} //曾经实现的块(body) receiptTaskPool map[common.Hash]*types.Header //待处理的收据检索工作,将哈希映射到header receiptTaskQueue *prque.Prque //标头的优先级队列,以用于获取收据 receiptPendPool map[string]*fetchRequest //以后的正在解决的收据检索操作 receiptDonePool map[common.Hash]struct{} //曾经实现的收据 resultCache []*fetchResult //下载但尚未交付获取后果 resultOffset uint64 //区块链中第一个缓存的获取后果的偏移量 resultSize common.StorageSize // 块的近似大小 lock *sync.Mutex active *sync.Cond closed bool }
次要细分性能
数据下载开始安顿工作
ScheduleSkeleton
:将一批header
检索工作增加到队列中,以填充已检索的header skeleton
Schedule
:用来筹备对一些body
和receipt
数据的下载
数据下载中的各类状态
pending
pending
示意待检索的XXX申请的数量,包含了:PendingHeaders
、PendingBlocks
、PendingReceipts
,别离都是对应取XXXTaskQueue
的长度。InFlight
InFlight
示意是否有正在获取XXX的申请,包含:InFlightHeaders
、InFlightBlocks
、InFlightReceipts
,都是通过判断len(q.receiptPendPool) > 0
来确认。ShouldThrottle
ShouldThrottle
示意查看是否应该限度下载XXX,包含:ShouldThrottleBlocks
、ShouldThrottleReceipts
,次要是为了避免下载过程中本地内存占用过大。Reserve
Reserve
通过结构一个fetchRequest
构造并返回,向调用者提供指定数量的待下载的数据的信息(queue
外部会将这些数据标记为「正在下载」)。调用者应用返回的fetchRequest
数据向近程节点发动新的获取数据的申请。包含:ReserveHeaders
、ReserveBodies
、ReserveReceipts
。Cancel
Cance
用来吊销对fetchRequest
构造中的数据的下载(queue
外部会将这些数据从新从「正在下载」的状态更改为「期待下载」)。包含:CancelHeaders
、CancelBodies
、CancelReceipts
。expire
expire
查看正在执行中的申请是否超过了超时限度,包含:ExpireHeaders
、ExpireBodies
、ExpireReceipts
。Deliver
当有数据下载胜利时,调用者会应用
deliver
性能用来告诉queue
对象。包含:DeliverHeaders
、DeliverBodies
、DeliverReceipts
。
数据下载实现获取区块数据
RetrieveHeaders
在填充skeleton
实现后,queue.RetrieveHeaders
用来获取整个skeleton
中的所有header
。Results
queue.Results
用来获取以后的header
、body
和receipt
(只在fast
模式下) 都已下载胜利的区块(并将这些区块从queue
外部移除)
函数实现
ScheduleSkeleton
queue.ScheduleSkeleton次要是为了填充skeleton,它的参数是要下载区块的起始高度和所有 skeleton
区块头,最外围的内容则是上面这段循环:
func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { ...... for i, header := range skeleton { index := from + uint64(i*y) q.headerTaskPool[index] = header q.headerTaskQueue.Push(index, -int64(index)) }}
假如已确定须要下载的区块高度区间是从 10 到 46,MaxHeaderFetch
的值为 10,那么这个高度区块就会被分成 3 组:10 - 19,20 - 29,30 - 39,而 skeleton 则别离由高度为 19、29、39 的区块头组成。循环中的 index
变量实际上是每一组区块中的第一个区块的高度(比方 10、20、30),queue.headerTaskPool
实际上是一个每一组区块中第一个区块的高度到最初一个区块的 header 的映射
headerTaskPool = { 10: headerOf_19, 20: headerOf_20, 30: headerOf_39,}
ReserveHeaders
reserve
用来获取可下载的数据。
reserve = func(p *peerConnection, count int) (*fetchRequest, bool, error) { return d.queue.ReserveHeaders(p, count), false, nil }
func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest { if _, ok := q.headerPendPool[p.id]; ok { return nil } //① ... send, skip := uint64(0), []uint64{} for send == 0 && !q.headerTaskQueue.Empty() { from, _ := q.headerTaskQueue.Pop() if q.headerPeerMiss[p.id] != nil { if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok { skip = append(skip, from.(uint64)) continue } } send = from.(uint64) // ② } ... for _, from := range skip { q.headerTaskQueue.Push(from, -int64(from)) } // ③ ... request := &fetchRequest{ Peer: p, From: send, Time: time.Now(), } q.headerPendPool[p.id] = request // ④ }
①:依据headerPendPool
来判断近程节点是否正在下载数据信息。
②:从headerTaskQueue
取出值作为本次申请的起始高度,赋值给send
变量,在这个过程中会排除headerPeerMiss所记录的节点下载数据失败的信息。
③:将失败的工作再从新写回task queue
④:利用send
变量结构fetchRequest
构造,此构造是用来作为FetchHeaders
来应用的:
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
至此,ReserveHeaders
会从工作队列里抉择最小的起始高度并结构fetchRequest
传递给fetch
获取数据。
DeliverHeaders
deliver = func(packet dataPack) (int, error) { pack := packet.(*headerPack) return d.queue.DeliverHeaders(pack.peerID, pack.headers, d.headerProcCh) }
①:如果发现下载数据的节点没有在 queue.headerPendPool
中,就间接返回谬误;否则就持续解决,并将节点记录从 queue.headerPendPool
中删除。
request := q.headerPendPool[id] if request == nil { return 0, errNoFetchesPending } headerReqTimer.UpdateSince(request.Time) delete(q.headerPendPool, id)
②:验证headers
包含三方面验证:
- 查看起始区块的高度和哈希
- 查看高度的连接性
- 查看哈希的连接性
if accepted { //查看起始区块的高度和哈希 if headers[0].Number.Uint64() != request.From { ... accepted = false } else if headers[len(headers)-1].Hash() != target { ... accepted = false } } if accepted { for i, header := range headers[1:] { hash := header.Hash() // 查看高度的连接性 if want := request.From + 1 + uint64(i); header.Number.Uint64() != want { ... } if headers[i].Hash() != header.ParentHash { // 查看哈希的连接性 ... } } }
③: 将有效数据存入headerPeerMiss
,并将这组区块起始高度从新放入headerTaskQueue
if !accepted { ... miss := q.headerPeerMiss[id] if miss == nil { q.headerPeerMiss[id] = make(map[uint64]struct{}) miss = q.headerPeerMiss[id] } miss[request.From] = struct{}{} q.headerTaskQueue.Push(request.From, -int64(request.From)) return 0, errors.New("delivery not accepted") }
④:保留数据,并告诉headerProcCh
解决新的header
if ready > 0 { process := make([]*types.Header, ready) copy(process, q.headerResults[q.headerProced:q.headerProced+ready]) select { case headerProcCh <- process: q.headerProced += len(process) default: } }
⑤:发送音讯给.headerContCh
,告诉skeleton
都被下载完了
if len(q.headerTaskPool) == 0 { q.headerContCh <- false }
DeliverHeaders
会对数据进行测验和保留,并发送 channel 音讯给 Downloader.processHeaders
和 Downloader.fetchParts
的 wakeCh
参数。
Schedule
processHeaders
在解决header
数据的时候,会调用queue.Schedule
为下载 body
和 receipt
作筹备。
inserts := d.queue.Schedule(chunk, origin)
func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { inserts := make([]*types.Header, 0, len(headers)) for _, header := range headers { //校验 ... q.blockTaskPool[hash] = header q.blockTaskQueue.Push(header, -int64(header.Number.Uint64())) if q.mode == FastSync { q.receiptTaskPool[hash] = header q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64())) } inserts = append(inserts, header) q.headerHead = hash from++ } return inserts}
这个函数次要就是将信息写入到body和receipt队列,期待调度。
ReserveBody&Receipt
在 queue
中筹备好了 body 和 receipt 相干的数据, processHeaders
最初一段,是唤醒下载Bodyies和Receipts的要害代码,会告诉 fetchBodies
和 fetchReceipts
能够对各自的数据进行下载了。
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- true: default: } }
而fetchXXX
会调用fetchParts
,逻辑相似下面的的,reserve
最终则会调用reserveHeaders
,deliver
最终调用的是 queue.deliver
.
先来剖析reserveHeaders
:
①:如果没有可解决的工作,间接返回
if taskQueue.Empty() { return nil, false, nil }
②:如果参数给定的节点正在下载数据,返回
if _, ok := pendPool[p.id]; ok { return nil, false, nil }
③:计算 queue 对象中的缓存空间还能够包容多少条数据
space := q.resultSlots(pendPool, donePool)
④:从 「task queue」 中顺次取出工作进行解决
次要实现以下性能:
- 计算以后 header 在
queue.resultCache
中的地位,而后填充queue.resultCache
中相应地位的元素 - 解决空区块的状况,若为空不下载。
- 解决近程节点短少这个以后区块数据的状况,如果发现这个节点已经下载以后数据失败过,就不再让它下载了。
留神:resultCache
字段用来记录所有正在被解决的数据的处理结果,它的元素类型是 fetchResult
。它的 Pending
字段代表以后区块还有几类数据须要下载。这里须要下载的数据最多有两类:body 和 receipt,full
模式下只须要下载 body
数据,而 fast
模式要多下载一个 receipt
数据。
for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ { header := taskQueue.PopItem().(*types.Header) hash := header.Hash() index := int(header.Number.Int64() - int64(q.resultOffset)) if index >= len(q.resultCache) || index < 0 { .... } if q.resultCache[index] == nil { components := 1 if q.mode == FastSync { components = 2 } q.resultCache[index] = &fetchResult{ Pending: components, Hash: hash, Header: header, } } if isNoop(header) { donePool[hash] = struct{}{} delete(taskPool, hash) space, proc = space-1, proc-1 q.resultCache[index].Pending-- progress = true continue } if p.Lacks(hash) { skip = append(skip, header) } else { send = append(send, header) } }
最初就是结构 fetchRequest
构造并返回。
DeliverBodies&Receipts
body
或 receipt
数据都曾经通过 reserve
操作结构了 fetchRequest
构造并传给 fetch
,接下来就是期待数据的达到,数据下载胜利后,会调用 queue
对象的 deliver
办法进行传递,包含 queue.DeliverBodies
和 queue.DeliverReceipts
。这两个办法都以不同的参数调用了 queue.deliver
办法:
①:如果下载的数据数量为 0,则把所有此节点此次下载的数据标记为「缺失」
if results == 0 { for _, header := range request.Headers { request.Peer.MarkLacking(header.Hash()) } }
②:循环解决数据,通过调用reconstruct
填充 resultCache[index]
中的相应的字段
for i, header := range request.Headers { ... if err := reconstruct(header, i, q.resultCache[index]); err != nil { failure = err break }}
③:验证resultCache
中的数据,其对应的 request.Headers
中的 header
都应为 nil,若不是则阐明验证未通过,须要如果到task queue从新下载
for _, header := range request.Headers { if header != nil { taskQueue.Push(header, -int64(header.Number.Uint64())) } }
④:如果有数据被验证通过且写入 queue.resultCache
中了(accepted
> 0),发送 queue.active
音讯。Results
会期待这这个信号。
Results
当(header、body、receipt)都下载完,就要将区块写入到数据库了,queue.Results
就是用来返回所有目前曾经下载实现的数据,它在 Downloader.processFullSyncContent
和 Downloader.processFastSyncContent
中被调用。代码比较简单就不多说了。
到此为止queue
对象就剖析的差不多了。
同步headers
fetchHeaders
同步headers
是是由函数fetchHeaders
来实现的。
fetchHeaders
的大抵思维:
同步header
的数据会被填充到skeleton
,每次从近程节点获取区块数据最大为MaxHeaderFetch
(192),所以要获取的区块数据如果大于192 ,会被分成组,每组MaxHeaderFetch
,残余的有余192个的不会填充进skeleton
,具体步骤如下图所示:
此种形式能够防止从同一节点下载过多谬误数据,如果咱们连贯到了一个歹意节点,它能够发明一个链条很长且TD
值也十分高的区块链数据。如果咱们的区块从 0 开始全副从它那同步,也就下载了一些基本不被他人抵赖的数据。如果我只从它那同步 MaxHeaderFetch
个区块,而后发现这些区块无奈正确填充我之前的 skeleton
(可能是 skeleton
的数据错了,或者用来填充 skeleton
的数据错了),就会丢掉这些数据。
接下来查看下代码如何实现:
①:发动获取header
的申请
如果是下载skeleton
,则会从高度 from+MaxHeaderFetch-1
开始(包含),每隔 MaxHeaderFetch-1
的高度申请一个 header
,最多申请 MaxSkeletonSize
个。如果不是的话,则要获取残缺的headers
。
②:期待并解决headerCh
中的header
数据
2.1 确保近程节点正在返回咱们须要填充skeleton
所需的header
if packet.PeerId() != p.id { log.Debug("Received skeleton from incorrect peer", "peer", packet.PeerId()) break }
2.2 如果skeleton
曾经下载结束,则须要持续填充skeleton
if packet.Items() == 0 && skeleton { skeleton = false getHeaders(from) continue }
2.3 整个skeleton
填充实现,并且没有要获取的header
了,要告诉headerProcCh
全副实现
if packet.Items() == 0 { //下载pivot时不要停止标头的提取 if atomic.LoadInt32(&d.committed) == 0 && pivot <= from { p.log.Debug("No headers, waiting for pivot commit") select { case <-time.After(fsHeaderContCheck): getHeaders(from) continue case <-d.cancelCh: return errCanceled } } //实现Pivot操作(或不进行疾速同步),并且没有头文件,终止该过程 p.log.Debug("No more headers available") select { case d.headerProcCh <- nil: return nil case <-d.cancelCh: return errCanceled } }
2.4 当header
有数据并且是在获取skeleton
的时候,调用fillHeaderSkeleton
填充skeleton
if skeleton { filled, proced, err := d.fillHeaderSkeleton(from, headers) if err != nil { p.log.Debug("Skeleton chain invalid", "err", err) return errInvalidChain } headers = filled[proced:] from += uint64(proced) }
2.5 如果以后解决的不是 skeleton
,表明区块同步得差不多了,解决尾部的一些区块
判断本地的主链高度与新收到的 header 的最高高度的高度差是否在 reorgProtThreshold
以内,如果不是,就将高度最高的 reorgProtHeaderDelay
个 header 丢掉。
if head+uint64(reorgProtThreshold) < headers[n-1].Number.Uint64() { delay := reorgProtHeaderDelay if delay > n { delay = n } headers = headers[:n-delay] }
2.6 如果还有 header
未解决,发给 headerProcCh
进行解决,Downloader.processHeaders
会期待这个 channel 的音讯并进行解决;
if len(headers) > 0 { ... select { case d.headerProcCh <- headers: case <-d.cancelCh: return errCanceled } from += uint64(len(headers)) getHeaders(from)}
2.7 如果没有发送标头,或者所有标头期待 fsHeaderContCheck
秒,再次调用 getHeaders
申请区块
p.log.Trace("All headers delayed, waiting") select { case <-time.After(fsHeaderContCheck): getHeaders(from) continue case <-d.cancelCh: return errCanceled }
这段代码起初才加上的,其 commit 的记录在这里,而 「pull request」 在这里。从 「pull request」 中作者的解释咱们能够理解这段代码的逻辑和性能:这个批改次要是为了解决经常出现的 「invalid hash chain」 谬误,呈现这个谬误的起因是因为在咱们上一次从近程节点获取到一些区块并将它们退出到本地的主链的过程中,近程节点产生了 reorg 操作(参见这篇文章里对于「主链与侧链」的介绍 );当咱们再次依据高度申请新的区块时,对方返回给咱们的是它的新的主链上的区块,而咱们没有这个链上的历史区块,因而在本地写入区块时就会返回 「invalid hash chain」 谬误。
要想产生 「reorg」 操作,就须要有新区块退出。在以太坊主网上,新产生一个区块的距离是 10 秒到 20 秒左右。个别状况下,如果仅仅是区块数据,它的同步速度还是很快的,每次下载也有最大数量的限度。所以在新产生一个区块的这段时间里,足够同步实现一组区块数据而对方节点不会产生 「reorg」 操作。然而留神方才说的「仅仅是区块数据」的同步较快,state 数据的同步就十分慢了。简略来说在实现同步之前可能会有多个 「pivot」 区块,这些区块的 state 数据会从网络上下载,这就大大拖慢了整个区块的同步速度,使得本地在同步一组区块的同时对方产生 「reorg」 操作的机率大大增加。
作者认为这种状况下产生的 「reorg」 操作是由新产生的区块的竞争引起的,所以最新的几个区块是「不稳固的」,如果本次同步的区块数量较多(也就是咱们同步时耗费的工夫比拟长)(在这里「本次同步的区数数量较多」的体现是新收到的区块的最高高度与本地数据库中的最高高度的差距大于 reorgProtThreshold
),那么在同步时能够先防止同步最新区块,这就是 reorgProtThreshold
和 reorgProtHeaderDelay
这个变量的由来。
至此,Downloader.fetchHeaders
办法就完结了,所有的区块头也就同步实现了。在下面咱们提到填充skeleton
的时候,是由fillHeaderSkeleton
函数来实现,接下来就要细讲填充skeleton
的细节。
fillHeaderSkeleton
首先咱们晓得以太坊在同步区块时,先确定要下载的区块的高度区间,而后将这个区间按 MaxHeaderFetch
切分成很多组,每一组的最初一个区块组成了 「skeleton」(最初一组不满 MaxHeaderFetch
个区块不算作一组)。不分明的能够查看下面的图。
①:将一批header
检索工作增加到队列中,以填充skeleton
。
这个函数参照下面queue详解的剖析
func (q queue) ScheduleSkeleton(from uint64, skeleton []types.Header) {}
②:调用fetchParts
获取headers
数据
fetchParts
是很外围的函数,上面的Fetchbodies
和FetchReceipts
都会调用。先来大抵看一下fetchParts
的构造:
func (d *Downloader) fetchParts(...) error { ... for { select { case <-d.cancelCh: case packet := <-deliveryCh: case cont := <-wakeCh: case <-ticker.C: case <-update: ... }}
简化下来就是这 5 个channel
在解决,后面 4 个channel
负责循环期待音讯,update
用来期待其余 4 个channel
的告诉来解决逻辑,先离开剖析一个个的channel
。
2.1 deliveryCh 传递下载的数据
deliveryCh
作用就是传递下载的数据,当有数据被真正下载下来时,就会给这个 channel
发消息将数据传递过去。这个 channel 对应的别离是:d.headerCh
、d.bodyCh
、d.receiptCh
,而这三个 channel
别离在以下三个办法中被写入数据:DeliverHeaders
、DeliverBodies
、DeliverReceipts
。 看下deliveryCh
如何解决数据:
case packet := <-deliveryCh: if peer := d.peers.Peer(packet.PeerId()); peer != nil { accepted, err := deliver(packet)//传递接管到的数据块并查看链有效性 if err == errInvalidChain { return err } if err != errStaleDelivery { setIdle(peer, accepted) } switch { case err == nil && packet.Items() == 0: ... case err == nil: ... } } select { case update <- struct{}{}: default: }
收到下载数据后判断节点是否无效,如果节点没有被移除,则会通过deliver
传递接管到的下载数据。如果没有任何谬误,则告诉update
解决。
要留神deliver
是一个回调函数,它调用了 queue 对象的 Deliver 办法:queue.DeliverHeaders
、queue.DeliverBodies
、queue.DeliverReceipts
,在收到下载数据就会调用此回调函数(queue相干函数剖析参照queue详解局部)。
在下面处理错误局部,有一个setIdle
函数,它也是回调函数,其实现都是调用了 peerConnection
对象的相干办法:SetHeadersIdle
、SetBodiesIdle
、SetReceiptsIdle
。它这个函数是指某些节点针对某类数据是闲暇的,比方header
、bodies
、receipts
,如果须要下载这几类数据,就能够从闲暇的节点下载这些数据。
2.2 wakeCh
唤醒fetchParts
,下载新数据或下载已实现
case cont := <-wakeCh: if !cont { finished = true } select { case update <- struct{}{}: default: }
首先咱们通过调用fetchParts传递的参数晓得,wakeCh
的值其实是 queue.headerContCh
。在 queue.DeliverHeaders
中发现所有须要下戴的 header 都下载实现了时,才会发送 false 给这个 channel。fetchParts
在收到这个音讯时,就晓得没有 header 须要下载了。代码如下:
func (q *queue) DeliverHeaders(......) (int, error) { ...... if len(q.headerTaskPool) == 0 { q.headerContCh <- false } ......}
同样如此,body
和receipt
则是bodyWakeCh
和receiptWakeCh
,在 processHeaders
中,如果所有 header
曾经下载实现了,那么发送 false
给这两个 channel
,告诉它们没有新的 header
了。 body
和 receipt
的下载依赖于 header
,须要 header
先下载实现能力下载,所以对于下戴 body
或 receipt
的 fetchParts
来说,收到这个 wakeCh
就代表不会再有告诉让本人下载数据了.
func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error { for { select { case headers := <-d.headerProcCh: if len(headers) == 0 { for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- false: case <-d.cancelCh: } } ... } ... for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- true: default: } } } }}
2.3 ticker 负责周期性的激活 update
进行音讯解决
case <-ticker.C: select { case update <- struct{}{}: default: }
2.4 update
(解决此前几个channel
的数据)(重要)
2.4.1 判断是否无效节点,并获取超时数据的信息
获取超时数据的节点ID和数据数量,如果大于两个的话,就将这个节点设置为闲暇状态(setIdle
),小于两个的话间接断开节点连贯。
expire
是一个回调函数,会返回以后所有的超时数据信息。这个函数的理论实现都是调用了 queue
对象的 Expire
办法:ExpireHeaders
、ExpireBodies
、ExpireReceipts
,此函数会统计以后正在下载的数据中,起始工夫与以后工夫的差距超过给定阈值(downloader.requestTTL
办法的返回值)的数据,并将其返回。
if d.peers.Len() == 0 { return errNoPeers }for pid, fails := range expire() { if peer := d.peers.Peer(pid); peer != nil { if fails > 2 { ... setIdle(peer, 0) } else { ... if d.dropPeer == nil { } else { d.dropPeer(pid) .... } } }
2.4.2 解决完超时数据,判断是否还有下载的数据
如果没有其余可下载的内容,请期待或终止,这里pending()
和inFlight()
都是回调函数,pending
别离对应了queue.PendingHeaders
、queue.PendingBlocks
、queue.PendingReceipts
,用来返回各自要下载的工作数量。inFlight()
别离对应了queue.InFlightHeaders
、queue.InFlightBlocks
、queue.InFlightReceipts
,用来返回正在下载的数据数量。
if pending() == 0 { if !inFlight() && finished { ... return nil } break }
2.4.3 应用闲暇节点,调用fetch
函数发送数据申请
Idle()
回调函数在下面曾经提过了,throttle()
回调函数则别离对queue.ShouldThrottleBlocks
、queue.ShouldThrottleReceipts
,用来示意是否应该下载bodies
或者receipts
。
reserve
函数别离对应queue.ReserveHeaders
、queue.ReserveBodies
、queue.ReserveReceipts
,用来从从下载工作中选取一些能够下载的工作,并结构一个 fetchRequest
构造。它还返回一个 process
变量,标记着是否有空的数据正在被解决。比方有可能某区块中未蕴含任何一条交易,因而它的 body
和 receipt
都是空的,这种数据其实是不须要下载的。在 queue
对象的 Reserve
办法中,会对这种状况进行辨认。如果遇到空的数据,这些数据会被间接标记为下载胜利。在办法返回时,就将是否产生过「间接标记为下载胜利」的状况返回。
capacity
回调函数别离对应peerConnection.HeaderCapacity
、peerConnection.BlockCapacity
、peerConnection.ReceiptCapacity
,用来决定下载须要申请数据的个数。
fetch
回调函数别离对应peer.FetchHeaders
、peer.Fetchbodies
、peer.FetchReceipts
,用来发送获取各类数据的申请。
progressed, throttled, running := false, false, inFlight() idles, total := idle() for _, peer := range idles { if throttle() { ... } if pending() == 0 { break } request, progress, err := reserve(peer, capacity(peer)) if err != nil { return err } if progress { progressed = true } if request == nil { continue } if request.From > 0 { ... } ... if err := fetch(peer, request); err != nil { ... } if !progressed && !throttled && !running && len(idles) == total && pending() > 0 { return errPeersUnavailable }
简略来概括这段代码就是:应用闲暇节点下载数据,判断是否须要暂停,或者数据是否曾经下载实现;之后选取数据进行下载;最初,如果没有遇到空块须要下载、且没有暂停下载和所有无效节点都闲暇和的确有数据须要下载,但下载没有运行起来,就返回 errPeersUnavailable
谬误。
到此为止fetchParts
函数就剖析的差不多了。外面波及的跟queue.go
相干的一些函数都在queue详解大节里介绍了。
processHeaders
通过headerProcCh
接管header
数据,并解决的过程是在processHeaders
函数中实现的。整个处理过程集中在:case headers := <-d.headerProcCh中
:
①:如果headers
的长度为0 ,则会有以下操作:
1.1 告诉所有人header
曾经处理完毕
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- false: case <-d.cancelCh: } }
1.2 若没有检索到任何header
,阐明他们的TD
小于咱们的,或者曾经通过咱们的fetcher
模块进行了同步。
if d.mode != LightSync { head := d.blockchain.CurrentBlock() if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 { return errStallingPeer } }
1.3 如果是fast
或者light
同步,确保传递了header
if d.mode == FastSync || d.mode == LightSync { head := d.lightchain.CurrentHeader() if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 { return errStallingPeer } }
②:如果headers
的长度大于 0
2.1 如果是fast或者light 同步,调用ightchain.InsertHeaderChain()写入header
到leveldb
数据库;
if d.mode == FastSync || d.mode == LightSync { .... d.lightchain.InsertHeaderChain(chunk, frequency); ....}
2.2 如果是fast
或者full sync
模式,则调用 d.queue.Schedule进行内容(body和receipt)检索。
if d.mode == FullSync || d.mode == FastSync { ... inserts := d.queue.Schedule(chunk, origin) ...}
③:如果找到更新的块号,则要发信号告诉新工作
if d.syncStatsChainHeight < origin { d.syncStatsChainHeight = origin - 1 }for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- true: default: } }
到此解决Headers
的剖析就实现了。
同步bodies
同步bodies
则是由fetchBodies
函数实现的。
fetchBodies
同步bodies的过程跟同步header相似,大抵讲下步骤:
- 调用
fetchParts
ReserveBodies
()从bodyTaskPool
中取出要同步的body
;- 调用
fetch
,也就是调用这里的FetchBodies
从节点获取body
,发送GetBlockBodiesMsg
音讯; - 收到
bodyCh
的数据后,调用deliver
函数,将Transactions和Uncles
写入resultCache
。
同步Receipts
fetchReceipts
同步receipts
的过程跟同步header
相似,大抵讲下步骤:
- 调用
fetchParts
() ReserveBodies
()从ReceiptTaskPool
中取出要同步的Receipt
- 调用这里的
FetchReceipts
从节点获取receipts
,发送GetReceiptsMsg
音讯; - 收到
receiptCh
的数据后,调用deliver
函数,将Receipts
写入resultCache
。
同步状态
这里咱们讲两种模式下的状态同步:
- fullSync:
processFullSyncContent
,full
模式下Receipts
没有缓存到resultCache
中,间接先从缓存中取出body
数据,而后执行交易生成状态,最初写入区块链。 - fastSync:
processFastSyncContent
:fast模式的Receipts、Transaction、Uncles都在resultCache中,所以还须要下载"state",进行校验,再写入区块链。
接下来大抵的探讨下这两种形式。
processFullSyncContent
func (d *Downloader) processFullSyncContent() error { for { results := d.queue.Results(true) ... if err := d.importBlockResults(results); err != nil ... }}
func (d *Downloader) importBlockResults(results []*fetchResult) error { ... select {... blocks := make([]*types.Block, len(results)) for i, result := range results { blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) } if index, err := d.blockchain.InsertChain(blocks); err != nil { ....}
间接从result
中获取数据并生成block
,直接插入区块链中,就完结了。
processFastSyncContent
fast模式同步状态内容比拟多,大抵也就如下几局部,咱们开始简略剖析以下。
①:下载最新的区块状态
sync := d.syncState(latest.Root)
咱们间接用一张图来示意整个大抵流程:
具体的代码读者本人翻阅,大抵就是这么个简略过程。
②:计算出pivot块
pivot
为latestHeight - 64
,调用splitAroundPivot
()办法以pivot为核心,将results
分为三个局部:beforeP
,P
,afterP
;
pivot := uint64(0) if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) { pivot = height - uint64(fsMinFullBlocks) }
P, beforeP, afterP := splitAroundPivot(pivot, results)
③: 对beforeP
的局部调用commitFastSyncData
,将body
和receipt
都写入区块链
d.commitFastSyncData(beforeP, sync);
④:对P的局部更新状态信息为P block
的状态,把P对应的result(蕴含body和receipt)调用commitPivotBlock插入本地区块链中,并调用FastSyncCommitHead记录这个pivot的hash值,存在downloader中,标记为疾速同步的最初一个区块hash值;
if err := d.commitPivotBlock(P); err != nil { return err }
⑤:对afterP
调用d.importBlockResults
,将body
插入区块链,而不插入receipt
。因为是最初 64 个区块,所以此时数据库中只有header
和body
,没有receipt
和状态,要通过fullSync
模式进行最初的同步。
if err := d.importBlockResults(afterP); err != nil { return err }
到此为止整个Downloader同步实现了。
参考
https://mindcarver.cnhttps://github.com/ethereum/g...
https://yangzhe.me/2019/05/09...