diff --git a/op-node/rollup/derive/attributes_queue.go b/op-node/rollup/derive/attributes_queue.go index 63088a6eba8b4..f2e764b0c3cb0 100644 --- a/op-node/rollup/derive/attributes_queue.go +++ b/op-node/rollup/derive/attributes_queue.go @@ -22,75 +22,60 @@ import ( // This stage can be reset by clearing it's batch buffer. // This stage does not need to retain any references to L1 blocks. -type AttributesQueueOutput interface { - AddSafeAttributes(attributes *eth.PayloadAttributes) - SafeL2Head() eth.L2BlockRef - StageProgress -} - type AttributesQueue struct { - log log.Logger - config *rollup.Config - dl L1ReceiptsFetcher - next AttributesQueueOutput - prev *BatchQueue - progress Progress - batches []*BatchData + log log.Logger + config *rollup.Config + dl L1ReceiptsFetcher + prev *BatchQueue + batch *BatchData } -func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, next AttributesQueueOutput, prev *BatchQueue) *AttributesQueue { +func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, prev *BatchQueue) *AttributesQueue { return &AttributesQueue{ log: log, config: cfg, dl: l1Fetcher, - next: next, prev: prev, } } -func (aq *AttributesQueue) AddBatch(batch *BatchData) { - aq.log.Debug("Received next batch", "batch_epoch", batch.EpochNum, "batch_timestamp", batch.Timestamp, "tx_count", len(batch.Transactions)) - aq.batches = append(aq.batches, batch) -} - -func (aq *AttributesQueue) Progress() Progress { - return aq.progress +func (aq *AttributesQueue) Origin() eth.L1BlockRef { + return aq.prev.Origin() } -func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error { - if aq.progress.Origin != aq.prev.Origin() { - aq.progress.Closed = false - aq.progress.Origin = aq.prev.Origin() - return nil +func (aq *AttributesQueue) NextAttributes(ctx context.Context, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) { + // Get a batch if we need it + if aq.batch == nil { + batch, err := aq.prev.NextBatch(ctx, l2SafeHead) + if err != nil { + return nil, err + } + aq.batch = batch } - if len(aq.batches) == 0 { - batch, err := aq.prev.NextBatch(ctx, aq.next.SafeL2Head()) - if err == io.EOF { - if !aq.progress.Closed { - aq.progress.Closed = true - return nil - } else { - return io.EOF - } - - } else if err != nil { - return err - } - aq.batches = append(aq.batches, batch) + // Actually generate the next attributes + if attrs, err := aq.createNextAttributes(ctx, aq.batch, l2SafeHead); err != nil { + return nil, err + } else { + // Clear out the local state once we will succeed + aq.batch = nil + return attrs, nil } - batch := aq.batches[0] - safeL2Head := aq.next.SafeL2Head() +} + +// createNextAttributes transforms a batch into a payload attributes. This sets `NoTxPool` and appends the batched transactions +// to the attributes transaction list +func (aq *AttributesQueue) createNextAttributes(ctx context.Context, batch *BatchData, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) { // sanity check parent hash - if batch.ParentHash != safeL2Head.Hash { - return NewCriticalError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, safeL2Head.Hash)) + if batch.ParentHash != l2SafeHead.Hash { + return nil, NewResetError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, l2SafeHead.Hash)) } fetchCtx, cancel := context.WithTimeout(ctx, 20*time.Second) defer cancel() - attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, safeL2Head, batch.Timestamp, batch.Epoch()) + attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, l2SafeHead, batch.Timestamp, batch.Epoch()) if err != nil { - return err + return nil, err } // we are verifying, not sequencing, we've got all transactions and do not pull from the tx-pool @@ -100,19 +85,9 @@ func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error { aq.log.Info("generated attributes in payload queue", "txs", len(attrs.Transactions), "timestamp", batch.Timestamp) - // Slice off the batch once we are guaranteed to succeed - aq.batches = aq.batches[1:] - - aq.next.AddSafeAttributes(attrs) - return nil + return attrs, nil } -func (aq *AttributesQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { - aq.batches = aq.batches[:0] - aq.progress = aq.next.Progress() +func (aq *AttributesQueue) Reset(ctx context.Context, _ eth.L1BlockRef) error { return io.EOF } - -func (aq *AttributesQueue) SafeL2Head() eth.L2BlockRef { - return aq.next.SafeL2Head() -} diff --git a/op-node/rollup/derive/attributes_queue_test.go b/op-node/rollup/derive/attributes_queue_test.go index 959fc79c6b6e9..c5784f59f1715 100644 --- a/op-node/rollup/derive/attributes_queue_test.go +++ b/op-node/rollup/derive/attributes_queue_test.go @@ -2,7 +2,6 @@ package derive import ( "context" - "io" "math/big" "math/rand" "testing" @@ -17,30 +16,10 @@ import ( "github.com/ethereum/go-ethereum/log" ) -type MockAttributesQueueOutput struct { - MockOriginStage -} - -func (m *MockAttributesQueueOutput) AddSafeAttributes(attributes *eth.PayloadAttributes) { - m.Mock.MethodCalled("AddSafeAttributes", attributes) -} - -func (m *MockAttributesQueueOutput) ExpectAddSafeAttributes(attributes *eth.PayloadAttributes) { - m.Mock.On("AddSafeAttributes", attributes).Once().Return() -} - -func (m *MockAttributesQueueOutput) SafeL2Head() eth.L2BlockRef { - return m.Mock.MethodCalled("SafeL2Head").Get(0).(eth.L2BlockRef) -} - -func (m *MockAttributesQueueOutput) ExpectSafeL2Head(head eth.L2BlockRef) { - m.Mock.On("SafeL2Head").Once().Return(head) -} - -var _ AttributesQueueOutput = (*MockAttributesQueueOutput)(nil) - -func TestAttributesQueue_Step(t *testing.T) { - t.Skip("don't fake out batch queue") +// TestAttributesQueue checks that it properly uses the PreparePayloadAttributes function +// (which is well tested) and that it properly sets NoTxPool and adds in the candidate +// transactions. +func TestAttributesQueue(t *testing.T) { // test config, only init the necessary fields cfg := &rollup.Config{ BlockTime: 2, @@ -57,18 +36,9 @@ func TestAttributesQueue_Step(t *testing.T) { l1Fetcher.ExpectInfoByHash(l1Info.InfoHash, l1Info, nil) - out := &MockAttributesQueueOutput{} - out.progress = Progress{ - Origin: l1Info.BlockRef(), - Closed: false, - } - defer out.AssertExpectations(t) - safeHead := testutils.RandomL2BlockRef(rng) safeHead.L1Origin = l1Info.ID() - out.ExpectSafeL2Head(safeHead) - batch := &BatchData{BatchV1{ ParentHash: safeHead.Hash, EpochNum: rollup.Epoch(l1Info.InfoNum), @@ -86,13 +56,11 @@ func TestAttributesQueue_Step(t *testing.T) { Transactions: []eth.Data{l1InfoTx, eth.Data("foobar"), eth.Data("example")}, NoTxPool: true, } - out.ExpectAddSafeAttributes(&attrs) - aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, l1Fetcher, out, nil) - require.NoError(t, RepeatResetStep(t, aq.ResetStep, l1Fetcher, 1)) + aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, l1Fetcher, nil) - aq.AddBatch(batch) + actual, err := aq.createNextAttributes(context.Background(), batch, safeHead) - require.NoError(t, aq.Step(context.Background(), out.progress), "adding batch to next stage, no EOF yet") - require.Equal(t, io.EOF, aq.Step(context.Background(), out.progress), "done with batches") + require.Nil(t, err) + require.Equal(t, attrs, *actual) } diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index 412b553e3a4ab..b585619dde36b 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -17,6 +17,11 @@ import ( "github.com/ethereum/go-ethereum/log" ) +type NextAttributesProvider interface { + Origin() eth.L1BlockRef + NextAttributes(context.Context, eth.L2BlockRef) (*eth.PayloadAttributes, error) +} + type Engine interface { GetPayload(ctx context.Context, payloadId eth.PayloadID) (*eth.ExecutionPayload, error) ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error) @@ -64,8 +69,6 @@ type EngineQueue struct { finalizedL1 eth.BlockID - progress Progress - safeAttributes []*eth.PayloadAttributes unsafePayloads PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps @@ -73,14 +76,15 @@ type EngineQueue struct { finalityData []FinalityData engine Engine + prev NextAttributesProvider + + progress Progress // only used for pipeline resets metrics Metrics } -var _ AttributesQueueOutput = (*EngineQueue)(nil) - // NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use. -func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics) *EngineQueue { +func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider) *EngineQueue { return &EngineQueue{ log: log, cfg: cfg, @@ -91,6 +95,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M MaxSize: maxUnsafePayloadsMemory, SizeFn: payloadMemSize, }, + prev: prev, } } @@ -146,17 +151,30 @@ func (eq *EngineQueue) LastL2Time() uint64 { return uint64(eq.safeAttributes[len(eq.safeAttributes)-1].Timestamp) } -func (eq *EngineQueue) Step(ctx context.Context, outer Progress) error { - if changed, err := eq.progress.Update(outer); err != nil || changed { - return err - } +func (eq *EngineQueue) Step(ctx context.Context, _ Progress) error { if len(eq.safeAttributes) > 0 { return eq.tryNextSafeAttributes(ctx) } + outOfData := false + if len(eq.safeAttributes) == 0 { + if next, err := eq.prev.NextAttributes(ctx, eq.safeHead); err == io.EOF { + outOfData = true + } else if err != nil { + return err + } else { + eq.safeAttributes = append(eq.safeAttributes, next) + return NotEnoughData + } + } if eq.unsafePayloads.Len() > 0 { return eq.tryNextUnsafePayload(ctx) } - return io.EOF + + if outOfData { + return io.EOF + } else { + return nil + } } // tryFinalizeL2 traverses the past L1 blocks, checks if any has been finalized, @@ -186,11 +204,11 @@ func (eq *EngineQueue) postProcessSafeL2() { eq.finalityData = append(eq.finalityData[:0], eq.finalityData[1:finalityLookback]...) } // remember the last L2 block that we fully derived from the given finality data - if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.progress.Origin.Number { + if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.prev.Origin().Number { // append entry for new L1 block eq.finalityData = append(eq.finalityData, FinalityData{ L2Block: eq.safeHead, - L1Block: eq.progress.Origin.ID(), + L1Block: eq.prev.Origin().ID(), }) } else { // if it's a now L2 block that was derived from the same latest L1 block, then just update the entry @@ -205,7 +223,7 @@ func (eq *EngineQueue) logSyncProgress(reason string) { "l2_safe", eq.safeHead, "l2_unsafe", eq.unsafeHead, "l2_time", eq.unsafeHead.Time, - "l1_derived", eq.progress.Origin, + "l1_derived", eq.prev.Origin(), ) } @@ -415,7 +433,6 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error // note: we do not clear the unsafe payloadds queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads. eq.progress = Progress{ Origin: pipelineOrigin, - Closed: false, } eq.metrics.RecordL2Ref("l2_finalized", finalized) eq.metrics.RecordL2Ref("l2_safe", safe) diff --git a/op-node/rollup/derive/engine_queue_test.go b/op-node/rollup/derive/engine_queue_test.go index 503f40c33e118..800c5c7736fe8 100644 --- a/op-node/rollup/derive/engine_queue_test.go +++ b/op-node/rollup/derive/engine_queue_test.go @@ -1,6 +1,8 @@ package derive import ( + "context" + "io" "math/rand" "testing" @@ -14,6 +16,20 @@ import ( "github.com/ethereum/go-ethereum/log" ) +type fakeAttributesQueue struct { + origin eth.L1BlockRef +} + +func (f *fakeAttributesQueue) Origin() eth.L1BlockRef { + return f.origin +} + +func (f *fakeAttributesQueue) NextAttributes(_ context.Context, _ eth.L2BlockRef) (*eth.PayloadAttributes, error) { + return nil, io.EOF +} + +var _ NextAttributesProvider = (*fakeAttributesQueue)(nil) + func TestEngineQueue_Finalize(t *testing.T) { logger := testlog.Logger(t, log.LvlInfo) @@ -211,8 +227,10 @@ func TestEngineQueue_Finalize(t *testing.T) { l1F.ExpectL1BlockRefByHash(refB.Hash, refB, nil) l1F.ExpectL1BlockRefByNumber(refB.Number, refB, nil) - eq := NewEngineQueue(logger, cfg, eng, metrics) - require.NoError(t, RepeatResetStep(t, eq.ResetStep, l1F, 20)) + prev := &fakeAttributesQueue{} + + eq := NewEngineQueue(logger, cfg, eng, metrics, prev) + require.ErrorIs(t, eq.ResetStep(context.Background(), l1F), io.EOF) require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") require.Equal(t, refB, eq.Progress().Origin, "Expecting to be set back derivation L1 progress to B") @@ -220,20 +238,19 @@ func TestEngineQueue_Finalize(t *testing.T) { // now say C1 was included in D and became the new safe head eq.progress.Origin = refD + prev.origin = refD eq.safeHead = refC1 eq.postProcessSafeL2() // now say D0 was included in E and became the new safe head eq.progress.Origin = refE + prev.origin = refE eq.safeHead = refD0 eq.postProcessSafeL2() // let's finalize D (current L1), from which we fully derived C1 (it was safe head), but not D0 (included in E) eq.Finalize(refD.ID()) - // Now a few steps later, without consuming any additional L1 inputs, - // we should be able to resolve that B1 is now finalized, since it was included in finalized L1 block C - require.NoError(t, RepeatStep(t, eq.Step, eq.progress, 10)) require.Equal(t, refC1, eq.Finalized(), "C1 was included in finalized D, and should now be finalized") l1F.AssertExpectations(t) diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index ba2f62472d68d..bc8fe63d78009 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -101,13 +101,13 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch bank := NewChannelBank(log, cfg, l1Src, l1Fetcher) chInReader := NewChannelInReader(log, bank) batchQueue := NewBatchQueue(log, cfg, chInReader) + attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, batchQueue) // Push stages (that act like pull stages b/c we push from the innermost stages prior to the outermost stages) - eng := NewEngineQueue(log, cfg, engine, metrics) - attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, eng, batchQueue) + eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue) - stages := []Stage{eng, attributesQueue} - pullStages := []PullStage{batchQueue, chInReader, bank, l1Src, l1Traversal} + stages := []Stage{eng} + pullStages := []PullStage{attributesQueue, batchQueue, chInReader, bank, l1Src, l1Traversal} return &DerivationPipeline{ log: log,