Skip to content

Commit

Permalink
implement block process part of light sync
Browse files Browse the repository at this point in the history
  • Loading branch information
unclezoro committed Aug 23, 2021
1 parent 03f7b31 commit 41b8931
Show file tree
Hide file tree
Showing 23 changed files with 697 additions and 80 deletions.
4 changes: 2 additions & 2 deletions cmd/evm/internal/t8ntool/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (pre *Prestate) Apply(vmConfig vm.Config, chainConfig *params.ChainConfig,
statedb.AddBalance(pre.Env.Coinbase, minerReward)
}
// Commit block
root, err := statedb.Commit(chainConfig.IsEIP158(vmContext.BlockNumber))
root, _, err := statedb.Commit(chainConfig.IsEIP158(vmContext.BlockNumber))
if err != nil {
fmt.Fprintf(os.Stderr, "Could not commit state: %v", err)
return nil, nil, NewError(ErrorEVM, fmt.Errorf("could not commit state: %v", err))
Expand Down Expand Up @@ -252,7 +252,7 @@ func MakePreState(db ethdb.Database, accounts core.GenesisAlloc) *state.StateDB
}
}
// Commit and re-open to start with a clean state.
root, _ := statedb.Commit(false)
root, _, _ := statedb.Commit(false)
statedb, _ = state.New(root, sdb, nil)
return statedb
}
Expand Down
15 changes: 14 additions & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ var (
Name: "datadir.ancient",
Usage: "Data directory for ancient chain segments (default = inside chaindata)",
}
DiffFlag = DirectoryFlag{
Name: "datadir.diff",
Usage: "Data directory for difflayer segments (default = inside chaindata)",
}
MinFreeDiskSpaceFlag = DirectoryFlag{
Name: "datadir.minfreedisk",
Usage: "Minimum free disk space in MB, once reached triggers auto shut down (default = --cache.gc converted to MB, 0 = disabled)",
Expand Down Expand Up @@ -425,6 +429,10 @@ var (
Name: "cache.preimages",
Usage: "Enable recording the SHA3/keccak preimages of trie keys",
}
PersistDiffFlag = cli.BoolFlag{
Name: "persistdiff",
Usage: "Enable persisting the diff layer",
}
// Miner settings
MiningEnabledFlag = cli.BoolFlag{
Name: "mine",
Expand Down Expand Up @@ -1564,7 +1572,12 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.GlobalIsSet(AncientFlag.Name) {
cfg.DatabaseFreezer = ctx.GlobalString(AncientFlag.Name)
}

if ctx.GlobalIsSet(DiffFlag.Name) {
cfg.DatabaseDiff = ctx.GlobalString(DiffFlag.Name)
}
if ctx.GlobalIsSet(PersistDiffFlag.Name) {
cfg.PersistDiff = ctx.GlobalBool(PersistDiffFlag.Name)
}
if gcmode := ctx.GlobalString(GCModeFlag.Name); gcmode != "full" && gcmode != "archive" {
Fatalf("--%s must be either 'full' or 'archive'", GCModeFlag.Name)
}
Expand Down
122 changes: 111 additions & 11 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,16 @@ var (
const (
bodyCacheLimit = 256
blockCacheLimit = 256
diffLayerCacheLimit = 1024
receiptsCacheLimit = 10000
txLookupCacheLimit = 1024
maxFutureBlocks = 256
maxTimeFutureBlocks = 30
badBlockLimit = 10
maxBeyondBlocks = 2048

diffLayerfreezerRecheckInterval = 3 * time.Second
diffLayerfreezerBlockLimit = 864000 // The number of blocks that should be kept in disk.

// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
//
// Changelog:
Expand Down Expand Up @@ -188,13 +191,15 @@ type BlockChain struct {
currentBlock atomic.Value // Current head of the block chain
currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!)

stateCache state.Database // State database to reuse between imports (contains state cache)
bodyCache *lru.Cache // Cache for the most recent block bodies
bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format
receiptsCache *lru.Cache // Cache for the most recent receipts per block
blockCache *lru.Cache // Cache for the most recent entire blocks
txLookupCache *lru.Cache // Cache for the most recent transaction lookup data.
futureBlocks *lru.Cache // future blocks are blocks added for later processing
stateCache state.Database // State database to reuse between imports (contains state cache)
bodyCache *lru.Cache // Cache for the most recent block bodies
bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format
receiptsCache *lru.Cache // Cache for the most recent receipts per block
blockCache *lru.Cache // Cache for the most recent entire blocks
txLookupCache *lru.Cache // Cache for the most recent transaction lookup data.
futureBlocks *lru.Cache // future blocks are blocks added for later processing
diffLayerCache *lru.Cache // Cache for the diffLayers
diffQueue *prque.Prque // A Priority queue to store recent diff layer

quit chan struct{} // blockchain quit channel
wg sync.WaitGroup // chain processing wait group for shutting down
Expand Down Expand Up @@ -226,6 +231,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
blockCache, _ := lru.New(blockCacheLimit)
txLookupCache, _ := lru.New(txLookupCacheLimit)
futureBlocks, _ := lru.New(maxFutureBlocks)
diffLayerCache, _ := lru.New(diffLayerCacheLimit)

bc := &BlockChain{
chainConfig: chainConfig,
Expand All @@ -244,10 +250,12 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
bodyRLPCache: bodyRLPCache,
receiptsCache: receiptsCache,
blockCache: blockCache,
diffLayerCache: diffLayerCache,
txLookupCache: txLookupCache,
futureBlocks: futureBlocks,
engine: engine,
vmConfig: vmConfig,
diffQueue: prque.New(nil),
}
bc.validator = NewBlockValidator(chainConfig, bc, engine)
bc.processor = NewStateProcessor(chainConfig, bc, engine)
Expand Down Expand Up @@ -396,6 +404,10 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
triedb.SaveCachePeriodically(bc.cacheConfig.TrieCleanJournal, bc.cacheConfig.TrieCleanRejournal, bc.quit)
}()
}
// Need persist and prune diff layer
if bc.db.DiffStore() != nil {
go bc.diffLayerFreeze()
}
return bc, nil
}

Expand All @@ -408,6 +420,14 @@ func (bc *BlockChain) CacheReceipts(hash common.Hash, receipts types.Receipts) {
bc.receiptsCache.Add(hash, receipts)
}

func (bc *BlockChain) CacheDiffLayer(hash common.Hash, num uint64, diffLayer *types.DiffLayer) {
bc.diffLayerCache.Add(hash, diffLayer)
if bc.db.DiffStore() != nil {
// push to priority queue before persisting
bc.diffQueue.Push(diffLayer, -(int64(num)))
}
}

func (bc *BlockChain) CacheBlock(hash common.Hash, block *types.Block) {
bc.blockCache.Add(hash, block)
}
Expand Down Expand Up @@ -1506,10 +1526,19 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
wg.Done()
}()
// Commit all cached state changes into underlying memory database.
root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number()))
root, diffLayer, err := state.Commit(bc.chainConfig.IsEIP158(block.Number()))
if err != nil {
return NonStatTy, err
}

// Ensure no empty block body
if diffLayer != nil && block.Header().TxHash != types.EmptyRootHash {
// Filling necessary field
diffLayer.Receipts = receipts
diffLayer.StateRoot = root
diffLayer.Hash = block.Hash()
bc.CacheDiffLayer(diffLayer.Hash, block.Number().Uint64(), diffLayer)
}
triedb := bc.stateCache.TrieDB()

// If we're running an archive node, always flush
Expand Down Expand Up @@ -1895,8 +1924,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
bc.reportBlock(block, receipts, err)
return it.index, err
}
bc.CacheReceipts(block.Hash(), receipts)
bc.CacheBlock(block.Hash(), block)
// Update the metrics touched during block processing
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them
storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them
Expand All @@ -1916,6 +1943,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
log.Error("validate state failed", "error", err)
return it.index, err
}
bc.CacheReceipts(block.Hash(), receipts)
bc.CacheBlock(block.Hash(), block)
proctime := time.Since(start)

// Update the metrics touched during block validation
Expand Down Expand Up @@ -2292,6 +2321,77 @@ func (bc *BlockChain) update() {
}
}

func (bc *BlockChain) diffLayerFreeze() {
recheck := time.Tick(diffLayerfreezerRecheckInterval)
for {
select {
case <-bc.quit:
// Persist all diffLayers when shutdown, it will introduce redundant storage, but it is acceptable.
// If the client been ungracefully shutdown, it will missing all cached diff layers, it is acceptable as well.
var batch ethdb.Batch
for !bc.diffQueue.Empty() {
diff, _ := bc.diffQueue.Pop()
diffLayer := diff.(*types.DiffLayer)
if batch == nil {
batch = bc.db.DiffStore().NewBatch()
}
rawdb.WriteDiffLayer(batch, diffLayer.Hash, diffLayer)
if batch.ValueSize() > ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
log.Error("Failed to write diff layer", "err", err)
return
}
batch.Reset()
}
}
if batch != nil {
if err := batch.Write(); err != nil {
log.Error("Failed to write diff layer", "err", err)
return
}
batch.Reset()
}
return
case <-recheck:
currentHeight := bc.CurrentBlock().NumberU64()
var batch ethdb.Batch
for !bc.diffQueue.Empty() {
diff, prio := bc.diffQueue.Pop()
diffLayer := diff.(*types.DiffLayer)

// if the block old enough
if int64(currentHeight)+prio > int64(bc.triesInMemory) {
canonicalHash := bc.GetCanonicalHash(uint64(-prio))
// on the canonical chain
if canonicalHash == diffLayer.Hash {
if batch == nil {
batch = bc.db.DiffStore().NewBatch()
}
rawdb.WriteDiffLayer(batch, diffLayer.Hash, diffLayer)
staleHash := bc.GetCanonicalHash(uint64(-prio) - diffLayerfreezerBlockLimit)
rawdb.DeleteDiffLayer(batch, staleHash)
}
} else {
bc.diffQueue.Push(diffLayer, prio)
break
}
if batch != nil && batch.ValueSize() > ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
panic(fmt.Sprintf("Failed to write diff layer, error %v", err))
}
batch.Reset()
}
}
if batch != nil {
if err := batch.Write(); err != nil {
panic(fmt.Sprintf("Failed to write diff layer, error %v", err))
}
batch.Reset()
}
}
}
}

// maintainTxIndex is responsible for the construction and deletion of the
// transaction index.
//
Expand Down
2 changes: 1 addition & 1 deletion core/chain_makers.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse
block, _, _ := b.engine.FinalizeAndAssemble(chainreader, b.header, statedb, b.txs, b.uncles, b.receipts)

// Write state changes to db
root, err := statedb.Commit(config.IsEIP158(b.header.Number))
root, _, err := statedb.Commit(config.IsEIP158(b.header.Number))
if err != nil {
panic(fmt.Sprintf("state write error: %v", err))
}
Expand Down
38 changes: 38 additions & 0 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,44 @@ func WriteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64, body *t
WriteBodyRLP(db, hash, number, data)
}

func WriteDiffLayer(db ethdb.KeyValueWriter, hash common.Hash, layer *types.DiffLayer) {
data, err := rlp.EncodeToBytes(layer)
if err != nil {
log.Crit("Failed to RLP encode diff layer", "err", err)
}
WriteDiffLayerRLP(db, hash, data)
}

func WriteDiffLayerRLP(db ethdb.KeyValueWriter, hash common.Hash, rlp rlp.RawValue) {
if err := db.Put(diffLayerKey(hash), rlp); err != nil {
log.Crit("Failed to store block body", "err", err)
}
}

func ReadDiffLayer(db ethdb.Reader, hash common.Hash) *types.DiffLayer {
data := ReadDiffLayerRLP(db, hash)
if len(data) == 0 {
return nil
}
diff := new(types.DiffLayer)
if err := rlp.Decode(bytes.NewReader(data), diff); err != nil {
log.Error("Invalid diff layer RLP", "hash", hash, "err", err)
return nil
}
return diff
}

func ReadDiffLayerRLP(db ethdb.Reader, hash common.Hash) rlp.RawValue {
data, _ := db.Get(diffLayerKey(hash))
return data
}

func DeleteDiffLayer(db ethdb.KeyValueWriter, hash common.Hash) {
if err := db.Delete(diffLayerKey(hash)); err != nil {
log.Crit("Failed to delete diffLayer", "err", err)
}
}

// DeleteBody removes all block body data associated with a hash.
func DeleteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
if err := db.Delete(blockBodyKey(number, hash)); err != nil {
Expand Down
25 changes: 25 additions & 0 deletions core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
type freezerdb struct {
ethdb.KeyValueStore
ethdb.AncientStore
diffStore ethdb.KeyValueStore
}

// Close implements io.Closer, closing both the fast key-value store as well as
Expand All @@ -48,12 +49,28 @@ func (frdb *freezerdb) Close() error {
if err := frdb.KeyValueStore.Close(); err != nil {
errs = append(errs, err)
}
if frdb.diffStore != nil {
if err := frdb.diffStore.Close(); err != nil {
errs = append(errs, err)
}
}
if len(errs) != 0 {
return fmt.Errorf("%v", errs)
}
return nil
}

func (frdb *freezerdb) DiffStore() ethdb.KeyValueStore {
return frdb.diffStore
}

func (frdb *freezerdb) SetDiffStore(diff ethdb.KeyValueStore) {
if frdb.diffStore != nil {
frdb.diffStore.Close()
}
frdb.diffStore = diff
}

// Freeze is a helper method used for external testing to trigger and block until
// a freeze cycle completes, without having to sleep for a minute to trigger the
// automatic background run.
Expand Down Expand Up @@ -114,6 +131,14 @@ func (db *nofreezedb) Sync() error {
return errNotSupported
}

func (db *nofreezedb) DiffStore() ethdb.KeyValueStore {
return nil
}

func (db *nofreezedb) SetDiffStore(diff ethdb.KeyValueStore) {
panic("not implement")
}

// NewDatabase creates a high level database on top of a given key-value data
// store without a freezer moving immutable chain segments into cold storage.
func NewDatabase(db ethdb.KeyValueStore) ethdb.Database {
Expand Down
8 changes: 8 additions & 0 deletions core/rawdb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ var (
SnapshotStoragePrefix = []byte("o") // SnapshotStoragePrefix + account hash + storage hash -> storage trie value
CodePrefix = []byte("c") // CodePrefix + code hash -> account code

// difflayer database
diffLayerPrefix = []byte("d") // diffLayerPrefix + hash -> diffLayer

preimagePrefix = []byte("secure-key-") // preimagePrefix + hash -> preimage
configPrefix = []byte("ethereum-config-") // config prefix for the db

Expand Down Expand Up @@ -177,6 +180,11 @@ func blockReceiptsKey(number uint64, hash common.Hash) []byte {
return append(append(blockReceiptsPrefix, encodeBlockNumber(number)...), hash.Bytes()...)
}

// diffLayerKey = diffLayerKeyPrefix + hash
func diffLayerKey(hash common.Hash) []byte {
return append(append(diffLayerPrefix, hash.Bytes()...))
}

// txLookupKey = txLookupPrefix + hash
func txLookupKey(hash common.Hash) []byte {
return append(txLookupPrefix, hash.Bytes()...)
Expand Down
Loading

0 comments on commit 41b8931

Please sign in to comment.