Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

trie_prefetcher: alternate structure #666

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,7 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {
blockStateInitTimer.Inc(time.Since(substart).Milliseconds())

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain", bc.cacheConfig.TriePrefetcherParallelism)
statedb.StartPrefetcher("chain")
activeState = statedb

// Process block using the parent state as reference point
Expand Down Expand Up @@ -1675,7 +1675,7 @@ func (bc *BlockChain) reprocessBlock(parent *types.Block, current *types.Block)
}

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain", bc.cacheConfig.TriePrefetcherParallelism)
statedb.StartPrefetcher("chain")
defer func() {
statedb.StopPrefetcher()
}()
Expand Down Expand Up @@ -2073,7 +2073,10 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error {
bc.hc.SetCurrentHeader(block.Header())

lastAcceptedHash := block.Hash()
bc.stateCache = state.NewDatabaseWithNodeDB(bc.db, bc.triedb)
bc.stateCache = state.WithPrefetcher(
state.NewDatabaseWithNodeDB(bc.db, bc.triedb),
bc.cacheConfig.TriePrefetcherParallelism,
)

if err := bc.loadLastState(lastAcceptedHash); err != nil {
return err
Expand Down
214 changes: 214 additions & 0 deletions core/state/prefetcher_database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
// (c) 2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package state

import (
"sync"

"github.com/ava-labs/coreth/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)

// PrefetcherDB is an interface that extends Database with additional methods
// used in trie_prefetcher. This includes specific methods for prefetching
// accounts and storage slots, (which may be non-blocking and/or parallelized)
// and methods to wait for pending prefetches.
type PrefetcherDB interface {
// From Database
OpenTrie(root common.Hash) (Trie, error)
OpenStorageTrie(stateRoot common.Hash, address common.Address, root common.Hash, trie Trie) (Trie, error)
CopyTrie(t Trie) Trie

// Additional methods
PrefetchAccount(t Trie, address common.Address)
PrefetchStorage(t Trie, address common.Address, key []byte)
CanPrefetchDuringShutdown() bool
WaitTrie(t Trie)
Close()
}

// withPrefetcher is an optional interface that a Database can implement to
// signal PrefetcherDB() should be called to get a Database for use in
// trie_prefetcher. Each call to PrefetcherDB() should return a new
// PrefetcherDB instance.
type withPrefetcherDB interface {
PrefetcherDB() PrefetcherDB
}

type withPrefetcher struct {
Database
maxConcurrency int
}

func (db *withPrefetcher) PrefetcherDB() PrefetcherDB {
return newPrefetcherDatabase(db.Database, db.maxConcurrency)
}

func WithPrefetcher(db Database, maxConcurrency int) Database {
return &withPrefetcher{db, maxConcurrency}
}

// withPrefetcherDefaults extends Database and implements PrefetcherDB by adding
// default implementations for PrefetchAccount and PrefetchStorage that read the
// account and storage slot from the trie.
type withPrefetcherDefaults struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this no-op prefetcher?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should we add a type assertion and maybe move this to no_op_prefetcher file?

Database
}

func (withPrefetcherDefaults) PrefetchAccount(t Trie, address common.Address) {
_, err := t.GetAccount(address)
if err != nil {
log.Error("GetAccount failed in prefetcher", "err", err)
}
}

func (withPrefetcherDefaults) PrefetchStorage(t Trie, address common.Address, key []byte) {
_, err := t.GetStorage(address, key)
if err != nil {
log.Error("GetStorage failed in prefetcher", "err", err)
}
}

func (withPrefetcherDefaults) CanPrefetchDuringShutdown() bool { return false }
func (withPrefetcherDefaults) WaitTrie(Trie) {}
func (withPrefetcherDefaults) Close() {}

type prefetcherDatabase struct {
Database

maxConcurrency int
workers *utils.BoundedWorkers
}

func newPrefetcherDatabase(db Database, maxConcurrency int) *prefetcherDatabase {
return &prefetcherDatabase{
Database: db,
maxConcurrency: maxConcurrency,
workers: utils.NewBoundedWorkers(maxConcurrency),
}
}

func (p *prefetcherDatabase) OpenTrie(root common.Hash) (Trie, error) {
trie, err := p.Database.OpenTrie(root)
return newPrefetcherTrie(p, trie), err
}

func (p *prefetcherDatabase) OpenStorageTrie(stateRoot common.Hash, address common.Address, root common.Hash, trie Trie) (Trie, error) {
storageTrie, err := p.Database.OpenStorageTrie(stateRoot, address, root, trie)
return newPrefetcherTrie(p, storageTrie), err
}

func (p *prefetcherDatabase) CopyTrie(t Trie) Trie {
switch t := t.(type) {
case *prefetcherTrie:
return t.getCopy()
default:
return p.Database.CopyTrie(t)
}
}

// PrefetchAccount should only be called on a trie returned from OpenTrie or OpenStorageTrie
func (*prefetcherDatabase) PrefetchAccount(t Trie, address common.Address) {
t.(*prefetcherTrie).PrefetchAccount(address)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can you be certain that t will be a *prefetcherTrie?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PrefetcherDB type is designed to minimize changes in trie_prefetcher so these methods are only called from the trie_prefetcher.
In this case, sf.trie is initialized with the return values of OpenTrie or OpenStorageTrie, which is the intended use of the PrefetcherDB type.

We could try to return another type from OpenTrie, however this lead to more changes in trie_prefetcher than I would consider "minimal", open to your suggestions.

Copy link
Collaborator Author

@darioush darioush Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also existing code https://github.com/ava-labs/coreth/blob/master/core/state/database.go#L218-L225 couples the DB type to the trie type (CopyTrie). This code seems "gone" https://github.com/ethereum/go-ethereum/blob/master/core/state/database.go#L48 from the current versions of upstream which is very promising for the future

}

// PrefetchStorage should only be called on a trie returned from OpenTrie or OpenStorageTrie
func (*prefetcherDatabase) PrefetchStorage(t Trie, address common.Address, key []byte) {
t.(*prefetcherTrie).PrefetchStorage(address, key)
}

// WaitTrie should only be called on a trie returned from OpenTrie or OpenStorageTrie
func (*prefetcherDatabase) WaitTrie(t Trie) {
t.(*prefetcherTrie).Wait()
}

func (p *prefetcherDatabase) Close() {
p.workers.Wait()
}

func (p *prefetcherDatabase) CanPrefetchDuringShutdown() bool {
return true
}

type prefetcherTrie struct {
p *prefetcherDatabase

Trie
copyLock sync.Mutex

copies chan Trie
wg sync.WaitGroup
}

// newPrefetcherTrie returns a new prefetcherTrie that wraps the given trie.
// prefetcherTrie prefetches accounts and storage slots in parallel, using
// bounded workers from the prefetcherDatabase. As Trie is not safe for
// concurrent access, each prefetch operation uses a copy. The copy is kept in
// a buffered channel for reuse.
func newPrefetcherTrie(p *prefetcherDatabase, t Trie) *prefetcherTrie {
prefetcher := &prefetcherTrie{
p: p,
Trie: t,
copies: make(chan Trie, p.maxConcurrency),
}
prefetcher.copies <- prefetcher.getCopy()
return prefetcher
}

func (p *prefetcherTrie) Wait() {
p.wg.Wait()
}

// getCopy returns a copy of the trie. The copy is taken from the copies channel
// if available, otherwise a new copy is created.
func (p *prefetcherTrie) getCopy() Trie {
select {
case copy := <-p.copies:
return copy
default:
p.copyLock.Lock()
defer p.copyLock.Unlock()
return p.p.Database.CopyTrie(p.Trie)
darioush marked this conversation as resolved.
Show resolved Hide resolved
}
}

// putCopy keeps the copy for future use. If the buffer is full, the copy is
// discarded.
func (p *prefetcherTrie) putCopy(copy Trie) {
select {
case p.copies <- copy:
default:
}
}

func (p *prefetcherTrie) PrefetchAccount(address common.Address) {
p.wg.Add(1)
f := func() {
defer p.wg.Done()

tr := p.getCopy()
_, err := tr.GetAccount(address)
if err != nil {
log.Error("GetAccount failed in prefetcher", "err", err)
}
p.putCopy(tr)
}
p.p.workers.Execute(f)
}

func (p *prefetcherTrie) PrefetchStorage(address common.Address, key []byte) {
p.wg.Add(1)
f := func() {
defer p.wg.Done()

tr := p.getCopy()
_, err := tr.GetStorage(address, key)
if err != nil {
log.Error("GetStorage failed in prefetcher", "err", err)
}
p.putCopy(tr)
}
p.p.workers.Execute(f)
}
4 changes: 2 additions & 2 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,13 @@ func NewWithSnapshot(root common.Hash, db Database, snap snapshot.Snapshot) (*St
// StartPrefetcher initializes a new trie prefetcher to pull in nodes from the
// state trie concurrently while the state is mutated so that when we reach the
// commit phase, most of the needed data is already hot.
func (s *StateDB) StartPrefetcher(namespace string, maxConcurrency int) {
func (s *StateDB) StartPrefetcher(namespace string) {
if s.prefetcher != nil {
s.prefetcher.close()
s.prefetcher = nil
}
if s.snap != nil {
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, maxConcurrency)
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace)
}
}

Expand Down
Loading
Loading