Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: speedup pbss trienode read #2508

Merged
merged 23 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,18 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
// Optimize memory distribution by reallocating surplus allowance from the
// dirty cache to the clean cache.
if config.StateScheme == rawdb.PathScheme && config.TrieDirtyCache > pathdb.MaxDirtyBufferSize/1024/1024 {
log.Info("Capped dirty cache size", "provided", common.StorageSize(config.TrieDirtyCache)*1024*1024, "adjusted", common.StorageSize(pathdb.MaxDirtyBufferSize))
fynnss marked this conversation as resolved.
Show resolved Hide resolved
log.Info("Clean cache size", "provided", common.StorageSize(config.TrieCleanCache)*1024*1024)
log.Info("Capped dirty cache size", "provided", common.StorageSize(config.TrieDirtyCache)*1024*1024,
"adjusted", common.StorageSize(pathdb.MaxDirtyBufferSize))
log.Info("Clean cache size", "provided", common.StorageSize(config.TrieCleanCache)*1024*1024,
"adjusted", common.StorageSize(config.TrieCleanCache+config.TrieDirtyCache-pathdb.MaxDirtyBufferSize/1024/1024)*1024*1024)
config.TrieCleanCache += config.TrieDirtyCache - pathdb.MaxDirtyBufferSize/1024/1024
config.TrieDirtyCache = pathdb.MaxDirtyBufferSize / 1024 / 1024
}
log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024)

log.Info("Allocated memory caches",
"state_scheme", config.StateScheme,
"trie_clean_cache", common.StorageSize(config.TrieCleanCache)*1024*1024,
"trie_dirty_cache", common.StorageSize(config.TrieDirtyCache)*1024*1024,
"snapshot_cache", common.StorageSize(config.SnapshotCache)*1024*1024)
// Try to recover offline state pruning only in hash-based.
if config.StateScheme == rawdb.HashScheme {
if err := pruner.RecoverPruning(stack.ResolvePath(""), chainDb, config.TriesInMemory); err != nil {
Expand Down
149 changes: 149 additions & 0 deletions triedb/pathdb/difflayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,106 @@ import (
"github.com/ethereum/go-ethereum/trie/triestate"
)

type RefTrieNode struct {
refCount uint32
node *trienode.Node
}

type HashNodeCache struct {
lock sync.RWMutex
cache map[common.Hash]*RefTrieNode
}

func (h *HashNodeCache) length() int {
if h == nil {
return 0
}
h.lock.RLock()
defer h.lock.RUnlock()
return len(h.cache)
}

func (h *HashNodeCache) set(hash common.Hash, node *trienode.Node) {
if h == nil {
return
}
h.lock.Lock()
defer h.lock.Unlock()
if n, ok := h.cache[hash]; ok {
n.refCount++
} else {
h.cache[hash] = &RefTrieNode{1, node}
}
}

func (h *HashNodeCache) Get(hash common.Hash) *trienode.Node {
if h == nil {
return nil
}
h.lock.RLock()
defer h.lock.RUnlock()
if n, ok := h.cache[hash]; ok {
return n.node
}
return nil
}

func (h *HashNodeCache) del(hash common.Hash) {
if h == nil {
return
}
h.lock.Lock()
defer h.lock.Unlock()
n, ok := h.cache[hash]
if !ok {
return
}
if n.refCount > 0 {
n.refCount--
}
if n.refCount == 0 {
delete(h.cache, hash)
}
}

func (h *HashNodeCache) Add(ly layer) {
if h == nil {
return
}
dl, ok := ly.(*diffLayer)
if !ok {
return
}
beforeAdd := h.length()
for _, subset := range dl.nodes {
for _, node := range subset {
h.set(node.Hash, node)
}
}
diffHashCacheLengthGauge.Update(int64(h.length()))
log.Debug("Add difflayer to hash map", "root", ly.rootHash(), "block_number", dl.block, "map_len", h.length(), "add_delta", h.length()-beforeAdd)
}

func (h *HashNodeCache) Remove(ly layer) {
if h == nil {
return
}
dl, ok := ly.(*diffLayer)
if !ok {
return
}
go func() {
beforeDel := h.length()
for _, subset := range dl.nodes {
for _, node := range subset {
h.del(node.Hash)
}
}
diffHashCacheLengthGauge.Update(int64(h.length()))
log.Debug("Remove difflayer from hash map", "root", ly.rootHash(), "block_number", dl.block, "map_len", h.length(), "del_delta", beforeDel-h.length())
}()
}

// diffLayer represents a collection of modifications made to the in-memory tries
// along with associated state changes after running a block on top.
//
Expand All @@ -39,7 +139,10 @@ type diffLayer struct {
nodes map[common.Hash]map[string]*trienode.Node // Cached trie nodes indexed by owner and path
states *triestate.Set // Associated state change set for building history
memory uint64 // Approximate guess as to how much memory we use
cache *HashNodeCache // trienode cache by hash key. cache is immutable, but cache's item can be add/del.

// mutables
origin *diskLayer // The current difflayer corresponds to the underlying disklayer and is updated during cap.
parent layer // Parent layer modified by this one, never nil, **can be changed**
lock sync.RWMutex // Lock used to protect parent
}
Expand All @@ -58,6 +161,20 @@ func newDiffLayer(parent layer, root common.Hash, id uint64, block uint64, nodes
states: states,
parent: parent,
}

switch l := parent.(type) {
case *diskLayer:
dl.origin = l
dl.cache = &HashNodeCache{
cache: make(map[common.Hash]*RefTrieNode),
}
case *diffLayer:
dl.origin = l.originDiskLayer()
dl.cache = l.cache
default:
panic("unknown parent type")
}

for _, subset := range nodes {
for path, n := range subset {
dl.memory += uint64(n.Size() + len(path))
Expand All @@ -75,6 +192,12 @@ func newDiffLayer(parent layer, root common.Hash, id uint64, block uint64, nodes
return dl
}

func (dl *diffLayer) originDiskLayer() *diskLayer {
dl.lock.RLock()
defer dl.lock.RUnlock()
return dl.origin
}

// rootHash implements the layer interface, returning the root hash of
// corresponding state.
func (dl *diffLayer) rootHash() common.Hash {
Expand Down Expand Up @@ -133,6 +256,32 @@ func (dl *diffLayer) node(owner common.Hash, path []byte, hash common.Hash, dept
// Node implements the layer interface, retrieving the trie node blob with the
// provided node information. No error will be returned if the node is not found.
func (dl *diffLayer) Node(owner common.Hash, path []byte, hash common.Hash) ([]byte, error) {
if n := dl.cache.Get(hash); n != nil {
// The query from the hash map is fastpath,
// avoiding recursive query of 128 difflayers.
diffHashCacheHitMeter.Mark(1)
diffHashCacheReadMeter.Mark(int64(len(n.Blob)))
return n.Blob, nil
}
diffHashCacheMissMeter.Mark(1)

persistLayer := dl.originDiskLayer()
if persistLayer != nil {
blob, err := persistLayer.Node(owner, path, hash)
if err != nil {
// This is a bad case with a very low probability.
// r/w the difflayer cache and r/w the disklayer are not in the same lock,
// so in extreme cases, both reading the difflayer cache and reading the disklayer may fail, eg, disklayer is stale.
// In this case, fallback to the original 128-layer recursive difflayer query path.
diffHashCacheSlowPathMeter.Mark(1)
log.Debug("Retry difflayer due to query origin failed", "owner", owner, "path", path, "hash", hash.String(), "error", err)
return dl.node(owner, path, hash, 0)
} else { // This is the fastpath.
return blob, nil
}
}
diffHashCacheSlowPathMeter.Mark(1)
log.Debug("Retry difflayer due to origin is nil", "owner", owner, "path", path, "hash", hash.String())
return dl.node(owner, path, hash, 0)
}

Expand Down
3 changes: 3 additions & 0 deletions triedb/pathdb/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
}
log.Debug("Pruned state history", "items", pruned, "tailid", oldest)
}

// The bottom has been eaten by disklayer, releasing the hash cache of bottom difflayer.
bottom.cache.Remove(bottom)
return ndl, nil
}

Expand Down
51 changes: 51 additions & 0 deletions triedb/pathdb/layertree.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,20 @@ func (tree *layerTree) reset(head layer) {
tree.lock.Lock()
defer tree.lock.Unlock()

for _, ly := range tree.layers {
if dl, ok := ly.(*diffLayer); ok {
// Clean up the hash cache of difflayers due to reset.
dl.cache.Remove(dl)
}
}

var layers = make(map[common.Hash]layer)
for head != nil {
layers[head.rootHash()] = head
if dl, ok := head.(*diffLayer); ok {
// Add the hash cache of difflayers due to reset.
dl.cache.Add(dl)
}
head = head.parentLayer()
}
tree.layers = layers
Expand Down Expand Up @@ -98,12 +109,19 @@ func (tree *layerTree) add(root common.Hash, parentRoot common.Hash, block uint6
if root == parentRoot {
return errors.New("layer cycle")
}
if tree.get(root) != nil {
log.Info("Skip add repeated difflayer", "root", root.String(), "block_id", block)
return nil
}
parent := tree.get(parentRoot)
if parent == nil {
return fmt.Errorf("triedb parent [%#x] layer missing", parentRoot)
}
l := parent.update(root, parent.stateID()+1, block, nodes.Flatten(), states)

// Before adding layertree, update the hash cache.
l.cache.Add(l)

tree.lock.Lock()
tree.layers[l.rootHash()] = l
tree.lock.Unlock()
Expand Down Expand Up @@ -132,8 +150,15 @@ func (tree *layerTree) cap(root common.Hash, layers int) error {
if err != nil {
return err
}
for _, ly := range tree.layers {
if dl, ok := ly.(*diffLayer); ok {
dl.cache.Remove(dl)
log.Debug("Cleanup difflayer hash cache due to cap all", "diff_root", dl.root.String(), "diff_block_number", dl.block)
}
}
// Replace the entire layer tree with the flat base
tree.layers = map[common.Hash]layer{base.rootHash(): base}
log.Debug("Cap all difflayers to disklayer", "disk_root", base.rootHash().String())
return nil
}
// Dive until we run out of layers or reach the persistent database
Expand All @@ -146,6 +171,7 @@ func (tree *layerTree) cap(root common.Hash, layers int) error {
return nil
}
}
var persisted *diskLayer
// We're out of layers, flatten anything below, stopping if it's the disk or if
// the memory limit is not yet exceeded.
switch parent := diff.parentLayer().(type) {
Expand All @@ -166,6 +192,7 @@ func (tree *layerTree) cap(root common.Hash, layers int) error {
diff.parent = base

diff.lock.Unlock()
persisted = base.(*diskLayer)

default:
panic(fmt.Sprintf("unknown data layer in triedb: %T", parent))
Expand All @@ -180,6 +207,13 @@ func (tree *layerTree) cap(root common.Hash, layers int) error {
}
var remove func(root common.Hash)
remove = func(root common.Hash) {
if df, exist := tree.layers[root]; exist {
if dl, ok := df.(*diffLayer); ok {
// Clean up the hash cache of the child difflayer corresponding to the stale parent, include the re-org case.
dl.cache.Remove(dl)
log.Debug("Cleanup difflayer hash cache due to reorg", "diff_root", dl.root.String(), "diff_block_number", dl.block)
}
}
delete(tree.layers, root)
for _, child := range children[root] {
remove(child)
Expand All @@ -189,8 +223,25 @@ func (tree *layerTree) cap(root common.Hash, layers int) error {
for root, layer := range tree.layers {
if dl, ok := layer.(*diskLayer); ok && dl.isStale() {
remove(root)
log.Debug("Remove stale the disklayer", "disk_root", dl.root.String())
}
}

if persisted != nil {
var updateOriginFunc func(root common.Hash)
updateOriginFunc = func(root common.Hash) {
if diff, ok := tree.layers[root].(*diffLayer); ok {
diff.lock.Lock()
diff.origin = persisted
diff.lock.Unlock()
}
for _, child := range children[root] {
updateOriginFunc(child)
}
}
updateOriginFunc(persisted.root)
}

return nil
}

Expand Down
6 changes: 6 additions & 0 deletions triedb/pathdb/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,10 @@ var (
historyBuildTimeMeter = metrics.NewRegisteredTimer("pathdb/history/time", nil)
historyDataBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/data", nil)
historyIndexBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/index", nil)

diffHashCacheHitMeter = metrics.NewRegisteredMeter("pathdb/difflayer/hashcache/hit", nil)
diffHashCacheReadMeter = metrics.NewRegisteredMeter("pathdb/difflayer/hashcache/read", nil)
diffHashCacheMissMeter = metrics.NewRegisteredMeter("pathdb/difflayer/hashcache/miss", nil)
diffHashCacheSlowPathMeter = metrics.NewRegisteredMeter("pathdb/difflayer/hashcache/slowpath", nil)
diffHashCacheLengthGauge = metrics.NewRegisteredGauge("pathdb/difflayer/hashcache/size", nil)
)
Loading