Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 32 additions & 16 deletions network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,13 +788,20 @@ func TestP2PHTTPHandler(t *testing.T) {
require.ErrorIs(t, err, limitcaller.ErrConnectionQueueingTimeout)
}

// TestP2PRelay checks p2p nodes can properly relay messages:
// netA and netB are started with ForceFetchTransactions so it subscribes to the txn topic,
// both of them are connected and do not relay messages.
// Later, netB is forced to relay messages and netC is started with a listening address set
// so that it relays messages as well.
// The test checks messages from both netB and netC are received by netA.
func TestP2PRelay(t *testing.T) {
partitiontest.PartitionTest(t)

cfg := config.GetDefaultLocal()
cfg.ForceFetchTransactions = true
log := logging.TestingLog(t)
netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{})
log.Debugln("Starting netA")
netA, err := NewP2PNetwork(log.With("net", "netA"), cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{})
require.NoError(t, err)

err = netA.Start()
Expand All @@ -809,7 +816,8 @@ func TestP2PRelay(t *testing.T) {
multiAddrStr := addrsA[0].String()
phoneBookAddresses := []string{multiAddrStr}

netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{})
log.Debugf("Starting netB with phonebook addresses %v", phoneBookAddresses)
netB, err := NewP2PNetwork(log.With("net", "netB"), cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{})
require.NoError(t, err)
err = netB.Start()
require.NoError(t, err)
Expand All @@ -829,8 +837,7 @@ func TestP2PRelay(t *testing.T) {
return netA.hasPeers() && netB.hasPeers()
}, 2*time.Second, 50*time.Millisecond)

makeCounterHandler := func(numExpected int) ([]TaggedMessageProcessor, *atomic.Uint32, chan struct{}) {
var numActual atomic.Uint32
makeCounterHandler := func(numExpected int, counter *atomic.Uint32, msgs *[][]byte) ([]TaggedMessageProcessor, chan struct{}) {
counterDone := make(chan struct{})
counterHandler := []TaggedMessageProcessor{
{
Expand All @@ -840,20 +847,24 @@ func TestP2PRelay(t *testing.T) {
ProcessorHandleFunc
}{
ProcessorValidateFunc(func(msg IncomingMessage) ValidatedMessage {
return ValidatedMessage{Action: Accept, Tag: msg.Tag, ValidatedMessage: nil}
return ValidatedMessage{Action: Accept, Tag: msg.Tag, ValidatedMessage: msg.Data}
}),
ProcessorHandleFunc(func(msg ValidatedMessage) OutgoingMessage {
if count := numActual.Add(1); int(count) >= numExpected {
if msgs != nil {
*msgs = append(*msgs, msg.ValidatedMessage.([]byte))
}
if count := counter.Add(1); int(count) >= numExpected {
close(counterDone)
}
return OutgoingMessage{Action: Ignore}
}),
},
},
}
return counterHandler, &numActual, counterDone
return counterHandler, counterDone
}
counterHandler, _, counterDone := makeCounterHandler(1)
var counter atomic.Uint32
counterHandler, counterDone := makeCounterHandler(1, &counter, nil)
netA.RegisterProcessors(counterHandler)

// send 5 messages from both netB to netA
Expand All @@ -869,10 +880,11 @@ func TestP2PRelay(t *testing.T) {
case <-time.After(1 * time.Second):
}

// add netC with listening address set, and enable relaying on netB
// ensure all messages are received by netA
// add a netC with listening address set and enable relaying on netB
// ensure all messages from netB and netC are received by netA
cfg.NetAddress = "127.0.0.1:0"
netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{})
log.Debugf("Starting netB with phonebook addresses %v", phoneBookAddresses)
netC, err := NewP2PNetwork(log.With("net", "netC"), cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{})
require.NoError(t, err)
err = netC.Start()
require.NoError(t, err)
Expand All @@ -896,28 +908,32 @@ func TestP2PRelay(t *testing.T) {
}, 2*time.Second, 50*time.Millisecond)

const expectedMsgs = 10
counterHandler, count, counterDone := makeCounterHandler(expectedMsgs)
counter.Store(0)
var loggedMsgs [][]byte
counterHandler, counterDone = makeCounterHandler(expectedMsgs, &counter, &loggedMsgs)
netA.ClearProcessors()
netA.RegisterProcessors(counterHandler)

for i := 0; i < expectedMsgs/2; i++ {
err := netB.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3, byte(i)}, true, nil)
err := netB.Relay(context.Background(), protocol.TxnTag, []byte{5, 6, 7, byte(i)}, true, nil)
require.NoError(t, err)
err = netC.Relay(context.Background(), protocol.TxnTag, []byte{11, 12, 10 + byte(i), 14}, true, nil)
require.NoError(t, err)
}
// send some duplicate messages, they should be dropped
for i := 0; i < expectedMsgs/2; i++ {
err := netB.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3, byte(i)}, true, nil)
err := netB.Relay(context.Background(), protocol.TxnTag, []byte{5, 6, 7, byte(i)}, true, nil)
require.NoError(t, err)
}

select {
case <-counterDone:
case <-time.After(2 * time.Second):
if c := count.Load(); c < expectedMsgs {
case <-time.After(3 * time.Second):
if c := counter.Load(); c < expectedMsgs {
t.Logf("Logged messages: %v", loggedMsgs)
require.Failf(t, "One or more messages failed to reach destination network", "%d > %d", expectedMsgs, c)
} else if c > expectedMsgs {
t.Logf("Logged messages: %v", loggedMsgs)
require.Failf(t, "One or more messages that were expected to be dropped, reached destination network", "%d < %d", expectedMsgs, c)
}
}
Expand Down