From 73f5bcb75b562fcb3c109dd9c51f21956bc1f1f4 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Thu, 28 Sep 2023 15:00:53 +0800 Subject: [PATCH] core, accounts, eth, trie: handle genesis state missing (#28171) * core, accounts, eth, trie: handle genesis state missing * core, eth, trie: polish * core: manage txpool subscription in mainpool * eth/backend: fix test * cmd, eth: fix test * core/rawdb, trie/triedb/pathdb: address comments * eth, trie: address comments * eth: inline the function * eth: use synced flag * core/txpool: revert changes in txpool * core, eth, trie: rename functions --- accounts/abi/bind/backends/simulated.go | 17 ++-- cmd/devp2p/internal/ethtest/suite_test.go | 1 + core/blockchain.go | 54 ++++-------- core/rawdb/accessors_sync.go | 22 +++++ core/rawdb/database.go | 2 +- core/rawdb/schema.go | 3 + core/txpool/blobpool/blobpool.go | 6 ++ core/txpool/legacypool/legacypool.go | 15 +++- eth/api_backend.go | 10 ++- eth/backend.go | 2 +- eth/downloader/downloader.go | 4 +- eth/handler.go | 50 +++++------ eth/handler_eth.go | 2 +- eth/handler_eth_test.go | 4 +- eth/sync.go | 23 ++--- miner/miner_test.go | 7 +- trie/database.go | 20 ++++- trie/triedb/pathdb/database.go | 101 +++++++++++++++------- trie/triedb/pathdb/database_test.go | 37 ++++---- trie/triedb/pathdb/errors.go | 10 ++- trie/triedb/pathdb/journal.go | 2 +- 21 files changed, 244 insertions(+), 148 deletions(-) diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index 0c4342c494..dbdcd17823 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -199,7 +199,6 @@ func (b *SimulatedBackend) CodeAt(ctx context.Context, contract common.Address, if err != nil { return nil, err } - return stateDB.GetCode(contract), nil } @@ -212,7 +211,6 @@ func (b *SimulatedBackend) BalanceAt(ctx context.Context, contract common.Addres if err != nil { return nil, err } - return stateDB.GetBalance(contract), nil } @@ -225,7 +223,6 @@ func (b *SimulatedBackend) NonceAt(ctx context.Context, contract common.Address, if err != nil { return 0, err } - return stateDB.GetNonce(contract), nil } @@ -238,7 +235,6 @@ func (b *SimulatedBackend) StorageAt(ctx context.Context, contract common.Addres if err != nil { return nil, err } - val := stateDB.GetState(contract, key) return val[:], nil } @@ -700,8 +696,10 @@ func (b *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa } block.AddTxWithChain(b.blockchain, tx) }) - stateDB, _ := b.blockchain.State() - + stateDB, err := b.blockchain.State() + if err != nil { + return err + } b.pendingBlock = blocks[0] b.pendingState, _ = state.New(b.pendingBlock.Root(), stateDB.Database(), nil) b.pendingReceipts = receipts[0] @@ -821,11 +819,12 @@ func (b *SimulatedBackend) AdjustTime(adjustment time.Duration) error { blocks, _ := core.GenerateChain(b.config, block, ethash.NewFaker(), b.database, 1, func(number int, block *core.BlockGen) { block.OffsetTime(int64(adjustment.Seconds())) }) - stateDB, _ := b.blockchain.State() - + stateDB, err := b.blockchain.State() + if err != nil { + return err + } b.pendingBlock = blocks[0] b.pendingState, _ = state.New(b.pendingBlock.Root(), stateDB.Database(), nil) - return nil } diff --git a/cmd/devp2p/internal/ethtest/suite_test.go b/cmd/devp2p/internal/ethtest/suite_test.go index c5bcc3db1d..7890c31348 100644 --- a/cmd/devp2p/internal/ethtest/suite_test.go +++ b/cmd/devp2p/internal/ethtest/suite_test.go @@ -120,6 +120,7 @@ func setupGeth(stack *node.Node) error { if err != nil { return err } + backend.SetSynced() _, err = backend.BlockChain().InsertChain(chain.blocks[1:]) return err diff --git a/core/blockchain.go b/core/blockchain.go index e371e8d926..067f44d1f1 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -337,17 +337,17 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis if err := bc.loadLastState(); err != nil { return nil, err } - // Make sure the state associated with the block is available + // Make sure the state associated with the block is available, or log out + // if there is no available state, waiting for state sync. head := bc.CurrentBlock() if !bc.HasState(head.Root) { if head.Number.Uint64() == 0 { // The genesis state is missing, which is only possible in the path-based - // scheme. This situation occurs when the state syncer overwrites it. - // - // The solution is to reset the state to the genesis state. Although it may not - // match the sync target, the state healer will later address and correct any - // inconsistencies. - bc.resetState() + // scheme. This situation occurs when the initial state sync is not finished + // yet, or the chain head is rewound below the pivot point. In both scenario, + // there is no possible recovery approach except for rerunning a snap sync. + // Do nothing here until the state syncer picks it up. + log.Info("Genesis state is missing, wait state sync") } else { // Head state is missing, before the state recovery, find out the // disk layer point of snapshot(if it's enabled). Make sure the @@ -630,28 +630,6 @@ func (bc *BlockChain) SetSafe(header *types.Header) { } } -// resetState resets the persistent state to genesis state if it's not present. -func (bc *BlockChain) resetState() { - // Short circuit if the genesis state is already present. - root := bc.genesisBlock.Root() - if bc.HasState(root) { - return - } - // Reset the state database to empty for committing genesis state. - // Note, it should only happen in path-based scheme and Reset function - // is also only call-able in this mode. - if bc.triedb.Scheme() == rawdb.PathScheme { - if err := bc.triedb.Reset(types.EmptyRootHash); err != nil { - log.Crit("Failed to clean state", "err", err) // Shouldn't happen - } - } - // Write genesis state into database. - if err := CommitGenesisState(bc.db, bc.triedb, bc.genesisBlock.Hash()); err != nil { - log.Crit("Failed to commit genesis state", "err", err) - } - log.Info("Reset state to genesis", "root", root) -} - // setHeadBeyondRoot rewinds the local chain to a new head with the extra condition // that the rewind must pass the specified state root. This method is meant to be // used when rewinding with snapshots enabled to ensure that we go back further than @@ -687,7 +665,6 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha if newHeadBlock == nil { log.Error("Gap in the chain, rewinding to genesis", "number", header.Number, "hash", header.Hash()) newHeadBlock = bc.genesisBlock - bc.resetState() } else { // Block exists, keep rewinding until we find one with state, // keeping rewinding until we exceed the optional threshold @@ -715,16 +692,14 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha } } if beyondRoot || newHeadBlock.NumberU64() == 0 { - if newHeadBlock.NumberU64() == 0 { - bc.resetState() - } else if !bc.HasState(newHeadBlock.Root()) { + if !bc.HasState(newHeadBlock.Root()) && bc.stateRecoverable(newHeadBlock.Root()) { // Rewind to a block with recoverable state. If the state is // missing, run the state recovery here. if err := bc.triedb.Recover(newHeadBlock.Root()); err != nil { log.Crit("Failed to rollback state", "err", err) // Shouldn't happen } + log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash()) } - log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash()) break } log.Debug("Skipping block with threshold state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "root", newHeadBlock.Root()) @@ -739,6 +714,15 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha // to low, so it's safe to update in-memory markers directly. bc.currentBlock.Store(newHeadBlock.Header()) headBlockGauge.Update(int64(newHeadBlock.NumberU64())) + + // The head state is missing, which is only possible in the path-based + // scheme. This situation occurs when the chain head is rewound below + // the pivot point. In this scenario, there is no possible recovery + // approach except for rerunning a snap sync. Do nothing here until the + // state syncer picks it up. + if !bc.HasState(newHeadBlock.Root()) { + log.Info("Chain is stateless, wait state sync", "number", newHeadBlock.Number(), "hash", newHeadBlock.Hash()) + } } // Rewind the snap block in a simpleton way to the target head if currentSnapBlock := bc.CurrentSnapBlock(); currentSnapBlock != nil && header.Number.Uint64() < currentSnapBlock.Number.Uint64() { @@ -838,7 +822,7 @@ func (bc *BlockChain) SnapSyncCommitHead(hash common.Hash) error { // Reset the trie database with the fresh snap synced state. root := block.Root() if bc.triedb.Scheme() == rawdb.PathScheme { - if err := bc.triedb.Reset(root); err != nil { + if err := bc.triedb.Enable(root); err != nil { return err } } diff --git a/core/rawdb/accessors_sync.go b/core/rawdb/accessors_sync.go index 7a7374e168..2dc08b3b72 100644 --- a/core/rawdb/accessors_sync.go +++ b/core/rawdb/accessors_sync.go @@ -76,3 +76,25 @@ func DeleteSkeletonHeader(db ethdb.KeyValueWriter, number uint64) { log.Crit("Failed to delete skeleton header", "err", err) } } + +const ( + StateSyncUnknown = uint8(0) // flags the state snap sync is unknown + StateSyncRunning = uint8(1) // flags the state snap sync is not completed yet + StateSyncFinished = uint8(2) // flags the state snap sync is completed +) + +// ReadSnapSyncStatusFlag retrieves the state snap sync status flag. +func ReadSnapSyncStatusFlag(db ethdb.KeyValueReader) uint8 { + blob, err := db.Get(snapSyncStatusFlagKey) + if err != nil || len(blob) != 1 { + return StateSyncUnknown + } + return blob[0] +} + +// WriteSnapSyncStatusFlag stores the state snap sync status flag into database. +func WriteSnapSyncStatusFlag(db ethdb.KeyValueWriter, flag uint8) { + if err := db.Put(snapSyncStatusFlagKey, []byte{flag}); err != nil { + log.Crit("Failed to store sync status flag", "err", err) + } +} diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 3839e949ed..e97eeb2aa3 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -555,7 +555,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { lastPivotKey, fastTrieProgressKey, snapshotDisabledKey, SnapshotRootKey, snapshotJournalKey, snapshotGeneratorKey, snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey, uncleanShutdownKey, badBlockKey, transitionStatusKey, skeletonSyncStatusKey, - persistentStateIDKey, trieJournalKey, snapshotSyncStatusKey, + persistentStateIDKey, trieJournalKey, snapshotSyncStatusKey, snapSyncStatusFlagKey, } { if bytes.Equal(key, meta) { metadata.Add(size) diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 7269fe5d56..8e82459e82 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -91,6 +91,9 @@ var ( // transitionStatusKey tracks the eth2 transition status. transitionStatusKey = []byte("eth2-transition") + // snapSyncStatusFlagKey flags that status of snap sync. + snapSyncStatusFlagKey = []byte("SnapSyncStatus") + // Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes). headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 042ff3be20..36916c3f0b 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -355,7 +355,13 @@ func (p *BlobPool) Init(gasTip *big.Int, head *types.Header, reserve txpool.Addr return err } } + // Initialize the state with head block, or fallback to empty one in + // case the head state is not available(might occur when node is not + // fully synced). state, err := p.chain.StateAt(head.Root) + if err != nil { + state, err = p.chain.StateAt(types.EmptyRootHash) + } if err != nil { return err } diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index f1b960510a..2430028f9d 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -298,7 +298,20 @@ func (pool *LegacyPool) Init(gasTip *big.Int, head *types.Header, reserve txpool // Set the basic pool parameters pool.gasTip.Store(gasTip) - pool.reset(nil, head) + + // Initialize the state with head block, or fallback to empty one in + // case the head state is not available(might occur when node is not + // fully synced). + statedb, err := pool.chain.StateAt(head.Root) + if err != nil { + statedb, err = pool.chain.StateAt(types.EmptyRootHash) + } + if err != nil { + return err + } + pool.currentHead.Store(head) + pool.currentState = statedb + pool.pendingNonces = newNoncer(statedb) // Start the reorg loop early, so it can handle requests generated during // journal loading. diff --git a/eth/api_backend.go b/eth/api_backend.go index dea745382e..a0c14f1338 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -204,7 +204,10 @@ func (b *EthAPIBackend) StateAndHeaderByNumber(ctx context.Context, number rpc.B return nil, nil, errors.New("header not found") } stateDb, err := b.eth.BlockChain().StateAt(header.Root) - return stateDb, header, err + if err != nil { + return nil, nil, err + } + return stateDb, header, nil } func (b *EthAPIBackend) StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*state.StateDB, *types.Header, error) { @@ -223,7 +226,10 @@ func (b *EthAPIBackend) StateAndHeaderByNumberOrHash(ctx context.Context, blockN return nil, nil, errors.New("hash is not currently canonical") } stateDb, err := b.eth.BlockChain().StateAt(header.Root) - return stateDb, header, err + if err != nil { + return nil, nil, err + } + return stateDb, header, nil } return nil, nil, errors.New("invalid arguments; neither block nor hash specified") } diff --git a/eth/backend.go b/eth/backend.go index b99ae7655b..af03517792 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -474,7 +474,7 @@ func (s *Ethereum) Engine() consensus.Engine { return s.engine } func (s *Ethereum) ChainDb() ethdb.Database { return s.chainDb } func (s *Ethereum) IsListening() bool { return true } // Always listening func (s *Ethereum) Downloader() *downloader.Downloader { return s.handler.downloader } -func (s *Ethereum) Synced() bool { return s.handler.acceptTxs.Load() } +func (s *Ethereum) Synced() bool { return s.handler.synced.Load() } func (s *Ethereum) SetSynced() { s.handler.enableSyncedFeatures() } func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruning } func (s *Ethereum) BloomIndexer() *core.ChainIndexer { return s.bloomIndexer } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 732e79f8ba..7fed48bdb2 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -403,7 +403,9 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td, ttd *big.Int, // subsequent state reads, explicitly disable the trie database and state // syncer is responsible to address and correct any state missing. if d.blockchain.TrieDB().Scheme() == rawdb.PathScheme { - d.blockchain.TrieDB().Reset(types.EmptyRootHash) + if err := d.blockchain.TrieDB().Disable(); err != nil { + return err + } } // Snap sync uses the snapshot namespace to store potentially flaky data until // sync completely heals and finishes. Pause snapshot maintenance in the mean- diff --git a/eth/handler.go b/eth/handler.go index a629ec5ee9..59040442e4 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -100,8 +100,8 @@ type handler struct { networkID uint64 forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node - snapSync atomic.Bool // Flag whether snap sync is enabled (gets disabled if we already have blocks) - acceptTxs atomic.Bool // Flag whether we're considered synchronised (enables transaction processing) + snapSync atomic.Bool // Flag whether snap sync is enabled (gets disabled if we already have blocks) + synced atomic.Bool // Flag whether we're considered synchronised (enables transaction processing) database ethdb.Database txpool txPool @@ -163,32 +163,24 @@ func newHandler(config *handlerConfig) (*handler, error) { fullBlock, snapBlock := h.chain.CurrentBlock(), h.chain.CurrentSnapBlock() if fullBlock.Number.Uint64() == 0 && snapBlock.Number.Uint64() > 0 { h.snapSync.Store(true) - log.Warn("Switch sync mode from full sync to snap sync") + log.Warn("Switch sync mode from full sync to snap sync", "reason", "snap sync incomplete") + } else if !h.chain.HasState(fullBlock.Root) { + h.snapSync.Store(true) + log.Warn("Switch sync mode from full sync to snap sync", "reason", "head state missing") } } else { - if h.chain.CurrentBlock().Number.Uint64() > 0 { + head := h.chain.CurrentBlock() + if head.Number.Uint64() > 0 && h.chain.HasState(head.Root) { // Print warning log if database is not empty to run snap sync. - log.Warn("Switch sync mode from snap sync to full sync") + log.Warn("Switch sync mode from snap sync to full sync", "reason", "snap sync complete") } else { // If snap sync was requested and our database is empty, grant it h.snapSync.Store(true) + log.Info("Enabled snap sync", "head", head.Number, "hash", head.Hash()) } } - // If sync succeeds, pass a callback to potentially disable snap sync mode - // and enable transaction propagation. - success := func() { - // If we were running snap sync and it finished, disable doing another - // round on next sync cycle - if h.snapSync.Load() { - log.Info("Snap sync complete, auto disabling") - h.snapSync.Store(false) - } - // If we've successfully finished a sync cycle, accept transactions from - // the network - h.enableSyncedFeatures() - } // Construct the downloader (long sync) - h.downloader = downloader.New(config.Database, h.eventMux, h.chain, nil, h.removePeer, success) + h.downloader = downloader.New(config.Database, h.eventMux, h.chain, nil, h.removePeer, h.enableSyncedFeatures) if ttd := h.chain.Config().TerminalTotalDifficulty; ttd != nil { if h.chain.Config().TerminalTotalDifficultyPassed { log.Info("Chain post-merge, sync via beacon client") @@ -245,8 +237,8 @@ func newHandler(config *handlerConfig) (*handler, error) { // accept each others' blocks until a restart. Unfortunately we haven't figured // out a way yet where nodes can decide unilaterally whether the network is new // or not. This should be fixed if we figure out a solution. - if h.snapSync.Load() { - log.Warn("Snap syncing, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash()) + if !h.synced.Load() { + log.Warn("Syncing, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash()) return 0, nil } if h.merger.TDDReached() { @@ -272,11 +264,7 @@ func newHandler(config *handlerConfig) (*handler, error) { } return 0, nil } - n, err := h.chain.InsertChain(blocks) - if err == nil { - h.enableSyncedFeatures() // Mark initial sync done on any fetcher import - } - return n, err + return h.chain.InsertChain(blocks) } h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer) @@ -680,7 +668,15 @@ func (h *handler) txBroadcastLoop() { // enableSyncedFeatures enables the post-sync functionalities when the initial // sync is finished. func (h *handler) enableSyncedFeatures() { - h.acceptTxs.Store(true) + // Mark the local node as synced. + h.synced.Store(true) + + // If we were running snap sync and it finished, disable doing another + // round on next sync cycle + if h.snapSync.Load() { + log.Info("Snap sync complete, auto disabling") + h.snapSync.Store(false) + } if h.chain.TrieDB().Scheme() == rawdb.PathScheme { h.chain.TrieDB().SetBufferSize(pathdb.DefaultBufferSize) } diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 3a5e6608bb..2aba16f928 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -51,7 +51,7 @@ func (h *ethHandler) PeerInfo(id enode.ID) interface{} { // AcceptTxs retrieves whether transaction processing is enabled on the node // or if inbound transactions should simply be dropped. func (h *ethHandler) AcceptTxs() bool { - return h.acceptTxs.Load() + return h.synced.Load() } // Handle is invoked from a peer's message handler when it receives a new remote diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index 41619fe300..a16abc5ed6 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -248,7 +248,7 @@ func testRecvTransactions(t *testing.T, protocol uint) { handler := newTestHandler() defer handler.close() - handler.handler.acceptTxs.Store(true) // mark synced to accept transactions + handler.handler.synced.Store(true) // mark synced to accept transactions txs := make(chan core.NewTxsEvent) sub := handler.txpool.SubscribeNewTxsEvent(txs) @@ -401,7 +401,7 @@ func testTransactionPropagation(t *testing.T, protocol uint) { sinks[i] = newTestHandler() defer sinks[i].close() - sinks[i].handler.acceptTxs.Store(true) // mark synced to accept transactions + sinks[i].handler.synced.Store(true) // mark synced to accept transactions } // Interconnect all the sink handlers with the source handler for i, sink := range sinks { diff --git a/eth/sync.go b/eth/sync.go index ba7a7427a5..c7ba7c93d6 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -197,16 +197,25 @@ func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) { return downloader.SnapSync, td } // We are probably in full sync, but we might have rewound to before the - // snap sync pivot, check if we should reenable + // snap sync pivot, check if we should re-enable snap sync. + head := cs.handler.chain.CurrentBlock() if pivot := rawdb.ReadLastPivotNumber(cs.handler.database); pivot != nil { - if head := cs.handler.chain.CurrentBlock(); head.Number.Uint64() < *pivot { + if head.Number.Uint64() < *pivot { block := cs.handler.chain.CurrentSnapBlock() td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64()) return downloader.SnapSync, td } } + // We are in a full sync, but the associated head state is missing. To complete + // the head state, forcefully rerun the snap sync. Note it doesn't mean the + // persistent state is corrupted, just mismatch with the head block. + if !cs.handler.chain.HasState(head.Root) { + block := cs.handler.chain.CurrentSnapBlock() + td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64()) + log.Info("Reenabled snap sync as chain is stateless") + return downloader.SnapSync, td + } // Nope, we're really full syncing - head := cs.handler.chain.CurrentBlock() td := cs.handler.chain.GetTd(head.Hash(), head.Number.Uint64()) return downloader.FullSync, td } @@ -242,13 +251,7 @@ func (h *handler) doSync(op *chainSyncOp) error { if err != nil { return err } - if h.snapSync.Load() { - log.Info("Snap sync complete, auto disabling") - h.snapSync.Store(false) - } - // If we've successfully finished a sync cycle, enable accepting transactions - // from the network. - h.acceptTxs.Store(true) + h.enableSyncedFeatures() head := h.chain.CurrentBlock() if head.Number.Uint64() > 0 { diff --git a/miner/miner_test.go b/miner/miner_test.go index 489bc46a91..36d5166c6d 100644 --- a/miner/miner_test.go +++ b/miner/miner_test.go @@ -64,6 +64,7 @@ func (m *mockBackend) StateAtBlock(block *types.Block, reexec uint64, base *stat } type testBlockChain struct { + root common.Hash config *params.ChainConfig statedb *state.StateDB gasLimit uint64 @@ -89,6 +90,10 @@ func (bc *testBlockChain) StateAt(common.Hash) (*state.StateDB, error) { return bc.statedb, nil } +func (bc *testBlockChain) HasState(root common.Hash) bool { + return bc.root == root +} + func (bc *testBlockChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { return bc.chainHeadFeed.Subscribe(ch) } @@ -302,7 +307,7 @@ func createMiner(t *testing.T) (*Miner, *event.TypeMux, func(skipMiner bool)) { t.Fatalf("can't create new chain %v", err) } statedb, _ := state.New(bc.Genesis().Root(), bc.StateCache(), nil) - blockchain := &testBlockChain{chainConfig, statedb, 10000000, new(event.Feed)} + blockchain := &testBlockChain{bc.Genesis().Root(), chainConfig, statedb, 10000000, new(event.Feed)} pool := legacypool.New(testTxPoolConfig, blockchain) txpool, _ := txpool.New(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain, []txpool.SubPool{pool}) diff --git a/trie/database.go b/trie/database.go index 535ad87d72..1e59f0908f 100644 --- a/trie/database.go +++ b/trie/database.go @@ -273,15 +273,27 @@ func (db *Database) Recoverable(root common.Hash) (bool, error) { return pdb.Recoverable(root), nil } -// Reset wipes all available journal from the persistent database and discard -// all caches and diff layers. Using the given root to create a new disk layer. +// Disable deactivates the database and invalidates all available state layers +// as stale to prevent access to the persistent state, which is in the syncing +// stage. +// // It's only supported by path-based database and will return an error for others. -func (db *Database) Reset(root common.Hash) error { +func (db *Database) Disable() error { + pdb, ok := db.backend.(*pathdb.Database) + if !ok { + return errors.New("not supported") + } + return pdb.Disable() +} + +// Enable activates database and resets the state tree with the provided persistent +// state root once the state sync is finished. +func (db *Database) Enable(root common.Hash) error { pdb, ok := db.backend.(*pathdb.Database) if !ok { return errors.New("not supported") } - return pdb.Reset(root) + return pdb.Enable(root) } // Journal commits an entire diff hierarchy to disk into a single journal entry. diff --git a/trie/triedb/pathdb/database.go b/trie/triedb/pathdb/database.go index 18cc36ffc3..dc64414e9b 100644 --- a/trie/triedb/pathdb/database.go +++ b/trie/triedb/pathdb/database.go @@ -128,7 +128,8 @@ type Database struct { // readOnly is the flag whether the mutation is allowed to be applied. // It will be set automatically when the database is journaled during // the shutdown to reject all following unexpected mutations. - readOnly bool // Indicator if database is opened in read only mode + readOnly bool // Flag if database is opened in read only mode + waitSync bool // Flag if database is deactivated due to initial state sync bufferSize int // Memory allowance (in bytes) for caching dirty nodes config *Config // Configuration for database diskdb ethdb.Database // Persistent storage for matured trie nodes @@ -179,6 +180,12 @@ func New(diskdb ethdb.Database, config *Config) *Database { log.Warn("Truncated extra state histories", "number", pruned) } } + // Disable database in case node is still in the initial state sync stage. + if rawdb.ReadSnapSyncStatusFlag(diskdb) == rawdb.StateSyncRunning && !db.readOnly { + if err := db.Disable(); err != nil { + log.Crit("Failed to disable database", "err", err) // impossible to happen + } + } log.Warn("Path-based state scheme is an experimental feature") return db } @@ -204,9 +211,9 @@ func (db *Database) Update(root common.Hash, parentRoot common.Hash, block uint6 db.lock.Lock() defer db.lock.Unlock() - // Short circuit if the database is in read only mode. - if db.readOnly { - return errSnapshotReadOnly + // Short circuit if the mutation is not allowed. + if err := db.modifyAllowed(); err != nil { + return err } if err := db.tree.add(root, parentRoot, block, nodes, states); err != nil { return err @@ -227,45 +234,59 @@ func (db *Database) Commit(root common.Hash, report bool) error { db.lock.Lock() defer db.lock.Unlock() - // Short circuit if the database is in read only mode. - if db.readOnly { - return errSnapshotReadOnly + // Short circuit if the mutation is not allowed. + if err := db.modifyAllowed(); err != nil { + return err } return db.tree.cap(root, 0) } -// Reset rebuilds the database with the specified state as the base. -// -// - if target state is empty, clear the stored state and all layers on top -// - if target state is non-empty, ensure the stored state matches with it -// and clear all other layers on top. -func (db *Database) Reset(root common.Hash) error { +// Disable deactivates the database and invalidates all available state layers +// as stale to prevent access to the persistent state, which is in the syncing +// stage. +func (db *Database) Disable() error { db.lock.Lock() defer db.lock.Unlock() // Short circuit if the database is in read only mode. if db.readOnly { - return errSnapshotReadOnly + return errDatabaseReadOnly } - batch := db.diskdb.NewBatch() - root = types.TrieRootHash(root) - if root == types.EmptyRootHash { - // Empty state is requested as the target, nuke out - // the root node and leave all others as dangling. - rawdb.DeleteAccountTrieNode(batch, nil) - } else { - // Ensure the requested state is existent before any - // action is applied. - _, hash := rawdb.ReadAccountTrieNode(db.diskdb, nil) - if hash != root { - return fmt.Errorf("state is mismatched, local: %x, target: %x", hash, root) - } + // Prevent duplicated disable operation. + if db.waitSync { + log.Error("Reject duplicated disable operation") + return nil } - // Mark the disk layer as stale before applying any mutation. + db.waitSync = true + + // Mark the disk layer as stale to prevent access to persistent state. db.tree.bottom().markStale() + // Write the initial sync flag to persist it across restarts. + rawdb.WriteSnapSyncStatusFlag(db.diskdb, rawdb.StateSyncRunning) + log.Info("Disabled trie database due to state sync") + return nil +} + +// Enable activates database and resets the state tree with the provided persistent +// state root once the state sync is finished. +func (db *Database) Enable(root common.Hash) error { + db.lock.Lock() + defer db.lock.Unlock() + + // Short circuit if the database is in read only mode. + if db.readOnly { + return errDatabaseReadOnly + } + // Ensure the provided state root matches the stored one. + root = types.TrieRootHash(root) + _, stored := rawdb.ReadAccountTrieNode(db.diskdb, nil) + if stored != root { + return fmt.Errorf("state root mismatch: stored %x, synced %x", stored, root) + } // Drop the stale state journal in persistent database and // reset the persistent state id back to zero. + batch := db.diskdb.NewBatch() rawdb.DeleteTrieJournal(batch) rawdb.WritePersistentStateID(batch, 0) if err := batch.Write(); err != nil { @@ -282,8 +303,11 @@ func (db *Database) Reset(root common.Hash) error { } // Re-construct a new disk layer backed by persistent state // with **empty clean cache and node buffer**. - dl := newDiskLayer(root, 0, db, nil, newNodeBuffer(db.bufferSize, nil, 0)) - db.tree.reset(dl) + db.tree.reset(newDiskLayer(root, 0, db, nil, newNodeBuffer(db.bufferSize, nil, 0))) + + // Re-enable the database as the final step. + db.waitSync = false + rawdb.WriteSnapSyncStatusFlag(db.diskdb, rawdb.StateSyncFinished) log.Info("Rebuilt trie database", "root", root) return nil } @@ -296,7 +320,10 @@ func (db *Database) Recover(root common.Hash, loader triestate.TrieLoader) error defer db.lock.Unlock() // Short circuit if rollback operation is not supported. - if db.readOnly || db.freezer == nil { + if err := db.modifyAllowed(); err != nil { + return err + } + if db.freezer == nil { return errors.New("state rollback is non-supported") } // Short circuit if the target state is not recoverable. @@ -424,3 +451,15 @@ func (db *Database) SetBufferSize(size int) error { func (db *Database) Scheme() string { return rawdb.PathScheme } + +// modifyAllowed returns the indicator if mutation is allowed. This function +// assumes the db.lock is already held. +func (db *Database) modifyAllowed() error { + if db.readOnly { + return errDatabaseReadOnly + } + if db.waitSync { + return errDatabaseWaitSync + } + return nil +} diff --git a/trie/triedb/pathdb/database_test.go b/trie/triedb/pathdb/database_test.go index 6d346d20ea..912364f7f4 100644 --- a/trie/triedb/pathdb/database_test.go +++ b/trie/triedb/pathdb/database_test.go @@ -439,38 +439,39 @@ func TestDatabaseRecoverable(t *testing.T) { } } -func TestReset(t *testing.T) { - var ( - tester = newTester(t) - index = tester.bottomIndex() - ) +func TestDisable(t *testing.T) { + tester := newTester(t) defer tester.release() - // Reset database to unknown target, should reject it - if err := tester.db.Reset(testutil.RandomHash()); err == nil { - t.Fatal("Failed to reject invalid reset") + _, stored := rawdb.ReadAccountTrieNode(tester.db.diskdb, nil) + if err := tester.db.Disable(); err != nil { + t.Fatal("Failed to deactivate database") } - // Reset database to state persisted in the disk - if err := tester.db.Reset(types.EmptyRootHash); err != nil { - t.Fatalf("Failed to reset database %v", err) + if err := tester.db.Enable(types.EmptyRootHash); err == nil { + t.Fatalf("Invalid activation should be rejected") } + if err := tester.db.Enable(stored); err != nil { + t.Fatal("Failed to activate database") + } + // Ensure journal is deleted from disk if blob := rawdb.ReadTrieJournal(tester.db.diskdb); len(blob) != 0 { t.Fatal("Failed to clean journal") } // Ensure all trie histories are removed - for i := 0; i <= index; i++ { - _, err := readHistory(tester.db.freezer, uint64(i+1)) - if err == nil { - t.Fatalf("Failed to clean state history, index %d", i+1) - } + n, err := tester.db.freezer.Ancients() + if err != nil { + t.Fatal("Failed to clean state history") + } + if n != 0 { + t.Fatal("Failed to clean state history") } // Verify layer tree structure, single disk layer is expected if tester.db.tree.len() != 1 { t.Fatalf("Extra layer kept %d", tester.db.tree.len()) } - if tester.db.tree.bottom().rootHash() != types.EmptyRootHash { - t.Fatalf("Root hash is not matched exp %x got %x", types.EmptyRootHash, tester.db.tree.bottom().rootHash()) + if tester.db.tree.bottom().rootHash() != stored { + t.Fatalf("Root hash is not matched exp %x got %x", stored, tester.db.tree.bottom().rootHash()) } } diff --git a/trie/triedb/pathdb/errors.go b/trie/triedb/pathdb/errors.go index f6ac0ec4a0..78ee4459fe 100644 --- a/trie/triedb/pathdb/errors.go +++ b/trie/triedb/pathdb/errors.go @@ -25,9 +25,13 @@ import ( ) var ( - // errSnapshotReadOnly is returned if the database is opened in read only mode - // and mutation is requested. - errSnapshotReadOnly = errors.New("read only") + // errDatabaseReadOnly is returned if the database is opened in read only mode + // to prevent any mutation. + errDatabaseReadOnly = errors.New("read only") + + // errDatabaseWaitSync is returned if the initial state sync is not completed + // yet and database is disabled to prevent accessing state. + errDatabaseWaitSync = errors.New("waiting for sync") // errSnapshotStale is returned from data accessors if the underlying layer // layer had been invalidated due to the chain progressing forward far enough diff --git a/trie/triedb/pathdb/journal.go b/trie/triedb/pathdb/journal.go index ea90207f29..ac770763e3 100644 --- a/trie/triedb/pathdb/journal.go +++ b/trie/triedb/pathdb/journal.go @@ -356,7 +356,7 @@ func (db *Database) Journal(root common.Hash) error { // Short circuit if the database is in read only mode. if db.readOnly { - return errSnapshotReadOnly + return errDatabaseReadOnly } // Firstly write out the metadata of journal journal := new(bytes.Buffer)