Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions eth/downloader/beaconsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type beaconBackfiller struct {
downloader *Downloader // Downloader to direct via this callback implementation
success func() // Callback to run on successful sync cycle completion
filling bool // Flag whether the downloader is backfilling or not
filled *types.Header // Last header filled by the last terminated sync loop
started chan struct{} // Notification channel whether the downloader inited
lock sync.Mutex // Mutex protecting the sync lock
}
Expand All @@ -56,13 +55,15 @@ func (b *beaconBackfiller) suspend() *types.Header {
// If no filling is running, don't waste cycles
b.lock.Lock()
filling := b.filling
filled := b.filled
started := b.started
b.lock.Unlock()

if !filling {
// Sync cycle was inactive, retrieve and return the latest snap block
// as the filled header.
log.Debug("Backfiller was inactive")
return filled // Return the filled header on the previous sync completion

return b.downloader.blockchain.CurrentSnapBlock()
}
// A previous filling should be running, though it may happen that it hasn't
// yet started (being done on a new goroutine). Many concurrent beacon head
Expand All @@ -77,7 +78,6 @@ func (b *beaconBackfiller) suspend() *types.Header {
log.Debug("Backfiller has been suspended")

// Sync cycle was just terminated, retrieve and return the last filled header.
// Can't use `filled` as that contains a stale value from before cancellation.
return b.downloader.blockchain.CurrentSnapBlock()
}

Expand All @@ -92,7 +92,6 @@ func (b *beaconBackfiller) resume() {
return
}
b.filling = true
b.filled = nil
b.started = make(chan struct{})
b.lock.Unlock()

Expand All @@ -103,7 +102,6 @@ func (b *beaconBackfiller) resume() {
defer func() {
b.lock.Lock()
b.filling = false
b.filled = b.downloader.blockchain.CurrentSnapBlock()
b.lock.Unlock()
}()
// If the downloader fails, report an error as in beacon chain mode there
Expand All @@ -113,7 +111,7 @@ func (b *beaconBackfiller) resume() {
return
}
// Synchronization succeeded. Since this happens async, notify the outer
// context to disable snap syncing and enable transaction propagation.
// context to enable transaction propagation.
if b.success != nil {
b.success()
}
Expand Down Expand Up @@ -188,6 +186,8 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) {
log.Error("Failed to retrieve beacon bounds", "err", err)
return 0, err
}
log.Debug("Searching beacon ancestor", "local", number, "beaconhead", beaconHead.Number, "beacontail", beaconTail.Number)

var linked bool
switch d.getMode() {
case ethconfig.FullSync:
Expand Down Expand Up @@ -241,6 +241,7 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) {
}
start = check
}
log.Debug("Found beacon ancestor", "number", start)
return start, nil
}

Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func New(stateDb ethdb.Database, mode ethconfig.SyncMode, mux *event.TypeMux, ch
syncStartBlock: chain.CurrentSnapBlock().Number.Uint64(),
}
// Create the post-merge skeleton syncer and start the process
dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success))
dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success), chain)

go dl.stateFetcher()
return dl
Expand Down
135 changes: 85 additions & 50 deletions eth/downloader/skeleton.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ type backfiller interface {
type skeleton struct {
db ethdb.Database // Database backing the skeleton
filler backfiller // Chain syncer suspended/resumed by head events
chain chainReader // Underlying block chain

peers *peerSet // Set of peers we can sync from
idles map[string]*peerConnection // Set of idle peers in the current sync cycle
Expand All @@ -231,12 +232,19 @@ type skeleton struct {
syncStarting func() // callback triggered after a sync cycle is inited but before started
}

// chainReader wraps the method to retrieve the head of the local chain. It is
// the only view of the block chain the skeleton syncer depends on, which keeps
// the dependency small enough to be stubbed out in tests.
type chainReader interface {
// CurrentSnapBlock retrieves the head snap block from the local chain.
// The skeleton compares this head against the candidate link point when
// deciding whether it is linked with the local chain.
CurrentSnapBlock() *types.Header
}

// newSkeleton creates a new sync skeleton that tracks a potentially dangling
// header chain until it's linked into an existing set of blocks.
func newSkeleton(db ethdb.Database, peers *peerSet, drop peerDropFn, filler backfiller) *skeleton {
func newSkeleton(db ethdb.Database, peers *peerSet, drop peerDropFn, filler backfiller, chain chainReader) *skeleton {
sk := &skeleton{
db: db,
filler: filler,
chain: chain,
peers: peers,
drop: drop,
requests: make(map[uint64]*headerRequest),
Expand Down Expand Up @@ -354,6 +362,29 @@ func (s *skeleton) Sync(head *types.Header, final *types.Header, force bool) err
}
}

// linked reports whether the skeleton has been linked with the local chain at
// the block identified by number and hash.
//
// The block counts as linked only if its header, body and receipts are all
// present in the database and, when the chain reports a snap head, the block
// is not above that head.
func (s *skeleton) linked(number uint64, hash common.Hash) bool {
	// The cascading checks are kept in this order to minimize database reads:
	// the first missing component short-circuits the rest.
	if !rawdb.HasHeader(s.db, hash, number) ||
		!rawdb.HasBody(s.db, hash, number) ||
		!rawdb.HasReceipts(s.db, hash, number) {
		return false
	}
	// Ensure the skeleton chain links to the local chain below the chain head.
	// This accounts for edge cases where leftover chain segments above the head
	// may still link to the skeleton chain. In such cases, synchronization is
	// likely to fail due to potentially missing segments in the middle.
	//
	// You can try to produce the edge case by these steps:
	// - sync the chain
	// - debug.setHead(`0x1`)
	// - kill the geth process (the chain segment will be left with chain head rewound)
	// - restart
	//
	// The head is read exactly once so that the nil check and the number
	// comparison are guaranteed to operate on the same header.
	if head := s.chain.CurrentSnapBlock(); head != nil {
		return head.Number.Uint64() >= number
	}
	return true
}

// sync is the internal version of Sync that executes a single sync cycle, either
// until some termination condition is reached, or until the current cycle merges
// with a previously aborted run.
Expand All @@ -378,10 +409,7 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {

// If the sync is already done, resume the backfiller. When the loop stops,
// terminate the backfiller too.
linked := len(s.progress.Subchains) == 1 &&
rawdb.HasHeader(s.db, s.progress.Subchains[0].Next, s.scratchHead) &&
rawdb.HasBody(s.db, s.progress.Subchains[0].Next, s.scratchHead) &&
rawdb.HasReceipts(s.db, s.progress.Subchains[0].Next, s.scratchHead)
linked := len(s.progress.Subchains) == 1 && s.linked(s.scratchHead, s.progress.Subchains[0].Next)
if linked {
s.filler.resume()
}
Expand Down Expand Up @@ -497,12 +525,7 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
// is still running, it will pick it up. If it already terminated,
// a new cycle needs to be spun up.
if linked {
linked = len(s.progress.Subchains) == 1 &&
rawdb.HasHeader(s.db, s.progress.Subchains[0].Next, s.scratchHead) &&
rawdb.HasBody(s.db, s.progress.Subchains[0].Next, s.scratchHead) &&
rawdb.HasReceipts(s.db, s.progress.Subchains[0].Next, s.scratchHead)

if linked {
if len(s.progress.Subchains) == 1 && s.linked(s.scratchHead, s.progress.Subchains[0].Next) {
// The skeleton chain has been extended and is still linked with the local
// chain, try to re-schedule the backfiller if it's already terminated.
s.filler.resume()
Expand Down Expand Up @@ -946,6 +969,45 @@ func (s *skeleton) revertRequest(req *headerRequest) {
s.scratchOwners[(s.scratchHead-req.head)/requestHeaders] = ""
}

// mergeSubchains reconciles the primary subchain with the older subchains that
// follow it, and is meant to run after new beacon headers have been persisted.
// Any older subchain whose head reaches into the primary subchain is either
// dropped (fully covered), or trimmed and, if it turns out to continue the
// primary subchain, folded into it.
//
// The returned flag is true if at least one older subchain was folded into the
// primary subchain during this call.
func (s *skeleton) mergeSubchains() bool {
	merged := false
	for {
		// Nothing to reconcile once only the primary subchain is left, or the
		// following subchain starts strictly below the primary one's tail.
		if len(s.progress.Subchains) <= 1 || s.progress.Subchains[1].Head < s.progress.Subchains[0].Tail {
			return merged
		}
		// Remember the untouched bounds of the following subchain for logging
		var (
			oldHead = s.progress.Subchains[1].Head
			oldTail = s.progress.Subchains[1].Tail
			oldNext = s.progress.Subchains[1].Next
		)
		// Part of the following subchain was just overwritten, so its head has
		// to be cut back regardless of whether the contents match. If even its
		// tail is covered, the whole subchain is discarded.
		if oldTail >= s.progress.Subchains[0].Tail {
			log.Debug("Previous subchain fully overwritten", "head", oldHead, "tail", oldTail, "next", oldNext)
			s.progress.Subchains = append(s.progress.Subchains[:1], s.progress.Subchains[2:]...)
			continue
		}
		// Only the upper part was covered, cut the head back to just below the
		// primary subchain's tail.
		log.Debug("Previous subchain partially overwritten", "head", oldHead, "tail", oldTail, "next", oldNext)
		s.progress.Subchains[1].Head = s.progress.Subchains[0].Tail - 1

		// If the trimmed subchain is what the primary subchain expects as its
		// parent, join the two; the caller is then expected to restart the
		// skeleton syncer to clean its internal state.
		trimmedHead := rawdb.ReadSkeletonHeader(s.db, s.progress.Subchains[1].Head)
		if trimmedHead.Hash() != s.progress.Subchains[0].Next {
			continue
		}
		log.Debug("Previous subchain merged", "head", oldHead, "tail", oldTail, "next", oldNext)
		s.progress.Subchains[0].Tail = s.progress.Subchains[1].Tail
		s.progress.Subchains[0].Next = s.progress.Subchains[1].Next

		s.progress.Subchains = append(s.progress.Subchains[:1], s.progress.Subchains[2:]...)
		merged = true
	}
}

func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged bool) {
res.peer.log.Trace("Processing header response", "head", res.headers[0].Number, "hash", res.headers[0].Hash(), "count", len(res.headers))

Expand Down Expand Up @@ -1019,10 +1081,9 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged boo
// processing is done, so it's just one more "needless" check.
//
// The weird cascading checks are done to minimize the database reads.
linked = rawdb.HasHeader(s.db, header.ParentHash, header.Number.Uint64()-1) &&
rawdb.HasBody(s.db, header.ParentHash, header.Number.Uint64()-1) &&
rawdb.HasReceipts(s.db, header.ParentHash, header.Number.Uint64()-1)
linked = s.linked(header.Number.Uint64()-1, header.ParentHash)
if linked {
log.Debug("Primary subchain linked", "number", header.Number.Uint64()-1, "hash", header.ParentHash)
break
}
}
Expand All @@ -1036,6 +1097,9 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged boo
// If the beacon chain was linked to the local chain, completely swap out
// all internal progress and abort header synchronization.
if linked {
// Merge all overlapped subchains beforehand
s.mergeSubchains()

// Linking into the local chain should also mean that there are no
// leftover subchains, but in the case of importing the blocks via
// the engine API, we will not push the subchains forward. This will
Expand Down Expand Up @@ -1093,41 +1157,10 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged boo

s.scratchHead -= uint64(consumed)

// If the subchain extended into the next subchain, we need to handle
// the overlap. Since there could be many overlaps (come on), do this
// in a loop.
for len(s.progress.Subchains) > 1 && s.progress.Subchains[1].Head >= s.progress.Subchains[0].Tail {
// Extract some stats from the second subchain
head := s.progress.Subchains[1].Head
tail := s.progress.Subchains[1].Tail
next := s.progress.Subchains[1].Next

// Since we just overwrote part of the next subchain, we need to trim
// its head independent of matching or mismatching content
if s.progress.Subchains[1].Tail >= s.progress.Subchains[0].Tail {
// Fully overwritten, get rid of the subchain as a whole
log.Debug("Previous subchain fully overwritten", "head", head, "tail", tail, "next", next)
s.progress.Subchains = append(s.progress.Subchains[:1], s.progress.Subchains[2:]...)
continue
} else {
// Partially overwritten, trim the head to the overwritten size
log.Debug("Previous subchain partially overwritten", "head", head, "tail", tail, "next", next)
s.progress.Subchains[1].Head = s.progress.Subchains[0].Tail - 1
}
// If the old subchain is an extension of the new one, merge the two
// and let the skeleton syncer restart (to clean internal state)
if rawdb.ReadSkeletonHeader(s.db, s.progress.Subchains[1].Head).Hash() == s.progress.Subchains[0].Next {
log.Debug("Previous subchain merged", "head", head, "tail", tail, "next", next)
s.progress.Subchains[0].Tail = s.progress.Subchains[1].Tail
s.progress.Subchains[0].Next = s.progress.Subchains[1].Next

s.progress.Subchains = append(s.progress.Subchains[:1], s.progress.Subchains[2:]...)
merged = true
}
}
// If subchains were merged, all further available headers in the scratch
// space are invalid since we skipped ahead. Stop processing the scratch
// space to avoid dropping peers thinking they delivered invalid data.
merged = s.mergeSubchains()
if merged {
break
}
Expand Down Expand Up @@ -1158,15 +1191,17 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged boo
// due to the downloader backfilling past the tracked tail.
func (s *skeleton) cleanStales(filled *types.Header) error {
number := filled.Number.Uint64()
log.Trace("Cleaning stale beacon headers", "filled", number, "hash", filled.Hash())
log.Debug("Cleaning stale beacon headers", "filled", number, "hash", filled.Hash())

// If the filled header is below the linked subchain, something's corrupted
// internally. Report and error and refuse to do anything.
// If the filled header is below the subchain, it means the skeleton is not
// linked with local chain yet, don't bother to do cleanup.
if number+1 < s.progress.Subchains[0].Tail {
return fmt.Errorf("filled header below beacon header tail: %d < %d", number, s.progress.Subchains[0].Tail)
log.Debug("filled header below beacon header tail", "filled", number, "tail", s.progress.Subchains[0].Tail)
return nil
}
// If nothing in subchain is filled, don't bother to do cleanup.
if number+1 == s.progress.Subchains[0].Tail {
log.Debug("Skeleton chain not yet consumed", "filled", number, "hash", filled.Hash(), "tail", s.progress.Subchains[0].Tail)
return nil
}
// If the latest fill was on a different subchain, it means the backfiller
Expand Down
13 changes: 10 additions & 3 deletions eth/downloader/skeleton_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"math/big"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -71,6 +72,12 @@ func (hf *hookedBackfiller) resume() {
}
}

// fakeChainReader is a stateless chain stub handed to newSkeleton in the tests.
type fakeChainReader struct{}

// CurrentSnapBlock reports a fixed head at block number math.MaxInt64, the
// largest number representable as an int64.
func (fc *fakeChainReader) CurrentSnapBlock() *types.Header {
	head := new(types.Header)
	head.Number = new(big.Int).SetInt64(math.MaxInt64)
	return head
}

// skeletonTestPeer is a mock peer that can only serve header requests from a
// pre-generated header chain (which may be arbitrarily wrong for testing).
//
Expand Down Expand Up @@ -369,7 +376,7 @@ func TestSkeletonSyncInit(t *testing.T) {
// Create a skeleton sync and run a cycle
wait := make(chan struct{})

skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller())
skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller(), &fakeChainReader{})
skeleton.syncStarting = func() { close(wait) }
skeleton.Sync(tt.head, nil, true)

Expand Down Expand Up @@ -472,7 +479,7 @@ func TestSkeletonSyncExtend(t *testing.T) {
// Create a skeleton sync and run a cycle
wait := make(chan struct{})

skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller())
skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller(), &fakeChainReader{})
skeleton.syncStarting = func() { close(wait) }
skeleton.Sync(tt.head, nil, true)

Expand Down Expand Up @@ -885,7 +892,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
}
}
// Create a skeleton sync and run a cycle
skeleton := newSkeleton(db, peerset, drop, filler)
skeleton := newSkeleton(db, peerset, drop, filler, &fakeChainReader{})
skeleton.Sync(tt.head, nil, true)

// Wait a bit (bleah) for the initial sync loop to go to idle. This might
Expand Down