Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 91 additions & 28 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ import (
"time"

mapset "github.com/deckarep/golang-set"
lru "github.com/hashicorp/golang-lru"

"github.com/morph-l2/go-ethereum/common"
"github.com/morph-l2/go-ethereum/common/mclock"
"github.com/morph-l2/go-ethereum/core"
"github.com/morph-l2/go-ethereum/core/types"
"github.com/morph-l2/go-ethereum/event"
"github.com/morph-l2/go-ethereum/log"
"github.com/morph-l2/go-ethereum/metrics"
)
Expand All @@ -54,6 +56,10 @@ const (
// re-request them.
maxTxUnderpricedSetSize = 32768

// txOnChainCacheLimit is the number of recently confirmed transactions to
// keep around for announce pre-filtering.
txOnChainCacheLimit = 32768

// txArriveTimeout is the time allowance before an announced transaction is
// explicitly requested.
txArriveTimeout = 500 * time.Millisecond
Expand All @@ -72,6 +78,7 @@ var (
var (
txAnnounceInMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/in", nil)
txAnnounceKnownMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/known", nil)
txAnnounceOnchainMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/onchain", nil)
txAnnounceUnderpricedMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/underpriced", nil)
txAnnounceDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/dos", nil)

Expand Down Expand Up @@ -126,6 +133,10 @@ type txDrop struct {
peer string
}

type txFetcherChain interface {
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
}

// TxFetcher is responsible for retrieving new transaction based on announcements.
//
// The fetcher operates in 3 stages:
Expand Down Expand Up @@ -174,41 +185,65 @@ type TxFetcher struct {
addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer

step chan struct{} // Notification channel when the fetcher loop iterates
clock mclock.Clock // Time wrapper to simulate in tests
rand *mrand.Rand // Randomizer to use in tests instead of map range loops (soft-random)
step chan struct{} // Notification channel when the fetcher loop iterates
clock mclock.Clock // Time wrapper to simulate in tests
rand *mrand.Rand // Randomizer to use in tests instead of map range loops (soft-random)
chain txFetcherChain // Blockchain used to track recently confirmed txs

txOnChainCache *lru.Cache // Cache of recently confirmed tx hashes
headEventCh chan core.ChainEvent
headSub event.Subscription
}

// NewTxFetcher creates a transaction fetcher to retrieve transaction
// based on hash announcements.
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher {
return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil)
return newTxFetcher(nil, hasTx, addTxs, fetchTxs, mclock.System{}, nil)
}

// NewTxFetcherWithChain creates a transaction fetcher that also tracks recently
// confirmed transactions from canonical chain events.
func NewTxFetcherWithChain(chain *core.BlockChain, hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher {
return newTxFetcher(chain, hasTx, addTxs, fetchTxs, mclock.System{}, nil)
}

// NewTxFetcherForTests is a testing method to mock out the realtime clock with
// a simulated version and the internal randomness with a deterministic one.
func NewTxFetcherForTests(
hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error,
clock mclock.Clock, rand *mrand.Rand) *TxFetcher {
return newTxFetcher(nil, hasTx, addTxs, fetchTxs, clock, rand)
}

// newTxFetcher wires up the fetcher with an optional chain event source.
func newTxFetcher(
chain txFetcherChain, hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error,
clock mclock.Clock, rand *mrand.Rand) *TxFetcher {
txOnChainCache, err := lru.New(txOnChainCacheLimit)
if err != nil {
panic(err)
}
return &TxFetcher{
notify: make(chan *txAnnounce),
cleanup: make(chan *txDelivery),
drop: make(chan *txDrop),
quit: make(chan struct{}),
waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]struct{}),
announces: make(map[string]map[common.Hash]struct{}),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
alternates: make(map[common.Hash]map[string]struct{}),
underpriced: mapset.NewSet(),
hasTx: hasTx,
addTxs: addTxs,
fetchTxs: fetchTxs,
clock: clock,
rand: rand,
notify: make(chan *txAnnounce),
cleanup: make(chan *txDelivery),
drop: make(chan *txDrop),
quit: make(chan struct{}),
waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]struct{}),
announces: make(map[string]map[common.Hash]struct{}),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
alternates: make(map[common.Hash]map[string]struct{}),
underpriced: mapset.NewSet(),
hasTx: hasTx,
addTxs: addTxs,
fetchTxs: fetchTxs,
clock: clock,
rand: rand,
chain: chain,
txOnChainCache: txOnChainCache,
}
}

Expand All @@ -219,19 +254,23 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
txAnnounceInMeter.Mark(int64(len(hashes)))

// Skip any transaction announcements that we already know of, or that we've
// previously marked as cheap and discarded. This check is of course racey,
// because multiple concurrent notifies will still manage to pass it, but it's
// still valuable to check here because it runs concurrent to the internal
// loop, so anything caught here is time saved internally.
// seen recently land on chain, or that we've previously marked as cheap and
// discarded. This check is of course racey, because multiple concurrent
// notifies will still manage to pass it, but it's still valuable to check
// here because it runs concurrent to the internal loop, so anything caught
// here is time saved internally.
var (
unknowns = make([]common.Hash, 0, len(hashes))
duplicate, underpriced int64
unknowns = make([]common.Hash, 0, len(hashes))
duplicate, onchain, underpriced int64
)
for _, hash := range hashes {
switch {
case f.hasTx(hash):
duplicate++

case f.txOnChainCache.Contains(hash):
onchain++

case f.underpriced.Contains(hash):
underpriced++

Expand All @@ -240,6 +279,7 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
}
}
txAnnounceKnownMeter.Mark(duplicate)
txAnnounceOnchainMeter.Mark(onchain)
txAnnounceUnderpricedMeter.Mark(underpriced)

// If anything's left to announce, push it into the internal loop
Expand Down Expand Up @@ -352,6 +392,10 @@ func (f *TxFetcher) Drop(peer string) error {
// Start boots up the announcement based synchroniser, accepting and processing
// hash notifications and block fetches until termination requested.
func (f *TxFetcher) Start() {
if f.chain != nil && f.headEventCh == nil {
f.headEventCh = make(chan core.ChainEvent, 10)
f.headSub = f.chain.SubscribeChainEvent(f.headEventCh)
}
go f.loop()
}

Expand All @@ -362,15 +406,34 @@ func (f *TxFetcher) Stop() {
}

func (f *TxFetcher) loop() {
if f.headSub != nil {
defer f.headSub.Unsubscribe()
}
var (
waitTimer = new(mclock.Timer)
timeoutTimer = new(mclock.Timer)

waitTrigger = make(chan struct{}, 1)
timeoutTrigger = make(chan struct{}, 1)
oldHead common.Hash
haveOldHead bool
)
for {
select {
case ev := <-f.headEventCh:
if ev.Block == nil {
break
}
if haveOldHead && ev.Block.ParentHash() != oldHead {
f.txOnChainCache.Purge()
}
oldHead = ev.Block.Hash()
haveOldHead = true

for _, tx := range ev.Block.Transactions() {
f.txOnChainCache.Add(tx.Hash(), struct{}{})
}

case ann := <-f.notify:
// Drop part of the new announcements if there are too many accumulated.
// Note, we could but do not filter already known transactions here as
Expand Down
136 changes: 136 additions & 0 deletions eth/fetcher/tx_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/morph-l2/go-ethereum/common/mclock"
"github.com/morph-l2/go-ethereum/core"
"github.com/morph-l2/go-ethereum/core/types"
"github.com/morph-l2/go-ethereum/event"
"github.com/morph-l2/go-ethereum/trie"
)

var (
Expand Down Expand Up @@ -54,6 +56,10 @@ type doWait struct {
time time.Duration
step bool
}
type doChainEvent struct {
chain *testTxFetcherChain
block *types.Block
}
type doDrop string
type doFunc func()

Expand All @@ -63,8 +69,28 @@ type isScheduled struct {
fetching map[string][]common.Hash
dangling map[string][]common.Hash
}
type isOnChain []common.Hash
type isUnderpriced int

type testTxFetcherChain struct {
feed event.Feed
}

func (c *testTxFetcherChain) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
return c.feed.Subscribe(ch)
}

func (c *testTxFetcherChain) sendBlock(block *types.Block) {
c.feed.Send(core.ChainEvent{Block: block, Hash: block.Hash()})
}

func newTestChainBlock(number int64, parent common.Hash, txs []*types.Transaction) *types.Block {
return types.NewBlock(&types.Header{
Number: big.NewInt(number),
ParentHash: parent,
}, txs, nil, nil, trie.NewStackTrie(nil))
}

// txFetcherTest represents a test scenario that can be executed by the test
// runner.
type txFetcherTest struct {
Expand Down Expand Up @@ -584,6 +610,105 @@ func TestTransactionFetcherBroadcasts(t *testing.T) {
})
}

// TestTransactionFetcherOnchainFiltered verifies that chain events populate the
// on-chain cache and already confirmed transaction announcements get filtered
// before ever reaching the waitlist.
func TestTransactionFetcherOnchainFiltered(t *testing.T) {
chain := new(testTxFetcherChain)
block := newTestChainBlock(1, common.Hash{}, []*types.Transaction{testTxs[1]})

testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return newTxFetcher(
chain,
func(hash common.Hash) bool { return hash == testTxsHashes[0] },
nil,
func(string, []common.Hash) error { return nil },
mclock.System{}, nil,
)
},
steps: []interface{}{
doChainEvent{chain: chain, block: block},
isOnChain{testTxsHashes[1]},

// Announce a brand new hash, a txpool-known hash, and an on-chain hash.
// Only the new hash may land in the waitlist.
doTxNotify{
peer: "A",
hashes: []common.Hash{{0x01}, testTxsHashes[0], testTxsHashes[1]},
},
isWaiting(map[string][]common.Hash{
"A": {{0x01}},
}),
isScheduled{tracking: nil, fetching: nil},

// Driving the wait timer must not resurrect the filtered
// hashes: they were never tracked, so the scheduler should
// only promote the single new hash.
doWait{time: txArriveTimeout, step: true},
isWaiting(nil),
isScheduled{
tracking: map[string][]common.Hash{
"A": {{0x01}},
},
fetching: map[string][]common.Hash{
"A": {{0x01}},
},
},
},
})
}

// TestTransactionFetcherOnchainCacheReorg verifies that a non-contiguous chain
// event flushes stale on-chain entries so reorged-out transactions can be
// fetched again.
func TestTransactionFetcherOnchainCacheReorg(t *testing.T) {
chain := new(testTxFetcherChain)
oldHead := newTestChainBlock(1, common.Hash{}, []*types.Transaction{testTxs[0]})
newHead := newTestChainBlock(2, common.Hash{0xff}, []*types.Transaction{testTxs[1]})

testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return newTxFetcher(
chain,
func(common.Hash) bool { return false },
nil,
func(string, []common.Hash) error { return nil },
mclock.System{}, nil,
)
},
steps: []interface{}{
doChainEvent{chain: chain, block: oldHead},
isOnChain{testTxsHashes[0]},

doChainEvent{chain: chain, block: newHead},
isOnChain{testTxsHashes[1]},

// The old on-chain hash must be forgotten after the reorg, while the new
// canonical one is still filtered out.
doTxNotify{
peer: "A",
hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]},
},
isWaiting(map[string][]common.Hash{
"A": {testTxsHashes[0]},
}),
isScheduled{tracking: nil, fetching: nil},

doWait{time: txArriveTimeout, step: true},
isWaiting(nil),
isScheduled{
tracking: map[string][]common.Hash{
"A": {testTxsHashes[0]},
},
fetching: map[string][]common.Hash{
"A": {testTxsHashes[0]},
},
},
},
})
}

// Tests that the waiting list timers properly reset and reschedule.
func TestTransactionFetcherWaitTimerResets(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
Expand Down Expand Up @@ -1289,6 +1414,10 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
<-wait // Fetcher supposed to do something, wait until it's done
}

case doChainEvent:
step.chain.sendBlock(step.block)
<-wait // Fetcher needs to process the chain event, wait until it's done

case doDrop:
if err := fetcher.Drop(string(step)); err != nil {
t.Errorf("step %d: %v", i, err)
Expand All @@ -1298,6 +1427,13 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
case doFunc:
step()

case isOnChain:
for _, hash := range step {
if !fetcher.txOnChainCache.Contains(hash) {
t.Errorf("step %d: hash %x missing from txOnChainCache", i, hash)
}
}

case isWaiting:
// We need to check that the waiting list (stage 1) internals
// match with the expected set. Check the peer->hash mappings
Expand Down
Loading
Loading