From 2c0ec21f029d37b47e5d027c2284c286be5a466e Mon Sep 17 00:00:00 2001 From: Joshua Gutow Date: Tue, 27 Sep 2022 15:56:11 -0700 Subject: [PATCH 1/2] op-node: Clean up pipeline Now that the engine queue is the only step stage, it is easy to consolidate different loops inside the derivation pipeline. --- op-node/rollup/derive/batch_queue.go | 6 -- op-node/rollup/derive/channel_bank.go | 2 +- op-node/rollup/derive/channel_in_reader.go | 2 +- op-node/rollup/derive/engine_queue.go | 18 ++-- op-node/rollup/derive/engine_queue_test.go | 4 +- op-node/rollup/derive/l1_retrieval.go | 2 +- op-node/rollup/derive/l1_traversal.go | 2 +- op-node/rollup/derive/pipeline.go | 117 ++++++--------------- op-node/rollup/derive/pipeline_test.go | 13 --- 9 files changed, 47 insertions(+), 119 deletions(-) diff --git a/op-node/rollup/derive/batch_queue.go b/op-node/rollup/derive/batch_queue.go index 5e5dd5496ff62..d5c683254c691 100644 --- a/op-node/rollup/derive/batch_queue.go +++ b/op-node/rollup/derive/batch_queue.go @@ -26,12 +26,6 @@ import ( // It is internally responsible for making sure that batches with L1 inclusions block outside it's // working range are not considered or pruned. -type BatchQueueOutput interface { - StageProgress - AddBatch(batch *BatchData) - SafeL2Head() eth.L2BlockRef -} - type NextBatchProvider interface { Origin() eth.L1BlockRef NextBatch(ctx context.Context) (*BatchData, error) diff --git a/op-node/rollup/derive/channel_bank.go b/op-node/rollup/derive/channel_bank.go index 9a4581051cddc..dfcd048d65b63 100644 --- a/op-node/rollup/derive/channel_bank.go +++ b/op-node/rollup/derive/channel_bank.go @@ -38,7 +38,7 @@ type ChannelBank struct { fetcher L1Fetcher } -var _ PullStage = (*ChannelBank)(nil) +var _ ResetableStage = (*ChannelBank)(nil) // NewChannelBank creates a ChannelBank, which should be Reset(origin) before use. func NewChannelBank(log log.Logger, cfg *rollup.Config, prev NextDataProvider, fetcher L1Fetcher) *ChannelBank { diff --git a/op-node/rollup/derive/channel_in_reader.go b/op-node/rollup/derive/channel_in_reader.go index 9865dacd5aa7f..07156b9997a97 100644 --- a/op-node/rollup/derive/channel_in_reader.go +++ b/op-node/rollup/derive/channel_in_reader.go @@ -22,7 +22,7 @@ type ChannelInReader struct { prev *ChannelBank } -var _ PullStage = (*ChannelInReader)(nil) +var _ ResetableStage = (*ChannelInReader)(nil) // NewChannelInReader creates a ChannelInReader, which should be Reset(origin) before use. func NewChannelInReader(log log.Logger, prev *ChannelBank) *ChannelInReader { diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index b585619dde36b..b528517041ba9 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -80,11 +80,12 @@ type EngineQueue struct { progress Progress // only used for pipeline resets - metrics Metrics + metrics Metrics + l1Fetcher L1Fetcher } // NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use. -func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider) *EngineQueue { +func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher) *EngineQueue { return &EngineQueue{ log: log, cfg: cfg, @@ -95,7 +96,8 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M MaxSize: maxUnsafePayloadsMemory, SizeFn: payloadMemSize, }, - prev: prev, + prev: prev, + l1Fetcher: l1Fetcher, } } @@ -151,7 +153,7 @@ func (eq *EngineQueue) LastL2Time() uint64 { return uint64(eq.safeAttributes[len(eq.safeAttributes)-1].Timestamp) } -func (eq *EngineQueue) Step(ctx context.Context, _ Progress) error { +func (eq *EngineQueue) Step(ctx context.Context) error { if len(eq.safeAttributes) > 0 { return eq.tryNextSafeAttributes(ctx) } @@ -402,13 +404,13 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { // ResetStep Walks the L2 chain backwards until it finds an L2 block whose L1 origin is canonical. // The unsafe head is set to the head of the L2 chain, unless the existing safe head is not canonical. -func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { - result, err := sync.FindL2Heads(ctx, eq.cfg, l1Fetcher, eq.engine) +func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef) error { + result, err := sync.FindL2Heads(ctx, eq.cfg, eq.l1Fetcher, eq.engine) if err != nil { return NewTemporaryError(fmt.Errorf("failed to find the L2 Heads to start from: %w", err)) } finalized, safe, unsafe := result.Finalized, result.Safe, result.Unsafe - l1Origin, err := l1Fetcher.L1BlockRefByHash(ctx, safe.L1Origin.Hash) + l1Origin, err := eq.l1Fetcher.L1BlockRefByHash(ctx, safe.L1Origin.Hash) if err != nil { return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %v; err: %w", safe.L1Origin, err)) } @@ -421,7 +423,7 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error if l1Origin.Number < eq.cfg.ChannelTimeout { pipelineNumber = 0 } - pipelineOrigin, err := l1Fetcher.L1BlockRefByNumber(ctx, pipelineNumber) + pipelineOrigin, err := eq.l1Fetcher.L1BlockRefByNumber(ctx, pipelineNumber) if err != nil { return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %v; err: %w", pipelineNumber, err)) } diff --git a/op-node/rollup/derive/engine_queue_test.go b/op-node/rollup/derive/engine_queue_test.go index 9359f3ccd8164..d0edea4f1f7e5 100644 --- a/op-node/rollup/derive/engine_queue_test.go +++ b/op-node/rollup/derive/engine_queue_test.go @@ -230,8 +230,8 @@ func TestEngineQueue_Finalize(t *testing.T) { prev := &fakeAttributesQueue{} - eq := NewEngineQueue(logger, cfg, eng, metrics, prev) - require.ErrorIs(t, eq.ResetStep(context.Background(), l1F), io.EOF) + eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F) + require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}), io.EOF) require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") require.Equal(t, refB, eq.Progress().Origin, "Expecting to be set back derivation L1 progress to B") diff --git a/op-node/rollup/derive/l1_retrieval.go b/op-node/rollup/derive/l1_retrieval.go index dda4be6434bac..1fd4acf8cb54b 100644 --- a/op-node/rollup/derive/l1_retrieval.go +++ b/op-node/rollup/derive/l1_retrieval.go @@ -25,7 +25,7 @@ type L1Retrieval struct { datas DataIter } -var _ PullStage = (*L1Retrieval)(nil) +var _ ResetableStage = (*L1Retrieval)(nil) func NewL1Retrieval(log log.Logger, dataSrc DataAvailabilitySource, prev NextBlockProvider) *L1Retrieval { return &L1Retrieval{ diff --git a/op-node/rollup/derive/l1_traversal.go b/op-node/rollup/derive/l1_traversal.go index db19eed5a94db..8ad65873e9ee8 100644 --- a/op-node/rollup/derive/l1_traversal.go +++ b/op-node/rollup/derive/l1_traversal.go @@ -24,7 +24,7 @@ type L1Traversal struct { log log.Logger } -var _ PullStage = (*L1Traversal)(nil) +var _ ResetableStage = (*L1Traversal)(nil) func NewL1Traversal(log log.Logger, l1Blocks L1BlockRefByNumberFetcher) *L1Traversal { return &L1Traversal{ diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index bc8fe63d78009..d9c1c661d7062 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -24,36 +24,11 @@ type L1Fetcher interface { L1TransactionFetcher } -type StageProgress interface { - Progress() Progress -} - -type PullStage interface { +type ResetableStage interface { // Reset resets a pull stage. `base` refers to the L1 Block Reference to reset to. - // TODO: Return L1 Block reference Reset(ctx context.Context, base eth.L1BlockRef) error } -type Stage interface { - StageProgress - - // Step tries to progress the state. - // The outer stage progress informs the step what to do. - // - // If the stage: - // - returns EOF: the stage will be skipped - // - returns another error: the stage will make the pipeline error. - // - returns nil: the stage will be repeated next Step - Step(ctx context.Context, outer Progress) error - - // ResetStep prepares the state for usage in regular steps. - // Similar to Step(ctx) it returns: - // - EOF if the next stage should be reset - // - error if the reset should start all over again - // - nil if the reset should continue resetting this stage. - ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error -} - type EngineQueueStage interface { Finalized() eth.L2BlockRef UnsafeL2Head() eth.L2BlockRef @@ -64,6 +39,7 @@ type EngineQueueStage interface { Finalize(l1Origin eth.BlockID) AddSafeAttributes(attributes *eth.PayloadAttributes) AddUnsafePayload(payload *eth.ExecutionPayload) + Step(context.Context) error } // DerivationPipeline is updated with new L1 data, and the Step() function can be iterated on to keep the L2 Engine in sync. @@ -74,19 +50,12 @@ type DerivationPipeline struct { // Index of the stage that is currently being reset. // >= len(stages) if no additional resetting is required - resetting int - pullResetIdx int + resetting int + stages []ResetableStage - // Index of the stage that is currently being processed. - active int - - // stages in execution order. A stage Step that: - stages []Stage - - pullStages []PullStage - traversal *L1Traversal - - eng EngineQueueStage + // Special stages to keep track of + traversal *L1Traversal + eng EngineQueueStage metrics Metrics } @@ -103,29 +72,28 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch batchQueue := NewBatchQueue(log, cfg, chInReader) attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, batchQueue) - // Push stages (that act like pull stages b/c we push from the innermost stages prior to the outermost stages) - eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue) + // Step stages + eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue, l1Fetcher) - stages := []Stage{eng} - pullStages := []PullStage{attributesQueue, batchQueue, chInReader, bank, l1Src, l1Traversal} + // Reset from engine queue then up from L1 Traversal. The stages do not talk to each other during + // the reset, but after the engine queue, this is the order in which the stages could talk to each other. + // Note: The engine queue stage is the only reset that can fail. + stages := []ResetableStage{eng, l1Traversal, l1Src, bank, chInReader, batchQueue, attributesQueue} return &DerivationPipeline{ - log: log, - cfg: cfg, - l1Fetcher: l1Fetcher, - resetting: 0, - active: 0, - stages: stages, - pullStages: pullStages, - eng: eng, - metrics: metrics, - traversal: l1Traversal, + log: log, + cfg: cfg, + l1Fetcher: l1Fetcher, + resetting: 0, + stages: stages, + eng: eng, + metrics: metrics, + traversal: l1Traversal, } } func (dp *DerivationPipeline) Reset() { dp.resetting = 0 - dp.pullResetIdx = 0 } func (dp *DerivationPipeline) Progress() Progress { @@ -169,8 +137,8 @@ func (dp *DerivationPipeline) Step(ctx context.Context) error { // if any stages need to be reset, do that first. if dp.resetting < len(dp.stages) { - if err := dp.stages[dp.resetting].ResetStep(ctx, dp.l1Fetcher); err == io.EOF { - dp.log.Debug("reset of stage completed", "stage", dp.resetting, "origin", dp.stages[dp.resetting].Progress().Origin) + if err := dp.stages[dp.resetting].Reset(ctx, dp.eng.Progress().Origin); err == io.EOF { + dp.log.Debug("reset of stage completed", "stage", dp.resetting, "origin", dp.eng.Progress().Origin) dp.resetting += 1 return nil } else if err != nil { @@ -179,37 +147,14 @@ func (dp *DerivationPipeline) Step(ctx context.Context) error { return nil } } - // Then reset the pull based stages - if dp.pullResetIdx < len(dp.pullStages) { - // Use the last stage's progress as the one to pull from - inner := dp.stages[len(dp.stages)-1].Progress() - - // Do the reset - if err := dp.pullStages[dp.pullResetIdx].Reset(ctx, inner.Origin); err == io.EOF { - // dp.log.Debug("reset of stage completed", "stage", dp.pullResetIdx, "origin", dp.pullStages[dp.pullResetIdx].Progress().Origin) - dp.pullResetIdx += 1 - return nil - } else if err != nil { - return fmt.Errorf("stage %d failed resetting: %w", dp.pullResetIdx, err) - } else { - return nil - } - } - // Lastly advance the stages - for i, stage := range dp.stages { - var outer Progress - if i+1 < len(dp.stages) { - outer = dp.stages[i+1].Progress() - } - if err := stage.Step(ctx, outer); err == io.EOF { - continue - } else if err != nil { - return fmt.Errorf("stage %d failed: %w", i, err) - } else { - return nil - } + // Now step the engine queue. It will pull earlier data as needed. + if err := dp.eng.Step(ctx); err == io.EOF { + // If every stage has returned io.EOF, try to advance the L1 Origin + return dp.traversal.AdvanceL1Block(ctx) + } else if err != nil { + return fmt.Errorf("engine stage failed: %w", err) + } else { + return nil } - // If every stage has returned io.EOF, try to advance the L1 Origin - return dp.traversal.AdvanceL1Block(ctx) } diff --git a/op-node/rollup/derive/pipeline_test.go b/op-node/rollup/derive/pipeline_test.go index 6c6e4d9b756b0..926518fe29127 100644 --- a/op-node/rollup/derive/pipeline_test.go +++ b/op-node/rollup/derive/pipeline_test.go @@ -5,8 +5,6 @@ import ( "io" "testing" - "github.com/stretchr/testify/mock" - "github.com/ethereum-optimism/optimism/op-node/testutils" ) @@ -14,17 +12,6 @@ var _ Engine = (*testutils.MockEngine)(nil) var _ L1Fetcher = (*testutils.MockL1Source)(nil) -type MockOriginStage struct { - mock.Mock - progress Progress -} - -func (m *MockOriginStage) Progress() Progress { - return m.progress -} - -var _ StageProgress = (*MockOriginStage)(nil) - // RepeatResetStep is a test util that will repeat the ResetStep function until an error. // If the step runs too many times, it will fail the test. func RepeatResetStep(t *testing.T, step func(ctx context.Context, l1Fetcher L1Fetcher) error, l1Fetcher L1Fetcher, max int) error { From 98c1856d911f08db6b2085fcdbb73043784e0700 Mon Sep 17 00:00:00 2001 From: Joshua Gutow Date: Mon, 3 Oct 2022 13:00:02 -0700 Subject: [PATCH 2/2] op-node: Fully remove the progress API It has been partially replaced with the Origin API, but the open/closed distinction no longer exists. --- op-e2e/actions/l2_verifier.go | 2 +- op-node/rollup/derive/engine_queue.go | 10 ++--- op-node/rollup/derive/engine_queue_test.go | 6 +-- op-node/rollup/derive/pipeline.go | 12 +++--- op-node/rollup/derive/pipeline_test.go | 42 +------------------- op-node/rollup/derive/progress.go | 46 ---------------------- op-node/rollup/driver/driver.go | 2 +- op-node/rollup/driver/state.go | 8 ++-- 8 files changed, 20 insertions(+), 108 deletions(-) delete mode 100644 op-node/rollup/derive/progress.go diff --git a/op-e2e/actions/l2_verifier.go b/op-e2e/actions/l2_verifier.go index ee9f6031c357f..49700c44af549 100644 --- a/op-e2e/actions/l2_verifier.go +++ b/op-e2e/actions/l2_verifier.go @@ -47,7 +47,7 @@ func NewL2Verifier(log log.Logger, l1 derive.L1Fetcher, eng derive.Engine, cfg * func (s *L2Verifier) SyncStatus() *eth.SyncStatus { return ð.SyncStatus{ - CurrentL1: s.derivation.Progress().Origin, + CurrentL1: s.derivation.Origin(), HeadL1: s.l1Head, SafeL1: s.l1Safe, FinalizedL1: s.l1Finalized, diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index b528517041ba9..ae4cac54aa530 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -78,7 +78,7 @@ type EngineQueue struct { engine Engine prev NextAttributesProvider - progress Progress // only used for pipeline resets + origin eth.L1BlockRef // only used for pipeline resets metrics Metrics l1Fetcher L1Fetcher @@ -101,8 +101,8 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M } } -func (eq *EngineQueue) Progress() Progress { - return eq.progress +func (eq *EngineQueue) Origin() eth.L1BlockRef { + return eq.origin } func (eq *EngineQueue) SetUnsafeHead(head eth.L2BlockRef) { @@ -433,9 +433,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef) error { eq.finalized = finalized eq.finalityData = eq.finalityData[:0] // note: we do not clear the unsafe payloadds queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads. - eq.progress = Progress{ - Origin: pipelineOrigin, - } + eq.origin = pipelineOrigin eq.metrics.RecordL2Ref("l2_finalized", finalized) eq.metrics.RecordL2Ref("l2_safe", safe) eq.metrics.RecordL2Ref("l2_unsafe", unsafe) diff --git a/op-node/rollup/derive/engine_queue_test.go b/op-node/rollup/derive/engine_queue_test.go index d0edea4f1f7e5..bc09bad092b4d 100644 --- a/op-node/rollup/derive/engine_queue_test.go +++ b/op-node/rollup/derive/engine_queue_test.go @@ -234,17 +234,17 @@ func TestEngineQueue_Finalize(t *testing.T) { require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}), io.EOF) require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") - require.Equal(t, refB, eq.Progress().Origin, "Expecting to be set back derivation L1 progress to B") + require.Equal(t, refB, eq.Origin(), "Expecting to be set back derivation L1 progress to B") require.Equal(t, refA1, eq.Finalized(), "A1 is recognized as finalized before we run any steps") // now say C1 was included in D and became the new safe head - eq.progress.Origin = refD + eq.origin = refD prev.origin = refD eq.safeHead = refC1 eq.postProcessSafeL2() // now say D0 was included in E and became the new safe head - eq.progress.Origin = refE + eq.origin = refE prev.origin = refE eq.safeHead = refD0 eq.postProcessSafeL2() diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index d9c1c661d7062..2d34c8d2ddb43 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -33,7 +33,7 @@ type EngineQueueStage interface { Finalized() eth.L2BlockRef UnsafeL2Head() eth.L2BlockRef SafeL2Head() eth.L2BlockRef - Progress() Progress + Origin() eth.L1BlockRef SetUnsafeHead(head eth.L2BlockRef) Finalize(l1Origin eth.BlockID) @@ -96,8 +96,8 @@ func (dp *DerivationPipeline) Reset() { dp.resetting = 0 } -func (dp *DerivationPipeline) Progress() Progress { - return dp.eng.Progress() +func (dp *DerivationPipeline) Origin() eth.L1BlockRef { + return dp.eng.Origin() } func (dp *DerivationPipeline) Finalize(l1Origin eth.BlockID) { @@ -133,12 +133,12 @@ func (dp *DerivationPipeline) AddUnsafePayload(payload *eth.ExecutionPayload) { // An error is expected when the underlying source closes. // When Step returns nil, it should be called again, to continue the derivation process. func (dp *DerivationPipeline) Step(ctx context.Context) error { - defer dp.metrics.RecordL1Ref("l1_derived", dp.Progress().Origin) + defer dp.metrics.RecordL1Ref("l1_derived", dp.Origin()) // if any stages need to be reset, do that first. if dp.resetting < len(dp.stages) { - if err := dp.stages[dp.resetting].Reset(ctx, dp.eng.Progress().Origin); err == io.EOF { - dp.log.Debug("reset of stage completed", "stage", dp.resetting, "origin", dp.eng.Progress().Origin) + if err := dp.stages[dp.resetting].Reset(ctx, dp.eng.Origin()); err == io.EOF { + dp.log.Debug("reset of stage completed", "stage", dp.resetting, "origin", dp.eng.Origin()) dp.resetting += 1 return nil } else if err != nil { diff --git a/op-node/rollup/derive/pipeline_test.go b/op-node/rollup/derive/pipeline_test.go index 926518fe29127..995b479a695dc 100644 --- a/op-node/rollup/derive/pipeline_test.go +++ b/op-node/rollup/derive/pipeline_test.go @@ -1,49 +1,9 @@ package derive -import ( - "context" - "io" - "testing" - - "github.com/ethereum-optimism/optimism/op-node/testutils" -) +import "github.com/ethereum-optimism/optimism/op-node/testutils" var _ Engine = (*testutils.MockEngine)(nil) var _ L1Fetcher = (*testutils.MockL1Source)(nil) -// RepeatResetStep is a test util that will repeat the ResetStep function until an error. -// If the step runs too many times, it will fail the test. -func RepeatResetStep(t *testing.T, step func(ctx context.Context, l1Fetcher L1Fetcher) error, l1Fetcher L1Fetcher, max int) error { - ctx := context.Background() - for i := 0; i < max; i++ { - err := step(ctx, l1Fetcher) - if err == io.EOF { - return nil - } - if err != nil { - return err - } - } - t.Fatal("ran out of steps") - return nil -} - -// RepeatStep is a test util that will repeat the Step function until an error. -// If the step runs too many times, it will fail the test. -func RepeatStep(t *testing.T, step func(ctx context.Context, outer Progress) error, outer Progress, max int) error { - ctx := context.Background() - for i := 0; i < max; i++ { - err := step(ctx, outer) - if err == io.EOF { - return nil - } - if err != nil { - return err - } - } - t.Fatal("ran out of steps") - return nil -} - var _ Metrics = (*testutils.TestDerivationMetrics)(nil) diff --git a/op-node/rollup/derive/progress.go b/op-node/rollup/derive/progress.go deleted file mode 100644 index b6915b998548b..0000000000000 --- a/op-node/rollup/derive/progress.go +++ /dev/null @@ -1,46 +0,0 @@ -package derive - -import ( - "fmt" - - "github.com/ethereum-optimism/optimism/op-node/eth" -) - -// Progress represents the progress of a derivation stage: -// the input L1 block that is being processed, and whether it's fully processed yet. -type Progress struct { - Origin eth.L1BlockRef - // Closed means that the Current has no more data that the stage may need. - Closed bool -} - -func (pr *Progress) Update(outer Progress) (changed bool, err error) { - if outer.Origin.Number < pr.Origin.Number { - return false, nil - } - if pr.Closed { - if outer.Closed { - if pr.Origin.ID() != outer.Origin.ID() { - return true, NewResetError(fmt.Errorf("outer stage changed origin from %s to %s without opening it", pr.Origin, outer.Origin)) - } - return false, nil - } else { - if pr.Origin.Hash != outer.Origin.ParentHash { - return true, NewResetError(fmt.Errorf("detected internal pipeline reorg of L1 origin data from %s to %s", pr.Origin, outer.Origin)) - } - pr.Origin = outer.Origin - pr.Closed = false - return true, nil - } - } else { - if pr.Origin.ID() != outer.Origin.ID() { - return true, NewResetError(fmt.Errorf("outer stage changed origin from %s to %s before closing it", pr.Origin, outer.Origin)) - } - if outer.Closed { - pr.Closed = true - return true, nil - } else { - return false, nil - } - } -} diff --git a/op-node/rollup/driver/driver.go b/op-node/rollup/driver/driver.go index ad21915d3e9e7..5ee4d176d2706 100644 --- a/op-node/rollup/driver/driver.go +++ b/op-node/rollup/driver/driver.go @@ -59,7 +59,7 @@ type DerivationPipeline interface { Finalized() eth.L2BlockRef SafeL2Head() eth.L2BlockRef UnsafeL2Head() eth.L2BlockRef - Progress() derive.Progress + Origin() eth.L1BlockRef } type outputInterface interface { diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go index 3538cb6d8344f..1abd2a2a6f105 100644 --- a/op-node/rollup/driver/state.go +++ b/op-node/rollup/driver/state.go @@ -416,13 +416,13 @@ func (s *state) eventLoop() { case <-stepReqCh: s.metrics.SetDerivationIdle(false) s.idleDerivation = false - s.log.Debug("Derivation process step", "onto_origin", s.derivation.Progress().Origin, "onto_closed", s.derivation.Progress().Closed, "attempts", stepAttempts) + s.log.Debug("Derivation process step", "onto_origin", s.derivation.Origin(), "attempts", stepAttempts) stepCtx, cancel := context.WithTimeout(ctx, time.Second*10) // TODO pick a timeout for executing a single step err := s.derivation.Step(stepCtx) cancel() stepAttempts += 1 // count as attempt by default. We reset to 0 if we are making healthy progress. if err == io.EOF { - s.log.Debug("Derivation process went idle", "progress", s.derivation.Progress().Origin) + s.log.Debug("Derivation process went idle", "progress", s.derivation.Origin()) s.idleDerivation = true stepAttempts = 0 s.metrics.SetDerivationIdle(true) @@ -454,7 +454,7 @@ func (s *state) eventLoop() { } case respCh := <-s.syncStatusReq: respCh <- eth.SyncStatus{ - CurrentL1: s.derivation.Progress().Origin, + CurrentL1: s.derivation.Origin(), HeadL1: s.l1Head, SafeL1: s.l1Safe, FinalizedL1: s.l1Finalized, @@ -520,7 +520,7 @@ func (s *state) snapshot(event string) { s.snapshotLog.Info("Rollup State Snapshot", "event", event, "l1Head", deferJSONString{s.l1Head}, - "l1Current", deferJSONString{s.derivation.Progress().Origin}, + "l1Current", deferJSONString{s.derivation.Origin()}, "l2Head", deferJSONString{s.derivation.UnsafeL2Head()}, "l2Safe", deferJSONString{s.derivation.SafeL2Head()}, "l2FinalizedHead", deferJSONString{s.derivation.Finalized()})