Skip to content
5 changes: 5 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ packages:
dir: ./block/internal/syncing
pkgname: syncing
filename: syncer_mock.go
HeightStore:
config:
dir: ./block/internal/syncing
pkgname: syncing
filename: height_store_mock.go
github.com/evstack/ev-node/block/internal/common:
interfaces:
Broadcaster:
Expand Down
6 changes: 3 additions & 3 deletions apps/evm/cmd/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"

"github.com/evstack/ev-node/types"
ds "github.com/ipfs/go-datastore"
kt "github.com/ipfs/go-datastore/keytransform"
"github.com/spf13/cobra"
Expand All @@ -13,7 +14,6 @@ import (
"github.com/evstack/ev-node/node"
rollcmd "github.com/evstack/ev-node/pkg/cmd"
"github.com/evstack/ev-node/pkg/store"
"github.com/evstack/ev-node/types"
)

// NewRollbackCmd creates a command to rollback ev-node state by one height.
Expand Down Expand Up @@ -70,7 +70,7 @@ func NewRollbackCmd() *cobra.Command {
}

// rollback ev-node goheader state
headerStore, err := goheaderstore.NewStore[*types.SignedHeader](
headerStore, err := goheaderstore.NewStore[*types.P2PSignedHeader](
evolveDB,
goheaderstore.WithStorePrefix("headerSync"),
goheaderstore.WithMetrics(),
Expand All @@ -79,7 +79,7 @@ func NewRollbackCmd() *cobra.Command {
return err
}

dataStore, err := goheaderstore.NewStore[*types.Data](
dataStore, err := goheaderstore.NewStore[*types.P2PData](
evolveDB,
goheaderstore.WithStorePrefix("dataSync"),
goheaderstore.WithMetrics(),
Expand Down
7 changes: 3 additions & 4 deletions apps/testapp/cmd/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ import (
"errors"
"fmt"

goheaderstore "github.com/celestiaorg/go-header/store"
kvexecutor "github.com/evstack/ev-node/apps/testapp/kv"
"github.com/evstack/ev-node/node"
rollcmd "github.com/evstack/ev-node/pkg/cmd"
"github.com/evstack/ev-node/pkg/store"
"github.com/evstack/ev-node/types"

goheaderstore "github.com/celestiaorg/go-header/store"
ds "github.com/ipfs/go-datastore"
kt "github.com/ipfs/go-datastore/keytransform"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -76,7 +75,7 @@ func NewRollbackCmd() *cobra.Command {
}

// rollback ev-node goheader state
headerStore, err := goheaderstore.NewStore[*types.SignedHeader](
headerStore, err := goheaderstore.NewStore[*types.P2PSignedHeader](
evolveDB,
goheaderstore.WithStorePrefix("headerSync"),
goheaderstore.WithMetrics(),
Expand All @@ -85,7 +84,7 @@ func NewRollbackCmd() *cobra.Command {
return err
}

dataStore, err := goheaderstore.NewStore[*types.Data](
dataStore, err := goheaderstore.NewStore[*types.P2PData](
evolveDB,
goheaderstore.WithStorePrefix("dataSync"),
goheaderstore.WithMetrics(),
Expand Down
15 changes: 7 additions & 8 deletions block/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"errors"
"fmt"

"github.com/evstack/ev-node/pkg/sync"
"github.com/rs/zerolog"

"github.com/evstack/ev-node/block/internal/cache"
"github.com/evstack/ev-node/block/internal/common"
da "github.com/evstack/ev-node/block/internal/da"
"github.com/evstack/ev-node/block/internal/executing"
"github.com/evstack/ev-node/block/internal/reaping"
Expand All @@ -20,7 +20,6 @@ import (
"github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/pkg/signer"
"github.com/evstack/ev-node/pkg/store"
"github.com/evstack/ev-node/types"
)

// Components represents the block-related components
Expand Down Expand Up @@ -121,8 +120,8 @@ func NewSyncComponents(
store store.Store,
exec coreexecutor.Executor,
daClient da.Client,
headerStore common.Broadcaster[*types.SignedHeader],
dataStore common.Broadcaster[*types.Data],
headerStore *sync.HeaderSyncService,
dataStore *sync.DataSyncService,
logger zerolog.Logger,
metrics *Metrics,
blockOpts BlockOptions,
Expand Down Expand Up @@ -152,7 +151,7 @@ func NewSyncComponents(
)

// Create submitter for sync nodes (no signer, only DA inclusion processing)
daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger)
daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger, headerStore, dataStore)
submitter := submitting.NewSubmitter(
store,
exec,
Expand Down Expand Up @@ -185,8 +184,8 @@ func NewAggregatorComponents(
sequencer coresequencer.Sequencer,
daClient da.Client,
signer signer.Signer,
headerBroadcaster common.Broadcaster[*types.SignedHeader],
dataBroadcaster common.Broadcaster[*types.Data],
headerBroadcaster *sync.HeaderSyncService,
dataBroadcaster *sync.DataSyncService,
logger zerolog.Logger,
metrics *Metrics,
blockOpts BlockOptions,
Expand Down Expand Up @@ -241,7 +240,7 @@ func NewAggregatorComponents(
}, nil
}

daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger)
daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger, headerBroadcaster, dataBroadcaster)
submitter := submitting.NewSubmitter(
store,
exec,
Expand Down
143 changes: 124 additions & 19 deletions block/internal/common/broadcaster_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions block/internal/common/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,7 @@ type DAHeightEvent struct {
DaHeight uint64
// Source indicates where this event originated from (DA or P2P)
Source EventSource

// Optional DA height hints from P2P. first is the DA height hint for the header, second is the DA height hint for the data
DaHeightHints [2]uint64
}
11 changes: 9 additions & 2 deletions block/internal/common/expected_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,20 @@ package common
import (
"context"

"github.com/evstack/ev-node/types"
pubsub "github.com/libp2p/go-libp2p-pubsub"

"github.com/celestiaorg/go-header"
)

// broadcaster interface for P2P broadcasting
type (
HeaderP2PBroadcaster = Broadcaster[*types.P2PSignedHeader]
DataP2PBroadcaster = Broadcaster[*types.P2PData]
)

// Broadcaster interface for P2P broadcasting
type Broadcaster[H header.Header[H]] interface {
WriteToStoreAndBroadcast(ctx context.Context, payload H, opts ...pubsub.PubOpt) error
Store() header.Store[H]
AppendDAHint(ctx context.Context, daHeight uint64, hashes ...types.Hash) error
GetByHeight(ctx context.Context, height uint64) (H, uint64, error)
}
16 changes: 10 additions & 6 deletions block/internal/executing/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ type Executor struct {
metrics *common.Metrics

// Broadcasting
headerBroadcaster common.Broadcaster[*types.SignedHeader]
dataBroadcaster common.Broadcaster[*types.Data]
headerBroadcaster common.HeaderP2PBroadcaster
dataBroadcaster common.DataP2PBroadcaster

// Configuration
config config.Config
Expand Down Expand Up @@ -79,8 +79,8 @@ func NewExecutor(
metrics *common.Metrics,
config config.Config,
genesis genesis.Genesis,
headerBroadcaster common.Broadcaster[*types.SignedHeader],
dataBroadcaster common.Broadcaster[*types.Data],
headerBroadcaster common.HeaderP2PBroadcaster,
dataBroadcaster common.DataP2PBroadcaster,
logger zerolog.Logger,
options common.BlockOptions,
errorCh chan<- error,
Expand Down Expand Up @@ -431,8 +431,12 @@ func (e *Executor) produceBlock() error {

// broadcast header and data to P2P network
g, ctx := errgroup.WithContext(e.ctx)
g.Go(func() error { return e.headerBroadcaster.WriteToStoreAndBroadcast(ctx, header) })
g.Go(func() error { return e.dataBroadcaster.WriteToStoreAndBroadcast(ctx, data) })
g.Go(func() error {
return e.headerBroadcaster.WriteToStoreAndBroadcast(ctx, &types.P2PSignedHeader{SignedHeader: *header})
})
g.Go(func() error {
return e.dataBroadcaster.WriteToStoreAndBroadcast(ctx, &types.P2PData{Data: *data})
})
if err := g.Wait(); err != nil {
e.logger.Error().Err(err).Msg("failed to broadcast header and/data")
// don't fail block production on broadcast error
Expand Down
Loading
Loading