diff --git a/triedb/pathdb/database.go b/triedb/pathdb/database.go index ae9574963e0..546d2e0301f 100644 --- a/triedb/pathdb/database.go +++ b/triedb/pathdb/database.go @@ -189,7 +189,7 @@ func New(diskdb ethdb.Database, config *Config, isVerkle bool) *Database { } // TODO (rjl493456442) disable the background indexing in read-only mode if db.stateFreezer != nil && db.config.EnableStateIndexing { - db.stateIndexer = newHistoryIndexer(db.diskdb, db.stateFreezer, db.tree.bottom().stateID()) + db.stateIndexer = newHistoryIndexer(db.diskdb, db.stateFreezer, db.tree.bottom().stateID(), typeStateHistory) log.Info("Enabled state history indexing") } fields := config.fields() @@ -245,7 +245,7 @@ func (db *Database) repairHistory() error { } // Truncate the extra state histories above in freezer in case it's not // aligned with the disk layer. It might happen after a unclean shutdown. - pruned, err := truncateFromHead(db.stateFreezer, id) + pruned, err := truncateFromHead(db.stateFreezer, typeStateHistory, id) if err != nil { log.Crit("Failed to truncate extra state histories", "err", err) } @@ -448,7 +448,7 @@ func (db *Database) Enable(root common.Hash) error { // 2. Re-initialize the indexer so it starts indexing from the new state root. if db.stateIndexer != nil && db.stateFreezer != nil && db.config.EnableStateIndexing { db.stateIndexer.close() - db.stateIndexer = newHistoryIndexer(db.diskdb, db.stateFreezer, db.tree.bottom().stateID()) + db.stateIndexer = newHistoryIndexer(db.diskdb, db.stateFreezer, db.tree.bottom().stateID(), typeStateHistory) log.Info("Re-enabled state history indexing") } log.Info("Rebuilt trie database", "root", root) @@ -502,7 +502,7 @@ func (db *Database) Recover(root common.Hash) error { if err := db.diskdb.SyncKeyValue(); err != nil { return err } - _, err := truncateFromHead(db.stateFreezer, dl.stateID()) + _, err := truncateFromHead(db.stateFreezer, typeStateHistory, dl.stateID()) if err != nil { return err } diff --git a/triedb/pathdb/disklayer.go b/triedb/pathdb/disklayer.go index 2042e916110..76f3f5a46ea 100644 --- a/triedb/pathdb/disklayer.go +++ b/triedb/pathdb/disklayer.go @@ -378,7 +378,7 @@ func (dl *diskLayer) writeStateHistory(diff *diffLayer) (bool, error) { log.Debug("Skip tail truncation", "persistentID", persistentID, "tailID", tail+1, "headID", diff.stateID(), "limit", limit) return true, nil } - pruned, err := truncateFromTail(dl.db.stateFreezer, newFirst-1) + pruned, err := truncateFromTail(dl.db.stateFreezer, typeStateHistory, newFirst-1) if err != nil { return false, err } diff --git a/triedb/pathdb/history.go b/triedb/pathdb/history.go index bbedd52f344..ae022236fef 100644 --- a/triedb/pathdb/history.go +++ b/triedb/pathdb/history.go @@ -19,11 +19,147 @@ package pathdb import ( "errors" "fmt" + "iter" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" ) +// historyType represents the category of historical data. +type historyType uint8 + +const ( + // typeStateHistory indicates history data related to account or storage changes. + typeStateHistory historyType = 0 +) + +// String returns the string format representation. +func (h historyType) String() string { + switch h { + case typeStateHistory: + return "state" + default: + return fmt.Sprintf("unknown type: %d", h) + } +} + +// elementType represents the category of state element. 
+type elementType uint8
+
+const (
+	typeAccount elementType = 0 // represents the account data
+	typeStorage elementType = 1 // represents the storage slot data
+)
+
+// String returns the string format representation.
+func (e elementType) String() string {
+	switch e {
+	case typeAccount:
+		return "account"
+	case typeStorage:
+		return "storage"
+	default:
+		return fmt.Sprintf("unknown element type: %d", e)
+	}
+}
+
+// toHistoryType maps an element type to its corresponding history type.
+func toHistoryType(typ elementType) historyType {
+	if typ == typeAccount || typ == typeStorage {
+		return typeStateHistory
+	}
+	panic(fmt.Sprintf("unknown element type %v", typ))
+}
+
+// stateIdent represents the identifier of a state element, which can be
+// an account or a storage slot.
+type stateIdent struct {
+	typ elementType
+
+	// The hash of the account address. This is used instead of the raw account
+	// address to align the traversal order with the Merkle-Patricia-Trie.
+	addressHash common.Hash
+
+	// The hash of the storage slot key. This is used instead of the raw slot key
+	// because, in legacy state histories (prior to the Cancun fork), the slot
+	// identifier is the hash of the key, and the original key (preimage) cannot
+	// be recovered. To maintain backward compatibility, the key hash is used.
+	//
+	// Meanwhile, using the storage key hash also preserves the traversal order
+	// of the Merkle-Patricia-Trie.
+	//
+	// This field is null if the identifier refers to an account or a trie node.
+	storageHash common.Hash
+}
+
+// String returns the string format state identifier.
+func (ident stateIdent) String() string {
+	if ident.typ == typeAccount {
+		return ident.addressHash.Hex()
+	}
+	return ident.addressHash.Hex() + ident.storageHash.Hex()
+}
+
+// newAccountIdent constructs a state identifier for an account.
+func newAccountIdent(addressHash common.Hash) stateIdent {
+	return stateIdent{
+		typ:         typeAccount,
+		addressHash: addressHash,
+	}
+}
+
+// newStorageIdent constructs a state identifier for a storage slot.
+// The addressHash denotes the address hash of the associated account;
+// the storageHash denotes the hash of the raw storage slot key.
+func newStorageIdent(addressHash common.Hash, storageHash common.Hash) stateIdent {
+	return stateIdent{
+		typ:         typeStorage,
+		addressHash: addressHash,
+		storageHash: storageHash,
+	}
+}
+
+// stateIdentQuery extends stateIdent with the account address and the raw
+// storage key.
+type stateIdentQuery struct {
+	stateIdent
+
+	address    common.Address
+	storageKey common.Hash
+}
+
+// newAccountIdentQuery constructs a state identifier for an account.
+func newAccountIdentQuery(address common.Address, addressHash common.Hash) stateIdentQuery {
+	return stateIdentQuery{
+		stateIdent: newAccountIdent(addressHash),
+		address:    address,
+	}
+}
+
+// newStorageIdentQuery constructs a state identifier for a storage slot. 
+// the address denotes the address of the associated account; +// the addressHash denotes the address hash of the associated account; +// the storageKey denotes the raw storage slot key; +// the storageHash denotes the hash of the raw storage slot key; +func newStorageIdentQuery(address common.Address, addressHash common.Hash, storageKey common.Hash, storageHash common.Hash) stateIdentQuery { + return stateIdentQuery{ + stateIdent: newStorageIdent(addressHash, storageHash), + address: address, + storageKey: storageKey, + } +} + +// history defines the interface of historical data, implemented by stateHistory +// and trienodeHistory (in the near future). +type history interface { + // typ returns the historical data type held in the history. + typ() historyType + + // forEach returns an iterator to traverse the state entries in the history. + forEach() iter.Seq[stateIdent] +} + var ( errHeadTruncationOutOfRange = errors.New("history head truncation out of range") errTailTruncationOutOfRange = errors.New("history tail truncation out of range") @@ -31,7 +167,7 @@ var ( // truncateFromHead removes excess elements from the head of the freezer based // on the given parameters. It returns the number of items that were removed. -func truncateFromHead(store ethdb.AncientStore, nhead uint64) (int, error) { +func truncateFromHead(store ethdb.AncientStore, typ historyType, nhead uint64) (int, error) { ohead, err := store.Ancients() if err != nil { return 0, err @@ -40,11 +176,11 @@ func truncateFromHead(store ethdb.AncientStore, nhead uint64) (int, error) { if err != nil { return 0, err } - log.Info("Truncating from head", "ohead", ohead, "tail", otail, "nhead", nhead) + log.Info("Truncating from head", "type", typ.String(), "ohead", ohead, "tail", otail, "nhead", nhead) // Ensure that the truncation target falls within the valid range. if ohead < nhead || nhead < otail { - return 0, fmt.Errorf("%w, tail: %d, head: %d, target: %d", errHeadTruncationOutOfRange, otail, ohead, nhead) + return 0, fmt.Errorf("%w, %s, tail: %d, head: %d, target: %d", errHeadTruncationOutOfRange, typ, otail, ohead, nhead) } // Short circuit if nothing to truncate. if ohead == nhead { @@ -61,7 +197,7 @@ func truncateFromHead(store ethdb.AncientStore, nhead uint64) (int, error) { // truncateFromTail removes excess elements from the end of the freezer based // on the given parameters. It returns the number of items that were removed. -func truncateFromTail(store ethdb.AncientStore, ntail uint64) (int, error) { +func truncateFromTail(store ethdb.AncientStore, typ historyType, ntail uint64) (int, error) { ohead, err := store.Ancients() if err != nil { return 0, err @@ -72,7 +208,7 @@ func truncateFromTail(store ethdb.AncientStore, ntail uint64) (int, error) { } // Ensure that the truncation target falls within the valid range. if otail > ntail || ntail > ohead { - return 0, fmt.Errorf("%w, tail: %d, head: %d, target: %d", errTailTruncationOutOfRange, otail, ohead, ntail) + return 0, fmt.Errorf("%w, %s, tail: %d, head: %d, target: %d", errTailTruncationOutOfRange, typ, otail, ohead, ntail) } // Short circuit if nothing to truncate. if otail == ntail { diff --git a/triedb/pathdb/history_index.go b/triedb/pathdb/history_index.go index e781a898e1a..47cee9820dd 100644 --- a/triedb/pathdb/history_index.go +++ b/triedb/pathdb/history_index.go @@ -78,12 +78,7 @@ type indexReader struct { // loadIndexData loads the index data associated with the specified state. 
func loadIndexData(db ethdb.KeyValueReader, state stateIdent) ([]*indexBlockDesc, error) { - var blob []byte - if state.account { - blob = rawdb.ReadAccountHistoryIndex(db, state.addressHash) - } else { - blob = rawdb.ReadStorageHistoryIndex(db, state.addressHash, state.storageHash) - } + blob := readStateIndex(state, db) if len(blob) == 0 { return nil, nil } @@ -137,15 +132,8 @@ func (r *indexReader) readGreaterThan(id uint64) (uint64, error) { br, ok := r.readers[desc.id] if !ok { - var ( - err error - blob []byte - ) - if r.state.account { - blob = rawdb.ReadAccountHistoryIndexBlock(r.db, r.state.addressHash, desc.id) - } else { - blob = rawdb.ReadStorageHistoryIndexBlock(r.db, r.state.addressHash, r.state.storageHash, desc.id) - } + var err error + blob := readStateIndexBlock(r.state, r.db, desc.id) br, err = newBlockReader(blob) if err != nil { return 0, err @@ -174,12 +162,7 @@ type indexWriter struct { // newIndexWriter constructs the index writer for the specified state. func newIndexWriter(db ethdb.KeyValueReader, state stateIdent) (*indexWriter, error) { - var blob []byte - if state.account { - blob = rawdb.ReadAccountHistoryIndex(db, state.addressHash) - } else { - blob = rawdb.ReadStorageHistoryIndex(db, state.addressHash, state.storageHash) - } + blob := readStateIndex(state, db) if len(blob) == 0 { desc := newIndexBlockDesc(0) bw, _ := newBlockWriter(nil, desc) @@ -194,15 +177,8 @@ func newIndexWriter(db ethdb.KeyValueReader, state stateIdent) (*indexWriter, er if err != nil { return nil, err } - var ( - indexBlock []byte - lastDesc = descList[len(descList)-1] - ) - if state.account { - indexBlock = rawdb.ReadAccountHistoryIndexBlock(db, state.addressHash, lastDesc.id) - } else { - indexBlock = rawdb.ReadStorageHistoryIndexBlock(db, state.addressHash, state.storageHash, lastDesc.id) - } + lastDesc := descList[len(descList)-1] + indexBlock := readStateIndexBlock(state, db, lastDesc.id) bw, err := newBlockWriter(indexBlock, lastDesc) if err != nil { return nil, err @@ -270,11 +246,7 @@ func (w *indexWriter) finish(batch ethdb.Batch) { return // nothing to commit } for _, bw := range writers { - if w.state.account { - rawdb.WriteAccountHistoryIndexBlock(batch, w.state.addressHash, bw.desc.id, bw.finish()) - } else { - rawdb.WriteStorageHistoryIndexBlock(batch, w.state.addressHash, w.state.storageHash, bw.desc.id, bw.finish()) - } + writeStateIndexBlock(w.state, batch, bw.desc.id, bw.finish()) } w.frozen = nil // release all the frozen writers @@ -282,11 +254,7 @@ func (w *indexWriter) finish(batch ethdb.Batch) { for _, desc := range descList { buf = append(buf, desc.encode()...) } - if w.state.account { - rawdb.WriteAccountHistoryIndex(batch, w.state.addressHash, buf) - } else { - rawdb.WriteStorageHistoryIndex(batch, w.state.addressHash, w.state.storageHash, buf) - } + writeStateIndex(w.state, batch, buf) } // indexDeleter is responsible for deleting index data for a specific state. @@ -301,12 +269,7 @@ type indexDeleter struct { // newIndexDeleter constructs the index deleter for the specified state. func newIndexDeleter(db ethdb.KeyValueReader, state stateIdent) (*indexDeleter, error) { - var blob []byte - if state.account { - blob = rawdb.ReadAccountHistoryIndex(db, state.addressHash) - } else { - blob = rawdb.ReadStorageHistoryIndex(db, state.addressHash, state.storageHash) - } + blob := readStateIndex(state, db) if len(blob) == 0 { // TODO(rjl493456442) we can probably return an error here, // deleter with no data is meaningless. 
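The rewritten call sites above route through the shared readStateIndex helper introduced later in this file, so the account/storage branching lives in a single switch. A minimal caller-side sketch, assuming it runs inside the pathdb package (loadBothIndexes is a hypothetical wrapper, not part of this change):

func loadBothIndexes(db ethdb.KeyValueReader, addrHash, slotHash common.Hash) ([]byte, []byte) {
	// Both lookups go through the same typed entry point; the element-type
	// dispatch is centralized instead of repeated at every call site.
	accountBlob := readStateIndex(newAccountIdent(addrHash), db)
	storageBlob := readStateIndex(newStorageIdent(addrHash, slotHash), db)
	return accountBlob, storageBlob
}
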
@@ -323,15 +286,8 @@ func newIndexDeleter(db ethdb.KeyValueReader, state stateIdent) (*indexDeleter, if err != nil { return nil, err } - var ( - indexBlock []byte - lastDesc = descList[len(descList)-1] - ) - if state.account { - indexBlock = rawdb.ReadAccountHistoryIndexBlock(db, state.addressHash, lastDesc.id) - } else { - indexBlock = rawdb.ReadStorageHistoryIndexBlock(db, state.addressHash, state.storageHash, lastDesc.id) - } + lastDesc := descList[len(descList)-1] + indexBlock := readStateIndexBlock(state, db, lastDesc.id) bw, err := newBlockWriter(indexBlock, lastDesc) if err != nil { return nil, err @@ -376,15 +332,8 @@ func (d *indexDeleter) pop(id uint64) error { d.descList = d.descList[:len(d.descList)-1] // Open the previous block writer for deleting - var ( - indexBlock []byte - lastDesc = d.descList[len(d.descList)-1] - ) - if d.state.account { - indexBlock = rawdb.ReadAccountHistoryIndexBlock(d.db, d.state.addressHash, lastDesc.id) - } else { - indexBlock = rawdb.ReadStorageHistoryIndexBlock(d.db, d.state.addressHash, d.state.storageHash, lastDesc.id) - } + lastDesc := d.descList[len(d.descList)-1] + indexBlock := readStateIndexBlock(d.state, d.db, lastDesc.id) bw, err := newBlockWriter(indexBlock, lastDesc) if err != nil { return err @@ -399,38 +348,100 @@ func (d *indexDeleter) pop(id uint64) error { // This function is safe to be called multiple times. func (d *indexDeleter) finish(batch ethdb.Batch) { for _, id := range d.dropped { - if d.state.account { - rawdb.DeleteAccountHistoryIndexBlock(batch, d.state.addressHash, id) - } else { - rawdb.DeleteStorageHistoryIndexBlock(batch, d.state.addressHash, d.state.storageHash, id) - } + deleteStateIndexBlock(d.state, batch, id) } d.dropped = nil // Flush the content of last block writer, regardless it's dirty or not if !d.bw.empty() { - if d.state.account { - rawdb.WriteAccountHistoryIndexBlock(batch, d.state.addressHash, d.bw.desc.id, d.bw.finish()) - } else { - rawdb.WriteStorageHistoryIndexBlock(batch, d.state.addressHash, d.state.storageHash, d.bw.desc.id, d.bw.finish()) - } + writeStateIndexBlock(d.state, batch, d.bw.desc.id, d.bw.finish()) } // Flush the index metadata into the supplied batch if d.empty() { - if d.state.account { - rawdb.DeleteAccountHistoryIndex(batch, d.state.addressHash) - } else { - rawdb.DeleteStorageHistoryIndex(batch, d.state.addressHash, d.state.storageHash) - } + deleteStateIndex(d.state, batch) } else { buf := make([]byte, 0, indexBlockDescSize*len(d.descList)) for _, desc := range d.descList { buf = append(buf, desc.encode()...) } - if d.state.account { - rawdb.WriteAccountHistoryIndex(batch, d.state.addressHash, buf) - } else { - rawdb.WriteStorageHistoryIndex(batch, d.state.addressHash, d.state.storageHash, buf) - } + writeStateIndex(d.state, batch, buf) + } +} + +// readStateIndex retrieves the index metadata for the given state identifier. +// This function is shared by accounts and storage slots. +func readStateIndex(ident stateIdent, db ethdb.KeyValueReader) []byte { + switch ident.typ { + case typeAccount: + return rawdb.ReadAccountHistoryIndex(db, ident.addressHash) + case typeStorage: + return rawdb.ReadStorageHistoryIndex(db, ident.addressHash, ident.storageHash) + default: + panic(fmt.Errorf("unknown type: %v", ident.typ)) + } +} + +// writeStateIndex writes the provided index metadata into database with the +// given state identifier. This function is shared by accounts and storage slots. 
+func writeStateIndex(ident stateIdent, db ethdb.KeyValueWriter, data []byte) { + switch ident.typ { + case typeAccount: + rawdb.WriteAccountHistoryIndex(db, ident.addressHash, data) + case typeStorage: + rawdb.WriteStorageHistoryIndex(db, ident.addressHash, ident.storageHash, data) + default: + panic(fmt.Errorf("unknown type: %v", ident.typ)) + } +} + +// deleteStateIndex removes the index metadata for the given state identifier. +// This function is shared by accounts and storage slots. +func deleteStateIndex(ident stateIdent, db ethdb.KeyValueWriter) { + switch ident.typ { + case typeAccount: + rawdb.DeleteAccountHistoryIndex(db, ident.addressHash) + case typeStorage: + rawdb.DeleteStorageHistoryIndex(db, ident.addressHash, ident.storageHash) + default: + panic(fmt.Errorf("unknown type: %v", ident.typ)) + } +} + +// readStateIndexBlock retrieves the index block for the given state identifier +// and block ID. This function is shared by accounts and storage slots. +func readStateIndexBlock(ident stateIdent, db ethdb.KeyValueReader, id uint32) []byte { + switch ident.typ { + case typeAccount: + return rawdb.ReadAccountHistoryIndexBlock(db, ident.addressHash, id) + case typeStorage: + return rawdb.ReadStorageHistoryIndexBlock(db, ident.addressHash, ident.storageHash, id) + default: + panic(fmt.Errorf("unknown type: %v", ident.typ)) + } +} + +// writeStateIndexBlock writes the provided index block into database with the +// given state identifier. This function is shared by accounts and storage slots. +func writeStateIndexBlock(ident stateIdent, db ethdb.KeyValueWriter, id uint32, data []byte) { + switch ident.typ { + case typeAccount: + rawdb.WriteAccountHistoryIndexBlock(db, ident.addressHash, id, data) + case typeStorage: + rawdb.WriteStorageHistoryIndexBlock(db, ident.addressHash, ident.storageHash, id, data) + default: + panic(fmt.Errorf("unknown type: %v", ident.typ)) + } +} + +// deleteStateIndexBlock removes the index block from database with the given +// state identifier. This function is shared by accounts and storage slots. 
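The write-side helpers are meant to be used against a shared write batch so that index blocks and the descriptor list land atomically, as indexWriter.finish and indexDeleter.finish do above. A hedged sketch of that pattern (commitIndex and its parameters are hypothetical):

func commitIndex(db ethdb.KeyValueStore, ident stateIdent, blockID uint32, blockData, metaData []byte) error {
	batch := db.NewBatch()
	writeStateIndexBlock(ident, batch, blockID, blockData) // per-block index data
	writeStateIndex(ident, batch, metaData)                // encoded descriptor list
	return batch.Write() // both keys are committed atomically
}
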
+func deleteStateIndexBlock(ident stateIdent, db ethdb.KeyValueWriter, id uint32) { + switch ident.typ { + case typeAccount: + rawdb.DeleteAccountHistoryIndexBlock(db, ident.addressHash, id) + case typeStorage: + rawdb.DeleteStorageHistoryIndexBlock(db, ident.addressHash, ident.storageHash, id) + default: + panic(fmt.Errorf("unknown type: %v", ident.typ)) } } diff --git a/triedb/pathdb/history_index_test.go b/triedb/pathdb/history_index_test.go index c83c33ffbde..be9b7c40491 100644 --- a/triedb/pathdb/history_index_test.go +++ b/triedb/pathdb/history_index_test.go @@ -179,7 +179,7 @@ func TestIndexWriterDelete(t *testing.T) { func TestBatchIndexerWrite(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() - batch = newBatchIndexer(db, false) + batch = newBatchIndexer(db, false, typeStateHistory) histories = makeStateHistories(10) ) for i, h := range histories { @@ -190,7 +190,7 @@ func TestBatchIndexerWrite(t *testing.T) { if err := batch.finish(true); err != nil { t.Fatalf("Failed to finish batch indexer, %v", err) } - metadata := loadIndexMetadata(db) + metadata := loadIndexMetadata(db, typeStateHistory) if metadata == nil || metadata.Last != uint64(10) { t.Fatal("Unexpected index position") } @@ -256,7 +256,7 @@ func TestBatchIndexerWrite(t *testing.T) { func TestBatchIndexerDelete(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() - bw = newBatchIndexer(db, false) + bw = newBatchIndexer(db, false, typeStateHistory) histories = makeStateHistories(10) ) // Index histories @@ -270,7 +270,7 @@ func TestBatchIndexerDelete(t *testing.T) { } // Unindex histories - bd := newBatchIndexer(db, true) + bd := newBatchIndexer(db, true, typeStateHistory) for i := len(histories) - 1; i >= 0; i-- { if err := bd.process(histories[i], uint64(i+1)); err != nil { t.Fatalf("Failed to process history, %v", err) @@ -280,7 +280,7 @@ func TestBatchIndexerDelete(t *testing.T) { t.Fatalf("Failed to finish batch indexer, %v", err) } - metadata := loadIndexMetadata(db) + metadata := loadIndexMetadata(db, typeStateHistory) if metadata != nil { t.Fatal("Unexpected index position") } diff --git a/triedb/pathdb/history_indexer.go b/triedb/pathdb/history_indexer.go index b4e89c3f17b..d6185859291 100644 --- a/triedb/pathdb/history_indexer.go +++ b/triedb/pathdb/history_indexer.go @@ -26,7 +26,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" @@ -41,13 +40,32 @@ const ( stateIndexVersion = stateIndexV0 // the current state index version ) +// indexVersion returns the latest index version for the given history type. +// It panics if the history type is unknown. +func indexVersion(typ historyType) uint8 { + switch typ { + case typeStateHistory: + return stateIndexVersion + default: + panic(fmt.Errorf("unknown history type: %d", typ)) + } +} + +// indexMetadata describes the metadata of the historical data index. type indexMetadata struct { Version uint8 Last uint64 } -func loadIndexMetadata(db ethdb.KeyValueReader) *indexMetadata { - blob := rawdb.ReadStateHistoryIndexMetadata(db) +// loadIndexMetadata reads the metadata of the specific history index. 
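indexMetadata is persisted as a plain RLP-encoded (version, last) pair. A standalone round-trip sketch of what the load/store pair below serializes (the value 42 is arbitrary):

func roundTripMetadata() indexMetadata {
	blob, err := rlp.EncodeToBytes(indexMetadata{Version: stateIndexVersion, Last: 42})
	if err != nil {
		panic(err) // encoding a flat struct of unsigned integers should not fail
	}
	var m indexMetadata
	if err := rlp.DecodeBytes(blob, &m); err != nil {
		panic(err)
	}
	return m // Version: stateIndexVersion, Last: 42
}
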
+func loadIndexMetadata(db ethdb.KeyValueReader, typ historyType) *indexMetadata {
+	var blob []byte
+	switch typ {
+	case typeStateHistory:
+		blob = rawdb.ReadStateHistoryIndexMetadata(db)
+	default:
+		panic(fmt.Errorf("unknown history type %d", typ))
+	}
 	if len(blob) == 0 {
 		return nil
 	}
@@ -59,91 +77,94 @@
 	return &m
 }
 
-func storeIndexMetadata(db ethdb.KeyValueWriter, last uint64) {
-	var m indexMetadata
-	m.Version = stateIndexVersion
-	m.Last = last
+// storeIndexMetadata stores the metadata of the specific history index.
+func storeIndexMetadata(db ethdb.KeyValueWriter, typ historyType, last uint64) {
+	m := indexMetadata{
+		Version: indexVersion(typ),
+		Last:    last,
+	}
 	blob, err := rlp.EncodeToBytes(m)
 	if err != nil {
-		log.Crit("Failed to encode index metadata", "err", err)
+		panic(fmt.Errorf("failed to encode index metadata: %v", err))
 	}
-	rawdb.WriteStateHistoryIndexMetadata(db, blob)
+	switch typ {
+	case typeStateHistory:
+		rawdb.WriteStateHistoryIndexMetadata(db, blob)
+	default:
+		panic(fmt.Errorf("unknown history type %d", typ))
+	}
+	log.Debug("Written index metadata", "type", typ, "last", last)
 }
 
-// batchIndexer is a structure designed to perform batch indexing or unindexing
-// of state histories atomically.
+// deleteIndexMetadata deletes the metadata of the specific history index.
+func deleteIndexMetadata(db ethdb.KeyValueWriter, typ historyType) {
+	switch typ {
+	case typeStateHistory:
+		rawdb.DeleteStateHistoryIndexMetadata(db)
+	default:
+		panic(fmt.Errorf("unknown history type %d", typ))
+	}
+	log.Debug("Deleted index metadata", "type", typ)
+}
+
+// batchIndexer is responsible for performing batch indexing or unindexing
+// of historical data (e.g., state or trie node changes) atomically.
 type batchIndexer struct {
-	accounts map[common.Hash][]uint64                 // History ID list, Keyed by the hash of account address
-	storages map[common.Hash]map[common.Hash][]uint64 // History ID list, Keyed by the hash of account address and the hash of raw storage key
-	counter  int                                      // The counter of processed states
-	delete   bool                                     // Index or unindex mode
-	lastID   uint64                                   // The ID of latest processed history
-	db       ethdb.KeyValueStore
+	index   map[stateIdent][]uint64 // List of history IDs for each tracked state entry.
+	pending int                     // Number of entries processed in the current batch.
+	delete  bool                    // Operation mode: true for unindex, false for index.
+	lastID  uint64                  // ID of the most recently processed history.
+	typ     historyType             // Type of history being processed (e.g., state or trienode).
+	db      ethdb.KeyValueStore     // Key-value database used to store or delete index data.
}
 
 // newBatchIndexer constructs the batch indexer with the supplied mode.
-func newBatchIndexer(db ethdb.KeyValueStore, delete bool) *batchIndexer {
+func newBatchIndexer(db ethdb.KeyValueStore, delete bool, typ historyType) *batchIndexer {
 	return &batchIndexer{
-		accounts: make(map[common.Hash][]uint64),
-		storages: make(map[common.Hash]map[common.Hash][]uint64),
-		delete:   delete,
-		db:       db,
+		index:  make(map[stateIdent][]uint64),
+		delete: delete,
+		typ:    typ,
+		db:     db,
	}
 }
 
-// process iterates through the accounts and their associated storage slots in the
-// state history, tracking the mapping between state and history IDs. 
-func (b *batchIndexer) process(h *stateHistory, historyID uint64) error { - for _, address := range h.accountList { - addrHash := crypto.Keccak256Hash(address.Bytes()) - b.counter += 1 - b.accounts[addrHash] = append(b.accounts[addrHash], historyID) - - for _, slotKey := range h.storageList[address] { - b.counter += 1 - if _, ok := b.storages[addrHash]; !ok { - b.storages[addrHash] = make(map[common.Hash][]uint64) - } - // The hash of the storage slot key is used as the identifier because the - // legacy history does not include the raw storage key, therefore, the - // conversion from storage key to hash is necessary for non-v0 histories. - slotHash := slotKey - if h.meta.version != stateHistoryV0 { - slotHash = crypto.Keccak256Hash(slotKey.Bytes()) - } - b.storages[addrHash][slotHash] = append(b.storages[addrHash][slotHash], historyID) - } +// process traverses the state entries within the provided history and tracks the mutation +// records for them. +func (b *batchIndexer) process(h history, id uint64) error { + for ident := range h.forEach() { + b.index[ident] = append(b.index[ident], id) + b.pending++ } - b.lastID = historyID + b.lastID = id + return b.finish(false) } // finish writes the accumulated state indexes into the disk if either the // memory limitation is reached or it's requested forcibly. func (b *batchIndexer) finish(force bool) error { - if b.counter == 0 { + if b.pending == 0 { return nil } - if !force && b.counter < historyIndexBatch { + if !force && b.pending < historyIndexBatch { return nil } var ( - batch = b.db.NewBatch() - batchMu sync.RWMutex - storages int - start = time.Now() - eg errgroup.Group + batch = b.db.NewBatch() + batchMu sync.RWMutex + start = time.Now() + eg errgroup.Group ) eg.SetLimit(runtime.NumCPU()) - for addrHash, idList := range b.accounts { + for ident, list := range b.index { eg.Go(func() error { if !b.delete { - iw, err := newIndexWriter(b.db, newAccountIdent(addrHash)) + iw, err := newIndexWriter(b.db, ident) if err != nil { return err } - for _, n := range idList { + for _, n := range list { if err := iw.append(n); err != nil { return err } @@ -152,11 +173,11 @@ func (b *batchIndexer) finish(force bool) error { iw.finish(batch) batchMu.Unlock() } else { - id, err := newIndexDeleter(b.db, newAccountIdent(addrHash)) + id, err := newIndexDeleter(b.db, ident) if err != nil { return err } - for _, n := range idList { + for _, n := range list { if err := id.pop(n); err != nil { return err } @@ -168,72 +189,36 @@ func (b *batchIndexer) finish(force bool) error { return nil }) } - for addrHash, slots := range b.storages { - storages += len(slots) - for storageHash, idList := range slots { - eg.Go(func() error { - if !b.delete { - iw, err := newIndexWriter(b.db, newStorageIdent(addrHash, storageHash)) - if err != nil { - return err - } - for _, n := range idList { - if err := iw.append(n); err != nil { - return err - } - } - batchMu.Lock() - iw.finish(batch) - batchMu.Unlock() - } else { - id, err := newIndexDeleter(b.db, newStorageIdent(addrHash, storageHash)) - if err != nil { - return err - } - for _, n := range idList { - if err := id.pop(n); err != nil { - return err - } - } - batchMu.Lock() - id.finish(batch) - batchMu.Unlock() - } - return nil - }) - } - } if err := eg.Wait(); err != nil { return err } // Update the position of last indexed state history if !b.delete { - storeIndexMetadata(batch, b.lastID) + storeIndexMetadata(batch, b.typ, b.lastID) } else { if b.lastID == 1 { - rawdb.DeleteStateHistoryIndexMetadata(batch) + 
deleteIndexMetadata(batch, b.typ) } else { - storeIndexMetadata(batch, b.lastID-1) + storeIndexMetadata(batch, b.typ, b.lastID-1) } } if err := batch.Write(); err != nil { return err } - log.Debug("Committed batch indexer", "accounts", len(b.accounts), "storages", storages, "records", b.counter, "elapsed", common.PrettyDuration(time.Since(start))) - b.counter = 0 - b.accounts = make(map[common.Hash][]uint64) - b.storages = make(map[common.Hash]map[common.Hash][]uint64) + log.Debug("Committed batch indexer", "type", b.typ, "entries", len(b.index), "records", b.pending, "elapsed", common.PrettyDuration(time.Since(start))) + b.pending = 0 + b.index = make(map[stateIdent][]uint64) return nil } // indexSingle processes the state history with the specified ID for indexing. -func indexSingle(historyID uint64, db ethdb.KeyValueStore, freezer ethdb.AncientReader) error { +func indexSingle(historyID uint64, db ethdb.KeyValueStore, freezer ethdb.AncientReader, typ historyType) error { start := time.Now() defer func() { indexHistoryTimer.UpdateSince(start) }() - metadata := loadIndexMetadata(db) + metadata := loadIndexMetadata(db, typ) if metadata == nil || metadata.Last+1 != historyID { last := "null" if metadata != nil { @@ -241,29 +226,37 @@ func indexSingle(historyID uint64, db ethdb.KeyValueStore, freezer ethdb.Ancient } return fmt.Errorf("history indexing is out of order, last: %s, requested: %d", last, historyID) } - h, err := readStateHistory(freezer, historyID) + var ( + err error + h history + b = newBatchIndexer(db, false, typ) + ) + if typ == typeStateHistory { + h, err = readStateHistory(freezer, historyID) + } else { + // h, err = readTrienodeHistory(freezer, historyID) + } if err != nil { return err } - b := newBatchIndexer(db, false) if err := b.process(h, historyID); err != nil { return err } if err := b.finish(true); err != nil { return err } - log.Debug("Indexed state history", "id", historyID, "elapsed", common.PrettyDuration(time.Since(start))) + log.Debug("Indexed history", "type", typ, "id", historyID, "elapsed", common.PrettyDuration(time.Since(start))) return nil } // unindexSingle processes the state history with the specified ID for unindexing. 
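Both entry points validate their position against the stored metadata before touching any index data, so callers must extend and revert strictly one history at a time. A hypothetical sequence (extendAndRevert and n are illustrative; histories 1..n are assumed to be indexed already):

func extendAndRevert(n uint64, db ethdb.KeyValueStore, freezer ethdb.AncientReader) error {
	// Extending accepts only metadata.Last+1 as the next history ID.
	if err := indexSingle(n+1, db, freezer, typeStateHistory); err != nil {
		return err
	}
	// Reverting accepts only metadata.Last, i.e. the current head.
	return unindexSingle(n+1, db, freezer, typeStateHistory)
}
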
-func unindexSingle(historyID uint64, db ethdb.KeyValueStore, freezer ethdb.AncientReader) error { +func unindexSingle(historyID uint64, db ethdb.KeyValueStore, freezer ethdb.AncientReader, typ historyType) error { start := time.Now() defer func() { unindexHistoryTimer.UpdateSince(start) }() - metadata := loadIndexMetadata(db) + metadata := loadIndexMetadata(db, typ) if metadata == nil || metadata.Last != historyID { last := "null" if metadata != nil { @@ -271,18 +264,26 @@ func unindexSingle(historyID uint64, db ethdb.KeyValueStore, freezer ethdb.Ancie } return fmt.Errorf("history unindexing is out of order, last: %s, requested: %d", last, historyID) } - h, err := readStateHistory(freezer, historyID) + var ( + err error + h history + ) + b := newBatchIndexer(db, true, typ) + if typ == typeStateHistory { + h, err = readStateHistory(freezer, historyID) + } else { + // h, err = readTrienodeHistory(freezer, historyID) + } if err != nil { return err } - b := newBatchIndexer(db, true) if err := b.process(h, historyID); err != nil { return err } if err := b.finish(true); err != nil { return err } - log.Debug("Unindexed state history", "id", historyID, "elapsed", common.PrettyDuration(time.Since(start))) + log.Debug("Unindexed history", "type", typ, "id", historyID, "elapsed", common.PrettyDuration(time.Since(start))) return nil } @@ -305,6 +306,8 @@ type indexIniter struct { interrupt chan *interruptSignal done chan struct{} closed chan struct{} + typ historyType + log log.Logger // Contextual logger with the history type injected // indexing progress indexed atomic.Uint64 // the id of latest indexed state @@ -313,18 +316,20 @@ type indexIniter struct { wg sync.WaitGroup } -func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastID uint64) *indexIniter { +func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, typ historyType, lastID uint64) *indexIniter { initer := &indexIniter{ disk: disk, freezer: freezer, interrupt: make(chan *interruptSignal), done: make(chan struct{}), closed: make(chan struct{}), + typ: typ, + log: log.New("type", typ.String()), } // Load indexing progress var recover bool initer.last.Store(lastID) - metadata := loadIndexMetadata(disk) + metadata := loadIndexMetadata(disk, typ) if metadata != nil { initer.indexed.Store(metadata.Last) recover = metadata.Last > lastID @@ -371,7 +376,7 @@ func (i *indexIniter) remain() uint64 { default: last, indexed := i.last.Load(), i.indexed.Load() if last < indexed { - log.Warn("State indexer is in recovery", "indexed", indexed, "last", last) + i.log.Warn("State indexer is in recovery", "indexed", indexed, "last", last) return indexed - last } return last - indexed @@ -389,7 +394,7 @@ func (i *indexIniter) run(lastID uint64) { // checkDone indicates whether all requested state histories // have been fully indexed. checkDone = func() bool { - metadata := loadIndexMetadata(i.disk) + metadata := loadIndexMetadata(i.disk, i.typ) return metadata != nil && metadata.Last == lastID } ) @@ -411,7 +416,7 @@ func (i *indexIniter) run(lastID uint64) { if newLastID == lastID+1 { lastID = newLastID signal.result <- nil - log.Debug("Extended state history range", "last", lastID) + i.log.Debug("Extended history range", "last", lastID) continue } // The index limit is shortened by one, interrupt the current background @@ -422,14 +427,14 @@ func (i *indexIniter) run(lastID uint64) { // If all state histories, including the one to be reverted, have // been fully indexed, unindex it here and shut down the initializer. 
if checkDone() { - log.Info("Truncate the extra history", "id", lastID) - if err := unindexSingle(lastID, i.disk, i.freezer); err != nil { + i.log.Info("Truncate the extra history", "id", lastID) + if err := unindexSingle(lastID, i.disk, i.freezer, i.typ); err != nil { signal.result <- err return } close(i.done) signal.result <- nil - log.Info("State histories have been fully indexed", "last", lastID-1) + i.log.Info("Histories have been fully indexed", "last", lastID-1) return } // Adjust the indexing target and relaunch the process @@ -438,12 +443,12 @@ func (i *indexIniter) run(lastID uint64) { done, interrupt = make(chan struct{}), new(atomic.Int32) go i.index(done, interrupt, lastID) - log.Debug("Shortened state history range", "last", lastID) + i.log.Debug("Shortened history range", "last", lastID) case <-done: if checkDone() { close(i.done) - log.Info("State histories have been fully indexed", "last", lastID) + i.log.Info("Histories have been fully indexed", "last", lastID) return } // Relaunch the background runner if some tasks are left @@ -452,7 +457,7 @@ func (i *indexIniter) run(lastID uint64) { case <-i.closed: interrupt.Store(1) - log.Info("Waiting background history index initer to exit") + i.log.Info("Waiting background history index initer to exit") <-done if checkDone() { @@ -472,14 +477,14 @@ func (i *indexIniter) next() (uint64, error) { tailID := tail + 1 // compute the id of the oldest history // Start indexing from scratch if nothing has been indexed - metadata := loadIndexMetadata(i.disk) + metadata := loadIndexMetadata(i.disk, i.typ) if metadata == nil { - log.Debug("Initialize state history indexing from scratch", "id", tailID) + i.log.Debug("Initialize history indexing from scratch", "id", tailID) return tailID, nil } // Resume indexing from the last interrupted position if metadata.Last+1 >= tailID { - log.Debug("Resume state history indexing", "id", metadata.Last+1, "tail", tailID) + i.log.Debug("Resume history indexing", "id", metadata.Last+1, "tail", tailID) return metadata.Last + 1, nil } // History has been shortened without indexing. Discard the gapped segment @@ -487,7 +492,7 @@ func (i *indexIniter) next() (uint64, error) { // // The missing indexes corresponding to the gapped histories won't be visible. // It's fine to leave them unindexed. - log.Info("History gap detected, discard old segment", "oldHead", metadata.Last, "newHead", tailID) + i.log.Info("History gap detected, discard old segment", "oldHead", metadata.Last, "newHead", tailID) return tailID, nil } @@ -496,7 +501,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID beginID, err := i.next() if err != nil { - log.Error("Failed to find next state history for indexing", "err", err) + i.log.Error("Failed to find next history for indexing", "err", err) return } // All available state histories have been indexed, and the last indexed one @@ -511,36 +516,47 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID // // This step is essential to avoid spinning up indexing thread // endlessly until a history object is produced. 
-			storeIndexMetadata(i.disk, 0)
-			log.Info("Initialized history indexing flag")
+			storeIndexMetadata(i.disk, i.typ, 0)
+			i.log.Info("Initialized history indexing flag")
 		} else {
-			log.Debug("State history is fully indexed", "last", lastID)
+			i.log.Debug("History is fully indexed", "last", lastID)
 		}
 		return
 	}
-	log.Info("Start history indexing", "beginID", beginID, "lastID", lastID)
+	i.log.Info("Start history indexing", "beginID", beginID, "lastID", lastID)
 	var (
 		current = beginID
 		start   = time.Now()
 		logged  = time.Now()
-		batch   = newBatchIndexer(i.disk, false)
+		batch   = newBatchIndexer(i.disk, false, i.typ)
 	)
 	for current <= lastID {
 		count := lastID - current + 1
 		if count > historyReadBatch {
 			count = historyReadBatch
 		}
-		histories, err := readStateHistories(i.freezer, current, count)
-		if err != nil {
-			// The history read might fall if the history is truncated from
-			// head due to revert operation.
-			log.Error("Failed to read history for indexing", "current", current, "count", count, "err", err)
-			return
+		var histories []history
+		if i.typ == typeStateHistory {
+			histories, err = readStateHistories(i.freezer, current, count)
+			if err != nil {
+				// The history read might fail if the history is truncated from
+				// head due to a revert operation.
+				i.log.Error("Failed to read history for indexing", "current", current, "count", count, "err", err)
+				return
+			}
+		} else {
+			// histories, err = readTrienodeHistories(i.freezer, current, count)
+			// if err != nil {
+			//	// The history read might fail if the history is truncated from
+			//	// head due to a revert operation.
+			//	i.log.Error("Failed to read history for indexing", "current", current, "count", count, "err", err)
+			//	return
			// }
 		}
 		for _, h := range histories {
 			if err := batch.process(h, current); err != nil {
-				log.Error("Failed to index history", "err", err)
+				i.log.Error("Failed to index history", "err", err)
 				return
 			}
 			current += 1
@@ -554,7 +570,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
 				done = current - beginID
 			)
 			eta := common.CalculateETA(done, left, time.Since(start))
-			log.Info("Indexing state history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta))
+			i.log.Info("Indexing history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta))
 		}
 	}
 	i.indexed.Store(current - 1) // update indexing progress
 
 	if interrupt != nil {
 		if signal := interrupt.Load(); signal != 0 {
 			if err := batch.finish(true); err != nil {
-				log.Error("Failed to flush index", "err", err)
+				i.log.Error("Failed to flush index", "err", err)
 			}
 			log.Info("State indexing interrupted")
 			return
@@ -571,9 +587,9 @@
 		}
 	}
 	if err := batch.finish(true); err != nil {
-		log.Error("Failed to flush index", "err", err)
+		i.log.Error("Failed to flush index", "err", err)
 	}
-	log.Info("Indexed state history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start)))
+	i.log.Info("Indexed history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start)))
 }
 
 // recover handles unclean shutdown recovery. 
After an unclean shutdown, any
@@ -602,14 +618,14 @@
 			lastID = newLastID
 			signal.result <- nil
 			i.last.Store(newLastID)
-			log.Debug("Updated history index flag", "last", lastID)
+			i.log.Debug("Updated history index flag", "last", lastID)
 
 			// Terminate the recovery routine once the histories are fully aligned
 			// with the index data, indicating that index initialization is complete.
-			metadata := loadIndexMetadata(i.disk)
+			metadata := loadIndexMetadata(i.disk, i.typ)
 			if metadata != nil && metadata.Last == lastID {
 				close(i.done)
-				log.Info("History indexer is recovered", "last", lastID)
+				i.log.Info("History indexer is recovered", "last", lastID)
 				return
 			}
 
@@ -631,21 +647,31 @@
 // state history.
 type historyIndexer struct {
 	initer *indexIniter
+	typ    historyType
 	disk    ethdb.KeyValueStore
 	freezer ethdb.AncientStore
 }
 
 // checkVersion checks whether the index data in the database matches the version.
-func checkVersion(disk ethdb.KeyValueStore) {
-	blob := rawdb.ReadStateHistoryIndexMetadata(disk)
+func checkVersion(disk ethdb.KeyValueStore, typ historyType) {
+	var blob []byte
+	if typ == typeStateHistory {
+		blob = rawdb.ReadStateHistoryIndexMetadata(disk)
+	} else {
+		panic(fmt.Errorf("unknown history type: %v", typ))
+	}
+	// Short circuit if the metadata is not found; a re-index from
+	// scratch is required.
 	if len(blob) == 0 {
 		return
 	}
+	// Short circuit if the metadata is found and the version matches.
 	var m indexMetadata
 	err := rlp.DecodeBytes(blob, &m)
 	if err == nil && m.Version == stateIndexVersion {
 		return
 	}
+	// The version does not match; prune the existing data and re-index from scratch.
 	version := "unknown"
 	if err == nil {
 		version = fmt.Sprintf("%d", m.Version)
 	}
@@ -662,10 +688,11 @@
 // newHistoryIndexer constructs the history indexer and launches the background
 // initer to complete the indexing of any remaining state histories. 
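For reference, the updated constructor invocation as it now appears at the call sites in database.go and history_indexer_test.go; only the trailing history type argument is new:

	indexer := newHistoryIndexer(db.diskdb, db.stateFreezer, db.tree.bottom().stateID(), typeStateHistory)
	defer indexer.close()
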
-func newHistoryIndexer(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastHistoryID uint64) *historyIndexer { - checkVersion(disk) +func newHistoryIndexer(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastHistoryID uint64, typ historyType) *historyIndexer { + checkVersion(disk, typ) return &historyIndexer{ - initer: newIndexIniter(disk, freezer, lastHistoryID), + initer: newIndexIniter(disk, freezer, typ, lastHistoryID), + typ: typ, disk: disk, freezer: freezer, } @@ -693,7 +720,7 @@ func (i *historyIndexer) extend(historyID uint64) error { case <-i.initer.closed: return errors.New("indexer is closed") case <-i.initer.done: - return indexSingle(historyID, i.disk, i.freezer) + return indexSingle(historyID, i.disk, i.freezer, i.typ) case i.initer.interrupt <- signal: return <-signal.result } @@ -710,7 +737,7 @@ func (i *historyIndexer) shorten(historyID uint64) error { case <-i.initer.closed: return errors.New("indexer is closed") case <-i.initer.done: - return unindexSingle(historyID, i.disk, i.freezer) + return unindexSingle(historyID, i.disk, i.freezer, i.typ) case i.initer.interrupt <- signal: return <-signal.result } diff --git a/triedb/pathdb/history_indexer_test.go b/triedb/pathdb/history_indexer_test.go index 96c87ccb1bd..f333d18d8b5 100644 --- a/triedb/pathdb/history_indexer_test.go +++ b/triedb/pathdb/history_indexer_test.go @@ -38,7 +38,7 @@ func TestHistoryIndexerShortenDeadlock(t *testing.T) { rawdb.WriteStateHistory(freezer, uint64(i+1), h.meta.encode(), accountIndex, storageIndex, accountData, storageData) } // As a workaround, assign a future block to keep the initer running indefinitely - indexer := newHistoryIndexer(db, freezer, 200) + indexer := newHistoryIndexer(db, freezer, 200, typeStateHistory) defer indexer.close() done := make(chan error, 1) diff --git a/triedb/pathdb/history_reader.go b/triedb/pathdb/history_reader.go index a11297b3f61..ce6aa693d11 100644 --- a/triedb/pathdb/history_reader.go +++ b/triedb/pathdb/history_reader.go @@ -29,88 +29,6 @@ import ( "github.com/ethereum/go-ethereum/ethdb" ) -// stateIdent represents the identifier of a state element, which can be -// either an account or a storage slot. -type stateIdent struct { - account bool - - // The hash of the account address. This is used instead of the raw account - // address is to align the traversal order with the Merkle-Patricia-Trie. - addressHash common.Hash - - // The hash of the storage slot key. This is used instead of the raw slot key - // because, in legacy state histories (prior to the Cancun fork), the slot - // identifier is the hash of the key, and the original key (preimage) cannot - // be recovered. To maintain backward compatibility, the key hash is used. - // - // Meanwhile, using the storage key hash also preserve the traversal order - // with Merkle-Patricia-Trie. - // - // This field is null if the identifier refers to account data. - storageHash common.Hash -} - -// String returns the string format state identifier. -func (ident stateIdent) String() string { - if ident.account { - return ident.addressHash.Hex() - } - return ident.addressHash.Hex() + ident.storageHash.Hex() -} - -// newAccountIdent constructs a state identifier for an account. -func newAccountIdent(addressHash common.Hash) stateIdent { - return stateIdent{ - account: true, - addressHash: addressHash, - } -} - -// newStorageIdent constructs a state identifier for a storage slot. 
-// The address denotes the address of the associated account; -// the storageHash denotes the hash of the raw storage slot key; -func newStorageIdent(addressHash common.Hash, storageHash common.Hash) stateIdent { - return stateIdent{ - addressHash: addressHash, - storageHash: storageHash, - } -} - -// stateIdentQuery is the extension of stateIdent by adding the raw storage key. -type stateIdentQuery struct { - stateIdent - - address common.Address - storageKey common.Hash -} - -// newAccountIdentQuery constructs a state identifier for an account. -func newAccountIdentQuery(address common.Address, addressHash common.Hash) stateIdentQuery { - return stateIdentQuery{ - stateIdent: stateIdent{ - account: true, - addressHash: addressHash, - }, - address: address, - } -} - -// newStorageIdentQuery constructs a state identifier for a storage slot. -// the address denotes the address of the associated account; -// the addressHash denotes the address hash of the associated account; -// the storageKey denotes the raw storage slot key; -// the storageHash denotes the hash of the raw storage slot key; -func newStorageIdentQuery(address common.Address, addressHash common.Hash, storageKey common.Hash, storageHash common.Hash) stateIdentQuery { - return stateIdentQuery{ - stateIdent: stateIdent{ - addressHash: addressHash, - storageHash: storageHash, - }, - address: address, - storageKey: storageKey, - } -} - // indexReaderWithLimitTag is a wrapper around indexReader that includes an // additional index position. This position represents the ID of the last // indexed state history at the time the reader was created, implying that @@ -169,7 +87,7 @@ func (r *indexReaderWithLimitTag) readGreaterThan(id uint64, lastID uint64) (uin // Given that it's very unlikely to occur and users try to perform historical // state queries while reverting the states at the same time. Simply returning // an error should be sufficient for now. - metadata := loadIndexMetadata(r.db) + metadata := loadIndexMetadata(r.db, toHistoryType(r.reader.state.typ)) if metadata == nil || metadata.Last < lastID { return 0, errors.New("state history hasn't been indexed yet") } @@ -331,7 +249,7 @@ func (r *historyReader) read(state stateIdentQuery, stateID uint64, lastID uint6 // To serve the request, all state histories from stateID+1 to lastID // must be indexed. It's not supposed to happen unless system is very // wrong. - metadata := loadIndexMetadata(r.disk) + metadata := loadIndexMetadata(r.disk, toHistoryType(state.typ)) if metadata == nil || metadata.Last < lastID { indexed := "null" if metadata != nil { @@ -364,7 +282,7 @@ func (r *historyReader) read(state stateIdentQuery, stateID uint64, lastID uint6 // that the associated state histories are no longer available due to a rollback. // Such truncation should be captured by the state resolver below, rather than returning // invalid data. 
- if state.account { + if state.typ == typeAccount { return r.readAccount(state.address, historyID) } return r.readStorage(state.address, state.storageKey, state.storageHash, historyID) diff --git a/triedb/pathdb/history_reader_test.go b/triedb/pathdb/history_reader_test.go index 75c5f701f96..3e1a545ff32 100644 --- a/triedb/pathdb/history_reader_test.go +++ b/triedb/pathdb/history_reader_test.go @@ -29,7 +29,7 @@ import ( func waitIndexing(db *Database) { for { - metadata := loadIndexMetadata(db.diskdb) + metadata := loadIndexMetadata(db.diskdb, typeStateHistory) if metadata != nil && metadata.Last >= db.tree.bottom().stateID() { return } diff --git a/triedb/pathdb/history_state.go b/triedb/pathdb/history_state.go index 3bb69a7f4d0..9d1e4dfb099 100644 --- a/triedb/pathdb/history_state.go +++ b/triedb/pathdb/history_state.go @@ -21,6 +21,7 @@ import ( "encoding/binary" "errors" "fmt" + "iter" "maps" "slices" "time" @@ -275,6 +276,36 @@ func newStateHistory(root common.Hash, parent common.Hash, block uint64, account } } +// typ implements the history interface, returning the historical data type held. +func (h *stateHistory) typ() historyType { + return typeStateHistory +} + +// forEach implements the history interface, returning an iterator to traverse the +// state entries in the history. +func (h *stateHistory) forEach() iter.Seq[stateIdent] { + return func(yield func(stateIdent) bool) { + for _, addr := range h.accountList { + addrHash := crypto.Keccak256Hash(addr.Bytes()) + if !yield(newAccountIdent(addrHash)) { + return + } + for _, slotKey := range h.storageList[addr] { + // The hash of the storage slot key is used as the identifier because the + // legacy history does not include the raw storage key, therefore, the + // conversion from storage key to hash is necessary for non-v0 histories. + slotHash := slotKey + if h.meta.version != stateHistoryV0 { + slotHash = crypto.Keccak256Hash(slotKey.Bytes()) + } + if !yield(newStorageIdent(addrHash, slotHash)) { + return + } + } + } + } +} + // stateSet returns the state set, keyed by the hash of the account address // and the hash of the storage slot key. func (h *stateHistory) stateSet() (map[common.Hash][]byte, map[common.Hash]map[common.Hash][]byte) { @@ -536,8 +567,8 @@ func readStateHistory(reader ethdb.AncientReader, id uint64) (*stateHistory, err } // readStateHistories reads a list of state history records within the specified range. 
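Because forEach returns a standard iter.Seq, consumers simply range over it, and breaking out of the loop propagates back into the iterator through the yield return value. A consumption sketch, assuming Go 1.23+ range-over-func support and an in-scope *stateHistory h:

	// Visit at most the first ten entries; breaking makes yield return
	// false inside forEach, which unwinds the traversal cleanly.
	seen := 0
	for ident := range h.forEach() {
		log.Debug("Visited state entry", "type", ident.typ, "ident", ident.String())
		if seen++; seen == 10 {
			break
		}
	}
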
-func readStateHistories(freezer ethdb.AncientReader, start uint64, count uint64) ([]*stateHistory, error) { - var histories []*stateHistory +func readStateHistories(freezer ethdb.AncientReader, start uint64, count uint64) ([]history, error) { + var histories []history metaList, aIndexList, sIndexList, aDataList, sDataList, err := rawdb.ReadStateHistoryList(freezer, start, count) if err != nil { return nil, err diff --git a/triedb/pathdb/history_state_test.go b/triedb/pathdb/history_state_test.go index 4a777111ea8..5718081566c 100644 --- a/triedb/pathdb/history_state_test.go +++ b/triedb/pathdb/history_state_test.go @@ -137,7 +137,7 @@ func TestTruncateHeadStateHistory(t *testing.T) { rawdb.WriteStateHistory(freezer, uint64(i+1), hs[i].meta.encode(), accountIndex, storageIndex, accountData, storageData) } for size := len(hs); size > 0; size-- { - pruned, err := truncateFromHead(freezer, uint64(size-1)) + pruned, err := truncateFromHead(freezer, typeStateHistory, uint64(size-1)) if err != nil { t.Fatalf("Failed to truncate from head %v", err) } @@ -161,7 +161,7 @@ func TestTruncateTailStateHistory(t *testing.T) { rawdb.WriteStateHistory(freezer, uint64(i+1), hs[i].meta.encode(), accountIndex, storageIndex, accountData, storageData) } for newTail := 1; newTail < len(hs); newTail++ { - pruned, _ := truncateFromTail(freezer, uint64(newTail)) + pruned, _ := truncateFromTail(freezer, typeStateHistory, uint64(newTail)) if pruned != 1 { t.Error("Unexpected pruned items", "want", 1, "got", pruned) } @@ -209,7 +209,7 @@ func TestTruncateTailStateHistories(t *testing.T) { accountData, storageData, accountIndex, storageIndex := hs[i].encode() rawdb.WriteStateHistory(freezer, uint64(i+1), hs[i].meta.encode(), accountIndex, storageIndex, accountData, storageData) } - pruned, _ := truncateFromTail(freezer, uint64(10)-c.limit) + pruned, _ := truncateFromTail(freezer, typeStateHistory, uint64(10)-c.limit) if pruned != c.expPruned { t.Error("Unexpected pruned items", "want", c.expPruned, "got", pruned) } @@ -233,7 +233,7 @@ func TestTruncateOutOfRange(t *testing.T) { accountData, storageData, accountIndex, storageIndex := hs[i].encode() rawdb.WriteStateHistory(freezer, uint64(i+1), hs[i].meta.encode(), accountIndex, storageIndex, accountData, storageData) } - truncateFromTail(freezer, uint64(len(hs)/2)) + truncateFromTail(freezer, typeStateHistory, uint64(len(hs)/2)) // Ensure of-out-range truncations are rejected correctly. head, _ := freezer.Ancients() @@ -254,9 +254,9 @@ func TestTruncateOutOfRange(t *testing.T) { for _, c := range cases { var gotErr error if c.mode == 0 { - _, gotErr = truncateFromHead(freezer, c.target) + _, gotErr = truncateFromHead(freezer, typeStateHistory, c.target) } else { - _, gotErr = truncateFromTail(freezer, c.target) + _, gotErr = truncateFromTail(freezer, typeStateHistory, c.target) } if !errors.Is(gotErr, c.expErr) { t.Errorf("Unexpected error, want: %v, got: %v", c.expErr, gotErr)
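
Putting the pieces together, a test-style sketch of the full index/unindex cycle with the history type threaded through, closely mirroring TestBatchIndexerWrite and TestBatchIndexerDelete above (makeStateHistories is the existing test helper; the body is assumed to run inside a test function with t available):

	db := rawdb.NewMemoryDatabase()
	histories := makeStateHistories(10)

	// Index histories 1..10 and flush; the metadata should record Last == 10.
	bw := newBatchIndexer(db, false, typeStateHistory)
	for i, h := range histories {
		if err := bw.process(h, uint64(i+1)); err != nil {
			t.Fatalf("Failed to process history, %v", err)
		}
	}
	if err := bw.finish(true); err != nil {
		t.Fatalf("Failed to finish batch indexer, %v", err)
	}

	// Unindex in reverse order; afterwards the metadata must be gone.
	bd := newBatchIndexer(db, true, typeStateHistory)
	for i := len(histories) - 1; i >= 0; i-- {
		if err := bd.process(histories[i], uint64(i+1)); err != nil {
			t.Fatalf("Failed to process history, %v", err)
		}
	}
	if err := bd.finish(true); err != nil {
		t.Fatalf("Failed to finish batch indexer, %v", err)
	}
	if metadata := loadIndexMetadata(db, typeStateHistory); metadata != nil {
		t.Fatal("Unexpected index position")
	}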