Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(share/availability): Move window and constants to share/availability pkg #3906

Merged
merged 6 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions core/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package core
import (
"context"
"fmt"
"time"

"github.com/tendermint/tendermint/types"

Expand All @@ -15,9 +16,8 @@ import (
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/pruner/full"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability"
"github.com/celestiaorg/celestia-node/store"
)

Expand Down Expand Up @@ -61,16 +61,16 @@ func storeEDS(
eh *header.ExtendedHeader,
eds *rsmt2d.ExtendedDataSquare,
store *store.Store,
window pruner.AvailabilityWindow,
window time.Duration,
) error {
if !pruner.IsWithinAvailabilityWindow(eh.Time(), window) {
if !availability.IsWithinWindow(eh.Time(), window) {
log.Debugw("skipping storage of historic block", "height", eh.Height())
return nil
}

var err error
// archival nodes should not store Q4 outside the availability window.
if pruner.IsWithinAvailabilityWindow(eh.Time(), full.Window) {
if availability.IsWithinWindow(eh.Time(), availability.StorageWindow) {
err = store.PutODSQ4(ctx, eh.DAH, eh.Height(), eds)
} else {
err = store.PutODS(ctx, eh.DAH, eh.Height(), eds)
Expand Down
3 changes: 1 addition & 2 deletions core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
libhead "github.com/celestiaorg/go-header"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/store"
)

Expand All @@ -23,7 +22,7 @@ type Exchange struct {
store *store.Store
construct header.ConstructFn

availabilityWindow pruner.AvailabilityWindow
availabilityWindow time.Duration

metrics *exchangeMetrics
}
Expand Down
3 changes: 1 addition & 2 deletions core/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/celestiaorg/celestia-app/v3/test/util/testnode"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/store"
)
Expand Down Expand Up @@ -82,7 +81,7 @@ func TestExchange_DoNotStoreHistoric(t *testing.T) {
fetcher,
store,
header.MakeExtendedHeader,
WithAvailabilityWindow(pruner.AvailabilityWindow(time.Nanosecond)), // all blocks will be "historic"
WithAvailabilityWindow(time.Nanosecond), // all blocks will be "historic"
)
require.NoError(t, err)

Expand Down
3 changes: 1 addition & 2 deletions core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
libhead "github.com/celestiaorg/go-header"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
"github.com/celestiaorg/celestia-node/store"
)
Expand All @@ -38,7 +37,7 @@ type Listener struct {

construct header.ConstructFn
store *store.Store
availabilityWindow pruner.AvailabilityWindow
availabilityWindow time.Duration

headerBroadcaster libhead.Broadcaster[*header.ExtendedHeader]
hashBroadcaster shrexsub.BroadcastFn
Expand Down
3 changes: 1 addition & 2 deletions core/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/celestiaorg/celestia-node/header"
nodep2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
"github.com/celestiaorg/celestia-node/store"
)
Expand Down Expand Up @@ -118,7 +117,7 @@ func TestListener_DoesNotStoreHistoric(t *testing.T) {
require.NoError(t, err)

// create Listener and start listening
opt := WithAvailabilityWindow(pruner.AvailabilityWindow(time.Nanosecond))
opt := WithAvailabilityWindow(time.Nanosecond)
cl := createListener(ctx, t, fetcher, ps0, eds, store, testChainID, opt)

dataRoots := generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx)
Expand Down
10 changes: 5 additions & 5 deletions core/option.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
package core

import (
"time"

"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/pruner/archival"
)

type Option func(*params)

type params struct {
metrics bool
chainID string
availabilityWindow pruner.AvailabilityWindow
availabilityWindow time.Duration
}

func defaultParams() params {
return params{
availabilityWindow: archival.Window,
availabilityWindow: time.Duration(0),
}
}

Expand All @@ -34,7 +34,7 @@ func WithChainID(id p2p.Network) Option {
}
}

func WithAvailabilityWindow(window pruner.AvailabilityWindow) Option {
func WithAvailabilityWindow(window time.Duration) Option {
return func(p *params) {
p.availabilityWindow = window
}
Expand Down
4 changes: 2 additions & 2 deletions das/daser.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
libhead "github.com/celestiaorg/go-header"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
)
Expand Down Expand Up @@ -162,7 +162,7 @@ func (d *DASer) Stop(ctx context.Context) error {
func (d *DASer) sample(ctx context.Context, h *header.ExtendedHeader) error {
// short-circuit if pruning is enabled and the header is outside the
// availability window
if !pruner.IsWithinAvailabilityWindow(h.Time(), d.params.samplingWindow) {
if !availability.IsWithinWindow(h.Time(), d.params.samplingWindow) {
log.Debugw("skipping header outside sampling window", "height", h.Height(),
"time", h.Time())
return errOutsideSamplingWindow
Expand Down
6 changes: 3 additions & 3 deletions das/daser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/header/headertest"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability"
"github.com/celestiaorg/celestia-node/share/availability/mocks"
"github.com/celestiaorg/celestia-node/share/eds/edstest"
)
Expand Down Expand Up @@ -254,7 +254,7 @@ func TestDASer_SamplingWindow(t *testing.T) {

// create and start DASer
daser, err := NewDASer(avail, sub, getter, ds, fserv, newBroadcastMock(1),
WithSamplingWindow(pruner.AvailabilityWindow(time.Second)))
WithSamplingWindow(time.Second))
require.NoError(t, err)

tests := []struct {
Expand All @@ -276,7 +276,7 @@ func TestDASer_SamplingWindow(t *testing.T) {
assert.Equal(
t,
tt.withinWindow,
pruner.IsWithinAvailabilityWindow(eh.Time(), daser.params.samplingWindow),
availability.IsWithinWindow(eh.Time(), daser.params.samplingWindow),
)
})
}
Expand Down
6 changes: 2 additions & 4 deletions das/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"errors"
"fmt"
"time"

"github.com/celestiaorg/celestia-node/pruner"
)

// ErrInvalidOption is an error that is returned by Parameters.Validate
Expand Down Expand Up @@ -47,7 +45,7 @@ type Parameters struct {
// samplingWindow determines the time window that headers should fall into
// in order to be sampled. If set to 0, the sampling window will include
// all headers.
samplingWindow pruner.AvailabilityWindow
samplingWindow time.Duration
}

// DefaultParameters returns the default configuration values for the daser parameters
Expand Down Expand Up @@ -166,7 +164,7 @@ func WithSampleTimeout(sampleTimeout time.Duration) Option {

// WithSamplingWindow is a functional option to configure the DASer's
// `samplingWindow` parameter.
func WithSamplingWindow(samplingWindow pruner.AvailabilityWindow) Option {
func WithSamplingWindow(samplingWindow time.Duration) Option {
return func(d *DASer) {
d.params.samplingWindow = samplingWindow
}
Expand Down
6 changes: 3 additions & 3 deletions nodebuilder/das/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/celestiaorg/celestia-node/das"
"github.com/celestiaorg/celestia-node/header"
modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
"github.com/celestiaorg/celestia-node/pruner"
modshare "github.com/celestiaorg/celestia-node/nodebuilder/share"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
Expand Down Expand Up @@ -45,10 +45,10 @@ func newDASer(
batching datastore.Batching,
fraudServ fraud.Service[*header.ExtendedHeader],
bFn shrexsub.BroadcastFn,
availWindow pruner.AvailabilityWindow,
availWindow modshare.Window,
options ...das.Option,
) (*das.DASer, *modfraud.ServiceBreaker[*das.DASer, *header.ExtendedHeader], error) {
options = append(options, das.WithSamplingWindow(availWindow))
options = append(options, das.WithSamplingWindow(availWindow.Duration()))

ds, err := das.NewDASer(da, hsub, store, batching, fraudServ, bFn, options...)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions nodebuilder/pruner/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@ import (

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
modshare "github.com/celestiaorg/celestia-node/nodebuilder/share"
"github.com/celestiaorg/celestia-node/pruner"
)

func newPrunerService(
p pruner.Pruner,
window pruner.AvailabilityWindow,
window modshare.Window,
getter libhead.Store[*header.ExtendedHeader],
ds datastore.Batching,
opts ...pruner.Option,
) (*pruner.Service, error) {
serv, err := pruner.NewService(p, window, getter, ds, p2p.BlockTime, opts...)
serv, err := pruner.NewService(p, window.Duration(), getter, ds, p2p.BlockTime, opts...)
if err != nil {
return nil, err
}
Expand Down
36 changes: 27 additions & 9 deletions nodebuilder/pruner/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pruner

import (
"context"
"time"

"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
Expand All @@ -10,10 +11,12 @@ import (
"github.com/celestiaorg/celestia-node/core"
"github.com/celestiaorg/celestia-node/libs/fxutil"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
modshare "github.com/celestiaorg/celestia-node/nodebuilder/share"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/pruner/archival"
"github.com/celestiaorg/celestia-node/pruner/full"
"github.com/celestiaorg/celestia-node/pruner/light"
"github.com/celestiaorg/celestia-node/share/availability"
"github.com/celestiaorg/celestia-node/share/availability/light"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/discovery"
)

var log = logging.Logger("module/pruner")
Expand All @@ -22,6 +25,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
baseComponents := fx.Options(
fx.Supply(cfg),
availWindow(tp, cfg.EnableService),
advertiseArchival(tp, cfg),
)

prunerService := fx.Options(
Expand All @@ -45,6 +49,9 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
return fx.Module("prune",
baseComponents,
prunerService,
// TODO(@walldiss @renaynay): remove conversion after Availability and Pruner interfaces are merged
// note this provide exists in pruner module to avoid cyclical imports
fx.Provide(func(la *light.ShareAvailability) pruner.Pruner { return la }),
)
}
// We do not trigger DetectPreviousRun for Light nodes, to allow them to disable pruning at wish.
Expand Down Expand Up @@ -73,8 +80,8 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
baseComponents,
prunerService,
fxutil.ProvideAs(full.NewPruner, new(pruner.Pruner)),
fx.Provide(func(window pruner.AvailabilityWindow) []core.Option {
return []core.Option{core.WithAvailabilityWindow(window)}
fx.Provide(func(window modshare.Window) []core.Option {
return []core.Option{core.WithAvailabilityWindow(window.Duration())}
}),
)
}
Expand All @@ -92,20 +99,31 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
}
}

func advertiseArchival(tp node.Type, pruneCfg *Config) fx.Option {
if (tp == node.Full || tp == node.Bridge) && !pruneCfg.EnableService {
return fx.Supply(discovery.WithAdvertise())
}
return fx.Provide(func() discovery.Option {
var opt discovery.Option
return opt
})
}

func availWindow(tp node.Type, pruneEnabled bool) fx.Option {
switch tp {
case node.Light:
// light nodes are still subject to sampling within window
// even if pruning is not enabled.
return fx.Provide(func() pruner.AvailabilityWindow {
return light.Window
return fx.Provide(func() modshare.Window {
return modshare.Window(availability.StorageWindow)
})
case node.Full, node.Bridge:
return fx.Provide(func() pruner.AvailabilityWindow {
return fx.Provide(func() modshare.Window {
if pruneEnabled {
return full.Window
return modshare.Window(availability.StorageWindow)
}
return archival.Window
// implicitly disable pruning by setting the window to 0
return modshare.Window(time.Duration(0))
})
default:
panic("unknown node type")
Expand Down
5 changes: 2 additions & 3 deletions nodebuilder/share/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"go.uber.org/fx"

headerServ "github.com/celestiaorg/celestia-node/nodebuilder/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/shwap"
"github.com/celestiaorg/celestia-node/share/shwap/getters"
Expand All @@ -23,9 +22,9 @@ func bitswapGetter(
lc fx.Lifecycle,
exchange exchange.SessionExchange,
bstore blockstore.Blockstore,
wndw pruner.AvailabilityWindow,
wndw Window,
) *bitswap.Getter {
getter := bitswap.NewGetter(exchange, bstore, wndw)
getter := bitswap.NewGetter(exchange, bstore, wndw.Duration())
lc.Append(fx.StartStopHook(getter.Start, getter.Stop))
return getter
}
Expand Down
Loading