Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 37 additions & 18 deletions sequencers/single/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,7 @@ func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) e
return ErrQueueFull
}

hash, err := batch.Hash()
if err != nil {
return err
}
key := hex.EncodeToString(hash)

pbBatch := &pb.Batch{
Txs: batch.Transactions,
}

encodedBatch, err := proto.Marshal(pbBatch)
if err != nil {
return err
}

// First write to DB for durability
if err := bq.db.Put(ctx, ds.NewKey(key), encodedBatch); err != nil {
if err := bq.persistBatch(ctx, batch); err != nil {
return err
}

Expand All @@ -85,11 +69,21 @@ func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) e

// Prepend adds a batch to the front of the queue (before head position).
// This is used to return transactions that couldn't fit in the current batch.
// TODO(@julienrbrt): The batch is currently NOT persisted to the DB since these are transactions that were already in the queue or were just processed. -- FYI: those txs are lost on restart; this should be tackled.
// The batch is persisted to the DB to ensure durability in case of crashes.
//
// NOTE: Prepend intentionally bypasses the maxQueueSize limit to ensure high-priority
// transactions can always be re-queued. This means the effective queue size may temporarily
// exceed the configured maximum when Prepend is used. This is by design to prevent loss
// of transactions that have already been accepted but couldn't fit in the current batch.
func (bq *BatchQueue) Prepend(ctx context.Context, batch coresequencer.Batch) error {
bq.mu.Lock()
defer bq.mu.Unlock()

if err := bq.persistBatch(ctx, batch); err != nil {
return err
}

// Then add to in-memory queue
// If we have room before head, use it
if bq.head > 0 {
bq.head--
Expand Down Expand Up @@ -187,3 +181,28 @@ func (bq *BatchQueue) Size() int {
defer bq.mu.Unlock()
return len(bq.queue) - bq.head
}

// persistBatch persists a batch to the datastore
func (bq *BatchQueue) persistBatch(ctx context.Context, batch coresequencer.Batch) error {
hash, err := batch.Hash()
if err != nil {
return err
}
key := hex.EncodeToString(hash)

pbBatch := &pb.Batch{
Txs: batch.Transactions,
}

encodedBatch, err := proto.Marshal(pbBatch)
if err != nil {
return err
}

// First write to DB for durability
if err := bq.db.Put(ctx, ds.NewKey(key), encodedBatch); err != nil {
return err
}

return nil
}
49 changes: 49 additions & 0 deletions sequencers/single/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,4 +720,53 @@ func TestBatchQueue_Prepend(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, []byte("tx1"), nextBatch.Transactions[0])
})

t.Run("prepend persistence across restarts", func(t *testing.T) {
prefix := "test-prepend-persistence"
queue := NewBatchQueue(db, prefix, 0)
err := queue.Load(ctx)
require.NoError(t, err)

// Add some batches
batch1 := coresequencer.Batch{Transactions: [][]byte{[]byte("tx1")}}
batch2 := coresequencer.Batch{Transactions: [][]byte{[]byte("tx2")}}
err = queue.AddBatch(ctx, batch1)
require.NoError(t, err)
err = queue.AddBatch(ctx, batch2)
require.NoError(t, err)

// Consume first batch
_, err = queue.Next(ctx)
require.NoError(t, err)

// Prepend a batch (simulating transactions that couldn't fit)
prependedBatch := coresequencer.Batch{Transactions: [][]byte{[]byte("prepended")}}
err = queue.Prepend(ctx, prependedBatch)
require.NoError(t, err)

assert.Equal(t, 2, queue.Size())

// Simulate restart by creating a new queue with same prefix
queue2 := NewBatchQueue(db, prefix, 0)
err = queue2.Load(ctx)
require.NoError(t, err)

// Should have both the prepended batch and tx2
assert.Equal(t, 2, queue2.Size())

// First should be prepended batch
nextBatch, err := queue2.Next(ctx)
require.NoError(t, err)
assert.Equal(t, 1, len(nextBatch.Transactions))
assert.Contains(t, nextBatch.Transactions, []byte("prepended"))

// Then tx2
nextBatch, err = queue2.Next(ctx)
require.NoError(t, err)
assert.Equal(t, 1, len(nextBatch.Transactions))
assert.Contains(t, nextBatch.Transactions, []byte("tx2"))

// Queue should be empty now
assert.Equal(t, 0, queue2.Size())
})
}
Loading