op-batcher/batcher/batch_submitter.go (1 addition, 1 deletion)

@@ -51,7 +51,7 @@ func Main(version string, cliCtx *cli.Context) error {
 			return err
 		}
 	}
-	defer batchSubmitter.StopIfRunning()
+	defer batchSubmitter.StopIfRunning(context.Background())

 	ctx, cancel := context.WithCancel(context.Background())

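The only change here is threading a context into StopIfRunning so the deferred shutdown can be cancelled or bounded. A minimal sketch of how a caller might cap the drain time instead of using context.Background(); the ten-second budget is an assumption for illustration, not part of this PR:

	// stopWithTimeout is a hypothetical helper (assumes package batcher):
	// it bounds how long StopIfRunning may spend draining state on shutdown.
	func stopWithTimeout(bs *BatchSubmitter) {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		bs.StopIfRunning(ctx) // no-op if the submitter was never started
	}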
op-batcher/batcher/channel_builder.go (16 additions, 6 deletions)

@@ -18,6 +18,7 @@ var (
 	ErrMaxDurationReached  = errors.New("max channel duration reached")
 	ErrChannelTimeoutClose = errors.New("close to channel timeout")
 	ErrSeqWindowClose      = errors.New("close to sequencer window timeout")
+	ErrTerminated          = errors.New("channel terminated")
 )

 type ChannelFullError struct {

@@ -188,7 +189,7 @@ func (c *channelBuilder) Reset() error {
 }

 // AddBlock adds a block to the channel compression pipeline. IsFull should be
-// called aftewards to test whether the channel is full. If full, a new channel
+// called afterwards to test whether the channel is full. If full, a new channel
 // must be started.
 //
 // AddBlock returns a ChannelFullError if called even though the channel is

@@ -307,16 +308,17 @@ func (c *channelBuilder) IsFull() bool {
 // FullErr returns the reason why the channel is full. If not full yet, it
 // returns nil.
 //
-// It returns a ChannelFullError wrapping one of six possible reasons for the
-// channel being full:
+// It returns a ChannelFullError wrapping one of the following possible reasons
+// for the channel being full:
 //   - ErrInputTargetReached if the target amount of input data has been reached,
 //   - derive.MaxRLPBytesPerChannel if the general maximum amount of input data
 //     would have been exceeded by the latest AddBlock call,
 //   - ErrMaxFrameIndex if the maximum number of frames has been generated
 //     (uint16),
-//   - ErrMaxDurationReached if the max channel duration got reached.
-//   - ErrChannelTimeoutClose if the consensus channel timeout got too close.
-//   - ErrSeqWindowClose if the end of the sequencer window got too close.
+//   - ErrMaxDurationReached if the max channel duration got reached,
+//   - ErrChannelTimeoutClose if the consensus channel timeout got too close,
+//   - ErrSeqWindowClose if the end of the sequencer window got too close,
+//   - ErrTerminated if the channel was explicitly terminated.
 func (c *channelBuilder) FullErr() error {
 	return c.fullErr
 }
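With ErrTerminated added to the list above, callers can distinguish a channel that filled up organically from one that was shut down. A sketch of that check, assuming it sits in package batcher and that ChannelFullError unwraps to the underlying reason as the doc comment describes; fullReason is a hypothetical helper, not part of this PR:

	// fullReason classifies why a channel builder stopped accepting blocks.
	func fullReason(c *channelBuilder) string {
		err := c.FullErr()
		switch {
		case err == nil:
			return "channel not full"
		case errors.Is(err, ErrTerminated):
			return "channel terminated for shutdown"
		default:
			return "channel full: " + err.Error()
		}
	}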

@@ -402,6 +404,14 @@ func (c *channelBuilder) outputFrame() error {
 	return err // possibly io.EOF (last frame)
 }

+// Close immediately marks the channel as full with an ErrTerminated
+// if the channel is not already full.
+func (c *channelBuilder) Close() {
+	if !c.IsFull() {
+		c.setFullErr(ErrTerminated)
+	}
+}
+
 // HasFrame returns whether there's any available frame. If true, it can be
 // popped using NextFrame().
 //
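Close is deliberately conservative: a channel that is already full keeps its original full reason. A sketch of the terminate-then-drain pattern this enables, assuming the OutputFrames, HasFrame, and NextFrame methods defined elsewhere in this file and the package's frameData type; terminateAndDrain is a hypothetical helper, not part of this PR:

	// terminateAndDrain stops a builder and collects whatever frames remain.
	func terminateAndDrain(c *channelBuilder) ([]frameData, error) {
		c.Close() // marks the channel full with ErrTerminated unless already full
		if err := c.OutputFrames(); err != nil {
			return nil, err // flushing the compressor into frames failed
		}
		var frames []frameData
		for c.HasFrame() {
			frames = append(frames, c.NextFrame())
		}
		return frames, nil
	}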
op-batcher/batcher/channel_manager.go (34 additions, 2 deletions)

@@ -41,6 +41,9 @@ type channelManager struct {
 	pendingTransactions map[txID]txData
 	// Set of confirmed txID -> inclusion block. For determining if the channel is timed out
 	confirmedTransactions map[txID]eth.BlockID
+
+	// if set to true, prevents production of any new channel frames
+	closed bool
 }

 func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig) *channelManager {

@@ -60,6 +63,7 @@ func (s *channelManager) Clear() {
 	s.log.Trace("clearing channel manager state")
 	s.blocks = s.blocks[:0]
 	s.tip = common.Hash{}
+	s.closed = false
 	s.clearPendingChannel()
 }

@@ -78,6 +82,10 @@ func (s *channelManager) TxFailed(id txID) {
 	}

 	s.metr.RecordBatchTxFailed()
+	if s.closed && len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 {
+		s.log.Info("Channel has no submitted transactions, clearing for shutdown", "chID", s.pendingChannel.ID())
+		s.clearPendingChannel()
+	}
 }

 // TxConfirmed marks a transaction as confirmed on L1. Unfortunately even if all frames in
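The new branch is the shutdown escape hatch: once the manager is closed, a failed transaction whose channel has nothing else in flight is not worth resubmitting, so the whole pending channel is dropped. Restated as a predicate, purely for clarity (canClearForShutdown is illustrative only, not part of this PR):

	// canClearForShutdown: the pending channel may be dropped only when
	// nothing it produced is still in flight.
	func canClearForShutdown(s *channelManager) bool {
		return s.closed && // no new frames will ever be produced
			len(s.confirmedTransactions) == 0 && // nothing already included on L1
			len(s.pendingTransactions) == 0 // nothing still awaiting confirmation
	}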

@@ -179,8 +187,8 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
 	dataPending := s.pendingChannel != nil && s.pendingChannel.HasFrame()
 	s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks))

-	// Short circuit if there is a pending frame.
-	if dataPending {
+	// Short circuit if there is a pending frame or the channel manager is closed.
+	if dataPending || s.closed {
 		return s.nextTxData()
 	}
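Adding `|| s.closed` means a closed manager never reaches the channel-creation path below the short circuit: TxData only ever hands out frames that already exist and then reports io.EOF. A sketch of draining on that contract, assuming package batcher (remainingTxData is illustrative only, not part of this PR):

	// remainingTxData collects every frame a closed manager still holds.
	func remainingTxData(s *channelManager, l1Head eth.BlockID) ([]txData, error) {
		var out []txData
		for {
			data, err := s.TxData(l1Head)
			if errors.Is(err, io.EOF) {
				return out, nil // a closed manager builds no new channels
			}
			if err != nil {
				return nil, err
			}
			out = append(out, data)
		}
	}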


@@ -344,3 +352,27 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo)
 		SequenceNumber: l1info.SequenceNumber,
 	}
 }
+
+// Close closes the current pending channel, if one exists, outputs any remaining frames,
+// and prevents the creation of any new channels.
+// Any outputted frames still need to be published.
+func (s *channelManager) Close() error {
+	if s.closed {
+		return nil
+	}
+
+	s.closed = true
+
+	// Any pending state can be proactively cleared if there are no submitted transactions
+	if len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 {
+		s.clearPendingChannel()
+	}
+
+	if s.pendingChannel == nil {
+		return nil
+	}
+
+	s.pendingChannel.Close()
+
+	return s.outputFrames()
+}
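Putting the pieces together, the intended shutdown sequence is: Close the manager, then keep publishing until TxData reports io.EOF, feeding results back via TxConfirmed and TxFailed so the clearing logic above can kick in. A sketch under those assumptions; drainOnShutdown is hypothetical, and publishTx with its eth.BlockID result stands in for the batcher's real transaction submission, which is not part of this PR:

	// drainOnShutdown publishes every remaining frame of a manager being closed.
	func drainOnShutdown(ctx context.Context, s *channelManager, l1Head eth.BlockID,
		publishTx func(context.Context, txData) (eth.BlockID, error)) error {
		if err := s.Close(); err != nil {
			return err // outputting the final frames failed
		}
		for {
			data, err := s.TxData(l1Head)
			if errors.Is(err, io.EOF) {
				return nil // every outstanding frame was published or cleared
			}
			if err != nil {
				return err
			}
			if inclusion, err := publishTx(ctx, data); err != nil {
				s.TxFailed(data.ID()) // may clear the channel if nothing else is in flight
			} else {
				s.TxConfirmed(data.ID(), inclusion)
			}
		}
	}

The tests below exercise exactly these paths: close before first use, close with no pending channel, close with a pending channel, and close after every submission failed.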
op-batcher/batcher/channel_manager_test.go (142 additions, 0 deletions)

@@ -363,3 +363,145 @@ func TestChannelManager_TxResend(t *testing.T) {
 	require.NoError(err)
 	require.Len(fs, 1)
 }

+// TestChannelManagerCloseBeforeFirstUse ensures that the channel manager
+// will not produce any frames if closed immediately.
+func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
+	require := require.New(t)
+	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
+	log := testlog.Logger(t, log.LvlCrit)
+	m := NewChannelManager(log, metrics.NoopMetrics,
+		ChannelConfig{
+			TargetFrameSize:  0,
+			MaxFrameSize:     100,
+			ApproxComprRatio: 1.0,
+			ChannelTimeout:   1000,
+		})
+
+	a, _ := derivetest.RandomL2Block(rng, 4)
+
+	m.Close()
+
+	err := m.AddL2Block(a)
+	require.NoError(err, "Failed to add L2 block")
+
+	_, err = m.TxData(eth.BlockID{})
+	require.ErrorIs(err, io.EOF, "Expected closed channel manager to contain no tx data")
+}

+// TestChannelManagerCloseNoPendingChannel ensures that the channel manager
+// can gracefully close with no pending channels, and will not emit any new
+// channel frames.
+func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
+	require := require.New(t)
+	log := testlog.Logger(t, log.LvlCrit)
+	m := NewChannelManager(log, metrics.NoopMetrics,
+		ChannelConfig{
+			TargetFrameSize:  0,
+			MaxFrameSize:     100,
+			ApproxComprRatio: 1.0,
+			ChannelTimeout:   1000,
+		})
+	a := newMiniL2Block(0)
+	b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash())
+
+	err := m.AddL2Block(a)
+	require.NoError(err, "Failed to add L2 block")
+
+	txdata, err := m.TxData(eth.BlockID{})
+	require.NoError(err, "Expected channel manager to return valid tx data")
+
+	m.TxConfirmed(txdata.ID(), eth.BlockID{})
+
+	_, err = m.TxData(eth.BlockID{})
+	require.ErrorIs(err, io.EOF, "Expected channel manager to EOF")
+
+	m.Close()
+
+	err = m.AddL2Block(b)
+	require.NoError(err, "Failed to add L2 block")
+
+	_, err = m.TxData(eth.BlockID{})
+	require.ErrorIs(err, io.EOF, "Expected closed channel manager to return no new tx data")
+}

+// TestChannelManagerClosePendingChannel ensures that the channel manager
+// can gracefully close with a pending channel, and will not produce any
+// new channel frames after this point.
+func TestChannelManagerClosePendingChannel(t *testing.T) {
+	require := require.New(t)
+	log := testlog.Logger(t, log.LvlCrit)
+	m := NewChannelManager(log, metrics.NoopMetrics,
+		ChannelConfig{
+			TargetNumFrames:  100,
+			TargetFrameSize:  1000,
+			MaxFrameSize:     1000,
+			ApproxComprRatio: 1.0,
+			ChannelTimeout:   1000,
+		})
+
+	a := newMiniL2Block(50_000)
+	b := newMiniL2BlockWithNumberParent(10, big.NewInt(1), a.Hash())
+
+	err := m.AddL2Block(a)
+	require.NoError(err, "Failed to add L2 block")
+
+	txdata, err := m.TxData(eth.BlockID{})
+	require.NoError(err, "Expected channel manager to produce valid tx data")
+
+	m.TxConfirmed(txdata.ID(), eth.BlockID{})
+
+	m.Close()
+
+	txdata, err = m.TxData(eth.BlockID{})
+	require.NoError(err, "Expected channel manager to produce tx data from remaining L2 block data")
+
+	m.TxConfirmed(txdata.ID(), eth.BlockID{})
+
+	_, err = m.TxData(eth.BlockID{})
+	require.ErrorIs(err, io.EOF, "Expected channel manager to have no more tx data")
+
+	err = m.AddL2Block(b)
+	require.NoError(err, "Failed to add L2 block")
+
+	_, err = m.TxData(eth.BlockID{})
+	require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
+}

+// TestChannelManagerCloseAllTxsFailed ensures that the channel manager
+// can gracefully close after producing transaction frames if none of these
+// have successfully landed on chain.
+func TestChannelManagerCloseAllTxsFailed(t *testing.T) {
+	require := require.New(t)
+	log := testlog.Logger(t, log.LvlCrit)
+	m := NewChannelManager(log, metrics.NoopMetrics,
+		ChannelConfig{
+			TargetNumFrames:  100,
+			TargetFrameSize:  1000,
+			MaxFrameSize:     1000,
+			ApproxComprRatio: 1.0,
+			ChannelTimeout:   1000,
+		})
+
+	a := newMiniL2Block(50_000)
+
+	err := m.AddL2Block(a)
+	require.NoError(err, "Failed to add L2 block")
+
+	txdata, err := m.TxData(eth.BlockID{})
+	require.NoError(err, "Expected channel manager to produce valid tx data")
+
+	m.TxFailed(txdata.ID())
+
+	// Show that this data will continue to be emitted as long as the transaction
+	// fails and the channel manager is not closed
+	txdata, err = m.TxData(eth.BlockID{})
+	require.NoError(err, "Expected channel manager to re-attempt the failed transaction")
+
+	m.TxFailed(txdata.ID())
+
+	m.Close()
+
+	_, err = m.TxData(eth.BlockID{})
+	require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
+}