Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 36 additions & 44 deletions op-node/rollup/derive/attributes_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,58 +22,60 @@ import (
// This stage can be reset by clearing it's batch buffer.
// This stage does not need to retain any references to L1 blocks.

type AttributesQueueOutput interface {
AddSafeAttributes(attributes *eth.PayloadAttributes)
SafeL2Head() eth.L2BlockRef
StageProgress
}

type AttributesQueue struct {
log log.Logger
config *rollup.Config
dl L1ReceiptsFetcher
next AttributesQueueOutput
progress Progress
batches []*BatchData
log log.Logger
config *rollup.Config
dl L1ReceiptsFetcher
prev *BatchQueue
batch *BatchData
}

func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, next AttributesQueueOutput) *AttributesQueue {
func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, prev *BatchQueue) *AttributesQueue {
return &AttributesQueue{
log: log,
config: cfg,
dl: l1Fetcher,
next: next,
prev: prev,
}
}

func (aq *AttributesQueue) AddBatch(batch *BatchData) {
aq.log.Debug("Received next batch", "batch_epoch", batch.EpochNum, "batch_timestamp", batch.Timestamp, "tx_count", len(batch.Transactions))
aq.batches = append(aq.batches, batch)
func (aq *AttributesQueue) Origin() eth.L1BlockRef {
return aq.prev.Origin()
}

func (aq *AttributesQueue) Progress() Progress {
return aq.progress
}

func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error {
if changed, err := aq.progress.Update(outer); err != nil || changed {
return err
func (aq *AttributesQueue) NextAttributes(ctx context.Context, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
// Get a batch if we need it
if aq.batch == nil {
batch, err := aq.prev.NextBatch(ctx, l2SafeHead)
if err != nil {
return nil, err
}
aq.batch = batch
}
if len(aq.batches) == 0 {
return io.EOF

// Actually generate the next attributes
if attrs, err := aq.createNextAttributes(ctx, aq.batch, l2SafeHead); err != nil {
return nil, err
} else {
// Clear out the local state once we will succeed
aq.batch = nil
return attrs, nil
}
batch := aq.batches[0]

safeL2Head := aq.next.SafeL2Head()
}

// createNextAttributes transforms a batch into a payload attributes. This sets `NoTxPool` and appends the batched transactions
// to the attributes transaction list
func (aq *AttributesQueue) createNextAttributes(ctx context.Context, batch *BatchData, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
// sanity check parent hash
if batch.ParentHash != safeL2Head.Hash {
return NewCriticalError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, safeL2Head.Hash))
if batch.ParentHash != l2SafeHead.Hash {
return nil, NewResetError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, l2SafeHead.Hash))
}
fetchCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, safeL2Head, batch.Timestamp, batch.Epoch())
attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, l2SafeHead, batch.Timestamp, batch.Epoch())
if err != nil {
return err
return nil, err
}

// we are verifying, not sequencing, we've got all transactions and do not pull from the tx-pool
Expand All @@ -83,19 +85,9 @@ func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error {

aq.log.Info("generated attributes in payload queue", "txs", len(attrs.Transactions), "timestamp", batch.Timestamp)

// Slice off the batch once we are guaranteed to succeed
aq.batches = aq.batches[1:]

aq.next.AddSafeAttributes(attrs)
return nil
return attrs, nil
}

func (aq *AttributesQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
aq.batches = aq.batches[:0]
aq.progress = aq.next.Progress()
func (aq *AttributesQueue) Reset(ctx context.Context, _ eth.L1BlockRef) error {
return io.EOF
}

func (aq *AttributesQueue) SafeL2Head() eth.L2BlockRef {
return aq.next.SafeL2Head()
}
47 changes: 8 additions & 39 deletions op-node/rollup/derive/attributes_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package derive

import (
"context"
"io"
"math/big"
"math/rand"
"testing"
Expand All @@ -17,29 +16,10 @@ import (
"github.com/ethereum/go-ethereum/log"
)

type MockAttributesQueueOutput struct {
MockOriginStage
}

func (m *MockAttributesQueueOutput) AddSafeAttributes(attributes *eth.PayloadAttributes) {
m.Mock.MethodCalled("AddSafeAttributes", attributes)
}

func (m *MockAttributesQueueOutput) ExpectAddSafeAttributes(attributes *eth.PayloadAttributes) {
m.Mock.On("AddSafeAttributes", attributes).Once().Return()
}

func (m *MockAttributesQueueOutput) SafeL2Head() eth.L2BlockRef {
return m.Mock.MethodCalled("SafeL2Head").Get(0).(eth.L2BlockRef)
}

func (m *MockAttributesQueueOutput) ExpectSafeL2Head(head eth.L2BlockRef) {
m.Mock.On("SafeL2Head").Once().Return(head)
}

var _ AttributesQueueOutput = (*MockAttributesQueueOutput)(nil)

func TestAttributesQueue_Step(t *testing.T) {
// TestAttributesQueue checks that it properly uses the PreparePayloadAttributes function
// (which is well tested) and that it properly sets NoTxPool and adds in the candidate
// transactions.
func TestAttributesQueue(t *testing.T) {
// test config, only init the necessary fields
cfg := &rollup.Config{
BlockTime: 2,
Expand All @@ -56,18 +36,9 @@ func TestAttributesQueue_Step(t *testing.T) {

l1Fetcher.ExpectInfoByHash(l1Info.InfoHash, l1Info, nil)

out := &MockAttributesQueueOutput{}
out.progress = Progress{
Origin: l1Info.BlockRef(),
Closed: false,
}
defer out.AssertExpectations(t)

safeHead := testutils.RandomL2BlockRef(rng)
safeHead.L1Origin = l1Info.ID()

out.ExpectSafeL2Head(safeHead)

batch := &BatchData{BatchV1{
ParentHash: safeHead.Hash,
EpochNum: rollup.Epoch(l1Info.InfoNum),
Expand All @@ -85,13 +56,11 @@ func TestAttributesQueue_Step(t *testing.T) {
Transactions: []eth.Data{l1InfoTx, eth.Data("foobar"), eth.Data("example")},
NoTxPool: true,
}
out.ExpectAddSafeAttributes(&attrs)

aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, l1Fetcher, out)
require.NoError(t, RepeatResetStep(t, aq.ResetStep, l1Fetcher, 1))
aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, l1Fetcher, nil)

aq.AddBatch(batch)
actual, err := aq.createNextAttributes(context.Background(), batch, safeHead)

require.NoError(t, aq.Step(context.Background(), out.progress), "adding batch to next stage, no EOF yet")
require.Equal(t, io.EOF, aq.Step(context.Background(), out.progress), "done with batches")
require.Nil(t, err)
require.Equal(t, attrs, *actual)
}
105 changes: 69 additions & 36 deletions op-node/rollup/derive/batch_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,18 @@ type BatchQueueOutput interface {
SafeL2Head() eth.L2BlockRef
}

type NextBatchProvider interface {
Origin() eth.L1BlockRef
NextBatch(ctx context.Context) (*BatchData, error)
}

// BatchQueue contains a set of batches for every L1 block.
// L1 blocks are contiguous and this does not support reorgs.
type BatchQueue struct {
log log.Logger
config *rollup.Config
next BatchQueueOutput
progress Progress
log log.Logger
config *rollup.Config
prev NextBatchProvider
origin eth.L1BlockRef

l1Blocks []eth.L1BlockRef

Expand All @@ -47,62 +52,91 @@ type BatchQueue struct {
}

// NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use.
func NewBatchQueue(log log.Logger, cfg *rollup.Config, next BatchQueueOutput) *BatchQueue {
func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider) *BatchQueue {
return &BatchQueue{
log: log,
config: cfg,
next: next,
prev: prev,
}
}

func (bq *BatchQueue) Progress() Progress {
return bq.progress
func (bq *BatchQueue) Origin() eth.L1BlockRef {
return bq.prev.Origin()
}

func (bq *BatchQueue) Step(ctx context.Context, outer Progress) error {
if changed, err := bq.progress.Update(outer); err != nil {
return err
} else if changed {
if !bq.progress.Closed { // init inputs if we moved to a new open origin
bq.l1Blocks = append(bq.l1Blocks, bq.progress.Origin)
func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef) (*BatchData, error) {
originBehind := bq.origin.Number < safeL2Head.L1Origin.Number

// Advance origin if needed
// Note: The entire pipeline has the same origin
// We just don't accept batches prior to the L1 origin of the L2 safe head
if bq.origin != bq.prev.Origin() {
bq.origin = bq.prev.Origin()
if !originBehind {
bq.l1Blocks = append(bq.l1Blocks, bq.origin)
} else {
// This is to handle the special case of startup. At startup we call Reset & include
// the L1 origin. That is the only time where immediately after `Reset` is called
// originBehind is false.
bq.l1Blocks = bq.l1Blocks[:0]
}
return nil
bq.log.Info("Advancing bq origin", "origin", bq.origin)
}
batch, err := bq.deriveNextBatch(ctx)
if err == io.EOF {
// very noisy, commented for now, or we should bump log level from trace to debug
// bq.log.Trace("need more L1 data before deriving next batch", "progress", bq.progress.Origin)
return io.EOF

// Load more data into the batch queue
outOfData := false
if batch, err := bq.prev.NextBatch(ctx); err == io.EOF {
outOfData = true
} else if err != nil {
return err
return nil, err
} else if !originBehind {
bq.AddBatch(batch, safeL2Head)
}
bq.next.AddBatch(batch)
return nil

// Skip adding data unless we are up to date with the origin, but do fully
// empty the previous stages
if originBehind {
if outOfData {
return nil, io.EOF
} else {
return nil, NotEnoughData
}
}

// Finally attempt to derive more batches
batch, err := bq.deriveNextBatch(ctx, outOfData, safeL2Head)
if err == io.EOF && outOfData {
return nil, io.EOF
} else if err == io.EOF {
return nil, NotEnoughData
} else if err != nil {
return nil, err
}
return batch, nil
}

func (bq *BatchQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
func (bq *BatchQueue) Reset(ctx context.Context, base eth.L1BlockRef) error {
// Copy over the Origin from the next stage
// It is set in the engine queue (two stages away) such that the L2 Safe Head origin is the progress
bq.progress = bq.next.Progress()
bq.origin = base
bq.batches = make(map[uint64][]*BatchWithL1InclusionBlock)
// Include the new origin as an origin to build on
// Note: This is only for the initialization case. During normal resets we will later
// throw out this block.
bq.l1Blocks = bq.l1Blocks[:0]
bq.l1Blocks = append(bq.l1Blocks, bq.progress.Origin)
bq.l1Blocks = append(bq.l1Blocks, base)
return io.EOF
}

func (bq *BatchQueue) AddBatch(batch *BatchData) {
if bq.progress.Closed {
panic("write batch while closed")
}
func (bq *BatchQueue) AddBatch(batch *BatchData, l2SafeHead eth.L2BlockRef) {
if len(bq.l1Blocks) == 0 {
panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp))
}
data := BatchWithL1InclusionBlock{
L1InclusionBlock: bq.progress.Origin,
L1InclusionBlock: bq.origin,
Batch: batch,
}
validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, bq.next.SafeL2Head(), &data)
validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, l2SafeHead, &data)
if validity == BatchDrop {
return // if we do drop the batch, CheckBatch will log the drop reason with WARN level.
}
Expand All @@ -113,12 +147,11 @@ func (bq *BatchQueue) AddBatch(batch *BatchData) {
// following the validity rules imposed on consecutive batches,
// based on currently available buffered batch and L1 origin information.
// If no batch can be derived yet, then (nil, io.EOF) is returned.
func (bq *BatchQueue) deriveNextBatch(ctx context.Context) (*BatchData, error) {
func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, l2SafeHead eth.L2BlockRef) (*BatchData, error) {
if len(bq.l1Blocks) == 0 {
return nil, NewCriticalError(errors.New("cannot derive next batch, no origin was prepared"))
}
epoch := bq.l1Blocks[0]
l2SafeHead := bq.next.SafeL2Head()

if l2SafeHead.L1Origin != epoch.ID() {
return nil, NewResetError(fmt.Errorf("buffered L1 chain epoch %s in batch queue does not match safe head %s", epoch, l2SafeHead))
Expand Down Expand Up @@ -183,8 +216,8 @@ batchLoop:
// i.e. if the sequence window expired, we create empty batches
expiryEpoch := epoch.Number + bq.config.SeqWindowSize
forceNextEpoch :=
(expiryEpoch == bq.progress.Origin.Number && bq.progress.Closed) ||
expiryEpoch < bq.progress.Origin.Number
(expiryEpoch == bq.origin.Number && outOfData) ||
expiryEpoch < bq.origin.Number

if !forceNextEpoch {
// sequence window did not expire yet, still room to receive batches for the current epoch,
Expand Down
Loading