Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package sync_tester_ext_el
import (
"fmt"
"os"
"runtime"
"testing"

"github.com/ethereum-optimism/optimism/op-devstack/devtest"
Expand Down Expand Up @@ -228,11 +227,6 @@ func setupOrchestrator(gt *testing.T, t devtest.T, blocksToSync uint64) (*sysgo.
Finalized: chainCfg.Genesis.L2.Number,
}),
)
// TODO(#17564): op-node has a suspected race during EL Sync.
// To temporarily mitigate and stabilize tests, restrict runtime
// parallelism to 1 (no true concurrency). This masks the race;
// remove once the underlying issue is fixed.
runtime.GOMAXPROCS(1)
} else {
opt = stack.Combine(opt,
presets.WithSyncTesterELInitialState(eth.FCUState{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"os"
"runtime"
"strconv"
"testing"

Expand Down Expand Up @@ -200,11 +199,6 @@ func setupOrchestrator(gt *testing.T, t devtest.T, blk, targetBlock uint64, l2CL
Finalized: chainCfg.Genesis.L2.Number,
}),
)
// TODO(#17564): op-node has a suspected race during EL Sync.
// To temporarily mitigate and stabilize tests, restrict runtime
// parallelism to 1 (no true concurrency). This masks the race;
// remove once the underlying issue is fixed.
runtime.GOMAXPROCS(1)
} else {
opt = stack.Combine(opt,
presets.WithSyncTesterELInitialState(eth.FCUState{
Expand Down
2 changes: 1 addition & 1 deletion op-node/rollup/engine/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (e *EngineController) CommitBlock(ctx context.Context, signed *opsigner.Sig

e.SetUnsafeHead(ref)
e.emitter.Emit(ctx, UnsafeUpdateEvent{Ref: ref})
if err := e.tryUpdateEngine(ctx); err != nil {
if err := e.tryUpdateEngineInternal(ctx); err != nil {
return fmt.Errorf("failed to update engine forkchoice: %w", err)
}
return nil
Expand Down
121 changes: 87 additions & 34 deletions op-node/rollup/engine/engine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,12 @@ func (e *EngineController) requestForkchoiceUpdate(ctx context.Context) {
}

func (e *EngineController) IsEngineSyncing() bool {
e.mu.Lock()
defer e.mu.Unlock()
return e.isEngineSyncing()
}

func (e *EngineController) isEngineSyncing() bool {
return e.syncStatus == syncStatusWillStartEL ||
e.syncStatus == syncStatusStartedEL ||
e.syncStatus == syncStatusFinishedELButNotFinalized
Expand Down Expand Up @@ -398,13 +404,11 @@ func (e *EngineController) initializeUnknowns(ctx context.Context) error {
return nil
}

// tryUpdateEngine attempts to update the engine with the current forkchoice state of the rollup node,
// this is a no-op if the nodes already agree on the forkchoice state.
func (e *EngineController) tryUpdateEngine(ctx context.Context) error {
func (e *EngineController) tryUpdateEngineInternal(ctx context.Context) error {
if !e.needFCUCall {
return ErrNoFCUNeeded
}
if e.IsEngineSyncing() {
if e.isEngineSyncing() {
e.log.Warn("Attempting to update forkchoice state while EL syncing")
}
if err := e.initializeUnknowns(ctx); err != nil {
Expand Down Expand Up @@ -447,7 +451,31 @@ func (e *EngineController) tryUpdateEngine(ctx context.Context) error {
return nil
}

// tryUpdateEngine attempts to update the engine with the current forkchoice state of the rollup node,
// this is a no-op if the nodes already agree on the forkchoice state.
func (e *EngineController) tryUpdateEngine(ctx context.Context) {
// If we don't need to call FCU, keep going b/c this was a no-op. If we needed to
// perform a network call, then we should yield even if we did not encounter an error.
if err := e.tryUpdateEngineInternal(e.ctx); err != nil && !errors.Is(err, ErrNoFCUNeeded) {
if errors.Is(err, derive.ErrReset) {
e.emitter.Emit(ctx, rollup.ResetEvent{Err: err})
} else if errors.Is(err, derive.ErrTemporary) {
e.emitter.Emit(ctx, rollup.EngineTemporaryErrorEvent{Err: err})
} else {
e.emitter.Emit(ctx, rollup.CriticalErrorEvent{
Err: fmt.Errorf("unexpected tryUpdateEngine error type: %w", err),
})
}
}
}

func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error {
e.mu.Lock()
defer e.mu.Unlock()
return e.insertUnsafePayload(ctx, envelope, ref)
}

func (e *EngineController) insertUnsafePayload(ctx context.Context, envelope *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error {
// Check if there is a finalized head once when doing EL sync. If so, transition to CL sync
if e.syncStatus == syncStatusWillStartEL {
b, err := e.engine.L2BlockRefByLabel(ctx, eth.Finalized)
Expand Down Expand Up @@ -556,7 +584,7 @@ func (e *EngineController) shouldTryBackupUnsafeReorg() bool {
return false
}
// This method must be never called when EL sync. If EL sync is in progress, early return.
if e.IsEngineSyncing() {
if e.isEngineSyncing() {
e.log.Warn("Attempting to unsafe reorg using backupUnsafe while EL syncing")
return false
}
Expand All @@ -568,9 +596,15 @@ func (e *EngineController) shouldTryBackupUnsafeReorg() bool {
return true
}

// TryBackupUnsafeReorg attempts to reorg(restore) unsafe head to backupUnsafeHead.
// If succeeds, update current forkchoice state to the rollup node.
func (e *EngineController) TryBackupUnsafeReorg(ctx context.Context) (bool, error) {
e.mu.Lock()
defer e.mu.Unlock()
return e.tryBackupUnsafeReorg(ctx)
}

// tryBackupUnsafeReorg attempts to reorg(restore) unsafe head to backupUnsafeHead.
// If succeeds, update current forkchoice state to the rollup node.
func (e *EngineController) tryBackupUnsafeReorg(ctx context.Context) (bool, error) {
if !e.shouldTryBackupUnsafeReorg() {
// Do not need to perform FCU.
return false, nil
Expand Down Expand Up @@ -625,24 +659,14 @@ func (e *EngineController) TryBackupUnsafeReorg(ctx context.Context) (bool, erro
}

func (e *EngineController) TryUpdateEngine(ctx context.Context) {
// If we don't need to call FCU, keep going b/c this was a no-op. If we needed to
// perform a network call, then we should yield even if we did not encounter an error.
if err := e.tryUpdateEngine(e.ctx); err != nil && !errors.Is(err, ErrNoFCUNeeded) {
if errors.Is(err, derive.ErrReset) {
e.emitter.Emit(ctx, rollup.ResetEvent{Err: err})
} else if errors.Is(err, derive.ErrTemporary) {
e.emitter.Emit(ctx, rollup.EngineTemporaryErrorEvent{Err: err})
} else {
e.emitter.Emit(ctx, rollup.CriticalErrorEvent{
Err: fmt.Errorf("unexpected tryUpdateEngine error type: %w", err),
})
}
}
e.mu.Lock()
defer e.mu.Unlock()
e.tryUpdateEngine(ctx)
}

// TODO(#16917) Remove Event System Refactor Comments
// OnEvent implements event.Deriver (moved from EngDeriver)
// TryUpdateEngineEvent is replaced with TryUpdateEngine
// TryUpdateEngineEvent is replaced with tryUpdateEngine
func (e *EngineController) OnEvent(ctx context.Context, ev event.Event) bool {
e.mu.Lock()
defer e.mu.Unlock()
Expand All @@ -656,7 +680,7 @@ func (e *EngineController) OnEvent(ctx context.Context, ev event.Event) bool {
e.emitter.Emit(ctx, PromoteCrossUnsafeEvent(x))
}
// Try to apply the forkchoice changes
e.TryUpdateEngine(ctx)
e.tryUpdateEngine(ctx)
case PromoteCrossUnsafeEvent:
e.SetCrossUnsafeHead(x.Ref)
e.onUnsafeUpdate(ctx, x.Ref, e.unsafeHead)
Expand Down Expand Up @@ -693,15 +717,24 @@ func (e *EngineController) OnEvent(ctx context.Context, ev event.Event) bool {
return true
}

func (e *EngineController) RequestPendingSafeUpdate(ctx context.Context) {
e.emitter.Emit(ctx, PendingSafeUpdateEvent{
PendingSafe: e.pendingSafeHead,
Unsafe: e.unsafeHead,
func (d *EngineController) RequestPendingSafeUpdate(ctx context.Context) {
d.mu.Lock()
defer d.mu.Unlock()
d.emitter.Emit(ctx, PendingSafeUpdateEvent{
PendingSafe: d.PendingSafeL2Head(),
Unsafe: d.UnsafeL2Head(),
})
}

// TryUpdatePendingSafe updates the pending safe head if the new reference is newer
// TryUpdatePendingSafe updates the pending safe head if the new reference is newer, acquiring lock
func (e *EngineController) TryUpdatePendingSafe(ctx context.Context, ref eth.L2BlockRef, concluding bool, source eth.L1BlockRef) {
e.mu.Lock()
defer e.mu.Unlock()
e.tryUpdatePendingSafe(ctx, ref, concluding, source)
}

// tryUpdatePendingSafe updates the pending safe head if the new reference is newer
func (e *EngineController) tryUpdatePendingSafe(ctx context.Context, ref eth.L2BlockRef, concluding bool, source eth.L1BlockRef) {
// Only promote if not already stale.
// Resets/overwrites happen through engine-resets, not through promotion.
if ref.Number > e.pendingSafeHead.Number {
Expand All @@ -714,8 +747,15 @@ func (e *EngineController) TryUpdatePendingSafe(ctx context.Context, ref eth.L2B
}
}

// TryUpdateLocalSafe updates the local safe head if the new reference is newer and concluding
// TryUpdateLocalSafe updates the local safe head if the new reference is newer and concluding, acquiring lock
func (e *EngineController) TryUpdateLocalSafe(ctx context.Context, ref eth.L2BlockRef, concluding bool, source eth.L1BlockRef) {
e.mu.Lock()
defer e.mu.Unlock()
e.tryUpdateLocalSafe(ctx, ref, concluding, source)
}

// tryUpdateLocalSafe updates the local safe head if the new reference is newer and concluding
func (e *EngineController) tryUpdateLocalSafe(ctx context.Context, ref eth.L2BlockRef, concluding bool, source eth.L1BlockRef) {
if concluding && ref.Number > e.localSafeHead.Number {
// Promote to local safe
e.log.Debug("Updating local safe", "local_safe", ref, "safe", e.safeHead, "unsafe", e.unsafeHead)
Expand All @@ -725,7 +765,7 @@ func (e *EngineController) TryUpdateLocalSafe(ctx context.Context, ref eth.L2Blo
}

// TryUpdateUnsafe updates the unsafe head and backs up the previous one if needed
func (e *EngineController) TryUpdateUnsafe(ctx context.Context, ref eth.L2BlockRef) {
func (e *EngineController) tryUpdateUnsafe(ctx context.Context, ref eth.L2BlockRef) {
// Backup unsafeHead when new block is not built on original unsafe head.
if e.unsafeHead.Number >= ref.Number {
e.SetBackupUnsafeL2Head(e.unsafeHead, false)
Expand All @@ -746,10 +786,16 @@ func (e *EngineController) PromoteSafe(ctx context.Context, ref eth.L2BlockRef,
e.onUnsafeUpdate(ctx, ref, e.unsafeHead)
}
// Try to apply the forkchoice changes
e.TryUpdateEngine(ctx)
e.tryUpdateEngine(ctx)
}

func (e *EngineController) PromoteFinalized(ctx context.Context, ref eth.L2BlockRef) {
e.mu.Lock()
defer e.mu.Unlock()
e.promoteFinalized(ctx, ref)
}
func (e *EngineController) promoteFinalized(ctx context.Context, ref eth.L2BlockRef) {

if ref.Number < e.finalizedHead.Number {
e.log.Error("Cannot rewind finality,", "ref", ref, "finalized", e.finalizedHead)
return
Expand All @@ -761,7 +807,7 @@ func (e *EngineController) PromoteFinalized(ctx context.Context, ref eth.L2Block
e.SetFinalizedHead(ref)
e.emitter.Emit(ctx, FinalizedUpdateEvent{Ref: ref})
// Try to apply the forkchoice changes
e.TryUpdateEngine(ctx)
e.tryUpdateEngine(ctx)
}

// SetAttributesResetter sets the attributes component that needs force reset notifications
Expand All @@ -779,8 +825,15 @@ func (e *EngineController) SetOriginSelectorResetter(resetter OriginSelectorForc
e.originSelectorResetter = resetter
}

// ForceReset performs a forced reset to the specified block references
// ForceReset performs a forced reset to the specified block references, acquiring lock
func (e *EngineController) ForceReset(ctx context.Context, localUnsafe, crossUnsafe, localSafe, crossSafe, finalized eth.L2BlockRef) {
e.mu.Lock()
defer e.mu.Unlock()
e.forceReset(ctx, localUnsafe, crossUnsafe, localSafe, crossSafe, finalized)
}

// forceReset performs a forced reset to the specified block references
func (e *EngineController) forceReset(ctx context.Context, localUnsafe, crossUnsafe, localSafe, crossSafe, finalized eth.L2BlockRef) {
// Reset other components before resetting the engine
if e.attributesResetter != nil {
e.attributesResetter.ForceReset(ctx, localUnsafe, crossUnsafe, localSafe, crossSafe, finalized)
Expand All @@ -800,7 +853,7 @@ func (e *EngineController) ForceReset(ctx context.Context, localUnsafe, crossUns
}

// Time to apply the changes to the underlying engine
e.TryUpdateEngine(ctx)
e.tryUpdateEngine(ctx)

v := EngineResetConfirmedEvent{
LocalUnsafe: e.unsafeHead,
Expand Down Expand Up @@ -892,7 +945,7 @@ func (e *EngineController) processUnsafePayload(ctx context.Context, envelope *e
if ref.BlockRef().ID() == e.unsafeHead.BlockRef().ID() {
return
}
if err := e.InsertUnsafePayload(e.ctx, envelope, ref); err != nil {
if err := e.insertUnsafePayload(e.ctx, envelope, ref); err != nil {
e.log.Info("failed to insert payload", "ref", ref,
"txs", len(envelope.ExecutionPayload.Transactions), "err", err)
// yes, duplicate error-handling. After all derivers are interacting with the engine
Expand Down
12 changes: 6 additions & 6 deletions op-node/rollup/engine/payload_success.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,27 @@ func (e *EngineController) onPayloadSuccess(ctx context.Context, ev PayloadSucce
e.log.Warn("Successfully built replacement block, resetting chain to continue now", "replacement", ev.Ref)
// Change the engine state to make the replacement block the cross-safe head of the chain,
// And continue syncing from there.
e.ForceReset(ctx, ev.Ref, ev.Ref, ev.Ref, ev.Ref, e.Finalized())
e.forceReset(ctx, ev.Ref, ev.Ref, ev.Ref, ev.Ref, e.Finalized())
e.emitter.Emit(ctx, InteropReplacedBlockEvent{
Envelope: ev.Envelope,
Ref: ev.Ref.BlockRef(),
})
// Apply it to the execution engine
e.TryUpdateEngine(ctx)
e.tryUpdateEngine(ctx)
// Not a regular reset, since we don't wind back to any L2 block.
// We start specifically from the replacement block.
return
}

// TryUpdateUnsafe, TryUpdatePendingSafe, TryUpdateLocalSafe, tryUpdateEngine must be sequentially invoked
e.TryUpdateUnsafe(ctx, ev.Ref)
e.tryUpdateUnsafe(ctx, ev.Ref)
// If derived from L1, then it can be considered (pending) safe
if ev.DerivedFrom != (eth.L1BlockRef{}) {
e.TryUpdatePendingSafe(ctx, ev.Ref, ev.Concluding, ev.DerivedFrom)
e.TryUpdateLocalSafe(ctx, ev.Ref, ev.Concluding, ev.DerivedFrom)
e.tryUpdatePendingSafe(ctx, ev.Ref, ev.Concluding, ev.DerivedFrom)
e.tryUpdateLocalSafe(ctx, ev.Ref, ev.Concluding, ev.DerivedFrom)
}
// Now if possible synchronously call FCU
err := e.tryUpdateEngine(ctx)
err := e.tryUpdateEngineInternal(ctx)
if err != nil {
e.log.Error("Failed to update engine", "error", err)
}
Expand Down