Skip to content
78 changes: 47 additions & 31 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type BatchSubmitter struct {
running bool

txpoolMutex sync.Mutex // guards txpoolState and txpoolBlockedBlob
txpoolState int
txpoolState TxPoolState
txpoolBlockedBlob bool

// lastStoredBlock is the last block loaded into `state`. If it is empty it should be set to the l2 safe head.
Expand Down Expand Up @@ -160,8 +160,20 @@ func (l *BatchSubmitter) StartBatchSubmitting() error {
}
}

l.wg.Add(1)
go l.loop()
receiptsCh := make(chan txmgr.TxReceipt[txRef])
receiptsLoopCtx, cancelReceiptsLoopCtx := context.WithCancel(context.Background())
throttlingLoopCtx, cancelThrottlingLoopCtx := context.WithCancel(context.Background())

// DA throttling loop should always be started except for testing (indicated by ThrottleInterval == 0)
if l.Config.ThrottleInterval > 0 {
l.wg.Add(1)
go l.throttlingLoop(throttlingLoopCtx)
} else {
l.Log.Warn("Throttling loop is DISABLED due to 0 throttle-interval. This should not be disabled in prod.")
}
l.wg.Add(2)
go l.processReceiptsLoop(receiptsLoopCtx, receiptsCh) // receives from receiptsCh
go l.mainLoop(l.shutdownCtx, receiptsCh, cancelReceiptsLoopCtx, cancelThrottlingLoopCtx) // sends on receiptsCh

l.Log.Info("Batch Submitter started")
return nil
Expand Down Expand Up @@ -390,6 +402,8 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(syncStatus eth.SyncStatus)
// Submitted batch, but it is not valid
// Missed L2 block somehow.

// TxPoolState represents the batcher's view of the L1 transaction pool's
// willingness to accept its transactions. See the Txpool* constants below
// for the possible values and the allowed state transitions.
type TxPoolState int
const (
// Txpool states. Possible state transitions:
// TxpoolGood -> TxpoolBlocked:
Expand All @@ -399,13 +413,29 @@ const (
// send a cancellation transaction.
// TxpoolCancelPending -> TxpoolGood:
// happens once the cancel transaction completes, whether successfully or in error.
TxpoolGood int = iota
TxpoolGood TxPoolState = iota
TxpoolBlocked
TxpoolCancelPending
)

func (l *BatchSubmitter) loop() {
// setTxPoolState atomically updates txpoolState and txpoolBlockedBlob,
// guarding both writes with txpoolMutex.
func (l *BatchSubmitter) setTxPoolState(txPoolState TxPoolState, txPoolBlockedBlob bool) {
	l.txpoolMutex.Lock()
	defer l.txpoolMutex.Unlock()
	l.txpoolState = txPoolState
	l.txpoolBlockedBlob = txPoolBlockedBlob
}

// mainLoop periodically:
// - polls the sequencer,
// - prunes the channel manager state (i.e. safe blocks)
// - loads unsafe blocks from the sequencer
// - drives the creation of channels and frames
// - sends transactions to the DA layer
func (l *BatchSubmitter) mainLoop(ctx context.Context, receiptsCh chan txmgr.TxReceipt[txRef], receiptsLoopCancel, throttlingLoopCancel context.CancelFunc) {
defer l.wg.Done()
defer receiptsLoopCancel()
defer throttlingLoopCancel()

queue := txmgr.NewQueue[txRef](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)
daGroup := &errgroup.Group{}
Expand All @@ -419,22 +449,8 @@ func (l *BatchSubmitter) loop() {
l.txpoolState = TxpoolGood
l.txpoolMutex.Unlock()

// start the receipt/result processing loop
receiptsLoopDone := make(chan struct{})
defer close(receiptsLoopDone) // shut down receipt loop
l.l2BlockAdded = make(chan struct{})
defer close(l.l2BlockAdded)
receiptsCh := make(chan txmgr.TxReceipt[txRef])
go l.processReceiptsLoop(receiptsCh, receiptsLoopDone)

// DA throttling loop should always be started except for testing (indicated by ThrottleInterval == 0)
if l.Config.ThrottleInterval > 0 {
throttlingLoopDone := make(chan struct{})
defer close(throttlingLoopDone)
go l.throttlingLoop(throttlingLoopDone)
} else {
l.Log.Warn("Throttling loop is DISABLED due to 0 throttle-interval. This should not be disabled in prod.")
}

ticker := time.NewTicker(l.Config.PollInterval)
defer ticker.Stop()
Expand Down Expand Up @@ -467,34 +483,33 @@ func (l *BatchSubmitter) loop() {
continue
}
l.publishStateToL1(queue, receiptsCh, daGroup, l.Config.PollInterval)
case <-l.shutdownCtx.Done():
case <-ctx.Done():
l.Log.Warn("main loop returning")
return
}
}
}

func (l *BatchSubmitter) processReceiptsLoop(receiptsCh chan txmgr.TxReceipt[txRef], receiptsLoopDone chan struct{}) {
// processReceiptsLoop handles transaction receipts from the DA layer
func (l *BatchSubmitter) processReceiptsLoop(ctx context.Context, receiptsCh chan txmgr.TxReceipt[txRef]) {
defer l.wg.Done()
l.Log.Info("Starting receipts processing loop")
for {
select {
case r := <-receiptsCh:
l.txpoolMutex.Lock()
if errors.Is(r.Err, txpool.ErrAlreadyReserved) && l.txpoolState == TxpoolGood {
l.txpoolState = TxpoolBlocked
l.txpoolBlockedBlob = r.ID.isBlob
l.Log.Info("incompatible tx in txpool", "is_blob", r.ID.isBlob)
l.setTxPoolState(TxpoolBlocked, r.ID.isBlob)
l.Log.Warn("incompatible tx in txpool", "id", r.ID, "is_blob", r.ID.isBlob)
} else if r.ID.isCancel && l.txpoolState == TxpoolCancelPending {
// Set state to TxpoolGood even if the cancellation transaction ended in error
// since the stuck transaction could have cleared while we were waiting.
l.txpoolState = TxpoolGood
l.setTxPoolState(TxpoolGood, l.txpoolBlockedBlob)
l.Log.Info("txpool may no longer be blocked", "err", r.Err)
}
l.txpoolMutex.Unlock()
l.Log.Info("Handling receipt", "id", r.ID)
l.handleReceipt(r)
case <-receiptsLoopDone:
l.Log.Info("Receipts processing loop done")
case <-ctx.Done():
l.Log.Info("Receipt processing loop done")
return
}
}
Expand All @@ -504,7 +519,8 @@ func (l *BatchSubmitter) processReceiptsLoop(receiptsCh chan txmgr.TxReceipt[txR
// throttling of incoming data prevent the backlog from growing too large. By looping & calling the miner API setter
// continuously, we ensure the engine currently in use is always going to be reset to the proper throttling settings
// even in the event of sequencer failover.
func (l *BatchSubmitter) throttlingLoop(throttlingLoopDone chan struct{}) {
func (l *BatchSubmitter) throttlingLoop(ctx context.Context) {
defer l.wg.Done()
l.Log.Info("Starting DA throttling loop")
ticker := time.NewTicker(l.Config.ThrottleInterval)
defer ticker.Stop()
Expand Down Expand Up @@ -556,7 +572,7 @@ func (l *BatchSubmitter) throttlingLoop(throttlingLoopDone chan struct{}) {
updateParams()
case <-ticker.C:
updateParams()
case <-throttlingLoopDone:
case <-ctx.Done():
l.Log.Info("DA throttling loop done")
return
}
Expand Down
2 changes: 1 addition & 1 deletion op-batcher/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ When an L2 unsafe reorg is detected, the batch submitter will reset its state, a
When a Tx fails, an asynchronous receipts handler is triggered. The channel from which the Tx's frames came has its `frameCursor` rewound, so that all the frames can be resubmitted in order.

### Channel Times Out
When at Tx is confirmed, an asynchronous receipts handler is triggered. We only update the batcher's state if the channel timed out on chain. In that case, the `blockCursor` is rewound to the first block added to that channel, and the channel queue is cleared out. This allows the batcher to start fresh building a new channel starting from the same block -- it does not need to refetch blocks from the sequencer.
When a Tx is confirmed, an asynchronous receipts handler is triggered. We only update the batcher's state if the channel timed out on chain. In that case, the `blockCursor` is rewound to the first block added to that channel, and the channel queue is cleared out. This allows the batcher to start fresh building a new channel starting from the same block -- it does not need to refetch blocks from the sequencer.

## Design Principles and Optimization Targets
At the current time, the batcher should be optimized for correctness, simplicity and robustness. It is considered preferable to prioritize these properties, even at the expense of other potentially desirable properties such as frugality. For example, it is preferable to have the batcher resubmit some data from time to time ("wasting" money on data availability costs) instead of avoiding that by e.g. adding some persistent state to the batcher.
Expand Down