diff --git a/op-acceptance-tests/tests/sync_tester/sync_tester_ext_el/sync_tester_ext_el_test.go b/op-acceptance-tests/tests/sync_tester/sync_tester_ext_el/sync_tester_ext_el_test.go index 9c4f57f26382d..28bdae5449247 100644 --- a/op-acceptance-tests/tests/sync_tester/sync_tester_ext_el/sync_tester_ext_el_test.go +++ b/op-acceptance-tests/tests/sync_tester/sync_tester_ext_el/sync_tester_ext_el_test.go @@ -3,7 +3,6 @@ package sync_tester_ext_el import ( "fmt" "os" - "runtime" "testing" "github.com/ethereum-optimism/optimism/op-devstack/devtest" @@ -228,11 +227,6 @@ func setupOrchestrator(gt *testing.T, t devtest.T, blocksToSync uint64) (*sysgo. Finalized: chainCfg.Genesis.L2.Number, }), ) - // TODO(#17564): op-node has a suspected race during EL Sync. - // To temporarily mitigate and stabilize tests, restrict runtime - // parallelism to 1 (no true concurrency). This masks the race; - // remove once the underlying issue is fixed. - runtime.GOMAXPROCS(1) } else { opt = stack.Combine(opt, presets.WithSyncTesterELInitialState(eth.FCUState{ diff --git a/op-acceptance-tests/tests/sync_tester/sync_tester_hfs_ext/sync_tester_hfs_ext_test.go b/op-acceptance-tests/tests/sync_tester/sync_tester_hfs_ext/sync_tester_hfs_ext_test.go index bc190690ca99d..1cb2a8dadf515 100644 --- a/op-acceptance-tests/tests/sync_tester/sync_tester_hfs_ext/sync_tester_hfs_ext_test.go +++ b/op-acceptance-tests/tests/sync_tester/sync_tester_hfs_ext/sync_tester_hfs_ext_test.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "os" - "runtime" "strconv" "testing" @@ -200,11 +199,6 @@ func setupOrchestrator(gt *testing.T, t devtest.T, blk, targetBlock uint64, l2CL Finalized: chainCfg.Genesis.L2.Number, }), ) - // TODO(#17564): op-node has a suspected race during EL Sync. - // To temporarily mitigate and stabilize tests, restrict runtime - // parallelism to 1 (no true concurrency). This masks the race; - // remove once the underlying issue is fixed. 
- runtime.GOMAXPROCS(1) } else { opt = stack.Combine(opt, presets.WithSyncTesterELInitialState(eth.FCUState{ diff --git a/op-node/rollup/engine/api.go b/op-node/rollup/engine/api.go index f6499f38986b6..78aba561ac3ba 100644 --- a/op-node/rollup/engine/api.go +++ b/op-node/rollup/engine/api.go @@ -138,7 +138,7 @@ func (e *EngineController) CommitBlock(ctx context.Context, signed *opsigner.Sig e.SetUnsafeHead(ref) e.emitter.Emit(ctx, UnsafeUpdateEvent{Ref: ref}) - if err := e.tryUpdateEngine(ctx); err != nil { + if err := e.tryUpdateEngineInternal(ctx); err != nil { return fmt.Errorf("failed to update engine forkchoice: %w", err) } return nil diff --git a/op-node/rollup/engine/engine_controller.go b/op-node/rollup/engine/engine_controller.go index 98456ca44cdf2..a9b8f18366afb 100644 --- a/op-node/rollup/engine/engine_controller.go +++ b/op-node/rollup/engine/engine_controller.go @@ -206,6 +206,12 @@ func (e *EngineController) requestForkchoiceUpdate(ctx context.Context) { } func (e *EngineController) IsEngineSyncing() bool { + e.mu.Lock() + defer e.mu.Unlock() + return e.isEngineSyncing() +} + +func (e *EngineController) isEngineSyncing() bool { return e.syncStatus == syncStatusWillStartEL || e.syncStatus == syncStatusStartedEL || e.syncStatus == syncStatusFinishedELButNotFinalized @@ -398,13 +404,11 @@ func (e *EngineController) initializeUnknowns(ctx context.Context) error { return nil } -// tryUpdateEngine attempts to update the engine with the current forkchoice state of the rollup node, -// this is a no-op if the nodes already agree on the forkchoice state. 
-func (e *EngineController) tryUpdateEngine(ctx context.Context) error { +func (e *EngineController) tryUpdateEngineInternal(ctx context.Context) error { if !e.needFCUCall { return ErrNoFCUNeeded } - if e.IsEngineSyncing() { + if e.isEngineSyncing() { e.log.Warn("Attempting to update forkchoice state while EL syncing") } if err := e.initializeUnknowns(ctx); err != nil { @@ -447,7 +451,31 @@ func (e *EngineController) tryUpdateEngine(ctx context.Context) error { return nil } +// tryUpdateEngine attempts to update the engine with the current forkchoice state of the rollup node, +// this is a no-op if the nodes already agree on the forkchoice state. +func (e *EngineController) tryUpdateEngine(ctx context.Context) { + // If we don't need to call FCU, keep going b/c this was a no-op. If we needed to + // perform a network call, then we should yield even if we did not encounter an error. + if err := e.tryUpdateEngineInternal(e.ctx); err != nil && !errors.Is(err, ErrNoFCUNeeded) { + if errors.Is(err, derive.ErrReset) { + e.emitter.Emit(ctx, rollup.ResetEvent{Err: err}) + } else if errors.Is(err, derive.ErrTemporary) { + e.emitter.Emit(ctx, rollup.EngineTemporaryErrorEvent{Err: err}) + } else { + e.emitter.Emit(ctx, rollup.CriticalErrorEvent{ + Err: fmt.Errorf("unexpected tryUpdateEngine error type: %w", err), + }) + } + } +} + func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error { + e.mu.Lock() + defer e.mu.Unlock() + return e.insertUnsafePayload(ctx, envelope, ref) +} + +func (e *EngineController) insertUnsafePayload(ctx context.Context, envelope *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error { // Check if there is a finalized head once when doing EL sync. 
If so, transition to CL sync if e.syncStatus == syncStatusWillStartEL { b, err := e.engine.L2BlockRefByLabel(ctx, eth.Finalized) @@ -556,7 +584,7 @@ func (e *EngineController) shouldTryBackupUnsafeReorg() bool { return false } // This method must be never called when EL sync. If EL sync is in progress, early return. - if e.IsEngineSyncing() { + if e.isEngineSyncing() { e.log.Warn("Attempting to unsafe reorg using backupUnsafe while EL syncing") return false } @@ -568,9 +596,15 @@ func (e *EngineController) shouldTryBackupUnsafeReorg() bool { return true } -// TryBackupUnsafeReorg attempts to reorg(restore) unsafe head to backupUnsafeHead. -// If succeeds, update current forkchoice state to the rollup node. func (e *EngineController) TryBackupUnsafeReorg(ctx context.Context) (bool, error) { + e.mu.Lock() + defer e.mu.Unlock() + return e.tryBackupUnsafeReorg(ctx) +} + +// tryBackupUnsafeReorg attempts to reorg(restore) unsafe head to backupUnsafeHead. +// If succeeds, update current forkchoice state to the rollup node. +func (e *EngineController) tryBackupUnsafeReorg(ctx context.Context) (bool, error) { if !e.shouldTryBackupUnsafeReorg() { // Do not need to perform FCU. return false, nil @@ -625,24 +659,14 @@ func (e *EngineController) TryBackupUnsafeReorg(ctx context.Context) (bool, erro } func (e *EngineController) TryUpdateEngine(ctx context.Context) { - // If we don't need to call FCU, keep going b/c this was a no-op. If we needed to - // perform a network call, then we should yield even if we did not encounter an error. 
- if err := e.tryUpdateEngine(e.ctx); err != nil && !errors.Is(err, ErrNoFCUNeeded) { - if errors.Is(err, derive.ErrReset) { - e.emitter.Emit(ctx, rollup.ResetEvent{Err: err}) - } else if errors.Is(err, derive.ErrTemporary) { - e.emitter.Emit(ctx, rollup.EngineTemporaryErrorEvent{Err: err}) - } else { - e.emitter.Emit(ctx, rollup.CriticalErrorEvent{ - Err: fmt.Errorf("unexpected tryUpdateEngine error type: %w", err), - }) - } - } + e.mu.Lock() + defer e.mu.Unlock() + e.tryUpdateEngine(ctx) } // TODO(#16917) Remove Event System Refactor Comments // OnEvent implements event.Deriver (moved from EngDeriver) -// TryUpdateEngineEvent is replaced with TryUpdateEngine +// TryUpdateEngineEvent is replaced with tryUpdateEngine func (e *EngineController) OnEvent(ctx context.Context, ev event.Event) bool { e.mu.Lock() defer e.mu.Unlock() @@ -656,7 +680,7 @@ func (e *EngineController) OnEvent(ctx context.Context, ev event.Event) bool { e.emitter.Emit(ctx, PromoteCrossUnsafeEvent(x)) } // Try to apply the forkchoice changes - e.TryUpdateEngine(ctx) + e.tryUpdateEngine(ctx) case PromoteCrossUnsafeEvent: e.SetCrossUnsafeHead(x.Ref) e.onUnsafeUpdate(ctx, x.Ref, e.unsafeHead) @@ -693,15 +717,24 @@ func (e *EngineController) OnEvent(ctx context.Context, ev event.Event) bool { return true } -func (e *EngineController) RequestPendingSafeUpdate(ctx context.Context) { - e.emitter.Emit(ctx, PendingSafeUpdateEvent{ - PendingSafe: e.pendingSafeHead, - Unsafe: e.unsafeHead, +func (e *EngineController) RequestPendingSafeUpdate(ctx context.Context) { + e.mu.Lock() + defer e.mu.Unlock() + e.emitter.Emit(ctx, PendingSafeUpdateEvent{ + PendingSafe: e.pendingSafeHead, + Unsafe: e.unsafeHead, }) } -// TryUpdatePendingSafe updates the pending safe head if the new reference is newer +// TryUpdatePendingSafe updates the pending safe head if the new reference is newer, acquiring lock func (e *EngineController) TryUpdatePendingSafe(ctx context.Context, ref eth.L2BlockRef, concluding bool, source 
eth.L1BlockRef) { + e.mu.Lock() + defer e.mu.Unlock() + e.tryUpdatePendingSafe(ctx, ref, concluding, source) +} + +// tryUpdatePendingSafe updates the pending safe head if the new reference is newer +func (e *EngineController) tryUpdatePendingSafe(ctx context.Context, ref eth.L2BlockRef, concluding bool, source eth.L1BlockRef) { // Only promote if not already stale. // Resets/overwrites happen through engine-resets, not through promotion. if ref.Number > e.pendingSafeHead.Number { @@ -714,8 +747,15 @@ func (e *EngineController) TryUpdatePendingSafe(ctx context.Context, ref eth.L2B } } -// TryUpdateLocalSafe updates the local safe head if the new reference is newer and concluding +// TryUpdateLocalSafe updates the local safe head if the new reference is newer and concluding, acquiring lock func (e *EngineController) TryUpdateLocalSafe(ctx context.Context, ref eth.L2BlockRef, concluding bool, source eth.L1BlockRef) { + e.mu.Lock() + defer e.mu.Unlock() + e.tryUpdateLocalSafe(ctx, ref, concluding, source) +} + +// tryUpdateLocalSafe updates the local safe head if the new reference is newer and concluding +func (e *EngineController) tryUpdateLocalSafe(ctx context.Context, ref eth.L2BlockRef, concluding bool, source eth.L1BlockRef) { if concluding && ref.Number > e.localSafeHead.Number { // Promote to local safe e.log.Debug("Updating local safe", "local_safe", ref, "safe", e.safeHead, "unsafe", e.unsafeHead) @@ -725,7 +765,7 @@ func (e *EngineController) TryUpdateLocalSafe(ctx context.Context, ref eth.L2Blo } // TryUpdateUnsafe updates the unsafe head and backs up the previous one if needed -func (e *EngineController) TryUpdateUnsafe(ctx context.Context, ref eth.L2BlockRef) { +func (e *EngineController) tryUpdateUnsafe(ctx context.Context, ref eth.L2BlockRef) { // Backup unsafeHead when new block is not built on original unsafe head. 
if e.unsafeHead.Number >= ref.Number { e.SetBackupUnsafeL2Head(e.unsafeHead, false) @@ -746,10 +786,16 @@ func (e *EngineController) PromoteSafe(ctx context.Context, ref eth.L2BlockRef, e.onUnsafeUpdate(ctx, ref, e.unsafeHead) } // Try to apply the forkchoice changes - e.TryUpdateEngine(ctx) + e.tryUpdateEngine(ctx) } func (e *EngineController) PromoteFinalized(ctx context.Context, ref eth.L2BlockRef) { + e.mu.Lock() + defer e.mu.Unlock() + e.promoteFinalized(ctx, ref) +} + +func (e *EngineController) promoteFinalized(ctx context.Context, ref eth.L2BlockRef) { if ref.Number < e.finalizedHead.Number { e.log.Error("Cannot rewind finality,", "ref", ref, "finalized", e.finalizedHead) return @@ -761,7 +807,7 @@ func (e *EngineController) PromoteFinalized(ctx context.Context, ref eth.L2Block e.SetFinalizedHead(ref) e.emitter.Emit(ctx, FinalizedUpdateEvent{Ref: ref}) // Try to apply the forkchoice changes - e.TryUpdateEngine(ctx) + e.tryUpdateEngine(ctx) } // SetAttributesResetter sets the attributes component that needs force reset notifications @@ -779,8 +825,15 @@ func (e *EngineController) SetOriginSelectorResetter(resetter OriginSelectorForc e.originSelectorResetter = resetter } -// ForceReset performs a forced reset to the specified block references +// ForceReset performs a forced reset to the specified block references, acquiring lock func (e *EngineController) ForceReset(ctx context.Context, localUnsafe, crossUnsafe, localSafe, crossSafe, finalized eth.L2BlockRef) { + e.mu.Lock() + defer e.mu.Unlock() + e.forceReset(ctx, localUnsafe, crossUnsafe, localSafe, crossSafe, finalized) +} + +// forceReset performs a forced reset to the specified block references +func (e *EngineController) forceReset(ctx context.Context, localUnsafe, crossUnsafe, localSafe, crossSafe, finalized eth.L2BlockRef) { // Reset other components before resetting the engine if e.attributesResetter != nil { e.attributesResetter.ForceReset(ctx, localUnsafe, crossUnsafe, localSafe, crossSafe, 
finalized) @@ -800,7 +853,7 @@ func (e *EngineController) ForceReset(ctx context.Context, localUnsafe, crossUns } // Time to apply the changes to the underlying engine - e.TryUpdateEngine(ctx) + e.tryUpdateEngine(ctx) v := EngineResetConfirmedEvent{ LocalUnsafe: e.unsafeHead, @@ -892,7 +945,7 @@ func (e *EngineController) processUnsafePayload(ctx context.Context, envelope *e if ref.BlockRef().ID() == e.unsafeHead.BlockRef().ID() { return } - if err := e.InsertUnsafePayload(e.ctx, envelope, ref); err != nil { + if err := e.insertUnsafePayload(e.ctx, envelope, ref); err != nil { e.log.Info("failed to insert payload", "ref", ref, "txs", len(envelope.ExecutionPayload.Transactions), "err", err) // yes, duplicate error-handling. After all derivers are interacting with the engine diff --git a/op-node/rollup/engine/payload_success.go b/op-node/rollup/engine/payload_success.go index e12e7da7c26ff..6cb6d356254d9 100644 --- a/op-node/rollup/engine/payload_success.go +++ b/op-node/rollup/engine/payload_success.go @@ -28,27 +28,27 @@ func (e *EngineController) onPayloadSuccess(ctx context.Context, ev PayloadSucce e.log.Warn("Successfully built replacement block, resetting chain to continue now", "replacement", ev.Ref) // Change the engine state to make the replacement block the cross-safe head of the chain, // And continue syncing from there. - e.ForceReset(ctx, ev.Ref, ev.Ref, ev.Ref, ev.Ref, e.Finalized()) + e.forceReset(ctx, ev.Ref, ev.Ref, ev.Ref, ev.Ref, e.Finalized()) e.emitter.Emit(ctx, InteropReplacedBlockEvent{ Envelope: ev.Envelope, Ref: ev.Ref.BlockRef(), }) // Apply it to the execution engine - e.TryUpdateEngine(ctx) + e.tryUpdateEngine(ctx) // Not a regular reset, since we don't wind back to any L2 block. // We start specifically from the replacement block. 
return } // TryUpdateUnsafe, TryUpdatePendingSafe, TryUpdateLocalSafe, tryUpdateEngine must be sequentially invoked - e.TryUpdateUnsafe(ctx, ev.Ref) + e.tryUpdateUnsafe(ctx, ev.Ref) // If derived from L1, then it can be considered (pending) safe if ev.DerivedFrom != (eth.L1BlockRef{}) { - e.TryUpdatePendingSafe(ctx, ev.Ref, ev.Concluding, ev.DerivedFrom) - e.TryUpdateLocalSafe(ctx, ev.Ref, ev.Concluding, ev.DerivedFrom) + e.tryUpdatePendingSafe(ctx, ev.Ref, ev.Concluding, ev.DerivedFrom) + e.tryUpdateLocalSafe(ctx, ev.Ref, ev.Concluding, ev.DerivedFrom) } // Now if possible synchronously call FCU - err := e.tryUpdateEngine(ctx) + err := e.tryUpdateEngineInternal(ctx) if err != nil { e.log.Error("Failed to update engine", "error", err) }