Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 34 additions & 59 deletions op-node/rollup/derive/attributes_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,75 +22,60 @@ import (
// This stage can be reset by clearing it's batch buffer.
// This stage does not need to retain any references to L1 blocks.

type AttributesQueueOutput interface {
AddSafeAttributes(attributes *eth.PayloadAttributes)
SafeL2Head() eth.L2BlockRef
StageProgress
}

type AttributesQueue struct {
log log.Logger
config *rollup.Config
dl L1ReceiptsFetcher
next AttributesQueueOutput
prev *BatchQueue
progress Progress
batches []*BatchData
log log.Logger
config *rollup.Config
dl L1ReceiptsFetcher
prev *BatchQueue
batch *BatchData
}

func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, next AttributesQueueOutput, prev *BatchQueue) *AttributesQueue {
func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, prev *BatchQueue) *AttributesQueue {
return &AttributesQueue{
log: log,
config: cfg,
dl: l1Fetcher,
next: next,
prev: prev,
}
}

func (aq *AttributesQueue) AddBatch(batch *BatchData) {
aq.log.Debug("Received next batch", "batch_epoch", batch.EpochNum, "batch_timestamp", batch.Timestamp, "tx_count", len(batch.Transactions))
aq.batches = append(aq.batches, batch)
}

func (aq *AttributesQueue) Progress() Progress {
return aq.progress
func (aq *AttributesQueue) Origin() eth.L1BlockRef {
return aq.prev.Origin()
}

func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error {
if aq.progress.Origin != aq.prev.Origin() {
aq.progress.Closed = false
aq.progress.Origin = aq.prev.Origin()
return nil
func (aq *AttributesQueue) NextAttributes(ctx context.Context, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
// Get a batch if we need it
if aq.batch == nil {
batch, err := aq.prev.NextBatch(ctx, l2SafeHead)
if err != nil {
return nil, err
}
aq.batch = batch
}

if len(aq.batches) == 0 {
batch, err := aq.prev.NextBatch(ctx, aq.next.SafeL2Head())
if err == io.EOF {
if !aq.progress.Closed {
aq.progress.Closed = true
return nil
} else {
return io.EOF
}

} else if err != nil {
return err
}
aq.batches = append(aq.batches, batch)
// Actually generate the next attributes
if attrs, err := aq.createNextAttributes(ctx, aq.batch, l2SafeHead); err != nil {
return nil, err
} else {
// Clear out the local state once we will succeed
aq.batch = nil
return attrs, nil
}
batch := aq.batches[0]

safeL2Head := aq.next.SafeL2Head()
}

// createNextAttributes transforms a batch into a payload attributes. This sets `NoTxPool` and appends the batched transactions
// to the attributes transaction list
func (aq *AttributesQueue) createNextAttributes(ctx context.Context, batch *BatchData, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
// sanity check parent hash
if batch.ParentHash != safeL2Head.Hash {
return NewCriticalError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, safeL2Head.Hash))
if batch.ParentHash != l2SafeHead.Hash {
return nil, NewResetError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, l2SafeHead.Hash))
}
fetchCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, safeL2Head, batch.Timestamp, batch.Epoch())
attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, l2SafeHead, batch.Timestamp, batch.Epoch())
if err != nil {
return err
return nil, err
}

// we are verifying, not sequencing, we've got all transactions and do not pull from the tx-pool
Expand All @@ -100,19 +85,9 @@ func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error {

aq.log.Info("generated attributes in payload queue", "txs", len(attrs.Transactions), "timestamp", batch.Timestamp)

// Slice off the batch once we are guaranteed to succeed
aq.batches = aq.batches[1:]

aq.next.AddSafeAttributes(attrs)
return nil
return attrs, nil
}

func (aq *AttributesQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
aq.batches = aq.batches[:0]
aq.progress = aq.next.Progress()
func (aq *AttributesQueue) Reset(ctx context.Context, _ eth.L1BlockRef) error {
return io.EOF
}

func (aq *AttributesQueue) SafeL2Head() eth.L2BlockRef {
return aq.next.SafeL2Head()
}
48 changes: 8 additions & 40 deletions op-node/rollup/derive/attributes_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package derive

import (
"context"
"io"
"math/big"
"math/rand"
"testing"
Expand All @@ -17,30 +16,10 @@ import (
"github.com/ethereum/go-ethereum/log"
)

type MockAttributesQueueOutput struct {
MockOriginStage
}

func (m *MockAttributesQueueOutput) AddSafeAttributes(attributes *eth.PayloadAttributes) {
m.Mock.MethodCalled("AddSafeAttributes", attributes)
}

func (m *MockAttributesQueueOutput) ExpectAddSafeAttributes(attributes *eth.PayloadAttributes) {
m.Mock.On("AddSafeAttributes", attributes).Once().Return()
}

func (m *MockAttributesQueueOutput) SafeL2Head() eth.L2BlockRef {
return m.Mock.MethodCalled("SafeL2Head").Get(0).(eth.L2BlockRef)
}

func (m *MockAttributesQueueOutput) ExpectSafeL2Head(head eth.L2BlockRef) {
m.Mock.On("SafeL2Head").Once().Return(head)
}

var _ AttributesQueueOutput = (*MockAttributesQueueOutput)(nil)

func TestAttributesQueue_Step(t *testing.T) {
t.Skip("don't fake out batch queue")
// TestAttributesQueue checks that it properly uses the PreparePayloadAttributes function
// (which is well tested) and that it properly sets NoTxPool and adds in the candidate
// transactions.
func TestAttributesQueue(t *testing.T) {
// test config, only init the necessary fields
cfg := &rollup.Config{
BlockTime: 2,
Expand All @@ -57,18 +36,9 @@ func TestAttributesQueue_Step(t *testing.T) {

l1Fetcher.ExpectInfoByHash(l1Info.InfoHash, l1Info, nil)

out := &MockAttributesQueueOutput{}
out.progress = Progress{
Origin: l1Info.BlockRef(),
Closed: false,
}
defer out.AssertExpectations(t)

safeHead := testutils.RandomL2BlockRef(rng)
safeHead.L1Origin = l1Info.ID()

out.ExpectSafeL2Head(safeHead)

batch := &BatchData{BatchV1{
ParentHash: safeHead.Hash,
EpochNum: rollup.Epoch(l1Info.InfoNum),
Expand All @@ -86,13 +56,11 @@ func TestAttributesQueue_Step(t *testing.T) {
Transactions: []eth.Data{l1InfoTx, eth.Data("foobar"), eth.Data("example")},
NoTxPool: true,
}
out.ExpectAddSafeAttributes(&attrs)

aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, l1Fetcher, out, nil)
require.NoError(t, RepeatResetStep(t, aq.ResetStep, l1Fetcher, 1))
aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, l1Fetcher, nil)

aq.AddBatch(batch)
actual, err := aq.createNextAttributes(context.Background(), batch, safeHead)

require.NoError(t, aq.Step(context.Background(), out.progress), "adding batch to next stage, no EOF yet")
require.Equal(t, io.EOF, aq.Step(context.Background(), out.progress), "done with batches")
require.Nil(t, err)
require.Equal(t, attrs, *actual)
}
45 changes: 31 additions & 14 deletions op-node/rollup/derive/engine_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ import (
"github.com/ethereum/go-ethereum/log"
)

type NextAttributesProvider interface {
Origin() eth.L1BlockRef
NextAttributes(context.Context, eth.L2BlockRef) (*eth.PayloadAttributes, error)
}

type Engine interface {
GetPayload(ctx context.Context, payloadId eth.PayloadID) (*eth.ExecutionPayload, error)
ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error)
Expand Down Expand Up @@ -64,23 +69,22 @@ type EngineQueue struct {

finalizedL1 eth.BlockID

progress Progress

safeAttributes []*eth.PayloadAttributes
unsafePayloads PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps

// Tracks which L2 blocks where last derived from which L1 block. At most finalityLookback large.
finalityData []FinalityData

engine Engine
prev NextAttributesProvider

progress Progress // only used for pipeline resets

metrics Metrics
}

var _ AttributesQueueOutput = (*EngineQueue)(nil)

// NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use.
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics) *EngineQueue {
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider) *EngineQueue {
return &EngineQueue{
log: log,
cfg: cfg,
Expand All @@ -91,6 +95,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M
MaxSize: maxUnsafePayloadsMemory,
SizeFn: payloadMemSize,
},
prev: prev,
}
}

Expand Down Expand Up @@ -146,17 +151,30 @@ func (eq *EngineQueue) LastL2Time() uint64 {
return uint64(eq.safeAttributes[len(eq.safeAttributes)-1].Timestamp)
}

func (eq *EngineQueue) Step(ctx context.Context, outer Progress) error {
if changed, err := eq.progress.Update(outer); err != nil || changed {
return err
}
func (eq *EngineQueue) Step(ctx context.Context, _ Progress) error {
if len(eq.safeAttributes) > 0 {
return eq.tryNextSafeAttributes(ctx)
}
outOfData := false
if len(eq.safeAttributes) == 0 {
if next, err := eq.prev.NextAttributes(ctx, eq.safeHead); err == io.EOF {
outOfData = true
} else if err != nil {
return err
} else {
eq.safeAttributes = append(eq.safeAttributes, next)
return NotEnoughData
}
}
if eq.unsafePayloads.Len() > 0 {
return eq.tryNextUnsafePayload(ctx)
}
return io.EOF

if outOfData {
return io.EOF
} else {
return nil
}
}

// tryFinalizeL2 traverses the past L1 blocks, checks if any has been finalized,
Expand Down Expand Up @@ -186,11 +204,11 @@ func (eq *EngineQueue) postProcessSafeL2() {
eq.finalityData = append(eq.finalityData[:0], eq.finalityData[1:finalityLookback]...)
}
// remember the last L2 block that we fully derived from the given finality data
if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.progress.Origin.Number {
if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.prev.Origin().Number {
// append entry for new L1 block
eq.finalityData = append(eq.finalityData, FinalityData{
L2Block: eq.safeHead,
L1Block: eq.progress.Origin.ID(),
L1Block: eq.prev.Origin().ID(),
})
} else {
// if it's a now L2 block that was derived from the same latest L1 block, then just update the entry
Expand All @@ -205,7 +223,7 @@ func (eq *EngineQueue) logSyncProgress(reason string) {
"l2_safe", eq.safeHead,
"l2_unsafe", eq.unsafeHead,
"l2_time", eq.unsafeHead.Time,
"l1_derived", eq.progress.Origin,
"l1_derived", eq.prev.Origin(),
)
}

Expand Down Expand Up @@ -415,7 +433,6 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
// note: we do not clear the unsafe payloadds queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads.
eq.progress = Progress{
Origin: pipelineOrigin,
Closed: false,
}
eq.metrics.RecordL2Ref("l2_finalized", finalized)
eq.metrics.RecordL2Ref("l2_safe", safe)
Expand Down
27 changes: 22 additions & 5 deletions op-node/rollup/derive/engine_queue_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package derive

import (
"context"
"io"
"math/rand"
"testing"

Expand All @@ -14,6 +16,20 @@ import (
"github.com/ethereum/go-ethereum/log"
)

type fakeAttributesQueue struct {
origin eth.L1BlockRef
}

func (f *fakeAttributesQueue) Origin() eth.L1BlockRef {
return f.origin
}

func (f *fakeAttributesQueue) NextAttributes(_ context.Context, _ eth.L2BlockRef) (*eth.PayloadAttributes, error) {
return nil, io.EOF
}

var _ NextAttributesProvider = (*fakeAttributesQueue)(nil)

func TestEngineQueue_Finalize(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)

Expand Down Expand Up @@ -211,29 +227,30 @@ func TestEngineQueue_Finalize(t *testing.T) {
l1F.ExpectL1BlockRefByHash(refB.Hash, refB, nil)
l1F.ExpectL1BlockRefByNumber(refB.Number, refB, nil)

eq := NewEngineQueue(logger, cfg, eng, metrics)
require.NoError(t, RepeatResetStep(t, eq.ResetStep, l1F, 20))
prev := &fakeAttributesQueue{}

eq := NewEngineQueue(logger, cfg, eng, metrics, prev)
require.ErrorIs(t, eq.ResetStep(context.Background(), l1F), io.EOF)

require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
require.Equal(t, refB, eq.Progress().Origin, "Expecting to be set back derivation L1 progress to B")
require.Equal(t, refA1, eq.Finalized(), "A1 is recognized as finalized before we run any steps")

// now say C1 was included in D and became the new safe head
eq.progress.Origin = refD
prev.origin = refD
eq.safeHead = refC1
eq.postProcessSafeL2()

// now say D0 was included in E and became the new safe head
eq.progress.Origin = refE
prev.origin = refE
eq.safeHead = refD0
eq.postProcessSafeL2()

// let's finalize D (current L1), from which we fully derived C1 (it was safe head), but not D0 (included in E)
eq.Finalize(refD.ID())

// Now a few steps later, without consuming any additional L1 inputs,
// we should be able to resolve that B1 is now finalized, since it was included in finalized L1 block C
require.NoError(t, RepeatStep(t, eq.Step, eq.progress, 10))
require.Equal(t, refC1, eq.Finalized(), "C1 was included in finalized D, and should now be finalized")

l1F.AssertExpectations(t)
Expand Down
Loading