Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,8 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
headBlockGauge.Update(int64(block.NumberU64()))
bc.chainmu.Unlock()

// Destroy any existing state snapshot and regenerate it in the background
// Destroy any existing state snapshot and regenerate it in the background,
// also resuming the normal maintenance of any previously paused snapshot.
if bc.snaps != nil {
bc.snaps.Rebuild(block.Root())
}
Expand Down
20 changes: 20 additions & 0 deletions core/rawdb/accessors_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,26 @@ import (
"github.com/ethereum/go-ethereum/log"
)

// ReadSnapshotDisabled retrieves if the snapshot maintenance is disabled.
func ReadSnapshotDisabled(db ethdb.KeyValueReader) bool {
disabled, _ := db.Has(snapshotDisabledKey)
return disabled
}

// WriteSnapshotDisabled stores the snapshot pause flag.
func WriteSnapshotDisabled(db ethdb.KeyValueWriter) {
if err := db.Put(snapshotDisabledKey, []byte("42")); err != nil {
log.Crit("Failed to store snapshot disabled flag", "err", err)
}
}

// DeleteSnapshotDisabled deletes the flag keeping the snapshot maintenance disabled.
func DeleteSnapshotDisabled(db ethdb.KeyValueWriter) {
if err := db.Delete(snapshotDisabledKey); err != nil {
log.Crit("Failed to remove snapshot disabled flag", "err", err)
}
}

// ReadSnapshotRoot retrieves the root of the block whose state is contained in
// the persisted snapshot.
func ReadSnapshotRoot(db ethdb.KeyValueReader) common.Hash {
Expand Down
6 changes: 3 additions & 3 deletions core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,9 +371,9 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
var accounted bool
for _, meta := range [][]byte{
databaseVersionKey, headHeaderKey, headBlockKey, headFastBlockKey, lastPivotKey,
fastTrieProgressKey, snapshotRootKey, snapshotJournalKey, snapshotGeneratorKey,
snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey, uncleanShutdownKey,
badBlockKey,
fastTrieProgressKey, snapshotDisabledKey, snapshotRootKey, snapshotJournalKey,
snapshotGeneratorKey, snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey,
uncleanShutdownKey, badBlockKey,
} {
if bytes.Equal(key, meta) {
metadata.Add(size)
Expand Down
3 changes: 3 additions & 0 deletions core/rawdb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ var (
// fastTrieProgressKey tracks the number of trie entries imported during fast sync.
fastTrieProgressKey = []byte("TrieSync")

// snapshotDisabledKey flags that the snapshot should not be maintained due to initial sync.
snapshotDisabledKey = []byte("SnapshotDisabled")
Comment thread
karalabe marked this conversation as resolved.
Outdated

// snapshotRootKey tracks the hash of the last snapshot.
snapshotRootKey = []byte("SnapshotRoot")

Expand Down
10 changes: 0 additions & 10 deletions core/state/snapshot/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,16 +141,6 @@ func (gs *generatorStats) Log(msg string, root common.Hash, marker []byte) {
log.Info(msg, ctx...)
}

// ClearSnapshotMarker sets the snapshot marker to zero, meaning that snapshots
// are not usable.
func ClearSnapshotMarker(diskdb ethdb.KeyValueStore) {
batch := diskdb.NewBatch()
journalProgress(batch, []byte{}, nil)
if err := batch.Write(); err != nil {
log.Crit("Failed to write initialized state marker", "err", err)
}
}

// generateSnapshot regenerates a brand new snapshot based on an existing state
// database and head block asynchronously. The snapshot is returned immediately
// and generation is continued in the background until done.
Expand Down
15 changes: 10 additions & 5 deletions core/state/snapshot/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,17 @@ func loadAndParseJournal(db ethdb.KeyValueStore, base *diskLayer) (snapshot, jou
}

// loadSnapshot loads a pre-existing state snapshot backed by a key-value store.
func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, recovery bool) (snapshot, error) {
func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, recovery bool) (snapshot, bool, error) {
// If snapshotting is disabled (initial sync in progress), don't do anything,
// wait for the chain to permit us to do something meaningful
if rawdb.ReadSnapshotDisabled(diskdb) {
return nil, true, nil
}
// Retrieve the block number and hash of the snapshot, failing if no snapshot
// is present in the database (or crashed mid-update).
baseRoot := rawdb.ReadSnapshotRoot(diskdb)
if baseRoot == (common.Hash{}) {
return nil, errors.New("missing or corrupted snapshot")
return nil, false, errors.New("missing or corrupted snapshot")
}
base := &diskLayer{
diskdb: diskdb,
Expand All @@ -142,7 +147,7 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int,
snapshot, generator, err := loadAndParseJournal(diskdb, base)
if err != nil {
log.Warn("Failed to load new-format journal", "error", err)
return nil, err
return nil, false, err
}
// Entire snapshot journal loaded, sanity check the head. If the loaded
// snapshot is not matched with current state root, print a warning log
Expand All @@ -157,7 +162,7 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int,
// it's not in recovery mode, returns the error here for
// rebuilding the entire snapshot forcibly.
if !recovery {
return nil, fmt.Errorf("head doesn't match snapshot: have %#x, want %#x", head, root)
return nil, false, fmt.Errorf("head doesn't match snapshot: have %#x, want %#x", head, root)
}
// It's in snapshot recovery, the assumption is held that
// the disk layer is always higher than chain head. It can
Expand Down Expand Up @@ -187,7 +192,7 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int,
storage: common.StorageSize(generator.Storage),
})
}
return snapshot, nil
return snapshot, false, nil
}

// loadDiffLayer reads the next sections of a snapshot journal, reconstructing a new
Expand Down
68 changes: 61 additions & 7 deletions core/state/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,11 @@ type snapshot interface {
StorageIterator(account common.Hash, seek common.Hash) (StorageIterator, bool)
}

// SnapshotTree is an Ethereum state snapshot tree. It consists of one persistent
// base layer backed by a key-value store, on top of which arbitrarily many in-
// memory diff layers are topped. The memory diffs can form a tree with branching,
// but the disk layer is singleton and common to all. If a reorg goes deeper than
// the disk layer, everything needs to be deleted.
// Tree is an Ethereum state snapshot tree. It consists of one persistent base
// layer backed by a key-value store, on top of which arbitrarily many in-memory
// diff layers are topped. The memory diffs can form a tree with branching, but
// the disk layer is singleton and common to all. If a reorg goes deeper than the
// disk layer, everything needs to be deleted.
//
// The goal of a state snapshot is twofold: to allow direct access to account and
// storage data to avoid expensive multi-level trie lookups; and to allow sorted,
Expand Down Expand Up @@ -186,7 +186,11 @@ func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root comm
defer snap.waitBuild()
}
// Attempt to load a previously persisted snapshot and rebuild one if failed
head, err := loadSnapshot(diskdb, triedb, cache, root, recovery)
head, disabled, err := loadSnapshot(diskdb, triedb, cache, root, recovery)
if disabled {
log.Warn("Snapshot maintenance disabled (syncing)")
return snap, nil
Comment thread
karalabe marked this conversation as resolved.
Outdated
}
if err != nil {
if rebuild {
log.Warn("Failed to load snapshot, regenerating", "err", err)
Expand Down Expand Up @@ -224,6 +228,55 @@ func (t *Tree) waitBuild() {
}
}

// Disable interrupts any pending snapshot generator, deletes all the snapshot
// layers in memory and marks snapshots disabled globally. In order to resume
// the snapshot functionality, the caller must invoke Rebuild.
func (t *Tree) Disable() {
// Interrupt any live snapshot layers
t.lock.Lock()
defer t.lock.Unlock()

for _, layer := range t.layers {
switch layer := layer.(type) {
case *diskLayer:
// If the base layer is generating, abort it
if layer.genAbort != nil {
abort := make(chan *generatorStats)
layer.genAbort <- abort
<-abort
}
// Layer should be inactive now, mark it as stale
layer.lock.Lock()
layer.stale = true
layer.lock.Unlock()

case *diffLayer:
// If the layer is a simple diff, simply mark as stale
layer.lock.Lock()
atomic.StoreUint32(&layer.stale, 1)
layer.lock.Unlock()

default:
panic(fmt.Sprintf("unknown layer type: %T", layer))
}
}
t.layers = map[common.Hash]snapshot{}

// Delete all snapshot liveness information from the database
batch := t.diskdb.NewBatch()

rawdb.WriteSnapshotDisabled(batch)
rawdb.DeleteSnapshotRoot(batch)
rawdb.DeleteSnapshotJournal(batch)
rawdb.DeleteSnapshotGenerator(batch)
rawdb.DeleteSnapshotRecoveryNumber(batch)
// Note, we don't delete the sync progress

if err := batch.Write(); err != nil {
log.Crit("Failed to disable snapshots", "err", err)
}
}

// Snapshot retrieves a snapshot belonging to the given block root, or nil if no
// snapshot is maintained for that block.
func (t *Tree) Snapshot(blockRoot common.Hash) Snapshot {
Expand Down Expand Up @@ -626,8 +679,9 @@ func (t *Tree) Rebuild(root common.Hash) {
defer t.lock.Unlock()

// Firstly delete any recovery flag in the database. Because now we are
// building a brand new snapshot.
// building a brand new snapshot. Also reenable the snapshot feature.
rawdb.DeleteSnapshotRecoveryNumber(t.diskdb)
rawdb.DeleteSnapshotDisabled(t.diskdb)

// Iterate over and mark all layers stale
for _, layer := range t.layers {
Expand Down
10 changes: 10 additions & 0 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
Expand Down Expand Up @@ -214,6 +215,9 @@ type BlockChain interface {

// InsertReceiptChain inserts a batch of receipts into the local chain.
InsertReceiptChain(types.Blocks, []types.Receipts, uint64) (int, error)

// Snapshots returns the blockchain snapshot tree to paused it during sync.
Snapshots() *snapshot.Tree
}

// New creates a new downloader to fetch hashes and blocks from remote peers.
Expand Down Expand Up @@ -393,6 +397,12 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
// but until snap becomes prevalent, we should support both. TODO(karalabe).
if mode == SnapSync {
if !d.snapSync {
// Snap sync uses the snapshot namespace to store potentially flakey data until
// sync completely heals and finishes. Pause snapshot maintenance in the mean
// time to prevent access.
if snapshots := d.blockchain.Snapshots(); snapshots != nil { // Only nil in tests
snapshots.Disable()
}
log.Warn("Enabling snapshot sync prototype")
d.snapSync = true
}
Expand Down
6 changes: 6 additions & 0 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/ethdb"
Expand Down Expand Up @@ -409,6 +410,11 @@ func (dl *downloadTester) dropPeer(id string) {
dl.downloader.UnregisterPeer(id)
}

// Snapshots implements the BlockChain interface for the downloader, but is a noop.
func (dl *downloadTester) Snapshots() *snapshot.Tree {
return nil
}

type downloadTesterPeer struct {
dl *downloadTester
id string
Expand Down
5 changes: 0 additions & 5 deletions eth/protocols/snap/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,11 +551,6 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
log.Debug("Snapshot sync already completed")
return nil
}
// If sync is still not finished, we need to ensure that any marker is wiped.
// Otherwise, it may happen that requests for e.g. genesis-data is delivered
// from the snapshot data, instead of from the trie
snapshot.ClearSnapshotMarker(s.db)

defer func() { // Persist any progress, independent of failure
for _, task := range s.tasks {
s.forwardAccountTask(task)
Expand Down