Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

op-service/txmgr: multiple fixes / improvements #11614

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion op-service/txmgr/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ var (
MinBaseFeeGwei: 1.0,
ResubmissionTimeout: 48 * time.Second,
NetworkTimeout: 10 * time.Second,
TxSendTimeout: 0 * time.Second,
TxSendTimeout: 10 * time.Minute,
TxNotInMempoolTimeout: 2 * time.Minute,
ReceiptQueryInterval: 12 * time.Second,
}
Expand Down
7 changes: 7 additions & 0 deletions op-service/txmgr/send_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type SendState struct {
// Whether any attempt to send the tx resulted in ErrAlreadyReserved
alreadyReserved bool

// Whether we should bump fees before trying to publish the tx again
bumpFees bool

// Miscellaneous tracking
bumpCount int // number of times we have bumped the gas price
}
Expand Down Expand Up @@ -120,6 +123,10 @@ func (s *SendState) CriticalError() error {
case s.nonceTooLowCount >= s.safeAbortNonceTooLowCount:
// we have exceeded the nonce too low count
return core.ErrNonceTooLow
case s.successFullPublishCount == 0 && s.nonceTooLowCount > 0:
// A nonce too low error before successfully publishing any transaction means the tx will
// need a different nonce, which we can force by returning an error.
return core.ErrNonceTooLow
case s.successFullPublishCount == 0 && s.now().After(s.txInMempoolDeadline):
// unable to get the tx into the mempool in the allotted time
return ErrMempoolDeadlineExpired
Expand Down
12 changes: 12 additions & 0 deletions op-service/txmgr/send_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,21 @@ func TestSendStateNoAbortAfterProcessOtherError(t *testing.T) {
require.Nil(t, sendState.CriticalError())
}

// TestSendStateAbortSafelyAfterNonceTooLowNoTxPublished asserts that we will abort after the very
// first nonce-too-low error if a tx hasn't yet been published.
func TestSendStateAbortSafelyAfterNonceTooLowNoTxPublished(t *testing.T) {
sendState := newSendState()

// No successful publish has been recorded, so a single nonce-too-low error must be critical.
sendState.ProcessSendError(core.ErrNonceTooLow)
require.ErrorIs(t, sendState.CriticalError(), core.ErrNonceTooLow)
}

// TestSendStateAbortSafelyAfterNonceTooLowButNoTxMined asserts that we will
// abort after the safe abort interval has elapsed if we haven't mined a tx.
func TestSendStateAbortSafelyAfterNonceTooLowButNoTxMined(t *testing.T) {
sendState := newSendState()

sendState.ProcessSendError(nil)
sendState.ProcessSendError(core.ErrNonceTooLow)
require.Nil(t, sendState.CriticalError())
sendState.ProcessSendError(core.ErrNonceTooLow)
Expand Down Expand Up @@ -90,6 +100,7 @@ func TestSendStateMiningTxCancelsAbort(t *testing.T) {
func TestSendStateReorgingTxResetsAbort(t *testing.T) {
sendState := newSendState()

sendState.ProcessSendError(nil)
sendState.ProcessSendError(core.ErrNonceTooLow)
sendState.ProcessSendError(core.ErrNonceTooLow)
sendState.TxMined(testHash)
Expand Down Expand Up @@ -120,6 +131,7 @@ func TestSendStateNoAbortEvenIfNonceTooLowAfterTxMined(t *testing.T) {
func TestSendStateSafeAbortIfNonceTooLowPersistsAfterUnmine(t *testing.T) {
sendState := newSendState()

sendState.ProcessSendError(nil)
sendState.TxMined(testHash)
sendState.TxNotMined(testHash)
sendState.ProcessSendError(core.ErrNonceTooLow)
Expand Down
111 changes: 55 additions & 56 deletions op-service/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ var (
type TxManager interface {
// Send is used to create & send a transaction. It will handle increasing
// the gas price & ensuring that the transaction remains in the transaction pool.
// It can be stopped by cancelling the provided context; however, the transaction
// may be included on L1 even if the context is cancelled.
sebastianst marked this conversation as resolved.
Show resolved Hide resolved
// It can be stopped by canceling the provided context; however, the transaction
// may be included on L1 even if the context is canceled.
//
// NOTE: Send can be called concurrently, the nonce will be managed internally.
//
Expand Down Expand Up @@ -470,44 +470,33 @@ func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*t

sendState := NewSendState(m.cfg.SafeAbortNonceTooLowCount, m.cfg.TxNotInMempoolTimeout)
receiptChan := make(chan *types.Receipt, 1)
publishAndWait := func(tx *types.Transaction, bumpFees bool) *types.Transaction {
wg.Add(1)
tx, published := m.publishTx(ctx, tx, sendState, bumpFees)
if published {
go func() {
defer wg.Done()
m.waitForTx(ctx, tx, sendState, receiptChan)
}()
} else {
wg.Done()
}
return tx
}

// Immediately publish a transaction before starting the resubmission loop
tx = publishAndWait(tx, false)

resubmissionTimeout := m.GetBumpFeeRetryTime()
ticker := time.NewTicker(resubmissionTimeout)
defer ticker.Stop()

for {
if !sendState.IsWaitingForConfirmation() {
if m.closed.Load() {
// the tx manager closed and no txs are waiting to be confirmed, give up
m.txLogger(tx, false).Warn("TxManager closed, aborting transaction submission")
return nil, ErrClosed
}
var published bool
if tx, published = m.publishTx(ctx, tx, sendState); published {
wg.Add(1)
go func() {
defer wg.Done()
m.waitForTx(ctx, tx, sendState, receiptChan)
}()
}
}
if err := sendState.CriticalError(); err != nil {
m.txLogger(tx, false).Warn("Aborting transaction submission", "err", err)
return nil, fmt.Errorf("aborted tx send due to critical error: %w", err)
}

select {
case <-ticker.C:
// Don't resubmit a transaction if it has been mined, but we are waiting for the conf depth.
if sendState.IsWaitingForConfirmation() {
continue
}
// if the tx manager closed while we were waiting for the tx, give up
if m.closed.Load() {
m.txLogger(tx, false).Warn("TxManager closed, aborting transaction submission")
return nil, ErrClosed
}
sebastianst marked this conversation as resolved.
Show resolved Hide resolved
tx = publishAndWait(tx, true)

case <-ctx.Done():
return nil, ctx.Err()
Expand All @@ -523,34 +512,34 @@ func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*t
// publishTx publishes the transaction to the transaction pool. If it receives any underpriced errors
// it will bump the fees and retry.
// Returns the latest fee bumped tx, and a boolean indicating whether the tx was sent or not
func (m *SimpleTxManager) publishTx(ctx context.Context, tx *types.Transaction, sendState *SendState, bumpFeesImmediately bool) (*types.Transaction, bool) {
func (m *SimpleTxManager) publishTx(ctx context.Context, tx *types.Transaction, sendState *SendState) (*types.Transaction, bool) {
l := m.txLogger(tx, true)

l.Info("Publishing transaction", "tx", tx.Hash())

for {
// if the tx manager closed, give up without bumping fees or retrying
if m.closed.Load() {
l.Warn("TxManager closed, aborting transaction submission")
return tx, false
}
sebastianst marked this conversation as resolved.
Show resolved Hide resolved
if bumpFeesImmediately {
newTx, err := m.increaseGasPrice(ctx, tx)
if err != nil {
l.Error("unable to increase gas", "err", err)
if sendState.bumpFees {
if newTx, err := m.increaseGasPrice(ctx, tx); err != nil {
l.Warn("unable to increase gas, will try to re-publish the tx", "err", err)
m.metr.TxPublished("bump_failed")
return tx, false
// Even if we are unable to bump fees, we must still resubmit the transaction
// because a previously successfully published tx can get dropped from the
// mempool. If we don't try to resubmit it to either force a failure (e.g. from
// nonce too low errors) or get it back into the mempool, we can end up waiting on
// it to get mined indefinitely.
} else {
if sendState.IsWaitingForConfirmation() {
// A previously published tx might get mined during the increaseGasPrice call
// above, in which case we can abort trying to replace it with a higher fee tx.
return tx, false
}
sendState.bumpCount++
tx = newTx
l = m.txLogger(tx, true)
// Disable bumping fees again until the new transaction is successfully published,
// or we immediately get another underpriced error.
sendState.bumpFees = false
}
tx = newTx
sendState.bumpCount++
l = m.txLogger(tx, true)
}
bumpFeesImmediately = true // bump fees next loop

if sendState.IsWaitingForConfirmation() {
// there is a chance the previous tx goes into "waiting for confirmation" state
// during the increaseGasPrice call; continue waiting rather than resubmit the tx
return tx, false
}

cCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
Expand All @@ -561,6 +550,9 @@ func (m *SimpleTxManager) publishTx(ctx context.Context, tx *types.Transaction,
if err == nil {
m.metr.TxPublished("")
l.Info("Transaction successfully published", "tx", tx.Hash())
// Tx made it into the mempool, so we'll need a fee bump if we end up trying to replace
// it with another publish attempt.
sendState.bumpFees = true
return tx, true
}

Expand All @@ -575,26 +567,33 @@ func (m *SimpleTxManager) publishTx(ctx context.Context, tx *types.Transaction,
m.metr.TxPublished("nonce_too_low")
case errStringMatch(err, context.Canceled):
m.metr.RPCError()
l.Warn("transaction send cancelled", "err", err)
m.metr.TxPublished("context_cancelled")
l.Warn("transaction send canceled", "err", err)
m.metr.TxPublished("context_canceled")
case errStringMatch(err, txpool.ErrAlreadyKnown):
l.Warn("resubmitted already known transaction", "err", err)
m.metr.TxPublished("tx_already_known")
case errStringMatch(err, txpool.ErrReplaceUnderpriced):
l.Warn("transaction replacement is underpriced", "err", err)
m.metr.TxPublished("tx_replacement_underpriced")
continue // retry with fee bump
// retry tx with fee bump, unless we already just tried to bump them
if !sendState.bumpFees {
sendState.bumpFees = true
continue
}
sebastianst marked this conversation as resolved.
Show resolved Hide resolved
case errStringMatch(err, txpool.ErrUnderpriced):
l.Warn("transaction is underpriced", "err", err)
m.metr.TxPublished("tx_underpriced")
continue // retry with fee bump
// retry tx with fee bump, unless we already just tried to bump them
if !sendState.bumpFees {
sendState.bumpFees = true
continue
}
sebastianst marked this conversation as resolved.
Show resolved Hide resolved
default:
m.metr.RPCError()
l.Error("unable to publish transaction", "err", err)
m.metr.TxPublished("unknown_error")
}

// on non-underpriced error return immediately; will retry on next resubmission timeout
return tx, false
}
}
Expand All @@ -617,7 +616,7 @@ func (m *SimpleTxManager) waitForTx(ctx context.Context, tx *types.Transaction,
}
}

// waitMined waits for the transaction to be mined or for the context to be cancelled.
// waitMined waits for the transaction to be mined or for the context to be canceled.
func (m *SimpleTxManager) waitMined(ctx context.Context, tx *types.Transaction, sendState *SendState) (*types.Receipt, error) {
txHash := tx.Hash()
queryTicker := time.NewTicker(m.cfg.ReceiptQueryInterval)
Expand Down
Loading