diff --git a/op-node/rollup/derive/batch_queue.go b/op-node/rollup/derive/batch_queue.go index 5e5dd5496ff62..d5c683254c691 100644 --- a/op-node/rollup/derive/batch_queue.go +++ b/op-node/rollup/derive/batch_queue.go @@ -26,12 +26,6 @@ import ( // It is internally responsible for making sure that batches with L1 inclusions block outside it's // working range are not considered or pruned. -type BatchQueueOutput interface { - StageProgress - AddBatch(batch *BatchData) - SafeL2Head() eth.L2BlockRef -} - type NextBatchProvider interface { Origin() eth.L1BlockRef NextBatch(ctx context.Context) (*BatchData, error) diff --git a/op-node/rollup/derive/channel_bank.go b/op-node/rollup/derive/channel_bank.go index 9a4581051cddc..dfcd048d65b63 100644 --- a/op-node/rollup/derive/channel_bank.go +++ b/op-node/rollup/derive/channel_bank.go @@ -38,7 +38,7 @@ type ChannelBank struct { fetcher L1Fetcher } -var _ PullStage = (*ChannelBank)(nil) +var _ ResetableStage = (*ChannelBank)(nil) // NewChannelBank creates a ChannelBank, which should be Reset(origin) before use. func NewChannelBank(log log.Logger, cfg *rollup.Config, prev NextDataProvider, fetcher L1Fetcher) *ChannelBank { diff --git a/op-node/rollup/derive/channel_in_reader.go b/op-node/rollup/derive/channel_in_reader.go index 9865dacd5aa7f..07156b9997a97 100644 --- a/op-node/rollup/derive/channel_in_reader.go +++ b/op-node/rollup/derive/channel_in_reader.go @@ -22,7 +22,7 @@ type ChannelInReader struct { prev *ChannelBank } -var _ PullStage = (*ChannelInReader)(nil) +var _ ResetableStage = (*ChannelInReader)(nil) // NewChannelInReader creates a ChannelInReader, which should be Reset(origin) before use. func NewChannelInReader(log log.Logger, prev *ChannelBank) *ChannelInReader { diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index b585619dde36b..b528517041ba9 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -80,11 +80,12 @@ type EngineQueue struct { progress Progress // only used for pipeline resets - metrics Metrics + metrics Metrics + l1Fetcher L1Fetcher } // NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use. -func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider) *EngineQueue { +func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher) *EngineQueue { return &EngineQueue{ log: log, cfg: cfg, @@ -95,7 +96,8 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M MaxSize: maxUnsafePayloadsMemory, SizeFn: payloadMemSize, }, - prev: prev, + prev: prev, + l1Fetcher: l1Fetcher, } } @@ -151,7 +153,7 @@ func (eq *EngineQueue) LastL2Time() uint64 { return uint64(eq.safeAttributes[len(eq.safeAttributes)-1].Timestamp) } -func (eq *EngineQueue) Step(ctx context.Context, _ Progress) error { +func (eq *EngineQueue) Step(ctx context.Context) error { if len(eq.safeAttributes) > 0 { return eq.tryNextSafeAttributes(ctx) } @@ -402,13 +404,13 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { // ResetStep Walks the L2 chain backwards until it finds an L2 block whose L1 origin is canonical. // The unsafe head is set to the head of the L2 chain, unless the existing safe head is not canonical. -func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { - result, err := sync.FindL2Heads(ctx, eq.cfg, l1Fetcher, eq.engine) +func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef) error { + result, err := sync.FindL2Heads(ctx, eq.cfg, eq.l1Fetcher, eq.engine) if err != nil { return NewTemporaryError(fmt.Errorf("failed to find the L2 Heads to start from: %w", err)) } finalized, safe, unsafe := result.Finalized, result.Safe, result.Unsafe - l1Origin, err := l1Fetcher.L1BlockRefByHash(ctx, safe.L1Origin.Hash) + l1Origin, err := eq.l1Fetcher.L1BlockRefByHash(ctx, safe.L1Origin.Hash) if err != nil { return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %v; err: %w", safe.L1Origin, err)) } @@ -421,7 +423,7 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error if l1Origin.Number < eq.cfg.ChannelTimeout { pipelineNumber = 0 } - pipelineOrigin, err := l1Fetcher.L1BlockRefByNumber(ctx, pipelineNumber) + pipelineOrigin, err := eq.l1Fetcher.L1BlockRefByNumber(ctx, pipelineNumber) if err != nil { return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %v; err: %w", pipelineNumber, err)) } diff --git a/op-node/rollup/derive/engine_queue_test.go b/op-node/rollup/derive/engine_queue_test.go index 9359f3ccd8164..d0edea4f1f7e5 100644 --- a/op-node/rollup/derive/engine_queue_test.go +++ b/op-node/rollup/derive/engine_queue_test.go @@ -230,8 +230,8 @@ func TestEngineQueue_Finalize(t *testing.T) { prev := &fakeAttributesQueue{} - eq := NewEngineQueue(logger, cfg, eng, metrics, prev) - require.ErrorIs(t, eq.ResetStep(context.Background(), l1F), io.EOF) + eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F) + require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}), io.EOF) require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") require.Equal(t, refB, eq.Progress().Origin, "Expecting to be set back derivation L1 progress to B") diff --git a/op-node/rollup/derive/l1_retrieval.go b/op-node/rollup/derive/l1_retrieval.go index dda4be6434bac..1fd4acf8cb54b 100644 --- a/op-node/rollup/derive/l1_retrieval.go +++ b/op-node/rollup/derive/l1_retrieval.go @@ -25,7 +25,7 @@ type L1Retrieval struct { datas DataIter } -var _ PullStage = (*L1Retrieval)(nil) +var _ ResetableStage = (*L1Retrieval)(nil) func NewL1Retrieval(log log.Logger, dataSrc DataAvailabilitySource, prev NextBlockProvider) *L1Retrieval { return &L1Retrieval{ diff --git a/op-node/rollup/derive/l1_traversal.go b/op-node/rollup/derive/l1_traversal.go index db19eed5a94db..8ad65873e9ee8 100644 --- a/op-node/rollup/derive/l1_traversal.go +++ b/op-node/rollup/derive/l1_traversal.go @@ -24,7 +24,7 @@ type L1Traversal struct { log log.Logger } -var _ PullStage = (*L1Traversal)(nil) +var _ ResetableStage = (*L1Traversal)(nil) func NewL1Traversal(log log.Logger, l1Blocks L1BlockRefByNumberFetcher) *L1Traversal { return &L1Traversal{ diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index bc8fe63d78009..d9c1c661d7062 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -24,36 +24,11 @@ type L1Fetcher interface { L1TransactionFetcher } -type StageProgress interface { - Progress() Progress -} - -type PullStage interface { +type ResetableStage interface { // Reset resets a pull stage. `base` refers to the L1 Block Reference to reset to. - // TODO: Return L1 Block reference Reset(ctx context.Context, base eth.L1BlockRef) error } -type Stage interface { - StageProgress - - // Step tries to progress the state. - // The outer stage progress informs the step what to do. - // - // If the stage: - // - returns EOF: the stage will be skipped - // - returns another error: the stage will make the pipeline error. - // - returns nil: the stage will be repeated next Step - Step(ctx context.Context, outer Progress) error - - // ResetStep prepares the state for usage in regular steps. - // Similar to Step(ctx) it returns: - // - EOF if the next stage should be reset - // - error if the reset should start all over again - // - nil if the reset should continue resetting this stage. - ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error -} - type EngineQueueStage interface { Finalized() eth.L2BlockRef UnsafeL2Head() eth.L2BlockRef @@ -64,6 +39,7 @@ type EngineQueueStage interface { Finalize(l1Origin eth.BlockID) AddSafeAttributes(attributes *eth.PayloadAttributes) AddUnsafePayload(payload *eth.ExecutionPayload) + Step(context.Context) error } // DerivationPipeline is updated with new L1 data, and the Step() function can be iterated on to keep the L2 Engine in sync. @@ -74,19 +50,12 @@ type DerivationPipeline struct { // Index of the stage that is currently being reset. // >= len(stages) if no additional resetting is required - resetting int - pullResetIdx int + resetting int + stages []ResetableStage - // Index of the stage that is currently being processed. - active int - - // stages in execution order. A stage Step that: - stages []Stage - - pullStages []PullStage - traversal *L1Traversal - - eng EngineQueueStage + // Special stages to keep track of + traversal *L1Traversal + eng EngineQueueStage metrics Metrics } @@ -103,29 +72,28 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch batchQueue := NewBatchQueue(log, cfg, chInReader) attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, batchQueue) - // Push stages (that act like pull stages b/c we push from the innermost stages prior to the outermost stages) - eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue) + // Step stages + eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue, l1Fetcher) - stages := []Stage{eng} - pullStages := []PullStage{attributesQueue, batchQueue, chInReader, bank, l1Src, l1Traversal} + // Reset from engine queue then up from L1 Traversal. The stages do not talk to each other during + // the reset, but after the engine queue, this is the order in which the stages could talk to each other. + // Note: The engine queue stage is the only reset that can fail. + stages := []ResetableStage{eng, l1Traversal, l1Src, bank, chInReader, batchQueue, attributesQueue} return &DerivationPipeline{ - log: log, - cfg: cfg, - l1Fetcher: l1Fetcher, - resetting: 0, - active: 0, - stages: stages, - pullStages: pullStages, - eng: eng, - metrics: metrics, - traversal: l1Traversal, + log: log, + cfg: cfg, + l1Fetcher: l1Fetcher, + resetting: 0, + stages: stages, + eng: eng, + metrics: metrics, + traversal: l1Traversal, } } func (dp *DerivationPipeline) Reset() { dp.resetting = 0 - dp.pullResetIdx = 0 } func (dp *DerivationPipeline) Progress() Progress { @@ -169,8 +137,8 @@ func (dp *DerivationPipeline) Step(ctx context.Context) error { // if any stages need to be reset, do that first. if dp.resetting < len(dp.stages) { - if err := dp.stages[dp.resetting].ResetStep(ctx, dp.l1Fetcher); err == io.EOF { - dp.log.Debug("reset of stage completed", "stage", dp.resetting, "origin", dp.stages[dp.resetting].Progress().Origin) + if err := dp.stages[dp.resetting].Reset(ctx, dp.eng.Progress().Origin); err == io.EOF { + dp.log.Debug("reset of stage completed", "stage", dp.resetting, "origin", dp.eng.Progress().Origin) dp.resetting += 1 return nil } else if err != nil { @@ -179,37 +147,14 @@ func (dp *DerivationPipeline) Step(ctx context.Context) error { return nil } } - // Then reset the pull based stages - if dp.pullResetIdx < len(dp.pullStages) { - // Use the last stage's progress as the one to pull from - inner := dp.stages[len(dp.stages)-1].Progress() - - // Do the reset - if err := dp.pullStages[dp.pullResetIdx].Reset(ctx, inner.Origin); err == io.EOF { - // dp.log.Debug("reset of stage completed", "stage", dp.pullResetIdx, "origin", dp.pullStages[dp.pullResetIdx].Progress().Origin) - dp.pullResetIdx += 1 - return nil - } else if err != nil { - return fmt.Errorf("stage %d failed resetting: %w", dp.pullResetIdx, err) - } else { - return nil - } - } - // Lastly advance the stages - for i, stage := range dp.stages { - var outer Progress - if i+1 < len(dp.stages) { - outer = dp.stages[i+1].Progress() - } - if err := stage.Step(ctx, outer); err == io.EOF { - continue - } else if err != nil { - return fmt.Errorf("stage %d failed: %w", i, err) - } else { - return nil - } + // Now step the engine queue. It will pull earlier data as needed. + if err := dp.eng.Step(ctx); err == io.EOF { + // If every stage has returned io.EOF, try to advance the L1 Origin + return dp.traversal.AdvanceL1Block(ctx) + } else if err != nil { + return fmt.Errorf("engine stage failed: %w", err) + } else { + return nil } - // If every stage has returned io.EOF, try to advance the L1 Origin - return dp.traversal.AdvanceL1Block(ctx) } diff --git a/op-node/rollup/derive/pipeline_test.go b/op-node/rollup/derive/pipeline_test.go index 6c6e4d9b756b0..926518fe29127 100644 --- a/op-node/rollup/derive/pipeline_test.go +++ b/op-node/rollup/derive/pipeline_test.go @@ -5,8 +5,6 @@ import ( "io" "testing" - "github.com/stretchr/testify/mock" - "github.com/ethereum-optimism/optimism/op-node/testutils" ) @@ -14,17 +12,6 @@ var _ Engine = (*testutils.MockEngine)(nil) var _ L1Fetcher = (*testutils.MockL1Source)(nil) -type MockOriginStage struct { - mock.Mock - progress Progress -} - -func (m *MockOriginStage) Progress() Progress { - return m.progress -} - -var _ StageProgress = (*MockOriginStage)(nil) - // RepeatResetStep is a test util that will repeat the ResetStep function until an error. // If the step runs too many times, it will fail the test. func RepeatResetStep(t *testing.T, step func(ctx context.Context, l1Fetcher L1Fetcher) error, l1Fetcher L1Fetcher, max int) error {