From 1207666a682f830ac981505ad836e5135c7e9b42 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 20 Jun 2024 15:54:52 -0400 Subject: [PATCH 01/13] CR comments to RateLimitingTransport --- network/limitcaller/rateLimitingTransport.go | 6 +++--- network/p2p/http.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/network/limitcaller/rateLimitingTransport.go b/network/limitcaller/rateLimitingTransport.go index 5b2e4e67f0..a00f930622 100644 --- a/network/limitcaller/rateLimitingTransport.go +++ b/network/limitcaller/rateLimitingTransport.go @@ -64,9 +64,9 @@ func MakeRateLimitingTransport(phonebook ConnectionTimeStore, queueingTimeout ti } } -// MakeRateLimitingTransportWithTransport creates a rate limiting http transport that would limit the requests rate +// MakeRateLimitingTransportWithRoundTripper creates a rate limiting http transport that would limit the requests rate // according to the entries in the phonebook. -func MakeRateLimitingTransportWithTransport(phonebook ConnectionTimeStore, queueingTimeout time.Duration, rt http.RoundTripper, target interface{}, maxIdleConnsPerHost int) RateLimitingTransport { +func MakeRateLimitingTransportWithRoundTripper(phonebook ConnectionTimeStore, queueingTimeout time.Duration, rt http.RoundTripper, target interface{}, maxIdleConnsPerHost int) RateLimitingTransport { return RateLimitingTransport{ phonebook: phonebook, innerTransport: rt, @@ -82,8 +82,8 @@ func (r *RateLimitingTransport) RoundTrip(req *http.Request) (res *http.Response var provisionalTime time.Time queueingDeadline := time.Now().Add(r.queueingTimeout) var host interface{} = req.Host + // p2p/http clients have per-connection transport and address info so use that if len(req.Host) == 0 && req.URL != nil && len(req.URL.Host) == 0 { - // p2p/http clients have per-connection transport and address info so use that host = r.targetAddr } for { diff --git a/network/p2p/http.go b/network/p2p/http.go index eea40c127e..9f2622d015 100644 --- a/network/p2p/http.go +++ b/network/p2p/http.go @@ -85,7 +85,7 @@ func MakeHTTPClientWithRateLimit(addrInfo *peer.AddrInfo, pstore limitcaller.Con if err != nil { return nil, err } - rlrt := limitcaller.MakeRateLimitingTransportWithTransport(pstore, queueingTimeout, cl.Transport, addrInfo, maxIdleConnsPerHost) + rlrt := limitcaller.MakeRateLimitingTransportWithRoundTripper(pstore, queueingTimeout, cl.Transport, addrInfo, maxIdleConnsPerHost) cl.Transport = &rlrt return cl, nil From c3164bf22181f0bcd098a93e1808b510f5ce9ce9 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 20 Jun 2024 15:58:18 -0400 Subject: [PATCH 02/13] CR: magic numbers --- network/p2p/dnsaddr/resolve.go | 2 +- network/p2p/pubsub.go | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/network/p2p/dnsaddr/resolve.go b/network/p2p/dnsaddr/resolve.go index dbe2b002a1..5e0e8007fc 100644 --- a/network/p2p/dnsaddr/resolve.go +++ b/network/p2p/dnsaddr/resolve.go @@ -35,7 +35,7 @@ func Iterate(initial multiaddr.Multiaddr, controller ResolveController, f func(d if resolver == nil { return errors.New("passed controller has no resolvers Iterate") } - const maxHops = 100 + const maxHops = 25 // any reasonable number to prevent infinite loop in case of circular dnsaddr hops := 0 var toResolve = []multiaddr.Multiaddr{initial} for resolver != nil && len(toResolve) > 0 { diff --git a/network/p2p/pubsub.go b/network/p2p/pubsub.go index 8e67ac6725..a968bcb6a9 100644 --- a/network/p2p/pubsub.go +++ b/network/p2p/pubsub.go @@ -53,6 +53,8 @@ const ( // TXTopicName defines a pubsub topic for TX messages const TXTopicName = "/algo/tx/0.1.0" +const incomingThreads = 20 // matches to number wsNetwork workers + func makePubSub(ctx context.Context, cfg config.Local, host host.Host) (*pubsub.PubSub, error) { //defaultParams := pubsub.DefaultGossipSubParams() @@ -95,7 +97,7 @@ func makePubSub(ctx context.Context, cfg config.Local, host host.Host) (*pubsub. pubsub.WithValidateQueueSize(256), pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign), // pubsub.WithValidateThrottle(cfg.TxBacklogSize), - pubsub.WithValidateWorkers(20), // match to number wsNetwork workers + pubsub.WithValidateWorkers(incomingThreads), } return pubsub.NewGossipSub(ctx, host, options...) From 66947cf66df155b7f97b71f33e1b8be381f77d80 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 20 Jun 2024 15:59:35 -0400 Subject: [PATCH 03/13] txHandler capguard TODO --- data/txHandler.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index 98b7e55a6b..2fd4ff2a0a 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -711,11 +711,9 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net accepted := false defer func() { // if we failed to put the item onto the backlog, we should release the capacity if any - if !accepted { - if capguard != nil { - if capErr := capguard.Release(); capErr != nil { - logging.Base().Warnf("processIncomingTxn: failed to release capacity to ElasticRateLimiter: %v", capErr) - } + if !accepted && capguard != nil { + if capErr := capguard.Release(); capErr != nil { + logging.Base().Warnf("processIncomingTxn: failed to release capacity to ElasticRateLimiter: %v", capErr) } } }() @@ -779,11 +777,9 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa accepted := false defer func() { // if we failed to put the item onto the backlog, we should release the capacity if any - if !accepted { - if capguard != nil { - if capErr := capguard.Release(); capErr != nil { - logging.Base().Warnf("validateIncomingTxMessage: failed to release capacity to ElasticRateLimiter: %v", capErr) - } + if !accepted && capguard != nil { + if capErr := capguard.Release(); capErr != nil { + logging.Base().Warnf("validateIncomingTxMessage: failed to release capacity to ElasticRateLimiter: %v", capErr) } } }() From 0772115baec19046c84ed689e6b7e53f8aed78f2 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 20 Jun 2024 16:00:45 -0400 Subject: [PATCH 04/13] ValidatorData -> ValidatedMessage --- data/txHandler.go | 10 +++++----- network/gossipNode.go | 6 +++--- network/p2pNetwork_test.go | 6 +++--- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index 2fd4ff2a0a..7a598cb477 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -786,27 +786,27 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa if shouldDrop { // this TX message was found in the duplicate cache, or ERL rate-limited it - return network.ValidatedMessage{Action: network.Ignore, ValidatorData: nil} + return network.ValidatedMessage{Action: network.Ignore, ValidatedMessage: nil} } unverifiedTxGroup, consumed, invalid := decodeMsg(rawmsg.Data) if invalid { // invalid encoding or exceeding txgroup, disconnect from this peer - return network.ValidatedMessage{Action: network.Disconnect, ValidatorData: nil} + return network.ValidatedMessage{Action: network.Disconnect, ValidatedMessage: nil} } canonicalKey, drop := handler.incomingTxGroupDupRateLimit(unverifiedTxGroup, consumed, rawmsg.Sender) if drop { // this re-serialized txgroup was detected as a duplicate by the canonical message cache, // or it was rate-limited by the per-app rate limiter - return network.ValidatedMessage{Action: network.Ignore, ValidatorData: nil} + return network.ValidatedMessage{Action: network.Ignore, ValidatedMessage: nil} } accepted = true return network.ValidatedMessage{ Action: network.Accept, Tag: rawmsg.Tag, - ValidatorData: &validatedIncomingTxMessage{ + ValidatedMessage: &validatedIncomingTxMessage{ rawmsg: rawmsg, unverifiedTxGroup: unverifiedTxGroup, msgKey: msgKey, @@ -818,7 +818,7 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa // processIncomingTxMessage is the handler for the MessageProcessor implementation used by P2PNetwork. func (handler *TxHandler) processIncomingTxMessage(validatedMessage network.ValidatedMessage) network.OutgoingMessage { - msg := validatedMessage.ValidatorData.(*validatedIncomingTxMessage) + msg := validatedMessage.ValidatedMessage.(*validatedIncomingTxMessage) select { case handler.backlogQueue <- &txBacklogMsg{ rawmsg: &msg.rawmsg, diff --git a/network/gossipNode.go b/network/gossipNode.go index 686d864d40..3dd7a5b6f9 100644 --- a/network/gossipNode.go +++ b/network/gossipNode.go @@ -171,9 +171,9 @@ type OutgoingMessage struct { // ValidatedMessage is a message that has been validated and is ready to be processed. // Think as an intermediate one between IncomingMessage and OutgoingMessage type ValidatedMessage struct { - Action ForwardingPolicy - Tag Tag - ValidatorData interface{} + Action ForwardingPolicy + Tag Tag + ValidatedMessage interface{} } // ForwardingPolicy is an enum indicating to whom we should send a message diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index 715e8e6fab..5bd582ead0 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -108,7 +108,7 @@ func TestP2PSubmitTX(t *testing.T) { ProcessorHandleFunc }{ ProcessorValidateFunc(func(msg IncomingMessage) ValidatedMessage { - return ValidatedMessage{Action: Accept, Tag: msg.Tag, ValidatorData: nil} + return ValidatedMessage{Action: Accept, Tag: msg.Tag, ValidatedMessage: nil} }), ProcessorHandleFunc(func(msg ValidatedMessage) OutgoingMessage { return OutgoingMessage{Action: Ignore} @@ -200,7 +200,7 @@ func TestP2PSubmitTXNoGossip(t *testing.T) { ProcessorHandleFunc }{ ProcessorValidateFunc(func(msg IncomingMessage) ValidatedMessage { - return ValidatedMessage{Action: Accept, Tag: msg.Tag, ValidatorData: nil} + return ValidatedMessage{Action: Accept, Tag: msg.Tag, ValidatedMessage: nil} }), ProcessorHandleFunc(func(msg ValidatedMessage) OutgoingMessage { return OutgoingMessage{Action: Ignore} @@ -840,7 +840,7 @@ func TestP2PRelay(t *testing.T) { ProcessorHandleFunc }{ ProcessorValidateFunc(func(msg IncomingMessage) ValidatedMessage { - return ValidatedMessage{Action: Accept, Tag: msg.Tag, ValidatorData: nil} + return ValidatedMessage{Action: Accept, Tag: msg.Tag, ValidatedMessage: nil} }), ProcessorHandleFunc(func(msg ValidatedMessage) OutgoingMessage { if count := numActual.Add(1); int(count) >= numExpected { From a35b5c51560ede0e1815d78cdc1069164300aeb0 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 20 Jun 2024 16:07:27 -0400 Subject: [PATCH 05/13] P2PListenAddress -> P2PNetAddress --- config/localTemplate.go | 4 ++-- config/local_defaults.go | 2 +- data/txHandler.go | 2 ++ installer/config.json.example | 2 +- netdeploy/remote/nodecfg/nodeDir.go | 10 +++++----- network/hybridNetwork.go | 2 +- node/node_test.go | 2 +- test/testdata/configs/config-v34.json | 2 +- 8 files changed, 14 insertions(+), 12 deletions(-) diff --git a/config/localTemplate.go b/config/localTemplate.go index 8427bbdbf2..2e82f9fa60 100644 --- a/config/localTemplate.go +++ b/config/localTemplate.go @@ -603,8 +603,8 @@ type Local struct { // EnableP2PHybridMode turns on both websockets and P2P networking. EnableP2PHybridMode bool `version[34]:"false"` - // P2PListenAddress sets the listen address used for P2P networking, if hybrid mode is set. - P2PListenAddress string `version[34]:""` + // P2PNetAddress sets the listen address used for P2P networking, if hybrid mode is set. + P2PNetAddress string `version[34]:""` // EnableDHT will turn on the hash table for use with capabilities advertisement EnableDHTProviders bool `version[34]:"false"` diff --git a/config/local_defaults.go b/config/local_defaults.go index 54fffe1479..ae2ed22ebf 100644 --- a/config/local_defaults.go +++ b/config/local_defaults.go @@ -119,7 +119,7 @@ var defaultLocal = Local{ OptimizeAccountsDatabaseOnStartup: false, OutgoingMessageFilterBucketCount: 3, OutgoingMessageFilterBucketSize: 128, - P2PListenAddress: "", + P2PNetAddress: "", P2PPersistPeerID: false, P2PPrivateKeyLocation: "", ParticipationKeysRefreshInterval: 60000000000, diff --git a/data/txHandler.go b/data/txHandler.go index 7a598cb477..56d0f2ca01 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -240,10 +240,12 @@ func (handler *TxHandler) Start() { if handler.msgCache != nil { handler.msgCache.Start(handler.ctx, 60*time.Second) } + // wsNetwork handler handler.net.RegisterHandlers([]network.TaggedMessageHandler{ {Tag: protocol.TxnTag, MessageHandler: network.HandlerFunc(handler.processIncomingTxn)}, }) + // libp2p pubsub validator and handler abstracted as TaggedMessageProcessor handler.net.RegisterProcessors([]network.TaggedMessageProcessor{ { Tag: protocol.TxnTag, diff --git a/installer/config.json.example b/installer/config.json.example index c5255d63bd..7f16155303 100644 --- a/installer/config.json.example +++ b/installer/config.json.example @@ -98,7 +98,7 @@ "OptimizeAccountsDatabaseOnStartup": false, "OutgoingMessageFilterBucketCount": 3, "OutgoingMessageFilterBucketSize": 128, - "P2PListenAddress": "", + "P2PNetAddress": "", "P2PPersistPeerID": false, "P2PPrivateKeyLocation": "", "ParticipationKeysRefreshInterval": 60000000000, diff --git a/netdeploy/remote/nodecfg/nodeDir.go b/netdeploy/remote/nodecfg/nodeDir.go index 26b1c9b8d1..bdfc037438 100644 --- a/netdeploy/remote/nodecfg/nodeDir.go +++ b/netdeploy/remote/nodecfg/nodeDir.go @@ -174,21 +174,21 @@ func (nd *nodeDir) configureP2PDNSBootstrap(p2pBootstrap bool) error { } // ensure p2p config params set are what is expected: // - EnableP2P or EnableP2PHybridMode - // - NetAddress or P2PListenAddress is set + // - NetAddress or P2PNetAddress is set // - EnableGossipService if !nd.config.EnableP2P && !nd.config.EnableP2PHybridMode { return errors.New("p2p bootstrap requires EnableP2P or EnableP2PHybridMode to be set") } - if nd.NetAddress == "" && nd.config.P2PListenAddress == "" { - return errors.New("p2p bootstrap requires NetAddress or P2PListenAddress to be set") + if nd.NetAddress == "" && nd.config.P2PNetAddress == "" { + return errors.New("p2p bootstrap requires NetAddress or P2PNetAddress to be set") } if !nd.config.EnableGossipService { return errors.New("p2p bootstrap requires EnableGossipService to be set") } netAddress := nd.NetAddress - if nd.config.P2PListenAddress != "" { - netAddress = nd.config.P2PListenAddress + if nd.config.P2PNetAddress != "" { + netAddress = nd.config.P2PNetAddress } key, err := p2p.GetPrivKey(config.Local{P2PPersistPeerID: true}, nd.dataDir) diff --git a/network/hybridNetwork.go b/network/hybridNetwork.go index 6041d95f9a..eb06cab400 100644 --- a/network/hybridNetwork.go +++ b/network/hybridNetwork.go @@ -41,7 +41,7 @@ type HybridP2PNetwork struct { func NewHybridP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID, nodeInfo NodeInfo) (*HybridP2PNetwork, error) { // supply alternate NetAddress for P2P network p2pcfg := cfg - p2pcfg.NetAddress = cfg.P2PListenAddress + p2pcfg.NetAddress = cfg.P2PNetAddress p2pnet, err := NewP2PNetwork(log, p2pcfg, datadir, phonebookAddresses, genesisID, networkID, nodeInfo) if err != nil { return nil, err diff --git a/node/node_test.go b/node/node_test.go index 361f3478f7..403c06d736 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -896,7 +896,7 @@ func TestNodeHybridTopology(t *testing.T) { ni.p2pID, err = p2p.PeerIDFromPublicKey(privKey.GetPublic()) require.NoError(t, err) - cfg.P2PListenAddress = ni.p2pNetAddr() + cfg.P2PNetAddress = ni.p2pNetAddr() return ni, cfg } diff --git a/test/testdata/configs/config-v34.json b/test/testdata/configs/config-v34.json index c5255d63bd..7f16155303 100644 --- a/test/testdata/configs/config-v34.json +++ b/test/testdata/configs/config-v34.json @@ -98,7 +98,7 @@ "OptimizeAccountsDatabaseOnStartup": false, "OutgoingMessageFilterBucketCount": 3, "OutgoingMessageFilterBucketSize": 128, - "P2PListenAddress": "", + "P2PNetAddress": "", "P2PPersistPeerID": false, "P2PPrivateKeyLocation": "", "ParticipationKeysRefreshInterval": 60000000000, From f6dd7a47e4e911ae38a87e2c5be36456fa1ec8f1 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 20 Jun 2024 16:09:44 -0400 Subject: [PATCH 06/13] DeadlineSettable -> DeadlineSettableConn --- agreement/fuzzer/networkFacade_test.go | 2 +- agreement/gossip/network_test.go | 2 +- components/mocks/mockNetwork.go | 2 +- data/txHandler.go | 4 ++-- network/connPerfMon_test.go | 2 +- network/gossipNode.go | 8 ++++---- network/hybridNetwork.go | 6 +++--- network/p2pNetwork.go | 4 ++-- network/wsNetwork.go | 6 +++--- 9 files changed, 18 insertions(+), 18 deletions(-) diff --git a/agreement/fuzzer/networkFacade_test.go b/agreement/fuzzer/networkFacade_test.go index 804fb1e7ff..d1ec7bf8a5 100644 --- a/agreement/fuzzer/networkFacade_test.go +++ b/agreement/fuzzer/networkFacade_test.go @@ -348,7 +348,7 @@ func (n *NetworkFacade) ReceiveMessage(sourceNode int, tag protocol.Tag, data [] n.pushPendingReceivedMessage() } -func (n *NetworkFacade) Disconnect(sender network.DisconnectablePeer) { +func (n *NetworkFacade) Disconnect(sender network.DeadlineSettableConn) { sourceNode := n.peerToNode[sender.(*facadePeer)] n.fuzzer.Disconnect(n.nodeID, sourceNode) } diff --git a/agreement/gossip/network_test.go b/agreement/gossip/network_test.go index e1c5f49613..760456e66c 100644 --- a/agreement/gossip/network_test.go +++ b/agreement/gossip/network_test.go @@ -135,7 +135,7 @@ func (w *whiteholeNetwork) Relay(ctx context.Context, tag protocol.Tag, data []b func (w *whiteholeNetwork) BroadcastSimple(tag protocol.Tag, data []byte) error { return w.Broadcast(context.Background(), tag, data, true, nil) } -func (w *whiteholeNetwork) Disconnect(badnode network.DisconnectablePeer) { +func (w *whiteholeNetwork) Disconnect(badnode network.DeadlineSettableConn) { return } func (w *whiteholeNetwork) DisconnectPeers() { diff --git a/components/mocks/mockNetwork.go b/components/mocks/mockNetwork.go index c0b7724e07..a6a55e7ec4 100644 --- a/components/mocks/mockNetwork.go +++ b/components/mocks/mockNetwork.go @@ -60,7 +60,7 @@ func (network *MockNetwork) RequestConnectOutgoing(replace bool, quit <-chan str } // Disconnect - unused function -func (network *MockNetwork) Disconnect(badpeer network.DisconnectablePeer) { +func (network *MockNetwork) Disconnect(badpeer network.DeadlineSettableConn) { } // DisconnectPeers - unused function diff --git a/data/txHandler.go b/data/txHandler.go index 56d0f2ca01..c797bc41f3 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -597,7 +597,7 @@ func (handler *TxHandler) dedupCanonical(unverifiedTxGroup []transactions.Signed // - the key used for insertion if the message was not found in the cache // - the capacity guard returned by the elastic rate limiter // - a boolean indicating if the message was a duplicate or the sender is rate limited -func (handler *TxHandler) incomingMsgDupErlCheck(data []byte, sender network.DisconnectablePeer) (*crypto.Digest, *util.ErlCapacityGuard, bool) { +func (handler *TxHandler) incomingMsgDupErlCheck(data []byte, sender network.DeadlineSettableConn) (*crypto.Digest, *util.ErlCapacityGuard, bool) { var msgKey *crypto.Digest var capguard *util.ErlCapacityGuard var isDup bool @@ -681,7 +681,7 @@ func decodeMsg(data []byte) (unverifiedTxGroup []transactions.SignedTxn, consume // incomingTxGroupDupRateLimit checks // - if the incoming transaction group has been seen before after reencoding to canonical representation, and // - if the sender is rate limited by the per-application rate limiter. -func (handler *TxHandler) incomingTxGroupDupRateLimit(unverifiedTxGroup []transactions.SignedTxn, encodedExpectedSize int, sender network.DisconnectablePeer) (*crypto.Digest, bool) { +func (handler *TxHandler) incomingTxGroupDupRateLimit(unverifiedTxGroup []transactions.SignedTxn, encodedExpectedSize int, sender network.DeadlineSettableConn) (*crypto.Digest, bool) { var canonicalKey *crypto.Digest if handler.txCanonicalCache != nil { var isDup bool diff --git a/network/connPerfMon_test.go b/network/connPerfMon_test.go index 560be72a96..fe10abbd34 100644 --- a/network/connPerfMon_test.go +++ b/network/connPerfMon_test.go @@ -48,7 +48,7 @@ func makeMsgPool(N int, peers []Peer) (out []IncomingMessage) { addMsg := func(msgCount int) { for i := 0; i < msgCount; i++ { - msg.Sender = peers[(int(msgIndex)+i)%len(peers)].(DisconnectablePeer) + msg.Sender = peers[(int(msgIndex)+i)%len(peers)].(DeadlineSettableConn) timer += int64(7 * time.Nanosecond) msg.Received = timer out = append(out, msg) diff --git a/network/gossipNode.go b/network/gossipNode.go index 3dd7a5b6f9..8356c254dc 100644 --- a/network/gossipNode.go +++ b/network/gossipNode.go @@ -29,8 +29,8 @@ import ( // Peer opaque interface for referring to a neighbor in the network type Peer interface{} -// DisconnectablePeer is a Peer with a long-living connection to a network that can be disconnected -type DisconnectablePeer interface { +// DeadlineSettableConn is a Peer with a long-living connection to a network that can be disconnected +type DeadlineSettableConn interface { GetNetwork() GossipNode } @@ -62,7 +62,7 @@ type GossipNode interface { Address() (string, bool) Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error Relay(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error - Disconnect(badnode DisconnectablePeer) + Disconnect(badnode DeadlineSettableConn) DisconnectPeers() // only used by testing // RegisterHTTPHandler path accepts gorilla/mux path annotations @@ -127,7 +127,7 @@ var outgoingMessagesBufferSize = int( // IncomingMessage represents a message arriving from some peer in our p2p network type IncomingMessage struct { - Sender DisconnectablePeer + Sender DeadlineSettableConn Tag Tag Data []byte Err error diff --git a/network/hybridNetwork.go b/network/hybridNetwork.go index eb06cab400..6f97a04926 100644 --- a/network/hybridNetwork.go +++ b/network/hybridNetwork.go @@ -115,7 +115,7 @@ func (n *HybridP2PNetwork) Relay(ctx context.Context, tag protocol.Tag, data []b } // Disconnect implements GossipNode -func (n *HybridP2PNetwork) Disconnect(badnode DisconnectablePeer) { +func (n *HybridP2PNetwork) Disconnect(badnode DeadlineSettableConn) { net := badnode.GetNetwork() if net == n.p2pNetwork { n.p2pNetwork.Disconnect(badnode) @@ -180,13 +180,13 @@ func (n *HybridP2PNetwork) ClearHandlers() { n.wsNetwork.ClearHandlers() } -// RegisterProcessors adds to the set of given message handlers. +// RegisterProcessors adds to the set of given message processors. func (n *HybridP2PNetwork) RegisterProcessors(dispatch []TaggedMessageProcessor) { n.p2pNetwork.RegisterProcessors(dispatch) n.wsNetwork.RegisterProcessors(dispatch) } -// ClearProcessors deregisters all the existing message handlers. +// ClearProcessors deregisters all the existing message processors. func (n *HybridP2PNetwork) ClearProcessors() { n.p2pNetwork.ClearProcessors() n.wsNetwork.ClearProcessors() diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index 22da1915f3..95cc3295f4 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -527,7 +527,7 @@ func (n *P2PNetwork) Relay(ctx context.Context, tag protocol.Tag, data []byte, w } // Disconnect from a peer, probably due to protocol errors. -func (n *P2PNetwork) Disconnect(badpeer DisconnectablePeer) { +func (n *P2PNetwork) Disconnect(badpeer DeadlineSettableConn) { var peerID peer.ID var wsp *wsPeer @@ -555,7 +555,7 @@ func (n *P2PNetwork) Disconnect(badpeer DisconnectablePeer) { } } -func (n *P2PNetwork) disconnectThread(badnode DisconnectablePeer, reason disconnectReason) { +func (n *P2PNetwork) disconnectThread(badnode DeadlineSettableConn, reason disconnectReason) { defer n.wg.Done() n.Disconnect(badnode) // ignores reason } diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 92a32091c4..090fcccd87 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -362,7 +362,7 @@ type networkPeerManager interface { // used by msgHandler Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error - disconnectThread(badnode DisconnectablePeer, reason disconnectReason) + disconnectThread(badnode DeadlineSettableConn, reason disconnectReason) checkPeersConnectivity() } @@ -477,13 +477,13 @@ func (wn *WebsocketNetwork) RelayArray(ctx context.Context, tags []protocol.Tag, return nil } -func (wn *WebsocketNetwork) disconnectThread(badnode DisconnectablePeer, reason disconnectReason) { +func (wn *WebsocketNetwork) disconnectThread(badnode DeadlineSettableConn, reason disconnectReason) { defer wn.wg.Done() wn.disconnect(badnode, reason) } // Disconnect from a peer, probably due to protocol errors. -func (wn *WebsocketNetwork) Disconnect(node DisconnectablePeer) { +func (wn *WebsocketNetwork) Disconnect(node DeadlineSettableConn) { wn.disconnect(node, disconnectBadData) } From af3ae165ef08bc2357b19908a6f25001f0c4da7d Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 20 Jun 2024 17:24:06 -0400 Subject: [PATCH 07/13] CR fixes found by Jason --- daemon/algod/server.go | 2 +- network/p2p/logger.go | 3 ++- util/metrics/opencensus_test.go | 6 +++--- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/daemon/algod/server.go b/daemon/algod/server.go index f1f6d44099..5250f7de40 100644 --- a/daemon/algod/server.go +++ b/daemon/algod/server.go @@ -289,7 +289,7 @@ func (s *Server) Start() { fmt.Print("Initializing the Algorand node... ") err := s.node.Start() if err != nil { - msg := fmt.Sprintf("Failed to start alg Algorand node: %v", err) + msg := fmt.Sprintf("Failed to start an Algorand node: %v", err) s.log.Error(msg) fmt.Println(msg) os.Exit(1) diff --git a/network/p2p/logger.go b/network/p2p/logger.go index 8ac9dc7b97..cde67f7f58 100644 --- a/network/p2p/logger.go +++ b/network/p2p/logger.go @@ -70,7 +70,8 @@ func EnableP2PLogging(log logging.Logger, l logging.Level) { } func (c *loggingCore) Enabled(l zapcore.Level) bool { - return c.log.IsLevelEnabled(c.level) + level := levelsMap[l] + return c.log.IsLevelEnabled(level) } func (c *loggingCore) With(fields []zapcore.Field) zapcore.Core { diff --git a/util/metrics/opencensus_test.go b/util/metrics/opencensus_test.go index b1f5ff102a..f5401af541 100644 --- a/util/metrics/opencensus_test.go +++ b/util/metrics/opencensus_test.go @@ -37,9 +37,9 @@ func TestDHTOpenCensusMetrics(t *testing.T) { defaultBytesDistribution := view.Distribution(1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296) - keyMessageType, _ := tag.NewKey("message_type") - keyPeerID, _ := tag.NewKey("peer_id") - keyInstanceID, _ := tag.NewKey("instance_id") + keyMessageType := tag.MustNewKey("message_type") + keyPeerID := tag.MustNewKey("peer_id") + keyInstanceID := tag.MustNewKey("instance_id") sentMessages := stats.Int64("my_sent_messages", "Total number of messages sent per RPC", stats.UnitDimensionless) receivedBytes := stats.Int64("my_received_bytes", "Total received bytes per RPC", stats.UnitBytes) From 2be7e7e75c24fdbdd80d8d308f321f2a559bf16c Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 21 Jun 2024 11:10:43 -0400 Subject: [PATCH 08/13] fix comment in node_test --- node/node_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/node/node_test.go b/node/node_test.go index 403c06d736..cc9f86d162 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -903,9 +903,7 @@ func TestNodeHybridTopology(t *testing.T) { phonebookHook := func(ni []nodeInfo, i int) []string { switch i { case 0: - // node 0 (N) only accept connections to work around the peer selector - // ConnectedOut priority. TODO: merge switching to archival peers from master - // when ready. + // node 0 (N) only accept connections at the beginning to learn about archival node from DHT t.Logf("Node%d phonebook: empty", i) return []string{} case 1: From ec9ab12daea955490f66abd938500636f3c45fdf Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 21 Jun 2024 12:44:28 -0400 Subject: [PATCH 09/13] Get rid of interfac{} in ConnectionTimeStore and Phonebook methods --- network/limitcaller/rateLimitingTransport.go | 13 ++++---- network/p2p/peerstore/peerstore.go | 20 ++++++------ network/p2p/peerstore/peerstore_test.go | 34 ++++++++++---------- network/phonebook/phonebook.go | 12 +++---- 4 files changed, 40 insertions(+), 39 deletions(-) diff --git a/network/limitcaller/rateLimitingTransport.go b/network/limitcaller/rateLimitingTransport.go index a00f930622..a839c44393 100644 --- a/network/limitcaller/rateLimitingTransport.go +++ b/network/limitcaller/rateLimitingTransport.go @@ -22,12 +22,13 @@ import ( "time" "github.com/algorand/go-algorand/util" + "github.com/libp2p/go-libp2p/core/peer" ) // ConnectionTimeStore is a subset of the phonebook that is used to store the connection times. type ConnectionTimeStore interface { - GetConnectionWaitTime(addrOrInfo interface{}) (bool, time.Duration, time.Time) - UpdateConnectionTime(addrOrInfo interface{}, provisionalTime time.Time) bool + GetConnectionWaitTime(addrOrPeerID string) (bool, time.Duration, time.Time) + UpdateConnectionTime(addrOrPeerID string, provisionalTime time.Time) bool } // RateLimitingTransport is the transport for execute a single HTTP transaction, obtaining the Response for a given Request. @@ -81,13 +82,13 @@ func (r *RateLimitingTransport) RoundTrip(req *http.Request) (res *http.Response var waitTime time.Duration var provisionalTime time.Time queueingDeadline := time.Now().Add(r.queueingTimeout) - var host interface{} = req.Host + addrOrPeerID := req.Host // p2p/http clients have per-connection transport and address info so use that if len(req.Host) == 0 && req.URL != nil && len(req.URL.Host) == 0 { - host = r.targetAddr + addrOrPeerID = string(r.targetAddr.(*peer.AddrInfo).ID) } for { - _, waitTime, provisionalTime = r.phonebook.GetConnectionWaitTime(host) + _, waitTime, provisionalTime = r.phonebook.GetConnectionWaitTime(addrOrPeerID) if waitTime == 0 { break // break out of the loop and proceed to the connection } @@ -99,6 +100,6 @@ func (r *RateLimitingTransport) RoundTrip(req *http.Request) (res *http.Response return nil, ErrConnectionQueueingTimeout } res, err = r.innerTransport.RoundTrip(req) - r.phonebook.UpdateConnectionTime(host, provisionalTime) + r.phonebook.UpdateConnectionTime(addrOrPeerID, provisionalTime) return } diff --git a/network/p2p/peerstore/peerstore.go b/network/p2p/peerstore/peerstore.go index 9bbb48ab46..71d5675fbd 100644 --- a/network/p2p/peerstore/peerstore.go +++ b/network/p2p/peerstore/peerstore.go @@ -127,12 +127,12 @@ func (ps *PeerStore) UpdateRetryAfter(addr string, retryAfter time.Time) { // The connection should be established when the waitTime is 0. // It will register a provisional next connection time when the waitTime is 0. // The provisional time should be updated after the connection with UpdateConnectionTime -func (ps *PeerStore) GetConnectionWaitTime(addr interface{}) (bool, time.Duration, time.Time) { +func (ps *PeerStore) GetConnectionWaitTime(addrOrPeerID string) (bool, time.Duration, time.Time) { curTime := time.Now() - info := addr.(*peer.AddrInfo) var timeSince time.Duration var numElmtsToRemove int - metadata, err := ps.Get(info.ID, addressDataKey) + peerID := peer.ID(addrOrPeerID) + metadata, err := ps.Get(peerID, addressDataKey) if err != nil { return false, 0 /* not used */, curTime /* not used */ } @@ -151,9 +151,9 @@ func (ps *PeerStore) GetConnectionWaitTime(addr interface{}) (bool, time.Duratio } // Remove the expired elements from e.data[addr].recentConnectionTimes - ps.popNElements(numElmtsToRemove, info.ID) + ps.popNElements(numElmtsToRemove, peerID) // If there are max number of connections within the time window, wait - metadata, _ = ps.Get(info.ID, addressDataKey) + metadata, _ = ps.Get(peerID, addressDataKey) ad, ok = metadata.(addressData) if !ok { return false, 0 /* not used */, curTime /* not used */ @@ -169,14 +169,14 @@ func (ps *PeerStore) GetConnectionWaitTime(addr interface{}) (bool, time.Duratio // Update curTime, since it may have significantly changed if waited provisionalTime := time.Now() // Append the provisional time for the next connection request - ps.appendTime(info.ID, provisionalTime) + ps.appendTime(peerID, provisionalTime) return true, 0 /* no wait. proceed */, provisionalTime } // UpdateConnectionTime updates the connection time for the given address. -func (ps *PeerStore) UpdateConnectionTime(addr interface{}, provisionalTime time.Time) bool { - info := addr.(*peer.AddrInfo) - metadata, err := ps.Get(info.ID, addressDataKey) +func (ps *PeerStore) UpdateConnectionTime(addrOrPeerID string, provisionalTime time.Time) bool { + peerID := peer.ID(addrOrPeerID) + metadata, err := ps.Get(peerID, addressDataKey) if err != nil { return false } @@ -185,7 +185,7 @@ func (ps *PeerStore) UpdateConnectionTime(addr interface{}, provisionalTime time return false } defer func() { - _ = ps.Put(info.ID, addressDataKey, ad) + _ = ps.Put(peerID, addressDataKey, ad) }() diff --git a/network/p2p/peerstore/peerstore_test.go b/network/p2p/peerstore/peerstore_test.go index f0c67a93b4..e855013d76 100644 --- a/network/p2p/peerstore/peerstore_test.go +++ b/network/p2p/peerstore/peerstore_test.go @@ -337,17 +337,17 @@ func TestWaitAndAddConnectionTimeLongtWindow(t *testing.T) { info2, _ := peerInfoFromDomainPort(addr2) // Address not in. Should return false - addrInPhonebook, _, provisionalTime := entries.GetConnectionWaitTime(info1) + addrInPhonebook, _, provisionalTime := entries.GetConnectionWaitTime(string(info1.ID)) require.Equal(t, false, addrInPhonebook) - require.Equal(t, false, entries.UpdateConnectionTime(info1, provisionalTime)) + require.Equal(t, false, entries.UpdateConnectionTime(string(info1.ID), provisionalTime)) // Test the addresses are populated in the phonebook and a // time can be added to one of them entries.ReplacePeerList([]interface{}{info1, info2}, "default", PhoneBookEntryRelayRole) - addrInPhonebook, waitTime, provisionalTime := entries.GetConnectionWaitTime(info1) + addrInPhonebook, waitTime, provisionalTime := entries.GetConnectionWaitTime(string(info1.ID)) require.Equal(t, true, addrInPhonebook) require.Equal(t, time.Duration(0), waitTime) - require.Equal(t, true, entries.UpdateConnectionTime(info1, provisionalTime)) + require.Equal(t, true, entries.UpdateConnectionTime(string(info1.ID), provisionalTime)) data, _ := entries.Get(info1.ID, addressDataKey) require.NotNil(t, data) ad := data.(addressData) @@ -360,9 +360,9 @@ func TestWaitAndAddConnectionTimeLongtWindow(t *testing.T) { } // add another value to addr - addrInPhonebook, waitTime, provisionalTime = entries.GetConnectionWaitTime(info1) + addrInPhonebook, waitTime, provisionalTime = entries.GetConnectionWaitTime(string(info1.ID)) require.Equal(t, time.Duration(0), waitTime) - require.Equal(t, true, entries.UpdateConnectionTime(info1, provisionalTime)) + require.Equal(t, true, entries.UpdateConnectionTime(string(info1.ID), provisionalTime)) data, _ = entries.Get(info1.ID, addressDataKey) ad = data.(addressData) phBookData = ad.recentConnectionTimes @@ -375,9 +375,9 @@ func TestWaitAndAddConnectionTimeLongtWindow(t *testing.T) { // the first time should be removed and a new one added // there should not be any wait - addrInPhonebook, waitTime, provisionalTime = entries.GetConnectionWaitTime(info1) + addrInPhonebook, waitTime, provisionalTime = entries.GetConnectionWaitTime(string(info1.ID)) require.Equal(t, time.Duration(0), waitTime) - require.Equal(t, true, entries.UpdateConnectionTime(info1, provisionalTime)) + require.Equal(t, true, entries.UpdateConnectionTime(string(info1.ID), provisionalTime)) data, _ = entries.Get(info1.ID, addressDataKey) ad = data.(addressData) phBookData2 := ad.recentConnectionTimes @@ -392,9 +392,9 @@ func TestWaitAndAddConnectionTimeLongtWindow(t *testing.T) { // add 3 values to another address. should not wait // value 1 - _, waitTime, provisionalTime = entries.GetConnectionWaitTime(info2) + _, waitTime, provisionalTime = entries.GetConnectionWaitTime(string(info2.ID)) require.Equal(t, time.Duration(0), waitTime) - require.Equal(t, true, entries.UpdateConnectionTime(info2, provisionalTime)) + require.Equal(t, true, entries.UpdateConnectionTime(string(info2.ID), provisionalTime)) // introduce a gap between the two requests so that only the first will be removed later when waited // simulate passing a unit of time @@ -406,13 +406,13 @@ func TestWaitAndAddConnectionTimeLongtWindow(t *testing.T) { } // value 2 - _, waitTime, provisionalTime = entries.GetConnectionWaitTime(info2) + _, waitTime, provisionalTime = entries.GetConnectionWaitTime(string(info2.ID)) require.Equal(t, time.Duration(0), waitTime) - require.Equal(t, true, entries.UpdateConnectionTime(info2, provisionalTime)) + require.Equal(t, true, entries.UpdateConnectionTime(string(info2.ID), provisionalTime)) // value 3 - _, waitTime, provisionalTime = entries.GetConnectionWaitTime(info2) + _, waitTime, provisionalTime = entries.GetConnectionWaitTime(string(info2.ID)) require.Equal(t, time.Duration(0), waitTime) - require.Equal(t, true, entries.UpdateConnectionTime(info2, provisionalTime)) + require.Equal(t, true, entries.UpdateConnectionTime(string(info2.ID), provisionalTime)) data2, _ = entries.Get(info2.ID, addressDataKey) ad2 = data2.(addressData) @@ -421,7 +421,7 @@ func TestWaitAndAddConnectionTimeLongtWindow(t *testing.T) { require.Equal(t, 3, len(phBookData)) // add another element to trigger wait - _, waitTime, provisionalTime = entries.GetConnectionWaitTime(info2) + _, waitTime, provisionalTime = entries.GetConnectionWaitTime(string(info2.ID)) require.Greater(t, int64(waitTime), int64(0)) // no element should be removed data2, _ = entries.Get(info2.ID, addressDataKey) @@ -436,9 +436,9 @@ func TestWaitAndAddConnectionTimeLongtWindow(t *testing.T) { } // The wait should be sufficient - _, waitTime, provisionalTime = entries.GetConnectionWaitTime(info2) + _, waitTime, provisionalTime = entries.GetConnectionWaitTime(string(info2.ID)) require.Equal(t, time.Duration(0), waitTime) - require.Equal(t, true, entries.UpdateConnectionTime(info2, provisionalTime)) + require.Equal(t, true, entries.UpdateConnectionTime(string(info2.ID), provisionalTime)) // only one element should be removed, and one added data2, _ = entries.Get(info2.ID, addressDataKey) ad2 = data2.(addressData) diff --git a/network/phonebook/phonebook.go b/network/phonebook/phonebook.go index a1486c3d5a..634ca9c16c 100644 --- a/network/phonebook/phonebook.go +++ b/network/phonebook/phonebook.go @@ -55,12 +55,12 @@ type Phonebook interface { // The connection should be established when the waitTime is 0. // It will register a provisional next connection time when the waitTime is 0. // The provisional time should be updated after the connection with UpdateConnectionTime - GetConnectionWaitTime(addr interface{}) (addrInPhonebook bool, + GetConnectionWaitTime(addrOrPeerID string) (addrInPhonebook bool, waitTime time.Duration, provisionalTime time.Time) // UpdateConnectionTime will update the provisional connection time. // Returns true of the addr was in the phonebook - UpdateConnectionTime(addr interface{}, provisionalTime time.Time) bool + UpdateConnectionTime(addrOrPeerID string, provisionalTime time.Time) bool // ReplacePeerList merges a set of addresses with that passed in for networkName // new entries in dnsAddresses are being added @@ -231,10 +231,10 @@ func (e *phonebookImpl) UpdateRetryAfter(addr string, retryAfter time.Time) { // The connection should be established when the waitTime is 0. // It will register a provisional next connection time when the waitTime is 0. // The provisional time should be updated after the connection with UpdateConnectionTime -func (e *phonebookImpl) GetConnectionWaitTime(a interface{}) (addrInPhonebook bool, +func (e *phonebookImpl) GetConnectionWaitTime(addrOrPeerID string) (addrInPhonebook bool, waitTime time.Duration, provisionalTime time.Time) { - addr := a.(string) + addr := addrOrPeerID e.lock.Lock() defer e.lock.Unlock() @@ -278,8 +278,8 @@ func (e *phonebookImpl) GetConnectionWaitTime(a interface{}) (addrInPhonebook bo // UpdateConnectionTime will update the provisional connection time. // Returns true of the addr was in the phonebook -func (e *phonebookImpl) UpdateConnectionTime(a interface{}, provisionalTime time.Time) bool { - addr := a.(string) +func (e *phonebookImpl) UpdateConnectionTime(addrOrPeerID string, provisionalTime time.Time) bool { + addr := addrOrPeerID e.lock.Lock() defer e.lock.Unlock() From 9bf2313e98eea67a10edcc81f661d698cc839203 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 21 Jun 2024 14:15:52 -0400 Subject: [PATCH 10/13] logging: fix Formatter data race --- logging/log.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logging/log.go b/logging/log.go index 47a14ec722..770bf08bb9 100644 --- a/logging/log.go +++ b/logging/log.go @@ -319,7 +319,7 @@ func (l logger) getOutput() io.Writer { } func (l logger) SetJSONFormatter() { - l.entry.Logger.Formatter = &logrus.JSONFormatter{TimestampFormat: "2006-01-02T15:04:05.000000Z07:00"} + l.entry.Logger.SetFormatter(&logrus.JSONFormatter{TimestampFormat: "2006-01-02T15:04:05.000000Z07:00"}) } func (l logger) Entry() *logrus.Entry { From bc0412972a94392f919c0cd058e3bb5ccea830bc Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 21 Jun 2024 14:16:26 -0400 Subject: [PATCH 11/13] fix peerstore data race --- network/p2p/peerstore/peerstore.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/network/p2p/peerstore/peerstore.go b/network/p2p/peerstore/peerstore.go index 71d5675fbd..3eda0d3686 100644 --- a/network/p2p/peerstore/peerstore.go +++ b/network/p2p/peerstore/peerstore.go @@ -23,6 +23,7 @@ import ( "time" "github.com/algorand/go-algorand/network/phonebook" + "github.com/algorand/go-deadlock" "github.com/libp2p/go-libp2p/core/peer" libp2p "github.com/libp2p/go-libp2p/core/peerstore" mempstore "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" @@ -53,6 +54,7 @@ type addressData struct { // networkNames: lists the networks to which the given address belongs. networkNames map[string]bool + mu *deadlock.RWMutex // role is the role that this address serves. role phonebook.PhoneBookEntryRoles @@ -216,9 +218,11 @@ func (ps *PeerStore) ReplacePeerList(addressesThey []interface{}, networkName st data, _ := ps.Get(pid, addressDataKey) if data != nil { ad := data.(addressData) + ad.mu.RLock() if ad.networkNames[networkName] && ad.role == role && !ad.persistent { removeItems[pid] = true } + ad.mu.RUnlock() } } @@ -229,7 +233,9 @@ func (ps *PeerStore) ReplacePeerList(addressesThey []interface{}, networkName st // we already have this. // Update the networkName ad := data.(addressData) + ad.mu.Lock() ad.networkNames[networkName] = true + ad.mu.Unlock() // do not remove this entry delete(removeItems, info.ID) @@ -278,6 +284,7 @@ func (ps *PeerStore) Length() int { func makePhonebookEntryData(networkName string, role phonebook.PhoneBookEntryRoles, persistent bool) addressData { pbData := addressData{ networkNames: make(map[string]bool), + mu: &deadlock.RWMutex{}, recentConnectionTimes: make([]time.Time, 0), role: role, persistent: persistent, @@ -292,8 +299,11 @@ func (ps *PeerStore) deletePhonebookEntry(peerID peer.ID, networkName string) { return } ad := data.(addressData) + ad.mu.Lock() delete(ad.networkNames, networkName) - if len(ad.networkNames) == 0 { + isEmpty := len(ad.networkNames) == 0 + ad.mu.Unlock() + if isEmpty { ps.ClearAddrs(peerID) _ = ps.Put(peerID, addressDataKey, nil) } From 79814549ac8453a66b230f399c27530542bdc954 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 21 Jun 2024 14:44:05 -0400 Subject: [PATCH 12/13] node: fixed ledger descriptors leak --- node/node.go | 1 + node/node_test.go | 32 ++++++++------------------------ 2 files changed, 9 insertions(+), 24 deletions(-) diff --git a/node/node.go b/node/node.go index bfb32ba3ff..e5bbc19516 100644 --- a/node/node.go +++ b/node/node.go @@ -463,6 +463,7 @@ func (node *AlgorandFullNode) Stop() { node.highPriorityCryptoVerificationPool.Shutdown() node.lowPriorityCryptoVerificationPool.Shutdown() node.cryptoPool.Shutdown() + node.ledger.Close() node.cancelCtx() } diff --git a/node/node_test.go b/node/node_test.go index cc9f86d162..080c6b54d8 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -45,7 +45,6 @@ import ( "github.com/algorand/go-algorand/test/partitiontest" "github.com/algorand/go-algorand/util" "github.com/algorand/go-algorand/util/db" - "github.com/algorand/go-algorand/util/execpool" ) var expectedAgreementTime = 2*config.Protocol.BigLambda + config.Protocol.SmallLambda + config.Consensus[protocol.ConsensusCurrentVersion].AgreementFilterTimeout + 2*time.Second @@ -86,7 +85,7 @@ func (ni nodeInfo) p2pMultiAddr() string { type configHook func(ni nodeInfo, cfg config.Local) (nodeInfo, config.Local) type phonebookHook func([]nodeInfo, int) []string -func setupFullNodes(t *testing.T, proto protocol.ConsensusVersion, verificationPool execpool.BacklogPool, customConsensus config.ConsensusProtocols) ([]*AlgorandFullNode, []string) { +func setupFullNodes(t *testing.T, proto protocol.ConsensusVersion, customConsensus config.ConsensusProtocols) ([]*AlgorandFullNode, []string) { minMoneyAtStart := 10000 maxMoneyAtStart := 100000 gen := rand.New(rand.NewSource(2)) @@ -111,14 +110,14 @@ func setupFullNodes(t *testing.T, proto protocol.ConsensusVersion, verificationP } return phonebook } - nodes, wallets := setupFullNodesEx(t, proto, verificationPool, customConsensus, acctStake, configHook, phonebookHook) + nodes, wallets := setupFullNodesEx(t, proto, customConsensus, acctStake, configHook, phonebookHook) require.Len(t, nodes, numAccounts) require.Len(t, wallets, numAccounts) return nodes, wallets } func setupFullNodesEx( - t *testing.T, proto protocol.ConsensusVersion, verificationPool execpool.BacklogPool, customConsensus config.ConsensusProtocols, + t *testing.T, proto protocol.ConsensusVersion, customConsensus config.ConsensusProtocols, acctStake []basics.MicroAlgos, configHook configHook, phonebookHook phonebookHook, ) ([]*AlgorandFullNode, []string) { @@ -264,10 +263,7 @@ func TestSyncingFullNode(t *testing.T) { t.Skip("Test is too heavy for amd64 builder running in parallel with other packages") } - backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil) - defer backlogPool.Shutdown() - - nodes, wallets := setupFullNodes(t, protocol.ConsensusCurrentVersion, backlogPool, nil) + nodes, wallets := setupFullNodes(t, protocol.ConsensusCurrentVersion, nil) for i := 0; i < len(nodes); i++ { defer os.Remove(wallets[i]) defer nodes[i].Stop() @@ -329,10 +325,7 @@ func TestInitialSync(t *testing.T) { t.Skip("Test is too heavy for amd64 builder running in parallel with other packages") } - backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil) - defer backlogPool.Shutdown() - - nodes, wallets := setupFullNodes(t, protocol.ConsensusCurrentVersion, backlogPool, nil) + nodes, wallets := setupFullNodes(t, protocol.ConsensusCurrentVersion, nil) for i := 0; i < len(nodes); i++ { defer os.Remove(wallets[i]) defer nodes[i].Stop() @@ -370,9 +363,6 @@ func TestSimpleUpgrade(t *testing.T) { t.Skip("Test is too heavy for amd64 builder running in parallel with other packages") } - backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil) - defer backlogPool.Shutdown() - // ConsensusTest0 is a version of ConsensusV0 used for testing // (it has different approved upgrade paths). const consensusTest0 = protocol.ConsensusVersion("test0") @@ -409,7 +399,7 @@ func TestSimpleUpgrade(t *testing.T) { testParams1.ApprovedUpgrades = map[protocol.ConsensusVersion]uint64{} configurableConsensus[consensusTest1] = testParams1 - nodes, wallets := setupFullNodes(t, consensusTest0, backlogPool, configurableConsensus) + nodes, wallets := setupFullNodes(t, consensusTest0, configurableConsensus) for i := 0; i < len(nodes); i++ { defer os.Remove(wallets[i]) defer nodes[i].Stop() @@ -921,10 +911,7 @@ func TestNodeHybridTopology(t *testing.T) { return nil } - backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil) - defer backlogPool.Shutdown() - - nodes, wallets := setupFullNodesEx(t, consensusTest0, backlogPool, configurableConsensus, acctStake, configHook, phonebookHook) + nodes, wallets := setupFullNodesEx(t, consensusTest0, configurableConsensus, acctStake, configHook, phonebookHook) require.Len(t, nodes, 3) require.Len(t, wallets, 3) for i := 0; i < len(nodes); i++ { @@ -1015,10 +1002,7 @@ func TestNodeP2PRelays(t *testing.T) { return nil } - backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil) - defer backlogPool.Shutdown() - - nodes, wallets := setupFullNodesEx(t, consensusTest0, backlogPool, configurableConsensus, acctStake, configHook, phonebookHook) + nodes, wallets := setupFullNodesEx(t, consensusTest0, configurableConsensus, acctStake, configHook, phonebookHook) require.Len(t, nodes, 3) require.Len(t, wallets, 3) for i := 0; i < len(nodes); i++ { From c8af9fb82c597fcdd20e5f14037f7fd552b68d18 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 21 Jun 2024 17:06:24 -0400 Subject: [PATCH 13/13] fix peer.AddrInfo cast --- network/limitcaller/rateLimitingTransport.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/network/limitcaller/rateLimitingTransport.go b/network/limitcaller/rateLimitingTransport.go index a839c44393..45bc0725ed 100644 --- a/network/limitcaller/rateLimitingTransport.go +++ b/network/limitcaller/rateLimitingTransport.go @@ -85,7 +85,11 @@ func (r *RateLimitingTransport) RoundTrip(req *http.Request) (res *http.Response addrOrPeerID := req.Host // p2p/http clients have per-connection transport and address info so use that if len(req.Host) == 0 && req.URL != nil && len(req.URL.Host) == 0 { - addrOrPeerID = string(r.targetAddr.(*peer.AddrInfo).ID) + addrInfo, ok := r.targetAddr.(*peer.AddrInfo) + if !ok { + return nil, errors.New("rateLimitingTransport: request without Host/URL and targetAddr is not a peer.AddrInfo") + } + addrOrPeerID = string(addrInfo.ID) } for { _, waitTime, provisionalTime = r.phonebook.GetConnectionWaitTime(addrOrPeerID)