diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index 44538bed94e..f65844a6745 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -268,8 +268,12 @@ func (s *channelManager) TxData(l1Head eth.BlockID, isPectra bool, isThrottling // pubInfo is a struct that contains signal information sent on the publishSignal channel type pubInfo struct { - forcePublish bool - publishingBacklog bool // publishingBacklog is set to true if there are more blocks to be processed while loading blocks into state + // forcePublish is set to true if the current channel should be force-closed and submitted now. + forcePublish bool + + // ignoreMaxChannelDuration is set to true if we should keep the current channel open even if its duration is exceeded. + // For example, if we know there are more blocks to load and we want to pack those into the current channel before sending it. + ignoreMaxChannelDuration bool } // getReadyChannel returns the next channel ready to submit data, or an error. @@ -319,10 +323,12 @@ func (s *channelManager) getReadyChannel(l1Head eth.BlockID, pi pubInfo) (*chann return nil, err } - if !pi.publishingBacklog { - // Register current L1 head only after all pending blocks have been - // processed. Even if a timeout will be triggered now, it is better to have - // all pending blocks be included in this channel for submission. + if !pi.ignoreMaxChannelDuration { + // Register current L1 head (which checks for the max duration timeout) + // only after all blocks in the manager's state have been + // processed, and only if we weren't told to ignore the max channel duration. + // The aim is to prefer to optimally pack blocks into channels + // instead of timing out the channel when more blocks are soon to be processed. 
s.registerL1Block(l1Head) } diff --git a/op-batcher/batcher/channel_manager_memory_test.go b/op-batcher/batcher/channel_manager_memory_test.go index 21a406a214a..fe305aa2fe6 100644 --- a/op-batcher/batcher/channel_manager_memory_test.go +++ b/op-batcher/batcher/channel_manager_memory_test.go @@ -124,7 +124,7 @@ func runMemoryTest(t *testing.T, batchType uint, compressorType string, compress require.NoError(t, m.processBlocks()) // Try to get transaction data to fill channels - _, err := m.TxData(eth.BlockID{}, false, false, pubInfo{forcePublish: false, publishingBacklog: false}) + _, err := m.TxData(eth.BlockID{}, false, false, pubInfo{}) // It's okay if there's no data ready (io.EOF) if err != nil && err.Error() != "EOF" { require.NoError(t, err) diff --git a/op-batcher/batcher/channel_manager_test.go b/op-batcher/batcher/channel_manager_test.go index 98dab9ae515..0c4feb9e0eb 100644 --- a/op-batcher/batcher/channel_manager_test.go +++ b/op-batcher/batcher/channel_manager_test.go @@ -103,9 +103,9 @@ func ChannelManagerReturnsErrReorgWhenDrained(t *testing.T, batchType uint) { require.NoError(t, m.AddL2Block(a)) - _, err := m.TxData(eth.BlockID{}, false, false, pubInfo{forcePublish: false, publishingBacklog: false}) + _, err := m.TxData(eth.BlockID{}, false, false, pubInfo{}) require.NoError(t, err) - _, err = m.TxData(eth.BlockID{}, false, false, pubInfo{forcePublish: false, publishingBacklog: false}) + _, err = m.TxData(eth.BlockID{}, false, false, pubInfo{}) require.ErrorIs(t, err, io.EOF) require.ErrorIs(t, m.AddL2Block(x), ErrReorg) @@ -207,7 +207,7 @@ func ChannelManager_TxResend(t *testing.T, batchType uint) { require.NoError(m.AddL2Block(a)) - txdata0, err := m.TxData(eth.BlockID{}, false, false, pubInfo{forcePublish: false, publishingBacklog: false}) + txdata0, err := m.TxData(eth.BlockID{}, false, false, pubInfo{}) require.NoError(err) txdata0bytes := txdata0.CallData() data0 := make([]byte, len(txdata0bytes)) @@ -215,13 +215,13 @@ func 
ChannelManager_TxResend(t *testing.T, batchType uint) { copy(data0, txdata0bytes) // ensure channel is drained - _, err = m.TxData(eth.BlockID{}, false, false, pubInfo{forcePublish: false, publishingBacklog: false}) + _, err = m.TxData(eth.BlockID{}, false, false, pubInfo{}) require.ErrorIs(err, io.EOF) // requeue frame m.TxFailed(txdata0.ID()) - txdata1, err := m.TxData(eth.BlockID{}, false, false, pubInfo{forcePublish: false, publishingBacklog: false}) + txdata1, err := m.TxData(eth.BlockID{}, false, false, pubInfo{}) require.NoError(err) data1 := txdata1.CallData() @@ -318,9 +318,9 @@ func newFakeDynamicEthChannelConfig(lgr log.Logger, } } -// TestChannelManager_PublishingBacklog tests that the channel manager will not time out -// when publishingBacklog is set to true in the signal struct. -func TestChannelManager_PublishingBacklog(t *testing.T) { +// TestChannelManager_IgnoreMaxChannelDuration tests that the channel manager will not time out +// when ignoreMaxChannelDuration is set to true in the signal struct. +func TestChannelManager_IgnoreMaxChannelDuration(t *testing.T) { l := testlog.Logger(t, log.LevelCrit) cfg := channelManagerTestConfig(10000, derive.SingularBatchType) @@ -336,10 +336,9 @@ func TestChannelManager_PublishingBacklog(t *testing.T) { m.blocks.Enqueue(SizedBlock{Block: block}) } - // Call TxData a first time - if `publishingBacklog` is `false`, channel would be timed out, but since `publishingBacklog` - // is set to true here, we essentially ignore the registering of L1 block, so channel doesn't time out, and - // io.EOF is expected here. - _, err := m.TxData(eth.BlockID{Number: 21}, false, false, pubInfo{forcePublish: false, publishingBacklog: true}) + // Call TxData a first time - if `ignoreMaxChannelDuration` is `false`, channel would be timed out, + // but since `ignoreMaxChannelDuration` is `true`, we expect it to be not timed out. 
+ _, err := m.TxData(eth.BlockID{Number: 21}, false, false, pubInfo{ignoreMaxChannelDuration: true}) require.ErrorIs(t, err, io.EOF) // Add more blocks to the channel manager @@ -351,12 +350,12 @@ func TestChannelManager_PublishingBacklog(t *testing.T) { require.NotEmpty(t, m.channelQueue) require.False(t, m.channelQueue[0].IsFull()) - // Call TxData again, with publishingBacklog set to false - _, err = m.TxData(eth.BlockID{Number: 22}, false, false, pubInfo{forcePublish: false, publishingBacklog: false}) + // Call TxData again, with ignoreMaxChannelDuration unset. + _, err = m.TxData(eth.BlockID{Number: 22}, false, false, pubInfo{}) require.NoError(t, err) require.NotEmpty(t, m.channelQueue) - // Given that `publishingBacklog` is set to false, the channel should be timed out + // Given that ignoreMaxChannelDuration was unset, the channel should be timed out require.True(t, m.channelQueue[0].IsFull()) require.ErrorIs(t, m.channelQueue[0].FullErr(), ErrMaxDurationReached) } @@ -407,7 +406,7 @@ func TestChannelManager_TxData(t *testing.T) { m.blocks = queue.Queue[SizedBlock]{SizedBlock{Block: blockA}} // Call TxData a first time to trigger blocks->channels pipeline - _, err := m.TxData(eth.BlockID{}, false, false, pubInfo{forcePublish: false, publishingBacklog: false}) + _, err := m.TxData(eth.BlockID{}, false, false, pubInfo{}) require.ErrorIs(t, err, io.EOF) // The test requires us to have something in the channel queue @@ -426,7 +425,7 @@ func TestChannelManager_TxData(t *testing.T) { var data txData for { m.blocks.Enqueue(SizedBlock{Block: blockA}) - data, err = m.TxData(eth.BlockID{}, false, false, pubInfo{forcePublish: false, publishingBacklog: false}) + data, err = m.TxData(eth.BlockID{}, false, false, pubInfo{}) if err == nil && data.Len() > 0 { break } @@ -754,7 +753,7 @@ func TestChannelManager_TxData_ForcePublish(t *testing.T) { m.blocks = queue.Queue[SizedBlock]{SizedBlock{Block: blockA}} // Call TxData a first time to trigger blocks->channels pipeline - 
txData, err := m.TxData(eth.BlockID{}, false, false, pubInfo{forcePublish: false, publishingBacklog: false}) + txData, err := m.TxData(eth.BlockID{}, false, false, pubInfo{}) require.ErrorIs(t, err, io.EOF) require.Zero(t, txData.Len(), 0) @@ -764,7 +763,7 @@ func TestChannelManager_TxData_ForcePublish(t *testing.T) { require.False(t, m.channelQueue[0].IsFull()) // Call TxData with force publish enabled - txData, err = m.TxData(eth.BlockID{}, false, false, pubInfo{forcePublish: true, publishingBacklog: false}) + txData, err = m.TxData(eth.BlockID{}, false, false, pubInfo{forcePublish: true}) // Despite no additional blocks being added, we should have tx data: require.NoError(t, err) @@ -871,7 +870,7 @@ func TestChannelManagerUnsafeBytes(t *testing.T) { _, err = manager.TxData(eth.BlockID{ Hash: common.Hash{}, Number: 0, - }, true, false, pubInfo{forcePublish: false, publishingBacklog: false}) + }, true, false, pubInfo{}) } assert.Equal(t, tc.afterAddingToChannel, manager.UnsafeDABytes()) diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 356256f411c..7cce08003ef 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -268,7 +268,7 @@ func (l *BatchSubmitter) Flush(ctx context.Context) error { } l.Log.Info("Flushing Batch Submitter") - l.tryPublishSignal(l.publishSignal, pubInfo{forcePublish: true, publishingBacklog: false}) + l.tryPublishSignal(l.publishSignal, pubInfo{forcePublish: true}) return nil } @@ -302,7 +302,7 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context, start, end uin // This allows the batcher to start publishing sooner in the // case of a large backlog of blocks to load. 
l.sendToThrottlingLoop(unsafeBytesUpdated) - l.tryPublishSignal(publishSignal, pubInfo{publishingBacklog: i < end, forcePublish: false}) + l.tryPublishSignal(publishSignal, pubInfo{ignoreMaxChannelDuration: i < end}) } } @@ -436,7 +436,7 @@ func (l *BatchSubmitter) sendToThrottlingLoop(unsafeBytesUpdated chan int64) { } } -// trySignal tries to send an empty struct on the provided channel. +// tryPublishSignal tries to send the provided value on the provided channel. // It is not blocking, no signal will be sent if the channel is full. func (l *BatchSubmitter) tryPublishSignal(c chan pubInfo, value pubInfo) { select { @@ -556,7 +556,7 @@ func (l *BatchSubmitter) blockLoadingLoop(ctx context.Context, wg *sync.WaitGrou l.sendToThrottlingLoop(unsafeBytesUpdated) // we have increased the unsafe data. Signal the throttling loop to check if it should throttle. } } - l.tryPublishSignal(publishSignal, pubInfo{forcePublish: false, publishingBacklog: false}) // always signal the write loop to ensure we periodically publish even if we aren't loading blocks + l.tryPublishSignal(publishSignal, pubInfo{}) // always signal the publishing loop to ensure we periodically publish even if we aren't loading blocks case <-ctx.Done(): l.Log.Info("blockLoadingLoop returning") return