Skip to content

Commit

Permalink
- make state publishing loop abort if the txpool state is not good
Browse files Browse the repository at this point in the history
- protect txpool state vars with a mutex so they can be automically updated to avoid potential race
  condition
  • Loading branch information
roberto-bayardo committed Aug 28, 2024
1 parent 36f093a commit 6fa5b69
Showing 1 changed file with 39 additions and 18 deletions.
57 changes: 39 additions & 18 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"math/big"
_ "net/http/pprof"
"sync"
"sync/atomic"
"time"

altda "github.com/ethereum-optimism/optimism/op-alt-da"
Expand Down Expand Up @@ -83,6 +82,10 @@ type BatchSubmitter struct {
mutex sync.Mutex
running bool

txpoolMutex sync.Mutex // guards txpoolState and txpoolBlockedBlob
txpoolState int
txpoolBlockedBlob bool

// lastStoredBlock is the last block loaded into `state`. If it is empty it should be set to the l2 safe head.
lastStoredBlock eth.BlockID
lastL1Tip eth.L1BlockRef
Expand Down Expand Up @@ -289,7 +292,7 @@ const (
// send a cancellation transaction.
// TxpoolCancelPending -> TxpoolGood:
// happens once the cancel transaction completes, whether successfully or in error.
TxpoolGood int32 = iota
TxpoolGood int = iota
TxpoolBlocked
TxpoolCancelPending
)
Expand All @@ -304,23 +307,25 @@ func (l *BatchSubmitter) loop() {
receiptLoopDone := make(chan struct{})
defer close(receiptLoopDone) // shut down receipt loop

var (
txpoolState atomic.Int32
txpoolBlockedBlob bool
)
txpoolState.Store(TxpoolGood)
l.txpoolMutex.Lock()
l.txpoolState = TxpoolGood
l.txpoolMutex.Unlock()
go func() {
for {
select {
case r := <-receiptsCh:
if errors.Is(r.Err, txpool.ErrAlreadyReserved) && txpoolState.CompareAndSwap(TxpoolGood, TxpoolBlocked) {
txpoolBlockedBlob = r.ID.isBlob
l.Log.Info("incompatible tx in txpool")
} else if r.ID.isCancel && txpoolState.CompareAndSwap(TxpoolCancelPending, TxpoolGood) {
l.txpoolMutex.Lock()
if errors.Is(r.Err, txpool.ErrAlreadyReserved) && l.txpoolState == TxpoolGood {
l.txpoolState = TxpoolBlocked
l.txpoolBlockedBlob = r.ID.isBlob
l.Log.Info("incompatible tx in txpool", "is_blob", r.ID.isBlob)
} else if r.ID.isCancel && l.txpoolState == TxpoolCancelPending {
// Set state to TxpoolGood even if the cancellation transaction ended in error
// since the stuck transaction could have cleared while we were waiting.
l.txpoolState = TxpoolGood
l.Log.Info("txpool may no longer be blocked", "err", r.Err)
}
l.txpoolMutex.Unlock()
l.Log.Info("Handling receipt", "id", r.ID)
l.handleReceipt(r)
case <-receiptLoopDone:
Expand All @@ -345,13 +350,7 @@ func (l *BatchSubmitter) loop() {
for {
select {
case <-ticker.C:
if txpoolState.CompareAndSwap(TxpoolBlocked, TxpoolCancelPending) {
// txpoolState is set to Blocked only if Send() is returning
// ErrAlreadyReserved. In this case, the TxMgr nonce should be reset to nil,
// allowing us to send a cancellation transaction.
l.cancelBlockingTx(queue, receiptsCh, txpoolBlockedBlob)
}
if txpoolState.Load() != TxpoolGood {
if !l.checkTxpool(queue, receiptsCh) {
continue
}
if err := l.loadBlocksIntoState(l.shutdownCtx); errors.Is(err, ErrReorg) {
Expand Down Expand Up @@ -433,7 +432,12 @@ func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh
l.Log.Info("Txmgr is closed, aborting state publishing")
return
}
if !l.checkTxpool(queue, receiptsCh) {
l.Log.Info("txpool state is not good, aborting state publishing")
return
}
err := l.publishTxToL1(l.killCtx, queue, receiptsCh)

if err != nil {
if err != io.EOF {
l.Log.Error("Error publishing tx to l1", "err", err)
Expand Down Expand Up @@ -665,6 +669,23 @@ func (l *BatchSubmitter) l1Tip(ctx context.Context) (eth.L1BlockRef, error) {
return eth.InfoToL1BlockRef(eth.HeaderBlockInfo(head)), nil
}

func (l *BatchSubmitter) checkTxpool(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) bool {
l.txpoolMutex.Lock()
if l.txpoolState == TxpoolBlocked {
// txpoolState is set to Blocked only if Send() is returning
// ErrAlreadyReserved. In this case, the TxMgr nonce should be reset to nil,
// allowing us to send a cancellation transaction.
l.txpoolState = TxpoolCancelPending
isBlob := l.txpoolBlockedBlob
l.txpoolMutex.Unlock()
l.cancelBlockingTx(queue, receiptsCh, isBlob)
return false
}
r := l.txpoolState == TxpoolGood
l.txpoolMutex.Unlock()
return r
}

func logFields(xs ...any) (fs []any) {
for _, x := range xs {
switch v := x.(type) {
Expand Down

0 comments on commit 6fa5b69

Please sign in to comment.