Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions cmd/devp2p/internal/ethtest/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (c *Conn) ReadEth() (any, error) {
var msg any
switch int(code) {
case eth.StatusMsg:
msg = new(eth.StatusPacket69)
msg = new(eth.StatusPacket)
case eth.GetBlockHeadersMsg:
msg = new(eth.GetBlockHeadersPacket)
case eth.BlockHeadersMsg:
Expand All @@ -164,10 +164,6 @@ func (c *Conn) ReadEth() (any, error) {
msg = new(eth.GetBlockBodiesPacket)
case eth.BlockBodiesMsg:
msg = new(eth.BlockBodiesPacket)
case eth.NewBlockMsg:
msg = new(eth.NewBlockPacket)
case eth.NewBlockHashesMsg:
msg = new(eth.NewBlockHashesPacket)
case eth.TransactionsMsg:
msg = new(eth.TransactionsPacket)
case eth.NewPooledTransactionHashesMsg:
Expand Down Expand Up @@ -229,7 +225,7 @@ func (c *Conn) ReadSnap() (any, error) {
}

// dialAndPeer creates a peer connection and runs the handshake.
func (s *Suite) dialAndPeer(status *eth.StatusPacket69) (*Conn, error) {
func (s *Suite) dialAndPeer(status *eth.StatusPacket) (*Conn, error) {
c, err := s.dial()
if err != nil {
return nil, err
Expand All @@ -242,7 +238,7 @@ func (s *Suite) dialAndPeer(status *eth.StatusPacket69) (*Conn, error) {

// peer performs both the protocol handshake and the status message
// exchange with the node in order to peer with it.
func (c *Conn) peer(chain *Chain, status *eth.StatusPacket69) error {
func (c *Conn) peer(chain *Chain, status *eth.StatusPacket) error {
if err := c.handshake(); err != nil {
return fmt.Errorf("handshake failed: %v", err)
}
Expand Down Expand Up @@ -315,7 +311,7 @@ func (c *Conn) negotiateEthProtocol(caps []p2p.Cap) {
}

// statusExchange performs a `Status` message exchange with the given node.
func (c *Conn) statusExchange(chain *Chain, status *eth.StatusPacket69) error {
func (c *Conn) statusExchange(chain *Chain, status *eth.StatusPacket) error {
loop:
for {
code, data, err := c.Read()
Expand All @@ -324,7 +320,7 @@ loop:
}
switch code {
case eth.StatusMsg + protoOffset(ethProto):
msg := new(eth.StatusPacket69)
msg := new(eth.StatusPacket)
if err := rlp.DecodeBytes(data, &msg); err != nil {
return fmt.Errorf("error decoding status packet: %w", err)
}
Expand Down Expand Up @@ -363,7 +359,7 @@ loop:
}
if status == nil {
// default status message
status = &eth.StatusPacket69{
status = &eth.StatusPacket{
ProtocolVersion: uint32(c.negotiatedProtoVersion),
NetworkID: chain.config.ChainID.Uint64(),
Genesis: chain.blocks[0].Hash(),
Expand Down
2 changes: 1 addition & 1 deletion cmd/devp2p/internal/ethtest/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func (s *Suite) TestGetReceipts(t *utesting.T) {
t.Fatalf("could not write to connection: %v", err)
}
// Wait for response.
resp := new(eth.ReceiptsPacket[*eth.ReceiptList69])
resp := new(eth.ReceiptsPacket)
if err := conn.ReadMsg(ethProto, eth.ReceiptsMsg, &resp); err != nil {
t.Fatalf("error reading block bodies msg: %v", err)
}
Expand Down
89 changes: 25 additions & 64 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package downloader

import (
"fmt"
"math/big"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -261,23 +260,24 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash, sink chan *et
// peer in the download tester. The returned function can be used to retrieve
// batches of block receipts from the particularly requested peer.
func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, sink chan *eth.Response) (*eth.Request, error) {
blobs := eth.ServiceGetReceiptsQuery68(dlp.chain, hashes)
blobs := eth.ServiceGetReceiptsQuery(dlp.chain, hashes)
receipts := make([]types.Receipts, blobs.Len())

receipts := make([]types.Receipts, len(blobs))
for i, blob := range blobs {
rlp.DecodeBytes(blob, &receipts[i])
}
// compute hashes
hashes = make([]common.Hash, blobs.Len())
hasher := trie.NewStackTrie(nil)
hashes = make([]common.Hash, len(receipts))
for i, receipt := range receipts {
hashes[i] = types.DeriveSha(receipt, hasher)
receiptLists, err := blobs.Items()
if err != nil {
panic(err)
}
req := &eth.Request{
Peer: dlp.id,
for i, rl := range receiptLists {
hashes[i] = types.DeriveSha(rl.Derivable(), hasher)
}

// deliver the response right away
resp := eth.ReceiptsRLPResponse(types.EncodeBlockReceiptLists(receipts))
res := &eth.Response{
Req: req,
Req: &eth.Request{Peer: dlp.id},
Res: &resp,
Meta: hashes,
Time: 1,
Expand All @@ -286,7 +286,7 @@ func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, sink chan *
go func() {
sink <- res
}()
return req, nil
return res.Req, nil
}

// ID retrieves the peer's unique identifier.
Expand Down Expand Up @@ -398,8 +398,8 @@ func assertOwnChain(t *testing.T, tester *downloadTester, length int) {
}
}

func TestCanonicalSynchronisation68Full(t *testing.T) { testCanonSync(t, eth.ETH68, FullSync) }
func TestCanonicalSynchronisation68Snap(t *testing.T) { testCanonSync(t, eth.ETH68, SnapSync) }
func TestCanonicalSynchronisationFull(t *testing.T) { testCanonSync(t, eth.ETH69, FullSync) }
func TestCanonicalSynchronisationSnap(t *testing.T) { testCanonSync(t, eth.ETH69, SnapSync) }

func testCanonSync(t *testing.T, protocol uint, mode SyncMode) {
success := make(chan struct{})
Expand All @@ -426,8 +426,8 @@ func testCanonSync(t *testing.T, protocol uint, mode SyncMode) {

// Tests that if a large batch of blocks are being downloaded, it is throttled
// until the cached blocks are retrieved.
func TestThrottling68Full(t *testing.T) { testThrottling(t, eth.ETH68, FullSync) }
func TestThrottling68Snap(t *testing.T) { testThrottling(t, eth.ETH68, SnapSync) }
func TestThrottlingFull(t *testing.T) { testThrottling(t, eth.ETH69, FullSync) }
func TestThrottlingSnap(t *testing.T) { testThrottling(t, eth.ETH69, SnapSync) }

func testThrottling(t *testing.T, protocol uint, mode SyncMode) {
tester := newTester(t, mode)
Expand Down Expand Up @@ -504,8 +504,8 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) {
}

// Tests that a canceled download wipes all previously accumulated state.
func TestCancel68Full(t *testing.T) { testCancel(t, eth.ETH68, FullSync) }
func TestCancel68Snap(t *testing.T) { testCancel(t, eth.ETH68, SnapSync) }
func TestCancelFull(t *testing.T) { testCancel(t, eth.ETH69, FullSync) }
func TestCancelSnap(t *testing.T) { testCancel(t, eth.ETH69, SnapSync) }

func testCancel(t *testing.T, protocol uint, mode SyncMode) {
complete := make(chan struct{})
Expand Down Expand Up @@ -534,49 +534,10 @@ func testCancel(t *testing.T, protocol uint, mode SyncMode) {
}
}

// Tests that synchronisations behave well in multi-version protocol environments
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed this because the purpose of this test was checking how sync behaves with peers which have different protocol versions. However, the test was only using one peer, and it was now labeled as "peer 68" but used eth/69. Best to just remove it.

// and not wreak havoc on other nodes in the network.
func TestMultiProtoSynchronisation68Full(t *testing.T) { testMultiProtoSync(t, eth.ETH68, FullSync) }
func TestMultiProtoSynchronisation68Snap(t *testing.T) { testMultiProtoSync(t, eth.ETH68, SnapSync) }

func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) {
complete := make(chan struct{})
success := func() {
close(complete)
}
tester := newTesterWithNotification(t, mode, success)
defer tester.terminate()

// Create a small enough block chain to download
chain := testChainBase.shorten(blockCacheMaxItems - 15)

// Create peers of every type
tester.newPeer("peer 68", eth.ETH68, chain.blocks[1:])

if err := tester.downloader.BeaconSync(chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil {
t.Fatalf("failed to start beacon sync: %v", err)
}
select {
case <-complete:
break
case <-time.NewTimer(time.Second * 3).C:
t.Fatalf("Failed to sync chain in three seconds")
}
assertOwnChain(t, tester, len(chain.blocks))

// Check that no peers have been dropped off
for _, version := range []int{68} {
peer := fmt.Sprintf("peer %d", version)
if _, ok := tester.peers[peer]; !ok {
t.Errorf("%s dropped", peer)
}
}
}

// Tests that if a block is empty (e.g. header only), no body request should be
// made, and instead the header should be assembled into a whole block in itself.
func TestEmptyShortCircuit68Full(t *testing.T) { testEmptyShortCircuit(t, eth.ETH68, FullSync) }
func TestEmptyShortCircuit68Snap(t *testing.T) { testEmptyShortCircuit(t, eth.ETH68, SnapSync) }
func TestEmptyShortCircuitFull(t *testing.T) { testEmptyShortCircuit(t, eth.ETH69, FullSync) }
func TestEmptyShortCircuitSnap(t *testing.T) { testEmptyShortCircuit(t, eth.ETH69, SnapSync) }

func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) {
success := make(chan struct{})
Expand Down Expand Up @@ -644,8 +605,8 @@ func checkProgress(t *testing.T, d *Downloader, stage string, want ethereum.Sync

// Tests that peers below a pre-configured checkpoint block are prevented from
// being fast-synced from, avoiding potential cheap eclipse attacks.
func TestBeaconSync68Full(t *testing.T) { testBeaconSync(t, eth.ETH68, FullSync) }
func TestBeaconSync68Snap(t *testing.T) { testBeaconSync(t, eth.ETH68, SnapSync) }
func TestBeaconSyncFull(t *testing.T) { testBeaconSync(t, eth.ETH69, FullSync) }
func TestBeaconSyncSnap(t *testing.T) { testBeaconSync(t, eth.ETH69, SnapSync) }

func testBeaconSync(t *testing.T, protocol uint, mode SyncMode) {
var cases = []struct {
Expand Down Expand Up @@ -690,8 +651,8 @@ func testBeaconSync(t *testing.T, protocol uint, mode SyncMode) {

// Tests that synchronisation progress (origin block number, current block number
// and highest block number) is tracked and updated correctly.
func TestSyncProgress68Full(t *testing.T) { testSyncProgress(t, eth.ETH68, FullSync) }
func TestSyncProgress68Snap(t *testing.T) { testSyncProgress(t, eth.ETH68, SnapSync) }
func TestSyncProgressFull(t *testing.T) { testSyncProgress(t, eth.ETH69, FullSync) }
func TestSyncProgressSnap(t *testing.T) { testSyncProgress(t, eth.ETH69, SnapSync) }

func testSyncProgress(t *testing.T, protocol uint, mode SyncMode) {
success := make(chan struct{})
Expand Down
4 changes: 2 additions & 2 deletions eth/downloader/skeleton_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
// Create a peer set to feed headers through
peerset := newPeerSet()
for _, peer := range tt.peers {
peerset.Register(newPeerConnection(peer.id, eth.ETH68, peer, log.New("id", peer.id)))
peerset.Register(newPeerConnection(peer.id, eth.ETH69, peer, log.New("id", peer.id)))
}
// Create a peer dropper to track malicious peers
dropped := make(map[string]int)
Expand Down Expand Up @@ -912,7 +912,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
// Apply the post-init events if there's any
endpeers := tt.peers
if tt.newPeer != nil {
if err := peerset.Register(newPeerConnection(tt.newPeer.id, eth.ETH68, tt.newPeer, log.New("id", tt.newPeer.id))); err != nil {
if err := peerset.Register(newPeerConnection(tt.newPeer.id, eth.ETH69, tt.newPeer, log.New("id", tt.newPeer.id))); err != nil {
t.Errorf("test %d: failed to register new peer: %v", i, err)
}
time.Sleep(time.Millisecond * 50) // given time for peer registration
Expand Down
45 changes: 18 additions & 27 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ import (
// testEthHandler is a mock event handler to listen for inbound network requests
// on the `eth` protocol and convert them into a more easily testable form.
type testEthHandler struct {
blockBroadcasts event.Feed
txAnnounces event.Feed
txBroadcasts event.Feed
txAnnounces event.Feed
txBroadcasts event.Feed
}

func (h *testEthHandler) Chain() *core.BlockChain { panic("no backing chain") }
Expand All @@ -51,10 +50,6 @@ func (h *testEthHandler) PeerInfo(enode.ID) interface{} { panic("not used

func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
switch packet := packet.(type) {
case *eth.NewBlockPacket:
h.blockBroadcasts.Send(packet.Block)
return nil

case *eth.NewPooledTransactionHashesPacket:
h.txAnnounces.Send(packet.Hashes)
return nil
Expand Down Expand Up @@ -82,7 +77,7 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error {

// Tests that peers are correctly accepted (or rejected) based on the advertised
// fork IDs in the protocol handshake.
func TestForkIDSplit68(t *testing.T) { testForkIDSplit(t, eth.ETH68) }
func TestForkIDSplit69(t *testing.T) { testForkIDSplit(t, eth.ETH69) }

func testForkIDSplit(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -234,7 +229,7 @@ func testForkIDSplit(t *testing.T, protocol uint) {
}

// Tests that received transactions are added to the local pool.
func TestRecvTransactions68(t *testing.T) { testRecvTransactions(t, eth.ETH68) }
func TestRecvTransactions69(t *testing.T) { testRecvTransactions(t, eth.ETH69) }

func testRecvTransactions(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -263,7 +258,8 @@ func testRecvTransactions(t *testing.T, protocol uint) {
return eth.Handle((*ethHandler)(handler.handler), peer)
})
// Run the handshake locally to avoid spinning up a source handler
if err := src.Handshake(1, handler.chain, eth.BlockRangeUpdatePacket{}); err != nil {
head := handler.chain.CurrentBlock()
if err := src.Handshake(1, handler.chain, eth.BlockRangeUpdatePacket{EarliestBlock: 0, LatestBlock: head.Number.Uint64(), LatestBlockHash: head.Hash()}); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// Send the transaction to the sink and verify that it's added to the tx pool
Expand All @@ -286,7 +282,7 @@ func testRecvTransactions(t *testing.T, protocol uint) {
}

// This test checks that pending transactions are sent.
func TestSendTransactions68(t *testing.T) { testSendTransactions(t, eth.ETH68) }
func TestSendTransactions69(t *testing.T) { testSendTransactions(t, eth.ETH69) }

func testSendTransactions(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -318,7 +314,8 @@ func testSendTransactions(t *testing.T, protocol uint) {
return eth.Handle((*ethHandler)(handler.handler), peer)
})
// Run the handshake locally to avoid spinning up a source handler
if err := sink.Handshake(1, handler.chain, eth.BlockRangeUpdatePacket{}); err != nil {
head := handler.chain.CurrentBlock()
if err := sink.Handshake(1, handler.chain, eth.BlockRangeUpdatePacket{EarliestBlock: 0, LatestBlock: head.Number.Uint64(), LatestBlockHash: head.Hash()}); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// After the handshake completes, the source handler should stream the sink
Expand All @@ -338,22 +335,16 @@ func testSendTransactions(t *testing.T, protocol uint) {
// Make sure we get all the transactions on the correct channels
seen := make(map[common.Hash]struct{})
for len(seen) < len(insert) {
switch protocol {
case 68:
select {
case hashes := <-anns:
for _, hash := range hashes {
if _, ok := seen[hash]; ok {
t.Errorf("duplicate transaction announced: %x", hash)
}
seen[hash] = struct{}{}
select {
case hashes := <-anns:
for _, hash := range hashes {
if _, ok := seen[hash]; ok {
t.Errorf("duplicate transaction announced: %x", hash)
}
case <-bcasts:
t.Errorf("initial tx broadcast received on post eth/66")
seen[hash] = struct{}{}
}

default:
panic("unsupported protocol, please extend test")
case <-bcasts:
t.Errorf("initial tx broadcast received on post eth/66")
}
}
for _, tx := range insert {
Expand All @@ -365,7 +356,7 @@ func testSendTransactions(t *testing.T, protocol uint) {

// Tests that transactions get propagated to all attached peers, either via direct
// broadcasts or via announcements/retrievals.
func TestTransactionPropagation68(t *testing.T) { testTransactionPropagation(t, eth.ETH68) }
func TestTransactionPropagation69(t *testing.T) { testTransactionPropagation(t, eth.ETH69) }

func testTransactionPropagation(t *testing.T, protocol uint) {
t.Parallel()
Expand Down
Loading