diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index 8ccdcc1c542ad..0e0da3b8002d6 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -7,13 +7,13 @@ import ( "io" "time" - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/sync" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" ) @@ -30,6 +30,29 @@ type Engine interface { // Max number of unsafe payloads that may be queued up for execution const maxUnsafePayloads = 50 +// finalityLookback defines the amount of L1<>L2 relations to track for finalization purposes, one per L1 block. +// +// When L1 finalizes blocks, it finalizes finalityLookback blocks behind the L1 head. +// Non-finality may take longer, but when it does finalize again, it is within this range of the L1 head. +// Thus we only need to retain the L1<>L2 derivation relation data of this many L1 blocks. +// +// In the event of older finalization signals, misconfiguration, or insufficient L1<>L2 derivation relation data, +// then we may miss the opportunity to finalize more L2 blocks. +// This does not cause any divergence, it just causes lagging finalization status. +// +// The beacon chain on mainnet has 32 slots per epoch, +// and new finalization events happen at most 4 epochs behind the head. +// And then we add 1 to make pruning easier by leaving room for a new item without pruning the 32*4. +const finalityLookback = 4*32 + 1 + +type FinalityData struct { + // The last L2 block that was fully derived and inserted into the L2 engine while processing this L1 block. + L2Block eth.L2BlockRef + // The L1 block this stage was at when inserting the L2 block. + // When this L1 block is finalized, the L2 chain up to this block can be fully reproduced from finalized L1 data. + L1Block eth.BlockID +} + // EngineQueue queues up payload attributes to consolidate or process with the provided Engine type EngineQueue struct { log log.Logger @@ -39,13 +62,16 @@ type EngineQueue struct { safeHead eth.L2BlockRef unsafeHead eth.L2BlockRef - toFinalize eth.BlockID + finalizedL1 eth.BlockID progress Progress safeAttributes []*eth.PayloadAttributes unsafePayloads []*eth.ExecutionPayload + // Tracks which L2 blocks where last derived from which L1 block. At most finalityLookback large. + finalityData []FinalityData + engine Engine metrics Metrics @@ -55,7 +81,13 @@ var _ AttributesQueueOutput = (*EngineQueue)(nil) // NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use. func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics) *EngineQueue { - return &EngineQueue{log: log, cfg: cfg, engine: engine, metrics: metrics} + return &EngineQueue{ + log: log, + cfg: cfg, + engine: engine, + metrics: metrics, + finalityData: make([]FinalityData, 0, finalityLookback), + } } func (eq *EngineQueue) Progress() Progress { @@ -82,7 +114,8 @@ func (eq *EngineQueue) AddSafeAttributes(attributes *eth.PayloadAttributes) { } func (eq *EngineQueue) Finalize(l1Origin eth.BlockID) { - eq.toFinalize = l1Origin + eq.finalizedL1 = l1Origin + eq.tryFinalizeL2() } func (eq *EngineQueue) Finalized() eth.L2BlockRef { @@ -108,14 +141,6 @@ func (eq *EngineQueue) Step(ctx context.Context, outer Progress) error { if changed, err := eq.progress.Update(outer); err != nil || changed { return err } - - // TODO: check if engine unsafehead/safehead/finalized data match, return error and reset pipeline if not. - // maybe better to do in the driver instead. - - // TODO: implement finalization - //if eq.finalized.ID() != eq.toFinalize { - // return eq.tryFinalize(ctx) - //} if len(eq.safeAttributes) > 0 { return eq.tryNextSafeAttributes(ctx) } @@ -125,13 +150,43 @@ func (eq *EngineQueue) Step(ctx context.Context, outer Progress) error { return io.EOF } -// TODO: implement finalization -//func (eq *EngineQueue) tryFinalize(ctx context.Context) error { -// // find last l2 block ref that references the toFinalize origin, and is lower or equal to the safehead -// var finalizedL2 eth.L2BlockRef -// eq.finalized = finalizedL2 -// return nil -//} +// tryFinalizeL2 traverses the past L1 blocks, checks if any has been finalized, +// and then marks the latest fully derived L2 block from this as finalized, +// or defaults to the current finalized L2 block. +func (eq *EngineQueue) tryFinalizeL2() { + if eq.finalizedL1 == (eth.BlockID{}) { + return // if no L1 information is finalized yet, then skip this + } + // default to keep the same finalized block + finalizedL2 := eq.finalized + // go through the latest inclusion data, and find the last L2 block that was derived from a finalized L1 block + for _, fd := range eq.finalityData { + if fd.L2Block.Number > finalizedL2.Number && fd.L1Block.Number <= eq.finalizedL1.Number { + finalizedL2 = fd.L2Block + } + } + eq.finalized = finalizedL2 +} + +// postProcessSafeL2 buffers the L1 block the safe head was fully derived from, +// to finalize it once the L1 block, or later, finalizes. +func (eq *EngineQueue) postProcessSafeL2() { + // prune finality data if necessary + if len(eq.finalityData) >= finalityLookback { + eq.finalityData = append(eq.finalityData[:0], eq.finalityData[1:finalityLookback]...) + } + // remember the last L2 block that we fully derived from the given finality data + if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.progress.Origin.Number { + // append entry for new L1 block + eq.finalityData = append(eq.finalityData, FinalityData{ + L2Block: eq.safeHead, + L1Block: eq.progress.Origin.ID(), + }) + } else { + // if it's a now L2 block that was derived from the same latest L1 block, then just update the entry + eq.finalityData[len(eq.finalityData)-1].L2Block = eq.safeHead + } +} func (eq *EngineQueue) logSyncProgress(reason string) { eq.log.Info("Sync progress", @@ -250,6 +305,7 @@ func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error eq.safeHead = ref // unsafe head stays the same, we did not reorg the chain. eq.safeAttributes = eq.safeAttributes[1:] + eq.postProcessSafeL2() eq.logSyncProgress("reconciled with L1") return nil @@ -303,6 +359,7 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { eq.metrics.RecordL2Ref("l2_safe", ref) eq.metrics.RecordL2Ref("l2_unsafe", ref) eq.safeAttributes = eq.safeAttributes[1:] + eq.postProcessSafeL2() eq.logSyncProgress("processed safe block derived from L1") return nil @@ -311,6 +368,14 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { // ResetStep Walks the L2 chain backwards until it finds an L2 block whose L1 origin is canonical. // The unsafe head is set to the head of the L2 chain, unless the existing safe head is not canonical. func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { + finalized, err := eq.engine.L2BlockRefByLabel(ctx, eth.Finalized) + if errors.Is(err, ethereum.NotFound) { + // default to genesis if we have not finalized anything before. + finalized, err = eq.engine.L2BlockRefByHash(ctx, eq.cfg.Genesis.L2.Hash) + } + if err != nil { + return NewTemporaryError(fmt.Errorf("failed to find the finalized L2 block: %w", err)) + } // TODO: this should be resetting using the safe head instead. Out of scope for L2 client bindings PR. prevUnsafe, err := eq.engine.L2BlockRefByLabel(ctx, eth.Unsafe) if err != nil { @@ -331,11 +396,13 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error eq.log.Debug("Reset engine queue", "safeHead", safe, "unsafe", unsafe, "safe_timestamp", safe.Time, "unsafe_timestamp", unsafe.Time, "l1Origin", l1Origin) eq.unsafeHead = unsafe eq.safeHead = safe + eq.finalized = finalized + eq.finalityData = eq.finalityData[:0] eq.progress = Progress{ Origin: l1Origin, Closed: false, } - eq.metrics.RecordL2Ref("l2_finalized", eq.finalized) // todo(proto): finalized L2 block updates + eq.metrics.RecordL2Ref("l2_finalized", finalized) eq.metrics.RecordL2Ref("l2_safe", safe) eq.metrics.RecordL2Ref("l2_unsafe", unsafe) eq.logSyncProgress("reset derivation work") diff --git a/op-node/rollup/derive/engine_queue_test.go b/op-node/rollup/derive/engine_queue_test.go new file mode 100644 index 0000000000000..103d44d4f663c --- /dev/null +++ b/op-node/rollup/derive/engine_queue_test.go @@ -0,0 +1,145 @@ +package derive + +import ( + "math/rand" + "testing" + + "github.com/ethereum-optimism/optimism/op-node/eth" + "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-node/testlog" + "github.com/ethereum-optimism/optimism/op-node/testutils" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/stretchr/testify/require" +) + +func TestEngineQueue_Finalize(t *testing.T) { + logger := testlog.Logger(t, log.LvlInfo) + + rng := rand.New(rand.NewSource(1234)) + // create a short test L2 chain: + // + // L2: + // A0: genesis + // A1: finalized, incl in B + // B0: safe, incl in C + // B1: not yet included in L1 + // C0: head, not included in L1 yet + // + // L1: + // A: genesis + // B: finalized, incl A1 + // C: safe, incl B0 + // D: unsafe, not yet referenced by L2 + + l1Time := uint64(2) + refA := testutils.RandomBlockRef(rng) + + refB := eth.L1BlockRef{ + Hash: testutils.RandomHash(rng), + Number: refA.Number + 1, + ParentHash: refA.Hash, + Time: refA.Time + l1Time, + } + refC := eth.L1BlockRef{ + Hash: testutils.RandomHash(rng), + Number: refB.Number + 1, + ParentHash: refB.Hash, + Time: refB.Time + l1Time, + } + refD := eth.L1BlockRef{ + Hash: testutils.RandomHash(rng), + Number: refC.Number + 1, + ParentHash: refC.Hash, + Time: refC.Time + l1Time, + } + + refA0 := eth.L2BlockRef{ + Hash: testutils.RandomHash(rng), + Number: 0, + ParentHash: common.Hash{}, + Time: refA.Time, + L1Origin: refA.ID(), + SequenceNumber: 0, + } + cfg := &rollup.Config{ + Genesis: rollup.Genesis{ + L1: refA.ID(), + L2: refA0.ID(), + L2Time: refA0.Time, + }, + BlockTime: 1, + SeqWindowSize: 2, + } + refA1 := eth.L2BlockRef{ + Hash: testutils.RandomHash(rng), + Number: refA0.Number + 1, + ParentHash: refA0.Hash, + Time: refA0.Time + cfg.BlockTime, + L1Origin: refA.ID(), + SequenceNumber: 1, + } + refB0 := eth.L2BlockRef{ + Hash: testutils.RandomHash(rng), + Number: refA1.Number + 1, + ParentHash: refA1.Hash, + Time: refA1.Time + cfg.BlockTime, + L1Origin: refB.ID(), + SequenceNumber: 0, + } + refB1 := eth.L2BlockRef{ + Hash: testutils.RandomHash(rng), + Number: refB0.Number + 1, + ParentHash: refB0.Hash, + Time: refB0.Time + cfg.BlockTime, + L1Origin: refB.ID(), + SequenceNumber: 1, + } + refC0 := eth.L2BlockRef{ + Hash: testutils.RandomHash(rng), + Number: refB1.Number + 1, + ParentHash: refB1.Hash, + Time: refB1.Time + cfg.BlockTime, + L1Origin: refC.ID(), + SequenceNumber: 0, + } + + metrics := &TestMetrics{} + eng := &testutils.MockEngine{} + eng.ExpectL2BlockRefByLabel(eth.Finalized, refA1, nil) + // TODO(Proto): update expectation once we're using safe block label properly for sync starting point + eng.ExpectL2BlockRefByLabel(eth.Unsafe, refC0, nil) + + // we find the common point to initialize to by comparing the L1 origins in the L2 chain with the L1 chain + l1F := &testutils.MockL1Source{} + l1F.ExpectL1BlockRefByLabel(eth.Unsafe, refD, nil) + l1F.ExpectL1BlockRefByNumber(refC0.L1Origin.Number, refC, nil) + eng.ExpectL2BlockRefByHash(refC0.ParentHash, refB1, nil) // good L1 origin + eng.ExpectL2BlockRefByHash(refB1.ParentHash, refB0, nil) // need a block with seqnr == 0, don't stop at above + l1F.ExpectL1BlockRefByHash(refB0.L1Origin.Hash, refB, nil) // the origin of the safe L2 head will be the L1 starting point for derivation. + + eq := NewEngineQueue(logger, cfg, eng, metrics) + require.NoError(t, RepeatResetStep(t, eq.ResetStep, l1F, 3)) + + // TODO(proto): this is changing, needs to be a sequence window ago, but starting traversal back from safe block, + // safe blocks with canon origin are good, but we go back a full window to ensure they are all included in L1, + // by forcing them to be consolidated with L1 again. + require.Equal(t, eq.SafeL2Head(), refB0, "L2 reset should go back to sequence window ago") + + require.Equal(t, refA1, eq.Finalized(), "A1 is recognized as finalized before we run any steps") + + // we are not adding blocks in this test, + // but we can still trigger post-processing for the already existing safe head, + // so the engine can prepare to finalize that. + eq.postProcessSafeL2() + // let's finalize C, which included B0, but not B1 + eq.Finalize(refC.ID()) + + // Now a few steps later, without consuming any additional L1 inputs, + // we should be able to resolve that B0 is now finalized + require.NoError(t, RepeatStep(t, eq.Step, eq.progress, 10)) + require.Equal(t, refB0, eq.Finalized(), "B0 was included in finalized C, and should now be finalized") + + l1F.AssertExpectations(t) + eng.AssertExpectations(t) +} diff --git a/op-node/rollup/derive/pipeline_test.go b/op-node/rollup/derive/pipeline_test.go index 9ac4bd9eca118..4b339395e74ec 100644 --- a/op-node/rollup/derive/pipeline_test.go +++ b/op-node/rollup/derive/pipeline_test.go @@ -5,9 +5,9 @@ import ( "io" "testing" - "github.com/stretchr/testify/mock" - + "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/testutils" + "github.com/stretchr/testify/mock" ) var _ Engine = (*testutils.MockEngine)(nil) @@ -58,3 +58,24 @@ func RepeatStep(t *testing.T, step func(ctx context.Context, outer Progress) err t.Fatal("ran out of steps") return nil } + +// TestMetrics implements the metrics used in the derivation pipeline as no-op operations. +// Optionally a test may hook into the metrics +type TestMetrics struct { + recordL1Ref func(name string, ref eth.L1BlockRef) + recordL2Ref func(name string, ref eth.L2BlockRef) +} + +func (t *TestMetrics) RecordL1Ref(name string, ref eth.L1BlockRef) { + if t.recordL1Ref != nil { + t.recordL1Ref(name, ref) + } +} + +func (t *TestMetrics) RecordL2Ref(name string, ref eth.L2BlockRef) { + if t.recordL2Ref != nil { + t.recordL2Ref(name, ref) + } +} + +var _ Metrics = (*TestMetrics)(nil) diff --git a/op-node/testutils/mock_l1.go b/op-node/testutils/mock_l1.go index 81f2719d57580..79543d494f6fc 100644 --- a/op-node/testutils/mock_l1.go +++ b/op-node/testutils/mock_l1.go @@ -12,7 +12,7 @@ type MockL1Source struct { } func (m *MockL1Source) L1BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L1BlockRef, error) { - out := m.Mock.MethodCalled("L1BlockRefByLabel") + out := m.Mock.MethodCalled("L1BlockRefByLabel", label) return out[0].(eth.L1BlockRef), *out[1].(*error) } diff --git a/op-node/testutils/mock_l2.go b/op-node/testutils/mock_l2.go index 7b48e3f3c73f3..2e5b9ca4c16d9 100644 --- a/op-node/testutils/mock_l2.go +++ b/op-node/testutils/mock_l2.go @@ -15,7 +15,7 @@ func (c *MockL2Client) L2BlockRefByLabel(ctx context.Context, label eth.BlockLab return c.Mock.MethodCalled("L2BlockRefByLabel", label).Get(0).(eth.L2BlockRef), nil } -func (m *MockL1Source) ExpectL2BlockRefByLabel(label eth.BlockLabel, ref eth.L2BlockRef, err error) { +func (m *MockL2Client) ExpectL2BlockRefByLabel(label eth.BlockLabel, ref eth.L2BlockRef, err error) { m.Mock.On("L2BlockRefByLabel", label).Once().Return(ref, &err) } @@ -23,7 +23,7 @@ func (c *MockL2Client) L2BlockRefByNumber(ctx context.Context, num uint64) (eth. return c.Mock.MethodCalled("L2BlockRefByNumber", num).Get(0).(eth.L2BlockRef), nil } -func (m *MockL1Source) ExpectL2BlockRefByNumber(num uint64, ref eth.L2BlockRef, err error) { +func (m *MockL2Client) ExpectL2BlockRefByNumber(num uint64, ref eth.L2BlockRef, err error) { m.Mock.On("L2BlockRefByNumber", num).Once().Return(ref, &err) } @@ -31,6 +31,6 @@ func (c *MockL2Client) L2BlockRefByHash(ctx context.Context, hash common.Hash) ( return c.Mock.MethodCalled("L2BlockRefByHash", hash).Get(0).(eth.L2BlockRef), nil } -func (m *MockL1Source) ExpectL2BlockRefByHash(hash common.Hash, ref eth.L2BlockRef, err error) { +func (m *MockL2Client) ExpectL2BlockRefByHash(hash common.Hash, ref eth.L2BlockRef, err error) { m.Mock.On("L2BlockRefByHash", hash).Once().Return(ref, &err) }