diff --git a/op-node/rollup/derive/attributes_queue.go b/op-node/rollup/derive/attributes_queue.go index cec93477a3205..f2e764b0c3cb0 100644 --- a/op-node/rollup/derive/attributes_queue.go +++ b/op-node/rollup/derive/attributes_queue.go @@ -22,58 +22,60 @@ import ( // This stage can be reset by clearing it's batch buffer. // This stage does not need to retain any references to L1 blocks. -type AttributesQueueOutput interface { - AddSafeAttributes(attributes *eth.PayloadAttributes) - SafeL2Head() eth.L2BlockRef - StageProgress -} - type AttributesQueue struct { - log log.Logger - config *rollup.Config - dl L1ReceiptsFetcher - next AttributesQueueOutput - progress Progress - batches []*BatchData + log log.Logger + config *rollup.Config + dl L1ReceiptsFetcher + prev *BatchQueue + batch *BatchData } -func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, next AttributesQueueOutput) *AttributesQueue { +func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, prev *BatchQueue) *AttributesQueue { return &AttributesQueue{ log: log, config: cfg, dl: l1Fetcher, - next: next, + prev: prev, } } -func (aq *AttributesQueue) AddBatch(batch *BatchData) { - aq.log.Debug("Received next batch", "batch_epoch", batch.EpochNum, "batch_timestamp", batch.Timestamp, "tx_count", len(batch.Transactions)) - aq.batches = append(aq.batches, batch) +func (aq *AttributesQueue) Origin() eth.L1BlockRef { + return aq.prev.Origin() } -func (aq *AttributesQueue) Progress() Progress { - return aq.progress -} - -func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error { - if changed, err := aq.progress.Update(outer); err != nil || changed { - return err +func (aq *AttributesQueue) NextAttributes(ctx context.Context, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) { + // Get a batch if we need it + if aq.batch == nil { + batch, err := aq.prev.NextBatch(ctx, l2SafeHead) + if err != nil { + return nil, err + } + aq.batch = batch } - if len(aq.batches) == 0 { - return io.EOF + + // Actually generate the next attributes + if attrs, err := aq.createNextAttributes(ctx, aq.batch, l2SafeHead); err != nil { + return nil, err + } else { + // Clear out the local state once we will succeed + aq.batch = nil + return attrs, nil } - batch := aq.batches[0] - safeL2Head := aq.next.SafeL2Head() +} + +// createNextAttributes transforms a batch into a payload attributes. This sets `NoTxPool` and appends the batched transactions +// to the attributes transaction list +func (aq *AttributesQueue) createNextAttributes(ctx context.Context, batch *BatchData, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) { // sanity check parent hash - if batch.ParentHash != safeL2Head.Hash { - return NewCriticalError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, safeL2Head.Hash)) + if batch.ParentHash != l2SafeHead.Hash { + return nil, NewResetError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, l2SafeHead.Hash)) } fetchCtx, cancel := context.WithTimeout(ctx, 20*time.Second) defer cancel() - attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, safeL2Head, batch.Timestamp, batch.Epoch()) + attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, l2SafeHead, batch.Timestamp, batch.Epoch()) if err != nil { - return err + return nil, err } // we are verifying, not sequencing, we've got all transactions and do not pull from the tx-pool @@ -83,19 +85,9 @@ func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error { aq.log.Info("generated attributes in payload queue", "txs", len(attrs.Transactions), "timestamp", batch.Timestamp) - // Slice off the batch once we are guaranteed to succeed - aq.batches = aq.batches[1:] - - aq.next.AddSafeAttributes(attrs) - return nil + return attrs, nil } -func (aq *AttributesQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { - aq.batches = aq.batches[:0] - aq.progress = aq.next.Progress() +func (aq *AttributesQueue) Reset(ctx context.Context, _ eth.L1BlockRef) error { return io.EOF } - -func (aq *AttributesQueue) SafeL2Head() eth.L2BlockRef { - return aq.next.SafeL2Head() -} diff --git a/op-node/rollup/derive/attributes_queue_test.go b/op-node/rollup/derive/attributes_queue_test.go index 226cfa82fd701..c5784f59f1715 100644 --- a/op-node/rollup/derive/attributes_queue_test.go +++ b/op-node/rollup/derive/attributes_queue_test.go @@ -2,7 +2,6 @@ package derive import ( "context" - "io" "math/big" "math/rand" "testing" @@ -17,29 +16,10 @@ import ( "github.com/ethereum/go-ethereum/log" ) -type MockAttributesQueueOutput struct { - MockOriginStage -} - -func (m *MockAttributesQueueOutput) AddSafeAttributes(attributes *eth.PayloadAttributes) { - m.Mock.MethodCalled("AddSafeAttributes", attributes) -} - -func (m *MockAttributesQueueOutput) ExpectAddSafeAttributes(attributes *eth.PayloadAttributes) { - m.Mock.On("AddSafeAttributes", attributes).Once().Return() -} - -func (m *MockAttributesQueueOutput) SafeL2Head() eth.L2BlockRef { - return m.Mock.MethodCalled("SafeL2Head").Get(0).(eth.L2BlockRef) -} - -func (m *MockAttributesQueueOutput) ExpectSafeL2Head(head eth.L2BlockRef) { - m.Mock.On("SafeL2Head").Once().Return(head) -} - -var _ AttributesQueueOutput = (*MockAttributesQueueOutput)(nil) - -func TestAttributesQueue_Step(t *testing.T) { +// TestAttributesQueue checks that it properly uses the PreparePayloadAttributes function +// (which is well tested) and that it properly sets NoTxPool and adds in the candidate +// transactions. +func TestAttributesQueue(t *testing.T) { // test config, only init the necessary fields cfg := &rollup.Config{ BlockTime: 2, @@ -56,18 +36,9 @@ func TestAttributesQueue_Step(t *testing.T) { l1Fetcher.ExpectInfoByHash(l1Info.InfoHash, l1Info, nil) - out := &MockAttributesQueueOutput{} - out.progress = Progress{ - Origin: l1Info.BlockRef(), - Closed: false, - } - defer out.AssertExpectations(t) - safeHead := testutils.RandomL2BlockRef(rng) safeHead.L1Origin = l1Info.ID() - out.ExpectSafeL2Head(safeHead) - batch := &BatchData{BatchV1{ ParentHash: safeHead.Hash, EpochNum: rollup.Epoch(l1Info.InfoNum), @@ -85,13 +56,11 @@ func TestAttributesQueue_Step(t *testing.T) { Transactions: []eth.Data{l1InfoTx, eth.Data("foobar"), eth.Data("example")}, NoTxPool: true, } - out.ExpectAddSafeAttributes(&attrs) - aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, l1Fetcher, out) - require.NoError(t, RepeatResetStep(t, aq.ResetStep, l1Fetcher, 1)) + aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, l1Fetcher, nil) - aq.AddBatch(batch) + actual, err := aq.createNextAttributes(context.Background(), batch, safeHead) - require.NoError(t, aq.Step(context.Background(), out.progress), "adding batch to next stage, no EOF yet") - require.Equal(t, io.EOF, aq.Step(context.Background(), out.progress), "done with batches") + require.Nil(t, err) + require.Equal(t, attrs, *actual) } diff --git a/op-node/rollup/derive/batch_queue.go b/op-node/rollup/derive/batch_queue.go index 44afedb9d42a6..5e5dd5496ff62 100644 --- a/op-node/rollup/derive/batch_queue.go +++ b/op-node/rollup/derive/batch_queue.go @@ -40,11 +40,10 @@ type NextBatchProvider interface { // BatchQueue contains a set of batches for every L1 block. // L1 blocks are contiguous and this does not support reorgs. type BatchQueue struct { - log log.Logger - config *rollup.Config - next BatchQueueOutput - prev NextBatchProvider - progress Progress + log log.Logger + config *rollup.Config + prev NextBatchProvider + origin eth.L1BlockRef l1Blocks []eth.L1BlockRef @@ -53,102 +52,91 @@ type BatchQueue struct { } // NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use. -func NewBatchQueue(log log.Logger, cfg *rollup.Config, next BatchQueueOutput, prev NextBatchProvider) *BatchQueue { +func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider) *BatchQueue { return &BatchQueue{ log: log, config: cfg, - next: next, prev: prev, } } -func (bq *BatchQueue) Progress() Progress { - return bq.progress +func (bq *BatchQueue) Origin() eth.L1BlockRef { + return bq.prev.Origin() } -func (bq *BatchQueue) Step(ctx context.Context, outer Progress) error { - - originBehind := bq.progress.Origin.Number < bq.next.SafeL2Head().L1Origin.Number +func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef) (*BatchData, error) { + originBehind := bq.origin.Number < safeL2Head.L1Origin.Number // Advance origin if needed // Note: The entire pipeline has the same origin // We just don't accept batches prior to the L1 origin of the L2 safe head - if bq.progress.Origin != bq.prev.Origin() { - bq.progress.Closed = false - bq.progress.Origin = bq.prev.Origin() + if bq.origin != bq.prev.Origin() { + bq.origin = bq.prev.Origin() if !originBehind { - bq.l1Blocks = append(bq.l1Blocks, bq.progress.Origin) - } - bq.log.Info("Advancing bq origin", "origin", bq.progress.Origin) - return nil - } - if !bq.progress.Closed { - if batch, err := bq.prev.NextBatch(ctx); err == io.EOF { - bq.log.Info("Closing batch queue origin") - bq.progress.Closed = true - return nil - } else if err != nil { - return err + bq.l1Blocks = append(bq.l1Blocks, bq.origin) } else { - bq.log.Info("have batch") - if !originBehind { - bq.AddBatch(batch) - } else { - bq.log.Warn("Skipping old batch") - } + // This is to handle the special case of startup. At startup we call Reset & include + // the L1 origin. That is the only time where immediately after `Reset` is called + // originBehind is false. + bq.l1Blocks = bq.l1Blocks[:0] } + bq.log.Info("Advancing bq origin", "origin", bq.origin) + } + + // Load more data into the batch queue + outOfData := false + if batch, err := bq.prev.NextBatch(ctx); err == io.EOF { + outOfData = true + } else if err != nil { + return nil, err + } else if !originBehind { + bq.AddBatch(batch, safeL2Head) } - // Skip adding batches / blocks to the internal state until they are from the same L1 origin - // as the current safe head. + // Skip adding data unless we are up to date with the origin, but do fully + // empty the previous stages if originBehind { - if bq.progress.Closed { - return io.EOF + if outOfData { + return nil, io.EOF } else { - // Immediately close the stage - bq.progress.Closed = true - return nil + return nil, NotEnoughData } } - batch, err := bq.deriveNextBatch(ctx) - if err == io.EOF { - bq.log.Info("no more batches in deriveNextBatch") - if bq.progress.Closed { - return io.EOF - } else { - return nil - } + // Finally attempt to derive more batches + batch, err := bq.deriveNextBatch(ctx, outOfData, safeL2Head) + if err == io.EOF && outOfData { + return nil, io.EOF + } else if err == io.EOF { + return nil, NotEnoughData } else if err != nil { - return err + return nil, err } - bq.next.AddBatch(batch) - return nil + return batch, nil } -func (bq *BatchQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { +func (bq *BatchQueue) Reset(ctx context.Context, base eth.L1BlockRef) error { // Copy over the Origin from the next stage // It is set in the engine queue (two stages away) such that the L2 Safe Head origin is the progress - bq.progress = bq.next.Progress() + bq.origin = base bq.batches = make(map[uint64][]*BatchWithL1InclusionBlock) // Include the new origin as an origin to build on + // Note: This is only for the initialization case. During normal resets we will later + // throw out this block. bq.l1Blocks = bq.l1Blocks[:0] - bq.l1Blocks = append(bq.l1Blocks, bq.progress.Origin) + bq.l1Blocks = append(bq.l1Blocks, base) return io.EOF } -func (bq *BatchQueue) AddBatch(batch *BatchData) { - if bq.progress.Closed { - panic("write batch while closed") - } +func (bq *BatchQueue) AddBatch(batch *BatchData, l2SafeHead eth.L2BlockRef) { if len(bq.l1Blocks) == 0 { panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp)) } data := BatchWithL1InclusionBlock{ - L1InclusionBlock: bq.progress.Origin, + L1InclusionBlock: bq.origin, Batch: batch, } - validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, bq.next.SafeL2Head(), &data) + validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, l2SafeHead, &data) if validity == BatchDrop { return // if we do drop the batch, CheckBatch will log the drop reason with WARN level. } @@ -159,12 +147,11 @@ func (bq *BatchQueue) AddBatch(batch *BatchData) { // following the validity rules imposed on consecutive batches, // based on currently available buffered batch and L1 origin information. // If no batch can be derived yet, then (nil, io.EOF) is returned. -func (bq *BatchQueue) deriveNextBatch(ctx context.Context) (*BatchData, error) { +func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, l2SafeHead eth.L2BlockRef) (*BatchData, error) { if len(bq.l1Blocks) == 0 { return nil, NewCriticalError(errors.New("cannot derive next batch, no origin was prepared")) } epoch := bq.l1Blocks[0] - l2SafeHead := bq.next.SafeL2Head() if l2SafeHead.L1Origin != epoch.ID() { return nil, NewResetError(fmt.Errorf("buffered L1 chain epoch %s in batch queue does not match safe head %s", epoch, l2SafeHead)) @@ -229,8 +216,8 @@ batchLoop: // i.e. if the sequence window expired, we create empty batches expiryEpoch := epoch.Number + bq.config.SeqWindowSize forceNextEpoch := - (expiryEpoch == bq.progress.Origin.Number && bq.progress.Closed) || - expiryEpoch < bq.progress.Origin.Number + (expiryEpoch == bq.origin.Number && outOfData) || + expiryEpoch < bq.origin.Number if !forceNextEpoch { // sequence window did not expire yet, still room to receive batches for the current epoch, diff --git a/op-node/rollup/derive/batch_queue_test.go b/op-node/rollup/derive/batch_queue_test.go index d766f7345bb8b..b921ff1318aaa 100644 --- a/op-node/rollup/derive/batch_queue_test.go +++ b/op-node/rollup/derive/batch_queue_test.go @@ -7,8 +7,6 @@ import ( "math/rand" "testing" - "github.com/stretchr/testify/require" - "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/testlog" @@ -16,44 +14,28 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/log" + "github.com/stretchr/testify/require" ) -// fakeBatchQueueOutput fakes the next stage (receive only) for the batch queue -// It tracks the progress state of the next stage. -// Upon receiving a batch, relevant characteristic of safeL2Head are immediately advanced. -type fakeBatchQueueOutput struct { - progress Progress - batches []*BatchData - safeL2Head eth.L2BlockRef -} - -var _ BatchQueueOutput = (*fakeBatchQueueOutput)(nil) - -func (f *fakeBatchQueueOutput) AddBatch(batch *BatchData) { - f.batches = append(f.batches, batch) - if batch.ParentHash != f.safeL2Head.Hash { - panic("batch has wrong parent hash") - } - newEpoch := f.safeL2Head.L1Origin.Hash != batch.EpochHash - // Advance SafeL2Head - f.safeL2Head.Time = batch.Timestamp - f.safeL2Head.L1Origin.Number = uint64(batch.EpochNum) - f.safeL2Head.L1Origin.Hash = batch.EpochHash - if newEpoch { - f.safeL2Head.SequenceNumber = 0 - } else { - f.safeL2Head.SequenceNumber += 1 - } - f.safeL2Head.ParentHash = batch.ParentHash - f.safeL2Head.Hash = mockHash(batch.Timestamp, 2) +type fakeBatchQueueInput struct { + i int + batches []*BatchData + errors []error + origin eth.L1BlockRef } -func (f *fakeBatchQueueOutput) SafeL2Head() eth.L2BlockRef { - return f.safeL2Head +func (f *fakeBatchQueueInput) Origin() eth.L1BlockRef { + return f.origin } -func (f *fakeBatchQueueOutput) Progress() Progress { - return f.progress +func (f *fakeBatchQueueInput) NextBatch(ctx context.Context) (*BatchData, error) { + if f.i >= len(f.batches) { + return nil, io.EOF + } + b := f.batches[f.i] + e := f.errors[f.i] + f.i += 1 + return b, e } func mockHash(time uint64, layer uint8) common.Hash { @@ -62,7 +44,6 @@ func mockHash(time uint64, layer uint8) common.Hash { return hash } -// nolint - will be used in next PR when the t.Skip goes away func b(timestamp uint64, epoch eth.L1BlockRef) *BatchData { rng := rand.New(rand.NewSource(int64(timestamp))) data := testutils.RandomData(rng, 20) @@ -91,23 +72,18 @@ func L1Chain(l1Times []uint64) []eth.L1BlockRef { return out } +// TestBatchQueueEager adds a bunch of contiguous batches and asserts that +// enough calls to `NextBatch` return all of those batches. func TestBatchQueueEager(t *testing.T) { - t.Skip("want to migrate the test suite at once") - log := testlog.Logger(t, log.LvlTrace) + log := testlog.Logger(t, log.LvlCrit) l1 := L1Chain([]uint64{10, 20, 30}) - next := &fakeBatchQueueOutput{ - safeL2Head: eth.L2BlockRef{ - Hash: mockHash(10, 2), - Number: 0, - ParentHash: common.Hash{}, - Time: 10, - L1Origin: l1[0].ID(), - SequenceNumber: 0, - }, - progress: Progress{ - Origin: l1[0], - Closed: false, - }, + safeHead := eth.L2BlockRef{ + Hash: mockHash(10, 2), + Number: 0, + ParentHash: common.Hash{}, + Time: 10, + L1Origin: l1[0].ID(), + SequenceNumber: 0, } cfg := &rollup.Config{ Genesis: rollup.Genesis{ @@ -118,133 +94,44 @@ func TestBatchQueueEager(t *testing.T) { SeqWindowSize: 30, } - bq := NewBatchQueue(log, cfg, next, nil) - require.Equal(t, io.EOF, bq.ResetStep(context.Background(), nil), "reset should complete without l1 fetcher, single step") - - // We start with an open L1 origin as progress in the first step - progress := bq.progress - require.Equal(t, bq.progress.Closed, false) + batches := []*BatchData{b(12, l1[0]), b(14, l1[0]), b(16, l1[0]), b(18, l1[0]), b(20, l1[0]), b(22, l1[0]), b(24, l1[1]), nil} + errors := []error{nil, nil, nil, nil, nil, nil, nil, io.EOF} - // Add batches - batches := []*BatchData{b(12, l1[0]), b(14, l1[0])} - for _, batch := range batches { - bq.AddBatch(batch) - } - // Step - require.NoError(t, RepeatStep(t, bq.Step, progress, 10)) - - // Verify Output - require.Equal(t, batches, next.batches) -} - -func TestBatchQueueFull(t *testing.T) { - t.Skip("want to migrate the test suite at once") - - log := testlog.Logger(t, log.LvlTrace) - l1 := L1Chain([]uint64{10, 15, 20}) - next := &fakeBatchQueueOutput{ - safeL2Head: eth.L2BlockRef{ - Hash: mockHash(10, 2), - Number: 0, - ParentHash: common.Hash{}, - Time: 10, - L1Origin: l1[0].ID(), - SequenceNumber: 0, - }, - progress: Progress{ - Origin: l1[0], - Closed: false, - }, - } - cfg := &rollup.Config{ - Genesis: rollup.Genesis{ - L2Time: 10, - }, - BlockTime: 2, - MaxSequencerDrift: 600, - SeqWindowSize: 2, + input := &fakeBatchQueueInput{ + batches: batches, + errors: errors, + origin: l1[0], } - bq := NewBatchQueue(log, cfg, next, nil) - require.Equal(t, io.EOF, bq.ResetStep(context.Background(), nil), "reset should complete without l1 fetcher, single step") - - // We start with an open L1 origin as progress in the first step - progress := bq.progress - require.Equal(t, bq.progress.Closed, false) - - // Add batches - batches := []*BatchData{b(14, l1[0]), b(16, l1[0]), b(18, l1[1])} - for _, batch := range batches { - bq.AddBatch(batch) + bq := NewBatchQueue(log, cfg, input) + _ = bq.Reset(context.Background(), l1[0]) + // Advance the origin + input.origin = l1[1] + + for i := 0; i < len(batches); i++ { + b, e := bq.NextBatch(context.Background(), safeHead) + require.ErrorIs(t, e, errors[i]) + require.Equal(t, batches[i], b) + + if b != nil { + safeHead.Number += 1 + safeHead.Time += 2 + safeHead.Hash = mockHash(b.Timestamp, 2) + safeHead.L1Origin = b.Epoch() + } } - // Missing first batch - err := bq.Step(context.Background(), progress) - require.Equal(t, err, io.EOF) - - // Close previous to close bq - progress.Closed = true - err = bq.Step(context.Background(), progress) - require.Equal(t, err, nil) - require.Equal(t, bq.progress.Closed, true) - - // Open previous to open bq with the new inclusion block - progress.Closed = false - progress.Origin = l1[1] - err = bq.Step(context.Background(), progress) - require.Equal(t, err, nil) - require.Equal(t, bq.progress.Closed, false) - - // Close previous to close bq (for epoch 2) - progress.Closed = true - err = bq.Step(context.Background(), progress) - require.Equal(t, err, nil) - require.Equal(t, bq.progress.Closed, true) - - // Open previous to open bq with the new inclusion block (epoch 2) - progress.Closed = false - progress.Origin = l1[2] - err = bq.Step(context.Background(), progress) - require.Equal(t, err, nil) - require.Equal(t, bq.progress.Closed, false) - - // Finally add batch - firstBatch := b(12, l1[0]) - bq.AddBatch(firstBatch) - - // Close the origin - progress.Closed = true - err = bq.Step(context.Background(), progress) - require.Equal(t, err, nil) - require.Equal(t, bq.progress.Closed, true) - - // Step, but should have full epoch now - require.NoError(t, RepeatStep(t, bq.Step, progress, 10)) - - // Verify Output - var final []*BatchData - final = append(final, firstBatch) - final = append(final, batches...) - require.Equal(t, final, next.batches) } func TestBatchQueueMissing(t *testing.T) { - t.Skip("want to migrate the test suite at once") - - log := testlog.Logger(t, log.LvlTrace) + log := testlog.Logger(t, log.LvlCrit) l1 := L1Chain([]uint64{10, 15, 20}) - next := &fakeBatchQueueOutput{ - safeL2Head: eth.L2BlockRef{ - Hash: mockHash(10, 2), - Number: 0, - ParentHash: common.Hash{}, - Time: 10, - L1Origin: l1[0].ID(), - SequenceNumber: 0, - }, - progress: Progress{ - Origin: l1[0], - Closed: false, - }, + safeHead := eth.L2BlockRef{ + Hash: mockHash(10, 2), + Number: 0, + ParentHash: common.Hash{}, + Time: 10, + L1Origin: l1[0].ID(), + SequenceNumber: 0, } cfg := &rollup.Config{ Genesis: rollup.Genesis{ @@ -255,56 +142,68 @@ func TestBatchQueueMissing(t *testing.T) { SeqWindowSize: 2, } - bq := NewBatchQueue(log, cfg, next, nil) - require.Equal(t, io.EOF, bq.ResetStep(context.Background(), nil), "reset should complete without l1 fetcher, single step") - - // We start with an open L1 origin as progress in the first step - progress := bq.progress - require.Equal(t, bq.progress.Closed, false) - // The batches at 18 and 20 are skipped to stop 22 from being eagerly processed. // This test checks that batch timestamp 12 & 14 are created, 16 is used, and 18 is advancing the epoch. // Due to the large sequencer time drift 16 is perfectly valid to have epoch 0 as origin. batches := []*BatchData{b(16, l1[0]), b(22, l1[1])} - for _, batch := range batches { - bq.AddBatch(batch) - } - // Missing first batches with timestamp 12 and 14, nothing to do yet. - err := bq.Step(context.Background(), progress) - require.Equal(t, err, io.EOF) + errors := []error{nil, nil} - // Close l1[0] - progress.Closed = true - require.NoError(t, RepeatStep(t, bq.Step, progress, 10)) - require.Equal(t, bq.progress.Closed, true) + input := &fakeBatchQueueInput{ + batches: batches, + errors: errors, + origin: l1[0], + } - // Open l1[1] - progress.Closed = false - progress.Origin = l1[1] - require.NoError(t, RepeatStep(t, bq.Step, progress, 10)) - require.Equal(t, bq.progress.Closed, false) - require.Empty(t, next.batches, "no batches yet, sequence window did not expire, waiting for 12 and 14") + bq := NewBatchQueue(log, cfg, input) + _ = bq.Reset(context.Background(), l1[0]) - // Close l1[1] - progress.Closed = true - require.NoError(t, RepeatStep(t, bq.Step, progress, 10)) - require.Equal(t, bq.progress.Closed, true) + for i := 0; i < len(batches); i++ { + b, e := bq.NextBatch(context.Background(), safeHead) + require.ErrorIs(t, e, NotEnoughData) + require.Nil(t, b) + } - // Open l1[2] - progress.Closed = false - progress.Origin = l1[2] - require.NoError(t, RepeatStep(t, bq.Step, progress, 10)) - require.Equal(t, bq.progress.Closed, false) + // advance origin. Underlying stage still has no more batches + // This is not enough to auto advance yet + input.origin = l1[1] + b, e := bq.NextBatch(context.Background(), safeHead) + require.ErrorIs(t, e, io.EOF) + require.Nil(t, b) + + // Advance the origin. At this point batch timestamps 12 and 14 will be created + input.origin = l1[2] + + // Check for a generated batch at t = 12 + b, e = bq.NextBatch(context.Background(), safeHead) + require.Nil(t, e) + require.Equal(t, b.Timestamp, uint64(12)) + require.Empty(t, b.BatchV1.Transactions) + safeHead.Number += 1 + safeHead.Time += 2 + safeHead.Hash = mockHash(b.Timestamp, 2) + + // Check for generated batch at t = 14 + b, e = bq.NextBatch(context.Background(), safeHead) + require.Nil(t, e) + require.Equal(t, b.Timestamp, uint64(14)) + require.Empty(t, b.BatchV1.Transactions) + safeHead.Number += 1 + safeHead.Time += 2 + safeHead.Hash = mockHash(b.Timestamp, 2) + + // Check for the inputted batch at t = 16 + b, e = bq.NextBatch(context.Background(), safeHead) + require.Nil(t, e) + require.Equal(t, b, batches[0]) + safeHead.Number += 1 + safeHead.Time += 2 + safeHead.Hash = mockHash(b.Timestamp, 2) + + // Check for the generated batch at t = 18. This batch advances the epoch + b, e = bq.NextBatch(context.Background(), safeHead) + require.Nil(t, e) + require.Equal(t, b.Timestamp, uint64(18)) + require.Empty(t, b.BatchV1.Transactions) + require.Equal(t, rollup.Epoch(1), b.EpochNum) - // Close l1[2], this is the moment that l1[0] expires and empty batches 12 and 14 can be created, - // and batch 16 can then be used. - progress.Closed = true - require.NoError(t, RepeatStep(t, bq.Step, progress, 10)) - require.Equal(t, bq.progress.Closed, true) - require.Equal(t, 4, len(next.batches), "expecting empty batches with timestamp 12 and 14 to be created and existing batch 16 to follow") - require.Equal(t, uint64(12), next.batches[0].Timestamp) - require.Equal(t, uint64(14), next.batches[1].Timestamp) - require.Equal(t, batches[0], next.batches[2]) - require.Equal(t, uint64(18), next.batches[3].Timestamp) - require.Equal(t, rollup.Epoch(1), next.batches[3].EpochNum) } diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index 412b553e3a4ab..b585619dde36b 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -17,6 +17,11 @@ import ( "github.com/ethereum/go-ethereum/log" ) +type NextAttributesProvider interface { + Origin() eth.L1BlockRef + NextAttributes(context.Context, eth.L2BlockRef) (*eth.PayloadAttributes, error) +} + type Engine interface { GetPayload(ctx context.Context, payloadId eth.PayloadID) (*eth.ExecutionPayload, error) ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error) @@ -64,8 +69,6 @@ type EngineQueue struct { finalizedL1 eth.BlockID - progress Progress - safeAttributes []*eth.PayloadAttributes unsafePayloads PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps @@ -73,14 +76,15 @@ type EngineQueue struct { finalityData []FinalityData engine Engine + prev NextAttributesProvider + + progress Progress // only used for pipeline resets metrics Metrics } -var _ AttributesQueueOutput = (*EngineQueue)(nil) - // NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use. -func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics) *EngineQueue { +func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider) *EngineQueue { return &EngineQueue{ log: log, cfg: cfg, @@ -91,6 +95,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M MaxSize: maxUnsafePayloadsMemory, SizeFn: payloadMemSize, }, + prev: prev, } } @@ -146,17 +151,30 @@ func (eq *EngineQueue) LastL2Time() uint64 { return uint64(eq.safeAttributes[len(eq.safeAttributes)-1].Timestamp) } -func (eq *EngineQueue) Step(ctx context.Context, outer Progress) error { - if changed, err := eq.progress.Update(outer); err != nil || changed { - return err - } +func (eq *EngineQueue) Step(ctx context.Context, _ Progress) error { if len(eq.safeAttributes) > 0 { return eq.tryNextSafeAttributes(ctx) } + outOfData := false + if len(eq.safeAttributes) == 0 { + if next, err := eq.prev.NextAttributes(ctx, eq.safeHead); err == io.EOF { + outOfData = true + } else if err != nil { + return err + } else { + eq.safeAttributes = append(eq.safeAttributes, next) + return NotEnoughData + } + } if eq.unsafePayloads.Len() > 0 { return eq.tryNextUnsafePayload(ctx) } - return io.EOF + + if outOfData { + return io.EOF + } else { + return nil + } } // tryFinalizeL2 traverses the past L1 blocks, checks if any has been finalized, @@ -186,11 +204,11 @@ func (eq *EngineQueue) postProcessSafeL2() { eq.finalityData = append(eq.finalityData[:0], eq.finalityData[1:finalityLookback]...) } // remember the last L2 block that we fully derived from the given finality data - if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.progress.Origin.Number { + if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.prev.Origin().Number { // append entry for new L1 block eq.finalityData = append(eq.finalityData, FinalityData{ L2Block: eq.safeHead, - L1Block: eq.progress.Origin.ID(), + L1Block: eq.prev.Origin().ID(), }) } else { // if it's a now L2 block that was derived from the same latest L1 block, then just update the entry @@ -205,7 +223,7 @@ func (eq *EngineQueue) logSyncProgress(reason string) { "l2_safe", eq.safeHead, "l2_unsafe", eq.unsafeHead, "l2_time", eq.unsafeHead.Time, - "l1_derived", eq.progress.Origin, + "l1_derived", eq.prev.Origin(), ) } @@ -415,7 +433,6 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error // note: we do not clear the unsafe payloadds queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads. eq.progress = Progress{ Origin: pipelineOrigin, - Closed: false, } eq.metrics.RecordL2Ref("l2_finalized", finalized) eq.metrics.RecordL2Ref("l2_safe", safe) diff --git a/op-node/rollup/derive/engine_queue_test.go b/op-node/rollup/derive/engine_queue_test.go index 503f40c33e118..800c5c7736fe8 100644 --- a/op-node/rollup/derive/engine_queue_test.go +++ b/op-node/rollup/derive/engine_queue_test.go @@ -1,6 +1,8 @@ package derive import ( + "context" + "io" "math/rand" "testing" @@ -14,6 +16,20 @@ import ( "github.com/ethereum/go-ethereum/log" ) +type fakeAttributesQueue struct { + origin eth.L1BlockRef +} + +func (f *fakeAttributesQueue) Origin() eth.L1BlockRef { + return f.origin +} + +func (f *fakeAttributesQueue) NextAttributes(_ context.Context, _ eth.L2BlockRef) (*eth.PayloadAttributes, error) { + return nil, io.EOF +} + +var _ NextAttributesProvider = (*fakeAttributesQueue)(nil) + func TestEngineQueue_Finalize(t *testing.T) { logger := testlog.Logger(t, log.LvlInfo) @@ -211,8 +227,10 @@ func TestEngineQueue_Finalize(t *testing.T) { l1F.ExpectL1BlockRefByHash(refB.Hash, refB, nil) l1F.ExpectL1BlockRefByNumber(refB.Number, refB, nil) - eq := NewEngineQueue(logger, cfg, eng, metrics) - require.NoError(t, RepeatResetStep(t, eq.ResetStep, l1F, 20)) + prev := &fakeAttributesQueue{} + + eq := NewEngineQueue(logger, cfg, eng, metrics, prev) + require.ErrorIs(t, eq.ResetStep(context.Background(), l1F), io.EOF) require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") require.Equal(t, refB, eq.Progress().Origin, "Expecting to be set back derivation L1 progress to B") @@ -220,20 +238,19 @@ func TestEngineQueue_Finalize(t *testing.T) { // now say C1 was included in D and became the new safe head eq.progress.Origin = refD + prev.origin = refD eq.safeHead = refC1 eq.postProcessSafeL2() // now say D0 was included in E and became the new safe head eq.progress.Origin = refE + prev.origin = refE eq.safeHead = refD0 eq.postProcessSafeL2() // let's finalize D (current L1), from which we fully derived C1 (it was safe head), but not D0 (included in E) eq.Finalize(refD.ID()) - // Now a few steps later, without consuming any additional L1 inputs, - // we should be able to resolve that B1 is now finalized, since it was included in finalized L1 block C - require.NoError(t, RepeatStep(t, eq.Step, eq.progress, 10)) require.Equal(t, refC1, eq.Finalized(), "C1 was included in finalized D, and should now be finalized") l1F.AssertExpectations(t) diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index b5cf89f3f7ec8..bc8fe63d78009 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -100,14 +100,14 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch l1Src := NewL1Retrieval(log, dataSrc, l1Traversal) bank := NewChannelBank(log, cfg, l1Src, l1Fetcher) chInReader := NewChannelInReader(log, bank) + batchQueue := NewBatchQueue(log, cfg, chInReader) + attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, batchQueue) // Push stages (that act like pull stages b/c we push from the innermost stages prior to the outermost stages) - eng := NewEngineQueue(log, cfg, engine, metrics) - attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, eng) - batchQueue := NewBatchQueue(log, cfg, attributesQueue, chInReader) + eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue) - stages := []Stage{eng, attributesQueue, batchQueue} - pullStages := []PullStage{chInReader, bank, l1Src, l1Traversal} + stages := []Stage{eng} + pullStages := []PullStage{attributesQueue, batchQueue, chInReader, bank, l1Src, l1Traversal} return &DerivationPipeline{ log: log,