Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
2e769b5
refactor(sequencers): persist prepended batch
julienrbrt Dec 3, 2025
e38887b
refactor(sequencers): implement tx queue persistence for based seq
julienrbrt Dec 3, 2025
0bd6be4
ai one shot
julienrbrt Dec 4, 2025
21ad0fc
Merge branch 'main' into julien/persist-base
julienrbrt Dec 4, 2025
95e06b7
simplify
julienrbrt Dec 5, 2025
87f9da5
describe checkpoint design and generalize
julienrbrt Dec 5, 2025
6890d32
update docs
julienrbrt Dec 5, 2025
408d76d
simplify constructor
julienrbrt Dec 5, 2025
21e3ead
remove unused type
julienrbrt Dec 5, 2025
ab669ea
implement checkpoint for single sequencer
julienrbrt Dec 5, 2025
ea3fc6c
check against max bytes directly
julienrbrt Dec 5, 2025
840568d
chore: update comments
julienrbrt Dec 5, 2025
cdadf80
test: fix error check
julienrbrt Dec 5, 2025
c761301
test: properly test tx precedence
julienrbrt Dec 5, 2025
7ed1ee3
feat: add based batch time
julienrbrt Dec 5, 2025
ee3a10b
cleanups
julienrbrt Dec 5, 2025
3096c5e
test: add empty da epoch test
julienrbrt Dec 8, 2025
48ca2da
align tests
julienrbrt Dec 8, 2025
ff4727a
fix da increasing
julienrbrt Dec 8, 2025
9d7601c
allow other processing
julienrbrt Dec 8, 2025
feb309b
chore: revert autodoc change
julienrbrt Dec 8, 2025
e7cd0af
proper implementation
julienrbrt Dec 8, 2025
d9f8a0b
Merge branch 'julien/persist-base' into julien/based-block-time
julienrbrt Dec 8, 2025
95c5394
Merge branch 'main' into julien/based-block-time
julienrbrt Dec 8, 2025
5bffe1d
Merge branch 'main' into julien/based-block-time
julienrbrt Dec 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 223 additions & 0 deletions block/internal/da/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,3 +523,226 @@ func TestClient_Retrieve_Timeout(t *testing.T) {
assert.Assert(t, result.Message != "")
})
}

func TestClient_RetrieveHeaders(t *testing.T) {
logger := zerolog.Nop()
dataLayerHeight := uint64(100)
mockIDs := [][]byte{[]byte("id1")}
mockBlobs := [][]byte{[]byte("header-blob")}
mockTimestamp := time.Now()

mockDAInstance := &mockDA{
getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
return &coreda.GetIDsResult{
IDs: mockIDs,
Timestamp: mockTimestamp,
}, nil
},
getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
return mockBlobs, nil
},
}

client := NewClient(Config{
DA: mockDAInstance,
Logger: logger,
Namespace: "test-header-ns",
DataNamespace: "test-data-ns",
})

result := client.RetrieveHeaders(context.Background(), dataLayerHeight)

assert.Equal(t, coreda.StatusSuccess, result.Code)
assert.Equal(t, dataLayerHeight, result.Height)
assert.Equal(t, len(mockBlobs), len(result.Data))
}

func TestClient_RetrieveData(t *testing.T) {
logger := zerolog.Nop()
dataLayerHeight := uint64(200)
mockIDs := [][]byte{[]byte("id1"), []byte("id2")}
mockBlobs := [][]byte{[]byte("data-blob-1"), []byte("data-blob-2")}
mockTimestamp := time.Now()

mockDAInstance := &mockDA{
getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
return &coreda.GetIDsResult{
IDs: mockIDs,
Timestamp: mockTimestamp,
}, nil
},
getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
return mockBlobs, nil
},
}

client := NewClient(Config{
DA: mockDAInstance,
Logger: logger,
Namespace: "test-header-ns",
DataNamespace: "test-data-ns",
})

result := client.RetrieveData(context.Background(), dataLayerHeight)

assert.Equal(t, coreda.StatusSuccess, result.Code)
assert.Equal(t, dataLayerHeight, result.Height)
assert.Equal(t, len(mockBlobs), len(result.Data))
}

func TestClient_RetrieveBatched(t *testing.T) {
logger := zerolog.Nop()
dataLayerHeight := uint64(100)

// Create 200 IDs to exceed default batch size
numIDs := 200
mockIDs := make([][]byte, numIDs)
for i := range numIDs {
mockIDs[i] = []byte{byte(i)}
}

// Track which batches were requested
batchCalls := []int{}

mockDAInstance := &mockDA{
getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
return &coreda.GetIDsResult{
IDs: mockIDs,
Timestamp: time.Now(),
}, nil
},
getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
batchCalls = append(batchCalls, len(ids))
// Return a blob for each ID in the batch
blobs := make([][]byte, len(ids))
for i := range ids {
blobs[i] = []byte("blob")
}
return blobs, nil
},
}

client := NewClient(Config{
DA: mockDAInstance,
Logger: logger,
Namespace: "test-ns",
DataNamespace: "test-data-ns",
RetrieveBatchSize: 50, // Set smaller batch size for testing
})

encodedNamespace := coreda.NamespaceFromString("test-ns")
result := client.Retrieve(context.Background(), dataLayerHeight, encodedNamespace.Bytes())

assert.Equal(t, coreda.StatusSuccess, result.Code)
assert.Equal(t, numIDs, len(result.Data))

// Should have made 4 batches: 50 + 50 + 50 + 50 = 200
assert.Equal(t, 4, len(batchCalls))
assert.Equal(t, 50, batchCalls[0])
assert.Equal(t, 50, batchCalls[1])
assert.Equal(t, 50, batchCalls[2])
assert.Equal(t, 50, batchCalls[3])
}

func TestClient_RetrieveBatched_PartialBatch(t *testing.T) {
logger := zerolog.Nop()
dataLayerHeight := uint64(100)

// Create 175 IDs to test partial batch at the end
numIDs := 175
mockIDs := make([][]byte, numIDs)
for i := range numIDs {
mockIDs[i] = []byte{byte(i)}
}

batchCalls := []int{}

mockDAInstance := &mockDA{
getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
return &coreda.GetIDsResult{
IDs: mockIDs,
Timestamp: time.Now(),
}, nil
},
getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
batchCalls = append(batchCalls, len(ids))
blobs := make([][]byte, len(ids))
for i := range ids {
blobs[i] = []byte("blob")
}
return blobs, nil
},
}

client := NewClient(Config{
DA: mockDAInstance,
Logger: logger,
Namespace: "test-ns",
DataNamespace: "test-data-ns",
RetrieveBatchSize: 50,
})

encodedNamespace := coreda.NamespaceFromString("test-ns")
result := client.Retrieve(context.Background(), dataLayerHeight, encodedNamespace.Bytes())

assert.Equal(t, coreda.StatusSuccess, result.Code)
assert.Equal(t, numIDs, len(result.Data))

// Should have made 4 batches: 50 + 50 + 50 + 25 = 175
assert.Equal(t, 4, len(batchCalls))
assert.Equal(t, 50, batchCalls[0])
assert.Equal(t, 50, batchCalls[1])
assert.Equal(t, 50, batchCalls[2])
assert.Equal(t, 25, batchCalls[3]) // Partial batch
}

func TestClient_RetrieveBatched_ErrorInSecondBatch(t *testing.T) {
logger := zerolog.Nop()
dataLayerHeight := uint64(100)

// Create 200 IDs to require multiple batches
numIDs := 200
mockIDs := make([][]byte, numIDs)
for i := range numIDs {
mockIDs[i] = []byte{byte(i)}
}

batchCallCount := 0

mockDAInstance := &mockDA{
getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
return &coreda.GetIDsResult{
IDs: mockIDs,
Timestamp: time.Now(),
}, nil
},
getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
batchCallCount++
// Fail on second batch
if batchCallCount == 2 {
return nil, errors.New("network error in batch 2")
}
blobs := make([][]byte, len(ids))
for i := range ids {
blobs[i] = []byte("blob")
}
return blobs, nil
},
}

client := NewClient(Config{
DA: mockDAInstance,
Logger: logger,
Namespace: "test-ns",
DataNamespace: "test-data-ns",
RetrieveBatchSize: 50,
})

encodedNamespace := coreda.NamespaceFromString("test-ns")
result := client.Retrieve(context.Background(), dataLayerHeight, encodedNamespace.Bytes())

assert.Equal(t, coreda.StatusError, result.Code)
assert.Assert(t, result.Message != "")
// Error message should mention the batch range
assert.Assert(t, len(result.Message) > 0)
}
6 changes: 6 additions & 0 deletions block/internal/da/forced_inclusion_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/rs/zerolog"

Expand All @@ -25,6 +26,7 @@ type ForcedInclusionRetriever struct {

// ForcedInclusionEvent contains forced inclusion transactions retrieved from DA.
type ForcedInclusionEvent struct {
Timestamp time.Time
StartDaHeight uint64
EndDaHeight uint64
Txs [][]byte
Expand Down Expand Up @@ -158,6 +160,10 @@ func (r *ForcedInclusionRetriever) processForcedInclusionBlobs(
}
}

if result.Timestamp.After(event.Timestamp) {
event.Timestamp = result.Timestamp
}

r.logger.Debug().
Uint64("height", height).
Int("blob_count", len(result.Data)).
Expand Down
2 changes: 2 additions & 0 deletions block/internal/da/forced_inclusion_retriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_MultiHeightEpoch(t *
assert.Assert(t, event != nil)
assert.Equal(t, event.StartDaHeight, uint64(100))
assert.Equal(t, event.EndDaHeight, uint64(102))
assert.Assert(t, event.Timestamp.After(time.Time{}))

// Should have collected all txs from all heights
expectedTxCount := len(testBlobsByHeight[100]) + len(testBlobsByHeight[101]) + len(testBlobsByHeight[102])
Expand Down Expand Up @@ -334,6 +335,7 @@ func TestForcedInclusionRetriever_processForcedInclusionBlobs(t *testing.T) {
} else {
assert.NilError(t, err)
assert.Equal(t, len(event.Txs), tt.expectedTxCount)
assert.Equal(t, event.Timestamp, time.Time{})
}
})
}
Expand Down
24 changes: 17 additions & 7 deletions sequencers/based/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type BasedSequencer struct {

// Cached transactions from the current DA block being processed
currentBatchTxs [][]byte
// DA epoch end time for timestamp calculation
currentDAEndTime time.Time
}

// NewBasedSequencer creates a new based sequencer instance
Expand Down Expand Up @@ -97,13 +99,15 @@ func (s *BasedSequencer) GetNextBatch(ctx context.Context, req coresequencer.Get
// If we have no cached transactions or we've consumed all from the current DA block,
// fetch the next DA epoch
daHeight := s.GetDAHeight()

if len(s.currentBatchTxs) == 0 || s.checkpoint.TxIndex >= uint64(len(s.currentBatchTxs)) {
daEndHeight, err := s.fetchNextDAEpoch(ctx, req.MaxBytes)
daEndTime, daEndHeight, err := s.fetchNextDAEpoch(ctx, req.MaxBytes)
if err != nil {
return nil, err
}

daHeight = daEndHeight
s.currentDAEndTime = daEndTime
}

// Create batch from current position up to MaxBytes
Expand All @@ -129,15 +133,21 @@ func (s *BasedSequencer) GetNextBatch(ctx context.Context, req coresequencer.Get
}
}

// Calculate timestamp based on remaining transactions after this batch
// timestamp correspond to the last block time of a DA epoch, based on the remaining transactions to be executed
// this is done in order to handle the case where a DA epoch must fit in multiple blocks
remainingTxs := uint64(len(s.currentBatchTxs)) - s.checkpoint.TxIndex
timestamp := s.currentDAEndTime.Add(-time.Duration(remainingTxs) * time.Millisecond)

return &coresequencer.GetNextBatchResponse{
Batch: batch,
Timestamp: time.Time{}, // TODO(@julienrbrt): we need to use DA block timestamp for determinism
Timestamp: timestamp,
BatchData: req.LastBatchData,
}, nil
}

// fetchNextDAEpoch fetches transactions from the next DA epoch
func (s *BasedSequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint64, error) {
func (s *BasedSequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (time.Time, uint64, error) {
currentDAHeight := s.checkpoint.DAHeight

s.logger.Debug().
Expand All @@ -149,16 +159,16 @@ func (s *BasedSequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64)
if err != nil {
// Check if forced inclusion is not configured
if errors.Is(err, block.ErrForceInclusionNotConfigured) {
return 0, block.ErrForceInclusionNotConfigured
return time.Time{}, 0, block.ErrForceInclusionNotConfigured
} else if errors.Is(err, coreda.ErrHeightFromFuture) {
// If we get a height from future error, stay at current position
// We'll retry the same height on the next call until DA produces that block
s.logger.Debug().
Uint64("da_height", currentDAHeight).
Msg("DA height from future, waiting for DA to produce block")
return 0, nil
return time.Time{}, 0, nil
}
return 0, fmt.Errorf("failed to retrieve forced inclusion transactions: %w", err)
return time.Time{}, 0, fmt.Errorf("failed to retrieve forced inclusion transactions: %w", err)
}

// Validate and filter transactions
Expand Down Expand Up @@ -188,7 +198,7 @@ func (s *BasedSequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64)
// Cache the transactions for this DA epoch
s.currentBatchTxs = validTxs

return forcedTxsEvent.EndDaHeight, nil
return forcedTxsEvent.Timestamp.UTC(), forcedTxsEvent.EndDaHeight, nil
}

// createBatchFromCheckpoint creates a batch from the current checkpoint position respecting MaxBytes
Expand Down
Loading
Loading