2 changes: 1 addition & 1 deletion cmd/geth/dbcmd.go
@@ -1282,7 +1282,7 @@ func hbss2pbss(ctx *cli.Context) error {
defer stack.Close()

db := utils.MakeChainDatabase(ctx, stack, false, false)
- db.BlockStore().Sync()
+ db.BlockStore().SyncAncient()
stateDiskDb := db.StateStore()
defer db.Close()

8 changes: 4 additions & 4 deletions core/bench_test.go
@@ -183,7 +183,7 @@ func benchInsertChain(b *testing.B, disk bool, gen func(int, *BlockGen)) {
if !disk {
db = rawdb.NewMemoryDatabase()
} else {
- pdb, err := pebble.New(b.TempDir(), 128, 128, "", false, true)
+ pdb, err := pebble.New(b.TempDir(), 128, 128, "", false)
if err != nil {
b.Fatalf("cannot create temporary database: %v", err)
}
@@ -304,7 +304,7 @@ func makeChainForBench(db ethdb.Database, genesis *Genesis, full bool, count uin
func benchWriteChain(b *testing.B, full bool, count uint64) {
genesis := &Genesis{Config: params.AllEthashProtocolChanges}
for i := 0; i < b.N; i++ {
- pdb, err := pebble.New(b.TempDir(), 1024, 128, "", false, true)
+ pdb, err := pebble.New(b.TempDir(), 1024, 128, "", false)
if err != nil {
b.Fatalf("error opening database: %v", err)
}
@@ -317,7 +317,7 @@ func benchWriteChain(b *testing.B, full bool, count uint64) {
func benchReadChain(b *testing.B, full bool, count uint64) {
dir := b.TempDir()

- pdb, err := pebble.New(dir, 1024, 128, "", false, true)
+ pdb, err := pebble.New(dir, 1024, 128, "", false)
if err != nil {
b.Fatalf("error opening database: %v", err)
}
@@ -333,7 +333,7 @@ func benchReadChain(b *testing.B, full bool, count uint64) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
- pdb, err = pebble.New(dir, 1024, 128, "", false, true)
+ pdb, err = pebble.New(dir, 1024, 128, "", false)
if err != nil {
b.Fatalf("error opening database: %v", err)
}
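All of the benchmark fixes above track the same constructor change: pebble.New loses its trailing boolean. A minimal sketch of the new call shape, assuming the surviving parameters keep their usual go-ethereum meaning (directory, cache size in MB, open-file handles, metrics namespace, read-only flag); the purpose of the dropped sixth argument is not documented in this diff:

package main

import (
	"log"

	"github.com/ethereum/go-ethereum/ethdb/pebble"
)

func main() {
	// Five arguments now: directory, cache (MB), handles, namespace, readonly.
	db, err := pebble.New("/tmp/bench-db", 1024, 128, "", false)
	if err != nil {
		log.Fatalf("error opening database: %v", err)
	}
	defer db.Close()
}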
21 changes: 10 additions & 11 deletions core/blockchain.go
@@ -1119,17 +1119,16 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
// Ignore the error here since light client won't hit this path
frozen, _ := bc.db.BlockStore().Ancients()
if num+1 <= frozen {
- // Truncate all relative data(header, total difficulty, body, receipt
- // and canonical hash) from ancient store.
- if _, err := bc.db.BlockStore().TruncateHead(num); err != nil {
-     log.Crit("Failed to truncate ancient data", "number", num, "err", err)
- }
- // Remove the hash <-> number mapping from the active store.
- rawdb.DeleteHeaderNumber(db, hash)
+ // The chain segment, such as the block header, canonical hash,
+ // body, and receipt, will be removed from the ancient store
+ // in one go.
+ //
+ // The hash-to-number mapping in the key-value store will be
+ // removed by the hc.SetHead function.
} else {
- // Remove relative body and receipts from the active store.
- // The header, total difficulty and canonical hash will be
- // removed in the hc.SetHead function.
+ // Remove the associated body and receipts from the key-value store.
+ // The header, hash-to-number mapping, and canonical hash will be
+ // removed by the hc.SetHead function.
rawdb.DeleteBody(db, hash, num)
rawdb.DeleteBlobSidecars(db, hash, num)
rawdb.DeleteReceipts(db, hash, num)
@@ -1599,7 +1598,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
size += writeSize

// Sync the ancient store explicitly to ensure all data has been flushed to disk.
- if err := bc.db.BlockStore().Sync(); err != nil {
+ if err := bc.db.BlockStore().SyncAncient(); err != nil {
return 0, err
}
// Update the current snap block because all block data is now present in DB.
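The rewritten comments change where ancient data is actually deleted: setHeadBeyondRoot no longer truncates the freezer itself and instead defers that work to hc.SetHead. A condensed sketch of the resulting branch (illustrative, not the literal source):

if num+1 <= frozen {
	// Frozen segment: the header, canonical hash, body, and receipts will
	// all be truncated from the ancient store in one go by hc.SetHead,
	// which also drops the hash-to-number mapping.
} else {
	// Live segment: remove the body and receipts from the key-value store
	// here; the header, hash-to-number mapping, and canonical hash are
	// removed later by hc.SetHead.
	rawdb.DeleteBody(db, hash, num)
	rawdb.DeleteBlobSidecars(db, hash, num)
	rawdb.DeleteReceipts(db, hash, num)
}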
8 changes: 4 additions & 4 deletions core/blockchain_repair_test.go
@@ -1767,7 +1767,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
datadir := t.TempDir()
ancient := filepath.Join(datadir, "ancient")

- pdb, err := pebble.New(datadir, 0, 0, "", false, true)
+ pdb, err := pebble.New(datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to create persistent key-value database: %v", err)
}
@@ -1861,7 +1861,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
chain.stopWithoutSaving()

// Start a new blockchain back up and see where the repair leads us
- pdb, err = pebble.New(datadir, 0, 0, "", false, true)
+ pdb, err = pebble.New(datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to reopen persistent key-value database: %v", err)
}
@@ -1926,7 +1926,7 @@ func testIssue23496(t *testing.T, scheme string) {
datadir := t.TempDir()
ancient := filepath.Join(datadir, "ancient")

- pdb, err := pebble.New(datadir, 0, 0, "", false, true)
+ pdb, err := pebble.New(datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to create persistent key-value database: %v", err)
}
@@ -1984,7 +1984,7 @@ func testIssue23496(t *testing.T, scheme string) {
chain.stopWithoutSaving()

// Start a new blockchain back up and see where the repair leads us
- pdb, err = pebble.New(datadir, 0, 0, "", false, true)
+ pdb, err = pebble.New(datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to reopen persistent key-value database: %v", err)
}
2 changes: 1 addition & 1 deletion core/blockchain_sethead_test.go
@@ -1971,7 +1971,7 @@ func testSetHeadWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme
datadir := t.TempDir()
ancient := filepath.Join(datadir, "ancient")

- pdb, err := pebble.New(datadir, 0, 0, "", false, true)
+ pdb, err := pebble.New(datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to create persistent key-value database: %v", err)
}
4 changes: 2 additions & 2 deletions core/blockchain_snapshot_test.go
@@ -66,7 +66,7 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo
datadir := t.TempDir()
ancient := filepath.Join(datadir, "ancient")

- pdb, err := pebble.New(datadir, 0, 0, "", false, true)
+ pdb, err := pebble.New(datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to create persistent key-value database: %v", err)
}
@@ -257,7 +257,7 @@ func (snaptest *crashSnapshotTest) test(t *testing.T) {
chain.triedb.Close()

// Start a new blockchain back up and see where the repair leads us
- pdb, err := pebble.New(snaptest.datadir, 0, 0, "", false, true)
+ pdb, err := pebble.New(snaptest.datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to create persistent key-value database: %v", err)
}
2 changes: 1 addition & 1 deletion core/blockchain_test.go
@@ -2670,7 +2670,7 @@ func testSideImportPrunedBlocks(t *testing.T, scheme string) {
datadir := t.TempDir()
ancient := path.Join(datadir, "ancient")

- pdb, err := pebble.New(datadir, 0, 0, "", false, true)
+ pdb, err := pebble.New(datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to create persistent key-value database: %v", err)
}
35 changes: 34 additions & 1 deletion core/headerchain.go
@@ -695,18 +695,51 @@ func (hc *HeaderChain) setHead(headBlock uint64, headTime uint64, updateFn Updat
hashes = append(hashes, hdr.Hash())
}
for _, hash := range hashes {
+ // Remove the associated block body and receipts if required.
+ //
+ // If the block is in the chain freezer, then this delete operation
+ // is actually ineffective.
if delFn != nil {
delFn(blockBatch, hash, num)
}
+ // Remove the hash->number mapping along with the header itself
rawdb.DeleteHeader(blockBatch, hash, num)
rawdb.DeleteTd(blockBatch, hash, num)
}
+ // Remove the number->hash mapping
rawdb.DeleteCanonicalHash(blockBatch, num)
}
}
// Flush all accumulated deletions.
if err := blockBatch.Write(); err != nil {
log.Crit("Failed to rewind block", "error", err)
log.Crit("Failed to commit batch in setHead", "err", err)
}
+ // Explicitly flush the pending writes in the key-value store to disk, ensuring
+ // data durability of the previous deletions.
+ if err := hc.chainDb.SyncKeyValue(); err != nil {
+     log.Crit("Failed to sync the key-value store in setHead", "err", err)
+ }
+ // Truncate the excessive chain segments in the ancient store.
+ // These are actually deferred deletions from the loop above.
+ //
+ // This step must be performed after synchronizing the key-value store;
+ // otherwise, in the event of a panic, it's theoretically possible to
+ // lose recent key-value store writes while the ancient store deletions
+ // remain, leading to data inconsistency, e.g., the gap between the key
+ // value store and ancient can be created due to unclean shutdown.
+ if delFn != nil {
+     // Ignore the error here since light client won't hit this path
+     frozen, _ := hc.chainDb.Ancients()
+     header := hc.CurrentHeader()
+
+     // Truncate the excessive chain segment above the current chain head
+     // in the ancient store.
+     if header.Number.Uint64()+1 < frozen {
+         _, err := hc.chainDb.BlockStore().TruncateHead(header.Number.Uint64() + 1)
+         if err != nil {
+             log.Crit("Failed to truncate head block", "err", err)
+         }
+     }
+ }
// Clear out any stale content from the caches
hc.headerCache.Purge()
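The heart of the PR is the ordering this new setHead code enforces. Reduced to three steps (a sketch reusing the names from the hunk above; head abbreviates hc.CurrentHeader().Number.Uint64()):

// 1. Stage every key-value deletion in a batch and commit it.
if err := blockBatch.Write(); err != nil {
	log.Crit("Failed to commit batch in setHead", "err", err)
}
// 2. Fsync the key-value store so those deletions are durable on disk.
if err := hc.chainDb.SyncKeyValue(); err != nil {
	log.Crit("Failed to sync the key-value store in setHead", "err", err)
}
// 3. Only then truncate the ancient store above the new head. A crash
// between steps 2 and 3 leaves the freezer merely longer than the head,
// which startup repair can fix; the reverse order could leave a gap
// between the two stores.
if head+1 < frozen {
	if _, err := hc.chainDb.BlockStore().TruncateHead(head + 1); err != nil {
		log.Crit("Failed to truncate head block", "err", err)
	}
}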
2 changes: 1 addition & 1 deletion core/rawdb/chain_freezer.go
@@ -309,7 +309,7 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
continue
}
// Batch of blocks have been frozen, flush them before wiping from key-value store
- if err := f.Sync(); err != nil {
+ if err := f.SyncAncient(); err != nil {
log.Crit("Failed to flush frozen tables", "err", err)
}
// Wipe out all data from the active database
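Note that the freeze path applies the mirror-image ordering to setHead: newly frozen data is fsynced into the ancient store before its key-value copies are wiped, so a crash in between can at worst leave duplicates, never a loss. Sketched:

// Make the frozen batch durable in the ancient store first...
if err := f.SyncAncient(); err != nil {
	log.Crit("Failed to flush frozen tables", "err", err)
}
// ...and only then wipe the same range from the key-value store
// (the wipe logic itself is elided in this sketch).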
8 changes: 4 additions & 4 deletions core/rawdb/database.go
@@ -235,8 +235,8 @@ func (db *nofreezedb) ResetTable(kind string, startAt uint64, onlyEmpty bool) er
return errNotSupported
}

- // Sync returns an error as we don't have a backing chain freezer.
- func (db *nofreezedb) Sync() error {
+ // SyncAncient returns an error as we don't have a backing chain freezer.
+ func (db *nofreezedb) SyncAncient() error {
return errNotSupported
}

@@ -391,8 +391,8 @@ func (db *emptyfreezedb) ResetTable(kind string, startAt uint64, onlyEmpty bool)
return nil
}

- // Sync returns nil for pruned db that we don't have a backing chain freezer.
- func (db *emptyfreezedb) Sync() error {
+ // SyncAncient returns nil for pruned db that we don't have a backing chain freezer.
+ func (db *emptyfreezedb) SyncAncient() error {
return nil
}

6 changes: 3 additions & 3 deletions core/rawdb/freezer.go
@@ -386,8 +386,8 @@ func (f *Freezer) TruncateTail(tail uint64) (uint64, error) {
return old, nil
}

- // Sync flushes all data tables to disk.
- func (f *Freezer) Sync() error {
+ // SyncAncient flushes all data tables to disk.
+ func (f *Freezer) SyncAncient() error {
var errs []error
for _, table := range f.tables {
if err := table.Sync(); err != nil {
@@ -600,7 +600,7 @@ func (f *Freezer) ResetTable(kind string, startAt uint64, onlyEmpty bool) error
return nil
}

- if err := f.Sync(); err != nil {
+ if err := f.SyncAncient(); err != nil {
return err
}
nt, err := t.resetItems(startAt - f.offset)
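For the file-backed freezer, SyncAncient fans out to every data table. The hunk above is cut off before the error aggregation; a sketch of the likely shape, hedged on how errs is folded up (the "[closed closed]" expectation in the test file below suggests the slice is formatted directly rather than joined):

func (f *Freezer) SyncAncient() error {
	var errs []error
	for _, table := range f.tables {
		if err := table.Sync(); err != nil {
			errs = append(errs, err)
		}
	}
	if errs != nil {
		return fmt.Errorf("%v", errs) // e.g. "[closed closed]" with two closed tables
	}
	return nil
}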
4 changes: 2 additions & 2 deletions core/rawdb/freezer_memory.go
@@ -410,8 +410,8 @@ func (f *MemoryFreezer) TruncateTail(tail uint64) (uint64, error) {
return old, nil
}

- // Sync flushes all data tables to disk.
- func (f *MemoryFreezer) Sync() error {
+ // SyncAncient flushes all data tables to disk.
+ func (f *MemoryFreezer) SyncAncient() error {
return nil
}

6 changes: 3 additions & 3 deletions core/rawdb/freezer_resettable.go
@@ -226,12 +226,12 @@ func (f *resettableFreezer) ResetTable(kind string, startAt uint64, onlyEmpty bo
return f.freezer.ResetTable(kind, startAt, onlyEmpty)
}

- // Sync flushes all data tables to disk.
- func (f *resettableFreezer) Sync() error {
+ // SyncAncient flushes all data tables to disk.
+ func (f *resettableFreezer) SyncAncient() error {
f.lock.RLock()
defer f.lock.RUnlock()

- return f.freezer.Sync()
+ return f.freezer.SyncAncient()
}

// AncientDatadir returns the path of the ancient store.
2 changes: 1 addition & 1 deletion core/rawdb/freezer_test.go
@@ -493,7 +493,7 @@ func TestFreezerCloseSync(t *testing.T) {
if err := f.Close(); err != nil {
t.Fatal(err)
}
- if err := f.Sync(); err == nil {
+ if err := f.SyncAncient(); err == nil {
t.Fatalf("want error, have nil")
} else if have, want := err.Error(), "[closed closed]"; have != want {
t.Fatalf("want %v, have %v", have, want)
10 changes: 5 additions & 5 deletions core/rawdb/prunedfreezer.go
@@ -106,7 +106,7 @@ func (f *prunedfreezer) repair(datadir string) error {
log.Info("Read ancient db item counts", "items", minItems, "frozen", maxOffset)

atomic.StoreUint64(&f.frozen, maxOffset)
- if err := f.Sync(); err != nil {
+ if err := f.SyncAncient(); err != nil {
return nil
}
return nil
@@ -117,7 +117,7 @@ func (f *prunedfreezer) Close() error {
var err error
f.closeOnce.Do(func() {
close(f.quit)
- f.Sync()
+ f.SyncAncient()
err = f.instanceLock.Release()
})
return err
@@ -200,8 +200,8 @@ func (f *prunedfreezer) TruncateTail(tail uint64) (uint64, error) {
return 0, errNotSupported
}

- // Sync flushes meta data tables to disk.
- func (f *prunedfreezer) Sync() error {
+ // SyncAncient flushes meta data tables to disk.
+ func (f *prunedfreezer) SyncAncient() error {
WriteFrozenOfAncientFreezer(f.db, atomic.LoadUint64(&f.frozen))
// compatible offline prune blocks tool
WriteOffSetOfCurrentAncientFreezer(f.db, atomic.LoadUint64(&f.frozen))
@@ -315,7 +315,7 @@ func (f *prunedfreezer) freeze() {
ancients = append(ancients, hash)
}
// Batch of blocks have been frozen, flush them before wiping from leveldb
- if err := f.Sync(); err != nil {
+ if err := f.SyncAncient(); err != nil {
log.Crit("Failed to flush frozen tables", "err", err)
}
backoff = f.frozen-first >= freezerBatchLimit
12 changes: 9 additions & 3 deletions core/rawdb/table.go
@@ -143,10 +143,10 @@ func (t *table) TruncateTail(items uint64) (uint64, error) {
return t.db.TruncateTail(items)
}

- // Sync is a noop passthrough that just forwards the request to the underlying
+ // SyncAncient is a noop passthrough that just forwards the request to the underlying
// database.
- func (t *table) Sync() error {
-     return t.db.Sync()
+ func (t *table) SyncAncient() error {
+     return t.db.SyncAncient()
}

// AncientDatadir returns the ancient datadir of the underlying database.
@@ -224,6 +224,12 @@ func (t *table) Compact(start []byte, limit []byte) error {
return t.db.Compact(start, limit)
}

+ // SyncKeyValue ensures that all pending writes are flushed to disk,
+ // guaranteeing data durability up to the point.
+ func (t *table) SyncKeyValue() error {
+     return t.db.SyncKeyValue()
+ }

// NewBatch creates a write-only database that buffers changes to its host db
// until a final write is called, each operation prefixing all keys with the
// pre-configured string.
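Both table.go hunks follow the same pattern: the wrapper prefixes every key it reads and writes, but durability belongs to the shared backing store, so the sync methods forward untouched. A sketch of the contrast, assuming the wrapper keeps its namespace in a prefix string field as upstream geth does:

// Keys are transformed on the way in...
func (t *table) Put(key []byte, value []byte) error {
	return t.db.Put(append([]byte(t.prefix), key...), value)
}

// ...but a sync has no key to prefix; it simply flushes the backing store.
func (t *table) SyncKeyValue() error {
	return t.db.SyncKeyValue()
}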
14 changes: 11 additions & 3 deletions ethdb/database.go
@@ -54,6 +54,13 @@ type KeyValueStater interface {
Stat() (string, error)
}

+ // KeyValueSyncer wraps the SyncKeyValue method of a backing data store.
+ type KeyValueSyncer interface {
+     // SyncKeyValue ensures that all pending writes are flushed to disk,
+     // guaranteeing data durability up to the point.
+     SyncKeyValue() error
+ }

// Compacter wraps the Compact method of a backing data store.
type Compacter interface {
// Compact flattens the underlying data store for the given key range. In essence,
@@ -72,6 +79,7 @@ type KeyValueStore interface {
KeyValueReader
KeyValueWriter
KeyValueStater
+ KeyValueSyncer
KeyValueRangeDeleter
Batcher
Iteratee
@@ -129,6 +137,9 @@ type AncientWriter interface {
// The integer return value is the total size of the written data.
ModifyAncients(func(AncientWriteOp) error) (int64, error)

+ // SyncAncient flushes all in-memory ancient store data to disk.
+ SyncAncient() error

// TruncateHead discards all but the first n ancient data from the ancient store.
// After the truncation, the latest item can be accessed it item_n-1(start from 0).
TruncateHead(n uint64) (uint64, error)
@@ -140,9 +151,6 @@
// will be removed all together.
TruncateTail(n uint64) (uint64, error)

- // Sync flushes all in-memory ancient store data to disk.
- Sync() error

// TruncateTableTail will truncate certain table to new tail
TruncateTableTail(kind string, tail uint64) (uint64, error)

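With the interface split in place, call sites can request exactly the durability they need: SyncKeyValue for the key-value half, SyncAncient for the freezer half. A minimal usage sketch (the syncAll helper is hypothetical, not part of this PR):

import "github.com/ethereum/go-ethereum/ethdb"

// syncAll flushes both halves of a database implementing the new interfaces:
// pending key-value writes first, then the in-memory ancient data.
func syncAll(db interface {
	ethdb.KeyValueSyncer
	ethdb.AncientWriter
}) error {
	if err := db.SyncKeyValue(); err != nil {
		return err
	}
	return db.SyncAncient()
}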