diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 39ebf2f25b240..594dcde3fc74d 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -8,7 +8,6 @@ import ( "math/big" _ "net/http/pprof" "sync" - "sync/atomic" "time" altda "github.com/ethereum-optimism/optimism/op-alt-da" @@ -83,6 +82,10 @@ type BatchSubmitter struct { mutex sync.Mutex running bool + txpoolMutex sync.Mutex // guards txpoolState and txpoolBlockedBlob + txpoolState int + txpoolBlockedBlob bool + // lastStoredBlock is the last block loaded into `state`. If it is empty it should be set to the l2 safe head. lastStoredBlock eth.BlockID lastL1Tip eth.L1BlockRef @@ -289,7 +292,7 @@ const ( // send a cancellation transaction. // TxpoolCancelPending -> TxpoolGood: // happens once the cancel transaction completes, whether successfully or in error. - TxpoolGood int32 = iota + TxpoolGood int = iota TxpoolBlocked TxpoolCancelPending ) @@ -304,23 +307,25 @@ func (l *BatchSubmitter) loop() { receiptLoopDone := make(chan struct{}) defer close(receiptLoopDone) // shut down receipt loop - var ( - txpoolState atomic.Int32 - txpoolBlockedBlob bool - ) - txpoolState.Store(TxpoolGood) + l.txpoolMutex.Lock() + l.txpoolState = TxpoolGood + l.txpoolMutex.Unlock() go func() { for { select { case r := <-receiptsCh: - if errors.Is(r.Err, txpool.ErrAlreadyReserved) && txpoolState.CompareAndSwap(TxpoolGood, TxpoolBlocked) { - txpoolBlockedBlob = r.ID.isBlob - l.Log.Info("incompatible tx in txpool") - } else if r.ID.isCancel && txpoolState.CompareAndSwap(TxpoolCancelPending, TxpoolGood) { + l.txpoolMutex.Lock() + if errors.Is(r.Err, txpool.ErrAlreadyReserved) && l.txpoolState == TxpoolGood { + l.txpoolState = TxpoolBlocked + l.txpoolBlockedBlob = r.ID.isBlob + l.Log.Info("incompatible tx in txpool", "is_blob", r.ID.isBlob) + } else if r.ID.isCancel && l.txpoolState == TxpoolCancelPending { // Set state to TxpoolGood even if the cancellation transaction ended in error // since the stuck transaction could have cleared while we were waiting. + l.txpoolState = TxpoolGood l.Log.Info("txpool may no longer be blocked", "err", r.Err) } + l.txpoolMutex.Unlock() l.Log.Info("Handling receipt", "id", r.ID) l.handleReceipt(r) case <-receiptLoopDone: @@ -345,13 +350,7 @@ func (l *BatchSubmitter) loop() { for { select { case <-ticker.C: - if txpoolState.CompareAndSwap(TxpoolBlocked, TxpoolCancelPending) { - // txpoolState is set to Blocked only if Send() is returning - // ErrAlreadyReserved. In this case, the TxMgr nonce should be reset to nil, - // allowing us to send a cancellation transaction. - l.cancelBlockingTx(queue, receiptsCh, txpoolBlockedBlob) - } - if txpoolState.Load() != TxpoolGood { + if !l.checkTxpool(queue, receiptsCh) { continue } if err := l.loadBlocksIntoState(l.shutdownCtx); errors.Is(err, ErrReorg) { @@ -433,7 +432,12 @@ func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh l.Log.Info("Txmgr is closed, aborting state publishing") return } + if !l.checkTxpool(queue, receiptsCh) { + l.Log.Info("txpool state is not good, aborting state publishing") + return + } err := l.publishTxToL1(l.killCtx, queue, receiptsCh) + if err != nil { if err != io.EOF { l.Log.Error("Error publishing tx to l1", "err", err) @@ -665,6 +669,23 @@ func (l *BatchSubmitter) l1Tip(ctx context.Context) (eth.L1BlockRef, error) { return eth.InfoToL1BlockRef(eth.HeaderBlockInfo(head)), nil } +func (l *BatchSubmitter) checkTxpool(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) bool { + l.txpoolMutex.Lock() + if l.txpoolState == TxpoolBlocked { + // txpoolState is set to Blocked only if Send() is returning + // ErrAlreadyReserved. In this case, the TxMgr nonce should be reset to nil, + // allowing us to send a cancellation transaction. + l.txpoolState = TxpoolCancelPending + isBlob := l.txpoolBlockedBlob + l.txpoolMutex.Unlock() + l.cancelBlockingTx(queue, receiptsCh, isBlob) + return false + } + r := l.txpoolState == TxpoolGood + l.txpoolMutex.Unlock() + return r +} + func logFields(xs ...any) (fs []any) { for _, x := range xs { switch v := x.(type) {