Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:splitstore:Badger GC of hotstore command #10387

Merged
merged 6 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,14 @@ type FullNode interface {
// nodes.
ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg ChainExportConfig) error //perm:admin

// ChainPrune prunes the stored chain state and garbage collects; only supported if you
// ChainPrune forces compaction on cold store and garbage collects; only supported if you
// are using the splitstore
ChainPrune(ctx context.Context, opts PruneOpts) error //perm:admin

// ChainHotGC does online (badger) GC on the hot store; only supported if you are using
// the splitstore
ChainHotGC(ctx context.Context, opts HotGCOpts) error //perm:admin

// ChainCheckBlockstore performs an (asynchronous) health check on the chain/state blockstore
// if supported by the underlying implementation.
ChainCheckBlockstore(context.Context) error //perm:admin
Expand Down Expand Up @@ -1354,6 +1358,12 @@ type PruneOpts struct {
RetainState int64
}

type HotGCOpts struct {
Threshold float64
Periodic bool
Moving bool
}

type EthTxReceipt struct {
TransactionHash ethtypes.EthHash `json:"transactionHash"`
TransactionIndex ethtypes.EthUint64 `json:"transactionIndex"`
Expand Down
14 changes: 14 additions & 0 deletions api/mocks/mock_full.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 55 additions & 5 deletions blockstore/badger/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
pool "github.com/libp2p/go-buffer-pool"
"github.com/multiformats/go-base32"
"go.uber.org/zap"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/blockstore"
)
Expand All @@ -44,7 +45,8 @@ const (
// MemoryMap is equivalent to badger/options.MemoryMap.
MemoryMap = options.MemoryMap
// LoadToRAM is equivalent to badger/options.LoadToRAM.
LoadToRAM = options.LoadToRAM
LoadToRAM = options.LoadToRAM
defaultGCThreshold = 0.125
)

// Options embeds the badger options themselves, and augments them with
Expand Down Expand Up @@ -439,7 +441,7 @@ func (b *Blockstore) deleteDB(path string) {
}
}

func (b *Blockstore) onlineGC() error {
func (b *Blockstore) onlineGC(ctx context.Context, threshold float64) error {
b.lockDB()
defer b.unlockDB()

Expand All @@ -448,14 +450,22 @@ func (b *Blockstore) onlineGC() error {
if nworkers < 2 {
nworkers = 2
}
if nworkers > 7 { // max out at 1 goroutine per badger level
nworkers = 7
}

err := b.db.Flatten(nworkers)
if err != nil {
return err
}

for err == nil {
err = b.db.RunValueLogGC(0.125)
select {
case <-ctx.Done():
err = ctx.Err()
default:
err = b.db.RunValueLogGC(threshold)
}
}

if err == badger.ErrNoRewrite {
Expand All @@ -468,7 +478,7 @@ func (b *Blockstore) onlineGC() error {

// CollectGarbage compacts and runs garbage collection on the value log;
// implements the BlockstoreGC trait
func (b *Blockstore) CollectGarbage(opts ...blockstore.BlockstoreGCOption) error {
func (b *Blockstore) CollectGarbage(ctx context.Context, opts ...blockstore.BlockstoreGCOption) error {
if err := b.access(); err != nil {
return err
}
Expand All @@ -485,8 +495,48 @@ func (b *Blockstore) CollectGarbage(opts ...blockstore.BlockstoreGCOption) error
if options.FullGC {
return b.movingGC()
}
threshold := options.Threshold
if threshold == 0 {
threshold = defaultGCThreshold
}
return b.onlineGC(ctx, threshold)
}

// GCOnce runs garbage collection on the value log;
// implements BlockstoreGCOnce trait
func (b *Blockstore) GCOnce(ctx context.Context, opts ...blockstore.BlockstoreGCOption) error {
if err := b.access(); err != nil {
return err
}
defer b.viewers.Done()

return b.onlineGC()
var options blockstore.BlockstoreGCOptions
for _, opt := range opts {
err := opt(&options)
if err != nil {
return err
}
}
if options.FullGC {
return xerrors.Errorf("FullGC option specified for GCOnce but full GC is non incremental")
}

threshold := options.Threshold
if threshold == 0 {
threshold = defaultGCThreshold
}

b.lockDB()
defer b.unlockDB()

// Note no compaction needed before single GC as we will hit at most one vlog anyway
err := b.db.RunValueLogGC(threshold)
if err == badger.ErrNoRewrite {
// not really an error in this case, it signals the end of GC
return nil
}

return err
}

// Size returns the aggregate size of the blockstore
Expand Down
4 changes: 2 additions & 2 deletions blockstore/badger/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func testMove(t *testing.T, optsF func(string) Options) {
return nil
})
g.Go(func() error {
return db.CollectGarbage(blockstore.WithFullGC(true))
return db.CollectGarbage(ctx, blockstore.WithFullGC(true))
})

err = g.Wait()
Expand Down Expand Up @@ -230,7 +230,7 @@ func testMove(t *testing.T, optsF func(string) Options) {
checkPath()

// now do another FullGC to test the double move and following of symlinks
if err := db.CollectGarbage(blockstore.WithFullGC(true)); err != nil {
if err := db.CollectGarbage(ctx, blockstore.WithFullGC(true)); err != nil {
t.Fatal(err)
}

Expand Down
16 changes: 15 additions & 1 deletion blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ type BlockstoreIterator interface {

// BlockstoreGC is a trait for blockstores that support online garbage collection
type BlockstoreGC interface {
CollectGarbage(options ...BlockstoreGCOption) error
CollectGarbage(ctx context.Context, options ...BlockstoreGCOption) error
}

// BlockstoreGCOnce is a trait for a blockstore that supports incremental online garbage collection
type BlockstoreGCOnce interface {
GCOnce(ctx context.Context, options ...BlockstoreGCOption) error
}

// BlockstoreGCOption is a functional interface for controlling blockstore GC options
Expand All @@ -45,6 +50,8 @@ type BlockstoreGCOption = func(*BlockstoreGCOptions) error
// BlockstoreGCOptions is a struct with GC options
type BlockstoreGCOptions struct {
FullGC bool
// fraction of garbage in badger vlog before its worth processing in online GC
Threshold float64
}

func WithFullGC(fullgc bool) BlockstoreGCOption {
Expand All @@ -54,6 +61,13 @@ func WithFullGC(fullgc bool) BlockstoreGCOption {
}
}

func WithThreshold(threshold float64) BlockstoreGCOption {
return func(opts *BlockstoreGCOptions) error {
opts.Threshold = threshold
return nil
}
}

// BlockstoreSize is a trait for on-disk blockstores that can report their size
type BlockstoreSize interface {
Size() (int64, error)
Expand Down
2 changes: 1 addition & 1 deletion blockstore/splitstore/splitstore_compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {

// we are done; do some housekeeping
s.endTxnProtect()
s.gcHotstore()
s.gcHotAfterCompaction()

err = s.setBaseEpoch(boundaryEpoch)
if err != nil {
Expand Down
20 changes: 18 additions & 2 deletions blockstore/splitstore/splitstore_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
bstore "github.com/filecoin-project/lotus/blockstore"
)

func (s *SplitStore) gcHotstore() {
func (s *SplitStore) gcHotAfterCompaction() {
var opts []bstore.BlockstoreGCOption
if s.cfg.HotStoreFullGCFrequency > 0 && s.compactionIndex%int64(s.cfg.HotStoreFullGCFrequency) == 0 {
opts = append(opts, bstore.WithFullGC(true))
Expand All @@ -23,7 +23,7 @@ func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts []bstore.BlockstoreG
log.Info("garbage collecting blockstore")
startGC := time.Now()

if err := gc.CollectGarbage(opts...); err != nil {
if err := gc.CollectGarbage(s.ctx, opts...); err != nil {
return err
}

Expand All @@ -33,3 +33,19 @@ func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts []bstore.BlockstoreG

return fmt.Errorf("blockstore doesn't support garbage collection: %T", b)
}

func (s *SplitStore) gcBlockstoreOnce(b bstore.Blockstore, opts []bstore.BlockstoreGCOption) error {
if gc, ok := b.(bstore.BlockstoreGCOnce); ok {
log.Debug("gc blockstore once")
startGC := time.Now()

if err := gc.GCOnce(s.ctx, opts...); err != nil {
return err
}

log.Debugw("gc blockstore once done", "took", time.Since(startGC))
return nil
}

return fmt.Errorf("blockstore doesn't support gc once: %T", b)
}
17 changes: 17 additions & 0 deletions blockstore/splitstore/splitstore_prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,23 @@ var (
PruneThreshold = 7 * build.Finality
)

// GCHotstore runs online GC on the chain state in the hotstore according the to options specified
func (s *SplitStore) GCHotStore(opts api.HotGCOpts) error {
if opts.Moving {
gcOpts := []bstore.BlockstoreGCOption{bstore.WithFullGC(true)}
return s.gcBlockstore(s.hot, gcOpts)
}

gcOpts := []bstore.BlockstoreGCOption{bstore.WithThreshold(opts.Threshold)}
var err error
if opts.Periodic {
err = s.gcBlockstore(s.hot, gcOpts)
} else {
err = s.gcBlockstoreOnce(s.hot, gcOpts)
}
return err
}

// PruneChain instructs the SplitStore to prune chain state in the coldstore, according to the
// options specified.
func (s *SplitStore) PruneChain(opts api.PruneOpts) error {
Expand Down
Binary file modified build/openrpc/full.json.gz
Binary file not shown.
Binary file modified build/openrpc/gateway.json.gz
Binary file not shown.
Binary file modified build/openrpc/miner.json.gz
Binary file not shown.
Binary file modified build/openrpc/worker.json.gz
Binary file not shown.
59 changes: 58 additions & 1 deletion cli/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1591,7 +1591,64 @@ func createExportFile(app *cli.App, path string) (io.WriteCloser, error) {

var ChainPruneCmd = &cli.Command{
Name: "prune",
Usage: "prune the stored chain state and perform garbage collection",
Usage: "splitstore gc",
Subcommands: []*cli.Command{
chainPruneColdCmd,
chainPruneHotGCCmd,
chainPruneHotMovingGCCmd,
},
}

var chainPruneHotGCCmd = &cli.Command{
Name: "hot",
Usage: "run online (badger vlog) garbage collection on hotstore",
Flags: []cli.Flag{
&cli.Float64Flag{Name: "threshold", Value: 0.01, Usage: "Threshold of vlog garbage for gc"},
&cli.BoolFlag{Name: "periodic", Value: false, Usage: "Run periodic gc over multiple vlogs. Otherwise run gc once"},
},
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPIV1(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
opts := lapi.HotGCOpts{}
opts.Periodic = cctx.Bool("periodic")
opts.Threshold = cctx.Float64("threshold")

gcStart := time.Now()
err = api.ChainHotGC(ctx, opts)
gcTime := time.Since(gcStart)
fmt.Printf("Online GC took %v (periodic <%t> threshold <%f>)", gcTime, opts.Periodic, opts.Threshold)
return err
},
}

var chainPruneHotMovingGCCmd = &cli.Command{
Name: "hot-moving",
Usage: "run moving gc on hotstore",
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPIV1(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
opts := lapi.HotGCOpts{}
opts.Moving = true

gcStart := time.Now()
err = api.ChainHotGC(ctx, opts)
gcTime := time.Since(gcStart)
fmt.Printf("Moving GC took %v", gcTime)
return err
},
}

var chainPruneColdCmd = &cli.Command{
Name: "compact-cold",
Usage: "force splitstore compaction on cold store state and run gc",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "online-gc",
Expand Down
Loading