Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,12 @@ func (s *channelManager) TxData(l1Head eth.BlockID, isPectra bool, isThrottling

// pubInfo is a struct that contains signal information sent on the publishSignal channel
type pubInfo struct {
forcePublish bool
publishingBacklog bool // publishingBacklog is set to true if there are more blocks to be processed while loading blocks into state
// forcePublish is set to true if the current channel should be force-closed and submitted now.
forcePublish bool

// ignoreMaxChannelDuration is set to true if we should keep the current channel open even if it's duration is exceeded.
// For example, if we know there are more blocks to load and we want to pack those into the current channel before sending it.
ignoreMaxChannelDuration bool
}

// getReadyChannel returns the next channel ready to submit data, or an error.
Expand Down Expand Up @@ -319,10 +323,12 @@ func (s *channelManager) getReadyChannel(l1Head eth.BlockID, pi pubInfo) (*chann
return nil, err
}

if !pi.publishingBacklog {
// Register current L1 head only after all pending blocks have been
// processed. Even if a timeout will be triggered now, it is better to have
// all pending blocks be included in this channel for submission.
if !pi.ignoreMaxChannelDuration {
// Register current L1 head (which checks for the max duration timeout)
// only after all blocks in the manager's state have been
// processed, and only if we weren't told to ignore the max channel duration.
// The aim is to prefer to optimally pack blocks into channels when
// instead of timing out the channel when more blocks soon to be processed.
s.registerL1Block(l1Head)
}

Expand Down
2 changes: 1 addition & 1 deletion op-batcher/batcher/channel_manager_memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func runMemoryTest(t *testing.T, batchType uint, compressorType string, compress
require.NoError(t, m.processBlocks())

// Try to get transaction data to fill channels
_, err := m.TxData(eth.BlockID{}, false, false, pubInfo{forcePublish: false, publishingBacklog: false})
_, err := m.TxData(eth.BlockID{}, false, false, pubInfo{})
// It's okay if there's no data ready (io.EOF)
if err != nil && err.Error() != "EOF" {
require.NoError(t, err)
Expand Down
39 changes: 19 additions & 20 deletions op-batcher/batcher/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ func ChannelManagerReturnsErrReorgWhenDrained(t *testing.T, batchType uint) {

require.NoError(t, m.AddL2Block(a))

_, err := m.TxData(eth.BlockID{}, false, false, pubInfo{forcePublish: false, publishingBacklog: false})
_, err := m.TxData(eth.BlockID{}, false, false, pubInfo{})
require.NoError(t, err)
_, err = m.TxData(eth.BlockID{}, false, false, pubInfo{forcePublish: false, publishingBacklog: false})
_, err = m.TxData(eth.BlockID{}, false, false, pubInfo{})
require.ErrorIs(t, err, io.EOF)

require.ErrorIs(t, m.AddL2Block(x), ErrReorg)
Expand Down Expand Up @@ -207,21 +207,21 @@ func ChannelManager_TxResend(t *testing.T, batchType uint) {

require.NoError(m.AddL2Block(a))

txdata0, err := m.TxData(eth.BlockID{}, false, false, pubInfo{forcePublish: false, publishingBacklog: false})
txdata0, err := m.TxData(eth.BlockID{}, false, false, pubInfo{})
require.NoError(err)
txdata0bytes := txdata0.CallData()
data0 := make([]byte, len(txdata0bytes))
// make sure we have a clone for later comparison
copy(data0, txdata0bytes)

// ensure channel is drained
_, err = m.TxData(eth.BlockID{}, false, false, pubInfo{forcePublish: false, publishingBacklog: false})
_, err = m.TxData(eth.BlockID{}, false, false, pubInfo{})
require.ErrorIs(err, io.EOF)

// requeue frame
m.TxFailed(txdata0.ID())

txdata1, err := m.TxData(eth.BlockID{}, false, false, pubInfo{forcePublish: false, publishingBacklog: false})
txdata1, err := m.TxData(eth.BlockID{}, false, false, pubInfo{})
require.NoError(err)

data1 := txdata1.CallData()
Expand Down Expand Up @@ -318,9 +318,9 @@ func newFakeDynamicEthChannelConfig(lgr log.Logger,
}
}

// TestChannelManager_PublishingBacklog tests that the channel manager will not time out
// when publishingBacklog is set to true in the signal struct.
func TestChannelManager_PublishingBacklog(t *testing.T) {
// TestChannelManager_IgnoreMaxChannelDuration tests that the channel manager will not time out
// when ignoreMaxChannelDuration is set to true in the signal struct.
func TestChannelManager_IgnoreMaxChannelDuration(t *testing.T) {
l := testlog.Logger(t, log.LevelCrit)

cfg := channelManagerTestConfig(10000, derive.SingularBatchType)
Expand All @@ -336,10 +336,9 @@ func TestChannelManager_PublishingBacklog(t *testing.T) {
m.blocks.Enqueue(SizedBlock{Block: block})
}

// Call TxData a first time - if `publishingBacklog` is `false`, channel would be timed out, but since `publishingBacklog`
// is set to true here, we essentially ignore the registering of L1 block, so channel doesn't time out, and
// io.EOF is expected here.
_, err := m.TxData(eth.BlockID{Number: 21}, false, false, pubInfo{forcePublish: false, publishingBacklog: true})
// Call TxData a first time - if `ignoreMaxChannelDuration` is `false`, channel would be timed out,
// but since `ignoreMaxChannelDuration` is `true`, we expect it to be not timed out.
_, err := m.TxData(eth.BlockID{Number: 21}, false, false, pubInfo{ignoreMaxChannelDuration: true})
require.ErrorIs(t, err, io.EOF)

// Add more blocks to the channel manager
Expand All @@ -351,12 +350,12 @@ func TestChannelManager_PublishingBacklog(t *testing.T) {
require.NotEmpty(t, m.channelQueue)
require.False(t, m.channelQueue[0].IsFull())

// Call TxData again, with publishingBacklog set to false
_, err = m.TxData(eth.BlockID{Number: 22}, false, false, pubInfo{forcePublish: false, publishingBacklog: false})
// Call TxData again, with ignoreMaxChannelDuration unset.
_, err = m.TxData(eth.BlockID{Number: 22}, false, false, pubInfo{})
require.NoError(t, err)
require.NotEmpty(t, m.channelQueue)

// Given that `publishingBacklog` is set to false, the channel should be timed out
// Given that ignoreMaxChannelDuration was unset, the channel should be timed out
require.True(t, m.channelQueue[0].IsFull())
require.ErrorIs(t, m.channelQueue[0].FullErr(), ErrMaxDurationReached)
}
Expand Down Expand Up @@ -407,7 +406,7 @@ func TestChannelManager_TxData(t *testing.T) {
m.blocks = queue.Queue[SizedBlock]{SizedBlock{Block: blockA}}

// Call TxData a first time to trigger blocks->channels pipeline
_, err := m.TxData(eth.BlockID{}, false, false, pubInfo{forcePublish: false, publishingBacklog: false})
_, err := m.TxData(eth.BlockID{}, false, false, pubInfo{})
require.ErrorIs(t, err, io.EOF)

// The test requires us to have something in the channel queue
Expand All @@ -426,7 +425,7 @@ func TestChannelManager_TxData(t *testing.T) {
var data txData
for {
m.blocks.Enqueue(SizedBlock{Block: blockA})
data, err = m.TxData(eth.BlockID{}, false, false, pubInfo{forcePublish: false, publishingBacklog: false})
data, err = m.TxData(eth.BlockID{}, false, false, pubInfo{})
if err == nil && data.Len() > 0 {
break
}
Expand Down Expand Up @@ -754,7 +753,7 @@ func TestChannelManager_TxData_ForcePublish(t *testing.T) {
m.blocks = queue.Queue[SizedBlock]{SizedBlock{Block: blockA}}

// Call TxData a first time to trigger blocks->channels pipeline
txData, err := m.TxData(eth.BlockID{}, false, false, pubInfo{forcePublish: false, publishingBacklog: false})
txData, err := m.TxData(eth.BlockID{}, false, false, pubInfo{})
require.ErrorIs(t, err, io.EOF)
require.Zero(t, txData.Len(), 0)

Expand All @@ -764,7 +763,7 @@ func TestChannelManager_TxData_ForcePublish(t *testing.T) {
require.False(t, m.channelQueue[0].IsFull())

// Call TxData with force publish enabled
txData, err = m.TxData(eth.BlockID{}, false, false, pubInfo{forcePublish: true, publishingBacklog: false})
txData, err = m.TxData(eth.BlockID{}, false, false, pubInfo{forcePublish: true})

// Despite no additional blocks being added, we should have tx data:
require.NoError(t, err)
Expand Down Expand Up @@ -871,7 +870,7 @@ func TestChannelManagerUnsafeBytes(t *testing.T) {
_, err = manager.TxData(eth.BlockID{
Hash: common.Hash{},
Number: 0,
}, true, false, pubInfo{forcePublish: false, publishingBacklog: false})
}, true, false, pubInfo{})
}

assert.Equal(t, tc.afterAddingToChannel, manager.UnsafeDABytes())
Expand Down
8 changes: 4 additions & 4 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (l *BatchSubmitter) Flush(ctx context.Context) error {
}

l.Log.Info("Flushing Batch Submitter")
l.tryPublishSignal(l.publishSignal, pubInfo{forcePublish: true, publishingBacklog: false})
l.tryPublishSignal(l.publishSignal, pubInfo{forcePublish: true})
return nil
}

Expand Down Expand Up @@ -302,7 +302,7 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context, start, end uin
// This allows the batcher to start publishing sooner in the
// case of a large backlog of blocks to load.
l.sendToThrottlingLoop(unsafeBytesUpdated)
l.tryPublishSignal(publishSignal, pubInfo{publishingBacklog: i < end, forcePublish: false})
l.tryPublishSignal(publishSignal, pubInfo{ignoreMaxChannelDuration: i < end})
}

}
Expand Down Expand Up @@ -436,7 +436,7 @@ func (l *BatchSubmitter) sendToThrottlingLoop(unsafeBytesUpdated chan int64) {
}
}

// trySignal tries to send an empty struct on the provided channel.
// trySignal tries to send the provided value on the provided channel.
// It is not blocking, no signal will be sent if the channel is full.
func (l *BatchSubmitter) tryPublishSignal(c chan pubInfo, value pubInfo) {
select {
Expand Down Expand Up @@ -556,7 +556,7 @@ func (l *BatchSubmitter) blockLoadingLoop(ctx context.Context, wg *sync.WaitGrou
l.sendToThrottlingLoop(unsafeBytesUpdated) // we have increased the unsafe data. Signal the throttling loop to check if it should throttle.
}
}
l.tryPublishSignal(publishSignal, pubInfo{forcePublish: false, publishingBacklog: false}) // always signal the write loop to ensure we periodically publish even if we aren't loading blocks
l.tryPublishSignal(publishSignal, pubInfo{}) // always signal the publishing loop to ensure we periodically publish even if we aren't loading blocks
case <-ctx.Done():
l.Log.Info("blockLoadingLoop returning")
return
Expand Down