diff --git a/eth/api_backend.go b/eth/api_backend.go index 766a99fc1ef6..0cf10ca00009 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -63,6 +63,7 @@ func (b *EthAPIBackend) CurrentBlock() *types.Header { func (b *EthAPIBackend) SetHead(number uint64) { b.eth.handler.downloader.Cancel() + b.eth.handler.downloader.ResetSkeleton() b.eth.blockchain.SetHead(number) } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 020dd7314bec..ac0a0e27bf9a 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -116,6 +116,7 @@ type Downloader struct { // Callbacks dropPeer peerDropFn // Drops a peer for misbehaving badBlock badBlockFn // Reports a block as rejected by the chain + success func() // Callback to signal successful sync completion // Status synchronising atomic.Bool @@ -237,6 +238,7 @@ func New(stateDb ethdb.Database, mode ethconfig.SyncMode, mux *event.TypeMux, ch chainCutoffNumber: cutoffNumber, chainCutoffHash: cutoffHash, dropPeer: dropPeer, + success: success, headerProcCh: make(chan *headerTask, 1), quitCh: make(chan struct{}), SnapSyncer: snap.NewSyncer(stateDb, chain.TrieDB().Scheme()), @@ -662,6 +664,14 @@ func (d *Downloader) Cancel() { d.blockchain.InterruptInsert(false) } +// ResetSkeleton terminates the skeleton syncer and reinitializes it. +func (d *Downloader) ResetSkeleton() { + log.Debug("Resetting skeleton syncer due to chain rewind") + d.skeleton.Terminate() + rawdb.DeleteSkeletonSyncStatus(d.stateDB) + d.skeleton = newSkeleton(d.stateDB, d.peers, d.dropPeer, newBeaconBackfiller(d, d.success)) +} + // Terminate interrupts the downloader, canceling all pending operations. // The downloader cannot be reused after calling Terminate. func (d *Downloader) Terminate() { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 7fa2522a3d43..f7b6b4d1c26a 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -734,3 +734,46 @@ func testSyncProgress(t *testing.T, protocol uint, mode SyncMode) { t.Fatalf("Failed to sync chain in three seconds") } } + +// TestSkeletonResetAfterSetHead tests that the skeleton syncer is properly reset +// when the chain is rewound using SetHead, preventing data inconsistency issues. +func TestSkeletonResetAfterSetHead(t *testing.T) { + tester := newTester(t, ethconfig.SnapSync) + defer tester.terminate() + + chain := testChainBase.shorten(800) + tester.newPeer("peer", eth.ETH68, chain.blocks[1:]) + + if _, err := tester.chain.InsertChain(chain.blocks[1:401]); err != nil { + t.Fatalf("Failed to insert chain: %v", err) + } + + // Start beacon sync to populate the skeleton + header := chain.blocks[400].Header() + if err := tester.downloader.BeaconSync(header, header); err != nil { + t.Fatalf("Failed to start beacon sync: %v", err) + } + + // Wait for the skeleton state + time.Sleep(20 * time.Millisecond) + + // Check skeleton sync status exists in database before SetHead + if skeleton := rawdb.ReadSkeletonSyncStatus(tester.downloader.stateDB); len(skeleton) == 0 { + t.Fatal("Skeleton sync status should exist in database before SetHead") + } + + // Simulate chain rewind by calling SetHead + tester.downloader.Cancel() + tester.downloader.ResetSkeleton() + tester.chain.SetHead(200) + + // Verify skeleton sync status was cleared from database + if skeleton := rawdb.ReadSkeletonSyncStatus(tester.downloader.stateDB); len(skeleton) != 0 { + t.Fatal("Skeleton sync status should be cleared from database after SetHead") + } + + // Verify we can start a new sync after reset + if err := tester.downloader.BeaconSync(header, header); err != nil { + t.Fatalf("Failed to start beacon sync after reset: %v", err) + } +}