sequencers/single/queue.go (22 additions, 1 deletion)

@@ -85,11 +85,32 @@ func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) e

// Prepend adds a batch to the front of the queue (before head position).
// This is used to return transactions that couldn't fit in the current batch.
-// TODO(@julienrbrt): The batch is currently NOT persisted to the DB since these are transactions that were already in the queue or were just processed. -- FI txs are lost, this should be tackled.
+// The batch is persisted to the DB to ensure durability in case of crashes.
func (bq *BatchQueue) Prepend(ctx context.Context, batch coresequencer.Batch) error {
bq.mu.Lock()
defer bq.mu.Unlock()

hash, err := batch.Hash()
Contributor:
Prepend does not check the max queue size. Growing beyond the limit to get the high-priority tx included makes sense to me; nevertheless, some documentation would help manage expectations.


if err != nil {
return err
}
key := hex.EncodeToString(hash)

pbBatch := &pb.Batch{
Txs: batch.Transactions,
}

encodedBatch, err := proto.Marshal(pbBatch)
if err != nil {
return err
}

// First write to DB for durability
if err := bq.db.Put(ctx, ds.NewKey(key), encodedBatch); err != nil {
return err
}

// Then add to in-memory queue
// If we have room before head, use it
if bq.head > 0 {
bq.head--
sequencers/single/queue_test.go (49 additions)

@@ -720,4 +720,53 @@ func TestBatchQueue_Prepend(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, []byte("tx1"), nextBatch.Transactions[0])
})

t.Run("prepend persistence across restarts", func(t *testing.T) {
prefix := "test-prepend-persistence"
queue := NewBatchQueue(db, prefix, 0)
err := queue.Load(ctx)
require.NoError(t, err)

// Add some batches
batch1 := coresequencer.Batch{Transactions: [][]byte{[]byte("tx1")}}
batch2 := coresequencer.Batch{Transactions: [][]byte{[]byte("tx2")}}
err = queue.AddBatch(ctx, batch1)
require.NoError(t, err)
err = queue.AddBatch(ctx, batch2)
require.NoError(t, err)

// Consume first batch
_, err = queue.Next(ctx)
require.NoError(t, err)

// Prepend a batch (simulating transactions that couldn't fit)
prependedBatch := coresequencer.Batch{Transactions: [][]byte{[]byte("prepended")}}
err = queue.Prepend(ctx, prependedBatch)
require.NoError(t, err)

assert.Equal(t, 2, queue.Size())

// Simulate restart by creating a new queue with same prefix
queue2 := NewBatchQueue(db, prefix, 0)
err = queue2.Load(ctx)
require.NoError(t, err)

// Should have both the prepended batch and tx2
assert.Equal(t, 2, queue2.Size())

// First should be prepended batch
nextBatch, err := queue2.Next(ctx)
require.NoError(t, err)
assert.Equal(t, 1, len(nextBatch.Transactions))
assert.Equal(t, []byte("prepended"), nextBatch.Transactions[0])

// Then tx2
nextBatch, err = queue2.Next(ctx)
require.NoError(t, err)
assert.Equal(t, 1, len(nextBatch.Transactions))
assert.Equal(t, []byte("tx2"), nextBatch.Transactions[0])
Contributor:

Severity: medium

This part of the test assumes a specific order of batches after Load is called. However, Load retrieves batches from the datastore based on the lexicographical order of their keys (hashes), which doesn't correspond to the order they were added or prepended. The test might pass with the current data ("prepended" vs "tx2"), but it's brittle and could fail with different test data if the hash order changes.

This test's assumption of order masks a conceptual issue with Prepend's persistence where the prepended order is not guaranteed after a restart. If the ordering issue is addressed, this test will be valid. If not, the test should be changed to only verify the presence of batches, not their order, to avoid being flaky.

Member Author:

Good one! This makes sense. I think we do want to preserve ordering, especially for force-included transactions: the order must be deterministic across all based sequencers. However, this is the single sequencer, so it doesn't matter as much; that is presumably why the queue implementation only keys by hash. I'll update the test.

// Queue should be empty now
assert.Equal(t, 0, queue2.Size())
})
}