diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 2c9db3821e3a7..7b1d6139e2655 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -111,7 +111,7 @@ type BatchSubmitter struct { running bool txpoolMutex sync.Mutex // guards txpoolState and txpoolBlockedBlob - txpoolState int + txpoolState TxPoolState txpoolBlockedBlob bool // lastStoredBlock is the last block loaded into `state`. If it is empty it should be set to the l2 safe head. @@ -160,8 +160,20 @@ func (l *BatchSubmitter) StartBatchSubmitting() error { } } - l.wg.Add(1) - go l.loop() + receiptsCh := make(chan txmgr.TxReceipt[txRef]) + receiptsLoopCtx, cancelReceiptsLoopCtx := context.WithCancel(context.Background()) + throttlingLoopCtx, cancelThrottlingLoopCtx := context.WithCancel(context.Background()) + + // DA throttling loop should always be started except for testing (indicated by ThrottleInterval == 0) + if l.Config.ThrottleInterval > 0 { + l.wg.Add(1) + go l.throttlingLoop(throttlingLoopCtx) + } else { + l.Log.Warn("Throttling loop is DISABLED due to 0 throttle-interval. This should not be disabled in prod.") + } + l.wg.Add(2) + go l.processReceiptsLoop(receiptsLoopCtx, receiptsCh) // receives from receiptsCh + go l.mainLoop(l.shutdownCtx, receiptsCh, cancelReceiptsLoopCtx, cancelThrottlingLoopCtx) // sends on receiptsCh l.Log.Info("Batch Submitter started") return nil @@ -390,6 +402,8 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(syncStatus eth.SyncStatus) // Submitted batch, but it is not valid // Missed L2 block somehow. +type TxPoolState int + const ( // Txpool states. Possible state transitions: // TxpoolGood -> TxpoolBlocked: @@ -399,13 +413,29 @@ const ( // send a cancellation transaction. // TxpoolCancelPending -> TxpoolGood: // happens once the cancel transaction completes, whether successfully or in error. 
- TxpoolGood int = iota + TxpoolGood TxPoolState = iota TxpoolBlocked TxpoolCancelPending ) -func (l *BatchSubmitter) loop() { +// setTxPoolState locks the mutex, sets the parameters to the supplied ones, and releases the mutex. +func (l *BatchSubmitter) setTxPoolState(txPoolState TxPoolState, txPoolBlockedBlob bool) { + l.txpoolMutex.Lock() + l.txpoolState = txPoolState + l.txpoolBlockedBlob = txPoolBlockedBlob + l.txpoolMutex.Unlock() +} + +// mainLoop periodically: +// - polls the sequencer, +// - prunes the channel manager state (i.e. safe blocks) +// - loads unsafe blocks from the sequencer +// - drives the creation of channels and frames +// - sends transactions to the DA layer +func (l *BatchSubmitter) mainLoop(ctx context.Context, receiptsCh chan txmgr.TxReceipt[txRef], receiptsLoopCancel, throttlingLoopCancel context.CancelFunc) { defer l.wg.Done() + defer receiptsLoopCancel() + defer throttlingLoopCancel() queue := txmgr.NewQueue[txRef](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions) daGroup := &errgroup.Group{} @@ -419,22 +449,8 @@ func (l *BatchSubmitter) loop() { l.txpoolState = TxpoolGood l.txpoolMutex.Unlock() - // start the receipt/result processing loop - receiptsLoopDone := make(chan struct{}) - defer close(receiptsLoopDone) // shut down receipt loop l.l2BlockAdded = make(chan struct{}) defer close(l.l2BlockAdded) - receiptsCh := make(chan txmgr.TxReceipt[txRef]) - go l.processReceiptsLoop(receiptsCh, receiptsLoopDone) - - // DA throttling loop should always be started except for testing (indicated by ThrottleInterval == 0) - if l.Config.ThrottleInterval > 0 { - throttlingLoopDone := make(chan struct{}) - defer close(throttlingLoopDone) - go l.throttlingLoop(throttlingLoopDone) - } else { - l.Log.Warn("Throttling loop is DISABLED due to 0 throttle-interval. 
This should not be disabled in prod.") - } ticker := time.NewTicker(l.Config.PollInterval) defer ticker.Stop() @@ -467,34 +483,33 @@ func (l *BatchSubmitter) loop() { continue } l.publishStateToL1(queue, receiptsCh, daGroup, l.Config.PollInterval) - case <-l.shutdownCtx.Done(): + case <-ctx.Done(): l.Log.Warn("main loop returning") return } } } -func (l *BatchSubmitter) processReceiptsLoop(receiptsCh chan txmgr.TxReceipt[txRef], receiptsLoopDone chan struct{}) { +// processReceiptsLoop handles transaction receipts from the DA layer +func (l *BatchSubmitter) processReceiptsLoop(ctx context.Context, receiptsCh chan txmgr.TxReceipt[txRef]) { + defer l.wg.Done() l.Log.Info("Starting receipts processing loop") for { select { case r := <-receiptsCh: - l.txpoolMutex.Lock() if errors.Is(r.Err, txpool.ErrAlreadyReserved) && l.txpoolState == TxpoolGood { - l.txpoolState = TxpoolBlocked - l.txpoolBlockedBlob = r.ID.isBlob - l.Log.Info("incompatible tx in txpool", "is_blob", r.ID.isBlob) + l.setTxPoolState(TxpoolBlocked, r.ID.isBlob) + l.Log.Warn("incompatible tx in txpool", "id", r.ID, "is_blob", r.ID.isBlob) } else if r.ID.isCancel && l.txpoolState == TxpoolCancelPending { // Set state to TxpoolGood even if the cancellation transaction ended in error // since the stuck transaction could have cleared while we were waiting. - l.txpoolState = TxpoolGood + l.setTxPoolState(TxpoolGood, l.txpoolBlockedBlob) l.Log.Info("txpool may no longer be blocked", "err", r.Err) } - l.txpoolMutex.Unlock() l.Log.Info("Handling receipt", "id", r.ID) l.handleReceipt(r) - case <-receiptsLoopDone: - l.Log.Info("Receipts processing loop done") + case <-ctx.Done(): + l.Log.Info("Receipt processing loop done") return } } @@ -504,7 +519,8 @@ func (l *BatchSubmitter) processReceiptsLoop(receiptsCh chan txmgr.TxReceipt[txR // throttling of incoming data prevent the backlog from growing too large. 
By looping & calling the miner API setter // continuously, we ensure the engine currently in use is always going to be reset to the proper throttling settings // even in the event of sequencer failover. -func (l *BatchSubmitter) throttlingLoop(throttlingLoopDone chan struct{}) { +func (l *BatchSubmitter) throttlingLoop(ctx context.Context) { + defer l.wg.Done() l.Log.Info("Starting DA throttling loop") ticker := time.NewTicker(l.Config.ThrottleInterval) defer ticker.Stop() @@ -556,7 +572,7 @@ func (l *BatchSubmitter) throttlingLoop(throttlingLoopDone chan struct{}) { updateParams() case <-ticker.C: updateParams() - case <-throttlingLoopDone: + case <-ctx.Done(): l.Log.Info("DA throttling loop done") return } diff --git a/op-batcher/readme.md b/op-batcher/readme.md index 1d601fea94be4..e09eb3fce2a96 100644 --- a/op-batcher/readme.md +++ b/op-batcher/readme.md @@ -50,7 +50,7 @@ When an L2 unsafe reorg is detected, the batch submitter will reset its state, a When a Tx fails, an asynchronous receipts handler is triggered. The channel from whence the Tx's frames came has its `frameCursor` rewound, so that all the frames can be resubmitted in order. ### Channel Times Out -When at Tx is confirmed, an asynchronous receipts handler is triggered. We only update the batcher's state if the channel timed out on chain. In that case, the `blockCursor` is rewound to the first block added to that channel, and the channel queue is cleared out. This allows the batcher to start fresh building a new channel starting from the same block -- it does not need to refetch blocks from the sequencer. +When a Tx is confirmed, an asynchronous receipts handler is triggered. We only update the batcher's state if the channel timed out on chain. In that case, the `blockCursor` is rewound to the first block added to that channel, and the channel queue is cleared out. 
This allows the batcher to start fresh building a new channel starting from the same block -- it does not need to refetch blocks from the sequencer. ## Design Principles and Optimization Targets At the current time, the batcher should be optimized for correctness, simplicity and robustness. It is considered preferable to prioritize these properties, even at the expense of other potentially desirable properties such as frugality. For example, it is preferable to have the batcher resubmit some data from time to time ("wasting" money on data availability costs) instead of avoiding that by e.g. adding some persistent state to the batcher.