Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: trieJournal format compatible old db format #2395

Merged
merged 4 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion triedb/pathdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type layer interface {
// journal commits an entire diff hierarchy to disk into a single journal entry.
// This is meant to be used during shutdown to persist the layer without
// flattening everything down (bad for reorgs).
journal(w io.Writer) error
journal(w io.Writer, journalFile bool) error
}

// Config contains the settings for database.
Expand Down Expand Up @@ -527,6 +527,10 @@ func (db *Database) GetAllRooHash() [][]string {
return data
}

func (db *Database) IsEnableJournalFile() bool {
return len(db.config.JournalFilePath) != 0
}

func (db *Database) DeleteTrieJournal(writer ethdb.KeyValueWriter) error {
filePath := db.config.JournalFilePath
if len(filePath) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion triedb/pathdb/difflayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,6 @@ func BenchmarkJournal(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
layer.journal(new(bytes.Buffer))
layer.journal(new(bytes.Buffer), false)
}
}
160 changes: 96 additions & 64 deletions triedb/pathdb/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,12 @@ func (db *Database) loadJournal(diskRoot common.Hash) (layer, error) {
return nil, fmt.Errorf("%w want %x got %x", errUnmatchedJournal, root, diskRoot)
}
// Load the disk layer from the journal
base, err := db.loadDiskLayer(r)
base, err := db.loadDiskLayer(r, db.IsEnableJournalFile())
if err != nil {
return nil, err
}
// Load all the diff layers from the journal
head, err := db.loadDiffLayer(base, r)
head, err := db.loadDiffLayer(base, r, db.IsEnableJournalFile())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -260,20 +260,26 @@ func (db *Database) loadLayers() layer {

// loadDiskLayer reads the binary blob from the layer journal, reconstructing
// a new disk layer on it.
func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
func (db *Database) loadDiskLayer(r *rlp.Stream, journalFile bool) (layer, error) {
// Resolve disk layer root
var root common.Hash
var length uint64
if err := r.Decode(&length); err != nil {
return nil, fmt.Errorf("load disk length: %v", err)
}

var journalEncodedBuff []byte
if err := r.Decode(&journalEncodedBuff); err != nil {
return nil, fmt.Errorf("load disk journal: %v", err)
var (
root common.Hash
journalBuf *rlp.Stream
journalEncodedBuff []byte
)
if journalFile {
var length uint64
if err := r.Decode(&length); err != nil {
return nil, fmt.Errorf("load disk length: %v", err)
}
if err := r.Decode(&journalEncodedBuff); err != nil {
return nil, fmt.Errorf("load disk journal: %v", err)
}
journalBuf = rlp.NewStream(bytes.NewReader(journalEncodedBuff), 0)
} else {
journalBuf = r
}

journalBuf := rlp.NewStream(bytes.NewReader(journalEncodedBuff), 0)
if err := journalBuf.Decode(&root); err != nil {
return nil, fmt.Errorf("load disk root: %v", err)
}
Expand Down Expand Up @@ -306,14 +312,16 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
nodes[entry.Owner] = subset
}

var shaSum [32]byte
if err := r.Decode(&shaSum); err != nil {
return nil, fmt.Errorf("load shasum: %v", err)
}
if journalFile {
var shaSum [32]byte
if err := r.Decode(&shaSum); err != nil {
return nil, fmt.Errorf("load shasum: %v", err)
}

expectSum := sha256.Sum256(journalEncodedBuff)
if shaSum != expectSum {
return nil, fmt.Errorf("expect shaSum: %v, real:%v", expectSum, shaSum)
expectSum := sha256.Sum256(journalEncodedBuff)
if shaSum != expectSum {
return nil, fmt.Errorf("expect shaSum: %v, real:%v", expectSum, shaSum)
}
}

// Calculate the internal state transitions by id difference.
Expand All @@ -323,24 +331,30 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {

// loadDiffLayer reads the next sections of a layer journal, reconstructing a new
// diff and verifying that it can be linked to the requested parent.
func (db *Database) loadDiffLayer(parent layer, r *rlp.Stream) (layer, error) {
func (db *Database) loadDiffLayer(parent layer, r *rlp.Stream, journalFile bool) (layer, error) {
// Read the next diff journal entry
var root common.Hash
var length uint64
if err := r.Decode(&length); err != nil {
// The first read may fail with EOF, marking the end of the journal
if err == io.EOF {
return parent, nil
var (
root common.Hash
journalBuf *rlp.Stream
journalEncodedBuff []byte
)
if journalFile {
var length uint64
if err := r.Decode(&length); err != nil {
// The first read may fail with EOF, marking the end of the journal
if err == io.EOF {
return parent, nil
}
return nil, fmt.Errorf("load disk length : %v", err)
}
return nil, fmt.Errorf("load disk length : %v", err)
}
var journalEncodedBuff []byte
if err := r.Decode(&journalEncodedBuff); err != nil {
return nil, fmt.Errorf("load disk journal buffer: %v", err)
if err := r.Decode(&journalEncodedBuff); err != nil {
return nil, fmt.Errorf("load disk journal buffer: %v", err)
}
journalBuf = rlp.NewStream(bytes.NewReader(journalEncodedBuff), 0)
} else {
journalBuf = r
}

journalBuf := rlp.NewStream(bytes.NewReader(journalEncodedBuff), 0)

if err := journalBuf.Decode(&root); err != nil {
// The first read may fail with EOF, marking the end of the journal
if err == io.EOF {
Expand Down Expand Up @@ -400,23 +414,27 @@ func (db *Database) loadDiffLayer(parent layer, r *rlp.Stream) (layer, error) {
}
storages[entry.Account] = set
}
var shaSum [32]byte
if err := r.Decode(&shaSum); err != nil {
return nil, fmt.Errorf("load shasum: %v", err)
}

expectSum := sha256.Sum256(journalEncodedBuff)
if shaSum != expectSum {
return nil, fmt.Errorf("expect shaSum: %v, real:%v", expectSum, shaSum)
if journalFile {
var shaSum [32]byte
if err := r.Decode(&shaSum); err != nil {
return nil, fmt.Errorf("load shasum: %v", err)
}

expectSum := sha256.Sum256(journalEncodedBuff)
if shaSum != expectSum {
return nil, fmt.Errorf("expect shaSum: %v, real:%v", expectSum, shaSum)
}
}

log.Debug("Loaded diff layer journal", "root", root, "parent", parent.rootHash(), "id", parent.stateID()+1, "block", block)

return db.loadDiffLayer(newDiffLayer(parent, root, parent.stateID()+1, block, nodes, triestate.New(accounts, storages, incomplete)), r)
return db.loadDiffLayer(newDiffLayer(parent, root, parent.stateID()+1, block, nodes, triestate.New(accounts, storages, incomplete)), r, db.IsEnableJournalFile())
}

// journal implements the layer interface, marshaling the un-flushed trie nodes
// along with layer metadata into provided byte buffer.
func (dl *diskLayer) journal(w io.Writer) error {
func (dl *diskLayer) journal(w io.Writer, journalFile bool) error {
dl.lock.RLock()
defer dl.lock.RUnlock()

Expand Down Expand Up @@ -450,28 +468,35 @@ func (dl *diskLayer) journal(w io.Writer) error {
}

// Store the journal buf into w and calculate checksum
if err := rlp.Encode(w, uint64(journalBuf.Len())); err != nil {
return err
}
shasum := sha256.Sum256(journalBuf.Bytes())
if err := rlp.Encode(w, journalBuf.Bytes()); err != nil {
return err
}
if err := rlp.Encode(w, shasum); err != nil {
return err
if journalFile {
if err := rlp.Encode(w, uint64(journalBuf.Len())); err != nil {
return err
}
shasum := sha256.Sum256(journalBuf.Bytes())
if err := rlp.Encode(w, journalBuf.Bytes()); err != nil {
return err
}
if err := rlp.Encode(w, shasum); err != nil {
return err
}
} else {
if _, err := w.Write(journalBuf.Bytes()); err != nil {
return err
}
}

log.Info("Journaled pathdb disk layer", "root", dl.root, "nodes", len(bufferNodes))
return nil
}

// journal implements the layer interface, writing the memory layer contents
// into a buffer to be stored in the database as the layer journal.
func (dl *diffLayer) journal(w io.Writer) error {
func (dl *diffLayer) journal(w io.Writer, journalFile bool) error {
dl.lock.RLock()
defer dl.lock.RUnlock()

// journal the parent first
if err := dl.parent.journal(w); err != nil {
if err := dl.parent.journal(w, journalFile); err != nil {
return err
}
// Create a buffer to store encoded data
Expand Down Expand Up @@ -521,16 +546,23 @@ func (dl *diffLayer) journal(w io.Writer) error {
}

// Store the journal buf into w and calculate checksum
if err := rlp.Encode(w, uint64(journalBuf.Len())); err != nil {
return err
}
shasum := sha256.Sum256(journalBuf.Bytes())
if err := rlp.Encode(w, journalBuf.Bytes()); err != nil {
return err
}
if err := rlp.Encode(w, shasum); err != nil {
return err
if journalFile {
if err := rlp.Encode(w, uint64(journalBuf.Len())); err != nil {
return err
}
shasum := sha256.Sum256(journalBuf.Bytes())
if err := rlp.Encode(w, journalBuf.Bytes()); err != nil {
return err
}
if err := rlp.Encode(w, shasum); err != nil {
return err
}
} else {
if _, err := w.Write(journalBuf.Bytes()); err != nil {
return err
}
}

log.Info("Journaled pathdb diff layer", "root", dl.root, "parent", dl.parent.rootHash(), "id", dl.stateID(), "block", dl.block, "nodes", len(dl.nodes))
return nil
}
Expand Down Expand Up @@ -582,7 +614,7 @@ func (db *Database) Journal(root common.Hash) error {
return err
}
// Finally write out the journal of each layer in reverse order.
if err := l.journal(journal); err != nil {
if err := l.journal(journal, db.IsEnableJournalFile()); err != nil {
return err
}
// Store the journal into the database and return
Expand Down
Loading