diff --git a/op-batcher/batcher/channel.go b/op-batcher/batcher/channel.go index 8270977a68c57..b9299d11bf4fa 100644 --- a/op-batcher/batcher/channel.go +++ b/op-batcher/batcher/channel.go @@ -13,30 +13,26 @@ import ( // channel is a lightweight wrapper around a ChannelBuilder which keeps track of pending // and confirmed transactions for a single channel. type channel struct { + *ChannelBuilder // pending channel builder + log log.Logger metr metrics.Metricer cfg ChannelConfig - // pending channel builder - channelBuilder *ChannelBuilder - // Set of unconfirmed txID -> tx data. For tx resubmission - pendingTransactions map[string]txData - // Set of confirmed txID -> inclusion block. For determining if the channel is timed out - confirmedTransactions map[string]eth.BlockID - - // Inclusion block number of first confirmed TX - minInclusionBlock uint64 - // Inclusion block number of last confirmed TX - maxInclusionBlock uint64 + pendingTransactions map[string]txData // Set of unconfirmed txID -> tx data. For tx resubmission + confirmedTransactions map[string]eth.BlockID // Set of confirmed txID -> inclusion block. For determining if the channel is timed out + + minInclusionBlock uint64 // Inclusion block number of first confirmed TX + maxInclusionBlock uint64 // Inclusion block number of last confirmed TX } func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollupCfg *rollup.Config, latestL1OriginBlockNum uint64, channelOut derive.ChannelOut) *channel { cb := NewChannelBuilderWithChannelOut(cfg, rollupCfg, latestL1OriginBlockNum, channelOut) return &channel{ + ChannelBuilder: cb, log: log, metr: metr, cfg: cfg, - channelBuilder: cb, pendingTransactions: make(map[string]txData), confirmedTransactions: make(map[string]eth.BlockID), minInclusionBlock: math.MaxUint64, @@ -51,7 +47,7 @@ func (c *channel) TxFailed(id string) { // Rewind to the first frame of the failed tx // -- the frames are ordered, and we want to send them // all again. - c.channelBuilder.RewindFrameCursor(data.Frames()[0]) + c.RewindFrameCursor(data.Frames()[0]) delete(c.pendingTransactions, id) } else { c.log.Warn("unknown transaction marked as failed", "id", id) @@ -73,7 +69,7 @@ func (c *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) bool { } delete(c.pendingTransactions, id) c.confirmedTransactions[id] = inclusionBlock - c.channelBuilder.FramePublished(inclusionBlock.Number) + c.FramePublished(inclusionBlock.Number) // Update min/max inclusion blocks for timeout check c.minInclusionBlock = min(c.minInclusionBlock, inclusionBlock.Number) @@ -95,11 +91,6 @@ func (c *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) bool { return false } -// Timeout returns the channel timeout L1 block number. If there is no timeout set, it returns 0. -func (c *channel) Timeout() uint64 { - return c.channelBuilder.Timeout() -} - // isTimedOut returns true if submitted channel has timed out. // A channel has timed out if the difference in L1 Inclusion blocks between // the first & last included block is greater than or equal to the channel timeout. @@ -115,14 +106,10 @@ func (c *channel) isFullySubmitted() bool { return c.IsFull() && len(c.pendingTransactions)+c.PendingFrames() == 0 } -func (c *channel) NoneSubmitted() bool { +func (c *channel) noneSubmitted() bool { return len(c.confirmedTransactions) == 0 && len(c.pendingTransactions) == 0 } -func (c *channel) ID() derive.ChannelID { - return c.channelBuilder.ID() -} - // NextTxData dequeues the next frames from the channel and returns them encoded in a tx data packet. // If cfg.UseBlobs is false, it returns txData with a single frame. // If cfg.UseBlobs is true, it will read frames from its channel builder @@ -132,8 +119,8 @@ func (c *channel) ID() derive.ChannelID { func (c *channel) NextTxData() txData { nf := c.cfg.MaxFramesPerTx() txdata := txData{frames: make([]frameData, 0, nf), asBlob: c.cfg.UseBlobs} - for i := 0; i < nf && c.channelBuilder.HasPendingFrame(); i++ { - frame := c.channelBuilder.NextFrame() + for i := 0; i < nf && c.HasPendingFrame(); i++ { + frame := c.NextFrame() txdata.frames = append(txdata.frames, frame) } @@ -147,74 +134,10 @@ func (c *channel) NextTxData() txData { func (c *channel) HasTxData() bool { if c.IsFull() || // If the channel is full, we should start to submit it !c.cfg.UseBlobs { // If using calldata, we only send one frame per tx - return c.channelBuilder.HasPendingFrame() + return c.HasPendingFrame() } // Collect enough frames if channel is not full yet - return c.channelBuilder.PendingFrames() >= int(c.cfg.MaxFramesPerTx()) -} - -func (c *channel) IsFull() bool { - return c.channelBuilder.IsFull() -} - -func (c *channel) FullErr() error { - return c.channelBuilder.FullErr() -} - -func (c *channel) CheckTimeout(l1BlockNum uint64) { - c.channelBuilder.CheckTimeout(l1BlockNum) -} - -func (c *channel) AddBlock(block SizedBlock) (*derive.L1BlockInfo, error) { - return c.channelBuilder.AddBlock(block) -} - -func (c *channel) InputBytes() int { - return c.channelBuilder.InputBytes() -} - -func (c *channel) ReadyBytes() int { - return c.channelBuilder.ReadyBytes() -} - -func (c *channel) OutputBytes() int { - return c.channelBuilder.OutputBytes() -} - -func (c *channel) TotalFrames() int { - return c.channelBuilder.TotalFrames() -} - -func (c *channel) PendingFrames() int { - return c.channelBuilder.PendingFrames() -} - -func (c *channel) OutputFrames() error { - return c.channelBuilder.OutputFrames() -} - -// LatestL1Origin returns the latest L1 block origin from all the L2 blocks that have been added to the channel -func (c *channel) LatestL1Origin() eth.BlockID { - return c.channelBuilder.LatestL1Origin() -} - -// OldestL1Origin returns the oldest L1 block origin from all the L2 blocks that have been added to the channel -func (c *channel) OldestL1Origin() eth.BlockID { - return c.channelBuilder.OldestL1Origin() -} - -// LatestL2 returns the latest L2 block from all the L2 blocks that have been added to the channel -func (c *channel) LatestL2() eth.BlockID { - return c.channelBuilder.LatestL2() -} - -// OldestL2 returns the oldest L2 block from all the L2 blocks that have been added to the channel -func (c *channel) OldestL2() eth.BlockID { - return c.channelBuilder.OldestL2() -} - -func (c *channel) Close() { - c.channelBuilder.Close() + return c.PendingFrames() >= int(c.cfg.MaxFramesPerTx()) } func (c *channel) MaxInclusionBlock() uint64 { diff --git a/op-batcher/batcher/channel_builder.go b/op-batcher/batcher/channel_builder.go index c9f29283cd94d..eab2f20372622 100644 --- a/op-batcher/batcher/channel_builder.go +++ b/op-batcher/batcher/channel_builder.go @@ -133,12 +133,6 @@ func (c *ChannelBuilder) OutputBytes() int { return c.outputBytes } -// Blocks returns a backup list of all blocks that were added to the channel. It -// can be used in case the channel needs to be rebuilt. -func (c *ChannelBuilder) Blocks() []SizedBlock { - return c.blocks -} - // LatestL1Origin returns the latest L1 block origin from all the L2 blocks that have been added to the channel func (c *ChannelBuilder) LatestL1Origin() eth.BlockID { return c.latestL1Origin @@ -213,13 +207,6 @@ func (c *ChannelBuilder) AddBlock(block SizedBlock) (*derive.L1BlockInfo, error) return l1info, nil } -// Timeout management - -// Timeout returns the block number of the channel timeout. If no timeout is set it returns 0 -func (c *ChannelBuilder) Timeout() uint64 { - return c.timeout -} - // FramePublished should be called whenever a frame of this channel got // published with the L1-block number of the block that the frame got included // in. diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index b933de454f2f9..4e1ca799cc375 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -152,12 +152,12 @@ func (s *channelManager) rewindToBlock(block eth.BlockID) { // and removes the channel from the channelQueue, along with // any channels which are newer than the provided channel. func (s *channelManager) handleChannelInvalidated(c *channel) { - if len(c.channelBuilder.blocks) > 0 { + if len(c.ChannelBuilder.blocks) > 0 { // This is usually true, but there is an edge case // where a channel timed out before any blocks got added. // In that case we end up with an empty frame (header only), // and there are no blocks to requeue. - blockID := eth.ToBlockID(c.channelBuilder.blocks[0]) + blockID := eth.ToBlockID(c.ChannelBuilder.blocks[0]) s.rewindToBlock(blockID) } else { s.log.Debug("channelManager.handleChannelInvalidated: channel had no blocks") @@ -176,7 +176,7 @@ func (s *channelManager) handleChannelInvalidated(c *channel) { for i := invalidatedChannelIdx; i < len(s.channelQueue); i++ { s.log.Warn("Dropped channel", "id", s.channelQueue[i].ID(), - "none_submitted", s.channelQueue[i].NoneSubmitted(), + "none_submitted", s.channelQueue[i].noneSubmitted(), "fully_submitted", s.channelQueue[i].isFullySubmitted(), "timed_out", s.channelQueue[i].isTimedOut(), "full_reason", s.channelQueue[i].FullErr(), @@ -232,7 +232,7 @@ func (s *channelManager) TxData(l1Head eth.BlockID, isPectra, isThrottling, forc } // If the channel has already started being submitted, // return now and ensure no requeuing happens - if !channel.NoneSubmitted() { + if !channel.noneSubmitted() { return s.nextTxData(channel) } @@ -472,7 +472,7 @@ func (s *channelManager) outputFrames() error { "compr_ratio", comprRatio, ) - s.currentChannel.channelBuilder.co.DiscardCompressor() // Free up memory by discarding the compressor + s.currentChannel.ChannelBuilder.co.DiscardCompressor() // Free up memory by discarding the compressor return nil } @@ -586,7 +586,7 @@ func (s *channelManager) unsafeBytesInOpenChannels() int64 { var bytesInOpenChannels int64 for _, channel := range s.channelQueue { if channel.TotalFrames() == 0 { - for _, block := range channel.channelBuilder.blocks { + for _, block := range channel.ChannelBuilder.blocks { bytesInOpenChannels += int64(block.EstimatedDABytes()) } } diff --git a/op-batcher/batcher/channel_manager_test.go b/op-batcher/batcher/channel_manager_test.go index c8bce13262ee9..2aafe1f315d5a 100644 --- a/op-batcher/batcher/channel_manager_test.go +++ b/op-batcher/batcher/channel_manager_test.go @@ -153,7 +153,7 @@ func ChannelManager_Clear(t *testing.T, batchType uint) { // Process the blocks // We should have a pending channel with 1 frame require.NoError(m.processBlocks()) - require.NoError(m.currentChannel.channelBuilder.co.Flush()) + require.NoError(m.currentChannel.ChannelBuilder.co.Flush()) require.NoError(m.outputFrames()) _, err := m.nextTxData(m.currentChannel) require.NoError(err) @@ -270,7 +270,7 @@ func TestChannelManager_ChannelCreation(t *testing.T) { require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{})) require.NotNil(t, m.currentChannel) - require.Equal(t, test.expectedChannelTimeout, m.currentChannel.Timeout()) + require.Equal(t, test.expectedChannelTimeout, m.currentChannel.timeout) }) } } @@ -693,7 +693,7 @@ func TestChannelManager_ChannelOutFactory(t *testing.T) { }) require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{})) - require.IsType(t, &ChannelOutWrapper{}, m.currentChannel.channelBuilder.co) + require.IsType(t, &ChannelOutWrapper{}, m.currentChannel.ChannelBuilder.co) } // TestChannelManager_TxData seeds the channel manager with blocks and triggers the diff --git a/op-batcher/batcher/channel_test.go b/op-batcher/batcher/channel_test.go index 0e1365eceec9c..e1674fe8814bd 100644 --- a/op-batcher/batcher/channel_test.go +++ b/op-batcher/batcher/channel_test.go @@ -113,7 +113,7 @@ func TestChannelManager_NextTxData(t *testing.T) { frameNumber: uint16(0), }, } - channel.channelBuilder.frames = append(channel.channelBuilder.frames, frame) + channel.ChannelBuilder.frames = append(channel.ChannelBuilder.frames, frame) require.Equal(t, 1, channel.PendingFrames()) // Now the nextTxData function should return the frame @@ -142,7 +142,7 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) { mockframes := makeMockFrameDatas(chID, n+1) // put multiple frames into channel, but less than target - ch.channelBuilder.frames = mockframes[:n-1] + ch.ChannelBuilder.frames = mockframes[:n-1] requireTxData := func(i int) { require.True(ch.HasTxData(), "expected tx data %d", i) @@ -160,7 +160,7 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) { require.False(ch.HasTxData()) // put in last two - ch.channelBuilder.frames = append(ch.channelBuilder.frames, mockframes[n-1:n+1]...) + ch.ChannelBuilder.frames = append(ch.ChannelBuilder.frames, mockframes[n-1:n+1]...) for i := n - 1; i < n+1; i++ { requireTxData(i) } @@ -183,11 +183,11 @@ func TestChannel_NextTxData_multiFrameTx(t *testing.T) { mockframes := makeMockFrameDatas(chID, n+1) // put multiple frames into channel, but less than target - ch.channelBuilder.frames = append(ch.channelBuilder.frames, mockframes[:n-1]...) + ch.ChannelBuilder.frames = append(ch.ChannelBuilder.frames, mockframes[:n-1]...) require.False(ch.HasTxData()) // put in last two - ch.channelBuilder.frames = append(ch.channelBuilder.frames, mockframes[n-1:n+1]...) + ch.ChannelBuilder.frames = append(ch.ChannelBuilder.frames, mockframes[n-1:n+1]...) require.True(ch.HasTxData()) txdata := ch.NextTxData() require.Len(txdata.frames, n) @@ -240,7 +240,7 @@ func TestChannelTxConfirmed(t *testing.T) { frameNumber: uint16(0), }, } - m.currentChannel.channelBuilder.frames = append(m.currentChannel.channelBuilder.frames, frame) + m.currentChannel.ChannelBuilder.frames = append(m.currentChannel.ChannelBuilder.frames, frame) require.Equal(t, 1, m.currentChannel.PendingFrames()) returnedTxData, err := m.nextTxData(m.currentChannel) @@ -292,7 +292,7 @@ func TestChannelTxFailed(t *testing.T) { frameNumber: uint16(0), }, } - m.currentChannel.channelBuilder.frames = append(m.currentChannel.channelBuilder.frames, frame) + m.currentChannel.ChannelBuilder.frames = append(m.currentChannel.ChannelBuilder.frames, frame) require.Equal(t, 1, m.currentChannel.PendingFrames()) returnedTxData, err := m.nextTxData(m.currentChannel) expectedTxData := singleFrameTxData(frame) diff --git a/op-batcher/batcher/throttler/linear_strategy_test.go b/op-batcher/batcher/throttler/linear_strategy_test.go index bcef50f375b6a..8080d70ee436d 100644 --- a/op-batcher/batcher/throttler/linear_strategy_test.go +++ b/op-batcher/batcher/throttler/linear_strategy_test.go @@ -46,61 +46,51 @@ func TestLinearStrategy_Update(t *testing.T) { tests := []struct { name string pendingBytes uint64 - targetBytes uint64 expectedIntensity float64 }{ { name: "zero load", pendingBytes: 0, - targetBytes: 0, expectedIntensity: TestIntensityMin, }, { name: "below threshold", pendingBytes: TestLinearThreshold / 2, - targetBytes: 0, expectedIntensity: TestIntensityMin, }, { name: "exactly at threshold", pendingBytes: TestLinearThreshold, - targetBytes: 0, expectedIntensity: TestIntensityMin, }, { name: "25% above threshold", pendingBytes: TestLinearThreshold + TestLinearThreshold/4, - targetBytes: 0, expectedIntensity: 0.25, }, { name: "50% above threshold", pendingBytes: TestLinearThreshold + TestLinearThreshold/2, - targetBytes: 0, expectedIntensity: 0.50, }, { name: "75% above threshold", pendingBytes: TestLinearThreshold + 3*TestLinearThreshold/4, - targetBytes: 0, expectedIntensity: 0.75, }, { name: "100% above threshold (max)", pendingBytes: TestLinearMaxThreshold, - targetBytes: 0, expectedIntensity: TestIntensityMax, }, { name: "beyond max threshold", pendingBytes: TestLinearMaxThreshold * 2, - targetBytes: 0, expectedIntensity: TestIntensityMax, }, { name: "with target bytes ignored", pendingBytes: TestLinearThreshold + TestLinearThreshold/2, - targetBytes: TestLinearThreshold * 10, // Target bytes should be ignored expectedIntensity: 0.50, }, } @@ -173,12 +163,10 @@ func TestLinearStrategy_Reset(t *testing.T) { func TestLinearStrategy_EdgeCases(t *testing.T) { t.Run("max threshold less than threshold", func(t *testing.T) { - require.Panics(t, func() { // Test when multiplier results in maxThreshold <= threshold NewLinearStrategy(TestLinearThreshold, 0, newTestLogger(t)) }) - }) t.Run("very large multiplier", func(t *testing.T) { diff --git a/op-batcher/batcher/throttler/pid_strategy_test.go b/op-batcher/batcher/throttler/pid_strategy_test.go index 835e9ece0290f..1bd4050da3c1f 100644 --- a/op-batcher/batcher/throttler/pid_strategy_test.go +++ b/op-batcher/batcher/throttler/pid_strategy_test.go @@ -299,31 +299,26 @@ func TestPIDStrategy_ErrorCalculation(t *testing.T) { testCases := []struct { name string pendingBytes uint64 - targetBytes uint64 expectError bool }{ { name: "no error at threshold", pendingBytes: TestPIDThreshold, - targetBytes: TestPIDThreshold, expectError: false, }, { name: "error above threshold", pendingBytes: TestPIDThreshold * 2, - targetBytes: TestPIDThreshold, expectError: true, }, { name: "no error below threshold", pendingBytes: TestPIDThreshold / 2, - targetBytes: TestPIDThreshold, expectError: false, }, { name: "error with different target", pendingBytes: TestPIDThreshold * 2, - targetBytes: TestPIDThreshold / 2, expectError: true, }, } diff --git a/op-batcher/batcher/throttler/quadratic_strategy_test.go b/op-batcher/batcher/throttler/quadratic_strategy_test.go index a88c0f0755b61..a6b44e1789fd0 100644 --- a/op-batcher/batcher/throttler/quadratic_strategy_test.go +++ b/op-batcher/batcher/throttler/quadratic_strategy_test.go @@ -46,62 +46,52 @@ func TestQuadraticStrategy_Update(t *testing.T) { tests := []struct { name string pendingBytes uint64 - targetBytes uint64 expectedIntensity float64 }{ { name: "zero load", pendingBytes: 0, - targetBytes: 0, expectedIntensity: TestIntensityMin, }, { name: "below threshold", pendingBytes: TestQuadraticThreshold / 2, - targetBytes: 0, expectedIntensity: TestIntensityMin, }, { name: "exactly at threshold", pendingBytes: TestQuadraticThreshold, - targetBytes: 0, expectedIntensity: TestIntensityMin, }, { name: "25% above threshold", pendingBytes: TestQuadraticThreshold + TestQuadraticThreshold/4, - targetBytes: 0, expectedIntensity: 0.0625, // (0.25)^2 }, { name: "50% above threshold", pendingBytes: TestQuadraticThreshold + TestQuadraticThreshold/2, - targetBytes: 0, expectedIntensity: 0.25, // (0.5)^2 }, { name: "75% above threshold", pendingBytes: TestQuadraticThreshold + 3*TestQuadraticThreshold/4, - targetBytes: 0, expectedIntensity: 0.5625, // (0.75)^2 }, { name: "100% above threshold (max)", pendingBytes: TestQuadraticMaxThreshold, - targetBytes: 0, expectedIntensity: TestIntensityMax, }, { name: "beyond max threshold", pendingBytes: TestQuadraticMaxThreshold * 2, - targetBytes: 0, expectedIntensity: TestIntensityMax, }, { name: "with target bytes ignored", pendingBytes: TestQuadraticThreshold + TestQuadraticThreshold/2, - targetBytes: TestQuadraticThreshold * 10, // Target bytes should be ignored - expectedIntensity: 0.25, // (0.5)^2 + expectedIntensity: 0.25, // (0.5)^2 }, } diff --git a/op-batcher/batcher/throttler/step_strategy_test.go b/op-batcher/batcher/throttler/step_strategy_test.go index a4e668ee09484..7c7f2d7d73163 100644 --- a/op-batcher/batcher/throttler/step_strategy_test.go +++ b/op-batcher/batcher/throttler/step_strategy_test.go @@ -38,43 +38,36 @@ func TestStepStrategy_Update(t *testing.T) { tests := []struct { name string pendingBytes uint64 - targetBytes uint64 expectedIntensity float64 }{ { name: "zero load", pendingBytes: 0, - targetBytes: 0, expectedIntensity: TestIntensityMin, }, { name: "below threshold", pendingBytes: TestStepThreshold / 2, - targetBytes: 0, expectedIntensity: TestIntensityMin, }, { name: "exactly at threshold", pendingBytes: TestStepThreshold, - targetBytes: 0, expectedIntensity: TestIntensityMin, }, { name: "just above threshold", pendingBytes: TestStepThreshold + 1, - targetBytes: 0, expectedIntensity: TestIntensityMax, }, { name: "far above threshold", pendingBytes: TestStepThreshold * 10, - targetBytes: 0, expectedIntensity: TestIntensityMax, }, { name: "with target bytes ignored", pendingBytes: TestStepThreshold + 1000, - targetBytes: TestStepThreshold * 2, // Target bytes should be ignored in step strategy expectedIntensity: TestIntensityMax, }, }