Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 164 additions & 0 deletions core/txpool/legacypool/tx_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Package legacypool implements the normal EVM execution transaction pool.
package legacypool

import (
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"golang.org/x/exp/slices"
)

var recheckInterval = 10 * time.Second

// TxTracker is a struct used to track priority transactions; it will check from
// time to time if the main pool has forgotten about any of the transaction
// it is tracking, and if so, submit it again.
// This is used to track 'locals'.
// This struct does not care about transaction validity, price-bumps or account limits,
// but optimistically accepts transactions.
type TxTracker struct {
all map[common.Hash]*types.Transaction // All tracked transactions
byAddr map[common.Address]*sortedMap // Transactions by address

journal *journal // Journal of local transaction to back up to disk
modified bool // Modification tracking
pool txpool.SubPool // The 'main' subpool to interact with
signer types.Signer

shutdownCh chan struct{}
mu sync.Mutex
wg sync.WaitGroup
}

func NewTxTracker(journalPath string, chainConfig *params.ChainConfig, next txpool.SubPool) *TxTracker {
signer := types.LatestSigner(chainConfig)
pool := &TxTracker{
all: make(map[common.Hash]*types.Transaction),
byAddr: make(map[common.Address]*sortedMap),
signer: signer,
shutdownCh: make(chan struct{}),
pool: next,
}
if journalPath != "" {
pool.journal = newTxJournal(journalPath)
}
return pool
}

// Track adds a transaction tx to the tracked set.
func (tracker *TxTracker) Track(tx *types.Transaction) {
tracker.mu.Lock()
defer tracker.mu.Unlock()
// If we're already tracking it, it's a no-op
if _, ok := tracker.all[tx.Hash()]; ok {
return
}
tracker.all[tx.Hash()] = tx
addr, _ := types.Sender(tracker.signer, tx)
if tracker.byAddr[addr] == nil {
tracker.byAddr[addr] = newSortedMap()
}
tracker.byAddr[addr].Put(tx)
tracker.modified = true
}

// recheck checks and returns any transactions that needs to be resubmitted.
func (tracker *TxTracker) recheck() []*txpool.Transaction {
tracker.mu.Lock()
defer tracker.mu.Unlock()

if !tracker.modified {
return nil
}
var resubmits []*txpool.Transaction
for sender, txs := range tracker.byAddr {
stales := txs.Forward(tracker.pool.Nonce(sender))
// Wipe the stales
for _, tx := range stales {
delete(tracker.all, tx.Hash())
}
// Check the non-stale
for _, tx := range txs.Flatten() {
if tracker.pool.Has(tx.Hash()) {
continue
}
resubmits = append(resubmits, &txpool.Transaction{
Tx: tx,
})
}
}

{ // rejournal
txs := make(map[common.Address]types.Transactions)
for _, tx := range tracker.all {
addr, _ := types.Sender(tracker.signer, tx)
txs[addr] = append(txs[addr], tx)
}
// Sort them
for _, list := range txs {
slices.SortFunc(list, func(a, b *types.Transaction) bool {
return a.Nonce() < b.Nonce()
})
}
if err := tracker.journal.rotate(txs); err != nil {
log.Warn("Transaction journal rotation failed", "err", err)
}
}
return resubmits
}

// Start implements node.Lifecycle interface
// Start is called after all services have been constructed and the networking
// layer was also initialized to spawn any goroutines required by the service.
func (tracker *TxTracker) Start() error {
tracker.wg.Add(1)
go tracker.loop()
return nil
}

// Start implements node.Lifecycle interface
// Stop terminates all goroutines belonging to the service, blocking until they
// are all terminated.
func (tracker *TxTracker) Stop() error {
close(tracker.shutdownCh)
tracker.wg.Wait()
return nil
}

func (tracker *TxTracker) loop() {
defer tracker.wg.Done()
t := time.NewTimer(recheckInterval)
for {
select {
case <-tracker.shutdownCh:
return
case <-t.C:
// resubmit
if resubmits := tracker.recheck(); len(resubmits) > 0 {
tracker.pool.Add(resubmits, false, false)
}
t.Reset(recheckInterval)
}
}
}
2 changes: 2 additions & 0 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ func (p *TxPool) Get(hash common.Hash) *Transaction {
// to the large transaction churn, add may postpone fully integrating the tx
// to a later point to batch multiple ones together.
func (p *TxPool) Add(txs []*Transaction, local bool, sync bool) []error {
// Disable all local-tracking within the pool.
local = false
// Split the input transactions between the subpools. It shouldn't really
// happen that we receive merged batches, but better graceful than strange
// errors.
Expand Down
5 changes: 4 additions & 1 deletion eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,10 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri
}

func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
return b.eth.txPool.Add([]*txpool.Transaction{&txpool.Transaction{Tx: signedTx}}, true, false)[0]
if locals := b.eth.localTxTracker; locals != nil {
locals.Track(signedTx)
}
return b.eth.txPool.Add([]*txpool.Transaction{&txpool.Transaction{Tx: signedTx}}, false, false)[0]
}

func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
Expand Down
7 changes: 6 additions & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ type Ethereum struct {
config *ethconfig.Config

// Handlers
txPool *txpool.TxPool
txPool *txpool.TxPool
localTxTracker *legacypool.TxTracker

blockchain *core.BlockChain
handler *handler
Expand Down Expand Up @@ -209,6 +210,10 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal)
}
legacyPool := legacypool.New(config.TxPool, eth.blockchain)
if !config.TxPool.NoLocals {
eth.localTxTracker = legacypool.NewTxTracker(config.TxPool.Journal, eth.blockchain.Config(), legacyPool)
stack.RegisterLifecycle(eth.localTxTracker)
}

eth.txPool, err = txpool.New(new(big.Int).SetUint64(config.TxPool.PriceLimit), eth.blockchain, []txpool.SubPool{legacyPool})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion eth/catalyst/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestEth2AssembleBlock(t *testing.T) {
if err != nil {
t.Fatalf("error signing transaction, err=%v", err)
}
ethservice.TxPool().Add([]*txpool.Transaction{{Tx: tx}}, true, false)
ethservice.TxPool().Add([]*txpool.Transaction{{Tx: tx}}, false, false)
blockParams := engine.PayloadAttributes{
Timestamp: blocks[9].Time() + 5,
}
Expand Down
4 changes: 2 additions & 2 deletions miner/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ func TestGenerateAndImportBlock(t *testing.T) {
w.start()

for i := 0; i < 5; i++ {
b.txPool.Add([]*txpool.Transaction{{Tx: b.newRandomTx(true)}}, true, false)
b.txPool.Add([]*txpool.Transaction{{Tx: b.newRandomTx(false)}}, true, false)
b.txPool.Add([]*txpool.Transaction{{Tx: b.newRandomTx(true)}}, false, false)
b.txPool.Add([]*txpool.Transaction{{Tx: b.newRandomTx(false)}}, fa;se, false)

select {
case ev := <-sub.Chan():
Expand Down