Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 35 additions & 13 deletions core/state/snapshot/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,17 @@ type diskLayer struct {
root common.Hash // Root hash of the base snapshot
stale bool // Signals that the layer became stale (state progressed)

genMarker []byte // Marker for the state that's indexed during initial layer generation
genPending chan struct{} // Notification channel when generation is done (test synchronicity)
genAbort chan chan *generatorStats // Notification channel to abort generating the snapshot in this layer
genMarker []byte // Marker for the state that's indexed during initial layer generation
genPending chan struct{} // Notification channel when generation is done (test synchronicity)

// Generator lifecycle management:
// - [cancel] is closed to request termination (broadcast).
// - [done] is closed by the generator goroutine on exit.
cancel chan struct{}
done chan struct{}
cancelOnce sync.Once

genStats *generatorStats // Stats for snapshot generation (generation aborted/finished if non-nil)

lock sync.RWMutex
}
Expand All @@ -49,6 +57,10 @@ type diskLayer struct {
// Reset() in order to not leak memory.
// OBS: It does not invoke Close on the diskdb
func (dl *diskLayer) Release() error {
// Stop any ongoing snapshot generation to prevent it from accessing
// the database after it's closed during shutdown
dl.stopGeneration()

if dl.cache != nil {
dl.cache.Reset()
}
Expand Down Expand Up @@ -184,17 +196,27 @@ func (dl *diskLayer) Update(blockHash common.Hash, accounts map[common.Hash][]by
return newDiffLayer(dl, blockHash, accounts, storage)
}

// stopGeneration aborts the state snapshot generation if it is currently running.
// stopGeneration requests cancellation of any running snapshot generation and
// blocks until the generator goroutine (if running) has fully terminated.
//
// Concurrency guarantees:
// - Thread-safe: May be called concurrently from multiple goroutines
// - Idempotent: Safe to call multiple times; subsequent calls have no effect
// - Blocking: Returns only after the generator goroutine (if any) has exited
// - Safe to call at any time, including when no generation is running
//
// After return, it is **guaranteed** that:
// - The generator goroutine has terminated
// - It is safe to proceed with cleanup operations (e.g. closing databases)
func (dl *diskLayer) stopGeneration() {
dl.lock.RLock()
generating := dl.genMarker != nil
dl.lock.RUnlock()
if !generating {
cancel := dl.cancel
done := dl.done
if cancel == nil || done == nil {
return
}
if dl.genAbort != nil {
abort := make(chan *generatorStats)
dl.genAbort <- abort
<-abort
}

dl.cancelOnce.Do(func() {
close(cancel)
})
<-done
}
62 changes: 23 additions & 39 deletions core/state/snapshot/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ var (
// errMissingTrie is returned if the target trie is missing while the generation
// is running. In this case the generation is aborted and wait the new signal.
errMissingTrie = errors.New("missing trie")

// errAborted is returned when snapshot generation was interrupted/aborted
errAborted = errors.New("aborted")
)

// generateSnapshot regenerates a brand new snapshot based on an existing state
Expand All @@ -74,7 +77,8 @@ func generateSnapshot(diskdb ethdb.KeyValueStore, triedb *triedb.Database, cache
cache: fastcache.New(cache * 1024 * 1024),
genMarker: genMarker,
genPending: make(chan struct{}),
genAbort: make(chan chan *generatorStats),
cancel: make(chan struct{}),
done: make(chan struct{}),
}
go base.generate(stats)
log.Debug("Start snapshot generation", "root", root)
Expand Down Expand Up @@ -467,12 +471,14 @@ func (dl *diskLayer) generateRange(ctx *generatorContext, trieId *trie.ID, prefi
// checkAndFlush checks if an interruption signal is received or the
// batch size has exceeded the allowance.
func (dl *diskLayer) checkAndFlush(ctx *generatorContext, current []byte) error {
var abort chan *generatorStats
aborting := false
select {
case abort = <-dl.genAbort:
case <-dl.cancel:
aborting = true
default:
}
if ctx.batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {

if ctx.batch.ValueSize() > ethdb.IdealBatchSize || aborting {
if bytes.Compare(current, dl.genMarker) < 0 {
log.Error("Snapshot generator went backwards", "current", fmt.Sprintf("%x", current), "genMarker", fmt.Sprintf("%x", dl.genMarker))
}
Expand All @@ -490,9 +496,9 @@ func (dl *diskLayer) checkAndFlush(ctx *generatorContext, current []byte) error
dl.genMarker = current
dl.lock.Unlock()

if abort != nil {
if aborting {
ctx.stats.Log("Aborting state snapshot generation", dl.root, current)
return newAbortErr(abort) // bubble up an error for interruption
return errAborted
}
// Don't hold the iterators too long, release them to let compactor works
ctx.reopenIterator(snapAccount)
Expand Down Expand Up @@ -648,10 +654,11 @@ func generateAccounts(ctx *generatorContext, dl *diskLayer, accMarker []byte) er
// gathering and logging, since the method surfs the blocks as they arrive, often
// being restarted.
func (dl *diskLayer) generate(stats *generatorStats) {
var (
accMarker []byte
abort chan *generatorStats
)
if dl.done != nil {
defer close(dl.done)
}

var accMarker []byte
if len(dl.genMarker) > 0 { // []byte{} is the start, use nil for that
accMarker = dl.genMarker[:common.HashLength]
}
Expand All @@ -669,15 +676,11 @@ func (dl *diskLayer) generate(stats *generatorStats) {
defer ctx.close()

if err := generateAccounts(ctx, dl, accMarker); err != nil {
// Extract the received interruption signal if exists
if aerr, ok := err.(*abortErr); ok {
abort = aerr.abort
// Check if error was due to abort
if err == errAborted {
stats.Log("Aborting state snapshot generation", dl.root, dl.genMarker)
}
// Aborted by internal error, wait the signal
if abort == nil {
abort = <-dl.genAbort
}
abort <- stats
dl.genStats = stats
return
}
// Snapshot fully generated, set the marker to nil.
Expand All @@ -686,9 +689,7 @@ func (dl *diskLayer) generate(stats *generatorStats) {
journalProgress(ctx.batch, nil, stats)
if err := ctx.batch.Write(); err != nil {
log.Error("Failed to flush batch", "err", err)

abort = <-dl.genAbort
abort <- stats
dl.genStats = stats
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this being read from? Kinda suspicious, looks like it might need to be in a lock

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No this doesn't need a lock -- it's only read after stopGeneration() has terminated -- snapshot.go:520-631

return
}
ctx.batch.Reset()
Expand All @@ -698,12 +699,9 @@ func (dl *diskLayer) generate(stats *generatorStats) {

dl.lock.Lock()
dl.genMarker = nil
dl.genStats = stats
close(dl.genPending)
dl.lock.Unlock()

// Someone will be looking for us, wait it out
abort = <-dl.genAbort
abort <- nil
}

// increaseKey increase the input key by one bit. Return nil if the entire
Expand All @@ -717,17 +715,3 @@ func increaseKey(key []byte) []byte {
}
return nil
}

// abortErr wraps an interruption signal received to represent the
// generation is aborted by external processes.
type abortErr struct {
	abort chan *generatorStats // acknowledgement channel the generator replies on
}

// newAbortErr wraps the given acknowledgement channel into an error value
// so the interruption can be bubbled up through the generation call stack.
func newAbortErr(abort chan *generatorStats) error {
	return &abortErr{abort: abort}
}

// Error implements the error interface.
func (err *abortErr) Error() string {
	return "aborted"
}
Loading