diff --git a/op-batcher/batcher/batch_submitter.go b/op-batcher/batcher/batch_submitter.go index 132660614b2de..3010ea8d3807a 100644 --- a/op-batcher/batcher/batch_submitter.go +++ b/op-batcher/batcher/batch_submitter.go @@ -51,7 +51,7 @@ func Main(version string, cliCtx *cli.Context) error { return err } } - defer batchSubmitter.StopIfRunning() + defer batchSubmitter.StopIfRunning(context.Background()) ctx, cancel := context.WithCancel(context.Background()) diff --git a/op-batcher/batcher/channel_builder.go b/op-batcher/batcher/channel_builder.go index 9fdf0dfcba39d..092391711268b 100644 --- a/op-batcher/batcher/channel_builder.go +++ b/op-batcher/batcher/channel_builder.go @@ -18,6 +18,7 @@ var ( ErrMaxDurationReached = errors.New("max channel duration reached") ErrChannelTimeoutClose = errors.New("close to channel timeout") ErrSeqWindowClose = errors.New("close to sequencer window timeout") + ErrTerminated = errors.New("channel terminated") ) type ChannelFullError struct { @@ -188,7 +189,7 @@ func (c *channelBuilder) Reset() error { } // AddBlock adds a block to the channel compression pipeline. IsFull should be -// called aftewards to test whether the channel is full. If full, a new channel +// called afterwards to test whether the channel is full. If full, a new channel // must be started. // // AddBlock returns a ChannelFullError if called even though the channel is @@ -307,16 +308,17 @@ func (c *channelBuilder) IsFull() bool { // FullErr returns the reason why the channel is full. If not full yet, it // returns nil. 
// -// It returns a ChannelFullError wrapping one of six possible reasons for the -// channel being full: +// It returns a ChannelFullError wrapping one of the following possible reasons +// for the channel being full: // - ErrInputTargetReached if the target amount of input data has been reached, // - derive.MaxRLPBytesPerChannel if the general maximum amount of input data // would have been exceeded by the latest AddBlock call, // - ErrMaxFrameIndex if the maximum number of frames has been generated // (uint16), -// - ErrMaxDurationReached if the max channel duration got reached. -// - ErrChannelTimeoutClose if the consensus channel timeout got too close. -// - ErrSeqWindowClose if the end of the sequencer window got too close. +// - ErrMaxDurationReached if the max channel duration got reached, +// - ErrChannelTimeoutClose if the consensus channel timeout got too close, +// - ErrSeqWindowClose if the end of the sequencer window got too close, +// - ErrTerminated if the channel was explicitly terminated. func (c *channelBuilder) FullErr() error { return c.fullErr } @@ -402,6 +404,14 @@ func (c *channelBuilder) outputFrame() error { return err // possibly io.EOF (last frame) } +// Close immediately marks the channel as full with an ErrTerminated +// if the channel is not already full. +func (c *channelBuilder) Close() { + if !c.IsFull() { + c.setFullErr(ErrTerminated) + } +} + // HasFrame returns whether there's any available frame. If true, it can be // popped using NextFrame(). // diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index 270d603953464..30467d75ad55b 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -41,6 +41,9 @@ type channelManager struct { pendingTransactions map[txID]txData // Set of confirmed txID -> inclusion block. 
For determining if the channel is timed out confirmedTransactions map[txID]eth.BlockID + + // if set to true, prevents production of any new channel frames + closed bool } func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig) *channelManager { @@ -60,6 +63,7 @@ func (s *channelManager) Clear() { s.log.Trace("clearing channel manager state") s.blocks = s.blocks[:0] s.tip = common.Hash{} + s.closed = false s.clearPendingChannel() } @@ -78,6 +82,10 @@ func (s *channelManager) TxFailed(id txID) { } s.metr.RecordBatchTxFailed() + if s.closed && len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 { + s.log.Info("Channel has no submitted transactions, clearing for shutdown", "chID", s.pendingChannel.ID()) + s.clearPendingChannel() + } } // TxConfirmed marks a transaction as confirmed on L1. Unfortunately even if all frames in @@ -179,8 +187,8 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) { dataPending := s.pendingChannel != nil && s.pendingChannel.HasFrame() s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks)) - // Short circuit if there is a pending frame. - if dataPending { + // Short circuit if there is a pending frame or the channel manager is closed. + if dataPending || s.closed { return s.nextTxData() } @@ -344,3 +352,27 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo) SequenceNumber: l1info.SequenceNumber, } } + +// Close closes the current pending channel, if one exists, outputs any remaining frames, +// and prevents the creation of any new channels. +// Any outputted frames still need to be published. 
+func (s *channelManager) Close() error { + if s.closed { + return nil + } + + s.closed = true + + // Any pending state can be proactively cleared if there are no submitted transactions + if len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 { + s.clearPendingChannel() + } + + if s.pendingChannel == nil { + return nil + } + + s.pendingChannel.Close() + + return s.outputFrames() +} diff --git a/op-batcher/batcher/channel_manager_test.go b/op-batcher/batcher/channel_manager_test.go index ec04fb1f9dd00..9e432b1e361c7 100644 --- a/op-batcher/batcher/channel_manager_test.go +++ b/op-batcher/batcher/channel_manager_test.go @@ -363,3 +363,145 @@ func TestChannelManager_TxResend(t *testing.T) { require.NoError(err) require.Len(fs, 1) } + +// TestChannelManagerCloseBeforeFirstUse ensures that the channel manager +// will not produce any frames if closed immediately. +func TestChannelManagerCloseBeforeFirstUse(t *testing.T) { + require := require.New(t) + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + log := testlog.Logger(t, log.LvlCrit) + m := NewChannelManager(log, metrics.NoopMetrics, + ChannelConfig{ + TargetFrameSize: 0, + MaxFrameSize: 100, + ApproxComprRatio: 1.0, + ChannelTimeout: 1000, + }) + + a, _ := derivetest.RandomL2Block(rng, 4) + + m.Close() + + err := m.AddL2Block(a) + require.NoError(err, "Failed to add L2 block") + + _, err = m.TxData(eth.BlockID{}) + require.ErrorIs(err, io.EOF, "Expected closed channel manager to contain no tx data") +} + +// TestChannelManagerCloseNoPendingChannel ensures that the channel manager +// can gracefully close with no pending channels, and will not emit any new +// channel frames. 
+func TestChannelManagerCloseNoPendingChannel(t *testing.T) { + require := require.New(t) + log := testlog.Logger(t, log.LvlCrit) + m := NewChannelManager(log, metrics.NoopMetrics, + ChannelConfig{ + TargetFrameSize: 0, + MaxFrameSize: 100, + ApproxComprRatio: 1.0, + ChannelTimeout: 1000, + }) + a := newMiniL2Block(0) + b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash()) + + err := m.AddL2Block(a) + require.NoError(err, "Failed to add L2 block") + + txdata, err := m.TxData(eth.BlockID{}) + require.NoError(err, "Expected channel manager to return valid tx data") + + m.TxConfirmed(txdata.ID(), eth.BlockID{}) + + _, err = m.TxData(eth.BlockID{}) + require.ErrorIs(err, io.EOF, "Expected channel manager to EOF") + + m.Close() + + err = m.AddL2Block(b) + require.NoError(err, "Failed to add L2 block") + + _, err = m.TxData(eth.BlockID{}) + require.ErrorIs(err, io.EOF, "Expected closed channel manager to return no new tx data") +} + +// TestChannelManagerClosePendingChannel ensures that the channel manager +// can gracefully close with a pending channel, and will not produce any +// new channel frames after this point. 
+func TestChannelManagerClosePendingChannel(t *testing.T) { + require := require.New(t) + log := testlog.Logger(t, log.LvlCrit) + m := NewChannelManager(log, metrics.NoopMetrics, + ChannelConfig{ + TargetNumFrames: 100, + TargetFrameSize: 1000, + MaxFrameSize: 1000, + ApproxComprRatio: 1.0, + ChannelTimeout: 1000, + }) + + a := newMiniL2Block(50_000) + b := newMiniL2BlockWithNumberParent(10, big.NewInt(1), a.Hash()) + + err := m.AddL2Block(a) + require.NoError(err, "Failed to add L2 block") + + txdata, err := m.TxData(eth.BlockID{}) + require.NoError(err, "Expected channel manager to produce valid tx data") + + m.TxConfirmed(txdata.ID(), eth.BlockID{}) + + m.Close() + + txdata, err = m.TxData(eth.BlockID{}) + require.NoError(err, "Expected channel manager to produce tx data from remaining L2 block data") + + m.TxConfirmed(txdata.ID(), eth.BlockID{}) + + _, err = m.TxData(eth.BlockID{}) + require.ErrorIs(err, io.EOF, "Expected channel manager to have no more tx data") + + err = m.AddL2Block(b) + require.NoError(err, "Failed to add L2 block") + + _, err = m.TxData(eth.BlockID{}) + require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data") +} + +// TestChannelManagerCloseAllTxsFailed ensures that the channel manager +// can gracefully close after producing transaction frames if none of these +// have successfully landed on chain. 
+func TestChannelManagerCloseAllTxsFailed(t *testing.T) { + require := require.New(t) + log := testlog.Logger(t, log.LvlCrit) + m := NewChannelManager(log, metrics.NoopMetrics, + ChannelConfig{ + TargetNumFrames: 100, + TargetFrameSize: 1000, + MaxFrameSize: 1000, + ApproxComprRatio: 1.0, + ChannelTimeout: 1000, + }) + + a := newMiniL2Block(50_000) + + err := m.AddL2Block(a) + require.NoError(err, "Failed to add L2 block") + + txdata, err := m.TxData(eth.BlockID{}) + require.NoError(err, "Expected channel manager to produce valid tx data") + + m.TxFailed(txdata.ID()) + + // Show that this data will continue to be emitted as long as the transaction + // fails and the channel manager is not closed + txdata, err = m.TxData(eth.BlockID{}) + require.NoError(err, "Expected channel manager to re-attempt the failed transaction") + + m.TxFailed(txdata.ID()) + + m.Close() + + _, err = m.TxData(eth.BlockID{}) + require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data") +} diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index fc3fe5aab302d..f67d7d956093a 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -27,10 +27,11 @@ type BatchSubmitter struct { txMgr txmgr.TxManager wg sync.WaitGroup - done chan struct{} - ctx context.Context - cancel context.CancelFunc + shutdownCtx context.Context + cancelShutdownCtx context.CancelFunc + killCtx context.Context + cancelKillCtx context.CancelFunc mutex sync.Mutex running bool @@ -144,10 +145,8 @@ func (l *BatchSubmitter) Start() error { } l.running = true - l.done = make(chan struct{}) - // TODO: this context only exists because the event loop doesn't reach done - // if the tx manager is blocking forever due to e.g. insufficient balance. 
- l.ctx, l.cancel = context.WithCancel(context.Background()) + l.shutdownCtx, l.cancelShutdownCtx = context.WithCancel(context.Background()) + l.killCtx, l.cancelKillCtx = context.WithCancel(context.Background()) l.state.Clear() l.lastStoredBlock = eth.BlockID{} @@ -159,11 +158,11 @@ func (l *BatchSubmitter) Start() error { return nil } -func (l *BatchSubmitter) StopIfRunning() { - _ = l.Stop() +func (l *BatchSubmitter) StopIfRunning(ctx context.Context) { + _ = l.Stop(ctx) } -func (l *BatchSubmitter) Stop() error { +func (l *BatchSubmitter) Stop(ctx context.Context) error { l.log.Info("Stopping Batch Submitter") l.mutex.Lock() @@ -174,9 +173,18 @@ func (l *BatchSubmitter) Stop() error { } l.running = false - l.cancel() - close(l.done) + // go routine will call cancelKill() if the passed in ctx is ever Done + cancelKill := l.cancelKillCtx + wrapped, cancel := context.WithCancel(ctx) + defer cancel() + go func() { + <-wrapped.Done() + cancelKill() + }() + + l.cancelShutdownCtx() l.wg.Wait() + l.cancelKillCtx() l.log.Info("Batch Submitter stopped") @@ -292,47 +300,57 @@ func (l *BatchSubmitter) loop() { for { select { case <-ticker.C: - l.loadBlocksIntoState(l.ctx) - - blockLoop: - for { - l1tip, err := l.l1Tip(l.ctx) - if err != nil { - l.log.Error("Failed to query L1 tip", "error", err) - break - } - l.recordL1Tip(l1tip) - - // Collect next transaction data - txdata, err := l.state.TxData(l1tip.ID()) - if err == io.EOF { - l.log.Trace("no transaction data available") - break // local for loop - } else if err != nil { - l.log.Error("unable to get tx data", "err", err) - break - } - - // Record TX Status - if receipt, err := l.sendTransaction(l.ctx, txdata.Bytes()); err != nil { - l.recordFailedTx(txdata.ID(), err) - } else { - l.recordConfirmedTx(txdata.ID(), receipt) - } - - // hack to exit this loop. Proper fix is to do request another send tx or parallel tx sending - // from the channel manager rather than sending the channel in a loop. 
This stalls b/c if the - // context is cancelled while sending, it will never fully clear the pending txns. - select { - case <-l.ctx.Done(): - break blockLoop - default: - } + l.loadBlocksIntoState(l.shutdownCtx) + l.publishStateToL1(l.killCtx) + case <-l.shutdownCtx.Done(): + l.publishStateToL1(l.killCtx) + return + } + } +} + +// publishStateToL1 loops through the block data loaded into `state` and +// submits the associated data to the L1 in the form of channel frames. +func (l *BatchSubmitter) publishStateToL1(ctx context.Context) { + for { + // Attempt to gracefully terminate the current channel, ensuring that no new frames will be + // produced. Any remaining frames must still be published to the L1 to prevent stalling. + select { + case <-ctx.Done(): + err := l.state.Close() + if err != nil { + l.log.Error("error closing the channel manager", "err", err) + } + case <-l.shutdownCtx.Done(): + err := l.state.Close() + if err != nil { + l.log.Error("error closing the channel manager", "err", err) } + default: + } - case <-l.done: + l1tip, err := l.l1Tip(ctx) + if err != nil { + l.log.Error("Failed to query L1 tip", "error", err) return } + l.recordL1Tip(l1tip) + + // Collect next transaction data + txdata, err := l.state.TxData(l1tip.ID()) + if err == io.EOF { + l.log.Trace("no transaction data available") + break + } else if err != nil { + l.log.Error("unable to get tx data", "err", err) + break + } + // Record TX Status + if receipt, err := l.sendTransaction(ctx, txdata.Bytes()); err != nil { + l.recordFailedTx(txdata.ID(), err) + } else { + l.recordConfirmedTx(txdata.ID(), receipt) + } } } diff --git a/op-batcher/rpc/api.go b/op-batcher/rpc/api.go index 29db8669bc4c9..a1c4d5b2e8da5 100644 --- a/op-batcher/rpc/api.go +++ b/op-batcher/rpc/api.go @@ -6,7 +6,7 @@ import ( type batcherClient interface { Start() error - Stop() error + Stop(ctx context.Context) error } type adminAPI struct { @@ -23,6 +23,6 @@ func (a *adminAPI) StartBatcher(_ context.Context) error 
{ return a.b.Start() } -func (a *adminAPI) StopBatcher(_ context.Context) error { - return a.b.Stop() +func (a *adminAPI) StopBatcher(ctx context.Context) error { + return a.b.Stop(ctx) } diff --git a/op-e2e/migration_test.go b/op-e2e/migration_test.go index 4e3808b8974ec..c8db45302dd4c 100644 --- a/op-e2e/migration_test.go +++ b/op-e2e/migration_test.go @@ -352,7 +352,9 @@ func TestMigration(t *testing.T) { }, lgr.New("module", "batcher"), batchermetrics.NoopMetrics) require.NoError(t, err) t.Cleanup(func() { - batcher.StopIfRunning() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + batcher.StopIfRunning(ctx) }) proposer, err := l2os.NewL2OutputSubmitterFromCLIConfig(l2os.CLIConfig{ diff --git a/op-e2e/setup.go b/op-e2e/setup.go index 7a23d59f4a5e8..0b958da5eeb74 100644 --- a/op-e2e/setup.go +++ b/op-e2e/setup.go @@ -219,7 +219,9 @@ func (sys *System) Close() { sys.L2OutputSubmitter.Stop() } if sys.BatchSubmitter != nil { - sys.BatchSubmitter.StopIfRunning() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + sys.BatchSubmitter.StopIfRunning(ctx) } for _, node := range sys.RollupNodes { diff --git a/op-e2e/system_test.go b/op-e2e/system_test.go index 5ef31be96b2a2..6aa183a6fc9a0 100644 --- a/op-e2e/system_test.go +++ b/op-e2e/system_test.go @@ -1449,7 +1449,7 @@ func TestStopStartBatcher(t *testing.T) { require.Greater(t, newSeqStatus.SafeL2.Number, seqStatus.SafeL2.Number, "Safe chain did not advance") // stop the batch submission - err = sys.BatchSubmitter.Stop() + err = sys.BatchSubmitter.Stop(context.Background()) require.Nil(t, err) // wait for any old safe blocks being submitted / derived