Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion sequencers/single/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,32 @@ func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) e

// Prepend adds a batch to the front of the queue (before head position).
// This is used to return transactions that couldn't fit in the current batch.
// TODO(@julienrbrt): The batch is currently NOT persisted to the DB since these are transactions that were already in the queue or were just processed. -- FI txs are lost, this should be tackled.
// The batch is persisted to the DB to ensure durability in case of crashes.
func (bq *BatchQueue) Prepend(ctx context.Context, batch coresequencer.Batch) error {
bq.mu.Lock()
defer bq.mu.Unlock()

hash, err := batch.Hash()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prepend does not check for max queue size. It makes sense for me to grow beyond the limit to get the high priority TX included. Nevertheless, some doc would be good to manage expectations

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if err != nil {
return err
}
key := hex.EncodeToString(hash)

pbBatch := &pb.Batch{
Txs: batch.Transactions,
}

encodedBatch, err := proto.Marshal(pbBatch)
if err != nil {
return err
}

// First write to DB for durability
if err := bq.db.Put(ctx, ds.NewKey(key), encodedBatch); err != nil {
return err
}

// Then add to in-memory queue
// If we have room before head, use it
if bq.head > 0 {
bq.head--
Expand Down
49 changes: 49 additions & 0 deletions sequencers/single/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,4 +720,53 @@ func TestBatchQueue_Prepend(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, []byte("tx1"), nextBatch.Transactions[0])
})

t.Run("prepend persistence across restarts", func(t *testing.T) {
prefix := "test-prepend-persistence"
queue := NewBatchQueue(db, prefix, 0)
err := queue.Load(ctx)
require.NoError(t, err)

// Add some batches
batch1 := coresequencer.Batch{Transactions: [][]byte{[]byte("tx1")}}
batch2 := coresequencer.Batch{Transactions: [][]byte{[]byte("tx2")}}
err = queue.AddBatch(ctx, batch1)
require.NoError(t, err)
err = queue.AddBatch(ctx, batch2)
require.NoError(t, err)

// Consume first batch
_, err = queue.Next(ctx)
require.NoError(t, err)

// Prepend a batch (simulating transactions that couldn't fit)
prependedBatch := coresequencer.Batch{Transactions: [][]byte{[]byte("prepended")}}
err = queue.Prepend(ctx, prependedBatch)
require.NoError(t, err)

assert.Equal(t, 2, queue.Size())

// Simulate restart by creating a new queue with same prefix
queue2 := NewBatchQueue(db, prefix, 0)
err = queue2.Load(ctx)
require.NoError(t, err)

// Should have both the prepended batch and tx2
assert.Equal(t, 2, queue2.Size())

// First should be prepended batch
nextBatch, err := queue2.Next(ctx)
require.NoError(t, err)
assert.Equal(t, 1, len(nextBatch.Transactions))
assert.Contains(t, nextBatch.Transactions, []byte("prepended"))

// Then tx2
nextBatch, err = queue2.Next(ctx)
require.NoError(t, err)
assert.Equal(t, 1, len(nextBatch.Transactions))
assert.Contains(t, nextBatch.Transactions, []byte("tx2"))

// Queue should be empty now
assert.Equal(t, 0, queue2.Size())
})
}
Loading