From 4382aa3ec2561e7620ef12f99dcd5d9a44589ce4 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 25 Sep 2024 16:15:25 -0400 Subject: [PATCH 1/6] network: add algod_peer_p2p_broadcast_dropped_total metric --- network/metrics.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/network/metrics.go b/network/metrics.go index a1e92b2424..eabc94f7df 100644 --- a/network/metrics.go +++ b/network/metrics.go @@ -90,6 +90,8 @@ var networkBroadcastSendMicros = metrics.MakeCounter(metrics.MetricName{Name: "a var networkBroadcastsDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_broadcasts_dropped_total", Description: "number of broadcast messages not sent to any peer"}) var networkPeerBroadcastDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_peer_broadcast_dropped_total", Description: "number of broadcast messages not sent to some peer"}) +var networkP2PPeerBroadcastDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_peer_p2p_broadcast_dropped_total", Description: "number of broadcast messages not sent to some p2p peer"}) + var networkPeerIdentityDisconnect = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_identity_duplicate", Description: "number of times identity challenge cause us to disconnect a peer"}) var networkPeerIdentityError = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_identity_error", Description: "number of times an error occurs (besides expected) when processing identity challenges"}) var networkPeerAlreadyClosed = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_peer_already_closed", Description: "number of times a peer would be added but the peer connection is already closed"}) @@ -197,7 +199,9 @@ func (t pubsubMetricsTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) { } // DropRPC is invoked when an outbound RPC is dropped, typically because of a queue full. -func (t pubsubMetricsTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {} +func (t pubsubMetricsTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) { + networkP2PPeerBroadcastDropped.Inc(nil) +} // UndeliverableMessage is invoked when the consumer of Subscribe is not reading messages fast enough and // the pressure release mechanism trigger, dropping messages. From 50f78ed5e8436245a5f775daa716c93b4647c61c Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 25 Sep 2024 17:45:06 -0400 Subject: [PATCH 2/6] fix misplaced recvRPC metric --- network/metrics.go | 21 +++++++++++++-------- network/metrics_test.go | 2 +- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/network/metrics.go b/network/metrics.go index eabc94f7df..fe62434095 100644 --- a/network/metrics.go +++ b/network/metrics.go @@ -111,6 +111,8 @@ var transactionMessagesP2PUnderdeliverableMessage = metrics.MakeCounter(metrics. var networkP2PGossipSubSentBytesTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_sent_bytes_total", Description: "Total number of bytes sent through gossipsub"}) var networkP2PGossipSubReceivedBytesTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_received_bytes_total", Description: "Total number of bytes received through gossipsub"}) +// var networkP2PGossipSubSentMsgs = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_message_sent", Description: "Number of complete messages that were sent to the network through gossipsub"}) + var _ = pubsub.RawTracer(pubsubMetricsTracer{}) // pubsubMetricsTracer is a tracer for pubsub events used to track metrics. @@ -136,14 +138,6 @@ func (t pubsubMetricsTracer) Prune(p peer.ID, topic string) {} // ValidateMessage is invoked when a message first enters the validation pipeline. func (t pubsubMetricsTracer) ValidateMessage(msg *pubsub.Message) { - if msg != nil && msg.Topic != nil { - switch *msg.Topic { - case p2p.TXTopicName: - networkP2PReceivedBytesTotal.AddUint64(uint64(len(msg.Data)), nil) - networkP2PReceivedBytesByTag.Add(string(protocol.TxnTag), uint64(len(msg.Data))) - networkP2PMessageReceivedByTag.Add(string(protocol.TxnTag), 1) - } - } } // DeliverMessage is invoked when a message is delivered @@ -180,6 +174,17 @@ func (t pubsubMetricsTracer) ThrottlePeer(p peer.ID) {} // RecvRPC is invoked when an incoming RPC is received. func (t pubsubMetricsTracer) RecvRPC(rpc *pubsub.RPC) { + for i := range rpc.GetPublish() { + if rpc.Publish[i] != nil && rpc.Publish[i].Topic != nil { + switch *rpc.Publish[i].Topic { + case p2p.TXTopicName: + networkP2PReceivedBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil) + networkP2PReceivedBytesByTag.Add(string(protocol.TxnTag), uint64(len(rpc.Publish[i].Data))) + networkP2PMessageReceivedByTag.Add(string(protocol.TxnTag), 1) + } + } + } + // service gossipsub traffic = networkP2PGossipSubReceivedBytesTotal - networkP2PReceivedBytesByTag_TX networkP2PGossipSubReceivedBytesTotal.AddUint64(uint64(rpc.Size()), nil) } diff --git a/network/metrics_test.go b/network/metrics_test.go index 857ab57051..ea0448ebc6 100644 --- a/network/metrics_test.go +++ b/network/metrics_test.go @@ -55,7 +55,7 @@ func TestMetrics_PubsubTracer_TagList(t *testing.T) { return true }) } - if stmt.Name.Name == "ValidateMessage" { + if stmt.Name.Name == "RecvRPC" { ast.Inspect(stmt.Body, func(n ast.Node) bool { if switchStmt, ok := n.(*ast.SwitchStmt); ok { for _, stmt := range switchStmt.Body.List { From 8fec6f8872bfd7c5ebad08e7a8af767bc02332c8 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 26 Sep 2024 18:36:52 -0400 Subject: [PATCH 3/6] fix metrics diff plotter --- test/heapwatch/metrics_lib.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/heapwatch/metrics_lib.py b/test/heapwatch/metrics_lib.py index 1ac0d85bd7..e6fafaeebb 100644 --- a/test/heapwatch/metrics_lib.py +++ b/test/heapwatch/metrics_lib.py @@ -267,8 +267,8 @@ def parse_metrics( if diff and metrics_names and len(metrics_names) == 2 and len(out) == 2: m = list(out.keys()) name = f'{m[0]}_-_{m[1]}' - metric = Metric(name, MetricType.GAUGE, out[m[0]].value - out[m[1]].value) - out = [{name: metric}] + metric = Metric(name, MetricType.GAUGE, out[m[0]][0].value - out[m[1]][0].value) + out = {name: [metric]} return out From 434890a5e481f7d5c1947ef833a58ccfe8a42d0b Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com> Date: Tue, 22 Oct 2024 14:08:13 -0400 Subject: [PATCH 4/6] Update network/metrics.go --- network/metrics.go | 1 - 1 file changed, 1 deletion(-) diff --git a/network/metrics.go b/network/metrics.go index fe62434095..69b2786d41 100644 --- a/network/metrics.go +++ b/network/metrics.go @@ -111,7 +111,6 @@ var transactionMessagesP2PUnderdeliverableMessage = metrics.MakeCounter(metrics. var networkP2PGossipSubSentBytesTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_sent_bytes_total", Description: "Total number of bytes sent through gossipsub"}) var networkP2PGossipSubReceivedBytesTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_received_bytes_total", Description: "Total number of bytes received through gossipsub"}) -// var networkP2PGossipSubSentMsgs = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_message_sent", Description: "Number of complete messages that were sent to the network through gossipsub"}) var _ = pubsub.RawTracer(pubsubMetricsTracer{}) From ffd9cb2c4d7b26f4b50b6325522727a173103346 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 3 Oct 2024 15:34:13 -0400 Subject: [PATCH 5/6] Subscribe(pubsub.WithBufferSize(32768)) --- network/p2p/pubsub.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/p2p/pubsub.go b/network/p2p/pubsub.go index a592657010..3dfe08301f 100644 --- a/network/p2p/pubsub.go +++ b/network/p2p/pubsub.go @@ -153,7 +153,7 @@ func (s *serviceImpl) Subscribe(topic string, val pubsub.ValidatorEx) (SubNextCa return nil, err } // t.SetScoreParams() // already set in makePubSub - return t.Subscribe() + return t.Subscribe(pubsub.WithBufferSize(32768)) } // Publish publishes data to the given topic From 6b87e047b0476c0eaf9fb60d363fb2b6e8bfe106 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 22 Oct 2024 14:58:05 -0400 Subject: [PATCH 6/6] fix linter --- config/config_test.go | 5 +++-- network/metrics.go | 1 - 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/config/config_test.go b/config/config_test.go index 936622b06c..2ad8e32d61 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -773,7 +773,8 @@ func TestLocal_ValidateP2PHybridConfig(t *testing.T) { for i, test := range tests { test := test - t.Run(fmt.Sprintf("test=%d", i), func(t *testing.T) { + name := fmt.Sprintf("test=%d", i) + t.Run(name, func(t *testing.T) { t.Parallel() c := Local{ @@ -782,7 +783,7 @@ func TestLocal_ValidateP2PHybridConfig(t *testing.T) { NetAddress: test.netAddress, } err := c.ValidateP2PHybridConfig() - require.Equal(t, test.err, err != nil, "test=%d", i) + require.Equal(t, test.err, err != nil, name) }) } } diff --git a/network/metrics.go b/network/metrics.go index 69b2786d41..781eb00543 100644 --- a/network/metrics.go +++ b/network/metrics.go @@ -111,7 +111,6 @@ var transactionMessagesP2PUnderdeliverableMessage = metrics.MakeCounter(metrics. var networkP2PGossipSubSentBytesTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_sent_bytes_total", Description: "Total number of bytes sent through gossipsub"}) var networkP2PGossipSubReceivedBytesTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_received_bytes_total", Description: "Total number of bytes received through gossipsub"}) - var _ = pubsub.RawTracer(pubsubMetricsTracer{}) // pubsubMetricsTracer is a tracer for pubsub events used to track metrics.