diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 5bd484fcc..b9bd76e93 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -338,6 +338,8 @@ func (pool *LegacyPool) ClearAstriaOrdered() { if pool.astria == nil { return } + pool.mu.Lock() + defer pool.mu.Unlock() astriaExcludedFromBlockMeter.Mark(int64(len(pool.astria.excludedFromBlock))) for _, tx := range pool.astria.excludedFromBlock { @@ -1370,11 +1372,11 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, delete(events, addr) } } - // Reset needs promote for all addresses - promoteAddrs = make([]common.Address, 0, len(pool.queue)) - for addr := range pool.queue { - promoteAddrs = append(promoteAddrs, addr) - } + /// bharath: don't promote any addresses since we are going to be clearing the mempool + //promoteAddrs = make([]common.Address, 0, len(pool.queue)) + //for addr := range pool.queue { + // promoteAddrs = append(promoteAddrs, addr) + //} } // Check for pending transactions for every account that sent new ones promoted := pool.promoteExecutables(promoteAddrs) @@ -1383,7 +1385,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, // remove any transaction that has been included in the block or was invalidated // because of another transaction (e.g. higher gas price). if reset != nil { - pool.demoteUnexecutables() + pool.clearPendingAndQueued() if reset.newHead != nil { if pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) { pendingBaseFee := eip1559.CalcBaseFee(pool.chainconfig, reset.newHead) @@ -1429,82 +1431,84 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, // of the transaction pool is valid with regard to the chain state. func (pool *LegacyPool) reset(oldHead, newHead *types.Header) { // If we're reorging an old state, reinject all dropped transactions - var reinject types.Transactions - - if oldHead != nil && oldHead.Hash() != newHead.ParentHash { - // If the reorg is too deep, avoid doing it (will happen during fast sync) - oldNum := oldHead.Number.Uint64() - newNum := newHead.Number.Uint64() - - if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 { - log.Debug("Skipping deep transaction reorg", "depth", depth) - } else { - // Reorg seems shallow enough to pull in all transactions into memory - var ( - rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64()) - add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64()) - ) - if rem == nil { - // This can happen if a setHead is performed, where we simply discard the old - // head from the chain. - // If that is the case, we don't have the lost transactions anymore, and - // there's nothing to add - if newNum >= oldNum { - // If we reorged to a same or higher number, then it's not a case of setHead - log.Warn("Transaction pool reset with missing old head", - "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum) - return - } - // If the reorg ended up on a lower number, it's indicative of setHead being the cause - log.Debug("Skipping transaction reset caused by setHead", - "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum) - // We still need to update the current state s.th. the lost transactions can be readded by the user - } else { - if add == nil { - // if the new head is nil, it means that something happened between - // the firing of newhead-event and _now_: most likely a - // reorg caused by sync-reversion or explicit sethead back to an - // earlier block. - log.Warn("Transaction pool reset with missing new head", "number", newHead.Number, "hash", newHead.Hash()) - return - } - var discarded, included types.Transactions - for rem.NumberU64() > add.NumberU64() { - discarded = append(discarded, rem.Transactions()...) - if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { - log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) - return - } - } - for add.NumberU64() > rem.NumberU64() { - included = append(included, add.Transactions()...) - if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { - log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) - return - } - } - for rem.Hash() != add.Hash() { - discarded = append(discarded, rem.Transactions()...) - if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { - log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) - return - } - included = append(included, add.Transactions()...) - if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { - log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) - return - } - } - lost := make([]*types.Transaction, 0, len(discarded)) - for _, tx := range types.TxDifference(discarded, included) { - if pool.Filter(tx) { - lost = append(lost, tx) - } - } - reinject = lost - } - } - } + //var reinject types.Transactions + // + //if oldHead != nil && oldHead.Hash() != newHead.ParentHash { + // // If the reorg is too deep, avoid doing it (will happen during fast sync) + // oldNum := oldHead.Number.Uint64() + // newNum := newHead.Number.Uint64() + // + // if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 { + // log.Debug("Skipping deep transaction reorg", "depth", depth) + // } else { + // // Reorg seems shallow enough to pull in all transactions into memory + // var ( + // rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64()) + // add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64()) + // ) + // if rem == nil { + // // This can happen if a setHead is performed, where we simply discard the old + // // head from the chain. + // // If that is the case, we don't have the lost transactions anymore, and + // // there's nothing to add + // if newNum >= oldNum { + // // If we reorged to a same or higher number, then it's not a case of setHead + // log.Warn("Transaction pool reset with missing old head", + // "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum) + // return + // } + // // If the reorg ended up on a lower number, it's indicative of setHead being the cause + // log.Debug("Skipping transaction reset caused by setHead", + // "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum) + // // We still need to update the current state s.th. the lost transactions can be readded by the user + // } else { + // if add == nil { + // // if the new head is nil, it means that something happened between + // // the firing of newhead-event and _now_: most likely a + // // reorg caused by sync-reversion or explicit sethead back to an + // // earlier block. + // log.Warn("Transaction pool reset with missing new head", "number", newHead.Number, "hash", newHead.Hash()) + // return + // } + // var discarded, included types.Transactions + // for rem.NumberU64() > add.NumberU64() { + // discarded = append(discarded, rem.Transactions()...) + // if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { + // log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) + // return + // } + // } + // for add.NumberU64() > rem.NumberU64() { + // included = append(included, add.Transactions()...) + // if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { + // log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) + // return + // } + // } + // for rem.Hash() != add.Hash() { + // discarded = append(discarded, rem.Transactions()...) + // if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { + // log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) + // return + // } + // included = append(included, add.Transactions()...) + // if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { + // log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) + // return + // } + // } + // lost := make([]*types.Transaction, 0, len(discarded)) + // for _, tx := range types.TxDifference(discarded, included) { + // if pool.Filter(tx) { + // lost = append(lost, tx) + // } + // } + // reinject = lost + // } + // } + //} + + // TODO - We only care about setting the head // Initialize the internal state to the current head if newHead == nil { newHead = pool.chain.CurrentBlock() // Special case during testing @@ -1518,10 +1522,11 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) { pool.currentState = statedb pool.pendingNonces = newNoncer(statedb) - // Inject any transactions discarded due to reorgs - log.Debug("Reinjecting stale transactions", "count", len(reinject)) - core.SenderCacher.Recover(pool.signer, reinject) - pool.addTxsLocked(reinject, false) + //// we don't care about these + //// Inject any transactions discarded due to reorgs + //log.Debug("Reinjecting stale transactions", "count", len(reinject)) + //core.SenderCacher.Recover(pool.signer, reinject) + //pool.addTxsLocked(reinject, false) } // promoteExecutables moves transactions that have become processable from the @@ -1726,6 +1731,87 @@ func (pool *LegacyPool) truncateQueue() { } } +func (pool *LegacyPool) clearPendingAndQueued() { + // Iterate over all accounts and demote any non-executable transactions + for addr, list := range pool.queue { + dropped, invalids := list.ClearList() + queuedGauge.Dec(int64(len(dropped) + len(invalids))) + + for _, tx := range dropped { + pool.all.Remove(tx.Hash()) + } + + if list.Empty() { + delete(pool.queue, addr) + } + } + + for addr, list := range pool.pending { + dropped, invalids := list.ClearList() + pendingGauge.Dec(int64(len(dropped) + len(invalids))) + + for _, tx := range dropped { + pool.all.Remove(tx.Hash()) + } + + if list.Empty() { + delete(pool.pending, addr) + delete(pool.beats, addr) + } + } + + //for addr, list := range pool.pending { + // nonce := pool.currentState.GetNonce(addr) + // + // // Drop all transactions that are deemed too old (low nonce) + // olds := list.Forward(nonce) + // for _, tx := range olds { + // hash := tx.Hash() + // pool.all.Remove(hash) + // log.Trace("Removed old pending transaction", "hash", hash) + // } + // // Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later + // drops, invalids := list.Filter(pool.currentState.GetBalance(addr), gasLimit) + // for _, tx := range drops { + // hash := tx.Hash() + // log.Trace("Removed unpayable pending transaction", "hash", hash) + // pool.all.Remove(hash) + // } + // pendingNofundsMeter.Mark(int64(len(drops))) + // + // for _, tx := range invalids { + // hash := tx.Hash() + // log.Trace("Demoting pending transaction", "hash", hash) + // + // // Internal shuffle shouldn't touch the lookup set. + // pool.enqueueTx(hash, tx, false, false) + // } + // pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids))) + // if pool.locals.contains(addr) { + // localGauge.Dec(int64(len(olds) + len(drops) + len(invalids))) + // } + // // If there's a gap in front, alert (should never happen) and postpone all transactions + // if list.Len() > 0 && list.txs.Get(nonce) == nil { + // gapped := list.Cap(0) + // for _, tx := range gapped { + // hash := tx.Hash() + // log.Error("Demoting invalidated transaction", "hash", hash) + // + // // Internal shuffle shouldn't touch the lookup set. + // pool.enqueueTx(hash, tx, false, false) + // } + // pendingGauge.Dec(int64(len(gapped))) + // } + // // Delete the entire pending entry if it became empty. + // if list.Empty() { + // delete(pool.pending, addr) + // if _, ok := pool.queue[addr]; !ok { + // pool.reserve(addr, false) + // } + // } + //} +} + // demoteUnexecutables removes invalid and processed transactions from the pools // executable/pending queue and any subsequent transactions that become unexecutable // are moved back into the future queue. @@ -1736,6 +1822,7 @@ func (pool *LegacyPool) truncateQueue() { func (pool *LegacyPool) demoteUnexecutables() { // Iterate over all accounts and demote any non-executable transactions gasLimit := pool.currentHead.Load().GasLimit + for addr, list := range pool.pending { nonce := pool.currentState.GetNonce(addr) diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index c86991c94..63f5b21b2 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -13,7 +13,6 @@ // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . - package legacypool import ( @@ -301,7 +300,8 @@ func TestStateChangeDuringReset(t *testing.T) { <-pool.requestReset(nil, nil) nonce = pool.Nonce(address) - if nonce != 2 { + // mempool is cleared + if nonce != 0 { t.Fatalf("Invalid nonce, want 2, got %d", nonce) } } @@ -489,6 +489,67 @@ func TestChainFork(t *testing.T) { } } +func TestRemoveTxSanity(t *testing.T) { + t.Parallel() + + pool, key := setupPool() + defer pool.Close() + + addr := crypto.PubkeyToAddress(key.PublicKey) + resetState := func() { + statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) + statedb.AddBalance(addr, uint256.NewInt(100000000000000), tracing.BalanceChangeUnspecified) + + pool.chain = newTestBlockChain(pool.chainconfig, 1000000, statedb, new(event.Feed)) + <-pool.requestReset(nil, nil) + } + resetState() + + tx1 := transaction(0, 100000, key) + tx2 := transaction(1, 100000, key) + tx3 := transaction(2, 100000, key) + + if err := pool.addLocal(tx1); err != nil { + t.Error("didn't expect error", err) + } + if err := pool.addLocal(tx2); err != nil { + t.Error("didn't expect error", err) + } + if err := pool.addLocal(tx3); err != nil { + t.Error("didn't expect error", err) + } + + pendingTxs := pool.pending[addr] + if pendingTxs.Len() != 3 { + t.Error("expected 3 pending transactions, got", pendingTxs.Len()) + } + + if err := validatePoolInternals(pool); err != nil { + t.Errorf("pool internals validation failed: %v", err) + } + + n := pool.removeTx(tx1.Hash(), false, true) + if n != 3 { + t.Error("expected 3 transactions to be removed, got", n) + } + n = pool.removeTx(tx2.Hash(), false, true) + if n != 0 { + t.Error("expected 0 transactions to be removed, got", n) + } + n = pool.removeTx(tx3.Hash(), false, true) + if n != 0 { + t.Error("expected 0 transactions to be removed, got", n) + } + + if len(pool.pending) != 0 { + t.Error("expected 0 pending transactions, got", pendingTxs.Len()) + } + + if err := validatePoolInternals(pool); err != nil { + t.Errorf("pool internals validation failed: %v", err) + } +} + func TestDoubleNonce(t *testing.T) { t.Parallel() @@ -635,58 +696,17 @@ func TestDropping(t *testing.T) { t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 6) } <-pool.requestReset(nil, nil) - if pool.pending[account].Len() != 3 { - t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), 3) - } - if pool.queue[account].Len() != 3 { - t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 3) - } - if pool.all.Count() != 6 { - t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 6) - } - // Reduce the balance of the account, and check that invalidated transactions are dropped - testAddBalance(pool, account, big.NewInt(-650)) - <-pool.requestReset(nil, nil) - if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok { - t.Errorf("funded pending transaction missing: %v", tx0) - } - if _, ok := pool.pending[account].txs.items[tx1.Nonce()]; !ok { - t.Errorf("funded pending transaction missing: %v", tx0) - } - if _, ok := pool.pending[account].txs.items[tx2.Nonce()]; ok { - t.Errorf("out-of-fund pending transaction present: %v", tx1) - } - if _, ok := pool.queue[account].txs.items[tx10.Nonce()]; !ok { - t.Errorf("funded queued transaction missing: %v", tx10) - } - if _, ok := pool.queue[account].txs.items[tx11.Nonce()]; !ok { - t.Errorf("funded queued transaction missing: %v", tx10) - } - if _, ok := pool.queue[account].txs.items[tx12.Nonce()]; ok { - t.Errorf("out-of-fund queued transaction present: %v", tx11) - } - if pool.all.Count() != 4 { - t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 4) - } - // Reduce the block gas limit, check that invalidated transactions are dropped - pool.chain.(*testBlockChain).gasLimit.Store(100) - <-pool.requestReset(nil, nil) + pending, queued := pool.Stats() - if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok { - t.Errorf("funded pending transaction missing: %v", tx0) - } - if _, ok := pool.pending[account].txs.items[tx1.Nonce()]; ok { - t.Errorf("over-gased pending transaction present: %v", tx1) - } - if _, ok := pool.queue[account].txs.items[tx10.Nonce()]; !ok { - t.Errorf("funded queued transaction missing: %v", tx10) + if pending != 0 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) } - if _, ok := pool.queue[account].txs.items[tx11.Nonce()]; ok { - t.Errorf("over-gased queued transaction present: %v", tx11) + if queued != 0 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) } - if pool.all.Count() != 2 { - t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 2) + if err := validatePoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) } } @@ -743,64 +763,18 @@ func TestPostponing(t *testing.T) { t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), len(txs)) } <-pool.requestReset(nil, nil) - if pending := pool.pending[accs[0]].Len() + pool.pending[accs[1]].Len(); pending != len(txs) { - t.Errorf("pending transaction mismatch: have %d, want %d", pending, len(txs)) - } - if len(pool.queue) != 0 { - t.Errorf("queued accounts mismatch: have %d, want %d", len(pool.queue), 0) - } - if pool.all.Count() != len(txs) { - t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), len(txs)) - } - // Reduce the balance of the account, and check that transactions are reorganised - for _, addr := range accs { - testAddBalance(pool, addr, big.NewInt(-1)) - } - <-pool.requestReset(nil, nil) - // The first account's first transaction remains valid, check that subsequent - // ones are either filtered out, or queued up for later. - if _, ok := pool.pending[accs[0]].txs.items[txs[0].Nonce()]; !ok { - t.Errorf("tx %d: valid and funded transaction missing from pending pool: %v", 0, txs[0]) - } - if _, ok := pool.queue[accs[0]].txs.items[txs[0].Nonce()]; ok { - t.Errorf("tx %d: valid and funded transaction present in future queue: %v", 0, txs[0]) - } - for i, tx := range txs[1:100] { - if i%2 == 1 { - if _, ok := pool.pending[accs[0]].txs.items[tx.Nonce()]; ok { - t.Errorf("tx %d: valid but future transaction present in pending pool: %v", i+1, tx) - } - if _, ok := pool.queue[accs[0]].txs.items[tx.Nonce()]; !ok { - t.Errorf("tx %d: valid but future transaction missing from future queue: %v", i+1, tx) - } - } else { - if _, ok := pool.pending[accs[0]].txs.items[tx.Nonce()]; ok { - t.Errorf("tx %d: out-of-fund transaction present in pending pool: %v", i+1, tx) - } - if _, ok := pool.queue[accs[0]].txs.items[tx.Nonce()]; ok { - t.Errorf("tx %d: out-of-fund transaction present in future queue: %v", i+1, tx) - } - } - } - // The second account's first transaction got invalid, check that all transactions - // are either filtered out, or queued up for later. - if pool.pending[accs[1]] != nil { - t.Errorf("invalidated account still has pending transactions") + pending, queued := pool.Stats() + + if pending != 0 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) } - for i, tx := range txs[100:] { - if i%2 == 1 { - if _, ok := pool.queue[accs[1]].txs.items[tx.Nonce()]; !ok { - t.Errorf("tx %d: valid but future transaction missing from future queue: %v", 100+i, tx) - } - } else { - if _, ok := pool.queue[accs[1]].txs.items[tx.Nonce()]; ok { - t.Errorf("tx %d: out-of-fund transaction present in future queue: %v", 100+i, tx) - } - } + if queued != 0 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) } - if pool.all.Count() != len(txs)/2 { - t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), len(txs)/2) + + if err := validatePoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) } } @@ -993,6 +967,7 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) { func TestQueueTimeLimiting(t *testing.T) { testQueueTimeLimiting(t, false) } + func TestQueueTimeLimitingNoLocals(t *testing.T) { testQueueTimeLimiting(t, true) } @@ -1090,55 +1065,6 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) { if err := validatePoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) } - - // Queue gapped transactions - if err := pool.addLocal(pricedTransaction(4, 100000, big.NewInt(1), local)); err != nil { - t.Fatalf("failed to add remote transaction: %v", err) - } - if err := pool.addRemoteSync(pricedTransaction(4, 100000, big.NewInt(1), remote)); err != nil { - t.Fatalf("failed to add remote transaction: %v", err) - } - time.Sleep(5 * evictionInterval) // A half lifetime pass - - // Queue executable transactions, the life cycle should be restarted. - if err := pool.addLocal(pricedTransaction(2, 100000, big.NewInt(1), local)); err != nil { - t.Fatalf("failed to add remote transaction: %v", err) - } - if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(1), remote)); err != nil { - t.Fatalf("failed to add remote transaction: %v", err) - } - time.Sleep(6 * evictionInterval) - - // All gapped transactions shouldn't be kicked out - pending, queued = pool.Stats() - if pending != 2 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) - } - if queued != 2 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) - } - if err := validatePoolInternals(pool); err != nil { - t.Fatalf("pool internal state corrupted: %v", err) - } - - // The whole life time pass after last promotion, kick out stale transactions - time.Sleep(2 * config.Lifetime) - pending, queued = pool.Stats() - if pending != 2 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) - } - if nolocals { - if queued != 0 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) - } - } else { - if queued != 1 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) - } - } - if err := validatePoolInternals(pool); err != nil { - t.Fatalf("pool internal state corrupted: %v", err) - } } // Tests that even if the transaction count belonging to a single account goes @@ -2344,6 +2270,7 @@ func TestReplacementDynamicFee(t *testing.T) { // Tests that local transactions are journaled to disk, but remote transactions // get discarded between restarts. +// TODO - fix this func TestJournaling(t *testing.T) { testJournaling(t, false) } func TestJournalingNoLocals(t *testing.T) { testJournaling(t, true) } @@ -2439,18 +2366,13 @@ func testJournaling(t *testing.T, nolocals bool) { pool = New(config, blockchain) pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + // tx mempool is cleared out completely after a reset pending, queued = pool.Stats() if pending != 0 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) } - if nolocals { - if queued != 0 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) - } - } else { - if queued != 1 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) - } + if queued != 0 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) } if err := validatePoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) diff --git a/core/txpool/legacypool/list.go b/core/txpool/legacypool/list.go index b749db44d..6b1a48b15 100644 --- a/core/txpool/legacypool/list.go +++ b/core/txpool/legacypool/list.go @@ -396,6 +396,35 @@ func (l *list) Filter(costLimit *uint256.Int, gasLimit uint64) (types.Transactio return removed, invalids } +func (l *list) ClearList() (types.Transactions, types.Transactions) { + // Filter out all the transactions + removed := l.txs.Filter(func(tx *types.Transaction) bool { + return true + }) + + if len(removed) == 0 { + return nil, nil + } + + // TODO: we might not need the code below + var invalids types.Transactions + // If the list was strict, filter anything above the lowest nonce + if l.strict { + lowest := uint64(math.MaxUint64) + for _, tx := range removed { + if nonce := tx.Nonce(); lowest > nonce { + lowest = nonce + } + } + invalids = l.txs.filter(func(tx *types.Transaction) bool { return tx.Nonce() > lowest }) + } + // Reset total cost + l.subTotalCost(removed) + l.subTotalCost(invalids) + l.txs.reheap() + return removed, invalids +} + // Cap places a hard limit on the number of items, returning all transactions // exceeding that limit. func (l *list) Cap(threshold int) types.Transactions { diff --git a/core/txpool/legacypool/list_test.go b/core/txpool/legacypool/list_test.go index 8587c66f7..b46574867 100644 --- a/core/txpool/legacypool/list_test.go +++ b/core/txpool/legacypool/list_test.go @@ -13,7 +13,6 @@ // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . - package legacypool import ( diff --git a/grpc/execution/server_test.go b/grpc/execution/server_test.go index f1a844fd4..272297d13 100644 --- a/grpc/execution/server_test.go +++ b/grpc/execution/server_test.go @@ -18,6 +18,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "math/big" "testing" + "time" ) func TestExecutionService_GetGenesisInfo(t *testing.T) { @@ -475,3 +476,139 @@ func TestExecutionServiceServerV1Alpha2_ExecuteBlockAndUpdateCommitment(t *testi balanceDiff := new(uint256.Int).Sub(chainDestinationAddressBalanceAfter, chainDestinationAddressBalanceBefore) require.True(t, balanceDiff.Cmp(uint256.NewInt(1000000000000000000)) == 0, "Chain destination address balance is not correct") } + +// Check that invalid transactions are not added into a block and are removed from the mempool +func TestExecutionServiceServerV1Alpha2_ExecuteBlockAndUpdateCommitmentWithInvalidTransactions(t *testing.T) { + ethservice, serviceV1Alpha1 := setupExecutionService(t, 10) + + // call genesis info + genesisInfo, err := serviceV1Alpha1.GetGenesisInfo(context.Background(), &astriaPb.GetGenesisInfoRequest{}) + require.Nil(t, err, "GetGenesisInfo failed") + require.NotNil(t, genesisInfo, "GenesisInfo is nil") + + // call get commitment state + commitmentState, err := serviceV1Alpha1.GetCommitmentState(context.Background(), &astriaPb.GetCommitmentStateRequest{}) + require.Nil(t, err, "GetCommitmentState failed") + require.NotNil(t, commitmentState, "CommitmentState is nil") + + ethservice.BlockChain().SetSafe(ethservice.BlockChain().CurrentBlock()) + + // get previous block hash + previousBlock := ethservice.BlockChain().CurrentSafeBlock() + require.NotNil(t, previousBlock, "Previous block not found") + + gasLimit := ethservice.BlockChain().GasLimit() + + stateDb, err := ethservice.BlockChain().StateAt(previousBlock.Root) + require.Nil(t, err, "Failed to get state db") + + latestNonce := stateDb.GetNonce(testAddr) + + // create 5 txs + txs := []*types.Transaction{} + marshalledTxs := []*sequencerblockv1alpha1.RollupData{} + for i := 0; i < 5; i++ { + unsignedTx := types.NewTransaction(latestNonce+uint64(i), testToAddress, big.NewInt(1), params.TxGas, big.NewInt(params.InitialBaseFee*2), nil) + tx, err := types.SignTx(unsignedTx, types.LatestSigner(ethservice.BlockChain().Config()), testKey) + require.Nil(t, err, "Failed to sign tx") + txs = append(txs, tx) + + marshalledTx, err := tx.MarshalBinary() + require.Nil(t, err, "Failed to marshal tx") + marshalledTxs = append(marshalledTxs, &sequencerblockv1alpha1.RollupData{ + Value: &sequencerblockv1alpha1.RollupData_SequencedData{SequencedData: marshalledTx}, + }) + } + + // add a tx with lesser gas than the base gas + unsignedTx := types.NewTransaction(latestNonce+uint64(5), testToAddress, big.NewInt(1), gasLimit, big.NewInt(params.InitialBaseFee*2), nil) + tx, err := types.SignTx(unsignedTx, types.LatestSigner(ethservice.BlockChain().Config()), testKey) + require.Nil(t, err, "Failed to sign tx") + txs = append(txs, tx) + + marshalledTx, err := tx.MarshalBinary() + require.Nil(t, err, "Failed to marshal tx") + marshalledTxs = append(marshalledTxs, &sequencerblockv1alpha1.RollupData{ + Value: &sequencerblockv1alpha1.RollupData_SequencedData{SequencedData: marshalledTx}, + }) + + errors := ethservice.TxPool().Add(txs, true, false) + for _, err := range errors { + require.Nil(t, err, "Failed to add tx to pool") + } + + pending, queued := ethservice.TxPool().Stats() + require.Equal(t, 6, pending, "Pending txs should be 6") + require.Equal(t, 0, queued, "Queued txs should be 0") + + executeBlockReq := &astriaPb.ExecuteBlockRequest{ + PrevBlockHash: previousBlock.Hash().Bytes(), + Timestamp: ×tamppb.Timestamp{ + Seconds: int64(previousBlock.Time + 2), + }, + Transactions: marshalledTxs, + } + + executeBlockRes, err := serviceV1Alpha1.ExecuteBlock(context.Background(), executeBlockReq) + require.Nil(t, err, "ExecuteBlock failed") + + require.NotNil(t, executeBlockRes, "ExecuteBlock response is nil") + + // check if astria ordered txs are cleared + astriaOrdered := ethservice.TxPool().AstriaOrdered() + require.Equal(t, 0, astriaOrdered.Len(), "AstriaOrdered should be empty") + + // call update commitment state to set the block we executed as soft and firm + updateCommitmentStateReq := &astriaPb.UpdateCommitmentStateRequest{ + CommitmentState: &astriaPb.CommitmentState{ + Soft: &astriaPb.Block{ + Hash: executeBlockRes.Hash, + ParentBlockHash: executeBlockRes.ParentBlockHash, + Number: executeBlockRes.Number, + Timestamp: executeBlockRes.Timestamp, + }, + Firm: &astriaPb.Block{ + Hash: executeBlockRes.Hash, + ParentBlockHash: executeBlockRes.ParentBlockHash, + Number: executeBlockRes.Number, + Timestamp: executeBlockRes.Timestamp, + }, + BaseCelestiaHeight: commitmentState.BaseCelestiaHeight + 1, + }, + } + + updateCommitmentStateRes, err := serviceV1Alpha1.UpdateCommitmentState(context.Background(), updateCommitmentStateReq) + require.Nil(t, err, "UpdateCommitmentState failed") + require.NotNil(t, updateCommitmentStateRes, "UpdateCommitmentState response should not be nil") + require.Equal(t, updateCommitmentStateRes, updateCommitmentStateReq.CommitmentState, "CommitmentState response should match request") + + // get the soft and firm block + softBlock := ethservice.BlockChain().CurrentSafeBlock() + require.NotNil(t, softBlock, "SoftBlock is nil") + firmBlock := ethservice.BlockChain().CurrentFinalBlock() + require.NotNil(t, firmBlock, "FirmBlock is nil") + + block := ethservice.BlockChain().GetBlockByNumber(softBlock.Number.Uint64()) + require.NotNil(t, block, "Soft Block not found") + require.Equal(t, block.Transactions().Len(), 5, "Soft Block should have 5 txs") + + // give the tx loop time to run + time.Sleep(1 * time.Millisecond) + + // after the tx loop is run, all pending txs should be removed + pending, queued = ethservice.TxPool().Stats() + require.Equal(t, 0, pending, "Pending txs should be 0") + require.Equal(t, 0, queued, "Queued txs should be 0") + + // check if the soft and firm block are set correctly + require.True(t, bytes.Equal(softBlock.Hash().Bytes(), updateCommitmentStateRes.Soft.Hash), "Soft Block Hashes do not match") + require.True(t, bytes.Equal(softBlock.ParentHash.Bytes(), updateCommitmentStateRes.Soft.ParentBlockHash), "Soft Block Parent Hash do not match") + require.Equal(t, softBlock.Number.Uint64(), uint64(updateCommitmentStateRes.Soft.Number), "Soft Block Number do not match") + + require.True(t, bytes.Equal(firmBlock.Hash().Bytes(), updateCommitmentStateRes.Firm.Hash), "Firm Block Hashes do not match") + require.True(t, bytes.Equal(firmBlock.ParentHash.Bytes(), updateCommitmentStateRes.Firm.ParentBlockHash), "Firm Block Parent Hash do not match") + require.Equal(t, firmBlock.Number.Uint64(), uint64(updateCommitmentStateRes.Firm.Number), "Firm Block Number do not match") + + celestiaBaseHeight := ethservice.BlockChain().CurrentBaseCelestiaHeight() + require.Equal(t, celestiaBaseHeight, updateCommitmentStateRes.BaseCelestiaHeight, "BaseCelestiaHeight should be updated in db") +}