Skip to content

Commit

Permalink
eth, trie/triedb/pathdb: pbss patches (#1955)
Browse files Browse the repository at this point in the history
* fix: use the top root hash for rewinding under path schema

* feat: add async flush nodebuffer in path schema

* chore: add prun-block param suffix check

* fix: code review comments
  • Loading branch information
joeylichang authored Nov 6, 2023
1 parent 6286247 commit 53559fc
Show file tree
Hide file tree
Showing 21 changed files with 709 additions and 76 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ var (
utils.TransactionHistoryFlag,
utils.StateSchemeFlag,
utils.StateHistoryFlag,
utils.PathDBSyncFlag,
utils.LightServeFlag,
utils.LightIngressFlag,
utils.LightEgressFlag,
Expand Down
17 changes: 16 additions & 1 deletion cmd/geth/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"time"

"github.com/prometheus/tsdb/fileutil"
Expand Down Expand Up @@ -360,7 +361,21 @@ func pruneBlock(ctx *cli.Context) error {
if path == "" {
return errors.New("prune failed, did not specify the AncientPath")
}
newAncientPath = filepath.Join(path, "ancient_back")
newVersionPath := false
files, err := os.ReadDir(oldAncientPath)
if err != nil {
return err
}
for _, file := range files {
if file.IsDir() && file.Name() == "chain" {
newVersionPath = true
}
}
if newVersionPath && !strings.HasSuffix(oldAncientPath, "geth/chaindata/ancient/chain") {
log.Error("datadir.ancient subdirectory incorrect", "got path", oldAncientPath, "want subdirectory", "geth/chaindata/ancient/chain/")
return errors.New("datadir.ancient subdirectory incorrect")
}
newAncientPath = filepath.Join(path, "chain_back")

blockpruner = pruner.NewBlockPruner(chaindb, stack, oldAncientPath, newAncientPath, blockAmountReserved)

Expand Down
9 changes: 9 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,12 @@ var (
Value: rawdb.HashScheme,
Category: flags.StateCategory,
}
PathDBSyncFlag = &cli.BoolFlag{
Name: "pathdb.sync",
Usage: "sync flush nodes cache to disk in path schema",
Value: false,
Category: flags.StateCategory,
}
StateHistoryFlag = &cli.Uint64Flag{
Name: "history.state",
Usage: "Number of recent blocks to retain state history for (default = 90,000 blocks, 0 = entire chain)",
Expand Down Expand Up @@ -1951,6 +1957,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
log.Warn("The flag --txlookuplimit is deprecated and will be removed, please use --history.transactions")
cfg.TransactionHistory = ctx.Uint64(TransactionHistoryFlag.Name)
}
if ctx.IsSet(PathDBSyncFlag.Name) {
cfg.PathSyncFlush = true
}
if ctx.String(GCModeFlag.Name) == "archive" && cfg.TransactionHistory != 0 {
cfg.TransactionHistory = 0
log.Warn("Disabled transaction unindexing for archive node")
Expand Down
53 changes: 22 additions & 31 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ type CacheConfig struct {
NoTries bool // Insecure settings. Do not have any tries in databases if enabled.
StateHistory uint64 // Number of blocks from head whose state histories are reserved.
StateScheme string // Scheme used to store ethereum states and merkle tree nodes on top
PathSyncFlush bool // Whether sync flush the trienodebuffer of pathdb to disk.

SnapshotNoBuild bool // Whether the background generation is allowed
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
Expand All @@ -174,6 +175,7 @@ func (c *CacheConfig) triedbConfig() *trie.Config {
}
if c.StateScheme == rawdb.PathScheme {
config.PathDB = &pathdb.Config{
SyncFlush: c.PathSyncFlush,
StateHistory: c.StateHistory,
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
DirtyCacheSize: c.TrieDirtyLimit * 1024 * 1024,
Expand Down Expand Up @@ -396,11 +398,15 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
var diskRoot common.Hash
if bc.cacheConfig.SnapshotLimit > 0 {
diskRoot = rawdb.ReadSnapshotRoot(bc.db)
} else if bc.triedb.Scheme() == rawdb.PathScheme {
_, diskRoot = rawdb.ReadAccountTrieNode(bc.db, nil)
}
if bc.triedb.Scheme() == rawdb.PathScheme {
recoverable, _ := bc.triedb.Recoverable(diskRoot)
if !bc.HasState(diskRoot) && !recoverable {
diskRoot = bc.triedb.Head()
}
}
if diskRoot != (common.Hash{}) {
log.Warn("Head state missing, repairing", "number", head.Number, "hash", head.Hash(), "snaproot", diskRoot)
log.Warn("Head state missing, repairing", "number", head.Number, "hash", head.Hash(), "diskRoot", diskRoot)

snapDisk, err := bc.setHeadBeyondRoot(head.Number.Uint64(), 0, diskRoot, true)
if err != nil {
Expand Down Expand Up @@ -689,18 +695,18 @@ func (bc *BlockChain) loadLastState() error {
blockTd = bc.GetTd(headBlock.Hash(), headBlock.NumberU64())
)
if headHeader.Hash() != headBlock.Hash() {
log.Info("Loaded most recent local header", "number", headHeader.Number, "hash", headHeader.Hash(), "td", headerTd, "age", common.PrettyAge(time.Unix(int64(headHeader.Time), 0)))
log.Info("Loaded most recent local header", "number", headHeader.Number, "hash", headHeader.Hash(), "hash", headHeader.Root, "td", headerTd, "age", common.PrettyAge(time.Unix(int64(headHeader.Time), 0)))
}
log.Info("Loaded most recent local block", "number", headBlock.Number(), "hash", headBlock.Hash(), "td", blockTd, "age", common.PrettyAge(time.Unix(int64(headBlock.Time()), 0)))
log.Info("Loaded most recent local block", "number", headBlock.Number(), "hash", headBlock.Hash(), "root", headBlock.Root(), "td", blockTd, "age", common.PrettyAge(time.Unix(int64(headBlock.Time()), 0)))
if headBlock.Hash() != currentSnapBlock.Hash() {
snapTd := bc.GetTd(currentSnapBlock.Hash(), currentSnapBlock.Number.Uint64())
log.Info("Loaded most recent local snap block", "number", currentSnapBlock.Number, "hash", currentSnapBlock.Hash(), "td", snapTd, "age", common.PrettyAge(time.Unix(int64(currentSnapBlock.Time), 0)))
log.Info("Loaded most recent local snap block", "number", currentSnapBlock.Number, "hash", currentSnapBlock.Hash(), "root", currentSnapBlock.Root, "td", snapTd, "age", common.PrettyAge(time.Unix(int64(currentSnapBlock.Time), 0)))
}
if posa, ok := bc.engine.(consensus.PoSA); ok {
if currentFinalizedHeader := posa.GetFinalizedHeader(bc, headHeader); currentFinalizedHeader != nil {
if currentFinalizedBlock := bc.GetBlockByHash(currentFinalizedHeader.Hash()); currentFinalizedBlock != nil {
finalTd := bc.GetTd(currentFinalizedBlock.Hash(), currentFinalizedBlock.NumberU64())
log.Info("Loaded most recent local finalized block", "number", currentFinalizedBlock.Number(), "hash", currentFinalizedBlock.Hash(), "td", finalTd, "age", common.PrettyAge(time.Unix(int64(currentFinalizedBlock.Time()), 0)))
log.Info("Loaded most recent local finalized block", "number", currentFinalizedBlock.Number(), "hash", currentFinalizedBlock.Hash(), "root", currentFinalizedBlock.Root(), "td", finalTd, "age", common.PrettyAge(time.Unix(int64(currentFinalizedBlock.Time()), 0)))
}
}
}
Expand Down Expand Up @@ -802,6 +808,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha

// resetState resets the persistent state to genesis if it's not available.
resetState := func() {
log.Info("Reset to block with genesis state", "number", bc.genesisBlock.NumberU64(), "hash", bc.genesisBlock.Hash())
// Short circuit if the genesis state is already present.
if bc.HasState(bc.genesisBlock.Root()) {
return
Expand All @@ -825,7 +832,6 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
// chain reparation mechanism without deleting any data!
if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() <= currentBlock.Number.Uint64() {
newHeadBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
lastBlockNum := header.Number.Uint64()
if newHeadBlock == nil {
log.Error("Gap in the chain, rewinding to genesis", "number", header.Number, "hash", header.Hash())
newHeadBlock = bc.genesisBlock
Expand All @@ -835,10 +841,8 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
// keeping rewinding until we exceed the optional threshold
// root hash
beyondRoot := (root == common.Hash{}) // Flag whether we're beyond the requested root (no root, always true)
enoughBeyondCount := false
beyondCount := 0

for {
beyondCount++
// If a root threshold was requested but not yet crossed, check
if root != (common.Hash{}) && !beyondRoot && newHeadBlock.Root() == root {
beyondRoot, rootNumber = true, newHeadBlock.NumberU64()
Expand All @@ -854,24 +858,11 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
log.Error("Missing block in the middle, aiming genesis", "number", newHeadBlock.NumberU64()-1, "hash", newHeadBlock.ParentHash())
newHeadBlock = bc.genesisBlock
} else {
log.Trace("Rewind passed pivot, aiming genesis", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "pivot", *pivot)
log.Info("Rewind passed pivot, aiming genesis", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "pivot", *pivot)
newHeadBlock = bc.genesisBlock
}
}
if beyondRoot || (enoughBeyondCount && root != common.Hash{}) || newHeadBlock.NumberU64() == 0 {
if enoughBeyondCount && (root != common.Hash{}) && rootNumber == 0 {
for {
lastBlockNum++
block := bc.GetBlockByNumber(lastBlockNum)
if block == nil {
break
}
if block.Root() == root {
rootNumber = block.NumberU64()
break
}
}
}
if beyondRoot || newHeadBlock.NumberU64() == 0 {
if newHeadBlock.NumberU64() == 0 {
resetState()
} else if !bc.HasState(newHeadBlock.Root()) {
Expand Down Expand Up @@ -1215,7 +1206,7 @@ func (bc *BlockChain) Stop() {
for !bc.triegc.Empty() {
triedb.Dereference(bc.triegc.PopItem())
}
if size, _ := triedb.Size(); size != 0 {
if _, size, _, _ := triedb.Size(); size != 0 {
log.Error("Dangling trie nodes after full cleanup")
}
}
Expand Down Expand Up @@ -1619,8 +1610,8 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
if current := block.NumberU64(); current > bc.triesInMemory {
// If we exceeded our memory allowance, flush matured singleton nodes to disk
var (
nodes, imgs = triedb.Size()
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
_, nodes, _, imgs = triedb.Size()
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
)
if nodes > limit || imgs > 4*1024*1024 {
triedb.Cap(limit - ethdb.IdealBatchSize)
Expand Down Expand Up @@ -2104,8 +2095,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
stats.processed++
stats.usedGas += usedGas

dirty, _ := bc.triedb.Size()
stats.report(chain, it.index, dirty, setHead)
trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ := bc.triedb.Size()
stats.report(chain, it.index, trieDiffNodes, trieBufNodes, trieImmutableBufNodes, setHead)

if !setHead {
// After merge we expect few side chains. Simply count
Expand Down
10 changes: 8 additions & 2 deletions core/blockchain_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const statsReportLimit = 8 * time.Second

// report prints statistics if some number of blocks have been processed
// or more than a few seconds have passed since the last message.
func (st *insertStats) report(chain []*types.Block, index int, dirty common.StorageSize, setHead bool) {
func (st *insertStats) report(chain []*types.Block, index int, trieDiffNodes, trieBufNodes, trieImmutableBufNodes common.StorageSize, setHead bool) {
// Fetch the timings for the batch
var (
now = mclock.Now()
Expand All @@ -63,7 +63,13 @@ func (st *insertStats) report(chain []*types.Block, index int, dirty common.Stor
if timestamp := time.Unix(int64(end.Time()), 0); time.Since(timestamp) > time.Minute {
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
}
context = append(context, []interface{}{"dirty", dirty}...)
if trieDiffNodes != 0 { // pathdb
context = append(context, []interface{}{"triediffs", trieDiffNodes}...)
context = append(context, []interface{}{"triedirty", trieBufNodes}...)
context = append(context, []interface{}{"trieimutabledirty", trieImmutableBufNodes}...)
} else {
context = append(context, []interface{}{"triedirty", trieBufNodes}...)
}

if st.queued > 0 {
context = append(context, []interface{}{"queued", st.queued}...)
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1807,7 +1807,7 @@ func TestTrieForkGC(t *testing.T) {
chain.stateCache.TrieDB().Dereference(blocks[len(blocks)-1-i].Root())
chain.stateCache.TrieDB().Dereference(forks[len(blocks)-1-i].Root())
}
if nodes, _ := chain.TrieDB().Size(); nodes > 0 {
if _, nodes, _, _ := chain.TrieDB().Size(); nodes > 0 {
t.Fatalf("stale tries still alive after garbase collection")
}
}
Expand Down
3 changes: 2 additions & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
// dirty cache to the clean cache.
if config.StateScheme == rawdb.PathScheme && config.TrieDirtyCache > pathdb.MaxDirtyBufferSize/1024/1024 {
log.Info("Capped dirty cache size", "provided", common.StorageSize(config.TrieDirtyCache)*1024*1024, "adjusted", common.StorageSize(pathdb.MaxDirtyBufferSize))
config.TrieCleanCache += config.TrieDirtyCache - pathdb.MaxDirtyBufferSize/1024/1024
log.Info("Clean cache size", "provided", common.StorageSize(config.TrieCleanCache)*1024*1024)
config.TrieDirtyCache = pathdb.MaxDirtyBufferSize / 1024 / 1024
}
log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024)
Expand Down Expand Up @@ -225,6 +225,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
Preimages: config.Preimages,
StateHistory: config.StateHistory,
StateScheme: config.StateScheme,
PathSyncFlush: config.PathSyncFlush,
}
)
bcOps := make([]core.BlockChainOption, 0)
Expand Down
1 change: 1 addition & 0 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ type Config struct {
TransactionHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved.
StateHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose state histories are reserved.
StateScheme string `toml:",omitempty"` // State scheme used to store ethereum state and merkle trie nodes on top
PathSyncFlush bool `toml:",omitempty"` // State scheme used to store ethereum state and merkle trie nodes on top

// RequiredBlocks is a set of block number -> hash mappings which must be in the
// canonical chain of all remote peers. Setting the option makes geth verify the
Expand Down
9 changes: 5 additions & 4 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/trie/triedb/pathdb"
)

const (
Expand Down Expand Up @@ -979,7 +978,9 @@ func (h *handler) voteBroadcastLoop() {
// sync is finished.
func (h *handler) enableSyncedFeatures() {
h.acceptTxs.Store(true)
if h.chain.TrieDB().Scheme() == rawdb.PathScheme {
h.chain.TrieDB().SetBufferSize(pathdb.DefaultDirtyBufferSize)
}
// In the bsc scenario, pathdb.MaxDirtyBufferSize (256MB) will be used.
// The performance is better than DefaultDirtyBufferSize (64MB).
//if h.chain.TrieDB().Scheme() == rawdb.PathScheme {
// h.chain.TrieDB().SetBufferSize(pathdb.DefaultDirtyBufferSize)
//}
}
4 changes: 2 additions & 2 deletions eth/state_accessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ func (eth *Ethereum) hashState(ctx context.Context, block *types.Block, reexec u
parent = root
}
if report {
nodes, imgs := triedb.Size()
log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "preimages", imgs)
diff, nodes, immutablenodes, imgs := triedb.Size()
log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "layer", diff, "nodes", nodes, "immutablenodes", immutablenodes, "preimages", imgs)
}
return statedb, func() { triedb.Dereference(block.Root()) }, nil
}
Expand Down
4 changes: 2 additions & 2 deletions eth/tracers/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,8 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
// if the relevant state is available in disk.
var preferDisk bool
if statedb != nil {
s1, s2 := statedb.Database().TrieDB().Size()
preferDisk = s1+s2 > defaultTracechainMemLimit
s1, s2, s3, s4 := statedb.Database().TrieDB().Size()
preferDisk = s1+s2+s3+s4 > defaultTracechainMemLimit
}
statedb, release, err = api.backend.StateAtBlock(ctx, block, reexec, statedb, false, preferDisk)
if err != nil {
Expand Down
23 changes: 17 additions & 6 deletions trie/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type backend interface {

// Size returns the current storage size of the memory cache in front of the
// persistent database layer.
Size() common.StorageSize
Size() (common.StorageSize, common.StorageSize, common.StorageSize)

// Update performs a state transition by committing dirty nodes contained
// in the given set in order to update state from the specified parent to
Expand Down Expand Up @@ -207,16 +207,16 @@ func (db *Database) Commit(root common.Hash, report bool) error {

// Size returns the storage size of dirty trie nodes in front of the persistent
// database and the size of cached preimages.
func (db *Database) Size() (common.StorageSize, common.StorageSize) {
func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize, common.StorageSize) {
var (
storages common.StorageSize
preimages common.StorageSize
diffs, nodes, immutablenodes common.StorageSize
preimages common.StorageSize
)
storages = db.backend.Size()
diffs, nodes, immutablenodes = db.backend.Size()
if db.preimages != nil {
preimages = db.preimages.size()
}
return storages, preimages
return diffs, nodes, immutablenodes, preimages
}

// Initialized returns an indicator if the state data is already initialized
Expand Down Expand Up @@ -353,3 +353,14 @@ func (db *Database) SetBufferSize(size int) error {
}
return pdb.SetBufferSize(size)
}

// Head return the top non-fork difflayer/disklayer root hash for rewinding.
// It's only supported by path-based database and will return an error for
// others.
func (db *Database) Head() common.Hash {
pdb, ok := db.backend.(*pathdb.Database)
if !ok {
return common.Hash{}
}
return pdb.Head()
}
4 changes: 2 additions & 2 deletions trie/triedb/hashdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,15 +641,15 @@ func (db *Database) Update(root common.Hash, parent common.Hash, block uint64, n

// Size returns the current storage size of the memory cache in front of the
// persistent database layer.
func (db *Database) Size() common.StorageSize {
func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize) {
db.lock.RLock()
defer db.lock.RUnlock()

// db.dirtiesSize only contains the useful data in the cache, but when reporting
// the total memory consumption, the maintenance metadata is also needed to be
// counted.
var metadataSize = common.StorageSize(len(db.dirties) * cachedNodeSize)
return db.dirtiesSize + db.childrenSize + metadataSize
return 0, db.dirtiesSize + db.childrenSize + metadataSize, 0
}

// Close closes the trie database and releases all held resources.
Expand Down
Loading

0 comments on commit 53559fc

Please sign in to comment.