Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 56 additions & 58 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,6 @@ type BlockChain interface {
// CurrentHeader retrieves the head header from the local chain.
CurrentHeader() *types.Header

// InsertHeaderChain inserts a batch of headers into the local chain.
InsertHeaderChain([]*types.Header) (int, error)

// GetTd returns the total difficulty of a local block.
GetTd(common.Hash, uint64) *big.Int

Expand Down Expand Up @@ -637,9 +634,19 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
d.pivotHeader = pivot
d.pivotLock.Unlock()

fetchers = append(fetchers, func() error { return d.processSnapSyncContent() })
fetchers = append(fetchers, func() error {
if err := d.processSnapSyncContent(); err != nil {
return err
}
return d.checkStalling(td, beaconMode)
})
} else if mode == ethconfig.FullSync {
fetchers = append(fetchers, func() error { return d.processFullSyncContent(ttd, beaconMode) })
fetchers = append(fetchers, func() error {
if err := d.processFullSyncContent(ttd, beaconMode); err != nil {
return err
}
return d.checkStalling(td, beaconMode)
})
}
// update the chasing head
d.blockchain.UpdateChasingHead(remoteHeader)
Expand Down Expand Up @@ -1255,9 +1262,8 @@ func (d *Downloader) fetchReceipts(from uint64, beaconMode bool) error {
// queue until the stream ends or a failure occurs.
func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode bool) error {
var (
mode = d.getMode()
gotHeaders = false // Wait for batches of headers to process
timer = time.NewTimer(time.Second)
mode = d.getMode()
timer = time.NewTimer(time.Second)
)
defer timer.Stop()

Expand All @@ -1276,46 +1282,11 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
case <-d.cancelCh:
}
}
// If we're in legacy sync mode, we need to check total difficulty
// violations from malicious peers. That is not needed in beacon
// mode and we can skip to terminating sync.
if !beaconMode {
// If no headers were retrieved at all, the peer violated its TD promise that it had a
// better chain compared to ours. The only exception is if its promised blocks were
// already imported by other means (e.g. fetcher):
//
// R <remote peer>, L <local node>: Both at block 10
// R: Mine block 11, and propagate it to L
// L: Queue block 11 for import
// L: Notice that R's head and TD increased compared to ours, start sync
// L: Import of block 11 finishes
// L: Sync begins, and finds common ancestor at 11
// L: Request new headers up from 11 (R's TD was higher, it must have something)
// R: Nothing to give
head := d.blockchain.CurrentBlock()
if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
return errStallingPeer
}
// If snap or light syncing, ensure promised headers are indeed delivered. This is
// needed to detect scenarios where an attacker feeds a bad pivot and then bails out
// of delivering the post-pivot blocks that would flag the invalid content.
//
// This check cannot be executed "as is" for full imports, since blocks may still be
// queued for processing when the header download completes. However, as long as the
// peer gave us something useful, we're already happy/progressed (above check).
if mode == SnapSync {
head := d.blockchain.CurrentHeader()
if td.Cmp(d.blockchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
return errStallingPeer
}
}
}
return nil
}
// Otherwise split the chunk of headers into batches and process them
headers, hashes, scheduled := task.headers, task.hashes, false

gotHeaders = true
for len(headers) > 0 {
// Terminate if something failed in between processing chunks
select {
Expand Down Expand Up @@ -1346,21 +1317,6 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
log.Debug("Inserted headers before cutoff", "number", chunkHeaders[cutoff-1].Number, "hash", chunkHashes[cutoff-1])
chunkHeaders = chunkHeaders[cutoff:]
chunkHashes = chunkHashes[cutoff:]
}
// TODO(Nathan): no need to `InsertHeaderChain` by design, but will fail without this, why?
// In case of header only syncing, validate the chunk immediately
if mode == ethconfig.SnapSync {
// Although the received headers might be all valid, a legacy
// PoW/PoA sync must not accept post-merge headers. Make sure
// that any transition is rejected at this point.
if len(chunkHeaders) > 0 {
if n, err := d.blockchain.InsertHeaderChain(chunkHeaders); err != nil {
log.Warn("Invalid header encountered", "number", chunkHeaders[n].Number, "hash", chunkHashes[n], "parent", chunkHeaders[n].ParentHash, "err", err)
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
}
}
// If we've reached the allowed number of pending headers, stall a bit
for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
Expand All @@ -1376,6 +1332,10 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
//
// Skip the bodies/receipts retrieval scheduling before the cutoff in snap
// sync if chain pruning is configured.
if mode == ethconfig.SnapSync && cutoff != 0 {
chunkHeaders = chunkHeaders[cutoff:]
chunkHashes = chunkHashes[cutoff:]
}
if len(chunkHeaders) > 0 {
scheduled = true
if d.queue.Schedule(chunkHeaders, chunkHashes, origin+uint64(cutoff)) != len(chunkHeaders) {
Expand Down Expand Up @@ -1406,6 +1366,44 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
}
}

func (d *Downloader) checkStalling(td *big.Int, beaconMode bool) error {
// If we're in legacy sync mode, we need to check total difficulty
// violations from malicious peers. That is not needed in beacon
// mode and we can skip to terminating sync.
if !beaconMode {
// If no headers were retrieved at all, the peer violated its TD promise that it had a
// better chain compared to ours. The only exception is if its promised blocks were
// already imported by other means (e.g. fetcher):
//
// R <remote peer>, L <local node>: Both at block 10
// R: Mine block 11, and propagate it to L
// L: Queue block 11 for import
// L: Notice that R's head and TD increased compared to ours, start sync
// L: Import of block 11 finishes
// L: Sync begins, and finds common ancestor at 11
// L: Request new headers up from 11 (R's TD was higher, it must have something)
// R: Nothing to give
head := d.blockchain.CurrentBlock()
if td.Cmp(d.blockchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
return errStallingPeer
}
// If snap or light syncing, ensure promised headers are indeed delivered. This is
// needed to detect scenarios where an attacker feeds a bad pivot and then bails out
// of delivering the post-pivot blocks that would flag the invalid content.
//
// This check cannot be executed "as is" for full imports, since blocks may still be
// queued for processing when the header download completes. However, as long as the
// peer gave us something useful, we're already happy/progressed (above check).
if d.getMode() == SnapSync {
head := d.blockchain.CurrentHeader()
if td.Cmp(d.blockchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
return errStallingPeer
}
}
}
return nil
}

// processFullSyncContent takes fetch results from the queue and imports them into the chain.
func (d *Downloader) processFullSyncContent(ttd *big.Int, beaconMode bool) error {
for {
Expand Down