Skip to content
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
2e769b5
refactor(sequencers): persist prepended batch
julienrbrt Dec 3, 2025
e38887b
refactor(sequencers): implement tx queue persistence for based seq
julienrbrt Dec 3, 2025
0bd6be4
ai one shot
julienrbrt Dec 4, 2025
21ad0fc
Merge branch 'main' into julien/persist-base
julienrbrt Dec 4, 2025
95e06b7
simplify
julienrbrt Dec 5, 2025
87f9da5
describe checkpoint design and generalize
julienrbrt Dec 5, 2025
6890d32
update docs
julienrbrt Dec 5, 2025
408d76d
simplify constructor
julienrbrt Dec 5, 2025
21e3ead
remove unused type
julienrbrt Dec 5, 2025
ab669ea
implement checkpoint for single sequencer
julienrbrt Dec 5, 2025
ea3fc6c
check against max bytes directly
julienrbrt Dec 5, 2025
840568d
chore: update comments
julienrbrt Dec 5, 2025
cdadf80
test: fix error check
julienrbrt Dec 5, 2025
c761301
test: properly test tx precedence
julienrbrt Dec 5, 2025
7ed1ee3
feat: add based batch time
julienrbrt Dec 5, 2025
ee3a10b
cleanups
julienrbrt Dec 5, 2025
3096c5e
test: add empty da epoch test
julienrbrt Dec 8, 2025
48ca2da
align tests
julienrbrt Dec 8, 2025
ff4727a
fix da increasing
julienrbrt Dec 8, 2025
9d7601c
allow other processing
julienrbrt Dec 8, 2025
feb309b
chore: revert autodoc change
julienrbrt Dec 8, 2025
e7cd0af
proper implementation
julienrbrt Dec 8, 2025
d9f8a0b
Merge branch 'julien/persist-base' into julien/based-block-time
julienrbrt Dec 8, 2025
95c5394
Merge branch 'main' into julien/based-block-time
julienrbrt Dec 8, 2025
5bffe1d
Merge branch 'main' into julien/based-block-time
julienrbrt Dec 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion apps/evm/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,10 @@ func createSequencer(
return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled")
}

basedSeq := based.NewBasedSequencer(fiRetriever, da, nodeConfig, genesis, logger)
basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger)
if err != nil {
return nil, fmt.Errorf("failed to create based sequencer: %w", err)
}

logger.Info().
Str("forced_inclusion_namespace", nodeConfig.DA.GetForcedInclusionNamespace()).
Expand Down
5 changes: 4 additions & 1 deletion apps/grpc/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ func createSequencer(
return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled")
}

basedSeq := based.NewBasedSequencer(fiRetriever, da, nodeConfig, genesis, logger)
basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger)
if err != nil {
return nil, fmt.Errorf("failed to create based sequencer: %w", err)
}

logger.Info().
Str("forced_inclusion_namespace", nodeConfig.DA.GetForcedInclusionNamespace()).
Expand Down
5 changes: 4 additions & 1 deletion apps/testapp/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ func createSequencer(
return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled")
}

basedSeq := based.NewBasedSequencer(fiRetriever, da, nodeConfig, genesis, logger)
basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger)
if err != nil {
return nil, fmt.Errorf("failed to create based sequencer: %w", err)
}

logger.Info().
Str("forced_inclusion_namespace", nodeConfig.DA.GetForcedInclusionNamespace()).
Expand Down
223 changes: 223 additions & 0 deletions block/internal/da/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,3 +523,226 @@ func TestClient_Retrieve_Timeout(t *testing.T) {
assert.Assert(t, result.Message != "")
})
}

func TestClient_RetrieveHeaders(t *testing.T) {
logger := zerolog.Nop()
dataLayerHeight := uint64(100)
mockIDs := [][]byte{[]byte("id1")}
mockBlobs := [][]byte{[]byte("header-blob")}
mockTimestamp := time.Now()

mockDAInstance := &mockDA{
getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
return &coreda.GetIDsResult{
IDs: mockIDs,
Timestamp: mockTimestamp,
}, nil
},
getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
return mockBlobs, nil
},
}

client := NewClient(Config{
DA: mockDAInstance,
Logger: logger,
Namespace: "test-header-ns",
DataNamespace: "test-data-ns",
})

result := client.RetrieveHeaders(context.Background(), dataLayerHeight)

assert.Equal(t, coreda.StatusSuccess, result.Code)
assert.Equal(t, dataLayerHeight, result.Height)
assert.Equal(t, len(mockBlobs), len(result.Data))
}

func TestClient_RetrieveData(t *testing.T) {
logger := zerolog.Nop()
dataLayerHeight := uint64(200)
mockIDs := [][]byte{[]byte("id1"), []byte("id2")}
mockBlobs := [][]byte{[]byte("data-blob-1"), []byte("data-blob-2")}
mockTimestamp := time.Now()

mockDAInstance := &mockDA{
getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
return &coreda.GetIDsResult{
IDs: mockIDs,
Timestamp: mockTimestamp,
}, nil
},
getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
return mockBlobs, nil
},
}

client := NewClient(Config{
DA: mockDAInstance,
Logger: logger,
Namespace: "test-header-ns",
DataNamespace: "test-data-ns",
})

result := client.RetrieveData(context.Background(), dataLayerHeight)

assert.Equal(t, coreda.StatusSuccess, result.Code)
assert.Equal(t, dataLayerHeight, result.Height)
assert.Equal(t, len(mockBlobs), len(result.Data))
}

func TestClient_RetrieveBatched(t *testing.T) {
logger := zerolog.Nop()
dataLayerHeight := uint64(100)

// Create 200 IDs to exceed default batch size
numIDs := 200
mockIDs := make([][]byte, numIDs)
for i := range numIDs {
mockIDs[i] = []byte{byte(i)}
}

// Track which batches were requested
batchCalls := []int{}

mockDAInstance := &mockDA{
getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
return &coreda.GetIDsResult{
IDs: mockIDs,
Timestamp: time.Now(),
}, nil
},
getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
batchCalls = append(batchCalls, len(ids))
// Return a blob for each ID in the batch
blobs := make([][]byte, len(ids))
for i := range ids {
blobs[i] = []byte("blob")
}
return blobs, nil
},
}

client := NewClient(Config{
DA: mockDAInstance,
Logger: logger,
Namespace: "test-ns",
DataNamespace: "test-data-ns",
RetrieveBatchSize: 50, // Set smaller batch size for testing
})

encodedNamespace := coreda.NamespaceFromString("test-ns")
result := client.Retrieve(context.Background(), dataLayerHeight, encodedNamespace.Bytes())

assert.Equal(t, coreda.StatusSuccess, result.Code)
assert.Equal(t, numIDs, len(result.Data))

// Should have made 4 batches: 50 + 50 + 50 + 50 = 200
assert.Equal(t, 4, len(batchCalls))
assert.Equal(t, 50, batchCalls[0])
assert.Equal(t, 50, batchCalls[1])
assert.Equal(t, 50, batchCalls[2])
assert.Equal(t, 50, batchCalls[3])
}

func TestClient_RetrieveBatched_PartialBatch(t *testing.T) {
logger := zerolog.Nop()
dataLayerHeight := uint64(100)

// Create 175 IDs to test partial batch at the end
numIDs := 175
mockIDs := make([][]byte, numIDs)
for i := range numIDs {
mockIDs[i] = []byte{byte(i)}
}

batchCalls := []int{}

mockDAInstance := &mockDA{
getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
return &coreda.GetIDsResult{
IDs: mockIDs,
Timestamp: time.Now(),
}, nil
},
getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
batchCalls = append(batchCalls, len(ids))
blobs := make([][]byte, len(ids))
for i := range ids {
blobs[i] = []byte("blob")
}
return blobs, nil
},
}

client := NewClient(Config{
DA: mockDAInstance,
Logger: logger,
Namespace: "test-ns",
DataNamespace: "test-data-ns",
RetrieveBatchSize: 50,
})

encodedNamespace := coreda.NamespaceFromString("test-ns")
result := client.Retrieve(context.Background(), dataLayerHeight, encodedNamespace.Bytes())

assert.Equal(t, coreda.StatusSuccess, result.Code)
assert.Equal(t, numIDs, len(result.Data))

// Should have made 4 batches: 50 + 50 + 50 + 25 = 175
assert.Equal(t, 4, len(batchCalls))
assert.Equal(t, 50, batchCalls[0])
assert.Equal(t, 50, batchCalls[1])
assert.Equal(t, 50, batchCalls[2])
assert.Equal(t, 25, batchCalls[3]) // Partial batch
}

// TestClient_RetrieveBatched_ErrorInSecondBatch verifies that a Get failure
// partway through batched retrieval (here: on the second of four batches)
// surfaces as an overall StatusError with a non-empty message.
func TestClient_RetrieveBatched_ErrorInSecondBatch(t *testing.T) {
	logger := zerolog.Nop()
	dataLayerHeight := uint64(100)

	// 200 IDs with a batch size of 50 forces four Get calls, so failing the
	// second call exercises the mid-retrieval error path.
	numIDs := 200
	mockIDs := make([][]byte, numIDs)
	for i := range numIDs {
		mockIDs[i] = []byte{byte(i)}
	}

	batchCallCount := 0

	mockDAInstance := &mockDA{
		getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
			return &coreda.GetIDsResult{
				IDs:       mockIDs,
				Timestamp: time.Now(),
			}, nil
		},
		getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
			batchCallCount++
			// Fail on the second batch only; all other batches succeed.
			if batchCallCount == 2 {
				return nil, errors.New("network error in batch 2")
			}
			blobs := make([][]byte, len(ids))
			for i := range ids {
				blobs[i] = []byte("blob")
			}
			return blobs, nil
		},
	}

	client := NewClient(Config{
		DA:                mockDAInstance,
		Logger:            logger,
		Namespace:         "test-ns",
		DataNamespace:     "test-data-ns",
		RetrieveBatchSize: 50,
	})

	encodedNamespace := coreda.NamespaceFromString("test-ns")
	result := client.Retrieve(context.Background(), dataLayerHeight, encodedNamespace.Bytes())

	assert.Equal(t, coreda.StatusError, result.Code)
	// The original asserted message non-emptiness twice (result.Message != ""
	// and len(result.Message) > 0 are equivalent); a single check suffices.
	// NOTE(review): the message presumably mentions the failing batch range —
	// tighten this to a substring check once the wrapping format is confirmed.
	assert.Assert(t, result.Message != "")
}
6 changes: 6 additions & 0 deletions block/internal/da/forced_inclusion_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/rs/zerolog"

Expand All @@ -25,6 +26,7 @@ type ForcedInclusionRetriever struct {

// ForcedInclusionEvent contains forced inclusion transactions retrieved from DA.
type ForcedInclusionEvent struct {
Timestamp time.Time
StartDaHeight uint64
EndDaHeight uint64
Txs [][]byte
Expand Down Expand Up @@ -158,6 +160,10 @@ func (r *ForcedInclusionRetriever) processForcedInclusionBlobs(
}
}

if result.Timestamp.After(event.Timestamp) {
event.Timestamp = result.Timestamp
}

r.logger.Debug().
Uint64("height", height).
Int("blob_count", len(result.Data)).
Expand Down
2 changes: 2 additions & 0 deletions block/internal/da/forced_inclusion_retriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_MultiHeightEpoch(t *
assert.Assert(t, event != nil)
assert.Equal(t, event.StartDaHeight, uint64(100))
assert.Equal(t, event.EndDaHeight, uint64(102))
assert.Assert(t, event.Timestamp.After(time.Time{}))

// Should have collected all txs from all heights
expectedTxCount := len(testBlobsByHeight[100]) + len(testBlobsByHeight[101]) + len(testBlobsByHeight[102])
Expand Down Expand Up @@ -334,6 +335,7 @@ func TestForcedInclusionRetriever_processForcedInclusionBlobs(t *testing.T) {
} else {
assert.NilError(t, err)
assert.Equal(t, len(event.Txs), tt.expectedTxCount)
assert.Equal(t, event.Timestamp, time.Time{})
}
})
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/blob/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@
This package is a **trimmed copy** of code from `celestia-node` to stay JSON-compatible with the blob RPC without importing the full Cosmos/Celestia dependency set.

## Upstream source

- `blob.go` comes from `celestia-node/blob/blob.go` @ tag `v0.28.4` (release v0.28.4), with unused pieces removed (blob v1, proof helpers, share length calc, appconsts dependency, etc.).
- `submit_options.go` mirrors the exported JSON fields of `celestia-node/state/tx_config.go` @ the same tag, leaving out functional options, defaults, and Cosmos keyring helpers.

## Why copy instead of import?

- Avoids pulling Cosmos SDK / celestia-app dependencies into ev-node for the small surface we need (blob JSON and commitment for v0).
- Keeps binary size and module graph smaller while remaining wire-compatible with celestia-node's blob service.

## Keeping it in sync

- When celestia-node changes blob JSON or tx config fields, update this package manually:
1. `diff -u pkg/blob/blob.go ../Celestia/celestia-node/blob/blob.go`
2. `diff -u pkg/blob/submit_options.go ../Celestia/celestia-node/state/tx_config.go`
Expand Down
8 changes: 8 additions & 0 deletions proto/evnode/v1/state.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,11 @@ message State {

reserved 7;
}

// SequencerDACheckpoint tracks the position in the DA where transactions were last processed,
// allowing a sequencer to resume processing from the exact transaction after a restart.
message SequencerDACheckpoint {
// DA block height being processed
uint64 da_height = 1;
// Index of the next transaction to process within the DA block's forced inclusion batch
uint64 tx_index = 2;
}
Loading
Loading