Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion op-e2e/actions/helpers/l2_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher,
ec := engine.NewEngineController(ctx, eng, log, opnodemetrics.NoopMetrics, cfg, syncCfg, sys.Register("engine-controller", nil, opts))

if mm, ok := interopSys.(*indexing.IndexingMode); ok {
mm.SetForceResetNotifier(ec)
mm.SetEngineController(ec)
}

engineResetDeriver := engine.NewEngineResetDeriver(ctx, log, cfg, l1, eng, syncCfg)
Expand Down
1 change: 0 additions & 1 deletion op-e2e/actions/sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1052,7 +1052,6 @@ func TestSpanBatchAtomicity_Consolidation(gt *testing.T) {
require.Equal(t, verifier.L2Safe().Number, uint64(0))
} else {
// Make sure we do the post-processing of what safety updates might happen
// Digest events until EngDeriver implicitly consumes PromoteSafeEvent
verifier.ActL2PipelineFull(t)
// Once the span batch is fully processed, the safe head must advance to the end of span batch.
require.Equal(t, verifier.L2Safe().Number, targetHeadNumber)
Expand Down
4 changes: 2 additions & 2 deletions op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,10 +463,10 @@ func (n *OpNode) initL2(ctx context.Context, cfg *config.Config) error {
n.l2Driver = driver.NewDriver(n.eventSys, n.eventDrain, &cfg.Driver, &cfg.Rollup, cfg.DependencySet, n.l2Source, n.l1Source,
n.beacon, n, n, n.log, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, altDA, indexingMode)

// Wire up IndexingMode to engine controller for direct force reset notifications
// Wire up IndexingMode to engine controller for direct procedure call
if n.interopSys != nil {
if indexingMode, ok := n.interopSys.(*indexing.IndexingMode); ok {
indexingMode.SetForceResetNotifier(n.l2Driver.SyncDeriver.Engine)
indexingMode.SetEngineController(n.l2Driver.SyncDeriver.Engine)
}
}

Expand Down
33 changes: 18 additions & 15 deletions op-node/rollup/engine/engine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ func (d *EngineController) OnEvent(ctx context.Context, ev event.Event) bool {
defer d.mu.Unlock()
// TODO(#16917) Remove Event System Refactor Comments
// PromoteUnsafeEvent, PromotePendingSafeEvent, PromoteLocalSafeEvent fan out is updated to procedural
// PromoteSafeEvent fan out is updated to procedural PromoteSafe method call
switch x := ev.(type) {
case ProcessUnsafePayloadEvent:
ref, err := derive.PayloadToBlockRef(d.rollupCfg, x.Envelope.ExecutionPayload)
Expand Down Expand Up @@ -699,23 +700,10 @@ func (d *EngineController) OnEvent(ctx context.Context, ev event.Event) bool {
})

case LocalSafeUpdateEvent:
// pre-interop everything that is local-unsafe is also immediately cross-unsafe.
// pre-interop everything that is local-safe is also immediately cross-safe.
if !d.rollupCfg.IsInterop(x.Ref.Time) {
d.emitter.Emit(ctx, PromoteSafeEvent(x))
}
case PromoteSafeEvent:
d.log.Debug("Updating safe", "safe", x.Ref, "unsafe", d.UnsafeL2Head())
d.SetSafeHead(x.Ref)
// Finalizer can pick up this safe cross-block now
d.emitter.Emit(ctx, SafeDerivedEvent{Safe: x.Ref, Source: x.Source})
d.onSafeUpdate(ctx, d.SafeL2Head(), d.LocalSafeL2Head())
if x.Ref.Number > d.crossUnsafeHead.Number {
d.log.Debug("Cross Unsafe Head is stale, updating to match cross safe", "cross_unsafe", d.crossUnsafeHead, "cross_safe", x.Ref)
d.SetCrossUnsafeHead(x.Ref)
d.onUnsafeUpdate(ctx, x.Ref, d.UnsafeL2Head())
d.PromoteSafe(ctx, x.Ref, x.Source)
}
// Try to apply the forkchoice changes
d.TryUpdateEngine(ctx)
case PromoteFinalizedEvent:
if x.Ref.Number < d.Finalized().Number {
d.log.Error("Cannot rewind finality,", "ref", x.Ref, "finalized", d.Finalized())
Expand Down Expand Up @@ -789,6 +777,21 @@ func (e *EngineController) TryUpdateUnsafe(ctx context.Context, ref eth.L2BlockR
e.emitter.Emit(ctx, UnsafeUpdateEvent{Ref: ref})
}

func (e *EngineController) PromoteSafe(ctx context.Context, ref eth.L2BlockRef, source eth.L1BlockRef) {
e.log.Debug("Updating safe", "safe", ref, "unsafe", e.UnsafeL2Head())
e.SetSafeHead(ref)
// Finalizer can pick up this safe cross-block now
e.emitter.Emit(ctx, SafeDerivedEvent{Safe: ref, Source: source})
e.onSafeUpdate(ctx, e.SafeL2Head(), e.LocalSafeL2Head())
if ref.Number > e.crossUnsafeHead.Number {
e.log.Debug("Cross Unsafe Head is stale, updating to match cross safe", "cross_unsafe", e.crossUnsafeHead, "cross_safe", ref)
e.SetCrossUnsafeHead(ref)
e.onUnsafeUpdate(ctx, ref, e.UnsafeL2Head())
}
// Try to apply the forkchoice changes
e.TryUpdateEngine(ctx)
}

// SetAttributesResetter sets the attributes component that needs force reset notifications
func (e *EngineController) SetAttributesResetter(resetter AttributesForceResetter) {
e.attributesResetter = resetter
Expand Down
12 changes: 1 addition & 11 deletions op-node/rollup/engine/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,8 @@ func (ev LocalSafeUpdateEvent) String() string {
return "local-safe-update"
}

// PromoteSafeEvent signals that a block can be promoted to cross-safe.
type PromoteSafeEvent struct {
Ref eth.L2BlockRef
Source eth.L1BlockRef
}

func (ev PromoteSafeEvent) String() string {
return "promote-safe"
}

// SafeDerivedEvent signals that a block was determined to be safe, and derived from the given L1 block.
// This is signaled upon successful processing of PromoteSafeEvent.
// This is signaled upon procedural call of PromoteSafe method
type SafeDerivedEvent struct {
Safe eth.L2BlockRef
Source eth.L1BlockRef
Expand Down
21 changes: 8 additions & 13 deletions op-node/rollup/interop/indexing/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ type L1Source interface {
L1BlockRefByHash(ctx context.Context, hash common.Hash) (eth.L1BlockRef, error)
L1BlockRefByNumber(ctx context.Context, num uint64) (eth.L1BlockRef, error)
}

type ForceResetNotifier interface {
type EngineController interface {
ForceReset(ctx context.Context, localUnsafe, crossUnsafe, localSafe, crossSafe, finalized eth.L2BlockRef)
PromoteSafe(ctx context.Context, ref eth.L2BlockRef, source eth.L1BlockRef)
}

// IndexingMode makes the op-node managed by an op-supervisor,
Expand All @@ -57,8 +57,6 @@ type IndexingMode struct {

emitter event.Emitter

forceResetNotifier ForceResetNotifier

l1 L1Source
l2 L2Source

Expand All @@ -79,6 +77,8 @@ type IndexingMode struct {

srv *rpc.Server
jwtSecret eth.Bytes32

engineController EngineController
}

func NewIndexingMode(log log.Logger, cfg *rollup.Config, addr string, port int, jwtSecret eth.Bytes32, l1 L1Source, l2 L2Source, m opmetrics.RPCMetricer) *IndexingMode {
Expand Down Expand Up @@ -117,8 +117,8 @@ func NewIndexingMode(log log.Logger, cfg *rollup.Config, addr string, port int,
return out
}

func (m *IndexingMode) SetForceResetNotifier(notifier ForceResetNotifier) {
m.forceResetNotifier = notifier
func (m *IndexingMode) SetEngineController(engineController EngineController) {
m.engineController = engineController
}

// TestDisableEventDeduplication is a test-only function that disables event deduplication.
Expand Down Expand Up @@ -298,12 +298,7 @@ func (m *IndexingMode) UpdateCrossSafe(ctx context.Context, derived eth.BlockID,
if err != nil {
return fmt.Errorf("failed to get L1BlockRef: %w", err)
}
m.emitter.Emit(m.ctx, engine.PromoteSafeEvent{
Ref: l2Ref,
Source: l1Ref,
})
// We return early: there is no point waiting for the cross-safe engine-update synchronously.
// All error-feedback comes to the supervisor by aborting derivation tasks with an error.
m.engineController.PromoteSafe(ctx, l2Ref, l1Ref)
return nil
}

Expand Down Expand Up @@ -460,7 +455,7 @@ func (m *IndexingMode) Reset(ctx context.Context, lUnsafe, xUnsafe, lSafe, xSafe
return err
}

m.forceResetNotifier.ForceReset(ctx, lUnsafeRef, xUnsafeRef, lSafeRef, xSafeRef, finalizedRef)
m.engineController.ForceReset(ctx, lUnsafeRef, xUnsafeRef, lSafeRef, xSafeRef, finalizedRef)
return nil
}

Expand Down