From f861aee7259ff86d3248483c63eccec554a4d54e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 30 Sep 2021 14:51:28 +0300 Subject: [PATCH 1/5] eth/downloader: implement beacon sync --- core/rawdb/accessors_sync.go | 80 +++ core/rawdb/schema.go | 9 + eth/api.go | 10 + eth/downloader/beaconsync.go | 221 ++++++ eth/downloader/downloader.go | 176 +++-- eth/downloader/downloader_test.go | 6 +- eth/downloader/fetchers_concurrent.go | 6 +- eth/downloader/peer.go | 12 +- eth/downloader/skeleton.go | 965 ++++++++++++++++++++++++++ eth/downloader/skeleton_test.go | 257 +++++++ eth/handler.go | 22 +- eth/sync.go | 8 +- internal/web3ext/web3ext.go | 5 + 13 files changed, 1694 insertions(+), 83 deletions(-) create mode 100644 core/rawdb/accessors_sync.go create mode 100644 eth/downloader/beaconsync.go create mode 100644 eth/downloader/skeleton.go create mode 100644 eth/downloader/skeleton_test.go diff --git a/core/rawdb/accessors_sync.go b/core/rawdb/accessors_sync.go new file mode 100644 index 000000000000..50dfb848e4e0 --- /dev/null +++ b/core/rawdb/accessors_sync.go @@ -0,0 +1,80 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "bytes" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" +) + +// ReadSkeletonSyncStatus retrieves the serialized sync status saved at shutdown. +func ReadSkeletonSyncStatus(db ethdb.KeyValueReader) []byte { + data, _ := db.Get(skeletonSyncStatusKey) + return data +} + +// WriteSkeletonSyncStatus stores the serialized sync status to save at shutdown. +func WriteSkeletonSyncStatus(db ethdb.KeyValueWriter, status []byte) { + if err := db.Put(skeletonSyncStatusKey, status); err != nil { + log.Crit("Failed to store skeleton sync status", "err", err) + } +} + +// DeleteSkeletonSyncStatus deletes the serialized sync status saved at the last +// shutdown +func DeleteSkeletonSyncStatus(db ethdb.KeyValueWriter) { + if err := db.Delete(skeletonSyncStatusKey); err != nil { + log.Crit("Failed to remove skeleton sync status", "err", err) + } +} + +// ReadSkeletonHeader retrieves a block header from the skeleton sync store, +func ReadSkeletonHeader(db ethdb.KeyValueReader, number uint64) *types.Header { + data, _ := db.Get(skeletonHeaderKey(number)) + if len(data) == 0 { + return nil + } + header := new(types.Header) + if err := rlp.Decode(bytes.NewReader(data), header); err != nil { + log.Error("Invalid skeleton header RLP", "number", number, "err", err) + return nil + } + return header +} + +// WriteSkeletonHeader stores a block header into the skeleton sync store. +func WriteSkeletonHeader(db ethdb.KeyValueWriter, header *types.Header) { + data, err := rlp.EncodeToBytes(header) + if err != nil { + log.Crit("Failed to RLP encode header", "err", err) + } + key := skeletonHeaderKey(header.Number.Uint64()) + if err := db.Put(key, data); err != nil { + log.Crit("Failed to store skeleton header", "err", err) + } +} + +// DeleteSkeletonHeader removes all block header data associated with a hash. +func DeleteSkeletonHeader(db ethdb.KeyValueWriter, number uint64) { + if err := db.Delete(skeletonHeaderKey(number)); err != nil { + log.Crit("Failed to delete skeleton header", "err", err) + } +} diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index b35fcba45f79..f869f36e1040 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -63,6 +63,9 @@ var ( // snapshotSyncStatusKey tracks the snapshot sync status across restarts. snapshotSyncStatusKey = []byte("SnapshotSyncStatus") + // skeletonSyncStatusKey tracks the skeleton sync status across restarts. + skeletonSyncStatusKey = []byte("SkeletonSyncStatus") + // txIndexTailKey tracks the oldest block whose transactions have been indexed. txIndexTailKey = []byte("TransactionIndexTail") @@ -92,6 +95,7 @@ var ( SnapshotAccountPrefix = []byte("a") // SnapshotAccountPrefix + account hash -> account trie value SnapshotStoragePrefix = []byte("o") // SnapshotStoragePrefix + account hash + storage hash -> storage trie value CodePrefix = []byte("c") // CodePrefix + code hash -> account code + skeletonHeaderPrefix = []byte("S") // skeletonHeaderPrefox + num (uint64 big endian) -> header PreimagePrefix = []byte("secure-key-") // PreimagePrefix + hash -> preimage configPrefix = []byte("ethereum-config-") // config prefix for the db @@ -210,6 +214,11 @@ func bloomBitsKey(bit uint, section uint64, hash common.Hash) []byte { return key } +// skeletonHeaderKey = skeletonHeaderPrefix + num (uint64 big endian) +func skeletonHeaderKey(number uint64) []byte { + return append(skeletonHeaderPrefix, encodeBlockNumber(number)...) +} + // preimageKey = PreimagePrefix + hash func preimageKey(hash common.Hash) []byte { return append(PreimagePrefix, hash.Bytes()...) diff --git a/eth/api.go b/eth/api.go index f81dfa922b7a..bc2a615ad1dd 100644 --- a/eth/api.go +++ b/eth/api.go @@ -257,6 +257,16 @@ func (api *PrivateAdminAPI) ImportChain(file string) (bool, error) { return true, nil } +// NewHead requests the node to beacon-sync to the designated head header. +func (api *PrivateAdminAPI) NewHead(blob hexutil.Bytes) error { + header := new(types.Header) + if err := rlp.DecodeBytes(blob, header); err != nil { + return err + } + mode, _ := api.eth.handler.chainSync.modeAndLocalHead() + return api.eth.Downloader().BeaconSync(mode, header) +} + // PublicDebugAPI is the collection of Ethereum full node APIs exposed // over the public debugging endpoint. type PublicDebugAPI struct { diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go new file mode 100644 index 000000000000..6c0b76016053 --- /dev/null +++ b/eth/downloader/beaconsync.go @@ -0,0 +1,221 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package downloader + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" +) + +// beaconBackfiller is the chain and state backfilling that can be commenced once +// the skeleton syncer has successfully reverse downloaded all the headers up to +// the genesis block or an existing header in the database. Its operation is fully +// directed by the skeleton sync's head/tail events. +type beaconBackfiller struct { + downloader *Downloader // Downloader to direct via this callback implementation + syncMode SyncMode // Sync mode to use for backfilling the skeleton chains + success func() // Callback to run on successful sync cycle completion + filling bool // Flag whether the downloader is backfilling or not + lock sync.Mutex // Mutex protecting the sync lock +} + +// newBeaconBackfiller is a helper method to create the backfiller. +func newBeaconBackfiller(dl *Downloader, success func()) backfiller { + return &beaconBackfiller{ + downloader: dl, + success: success, + } +} + +// suspend cancels any background downloader threads. +func (b *beaconBackfiller) suspend() { + b.downloader.Cancel() +} + +// resume starts the downloader threads for backfilling state and chain data. +func (b *beaconBackfiller) resume() { + b.lock.Lock() + if b.filling { + // If a previous filling cycle is still running, just ignore this start + // request. // TODO(karalabe): We should make this channel driven + b.lock.Unlock() + return + } + b.filling = true + mode := b.syncMode + b.lock.Unlock() + + // Start the backfilling on its own thread since the downloader does not have + // its own lifecycle runloop. + go func() { + // Set the backfiller to non-filling when download completes + defer func() { + b.lock.Lock() + b.filling = false + b.lock.Unlock() + }() + // If the downloader fails, report an error as in beacon chain mode there + // should be no errors as long as the chain we're syncing to is valid. + if err := b.downloader.synchronise("", common.Hash{}, nil, mode, true); err != nil { + log.Error("Beacon backfilling failed", "err", err) + return + } + // Synchronization succeeded. Since this happens async, notify the outer + // context to disable snap syncing and enable transaction propagation. + if b.success != nil { + b.success() + } + }() +} + +// setMode updates the sync mode from the current one to the requested one. If +// there's an active sync in progress, it will be cancelled and restarted. +func (b *beaconBackfiller) setMode(mode SyncMode) { + // Update the old sync mode and track if it was changed + b.lock.Lock() + updated := b.syncMode != mode + filling := b.filling + b.syncMode = mode + b.lock.Unlock() + + // If the sync mode was changed mid-sync, restart. This should never ever + // really happen, we just handle it to detect programming errors. + if !updated || !filling { + return + } + log.Error("Downloader sync mode changed mid-run", "old", mode.String(), "new", mode.String()) + b.suspend() + b.resume() +} + +// BeaconSync is the Ethereum 2 version of the chain synchronization, where the +// chain is not downloaded from genesis onward, rather from trusted head announces +// backwards. +// +// Internally backfilling and state sync is done the same way, but the header +// retrieval and scheduling is replaced. +func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header) error { + // When the downloader starts a sync cycle, it needs to be aware of the sync + // mode to use (full, snap). To keep the skeleton chain oblivious, inject the + // mode into the backfiller directly. + // + // Super crazy dangerous type cast. Should be fine (TM), we're only using a + // different backfiller implementation for skeleton tests. + d.skeleton.filler.(*beaconBackfiller).setMode(mode) + + // Signal the skeleton sync to switch to a new head, however it wants + if err := d.skeleton.Sync(head); err != nil { + return err + } + return nil +} + +// findBeaconAncestor tries to locate the common ancestor link of the local chain +// and the beacon chain just requested. In the general case when our node was in +// sync and on the correct chain, checking the top N links should already get us +// a match. In the rare scenario when we ended up on a long reorganisation (i.e. +// none of the head links match), we do a binary search to find the ancestor. +func (d *Downloader) findBeaconAncestor() uint64 { + // Figure out the current local head position + var head *types.Header + + switch d.getMode() { + case FullSync: + head = d.blockchain.CurrentBlock().Header() + case SnapSync: + head = d.blockchain.CurrentFastBlock().Header() + default: + head = d.lightchain.CurrentHeader() + } + number := head.Number.Uint64() + + // If the head is present in the skeleton chain, return that + if head.Hash() == d.skeleton.Header(number).Hash() { + return number + } + // Head header not present, binary search to find the ancestor + start, end := uint64(0), number + for start+1 < end { + // Split our chain interval in two, and request the hash to cross check + check := (start + end) / 2 + + h := d.skeleton.Header(check) + n := h.Number.Uint64() + + var known bool + switch d.getMode() { + case FullSync: + known = d.blockchain.HasBlock(h.Hash(), n) + case SnapSync: + known = d.blockchain.HasFastBlock(h.Hash(), n) + default: + known = d.lightchain.HasHeader(h.Hash(), n) + } + if !known { + end = check + continue + } + start = check + } + return start +} + +// fetchBeaconHeaders feeds skeleton headers to the downloader queue for scheduling +// until sync errors or is finished. +func (d *Downloader) fetchBeaconHeaders(from uint64) error { + head, err := d.skeleton.Head() + if err != nil { + return err + } + for { + // Retrieve a batch of headers and feed it to the header processor + headers := make([]*types.Header, 0, maxHeadersProcess) + for i := 0; i < maxHeadersProcess && from <= head.Number.Uint64(); i++ { + headers = append(headers, d.skeleton.Header(from)) + from++ + } + select { + case d.headerProcCh <- headers: + case <-d.cancelCh: + return errCanceled + } + // If we still have headers to import, loop and keep pushing them + if from <= head.Number.Uint64() { + continue + } + // If the pivot block is committed, signal header sync termination + if atomic.LoadInt32(&d.committed) == 1 { + d.headerProcCh <- nil + return nil + } + // State sync still going, wait a bit for new headers and retry + select { + case <-time.After(fsHeaderContCheck): + case <-d.cancelCh: + return errCanceled + } + head, err = d.skeleton.Head() + if err != nil { + return err + } + } +} diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 28ad18b81579..6aaae32d0a1b 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -30,7 +30,6 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -123,6 +122,9 @@ type Downloader struct { // Channels headerProcCh chan *headerTask // Channel to feed the header processor new tasks + // Skeleton sync + skeleton *skeleton // Header skeleton to backfill the chain with (eth2 mode) + // State sync pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root pivotLock sync.RWMutex // Lock protecting pivot header reads from updates @@ -201,7 +203,7 @@ type BlockChain interface { } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader { +func New(checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, success func()) *Downloader { if lightchain == nil { lightchain = chain } @@ -219,6 +221,8 @@ func New(checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain Bl SnapSyncer: snap.NewSyncer(stateDb), stateSyncStart: make(chan *stateSync), } + dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success)) + go dl.stateFetcher() return dl } @@ -318,10 +322,10 @@ func (d *Downloader) UnregisterPeer(id string) error { return nil } -// Synchronise tries to sync up our local block chain with a remote peer, both +// LegacySync tries to sync up our local block chain with a remote peer, both // adding various sanity checks as well as wrapping it with various log entries. -func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error { - err := d.synchronise(id, head, td, mode) +func (d *Downloader) LegacySync(id string, head common.Hash, td *big.Int, mode SyncMode) error { + err := d.synchronise(id, head, td, mode, false) switch err { case nil, errBusy, errCanceled: @@ -347,7 +351,7 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode // synchronise will select the peer and use it for synchronising. If an empty string is given // it will use the best peer possible and synchronize if its TD is higher than our own. If any of the // checks fail an error will be returned. This method is synchronous -func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error { +func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode, beaconMode bool) error { // Mock out the synchronisation if testing if d.synchroniseMock != nil { return d.synchroniseMock(id, hash) @@ -402,11 +406,14 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode atomic.StoreUint32(&d.mode, uint32(mode)) // Retrieve the origin peer and initiate the downloading process - p := d.peers.Peer(id) - if p == nil { - return errUnknownPeer + var p *peerConnection + if !beaconMode { // Beacon mode doesn't need a peer to sync from + p = d.peers.Peer(id) + if p == nil { + return errUnknownPeer + } } - return d.syncWithPeer(p, hash, td) + return d.syncWithPeer(p, hash, td, beaconMode) } func (d *Downloader) getMode() SyncMode { @@ -415,7 +422,7 @@ func (d *Downloader) getMode() SyncMode { // syncWithPeer starts a block synchronization based on the hash chain from the // specified peer and head hash. -func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) { +func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int, beaconMode bool) (err error) { d.mux.Post(StartEvent{}) defer func() { // reset on error @@ -426,33 +433,57 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I d.mux.Post(DoneEvent{latest}) } }() - if p.version < eth.ETH66 { - return fmt.Errorf("%w: advertized %d < required %d", errTooOld, p.version, eth.ETH66) - } mode := d.getMode() - log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", mode) + if !beaconMode { + log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", mode) + } else { + log.Debug("Backfilling with the network", "mode", mode) + } defer func(start time.Time) { log.Debug("Synchronisation terminated", "elapsed", common.PrettyDuration(time.Since(start))) }(time.Now()) // Look up the sync boundaries: the common ancestor and the target block - latest, pivot, err := d.fetchHead(p) - if err != nil { - return err - } - if mode == SnapSync && pivot == nil { - // If no pivot block was returned, the head is below the min full block - // threshold (i.e. new chain). In that case we won't really snap sync - // anyway, but still need a valid pivot block to avoid some code hitting - // nil panics on an access. - pivot = d.blockchain.CurrentBlock().Header() + var latest, pivot *types.Header + if !beaconMode { + // In legacy mode, use the master peer to retrieve the headers from + latest, pivot, err = d.fetchHead(p) + if err != nil { + return err + } + if mode == SnapSync && pivot == nil { + // If no pivot block was returned, the head is below the min full block + // threshold (i.e. new chain). In that case we won't really snap sync + // anyway, but still need a valid pivot block to avoid some code hitting + // nil panics on an access. + pivot = d.blockchain.CurrentBlock().Header() + } + } else { + // In beacon mode, user the skeleton chain to retrieve the headers from + latest, err = d.skeleton.Head() + if err != nil { + return err + } + // Opposed to legacy mode, in beacon mode we trust the chain we've been + // told to sync to, so no need to leave a gap between the pivot and head + // to full sync. Still, the downloader's been architected to do a full + // block import after the pivot, so make it off by one to avoid having + // to special case everything internally. + pivot = d.skeleton.Header(latest.Number.Uint64() - 1) } height := latest.Number.Uint64() - origin, err := d.findAncestor(p, latest) - if err != nil { - return err + var origin uint64 + if !beaconMode { + // In legacy mode, reach out to the network and find the ancestor + origin, err = d.findAncestor(p, latest) + if err != nil { + return err + } + } else { + // In beacon mode, use the skeleton chain for the ancestor lookup + origin = d.findBeaconAncestor() } d.syncStatsLock.Lock() if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin { @@ -523,11 +554,19 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I if d.syncInitHook != nil { d.syncInitHook(origin, height) } + var headerFetcher func() error + if !beaconMode { + // In legacy mode, headers are retrieved from the network + headerFetcher = func() error { return d.fetchHeaders(p, origin+1) } + } else { + // In beacon mode, headers are served by the skeleton syncer + headerFetcher = func() error { return d.fetchBeaconHeaders(origin + 1) } + } fetchers := []func() error{ - func() error { return d.fetchHeaders(p, origin+1, latest.Number.Uint64()) }, // Headers are always retrieved - func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and snap sync - func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during snap sync - func() error { return d.processHeaders(origin+1, td) }, + headerFetcher, // Headers are always retrieved + func() error { return d.fetchBodies(origin+1, beaconMode) }, // Bodies are retrieved during normal and snap sync + func() error { return d.fetchReceipts(origin+1, beaconMode) }, // Receipts are retrieved during snap sync + func() error { return d.processHeaders(origin+1, td, beaconMode) }, } if mode == SnapSync { d.pivotLock.Lock() @@ -1127,7 +1166,7 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ( log.Debug("Filling up skeleton", "from", from) d.queue.ScheduleSkeleton(from, skeleton) - err := d.concurrentFetch((*headerQueue)(d)) + err := d.concurrentFetch((*headerQueue)(d), false) if err != nil { log.Debug("Skeleton fill failed", "err", err) } @@ -1141,9 +1180,9 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ( // fetchBodies iteratively downloads the scheduled block bodies, taking any // available peers, reserving a chunk of blocks for each, waiting for delivery // and also periodically checking for timeouts. -func (d *Downloader) fetchBodies(from uint64) error { +func (d *Downloader) fetchBodies(from uint64, beaconMode bool) error { log.Debug("Downloading block bodies", "origin", from) - err := d.concurrentFetch((*bodyQueue)(d)) + err := d.concurrentFetch((*bodyQueue)(d), beaconMode) log.Debug("Block body download terminated", "err", err) return err @@ -1152,9 +1191,9 @@ func (d *Downloader) fetchBodies(from uint64) error { // fetchReceipts iteratively downloads the scheduled block receipts, taking any // available peers, reserving a chunk of receipts for each, waiting for delivery // and also periodically checking for timeouts. -func (d *Downloader) fetchReceipts(from uint64) error { +func (d *Downloader) fetchReceipts(from uint64, beaconMode bool) error { log.Debug("Downloading receipts", "origin", from) - err := d.concurrentFetch((*receiptQueue)(d)) + err := d.concurrentFetch((*receiptQueue)(d), beaconMode) log.Debug("Receipt download terminated", "err", err) return err @@ -1163,7 +1202,7 @@ func (d *Downloader) fetchReceipts(from uint64) error { // processHeaders takes batches of retrieved headers from an input channel and // keeps processing and scheduling them into the header chain and downloader's // queue until the stream ends or a failure occurs. -func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { +func (d *Downloader) processHeaders(origin uint64, td *big.Int, beaconMode bool) error { // Keep a count of uncertain headers to roll back var ( rollback uint64 // Zero means no rollback (fine as you can't unroll the genesis) @@ -1211,35 +1250,40 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { case <-d.cancelCh: } } - // If no headers were retrieved at all, the peer violated its TD promise that it had a - // better chain compared to ours. The only exception is if its promised blocks were - // already imported by other means (e.g. fetcher): - // - // R , L : Both at block 10 - // R: Mine block 11, and propagate it to L - // L: Queue block 11 for import - // L: Notice that R's head and TD increased compared to ours, start sync - // L: Import of block 11 finishes - // L: Sync begins, and finds common ancestor at 11 - // L: Request new headers up from 11 (R's TD was higher, it must have something) - // R: Nothing to give - if mode != LightSync { - head := d.blockchain.CurrentBlock() - if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 { - return errStallingPeer + // If we're in legacy sync mode, we need to check total difficulty + // violations from malicious peers. That is not needed in beacon + // mode and we can skip to terminating sync. + if !beaconMode { + // If no headers were retrieved at all, the peer violated its TD promise that it had a + // better chain compared to ours. The only exception is if its promised blocks were + // already imported by other means (e.g. fetcher): + // + // R , L : Both at block 10 + // R: Mine block 11, and propagate it to L + // L: Queue block 11 for import + // L: Notice that R's head and TD increased compared to ours, start sync + // L: Import of block 11 finishes + // L: Sync begins, and finds common ancestor at 11 + // L: Request new headers up from 11 (R's TD was higher, it must have something) + // R: Nothing to give + if mode != LightSync { + head := d.blockchain.CurrentBlock() + if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 { + return errStallingPeer + } } - } - // If snap or light syncing, ensure promised headers are indeed delivered. This is - // needed to detect scenarios where an attacker feeds a bad pivot and then bails out - // of delivering the post-pivot blocks that would flag the invalid content. - // - // This check cannot be executed "as is" for full imports, since blocks may still be - // queued for processing when the header download completes. However, as long as the - // peer gave us something useful, we're already happy/progressed (above check). - if mode == SnapSync || mode == LightSync { - head := d.lightchain.CurrentHeader() - if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 { - return errStallingPeer + // If snap or light syncing, ensure promised headers are indeed delivered. This is + // needed to detect scenarios where an attacker feeds a bad pivot and then bails out + // of delivering the post-pivot blocks that would flag the invalid content. + // + // This check cannot be executed "as is" for full imports, since blocks may still be + // queued for processing when the header download completes. However, as long as the + // peer gave us something useful, we're already happy/progressed (above check). + if mode == SnapSync || mode == LightSync { + head := d.lightchain.CurrentHeader() + if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 { + return errStallingPeer + } } } // Disable any rollback and return diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 70c6a51215b5..b3ffe35e85f8 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -75,7 +75,7 @@ func newTester() *downloadTester { chain: chain, peers: make(map[string]*downloadTesterPeer), } - tester.downloader = New(0, db, new(event.TypeMux), tester.chain, nil, tester.dropPeer) + tester.downloader = New(0, db, new(event.TypeMux), tester.chain, nil, tester.dropPeer, nil) return tester } @@ -96,7 +96,7 @@ func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error { td = dl.peers[id].chain.GetTd(head.Hash(), head.NumberU64()) } // Synchronise with the chosen peer and ensure proper cleanup afterwards - err := dl.downloader.synchronise(id, head.Hash(), td, mode) + err := dl.downloader.synchronise(id, head.Hash(), td, mode, false) select { case <-dl.downloader.cancelCh: // Ok, downloader fully cancelled after sync cycle @@ -971,7 +971,7 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol uint) { // Simulate a synchronisation and check the required result tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result } - tester.downloader.Synchronise(id, tester.chain.Genesis().Hash(), big.NewInt(1000), FullSync) + tester.downloader.LegacySync(id, tester.chain.Genesis().Hash(), big.NewInt(1000), FullSync) if _, ok := tester.peers[id]; !ok != tt.drop { t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop) } diff --git a/eth/downloader/fetchers_concurrent.go b/eth/downloader/fetchers_concurrent.go index 4bade2b4c3dd..a0aa197175a3 100644 --- a/eth/downloader/fetchers_concurrent.go +++ b/eth/downloader/fetchers_concurrent.go @@ -76,7 +76,7 @@ type typedQueue interface { // concurrentFetch iteratively downloads scheduled block parts, taking available // peers, reserving a chunk of fetch requests for each and waiting for delivery // or timeouts. -func (d *Downloader) concurrentFetch(queue typedQueue) error { +func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error { // Create a delivery channel to accept responses from all peers responses := make(chan *eth.Response) @@ -127,7 +127,7 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error { finished := false for { // Short circuit if we lost all our peers - if d.peers.Len() == 0 { + if d.peers.Len() == 0 && !beaconMode { return errNoPeers } // If there's nothing more to fetch, wait or terminate @@ -209,7 +209,7 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error { } // Make sure that we have peers available for fetching. If all peers have been tried // and all failed throw an error - if !progressed && !throttled && len(pending) == 0 && len(idles) == d.peers.Len() && queued > 0 { + if !progressed && !throttled && len(pending) == 0 && len(idles) == d.peers.Len() && queued > 0 && !beaconMode { return errPeersUnavailable } } diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 324fdb9cd51f..d74d23e74d55 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -294,19 +294,19 @@ func (ps *peerSet) AllPeers() []*peerConnection { // peerCapacitySort implements sort.Interface. // It sorts peer connections by capacity (descending). type peerCapacitySort struct { - p []*peerConnection - tp []int + peers []*peerConnection + caps []int } func (ps *peerCapacitySort) Len() int { - return len(ps.p) + return len(ps.peers) } func (ps *peerCapacitySort) Less(i, j int) bool { - return ps.tp[i] > ps.tp[j] + return ps.caps[i] > ps.caps[j] } func (ps *peerCapacitySort) Swap(i, j int) { - ps.p[i], ps.p[j] = ps.p[j], ps.p[i] - ps.tp[i], ps.tp[j] = ps.tp[j], ps.tp[i] + ps.peers[i], ps.peers[j] = ps.peers[j], ps.peers[i] + ps.caps[i], ps.caps[j] = ps.caps[j], ps.caps[i] } diff --git a/eth/downloader/skeleton.go b/eth/downloader/skeleton.go new file mode 100644 index 000000000000..7c604cbffac7 --- /dev/null +++ b/eth/downloader/skeleton.go @@ -0,0 +1,965 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package downloader + +import ( + "encoding/json" + "errors" + "math/rand" + "sort" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/protocols/eth" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" +) + +// scratchHeaders is the number of headers to store in a scratch space to allow +// concurrent downloads. A header is about 0.5KB in size, so there is no worry +// about using too much memory. The only catch is that we can only validate gaps +// afer they're linked to the head, so the bigger the scratch space, the larger +// potential for invalid headers. +// +// The current scratch space of 131072 headers is expected to use 64MB RAM. +const scratchHeaders = 131072 + +// requestHeaders is the number of header to request from a remote peer in a single +// network packet. Although the skeleton downloader takes into consideration peer +// capacities when picking idlers, the packet size was decided to remain constant +// since headers are relatively small and it's easier to work with fixed batches +// vs. dynamic interval fillings. +const requestHeaders = 512 + +// errSyncLinked is an internal helper error to signal that the current sync +// cycle linked up to the genesis block, this the skeleton syncer should ping +// the backfiller to resume. Since we already have that logic on sync start, +// piggie-back on that instead of 2 entrypoints. +var errSyncLinked = errors.New("sync linked") + +// errSyncMerged is an internal helper error to signal that the current sync +// cycle merged with a previously aborted subchain, thus the skeleton syncer +// should abort and restart with the new state. +var errSyncMerged = errors.New("sync merged") + +// errSyncReorged is an internal helper error to signal that the head chain of +// the current sync cycle was (partially) reorged, thus the skeleton syncer +// should abort and restart with the new state. +var errSyncReorged = errors.New("sync reorged") + +// errTerminated is returned if the sync mechanism was terminated for this run of +// the process. This is usually the case when Geth is shutting down and some events +// might still be propagating. +var errTerminated = errors.New("terminated") + +func init() { + // Tuning parameters is nice, but the scratch space must be assignable in + // full to peers. It's a useless cornercase to support a dangling half-group. + if scratchHeaders%requestHeaders != 0 { + panic("Please make scratchHeaders divisible by requestHeaders") + } +} + +// subchain is a contiguous header chain segment that is backed by the database, +// but may not be linked to the live chain. The skeleton downloader may produce +// a new one of these every time it is restarted until the subchain grows large +// enough to connect with a previous subchain. +// +// The subchains use the exact same database namespace and are not disjoint from +// each other. As such, extending one to overlap the other entails reducing the +// second one first. This combined buffer model is used to avoid having to move +// data on disk when two subchains are joined together. +type subchain struct { + Head uint64 // Block number of the newest header in the subchain + Tail uint64 // Block number of the oldest header in the subchain + Next common.Hash // Block hash of the next oldest header in the subchain +} + +// skeletonProgress is a database entry to allow suspending and resuming a chain +// sync. As the skeleton header chain is downloaded backwards, restarts can and +// will produce temporarilly disjoint subchains. There is no way to restart a +// suspended skeleton sync without prior knowlege of all prior suspension points. +type skeletonProgress struct { + Subchains []*subchain // Disjoint subchains downloaded until now +} + +// headerRequest tracks a pending header request to ensure responses are to +// actual requests and to validate any security constraints. +// +// Concurrency note: header requests and responses are handled concurrently from +// the main runloop to allow Keccak256 hash verifications on the peer's thread and +// to drop on invalid response. The request struct must contain all the data to +// construct the response without accessing runloop internals (i.e. subchains). +// That is only included to allow the runloop to match a response to the task being +// synced without having yet another set of maps. +type headerRequest struct { + peer string // Peer to which this request is assigned + id uint64 // Request ID of this request + + deliver chan *headerResponse // Channel to deliver successful response on + revert chan *headerRequest // Channel to deliver request failure on + cancel chan struct{} // Channel to track sync cancellation + stale chan struct{} // Channel to signal the request was dropped + + head uint64 // Head number of the requested batch of headers +} + +// headerResponse is an already verified remote response to a header request. +type headerResponse struct { + peer *peerConnection // Peer from which this response originates + reqid uint64 // Request ID that this response fulfils + headers []*types.Header // Chain of headers +} + +// backfiller is a callback interface through which the skeleton sync can tell +// the downloader that it should suspend or resume backfilling on specific head +// events (e.g. suspend on forks or gaps, resume on successfull linkups). +type backfiller interface { + // suspend requests the backfiller to abort any running full or snap sync + // based on the skeleton chain as it might be invalid. The backfiller should + // gracefully handle multiple consecutive suspends without a resume, even + // on initial sartup. + suspend() + + // resume requests the backfiller to start running fill or snap sync based on + // the skeleton chain as it has successfully been linked. Appending new heads + // to the end of the chain will not result in suspend/resume cycles. + resume() +} + +// skeleton represents a header chain synchronized after the Ethereum 2 merge, +// where blocks aren't validated any more via PoW in a forward fashion, rather +// are dictated and extended at the head via the beacon chain and backfilled on +// the original Ethereum 1 block sync protocol. +// +// Since the skeleton is grown backwards from head to genesis, it is handled as +// a separate entity, not mixed in with the logical sequential transition of the +// blocks. Once the skeleton is connected to an existing, validated chain, the +// headers will be moved into the main downloader for filling and execution. +// +// Opposed to the Ethereum 1 block synchronization which is trustless (and uses a +// master peer to minimize the attack surface), Ethereum 2 block synchronization +// starts from a trusted head. As such, there is no need for a master peer any +// more and headers can be requested fully concurrently (though some batches might +// be discarded if they don't link up correctly). +// +// Although a skeleton is part of a sync cycle, it is not recreated, rather stays +// alive throughout the lifetime of the downloader. This allows it to be extended +// concurrently with the sync cycle, since extensions arrive from an API surface, +// not from within (vs. Ethereum 1 sync). +// +// Since the skeleton tracks the entire header chain until it is cosumed by the +// forward block filling, it needs 0.5KB/block storage. At current mainnet sizes +// this is only possible with a disk backend. Since the skeleton is separate from +// the node's header chain, storing the headers ephemerally until sync finishes +// is wasted disk IO, but it's a price we're going to pay to keep things simple +// for now. +type skeleton struct { + db ethdb.Database // Database backing the skeleton + filler backfiller // Chain syncer suspended/resumed by head events + + peers *peerSet // Set of peers we can sync from + idles map[string]*peerConnection // Set of idle peers in the current sync cycle + drop peerDropFn // Drops a peer for misbehaving + + progress *skeletonProgress // Sync progress tracker for resumption and metrics + started time.Time // Timestamp when the skeleton syncer was created + logged time.Time // Timestamp when progress was last logged to the user + pulled uint64 // Number of headers downloaded in this run + + scratchSpace []*types.Header // Scratch space to accumulate headers in (first = recent) + scratchOwners []string // Peer IDs owning chunks of the scratch space (pend or delivered) + scratchHead uint64 // Block number of the first item in the scratch space + + requests map[uint64]*headerRequest // Header requests currently running + + headEvents chan *types.Header // Notification channel for new heads + terminate chan chan error // Termination channel to abort sync + terminated chan struct{} // Channel to signal that the syner is dead + + // Callback hooks used during testing + syncStarting func() // callback triggered after a sync cycle is inited but before started +} + +// newSkeleton creates a new sync skeleton that tracks a potentially dangling +// header chain until it's linked into an existing set of blocks. +func newSkeleton(db ethdb.Database, peers *peerSet, drop peerDropFn, filler backfiller) *skeleton { + sk := &skeleton{ + db: db, + filler: filler, + peers: peers, + drop: drop, + requests: make(map[uint64]*headerRequest), + headEvents: make(chan *types.Header), + terminate: make(chan chan error), + terminated: make(chan struct{}), + } + go sk.startup() + return sk +} + +// startup is an initial background loop which waits for an event to start or +// tear the syncer down. This is required to make the skeleton sync loop once +// per process but at the same time not start before the beacon chain announces +// a new (existing) head. +func (s *skeleton) startup() { + // Close a notification channel so anyone sending us events will know if the + // sync loop was torn down for good. + defer close(s.terminated) + + // Wait for startup or teardown + select { + case errc := <-s.terminate: + // No head was announced but Geth is shutting down + errc <- nil + return + + case head := <-s.headEvents: + // New head announced, start syncing to it, looping every time a current + // cycle is terminated due to a chain event (head reorg, old chain merge) + s.started = time.Now() + + for { + // If the sync cycle terminated or was terminated, propagate up when + // higher layers request termination. There's no fancy explicit error + // signalling as the sync loop should never terminate (TM). + newhead, err := s.sync(head) + switch { + case err == errSyncLinked: + // Sync cycle linked up to the genesis block. Tear down the loop + // and restart it so, it can properly notify the backfiller. Don't + // account a new head. + head = nil + + case err == errSyncMerged: + // Subchains were merged, we just need to reinit the internal + // start to continue on the tail of the merged chain. Don't + // announce a new head, + head = nil + + case err == errSyncReorged: + // The subchain being synced got modified at the head in a + // way that requires resyncing it. Restart sync with the new + // head to force a cleanup. + head = newhead + + case err == errTerminated: + // Sync was requested to be terminated from within, stop and + // return (no need to pass a message, was already done internally) + return + + default: + // Sync either successfully terminated or failed with an unhandled + // error. Abort and wait until Geth requests a termination. + errc := <-s.terminate + errc <- err + return + } + } + } +} + +// Terminate tears down the syncer indefinitely. +func (s *skeleton) Terminate() error { + // Request termination and fetch any errors + errc := make(chan error) + s.terminate <- errc + err := <-errc + + // Wait for full shutdown (not necessary, but cleaner) + <-s.terminated + return err +} + +// Sync starts or resumes a previous sync cycle to download and maintain a reverse +// header chain starting at the head and leading towards genesis to an available +// ancestor. +// +// This method does not block, rather it just waits until the syncer receives the +// fed header. What the syncer does with it is the syncer's problem. +func (s *skeleton) Sync(head *types.Header) error { + log.Trace("New skeleton head announced", "number", head.Number, "hash", head.Hash()) + select { + case s.headEvents <- head: + return nil + case <-s.terminated: + return errTerminated + } +} + +// sync is the internal version of Sync that executes a single sync cycle, either +// until some termination condition is reached, or until the current cycle merges +// with a previously aborted run. +func (s *skeleton) sync(head *types.Header) (*types.Header, error) { + // If we're continuing a previous merge interrupt, just access the existing + // old state without initing from disk. + if head == nil { + head = rawdb.ReadSkeletonHeader(s.db, s.progress.Subchains[0].Head) + } else { + // Otherwise, initialize the sync, trimming and previous leftovers until + // we're consistent with the newly requested chain head + s.initSync(head) + } + // Create the scratch space to fill with concurrently downloaded headers + s.scratchSpace = make([]*types.Header, scratchHeaders) + defer func() { s.scratchSpace = nil }() // don't hold on to references after sync + + s.scratchOwners = make([]string, scratchHeaders/requestHeaders) + defer func() { s.scratchOwners = nil }() // don't hold on to references after sync + + s.scratchHead = s.progress.Subchains[0].Tail - 1 // tail must not be 0! + + // If the sync is already done, resume the backfiller. When the loop stops, + // terminate the backfiller too. + if s.scratchHead == 0 { + s.filler.resume() + } + defer s.filler.suspend() + + // Create a set of unique channels for this sync cycle. We need these to be + // ephemeral so a data race doesn't accidentally deliver something stale on + // a persistent channel across syncs (yup, this happened) + var ( + requestFails = make(chan *headerRequest) + responses = make(chan *headerResponse) + ) + cancel := make(chan struct{}) + defer close(cancel) + + log.Debug("Starting reverse header sync cycle", "head", head.Number, "hash", head.Hash(), "cont", s.scratchHead) + + // Whether sync completed or not, disregard any future packets + defer func() { + log.Debug("Terminating reverse header sync cycle", "head", head.Number, "hash", head.Hash(), "cont", s.scratchHead) + s.requests = make(map[uint64]*headerRequest) + }() + + // Start tracking idle peers for task assignments + peering := make(chan *peeringEvent, 64) // arbitrary buffer, just some burst protection + + peeringSub := s.peers.SubscribeEvents(peering) + defer peeringSub.Unsubscribe() + + s.idles = make(map[string]*peerConnection) + for _, peer := range s.peers.AllPeers() { + s.idles[peer.id] = peer + } + // Nofity any tester listening for startup events + if s.syncStarting != nil { + s.syncStarting() + } + for { + // Something happened, try to assign new tasks to any idle peers + s.assingTasks(responses, requestFails, cancel) + + // Wait for something to happen + select { + case event := <-peering: + // A peer joined or left, the tasks queue and allocations need to be + // checked for potential assignment or reassignment + peerid := event.peer.id + if event.join { + s.idles[peerid] = event.peer + } else { + s.revertRequests(peerid) + delete(s.idles, peerid) + } + + case errc := <-s.terminate: + errc <- nil + return nil, errTerminated + + case head := <-s.headEvents: + // New head was announced, try to integrate it. If successful, nothing + // needs to be done as the head simply extended the last range. For now + // we don't seamlessly integrate reorgs to keep things simple. If the + // network starts doing many mini reorgs, it might be worthwhile handling + // a limited depth without an error. + if reorged := s.processNewHead(head); reorged { + return head, errSyncReorged + } + // New head was integrated into the skeleton chain. If the backfiller + // is still running, it will pick it up. If it already terminated, + // a new cycle needs to be spun up. + if s.scratchHead == 0 { + s.filler.resume() + } + + case req := <-requestFails: + s.revertRequest(req) + + case res := <-responses: + // Process the batch of headers. If though processing we managed to + // link the curret subchain to a previously downloaded one, abort the + // sync and restart with the merged subchains. We could probably hack + // the internal state to switch the scratch space over to the tail of + // the extended subchain, but since the scenario is rare, it's cleaner + // to rely on the restart mechanism than a stateful modification. + if merged := s.processResponse(res); merged { + return nil, errSyncMerged + } + // If we've just reached the genesis block, tear down the sync cycle + // and restart it to resume the backfiller. We could just as well do + // a signalling here, but it's a tad cleaner to have only one entry + // pathway to suspending/resuming it. + return nil, errSyncLinked + } + } +} + +// initSync attempts to get the skeleton sync into a consistent state wrt any +// past state on disk and the newly requested head to sync to. If the new head +// is nil, the method will return and continue from the previous head. +func (s *skeleton) initSync(head *types.Header) { + // Extract the head number, we'll need it all over + number := head.Number.Uint64() + + // Retrieve the previously saved sync progress + if status := rawdb.ReadSkeletonSyncStatus(s.db); len(status) > 0 { + s.progress = new(skeletonProgress) + if err := json.Unmarshal(status, s.progress); err != nil { + log.Error("Failed to decode skeleton sync status", "err", err) + } else { + // Previous sync was available, print some continuation logs + for _, subchain := range s.progress.Subchains { + log.Debug("Restarting skeleton subchain", "head", subchain.Head, "tail", subchain.Tail) + } + // Create a new subchain for the head (unless the last can be extended), + // trimming anything it would overwrite + headchain := &subchain{ + Head: number, + Tail: number, + Next: head.ParentHash, + } + for len(s.progress.Subchains) > 0 { + // If the last chain is above the new head, delete altogether + lastchain := s.progress.Subchains[0] + if lastchain.Tail >= headchain.Tail { + log.Debug("Dropping skeleton subchain", "head", lastchain.Head, "tail", lastchain.Tail) + s.progress.Subchains = s.progress.Subchains[1:] + continue + } + // Otherwise truncate the last chain if needed and abort trimming + if lastchain.Head >= headchain.Tail { + log.Debug("Trimming skeleton subchain", "oldhead", lastchain.Head, "newhead", headchain.Tail-1, "tail", lastchain.Tail) + lastchain.Head = headchain.Tail - 1 + } + break + } + // If the last subchain can be extended, we're lucky. Otherwise create + // a new subchain sync task. + var extended bool + if n := len(s.progress.Subchains); n > 0 { + lastchain := s.progress.Subchains[0] + if lastchain.Head == headchain.Tail-1 { + lasthead := rawdb.ReadSkeletonHeader(s.db, lastchain.Head) + if lasthead.Hash() == head.ParentHash { + log.Debug("Extended skeleton subchain with new head", "head", headchain.Tail, "tail", lastchain.Tail) + lastchain.Head = headchain.Tail + extended = true + } + } + } + if !extended { + log.Debug("Created new skeleton subchain", "head", number, "tail", number) + s.progress.Subchains = append([]*subchain{headchain}, s.progress.Subchains...) + } + // Update the database with the new sync stats and insert the new + // head header. We won't delete any trimmed skeleton headers since + // those will be outside the index space of the many subchains and + // the database space will be reclaimed eventually when processing + // blocks above the current head (TODO(karalabe): don't forget). + batch := s.db.NewBatch() + + rawdb.WriteSkeletonHeader(batch, head) + s.saveSyncStatus(batch) + + if err := batch.Write(); err != nil { + log.Crit("Failed to write skeleton sync status", "err", err) + } + return + } + } + // Either we've failed to decode the previus state, or there was none. Start + // a fresh sync with a single subchain represented by the currently sent + // chain head. + s.progress = &skeletonProgress{ + Subchains: []*subchain{ + { + Head: number, + Tail: number, + Next: head.ParentHash, + }, + }, + } + batch := s.db.NewBatch() + + rawdb.WriteSkeletonHeader(batch, head) + s.saveSyncStatus(batch) + + if err := batch.Write(); err != nil { + log.Crit("Failed to write initial skeleton sync status", "err", err) + } + log.Debug("Created initial skeleton subchain", "head", number, "tail", number) +} + +// saveSyncStatus marshals the remaining sync tasks into leveldb. +func (s *skeleton) saveSyncStatus(db ethdb.KeyValueWriter) { + status, err := json.Marshal(s.progress) + if err != nil { + panic(err) // This can only fail during implementation + } + rawdb.WriteSkeletonSyncStatus(db, status) +} + +// processNewHead does the internal shuffling for a new head marker and either +// accepts and integrates it into the skeleton or requests a reorg. Upon reorg, +// the syncer will tear itself down and restart with a fresh head. It is simpler +// to reconstruct the sync state than to mutate it and hope for the best. +func (s *skeleton) processNewHead(head *types.Header) bool { + // If the header cannot be inserted without interruption, return an error for + // the outer loop to tear down the skeleton sync and restart it + number := head.Number.Uint64() + + lastchain := s.progress.Subchains[0] + if lastchain.Tail >= number { + log.Warn("Beacon chain reorged", "tail", lastchain.Tail, "newHead", number) + return true + } + if lastchain.Head+1 < number { + log.Warn("Beacon chain gapped", "head", lastchain.Head, "newHead", number) + return true + } + if parent := rawdb.ReadSkeletonHeader(s.db, number-1); parent.Hash() != head.ParentHash { + log.Warn("Beacon chain forked", "ancestor", parent.Number, "hash", parent.Hash(), "want", head.ParentHash) + return true + } + // New header seems to be in the last subchain range. Unwind any extra headers + // from the chain tip and insert the new head. We won't delete any trimmed + // skeleton headers since those will be outside the index space of the many + // subchains and the database space will be reclaimed eventually when processing + // blocks above the current head (TODO(karalabe): don't forget). + batch := s.db.NewBatch() + + rawdb.WriteSkeletonHeader(batch, head) + lastchain.Head = number + s.saveSyncStatus(batch) + + if err := batch.Write(); err != nil { + log.Crit("Failed to write skeleton sync status", "err", err) + } + return false +} + +// assingTasks attempts to match idle peers to pending header retrievals. +func (s *skeleton) assingTasks(success chan *headerResponse, fail chan *headerRequest, cancel chan struct{}) { + // Sort the peers by download capacity to use faster ones if many available + idlers := &peerCapacitySort{ + peers: make([]*peerConnection, 0, len(s.idles)), + caps: make([]int, 0, len(s.idles)), + } + targetTTL := s.peers.rates.TargetTimeout() + for _, peer := range s.idles { + idlers.peers = append(idlers.peers, peer) + idlers.caps = append(idlers.caps, s.peers.rates.Capacity(peer.id, eth.BlockHeadersMsg, targetTTL)) + } + if len(idlers.peers) == 0 { + return + } + sort.Sort(idlers) + + // Find header regions not yet downloading and fill them + for task, owner := range s.scratchOwners { + // If we're out of idle peers, stop assigning tasks + if len(idlers.peers) == 0 { + return + } + // Skip any tasks already filling + if owner != "" { + continue + } + // If we've reached the genesis, stop assigning tasks + if uint64(task*requestHeaders) >= s.scratchHead { + return + } + // Found a task and have peers available, assign it + idle := idlers.peers[0] + + idlers.peers = idlers.peers[1:] + idlers.caps = idlers.caps[1:] + + // Matched a pending task to an idle peer, allocate a unique request id + var reqid uint64 + for { + reqid = uint64(rand.Int63()) + if reqid == 0 { + continue + } + if _, ok := s.requests[reqid]; ok { + continue + } + break + } + // Generate the network query and send it to the peer + req := &headerRequest{ + peer: idle.id, + id: reqid, + deliver: success, + revert: fail, + cancel: cancel, + stale: make(chan struct{}), + head: s.scratchHead - uint64(task*requestHeaders), + } + s.requests[reqid] = req + delete(s.idles, idle.id) + + // Generate the network query and send it to the peer + go s.executeTask(idle, req) + + // Inject the request into the task to block further assignments + s.scratchOwners[task] = idle.id + } +} + +// executeTask executes a single fetch request, blocking until either a result +// arrives or a timeouts / cancellation is triggered. The method should be run +// on its own goroutine and will deliver on the requested channels. +func (s *skeleton) executeTask(peer *peerConnection, req *headerRequest) { + start := time.Now() + resCh := make(chan *eth.Response) + + // Figure out how many headers to fetch. Usually this will be a full batch, + // but for the very tail of the chain, trim the request to the number left. + // Since nodes may or may not return the genesis header for a batch request, + // don't even request it. The parent hash of block #1 is enough to link. + requestCount := requestHeaders + if req.head < requestHeaders { + requestCount = int(req.head) + } + peer.log.Trace("Fetching skeleton headers", "from", req.head, "count", requestCount) + netreq, err := peer.peer.RequestHeadersByNumber(req.head, requestCount, 0, true, resCh) + if err != nil { + peer.log.Trace("Failed to request headers", "err", err) + s.scheduleRevertRequest(req) + return + } + defer netreq.Close() + + // Wait until the response arrives, the request is cancelled or times out + ttl := s.peers.rates.TargetTimeout() + + timeoutTimer := time.NewTimer(ttl) + defer timeoutTimer.Stop() + + select { + case <-req.cancel: + peer.log.Debug("Header request cancelled") + s.scheduleRevertRequest(req) + + case <-timeoutTimer.C: + // Header retrieval timed out, update the metrics + peer.log.Trace("Header request timed out", "elapsed", ttl) + headerTimeoutMeter.Mark(1) + s.peers.rates.Update(peer.id, eth.BlockHeadersMsg, 0, 0) + s.scheduleRevertRequest(req) + + case res := <-resCh: + // Headers successfully retrieved, update the metrics + headers := *res.Res.(*eth.BlockHeadersPacket) + + headerReqTimer.Update(time.Since(start)) + s.peers.rates.Update(peer.id, eth.BlockHeadersMsg, res.Time, len(headers)) + + // Cross validate the headers with the requests + switch { + case len(headers) == 0: + // No headers were delivered, reject the response and reschedule + peer.log.Debug("No headers delivered") + res.Done <- errors.New("no headers delivered") + s.scheduleRevertRequest(req) + + case headers[0].Number.Uint64() != req.head: + // Header batch anchored at non-requested number + peer.log.Debug("Invalid header response head", "have", headers[0].Number, "want", req.head) + res.Done <- errors.New("invalid header batch anchor") + s.scheduleRevertRequest(req) + + case headers[0].Number.Uint64() >= requestHeaders && len(headers) != requestHeaders: + // Invalid number of non-genesis headers delivered, reject the response and reschedule + peer.log.Debug("Invalid non-genesis header count", "have", len(headers), "want", requestHeaders) + res.Done <- errors.New("not enough non-genesis headers delivered") + s.scheduleRevertRequest(req) + + case headers[0].Number.Uint64() < requestHeaders && uint64(len(headers)) != headers[0].Number.Uint64(): + // Invalid number of genesis headers delivered, reject the response and reschedule + peer.log.Debug("Invalid genesis header count", "have", len(headers), "want", headers[0].Number.Uint64()) + res.Done <- errors.New("not enough genesis headers delivered") + s.scheduleRevertRequest(req) + + default: + // Packet seems structurally valid, check hash progression and if it + // is correct too, deliver for storage + for i := 0; i < len(headers)-1; i++ { + if headers[i].ParentHash != headers[i+1].Hash() { + peer.log.Debug("Invalid genesis header count", "have", len(headers), "want", headers[0].Number.Uint64()) + res.Done <- errors.New("not enough genesis headers delivered") + s.scheduleRevertRequest(req) + return + } + } + // Hash chain is valid. The delivery might still be junk as we're + // downloading batches concurrently (so no way to link the headers + // until gaps are filled); in that case, we'll nuke the peer when + // we detect the fault. + res.Done <- nil + + select { + case req.deliver <- &headerResponse{ + peer: peer, + reqid: req.id, + headers: headers, + }: + case <-req.cancel: + } + } + } +} + +// revertRequests locates all the currently pending reuqests from a particular +// peer and reverts them, rescheduling for others to fulfill. +func (s *skeleton) revertRequests(peer string) { + // Gather the requests first, revertals need the lock too + var requests []*headerRequest + for _, req := range s.requests { + if req.peer == peer { + requests = append(requests, req) + } + } + // Revert all the requests matching the peer + for _, req := range requests { + s.revertRequest(req) + } +} + +// scheduleRevertRequest asks the event loop to clean up a request and return +// all failed retrieval tasks to the scheduler for reassignment. +func (s *skeleton) scheduleRevertRequest(req *headerRequest) { + select { + case req.revert <- req: + // Sync event loop notified + case <-req.cancel: + // Sync cycle got cancelled + case <-req.stale: + // Request already reverted + } +} + +// revertRequest cleans up a request and returns all failed retrieval tasks to +// the scheduler for reassignment. +// +// Note, this needs to run on the event runloop thread to reschedule to idle peers. +// On peer threads, use scheduleRevertRequest. +func (s *skeleton) revertRequest(req *headerRequest) { + log.Trace("Reverting header request", "peer", req.peer, "reqid", req.id) + select { + case <-req.stale: + log.Trace("Header request already reverted", "peer", req.peer, "reqid", req.id) + return + default: + } + close(req.stale) + + // Remove the request from the tracked set + delete(s.requests, req.id) + + // Remove the request from the tracked set and mark the task as not-pending, + // ready for resheduling + s.scratchOwners[(s.scratchHead-req.head)/requestHeaders] = "" +} + +func (s *skeleton) processResponse(res *headerResponse) bool { + res.peer.log.Trace("Processing header response", "head", res.headers[0].Number, "hash", res.headers[0].Hash(), "count", len(res.headers)) + + // Whether or not the response is valid, we can mark the peer as idle and + // notify the scheduler to assign a new task. If the response is invalid, + // we'll drop the peer in a bit. + s.idles[res.peer.id] = res.peer + + // Ensure the response is for a valid request + if _, ok := s.requests[res.reqid]; !ok { + // Request stale, perhaps the peer timed out but came through in the end + res.peer.log.Warn("Unexpected header packet") + return false + } + delete(s.requests, res.reqid) + + // Insert the delivered headers into the scratch space independent of the + // content or continuation; those will be validated in a moment + head := res.headers[0].Number.Uint64() + copy(s.scratchSpace[s.scratchHead-head:], res.headers) + + // If there's still a gap in the head of the scratch space, abort + if s.scratchSpace[0] == nil { + return false + } + // Try to consume any head headers, validating the boundary conditions + var merged bool // Whether subchains were merged + + batch := s.db.NewBatch() + for s.scratchSpace[0] != nil { + // Next batch of headers available, cross-reference with the subchain + // we are extending and either accept or discard + if s.progress.Subchains[0].Next != s.scratchSpace[0].Hash() { + // Print a log messages to track what's going on + tail := s.progress.Subchains[0].Tail + want := s.progress.Subchains[0].Next + have := s.scratchSpace[0].Hash() + + log.Warn("Invalid skeleton headers", "peer", s.scratchOwners[0], "number", tail-1, "want", want, "have", have) + + // The peer delivered junk, or at least not the subchain we are + // syncing to. Free up the scratch space and assignment, reassign + // and drop the original peer. + for i := 0; i < requestHeaders; i++ { + s.scratchSpace[i] = nil + } + s.drop(s.scratchOwners[0]) + s.scratchOwners[0] = "" + break + } + // Scratch delivery matches required subchain, deliver the batch of + // headers and push the subchain forward + var consumed int + for _, header := range s.scratchSpace[:requestHeaders] { + if header != nil { // nil when the genesis is reached + consumed++ + + rawdb.WriteSkeletonHeader(batch, header) + s.pulled++ + + s.progress.Subchains[0].Tail-- + s.progress.Subchains[0].Next = header.ParentHash + } + } + // Batch of headers consumed, shift the download window forward + head := s.progress.Subchains[0].Head + tail := s.progress.Subchains[0].Tail + next := s.progress.Subchains[0].Next + + log.Trace("Primary subchain extended", "head", head, "tail", tail, "next", next) + + copy(s.scratchSpace, s.scratchSpace[requestHeaders:]) + for i := 0; i < requestHeaders; i++ { + s.scratchSpace[scratchHeaders-i-1] = nil + } + copy(s.scratchOwners, s.scratchOwners[1:]) + s.scratchOwners[scratchHeaders/requestHeaders-1] = "" + + s.scratchHead -= uint64(consumed) + + // If the subchain extended into the next subchain, we need to handle + // the overlap. Since there could be many overlaps (come on), do this + // in a loop. + for len(s.progress.Subchains) > 1 && s.progress.Subchains[1].Head >= s.progress.Subchains[0].Tail { + // Extract some stats from the second subchain + head := s.progress.Subchains[1].Head + tail := s.progress.Subchains[1].Tail + next := s.progress.Subchains[1].Next + + // Since we just overwrote part of the next subchain, we need to trim + // its head independent of matching or mismatching content + if s.progress.Subchains[1].Tail >= s.progress.Subchains[0].Tail { + // Fully overwritten, get rid of the subchain as a whole + log.Debug("Previous subchain fully overwritten", "head", head, "tail", tail, "next", next) + s.progress.Subchains = append(s.progress.Subchains[:1], s.progress.Subchains[2:]...) + continue + } else { + // Partially overwritten, trim the head to the overwritten size + log.Debug("Previous subchain partially overwritten", "head", head, "tail", tail, "next", next) + s.progress.Subchains[1].Head = s.progress.Subchains[0].Tail - 1 + } + // If the old subchain is an extension of the new one, merge the two + // and let the skeleton syncer restart (to clean internal state) + if rawdb.ReadSkeletonHeader(s.db, s.progress.Subchains[1].Head).Hash() == s.progress.Subchains[0].Next { + log.Debug("Previous subchain merged", "head", head, "tail", tail, "next", next) + s.progress.Subchains[0].Tail = s.progress.Subchains[1].Tail + s.progress.Subchains[0].Next = s.progress.Subchains[1].Next + + s.progress.Subchains = append(s.progress.Subchains[:1], s.progress.Subchains[2:]...) + merged = true + } + } + } + s.saveSyncStatus(batch) + if err := batch.Write(); err != nil { + log.Crit("Failed to write skeleton headers and progress", "err", err) + } + // Print a progress report to make the UX a bit nicer + left := s.progress.Subchains[0].Tail - 1 + if time.Since(s.logged) > 8*time.Second || left == 0 { + s.logged = time.Now() + + if s.pulled == 0 { + log.Info("Beacon sync starting", "left", left) + } else { + eta := float64(time.Since(s.started)) / float64(s.pulled) * float64(left) + log.Info("Syncing beacon headers", "downloaded", s.pulled, "left", left, "eta", common.PrettyDuration(eta)) + } + } + return merged +} + +// Head retrieves the current head tracked by the skeleton syncer. This method +// is meant to be used by the backfiller, whose life cycle is controlled by the +// skeleton syncer. +// +// Note, the method will not use the internal state of the skeleton, but will +// rather blindly pull stuff from the database. This is fine, because the back- +// filler will only run when the skeleton chain is fully downloaded and stable. +// There might be new heads appended, but those are stomic from the perspective +// of this method. Any head reorg will first tear down the backfiller and only +// then make the modification. +func (s *skeleton) Head() (*types.Header, error) { + // Read the current sync progress from disk and figure out the current head. + // Although there's a lot of error handling here, these are mostly as sanity + // checks to avoid crashing if a programming error happens. These should not + // happen in live code. + status := rawdb.ReadSkeletonSyncStatus(s.db) + if len(status) == 0 { + return nil, errors.New("beacon sync not yet started") + } + s.progress = new(skeletonProgress) + if err := json.Unmarshal(status, s.progress); err != nil { + return nil, err + } + if s.progress.Subchains[0].Tail != 1 { + return nil, errors.New("beacon sync not yet finished") + } + return rawdb.ReadSkeletonHeader(s.db, s.progress.Subchains[0].Head), nil +} + +// Header retrieves a specific header tracked by the skeleton syncer. This method +// is meant to be used by the backfiller, whose life cycle is controlled by the +// skeleton syncer. +// +// Note, outside the permitted runtimes, this method might return nil results and +// subsequent calls might return headers from different chains. +func (s *skeleton) Header(number uint64) *types.Header { + return rawdb.ReadSkeletonHeader(s.db, number) +} diff --git a/eth/downloader/skeleton_test.go b/eth/downloader/skeleton_test.go new file mode 100644 index 000000000000..7c2e07a43866 --- /dev/null +++ b/eth/downloader/skeleton_test.go @@ -0,0 +1,257 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package downloader + +import ( + "encoding/json" + "math/big" + "os" + "testing" + + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" +) + +// hookedBackfiller is a tester backfiller with all interface methods mocked and +// hooked so tests can implement only the things they need. +type hookedBackfiller struct { + // suspendHook is an optional hook to be called when the filler is requested + // to be suspended. + suspendHook func() + + // resumeHook is an optional hook to be called when the filler is requested + // to be resumed. + resumeHook func() +} + +// suspend requests the backfiller to abort any running full or snap sync +// based on the skeleton chain as it might be invalid. The backfiller should +// gracefully handle multiple consecutive suspends without a resume, even +// on initial sartup. +func (hf *hookedBackfiller) suspend() { + if hf.suspendHook != nil { + hf.suspendHook() + } +} + +// resume requests the backfiller to start running fill or snap sync based on +// the skeleton chain as it has successfully been linked. Appending new heads +// to the end of the chain will not result in suspend/resume cycles. +func (hf *hookedBackfiller) resume() { + if hf.resumeHook != nil { + hf.resumeHook() + } +} + +// newNoopBackfiller creates a hooked backfiller with all callbacks disabled, +// essentially acting as a noop. +func newNoopBackfiller() backfiller { + return new(hookedBackfiller) +} + +// Tests various sync initialzations based on previous leftovers in the database +// and announced heads. +func TestSkeletonSyncInit(t *testing.T) { + log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) + + // Create a few key headers + var ( + genesis = &types.Header{Number: big.NewInt(0)} + block49 = &types.Header{Number: big.NewInt(49)} + block49B = &types.Header{Number: big.NewInt(49), Extra: []byte("B")} + block50 = &types.Header{Number: big.NewInt(50), ParentHash: block49.Hash()} + ) + tests := []struct { + headers []*types.Header // Database content (beside the genesis) + oldstate []*subchain // Old sync state with various interrupted subchains + head *types.Header // New head header to announce to reorg to + newstate []*subchain // Expected sync state after the reorg + }{ + // Completely empty database with only the genesis set. The sync is expected + // to create a single subchain with the requested head. + { + head: block50, + newstate: []*subchain{{Head: 50, Tail: 50}}, + }, + // Empty database with only the genesis set with a leftover empty sync + // progess. This is a synthetic case, just for the sake of covering things. + { + oldstate: []*subchain{}, + head: block50, + newstate: []*subchain{{Head: 50, Tail: 50}}, + }, + // A single leftover subchain is present, older than the new head. The + // old subchain should be left as is and a new one appended to the sync + // status. + { + oldstate: []*subchain{{Head: 10, Tail: 5}}, + head: block50, + newstate: []*subchain{ + {Head: 50, Tail: 50}, + {Head: 10, Tail: 5}, + }, + }, + // Multiple leftover subchains are present, older than the new head. The + // old subchains should be left as is and a new one appended to the sync + // status. + { + oldstate: []*subchain{ + {Head: 20, Tail: 15}, + {Head: 10, Tail: 5}, + }, + head: block50, + newstate: []*subchain{ + {Head: 50, Tail: 50}, + {Head: 20, Tail: 15}, + {Head: 10, Tail: 5}, + }, + }, + // A single leftover subchain is present, newer than the new head. The + // newer subchain should be deleted and a fresh one created for the head. + { + oldstate: []*subchain{{Head: 65, Tail: 60}}, + head: block50, + newstate: []*subchain{{Head: 50, Tail: 50}}, + }, + // Multiple leftover subchain is present, newer than the new head. The + // newer subchains should be deleted and a fresh one created for the head. + { + oldstate: []*subchain{ + {Head: 75, Tail: 70}, + {Head: 65, Tail: 60}, + }, + head: block50, + newstate: []*subchain{{Head: 50, Tail: 50}}, + }, + + // Two leftover subchains are present, one fully older and one fully + // newer than the announced head. The head should delete the newer one, + // keeping the older one. + { + oldstate: []*subchain{ + {Head: 65, Tail: 60}, + {Head: 10, Tail: 5}, + }, + head: block50, + newstate: []*subchain{ + {Head: 50, Tail: 50}, + {Head: 10, Tail: 5}, + }, + }, + // Multiple leftover subchains are present, some fully older and some + // fully newer than the announced head. The head should delete the newer + // ones, keeping the older ones. + { + oldstate: []*subchain{ + {Head: 75, Tail: 70}, + {Head: 65, Tail: 60}, + {Head: 20, Tail: 15}, + {Head: 10, Tail: 5}, + }, + head: block50, + newstate: []*subchain{ + {Head: 50, Tail: 50}, + {Head: 20, Tail: 15}, + {Head: 10, Tail: 5}, + }, + }, + // A single leftover subchain is present and the new head is extending + // it with one more header. We expect the subchain head to be pushed + // forward. + { + headers: []*types.Header{block49}, + oldstate: []*subchain{{Head: 49, Tail: 5}}, + head: block50, + newstate: []*subchain{{Head: 50, Tail: 5}}, + }, + // A single leftover subchain is present and although the new head does + // extend it number wise, the hash chain does not link up. We expect a + // new subchain to be created for the dangling head. + { + headers: []*types.Header{block49B}, + oldstate: []*subchain{{Head: 49, Tail: 5}}, + head: block50, + newstate: []*subchain{ + {Head: 50, Tail: 50}, + {Head: 49, Tail: 5}, + }, + }, + // A single leftover subchain is present. A new head is announced that + // links into the middle of it, correctly anchoring into an existing + // header. We expect the old subchain to be truncated and extended with + // the new head. + { + headers: []*types.Header{block49}, + oldstate: []*subchain{{Head: 100, Tail: 5}}, + head: block50, + newstate: []*subchain{{Head: 50, Tail: 5}}, + }, + // A single leftover subchain is present. A new head is announced that + // links into the middle of it, but does not anchor into an existing + // header. We expect the old subchain to be truncated and a new chain + // be created for the dangling head. + { + headers: []*types.Header{block49B}, + oldstate: []*subchain{{Head: 100, Tail: 5}}, + head: block50, + newstate: []*subchain{ + {Head: 50, Tail: 50}, + {Head: 49, Tail: 5}, + }, + }, + } + for i, tt := range tests { + // Create a fresh database and initialize it with the starting state + db := rawdb.NewMemoryDatabase() + + rawdb.WriteHeader(db, genesis) + for _, header := range tt.headers { + rawdb.WriteSkeletonHeader(db, header) + } + if tt.oldstate != nil { + blob, _ := json.Marshal(&skeletonProgress{Subchains: tt.oldstate}) + rawdb.WriteSkeletonSyncStatus(db, blob) + } + // Create a skeleton sync and run a cycle + wait := make(chan struct{}) + + skeleton := newSkeleton(db, newPeerSet(), func(string) {}, newNoopBackfiller()) + skeleton.syncStarting = func() { close(wait) } + skeleton.Sync(tt.head) + + <-wait + skeleton.Terminate() + + // Ensure the correct resulting sync status + var progress skeletonProgress + json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress) + + if len(progress.Subchains) != len(tt.newstate) { + t.Errorf("test %d: subchain count mismatch: have %d, want %d", i, len(progress.Subchains), len(tt.newstate)) + continue + } + for j := 0; j < len(progress.Subchains); j++ { + if progress.Subchains[j].Head != tt.newstate[j].Head { + t.Errorf("test %d: subchain %d head mismatch: have %d, want %d", i, j, progress.Subchains[j].Head, tt.newstate[j].Head) + } + if progress.Subchains[j].Tail != tt.newstate[j].Tail { + t.Errorf("test %d: subchain %d tail mismatch: have %d, want %d", i, j, progress.Subchains[j].Tail, tt.newstate[j].Tail) + } + } + } +} diff --git a/eth/handler.go b/eth/handler.go index 921a62dba501..1e0c543d54a6 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -171,10 +171,30 @@ func newHandler(config *handlerConfig) (*handler, error) { h.checkpointNumber = (config.Checkpoint.SectionIndex+1)*params.CHTFrequency - 1 h.checkpointHash = config.Checkpoint.SectionHead } + // If sync succeeds, pass a callback to potentially disable snap sync mode + // and enable transaction propagation. + success := func() { + // If we were running snap sync and it finished, disable doing another + // round on next sync cycle + if atomic.LoadUint32(&h.snapSync) == 1 { + log.Info("Snap sync complete, auto disabling") + atomic.StoreUint32(&h.snapSync, 0) + } + // If we've successfully finished a sync cycle and passed any required + // checkpoint, enable accepting transactions from the network + head := h.chain.CurrentBlock() + if head.NumberU64() >= h.checkpointNumber { + // Checkpoint passed, sanity check the timestamp to have a fallback mechanism + // for non-checkpointed (number = 0) private networks. + if head.Time() >= uint64(time.Now().AddDate(0, -1, 0).Unix()) { + atomic.StoreUint32(&h.acceptTxs, 1) + } + } + } // Construct the downloader (long sync) and its backing state bloom if snap // sync is requested. The downloader is responsible for deallocating the state // bloom when it's done. - h.downloader = downloader.New(h.checkpointNumber, config.Database, h.eventMux, h.chain, nil, h.removePeer) + h.downloader = downloader.New(h.checkpointNumber, config.Database, h.eventMux, h.chain, nil, h.removePeer, success) // Construct the fetcher (short sync) validator := func(header *types.Header) error { diff --git a/eth/sync.go b/eth/sync.go index b8ac67d3b2d1..384b42bfaa80 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -113,9 +113,9 @@ func (cs *chainSyncer) loop() { defer cs.force.Stop() for { - if op := cs.nextSyncOp(); op != nil { - cs.startSync(op) - } + //if op := cs.nextSyncOp(); op != nil { + // cs.startSync(op) + //} select { case <-cs.peerEventCh: // Peer information changed, recheck. @@ -227,7 +227,7 @@ func (h *handler) doSync(op *chainSyncOp) error { } } // Run the sync cycle, and disable snap sync if we're past the pivot block - err := h.downloader.Synchronise(op.peer.ID(), op.head, op.td, op.mode) + err := h.downloader.LegacySync(op.peer.ID(), op.head, op.td, op.mode) if err != nil { return err } diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index c4bdbaeb8d20..acc38dcd40ee 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -152,6 +152,11 @@ web3._extend({ call: 'admin_importChain', params: 1 }), + new web3._extend.Method({ + name: 'newHead', + call: 'admin_newHead', + params: 1 + }), new web3._extend.Method({ name: 'sleepBlocks', call: 'admin_sleepBlocks', From 8fcf2f78a31d12952439edf4d7cd75617f7a5222 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Fri, 12 Nov 2021 13:43:00 +0100 Subject: [PATCH 2/5] eth/catalyst: reenable sync --- core/blockchain.go | 11 +++++++++++ eth/catalyst/api.go | 18 ++++++++++++------ eth/downloader/beaconsync.go | 7 +++++-- eth/downloader/downloader.go | 2 +- eth/sync.go | 6 +++--- 5 files changed, 32 insertions(+), 12 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 8da7cc22923f..8396cb511328 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/prque" "github.com/ethereum/go-ethereum/consensus" + "github.com/ethereum/go-ethereum/consensus/beacon" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state/snapshot" @@ -2265,6 +2266,16 @@ func (bc *BlockChain) reportBlock(block *types.Block, receipts types.Receipts, e i, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.ContractAddress.Hex(), receipt.Status, receipt.TxHash.Hex(), receipt.Logs, receipt.Bloom, receipt.PostState) } + if eng, ok := bc.engine.(*beacon.Beacon); ok { + if eng.IsPoSHeader(block.Header()) { + fmt.Println("PoSHeader") + } + if reached, err := beacon.IsTTDReached(bc, block.ParentHash(), block.NumberU64()-1); reached { + fmt.Println("TTDD reached") + } else if err != nil { + fmt.Printf("TTDReached error: %v\n", err) + } + } log.Error(fmt.Sprintf(` ########## BAD BLOCK ######### Chain config: %v diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index 1c3d65a1ce79..a530ed556046 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -260,13 +260,17 @@ func (api *ConsensusAPI) ExecutePayloadV1(params ExecutableDataV1) (ExecutePaylo } return ExecutePayloadResponse{Status: VALID.Status, LatestValidHash: block.Hash()}, nil } + if api.eth.BlockChain().GetBlockByHash(params.BlockHash) != nil { + log.Info("Ignoring already processed block", "number", params.Number, "hash", params.BlockHash) + return ExecutePayloadResponse{Status: VALID.Status, LatestValidHash: block.Hash()}, nil + } if !api.eth.BlockChain().HasBlock(block.ParentHash(), block.NumberU64()-1) { - /* - TODO (MariusVanDerWijden) reenable once sync is merged - if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), block.Header()); err != nil { - return SYNCING, err - } - */ + + //TODO (MariusVanDerWijden) reenable once sync is merged + if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), block.Header()); err != nil { + return ExecutePayloadResponse{Status: SYNCING.Status, LatestValidHash: common.Hash{}}, err + } + // TODO (MariusVanDerWijden) we should return nil here not empty hash return ExecutePayloadResponse{Status: SYNCING.Status, LatestValidHash: common.Hash{}}, nil } @@ -437,6 +441,8 @@ func ExecutableDataToBlock(params ExecutableDataV1) (*types.Block, error) { } block := types.NewBlockWithHeader(header).WithBody(txs, nil /* uncles */) if block.Hash() != params.BlockHash { + log.Warn("Invalid Payload", "ParentHash", header.ParentHash, "UncleHash", header.UncleHash, "Coinbase", header.Coinbase, "Root", header.Root, "TxHash", header.TxHash, "ReceiptHash", header.ReceiptHash, "Bloom", header.Bloom, "Difficulty", header.Difficulty, "Number", header.Number, "GasLimit", header.GasLimit, "GasUsed", header.GasUsed, "Time", header.Time, "BaseFee", header.BaseFee, "Extra", header.Extra, "MixDigest", header.MixDigest) + log.Warn("PayloadParams", "ParentHash", params.ParentHash, "Coinbase", params.FeeRecipient, "Root", params.StateRoot, "ReceiptHash", params.ReceiptsRoot, "Bloom", params.LogsBloom, "Number", params.Number, "GasLimit", params.GasLimit, "GasUsed", params.GasUsed, "Time", params.Timestamp, "BaseFee", params.BaseFeePerGas, "Extra", params.ExtraData, "MixDigest", params.Random) return nil, fmt.Errorf("blockhash mismatch, want %x, got %x", params.BlockHash, block.Hash()) } return block, nil diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index 6c0b76016053..666561b0ae8e 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -189,12 +189,15 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error { for { // Retrieve a batch of headers and feed it to the header processor headers := make([]*types.Header, 0, maxHeadersProcess) + hashes := make([]common.Hash, 0, cap(headers)) for i := 0; i < maxHeadersProcess && from <= head.Number.Uint64(); i++ { - headers = append(headers, d.skeleton.Header(from)) + header := d.skeleton.Header(from) + headers = append(headers, header) + hashes = append(hashes, header.Hash()) from++ } select { - case d.headerProcCh <- headers: + case d.headerProcCh <- &headerTask{headers: headers, hashes: hashes}: case <-d.cancelCh: return errCanceled } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 6aaae32d0a1b..763f6ddfb259 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -557,7 +557,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I var headerFetcher func() error if !beaconMode { // In legacy mode, headers are retrieved from the network - headerFetcher = func() error { return d.fetchHeaders(p, origin+1) } + headerFetcher = func() error { return d.fetchHeaders(p, origin+1, latest.Number.Uint64()) } } else { // In beacon mode, headers are served by the skeleton syncer headerFetcher = func() error { return d.fetchBeaconHeaders(origin + 1) } diff --git a/eth/sync.go b/eth/sync.go index 384b42bfaa80..85cdad10a4dd 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -113,9 +113,9 @@ func (cs *chainSyncer) loop() { defer cs.force.Stop() for { - //if op := cs.nextSyncOp(); op != nil { - // cs.startSync(op) - //} + if op := cs.nextSyncOp(); op != nil { + cs.startSync(op) + } select { case <-cs.peerEventCh: // Peer information changed, recheck. From c5bb9f7bc63632dc0bda87414a21f70be790b015 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Tue, 7 Dec 2021 11:59:12 +0100 Subject: [PATCH 3/5] eth/catalyst: add test --- eth/catalyst/api_test.go | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/eth/catalyst/api_test.go b/eth/catalyst/api_test.go index 6e52c4fea27d..141c6a00334b 100644 --- a/eth/catalyst/api_test.go +++ b/eth/catalyst/api_test.go @@ -185,6 +185,31 @@ func checkLogEvents(t *testing.T, logsCh <-chan []*types.Log, rmLogsCh <-chan co } } +func TestInvalidPayloadTimestamp(t *testing.T) { + genesis, preMergeBlocks := generatePreMergeChain(10) + n, ethservice := startEthService(t, genesis, preMergeBlocks) + ethservice.Merger().ReachTTD() + defer n.Close() + var ( + api = NewConsensusAPI(ethservice, nil) + parent = ethservice.BlockChain().CurrentBlock() + ) + params := PayloadAttributesV1{ + Timestamp: parent.Time() - 1, + Random: crypto.Keccak256Hash([]byte{byte(123)}), + SuggestedFeeRecipient: parent.Coinbase(), + } + fcState := ForkchoiceStateV1{ + HeadBlockHash: parent.Hash(), + SafeBlockHash: common.Hash{}, + FinalizedBlockHash: common.Hash{}, + } + _, err := api.ForkchoiceUpdatedV1(fcState, ¶ms) + if err == nil { + t.Fatalf("expected error preparing payload with invalid timestamp, err=%v", err) + } +} + func TestEth2NewBlock(t *testing.T) { genesis, preMergeBlocks := generatePreMergeChain(10) n, ethservice := startEthService(t, genesis, preMergeBlocks) @@ -414,6 +439,5 @@ func TestFullAPI(t *testing.T) { t.Fatalf("Chain head should be updated") } parent = ethservice.BlockChain().CurrentBlock() - } } From e7c8d77efa6774d9490b99e01161bdc88578f1ca Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Fri, 31 Dec 2021 12:28:39 +0100 Subject: [PATCH 4/5] eth/downloader: log instead of panic in beaconsync --- eth/downloader/beaconsync.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index 666561b0ae8e..17c3ce292cdc 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -159,6 +159,10 @@ func (d *Downloader) findBeaconAncestor() uint64 { check := (start + end) / 2 h := d.skeleton.Header(check) + if h == nil { + log.Warn("Header not found in skeleton", "check", check, "head", number) + return 0 + } n := h.Number.Uint64() var known bool From 63a89820ca9721ef2256859391db5a5920faf16c Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Fri, 31 Dec 2021 13:46:30 +0100 Subject: [PATCH 5/5] core/rawdb: account for skeleton data --- core/rawdb/database.go | 6 +++++- core/rawdb/schema.go | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 5ef64d26a205..c5ccd6da5657 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -322,6 +322,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { preimages stat bloomBits stat cliqueSnaps stat + skeleton stat // Ancient store statistics ancientHeadersSize common.StorageSize @@ -379,6 +380,8 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { bloomBits.Add(size) case bytes.HasPrefix(key, BloomBitsIndexPrefix): bloomBits.Add(size) + case bytes.HasPrefix(key, skeletonHeaderPrefix) && len(key) == (len(skeletonHeaderPrefix)+8): + skeleton.Add(size) case bytes.HasPrefix(key, []byte("clique-")) && len(key) == 7+common.HashLength: cliqueSnaps.Add(size) case bytes.HasPrefix(key, []byte("cht-")) || @@ -395,7 +398,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { databaseVersionKey, headHeaderKey, headBlockKey, headFastBlockKey, lastPivotKey, fastTrieProgressKey, snapshotDisabledKey, SnapshotRootKey, snapshotJournalKey, snapshotGeneratorKey, snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey, - uncleanShutdownKey, badBlockKey, transitionStatusKey, + uncleanShutdownKey, badBlockKey, transitionStatusKey, snapshotSyncStatusKey, skeletonSyncStatusKey, } { if bytes.Equal(key, meta) { metadata.Add(size) @@ -432,6 +435,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { {"Key-Value store", "Bodies", bodies.Size(), bodies.Count()}, {"Key-Value store", "Receipt lists", receipts.Size(), receipts.Count()}, {"Key-Value store", "Difficulties", tds.Size(), tds.Count()}, + {"Key-Value store", "Skeleton", skeleton.Size(), skeleton.Count()}, {"Key-Value store", "Block number->hash", numHashPairings.Size(), numHashPairings.Count()}, {"Key-Value store", "Block hash->number", hashNumPairings.Size(), hashNumPairings.Count()}, {"Key-Value store", "Transaction index", txLookups.Size(), txLookups.Count()}, diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index f869f36e1040..b43db7ab955f 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -95,7 +95,7 @@ var ( SnapshotAccountPrefix = []byte("a") // SnapshotAccountPrefix + account hash -> account trie value SnapshotStoragePrefix = []byte("o") // SnapshotStoragePrefix + account hash + storage hash -> storage trie value CodePrefix = []byte("c") // CodePrefix + code hash -> account code - skeletonHeaderPrefix = []byte("S") // skeletonHeaderPrefox + num (uint64 big endian) -> header + skeletonHeaderPrefix = []byte("S") // skeletonHeaderPrefix + num (uint64 big endian) -> header PreimagePrefix = []byte("secure-key-") // PreimagePrefix + hash -> preimage configPrefix = []byte("ethereum-config-") // config prefix for the db