diff --git a/.circleci/config.yml b/.circleci/config.yml index 742cfc39714ab..340f108fd8cdf 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -1981,8 +1981,8 @@ workflows: - contracts-bedrock-build - go-tests: name: go-tests-short - no_output_timeout: 19m - test_timeout: 20m + no_output_timeout: 29m + test_timeout: 30m requires: - contracts-bedrock-build - cannon-prestate-quick diff --git a/op-e2e/actions/helpers/l2_verifier.go b/op-e2e/actions/helpers/l2_verifier.go index f5eb7872adb6e..63b774795fe0a 100644 --- a/op-e2e/actions/helpers/l2_verifier.go +++ b/op-e2e/actions/helpers/l2_verifier.go @@ -147,8 +147,14 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, metrics := &testutils.TestDerivationMetrics{} ec := engine.NewEngineController(ctx, eng, log, opnodemetrics.NoopMetrics, cfg, syncCfg, sys.Register("engine-controller", nil, opts)) - sys.Register("engine-reset", - engine.NewEngineResetDeriver(ctx, log, cfg, l1, eng, syncCfg), opts) + if mm, ok := interopSys.(*indexing.IndexingMode); ok { + mm.SetForceResetNotifier(ec) + } + + engineResetDeriver := engine.NewEngineResetDeriver(ctx, log, cfg, l1, eng, syncCfg) + sys.Register("engine-reset", engineResetDeriver, opts) + // TODO(#17061): Refactor dependency cycles + engineResetDeriver.SetEngController(ec) clSync := clsync.NewCLSync(log, cfg, metrics, ec) sys.Register("cl-sync", clSync, opts) @@ -163,10 +169,13 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, attrHandler := attributes.NewAttributesHandler(log, cfg, ctx, eng, ec) sys.Register("attributes-handler", attrHandler, opts) + ec.SetAttributesResetter(attrHandler) indexingMode := interopSys != nil pipeline := derive.NewDerivationPipeline(log, cfg, depSet, l1, blobsSrc, altDASrc, eng, metrics, indexingMode) - sys.Register("pipeline", derive.NewPipelineDeriver(ctx, pipeline), opts) + pipelineDeriver := derive.NewPipelineDeriver(ctx, pipeline) + sys.Register("pipeline", pipelineDeriver, opts) + ec.SetPipelineResetter(pipelineDeriver) testActionEmitter := sys.Register("test-action", nil, opts) diff --git a/op-node/node/node.go b/op-node/node/node.go index 0e09fca50afee..5d7caadfa62df 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -462,6 +462,14 @@ func (n *OpNode) initL2(ctx context.Context, cfg *config.Config) error { n.l2Driver = driver.NewDriver(n.eventSys, n.eventDrain, &cfg.Driver, &cfg.Rollup, cfg.DependencySet, n.l2Source, n.l1Source, n.beacon, n, n, n.log, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, altDA, indexingMode) + + // Wire up IndexingMode to engine controller for direct force reset notifications + if n.interopSys != nil { + if indexingMode, ok := n.interopSys.(*indexing.IndexingMode); ok { + indexingMode.SetForceResetNotifier(n.l2Driver.SyncDeriver.Engine) + } + } + return nil } diff --git a/op-node/rollup/attributes/attributes.go b/op-node/rollup/attributes/attributes.go index 4b08d67ad37be..45519eb04de8f 100644 --- a/op-node/rollup/attributes/attributes.go +++ b/op-node/rollup/attributes/attributes.go @@ -69,6 +69,17 @@ func (eq *AttributesHandler) AttachEmitter(em event.Emitter) { eq.emitter = em } +func (eq *AttributesHandler) forceResetLocked() { + eq.sentAttributes = false + eq.attributes = nil +} + +func (eq *AttributesHandler) ForceReset(ctx context.Context, localUnsafe, crossUnsafe, localSafe, crossSafe, finalized eth.L2BlockRef) { + eq.mu.Lock() + defer eq.mu.Unlock() + eq.forceResetLocked() +} + func (eq *AttributesHandler) OnEvent(ctx context.Context, ev event.Event) bool { // Events may be concurrent in the future. Prevent unsafe concurrent modifications to the attributes. eq.mu.Lock() @@ -83,9 +94,8 @@ func (eq *AttributesHandler) OnEvent(ctx context.Context, ev event.Event) bool { eq.emitter.Emit(ctx, derive.ConfirmReceivedAttributesEvent{}) // to make sure we have a pre-state signal to process the attributes from eq.emitter.Emit(ctx, engine.PendingSafeRequestEvent{}) - case rollup.ResetEvent, rollup.ForceResetEvent: - eq.sentAttributes = false - eq.attributes = nil + case rollup.ResetEvent: + eq.forceResetLocked() case rollup.EngineTemporaryErrorEvent: eq.sentAttributes = false case engine.InvalidPayloadAttributesEvent: diff --git a/op-node/rollup/derive/deriver.go b/op-node/rollup/derive/deriver.go index a76802b31a3fd..042b7a9e4a7e3 100644 --- a/op-node/rollup/derive/deriver.go +++ b/op-node/rollup/derive/deriver.go @@ -121,10 +121,12 @@ func (d *PipelineDeriver) AttachEmitter(em event.Emitter) { d.emitter = em } +func (d *PipelineDeriver) ResetPipeline() { + d.pipeline.Reset() +} + func (d *PipelineDeriver) OnEvent(ctx context.Context, ev event.Event) bool { switch x := ev.(type) { - case rollup.ForceResetEvent: - d.pipeline.Reset() case PipelineStepEvent: // Don't generate attributes if there are already attributes in-flight if d.needAttributesConfirmation { diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index a593dd1bc94cf..d8f718e1347a7 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -138,7 +138,8 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, depSet Depe // DerivationReady returns true if the derivation pipeline is ready to be used. // When it's being reset its state is inconsistent, and should not be used externally. func (dp *DerivationPipeline) DerivationReady() bool { - return dp.engineIsReset && dp.resetting > 0 + // Ready only when the engine has been confirmed reset and all stages finished resetting + return dp.engineIsReset && dp.resetting >= len(dp.stages) } func (dp *DerivationPipeline) Reset() { diff --git a/op-node/rollup/driver/driver.go b/op-node/rollup/driver/driver.go index a3aeb9bc350df..8d77bb3fec4eb 100644 --- a/op-node/rollup/driver/driver.go +++ b/op-node/rollup/driver/driver.go @@ -194,8 +194,9 @@ func NewDriver( // TODO(#17115): Refactor dependency cycles ec.SetCrossUpdateHandler(statusTracker) - sys.Register("engine-reset", - engine.NewEngineResetDeriver(driverCtx, log, cfg, l1, l2, syncCfg)) + engineReset := engine.NewEngineResetDeriver(driverCtx, log, cfg, l1, l2, syncCfg) + engineReset.SetEngController(ec) + sys.Register("engine-reset", engineReset) clSync := clsync.NewCLSync(log, cfg, metrics, ec) // alt-sync still uses cl-sync state to determine what to sync to sys.Register("cl-sync", clSync) @@ -213,8 +214,12 @@ func NewDriver( derivationPipeline := derive.NewDerivationPipeline(log, cfg, depSet, verifConfDepth, l1Blobs, altDA, l2, metrics, indexingMode) - sys.Register("pipeline", - derive.NewPipelineDeriver(driverCtx, derivationPipeline)) + pipelineDeriver := derive.NewPipelineDeriver(driverCtx, derivationPipeline) + sys.Register("pipeline", pipelineDeriver) + + // Connect components that need force reset notifications to the engine controller + ec.SetAttributesResetter(attrHandler) + ec.SetPipelineResetter(pipelineDeriver) schedDeriv := NewStepSchedulingDeriver(log) sys.Register("step-scheduler", schedDeriv) @@ -248,6 +253,10 @@ func NewDriver( sequencerConfDepth := confdepth.NewConfDepth(driverCfg.SequencerConfDepth, statusTracker.L1Head, l1) findL1Origin := sequencing.NewL1OriginSelector(driverCtx, log, cfg, sequencerConfDepth) sys.Register("origin-selector", findL1Origin) + + // Connect origin selector to the engine controller for force reset notifications + ec.SetOriginSelectorResetter(findL1Origin) + sequencer = sequencing.NewSequencer(driverCtx, log, cfg, attrBuilder, findL1Origin, sequencerStateListener, sequencerConductor, asyncGossiper, metrics, ec) sys.Register("sequencer", sequencer) diff --git a/op-node/rollup/engine/engine_controller.go b/op-node/rollup/engine/engine_controller.go index bcfa8f3d696b1..aa7185655260c 100644 --- a/op-node/rollup/engine/engine_controller.go +++ b/op-node/rollup/engine/engine_controller.go @@ -50,6 +50,18 @@ type SyncDeriver interface { OnELSyncStarted() } +type AttributesForceResetter interface { + ForceReset(ctx context.Context, localUnsafe, crossUnsafe, localSafe, crossSafe, finalized eth.L2BlockRef) +} + +type PipelineForceResetter interface { + ResetPipeline() +} + +type OriginSelectorForceResetter interface { + ResetOrigins() +} + // CrossUpdateHandler handles both cross-unsafe and cross-safe L2 head changes. // Nil check required because op-program omits this handler. type CrossUpdateHandler interface { @@ -110,6 +122,11 @@ type EngineController struct { // Embed SyncDeriver into EngineController after initializing SyncDeriver SyncDeriver SyncDeriver + // Components that need to be notified during force reset + attributesResetter AttributesForceResetter + pipelineResetter PipelineForceResetter + originSelectorResetter OriginSelectorForceResetter + // Handler for cross-unsafe and cross-safe updates crossUpdateHandler CrossUpdateHandler } @@ -166,8 +183,6 @@ func (e *EngineController) BackupUnsafeL2Head() eth.L2BlockRef { return e.backupUnsafeHead } -// RequestForkchoiceUpdate implements attributes.EngineController. -// It reads the current heads under a read lock and emits a ForkchoiceUpdateEvent. func (e *EngineController) RequestForkchoiceUpdate(ctx context.Context) { e.mu.RLock() unsafe := e.UnsafeL2Head() @@ -667,29 +682,6 @@ func (d *EngineController) OnEvent(ctx context.Context, ev event.Event) bool { } else { d.log.Info("successfully processed payload", "ref", ref, "txs", len(x.Envelope.ExecutionPayload.Transactions)) } - case rollup.ForceResetEvent: - ForceEngineReset(d, x) - - // Time to apply the changes to the underlying engine - d.TryUpdateEngine(ctx) - - v := EngineResetConfirmedEvent{ - LocalUnsafe: d.UnsafeL2Head(), - CrossUnsafe: d.CrossUnsafeL2Head(), - LocalSafe: d.LocalSafeL2Head(), - CrossSafe: d.SafeL2Head(), - Finalized: d.Finalized(), - } - // We do not emit the original event values, since those might not be set (optional attributes). - d.emitter.Emit(ctx, v) - d.log.Info("Reset of Engine is completed", - "local_unsafe", v.LocalUnsafe, - "cross_unsafe", v.CrossUnsafe, - "local_safe", v.LocalSafe, - "cross_safe", v.CrossSafe, - "finalized", v.Finalized, - ) - case UnsafeUpdateEvent: // pre-interop everything that is local-unsafe is also immediately cross-unsafe. if !d.rollupCfg.IsInterop(x.Ref.Time) { @@ -796,3 +788,59 @@ func (e *EngineController) TryUpdateUnsafe(ctx context.Context, ref eth.L2BlockR e.SetUnsafeHead(ref) e.emitter.Emit(ctx, UnsafeUpdateEvent{Ref: ref}) } + +// SetAttributesResetter sets the attributes component that needs force reset notifications +func (e *EngineController) SetAttributesResetter(resetter AttributesForceResetter) { + e.attributesResetter = resetter +} + +// SetPipelineResetter sets the pipeline component that needs force reset notifications +func (e *EngineController) SetPipelineResetter(resetter PipelineForceResetter) { + e.pipelineResetter = resetter +} + +// SetOriginSelectorResetter sets the origin selector component that needs force reset notifications +func (e *EngineController) SetOriginSelectorResetter(resetter OriginSelectorForceResetter) { + e.originSelectorResetter = resetter +} + +// ForceReset performs a forced reset to the specified block references +func (e *EngineController) ForceReset(ctx context.Context, localUnsafe, crossUnsafe, localSafe, crossSafe, finalized eth.L2BlockRef) { + // Reset other components before resetting the engine + if e.attributesResetter != nil { + e.attributesResetter.ForceReset(ctx, localUnsafe, crossUnsafe, localSafe, crossSafe, finalized) + } + if e.pipelineResetter != nil { + e.pipelineResetter.ResetPipeline() + } + // originSelectorResetter is only present when sequencing is enabled + if e.originSelectorResetter != nil { + e.originSelectorResetter.ResetOrigins() + } + + ForceEngineReset(e, localUnsafe, crossUnsafe, localSafe, crossSafe, finalized) + + if e.pipelineResetter != nil { + e.emitter.Emit(ctx, derive.ConfirmPipelineResetEvent{}) + } + + // Time to apply the changes to the underlying engine + e.TryUpdateEngine(ctx) + + v := EngineResetConfirmedEvent{ + LocalUnsafe: e.UnsafeL2Head(), + CrossUnsafe: e.CrossUnsafeL2Head(), + LocalSafe: e.LocalSafeL2Head(), + CrossSafe: e.SafeL2Head(), + Finalized: e.Finalized(), + } + // We do not emit the original event values, since those might not be set (optional attributes). + e.emitter.Emit(ctx, v) + e.log.Info("Reset of Engine is completed", + "local_unsafe", v.LocalUnsafe, + "cross_unsafe", v.CrossUnsafe, + "local_safe", v.LocalSafe, + "cross_safe", v.CrossSafe, + "finalized", v.Finalized, + ) +} diff --git a/op-node/rollup/engine/engine_reset.go b/op-node/rollup/engine/engine_reset.go index 9fcfe25f57c81..7a1e327737c5d 100644 --- a/op-node/rollup/engine/engine_reset.go +++ b/op-node/rollup/engine/engine_reset.go @@ -31,6 +31,8 @@ type EngineResetDeriver struct { syncCfg *sync.Config emitter event.Emitter + + engController *EngineController } func NewEngineResetDeriver(ctx context.Context, log log.Logger, cfg *rollup.Config, @@ -45,6 +47,10 @@ func NewEngineResetDeriver(ctx context.Context, log log.Logger, cfg *rollup.Conf } } +func (d *EngineResetDeriver) SetEngController(engController *EngineController) { + d.engController = engController +} + func (d *EngineResetDeriver) AttachEmitter(em event.Emitter) { d.emitter = em } @@ -59,13 +65,7 @@ func (d *EngineResetDeriver) OnEvent(ctx context.Context, ev event.Event) bool { }) return true } - d.emitter.Emit(ctx, rollup.ForceResetEvent{ - LocalUnsafe: result.Unsafe, - CrossUnsafe: result.Unsafe, - LocalSafe: result.Safe, - CrossSafe: result.Safe, - Finalized: result.Finalized, - }) + d.engController.ForceReset(ctx, result.Unsafe, result.Unsafe, result.Safe, result.Safe, result.Finalized) default: return false } diff --git a/op-node/rollup/engine/events.go b/op-node/rollup/engine/events.go index 1bac262ce70b1..aa984978d94ad 100644 --- a/op-node/rollup/engine/events.go +++ b/op-node/rollup/engine/events.go @@ -3,7 +3,6 @@ package engine import ( "github.com/ethereum/go-ethereum/common" - "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-service/eth" ) @@ -162,21 +161,21 @@ type ResetEngineControl interface { SetPendingSafeL2Head(eth.L2BlockRef) } -func ForceEngineReset(ec ResetEngineControl, x rollup.ForceResetEvent) { - ec.SetUnsafeHead(x.LocalUnsafe) +func ForceEngineReset(ec ResetEngineControl, localUnsafe, crossUnsafe, localSafe, crossSafe, finalized eth.L2BlockRef) { + ec.SetUnsafeHead(localUnsafe) // cross-safe is fine to revert back, it does not affect engine logic, just sync-status - ec.SetCrossUnsafeHead(x.CrossUnsafe) + ec.SetCrossUnsafeHead(crossUnsafe) // derivation continues at local-safe point - ec.SetLocalSafeHead(x.LocalSafe) - ec.SetPendingSafeL2Head(x.LocalSafe) + ec.SetLocalSafeHead(localSafe) + ec.SetPendingSafeL2Head(localSafe) // "safe" in RPC terms is cross-safe - ec.SetSafeHead(x.CrossSafe) + ec.SetSafeHead(crossSafe) // finalized head - ec.SetFinalizedHead(x.Finalized) + ec.SetFinalizedHead(finalized) ec.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false) } diff --git a/op-node/rollup/engine/payload_success.go b/op-node/rollup/engine/payload_success.go index 61bce0745d559..3c4dce977e0ab 100644 --- a/op-node/rollup/engine/payload_success.go +++ b/op-node/rollup/engine/payload_success.go @@ -4,7 +4,6 @@ import ( "context" "time" - "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-service/eth" ) @@ -29,13 +28,7 @@ func (eq *EngineController) onPayloadSuccess(ctx context.Context, ev PayloadSucc eq.log.Warn("Successfully built replacement block, resetting chain to continue now", "replacement", ev.Ref) // Change the engine state to make the replacement block the cross-safe head of the chain, // And continue syncing from there. - eq.emitter.Emit(ctx, rollup.ForceResetEvent{ - LocalUnsafe: ev.Ref, - CrossUnsafe: ev.Ref, - LocalSafe: ev.Ref, - CrossSafe: ev.Ref, - Finalized: eq.Finalized(), - }) + eq.ForceReset(ctx, ev.Ref, ev.Ref, ev.Ref, ev.Ref, eq.Finalized()) eq.emitter.Emit(ctx, InteropReplacedBlockEvent{ Envelope: ev.Envelope, Ref: ev.Ref.BlockRef(), diff --git a/op-node/rollup/event.go b/op-node/rollup/event.go index efd5f40cc0347..4e268240fd422 100644 --- a/op-node/rollup/event.go +++ b/op-node/rollup/event.go @@ -1,7 +1,6 @@ package rollup import ( - "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/event" ) @@ -40,17 +39,5 @@ func (ev ResetEvent) String() string { return "reset-event" } -// ForceResetEvent forces a reset to a specific local-unsafe/local-safe/finalized starting point. -// Resets may override local-unsafe, to reset the very end of the chain. -// Resets may override local-safe, since post-interop we need the local-safe block derivation to continue. -// Pre-interop both local and cross values should be set the same. -type ForceResetEvent struct { - LocalUnsafe, CrossUnsafe, LocalSafe, CrossSafe, Finalized eth.L2BlockRef -} - -func (ev ForceResetEvent) String() string { - return "force-reset" -} - // CriticalErrorEvent is an alias for event.CriticalErrorEvent type CriticalErrorEvent = event.CriticalErrorEvent diff --git a/op-node/rollup/interop/indexing/system.go b/op-node/rollup/interop/indexing/system.go index e316fa9ccd424..fc369b1554174 100644 --- a/op-node/rollup/interop/indexing/system.go +++ b/op-node/rollup/interop/indexing/system.go @@ -46,6 +46,10 @@ type L1Source interface { L1BlockRefByNumber(ctx context.Context, num uint64) (eth.L1BlockRef, error) } +type ForceResetNotifier interface { + ForceReset(ctx context.Context, localUnsafe, crossUnsafe, localSafe, crossSafe, finalized eth.L2BlockRef) +} + // IndexingMode makes the op-node managed by an op-supervisor, // by serving sync work and updating the canonical chain based on instructions. type IndexingMode struct { @@ -53,6 +57,8 @@ type IndexingMode struct { emitter event.Emitter + forceResetNotifier ForceResetNotifier + l1 L1Source l2 L2Source @@ -111,6 +117,10 @@ func NewIndexingMode(log log.Logger, cfg *rollup.Config, addr string, port int, return out } +func (m *IndexingMode) SetForceResetNotifier(notifier ForceResetNotifier) { + m.forceResetNotifier = notifier +} + // TestDisableEventDeduplication is a test-only function that disables event deduplication. // It is necessary to make action tests work. func (m *IndexingMode) TestDisableEventDeduplication() { @@ -450,13 +460,7 @@ func (m *IndexingMode) Reset(ctx context.Context, lUnsafe, xUnsafe, lSafe, xSafe return err } - m.emitter.Emit(ctx, rollup.ForceResetEvent{ - LocalUnsafe: lUnsafeRef, - CrossUnsafe: xUnsafeRef, - LocalSafe: lSafeRef, - CrossSafe: xSafeRef, - Finalized: finalizedRef, - }) + m.forceResetNotifier.ForceReset(ctx, lUnsafeRef, xUnsafeRef, lSafeRef, xSafeRef, finalizedRef) return nil } diff --git a/op-node/rollup/sequencing/origin_selector.go b/op-node/rollup/sequencing/origin_selector.go index 938c16affe4d5..255522d46441e 100644 --- a/op-node/rollup/sequencing/origin_selector.go +++ b/op-node/rollup/sequencing/origin_selector.go @@ -54,12 +54,16 @@ func (los *L1OriginSelector) SetRecoverMode(enabled bool) { los.recoverMode.Store(enabled) } +func (los *L1OriginSelector) ResetOrigins() { + los.reset() +} + func (los *L1OriginSelector) OnEvent(ctx context.Context, ev event.Event) bool { switch x := ev.(type) { case engine.ForkchoiceUpdateEvent: los.onForkchoiceUpdate(x.UnsafeL2Head) - case rollup.ResetEvent, rollup.ForceResetEvent: - los.reset() + case rollup.ResetEvent: + los.ResetOrigins() default: return false } diff --git a/op-program/client/driver/driver.go b/op-program/client/driver/driver.go index e6c8eb5da37e1..6751013e37a0d 100644 --- a/op-program/client/driver/driver.go +++ b/op-program/client/driver/driver.go @@ -10,6 +10,7 @@ import ( altda "github.com/ethereum-optimism/optimism/op-alt-da" "github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-node/rollup/attributes" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/engine" "github.com/ethereum-optimism/optimism/op-node/rollup/sync" @@ -45,8 +46,14 @@ func NewDriver(logger log.Logger, cfg *rollup.Config, depSet derive.DependencySe ec := engine.NewEngineController(context.Background(), l2Source, logger, metrics.NoopMetrics, cfg, &sync.Config{SyncMode: sync.CLSync}, d) syncCfg := &sync.Config{SyncMode: sync.CLSync} + + attrHandler := attributes.NewAttributesHandler(logger, cfg, context.Background(), l2Source, ec) + ec.SetAttributesResetter(attrHandler) + ec.SetPipelineResetter(pipelineDeriver) + engResetDeriv := engine.NewEngineResetDeriver(context.Background(), logger, cfg, l1Source, l2Source, syncCfg) engResetDeriv.AttachEmitter(d) + engResetDeriv.SetEngController(ec) prog := &ProgramDeriver{ logger: logger,