diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 9d9256862b63..fc056a76f7f3 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -69,6 +69,7 @@ var ( utils.TxPoolNoLocalsFlag, utils.TxPoolJournalFlag, utils.TxPoolRejournalFlag, + utils.TxPoolBroadcastPendingLocalTxFlag, utils.TxPoolPriceLimitFlag, utils.TxPoolPriceBumpFlag, utils.TxPoolAccountSlotsFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index f079df83b9da..82aa8084d2c8 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -347,11 +347,15 @@ var ( Value: ethconfig.Defaults.TxPool.Rejournal, Category: flags.TxPoolCategory, } + TxPoolBroadcastPendingLocalTxFlag = &cli.DurationFlag{ + Name: "txpool.broadcastpendinglocaltx", + Usage: "Time interval to broadcast the pending local transaction", + Value: legacypool.DefaultConfig.BroadcastPendingLocalTx, + } TxPoolPriceLimitFlag = &cli.Uint64Flag{ - Name: "txpool.pricelimit", - Usage: "Minimum gas price tip to enforce for acceptance into the pool", - Value: ethconfig.Defaults.TxPool.PriceLimit, - Category: flags.TxPoolCategory, + Name: "txpool.pricelimit", + Usage: "Minimum gas price tip to enforce for acceptance into the pool", + Value: ethconfig.Defaults.TxPool.PriceLimit, } TxPoolPriceBumpFlag = &cli.Uint64Flag{ Name: "txpool.pricebump", @@ -1443,6 +1447,9 @@ func setTxPool(ctx *cli.Context, cfg *legacypool.Config) { if ctx.IsSet(TxPoolRejournalFlag.Name) { cfg.Rejournal = ctx.Duration(TxPoolRejournalFlag.Name) } + if ctx.IsSet(TxPoolBroadcastPendingLocalTxFlag.Name) { + cfg.BroadcastPendingLocalTx = ctx.Duration(TxPoolBroadcastPendingLocalTxFlag.Name) + } if ctx.IsSet(TxPoolPriceLimitFlag.Name) { cfg.PriceLimit = ctx.Uint64(TxPoolPriceLimitFlag.Name) } diff --git a/core/events.go b/core/events.go index 4324ac4deb97..8c5532e0b033 100644 --- a/core/events.go +++ b/core/events.go @@ -23,6 +23,9 @@ import ( // NewTxsEvent is posted when a batch of transactions enter the transaction pool. type NewTxsEvent struct{ Txs []*types.Transaction } +// PendingLocalTxsEvent is posted when there are pending local transactions in the transaction pool. +type PendingLocalTxsEvent struct{ Txs []*types.Transaction } + // NewQueuedTxsEvent is posted when a batch of transactions enter the transaction pool. type NewQueuedTxsEvent struct{ Txs []*types.Transaction } diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 435370ee3b6d..0b6ab7471d6f 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -122,10 +122,11 @@ type BlockChain interface { // Config are the configuration parameters of the transaction pool. type Config struct { - Locals []common.Address // Addresses that should be treated by default as local - NoLocals bool // Whether local transaction handling should be disabled - Journal string // Journal of local transactions to survive node restarts - Rejournal time.Duration // Time interval to regenerate the local transaction journal + Locals []common.Address // Addresses that should be treated by default as local + NoLocals bool // Whether local transaction handling should be disabled + Journal string // Journal of local transactions to survive node restarts + Rejournal time.Duration // Time interval to regenerate the local transaction journal + BroadcastPendingLocalTx time.Duration // Time interval to broadcast the local transaction PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce) @@ -140,8 +141,9 @@ type Config struct { // DefaultConfig contains the default configurations for the transaction pool. var DefaultConfig = Config{ - Journal: "transactions.rlp", - Rejournal: time.Hour, + Journal: "transactions.rlp", + Rejournal: time.Hour, + BroadcastPendingLocalTx: 5 * time.Minute, PriceLimit: 1, PriceBump: 10, diff --git a/core/txpool/locals/tx_tracker.go b/core/txpool/locals/tx_tracker.go index a24fcb1f4e38..fe87281ed519 100644 --- a/core/txpool/locals/tx_tracker.go +++ b/core/txpool/locals/tx_tracker.go @@ -22,9 +22,11 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/txpool/legacypool" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" @@ -46,10 +48,13 @@ type TxTracker struct { all map[common.Hash]*types.Transaction // All tracked transactions byAddr map[common.Address]*legacypool.SortedMap // Transactions by address - journal *journal // Journal of local transaction to back up to disk - rejournal time.Duration // How often to rotate journal - pool *txpool.TxPool // The tx pool to interact with - signer types.Signer + journal *journal // Journal of local transaction to back up to disk + rejournal time.Duration // How often to rotate journal + broadcastPendingLocalTx time.Duration // Time interval to broadcast the local transaction + pool *txpool.TxPool // The tx pool to interact with + signer types.Signer + + pendingLocalTxFeed event.Feed shutdownCh chan struct{} mu sync.Mutex @@ -57,7 +62,8 @@ type TxTracker struct { } // New creates a new TxTracker -func New(journalPath string, journalTime time.Duration, chainConfig *params.ChainConfig, next *txpool.TxPool) *TxTracker { +func New(journalPath string, journalTime time.Duration, chainConfig *params.ChainConfig, next *txpool.TxPool, + broadcastPendingLocalTxTime time.Duration) *TxTracker { pool := &TxTracker{ all: make(map[common.Hash]*types.Transaction), byAddr: make(map[common.Address]*legacypool.SortedMap), @@ -69,6 +75,7 @@ func New(journalPath string, journalTime time.Duration, chainConfig *params.Chai pool.journal = newTxJournal(journalPath) pool.rejournal = journalTime } + pool.broadcastPendingLocalTx = broadcastPendingLocalTxTime return pool } @@ -187,6 +194,10 @@ func (tracker *TxTracker) loop() { lastJournal = time.Now() timer = time.NewTimer(10 * time.Second) // Do initial check after 10 seconds, do rechecks more seldom. ) + + pendingLocalTxs := time.NewTicker(tracker.broadcastPendingLocalTx) + defer pendingLocalTxs.Stop() + for { select { case <-tracker.shutdownCh: @@ -207,6 +218,25 @@ func (tracker *TxTracker) loop() { tracker.mu.Unlock() } timer.Reset(recheckInterval) + case <-pendingLocalTxs.C: + lTxs := types.Transactions{} + for addr, lazyTxs := range tracker.pool.Pending(txpool.PendingFilter{OnlyPlainTxs: true}) { + if _, ok := tracker.byAddr[addr]; !ok { + continue + } + for _, lazyTx := range lazyTxs { + lTxs = append(lTxs, lazyTx.Tx) + } + } + + if len(lTxs) > 0 { + go tracker.pendingLocalTxFeed.Send(core.PendingLocalTxsEvent{Txs: lTxs}) + } } } } + +// SubscribePendingLocalTransactions subscribes to pending local transaction events. +func (tracker *TxTracker) SubscribePendingLocalTransactions(ch chan<- core.PendingLocalTxsEvent) event.Subscription { + return tracker.pendingLocalTxFeed.Subscribe(ch) +} diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index eb5e25024b78..034e528bd78d 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -60,6 +60,10 @@ type BlockChain interface { SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription } +type PendingLocalTxsPublisher interface { + SubscribePendingLocalTransactions(ch chan<- core.PendingLocalTxsEvent) event.Subscription +} + // TxPool is an aggregator for various transaction specific pools, collectively // tracking all the transactions deemed interesting by the node. Transactions // enter the pool when they are received from the network or submitted locally. @@ -76,6 +80,8 @@ type TxPool struct { term chan struct{} // Termination channel to detect a closed pool sync chan chan error // Testing / simulator channel to block until internal reset is done + + PendingLocalTxsPublisher PendingLocalTxsPublisher // Publisher for pending local transactions } // New creates a new transaction pool to gather, sort and filter inbound @@ -386,6 +392,16 @@ func (p *TxPool) Pending(filter PendingFilter) map[common.Address][]*LazyTransac return txs } +// SubscribePendingLocalTxsEvent registers a subscription of PendingLocalTxsEvent and +// starts sending event to the given channel. +func (p *TxPool) SubscribePendingLocalTxsEvent(ch chan<- core.PendingLocalTxsEvent) event.Subscription { + var subs []event.Subscription + if p.PendingLocalTxsPublisher != nil { + subs = append(subs, p.PendingLocalTxsPublisher.SubscribePendingLocalTransactions(ch)) + } + return p.subs.Track(event.JoinSubscriptions(subs...)) +} + // SubscribeNewQueuedTxsEvent registers a subscription of NewQueuedTxsEvent and func (p *TxPool) SubscribeNewQueuedTxsEvent(ch chan<- core.NewQueuedTxsEvent) event.Subscription { subs := make([]event.Subscription, len(p.subpools)) diff --git a/eth/backend.go b/eth/backend.go index fea7e4e1fe7c..93197b05e5fb 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -247,8 +247,14 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { log.Warn("Sanitizing invalid txpool journal time", "provided", rejournal, "updated", time.Second) rejournal = time.Second } - eth.localTxTracker = locals.New(config.TxPool.Journal, rejournal, eth.blockchain.Config(), eth.txPool) + broadcastPendingLocalTx := config.TxPool.BroadcastPendingLocalTx + if broadcastPendingLocalTx < time.Second { + log.Warn("Sanitizing invalid txpool broadcast local tx time", "provided", broadcastPendingLocalTx, "updated", time.Second) + broadcastPendingLocalTx = time.Second + } + eth.localTxTracker = locals.New(config.TxPool.Journal, rejournal, eth.blockchain.Config(), eth.txPool, broadcastPendingLocalTx) stack.RegisterLifecycle(eth.localTxTracker) + eth.txPool.PendingLocalTxsPublisher = eth.localTxTracker } // Permit the downloader to use the trie cache allowance during fast sync cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit diff --git a/eth/handler.go b/eth/handler.go index 6ac890902b69..8b23621a3271 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -52,6 +52,10 @@ const ( // All transactions with a higher size will be announced and need to be fetched // by the peer. txMaxBroadcastSize = 4096 + + // pendingLocalTxChanSize is the size of channel listening to NewTxsEvent. + // The number is referenced from the size of tx pool. + pendingLocalTxChanSize = 4096 ) var syncChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the sync progress challenge @@ -78,6 +82,10 @@ type txPool interface { // can decide whether to receive notifications only for newly seen transactions // or also for reorged out ones. SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription + + // SubscribePendingLocalTxsEvent should return an event subscription of + // NewTxsEvent and send events to the given channel. + SubscribePendingLocalTxsEvent(chan<- core.PendingLocalTxsEvent) event.Subscription } // handlerConfig is the collection of initialization parameters to create a full @@ -111,9 +119,11 @@ type handler struct { txFetcher *fetcher.TxFetcher peers *peerSet - eventMux *event.TypeMux - txsCh chan core.NewTxsEvent - txsSub event.Subscription + eventMux *event.TypeMux + txsCh chan core.NewTxsEvent + txsSub event.Subscription + pendingLocalTxsCh chan core.PendingLocalTxsEvent + pendingLocalTxsSub event.Subscription requiredBlocks map[uint64]common.Hash @@ -424,7 +434,10 @@ func (h *handler) Start(maxPeers int) { h.wg.Add(1) h.txsCh = make(chan core.NewTxsEvent, txChanSize) h.txsSub = h.txpool.SubscribeTransactions(h.txsCh, false) + h.pendingLocalTxsCh = make(chan core.PendingLocalTxsEvent, pendingLocalTxChanSize) + h.pendingLocalTxsSub = h.txpool.SubscribePendingLocalTxsEvent(h.pendingLocalTxsCh) go h.txBroadcastLoop() + go h.pendingLocalTxBroadcastLoop() // start sync handlers h.txFetcher.Start() @@ -438,6 +451,7 @@ func (h *handler) Stop() { h.txsSub.Unsubscribe() // quits txBroadcastLoop h.txFetcher.Stop() h.downloader.Terminate() + h.pendingLocalTxsSub.Unsubscribe() // Quit chainSync and txsync64. // After this is done, no new peers will be accepted. @@ -531,6 +545,20 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { "bcastpeers", len(txset), "bcastcount", directCount, "annpeers", len(annos), "anncount", annCount) } +// BroadcastPendingLocalTxs will propagate a batch of transactions to all peers +func (h *handler) BroadcastPendingLocalTxs(txs types.Transactions) { + peers := h.peers.Clone() + // Build tx hashes + txHashes := make([]common.Hash, len(txs)) + for i, tx := range txs { + txHashes[i] = tx.Hash() + } + for _, peer := range peers { + peer.AsyncSendTransactions(txHashes) + } + log.Info("Broadcast pending local transaction to all peers", "recipients", len(peers)) +} + // txBroadcastLoop announces new transactions to connected peers. func (h *handler) txBroadcastLoop() { defer h.wg.Done() @@ -557,3 +585,16 @@ func (h *handler) enableSyncedFeatures() { h.snapSync.Store(false) } } + +func (h *handler) pendingLocalTxBroadcastLoop() { + for { + select { + case event := <-h.pendingLocalTxsCh: + h.BroadcastPendingLocalTxs(event.Txs) + + // Err() channel will be closed when unsubscribing. + case <-h.pendingLocalTxsSub.Err(): + return + } + } +} diff --git a/eth/handler_test.go b/eth/handler_test.go index d5d46a3c65a5..6a91f742918a 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -48,10 +48,10 @@ var ( // Its goal is to get around setting up a valid statedb for the balance and nonce // checks. type testTxPool struct { - pool map[common.Hash]*types.Transaction // Hash map of collected transactions - - txFeed event.Feed // Notification feed to allow waiting for inclusion - lock sync.RWMutex // Protects the transaction pool + pool map[common.Hash]*types.Transaction // Hash map of collected transactions + pendingLocalTxFeed event.Feed + txFeed event.Feed // Notification feed to allow waiting for inclusion + lock sync.RWMutex // Protects the transaction pool } // newTestTxPool creates a mock transaction pool. @@ -121,6 +121,10 @@ func (p *testTxPool) Pending(filter txpool.PendingFilter) map[common.Address][]* return pending } +func (p *testTxPool) SubscribePendingLocalTxsEvent(ch chan<- core.PendingLocalTxsEvent) event.Subscription { + return p.pendingLocalTxFeed.Subscribe(ch) +} + // SubscribeTransactions should return an event subscription of NewTxsEvent and // send events to the given channel. func (p *testTxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { diff --git a/eth/peer.go b/eth/peer.go index 761877771660..e7587c222156 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -57,3 +57,15 @@ func (p *snapPeer) info() *snapPeerInfo { Version: p.Version(), } } + +// Clone clones a peers +func (ps *peerSet) Clone() []*ethPeer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*ethPeer, 0, len(ps.peers)) + for _, p := range ps.peers { + list = append(list, p) + } + return list +}