diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index 06403645ea4d2..8e9870413a405 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -39,8 +39,9 @@ type channelManager struct { // All blocks since the last request for new tx data. blocks queue.Queue[*types.Block] - // The latest L1 block from all the L2 blocks in the most recently closed channel - l1OriginLastClosedChannel eth.BlockID + // The latest L1 block from all the L2 blocks in the most recently submitted channel. + // Used to track channel duration timeouts. + l1OriginLastSubmittedChannel eth.BlockID // The default ChannelConfig to use for the next channel defaultCfg ChannelConfig // last block hash - for reorg detection @@ -75,12 +76,12 @@ func (s *channelManager) SetChannelOutFactory(outFactory ChannelOutFactory) { // Clear clears the entire state of the channel manager. // It is intended to be used before launching op-batcher and after an L2 reorg. -func (s *channelManager) Clear(l1OriginLastClosedChannel eth.BlockID) { +func (s *channelManager) Clear(l1OriginLastSubmittedChannel eth.BlockID) { s.mu.Lock() defer s.mu.Unlock() s.log.Trace("clearing channel manager state") s.blocks.Clear() - s.l1OriginLastClosedChannel = l1OriginLastClosedChannel + s.l1OriginLastSubmittedChannel = l1OriginLastSubmittedChannel s.tip = common.Hash{} s.closed = false s.currentChannel = nil @@ -160,6 +161,12 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) { return txData{}, io.EOF // TODO: not enough data error instead } tx := channel.NextTxData() + + // update s.l1OriginLastSubmittedChannel so that the next + // channel's duration timeout will trigger properly + if channel.LatestL1Origin().Number > s.l1OriginLastSubmittedChannel.Number { + s.l1OriginLastSubmittedChannel = channel.LatestL1Origin() + } s.txChannels[tx.ID().String()] = channel return tx, nil } @@ -284,7 +291,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error { return fmt.Errorf("creating channel out: %w", err) } - pc := newChannel(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number, channelOut) + pc := newChannel(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastSubmittedChannel.Number, channelOut) s.currentChannel = pc s.channelQueue = append(s.channelQueue, pc) @@ -292,7 +299,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error { s.log.Info("Created channel", "id", pc.ID(), "l1Head", l1Head, - "l1OriginLastClosedChannel", s.l1OriginLastClosedChannel, + "l1OriginLastSubmittedChannel", s.l1OriginLastSubmittedChannel, "blocks_pending", s.blocks.Len(), "batch_type", cfg.BatchType, "compression_algo", cfg.CompressorConfig.CompressionAlgo, @@ -374,11 +381,6 @@ func (s *channelManager) outputFrames() error { return nil } - lastClosedL1Origin := s.currentChannel.LatestL1Origin() - if lastClosedL1Origin.Number > s.l1OriginLastClosedChannel.Number { - s.l1OriginLastClosedChannel = lastClosedL1Origin - } - inBytes, outBytes := s.currentChannel.InputBytes(), s.currentChannel.OutputBytes() s.metr.RecordChannelClosed( s.currentChannel.ID(), @@ -401,12 +403,11 @@ func (s *channelManager) outputFrames() error { "input_bytes", inBytes, "output_bytes", outBytes, "oldest_l1_origin", s.currentChannel.OldestL1Origin(), - "l1_origin", lastClosedL1Origin, + "l1_origin", s.currentChannel.LatestL1Origin(), "oldest_l2", s.currentChannel.OldestL2(), "latest_l2", s.currentChannel.LatestL2(), "full_reason", s.currentChannel.FullErr(), "compr_ratio", comprRatio, - "latest_l1_origin", s.l1OriginLastClosedChannel, ) return nil } diff --git a/op-batcher/batcher/channel_manager_test.go b/op-batcher/batcher/channel_manager_test.go index 8dcb0745c1642..d3a67a1bf1f4b 100644 --- a/op-batcher/batcher/channel_manager_test.go +++ b/op-batcher/batcher/channel_manager_test.go @@ -130,7 +130,7 @@ func ChannelManager_Clear(t *testing.T, batchType uint) { // Channel Manager state should be empty by default require.Empty(m.blocks) - require.Equal(eth.BlockID{}, m.l1OriginLastClosedChannel) + require.Equal(eth.BlockID{}, m.l1OriginLastSubmittedChannel) require.Equal(common.Hash{}, m.tip) require.Nil(m.currentChannel) require.Empty(m.channelQueue) @@ -161,8 +161,8 @@ func ChannelManager_Clear(t *testing.T, batchType uint) { require.NoError(m.outputFrames()) _, err := m.nextTxData(m.currentChannel) require.NoError(err) - require.NotNil(m.l1OriginLastClosedChannel) require.Len(m.blocks, 0) + require.NotNil(m.l1OriginLastSubmittedChannel) require.Equal(newL1Tip, m.tip) require.Len(m.currentChannel.pendingTransactions, 1) @@ -184,7 +184,7 @@ func ChannelManager_Clear(t *testing.T, batchType uint) { // Check that the entire channel manager state cleared require.Empty(m.blocks) - require.Equal(uint64(123), m.l1OriginLastClosedChannel.Number) + require.Equal(uint64(123), m.l1OriginLastSubmittedChannel.Number) require.Equal(common.Hash{}, m.tip) require.Nil(m.currentChannel) require.Empty(m.channelQueue) @@ -475,7 +475,7 @@ func TestChannelManager_ChannelCreation(t *testing.T) { t.Run(test.name, func(t *testing.T) { m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig) - m.l1OriginLastClosedChannel = test.safeL1Block + m.l1OriginLastSubmittedChannel = test.safeL1Block require.Nil(t, m.currentChannel) require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{})) @@ -639,6 +639,8 @@ func TestChannelManager_Requeue(t *testing.T) { // Assert that at least one block was processed into the channel require.NotContains(t, m.blocks, blockA) + l1OriginBeforeRequeue := m.l1OriginLastSubmittedChannel + // Call the function we are testing m.Requeue(m.defaultCfg) @@ -646,6 +648,12 @@ func TestChannelManager_Requeue(t *testing.T) { require.Equal(t, m.blocks, stateSnapshot) require.Empty(t, m.channelQueue) + // Ensure the l1OridingLastSubmittedChannel was + // not changed. This ensures the next channel + // has its duration timeout deadline computed + // properly. + require.Equal(t, l1OriginBeforeRequeue, m.l1OriginLastSubmittedChannel) + // Trigger the blocks -> channelQueue data pipelining again require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{})) require.NotEmpty(t, m.channelQueue)