diff --git a/op-node/rollup/derive/attributes_queue.go b/op-node/rollup/derive/attributes_queue.go index cec93477a3205..f2e764b0c3cb0 100644 --- a/op-node/rollup/derive/attributes_queue.go +++ b/op-node/rollup/derive/attributes_queue.go @@ -22,58 +22,60 @@ import ( // This stage can be reset by clearing it's batch buffer. // This stage does not need to retain any references to L1 blocks. -type AttributesQueueOutput interface { - AddSafeAttributes(attributes *eth.PayloadAttributes) - SafeL2Head() eth.L2BlockRef - StageProgress -} - type AttributesQueue struct { - log log.Logger - config *rollup.Config - dl L1ReceiptsFetcher - next AttributesQueueOutput - progress Progress - batches []*BatchData + log log.Logger + config *rollup.Config + dl L1ReceiptsFetcher + prev *BatchQueue + batch *BatchData } -func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, next AttributesQueueOutput) *AttributesQueue { +func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, prev *BatchQueue) *AttributesQueue { return &AttributesQueue{ log: log, config: cfg, dl: l1Fetcher, - next: next, + prev: prev, } } -func (aq *AttributesQueue) AddBatch(batch *BatchData) { - aq.log.Debug("Received next batch", "batch_epoch", batch.EpochNum, "batch_timestamp", batch.Timestamp, "tx_count", len(batch.Transactions)) - aq.batches = append(aq.batches, batch) +func (aq *AttributesQueue) Origin() eth.L1BlockRef { + return aq.prev.Origin() } -func (aq *AttributesQueue) Progress() Progress { - return aq.progress -} - -func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error { - if changed, err := aq.progress.Update(outer); err != nil || changed { - return err +func (aq *AttributesQueue) NextAttributes(ctx context.Context, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) { + // Get a batch if we need it + if aq.batch == nil { + batch, err := aq.prev.NextBatch(ctx, l2SafeHead) + if err != nil { + return nil, err + } + aq.batch = batch } - if len(aq.batches) == 0 { - return io.EOF + + // Actually generate the next attributes + if attrs, err := aq.createNextAttributes(ctx, aq.batch, l2SafeHead); err != nil { + return nil, err + } else { + // Clear out the local state once we will succeed + aq.batch = nil + return attrs, nil } - batch := aq.batches[0] - safeL2Head := aq.next.SafeL2Head() +} + +// createNextAttributes transforms a batch into a payload attributes. This sets `NoTxPool` and appends the batched transactions +// to the attributes transaction list +func (aq *AttributesQueue) createNextAttributes(ctx context.Context, batch *BatchData, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) { // sanity check parent hash - if batch.ParentHash != safeL2Head.Hash { - return NewCriticalError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, safeL2Head.Hash)) + if batch.ParentHash != l2SafeHead.Hash { + return nil, NewResetError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, l2SafeHead.Hash)) } fetchCtx, cancel := context.WithTimeout(ctx, 20*time.Second) defer cancel() - attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, safeL2Head, batch.Timestamp, batch.Epoch()) + attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, l2SafeHead, batch.Timestamp, batch.Epoch()) if err != nil { - return err + return nil, err } // we are verifying, not sequencing, we've got all transactions and do not pull from the tx-pool @@ -83,19 +85,9 @@ func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error { aq.log.Info("generated attributes in payload queue", "txs", len(attrs.Transactions), "timestamp", batch.Timestamp) - // Slice off the batch once we are guaranteed to succeed - aq.batches = aq.batches[1:] - - aq.next.AddSafeAttributes(attrs) - return nil + return attrs, nil } -func (aq *AttributesQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { - aq.batches = aq.batches[:0] - aq.progress = aq.next.Progress() +func (aq *AttributesQueue) Reset(ctx context.Context, _ eth.L1BlockRef) error { return io.EOF } - -func (aq *AttributesQueue) SafeL2Head() eth.L2BlockRef { - return aq.next.SafeL2Head() -} diff --git a/op-node/rollup/derive/attributes_queue_test.go b/op-node/rollup/derive/attributes_queue_test.go index 226cfa82fd701..c5784f59f1715 100644 --- a/op-node/rollup/derive/attributes_queue_test.go +++ b/op-node/rollup/derive/attributes_queue_test.go @@ -2,7 +2,6 @@ package derive import ( "context" - "io" "math/big" "math/rand" "testing" @@ -17,29 +16,10 @@ import ( "github.com/ethereum/go-ethereum/log" ) -type MockAttributesQueueOutput struct { - MockOriginStage -} - -func (m *MockAttributesQueueOutput) AddSafeAttributes(attributes *eth.PayloadAttributes) { - m.Mock.MethodCalled("AddSafeAttributes", attributes) -} - -func (m *MockAttributesQueueOutput) ExpectAddSafeAttributes(attributes *eth.PayloadAttributes) { - m.Mock.On("AddSafeAttributes", attributes).Once().Return() -} - -func (m *MockAttributesQueueOutput) SafeL2Head() eth.L2BlockRef { - return m.Mock.MethodCalled("SafeL2Head").Get(0).(eth.L2BlockRef) -} - -func (m *MockAttributesQueueOutput) ExpectSafeL2Head(head eth.L2BlockRef) { - m.Mock.On("SafeL2Head").Once().Return(head) -} - -var _ AttributesQueueOutput = (*MockAttributesQueueOutput)(nil) - -func TestAttributesQueue_Step(t *testing.T) { +// TestAttributesQueue checks that it properly uses the PreparePayloadAttributes function +// (which is well tested) and that it properly sets NoTxPool and adds in the candidate +// transactions. +func TestAttributesQueue(t *testing.T) { // test config, only init the necessary fields cfg := &rollup.Config{ BlockTime: 2, @@ -56,18 +36,9 @@ func TestAttributesQueue_Step(t *testing.T) { l1Fetcher.ExpectInfoByHash(l1Info.InfoHash, l1Info, nil) - out := &MockAttributesQueueOutput{} - out.progress = Progress{ - Origin: l1Info.BlockRef(), - Closed: false, - } - defer out.AssertExpectations(t) - safeHead := testutils.RandomL2BlockRef(rng) safeHead.L1Origin = l1Info.ID() - out.ExpectSafeL2Head(safeHead) - batch := &BatchData{BatchV1{ ParentHash: safeHead.Hash, EpochNum: rollup.Epoch(l1Info.InfoNum), @@ -85,13 +56,11 @@ func TestAttributesQueue_Step(t *testing.T) { Transactions: []eth.Data{l1InfoTx, eth.Data("foobar"), eth.Data("example")}, NoTxPool: true, } - out.ExpectAddSafeAttributes(&attrs) - aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, l1Fetcher, out) - require.NoError(t, RepeatResetStep(t, aq.ResetStep, l1Fetcher, 1)) + aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, l1Fetcher, nil) - aq.AddBatch(batch) + actual, err := aq.createNextAttributes(context.Background(), batch, safeHead) - require.NoError(t, aq.Step(context.Background(), out.progress), "adding batch to next stage, no EOF yet") - require.Equal(t, io.EOF, aq.Step(context.Background(), out.progress), "done with batches") + require.Nil(t, err) + require.Equal(t, attrs, *actual) } diff --git a/op-node/rollup/derive/batch_queue.go b/op-node/rollup/derive/batch_queue.go index ed49f5dcfdbf7..5e5dd5496ff62 100644 --- a/op-node/rollup/derive/batch_queue.go +++ b/op-node/rollup/derive/batch_queue.go @@ -32,13 +32,18 @@ type BatchQueueOutput interface { SafeL2Head() eth.L2BlockRef } +type NextBatchProvider interface { + Origin() eth.L1BlockRef + NextBatch(ctx context.Context) (*BatchData, error) +} + // BatchQueue contains a set of batches for every L1 block. // L1 blocks are contiguous and this does not support reorgs. type BatchQueue struct { - log log.Logger - config *rollup.Config - next BatchQueueOutput - progress Progress + log log.Logger + config *rollup.Config + prev NextBatchProvider + origin eth.L1BlockRef l1Blocks []eth.L1BlockRef @@ -47,62 +52,91 @@ type BatchQueue struct { } // NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use. -func NewBatchQueue(log log.Logger, cfg *rollup.Config, next BatchQueueOutput) *BatchQueue { +func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider) *BatchQueue { return &BatchQueue{ log: log, config: cfg, - next: next, + prev: prev, } } -func (bq *BatchQueue) Progress() Progress { - return bq.progress +func (bq *BatchQueue) Origin() eth.L1BlockRef { + return bq.prev.Origin() } -func (bq *BatchQueue) Step(ctx context.Context, outer Progress) error { - if changed, err := bq.progress.Update(outer); err != nil { - return err - } else if changed { - if !bq.progress.Closed { // init inputs if we moved to a new open origin - bq.l1Blocks = append(bq.l1Blocks, bq.progress.Origin) +func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef) (*BatchData, error) { + originBehind := bq.origin.Number < safeL2Head.L1Origin.Number + + // Advance origin if needed + // Note: The entire pipeline has the same origin + // We just don't accept batches prior to the L1 origin of the L2 safe head + if bq.origin != bq.prev.Origin() { + bq.origin = bq.prev.Origin() + if !originBehind { + bq.l1Blocks = append(bq.l1Blocks, bq.origin) + } else { + // This is to handle the special case of startup. At startup we call Reset & include + // the L1 origin. That is the only time where immediately after `Reset` is called + // originBehind is false. + bq.l1Blocks = bq.l1Blocks[:0] } - return nil + bq.log.Info("Advancing bq origin", "origin", bq.origin) } - batch, err := bq.deriveNextBatch(ctx) - if err == io.EOF { - // very noisy, commented for now, or we should bump log level from trace to debug - // bq.log.Trace("need more L1 data before deriving next batch", "progress", bq.progress.Origin) - return io.EOF + + // Load more data into the batch queue + outOfData := false + if batch, err := bq.prev.NextBatch(ctx); err == io.EOF { + outOfData = true } else if err != nil { - return err + return nil, err + } else if !originBehind { + bq.AddBatch(batch, safeL2Head) } - bq.next.AddBatch(batch) - return nil + + // Skip adding data unless we are up to date with the origin, but do fully + // empty the previous stages + if originBehind { + if outOfData { + return nil, io.EOF + } else { + return nil, NotEnoughData + } + } + + // Finally attempt to derive more batches + batch, err := bq.deriveNextBatch(ctx, outOfData, safeL2Head) + if err == io.EOF && outOfData { + return nil, io.EOF + } else if err == io.EOF { + return nil, NotEnoughData + } else if err != nil { + return nil, err + } + return batch, nil } -func (bq *BatchQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { +func (bq *BatchQueue) Reset(ctx context.Context, base eth.L1BlockRef) error { // Copy over the Origin from the next stage // It is set in the engine queue (two stages away) such that the L2 Safe Head origin is the progress - bq.progress = bq.next.Progress() + bq.origin = base bq.batches = make(map[uint64][]*BatchWithL1InclusionBlock) // Include the new origin as an origin to build on + // Note: This is only for the initialization case. During normal resets we will later + // throw out this block. bq.l1Blocks = bq.l1Blocks[:0] - bq.l1Blocks = append(bq.l1Blocks, bq.progress.Origin) + bq.l1Blocks = append(bq.l1Blocks, base) return io.EOF } -func (bq *BatchQueue) AddBatch(batch *BatchData) { - if bq.progress.Closed { - panic("write batch while closed") - } +func (bq *BatchQueue) AddBatch(batch *BatchData, l2SafeHead eth.L2BlockRef) { if len(bq.l1Blocks) == 0 { panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp)) } data := BatchWithL1InclusionBlock{ - L1InclusionBlock: bq.progress.Origin, + L1InclusionBlock: bq.origin, Batch: batch, } - validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, bq.next.SafeL2Head(), &data) + validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, l2SafeHead, &data) if validity == BatchDrop { return // if we do drop the batch, CheckBatch will log the drop reason with WARN level. } @@ -113,12 +147,11 @@ func (bq *BatchQueue) AddBatch(batch *BatchData) { // following the validity rules imposed on consecutive batches, // based on currently available buffered batch and L1 origin information. // If no batch can be derived yet, then (nil, io.EOF) is returned. -func (bq *BatchQueue) deriveNextBatch(ctx context.Context) (*BatchData, error) { +func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, l2SafeHead eth.L2BlockRef) (*BatchData, error) { if len(bq.l1Blocks) == 0 { return nil, NewCriticalError(errors.New("cannot derive next batch, no origin was prepared")) } epoch := bq.l1Blocks[0] - l2SafeHead := bq.next.SafeL2Head() if l2SafeHead.L1Origin != epoch.ID() { return nil, NewResetError(fmt.Errorf("buffered L1 chain epoch %s in batch queue does not match safe head %s", epoch, l2SafeHead)) @@ -183,8 +216,8 @@ batchLoop: // i.e. if the sequence window expired, we create empty batches expiryEpoch := epoch.Number + bq.config.SeqWindowSize forceNextEpoch := - (expiryEpoch == bq.progress.Origin.Number && bq.progress.Closed) || - expiryEpoch < bq.progress.Origin.Number + (expiryEpoch == bq.origin.Number && outOfData) || + expiryEpoch < bq.origin.Number if !forceNextEpoch { // sequence window did not expire yet, still room to receive batches for the current epoch, diff --git a/op-node/rollup/derive/batch_queue_test.go b/op-node/rollup/derive/batch_queue_test.go index 9c001d5af5cd6..b921ff1318aaa 100644 --- a/op-node/rollup/derive/batch_queue_test.go +++ b/op-node/rollup/derive/batch_queue_test.go @@ -7,8 +7,6 @@ import ( "math/rand" "testing" - "github.com/stretchr/testify/require" - "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/testlog" @@ -16,44 +14,28 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/log" + "github.com/stretchr/testify/require" ) -// fakeBatchQueueOutput fakes the next stage (receive only) for the batch queue -// It tracks the progress state of the next stage. -// Upon receiving a batch, relevant characteristic of safeL2Head are immediately advanced. -type fakeBatchQueueOutput struct { - progress Progress - batches []*BatchData - safeL2Head eth.L2BlockRef +type fakeBatchQueueInput struct { + i int + batches []*BatchData + errors []error + origin eth.L1BlockRef } -var _ BatchQueueOutput = (*fakeBatchQueueOutput)(nil) - -func (f *fakeBatchQueueOutput) AddBatch(batch *BatchData) { - f.batches = append(f.batches, batch) - if batch.ParentHash != f.safeL2Head.Hash { - panic("batch has wrong parent hash") - } - newEpoch := f.safeL2Head.L1Origin.Hash != batch.EpochHash - // Advance SafeL2Head - f.safeL2Head.Time = batch.Timestamp - f.safeL2Head.L1Origin.Number = uint64(batch.EpochNum) - f.safeL2Head.L1Origin.Hash = batch.EpochHash - if newEpoch { - f.safeL2Head.SequenceNumber = 0 - } else { - f.safeL2Head.SequenceNumber += 1 - } - f.safeL2Head.ParentHash = batch.ParentHash - f.safeL2Head.Hash = mockHash(batch.Timestamp, 2) +func (f *fakeBatchQueueInput) Origin() eth.L1BlockRef { + return f.origin } -func (f *fakeBatchQueueOutput) SafeL2Head() eth.L2BlockRef { - return f.safeL2Head -} - -func (f *fakeBatchQueueOutput) Progress() Progress { - return f.progress +func (f *fakeBatchQueueInput) NextBatch(ctx context.Context) (*BatchData, error) { + if f.i >= len(f.batches) { + return nil, io.EOF + } + b := f.batches[f.i] + e := f.errors[f.i] + f.i += 1 + return b, e } func mockHash(time uint64, layer uint8) common.Hash { @@ -90,22 +72,18 @@ func L1Chain(l1Times []uint64) []eth.L1BlockRef { return out } +// TestBatchQueueEager adds a bunch of contiguous batches and asserts that +// enough calls to `NextBatch` return all of those batches. func TestBatchQueueEager(t *testing.T) { - log := testlog.Logger(t, log.LvlTrace) + log := testlog.Logger(t, log.LvlCrit) l1 := L1Chain([]uint64{10, 20, 30}) - next := &fakeBatchQueueOutput{ - safeL2Head: eth.L2BlockRef{ - Hash: mockHash(10, 2), - Number: 0, - ParentHash: common.Hash{}, - Time: 10, - L1Origin: l1[0].ID(), - SequenceNumber: 0, - }, - progress: Progress{ - Origin: l1[0], - Closed: false, - }, + safeHead := eth.L2BlockRef{ + Hash: mockHash(10, 2), + Number: 0, + ParentHash: common.Hash{}, + Time: 10, + L1Origin: l1[0].ID(), + SequenceNumber: 0, } cfg := &rollup.Config{ Genesis: rollup.Genesis{ @@ -116,129 +94,44 @@ func TestBatchQueueEager(t *testing.T) { SeqWindowSize: 30, } - bq := NewBatchQueue(log, cfg, next) - require.Equal(t, io.EOF, bq.ResetStep(context.Background(), nil), "reset should complete without l1 fetcher, single step") + batches := []*BatchData{b(12, l1[0]), b(14, l1[0]), b(16, l1[0]), b(18, l1[0]), b(20, l1[0]), b(22, l1[0]), b(24, l1[1]), nil} + errors := []error{nil, nil, nil, nil, nil, nil, nil, io.EOF} - // We start with an open L1 origin as progress in the first step - progress := bq.progress - require.Equal(t, bq.progress.Closed, false) - - // Add batches - batches := []*BatchData{b(12, l1[0]), b(14, l1[0])} - for _, batch := range batches { - bq.AddBatch(batch) - } - // Step - require.NoError(t, RepeatStep(t, bq.Step, progress, 10)) - - // Verify Output - require.Equal(t, batches, next.batches) -} - -func TestBatchQueueFull(t *testing.T) { - log := testlog.Logger(t, log.LvlTrace) - l1 := L1Chain([]uint64{10, 15, 20}) - next := &fakeBatchQueueOutput{ - safeL2Head: eth.L2BlockRef{ - Hash: mockHash(10, 2), - Number: 0, - ParentHash: common.Hash{}, - Time: 10, - L1Origin: l1[0].ID(), - SequenceNumber: 0, - }, - progress: Progress{ - Origin: l1[0], - Closed: false, - }, - } - cfg := &rollup.Config{ - Genesis: rollup.Genesis{ - L2Time: 10, - }, - BlockTime: 2, - MaxSequencerDrift: 600, - SeqWindowSize: 2, + input := &fakeBatchQueueInput{ + batches: batches, + errors: errors, + origin: l1[0], } - bq := NewBatchQueue(log, cfg, next) - require.Equal(t, io.EOF, bq.ResetStep(context.Background(), nil), "reset should complete without l1 fetcher, single step") - - // We start with an open L1 origin as progress in the first step - progress := bq.progress - require.Equal(t, bq.progress.Closed, false) - - // Add batches - batches := []*BatchData{b(14, l1[0]), b(16, l1[0]), b(18, l1[1])} - for _, batch := range batches { - bq.AddBatch(batch) + bq := NewBatchQueue(log, cfg, input) + _ = bq.Reset(context.Background(), l1[0]) + // Advance the origin + input.origin = l1[1] + + for i := 0; i < len(batches); i++ { + b, e := bq.NextBatch(context.Background(), safeHead) + require.ErrorIs(t, e, errors[i]) + require.Equal(t, batches[i], b) + + if b != nil { + safeHead.Number += 1 + safeHead.Time += 2 + safeHead.Hash = mockHash(b.Timestamp, 2) + safeHead.L1Origin = b.Epoch() + } } - // Missing first batch - err := bq.Step(context.Background(), progress) - require.Equal(t, err, io.EOF) - - // Close previous to close bq - progress.Closed = true - err = bq.Step(context.Background(), progress) - require.Equal(t, err, nil) - require.Equal(t, bq.progress.Closed, true) - - // Open previous to open bq with the new inclusion block - progress.Closed = false - progress.Origin = l1[1] - err = bq.Step(context.Background(), progress) - require.Equal(t, err, nil) - require.Equal(t, bq.progress.Closed, false) - - // Close previous to close bq (for epoch 2) - progress.Closed = true - err = bq.Step(context.Background(), progress) - require.Equal(t, err, nil) - require.Equal(t, bq.progress.Closed, true) - - // Open previous to open bq with the new inclusion block (epoch 2) - progress.Closed = false - progress.Origin = l1[2] - err = bq.Step(context.Background(), progress) - require.Equal(t, err, nil) - require.Equal(t, bq.progress.Closed, false) - - // Finally add batch - firstBatch := b(12, l1[0]) - bq.AddBatch(firstBatch) - - // Close the origin - progress.Closed = true - err = bq.Step(context.Background(), progress) - require.Equal(t, err, nil) - require.Equal(t, bq.progress.Closed, true) - - // Step, but should have full epoch now - require.NoError(t, RepeatStep(t, bq.Step, progress, 10)) - - // Verify Output - var final []*BatchData - final = append(final, firstBatch) - final = append(final, batches...) - require.Equal(t, final, next.batches) } func TestBatchQueueMissing(t *testing.T) { - log := testlog.Logger(t, log.LvlTrace) + log := testlog.Logger(t, log.LvlCrit) l1 := L1Chain([]uint64{10, 15, 20}) - next := &fakeBatchQueueOutput{ - safeL2Head: eth.L2BlockRef{ - Hash: mockHash(10, 2), - Number: 0, - ParentHash: common.Hash{}, - Time: 10, - L1Origin: l1[0].ID(), - SequenceNumber: 0, - }, - progress: Progress{ - Origin: l1[0], - Closed: false, - }, + safeHead := eth.L2BlockRef{ + Hash: mockHash(10, 2), + Number: 0, + ParentHash: common.Hash{}, + Time: 10, + L1Origin: l1[0].ID(), + SequenceNumber: 0, } cfg := &rollup.Config{ Genesis: rollup.Genesis{ @@ -249,56 +142,68 @@ func TestBatchQueueMissing(t *testing.T) { SeqWindowSize: 2, } - bq := NewBatchQueue(log, cfg, next) - require.Equal(t, io.EOF, bq.ResetStep(context.Background(), nil), "reset should complete without l1 fetcher, single step") - - // We start with an open L1 origin as progress in the first step - progress := bq.progress - require.Equal(t, bq.progress.Closed, false) - // The batches at 18 and 20 are skipped to stop 22 from being eagerly processed. // This test checks that batch timestamp 12 & 14 are created, 16 is used, and 18 is advancing the epoch. // Due to the large sequencer time drift 16 is perfectly valid to have epoch 0 as origin. batches := []*BatchData{b(16, l1[0]), b(22, l1[1])} - for _, batch := range batches { - bq.AddBatch(batch) - } - // Missing first batches with timestamp 12 and 14, nothing to do yet. - err := bq.Step(context.Background(), progress) - require.Equal(t, err, io.EOF) + errors := []error{nil, nil} - // Close l1[0] - progress.Closed = true - require.NoError(t, RepeatStep(t, bq.Step, progress, 10)) - require.Equal(t, bq.progress.Closed, true) + input := &fakeBatchQueueInput{ + batches: batches, + errors: errors, + origin: l1[0], + } - // Open l1[1] - progress.Closed = false - progress.Origin = l1[1] - require.NoError(t, RepeatStep(t, bq.Step, progress, 10)) - require.Equal(t, bq.progress.Closed, false) - require.Empty(t, next.batches, "no batches yet, sequence window did not expire, waiting for 12 and 14") + bq := NewBatchQueue(log, cfg, input) + _ = bq.Reset(context.Background(), l1[0]) - // Close l1[1] - progress.Closed = true - require.NoError(t, RepeatStep(t, bq.Step, progress, 10)) - require.Equal(t, bq.progress.Closed, true) + for i := 0; i < len(batches); i++ { + b, e := bq.NextBatch(context.Background(), safeHead) + require.ErrorIs(t, e, NotEnoughData) + require.Nil(t, b) + } - // Open l1[2] - progress.Closed = false - progress.Origin = l1[2] - require.NoError(t, RepeatStep(t, bq.Step, progress, 10)) - require.Equal(t, bq.progress.Closed, false) + // advance origin. Underlying stage still has no more batches + // This is not enough to auto advance yet + input.origin = l1[1] + b, e := bq.NextBatch(context.Background(), safeHead) + require.ErrorIs(t, e, io.EOF) + require.Nil(t, b) + + // Advance the origin. At this point batch timestamps 12 and 14 will be created + input.origin = l1[2] + + // Check for a generated batch at t = 12 + b, e = bq.NextBatch(context.Background(), safeHead) + require.Nil(t, e) + require.Equal(t, b.Timestamp, uint64(12)) + require.Empty(t, b.BatchV1.Transactions) + safeHead.Number += 1 + safeHead.Time += 2 + safeHead.Hash = mockHash(b.Timestamp, 2) + + // Check for generated batch at t = 14 + b, e = bq.NextBatch(context.Background(), safeHead) + require.Nil(t, e) + require.Equal(t, b.Timestamp, uint64(14)) + require.Empty(t, b.BatchV1.Transactions) + safeHead.Number += 1 + safeHead.Time += 2 + safeHead.Hash = mockHash(b.Timestamp, 2) + + // Check for the inputted batch at t = 16 + b, e = bq.NextBatch(context.Background(), safeHead) + require.Nil(t, e) + require.Equal(t, b, batches[0]) + safeHead.Number += 1 + safeHead.Time += 2 + safeHead.Hash = mockHash(b.Timestamp, 2) + + // Check for the generated batch at t = 18. This batch advances the epoch + b, e = bq.NextBatch(context.Background(), safeHead) + require.Nil(t, e) + require.Equal(t, b.Timestamp, uint64(18)) + require.Empty(t, b.BatchV1.Transactions) + require.Equal(t, rollup.Epoch(1), b.EpochNum) - // Close l1[2], this is the moment that l1[0] expires and empty batches 12 and 14 can be created, - // and batch 16 can then be used. - progress.Closed = true - require.NoError(t, RepeatStep(t, bq.Step, progress, 10)) - require.Equal(t, bq.progress.Closed, true) - require.Equal(t, 4, len(next.batches), "expecting empty batches with timestamp 12 and 14 to be created and existing batch 16 to follow") - require.Equal(t, uint64(12), next.batches[0].Timestamp) - require.Equal(t, uint64(14), next.batches[1].Timestamp) - require.Equal(t, batches[0], next.batches[2]) - require.Equal(t, uint64(18), next.batches[3].Timestamp) - require.Equal(t, rollup.Epoch(1), next.batches[3].EpochNum) } diff --git a/op-node/rollup/derive/channel_bank.go b/op-node/rollup/derive/channel_bank.go index 00d603469b17a..9a4581051cddc 100644 --- a/op-node/rollup/derive/channel_bank.go +++ b/op-node/rollup/derive/channel_bank.go @@ -2,7 +2,6 @@ package derive import ( "context" - "fmt" "io" "github.com/ethereum-optimism/optimism/op-node/eth" @@ -11,6 +10,11 @@ import ( "github.com/ethereum/go-ethereum/log" ) +type NextDataProvider interface { + NextData(ctx context.Context) ([]byte, error) + Origin() eth.L1BlockRef +} + // ChannelBank is a stateful stage that does the following: // 1. Unmarshalls frames from L1 transaction data // 2. Applies those frames to a channel @@ -22,11 +26,6 @@ import ( // Specifically, the channel bank is not allowed to become too large between successive calls // to `IngestData`. This means that we can do an ingest and then do a read while becoming too large. -type ChannelBankOutput interface { - StageProgress - WriteChannel(data []byte) -} - // ChannelBank buffers channel frames, and emits full channel data type ChannelBank struct { log log.Logger @@ -35,80 +34,78 @@ type ChannelBank struct { channels map[ChannelID]*Channel // channels by ID channelQueue []ChannelID // channels in FIFO order - progress Progress - - next ChannelBankOutput + prev NextDataProvider + fetcher L1Fetcher } -var _ Stage = (*ChannelBank)(nil) +var _ PullStage = (*ChannelBank)(nil) // NewChannelBank creates a ChannelBank, which should be Reset(origin) before use. -func NewChannelBank(log log.Logger, cfg *rollup.Config, next ChannelBankOutput) *ChannelBank { +func NewChannelBank(log log.Logger, cfg *rollup.Config, prev NextDataProvider, fetcher L1Fetcher) *ChannelBank { return &ChannelBank{ log: log, cfg: cfg, channels: make(map[ChannelID]*Channel), channelQueue: make([]ChannelID, 0, 10), - next: next, + prev: prev, + fetcher: fetcher, } } -func (ib *ChannelBank) Progress() Progress { - return ib.progress +func (cb *ChannelBank) Origin() eth.L1BlockRef { + return cb.prev.Origin() } -func (ib *ChannelBank) prune() { +func (cb *ChannelBank) prune() { // check total size totalSize := uint64(0) - for _, ch := range ib.channels { + for _, ch := range cb.channels { totalSize += ch.size } // prune until it is reasonable again. The high-priority channel failed to be read, so we start pruning there. for totalSize > MaxChannelBankSize { - id := ib.channelQueue[0] - ch := ib.channels[id] - ib.channelQueue = ib.channelQueue[1:] - delete(ib.channels, id) + id := cb.channelQueue[0] + ch := cb.channels[id] + cb.channelQueue = cb.channelQueue[1:] + delete(cb.channels, id) totalSize -= ch.size } } // IngestData adds new L1 data to the channel bank. // Read() should be called repeatedly first, until everything has been read, before adding new data.\ -func (ib *ChannelBank) IngestData(data []byte) { - if ib.progress.Closed { - panic("write data to bank while closed") - } - ib.log.Debug("channel bank got new data", "origin", ib.progress.Origin, "data_len", len(data)) +func (cb *ChannelBank) IngestData(data []byte) { + origin := cb.Origin() + cb.log.Debug("channel bank got new data", "origin", origin, "data_len", len(data)) // TODO: Why is the prune here? - ib.prune() + cb.prune() frames, err := ParseFrames(data) if err != nil { - ib.log.Warn("malformed frame", "err", err) + cb.log.Warn("malformed frame", "err", err) return } // Process each frame for _, f := range frames { - currentCh, ok := ib.channels[f.ID] + currentCh, ok := cb.channels[f.ID] if !ok { // create new channel if it doesn't exist yet - currentCh = NewChannel(f.ID, ib.progress.Origin) - ib.channels[f.ID] = currentCh - ib.channelQueue = append(ib.channelQueue, f.ID) + currentCh = NewChannel(f.ID, origin) + cb.channels[f.ID] = currentCh + cb.channelQueue = append(cb.channelQueue, f.ID) } // check if the channel is not timed out - if currentCh.OpenBlockNumber()+ib.cfg.ChannelTimeout < ib.progress.Origin.Number { - ib.log.Warn("channel is timed out, ignore frame", "channel", f.ID, "frame", f.FrameNumber) + if currentCh.OpenBlockNumber()+cb.cfg.ChannelTimeout < origin.Number { + cb.log.Warn("channel is timed out, ignore frame", "channel", f.ID, "frame", f.FrameNumber) continue } - ib.log.Trace("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data)) - if err := currentCh.AddFrame(f, ib.progress.Origin); err != nil { - ib.log.Warn("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err) + cb.log.Trace("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data)) + if err := currentCh.AddFrame(f, origin); err != nil { + cb.log.Warn("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err) continue } } @@ -116,72 +113,62 @@ func (ib *ChannelBank) IngestData(data []byte) { // Read the raw data of the first channel, if it's timed-out or closed. // Read returns io.EOF if there is nothing new to read. -func (ib *ChannelBank) Read() (data []byte, err error) { - if len(ib.channelQueue) == 0 { +func (cb *ChannelBank) Read() (data []byte, err error) { + if len(cb.channelQueue) == 0 { return nil, io.EOF } - first := ib.channelQueue[0] - ch := ib.channels[first] - timedOut := ch.OpenBlockNumber()+ib.cfg.ChannelTimeout < ib.progress.Origin.Number + first := cb.channelQueue[0] + ch := cb.channels[first] + timedOut := ch.OpenBlockNumber()+cb.cfg.ChannelTimeout < cb.Origin().Number if timedOut { - ib.log.Debug("channel timed out", "channel", first, "frames", len(ch.inputs)) - delete(ib.channels, first) - ib.channelQueue = ib.channelQueue[1:] + cb.log.Debug("channel timed out", "channel", first, "frames", len(ch.inputs)) + delete(cb.channels, first) + cb.channelQueue = cb.channelQueue[1:] return nil, io.EOF } if !ch.IsReady() { return nil, io.EOF } - delete(ib.channels, first) - ib.channelQueue = ib.channelQueue[1:] + delete(cb.channels, first) + cb.channelQueue = cb.channelQueue[1:] r := ch.Reader() // Suprress error here. io.ReadAll does return nil instead of io.EOF though. data, _ = io.ReadAll(r) return data, nil } -func (ib *ChannelBank) Step(ctx context.Context, outer Progress) error { - if changed, err := ib.progress.Update(outer); err != nil || changed { - return err - } - - // If the bank is behind the channel reader, then we are replaying old data to prepare the bank. - // Read if we can, and drop if it gives anything - if ib.next.Progress().Origin.Number > ib.progress.Origin.Number { - _, err := ib.Read() - return err +// NextData pulls the next piece of data from the channel bank. +// Note that it attempts to pull data out of the channel bank prior to +// loading data in (unlike most other stages). This is to ensure maintain +// consistency around channel bank pruning which depends upon the order +// of operations. +func (cb *ChannelBank) NextData(ctx context.Context) ([]byte, error) { + + // Do the read from the channel bank first + data, err := cb.Read() + if err == io.EOF { + // continue - We will attempt to load data into the channel bank + } else if err != nil { + return nil, err + } else { + return data, nil } - // otherwise, read the next channel data from the bank - data, err := ib.Read() - if err == io.EOF { // need new L1 data in the bank before we can read more channel data - return io.EOF + // Then load data into the channel bank + if data, err := cb.prev.NextData(ctx); err == io.EOF { + return nil, io.EOF } else if err != nil { - return err + return nil, err + } else { + cb.IngestData(data) + return nil, NotEnoughData } - ib.next.WriteChannel(data) - return nil } -// ResetStep walks back the L1 chain, starting at the origin of the next stage, -// to find the origin that the channel bank should be reset to, -// to get consistent reads starting at origin. -// Any channel data before this origin will be timed out by the time the channel bank is synced up to the origin, -// so it is not relevant to replay it into the bank. -func (ib *ChannelBank) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { - ib.progress = ib.next.Progress() - ib.log.Debug("walking back to find reset origin for channel bank", "origin", ib.progress.Origin) - // go back in history if we are not distant enough from the next stage - resetBlock := ib.progress.Origin.Number - ib.cfg.ChannelTimeout - if ib.progress.Origin.Number < ib.cfg.ChannelTimeout { - resetBlock = 0 // don't underflow - } - parent, err := l1Fetcher.L1BlockRefByNumber(ctx, resetBlock) - if err != nil { - return NewTemporaryError(fmt.Errorf("failed to find channel bank block, failed to retrieve L1 reference: %w", err)) - } - ib.progress.Origin = parent +func (cb *ChannelBank) Reset(ctx context.Context, base eth.L1BlockRef) error { + cb.channels = make(map[ChannelID]*Channel) + cb.channelQueue = make([]ChannelID, 0, 10) return io.EOF } diff --git a/op-node/rollup/derive/channel_bank_test.go b/op-node/rollup/derive/channel_bank_test.go index 9f642591a9f5f..822bc059c666c 100644 --- a/op-node/rollup/derive/channel_bank_test.go +++ b/op-node/rollup/derive/channel_bank_test.go @@ -2,78 +2,63 @@ package derive import ( "bytes" + "context" "fmt" + "io" "math/rand" "strconv" "strings" "testing" - "github.com/stretchr/testify/require" - "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testutils" "github.com/ethereum/go-ethereum/log" + "github.com/stretchr/testify/require" ) -type MockChannelBankOutput struct { - MockOriginStage -} - -func (m *MockChannelBankOutput) WriteChannel(data []byte) { - m.MethodCalled("WriteChannel", data) +type fakeChannelBankInput struct { + origin eth.L1BlockRef + data []struct { + data []byte + err error + } } -func (m *MockChannelBankOutput) ExpectWriteChannel(data []byte) { - m.On("WriteChannel", data).Once().Return() +func (f *fakeChannelBankInput) Origin() eth.L1BlockRef { + return f.origin } -var _ ChannelBankOutput = (*MockChannelBankOutput)(nil) - -type bankTestSetup struct { - origins []eth.L1BlockRef - t *testing.T - rng *rand.Rand - cb *ChannelBank - out *MockChannelBankOutput - l1 *testutils.MockL1Source +func (f *fakeChannelBankInput) NextData(_ context.Context) ([]byte, error) { + out := f.data[0] + f.data = f.data[1:] + return out.data, out.err } -type channelBankTestCase struct { - name string - originTimes []uint64 - nextStartsAt int - channelTimeout uint64 - fn func(bt *bankTestSetup) +func (f *fakeChannelBankInput) AddOutput(data []byte, err error) { + f.data = append(f.data, struct { + data []byte + err error + }{data: data, err: err}) } -func (ct *channelBankTestCase) Run(t *testing.T) { - cfg := &rollup.Config{ - ChannelTimeout: ct.channelTimeout, - } - - bt := &bankTestSetup{ - t: t, - rng: rand.New(rand.NewSource(1234)), - l1: &testutils.MockL1Source{}, - } - - bt.origins = append(bt.origins, testutils.RandomBlockRef(bt.rng)) - for i := range ct.originTimes[1:] { - ref := testutils.NextRandomRef(bt.rng, bt.origins[i]) - bt.origins = append(bt.origins, ref) - } - for i, x := range ct.originTimes { - bt.origins[i].Time = x +// ExpectNextFrameData takes a set of test frame & turns into the raw data +// for reading into the channel bank via `NextData` +func (f *fakeChannelBankInput) AddFrames(frames ...testFrame) { + data := new(bytes.Buffer) + data.WriteByte(DerivationVersion0) + for _, frame := range frames { + ff := frame.ToFrame() + if err := ff.MarshalBinary(data); err != nil { + panic(fmt.Errorf("error in making frame during test: %w", err)) + } } - - bt.out = &MockChannelBankOutput{MockOriginStage{progress: Progress{Origin: bt.origins[ct.nextStartsAt], Closed: false}}} - bt.cb = NewChannelBank(testlog.Logger(t, log.LvlError), cfg, bt.out) - - ct.fn(bt) + f.AddOutput(data.Bytes(), nil) } +var _ NextDataProvider = (*fakeChannelBankInput)(nil) + // format: :: // example: "abc:0:helloworld!" type testFrame string @@ -113,153 +98,76 @@ func (tf testFrame) ToFrame() Frame { } } -func (bt *bankTestSetup) ingestData(data []byte) { - bt.cb.IngestData(data) -} +func TestChannelBankSimple(t *testing.T) { + rng := rand.New(rand.NewSource(1234)) + a := testutils.RandomBlockRef(rng) -func (bt *bankTestSetup) ingestFrames(frames ...testFrame) { - data := new(bytes.Buffer) - data.WriteByte(DerivationVersion0) - for _, fr := range frames { - f := fr.ToFrame() - if err := f.MarshalBinary(data); err != nil { - panic(fmt.Errorf("error in making frame during test: %w", err)) - } - } - bt.ingestData(data.Bytes()) -} -func (bt *bankTestSetup) repeatStep(max int, outer int, outerClosed bool, err error) { - require.Equal(bt.t, err, RepeatStep(bt.t, bt.cb.Step, Progress{Origin: bt.origins[outer], Closed: outerClosed}, max)) -} -func (bt *bankTestSetup) repeatResetStep(max int, err error) { - require.Equal(bt.t, err, RepeatResetStep(bt.t, bt.cb.ResetStep, bt.l1, max)) -} + input := &fakeChannelBankInput{origin: a} + input.AddFrames("a:0:first", "a:2:third!") + input.AddFrames("a:1:second") + input.AddOutput(nil, io.EOF) -func (bt *bankTestSetup) assertOrigin(i int) { - require.Equal(bt.t, bt.cb.progress.Origin, bt.origins[i]) -} -func (bt *bankTestSetup) assertOriginTime(x uint64) { - require.Equal(bt.t, x, bt.cb.progress.Origin.Time) -} -func (bt *bankTestSetup) expectChannel(data string) { - bt.out.ExpectWriteChannel([]byte(data)) -} -func (bt *bankTestSetup) expectL1BlockRefByNumber(i int) { - bt.l1.ExpectL1BlockRefByNumber(bt.origins[i].Number, bt.origins[i], nil) -} -func (bt *bankTestSetup) assertExpectations() { - bt.l1.AssertExpectations(bt.t) - bt.l1.ExpectedCalls = nil - bt.out.AssertExpectations(bt.t) - bt.out.ExpectedCalls = nil -} + cfg := &rollup.Config{ChannelTimeout: 10} -func TestL1ChannelBank(t *testing.T) { - testCases := []channelBankTestCase{ - { - name: "time outs and buffering", - originTimes: []uint64{0, 1, 2, 3, 4, 5}, - nextStartsAt: 3, // Start next stage at block #3 - channelTimeout: 2, // Start at block #1 - fn: func(bt *bankTestSetup) { - bt.expectL1BlockRefByNumber(1) - bt.repeatResetStep(10, nil) - bt.ingestFrames("a:0:first") // will time out b/c not closed + cb := NewChannelBank(testlog.Logger(t, log.LvlCrit), cfg, input, nil) - bt.repeatStep(10, 1, true, nil) - bt.repeatStep(10, 2, false, nil) - bt.assertOrigin(2) + // Load the first + third frame + out, err := cb.NextData(context.Background()) + require.ErrorIs(t, err, NotEnoughData) + require.Equal(t, []byte(nil), out) - bt.repeatStep(10, 2, true, nil) - bt.repeatStep(10, 3, false, nil) - bt.assertOrigin(3) + // Load the second frame + out, err = cb.NextData(context.Background()) + require.ErrorIs(t, err, NotEnoughData) + require.Equal(t, []byte(nil), out) - bt.repeatStep(10, 3, true, nil) - bt.repeatStep(10, 4, false, nil) - bt.assertOrigin(4) + // Pull out the channel data + out, err = cb.NextData(context.Background()) + require.Nil(t, err) + require.Equal(t, "firstsecondthird", string(out)) - // Properly closed channel - bt.expectChannel("foobarclosed") - bt.ingestFrames("b:0:foobar") - bt.ingestFrames("b:1:closed!") - bt.repeatStep(10, 4, true, nil) - bt.assertExpectations() - }, - }, - { - name: "duplicate frames", - originTimes: []uint64{0, 1, 2, 3, 4, 5}, - nextStartsAt: 3, // Start next stage at block #3 - channelTimeout: 2, // Start at block #1c - fn: func(bt *bankTestSetup) { - bt.expectL1BlockRefByNumber(1) - bt.repeatResetStep(10, nil) - bt.ingestFrames("a:0:first") // will time out b/c not closed + // No more data + out, err = cb.NextData(context.Background()) + require.Nil(t, out) + require.Equal(t, io.EOF, err) +} - bt.repeatStep(10, 1, true, nil) - bt.repeatStep(10, 2, false, nil) - bt.assertOrigin(2) +func TestChannelBankDuplicates(t *testing.T) { + rng := rand.New(rand.NewSource(1234)) + a := testutils.RandomBlockRef(rng) - bt.repeatStep(10, 2, true, nil) - bt.repeatStep(10, 3, false, nil) - bt.assertOrigin(3) + input := &fakeChannelBankInput{origin: a} + input.AddFrames("a:0:first", "a:2:third!") + input.AddFrames("a:0:altfirst", "a:2:altthird!") + input.AddFrames("a:1:second") + input.AddOutput(nil, io.EOF) - bt.repeatStep(10, 3, true, nil) - bt.repeatStep(10, 4, false, nil) - bt.assertOrigin(4) + cfg := &rollup.Config{ChannelTimeout: 10} - bt.ingestFrames("a:0:first") - bt.repeatStep(1, 4, false, nil) - bt.ingestFrames("a:1:second") - bt.repeatStep(1, 4, false, nil) - bt.ingestFrames("a:0:altfirst") // ignored as duplicate - bt.repeatStep(1, 4, false, nil) - bt.ingestFrames("a:1:altsecond") // ignored as duplicate - bt.repeatStep(1, 4, false, nil) - bt.ingestFrames("b:0:new") - bt.repeatStep(1, 4, false, nil) + cb := NewChannelBank(testlog.Logger(t, log.LvlCrit), cfg, input, nil) - // close origin 4 - bt.repeatStep(2, 4, true, nil) + // Load the first + third frame + out, err := cb.NextData(context.Background()) + require.ErrorIs(t, err, NotEnoughData) + require.Equal(t, []byte(nil), out) - // open origin 1 - bt.repeatStep(2, 5, false, nil) - bt.ingestFrames("b:1:hi!") // close the other one first, but blocked - bt.repeatStep(1, 5, false, nil) - bt.ingestFrames("a:2:!") // empty closing frame - bt.expectChannel("firstsecond") - bt.expectChannel("newhi") - bt.repeatStep(5, 5, false, nil) - bt.assertExpectations() - }, - }, - { - name: "skip bad frames", - originTimes: []uint64{101, 102}, - nextStartsAt: 0, - channelTimeout: 3, - fn: func(bt *bankTestSetup) { - // don't do the whole setup process, just override where the stages are - bt.cb.progress = Progress{Origin: bt.origins[0], Closed: false} - bt.out.progress = Progress{Origin: bt.origins[0], Closed: false} + // Load the duplicate frames + out, err = cb.NextData(context.Background()) + require.ErrorIs(t, err, NotEnoughData) + require.Equal(t, []byte(nil), out) - bt.assertOriginTime(101) + // Load the second frame + out, err = cb.NextData(context.Background()) + require.ErrorIs(t, err, NotEnoughData) + require.Equal(t, []byte(nil), out) - badTx := new(bytes.Buffer) - badTx.WriteByte(DerivationVersion0) - goodFrame := testFrame("a:0:helloworld!").ToFrame() - if err := goodFrame.MarshalBinary(badTx); err != nil { - panic(fmt.Errorf("error in marshalling frame: %w", err)) - } - badTx.Write(testutils.RandomData(bt.rng, 30)) // incomplete frame data - bt.ingestData(badTx.Bytes()) - // Expect the bad frame to render the entire chunk invalid. - bt.repeatStep(2, 0, false, nil) - bt.assertExpectations() - }, - }, - } - for _, testCase := range testCases { - t.Run(testCase.name, testCase.Run) - } + // Pull out the channel data. Expect to see the original set & not the duplicates + out, err = cb.NextData(context.Background()) + require.Nil(t, err) + require.Equal(t, "firstsecondthird", string(out)) + + // No more data + out, err = cb.NextData(context.Background()) + require.Nil(t, out) + require.Equal(t, io.EOF, err) } diff --git a/op-node/rollup/derive/channel_in_reader.go b/op-node/rollup/derive/channel_in_reader.go index da96058f0b55d..9865dacd5aa7f 100644 --- a/op-node/rollup/derive/channel_in_reader.go +++ b/op-node/rollup/derive/channel_in_reader.go @@ -5,6 +5,7 @@ import ( "context" "io" + "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum/go-ethereum/log" ) @@ -13,41 +14,36 @@ import ( // This is a pure function from the channel, but each channel (or channel fragment) // must be tagged with an L1 inclusion block to be passed to the the batch queue. -type BatchQueueStage interface { - StageProgress - AddBatch(batch *BatchData) -} - type ChannelInReader struct { log log.Logger nextBatchFn func() (BatchWithL1InclusionBlock, error) - progress Progress - - next BatchQueueStage + prev *ChannelBank } -var _ ChannelBankOutput = (*ChannelInReader)(nil) +var _ PullStage = (*ChannelInReader)(nil) // NewChannelInReader creates a ChannelInReader, which should be Reset(origin) before use. -func NewChannelInReader(log log.Logger, next BatchQueueStage) *ChannelInReader { - return &ChannelInReader{log: log, next: next} +func NewChannelInReader(log log.Logger, prev *ChannelBank) *ChannelInReader { + return &ChannelInReader{ + log: log, + prev: prev, + } } -func (cr *ChannelInReader) Progress() Progress { - return cr.progress +func (cr *ChannelInReader) Origin() eth.L1BlockRef { + return cr.prev.Origin() } // TODO: Take full channel for better logging -func (cr *ChannelInReader) WriteChannel(data []byte) { - if cr.progress.Closed { - panic("write channel while closed") - } - if f, err := BatchReader(bytes.NewBuffer(data), cr.progress.Origin); err == nil { +func (cr *ChannelInReader) WriteChannel(data []byte) error { + if f, err := BatchReader(bytes.NewBuffer(data), cr.Origin()); err == nil { cr.nextBatchFn = f + return nil } else { cr.log.Error("Error creating batch reader from channel data", "err", err) + return err } } @@ -57,32 +53,37 @@ func (cr *ChannelInReader) NextChannel() { cr.nextBatchFn = nil } -func (cr *ChannelInReader) Step(ctx context.Context, outer Progress) error { - if changed, err := cr.progress.Update(outer); err != nil || changed { - return err - } - +// NextBatch pulls out the next batch from the channel if it has it. +// It returns io.EOF when it cannot make any more progress. +// It will return a temporary error if it needs to be called again to advance some internal state. +func (cr *ChannelInReader) NextBatch(ctx context.Context) (*BatchData, error) { if cr.nextBatchFn == nil { - return io.EOF + if data, err := cr.prev.NextData(ctx); err == io.EOF { + return nil, io.EOF + } else if err != nil { + return nil, err + } else { + if err := cr.WriteChannel(data); err != nil { + return nil, NewTemporaryError(err) + } + } } // TODO: can batch be non nil while err == io.EOF // This depends on the behavior of rlp.Stream batch, err := cr.nextBatchFn() - if err == io.EOF { - return io.EOF + cr.NextChannel() + return nil, NotEnoughData } else if err != nil { cr.log.Warn("failed to read batch from channel reader, skipping to next channel now", "err", err) cr.NextChannel() - return nil + return nil, NotEnoughData } - cr.next.AddBatch(batch.Batch) - return nil + return batch.Batch, nil } -func (cr *ChannelInReader) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { +func (cr *ChannelInReader) Reset(ctx context.Context, _ eth.L1BlockRef) error { cr.nextBatchFn = nil - cr.progress = cr.next.Progress() return io.EOF } diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index dbdffec809cc1..b585619dde36b 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -17,6 +17,11 @@ import ( "github.com/ethereum/go-ethereum/log" ) +type NextAttributesProvider interface { + Origin() eth.L1BlockRef + NextAttributes(context.Context, eth.L2BlockRef) (*eth.PayloadAttributes, error) +} + type Engine interface { GetPayload(ctx context.Context, payloadId eth.PayloadID) (*eth.ExecutionPayload, error) ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error) @@ -64,8 +69,6 @@ type EngineQueue struct { finalizedL1 eth.BlockID - progress Progress - safeAttributes []*eth.PayloadAttributes unsafePayloads PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps @@ -73,14 +76,15 @@ type EngineQueue struct { finalityData []FinalityData engine Engine + prev NextAttributesProvider + + progress Progress // only used for pipeline resets metrics Metrics } -var _ AttributesQueueOutput = (*EngineQueue)(nil) - // NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use. -func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics) *EngineQueue { +func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider) *EngineQueue { return &EngineQueue{ log: log, cfg: cfg, @@ -91,6 +95,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M MaxSize: maxUnsafePayloadsMemory, SizeFn: payloadMemSize, }, + prev: prev, } } @@ -146,17 +151,30 @@ func (eq *EngineQueue) LastL2Time() uint64 { return uint64(eq.safeAttributes[len(eq.safeAttributes)-1].Timestamp) } -func (eq *EngineQueue) Step(ctx context.Context, outer Progress) error { - if changed, err := eq.progress.Update(outer); err != nil || changed { - return err - } +func (eq *EngineQueue) Step(ctx context.Context, _ Progress) error { if len(eq.safeAttributes) > 0 { return eq.tryNextSafeAttributes(ctx) } + outOfData := false + if len(eq.safeAttributes) == 0 { + if next, err := eq.prev.NextAttributes(ctx, eq.safeHead); err == io.EOF { + outOfData = true + } else if err != nil { + return err + } else { + eq.safeAttributes = append(eq.safeAttributes, next) + return NotEnoughData + } + } if eq.unsafePayloads.Len() > 0 { return eq.tryNextUnsafePayload(ctx) } - return io.EOF + + if outOfData { + return io.EOF + } else { + return nil + } } // tryFinalizeL2 traverses the past L1 blocks, checks if any has been finalized, @@ -186,11 +204,11 @@ func (eq *EngineQueue) postProcessSafeL2() { eq.finalityData = append(eq.finalityData[:0], eq.finalityData[1:finalityLookback]...) } // remember the last L2 block that we fully derived from the given finality data - if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.progress.Origin.Number { + if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.prev.Origin().Number { // append entry for new L1 block eq.finalityData = append(eq.finalityData, FinalityData{ L2Block: eq.safeHead, - L1Block: eq.progress.Origin.ID(), + L1Block: eq.prev.Origin().ID(), }) } else { // if it's a now L2 block that was derived from the same latest L1 block, then just update the entry @@ -205,7 +223,7 @@ func (eq *EngineQueue) logSyncProgress(reason string) { "l2_safe", eq.safeHead, "l2_unsafe", eq.unsafeHead, "l2_time", eq.unsafeHead.Time, - "l1_derived", eq.progress.Origin, + "l1_derived", eq.prev.Origin(), ) } @@ -398,6 +416,15 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error return NewResetError(fmt.Errorf("cannot reset block derivation to start at L2 block %s with time %d older than its L1 origin %s with time %d, time invariant is broken", safe, safe.Time, l1Origin, l1Origin.Time)) } + + pipelineNumber := l1Origin.Number - eq.cfg.ChannelTimeout + if l1Origin.Number < eq.cfg.ChannelTimeout { + pipelineNumber = 0 + } + pipelineOrigin, err := l1Fetcher.L1BlockRefByNumber(ctx, pipelineNumber) + if err != nil { + return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %v; err: %w", pipelineNumber, err)) + } eq.log.Debug("Reset engine queue", "safeHead", safe, "unsafe", unsafe, "safe_timestamp", safe.Time, "unsafe_timestamp", unsafe.Time, "l1Origin", l1Origin) eq.unsafeHead = unsafe eq.safeHead = safe @@ -405,8 +432,7 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error eq.finalityData = eq.finalityData[:0] // note: we do not clear the unsafe payloadds queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads. eq.progress = Progress{ - Origin: l1Origin, - Closed: false, + Origin: pipelineOrigin, } eq.metrics.RecordL2Ref("l2_finalized", finalized) eq.metrics.RecordL2Ref("l2_safe", safe) diff --git a/op-node/rollup/derive/engine_queue_test.go b/op-node/rollup/derive/engine_queue_test.go index 9506bc0733214..800c5c7736fe8 100644 --- a/op-node/rollup/derive/engine_queue_test.go +++ b/op-node/rollup/derive/engine_queue_test.go @@ -1,6 +1,8 @@ package derive import ( + "context" + "io" "math/rand" "testing" @@ -14,6 +16,20 @@ import ( "github.com/ethereum/go-ethereum/log" ) +type fakeAttributesQueue struct { + origin eth.L1BlockRef +} + +func (f *fakeAttributesQueue) Origin() eth.L1BlockRef { + return f.origin +} + +func (f *fakeAttributesQueue) NextAttributes(_ context.Context, _ eth.L2BlockRef) (*eth.PayloadAttributes, error) { + return nil, io.EOF +} + +var _ NextAttributesProvider = (*fakeAttributesQueue)(nil) + func TestEngineQueue_Finalize(t *testing.T) { logger := testlog.Logger(t, log.LvlInfo) @@ -209,9 +225,12 @@ func TestEngineQueue_Finalize(t *testing.T) { // and we fetch the L1 origin of that as starting point for engine queue l1F.ExpectL1BlockRefByHash(refB.Hash, refB, nil) + l1F.ExpectL1BlockRefByNumber(refB.Number, refB, nil) + + prev := &fakeAttributesQueue{} - eq := NewEngineQueue(logger, cfg, eng, metrics) - require.NoError(t, RepeatResetStep(t, eq.ResetStep, l1F, 20)) + eq := NewEngineQueue(logger, cfg, eng, metrics, prev) + require.ErrorIs(t, eq.ResetStep(context.Background(), l1F), io.EOF) require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") require.Equal(t, refB, eq.Progress().Origin, "Expecting to be set back derivation L1 progress to B") @@ -219,20 +238,19 @@ func TestEngineQueue_Finalize(t *testing.T) { // now say C1 was included in D and became the new safe head eq.progress.Origin = refD + prev.origin = refD eq.safeHead = refC1 eq.postProcessSafeL2() // now say D0 was included in E and became the new safe head eq.progress.Origin = refE + prev.origin = refE eq.safeHead = refD0 eq.postProcessSafeL2() // let's finalize D (current L1), from which we fully derived C1 (it was safe head), but not D0 (included in E) eq.Finalize(refD.ID()) - // Now a few steps later, without consuming any additional L1 inputs, - // we should be able to resolve that B1 is now finalized, since it was included in finalized L1 block C - require.NoError(t, RepeatStep(t, eq.Step, eq.progress, 10)) require.Equal(t, refC1, eq.Finalized(), "C1 was included in finalized D, and should now be finalized") l1F.AssertExpectations(t) diff --git a/op-node/rollup/derive/error.go b/op-node/rollup/derive/error.go index 893e0fad1b8a7..ef896d2fa677e 100644 --- a/op-node/rollup/derive/error.go +++ b/op-node/rollup/derive/error.go @@ -1,6 +1,7 @@ package derive import ( + "errors" "fmt" ) @@ -91,3 +92,7 @@ func NewCriticalError(err error) error { var ErrTemporary = NewTemporaryError(nil) var ErrReset = NewResetError(nil) var ErrCritical = NewCriticalError(nil) + +// NotEnoughData implies that the function currently does not have enough data to progress +// but if it is retried enough times, it will eventually return a real value or io.EOF +var NotEnoughData = errors.New("not enough data") diff --git a/op-node/rollup/derive/l1_retrieval.go b/op-node/rollup/derive/l1_retrieval.go index c20ef71d8fc4b..dda4be6434bac 100644 --- a/op-node/rollup/derive/l1_retrieval.go +++ b/op-node/rollup/derive/l1_retrieval.go @@ -8,85 +8,69 @@ import ( "github.com/ethereum/go-ethereum/log" ) -type L1SourceOutput interface { - StageProgress - IngestData(data []byte) -} - type DataAvailabilitySource interface { OpenData(ctx context.Context, id eth.BlockID) DataIter } type NextBlockProvider interface { NextL1Block(context.Context) (eth.L1BlockRef, error) + Origin() eth.L1BlockRef } type L1Retrieval struct { log log.Logger dataSrc DataAvailabilitySource - next L1SourceOutput prev NextBlockProvider - progress Progress - datas DataIter } -var _ Stage = (*L1Retrieval)(nil) +var _ PullStage = (*L1Retrieval)(nil) -func NewL1Retrieval(log log.Logger, dataSrc DataAvailabilitySource, next L1SourceOutput, prev NextBlockProvider) *L1Retrieval { +func NewL1Retrieval(log log.Logger, dataSrc DataAvailabilitySource, prev NextBlockProvider) *L1Retrieval { return &L1Retrieval{ log: log, dataSrc: dataSrc, - next: next, prev: prev, } } -func (l1r *L1Retrieval) Progress() Progress { - return l1r.progress +func (l1r *L1Retrieval) Origin() eth.L1BlockRef { + return l1r.prev.Origin() } -// Step does an action in the L1 Retrieval stage +// NextData does an action in the L1 Retrieval stage // If there is data, it pushes it to the next stage. // If there is no more data open ourselves if we are closed or close ourselves if we are open -func (l1r *L1Retrieval) Step(ctx context.Context, _ Progress) error { - if l1r.datas != nil { - l1r.log.Debug("fetching next piece of data") - data, err := l1r.datas.Next(ctx) +func (l1r *L1Retrieval) NextData(ctx context.Context) ([]byte, error) { + if l1r.datas == nil { + next, err := l1r.prev.NextL1Block(ctx) if err == io.EOF { - l1r.datas = nil - return io.EOF + return nil, io.EOF } else if err != nil { - return err - } else { - l1r.next.IngestData(data) - return nil + return nil, err } + l1r.datas = l1r.dataSrc.OpenData(ctx, next.ID()) + } + + l1r.log.Debug("fetching next piece of data") + data, err := l1r.datas.Next(ctx) + if err == io.EOF { + l1r.datas = nil + return nil, io.EOF + } else if err != nil { + // CalldataSource appropriately wraps the error so avoid double wrapping errors here. + return nil, err } else { - if l1r.progress.Closed { - next, err := l1r.prev.NextL1Block(ctx) - if err == io.EOF { - return io.EOF - } else if err != nil { - return err - } - l1r.datas = l1r.dataSrc.OpenData(ctx, next.ID()) - l1r.progress.Origin = next - l1r.progress.Closed = false - } else { - l1r.progress.Closed = true - } - return nil + return data, nil } } // ResetStep re-initializes the L1 Retrieval stage to block of it's `next` progress. // Note that we open up the `l1r.datas` here because it is requires to maintain the // internal invariants that later propagate up the derivation pipeline. -func (l1r *L1Retrieval) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { - l1r.progress = l1r.next.Progress() - l1r.datas = l1r.dataSrc.OpenData(ctx, l1r.progress.Origin.ID()) - l1r.log.Info("Reset of L1Retrieval done", "origin", l1r.progress.Origin) +func (l1r *L1Retrieval) Reset(ctx context.Context, base eth.L1BlockRef) error { + l1r.datas = l1r.dataSrc.OpenData(ctx, base.ID()) + l1r.log.Info("Reset of L1Retrieval done", "origin", base) return io.EOF } diff --git a/op-node/rollup/derive/l1_retrieval_test.go b/op-node/rollup/derive/l1_retrieval_test.go index 8e6209f0b99f2..5e58fc8102fed 100644 --- a/op-node/rollup/derive/l1_retrieval_test.go +++ b/op-node/rollup/derive/l1_retrieval_test.go @@ -12,21 +12,21 @@ import ( "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testutils" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/log" ) type fakeDataIter struct { + idx int data []eth.Data + errs []error } func (cs *fakeDataIter) Next(ctx context.Context) (eth.Data, error) { - if len(cs.data) == 0 { - return nil, io.EOF - } else { - data := cs.data[0] - cs.data = cs.data[1:] - return data, nil - } + i := cs.idx + cs.idx += 1 + return cs.data[i], cs.errs[i] } type MockDataSource struct { @@ -38,8 +38,8 @@ func (m *MockDataSource) OpenData(ctx context.Context, id eth.BlockID) DataIter return out[0].(DataIter) } -func (m *MockDataSource) ExpectOpenData(id eth.BlockID, iter DataIter, err error) { - m.Mock.On("OpenData", id).Return(iter, &err) +func (m *MockDataSource) ExpectOpenData(id eth.BlockID, iter DataIter) { + m.Mock.On("OpenData", id).Return(iter) } var _ DataAvailabilitySource = (*MockDataSource)(nil) @@ -48,57 +48,109 @@ type MockL1Traversal struct { mock.Mock } -func (m *MockL1Traversal) NextL1Block(_ context.Context) (eth.L1BlockRef, error) { - out := m.Mock.MethodCalled("NextL1Block") - return out[0].(eth.L1BlockRef), out[1].(error) +func (m *MockL1Traversal) Origin() eth.L1BlockRef { + out := m.Mock.MethodCalled("Origin") + return out[0].(eth.L1BlockRef) } -func (m *MockL1Traversal) ExpectNextL1Block(block eth.L1BlockRef, err error) { - m.Mock.On("NextL1Block").Return(block, err) +func (m *MockL1Traversal) ExpectOrigin(block eth.L1BlockRef) { + m.Mock.On("Origin").Return(block) } -type MockIngestData struct { - MockOriginStage -} - -func (im *MockIngestData) IngestData(data []byte) { - im.Mock.MethodCalled("IngestData", data) +func (m *MockL1Traversal) NextL1Block(_ context.Context) (eth.L1BlockRef, error) { + out := m.Mock.MethodCalled("NextL1Block") + return out[0].(eth.L1BlockRef), *out[1].(*error) } -func (im *MockIngestData) ExpectIngestData(data []byte) { - im.Mock.On("IngestData", data).Return() +func (m *MockL1Traversal) ExpectNextL1Block(block eth.L1BlockRef, err error) { + m.Mock.On("NextL1Block").Return(block, &err) } -var _ L1SourceOutput = (*MockIngestData)(nil) +var _ NextBlockProvider = (*MockL1Traversal)(nil) -func TestL1Retrieval_Step(t *testing.T) { +// TestL1RetrievalReset tests the reset. The reset just opens up a new +// data for the specified block. +func TestL1RetrievalReset(t *testing.T) { rng := rand.New(rand.NewSource(1234)) - - next := &MockIngestData{MockOriginStage{progress: Progress{Origin: testutils.RandomBlockRef(rng), Closed: true}}} dataSrc := &MockDataSource{} - prev := &MockL1Traversal{} - - a := testutils.RandomData(rng, 10) - b := testutils.RandomData(rng, 15) - iter := &fakeDataIter{data: []eth.Data{a, b}} - - outer := next.progress + a := testutils.RandomBlockRef(rng) - // mock some L1 data to open for the origin that is opened by the outer stage - dataSrc.ExpectOpenData(outer.Origin.ID(), iter, nil) + dataSrc.ExpectOpenData(a.ID(), &fakeDataIter{}) + defer dataSrc.AssertExpectations(t) - next.ExpectIngestData(a) - next.ExpectIngestData(b) + l1r := NewL1Retrieval(testlog.Logger(t, log.LvlError), dataSrc, nil) - defer dataSrc.AssertExpectations(t) - defer next.AssertExpectations(t) + // We assert that it opens up the correct data on a reset + _ = l1r.Reset(context.Background(), a) +} - l1r := NewL1Retrieval(testlog.Logger(t, log.LvlError), dataSrc, next, prev) +// TestL1RetrievalNextData tests that the `NextData` function properly +// handles different error cases and returns the expected data +// if there is no error. +func TestL1RetrievalNextData(t *testing.T) { + rng := rand.New(rand.NewSource(1234)) + a := testutils.RandomBlockRef(rng) + + tests := []struct { + name string + prevBlock eth.L1BlockRef + prevErr error // error returned by prev.NextL1Block + openErr error // error returned by NextData if prev.NextL1Block fails + datas []eth.Data + datasErrs []error + expectedErrs []error + }{ + { + name: "simple retrieval", + prevBlock: a, + prevErr: nil, + openErr: nil, + datas: []eth.Data{testutils.RandomData(rng, 10), testutils.RandomData(rng, 10), testutils.RandomData(rng, 10), nil}, + datasErrs: []error{nil, nil, nil, io.EOF}, + expectedErrs: []error{nil, nil, nil, io.EOF}, + }, + { + name: "out of data", + prevErr: io.EOF, + openErr: io.EOF, + }, + { + name: "fail to open data", + prevBlock: a, + prevErr: nil, + openErr: nil, + datas: []eth.Data{nil}, + datasErrs: []error{NewCriticalError(ethereum.NotFound)}, + expectedErrs: []error{ErrCritical}, + }, + } - // first we expect the stage to reset to the origin of the inner stage - require.NoError(t, RepeatResetStep(t, l1r.ResetStep, nil, 1)) - require.Equal(t, next.Progress(), l1r.Progress(), "stage needs to adopt the progress of next stage on reset") + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + l1t := &MockL1Traversal{} + l1t.ExpectNextL1Block(test.prevBlock, test.prevErr) + dataSrc := &MockDataSource{} + dataSrc.ExpectOpenData(test.prevBlock.ID(), &fakeDataIter{data: test.datas, errs: test.datasErrs}) + + ret := NewL1Retrieval(testlog.Logger(t, log.LvlCrit), dataSrc, l1t) + + // If prevErr != nil we forced an error while getting data from the previous stage + if test.openErr != nil { + data, err := ret.NextData(context.Background()) + require.Nil(t, data) + require.ErrorIs(t, err, test.openErr) + } + + // Go through the fake data an assert that data is passed through and the correct + // errors are returned. + for i := range test.expectedErrs { + data, err := ret.NextData(context.Background()) + require.Equal(t, test.datas[i], hexutil.Bytes(data)) + require.ErrorIs(t, err, test.expectedErrs[i]) + } + + l1t.AssertExpectations(t) + }) + } - // and then start processing the data of the next stage - require.NoError(t, RepeatStep(t, l1r.Step, outer, 10)) } diff --git a/op-node/rollup/derive/l1_traversal.go b/op-node/rollup/derive/l1_traversal.go index 52d0436db20d0..db19eed5a94db 100644 --- a/op-node/rollup/derive/l1_traversal.go +++ b/op-node/rollup/derive/l1_traversal.go @@ -33,6 +33,10 @@ func NewL1Traversal(log log.Logger, l1Blocks L1BlockRefByNumberFetcher) *L1Trave } } +func (l1t *L1Traversal) Origin() eth.L1BlockRef { + return l1t.block +} + // NextL1Block returns the next block. It does not advance, but it can only be // called once before returning io.EOF func (l1t *L1Traversal) NextL1Block(_ context.Context) (eth.L1BlockRef, error) { diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index 772e1b8579897..bc8fe63d78009 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -96,19 +96,18 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch // Pull stages l1Traversal := NewL1Traversal(log, l1Fetcher) + dataSrc := NewDataSourceFactory(log, cfg, l1Fetcher) // auxiliary stage for L1Retrieval + l1Src := NewL1Retrieval(log, dataSrc, l1Traversal) + bank := NewChannelBank(log, cfg, l1Src, l1Fetcher) + chInReader := NewChannelInReader(log, bank) + batchQueue := NewBatchQueue(log, cfg, chInReader) + attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, batchQueue) // Push stages (that act like pull stages b/c we push from the innermost stages prior to the outermost stages) - eng := NewEngineQueue(log, cfg, engine, metrics) - attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, eng) - batchQueue := NewBatchQueue(log, cfg, attributesQueue) - chInReader := NewChannelInReader(log, batchQueue) - bank := NewChannelBank(log, cfg, chInReader) + eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue) - dataSrc := NewDataSourceFactory(log, cfg, l1Fetcher) - l1Src := NewL1Retrieval(log, dataSrc, bank, l1Traversal) - - stages := []Stage{eng, attributesQueue, batchQueue, chInReader, bank, l1Src} - pullStages := []PullStage{l1Traversal} + stages := []Stage{eng} + pullStages := []PullStage{attributesQueue, batchQueue, chInReader, bank, l1Src, l1Traversal} return &DerivationPipeline{ log: log, diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go index cd2bdd2fe5abf..3538cb6d8344f 100644 --- a/op-node/rollup/driver/state.go +++ b/op-node/rollup/driver/state.go @@ -440,6 +440,10 @@ func (s *state) eventLoop() { } else if err != nil && errors.Is(err, derive.ErrCritical) { s.log.Error("Derivation process critical error", "err", err) return + } else if err != nil && errors.Is(err, derive.NotEnoughData) { + stepAttempts = 0 // don't do a backoff for this error + reqStep() + continue } else if err != nil { s.log.Error("Derivation process error", "attempts", stepAttempts, "err", err) reqStep()