Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 173 additions & 86 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ func (pool *LegacyPool) ClearAstriaOrdered() {
if pool.astria == nil {
return
}
pool.mu.Lock()
defer pool.mu.Unlock()

astriaExcludedFromBlockMeter.Mark(int64(len(pool.astria.excludedFromBlock)))
for _, tx := range pool.astria.excludedFromBlock {
Expand Down Expand Up @@ -1370,11 +1372,11 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
delete(events, addr)
}
}
// Reset needs promote for all addresses
promoteAddrs = make([]common.Address, 0, len(pool.queue))
for addr := range pool.queue {
promoteAddrs = append(promoteAddrs, addr)
}
/// bharath: don't promote any addresses since we are going to be clearing the mempool
//promoteAddrs = make([]common.Address, 0, len(pool.queue))
//for addr := range pool.queue {
// promoteAddrs = append(promoteAddrs, addr)
//}
}
// Check for pending transactions for every account that sent new ones
promoted := pool.promoteExecutables(promoteAddrs)
Expand All @@ -1383,7 +1385,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
// remove any transaction that has been included in the block or was invalidated
// because of another transaction (e.g. higher gas price).
if reset != nil {
pool.demoteUnexecutables()
pool.clearPendingAndQueued()
if reset.newHead != nil {
if pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) {
pendingBaseFee := eip1559.CalcBaseFee(pool.chainconfig, reset.newHead)
Expand Down Expand Up @@ -1429,82 +1431,84 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
// of the transaction pool is valid with regard to the chain state.
func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
// If we're reorging an old state, reinject all dropped transactions
var reinject types.Transactions

if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
// If the reorg is too deep, avoid doing it (will happen during fast sync)
oldNum := oldHead.Number.Uint64()
newNum := newHead.Number.Uint64()

if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
log.Debug("Skipping deep transaction reorg", "depth", depth)
} else {
// Reorg seems shallow enough to pull in all transactions into memory
var (
rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
)
if rem == nil {
// This can happen if a setHead is performed, where we simply discard the old
// head from the chain.
// If that is the case, we don't have the lost transactions anymore, and
// there's nothing to add
if newNum >= oldNum {
// If we reorged to a same or higher number, then it's not a case of setHead
log.Warn("Transaction pool reset with missing old head",
"old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
return
}
// If the reorg ended up on a lower number, it's indicative of setHead being the cause
log.Debug("Skipping transaction reset caused by setHead",
"old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
// We still need to update the current state s.th. the lost transactions can be readded by the user
} else {
if add == nil {
// if the new head is nil, it means that something happened between
// the firing of newhead-event and _now_: most likely a
// reorg caused by sync-reversion or explicit sethead back to an
// earlier block.
log.Warn("Transaction pool reset with missing new head", "number", newHead.Number, "hash", newHead.Hash())
return
}
var discarded, included types.Transactions
for rem.NumberU64() > add.NumberU64() {
discarded = append(discarded, rem.Transactions()...)
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
return
}
}
for add.NumberU64() > rem.NumberU64() {
included = append(included, add.Transactions()...)
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
return
}
}
for rem.Hash() != add.Hash() {
discarded = append(discarded, rem.Transactions()...)
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
return
}
included = append(included, add.Transactions()...)
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
return
}
}
lost := make([]*types.Transaction, 0, len(discarded))
for _, tx := range types.TxDifference(discarded, included) {
if pool.Filter(tx) {
lost = append(lost, tx)
}
}
reinject = lost
}
}
}
//var reinject types.Transactions
//
//if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
// // If the reorg is too deep, avoid doing it (will happen during fast sync)
// oldNum := oldHead.Number.Uint64()
// newNum := newHead.Number.Uint64()
//
// if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
// log.Debug("Skipping deep transaction reorg", "depth", depth)
// } else {
// // Reorg seems shallow enough to pull in all transactions into memory
// var (
// rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
// add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
// )
// if rem == nil {
// // This can happen if a setHead is performed, where we simply discard the old
// // head from the chain.
// // If that is the case, we don't have the lost transactions anymore, and
// // there's nothing to add
// if newNum >= oldNum {
// // If we reorged to a same or higher number, then it's not a case of setHead
// log.Warn("Transaction pool reset with missing old head",
// "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
// return
// }
// // If the reorg ended up on a lower number, it's indicative of setHead being the cause
// log.Debug("Skipping transaction reset caused by setHead",
// "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
// // We still need to update the current state s.th. the lost transactions can be readded by the user
// } else {
// if add == nil {
// // if the new head is nil, it means that something happened between
// // the firing of newhead-event and _now_: most likely a
// // reorg caused by sync-reversion or explicit sethead back to an
// // earlier block.
// log.Warn("Transaction pool reset with missing new head", "number", newHead.Number, "hash", newHead.Hash())
// return
// }
// var discarded, included types.Transactions
// for rem.NumberU64() > add.NumberU64() {
// discarded = append(discarded, rem.Transactions()...)
// if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
// log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
// return
// }
// }
// for add.NumberU64() > rem.NumberU64() {
// included = append(included, add.Transactions()...)
// if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
// log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
// return
// }
// }
// for rem.Hash() != add.Hash() {
// discarded = append(discarded, rem.Transactions()...)
// if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
// log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
// return
// }
// included = append(included, add.Transactions()...)
// if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
// log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
// return
// }
// }
// lost := make([]*types.Transaction, 0, len(discarded))
// for _, tx := range types.TxDifference(discarded, included) {
// if pool.Filter(tx) {
// lost = append(lost, tx)
// }
// }
// reinject = lost
// }
// }
//}

// TODO - We only care about setting the head
// Initialize the internal state to the current head
if newHead == nil {
newHead = pool.chain.CurrentBlock() // Special case during testing
Expand All @@ -1518,10 +1522,11 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
pool.currentState = statedb
pool.pendingNonces = newNoncer(statedb)

// Inject any transactions discarded due to reorgs
log.Debug("Reinjecting stale transactions", "count", len(reinject))
core.SenderCacher.Recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false)
//// we don't care about these
//// Inject any transactions discarded due to reorgs
//log.Debug("Reinjecting stale transactions", "count", len(reinject))
//core.SenderCacher.Recover(pool.signer, reinject)
//pool.addTxsLocked(reinject, false)
}

// promoteExecutables moves transactions that have become processable from the
Expand Down Expand Up @@ -1726,6 +1731,87 @@ func (pool *LegacyPool) truncateQueue() {
}
}

func (pool *LegacyPool) clearPendingAndQueued() {
// Iterate over all accounts and demote any non-executable transactions
for addr, list := range pool.queue {
dropped, invalids := list.ClearList()
queuedGauge.Dec(int64(len(dropped) + len(invalids)))

for _, tx := range dropped {
pool.all.Remove(tx.Hash())
}

if list.Empty() {
delete(pool.queue, addr)
}
}

for addr, list := range pool.pending {
dropped, invalids := list.ClearList()
pendingGauge.Dec(int64(len(dropped) + len(invalids)))

for _, tx := range dropped {
pool.all.Remove(tx.Hash())
}

if list.Empty() {
delete(pool.pending, addr)
delete(pool.beats, addr)
}
}

//for addr, list := range pool.pending {
// nonce := pool.currentState.GetNonce(addr)
//
// // Drop all transactions that are deemed too old (low nonce)
// olds := list.Forward(nonce)
// for _, tx := range olds {
// hash := tx.Hash()
// pool.all.Remove(hash)
// log.Trace("Removed old pending transaction", "hash", hash)
// }
// // Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
// drops, invalids := list.Filter(pool.currentState.GetBalance(addr), gasLimit)
// for _, tx := range drops {
// hash := tx.Hash()
// log.Trace("Removed unpayable pending transaction", "hash", hash)
// pool.all.Remove(hash)
// }
// pendingNofundsMeter.Mark(int64(len(drops)))
//
// for _, tx := range invalids {
// hash := tx.Hash()
// log.Trace("Demoting pending transaction", "hash", hash)
//
// // Internal shuffle shouldn't touch the lookup set.
// pool.enqueueTx(hash, tx, false, false)
// }
// pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
// if pool.locals.contains(addr) {
// localGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
// }
// // If there's a gap in front, alert (should never happen) and postpone all transactions
// if list.Len() > 0 && list.txs.Get(nonce) == nil {
// gapped := list.Cap(0)
// for _, tx := range gapped {
// hash := tx.Hash()
// log.Error("Demoting invalidated transaction", "hash", hash)
//
// // Internal shuffle shouldn't touch the lookup set.
// pool.enqueueTx(hash, tx, false, false)
// }
// pendingGauge.Dec(int64(len(gapped)))
// }
// // Delete the entire pending entry if it became empty.
// if list.Empty() {
// delete(pool.pending, addr)
// if _, ok := pool.queue[addr]; !ok {
// pool.reserve(addr, false)
// }
// }
//}
}

// demoteUnexecutables removes invalid and processed transactions from the pools
// executable/pending queue and any subsequent transactions that become unexecutable
// are moved back into the future queue.
Expand All @@ -1736,6 +1822,7 @@ func (pool *LegacyPool) truncateQueue() {
func (pool *LegacyPool) demoteUnexecutables() {
// Iterate over all accounts and demote any non-executable transactions
gasLimit := pool.currentHead.Load().GasLimit

for addr, list := range pool.pending {
nonce := pool.currentState.GetNonce(addr)

Expand Down
Loading