Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion buildnumber.dat
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0
1
18 changes: 10 additions & 8 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo
config: cfg,
broadcastQueueHighPrio: make(chan broadcastRequest, outgoingMessagesBufferSize),
broadcastQueueBulk: make(chan broadcastRequest, 100),
enableVoteCompression: cfg.EnableVoteCompression,
}

if identityOpts != nil {
Expand Down Expand Up @@ -916,14 +917,15 @@ func (n *P2PNetwork) baseWsStreamHandler(ctx context.Context, p2pPeer peer.ID, s
}
peerCore := makePeerCore(ctx, n, n.log, n.handler.readBuffer, addr, client, addr)
wsp := &wsPeer{
wsPeerCore: peerCore,
conn: &wsPeerConnP2P{stream: stream},
outgoing: !incoming,
identity: netIdentPeerID,
peerType: peerTypeP2P,
TelemetryGUID: pmi.telemetryID,
InstanceName: pmi.instanceName,
features: decodePeerFeatures(pmi.version, pmi.features),
wsPeerCore: peerCore,
conn: &wsPeerConnP2P{stream: stream},
outgoing: !incoming,
identity: netIdentPeerID,
peerType: peerTypeP2P,
TelemetryGUID: pmi.telemetryID,
InstanceName: pmi.instanceName,
features: decodePeerFeatures(pmi.version, pmi.features),
enableVoteCompression: n.config.EnableVoteCompression,
}

localAddr, has := n.Address()
Expand Down
103 changes: 103 additions & 0 deletions network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1591,3 +1591,106 @@ func TestP2PMetainfoV1vsV22(t *testing.T) {
require.False(t, peer.features&pfCompressedProposal != 0)
require.False(t, peer.vpackVoteCompressionSupported())
}

// TestP2PVoteCompression tests vote compression feature in P2P network
func TestP2PVoteCompression(t *testing.T) {
partitiontest.PartitionTest(t)

type testDef struct {
netAEnableCompression, netBEnableCompression bool
}

var tests []testDef = []testDef{
{true, true}, // both nodes with compression enabled
{true, false}, // node A with compression, node B without
{false, true}, // node A without compression, node B with compression
{false, false}, // both nodes with compression disabled
}

for _, test := range tests {
t.Run(fmt.Sprintf("A_compression_%v+B_compression_%v", test.netAEnableCompression, test.netBEnableCompression), func(t *testing.T) {
cfg := config.GetDefaultLocal()
cfg.DNSBootstrapID = "" // disable DNS lookups since the test uses phonebook addresses
cfg.NetAddress = "127.0.0.1:0"
cfg.GossipFanout = 1
cfg.EnableVoteCompression = test.netAEnableCompression
log := logging.TestingLog(t)
netA, err := NewP2PNetwork(log.With("name", "netA"), cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil)
require.NoError(t, err)
err = netA.Start()
require.NoError(t, err)
defer netA.Stop()

peerInfoA := netA.service.AddrInfo()
addrsA, err := peer.AddrInfoToP2pAddrs(&peerInfoA)
require.NoError(t, err)
require.NotZero(t, addrsA[0])

cfgB := cfg
cfgB.EnableVoteCompression = test.netBEnableCompression
cfgB.NetAddress = ""
multiAddrStr := addrsA[0].String()
phoneBookAddresses := []string{multiAddrStr}
netB, err := NewP2PNetwork(log.With("name", "netB"), cfgB, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil)
require.NoError(t, err)
err = netB.Start()
require.NoError(t, err)
defer netB.Stop()

// ps is empty, so this is a valid vote
vote1 := map[string]any{
"cred": map[string]any{"pf": algocrypto.VrfProof{1}},
"r": map[string]any{"rnd": uint64(2), "snd": [32]byte{3}},
"sig": map[string]any{
"p": [32]byte{4}, "p1s": [64]byte{5}, "p2": [32]byte{6},
"p2s": [64]byte{7}, "ps": [64]byte{}, "s": [64]byte{9},
},
}
// ps is not empty: vpack compression will fail, but it will still be sent through
vote2 := map[string]any{
"cred": map[string]any{"pf": algocrypto.VrfProof{10}},
"r": map[string]any{"rnd": uint64(11), "snd": [32]byte{12}},
"sig": map[string]any{
"p": [32]byte{13}, "p1s": [64]byte{14}, "p2": [32]byte{15},
"p2s": [64]byte{16}, "ps": [64]byte{17}, "s": [64]byte{18},
},
}
// Send a totally invalid message to ensure that it goes through. Even though vpack compression
// and decompression will fail, the message should still go through (as an intended fallback).
vote3 := []byte("hello")
messages := [][]byte{protocol.EncodeReflect(vote1), protocol.EncodeReflect(vote2), vote3}
matcher := newMessageMatcher(t, messages)
counterDone := matcher.done
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: matcher}})

// Wait for peers to connect
require.Eventually(t, func() bool {
return len(netA.service.Conns()) > 0 && len(netB.service.Conns()) > 0
}, 2*time.Second, 50*time.Millisecond)

for _, msg := range messages {
netA.Broadcast(context.Background(), protocol.AgreementVoteTag, msg, true, nil)
}

select {
case <-counterDone:
case <-time.After(2 * time.Second):
t.Errorf("timeout, count=%d, wanted %d", len(matcher.received), len(messages))
}

require.True(t, matcher.Match())

// Verify compression feature is correctly reflected in peer properties
// Check peers have the correct compression capability
peers := netA.GetPeers(PeersConnectedIn)
require.Len(t, peers, 1)
peer := peers[0].(*wsPeer)
require.Equal(t, test.netBEnableCompression, peer.vpackVoteCompressionSupported())

peers = netB.GetPeers(PeersConnectedOut)
require.Len(t, peers, 1)
peer = peers[0].(*wsPeer)
require.Equal(t, test.netAEnableCompression, peer.vpackVoteCompressionSupported())
})
}
}
13 changes: 13 additions & 0 deletions network/wsNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,19 @@ func TestWebsocketVoteCompression(t *testing.T) {
}

require.True(t, matcher.Match())

// Verify compression feature is correctly reflected in peer properties
// Check peers have the correct compression capability
peers := netA.GetPeers(PeersConnectedIn)
require.Len(t, peers, 1)
peer := peers[0].(*wsPeer)
require.Equal(t, test.netBEnableCompression, peer.vpackVoteCompressionSupported())

peers = netB.GetPeers(PeersConnectedOut)
require.Len(t, peers, 1)
peer = peers[0].(*wsPeer)
require.Equal(t, test.netAEnableCompression, peer.vpackVoteCompressionSupported())

})
}
}
Expand Down
10 changes: 5 additions & 5 deletions network/wsPeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,11 +590,6 @@
msg.processing = wp.processed
msg.Received = time.Now().UnixNano()
msg.Data = slurper.Bytes()
msg.Data, err = dataConverter.convert(msg.Tag, msg.Data)
if err != nil {
wp.reportReadErr(err)
return
}
msg.Net = wp.net
wp.lastPacketTime.Store(msg.Received)
if wp.peerType == peerTypeWs {
Expand All @@ -608,6 +603,11 @@
networkP2PReceivedBytesByTag.Add(string(tag[:]), uint64(len(msg.Data)+2))
networkP2PMessageReceivedByTag.Add(string(tag[:]), 1)
}
msg.Data, err = dataConverter.convert(msg.Tag, msg.Data)
if err != nil {
wp.reportReadErr(err)
return

Check warning on line 609 in network/wsPeer.go

View check run for this annotation

Codecov / codecov/patch

network/wsPeer.go#L608-L609

Added lines #L608 - L609 were not covered by tests
}
msg.Sender = wp

// for outgoing connections, we want to notify the connection monitor that we've received
Expand Down
Loading