Skip to content
Merged
2 changes: 1 addition & 1 deletion op-e2e/actions/helpers/l2_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher,
Log: log,
Ctx: ctx,
Drain: executor.Drain,
ManagedMode: false,
ManagedMode: managedMode,
}, opts)

sys.Register("engine", engine.NewEngDeriver(log, ctx, cfg, metrics, ec), opts)
Expand Down
8 changes: 1 addition & 7 deletions op-e2e/actions/interop/dsl/dsl.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,12 @@ type InteropDSL struct {
func NewInteropDSL(t helpers.Testing) *InteropDSL {
setup := SetupInterop(t)
actors := setup.CreateActors()
actors.PrepareChainState(t)

t.Logf("ChainA: %v, ChainB: %v", actors.ChainA.ChainID, actors.ChainB.ChainID)

allChains := []*Chain{actors.ChainA, actors.ChainB}

// Get all the initial events processed
for _, chain := range allChains {
chain.Sequencer.ActL2PipelineFull(t)
chain.Sequencer.SyncSupervisor(t)
}
actors.Supervisor.ProcessFull(t)

superRootSource, err := NewSuperRootSource(
t.Ctx(),
actors.ChainA.Sequencer.RollupClient(),
Expand Down
31 changes: 31 additions & 0 deletions op-e2e/actions/interop/dsl/interop.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,37 @@ type InteropActors struct {
ChainB *Chain
}

func (actors *InteropActors) PrepareChainState(t helpers.Testing) {
// Initialize both chain states
actors.ChainA.Sequencer.ActL2PipelineFull(t)
actors.ChainB.Sequencer.ActL2PipelineFull(t)
t.Log("Sequencers should initialize, and produce initial reset requests")

// Process the anchor point
actors.Supervisor.ProcessFull(t)
t.Log("Supervisor should have anchor points now")

// Sync supervisors, i.e. the reset request makes it to the supervisor now
actors.ChainA.Sequencer.SyncSupervisor(t)
actors.ChainB.Sequencer.SyncSupervisor(t)
t.Log("Supervisor has events now")

// Pick up the reset request
actors.Supervisor.ProcessFull(t)
t.Log("Supervisor processed initial resets")

// Process reset work
actors.ChainA.Sequencer.ActL2PipelineFull(t)
actors.ChainB.Sequencer.ActL2PipelineFull(t)
t.Log("Processed!")

// Verify initial state
statusA := actors.ChainA.Sequencer.SyncStatus()
statusB := actors.ChainB.Sequencer.SyncStatus()
require.Equal(t, uint64(0), statusA.UnsafeL2.Number)
require.Equal(t, uint64(0), statusB.UnsafeL2.Number)
}

// messageExpiryTime is the time in seconds that a message will be valid for on the L2 chain.
// At a 2 second block time, this should be small enough to cover all events buffered in the supervisor event queue.
const messageExpiryTime = 120 // 2 minutes
Expand Down
21 changes: 1 addition & 20 deletions op-e2e/actions/interop/emitter_contract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestEmitterContract(gt *testing.T) {
actors = is.CreateActors()
aliceA = setupUser(t, is, actors.ChainA, 0)
aliceB = setupUser(t, is, actors.ChainB, 0)
initializeChainState(t, actors)
actors.PrepareChainState(t)
emitTx = initializeEmitterContractTest(t, aliceA, actors)
}

Expand Down Expand Up @@ -145,25 +145,6 @@ func idForTx(t helpers.Testing, tx *types.Transaction, srcChain *dsl.Chain) inbo
}
}

func initializeChainState(t helpers.Testing, actors *dsl.InteropActors) {
// Initialize both chain states
actors.ChainA.Sequencer.ActL2PipelineFull(t)
actors.ChainB.Sequencer.ActL2PipelineFull(t)

// Sync supervisors
actors.ChainA.Sequencer.SyncSupervisor(t)
actors.ChainB.Sequencer.SyncSupervisor(t)

// Verify initial state
statusA := actors.ChainA.Sequencer.SyncStatus()
statusB := actors.ChainB.Sequencer.SyncStatus()
require.Equal(t, uint64(0), statusA.UnsafeL2.Number)
require.Equal(t, uint64(0), statusB.UnsafeL2.Number)

// Complete initial sync
actors.Supervisor.ProcessFull(t)
}

func initializeEmitterContractTest(t helpers.Testing, aliceA *userWithKeys, actors *dsl.InteropActors) *types.Transaction {
// Deploy message contract and emit a log on ChainA
// This issues two blocks to ChainA
Expand Down
30 changes: 4 additions & 26 deletions op-e2e/actions/interop/interop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ func TestFullInterop(gt *testing.T) {

is := dsl.SetupInterop(t)
actors := is.CreateActors()

// get both sequencers set up
actors.ChainA.Sequencer.ActL2PipelineFull(t)
actors.ChainB.Sequencer.ActL2PipelineFull(t)
actors.PrepareChainState(t)

// sync the supervisor, handle initial events emitted by the nodes
actors.ChainA.Sequencer.SyncSupervisor(t)
Expand Down Expand Up @@ -168,10 +165,7 @@ func TestFinality(gt *testing.T) {
testFinality := func(t helpers.StatefulTesting, extraBlocks int) {
is := dsl.SetupInterop(t)
actors := is.CreateActors()

// set up a blank ChainA
actors.ChainA.Sequencer.ActL2PipelineFull(t)
actors.ChainA.Sequencer.SyncSupervisor(t)
actors.PrepareChainState(t)

actors.Supervisor.ProcessFull(t)

Expand Down Expand Up @@ -250,15 +244,7 @@ func TestInteropLocalSafeInvalidation(gt *testing.T) {

is := dsl.SetupInterop(t)
actors := is.CreateActors()

// get both sequencers set up
actors.ChainA.Sequencer.ActL2PipelineFull(t)
actors.ChainB.Sequencer.ActL2PipelineFull(t)

// sync the supervisor, handle initial events emitted by the nodes
actors.ChainA.Sequencer.SyncSupervisor(t)
actors.ChainB.Sequencer.SyncSupervisor(t)
actors.Supervisor.ProcessFull(t)
actors.PrepareChainState(t)

genesisB := actors.ChainB.Sequencer.SyncStatus()

Expand Down Expand Up @@ -376,15 +362,7 @@ func TestInteropCrossSafeDependencyDelay(gt *testing.T) {

is := dsl.SetupInterop(t)
actors := is.CreateActors()

// get both sequencers set up
actors.ChainA.Sequencer.ActL2PipelineFull(t)
actors.ChainB.Sequencer.ActL2PipelineFull(t)

// sync the supervisor, handle initial events emitted by the nodes
actors.ChainA.Sequencer.SyncSupervisor(t)
actors.ChainB.Sequencer.SyncSupervisor(t)
actors.Supervisor.ProcessFull(t)
actors.PrepareChainState(t)

// We create a batch with some empty blocks before and after the cross-chain message,
// so multiple L2 blocks are all derived from the same L1 block.
Expand Down
6 changes: 1 addition & 5 deletions op-e2e/actions/interop/reset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@ func TestReset(gt *testing.T) {

is := dsl.SetupInterop(t)
actors := is.CreateActors()

// get both sequencers set up
// sync the supervisor, handle initial events emitted by the nodes
actors.ChainA.Sequencer.ActL2PipelineFull(t)
actors.ChainA.Sequencer.SyncSupervisor(t)
actors.PrepareChainState(t)

// No blocks yet
status := actors.ChainA.Sequencer.SyncStatus()
Expand Down
17 changes: 7 additions & 10 deletions op-node/rollup/driver/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,11 +367,11 @@ func (s *SyncDeriver) onEngineConfirmedReset(x engine.EngineResetConfirmedEvent)
// and don't confirm the engine-reset with the derivation pipeline.
// The pipeline will re-trigger a reset as necessary.
if s.SafeHeadNotifs != nil {
if err := s.SafeHeadNotifs.SafeHeadReset(x.Safe); err != nil {
s.Log.Error("Failed to warn safe-head notifier of safe-head reset", "safe", x.Safe)
if err := s.SafeHeadNotifs.SafeHeadReset(x.CrossSafe); err != nil {
s.Log.Error("Failed to warn safe-head notifier of safe-head reset", "safe", x.CrossSafe)
return
}
if s.SafeHeadNotifs.Enabled() && x.Safe.ID() == s.Config.Genesis.L2 {
if s.SafeHeadNotifs.Enabled() && x.CrossSafe.ID() == s.Config.Genesis.L2 {
// The rollup genesis block is always safe by definition. So if the pipeline resets this far back we know
// we will process all safe head updates and can record genesis as always safe from L1 genesis.
// Note that it is not safe to use cfg.Genesis.L1 here as it is the block immediately before the L2 genesis
Expand All @@ -382,23 +382,20 @@ func (s *SyncDeriver) onEngineConfirmedReset(x engine.EngineResetConfirmedEvent)
s.Log.Error("Failed to retrieve L1 genesis, cannot notify genesis as safe block", "err", err)
return
}
if err := s.SafeHeadNotifs.SafeHeadUpdated(x.Safe, l1Genesis.ID()); err != nil {
if err := s.SafeHeadNotifs.SafeHeadUpdated(x.CrossSafe, l1Genesis.ID()); err != nil {
s.Log.Error("Failed to notify safe-head listener of safe-head", "err", err)
return
}
}
}
s.Log.Info("Confirming pipeline reset")
s.Emitter.Emit(derive.ConfirmPipelineResetEvent{})
}

func (s *SyncDeriver) onResetEvent(x rollup.ResetEvent) {
if s.ManagedMode {
if errors.Is(x.Err, derive.ErrEngineResetReq) {
s.Log.Warn("Managed Mode is enabled, but engine reset is required", "err", x.Err)
s.Emitter.Emit(engine.ResetEngineRequestEvent{})
} else {
s.Log.Warn("Encountered reset, waiting for op-supervisor to recover", "err", x.Err)
}
s.Log.Warn("Encountered reset in Managed Mode, waiting for op-supervisor", "err", x.Err)
// ManagedMode will pick up the ResetEvent
return
}
// If the system corrupts, e.g. due to a reorg, simply reset it
Expand Down
53 changes: 53 additions & 0 deletions op-node/rollup/engine/engine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,56 @@ func (e *EngineController) checkForkchoiceUpdatedStatus(status eth.ExecutePayloa
return status == eth.ExecutionValid
}

// initializeUnknowns is important to give the op-node EngineController engine state.
// Pre-interop, the initial reset triggered a find-sync-start, and filled the forkchoice.
// This still happens, but now overrides what may be initialized here.
// Post-interop, the op-supervisor may diff the forkchoice state against the supervisor DB,
// to determine where to perform the initial reset to.
func (e *EngineController) initializeUnknowns(ctx context.Context) error {
if e.unsafeHead == (eth.L2BlockRef{}) {
ref, err := e.engine.L2BlockRefByLabel(ctx, eth.Unsafe)
if err != nil {
return fmt.Errorf("failed to load local-unsafe head: %w", err)
}
e.SetUnsafeHead(ref)
e.log.Info("Loaded initial local-unsafe block ref", "local_unsafe", ref)
}
var finalizedRef eth.L2BlockRef
if e.finalizedHead == (eth.L2BlockRef{}) {
var err error
finalizedRef, err = e.engine.L2BlockRefByLabel(ctx, eth.Finalized)
if err != nil {
return fmt.Errorf("failed to load finalized head: %w", err)
}
e.SetFinalizedHead(finalizedRef)
e.log.Info("Loaded initial finalized block ref", "finalized", finalizedRef)
}
if e.safeHead == (eth.L2BlockRef{}) {
ref, err := e.engine.L2BlockRefByLabel(ctx, eth.Safe)
if err != nil {
if errors.Is(err, ethereum.NotFound) {
// If the engine doesn't have a safe head, then we can use the finalized head
e.SetSafeHead(finalizedRef)
e.log.Info("Loaded initial cross-safe block from finalized", "cross_safe", finalizedRef)
} else {
return fmt.Errorf("failed to load cross-safe head: %w", err)
}
} else {
e.SetSafeHead(ref)
e.log.Info("Loaded initial cross-safe block ref", "cross_safe", ref)
}
}
if e.crossUnsafeHead == (eth.L2BlockRef{}) {
e.SetCrossUnsafeHead(e.safeHead) // preserve cross-safety, don't fall back to a non-cross safety level
e.log.Info("Set initial cross-unsafe block ref to match cross-safe", "cross_unsafe", e.safeHead)
}
if e.localSafeHead == (eth.L2BlockRef{}) {
e.SetLocalSafeHead(e.safeHead)
e.log.Info("Set initial local-safe block ref to match cross-safe", "local_safe", e.safeHead)
}
return nil
}

// TryUpdateEngine attempts to update the engine with the current forkchoice state of the rollup node,
// this is a no-op if the nodes already agree on the forkchoice state.
func (e *EngineController) TryUpdateEngine(ctx context.Context) error {
Expand All @@ -271,6 +321,9 @@ func (e *EngineController) TryUpdateEngine(ctx context.Context) error {
if e.IsEngineSyncing() {
e.log.Warn("Attempting to update forkchoice state while EL syncing")
}
if err := e.initializeUnknowns(ctx); err != nil {
return derive.NewTemporaryError(fmt.Errorf("cannot update engine until engine forkchoice is initialized: %w", err))
}
if e.unsafeHead.Number < e.finalizedHead.Number {
err := fmt.Errorf("invalid forkchoice state, unsafe head %s is behind finalized head %s", e.unsafeHead, e.finalizedHead)
e.emitter.Emit(rollup.CriticalErrorEvent{Err: err}) // make the node exit, things are very wrong.
Expand Down
9 changes: 6 additions & 3 deletions op-node/rollup/engine/engine_reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
// ResetEngineRequestEvent requests the EngineResetDeriver to walk
// the L2 chain backwards until it finds a plausible unsafe head,
// and find an L2 safe block that is guaranteed to still be from the L1 chain.
// This event is not used in interop.
type ResetEngineRequestEvent struct{}

func (ev ResetEngineRequestEvent) String() string {
Expand Down Expand Up @@ -56,9 +57,11 @@ func (d *EngineResetDeriver) OnEvent(ev event.Event) bool {
return true
}
d.emitter.Emit(rollup.ForceResetEvent{
Unsafe: result.Unsafe,
Safe: result.Safe,
Finalized: result.Finalized,
LocalUnsafe: result.Unsafe,
CrossUnsafe: result.Unsafe,
LocalSafe: result.Safe,
CrossSafe: result.Safe,
Finalized: result.Finalized,
})
default:
return false
Expand Down
53 changes: 38 additions & 15 deletions op-node/rollup/engine/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,11 @@ func (ev TryUpdateEngineEvent) getBlockProcessingMetrics() []interface{} {
}

type EngineResetConfirmedEvent struct {
Unsafe, Safe, Finalized eth.L2BlockRef
LocalUnsafe eth.L2BlockRef
CrossUnsafe eth.L2BlockRef
LocalSafe eth.L2BlockRef
CrossSafe eth.L2BlockRef
Finalized eth.L2BlockRef
}

func (ev EngineResetConfirmedEvent) String() string {
Expand Down Expand Up @@ -430,10 +434,22 @@ func (d *EngDeriver) OnEvent(ev event.Event) bool {
// Time to apply the changes to the underlying engine
d.emitter.Emit(TryUpdateEngineEvent{})

log.Debug("Reset of Engine is completed",
"safeHead", x.Safe, "unsafe", x.Unsafe, "safe_timestamp", x.Safe.Time,
"unsafe_timestamp", x.Unsafe.Time)
d.emitter.Emit(EngineResetConfirmedEvent(x))
v := EngineResetConfirmedEvent{
LocalUnsafe: d.ec.LocalSafeL2Head(),
CrossUnsafe: d.ec.CrossUnsafeL2Head(),
LocalSafe: d.ec.LocalSafeL2Head(),
CrossSafe: d.ec.SafeL2Head(),
Finalized: d.ec.Finalized(),
}
// We do not emit the original event values, since those might not be set (optional attributes).
d.emitter.Emit(v)
d.log.Info("Reset of Engine is completed",
"local_unsafe", v.LocalUnsafe,
"cross_unsafe", v.CrossUnsafe,
"local_safe", v.LocalSafe,
"cross_safe", v.CrossSafe,
"finalized", v.Finalized,
)
case PromoteUnsafeEvent:
// Backup unsafeHead when new block is not built on original unsafe head.
if d.ec.unsafeHead.Number >= x.Ref.Number {
Expand Down Expand Up @@ -570,24 +586,31 @@ func (d *EngDeriver) OnEvent(ev event.Event) bool {

type ResetEngineControl interface {
SetUnsafeHead(eth.L2BlockRef)
SetCrossUnsafeHead(ref eth.L2BlockRef)
SetLocalSafeHead(ref eth.L2BlockRef)
SetSafeHead(eth.L2BlockRef)
SetFinalizedHead(eth.L2BlockRef)
SetLocalSafeHead(ref eth.L2BlockRef)
SetCrossUnsafeHead(ref eth.L2BlockRef)
SetBackupUnsafeL2Head(block eth.L2BlockRef, triggerReorg bool)
SetPendingSafeL2Head(eth.L2BlockRef)
}

// ForceEngineReset is not to be used. The op-program needs it for now, until event processing is adopted there.
func ForceEngineReset(ec ResetEngineControl, x rollup.ForceResetEvent) {
// if the unsafe head is not provided, do not override the existing unsafe head
if x.Unsafe != (eth.L2BlockRef{}) {
ec.SetUnsafeHead(x.Unsafe)
// local-unsafe is an optional attribute, empty to preserve the existing latest chain
if x.LocalUnsafe != (eth.L2BlockRef{}) {
ec.SetUnsafeHead(x.LocalUnsafe)
}
ec.SetLocalSafeHead(x.Safe)
ec.SetPendingSafeL2Head(x.Safe)
// cross-safe is fine to revert back, it does not affect engine logic, just sync-status
ec.SetCrossUnsafeHead(x.CrossUnsafe)

// derivation continues at local-safe point
ec.SetLocalSafeHead(x.LocalSafe)
ec.SetPendingSafeL2Head(x.LocalSafe)

// "safe" in RPC terms is cross-safe
ec.SetSafeHead(x.CrossSafe)

// finalized head
ec.SetFinalizedHead(x.Finalized)
ec.SetSafeHead(x.Safe)
ec.SetCrossUnsafeHead(x.Safe)

ec.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false)
}
Loading