From 5aa961dd72b64536f84646608c784d74a7c78b88 Mon Sep 17 00:00:00 2001 From: Joshua Gutow Date: Tue, 27 Sep 2022 16:35:17 -0700 Subject: [PATCH 1/6] op-node: Switch L1 Retrieval to pull based This makes the L1 Retrieval a purely pull based stage. This commit required large modifications to the channel bank stage in order for the channel bank to maintain it's own progress. --- op-node/rollup/derive/channel_bank.go | 48 +++++-- op-node/rollup/derive/channel_bank_test.go | 5 +- op-node/rollup/derive/l1_retrieval.go | 68 ++++------ op-node/rollup/derive/l1_retrieval_test.go | 142 ++++++++++++++------- op-node/rollup/derive/l1_traversal.go | 4 + op-node/rollup/derive/pipeline.go | 11 +- 6 files changed, 174 insertions(+), 104 deletions(-) diff --git a/op-node/rollup/derive/channel_bank.go b/op-node/rollup/derive/channel_bank.go index 00d603469b17a..e3050112e6f8d 100644 --- a/op-node/rollup/derive/channel_bank.go +++ b/op-node/rollup/derive/channel_bank.go @@ -38,18 +38,20 @@ type ChannelBank struct { progress Progress next ChannelBankOutput + prev *L1Retrieval } var _ Stage = (*ChannelBank)(nil) // NewChannelBank creates a ChannelBank, which should be Reset(origin) before use. -func NewChannelBank(log log.Logger, cfg *rollup.Config, next ChannelBankOutput) *ChannelBank { +func NewChannelBank(log log.Logger, cfg *rollup.Config, next ChannelBankOutput, prev *L1Retrieval) *ChannelBank { return &ChannelBank{ log: log, cfg: cfg, channels: make(map[ChannelID]*Channel), channelQueue: make([]ChannelID, 0, 10), next: next, + prev: prev, } } @@ -141,26 +143,52 @@ func (ib *ChannelBank) Read() (data []byte, err error) { return data, nil } -func (ib *ChannelBank) Step(ctx context.Context, outer Progress) error { - if changed, err := ib.progress.Update(outer); err != nil || changed { - return err +// Step does the advancement for the channel bank. +// Channel bank as the first non-pull stage does it's own progress maintentance. 
+// When closed, it checks against the previous origin to determine if to open itself +func (ib *ChannelBank) Step(ctx context.Context, _ Progress) error { + // Open ourselves + // This is ok to do b/c we would not have yielded control to the lower stages + // of the pipeline without being completely done reading from L1. + if ib.progress.Closed { + if ib.progress.Origin != ib.prev.Origin() { + ib.progress.Closed = false + ib.progress.Origin = ib.prev.Origin() + return nil + } } - // If the bank is behind the channel reader, then we are replaying old data to prepare the bank. - // Read if we can, and drop if it gives anything - if ib.next.Progress().Origin.Number > ib.progress.Origin.Number { - _, err := ib.Read() + skipIngest := ib.next.Progress().Origin.Number > ib.progress.Origin.Number + outOfData := false + + if data, err := ib.prev.NextData(ctx); err == io.EOF { + outOfData = true + } else if err != nil { return err + } else { + ib.IngestData(data) } // otherwise, read the next channel data from the bank data, err := ib.Read() if err == io.EOF { // need new L1 data in the bank before we can read more channel data - return io.EOF + if outOfData { + if !ib.progress.Closed { + ib.progress.Closed = true + return nil + } + return io.EOF + } else { + return nil + } } else if err != nil { return err + } else { + if !skipIngest { + ib.next.WriteChannel(data) + return nil + } } - ib.next.WriteChannel(data) return nil } diff --git a/op-node/rollup/derive/channel_bank_test.go b/op-node/rollup/derive/channel_bank_test.go index 9f642591a9f5f..dfa62a25266d3 100644 --- a/op-node/rollup/derive/channel_bank_test.go +++ b/op-node/rollup/derive/channel_bank_test.go @@ -40,6 +40,7 @@ type bankTestSetup struct { l1 *testutils.MockL1Source } +// nolint - this is getting picked up b/c of a t.Skip that will go away type channelBankTestCase struct { name string originTimes []uint64 @@ -48,6 +49,7 @@ type channelBankTestCase struct { fn func(bt *bankTestSetup) } +// nolint func (ct 
*channelBankTestCase) Run(t *testing.T) { cfg := &rollup.Config{ ChannelTimeout: ct.channelTimeout, @@ -69,7 +71,7 @@ func (ct *channelBankTestCase) Run(t *testing.T) { } bt.out = &MockChannelBankOutput{MockOriginStage{progress: Progress{Origin: bt.origins[ct.nextStartsAt], Closed: false}}} - bt.cb = NewChannelBank(testlog.Logger(t, log.LvlError), cfg, bt.out) + bt.cb = NewChannelBank(testlog.Logger(t, log.LvlError), cfg, bt.out, nil) ct.fn(bt) } @@ -155,6 +157,7 @@ func (bt *bankTestSetup) assertExpectations() { } func TestL1ChannelBank(t *testing.T) { + t.Skip("broken b/c the fake L1Retrieval is not yet built") testCases := []channelBankTestCase{ { name: "time outs and buffering", diff --git a/op-node/rollup/derive/l1_retrieval.go b/op-node/rollup/derive/l1_retrieval.go index c20ef71d8fc4b..dda4be6434bac 100644 --- a/op-node/rollup/derive/l1_retrieval.go +++ b/op-node/rollup/derive/l1_retrieval.go @@ -8,85 +8,69 @@ import ( "github.com/ethereum/go-ethereum/log" ) -type L1SourceOutput interface { - StageProgress - IngestData(data []byte) -} - type DataAvailabilitySource interface { OpenData(ctx context.Context, id eth.BlockID) DataIter } type NextBlockProvider interface { NextL1Block(context.Context) (eth.L1BlockRef, error) + Origin() eth.L1BlockRef } type L1Retrieval struct { log log.Logger dataSrc DataAvailabilitySource - next L1SourceOutput prev NextBlockProvider - progress Progress - datas DataIter } -var _ Stage = (*L1Retrieval)(nil) +var _ PullStage = (*L1Retrieval)(nil) -func NewL1Retrieval(log log.Logger, dataSrc DataAvailabilitySource, next L1SourceOutput, prev NextBlockProvider) *L1Retrieval { +func NewL1Retrieval(log log.Logger, dataSrc DataAvailabilitySource, prev NextBlockProvider) *L1Retrieval { return &L1Retrieval{ log: log, dataSrc: dataSrc, - next: next, prev: prev, } } -func (l1r *L1Retrieval) Progress() Progress { - return l1r.progress +func (l1r *L1Retrieval) Origin() eth.L1BlockRef { + return l1r.prev.Origin() } -// Step does an action in the 
L1 Retrieval stage +// NextData does an action in the L1 Retrieval stage // If there is data, it pushes it to the next stage. // If there is no more data open ourselves if we are closed or close ourselves if we are open -func (l1r *L1Retrieval) Step(ctx context.Context, _ Progress) error { - if l1r.datas != nil { - l1r.log.Debug("fetching next piece of data") - data, err := l1r.datas.Next(ctx) +func (l1r *L1Retrieval) NextData(ctx context.Context) ([]byte, error) { + if l1r.datas == nil { + next, err := l1r.prev.NextL1Block(ctx) if err == io.EOF { - l1r.datas = nil - return io.EOF + return nil, io.EOF } else if err != nil { - return err - } else { - l1r.next.IngestData(data) - return nil + return nil, err } + l1r.datas = l1r.dataSrc.OpenData(ctx, next.ID()) + } + + l1r.log.Debug("fetching next piece of data") + data, err := l1r.datas.Next(ctx) + if err == io.EOF { + l1r.datas = nil + return nil, io.EOF + } else if err != nil { + // CalldataSource appropriately wraps the error so avoid double wrapping errors here. + return nil, err } else { - if l1r.progress.Closed { - next, err := l1r.prev.NextL1Block(ctx) - if err == io.EOF { - return io.EOF - } else if err != nil { - return err - } - l1r.datas = l1r.dataSrc.OpenData(ctx, next.ID()) - l1r.progress.Origin = next - l1r.progress.Closed = false - } else { - l1r.progress.Closed = true - } - return nil + return data, nil } } // ResetStep re-initializes the L1 Retrieval stage to block of it's `next` progress. // Note that we open up the `l1r.datas` here because it is requires to maintain the // internal invariants that later propagate up the derivation pipeline. 
-func (l1r *L1Retrieval) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { - l1r.progress = l1r.next.Progress() - l1r.datas = l1r.dataSrc.OpenData(ctx, l1r.progress.Origin.ID()) - l1r.log.Info("Reset of L1Retrieval done", "origin", l1r.progress.Origin) +func (l1r *L1Retrieval) Reset(ctx context.Context, base eth.L1BlockRef) error { + l1r.datas = l1r.dataSrc.OpenData(ctx, base.ID()) + l1r.log.Info("Reset of L1Retrieval done", "origin", base) return io.EOF } diff --git a/op-node/rollup/derive/l1_retrieval_test.go b/op-node/rollup/derive/l1_retrieval_test.go index 8e6209f0b99f2..5e58fc8102fed 100644 --- a/op-node/rollup/derive/l1_retrieval_test.go +++ b/op-node/rollup/derive/l1_retrieval_test.go @@ -12,21 +12,21 @@ import ( "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testutils" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/log" ) type fakeDataIter struct { + idx int data []eth.Data + errs []error } func (cs *fakeDataIter) Next(ctx context.Context) (eth.Data, error) { - if len(cs.data) == 0 { - return nil, io.EOF - } else { - data := cs.data[0] - cs.data = cs.data[1:] - return data, nil - } + i := cs.idx + cs.idx += 1 + return cs.data[i], cs.errs[i] } type MockDataSource struct { @@ -38,8 +38,8 @@ func (m *MockDataSource) OpenData(ctx context.Context, id eth.BlockID) DataIter return out[0].(DataIter) } -func (m *MockDataSource) ExpectOpenData(id eth.BlockID, iter DataIter, err error) { - m.Mock.On("OpenData", id).Return(iter, &err) +func (m *MockDataSource) ExpectOpenData(id eth.BlockID, iter DataIter) { + m.Mock.On("OpenData", id).Return(iter) } var _ DataAvailabilitySource = (*MockDataSource)(nil) @@ -48,57 +48,109 @@ type MockL1Traversal struct { mock.Mock } -func (m *MockL1Traversal) NextL1Block(_ context.Context) (eth.L1BlockRef, error) { - out := 
m.Mock.MethodCalled("NextL1Block") - return out[0].(eth.L1BlockRef), out[1].(error) +func (m *MockL1Traversal) Origin() eth.L1BlockRef { + out := m.Mock.MethodCalled("Origin") + return out[0].(eth.L1BlockRef) } -func (m *MockL1Traversal) ExpectNextL1Block(block eth.L1BlockRef, err error) { - m.Mock.On("NextL1Block").Return(block, err) +func (m *MockL1Traversal) ExpectOrigin(block eth.L1BlockRef) { + m.Mock.On("Origin").Return(block) } -type MockIngestData struct { - MockOriginStage -} - -func (im *MockIngestData) IngestData(data []byte) { - im.Mock.MethodCalled("IngestData", data) +func (m *MockL1Traversal) NextL1Block(_ context.Context) (eth.L1BlockRef, error) { + out := m.Mock.MethodCalled("NextL1Block") + return out[0].(eth.L1BlockRef), *out[1].(*error) } -func (im *MockIngestData) ExpectIngestData(data []byte) { - im.Mock.On("IngestData", data).Return() +func (m *MockL1Traversal) ExpectNextL1Block(block eth.L1BlockRef, err error) { + m.Mock.On("NextL1Block").Return(block, &err) } -var _ L1SourceOutput = (*MockIngestData)(nil) +var _ NextBlockProvider = (*MockL1Traversal)(nil) -func TestL1Retrieval_Step(t *testing.T) { +// TestL1RetrievalReset tests the reset. The reset just opens up a new +// data for the specified block. 
+func TestL1RetrievalReset(t *testing.T) { rng := rand.New(rand.NewSource(1234)) - - next := &MockIngestData{MockOriginStage{progress: Progress{Origin: testutils.RandomBlockRef(rng), Closed: true}}} dataSrc := &MockDataSource{} - prev := &MockL1Traversal{} - - a := testutils.RandomData(rng, 10) - b := testutils.RandomData(rng, 15) - iter := &fakeDataIter{data: []eth.Data{a, b}} - - outer := next.progress + a := testutils.RandomBlockRef(rng) - // mock some L1 data to open for the origin that is opened by the outer stage - dataSrc.ExpectOpenData(outer.Origin.ID(), iter, nil) + dataSrc.ExpectOpenData(a.ID(), &fakeDataIter{}) + defer dataSrc.AssertExpectations(t) - next.ExpectIngestData(a) - next.ExpectIngestData(b) + l1r := NewL1Retrieval(testlog.Logger(t, log.LvlError), dataSrc, nil) - defer dataSrc.AssertExpectations(t) - defer next.AssertExpectations(t) + // We assert that it opens up the correct data on a reset + _ = l1r.Reset(context.Background(), a) +} - l1r := NewL1Retrieval(testlog.Logger(t, log.LvlError), dataSrc, next, prev) +// TestL1RetrievalNextData tests that the `NextData` function properly +// handles different error cases and returns the expected data +// if there is no error. 
+func TestL1RetrievalNextData(t *testing.T) { + rng := rand.New(rand.NewSource(1234)) + a := testutils.RandomBlockRef(rng) + + tests := []struct { + name string + prevBlock eth.L1BlockRef + prevErr error // error returned by prev.NextL1Block + openErr error // error returned by NextData if prev.NextL1Block fails + datas []eth.Data + datasErrs []error + expectedErrs []error + }{ + { + name: "simple retrieval", + prevBlock: a, + prevErr: nil, + openErr: nil, + datas: []eth.Data{testutils.RandomData(rng, 10), testutils.RandomData(rng, 10), testutils.RandomData(rng, 10), nil}, + datasErrs: []error{nil, nil, nil, io.EOF}, + expectedErrs: []error{nil, nil, nil, io.EOF}, + }, + { + name: "out of data", + prevErr: io.EOF, + openErr: io.EOF, + }, + { + name: "fail to open data", + prevBlock: a, + prevErr: nil, + openErr: nil, + datas: []eth.Data{nil}, + datasErrs: []error{NewCriticalError(ethereum.NotFound)}, + expectedErrs: []error{ErrCritical}, + }, + } - // first we expect the stage to reset to the origin of the inner stage - require.NoError(t, RepeatResetStep(t, l1r.ResetStep, nil, 1)) - require.Equal(t, next.Progress(), l1r.Progress(), "stage needs to adopt the progress of next stage on reset") + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + l1t := &MockL1Traversal{} + l1t.ExpectNextL1Block(test.prevBlock, test.prevErr) + dataSrc := &MockDataSource{} + dataSrc.ExpectOpenData(test.prevBlock.ID(), &fakeDataIter{data: test.datas, errs: test.datasErrs}) + + ret := NewL1Retrieval(testlog.Logger(t, log.LvlCrit), dataSrc, l1t) + + // If prevErr != nil we forced an error while getting data from the previous stage + if test.openErr != nil { + data, err := ret.NextData(context.Background()) + require.Nil(t, data) + require.ErrorIs(t, err, test.openErr) + } + + // Go through the fake data an assert that data is passed through and the correct + // errors are returned. 
+ for i := range test.expectedErrs { + data, err := ret.NextData(context.Background()) + require.Equal(t, test.datas[i], hexutil.Bytes(data)) + require.ErrorIs(t, err, test.expectedErrs[i]) + } + + l1t.AssertExpectations(t) + }) + } - // and then start processing the data of the next stage - require.NoError(t, RepeatStep(t, l1r.Step, outer, 10)) } diff --git a/op-node/rollup/derive/l1_traversal.go b/op-node/rollup/derive/l1_traversal.go index 52d0436db20d0..db19eed5a94db 100644 --- a/op-node/rollup/derive/l1_traversal.go +++ b/op-node/rollup/derive/l1_traversal.go @@ -33,6 +33,10 @@ func NewL1Traversal(log log.Logger, l1Blocks L1BlockRefByNumberFetcher) *L1Trave } } +func (l1t *L1Traversal) Origin() eth.L1BlockRef { + return l1t.block +} + // NextL1Block returns the next block. It does not advance, but it can only be // called once before returning io.EOF func (l1t *L1Traversal) NextL1Block(_ context.Context) (eth.L1BlockRef, error) { diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index 772e1b8579897..cddf0e0c5d595 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -96,19 +96,18 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch // Pull stages l1Traversal := NewL1Traversal(log, l1Fetcher) + dataSrc := NewDataSourceFactory(log, cfg, l1Fetcher) // auxiliary stage for L1Retrieval + l1Src := NewL1Retrieval(log, dataSrc, l1Traversal) // Push stages (that act like pull stages b/c we push from the innermost stages prior to the outermost stages) eng := NewEngineQueue(log, cfg, engine, metrics) attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, eng) batchQueue := NewBatchQueue(log, cfg, attributesQueue) chInReader := NewChannelInReader(log, batchQueue) - bank := NewChannelBank(log, cfg, chInReader) + bank := NewChannelBank(log, cfg, chInReader, l1Src) - dataSrc := NewDataSourceFactory(log, cfg, l1Fetcher) - l1Src := NewL1Retrieval(log, dataSrc, bank, 
l1Traversal) - - stages := []Stage{eng, attributesQueue, batchQueue, chInReader, bank, l1Src} - pullStages := []PullStage{l1Traversal} + stages := []Stage{eng, attributesQueue, batchQueue, chInReader, bank} + pullStages := []PullStage{l1Src, l1Traversal} return &DerivationPipeline{ log: log, From fe78c148405575a749dcc6356fb674674464a29a Mon Sep 17 00:00:00 2001 From: Joshua Gutow Date: Tue, 27 Sep 2022 23:15:38 +0200 Subject: [PATCH 2/6] op-node: Switch L1 Traversal to a pull based model The L1 Retrieval stage is now responsible for pulling data from the L1 Traversal stage. In addition, the pipeline is responsible for advancing the state of the L1 Traversal stage. The L1 Traversal stage only provides access to the current L1 block once - it pretends to be a queue that is consumed from. --- op-node/rollup/derive/l1_retrieval.go | 63 +++++----- op-node/rollup/derive/l1_retrieval_test.go | 18 ++- op-node/rollup/derive/l1_traversal.go | 56 +++++---- op-node/rollup/derive/l1_traversal_test.go | 134 +++++++++++++++------ op-node/rollup/derive/pipeline.go | 64 ++++++++-- 5 files changed, 226 insertions(+), 109 deletions(-) diff --git a/op-node/rollup/derive/l1_retrieval.go b/op-node/rollup/derive/l1_retrieval.go index 4d90ad9f48c16..c20ef71d8fc4b 100644 --- a/op-node/rollup/derive/l1_retrieval.go +++ b/op-node/rollup/derive/l1_retrieval.go @@ -17,24 +17,29 @@ type DataAvailabilitySource interface { OpenData(ctx context.Context, id eth.BlockID) DataIter } +type NextBlockProvider interface { + NextL1Block(context.Context) (eth.L1BlockRef, error) +} + type L1Retrieval struct { log log.Logger dataSrc DataAvailabilitySource next L1SourceOutput + prev NextBlockProvider progress Progress - data eth.Data datas DataIter } var _ Stage = (*L1Retrieval)(nil) -func NewL1Retrieval(log log.Logger, dataSrc DataAvailabilitySource, next L1SourceOutput) *L1Retrieval { +func NewL1Retrieval(log log.Logger, dataSrc DataAvailabilitySource, next L1SourceOutput, prev NextBlockProvider) 
*L1Retrieval { return &L1Retrieval{ log: log, dataSrc: dataSrc, next: next, + prev: prev, } } @@ -42,48 +47,46 @@ func (l1r *L1Retrieval) Progress() Progress { return l1r.progress } -func (l1r *L1Retrieval) Step(ctx context.Context, outer Progress) error { - if changed, err := l1r.progress.Update(outer); err != nil || changed { - return err - } - - // specific to L1 source: if the L1 origin is closed, there is no more data to retrieve. - if l1r.progress.Closed { - return io.EOF - } - - // create a source if we have none - if l1r.datas == nil { - l1r.datas = l1r.dataSrc.OpenData(ctx, l1r.progress.Origin.ID()) - return nil - } - - // buffer data if we have none - if l1r.data == nil { +// Step does an action in the L1 Retrieval stage +// If there is data, it pushes it to the next stage. +// If there is no more data open ourselves if we are closed or close ourselves if we are open +func (l1r *L1Retrieval) Step(ctx context.Context, _ Progress) error { + if l1r.datas != nil { l1r.log.Debug("fetching next piece of data") data, err := l1r.datas.Next(ctx) if err == io.EOF { - l1r.progress.Closed = true l1r.datas = nil return io.EOF } else if err != nil { return err } else { - l1r.data = data + l1r.next.IngestData(data) return nil } + } else { + if l1r.progress.Closed { + next, err := l1r.prev.NextL1Block(ctx) + if err == io.EOF { + return io.EOF + } else if err != nil { + return err + } + l1r.datas = l1r.dataSrc.OpenData(ctx, next.ID()) + l1r.progress.Origin = next + l1r.progress.Closed = false + } else { + l1r.progress.Closed = true + } + return nil } - - // flush the data to next stage - l1r.next.IngestData(l1r.data) - // and nil the data, the next step will retrieve the next data - l1r.data = nil - return nil } +// ResetStep re-initializes the L1 Retrieval stage to block of it's `next` progress. +// Note that we open up the `l1r.datas` here because it is requires to maintain the +// internal invariants that later propagate up the derivation pipeline. 
func (l1r *L1Retrieval) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { l1r.progress = l1r.next.Progress() - l1r.datas = nil - l1r.data = nil + l1r.datas = l1r.dataSrc.OpenData(ctx, l1r.progress.Origin.ID()) + l1r.log.Info("Reset of L1Retrieval done", "origin", l1r.progress.Origin) return io.EOF } diff --git a/op-node/rollup/derive/l1_retrieval_test.go b/op-node/rollup/derive/l1_retrieval_test.go index 42794476a312d..8e6209f0b99f2 100644 --- a/op-node/rollup/derive/l1_retrieval_test.go +++ b/op-node/rollup/derive/l1_retrieval_test.go @@ -44,6 +44,19 @@ func (m *MockDataSource) ExpectOpenData(id eth.BlockID, iter DataIter, err error var _ DataAvailabilitySource = (*MockDataSource)(nil) +type MockL1Traversal struct { + mock.Mock +} + +func (m *MockL1Traversal) NextL1Block(_ context.Context) (eth.L1BlockRef, error) { + out := m.Mock.MethodCalled("NextL1Block") + return out[0].(eth.L1BlockRef), out[1].(error) +} + +func (m *MockL1Traversal) ExpectNextL1Block(block eth.L1BlockRef, err error) { + m.Mock.On("NextL1Block").Return(block, err) +} + type MockIngestData struct { MockOriginStage } @@ -63,12 +76,13 @@ func TestL1Retrieval_Step(t *testing.T) { next := &MockIngestData{MockOriginStage{progress: Progress{Origin: testutils.RandomBlockRef(rng), Closed: true}}} dataSrc := &MockDataSource{} + prev := &MockL1Traversal{} a := testutils.RandomData(rng, 10) b := testutils.RandomData(rng, 15) iter := &fakeDataIter{data: []eth.Data{a, b}} - outer := Progress{Origin: testutils.NextRandomRef(rng, next.progress.Origin), Closed: false} + outer := next.progress // mock some L1 data to open for the origin that is opened by the outer stage dataSrc.ExpectOpenData(outer.Origin.ID(), iter, nil) @@ -79,7 +93,7 @@ func TestL1Retrieval_Step(t *testing.T) { defer dataSrc.AssertExpectations(t) defer next.AssertExpectations(t) - l1r := NewL1Retrieval(testlog.Logger(t, log.LvlError), dataSrc, next) + l1r := NewL1Retrieval(testlog.Logger(t, log.LvlError), dataSrc, next, prev) // 
first we expect the stage to reset to the origin of the inner stage require.NoError(t, RepeatResetStep(t, l1r.ResetStep, nil, 1)) diff --git a/op-node/rollup/derive/l1_traversal.go b/op-node/rollup/derive/l1_traversal.go index f58f4de903465..52d0436db20d0 100644 --- a/op-node/rollup/derive/l1_traversal.go +++ b/op-node/rollup/derive/l1_traversal.go @@ -11,42 +11,42 @@ import ( "github.com/ethereum/go-ethereum/log" ) +// L1 Traversal fetches the next L1 block and exposes it through the progress API + type L1BlockRefByNumberFetcher interface { L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error) } type L1Traversal struct { - log log.Logger + block eth.L1BlockRef + done bool l1Blocks L1BlockRefByNumberFetcher - next StageProgress - progress Progress + log log.Logger } -var _ Stage = (*L1Traversal)(nil) +var _ PullStage = (*L1Traversal)(nil) -func NewL1Traversal(log log.Logger, l1Blocks L1BlockRefByNumberFetcher, next StageProgress) *L1Traversal { +func NewL1Traversal(log log.Logger, l1Blocks L1BlockRefByNumberFetcher) *L1Traversal { return &L1Traversal{ log: log, l1Blocks: l1Blocks, - next: next, } } -func (l1t *L1Traversal) Progress() Progress { - return l1t.progress -} - -func (l1t *L1Traversal) Step(ctx context.Context, outer Progress) error { - if !l1t.progress.Closed { // close origin and do another pipeline sweep, before we try to move to the next origin - l1t.progress.Closed = true - return nil +// NextL1Block returns the next block. It does not advance, but it can only be +// called once before returning io.EOF +func (l1t *L1Traversal) NextL1Block(_ context.Context) (eth.L1BlockRef, error) { + if !l1t.done { + l1t.done = true + return l1t.block, nil + } else { + return eth.L1BlockRef{}, io.EOF } +} - // If we reorg to a shorter chain, then we'll only derive new L2 data once the L1 reorg - // becomes longer than the previous L1 chain. - // This is fine, assuming the new L1 chain is live, but we may want to reconsider this. 
- - origin := l1t.progress.Origin +// AdvanceL1Block advances the internal state of L1 Traversal +func (l1t *L1Traversal) AdvanceL1Block(ctx context.Context) error { + origin := l1t.block nextL1Origin, err := l1t.l1Blocks.L1BlockRefByNumber(ctx, origin.Number+1) if errors.Is(err, ethereum.NotFound) { l1t.log.Debug("can't find next L1 block info (yet)", "number", origin.Number+1, "origin", origin) @@ -54,16 +54,20 @@ func (l1t *L1Traversal) Step(ctx context.Context, outer Progress) error { } else if err != nil { return NewTemporaryError(fmt.Errorf("failed to find L1 block info by number, at origin %s next %d: %w", origin, origin.Number+1, err)) } - if l1t.progress.Origin.Hash != nextL1Origin.ParentHash { - return NewResetError(fmt.Errorf("detected L1 reorg from %s to %s with conflicting parent %s", l1t.progress.Origin, nextL1Origin, nextL1Origin.ParentID())) + if l1t.block.Hash != nextL1Origin.ParentHash { + return NewResetError(fmt.Errorf("detected L1 reorg from %s to %s with conflicting parent %s", l1t.block, nextL1Origin, nextL1Origin.ParentID())) } - l1t.progress.Origin = nextL1Origin - l1t.progress.Closed = false + l1t.block = nextL1Origin + l1t.done = false return nil } -func (l1t *L1Traversal) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { - l1t.progress = l1t.next.Progress() - l1t.log.Info("completed reset of derivation pipeline", "origin", l1t.progress.Origin) +// Reset sets the internal L1 block to the supplied base. +// Note that the next call to `NextL1Block` will return the block after `base` +// TODO: Walk one back/figure this out. 
+func (l1t *L1Traversal) Reset(ctx context.Context, base eth.L1BlockRef) error { + l1t.block = base + l1t.done = false + l1t.log.Info("completed reset of derivation pipeline", "origin", base) return io.EOF } diff --git a/op-node/rollup/derive/l1_traversal_test.go b/op-node/rollup/derive/l1_traversal_test.go index 66f0e459cafb1..623307db74014 100644 --- a/op-node/rollup/derive/l1_traversal_test.go +++ b/op-node/rollup/derive/l1_traversal_test.go @@ -1,57 +1,115 @@ package derive import ( + "context" "errors" + "io" "math/rand" "testing" "github.com/stretchr/testify/require" + "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testutils" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/log" ) -func TestL1Traversal_Step(t *testing.T) { +// TestL1TraversalNext tests that the `Next` function only returns +// a block reference once and then properly returns io.EOF afterwards +func TestL1TraversalNext(t *testing.T) { + rng := rand.New(rand.NewSource(1234)) + a := testutils.RandomBlockRef(rng) + + tr := NewL1Traversal(testlog.Logger(t, log.LvlError), nil) + // Load up the initial state with a reset + _ = tr.Reset(context.Background(), a) + + // First call should always succeed + ref, err := tr.NextL1Block(context.Background()) + require.Nil(t, err) + require.Equal(t, a, ref) + + // Subsequent calls should return io.EOF + ref, err = tr.NextL1Block(context.Background()) + require.Equal(t, eth.L1BlockRef{}, ref) + require.Equal(t, io.EOF, err) + + ref, err = tr.NextL1Block(context.Background()) + require.Equal(t, eth.L1BlockRef{}, ref) + require.Equal(t, io.EOF, err) +} + +// TestL1TraversalAdvance tests that the `Advance` function properly +// handles different error cases and returns the expected block ref +// if there is no error. 
+func TestL1TraversalAdvance(t *testing.T) { rng := rand.New(rand.NewSource(1234)) a := testutils.RandomBlockRef(rng) b := testutils.NextRandomRef(rng, a) - c := testutils.NextRandomRef(rng, b) - d := testutils.NextRandomRef(rng, c) - e := testutils.NextRandomRef(rng, d) - - f := testutils.RandomBlockRef(rng) // a fork, doesn't build on d - f.Number = e.Number + 1 // even though it might be the next number - - l1Fetcher := &testutils.MockL1Source{} - l1Fetcher.ExpectL1BlockRefByNumber(b.Number, b, nil) - // pretend there's an RPC error - l1Fetcher.ExpectL1BlockRefByNumber(c.Number, c, errors.New("rpc error - check back later")) - l1Fetcher.ExpectL1BlockRefByNumber(c.Number, c, nil) - // pretend the block is not there yet for a while - l1Fetcher.ExpectL1BlockRefByNumber(d.Number, d, ethereum.NotFound) - l1Fetcher.ExpectL1BlockRefByNumber(d.Number, d, ethereum.NotFound) - // it will show up though - l1Fetcher.ExpectL1BlockRefByNumber(d.Number, d, nil) - l1Fetcher.ExpectL1BlockRefByNumber(e.Number, e, nil) - l1Fetcher.ExpectL1BlockRefByNumber(f.Number, f, nil) - - next := &MockOriginStage{progress: Progress{Origin: a, Closed: false}} - - tr := NewL1Traversal(testlog.Logger(t, log.LvlError), l1Fetcher, next) - - defer l1Fetcher.AssertExpectations(t) - defer next.AssertExpectations(t) - - require.NoError(t, RepeatResetStep(t, tr.ResetStep, nil, 1)) - require.Equal(t, a, tr.Progress().Origin, "stage needs to adopt the origin of next stage on reset") - require.False(t, tr.Progress().Closed, "stage needs to be open after reset") - - require.ErrorIs(t, RepeatStep(t, tr.Step, Progress{}, 10), ErrTemporary, "expected temporary error because of RPC mock fail") - require.NoError(t, RepeatStep(t, tr.Step, Progress{}, 10)) - require.Equal(t, c, tr.Progress().Origin, "expected to be stuck on ethereum.NotFound on d") - require.NoError(t, RepeatStep(t, tr.Step, Progress{}, 1)) - require.Equal(t, c, tr.Progress().Origin, "expected to be stuck again, should get the EOF within 1 step") 
- require.ErrorIs(t, RepeatStep(t, tr.Step, Progress{}, 10), ErrReset, "completed pipeline, until L1 input f that causes a reorg") + // x is at the same height as b but does not extend `a` + x := testutils.RandomBlockRef(rng) + x.Number = b.Number + + tests := []struct { + name string + startBlock eth.L1BlockRef + nextBlock eth.L1BlockRef + fetcherErr error + expectedErr error + }{ + { + name: "simple extension", + startBlock: a, + nextBlock: b, + fetcherErr: nil, + expectedErr: nil, + }, + { + name: "reorg", + startBlock: a, + nextBlock: x, + fetcherErr: nil, + expectedErr: ErrReset, + }, + { + name: "not found", + startBlock: a, + nextBlock: eth.L1BlockRef{}, + fetcherErr: ethereum.NotFound, + expectedErr: io.EOF, + }, + { + name: "temporary error", + startBlock: a, + nextBlock: eth.L1BlockRef{}, + fetcherErr: errors.New("interrupted connection"), + expectedErr: ErrTemporary, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + src := &testutils.MockL1Source{} + src.ExpectL1BlockRefByNumber(test.startBlock.Number+1, test.nextBlock, test.fetcherErr) + + tr := NewL1Traversal(testlog.Logger(t, log.LvlError), src) + // Load up the initial state with a reset + _ = tr.Reset(context.Background(), test.startBlock) + + // Advance it + assert output + err := tr.AdvanceL1Block(context.Background()) + require.ErrorIs(t, err, test.expectedErr) + + if test.expectedErr == nil { + ref, err := tr.NextL1Block(context.Background()) + require.Nil(t, err) + require.Equal(t, test.nextBlock, ref) + } + + src.AssertExpectations(t) + }) + } + } diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index 93f43f48986f8..772e1b8579897 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -28,6 +28,12 @@ type StageProgress interface { Progress() Progress } +type PullStage interface { + // Reset resets a pull stage. `base` refers to the L1 Block Reference to reset to. 
+ // TODO: Return L1 Block reference + Reset(ctx context.Context, base eth.L1BlockRef) error +} + type Stage interface { StageProgress @@ -68,7 +74,8 @@ type DerivationPipeline struct { // Index of the stage that is currently being reset. // >= len(stages) if no additional resetting is required - resetting int + resetting int + pullResetIdx int // Index of the stage that is currently being processed. active int @@ -76,6 +83,9 @@ type DerivationPipeline struct { // stages in execution order. A stage Step that: stages []Stage + pullStages []PullStage + traversal *L1Traversal + eng EngineQueueStage metrics Metrics @@ -83,30 +93,40 @@ type DerivationPipeline struct { // NewDerivationPipeline creates a derivation pipeline, which should be reset before use. func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher, engine Engine, metrics Metrics) *DerivationPipeline { + + // Pull stages + l1Traversal := NewL1Traversal(log, l1Fetcher) + + // Push stages (that act like pull stages b/c we push from the innermost stages prior to the outermost stages) eng := NewEngineQueue(log, cfg, engine, metrics) attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, eng) batchQueue := NewBatchQueue(log, cfg, attributesQueue) chInReader := NewChannelInReader(log, batchQueue) bank := NewChannelBank(log, cfg, chInReader) + dataSrc := NewDataSourceFactory(log, cfg, l1Fetcher) - l1Src := NewL1Retrieval(log, dataSrc, bank) - l1Traversal := NewL1Traversal(log, l1Fetcher, l1Src) - stages := []Stage{eng, attributesQueue, batchQueue, chInReader, bank, l1Src, l1Traversal} + l1Src := NewL1Retrieval(log, dataSrc, bank, l1Traversal) + + stages := []Stage{eng, attributesQueue, batchQueue, chInReader, bank, l1Src} + pullStages := []PullStage{l1Traversal} return &DerivationPipeline{ - log: log, - cfg: cfg, - l1Fetcher: l1Fetcher, - resetting: 0, - active: 0, - stages: stages, - eng: eng, - metrics: metrics, + log: log, + cfg: cfg, + l1Fetcher: l1Fetcher, + resetting: 0, + 
active: 0, + stages: stages, + pullStages: pullStages, + eng: eng, + metrics: metrics, + traversal: l1Traversal, } } func (dp *DerivationPipeline) Reset() { dp.resetting = 0 + dp.pullResetIdx = 0 } func (dp *DerivationPipeline) Progress() Progress { @@ -160,7 +180,24 @@ func (dp *DerivationPipeline) Step(ctx context.Context) error { return nil } } + // Then reset the pull based stages + if dp.pullResetIdx < len(dp.pullStages) { + // Use the last stage's progress as the one to pull from + inner := dp.stages[len(dp.stages)-1].Progress() + + // Do the reset + if err := dp.pullStages[dp.pullResetIdx].Reset(ctx, inner.Origin); err == io.EOF { + // dp.log.Debug("reset of stage completed", "stage", dp.pullResetIdx, "origin", dp.pullStages[dp.pullResetIdx].Progress().Origin) + dp.pullResetIdx += 1 + return nil + } else if err != nil { + return fmt.Errorf("stage %d failed resetting: %w", dp.pullResetIdx, err) + } else { + return nil + } + } + // Lastly advance the stages for i, stage := range dp.stages { var outer Progress if i+1 < len(dp.stages) { @@ -174,5 +211,6 @@ func (dp *DerivationPipeline) Step(ctx context.Context) error { return nil } } - return io.EOF + // If every stage has returned io.EOF, try to advance the L1 Origin + return dp.traversal.AdvanceL1Block(ctx) } From 9e3a8847a5c3bd90df0d73c218e475a291200a86 Mon Sep 17 00:00:00 2001 From: Joshua Gutow Date: Wed, 28 Sep 2022 15:23:35 -0700 Subject: [PATCH 3/6] op-node: Switch channel bank to be pull based This again requires a fair amount of changes to channel_in_reader.go for the channel in reader to maintain its progress state. 
--- op-node/rollup/derive/channel_bank.go | 159 ++++++------ op-node/rollup/derive/channel_bank_test.go | 275 +++++++-------------- op-node/rollup/derive/channel_in_reader.go | 61 +++-- op-node/rollup/derive/pipeline.go | 8 +- 4 files changed, 205 insertions(+), 298 deletions(-) diff --git a/op-node/rollup/derive/channel_bank.go b/op-node/rollup/derive/channel_bank.go index e3050112e6f8d..402e0134df89f 100644 --- a/op-node/rollup/derive/channel_bank.go +++ b/op-node/rollup/derive/channel_bank.go @@ -2,6 +2,7 @@ package derive import ( "context" + "errors" "fmt" "io" @@ -11,6 +12,11 @@ import ( "github.com/ethereum/go-ethereum/log" ) +type NextDataProvider interface { + NextData(ctx context.Context) ([]byte, error) + Origin() eth.L1BlockRef +} + // ChannelBank is a stateful stage that does the following: // 1. Unmarshalls frames from L1 transaction data // 2. Applies those frames to a channel @@ -22,11 +28,6 @@ import ( // Specifically, the channel bank is not allowed to become too large between successive calls // to `IngestData`. This means that we can do an ingest and then do a read while becoming too large. -type ChannelBankOutput interface { - StageProgress - WriteChannel(data []byte) -} - // ChannelBank buffers channel frames, and emits full channel data type ChannelBank struct { log log.Logger @@ -35,82 +36,79 @@ type ChannelBank struct { channels map[ChannelID]*Channel // channels by ID channelQueue []ChannelID // channels in FIFO order - progress Progress + origin eth.L1BlockRef - next ChannelBankOutput - prev *L1Retrieval + prev NextDataProvider + fetcher L1Fetcher } -var _ Stage = (*ChannelBank)(nil) +var _ PullStage = (*ChannelBank)(nil) // NewChannelBank creates a ChannelBank, which should be Reset(origin) before use. 
-func NewChannelBank(log log.Logger, cfg *rollup.Config, next ChannelBankOutput, prev *L1Retrieval) *ChannelBank { +func NewChannelBank(log log.Logger, cfg *rollup.Config, prev NextDataProvider, fetcher L1Fetcher) *ChannelBank { return &ChannelBank{ log: log, cfg: cfg, channels: make(map[ChannelID]*Channel), channelQueue: make([]ChannelID, 0, 10), - next: next, prev: prev, + fetcher: fetcher, } } -func (ib *ChannelBank) Progress() Progress { - return ib.progress +func (cb *ChannelBank) Origin() eth.L1BlockRef { + return cb.prev.Origin() } -func (ib *ChannelBank) prune() { +func (cb *ChannelBank) prune() { // check total size totalSize := uint64(0) - for _, ch := range ib.channels { + for _, ch := range cb.channels { totalSize += ch.size } // prune until it is reasonable again. The high-priority channel failed to be read, so we start pruning there. for totalSize > MaxChannelBankSize { - id := ib.channelQueue[0] - ch := ib.channels[id] - ib.channelQueue = ib.channelQueue[1:] - delete(ib.channels, id) + id := cb.channelQueue[0] + ch := cb.channels[id] + cb.channelQueue = cb.channelQueue[1:] + delete(cb.channels, id) totalSize -= ch.size } } // IngestData adds new L1 data to the channel bank. // Read() should be called repeatedly first, until everything has been read, before adding new data.\ -func (ib *ChannelBank) IngestData(data []byte) { - if ib.progress.Closed { - panic("write data to bank while closed") - } - ib.log.Debug("channel bank got new data", "origin", ib.progress.Origin, "data_len", len(data)) +func (cb *ChannelBank) IngestData(data []byte) { + cb.log.Debug("channel bank got new data", "origin", cb.origin, "data_len", len(data)) // TODO: Why is the prune here? 
- ib.prune() + cb.prune() frames, err := ParseFrames(data) if err != nil { - ib.log.Warn("malformed frame", "err", err) + cb.log.Warn("malformed frame", "err", err) return } // Process each frame for _, f := range frames { - currentCh, ok := ib.channels[f.ID] + currentCh, ok := cb.channels[f.ID] if !ok { // create new channel if it doesn't exist yet - currentCh = NewChannel(f.ID, ib.progress.Origin) - ib.channels[f.ID] = currentCh - ib.channelQueue = append(ib.channelQueue, f.ID) + currentCh = NewChannel(f.ID, cb.origin) + cb.channels[f.ID] = currentCh + cb.channelQueue = append(cb.channelQueue, f.ID) } // check if the channel is not timed out - if currentCh.OpenBlockNumber()+ib.cfg.ChannelTimeout < ib.progress.Origin.Number { - ib.log.Warn("channel is timed out, ignore frame", "channel", f.ID, "frame", f.FrameNumber) + if currentCh.OpenBlockNumber()+cb.cfg.ChannelTimeout < cb.origin.Number { + cb.log.Warn("channel is timed out, ignore frame", "channel", f.ID, "frame", f.FrameNumber) continue } - ib.log.Trace("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data)) - if err := currentCh.AddFrame(f, ib.progress.Origin); err != nil { - ib.log.Warn("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err) + cb.log.Trace("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data)) + if err := currentCh.AddFrame(f, cb.origin); err != nil { + cb.log.Warn("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err) continue } } @@ -118,78 +116,60 @@ func (ib *ChannelBank) IngestData(data []byte) { // Read the raw data of the first channel, if it's timed-out or closed. // Read returns io.EOF if there is nothing new to read. 
-func (ib *ChannelBank) Read() (data []byte, err error) { - if len(ib.channelQueue) == 0 { +func (cb *ChannelBank) Read() (data []byte, err error) { + if len(cb.channelQueue) == 0 { return nil, io.EOF } - first := ib.channelQueue[0] - ch := ib.channels[first] - timedOut := ch.OpenBlockNumber()+ib.cfg.ChannelTimeout < ib.progress.Origin.Number + first := cb.channelQueue[0] + ch := cb.channels[first] + timedOut := ch.OpenBlockNumber()+cb.cfg.ChannelTimeout < cb.origin.Number if timedOut { - ib.log.Debug("channel timed out", "channel", first, "frames", len(ch.inputs)) - delete(ib.channels, first) - ib.channelQueue = ib.channelQueue[1:] + cb.log.Debug("channel timed out", "channel", first, "frames", len(ch.inputs)) + delete(cb.channels, first) + cb.channelQueue = cb.channelQueue[1:] return nil, io.EOF } if !ch.IsReady() { return nil, io.EOF } - delete(ib.channels, first) - ib.channelQueue = ib.channelQueue[1:] + delete(cb.channels, first) + cb.channelQueue = cb.channelQueue[1:] r := ch.Reader() // Suprress error here. io.ReadAll does return nil instead of io.EOF though. data, _ = io.ReadAll(r) return data, nil } -// Step does the advancement for the channel bank. -// Channel bank as the first non-pull stage does it's own progress maintentance. -// When closed, it checks against the previous origin to determine if to open itself -func (ib *ChannelBank) Step(ctx context.Context, _ Progress) error { - // Open ourselves - // This is ok to do b/c we would not have yielded control to the lower stages - // of the pipeline without being completely done reading from L1. - if ib.progress.Closed { - if ib.progress.Origin != ib.prev.Origin() { - ib.progress.Closed = false - ib.progress.Origin = ib.prev.Origin() - return nil - } +// NextData pulls the next piece of data from the channel bank. +// Note that it attempts to pull data out of the channel bank prior to +// loading data in (unlike most other stages). 
This is to ensure maintain +// consistency around channel bank pruning which depends upon the order +// of operations. +func (cb *ChannelBank) NextData(ctx context.Context) ([]byte, error) { + if cb.origin != cb.prev.Origin() { + cb.origin = cb.prev.Origin() } - skipIngest := ib.next.Progress().Origin.Number > ib.progress.Origin.Number - outOfData := false - - if data, err := ib.prev.NextData(ctx); err == io.EOF { - outOfData = true + // Do the read from the channel bank first + data, err := cb.Read() + if err == io.EOF { + // continue - We will attempt to load data into the channel bank } else if err != nil { - return err + return nil, err } else { - ib.IngestData(data) + return data, nil } - // otherwise, read the next channel data from the bank - data, err := ib.Read() - if err == io.EOF { // need new L1 data in the bank before we can read more channel data - if outOfData { - if !ib.progress.Closed { - ib.progress.Closed = true - return nil - } - return io.EOF - } else { - return nil - } + // Then load data into the channel bank + if data, err := cb.prev.NextData(ctx); err == io.EOF { + return nil, io.EOF } else if err != nil { - return err + return nil, err } else { - if !skipIngest { - ib.next.WriteChannel(data) - return nil - } + cb.IngestData(data) + return nil, NewTemporaryError(errors.New("not enough data")) } - return nil } // ResetStep walks back the L1 chain, starting at the origin of the next stage, @@ -197,19 +177,18 @@ func (ib *ChannelBank) Step(ctx context.Context, _ Progress) error { // to get consistent reads starting at origin. // Any channel data before this origin will be timed out by the time the channel bank is synced up to the origin, // so it is not relevant to replay it into the bank. 
-func (ib *ChannelBank) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { - ib.progress = ib.next.Progress() - ib.log.Debug("walking back to find reset origin for channel bank", "origin", ib.progress.Origin) +func (cb *ChannelBank) Reset(ctx context.Context, base eth.L1BlockRef) error { + cb.log.Debug("walking back to find reset origin for channel bank", "origin", base) // go back in history if we are not distant enough from the next stage - resetBlock := ib.progress.Origin.Number - ib.cfg.ChannelTimeout - if ib.progress.Origin.Number < ib.cfg.ChannelTimeout { + resetBlock := base.Number - cb.cfg.ChannelTimeout + if base.Number < cb.cfg.ChannelTimeout { resetBlock = 0 // don't underflow } - parent, err := l1Fetcher.L1BlockRefByNumber(ctx, resetBlock) + parent, err := cb.fetcher.L1BlockRefByNumber(ctx, resetBlock) if err != nil { return NewTemporaryError(fmt.Errorf("failed to find channel bank block, failed to retrieve L1 reference: %w", err)) } - ib.progress.Origin = parent + cb.origin = parent return io.EOF } diff --git a/op-node/rollup/derive/channel_bank_test.go b/op-node/rollup/derive/channel_bank_test.go index dfa62a25266d3..86cf3d53584e2 100644 --- a/op-node/rollup/derive/channel_bank_test.go +++ b/op-node/rollup/derive/channel_bank_test.go @@ -2,80 +2,63 @@ package derive import ( "bytes" + "context" "fmt" + "io" "math/rand" "strconv" "strings" "testing" - "github.com/stretchr/testify/require" - "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testutils" "github.com/ethereum/go-ethereum/log" + "github.com/stretchr/testify/require" ) -type MockChannelBankOutput struct { - MockOriginStage -} - -func (m *MockChannelBankOutput) WriteChannel(data []byte) { - m.MethodCalled("WriteChannel", data) +type fakeChannelBankInput struct { + origin eth.L1BlockRef + data []struct { + data []byte + err 
error + } } -func (m *MockChannelBankOutput) ExpectWriteChannel(data []byte) { - m.On("WriteChannel", data).Once().Return() +func (f *fakeChannelBankInput) Origin() eth.L1BlockRef { + return f.origin } -var _ ChannelBankOutput = (*MockChannelBankOutput)(nil) - -type bankTestSetup struct { - origins []eth.L1BlockRef - t *testing.T - rng *rand.Rand - cb *ChannelBank - out *MockChannelBankOutput - l1 *testutils.MockL1Source +func (f *fakeChannelBankInput) NextData(_ context.Context) ([]byte, error) { + out := f.data[0] + f.data = f.data[1:] + return out.data, out.err } -// nolint - this is getting picked up b/c of a t.Skip that will go away -type channelBankTestCase struct { - name string - originTimes []uint64 - nextStartsAt int - channelTimeout uint64 - fn func(bt *bankTestSetup) +func (f *fakeChannelBankInput) AddOutput(data []byte, err error) { + f.data = append(f.data, struct { + data []byte + err error + }{data: data, err: err}) } -// nolint -func (ct *channelBankTestCase) Run(t *testing.T) { - cfg := &rollup.Config{ - ChannelTimeout: ct.channelTimeout, - } - - bt := &bankTestSetup{ - t: t, - rng: rand.New(rand.NewSource(1234)), - l1: &testutils.MockL1Source{}, - } - - bt.origins = append(bt.origins, testutils.RandomBlockRef(bt.rng)) - for i := range ct.originTimes[1:] { - ref := testutils.NextRandomRef(bt.rng, bt.origins[i]) - bt.origins = append(bt.origins, ref) - } - for i, x := range ct.originTimes { - bt.origins[i].Time = x +// ExpectNextFrameData takes a set of test frame & turns into the raw data +// for reading into the channel bank via `NextData` +func (f *fakeChannelBankInput) AddFrames(frames ...testFrame) { + data := new(bytes.Buffer) + data.WriteByte(DerivationVersion0) + for _, frame := range frames { + ff := frame.ToFrame() + if err := ff.MarshalBinary(data); err != nil { + panic(fmt.Errorf("error in making frame during test: %w", err)) + } } - - bt.out = &MockChannelBankOutput{MockOriginStage{progress: Progress{Origin: 
bt.origins[ct.nextStartsAt], Closed: false}}} - bt.cb = NewChannelBank(testlog.Logger(t, log.LvlError), cfg, bt.out, nil) - - ct.fn(bt) + f.AddOutput(data.Bytes(), nil) } +var _ NextDataProvider = (*fakeChannelBankInput)(nil) + // format: :: // example: "abc:0:helloworld!" type testFrame string @@ -115,154 +98,76 @@ func (tf testFrame) ToFrame() Frame { } } -func (bt *bankTestSetup) ingestData(data []byte) { - bt.cb.IngestData(data) -} +func TestChannelBankSimple(t *testing.T) { + rng := rand.New(rand.NewSource(1234)) + a := testutils.RandomBlockRef(rng) -func (bt *bankTestSetup) ingestFrames(frames ...testFrame) { - data := new(bytes.Buffer) - data.WriteByte(DerivationVersion0) - for _, fr := range frames { - f := fr.ToFrame() - if err := f.MarshalBinary(data); err != nil { - panic(fmt.Errorf("error in making frame during test: %w", err)) - } - } - bt.ingestData(data.Bytes()) -} -func (bt *bankTestSetup) repeatStep(max int, outer int, outerClosed bool, err error) { - require.Equal(bt.t, err, RepeatStep(bt.t, bt.cb.Step, Progress{Origin: bt.origins[outer], Closed: outerClosed}, max)) -} -func (bt *bankTestSetup) repeatResetStep(max int, err error) { - require.Equal(bt.t, err, RepeatResetStep(bt.t, bt.cb.ResetStep, bt.l1, max)) -} + input := &fakeChannelBankInput{origin: a} + input.AddFrames("a:0:first", "a:2:third!") + input.AddFrames("a:1:second") + input.AddOutput(nil, io.EOF) -func (bt *bankTestSetup) assertOrigin(i int) { - require.Equal(bt.t, bt.cb.progress.Origin, bt.origins[i]) -} -func (bt *bankTestSetup) assertOriginTime(x uint64) { - require.Equal(bt.t, x, bt.cb.progress.Origin.Time) -} -func (bt *bankTestSetup) expectChannel(data string) { - bt.out.ExpectWriteChannel([]byte(data)) -} -func (bt *bankTestSetup) expectL1BlockRefByNumber(i int) { - bt.l1.ExpectL1BlockRefByNumber(bt.origins[i].Number, bt.origins[i], nil) -} -func (bt *bankTestSetup) assertExpectations() { - bt.l1.AssertExpectations(bt.t) - bt.l1.ExpectedCalls = nil - 
bt.out.AssertExpectations(bt.t) - bt.out.ExpectedCalls = nil -} + cfg := &rollup.Config{ChannelTimeout: 10} -func TestL1ChannelBank(t *testing.T) { - t.Skip("broken b/c the fake L1Retrieval is not yet built") - testCases := []channelBankTestCase{ - { - name: "time outs and buffering", - originTimes: []uint64{0, 1, 2, 3, 4, 5}, - nextStartsAt: 3, // Start next stage at block #3 - channelTimeout: 2, // Start at block #1 - fn: func(bt *bankTestSetup) { - bt.expectL1BlockRefByNumber(1) - bt.repeatResetStep(10, nil) - bt.ingestFrames("a:0:first") // will time out b/c not closed + cb := NewChannelBank(testlog.Logger(t, log.LvlCrit), cfg, input, nil) - bt.repeatStep(10, 1, true, nil) - bt.repeatStep(10, 2, false, nil) - bt.assertOrigin(2) + // Load the first + third frame + out, err := cb.NextData(context.Background()) + require.ErrorIs(t, err, ErrTemporary) + require.Equal(t, []byte(nil), out) - bt.repeatStep(10, 2, true, nil) - bt.repeatStep(10, 3, false, nil) - bt.assertOrigin(3) + // Load the second frame + out, err = cb.NextData(context.Background()) + require.ErrorIs(t, err, ErrTemporary) + require.Equal(t, []byte(nil), out) - bt.repeatStep(10, 3, true, nil) - bt.repeatStep(10, 4, false, nil) - bt.assertOrigin(4) + // Pull out the channel data + out, err = cb.NextData(context.Background()) + require.Nil(t, err) + require.Equal(t, "firstsecondthird", string(out)) - // Properly closed channel - bt.expectChannel("foobarclosed") - bt.ingestFrames("b:0:foobar") - bt.ingestFrames("b:1:closed!") - bt.repeatStep(10, 4, true, nil) - bt.assertExpectations() - }, - }, - { - name: "duplicate frames", - originTimes: []uint64{0, 1, 2, 3, 4, 5}, - nextStartsAt: 3, // Start next stage at block #3 - channelTimeout: 2, // Start at block #1c - fn: func(bt *bankTestSetup) { - bt.expectL1BlockRefByNumber(1) - bt.repeatResetStep(10, nil) - bt.ingestFrames("a:0:first") // will time out b/c not closed + // No more data + out, err = cb.NextData(context.Background()) + require.Nil(t, out) + 
require.Equal(t, io.EOF, err) +} - bt.repeatStep(10, 1, true, nil) - bt.repeatStep(10, 2, false, nil) - bt.assertOrigin(2) +func TestChannelBankDuplicates(t *testing.T) { + rng := rand.New(rand.NewSource(1234)) + a := testutils.RandomBlockRef(rng) - bt.repeatStep(10, 2, true, nil) - bt.repeatStep(10, 3, false, nil) - bt.assertOrigin(3) + input := &fakeChannelBankInput{origin: a} + input.AddFrames("a:0:first", "a:2:third!") + input.AddFrames("a:0:altfirst", "a:2:altthird!") + input.AddFrames("a:1:second") + input.AddOutput(nil, io.EOF) - bt.repeatStep(10, 3, true, nil) - bt.repeatStep(10, 4, false, nil) - bt.assertOrigin(4) + cfg := &rollup.Config{ChannelTimeout: 10} - bt.ingestFrames("a:0:first") - bt.repeatStep(1, 4, false, nil) - bt.ingestFrames("a:1:second") - bt.repeatStep(1, 4, false, nil) - bt.ingestFrames("a:0:altfirst") // ignored as duplicate - bt.repeatStep(1, 4, false, nil) - bt.ingestFrames("a:1:altsecond") // ignored as duplicate - bt.repeatStep(1, 4, false, nil) - bt.ingestFrames("b:0:new") - bt.repeatStep(1, 4, false, nil) + cb := NewChannelBank(testlog.Logger(t, log.LvlCrit), cfg, input, nil) - // close origin 4 - bt.repeatStep(2, 4, true, nil) + // Load the first + third frame + out, err := cb.NextData(context.Background()) + require.ErrorIs(t, err, ErrTemporary) + require.Equal(t, []byte(nil), out) - // open origin 1 - bt.repeatStep(2, 5, false, nil) - bt.ingestFrames("b:1:hi!") // close the other one first, but blocked - bt.repeatStep(1, 5, false, nil) - bt.ingestFrames("a:2:!") // empty closing frame - bt.expectChannel("firstsecond") - bt.expectChannel("newhi") - bt.repeatStep(5, 5, false, nil) - bt.assertExpectations() - }, - }, - { - name: "skip bad frames", - originTimes: []uint64{101, 102}, - nextStartsAt: 0, - channelTimeout: 3, - fn: func(bt *bankTestSetup) { - // don't do the whole setup process, just override where the stages are - bt.cb.progress = Progress{Origin: bt.origins[0], Closed: false} - bt.out.progress = Progress{Origin: 
bt.origins[0], Closed: false} + // Load the duplicate frames + out, err = cb.NextData(context.Background()) + require.ErrorIs(t, err, ErrTemporary) + require.Equal(t, []byte(nil), out) - bt.assertOriginTime(101) + // Load the second frame + out, err = cb.NextData(context.Background()) + require.ErrorIs(t, err, ErrTemporary) + require.Equal(t, []byte(nil), out) - badTx := new(bytes.Buffer) - badTx.WriteByte(DerivationVersion0) - goodFrame := testFrame("a:0:helloworld!").ToFrame() - if err := goodFrame.MarshalBinary(badTx); err != nil { - panic(fmt.Errorf("error in marshalling frame: %w", err)) - } - badTx.Write(testutils.RandomData(bt.rng, 30)) // incomplete frame data - bt.ingestData(badTx.Bytes()) - // Expect the bad frame to render the entire chunk invalid. - bt.repeatStep(2, 0, false, nil) - bt.assertExpectations() - }, - }, - } - for _, testCase := range testCases { - t.Run(testCase.name, testCase.Run) - } + // Pull out the channel data. Expect to see the original set & not the duplicates + out, err = cb.NextData(context.Background()) + require.Nil(t, err) + require.Equal(t, "firstsecondthird", string(out)) + + // No more data + out, err = cb.NextData(context.Background()) + require.Nil(t, out) + require.Equal(t, io.EOF, err) } diff --git a/op-node/rollup/derive/channel_in_reader.go b/op-node/rollup/derive/channel_in_reader.go index da96058f0b55d..2c966635b0f4d 100644 --- a/op-node/rollup/derive/channel_in_reader.go +++ b/op-node/rollup/derive/channel_in_reader.go @@ -26,13 +26,18 @@ type ChannelInReader struct { progress Progress next BatchQueueStage + prev *ChannelBank } -var _ ChannelBankOutput = (*ChannelInReader)(nil) +var _ Stage = (*ChannelInReader)(nil) // NewChannelInReader creates a ChannelInReader, which should be Reset(origin) before use. 
-func NewChannelInReader(log log.Logger, next BatchQueueStage) *ChannelInReader { - return &ChannelInReader{log: log, next: next} +func NewChannelInReader(log log.Logger, next BatchQueueStage, prev *ChannelBank) *ChannelInReader { + return &ChannelInReader{ + log: log, + next: next, + prev: prev, + } } func (cr *ChannelInReader) Progress() Progress { @@ -58,27 +63,45 @@ func (cr *ChannelInReader) NextChannel() { } func (cr *ChannelInReader) Step(ctx context.Context, outer Progress) error { - if changed, err := cr.progress.Update(outer); err != nil || changed { - return err + // Close ourselves if required + if cr.progress.Closed { + if cr.progress.Origin != cr.prev.Origin() { + cr.progress.Closed = false + cr.progress.Origin = cr.prev.Origin() + return nil + } } if cr.nextBatchFn == nil { - return io.EOF - } - - // TODO: can batch be non nil while err == io.EOF - // This depends on the behavior of rlp.Stream - batch, err := cr.nextBatchFn() - - if err == io.EOF { - return io.EOF - } else if err != nil { - cr.log.Warn("failed to read batch from channel reader, skipping to next channel now", "err", err) - cr.NextChannel() + if data, err := cr.prev.NextData(ctx); err == io.EOF { + if !cr.progress.Closed { + cr.progress.Closed = true + return nil + } else { + return io.EOF + } + } else if err != nil { + return err + } else { + cr.WriteChannel(data) + return nil + } + } else { + // TODO: can batch be non nil while err == io.EOF + // This depends on the behavior of rlp.Stream + batch, err := cr.nextBatchFn() + if err == io.EOF { + cr.NextChannel() + return io.EOF + } else if err != nil { + cr.log.Warn("failed to read batch from channel reader, skipping to next channel now", "err", err) + cr.NextChannel() + return nil + } + cr.next.AddBatch(batch.Batch) return nil } - cr.next.AddBatch(batch.Batch) - return nil + } func (cr *ChannelInReader) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { diff --git a/op-node/rollup/derive/pipeline.go 
b/op-node/rollup/derive/pipeline.go index cddf0e0c5d595..4acba52def823 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -98,16 +98,16 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch l1Traversal := NewL1Traversal(log, l1Fetcher) dataSrc := NewDataSourceFactory(log, cfg, l1Fetcher) // auxiliary stage for L1Retrieval l1Src := NewL1Retrieval(log, dataSrc, l1Traversal) + bank := NewChannelBank(log, cfg, l1Src, l1Fetcher) // Push stages (that act like pull stages b/c we push from the innermost stages prior to the outermost stages) eng := NewEngineQueue(log, cfg, engine, metrics) attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, eng) batchQueue := NewBatchQueue(log, cfg, attributesQueue) - chInReader := NewChannelInReader(log, batchQueue) - bank := NewChannelBank(log, cfg, chInReader, l1Src) + chInReader := NewChannelInReader(log, batchQueue, bank) - stages := []Stage{eng, attributesQueue, batchQueue, chInReader, bank} - pullStages := []PullStage{l1Src, l1Traversal} + stages := []Stage{eng, attributesQueue, batchQueue, chInReader} + pullStages := []PullStage{bank, l1Src, l1Traversal} return &DerivationPipeline{ log: log, From 94958ab1aa7b0ce014cb0b477ea184425b8d98d2 Mon Sep 17 00:00:00 2001 From: Joshua Gutow Date: Tue, 27 Sep 2022 14:30:07 -0700 Subject: [PATCH 4/6] op-node: Switch channel in reader to a pull based stage Like the rest of the changes, this also required modifications to the next stage - the batch queue in order for it to manage the progress API. This commit required even more changes than usual. I changed the pipeline to be reset to a common starting point and now use the L2SafeHead block to filter out adding batches & L1 blocks to the batch queue. 
--- op-node/rollup/derive/batch_queue.go | 62 +++++++++++++--- op-node/rollup/derive/batch_queue_test.go | 12 +++- op-node/rollup/derive/channel_bank.go | 38 +++------- op-node/rollup/derive/channel_bank_test.go | 10 +-- op-node/rollup/derive/channel_in_reader.go | 84 ++++++++-------------- op-node/rollup/derive/engine_queue.go | 11 ++- op-node/rollup/derive/engine_queue_test.go | 1 + op-node/rollup/derive/error.go | 5 ++ op-node/rollup/derive/pipeline.go | 8 +-- op-node/rollup/driver/state.go | 4 ++ 10 files changed, 132 insertions(+), 103 deletions(-) diff --git a/op-node/rollup/derive/batch_queue.go b/op-node/rollup/derive/batch_queue.go index ed49f5dcfdbf7..44afedb9d42a6 100644 --- a/op-node/rollup/derive/batch_queue.go +++ b/op-node/rollup/derive/batch_queue.go @@ -32,12 +32,18 @@ type BatchQueueOutput interface { SafeL2Head() eth.L2BlockRef } +type NextBatchProvider interface { + Origin() eth.L1BlockRef + NextBatch(ctx context.Context) (*BatchData, error) +} + // BatchQueue contains a set of batches for every L1 block. // L1 blocks are contiguous and this does not support reorgs. type BatchQueue struct { log log.Logger config *rollup.Config next BatchQueueOutput + prev NextBatchProvider progress Progress l1Blocks []eth.L1BlockRef @@ -47,11 +53,12 @@ type BatchQueue struct { } // NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use. 
-func NewBatchQueue(log log.Logger, cfg *rollup.Config, next BatchQueueOutput) *BatchQueue { +func NewBatchQueue(log log.Logger, cfg *rollup.Config, next BatchQueueOutput, prev NextBatchProvider) *BatchQueue { return &BatchQueue{ log: log, config: cfg, next: next, + prev: prev, } } @@ -60,19 +67,58 @@ func (bq *BatchQueue) Progress() Progress { } func (bq *BatchQueue) Step(ctx context.Context, outer Progress) error { - if changed, err := bq.progress.Update(outer); err != nil { - return err - } else if changed { - if !bq.progress.Closed { // init inputs if we moved to a new open origin + + originBehind := bq.progress.Origin.Number < bq.next.SafeL2Head().L1Origin.Number + + // Advance origin if needed + // Note: The entire pipeline has the same origin + // We just don't accept batches prior to the L1 origin of the L2 safe head + if bq.progress.Origin != bq.prev.Origin() { + bq.progress.Closed = false + bq.progress.Origin = bq.prev.Origin() + if !originBehind { bq.l1Blocks = append(bq.l1Blocks, bq.progress.Origin) } + bq.log.Info("Advancing bq origin", "origin", bq.progress.Origin) return nil } + if !bq.progress.Closed { + if batch, err := bq.prev.NextBatch(ctx); err == io.EOF { + bq.log.Info("Closing batch queue origin") + bq.progress.Closed = true + return nil + } else if err != nil { + return err + } else { + bq.log.Info("have batch") + if !originBehind { + bq.AddBatch(batch) + } else { + bq.log.Warn("Skipping old batch") + } + } + } + + // Skip adding batches / blocks to the internal state until they are from the same L1 origin + // as the current safe head. 
+ if originBehind { + if bq.progress.Closed { + return io.EOF + } else { + // Immediately close the stage + bq.progress.Closed = true + return nil + } + } + batch, err := bq.deriveNextBatch(ctx) if err == io.EOF { - // very noisy, commented for now, or we should bump log level from trace to debug - // bq.log.Trace("need more L1 data before deriving next batch", "progress", bq.progress.Origin) - return io.EOF + bq.log.Info("no more batches in deriveNextBatch") + if bq.progress.Closed { + return io.EOF + } else { + return nil + } } else if err != nil { return err } diff --git a/op-node/rollup/derive/batch_queue_test.go b/op-node/rollup/derive/batch_queue_test.go index 9c001d5af5cd6..d766f7345bb8b 100644 --- a/op-node/rollup/derive/batch_queue_test.go +++ b/op-node/rollup/derive/batch_queue_test.go @@ -62,6 +62,7 @@ func mockHash(time uint64, layer uint8) common.Hash { return hash } +// nolint - will be used in next PR when the t.Skip goes away func b(timestamp uint64, epoch eth.L1BlockRef) *BatchData { rng := rand.New(rand.NewSource(int64(timestamp))) data := testutils.RandomData(rng, 20) @@ -91,6 +92,7 @@ func L1Chain(l1Times []uint64) []eth.L1BlockRef { } func TestBatchQueueEager(t *testing.T) { + t.Skip("want to migrate the test suite at once") log := testlog.Logger(t, log.LvlTrace) l1 := L1Chain([]uint64{10, 20, 30}) next := &fakeBatchQueueOutput{ @@ -116,7 +118,7 @@ func TestBatchQueueEager(t *testing.T) { SeqWindowSize: 30, } - bq := NewBatchQueue(log, cfg, next) + bq := NewBatchQueue(log, cfg, next, nil) require.Equal(t, io.EOF, bq.ResetStep(context.Background(), nil), "reset should complete without l1 fetcher, single step") // We start with an open L1 origin as progress in the first step @@ -136,6 +138,8 @@ func TestBatchQueueEager(t *testing.T) { } func TestBatchQueueFull(t *testing.T) { + t.Skip("want to migrate the test suite at once") + log := testlog.Logger(t, log.LvlTrace) l1 := L1Chain([]uint64{10, 15, 20}) next := &fakeBatchQueueOutput{ @@ -161,7 
+165,7 @@ func TestBatchQueueFull(t *testing.T) { SeqWindowSize: 2, } - bq := NewBatchQueue(log, cfg, next) + bq := NewBatchQueue(log, cfg, next, nil) require.Equal(t, io.EOF, bq.ResetStep(context.Background(), nil), "reset should complete without l1 fetcher, single step") // We start with an open L1 origin as progress in the first step @@ -224,6 +228,8 @@ func TestBatchQueueFull(t *testing.T) { } func TestBatchQueueMissing(t *testing.T) { + t.Skip("want to migrate the test suite at once") + log := testlog.Logger(t, log.LvlTrace) l1 := L1Chain([]uint64{10, 15, 20}) next := &fakeBatchQueueOutput{ @@ -249,7 +255,7 @@ func TestBatchQueueMissing(t *testing.T) { SeqWindowSize: 2, } - bq := NewBatchQueue(log, cfg, next) + bq := NewBatchQueue(log, cfg, next, nil) require.Equal(t, io.EOF, bq.ResetStep(context.Background(), nil), "reset should complete without l1 fetcher, single step") // We start with an open L1 origin as progress in the first step diff --git a/op-node/rollup/derive/channel_bank.go b/op-node/rollup/derive/channel_bank.go index 402e0134df89f..9a4581051cddc 100644 --- a/op-node/rollup/derive/channel_bank.go +++ b/op-node/rollup/derive/channel_bank.go @@ -2,8 +2,6 @@ package derive import ( "context" - "errors" - "fmt" "io" "github.com/ethereum-optimism/optimism/op-node/eth" @@ -36,8 +34,6 @@ type ChannelBank struct { channels map[ChannelID]*Channel // channels by ID channelQueue []ChannelID // channels in FIFO order - origin eth.L1BlockRef - prev NextDataProvider fetcher L1Fetcher } @@ -79,7 +75,8 @@ func (cb *ChannelBank) prune() { // IngestData adds new L1 data to the channel bank. // Read() should be called repeatedly first, until everything has been read, before adding new data.\ func (cb *ChannelBank) IngestData(data []byte) { - cb.log.Debug("channel bank got new data", "origin", cb.origin, "data_len", len(data)) + origin := cb.Origin() + cb.log.Debug("channel bank got new data", "origin", origin, "data_len", len(data)) // TODO: Why is the prune here? 
cb.prune() @@ -95,19 +92,19 @@ func (cb *ChannelBank) IngestData(data []byte) { currentCh, ok := cb.channels[f.ID] if !ok { // create new channel if it doesn't exist yet - currentCh = NewChannel(f.ID, cb.origin) + currentCh = NewChannel(f.ID, origin) cb.channels[f.ID] = currentCh cb.channelQueue = append(cb.channelQueue, f.ID) } // check if the channel is not timed out - if currentCh.OpenBlockNumber()+cb.cfg.ChannelTimeout < cb.origin.Number { + if currentCh.OpenBlockNumber()+cb.cfg.ChannelTimeout < origin.Number { cb.log.Warn("channel is timed out, ignore frame", "channel", f.ID, "frame", f.FrameNumber) continue } cb.log.Trace("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data)) - if err := currentCh.AddFrame(f, cb.origin); err != nil { + if err := currentCh.AddFrame(f, origin); err != nil { cb.log.Warn("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err) continue } @@ -122,7 +119,7 @@ func (cb *ChannelBank) Read() (data []byte, err error) { } first := cb.channelQueue[0] ch := cb.channels[first] - timedOut := ch.OpenBlockNumber()+cb.cfg.ChannelTimeout < cb.origin.Number + timedOut := ch.OpenBlockNumber()+cb.cfg.ChannelTimeout < cb.Origin().Number if timedOut { cb.log.Debug("channel timed out", "channel", first, "frames", len(ch.inputs)) delete(cb.channels, first) @@ -147,9 +144,6 @@ func (cb *ChannelBank) Read() (data []byte, err error) { // consistency around channel bank pruning which depends upon the order // of operations. 
func (cb *ChannelBank) NextData(ctx context.Context) ([]byte, error) { - if cb.origin != cb.prev.Origin() { - cb.origin = cb.prev.Origin() - } // Do the read from the channel bank first data, err := cb.Read() @@ -168,27 +162,13 @@ func (cb *ChannelBank) NextData(ctx context.Context) ([]byte, error) { return nil, err } else { cb.IngestData(data) - return nil, NewTemporaryError(errors.New("not enough data")) + return nil, NotEnoughData } } -// ResetStep walks back the L1 chain, starting at the origin of the next stage, -// to find the origin that the channel bank should be reset to, -// to get consistent reads starting at origin. -// Any channel data before this origin will be timed out by the time the channel bank is synced up to the origin, -// so it is not relevant to replay it into the bank. func (cb *ChannelBank) Reset(ctx context.Context, base eth.L1BlockRef) error { - cb.log.Debug("walking back to find reset origin for channel bank", "origin", base) - // go back in history if we are not distant enough from the next stage - resetBlock := base.Number - cb.cfg.ChannelTimeout - if base.Number < cb.cfg.ChannelTimeout { - resetBlock = 0 // don't underflow - } - parent, err := cb.fetcher.L1BlockRefByNumber(ctx, resetBlock) - if err != nil { - return NewTemporaryError(fmt.Errorf("failed to find channel bank block, failed to retrieve L1 reference: %w", err)) - } - cb.origin = parent + cb.channels = make(map[ChannelID]*Channel) + cb.channelQueue = make([]ChannelID, 0, 10) return io.EOF } diff --git a/op-node/rollup/derive/channel_bank_test.go b/op-node/rollup/derive/channel_bank_test.go index 86cf3d53584e2..822bc059c666c 100644 --- a/op-node/rollup/derive/channel_bank_test.go +++ b/op-node/rollup/derive/channel_bank_test.go @@ -113,12 +113,12 @@ func TestChannelBankSimple(t *testing.T) { // Load the first + third frame out, err := cb.NextData(context.Background()) - require.ErrorIs(t, err, ErrTemporary) + require.ErrorIs(t, err, NotEnoughData) require.Equal(t, 
[]byte(nil), out) // Load the second frame out, err = cb.NextData(context.Background()) - require.ErrorIs(t, err, ErrTemporary) + require.ErrorIs(t, err, NotEnoughData) require.Equal(t, []byte(nil), out) // Pull out the channel data @@ -148,17 +148,17 @@ func TestChannelBankDuplicates(t *testing.T) { // Load the first + third frame out, err := cb.NextData(context.Background()) - require.ErrorIs(t, err, ErrTemporary) + require.ErrorIs(t, err, NotEnoughData) require.Equal(t, []byte(nil), out) // Load the duplicate frames out, err = cb.NextData(context.Background()) - require.ErrorIs(t, err, ErrTemporary) + require.ErrorIs(t, err, NotEnoughData) require.Equal(t, []byte(nil), out) // Load the second frame out, err = cb.NextData(context.Background()) - require.ErrorIs(t, err, ErrTemporary) + require.ErrorIs(t, err, NotEnoughData) require.Equal(t, []byte(nil), out) // Pull out the channel data. Expect to see the original set & not the duplicates diff --git a/op-node/rollup/derive/channel_in_reader.go b/op-node/rollup/derive/channel_in_reader.go index 2c966635b0f4d..9865dacd5aa7f 100644 --- a/op-node/rollup/derive/channel_in_reader.go +++ b/op-node/rollup/derive/channel_in_reader.go @@ -5,6 +5,7 @@ import ( "context" "io" + "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum/go-ethereum/log" ) @@ -13,46 +14,36 @@ import ( // This is a pure function from the channel, but each channel (or channel fragment) // must be tagged with an L1 inclusion block to be passed to the the batch queue. -type BatchQueueStage interface { - StageProgress - AddBatch(batch *BatchData) -} - type ChannelInReader struct { log log.Logger nextBatchFn func() (BatchWithL1InclusionBlock, error) - progress Progress - - next BatchQueueStage prev *ChannelBank } -var _ Stage = (*ChannelInReader)(nil) +var _ PullStage = (*ChannelInReader)(nil) // NewChannelInReader creates a ChannelInReader, which should be Reset(origin) before use. 
-func NewChannelInReader(log log.Logger, next BatchQueueStage, prev *ChannelBank) *ChannelInReader { +func NewChannelInReader(log log.Logger, prev *ChannelBank) *ChannelInReader { return &ChannelInReader{ log: log, - next: next, prev: prev, } } -func (cr *ChannelInReader) Progress() Progress { - return cr.progress +func (cr *ChannelInReader) Origin() eth.L1BlockRef { + return cr.prev.Origin() } // TODO: Take full channel for better logging -func (cr *ChannelInReader) WriteChannel(data []byte) { - if cr.progress.Closed { - panic("write channel while closed") - } - if f, err := BatchReader(bytes.NewBuffer(data), cr.progress.Origin); err == nil { +func (cr *ChannelInReader) WriteChannel(data []byte) error { + if f, err := BatchReader(bytes.NewBuffer(data), cr.Origin()); err == nil { cr.nextBatchFn = f + return nil } else { cr.log.Error("Error creating batch reader from channel data", "err", err) + return err } } @@ -62,50 +53,37 @@ func (cr *ChannelInReader) NextChannel() { cr.nextBatchFn = nil } -func (cr *ChannelInReader) Step(ctx context.Context, outer Progress) error { - // Close ourselves if required - if cr.progress.Closed { - if cr.progress.Origin != cr.prev.Origin() { - cr.progress.Closed = false - cr.progress.Origin = cr.prev.Origin() - return nil - } - } - +// NextBatch pulls out the next batch from the channel if it has it. +// It returns io.EOF when it cannot make any more progress. +// It will return a temporary error if it needs to be called again to advance some internal state. 
+func (cr *ChannelInReader) NextBatch(ctx context.Context) (*BatchData, error) { if cr.nextBatchFn == nil { if data, err := cr.prev.NextData(ctx); err == io.EOF { - if !cr.progress.Closed { - cr.progress.Closed = true - return nil - } else { - return io.EOF - } + return nil, io.EOF } else if err != nil { - return err + return nil, err } else { - cr.WriteChannel(data) - return nil - } - } else { - // TODO: can batch be non nil while err == io.EOF - // This depends on the behavior of rlp.Stream - batch, err := cr.nextBatchFn() - if err == io.EOF { - cr.NextChannel() - return io.EOF - } else if err != nil { - cr.log.Warn("failed to read batch from channel reader, skipping to next channel now", "err", err) - cr.NextChannel() - return nil + if err := cr.WriteChannel(data); err != nil { + return nil, NewTemporaryError(err) + } } - cr.next.AddBatch(batch.Batch) - return nil } + // TODO: can batch be non nil while err == io.EOF + // This depends on the behavior of rlp.Stream + batch, err := cr.nextBatchFn() + if err == io.EOF { + cr.NextChannel() + return nil, NotEnoughData + } else if err != nil { + cr.log.Warn("failed to read batch from channel reader, skipping to next channel now", "err", err) + cr.NextChannel() + return nil, NotEnoughData + } + return batch.Batch, nil } -func (cr *ChannelInReader) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { +func (cr *ChannelInReader) Reset(ctx context.Context, _ eth.L1BlockRef) error { cr.nextBatchFn = nil - cr.progress = cr.next.Progress() return io.EOF } diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index dbdffec809cc1..412b553e3a4ab 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -398,6 +398,15 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error return NewResetError(fmt.Errorf("cannot reset block derivation to start at L2 block %s with time %d older than its L1 origin %s with time %d, time 
invariant is broken", safe, safe.Time, l1Origin, l1Origin.Time)) } + + pipelineNumber := l1Origin.Number - eq.cfg.ChannelTimeout + if l1Origin.Number < eq.cfg.ChannelTimeout { + pipelineNumber = 0 + } + pipelineOrigin, err := l1Fetcher.L1BlockRefByNumber(ctx, pipelineNumber) + if err != nil { + return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %v; err: %w", pipelineNumber, err)) + } eq.log.Debug("Reset engine queue", "safeHead", safe, "unsafe", unsafe, "safe_timestamp", safe.Time, "unsafe_timestamp", unsafe.Time, "l1Origin", l1Origin) eq.unsafeHead = unsafe eq.safeHead = safe @@ -405,7 +414,7 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error eq.finalityData = eq.finalityData[:0] // note: we do not clear the unsafe payloadds queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads. eq.progress = Progress{ - Origin: l1Origin, + Origin: pipelineOrigin, Closed: false, } eq.metrics.RecordL2Ref("l2_finalized", finalized) diff --git a/op-node/rollup/derive/engine_queue_test.go b/op-node/rollup/derive/engine_queue_test.go index 9506bc0733214..503f40c33e118 100644 --- a/op-node/rollup/derive/engine_queue_test.go +++ b/op-node/rollup/derive/engine_queue_test.go @@ -209,6 +209,7 @@ func TestEngineQueue_Finalize(t *testing.T) { // and we fetch the L1 origin of that as starting point for engine queue l1F.ExpectL1BlockRefByHash(refB.Hash, refB, nil) + l1F.ExpectL1BlockRefByNumber(refB.Number, refB, nil) eq := NewEngineQueue(logger, cfg, eng, metrics) require.NoError(t, RepeatResetStep(t, eq.ResetStep, l1F, 20)) diff --git a/op-node/rollup/derive/error.go b/op-node/rollup/derive/error.go index 893e0fad1b8a7..ef896d2fa677e 100644 --- a/op-node/rollup/derive/error.go +++ b/op-node/rollup/derive/error.go @@ -1,6 +1,7 @@ package derive import ( + "errors" "fmt" ) @@ -91,3 +92,7 @@ func NewCriticalError(err error) error { var ErrTemporary = NewTemporaryError(nil) var 
ErrReset = NewResetError(nil) var ErrCritical = NewCriticalError(nil) + +// NotEnoughData implies that the function currently does not have enough data to progress +// but if it is retried enough times, it will eventually return a real value or io.EOF +var NotEnoughData = errors.New("not enough data") diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index 4acba52def823..b5cf89f3f7ec8 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -99,15 +99,15 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch dataSrc := NewDataSourceFactory(log, cfg, l1Fetcher) // auxiliary stage for L1Retrieval l1Src := NewL1Retrieval(log, dataSrc, l1Traversal) bank := NewChannelBank(log, cfg, l1Src, l1Fetcher) + chInReader := NewChannelInReader(log, bank) // Push stages (that act like pull stages b/c we push from the innermost stages prior to the outermost stages) eng := NewEngineQueue(log, cfg, engine, metrics) attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, eng) - batchQueue := NewBatchQueue(log, cfg, attributesQueue) - chInReader := NewChannelInReader(log, batchQueue, bank) + batchQueue := NewBatchQueue(log, cfg, attributesQueue, chInReader) - stages := []Stage{eng, attributesQueue, batchQueue, chInReader} - pullStages := []PullStage{bank, l1Src, l1Traversal} + stages := []Stage{eng, attributesQueue, batchQueue} + pullStages := []PullStage{chInReader, bank, l1Src, l1Traversal} return &DerivationPipeline{ log: log, diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go index cd2bdd2fe5abf..3538cb6d8344f 100644 --- a/op-node/rollup/driver/state.go +++ b/op-node/rollup/driver/state.go @@ -440,6 +440,10 @@ func (s *state) eventLoop() { } else if err != nil && errors.Is(err, derive.ErrCritical) { s.log.Error("Derivation process critical error", "err", err) return + } else if err != nil && errors.Is(err, derive.NotEnoughData) { + stepAttempts = 0 // don't do 
a backoff for this error + reqStep() + continue } else if err != nil { s.log.Error("Derivation process error", "attempts", stepAttempts, "err", err) reqStep() From 48b7cc5bad954020bf0768a594f74c4556b12e70 Mon Sep 17 00:00:00 2001 From: Joshua Gutow Date: Tue, 27 Sep 2022 14:30:34 -0700 Subject: [PATCH 5/6] op-node: Switch batch queue to be pull based The attributes queue actually had pretty few modifications to work with the progress API. The logic of switching the batch queue over was a bit more complex because the batch queue is very stateful, but still not the worst. --- op-node/rollup/derive/attributes_queue.go | 25 +- .../rollup/derive/attributes_queue_test.go | 3 +- op-node/rollup/derive/batch_queue.go | 115 +++---- op-node/rollup/derive/batch_queue_test.go | 323 ++++++------------ op-node/rollup/derive/pipeline.go | 8 +- 5 files changed, 189 insertions(+), 285 deletions(-) diff --git a/op-node/rollup/derive/attributes_queue.go b/op-node/rollup/derive/attributes_queue.go index cec93477a3205..63088a6eba8b4 100644 --- a/op-node/rollup/derive/attributes_queue.go +++ b/op-node/rollup/derive/attributes_queue.go @@ -33,16 +33,18 @@ type AttributesQueue struct { config *rollup.Config dl L1ReceiptsFetcher next AttributesQueueOutput + prev *BatchQueue progress Progress batches []*BatchData } -func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, next AttributesQueueOutput) *AttributesQueue { +func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, next AttributesQueueOutput, prev *BatchQueue) *AttributesQueue { return &AttributesQueue{ log: log, config: cfg, dl: l1Fetcher, next: next, + prev: prev, } } @@ -56,11 +58,26 @@ func (aq *AttributesQueue) Progress() Progress { } func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error { - if changed, err := aq.progress.Update(outer); err != nil || changed { - return err + if aq.progress.Origin != aq.prev.Origin() { + aq.progress.Closed = 
false + aq.progress.Origin = aq.prev.Origin() + return nil } + if len(aq.batches) == 0 { - return io.EOF + batch, err := aq.prev.NextBatch(ctx, aq.next.SafeL2Head()) + if err == io.EOF { + if !aq.progress.Closed { + aq.progress.Closed = true + return nil + } else { + return io.EOF + } + + } else if err != nil { + return err + } + aq.batches = append(aq.batches, batch) } batch := aq.batches[0] diff --git a/op-node/rollup/derive/attributes_queue_test.go b/op-node/rollup/derive/attributes_queue_test.go index 226cfa82fd701..959fc79c6b6e9 100644 --- a/op-node/rollup/derive/attributes_queue_test.go +++ b/op-node/rollup/derive/attributes_queue_test.go @@ -40,6 +40,7 @@ func (m *MockAttributesQueueOutput) ExpectSafeL2Head(head eth.L2BlockRef) { var _ AttributesQueueOutput = (*MockAttributesQueueOutput)(nil) func TestAttributesQueue_Step(t *testing.T) { + t.Skip("don't fake out batch queue") // test config, only init the necessary fields cfg := &rollup.Config{ BlockTime: 2, @@ -87,7 +88,7 @@ func TestAttributesQueue_Step(t *testing.T) { } out.ExpectAddSafeAttributes(&attrs) - aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, l1Fetcher, out) + aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, l1Fetcher, out, nil) require.NoError(t, RepeatResetStep(t, aq.ResetStep, l1Fetcher, 1)) aq.AddBatch(batch) diff --git a/op-node/rollup/derive/batch_queue.go b/op-node/rollup/derive/batch_queue.go index 44afedb9d42a6..5e5dd5496ff62 100644 --- a/op-node/rollup/derive/batch_queue.go +++ b/op-node/rollup/derive/batch_queue.go @@ -40,11 +40,10 @@ type NextBatchProvider interface { // BatchQueue contains a set of batches for every L1 block. // L1 blocks are contiguous and this does not support reorgs. 
type BatchQueue struct { - log log.Logger - config *rollup.Config - next BatchQueueOutput - prev NextBatchProvider - progress Progress + log log.Logger + config *rollup.Config + prev NextBatchProvider + origin eth.L1BlockRef l1Blocks []eth.L1BlockRef @@ -53,102 +52,91 @@ type BatchQueue struct { } // NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use. -func NewBatchQueue(log log.Logger, cfg *rollup.Config, next BatchQueueOutput, prev NextBatchProvider) *BatchQueue { +func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider) *BatchQueue { return &BatchQueue{ log: log, config: cfg, - next: next, prev: prev, } } -func (bq *BatchQueue) Progress() Progress { - return bq.progress +func (bq *BatchQueue) Origin() eth.L1BlockRef { + return bq.prev.Origin() } -func (bq *BatchQueue) Step(ctx context.Context, outer Progress) error { - - originBehind := bq.progress.Origin.Number < bq.next.SafeL2Head().L1Origin.Number +func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef) (*BatchData, error) { + originBehind := bq.origin.Number < safeL2Head.L1Origin.Number // Advance origin if needed // Note: The entire pipeline has the same origin // We just don't accept batches prior to the L1 origin of the L2 safe head - if bq.progress.Origin != bq.prev.Origin() { - bq.progress.Closed = false - bq.progress.Origin = bq.prev.Origin() + if bq.origin != bq.prev.Origin() { + bq.origin = bq.prev.Origin() if !originBehind { - bq.l1Blocks = append(bq.l1Blocks, bq.progress.Origin) - } - bq.log.Info("Advancing bq origin", "origin", bq.progress.Origin) - return nil - } - if !bq.progress.Closed { - if batch, err := bq.prev.NextBatch(ctx); err == io.EOF { - bq.log.Info("Closing batch queue origin") - bq.progress.Closed = true - return nil - } else if err != nil { - return err + bq.l1Blocks = append(bq.l1Blocks, bq.origin) } else { - bq.log.Info("have batch") - if !originBehind { - bq.AddBatch(batch) - } else { - bq.log.Warn("Skipping 
old batch") - } + // This is to handle the special case of startup. At startup we call Reset & include + // the L1 origin. That is the only time where immediately after `Reset` is called + // originBehind is false. + bq.l1Blocks = bq.l1Blocks[:0] } + bq.log.Info("Advancing bq origin", "origin", bq.origin) + } + + // Load more data into the batch queue + outOfData := false + if batch, err := bq.prev.NextBatch(ctx); err == io.EOF { + outOfData = true + } else if err != nil { + return nil, err + } else if !originBehind { + bq.AddBatch(batch, safeL2Head) } - // Skip adding batches / blocks to the internal state until they are from the same L1 origin - // as the current safe head. + // Skip adding data unless we are up to date with the origin, but do fully + // empty the previous stages if originBehind { - if bq.progress.Closed { - return io.EOF + if outOfData { + return nil, io.EOF } else { - // Immediately close the stage - bq.progress.Closed = true - return nil + return nil, NotEnoughData } } - batch, err := bq.deriveNextBatch(ctx) - if err == io.EOF { - bq.log.Info("no more batches in deriveNextBatch") - if bq.progress.Closed { - return io.EOF - } else { - return nil - } + // Finally attempt to derive more batches + batch, err := bq.deriveNextBatch(ctx, outOfData, safeL2Head) + if err == io.EOF && outOfData { + return nil, io.EOF + } else if err == io.EOF { + return nil, NotEnoughData } else if err != nil { - return err + return nil, err } - bq.next.AddBatch(batch) - return nil + return batch, nil } -func (bq *BatchQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { +func (bq *BatchQueue) Reset(ctx context.Context, base eth.L1BlockRef) error { // Copy over the Origin from the next stage // It is set in the engine queue (two stages away) such that the L2 Safe Head origin is the progress - bq.progress = bq.next.Progress() + bq.origin = base bq.batches = make(map[uint64][]*BatchWithL1InclusionBlock) // Include the new origin as an origin to build on + // 
Note: This is only for the initialization case. During normal resets we will later + // throw out this block. bq.l1Blocks = bq.l1Blocks[:0] - bq.l1Blocks = append(bq.l1Blocks, bq.progress.Origin) + bq.l1Blocks = append(bq.l1Blocks, base) return io.EOF } -func (bq *BatchQueue) AddBatch(batch *BatchData) { - if bq.progress.Closed { - panic("write batch while closed") - } +func (bq *BatchQueue) AddBatch(batch *BatchData, l2SafeHead eth.L2BlockRef) { if len(bq.l1Blocks) == 0 { panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp)) } data := BatchWithL1InclusionBlock{ - L1InclusionBlock: bq.progress.Origin, + L1InclusionBlock: bq.origin, Batch: batch, } - validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, bq.next.SafeL2Head(), &data) + validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, l2SafeHead, &data) if validity == BatchDrop { return // if we do drop the batch, CheckBatch will log the drop reason with WARN level. } @@ -159,12 +147,11 @@ func (bq *BatchQueue) AddBatch(batch *BatchData) { // following the validity rules imposed on consecutive batches, // based on currently available buffered batch and L1 origin information. // If no batch can be derived yet, then (nil, io.EOF) is returned. -func (bq *BatchQueue) deriveNextBatch(ctx context.Context) (*BatchData, error) { +func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, l2SafeHead eth.L2BlockRef) (*BatchData, error) { if len(bq.l1Blocks) == 0 { return nil, NewCriticalError(errors.New("cannot derive next batch, no origin was prepared")) } epoch := bq.l1Blocks[0] - l2SafeHead := bq.next.SafeL2Head() if l2SafeHead.L1Origin != epoch.ID() { return nil, NewResetError(fmt.Errorf("buffered L1 chain epoch %s in batch queue does not match safe head %s", epoch, l2SafeHead)) @@ -229,8 +216,8 @@ batchLoop: // i.e. 
if the sequence window expired, we create empty batches expiryEpoch := epoch.Number + bq.config.SeqWindowSize forceNextEpoch := - (expiryEpoch == bq.progress.Origin.Number && bq.progress.Closed) || - expiryEpoch < bq.progress.Origin.Number + (expiryEpoch == bq.origin.Number && outOfData) || + expiryEpoch < bq.origin.Number if !forceNextEpoch { // sequence window did not expire yet, still room to receive batches for the current epoch, diff --git a/op-node/rollup/derive/batch_queue_test.go b/op-node/rollup/derive/batch_queue_test.go index d766f7345bb8b..b921ff1318aaa 100644 --- a/op-node/rollup/derive/batch_queue_test.go +++ b/op-node/rollup/derive/batch_queue_test.go @@ -7,8 +7,6 @@ import ( "math/rand" "testing" - "github.com/stretchr/testify/require" - "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/testlog" @@ -16,44 +14,28 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/log" + "github.com/stretchr/testify/require" ) -// fakeBatchQueueOutput fakes the next stage (receive only) for the batch queue -// It tracks the progress state of the next stage. -// Upon receiving a batch, relevant characteristic of safeL2Head are immediately advanced. 
-type fakeBatchQueueOutput struct { - progress Progress - batches []*BatchData - safeL2Head eth.L2BlockRef -} - -var _ BatchQueueOutput = (*fakeBatchQueueOutput)(nil) - -func (f *fakeBatchQueueOutput) AddBatch(batch *BatchData) { - f.batches = append(f.batches, batch) - if batch.ParentHash != f.safeL2Head.Hash { - panic("batch has wrong parent hash") - } - newEpoch := f.safeL2Head.L1Origin.Hash != batch.EpochHash - // Advance SafeL2Head - f.safeL2Head.Time = batch.Timestamp - f.safeL2Head.L1Origin.Number = uint64(batch.EpochNum) - f.safeL2Head.L1Origin.Hash = batch.EpochHash - if newEpoch { - f.safeL2Head.SequenceNumber = 0 - } else { - f.safeL2Head.SequenceNumber += 1 - } - f.safeL2Head.ParentHash = batch.ParentHash - f.safeL2Head.Hash = mockHash(batch.Timestamp, 2) +type fakeBatchQueueInput struct { + i int + batches []*BatchData + errors []error + origin eth.L1BlockRef } -func (f *fakeBatchQueueOutput) SafeL2Head() eth.L2BlockRef { - return f.safeL2Head +func (f *fakeBatchQueueInput) Origin() eth.L1BlockRef { + return f.origin } -func (f *fakeBatchQueueOutput) Progress() Progress { - return f.progress +func (f *fakeBatchQueueInput) NextBatch(ctx context.Context) (*BatchData, error) { + if f.i >= len(f.batches) { + return nil, io.EOF + } + b := f.batches[f.i] + e := f.errors[f.i] + f.i += 1 + return b, e } func mockHash(time uint64, layer uint8) common.Hash { @@ -62,7 +44,6 @@ func mockHash(time uint64, layer uint8) common.Hash { return hash } -// nolint - will be used in next PR when the t.Skip goes away func b(timestamp uint64, epoch eth.L1BlockRef) *BatchData { rng := rand.New(rand.NewSource(int64(timestamp))) data := testutils.RandomData(rng, 20) @@ -91,23 +72,18 @@ func L1Chain(l1Times []uint64) []eth.L1BlockRef { return out } +// TestBatchQueueEager adds a bunch of contiguous batches and asserts that +// enough calls to `NextBatch` return all of those batches. 
func TestBatchQueueEager(t *testing.T) { - t.Skip("want to migrate the test suite at once") - log := testlog.Logger(t, log.LvlTrace) + log := testlog.Logger(t, log.LvlCrit) l1 := L1Chain([]uint64{10, 20, 30}) - next := &fakeBatchQueueOutput{ - safeL2Head: eth.L2BlockRef{ - Hash: mockHash(10, 2), - Number: 0, - ParentHash: common.Hash{}, - Time: 10, - L1Origin: l1[0].ID(), - SequenceNumber: 0, - }, - progress: Progress{ - Origin: l1[0], - Closed: false, - }, + safeHead := eth.L2BlockRef{ + Hash: mockHash(10, 2), + Number: 0, + ParentHash: common.Hash{}, + Time: 10, + L1Origin: l1[0].ID(), + SequenceNumber: 0, } cfg := &rollup.Config{ Genesis: rollup.Genesis{ @@ -118,133 +94,44 @@ func TestBatchQueueEager(t *testing.T) { SeqWindowSize: 30, } - bq := NewBatchQueue(log, cfg, next, nil) - require.Equal(t, io.EOF, bq.ResetStep(context.Background(), nil), "reset should complete without l1 fetcher, single step") - - // We start with an open L1 origin as progress in the first step - progress := bq.progress - require.Equal(t, bq.progress.Closed, false) + batches := []*BatchData{b(12, l1[0]), b(14, l1[0]), b(16, l1[0]), b(18, l1[0]), b(20, l1[0]), b(22, l1[0]), b(24, l1[1]), nil} + errors := []error{nil, nil, nil, nil, nil, nil, nil, io.EOF} - // Add batches - batches := []*BatchData{b(12, l1[0]), b(14, l1[0])} - for _, batch := range batches { - bq.AddBatch(batch) - } - // Step - require.NoError(t, RepeatStep(t, bq.Step, progress, 10)) - - // Verify Output - require.Equal(t, batches, next.batches) -} - -func TestBatchQueueFull(t *testing.T) { - t.Skip("want to migrate the test suite at once") - - log := testlog.Logger(t, log.LvlTrace) - l1 := L1Chain([]uint64{10, 15, 20}) - next := &fakeBatchQueueOutput{ - safeL2Head: eth.L2BlockRef{ - Hash: mockHash(10, 2), - Number: 0, - ParentHash: common.Hash{}, - Time: 10, - L1Origin: l1[0].ID(), - SequenceNumber: 0, - }, - progress: Progress{ - Origin: l1[0], - Closed: false, - }, - } - cfg := &rollup.Config{ - Genesis: rollup.Genesis{ 
- L2Time: 10, - }, - BlockTime: 2, - MaxSequencerDrift: 600, - SeqWindowSize: 2, + input := &fakeBatchQueueInput{ + batches: batches, + errors: errors, + origin: l1[0], } - bq := NewBatchQueue(log, cfg, next, nil) - require.Equal(t, io.EOF, bq.ResetStep(context.Background(), nil), "reset should complete without l1 fetcher, single step") - - // We start with an open L1 origin as progress in the first step - progress := bq.progress - require.Equal(t, bq.progress.Closed, false) - - // Add batches - batches := []*BatchData{b(14, l1[0]), b(16, l1[0]), b(18, l1[1])} - for _, batch := range batches { - bq.AddBatch(batch) + bq := NewBatchQueue(log, cfg, input) + _ = bq.Reset(context.Background(), l1[0]) + // Advance the origin + input.origin = l1[1] + + for i := 0; i < len(batches); i++ { + b, e := bq.NextBatch(context.Background(), safeHead) + require.ErrorIs(t, e, errors[i]) + require.Equal(t, batches[i], b) + + if b != nil { + safeHead.Number += 1 + safeHead.Time += 2 + safeHead.Hash = mockHash(b.Timestamp, 2) + safeHead.L1Origin = b.Epoch() + } } - // Missing first batch - err := bq.Step(context.Background(), progress) - require.Equal(t, err, io.EOF) - - // Close previous to close bq - progress.Closed = true - err = bq.Step(context.Background(), progress) - require.Equal(t, err, nil) - require.Equal(t, bq.progress.Closed, true) - - // Open previous to open bq with the new inclusion block - progress.Closed = false - progress.Origin = l1[1] - err = bq.Step(context.Background(), progress) - require.Equal(t, err, nil) - require.Equal(t, bq.progress.Closed, false) - - // Close previous to close bq (for epoch 2) - progress.Closed = true - err = bq.Step(context.Background(), progress) - require.Equal(t, err, nil) - require.Equal(t, bq.progress.Closed, true) - - // Open previous to open bq with the new inclusion block (epoch 2) - progress.Closed = false - progress.Origin = l1[2] - err = bq.Step(context.Background(), progress) - require.Equal(t, err, nil) - require.Equal(t, 
bq.progress.Closed, false) - - // Finally add batch - firstBatch := b(12, l1[0]) - bq.AddBatch(firstBatch) - - // Close the origin - progress.Closed = true - err = bq.Step(context.Background(), progress) - require.Equal(t, err, nil) - require.Equal(t, bq.progress.Closed, true) - - // Step, but should have full epoch now - require.NoError(t, RepeatStep(t, bq.Step, progress, 10)) - - // Verify Output - var final []*BatchData - final = append(final, firstBatch) - final = append(final, batches...) - require.Equal(t, final, next.batches) } func TestBatchQueueMissing(t *testing.T) { - t.Skip("want to migrate the test suite at once") - - log := testlog.Logger(t, log.LvlTrace) + log := testlog.Logger(t, log.LvlCrit) l1 := L1Chain([]uint64{10, 15, 20}) - next := &fakeBatchQueueOutput{ - safeL2Head: eth.L2BlockRef{ - Hash: mockHash(10, 2), - Number: 0, - ParentHash: common.Hash{}, - Time: 10, - L1Origin: l1[0].ID(), - SequenceNumber: 0, - }, - progress: Progress{ - Origin: l1[0], - Closed: false, - }, + safeHead := eth.L2BlockRef{ + Hash: mockHash(10, 2), + Number: 0, + ParentHash: common.Hash{}, + Time: 10, + L1Origin: l1[0].ID(), + SequenceNumber: 0, } cfg := &rollup.Config{ Genesis: rollup.Genesis{ @@ -255,56 +142,68 @@ func TestBatchQueueMissing(t *testing.T) { SeqWindowSize: 2, } - bq := NewBatchQueue(log, cfg, next, nil) - require.Equal(t, io.EOF, bq.ResetStep(context.Background(), nil), "reset should complete without l1 fetcher, single step") - - // We start with an open L1 origin as progress in the first step - progress := bq.progress - require.Equal(t, bq.progress.Closed, false) - // The batches at 18 and 20 are skipped to stop 22 from being eagerly processed. // This test checks that batch timestamp 12 & 14 are created, 16 is used, and 18 is advancing the epoch. // Due to the large sequencer time drift 16 is perfectly valid to have epoch 0 as origin. 
batches := []*BatchData{b(16, l1[0]), b(22, l1[1])} - for _, batch := range batches { - bq.AddBatch(batch) - } - // Missing first batches with timestamp 12 and 14, nothing to do yet. - err := bq.Step(context.Background(), progress) - require.Equal(t, err, io.EOF) + errors := []error{nil, nil} - // Close l1[0] - progress.Closed = true - require.NoError(t, RepeatStep(t, bq.Step, progress, 10)) - require.Equal(t, bq.progress.Closed, true) + input := &fakeBatchQueueInput{ + batches: batches, + errors: errors, + origin: l1[0], + } - // Open l1[1] - progress.Closed = false - progress.Origin = l1[1] - require.NoError(t, RepeatStep(t, bq.Step, progress, 10)) - require.Equal(t, bq.progress.Closed, false) - require.Empty(t, next.batches, "no batches yet, sequence window did not expire, waiting for 12 and 14") + bq := NewBatchQueue(log, cfg, input) + _ = bq.Reset(context.Background(), l1[0]) - // Close l1[1] - progress.Closed = true - require.NoError(t, RepeatStep(t, bq.Step, progress, 10)) - require.Equal(t, bq.progress.Closed, true) + for i := 0; i < len(batches); i++ { + b, e := bq.NextBatch(context.Background(), safeHead) + require.ErrorIs(t, e, NotEnoughData) + require.Nil(t, b) + } - // Open l1[2] - progress.Closed = false - progress.Origin = l1[2] - require.NoError(t, RepeatStep(t, bq.Step, progress, 10)) - require.Equal(t, bq.progress.Closed, false) + // advance origin. Underlying stage still has no more batches + // This is not enough to auto advance yet + input.origin = l1[1] + b, e := bq.NextBatch(context.Background(), safeHead) + require.ErrorIs(t, e, io.EOF) + require.Nil(t, b) + + // Advance the origin. 
At this point batch timestamps 12 and 14 will be created + input.origin = l1[2] + + // Check for a generated batch at t = 12 + b, e = bq.NextBatch(context.Background(), safeHead) + require.Nil(t, e) + require.Equal(t, b.Timestamp, uint64(12)) + require.Empty(t, b.BatchV1.Transactions) + safeHead.Number += 1 + safeHead.Time += 2 + safeHead.Hash = mockHash(b.Timestamp, 2) + + // Check for generated batch at t = 14 + b, e = bq.NextBatch(context.Background(), safeHead) + require.Nil(t, e) + require.Equal(t, b.Timestamp, uint64(14)) + require.Empty(t, b.BatchV1.Transactions) + safeHead.Number += 1 + safeHead.Time += 2 + safeHead.Hash = mockHash(b.Timestamp, 2) + + // Check for the inputted batch at t = 16 + b, e = bq.NextBatch(context.Background(), safeHead) + require.Nil(t, e) + require.Equal(t, b, batches[0]) + safeHead.Number += 1 + safeHead.Time += 2 + safeHead.Hash = mockHash(b.Timestamp, 2) + + // Check for the generated batch at t = 18. This batch advances the epoch + b, e = bq.NextBatch(context.Background(), safeHead) + require.Nil(t, e) + require.Equal(t, b.Timestamp, uint64(18)) + require.Empty(t, b.BatchV1.Transactions) + require.Equal(t, rollup.Epoch(1), b.EpochNum) - // Close l1[2], this is the moment that l1[0] expires and empty batches 12 and 14 can be created, - // and batch 16 can then be used. 
- progress.Closed = true - require.NoError(t, RepeatStep(t, bq.Step, progress, 10)) - require.Equal(t, bq.progress.Closed, true) - require.Equal(t, 4, len(next.batches), "expecting empty batches with timestamp 12 and 14 to be created and existing batch 16 to follow") - require.Equal(t, uint64(12), next.batches[0].Timestamp) - require.Equal(t, uint64(14), next.batches[1].Timestamp) - require.Equal(t, batches[0], next.batches[2]) - require.Equal(t, uint64(18), next.batches[3].Timestamp) - require.Equal(t, rollup.Epoch(1), next.batches[3].EpochNum) } diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index b5cf89f3f7ec8..ba2f62472d68d 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -100,14 +100,14 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch l1Src := NewL1Retrieval(log, dataSrc, l1Traversal) bank := NewChannelBank(log, cfg, l1Src, l1Fetcher) chInReader := NewChannelInReader(log, bank) + batchQueue := NewBatchQueue(log, cfg, chInReader) // Push stages (that act like pull stages b/c we push from the innermost stages prior to the outermost stages) eng := NewEngineQueue(log, cfg, engine, metrics) - attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, eng) - batchQueue := NewBatchQueue(log, cfg, attributesQueue, chInReader) + attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, eng, batchQueue) - stages := []Stage{eng, attributesQueue, batchQueue} - pullStages := []PullStage{chInReader, bank, l1Src, l1Traversal} + stages := []Stage{eng, attributesQueue} + pullStages := []PullStage{batchQueue, chInReader, bank, l1Src, l1Traversal} return &DerivationPipeline{ log: log, From 102a9c22ada778d636b2f220d9594ecaae140fb7 Mon Sep 17 00:00:00 2001 From: Joshua Gutow Date: Wed, 28 Sep 2022 15:52:41 -0700 Subject: [PATCH 6/6] op-node: Switch attributes queue to be pull based (#3598) The progress API is very nearly removed from the engine queue stage. 
--- op-node/rollup/derive/attributes_queue.go | 93 +++++++------------ .../rollup/derive/attributes_queue_test.go | 48 ++-------- op-node/rollup/derive/engine_queue.go | 45 ++++++--- op-node/rollup/derive/engine_queue_test.go | 27 +++++- op-node/rollup/derive/pipeline.go | 8 +- 5 files changed, 99 insertions(+), 122 deletions(-) diff --git a/op-node/rollup/derive/attributes_queue.go b/op-node/rollup/derive/attributes_queue.go index 63088a6eba8b4..f2e764b0c3cb0 100644 --- a/op-node/rollup/derive/attributes_queue.go +++ b/op-node/rollup/derive/attributes_queue.go @@ -22,75 +22,60 @@ import ( // This stage can be reset by clearing it's batch buffer. // This stage does not need to retain any references to L1 blocks. -type AttributesQueueOutput interface { - AddSafeAttributes(attributes *eth.PayloadAttributes) - SafeL2Head() eth.L2BlockRef - StageProgress -} - type AttributesQueue struct { - log log.Logger - config *rollup.Config - dl L1ReceiptsFetcher - next AttributesQueueOutput - prev *BatchQueue - progress Progress - batches []*BatchData + log log.Logger + config *rollup.Config + dl L1ReceiptsFetcher + prev *BatchQueue + batch *BatchData } -func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, next AttributesQueueOutput, prev *BatchQueue) *AttributesQueue { +func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, prev *BatchQueue) *AttributesQueue { return &AttributesQueue{ log: log, config: cfg, dl: l1Fetcher, - next: next, prev: prev, } } -func (aq *AttributesQueue) AddBatch(batch *BatchData) { - aq.log.Debug("Received next batch", "batch_epoch", batch.EpochNum, "batch_timestamp", batch.Timestamp, "tx_count", len(batch.Transactions)) - aq.batches = append(aq.batches, batch) -} - -func (aq *AttributesQueue) Progress() Progress { - return aq.progress +func (aq *AttributesQueue) Origin() eth.L1BlockRef { + return aq.prev.Origin() } -func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) 
error { - if aq.progress.Origin != aq.prev.Origin() { - aq.progress.Closed = false - aq.progress.Origin = aq.prev.Origin() - return nil +func (aq *AttributesQueue) NextAttributes(ctx context.Context, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) { + // Get a batch if we need it + if aq.batch == nil { + batch, err := aq.prev.NextBatch(ctx, l2SafeHead) + if err != nil { + return nil, err + } + aq.batch = batch } - if len(aq.batches) == 0 { - batch, err := aq.prev.NextBatch(ctx, aq.next.SafeL2Head()) - if err == io.EOF { - if !aq.progress.Closed { - aq.progress.Closed = true - return nil - } else { - return io.EOF - } - - } else if err != nil { - return err - } - aq.batches = append(aq.batches, batch) + // Actually generate the next attributes + if attrs, err := aq.createNextAttributes(ctx, aq.batch, l2SafeHead); err != nil { + return nil, err + } else { + // Clear out the local state now that attribute generation has succeeded + aq.batch = nil + return attrs, nil } - batch := aq.batches[0] - safeL2Head := aq.next.SafeL2Head() +} + +// createNextAttributes transforms a batch into payload attributes.
This sets `NoTxPool` and appends the batched transactions +// to the attributes transaction list +func (aq *AttributesQueue) createNextAttributes(ctx context.Context, batch *BatchData, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) { // sanity check parent hash - if batch.ParentHash != safeL2Head.Hash { - return NewCriticalError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, safeL2Head.Hash)) + if batch.ParentHash != l2SafeHead.Hash { + return nil, NewResetError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, l2SafeHead.Hash)) } fetchCtx, cancel := context.WithTimeout(ctx, 20*time.Second) defer cancel() - attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, safeL2Head, batch.Timestamp, batch.Epoch()) + attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, l2SafeHead, batch.Timestamp, batch.Epoch()) if err != nil { - return err + return nil, err } // we are verifying, not sequencing, we've got all transactions and do not pull from the tx-pool @@ -100,19 +85,9 @@ func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error { aq.log.Info("generated attributes in payload queue", "txs", len(attrs.Transactions), "timestamp", batch.Timestamp) - // Slice off the batch once we are guaranteed to succeed - aq.batches = aq.batches[1:] - - aq.next.AddSafeAttributes(attrs) - return nil + return attrs, nil } -func (aq *AttributesQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { - aq.batches = aq.batches[:0] - aq.progress = aq.next.Progress() +func (aq *AttributesQueue) Reset(ctx context.Context, _ eth.L1BlockRef) error { return io.EOF } - -func (aq *AttributesQueue) SafeL2Head() eth.L2BlockRef { - return aq.next.SafeL2Head() -} diff --git a/op-node/rollup/derive/attributes_queue_test.go b/op-node/rollup/derive/attributes_queue_test.go index 959fc79c6b6e9..c5784f59f1715 100644 --- a/op-node/rollup/derive/attributes_queue_test.go +++ 
b/op-node/rollup/derive/attributes_queue_test.go @@ -2,7 +2,6 @@ package derive import ( "context" - "io" "math/big" "math/rand" "testing" @@ -17,30 +16,10 @@ import ( "github.com/ethereum/go-ethereum/log" ) -type MockAttributesQueueOutput struct { - MockOriginStage -} - -func (m *MockAttributesQueueOutput) AddSafeAttributes(attributes *eth.PayloadAttributes) { - m.Mock.MethodCalled("AddSafeAttributes", attributes) -} - -func (m *MockAttributesQueueOutput) ExpectAddSafeAttributes(attributes *eth.PayloadAttributes) { - m.Mock.On("AddSafeAttributes", attributes).Once().Return() -} - -func (m *MockAttributesQueueOutput) SafeL2Head() eth.L2BlockRef { - return m.Mock.MethodCalled("SafeL2Head").Get(0).(eth.L2BlockRef) -} - -func (m *MockAttributesQueueOutput) ExpectSafeL2Head(head eth.L2BlockRef) { - m.Mock.On("SafeL2Head").Once().Return(head) -} - -var _ AttributesQueueOutput = (*MockAttributesQueueOutput)(nil) - -func TestAttributesQueue_Step(t *testing.T) { - t.Skip("don't fake out batch queue") +// TestAttributesQueue checks that it properly uses the PreparePayloadAttributes function +// (which is well tested) and that it properly sets NoTxPool and adds in the candidate +// transactions. 
+func TestAttributesQueue(t *testing.T) { // test config, only init the necessary fields cfg := &rollup.Config{ BlockTime: 2, @@ -57,18 +36,9 @@ func TestAttributesQueue_Step(t *testing.T) { l1Fetcher.ExpectInfoByHash(l1Info.InfoHash, l1Info, nil) - out := &MockAttributesQueueOutput{} - out.progress = Progress{ - Origin: l1Info.BlockRef(), - Closed: false, - } - defer out.AssertExpectations(t) - safeHead := testutils.RandomL2BlockRef(rng) safeHead.L1Origin = l1Info.ID() - out.ExpectSafeL2Head(safeHead) - batch := &BatchData{BatchV1{ ParentHash: safeHead.Hash, EpochNum: rollup.Epoch(l1Info.InfoNum), @@ -86,13 +56,11 @@ func TestAttributesQueue_Step(t *testing.T) { Transactions: []eth.Data{l1InfoTx, eth.Data("foobar"), eth.Data("example")}, NoTxPool: true, } - out.ExpectAddSafeAttributes(&attrs) - aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, l1Fetcher, out, nil) - require.NoError(t, RepeatResetStep(t, aq.ResetStep, l1Fetcher, 1)) + aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, l1Fetcher, nil) - aq.AddBatch(batch) + actual, err := aq.createNextAttributes(context.Background(), batch, safeHead) - require.NoError(t, aq.Step(context.Background(), out.progress), "adding batch to next stage, no EOF yet") - require.Equal(t, io.EOF, aq.Step(context.Background(), out.progress), "done with batches") + require.Nil(t, err) + require.Equal(t, attrs, *actual) } diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index 412b553e3a4ab..b585619dde36b 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -17,6 +17,11 @@ import ( "github.com/ethereum/go-ethereum/log" ) +type NextAttributesProvider interface { + Origin() eth.L1BlockRef + NextAttributes(context.Context, eth.L2BlockRef) (*eth.PayloadAttributes, error) +} + type Engine interface { GetPayload(ctx context.Context, payloadId eth.PayloadID) (*eth.ExecutionPayload, error) ForkchoiceUpdate(ctx context.Context, 
state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error) @@ -64,8 +69,6 @@ type EngineQueue struct { finalizedL1 eth.BlockID - progress Progress - safeAttributes []*eth.PayloadAttributes unsafePayloads PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps @@ -73,14 +76,15 @@ type EngineQueue struct { finalityData []FinalityData engine Engine + prev NextAttributesProvider + + progress Progress // only used for pipeline resets metrics Metrics } -var _ AttributesQueueOutput = (*EngineQueue)(nil) - // NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use. -func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics) *EngineQueue { +func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider) *EngineQueue { return &EngineQueue{ log: log, cfg: cfg, @@ -91,6 +95,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M MaxSize: maxUnsafePayloadsMemory, SizeFn: payloadMemSize, }, + prev: prev, } } @@ -146,17 +151,30 @@ func (eq *EngineQueue) LastL2Time() uint64 { return uint64(eq.safeAttributes[len(eq.safeAttributes)-1].Timestamp) } -func (eq *EngineQueue) Step(ctx context.Context, outer Progress) error { - if changed, err := eq.progress.Update(outer); err != nil || changed { - return err - } +func (eq *EngineQueue) Step(ctx context.Context, _ Progress) error { if len(eq.safeAttributes) > 0 { return eq.tryNextSafeAttributes(ctx) } + outOfData := false + if len(eq.safeAttributes) == 0 { + if next, err := eq.prev.NextAttributes(ctx, eq.safeHead); err == io.EOF { + outOfData = true + } else if err != nil { + return err + } else { + eq.safeAttributes = append(eq.safeAttributes, next) + return NotEnoughData + } + } if eq.unsafePayloads.Len() > 0 { return eq.tryNextUnsafePayload(ctx) } - return io.EOF + + if outOfData { + return io.EOF + } else { + return nil + } } 
// tryFinalizeL2 traverses the past L1 blocks, checks if any has been finalized, @@ -186,11 +204,11 @@ func (eq *EngineQueue) postProcessSafeL2() { eq.finalityData = append(eq.finalityData[:0], eq.finalityData[1:finalityLookback]...) } // remember the last L2 block that we fully derived from the given finality data - if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.progress.Origin.Number { + if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.prev.Origin().Number { // append entry for new L1 block eq.finalityData = append(eq.finalityData, FinalityData{ L2Block: eq.safeHead, - L1Block: eq.progress.Origin.ID(), + L1Block: eq.prev.Origin().ID(), }) } else { // if it's a now L2 block that was derived from the same latest L1 block, then just update the entry @@ -205,7 +223,7 @@ func (eq *EngineQueue) logSyncProgress(reason string) { "l2_safe", eq.safeHead, "l2_unsafe", eq.unsafeHead, "l2_time", eq.unsafeHead.Time, - "l1_derived", eq.progress.Origin, + "l1_derived", eq.prev.Origin(), ) } @@ -415,7 +433,6 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error // note: we do not clear the unsafe payloadds queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads. 
eq.progress = Progress{ Origin: pipelineOrigin, - Closed: false, } eq.metrics.RecordL2Ref("l2_finalized", finalized) eq.metrics.RecordL2Ref("l2_safe", safe) diff --git a/op-node/rollup/derive/engine_queue_test.go b/op-node/rollup/derive/engine_queue_test.go index 503f40c33e118..800c5c7736fe8 100644 --- a/op-node/rollup/derive/engine_queue_test.go +++ b/op-node/rollup/derive/engine_queue_test.go @@ -1,6 +1,8 @@ package derive import ( + "context" + "io" "math/rand" "testing" @@ -14,6 +16,20 @@ import ( "github.com/ethereum/go-ethereum/log" ) +type fakeAttributesQueue struct { + origin eth.L1BlockRef +} + +func (f *fakeAttributesQueue) Origin() eth.L1BlockRef { + return f.origin +} + +func (f *fakeAttributesQueue) NextAttributes(_ context.Context, _ eth.L2BlockRef) (*eth.PayloadAttributes, error) { + return nil, io.EOF +} + +var _ NextAttributesProvider = (*fakeAttributesQueue)(nil) + func TestEngineQueue_Finalize(t *testing.T) { logger := testlog.Logger(t, log.LvlInfo) @@ -211,8 +227,10 @@ func TestEngineQueue_Finalize(t *testing.T) { l1F.ExpectL1BlockRefByHash(refB.Hash, refB, nil) l1F.ExpectL1BlockRefByNumber(refB.Number, refB, nil) - eq := NewEngineQueue(logger, cfg, eng, metrics) - require.NoError(t, RepeatResetStep(t, eq.ResetStep, l1F, 20)) + prev := &fakeAttributesQueue{} + + eq := NewEngineQueue(logger, cfg, eng, metrics, prev) + require.ErrorIs(t, eq.ResetStep(context.Background(), l1F), io.EOF) require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") require.Equal(t, refB, eq.Progress().Origin, "Expecting to be set back derivation L1 progress to B") @@ -220,20 +238,19 @@ func TestEngineQueue_Finalize(t *testing.T) { // now say C1 was included in D and became the new safe head eq.progress.Origin = refD + prev.origin = refD eq.safeHead = refC1 eq.postProcessSafeL2() // now say D0 was included in E and became the new safe 
head eq.progress.Origin = refE + prev.origin = refE eq.safeHead = refD0 eq.postProcessSafeL2() // let's finalize D (current L1), from which we fully derived C1 (it was safe head), but not D0 (included in E) eq.Finalize(refD.ID()) - // Now a few steps later, without consuming any additional L1 inputs, - // we should be able to resolve that B1 is now finalized, since it was included in finalized L1 block C - require.NoError(t, RepeatStep(t, eq.Step, eq.progress, 10)) require.Equal(t, refC1, eq.Finalized(), "C1 was included in finalized D, and should now be finalized") l1F.AssertExpectations(t) diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index ba2f62472d68d..bc8fe63d78009 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -101,13 +101,13 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch bank := NewChannelBank(log, cfg, l1Src, l1Fetcher) chInReader := NewChannelInReader(log, bank) batchQueue := NewBatchQueue(log, cfg, chInReader) + attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, batchQueue) // Push stages (that act like pull stages b/c we push from the innermost stages prior to the outermost stages) - eng := NewEngineQueue(log, cfg, engine, metrics) - attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, eng, batchQueue) + eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue) - stages := []Stage{eng, attributesQueue} - pullStages := []PullStage{batchQueue, chInReader, bank, l1Src, l1Traversal} + stages := []Stage{eng} + pullStages := []PullStage{attributesQueue, batchQueue, chInReader, bank, l1Src, l1Traversal} return &DerivationPipeline{ log: log,