diff --git a/op-node/rollup/derive/attributes.go b/op-node/rollup/derive/attributes.go index c7d326db88dc3..14d58b2deef90 100644 --- a/op-node/rollup/derive/attributes.go +++ b/op-node/rollup/derive/attributes.go @@ -33,42 +33,28 @@ func PreparePayloadAttributes(ctx context.Context, cfg *rollup.Config, dl L1Rece if l2Parent.L1Origin.Number != epoch.Number { info, _, receipts, err := dl.Fetch(ctx, epoch.Hash) if err != nil { - return nil, NewTemporaryError( - err, - "failed to fetch L1 block info and receipts", - ) + return nil, NewTemporaryError(fmt.Errorf("failed to fetch L1 block info and receipts: %w", err)) } if l2Parent.L1Origin.Hash != info.ParentHash() { return nil, NewResetError( - nil, - fmt.Sprintf("cannot create new block with L1 origin %s (parent %s) on top of L1 origin %s", - epoch, info.ParentHash(), l2Parent.L1Origin), - ) + fmt.Errorf("cannot create new block with L1 origin %s (parent %s) on top of L1 origin %s", + epoch, info.ParentHash(), l2Parent.L1Origin)) } deposits, err := DeriveDeposits(receipts, cfg.DepositContractAddress) if err != nil { - return nil, NewResetError( - err, - "failed to derive some deposits", - ) + // deposits may never be ignored. Failing to process them is a critical error. + return nil, NewCriticalError(fmt.Errorf("failed to derive some deposits: %w", err)) } l1Info = info depositTxs = deposits seqNumber = 0 } else { if l2Parent.L1Origin.Hash != epoch.Hash { - return nil, NewResetError( - nil, - fmt.Sprintf("cannot create new block with L1 origin %s in conflict with L1 origin %s", - epoch, l2Parent.L1Origin), - ) + return nil, NewResetError(fmt.Errorf("cannot create new block with L1 origin %s in conflict with L1 origin %s", epoch, l2Parent.L1Origin)) } info, err := dl.InfoByHash(ctx, epoch.Hash) if err != nil { - return nil, NewTemporaryError( - err, - "failed to fetch L1 block info", - ) + return nil, NewTemporaryError(fmt.Errorf("failed to fetch L1 block info: %w", err)) } l1Info = info depositTxs = nil @@ -77,10 +63,7 @@ func PreparePayloadAttributes(ctx context.Context, cfg *rollup.Config, dl L1Rece l1InfoTx, err := L1InfoDepositBytes(seqNumber, l1Info) if err != nil { - return nil, NewResetError( - err, - "failed to create l1InfoTx", - ) + return nil, NewCriticalError(fmt.Errorf("failed to create l1InfoTx: %w", err)) } txs := make([]hexutil.Bytes, 0, 1+len(depositTxs)) diff --git a/op-node/rollup/derive/batch_queue.go b/op-node/rollup/derive/batch_queue.go index 0d3e451b20368..1099eb198c83c 100644 --- a/op-node/rollup/derive/batch_queue.go +++ b/op-node/rollup/derive/batch_queue.go @@ -2,7 +2,6 @@ package derive import ( "context" - "errors" "fmt" "io" "sort" @@ -74,9 +73,7 @@ func (bq *BatchQueue) Step(ctx context.Context, outer Progress) error { bq.log.Trace("Out of batches") return io.EOF } else if err != nil { - bq.log.Error("Error deriving batches", "err", err) - // Suppress transient errors for when reporting back to the pipeline - return nil + return err } for _, batch := range batches { @@ -103,14 +100,14 @@ func (bq *BatchQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error return io.EOF } -func (bq *BatchQueue) AddBatch(batch *BatchData) error { +func (bq *BatchQueue) AddBatch(batch *BatchData) { if bq.progress.Closed { panic("write batch while closed") } - bq.log.Trace("queued batch", "origin", bq.progress.Origin, "tx_count", len(batch.Transactions), "timestamp", batch.Timestamp) if len(bq.l1Blocks) == 0 { - return fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp) + panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp)) } + bq.log.Trace("queuing batch", "origin", bq.progress.Origin, "tx_count", len(batch.Transactions), "timestamp", batch.Timestamp) data := BatchWithL1InclusionBlock{ L1InclusionBlock: bq.progress.Origin, @@ -122,55 +119,45 @@ func (bq *BatchQueue) AddBatch(batch *BatchData) error { for _, b := range batches { if b.Batch.Timestamp == batch.Timestamp && b.Batch.Epoch() == batch.Epoch() { bq.log.Warn("duplicate batch", "epoch", batch.Epoch(), "timestamp", batch.Timestamp, "txs", len(batch.Transactions)) - return nil + return } } } else { bq.log.Debug("First seen batch", "epoch", batch.Epoch(), "timestamp", batch.Timestamp, "txs", len(batch.Transactions)) - } // May have duplicate block numbers or individual fields, but have limited complete duplicates bq.batchesByTimestamp[batch.Timestamp] = append(batches, &data) - return nil } // validExtension determines if a batch follows the previous attributes -func (bq *BatchQueue) validExtension(batch *BatchWithL1InclusionBlock, prevTime, prevEpoch uint64) bool { +func (bq *BatchQueue) validExtension(batch *BatchWithL1InclusionBlock, prevTime, prevEpoch uint64) (valid bool, err error) { if batch.Batch.Timestamp != prevTime+bq.config.BlockTime { bq.log.Debug("Batch does not extend the block time properly", "time", batch.Batch.Timestamp, "prev_time", prevTime) - - return false + return false, nil } if batch.Batch.EpochNum != rollup.Epoch(prevEpoch) && batch.Batch.EpochNum != rollup.Epoch(prevEpoch+1) { bq.log.Debug("Batch does not extend the epoch properly", "epoch", batch.Batch.EpochNum, "prev_epoch", prevEpoch) - - return false + return false, nil } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) l1BlockRef, err := bq.dl.L1BlockRefByNumber(ctx, batch.Batch.Epoch().Number) cancel() if err != nil { - bq.log.Warn("err fetching l1 block", "err", err) - if errors.Is(err, ErrTemporary) { - // Skipping validation in case of temporary RPC error - bq.log.Warn("temporary err - skipping epoch hash validation", "err", err) - return true - } else { - return false - } + return false, err } if l1BlockRef.Hash != batch.Batch.EpochHash { - return false + bq.log.Debug("Batch epoch hash does not match expected L1 block hash", "batch_epoch", batch.Batch.Epoch(), "expected", l1BlockRef.ID()) + return false, nil } // Note: `Batch.EpochNum` is an external input, but it is constrained to be a reasonable size by the // above equality checks. if uint64(batch.Batch.EpochNum)+bq.config.SeqWindowSize < batch.L1InclusionBlock.Number { bq.log.Debug("Batch submitted outside sequence window", "epoch", batch.Batch.EpochNum, "inclusion_block", batch.L1InclusionBlock.Number) - return false + return false, nil } - return true + return true, nil } // deriveBatches pulls a single batch eagerly or a collection of batches if it is the end of @@ -230,15 +217,14 @@ func (bq *BatchQueue) deriveBatches(ctx context.Context, l2SafeHead eth.L2BlockR } else { bq.log.Trace("Trying to eagerly find batch") - var ret []*BatchData next, err := bq.tryPopNextBatch(ctx, l2SafeHead) - if next != nil { + if err != nil { + return nil, err + } else { bq.log.Info("found eager batch", "batch", next.Batch) - ret = append(ret, next.Batch) + return []*BatchData{next.Batch}, nil } - return ret, err } - } // tryPopNextBatch tries to get the next batch from the batch queue using an eager approach. @@ -285,8 +271,7 @@ func (bq *BatchQueue) tryPopNextBatch(ctx context.Context, l2SafeHead eth.L2Bloc // Note: Don't check epoch change here, check it in `validExtension` epoch, err := bq.dl.L1BlockRefByNumber(ctx, uint64(batch.Batch.EpochNum)) if err != nil { - bq.log.Warn("error fetching origin", "err", err) - return nil, err + return nil, NewTemporaryError(fmt.Errorf("error fetching origin: %w", err)) } if err := ValidBatch(batch.Batch, bq.config, epoch.ID(), minL2Time, maxL2Time); err != nil { bq.log.Warn("Invalid batch", "err", err) @@ -294,7 +279,9 @@ func (bq *BatchQueue) tryPopNextBatch(ctx context.Context, l2SafeHead eth.L2Bloc } // We have a valid batch, no make sure that it builds off the previous L2 block - if bq.validExtension(batch, l2SafeHead.Time, l2SafeHead.L1Origin.Number) { + if valid, err := bq.validExtension(batch, l2SafeHead.Time, l2SafeHead.L1Origin.Number); err != nil { + return nil, err + } else if valid { // Advance the epoch if needed if l2SafeHead.L1Origin.Number != uint64(batch.Batch.EpochNum) { bq.l1Blocks = bq.l1Blocks[1:] @@ -302,12 +289,12 @@ func (bq *BatchQueue) tryPopNextBatch(ctx context.Context, l2SafeHead eth.L2Bloc // Don't leak data in the map delete(bq.batchesByTimestamp, batch.Batch.Timestamp) - bq.log.Info("Batch was valid extension") + bq.log.Debug("Batch was valid extension") // We have found the fist valid batch. return batch, nil } else { - bq.log.Info("batch was not valid extension") + bq.log.Warn("batch was not valid extension", "inclusion", batch.L1InclusionBlock, "safe_origin", l2SafeHead.L1Origin, "l2_time", l2SafeHead.Time) } } diff --git a/op-node/rollup/derive/batch_queue_test.go b/op-node/rollup/derive/batch_queue_test.go index 017a6d402a097..fddee441fe873 100644 --- a/op-node/rollup/derive/batch_queue_test.go +++ b/op-node/rollup/derive/batch_queue_test.go @@ -116,8 +116,7 @@ func TestBatchQueueEager(t *testing.T) { // Add batches batches := []*BatchData{b(12, l1[0]), b(14, l1[0])} for _, batch := range batches { - err := bq.AddBatch(batch) - require.Nil(t, err) + bq.AddBatch(batch) } // Step for { @@ -170,8 +169,7 @@ func TestBatchQueueFull(t *testing.T) { // Add batches batches := []*BatchData{b(14, l1[0]), b(16, l1[0]), b(18, l1[1])} for _, batch := range batches { - err := bq.AddBatch(batch) - require.Nil(t, err) + bq.AddBatch(batch) } // Missing first batch err = bq.Step(context.Background(), prevProgress) @@ -205,8 +203,7 @@ func TestBatchQueueFull(t *testing.T) { // Finally add batch firstBatch := b(12, l1[0]) - err = bq.AddBatch(firstBatch) - require.Equal(t, err, nil) + bq.AddBatch(firstBatch) // Close the origin prevProgress.Closed = true @@ -271,8 +268,7 @@ func TestBatchQueueMissing(t *testing.T) { // that batch timestamp 12 & 14 is created & 16 is used. batches := []*BatchData{b(16, l1[0]), b(20, l1[1])} for _, batch := range batches { - err := bq.AddBatch(batch) - require.Nil(t, err) + bq.AddBatch(batch) } // Missing first batch err = bq.Step(context.Background(), prevProgress) diff --git a/op-node/rollup/derive/channel_bank.go b/op-node/rollup/derive/channel_bank.go index b19c4151f542b..fc1c7d85c000f 100644 --- a/op-node/rollup/derive/channel_bank.go +++ b/op-node/rollup/derive/channel_bank.go @@ -66,29 +66,28 @@ func (ib *ChannelBank) prune() { } // IngestData adds new L1 data to the channel bank. -// Read() should be called repeatedly first, until everything has been read, before adding new data. -// Then NextL1(ref) should be called to move forward to the next L1 input -func (ib *ChannelBank) IngestData(data []byte) error { +// Read() should be called repeatedly first, until everything has been read, before adding new data.\ +func (ib *ChannelBank) IngestData(data []byte) { if ib.progress.Closed { panic("write data to bank while closed") } ib.log.Debug("channel bank got new data", "origin", ib.progress.Origin, "data_len", len(data)) if len(data) < 1 { - return NewTemporaryError( - nil, - "data must be at least have a version byte, but got empty string", - ) + ib.log.Warn("data must be at least have a version byte, but got empty string") + return } if data[0] != DerivationVersion0 { - return fmt.Errorf("unrecognized derivation version: %d", data) + ib.log.Warn("unrecognized derivation version", "version", data) + return } buf := bytes.NewBuffer(data[1:]) ib.prune() if buf.Len() < minimumFrameSize { - return fmt.Errorf("data must be at least have one frame") + ib.log.Warn("data must be at least have one frame", "length", buf.Len()) + return } // Iterate over all frames. They may have different channel IDs to indicate that they stream consumer should reset. @@ -96,36 +95,37 @@ func (ib *ChannelBank) IngestData(data []byte) error { // Don't try to unmarshal from an empty buffer. // The if done checks should catch most/all of this case though. if buf.Len() < ChannelIDDataSize+1 { - return nil + return } done := false var f Frame if err := (&f).UnmarshalBinary(buf); err == io.EOF { done = true } else if err != nil { - return fmt.Errorf("failed to unmarshal a frame: %w", err) - + ib.log.Warn("malformed frame: %w", err) + return } - // stop reading and ignore remaining data if we encounter a zeroed ID + // stop reading and ignore remaining data if we encounter a zeroed ID, + // this happens when there is zero padding at the end. if f.ID == (ChannelID{}) { - ib.log.Info("empty channel ID") - return nil + ib.log.Trace("empty channel ID") + return } // check if the channel is not timed out if f.ID.Time+ib.cfg.ChannelTimeout < ib.progress.Origin.Time { - ib.log.Info("channel is timed out, ignore frame", "channel", f.ID, "id_time", f.ID.Time, "frame", f.FrameNumber) + ib.log.Warn("channel is timed out, ignore frame", "channel", f.ID, "id_time", f.ID.Time, "frame", f.FrameNumber) if done { - return nil + return } continue } // check if the channel is not included too soon (otherwise timeouts wouldn't be effective) if f.ID.Time > ib.progress.Origin.Time { - ib.log.Info("channel claims to be from the future, ignore frame", "channel", f.ID, "id_time", f.ID.Time, "frame", f.FrameNumber) + ib.log.Warn("channel claims to be from the future, ignore frame", "channel", f.ID, "id_time", f.ID.Time, "frame", f.FrameNumber) if done { - return nil + return } continue } @@ -137,17 +137,17 @@ func (ib *ChannelBank) IngestData(data []byte) error { ib.channelQueue = append(ib.channelQueue, f.ID) } - ib.log.Debug("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data)) + ib.log.Trace("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data)) if err := currentCh.IngestData(uint64(f.FrameNumber), f.IsLast, f.Data); err != nil { - ib.log.Debug("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err) + ib.log.Warn("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err) if done { - return nil + return } continue } if done { - return nil + return } } } @@ -221,10 +221,7 @@ func (ib *ChannelBank) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error // go back in history if we are not distant enough from the next stage parent, err := l1Fetcher.L1BlockRefByHash(ctx, ib.progress.Origin.ParentHash) if err != nil { - return NewTemporaryError( - err, - fmt.Sprintf("failed to find channel bank block, failed to retrieve L1 reference: %v", err), - ) + return NewTemporaryError(fmt.Errorf("failed to find channel bank block, failed to retrieve L1 reference: %w", err)) } ib.progress.Origin = parent return nil diff --git a/op-node/rollup/derive/channel_bank_test.go b/op-node/rollup/derive/channel_bank_test.go index 3f9836d5295b7..cc32d523fc470 100644 --- a/op-node/rollup/derive/channel_bank_test.go +++ b/op-node/rollup/derive/channel_bank_test.go @@ -118,8 +118,9 @@ func (tf testFrame) ToFrame() Frame { } func (bt *bankTestSetup) ingestData(data []byte) { - require.NoError(bt.t, bt.cb.IngestData(data)) + bt.cb.IngestData(data) } + func (bt *bankTestSetup) ingestFrames(frames ...testFrame) { data := new(bytes.Buffer) data.WriteByte(DerivationVersion0) diff --git a/op-node/rollup/derive/channel_in_reader.go b/op-node/rollup/derive/channel_in_reader.go index 2e13362b62be7..fe53caea828c7 100644 --- a/op-node/rollup/derive/channel_in_reader.go +++ b/op-node/rollup/derive/channel_in_reader.go @@ -19,7 +19,7 @@ type zlibReader interface { type BatchQueueStage interface { StageProgress - AddBatch(batch *BatchData) error + AddBatch(batch *BatchData) } type ChannelInReader struct { @@ -115,7 +115,8 @@ func (cr *ChannelInReader) Step(ctx context.Context, outer Progress) error { cr.NextChannel() return nil } - return cr.next.AddBatch(&batch) + cr.next.AddBatch(&batch) + return nil } func (cr *ChannelInReader) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index f18fceed97a4f..0b563a7a1bba9 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -163,33 +163,21 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error { } fcRes, err := eq.engine.ForkchoiceUpdate(ctx, &fc, nil) if err != nil { - return NewTemporaryError( - err, - fmt.Sprintf("failed to update forkchoice to prepare for new unsafe payload: %v", err), - ) + return NewTemporaryError(fmt.Errorf("failed to update forkchoice to prepare for new unsafe payload: %v", err)) } if fcRes.PayloadStatus.Status != eth.ExecutionValid { eq.unsafePayloads = eq.unsafePayloads[1:] - return NewTemporaryError( - nil, - fmt.Sprintf("cannot prepare unsafe chain for new payload: new - %v; parent: %v; err: %v", - first.ID(), first.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)), - ) + return NewTemporaryError(fmt.Errorf("cannot prepare unsafe chain for new payload: new - %v; parent: %v; err: %v", + first.ID(), first.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus))) } status, err := eq.engine.NewPayload(ctx, first) if err != nil { - return NewTemporaryError( - err, - fmt.Sprintf("failed to update insert payload: %v", err), - ) + return NewTemporaryError(fmt.Errorf("failed to update insert payload: %v", err)) } if status.Status != eth.ExecutionValid { eq.unsafePayloads = eq.unsafePayloads[1:] - return NewTemporaryError( - nil, - fmt.Sprintf("cannot process unsafe payload: new - %v; parent: %v; err: %v", - first.ID(), first.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)), - ) + return NewTemporaryError(fmt.Errorf("cannot process unsafe payload: new - %v; parent: %v; err: %v", + first.ID(), first.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus))) } eq.unsafeHead = ref eq.unsafePayloads = eq.unsafePayloads[1:] @@ -220,10 +208,7 @@ func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error payload, err := eq.engine.PayloadByNumber(ctx, eq.safeHead.Number+1) if err != nil { - return NewTemporaryError( - err, - fmt.Sprintf("failed to get existing unsafe payload to compare against derived attributes from L1: %v", err), - ) + return NewTemporaryError(fmt.Errorf("failed to get existing unsafe payload to compare against derived attributes from L1: %v", err)) } if err := AttributesMatchBlock(eq.safeAttributes[0], eq.safeHead.Hash, payload); err != nil { eq.log.Warn("L2 reorg: existing unsafe block does not match derived attributes from L1", "err", err) @@ -232,10 +217,7 @@ func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error } ref, err := PayloadToBlockRef(payload, &eq.cfg.Genesis) if err != nil { - return NewTemporaryError( - err, - fmt.Sprintf("failed to decode L2 block ref from payload: %v", err), - ) + return NewResetError(fmt.Errorf("failed to decode L2 block ref from payload: %v", err)) } eq.safeHead = ref // unsafe head stays the same, we did not reorg the chain. @@ -259,10 +241,7 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { payload, rpcErr, payloadErr := InsertHeadBlock(ctx, eq.log, eq.engine, fc, attrs, true) if rpcErr != nil { // RPC errors are recoverable, we can retry the buffered payload attributes later. - return NewTemporaryError( - rpcErr, - fmt.Sprintf("failed to insert new block: %v", rpcErr), - ) + return NewTemporaryError(fmt.Errorf("failed to insert new block: %v", rpcErr)) } if payloadErr != nil { eq.log.Warn("could not process payload derived from L1 data", "err", payloadErr) @@ -279,14 +258,11 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { eq.safeAttributes[0].Transactions = deposits return nil } - return NewCriticalError(payloadErr, "failed to process block with only deposit transactions") + return NewCriticalError(fmt.Errorf("failed to process block with only deposit transactions: %w", payloadErr)) } ref, err := PayloadToBlockRef(payload, &eq.cfg.Genesis) if err != nil { - return NewTemporaryError( - err, - fmt.Sprintf("failed to decode L2 block ref from payload: %v", err), - ) + return NewTemporaryError(fmt.Errorf("failed to decode L2 block ref from payload: %v", err)) } eq.safeHead = ref eq.unsafeHead = ref @@ -299,31 +275,21 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { // ResetStep Walks the L2 chain backwards until it finds an L2 block whose L1 origin is canonical. // The unsafe head is set to the head of the L2 chain, unless the existing safe head is not canonical. func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { - l2Head, err := eq.engine.L2BlockRefHead(ctx) if err != nil { - return NewTemporaryError( - err, - fmt.Sprintf("failed to find the L2 Head block: %v", err), - ) + return NewTemporaryError(fmt.Errorf("failed to find the L2 Head block: %w", err)) } unsafe, safe, err := sync.FindL2Heads(ctx, l2Head, eq.cfg.SeqWindowSize, l1Fetcher, eq.engine, &eq.cfg.Genesis) if err != nil { - return NewTemporaryError( - err, - fmt.Sprintf("failed to find the L2 Heads to start from: %v", err), - ) + return NewTemporaryError(fmt.Errorf("failed to find the L2 Heads to start from: %w", err)) } l1Origin, err := l1Fetcher.L1BlockRefByHash(ctx, safe.L1Origin.Hash) if err != nil { - return NewTemporaryError( - err, - fmt.Sprintf("failed to fetch the new L1 progress: origin: %v; err: %v", safe.L1Origin, err), - ) + return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %v; err: %v", safe.L1Origin, err)) } if safe.Time < l1Origin.Time { - return fmt.Errorf("cannot reset block derivation to start at L2 block %s with time %d older than its L1 origin %s with time %d, time invariant is broken", - safe, safe.Time, l1Origin, l1Origin.Time) + return NewResetError(fmt.Errorf("cannot reset block derivation to start at L2 block %s with time %d older than its L1 origin %s with time %d, time invariant is broken", + safe, safe.Time, l1Origin, l1Origin.Time)) } eq.log.Debug("Reset engine queue", "safeHead", safe, "unsafe", unsafe, "safe_timestamp", safe.Time, "unsafe_timestamp", unsafe.Time, "l1Origin", l1Origin) eq.unsafeHead = unsafe @@ -333,5 +299,4 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error Closed: false, } return io.EOF - } diff --git a/op-node/rollup/derive/error.go b/op-node/rollup/derive/error.go index d3182c67b2f1b..893e0fad1b8a7 100644 --- a/op-node/rollup/derive/error.go +++ b/op-node/rollup/derive/error.go @@ -7,6 +7,19 @@ import ( // Level is the severity level of the error. type Level uint +func (lvl Level) String() string { + switch lvl { + case LevelTemporary: + return "temp" + case LevelReset: + return "reset" + case LevelCritical: + return "crit" + default: + return fmt.Sprintf("unknown(%d)", lvl) + } +} + // There are three levels currently, out of which only 2 are being used // to classify error by severity. LevelTemporary const ( @@ -22,16 +35,15 @@ const ( // Error is a wrapper for error, description and a severity level. type Error struct { err error - desc string level Level } // Error satisfies the error interface. func (e Error) Error() string { if e.err != nil { - return fmt.Errorf("%w: %s", e.err, e.desc).Error() + return fmt.Sprintf("%s: %v", e.level, e.err) } - return e.desc + return e.level.String() } // Unwrap satisfies the Is/As interface. @@ -52,43 +64,30 @@ func (e Error) Is(target error) bool { } // NewError returns a custom Error. -func NewError(err error, desc string, level Level) error { +func NewError(err error, level Level) error { return Error{ err: err, - desc: desc, level: level, } } // NewTemporaryError returns a temporary error. -func NewTemporaryError(err error, desc string) error { - return NewError( - err, - desc, - LevelTemporary, - ) +func NewTemporaryError(err error) error { + return NewError(err, LevelTemporary) } // NewResetError returns a pipeline reset error. -func NewResetError(err error, desc string) error { - return NewError( - err, - desc, - LevelReset, - ) +func NewResetError(err error) error { + return NewError(err, LevelReset) } // NewCriticalError returns a critical error. -func NewCriticalError(err error, desc string) error { - return NewError( - err, - desc, - LevelCritical, - ) +func NewCriticalError(err error) error { + return NewError(err, LevelCritical) } // Sentinel errors, use these to get the severity of errors by calling // errors.Is(err, ErrTemporary) for example. -var ErrTemporary = NewTemporaryError(nil, "temporary error") -var ErrReset = NewResetError(nil, "pipeline reset error") -var ErrCritical = NewCriticalError(nil, "critical error") +var ErrTemporary = NewTemporaryError(nil) +var ErrReset = NewResetError(nil) +var ErrCritical = NewCriticalError(nil) diff --git a/op-node/rollup/derive/l1_retrieval.go b/op-node/rollup/derive/l1_retrieval.go index 9b007cced9dec..49bcf9a3fe52e 100644 --- a/op-node/rollup/derive/l1_retrieval.go +++ b/op-node/rollup/derive/l1_retrieval.go @@ -24,7 +24,7 @@ type DataAvailabilitySource interface { type L1SourceOutput interface { StageProgress - IngestData(data []byte) error + IngestData(data []byte) } type L1Retrieval struct { @@ -66,10 +66,7 @@ func (l1r *L1Retrieval) Step(ctx context.Context, outer Progress) error { if l1r.datas == nil { datas, err := l1r.dataSrc.OpenData(ctx, l1r.progress.Origin.ID()) if err != nil { - return NewTemporaryError( - err, - fmt.Sprintf("can't fetch L1 data: %v, %v", l1r.progress.Origin, err), - ) + return NewTemporaryError(fmt.Errorf("can't fetch L1 data: %v: %w", l1r.progress.Origin, err)) } l1r.datas = datas return nil @@ -79,25 +76,21 @@ func (l1r *L1Retrieval) Step(ctx context.Context, outer Progress) error { if l1r.data == nil { l1r.log.Debug("fetching next piece of data") data, err := l1r.datas.Next(ctx) - if err != nil && err == ctx.Err() { - l1r.log.Warn("context to retrieve next L1 data failed", "err", err) - return nil - } else if err == io.EOF { + if err == io.EOF { l1r.progress.Closed = true l1r.datas = nil return io.EOF } else if err != nil { - return err + return NewTemporaryError(fmt.Errorf("context to retrieve next L1 data failed: %w", err)) } else { l1r.data = data return nil } } - // try to flush the data to next stage - if err := l1r.next.IngestData(l1r.data); err != nil { - return err - } + // flush the data to next stage + l1r.next.IngestData(l1r.data) + // and nil the data, the next step will retrieve the next data l1r.data = nil return nil } diff --git a/op-node/rollup/derive/l1_retrieval_test.go b/op-node/rollup/derive/l1_retrieval_test.go index 60583926a7715..efd592f77fd2b 100644 --- a/op-node/rollup/derive/l1_retrieval_test.go +++ b/op-node/rollup/derive/l1_retrieval_test.go @@ -32,13 +32,12 @@ type MockIngestData struct { MockOriginStage } -func (im *MockIngestData) IngestData(data []byte) error { - out := im.Mock.MethodCalled("IngestData", data) - return *out[0].(*error) +func (im *MockIngestData) IngestData(data []byte) { + im.Mock.MethodCalled("IngestData", data) } -func (im *MockIngestData) ExpectIngestData(data []byte, err error) { - im.Mock.On("IngestData", data).Return(&err) +func (im *MockIngestData) ExpectIngestData(data []byte) { + im.Mock.On("IngestData", data).Return() } var _ L1SourceOutput = (*MockIngestData)(nil) @@ -58,8 +57,8 @@ func TestL1Retrieval_Step(t *testing.T) { // mock some L1 data to open for the origin that is opened by the outer stage dataSrc.ExpectOpenData(outer.Origin.ID(), iter, nil) - next.ExpectIngestData(a, nil) - next.ExpectIngestData(b, nil) + next.ExpectIngestData(a) + next.ExpectIngestData(b) defer dataSrc.AssertExpectations(t) defer next.AssertExpectations(t) diff --git a/op-node/rollup/derive/l1_traversal.go b/op-node/rollup/derive/l1_traversal.go index 841038268abf9..f58f4de903465 100644 --- a/op-node/rollup/derive/l1_traversal.go +++ b/op-node/rollup/derive/l1_traversal.go @@ -52,11 +52,10 @@ func (l1t *L1Traversal) Step(ctx context.Context, outer Progress) error { l1t.log.Debug("can't find next L1 block info (yet)", "number", origin.Number+1, "origin", origin) return io.EOF } else if err != nil { - l1t.log.Warn("failed to find L1 block info by number", "number", origin.Number+1, "origin", origin, "err", err) - return nil // nil, don't make the pipeline restart if the RPC fails + return NewTemporaryError(fmt.Errorf("failed to find L1 block info by number, at origin %s next %d: %w", origin, origin.Number+1, err)) } if l1t.progress.Origin.Hash != nextL1Origin.ParentHash { - return NewResetError(ReorgErr, fmt.Sprintf("detected L1 reorg from %s to %s", l1t.progress.Origin, nextL1Origin)) + return NewResetError(fmt.Errorf("detected L1 reorg from %s to %s with conflicting parent %s", l1t.progress.Origin, nextL1Origin, nextL1Origin.ParentID())) } l1t.progress.Origin = nextL1Origin l1t.progress.Closed = false diff --git a/op-node/rollup/derive/l1_traversal_test.go b/op-node/rollup/derive/l1_traversal_test.go index ffc05c7761fbc..c88a2d5bec728 100644 --- a/op-node/rollup/derive/l1_traversal_test.go +++ b/op-node/rollup/derive/l1_traversal_test.go @@ -47,9 +47,10 @@ func TestL1Traversal_Step(t *testing.T) { require.Equal(t, a, tr.Progress().Origin, "stage needs to adopt the origin of next stage on reset") require.False(t, tr.Progress().Closed, "stage needs to be open after reset") + require.ErrorIs(t, RepeatStep(t, tr.Step, Progress{}, 10), ErrTemporary, "expected temporary error because of RPC mock fail") require.NoError(t, RepeatStep(t, tr.Step, Progress{}, 10)) require.Equal(t, c, tr.Progress().Origin, "expected to be stuck on ethereum.NotFound on d") require.NoError(t, RepeatStep(t, tr.Step, Progress{}, 1)) require.Equal(t, c, tr.Progress().Origin, "expected to be stuck again, should get the EOF within 1 step") - require.ErrorIs(t, RepeatStep(t, tr.Step, Progress{}, 10), ReorgErr, "completed pipeline, until L1 input f that causes a reorg") + require.ErrorIs(t, RepeatStep(t, tr.Step, Progress{}, 10), ErrReset, "completed pipeline, until L1 input f that causes a reorg") } diff --git a/op-node/rollup/derive/progress.go b/op-node/rollup/derive/progress.go index 862ef18686f3c..b6915b998548b 100644 --- a/op-node/rollup/derive/progress.go +++ b/op-node/rollup/derive/progress.go @@ -1,14 +1,11 @@ package derive import ( - "errors" "fmt" "github.com/ethereum-optimism/optimism/op-node/eth" ) -var ReorgErr = errors.New("reorg") - // Progress represents the progress of a derivation stage: // the input L1 block that is being processed, and whether it's fully processed yet. type Progress struct { @@ -24,12 +21,12 @@ func (pr *Progress) Update(outer Progress) (changed bool, err error) { if pr.Closed { if outer.Closed { if pr.Origin.ID() != outer.Origin.ID() { - return true, NewResetError(ReorgErr, fmt.Sprintf("outer stage changed origin from %s to %s without opening it", pr.Origin, outer.Origin)) + return true, NewResetError(fmt.Errorf("outer stage changed origin from %s to %s without opening it", pr.Origin, outer.Origin)) } return false, nil } else { if pr.Origin.Hash != outer.Origin.ParentHash { - return true, NewResetError(ReorgErr, fmt.Sprintf("detected internal pipeline reorg of L1 origin data from %s to %s", pr.Origin, outer.Origin)) + return true, NewResetError(fmt.Errorf("detected internal pipeline reorg of L1 origin data from %s to %s", pr.Origin, outer.Origin)) } pr.Origin = outer.Origin pr.Closed = false @@ -37,7 +34,7 @@ func (pr *Progress) Update(outer Progress) (changed bool, err error) { } } else { if pr.Origin.ID() != outer.Origin.ID() { - return true, NewResetError(ReorgErr, fmt.Sprintf("outer stage changed origin from %s to %s before closing it", pr.Origin, outer.Origin)) + return true, NewResetError(fmt.Errorf("outer stage changed origin from %s to %s before closing it", pr.Origin, outer.Origin)) } if outer.Closed { pr.Closed = true