Skip to content

Commit

Permalink
core: uncomment TestCorruptedJournal test
Browse files Browse the repository at this point in the history
core: fix some comment

core: rename some variable
  • Loading branch information
jingjunLi committed Apr 9, 2024
1 parent 3a37e96 commit 4e89289
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 102 deletions.
2 changes: 1 addition & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ var (
}
JournalFileFlag = &cli.BoolFlag{
Name: "journalfile",
Usage: "Enable the in-memory trie node layers to store to wal file when shutdown in pbss (default = false)",
Usage: "Enable journal file to store the TrieJournal when shutdown in pbss (default = false)",
Value: false,
Category: flags.StateCategory,
}
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ type CacheConfig struct {
StateHistory uint64 // Number of blocks from head whose state histories are reserved.
StateScheme string // Scheme used to store ethereum states and merkle tree nodes on top
PathSyncFlush bool // Whether sync flush the trienodebuffer of pathdb to disk.
JournalFile string // whether enable TrieJournal store in wal
JournalFile string // whether enable TrieJournal store in journal file

SnapshotNoBuild bool // Whether the background generation is allowed
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
Expand Down
8 changes: 5 additions & 3 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ import (

const (
ChainDBNamespace = "eth/db/chaindata/"
JournalFile = "state.journal"
JournalFile = "trie.journal"
ChainData = "chaindata"
)

Expand Down Expand Up @@ -252,8 +252,10 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
rawdb.WriteDatabaseVersion(chainDb, core.BlockChainVersion)
}
}
var journalFile string
var path string
var (
journalFile string
path string
)
if config.JournalFileEnabled {
if stack.IsSeparatedDB() {
path = ChainData + "/state"
Expand Down
2 changes: 1 addition & 1 deletion eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ type Config struct {
// consistent with persistent state.
StateScheme string `toml:",omitempty"` // State scheme used to store ethereum state and merkle trie nodes on top
PathSyncFlush bool `toml:",omitempty"` // State scheme used to store ethereum state and merkle trie nodes on top
JournalFileEnabled bool // Enable the in-memory trie node layers to store to wal file when shutdown
JournalFileEnabled bool // Enable the TrieJournal to store to journal file when shutdown

// RequiredBlocks is a set of block number -> hash mappings which must be in the
// canonical chain of all remote peers. Setting the option makes geth verify the
Expand Down
6 changes: 3 additions & 3 deletions triedb/pathdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type Config struct {
DirtyCacheSize int // Maximum memory allowance (in bytes) for caching dirty nodes
ReadOnly bool // Flag whether the database is opened in read only mode.
NoTries bool
JournalFile string // whether enable TrieJournal store in wal
JournalFile string // whether enable TrieJournal store in journal file
}

// sanitize checks the provided user configurations and changes anything that's
Expand Down Expand Up @@ -320,7 +320,7 @@ func (db *Database) Enable(root common.Hash) error {
// Drop the stale state journal in persistent database and
// reset the persistent state id back to zero.
batch := db.diskdb.NewBatch()
db.journal.JournalDelete()
db.journal.Delete()
rawdb.WritePersistentStateID(batch, 0)
if err := batch.Write(); err != nil {
return err
Expand Down Expand Up @@ -384,7 +384,7 @@ func (db *Database) Recover(root common.Hash, loader triestate.TrieLoader) error
// disk layer won't be accessible from outside.
db.tree.reset(dl)
}
db.journal.JournalDelete()
db.journal.Delete()
_, err := truncateFromHead(db.diskdb, db.freezer, dl.stateID())
if err != nil {
return err
Expand Down
59 changes: 29 additions & 30 deletions triedb/pathdb/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,36 +527,35 @@ func TestJournal(t *testing.T) {
}
}

//func TestCorruptedJournal(t *testing.T) {
// tester := newTester(t, 0)
// defer tester.release()
// log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelInfo, true)))
//
// if err := tester.db.Journal(tester.lastHash()); err != nil {
// t.Errorf("Failed to journal, err: %v", err)
// }
// tester.db.Close()
// _, root := rawdb.ReadAccountTrieNode(tester.db.diskdb, nil)
//
// // Mutate the journal in disk, it should be regarded as invalid
// blob := rawdb.ReadTrieJournal(tester.db.diskdb)
// blob[0] = 1
// rawdb.WriteTrieJournal(tester.db.diskdb, blob)
//
// // Verify states, all not-yet-written states should be discarded
// tester.db = New(tester.db.diskdb, nil)
// for i := 0; i < len(tester.roots); i++ {
// if tester.roots[i] == root {
// if err := tester.verifyState(root); err != nil {
// t.Fatalf("Disk state is corrupted, err: %v", err)
// }
// continue
// }
// if err := tester.verifyState(tester.roots[i]); err == nil {
// t.Fatal("Unexpected state")
// }
// }
//}
func TestCorruptedJournal(t *testing.T) {
tester := newTester(t, 0)
defer tester.release()

if err := tester.db.Journal(tester.lastHash()); err != nil {
t.Errorf("Failed to journal, err: %v", err)
}
tester.db.Close()
_, root := rawdb.ReadAccountTrieNode(tester.db.diskdb, nil)

// Mutate the journal in disk, it should be regarded as invalid
blob := rawdb.ReadTrieJournal(tester.db.diskdb)
blob[0] = 1
rawdb.WriteTrieJournal(tester.db.diskdb, blob)

// Verify states, all not-yet-written states should be discarded
tester.db = New(tester.db.diskdb, nil)
for i := 0; i < len(tester.roots); i++ {
if tester.roots[i] == root {
if err := tester.verifyState(root); err != nil {
t.Fatalf("Disk state is corrupted, err: %v", err)
}
continue
}
if err := tester.verifyState(tester.roots[i]); err == nil {
t.Fatal("Unexpected state")
}
}
}

// TestTailTruncateHistory function is designed to test a specific edge case where,
// when history objects are removed from the end, it should trigger a state flush
Expand Down
126 changes: 63 additions & 63 deletions triedb/pathdb/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,25 +74,25 @@ type journalStorage struct {
Slots [][]byte
}

// journalDB is a journal implementation that stores the journal in db.
type journalDB struct {
// journalKV is a journal implementation that stores the journal in db as a single kv.
type journalKV struct {
Journal
journalBuf bytes.Buffer // Used for temporary storage in memory, and finally uniformly written to the database during sync.
diskdb ethdb.Database // Persistent storage for matured trie nodes
}

// journalWAL is a journal implementation that stores the journal in a file.
type journalWAL struct {
// journalFile is a journal implementation that stores the journal in a file.
type journalFile struct {
Journal
journalFile string
journalFd *os.File
file string // the file used to store the TrieJournal
fd *os.File // the file's fd
}

// loadJournal tries to parse the layer journal from the disk.
func (db *Database) loadJournal(diskRoot common.Hash) (layer, error) {
start := time.Now()
r, err := db.journal.NewJournalReader()
defer db.journal.JournalClose()
r, err := db.journal.newJournalReader()
defer db.journal.Close()

if err != nil {
return nil, err
Expand Down Expand Up @@ -459,9 +459,9 @@ func (db *Database) Journal(root common.Hash) error {
return errDatabaseReadOnly
}
// Firstly write out the metadata of journal
db.journal.JournalDelete()
journal := db.journal.NewJournalWriter()
defer db.journal.JournalClose()
db.journal.Delete()
journal := db.journal.newJournalWriter()
defer db.journal.Close()

if err := rlp.Encode(journal, journalVersion); err != nil {
return err
Expand All @@ -482,9 +482,9 @@ func (db *Database) Journal(root common.Hash) error {
}

// Store the journal into the database and return
// JournalSize returns the size of the journal in bytes, It must be called before JournalWriterSync.
journalSize := db.journal.JournalSize()
db.journal.JournalWriterSync()
// Size returns the size of the journal in bytes, It must be called before Sync.
journalSize := db.journal.Size()
db.journal.Sync()

// Set the db in read only mode to reject all following mutations
db.readOnly = true
Expand All @@ -493,95 +493,95 @@ func (db *Database) Journal(root common.Hash) error {
}

type Journal interface {
// NewJournalWriter creates a new journal writer.
NewJournalWriter() io.Writer
// newJournalWriter creates a new journal writer.
newJournalWriter() io.Writer

// NewJournalReader creates a new journal reader.
NewJournalReader() (*rlp.Stream, error)
// newJournalReader creates a new journal reader.
newJournalReader() (*rlp.Stream, error)

// JournalWriterSync flushes the journal writer.
JournalWriterSync()
// Sync flushes the journal writer.
Sync()

// JournalDelete deletes the journal.
JournalDelete()
// Delete deletes the journal.
Delete()

// JournalClose closes the journal.
JournalClose()
// Close closes the journal.
Close()

// JournalSize returns the size of the journal.
JournalSize() uint64
// Size returns the size of the journal.
Size() uint64
}

func newJournal(journalFile string, db ethdb.Database) Journal {
if len(journalFile) == 0 {
return &journalDB{
func newJournal(file string, db ethdb.Database) Journal {
if len(file) == 0 {
return &journalKV{
diskdb: db,
}
} else {
return &journalWAL{
journalFile: journalFile,
return &journalFile{
file: file,
}
}
}

func (db *journalDB) NewJournalWriter() io.Writer {
return &db.journalBuf
func (kv *journalKV) newJournalWriter() io.Writer {
return &kv.journalBuf
}

func (db *journalDB) NewJournalReader() (*rlp.Stream, error) {
journal := rawdb.ReadTrieJournal(db.diskdb)
func (kv *journalKV) newJournalReader() (*rlp.Stream, error) {
journal := rawdb.ReadTrieJournal(kv.diskdb)
if len(journal) == 0 {
return nil, errMissJournal
}
return rlp.NewStream(bytes.NewReader(journal), 0), nil
}

func (db *journalDB) JournalWriterSync() {
rawdb.WriteTrieJournal(db.diskdb, db.journalBuf.Bytes())
db.journalBuf.Reset()
func (kv *journalKV) Sync() {
rawdb.WriteTrieJournal(kv.diskdb, kv.journalBuf.Bytes())
kv.journalBuf.Reset()
}

func (db *journalDB) JournalDelete() {
rawdb.DeleteTrieJournal(db.diskdb)
func (kv *journalKV) Delete() {
rawdb.DeleteTrieJournal(kv.diskdb)
}

func (db *journalDB) JournalClose() {
func (kv *journalKV) Close() {
}

func (db *journalDB) JournalSize() uint64 {
return uint64(db.journalBuf.Len())
func (kv *journalKV) Size() uint64 {
return uint64(kv.journalBuf.Len())
}

// NewJournalWriter creates a new journal writer.
func (wal *journalWAL) NewJournalWriter() io.Writer {
// newJournalWriter creates a new journal writer.
func (f *journalFile) newJournalWriter() io.Writer {
var err error
wal.journalFd, err = os.OpenFile(wal.journalFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
f.fd, err = os.OpenFile(f.file, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return nil
}
return wal.journalFd
return f.fd
}

// NewJournalReader creates a new journal reader.
func (wal *journalWAL) NewJournalReader() (*rlp.Stream, error) {
// newJournalReader creates a new journal reader.
func (f *journalFile) newJournalReader() (*rlp.Stream, error) {
var err error
wal.journalFd, err = os.Open(wal.journalFile)
f.fd, err = os.Open(f.file)
if errors.Is(err, fs.ErrNotExist) {
return nil, errMissJournal
}
if err != nil {
return nil, err
}
return rlp.NewStream(wal.journalFd, 0), nil
return rlp.NewStream(f.fd, 0), nil
}

// JournalWriterSync flushes the journal writer.
func (wal *journalWAL) JournalWriterSync() {
// Sync flushes the journal writer.
func (f *journalFile) Sync() {
}

// JournalDelete deletes the journal.
func (wal *journalWAL) JournalDelete() {
file := wal.journalFile
// Delete deletes the journal.
func (f *journalFile) Delete() {
file := f.file
_, err := os.Stat(file)
if os.IsNotExist(err) {
return
Expand All @@ -592,15 +592,15 @@ func (wal *journalWAL) JournalDelete() {
}
}

// JournalClose closes the journal.
func (wal *journalWAL) JournalClose() {
wal.journalFd.Close()
// Close closes the journal.
func (f *journalFile) Close() {
f.fd.Close()
}

// JournalSize returns the size of the journal.
func (wal *journalWAL) JournalSize() uint64 {
if wal.journalFd != nil {
fileInfo, err := wal.journalFd.Stat()
// Size returns the size of the journal.
func (f *journalFile) Size() uint64 {
if f.fd != nil {
fileInfo, err := f.fd.Stat()
if err != nil {
log.Crit("Failed to stat journal", "err", err)
}
Expand Down

0 comments on commit 4e89289

Please sign in to comment.