diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 69639042c9..da636e88b5 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -179,9 +179,6 @@ type BlockChain interface { // CurrentHeader retrieves the head header from the local chain. CurrentHeader() *types.Header - // InsertHeaderChain inserts a batch of headers into the local chain. - InsertHeaderChain([]*types.Header) (int, error) - // GetTd returns the total difficulty of a local block. GetTd(common.Hash, uint64) *big.Int @@ -637,9 +634,19 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * d.pivotHeader = pivot d.pivotLock.Unlock() - fetchers = append(fetchers, func() error { return d.processSnapSyncContent() }) + fetchers = append(fetchers, func() error { + if err := d.processSnapSyncContent(); err != nil { + return err + } + return d.checkStalling(td, beaconMode) + }) } else if mode == ethconfig.FullSync { - fetchers = append(fetchers, func() error { return d.processFullSyncContent(ttd, beaconMode) }) + fetchers = append(fetchers, func() error { + if err := d.processFullSyncContent(ttd, beaconMode); err != nil { + return err + } + return d.checkStalling(td, beaconMode) + }) } // update the chasing head d.blockchain.UpdateChasingHead(remoteHeader) @@ -1255,9 +1262,8 @@ func (d *Downloader) fetchReceipts(from uint64, beaconMode bool) error { // queue until the stream ends or a failure occurs. func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode bool) error { var ( - mode = d.getMode() - gotHeaders = false // Wait for batches of headers to process - timer = time.NewTimer(time.Second) + mode = d.getMode() + timer = time.NewTimer(time.Second) ) defer timer.Stop() @@ -1276,46 +1282,11 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode case <-d.cancelCh: } } - // If we're in legacy sync mode, we need to check total difficulty - // violations from malicious peers. That is not needed in beacon - // mode and we can skip to terminating sync. - if !beaconMode { - // If no headers were retrieved at all, the peer violated its TD promise that it had a - // better chain compared to ours. The only exception is if its promised blocks were - // already imported by other means (e.g. fetcher): - // - // R , L : Both at block 10 - // R: Mine block 11, and propagate it to L - // L: Queue block 11 for import - // L: Notice that R's head and TD increased compared to ours, start sync - // L: Import of block 11 finishes - // L: Sync begins, and finds common ancestor at 11 - // L: Request new headers up from 11 (R's TD was higher, it must have something) - // R: Nothing to give - head := d.blockchain.CurrentBlock() - if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 { - return errStallingPeer - } - // If snap or light syncing, ensure promised headers are indeed delivered. This is - // needed to detect scenarios where an attacker feeds a bad pivot and then bails out - // of delivering the post-pivot blocks that would flag the invalid content. - // - // This check cannot be executed "as is" for full imports, since blocks may still be - // queued for processing when the header download completes. However, as long as the - // peer gave us something useful, we're already happy/progressed (above check). - if mode == SnapSync { - head := d.blockchain.CurrentHeader() - if td.Cmp(d.blockchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 { - return errStallingPeer - } - } - } return nil } // Otherwise split the chunk of headers into batches and process them headers, hashes, scheduled := task.headers, task.hashes, false - gotHeaders = true for len(headers) > 0 { // Terminate if something failed in between processing chunks select { @@ -1346,21 +1317,6 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode return fmt.Errorf("%w: %v", errInvalidChain, err) } log.Debug("Inserted headers before cutoff", "number", chunkHeaders[cutoff-1].Number, "hash", chunkHashes[cutoff-1]) - chunkHeaders = chunkHeaders[cutoff:] - chunkHashes = chunkHashes[cutoff:] - } - // TODO(Nathan): no need to `InsertHeaderChain` by design, but will fail without this, why? - // In case of header only syncing, validate the chunk immediately - if mode == ethconfig.SnapSync { - // Although the received headers might be all valid, a legacy - // PoW/PoA sync must not accept post-merge headers. Make sure - // that any transition is rejected at this point. - if len(chunkHeaders) > 0 { - if n, err := d.blockchain.InsertHeaderChain(chunkHeaders); err != nil { - log.Warn("Invalid header encountered", "number", chunkHeaders[n].Number, "hash", chunkHashes[n], "parent", chunkHeaders[n].ParentHash, "err", err) - return fmt.Errorf("%w: %v", errInvalidChain, err) - } - } } // If we've reached the allowed number of pending headers, stall a bit for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders { @@ -1376,6 +1332,10 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode // // Skip the bodies/receipts retrieval scheduling before the cutoff in snap // sync if chain pruning is configured. + if mode == ethconfig.SnapSync && cutoff != 0 { + chunkHeaders = chunkHeaders[cutoff:] + chunkHashes = chunkHashes[cutoff:] + } if len(chunkHeaders) > 0 { scheduled = true if d.queue.Schedule(chunkHeaders, chunkHashes, origin+uint64(cutoff)) != len(chunkHeaders) { @@ -1406,6 +1366,44 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode } } +func (d *Downloader) checkStalling(td *big.Int, beaconMode bool) error { + // If we're in legacy sync mode, we need to check total difficulty + // violations from malicious peers. That is not needed in beacon + // mode and we can skip to terminating sync. + if !beaconMode { + // If no headers were retrieved at all, the peer violated its TD promise that it had a + // better chain compared to ours. The only exception is if its promised blocks were + // already imported by other means (e.g. fetcher): + // + // R , L : Both at block 10 + // R: Mine block 11, and propagate it to L + // L: Queue block 11 for import + // L: Notice that R's head and TD increased compared to ours, start sync + // L: Import of block 11 finishes + // L: Sync begins, and finds common ancestor at 11 + // L: Request new headers up from 11 (R's TD was higher, it must have something) + // R: Nothing to give + head := d.blockchain.CurrentBlock() + if td.Cmp(d.blockchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 { + return errStallingPeer + } + // If snap or light syncing, ensure promised headers are indeed delivered. This is + // needed to detect scenarios where an attacker feeds a bad pivot and then bails out + // of delivering the post-pivot blocks that would flag the invalid content. + // + // This check cannot be executed "as is" for full imports, since blocks may still be + // queued for processing when the header download completes. However, as long as the + // peer gave us something useful, we're already happy/progressed (above check). + if d.getMode() == SnapSync { + head := d.blockchain.CurrentHeader() + if td.Cmp(d.blockchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 { + return errStallingPeer + } + } + } + return nil +} + // processFullSyncContent takes fetch results from the queue and imports them into the chain. func (d *Downloader) processFullSyncContent(ttd *big.Int, beaconMode bool) error { for {