Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

op-batcher/batcher: check txpool state in state publishing loop #11633

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 45 additions & 21 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"math/big"
_ "net/http/pprof"
"sync"
"sync/atomic"
"time"

altda "github.com/ethereum-optimism/optimism/op-alt-da"
Expand Down Expand Up @@ -83,6 +82,10 @@ type BatchSubmitter struct {
mutex sync.Mutex
running bool

txpoolMutex sync.Mutex // guards txpoolState and txpoolBlockedBlob
txpoolState int
txpoolBlockedBlob bool

// lastStoredBlock is the last block loaded into `state`. If it is empty it should be set to the l2 safe head.
lastStoredBlock eth.BlockID
lastL1Tip eth.L1BlockRef
Expand Down Expand Up @@ -289,7 +292,7 @@ const (
// send a cancellation transaction.
// TxpoolCancelPending -> TxpoolGood:
// happens once the cancel transaction completes, whether successfully or in error.
TxpoolGood int32 = iota
TxpoolGood int = iota
TxpoolBlocked
TxpoolCancelPending
)
Expand All @@ -304,23 +307,25 @@ func (l *BatchSubmitter) loop() {
receiptLoopDone := make(chan struct{})
defer close(receiptLoopDone) // shut down receipt loop

var (
txpoolState atomic.Int32
txpoolBlockedBlob bool
)
txpoolState.Store(TxpoolGood)
l.txpoolMutex.Lock()
l.txpoolState = TxpoolGood
l.txpoolMutex.Unlock()
go func() {
for {
select {
case r := <-receiptsCh:
if errors.Is(r.Err, txpool.ErrAlreadyReserved) && txpoolState.CompareAndSwap(TxpoolGood, TxpoolBlocked) {
txpoolBlockedBlob = r.ID.isBlob
l.Log.Info("incompatible tx in txpool")
} else if r.ID.isCancel && txpoolState.CompareAndSwap(TxpoolCancelPending, TxpoolGood) {
l.txpoolMutex.Lock()
if errors.Is(r.Err, txpool.ErrAlreadyReserved) && l.txpoolState == TxpoolGood {
l.txpoolState = TxpoolBlocked
l.txpoolBlockedBlob = r.ID.isBlob
l.Log.Info("incompatible tx in txpool", "is_blob", r.ID.isBlob)
} else if r.ID.isCancel && l.txpoolState == TxpoolCancelPending {
// Set state to TxpoolGood even if the cancellation transaction ended in error
// since the stuck transaction could have cleared while we were waiting.
l.txpoolState = TxpoolGood
l.Log.Info("txpool may no longer be blocked", "err", r.Err)
}
l.txpoolMutex.Unlock()
l.Log.Info("Handling receipt", "id", r.ID)
l.handleReceipt(r)
case <-receiptLoopDone:
Expand All @@ -345,13 +350,7 @@ func (l *BatchSubmitter) loop() {
for {
select {
case <-ticker.C:
if txpoolState.CompareAndSwap(TxpoolBlocked, TxpoolCancelPending) {
// txpoolState is set to Blocked only if Send() is returning
// ErrAlreadyReserved. In this case, the TxMgr nonce should be reset to nil,
// allowing us to send a cancellation transaction.
l.cancelBlockingTx(queue, receiptsCh, txpoolBlockedBlob)
}
if txpoolState.Load() != TxpoolGood {
if !l.checkTxpool(queue, receiptsCh) {
continue
}
sebastianst marked this conversation as resolved.
Show resolved Hide resolved
if err := l.loadBlocksIntoState(l.shutdownCtx); errors.Is(err, ErrReorg) {
Expand Down Expand Up @@ -433,7 +432,12 @@ func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh
l.Log.Info("Txmgr is closed, aborting state publishing")
return
}
if !l.checkTxpool(queue, receiptsCh) {
l.Log.Info("txpool state is not good, aborting state publishing")
return
}
err := l.publishTxToL1(l.killCtx, queue, receiptsCh)

if err != nil {
if err != io.EOF {
l.Log.Error("Error publishing tx to l1", "err", err)
Expand Down Expand Up @@ -545,10 +549,11 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh
panic(err) // this error should not happen
}
l.Log.Warn("sending a cancellation transaction to unblock txpool", "blocked_blob", isBlockedBlob)
l.queueTx(txData{}, true, candidate, queue, receiptsCh)
l.sendTx(txData{}, true, candidate, queue, receiptsCh)
}

// sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`.
// This call will block if the txmgr queue is at the max-pending limit.
// The method will block if the queue's MaxPendingTransactions is exceeded.
func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
var err error
Expand Down Expand Up @@ -585,11 +590,13 @@ func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, que
candidate = l.calldataTxCandidate(data)
}

l.queueTx(txdata, false, candidate, queue, receiptsCh)
l.sendTx(txdata, false, candidate, queue, receiptsCh)
return nil
}

func (l *BatchSubmitter) queueTx(txdata txData, isCancel bool, candidate *txmgr.TxCandidate, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
// sendTx uses the txmgr queue to send the given transaction candidate after setting its
// gaslimit. It will block if the txmgr queue has reached its MaxPendingTransactions limit.
func (l *BatchSubmitter) sendTx(txdata txData, isCancel bool, candidate *txmgr.TxCandidate, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
intrinsicGas, err := core.IntrinsicGas(candidate.TxData, nil, false, true, true, false)
if err != nil {
// we log instead of return an error here because txmgr can do its own gas estimation
Expand Down Expand Up @@ -665,6 +672,23 @@ func (l *BatchSubmitter) l1Tip(ctx context.Context) (eth.L1BlockRef, error) {
return eth.InfoToL1BlockRef(eth.HeaderBlockInfo(head)), nil
}

func (l *BatchSubmitter) checkTxpool(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) bool {
l.txpoolMutex.Lock()
if l.txpoolState == TxpoolBlocked {
// txpoolState is set to Blocked only if Send() is returning
// ErrAlreadyReserved. In this case, the TxMgr nonce should be reset to nil,
// allowing us to send a cancellation transaction.
l.txpoolState = TxpoolCancelPending
isBlob := l.txpoolBlockedBlob
l.txpoolMutex.Unlock()
l.cancelBlockingTx(queue, receiptsCh, isBlob)
return false
}
r := l.txpoolState == TxpoolGood
l.txpoolMutex.Unlock()
sebastianst marked this conversation as resolved.
Show resolved Hide resolved
return r
}

func logFields(xs ...any) (fs []any) {
for _, x := range xs {
switch v := x.(type) {
Expand Down