Skip to content

Commit

Permalink
fix: trieJournal format compatible old db format (#2395)
Browse files Browse the repository at this point in the history
  • Loading branch information
jingjunLi committed Apr 16, 2024
1 parent 837de88 commit 90eb5b3
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 61 deletions.
6 changes: 5 additions & 1 deletion triedb/pathdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type layer interface {
// journal commits an entire diff hierarchy to disk into a single journal entry.
// This is meant to be used during shutdown to persist the layer without
// flattening everything down (bad for reorgs).
journal(w io.Writer) error
journal(w io.Writer, journalFile bool) error
}

// Config contains the settings for database.
Expand Down Expand Up @@ -527,6 +527,10 @@ func (db *Database) GetAllRooHash() [][]string {
return data
}

func (db *Database) IsEnableJournalFile() bool {
return len(db.config.JournalFilePath) != 0
}

func (db *Database) DeleteTrieJournal(writer ethdb.KeyValueWriter) error {
filePath := db.config.JournalFilePath
if len(filePath) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion triedb/pathdb/difflayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,6 @@ func BenchmarkJournal(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
layer.journal(new(bytes.Buffer))
layer.journal(new(bytes.Buffer), false)
}
}
136 changes: 77 additions & 59 deletions triedb/pathdb/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,18 +262,20 @@ func (db *Database) loadLayers() layer {
// a new disk layer on it.
func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
// Resolve disk layer root
var root common.Hash
var length uint64
if err := r.Decode(&length); err != nil {
return nil, fmt.Errorf("load disk length: %v", err)
}

var journalEncodedBuff []byte
if err := r.Decode(&journalEncodedBuff); err != nil {
return nil, fmt.Errorf("load disk journal: %v", err)
var (
root common.Hash
journalBuf *rlp.Stream
journalEncodedBuff []byte
)
if db.IsEnableJournalFile() {
if err := r.Decode(&journalEncodedBuff); err != nil {
return nil, fmt.Errorf("load disk journal: %v", err)
}
journalBuf = rlp.NewStream(bytes.NewReader(journalEncodedBuff), 0)
} else {
journalBuf = r
}

journalBuf := rlp.NewStream(bytes.NewReader(journalEncodedBuff), 0)
if err := journalBuf.Decode(&root); err != nil {
return nil, fmt.Errorf("load disk root: %v", err)
}
Expand Down Expand Up @@ -306,14 +308,16 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
nodes[entry.Owner] = subset
}

var shaSum [32]byte
if err := r.Decode(&shaSum); err != nil {
return nil, fmt.Errorf("load shasum: %v", err)
}
if db.IsEnableJournalFile() {
var shaSum [32]byte
if err := r.Decode(&shaSum); err != nil {
return nil, fmt.Errorf("load shasum: %v", err)
}

expectSum := sha256.Sum256(journalEncodedBuff)
if shaSum != expectSum {
return nil, fmt.Errorf("expect shaSum: %v, real:%v", expectSum, shaSum)
expectSum := sha256.Sum256(journalEncodedBuff)
if shaSum != expectSum {
return nil, fmt.Errorf("expect shaSum: %v, real:%v", expectSum, shaSum)
}
}

// Calculate the internal state transitions by id difference.
Expand All @@ -325,22 +329,24 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
// diff and verifying that it can be linked to the requested parent.
func (db *Database) loadDiffLayer(parent layer, r *rlp.Stream) (layer, error) {
// Read the next diff journal entry
var root common.Hash
var length uint64
if err := r.Decode(&length); err != nil {
// The first read may fail with EOF, marking the end of the journal
if err == io.EOF {
return parent, nil
var (
root common.Hash
journalBuf *rlp.Stream
journalEncodedBuff []byte
)
if db.IsEnableJournalFile() {
if err := r.Decode(&journalEncodedBuff); err != nil {
// The first read may fail with EOF, marking the end of the journal
if err == io.EOF {
return parent, nil
}
return nil, fmt.Errorf("load disk journal buffer: %v", err)
}
return nil, fmt.Errorf("load disk length : %v", err)
}
var journalEncodedBuff []byte
if err := r.Decode(&journalEncodedBuff); err != nil {
return nil, fmt.Errorf("load disk journal buffer: %v", err)
journalBuf = rlp.NewStream(bytes.NewReader(journalEncodedBuff), 0)
} else {
journalBuf = r
}

journalBuf := rlp.NewStream(bytes.NewReader(journalEncodedBuff), 0)

if err := journalBuf.Decode(&root); err != nil {
// The first read may fail with EOF, marking the end of the journal
if err == io.EOF {
Expand Down Expand Up @@ -400,23 +406,27 @@ func (db *Database) loadDiffLayer(parent layer, r *rlp.Stream) (layer, error) {
}
storages[entry.Account] = set
}
var shaSum [32]byte
if err := r.Decode(&shaSum); err != nil {
return nil, fmt.Errorf("load shasum: %v", err)
}

expectSum := sha256.Sum256(journalEncodedBuff)
if shaSum != expectSum {
return nil, fmt.Errorf("expect shaSum: %v, real:%v", expectSum, shaSum)
if db.IsEnableJournalFile() {
var shaSum [32]byte
if err := r.Decode(&shaSum); err != nil {
return nil, fmt.Errorf("load shasum: %v", err)
}

expectSum := sha256.Sum256(journalEncodedBuff)
if shaSum != expectSum {
return nil, fmt.Errorf("expect shaSum: %v, real:%v", expectSum, shaSum)
}
}

log.Debug("Loaded diff layer journal", "root", root, "parent", parent.rootHash(), "id", parent.stateID()+1, "block", block)

return db.loadDiffLayer(newDiffLayer(parent, root, parent.stateID()+1, block, nodes, triestate.New(accounts, storages, incomplete)), r)
}

// journal implements the layer interface, marshaling the un-flushed trie nodes
// along with layer metadata into provided byte buffer.
func (dl *diskLayer) journal(w io.Writer) error {
func (dl *diskLayer) journal(w io.Writer, journalFile bool) error {
dl.lock.RLock()
defer dl.lock.RUnlock()

Expand Down Expand Up @@ -450,28 +460,32 @@ func (dl *diskLayer) journal(w io.Writer) error {
}

// Store the journal buf into w and calculate checksum
if err := rlp.Encode(w, uint64(journalBuf.Len())); err != nil {
return err
}
shasum := sha256.Sum256(journalBuf.Bytes())
if err := rlp.Encode(w, journalBuf.Bytes()); err != nil {
return err
}
if err := rlp.Encode(w, shasum); err != nil {
return err
if journalFile {
shasum := sha256.Sum256(journalBuf.Bytes())
if err := rlp.Encode(w, journalBuf.Bytes()); err != nil {
return err
}
if err := rlp.Encode(w, shasum); err != nil {
return err
}
} else {
if _, err := w.Write(journalBuf.Bytes()); err != nil {
return err
}
}

log.Info("Journaled pathdb disk layer", "root", dl.root, "nodes", len(bufferNodes))
return nil
}

// journal implements the layer interface, writing the memory layer contents
// into a buffer to be stored in the database as the layer journal.
func (dl *diffLayer) journal(w io.Writer) error {
func (dl *diffLayer) journal(w io.Writer, journalFile bool) error {
dl.lock.RLock()
defer dl.lock.RUnlock()

// journal the parent first
if err := dl.parent.journal(w); err != nil {
if err := dl.parent.journal(w, journalFile); err != nil {
return err
}
// Create a buffer to store encoded data
Expand Down Expand Up @@ -521,16 +535,20 @@ func (dl *diffLayer) journal(w io.Writer) error {
}

// Store the journal buf into w and calculate checksum
if err := rlp.Encode(w, uint64(journalBuf.Len())); err != nil {
return err
}
shasum := sha256.Sum256(journalBuf.Bytes())
if err := rlp.Encode(w, journalBuf.Bytes()); err != nil {
return err
}
if err := rlp.Encode(w, shasum); err != nil {
return err
if journalFile {
shasum := sha256.Sum256(journalBuf.Bytes())
if err := rlp.Encode(w, journalBuf.Bytes()); err != nil {
return err
}
if err := rlp.Encode(w, shasum); err != nil {
return err
}
} else {
if _, err := w.Write(journalBuf.Bytes()); err != nil {
return err
}
}

log.Info("Journaled pathdb diff layer", "root", dl.root, "parent", dl.parent.rootHash(), "id", dl.stateID(), "block", dl.block, "nodes", len(dl.nodes))
return nil
}
Expand Down Expand Up @@ -582,7 +600,7 @@ func (db *Database) Journal(root common.Hash) error {
return err
}
// Finally write out the journal of each layer in reverse order.
if err := l.journal(journal); err != nil {
if err := l.journal(journal, db.IsEnableJournalFile()); err != nil {
return err
}
// Store the journal into the database and return
Expand Down

0 comments on commit 90eb5b3

Please sign in to comment.