From a62207c86c79885b7ac9162b1919277acb5539fd Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Thu, 8 May 2025 19:10:26 +0800 Subject: [PATCH] core, ethdb: introduce database sync function (#31703) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This pull request introduces a SyncKeyValue function to the ethdb.KeyValueStore interface, providing the ability to forcibly flush all previous writes to disk. This functionality is critical for go-ethereum, which internally uses two independent database engines: a key-value store (such as Pebble, LevelDB, or memoryDB for testing) and a flat-file–based freezer. To ensure write-order consistency between these engines, the key-value store must be explicitly synced before writing to the freezer and vice versa. Fixes - https://github.com/ethereum/go-ethereum/issues/31405 - https://github.com/ethereum/go-ethereum/issues/29819 --- cmd/geth/dbcmd.go | 2 +- core/bench_test.go | 8 +++---- core/blockchain.go | 21 +++++++++--------- core/blockchain_repair_test.go | 8 +++---- core/blockchain_sethead_test.go | 2 +- core/blockchain_snapshot_test.go | 4 ++-- core/blockchain_test.go | 2 +- core/headerchain.go | 35 ++++++++++++++++++++++++++++- core/rawdb/chain_freezer.go | 2 +- core/rawdb/database.go | 8 +++---- core/rawdb/freezer.go | 6 ++--- core/rawdb/freezer_memory.go | 4 ++-- core/rawdb/freezer_resettable.go | 6 ++--- core/rawdb/freezer_test.go | 2 +- core/rawdb/prunedfreezer.go | 10 ++++----- core/rawdb/table.go | 12 +++++++--- ethdb/database.go | 14 +++++++++--- ethdb/leveldb/leveldb.go | 16 ++++++++++++++ ethdb/memorydb/memorydb.go | 6 +++++ ethdb/pebble/pebble.go | 38 ++++++++++++++++++++++++++++---- ethdb/pebble/pebble_test.go | 24 ++++++++++++++++++++ ethdb/remotedb/remotedb.go | 6 ++++- node/database.go | 12 ++++------ trie/trie_test.go | 1 + triedb/pathdb/buffer.go | 9 +++++--- triedb/pathdb/database.go | 9 ++++++++ 26 files changed, 201 insertions(+), 66 deletions(-) diff --git a/cmd/geth/dbcmd.go b/cmd/geth/dbcmd.go index 935fd6baaf..51a8600710 100644 --- a/cmd/geth/dbcmd.go +++ b/cmd/geth/dbcmd.go @@ -1282,7 +1282,7 @@ func hbss2pbss(ctx *cli.Context) error { defer stack.Close() db := utils.MakeChainDatabase(ctx, stack, false, false) - db.BlockStore().Sync() + db.BlockStore().SyncAncient() stateDiskDb := db.StateStore() defer db.Close() diff --git a/core/bench_test.go b/core/bench_test.go index 7f7d3e33e5..9f7c525f09 100644 --- a/core/bench_test.go +++ b/core/bench_test.go @@ -183,7 +183,7 @@ func benchInsertChain(b *testing.B, disk bool, gen func(int, *BlockGen)) { if !disk { db = rawdb.NewMemoryDatabase() } else { - pdb, err := pebble.New(b.TempDir(), 128, 128, "", false, true) + pdb, err := pebble.New(b.TempDir(), 128, 128, "", false) if err != nil { b.Fatalf("cannot create temporary database: %v", err) } @@ -304,7 +304,7 @@ func makeChainForBench(db ethdb.Database, genesis *Genesis, full bool, count uin func benchWriteChain(b *testing.B, full bool, count uint64) { genesis := &Genesis{Config: params.AllEthashProtocolChanges} for i := 0; i < b.N; i++ { - pdb, err := pebble.New(b.TempDir(), 1024, 128, "", false, true) + pdb, err := pebble.New(b.TempDir(), 1024, 128, "", false) if err != nil { b.Fatalf("error opening database: %v", err) } @@ -317,7 +317,7 @@ func benchWriteChain(b *testing.B, full bool, count uint64) { func benchReadChain(b *testing.B, full bool, count uint64) { dir := b.TempDir() - pdb, err := pebble.New(dir, 1024, 128, "", false, true) + pdb, err := pebble.New(dir, 1024, 128, "", false) if err != nil { b.Fatalf("error opening database: %v", err) } @@ -333,7 +333,7 @@ func benchReadChain(b *testing.B, full bool, count uint64) { b.ResetTimer() for i := 0; i < b.N; i++ { - pdb, err = pebble.New(dir, 1024, 128, "", false, true) + pdb, err = pebble.New(dir, 1024, 128, "", false) if err != nil { b.Fatalf("error opening database: %v", err) } diff --git a/core/blockchain.go b/core/blockchain.go index fde19bbe78..7ab7e54e89 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1119,17 +1119,16 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha // Ignore the error here since light client won't hit this path frozen, _ := bc.db.BlockStore().Ancients() if num+1 <= frozen { - // Truncate all relative data(header, total difficulty, body, receipt - // and canonical hash) from ancient store. - if _, err := bc.db.BlockStore().TruncateHead(num); err != nil { - log.Crit("Failed to truncate ancient data", "number", num, "err", err) - } - // Remove the hash <-> number mapping from the active store. - rawdb.DeleteHeaderNumber(db, hash) + // The chain segment, such as the block header, canonical hash, + // body, and receipt, will be removed from the ancient store + // in one go. + // + // The hash-to-number mapping in the key-value store will be + // removed by the hc.SetHead function. } else { - // Remove relative body and receipts from the active store. - // The header, total difficulty and canonical hash will be - // removed in the hc.SetHead function. + // Remove the associated body and receipts from the key-value store. + // The header, hash-to-number mapping, and canonical hash will be + // removed by the hc.SetHead function. rawdb.DeleteBody(db, hash, num) rawdb.DeleteBlobSidecars(db, hash, num) rawdb.DeleteReceipts(db, hash, num) @@ -1599,7 +1598,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ size += writeSize // Sync the ancient store explicitly to ensure all data has been flushed to disk. - if err := bc.db.BlockStore().Sync(); err != nil { + if err := bc.db.BlockStore().SyncAncient(); err != nil { return 0, err } // Update the current snap block because all block data is now present in DB. diff --git a/core/blockchain_repair_test.go b/core/blockchain_repair_test.go index e9d6694cf3..383e454ffb 100644 --- a/core/blockchain_repair_test.go +++ b/core/blockchain_repair_test.go @@ -1767,7 +1767,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s datadir := t.TempDir() ancient := filepath.Join(datadir, "ancient") - pdb, err := pebble.New(datadir, 0, 0, "", false, true) + pdb, err := pebble.New(datadir, 0, 0, "", false) if err != nil { t.Fatalf("Failed to create persistent key-value database: %v", err) } @@ -1861,7 +1861,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s chain.stopWithoutSaving() // Start a new blockchain back up and see where the repair leads us - pdb, err = pebble.New(datadir, 0, 0, "", false, true) + pdb, err = pebble.New(datadir, 0, 0, "", false) if err != nil { t.Fatalf("Failed to reopen persistent key-value database: %v", err) } @@ -1926,7 +1926,7 @@ func testIssue23496(t *testing.T, scheme string) { datadir := t.TempDir() ancient := filepath.Join(datadir, "ancient") - pdb, err := pebble.New(datadir, 0, 0, "", false, true) + pdb, err := pebble.New(datadir, 0, 0, "", false) if err != nil { t.Fatalf("Failed to create persistent key-value database: %v", err) } @@ -1984,7 +1984,7 @@ func testIssue23496(t *testing.T, scheme string) { chain.stopWithoutSaving() // Start a new blockchain back up and see where the repair leads us - pdb, err = pebble.New(datadir, 0, 0, "", false, true) + pdb, err = pebble.New(datadir, 0, 0, "", false) if err != nil { t.Fatalf("Failed to reopen persistent key-value database: %v", err) } diff --git a/core/blockchain_sethead_test.go b/core/blockchain_sethead_test.go index 5ed9a34292..1b8f18fae6 100644 --- a/core/blockchain_sethead_test.go +++ b/core/blockchain_sethead_test.go @@ -1971,7 +1971,7 @@ func testSetHeadWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme datadir := t.TempDir() ancient := filepath.Join(datadir, "ancient") - pdb, err := pebble.New(datadir, 0, 0, "", false, true) + pdb, err := pebble.New(datadir, 0, 0, "", false) if err != nil { t.Fatalf("Failed to create persistent key-value database: %v", err) } diff --git a/core/blockchain_snapshot_test.go b/core/blockchain_snapshot_test.go index cfb158255e..667a06f806 100644 --- a/core/blockchain_snapshot_test.go +++ b/core/blockchain_snapshot_test.go @@ -66,7 +66,7 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo datadir := t.TempDir() ancient := filepath.Join(datadir, "ancient") - pdb, err := pebble.New(datadir, 0, 0, "", false, true) + pdb, err := pebble.New(datadir, 0, 0, "", false) if err != nil { t.Fatalf("Failed to create persistent key-value database: %v", err) } @@ -257,7 +257,7 @@ func (snaptest *crashSnapshotTest) test(t *testing.T) { chain.triedb.Close() // Start a new blockchain back up and see where the repair leads us - pdb, err := pebble.New(snaptest.datadir, 0, 0, "", false, true) + pdb, err := pebble.New(snaptest.datadir, 0, 0, "", false) if err != nil { t.Fatalf("Failed to create persistent key-value database: %v", err) } diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 0cf7ac6ba5..d9fce1c75d 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -2670,7 +2670,7 @@ func testSideImportPrunedBlocks(t *testing.T, scheme string) { datadir := t.TempDir() ancient := path.Join(datadir, "ancient") - pdb, err := pebble.New(datadir, 0, 0, "", false, true) + pdb, err := pebble.New(datadir, 0, 0, "", false) if err != nil { t.Fatalf("Failed to create persistent key-value database: %v", err) } diff --git a/core/headerchain.go b/core/headerchain.go index bf88e770cd..cc0677fde6 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -695,18 +695,51 @@ func (hc *HeaderChain) setHead(headBlock uint64, headTime uint64, updateFn Updat hashes = append(hashes, hdr.Hash()) } for _, hash := range hashes { + // Remove the associated block body and receipts if required. + // + // If the block is in the chain freezer, then this delete operation + // is actually ineffective. if delFn != nil { delFn(blockBatch, hash, num) } + // Remove the hash->number mapping along with the header itself rawdb.DeleteHeader(blockBatch, hash, num) rawdb.DeleteTd(blockBatch, hash, num) } + // Remove the number->hash mapping rawdb.DeleteCanonicalHash(blockBatch, num) } } // Flush all accumulated deletions. if err := blockBatch.Write(); err != nil { - log.Crit("Failed to rewind block", "error", err) + log.Crit("Failed to commit batch in setHead", "err", err) + } + // Explicitly flush the pending writes in the key-value store to disk, ensuring + // data durability of the previous deletions. + if err := hc.chainDb.SyncKeyValue(); err != nil { + log.Crit("Failed to sync the key-value store in setHead", "err", err) + } + // Truncate the excessive chain segments in the ancient store. + // These are actually deferred deletions from the loop above. + // + // This step must be performed after synchronizing the key-value store; + // otherwise, in the event of a panic, it's theoretically possible to + // lose recent key-value store writes while the ancient store deletions + // remain, leading to data inconsistency, e.g., the gap between the key + // value store and ancient can be created due to unclean shutdown. + if delFn != nil { + // Ignore the error here since light client won't hit this path + frozen, _ := hc.chainDb.Ancients() + header := hc.CurrentHeader() + + // Truncate the excessive chain segment above the current chain head + // in the ancient store. + if header.Number.Uint64()+1 < frozen { + _, err := hc.chainDb.BlockStore().TruncateHead(header.Number.Uint64() + 1) + if err != nil { + log.Crit("Failed to truncate head block", "err", err) + } + } } // Clear out any stale content from the caches hc.headerCache.Purge() diff --git a/core/rawdb/chain_freezer.go b/core/rawdb/chain_freezer.go index b5612fe5bc..2580bd819b 100644 --- a/core/rawdb/chain_freezer.go +++ b/core/rawdb/chain_freezer.go @@ -309,7 +309,7 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) { continue } // Batch of blocks have been frozen, flush them before wiping from key-value store - if err := f.Sync(); err != nil { + if err := f.SyncAncient(); err != nil { log.Crit("Failed to flush frozen tables", "err", err) } // Wipe out all data from the active database diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 06b471679e..e96863087f 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -235,8 +235,8 @@ func (db *nofreezedb) ResetTable(kind string, startAt uint64, onlyEmpty bool) er return errNotSupported } -// Sync returns an error as we don't have a backing chain freezer. -func (db *nofreezedb) Sync() error { +// SyncAncient returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) SyncAncient() error { return errNotSupported } @@ -391,8 +391,8 @@ func (db *emptyfreezedb) ResetTable(kind string, startAt uint64, onlyEmpty bool) return nil } -// Sync returns nil for pruned db that we don't have a backing chain freezer. -func (db *emptyfreezedb) Sync() error { +// SyncAncient returns nil for pruned db that we don't have a backing chain freezer. +func (db *emptyfreezedb) SyncAncient() error { return nil } diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index 4854e0adb0..c7f28b62b5 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -386,8 +386,8 @@ func (f *Freezer) TruncateTail(tail uint64) (uint64, error) { return old, nil } -// Sync flushes all data tables to disk. -func (f *Freezer) Sync() error { +// SyncAncient flushes all data tables to disk. +func (f *Freezer) SyncAncient() error { var errs []error for _, table := range f.tables { if err := table.Sync(); err != nil { @@ -600,7 +600,7 @@ func (f *Freezer) ResetTable(kind string, startAt uint64, onlyEmpty bool) error return nil } - if err := f.Sync(); err != nil { + if err := f.SyncAncient(); err != nil { return err } nt, err := t.resetItems(startAt - f.offset) diff --git a/core/rawdb/freezer_memory.go b/core/rawdb/freezer_memory.go index 101f103d74..177f51b1c3 100644 --- a/core/rawdb/freezer_memory.go +++ b/core/rawdb/freezer_memory.go @@ -410,8 +410,8 @@ func (f *MemoryFreezer) TruncateTail(tail uint64) (uint64, error) { return old, nil } -// Sync flushes all data tables to disk. -func (f *MemoryFreezer) Sync() error { +// SyncAncient flushes all data tables to disk. +func (f *MemoryFreezer) SyncAncient() error { return nil } diff --git a/core/rawdb/freezer_resettable.go b/core/rawdb/freezer_resettable.go index 35e4bd5157..a8a78f9f40 100644 --- a/core/rawdb/freezer_resettable.go +++ b/core/rawdb/freezer_resettable.go @@ -226,12 +226,12 @@ func (f *resettableFreezer) ResetTable(kind string, startAt uint64, onlyEmpty bo return f.freezer.ResetTable(kind, startAt, onlyEmpty) } -// Sync flushes all data tables to disk. -func (f *resettableFreezer) Sync() error { +// SyncAncient flushes all data tables to disk. +func (f *resettableFreezer) SyncAncient() error { f.lock.RLock() defer f.lock.RUnlock() - return f.freezer.Sync() + return f.freezer.SyncAncient() } // AncientDatadir returns the path of the ancient store. diff --git a/core/rawdb/freezer_test.go b/core/rawdb/freezer_test.go index 5edd1be6b6..f42ea31e7f 100644 --- a/core/rawdb/freezer_test.go +++ b/core/rawdb/freezer_test.go @@ -493,7 +493,7 @@ func TestFreezerCloseSync(t *testing.T) { if err := f.Close(); err != nil { t.Fatal(err) } - if err := f.Sync(); err == nil { + if err := f.SyncAncient(); err == nil { t.Fatalf("want error, have nil") } else if have, want := err.Error(), "[closed closed]"; have != want { t.Fatalf("want %v, have %v", have, want) diff --git a/core/rawdb/prunedfreezer.go b/core/rawdb/prunedfreezer.go index f15eb8ea13..f6581de41b 100644 --- a/core/rawdb/prunedfreezer.go +++ b/core/rawdb/prunedfreezer.go @@ -106,7 +106,7 @@ func (f *prunedfreezer) repair(datadir string) error { log.Info("Read ancient db item counts", "items", minItems, "frozen", maxOffset) atomic.StoreUint64(&f.frozen, maxOffset) - if err := f.Sync(); err != nil { + if err := f.SyncAncient(); err != nil { return nil } return nil @@ -117,7 +117,7 @@ func (f *prunedfreezer) Close() error { var err error f.closeOnce.Do(func() { close(f.quit) - f.Sync() + f.SyncAncient() err = f.instanceLock.Release() }) return err @@ -200,8 +200,8 @@ func (f *prunedfreezer) TruncateTail(tail uint64) (uint64, error) { return 0, errNotSupported } -// Sync flushes meta data tables to disk. -func (f *prunedfreezer) Sync() error { +// SyncAncient flushes meta data tables to disk. +func (f *prunedfreezer) SyncAncient() error { WriteFrozenOfAncientFreezer(f.db, atomic.LoadUint64(&f.frozen)) // compatible offline prune blocks tool WriteOffSetOfCurrentAncientFreezer(f.db, atomic.LoadUint64(&f.frozen)) @@ -315,7 +315,7 @@ func (f *prunedfreezer) freeze() { ancients = append(ancients, hash) } // Batch of blocks have been frozen, flush them before wiping from leveldb - if err := f.Sync(); err != nil { + if err := f.SyncAncient(); err != nil { log.Crit("Failed to flush frozen tables", "err", err) } backoff = f.frozen-first >= freezerBatchLimit diff --git a/core/rawdb/table.go b/core/rawdb/table.go index f1fd44cfdb..e924b043f3 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -143,10 +143,10 @@ func (t *table) TruncateTail(items uint64) (uint64, error) { return t.db.TruncateTail(items) } -// Sync is a noop passthrough that just forwards the request to the underlying +// SyncAncient is a noop passthrough that just forwards the request to the underlying // database. -func (t *table) Sync() error { - return t.db.Sync() +func (t *table) SyncAncient() error { + return t.db.SyncAncient() } // AncientDatadir returns the ancient datadir of the underlying database. @@ -224,6 +224,12 @@ func (t *table) Compact(start []byte, limit []byte) error { return t.db.Compact(start, limit) } +// SyncKeyValue ensures that all pending writes are flushed to disk, +// guaranteeing data durability up to the point. +func (t *table) SyncKeyValue() error { + return t.db.SyncKeyValue() +} + // NewBatch creates a write-only database that buffers changes to its host db // until a final write is called, each operation prefixing all keys with the // pre-configured string. diff --git a/ethdb/database.go b/ethdb/database.go index ed97891b50..acd0e370c4 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -54,6 +54,13 @@ type KeyValueStater interface { Stat() (string, error) } +// KeyValueSyncer wraps the SyncKeyValue method of a backing data store. +type KeyValueSyncer interface { + // SyncKeyValue ensures that all pending writes are flushed to disk, + // guaranteeing data durability up to the point. + SyncKeyValue() error +} + // Compacter wraps the Compact method of a backing data store. type Compacter interface { // Compact flattens the underlying data store for the given key range. In essence, @@ -72,6 +79,7 @@ type KeyValueStore interface { KeyValueReader KeyValueWriter KeyValueStater + KeyValueSyncer KeyValueRangeDeleter Batcher Iteratee @@ -129,6 +137,9 @@ type AncientWriter interface { // The integer return value is the total size of the written data. ModifyAncients(func(AncientWriteOp) error) (int64, error) + // SyncAncient flushes all in-memory ancient store data to disk. + SyncAncient() error + // TruncateHead discards all but the first n ancient data from the ancient store. // After the truncation, the latest item can be accessed it item_n-1(start from 0). TruncateHead(n uint64) (uint64, error) @@ -140,9 +151,6 @@ type AncientWriter interface { // will be removed all together. TruncateTail(n uint64) (uint64, error) - // Sync flushes all in-memory ancient store data to disk. - Sync() error - // TruncateTableTail will truncate certain table to new tail TruncateTableTail(kind string, tail uint64) (uint64, error) diff --git a/ethdb/leveldb/leveldb.go b/ethdb/leveldb/leveldb.go index 1d88ed460b..479fb3f2b2 100644 --- a/ethdb/leveldb/leveldb.go +++ b/ethdb/leveldb/leveldb.go @@ -326,6 +326,22 @@ func (db *Database) Path() string { return db.fn } +// SyncKeyValue flushes all pending writes in the write-ahead-log to disk, +// ensuring data durability up to that point. +func (db *Database) SyncKeyValue() error { + // In theory, the WAL (Write-Ahead Log) can be explicitly synchronized using + // a write operation with SYNC=true. However, there is no dedicated key reserved + // for this purpose, and even a nil key (key=nil) is considered a valid + // database entry. + // + // In LevelDB, writes are blocked until the data is written to the WAL, meaning + // recent writes won't be lost unless a power failure or system crash occurs. + // Additionally, LevelDB is no longer the default database engine and is likely + // only used by hash-mode archive nodes. Given this, the durability guarantees + // without explicit sync are acceptable in the context of LevelDB. + return nil +} + // meter periodically retrieves internal leveldb counters and reports them to // the metrics subsystem. func (db *Database) meter(refresh time.Duration, namespace string) { diff --git a/ethdb/memorydb/memorydb.go b/ethdb/memorydb/memorydb.go index df9e76daab..920dbbbb55 100644 --- a/ethdb/memorydb/memorydb.go +++ b/ethdb/memorydb/memorydb.go @@ -277,6 +277,12 @@ func (db *Database) Compact(start []byte, limit []byte) error { return nil } +// SyncKeyValue ensures that all pending writes are flushed to disk, +// guaranteeing data durability up to the point. +func (db *Database) SyncKeyValue() error { + return nil +} + // Len returns the number of entries currently present in the memory database. // // Note, this method is only used for testing (i.e. not public in general) and diff --git a/ethdb/pebble/pebble.go b/ethdb/pebble/pebble.go index 54c96bc779..13cea1eb3a 100644 --- a/ethdb/pebble/pebble.go +++ b/ethdb/pebble/pebble.go @@ -146,7 +146,7 @@ func (l panicLogger) Fatalf(format string, args ...interface{}) { // New returns a wrapped pebble DB object. The namespace is the prefix that the // metrics reporting should use for surfacing internal stats. -func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool) (*Database, error) { +func New(file string, cache int, handles int, namespace string, readonly bool) (*Database, error) { // Ensure we have some minimal caching and file guarantees if cache < minCache { cache = minCache @@ -187,9 +187,18 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e "handles", handles, "memory table", common.StorageSize(memTableSize)) db := &Database{ - fn: file, - log: logger, - quitChan: make(chan chan error), + fn: file, + log: logger, + quitChan: make(chan chan error), + + // Use asynchronous write mode by default. Otherwise, the overhead of frequent fsync + // operations can be significant, especially on platforms with slow fsync performance + // (e.g., macOS) or less capable SSDs. + // + // Note that enabling async writes means recent data may be lost in the event of an + // application-level panic (writes will also be lost on a machine-level failure, + // of course). Geth is expected to handle recovery from an unclean shutdown. + // writeOptions: pebble.NoSync, writeOptions: pebble.Sync, } opt := &pebble.Options{ @@ -234,6 +243,15 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e }, Levels: make([]pebble.LevelOptions, numLevels), Logger: panicLogger{}, // TODO(karalabe): Delete when this is upstreamed in Pebble + + // Pebble is configured to use asynchronous write mode, meaning write operations + // return as soon as the data is cached in memory, without waiting for the WAL + // to be written. This mode offers better write performance but risks losing + // recent writes if the application crashes or a power failure/system crash occurs. + // + // By setting the WALBytesPerSync, the cached WAL writes will be periodically + // flushed at the background if the accumulated size exceeds this threshold. + WALBytesPerSync: 5 * ethdb.IdealBatchSize, } for i := 0; i < len(opt.Levels); i++ { @@ -433,6 +451,18 @@ func (d *Database) Path() string { return d.fn } +// SyncKeyValue flushes all pending writes in the write-ahead-log to disk, +// ensuring data durability up to that point. +func (d *Database) SyncKeyValue() error { + // The entry (value=nil) is not written to the database; it is only + // added to the WAL. Writing this special log entry in sync mode + // automatically flushes all previous writes, ensuring database + // durability up to this point. + b := d.db.NewBatch() + b.LogData(nil, nil) + return d.db.Apply(b, pebble.Sync) +} + // meter periodically retrieves internal pebble counters and reports them to // the metrics subsystem. func (d *Database) meter(refresh time.Duration, namespace string) { diff --git a/ethdb/pebble/pebble_test.go b/ethdb/pebble/pebble_test.go index 3265491d4a..e703a8d0ce 100644 --- a/ethdb/pebble/pebble_test.go +++ b/ethdb/pebble/pebble_test.go @@ -17,6 +17,7 @@ package pebble import ( + "errors" "testing" "github.com/cockroachdb/pebble" @@ -54,3 +55,26 @@ func BenchmarkPebbleDB(b *testing.B) { } }) } + +func TestPebbleLogData(t *testing.T) { + db, err := pebble.Open("", &pebble.Options{ + FS: vfs.NewMem(), + }) + if err != nil { + t.Fatal(err) + } + + _, _, err = db.Get(nil) + if !errors.Is(err, pebble.ErrNotFound) { + t.Fatal("Unknown database entry") + } + + b := db.NewBatch() + b.LogData(nil, nil) + db.Apply(b, pebble.Sync) + + _, _, err = db.Get(nil) + if !errors.Is(err, pebble.ErrNotFound) { + t.Fatal("Unknown database entry") + } +} diff --git a/ethdb/remotedb/remotedb.go b/ethdb/remotedb/remotedb.go index f6a3151fef..aea0dc2fae 100644 --- a/ethdb/remotedb/remotedb.go +++ b/ethdb/remotedb/remotedb.go @@ -172,7 +172,7 @@ func (db *Database) ResetTable(kind string, startAt uint64, onlyEmpty bool) erro panic("not supported") } -func (db *Database) Sync() error { +func (db *Database) SyncAncient() error { return nil } @@ -200,6 +200,10 @@ func (db *Database) Compact(start []byte, limit []byte) error { return nil } +func (db *Database) SyncKeyValue() error { + return nil +} + func (db *Database) Close() error { db.remote.Close() return nil diff --git a/node/database.go b/node/database.go index b01e535957..3210cf0d1d 100644 --- a/node/database.go +++ b/node/database.go @@ -41,10 +41,6 @@ type openOptions struct { IsLastOffset bool PruneAncientData bool MultiDataBase bool - // Ephemeral means that filesystem sync operations should be avoided: - // data integrity in the face of a crash is not important. This option - // should typically be used in tests. - Ephemeral bool } // openDatabase opens both a disk-based key-value database such as leveldb or pebble, but also @@ -92,7 +88,7 @@ func openKeyValueDatabase(o openOptions) (ethdb.Database, error) { } if o.Type == rawdb.DBPebble || existingDb == rawdb.DBPebble { log.Info("Using pebble as the backing database") - return newPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral) + return newPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly) } if o.Type == rawdb.DBLeveldb || existingDb == rawdb.DBLeveldb { log.Info("Using leveldb as the backing database") @@ -100,7 +96,7 @@ func openKeyValueDatabase(o openOptions) (ethdb.Database, error) { } // No pre-existing database, no user-requested one either. Default to Pebble. log.Info("Defaulting to pebble as the backing database") - return newPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral) + return newPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly) } // newLevelDBDatabase creates a persistent key-value database without a freezer @@ -116,8 +112,8 @@ func newLevelDBDatabase(file string, cache int, handles int, namespace string, r // newPebbleDBDatabase creates a persistent key-value database without a freezer // moving immutable chain segments into cold storage. -func newPebbleDBDatabase(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool) (ethdb.Database, error) { - db, err := pebble.New(file, cache, handles, namespace, readonly, ephemeral) +func newPebbleDBDatabase(file string, cache int, handles int, namespace string, readonly bool) (ethdb.Database, error) { + db, err := pebble.New(file, cache, handles, namespace, readonly) if err != nil { return nil, err } diff --git a/trie/trie_test.go b/trie/trie_test.go index 423ed30fe8..95043a3dce 100644 --- a/trie/trie_test.go +++ b/trie/trie_test.go @@ -824,6 +824,7 @@ func (s *spongeDb) NewBatch() ethdb.Batch { return &spongeBat func (s *spongeDb) NewBatchWithSize(size int) ethdb.Batch { return &spongeBatch{s} } func (s *spongeDb) Stat() (string, error) { panic("implement me") } func (s *spongeDb) Compact(start []byte, limit []byte) error { panic("implement me") } +func (s *spongeDb) SyncKeyValue() error { return nil } func (s *spongeDb) Close() error { return nil } func (s *spongeDb) Put(key []byte, value []byte) error { var ( diff --git a/triedb/pathdb/buffer.go b/triedb/pathdb/buffer.go index a62ab478e0..52393d871c 100644 --- a/triedb/pathdb/buffer.go +++ b/triedb/pathdb/buffer.go @@ -145,10 +145,13 @@ func (b *buffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, node start = time.Now() batch = db.NewBatchWithSize(b.nodes.dbsize() * 11 / 10) // extra 10% for potential pebble internal stuff ) - // Explicitly sync the state freezer, ensuring that all written - // data is transferred to disk before updating the key-value store. + // Explicitly sync the state freezer to ensure all written data is persisted to disk + // before updating the key-value store. + // + // This step is crucial to guarantee that the corresponding state history remains + // available for state rollback. if freezer != nil { - if err := freezer.Sync(); err != nil { + if err := freezer.SyncAncient(); err != nil { return err } } diff --git a/triedb/pathdb/database.go b/triedb/pathdb/database.go index 5116b5aba4..c4a4a05e5a 100644 --- a/triedb/pathdb/database.go +++ b/triedb/pathdb/database.go @@ -475,6 +475,15 @@ func (db *Database) Recover(root common.Hash) error { db.tree.reset(dl) } db.DeleteTrieJournal(db.diskdb) + + // Explicitly sync the key-value store to ensure all recent writes are + // flushed to disk. This step is crucial to prevent a scenario where + // recent key-value writes are lost due to an application panic, while + // the associated state histories have already been removed, resulting + // in the inability to perform a state rollback. + if err := db.diskdb.SyncKeyValue(); err != nil { + return err + } _, err := truncateFromHead(db.diskdb, db.freezer, dl.stateID()) if err != nil { return err