Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 20 additions & 21 deletions block/internal/syncing/da_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type DARetriever struct {
da coreda.DA
cache cache.Manager
genesis genesis.Genesis
options common.BlockOptions
logger zerolog.Logger

// calculate namespaces bytes once and reuse them
Expand All @@ -46,14 +45,12 @@ func NewDARetriever(
cache cache.Manager,
config config.Config,
genesis genesis.Genesis,
options common.BlockOptions,
logger zerolog.Logger,
) *DARetriever {
return &DARetriever{
da: da,
cache: cache,
genesis: genesis,
options: options,
logger: logger.With().Str("component", "da_retriever").Logger(),
namespaceBz: coreda.NamespaceFromString(config.DA.GetNamespace()).Bytes(),
namespaceDataBz: coreda.NamespaceFromString(config.DA.GetDataNamespace()).Bytes(),
Expand Down Expand Up @@ -192,8 +189,8 @@ func (r *DARetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight

// Handle empty data case
if data == nil {
if r.isEmptyDataExpected(header) {
data = r.createEmptyDataForHeader(ctx, header)
if isEmptyDataExpected(header) {
data = createEmptyDataForHeader(ctx, header)
delete(r.pendingHeaders, height)
} else {
// keep header in pending headers until data lands
Expand Down Expand Up @@ -291,22 +288,6 @@ func (r *DARetriever) tryDecodeData(bz []byte, daHeight uint64) *types.Data {
return &signedData.Data
}

// isEmptyDataExpected checks if empty data is expected for a header
func (r *DARetriever) isEmptyDataExpected(header *types.SignedHeader) bool {
return len(header.DataHash) == 0 || bytes.Equal(header.DataHash, common.DataHashForEmptyTxs)
}

// createEmptyDataForHeader creates empty data for a header
func (r *DARetriever) createEmptyDataForHeader(ctx context.Context, header *types.SignedHeader) *types.Data {
return &types.Data{
Metadata: &types.Metadata{
ChainID: header.ChainID(),
Height: header.Height(),
Time: header.BaseHeader.Time,
},
}
}

// assertExpectedProposer validates the proposer address
func (r *DARetriever) assertExpectedProposer(proposerAddr []byte) error {
if string(proposerAddr) != string(r.genesis.ProposerAddress) {
Expand Down Expand Up @@ -342,3 +323,21 @@ func (r *DARetriever) assertValidSignedData(signedData *types.SignedData) error

return nil
}

// isEmptyDataExpected checks if empty data is expected for a header
func isEmptyDataExpected(header *types.SignedHeader) bool {
return len(header.DataHash) == 0 || bytes.Equal(header.DataHash, common.DataHashForEmptyTxs)
}

// createEmptyDataForHeader creates empty data for a header
func createEmptyDataForHeader(ctx context.Context, header *types.SignedHeader) *types.Data {
return &types.Data{
Txs: make(types.Txs, 0),
Metadata: &types.Metadata{
ChainID: header.ChainID(),
Height: header.Height(),
Time: header.BaseHeader.Time,
LastDataHash: nil, // LastDataHash must be filled in the syncer, as it is not available here, block n-1 has not been processed yet.
},
}
}
43 changes: 21 additions & 22 deletions block/internal/syncing/da_retriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestDARetriever_RetrieveFromDA_Invalid(t *testing.T) {
mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything).
Return(nil, errors.New("just invalid")).Maybe()

r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop())
events, err := r.RetrieveFromDA(context.Background(), 42)
assert.Error(t, err)
assert.Len(t, events, 0)
Expand All @@ -77,7 +77,7 @@ func TestDARetriever_RetrieveFromDA_NotFound(t *testing.T) {
mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything).
Return(nil, fmt.Errorf("%s: whatever", coreda.ErrBlobNotFound.Error())).Maybe()

r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop())
events, err := r.RetrieveFromDA(context.Background(), 42)
assert.True(t, errors.Is(err, coreda.ErrBlobNotFound))
assert.Len(t, events, 0)
Expand All @@ -94,7 +94,7 @@ func TestDARetriever_RetrieveFromDA_HeightFromFuture(t *testing.T) {
mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything).
Return(nil, fmt.Errorf("%s: later", coreda.ErrHeightFromFuture.Error())).Maybe()

r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop())
events, derr := r.RetrieveFromDA(context.Background(), 1000)
assert.Error(t, derr)
assert.True(t, errors.Is(derr, coreda.ErrHeightFromFuture))
Expand All @@ -116,7 +116,7 @@ func TestDARetriever_RetrieveFromDA_Timeout(t *testing.T) {
}).
Return(nil, context.DeadlineExceeded).Maybe()

r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop())

start := time.Now()
events, err := r.RetrieveFromDA(context.Background(), 42)
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestDARetriever_RetrieveFromDA_TimeoutFast(t *testing.T) {
mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything).
Return(nil, context.DeadlineExceeded).Maybe()

r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop())

events, err := r.RetrieveFromDA(context.Background(), 42)

Expand All @@ -165,7 +165,7 @@ func TestDARetriever_ProcessBlobs_HeaderAndData_Success(t *testing.T) {
addr, pub, signer := buildSyncTestSigner(t)
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr}

r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop())

dataBin, data := makeSignedDataBytes(t, gen.ChainID, 2, addr, pub, signer, 2)
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, &data.Data)
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestDARetriever_ProcessBlobs_HeaderOnly_EmptyDataExpected(t *testing.T) {

addr, pub, signer := buildSyncTestSigner(t)
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr}
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop())

// Header with no data hash present should trigger empty data creation (per current logic)
hb, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, nil)
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestDARetriever_TryDecodeHeaderAndData_Basic(t *testing.T) {

addr, pub, signer := buildSyncTestSigner(t)
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr}
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop())

hb, sh := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, nil)
gotH := r.tryDecodeHeader(hb, 123)
Expand All @@ -238,16 +238,6 @@ func TestDARetriever_TryDecodeHeaderAndData_Basic(t *testing.T) {
assert.Nil(t, r.tryDecodeData([]byte("junk"), 1))
}

func TestDARetriever_isEmptyDataExpected(t *testing.T) {
r := &DARetriever{}
h := &types.SignedHeader{}
// when DataHash is nil/empty -> expected empty
assert.True(t, r.isEmptyDataExpected(h))
// when equals to predefined emptyTxs hash -> expected empty
h.DataHash = common.DataHashForEmptyTxs
assert.True(t, r.isEmptyDataExpected(h))
}

func TestDARetriever_tryDecodeData_InvalidSignatureOrProposer(t *testing.T) {
ds := dssync.MutexWrap(datastore.NewMapDatastore())
st := store.New(ds)
Expand All @@ -257,7 +247,7 @@ func TestDARetriever_tryDecodeData_InvalidSignatureOrProposer(t *testing.T) {
goodAddr, pub, signer := buildSyncTestSigner(t)
badAddr := []byte("not-the-proposer")
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: badAddr}
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop())

// Signed data is made by goodAddr; retriever expects badAddr -> should be rejected
db, _ := makeSignedDataBytes(t, gen.ChainID, 7, goodAddr, pub, signer, 1)
Expand Down Expand Up @@ -310,7 +300,7 @@ func TestDARetriever_RetrieveFromDA_TwoNamespaces_Success(t *testing.T) {
mockDA.EXPECT().Get(mock.Anything, mock.Anything, mock.MatchedBy(func(ns []byte) bool { return bytes.Equal(ns, namespaceDataBz) })).
Return([][]byte{dataBin}, nil).Once()

r := NewDARetriever(mockDA, cm, cfg, gen, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(mockDA, cm, cfg, gen, zerolog.Nop())

events, derr := r.RetrieveFromDA(context.Background(), 1234)
require.NoError(t, derr)
Expand All @@ -328,7 +318,7 @@ func TestDARetriever_ProcessBlobs_CrossDAHeightMatching(t *testing.T) {
addr, pub, signer := buildSyncTestSigner(t)
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr}

r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop())

// Create header and data for the same block height but from different DA heights
dataBin, data := makeSignedDataBytes(t, gen.ChainID, 5, addr, pub, signer, 2)
Expand Down Expand Up @@ -364,7 +354,7 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin
addr, pub, signer := buildSyncTestSigner(t)
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr}

r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop())

// Create multiple headers and data for different block heights
data3Bin, data3 := makeSignedDataBytes(t, gen.ChainID, 3, addr, pub, signer, 1)
Expand Down Expand Up @@ -422,3 +412,12 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin
require.Len(t, r.pendingHeaders, 0, "all headers should be processed")
require.Len(t, r.pendingData, 0, "all data should be processed")
}

func Test_isEmptyDataExpected(t *testing.T) {
h := &types.SignedHeader{}
// when DataHash is nil/empty -> expected empty
assert.True(t, isEmptyDataExpected(h))
// when equals to predefined emptyTxs hash -> expected empty
h.DataHash = common.DataHashForEmptyTxs
assert.True(t, isEmptyDataExpected(h))
}
68 changes: 21 additions & 47 deletions block/internal/syncing/p2p_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
goheader "github.com/celestiaorg/go-header"
"github.com/rs/zerolog"

"github.com/evstack/ev-node/block/internal/cache"
"github.com/evstack/ev-node/block/internal/common"
"github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/types"
Expand All @@ -17,40 +18,38 @@ import (
type P2PHandler struct {
headerStore goheader.Store[*types.SignedHeader]
dataStore goheader.Store[*types.Data]
cache cache.Manager
genesis genesis.Genesis
options common.BlockOptions
logger zerolog.Logger
}

// NewP2PHandler creates a new P2P handler
func NewP2PHandler(
headerStore goheader.Store[*types.SignedHeader],
dataStore goheader.Store[*types.Data],
cache cache.Manager,
genesis genesis.Genesis,
options common.BlockOptions,
logger zerolog.Logger,
) *P2PHandler {
return &P2PHandler{
headerStore: headerStore,
dataStore: dataStore,
cache: cache,
genesis: genesis,
options: options,
logger: logger.With().Str("component", "p2p_handler").Logger(),
}
}

// ProcessHeaderRange processes headers from the header store within the given range
func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHeight uint64) []common.DAHeightEvent {
func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHeight uint64, heightInCh chan<- common.DAHeightEvent) {
if startHeight > endHeight {
return nil
return
}

var events []common.DAHeightEvent

for height := startHeight; height <= endHeight; height++ {
select {
case <-ctx.Done():
return events
return
default:
}

Expand All @@ -70,7 +69,7 @@ func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHei
var data *types.Data
if bytes.Equal(header.DataHash, common.DataHashForEmptyTxs) {
// Create empty data for headers with empty data hash
data = h.createEmptyDataForHeader(ctx, header)
data = createEmptyDataForHeader(ctx, header)
} else {
// Try to get data from data store
retrievedData, err := h.dataStore.GetByHeight(ctx, height)
Expand All @@ -91,26 +90,26 @@ func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHei
DaHeight: 0, // P2P events don't have DA height context
}

events = append(events, event)
select {
case heightInCh <- event:
default:
h.cache.SetPendingEvent(event.Header.Height(), &event)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should set pending check for duplicates or if the height has already passed? right now we add the event to cache with no checks

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. We can evict all previous height when we call GetNextPendingEvent. Checking the height before adding a pending event may slow down the system.

}

h.logger.Debug().Uint64("height", height).Str("source", "p2p_headers").Msg("processed header from P2P")
}

return events
}

// ProcessDataRange processes data from the data store within the given range
func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeight uint64) []common.DAHeightEvent {
func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeight uint64, heightInCh chan<- common.DAHeightEvent) {
if startHeight > endHeight {
return nil
return
}

var events []common.DAHeightEvent

for height := startHeight; height <= endHeight; height++ {
select {
case <-ctx.Done():
return events
return
default:
}

Expand Down Expand Up @@ -143,12 +142,14 @@ func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeigh
DaHeight: 0, // P2P events don't have DA height context
}

events = append(events, event)
select {
case heightInCh <- event:
default:
h.cache.SetPendingEvent(event.Header.Height(), &event)
}

h.logger.Debug().Uint64("height", height).Str("source", "p2p_data").Msg("processed data from P2P")
}

return events
}

// assertExpectedProposer validates the proposer address
Expand All @@ -159,30 +160,3 @@ func (h *P2PHandler) assertExpectedProposer(proposerAddr []byte) error {
}
return nil
}

// createEmptyDataForHeader creates empty data for headers with empty data hash
func (h *P2PHandler) createEmptyDataForHeader(ctx context.Context, header *types.SignedHeader) *types.Data {
headerHeight := header.Height()
var lastDataHash types.Hash

if headerHeight > 1 {
// Try to get previous data hash, but don't fail if not available
if prevData, err := h.dataStore.GetByHeight(ctx, headerHeight-1); err == nil && prevData != nil {
lastDataHash = prevData.Hash()
} else {
h.logger.Debug().Uint64("current_height", headerHeight).Uint64("previous_height", headerHeight-1).
Msg("previous block not available, using empty last data hash")
}
}

metadata := &types.Metadata{
ChainID: header.ChainID(),
Height: headerHeight,
Time: header.BaseHeader.Time,
LastDataHash: lastDataHash,
}

return &types.Data{
Metadata: metadata,
}
}
Loading
Loading