From c4a679cb4e3b1a7e247bf82e51ae944a736df504 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Thu, 9 Mar 2023 19:08:39 -0800 Subject: [PATCH 01/15] Gracefully shut down the BatchSubmitter, closing current channel --- op-batcher/batcher/channel_builder.go | 20 ++++++++++++++++---- op-batcher/batcher/channel_manager.go | 10 ++++++++++ op-batcher/batcher/driver.go | 17 ++++++++--------- 3 files changed, 34 insertions(+), 13 deletions(-) diff --git a/op-batcher/batcher/channel_builder.go b/op-batcher/batcher/channel_builder.go index 9fdf0dfcba39d..a3ccf665b871a 100644 --- a/op-batcher/batcher/channel_builder.go +++ b/op-batcher/batcher/channel_builder.go @@ -18,6 +18,7 @@ var ( ErrMaxDurationReached = errors.New("max channel duration reached") ErrChannelTimeoutClose = errors.New("close to channel timeout") ErrSeqWindowClose = errors.New("close to sequencer window timeout") + ErrTerminated = errors.New("channel terminated") ) type ChannelFullError struct { @@ -188,7 +189,7 @@ func (c *channelBuilder) Reset() error { } // AddBlock adds a block to the channel compression pipeline. IsFull should be -// called aftewards to test whether the channel is full. If full, a new channel +// called afterwards to test whether the channel is full. If full, a new channel // must be started. // // AddBlock returns a ChannelFullError if called even though the channel is @@ -314,9 +315,10 @@ func (c *channelBuilder) IsFull() bool { // would have been exceeded by the latest AddBlock call, // - ErrMaxFrameIndex if the maximum number of frames has been generated // (uint16), -// - ErrMaxDurationReached if the max channel duration got reached. -// - ErrChannelTimeoutClose if the consensus channel timeout got too close. -// - ErrSeqWindowClose if the end of the sequencer window got too close. +// - ErrMaxDurationReached if the max channel duration got reached, +// - ErrChannelTimeoutClose if the consensus channel timeout got too close, +// - ErrSeqWindowClose if the end of the sequencer window got too close, +// - ErrTerminated if the channel was explicitly terminated. func (c *channelBuilder) FullErr() error { return c.fullErr } @@ -402,6 +404,16 @@ func (c *channelBuilder) outputFrame() error { return err // possibly io.EOF (last frame) } +// Close immediately marks the channel as full with an ErrTerminated +// if the channel is not already full. This ensures that no additional +// frames will be added to the channel. +func (c *channelBuilder) Close() error { + if !c.IsFull() { + c.setFullErr(ErrTerminated) + } + return c.FullErr() +} + // HasFrame returns whether there's any available frame. If true, it can be // popped using NextFrame(). // diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index 270d603953464..96ae381df83d1 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -344,3 +344,13 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo) SequenceNumber: l1info.SequenceNumber, } } + +// CloseCurrentChannel closes the current pending channel, if one exists. +// This ensures that no new frames will be produced, but there still may be any +// number of pending frames produced before this call. +func (s *channelManager) CloseCurrentChannel() error { + if s.pendingChannel == nil { + return nil + } + return s.pendingChannel.Close() +} diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index fc3fe5aab302d..0eccb60fec47f 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -145,8 +145,6 @@ func (l *BatchSubmitter) Start() error { l.running = true l.done = make(chan struct{}) - // TODO: this context only exists because the event loop doesn't reach done - // if the tx manager is blocking forever due to e.g. insufficient balance. l.ctx, l.cancel = context.WithCancel(context.Background()) l.state.Clear() l.lastStoredBlock = eth.BlockID{} @@ -287,6 +285,9 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth. func (l *BatchSubmitter) loop() { defer l.wg.Done() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ticker := time.NewTicker(l.PollInterval) defer ticker.Stop() for { @@ -294,9 +295,8 @@ func (l *BatchSubmitter) loop() { case <-ticker.C: l.loadBlocksIntoState(l.ctx) - blockLoop: for { - l1tip, err := l.l1Tip(l.ctx) + l1tip, err := l.l1Tip(ctx) if err != nil { l.log.Error("Failed to query L1 tip", "error", err) break @@ -320,12 +320,11 @@ func (l *BatchSubmitter) loop() { l.recordConfirmedTx(txdata.ID(), receipt) } - // hack to exit this loop. Proper fix is to do request another send tx or parallel tx sending - // from the channel manager rather than sending the channel in a loop. This stalls b/c if the - // context is cancelled while sending, it will never fully clear the pending txns. + // Attempt to gracefully terminate the current channel, ensuring that no new frames will be + // produced. Any remaining frames must still be published to the L1 to prevent stalling. select { - case <-l.ctx.Done(): - break blockLoop + case <-l.done: + l.state.CloseCurrentChannel() default: } } From c9fc6d87bbbc1d07ac1c8d75a0ba9b13dd85d5a9 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Fri, 10 Mar 2023 12:01:21 -0800 Subject: [PATCH 02/15] Extract inner loop into new publishStateToL1 method --- op-batcher/batcher/driver.go | 71 +++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 34 deletions(-) diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 0eccb60fec47f..f29c3e7cf2036 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -294,43 +294,46 @@ func (l *BatchSubmitter) loop() { select { case <-ticker.C: l.loadBlocksIntoState(l.ctx) + l.publishStateToL1(ctx) + case <-l.done: + return + } + } +} + +// publishStateToL1 loops through the block data loaded into `state` and +// submits the associated data to the L1 in the form of channel frames. +func (l *BatchSubmitter) publishStateToL1(ctx context.Context) { + for { + l1tip, err := l.l1Tip(ctx) + if err != nil { + l.log.Error("Failed to query L1 tip", "error", err) + return + } + l.recordL1Tip(l1tip) - for { - l1tip, err := l.l1Tip(ctx) - if err != nil { - l.log.Error("Failed to query L1 tip", "error", err) - break - } - l.recordL1Tip(l1tip) - - // Collect next transaction data - txdata, err := l.state.TxData(l1tip.ID()) - if err == io.EOF { - l.log.Trace("no transaction data available") - break // local for loop - } else if err != nil { - l.log.Error("unable to get tx data", "err", err) - break - } - - // Record TX Status - if receipt, err := l.sendTransaction(l.ctx, txdata.Bytes()); err != nil { - l.recordFailedTx(txdata.ID(), err) - } else { - l.recordConfirmedTx(txdata.ID(), receipt) - } - - // Attempt to gracefully terminate the current channel, ensuring that no new frames will be - // produced. Any remaining frames must still be published to the L1 to prevent stalling. - select { - case <-l.done: - l.state.CloseCurrentChannel() - default: - } - } + // Collect next transaction data + txdata, err := l.state.TxData(l1tip.ID()) + if err == io.EOF { + l.log.Trace("no transaction data available") + break + } else if err != nil { + l.log.Error("unable to get tx data", "err", err) + break + } + // Record TX Status + if receipt, err := l.sendTransaction(ctx, txdata.Bytes()); err != nil { + l.recordFailedTx(txdata.ID(), err) + } else { + l.recordConfirmedTx(txdata.ID(), receipt) + } + // Attempt to gracefully terminate the current channel, ensuring that no new frames will be + // produced. Any remaining frames must still be published to the L1 to prevent stalling. + select { case <-l.done: - return + l.state.CloseCurrentChannel() + default: } } } From 56c2bff4fc1641136e5926fdf89de5768059ffe3 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Mon, 13 Mar 2023 19:11:53 -0700 Subject: [PATCH 03/15] Improve graceful shutdown logic, add tests --- op-batcher/batcher/channel_builder.go | 5 +- op-batcher/batcher/channel_manager.go | 25 ++- op-batcher/batcher/channel_manager_test.go | 214 ++++++++++++++++++++- op-batcher/batcher/driver.go | 2 +- 4 files changed, 228 insertions(+), 18 deletions(-) diff --git a/op-batcher/batcher/channel_builder.go b/op-batcher/batcher/channel_builder.go index a3ccf665b871a..ec3aaa6832f27 100644 --- a/op-batcher/batcher/channel_builder.go +++ b/op-batcher/batcher/channel_builder.go @@ -405,13 +405,12 @@ func (c *channelBuilder) outputFrame() error { } // Close immediately marks the channel as full with an ErrTerminated -// if the channel is not already full. This ensures that no additional -// frames will be added to the channel. +// if the channel is not already full, then outputs any remaining frames. func (c *channelBuilder) Close() error { if !c.IsFull() { c.setFullErr(ErrTerminated) } - return c.FullErr() + return c.closeAndOutputAllFrames() } // HasFrame returns whether there's any available frame. If true, it can be diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index 96ae381df83d1..73a8ca3340495 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -41,6 +41,9 @@ type channelManager struct { pendingTransactions map[txID]txData // Set of confirmed txID -> inclusion block. For determining if the channel is timed out confirmedTransactions map[txID]eth.BlockID + + // if set to true, prevents production of any new channel frames + closed bool } func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig) *channelManager { @@ -78,6 +81,13 @@ func (s *channelManager) TxFailed(id txID) { } s.metr.RecordBatchTxFailed() + // If this channel has no submitted transactions, put the pending blocks back into the + // local saved blocks and reset this state so it can try to build a new channel. + if len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 { + s.log.Info("Channel has no submitted transactions", "chID", s.pendingChannel.ID()) + s.blocks = append(s.pendingChannel.Blocks(), s.blocks...) + s.clearPendingChannel() + } } // TxConfirmed marks a transaction as confirmed on L1. Unfortunately even if all frames in @@ -184,6 +194,11 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) { return s.nextTxData() } + // Avoid producing new frames if the channel has been explicitly closed. + if s.closed { + return txData{}, io.EOF + } + // No pending frame, so we have to add new blocks to the channel // If we have no saved blocks, we will not be able to create valid frames @@ -345,10 +360,12 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo) } } -// CloseCurrentChannel closes the current pending channel, if one exists. -// This ensures that no new frames will be produced, but there still may be any -// number of pending frames produced before this call. -func (s *channelManager) CloseCurrentChannel() error { +// Close closes the current pending channel, if one exists, and prevents the +// creation of any new channels. +// This ensures that no new frames will be produced, but there may be any number +// of pending frames produced before this call which should still be published. +func (s *channelManager) Close() error { + s.closed = true if s.pendingChannel == nil { return nil } diff --git a/op-batcher/batcher/channel_manager_test.go b/op-batcher/batcher/channel_manager_test.go index ec04fb1f9dd00..b8ad1e9ac3402 100644 --- a/op-batcher/batcher/channel_manager_test.go +++ b/op-batcher/batcher/channel_manager_test.go @@ -15,6 +15,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/trie" "github.com/stretchr/testify/require" ) @@ -331,9 +332,11 @@ func TestChannelManager_TxResend(t *testing.T) { log := testlog.Logger(t, log.LvlError) m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{ - TargetFrameSize: 0, - MaxFrameSize: 120_000, + TargetNumFrames: 2, + TargetFrameSize: 1000, + MaxFrameSize: 2000, ApproxComprRatio: 1.0, + ChannelTimeout: 1000, }) a, _ := derivetest.RandomL2Block(rng, 4) @@ -342,24 +345,215 @@ func TestChannelManager_TxResend(t *testing.T) { txdata0, err := m.TxData(eth.BlockID{}) require.NoError(err) - txdata0bytes := txdata0.Bytes() - data0 := make([]byte, len(txdata0bytes)) + + // confirm one frame to keep the channel open + m.TxConfirmed(txdata0.ID(), eth.BlockID{}) + + txdata1, err := m.TxData(eth.BlockID{}) + require.NoError(err) + txdata1bytes := txdata1.Bytes() + data1 := make([]byte, len(txdata1bytes)) // make sure we have a clone for later comparison - copy(data0, txdata0bytes) + copy(data1, txdata1bytes) // ensure channel is drained _, err = m.TxData(eth.BlockID{}) require.ErrorIs(err, io.EOF) // requeue frame - m.TxFailed(txdata0.ID()) + m.TxFailed(txdata1.ID()) - txdata1, err := m.TxData(eth.BlockID{}) + txdata2, err := m.TxData(eth.BlockID{}) require.NoError(err) - data1 := txdata1.Bytes() - require.Equal(data1, data0) - fs, err := derive.ParseFrames(data1) + data2 := txdata2.Bytes() + require.Equal(data2, data1) + fs, err := derive.ParseFrames(data2) require.NoError(err) require.Len(fs, 1) } + +// TestChannelManagerCloseBeforeFirstUse ensures that the channel manager +// will not produce any frames if closed immediately. +func TestChannelManagerCloseBeforeFirstUse(t *testing.T) { + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + log := testlog.Logger(t, log.LvlCrit) + m := NewChannelManager(log, metrics.NoopMetrics, + ChannelConfig{ + TargetFrameSize: 0, + MaxFrameSize: 100, + ApproxComprRatio: 1.0, + ChannelTimeout: 1000, + }) + + a, _ := derivetest.RandomL2Block(rng, 4) + + err := m.Close() + require.NoError(t, err) + + err = m.AddL2Block(a) + require.NoError(t, err) + + _, err = m.TxData(eth.BlockID{}) + require.ErrorIs(t, err, io.EOF) +} + +// TestChannelManagerCloseNoPendingChannel ensures that the channel manager +// can gracefully close with no pending channels, and will not emit any new +// channel frames. +func TestChannelManagerCloseNoPendingChannel(t *testing.T) { + log := testlog.Logger(t, log.LvlCrit) + m := NewChannelManager(log, metrics.NoopMetrics, + ChannelConfig{ + TargetFrameSize: 0, + MaxFrameSize: 100, + ApproxComprRatio: 1.0, + ChannelTimeout: 1000, + }) + lBlock := types.NewBlock(&types.Header{ + BaseFee: big.NewInt(10), + Difficulty: common.Big0, + Number: big.NewInt(100), + }, nil, nil, nil, trie.NewStackTrie(nil)) + l1InfoTx, err := derive.L1InfoDeposit(0, lBlock, eth.SystemConfig{}, false) + require.NoError(t, err) + txs := []*types.Transaction{types.NewTx(l1InfoTx)} + + a := types.NewBlock(&types.Header{ + Number: big.NewInt(0), + }, txs, nil, nil, trie.NewStackTrie(nil)) + + l1InfoTx, err = derive.L1InfoDeposit(1, lBlock, eth.SystemConfig{}, false) + require.NoError(t, err) + txs = []*types.Transaction{types.NewTx(l1InfoTx)} + + b := types.NewBlock(&types.Header{ + Number: big.NewInt(1), + ParentHash: a.Hash(), + }, txs, nil, nil, trie.NewStackTrie(nil)) + + err = m.AddL2Block(a) + require.NoError(t, err) + + txdata, err := m.TxData(eth.BlockID{}) + require.NoError(t, err) + + m.TxConfirmed(txdata.ID(), eth.BlockID{}) + + _, err = m.TxData(eth.BlockID{}) + require.ErrorIs(t, err, io.EOF) + + err = m.Close() + require.NoError(t, err) + + err = m.AddL2Block(b) + require.NoError(t, err) + + _, err = m.TxData(eth.BlockID{}) + require.ErrorIs(t, err, io.EOF) +} + +// TestChannelManagerCloseNoPendingChannel ensures that the channel manager +// can gracefully close with a pending channel, and will not produce any +// new channel frames after this point. +func TestChannelManagerClosePendingChannel(t *testing.T) { + log := testlog.Logger(t, log.LvlCrit) + m := NewChannelManager(log, metrics.NoopMetrics, + ChannelConfig{ + TargetNumFrames: 100, + TargetFrameSize: 1, + MaxFrameSize: 1, + ApproxComprRatio: 1.0, + ChannelTimeout: 1000, + }) + lBlock := types.NewBlock(&types.Header{ + BaseFee: big.NewInt(10), + Difficulty: common.Big0, + Number: big.NewInt(100), + }, nil, nil, nil, trie.NewStackTrie(nil)) + l1InfoTx, err := derive.L1InfoDeposit(0, lBlock, eth.SystemConfig{}, false) + require.NoError(t, err) + txs := []*types.Transaction{types.NewTx(l1InfoTx)} + + a := types.NewBlock(&types.Header{ + Number: big.NewInt(0), + }, txs, nil, nil, trie.NewStackTrie(nil)) + + l1InfoTx, err = derive.L1InfoDeposit(1, lBlock, eth.SystemConfig{}, false) + require.NoError(t, err) + txs = []*types.Transaction{types.NewTx(l1InfoTx)} + + b := types.NewBlock(&types.Header{ + Number: big.NewInt(1), + ParentHash: a.Hash(), + }, txs, nil, nil, trie.NewStackTrie(nil)) + + err = m.AddL2Block(a) + require.NoError(t, err) + + txdata, err := m.TxData(eth.BlockID{}) + require.NoError(t, err) + + m.TxConfirmed(txdata.ID(), eth.BlockID{}) + + err = m.Close() + require.NoError(t, err) + + txdata, err = m.TxData(eth.BlockID{}) + require.NoError(t, err) + + m.TxConfirmed(txdata.ID(), eth.BlockID{}) + + err = m.AddL2Block(b) + require.NoError(t, err) + + _, err = m.TxData(eth.BlockID{}) + require.ErrorIs(t, err, io.EOF) +} + +// TestChannelManagerCloseAllTxsFailed ensures that the channel manager +// can gracefully close after producing transaction frames if none of these +// have successfully landed on chain. +func TestChannelManagerCloseAllTxsFailed(t *testing.T) { + log := testlog.Logger(t, log.LvlCrit) + m := NewChannelManager(log, metrics.NoopMetrics, + ChannelConfig{ + TargetFrameSize: 0, + MaxFrameSize: 100, + ApproxComprRatio: 1.0, + ChannelTimeout: 1000, + }) + lBlock := types.NewBlock(&types.Header{ + BaseFee: big.NewInt(10), + Difficulty: common.Big0, + Number: big.NewInt(100), + }, nil, nil, nil, trie.NewStackTrie(nil)) + l1InfoTx, err := derive.L1InfoDeposit(0, lBlock, eth.SystemConfig{}, false) + require.NoError(t, err) + txs := []*types.Transaction{types.NewTx(l1InfoTx)} + + a := types.NewBlock(&types.Header{ + Number: big.NewInt(0), + }, txs, nil, nil, trie.NewStackTrie(nil)) + + err = m.AddL2Block(a) + require.NoError(t, err) + + txdata, err := m.TxData(eth.BlockID{}) + require.NoError(t, err) + + m.TxFailed(txdata.ID()) + + // Show that this data will continue to be emitted as long as the transaction + // fails and the channel manager is not closed + txdata, err = m.TxData(eth.BlockID{}) + require.NoError(t, err) + + m.TxFailed(txdata.ID()) + + err = m.Close() + require.NoError(t, err) + + _, err = m.TxData(eth.BlockID{}) + require.ErrorIs(t, err, io.EOF) +} diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index f29c3e7cf2036..bffda9a1e31ad 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -332,7 +332,7 @@ func (l *BatchSubmitter) publishStateToL1(ctx context.Context) { // produced. Any remaining frames must still be published to the L1 to prevent stalling. select { case <-l.done: - l.state.CloseCurrentChannel() + l.state.Close() default: } } From 995e3e4232afaa540a8d922dad8022fae9a83af1 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Tue, 14 Mar 2023 15:00:09 -0700 Subject: [PATCH 04/15] Minor test improvements --- op-batcher/batcher/channel_manager_test.go | 54 ++++++++-------------- 1 file changed, 18 insertions(+), 36 deletions(-) diff --git a/op-batcher/batcher/channel_manager_test.go b/op-batcher/batcher/channel_manager_test.go index b8ad1e9ac3402..aebf3fc47b91b 100644 --- a/op-batcher/batcher/channel_manager_test.go +++ b/op-batcher/batcher/channel_manager_test.go @@ -457,38 +457,25 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) { // can gracefully close with a pending channel, and will not produce any // new channel frames after this point. func TestChannelManagerClosePendingChannel(t *testing.T) { + rng := rand.New(rand.NewSource(time.Now().UnixNano())) log := testlog.Logger(t, log.LvlCrit) m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{ TargetNumFrames: 100, - TargetFrameSize: 1, - MaxFrameSize: 1, + TargetFrameSize: 25_000, + MaxFrameSize: 40_000, ApproxComprRatio: 1.0, ChannelTimeout: 1000, }) - lBlock := types.NewBlock(&types.Header{ - BaseFee: big.NewInt(10), - Difficulty: common.Big0, - Number: big.NewInt(100), - }, nil, nil, nil, trie.NewStackTrie(nil)) - l1InfoTx, err := derive.L1InfoDeposit(0, lBlock, eth.SystemConfig{}, false) - require.NoError(t, err) - txs := []*types.Transaction{types.NewTx(l1InfoTx)} - - a := types.NewBlock(&types.Header{ - Number: big.NewInt(0), - }, txs, nil, nil, trie.NewStackTrie(nil)) - l1InfoTx, err = derive.L1InfoDeposit(1, lBlock, eth.SystemConfig{}, false) - require.NoError(t, err) - txs = []*types.Transaction{types.NewTx(l1InfoTx)} + a, _ := derivetest.RandomL2Block(rng, 128) - b := types.NewBlock(&types.Header{ - Number: big.NewInt(1), - ParentHash: a.Hash(), - }, txs, nil, nil, trie.NewStackTrie(nil)) + b, _ := derivetest.RandomL2Block(rng, 8) + header := b.Header() + header.ParentHash = a.Hash() + b = b.WithSeal(header) - err = m.AddL2Block(a) + err := m.AddL2Block(a) require.NoError(t, err) txdata, err := m.TxData(eth.BlockID{}) @@ -504,6 +491,9 @@ func TestChannelManagerClosePendingChannel(t *testing.T) { m.TxConfirmed(txdata.ID(), eth.BlockID{}) + _, err = m.TxData(eth.BlockID{}) + require.ErrorIs(t, err, io.EOF) + err = m.AddL2Block(b) require.NoError(t, err) @@ -515,28 +505,20 @@ func TestChannelManagerClosePendingChannel(t *testing.T) { // can gracefully close after producing transaction frames if none of these // have successfully landed on chain. func TestChannelManagerCloseAllTxsFailed(t *testing.T) { + rng := rand.New(rand.NewSource(time.Now().UnixNano())) log := testlog.Logger(t, log.LvlCrit) m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{ - TargetFrameSize: 0, - MaxFrameSize: 100, + TargetNumFrames: 100, + TargetFrameSize: 25_000, + MaxFrameSize: 40_000, ApproxComprRatio: 1.0, ChannelTimeout: 1000, }) - lBlock := types.NewBlock(&types.Header{ - BaseFee: big.NewInt(10), - Difficulty: common.Big0, - Number: big.NewInt(100), - }, nil, nil, nil, trie.NewStackTrie(nil)) - l1InfoTx, err := derive.L1InfoDeposit(0, lBlock, eth.SystemConfig{}, false) - require.NoError(t, err) - txs := []*types.Transaction{types.NewTx(l1InfoTx)} - a := types.NewBlock(&types.Header{ - Number: big.NewInt(0), - }, txs, nil, nil, trie.NewStackTrie(nil)) + a, _ := derivetest.RandomL2Block(rng, 128) - err = m.AddL2Block(a) + err := m.AddL2Block(a) require.NoError(t, err) txdata, err := m.TxData(eth.BlockID{}) From e9f8ae504da2d6174e0b97f69fd18590d3c805f5 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Mon, 20 Mar 2023 18:15:08 -0700 Subject: [PATCH 05/15] Improve select logic, avoid double-closure --- op-batcher/batcher/channel_builder.go | 9 +++--- op-batcher/batcher/channel_manager.go | 35 ++++++++++++--------- op-batcher/batcher/channel_manager_test.go | 36 +++++++++------------- op-batcher/batcher/driver.go | 25 ++++++++++----- 4 files changed, 56 insertions(+), 49 deletions(-) diff --git a/op-batcher/batcher/channel_builder.go b/op-batcher/batcher/channel_builder.go index ec3aaa6832f27..092391711268b 100644 --- a/op-batcher/batcher/channel_builder.go +++ b/op-batcher/batcher/channel_builder.go @@ -308,8 +308,8 @@ func (c *channelBuilder) IsFull() bool { // FullErr returns the reason why the channel is full. If not full yet, it // returns nil. // -// It returns a ChannelFullError wrapping one of six possible reasons for the -// channel being full: +// It returns a ChannelFullError wrapping one of the following possible reasons +// for the channel being full: // - ErrInputTargetReached if the target amount of input data has been reached, // - derive.MaxRLPBytesPerChannel if the general maximum amount of input data // would have been exceeded by the latest AddBlock call, @@ -405,12 +405,11 @@ func (c *channelBuilder) outputFrame() error { } // Close immediately marks the channel as full with an ErrTerminated -// if the channel is not already full, then outputs any remaining frames. -func (c *channelBuilder) Close() error { +// if the channel is not already full. +func (c *channelBuilder) Close() { if !c.IsFull() { c.setFullErr(ErrTerminated) } - return c.closeAndOutputAllFrames() } // HasFrame returns whether there's any available frame. If true, it can be diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index 73a8ca3340495..dcfef0a231aed 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -81,12 +81,9 @@ func (s *channelManager) TxFailed(id txID) { } s.metr.RecordBatchTxFailed() - // If this channel has no submitted transactions, put the pending blocks back into the - // local saved blocks and reset this state so it can try to build a new channel. - if len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 { - s.log.Info("Channel has no submitted transactions", "chID", s.pendingChannel.ID()) - s.blocks = append(s.pendingChannel.Blocks(), s.blocks...) - s.clearPendingChannel() + if s.closed && len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 { + s.log.Info("Channel has no submitted transactions, clearing for shutdown", "chID", s.pendingChannel.ID()) + s.Clear() } } @@ -194,11 +191,6 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) { return s.nextTxData() } - // Avoid producing new frames if the channel has been explicitly closed. - if s.closed { - return txData{}, io.EOF - } - // No pending frame, so we have to add new blocks to the channel // If we have no saved blocks, we will not be able to create valid frames @@ -210,8 +202,11 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) { return txData{}, err } - if err := s.processBlocks(); err != nil { - return txData{}, err + // Avoid processing blocks if the channel manager has been explicitly closed. + if !s.closed { + if err := s.processBlocks(); err != nil { + return txData{}, err + } } // Register current L1 head only after all pending blocks have been @@ -365,9 +360,21 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo) // This ensures that no new frames will be produced, but there may be any number // of pending frames produced before this call which should still be published. func (s *channelManager) Close() error { + if s.closed { + return nil + } + s.closed = true + + // Any pending state can be proactively cleared if there are no submitted transactions + if len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 { + s.Clear() + } + if s.pendingChannel == nil { return nil } - return s.pendingChannel.Close() + + s.pendingChannel.Close() + return nil } diff --git a/op-batcher/batcher/channel_manager_test.go b/op-batcher/batcher/channel_manager_test.go index aebf3fc47b91b..27feac4d1a973 100644 --- a/op-batcher/batcher/channel_manager_test.go +++ b/op-batcher/batcher/channel_manager_test.go @@ -332,11 +332,9 @@ func TestChannelManager_TxResend(t *testing.T) { log := testlog.Logger(t, log.LvlError) m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{ - TargetNumFrames: 2, - TargetFrameSize: 1000, - MaxFrameSize: 2000, + TargetFrameSize: 0, + MaxFrameSize: 120_000, ApproxComprRatio: 1.0, - ChannelTimeout: 1000, }) a, _ := derivetest.RandomL2Block(rng, 4) @@ -345,30 +343,24 @@ func TestChannelManager_TxResend(t *testing.T) { txdata0, err := m.TxData(eth.BlockID{}) require.NoError(err) - - // confirm one frame to keep the channel open - m.TxConfirmed(txdata0.ID(), eth.BlockID{}) - - txdata1, err := m.TxData(eth.BlockID{}) - require.NoError(err) - txdata1bytes := txdata1.Bytes() - data1 := make([]byte, len(txdata1bytes)) + txdata0bytes := txdata0.Bytes() + data0 := make([]byte, len(txdata0bytes)) // make sure we have a clone for later comparison - copy(data1, txdata1bytes) + copy(data0, txdata0bytes) // ensure channel is drained _, err = m.TxData(eth.BlockID{}) require.ErrorIs(err, io.EOF) // requeue frame - m.TxFailed(txdata1.ID()) + m.TxFailed(txdata0.ID()) - txdata2, err := m.TxData(eth.BlockID{}) + txdata1, err := m.TxData(eth.BlockID{}) require.NoError(err) - data2 := txdata2.Bytes() - require.Equal(data2, data1) - fs, err := derive.ParseFrames(data2) + data1 := txdata1.Bytes() + require.Equal(data1, data0) + fs, err := derive.ParseFrames(data1) require.NoError(err) require.Len(fs, 1) } @@ -457,13 +449,13 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) { // can gracefully close with a pending channel, and will not produce any // new channel frames after this point. func TestChannelManagerClosePendingChannel(t *testing.T) { - rng := rand.New(rand.NewSource(time.Now().UnixNano())) + rng := rand.New(rand.NewSource(1)) log := testlog.Logger(t, log.LvlCrit) m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{ TargetNumFrames: 100, - TargetFrameSize: 25_000, - MaxFrameSize: 40_000, + TargetFrameSize: 20_000, + MaxFrameSize: 20_000, ApproxComprRatio: 1.0, ChannelTimeout: 1000, }) @@ -505,7 +497,7 @@ func TestChannelManagerClosePendingChannel(t *testing.T) { // can gracefully close after producing transaction frames if none of these // have successfully landed on chain. func TestChannelManagerCloseAllTxsFailed(t *testing.T) { - rng := rand.New(rand.NewSource(time.Now().UnixNano())) + rng := rand.New(rand.NewSource(1)) log := testlog.Logger(t, log.LvlCrit) m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{ diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index bffda9a1e31ad..b27b9a1ad618b 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -293,9 +293,18 @@ func (l *BatchSubmitter) loop() { for { select { case <-ticker.C: + // prioritize the `done` condition over the ticker, even though select ordering is randomized + select { + case <-l.done: + l.publishStateToL1(ctx) + return + default: + } + l.loadBlocksIntoState(l.ctx) l.publishStateToL1(ctx) case <-l.done: + l.publishStateToL1(ctx) return } } @@ -305,6 +314,14 @@ func (l *BatchSubmitter) loop() { // submits the associated data to the L1 in the form of channel frames. func (l *BatchSubmitter) publishStateToL1(ctx context.Context) { for { + // Attempt to gracefully terminate the current channel, ensuring that no new frames will be + // produced. Any remaining frames must still be published to the L1 to prevent stalling. + select { + case <-l.done: + l.state.Close() + default: + } + l1tip, err := l.l1Tip(ctx) if err != nil { l.log.Error("Failed to query L1 tip", "error", err) @@ -327,14 +344,6 @@ func (l *BatchSubmitter) publishStateToL1(ctx context.Context) { } else { l.recordConfirmedTx(txdata.ID(), receipt) } - - // Attempt to gracefully terminate the current channel, ensuring that no new frames will be - // produced. Any remaining frames must still be published to the L1 to prevent stalling. - select { - case <-l.done: - l.state.Close() - default: - } } } From ae230d214421857f0062ebff32dae1225aef9225 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Mon, 20 Mar 2023 18:25:19 -0700 Subject: [PATCH 06/15] Allow channel manager to be un-closed on clear --- op-batcher/batcher/channel_manager.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index dcfef0a231aed..6eec9e64a81e4 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -63,6 +63,7 @@ func (s *channelManager) Clear() { s.log.Trace("clearing channel manager state") s.blocks = s.blocks[:0] s.tip = common.Hash{} + s.closed = false s.clearPendingChannel() } @@ -83,7 +84,7 @@ func (s *channelManager) TxFailed(id txID) { s.metr.RecordBatchTxFailed() if s.closed && len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 { s.log.Info("Channel has no submitted transactions, clearing for shutdown", "chID", s.pendingChannel.ID()) - s.Clear() + s.clearPendingChannel() } } @@ -368,7 +369,7 @@ func (s *channelManager) Close() error { // Any pending state can be proactively cleared if there are no submitted transactions if len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 { - s.Clear() + s.clearPendingChannel() } if s.pendingChannel == nil { From d0922bff382e0bfbec56516e50956dfe4ee8ac06 Mon Sep 17 00:00:00 2001 From: Michael de Hoog Date: Fri, 17 Mar 2023 03:26:02 -0500 Subject: [PATCH 07/15] Allow passing in ctx to Stop Usa separate contexts for loading L2 blocks and tx submission --- op-batcher/batcher/driver.go | 37 +++++++++++++++++++++++------------- op-batcher/rpc/api.go | 6 +++--- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index b27b9a1ad618b..705c90d65f60d 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -29,8 +29,10 @@ type BatchSubmitter struct { wg sync.WaitGroup done chan struct{} - ctx context.Context - cancel context.CancelFunc + loadCtx context.Context + cancelLoad context.CancelFunc + txCtx context.Context + cancelTx context.CancelFunc mutex sync.Mutex running bool @@ -145,7 +147,8 @@ func (l *BatchSubmitter) Start() error { l.running = true l.done = make(chan struct{}) - l.ctx, l.cancel = context.WithCancel(context.Background()) + l.loadCtx, l.cancelLoad = context.WithCancel(context.Background()) + l.txCtx, l.cancelTx = context.WithCancel(context.Background()) l.state.Clear() l.lastStoredBlock = eth.BlockID{} @@ -158,10 +161,10 @@ func (l *BatchSubmitter) Start() error { } func (l *BatchSubmitter) StopIfRunning() { - _ = l.Stop() + _ = l.Stop(context.Background()) } -func (l *BatchSubmitter) Stop() error { +func (l *BatchSubmitter) Stop(ctx context.Context) error { l.log.Info("Stopping Batch Submitter") l.mutex.Lock() @@ -172,7 +175,16 @@ func (l *BatchSubmitter) Stop() error { } l.running = false - l.cancel() + // go routine will call cancelTx() if the passed in ctx is ever Done + cancelTx := l.cancelTx + wrapped, cancel := context.WithCancel(ctx) + defer cancel() + go func() { + <-wrapped.Done() + cancelTx() + }() + + l.cancelLoad() close(l.done) l.wg.Wait() @@ -285,9 +297,6 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth. func (l *BatchSubmitter) loop() { defer l.wg.Done() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - ticker := time.NewTicker(l.PollInterval) defer ticker.Stop() for { @@ -296,15 +305,15 @@ func (l *BatchSubmitter) loop() { // prioritize the `done` condition over the ticker, even though select ordering is randomized select { case <-l.done: - l.publishStateToL1(ctx) + l.publishStateToL1(l.txCtx) return default: } - l.loadBlocksIntoState(l.ctx) - l.publishStateToL1(ctx) + l.loadBlocksIntoState(l.loadCtx) + l.publishStateToL1(l.txCtx) case <-l.done: - l.publishStateToL1(ctx) + l.publishStateToL1(l.txCtx) return } } @@ -317,6 +326,8 @@ func (l *BatchSubmitter) publishStateToL1(ctx context.Context) { // Attempt to gracefully terminate the current channel, ensuring that no new frames will be // produced. Any remaining frames must still be published to the L1 to prevent stalling. select { + case <-ctx.Done(): + l.state.Close() case <-l.done: l.state.Close() default: diff --git a/op-batcher/rpc/api.go b/op-batcher/rpc/api.go index 29db8669bc4c9..a1c4d5b2e8da5 100644 --- a/op-batcher/rpc/api.go +++ b/op-batcher/rpc/api.go @@ -6,7 +6,7 @@ import ( type batcherClient interface { Start() error - Stop() error + Stop(ctx context.Context) error } type adminAPI struct { @@ -23,6 +23,6 @@ func (a *adminAPI) StartBatcher(_ context.Context) error { return a.b.Start() } -func (a *adminAPI) StopBatcher(_ context.Context) error { - return a.b.Stop() +func (a *adminAPI) StopBatcher(ctx context.Context) error { + return a.b.Stop(ctx) } From c16a3cd07d5059d4eb610a5bd9e7530a14df68d9 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Tue, 21 Mar 2023 14:02:17 -0700 Subject: [PATCH 08/15] Accept a context in System.Close --- op-batcher/batcher/batch_submitter.go | 2 +- op-batcher/batcher/driver.go | 4 +-- op-e2e/bridge_test.go | 3 ++- op-e2e/migration_test.go | 2 +- op-e2e/setup.go | 4 +-- op-e2e/system_test.go | 35 +++++++++++++++------------ op-e2e/system_tob_test.go | 8 +++--- 7 files changed, 31 insertions(+), 27 deletions(-) diff --git a/op-batcher/batcher/batch_submitter.go b/op-batcher/batcher/batch_submitter.go index 132660614b2de..3010ea8d3807a 100644 --- a/op-batcher/batcher/batch_submitter.go +++ b/op-batcher/batcher/batch_submitter.go @@ -51,7 +51,7 @@ func Main(version string, cliCtx *cli.Context) error { return err } } - defer batchSubmitter.StopIfRunning() + defer batchSubmitter.StopIfRunning(context.Background()) ctx, cancel := context.WithCancel(context.Background()) diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 705c90d65f60d..86d2ff8b0193c 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -160,8 +160,8 @@ func (l *BatchSubmitter) Start() error { return nil } -func (l *BatchSubmitter) StopIfRunning() { - _ = l.Stop(context.Background()) +func (l *BatchSubmitter) StopIfRunning(ctx context.Context) { + _ = l.Stop(ctx) } func (l *BatchSubmitter) Stop(ctx context.Context) error { diff --git a/op-e2e/bridge_test.go b/op-e2e/bridge_test.go index d3d65a591b47a..da728f63d8428 100644 --- a/op-e2e/bridge_test.go +++ b/op-e2e/bridge_test.go @@ -1,6 +1,7 @@ package op_e2e import ( + "context" "math" "math/big" "testing" @@ -29,7 +30,7 @@ func TestERC20BridgeDeposits(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close() + defer sys.Close(context.Background()) log := testlog.Logger(t, log.LvlInfo) log.Info("genesis", "l2", sys.RollupConfig.Genesis.L2, "l1", sys.RollupConfig.Genesis.L1, "l2_time", sys.RollupConfig.Genesis.L2Time) diff --git a/op-e2e/migration_test.go b/op-e2e/migration_test.go index 4e3808b8974ec..f91f4faf1f49f 100644 --- a/op-e2e/migration_test.go +++ b/op-e2e/migration_test.go @@ -352,7 +352,7 @@ func TestMigration(t *testing.T) { }, lgr.New("module", "batcher"), batchermetrics.NoopMetrics) require.NoError(t, err) t.Cleanup(func() { - batcher.StopIfRunning() + batcher.StopIfRunning(context.Background()) }) proposer, err := l2os.NewL2OutputSubmitterFromCLIConfig(l2os.CLIConfig{ diff --git a/op-e2e/setup.go b/op-e2e/setup.go index 7a23d59f4a5e8..8c074e60a6156 100644 --- a/op-e2e/setup.go +++ b/op-e2e/setup.go @@ -214,12 +214,12 @@ type System struct { Mocknet mocknet.Mocknet } -func (sys *System) Close() { +func (sys *System) Close(ctx context.Context) { if sys.L2OutputSubmitter != nil { sys.L2OutputSubmitter.Stop() } if sys.BatchSubmitter != nil { - sys.BatchSubmitter.StopIfRunning() + sys.BatchSubmitter.StopIfRunning(ctx) } for _, node := range sys.RollupNodes { diff --git a/op-e2e/system_test.go b/op-e2e/system_test.go index 5ef31be96b2a2..c332420410c04 100644 --- a/op-e2e/system_test.go +++ b/op-e2e/system_test.go @@ -73,7 +73,7 @@ func TestL2OutputSubmitter(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close() + defer sys.Close(context.Background()) l1Client := sys.Clients["l1"] @@ -146,7 +146,7 @@ func TestSystemE2E(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close() + defer sys.Close(context.Background()) log := testlog.Logger(t, log.LvlInfo) log.Info("genesis", "l2", sys.RollupConfig.Genesis.L2, "l1", sys.RollupConfig.Genesis.L1, "l2_time", sys.RollupConfig.Genesis.L2Time) @@ -261,7 +261,7 @@ func TestConfirmationDepth(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close() + defer sys.Close(context.Background()) log := testlog.Logger(t, log.LvlInfo) log.Info("genesis", "l2", sys.RollupConfig.Genesis.L2, "l1", sys.RollupConfig.Genesis.L1, "l2_time", sys.RollupConfig.Genesis.L2Time) @@ -320,7 +320,7 @@ func TestPendingGasLimit(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close() + defer sys.Close(context.Background()) log := testlog.Logger(t, log.LvlInfo) log.Info("genesis", "l2", sys.RollupConfig.Genesis.L2, "l1", sys.RollupConfig.Genesis.L1, "l2_time", sys.RollupConfig.Genesis.L2Time) @@ -365,7 +365,7 @@ func TestFinalize(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close() + defer sys.Close(context.Background()) l2Seq := sys.Clients["sequencer"] @@ -392,7 +392,7 @@ func TestMintOnRevertedDeposit(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close() + defer sys.Close(context.Background()) l1Client := sys.Clients["l1"] l2Verif := sys.Clients["verifier"] @@ -475,7 +475,10 @@ func TestMissingBatchE2E(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close() + defer func() { + ctx, _ := context.WithTimeout(context.Background(), time.Second) + sys.Close(ctx) + }() l2Seq := sys.Clients["sequencer"] l2Verif := sys.Clients["verifier"] @@ -609,7 +612,7 @@ func TestSystemMockP2P(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close() + defer sys.Close(context.Background()) l2Seq := sys.Clients["sequencer"] l2Verif := sys.Clients["verifier"] @@ -697,7 +700,7 @@ func TestSystemRPCAltSync(t *testing.T) { }, }) require.Nil(t, err, "Error starting up system") - defer sys.Close() + defer sys.Close(context.Background()) l2Seq := sys.Clients["sequencer"] l2Verif := sys.Clients["verifier"] @@ -809,7 +812,7 @@ func TestSystemDenseTopology(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close() + defer sys.Close(context.Background()) l2Seq := sys.Clients["sequencer"] l2Verif := sys.Clients["verifier"] @@ -874,7 +877,7 @@ func TestL1InfoContract(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close() + defer sys.Close(context.Background()) l1Client := sys.Clients["l1"] l2Seq := sys.Clients["sequencer"] @@ -1003,7 +1006,7 @@ func TestWithdrawals(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close() + defer sys.Close(context.Background()) l1Client := sys.Clients["l1"] l2Seq := sys.Clients["sequencer"] @@ -1210,7 +1213,7 @@ func TestFees(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close() + defer sys.Close(context.Background()) l2Seq := sys.Clients["sequencer"] l2Verif := sys.Clients["verifier"] @@ -1356,7 +1359,7 @@ func TestStopStartSequencer(t *testing.T) { cfg := DefaultSystemConfig(t) sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close() + defer sys.Close(context.Background()) l2Seq := sys.Clients["sequencer"] rollupNode := sys.RollupNodes["sequencer"] @@ -1400,7 +1403,7 @@ func TestStopStartBatcher(t *testing.T) { cfg := DefaultSystemConfig(t) sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close() + defer sys.Close(context.Background()) rollupRPCClient, err := rpc.DialContext(context.Background(), sys.RollupNodes["verifier"].HTTPEndpoint()) require.Nil(t, err) @@ -1449,7 +1452,7 @@ func TestStopStartBatcher(t *testing.T) { require.Greater(t, newSeqStatus.SafeL2.Number, seqStatus.SafeL2.Number, "Safe chain did not advance") // stop the batch submission - err = sys.BatchSubmitter.Stop() + err = sys.BatchSubmitter.Stop(context.Background()) require.Nil(t, err) // wait for any old safe blocks being submitted / derived diff --git a/op-e2e/system_tob_test.go b/op-e2e/system_tob_test.go index df09202df89d0..b8191c95a3d48 100644 --- a/op-e2e/system_tob_test.go +++ b/op-e2e/system_tob_test.go @@ -49,7 +49,7 @@ func TestGasPriceOracleFeeUpdates(t *testing.T) { cfg := DefaultSystemConfig(t) sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close() + defer sys.Close(context.Background()) // Obtain our sequencer, verifier, and transactor keypair. l1Client := sys.Clients["l1"] @@ -136,7 +136,7 @@ func TestL2SequencerRPCDepositTx(t *testing.T) { cfg := DefaultSystemConfig(t) sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close() + defer sys.Close(context.Background()) // Obtain our sequencer, verifier, and transactor keypair. l2Seq := sys.Clients["sequencer"] @@ -250,7 +250,7 @@ func TestMixedDepositValidity(t *testing.T) { cfg := DefaultSystemConfig(t) sys, testAccounts, err := startConfigWithTestAccounts(&cfg, accountUsedToDeposit) require.Nil(t, err, "Error starting up system") - defer sys.Close() + defer sys.Close(context.Background()) // Obtain our sequencer, verifier, and transactor keypair. l1Client := sys.Clients["l1"] @@ -430,7 +430,7 @@ func TestMixedWithdrawalValidity(t *testing.T) { cfg.DeployConfig.FinalizationPeriodSeconds = 6 sys, err := cfg.Start() require.NoError(t, err, "error starting up system") - defer sys.Close() + defer sys.Close(context.Background()) // Obtain our sequencer, verifier, and transactor keypair. l1Client := sys.Clients["l1"] From 57d3c50e09ed11466404f74f24e3928abd330d2d Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Tue, 21 Mar 2023 14:26:43 -0700 Subject: [PATCH 09/15] Clean up tests --- op-batcher/batcher/channel_manager.go | 8 +- op-batcher/batcher/channel_manager_test.go | 98 ++++++++-------------- 2 files changed, 40 insertions(+), 66 deletions(-) diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index 6eec9e64a81e4..99337ecc26cc6 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -360,9 +360,9 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo) // creation of any new channels. // This ensures that no new frames will be produced, but there may be any number // of pending frames produced before this call which should still be published. -func (s *channelManager) Close() error { +func (s *channelManager) Close() { if s.closed { - return nil + return } s.closed = true @@ -373,9 +373,9 @@ func (s *channelManager) Close() error { } if s.pendingChannel == nil { - return nil + return } s.pendingChannel.Close() - return nil + return } diff --git a/op-batcher/batcher/channel_manager_test.go b/op-batcher/batcher/channel_manager_test.go index 27feac4d1a973..9e432b1e361c7 100644 --- a/op-batcher/batcher/channel_manager_test.go +++ b/op-batcher/batcher/channel_manager_test.go @@ -15,7 +15,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/trie" "github.com/stretchr/testify/require" ) @@ -368,6 +367,7 @@ func TestChannelManager_TxResend(t *testing.T) { // TestChannelManagerCloseBeforeFirstUse ensures that the channel manager // will not produce any frames if closed immediately. func TestChannelManagerCloseBeforeFirstUse(t *testing.T) { + require := require.New(t) rng := rand.New(rand.NewSource(time.Now().UnixNano())) log := testlog.Logger(t, log.LvlCrit) m := NewChannelManager(log, metrics.NoopMetrics, @@ -380,20 +380,20 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) { a, _ := derivetest.RandomL2Block(rng, 4) - err := m.Close() - require.NoError(t, err) + m.Close() - err = m.AddL2Block(a) - require.NoError(t, err) + err := m.AddL2Block(a) + require.NoError(err, "Failed to add L2 block") _, err = m.TxData(eth.BlockID{}) - require.ErrorIs(t, err, io.EOF) + require.ErrorIs(err, io.EOF, "Expected closed channel manager to contain no tx data") } // TestChannelManagerCloseNoPendingChannel ensures that the channel manager // can gracefully close with no pending channels, and will not emit any new // channel frames. func TestChannelManagerCloseNoPendingChannel(t *testing.T) { + require := require.New(t) log := testlog.Logger(t, log.LvlCrit) m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{ @@ -402,132 +402,106 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) { ApproxComprRatio: 1.0, ChannelTimeout: 1000, }) - lBlock := types.NewBlock(&types.Header{ - BaseFee: big.NewInt(10), - Difficulty: common.Big0, - Number: big.NewInt(100), - }, nil, nil, nil, trie.NewStackTrie(nil)) - l1InfoTx, err := derive.L1InfoDeposit(0, lBlock, eth.SystemConfig{}, false) - require.NoError(t, err) - txs := []*types.Transaction{types.NewTx(l1InfoTx)} - - a := types.NewBlock(&types.Header{ - Number: big.NewInt(0), - }, txs, nil, nil, trie.NewStackTrie(nil)) - - l1InfoTx, err = derive.L1InfoDeposit(1, lBlock, eth.SystemConfig{}, false) - require.NoError(t, err) - txs = []*types.Transaction{types.NewTx(l1InfoTx)} - - b := types.NewBlock(&types.Header{ - Number: big.NewInt(1), - ParentHash: a.Hash(), - }, txs, nil, nil, trie.NewStackTrie(nil)) + a := newMiniL2Block(0) + b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash()) - err = m.AddL2Block(a) - require.NoError(t, err) + err := m.AddL2Block(a) + require.NoError(err, "Failed to add L2 block") txdata, err := m.TxData(eth.BlockID{}) - require.NoError(t, err) + require.NoError(err, "Expected channel manager to return valid tx data") m.TxConfirmed(txdata.ID(), eth.BlockID{}) _, err = m.TxData(eth.BlockID{}) - require.ErrorIs(t, err, io.EOF) + require.ErrorIs(err, io.EOF, "Expected channel manager to EOF") - err = m.Close() - require.NoError(t, err) + m.Close() err = m.AddL2Block(b) - require.NoError(t, err) + require.NoError(err, "Failed to add L2 block") _, err = m.TxData(eth.BlockID{}) - require.ErrorIs(t, err, io.EOF) + require.ErrorIs(err, io.EOF, "Expected closed channel manager to return no new tx data") } // TestChannelManagerCloseNoPendingChannel ensures that the channel manager // can gracefully close with a pending channel, and will not produce any // new channel frames after this point. func TestChannelManagerClosePendingChannel(t *testing.T) { - rng := rand.New(rand.NewSource(1)) + require := require.New(t) log := testlog.Logger(t, log.LvlCrit) m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{ TargetNumFrames: 100, - TargetFrameSize: 20_000, - MaxFrameSize: 20_000, + TargetFrameSize: 1000, + MaxFrameSize: 1000, ApproxComprRatio: 1.0, ChannelTimeout: 1000, }) - a, _ := derivetest.RandomL2Block(rng, 128) - - b, _ := derivetest.RandomL2Block(rng, 8) - header := b.Header() - header.ParentHash = a.Hash() - b = b.WithSeal(header) + a := newMiniL2Block(50_000) + b := newMiniL2BlockWithNumberParent(10, big.NewInt(1), a.Hash()) err := m.AddL2Block(a) - require.NoError(t, err) + require.NoError(err, "Failed to add L2 block") txdata, err := m.TxData(eth.BlockID{}) - require.NoError(t, err) + require.NoError(err, "Expected channel manager to produce valid tx data") m.TxConfirmed(txdata.ID(), eth.BlockID{}) - err = m.Close() - require.NoError(t, err) + m.Close() txdata, err = m.TxData(eth.BlockID{}) - require.NoError(t, err) + require.NoError(err, "Expected channel manager to produce tx data from remaining L2 block data") m.TxConfirmed(txdata.ID(), eth.BlockID{}) _, err = m.TxData(eth.BlockID{}) - require.ErrorIs(t, err, io.EOF) + require.ErrorIs(err, io.EOF, "Expected channel manager to have no more tx data") err = m.AddL2Block(b) - require.NoError(t, err) + require.NoError(err, "Failed to add L2 block") _, err = m.TxData(eth.BlockID{}) - require.ErrorIs(t, err, io.EOF) + require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data") } // TestChannelManagerCloseAllTxsFailed ensures that the channel manager // can gracefully close after producing transaction frames if none of these // have successfully landed on chain. func TestChannelManagerCloseAllTxsFailed(t *testing.T) { - rng := rand.New(rand.NewSource(1)) + require := require.New(t) log := testlog.Logger(t, log.LvlCrit) m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{ TargetNumFrames: 100, - TargetFrameSize: 25_000, - MaxFrameSize: 40_000, + TargetFrameSize: 1000, + MaxFrameSize: 1000, ApproxComprRatio: 1.0, ChannelTimeout: 1000, }) - a, _ := derivetest.RandomL2Block(rng, 128) + a := newMiniL2Block(50_000) err := m.AddL2Block(a) - require.NoError(t, err) + require.NoError(err, "Failed to add L2 block") txdata, err := m.TxData(eth.BlockID{}) - require.NoError(t, err) + require.NoError(err, "Expected channel manager to produce valid tx data") m.TxFailed(txdata.ID()) // Show that this data will continue to be emitted as long as the transaction // fails and the channel manager is not closed txdata, err = m.TxData(eth.BlockID{}) - require.NoError(t, err) + require.NoError(err, "Expected channel manager to re-attempt the failed transaction") m.TxFailed(txdata.ID()) - err = m.Close() - require.NoError(t, err) + m.Close() _, err = m.TxData(eth.BlockID{}) - require.ErrorIs(t, err, io.EOF) + require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data") } From 5b31ea5ab6d772e5ba592a0dc8590e3ed0ae3879 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Tue, 21 Mar 2023 14:29:03 -0700 Subject: [PATCH 10/15] Lint --- op-batcher/batcher/channel_manager.go | 1 - 1 file changed, 1 deletion(-) diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index 99337ecc26cc6..c74e14f6c630e 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -377,5 +377,4 @@ func (s *channelManager) Close() { } s.pendingChannel.Close() - return } From decf982530d520e43fb9e5d23785e0ae09390c60 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Tue, 21 Mar 2023 15:26:17 -0700 Subject: [PATCH 11/15] Lint --- op-e2e/system_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/op-e2e/system_test.go b/op-e2e/system_test.go index c332420410c04..f683babfe48d6 100644 --- a/op-e2e/system_test.go +++ b/op-e2e/system_test.go @@ -476,7 +476,8 @@ func TestMissingBatchE2E(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") defer func() { - ctx, _ := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() sys.Close(ctx) }() From acc22d766a32bdfb502c9c2f324fff0f8f614d54 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Fri, 24 Mar 2023 14:38:54 -0700 Subject: [PATCH 12/15] Address PR feedback, improve context naming and simplify selects --- op-batcher/batcher/channel_manager.go | 5 +-- op-batcher/batcher/driver.go | 45 ++++++++------------------- 2 files changed, 16 insertions(+), 34 deletions(-) diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index c74e14f6c630e..b77fe040bc266 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -358,8 +358,9 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo) // Close closes the current pending channel, if one exists, and prevents the // creation of any new channels. -// This ensures that no new frames will be produced, but there may be any number -// of pending frames produced before this call which should still be published. +// This ensures that no new blocks will be added to the channel, but there may be any number +// of frames still produced by calling `OutputFrames()`, which flushes the compression buffer. +// These frames still need to be published. func (s *channelManager) Close() { if s.closed { return diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 86d2ff8b0193c..1faed55890d4d 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -27,12 +27,11 @@ type BatchSubmitter struct { txMgr txmgr.TxManager wg sync.WaitGroup - done chan struct{} - loadCtx context.Context - cancelLoad context.CancelFunc - txCtx context.Context - cancelTx context.CancelFunc + shutdownCtx context.Context + cancelShutdownCtx context.CancelFunc + killCtx context.Context + cancelKillCtx context.CancelFunc mutex sync.Mutex running bool @@ -146,9 +145,8 @@ func (l *BatchSubmitter) Start() error { } l.running = true - l.done = make(chan struct{}) - l.loadCtx, l.cancelLoad = context.WithCancel(context.Background()) - l.txCtx, l.cancelTx = context.WithCancel(context.Background()) + l.shutdownCtx, l.cancelShutdownCtx = context.WithCancel(context.Background()) + l.killCtx, l.cancelKillCtx = context.WithCancel(context.Background()) l.state.Clear() l.lastStoredBlock = eth.BlockID{} @@ -175,18 +173,9 @@ func (l *BatchSubmitter) Stop(ctx context.Context) error { } l.running = false - // go routine will call cancelTx() if the passed in ctx is ever Done - cancelTx := l.cancelTx - wrapped, cancel := context.WithCancel(ctx) - defer cancel() - go func() { - <-wrapped.Done() - cancelTx() - }() - - l.cancelLoad() - close(l.done) + l.cancelShutdownCtx() l.wg.Wait() + l.cancelKillCtx() l.log.Info("Batch Submitter stopped") @@ -302,18 +291,10 @@ func (l *BatchSubmitter) loop() { for { select { case <-ticker.C: - // prioritize the `done` condition over the ticker, even though select ordering is randomized - select { - case <-l.done: - l.publishStateToL1(l.txCtx) - return - default: - } - - l.loadBlocksIntoState(l.loadCtx) - l.publishStateToL1(l.txCtx) - case <-l.done: - l.publishStateToL1(l.txCtx) + l.loadBlocksIntoState(l.shutdownCtx) + l.publishStateToL1(l.killCtx) + case <-l.shutdownCtx.Done(): + l.publishStateToL1(l.killCtx) return } } @@ -328,7 +309,7 @@ func (l *BatchSubmitter) publishStateToL1(ctx context.Context) { select { case <-ctx.Done(): l.state.Close() - case <-l.done: + case <-l.shutdownCtx.Done(): l.state.Close() default: } From 99c75b47755b7d3b98baeb426d2a41459fa2a7ca Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Mon, 27 Mar 2023 11:11:51 -0700 Subject: [PATCH 13/15] Force-kill the batcher if the Stop context is ever done --- op-batcher/batcher/driver.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 1faed55890d4d..971edf49e80dc 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -173,6 +173,15 @@ func (l *BatchSubmitter) Stop(ctx context.Context) error { } l.running = false + // go routine will call cancelKill() if the passed in ctx is ever Done + cancelKill := l.cancelKillCtx + wrapped, cancel := context.WithCancel(ctx) + defer cancel() + go func() { + <-wrapped.Done() + cancelKill() + }() + l.cancelShutdownCtx() l.wg.Wait() l.cancelKillCtx() From 8b87c2d2a99e9cd3b91942d26d608d95760a2a79 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Mon, 27 Mar 2023 11:15:13 -0700 Subject: [PATCH 14/15] Simplify e2e test Close --- op-e2e/bridge_test.go | 3 +-- op-e2e/setup.go | 4 +++- op-e2e/system_test.go | 34 +++++++++++++++------------------- op-e2e/system_tob_test.go | 8 ++++---- 4 files changed, 23 insertions(+), 26 deletions(-) diff --git a/op-e2e/bridge_test.go b/op-e2e/bridge_test.go index da728f63d8428..d3d65a591b47a 100644 --- a/op-e2e/bridge_test.go +++ b/op-e2e/bridge_test.go @@ -1,7 +1,6 @@ package op_e2e import ( - "context" "math" "math/big" "testing" @@ -30,7 +29,7 @@ func TestERC20BridgeDeposits(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close(context.Background()) + defer sys.Close() log := testlog.Logger(t, log.LvlInfo) log.Info("genesis", "l2", sys.RollupConfig.Genesis.L2, "l1", sys.RollupConfig.Genesis.L1, "l2_time", sys.RollupConfig.Genesis.L2Time) diff --git a/op-e2e/setup.go b/op-e2e/setup.go index 8c074e60a6156..0b958da5eeb74 100644 --- a/op-e2e/setup.go +++ b/op-e2e/setup.go @@ -214,11 +214,13 @@ type System struct { Mocknet mocknet.Mocknet } -func (sys *System) Close(ctx context.Context) { +func (sys *System) Close() { if sys.L2OutputSubmitter != nil { sys.L2OutputSubmitter.Stop() } if sys.BatchSubmitter != nil { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() sys.BatchSubmitter.StopIfRunning(ctx) } diff --git a/op-e2e/system_test.go b/op-e2e/system_test.go index f683babfe48d6..6aa183a6fc9a0 100644 --- a/op-e2e/system_test.go +++ b/op-e2e/system_test.go @@ -73,7 +73,7 @@ func TestL2OutputSubmitter(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close(context.Background()) + defer sys.Close() l1Client := sys.Clients["l1"] @@ -146,7 +146,7 @@ func TestSystemE2E(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close(context.Background()) + defer sys.Close() log := testlog.Logger(t, log.LvlInfo) log.Info("genesis", "l2", sys.RollupConfig.Genesis.L2, "l1", sys.RollupConfig.Genesis.L1, "l2_time", sys.RollupConfig.Genesis.L2Time) @@ -261,7 +261,7 @@ func TestConfirmationDepth(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close(context.Background()) + defer sys.Close() log := testlog.Logger(t, log.LvlInfo) log.Info("genesis", "l2", sys.RollupConfig.Genesis.L2, "l1", sys.RollupConfig.Genesis.L1, "l2_time", sys.RollupConfig.Genesis.L2Time) @@ -320,7 +320,7 @@ func TestPendingGasLimit(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close(context.Background()) + defer sys.Close() log := testlog.Logger(t, log.LvlInfo) log.Info("genesis", "l2", sys.RollupConfig.Genesis.L2, "l1", sys.RollupConfig.Genesis.L1, "l2_time", sys.RollupConfig.Genesis.L2Time) @@ -365,7 +365,7 @@ func TestFinalize(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close(context.Background()) + defer sys.Close() l2Seq := sys.Clients["sequencer"] @@ -392,7 +392,7 @@ func TestMintOnRevertedDeposit(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close(context.Background()) + defer sys.Close() l1Client := sys.Clients["l1"] l2Verif := sys.Clients["verifier"] @@ -475,11 +475,7 @@ func TestMissingBatchE2E(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer func() { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - sys.Close(ctx) - }() + defer sys.Close() l2Seq := sys.Clients["sequencer"] l2Verif := sys.Clients["verifier"] @@ -613,7 +609,7 @@ func TestSystemMockP2P(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close(context.Background()) + defer sys.Close() l2Seq := sys.Clients["sequencer"] l2Verif := sys.Clients["verifier"] @@ -701,7 +697,7 @@ func TestSystemRPCAltSync(t *testing.T) { }, }) require.Nil(t, err, "Error starting up system") - defer sys.Close(context.Background()) + defer sys.Close() l2Seq := sys.Clients["sequencer"] l2Verif := sys.Clients["verifier"] @@ -813,7 +809,7 @@ func TestSystemDenseTopology(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close(context.Background()) + defer sys.Close() l2Seq := sys.Clients["sequencer"] l2Verif := sys.Clients["verifier"] @@ -878,7 +874,7 @@ func TestL1InfoContract(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close(context.Background()) + defer sys.Close() l1Client := sys.Clients["l1"] l2Seq := sys.Clients["sequencer"] @@ -1007,7 +1003,7 @@ func TestWithdrawals(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close(context.Background()) + defer sys.Close() l1Client := sys.Clients["l1"] l2Seq := sys.Clients["sequencer"] @@ -1214,7 +1210,7 @@ func TestFees(t *testing.T) { sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close(context.Background()) + defer sys.Close() l2Seq := sys.Clients["sequencer"] l2Verif := sys.Clients["verifier"] @@ -1360,7 +1356,7 @@ func TestStopStartSequencer(t *testing.T) { cfg := DefaultSystemConfig(t) sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close(context.Background()) + defer sys.Close() l2Seq := sys.Clients["sequencer"] rollupNode := sys.RollupNodes["sequencer"] @@ -1404,7 +1400,7 @@ func TestStopStartBatcher(t *testing.T) { cfg := DefaultSystemConfig(t) sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close(context.Background()) + defer sys.Close() rollupRPCClient, err := rpc.DialContext(context.Background(), sys.RollupNodes["verifier"].HTTPEndpoint()) require.Nil(t, err) diff --git a/op-e2e/system_tob_test.go b/op-e2e/system_tob_test.go index b8191c95a3d48..df09202df89d0 100644 --- a/op-e2e/system_tob_test.go +++ b/op-e2e/system_tob_test.go @@ -49,7 +49,7 @@ func TestGasPriceOracleFeeUpdates(t *testing.T) { cfg := DefaultSystemConfig(t) sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close(context.Background()) + defer sys.Close() // Obtain our sequencer, verifier, and transactor keypair. l1Client := sys.Clients["l1"] @@ -136,7 +136,7 @@ func TestL2SequencerRPCDepositTx(t *testing.T) { cfg := DefaultSystemConfig(t) sys, err := cfg.Start() require.Nil(t, err, "Error starting up system") - defer sys.Close(context.Background()) + defer sys.Close() // Obtain our sequencer, verifier, and transactor keypair. l2Seq := sys.Clients["sequencer"] @@ -250,7 +250,7 @@ func TestMixedDepositValidity(t *testing.T) { cfg := DefaultSystemConfig(t) sys, testAccounts, err := startConfigWithTestAccounts(&cfg, accountUsedToDeposit) require.Nil(t, err, "Error starting up system") - defer sys.Close(context.Background()) + defer sys.Close() // Obtain our sequencer, verifier, and transactor keypair. l1Client := sys.Clients["l1"] @@ -430,7 +430,7 @@ func TestMixedWithdrawalValidity(t *testing.T) { cfg.DeployConfig.FinalizationPeriodSeconds = 6 sys, err := cfg.Start() require.NoError(t, err, "error starting up system") - defer sys.Close(context.Background()) + defer sys.Close() // Obtain our sequencer, verifier, and transactor keypair. l1Client := sys.Clients["l1"] From acdd051de362b7ed9fe51357b6f3c809a6682ce0 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Mon, 27 Mar 2023 12:10:30 -0700 Subject: [PATCH 15/15] Immediately flush channel builder on channel manager close --- op-batcher/batcher/channel_manager.go | 27 ++++++++++++--------------- op-batcher/batcher/driver.go | 10 ++++++++-- op-e2e/migration_test.go | 4 +++- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index b77fe040bc266..30467d75ad55b 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -187,8 +187,8 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) { dataPending := s.pendingChannel != nil && s.pendingChannel.HasFrame() s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks)) - // Short circuit if there is a pending frame. - if dataPending { + // Short circuit if there is a pending frame or the channel manager is closed. + if dataPending || s.closed { return s.nextTxData() } @@ -203,11 +203,8 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) { return txData{}, err } - // Avoid processing blocks if the channel manager has been explicitly closed. - if !s.closed { - if err := s.processBlocks(); err != nil { - return txData{}, err - } + if err := s.processBlocks(); err != nil { + return txData{}, err } // Register current L1 head only after all pending blocks have been @@ -356,14 +353,12 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo) } } -// Close closes the current pending channel, if one exists, and prevents the -// creation of any new channels. -// This ensures that no new blocks will be added to the channel, but there may be any number -// of frames still produced by calling `OutputFrames()`, which flushes the compression buffer. -// These frames still need to be published. -func (s *channelManager) Close() { +// Close closes the current pending channel, if one exists, outputs any remaining frames, +// and prevents the creation of any new channels. +// Any outputted frames still need to be published. +func (s *channelManager) Close() error { if s.closed { - return + return nil } s.closed = true @@ -374,8 +369,10 @@ func (s *channelManager) Close() { } if s.pendingChannel == nil { - return + return nil } s.pendingChannel.Close() + + return s.outputFrames() } diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 971edf49e80dc..f67d7d956093a 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -317,9 +317,15 @@ func (l *BatchSubmitter) publishStateToL1(ctx context.Context) { // produced. Any remaining frames must still be published to the L1 to prevent stalling. select { case <-ctx.Done(): - l.state.Close() + err := l.state.Close() + if err != nil { + l.log.Error("error closing the channel manager", "err", err) + } case <-l.shutdownCtx.Done(): - l.state.Close() + err := l.state.Close() + if err != nil { + l.log.Error("error closing the channel manager", "err", err) + } default: } diff --git a/op-e2e/migration_test.go b/op-e2e/migration_test.go index f91f4faf1f49f..c8db45302dd4c 100644 --- a/op-e2e/migration_test.go +++ b/op-e2e/migration_test.go @@ -352,7 +352,9 @@ func TestMigration(t *testing.T) { }, lgr.New("module", "batcher"), batchermetrics.NoopMetrics) require.NoError(t, err) t.Cleanup(func() { - batcher.StopIfRunning(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + batcher.StopIfRunning(ctx) }) proposer, err := l2os.NewL2OutputSubmitterFromCLIConfig(l2os.CLIConfig{