Skip to content

Commit

Permalink
feat(pruner/light): implement light pruning (#3388)
Browse files Browse the repository at this point in the history
Implements light pruning for the pre-Shwap era.

It recursively traverses the tree and deletes the NMT nodes as it goes.

Pruning the whole history takes more than 24 hours.
Pruning recent heights (every 5 mins) takes `~0.5s`.
The historical pruning reduced disk usage from ~62GB to ~38GB.
The RAM usage for active pruning is stable at `~70MB`
  • Loading branch information
Wondertan authored and ramin committed Jun 6, 2024
1 parent 19f7610 commit 97dc30b
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 6 deletions.
1 change: 1 addition & 0 deletions nodebuilder/pruner/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
case node.Light:
return fx.Module("prune",
baseComponents,
fx.Provide(light.NewPruner),
)
case node.Full:
opts := baseComponents
Expand Down
39 changes: 34 additions & 5 deletions pruner/light/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,44 @@ package light
import (
"context"

"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/go-datastore"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/ipld"
)

type Pruner struct{}
// Pruner is the light node pruner. It deletes the nodes stored behind a header's
// row and column roots and then removes the header's root key from the datastore.
type Pruner struct {
	// bserv is the block service the nodes are deleted through.
	bserv blockservice.BlockService
	// ds is the datastore from which the per-root key is deleted.
	ds datastore.Datastore
}

// NewPruner builds a light Pruner from the given blockstore and datastore.
// The blockstore is wrapped with ipld.NewBlockservice, passing nil as its second
// argument, and the datastore is kept as is.
func NewPruner(bstore blockstore.Blockstore, ds datastore.Batching) pruner.Pruner {
	service := ipld.NewBlockservice(bstore, nil)
	return &Pruner{
		bserv: service,
		ds:    ds,
	}
}

func (p *Pruner) Prune(ctx context.Context, h *header.ExtendedHeader) error {
dah := h.DAH
if share.DataHash(dah.Hash()).IsEmptyRoot() {
return nil
}

var roots [][]byte
roots = append(roots, h.DAH.RowRoots...)
roots = append(roots, h.DAH.ColumnRoots...)
for _, root := range roots {
cid := ipld.MustCidFromNamespacedSha256(root)
if err := ipld.DeleteNode(ctx, p.bserv, cid); err != nil {
return err
}
}

func NewPruner() *Pruner {
return &Pruner{}
return p.ds.Delete(ctx, rootKey(dah))
}

func (p *Pruner) Prune(context.Context, *header.ExtendedHeader) error {
return nil
// rootKey derives the datastore key for the given root from its string form.
func rootKey(root *share.Root) datastore.Key {
	name := root.String()
	return datastore.NewKey(name)
}
13 changes: 12 additions & 1 deletion pruner/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,16 @@ func (s *Service) run() {
}

for {
lastPrunedHeader = s.prune(s.ctx, lastPrunedHeader)
// pruning may take a while beyond ticker's time
// and this ensures we don't do idle spins right after the pruning
// and ensures there is always pruneCycle period between each run
ticker.Reset(s.params.pruneCycle)

select {
case <-s.ctx.Done():
return
case <-ticker.C:
lastPrunedHeader = s.prune(s.ctx, lastPrunedHeader)
}
}
}
Expand All @@ -128,6 +133,12 @@ func (s *Service) prune(
// prioritize retrying previously-failed headers
s.retryFailed(s.ctx)

now := time.Now()
log.Debug("pruning round start")
defer func() {
log.Debugw("pruning round finished", "took", time.Since(now))
}()

for {
select {
case <-s.ctx.Done():
Expand Down
29 changes: 29 additions & 0 deletions share/ipld/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package ipld

import (
"context"
"errors"

"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/go-cid"
)

// DeleteNode removes the Node stored under the given CID together with every Node
// reachable through its links.
//
// Linked Nodes are deleted first and the Node itself last. A CID for which GetNode
// reports ErrNodeNotFound is treated as already deleted and yields no error; any
// other error aborts the traversal and is returned unchanged.
func DeleteNode(ctx context.Context, bserv blockservice.BlockService, cid cid.Cid) error {
	node, err := GetNode(ctx, bserv, cid)
	if errors.Is(err, ErrNodeNotFound) {
		// nothing is stored behind this CID, so there is nothing left to delete
		return nil
	}
	if err != nil {
		return err
	}

	// descend into the linked Nodes before removing the Node that links to them
	for _, link := range node.Links() {
		err = DeleteNode(ctx, bserv, link.Cid)
		if err != nil {
			return err
		}
	}

	return bserv.DeleteBlock(ctx, cid)
}
101 changes: 101 additions & 0 deletions share/ipld/delete_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package ipld

import (
"context"
"testing"
"time"

"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange/offline"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/share/sharetest"
)

// TestDeleteNode_FullSquare adds a square of random shares to a blockservice and
// checks that calling DeleteNode on every row and column root leaves the
// underlying blockstore without any keys.
func TestDeleteNode_FullSquare(t *testing.T) {
	const size = 8
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	bServ := NewMemBlockservice()

	// countBlocks drains the blockstore's key channel and reports how many keys it yielded
	countBlocks := func() int {
		keyCh, err := bServ.Blockstore().AllKeysChan(ctx)
		require.NoError(t, err)

		total := 0
		for range keyCh {
			total++
		}
		return total
	}

	eds, err := AddShares(ctx, sharetest.RandShares(t, size*size), bServ)
	require.NoError(t, err)

	// sanity check: adding the shares must have stored something to delete
	require.NotZero(t, countBlocks())

	rowRoots, err := eds.RowRoots()
	require.NoError(t, err)
	for i := range rowRoots {
		require.NoError(t, DeleteNode(ctx, bServ, MustCidFromNamespacedSha256(rowRoots[i])))
	}

	colRoots, err := eds.ColRoots()
	require.NoError(t, err)
	for i := range colRoots {
		require.NoError(t, DeleteNode(ctx, bServ, MustCidFromNamespacedSha256(colRoots[i])))
	}

	require.Zero(t, countBlocks())
}

// TestDeleteNode_Sample fetches a single share into a separate blockservice and
// checks that calling DeleteNode on every row root leaves that blockservice's
// blockstore without any keys.
func TestDeleteNode_Sample(t *testing.T) {
	const size = 8
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// full receives the whole square through AddShares
	full := NewMemBlockservice()
	eds, err := AddShares(ctx, sharetest.RandShares(t, size*size), full)
	require.NoError(t, err)

	rowRoots, err := eds.RowRoots()
	require.NoError(t, err)

	// light has its own blockstore and uses full's blockstore as an offline exchange
	store := blockstore.NewBlockstore(sync.MutexWrap(datastore.NewMapDatastore()))
	light := NewBlockservice(store, offline.Exchange(full.Blockstore()))

	// countBlocks drains light's key channel and reports how many keys it yielded
	countBlocks := func() int {
		keyCh, err := light.Blockstore().AllKeysChan(ctx)
		require.NoError(t, err)

		total := 0
		for range keyCh {
			total++
		}
		return total
	}

	// request the share at index 0 under the first row root through light
	firstRow := MustCidFromNamespacedSha256(rowRoots[0])
	_, err = GetShare(ctx, light, firstRow, 0, len(rowRoots))
	require.NoError(t, err)

	// the request above is expected to have left blocks in light's blockstore
	require.NotZero(t, countBlocks())

	for i := range rowRoots {
		require.NoError(t, DeleteNode(ctx, light, MustCidFromNamespacedSha256(rowRoots[i])))
	}

	require.Zero(t, countBlocks())
}

0 comments on commit 97dc30b

Please sign in to comment.