diff --git a/block/internal/da/client_test.go b/block/internal/da/client_test.go
index f9da72523b..1fa8414781 100644
--- a/block/internal/da/client_test.go
+++ b/block/internal/da/client_test.go
@@ -523,3 +523,226 @@ func TestClient_Retrieve_Timeout(t *testing.T) {
 		assert.Assert(t, result.Message != "")
 	})
 }
+
+func TestClient_RetrieveHeaders(t *testing.T) {
+	logger := zerolog.Nop()
+	dataLayerHeight := uint64(100)
+	mockIDs := [][]byte{[]byte("id1")}
+	mockBlobs := [][]byte{[]byte("header-blob")}
+	mockTimestamp := time.Now()
+
+	mockDAInstance := &mockDA{
+		getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
+			return &coreda.GetIDsResult{
+				IDs:       mockIDs,
+				Timestamp: mockTimestamp,
+			}, nil
+		},
+		getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
+			return mockBlobs, nil
+		},
+	}
+
+	client := NewClient(Config{
+		DA:            mockDAInstance,
+		Logger:        logger,
+		Namespace:     "test-header-ns",
+		DataNamespace: "test-data-ns",
+	})
+
+	result := client.RetrieveHeaders(context.Background(), dataLayerHeight)
+
+	assert.Equal(t, coreda.StatusSuccess, result.Code)
+	assert.Equal(t, dataLayerHeight, result.Height)
+	assert.Equal(t, len(mockBlobs), len(result.Data))
+}
+
+func TestClient_RetrieveData(t *testing.T) {
+	logger := zerolog.Nop()
+	dataLayerHeight := uint64(200)
+	mockIDs := [][]byte{[]byte("id1"), []byte("id2")}
+	mockBlobs := [][]byte{[]byte("data-blob-1"), []byte("data-blob-2")}
+	mockTimestamp := time.Now()
+
+	mockDAInstance := &mockDA{
+		getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
+			return &coreda.GetIDsResult{
+				IDs:       mockIDs,
+				Timestamp: mockTimestamp,
+			}, nil
+		},
+		getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
+			return mockBlobs, nil
+		},
+	}
+
+	client := NewClient(Config{
+		DA:            mockDAInstance,
+		Logger:        logger,
+		Namespace:     "test-header-ns",
+		DataNamespace: "test-data-ns",
+	})
+
+	result := client.RetrieveData(context.Background(), dataLayerHeight)
+
+	assert.Equal(t, coreda.StatusSuccess, result.Code)
+	assert.Equal(t, dataLayerHeight, result.Height)
+	assert.Equal(t, len(mockBlobs), len(result.Data))
+}
+
+func TestClient_RetrieveBatched(t *testing.T) {
+	logger := zerolog.Nop()
+	dataLayerHeight := uint64(100)
+
+	// Create 200 IDs to exceed the configured batch size
+	numIDs := 200
+	mockIDs := make([][]byte, numIDs)
+	for i := range numIDs {
+		mockIDs[i] = []byte{byte(i)}
+	}
+
+	// Track the size of each requested batch
+	batchCalls := []int{}
+
+	mockDAInstance := &mockDA{
+		getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
+			return &coreda.GetIDsResult{
+				IDs:       mockIDs,
+				Timestamp: time.Now(),
+			}, nil
+		},
+		getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
+			batchCalls = append(batchCalls, len(ids))
+			// Return a blob for each ID in the batch
+			blobs := make([][]byte, len(ids))
+			for i := range ids {
+				blobs[i] = []byte("blob")
+			}
+			return blobs, nil
+		},
+	}
+
+	client := NewClient(Config{
+		DA:                mockDAInstance,
+		Logger:            logger,
+		Namespace:         "test-ns",
+		DataNamespace:     "test-data-ns",
+		RetrieveBatchSize: 50, // Set a smaller batch size for testing
+	})
+
+	encodedNamespace := coreda.NamespaceFromString("test-ns")
+	result := client.Retrieve(context.Background(), dataLayerHeight, encodedNamespace.Bytes())
+
+	assert.Equal(t, coreda.StatusSuccess, result.Code)
+	assert.Equal(t, numIDs, len(result.Data))
+
+	// Should have made 4 batches: 50 + 50 + 50 + 50 = 200
+	assert.Equal(t, 4, len(batchCalls))
+	assert.Equal(t, 50, batchCalls[0])
+	assert.Equal(t, 50, batchCalls[1])
+	assert.Equal(t, 50, batchCalls[2])
+	assert.Equal(t, 50, batchCalls[3])
+}
+
+func TestClient_RetrieveBatched_PartialBatch(t *testing.T) {
+	logger := zerolog.Nop()
+	dataLayerHeight := uint64(100)
+
+	// Create 175 IDs to test a partial batch at the end
+	numIDs := 175
+	mockIDs := make([][]byte, numIDs)
+	for i := range numIDs {
+		mockIDs[i] = []byte{byte(i)}
+	}
+
+	batchCalls := []int{}
+
+	mockDAInstance := &mockDA{
+		getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
+			return &coreda.GetIDsResult{
+				IDs:       mockIDs,
+				Timestamp: time.Now(),
+			}, nil
+		},
+		getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
+			batchCalls = append(batchCalls, len(ids))
+			blobs := make([][]byte, len(ids))
+			for i := range ids {
+				blobs[i] = []byte("blob")
+			}
+			return blobs, nil
+		},
+	}
+
+	client := NewClient(Config{
+		DA:                mockDAInstance,
+		Logger:            logger,
+		Namespace:         "test-ns",
+		DataNamespace:     "test-data-ns",
+		RetrieveBatchSize: 50,
+	})
+
+	encodedNamespace := coreda.NamespaceFromString("test-ns")
+	result := client.Retrieve(context.Background(), dataLayerHeight, encodedNamespace.Bytes())
+
+	assert.Equal(t, coreda.StatusSuccess, result.Code)
+	assert.Equal(t, numIDs, len(result.Data))
+
+	// Should have made 4 batches: 50 + 50 + 50 + 25 = 175
+	assert.Equal(t, 4, len(batchCalls))
+	assert.Equal(t, 50, batchCalls[0])
+	assert.Equal(t, 50, batchCalls[1])
+	assert.Equal(t, 50, batchCalls[2])
+	assert.Equal(t, 25, batchCalls[3]) // Partial batch
+}
+
+func TestClient_RetrieveBatched_ErrorInSecondBatch(t *testing.T) {
+	logger := zerolog.Nop()
+	dataLayerHeight := uint64(100)
+
+	// Create 200 IDs to require multiple batches
+	numIDs := 200
+	mockIDs := make([][]byte, numIDs)
+	for i := range numIDs {
+		mockIDs[i] = []byte{byte(i)}
+	}
+
+	batchCallCount := 0
+
+	mockDAInstance := &mockDA{
+		getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
+			return &coreda.GetIDsResult{
+				IDs:       mockIDs,
+				Timestamp: time.Now(),
+			}, nil
+		},
+		getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
+			batchCallCount++
+			// Fail on the second batch
+			if batchCallCount == 2 {
+				return nil, errors.New("network error in batch 2")
+			}
+			blobs := make([][]byte, len(ids))
+			for i := range ids {
+				blobs[i] = []byte("blob")
+			}
+			return blobs, nil
+		},
+	}
+
+	client := NewClient(Config{
+		DA:                mockDAInstance,
+		Logger:            logger,
+		Namespace:         "test-ns",
+		DataNamespace:     "test-data-ns",
+		RetrieveBatchSize: 50,
+	})
+
+	encodedNamespace := coreda.NamespaceFromString("test-ns")
+	result := client.Retrieve(context.Background(), dataLayerHeight, encodedNamespace.Bytes())
+
+	assert.Equal(t, coreda.StatusError, result.Code)
+	// The error message should be populated; it is expected to identify the failing batch range
+	assert.Assert(t, result.Message != "")
+}
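For orientation, the batching behavior these tests pin down can be sketched as follows. This is a minimal illustration, not the client code itself (which is outside this diff); `retrieveInBatches` and its `get` callback are hypothetical stand-ins for the client's per-batch DA `Get` calls driven by the `RetrieveBatchSize` config:

```go
package main

import (
	"context"
	"fmt"
)

// retrieveInBatches is a hypothetical stand-in for the client's batched
// Retrieve loop: it downloads blobs for the given IDs in chunks of
// batchSize, failing fast and naming the offending range on error.
func retrieveInBatches(ctx context.Context, ids [][]byte, batchSize int,
	get func(context.Context, [][]byte) ([][]byte, error),
) ([][]byte, error) {
	blobs := make([][]byte, 0, len(ids))
	for start := 0; start < len(ids); start += batchSize {
		end := min(start+batchSize, len(ids)) // final batch may be partial
		batch, err := get(ctx, ids[start:end])
		if err != nil {
			return nil, fmt.Errorf("failed to retrieve batch %d-%d: %w", start, end, err)
		}
		blobs = append(blobs, batch...)
	}
	return blobs, nil
}

func main() {
	ids := make([][]byte, 175)
	var sizes []int
	blobs, err := retrieveInBatches(context.Background(), ids, 50,
		func(_ context.Context, batch [][]byte) ([][]byte, error) {
			sizes = append(sizes, len(batch))
			return batch, nil
		})
	fmt.Println(len(blobs), err, sizes) // 175 <nil> [50 50 50 25]
}
```

Failing fast on the first bad batch is the behavior TestClient_RetrieveBatched_ErrorInSecondBatch relies on: the error in batch two aborts the whole retrieval rather than returning partial data.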
diff --git a/block/internal/da/forced_inclusion_retriever.go b/block/internal/da/forced_inclusion_retriever.go
index 18bb180668..c5ff15945a 100644
--- a/block/internal/da/forced_inclusion_retriever.go
+++ b/block/internal/da/forced_inclusion_retriever.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"time"
 
 	"github.com/rs/zerolog"
 
@@ -25,6 +26,7 @@ type ForcedInclusionRetriever struct {
 
 // ForcedInclusionEvent contains forced inclusion transactions retrieved from DA.
 type ForcedInclusionEvent struct {
+	Timestamp     time.Time
 	StartDaHeight uint64
 	EndDaHeight   uint64
 	Txs           [][]byte
@@ -158,6 +160,10 @@ func (r *ForcedInclusionRetriever) processForcedInclusionBlobs(
 		}
 	}
 
+	if result.Timestamp.After(event.Timestamp) {
+		event.Timestamp = result.Timestamp
+	}
+
 	r.logger.Debug().
 		Uint64("height", height).
 		Int("blob_count", len(result.Data)).
diff --git a/block/internal/da/forced_inclusion_retriever_test.go b/block/internal/da/forced_inclusion_retriever_test.go
index e51bbe22f1..4492da3f6c 100644
--- a/block/internal/da/forced_inclusion_retriever_test.go
+++ b/block/internal/da/forced_inclusion_retriever_test.go
@@ -244,6 +244,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_MultiHeightEpoch(t *
 	assert.Assert(t, event != nil)
 	assert.Equal(t, event.StartDaHeight, uint64(100))
 	assert.Equal(t, event.EndDaHeight, uint64(102))
+	assert.Assert(t, event.Timestamp.After(time.Time{}))
 
 	// Should have collected all txs from all heights
 	expectedTxCount := len(testBlobsByHeight[100]) + len(testBlobsByHeight[101]) + len(testBlobsByHeight[102])
@@ -334,6 +335,7 @@ func TestForcedInclusionRetriever_processForcedInclusionBlobs(t *testing.T) {
 			} else {
 				assert.NilError(t, err)
 				assert.Equal(t, len(event.Txs), tt.expectedTxCount)
+				assert.Equal(t, event.Timestamp, time.Time{})
 			}
 		})
 	}
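The new `Timestamp` field carries the latest DA timestamp observed across the heights of an epoch: `processForcedInclusionBlobs` folds each height's `GetIDsResult.Timestamp` into the event via the `After` comparison added above. A minimal sketch of that fold, with illustrative times standing in for the per-height results:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative per-height DA timestamps for one epoch (e.g. heights 100-102);
	// the real values come from each height's GetIDsResult.
	perHeight := []time.Time{
		time.Date(2024, 1, 1, 11, 59, 58, 0, time.UTC),
		time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC), // latest wins
		time.Date(2024, 1, 1, 11, 59, 59, 0, time.UTC),
	}

	// Same fold as processForcedInclusionBlobs: keep the latest timestamp.
	// Starting from the zero value means an epoch with no results leaves
	// the event timestamp at time.Time{}.
	var eventTimestamp time.Time
	for _, ts := range perHeight {
		if ts.After(eventTimestamp) {
			eventTimestamp = ts
		}
	}
	fmt.Println(eventTimestamp) // 2024-01-01 12:00:00 +0000 UTC
}
```

Starting the fold from the zero value is what the table-driven test above asserts: when no height contributes a timestamp, `event.Timestamp` stays `time.Time{}`.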
diff --git a/sequencers/based/sequencer.go b/sequencers/based/sequencer.go
index 8845183e64..7960cd52df 100644
--- a/sequencers/based/sequencer.go
+++ b/sequencers/based/sequencer.go
@@ -37,6 +37,8 @@ type BasedSequencer struct {
 
 	// Cached transactions from the current DA block being processed
 	currentBatchTxs [][]byte
+	// DA epoch end time for timestamp calculation
+	currentDAEndTime time.Time
 }
 
 // NewBasedSequencer creates a new based sequencer instance
@@ -97,13 +99,15 @@ func (s *BasedSequencer) GetNextBatch(ctx context.Context, req coresequencer.Get
 	// If we have no cached transactions or we've consumed all from the current DA block,
 	// fetch the next DA epoch
 	daHeight := s.GetDAHeight()
+
 	if len(s.currentBatchTxs) == 0 || s.checkpoint.TxIndex >= uint64(len(s.currentBatchTxs)) {
-		daEndHeight, err := s.fetchNextDAEpoch(ctx, req.MaxBytes)
+		daEndTime, daEndHeight, err := s.fetchNextDAEpoch(ctx, req.MaxBytes)
 		if err != nil {
 			return nil, err
 		}
 
 		daHeight = daEndHeight
+		s.currentDAEndTime = daEndTime
 	}
 
 	// Create batch from current position up to MaxBytes
@@ -129,15 +133,21 @@ func (s *BasedSequencer) GetNextBatch(ctx context.Context, req coresequencer.Get
 		}
 	}
 
+	// Calculate the timestamp from the number of transactions remaining after this batch.
+	// It corresponds to the last block time of the DA epoch, offset by the transactions still
+	// to be executed, so an epoch that spans multiple blocks yields increasing timestamps.
+	remainingTxs := uint64(len(s.currentBatchTxs)) - s.checkpoint.TxIndex
+	timestamp := s.currentDAEndTime.Add(-time.Duration(remainingTxs) * time.Millisecond)
+
 	return &coresequencer.GetNextBatchResponse{
 		Batch:     batch,
-		Timestamp: time.Time{}, // TODO(@julienrbrt): we need to use DA block timestamp for determinism
+		Timestamp: timestamp,
 		BatchData: req.LastBatchData,
 	}, nil
 }
 
 // fetchNextDAEpoch fetches transactions from the next DA epoch
-func (s *BasedSequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint64, error) {
+func (s *BasedSequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (time.Time, uint64, error) {
 	currentDAHeight := s.checkpoint.DAHeight
 
 	s.logger.Debug().
@@ -149,16 +159,16 @@ func (s *BasedSequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64)
 	if err != nil {
 		// Check if forced inclusion is not configured
 		if errors.Is(err, block.ErrForceInclusionNotConfigured) {
-			return 0, block.ErrForceInclusionNotConfigured
+			return time.Time{}, 0, block.ErrForceInclusionNotConfigured
 		} else if errors.Is(err, coreda.ErrHeightFromFuture) {
 			// If we get a height from future error, stay at current position
 			// We'll retry the same height on the next call until DA produces that block
 			s.logger.Debug().
 				Uint64("da_height", currentDAHeight).
 				Msg("DA height from future, waiting for DA to produce block")
-			return 0, nil
+			return time.Time{}, 0, nil
 		}
-		return 0, fmt.Errorf("failed to retrieve forced inclusion transactions: %w", err)
+		return time.Time{}, 0, fmt.Errorf("failed to retrieve forced inclusion transactions: %w", err)
 	}
 
 	// Validate and filter transactions
@@ -188,7 +198,7 @@ func (s *BasedSequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64)
 
 	// Cache the transactions for this DA epoch
 	s.currentBatchTxs = validTxs
-	return forcedTxsEvent.EndDaHeight, nil
+	return forcedTxsEvent.Timestamp.UTC(), forcedTxsEvent.EndDaHeight, nil
 }
 
 // createBatchFromCheckpoint creates a batch from the current checkpoint position respecting MaxBytes
diff --git a/sequencers/based/sequencer_test.go b/sequencers/based/sequencer_test.go
index 30a4bd6118..eb29e6bffb 100644
--- a/sequencers/based/sequencer_test.go
+++ b/sequencers/based/sequencer_test.go
@@ -3,6 +3,7 @@ package based
 import (
 	"context"
 	"testing"
+	"time"
 
 	ds "github.com/ipfs/go-datastore"
 	syncds "github.com/ipfs/go-datastore/sync"
@@ -539,3 +540,142 @@ func TestBasedSequencer_GetNextBatch_EmptyDABatch_IncreasesDAHeight(t *testing.T
 
 	mockRetriever.AssertExpectations(t)
 }
+
+func TestBasedSequencer_GetNextBatch_TimestampAdjustment(t *testing.T) {
+	// Test that the timestamp is adjusted by the number of transactions remaining
+	// after this batch: timestamp = daEndTime - (remaining txs * 1ms)
+
+	testBlobs := [][]byte{[]byte("tx1"), []byte("tx2"), []byte("tx3")}
+	daEndTime := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
+
+	mockRetriever := new(MockForcedInclusionRetriever)
+	mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{
+		Txs:           testBlobs,
+		StartDaHeight: 100,
+		EndDaHeight:   100,
+		Timestamp:     daEndTime,
+	}, nil)
+
+	gen := genesis.Genesis{
+		ChainID:                "test-chain",
+		DAStartHeight:          100,
+		DAEpochForcedInclusion: 1,
+	}
+
+	seq := createTestSequencer(t, mockRetriever, gen)
+
+	req := coresequencer.GetNextBatchRequest{
+		MaxBytes:      1000000,
+		LastBatchData: nil,
+	}
+
+	resp, err := seq.GetNextBatch(context.Background(), req)
+	require.NoError(t, err)
+	require.NotNil(t, resp)
+	require.NotNil(t, resp.Batch)
+	assert.Equal(t, 3, len(resp.Batch.Transactions))
+
+	// After taking all 3 txs, there are 0 remaining, so timestamp = daEndTime - 0ms = daEndTime
+	expectedTimestamp := daEndTime
+	assert.Equal(t, expectedTimestamp, resp.Timestamp)
+
+	mockRetriever.AssertExpectations(t)
+}
+
+func TestBasedSequencer_GetNextBatch_TimestampAdjustment_PartialBatch(t *testing.T) {
+	// Test timestamp adjustment when MaxBytes limits the batch size
+	tx1 := make([]byte, 100)
+	tx2 := make([]byte, 150)
+	tx3 := make([]byte, 200)
+	testBlobs := [][]byte{tx1, tx2, tx3}
+	daEndTime := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
+
+	mockRetriever := new(MockForcedInclusionRetriever)
+	mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{
+		Txs:           testBlobs,
+		StartDaHeight: 100,
+		EndDaHeight:   100,
+		Timestamp:     daEndTime,
+	}, nil)
+
+	gen := genesis.Genesis{
+		ChainID:                "test-chain",
+		DAStartHeight:          100,
+		DAEpochForcedInclusion: 1,
+	}
+
+	seq := createTestSequencer(t, mockRetriever, gen)
+
+	// First call with MaxBytes that fits only the first two transactions
+	req := coresequencer.GetNextBatchRequest{
+		MaxBytes:      250,
+		LastBatchData: nil,
+	}
+
+	resp, err := seq.GetNextBatch(context.Background(), req)
+	require.NoError(t, err)
+	require.NotNil(t, resp)
+	require.NotNil(t, resp.Batch)
+	assert.Equal(t, 2, len(resp.Batch.Transactions))
+
+	// After taking 2 txs, there is 1 remaining, so timestamp = daEndTime - 1ms
+	expectedTimestamp := daEndTime.Add(-1 * time.Millisecond)
+	assert.Equal(t, expectedTimestamp, resp.Timestamp)
+
+	// Second call should get the remaining transaction
+	req = coresequencer.GetNextBatchRequest{
+		MaxBytes:      1000,
+		LastBatchData: nil,
+	}
+
+	// The second call uses cached transactions; the timestamp is recomputed from the remaining txs
+	resp, err = seq.GetNextBatch(context.Background(), req)
+	require.NoError(t, err)
+	require.NotNil(t, resp)
+	require.NotNil(t, resp.Batch)
+	assert.Equal(t, 1, len(resp.Batch.Transactions))
+
+	// After taking this 1 tx, there are 0 remaining, so timestamp = daEndTime - 0ms = daEndTime
+	expectedTimestamp2 := daEndTime
+	assert.Equal(t, expectedTimestamp2, resp.Timestamp)
+
+	mockRetriever.AssertExpectations(t)
+}
+
+func TestBasedSequencer_GetNextBatch_TimestampAdjustment_EmptyBatch(t *testing.T) {
+	// Test that the timestamp equals the DA end time when the epoch contains no transactions
+	daEndTime := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
+
+	mockRetriever := new(MockForcedInclusionRetriever)
+	mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{
+		Txs:           [][]byte{},
+		StartDaHeight: 100,
+		EndDaHeight:   100,
+		Timestamp:     daEndTime,
+	}, nil)
+
+	gen := genesis.Genesis{
+		ChainID:                "test-chain",
+		DAStartHeight:          100,
+		DAEpochForcedInclusion: 1,
+	}
+
+	seq := createTestSequencer(t, mockRetriever, gen)
+
+	req := coresequencer.GetNextBatchRequest{
+		MaxBytes:      1000000,
+		LastBatchData: nil,
+	}
+
+	resp, err := seq.GetNextBatch(context.Background(), req)
+	require.NoError(t, err)
+	require.NotNil(t, resp)
+	require.NotNil(t, resp.Batch)
+	assert.Equal(t, 0, len(resp.Batch.Transactions))
+
+	// With an empty batch there are 0 remaining txs, so timestamp = daEndTime
+	expectedTimestamp := daEndTime
+	assert.Equal(t, expectedTimestamp, resp.Timestamp)
+
+	mockRetriever.AssertExpectations(t)
+}
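For reference, the timestamp arithmetic these tests exercise is plain offset math on the epoch's end time. A self-contained sketch (values mirror the partial-batch test above; `timestampFor` is a hypothetical helper, not part of the sequencer API):

```go
package main

import (
	"fmt"
	"time"
)

// timestampFor mirrors the formula in GetNextBatch: the DA epoch's end time,
// minus one millisecond per transaction still pending after this batch.
func timestampFor(daEndTime time.Time, totalTxs, consumed uint64) time.Time {
	remaining := totalTxs - consumed
	return daEndTime.Add(-time.Duration(remaining) * time.Millisecond)
}

func main() {
	daEndTime := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)

	// A 3-tx epoch drained in two batches (2 txs, then 1), as in the
	// PartialBatch test: timestamps increase strictly across blocks.
	fmt.Println(timestampFor(daEndTime, 3, 2)) // 2024-01-01 11:59:59.999 +0000 UTC
	fmt.Println(timestampFor(daEndTime, 3, 3)) // 2024-01-01 12:00:00 +0000 UTC

	// An empty epoch leaves nothing pending, so the batch gets the end time itself.
	fmt.Println(timestampFor(daEndTime, 0, 0)) // 2024-01-01 12:00:00 +0000 UTC
}
```

Because every block carved from the same epoch derives its timestamp from the DA end time rather than local wall-clock time, block timestamps stay deterministic across nodes, which is the point of the TODO this diff removes.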