From 1ca404b6b9b13d22e507edd2a9d35c49ce1d15a2 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 9 Oct 2025 14:23:18 +0200 Subject: [PATCH 1/6] refactor(syncer): fix last data check for both da and syncer + optimize --- block/internal/syncing/da_retriever.go | 41 +++--- block/internal/syncing/da_retriever_test.go | 43 +++--- block/internal/syncing/p2p_handler.go | 68 +++------ block/internal/syncing/p2p_handler_test.go | 152 ++++++++++++-------- block/internal/syncing/syncer.go | 128 ++++++++--------- block/internal/syncing/syncer_mock.go | 78 +++++----- 6 files changed, 244 insertions(+), 266 deletions(-) diff --git a/block/internal/syncing/da_retriever.go b/block/internal/syncing/da_retriever.go index b579101511..40248019a4 100644 --- a/block/internal/syncing/da_retriever.go +++ b/block/internal/syncing/da_retriever.go @@ -27,7 +27,6 @@ type DARetriever struct { da coreda.DA cache cache.Manager genesis genesis.Genesis - options common.BlockOptions logger zerolog.Logger // calculate namespaces bytes once and reuse them @@ -46,14 +45,12 @@ func NewDARetriever( cache cache.Manager, config config.Config, genesis genesis.Genesis, - options common.BlockOptions, logger zerolog.Logger, ) *DARetriever { return &DARetriever{ da: da, cache: cache, genesis: genesis, - options: options, logger: logger.With().Str("component", "da_retriever").Logger(), namespaceBz: coreda.NamespaceFromString(config.DA.GetNamespace()).Bytes(), namespaceDataBz: coreda.NamespaceFromString(config.DA.GetDataNamespace()).Bytes(), @@ -192,8 +189,8 @@ func (r *DARetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight // Handle empty data case if data == nil { - if r.isEmptyDataExpected(header) { - data = r.createEmptyDataForHeader(ctx, header) + if isEmptyDataExpected(header) { + data = createEmptyDataForHeader(ctx, header) delete(r.pendingHeaders, height) } else { // keep header in pending headers until data lands @@ -291,22 +288,6 @@ func (r *DARetriever) tryDecodeData(bz []byte, daHeight uint64) *types.Data { return &signedData.Data } -// isEmptyDataExpected checks if empty data is expected for a header -func (r *DARetriever) isEmptyDataExpected(header *types.SignedHeader) bool { - return len(header.DataHash) == 0 || bytes.Equal(header.DataHash, common.DataHashForEmptyTxs) -} - -// createEmptyDataForHeader creates empty data for a header -func (r *DARetriever) createEmptyDataForHeader(ctx context.Context, header *types.SignedHeader) *types.Data { - return &types.Data{ - Metadata: &types.Metadata{ - ChainID: header.ChainID(), - Height: header.Height(), - Time: header.BaseHeader.Time, - }, - } -} - // assertExpectedProposer validates the proposer address func (r *DARetriever) assertExpectedProposer(proposerAddr []byte) error { if string(proposerAddr) != string(r.genesis.ProposerAddress) { @@ -342,3 +323,21 @@ func (r *DARetriever) assertValidSignedData(signedData *types.SignedData) error return nil } + +// isEmptyDataExpected checks if empty data is expected for a header +func isEmptyDataExpected(header *types.SignedHeader) bool { + return len(header.DataHash) == 0 || bytes.Equal(header.DataHash, common.DataHashForEmptyTxs) +} + +// createEmptyDataForHeader creates empty data for a header +func createEmptyDataForHeader(ctx context.Context, header *types.SignedHeader) *types.Data { + return &types.Data{ + Txs: make(types.Txs, 0), + Metadata: &types.Metadata{ + ChainID: header.ChainID(), + Height: header.Height(), + Time: header.BaseHeader.Time, + LastDataHash: nil, // LastDataHash must be filled in the syncer, as it is not available here, block n-1 has not been processed yet. + }, + } +} diff --git a/block/internal/syncing/da_retriever_test.go b/block/internal/syncing/da_retriever_test.go index 22b27dcbc5..0d97d5940f 100644 --- a/block/internal/syncing/da_retriever_test.go +++ b/block/internal/syncing/da_retriever_test.go @@ -59,7 +59,7 @@ func TestDARetriever_RetrieveFromDA_Invalid(t *testing.T) { mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything). Return(nil, errors.New("just invalid")).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) events, err := r.RetrieveFromDA(context.Background(), 42) assert.Error(t, err) assert.Len(t, events, 0) @@ -77,7 +77,7 @@ func TestDARetriever_RetrieveFromDA_NotFound(t *testing.T) { mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything). Return(nil, fmt.Errorf("%s: whatever", coreda.ErrBlobNotFound.Error())).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) events, err := r.RetrieveFromDA(context.Background(), 42) assert.True(t, errors.Is(err, coreda.ErrBlobNotFound)) assert.Len(t, events, 0) @@ -94,7 +94,7 @@ func TestDARetriever_RetrieveFromDA_HeightFromFuture(t *testing.T) { mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything). Return(nil, fmt.Errorf("%s: later", coreda.ErrHeightFromFuture.Error())).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) events, derr := r.RetrieveFromDA(context.Background(), 1000) assert.Error(t, derr) assert.True(t, errors.Is(derr, coreda.ErrHeightFromFuture)) @@ -116,7 +116,7 @@ func TestDARetriever_RetrieveFromDA_Timeout(t *testing.T) { }). Return(nil, context.DeadlineExceeded).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) start := time.Now() events, err := r.RetrieveFromDA(context.Background(), 42) @@ -145,7 +145,7 @@ func TestDARetriever_RetrieveFromDA_TimeoutFast(t *testing.T) { mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything). Return(nil, context.DeadlineExceeded).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) events, err := r.RetrieveFromDA(context.Background(), 42) @@ -165,7 +165,7 @@ func TestDARetriever_ProcessBlobs_HeaderAndData_Success(t *testing.T) { addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) dataBin, data := makeSignedDataBytes(t, gen.ChainID, 2, addr, pub, signer, 2) hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, &data.Data) @@ -193,7 +193,7 @@ func TestDARetriever_ProcessBlobs_HeaderOnly_EmptyDataExpected(t *testing.T) { addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) // Header with no data hash present should trigger empty data creation (per current logic) hb, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, nil) @@ -221,7 +221,7 @@ func TestDARetriever_TryDecodeHeaderAndData_Basic(t *testing.T) { addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) hb, sh := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, nil) gotH := r.tryDecodeHeader(hb, 123) @@ -238,16 +238,6 @@ func TestDARetriever_TryDecodeHeaderAndData_Basic(t *testing.T) { assert.Nil(t, r.tryDecodeData([]byte("junk"), 1)) } -func TestDARetriever_isEmptyDataExpected(t *testing.T) { - r := &DARetriever{} - h := &types.SignedHeader{} - // when DataHash is nil/empty -> expected empty - assert.True(t, r.isEmptyDataExpected(h)) - // when equals to predefined emptyTxs hash -> expected empty - h.DataHash = common.DataHashForEmptyTxs - assert.True(t, r.isEmptyDataExpected(h)) -} - func TestDARetriever_tryDecodeData_InvalidSignatureOrProposer(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) @@ -257,7 +247,7 @@ func TestDARetriever_tryDecodeData_InvalidSignatureOrProposer(t *testing.T) { goodAddr, pub, signer := buildSyncTestSigner(t) badAddr := []byte("not-the-proposer") gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: badAddr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) // Signed data is made by goodAddr; retriever expects badAddr -> should be rejected db, _ := makeSignedDataBytes(t, gen.ChainID, 7, goodAddr, pub, signer, 1) @@ -310,7 +300,7 @@ func TestDARetriever_RetrieveFromDA_TwoNamespaces_Success(t *testing.T) { mockDA.EXPECT().Get(mock.Anything, mock.Anything, mock.MatchedBy(func(ns []byte) bool { return bytes.Equal(ns, namespaceDataBz) })). Return([][]byte{dataBin}, nil).Once() - r := NewDARetriever(mockDA, cm, cfg, gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, cfg, gen, zerolog.Nop()) events, derr := r.RetrieveFromDA(context.Background(), 1234) require.NoError(t, derr) @@ -328,7 +318,7 @@ func TestDARetriever_ProcessBlobs_CrossDAHeightMatching(t *testing.T) { addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) // Create header and data for the same block height but from different DA heights dataBin, data := makeSignedDataBytes(t, gen.ChainID, 5, addr, pub, signer, 2) @@ -364,7 +354,7 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) // Create multiple headers and data for different block heights data3Bin, data3 := makeSignedDataBytes(t, gen.ChainID, 3, addr, pub, signer, 1) @@ -422,3 +412,12 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin require.Len(t, r.pendingHeaders, 0, "all headers should be processed") require.Len(t, r.pendingData, 0, "all data should be processed") } + +func Test_isEmptyDataExpected(t *testing.T) { + h := &types.SignedHeader{} + // when DataHash is nil/empty -> expected empty + assert.True(t, isEmptyDataExpected(h)) + // when equals to predefined emptyTxs hash -> expected empty + h.DataHash = common.DataHashForEmptyTxs + assert.True(t, isEmptyDataExpected(h)) +} diff --git a/block/internal/syncing/p2p_handler.go b/block/internal/syncing/p2p_handler.go index cd369e44a0..cfd72dc40e 100644 --- a/block/internal/syncing/p2p_handler.go +++ b/block/internal/syncing/p2p_handler.go @@ -8,6 +8,7 @@ import ( goheader "github.com/celestiaorg/go-header" "github.com/rs/zerolog" + "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/types" @@ -17,8 +18,8 @@ import ( type P2PHandler struct { headerStore goheader.Store[*types.SignedHeader] dataStore goheader.Store[*types.Data] + cache cache.Manager genesis genesis.Genesis - options common.BlockOptions logger zerolog.Logger } @@ -26,31 +27,29 @@ type P2PHandler struct { func NewP2PHandler( headerStore goheader.Store[*types.SignedHeader], dataStore goheader.Store[*types.Data], + cache cache.Manager, genesis genesis.Genesis, - options common.BlockOptions, logger zerolog.Logger, ) *P2PHandler { return &P2PHandler{ headerStore: headerStore, dataStore: dataStore, + cache: cache, genesis: genesis, - options: options, logger: logger.With().Str("component", "p2p_handler").Logger(), } } // ProcessHeaderRange processes headers from the header store within the given range -func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHeight uint64) []common.DAHeightEvent { +func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHeight uint64, heightInCh chan<- common.DAHeightEvent) { if startHeight > endHeight { - return nil + return } - var events []common.DAHeightEvent - for height := startHeight; height <= endHeight; height++ { select { case <-ctx.Done(): - return events + return default: } @@ -70,7 +69,7 @@ func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHei var data *types.Data if bytes.Equal(header.DataHash, common.DataHashForEmptyTxs) { // Create empty data for headers with empty data hash - data = h.createEmptyDataForHeader(ctx, header) + data = createEmptyDataForHeader(ctx, header) } else { // Try to get data from data store retrievedData, err := h.dataStore.GetByHeight(ctx, height) @@ -91,26 +90,26 @@ func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHei DaHeight: 0, // P2P events don't have DA height context } - events = append(events, event) + select { + case heightInCh <- event: + default: + h.cache.SetPendingEvent(event.Header.Height(), &event) + } h.logger.Debug().Uint64("height", height).Str("source", "p2p_headers").Msg("processed header from P2P") } - - return events } // ProcessDataRange processes data from the data store within the given range -func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeight uint64) []common.DAHeightEvent { +func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeight uint64, heightInCh chan<- common.DAHeightEvent) { if startHeight > endHeight { - return nil + return } - var events []common.DAHeightEvent - for height := startHeight; height <= endHeight; height++ { select { case <-ctx.Done(): - return events + return default: } @@ -143,12 +142,14 @@ func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeigh DaHeight: 0, // P2P events don't have DA height context } - events = append(events, event) + select { + case heightInCh <- event: + default: + h.cache.SetPendingEvent(event.Header.Height(), &event) + } h.logger.Debug().Uint64("height", height).Str("source", "p2p_data").Msg("processed data from P2P") } - - return events } // assertExpectedProposer validates the proposer address @@ -159,30 +160,3 @@ func (h *P2PHandler) assertExpectedProposer(proposerAddr []byte) error { } return nil } - -// createEmptyDataForHeader creates empty data for headers with empty data hash -func (h *P2PHandler) createEmptyDataForHeader(ctx context.Context, header *types.SignedHeader) *types.Data { - headerHeight := header.Height() - var lastDataHash types.Hash - - if headerHeight > 1 { - // Try to get previous data hash, but don't fail if not available - if prevData, err := h.dataStore.GetByHeight(ctx, headerHeight-1); err == nil && prevData != nil { - lastDataHash = prevData.Hash() - } else { - h.logger.Debug().Uint64("current_height", headerHeight).Uint64("previous_height", headerHeight-1). - Msg("previous block not available, using empty last data hash") - } - } - - metadata := &types.Metadata{ - ChainID: header.ChainID(), - Height: headerHeight, - Time: header.BaseHeader.Time, - LastDataHash: lastDataHash, - } - - return &types.Data{ - Metadata: metadata, - } -} diff --git a/block/internal/syncing/p2p_handler_test.go b/block/internal/syncing/p2p_handler_test.go index 11f24caca1..c6d9ef6406 100644 --- a/block/internal/syncing/p2p_handler_test.go +++ b/block/internal/syncing/p2p_handler_test.go @@ -7,14 +7,19 @@ import ( "testing" "time" + ds "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p/core/crypto" "github.com/rs/zerolog" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" signerpkg "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/signer/noop" + storemocks "github.com/evstack/ev-node/test/mocks" extmocks "github.com/evstack/ev-node/test/mocks/external" "github.com/evstack/ev-node/types" ) @@ -56,13 +61,14 @@ type P2PTestData struct { Handler *P2PHandler HeaderStore *extmocks.MockStore[*types.SignedHeader] DataStore *extmocks.MockStore[*types.Data] + Cache cache.Manager Genesis genesis.Genesis ProposerAddr []byte ProposerPub crypto.PubKey Signer signerpkg.Signer } -// setupP2P constructs a P2PHandler with mocked go-header stores +// setupP2P constructs a P2PHandler with mocked go-header stores and real cache func setupP2P(t *testing.T) *P2PTestData { t.Helper() proposerAddr, proposerPub, signer := buildTestSigner(t) @@ -72,11 +78,28 @@ func setupP2P(t *testing.T) *P2PTestData { headerStoreMock := extmocks.NewMockStore[*types.SignedHeader](t) dataStoreMock := extmocks.NewMockStore[*types.Data](t) - handler := NewP2PHandler(headerStoreMock, dataStoreMock, gen, common.DefaultBlockOptions(), zerolog.Nop()) + // Create a real cache manager for tests + storeMock := storemocks.NewMockStore(t) + // Mock the methods that cache manager initialization will call + // Return ErrNotFound for non-existent metadata keys + storeMock.EXPECT().GetMetadata(mock.Anything, "last-submitted-header-height").Return(nil, ds.ErrNotFound).Maybe() + storeMock.EXPECT().GetMetadata(mock.Anything, "last-submitted-data-height").Return(nil, ds.ErrNotFound).Maybe() + storeMock.EXPECT().Height(mock.Anything).Return(uint64(0), nil).Maybe() + storeMock.EXPECT().SetMetadata(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + + cfg := config.Config{ + RootDir: t.TempDir(), + ClearCache: true, + } + cacheManager, err := cache.NewManager(cfg, storeMock, zerolog.Nop()) + require.NoError(t, err, "failed to create cache manager") + + handler := NewP2PHandler(headerStoreMock, dataStoreMock, cacheManager, gen, zerolog.Nop()) return &P2PTestData{ Handler: handler, HeaderStore: headerStoreMock, DataStore: dataStoreMock, + Cache: cacheManager, Genesis: gen, ProposerAddr: proposerAddr, ProposerPub: proposerPub, @@ -84,6 +107,29 @@ func setupP2P(t *testing.T) *P2PTestData { } } +// collectEvents reads events from a channel with a timeout +func collectEvents(t *testing.T, ch <-chan common.DAHeightEvent, timeout time.Duration) []common.DAHeightEvent { + t.Helper() + var events []common.DAHeightEvent + deadline := time.After(timeout) + for { + select { + case event := <-ch: + events = append(events, event) + case <-deadline: + return events + case <-time.After(10 * time.Millisecond): + // Give it a moment to check if more events are coming + select { + case event := <-ch: + events = append(events, event) + default: + return events + } + } + } +} + func TestP2PHandler_ProcessHeaderRange_HeaderAndDataHappyPath(t *testing.T) { p2pData := setupP2P(t) ctx := context.Background() @@ -104,10 +150,14 @@ func TestP2PHandler_ProcessHeaderRange_HeaderAndDataHappyPath(t *testing.T) { // Sanity: header should validate with data using default sync verifier require.NoError(t, signedHeader.ValidateBasicWithData(blockData), "header+data must validate before handler processes them") - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(5)).Return(signedHeader, nil).Once() - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(5)).Return(blockData, nil).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(signedHeader, nil).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(blockData, nil).Once() + + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessHeaderRange(ctx, 5, 5, ch) - events := p2pData.Handler.ProcessHeaderRange(ctx, 5, 5) + events := collectEvents(t, ch, 100*time.Millisecond) require.Len(t, events, 1, "expected one event for the provided header/data height") require.Equal(t, uint64(5), events[0].Header.Height()) require.NotNil(t, events[0].Data) @@ -125,10 +175,14 @@ func TestP2PHandler_ProcessHeaderRange_MissingData_NonEmptyHash(t *testing.T) { blockData := makeData(p2pData.Genesis.ChainID, 7, 1) signedHeader.DataHash = blockData.DACommitment() - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(7)).Return(signedHeader, nil).Once() - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(7)).Return(nil, errors.New("not found")).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(7)).Return(signedHeader, nil).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(7)).Return(nil, errors.New("not found")).Once() - events := p2pData.Handler.ProcessHeaderRange(ctx, 7, 7) + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessHeaderRange(ctx, 7, 7, ch) + + events := collectEvents(t, ch, 100*time.Millisecond) require.Len(t, events, 0) } @@ -137,10 +191,14 @@ func TestP2PHandler_ProcessDataRange_HeaderMissing(t *testing.T) { ctx := context.Background() blockData := makeData(p2pData.Genesis.ChainID, 9, 1) - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(9)).Return(blockData, nil).Once() - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(9)).Return(nil, errors.New("no header")).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(9)).Return(blockData, nil).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(9)).Return(nil, errors.New("no header")).Once() + + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessDataRange(ctx, 9, 9, ch) - events := p2pData.Handler.ProcessDataRange(ctx, 9, 9) + events := collectEvents(t, ch, 100*time.Millisecond) require.Len(t, events, 0) } @@ -154,50 +212,14 @@ func TestP2PHandler_ProposerMismatch_Rejected(t *testing.T) { signedHeader := p2pMakeSignedHeader(t, p2pData.Genesis.ChainID, 4, badAddr, pub, signer) signedHeader.DataHash = common.DataHashForEmptyTxs - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(4)).Return(signedHeader, nil).Once() - - events := p2pData.Handler.ProcessHeaderRange(ctx, 4, 4) - require.Len(t, events, 0) -} - -func TestP2PHandler_CreateEmptyDataForHeader_UsesPreviousDataHash(t *testing.T) { - p2pData := setupP2P(t) - ctx := context.Background() - - // Prepare a header at height 10 - signedHeader := p2pMakeSignedHeader(t, p2pData.Genesis.ChainID, 10, p2pData.ProposerAddr, p2pData.ProposerPub, p2pData.Signer) - signedHeader.DataHash = common.DataHashForEmptyTxs - - // Mock previous data at height 9 so handler can propagate its hash - previousData := makeData(p2pData.Genesis.ChainID, 9, 1) - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(9)).Return(previousData, nil).Once() - - emptyData := p2pData.Handler.createEmptyDataForHeader(ctx, signedHeader) - require.NotNil(t, emptyData, "handler should synthesize empty data when header declares empty data hash") - require.Equal(t, p2pData.Genesis.ChainID, emptyData.ChainID(), "synthesized data should carry header chain ID") - require.Equal(t, uint64(10), emptyData.Height(), "synthesized data should carry header height") - require.Equal(t, signedHeader.BaseHeader.Time, emptyData.Metadata.Time, "synthesized data should carry header time") - require.Equal(t, previousData.Hash(), emptyData.LastDataHash, "synthesized data should propagate previous data hash") -} - -func TestP2PHandler_CreateEmptyDataForHeader_NoPreviousData(t *testing.T) { - p2pData := setupP2P(t) - ctx := context.Background() - - // Prepare a header at height 2 (previous height exists but will return error) - signedHeader := p2pMakeSignedHeader(t, p2pData.Genesis.ChainID, 2, p2pData.ProposerAddr, p2pData.ProposerPub, p2pData.Signer) - signedHeader.DataHash = common.DataHashForEmptyTxs + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(4)).Return(signedHeader, nil).Once() - // Mock previous data fetch failure - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(1)).Return(nil, errors.New("not available")).Once() + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessHeaderRange(ctx, 4, 4, ch) - emptyData := p2pData.Handler.createEmptyDataForHeader(ctx, signedHeader) - require.NotNil(t, emptyData, "handler should synthesize empty data even when previous data is unavailable") - require.Equal(t, p2pData.Genesis.ChainID, emptyData.ChainID(), "synthesized data should carry header chain ID") - require.Equal(t, uint64(2), emptyData.Height(), "synthesized data should carry header height") - require.Equal(t, signedHeader.BaseHeader.Time, emptyData.Metadata.Time, "synthesized data should carry header time") - // When no previous data is available, LastDataHash should be zero value - require.Equal(t, (types.Hash)(nil), emptyData.LastDataHash, "last data hash should be empty when previous data is not available") + events := collectEvents(t, ch, 100*time.Millisecond) + require.Len(t, events, 0) } func TestP2PHandler_ProcessHeaderRange_MultipleHeightsHappyPath(t *testing.T) { @@ -229,12 +251,16 @@ func TestP2PHandler_ProcessHeaderRange_MultipleHeightsHappyPath(t *testing.T) { require.NoError(t, header6.ValidateBasicWithData(data6), "header/data invalid for height 6") // Expectations for both heights - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(5)).Return(header5, nil).Once() - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(5)).Return(data5, nil).Once() - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(6)).Return(header6, nil).Once() - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(6)).Return(data6, nil).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(header5, nil).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(data5, nil).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(6)).Return(header6, nil).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(6)).Return(data6, nil).Once() + + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessHeaderRange(ctx, 5, 6, ch) - events := p2pData.Handler.ProcessHeaderRange(ctx, 5, 6) + events := collectEvents(t, ch, 100*time.Millisecond) require.Len(t, events, 2, "expected two events for heights 5 and 6") require.Equal(t, uint64(5), events[0].Header.Height(), "first event should be height 5") require.Equal(t, uint64(6), events[1].Header.Height(), "second event should be height 6") @@ -248,14 +274,18 @@ func TestP2PHandler_ProcessDataRange_HeaderValidateHeaderFails(t *testing.T) { // Data exists at height 3 blockData := makeData(p2pData.Genesis.ChainID, 3, 1) - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(3)).Return(blockData, nil).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(3)).Return(blockData, nil).Once() // Header proposer does not match genesis -> validateHeader should fail badAddr, pub, signer := buildTestSigner(t) require.NotEqual(t, string(p2pData.Genesis.ProposerAddress), string(badAddr), "negative test requires mismatched proposer") badHeader := p2pMakeSignedHeader(t, p2pData.Genesis.ChainID, 3, badAddr, pub, signer) - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(3)).Return(badHeader, nil).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(3)).Return(badHeader, nil).Once() + + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessDataRange(ctx, 3, 3, ch) - events := p2pData.Handler.ProcessDataRange(ctx, 3, 3) + events := collectEvents(t, ch, 100*time.Millisecond) require.Len(t, events, 0, "validateHeader failure should drop event") } diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 4e64188b06..dde602e5a8 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -25,9 +25,10 @@ import ( type daRetriever interface { RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) } + type p2pHandler interface { - ProcessHeaderRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent - ProcessDataRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent + ProcessHeaderRange(ctx context.Context, fromHeight, toHeight uint64, heightInCh chan<- common.DAHeightEvent) + ProcessDataRange(ctx context.Context, fromHeight, toHeight uint64, heightInCh chan<- common.DAHeightEvent) } // Syncer handles block synchronization from DA and P2P sources. @@ -117,8 +118,8 @@ func (s *Syncer) Start(ctx context.Context) error { } // Initialize handlers - s.daRetriever = NewDARetriever(s.da, s.cache, s.config, s.genesis, s.options, s.logger) - s.p2pHandler = NewP2PHandler(s.headerStore, s.dataStore, s.genesis, s.options, s.logger) + s.daRetriever = NewDARetriever(s.da, s.cache, s.config, s.genesis, s.logger) + s.p2pHandler = NewP2PHandler(s.headerStore, s.dataStore, s.cache, s.genesis, s.logger) // Start main processing loop s.wg.Add(1) @@ -236,15 +237,6 @@ func (s *Syncer) syncLoop() { } } - initialHeight, err := s.store.Height(s.ctx) - if err != nil { - s.logger.Error().Err(err).Msg("failed to get initial height") - return - } - - lastHeaderHeight := &initialHeight - lastDataHeight := &initialHeight - // Backoff control when DA replies with errors nextDARequestAt := &time.Time{} @@ -252,20 +244,36 @@ func (s *Syncer) syncLoop() { defer blockTicker.Stop() for { + wg := sync.WaitGroup{} + select { case <-s.ctx.Done(): return default: } - // Process pending events from cache on every iteration - s.processPendingEvents() - - fetchedP2pEvent := s.tryFetchFromP2P(lastHeaderHeight, lastDataHeight, blockTicker.C) - fetchedDaEvent := s.tryFetchFromDA(nextDARequestAt) - - // Prevent busy-waiting when no events are available - if !fetchedDaEvent && !fetchedP2pEvent { - time.Sleep(min(10*time.Millisecond, s.config.Node.BlockTime.Duration)) + wg.Add(1) + go func() { + defer wg.Done() + s.processPendingEvents() + }() + + wg.Add(1) + go func() { + defer wg.Done() + s.tryFetchFromP2P() + }() + + wg.Add(1) + go func() { + defer wg.Done() + s.tryFetchFromDA(nextDARequestAt) + }() + + // Prevent busy-waiting when no events are processed + select { + case <-s.ctx.Done(): + return + case <-time.After(min(10*time.Millisecond, s.config.Node.BlockTime.Duration)): } } } @@ -273,13 +281,13 @@ func (s *Syncer) syncLoop() { // tryFetchFromDA attempts to fetch events from the DA layer. // It handles backoff timing, DA height management, and error classification. // Returns true if any events were successfully processed. -func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) bool { +func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) { now := time.Now() daHeight := s.GetDAHeight() // Respect backoff window if set if !nextDARequestAt.IsZero() && now.Before(*nextDARequestAt) { - return false + return } // Retrieve from DA as fast as possible (unless throttled by HFF) @@ -291,7 +299,7 @@ func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) bool { s.SetDAHeight(daHeight + 1) // Reset backoff on success *nextDARequestAt = time.Time{} - return false + return } // Back off exactly by DA block time to avoid overloading @@ -303,7 +311,7 @@ func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) bool { s.logger.Error().Err(err).Dur("delay", backoffDelay).Uint64("da_height", daHeight).Msg("failed to retrieve from DA; backing off DA requests") - return false + return } // Reset backoff on success @@ -320,57 +328,29 @@ func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) bool { // increment DA height on successful retrieval s.SetDAHeight(daHeight + 1) - return len(events) > 0 } // tryFetchFromP2P attempts to fetch events from P2P stores. // It processes both header and data ranges when the block ticker fires. // Returns true if any events were successfully processed. -func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64, blockTicker <-chan time.Time) bool { - eventsProcessed := false - - select { - case <-blockTicker: - // Process headers - newHeaderHeight := s.headerStore.Height() - if newHeaderHeight > *lastHeaderHeight { - events := s.p2pHandler.ProcessHeaderRange(s.ctx, *lastHeaderHeight+1, newHeaderHeight) - for _, event := range events { - select { - case s.heightInCh <- event: - default: - s.cache.SetPendingEvent(event.Header.Height(), &event) - } - } - *lastHeaderHeight = newHeaderHeight - if len(events) > 0 { - eventsProcessed = true - } - } +func (s *Syncer) tryFetchFromP2P() { + currentHeight, err := s.store.Height(s.ctx) + if err != nil { + s.logger.Error().Err(err).Msg("failed to get current height") + return + } - // Process data - newDataHeight := s.dataStore.Height() - if newDataHeight == newHeaderHeight { - *lastDataHeight = newDataHeight - } else if newDataHeight > *lastDataHeight { - events := s.p2pHandler.ProcessDataRange(s.ctx, *lastDataHeight+1, newDataHeight) - for _, event := range events { - select { - case s.heightInCh <- event: - default: - s.cache.SetPendingEvent(event.Header.Height(), &event) - } - } - *lastDataHeight = newDataHeight - if len(events) > 0 { - eventsProcessed = true - } - } - default: - // No P2P events available + // Process headers + newHeaderHeight := s.headerStore.Height() + if newHeaderHeight > currentHeight { + s.p2pHandler.ProcessHeaderRange(s.ctx, currentHeight+1, newHeaderHeight, s.heightInCh) } - return eventsProcessed + // Process data (if not already processed by headers) + newDataHeight := s.dataStore.Height() + if newDataHeight != newHeaderHeight && newDataHeight > currentHeight { + s.p2pHandler.ProcessDataRange(s.ctx, currentHeight+1, newDataHeight, s.heightInCh) + } } func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { @@ -403,6 +383,16 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { return } + // LastDataHash must be gotten from store when the data hash is empty. + if bytes.Equal(event.Header.DataHash, common.DataHashForEmptyTxs) { + _, lastData, err := s.store.GetBlockData(s.ctx, currentHeight) + if err != nil { + s.logger.Error().Err(err).Msg("failed to get last data") + return + } + event.Data.LastDataHash = lastData.Hash() + } + // Try to sync the next block if err := s.trySyncNextBlock(event); err != nil { s.logger.Error().Err(err).Msg("failed to sync next block") diff --git a/block/internal/syncing/syncer_mock.go b/block/internal/syncing/syncer_mock.go index 2413db59cd..85cad46960 100644 --- a/block/internal/syncing/syncer_mock.go +++ b/block/internal/syncing/syncer_mock.go @@ -134,22 +134,9 @@ func (_m *mockp2pHandler) EXPECT() *mockp2pHandler_Expecter { } // ProcessDataRange provides a mock function for the type mockp2pHandler -func (_mock *mockp2pHandler) ProcessDataRange(ctx context.Context, fromHeight uint64, toHeight uint64) []common.DAHeightEvent { - ret := _mock.Called(ctx, fromHeight, toHeight) - - if len(ret) == 0 { - panic("no return value specified for ProcessDataRange") - } - - var r0 []common.DAHeightEvent - if returnFunc, ok := ret.Get(0).(func(context.Context, uint64, uint64) []common.DAHeightEvent); ok { - r0 = returnFunc(ctx, fromHeight, toHeight) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]common.DAHeightEvent) - } - } - return r0 +func (_mock *mockp2pHandler) ProcessDataRange(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent) { + _mock.Called(ctx, fromHeight, toHeight, heightInCh) + return } // mockp2pHandler_ProcessDataRange_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessDataRange' @@ -161,11 +148,12 @@ type mockp2pHandler_ProcessDataRange_Call struct { // - ctx context.Context // - fromHeight uint64 // - toHeight uint64 -func (_e *mockp2pHandler_Expecter) ProcessDataRange(ctx interface{}, fromHeight interface{}, toHeight interface{}) *mockp2pHandler_ProcessDataRange_Call { - return &mockp2pHandler_ProcessDataRange_Call{Call: _e.mock.On("ProcessDataRange", ctx, fromHeight, toHeight)} +// - heightInCh chan<- common.DAHeightEvent +func (_e *mockp2pHandler_Expecter) ProcessDataRange(ctx interface{}, fromHeight interface{}, toHeight interface{}, heightInCh interface{}) *mockp2pHandler_ProcessDataRange_Call { + return &mockp2pHandler_ProcessDataRange_Call{Call: _e.mock.On("ProcessDataRange", ctx, fromHeight, toHeight, heightInCh)} } -func (_c *mockp2pHandler_ProcessDataRange_Call) Run(run func(ctx context.Context, fromHeight uint64, toHeight uint64)) *mockp2pHandler_ProcessDataRange_Call { +func (_c *mockp2pHandler_ProcessDataRange_Call) Run(run func(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent)) *mockp2pHandler_ProcessDataRange_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { @@ -179,42 +167,34 @@ func (_c *mockp2pHandler_ProcessDataRange_Call) Run(run func(ctx context.Context if args[2] != nil { arg2 = args[2].(uint64) } + var arg3 chan<- common.DAHeightEvent + if args[3] != nil { + arg3 = args[3].(chan<- common.DAHeightEvent) + } run( arg0, arg1, arg2, + arg3, ) }) return _c } -func (_c *mockp2pHandler_ProcessDataRange_Call) Return(dAHeightEvents []common.DAHeightEvent) *mockp2pHandler_ProcessDataRange_Call { - _c.Call.Return(dAHeightEvents) +func (_c *mockp2pHandler_ProcessDataRange_Call) Return() *mockp2pHandler_ProcessDataRange_Call { + _c.Call.Return() return _c } -func (_c *mockp2pHandler_ProcessDataRange_Call) RunAndReturn(run func(ctx context.Context, fromHeight uint64, toHeight uint64) []common.DAHeightEvent) *mockp2pHandler_ProcessDataRange_Call { - _c.Call.Return(run) +func (_c *mockp2pHandler_ProcessDataRange_Call) RunAndReturn(run func(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent)) *mockp2pHandler_ProcessDataRange_Call { + _c.Run(run) return _c } // ProcessHeaderRange provides a mock function for the type mockp2pHandler -func (_mock *mockp2pHandler) ProcessHeaderRange(ctx context.Context, fromHeight uint64, toHeight uint64) []common.DAHeightEvent { - ret := _mock.Called(ctx, fromHeight, toHeight) - - if len(ret) == 0 { - panic("no return value specified for ProcessHeaderRange") - } - - var r0 []common.DAHeightEvent - if returnFunc, ok := ret.Get(0).(func(context.Context, uint64, uint64) []common.DAHeightEvent); ok { - r0 = returnFunc(ctx, fromHeight, toHeight) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]common.DAHeightEvent) - } - } - return r0 +func (_mock *mockp2pHandler) ProcessHeaderRange(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent) { + _mock.Called(ctx, fromHeight, toHeight, heightInCh) + return } // mockp2pHandler_ProcessHeaderRange_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessHeaderRange' @@ -226,11 +206,12 @@ type mockp2pHandler_ProcessHeaderRange_Call struct { // - ctx context.Context // - fromHeight uint64 // - toHeight uint64 -func (_e *mockp2pHandler_Expecter) ProcessHeaderRange(ctx interface{}, fromHeight interface{}, toHeight interface{}) *mockp2pHandler_ProcessHeaderRange_Call { - return &mockp2pHandler_ProcessHeaderRange_Call{Call: _e.mock.On("ProcessHeaderRange", ctx, fromHeight, toHeight)} +// - heightInCh chan<- common.DAHeightEvent +func (_e *mockp2pHandler_Expecter) ProcessHeaderRange(ctx interface{}, fromHeight interface{}, toHeight interface{}, heightInCh interface{}) *mockp2pHandler_ProcessHeaderRange_Call { + return &mockp2pHandler_ProcessHeaderRange_Call{Call: _e.mock.On("ProcessHeaderRange", ctx, fromHeight, toHeight, heightInCh)} } -func (_c *mockp2pHandler_ProcessHeaderRange_Call) Run(run func(ctx context.Context, fromHeight uint64, toHeight uint64)) *mockp2pHandler_ProcessHeaderRange_Call { +func (_c *mockp2pHandler_ProcessHeaderRange_Call) Run(run func(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent)) *mockp2pHandler_ProcessHeaderRange_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { @@ -244,21 +225,26 @@ func (_c *mockp2pHandler_ProcessHeaderRange_Call) Run(run func(ctx context.Conte if args[2] != nil { arg2 = args[2].(uint64) } + var arg3 chan<- common.DAHeightEvent + if args[3] != nil { + arg3 = args[3].(chan<- common.DAHeightEvent) + } run( arg0, arg1, arg2, + arg3, ) }) return _c } -func (_c *mockp2pHandler_ProcessHeaderRange_Call) Return(dAHeightEvents []common.DAHeightEvent) *mockp2pHandler_ProcessHeaderRange_Call { - _c.Call.Return(dAHeightEvents) +func (_c *mockp2pHandler_ProcessHeaderRange_Call) Return() *mockp2pHandler_ProcessHeaderRange_Call { + _c.Call.Return() return _c } -func (_c *mockp2pHandler_ProcessHeaderRange_Call) RunAndReturn(run func(ctx context.Context, fromHeight uint64, toHeight uint64) []common.DAHeightEvent) *mockp2pHandler_ProcessHeaderRange_Call { - _c.Call.Return(run) +func (_c *mockp2pHandler_ProcessHeaderRange_Call) RunAndReturn(run func(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent)) *mockp2pHandler_ProcessHeaderRange_Call { + _c.Run(run) return _c } From f3af490ee4f91ca374a1332de49bcfa27e756ece Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 9 Oct 2025 14:58:44 +0200 Subject: [PATCH 2/6] fix tests --- block/internal/syncing/syncer.go | 2 +- block/internal/syncing/syncer_test.go | 29 ++++++++++++++++++--------- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index dde602e5a8..d3b75de8a4 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -384,7 +384,7 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { } // LastDataHash must be gotten from store when the data hash is empty. - if bytes.Equal(event.Header.DataHash, common.DataHashForEmptyTxs) { + if bytes.Equal(event.Header.DataHash, common.DataHashForEmptyTxs) && currentHeight > 0 { _, lastData, err := s.store.GetBlockData(s.ctx, currentHeight) if err != nil { s.logger.Error().Err(err).Msg("failed to get last data") diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 0fe7d5192e..7d1c52629c 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -13,6 +13,7 @@ import ( signerpkg "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/signer/noop" testmocks "github.com/evstack/ev-node/test/mocks" + mocks "github.com/evstack/ev-node/test/mocks/external" "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" "github.com/libp2p/go-libp2p/core/crypto" @@ -107,8 +108,8 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + &mocks.MockStore[*types.SignedHeader]{}, + &mocks.MockStore[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -155,8 +156,8 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + &mocks.MockStore[*types.SignedHeader]{}, + &mocks.MockStore[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -205,8 +206,8 @@ func TestSequentialBlockSync(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + &mocks.MockStore[*types.SignedHeader]{}, + &mocks.MockStore[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -323,6 +324,10 @@ func TestSyncLoopPersistState(t *testing.T) { gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr, DAStartHeight: myDAHeightOffset} dummyExec := execution.NewDummyExecutor() + mockP2PHeaderStore := &mocks.MockStore[*types.SignedHeader]{} + mockP2PDataStore := &mocks.MockStore[*types.Data]{} + mockP2PHeaderStore.On("Height", mock.Anything).Return(uint64(1), nil).Maybe() + mockP2PDataStore.On("Height", mock.Anything).Return(uint64(1), nil).Maybe() syncerInst1 := NewSyncer( st, @@ -332,8 +337,8 @@ func TestSyncLoopPersistState(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + mockP2PHeaderStore, + mockP2PDataStore, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -343,6 +348,8 @@ func TestSyncLoopPersistState(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) syncerInst1.ctx = ctx daRtrMock, p2pHndlMock := newMockdaRetriever(t), newMockp2pHandler(t) + p2pHndlMock.On("ProcessHeaderRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Maybe() + p2pHndlMock.On("ProcessDataRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Maybe() syncerInst1.daRetriever, syncerInst1.p2pHandler = daRtrMock, p2pHndlMock // with n da blobs fetched @@ -409,8 +416,8 @@ func TestSyncLoopPersistState(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + mockP2PHeaderStore, + mockP2PDataStore, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -422,6 +429,8 @@ func TestSyncLoopPersistState(t *testing.T) { t.Cleanup(cancel) syncerInst2.ctx = ctx daRtrMock, p2pHndlMock = newMockdaRetriever(t), newMockp2pHandler(t) + p2pHndlMock.On("ProcessHeaderRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Maybe() + p2pHndlMock.On("ProcessDataRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Maybe() syncerInst2.daRetriever, syncerInst2.p2pHandler = daRtrMock, p2pHndlMock daRtrMock.On("RetrieveFromDA", mock.Anything, mock.Anything). From 5db271205cfbce12d10c3e1a6cd0954ff37f1f83 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 9 Oct 2025 15:02:30 +0200 Subject: [PATCH 3/6] forgot wait --- block/internal/syncing/syncer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 8d86b0459b..548ffcde41 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -269,6 +269,9 @@ func (s *Syncer) syncLoop() { s.tryFetchFromDA(nextDARequestAt) }() + // wait for pending events processing, p2p and da fetching + wg.Wait() + // Prevent busy-waiting when no events are processed select { case <-s.ctx.Done(): From 4cec38edba0aeab8070a081c1798223bb55c2139 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 9 Oct 2025 15:07:09 +0200 Subject: [PATCH 4/6] re-add test --- block/internal/syncing/p2p_handler_test.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/block/internal/syncing/p2p_handler_test.go b/block/internal/syncing/p2p_handler_test.go index c6d9ef6406..090ef4a7ee 100644 --- a/block/internal/syncing/p2p_handler_test.go +++ b/block/internal/syncing/p2p_handler_test.go @@ -222,6 +222,22 @@ func TestP2PHandler_ProposerMismatch_Rejected(t *testing.T) { require.Len(t, events, 0) } +func TestP2PHandler_CreateEmptyDataForHeader(t *testing.T) { + p2pData := setupP2P(t) + ctx := context.Background() + + // Prepare a header at height 2 (previous height exists but will return error) + signedHeader := p2pMakeSignedHeader(t, p2pData.Genesis.ChainID, 2, p2pData.ProposerAddr, p2pData.ProposerPub, p2pData.Signer) + signedHeader.DataHash = common.DataHashForEmptyTxs + + emptyData := createEmptyDataForHeader(ctx, signedHeader) + require.NotNil(t, emptyData, "handler should synthesize empty data even when previous data is unavailable") + require.Equal(t, p2pData.Genesis.ChainID, emptyData.ChainID(), "synthesized data should carry header chain ID") + require.Equal(t, uint64(2), emptyData.Height(), "synthesized data should carry header height") + require.Equal(t, signedHeader.BaseHeader.Time, emptyData.Metadata.Time, "synthesized data should carry header time") + require.Equal(t, (types.Hash)(nil), emptyData.LastDataHash) +} + func TestP2PHandler_ProcessHeaderRange_MultipleHeightsHappyPath(t *testing.T) { p2pData := setupP2P(t) ctx := context.Background() From bc3d57a188bad48130034b456c5b9211bde3f6c5 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 9 Oct 2025 15:09:52 +0200 Subject: [PATCH 5/6] synchronous again --- block/internal/syncing/syncer.go | 29 ++++------------------------- 1 file changed, 4 insertions(+), 25 deletions(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 548ffcde41..d1dc7253a5 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -240,37 +240,16 @@ func (s *Syncer) syncLoop() { // Backoff control when DA replies with errors nextDARequestAt := &time.Time{} - blockTicker := time.NewTicker(s.config.Node.BlockTime.Duration) - defer blockTicker.Stop() - for { - wg := sync.WaitGroup{} - select { case <-s.ctx.Done(): return default: } - wg.Add(1) - go func() { - defer wg.Done() - s.processPendingEvents() - }() - - wg.Add(1) - go func() { - defer wg.Done() - s.tryFetchFromP2P() - }() - - wg.Add(1) - go func() { - defer wg.Done() - s.tryFetchFromDA(nextDARequestAt) - }() - - // wait for pending events processing, p2p and da fetching - wg.Wait() + + s.processPendingEvents() + s.tryFetchFromP2P() + s.tryFetchFromDA(nextDARequestAt) // Prevent busy-waiting when no events are processed select { From abd24f7ca8bd9d2c9e224b5987c09efd41da427e Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 9 Oct 2025 18:18:48 +0200 Subject: [PATCH 6/6] updates --- test/e2e/evm_full_node_e2e_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/e2e/evm_full_node_e2e_test.go b/test/e2e/evm_full_node_e2e_test.go index 5d10726722..2bff86d93c 100644 --- a/test/e2e/evm_full_node_e2e_test.go +++ b/test/e2e/evm_full_node_e2e_test.go @@ -426,8 +426,8 @@ func TestEvmSequencerWithFullNodeE2E(t *testing.T) { t.Logf("Full node block height before DA inclusion wait: %d", fnBlockHeightBeforeWait) // Wait a few seconds to allow DA inclusion to process - waitTime := 2 * time.Second - t.Logf("Waiting %v 2s for DA inclusion to process...", waitTime) + waitTime := 4 * time.Second + t.Logf("Waiting %v for DA inclusion to process...", waitTime) time.Sleep(waitTime) // Get the DA included height from full node after the wait