diff --git a/op-node/rollup/derive/calldata_source.go b/op-node/rollup/derive/calldata_source.go index 00a9f89dca36d..0f671e7712fcc 100644 --- a/op-node/rollup/derive/calldata_source.go +++ b/op-node/rollup/derive/calldata_source.go @@ -2,55 +2,104 @@ package derive import ( "context" + "errors" "fmt" "io" "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" ) -// CalldataSource readers raw transactions from a given block & then filters for -// batch submitter transactions. -// This is not a stage in the pipeline, but a wrapper for another stage in the pipeline -// +type DataIter interface { + Next(ctx context.Context) (eth.Data, error) +} type L1TransactionFetcher interface { InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error) } -type DataSlice []eth.Data - -func (ds *DataSlice) Next(ctx context.Context) (eth.Data, error) { - if len(*ds) == 0 { - return nil, io.EOF - } - out := (*ds)[0] - *ds = (*ds)[1:] - return out, nil -} - -type CalldataSource struct { +// DataSourceFactory readers raw transactions from a given block & then filters for +// batch submitter transactions. +// This is not a stage in the pipeline, but a wrapper for another stage in the pipeline +type DataSourceFactory struct { log log.Logger cfg *rollup.Config fetcher L1TransactionFetcher } -func NewCalldataSource(log log.Logger, cfg *rollup.Config, fetcher L1TransactionFetcher) *CalldataSource { - return &CalldataSource{log: log, cfg: cfg, fetcher: fetcher} +func NewDataSourceFactory(log log.Logger, cfg *rollup.Config, fetcher L1TransactionFetcher) *DataSourceFactory { + return &DataSourceFactory{log: log, cfg: cfg, fetcher: fetcher} +} + +// OpenData returns a CalldataSourceImpl. This struct implements the `Next` function. +func (ds *DataSourceFactory) OpenData(ctx context.Context, id eth.BlockID) DataIter { + return NewDataSource(ctx, ds.log, ds.cfg, ds.fetcher, id) } -func (cs *CalldataSource) OpenData(ctx context.Context, id eth.BlockID) (DataIter, error) { - _, txs, err := cs.fetcher.InfoAndTxsByHash(ctx, id.Hash) +// DataSource is a fault tolerant approach to fetching data. +// The constructor will never fail & it will instead re-attempt the fetcher +// at a later point. +type DataSource struct { + // Internal state + data + open bool + data []eth.Data + // Required to re-attempt fetching + id eth.BlockID + cfg *rollup.Config // TODO: `DataFromEVMTransactions` should probably not take the full config + fetcher L1TransactionFetcher + log log.Logger +} + +// NewDataSource creates a new calldata source. It suppresses errors in fetching the L1 block if they occur. +// If there is an error, it will attempt to fetch the result on the next call to `Next`. +func NewDataSource(ctx context.Context, log log.Logger, cfg *rollup.Config, fetcher L1TransactionFetcher, block eth.BlockID) DataIter { + _, txs, err := fetcher.InfoAndTxsByHash(ctx, block.Hash) if err != nil { - return nil, fmt.Errorf("failed to fetch transactions: %w", err) + return &DataSource{ + open: false, + id: block, + cfg: cfg, + fetcher: fetcher, + log: log, + } + } else { + return &DataSource{ + open: true, + data: DataFromEVMTransactions(cfg, txs, log.New("origin", block)), + } + } +} + +// Next returns the next piece of data if it has it. If the constructor failed, this +// will attempt to reinitialize itself. If it cannot find the block it returns a ResetError +// otherwise it returns a temporary error if fetching the block returns an error. +func (ds *DataSource) Next(ctx context.Context) (eth.Data, error) { + if !ds.open { + if _, txs, err := ds.fetcher.InfoAndTxsByHash(ctx, ds.id.Hash); err == nil { + ds.open = true + ds.data = DataFromEVMTransactions(ds.cfg, txs, log.New("origin", ds.id)) + } else if errors.Is(err, ethereum.NotFound) { + return nil, NewResetError(fmt.Errorf("failed to open calldata source: %w", err)) + } else { + return nil, NewTemporaryError(fmt.Errorf("failed to open calldata source: %w", err)) + } + } + if len(ds.data) == 0 { + return nil, io.EOF + } else { + data := ds.data[0] + ds.data = ds.data[1:] + return data, nil } - data := DataFromEVMTransactions(cs.cfg, txs, cs.log.New("origin", id)) - return (*DataSlice)(&data), nil } +// DataFromEVMTransactions filters all of the transactions and returns the calldata from transactions +// that are sent to the batch inbox address from the batch sender address. +// This will return an empty array if no valid transactions are found. func DataFromEVMTransactions(config *rollup.Config, txs types.Transactions, log log.Logger) []eth.Data { var out []eth.Data l1Signer := config.L1Signer() diff --git a/op-node/rollup/derive/calldata_source_test.go b/op-node/rollup/derive/calldata_source_test.go index a7b598b87e418..7ae01a7a88fb5 100644 --- a/op-node/rollup/derive/calldata_source_test.go +++ b/op-node/rollup/derive/calldata_source_test.go @@ -1,10 +1,7 @@ package derive import ( - "context" "crypto/ecdsa" - "fmt" - "io" "math/big" "math/rand" "testing" @@ -45,61 +42,15 @@ func (tx *testTx) Create(t *testing.T, signer types.Signer, rng *rand.Rand) *typ return out } -type calldataTestSetup struct { - inboxPriv *ecdsa.PrivateKey - batcherPriv *ecdsa.PrivateKey - cfg *rollup.Config - signer types.Signer -} - type calldataTest struct { name string txs []testTx - err error -} - -func (ct *calldataTest) Run(t *testing.T, setup *calldataTestSetup) { - rng := rand.New(rand.NewSource(1234)) - l1Src := &testutils.MockL1Source{} - txs := make([]*types.Transaction, len(ct.txs)) - - expectedData := make([]eth.Data, 0) - - for i, tx := range ct.txs { - txs[i] = tx.Create(t, setup.signer, rng) - if tx.good { - expectedData = append(expectedData, txs[i].Data()) - } - } - - info := testutils.RandomBlockInfo(rng) - l1Src.ExpectInfoAndTxsByHash(info.Hash(), info, txs, ct.err) - - defer l1Src.Mock.AssertExpectations(t) - - src := NewCalldataSource(testlog.Logger(t, log.LvlError), setup.cfg, l1Src) - dataIter, err := src.OpenData(context.Background(), info.ID()) - - if ct.err != nil { - require.ErrorIs(t, err, ct.err) - return - } - require.NoError(t, err) - - for { - dat, err := dataIter.Next(context.Background()) - if err == io.EOF { - break - } - require.NoError(t, err) - require.Equal(t, dat, expectedData[0], "data must match next expected value") - expectedData = expectedData[1:] - } - require.Len(t, expectedData, 0, "all expected data should have been read") } -func TestCalldataSource_OpenData(t *testing.T) { - +// TestDataFromEVMTransactions creates some transactions from a specified template and asserts +// that DataFromEVMTransactions properly filters and returns the data from the authorized transactions +// inside the transaction set. +func TestDataFromEVMTransactions(t *testing.T) { inboxPriv := testutils.RandomKey() batcherPriv := testutils.RandomKey() cfg := &rollup.Config{ @@ -107,59 +58,68 @@ func TestCalldataSource_OpenData(t *testing.T) { BatchInboxAddress: crypto.PubkeyToAddress(inboxPriv.PublicKey), BatchSenderAddress: crypto.PubkeyToAddress(batcherPriv.PublicKey), } - signer := cfg.L1Signer() - setup := &calldataTestSetup{ - inboxPriv: inboxPriv, - batcherPriv: batcherPriv, - cfg: cfg, - signer: signer, - } altInbox := testutils.RandomAddress(rand.New(rand.NewSource(1234))) altAuthor := testutils.RandomKey() testCases := []calldataTest{ - {name: "simple", txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, author: batcherPriv, good: true}}}, - {name: "other inbox", txs: []testTx{{to: &altInbox, dataLen: 1234, author: batcherPriv, good: false}}}, - {name: "other author", txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, author: altAuthor, good: false}}}, - {name: "inbox is author", txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, author: inboxPriv, good: false}}}, - {name: "author is inbox", txs: []testTx{{to: &cfg.BatchSenderAddress, dataLen: 1234, author: batcherPriv, good: false}}}, - {name: "unrelated", txs: []testTx{{to: &altInbox, dataLen: 1234, author: altAuthor, good: false}}}, - {name: "contract creation", txs: []testTx{{to: nil, dataLen: 1234, author: batcherPriv, good: false}}}, - {name: "empty tx", txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 0, author: batcherPriv, good: true}}}, - {name: "value tx", txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, value: 42, author: batcherPriv, good: true}}}, - {name: "empty block", txs: []testTx{}}, + { + name: "simple", + txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, author: batcherPriv, good: true}}, + }, + { + name: "other inbox", + txs: []testTx{{to: &altInbox, dataLen: 1234, author: batcherPriv, good: false}}}, + { + name: "other author", + txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, author: altAuthor, good: false}}}, + { + name: "inbox is author", + txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, author: inboxPriv, good: false}}}, + { + name: "author is inbox", + txs: []testTx{{to: &cfg.BatchSenderAddress, dataLen: 1234, author: batcherPriv, good: false}}}, + { + name: "unrelated", + txs: []testTx{{to: &altInbox, dataLen: 1234, author: altAuthor, good: false}}}, + { + name: "contract creation", + txs: []testTx{{to: nil, dataLen: 1234, author: batcherPriv, good: false}}}, + { + name: "empty tx", + txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 0, author: batcherPriv, good: true}}}, + { + name: "value tx", + txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, value: 42, author: batcherPriv, good: true}}}, + { + name: "empty block", txs: []testTx{}, + }, + { + name: "mixed txs", + txs: []testTx{ + {to: &cfg.BatchInboxAddress, dataLen: 1234, value: 42, author: batcherPriv, good: true}, + {to: &cfg.BatchInboxAddress, dataLen: 3333, value: 32, author: altAuthor, good: false}, + {to: &cfg.BatchInboxAddress, dataLen: 2000, value: 22, author: batcherPriv, good: true}, + {to: &altInbox, dataLen: 2020, value: 12, author: batcherPriv, good: false}, + }, + }, } - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - testCase.Run(t, setup) - }) - } - - t.Run("random combinations", func(t *testing.T) { - var all []testTx - for _, tc := range testCases { - all = append(all, tc.txs...) - } - var combiTestCases []calldataTest - for i := 0; i < 100; i++ { - txs := append(make([]testTx, 0), all...) - rng := rand.New(rand.NewSource(42 + int64(i))) - rng.Shuffle(len(txs), func(i, j int) { - txs[i], txs[j] = txs[j], txs[i] - }) - subset := txs[:rng.Intn(len(txs))] - combiTestCases = append(combiTestCases, calldataTest{ - name: fmt.Sprintf("combi_%d_subset_%d", i, len(subset)), - txs: subset, - }) + for i, tc := range testCases { + rng := rand.New(rand.NewSource(int64(i))) + signer := cfg.L1Signer() + + var expectedData []eth.Data + var txs []*types.Transaction + for i, tx := range tc.txs { + txs = append(txs, tx.Create(t, signer, rng)) + if tx.good { + expectedData = append(expectedData, txs[i].Data()) + } } - for _, testCase := range combiTestCases { - t.Run(testCase.name, func(t *testing.T) { - testCase.Run(t, setup) - }) - } - }) + out := DataFromEVMTransactions(cfg, txs, testlog.Logger(t, log.LvlCrit)) + require.ElementsMatch(t, expectedData, out) + } + } diff --git a/op-node/rollup/derive/l1_retrieval.go b/op-node/rollup/derive/l1_retrieval.go index 9b888b084bbdb..4d90ad9f48c16 100644 --- a/op-node/rollup/derive/l1_retrieval.go +++ b/op-node/rollup/derive/l1_retrieval.go @@ -2,34 +2,21 @@ package derive import ( "context" - "fmt" "io" "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum/go-ethereum/log" ) -// This is a generic wrapper around fetching all transactions in a block & then -// it feeds one L1 transaction at a time to the next stage - -// DataIter is a minimal iteration interface to fetch rollup input data from an arbitrary data-availability source -type DataIter interface { - // Next can be repeatedly called for more data, until it returns an io.EOF error. - // It never returns io.EOF and data at the same time. - Next(ctx context.Context) (eth.Data, error) -} - -// DataAvailabilitySource provides rollup input data -type DataAvailabilitySource interface { - // OpenData does any initial data-fetching work and returns an iterator to fetch data with. - OpenData(ctx context.Context, id eth.BlockID) (DataIter, error) -} - type L1SourceOutput interface { StageProgress IngestData(data []byte) } +type DataAvailabilitySource interface { + OpenData(ctx context.Context, id eth.BlockID) DataIter +} + type L1Retrieval struct { log log.Logger dataSrc DataAvailabilitySource @@ -67,11 +54,7 @@ func (l1r *L1Retrieval) Step(ctx context.Context, outer Progress) error { // create a source if we have none if l1r.datas == nil { - datas, err := l1r.dataSrc.OpenData(ctx, l1r.progress.Origin.ID()) - if err != nil { - return NewTemporaryError(fmt.Errorf("can't fetch L1 data: %v: %w", l1r.progress.Origin, err)) - } - l1r.datas = datas + l1r.datas = l1r.dataSrc.OpenData(ctx, l1r.progress.Origin.ID()) return nil } @@ -84,7 +67,7 @@ func (l1r *L1Retrieval) Step(ctx context.Context, outer Progress) error { l1r.datas = nil return io.EOF } else if err != nil { - return NewTemporaryError(fmt.Errorf("context to retrieve next L1 data failed: %w", err)) + return err } else { l1r.data = data return nil diff --git a/op-node/rollup/derive/l1_retrieval_test.go b/op-node/rollup/derive/l1_retrieval_test.go index 32f60e5b9a551..42794476a312d 100644 --- a/op-node/rollup/derive/l1_retrieval_test.go +++ b/op-node/rollup/derive/l1_retrieval_test.go @@ -2,6 +2,7 @@ package derive import ( "context" + "io" "math/rand" "testing" @@ -14,13 +15,27 @@ import ( "github.com/ethereum/go-ethereum/log" ) +type fakeDataIter struct { + data []eth.Data +} + +func (cs *fakeDataIter) Next(ctx context.Context) (eth.Data, error) { + if len(cs.data) == 0 { + return nil, io.EOF + } else { + data := cs.data[0] + cs.data = cs.data[1:] + return data, nil + } +} + type MockDataSource struct { mock.Mock } -func (m *MockDataSource) OpenData(ctx context.Context, id eth.BlockID) (DataIter, error) { +func (m *MockDataSource) OpenData(ctx context.Context, id eth.BlockID) DataIter { out := m.Mock.MethodCalled("OpenData", id) - return out[0].(DataIter), *out[1].(*error) + return out[0].(DataIter) } func (m *MockDataSource) ExpectOpenData(id eth.BlockID, iter DataIter, err error) { @@ -51,7 +66,7 @@ func TestL1Retrieval_Step(t *testing.T) { a := testutils.RandomData(rng, 10) b := testutils.RandomData(rng, 15) - iter := &DataSlice{a, b} + iter := &fakeDataIter{data: []eth.Data{a, b}} outer := Progress{Origin: testutils.NextRandomRef(rng, next.progress.Origin), Closed: false} diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index 17dd7d8805afa..93f43f48986f8 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -88,7 +88,7 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch batchQueue := NewBatchQueue(log, cfg, attributesQueue) chInReader := NewChannelInReader(log, batchQueue) bank := NewChannelBank(log, cfg, chInReader) - dataSrc := NewCalldataSource(log, cfg, l1Fetcher) + dataSrc := NewDataSourceFactory(log, cfg, l1Fetcher) l1Src := NewL1Retrieval(log, dataSrc, bank) l1Traversal := NewL1Traversal(log, l1Fetcher, l1Src) stages := []Stage{eng, attributesQueue, batchQueue, chInReader, bank, l1Src, l1Traversal}