Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 15 additions & 92 deletions op-batcher/batcher/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,26 @@ import (
// channel is a lightweight wrapper around a ChannelBuilder which keeps track of pending
// and confirmed transactions for a single channel.
type channel struct {
*ChannelBuilder // pending channel builder

log log.Logger
metr metrics.Metricer
cfg ChannelConfig

// pending channel builder
channelBuilder *ChannelBuilder
// Set of unconfirmed txID -> tx data. For tx resubmission
pendingTransactions map[string]txData
// Set of confirmed txID -> inclusion block. For determining if the channel is timed out
confirmedTransactions map[string]eth.BlockID

// Inclusion block number of first confirmed TX
minInclusionBlock uint64
// Inclusion block number of last confirmed TX
maxInclusionBlock uint64
pendingTransactions map[string]txData // Set of unconfirmed txID -> tx data. For tx resubmission
confirmedTransactions map[string]eth.BlockID // Set of confirmed txID -> inclusion block. For determining if the channel is timed out

minInclusionBlock uint64 // Inclusion block number of first confirmed TX
maxInclusionBlock uint64 // Inclusion block number of last confirmed TX
}

func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollupCfg *rollup.Config, latestL1OriginBlockNum uint64, channelOut derive.ChannelOut) *channel {
cb := NewChannelBuilderWithChannelOut(cfg, rollupCfg, latestL1OriginBlockNum, channelOut)
return &channel{
ChannelBuilder: cb,
log: log,
metr: metr,
cfg: cfg,
channelBuilder: cb,
pendingTransactions: make(map[string]txData),
confirmedTransactions: make(map[string]eth.BlockID),
minInclusionBlock: math.MaxUint64,
Expand All @@ -51,7 +47,7 @@ func (c *channel) TxFailed(id string) {
// Rewind to the first frame of the failed tx
// -- the frames are ordered, and we want to send them
// all again.
c.channelBuilder.RewindFrameCursor(data.Frames()[0])
c.RewindFrameCursor(data.Frames()[0])
delete(c.pendingTransactions, id)
} else {
c.log.Warn("unknown transaction marked as failed", "id", id)
Expand All @@ -73,7 +69,7 @@ func (c *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) bool {
}
delete(c.pendingTransactions, id)
c.confirmedTransactions[id] = inclusionBlock
c.channelBuilder.FramePublished(inclusionBlock.Number)
c.FramePublished(inclusionBlock.Number)

// Update min/max inclusion blocks for timeout check
c.minInclusionBlock = min(c.minInclusionBlock, inclusionBlock.Number)
Expand All @@ -95,11 +91,6 @@ func (c *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) bool {
return false
}

// Timeout returns the channel timeout L1 block number. If there is no timeout set, it returns 0.
func (c *channel) Timeout() uint64 {
return c.channelBuilder.Timeout()
}

// isTimedOut returns true if submitted channel has timed out.
// A channel has timed out if the difference in L1 Inclusion blocks between
// the first & last included block is greater than or equal to the channel timeout.
Expand All @@ -115,14 +106,10 @@ func (c *channel) isFullySubmitted() bool {
return c.IsFull() && len(c.pendingTransactions)+c.PendingFrames() == 0
}

func (c *channel) NoneSubmitted() bool {
func (c *channel) noneSubmitted() bool {
return len(c.confirmedTransactions) == 0 && len(c.pendingTransactions) == 0
}

func (c *channel) ID() derive.ChannelID {
return c.channelBuilder.ID()
}

// NextTxData dequeues the next frames from the channel and returns them encoded in a tx data packet.
// If cfg.UseBlobs is false, it returns txData with a single frame.
// If cfg.UseBlobs is true, it will read frames from its channel builder
Expand All @@ -132,8 +119,8 @@ func (c *channel) ID() derive.ChannelID {
func (c *channel) NextTxData() txData {
nf := c.cfg.MaxFramesPerTx()
txdata := txData{frames: make([]frameData, 0, nf), asBlob: c.cfg.UseBlobs}
for i := 0; i < nf && c.channelBuilder.HasPendingFrame(); i++ {
frame := c.channelBuilder.NextFrame()
for i := 0; i < nf && c.HasPendingFrame(); i++ {
frame := c.NextFrame()
txdata.frames = append(txdata.frames, frame)
}

Expand All @@ -147,74 +134,10 @@ func (c *channel) NextTxData() txData {
func (c *channel) HasTxData() bool {
if c.IsFull() || // If the channel is full, we should start to submit it
!c.cfg.UseBlobs { // If using calldata, we only send one frame per tx
return c.channelBuilder.HasPendingFrame()
return c.HasPendingFrame()
}
// Collect enough frames if channel is not full yet
return c.channelBuilder.PendingFrames() >= int(c.cfg.MaxFramesPerTx())
}

func (c *channel) IsFull() bool {
return c.channelBuilder.IsFull()
}

func (c *channel) FullErr() error {
return c.channelBuilder.FullErr()
}

func (c *channel) CheckTimeout(l1BlockNum uint64) {
c.channelBuilder.CheckTimeout(l1BlockNum)
}

func (c *channel) AddBlock(block SizedBlock) (*derive.L1BlockInfo, error) {
return c.channelBuilder.AddBlock(block)
}

func (c *channel) InputBytes() int {
return c.channelBuilder.InputBytes()
}

func (c *channel) ReadyBytes() int {
return c.channelBuilder.ReadyBytes()
}

func (c *channel) OutputBytes() int {
return c.channelBuilder.OutputBytes()
}

func (c *channel) TotalFrames() int {
return c.channelBuilder.TotalFrames()
}

func (c *channel) PendingFrames() int {
return c.channelBuilder.PendingFrames()
}

func (c *channel) OutputFrames() error {
return c.channelBuilder.OutputFrames()
}

// LatestL1Origin returns the latest L1 block origin from all the L2 blocks that have been added to the channel
func (c *channel) LatestL1Origin() eth.BlockID {
return c.channelBuilder.LatestL1Origin()
}

// OldestL1Origin returns the oldest L1 block origin from all the L2 blocks that have been added to the channel
func (c *channel) OldestL1Origin() eth.BlockID {
return c.channelBuilder.OldestL1Origin()
}

// LatestL2 returns the latest L2 block from all the L2 blocks that have been added to the channel
func (c *channel) LatestL2() eth.BlockID {
return c.channelBuilder.LatestL2()
}

// OldestL2 returns the oldest L2 block from all the L2 blocks that have been added to the channel
func (c *channel) OldestL2() eth.BlockID {
return c.channelBuilder.OldestL2()
}

func (c *channel) Close() {
c.channelBuilder.Close()
return c.PendingFrames() >= int(c.cfg.MaxFramesPerTx())
}

func (c *channel) MaxInclusionBlock() uint64 {
Expand Down
13 changes: 0 additions & 13 deletions op-batcher/batcher/channel_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,6 @@ func (c *ChannelBuilder) OutputBytes() int {
return c.outputBytes
}

// Blocks returns a backup list of all blocks that were added to the channel. It
// can be used in case the channel needs to be rebuilt.
func (c *ChannelBuilder) Blocks() []SizedBlock {
return c.blocks
}

// LatestL1Origin returns the latest L1 block origin from all the L2 blocks that have been added to the channel
func (c *ChannelBuilder) LatestL1Origin() eth.BlockID {
return c.latestL1Origin
Expand Down Expand Up @@ -213,13 +207,6 @@ func (c *ChannelBuilder) AddBlock(block SizedBlock) (*derive.L1BlockInfo, error)
return l1info, nil
}

// Timeout management

// Timeout returns the block number of the channel timeout. If no timeout is set it returns 0
func (c *ChannelBuilder) Timeout() uint64 {
return c.timeout
}

// FramePublished should be called whenever a frame of this channel got
// published with the L1-block number of the block that the frame got included
// in.
Expand Down
12 changes: 6 additions & 6 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,12 @@ func (s *channelManager) rewindToBlock(block eth.BlockID) {
// and removes the channel from the channelQueue, along with
// any channels which are newer than the provided channel.
func (s *channelManager) handleChannelInvalidated(c *channel) {
if len(c.channelBuilder.blocks) > 0 {
if len(c.ChannelBuilder.blocks) > 0 {
// This is usually true, but there is an edge case
// where a channel timed out before any blocks got added.
// In that case we end up with an empty frame (header only),
// and there are no blocks to requeue.
blockID := eth.ToBlockID(c.channelBuilder.blocks[0])
blockID := eth.ToBlockID(c.ChannelBuilder.blocks[0])
s.rewindToBlock(blockID)
} else {
s.log.Debug("channelManager.handleChannelInvalidated: channel had no blocks")
Expand All @@ -176,7 +176,7 @@ func (s *channelManager) handleChannelInvalidated(c *channel) {
for i := invalidatedChannelIdx; i < len(s.channelQueue); i++ {
s.log.Warn("Dropped channel",
"id", s.channelQueue[i].ID(),
"none_submitted", s.channelQueue[i].NoneSubmitted(),
"none_submitted", s.channelQueue[i].noneSubmitted(),
"fully_submitted", s.channelQueue[i].isFullySubmitted(),
"timed_out", s.channelQueue[i].isTimedOut(),
"full_reason", s.channelQueue[i].FullErr(),
Expand Down Expand Up @@ -232,7 +232,7 @@ func (s *channelManager) TxData(l1Head eth.BlockID, isPectra, isThrottling, forc
}
// If the channel has already started being submitted,
// return now and ensure no requeuing happens
if !channel.NoneSubmitted() {
if !channel.noneSubmitted() {
return s.nextTxData(channel)
}

Expand Down Expand Up @@ -472,7 +472,7 @@ func (s *channelManager) outputFrames() error {
"compr_ratio", comprRatio,
)

s.currentChannel.channelBuilder.co.DiscardCompressor() // Free up memory by discarding the compressor
s.currentChannel.ChannelBuilder.co.DiscardCompressor() // Free up memory by discarding the compressor

return nil
}
Expand Down Expand Up @@ -586,7 +586,7 @@ func (s *channelManager) unsafeBytesInOpenChannels() int64 {
var bytesInOpenChannels int64
for _, channel := range s.channelQueue {
if channel.TotalFrames() == 0 {
for _, block := range channel.channelBuilder.blocks {
for _, block := range channel.ChannelBuilder.blocks {
bytesInOpenChannels += int64(block.EstimatedDABytes())
}
}
Expand Down
6 changes: 3 additions & 3 deletions op-batcher/batcher/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {
// Process the blocks
// We should have a pending channel with 1 frame
require.NoError(m.processBlocks())
require.NoError(m.currentChannel.channelBuilder.co.Flush())
require.NoError(m.currentChannel.ChannelBuilder.co.Flush())
require.NoError(m.outputFrames())
_, err := m.nextTxData(m.currentChannel)
require.NoError(err)
Expand Down Expand Up @@ -270,7 +270,7 @@ func TestChannelManager_ChannelCreation(t *testing.T) {
require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{}))

require.NotNil(t, m.currentChannel)
require.Equal(t, test.expectedChannelTimeout, m.currentChannel.Timeout())
require.Equal(t, test.expectedChannelTimeout, m.currentChannel.timeout)
})
}
}
Expand Down Expand Up @@ -693,7 +693,7 @@ func TestChannelManager_ChannelOutFactory(t *testing.T) {
})
require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{}))

require.IsType(t, &ChannelOutWrapper{}, m.currentChannel.channelBuilder.co)
require.IsType(t, &ChannelOutWrapper{}, m.currentChannel.ChannelBuilder.co)
}

// TestChannelManager_TxData seeds the channel manager with blocks and triggers the
Expand Down
14 changes: 7 additions & 7 deletions op-batcher/batcher/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestChannelManager_NextTxData(t *testing.T) {
frameNumber: uint16(0),
},
}
channel.channelBuilder.frames = append(channel.channelBuilder.frames, frame)
channel.ChannelBuilder.frames = append(channel.ChannelBuilder.frames, frame)
require.Equal(t, 1, channel.PendingFrames())

// Now the nextTxData function should return the frame
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) {

mockframes := makeMockFrameDatas(chID, n+1)
// put multiple frames into channel, but less than target
ch.channelBuilder.frames = mockframes[:n-1]
ch.ChannelBuilder.frames = mockframes[:n-1]

requireTxData := func(i int) {
require.True(ch.HasTxData(), "expected tx data %d", i)
Expand All @@ -160,7 +160,7 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) {
require.False(ch.HasTxData())

// put in last two
ch.channelBuilder.frames = append(ch.channelBuilder.frames, mockframes[n-1:n+1]...)
ch.ChannelBuilder.frames = append(ch.ChannelBuilder.frames, mockframes[n-1:n+1]...)
for i := n - 1; i < n+1; i++ {
requireTxData(i)
}
Expand All @@ -183,11 +183,11 @@ func TestChannel_NextTxData_multiFrameTx(t *testing.T) {

mockframes := makeMockFrameDatas(chID, n+1)
// put multiple frames into channel, but less than target
ch.channelBuilder.frames = append(ch.channelBuilder.frames, mockframes[:n-1]...)
ch.ChannelBuilder.frames = append(ch.ChannelBuilder.frames, mockframes[:n-1]...)
require.False(ch.HasTxData())

// put in last two
ch.channelBuilder.frames = append(ch.channelBuilder.frames, mockframes[n-1:n+1]...)
ch.ChannelBuilder.frames = append(ch.ChannelBuilder.frames, mockframes[n-1:n+1]...)
require.True(ch.HasTxData())
txdata := ch.NextTxData()
require.Len(txdata.frames, n)
Expand Down Expand Up @@ -240,7 +240,7 @@ func TestChannelTxConfirmed(t *testing.T) {
frameNumber: uint16(0),
},
}
m.currentChannel.channelBuilder.frames = append(m.currentChannel.channelBuilder.frames, frame)
m.currentChannel.ChannelBuilder.frames = append(m.currentChannel.ChannelBuilder.frames, frame)

require.Equal(t, 1, m.currentChannel.PendingFrames())
returnedTxData, err := m.nextTxData(m.currentChannel)
Expand Down Expand Up @@ -292,7 +292,7 @@ func TestChannelTxFailed(t *testing.T) {
frameNumber: uint16(0),
},
}
m.currentChannel.channelBuilder.frames = append(m.currentChannel.channelBuilder.frames, frame)
m.currentChannel.ChannelBuilder.frames = append(m.currentChannel.ChannelBuilder.frames, frame)
require.Equal(t, 1, m.currentChannel.PendingFrames())
returnedTxData, err := m.nextTxData(m.currentChannel)
expectedTxData := singleFrameTxData(frame)
Expand Down
Loading