Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*ty
return logs, nil
}

func (fb *filterBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
func (fb *filterBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return event.NewSubscription(func(quit <-chan struct{}) error {
<-quit
return nil
Expand Down
7 changes: 2 additions & 5 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"github.com/ethereum/go-ethereum/core/types"
)

// TxPreEvent is posted when a transaction enters the transaction pool.
type TxPreEvent struct{ Tx *types.Transaction }
// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
type NewTxsEvent struct{ Txs []*types.Transaction }

// PendingLogsEvent is posted pre mining and notifies of pending logs.
type PendingLogsEvent struct {
Expand All @@ -35,9 +35,6 @@ type PendingStateEvent struct{}
// NewMinedBlockEvent is posted when a block has been imported.
type NewMinedBlockEvent struct{ Block *types.Block }

// RemovedTransactionEvent is posted when a reorg happens
type RemovedTransactionEvent struct{ Txs types.Transactions }

// RemovedLogsEvent is posted when a reorg happens
type RemovedLogsEvent struct{ Logs []*types.Log }

Expand Down
31 changes: 24 additions & 7 deletions core/tx_journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func newTxJournal(path string) *txJournal {

// load parses a transaction journal dump from disk, loading its contents into
// the specified pool.
func (journal *txJournal) load(add func(*types.Transaction) error) error {
func (journal *txJournal) load(add func([]*types.Transaction) []error) error {
// Skip the parsing if the journal file doens't exist at all
if _, err := os.Stat(journal.path); os.IsNotExist(err) {
return nil
Expand All @@ -76,22 +76,39 @@ func (journal *txJournal) load(add func(*types.Transaction) error) error {
stream := rlp.NewStream(input, 0)
total, dropped := 0, 0

var failure error
// Create a method to load a limited batch of transactions and bump the
// appropriate progress counters. Then use this method to load all the
// journalled transactions in small-ish batches.
loadBatch := func(txs types.Transactions) {
for _, err := range add(txs) {
if err != nil {
log.Debug("Failed to add journaled transaction", "err", err)
dropped++
}
}
}
var (
failure error
batch types.Transactions
)
for {
// Parse the next transaction and terminate on error
tx := new(types.Transaction)
if err = stream.Decode(tx); err != nil {
if err != io.EOF {
failure = err
}
if batch.Len() > 0 {
loadBatch(batch)
}
break
}
// Import the transaction and bump the appropriate progress counters
// New transaction parsed, queue up for later, import if threnshold is reached
total++
if err = add(tx); err != nil {
log.Debug("Failed to add journaled transaction", "err", err)
dropped++
continue

if batch = append(batch, tx); batch.Len() > 1024 {
loadBatch(batch)
batch = batch[:0]
}
}
log.Info("Loaded local transaction journal", "transactions", total, "dropped", dropped)
Expand Down
30 changes: 20 additions & 10 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
if !config.NoLocals && config.Journal != "" {
pool.journal = newTxJournal(config.Journal)

if err := pool.journal.load(pool.AddLocal); err != nil {
if err := pool.journal.load(pool.AddLocals); err != nil {
log.Warn("Failed to load transaction journal", "err", err)
}
if err := pool.journal.rotate(pool.local()); err != nil {
Expand Down Expand Up @@ -444,9 +444,9 @@ func (pool *TxPool) Stop() {
log.Info("Transaction pool stopped")
}

// SubscribeTxPreEvent registers a subscription of TxPreEvent and
// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and
// starts sending event to the given channel.
func (pool *TxPool) SubscribeTxPreEvent(ch chan<- TxPreEvent) event.Subscription {
func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscription {
return pool.scope.Track(pool.txFeed.Subscribe(ch))
}

Expand Down Expand Up @@ -653,7 +653,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())

// We've directly injected a replacement transaction, notify subsystems
go pool.txFeed.Send(TxPreEvent{tx})
go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}})

return old != nil, nil
}
Expand Down Expand Up @@ -712,10 +712,11 @@ func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
}
}

// promoteTx adds a transaction to the pending (processable) list of transactions.
// promoteTx adds a transaction to the pending (processable) list of transactions
// and returns whether it was inserted or an older was better.
//
// Note, this method assumes the pool lock is held!
func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) {
func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) bool {
// Try to insert the transaction into the pending queue
if pool.pending[addr] == nil {
pool.pending[addr] = newTxList(true)
Expand All @@ -729,7 +730,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
pool.priced.Removed()

pendingDiscardCounter.Inc(1)
return
return false
}
// Otherwise discard any previous transaction and mark this
if old != nil {
Expand All @@ -747,7 +748,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
pool.beats[addr] = time.Now()
pool.pendingState.SetNonce(addr, tx.Nonce()+1)

go pool.txFeed.Send(TxPreEvent{tx})
return true
}

// AddLocal enqueues a single transaction into the pool if it is valid, marking
Expand Down Expand Up @@ -907,6 +908,9 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
func (pool *TxPool) promoteExecutables(accounts []common.Address) {
// Track the promoted transactions to broadcast them at once
var promoted []*types.Transaction

// Gather all the accounts potentially needing updates
if accounts == nil {
accounts = make([]common.Address, 0, len(pool.queue))
Expand Down Expand Up @@ -939,8 +943,10 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
// Gather all executable transactions and promote them
for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
hash := tx.Hash()
log.Trace("Promoting queued transaction", "hash", hash)
pool.promoteTx(addr, hash, tx)
if pool.promoteTx(addr, hash, tx) {
log.Trace("Promoting queued transaction", "hash", hash)
promoted = append(promoted, tx)
}
}
// Drop all transactions over the allowed limit
if !pool.locals.contains(addr) {
Expand All @@ -957,6 +963,10 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
delete(pool.queue, addr)
}
}
// Notify subsystem for new promoted transactions.
if len(promoted) > 0 {
pool.txFeed.Send(NewTxsEvent{promoted})
}
// If the pending limit is overflown, start equalizing allowances
pending := uint64(0)
for _, list := range pool.pending {
Expand Down
32 changes: 19 additions & 13 deletions core/tx_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,21 +118,27 @@ func validateTxPoolInternals(pool *TxPool) error {

// validateEvents checks that the correct number of transaction addition events
// were fired on the pool's event feed.
func validateEvents(events chan TxPreEvent, count int) error {
for i := 0; i < count; i++ {
func validateEvents(events chan NewTxsEvent, count int) error {
var received []*types.Transaction

for len(received) < count {
select {
case <-events:
case ev := <-events:
received = append(received, ev.Txs...)
case <-time.After(time.Second):
return fmt.Errorf("event #%d not fired", i)
return fmt.Errorf("event #%d not fired", received)
}
}
if len(received) > count {
return fmt.Errorf("more than %d events fired: %v", count, received[count:])
}
select {
case tx := <-events:
return fmt.Errorf("more than %d events fired: %v", count, tx.Tx)
case ev := <-events:
return fmt.Errorf("more than %d events fired: %v", count, ev.Txs)

case <-time.After(50 * time.Millisecond):
// This branch should be "default", but it's a data race between goroutines,
// reading the event channel and pushng into it, so better wait a bit ensuring
// reading the event channel and pushing into it, so better wait a bit ensuring
// really nothing gets injected.
}
return nil
Expand Down Expand Up @@ -669,7 +675,7 @@ func TestTransactionGapFilling(t *testing.T) {
pool.currentState.AddBalance(account, big.NewInt(1000000))

// Keep track of transaction events to ensure all executables get announced
events := make(chan TxPreEvent, testTxPoolConfig.AccountQueue+5)
events := make(chan NewTxsEvent, testTxPoolConfig.AccountQueue+5)
sub := pool.txFeed.Subscribe(events)
defer sub.Unsubscribe()

Expand Down Expand Up @@ -920,7 +926,7 @@ func TestTransactionPendingLimiting(t *testing.T) {
pool.currentState.AddBalance(account, big.NewInt(1000000))

// Keep track of transaction events to ensure all executables get announced
events := make(chan TxPreEvent, testTxPoolConfig.AccountQueue+5)
events := make(chan NewTxsEvent, testTxPoolConfig.AccountQueue+5)
sub := pool.txFeed.Subscribe(events)
defer sub.Unsubscribe()

Expand Down Expand Up @@ -1140,7 +1146,7 @@ func TestTransactionPoolRepricing(t *testing.T) {
defer pool.Stop()

// Keep track of transaction events to ensure all executables get announced
events := make(chan TxPreEvent, 32)
events := make(chan NewTxsEvent, 32)
sub := pool.txFeed.Subscribe(events)
defer sub.Unsubscribe()

Expand Down Expand Up @@ -1327,7 +1333,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
defer pool.Stop()

// Keep track of transaction events to ensure all executables get announced
events := make(chan TxPreEvent, 32)
events := make(chan NewTxsEvent, 32)
sub := pool.txFeed.Subscribe(events)
defer sub.Unsubscribe()

Expand Down Expand Up @@ -1433,7 +1439,7 @@ func TestTransactionPoolStableUnderpricing(t *testing.T) {
defer pool.Stop()

// Keep track of transaction events to ensure all executables get announced
events := make(chan TxPreEvent, 32)
events := make(chan NewTxsEvent, 32)
sub := pool.txFeed.Subscribe(events)
defer sub.Unsubscribe()

Expand Down Expand Up @@ -1495,7 +1501,7 @@ func TestTransactionReplacement(t *testing.T) {
defer pool.Stop()

// Keep track of transaction events to ensure all executables get announced
events := make(chan TxPreEvent, 32)
events := make(chan NewTxsEvent, 32)
sub := pool.txFeed.Subscribe(events)
defer sub.Unsubscribe()

Expand Down
4 changes: 2 additions & 2 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ func (b *EthAPIBackend) TxPoolContent() (map[common.Address]types.Transactions,
return b.eth.TxPool().Content()
}

func (b *EthAPIBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
return b.eth.TxPool().SubscribeTxPreEvent(ch)
func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return b.eth.TxPool().SubscribeNewTxsEvent(ch)
}

func (b *EthAPIBackend) Downloader() *downloader.Downloader {
Expand Down
18 changes: 11 additions & 7 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ func (api *PublicFilterAPI) timeoutLoop() {
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter
func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
var (
pendingTxs = make(chan common.Hash)
pendingTxSub = api.events.SubscribePendingTxEvents(pendingTxs)
pendingTxs = make(chan []common.Hash)
pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
)

api.filtersMu.Lock()
Expand All @@ -118,7 +118,7 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
case ph := <-pendingTxs:
api.filtersMu.Lock()
if f, found := api.filters[pendingTxSub.ID]; found {
f.hashes = append(f.hashes, ph)
f.hashes = append(f.hashes, ph...)
}
api.filtersMu.Unlock()
case <-pendingTxSub.Err():
Expand All @@ -144,13 +144,17 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Su
rpcSub := notifier.CreateSubscription()

go func() {
txHashes := make(chan common.Hash)
pendingTxSub := api.events.SubscribePendingTxEvents(txHashes)
txHashes := make(chan []common.Hash, 128)
pendingTxSub := api.events.SubscribePendingTxs(txHashes)

for {
select {
case h := <-txHashes:
notifier.Notify(rpcSub.ID, h)
case hashes := <-txHashes:
// To keep the original behaviour, send a single tx hash in one notification.
// TODO(rjl493456442) Send a batch of tx hashes in one notification
for _, h := range hashes {
notifier.Notify(rpcSub.ID, h)
}
case <-rpcSub.Err():
pendingTxSub.Unsubscribe()
return
Expand Down
2 changes: 1 addition & 1 deletion eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Backend interface {
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error)

SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
Expand Down
Loading