Skip to content

Commit

Permalink
core: add delete journal
Browse files Browse the repository at this point in the history
  • Loading branch information
jingjunLi committed Mar 26, 2024
1 parent a35bb88 commit 426ee1f
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 28 deletions.
2 changes: 2 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ type CacheConfig struct {
StateHistory uint64 // Number of blocks from head whose state histories are reserved.
StateScheme string // Scheme used to store ethereum states and merkle tree nodes on top
PathSyncFlush bool // Whether sync flush the trienodebuffer of pathdb to disk.
JournalPath string

SnapshotNoBuild bool // Whether the background generation is allowed
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
Expand All @@ -188,6 +189,7 @@ func (c *CacheConfig) triedbConfig() *triedb.Config {
StateHistory: c.StateHistory,
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
DirtyCacheSize: c.TrieDirtyLimit * 1024 * 1024,
JournalPath: c.JournalPath,
}
}
return config
Expand Down
12 changes: 9 additions & 3 deletions core/rawdb/accessors_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package rawdb

import (
"encoding/binary"
"os"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
Expand Down Expand Up @@ -154,9 +155,14 @@ func WriteTrieJournal(db ethdb.KeyValueWriter, journal []byte) {

// DeleteTrieJournal deletes the serialized in-memory trie nodes of layers saved at
// the last shutdown.
func DeleteTrieJournal(db ethdb.KeyValueWriter) {
if err := db.Delete(trieJournalKey); err != nil {
log.Crit("Failed to remove tries journal", "err", err)
// DeleteTrieJournal deletes the journal file holding the serialized
// in-memory trie nodes of layers saved at the last shutdown. A missing
// file is treated as a no-op; any other removal failure is fatal.
func DeleteTrieJournal(path string) {
	// Nothing to do if the journal was never written (or already deleted).
	if _, err := os.Stat(path); os.IsNotExist(err) {
		return
	}
	// Log the removal error itself, not the (nil) stat error.
	if err := os.Remove(path); err != nil {
		log.Crit("Failed to remove tries journal", "err", err)
	}
}

Expand Down
3 changes: 3 additions & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"math/big"
"path/filepath"
"runtime"
"sync"

Expand Down Expand Up @@ -254,6 +255,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
vmConfig = vm.Config{
EnablePreimageRecording: config.EnablePreimageRecording,
}
journalPath = filepath.Join(stack.ResolvePath("chaindata"), "ancient/state")
cacheConfig = &core.CacheConfig{
TrieCleanLimit: config.TrieCleanCache,
TrieCleanNoPrefetch: config.NoPrefetch,
Expand All @@ -267,6 +269,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
StateHistory: config.StateHistory,
StateScheme: config.StateScheme,
PathSyncFlush: config.PathSyncFlush,
JournalPath: journalPath,
}
)
bcOps := make([]core.BlockChainOption, 0)
Expand Down
7 changes: 5 additions & 2 deletions triedb/pathdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ const (
// DefaultBatchRedundancyRate defines the batch size, compatible write
// size calculation is inaccurate
DefaultBatchRedundancyRate = 1.1

JournalFile = "state.journal"
)

// layer is the interface implemented by all state layers which includes some
Expand Down Expand Up @@ -101,6 +103,7 @@ type Config struct {
CleanCacheSize int // Maximum memory allowance (in bytes) for caching clean nodes
DirtyCacheSize int // Maximum memory allowance (in bytes) for caching dirty nodes
ReadOnly bool // Flag whether the database is opened in read only mode.
JournalPath string // Path of the journal
}

// sanitize checks the provided user configurations and changes anything that's
Expand Down Expand Up @@ -315,7 +318,7 @@ func (db *Database) Enable(root common.Hash) error {
// Drop the stale state journal in persistent database and
// reset the persistent state id back to zero.
batch := db.diskdb.NewBatch()
rawdb.DeleteTrieJournal(batch)
rawdb.DeleteTrieJournal(db.config.JournalPath + JournalFile)
rawdb.WritePersistentStateID(batch, 0)
if err := batch.Write(); err != nil {
return err
Expand Down Expand Up @@ -379,7 +382,7 @@ func (db *Database) Recover(root common.Hash, loader triestate.TrieLoader) error
// disk layer won't be accessible from outside.
db.tree.reset(dl)
}
rawdb.DeleteTrieJournal(db.diskdb)
rawdb.DeleteTrieJournal(db.config.JournalPath + JournalFile)
_, err := truncateFromHead(db.diskdb, db.freezer, dl.stateID())
if err != nil {
return err
Expand Down
54 changes: 31 additions & 23 deletions triedb/pathdb/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,16 @@ type journalStorage struct {

// loadJournal tries to parse the layer journal from the disk.
func (db *Database) loadJournal(diskRoot common.Hash) (layer, error) {
input, err := os.Open("state.journal")
fd, err := os.Open(db.config.JournalPath + JournalFile)
if errors.Is(err, fs.ErrNotExist) {
// Skip the parsing if the journal file doesn't exist at all
return nil, fmt.Errorf("state journal not exist")
return nil, errMissJournal
}
if err != nil {
return nil, err
}
defer input.Close()
defer fd.Close()

r := rlp.NewStream(input, 0)
r := rlp.NewStream(fd, 0)

// Firstly, resolve the first element as the journal version
version, err := r.Uint64()
Expand Down Expand Up @@ -117,7 +116,7 @@ func (db *Database) loadJournal(diskRoot common.Hash) (layer, error) {
if err != nil {
return nil, err
}
log.Debug("Loaded layer journal", "diskroot", diskRoot, "diffhead", head.rootHash())
log.Info("Loaded layer journal", "diskroot", diskRoot, "diffhead", head.rootHash())
return head, nil
}

Expand Down Expand Up @@ -149,17 +148,15 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
var root common.Hash
var length uint64
if err := r.Decode(&length); err != nil {
return nil, fmt.Errorf("load disk length xxx: %v", err)
return nil, fmt.Errorf("load disk length: %v", err)
}

var journalBuf []byte
if err := r.Decode(&journalBuf); err != nil {
return nil, fmt.Errorf("load disk journal: %v", err)
}
log.Info("success journalBuf", "journalBuf", len(journalBuf), "len", length)

rBuf := rlp.NewStream(bytes.NewReader(journalBuf), 1024*1024)

if err := rBuf.Decode(&root); err != nil {
return nil, fmt.Errorf("load disk root: %v", err)
}
Expand Down Expand Up @@ -192,14 +189,14 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
nodes[entry.Owner] = subset
}

var shasum [32]byte
if err := r.Decode(&shasum); err != nil {
var shaSum [32]byte
if err := r.Decode(&shaSum); err != nil {
return nil, fmt.Errorf("load shasum: %v", err)
}

expectSum := sha256.Sum256(journalBuf)
if shasum != expectSum {
return nil, fmt.Errorf("expect shasum: %v, real:%v", expectSum, shasum)
if shaSum != expectSum {
return nil, fmt.Errorf("expect shaSum: %v, real:%v", expectSum, shaSum)
}

// Calculate the internal state transitions by id difference.
Expand All @@ -214,13 +211,17 @@ func (db *Database) loadDiffLayer(parent layer, r *rlp.Stream) (layer, error) {
var root common.Hash
var length uint64
if err := r.Decode(&length); err != nil {
return nil, fmt.Errorf("load disk length 222 : %v", err)
// The first read may fail with EOF, marking the end of the journal
if err == io.EOF {
return parent, nil
}
return nil, fmt.Errorf("load disk length : %v", err)
}
log.Info("Loaded diff layer journal", "parent", parent)
var journalBuf []byte
if err := r.Decode(&journalBuf); err != nil {
return nil, fmt.Errorf("load disk journal buffer: %v", err)
}
log.Info("success journalBuf", "journalBuf", len(journalBuf), "len", length)

rBuf := rlp.NewStream(bytes.NewReader(journalBuf), 1024*1024)

Expand Down Expand Up @@ -283,14 +284,14 @@ func (db *Database) loadDiffLayer(parent layer, r *rlp.Stream) (layer, error) {
}
storages[entry.Account] = set
}
var shasum [32]byte
if err := r.Decode(&shasum); err != nil {
var shaSum [32]byte
if err := r.Decode(&shaSum); err != nil {
return nil, fmt.Errorf("load shasum: %v", err)
}

expectSum := sha256.Sum256(journalBuf)
if shasum != expectSum {
return nil, fmt.Errorf("expect shasum: %v, real:%v", expectSum, shasum)
if shaSum != expectSum {
return nil, fmt.Errorf("expect shaSum: %v, real:%v", expectSum, shaSum)
}
return db.loadDiffLayer(newDiffLayer(parent, root, parent.stateID()+1, block, nodes, triestate.New(accounts, storages, incomplete)), r)
}
Expand Down Expand Up @@ -340,7 +341,7 @@ func (dl *diskLayer) journal(w io.Writer) error {
if err := rlp.Encode(w, shasum); err != nil {
return err
}
log.Debug("Journaled pathdb disk layer", "root", dl.root, "nodes", len(bufferNodes))
log.Info("Journaled pathdb disk layer", "root", dl.root, "nodes", len(bufferNodes))
return nil
}

Expand Down Expand Up @@ -410,7 +411,7 @@ func (dl *diffLayer) journal(w io.Writer) error {
if err := rlp.Encode(w, shasum); err != nil {
return err
}
log.Debug("Journaled pathdb diff layer", "root", dl.root, "parent", dl.parent.rootHash(), "id", dl.stateID(), "block", dl.block, "nodes", len(dl.nodes))
log.Info("Journaled pathdb diff layer", "root", dl.root, "parent", dl.parent.rootHash(), "id", dl.stateID(), "block", dl.block, "nodes", len(dl.nodes))
return nil
}

Expand Down Expand Up @@ -443,8 +444,15 @@ func (db *Database) Journal(root common.Hash) error {
return errDatabaseReadOnly
}
// Firstly write out the metadata of journal
//journal := new(bytes.Buffer)
journal, err := os.OpenFile("state.journal", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
journalFilePath := db.config.JournalPath + JournalFile

if _, err := os.Stat(journalFilePath); err == nil {
if err = os.Remove(journalFilePath); err != nil {
fmt.Println("Error removing existing file:", err)
return err
}
}
journal, err := os.OpenFile(journalFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return err
}
Expand Down

0 comments on commit 426ee1f

Please sign in to comment.