From 33bf8c0adaeaa963a4c4f1c117269639100e278f Mon Sep 17 00:00:00 2001 From: John Lee Date: Tue, 20 May 2025 16:24:06 -0400 Subject: [PATCH 1/3] Bump buildnumber.dat --- buildnumber.dat | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildnumber.dat b/buildnumber.dat index 573541ac97..d00491fd7e 100644 --- a/buildnumber.dat +++ b/buildnumber.dat @@ -1 +1 @@ -0 +1 From 38810a3cf3a16498ca4ed42fb16bbbe454e2732e Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com> Date: Tue, 20 May 2025 15:53:38 -0400 Subject: [PATCH 2/3] network: count received traffic before decompression (#6330) --- network/wsPeer.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/network/wsPeer.go b/network/wsPeer.go index c102ce48f8..9c7c5a7700 100644 --- a/network/wsPeer.go +++ b/network/wsPeer.go @@ -590,11 +590,6 @@ func (wp *wsPeer) readLoop() { msg.processing = wp.processed msg.Received = time.Now().UnixNano() msg.Data = slurper.Bytes() - msg.Data, err = dataConverter.convert(msg.Tag, msg.Data) - if err != nil { - wp.reportReadErr(err) - return - } msg.Net = wp.net wp.lastPacketTime.Store(msg.Received) if wp.peerType == peerTypeWs { @@ -608,6 +603,11 @@ func (wp *wsPeer) readLoop() { networkP2PReceivedBytesByTag.Add(string(tag[:]), uint64(len(msg.Data)+2)) networkP2PMessageReceivedByTag.Add(string(tag[:]), 1) } + msg.Data, err = dataConverter.convert(msg.Tag, msg.Data) + if err != nil { + wp.reportReadErr(err) + return + } msg.Sender = wp // for outgoing connections, we want to notify the connection monitor that we've received From 26a113c2df169818c5cdfa267fe8bc7f600bd283 Mon Sep 17 00:00:00 2001 From: cce <51567+cce@users.noreply.github.com> Date: Tue, 20 May 2025 16:18:44 -0400 Subject: [PATCH 3/3] network: enable vote compression for P2PNetwork (#6331) --- network/p2pNetwork.go | 18 ++++--- network/p2pNetwork_test.go | 103 +++++++++++++++++++++++++++++++++++++ network/wsNetwork_test.go | 13 +++++ 3 files changed, 126 insertions(+), 8 deletions(-) diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index f6408e75ec..051cf2cbae 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -259,6 +259,7 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo config: cfg, broadcastQueueHighPrio: make(chan broadcastRequest, outgoingMessagesBufferSize), broadcastQueueBulk: make(chan broadcastRequest, 100), + enableVoteCompression: cfg.EnableVoteCompression, } if identityOpts != nil { @@ -916,14 +917,15 @@ func (n *P2PNetwork) baseWsStreamHandler(ctx context.Context, p2pPeer peer.ID, s } peerCore := makePeerCore(ctx, n, n.log, n.handler.readBuffer, addr, client, addr) wsp := &wsPeer{ - wsPeerCore: peerCore, - conn: &wsPeerConnP2P{stream: stream}, - outgoing: !incoming, - identity: netIdentPeerID, - peerType: peerTypeP2P, - TelemetryGUID: pmi.telemetryID, - InstanceName: pmi.instanceName, - features: decodePeerFeatures(pmi.version, pmi.features), + wsPeerCore: peerCore, + conn: &wsPeerConnP2P{stream: stream}, + outgoing: !incoming, + identity: netIdentPeerID, + peerType: peerTypeP2P, + TelemetryGUID: pmi.telemetryID, + InstanceName: pmi.instanceName, + features: decodePeerFeatures(pmi.version, pmi.features), + enableVoteCompression: n.config.EnableVoteCompression, } localAddr, has := n.Address() diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index 78bf6b3848..d3a3821d9f 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -1591,3 +1591,106 @@ func TestP2PMetainfoV1vsV22(t *testing.T) { require.False(t, peer.features&pfCompressedProposal != 0) require.False(t, peer.vpackVoteCompressionSupported()) } + +// TestP2PVoteCompression tests vote compression feature in P2P network +func TestP2PVoteCompression(t *testing.T) { + partitiontest.PartitionTest(t) + + type testDef struct { + netAEnableCompression, netBEnableCompression bool + } + + var tests []testDef = []testDef{ + {true, true}, // both nodes with compression enabled + {true, false}, // node A with compression, node B without + {false, true}, // node A without compression, node B with compression + {false, false}, // both nodes with compression disabled + } + + for _, test := range tests { + t.Run(fmt.Sprintf("A_compression_%v+B_compression_%v", test.netAEnableCompression, test.netBEnableCompression), func(t *testing.T) { + cfg := config.GetDefaultLocal() + cfg.DNSBootstrapID = "" // disable DNS lookups since the test uses phonebook addresses + cfg.NetAddress = "127.0.0.1:0" + cfg.GossipFanout = 1 + cfg.EnableVoteCompression = test.netAEnableCompression + log := logging.TestingLog(t) + netA, err := NewP2PNetwork(log.With("name", "netA"), cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) + require.NoError(t, err) + err = netA.Start() + require.NoError(t, err) + defer netA.Stop() + + peerInfoA := netA.service.AddrInfo() + addrsA, err := peer.AddrInfoToP2pAddrs(&peerInfoA) + require.NoError(t, err) + require.NotZero(t, addrsA[0]) + + cfgB := cfg + cfgB.EnableVoteCompression = test.netBEnableCompression + cfgB.NetAddress = "" + multiAddrStr := addrsA[0].String() + phoneBookAddresses := []string{multiAddrStr} + netB, err := NewP2PNetwork(log.With("name", "netB"), cfgB, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) + require.NoError(t, err) + err = netB.Start() + require.NoError(t, err) + defer netB.Stop() + + // ps is empty, so this is a valid vote + vote1 := map[string]any{ + "cred": map[string]any{"pf": algocrypto.VrfProof{1}}, + "r": map[string]any{"rnd": uint64(2), "snd": [32]byte{3}}, + "sig": map[string]any{ + "p": [32]byte{4}, "p1s": [64]byte{5}, "p2": [32]byte{6}, + "p2s": [64]byte{7}, "ps": [64]byte{}, "s": [64]byte{9}, + }, + } + // ps is not empty: vpack compression will fail, but it will still be sent through + vote2 := map[string]any{ + "cred": map[string]any{"pf": algocrypto.VrfProof{10}}, + "r": map[string]any{"rnd": uint64(11), "snd": [32]byte{12}}, + "sig": map[string]any{ + "p": [32]byte{13}, "p1s": [64]byte{14}, "p2": [32]byte{15}, + "p2s": [64]byte{16}, "ps": [64]byte{17}, "s": [64]byte{18}, + }, + } + // Send a totally invalid message to ensure that it goes through. Even though vpack compression + // and decompression will fail, the message should still go through (as an intended fallback). + vote3 := []byte("hello") + messages := [][]byte{protocol.EncodeReflect(vote1), protocol.EncodeReflect(vote2), vote3} + matcher := newMessageMatcher(t, messages) + counterDone := matcher.done + netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: matcher}}) + + // Wait for peers to connect + require.Eventually(t, func() bool { + return len(netA.service.Conns()) > 0 && len(netB.service.Conns()) > 0 + }, 2*time.Second, 50*time.Millisecond) + + for _, msg := range messages { + netA.Broadcast(context.Background(), protocol.AgreementVoteTag, msg, true, nil) + } + + select { + case <-counterDone: + case <-time.After(2 * time.Second): + t.Errorf("timeout, count=%d, wanted %d", len(matcher.received), len(messages)) + } + + require.True(t, matcher.Match()) + + // Verify compression feature is correctly reflected in peer properties + // Check peers have the correct compression capability + peers := netA.GetPeers(PeersConnectedIn) + require.Len(t, peers, 1) + peer := peers[0].(*wsPeer) + require.Equal(t, test.netBEnableCompression, peer.vpackVoteCompressionSupported()) + + peers = netB.GetPeers(PeersConnectedOut) + require.Len(t, peers, 1) + peer = peers[0].(*wsPeer) + require.Equal(t, test.netAEnableCompression, peer.vpackVoteCompressionSupported()) + }) + } +} diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go index 218b2687bd..533b3c9e5d 100644 --- a/network/wsNetwork_test.go +++ b/network/wsNetwork_test.go @@ -569,6 +569,19 @@ func TestWebsocketVoteCompression(t *testing.T) { } require.True(t, matcher.Match()) + + // Verify compression feature is correctly reflected in peer properties + // Check peers have the correct compression capability + peers := netA.GetPeers(PeersConnectedIn) + require.Len(t, peers, 1) + peer := peers[0].(*wsPeer) + require.Equal(t, test.netBEnableCompression, peer.vpackVoteCompressionSupported()) + + peers = netB.GetPeers(PeersConnectedOut) + require.Len(t, peers, 1) + peer = peers[0].(*wsPeer) + require.Equal(t, test.netAEnableCompression, peer.vpackVoteCompressionSupported()) + }) } }