diff --git a/op-node/rollup/derive/attributes_queue.go b/op-node/rollup/derive/attributes_queue.go index cec93477a3205..a6f8e21ca15af 100644 --- a/op-node/rollup/derive/attributes_queue.go +++ b/op-node/rollup/derive/attributes_queue.go @@ -22,58 +22,46 @@ import ( // This stage can be reset by clearing it's batch buffer. // This stage does not need to retain any references to L1 blocks. -type AttributesQueueOutput interface { - AddSafeAttributes(attributes *eth.PayloadAttributes) - SafeL2Head() eth.L2BlockRef - StageProgress -} - type AttributesQueue struct { - log log.Logger - config *rollup.Config - dl L1ReceiptsFetcher - next AttributesQueueOutput - progress Progress - batches []*BatchData + log log.Logger + config *rollup.Config + dl L1ReceiptsFetcher + prev *BatchQueue + batch *BatchData } -func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, next AttributesQueueOutput) *AttributesQueue { +func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, prev *BatchQueue) *AttributesQueue { return &AttributesQueue{ log: log, config: cfg, dl: l1Fetcher, - next: next, + prev: prev, } } -func (aq *AttributesQueue) AddBatch(batch *BatchData) { - aq.log.Debug("Received next batch", "batch_epoch", batch.EpochNum, "batch_timestamp", batch.Timestamp, "tx_count", len(batch.Transactions)) - aq.batches = append(aq.batches, batch) -} - -func (aq *AttributesQueue) Progress() Progress { - return aq.progress +func (aq *AttributesQueue) Origin() eth.L1BlockRef { + return aq.prev.Origin() } -func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error { - if changed, err := aq.progress.Update(outer); err != nil || changed { - return err +func (aq *AttributesQueue) NextAttributes(ctx context.Context, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) { + if aq.batch == nil { + batch, err := aq.prev.NextBatch(ctx, l2SafeHead) + if err != nil { + return nil, err + } + aq.batch = batch } - if 
len(aq.batches) == 0 { - return io.EOF - } - batch := aq.batches[0] + batch := aq.batch - safeL2Head := aq.next.SafeL2Head() // sanity check parent hash - if batch.ParentHash != safeL2Head.Hash { - return NewCriticalError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, safeL2Head.Hash)) + if batch.ParentHash != l2SafeHead.Hash { + return nil, NewCriticalError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, l2SafeHead.Hash)) } fetchCtx, cancel := context.WithTimeout(ctx, 20*time.Second) defer cancel() - attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, safeL2Head, batch.Timestamp, batch.Epoch()) + attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, l2SafeHead, batch.Timestamp, batch.Epoch()) if err != nil { - return err + return nil, err } // we are verifying, not sequencing, we've got all transactions and do not pull from the tx-pool @@ -83,19 +71,12 @@ func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error { aq.log.Info("generated attributes in payload queue", "txs", len(attrs.Transactions), "timestamp", batch.Timestamp) - // Slice off the batch once we are guaranteed to succeed - aq.batches = aq.batches[1:] + // Clear out the local state once we will succeed + aq.batch = nil - aq.next.AddSafeAttributes(attrs) - return nil + return attrs, nil } -func (aq *AttributesQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { - aq.batches = aq.batches[:0] - aq.progress = aq.next.Progress() +func (aq *AttributesQueue) Reset(ctx context.Context, _ eth.L1BlockRef) error { return io.EOF } - -func (aq *AttributesQueue) SafeL2Head() eth.L2BlockRef { - return aq.next.SafeL2Head() -} diff --git a/op-node/rollup/derive/batch_queue.go b/op-node/rollup/derive/batch_queue.go index ed49f5dcfdbf7..0cdff36c23e50 100644 --- a/op-node/rollup/derive/batch_queue.go +++ b/op-node/rollup/derive/batch_queue.go @@ -26,19 +26,13 @@ import ( // It is internally 
responsible for making sure that batches with L1 inclusions block outside it's // working range are not considered or pruned. -type BatchQueueOutput interface { - StageProgress - AddBatch(batch *BatchData) - SafeL2Head() eth.L2BlockRef -} - // BatchQueue contains a set of batches for every L1 block. // L1 blocks are contiguous and this does not support reorgs. type BatchQueue struct { - log log.Logger - config *rollup.Config - next BatchQueueOutput - progress Progress + log log.Logger + config *rollup.Config + prev *ChannelInReader + origin eth.L1BlockRef l1Blocks []eth.L1BlockRef @@ -47,62 +41,91 @@ type BatchQueue struct { } // NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use. -func NewBatchQueue(log log.Logger, cfg *rollup.Config, next BatchQueueOutput) *BatchQueue { +func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev *ChannelInReader) *BatchQueue { return &BatchQueue{ log: log, config: cfg, - next: next, + prev: prev, } } -func (bq *BatchQueue) Progress() Progress { - return bq.progress +func (bq *BatchQueue) Origin() eth.L1BlockRef { + return bq.prev.Origin() } -func (bq *BatchQueue) Step(ctx context.Context, outer Progress) error { - if changed, err := bq.progress.Update(outer); err != nil { - return err - } else if changed { - if !bq.progress.Closed { // init inputs if we moved to a new open origin - bq.l1Blocks = append(bq.l1Blocks, bq.progress.Origin) +func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef) (*BatchData, error) { + originBehind := bq.origin.Number < safeL2Head.L1Origin.Number + + // Advance origin if needed + // Note: The entire pipeline has the same origin + // We just don't accept batches prior to the L1 origin of the L2 safe head + if bq.origin != bq.prev.Origin() { + bq.origin = bq.prev.Origin() + if !originBehind { + bq.l1Blocks = append(bq.l1Blocks, bq.origin) + } else { + // This is to handle the special case of startup. 
At startup we call Reset & include + // the L1 origin. That is the only time where immediately after `Reset` is called + // originBehind is false. + bq.l1Blocks = bq.l1Blocks[:0] + } + bq.log.Info("Advancing bq origin", "origin", bq.origin) + } + + // Load more data into the batch queue + outOfData := false + if batch, err := bq.prev.NextBatch(ctx); err == io.EOF { + outOfData = true + } else if err != nil { + return nil, err + } else if !originBehind { + bq.AddBatch(batch, safeL2Head) + } + + // Skip adding data unless we are up to date with the origin, but do fully + // empty the previous stages + if originBehind { + if outOfData { + return nil, io.EOF + } else { + return nil, NotEnoughData } - return nil } - batch, err := bq.deriveNextBatch(ctx) - if err == io.EOF { - // very noisy, commented for now, or we should bump log level from trace to debug - // bq.log.Trace("need more L1 data before deriving next batch", "progress", bq.progress.Origin) - return io.EOF + + // Finally attempt to derive more batches + batch, err := bq.deriveNextBatch(ctx, outOfData, safeL2Head) + if err == io.EOF && outOfData { + return nil, io.EOF + } else if err == io.EOF { + return nil, NotEnoughData } else if err != nil { - return err + return nil, err } - bq.next.AddBatch(batch) - return nil + return batch, nil } -func (bq *BatchQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { +func (bq *BatchQueue) Reset(ctx context.Context, base eth.L1BlockRef) error { // Copy over the Origin from the next stage // It is set in the engine queue (two stages away) such that the L2 Safe Head origin is the progress - bq.progress = bq.next.Progress() + bq.origin = base bq.batches = make(map[uint64][]*BatchWithL1InclusionBlock) // Include the new origin as an origin to build on + // Note: This is only for the initialization case. During normal resets we will later + // throw out this block. 
bq.l1Blocks = bq.l1Blocks[:0] - bq.l1Blocks = append(bq.l1Blocks, bq.progress.Origin) + bq.l1Blocks = append(bq.l1Blocks, base) return io.EOF } -func (bq *BatchQueue) AddBatch(batch *BatchData) { - if bq.progress.Closed { - panic("write batch while closed") - } +func (bq *BatchQueue) AddBatch(batch *BatchData, l2SafeHead eth.L2BlockRef) { if len(bq.l1Blocks) == 0 { panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp)) } data := BatchWithL1InclusionBlock{ - L1InclusionBlock: bq.progress.Origin, + L1InclusionBlock: bq.origin, Batch: batch, } - validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, bq.next.SafeL2Head(), &data) + validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, l2SafeHead, &data) if validity == BatchDrop { return // if we do drop the batch, CheckBatch will log the drop reason with WARN level. } @@ -113,12 +136,11 @@ func (bq *BatchQueue) AddBatch(batch *BatchData) { // following the validity rules imposed on consecutive batches, // based on currently available buffered batch and L1 origin information. // If no batch can be derived yet, then (nil, io.EOF) is returned. -func (bq *BatchQueue) deriveNextBatch(ctx context.Context) (*BatchData, error) { +func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, l2SafeHead eth.L2BlockRef) (*BatchData, error) { if len(bq.l1Blocks) == 0 { return nil, NewCriticalError(errors.New("cannot derive next batch, no origin was prepared")) } epoch := bq.l1Blocks[0] - l2SafeHead := bq.next.SafeL2Head() if l2SafeHead.L1Origin != epoch.ID() { return nil, NewResetError(fmt.Errorf("buffered L1 chain epoch %s in batch queue does not match safe head %s", epoch, l2SafeHead)) @@ -183,8 +205,8 @@ batchLoop: // i.e. 
if the sequence window expired, we create empty batches expiryEpoch := epoch.Number + bq.config.SeqWindowSize forceNextEpoch := - (expiryEpoch == bq.progress.Origin.Number && bq.progress.Closed) || - expiryEpoch < bq.progress.Origin.Number + (expiryEpoch == bq.origin.Number && outOfData) || + expiryEpoch < bq.origin.Number if !forceNextEpoch { // sequence window did not expire yet, still room to receive batches for the current epoch, diff --git a/op-node/rollup/derive/calldata_source.go b/op-node/rollup/derive/calldata_source.go index 00a9f89dca36d..6fb6782ae9eeb 100644 --- a/op-node/rollup/derive/calldata_source.go +++ b/op-node/rollup/derive/calldata_source.go @@ -2,7 +2,6 @@ package derive import ( "context" - "fmt" "io" "github.com/ethereum-optimism/optimism/op-node/eth" @@ -15,21 +14,60 @@ import ( // CalldataSource readers raw transactions from a given block & then filters for // batch submitter transactions. // This is not a stage in the pipeline, but a wrapper for another stage in the pipeline -// type L1TransactionFetcher interface { InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error) } -type DataSlice []eth.Data +// CalldataSourceImpl is a fault tolerant approach to fetching data. +// The constructor will never fail & it will instead re-attempt the fetcher +// at a later point. +// This API greatly simplifies some calling code. 
+type CalldataSourceImpl struct { + // Internal state + data + open bool + data []eth.Data + // Required to re-attempt fetching + id eth.BlockID + cfg *rollup.Config // TODO: `DataFromEVMTransactions` should probably not take the full config + fetcher L1TransactionFetcher + log log.Logger +} -func (ds *DataSlice) Next(ctx context.Context) (eth.Data, error) { - if len(*ds) == 0 { +func NewCalldataSourceImpl(ctx context.Context, log log.Logger, cfg *rollup.Config, fetcher L1TransactionFetcher, block eth.BlockID) *CalldataSourceImpl { + _, txs, err := fetcher.InfoAndTxsByHash(ctx, block.Hash) + if err != nil { + return &CalldataSourceImpl{ + open: false, + id: block, + cfg: cfg, + fetcher: fetcher, + log: log, + } + } else { + return &CalldataSourceImpl{ + open: true, + data: DataFromEVMTransactions(cfg, txs, log.New("origin", block)), + } + } +} + +func (cs *CalldataSourceImpl) Next(ctx context.Context) (eth.Data, error) { + if !cs.open { + if _, txs, err := cs.fetcher.InfoAndTxsByHash(ctx, cs.id.Hash); err == nil { + cs.open = true + cs.data = DataFromEVMTransactions(cs.cfg, txs, log.New("origin", cs.id)) + } else { + return nil, err + } + } + if len(cs.data) == 0 { return nil, io.EOF + } else { + data := cs.data[0] + cs.data = cs.data[1:] + return data, nil } - out := (*ds)[0] - *ds = (*ds)[1:] - return out, nil } type CalldataSource struct { @@ -42,13 +80,8 @@ func NewCalldataSource(log log.Logger, cfg *rollup.Config, fetcher L1Transaction return &CalldataSource{log: log, cfg: cfg, fetcher: fetcher} } -func (cs *CalldataSource) OpenData(ctx context.Context, id eth.BlockID) (DataIter, error) { - _, txs, err := cs.fetcher.InfoAndTxsByHash(ctx, id.Hash) - if err != nil { - return nil, fmt.Errorf("failed to fetch transactions: %w", err) - } - data := DataFromEVMTransactions(cs.cfg, txs, cs.log.New("origin", id)) - return (*DataSlice)(&data), nil +func (cs *CalldataSource) OpenData(ctx context.Context, id eth.BlockID) *CalldataSourceImpl { + return 
NewCalldataSourceImpl(ctx, cs.log, cs.cfg, cs.fetcher, id) } func DataFromEVMTransactions(config *rollup.Config, txs types.Transactions, log log.Logger) []eth.Data { diff --git a/op-node/rollup/derive/channel_bank.go b/op-node/rollup/derive/channel_bank.go index 00d603469b17a..c7f1aac5204d8 100644 --- a/op-node/rollup/derive/channel_bank.go +++ b/op-node/rollup/derive/channel_bank.go @@ -2,7 +2,6 @@ package derive import ( "context" - "fmt" "io" "github.com/ethereum-optimism/optimism/op-node/eth" @@ -22,11 +21,6 @@ import ( // Specifically, the channel bank is not allowed to become too large between successive calls // to `IngestData`. This means that we can do an ingest and then do a read while becoming too large. -type ChannelBankOutput interface { - StageProgress - WriteChannel(data []byte) -} - // ChannelBank buffers channel frames, and emits full channel data type ChannelBank struct { log log.Logger @@ -35,26 +29,26 @@ type ChannelBank struct { channels map[ChannelID]*Channel // channels by ID channelQueue []ChannelID // channels in FIFO order - progress Progress - - next ChannelBankOutput + prev *L1Retrieval + fetcher L1Fetcher } -var _ Stage = (*ChannelBank)(nil) +var _ ResetableStage = (*ChannelBank)(nil) // NewChannelBank creates a ChannelBank, which should be Reset(origin) before use. -func NewChannelBank(log log.Logger, cfg *rollup.Config, next ChannelBankOutput) *ChannelBank { +func NewChannelBank(log log.Logger, cfg *rollup.Config, prev *L1Retrieval, fetcher L1Fetcher) *ChannelBank { return &ChannelBank{ log: log, cfg: cfg, channels: make(map[ChannelID]*Channel), channelQueue: make([]ChannelID, 0, 10), - next: next, + prev: prev, + fetcher: fetcher, } } -func (ib *ChannelBank) Progress() Progress { - return ib.progress +func (ib *ChannelBank) Origin() eth.L1BlockRef { + return ib.prev.Origin() } func (ib *ChannelBank) prune() { @@ -76,10 +70,8 @@ func (ib *ChannelBank) prune() { // IngestData adds new L1 data to the channel bank. 
// Read() should be called repeatedly first, until everything has been read, before adding new data.\ func (ib *ChannelBank) IngestData(data []byte) { - if ib.progress.Closed { - panic("write data to bank while closed") - } - ib.log.Debug("channel bank got new data", "origin", ib.progress.Origin, "data_len", len(data)) + origin := ib.Origin() + ib.log.Debug("channel bank got new data", "origin", origin, "data_len", len(data)) // TODO: Why is the prune here? ib.prune() @@ -95,19 +87,19 @@ func (ib *ChannelBank) IngestData(data []byte) { currentCh, ok := ib.channels[f.ID] if !ok { // create new channel if it doesn't exist yet - currentCh = NewChannel(f.ID, ib.progress.Origin) + currentCh = NewChannel(f.ID, origin) ib.channels[f.ID] = currentCh ib.channelQueue = append(ib.channelQueue, f.ID) } // check if the channel is not timed out - if currentCh.OpenBlockNumber()+ib.cfg.ChannelTimeout < ib.progress.Origin.Number { + if currentCh.OpenBlockNumber()+ib.cfg.ChannelTimeout < origin.Number { ib.log.Warn("channel is timed out, ignore frame", "channel", f.ID, "frame", f.FrameNumber) continue } ib.log.Trace("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data)) - if err := currentCh.AddFrame(f, ib.progress.Origin); err != nil { + if err := currentCh.AddFrame(f, origin); err != nil { ib.log.Warn("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err) continue } @@ -122,7 +114,7 @@ func (ib *ChannelBank) Read() (data []byte, err error) { } first := ib.channelQueue[0] ch := ib.channels[first] - timedOut := ch.OpenBlockNumber()+ib.cfg.ChannelTimeout < ib.progress.Origin.Number + timedOut := ch.OpenBlockNumber()+ib.cfg.ChannelTimeout < ib.Origin().Number if timedOut { ib.log.Debug("channel timed out", "channel", first, "frames", len(ch.inputs)) delete(ib.channels, first) @@ -141,47 +133,37 @@ func (ib *ChannelBank) Read() (data []byte, err error) { return data, nil } -func (ib *ChannelBank) Step(ctx 
context.Context, outer Progress) error { - if changed, err := ib.progress.Update(outer); err != nil || changed { - return err - } +// NextData pulls the next piece of data from the channel bank. +// Note that it attempts to pull data out of the channel bank prior to +// loading data in (unlike most other stages). This is to ensure maintain +// consistency around channel bank pruning which depends upon the order +// of operations. +func (ib *ChannelBank) NextData(ctx context.Context) ([]byte, error) { - // If the bank is behind the channel reader, then we are replaying old data to prepare the bank. - // Read if we can, and drop if it gives anything - if ib.next.Progress().Origin.Number > ib.progress.Origin.Number { - _, err := ib.Read() - return err + // Do the read from the channel bank first + data, err := ib.Read() + if err == io.EOF { + // continue - We will attempt to load data into the channel bank + } else if err != nil { + return nil, err + } else { + return data, nil } - // otherwise, read the next channel data from the bank - data, err := ib.Read() - if err == io.EOF { // need new L1 data in the bank before we can read more channel data - return io.EOF + // Then load data into the channel bank + if data, err := ib.prev.NextData(ctx); err == io.EOF { + return nil, io.EOF } else if err != nil { - return err + return nil, err + } else { + ib.IngestData(data) + return nil, NotEnoughData } - ib.next.WriteChannel(data) - return nil } -// ResetStep walks back the L1 chain, starting at the origin of the next stage, -// to find the origin that the channel bank should be reset to, -// to get consistent reads starting at origin. -// Any channel data before this origin will be timed out by the time the channel bank is synced up to the origin, -// so it is not relevant to replay it into the bank. 
-func (ib *ChannelBank) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { - ib.progress = ib.next.Progress() - ib.log.Debug("walking back to find reset origin for channel bank", "origin", ib.progress.Origin) - // go back in history if we are not distant enough from the next stage - resetBlock := ib.progress.Origin.Number - ib.cfg.ChannelTimeout - if ib.progress.Origin.Number < ib.cfg.ChannelTimeout { - resetBlock = 0 // don't underflow - } - parent, err := l1Fetcher.L1BlockRefByNumber(ctx, resetBlock) - if err != nil { - return NewTemporaryError(fmt.Errorf("failed to find channel bank block, failed to retrieve L1 reference: %w", err)) - } - ib.progress.Origin = parent +func (ib *ChannelBank) Reset(ctx context.Context, base eth.L1BlockRef) error { + ib.channels = make(map[ChannelID]*Channel) + ib.channelQueue = make([]ChannelID, 0, 10) return io.EOF } diff --git a/op-node/rollup/derive/channel_in_reader.go b/op-node/rollup/derive/channel_in_reader.go index da96058f0b55d..07156b9997a97 100644 --- a/op-node/rollup/derive/channel_in_reader.go +++ b/op-node/rollup/derive/channel_in_reader.go @@ -5,6 +5,7 @@ import ( "context" "io" + "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum/go-ethereum/log" ) @@ -13,41 +14,36 @@ import ( // This is a pure function from the channel, but each channel (or channel fragment) // must be tagged with an L1 inclusion block to be passed to the the batch queue. -type BatchQueueStage interface { - StageProgress - AddBatch(batch *BatchData) -} - type ChannelInReader struct { log log.Logger nextBatchFn func() (BatchWithL1InclusionBlock, error) - progress Progress - - next BatchQueueStage + prev *ChannelBank } -var _ ChannelBankOutput = (*ChannelInReader)(nil) +var _ ResetableStage = (*ChannelInReader)(nil) // NewChannelInReader creates a ChannelInReader, which should be Reset(origin) before use. 
-func NewChannelInReader(log log.Logger, next BatchQueueStage) *ChannelInReader { - return &ChannelInReader{log: log, next: next} +func NewChannelInReader(log log.Logger, prev *ChannelBank) *ChannelInReader { + return &ChannelInReader{ + log: log, + prev: prev, + } } -func (cr *ChannelInReader) Progress() Progress { - return cr.progress +func (cr *ChannelInReader) Origin() eth.L1BlockRef { + return cr.prev.Origin() } // TODO: Take full channel for better logging -func (cr *ChannelInReader) WriteChannel(data []byte) { - if cr.progress.Closed { - panic("write channel while closed") - } - if f, err := BatchReader(bytes.NewBuffer(data), cr.progress.Origin); err == nil { +func (cr *ChannelInReader) WriteChannel(data []byte) error { + if f, err := BatchReader(bytes.NewBuffer(data), cr.Origin()); err == nil { cr.nextBatchFn = f + return nil } else { cr.log.Error("Error creating batch reader from channel data", "err", err) + return err } } @@ -57,32 +53,37 @@ func (cr *ChannelInReader) NextChannel() { cr.nextBatchFn = nil } -func (cr *ChannelInReader) Step(ctx context.Context, outer Progress) error { - if changed, err := cr.progress.Update(outer); err != nil || changed { - return err - } - +// NextBatch pulls out the next batch from the channel if it has it. +// It returns io.EOF when it cannot make any more progress. +// It will return a temporary error if it needs to be called again to advance some internal state. 
+func (cr *ChannelInReader) NextBatch(ctx context.Context) (*BatchData, error) { if cr.nextBatchFn == nil { - return io.EOF + if data, err := cr.prev.NextData(ctx); err == io.EOF { + return nil, io.EOF + } else if err != nil { + return nil, err + } else { + if err := cr.WriteChannel(data); err != nil { + return nil, NewTemporaryError(err) + } + } } // TODO: can batch be non nil while err == io.EOF // This depends on the behavior of rlp.Stream batch, err := cr.nextBatchFn() - if err == io.EOF { - return io.EOF + cr.NextChannel() + return nil, NotEnoughData } else if err != nil { cr.log.Warn("failed to read batch from channel reader, skipping to next channel now", "err", err) cr.NextChannel() - return nil + return nil, NotEnoughData } - cr.next.AddBatch(batch.Batch) - return nil + return batch.Batch, nil } -func (cr *ChannelInReader) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { +func (cr *ChannelInReader) Reset(ctx context.Context, _ eth.L1BlockRef) error { cr.nextBatchFn = nil - cr.progress = cr.next.Progress() return io.EOF } diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index dbdffec809cc1..c255f046645f7 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -64,8 +64,6 @@ type EngineQueue struct { finalizedL1 eth.BlockID - progress Progress - safeAttributes []*eth.PayloadAttributes unsafePayloads PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps @@ -73,14 +71,16 @@ type EngineQueue struct { finalityData []FinalityData engine Engine + prev *AttributesQueue - metrics Metrics -} + progress Progress // only used for pipeline resets -var _ AttributesQueueOutput = (*EngineQueue)(nil) + metrics Metrics + l1Fetcher L1Fetcher +} // NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use. 
-func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics) *EngineQueue { +func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev *AttributesQueue, l1Fetcher L1Fetcher) *EngineQueue { return &EngineQueue{ log: log, cfg: cfg, @@ -91,6 +91,8 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M MaxSize: maxUnsafePayloadsMemory, SizeFn: payloadMemSize, }, + prev: prev, + l1Fetcher: l1Fetcher, } } @@ -146,17 +148,30 @@ func (eq *EngineQueue) LastL2Time() uint64 { return uint64(eq.safeAttributes[len(eq.safeAttributes)-1].Timestamp) } -func (eq *EngineQueue) Step(ctx context.Context, outer Progress) error { - if changed, err := eq.progress.Update(outer); err != nil || changed { - return err - } +func (eq *EngineQueue) Step(ctx context.Context) error { if len(eq.safeAttributes) > 0 { return eq.tryNextSafeAttributes(ctx) } + outOfData := false + if len(eq.safeAttributes) == 0 { + if next, err := eq.prev.NextAttributes(ctx, eq.safeHead); err == io.EOF { + outOfData = true + } else if err != nil { + return err + } else { + eq.safeAttributes = append(eq.safeAttributes, next) + return NotEnoughData + } + } if eq.unsafePayloads.Len() > 0 { return eq.tryNextUnsafePayload(ctx) } - return io.EOF + + if outOfData { + return io.EOF + } else { + return nil + } } // tryFinalizeL2 traverses the past L1 blocks, checks if any has been finalized, @@ -186,11 +201,11 @@ func (eq *EngineQueue) postProcessSafeL2() { eq.finalityData = append(eq.finalityData[:0], eq.finalityData[1:finalityLookback]...) 
} // remember the last L2 block that we fully derived from the given finality data - if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.progress.Origin.Number { + if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.prev.Origin().Number { // append entry for new L1 block eq.finalityData = append(eq.finalityData, FinalityData{ L2Block: eq.safeHead, - L1Block: eq.progress.Origin.ID(), + L1Block: eq.prev.Origin().ID(), }) } else { // if it's a now L2 block that was derived from the same latest L1 block, then just update the entry @@ -205,7 +220,7 @@ func (eq *EngineQueue) logSyncProgress(reason string) { "l2_safe", eq.safeHead, "l2_unsafe", eq.unsafeHead, "l2_time", eq.unsafeHead.Time, - "l1_derived", eq.progress.Origin, + "l1_derived", eq.prev.Origin(), ) } @@ -384,13 +399,13 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { // ResetStep Walks the L2 chain backwards until it finds an L2 block whose L1 origin is canonical. // The unsafe head is set to the head of the L2 chain, unless the existing safe head is not canonical. 
-func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { - result, err := sync.FindL2Heads(ctx, eq.cfg, l1Fetcher, eq.engine) +func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef) error { + result, err := sync.FindL2Heads(ctx, eq.cfg, eq.l1Fetcher, eq.engine) if err != nil { return NewTemporaryError(fmt.Errorf("failed to find the L2 Heads to start from: %w", err)) } finalized, safe, unsafe := result.Finalized, result.Safe, result.Unsafe - l1Origin, err := l1Fetcher.L1BlockRefByHash(ctx, safe.L1Origin.Hash) + l1Origin, err := eq.l1Fetcher.L1BlockRefByHash(ctx, safe.L1Origin.Hash) if err != nil { return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %v; err: %w", safe.L1Origin, err)) } @@ -398,6 +413,15 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error return NewResetError(fmt.Errorf("cannot reset block derivation to start at L2 block %s with time %d older than its L1 origin %s with time %d, time invariant is broken", safe, safe.Time, l1Origin, l1Origin.Time)) } + + pipelineNumber := l1Origin.Number - eq.cfg.ChannelTimeout + if l1Origin.Number < eq.cfg.ChannelTimeout { + pipelineNumber = 0 + } + pipelineOrigin, err := eq.l1Fetcher.L1BlockRefByNumber(ctx, pipelineNumber) + if err != nil { + return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %v; err: %w", pipelineNumber, err)) + } eq.log.Debug("Reset engine queue", "safeHead", safe, "unsafe", unsafe, "safe_timestamp", safe.Time, "unsafe_timestamp", unsafe.Time, "l1Origin", l1Origin) eq.unsafeHead = unsafe eq.safeHead = safe @@ -405,8 +429,7 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error eq.finalityData = eq.finalityData[:0] // note: we do not clear the unsafe payloadds queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads. 
eq.progress = Progress{ - Origin: l1Origin, - Closed: false, + Origin: pipelineOrigin, } eq.metrics.RecordL2Ref("l2_finalized", finalized) eq.metrics.RecordL2Ref("l2_safe", safe) diff --git a/op-node/rollup/derive/error.go b/op-node/rollup/derive/error.go index 893e0fad1b8a7..ef896d2fa677e 100644 --- a/op-node/rollup/derive/error.go +++ b/op-node/rollup/derive/error.go @@ -1,6 +1,7 @@ package derive import ( + "errors" "fmt" ) @@ -91,3 +92,7 @@ func NewCriticalError(err error) error { var ErrTemporary = NewTemporaryError(nil) var ErrReset = NewResetError(nil) var ErrCritical = NewCriticalError(nil) + +// NotEnoughData implies that the function currently does not have enough data to progress +// but if it is retried enough times, it will eventually return a real value or io.EOF +var NotEnoughData = errors.New("not enough data") diff --git a/op-node/rollup/derive/l1_retrieval.go b/op-node/rollup/derive/l1_retrieval.go index 9b888b084bbdb..5b1a741fb2a12 100644 --- a/op-node/rollup/derive/l1_retrieval.go +++ b/op-node/rollup/derive/l1_retrieval.go @@ -9,98 +9,59 @@ import ( "github.com/ethereum/go-ethereum/log" ) -// This is a generic wrapper around fetching all transactions in a block & then -// it feeds one L1 transaction at a time to the next stage - -// DataIter is a minimal iteration interface to fetch rollup input data from an arbitrary data-availability source -type DataIter interface { - // Next can be repeatedly called for more data, until it returns an io.EOF error. - // It never returns io.EOF and data at the same time. - Next(ctx context.Context) (eth.Data, error) -} - -// DataAvailabilitySource provides rollup input data -type DataAvailabilitySource interface { - // OpenData does any initial data-fetching work and returns an iterator to fetch data with. 
- OpenData(ctx context.Context, id eth.BlockID) (DataIter, error) -} - -type L1SourceOutput interface { - StageProgress - IngestData(data []byte) -} - type L1Retrieval struct { log log.Logger - dataSrc DataAvailabilitySource - next L1SourceOutput - - progress Progress + dataSrc *CalldataSource + prev *L1Traversal - data eth.Data - datas DataIter + datas *CalldataSourceImpl } -var _ Stage = (*L1Retrieval)(nil) +var _ ResetableStage = (*L1Retrieval)(nil) -func NewL1Retrieval(log log.Logger, dataSrc DataAvailabilitySource, next L1SourceOutput) *L1Retrieval { +func NewL1Retrieval(log log.Logger, dataSrc *CalldataSource, prev *L1Traversal) *L1Retrieval { return &L1Retrieval{ log: log, dataSrc: dataSrc, - next: next, + prev: prev, } } -func (l1r *L1Retrieval) Progress() Progress { - return l1r.progress +func (l1r *L1Retrieval) Origin() eth.L1BlockRef { + return l1r.prev.Origin() } -func (l1r *L1Retrieval) Step(ctx context.Context, outer Progress) error { - if changed, err := l1r.progress.Update(outer); err != nil || changed { - return err - } - - // specific to L1 source: if the L1 origin is closed, there is no more data to retrieve. - if l1r.progress.Closed { - return io.EOF - } - - // create a source if we have none +// NextData does an action in the L1 Retrieval stage +// If there is data, it pushes it to the next stage. 
+// If there is no more data open ourselves if we are closed or close ourselves if we are open +func (l1r *L1Retrieval) NextData(ctx context.Context) ([]byte, error) { if l1r.datas == nil { - datas, err := l1r.dataSrc.OpenData(ctx, l1r.progress.Origin.ID()) - if err != nil { - return NewTemporaryError(fmt.Errorf("can't fetch L1 data: %v: %w", l1r.progress.Origin, err)) - } - l1r.datas = datas - return nil - } - - // buffer data if we have none - if l1r.data == nil { - l1r.log.Debug("fetching next piece of data") - data, err := l1r.datas.Next(ctx) + next, err := l1r.prev.NextL1Block(ctx) if err == io.EOF { - l1r.progress.Closed = true - l1r.datas = nil - return io.EOF + return nil, io.EOF } else if err != nil { - return NewTemporaryError(fmt.Errorf("context to retrieve next L1 data failed: %w", err)) - } else { - l1r.data = data - return nil + return nil, err } + l1r.datas = l1r.dataSrc.OpenData(ctx, next.ID()) } - // flush the data to next stage - l1r.next.IngestData(l1r.data) - // and nil the data, the next step will retrieve the next data - l1r.data = nil - return nil + l1r.log.Debug("fetching next piece of data") + data, err := l1r.datas.Next(ctx) + if err == io.EOF { + l1r.datas = nil + return nil, io.EOF + } else if err != nil { + return nil, NewTemporaryError(fmt.Errorf("context to retrieve next L1 data failed: %w", err)) + } else { + return data, nil + } } -func (l1r *L1Retrieval) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { - l1r.progress = l1r.next.Progress() - l1r.datas = nil - l1r.data = nil +// ResetStep re-initializes the L1 Retrieval stage to block of it's `next` progress. +// Note that we open up the `l1r.datas` here because it is requires to maintain the +// internal invariants that later propagate up the derivation pipeline. 
+func (l1r *L1Retrieval) Reset(ctx context.Context, base eth.L1BlockRef) error { + l1r.datas = l1r.dataSrc.OpenData(ctx, base.ID()) + l1r.log.Info("Reset of L1Retrieval done", "origin", base) return io.EOF } diff --git a/op-node/rollup/derive/l1_traversal.go b/op-node/rollup/derive/l1_traversal.go index f58f4de903465..8ad65873e9ee8 100644 --- a/op-node/rollup/derive/l1_traversal.go +++ b/op-node/rollup/derive/l1_traversal.go @@ -11,42 +11,46 @@ import ( "github.com/ethereum/go-ethereum/log" ) +// L1 Traversal fetches the next L1 block and exposes it through the progress API + type L1BlockRefByNumberFetcher interface { L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error) } type L1Traversal struct { - log log.Logger + block eth.L1BlockRef + done bool l1Blocks L1BlockRefByNumberFetcher - next StageProgress - progress Progress + log log.Logger } -var _ Stage = (*L1Traversal)(nil) +var _ ResetableStage = (*L1Traversal)(nil) -func NewL1Traversal(log log.Logger, l1Blocks L1BlockRefByNumberFetcher, next StageProgress) *L1Traversal { +func NewL1Traversal(log log.Logger, l1Blocks L1BlockRefByNumberFetcher) *L1Traversal { return &L1Traversal{ log: log, l1Blocks: l1Blocks, - next: next, } } -func (l1t *L1Traversal) Progress() Progress { - return l1t.progress +func (l1t *L1Traversal) Origin() eth.L1BlockRef { + return l1t.block } -func (l1t *L1Traversal) Step(ctx context.Context, outer Progress) error { - if !l1t.progress.Closed { // close origin and do another pipeline sweep, before we try to move to the next origin - l1t.progress.Closed = true - return nil +// NextL1Block returns the next block. 
It does not advance, but it can only be +// called once before returning io.EOF +func (l1t *L1Traversal) NextL1Block(_ context.Context) (eth.L1BlockRef, error) { + if !l1t.done { + l1t.done = true + return l1t.block, nil + } else { + return eth.L1BlockRef{}, io.EOF } +} - // If we reorg to a shorter chain, then we'll only derive new L2 data once the L1 reorg - // becomes longer than the previous L1 chain. - // This is fine, assuming the new L1 chain is live, but we may want to reconsider this. - - origin := l1t.progress.Origin +// AdvanceL1Block advances the internal state of L1 Traversal +func (l1t *L1Traversal) AdvanceL1Block(ctx context.Context) error { + origin := l1t.block nextL1Origin, err := l1t.l1Blocks.L1BlockRefByNumber(ctx, origin.Number+1) if errors.Is(err, ethereum.NotFound) { l1t.log.Debug("can't find next L1 block info (yet)", "number", origin.Number+1, "origin", origin) @@ -54,16 +58,20 @@ func (l1t *L1Traversal) Step(ctx context.Context, outer Progress) error { } else if err != nil { return NewTemporaryError(fmt.Errorf("failed to find L1 block info by number, at origin %s next %d: %w", origin, origin.Number+1, err)) } - if l1t.progress.Origin.Hash != nextL1Origin.ParentHash { - return NewResetError(fmt.Errorf("detected L1 reorg from %s to %s with conflicting parent %s", l1t.progress.Origin, nextL1Origin, nextL1Origin.ParentID())) + if l1t.block.Hash != nextL1Origin.ParentHash { + return NewResetError(fmt.Errorf("detected L1 reorg from %s to %s with conflicting parent %s", l1t.block, nextL1Origin, nextL1Origin.ParentID())) } - l1t.progress.Origin = nextL1Origin - l1t.progress.Closed = false + l1t.block = nextL1Origin + l1t.done = false return nil } -func (l1t *L1Traversal) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { - l1t.progress = l1t.next.Progress() - l1t.log.Info("completed reset of derivation pipeline", "origin", l1t.progress.Origin) +// Reset sets the internal L1 block to the supplied base. 
+// Note that the next call to `NextL1Block` will return `base` itself (done is cleared), not the block after it
+// TODO: Walk one back/figure this out.
+func (l1t *L1Traversal) Reset(ctx context.Context, base eth.L1BlockRef) error {
+	l1t.block = base
+	l1t.done = false
+	l1t.log.Info("completed reset of derivation pipeline", "origin", base)
 	return io.EOF
 }
diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go
index 17dd7d8805afa..f91f9cfb3f7c1 100644
--- a/op-node/rollup/derive/pipeline.go
+++ b/op-node/rollup/derive/pipeline.go
@@ -24,28 +24,9 @@ type L1Fetcher interface {
 	L1TransactionFetcher
 }
 
-type StageProgress interface {
-	Progress() Progress
-}
-
-type Stage interface {
-	StageProgress
-
-	// Step tries to progress the state.
-	// The outer stage progress informs the step what to do.
-	//
-	// If the stage:
-	// - returns EOF: the stage will be skipped
-	// - returns another error: the stage will make the pipeline error.
-	// - returns nil: the stage will be repeated next Step
-	Step(ctx context.Context, outer Progress) error
-
-	// ResetStep prepares the state for usage in regular steps.
-	// Similar to Step(ctx) it returns:
-	// - EOF if the next stage should be reset
-	// - error if the reset should start all over again
-	// - nil if the reset should continue resetting this stage.
-	ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
+type ResetableStage interface {
+	// Reset resets a pull stage. `base` refers to the L1 Block Reference to reset to.
+	Reset(ctx context.Context, base eth.L1BlockRef) error
 }
 
 type EngineQueueStage interface {
@@ -58,6 +39,7 @@ type EngineQueueStage interface {
 	Finalize(l1Origin eth.BlockID)
 	AddSafeAttributes(attributes *eth.PayloadAttributes)
 	AddUnsafePayload(payload *eth.ExecutionPayload)
+	Step(context.Context) error
 }
 
 // DerivationPipeline is updated with new L1 data, and the Step() function can be iterated on to keep the L2 Engine in sync. 
@@ -69,39 +51,44 @@ type DerivationPipeline struct { // Index of the stage that is currently being reset. // >= len(stages) if no additional resetting is required resetting int + stages []ResetableStage - // Index of the stage that is currently being processed. - active int - - // stages in execution order. A stage Step that: - stages []Stage - - eng EngineQueueStage + // Special stages to keep track of + traversal *L1Traversal + eng EngineQueueStage metrics Metrics } // NewDerivationPipeline creates a derivation pipeline, which should be reset before use. func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher, engine Engine, metrics Metrics) *DerivationPipeline { - eng := NewEngineQueue(log, cfg, engine, metrics) - attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, eng) - batchQueue := NewBatchQueue(log, cfg, attributesQueue) - chInReader := NewChannelInReader(log, batchQueue) - bank := NewChannelBank(log, cfg, chInReader) - dataSrc := NewCalldataSource(log, cfg, l1Fetcher) - l1Src := NewL1Retrieval(log, dataSrc, bank) - l1Traversal := NewL1Traversal(log, l1Fetcher, l1Src) - stages := []Stage{eng, attributesQueue, batchQueue, chInReader, bank, l1Src, l1Traversal} + + // Pull stages + l1Traversal := NewL1Traversal(log, l1Fetcher) + dataSrc := NewCalldataSource(log, cfg, l1Fetcher) // auxiliary stage for L1Retrieval + l1Src := NewL1Retrieval(log, dataSrc, l1Traversal) + bank := NewChannelBank(log, cfg, l1Src, l1Fetcher) + chInReader := NewChannelInReader(log, bank) + batchQueue := NewBatchQueue(log, cfg, chInReader) + attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, batchQueue) + + // Step stages + eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue, l1Fetcher) + + // Reset from engine queue then up from L1 Traversal. The stages do not talk to each other during + // the reset, but after the engine queue, this is the order in which the stages could talk to each other. 
+ // Note: The engine queue stage is the only reset that can fail. + stages := []ResetableStage{eng, l1Traversal, l1Src, bank, chInReader, batchQueue, attributesQueue} return &DerivationPipeline{ log: log, cfg: cfg, l1Fetcher: l1Fetcher, resetting: 0, - active: 0, stages: stages, eng: eng, metrics: metrics, + traversal: l1Traversal, } } @@ -150,8 +137,8 @@ func (dp *DerivationPipeline) Step(ctx context.Context) error { // if any stages need to be reset, do that first. if dp.resetting < len(dp.stages) { - if err := dp.stages[dp.resetting].ResetStep(ctx, dp.l1Fetcher); err == io.EOF { - dp.log.Debug("reset of stage completed", "stage", dp.resetting, "origin", dp.stages[dp.resetting].Progress().Origin) + if err := dp.stages[dp.resetting].Reset(ctx, dp.eng.Progress().Origin); err == io.EOF { + dp.log.Debug("reset of stage completed", "stage", dp.resetting, "origin", dp.eng.Progress().Origin) dp.resetting += 1 return nil } else if err != nil { @@ -161,18 +148,13 @@ func (dp *DerivationPipeline) Step(ctx context.Context) error { } } - for i, stage := range dp.stages { - var outer Progress - if i+1 < len(dp.stages) { - outer = dp.stages[i+1].Progress() - } - if err := stage.Step(ctx, outer); err == io.EOF { - continue - } else if err != nil { - return fmt.Errorf("stage %d failed: %w", i, err) - } else { - return nil - } + // Now step the engine queue. It will pull earlier data as needed. 
+ if err := dp.eng.Step(ctx); err == io.EOF { + // If every stage has returned io.EOF, try to advance the L1 Origin + return dp.traversal.AdvanceL1Block(ctx) + } else if err != nil { + return fmt.Errorf("engine stage failed: %w", err) + } else { + return nil } - return io.EOF } diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go index cd2bdd2fe5abf..3538cb6d8344f 100644 --- a/op-node/rollup/driver/state.go +++ b/op-node/rollup/driver/state.go @@ -440,6 +440,10 @@ func (s *state) eventLoop() { } else if err != nil && errors.Is(err, derive.ErrCritical) { s.log.Error("Derivation process critical error", "err", err) return + } else if err != nil && errors.Is(err, derive.NotEnoughData) { + stepAttempts = 0 // don't do a backoff for this error + reqStep() + continue } else if err != nil { s.log.Error("Derivation process error", "attempts", stepAttempts, "err", err) reqStep()