Skip to content

Commit

Permalink
Merge pull request #189 from OffchainLabs/bloom-file-roots
Browse files Browse the repository at this point in the history
Store bloom filter roots in the file content
  • Loading branch information
PlasmaPower authored Dec 29, 2022
2 parents 3da051a + 5521226 commit 2dfaad1
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 61 deletions.
49 changes: 40 additions & 9 deletions core/state/pruner/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@
package pruner

import (
"bufio"
"encoding/binary"
"errors"
"fmt"
"io"
"os"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
bloomfilter "github.com/holiman/bloomfilter/v2"
)

Expand Down Expand Up @@ -73,27 +77,54 @@ func newStateBloomWithSize(size uint64) (*stateBloom, error) {

// NewStateBloomFromDisk loads the state bloom from the given file.
// In this case the assumption is held the bloom filter is complete.
func NewStateBloomFromDisk(filename string) (*stateBloom, error) {
bloom, _, err := bloomfilter.ReadFile(filename)
func NewStateBloomFromDisk(filename string) (*stateBloom, []common.Hash, error) {
f, err := os.Open(filename)
if err != nil {
return nil, err
return nil, nil, err
}
return &stateBloom{bloom: bloom}, nil
defer f.Close()
r := bufio.NewReader(f)
version := []byte{0}
_, err = io.ReadFull(r, version)
if err != nil {
return nil, nil, err
}
if version[0] != 0 {
return nil, nil, fmt.Errorf("unknown state bloom filter version %v", version[0])
}
var roots []common.Hash
err = rlp.Decode(r, &roots)
if err != nil {
return nil, nil, err
}
bloom, _, err := bloomfilter.ReadFrom(r)
if err != nil {
return nil, nil, err
}
return &stateBloom{bloom: bloom}, roots, nil
}

// Commit flushes the bloom filter content into the disk and marks the bloom
// as complete.
func (bloom *stateBloom) Commit(filename, tempname string) error {
// Write the bloom out into a temporary file
_, err := bloom.bloom.WriteFile(tempname)
func (bloom *stateBloom) Commit(filename, tempname string, roots []common.Hash) error {
f, err := os.OpenFile(tempname, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
return err
}
// Ensure the file is synced to disk
f, err := os.OpenFile(tempname, os.O_RDWR, 0666)
_, err = f.Write([]byte{0}) // version
if err != nil {
return err
}
err = rlp.Encode(f, roots)
if err != nil {
return err
}
// Write the bloom out into a temporary file
_, err = bloom.bloom.WriteTo(f)
if err != nil {
return err
}
// Ensure the file is synced to disk
if err := f.Sync(); err != nil {
f.Close()
return err
Expand Down
75 changes: 23 additions & 52 deletions core/state/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"time"

Expand All @@ -43,11 +42,8 @@ import (
)

const (
// stateBloomFilePrefix is the filename prefix of state bloom filter.
stateBloomFilePrefix = "statebloom"

// stateBloomFilePrefix is the filename suffix of state bloom filter.
stateBloomFileSuffix = "bf.gz"
// stateBloomFileName is the filename of state bloom filter.
stateBloomFileName = "statebloom.bf.gz"

// stateBloomFileTempSuffix is the filename suffix of state bloom filter
// while it is being written out to detect write aborts.
Expand Down Expand Up @@ -439,11 +435,11 @@ func (p *Pruner) Prune(inputRoots []common.Hash) error {
// reuse it for pruning instead of generating a new one. It's
// mandatory because a part of state may already be deleted,
// the recovery procedure is necessary.
_, stateBloomRoots, err := findBloomFilter(p.datadir)
bloomExists, err := bloomFilterExists(p.datadir)
if err != nil {
return err
}
if len(stateBloomRoots) > 0 {
if bloomExists {
return RecoverPruning(p.datadir, p.db, p.trieCachePath)
}
// Retrieve all snapshot layers from the current HEAD.
Expand Down Expand Up @@ -525,13 +521,13 @@ func (p *Pruner) Prune(inputRoots []common.Hash) error {
if err := extractGenesis(p.db, p.stateBloom); err != nil {
return err
}
filterName := bloomFilterName(p.datadir, roots)
filterName := bloomFilterPath(p.datadir)

log.Info("Writing state bloom to disk", "name", filterName)
if err := p.stateBloom.Commit(filterName, filterName+stateBloomFileTempSuffix); err != nil {
log.Info("Writing state bloom to disk", "name", filterName, "roots", roots)
if err := p.stateBloom.Commit(filterName, filterName+stateBloomFileTempSuffix, roots); err != nil {
return err
}
log.Info("State bloom filter committed", "name", filterName)
log.Info("State bloom filter committed", "name", filterName, "roots", roots)
return prune(p.snaptree, roots, p.db, p.stateBloom, filterName, start)
}

Expand All @@ -543,11 +539,11 @@ func (p *Pruner) Prune(inputRoots []common.Hash) error {
// pruning **has to be resumed**. Otherwise a lot of dangling nodes may be left
// in the disk.
func RecoverPruning(datadir string, db ethdb.Database, trieCachePath string) error {
stateBloomPath, stateBloomRoots, err := findBloomFilter(datadir)
exists, err := bloomFilterExists(datadir)
if err != nil {
return err
}
if stateBloomPath == "" {
if !exists {
return nil // nothing to recover
}
headBlock := rawdb.ReadHeadBlock(db)
Expand All @@ -566,11 +562,12 @@ func RecoverPruning(datadir string, db ethdb.Database, trieCachePath string) err
if err != nil {
return err // The relevant snapshot(s) might not exist
}
stateBloom, err := NewStateBloomFromDisk(stateBloomPath)
stateBloomPath := bloomFilterPath(datadir)
stateBloom, stateBloomRoots, err := NewStateBloomFromDisk(stateBloomPath)
if err != nil {
return err
}
log.Info("Loaded state bloom filter", "path", stateBloomPath)
log.Info("Loaded state bloom filter", "path", stateBloomPath, "roots", stateBloomRoots)

// Before start the pruning, delete the clean trie cache first.
// It's necessary otherwise in the next restart we will hit the
Expand All @@ -595,45 +592,19 @@ func extractGenesis(db ethdb.Database, stateBloom *stateBloom) error {
return dumpRawTrieDescendants(db, genesis.Root(), stateBloom)
}

func bloomFilterName(datadir string, hashes []common.Hash) string {
path := filepath.Join(datadir, stateBloomFilePrefix)
for _, hash := range hashes {
path += "." + hash.String()
}
return path + "." + stateBloomFileSuffix
}

func isBloomFilter(filename string) (bool, []common.Hash) {
filename = filepath.Base(filename)
if strings.HasPrefix(filename, stateBloomFilePrefix) && strings.HasSuffix(filename, stateBloomFileSuffix) {
parts := strings.Split(filename[len(stateBloomFilePrefix)+1:len(filename)-len(stateBloomFileSuffix)-1], ".")
var roots []common.Hash
for _, part := range parts {
roots = append(roots, common.HexToHash(part))
}
return true, roots
}
return false, nil
func bloomFilterPath(datadir string) string {
return filepath.Join(datadir, stateBloomFileName)
}

func findBloomFilter(datadir string) (string, []common.Hash, error) {
var (
stateBloomPath string
stateBloomRoots []common.Hash
)
if err := filepath.Walk(datadir, func(path string, info os.FileInfo, err error) error {
if info != nil && !info.IsDir() {
ok, roots := isBloomFilter(path)
if ok {
stateBloomPath = path
stateBloomRoots = roots
}
}
return nil
}); err != nil {
return "", nil, err
func bloomFilterExists(datadir string) (bool, error) {
_, err := os.Stat(bloomFilterPath(datadir))
if errors.Is(err, os.ErrNotExist) {
return false, nil
} else if err != nil {
return false, err
} else {
return true, nil
}
return stateBloomPath, stateBloomRoots, nil
}

const warningLog = `
Expand Down

0 comments on commit 2dfaad1

Please sign in to comment.