Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 25 additions & 44 deletions op-node/rollup/derive/attributes_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,58 +22,46 @@ import (
// This stage can be reset by clearing it's batch buffer.
// This stage does not need to retain any references to L1 blocks.

type AttributesQueueOutput interface {
AddSafeAttributes(attributes *eth.PayloadAttributes)
SafeL2Head() eth.L2BlockRef
StageProgress
}

type AttributesQueue struct {
log log.Logger
config *rollup.Config
dl L1ReceiptsFetcher
next AttributesQueueOutput
progress Progress
batches []*BatchData
log log.Logger
config *rollup.Config
dl L1ReceiptsFetcher
prev *BatchQueue
batch *BatchData
}

func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, next AttributesQueueOutput) *AttributesQueue {
func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, prev *BatchQueue) *AttributesQueue {
return &AttributesQueue{
log: log,
config: cfg,
dl: l1Fetcher,
next: next,
prev: prev,
}
}

func (aq *AttributesQueue) AddBatch(batch *BatchData) {
aq.log.Debug("Received next batch", "batch_epoch", batch.EpochNum, "batch_timestamp", batch.Timestamp, "tx_count", len(batch.Transactions))
aq.batches = append(aq.batches, batch)
}

func (aq *AttributesQueue) Progress() Progress {
return aq.progress
func (aq *AttributesQueue) Origin() eth.L1BlockRef {
return aq.prev.Origin()
}

func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error {
if changed, err := aq.progress.Update(outer); err != nil || changed {
return err
func (aq *AttributesQueue) NextAttributes(ctx context.Context, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
if aq.batch == nil {
batch, err := aq.prev.NextBatch(ctx, l2SafeHead)
if err != nil {
return nil, err
}
aq.batch = batch
}
if len(aq.batches) == 0 {
return io.EOF
}
batch := aq.batches[0]
batch := aq.batch

safeL2Head := aq.next.SafeL2Head()
// sanity check parent hash
if batch.ParentHash != safeL2Head.Hash {
return NewCriticalError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, safeL2Head.Hash))
if batch.ParentHash != l2SafeHead.Hash {
return nil, NewCriticalError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, l2SafeHead.Hash))
}
fetchCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, safeL2Head, batch.Timestamp, batch.Epoch())
attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, l2SafeHead, batch.Timestamp, batch.Epoch())
if err != nil {
return err
return nil, err
}

// we are verifying, not sequencing, we've got all transactions and do not pull from the tx-pool
Expand All @@ -83,19 +71,12 @@ func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error {

aq.log.Info("generated attributes in payload queue", "txs", len(attrs.Transactions), "timestamp", batch.Timestamp)

// Slice off the batch once we are guaranteed to succeed
aq.batches = aq.batches[1:]
// Clear out the local state once we will succeed
aq.batch = nil

aq.next.AddSafeAttributes(attrs)
return nil
return attrs, nil
}

func (aq *AttributesQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
aq.batches = aq.batches[:0]
aq.progress = aq.next.Progress()
func (aq *AttributesQueue) Reset(ctx context.Context, _ eth.L1BlockRef) error {
return io.EOF
}

func (aq *AttributesQueue) SafeL2Head() eth.L2BlockRef {
return aq.next.SafeL2Head()
}
106 changes: 64 additions & 42 deletions op-node/rollup/derive/batch_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,13 @@ import (
// It is internally responsible for making sure that batches with L1 inclusions block outside it's
// working range are not considered or pruned.

type BatchQueueOutput interface {
StageProgress
AddBatch(batch *BatchData)
SafeL2Head() eth.L2BlockRef
}

// BatchQueue contains a set of batches for every L1 block.
// L1 blocks are contiguous and this does not support reorgs.
type BatchQueue struct {
log log.Logger
config *rollup.Config
next BatchQueueOutput
progress Progress
log log.Logger
config *rollup.Config
prev *ChannelInReader
origin eth.L1BlockRef

l1Blocks []eth.L1BlockRef

Expand All @@ -47,62 +41,91 @@ type BatchQueue struct {
}

// NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use.
func NewBatchQueue(log log.Logger, cfg *rollup.Config, next BatchQueueOutput) *BatchQueue {
func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev *ChannelInReader) *BatchQueue {
return &BatchQueue{
log: log,
config: cfg,
next: next,
prev: prev,
}
}

func (bq *BatchQueue) Progress() Progress {
return bq.progress
func (bq *BatchQueue) Origin() eth.L1BlockRef {
return bq.prev.Origin()
}

func (bq *BatchQueue) Step(ctx context.Context, outer Progress) error {
if changed, err := bq.progress.Update(outer); err != nil {
return err
} else if changed {
if !bq.progress.Closed { // init inputs if we moved to a new open origin
bq.l1Blocks = append(bq.l1Blocks, bq.progress.Origin)
func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef) (*BatchData, error) {
originBehind := bq.origin.Number < safeL2Head.L1Origin.Number

// Advance origin if needed
// Note: The entire pipeline has the same origin
// We just don't accept batches prior to the L1 origin of the L2 safe head
if bq.origin != bq.prev.Origin() {
bq.origin = bq.prev.Origin()
if !originBehind {
bq.l1Blocks = append(bq.l1Blocks, bq.origin)
} else {
// This is to handle the special case of startup. At startup we call Reset & include
// the L1 origin. That is the only time where immediately after `Reset` is called
// originBehind is false.
bq.l1Blocks = bq.l1Blocks[:0]
}
bq.log.Info("Advancing bq origin", "origin", bq.origin)
}

// Load more data into the batch queue
outOfData := false
if batch, err := bq.prev.NextBatch(ctx); err == io.EOF {
outOfData = true
} else if err != nil {
return nil, err
} else if !originBehind {
bq.AddBatch(batch, safeL2Head)
}

// Skip adding data unless we are up to date with the origin, but do fully
// empty the previous stages
if originBehind {
if outOfData {
return nil, io.EOF
} else {
return nil, NotEnoughData
}
return nil
}
batch, err := bq.deriveNextBatch(ctx)
if err == io.EOF {
// very noisy, commented for now, or we should bump log level from trace to debug
// bq.log.Trace("need more L1 data before deriving next batch", "progress", bq.progress.Origin)
return io.EOF

// Finally attempt to derive more batches
batch, err := bq.deriveNextBatch(ctx, outOfData, safeL2Head)
if err == io.EOF && outOfData {
return nil, io.EOF
} else if err == io.EOF {
return nil, NotEnoughData
} else if err != nil {
return err
return nil, err
}
bq.next.AddBatch(batch)
return nil
return batch, nil
}

func (bq *BatchQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
func (bq *BatchQueue) Reset(ctx context.Context, base eth.L1BlockRef) error {
// Copy over the Origin from the next stage
// It is set in the engine queue (two stages away) such that the L2 Safe Head origin is the progress
bq.progress = bq.next.Progress()
bq.origin = base
bq.batches = make(map[uint64][]*BatchWithL1InclusionBlock)
// Include the new origin as an origin to build on
// Note: This is only for the initialization case. During normal resets we will later
// throw out this block.
bq.l1Blocks = bq.l1Blocks[:0]
bq.l1Blocks = append(bq.l1Blocks, bq.progress.Origin)
bq.l1Blocks = append(bq.l1Blocks, base)
return io.EOF
}

func (bq *BatchQueue) AddBatch(batch *BatchData) {
if bq.progress.Closed {
panic("write batch while closed")
}
func (bq *BatchQueue) AddBatch(batch *BatchData, l2SafeHead eth.L2BlockRef) {
if len(bq.l1Blocks) == 0 {
panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp))
}
data := BatchWithL1InclusionBlock{
L1InclusionBlock: bq.progress.Origin,
L1InclusionBlock: bq.origin,
Batch: batch,
}
validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, bq.next.SafeL2Head(), &data)
validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, l2SafeHead, &data)
if validity == BatchDrop {
return // if we do drop the batch, CheckBatch will log the drop reason with WARN level.
}
Expand All @@ -113,12 +136,11 @@ func (bq *BatchQueue) AddBatch(batch *BatchData) {
// following the validity rules imposed on consecutive batches,
// based on currently available buffered batch and L1 origin information.
// If no batch can be derived yet, then (nil, io.EOF) is returned.
func (bq *BatchQueue) deriveNextBatch(ctx context.Context) (*BatchData, error) {
func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, l2SafeHead eth.L2BlockRef) (*BatchData, error) {
if len(bq.l1Blocks) == 0 {
return nil, NewCriticalError(errors.New("cannot derive next batch, no origin was prepared"))
}
epoch := bq.l1Blocks[0]
l2SafeHead := bq.next.SafeL2Head()

if l2SafeHead.L1Origin != epoch.ID() {
return nil, NewResetError(fmt.Errorf("buffered L1 chain epoch %s in batch queue does not match safe head %s", epoch, l2SafeHead))
Expand Down Expand Up @@ -183,8 +205,8 @@ batchLoop:
// i.e. if the sequence window expired, we create empty batches
expiryEpoch := epoch.Number + bq.config.SeqWindowSize
forceNextEpoch :=
(expiryEpoch == bq.progress.Origin.Number && bq.progress.Closed) ||
expiryEpoch < bq.progress.Origin.Number
(expiryEpoch == bq.origin.Number && outOfData) ||
expiryEpoch < bq.origin.Number

if !forceNextEpoch {
// sequence window did not expire yet, still room to receive batches for the current epoch,
Expand Down
63 changes: 48 additions & 15 deletions op-node/rollup/derive/calldata_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package derive

import (
"context"
"fmt"
"io"

"github.com/ethereum-optimism/optimism/op-node/eth"
Expand All @@ -15,21 +14,60 @@ import (
// CalldataSource readers raw transactions from a given block & then filters for
// batch submitter transactions.
// This is not a stage in the pipeline, but a wrapper for another stage in the pipeline
//

type L1TransactionFetcher interface {
InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error)
}

type DataSlice []eth.Data
// CalldataSourceImpl is a fault tolerant approach to fetching data.
// The constructor will never fail & it will instead re-attempt the fetcher
// at a later point.
// This API greatly simplifies some calling code.
type CalldataSourceImpl struct {
// Internal state + data
open bool
data []eth.Data
// Required to re-attempt fetching
id eth.BlockID
cfg *rollup.Config // TODO: `DataFromEVMTransactions` should probably not take the full config
fetcher L1TransactionFetcher
log log.Logger
}

func (ds *DataSlice) Next(ctx context.Context) (eth.Data, error) {
if len(*ds) == 0 {
func NewCalldataSourceImpl(ctx context.Context, log log.Logger, cfg *rollup.Config, fetcher L1TransactionFetcher, block eth.BlockID) *CalldataSourceImpl {
_, txs, err := fetcher.InfoAndTxsByHash(ctx, block.Hash)
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't this get stuck in an infinite loop if the block hash doesn't exist? Perhaps want to check for ethereum.NotFound

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, this can. This PR serves as an example of the structure rather than being ready to go, but this specific code will need to be beefed up before being merge.

Comment on lines +38 to +39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be a difference between temporary RPC errors and persistent not-found type of errors. Creating a calldata source with no data (if it doesn't error, but just returns nothing?) may result in accidental missed data, which could become a divergence. IMHO leaving the calldata as-is, and putting it in a separate PR or reducing changes is nice, since it's not quite the same as a derivation stage.

return &CalldataSourceImpl{
open: false,
id: block,
cfg: cfg,
fetcher: fetcher,
log: log,
}
} else {
return &CalldataSourceImpl{
open: true,
data: DataFromEVMTransactions(cfg, txs, log.New("origin", block)),
}
}
}

func (cs *CalldataSourceImpl) Next(ctx context.Context) (eth.Data, error) {
if !cs.open {
if _, txs, err := cs.fetcher.InfoAndTxsByHash(ctx, cs.id.Hash); err == nil {
cs.open = true
cs.data = DataFromEVMTransactions(cs.cfg, txs, log.New("origin", cs.id))
} else {
return nil, err
}
}
if len(cs.data) == 0 {
return nil, io.EOF
} else {
data := cs.data[0]
cs.data = cs.data[1:]
return data, nil
}
out := (*ds)[0]
*ds = (*ds)[1:]
return out, nil
}

type CalldataSource struct {
Expand All @@ -42,13 +80,8 @@ func NewCalldataSource(log log.Logger, cfg *rollup.Config, fetcher L1Transaction
return &CalldataSource{log: log, cfg: cfg, fetcher: fetcher}
}

func (cs *CalldataSource) OpenData(ctx context.Context, id eth.BlockID) (DataIter, error) {
_, txs, err := cs.fetcher.InfoAndTxsByHash(ctx, id.Hash)
if err != nil {
return nil, fmt.Errorf("failed to fetch transactions: %w", err)
}
data := DataFromEVMTransactions(cs.cfg, txs, cs.log.New("origin", id))
return (*DataSlice)(&data), nil
func (cs *CalldataSource) OpenData(ctx context.Context, id eth.BlockID) *CalldataSourceImpl {
return NewCalldataSourceImpl(ctx, cs.log, cs.cfg, cs.fetcher, id)
}

func DataFromEVMTransactions(config *rollup.Config, txs types.Transactions, log log.Logger) []eth.Data {
Expand Down
Loading