Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 89 additions & 22 deletions op-node/rollup/derive/engine_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (
"io"
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"

"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)

Expand All @@ -30,6 +30,29 @@ type Engine interface {
// Max number of unsafe payloads that may be queued up for execution
const maxUnsafePayloads = 50

// finalityLookback defines the amount of L1<>L2 relations to track for finalization purposes, one per L1 block.
//
// When L1 finalizes blocks, it finalizes finalityLookback blocks behind the L1 head.
// Non-finality may take longer, but when it does finalize again, it is within this range of the L1 head.
// Thus we only need to retain the L1<>L2 derivation relation data of this many L1 blocks.
//
// In the event of older finalization signals, misconfiguration, or insufficient L1<>L2 derivation relation data,
// then we may miss the opportunity to finalize more L2 blocks.
// This does not cause any divergence, it just causes lagging finalization status.
//
// The beacon chain on mainnet has 32 slots per epoch,
// and new finalization events happen at most 4 epochs behind the head.
// And then we add 1 to make pruning easier by leaving room for a new item without pruning the 32*4.
const finalityLookback = 4*32 + 1

type FinalityData struct {
// The last L2 block that was fully derived and inserted into the L2 engine while processing this L1 block.
L2Block eth.L2BlockRef
// The L1 block this stage was at when inserting the L2 block.
// When this L1 block is finalized, the L2 chain up to this block can be fully reproduced from finalized L1 data.
L1Block eth.BlockID
}

// EngineQueue queues up payload attributes to consolidate or process with the provided Engine
type EngineQueue struct {
log log.Logger
Expand All @@ -39,13 +62,16 @@ type EngineQueue struct {
safeHead eth.L2BlockRef
unsafeHead eth.L2BlockRef

toFinalize eth.BlockID
finalizedL1 eth.BlockID

progress Progress

safeAttributes []*eth.PayloadAttributes
unsafePayloads []*eth.ExecutionPayload

// Tracks which L2 blocks where last derived from which L1 block. At most finalityLookback large.
finalityData []FinalityData

engine Engine

metrics Metrics
Expand All @@ -55,7 +81,13 @@ var _ AttributesQueueOutput = (*EngineQueue)(nil)

// NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use.
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics) *EngineQueue {
return &EngineQueue{log: log, cfg: cfg, engine: engine, metrics: metrics}
return &EngineQueue{
log: log,
cfg: cfg,
engine: engine,
metrics: metrics,
finalityData: make([]FinalityData, 0, finalityLookback),
}
}

func (eq *EngineQueue) Progress() Progress {
Expand All @@ -82,7 +114,8 @@ func (eq *EngineQueue) AddSafeAttributes(attributes *eth.PayloadAttributes) {
}

func (eq *EngineQueue) Finalize(l1Origin eth.BlockID) {
eq.toFinalize = l1Origin
eq.finalizedL1 = l1Origin
eq.tryFinalizeL2()
}

func (eq *EngineQueue) Finalized() eth.L2BlockRef {
Expand All @@ -108,14 +141,6 @@ func (eq *EngineQueue) Step(ctx context.Context, outer Progress) error {
if changed, err := eq.progress.Update(outer); err != nil || changed {
return err
}

// TODO: check if engine unsafehead/safehead/finalized data match, return error and reset pipeline if not.
// maybe better to do in the driver instead.

// TODO: implement finalization
//if eq.finalized.ID() != eq.toFinalize {
// return eq.tryFinalize(ctx)
//}
if len(eq.safeAttributes) > 0 {
return eq.tryNextSafeAttributes(ctx)
}
Expand All @@ -125,13 +150,43 @@ func (eq *EngineQueue) Step(ctx context.Context, outer Progress) error {
return io.EOF
}

// TODO: implement finalization
//func (eq *EngineQueue) tryFinalize(ctx context.Context) error {
// // find last l2 block ref that references the toFinalize origin, and is lower or equal to the safehead
// var finalizedL2 eth.L2BlockRef
// eq.finalized = finalizedL2
// return nil
//}
// tryFinalizeL2 traverses the past L1 blocks, checks if any has been finalized,
// and then marks the latest fully derived L2 block from this as finalized,
// or defaults to the current finalized L2 block.
func (eq *EngineQueue) tryFinalizeL2() {
if eq.finalizedL1 == (eth.BlockID{}) {
return // if no L1 information is finalized yet, then skip this
}
// default to keep the same finalized block
finalizedL2 := eq.finalized
// go through the latest inclusion data, and find the last L2 block that was derived from a finalized L1 block
for _, fd := range eq.finalityData {
if fd.L2Block.Number > finalizedL2.Number && fd.L1Block.Number <= eq.finalizedL1.Number {
finalizedL2 = fd.L2Block
}
}
eq.finalized = finalizedL2
}

// postProcessSafeL2 buffers the L1 block the safe head was fully derived from,
// to finalize it once the L1 block, or later, finalizes.
func (eq *EngineQueue) postProcessSafeL2() {
// prune finality data if necessary
if len(eq.finalityData) >= finalityLookback {
eq.finalityData = append(eq.finalityData[:0], eq.finalityData[1:finalityLookback]...)
}
// remember the last L2 block that we fully derived from the given finality data
if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.progress.Origin.Number {
// append entry for new L1 block
eq.finalityData = append(eq.finalityData, FinalityData{
L2Block: eq.safeHead,
L1Block: eq.progress.Origin.ID(),
})
} else {
// if it's a now L2 block that was derived from the same latest L1 block, then just update the entry
eq.finalityData[len(eq.finalityData)-1].L2Block = eq.safeHead
}
}

func (eq *EngineQueue) logSyncProgress(reason string) {
eq.log.Info("Sync progress",
Expand Down Expand Up @@ -250,6 +305,7 @@ func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error
eq.safeHead = ref
// unsafe head stays the same, we did not reorg the chain.
eq.safeAttributes = eq.safeAttributes[1:]
eq.postProcessSafeL2()
eq.logSyncProgress("reconciled with L1")

return nil
Expand Down Expand Up @@ -303,6 +359,7 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
eq.metrics.RecordL2Ref("l2_safe", ref)
eq.metrics.RecordL2Ref("l2_unsafe", ref)
eq.safeAttributes = eq.safeAttributes[1:]
eq.postProcessSafeL2()
eq.logSyncProgress("processed safe block derived from L1")

return nil
Expand All @@ -311,6 +368,14 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
// ResetStep Walks the L2 chain backwards until it finds an L2 block whose L1 origin is canonical.
// The unsafe head is set to the head of the L2 chain, unless the existing safe head is not canonical.
func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
finalized, err := eq.engine.L2BlockRefByLabel(ctx, eth.Finalized)
if errors.Is(err, ethereum.NotFound) {
// default to genesis if we have not finalized anything before.
finalized, err = eq.engine.L2BlockRefByHash(ctx, eq.cfg.Genesis.L2.Hash)
}
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to find the finalized L2 block: %w", err))
}
// TODO: this should be resetting using the safe head instead. Out of scope for L2 client bindings PR.
prevUnsafe, err := eq.engine.L2BlockRefByLabel(ctx, eth.Unsafe)
if err != nil {
Expand All @@ -331,11 +396,13 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
eq.log.Debug("Reset engine queue", "safeHead", safe, "unsafe", unsafe, "safe_timestamp", safe.Time, "unsafe_timestamp", unsafe.Time, "l1Origin", l1Origin)
eq.unsafeHead = unsafe
eq.safeHead = safe
eq.finalized = finalized
eq.finalityData = eq.finalityData[:0]
eq.progress = Progress{
Origin: l1Origin,
Closed: false,
}
eq.metrics.RecordL2Ref("l2_finalized", eq.finalized) // todo(proto): finalized L2 block updates
eq.metrics.RecordL2Ref("l2_finalized", finalized)
eq.metrics.RecordL2Ref("l2_safe", safe)
eq.metrics.RecordL2Ref("l2_unsafe", unsafe)
eq.logSyncProgress("reset derivation work")
Expand Down
145 changes: 145 additions & 0 deletions op-node/rollup/derive/engine_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package derive

import (
"math/rand"
"testing"

"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testutils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)

func TestEngineQueue_Finalize(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)

rng := rand.New(rand.NewSource(1234))
// create a short test L2 chain:
//
// L2:
// A0: genesis
// A1: finalized, incl in B
// B0: safe, incl in C
// B1: not yet included in L1
// C0: head, not included in L1 yet
//
// L1:
// A: genesis
// B: finalized, incl A1
// C: safe, incl B0
// D: unsafe, not yet referenced by L2

l1Time := uint64(2)
refA := testutils.RandomBlockRef(rng)

refB := eth.L1BlockRef{
Hash: testutils.RandomHash(rng),
Number: refA.Number + 1,
ParentHash: refA.Hash,
Time: refA.Time + l1Time,
}
refC := eth.L1BlockRef{
Hash: testutils.RandomHash(rng),
Number: refB.Number + 1,
ParentHash: refB.Hash,
Time: refB.Time + l1Time,
}
refD := eth.L1BlockRef{
Hash: testutils.RandomHash(rng),
Number: refC.Number + 1,
ParentHash: refC.Hash,
Time: refC.Time + l1Time,
}

refA0 := eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: 0,
ParentHash: common.Hash{},
Time: refA.Time,
L1Origin: refA.ID(),
SequenceNumber: 0,
}
cfg := &rollup.Config{
Genesis: rollup.Genesis{
L1: refA.ID(),
L2: refA0.ID(),
L2Time: refA0.Time,
},
BlockTime: 1,
SeqWindowSize: 2,
}
refA1 := eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: refA0.Number + 1,
ParentHash: refA0.Hash,
Time: refA0.Time + cfg.BlockTime,
L1Origin: refA.ID(),
SequenceNumber: 1,
}
refB0 := eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: refA1.Number + 1,
ParentHash: refA1.Hash,
Time: refA1.Time + cfg.BlockTime,
L1Origin: refB.ID(),
SequenceNumber: 0,
}
refB1 := eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: refB0.Number + 1,
ParentHash: refB0.Hash,
Time: refB0.Time + cfg.BlockTime,
L1Origin: refB.ID(),
SequenceNumber: 1,
}
refC0 := eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: refB1.Number + 1,
ParentHash: refB1.Hash,
Time: refB1.Time + cfg.BlockTime,
L1Origin: refC.ID(),
SequenceNumber: 0,
}

metrics := &TestMetrics{}
eng := &testutils.MockEngine{}
eng.ExpectL2BlockRefByLabel(eth.Finalized, refA1, nil)
// TODO(Proto): update expectation once we're using safe block label properly for sync starting point
eng.ExpectL2BlockRefByLabel(eth.Unsafe, refC0, nil)

// we find the common point to initialize to by comparing the L1 origins in the L2 chain with the L1 chain
l1F := &testutils.MockL1Source{}
l1F.ExpectL1BlockRefByLabel(eth.Unsafe, refD, nil)
l1F.ExpectL1BlockRefByNumber(refC0.L1Origin.Number, refC, nil)
eng.ExpectL2BlockRefByHash(refC0.ParentHash, refB1, nil) // good L1 origin
eng.ExpectL2BlockRefByHash(refB1.ParentHash, refB0, nil) // need a block with seqnr == 0, don't stop at above
l1F.ExpectL1BlockRefByHash(refB0.L1Origin.Hash, refB, nil) // the origin of the safe L2 head will be the L1 starting point for derivation.

eq := NewEngineQueue(logger, cfg, eng, metrics)
require.NoError(t, RepeatResetStep(t, eq.ResetStep, l1F, 3))

// TODO(proto): this is changing, needs to be a sequence window ago, but starting traversal back from safe block,
// safe blocks with canon origin are good, but we go back a full window to ensure they are all included in L1,
// by forcing them to be consolidated with L1 again.
require.Equal(t, eq.SafeL2Head(), refB0, "L2 reset should go back to sequence window ago")

require.Equal(t, refA1, eq.Finalized(), "A1 is recognized as finalized before we run any steps")

// we are not adding blocks in this test,
// but we can still trigger post-processing for the already existing safe head,
// so the engine can prepare to finalize that.
eq.postProcessSafeL2()
// let's finalize C, which included B0, but not B1
eq.Finalize(refC.ID())

// Now a few steps later, without consuming any additional L1 inputs,
// we should be able to resolve that B0 is now finalized
require.NoError(t, RepeatStep(t, eq.Step, eq.progress, 10))
require.Equal(t, refB0, eq.Finalized(), "B0 was included in finalized C, and should now be finalized")

l1F.AssertExpectations(t)
eng.AssertExpectations(t)
}
25 changes: 23 additions & 2 deletions op-node/rollup/derive/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"io"
"testing"

"github.com/stretchr/testify/mock"

"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/testutils"
"github.com/stretchr/testify/mock"
)

var _ Engine = (*testutils.MockEngine)(nil)
Expand Down Expand Up @@ -58,3 +58,24 @@ func RepeatStep(t *testing.T, step func(ctx context.Context, outer Progress) err
t.Fatal("ran out of steps")
return nil
}

// TestMetrics implements the metrics used in the derivation pipeline as no-op operations.
// Optionally a test may hook into the metrics
type TestMetrics struct {
recordL1Ref func(name string, ref eth.L1BlockRef)
recordL2Ref func(name string, ref eth.L2BlockRef)
}

func (t *TestMetrics) RecordL1Ref(name string, ref eth.L1BlockRef) {
if t.recordL1Ref != nil {
t.recordL1Ref(name, ref)
}
}

func (t *TestMetrics) RecordL2Ref(name string, ref eth.L2BlockRef) {
if t.recordL2Ref != nil {
t.recordL2Ref(name, ref)
}
}

var _ Metrics = (*TestMetrics)(nil)
2 changes: 1 addition & 1 deletion op-node/testutils/mock_l1.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type MockL1Source struct {
}

func (m *MockL1Source) L1BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L1BlockRef, error) {
out := m.Mock.MethodCalled("L1BlockRefByLabel")
out := m.Mock.MethodCalled("L1BlockRefByLabel", label)
return out[0].(eth.L1BlockRef), *out[1].(*error)
}

Expand Down
Loading