Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ type channelManager struct {

// All blocks since the last request for new tx data.
blocks queue.Queue[*types.Block]
// The latest L1 block from all the L2 blocks in the most recently closed channel
l1OriginLastClosedChannel eth.BlockID
// The latest L1 block from all the L2 blocks in the most recently submitted channel.
// Used to track channel duration timeouts.
l1OriginLastSubmittedChannel eth.BlockID
// The default ChannelConfig to use for the next channel
defaultCfg ChannelConfig
// last block hash - for reorg detection
Expand Down Expand Up @@ -75,12 +76,12 @@ func (s *channelManager) SetChannelOutFactory(outFactory ChannelOutFactory) {

// Clear clears the entire state of the channel manager.
// It is intended to be used before launching op-batcher and after an L2 reorg.
func (s *channelManager) Clear(l1OriginLastClosedChannel eth.BlockID) {
func (s *channelManager) Clear(l1OriginLastSubmittedChannel eth.BlockID) {
s.mu.Lock()
defer s.mu.Unlock()
s.log.Trace("clearing channel manager state")
s.blocks.Clear()
s.l1OriginLastClosedChannel = l1OriginLastClosedChannel
s.l1OriginLastSubmittedChannel = l1OriginLastSubmittedChannel
s.tip = common.Hash{}
s.closed = false
s.currentChannel = nil
Expand Down Expand Up @@ -160,6 +161,12 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) {
return txData{}, io.EOF // TODO: not enough data error instead
}
tx := channel.NextTxData()

// update s.l1OriginLastSubmittedChannel so that the next
// channel's duration timeout will trigger properly
if channel.LatestL1Origin().Number > s.l1OriginLastSubmittedChannel.Number {
s.l1OriginLastSubmittedChannel = channel.LatestL1Origin()
}
s.txChannels[tx.ID().String()] = channel
return tx, nil
}
Expand Down Expand Up @@ -284,15 +291,15 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
return fmt.Errorf("creating channel out: %w", err)
}

pc := newChannel(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number, channelOut)
pc := newChannel(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastSubmittedChannel.Number, channelOut)

s.currentChannel = pc
s.channelQueue = append(s.channelQueue, pc)

s.log.Info("Created channel",
"id", pc.ID(),
"l1Head", l1Head,
"l1OriginLastClosedChannel", s.l1OriginLastClosedChannel,
"l1OriginLastSubmittedChannel", s.l1OriginLastSubmittedChannel,
"blocks_pending", s.blocks.Len(),
"batch_type", cfg.BatchType,
"compression_algo", cfg.CompressorConfig.CompressionAlgo,
Expand Down Expand Up @@ -374,11 +381,6 @@ func (s *channelManager) outputFrames() error {
return nil
}

lastClosedL1Origin := s.currentChannel.LatestL1Origin()
if lastClosedL1Origin.Number > s.l1OriginLastClosedChannel.Number {
s.l1OriginLastClosedChannel = lastClosedL1Origin
}

inBytes, outBytes := s.currentChannel.InputBytes(), s.currentChannel.OutputBytes()
s.metr.RecordChannelClosed(
s.currentChannel.ID(),
Expand All @@ -401,12 +403,11 @@ func (s *channelManager) outputFrames() error {
"input_bytes", inBytes,
"output_bytes", outBytes,
"oldest_l1_origin", s.currentChannel.OldestL1Origin(),
"l1_origin", lastClosedL1Origin,
"l1_origin", s.currentChannel.LatestL1Origin(),
"oldest_l2", s.currentChannel.OldestL2(),
"latest_l2", s.currentChannel.LatestL2(),
"full_reason", s.currentChannel.FullErr(),
"compr_ratio", comprRatio,
"latest_l1_origin", s.l1OriginLastClosedChannel,
)
return nil
}
Expand Down
16 changes: 12 additions & 4 deletions op-batcher/batcher/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {

// Channel Manager state should be empty by default
require.Empty(m.blocks)
require.Equal(eth.BlockID{}, m.l1OriginLastClosedChannel)
require.Equal(eth.BlockID{}, m.l1OriginLastSubmittedChannel)
require.Equal(common.Hash{}, m.tip)
require.Nil(m.currentChannel)
require.Empty(m.channelQueue)
Expand Down Expand Up @@ -161,8 +161,8 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {
require.NoError(m.outputFrames())
_, err := m.nextTxData(m.currentChannel)
require.NoError(err)
require.NotNil(m.l1OriginLastClosedChannel)
require.Len(m.blocks, 0)
require.NotNil(m.l1OriginLastSubmittedChannel)
require.Equal(newL1Tip, m.tip)
require.Len(m.currentChannel.pendingTransactions, 1)

Expand All @@ -184,7 +184,7 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {

// Check that the entire channel manager state cleared
require.Empty(m.blocks)
require.Equal(uint64(123), m.l1OriginLastClosedChannel.Number)
require.Equal(uint64(123), m.l1OriginLastSubmittedChannel.Number)
require.Equal(common.Hash{}, m.tip)
require.Nil(m.currentChannel)
require.Empty(m.channelQueue)
Expand Down Expand Up @@ -475,7 +475,7 @@ func TestChannelManager_ChannelCreation(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)

m.l1OriginLastClosedChannel = test.safeL1Block
m.l1OriginLastSubmittedChannel = test.safeL1Block
require.Nil(t, m.currentChannel)

require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{}))
Expand Down Expand Up @@ -639,13 +639,21 @@ func TestChannelManager_Requeue(t *testing.T) {
// Assert that at least one block was processed into the channel
require.NotContains(t, m.blocks, blockA)

l1OriginBeforeRequeue := m.l1OriginLastSubmittedChannel

// Call the function we are testing
m.Requeue(m.defaultCfg)

// Ensure we got back to the state above
require.Equal(t, m.blocks, stateSnapshot)
require.Empty(t, m.channelQueue)

// Ensure the l1OridingLastSubmittedChannel was
// not changed. This ensures the next channel
// has its duration timeout deadline computed
// properly.
require.Equal(t, l1OriginBeforeRequeue, m.l1OriginLastSubmittedChannel)

// Trigger the blocks -> channelQueue data pipelining again
require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{}))
require.NotEmpty(t, m.channelQueue)
Expand Down