Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1981,8 +1981,8 @@ workflows:
- contracts-bedrock-build
- go-tests:
name: go-tests-short
no_output_timeout: 19m
test_timeout: 20m
no_output_timeout: 29m
test_timeout: 30m
requires:
- contracts-bedrock-build
- cannon-prestate-quick
Expand Down
15 changes: 12 additions & 3 deletions op-e2e/actions/helpers/l2_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,14 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher,
metrics := &testutils.TestDerivationMetrics{}
ec := engine.NewEngineController(ctx, eng, log, opnodemetrics.NoopMetrics, cfg, syncCfg, sys.Register("engine-controller", nil, opts))

sys.Register("engine-reset",
engine.NewEngineResetDeriver(ctx, log, cfg, l1, eng, syncCfg), opts)
if mm, ok := interopSys.(*indexing.IndexingMode); ok {
mm.SetForceResetNotifier(ec)
}

engineResetDeriver := engine.NewEngineResetDeriver(ctx, log, cfg, l1, eng, syncCfg)
sys.Register("engine-reset", engineResetDeriver, opts)
// TODO(#17061): Refactor dependency cycles
engineResetDeriver.SetEngController(ec)

clSync := clsync.NewCLSync(log, cfg, metrics, ec)
sys.Register("cl-sync", clSync, opts)
Expand All @@ -163,10 +169,13 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher,

attrHandler := attributes.NewAttributesHandler(log, cfg, ctx, eng, ec)
sys.Register("attributes-handler", attrHandler, opts)
ec.SetAttributesResetter(attrHandler)

indexingMode := interopSys != nil
pipeline := derive.NewDerivationPipeline(log, cfg, depSet, l1, blobsSrc, altDASrc, eng, metrics, indexingMode)
sys.Register("pipeline", derive.NewPipelineDeriver(ctx, pipeline), opts)
pipelineDeriver := derive.NewPipelineDeriver(ctx, pipeline)
sys.Register("pipeline", pipelineDeriver, opts)
ec.SetPipelineResetter(pipelineDeriver)

testActionEmitter := sys.Register("test-action", nil, opts)

Expand Down
8 changes: 8 additions & 0 deletions op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,14 @@ func (n *OpNode) initL2(ctx context.Context, cfg *config.Config) error {

n.l2Driver = driver.NewDriver(n.eventSys, n.eventDrain, &cfg.Driver, &cfg.Rollup, cfg.DependencySet, n.l2Source, n.l1Source,
n.beacon, n, n, n.log, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, altDA, indexingMode)

// Wire up IndexingMode to engine controller for direct force reset notifications
if n.interopSys != nil {
if indexingMode, ok := n.interopSys.(*indexing.IndexingMode); ok {
indexingMode.SetForceResetNotifier(n.l2Driver.SyncDeriver.Engine)
}
}

return nil
}

Expand Down
16 changes: 13 additions & 3 deletions op-node/rollup/attributes/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ func (eq *AttributesHandler) AttachEmitter(em event.Emitter) {
eq.emitter = em
}

func (eq *AttributesHandler) forceResetLocked() {
eq.sentAttributes = false
eq.attributes = nil
}

func (eq *AttributesHandler) ForceReset(ctx context.Context, localUnsafe, crossUnsafe, localSafe, crossSafe, finalized eth.L2BlockRef) {
eq.mu.Lock()
defer eq.mu.Unlock()
eq.forceResetLocked()
}

func (eq *AttributesHandler) OnEvent(ctx context.Context, ev event.Event) bool {
// Events may be concurrent in the future. Prevent unsafe concurrent modifications to the attributes.
eq.mu.Lock()
Expand All @@ -83,9 +94,8 @@ func (eq *AttributesHandler) OnEvent(ctx context.Context, ev event.Event) bool {
eq.emitter.Emit(ctx, derive.ConfirmReceivedAttributesEvent{})
// to make sure we have a pre-state signal to process the attributes from
eq.emitter.Emit(ctx, engine.PendingSafeRequestEvent{})
case rollup.ResetEvent, rollup.ForceResetEvent:
eq.sentAttributes = false
eq.attributes = nil
case rollup.ResetEvent:
eq.forceResetLocked()
case rollup.EngineTemporaryErrorEvent:
eq.sentAttributes = false
case engine.InvalidPayloadAttributesEvent:
Expand Down
6 changes: 4 additions & 2 deletions op-node/rollup/derive/deriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,12 @@ func (d *PipelineDeriver) AttachEmitter(em event.Emitter) {
d.emitter = em
}

func (d *PipelineDeriver) ResetPipeline() {
d.pipeline.Reset()
}

func (d *PipelineDeriver) OnEvent(ctx context.Context, ev event.Event) bool {
switch x := ev.(type) {
case rollup.ForceResetEvent:
d.pipeline.Reset()
case PipelineStepEvent:
// Don't generate attributes if there are already attributes in-flight
if d.needAttributesConfirmation {
Expand Down
3 changes: 2 additions & 1 deletion op-node/rollup/derive/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, depSet Depe
// DerivationReady returns true if the derivation pipeline is ready to be used.
// When it's being reset its state is inconsistent, and should not be used externally.
func (dp *DerivationPipeline) DerivationReady() bool {
return dp.engineIsReset && dp.resetting > 0
// Ready only when the engine has been confirmed reset and all stages finished resetting
return dp.engineIsReset && dp.resetting >= len(dp.stages)
}

func (dp *DerivationPipeline) Reset() {
Expand Down
17 changes: 13 additions & 4 deletions op-node/rollup/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,9 @@ func NewDriver(
// TODO(#17115): Refactor dependency cycles
ec.SetCrossUpdateHandler(statusTracker)

sys.Register("engine-reset",
engine.NewEngineResetDeriver(driverCtx, log, cfg, l1, l2, syncCfg))
engineReset := engine.NewEngineResetDeriver(driverCtx, log, cfg, l1, l2, syncCfg)
engineReset.SetEngController(ec)
sys.Register("engine-reset", engineReset)

clSync := clsync.NewCLSync(log, cfg, metrics, ec) // alt-sync still uses cl-sync state to determine what to sync to
sys.Register("cl-sync", clSync)
Expand All @@ -213,8 +214,12 @@ func NewDriver(

derivationPipeline := derive.NewDerivationPipeline(log, cfg, depSet, verifConfDepth, l1Blobs, altDA, l2, metrics, indexingMode)

sys.Register("pipeline",
derive.NewPipelineDeriver(driverCtx, derivationPipeline))
pipelineDeriver := derive.NewPipelineDeriver(driverCtx, derivationPipeline)
sys.Register("pipeline", pipelineDeriver)

// Connect components that need force reset notifications to the engine controller
ec.SetAttributesResetter(attrHandler)
ec.SetPipelineResetter(pipelineDeriver)

schedDeriv := NewStepSchedulingDeriver(log)
sys.Register("step-scheduler", schedDeriv)
Expand Down Expand Up @@ -248,6 +253,10 @@ func NewDriver(
sequencerConfDepth := confdepth.NewConfDepth(driverCfg.SequencerConfDepth, statusTracker.L1Head, l1)
findL1Origin := sequencing.NewL1OriginSelector(driverCtx, log, cfg, sequencerConfDepth)
sys.Register("origin-selector", findL1Origin)

// Connect origin selector to the engine controller for force reset notifications
ec.SetOriginSelectorResetter(findL1Origin)

sequencer = sequencing.NewSequencer(driverCtx, log, cfg, attrBuilder, findL1Origin,
sequencerStateListener, sequencerConductor, asyncGossiper, metrics, ec)
sys.Register("sequencer", sequencer)
Expand Down
98 changes: 73 additions & 25 deletions op-node/rollup/engine/engine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ type SyncDeriver interface {
OnELSyncStarted()
}

type AttributesForceResetter interface {
ForceReset(ctx context.Context, localUnsafe, crossUnsafe, localSafe, crossSafe, finalized eth.L2BlockRef)
}

type PipelineForceResetter interface {
ResetPipeline()
}

type OriginSelectorForceResetter interface {
ResetOrigins()
}

// CrossUpdateHandler handles both cross-unsafe and cross-safe L2 head changes.
// Nil check required because op-program omits this handler.
type CrossUpdateHandler interface {
Expand Down Expand Up @@ -110,6 +122,11 @@ type EngineController struct {
// Embed SyncDeriver into EngineController after initializing SyncDeriver
SyncDeriver SyncDeriver

// Components that need to be notified during force reset
attributesResetter AttributesForceResetter
pipelineResetter PipelineForceResetter
originSelectorResetter OriginSelectorForceResetter

// Handler for cross-unsafe and cross-safe updates
crossUpdateHandler CrossUpdateHandler
}
Expand Down Expand Up @@ -166,8 +183,6 @@ func (e *EngineController) BackupUnsafeL2Head() eth.L2BlockRef {
return e.backupUnsafeHead
}

// RequestForkchoiceUpdate implements attributes.EngineController.
// It reads the current heads under a read lock and emits a ForkchoiceUpdateEvent.
func (e *EngineController) RequestForkchoiceUpdate(ctx context.Context) {
e.mu.RLock()
unsafe := e.UnsafeL2Head()
Expand Down Expand Up @@ -667,29 +682,6 @@ func (d *EngineController) OnEvent(ctx context.Context, ev event.Event) bool {
} else {
d.log.Info("successfully processed payload", "ref", ref, "txs", len(x.Envelope.ExecutionPayload.Transactions))
}
case rollup.ForceResetEvent:
ForceEngineReset(d, x)

// Time to apply the changes to the underlying engine
d.TryUpdateEngine(ctx)

v := EngineResetConfirmedEvent{
LocalUnsafe: d.UnsafeL2Head(),
CrossUnsafe: d.CrossUnsafeL2Head(),
LocalSafe: d.LocalSafeL2Head(),
CrossSafe: d.SafeL2Head(),
Finalized: d.Finalized(),
}
// We do not emit the original event values, since those might not be set (optional attributes).
d.emitter.Emit(ctx, v)
d.log.Info("Reset of Engine is completed",
"local_unsafe", v.LocalUnsafe,
"cross_unsafe", v.CrossUnsafe,
"local_safe", v.LocalSafe,
"cross_safe", v.CrossSafe,
"finalized", v.Finalized,
)

case UnsafeUpdateEvent:
// pre-interop everything that is local-unsafe is also immediately cross-unsafe.
if !d.rollupCfg.IsInterop(x.Ref.Time) {
Expand Down Expand Up @@ -796,3 +788,59 @@ func (e *EngineController) TryUpdateUnsafe(ctx context.Context, ref eth.L2BlockR
e.SetUnsafeHead(ref)
e.emitter.Emit(ctx, UnsafeUpdateEvent{Ref: ref})
}

// SetAttributesResetter sets the attributes component that needs force reset notifications
func (e *EngineController) SetAttributesResetter(resetter AttributesForceResetter) {
e.attributesResetter = resetter
}

// SetPipelineResetter sets the pipeline component that needs force reset notifications
func (e *EngineController) SetPipelineResetter(resetter PipelineForceResetter) {
e.pipelineResetter = resetter
}

// SetOriginSelectorResetter sets the origin selector component that needs force reset notifications
func (e *EngineController) SetOriginSelectorResetter(resetter OriginSelectorForceResetter) {
e.originSelectorResetter = resetter
}

// ForceReset performs a forced reset to the specified block references
func (e *EngineController) ForceReset(ctx context.Context, localUnsafe, crossUnsafe, localSafe, crossSafe, finalized eth.L2BlockRef) {
// Reset other components before resetting the engine
if e.attributesResetter != nil {
e.attributesResetter.ForceReset(ctx, localUnsafe, crossUnsafe, localSafe, crossSafe, finalized)
}
if e.pipelineResetter != nil {
e.pipelineResetter.ResetPipeline()
}
// originSelectorResetter is only present when sequencing is enabled
if e.originSelectorResetter != nil {
e.originSelectorResetter.ResetOrigins()
}

ForceEngineReset(e, localUnsafe, crossUnsafe, localSafe, crossSafe, finalized)

if e.pipelineResetter != nil {
e.emitter.Emit(ctx, derive.ConfirmPipelineResetEvent{})
}

// Time to apply the changes to the underlying engine
e.TryUpdateEngine(ctx)

v := EngineResetConfirmedEvent{
LocalUnsafe: e.UnsafeL2Head(),
CrossUnsafe: e.CrossUnsafeL2Head(),
LocalSafe: e.LocalSafeL2Head(),
CrossSafe: e.SafeL2Head(),
Finalized: e.Finalized(),
}
// We do not emit the original event values, since those might not be set (optional attributes).
e.emitter.Emit(ctx, v)
e.log.Info("Reset of Engine is completed",
"local_unsafe", v.LocalUnsafe,
"cross_unsafe", v.CrossUnsafe,
"local_safe", v.LocalSafe,
"cross_safe", v.CrossSafe,
"finalized", v.Finalized,
)
}
14 changes: 7 additions & 7 deletions op-node/rollup/engine/engine_reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type EngineResetDeriver struct {
syncCfg *sync.Config

emitter event.Emitter

engController *EngineController
}

func NewEngineResetDeriver(ctx context.Context, log log.Logger, cfg *rollup.Config,
Expand All @@ -45,6 +47,10 @@ func NewEngineResetDeriver(ctx context.Context, log log.Logger, cfg *rollup.Conf
}
}

func (d *EngineResetDeriver) SetEngController(engController *EngineController) {
d.engController = engController
}

func (d *EngineResetDeriver) AttachEmitter(em event.Emitter) {
d.emitter = em
}
Expand All @@ -59,13 +65,7 @@ func (d *EngineResetDeriver) OnEvent(ctx context.Context, ev event.Event) bool {
})
return true
}
d.emitter.Emit(ctx, rollup.ForceResetEvent{
LocalUnsafe: result.Unsafe,
CrossUnsafe: result.Unsafe,
LocalSafe: result.Safe,
CrossSafe: result.Safe,
Finalized: result.Finalized,
})
d.engController.ForceReset(ctx, result.Unsafe, result.Unsafe, result.Safe, result.Safe, result.Finalized)
default:
return false
}
Expand Down
15 changes: 7 additions & 8 deletions op-node/rollup/engine/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package engine
import (
"github.com/ethereum/go-ethereum/common"

"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
Expand Down Expand Up @@ -162,21 +161,21 @@ type ResetEngineControl interface {
SetPendingSafeL2Head(eth.L2BlockRef)
}

func ForceEngineReset(ec ResetEngineControl, x rollup.ForceResetEvent) {
ec.SetUnsafeHead(x.LocalUnsafe)
func ForceEngineReset(ec ResetEngineControl, localUnsafe, crossUnsafe, localSafe, crossSafe, finalized eth.L2BlockRef) {
ec.SetUnsafeHead(localUnsafe)

// cross-safe is fine to revert back, it does not affect engine logic, just sync-status
ec.SetCrossUnsafeHead(x.CrossUnsafe)
ec.SetCrossUnsafeHead(crossUnsafe)

// derivation continues at local-safe point
ec.SetLocalSafeHead(x.LocalSafe)
ec.SetPendingSafeL2Head(x.LocalSafe)
ec.SetLocalSafeHead(localSafe)
ec.SetPendingSafeL2Head(localSafe)

// "safe" in RPC terms is cross-safe
ec.SetSafeHead(x.CrossSafe)
ec.SetSafeHead(crossSafe)

// finalized head
ec.SetFinalizedHead(x.Finalized)
ec.SetFinalizedHead(finalized)

ec.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false)
}
9 changes: 1 addition & 8 deletions op-node/rollup/engine/payload_success.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"time"

"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
)

Expand All @@ -29,13 +28,7 @@ func (eq *EngineController) onPayloadSuccess(ctx context.Context, ev PayloadSucc
eq.log.Warn("Successfully built replacement block, resetting chain to continue now", "replacement", ev.Ref)
// Change the engine state to make the replacement block the cross-safe head of the chain,
// And continue syncing from there.
eq.emitter.Emit(ctx, rollup.ForceResetEvent{
LocalUnsafe: ev.Ref,
CrossUnsafe: ev.Ref,
LocalSafe: ev.Ref,
CrossSafe: ev.Ref,
Finalized: eq.Finalized(),
})
eq.ForceReset(ctx, ev.Ref, ev.Ref, ev.Ref, ev.Ref, eq.Finalized())
eq.emitter.Emit(ctx, InteropReplacedBlockEvent{
Envelope: ev.Envelope,
Ref: ev.Ref.BlockRef(),
Expand Down
Loading