diff --git a/sequencers/single/queue.go b/sequencers/single/queue.go index a208aa109b..707fa7ad5f 100644 --- a/sequencers/single/queue.go +++ b/sequencers/single/queue.go @@ -57,23 +57,7 @@ func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) e return ErrQueueFull } - hash, err := batch.Hash() - if err != nil { - return err - } - key := hex.EncodeToString(hash) - - pbBatch := &pb.Batch{ - Txs: batch.Transactions, - } - - encodedBatch, err := proto.Marshal(pbBatch) - if err != nil { - return err - } - - // First write to DB for durability - if err := bq.db.Put(ctx, ds.NewKey(key), encodedBatch); err != nil { + if err := bq.persistBatch(ctx, batch); err != nil { return err } @@ -85,11 +69,21 @@ func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) e // Prepend adds a batch to the front of the queue (before head position). // This is used to return transactions that couldn't fit in the current batch. -// TODO(@julienrbrt): The batch is currently NOT persisted to the DB since these are transactions that were already in the queue or were just processed. -- FI txs are lost, this should be tackled. +// The batch is persisted to the DB to ensure durability in case of crashes. +// +// NOTE: Prepend intentionally bypasses the maxQueueSize limit to ensure high-priority +// transactions can always be re-queued. This means the effective queue size may temporarily +// exceed the configured maximum when Prepend is used. This is by design to prevent loss +// of transactions that have already been accepted but couldn't fit in the current batch. func (bq *BatchQueue) Prepend(ctx context.Context, batch coresequencer.Batch) error { bq.mu.Lock() defer bq.mu.Unlock() + if err := bq.persistBatch(ctx, batch); err != nil { + return err + } + + // Then add to in-memory queue // If we have room before head, use it if bq.head > 0 { bq.head-- @@ -187,3 +181,28 @@ func (bq *BatchQueue) Size() int { defer bq.mu.Unlock() return len(bq.queue) - bq.head } + +// persistBatch persists a batch to the datastore +func (bq *BatchQueue) persistBatch(ctx context.Context, batch coresequencer.Batch) error { + hash, err := batch.Hash() + if err != nil { + return err + } + key := hex.EncodeToString(hash) + + pbBatch := &pb.Batch{ + Txs: batch.Transactions, + } + + encodedBatch, err := proto.Marshal(pbBatch) + if err != nil { + return err + } + + // First write to DB for durability + if err := bq.db.Put(ctx, ds.NewKey(key), encodedBatch); err != nil { + return err + } + + return nil +} diff --git a/sequencers/single/queue_test.go b/sequencers/single/queue_test.go index b7665ee67f..39ac725cd9 100644 --- a/sequencers/single/queue_test.go +++ b/sequencers/single/queue_test.go @@ -720,4 +720,53 @@ func TestBatchQueue_Prepend(t *testing.T) { require.NoError(t, err) assert.Equal(t, []byte("tx1"), nextBatch.Transactions[0]) }) + + t.Run("prepend persistence across restarts", func(t *testing.T) { + prefix := "test-prepend-persistence" + queue := NewBatchQueue(db, prefix, 0) + err := queue.Load(ctx) + require.NoError(t, err) + + // Add some batches + batch1 := coresequencer.Batch{Transactions: [][]byte{[]byte("tx1")}} + batch2 := coresequencer.Batch{Transactions: [][]byte{[]byte("tx2")}} + err = queue.AddBatch(ctx, batch1) + require.NoError(t, err) + err = queue.AddBatch(ctx, batch2) + require.NoError(t, err) + + // Consume first batch + _, err = queue.Next(ctx) + require.NoError(t, err) + + // Prepend a batch (simulating transactions that couldn't fit) + prependedBatch := coresequencer.Batch{Transactions: [][]byte{[]byte("prepended")}} + err = queue.Prepend(ctx, prependedBatch) + require.NoError(t, err) + + assert.Equal(t, 2, queue.Size()) + + // Simulate restart by creating a new queue with same prefix + queue2 := NewBatchQueue(db, prefix, 0) + err = queue2.Load(ctx) + require.NoError(t, err) + + // Should have both the prepended batch and tx2 + assert.Equal(t, 2, queue2.Size()) + + // First should be prepended batch + nextBatch, err := queue2.Next(ctx) + require.NoError(t, err) + assert.Equal(t, 1, len(nextBatch.Transactions)) + assert.Contains(t, nextBatch.Transactions, []byte("prepended")) + + // Then tx2 + nextBatch, err = queue2.Next(ctx) + require.NoError(t, err) + assert.Equal(t, 1, len(nextBatch.Transactions)) + assert.Contains(t, nextBatch.Transactions, []byte("tx2")) + + // Queue should be empty now + assert.Equal(t, 0, queue2.Size()) + }) }