Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 8 additions & 25 deletions op-node/rollup/derive/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,42 +33,28 @@ func PreparePayloadAttributes(ctx context.Context, cfg *rollup.Config, dl L1Rece
if l2Parent.L1Origin.Number != epoch.Number {
info, _, receipts, err := dl.Fetch(ctx, epoch.Hash)
if err != nil {
return nil, NewTemporaryError(
err,
"failed to fetch L1 block info and receipts",
)
return nil, NewTemporaryError(fmt.Errorf("failed to fetch L1 block info and receipts: %w", err))
}
if l2Parent.L1Origin.Hash != info.ParentHash() {
return nil, NewResetError(
nil,
fmt.Sprintf("cannot create new block with L1 origin %s (parent %s) on top of L1 origin %s",
epoch, info.ParentHash(), l2Parent.L1Origin),
)
fmt.Errorf("cannot create new block with L1 origin %s (parent %s) on top of L1 origin %s",
epoch, info.ParentHash(), l2Parent.L1Origin))
}
deposits, err := DeriveDeposits(receipts, cfg.DepositContractAddress)
if err != nil {
return nil, NewResetError(
err,
"failed to derive some deposits",
)
// deposits may never be ignored. Failing to process them is a critical error.
return nil, NewCriticalError(fmt.Errorf("failed to derive some deposits: %w", err))
}
l1Info = info
depositTxs = deposits
seqNumber = 0
} else {
if l2Parent.L1Origin.Hash != epoch.Hash {
return nil, NewResetError(
nil,
fmt.Sprintf("cannot create new block with L1 origin %s in conflict with L1 origin %s",
epoch, l2Parent.L1Origin),
)
return nil, NewResetError(fmt.Errorf("cannot create new block with L1 origin %s in conflict with L1 origin %s", epoch, l2Parent.L1Origin))
}
info, err := dl.InfoByHash(ctx, epoch.Hash)
if err != nil {
return nil, NewTemporaryError(
err,
"failed to fetch L1 block info",
)
return nil, NewTemporaryError(fmt.Errorf("failed to fetch L1 block info: %w", err))
}
l1Info = info
depositTxs = nil
Expand All @@ -77,10 +63,7 @@ func PreparePayloadAttributes(ctx context.Context, cfg *rollup.Config, dl L1Rece

l1InfoTx, err := L1InfoDepositBytes(seqNumber, l1Info)
if err != nil {
return nil, NewResetError(
err,
"failed to create l1InfoTx",
)
return nil, NewCriticalError(fmt.Errorf("failed to create l1InfoTx: %w", err))
}

txs := make([]hexutil.Bytes, 0, 1+len(depositTxs))
Expand Down
59 changes: 23 additions & 36 deletions op-node/rollup/derive/batch_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package derive

import (
"context"
"errors"
"fmt"
"io"
"sort"
Expand Down Expand Up @@ -74,9 +73,7 @@ func (bq *BatchQueue) Step(ctx context.Context, outer Progress) error {
bq.log.Trace("Out of batches")
return io.EOF
} else if err != nil {
bq.log.Error("Error deriving batches", "err", err)
// Suppress transient errors for when reporting back to the pipeline
return nil
return err
}

for _, batch := range batches {
Expand All @@ -103,14 +100,14 @@ func (bq *BatchQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
return io.EOF
}

func (bq *BatchQueue) AddBatch(batch *BatchData) error {
func (bq *BatchQueue) AddBatch(batch *BatchData) {
if bq.progress.Closed {
panic("write batch while closed")
}
bq.log.Trace("queued batch", "origin", bq.progress.Origin, "tx_count", len(batch.Transactions), "timestamp", batch.Timestamp)
if len(bq.l1Blocks) == 0 {
return fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp)
panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp))
}
bq.log.Trace("queuing batch", "origin", bq.progress.Origin, "tx_count", len(batch.Transactions), "timestamp", batch.Timestamp)

data := BatchWithL1InclusionBlock{
L1InclusionBlock: bq.progress.Origin,
Expand All @@ -122,55 +119,45 @@ func (bq *BatchQueue) AddBatch(batch *BatchData) error {
for _, b := range batches {
if b.Batch.Timestamp == batch.Timestamp && b.Batch.Epoch() == batch.Epoch() {
bq.log.Warn("duplicate batch", "epoch", batch.Epoch(), "timestamp", batch.Timestamp, "txs", len(batch.Transactions))
return nil
return
}
}
} else {
bq.log.Debug("First seen batch", "epoch", batch.Epoch(), "timestamp", batch.Timestamp, "txs", len(batch.Transactions))

}
// May have duplicate block numbers or individual fields, but have limited complete duplicates
bq.batchesByTimestamp[batch.Timestamp] = append(batches, &data)
return nil
}

// validExtension determines if a batch follows the previous attributes
func (bq *BatchQueue) validExtension(batch *BatchWithL1InclusionBlock, prevTime, prevEpoch uint64) bool {
func (bq *BatchQueue) validExtension(batch *BatchWithL1InclusionBlock, prevTime, prevEpoch uint64) (valid bool, err error) {
if batch.Batch.Timestamp != prevTime+bq.config.BlockTime {
bq.log.Debug("Batch does not extend the block time properly", "time", batch.Batch.Timestamp, "prev_time", prevTime)

return false
return false, nil
}
if batch.Batch.EpochNum != rollup.Epoch(prevEpoch) && batch.Batch.EpochNum != rollup.Epoch(prevEpoch+1) {
bq.log.Debug("Batch does not extend the epoch properly", "epoch", batch.Batch.EpochNum, "prev_epoch", prevEpoch)

return false
return false, nil
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
l1BlockRef, err := bq.dl.L1BlockRefByNumber(ctx, batch.Batch.Epoch().Number)
cancel()
if err != nil {
bq.log.Warn("err fetching l1 block", "err", err)
if errors.Is(err, ErrTemporary) {
// Skipping validation in case of temporary RPC error
bq.log.Warn("temporary err - skipping epoch hash validation", "err", err)
return true
} else {
return false
}
return false, err
}

if l1BlockRef.Hash != batch.Batch.EpochHash {
return false
bq.log.Debug("Batch epoch hash does not match expected L1 block hash", "batch_epoch", batch.Batch.Epoch(), "expected", l1BlockRef.ID())
return false, nil
}

// Note: `Batch.EpochNum` is an external input, but it is constrained to be a reasonable size by the
// above equality checks.
if uint64(batch.Batch.EpochNum)+bq.config.SeqWindowSize < batch.L1InclusionBlock.Number {
bq.log.Debug("Batch submitted outside sequence window", "epoch", batch.Batch.EpochNum, "inclusion_block", batch.L1InclusionBlock.Number)
return false
return false, nil
}
return true
return true, nil
}

// deriveBatches pulls a single batch eagerly or a collection of batches if it is the end of
Expand Down Expand Up @@ -230,15 +217,14 @@ func (bq *BatchQueue) deriveBatches(ctx context.Context, l2SafeHead eth.L2BlockR

} else {
bq.log.Trace("Trying to eagerly find batch")
var ret []*BatchData
next, err := bq.tryPopNextBatch(ctx, l2SafeHead)
if next != nil {
if err != nil {
return nil, err
} else {
bq.log.Info("found eager batch", "batch", next.Batch)
ret = append(ret, next.Batch)
return []*BatchData{next.Batch}, nil
}
return ret, err
}

}

// tryPopNextBatch tries to get the next batch from the batch queue using an eager approach.
Expand Down Expand Up @@ -285,29 +271,30 @@ func (bq *BatchQueue) tryPopNextBatch(ctx context.Context, l2SafeHead eth.L2Bloc
// Note: Don't check epoch change here, check it in `validExtension`
epoch, err := bq.dl.L1BlockRefByNumber(ctx, uint64(batch.Batch.EpochNum))
if err != nil {
bq.log.Warn("error fetching origin", "err", err)
return nil, err
return nil, NewTemporaryError(fmt.Errorf("error fetching origin: %w", err))
}
if err := ValidBatch(batch.Batch, bq.config, epoch.ID(), minL2Time, maxL2Time); err != nil {
bq.log.Warn("Invalid batch", "err", err)
break
}

// We have a valid batch, now make sure that it builds off the previous L2 block
if bq.validExtension(batch, l2SafeHead.Time, l2SafeHead.L1Origin.Number) {
if valid, err := bq.validExtension(batch, l2SafeHead.Time, l2SafeHead.L1Origin.Number); err != nil {
return nil, err
} else if valid {
// Advance the epoch if needed
if l2SafeHead.L1Origin.Number != uint64(batch.Batch.EpochNum) {
bq.l1Blocks = bq.l1Blocks[1:]
}
// Don't leak data in the map
delete(bq.batchesByTimestamp, batch.Batch.Timestamp)

bq.log.Info("Batch was valid extension")
bq.log.Debug("Batch was valid extension")

// We have found the first valid batch.
return batch, nil
} else {
bq.log.Info("batch was not valid extension")
bq.log.Warn("batch was not valid extension", "inclusion", batch.L1InclusionBlock, "safe_origin", l2SafeHead.L1Origin, "l2_time", l2SafeHead.Time)
}
}

Expand Down
12 changes: 4 additions & 8 deletions op-node/rollup/derive/batch_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ func TestBatchQueueEager(t *testing.T) {
// Add batches
batches := []*BatchData{b(12, l1[0]), b(14, l1[0])}
for _, batch := range batches {
err := bq.AddBatch(batch)
require.Nil(t, err)
bq.AddBatch(batch)
}
// Step
for {
Expand Down Expand Up @@ -170,8 +169,7 @@ func TestBatchQueueFull(t *testing.T) {
// Add batches
batches := []*BatchData{b(14, l1[0]), b(16, l1[0]), b(18, l1[1])}
for _, batch := range batches {
err := bq.AddBatch(batch)
require.Nil(t, err)
bq.AddBatch(batch)
}
// Missing first batch
err = bq.Step(context.Background(), prevProgress)
Expand Down Expand Up @@ -205,8 +203,7 @@ func TestBatchQueueFull(t *testing.T) {

// Finally add batch
firstBatch := b(12, l1[0])
err = bq.AddBatch(firstBatch)
require.Equal(t, err, nil)
bq.AddBatch(firstBatch)

// Close the origin
prevProgress.Closed = true
Expand Down Expand Up @@ -271,8 +268,7 @@ func TestBatchQueueMissing(t *testing.T) {
// that batch timestamp 12 & 14 is created & 16 is used.
batches := []*BatchData{b(16, l1[0]), b(20, l1[1])}
for _, batch := range batches {
err := bq.AddBatch(batch)
require.Nil(t, err)
bq.AddBatch(batch)
}
// Missing first batch
err = bq.Step(context.Background(), prevProgress)
Expand Down
51 changes: 24 additions & 27 deletions op-node/rollup/derive/channel_bank.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,66 +66,66 @@ func (ib *ChannelBank) prune() {
}

// IngestData adds new L1 data to the channel bank.
// Read() should be called repeatedly first, until everything has been read, before adding new data.
// Then NextL1(ref) should be called to move forward to the next L1 input
func (ib *ChannelBank) IngestData(data []byte) error {
// Read() should be called repeatedly first, until everything has been read, before adding new data.
func (ib *ChannelBank) IngestData(data []byte) {
if ib.progress.Closed {
panic("write data to bank while closed")
}
ib.log.Debug("channel bank got new data", "origin", ib.progress.Origin, "data_len", len(data))
if len(data) < 1 {
return NewTemporaryError(
nil,
"data must be at least have a version byte, but got empty string",
)
ib.log.Warn("data must be at least have a version byte, but got empty string")
return
}

if data[0] != DerivationVersion0 {
return fmt.Errorf("unrecognized derivation version: %d", data)
ib.log.Warn("unrecognized derivation version", "version", data)
return
}
buf := bytes.NewBuffer(data[1:])

ib.prune()

if buf.Len() < minimumFrameSize {
return fmt.Errorf("data must be at least have one frame")
ib.log.Warn("data must be at least have one frame", "length", buf.Len())
return
}

// Iterate over all frames. They may have different channel IDs to indicate that the stream consumer should reset.
for {
// Don't try to unmarshal from an empty buffer.
// The if done checks should catch most/all of this case though.
if buf.Len() < ChannelIDDataSize+1 {
return nil
return
}
done := false
var f Frame
if err := (&f).UnmarshalBinary(buf); err == io.EOF {
done = true
} else if err != nil {
return fmt.Errorf("failed to unmarshal a frame: %w", err)

ib.log.Warn("malformed frame: %w", err)
return
}

// stop reading and ignore remaining data if we encounter a zeroed ID
// stop reading and ignore remaining data if we encounter a zeroed ID,
// this happens when there is zero padding at the end.
if f.ID == (ChannelID{}) {
ib.log.Info("empty channel ID")
return nil
ib.log.Trace("empty channel ID")
return
}

// check if the channel is not timed out
if f.ID.Time+ib.cfg.ChannelTimeout < ib.progress.Origin.Time {
ib.log.Info("channel is timed out, ignore frame", "channel", f.ID, "id_time", f.ID.Time, "frame", f.FrameNumber)
ib.log.Warn("channel is timed out, ignore frame", "channel", f.ID, "id_time", f.ID.Time, "frame", f.FrameNumber)
if done {
return nil
return
}
continue
}
// check if the channel is not included too soon (otherwise timeouts wouldn't be effective)
if f.ID.Time > ib.progress.Origin.Time {
ib.log.Info("channel claims to be from the future, ignore frame", "channel", f.ID, "id_time", f.ID.Time, "frame", f.FrameNumber)
ib.log.Warn("channel claims to be from the future, ignore frame", "channel", f.ID, "id_time", f.ID.Time, "frame", f.FrameNumber)
if done {
return nil
return
}
continue
}
Expand All @@ -137,17 +137,17 @@ func (ib *ChannelBank) IngestData(data []byte) error {
ib.channelQueue = append(ib.channelQueue, f.ID)
}

ib.log.Debug("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data))
ib.log.Trace("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data))
if err := currentCh.IngestData(uint64(f.FrameNumber), f.IsLast, f.Data); err != nil {
ib.log.Debug("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err)
ib.log.Warn("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err)
if done {
return nil
return
}
continue
}

if done {
return nil
return
}
}
}
Expand Down Expand Up @@ -221,10 +221,7 @@ func (ib *ChannelBank) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
// go back in history if we are not distant enough from the next stage
parent, err := l1Fetcher.L1BlockRefByHash(ctx, ib.progress.Origin.ParentHash)
if err != nil {
return NewTemporaryError(
err,
fmt.Sprintf("failed to find channel bank block, failed to retrieve L1 reference: %v", err),
)
return NewTemporaryError(fmt.Errorf("failed to find channel bank block, failed to retrieve L1 reference: %w", err))
}
ib.progress.Origin = parent
return nil
Expand Down
3 changes: 2 additions & 1 deletion op-node/rollup/derive/channel_bank_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ func (tf testFrame) ToFrame() Frame {
}

func (bt *bankTestSetup) ingestData(data []byte) {
require.NoError(bt.t, bt.cb.IngestData(data))
bt.cb.IngestData(data)
}

func (bt *bankTestSetup) ingestFrames(frames ...testFrame) {
data := new(bytes.Buffer)
data.WriteByte(DerivationVersion0)
Expand Down
Loading