diff --git a/block/internal/syncing/da_retriever.go b/block/internal/syncing/da_retriever.go index b579101511..40248019a4 100644 --- a/block/internal/syncing/da_retriever.go +++ b/block/internal/syncing/da_retriever.go @@ -27,7 +27,6 @@ type DARetriever struct { da coreda.DA cache cache.Manager genesis genesis.Genesis - options common.BlockOptions logger zerolog.Logger // calculate namespaces bytes once and reuse them @@ -46,14 +45,12 @@ func NewDARetriever( cache cache.Manager, config config.Config, genesis genesis.Genesis, - options common.BlockOptions, logger zerolog.Logger, ) *DARetriever { return &DARetriever{ da: da, cache: cache, genesis: genesis, - options: options, logger: logger.With().Str("component", "da_retriever").Logger(), namespaceBz: coreda.NamespaceFromString(config.DA.GetNamespace()).Bytes(), namespaceDataBz: coreda.NamespaceFromString(config.DA.GetDataNamespace()).Bytes(), @@ -192,8 +189,8 @@ func (r *DARetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight // Handle empty data case if data == nil { - if r.isEmptyDataExpected(header) { - data = r.createEmptyDataForHeader(ctx, header) + if isEmptyDataExpected(header) { + data = createEmptyDataForHeader(ctx, header) delete(r.pendingHeaders, height) } else { // keep header in pending headers until data lands @@ -291,22 +288,6 @@ func (r *DARetriever) tryDecodeData(bz []byte, daHeight uint64) *types.Data { return &signedData.Data } -// isEmptyDataExpected checks if empty data is expected for a header -func (r *DARetriever) isEmptyDataExpected(header *types.SignedHeader) bool { - return len(header.DataHash) == 0 || bytes.Equal(header.DataHash, common.DataHashForEmptyTxs) -} - -// createEmptyDataForHeader creates empty data for a header -func (r *DARetriever) createEmptyDataForHeader(ctx context.Context, header *types.SignedHeader) *types.Data { - return &types.Data{ - Metadata: &types.Metadata{ - ChainID: header.ChainID(), - Height: header.Height(), - Time: header.BaseHeader.Time, - }, - } -} - // assertExpectedProposer validates the proposer address func (r *DARetriever) assertExpectedProposer(proposerAddr []byte) error { if string(proposerAddr) != string(r.genesis.ProposerAddress) { @@ -342,3 +323,21 @@ func (r *DARetriever) assertValidSignedData(signedData *types.SignedData) error return nil } + +// isEmptyDataExpected checks if empty data is expected for a header +func isEmptyDataExpected(header *types.SignedHeader) bool { + return len(header.DataHash) == 0 || bytes.Equal(header.DataHash, common.DataHashForEmptyTxs) +} + +// createEmptyDataForHeader creates empty data for a header +func createEmptyDataForHeader(ctx context.Context, header *types.SignedHeader) *types.Data { + return &types.Data{ + Txs: make(types.Txs, 0), + Metadata: &types.Metadata{ + ChainID: header.ChainID(), + Height: header.Height(), + Time: header.BaseHeader.Time, + LastDataHash: nil, // LastDataHash must be filled in the syncer, as it is not available here, block n-1 has not been processed yet. + }, + } +} diff --git a/block/internal/syncing/da_retriever_test.go b/block/internal/syncing/da_retriever_test.go index 22b27dcbc5..0d97d5940f 100644 --- a/block/internal/syncing/da_retriever_test.go +++ b/block/internal/syncing/da_retriever_test.go @@ -59,7 +59,7 @@ func TestDARetriever_RetrieveFromDA_Invalid(t *testing.T) { mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything). Return(nil, errors.New("just invalid")).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) events, err := r.RetrieveFromDA(context.Background(), 42) assert.Error(t, err) assert.Len(t, events, 0) @@ -77,7 +77,7 @@ func TestDARetriever_RetrieveFromDA_NotFound(t *testing.T) { mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything). Return(nil, fmt.Errorf("%s: whatever", coreda.ErrBlobNotFound.Error())).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) events, err := r.RetrieveFromDA(context.Background(), 42) assert.True(t, errors.Is(err, coreda.ErrBlobNotFound)) assert.Len(t, events, 0) @@ -94,7 +94,7 @@ func TestDARetriever_RetrieveFromDA_HeightFromFuture(t *testing.T) { mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything). Return(nil, fmt.Errorf("%s: later", coreda.ErrHeightFromFuture.Error())).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) events, derr := r.RetrieveFromDA(context.Background(), 1000) assert.Error(t, derr) assert.True(t, errors.Is(derr, coreda.ErrHeightFromFuture)) @@ -116,7 +116,7 @@ func TestDARetriever_RetrieveFromDA_Timeout(t *testing.T) { }). Return(nil, context.DeadlineExceeded).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) start := time.Now() events, err := r.RetrieveFromDA(context.Background(), 42) @@ -145,7 +145,7 @@ func TestDARetriever_RetrieveFromDA_TimeoutFast(t *testing.T) { mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything). Return(nil, context.DeadlineExceeded).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) events, err := r.RetrieveFromDA(context.Background(), 42) @@ -165,7 +165,7 @@ func TestDARetriever_ProcessBlobs_HeaderAndData_Success(t *testing.T) { addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) dataBin, data := makeSignedDataBytes(t, gen.ChainID, 2, addr, pub, signer, 2) hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, &data.Data) @@ -193,7 +193,7 @@ func TestDARetriever_ProcessBlobs_HeaderOnly_EmptyDataExpected(t *testing.T) { addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) // Header with no data hash present should trigger empty data creation (per current logic) hb, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, nil) @@ -221,7 +221,7 @@ func TestDARetriever_TryDecodeHeaderAndData_Basic(t *testing.T) { addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) hb, sh := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, nil) gotH := r.tryDecodeHeader(hb, 123) @@ -238,16 +238,6 @@ func TestDARetriever_TryDecodeHeaderAndData_Basic(t *testing.T) { assert.Nil(t, r.tryDecodeData([]byte("junk"), 1)) } -func TestDARetriever_isEmptyDataExpected(t *testing.T) { - r := &DARetriever{} - h := &types.SignedHeader{} - // when DataHash is nil/empty -> expected empty - assert.True(t, r.isEmptyDataExpected(h)) - // when equals to predefined emptyTxs hash -> expected empty - h.DataHash = common.DataHashForEmptyTxs - assert.True(t, r.isEmptyDataExpected(h)) -} - func TestDARetriever_tryDecodeData_InvalidSignatureOrProposer(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) @@ -257,7 +247,7 @@ func TestDARetriever_tryDecodeData_InvalidSignatureOrProposer(t *testing.T) { goodAddr, pub, signer := buildSyncTestSigner(t) badAddr := []byte("not-the-proposer") gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: badAddr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) // Signed data is made by goodAddr; retriever expects badAddr -> should be rejected db, _ := makeSignedDataBytes(t, gen.ChainID, 7, goodAddr, pub, signer, 1) @@ -310,7 +300,7 @@ func TestDARetriever_RetrieveFromDA_TwoNamespaces_Success(t *testing.T) { mockDA.EXPECT().Get(mock.Anything, mock.Anything, mock.MatchedBy(func(ns []byte) bool { return bytes.Equal(ns, namespaceDataBz) })). Return([][]byte{dataBin}, nil).Once() - r := NewDARetriever(mockDA, cm, cfg, gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, cfg, gen, zerolog.Nop()) events, derr := r.RetrieveFromDA(context.Background(), 1234) require.NoError(t, derr) @@ -328,7 +318,7 @@ func TestDARetriever_ProcessBlobs_CrossDAHeightMatching(t *testing.T) { addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) // Create header and data for the same block height but from different DA heights dataBin, data := makeSignedDataBytes(t, gen.ChainID, 5, addr, pub, signer, 2) @@ -364,7 +354,7 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) // Create multiple headers and data for different block heights data3Bin, data3 := makeSignedDataBytes(t, gen.ChainID, 3, addr, pub, signer, 1) @@ -422,3 +412,12 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin require.Len(t, r.pendingHeaders, 0, "all headers should be processed") require.Len(t, r.pendingData, 0, "all data should be processed") } + +func Test_isEmptyDataExpected(t *testing.T) { + h := &types.SignedHeader{} + // when DataHash is nil/empty -> expected empty + assert.True(t, isEmptyDataExpected(h)) + // when equals to predefined emptyTxs hash -> expected empty + h.DataHash = common.DataHashForEmptyTxs + assert.True(t, isEmptyDataExpected(h)) +} diff --git a/block/internal/syncing/p2p_handler.go b/block/internal/syncing/p2p_handler.go index cd369e44a0..cfd72dc40e 100644 --- a/block/internal/syncing/p2p_handler.go +++ b/block/internal/syncing/p2p_handler.go @@ -8,6 +8,7 @@ import ( goheader "github.com/celestiaorg/go-header" "github.com/rs/zerolog" + "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/types" @@ -17,8 +18,8 @@ import ( type P2PHandler struct { headerStore goheader.Store[*types.SignedHeader] dataStore goheader.Store[*types.Data] + cache cache.Manager genesis genesis.Genesis - options common.BlockOptions logger zerolog.Logger } @@ -26,31 +27,29 @@ type P2PHandler struct { func NewP2PHandler( headerStore goheader.Store[*types.SignedHeader], dataStore goheader.Store[*types.Data], + cache cache.Manager, genesis genesis.Genesis, - options common.BlockOptions, logger zerolog.Logger, ) *P2PHandler { return &P2PHandler{ headerStore: headerStore, dataStore: dataStore, + cache: cache, genesis: genesis, - options: options, logger: logger.With().Str("component", "p2p_handler").Logger(), } } // ProcessHeaderRange processes headers from the header store within the given range -func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHeight uint64) []common.DAHeightEvent { +func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHeight uint64, heightInCh chan<- common.DAHeightEvent) { if startHeight > endHeight { - return nil + return } - var events []common.DAHeightEvent - for height := startHeight; height <= endHeight; height++ { select { case <-ctx.Done(): - return events + return default: } @@ -70,7 +69,7 @@ func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHei var data *types.Data if bytes.Equal(header.DataHash, common.DataHashForEmptyTxs) { // Create empty data for headers with empty data hash - data = h.createEmptyDataForHeader(ctx, header) + data = createEmptyDataForHeader(ctx, header) } else { // Try to get data from data store retrievedData, err := h.dataStore.GetByHeight(ctx, height) @@ -91,26 +90,26 @@ func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHei DaHeight: 0, // P2P events don't have DA height context } - events = append(events, event) + select { + case heightInCh <- event: + default: + h.cache.SetPendingEvent(event.Header.Height(), &event) + } h.logger.Debug().Uint64("height", height).Str("source", "p2p_headers").Msg("processed header from P2P") } - - return events } // ProcessDataRange processes data from the data store within the given range -func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeight uint64) []common.DAHeightEvent { +func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeight uint64, heightInCh chan<- common.DAHeightEvent) { if startHeight > endHeight { - return nil + return } - var events []common.DAHeightEvent - for height := startHeight; height <= endHeight; height++ { select { case <-ctx.Done(): - return events + return default: } @@ -143,12 +142,14 @@ func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeigh DaHeight: 0, // P2P events don't have DA height context } - events = append(events, event) + select { + case heightInCh <- event: + default: + h.cache.SetPendingEvent(event.Header.Height(), &event) + } h.logger.Debug().Uint64("height", height).Str("source", "p2p_data").Msg("processed data from P2P") } - - return events } // assertExpectedProposer validates the proposer address @@ -159,30 +160,3 @@ func (h *P2PHandler) assertExpectedProposer(proposerAddr []byte) error { } return nil } - -// createEmptyDataForHeader creates empty data for headers with empty data hash -func (h *P2PHandler) createEmptyDataForHeader(ctx context.Context, header *types.SignedHeader) *types.Data { - headerHeight := header.Height() - var lastDataHash types.Hash - - if headerHeight > 1 { - // Try to get previous data hash, but don't fail if not available - if prevData, err := h.dataStore.GetByHeight(ctx, headerHeight-1); err == nil && prevData != nil { - lastDataHash = prevData.Hash() - } else { - h.logger.Debug().Uint64("current_height", headerHeight).Uint64("previous_height", headerHeight-1). - Msg("previous block not available, using empty last data hash") - } - } - - metadata := &types.Metadata{ - ChainID: header.ChainID(), - Height: headerHeight, - Time: header.BaseHeader.Time, - LastDataHash: lastDataHash, - } - - return &types.Data{ - Metadata: metadata, - } -} diff --git a/block/internal/syncing/p2p_handler_test.go b/block/internal/syncing/p2p_handler_test.go index 11f24caca1..090ef4a7ee 100644 --- a/block/internal/syncing/p2p_handler_test.go +++ b/block/internal/syncing/p2p_handler_test.go @@ -7,14 +7,19 @@ import ( "testing" "time" + ds "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p/core/crypto" "github.com/rs/zerolog" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" signerpkg "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/signer/noop" + storemocks "github.com/evstack/ev-node/test/mocks" extmocks "github.com/evstack/ev-node/test/mocks/external" "github.com/evstack/ev-node/types" ) @@ -56,13 +61,14 @@ type P2PTestData struct { Handler *P2PHandler HeaderStore *extmocks.MockStore[*types.SignedHeader] DataStore *extmocks.MockStore[*types.Data] + Cache cache.Manager Genesis genesis.Genesis ProposerAddr []byte ProposerPub crypto.PubKey Signer signerpkg.Signer } -// setupP2P constructs a P2PHandler with mocked go-header stores +// setupP2P constructs a P2PHandler with mocked go-header stores and real cache func setupP2P(t *testing.T) *P2PTestData { t.Helper() proposerAddr, proposerPub, signer := buildTestSigner(t) @@ -72,11 +78,28 @@ func setupP2P(t *testing.T) *P2PTestData { headerStoreMock := extmocks.NewMockStore[*types.SignedHeader](t) dataStoreMock := extmocks.NewMockStore[*types.Data](t) - handler := NewP2PHandler(headerStoreMock, dataStoreMock, gen, common.DefaultBlockOptions(), zerolog.Nop()) + // Create a real cache manager for tests + storeMock := storemocks.NewMockStore(t) + // Mock the methods that cache manager initialization will call + // Return ErrNotFound for non-existent metadata keys + storeMock.EXPECT().GetMetadata(mock.Anything, "last-submitted-header-height").Return(nil, ds.ErrNotFound).Maybe() + storeMock.EXPECT().GetMetadata(mock.Anything, "last-submitted-data-height").Return(nil, ds.ErrNotFound).Maybe() + storeMock.EXPECT().Height(mock.Anything).Return(uint64(0), nil).Maybe() + storeMock.EXPECT().SetMetadata(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + + cfg := config.Config{ + RootDir: t.TempDir(), + ClearCache: true, + } + cacheManager, err := cache.NewManager(cfg, storeMock, zerolog.Nop()) + require.NoError(t, err, "failed to create cache manager") + + handler := NewP2PHandler(headerStoreMock, dataStoreMock, cacheManager, gen, zerolog.Nop()) return &P2PTestData{ Handler: handler, HeaderStore: headerStoreMock, DataStore: dataStoreMock, + Cache: cacheManager, Genesis: gen, ProposerAddr: proposerAddr, ProposerPub: proposerPub, @@ -84,6 +107,29 @@ func setupP2P(t *testing.T) *P2PTestData { } } +// collectEvents reads events from a channel with a timeout +func collectEvents(t *testing.T, ch <-chan common.DAHeightEvent, timeout time.Duration) []common.DAHeightEvent { + t.Helper() + var events []common.DAHeightEvent + deadline := time.After(timeout) + for { + select { + case event := <-ch: + events = append(events, event) + case <-deadline: + return events + case <-time.After(10 * time.Millisecond): + // Give it a moment to check if more events are coming + select { + case event := <-ch: + events = append(events, event) + default: + return events + } + } + } +} + func TestP2PHandler_ProcessHeaderRange_HeaderAndDataHappyPath(t *testing.T) { p2pData := setupP2P(t) ctx := context.Background() @@ -104,10 +150,14 @@ func TestP2PHandler_ProcessHeaderRange_HeaderAndDataHappyPath(t *testing.T) { // Sanity: header should validate with data using default sync verifier require.NoError(t, signedHeader.ValidateBasicWithData(blockData), "header+data must validate before handler processes them") - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(5)).Return(signedHeader, nil).Once() - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(5)).Return(blockData, nil).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(signedHeader, nil).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(blockData, nil).Once() - events := p2pData.Handler.ProcessHeaderRange(ctx, 5, 5) + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessHeaderRange(ctx, 5, 5, ch) + + events := collectEvents(t, ch, 100*time.Millisecond) require.Len(t, events, 1, "expected one event for the provided header/data height") require.Equal(t, uint64(5), events[0].Header.Height()) require.NotNil(t, events[0].Data) @@ -125,10 +175,14 @@ func TestP2PHandler_ProcessHeaderRange_MissingData_NonEmptyHash(t *testing.T) { blockData := makeData(p2pData.Genesis.ChainID, 7, 1) signedHeader.DataHash = blockData.DACommitment() - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(7)).Return(signedHeader, nil).Once() - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(7)).Return(nil, errors.New("not found")).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(7)).Return(signedHeader, nil).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(7)).Return(nil, errors.New("not found")).Once() + + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessHeaderRange(ctx, 7, 7, ch) - events := p2pData.Handler.ProcessHeaderRange(ctx, 7, 7) + events := collectEvents(t, ch, 100*time.Millisecond) require.Len(t, events, 0) } @@ -137,10 +191,14 @@ func TestP2PHandler_ProcessDataRange_HeaderMissing(t *testing.T) { ctx := context.Background() blockData := makeData(p2pData.Genesis.ChainID, 9, 1) - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(9)).Return(blockData, nil).Once() - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(9)).Return(nil, errors.New("no header")).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(9)).Return(blockData, nil).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(9)).Return(nil, errors.New("no header")).Once() + + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessDataRange(ctx, 9, 9, ch) - events := p2pData.Handler.ProcessDataRange(ctx, 9, 9) + events := collectEvents(t, ch, 100*time.Millisecond) require.Len(t, events, 0) } @@ -154,33 +212,17 @@ func TestP2PHandler_ProposerMismatch_Rejected(t *testing.T) { signedHeader := p2pMakeSignedHeader(t, p2pData.Genesis.ChainID, 4, badAddr, pub, signer) signedHeader.DataHash = common.DataHashForEmptyTxs - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(4)).Return(signedHeader, nil).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(4)).Return(signedHeader, nil).Once() - events := p2pData.Handler.ProcessHeaderRange(ctx, 4, 4) - require.Len(t, events, 0) -} + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessHeaderRange(ctx, 4, 4, ch) -func TestP2PHandler_CreateEmptyDataForHeader_UsesPreviousDataHash(t *testing.T) { - p2pData := setupP2P(t) - ctx := context.Background() - - // Prepare a header at height 10 - signedHeader := p2pMakeSignedHeader(t, p2pData.Genesis.ChainID, 10, p2pData.ProposerAddr, p2pData.ProposerPub, p2pData.Signer) - signedHeader.DataHash = common.DataHashForEmptyTxs - - // Mock previous data at height 9 so handler can propagate its hash - previousData := makeData(p2pData.Genesis.ChainID, 9, 1) - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(9)).Return(previousData, nil).Once() - - emptyData := p2pData.Handler.createEmptyDataForHeader(ctx, signedHeader) - require.NotNil(t, emptyData, "handler should synthesize empty data when header declares empty data hash") - require.Equal(t, p2pData.Genesis.ChainID, emptyData.ChainID(), "synthesized data should carry header chain ID") - require.Equal(t, uint64(10), emptyData.Height(), "synthesized data should carry header height") - require.Equal(t, signedHeader.BaseHeader.Time, emptyData.Metadata.Time, "synthesized data should carry header time") - require.Equal(t, previousData.Hash(), emptyData.LastDataHash, "synthesized data should propagate previous data hash") + events := collectEvents(t, ch, 100*time.Millisecond) + require.Len(t, events, 0) } -func TestP2PHandler_CreateEmptyDataForHeader_NoPreviousData(t *testing.T) { +func TestP2PHandler_CreateEmptyDataForHeader(t *testing.T) { p2pData := setupP2P(t) ctx := context.Background() @@ -188,16 +230,12 @@ func TestP2PHandler_CreateEmptyDataForHeader_NoPreviousData(t *testing.T) { signedHeader := p2pMakeSignedHeader(t, p2pData.Genesis.ChainID, 2, p2pData.ProposerAddr, p2pData.ProposerPub, p2pData.Signer) signedHeader.DataHash = common.DataHashForEmptyTxs - // Mock previous data fetch failure - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(1)).Return(nil, errors.New("not available")).Once() - - emptyData := p2pData.Handler.createEmptyDataForHeader(ctx, signedHeader) + emptyData := createEmptyDataForHeader(ctx, signedHeader) require.NotNil(t, emptyData, "handler should synthesize empty data even when previous data is unavailable") require.Equal(t, p2pData.Genesis.ChainID, emptyData.ChainID(), "synthesized data should carry header chain ID") require.Equal(t, uint64(2), emptyData.Height(), "synthesized data should carry header height") require.Equal(t, signedHeader.BaseHeader.Time, emptyData.Metadata.Time, "synthesized data should carry header time") - // When no previous data is available, LastDataHash should be zero value - require.Equal(t, (types.Hash)(nil), emptyData.LastDataHash, "last data hash should be empty when previous data is not available") + require.Equal(t, (types.Hash)(nil), emptyData.LastDataHash) } func TestP2PHandler_ProcessHeaderRange_MultipleHeightsHappyPath(t *testing.T) { @@ -229,12 +267,16 @@ func TestP2PHandler_ProcessHeaderRange_MultipleHeightsHappyPath(t *testing.T) { require.NoError(t, header6.ValidateBasicWithData(data6), "header/data invalid for height 6") // Expectations for both heights - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(5)).Return(header5, nil).Once() - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(5)).Return(data5, nil).Once() - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(6)).Return(header6, nil).Once() - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(6)).Return(data6, nil).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(header5, nil).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(data5, nil).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(6)).Return(header6, nil).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(6)).Return(data6, nil).Once() - events := p2pData.Handler.ProcessHeaderRange(ctx, 5, 6) + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessHeaderRange(ctx, 5, 6, ch) + + events := collectEvents(t, ch, 100*time.Millisecond) require.Len(t, events, 2, "expected two events for heights 5 and 6") require.Equal(t, uint64(5), events[0].Header.Height(), "first event should be height 5") require.Equal(t, uint64(6), events[1].Header.Height(), "second event should be height 6") @@ -248,14 +290,18 @@ func TestP2PHandler_ProcessDataRange_HeaderValidateHeaderFails(t *testing.T) { // Data exists at height 3 blockData := makeData(p2pData.Genesis.ChainID, 3, 1) - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(3)).Return(blockData, nil).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(3)).Return(blockData, nil).Once() // Header proposer does not match genesis -> validateHeader should fail badAddr, pub, signer := buildTestSigner(t) require.NotEqual(t, string(p2pData.Genesis.ProposerAddress), string(badAddr), "negative test requires mismatched proposer") badHeader := p2pMakeSignedHeader(t, p2pData.Genesis.ChainID, 3, badAddr, pub, signer) - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(3)).Return(badHeader, nil).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(3)).Return(badHeader, nil).Once() + + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessDataRange(ctx, 3, 3, ch) - events := p2pData.Handler.ProcessDataRange(ctx, 3, 3) + events := collectEvents(t, ch, 100*time.Millisecond) require.Len(t, events, 0, "validateHeader failure should drop event") } diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index e928c9dbf3..d1dc7253a5 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -25,9 +25,10 @@ import ( type daRetriever interface { RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) } + type p2pHandler interface { - ProcessHeaderRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent - ProcessDataRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent + ProcessHeaderRange(ctx context.Context, fromHeight, toHeight uint64, heightInCh chan<- common.DAHeightEvent) + ProcessDataRange(ctx context.Context, fromHeight, toHeight uint64, heightInCh chan<- common.DAHeightEvent) } // Syncer handles block synchronization from DA and P2P sources. @@ -117,8 +118,8 @@ func (s *Syncer) Start(ctx context.Context) error { } // Initialize handlers - s.daRetriever = NewDARetriever(s.da, s.cache, s.config, s.genesis, s.options, s.logger) - s.p2pHandler = NewP2PHandler(s.headerStore, s.dataStore, s.genesis, s.options, s.logger) + s.daRetriever = NewDARetriever(s.da, s.cache, s.config, s.genesis, s.logger) + s.p2pHandler = NewP2PHandler(s.headerStore, s.dataStore, s.cache, s.genesis, s.logger) // Start main processing loop s.wg.Add(1) @@ -236,36 +237,25 @@ func (s *Syncer) syncLoop() { } } - initialHeight, err := s.store.Height(s.ctx) - if err != nil { - s.logger.Error().Err(err).Msg("failed to get initial height") - return - } - - lastHeaderHeight := &initialHeight - lastDataHeight := &initialHeight - // Backoff control when DA replies with errors nextDARequestAt := &time.Time{} - blockTicker := time.NewTicker(s.config.Node.BlockTime.Duration) - defer blockTicker.Stop() - for { select { case <-s.ctx.Done(): return default: } - // Process pending events from cache on every iteration - s.processPendingEvents() - fetchedP2pEvent := s.tryFetchFromP2P(lastHeaderHeight, lastDataHeight, blockTicker.C) - fetchedDaEvent := s.tryFetchFromDA(nextDARequestAt) + s.processPendingEvents() + s.tryFetchFromP2P() + s.tryFetchFromDA(nextDARequestAt) - // Prevent busy-waiting when no events are available - if !fetchedDaEvent && !fetchedP2pEvent { - time.Sleep(min(10*time.Millisecond, s.config.Node.BlockTime.Duration)) + // Prevent busy-waiting when no events are processed + select { + case <-s.ctx.Done(): + return + case <-time.After(min(10*time.Millisecond, s.config.Node.BlockTime.Duration)): } } } @@ -273,13 +263,13 @@ func (s *Syncer) syncLoop() { // tryFetchFromDA attempts to fetch events from the DA layer. // It handles backoff timing, DA height management, and error classification. // Returns true if any events were successfully processed. -func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) bool { +func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) { now := time.Now() daHeight := s.GetDAHeight() // Respect backoff window if set if !nextDARequestAt.IsZero() && now.Before(*nextDARequestAt) { - return false + return } // Retrieve from DA as fast as possible (unless throttled by HFF) @@ -291,7 +281,7 @@ func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) bool { s.SetDAHeight(daHeight + 1) // Reset backoff on success *nextDARequestAt = time.Time{} - return false + return } // Back off exactly by DA block time to avoid overloading @@ -303,7 +293,7 @@ func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) bool { s.logger.Error().Err(err).Dur("delay", backoffDelay).Uint64("da_height", daHeight).Msg("failed to retrieve from DA; backing off DA requests") - return false + return } // Reset backoff on success @@ -320,57 +310,29 @@ func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) bool { // increment DA height on successful retrieval s.SetDAHeight(daHeight + 1) - return len(events) > 0 } // tryFetchFromP2P attempts to fetch events from P2P stores. // It processes both header and data ranges when the block ticker fires. // Returns true if any events were successfully processed. -func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64, blockTicker <-chan time.Time) bool { - eventsProcessed := false - - select { - case <-blockTicker: - // Process headers - newHeaderHeight := s.headerStore.Height() - if newHeaderHeight > *lastHeaderHeight { - events := s.p2pHandler.ProcessHeaderRange(s.ctx, *lastHeaderHeight+1, newHeaderHeight) - for _, event := range events { - select { - case s.heightInCh <- event: - default: - s.cache.SetPendingEvent(event.Header.Height(), &event) - } - } - *lastHeaderHeight = newHeaderHeight - if len(events) > 0 { - eventsProcessed = true - } - } +func (s *Syncer) tryFetchFromP2P() { + currentHeight, err := s.store.Height(s.ctx) + if err != nil { + s.logger.Error().Err(err).Msg("failed to get current height") + return + } - // Process data - newDataHeight := s.dataStore.Height() - if newDataHeight == newHeaderHeight { - *lastDataHeight = newDataHeight - } else if newDataHeight > *lastDataHeight { - events := s.p2pHandler.ProcessDataRange(s.ctx, *lastDataHeight+1, newDataHeight) - for _, event := range events { - select { - case s.heightInCh <- event: - default: - s.cache.SetPendingEvent(event.Header.Height(), &event) - } - } - *lastDataHeight = newDataHeight - if len(events) > 0 { - eventsProcessed = true - } - } - default: - // No P2P events available + // Process headers + newHeaderHeight := s.headerStore.Height() + if newHeaderHeight > currentHeight { + s.p2pHandler.ProcessHeaderRange(s.ctx, currentHeight+1, newHeaderHeight, s.heightInCh) } - return eventsProcessed + // Process data (if not already processed by headers) + newDataHeight := s.dataStore.Height() + if newDataHeight != newHeaderHeight && newDataHeight > currentHeight { + s.p2pHandler.ProcessDataRange(s.ctx, currentHeight+1, newDataHeight, s.heightInCh) + } } func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { @@ -403,6 +365,16 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { return } + // LastDataHash must be gotten from store when the data hash is empty. + if bytes.Equal(event.Header.DataHash, common.DataHashForEmptyTxs) && currentHeight > 0 { + _, lastData, err := s.store.GetBlockData(s.ctx, currentHeight) + if err != nil { + s.logger.Error().Err(err).Msg("failed to get last data") + return + } + event.Data.LastDataHash = lastData.Hash() + } + // Try to sync the next block if err := s.trySyncNextBlock(event); err != nil { s.logger.Error().Err(err).Msg("failed to sync next block") diff --git a/block/internal/syncing/syncer_mock.go b/block/internal/syncing/syncer_mock.go index 2413db59cd..85cad46960 100644 --- a/block/internal/syncing/syncer_mock.go +++ b/block/internal/syncing/syncer_mock.go @@ -134,22 +134,9 @@ func (_m *mockp2pHandler) EXPECT() *mockp2pHandler_Expecter { } // ProcessDataRange provides a mock function for the type mockp2pHandler -func (_mock *mockp2pHandler) ProcessDataRange(ctx context.Context, fromHeight uint64, toHeight uint64) []common.DAHeightEvent { - ret := _mock.Called(ctx, fromHeight, toHeight) - - if len(ret) == 0 { - panic("no return value specified for ProcessDataRange") - } - - var r0 []common.DAHeightEvent - if returnFunc, ok := ret.Get(0).(func(context.Context, uint64, uint64) []common.DAHeightEvent); ok { - r0 = returnFunc(ctx, fromHeight, toHeight) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]common.DAHeightEvent) - } - } - return r0 +func (_mock *mockp2pHandler) ProcessDataRange(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent) { + _mock.Called(ctx, fromHeight, toHeight, heightInCh) + return } // mockp2pHandler_ProcessDataRange_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessDataRange' @@ -161,11 +148,12 @@ type mockp2pHandler_ProcessDataRange_Call struct { // - ctx context.Context // - fromHeight uint64 // - toHeight uint64 -func (_e *mockp2pHandler_Expecter) ProcessDataRange(ctx interface{}, fromHeight interface{}, toHeight interface{}) *mockp2pHandler_ProcessDataRange_Call { - return &mockp2pHandler_ProcessDataRange_Call{Call: _e.mock.On("ProcessDataRange", ctx, fromHeight, toHeight)} +// - heightInCh chan<- common.DAHeightEvent +func (_e *mockp2pHandler_Expecter) ProcessDataRange(ctx interface{}, fromHeight interface{}, toHeight interface{}, heightInCh interface{}) *mockp2pHandler_ProcessDataRange_Call { + return &mockp2pHandler_ProcessDataRange_Call{Call: _e.mock.On("ProcessDataRange", ctx, fromHeight, toHeight, heightInCh)} } -func (_c *mockp2pHandler_ProcessDataRange_Call) Run(run func(ctx context.Context, fromHeight uint64, toHeight uint64)) *mockp2pHandler_ProcessDataRange_Call { +func (_c *mockp2pHandler_ProcessDataRange_Call) Run(run func(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent)) *mockp2pHandler_ProcessDataRange_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { @@ -179,42 +167,34 @@ func (_c *mockp2pHandler_ProcessDataRange_Call) Run(run func(ctx context.Context if args[2] != nil { arg2 = args[2].(uint64) } + var arg3 chan<- common.DAHeightEvent + if args[3] != nil { + arg3 = args[3].(chan<- common.DAHeightEvent) + } run( arg0, arg1, arg2, + arg3, ) }) return _c } -func (_c *mockp2pHandler_ProcessDataRange_Call) Return(dAHeightEvents []common.DAHeightEvent) *mockp2pHandler_ProcessDataRange_Call { - _c.Call.Return(dAHeightEvents) +func (_c *mockp2pHandler_ProcessDataRange_Call) Return() *mockp2pHandler_ProcessDataRange_Call { + _c.Call.Return() return _c } -func (_c *mockp2pHandler_ProcessDataRange_Call) RunAndReturn(run func(ctx context.Context, fromHeight uint64, toHeight uint64) []common.DAHeightEvent) *mockp2pHandler_ProcessDataRange_Call { - _c.Call.Return(run) +func (_c *mockp2pHandler_ProcessDataRange_Call) RunAndReturn(run func(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent)) *mockp2pHandler_ProcessDataRange_Call { + _c.Run(run) return _c } // ProcessHeaderRange provides a mock function for the type mockp2pHandler -func (_mock *mockp2pHandler) ProcessHeaderRange(ctx context.Context, fromHeight uint64, toHeight uint64) []common.DAHeightEvent { - ret := _mock.Called(ctx, fromHeight, toHeight) - - if len(ret) == 0 { - panic("no return value specified for ProcessHeaderRange") - } - - var r0 []common.DAHeightEvent - if returnFunc, ok := ret.Get(0).(func(context.Context, uint64, uint64) []common.DAHeightEvent); ok { - r0 = returnFunc(ctx, fromHeight, toHeight) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]common.DAHeightEvent) - } - } - return r0 +func (_mock *mockp2pHandler) ProcessHeaderRange(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent) { + _mock.Called(ctx, fromHeight, toHeight, heightInCh) + return } // mockp2pHandler_ProcessHeaderRange_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessHeaderRange' @@ -226,11 +206,12 @@ type mockp2pHandler_ProcessHeaderRange_Call struct { // - ctx context.Context // - fromHeight uint64 // - toHeight uint64 -func (_e *mockp2pHandler_Expecter) ProcessHeaderRange(ctx interface{}, fromHeight interface{}, toHeight interface{}) *mockp2pHandler_ProcessHeaderRange_Call { - return &mockp2pHandler_ProcessHeaderRange_Call{Call: _e.mock.On("ProcessHeaderRange", ctx, fromHeight, toHeight)} +// - heightInCh chan<- common.DAHeightEvent +func (_e *mockp2pHandler_Expecter) ProcessHeaderRange(ctx interface{}, fromHeight interface{}, toHeight interface{}, heightInCh interface{}) *mockp2pHandler_ProcessHeaderRange_Call { + return &mockp2pHandler_ProcessHeaderRange_Call{Call: _e.mock.On("ProcessHeaderRange", ctx, fromHeight, toHeight, heightInCh)} } -func (_c *mockp2pHandler_ProcessHeaderRange_Call) Run(run func(ctx context.Context, fromHeight uint64, toHeight uint64)) *mockp2pHandler_ProcessHeaderRange_Call { +func (_c *mockp2pHandler_ProcessHeaderRange_Call) Run(run func(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent)) *mockp2pHandler_ProcessHeaderRange_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { @@ -244,21 +225,26 @@ func (_c *mockp2pHandler_ProcessHeaderRange_Call) Run(run func(ctx context.Conte if args[2] != nil { arg2 = args[2].(uint64) } + var arg3 chan<- common.DAHeightEvent + if args[3] != nil { + arg3 = args[3].(chan<- common.DAHeightEvent) + } run( arg0, arg1, arg2, + arg3, ) }) return _c } -func (_c *mockp2pHandler_ProcessHeaderRange_Call) Return(dAHeightEvents []common.DAHeightEvent) *mockp2pHandler_ProcessHeaderRange_Call { - _c.Call.Return(dAHeightEvents) +func (_c *mockp2pHandler_ProcessHeaderRange_Call) Return() *mockp2pHandler_ProcessHeaderRange_Call { + _c.Call.Return() return _c } -func (_c *mockp2pHandler_ProcessHeaderRange_Call) RunAndReturn(run func(ctx context.Context, fromHeight uint64, toHeight uint64) []common.DAHeightEvent) *mockp2pHandler_ProcessHeaderRange_Call { - _c.Call.Return(run) +func (_c *mockp2pHandler_ProcessHeaderRange_Call) RunAndReturn(run func(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent)) *mockp2pHandler_ProcessHeaderRange_Call { + _c.Run(run) return _c } diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index cbeaa7a97d..3e4fe3af62 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -13,6 +13,7 @@ import ( signerpkg "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/signer/noop" testmocks "github.com/evstack/ev-node/test/mocks" + mocks "github.com/evstack/ev-node/test/mocks/external" "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" "github.com/libp2p/go-libp2p/core/crypto" @@ -107,8 +108,8 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + &mocks.MockStore[*types.SignedHeader]{}, + &mocks.MockStore[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -155,8 +156,8 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + &mocks.MockStore[*types.SignedHeader]{}, + &mocks.MockStore[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -205,8 +206,8 @@ func TestSequentialBlockSync(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + &mocks.MockStore[*types.SignedHeader]{}, + &mocks.MockStore[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -326,6 +327,10 @@ func TestSyncLoopPersistState(t *testing.T) { gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr, DAStartHeight: myDAHeightOffset} dummyExec := execution.NewDummyExecutor() + mockP2PHeaderStore := &mocks.MockStore[*types.SignedHeader]{} + mockP2PDataStore := &mocks.MockStore[*types.Data]{} + mockP2PHeaderStore.On("Height", mock.Anything).Return(uint64(1), nil).Maybe() + mockP2PDataStore.On("Height", mock.Anything).Return(uint64(1), nil).Maybe() syncerInst1 := NewSyncer( st, @@ -335,8 +340,8 @@ func TestSyncLoopPersistState(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + mockP2PHeaderStore, + mockP2PDataStore, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -346,6 +351,8 @@ func TestSyncLoopPersistState(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) syncerInst1.ctx = ctx daRtrMock, p2pHndlMock := newMockdaRetriever(t), newMockp2pHandler(t) + p2pHndlMock.On("ProcessHeaderRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Maybe() + p2pHndlMock.On("ProcessDataRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Maybe() syncerInst1.daRetriever, syncerInst1.p2pHandler = daRtrMock, p2pHndlMock // with n da blobs fetched @@ -412,8 +419,8 @@ func TestSyncLoopPersistState(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + mockP2PHeaderStore, + mockP2PDataStore, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -425,6 +432,8 @@ func TestSyncLoopPersistState(t *testing.T) { t.Cleanup(cancel) syncerInst2.ctx = ctx daRtrMock, p2pHndlMock = newMockdaRetriever(t), newMockp2pHandler(t) + p2pHndlMock.On("ProcessHeaderRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Maybe() + p2pHndlMock.On("ProcessDataRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Maybe() syncerInst2.daRetriever, syncerInst2.p2pHandler = daRtrMock, p2pHndlMock daRtrMock.On("RetrieveFromDA", mock.Anything, mock.Anything). diff --git a/test/e2e/evm_full_node_e2e_test.go b/test/e2e/evm_full_node_e2e_test.go index 5d10726722..2bff86d93c 100644 --- a/test/e2e/evm_full_node_e2e_test.go +++ b/test/e2e/evm_full_node_e2e_test.go @@ -426,8 +426,8 @@ func TestEvmSequencerWithFullNodeE2E(t *testing.T) { t.Logf("Full node block height before DA inclusion wait: %d", fnBlockHeightBeforeWait) // Wait a few seconds to allow DA inclusion to process - waitTime := 2 * time.Second - t.Logf("Waiting %v 2s for DA inclusion to process...", waitTime) + waitTime := 4 * time.Second + t.Logf("Waiting %v for DA inclusion to process...", waitTime) time.Sleep(waitTime) // Get the DA included height from full node after the wait