From 37ec06a5ce7031235581ebabc0577cf6c6425a49 Mon Sep 17 00:00:00 2001 From: joeylichang Date: Tue, 7 Nov 2023 10:44:51 +0800 Subject: [PATCH 1/2] core, eth, trie: write nodebuffer asynchronously to disk --- core/blockchain.go | 11 +- core/blockchain_insert.go | 3 +- core/blockchain_test.go | 2 +- eth/state_accessor.go | 4 +- eth/tracers/api.go | 4 +- trie/database.go | 12 +- trie/triedb/hashdb/database.go | 4 +- trie/triedb/pathdb/database.go | 12 +- trie/triedb/pathdb/difflayer_test.go | 2 +- trie/triedb/pathdb/disklayer.go | 54 +++++- trie/triedb/pathdb/errors.go | 14 ++ trie/triedb/pathdb/journal.go | 15 +- trie/triedb/pathdb/nodebuffer.go | 276 ++++++++++++++++++++++++++- 13 files changed, 367 insertions(+), 46 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index f458da82573e..5ec5bdad175b 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1023,7 +1023,7 @@ func (bc *BlockChain) Stop() { for !bc.triegc.Empty() { triedb.Dereference(bc.triegc.PopItem()) } - if _, nodes, _ := triedb.Size(); nodes != 0 { // all memory is contained within the nodes return for hashdb + if _, nodes, _, _ := triedb.Size(); nodes != 0 { // all memory is contained within the nodes return for hashdb log.Error("Dangling trie nodes after full cleanup") } } @@ -1431,8 +1431,9 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. } // If we exceeded our memory allowance, flush matured singleton nodes to disk var ( - _, nodes, imgs = bc.triedb.Size() // all memory is contained within the nodes return for hashdb - limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024 + _, nodesMutable, nodesImmutable, imgs = bc.triedb.Size() // all memory is contained within the nodes return for hashdb + limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024 + nodes = nodesMutable + nodesImmutable ) if nodes > limit || imgs > 4*1024*1024 { bc.triedb.Cap(limit - ethdb.IdealBatchSize) @@ -1872,8 +1873,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) if bc.snaps != nil { snapDiffItems, snapBufItems = bc.snaps.Size() } - trieDiffNodes, trieBufNodes, _ := bc.triedb.Size() - stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, setHead) + trieDiffNodes, trieBufNodes, trieBufNodesImmutable, _ := bc.triedb.Size() + stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, trieBufNodesImmutable, setHead) if !setHead { // After merge we expect few side chains. Simply count diff --git a/core/blockchain_insert.go b/core/blockchain_insert.go index 9bf662b6b710..bb6ed5a2f041 100644 --- a/core/blockchain_insert.go +++ b/core/blockchain_insert.go @@ -39,7 +39,7 @@ const statsReportLimit = 8 * time.Second // report prints statistics if some number of blocks have been processed // or more than a few seconds have passed since the last message. -func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, snapBufItems, trieDiffNodes, triebufNodes common.StorageSize, setHead bool) { +func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, snapBufItems, trieDiffNodes, triebufNodes, trieBufNodesImmutable common.StorageSize, setHead bool) { // Fetch the timings for the batch var ( now = mclock.Now() @@ -71,6 +71,7 @@ func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, sn } if trieDiffNodes != 0 { // pathdb context = append(context, []interface{}{"triediffs", trieDiffNodes}...) + context = append(context, []interface{}{"triedirtyimmutable", trieBufNodesImmutable}...) } context = append(context, []interface{}{"triedirty", triebufNodes}...) diff --git a/core/blockchain_test.go b/core/blockchain_test.go index bc6f8112f015..f451fa8d1a6a 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -1844,7 +1844,7 @@ func TestTrieForkGC(t *testing.T) { chain.TrieDB().Dereference(blocks[len(blocks)-1-i].Root()) chain.TrieDB().Dereference(forks[len(blocks)-1-i].Root()) } - if _, nodes, _ := chain.TrieDB().Size(); nodes > 0 { // all memory is returned in the nodes return for hashdb + if _, nodes, _, _ := chain.TrieDB().Size(); nodes > 0 { // all memory is returned in the nodes return for hashdb t.Fatalf("stale tries still alive after garbase collection") } } diff --git a/eth/state_accessor.go b/eth/state_accessor.go index 24694df66c36..c83bd9e07ea0 100644 --- a/eth/state_accessor.go +++ b/eth/state_accessor.go @@ -168,8 +168,8 @@ func (eth *Ethereum) hashState(ctx context.Context, block *types.Block, reexec u parent = root } if report { - _, nodes, imgs := triedb.Size() // all memory is contained within the nodes return in hashdb - log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "preimages", imgs) + _, nodes, nodeImmutable, imgs := triedb.Size() // all memory is contained within the nodes return in hashdb + log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "nodesimmutable", nodeImmutable, "preimages", imgs) } return statedb, func() { triedb.Dereference(block.Root()) }, nil } diff --git a/eth/tracers/api.go b/eth/tracers/api.go index 300d904a9970..10c47983ab7d 100644 --- a/eth/tracers/api.go +++ b/eth/tracers/api.go @@ -368,8 +368,8 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed // if the relevant state is available in disk. var preferDisk bool if statedb != nil { - s1, s2, s3 := statedb.Database().TrieDB().Size() - preferDisk = s1+s2+s3 > defaultTracechainMemLimit + s1, s2, s3, s4 := statedb.Database().TrieDB().Size() + preferDisk = s1+s2+s3+s4 > defaultTracechainMemLimit } statedb, release, err = api.backend.StateAtBlock(ctx, block, reexec, statedb, false, preferDisk) if err != nil { diff --git a/trie/database.go b/trie/database.go index 1e59f0908f38..6929b9c0da40 100644 --- a/trie/database.go +++ b/trie/database.go @@ -57,7 +57,7 @@ type backend interface { // // For hash scheme, there is no differentiation between diff layer nodes // and dirty disk layer nodes, so both are merged into the second return. - Size() (common.StorageSize, common.StorageSize) + Size() (common.StorageSize, common.StorageSize, common.StorageSize) // Update performs a state transition by committing dirty nodes contained // in the given set in order to update state from the specified parent to @@ -151,16 +151,16 @@ func (db *Database) Commit(root common.Hash, report bool) error { // Size returns the storage size of diff layer nodes above the persistent disk // layer, the dirty nodes buffered within the disk layer, and the size of cached // preimages. -func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize) { +func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize, common.StorageSize) { var ( - diffs, nodes common.StorageSize - preimages common.StorageSize + diffs, nodes, nodesImmutable common.StorageSize + preimages common.StorageSize ) - diffs, nodes = db.backend.Size() + diffs, nodes, nodesImmutable = db.backend.Size() if db.preimages != nil { preimages = db.preimages.size() } - return diffs, nodes, preimages + return diffs, nodes, nodesImmutable, preimages } // Initialized returns an indicator if the state data is already initialized diff --git a/trie/triedb/hashdb/database.go b/trie/triedb/hashdb/database.go index 764ab24ec8dd..1f49491d0128 100644 --- a/trie/triedb/hashdb/database.go +++ b/trie/triedb/hashdb/database.go @@ -627,7 +627,7 @@ func (db *Database) Update(root common.Hash, parent common.Hash, block uint64, n // // The first return will always be 0, representing the memory stored in unbounded // diff layers above the dirty cache. This is only available in pathdb. -func (db *Database) Size() (common.StorageSize, common.StorageSize) { +func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize) { db.lock.RLock() defer db.lock.RUnlock() @@ -635,7 +635,7 @@ func (db *Database) Size() (common.StorageSize, common.StorageSize) { // the total memory consumption, the maintenance metadata is also needed to be // counted. var metadataSize = common.StorageSize(len(db.dirties) * cachedNodeSize) - return 0, db.dirtiesSize + db.childrenSize + metadataSize + return 0, db.dirtiesSize + db.childrenSize + metadataSize, 0 } // Close closes the trie database and releases all held resources. diff --git a/trie/triedb/pathdb/database.go b/trie/triedb/pathdb/database.go index dc64414e9b52..94753f228ffa 100644 --- a/trie/triedb/pathdb/database.go +++ b/trie/triedb/pathdb/database.go @@ -52,6 +52,10 @@ const ( // Do not increase the buffer size arbitrarily, otherwise the system // pause time will increase when the database writes happen. DefaultBufferSize = 64 * 1024 * 1024 + + // DefaultBackgroundFlushInterval defines the default the wait interval + // that background node cache flush disk. + DefaultBackgroundFlushInterval = 3 ) // layer is the interface implemented by all state layers which includes some @@ -303,7 +307,7 @@ func (db *Database) Enable(root common.Hash) error { } // Re-construct a new disk layer backed by persistent state // with **empty clean cache and node buffer**. - db.tree.reset(newDiskLayer(root, 0, db, nil, newNodeBuffer(db.bufferSize, nil, 0))) + db.tree.reset(newDiskLayer(root, 0, db, nil, newAsyncNodeBuffer(db.bufferSize, nil, 0))) // Re-enable the database as the final step. db.waitSync = false @@ -410,16 +414,16 @@ func (db *Database) Close() error { // Size returns the current storage size of the memory cache in front of the // persistent database layer. -func (db *Database) Size() (diffs common.StorageSize, nodes common.StorageSize) { +func (db *Database) Size() (diffs common.StorageSize, nodes common.StorageSize, nodesImmutable common.StorageSize) { db.tree.forEach(func(layer layer) { if diff, ok := layer.(*diffLayer); ok { diffs += common.StorageSize(diff.memory) } if disk, ok := layer.(*diskLayer); ok { - nodes += disk.size() + nodes, nodesImmutable = disk.size() } }) - return diffs, nodes + return diffs, nodes, nodesImmutable } // Initialized returns an indicator if the state data is already diff --git a/trie/triedb/pathdb/difflayer_test.go b/trie/triedb/pathdb/difflayer_test.go index 9b5907c3c5b3..72aada1e1c52 100644 --- a/trie/triedb/pathdb/difflayer_test.go +++ b/trie/triedb/pathdb/difflayer_test.go @@ -29,7 +29,7 @@ import ( func emptyLayer() *diskLayer { return &diskLayer{ db: New(rawdb.NewMemoryDatabase(), nil), - buffer: newNodeBuffer(DefaultBufferSize, nil, 0), + buffer: newAsyncNodeBuffer(DefaultBufferSize, nil, 0), } } diff --git a/trie/triedb/pathdb/disklayer.go b/trie/triedb/pathdb/disklayer.go index ef697cbce8ce..c6d900888288 100644 --- a/trie/triedb/pathdb/disklayer.go +++ b/trie/triedb/pathdb/disklayer.go @@ -25,25 +25,68 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/trie/trienode" "github.com/ethereum/go-ethereum/trie/triestate" "golang.org/x/crypto/sha3" ) +// trienodebuffer is a collection of modified trie nodes to aggregate the disk +// write. The content of the trienodebuffer must be checked before diving into +// disk (since it basically is not-yet-written data). +type trienodebuffer interface { + // node retrieves the trie node with given node info. + node(owner common.Hash, path []byte, hash common.Hash) (*trienode.Node, error) + + // commit merges the dirty nodes into the trienodebuffer. This operation won't take + // the ownership of the nodes map which belongs to the bottom-most diff layer. + // It will just hold the node references from the given map which are safe to + // copy. + commit(nodes map[common.Hash]map[string]*trienode.Node) trienodebuffer + + // revert is the reverse operation of commit. It also merges the provided nodes + // into the trienodebuffer, the difference is that the provided node set should + // revert the changes made by the last state transition. + revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[string]*trienode.Node) error + + // flush persists the in-memory dirty trie node into the disk if the configured + // memory threshold is reached. Note, all data must be written atomically. + flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error + + // setSize sets the buffer size to the provided number, and invokes a flush + // operation if the current memory usage exceeds the new limit. + setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error + + // reset cleans up the disk cache. + reset() + + // empty returns an indicator if trienodebuffer contains any state transition inside. + empty() bool + + // getSize return the trienodebuffer used size. + getSize() (uint64, uint64) + + // getAllNodes return all the trie nodes are cached in trienodebuffer. + getAllNodes() map[common.Hash]map[string]*trienode.Node + + // getLayers return the size of cached difflayers. + getLayers() uint64 +} + // diskLayer is a low level persistent layer built on top of a key-value store. type diskLayer struct { root common.Hash // Immutable, root hash to which this layer was made for id uint64 // Immutable, corresponding state id db *Database // Path-based trie database cleans *fastcache.Cache // GC friendly memory cache of clean node RLPs - buffer *nodebuffer // Node buffer to aggregate writes + buffer trienodebuffer // Node buffer to aggregate writes stale bool // Signals that the layer became stale (state progressed) lock sync.RWMutex // Lock used to protect stale flag } // newDiskLayer creates a new disk layer based on the passing arguments. -func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer *nodebuffer) *diskLayer { +func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer trienodebuffer) *diskLayer { // Initialize a clean cache if the memory allowance is not zero // or reuse the provided cache if it is not nil (inherited from // the original disk layer). @@ -294,14 +337,15 @@ func (dl *diskLayer) setBufferSize(size int) error { } // size returns the approximate size of cached nodes in the disk layer. -func (dl *diskLayer) size() common.StorageSize { +func (dl *diskLayer) size() (common.StorageSize, common.StorageSize) { dl.lock.RLock() defer dl.lock.RUnlock() if dl.stale { - return 0 + return 0, 0 } - return common.StorageSize(dl.buffer.size) + nodeBuf, nodeImmutableBuf := dl.buffer.getSize() + return common.StorageSize(nodeBuf), common.StorageSize(nodeImmutableBuf) } // resetCache releases the memory held by clean cache to prevent memory leak. diff --git a/trie/triedb/pathdb/errors.go b/trie/triedb/pathdb/errors.go index 78ee4459fe50..d8a580531789 100644 --- a/trie/triedb/pathdb/errors.go +++ b/trie/triedb/pathdb/errors.go @@ -49,6 +49,20 @@ var ( // errUnexpectedNode is returned if the requested node with specified path is // not hash matched with expectation. errUnexpectedNode = errors.New("unexpected node") + + // errWriteImmutable is returned if write to background immutable nodebuffer + // under asyncnodebuffer + errWriteImmutable = errors.New("write immutable node buffer") + + // errFlushMutable is returned if flush the background mutable nodebuffer + // to disk, under asyncnodebuffer + errFlushMutable = errors.New("flush mutable node buffer") + + // errRevertImmutable is returned if revert the background immutable nodebuffer + errRevertImmutable = errors.New("revert immutable node buffer") + + // errIncompatibleMerge is returned when merge node cache occurs error. + errIncompatibleMerge = errors.New("incompatible node buffer merge") ) func newUnexpectedNodeError(loc string, expHash common.Hash, gotHash common.Hash, owner common.Hash, path []byte, blob []byte) error { diff --git a/trie/triedb/pathdb/journal.go b/trie/triedb/pathdb/journal.go index ac770763e38d..ac43dbacdeb5 100644 --- a/trie/triedb/pathdb/journal.go +++ b/trie/triedb/pathdb/journal.go @@ -130,7 +130,7 @@ func (db *Database) loadLayers() layer { log.Info("Failed to load journal, discard it", "err", err) } // Return single layer with persistent state. - return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newNodeBuffer(db.bufferSize, nil, 0)) + return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newAsyncNodeBuffer(db.bufferSize, nil, 0)) } // loadDiskLayer reads the binary blob from the layer journal, reconstructing @@ -170,7 +170,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) { nodes[entry.Owner] = subset } // Calculate the internal state transitions by id difference. - base := newDiskLayer(root, id, db, nil, newNodeBuffer(db.bufferSize, nodes, id-stored)) + base := newDiskLayer(root, id, db, nil, newAsyncNodeBuffer(db.bufferSize, nodes, id-stored)) return base, nil } @@ -260,8 +260,9 @@ func (dl *diskLayer) journal(w io.Writer) error { return err } // Step three, write all unwritten nodes into the journal - nodes := make([]journalNodes, 0, len(dl.buffer.nodes)) - for owner, subset := range dl.buffer.nodes { + cachedNodes := dl.buffer.getAllNodes() + nodes := make([]journalNodes, 0, len(cachedNodes)) + for owner, subset := range cachedNodes { entry := journalNodes{Owner: owner} for path, node := range subset { entry.Nodes = append(entry.Nodes, journalNode{Path: []byte(path), Blob: node.Blob}) @@ -271,7 +272,7 @@ func (dl *diskLayer) journal(w io.Writer) error { if err := rlp.Encode(w, nodes); err != nil { return err } - log.Debug("Journaled pathdb disk layer", "root", dl.root, "nodes", len(dl.buffer.nodes)) + log.Debug("Journaled pathdb disk layer", "root", dl.root, "nodes", len(cachedNodes)) return nil } @@ -344,9 +345,9 @@ func (db *Database) Journal(root common.Hash) error { } disk := db.tree.bottom() if l, ok := l.(*diffLayer); ok { - log.Info("Persisting dirty state to disk", "head", l.block, "root", root, "layers", l.id-disk.id+disk.buffer.layers) + log.Info("Persisting dirty state to disk", "head", l.block, "root", root, "layers", l.id-disk.id+disk.buffer.getLayers()) } else { // disk layer only on noop runs (likely) or deep reorgs (unlikely) - log.Info("Persisting dirty state to disk", "root", root, "layers", disk.buffer.layers) + log.Info("Persisting dirty state to disk", "root", root, "layers", disk.buffer.getLayers()) } start := time.Now() diff --git a/trie/triedb/pathdb/nodebuffer.go b/trie/triedb/pathdb/nodebuffer.go index 4a7d328b9afb..65e64badc644 100644 --- a/trie/triedb/pathdb/nodebuffer.go +++ b/trie/triedb/pathdb/nodebuffer.go @@ -18,6 +18,8 @@ package pathdb import ( "fmt" + "sync" + "sync/atomic" "time" "github.com/VictoriaMetrics/fastcache" @@ -29,14 +31,180 @@ import ( "github.com/ethereum/go-ethereum/trie/trienode" ) +var _ trienodebuffer = &asyncnodebuffer{} + +// asyncnodebuffer implement trienodebuffer interface, and aysnc the nodecache +// to disk. +type asyncnodebuffer struct { + mux sync.RWMutex + current *nodebuffer + background *nodebuffer +} + +// newAsyncNodeBuffer initializes the async node buffer with the provided nodes. +func newAsyncNodeBuffer(limit int, nodes map[common.Hash]map[string]*trienode.Node, layers uint64) *asyncnodebuffer { + return &asyncnodebuffer{ + current: newNodeBuffer(limit, nodes, layers), + background: newNodeBuffer(limit, nil, 0), + } +} + +// node retrieves the trie node with given node info. +func (a *asyncnodebuffer) node(owner common.Hash, path []byte, hash common.Hash) (*trienode.Node, error) { + a.mux.RLock() + defer a.mux.RUnlock() + + node, err := a.current.node(owner, path, hash) + if err != nil { + return nil, err + } + if node == nil { + return a.background.node(owner, path, hash) + } + return node, nil +} + +// commit merges the dirty nodes into the nodebuffer. This operation won't take +// the ownership of the nodes map which belongs to the bottom-most diff layer. +// It will just hold the node references from the given map which are safe to +// copy. +func (a *asyncnodebuffer) commit(nodes map[common.Hash]map[string]*trienode.Node) trienodebuffer { + a.mux.Lock() + defer a.mux.Unlock() + + if err := a.current.commit(nodes); err != nil { + log.Warn("Failed to commit trie nodes", "error", err) + } + return a +} + +// revert is the reverse operation of commit. It also merges the provided nodes +// into the nodebuffer, the difference is that the provided node set should +// revert the changes made by the last state transition. +func (a *asyncnodebuffer) revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[string]*trienode.Node) error { + a.mux.Lock() + defer a.mux.Unlock() + + newBuf, err := a.current.merge(a.background) + if err != nil { + log.Warn("[BUG] failed to merge node cache under revert async node buffer", "error", err) + return err + } + a.current = newBuf + a.background.reset() + return a.current.revert(db, nodes) +} + +// setSize is unsupported in asyncnodebuffer, due to the double buffer, blocking will occur. +func (a *asyncnodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error { + a.mux.Lock() + defer a.mux.Unlock() + newBuf, err := a.current.merge(a.background) + if err != nil { + log.Warn("[BUG] failed to merge node cache under revert async node buffer", "error", err) + return err + } + a.current = newBuf + a.background.reset() + a.current.setSize(size, db, clean, id) + a.background.size = uint64(size) + return nil +} + +// reset cleans up the disk cache. +func (a *asyncnodebuffer) reset() { + a.mux.Lock() + defer a.mux.Unlock() + + a.current.reset() + a.background.reset() +} + +// empty returns an indicator if nodebuffer contains any state transition inside. +func (a *asyncnodebuffer) empty() bool { + a.mux.RLock() + defer a.mux.RUnlock() + + return a.current.empty() && a.background.empty() +} + +// flush persists the in-memory dirty trie node into the disk if the configured +// memory threshold is reached. Note, all data must be written atomically. +func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error { + a.mux.Lock() + defer a.mux.Unlock() + + if force { + for { + if atomic.LoadUint64(&a.background.immutable) == 1 { + time.Sleep(time.Duration(DefaultBackgroundFlushInterval) * time.Second) + log.Info("waiting background memory table flush to disk for force flush node buffer") + continue + } + atomic.StoreUint64(&a.current.immutable, 1) + return a.current.flush(db, clean, id, true) + } + } + + if a.current.size < a.current.limit { + return nil + } + + // background flush doing + if atomic.LoadUint64(&a.background.immutable) == 1 { + return nil + } + + atomic.StoreUint64(&a.current.immutable, 1) + a.current, a.background = a.background, a.current + + go func(persistId uint64) { + for { + err := a.background.flush(db, clean, persistId, true) + if err == nil { + log.Debug("succeed to flush background nodecahce to disk", "state_id", persistId) + return + } + log.Error("failed to flush background nodecahce to disk", "state_id", persistId, "error", err) + } + }(id) + return nil +} + +func (a *asyncnodebuffer) getAllNodes() map[common.Hash]map[string]*trienode.Node { + a.mux.Lock() + defer a.mux.Unlock() + + cached, err := a.current.merge(a.background) + if err != nil { + log.Crit("[BUG] failed to merge nodecache under revert asyncnodebuffer", "error", err) + } + return cached.nodes +} + +func (a *asyncnodebuffer) getLayers() uint64 { + a.mux.RLock() + defer a.mux.RUnlock() + + return a.current.layers + a.background.layers +} + +func (a *asyncnodebuffer) getSize() (uint64, uint64) { + a.mux.RLock() + defer a.mux.RUnlock() + + return a.current.size, a.background.size +} + // nodebuffer is a collection of modified trie nodes to aggregate the disk // write. The content of the nodebuffer must be checked before diving into // disk (since it basically is not-yet-written data). type nodebuffer struct { - layers uint64 // The number of diff layers aggregated inside - size uint64 // The size of aggregated writes - limit uint64 // The maximum memory allowance in bytes - nodes map[common.Hash]map[string]*trienode.Node // The dirty node set, mapped by owner and path + layers uint64 // The number of diff layers aggregated inside + size uint64 // The size of aggregated writes + limit uint64 // The maximum memory allowance in bytes + nodes map[common.Hash]map[string]*trienode.Node // The dirty node set, mapped by owner and path + immutable uint64 // The flag equal 1, flush nodes to disk background } // newNodeBuffer initializes the node buffer with the provided nodes. @@ -51,10 +219,11 @@ func newNodeBuffer(limit int, nodes map[common.Hash]map[string]*trienode.Node, l } } return &nodebuffer{ - layers: layers, - nodes: nodes, - size: size, - limit: uint64(limit), + layers: layers, + nodes: nodes, + size: size, + limit: uint64(limit), + immutable: 0, } } @@ -80,7 +249,11 @@ func (b *nodebuffer) node(owner common.Hash, path []byte, hash common.Hash) (*tr // the ownership of the nodes map which belongs to the bottom-most diff layer. // It will just hold the node references from the given map which are safe to // copy. -func (b *nodebuffer) commit(nodes map[common.Hash]map[string]*trienode.Node) *nodebuffer { +func (b *nodebuffer) commit(nodes map[common.Hash]map[string]*trienode.Node) error { + if atomic.LoadUint64(&b.immutable) == 1 { + return errWriteImmutable + } + var ( delta int64 overwrite int64 @@ -118,13 +291,17 @@ func (b *nodebuffer) commit(nodes map[common.Hash]map[string]*trienode.Node) *no b.layers++ gcNodesMeter.Mark(overwrite) gcBytesMeter.Mark(overwriteSize) - return b + return nil } // revert is the reverse operation of commit. It also merges the provided nodes // into the nodebuffer, the difference is that the provided node set should // revert the changes made by the last state transition. func (b *nodebuffer) revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[string]*trienode.Node) error { + if atomic.LoadUint64(&b.immutable) == 1 { + return errRevertImmutable + } + // Short circuit if no embedded state transition to revert. if b.layers == 0 { return errStateUnrecoverable @@ -187,6 +364,7 @@ func (b *nodebuffer) updateSize(delta int64) { // reset cleans up the disk cache. func (b *nodebuffer) reset() { + atomic.StoreUint64(&b.immutable, 0) b.layers = 0 b.size = 0 b.nodes = make(map[common.Hash]map[string]*trienode.Node) @@ -200,13 +378,73 @@ func (b *nodebuffer) empty() bool { // setSize sets the buffer size to the provided number, and invokes a flush // operation if the current memory usage exceeds the new limit. func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error { + if atomic.LoadUint64(&b.immutable) == 1 { + return errRevertImmutable + } + b.limit = uint64(size) return b.flush(db, clean, id, false) } +// merge returns a new nodebuffer instances that include `b` and `nb` nodes. +func (b *nodebuffer) merge(nb *nodebuffer) (*nodebuffer, error) { + if b == nil && nb == nil { + return nil, nil + } + if b == nil || b.empty() { + res := copyNodeBuffer(nb) + atomic.StoreUint64(&res.immutable, 0) + return nb, nil + } + if nb == nil || nb.empty() { + res := copyNodeBuffer(b) + atomic.StoreUint64(&res.immutable, 0) + return b, nil + } + if atomic.LoadUint64(&b.immutable) == atomic.LoadUint64(&nb.immutable) { + return nil, errIncompatibleMerge + } + + var ( + immutable *nodebuffer + mutable *nodebuffer + ) + if atomic.LoadUint64(&b.immutable) == 1 { + immutable = b + mutable = nb + } else { + immutable = nb + mutable = b + } + + nodes := make(map[common.Hash]map[string]*trienode.Node) + for acc, subTree := range immutable.nodes { + if _, ok := nodes[acc]; !ok { + nodes[acc] = make(map[string]*trienode.Node) + } + for path, node := range subTree { + nodes[acc][path] = node + } + } + + for acc, subTree := range mutable.nodes { + if _, ok := nodes[acc]; !ok { + nodes[acc] = make(map[string]*trienode.Node) + } + for path, node := range subTree { + nodes[acc][path] = node + } + } + return newNodeBuffer(int(mutable.limit), nodes, immutable.layers+mutable.layers), nil +} + // flush persists the in-memory dirty trie node into the disk if the configured // memory threshold is reached. Note, all data must be written atomically. func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error { + if atomic.LoadUint64(&b.immutable) == 0 { + return errFlushMutable + } + if b.size <= b.limit && !force { return nil } @@ -273,3 +511,21 @@ func cacheKey(owner common.Hash, path []byte) []byte { } return append(owner.Bytes(), path...) } + +// copyNodeBuffer returns a new instance nodebuffer that copy the data of 'n'. +func copyNodeBuffer(n *nodebuffer) *nodebuffer { + if n == nil { + return nil + } + nodes := make(map[common.Hash]map[string]*trienode.Node) + for acc, subTree := range n.nodes { + if _, ok := nodes[acc]; !ok { + nodes[acc] = make(map[string]*trienode.Node) + } + for path, node := range subTree { + nodes[acc][path] = node + } + } + nb := newNodeBuffer(int(n.limit), nodes, n.layers) + return nb +} From 8677240ff81ef03833a8fdc898d900b23d83cdaa Mon Sep 17 00:00:00 2001 From: joeylichang Date: Thu, 23 Nov 2023 12:08:09 +0800 Subject: [PATCH 2/2] pathdb: add more docs for asyncnodebuffer --- trie/database.go | 8 +- trie/triedb/pathdb/disklayer.go | 8 +- trie/triedb/pathdb/errors.go | 4 +- trie/triedb/pathdb/nodebuffer.go | 127 +++++++++++++++++-------------- 4 files changed, 80 insertions(+), 67 deletions(-) diff --git a/trie/database.go b/trie/database.go index 6929b9c0da40..0db779f38ca2 100644 --- a/trie/database.go +++ b/trie/database.go @@ -148,9 +148,11 @@ func (db *Database) Commit(root common.Hash, report bool) error { return db.backend.Commit(root, report) } -// Size returns the storage size of diff layer nodes above the persistent disk -// layer, the dirty nodes buffered within the disk layer, and the size of cached -// preimages. +// Size returns the sizes of: +// - the diff layer nodes above the persistent disk layer, +// - the mutable dirty nodes buffered within the disk layer, +// - the immutable nodes in the disk layer, +// - the cached preimages. func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize, common.StorageSize) { var ( diffs, nodes, nodesImmutable common.StorageSize diff --git a/trie/triedb/pathdb/disklayer.go b/trie/triedb/pathdb/disklayer.go index c6d900888288..bfa506fd3e3f 100644 --- a/trie/triedb/pathdb/disklayer.go +++ b/trie/triedb/pathdb/disklayer.go @@ -64,7 +64,9 @@ type trienodebuffer interface { // empty returns an indicator if trienodebuffer contains any state transition inside. empty() bool - // getSize return the trienodebuffer used size. + // getSize return the trienodebuffer used size, includes: + // - the mutable dirty nodes buffered within the disk layer, + // - the immutable nodes in the disk layer. getSize() (uint64, uint64) // getAllNodes return all the trie nodes are cached in trienodebuffer. @@ -336,7 +338,9 @@ func (dl *diskLayer) setBufferSize(size int) error { return dl.buffer.setSize(size, dl.db.diskdb, dl.cleans, dl.id) } -// size returns the approximate size of cached nodes in the disk layer. +// size returns the approximate size of: +// - the mutable dirty nodes buffered within the disk layer, +// - the immutable nodes in the disk layer. func (dl *diskLayer) size() (common.StorageSize, common.StorageSize) { dl.lock.RLock() defer dl.lock.RUnlock() diff --git a/trie/triedb/pathdb/errors.go b/trie/triedb/pathdb/errors.go index d8a580531789..f78c82af4e11 100644 --- a/trie/triedb/pathdb/errors.go +++ b/trie/triedb/pathdb/errors.go @@ -51,11 +51,9 @@ var ( errUnexpectedNode = errors.New("unexpected node") // errWriteImmutable is returned if write to background immutable nodebuffer - // under asyncnodebuffer errWriteImmutable = errors.New("write immutable node buffer") - // errFlushMutable is returned if flush the background mutable nodebuffer - // to disk, under asyncnodebuffer + // errFlushMutable is returned if flush the background mutable nodebuffer to disk errFlushMutable = errors.New("flush mutable node buffer") // errRevertImmutable is returned if revert the background immutable nodebuffer diff --git a/trie/triedb/pathdb/nodebuffer.go b/trie/triedb/pathdb/nodebuffer.go index 65e64badc644..3293fcebbd9d 100644 --- a/trie/triedb/pathdb/nodebuffer.go +++ b/trie/triedb/pathdb/nodebuffer.go @@ -33,12 +33,15 @@ import ( var _ trienodebuffer = &asyncnodebuffer{} -// asyncnodebuffer implement trienodebuffer interface, and aysnc the nodecache -// to disk. +// asyncnodebuffer implement trienodebuffer interface, and async flush nodebuffer +// to disk. It includes two nodebuffer, the mutable and immutable nodebuffer. The +// mutable nodebuffer that up to the size limit switches the immutable, and the new +// mutable nodebuffer can continue to be committed nodes. Retrieves node will access +// mutable nodebuffer firstly, then immutable nodebuffer. type asyncnodebuffer struct { - mux sync.RWMutex - current *nodebuffer - background *nodebuffer + mu sync.RWMutex // Lock used to protect current and background switch + current *nodebuffer // mutable nodebuffer is used to write and read nodes + background *nodebuffer // immutable nodebuffer is readonly and async flush to disk } // newAsyncNodeBuffer initializes the async node buffer with the provided nodes. @@ -49,10 +52,10 @@ func newAsyncNodeBuffer(limit int, nodes map[common.Hash]map[string]*trienode.No } } -// node retrieves the trie node with given node info. +// node retrieves the trie node with given node info, retrieves the current, then background. func (a *asyncnodebuffer) node(owner common.Hash, path []byte, hash common.Hash) (*trienode.Node, error) { - a.mux.RLock() - defer a.mux.RUnlock() + a.mu.RLock() + defer a.mu.RUnlock() node, err := a.current.node(owner, path, hash) if err != nil { @@ -64,13 +67,13 @@ func (a *asyncnodebuffer) node(owner common.Hash, path []byte, hash common.Hash) return node, nil } -// commit merges the dirty nodes into the nodebuffer. This operation won't take -// the ownership of the nodes map which belongs to the bottom-most diff layer. -// It will just hold the node references from the given map which are safe to -// copy. +// commit merges the dirty nodes into the current nodebuffer. This operation +// won't take the ownership of the nodes map which belongs to the bottom-most +// diff layer. It will just hold the node references from the given map which +// are safe to copy. func (a *asyncnodebuffer) commit(nodes map[common.Hash]map[string]*trienode.Node) trienodebuffer { - a.mux.Lock() - defer a.mux.Unlock() + a.mu.Lock() + defer a.mu.Unlock() if err := a.current.commit(nodes); err != nil { log.Warn("Failed to commit trie nodes", "error", err) @@ -82,8 +85,8 @@ func (a *asyncnodebuffer) commit(nodes map[common.Hash]map[string]*trienode.Node // into the nodebuffer, the difference is that the provided node set should // revert the changes made by the last state transition. func (a *asyncnodebuffer) revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[string]*trienode.Node) error { - a.mux.Lock() - defer a.mux.Unlock() + a.mu.Lock() + defer a.mu.Unlock() newBuf, err := a.current.merge(a.background) if err != nil { @@ -95,10 +98,10 @@ func (a *asyncnodebuffer) revert(db ethdb.KeyValueReader, nodes map[common.Hash] return a.current.revert(db, nodes) } -// setSize is unsupported in asyncnodebuffer, due to the double buffer, blocking will occur. +// setSize sets the nodebuffer size limit. func (a *asyncnodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error { - a.mux.Lock() - defer a.mux.Unlock() + a.mu.Lock() + defer a.mu.Unlock() newBuf, err := a.current.merge(a.background) if err != nil { log.Warn("[BUG] failed to merge node cache under revert async node buffer", "error", err) @@ -113,8 +116,8 @@ func (a *asyncnodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastc // reset cleans up the disk cache. func (a *asyncnodebuffer) reset() { - a.mux.Lock() - defer a.mux.Unlock() + a.mu.Lock() + defer a.mu.Unlock() a.current.reset() a.background.reset() @@ -122,26 +125,28 @@ func (a *asyncnodebuffer) reset() { // empty returns an indicator if nodebuffer contains any state transition inside. func (a *asyncnodebuffer) empty() bool { - a.mux.RLock() - defer a.mux.RUnlock() + a.mu.RLock() + defer a.mu.RUnlock() return a.current.empty() && a.background.empty() } -// flush persists the in-memory dirty trie node into the disk if the configured -// memory threshold is reached. Note, all data must be written atomically. +// flush persists the immutable dirty trie node into the disk. If the configured +// memory threshold is reached, switch the mutable nodebuffer to immutable, if the +// previous immutable nodebuffer flushing to disk immediately return. Note, all +// data belongs the same nodebuffer must be written atomically. func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error { - a.mux.Lock() - defer a.mux.Unlock() + a.mu.Lock() + defer a.mu.Unlock() if force { for { - if atomic.LoadUint64(&a.background.immutable) == 1 { + if a.background.immutable.Load() { time.Sleep(time.Duration(DefaultBackgroundFlushInterval) * time.Second) log.Info("waiting background memory table flush to disk for force flush node buffer") continue } - atomic.StoreUint64(&a.current.immutable, 1) + a.current.immutable.Store(true) return a.current.flush(db, clean, id, true) } } @@ -151,11 +156,11 @@ func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, } // background flush doing - if atomic.LoadUint64(&a.background.immutable) == 1 { + if a.background.immutable.Load() { return nil } - - atomic.StoreUint64(&a.current.immutable, 1) + // immutable the current nodebuffer, ready for switching + a.current.immutable.Store(true) a.current, a.background = a.background, a.current go func(persistId uint64) { @@ -172,8 +177,8 @@ func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, } func (a *asyncnodebuffer) getAllNodes() map[common.Hash]map[string]*trienode.Node { - a.mux.Lock() - defer a.mux.Unlock() + a.mu.Lock() + defer a.mu.Unlock() cached, err := a.current.merge(a.background) if err != nil { @@ -183,15 +188,15 @@ func (a *asyncnodebuffer) getAllNodes() map[common.Hash]map[string]*trienode.Nod } func (a *asyncnodebuffer) getLayers() uint64 { - a.mux.RLock() - defer a.mux.RUnlock() + a.mu.RLock() + defer a.mu.RUnlock() return a.current.layers + a.background.layers } func (a *asyncnodebuffer) getSize() (uint64, uint64) { - a.mux.RLock() - defer a.mux.RUnlock() + a.mu.RLock() + defer a.mu.RUnlock() return a.current.size, a.background.size } @@ -200,11 +205,13 @@ func (a *asyncnodebuffer) getSize() (uint64, uint64) { // write. The content of the nodebuffer must be checked before diving into // disk (since it basically is not-yet-written data). type nodebuffer struct { - layers uint64 // The number of diff layers aggregated inside - size uint64 // The size of aggregated writes - limit uint64 // The maximum memory allowance in bytes - nodes map[common.Hash]map[string]*trienode.Node // The dirty node set, mapped by owner and path - immutable uint64 // The flag equal 1, flush nodes to disk background + layers uint64 // The number of diff layers aggregated inside + size uint64 // The size of aggregated writes + limit uint64 // The maximum memory allowance in bytes + nodes map[common.Hash]map[string]*trienode.Node // The dirty node set, mapped by owner and path + // If this is set to true, then this nodebuffer is immutable and any write-operations to it will exit with error. + // If this is set to true, then some other thread is performing a flush in the background, and thus nonblock the write/read-operations. + immutable atomic.Bool // The flag equal true, readonly wait to flush nodes to disk background } // newNodeBuffer initializes the node buffer with the provided nodes. @@ -218,13 +225,14 @@ func newNodeBuffer(limit int, nodes map[common.Hash]map[string]*trienode.Node, l size += uint64(len(n.Blob) + len(path)) } } - return &nodebuffer{ - layers: layers, - nodes: nodes, - size: size, - limit: uint64(limit), - immutable: 0, + nb := &nodebuffer{ + layers: layers, + nodes: nodes, + size: size, + limit: uint64(limit), } + nb.immutable.Store(false) + return nb } // node retrieves the trie node with given node info. @@ -250,7 +258,7 @@ func (b *nodebuffer) node(owner common.Hash, path []byte, hash common.Hash) (*tr // It will just hold the node references from the given map which are safe to // copy. func (b *nodebuffer) commit(nodes map[common.Hash]map[string]*trienode.Node) error { - if atomic.LoadUint64(&b.immutable) == 1 { + if b.immutable.Load() { return errWriteImmutable } @@ -298,7 +306,7 @@ func (b *nodebuffer) commit(nodes map[common.Hash]map[string]*trienode.Node) err // into the nodebuffer, the difference is that the provided node set should // revert the changes made by the last state transition. func (b *nodebuffer) revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[string]*trienode.Node) error { - if atomic.LoadUint64(&b.immutable) == 1 { + if b.immutable.Load() { return errRevertImmutable } @@ -364,7 +372,7 @@ func (b *nodebuffer) updateSize(delta int64) { // reset cleans up the disk cache. func (b *nodebuffer) reset() { - atomic.StoreUint64(&b.immutable, 0) + b.immutable.Store(false) b.layers = 0 b.size = 0 b.nodes = make(map[common.Hash]map[string]*trienode.Node) @@ -378,8 +386,8 @@ func (b *nodebuffer) empty() bool { // setSize sets the buffer size to the provided number, and invokes a flush // operation if the current memory usage exceeds the new limit. func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error { - if atomic.LoadUint64(&b.immutable) == 1 { - return errRevertImmutable + if b.immutable.Load() { + return errWriteImmutable } b.limit = uint64(size) @@ -393,15 +401,15 @@ func (b *nodebuffer) merge(nb *nodebuffer) (*nodebuffer, error) { } if b == nil || b.empty() { res := copyNodeBuffer(nb) - atomic.StoreUint64(&res.immutable, 0) + res.immutable.Store(false) return nb, nil } if nb == nil || nb.empty() { res := copyNodeBuffer(b) - atomic.StoreUint64(&res.immutable, 0) + res.immutable.Store(false) return b, nil } - if atomic.LoadUint64(&b.immutable) == atomic.LoadUint64(&nb.immutable) { + if b.immutable.Load() == nb.immutable.Load() { return nil, errIncompatibleMerge } @@ -409,7 +417,7 @@ func (b *nodebuffer) merge(nb *nodebuffer) (*nodebuffer, error) { immutable *nodebuffer mutable *nodebuffer ) - if atomic.LoadUint64(&b.immutable) == 1 { + if b.immutable.Load() { immutable = b mutable = nb } else { @@ -441,7 +449,7 @@ func (b *nodebuffer) merge(nb *nodebuffer) (*nodebuffer, error) { // flush persists the in-memory dirty trie node into the disk if the configured // memory threshold is reached. Note, all data must be written atomically. func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error { - if atomic.LoadUint64(&b.immutable) == 0 { + if !b.immutable.Load() { return errFlushMutable } @@ -527,5 +535,6 @@ func copyNodeBuffer(n *nodebuffer) *nodebuffer { } } nb := newNodeBuffer(int(n.limit), nodes, n.layers) + nb.immutable.Store(n.immutable.Load()) return nb }