From 2a8b4501efc33290920bec72546ecdca52338590 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Mon, 24 Feb 2025 14:46:50 +0800 Subject: [PATCH 01/10] eth/downloader: use intermediate variable for better readability (#17510) --- eth/downloader/downloader.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index e14775d06a19..98879252a569 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -667,8 +667,10 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err continue } // Otherwise check if we already know the header or not - if (d.mode == FullSync && d.blockchain.HasBlock(headers[i].Hash(), headers[i].Number.Uint64())) || (d.mode != FullSync && d.lightchain.HasHeader(headers[i].Hash(), headers[i].Number.Uint64())) { - number, hash = headers[i].Number.Uint64(), headers[i].Hash() + h := headers[i].Hash() + n := headers[i].Number.Uint64() + if (d.mode == FullSync && d.blockchain.HasBlock(h, n)) || (d.mode != FullSync && d.lightchain.HasHeader(h, n)) { + number, hash = n, h // If every header is known, even future ones, the peer straight out lied about its head if number > height && i == limit-1 { @@ -732,11 +734,13 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err arrived = true // Modify the search interval based on the response - if (d.mode == FullSync && !d.blockchain.HasBlock(headers[0].Hash(), headers[0].Number.Uint64())) || (d.mode != FullSync && !d.lightchain.HasHeader(headers[0].Hash(), headers[0].Number.Uint64())) { + h := headers[0].Hash() + n := headers[0].Number.Uint64() + if (d.mode == FullSync && !d.blockchain.HasBlock(h, n)) || (d.mode != FullSync && !d.lightchain.HasHeader(h, n)) { end = check break } - header := d.lightchain.GetHeaderByHash(headers[0].Hash()) // Independent of sync mode, header surely exists + header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists if header.Number.Uint64() != check { p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check) return 0, errBadPeer From e424ff1380447dda335d3a2b9037eee91013ec48 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Mon, 24 Feb 2025 16:27:33 +0800 Subject: [PATCH 02/10] eth/downloader: fix comment typos (#17956) --- eth/downloader/downloader.go | 6 +++--- eth/downloader/queue.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 98879252a569..ae42ae638555 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1218,7 +1218,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er } // If no headers were retrieved at all, the peer violated its TD promise that it had a // better chain compared to ours. The only exception is if its promised blocks were - // already imported by other means (e.g. fecher): + // already imported by other means (e.g. fetcher): // // R , L : Both at block 10 // R: Mine block 11, and propagate it to L @@ -1428,7 +1428,7 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error { defer stateSync.Cancel() go func() { if err := stateSync.Wait(); err != nil && err != errCancelStateFetch { - d.queue.Close() // wake up WaitResults + d.queue.Close() // wake up Results } }() // Figure out the ideal pivot block. Note, that this goalpost may move if the @@ -1486,7 +1486,7 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error { defer stateSync.Cancel() go func() { if err := stateSync.Wait(); err != nil && err != errCancelStateFetch { - d.queue.Close() // wake up WaitResults + d.queue.Close() // wake up Results } }() oldPivot = P diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 4815207307b1..a65a2877dc68 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -143,7 +143,7 @@ func (q *queue) Reset() { q.resultOffset = 0 } -// Close marks the end of the sync, unblocking WaitResults. +// Close marks the end of the sync, unblocking Results. // It may be called even if the queue is already closed. func (q *queue) Close() { q.lock.Lock() @@ -545,7 +545,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common taskQueue.Push(header, -int64(header.Number.Uint64())) } if progress { - // Wake WaitResults, resultCache was modified + // Wake Results, resultCache was modified q.active.Signal() } // Assemble and return the block download request @@ -860,7 +860,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ taskQueue.Push(header, -int64(header.Number.Uint64())) } } - // Wake up WaitResults + // Wake up Results if accepted > 0 { q.active.Signal() } From 31011d4aa98c01e852d067f0faa13ab252fee025 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Mon, 24 Feb 2025 15:55:29 +0800 Subject: [PATCH 03/10] eth/downloader: fix invalid hash chain error due to head mini reorg (#17839) --- eth/downloader/downloader.go | 39 ++++++++++++++++++++++++++++++- eth/downloader/downloader_test.go | 4 ++-- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index ae42ae638555..938b9da6eaaf 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -63,6 +63,9 @@ var ( maxHeadersProcess = 2048 // Number of header download results to import at once into the chain maxResultsProcess = 2048 // Number of content download results to import at once into the chain + reorgProtThreshold = 48 // Threshold number of recent blocks to disable mini reorg protection + reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs + fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it @@ -856,6 +859,30 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) } headers = filled[proced:] from += uint64(proced) + } else { + // If we're closing in on the chain head, but haven't yet reached it, delay + // the last few headers so mini reorgs on the head don't cause invalid hash + // chain errors. + if n := len(headers); n > 0 { + // Retrieve the current head we're at + head := uint64(0) + if d.mode == LightSync { + head = d.lightchain.CurrentHeader().Number.Uint64() + } else { + head = d.blockchain.CurrentFastBlock().NumberU64() + if full := d.blockchain.CurrentBlock().NumberU64(); head < full { + head = full + } + } + // If the head is way older than this batch, delay the last few headers + if head+uint64(reorgProtThreshold) < headers[n-1].Number.Uint64() { + delay := reorgProtHeaderDelay + if delay > n { + delay = n + } + headers = headers[:n-delay] + } + } } // Insert all the new headers and fetch the next batch if len(headers) > 0 { @@ -866,8 +893,18 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) return errCancelHeaderFetch } from += uint64(len(headers)) + getHeaders(from) + } else { + // No headers delivered, or all of them being delayed, sleep a bit and retry + p.log.Trace("All headers delayed, waiting") + select { + case <-time.After(fsHeaderContCheck): + getHeaders(from) + continue + case <-d.cancelCh: + return errCancelHeaderFetch + } } - getHeaders(from) case <-timeout.C: if d.dropPeer == nil { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 3df24e6d370a..4167d9c937c2 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -762,7 +762,7 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { tester.downloader.queue.lock.Unlock() tester.lock.Unlock() - if cached == blockCacheItems || retrieved+cached+frozen == targetBlocks+1 { + if cached == blockCacheItems || cached == blockCacheItems-reorgProtHeaderDelay || retrieved+cached+frozen == targetBlocks+1 || retrieved+cached+frozen == targetBlocks+1-reorgProtHeaderDelay { break } } @@ -772,7 +772,7 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { tester.lock.RLock() retrieved = len(tester.ownBlocks) tester.lock.RUnlock() - if cached != blockCacheItems && retrieved+cached+frozen != targetBlocks+1 { + if cached != blockCacheItems && cached != blockCacheItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay { t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheItems, retrieved, frozen, targetBlocks+1) } // Permit the blocked blocks to import From 9ad4e1d2822c15d9319119e67106d5c184ec6ec8 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Mon, 24 Feb 2025 17:02:49 +0800 Subject: [PATCH 04/10] downloader: measure successfull deliveries, not failed (#17983) --- eth/downloader/statesync.go | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go index 19abe0346bef..cf5c78ac371a 100644 --- a/eth/downloader/statesync.go +++ b/eth/downloader/statesync.go @@ -309,11 +309,12 @@ func (s *stateSync) loop() error { s.d.dropPeer(req.peer.id) } // Process all the received blobs and check for stale delivery - if err := s.process(req); err != nil { + delivered, err := s.process(req) + if err != nil { log.Warn("Node data write error", "err", err) return err } - req.peer.SetNodeDataIdle(len(req.response)) + req.peer.SetNodeDataIdle(delivered) } } return s.commit(true) @@ -391,10 +392,11 @@ func (s *stateSync) fillTasks(n int, req *stateReq) { // process iterates over a batch of delivered state data, injecting each item // into a running state sync, re-queuing any items that were requested but not -// delivered. -func (s *stateSync) process(req *stateReq) error { +// delivered. Returns whether the peer actually managed to deliver anything of +// value, and any error that occurred. +func (s *stateSync) process(req *stateReq) (int, error) { // Collect processing stats and update progress if valid data was received - duplicate, unexpected := 0, 0 + duplicate, unexpected, successful := 0, 0, 0 defer func(start time.Time) { if duplicate > 0 || unexpected > 0 { @@ -404,7 +406,6 @@ func (s *stateSync) process(req *stateReq) error { // Iterate over all the delivered data and inject one-by-one into the trie progress := false - for _, blob := range req.response { prog, hash, err := s.processNodeData(blob) switch err { @@ -412,12 +413,13 @@ func (s *stateSync) process(req *stateReq) error { s.numUncommitted++ s.bytesUncommitted += len(blob) progress = progress || prog + successful++ case trie.ErrNotRequested: unexpected++ case trie.ErrAlreadyProcessed: duplicate++ default: - return fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err) + return successful, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err) } delete(req.tasks, hash) } @@ -433,12 +435,12 @@ func (s *stateSync) process(req *stateReq) error { // If we've requested the node too many times already, it may be a malicious // sync where nobody has the right data. Abort. if len(task.attempts) >= npeers { - return fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers) + return successful, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers) } // Missing item, place into the retry queue. s.tasks[hash] = task } - return nil + return successful, nil } // processNodeData tries to inject a trie node data blob delivered from a remote From 3adec8148389807785e0e05dd23548b58987f4cb Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Mon, 24 Feb 2025 15:04:44 +0800 Subject: [PATCH 05/10] downloader: different sync strategy (#18085) --- core/blockchain.go | 11 +++ core/rawdb/accessors_chain.go | 12 +++ eth/downloader/downloader.go | 147 ++++++++++++++++++++++-------- eth/downloader/downloader_test.go | 107 +++++++++++++++++++--- 4 files changed, 226 insertions(+), 51 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 6c4e9c3e95fd..752455b9fce3 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -852,6 +852,17 @@ func (bc *BlockChain) HasBlock(hash common.Hash, number uint64) bool { return rawdb.HasBody(bc.db, hash, number) } +// HasFastBlock checks if a fast block is fully present in the database or not. +func (bc *BlockChain) HasFastBlock(hash common.Hash, number uint64) bool { + if !bc.HasBlock(hash, number) { + return false + } + if bc.receiptsCache.Contains(hash) { + return true + } + return rawdb.HasReceipts(bc.db, hash, number) +} + // HasFullState checks if state trie is fully present in the database or not. func (bc *BlockChain) HasFullState(block *types.Block) bool { _, err := bc.stateCache.OpenTrie(block.Root()) diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index a641f25fc91d..071d69445bf7 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -370,6 +370,18 @@ func DeleteTd(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { } } +// HasReceipts verifies the existence of all the transaction receipts belonging +// to a block. +func HasReceipts(db ethdb.Reader, hash common.Hash, number uint64) bool { + if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash { + return true + } + if has, err := db.Has(blockReceiptsKey(number, hash)); !has || err != nil { + return false + } + return true +} + // ReadReceiptsRLP retrieves all the transaction receipts belonging to a block in RLP encoding. func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { // First try to look up the data in ancient database. Extra hash diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 938b9da6eaaf..51f68f6f2a6f 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -187,6 +187,9 @@ type BlockChain interface { // HasBlock verifies a block's presence in the local chain. HasBlock(common.Hash, uint64) bool + // HasFastBlock verifies a fast block's presence in the local chain. + HasFastBlock(common.Hash, uint64) bool + // GetBlockByHash retrieves a block from the local chain. GetBlockByHash(common.Hash) *types.Block @@ -437,7 +440,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I } height := latest.Number.Uint64() - origin, err := d.findAncestor(p, height) + origin, err := d.findAncestor(p, latest) if err != nil { return err } @@ -595,41 +598,88 @@ func (d *Downloader) fetchHeight(p *peerConnection, hash common.Hash) (*types.He } } +// calculateRequestSpan calculates what headers to request from a peer when trying to determine the +// common ancestor. +// It returns parameters to be used for peer.RequestHeadersByNumber: +// +// from - starting block number +// count - number of headers to request +// skip - number of headers to skip +// +// and also returns 'max', the last block which is expected to be returned by the remote peers, +// given the (from,count,skip) +func calculateRequestSpan(remoteHeight, localHeight uint64) (int64, int, int, uint64) { + var ( + from int + count int + MaxCount = MaxHeaderFetch / 16 + ) + // requestHead is the highest block that we will ask for. If requestHead is not offset, + // the highest block that we will get is 16 blocks back from head, which means we + // will fetch 14 or 15 blocks unnecessarily in the case the height difference + // between us and the peer is 1-2 blocks, which is most common + requestHead := int(remoteHeight) - 1 + if requestHead < 0 { + requestHead = 0 + } + // requestBottom is the lowest block we want included in the query + // Ideally, we want to include the one just below our own head + requestBottom := int(localHeight - 1) + if requestBottom < 0 { + requestBottom = 0 + } + totalSpan := requestHead - requestBottom + span := 1 + totalSpan/MaxCount + if span < 2 { + span = 2 + } + if span > 16 { + span = 16 + } + + count = 1 + totalSpan/span + if count > MaxCount { + count = MaxCount + } + if count < 2 { + count = 2 + } + from = requestHead - (count-1)*span + if from < 0 { + from = 0 + } + max := from + (count-1)*span + return int64(from), count, span - 1, uint64(max) +} + // findAncestor tries to locate the common ancestor link of the local chain and // a remote peers blockchain. In the general case when our node was in sync and // on the correct chain, checking the top N links should already get us a match. // In the rare scenario when we ended up on a long reorganisation (i.e. none of // the head links match), we do a binary search to find the common ancestor. -func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, error) { +func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) (uint64, error) { // Figure out the valid ancestor range to prevent rewrite attacks - floor, ceil := int64(-1), d.lightchain.CurrentHeader().Number.Uint64() - - if d.mode == FullSync { - ceil = d.blockchain.CurrentBlock().NumberU64() - } else if d.mode == FastSync { - ceil = d.blockchain.CurrentFastBlock().NumberU64() + var ( + floor = int64(-1) + localHeight uint64 + remoteHeight = remoteHeader.Number.Uint64() + ) + switch d.mode { + case FullSync: + localHeight = d.blockchain.CurrentBlock().NumberU64() + case FastSync: + localHeight = d.blockchain.CurrentFastBlock().NumberU64() + default: + localHeight = d.lightchain.CurrentHeader().Number.Uint64() } - if ceil >= MaxForkAncestry { - floor = int64(ceil - MaxForkAncestry) + p.log.Debug("Looking for common ancestor", "local", localHeight, "remote", remoteHeight) + if localHeight >= MaxForkAncestry { + floor = int64(localHeight - MaxForkAncestry) } - p.log.Debug("Looking for common ancestor", "local", ceil, "remote", height) + from, count, skip, max := calculateRequestSpan(remoteHeight, localHeight) - // Request the topmost blocks to short circuit binary ancestor lookup - head := ceil - if head > height { - head = height - } - from := int64(head) - int64(MaxHeaderFetch) - if from < 0 { - from = 0 - } - // Span out with 15 block gaps into the future to catch bad head reports - limit := 2 * MaxHeaderFetch / 16 - count := 1 + int((int64(ceil)-from)/16) - if count > limit { - count = limit - } - go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false) + p.log.Trace("Span searching for common ancestor", "count", count, "from", from, "skip", skip) + go p.peer.RequestHeadersByNumber(uint64(from), count, skip, false) // Wait for the remote response to the head fetch number, hash := uint64(0), common.Hash{} @@ -656,9 +706,10 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err return 0, errEmptyHeaderSet } // Make sure the peer's reply conforms to the request - for i := 0; i < len(headers); i++ { - if number := headers[i].Number.Int64(); number != from+int64(i)*16 { - p.log.Warn("Head headers broke chain ordering", "index", i, "requested", from+int64(i)*16, "received", number) + for i, header := range headers { + expectNumber := from + int64(i)*int64((skip+1)) + if number := header.Number.Int64(); number != expectNumber { + p.log.Warn("Head headers broke chain ordering", "index", i, "requested", expectNumber, "received", number) return 0, errInvalidChain } } @@ -666,20 +717,24 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err finished = true for i := len(headers) - 1; i >= 0; i-- { // Skip any headers that underflow/overflow our requested set - if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > ceil { + if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > max { continue } // Otherwise check if we already know the header or not h := headers[i].Hash() n := headers[i].Number.Uint64() - if (d.mode == FullSync && d.blockchain.HasBlock(h, n)) || (d.mode != FullSync && d.lightchain.HasHeader(h, n)) { - number, hash = n, h - // If every header is known, even future ones, the peer straight out lied about its head - if number > height && i == limit-1 { - p.log.Warn("Lied about chain head", "reported", height, "found", number) - return 0, errStallingPeer - } + var known bool + switch d.mode { + case FullSync: + known = d.blockchain.HasBlock(h, n) + case FastSync: + known = d.blockchain.HasFastBlock(h, n) + default: + known = d.lightchain.HasHeader(h, n) + } + if known { + number, hash = n, h break } } @@ -703,10 +758,12 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err return number, nil } // Ancestor not found, we need to binary search over our chain - start, end := uint64(0), head + start, end := uint64(0), remoteHeight if floor > 0 { start = uint64(floor) } + p.log.Trace("Binary searching for common ancestor", "start", start, "end", end) + for start+1 < end { // Split our chain interval in two, and request the hash to cross check check := (start + end) / 2 @@ -739,7 +796,17 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err // Modify the search interval based on the response h := headers[0].Hash() n := headers[0].Number.Uint64() - if (d.mode == FullSync && !d.blockchain.HasBlock(h, n)) || (d.mode != FullSync && !d.lightchain.HasHeader(h, n)) { + + var known bool + switch d.mode { + case FullSync: + known = d.blockchain.HasBlock(h, n) + case FastSync: + known = d.blockchain.HasFastBlock(h, n) + default: + known = d.lightchain.HasHeader(h, n) + } + if !known { end = check break } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 4167d9c937c2..4fe9bcc03e19 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -21,16 +21,16 @@ import ( "fmt" "math" "math/big" + "strings" "sync" "sync/atomic" "testing" "time" - "github.com/XinFinOrg/XDPoSChain/core/rawdb" - "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/consensus/ethash" "github.com/XinFinOrg/XDPoSChain/core" + "github.com/XinFinOrg/XDPoSChain/core/rawdb" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/crypto" "github.com/XinFinOrg/XDPoSChain/ethdb" @@ -230,6 +230,15 @@ func (dl *downloadTester) HasBlock(hash common.Hash, number uint64) bool { return dl.GetBlockByHash(hash) != nil } +// HasFastBlock checks if a block is present in the testers canonical chain. +func (dl *downloadTester) HasFastBlock(hash common.Hash, number uint64) bool { + dl.lock.RLock() + defer dl.lock.RUnlock() + + _, ok := dl.ownReceipts[hash] + return ok +} + // GetHeader retrieves a header from the testers canonical chain. func (dl *downloadTester) GetHeaderByHash(hash common.Hash) *types.Header { dl.lock.RLock() @@ -350,6 +359,7 @@ func (dl *downloadTester) InsertChain(blocks types.Blocks) (int, error) { dl.ownHeaders[block.Hash()] = block.Header() } dl.ownBlocks[block.Hash()] = block + dl.ownReceipts[block.Hash()] = make(types.Receipts, 0) dl.stateDb.Put(block.Root().Bytes(), []byte{0x00}) dl.ownChainTd[block.Hash()] = new(big.Int).Add(dl.ownChainTd[block.ParentHash()], block.Difficulty()) } @@ -622,28 +632,28 @@ func (dlp *downloadTesterPeer) RequestNodeData(hashes []common.Hash) error { // assertOwnChain checks if the local chain contains the correct number of items // of the various chain components. func assertOwnChain(t *testing.T, tester *downloadTester, length int) { + // Mark this method as a helper to report errors at callsite, not in here + t.Helper() + assertOwnForkedChain(t, tester, 1, []int{length}) } // assertOwnForkedChain checks if the local forked chain contains the correct // number of items of the various chain components. func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, lengths []int) { + // Mark this method as a helper to report errors at callsite, not in here + t.Helper() + // Initialize the counters for the first fork - headers, blocks, receipts := lengths[0], lengths[0], lengths[0]-fsMinFullBlocks + headers, blocks, receipts := lengths[0], lengths[0], lengths[0] - if receipts < 0 { - receipts = 1 - } // Update the counters for each subsequent fork for _, length := range lengths[1:] { headers += length - common blocks += length - common - receipts += length - common - fsMinFullBlocks + receipts += length - common } - switch tester.downloader.mode { - case FullSync: - receipts = 1 - case LightSync: + if tester.downloader.mode == LightSync { blocks, receipts = 1, 1 } if hs := len(tester.ownHeaders); hs != headers { @@ -1773,3 +1783,78 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { tester.downloader.peers.peers["peer"].peer.(*floodingTestPeer).pend.Wait() } } + +func TestRemoteHeaderRequestSpan(t *testing.T) { + testCases := []struct { + remoteHeight uint64 + localHeight uint64 + expected []int + }{ + // Remote is way higher. We should ask for the remote head and go backwards + {1500, 1000, + []int{1323, 1339, 1355, 1371, 1387, 1403, 1419, 1435, 1451, 1467, 1483, 1499}, + }, + {15000, 13006, + []int{14823, 14839, 14855, 14871, 14887, 14903, 14919, 14935, 14951, 14967, 14983, 14999}, + }, + //Remote is pretty close to us. We don't have to fetch as many + {1200, 1150, + []int{1149, 1154, 1159, 1164, 1169, 1174, 1179, 1184, 1189, 1194, 1199}, + }, + // Remote is equal to us (so on a fork with higher td) + // We should get the closest couple of ancestors + {1500, 1500, + []int{1497, 1499}, + }, + // We're higher than the remote! Odd + {1000, 1500, + []int{997, 999}, + }, + // Check some weird edgecases that it behaves somewhat rationally + {0, 1500, + []int{0, 2}, + }, + {6000000, 0, + []int{5999823, 5999839, 5999855, 5999871, 5999887, 5999903, 5999919, 5999935, 5999951, 5999967, 5999983, 5999999}, + }, + {0, 0, + []int{0, 2}, + }, + } + reqs := func(from, count, span int) []int { + var r []int + num := from + for len(r) < count { + r = append(r, num) + num += span + 1 + } + return r + } + for i, tt := range testCases { + from, count, span, max := calculateRequestSpan(tt.remoteHeight, tt.localHeight) + data := reqs(int(from), count, span) + + if max != uint64(data[len(data)-1]) { + t.Errorf("test %d: wrong last value %d != %d", i, data[len(data)-1], max) + } + failed := false + if len(data) != len(tt.expected) { + failed = true + t.Errorf("test %d: length wrong, expected %d got %d", i, len(tt.expected), len(data)) + } else { + for j, n := range data { + if n != tt.expected[j] { + failed = true + break + } + } + } + if failed { + res := strings.Replace(fmt.Sprint(data), " ", ",", -1) + exp := strings.Replace(fmt.Sprint(tt.expected), " ", ",", -1) + fmt.Printf("got: %v\n", res) + fmt.Printf("exp: %v\n", exp) + t.Errorf("test %d: wrong values", i) + } + } +} From 96db952576e2928be7437ddaafa75dfa5f8122d2 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Mon, 24 Feb 2025 17:31:41 +0800 Subject: [PATCH 06/10] downloader: fix edgecase where returned index is OOB for downloader (#18335) --- eth/downloader/downloader.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 51f68f6f2a6f..970ad7406e13 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1510,7 +1510,15 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error { blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) } if index, err := d.blockchain.InsertChain(blocks); err != nil { - log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) + if index < len(results) { + log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) + } else { + // The InsertChain method in blockchain.go will sometimes return an out-of-bounds index, + // when it needs to preprocess blocks to import a sidechain. + // The importer will put together a new list of blocks to import, which is a superset + // of the blocks delivered from the downloader, and the indexing will be off. + log.Debug("Downloaded item processing failed on sidechain import", "index", index, "err", err) + } return errInvalidChain } if d.handleProposedBlock != nil { From 04adabe55ce9349e81de0d0527fc9e9d0aedcff1 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Mon, 24 Feb 2025 16:46:41 +0800 Subject: [PATCH 07/10] eth/downloader: fix nil droppeer in state sync (#19232) --- eth/downloader/statesync.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go index cf5c78ac371a..f78541c0512f 100644 --- a/eth/downloader/statesync.go +++ b/eth/downloader/statesync.go @@ -306,7 +306,13 @@ func (s *stateSync) loop() error { // 2 items are the minimum requested, if even that times out, we've no use of // this peer at the moment. log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id) - s.d.dropPeer(req.peer.id) + if s.d.dropPeer == nil { + // The dropPeer method is nil when `--copydb` is used for a local copy. + // Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored + req.peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", req.peer.id) + } else { + s.d.dropPeer(req.peer.id) + } } // Process all the received blobs and check for stale delivery delivered, err := s.process(req) From 3ee371454fe03dcf1c91e7ee68c993f82b430946 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Mon, 24 Feb 2025 16:45:02 +0800 Subject: [PATCH 08/10] eth/downloader: make syncing error obvious (#19413) --- eth/downloader/downloader.go | 69 +++++++++++++++---------------- eth/downloader/downloader_test.go | 6 --- eth/downloader/statesync.go | 10 +++++ 3 files changed, 43 insertions(+), 42 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 970ad7406e13..3bb2cdda087f 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -84,16 +84,11 @@ var ( errPeersUnavailable = errors.New("no peers available or all tried for download") errInvalidAncestor = errors.New("retrieved ancestor is invalid") errInvalidChain = errors.New("retrieved hash chain is invalid") - errInvalidBlock = errors.New("retrieved block is invalid") errInvalidBody = errors.New("retrieved block body is invalid") errInvalidReceipt = errors.New("retrieved receipt is invalid") - errCancelBlockFetch = errors.New("block download canceled (requested)") - errCancelHeaderFetch = errors.New("block header download canceled (requested)") - errCancelBodyFetch = errors.New("block body download canceled (requested)") - errCancelReceiptFetch = errors.New("receipt download canceled (requested)") errCancelStateFetch = errors.New("state data download canceled (requested)") - errCancelHeaderProcessing = errors.New("header processing canceled (requested)") errCancelContentProcessing = errors.New("content processing canceled (requested)") + errCanceled = errors.New("syncing canceled (requested)") errNoSyncActive = errors.New("no sync active") errTooOld = errors.New("peer doesn't speak recent enough protocol version (need version >= 62)") errEnoughBlock = errors.New("downloader download enough block") @@ -313,14 +308,6 @@ func (d *Downloader) UnregisterPeer(id string) error { } d.queue.Revoke(id) - // If this peer was the master peer, abort sync immediately - d.cancelLock.RLock() - master := id == d.cancelPeer - d.cancelLock.RUnlock() - - if master { - d.cancel() - } return nil } @@ -330,7 +317,7 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode err := d.synchronise(id, head, td, mode) switch err { case nil: - case errBusy: + case errBusy, errCanceled: case errTimeout, errBadPeer, errStallingPeer, errEmptyHeaderSet, errPeersUnavailable, errTooOld, @@ -505,7 +492,7 @@ func (d *Downloader) spawnSync(fetchers []func() error) error { // it has processed the queue. d.queue.Close() } - if err = <-errc; err != nil { + if err = <-errc; err != nil && err != errCanceled { break } } @@ -569,7 +556,7 @@ func (d *Downloader) fetchHeight(p *peerConnection, hash common.Hash) (*types.He for { select { case <-d.cancelCh: - return nil, errCancelBlockFetch + return nil, errCanceled case packet := <-d.headerCh: // Discard anything not from the origin peer @@ -690,8 +677,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) for finished := false; !finished; { select { case <-d.cancelCh: - - return 0, errCancelHeaderFetch + return 0, errCanceled case packet := <-d.headerCh: // Discard anything not from the origin peer @@ -777,7 +763,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) for arrived := false; !arrived; { select { case <-d.cancelCh: - return 0, errCancelHeaderFetch + return 0, errCanceled case packer := <-d.headerCh: // Discard anything not from the origin peer @@ -876,7 +862,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) for { select { case <-d.cancelCh: - return errCancelHeaderFetch + return errCanceled case packet := <-d.headerCh: // Make sure the active peer is giving us the skeleton headers @@ -903,7 +889,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) getHeaders(from) continue case <-d.cancelCh: - return errCancelHeaderFetch + return errCanceled } } // Pivot done (or not in fast sync) and no more headers, terminate the process @@ -912,7 +898,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) case d.headerProcCh <- nil: return nil case <-d.cancelCh: - return errCancelHeaderFetch + return errCanceled } } headers := packet.(*headerPack).headers @@ -957,7 +943,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) select { case d.headerProcCh <- headers: case <-d.cancelCh: - return errCancelHeaderFetch + return errCanceled } from += uint64(len(headers)) getHeaders(from) @@ -969,7 +955,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) getHeaders(from) continue case <-d.cancelCh: - return errCancelHeaderFetch + return errCanceled } } @@ -1028,7 +1014,7 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ( capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) } setIdle = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) } ) - err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire, + err := d.fetchParts(d.headerCh, deliver, d.queue.headerContCh, expire, d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve, nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers") @@ -1054,7 +1040,7 @@ func (d *Downloader) fetchBodies(from uint64) error { capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) } setIdle = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) } ) - err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, + err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire, d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies, d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies") @@ -1078,7 +1064,7 @@ func (d *Downloader) fetchReceipts(from uint64) error { capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) } setIdle = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) } ) - err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, + err := d.fetchParts(d.receiptCh, deliver, d.receiptWakeCh, expire, d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts, d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts") @@ -1111,7 +1097,7 @@ func (d *Downloader) fetchReceipts(from uint64) error { // - idle: network callback to retrieve the currently (type specific) idle peers that can be assigned tasks // - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping) // - kind: textual label of the type being downloaded to display in log mesages -func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool, +func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool, expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error), fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int, idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error { @@ -1127,7 +1113,7 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv for { select { case <-d.cancelCh: - return errCancel + return errCanceled case packet := <-deliveryCh: // If the peer was previously banned and failed to deliver its pack @@ -1198,12 +1184,23 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv setIdle(peer, 0) } else { peer.log.Debug("Stalling delivery, dropping", "type", kind) + if d.dropPeer == nil { // The dropPeer method is nil when `--copydb` is used for a local copy. // Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", pid) } else { d.dropPeer(pid) + + // If this peer was the master peer, abort sync immediately + d.cancelLock.RLock() + master := pid == d.cancelPeer + d.cancelLock.RUnlock() + + if master { + d.cancel() + return errTimeout + } } } } @@ -1308,7 +1305,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er for { select { case <-d.cancelCh: - return errCancelHeaderProcessing + return errCanceled case headers := <-d.headerProcCh: // Terminate header processing if we synced up @@ -1357,12 +1354,11 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er } // Otherwise split the chunk of headers into batches and process them gotHeaders = true - for len(headers) > 0 { // Terminate if something failed in between processing chunks select { case <-d.cancelCh: - return errCancelHeaderProcessing + return errCanceled default: } // Select the next chunk of headers to import @@ -1406,7 +1402,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders { select { case <-d.cancelCh: - return errCancelHeaderProcessing + return errCanceled case <-time.After(time.Second): } } @@ -1539,7 +1535,7 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error { stateSync := d.syncState(latest.Root) defer stateSync.Cancel() go func() { - if err := stateSync.Wait(); err != nil && err != errCancelStateFetch { + if err := stateSync.Wait(); err != nil && err != errCancelStateFetch && err != errCanceled { d.queue.Close() // wake up Results } }() @@ -1567,6 +1563,7 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error { // If sync failed, stop select { case <-d.cancelCh: + stateSync.Cancel() return stateSync.Cancel() default: } @@ -1597,7 +1594,7 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error { stateSync = d.syncState(P.Header.Root) defer stateSync.Cancel() go func() { - if err := stateSync.Wait(); err != nil && err != errCancelStateFetch { + if err := stateSync.Wait(); err != nil && err != errCancelStateFetch && err != errCanceled { d.queue.Close() // wake up Results } }() diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 4fe9bcc03e19..48ad7b7415f1 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -1348,14 +1348,8 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) { {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser {errInvalidAncestor, true}, // Agreed upon ancestor is not acceptable, drop the chain rewriter {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop - {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin {errInvalidBody, false}, // A bad peer was detected, but not the sync origin {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin - {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelHeaderProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop {errCancelContentProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop } // Run the tests and check disconnection status diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go index f78541c0512f..b9e312f95d08 100644 --- a/eth/downloader/statesync.go +++ b/eth/downloader/statesync.go @@ -312,6 +312,16 @@ func (s *stateSync) loop() error { req.peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", req.peer.id) } else { s.d.dropPeer(req.peer.id) + + // If this peer was the master peer, abort sync immediately + s.d.cancelLock.RLock() + master := req.peer.id == s.d.cancelPeer + s.d.cancelLock.RUnlock() + + if master { + s.d.cancel() + return errTimeout + } } } // Process all the received blobs and check for stale delivery From 2955a988caf7ec52190de745551cfb315915133b Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Mon, 24 Feb 2025 17:49:13 +0800 Subject: [PATCH 09/10] eth/downloader: more context in errors (#21067) --- eth/downloader/downloader.go | 35 ++++++++++++++++++++++--------- eth/downloader/downloader_test.go | 27 ++++++++++++++---------- eth/downloader/queue.go | 14 +++++++------ 3 files changed, 49 insertions(+), 27 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 3bb2cdda087f..b8b59347b176 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -315,13 +315,28 @@ func (d *Downloader) UnregisterPeer(id string) error { // adding various sanity checks as well as wrapping it with various log entries. func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error { err := d.synchronise(id, head, td, mode) + switch err { - case nil: - case errBusy, errCanceled: + case nil, errBusy, errCanceled: + return err + } + if errors.Is(err, errInvalidChain) { + log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err) + if d.dropPeer == nil { + // The dropPeer method is nil when `--copydb` is used for a local copy. + // Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored + log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id) + } else { + d.dropPeer(id) + } + return err + } + + switch err { case errTimeout, errBadPeer, errStallingPeer, errEmptyHeaderSet, errPeersUnavailable, errTooOld, - errInvalidAncestor, errInvalidChain: + errInvalidAncestor: log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err) if d.dropPeer == nil { // The dropPeer method is nil when `--copydb` is used for a local copy. @@ -696,7 +711,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) expectNumber := from + int64(i)*int64((skip+1)) if number := header.Number.Int64(); number != expectNumber { p.log.Warn("Head headers broke chain ordering", "index", i, "requested", expectNumber, "received", number) - return 0, errInvalidChain + return 0, fmt.Errorf("%w: %v", errInvalidChain, errors.New("head headers broke chain ordering")) } } // Check if a common ancestor was found @@ -908,7 +923,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) filled, proced, err := d.fillHeaderSkeleton(from, headers) if err != nil { p.log.Debug("Skeleton chain invalid", "err", err) - return errInvalidChain + return fmt.Errorf("%w: %v", errInvalidChain, err) } headers = filled[proced:] from += uint64(proced) @@ -1121,13 +1136,13 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) if peer := d.peers.Peer(packet.PeerId()); peer != nil { // Deliver the received chunk of data and check chain validity accepted, err := deliver(packet) - if err == errInvalidChain { + if errors.Is(err, errInvalidChain) { return err } // Unless a peer delivered something completely else than requested (usually // caused by a timed out request which came through in the end), set it to // idle. If the delivery's stale, the peer should have already been idled. - if err != errStaleDelivery { + if !errors.Is(err, errStaleDelivery) { setIdle(peer, accepted) } // Issue a log to the user to see what's going on @@ -1388,7 +1403,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er rollback = append(rollback, chunk[:n]...) } log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err) - return errInvalidChain + return fmt.Errorf("%w: %v", errInvalidChain, err) } // All verifications passed, store newly found uncertain headers rollback = append(rollback, unknown...) @@ -1515,7 +1530,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error { // of the blocks delivered from the downloader, and the indexing will be off. log.Debug("Downloaded item processing failed on sidechain import", "index", index, "err", err) } - return errInvalidChain + return fmt.Errorf("%w: %v", errInvalidChain, err) } if d.handleProposedBlock != nil { header := blocks[len(blocks)-1].Header() @@ -1666,7 +1681,7 @@ func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *state } if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil { log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) - return errInvalidChain + return fmt.Errorf("%w: %v", errInvalidChain, err) } return nil } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 48ad7b7415f1..e09a8f56fdd3 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -318,27 +318,32 @@ func (dl *downloadTester) GetTd(hash common.Hash, number uint64) *big.Int { func (dl *downloadTester) InsertHeaderChain(headers []*types.Header, checkFreq int) (int, error) { dl.lock.Lock() defer dl.lock.Unlock() - // Do a quick check, as the blockchain.InsertHeaderChain doesn't insert anything in case of errors if _, ok := dl.ownHeaders[headers[0].ParentHash]; !ok { - return 0, errors.New("unknown parent") + return 0, errors.New("InsertHeaderChain: unknown parent at first position") } + var hashes []common.Hash for i := 1; i < len(headers); i++ { + hash := headers[i-1].Hash() if headers[i].ParentHash != headers[i-1].Hash() { - return i, errors.New("unknown parent") + return i, fmt.Errorf("non-contiguous import at position %d", i) } + hashes = append(hashes, hash) } + hashes = append(hashes, headers[len(headers)-1].Hash()) // Do a full insert if pre-checks passed for i, header := range headers { - if _, ok := dl.ownHeaders[header.Hash()]; ok { + hash := hashes[i] + if _, ok := dl.ownHeaders[hash]; ok { continue } if _, ok := dl.ownHeaders[header.ParentHash]; !ok { - return i, errors.New("unknown parent") + // This _should_ be impossible, due to precheck and induction + return i, fmt.Errorf("InsertHeaderChain: unknown parent at position %d", i) } - dl.ownHashes = append(dl.ownHashes, header.Hash()) - dl.ownHeaders[header.Hash()] = header - dl.ownChainTd[header.Hash()] = new(big.Int).Add(dl.ownChainTd[header.ParentHash], header.Difficulty) + dl.ownHashes = append(dl.ownHashes, hash) + dl.ownHeaders[hash] = header + dl.ownChainTd[hash] = new(big.Int).Add(dl.ownChainTd[header.ParentHash], header.Difficulty) } return len(headers), nil } @@ -350,9 +355,9 @@ func (dl *downloadTester) InsertChain(blocks types.Blocks) (int, error) { for i, block := range blocks { if parent, ok := dl.ownBlocks[block.ParentHash()]; !ok { - return i, errors.New("unknown parent") + return i, fmt.Errorf("InsertChain: unknown parent at position %d / %d", i, len(blocks)) } else if _, err := dl.stateDb.Get(parent.Root().Bytes()); err != nil { - return i, fmt.Errorf("unknown parent state %x: %v", parent.Root(), err) + return i, fmt.Errorf("InsertChain: unknown parent state %x: %v", parent.Root(), err) } if _, ok := dl.ownHeaders[block.Hash()]; !ok { dl.ownHashes = append(dl.ownHashes, block.Hash()) @@ -376,7 +381,7 @@ func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []typ return i, errors.New("unknown owner") } if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok { - return i, errors.New("unknown parent") + return i, errors.New("InsertReceiptChain: unknown parent") } dl.ownBlocks[blocks[i].Hash()] = blocks[i] dl.ownReceipts[blocks[i].Hash()] = receipts[i] diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index a65a2877dc68..cbf141e65120 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -510,7 +510,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common index := int(header.Number.Int64() - int64(q.resultOffset)) if index >= len(q.resultCache) || index < 0 { log.Error("index allocation went beyond available resultCache space", "index", index, "len.resultCache", len(q.resultCache), "blockNum", header.Number.Int64(), "resultOffset", q.resultOffset) - return nil, false, errInvalidChain + return nil, false, fmt.Errorf("%w: index allocation went beyond available resultCache space", errInvalidChain) } if q.resultCache[index] == nil { components := 1 @@ -865,14 +865,16 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ q.active.Signal() } // If none of the data was good, it's a stale delivery - switch { - case failure == nil || failure == errInvalidChain: + if failure == nil { + return accepted, nil + } + if errors.Is(failure, errInvalidChain) { return accepted, failure - case useful: + } + if useful { return accepted, fmt.Errorf("partial failure: %v", failure) - default: - return accepted, errStaleDelivery } + return accepted, fmt.Errorf("%w: %v", failure, errStaleDelivery) } // Prepare configures the result cache to allow accepting and caching inbound From bf011986a5751799b35648a054c027d3d63b89cd Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Mon, 24 Feb 2025 18:54:12 +0800 Subject: [PATCH 10/10] eth/downloader: fixes data race between synchronize and other methods (#21201) --- eth/downloader/downloader.go | 62 ++++++++++++++++++------------- eth/downloader/downloader_test.go | 4 +- eth/downloader/modes.go | 4 +- eth/handler.go | 2 +- les/handler.go | 2 +- 5 files changed, 42 insertions(+), 32 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index b8b59347b176..e275707fc0c1 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -95,7 +95,7 @@ var ( ) type Downloader struct { - mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle) + mode uint32 // Synchronisation mode defining the strategy used (per sync cycle) mux *event.TypeMux // Event multiplexer to announce sync operation events queue *queue // Scheduler for selecting the hashes to download @@ -205,13 +205,12 @@ type BlockChain interface { } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, handleProposedBlock proposeBlockHandlerFn) *Downloader { +func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, handleProposedBlock proposeBlockHandlerFn) *Downloader { if lightchain == nil { lightchain = chain } dl := &Downloader{ - mode: mode, stateDB: stateDb, mux: mux, queue: newQueue(), @@ -254,13 +253,16 @@ func (d *Downloader) Progress() XDPoSChain.SyncProgress { defer d.syncStatsLock.RUnlock() current := uint64(0) - switch d.mode { - case FullSync: + mode := d.getMode() + switch { + case d.blockchain != nil && mode == FullSync: current = d.blockchain.CurrentBlock().NumberU64() - case FastSync: + case d.blockchain != nil && mode == FastSync: current = d.blockchain.CurrentFastBlock().NumberU64() - case LightSync: + case d.lightchain != nil: current = d.lightchain.CurrentHeader().Number.Uint64() + default: + log.Error("Unknown downloader chain/mode combo", "light", d.lightchain != nil, "full", d.blockchain != nil, "mode", mode) } return XDPoSChain.SyncProgress{ StartingBlock: d.syncStatsChainOrigin, @@ -403,8 +405,8 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode defer d.Cancel() // No matter what, we can't leave the cancel channel open - // Set the requested sync mode, unless it's forbidden - d.mode = mode + // Atomically set the requested sync mode + atomic.StoreUint32(&d.mode, uint32(mode)) // Retrieve the origin peer and initiate the downloading process p := d.peers.Peer(id) @@ -414,6 +416,10 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode return d.syncWithPeer(p, hash, td) } +func (d *Downloader) getMode() SyncMode { + return SyncMode(atomic.LoadUint32(&d.mode)) +} + // syncWithPeer starts a block synchronization based on the hash chain from the // specified peer and head hash. func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) { @@ -429,8 +435,9 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I if p.version < 62 { return errTooOld } + mode := d.getMode() - log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", d.mode) + log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", mode) defer func(start time.Time) { log.Debug("Synchronisation terminated", "elapsed", time.Since(start)) }(time.Now()) @@ -455,7 +462,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I // Ensure our origin point is below any fast sync pivot point pivot := uint64(0) - if d.mode == FastSync { + if mode == FastSync { if height <= uint64(fsMinFullBlocks) { origin = 0 } else { @@ -466,11 +473,11 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I } } d.committed = 1 - if d.mode == FastSync && pivot != 0 { + if mode == FastSync && pivot != 0 { d.committed = 0 } // Initiate the sync using a concurrent header and content retrieval algorithm - d.queue.Prepare(origin+1, d.mode) + d.queue.Prepare(origin+1, mode) if d.syncInitHook != nil { d.syncInitHook(origin, height) } @@ -481,9 +488,9 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync func() error { return d.processHeaders(origin+1, pivot, td) }, } - if d.mode == FastSync { + if mode == FastSync { fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) }) - } else if d.mode == FullSync { + } else if mode == FullSync { fetchers = append(fetchers, func() error { return d.processFullSyncContent(height) }) } return d.spawnSync(fetchers) @@ -666,7 +673,8 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) localHeight uint64 remoteHeight = remoteHeader.Number.Uint64() ) - switch d.mode { + mode := d.getMode() + switch mode { case FullSync: localHeight = d.blockchain.CurrentBlock().NumberU64() case FastSync: @@ -675,6 +683,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) localHeight = d.lightchain.CurrentHeader().Number.Uint64() } p.log.Debug("Looking for common ancestor", "local", localHeight, "remote", remoteHeight) + if localHeight >= MaxForkAncestry { floor = int64(localHeight - MaxForkAncestry) } @@ -726,7 +735,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) n := headers[i].Number.Uint64() var known bool - switch d.mode { + switch mode { case FullSync: known = d.blockchain.HasBlock(h, n) case FastSync: @@ -799,7 +808,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) n := headers[0].Number.Uint64() var known bool - switch d.mode { + switch mode { case FullSync: known = d.blockchain.HasBlock(h, n) case FastSync: @@ -874,6 +883,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) // Start pulling the header chain skeleton until all is done getHeaders(from) + mode := d.getMode() for { select { case <-d.cancelCh: @@ -934,7 +944,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) if n := len(headers); n > 0 { // Retrieve the current head we're at head := uint64(0) - if d.mode == LightSync { + if mode == LightSync { head = d.lightchain.CurrentHeader().Number.Uint64() } else { head = d.blockchain.CurrentFastBlock().NumberU64() @@ -1289,6 +1299,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error { // Keep a count of uncertain headers to roll back rollback := []*types.Header{} + mode := d.getMode() defer func() { if len(rollback) > 0 { // Flatten the headers and roll them back @@ -1297,13 +1308,13 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er hashes[i] = header.Hash() } lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0 - if d.mode != LightSync { + if mode != LightSync { lastFastBlock = d.blockchain.CurrentFastBlock().Number() lastBlock = d.blockchain.CurrentBlock().Number() } d.lightchain.Rollback(hashes) curFastBlock, curBlock := common.Big0, common.Big0 - if d.mode != LightSync { + if mode != LightSync { curFastBlock = d.blockchain.CurrentFastBlock().Number() curBlock = d.blockchain.CurrentBlock().Number() } @@ -1344,7 +1355,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er // L: Sync begins, and finds common ancestor at 11 // L: Request new headers up from 11 (R's TD was higher, it must have something) // R: Nothing to give - if d.mode != LightSync { + if mode != LightSync { head := d.blockchain.CurrentBlock() if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 { return errStallingPeer @@ -1357,7 +1368,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er // This check cannot be executed "as is" for full imports, since blocks may still be // queued for processing when the header download completes. However, as long as the // peer gave us something useful, we're already happy/progressed (above check). - if d.mode == FastSync || d.mode == LightSync { + if mode == FastSync || mode == LightSync { head := d.lightchain.CurrentHeader() if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 { return errStallingPeer @@ -1382,9 +1393,8 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er limit = len(headers) } chunk := headers[:limit] - // In case of header only syncing, validate the chunk immediately - if d.mode == FastSync || d.mode == LightSync { + if mode == FastSync || mode == LightSync { // Collect the yet unknown headers to mark them as uncertain unknown := make([]*types.Header, 0, len(headers)) for _, header := range chunk { @@ -1412,7 +1422,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er } } // Unless we're doing light chains, schedule the headers for associated content retrieval - if d.mode == FullSync || d.mode == FastSync { + if mode == FullSync || mode == FastSync { // If we've reached the allowed number of pending headers, stall a bit for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders { select { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index e09a8f56fdd3..0c1115f227ff 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -99,7 +99,7 @@ func newTester() *downloadTester { tester.stateDb = rawdb.NewMemoryDatabase() tester.stateDb.Put(genesis.Root().Bytes(), []byte{0x00}) - tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer, tester.handleProposedBlock) + tester.downloader = New(tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer, tester.handleProposedBlock) return tester } @@ -658,7 +658,7 @@ func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, leng blocks += length - common receipts += length - common } - if tester.downloader.mode == LightSync { + if tester.downloader.getMode() == LightSync { blocks, receipts = 1, 1 } if hs := len(tester.ownHeaders); hs != headers { diff --git a/eth/downloader/modes.go b/eth/downloader/modes.go index 8ecdf91f1187..174b5a1c712a 100644 --- a/eth/downloader/modes.go +++ b/eth/downloader/modes.go @@ -18,8 +18,8 @@ package downloader import "fmt" -// SyncMode represents the synchronisation mode of the downloader. -type SyncMode int +// It is a uint32 as it is used with atomic operations. +type SyncMode uint32 const ( FullSync SyncMode = iota // Synchronise the entire blockchain history from full blocks diff --git a/eth/handler.go b/eth/handler.go index 0abf711fe238..90e83c3c15b6 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -209,7 +209,7 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne } // Construct the different synchronisation mechanisms - manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer, handleProposedBlock) + manager.downloader = downloader.New(chaindb, manager.eventMux, blockchain, nil, manager.removePeer, handleProposedBlock) validator := func(header *types.Header) error { return engine.VerifyHeader(blockchain, header, true) diff --git a/les/handler.go b/les/handler.go index d988101004cf..6265abee1dc1 100644 --- a/les/handler.go +++ b/les/handler.go @@ -208,7 +208,7 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, protoco } if lightSync { - manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, nil, blockchain, removePeer, manager.handleProposedBlock) + manager.downloader = downloader.New(chainDb, manager.eventMux, nil, blockchain, removePeer, manager.handleProposedBlock) manager.peers.notify((*downloaderPeerNotify)(manager)) manager.fetcher = newLightFetcher(manager) }