diff --git a/op-e2e/actions/helpers/l2_verifier.go b/op-e2e/actions/helpers/l2_verifier.go index b2bd8a957ba9d..dd1724250f7b2 100644 --- a/op-e2e/actions/helpers/l2_verifier.go +++ b/op-e2e/actions/helpers/l2_verifier.go @@ -180,7 +180,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, Log: log, Ctx: ctx, Drain: executor.Drain, - ManagedMode: false, + ManagedMode: managedMode, }, opts) sys.Register("engine", engine.NewEngDeriver(log, ctx, cfg, metrics, ec), opts) diff --git a/op-e2e/actions/interop/dsl/dsl.go b/op-e2e/actions/interop/dsl/dsl.go index e2457c2b9caa2..888af9f98d0fa 100644 --- a/op-e2e/actions/interop/dsl/dsl.go +++ b/op-e2e/actions/interop/dsl/dsl.go @@ -64,18 +64,12 @@ type InteropDSL struct { func NewInteropDSL(t helpers.Testing) *InteropDSL { setup := SetupInterop(t) actors := setup.CreateActors() + actors.PrepareChainState(t) t.Logf("ChainA: %v, ChainB: %v", actors.ChainA.ChainID, actors.ChainB.ChainID) allChains := []*Chain{actors.ChainA, actors.ChainB} - // Get all the initial events processed - for _, chain := range allChains { - chain.Sequencer.ActL2PipelineFull(t) - chain.Sequencer.SyncSupervisor(t) - } - actors.Supervisor.ProcessFull(t) - superRootSource, err := NewSuperRootSource( t.Ctx(), actors.ChainA.Sequencer.RollupClient(), diff --git a/op-e2e/actions/interop/dsl/interop.go b/op-e2e/actions/interop/dsl/interop.go index 9bcf2e1838196..35664cd404791 100644 --- a/op-e2e/actions/interop/dsl/interop.go +++ b/op-e2e/actions/interop/dsl/interop.go @@ -68,6 +68,37 @@ type InteropActors struct { ChainB *Chain } +func (actors *InteropActors) PrepareChainState(t helpers.Testing) { + // Initialize both chain states + actors.ChainA.Sequencer.ActL2PipelineFull(t) + actors.ChainB.Sequencer.ActL2PipelineFull(t) + t.Log("Sequencers should initialize, and produce initial reset requests") + + // Process the anchor point + actors.Supervisor.ProcessFull(t) + t.Log("Supervisor should have anchor points now") + + // Sync supervisors, 
i.e. the reset request makes it to the supervisor now + actors.ChainA.Sequencer.SyncSupervisor(t) + actors.ChainB.Sequencer.SyncSupervisor(t) + t.Log("Supervisor has events now") + + // Pick up the reset request + actors.Supervisor.ProcessFull(t) + t.Log("Supervisor processed initial resets") + + // Process reset work + actors.ChainA.Sequencer.ActL2PipelineFull(t) + actors.ChainB.Sequencer.ActL2PipelineFull(t) + t.Log("Processed!") + + // Verify initial state + statusA := actors.ChainA.Sequencer.SyncStatus() + statusB := actors.ChainB.Sequencer.SyncStatus() + require.Equal(t, uint64(0), statusA.UnsafeL2.Number) + require.Equal(t, uint64(0), statusB.UnsafeL2.Number) +} + // messageExpiryTime is the time in seconds that a message will be valid for on the L2 chain. // At a 2 second block time, this should be small enough to cover all events buffered in the supervisor event queue. const messageExpiryTime = 120 // 2 minutes diff --git a/op-e2e/actions/interop/emitter_contract_test.go b/op-e2e/actions/interop/emitter_contract_test.go index 6f3c25a0cbc9e..756e5e84adfe1 100644 --- a/op-e2e/actions/interop/emitter_contract_test.go +++ b/op-e2e/actions/interop/emitter_contract_test.go @@ -43,7 +43,7 @@ func TestEmitterContract(gt *testing.T) { actors = is.CreateActors() aliceA = setupUser(t, is, actors.ChainA, 0) aliceB = setupUser(t, is, actors.ChainB, 0) - initializeChainState(t, actors) + actors.PrepareChainState(t) emitTx = initializeEmitterContractTest(t, aliceA, actors) } @@ -145,25 +145,6 @@ func idForTx(t helpers.Testing, tx *types.Transaction, srcChain *dsl.Chain) inbo } } -func initializeChainState(t helpers.Testing, actors *dsl.InteropActors) { - // Initialize both chain states - actors.ChainA.Sequencer.ActL2PipelineFull(t) - actors.ChainB.Sequencer.ActL2PipelineFull(t) - - // Sync supervisors - actors.ChainA.Sequencer.SyncSupervisor(t) - actors.ChainB.Sequencer.SyncSupervisor(t) - - // Verify initial state - statusA := actors.ChainA.Sequencer.SyncStatus() - 
statusB := actors.ChainB.Sequencer.SyncStatus() - require.Equal(t, uint64(0), statusA.UnsafeL2.Number) - require.Equal(t, uint64(0), statusB.UnsafeL2.Number) - - // Complete initial sync - actors.Supervisor.ProcessFull(t) -} - func initializeEmitterContractTest(t helpers.Testing, aliceA *userWithKeys, actors *dsl.InteropActors) *types.Transaction { // Deploy message contract and emit a log on ChainA // This issues two blocks to ChainA diff --git a/op-e2e/actions/interop/interop_test.go b/op-e2e/actions/interop/interop_test.go index 714fcf11140f0..9f4d5dd1031e5 100644 --- a/op-e2e/actions/interop/interop_test.go +++ b/op-e2e/actions/interop/interop_test.go @@ -25,10 +25,7 @@ func TestFullInterop(gt *testing.T) { is := dsl.SetupInterop(t) actors := is.CreateActors() - - // get both sequencers set up - actors.ChainA.Sequencer.ActL2PipelineFull(t) - actors.ChainB.Sequencer.ActL2PipelineFull(t) + actors.PrepareChainState(t) // sync the supervisor, handle initial events emitted by the nodes actors.ChainA.Sequencer.SyncSupervisor(t) @@ -168,10 +165,7 @@ func TestFinality(gt *testing.T) { testFinality := func(t helpers.StatefulTesting, extraBlocks int) { is := dsl.SetupInterop(t) actors := is.CreateActors() - - // set up a blank ChainA - actors.ChainA.Sequencer.ActL2PipelineFull(t) - actors.ChainA.Sequencer.SyncSupervisor(t) + actors.PrepareChainState(t) actors.Supervisor.ProcessFull(t) @@ -250,15 +244,7 @@ func TestInteropLocalSafeInvalidation(gt *testing.T) { is := dsl.SetupInterop(t) actors := is.CreateActors() - - // get both sequencers set up - actors.ChainA.Sequencer.ActL2PipelineFull(t) - actors.ChainB.Sequencer.ActL2PipelineFull(t) - - // sync the supervisor, handle initial events emitted by the nodes - actors.ChainA.Sequencer.SyncSupervisor(t) - actors.ChainB.Sequencer.SyncSupervisor(t) - actors.Supervisor.ProcessFull(t) + actors.PrepareChainState(t) genesisB := actors.ChainB.Sequencer.SyncStatus() @@ -376,15 +362,7 @@ func TestInteropCrossSafeDependencyDelay(gt 
*testing.T) { is := dsl.SetupInterop(t) actors := is.CreateActors() - - // get both sequencers set up - actors.ChainA.Sequencer.ActL2PipelineFull(t) - actors.ChainB.Sequencer.ActL2PipelineFull(t) - - // sync the supervisor, handle initial events emitted by the nodes - actors.ChainA.Sequencer.SyncSupervisor(t) - actors.ChainB.Sequencer.SyncSupervisor(t) - actors.Supervisor.ProcessFull(t) + actors.PrepareChainState(t) // We create a batch with some empty blocks before and after the cross-chain message, // so multiple L2 blocks are all derived from the same L1 block. diff --git a/op-e2e/actions/interop/reset_test.go b/op-e2e/actions/interop/reset_test.go index 58ae285f534d6..b7585bb5430a2 100644 --- a/op-e2e/actions/interop/reset_test.go +++ b/op-e2e/actions/interop/reset_test.go @@ -16,11 +16,7 @@ func TestReset(gt *testing.T) { is := dsl.SetupInterop(t) actors := is.CreateActors() - - // get both sequencers set up - // sync the supervisor, handle initial events emitted by the nodes - actors.ChainA.Sequencer.ActL2PipelineFull(t) - actors.ChainA.Sequencer.SyncSupervisor(t) + actors.PrepareChainState(t) // No blocks yet status := actors.ChainA.Sequencer.SyncStatus() diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go index 67d0724b1c5fa..c0713ec8119b2 100644 --- a/op-node/rollup/driver/state.go +++ b/op-node/rollup/driver/state.go @@ -367,11 +367,11 @@ func (s *SyncDeriver) onEngineConfirmedReset(x engine.EngineResetConfirmedEvent) // and don't confirm the engine-reset with the derivation pipeline. // The pipeline will re-trigger a reset as necessary. 
if s.SafeHeadNotifs != nil { - if err := s.SafeHeadNotifs.SafeHeadReset(x.Safe); err != nil { - s.Log.Error("Failed to warn safe-head notifier of safe-head reset", "safe", x.Safe) + if err := s.SafeHeadNotifs.SafeHeadReset(x.CrossSafe); err != nil { + s.Log.Error("Failed to warn safe-head notifier of safe-head reset", "safe", x.CrossSafe) return } - if s.SafeHeadNotifs.Enabled() && x.Safe.ID() == s.Config.Genesis.L2 { + if s.SafeHeadNotifs.Enabled() && x.CrossSafe.ID() == s.Config.Genesis.L2 { // The rollup genesis block is always safe by definition. So if the pipeline resets this far back we know // we will process all safe head updates and can record genesis as always safe from L1 genesis. // Note that it is not safe to use cfg.Genesis.L1 here as it is the block immediately before the L2 genesis @@ -382,23 +382,20 @@ func (s *SyncDeriver) onEngineConfirmedReset(x engine.EngineResetConfirmedEvent) s.Log.Error("Failed to retrieve L1 genesis, cannot notify genesis as safe block", "err", err) return } - if err := s.SafeHeadNotifs.SafeHeadUpdated(x.Safe, l1Genesis.ID()); err != nil { + if err := s.SafeHeadNotifs.SafeHeadUpdated(x.CrossSafe, l1Genesis.ID()); err != nil { s.Log.Error("Failed to notify safe-head listener of safe-head", "err", err) return } } } + s.Log.Info("Confirming pipeline reset") s.Emitter.Emit(derive.ConfirmPipelineResetEvent{}) } func (s *SyncDeriver) onResetEvent(x rollup.ResetEvent) { if s.ManagedMode { - if errors.Is(x.Err, derive.ErrEngineResetReq) { - s.Log.Warn("Managed Mode is enabled, but engine reset is required", "err", x.Err) - s.Emitter.Emit(engine.ResetEngineRequestEvent{}) - } else { - s.Log.Warn("Encountered reset, waiting for op-supervisor to recover", "err", x.Err) - } + s.Log.Warn("Encountered reset in Managed Mode, waiting for op-supervisor", "err", x.Err) + // ManagedMode will pick up the ResetEvent return } // If the system corrupts, e.g. 
due to a reorg, simply reset it diff --git a/op-node/rollup/engine/engine_controller.go b/op-node/rollup/engine/engine_controller.go index 907238e84a9ef..67ea651c99405 100644 --- a/op-node/rollup/engine/engine_controller.go +++ b/op-node/rollup/engine/engine_controller.go @@ -262,6 +262,56 @@ func (e *EngineController) checkForkchoiceUpdatedStatus(status eth.ExecutePayloa return status == eth.ExecutionValid } +// initializeUnknowns is important to give the op-node EngineController engine state. +// Pre-interop, the initial reset triggered a find-sync-start, and filled the forkchoice. +// This still happens, but now overrides what may be initialized here. +// Post-interop, the op-supervisor may diff the forkchoice state against the supervisor DB, +// to determine where to perform the initial reset to. +func (e *EngineController) initializeUnknowns(ctx context.Context) error { + if e.unsafeHead == (eth.L2BlockRef{}) { + ref, err := e.engine.L2BlockRefByLabel(ctx, eth.Unsafe) + if err != nil { + return fmt.Errorf("failed to load local-unsafe head: %w", err) + } + e.SetUnsafeHead(ref) + e.log.Info("Loaded initial local-unsafe block ref", "local_unsafe", ref) + } + var finalizedRef eth.L2BlockRef + if e.finalizedHead == (eth.L2BlockRef{}) { + var err error + finalizedRef, err = e.engine.L2BlockRefByLabel(ctx, eth.Finalized) + if err != nil { + return fmt.Errorf("failed to load finalized head: %w", err) + } + e.SetFinalizedHead(finalizedRef) + e.log.Info("Loaded initial finalized block ref", "finalized", finalizedRef) + } + if e.safeHead == (eth.L2BlockRef{}) { + ref, err := e.engine.L2BlockRefByLabel(ctx, eth.Safe) + if err != nil { + if errors.Is(err, ethereum.NotFound) { + // If the engine doesn't have a safe head, then we can use the finalized head + e.SetSafeHead(finalizedRef) + e.log.Info("Loaded initial cross-safe block from finalized", "cross_safe", finalizedRef) + } else { + return fmt.Errorf("failed to load cross-safe head: %w", err) + } + } else { + 
e.SetSafeHead(ref) + e.log.Info("Loaded initial cross-safe block ref", "cross_safe", ref) + } + } + if e.crossUnsafeHead == (eth.L2BlockRef{}) { + e.SetCrossUnsafeHead(e.safeHead) // preserve cross-safety, don't fall back to a non-cross safety level + e.log.Info("Set initial cross-unsafe block ref to match cross-safe", "cross_unsafe", e.safeHead) + } + if e.localSafeHead == (eth.L2BlockRef{}) { + e.SetLocalSafeHead(e.safeHead) + e.log.Info("Set initial local-safe block ref to match cross-safe", "local_safe", e.safeHead) + } + return nil +} + // TryUpdateEngine attempts to update the engine with the current forkchoice state of the rollup node, // this is a no-op if the nodes already agree on the forkchoice state. func (e *EngineController) TryUpdateEngine(ctx context.Context) error { @@ -271,6 +321,9 @@ func (e *EngineController) TryUpdateEngine(ctx context.Context) error { if e.IsEngineSyncing() { e.log.Warn("Attempting to update forkchoice state while EL syncing") } + if err := e.initializeUnknowns(ctx); err != nil { + return derive.NewTemporaryError(fmt.Errorf("cannot update engine until engine forkchoice is initialized: %w", err)) + } if e.unsafeHead.Number < e.finalizedHead.Number { err := fmt.Errorf("invalid forkchoice state, unsafe head %s is behind finalized head %s", e.unsafeHead, e.finalizedHead) e.emitter.Emit(rollup.CriticalErrorEvent{Err: err}) // make the node exit, things are very wrong. diff --git a/op-node/rollup/engine/engine_reset.go b/op-node/rollup/engine/engine_reset.go index 7574f9cf963a0..86228eb209085 100644 --- a/op-node/rollup/engine/engine_reset.go +++ b/op-node/rollup/engine/engine_reset.go @@ -14,6 +14,7 @@ import ( // ResetEngineRequestEvent requests the EngineResetDeriver to walk // the L2 chain backwards until it finds a plausible unsafe head, // and find an L2 safe block that is guaranteed to still be from the L1 chain. +// This event is not used in interop. 
type ResetEngineRequestEvent struct{} func (ev ResetEngineRequestEvent) String() string { return "reset-engine-request" } @@ -56,9 +57,11 @@ func (d *EngineResetDeriver) OnEvent(ev event.Event) bool { return true } d.emitter.Emit(rollup.ForceResetEvent{ - Unsafe: result.Unsafe, - Safe: result.Safe, - Finalized: result.Finalized, + LocalUnsafe: result.Unsafe, + CrossUnsafe: result.Unsafe, + LocalSafe: result.Safe, + CrossSafe: result.Safe, + Finalized: result.Finalized, }) default: return false diff --git a/op-node/rollup/engine/events.go b/op-node/rollup/engine/events.go index d7f47cd23e156..2a5e2bfc5ad7c 100644 --- a/op-node/rollup/engine/events.go +++ b/op-node/rollup/engine/events.go @@ -267,7 +267,11 @@ func (ev TryUpdateEngineEvent) getBlockProcessingMetrics() []interface{} { } type EngineResetConfirmedEvent struct { - Unsafe, Safe, Finalized eth.L2BlockRef + LocalUnsafe eth.L2BlockRef + CrossUnsafe eth.L2BlockRef + LocalSafe eth.L2BlockRef + CrossSafe eth.L2BlockRef + Finalized eth.L2BlockRef } func (ev EngineResetConfirmedEvent) String() string { @@ -430,10 +434,22 @@ func (d *EngDeriver) OnEvent(ev event.Event) bool { // Time to apply the changes to the underlying engine d.emitter.Emit(TryUpdateEngineEvent{}) - log.Debug("Reset of Engine is completed", - "safeHead", x.Safe, "unsafe", x.Unsafe, "safe_timestamp", x.Safe.Time, - "unsafe_timestamp", x.Unsafe.Time) - d.emitter.Emit(EngineResetConfirmedEvent(x)) + v := EngineResetConfirmedEvent{ + LocalUnsafe: d.ec.UnsafeL2Head(), + CrossUnsafe: d.ec.CrossUnsafeL2Head(), + LocalSafe: d.ec.LocalSafeL2Head(), + CrossSafe: d.ec.SafeL2Head(), + Finalized: d.ec.Finalized(), + } + // We do not emit the original event values, since those might not be set (optional attributes). 
+ d.emitter.Emit(v) + d.log.Info("Reset of Engine is completed", + "local_unsafe", v.LocalUnsafe, + "cross_unsafe", v.CrossUnsafe, + "local_safe", v.LocalSafe, + "cross_safe", v.CrossSafe, + "finalized", v.Finalized, + ) case PromoteUnsafeEvent: // Backup unsafeHead when new block is not built on original unsafe head. if d.ec.unsafeHead.Number >= x.Ref.Number { @@ -570,24 +586,31 @@ func (d *EngDeriver) OnEvent(ev event.Event) bool { type ResetEngineControl interface { SetUnsafeHead(eth.L2BlockRef) + SetCrossUnsafeHead(ref eth.L2BlockRef) + SetLocalSafeHead(ref eth.L2BlockRef) SetSafeHead(eth.L2BlockRef) SetFinalizedHead(eth.L2BlockRef) - SetLocalSafeHead(ref eth.L2BlockRef) - SetCrossUnsafeHead(ref eth.L2BlockRef) SetBackupUnsafeL2Head(block eth.L2BlockRef, triggerReorg bool) SetPendingSafeL2Head(eth.L2BlockRef) } -// ForceEngineReset is not to be used. The op-program needs it for now, until event processing is adopted there. func ForceEngineReset(ec ResetEngineControl, x rollup.ForceResetEvent) { - // if the unsafe head is not provided, do not override the existing unsafe head - if x.Unsafe != (eth.L2BlockRef{}) { - ec.SetUnsafeHead(x.Unsafe) + // local-unsafe is an optional attribute, empty to preserve the existing latest chain + if x.LocalUnsafe != (eth.L2BlockRef{}) { + ec.SetUnsafeHead(x.LocalUnsafe) } - ec.SetLocalSafeHead(x.Safe) - ec.SetPendingSafeL2Head(x.Safe) + // cross-unsafe is fine to revert back, it does not affect engine logic, just sync-status + ec.SetCrossUnsafeHead(x.CrossUnsafe) + + // derivation continues at local-safe point + ec.SetLocalSafeHead(x.LocalSafe) + ec.SetPendingSafeL2Head(x.LocalSafe) + + // "safe" in RPC terms is cross-safe + ec.SetSafeHead(x.CrossSafe) + + // finalized head ec.SetFinalizedHead(x.Finalized) - ec.SetSafeHead(x.Safe) - ec.SetCrossUnsafeHead(x.Safe) + ec.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false) } diff --git a/op-node/rollup/engine/payload_success.go b/op-node/rollup/engine/payload_success.go index 
433acb8915bb3..e7454168acc2d 100644 --- a/op-node/rollup/engine/payload_success.go +++ b/op-node/rollup/engine/payload_success.go @@ -29,9 +29,11 @@ func (eq *EngDeriver) onPayloadSuccess(ev PayloadSuccessEvent) { // Change the engine state to make the replacement block the cross-safe head of the chain, // And continue syncing from there. eq.emitter.Emit(rollup.ForceResetEvent{ - Unsafe: ev.Ref, - Safe: ev.Ref, - Finalized: eq.ec.Finalized(), + LocalUnsafe: ev.Ref, + CrossUnsafe: ev.Ref, + LocalSafe: ev.Ref, + CrossSafe: ev.Ref, + Finalized: eq.ec.Finalized(), }) eq.emitter.Emit(InteropReplacedBlockEvent{ Envelope: ev.Envelope, diff --git a/op-node/rollup/event.go b/op-node/rollup/event.go index 6b4b8c068cd56..6e0fe81eb5d84 100644 --- a/op-node/rollup/event.go +++ b/op-node/rollup/event.go @@ -40,9 +40,15 @@ func (ev ResetEvent) String() string { return "reset-event" } -// ForceResetEvent forces a reset to a specific unsafe/safe/finalized starting point. +// ForceResetEvent forces a reset to a specific local-unsafe/local-safe/finalized starting point. +// Resets may override local-unsafe, to reset the very end of the chain. +// Resets may override local-safe, since post-interop we need the local-safe block derivation to continue. +// Pre-interop both local and cross values should be set the same. type ForceResetEvent struct { - Unsafe, Safe, Finalized eth.L2BlockRef + // LocalUnsafe is optional: the existing chain local-unsafe head will be preserved if this field is zeroed. 
+ LocalUnsafe eth.L2BlockRef + + CrossUnsafe, LocalSafe, CrossSafe, Finalized eth.L2BlockRef } func (ev ForceResetEvent) String() string { diff --git a/op-node/rollup/interop/managed/api.go b/op-node/rollup/interop/managed/api.go index 03de8aa2a8894..a949162146d31 100644 --- a/op-node/rollup/interop/managed/api.go +++ b/op-node/rollup/interop/managed/api.go @@ -43,8 +43,8 @@ func (ib *InteropAPI) AnchorPoint(ctx context.Context) (supervisortypes.DerivedB return ib.backend.AnchorPoint(ctx) } -func (ib *InteropAPI) Reset(ctx context.Context, unsafe, safe, finalized eth.BlockID) error { - return ib.backend.Reset(ctx, unsafe, safe, finalized) +func (ib *InteropAPI) Reset(ctx context.Context, lUnsafe, xUnsafe, lSafe, xSafe, finalized eth.BlockID) error { + return ib.backend.Reset(ctx, lUnsafe, xUnsafe, lSafe, xSafe, finalized) } func (ib *InteropAPI) FetchReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) { diff --git a/op-node/rollup/interop/managed/system.go b/op-node/rollup/interop/managed/system.go index 98f2e91758697..988f242c82d64 100644 --- a/op-node/rollup/interop/managed/system.go +++ b/op-node/rollup/interop/managed/system.go @@ -255,10 +255,19 @@ const ( ConflictingBlockRPCErrCode = -39002 ) -func (m *ManagedMode) Reset(ctx context.Context, unsafe, safe, finalized eth.BlockID) error { - logger := m.log.New("unsafe", unsafe, "safe", safe, "finalized", finalized) - logger.Info("Received reset request", "unsafe", unsafe, "safe", safe, "finalized", finalized) - +func (m *ManagedMode) Reset(ctx context.Context, lUnsafe, xUnsafe, lSafe, xSafe, finalized eth.BlockID) error { + logger := m.log.New( + "localUnsafe", lUnsafe, + "crossUnsafe", xUnsafe, + "localSafe", lSafe, + "crossSafe", xSafe, + "finalized", finalized) + logger.Info("Received reset request", + "localUnsafe", lUnsafe, + "crossUnsafe", xUnsafe, + "localSafe", lSafe, + "crossSafe", xSafe, + "finalized", finalized) verify := func(ref eth.BlockID, name string) (eth.L2BlockRef, 
error) { result, err := m.l2.L2BlockRefByNumber(ctx, ref.Number) if err != nil { @@ -287,26 +296,42 @@ func (m *ManagedMode) Reset(ctx context.Context, unsafe, safe, finalized eth.Blo return result, nil } - // unsafeRef is always unused, as it is either - // - invalid (does not match, and therefore cannot be used for reset) - // - valid, in which case we will use the full unsafe chain for reset - _, err := verify(unsafe, "unsafe") + // verify all provided references + _, err := verify(lUnsafe, "unsafe") + if err != nil { + logger.Error("Cannot reset, local-unsafe block not known") + return err + } + xUnsafeRef, err := verify(xUnsafe, "cross-unsafe") + if err != nil { + logger.Error("Cannot reset, cross-unsafe block not known") + return err + } + lSafeRef, err := verify(lSafe, "safe") if err != nil { + logger.Error("Cannot reset, local-safe block not known") return err } - safeRef, err := verify(safe, "safe") + xSafeRef, err := verify(xSafe, "cross-safe") if err != nil { + logger.Error("Cannot reset, cross-safe block not known") return err } finalizedRef, err := verify(finalized, "finalized") if err != nil { + logger.Error("Cannot reset, finalized block not known") return err } m.emitter.Emit(rollup.ForceResetEvent{ - Unsafe: eth.L2BlockRef{}, - Safe: safeRef, - Finalized: finalizedRef, + // Unsafe is not provided, because it is never considered for reset. + // it is either invalid, in which case we cannot reset to it, + // or valid, in which case we reset to the full chain. 
+ LocalUnsafe: eth.L2BlockRef{}, + CrossUnsafe: xUnsafeRef, + LocalSafe: lSafeRef, + CrossSafe: xSafeRef, + Finalized: finalizedRef, }) return nil } diff --git a/op-node/rollup/sequencing/sequencer_chaos_test.go b/op-node/rollup/sequencing/sequencer_chaos_test.go index 93ba254c1b545..e4bc55154d0aa 100644 --- a/op-node/rollup/sequencing/sequencer_chaos_test.go +++ b/op-node/rollup/sequencing/sequencer_chaos_test.go @@ -107,9 +107,11 @@ func (c *ChaoticEngine) OnEvent(ev event.Event) bool { c.currentPayloadInfo = eth.PayloadInfo{} c.currentAttributes = nil c.emitter.Emit(engine.EngineResetConfirmedEvent{ - Unsafe: c.unsafe, - Safe: c.safe, - Finalized: c.finalized, + LocalUnsafe: c.unsafe, + CrossUnsafe: c.unsafe, + LocalSafe: c.safe, + CrossSafe: c.safe, + Finalized: c.finalized, }) case engine.BuildInvalidEvent: // Engine translates the internal BuildInvalidEvent event diff --git a/op-node/rollup/status/status.go b/op-node/rollup/status/status.go index 26e9ddbc2197e..9937a54731143 100644 --- a/op-node/rollup/status/status.go +++ b/op-node/rollup/status/status.go @@ -118,8 +118,10 @@ func (st *StatusTracker) OnEvent(ev event.Event) bool { st.data.SafeL2 = eth.L2BlockRef{} st.data.CurrentL1 = eth.L1BlockRef{} case engine.EngineResetConfirmedEvent: - st.data.UnsafeL2 = x.Unsafe - st.data.SafeL2 = x.Safe + st.data.UnsafeL2 = x.LocalUnsafe + st.data.CrossUnsafeL2 = x.CrossUnsafe + st.data.LocalSafeL2 = x.LocalSafe + st.data.SafeL2 = x.CrossSafe st.data.FinalizedL2 = x.Finalized default: // other events do not affect the sync status return false diff --git a/op-supervisor/supervisor/backend/backend.go b/op-supervisor/supervisor/backend/backend.go index 6b5b80d5462f2..abd7ebe93e927 100644 --- a/op-supervisor/supervisor/backend/backend.go +++ b/op-supervisor/supervisor/backend/backend.go @@ -522,6 +522,14 @@ func (su *SupervisorBackend) SafeDerivedAt(ctx context.Context, chainID eth.Chai return v.ID(), nil } +func (su *SupervisorBackend) FindSealedBlock(ctx 
context.Context, chainID eth.ChainID, number uint64) (eth.BlockID, error) { + seal, err := su.chainDBs.FindSealedBlock(chainID, number) + if err != nil { + return eth.BlockID{}, err + } + return seal.ID(), nil +} + // AllSafeDerivedAt returns the last derived block for each chain, from the given L1 block func (su *SupervisorBackend) AllSafeDerivedAt(ctx context.Context, source eth.BlockID) (map[eth.ChainID]eth.BlockID, error) { chains := su.depSet.Chains() @@ -548,6 +556,18 @@ func (su *SupervisorBackend) FinalizedL1() eth.BlockRef { return su.chainDBs.FinalizedL1() } +func (su *SupervisorBackend) IsLocalUnsafe(ctx context.Context, chainID eth.ChainID, block eth.BlockID) error { + return su.chainDBs.IsLocalUnsafe(chainID, block) +} + +func (su *SupervisorBackend) IsCrossSafe(ctx context.Context, chainID eth.ChainID, block eth.BlockID) error { + return su.chainDBs.IsCrossSafe(chainID, block) +} + +func (su *SupervisorBackend) IsLocalSafe(ctx context.Context, chainID eth.ChainID, block eth.BlockID) error { + return su.chainDBs.IsLocalSafe(chainID, block) +} + func (su *SupervisorBackend) CrossDerivedToSource(ctx context.Context, chainID eth.ChainID, derived eth.BlockID) (source eth.BlockRef, err error) { v, err := su.chainDBs.CrossDerivedToSourceRef(chainID, derived) if err != nil { diff --git a/op-supervisor/supervisor/backend/db/anchor.go b/op-supervisor/supervisor/backend/db/anchor.go index e862039ff47f3..1e07ee6fbaa9a 100644 --- a/op-supervisor/supervisor/backend/db/anchor.go +++ b/op-supervisor/supervisor/backend/db/anchor.go @@ -58,7 +58,8 @@ func (db *ChainsDB) maybeInitSafeDB(id eth.ChainID, anchor types.DerivedBlockRef if err := db.initializedUpdateCrossSafe(id, anchor.Source, anchor.Derived); err != nil { return err } - db.initializedUpdateLocalSafe(id, anchor.Source, anchor.Derived) + // "anchor" is not a node, so failure to update won't be caught by any SyncNode + db.initializedUpdateLocalSafe(id, anchor.Source, anchor.Derived, "anchor") } else if err != 
nil { return fmt.Errorf("failed to check if chain database is initialized: %w", err) } else { diff --git a/op-supervisor/supervisor/backend/db/db.go b/op-supervisor/supervisor/backend/db/db.go index 13cd9ad306a16..e7b471bd8ead2 100644 --- a/op-supervisor/supervisor/backend/db/db.go +++ b/op-supervisor/supervisor/backend/db/db.go @@ -152,7 +152,7 @@ func (db *ChainsDB) OnEvent(ev event.Event) bool { "chain", x.ChainID, "derived", x.Anchor.Derived, "source", x.Anchor.Source) db.initFromAnchor(x.ChainID, x.Anchor) case superevents.LocalDerivedEvent: - db.UpdateLocalSafe(x.ChainID, x.Derived.Source, x.Derived.Derived) + db.UpdateLocalSafe(x.ChainID, x.Derived.Source, x.Derived.Derived, x.NodeID) case superevents.FinalizedL1RequestEvent: db.onFinalizedL1(x.FinalizedL1) case superevents.ReplaceBlockEvent: diff --git a/op-supervisor/supervisor/backend/db/query.go b/op-supervisor/supervisor/backend/db/query.go index af1e3549c8005..a103ef2f86393 100644 --- a/op-supervisor/supervisor/backend/db/query.go +++ b/op-supervisor/supervisor/backend/db/query.go @@ -76,6 +76,22 @@ func (db *ChainsDB) IsLocalUnsafe(chainID eth.ChainID, block eth.BlockID) error return nil } +func (db *ChainsDB) IsCrossSafe(chainID eth.ChainID, block eth.BlockID) error { + xdb, ok := db.crossDBs.Get(chainID) + if !ok { + return types.ErrUnknownChain + } + return xdb.ContainsDerived(block) +} + +func (db *ChainsDB) IsLocalSafe(chainID eth.ChainID, block eth.BlockID) error { + ldb, ok := db.localDBs.Get(chainID) + if !ok { + return types.ErrUnknownChain + } + return ldb.ContainsDerived(block) +} + func (db *ChainsDB) SafeDerivedAt(chainID eth.ChainID, source eth.BlockID) (types.BlockSeal, error) { lDB, ok := db.localDBs.Get(chainID) if !ok { diff --git a/op-supervisor/supervisor/backend/db/update.go b/op-supervisor/supervisor/backend/db/update.go index 90e036be67eb0..b186600739150 100644 --- a/op-supervisor/supervisor/backend/db/update.go +++ b/op-supervisor/supervisor/backend/db/update.go @@ -82,16 
+82,16 @@ func (db *ChainsDB) Rewind(chain eth.ChainID, headBlock eth.BlockID) error { // UpdateLocalSafe updates the local-safe database with the given source and lastDerived blocks. // It wraps an inner function, blocking the call if the database is not initialized. -func (db *ChainsDB) UpdateLocalSafe(chain eth.ChainID, source eth.BlockRef, lastDerived eth.BlockRef) { +func (db *ChainsDB) UpdateLocalSafe(chain eth.ChainID, source eth.BlockRef, lastDerived eth.BlockRef, nodeId string) { logger := db.logger.New("chain", chain, "source", source, "lastDerived", lastDerived) if !db.isInitialized(chain) { logger.Error("cannot UpdateLocalSafe on uninitialized database", "chain", chain) return } - db.initializedUpdateLocalSafe(chain, source, lastDerived) + db.initializedUpdateLocalSafe(chain, source, lastDerived, nodeId) } -func (db *ChainsDB) initializedUpdateLocalSafe(chain eth.ChainID, source eth.BlockRef, lastDerived eth.BlockRef) { +func (db *ChainsDB) initializedUpdateLocalSafe(chain eth.ChainID, source eth.BlockRef, lastDerived eth.BlockRef, nodeId string) { logger := db.logger.New("chain", chain, "ource", source, "lastDerived", lastDerived) localDB, ok := db.localDBs.Get(chain) if !ok { @@ -105,10 +105,10 @@ func (db *ChainsDB) initializedUpdateLocalSafe(chain eth.ChainID, source eth.Blo return } logger.Warn("Failed to update local safe", "err", err) - db.emitter.Emit(superevents.LocalSafeOutOfSyncEvent{ + db.emitter.Emit(superevents.UpdateLocalSafeFailedEvent{ ChainID: chain, - L1Ref: source, Err: err, + NodeID: nodeId, }) return } diff --git a/op-supervisor/supervisor/backend/rewinder/rewinder_test.go b/op-supervisor/supervisor/backend/rewinder/rewinder_test.go index b01a8d613ca04..14c1d68ab70da 100644 --- a/op-supervisor/supervisor/backend/rewinder/rewinder_test.go +++ b/op-supervisor/supervisor/backend/rewinder/rewinder_test.go @@ -851,7 +851,7 @@ func (s *testSetup) makeBlockSafe(chainID eth.ChainID, block eth.L2BlockRef, l1B Number: block.Number, Time: 
block.Time, ParentHash: block.ParentHash, - }) + }, "test") if makeCrossSafe { require.NoError(s.t, s.chainsDB.UpdateCrossUnsafe(chainID, types.BlockSeal{ diff --git a/op-supervisor/supervisor/backend/superevents/events.go b/op-supervisor/supervisor/backend/superevents/events.go index 7b8b7a41d379c..9c13dcf1eca80 100644 --- a/op-supervisor/supervisor/backend/superevents/events.go +++ b/op-supervisor/supervisor/backend/superevents/events.go @@ -91,16 +91,6 @@ func (ev FinalizedL2UpdateEvent) String() string { return "finalized-l2-update" } -type LocalSafeOutOfSyncEvent struct { - ChainID eth.ChainID - L1Ref eth.BlockRef - Err error -} - -func (ev LocalSafeOutOfSyncEvent) String() string { - return "local-safe-out-of-sync" -} - type LocalUnsafeReceivedEvent struct { ChainID eth.ChainID NewLocalUnsafe eth.BlockRef @@ -113,6 +103,7 @@ func (ev LocalUnsafeReceivedEvent) String() string { type LocalDerivedEvent struct { ChainID eth.ChainID Derived types.DerivedBlockRefPair + NodeID string } func (ev LocalDerivedEvent) String() string { @@ -170,3 +161,13 @@ type ChainRewoundEvent struct { func (ev ChainRewoundEvent) String() string { return "chain-rewound" } + +type UpdateLocalSafeFailedEvent struct { + ChainID eth.ChainID + Err error + NodeID string +} + +func (ev UpdateLocalSafeFailedEvent) String() string { + return "update-local-safe-failed" +} diff --git a/op-supervisor/supervisor/backend/syncnode/controller_test.go b/op-supervisor/supervisor/backend/syncnode/controller_test.go index 9755ab9469160..eb5935f23e9ce 100644 --- a/op-supervisor/supervisor/backend/syncnode/controller_test.go +++ b/op-supervisor/supervisor/backend/syncnode/controller_test.go @@ -25,6 +25,7 @@ type mockSyncControl struct { updateCrossUnsafeFn func(ctx context.Context, derived eth.BlockID) error updateFinalizedFn func(ctx context.Context, id eth.BlockID) error pullEventFn func(ctx context.Context) (*types.ManagedEvent, error) + blockRefByNumFn func(ctx context.Context, number uint64) 
(eth.BlockRef, error) subscribeEvents gethevent.FeedOf[*types.ManagedEvent] } @@ -47,9 +48,9 @@ func (m *mockSyncControl) ProvideL1(ctx context.Context, ref eth.BlockRef) error return nil } -func (m *mockSyncControl) Reset(ctx context.Context, unsafe, safe, finalized eth.BlockID) error { +func (m *mockSyncControl) Reset(ctx context.Context, lUnsafe, xUnsafe, lSafe, xSafe, finalized eth.BlockID) error { if m.resetFn != nil { - return m.resetFn(ctx, unsafe, safe, finalized) + return m.resetFn(ctx, lUnsafe, lSafe, finalized) } return nil } @@ -86,6 +87,13 @@ func (m *mockSyncControl) UpdateFinalized(ctx context.Context, id eth.BlockID) e return nil } +func (m *mockSyncControl) BlockRefByNumber(ctx context.Context, number uint64) (eth.BlockRef, error) { + if m.blockRefByNumFn != nil { + return m.blockRefByNumFn(ctx, number) + } + return eth.BlockRef{}, nil +} + func (m *mockSyncControl) String() string { return "mock" } @@ -93,10 +101,26 @@ func (m *mockSyncControl) String() string { var _ SyncControl = (*mockSyncControl)(nil) type mockBackend struct { - safeDerivedAtFn func(ctx context.Context, chainID eth.ChainID, source eth.BlockID) (eth.BlockID, error) + localSafeFn func(ctx context.Context, chainID eth.ChainID) (pair types.DerivedIDPair, err error) + finalizedFn func(ctx context.Context, chainID eth.ChainID) (eth.BlockID, error) + safeDerivedAtFn func(ctx context.Context, chainID eth.ChainID, source eth.BlockID) (eth.BlockID, error) + findSealedBlockFn func(ctx context.Context, chainID eth.ChainID, num uint64) (eth.BlockID, error) + isLocalSafeFn func(ctx context.Context, chainID eth.ChainID, blockID eth.BlockID) error + isCrossSafeFn func(ctx context.Context, chainID eth.ChainID, blockID eth.BlockID) error + isLocalUnsafeFn func(ctx context.Context, chainID eth.ChainID, blockID eth.BlockID) error +} + +func (m *mockBackend) FindSealedBlock(ctx context.Context, chainID eth.ChainID, num uint64) (eth.BlockID, error) { + if m.findSealedBlockFn != nil { + return 
m.findSealedBlockFn(ctx, chainID, num) + } + return eth.BlockID{}, nil } func (m *mockBackend) LocalSafe(ctx context.Context, chainID eth.ChainID) (pair types.DerivedIDPair, err error) { + if m.localSafeFn != nil { + return m.localSafeFn(ctx, chainID) + } return types.DerivedIDPair{}, nil } @@ -108,6 +132,27 @@ func (m *mockBackend) LocalUnsafe(ctx context.Context, chainID eth.ChainID) (eth return eth.BlockID{}, nil } +func (m *mockBackend) IsLocalSafe(ctx context.Context, chainID eth.ChainID, blockID eth.BlockID) error { + if m.isLocalSafeFn != nil { + return m.isLocalSafeFn(ctx, chainID, blockID) + } + return nil +} + +func (m *mockBackend) IsCrossSafe(ctx context.Context, chainID eth.ChainID, blockID eth.BlockID) error { + if m.isCrossSafeFn != nil { + return m.isCrossSafeFn(ctx, chainID, blockID) + } + return nil +} + +func (m *mockBackend) IsLocalUnsafe(ctx context.Context, chainID eth.ChainID, blockID eth.BlockID) error { + if m.isLocalUnsafeFn != nil { + return m.isLocalUnsafeFn(ctx, chainID, blockID) + } + return nil +} + func (m *mockBackend) SafeDerivedAt(ctx context.Context, chainID eth.ChainID, source eth.BlockID) (derived eth.BlockID, err error) { if m.safeDerivedAtFn != nil { return m.safeDerivedAtFn(ctx, chainID, source) @@ -116,6 +161,9 @@ func (m *mockBackend) SafeDerivedAt(ctx context.Context, chainID eth.ChainID, so } func (m *mockBackend) Finalized(ctx context.Context, chainID eth.ChainID) (eth.BlockID, error) { + if m.finalizedFn != nil { + return m.finalizedFn(ctx, chainID) + } return eth.BlockID{}, nil } @@ -123,6 +171,10 @@ func (m *mockBackend) L1BlockRefByNumber(ctx context.Context, number uint64) (et return eth.L1BlockRef{}, nil } +func (m *mockBackend) CrossUnsafe(ctx context.Context, chainID eth.ChainID) (eth.BlockID, error) { + return eth.BlockID{}, nil +} + var _ backend = (*mockBackend)(nil) func sampleDepSet(t *testing.T) depset.DependencySet { diff --git a/op-supervisor/supervisor/backend/syncnode/iface.go 
b/op-supervisor/supervisor/backend/syncnode/iface.go index a8970a67d6a63..0fc1f833da3d5 100644 --- a/op-supervisor/supervisor/backend/syncnode/iface.go +++ b/op-supervisor/supervisor/backend/syncnode/iface.go @@ -36,6 +36,7 @@ type SyncSource interface { type SyncControl interface { SubscribeEvents(ctx context.Context, c chan *types.ManagedEvent) (ethereum.Subscription, error) PullEvent(ctx context.Context) (*types.ManagedEvent, error) + BlockRefByNumber(ctx context.Context, number uint64) (eth.BlockRef, error) UpdateCrossUnsafe(ctx context.Context, id eth.BlockID) error UpdateCrossSafe(ctx context.Context, derived eth.BlockID, source eth.BlockID) error @@ -43,7 +44,7 @@ type SyncControl interface { InvalidateBlock(ctx context.Context, seal types.BlockSeal) error - Reset(ctx context.Context, unsafe, safe, finalized eth.BlockID) error + Reset(ctx context.Context, lUnsafe, xUnsafe, lSafe, xSafe, finalized eth.BlockID) error ProvideL1(ctx context.Context, nextL1 eth.BlockRef) error AnchorPoint(ctx context.Context) (types.DerivedBlockRefPair, error) diff --git a/op-supervisor/supervisor/backend/syncnode/node.go b/op-supervisor/supervisor/backend/syncnode/node.go index e577e8ffc4052..f99957f2509db 100644 --- a/op-supervisor/supervisor/backend/syncnode/node.go +++ b/op-supervisor/supervisor/backend/syncnode/node.go @@ -3,10 +3,9 @@ package syncnode import ( "context" "errors" - "fmt" "io" - "strings" "sync" + "sync/atomic" "time" "github.com/ethereum-optimism/optimism/op-service/rpc" @@ -23,11 +22,17 @@ import ( ) type backend interface { - LocalSafe(ctx context.Context, chainID eth.ChainID) (pair types.DerivedIDPair, err error) LocalUnsafe(ctx context.Context, chainID eth.ChainID) (eth.BlockID, error) + CrossUnsafe(ctx context.Context, chainID eth.ChainID) (eth.BlockID, error) + LocalSafe(ctx context.Context, chainID eth.ChainID) (pair types.DerivedIDPair, err error) CrossSafe(ctx context.Context, chainID eth.ChainID) (pair types.DerivedIDPair, err error) - 
SafeDerivedAt(ctx context.Context, chainID eth.ChainID, source eth.BlockID) (derived eth.BlockID, err error) Finalized(ctx context.Context, chainID eth.ChainID) (eth.BlockID, error) + + FindSealedBlock(ctx context.Context, chainID eth.ChainID, number uint64) (eth.BlockID, error) + IsLocalSafe(ctx context.Context, chainID eth.ChainID, block eth.BlockID) error + IsCrossSafe(ctx context.Context, chainID eth.ChainID, block eth.BlockID) error + IsLocalUnsafe(ctx context.Context, chainID eth.ChainID, block eth.BlockID) error + SafeDerivedAt(ctx context.Context, chainID eth.ChainID, source eth.BlockID) (derived eth.BlockID, err error) L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error) } @@ -57,6 +62,11 @@ type ManagedNode struct { ctx context.Context cancel context.CancelFunc wg sync.WaitGroup + + lastNodeLocalUnsafe eth.BlockID + lastNodeLocalSafe eth.BlockID + + resetTracker *resetTracker } var _ event.AttachEmitter = (*ManagedNode)(nil) @@ -72,6 +82,12 @@ func NewManagedNode(log log.Logger, id eth.ChainID, node SyncControl, backend ba ctx: ctx, cancel: cancel, } + m.resetTracker = &resetTracker{ + managed: m, + synchronous: noSubscribe, + cancelling: &atomic.Bool{}, + resetting: &atomic.Bool{}, + } if !noSubscribe { m.SubscribeToNodeEvents() } @@ -84,7 +100,22 @@ func (m *ManagedNode) AttachEmitter(em event.Emitter) { } func (m *ManagedNode) OnEvent(ev event.Event) bool { + // if we're resetting, ignore all events + if m.resetTracker.isResetting() { + // even if we are resetting, cancel the reset if the L1 rewinds + if _, ok := ev.(superevents.ChainRewoundEvent); ok { + m.resetTracker.cancelReset() + } + m.log.Debug("Ignoring event during ongoing reset", "event", ev) + return false + } switch x := ev.(type) { + case superevents.UpdateLocalSafeFailedEvent: + if x.ChainID != m.chainID || + x.NodeID != m.Node.String() { + return false + } + m.onUpdateLocalSafeFailed(x) case superevents.InvalidateLocalSafeEvent: if x.ChainID != m.chainID { 
return false @@ -105,16 +136,10 @@ func (m *ManagedNode) OnEvent(ev event.Event) bool { return false } m.onFinalizedL2(x.FinalizedL2) - case superevents.LocalSafeOutOfSyncEvent: - if x.ChainID != m.chainID { - return false - } - m.resetFromError(x.Err, x.L1Ref) case superevents.ChainRewoundEvent: if x.ChainID != m.chainID { return false } - m.sendReset() default: return false } @@ -202,6 +227,10 @@ func (m *ManagedNode) PullEvents(ctx context.Context) (pulledAny bool, err error } func (m *ManagedNode) onNodeEvent(ev *types.ManagedEvent) { + if m.resetTracker.isResetting() { + m.log.Debug("Ignoring event during ongoing reset", "event", ev) + return + } if ev == nil { m.log.Warn("Received nil event") return @@ -226,15 +255,42 @@ func (m *ManagedNode) onNodeEvent(ev *types.ManagedEvent) { } } +// onResetEvent handles a reset event from the node func (m *ManagedNode) onResetEvent(errStr string) { m.log.Warn("Node sent us a reset error", "err", errStr) - if strings.Contains(errStr, "cannot continue derivation until Engine has been reset") { - // TODO - return + m.resetFullRange() +} + +func (m *ManagedNode) onUpdateLocalSafeFailed(ev superevents.UpdateLocalSafeFailedEvent) { + switch { + case errors.Is(ev.Err, types.ErrConflict): + m.log.Warn("DB indicated a conflict, checking consistency") + m.resetIfInconsistent() + case errors.Is(ev.Err, types.ErrFuture): + m.log.Warn("DB indicated an update is in the future, checking if node is ahead") + m.resetIfAhead() + } +} + +// OnResetReady handles a reset-ready event from the supervisor +// once the supervisor has determined the reset target by bisecting the search range +func (m *ManagedNode) OnResetReady(lUnsafe, xUnsafe, lSafe, xSafe, finalized eth.BlockID) { + m.log.Info("Reset ready event received", + "localUnsafe", lUnsafe, + "crossUnsafe", xUnsafe, + "localSafe", lSafe, + "crossSafe", xSafe, + "finalized", finalized) + ctx, cancel := context.WithTimeout(m.ctx, nodeTimeout) + defer cancel() + // whether the reset passes 
or fails, this ongoing reset is done + m.resetTracker.endReset() + if err := m.Node.Reset(ctx, + lUnsafe, xUnsafe, + lSafe, xSafe, + finalized); err != nil { + m.log.Error("Failed to reset node", "err", err) } - // Try and restore the safe head of the op-supervisor. - // The node will abort the reset until we find a block that is known. - m.sendReset() } func (m *ManagedNode) onCrossUnsafeUpdate(seal types.BlockSeal) { @@ -279,6 +335,8 @@ func (m *ManagedNode) onUnsafeBlock(unsafeRef eth.BlockRef) { ChainID: m.chainID, NewLocalUnsafe: unsafeRef, }) + m.lastNodeLocalUnsafe = unsafeRef.ID() + m.resetIfInconsistent() } func (m *ManagedNode) onDerivationUpdate(pair types.DerivedBlockRefPair) { @@ -287,7 +345,10 @@ func (m *ManagedNode) onDerivationUpdate(pair types.DerivedBlockRefPair) { m.emitter.Emit(superevents.LocalDerivedEvent{ ChainID: m.chainID, Derived: pair, + NodeID: m.Node.String(), }) + m.lastNodeLocalSafe = pair.Derived.ID() + m.resetIfInconsistent() } func (m *ManagedNode) onDerivationOriginUpdate(origin eth.BlockRef) { @@ -298,164 +359,6 @@ func (m *ManagedNode) onDerivationOriginUpdate(origin eth.BlockRef) { }) } -// resetFromError considers an incoming error signal, and an optional L1 block reference, -// and calls specific reset handling, or passes the call along to the default reset. -func (m *ManagedNode) resetFromError(errSignal error, l1Ref eth.BlockRef) { - switch { - case errors.Is(errSignal, types.ErrConflict): - // conflicts must be resolved via walkback - if err := m.walkback(l1Ref); err != nil { - m.log.Warn("Failed to walkback", "l1Ref", l1Ref, "err", err) - } - case errors.Is(errSignal, types.ErrOutOfOrder): - // if the out of order signal shows the node is far enough behind, - // push a reset to attempt to get the node to tip more quickly. 
- if m.farBehind(l1Ref.ID()) { - m.log.Warn("Node is far behind and should be reset", "l1Ref", l1Ref, "err", errSignal) - // m.sendReset() - } else { - // otherwise, ignore the out of order signal, the node is near enough to the tip. - m.log.Warn("Node is behind, ignoring", "l1Ref", l1Ref, "err", errSignal) - } - case errors.Is(errSignal, types.ErrFuture): - // if the node is in the future, we need to reset it back to the tip of the supervisor - m.sendReset() - } -} - -// farBehindThreshold is the heuristic threshold for determining if the node is far behind. -var farBehindThreshold = uint64(20) - -// farBehind checks if the node is far behind the given reference block. -// it a heuristic to determine if the node is far behind and should be reset. -func (m *ManagedNode) farBehind(ref eth.BlockID) bool { - ctx, cancel := context.WithTimeout(m.ctx, internalTimeout) - defer cancel() - latest, err := m.backend.LocalSafe(ctx, m.chainID) - if err != nil { - m.log.Warn("Failed to retrieve local-safe", "err", err) - return false - } - // can't be far behind if the latest is lower than the threshold already - if latest.Source.Number < uint64(farBehindThreshold) { - return false - } - // if even after pushing the latest back by the threshold, - // the ref is still behind, then we are far behind. 
- return ref.Number < latest.Source.Number-farBehindThreshold -} - -func (m *ManagedNode) walkback(l1Ref eth.L1BlockRef) error { - ctx, cancel := context.WithTimeout(m.ctx, internalTimeout) - defer cancel() - - u, err := m.backend.LocalUnsafe(ctx, m.chainID) - if err != nil { - return fmt.Errorf("failed to retrieve local-unsafe: %w", err) - } - f, err := m.backend.Finalized(ctx, m.chainID) - if err != nil { - if errors.Is(err, types.ErrFuture) { - f = eth.BlockID{Number: 0} - } else { - return fmt.Errorf("failed to retrieve finalized: %w", err) - } - } - if err := m.resolveConflict(ctx, l1Ref, u, f); err != nil { - return fmt.Errorf("failed to resolve conflict: %w", err) - } - return nil -} - -func (m *ManagedNode) sendReset() { - ctx, cancel := context.WithTimeout(m.ctx, internalTimeout) - defer cancel() - - u, err := m.backend.LocalUnsafe(ctx, m.chainID) - if err != nil { - m.log.Warn("Failed to retrieve local-unsafe", "err", err) - return - } - s, err := m.backend.CrossSafe(ctx, m.chainID) - if err != nil { - m.log.Warn("Failed to retrieve cross-safe", "err", err) - return - } - f, err := m.backend.Finalized(ctx, m.chainID) - if err != nil { - if errors.Is(err, types.ErrFuture) { - f = eth.BlockID{Number: 0} - } else { - m.log.Warn("Failed to retrieve finalized", "err", err) - return - } - } - - if err := m.Node.Reset(ctx, u, s.Derived, f); err != nil { - m.log.Warn("Node failed to reset", "err", err) - return - } -} - -// resolveConflict attempts to reset the node to a valid state when a conflict is detected. -// It first tries using the latest safe block, and if that fails, walks back block by block -// until it finds a common ancestor or reaches the finalized block. 
-func (m *ManagedNode) resolveConflict(ctx context.Context, l1Ref eth.BlockRef, u eth.BlockID, f eth.BlockID) error { - // First try to reset to the last known safe block - s, err := m.backend.SafeDerivedAt(ctx, m.chainID, l1Ref.ID()) - if err != nil { - return fmt.Errorf("failed to retrieve safe block for %v: %w", l1Ref.ID(), err) - } - - // Helper to attempt a reset and classify the error - tryReset := func(safe eth.BlockID) (resolved bool, needsWalkback bool, err error) { - m.log.Debug("Attempting reset", "unsafe", u, "safe", safe, "finalized", f) - if err := m.Node.Reset(ctx, u, safe, f); err == nil { - return true, false, nil - } else { - var rpcErr *gethrpc.JsonError - if errors.As(err, &rpcErr) && (rpcErr.Code == blockNotFoundRPCErrCode || rpcErr.Code == conflictingBlockRPCErrCode) { - return false, true, err - } - return false, false, err - } - } - - // Try initial reset - resolved, needsWalkback, err := tryReset(s) - if resolved { - return nil - } - if !needsWalkback { - return fmt.Errorf("error during reset: %w", err) - } - - // Walk back one block at a time looking for a common ancestor - currentBlock := s.Number - for i := 0; i < maxWalkBackAttempts; i++ { - currentBlock-- - if currentBlock <= f.Number { - return fmt.Errorf("reached finalized block %d without finding common ancestor", f.Number) - } - - safe, err := m.backend.SafeDerivedAt(ctx, m.chainID, eth.BlockID{Number: currentBlock}) - if err != nil { - return fmt.Errorf("failed to retrieve safe block %d: %w", currentBlock, err) - } - - resolved, _, err := tryReset(safe) - if resolved { - return nil - } - // Continue walking back on walkable errors, otherwise return the error - var rpcErr *gethrpc.JsonError - if !errors.As(err, &rpcErr) || (rpcErr.Code != blockNotFoundRPCErrCode && rpcErr.Code != conflictingBlockRPCErrCode) { - return fmt.Errorf("error during reset at block %d: %w", currentBlock, err) - } - } - return fmt.Errorf("exceeded maximum walk-back attempts (%d)", maxWalkBackAttempts) -} - 
func (m *ManagedNode) onExhaustL1Event(completed types.DerivedBlockRefPair) { m.log.Info("Node completed syncing", "l2", completed.Derived, "l1", completed.Source) @@ -516,3 +419,72 @@ func (m *ManagedNode) Close() error { } return nil } + +// resetIfInconsistent checks if the node is consistent with the logs db +// and initiates a bisection based reset preparation if it is +func (m *ManagedNode) resetIfInconsistent() { + ctx, cancel := context.WithTimeout(m.ctx, internalTimeout) + defer cancel() + + var last eth.BlockID + + // check if the last unsafe block we saw is consistent with the logs db + err := m.backend.IsLocalUnsafe(ctx, m.chainID, m.lastNodeLocalUnsafe) + if errors.Is(err, types.ErrConflict) { + m.log.Warn("local unsafe block is inconsistent with logs db. Initiating reset", + "lastUnsafeblock", m.lastNodeLocalUnsafe, + "err", err) + last = m.lastNodeLocalUnsafe + } + + // check if the last safe block we saw is consistent with the local safe db + err = m.backend.IsLocalSafe(ctx, m.chainID, m.lastNodeLocalSafe) + if errors.Is(err, types.ErrConflict) { + m.log.Warn("local safe block is inconsistent with logs db. Initiating reset", + "lastSafeblock", m.lastNodeLocalSafe, + "err", err) + last = m.lastNodeLocalSafe + } + + // there is inconsistency. 
begin the reset process + if last != (eth.BlockID{}) { + m.resetTracker.beginBisectionReset(last) + } else { + m.log.Debug("no inconsistency found") + } +} + +// resetIfAhead checks if the node is ahead of the logs db +// and initiates a bisection based reset preparation if it is +func (m *ManagedNode) resetIfAhead() { + ctx, cancel := context.WithTimeout(m.ctx, internalTimeout) + defer cancel() + + // get the last local safe block + lastDBLocalSafe, err := m.backend.LocalSafe(ctx, m.chainID) + if err != nil { + m.log.Error("failed to get last local safe block", "err", err) + return + } + // if the node is ahead of the logs db, initiate a reset + // with the end of the range being the last safe block in the db + if m.lastNodeLocalSafe.Number > lastDBLocalSafe.Derived.Number { + m.log.Warn("local safe block on node is ahead of logs db. Initiating reset", + "lastNodeLocalSafe", m.lastNodeLocalSafe, + "lastDBLocalSafe", lastDBLocalSafe.Derived) + m.resetTracker.beginBisectionReset(lastDBLocalSafe.Derived) + } +} + +// resetFullRange resets the node using the last block in the db +// as the end of the range to search for the last consistent block +func (m *ManagedNode) resetFullRange() { + internalCtx, iCancel := context.WithTimeout(m.ctx, internalTimeout) + defer iCancel() + dbLast, err := m.backend.LocalUnsafe(internalCtx, m.chainID) + if err != nil { + m.log.Error("failed to get last local unsafe block", "err", err) + return + } + m.resetTracker.beginBisectionReset(dbLast) +} diff --git a/op-supervisor/supervisor/backend/syncnode/node_test.go b/op-supervisor/supervisor/backend/syncnode/node_test.go index 573d5ca80bbee..be65699968880 100644 --- a/op-supervisor/supervisor/backend/syncnode/node_test.go +++ b/op-supervisor/supervisor/backend/syncnode/node_test.go @@ -2,7 +2,6 @@ package syncnode import ( "context" - "fmt" "testing" "time" @@ -11,8 +10,8 @@ import ( "github.com/ethereum-optimism/optimism/op-service/testlog" 
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/superevents" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" - gethrpc "github.com/ethereum/go-ethereum/rpc" "github.com/stretchr/testify/require" ) @@ -93,97 +92,95 @@ func TestEventResponse(t *testing.T) { }, 4*time.Second, 250*time.Millisecond) } -func TestResetConflict(t *testing.T) { +func TestPrepareReset(t *testing.T) { chainID := eth.ChainIDFromUInt64(1) - logger := testlog.Logger(t, log.LvlDebug) - - tests := []struct { - name string - resetErrors []error - expectAttempts int - expectError bool - l1RefNum uint64 - finalizedNum uint64 - }{ - { - name: "succeeds_first_try", - resetErrors: []error{nil}, - expectAttempts: 1, - expectError: false, - l1RefNum: 100, - finalizedNum: 50, - }, - { - name: "walks_back_on_block_not_found", - resetErrors: []error{ - &gethrpc.JsonError{Code: blockNotFoundRPCErrCode}, - &gethrpc.JsonError{Code: blockNotFoundRPCErrCode}, - nil, - }, - expectAttempts: 3, - expectError: false, - l1RefNum: 100, - finalizedNum: 50, - }, - { - name: "handles_finalized_boundary", - resetErrors: []error{ - &gethrpc.JsonError{Code: blockNotFoundRPCErrCode}, - }, - expectAttempts: 1, - expectError: true, - l1RefNum: 100, - finalizedNum: 99, - }, - { - name: "stops_after_max_attempts_exceeded", - resetErrors: func() []error { - // Generate more errors than we allow attempts for - errors := make([]error, maxWalkBackAttempts+100) - for i := range errors { - errors[i] = &gethrpc.JsonError{Code: blockNotFoundRPCErrCode} - } - return errors - }(), - // We expect the max number of attempts to be made, plus one for the initial attempt - expectAttempts: maxWalkBackAttempts + 1, - expectError: true, - l1RefNum: 1000, - finalizedNum: 1, - }, + logger := testlog.Logger(t, log.LvlInfo) + syncCtrl := &mockSyncControl{} + backend := &mockBackend{} + + ex := 
event.NewGlobalSynchronous(context.Background()) + eventSys := event.NewSystem(logger, ex) + + mon := &eventMonitor{} + eventSys.Register("monitor", mon, event.DefaultRegisterOpts()) + + node := NewManagedNode(logger, chainID, syncCtrl, backend, false) + eventSys.Register("node", node, event.DefaultRegisterOpts()) + + // mock: return a block of the same number as requested + syncCtrl.blockRefByNumFn = func(ctx context.Context, number uint64) (eth.BlockRef, error) { + return eth.BlockRef{Number: number, Hash: common.Hash{0xaa}}, nil } - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - resetAttempts := 0 - ctrl := &mockSyncControl{ - resetFn: func(ctx context.Context, unsafe, safe, finalized eth.BlockID) error { - resetAttempts++ - if resetAttempts > len(tc.resetErrors) { - return fmt.Errorf("unexpected reset attempt %d", resetAttempts) - } - return tc.resetErrors[resetAttempts-1] - }, - } - backend := &mockBackend{ - safeDerivedAtFn: func(ctx context.Context, chainID eth.ChainID, source eth.BlockID) (eth.BlockID, error) { - return eth.BlockID{Number: source.Number}, nil - }, - } - - node := NewManagedNode(logger, chainID, ctrl, backend, true) - l1Ref := eth.BlockRef{Number: tc.l1RefNum} - unsafe := eth.BlockID{Number: tc.l1RefNum + 100} - finalized := eth.BlockID{Number: tc.finalizedNum} - - err := node.resolveConflict(context.Background(), l1Ref, unsafe, finalized) - - require.Equal(t, tc.expectAttempts, resetAttempts, "incorrect number of reset attempts") - if tc.expectError { - require.Error(t, err) - } else { - require.NoError(t, err) - } - }) + // mock: control whether the blocks appear valid or not + var pivot uint64 + backend.isLocalUnsafeFn = func(ctx context.Context, chainID eth.ChainID, id eth.BlockID) error { + if id.Number > uint64(pivot) { + return types.ErrConflict + } + return nil + } + + // mock: record the reset signal given to the node + var unsafe, safe, finalized eth.BlockID + var resetCalled int + syncCtrl.resetFn = func(ctx 
context.Context, u, s, f eth.BlockID) error { + unsafe = u + safe = s + finalized = f + resetCalled++ + return nil } + + // test that the bisection finds the correct block, + // anywhere inside the min-max range + min, max := uint64(1), uint64(235) + for i := min; i < max; i++ { + node.resetTracker.a = eth.BlockID{Number: min, Hash: common.Hash{0xaa}} + node.resetTracker.z = eth.BlockID{Number: max} + pivot = i + node.resetTracker.bisectToTarget() + require.Equal(t, i, unsafe.Number) + require.Equal(t, uint64(0), safe.Number) + require.Equal(t, uint64(0), finalized.Number) + } + + // test that when the end of range (z) is known to the node, + // the reset request is made with the end of the range as the safe block + for i := min; i < max; i++ { + node.resetTracker.a = eth.BlockID{Number: min} + node.resetTracker.z = eth.BlockID{Number: max, Hash: common.Hash{0xaa}} + pivot = 0 + node.resetTracker.bisectToTarget() + require.Equal(t, max, unsafe.Number) + } + + // mock: return local safe and finalized blocks which are *ahead* of the pivot + backend.localSafeFn = func(ctx context.Context, chainID eth.ChainID) (types.DerivedIDPair, error) { + return types.DerivedIDPair{ + Derived: eth.BlockID{Number: pivot + 1}, + }, nil + } + backend.finalizedFn = func(ctx context.Context, chainID eth.ChainID) (eth.BlockID, error) { + return eth.BlockID{Number: pivot + 1}, nil + } + // test that the bisection finds the correct block, + // AND that the safe and finalized blocks are updated to match the unsafe block + for i := min; i < max; i++ { + node.resetTracker.a = eth.BlockID{Number: min, Hash: common.Hash{0xaa}} + node.resetTracker.z = eth.BlockID{Number: max} + pivot = i + node.resetTracker.bisectToTarget() + require.Equal(t, i, unsafe.Number) + require.Equal(t, i, safe.Number) + require.Equal(t, i, finalized.Number) + } + + // test that the reset function is not called if start of the range (a) is unknown + resetCount := resetCalled + node.resetTracker.a = eth.BlockID{Number: 
0, Hash: common.Hash{0xbb}} + node.resetTracker.z = eth.BlockID{Number: max} + pivot = 40 + node.resetTracker.bisectToTarget() + require.Equal(t, resetCount, resetCalled) } diff --git a/op-supervisor/supervisor/backend/syncnode/reset.go b/op-supervisor/supervisor/backend/syncnode/reset.go new file mode 100644 index 0000000000000..0d73fd41875ec --- /dev/null +++ b/op-supervisor/supervisor/backend/syncnode/reset.go @@ -0,0 +1,264 @@ +package syncnode + +import ( + "context" + "errors" + "sync/atomic" + + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" + "github.com/ethereum/go-ethereum" +) + +// resetTracker manages a bisection +// between consistent and inconsistent blocks +// and is used to prepare a reset request +// which is sent to the managed node +type resetTracker struct { + a eth.BlockID + z eth.BlockID + + synchronous bool + resetting *atomic.Bool + cancelling *atomic.Bool + + managed *ManagedNode +} + +// init initializes the reset tracker with +// empty start and end of range, and no reset in progress +func (t *resetTracker) init() { + t.resetting.Store(true) + t.cancelling.Store(false) + t.a = eth.BlockID{} + t.z = eth.BlockID{} +} + +// beginBisectionReset initializes the reset tracker +// and starts the bisection process at the given block +// which will lead to a reset request +func (t *resetTracker) beginBisectionReset(z eth.BlockID) { + t.managed.log.Info("beginning reset", "endOfRange", z) + // only one reset can be in progress at a time + if t.resetting.Load() { + return + } + // initialize the reset tracker + t.init() + t.z = z + // action tests may prefer to run the managed node totally synchronously + if t.synchronous { + t.bisectToTarget() + } else { + go t.bisectToTarget() + } +} + +// endReset signals that the reset is over +func (t *resetTracker) endReset() { + t.resetting.Store(false) + t.cancelling.Store(false) +} + +// isResetting returns true if a reset is 
in progress +func (t *resetTracker) isResetting() bool { + return t.resetting.Load() +} + +// cancelReset signals that the ongoing reset should be cancelled +// it is not guaranteed that the reset will be cancelled immediately +func (t *resetTracker) cancelReset() { + t.cancelling.Store(true) +} + +// bisectToTarget prepares the reset by bisecting the search range until the last consistent block is found. +// it then calls resetHeadsFromTarget to trigger the reset on the node. +func (t *resetTracker) bisectToTarget() { + nodeCtx, nCancel := context.WithTimeout(t.managed.ctx, nodeTimeout) + defer nCancel() + internalCtx, iCancel := context.WithTimeout(t.managed.ctx, internalTimeout) + defer iCancel() + + // initialize the start of the range if it is empty + if t.a == (eth.BlockID{}) { + t.managed.log.Debug("start of range is empty, finding the first block") + var err error + t.a, err = t.managed.backend.FindSealedBlock(internalCtx, t.managed.chainID, 0) + if err != nil { + t.managed.log.Error("failed to initialize start of bisection range", "err", err) + t.endReset() + return + } + } + + // before starting bisection, check if z is already consistent (i.e. the node is ahead but otherwise consistent) + nodeZ, err := t.managed.Node.BlockRefByNumber(nodeCtx, t.z.Number) + // if z is already consistent, we can skip the bisection + // and move straight to a targeted reset + if err == nil && nodeZ.ID() == t.z { + t.resetHeadsFromTarget(t.z) + return + } + + // before starting bisection, check if a is inconsistent (i.e. the node has no common reference point) + // if the first block in the range can't be found or is inconsistent, we can't do a reset + nodeA, err := t.managed.Node.BlockRefByNumber(nodeCtx, t.a.Number) + if err != nil { + t.managed.log.Error("failed to get block at start of range. cannot reset node", "err", err) + t.endReset() + return + } + if nodeA.ID() != t.a { + t.managed.log.Error("start of range is inconsistent with logs db. 
cannot reset node", + "a", t.a, + "block", nodeA.ID()) + t.endReset() + return + } + + // repeatedly bisect the range until the last consistent block is found + for { + if t.cancelling.Load() { + t.managed.log.Debug("reset cancelled") + t.endReset() + return + } + if t.a.Number >= t.z.Number { + t.managed.log.Debug("reset target converged. Resetting to start of range", "a", t.a, "z", t.z) + t.resetHeadsFromTarget(t.a) + return + } + if t.a.Number+1 == t.z.Number { + break + } + err := t.bisect() + if err != nil { + t.managed.log.Error("failed to bisect recovery range. cannot reset node", "err", err) + t.endReset() + return + } + } + // the bisection is now complete. a is the last consistent block, and z is the first inconsistent block + t.resetHeadsFromTarget(t.a) +} + +// bisect halves the search range of the ongoing reset to narrow down +// where the reset will target. It bisects the range and constrains either +// the start or the end of the range, based on the consistency of the midpoint +// with the logs db. +func (t *resetTracker) bisect() error { + internalCtx, iCancel := context.WithTimeout(t.managed.ctx, internalTimeout) + defer iCancel() + nodeCtx, nCancel := context.WithTimeout(t.managed.ctx, nodeTimeout) + defer nCancel() + + // attempt to get the block at the midpoint of the range + i := (t.a.Number + t.z.Number) / 2 + nodeIRef, err := t.managed.Node.BlockRefByNumber(nodeCtx, i) + if err != nil { + // if the block is not known to the node, it is defacto inconsistent + if errors.Is(err, ethereum.NotFound) { + t.managed.log.Trace("midpoint of range is not known to node. pulling back end of range", "i", i) + t.z = eth.BlockID{Number: i} + return nil + } else { + t.managed.log.Error("failed to get block at midpoint of range. 
cannot reset node", "err", err) + } + } + + // check if the block at i is consistent with the logs db + // and update the search range accordingly + nodeI := nodeIRef.ID() + err = t.managed.backend.IsLocalUnsafe(internalCtx, t.managed.chainID, nodeI) + if err != nil { + t.managed.log.Trace("midpoint of range is inconsistent with logs db. pulling back end of range", "i", i) + t.z = nodeI + } else { + t.managed.log.Trace("midpoint of range is consistent with logs db. pushing up start of range", "i", i) + t.a = nodeI + } + return nil +} + +// resetHeadsFromTarget takes a target block and identifies the correct +// unsafe, safe, and finalized blocks to target for the reset. +// It then triggers the reset on the node. +func (t *resetTracker) resetHeadsFromTarget(target eth.BlockID) { + internalCtx, iCancel := context.WithTimeout(t.managed.ctx, internalTimeout) + defer iCancel() + + // if the target is empty, no reset can be done + if target == (eth.BlockID{}) { + t.managed.log.Error("no reset target found. cannot reset node") + t.endReset() + return + } + + t.managed.log.Info("reset target identified", "target", target) + var lUnsafe, xUnsafe, lSafe, xSafe, finalized eth.BlockID + + // the unsafe block is always the last block we found to be consistent + lUnsafe = target + + // all other blocks are either the last consistent block, or the last block in the db, whichever is earlier + // cross unsafe + lastXUnsafe, err := t.managed.backend.CrossUnsafe(internalCtx, t.managed.chainID) + if err != nil { + t.managed.log.Error("failed to get last cross unsafe block. cancelling reset", "err", err) + t.endReset() + return + } + if lastXUnsafe.Number < target.Number { + xUnsafe = lastXUnsafe + } else { + xUnsafe = target + } + // local safe + lastLSafe, err := t.managed.backend.LocalSafe(internalCtx, t.managed.chainID) + if err != nil { + t.managed.log.Error("failed to get last safe block. 
cancelling reset", "err", err) + t.endReset() + return + } + if lastLSafe.Derived.Number < target.Number { + lSafe = lastLSafe.Derived + } else { + lSafe = target + } + // cross safe + lastXSafe, err := t.managed.backend.CrossSafe(internalCtx, t.managed.chainID) + if err != nil { + t.managed.log.Error("failed to get last cross safe block. cancelling reset", "err", err) + t.endReset() + return + } + if lastXSafe.Derived.Number < target.Number { + xSafe = lastXSafe.Derived + } else { + xSafe = target + } + // finalized + lastFinalized, err := t.managed.backend.Finalized(internalCtx, t.managed.chainID) + if errors.Is(err, types.ErrFuture) { + t.managed.log.Warn("finalized block is not yet known", "err", err) + lastFinalized = eth.BlockID{} + } else if err != nil { + t.managed.log.Error("failed to get last finalized block. cancelling reset", "err", err) + t.endReset() + return + } + if lastFinalized.Number < target.Number { + finalized = lastFinalized + } else { + finalized = target + } + + // trigger the reset + t.managed.log.Info("triggering reset on node", + "localUnsafe", lUnsafe, + "crossUnsafe", xUnsafe, + "localSafe", lSafe, + "crossSafe", xSafe, + "finalized", finalized) + t.managed.OnResetReady(lUnsafe, xUnsafe, lSafe, xSafe, finalized) +} diff --git a/op-supervisor/supervisor/backend/syncnode/rpc.go b/op-supervisor/supervisor/backend/syncnode/rpc.go index 526c352bdc264..8d2c33196ea71 100644 --- a/op-supervisor/supervisor/backend/syncnode/rpc.go +++ b/op-supervisor/supervisor/backend/syncnode/rpc.go @@ -126,8 +126,8 @@ func (rs *RPCSyncNode) InvalidateBlock(ctx context.Context, seal types.BlockSeal return rs.cl.CallContext(ctx, nil, "interop_invalidateBlock", seal) } -func (rs *RPCSyncNode) Reset(ctx context.Context, unsafe, safe, finalized eth.BlockID) error { - return rs.cl.CallContext(ctx, nil, "interop_reset", unsafe, safe, finalized) +func (rs *RPCSyncNode) Reset(ctx context.Context, lUnsafe, xUnsafe, lSafe, xSafe, finalized eth.BlockID) error { + return 
rs.cl.CallContext(ctx, nil, "interop_reset", lUnsafe, xUnsafe, lSafe, xSafe, finalized) } func (rs *RPCSyncNode) ProvideL1(ctx context.Context, nextL1 eth.BlockRef) error {