Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 144 additions & 8 deletions triedb/pathdb/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ func (ctx *genctx) storageOriginSet(rawStorageKey bool, t *tester) map[common.Ad
type tester struct {
db *Database
roots []common.Hash
nodes []*trienode.MergedNodeSet
states []*StateSetWithOrigin
preimages map[common.Hash][]byte

// current state set
Expand All @@ -135,12 +137,38 @@ type tester struct {
snapNodes map[common.Hash]*trienode.MergedNodeSet
}

// testerConfig holds configuration parameters for running a test scenario.
type testerConfig struct {
stateHistory uint64
isVerkle bool
layers int
enableIndex bool
journalDir string
stateHistory uint64 // Number of historical states to retain
layers int // Number of state transitions to generate for
enableIndex bool // Enable state history indexing or not
journalDir string // Directory path for persisting journal files
isVerkle bool // Enables Verkle trie mode if true

writeBuffer *int // Optional, the size of memory allocated for write buffer
trieCache *int // Optional, the size of memory allocated for trie cache
stateCache *int // Optional, the size of memory allocated for state cache
}

func (c *testerConfig) trieCacheSize() int {
if c.trieCache != nil {
return *c.trieCache
}
return 256 * 1024
}

func (c *testerConfig) stateCacheSize() int {
if c.stateCache != nil {
return *c.stateCache
}
return 256 * 1024
}

func (c *testerConfig) writeBufferSize() int {
if c.writeBuffer != nil {
return *c.writeBuffer
}
return 256 * 1024
}

func newTester(t *testing.T, config *testerConfig) *tester {
Expand All @@ -149,9 +177,9 @@ func newTester(t *testing.T, config *testerConfig) *tester {
db = New(disk, &Config{
StateHistory: config.stateHistory,
EnableStateIndexing: config.enableIndex,
TrieCleanSize: 256 * 1024,
StateCleanSize: 256 * 1024,
WriteBufferSize: 256 * 1024,
TrieCleanSize: config.trieCacheSize(),
StateCleanSize: config.stateCacheSize(),
WriteBufferSize: config.writeBufferSize(),
NoAsyncFlush: true,
JournalDirectory: config.journalDir,
}, config.isVerkle)
Expand All @@ -177,6 +205,8 @@ func newTester(t *testing.T, config *testerConfig) *tester {
panic(fmt.Errorf("failed to update state changes, err: %w", err))
}
obj.roots = append(obj.roots, root)
obj.nodes = append(obj.nodes, nodes)
obj.states = append(obj.states, states)
}
return obj
}
Expand All @@ -200,6 +230,8 @@ func (t *tester) extend(layers int) {
panic(fmt.Errorf("failed to update state changes, err: %w", err))
}
t.roots = append(t.roots, root)
t.nodes = append(t.nodes, nodes)
t.states = append(t.states, states)
}
}

Expand Down Expand Up @@ -885,3 +917,107 @@ func copyStorages(set map[common.Hash]map[common.Hash][]byte) map[common.Hash]ma
}
return copied
}

func TestDatabaseIndexRecovery(t *testing.T) {
maxDiffLayers = 4
defer func() {
maxDiffLayers = 128
}()

//log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true)))
writeBuffer := 512 * 1024
config := &testerConfig{
layers: 64,
enableIndex: true,
writeBuffer: &writeBuffer,
}
env := newTester(t, config)
defer env.release()

// Ensure the buffer in disk layer is not empty
var (
bRoot = env.db.tree.bottom().rootHash()
dRoot = crypto.Keccak256Hash(rawdb.ReadAccountTrieNode(env.db.diskdb, nil))
)
for dRoot == bRoot {
env.extend(1)

bRoot = env.db.tree.bottom().rootHash()
dRoot = crypto.Keccak256Hash(rawdb.ReadAccountTrieNode(env.db.diskdb, nil))
}
waitIndexing(env.db)

var (
dIndex int
roots = env.roots
hr = newHistoryReader(env.db.diskdb, env.db.stateFreezer)
)
for i, root := range roots {
if root == dRoot {
dIndex = i
}
if root == bRoot {
break
}
if err := checkHistoricalState(env, root, uint64(i+1), hr); err != nil {
t.Fatal(err)
}
}

// Terminate the database and mutate the journal, it's for simulating
// the unclean shutdown
env.db.Journal(env.lastHash())
env.db.Close()

// Mutate the journal in disk, it should be regarded as invalid
blob := rawdb.ReadTrieJournal(env.db.diskdb)
blob[0] = 0xa
rawdb.WriteTrieJournal(env.db.diskdb, blob)

// Reload the database, the extra state histories should be removed
env.db = New(env.db.diskdb, env.db.config, false)

for i := range roots {
_, err := readStateHistory(env.db.stateFreezer, uint64(i+1))
if i <= dIndex && err != nil {
t.Fatalf("State history is not found, %d", i)
}
if i > dIndex && err == nil {
t.Fatalf("Unexpected state history found, %d", i)
}
}
remain, err := env.db.IndexProgress()
if err != nil {
t.Fatalf("Failed to obtain the progress, %v", err)
}
if remain == 0 {
t.Fatalf("Unexpected progress remain, %d", remain)
}

// Apply new states on top, ensuring state indexing can respond correctly
for i := dIndex + 1; i < len(roots); i++ {
if err := env.db.Update(roots[i], roots[i-1], uint64(i), env.nodes[i], env.states[i]); err != nil {
panic(fmt.Errorf("failed to update state changes, err: %w", err))
}
}
remain, err = env.db.IndexProgress()
if err != nil {
t.Fatalf("Failed to obtain the progress, %v", err)
}
if remain != 0 {
t.Fatalf("Unexpected progress remain, %d", remain)
}
waitIndexing(env.db)

// Ensure the truncated state histories become accessible
bRoot = env.db.tree.bottom().rootHash()
hr = newHistoryReader(env.db.diskdb, env.db.stateFreezer)
for i, root := range roots {
if root == bRoot {
break
}
if err := checkHistoricalState(env, root, uint64(i+1), hr); err != nil {
t.Fatal(err)
}
}
}
56 changes: 53 additions & 3 deletions triedb/pathdb/history_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,15 +322,22 @@ func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastID
closed: make(chan struct{}),
}
// Load indexing progress
var recover bool
initer.last.Store(lastID)
metadata := loadIndexMetadata(disk)
if metadata != nil {
initer.indexed.Store(metadata.Last)
recover = metadata.Last > lastID
}

// Launch background indexer
initer.wg.Add(1)
go initer.run(lastID)
if recover {
log.Info("History indexer is recovering", "history", lastID, "indexed", metadata.Last)
go initer.recover(lastID)
Comment thread
rjl493456442 marked this conversation as resolved.
} else {
go initer.run(lastID)
}
return initer
}

Expand Down Expand Up @@ -364,8 +371,8 @@ func (i *indexIniter) remain() uint64 {
default:
last, indexed := i.last.Load(), i.indexed.Load()
if last < indexed {
log.Error("Invalid state indexing range", "last", last, "indexed", indexed)
return 0
log.Warn("State indexer is in recovery", "indexed", indexed, "last", last)
return indexed - last
Comment thread
rjl493456442 marked this conversation as resolved.
}
return last - indexed
}
Expand Down Expand Up @@ -569,6 +576,49 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
log.Info("Indexed state history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start)))
}

// recover handles unclean shutdown recovery. After an unclean shutdown, any
// extra histories are typically truncated, while the corresponding history index
// entries may still have been written. Ideally, we would unindex these histories
// in reverse order, but there is no guarantee that the required histories will
// still be available.
//
// As a workaround, indexIniter waits until the missing histories are regenerated
// by chain recovery, under the assumption that the recovered histories will be
// identical to the lost ones. Fork-awareness should be added in the future to
// correctly handle histories affected by reorgs.
func (i *indexIniter) recover(lastID uint64) {
defer i.wg.Done()

for {
select {
case signal := <-i.interrupt:
newLastID := signal.newLastID
if newLastID != lastID+1 && newLastID != lastID-1 {
signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", lastID, newLastID)
continue
}

// Update the last indexed flag
lastID = newLastID
signal.result <- nil
i.last.Store(newLastID)
log.Debug("Updated history index flag", "last", lastID)

// Terminate the recovery routine once the histories are fully aligned
// with the index data, indicating that index initialization is complete.
metadata := loadIndexMetadata(i.disk)
if metadata != nil && metadata.Last == lastID {
close(i.done)
log.Info("History indexer is recovered", "last", lastID)
return
}

case <-i.closed:
return
}
}
}

// historyIndexer manages the indexing and unindexing of state histories,
// providing access to historical states.
//
Expand Down
14 changes: 12 additions & 2 deletions triedb/pathdb/history_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,13 @@ func testHistoryReader(t *testing.T, historyLimit uint64) {
maxDiffLayers = 128
}()

env := newTester(t, &testerConfig{stateHistory: historyLimit, layers: 64, enableIndex: true})
//log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true)))
config := &testerConfig{
stateHistory: historyLimit,
layers: 64,
enableIndex: true,
}
env := newTester(t, config)
defer env.release()
waitIndexing(env.db)

Expand Down Expand Up @@ -183,7 +189,11 @@ func TestHistoricalStateReader(t *testing.T) {
}()

//log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true)))
config := &testerConfig{stateHistory: 0, layers: 64, enableIndex: true}
config := &testerConfig{
stateHistory: 0,
layers: 64,
enableIndex: true,
}
env := newTester(t, config)
defer env.release()
waitIndexing(env.db)
Expand Down
2 changes: 1 addition & 1 deletion triedb/pathdb/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (dl *diskLayer) journal(w io.Writer) error {
if err := dl.buffer.states.encode(w); err != nil {
return err
}
log.Debug("Journaled pathdb disk layer", "root", dl.root)
log.Debug("Journaled pathdb disk layer", "root", dl.root, "id", dl.id)
return nil
}

Expand Down