diff --git a/op-e2e/actions/helpers/l2_verifier.go b/op-e2e/actions/helpers/l2_verifier.go index 60bc7e585d438..de2a28afc39f8 100644 --- a/op-e2e/actions/helpers/l2_verifier.go +++ b/op-e2e/actions/helpers/l2_verifier.go @@ -148,7 +148,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, ec := engine.NewEngineController(ctx, eng, log, opnodemetrics.NoopMetrics, cfg, syncCfg, sys.Register("engine-controller", nil, opts)) if mm, ok := interopSys.(*indexing.IndexingMode); ok { - mm.SetForceResetNotifier(ec) + mm.SetEngineController(ec) } engineResetDeriver := engine.NewEngineResetDeriver(ctx, log, cfg, l1, eng, syncCfg) diff --git a/op-e2e/actions/sync/sync_test.go b/op-e2e/actions/sync/sync_test.go index 7508a99f2ae38..9705777647a6d 100644 --- a/op-e2e/actions/sync/sync_test.go +++ b/op-e2e/actions/sync/sync_test.go @@ -1052,7 +1052,6 @@ func TestSpanBatchAtomicity_Consolidation(gt *testing.T) { require.Equal(t, verifier.L2Safe().Number, uint64(0)) } else { // Make sure we do the post-processing of what safety updates might happen - // Digest events until EngDeriver implicitly consumes PromoteSafeEvent verifier.ActL2PipelineFull(t) // Once the span batch is fully processed, the safe head must advance to the end of span batch. require.Equal(t, verifier.L2Safe().Number, targetHeadNumber) diff --git a/op-node/node/node.go b/op-node/node/node.go index 5d7caadfa62df..f8671346e9989 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -463,10 +463,10 @@ func (n *OpNode) initL2(ctx context.Context, cfg *config.Config) error { n.l2Driver = driver.NewDriver(n.eventSys, n.eventDrain, &cfg.Driver, &cfg.Rollup, cfg.DependencySet, n.l2Source, n.l1Source, n.beacon, n, n, n.log, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, altDA, indexingMode) - // Wire up IndexingMode to engine controller for direct force reset notifications + // Wire up IndexingMode to engine controller for direct procedure call if n.interopSys != nil { if indexingMode, ok := n.interopSys.(*indexing.IndexingMode); ok { - indexingMode.SetForceResetNotifier(n.l2Driver.SyncDeriver.Engine) + indexingMode.SetEngineController(n.l2Driver.SyncDeriver.Engine) } } diff --git a/op-node/rollup/engine/engine_controller.go b/op-node/rollup/engine/engine_controller.go index aa7185655260c..e34178571c2c5 100644 --- a/op-node/rollup/engine/engine_controller.go +++ b/op-node/rollup/engine/engine_controller.go @@ -650,6 +650,7 @@ func (d *EngineController) OnEvent(ctx context.Context, ev event.Event) bool { defer d.mu.Unlock() // TODO(#16917) Remove Event System Refactor Comments // PromoteUnsafeEvent, PromotePendingSafeEvent, PromoteLocalSafeEvent fan out is updated to procedural + // PromoteSafeEvent fan out is updated to procedural PromoteSafe method call switch x := ev.(type) { case ProcessUnsafePayloadEvent: ref, err := derive.PayloadToBlockRef(d.rollupCfg, x.Envelope.ExecutionPayload) @@ -699,23 +700,10 @@ func (d *EngineController) OnEvent(ctx context.Context, ev event.Event) bool { }) case LocalSafeUpdateEvent: - // pre-interop everything that is local-unsafe is also immediately cross-unsafe. + // pre-interop everything that is local-safe is also immediately cross-safe. if !d.rollupCfg.IsInterop(x.Ref.Time) { - d.emitter.Emit(ctx, PromoteSafeEvent(x)) - } - case PromoteSafeEvent: - d.log.Debug("Updating safe", "safe", x.Ref, "unsafe", d.UnsafeL2Head()) - d.SetSafeHead(x.Ref) - // Finalizer can pick up this safe cross-block now - d.emitter.Emit(ctx, SafeDerivedEvent{Safe: x.Ref, Source: x.Source}) - d.onSafeUpdate(ctx, d.SafeL2Head(), d.LocalSafeL2Head()) - if x.Ref.Number > d.crossUnsafeHead.Number { - d.log.Debug("Cross Unsafe Head is stale, updating to match cross safe", "cross_unsafe", d.crossUnsafeHead, "cross_safe", x.Ref) - d.SetCrossUnsafeHead(x.Ref) - d.onUnsafeUpdate(ctx, x.Ref, d.UnsafeL2Head()) + d.PromoteSafe(ctx, x.Ref, x.Source) } - // Try to apply the forkchoice changes - d.TryUpdateEngine(ctx) case PromoteFinalizedEvent: if x.Ref.Number < d.Finalized().Number { d.log.Error("Cannot rewind finality,", "ref", x.Ref, "finalized", d.Finalized()) @@ -789,6 +777,21 @@ func (e *EngineController) TryUpdateUnsafe(ctx context.Context, ref eth.L2BlockR e.emitter.Emit(ctx, UnsafeUpdateEvent{Ref: ref}) } +func (e *EngineController) PromoteSafe(ctx context.Context, ref eth.L2BlockRef, source eth.L1BlockRef) { + e.log.Debug("Updating safe", "safe", ref, "unsafe", e.UnsafeL2Head()) + e.SetSafeHead(ref) + // Finalizer can pick up this safe cross-block now + e.emitter.Emit(ctx, SafeDerivedEvent{Safe: ref, Source: source}) + e.onSafeUpdate(ctx, e.SafeL2Head(), e.LocalSafeL2Head()) + if ref.Number > e.crossUnsafeHead.Number { + e.log.Debug("Cross Unsafe Head is stale, updating to match cross safe", "cross_unsafe", e.crossUnsafeHead, "cross_safe", ref) + e.SetCrossUnsafeHead(ref) + e.onUnsafeUpdate(ctx, ref, e.UnsafeL2Head()) + } + // Try to apply the forkchoice changes + e.TryUpdateEngine(ctx) +} + // SetAttributesResetter sets the attributes component that needs force reset notifications func (e *EngineController) SetAttributesResetter(resetter AttributesForceResetter) { e.attributesResetter = resetter diff --git a/op-node/rollup/engine/events.go b/op-node/rollup/engine/events.go index aa984978d94ad..04dd405f83fe9 100644 --- a/op-node/rollup/engine/events.go +++ b/op-node/rollup/engine/events.go @@ -65,18 +65,8 @@ func (ev LocalSafeUpdateEvent) String() string { return "local-safe-update" } -// PromoteSafeEvent signals that a block can be promoted to cross-safe. -type PromoteSafeEvent struct { - Ref eth.L2BlockRef - Source eth.L1BlockRef -} - -func (ev PromoteSafeEvent) String() string { - return "promote-safe" -} - // SafeDerivedEvent signals that a block was determined to be safe, and derived from the given L1 block. -// This is signaled upon successful processing of PromoteSafeEvent. +// This is signaled upon procedural call of PromoteSafe method type SafeDerivedEvent struct { Safe eth.L2BlockRef Source eth.L1BlockRef diff --git a/op-node/rollup/interop/indexing/system.go b/op-node/rollup/interop/indexing/system.go index fc369b1554174..4c6dc489e22a1 100644 --- a/op-node/rollup/interop/indexing/system.go +++ b/op-node/rollup/interop/indexing/system.go @@ -45,9 +45,9 @@ type L1Source interface { L1BlockRefByHash(ctx context.Context, hash common.Hash) (eth.L1BlockRef, error) L1BlockRefByNumber(ctx context.Context, num uint64) (eth.L1BlockRef, error) } - -type ForceResetNotifier interface { +type EngineController interface { ForceReset(ctx context.Context, localUnsafe, crossUnsafe, localSafe, crossSafe, finalized eth.L2BlockRef) + PromoteSafe(ctx context.Context, ref eth.L2BlockRef, source eth.L1BlockRef) } // IndexingMode makes the op-node managed by an op-supervisor, @@ -57,8 +57,6 @@ type IndexingMode struct { emitter event.Emitter - forceResetNotifier ForceResetNotifier - l1 L1Source l2 L2Source @@ -79,6 +77,8 @@ type IndexingMode struct { srv *rpc.Server jwtSecret eth.Bytes32 + + engineController EngineController } func NewIndexingMode(log log.Logger, cfg *rollup.Config, addr string, port int, jwtSecret eth.Bytes32, l1 L1Source, l2 L2Source, m opmetrics.RPCMetricer) *IndexingMode { @@ -117,8 +117,8 @@ func NewIndexingMode(log log.Logger, cfg *rollup.Config, addr string, port int, return out } -func (m *IndexingMode) SetForceResetNotifier(notifier ForceResetNotifier) { - m.forceResetNotifier = notifier +func (m *IndexingMode) SetEngineController(engineController EngineController) { + m.engineController = engineController } // TestDisableEventDeduplication is a test-only function that disables event deduplication. @@ -298,12 +298,7 @@ func (m *IndexingMode) UpdateCrossSafe(ctx context.Context, derived eth.BlockID, if err != nil { return fmt.Errorf("failed to get L1BlockRef: %w", err) } - m.emitter.Emit(m.ctx, engine.PromoteSafeEvent{ - Ref: l2Ref, - Source: l1Ref, - }) - // We return early: there is no point waiting for the cross-safe engine-update synchronously. - // All error-feedback comes to the supervisor by aborting derivation tasks with an error. + m.engineController.PromoteSafe(ctx, l2Ref, l1Ref) return nil } @@ -460,7 +455,7 @@ func (m *IndexingMode) Reset(ctx context.Context, lUnsafe, xUnsafe, lSafe, xSafe return err } - m.forceResetNotifier.ForceReset(ctx, lUnsafeRef, xUnsafeRef, lSafeRef, xSafeRef, finalizedRef) + m.engineController.ForceReset(ctx, lUnsafeRef, xUnsafeRef, lSafeRef, xSafeRef, finalizedRef) return nil }