diff --git a/core/txpool/legacypool/tx_tracker.go b/core/txpool/legacypool/tx_tracker.go new file mode 100644 index 000000000000..ee8a2686984a --- /dev/null +++ b/core/txpool/legacypool/tx_tracker.go @@ -0,0 +1,164 @@ +// Copyright 2023 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package legacypool implements the normal EVM execution transaction pool. +package legacypool + +import ( + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" + "golang.org/x/exp/slices" +) + +var recheckInterval = 10 * time.Second + +// TxTracker is a struct used to track priority transactions; it will check from +// time to time if the main pool has forgotten about any of the transaction +// it is tracking, and if so, submit it again. +// This is used to track 'locals'. +// This struct does not care about transaction validity, price-bumps or account limits, +// but optimistically accepts transactions. +type TxTracker struct { + all map[common.Hash]*types.Transaction // All tracked transactions + byAddr map[common.Address]*sortedMap // Transactions by address + + journal *journal // Journal of local transaction to back up to disk + modified bool // Modification tracking + pool txpool.SubPool // The 'main' subpool to interact with + signer types.Signer + + shutdownCh chan struct{} + mu sync.Mutex + wg sync.WaitGroup +} + +func NewTxTracker(journalPath string, chainConfig *params.ChainConfig, next txpool.SubPool) *TxTracker { + signer := types.LatestSigner(chainConfig) + pool := &TxTracker{ + all: make(map[common.Hash]*types.Transaction), + byAddr: make(map[common.Address]*sortedMap), + signer: signer, + shutdownCh: make(chan struct{}), + pool: next, + } + if journalPath != "" { + pool.journal = newTxJournal(journalPath) + } + return pool +} + +// Track adds a transaction tx to the tracked set. +func (tracker *TxTracker) Track(tx *types.Transaction) { + tracker.mu.Lock() + defer tracker.mu.Unlock() + // If we're already tracking it, it's a no-op + if _, ok := tracker.all[tx.Hash()]; ok { + return + } + tracker.all[tx.Hash()] = tx + addr, _ := types.Sender(tracker.signer, tx) + if tracker.byAddr[addr] == nil { + tracker.byAddr[addr] = newSortedMap() + } + tracker.byAddr[addr].Put(tx) + tracker.modified = true +} + +// recheck checks and returns any transactions that needs to be resubmitted. +func (tracker *TxTracker) recheck() []*txpool.Transaction { + tracker.mu.Lock() + defer tracker.mu.Unlock() + + if !tracker.modified { + return nil + } + var resubmits []*txpool.Transaction + for sender, txs := range tracker.byAddr { + stales := txs.Forward(tracker.pool.Nonce(sender)) + // Wipe the stales + for _, tx := range stales { + delete(tracker.all, tx.Hash()) + } + // Check the non-stale + for _, tx := range txs.Flatten() { + if tracker.pool.Has(tx.Hash()) { + continue + } + resubmits = append(resubmits, &txpool.Transaction{ + Tx: tx, + }) + } + } + + { // rejournal + txs := make(map[common.Address]types.Transactions) + for _, tx := range tracker.all { + addr, _ := types.Sender(tracker.signer, tx) + txs[addr] = append(txs[addr], tx) + } + // Sort them + for _, list := range txs { + slices.SortFunc(list, func(a, b *types.Transaction) bool { + return a.Nonce() < b.Nonce() + }) + } + if err := tracker.journal.rotate(txs); err != nil { + log.Warn("Transaction journal rotation failed", "err", err) + } + } + return resubmits +} + +// Start implements node.Lifecycle interface +// Start is called after all services have been constructed and the networking +// layer was also initialized to spawn any goroutines required by the service. +func (tracker *TxTracker) Start() error { + tracker.wg.Add(1) + go tracker.loop() + return nil +} + +// Start implements node.Lifecycle interface +// Stop terminates all goroutines belonging to the service, blocking until they +// are all terminated. +func (tracker *TxTracker) Stop() error { + close(tracker.shutdownCh) + tracker.wg.Wait() + return nil +} + +func (tracker *TxTracker) loop() { + defer tracker.wg.Done() + t := time.NewTimer(recheckInterval) + for { + select { + case <-tracker.shutdownCh: + return + case <-t.C: + // resubmit + if resubmits := tracker.recheck(); len(resubmits) > 0 { + tracker.pool.Add(resubmits, false, false) + } + t.Reset(recheckInterval) + } + } +} diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index f2c94563bc4f..a0531c67d506 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -191,6 +191,8 @@ func (p *TxPool) Get(hash common.Hash) *Transaction { // to the large transaction churn, add may postpone fully integrating the tx // to a later point to batch multiple ones together. func (p *TxPool) Add(txs []*Transaction, local bool, sync bool) []error { + // Disable all local-tracking within the pool. + local = false // Split the input transactions between the subpools. It shouldn't really // happen that we receive merged batches, but better graceful than strange // errors. diff --git a/eth/api_backend.go b/eth/api_backend.go index 02d1946e8e42..bcc0b44d8dda 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -294,7 +294,10 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri } func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { - return b.eth.txPool.Add([]*txpool.Transaction{&txpool.Transaction{Tx: signedTx}}, true, false)[0] + if locals := b.eth.localTxTracker; locals != nil { + locals.Track(signedTx) + } + return b.eth.txPool.Add([]*txpool.Transaction{&txpool.Transaction{Tx: signedTx}}, false, false)[0] } func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) { diff --git a/eth/backend.go b/eth/backend.go index 8d6977205e2c..eb88c73a21f0 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -67,7 +67,8 @@ type Ethereum struct { config *ethconfig.Config // Handlers - txPool *txpool.TxPool + txPool *txpool.TxPool + localTxTracker *legacypool.TxTracker blockchain *core.BlockChain handler *handler @@ -209,6 +210,10 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal) } legacyPool := legacypool.New(config.TxPool, eth.blockchain) + if !config.TxPool.NoLocals { + eth.localTxTracker = legacypool.NewTxTracker(config.TxPool.Journal, eth.blockchain.Config(), legacyPool) + stack.RegisterLifecycle(eth.localTxTracker) + } eth.txPool, err = txpool.New(new(big.Int).SetUint64(config.TxPool.PriceLimit), eth.blockchain, []txpool.SubPool{legacyPool}) if err != nil { diff --git a/eth/catalyst/api_test.go b/eth/catalyst/api_test.go index 254a510ab40f..36724acb19f1 100644 --- a/eth/catalyst/api_test.go +++ b/eth/catalyst/api_test.go @@ -107,7 +107,7 @@ func TestEth2AssembleBlock(t *testing.T) { if err != nil { t.Fatalf("error signing transaction, err=%v", err) } - ethservice.TxPool().Add([]*txpool.Transaction{{Tx: tx}}, true, false) + ethservice.TxPool().Add([]*txpool.Transaction{{Tx: tx}}, false, false) blockParams := engine.PayloadAttributes{ Timestamp: blocks[9].Time() + 5, } diff --git a/miner/worker_test.go b/miner/worker_test.go index 80557d99bfcf..24084819eaf1 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -194,8 +194,8 @@ func TestGenerateAndImportBlock(t *testing.T) { w.start() for i := 0; i < 5; i++ { - b.txPool.Add([]*txpool.Transaction{{Tx: b.newRandomTx(true)}}, true, false) - b.txPool.Add([]*txpool.Transaction{{Tx: b.newRandomTx(false)}}, true, false) + b.txPool.Add([]*txpool.Transaction{{Tx: b.newRandomTx(true)}}, false, false) + b.txPool.Add([]*txpool.Transaction{{Tx: b.newRandomTx(false)}}, fa;se, false) select { case ev := <-sub.Chan():