Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions op-node/rollup/attributes/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type EngineController interface {
TryUpdateLocalSafe(ctx context.Context, ref eth.L2BlockRef, concluding bool, source eth.L1BlockRef)
// RequestForkchoiceUpdate requests a forkchoice update
RequestForkchoiceUpdate(ctx context.Context)
RequestPendingSafeUpdate(ctx context.Context)
}

type L2 interface {
Expand Down Expand Up @@ -93,7 +94,7 @@ func (eq *AttributesHandler) OnEvent(ctx context.Context, ev event.Event) bool {
eq.sentAttributes = false
eq.emitter.Emit(ctx, derive.ConfirmReceivedAttributesEvent{})
// to make sure we have a pre-state signal to process the attributes from
eq.emitter.Emit(ctx, engine.PendingSafeRequestEvent{})
eq.engineController.RequestPendingSafeUpdate(ctx)
case rollup.ResetEvent:
eq.forceResetLocked()
case rollup.EngineTemporaryErrorEvent:
Expand All @@ -108,7 +109,7 @@ func (eq *AttributesHandler) OnEvent(ctx context.Context, ev event.Event) bool {
eq.attributes = nil
// Time to re-evaluate without attributes.
// (the pending-safe state will then be forwarded to our source of attributes).
eq.emitter.Emit(ctx, engine.PendingSafeRequestEvent{})
eq.engineController.RequestPendingSafeUpdate(ctx)
case engine.PayloadSealExpiredErrorEvent:
if x.DerivedFrom == (eth.L1BlockRef{}) {
return true // from sequencing
Expand All @@ -125,7 +126,7 @@ func (eq *AttributesHandler) OnEvent(ctx context.Context, ev event.Event) bool {
"build_id", x.Info.ID, "timestamp", x.Info.Timestamp, "err", x.Err)
eq.sentAttributes = false
eq.attributes = nil
eq.emitter.Emit(ctx, engine.PendingSafeRequestEvent{})
eq.engineController.RequestPendingSafeUpdate(ctx)
default:
return false
}
Expand Down
21 changes: 14 additions & 7 deletions op-node/rollup/attributes/attributes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,19 @@ func TestAttributesHandler(t *testing.T) {
ah.AttachEmitter(emitter)

emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{})
emitter.ExpectOnce(engine.PendingSafeRequestEvent{})
engDeriver.On("RequestPendingSafeUpdate", context.Background()).Once()
ah.OnEvent(context.Background(), derive.DerivedAttributesEvent{
Attributes: attrA1,
})
engDeriver.AssertExpectations(t)
emitter.AssertExpectations(t)
require.NotNil(t, ah.attributes, "queue the invalid attributes")

emitter.ExpectOnce(engine.PendingSafeRequestEvent{})
engDeriver.On("RequestPendingSafeUpdate", context.Background()).Once()
ah.OnEvent(context.Background(), engine.InvalidPayloadAttributesEvent{
Attributes: attrA1,
})
engDeriver.AssertExpectations(t)
emitter.AssertExpectations(t)
require.Nil(t, ah.attributes, "drop the invalid attributes")
})
Expand All @@ -198,10 +200,11 @@ func TestAttributesHandler(t *testing.T) {
ah.AttachEmitter(emitter)

emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{})
emitter.ExpectOnce(engine.PendingSafeRequestEvent{})
engDeriver.On("RequestPendingSafeUpdate", context.Background()).Once()
ah.OnEvent(context.Background(), derive.DerivedAttributesEvent{
Attributes: attrA1,
})
engDeriver.AssertExpectations(t)
emitter.AssertExpectations(t)
require.NotNil(t, ah.attributes)
// New attributes will have to get generated after processing the last ones
Expand All @@ -224,10 +227,11 @@ func TestAttributesHandler(t *testing.T) {
ah.AttachEmitter(emitter)

emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{})
emitter.ExpectOnce(engine.PendingSafeRequestEvent{})
engDeriver.On("RequestPendingSafeUpdate", context.Background()).Once()
ah.OnEvent(context.Background(), derive.DerivedAttributesEvent{
Attributes: attrA1,
})
engDeriver.AssertExpectations(t)
emitter.AssertExpectations(t)
require.NotNil(t, ah.attributes)

Expand All @@ -252,8 +256,9 @@ func TestAttributesHandler(t *testing.T) {

// attrA1Alt does not match block A1, so will cause force-reorg.
emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{})
emitter.ExpectOnce(engine.PendingSafeRequestEvent{})
engDeriver.On("RequestPendingSafeUpdate", context.Background()).Once()
ah.OnEvent(context.Background(), derive.DerivedAttributesEvent{Attributes: attrA1Alt})
engDeriver.AssertExpectations(t)
emitter.AssertExpectations(t)
require.NotNil(t, ah.attributes, "queued up derived attributes")

Expand Down Expand Up @@ -295,8 +300,9 @@ func TestAttributesHandler(t *testing.T) {
DerivedFrom: refB,
}
emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{})
emitter.ExpectOnce(engine.PendingSafeRequestEvent{})
engDeriver.On("RequestPendingSafeUpdate", context.Background()).Once()
ah.OnEvent(context.Background(), derive.DerivedAttributesEvent{Attributes: attr})
engDeriver.AssertExpectations(t)
emitter.AssertExpectations(t)
require.NotNil(t, ah.attributes, "queued up derived attributes")

Expand Down Expand Up @@ -344,8 +350,9 @@ func TestAttributesHandler(t *testing.T) {
ah.AttachEmitter(emitter)

emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{})
emitter.ExpectOnce(engine.PendingSafeRequestEvent{})
engDeriver.On("RequestPendingSafeUpdate", context.Background()).Once()
ah.OnEvent(context.Background(), derive.DerivedAttributesEvent{Attributes: attrA1Alt})
engDeriver.AssertExpectations(t)
emitter.AssertExpectations(t)
require.NotNil(t, ah.attributes, "queued up derived attributes")

Expand Down
4 changes: 4 additions & 0 deletions op-node/rollup/attributes/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,7 @@ func (m *MockEngineController) TryUpdateLocalSafe(ctx context.Context, ref eth.L
func (m *MockEngineController) RequestForkchoiceUpdate(ctx context.Context) {
m.Mock.MethodCalled("RequestForkchoiceUpdate", ctx)
}

func (m *MockEngineController) RequestPendingSafeUpdate(ctx context.Context) {
m.Mock.MethodCalled("RequestPendingSafeUpdate", ctx)
}
2 changes: 2 additions & 0 deletions op-node/rollup/clsync/clsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ func (f *fakeEngController) TryUpdatePendingSafe(ctx context.Context, ref eth.L2
}
func (f *fakeEngController) TryUpdateLocalSafe(ctx context.Context, ref eth.L2BlockRef, concluding bool, source eth.L1BlockRef) {
}
func (f *fakeEngController) RequestPendingSafeUpdate(ctx context.Context) {
}

func TestCLSync_InvalidPayloadDropsHead(t *testing.T) {
logger := testlog.Logger(t, 0)
Expand Down
2 changes: 1 addition & 1 deletion op-node/rollup/driver/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func (s *SyncDeriver) SyncStep() {
// Instead, we request the engine to repeat where its pending-safe head is at.
// Upon the pending-safe signal the attributes deriver can then ask the pipeline
// to generate new attributes, if no attributes are known already.
s.Emitter.Emit(s.Ctx, engine.PendingSafeRequestEvent{})
s.Engine.RequestPendingSafeUpdate(s.Ctx)

}

Expand Down
13 changes: 7 additions & 6 deletions op-node/rollup/engine/engine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,12 +692,6 @@ func (d *EngineController) OnEvent(ctx context.Context, ev event.Event) bool {
case PromoteCrossUnsafeEvent:
d.SetCrossUnsafeHead(x.Ref)
d.onUnsafeUpdate(ctx, x.Ref, d.UnsafeL2Head())
case PendingSafeRequestEvent:
d.emitter.Emit(ctx, PendingSafeUpdateEvent{
PendingSafe: d.PendingSafeL2Head(),
Unsafe: d.UnsafeL2Head(),
})

case LocalSafeUpdateEvent:
// pre-interop everything that is local-unsafe is also immediately cross-unsafe.
if !d.rollupCfg.IsInterop(x.Ref.Time) {
Expand Down Expand Up @@ -755,6 +749,13 @@ func (d *EngineController) OnEvent(ctx context.Context, ev event.Event) bool {
return true
}

func (d *EngineController) RequestPendingSafeUpdate(ctx context.Context) {
d.emitter.Emit(ctx, PendingSafeUpdateEvent{
PendingSafe: d.PendingSafeL2Head(),
Unsafe: d.UnsafeL2Head(),
})
}

// TryUpdatePendingSafe updates the pending safe head if the new reference is newer
func (e *EngineController) TryUpdatePendingSafe(ctx context.Context, ref eth.L2BlockRef, concluding bool, source eth.L1BlockRef) {
// Only promote if not already stale.
Expand Down
7 changes: 0 additions & 7 deletions op-node/rollup/engine/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,6 @@ func (ev SafeDerivedEvent) String() string {
return "safe-derived"
}

type PendingSafeRequestEvent struct {
}

func (ev PendingSafeRequestEvent) String() string {
return "pending-safe-request"
}

type ProcessUnsafePayloadEvent struct {
Envelope *eth.ExecutionPayloadEnvelope
}
Expand Down
2 changes: 2 additions & 0 deletions op-node/rollup/sequencing/sequencer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ func (fakeEngController) TryUpdatePendingSafe(ctx context.Context, ref eth.L2Blo
func (fakeEngController) TryUpdateLocalSafe(ctx context.Context, ref eth.L2BlockRef, concluding bool, source eth.L1BlockRef) {
}

func (fakeEngController) RequestPendingSafeUpdate(ctx context.Context) {}

// TestSequencer_StartStop runs through start/stop state back and forth to test state changes.
func TestSequencer_StartStop(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
Expand Down
11 changes: 6 additions & 5 deletions op-program/client/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ func NewDriver(logger log.Logger, cfg *rollup.Config, depSet derive.DependencySe
engResetDeriv.SetEngController(ec)

prog := &ProgramDeriver{
logger: logger,
Emitter: d,
closing: false,
result: eth.L2BlockRef{},
targetBlockNum: targetBlockNum,
logger: logger,
Emitter: d,
engineController: ec,
closing: false,
result: eth.L2BlockRef{},
targetBlockNum: targetBlockNum,
}

d.deriver = &event.DeriverMux{
Expand Down
14 changes: 10 additions & 4 deletions op-program/client/driver/program.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
"github.com/ethereum-optimism/optimism/op-service/event"
)

type EngineController interface {
RequestPendingSafeUpdate(context.Context)
}

// ProgramDeriver expresses how engine and derivation events are
// translated and monitored to execute the pure L1 to L2 state transition.
//
Expand All @@ -22,6 +26,8 @@ type ProgramDeriver struct {

Emitter event.Emitter

engineController EngineController

closing bool
result eth.L2BlockRef
resultError error
Expand All @@ -42,11 +48,11 @@ func (d *ProgramDeriver) OnEvent(ctx context.Context, ev event.Event) bool {
d.Emitter.Emit(ctx, derive.ConfirmPipelineResetEvent{})
// After initial reset we can request the pending-safe block,
// where attributes will be generated on top of.
d.Emitter.Emit(ctx, engine.PendingSafeRequestEvent{})
d.engineController.RequestPendingSafeUpdate(ctx)
case engine.PendingSafeUpdateEvent:
d.Emitter.Emit(ctx, derive.PipelineStepEvent{PendingSafe: x.PendingSafe})
case derive.DeriverMoreEvent:
d.Emitter.Emit(ctx, engine.PendingSafeRequestEvent{})
d.engineController.RequestPendingSafeUpdate(ctx)
case derive.DerivedAttributesEvent:
// Allow new attributes to be generated.
// We will process the current attributes synchronously,
Expand All @@ -59,7 +65,7 @@ func (d *ProgramDeriver) OnEvent(ctx context.Context, ev event.Event) bool {
case engine.InvalidPayloadAttributesEvent:
// If a set of attributes was invalid, then we drop the attributes,
// and continue with the next.
d.Emitter.Emit(ctx, engine.PendingSafeRequestEvent{})
d.engineController.RequestPendingSafeUpdate(ctx)
case engine.ForkchoiceUpdateEvent:
// Track latest head.
if x.SafeL2Head.Number >= d.result.Number {
Expand Down Expand Up @@ -94,7 +100,7 @@ func (d *ProgramDeriver) OnEvent(ctx context.Context, ev event.Event) bool {
// (Legacy case): While most temporary errors are due to requests for external data failing which can't happen,
// they may also be returned due to other events like channels timing out so need to be handled
d.logger.Warn("Temporary error in derivation", "err", x.Err)
d.Emitter.Emit(ctx, engine.PendingSafeRequestEvent{})
d.engineController.RequestPendingSafeUpdate(ctx)
case rollup.CriticalErrorEvent:
d.closing = true
d.resultError = x.Err
Expand Down
35 changes: 16 additions & 19 deletions op-program/client/driver/program_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,21 @@ var (
errTestCrit = errors.New("crit test err")
)

type fakeEngineController struct{}

var _ EngineController = fakeEngineController{}

func (fakeEngineController) RequestPendingSafeUpdate(ctx context.Context) {}

func TestProgramDeriver(t *testing.T) {
newProgram := func(t *testing.T, target uint64) (*ProgramDeriver, *testutils.MockEmitter) {
m := &testutils.MockEmitter{}
logger := testlog.Logger(t, log.LevelInfo)
prog := &ProgramDeriver{
logger: logger,
Emitter: m,
targetBlockNum: target,
logger: logger,
engineController: fakeEngineController{},
Emitter: m,
targetBlockNum: target,
}
return prog, m
}
Expand All @@ -39,7 +46,6 @@ func TestProgramDeriver(t *testing.T) {
t.Run("engine reset confirmed", func(t *testing.T) {
p, m := newProgram(t, 1000)
m.ExpectOnce(derive.ConfirmPipelineResetEvent{})
m.ExpectOnce(engine.PendingSafeRequestEvent{})
p.OnEvent(context.Background(), engine.EngineResetConfirmedEvent{})
m.AssertExpectations(t)
require.False(t, p.closing)
Expand All @@ -60,7 +66,6 @@ func TestProgramDeriver(t *testing.T) {
// step 3: if no attributes are generated, loop back to derive more.
t.Run("deriver more", func(t *testing.T) {
p, m := newProgram(t, 1000)
m.ExpectOnce(engine.PendingSafeRequestEvent{})
p.OnEvent(context.Background(), derive.DeriverMoreEvent{})
m.AssertExpectations(t)
require.False(t, p.closing)
Expand All @@ -80,7 +85,6 @@ func TestProgramDeriver(t *testing.T) {
// step 5: if attributes were invalid, continue with derivation for new attributes.
t.Run("invalid payload", func(t *testing.T) {
p, m := newProgram(t, 1000)
m.ExpectOnce(engine.PendingSafeRequestEvent{})
p.OnEvent(context.Background(), engine.InvalidPayloadAttributesEvent{Attributes: &derive.AttributesWithParent{}})
m.AssertExpectations(t)
require.False(t, p.closing)
Expand Down Expand Up @@ -113,49 +117,42 @@ func TestProgramDeriver(t *testing.T) {
})
// Do not stop processing when the deriver is idle, the engine may still be busy and create further events.
t.Run("deriver idle", func(t *testing.T) {
p, m := newProgram(t, 1000)
p, _ := newProgram(t, 1000)
p.OnEvent(context.Background(), derive.DeriverIdleEvent{})
m.AssertExpectations(t)
require.False(t, p.closing)
require.NoError(t, p.resultError)
})
// on inconsistent chain data: stop with error
t.Run("reset event", func(t *testing.T) {
p, m := newProgram(t, 1000)
p, _ := newProgram(t, 1000)
p.OnEvent(context.Background(), rollup.ResetEvent{Err: errTestReset})
m.AssertExpectations(t)
require.True(t, p.closing)
require.Error(t, p.resultError)
})
// on L1 temporary error: stop with error
t.Run("L1 temporary error event", func(t *testing.T) {
p, m := newProgram(t, 1000)
p, _ := newProgram(t, 1000)
p.OnEvent(context.Background(), rollup.L1TemporaryErrorEvent{Err: errTestTemp})
m.AssertExpectations(t)
require.True(t, p.closing)
require.Error(t, p.resultError)
})
// on engine temporary error: continue derivation (because legacy, not all connection related)
t.Run("engine temp error event", func(t *testing.T) {
p, m := newProgram(t, 1000)
m.ExpectOnce(engine.PendingSafeRequestEvent{})
p, _ := newProgram(t, 1000)
p.OnEvent(context.Background(), rollup.EngineTemporaryErrorEvent{Err: errTestTemp})
m.AssertExpectations(t)
require.False(t, p.closing)
require.NoError(t, p.resultError)
})
// on critical error: stop
t.Run("critical error event", func(t *testing.T) {
p, m := newProgram(t, 1000)
p, _ := newProgram(t, 1000)
p.OnEvent(context.Background(), rollup.ResetEvent{Err: errTestCrit})
m.AssertExpectations(t)
require.True(t, p.closing)
require.Error(t, p.resultError)
})
t.Run("unknown event", func(t *testing.T) {
p, m := newProgram(t, 1000)
p, _ := newProgram(t, 1000)
p.OnEvent(context.Background(), TestEvent{})
m.AssertExpectations(t)
require.False(t, p.closing)
require.NoError(t, p.resultError)
})
Expand Down