From 3221ba3e499a7fbcc0f2edbfbc970123c6d5c998 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 13 Jun 2024 17:36:29 -0400 Subject: [PATCH 01/22] Implement shared identity tracker add TestPeerIDChallengeSigner refactor NewIdentityChallengeScheme p2pnet: use identity tracker add NewIdentityChallengeScheme tests fix TestNodeHybridTopology after enforcing PublicAddress in config fix ident tracker reinit fix wsnet setup CR: self review --- config/config.go | 18 ++++- config/config_test.go | 35 +++++++++ config/localTemplate.go | 1 + network/hybridNetwork.go | 8 +- network/hybridNetwork_test.go | 136 +++++++++++++++++++++++++++++++++ network/netidentity.go | 89 +++++++++++++++++---- network/netidentity_test.go | 99 ++++++++++++++++++++---- network/p2p/peerID_test.go | 21 +++++ network/p2pNetwork.go | 45 +++++++++-- network/p2pNetwork_test.go | 34 ++++----- network/requestLogger_test.go | 13 ++-- network/requestTracker_test.go | 13 ++-- network/wsNetwork.go | 31 ++++---- network/wsNetwork_test.go | 80 ++++++++++--------- node/follower_node.go | 2 +- node/node.go | 4 +- node/node_test.go | 1 + 17 files changed, 500 insertions(+), 130 deletions(-) create mode 100644 network/hybridNetwork_test.go diff --git a/config/config.go b/config/config.go index 2d5d0bdbfe..8f513f9aee 100644 --- a/config/config.go +++ b/config/config.go @@ -144,7 +144,17 @@ func mergeConfigFromFile(configpath string, source Local) (Local, error) { defer f.Close() err = loadConfig(f, &source) + if err != nil { + return source, err + } + source, err = fixupConfig(source) + return source, err +} +// fixupConfig makes the following tweaks to the config: +// - If NetAddress is set, enable the ledger and block services +// - If EnableP2PHybridMode is set, require PublicAddress to be set +func fixupConfig(source Local) (Local, error) { if source.NetAddress != "" { source.EnableLedgerService = true source.EnableBlockService = true @@ -155,8 +165,12 @@ func mergeConfigFromFile(configpath string, source Local) (Local, error) { source.GossipFanout = defaultRelayGossipFanout } } - - return source, err + // In hybrid mode we want to prevent connections from the same node over both P2P and WS. + // The only way it is supported at the moment is to use net identity challenge that is based on PublicAddress. + if source.EnableP2PHybridMode && source.PublicAddress == "" { + return source, errors.New("PublicAddress must be specified when EnableP2PHybridMode is set") + } + return source, nil } func loadConfig(reader io.Reader, config *Local) error { diff --git a/config/config_test.go b/config/config_test.go index 432c0f9281..7da3156157 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -121,6 +121,41 @@ func TestLocal_MergeConfig(t *testing.T) { require.Equal(t, c1.GossipFanout, c2.GossipFanout) } +func TestLocal_FixupConfig(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + c1 := Local{ + NetAddress: "test1", + GossipFanout: defaultLocal.GossipFanout, + } + c2, err := fixupConfig(c1) + require.NoError(t, err) + require.NotEqual(t, c1, c2) + require.False(t, c1.EnableLedgerService) + require.False(t, c1.EnableBlockService) + require.Equal(t, c1.GossipFanout, defaultLocal.GossipFanout) + require.True(t, c2.EnableLedgerService) + require.True(t, c2.EnableBlockService) + require.Equal(t, c2.GossipFanout, defaultRelayGossipFanout) + + c1 = Local{ + EnableP2PHybridMode: true, + } + c2, err = fixupConfig(c1) + require.Error(t, err) + + c1 = Local{ + EnableP2PHybridMode: true, + PublicAddress: "test2", + } + c2, err = fixupConfig(c1) + require.NoError(t, err) + require.Equal(t, c1, c2) + require.True(t, c2.EnableP2PHybridMode) + require.NotEmpty(t, c2.PublicAddress) +} + func saveFullPhonebook(phonebook phonebookBlackWhiteList, saveToDir string) error { filename := filepath.Join(saveToDir, PhonebookFilename) f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) diff --git a/config/localTemplate.go b/config/localTemplate.go index 314b83a78b..534817c8a4 100644 --- a/config/localTemplate.go +++ b/config/localTemplate.go @@ -602,6 +602,7 @@ type Local struct { EnableP2P bool `version[31]:"false"` // EnableP2PHybridMode turns on both websockets and P2P networking. + // Enabling this setting also requires PublicAddress to be set. EnableP2PHybridMode bool `version[34]:"false"` // P2PNetAddress sets the listen address used for P2P networking, if hybrid mode is set. diff --git a/network/hybridNetwork.go b/network/hybridNetwork.go index 27fc6edbb0..72a634651d 100644 --- a/network/hybridNetwork.go +++ b/network/hybridNetwork.go @@ -42,11 +42,15 @@ func NewHybridP2PNetwork(log logging.Logger, cfg config.Local, datadir string, p // supply alternate NetAddress for P2P network p2pcfg := cfg p2pcfg.NetAddress = cfg.P2PNetAddress - p2pnet, err := NewP2PNetwork(log, p2pcfg, datadir, phonebookAddresses, genesisID, networkID, nodeInfo) + identityOpts := identityOpts{ + tracker: NewIdentityTracker(), + } + p2pnet, err := NewP2PNetwork(log, p2pcfg, datadir, phonebookAddresses, genesisID, networkID, nodeInfo, &identityOpts) if err != nil { return nil, err } - wsnet, err := NewWebsocketNetwork(log, cfg, phonebookAddresses, genesisID, networkID, nodeInfo, p2pnet.PeerID(), p2pnet.PeerIDSigner()) + identityOpts.scheme = NewIdentityChallengeScheme(NetIdentityDedupNames(cfg.PublicAddress, p2pnet.PeerID().String()), NetIdentitySigner(p2pnet.PeerIDSigner())) + wsnet, err := NewWebsocketNetwork(log, cfg, phonebookAddresses, genesisID, networkID, nodeInfo, &identityOpts) if err != nil { return nil, err } diff --git a/network/hybridNetwork_test.go b/network/hybridNetwork_test.go new file mode 100644 index 0000000000..689f575cdf --- /dev/null +++ b/network/hybridNetwork_test.go @@ -0,0 +1,136 @@ +// Copyright (C) 2019-2024 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package network + +import ( + "net/url" + "testing" + "time" + + "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/logging" + "github.com/algorand/go-algorand/test/partitiontest" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" +) + +// TestHybridNetwork_DuplicateConn checks the same nodes do not connect over ws and p2p. +// Scenario: +// 1. Create a hybrid network: relay and two nodes +// 2. Let them connect to the relay +// 3. Ensure relay has only two connections +// 4. Ensure extra connection attempts were rejected by nodes rather than relay +func TestHybridNetwork_DuplicateConn(t *testing.T) { + partitiontest.PartitionTest(t) + + cfg := config.GetDefaultLocal() + cfg.EnableP2PHybridMode = true + log := logging.TestingLog(t) + const p2pKeyDir = "" + + identDiscValue := int(networkPeerIdentityDisconnect.GetUint64Value()) + + relayCfg := cfg + relayCfg.ForceRelayMessages = true + netA, err := NewHybridP2PNetwork(log.With("node", "netA"), relayCfg, p2pKeyDir, nil, genesisID, "net", &nopeNodeInfo{}) + require.NoError(t, err) + + err = netA.Start() + require.NoError(t, err) + + // collect ws address + addr, portListen := netA.wsNetwork.Address() + require.True(t, portListen) + require.NotZero(t, addr) + parsed, err := url.Parse(addr) + require.NoError(t, err) + addr = parsed.Host + netA.Stop() + + // make it net address and restart the node + relayCfg.NetAddress = addr + relayCfg.PublicAddress = addr + netA, err = NewHybridP2PNetwork(log.With("node", "netA"), relayCfg, p2pKeyDir, nil, genesisID, "net", &nopeNodeInfo{}) + require.NoError(t, err) + + err = netA.Start() + require.NoError(t, err) + defer netA.Stop() + + // collect relay address and prepare nodes phonebook + peerInfoA := netA.p2pNetwork.service.AddrInfo() + addrsAp2p, err := peer.AddrInfoToP2pAddrs(&peerInfoA) + require.NoError(t, err) + require.NotZero(t, addrsAp2p[0]) + multiAddrStr := addrsAp2p[0].String() + + fullAddr, portListen := netA.wsNetwork.Address() + require.True(t, portListen) + require.NotZero(t, addr) + require.Contains(t, fullAddr, addr) + + phoneBookAddresses := []string{multiAddrStr, addr} + + netB, err := NewHybridP2PNetwork(log.With("node", "netB"), cfg, "", phoneBookAddresses, genesisID, "net", &nopeNodeInfo{}) + require.NoError(t, err) + // for netB start the p2p network first + err = netB.p2pNetwork.Start() + require.NoError(t, err) + defer netB.Stop() + + netC, err := NewHybridP2PNetwork(log.With("node", "netC"), cfg, "", phoneBookAddresses, genesisID, "net", &nopeNodeInfo{}) + require.NoError(t, err) + // for netC start the ws network first + err = netC.wsNetwork.Start() + require.NoError(t, err) + defer netC.Stop() + + // ensure initial connections are done + require.Eventually(t, func() bool { + return len(netA.GetPeers(PeersConnectedIn)) == 2+identDiscValue + }, 3*time.Second, 50*time.Millisecond) + + // start the second half of the hybrid net + err = netB.wsNetwork.Start() + require.NoError(t, err) + err = netC.p2pNetwork.Start() + require.NoError(t, err) + + // wait for connection attempts. nodes need some time to make connections, + // and instead of `time.Sleep(1 * time.Second)` the networkPeerIdentityDisconnect net identity counter is used. + // Since this test is not parallel the networkPeerIdentityDisconnect should not be modified from outside. + require.Eventually(t, func() bool { + return networkPeerIdentityDisconnect.GetUint64Value() == 2 + }, 2*time.Second, 50*time.Millisecond) + + // now count connections + // netA should have 2 connections, not 4 + // netB should have 1 connection (via p2p) + // netC should have 1 connection (via ws) + + require.Eventually(t, func() bool { + return len(netB.GetPeers(PeersConnectedOut)) == 1 + }, time.Second, 50*time.Millisecond) + + require.Eventually(t, func() bool { + return len(netC.GetPeers(PeersConnectedOut)) == 1 + }, time.Second, 50*time.Millisecond) + + require.Eventually(t, func() bool { + return len(netA.GetPeers(PeersConnectedIn)) == 2 + }, 3*time.Second, 50*time.Millisecond) +} diff --git a/network/netidentity.go b/network/netidentity.go index 4d797a1a5b..30755f0648 100644 --- a/network/netidentity.go +++ b/network/netidentity.go @@ -100,6 +100,11 @@ type identityChallengeSigner interface { PublicKey() crypto.PublicKey } +type identityOpts struct { + scheme identityChallengeScheme + tracker identityTracker +} + type identityChallengeLegacySigner struct { keys *crypto.SignatureSecrets } @@ -120,37 +125,81 @@ func (s *identityChallengeLegacySigner) PublicKey() crypto.PublicKey { // exchanging and verifying public key challenges and attaching them to headers, // or returning the message payload to be sent type identityChallengePublicKeyScheme struct { - dedupName string + dedupNames map[string]struct{} identityKeys identityChallengeSigner } +type identityChallengeSchemeConfig struct { + dedupNames []string + signer identityChallengeSigner +} + +// IdentityChallengeSchemeOption is a function that can be passed to NewIdentityChallengeScheme +type IdentityChallengeSchemeOption func(*identityChallengeSchemeConfig) + +// NetIdentityDedupNames is an option to set the deduplication names for the identity challenge scheme +func NetIdentityDedupNames(dn ...string) IdentityChallengeSchemeOption { + return func(c *identityChallengeSchemeConfig) { + c.dedupNames = append(c.dedupNames, dn...) + } +} + +// NetIdentitySigner is an option to set the signer for the identity challenge scheme +func NetIdentitySigner(s identityChallengeSigner) IdentityChallengeSchemeOption { + return func(c *identityChallengeSchemeConfig) { + c.signer = s + } +} + // NewIdentityChallengeScheme will create a default Identification Scheme -func NewIdentityChallengeScheme(dn string) *identityChallengePublicKeyScheme { - // without an deduplication name, there is no identityto manage, so just return an empty scheme - if dn == "" { +func NewIdentityChallengeScheme(opts ...IdentityChallengeSchemeOption) *identityChallengePublicKeyScheme { + // without an deduplication name, there is no identity to manage, so just return an empty scheme + if len(opts) == 0 { + return &identityChallengePublicKeyScheme{} + } + + config := identityChallengeSchemeConfig{} + for _, opt := range opts { + opt(&config) + } + + if len(config.dedupNames) == 0 { return &identityChallengePublicKeyScheme{} } + hasNonEmpty := false + dedupNames := make(map[string]struct{}, len(config.dedupNames)) + for _, name := range config.dedupNames { + if len(name) > 0 { + dedupNames[name] = struct{}{} + hasNonEmpty = true + } + } + if !hasNonEmpty { + return &identityChallengePublicKeyScheme{} + } + + if config.signer != nil { + return &identityChallengePublicKeyScheme{ + dedupNames: dedupNames, + identityKeys: config.signer, + } + } + var seed crypto.Seed crypto.RandBytes(seed[:]) - return &identityChallengePublicKeyScheme{ - dedupName: dn, + dedupNames: dedupNames, identityKeys: &identityChallengeLegacySigner{keys: crypto.GenerateSignatureSecrets(seed)}, } } -// NewIdentityChallengeSchemeWithSigner will create an identification Scheme with a given signer -func NewIdentityChallengeSchemeWithSigner(dn string, signer identityChallengeSigner) *identityChallengePublicKeyScheme { - return &identityChallengePublicKeyScheme{dedupName: dn, identityKeys: signer} -} - // AttachChallenge will generate a new identity challenge and will encode and attach the challenge // as a header. It returns the identityChallengeValue used for this challenge, so the network can // confirm it later (by passing it to VerifyResponse), or returns an empty challenge if dedupName is // not set. func (i identityChallengePublicKeyScheme) AttachChallenge(attachTo http.Header, addr string) identityChallengeValue { - if i.dedupName == "" || addr == "" { + if len(i.dedupNames) == 0 || addr == "" { return identityChallengeValue{} } c := identityChallenge{ @@ -172,7 +221,7 @@ func (i identityChallengePublicKeyScheme) AttachChallenge(attachTo http.Header, // or returns empty values if the header did not end up getting set func (i identityChallengePublicKeyScheme) VerifyRequestAndAttachResponse(attachTo http.Header, h http.Header) (identityChallengeValue, crypto.PublicKey, error) { // if dedupName is not set, this scheme is not configured to exchange identity - if i.dedupName == "" { + if len(i.dedupNames) == 0 { return identityChallengeValue{}, crypto.PublicKey{}, nil } // if the headerString is not populated, the peer isn't participating in identity exchange @@ -193,10 +242,11 @@ func (i identityChallengePublicKeyScheme) VerifyRequestAndAttachResponse(attachT if !idChal.Verify() { return identityChallengeValue{}, crypto.PublicKey{}, fmt.Errorf("identity challenge incorrectly signed") } + // if the address is not meant for this host, return without attaching headers, // but also do not emit an error. This is because if an operator were to incorrectly // specify their dedupName, it could result in inappropriate disconnections from valid peers - if string(idChal.Msg.PublicAddress) != i.dedupName { + if _, ok := i.dedupNames[string(idChal.Msg.PublicAddress)]; !ok { return identityChallengeValue{}, crypto.PublicKey{}, nil } // make the response object, encode it and attach it to the header @@ -216,7 +266,7 @@ func (i identityChallengePublicKeyScheme) VerifyRequestAndAttachResponse(attachT // encoded identityVerificationMessage to send to the peer. Otherwise, it returns empty values. func (i identityChallengePublicKeyScheme) VerifyResponse(h http.Header, c identityChallengeValue) (crypto.PublicKey, []byte, error) { // if we are not participating in identity challenge exchange, do nothing (no error and no value) - if i.dedupName == "" { + if len(i.dedupNames) == 0 { return crypto.PublicKey{}, []byte{}, nil } headerString := h.Get(IdentityChallengeHeader) @@ -400,9 +450,16 @@ type identityTracker interface { setIdentity(p *wsPeer) bool } +// noopIdentityTracker implements identityTracker by doing nothing. +// Intended for pure p2p mode when libp2p is handling identities itself. +type noopIdentityTracker struct{} + +func (noopIdentityTracker) setIdentity(p *wsPeer) bool { return true } +func (noopIdentityTracker) removeIdentity(p *wsPeer) {} + // publicKeyIdentTracker implements identityTracker by // mapping from PublicKeys exchanged in identity challenges to a peer -// this structure is not thread-safe; it is protected by wn.peersLock. +// this structure is not thread-safe; it is protected by wn.peersLock or p2p.wsPeersLock type publicKeyIdentTracker struct { peersByID map[crypto.PublicKey]*wsPeer } diff --git a/network/netidentity_test.go b/network/netidentity_test.go index f87480c1b1..f40dec6832 100644 --- a/network/netidentity_test.go +++ b/network/netidentity_test.go @@ -32,12 +32,15 @@ func TestIdentityChallengeSchemeAttachIfEnabled(t *testing.T) { partitiontest.PartitionTest(t) h := http.Header{} - i := NewIdentityChallengeScheme("") + i0 := NewIdentityChallengeScheme() + i := NewIdentityChallengeScheme(NetIdentityDedupNames("")) + require.Equal(t, i0, i) + require.Empty(t, i) chal := i.AttachChallenge(h, "other") require.Empty(t, h.Get(IdentityChallengeHeader)) require.Empty(t, chal) - j := NewIdentityChallengeScheme("yes") + j := NewIdentityChallengeScheme(NetIdentityDedupNames("yes")) chal = j.AttachChallenge(h, "other") require.NotEmpty(t, h.Get(IdentityChallengeHeader)) require.NotEmpty(t, chal) @@ -48,7 +51,7 @@ func TestIdentityChallengeSchemeAttachIfEnabled(t *testing.T) { func TestIdentityChallengeSchemeVerifyRequestAndAttachResponse(t *testing.T) { partitiontest.PartitionTest(t) - i := NewIdentityChallengeScheme("i1") + i := NewIdentityChallengeScheme(NetIdentityDedupNames("i1")) // author a challenge to the other scheme h := http.Header{} i.AttachChallenge(h, "i2") @@ -58,7 +61,7 @@ func TestIdentityChallengeSchemeVerifyRequestAndAttachResponse(t *testing.T) { h = http.Header{} i.AttachChallenge(h, "i2") r := http.Header{} - i2 := NewIdentityChallengeScheme("") + i2 := NewIdentityChallengeScheme() chal, key, err := i2.VerifyRequestAndAttachResponse(r, h) require.Empty(t, r.Get(IdentityChallengeHeader)) require.Empty(t, chal) @@ -69,7 +72,7 @@ func TestIdentityChallengeSchemeVerifyRequestAndAttachResponse(t *testing.T) { h = http.Header{} i.AttachChallenge(h, "i2") r = http.Header{} - i2 = NewIdentityChallengeScheme("not i2") + i2 = NewIdentityChallengeScheme(NetIdentityDedupNames("not i2")) chal, key, err = i2.VerifyRequestAndAttachResponse(r, h) require.Empty(t, r.Get(IdentityChallengeHeader)) require.Empty(t, chal) @@ -80,7 +83,7 @@ func TestIdentityChallengeSchemeVerifyRequestAndAttachResponse(t *testing.T) { h = http.Header{} h.Add(IdentityChallengeHeader, "garbage") r = http.Header{} - i2 = NewIdentityChallengeScheme("i2") + i2 = NewIdentityChallengeScheme(NetIdentityDedupNames("i2")) chal, key, err = i2.VerifyRequestAndAttachResponse(r, h) require.Empty(t, r.Get(IdentityChallengeHeader)) require.Empty(t, chal) @@ -91,7 +94,7 @@ func TestIdentityChallengeSchemeVerifyRequestAndAttachResponse(t *testing.T) { h = http.Header{} i.AttachChallenge(h, "i2") r = http.Header{} - i2 = NewIdentityChallengeScheme("i2") + i2 = NewIdentityChallengeScheme(NetIdentityDedupNames("i2")) chal, key, err = i2.VerifyRequestAndAttachResponse(r, h) require.NotEmpty(t, r.Get(IdentityChallengeHeader)) require.NotEmpty(t, chal) @@ -103,11 +106,11 @@ func TestIdentityChallengeNoErrorWhenNotParticipating(t *testing.T) { partitiontest.PartitionTest(t) // blank deduplication name will make the scheme a no-op - iNotParticipate := NewIdentityChallengeScheme("") + iNotParticipate := NewIdentityChallengeScheme() // create a request header first h := http.Header{} - i := NewIdentityChallengeScheme("i1") + i := NewIdentityChallengeScheme(NetIdentityDedupNames("i1")) origChal := i.AttachChallenge(h, "i1") require.NotEmpty(t, h.Get(IdentityChallengeHeader)) require.NotEmpty(t, origChal) @@ -120,7 +123,7 @@ func TestIdentityChallengeNoErrorWhenNotParticipating(t *testing.T) { // create a response h2 := http.Header{} - i2 := NewIdentityChallengeScheme("i2") + i2 := NewIdentityChallengeScheme(NetIdentityDedupNames("i2")) i2.VerifyRequestAndAttachResponse(h2, h) // confirm a nil scheme will not return values or error @@ -148,7 +151,7 @@ func TestIdentityChallengeSchemeVerifyResponse(t *testing.T) { partitiontest.PartitionTest(t) h := http.Header{} - i := NewIdentityChallengeScheme("i1") + i := NewIdentityChallengeScheme(NetIdentityDedupNames("i1")) // author a challenge to ourselves origChal := i.AttachChallenge(h, "i1") require.NotEmpty(t, h.Get(IdentityChallengeHeader)) @@ -176,7 +179,7 @@ func TestIdentityChallengeSchemeBadSignature(t *testing.T) { partitiontest.PartitionTest(t) h := http.Header{} - i := NewIdentityChallengeScheme("i1") + i := NewIdentityChallengeScheme(NetIdentityDedupNames("i1")) // Copy the logic of attaching the header and signing so we can sign it wrong c := identityChallengeSigned{ Msg: identityChallenge{ @@ -204,7 +207,7 @@ func TestIdentityChallengeSchemeBadPayload(t *testing.T) { partitiontest.PartitionTest(t) h := http.Header{} - i := NewIdentityChallengeScheme("i1") + i := NewIdentityChallengeScheme(NetIdentityDedupNames("i1")) h.Add(IdentityChallengeHeader, "NOT VALID BASE 64! :)") // observe that VerifyRequestAndAttachResponse won't do anything on bad signature @@ -222,7 +225,7 @@ func TestIdentityChallengeSchemeBadResponseSignature(t *testing.T) { partitiontest.PartitionTest(t) h := http.Header{} - i := NewIdentityChallengeScheme("i1") + i := NewIdentityChallengeScheme(NetIdentityDedupNames("i1")) // author a challenge to ourselves origChal := i.AttachChallenge(h, "i1") require.NotEmpty(t, h.Get(IdentityChallengeHeader)) @@ -253,7 +256,7 @@ func TestIdentityChallengeSchemeBadResponsePayload(t *testing.T) { partitiontest.PartitionTest(t) h := http.Header{} - i := NewIdentityChallengeScheme("i1") + i := NewIdentityChallengeScheme(NetIdentityDedupNames("i1")) // author a challenge to ourselves origChal := i.AttachChallenge(h, "i1") require.NotEmpty(t, h.Get(IdentityChallengeHeader)) @@ -275,7 +278,7 @@ func TestIdentityChallengeSchemeWrongChallenge(t *testing.T) { partitiontest.PartitionTest(t) h := http.Header{} - i := NewIdentityChallengeScheme("i1") + i := NewIdentityChallengeScheme(NetIdentityDedupNames("i1")) // author a challenge to ourselves origChal := i.AttachChallenge(h, "i1") require.NotEmpty(t, h.Get(IdentityChallengeHeader)) @@ -366,3 +369,67 @@ func TestIdentityTrackerHandlerGuard(t *testing.T) { } require.Equal(t, OutgoingMessage{}, identityVerificationHandler(msg)) } + +// TestNewIdentityChallengeScheme ensures NewIdentityChallengeScheme returns +// a correct identityChallengePublicKeyScheme for the following inputs: +// DedupNames(a, b) vs DedupNames(a), DedupNames(b) +// Empty vs non-empty PeerID, PublicAddress +// Empty vs non-empty Signer +func TestNewIdentityChallengeScheme(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + s1 := NewIdentityChallengeScheme() + s2 := NewIdentityChallengeScheme(NetIdentityDedupNames("")) + s3 := NewIdentityChallengeScheme(NetIdentityDedupNames("", "")) + s4 := NewIdentityChallengeScheme(NetIdentityDedupNames(""), NetIdentityDedupNames("")) + require.Equal(t, s1, s2) + require.Equal(t, s2, s3) + require.Equal(t, s3, s4) + require.Empty(t, s1) + + s1 = NewIdentityChallengeScheme(NetIdentityDedupNames("a", "a")) + s2 = NewIdentityChallengeScheme(NetIdentityDedupNames("a"), NetIdentityDedupNames("a")) + require.Equal(t, s1.dedupNames, s2.dedupNames) + require.Len(t, s1.dedupNames, 1) + require.IsType(t, &identityChallengeLegacySigner{}, s1.identityKeys) + require.IsType(t, &identityChallengeLegacySigner{}, s2.identityKeys) + require.NotEqual(t, s1.identityKeys, s2.identityKeys) + + s1 = NewIdentityChallengeScheme(NetIdentityDedupNames("a", "b")) + s2 = NewIdentityChallengeScheme(NetIdentityDedupNames("a"), NetIdentityDedupNames("b")) + require.Equal(t, s1.dedupNames, s2.dedupNames) + require.Len(t, s1.dedupNames, 2) + require.IsType(t, &identityChallengeLegacySigner{}, s1.identityKeys) + require.IsType(t, &identityChallengeLegacySigner{}, s2.identityKeys) + require.NotEqual(t, s1.identityKeys, s2.identityKeys) + + s1 = NewIdentityChallengeScheme(NetIdentityDedupNames("", "a")) + s2 = NewIdentityChallengeScheme(NetIdentityDedupNames("a"), NetIdentityDedupNames("")) + s3 = NewIdentityChallengeScheme(NetIdentityDedupNames("a", "")) + s4 = NewIdentityChallengeScheme(NetIdentityDedupNames(""), NetIdentityDedupNames("a")) + require.Equal(t, s1.dedupNames, s2.dedupNames) + require.Equal(t, s2.dedupNames, s3.dedupNames) + require.Equal(t, s3.dedupNames, s4.dedupNames) + require.Len(t, s1.dedupNames, 1) + require.IsType(t, &identityChallengeLegacySigner{}, s1.identityKeys) + require.IsType(t, &identityChallengeLegacySigner{}, s2.identityKeys) + require.NotEqual(t, s1.identityKeys, s2.identityKeys) + + s1 = NewIdentityChallengeScheme(NetIdentityDedupNames("a"), NetIdentitySigner(&identityChallengeLegacySigner{})) + require.Len(t, s1.dedupNames, 1) + require.IsType(t, &identityChallengeLegacySigner{}, s1.identityKeys) + + var seed crypto.Seed + crypto.RandBytes(seed[:]) + signer := &identityChallengeLegacySigner{keys: crypto.GenerateSignatureSecrets(seed)} + s1 = NewIdentityChallengeScheme(NetIdentityDedupNames("a"), NetIdentitySigner(signer)) + require.Len(t, s1.dedupNames, 1) + require.IsType(t, &identityChallengeLegacySigner{}, s1.identityKeys) + require.Equal(t, signer, s1.identityKeys) + + s1 = NewIdentityChallengeScheme(NetIdentityDedupNames(""), NetIdentitySigner(signer)) + require.Empty(t, s1) + s1 = NewIdentityChallengeScheme(NetIdentitySigner(signer)) + require.Empty(t, s1) +} diff --git a/network/p2p/peerID_test.go b/network/p2p/peerID_test.go index 9d7729d593..6d9599ee33 100644 --- a/network/p2p/peerID_test.go +++ b/network/p2p/peerID_test.go @@ -25,6 +25,8 @@ import ( "github.com/stretchr/testify/require" "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/crypto" + "github.com/algorand/go-algorand/data/hashable" "github.com/algorand/go-algorand/test/partitiontest" ) @@ -103,3 +105,22 @@ func TestGetPrivKeyUserGeneratedEphemeral(t *testing.T) { _, err = loadPrivateKeyFromFile(path.Join(tempdir, DefaultPrivKeyPath)) assert.True(t, os.IsNotExist(err)) } + +func TestPeerIDChallengeSigner(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + privKey, err := generatePrivKey() + require.NoError(t, err) + + data := make([]byte, 111) + crypto.RandBytes(data) + msg := hashable.Message{Message: string(data)} + + signer := PeerIDChallengeSigner{key: privKey} + sig := signer.Sign(msg) + require.True(t, signer.Verify(msg, sig)) + pubKey := privKey.GetPublic() + pubKeyRaw, err := pubKey.Raw() + require.NoError(t, err) + require.Equal(t, crypto.PublicKey(pubKeyRaw), signer.PublicKey()) +} diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index 1ad49bd045..d43a62f56b 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -26,6 +26,7 @@ import ( "time" "github.com/algorand/go-algorand/config" + algocrypto "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/logging/telemetryspec" "github.com/algorand/go-algorand/network/limitcaller" @@ -81,6 +82,8 @@ type P2PNetwork struct { nodeInfo NodeInfo pstore *peerstore.PeerStore httpServer *p2p.HTTPServer + + identityTracker identityTracker } type bootstrapper struct { @@ -214,7 +217,7 @@ func (p gossipSubPeer) RoutingAddr() []byte { } // NewP2PNetwork returns an instance of GossipNode that uses the p2p.Service -func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID, node NodeInfo) (*P2PNetwork, error) { +func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID, node NodeInfo, identityOpts *identityOpts) (*P2PNetwork, error) { const readBufferLen = 2048 // create Peerstore and add phonebook addresses @@ -262,6 +265,13 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo broadcastQueueBulk: make(chan broadcastRequest, 100), } + if identityOpts != nil { + net.identityTracker = identityOpts.tracker + } + if net.identityTracker == nil { + net.identityTracker = noopIdentityTracker{} + } + p2p.EnableP2PLogging(log, logging.Level(cfg.BaseLoggerDebugLevel)) h, la, err := p2p.MakeHost(cfg, datadir, pstore) @@ -751,21 +761,32 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea ma := stream.Conn().RemoteMultiaddr() addr := ma.String() if addr == "" { - n.log.Warnf("Could not get address for peer %s", p2pPeer) + n.log.Warnf("Cannot get address for peer %s", p2pPeer) } - // create a wsPeer for this stream and added it to the peers map. + // create a wsPeer for this stream and added it to the peers map. addrInfo := &peer.AddrInfo{ID: p2pPeer, Addrs: []multiaddr.Multiaddr{ma}} maxIdleConnsPerHost := int(n.config.ConnectionsRateLimitingCount) client, err := p2p.MakeHTTPClientWithRateLimit(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout, maxIdleConnsPerHost) if err != nil { client = nil } + var netIdentPeerID algocrypto.PublicKey + if p2pPeerPubKey, err0 := p2pPeer.ExtractPublicKey(); err0 == nil { + if b, err0 := p2pPeerPubKey.Raw(); err0 == nil { + netIdentPeerID = algocrypto.PublicKey(b) + } else { + n.log.Warnf("Cannot get raw pubkey for peer %s", p2pPeer) + } + } else { + n.log.Warnf("Cannot get pubkey for peer %s", p2pPeer) + } peerCore := makePeerCore(ctx, n, n.log, n.handler.readBuffer, addr, client, addr) wsp := &wsPeer{ wsPeerCore: peerCore, conn: &wsPeerConnP2PImpl{stream: stream}, outgoing: !incoming, + identity: netIdentPeerID, } protos, err := n.pstore.GetProtocols(p2pPeer) if err != nil { @@ -773,6 +794,19 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea } wsp.TelemetryGUID, wsp.InstanceName = p2p.GetPeerTelemetryInfo(protos) + localAddr, has := n.Address() + if !has { + n.log.Warn("Could not get local address") + } + n.wsPeersLock.Lock() + ok := n.identityTracker.setIdentity(wsp) + n.wsPeersLock.Unlock() + if !ok { + networkPeerIdentityDisconnect.Inc(nil) + n.log.With("remote", addr).With("local", localAddr).Warn("peer deduplicated before adding because the identity is already known") + stream.Close() + } + wsp.init(n.config, outgoingMessagesBufferSize) n.wsPeersLock.Lock() n.wsPeers[p2pPeer] = wsp @@ -786,10 +820,6 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea event = "ConnectedIn" msg = "Accepted incoming connection from peer %s" } - localAddr, has := n.Address() - if !has { - n.log.Warn("Could not get local address") - } n.log.With("event", event).With("remote", addr).With("local", localAddr).Infof(msg, p2pPeer.String()) if n.log.GetLevel() >= logging.Debug { @@ -811,6 +841,7 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea func (n *P2PNetwork) peerRemoteClose(peer *wsPeer, reason disconnectReason) { remotePeerID := peer.conn.(*wsPeerConnP2PImpl).stream.Conn().RemotePeer() n.wsPeersLock.Lock() + n.identityTracker.removeIdentity(peer) delete(n.wsPeers, remotePeerID) delete(n.wsPeersToIDs, peer) n.wsPeersLock.Unlock() diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index 7c94be98e4..c562bcfcd6 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -58,7 +58,7 @@ func TestP2PSubmitTX(t *testing.T) { cfg := config.GetDefaultLocal() cfg.ForceFetchTransactions = true log := logging.TestingLog(t) - netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) netA.Start() defer netA.Stop() @@ -70,12 +70,12 @@ func TestP2PSubmitTX(t *testing.T) { multiAddrStr := addrsA[0].String() phoneBookAddresses := []string{multiAddrStr} - netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) netB.Start() defer netB.Stop() - netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) netC.Start() defer netC.Stop() @@ -150,7 +150,7 @@ func TestP2PSubmitTXNoGossip(t *testing.T) { cfg := config.GetDefaultLocal() cfg.ForceFetchTransactions = true log := logging.TestingLog(t) - netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) netA.Start() defer netA.Stop() @@ -162,7 +162,7 @@ func TestP2PSubmitTXNoGossip(t *testing.T) { multiAddrStr := addrsA[0].String() phoneBookAddresses := []string{multiAddrStr} - netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) netB.Start() defer netB.Stop() @@ -179,7 +179,7 @@ func TestP2PSubmitTXNoGossip(t *testing.T) { // run netC in NPN mode (no relay => no gossip sup => no TX receiving) cfg.ForceFetchTransactions = false - netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) netC.Start() defer netC.Stop() @@ -244,7 +244,7 @@ func TestP2PSubmitWS(t *testing.T) { cfg := config.GetDefaultLocal() log := logging.TestingLog(t) - netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) err = netA.Start() @@ -258,13 +258,13 @@ func TestP2PSubmitWS(t *testing.T) { multiAddrStr := addrsA[0].String() phoneBookAddresses := []string{multiAddrStr} - netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) err = netB.Start() require.NoError(t, err) defer netB.Stop() - netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) err = netC.Start() require.NoError(t, err) @@ -378,7 +378,7 @@ func TestP2PNetworkAddress(t *testing.T) { cfg := config.GetDefaultLocal() log := logging.TestingLog(t) - netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) defer netA.Stop() require.NoError(t, err) addrInfo := netA.service.AddrInfo() @@ -589,7 +589,7 @@ func TestP2PNetworkDHTCapabilities(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, test.nis[0]) + netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, test.nis[0], nil) require.NoError(t, err) err = netA.Start() @@ -603,13 +603,13 @@ func TestP2PNetworkDHTCapabilities(t *testing.T) { multiAddrStr := addrsA[0].String() phoneBookAddresses := []string{multiAddrStr} - netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, test.nis[1]) + netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, test.nis[1], nil) require.NoError(t, err) err = netB.Start() require.NoError(t, err) defer netB.Stop() - netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, test.nis[2]) + netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, test.nis[2], nil) require.NoError(t, err) err = netC.Start() require.NoError(t, err) @@ -736,7 +736,7 @@ func TestP2PHTTPHandler(t *testing.T) { cfg.GossipFanout = 1 log := logging.TestingLog(t) - netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) h := &p2phttpHandler{t, "hello", nil} @@ -798,7 +798,7 @@ func TestP2PRelay(t *testing.T) { cfg.ForceFetchTransactions = true log := logging.TestingLog(t) log.Debugln("Starting netA") - netA, err := NewP2PNetwork(log.With("net", "netA"), cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netA, err := NewP2PNetwork(log.With("net", "netA"), cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) err = netA.Start() @@ -814,7 +814,7 @@ func TestP2PRelay(t *testing.T) { phoneBookAddresses := []string{multiAddrStr} log.Debugf("Starting netB with phonebook addresses %v", phoneBookAddresses) - netB, err := NewP2PNetwork(log.With("net", "netB"), cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netB, err := NewP2PNetwork(log.With("net", "netB"), cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) err = netB.Start() require.NoError(t, err) @@ -881,7 +881,7 @@ func TestP2PRelay(t *testing.T) { // ensure all messages from netB and netC are received by netA cfg.NetAddress = "127.0.0.1:0" log.Debugf("Starting netB with phonebook addresses %v", phoneBookAddresses) - netC, err := NewP2PNetwork(log.With("net", "netC"), cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netC, err := NewP2PNetwork(log.With("net", "netC"), cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) err = netC.Start() require.NoError(t, err) diff --git a/network/requestLogger_test.go b/network/requestLogger_test.go index 0de6a41c73..c6bde8956e 100644 --- a/network/requestLogger_test.go +++ b/network/requestLogger_test.go @@ -50,12 +50,13 @@ func TestRequestLogger(t *testing.T) { dl := eventsDetailsLogger{Logger: log, eventReceived: make(chan interface{}, 1), eventIdentifier: telemetryspec.HTTPRequestEvent} log.SetLevel(logging.Level(defaultConfig.BaseLoggerDebugLevel)) netA := &WebsocketNetwork{ - log: dl, - config: defaultConfig, - phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), - GenesisID: "go-test-network-genesis", - NetworkID: config.Devtestnet, - peerStater: peerConnectionStater{log: log}, + log: dl, + config: defaultConfig, + phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), + GenesisID: "go-test-network-genesis", + NetworkID: config.Devtestnet, + peerStater: peerConnectionStater{log: log}, + identityTracker: noopIdentityTracker{}, } netA.config.EnableRequestLogger = true netA.setup() diff --git a/network/requestTracker_test.go b/network/requestTracker_test.go index d814507c78..46f003e0f8 100644 --- a/network/requestTracker_test.go +++ b/network/requestTracker_test.go @@ -87,12 +87,13 @@ func TestRateLimiting(t *testing.T) { // This test is conducted locally, so we want to treat all hosts the same for counting incoming requests. testConfig.DisableLocalhostConnectionRateLimit = false wn := &WebsocketNetwork{ - log: log, - config: testConfig, - phonebook: phonebook.MakePhonebook(1, 1), - GenesisID: "go-test-network-genesis", - NetworkID: config.Devtestnet, - peerStater: peerConnectionStater{log: log}, + log: log, + config: testConfig, + phonebook: phonebook.MakePhonebook(1, 1), + GenesisID: "go-test-network-genesis", + NetworkID: config.Devtestnet, + peerStater: peerConnectionStater{log: log}, + identityTracker: noopIdentityTracker{}, } // increase the IncomingConnectionsLimit/MaxConnectionsPerIP limits, since we don't want to test these. diff --git a/network/wsNetwork.go b/network/wsNetwork.go index f222d2ff27..80be0da43c 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -214,9 +214,6 @@ type WebsocketNetwork struct { NetworkID protocol.NetworkID RandomID string - peerID p2p.PeerID - peerIDSigner identityChallengeSigner - ready atomic.Int32 readyChan chan struct{} @@ -631,8 +628,6 @@ func (wn *WebsocketNetwork) setup() { wn.outgoingMessagesBufferSize = outgoingMessagesBufferSize wn.wsMaxHeaderBytes = wsMaxHeaderBytes - wn.identityTracker = NewIdentityTracker() - wn.broadcaster = msgBroadcaster{ ctx: wn.ctx, log: wn.log, @@ -699,7 +694,7 @@ func (wn *WebsocketNetwork) Start() error { wn.messagesOfInterestEnc = MarshallMessageOfInterestMap(wn.messagesOfInterest) } - if wn.config.IsGossipServer() { + if wn.config.IsGossipServer() || wn.config.ForceRelayMessages { listener, err := net.Listen("tcp", wn.config.NetAddress) if err != nil { wn.log.Errorf("network could not listen %v: %s", wn.config.NetAddress, err) @@ -736,16 +731,11 @@ func (wn *WebsocketNetwork) Start() error { } } // if the network has a public address or a libp2p peer ID, use that as the name for connection deduplication - if wn.config.PublicAddress != "" || (wn.peerID != "" && wn.peerIDSigner != nil) { + if wn.config.PublicAddress != "" || wn.identityScheme != nil { wn.RegisterHandlers(identityHandlers) } if wn.identityScheme == nil { - if wn.peerID != "" && wn.peerIDSigner != nil { - wn.identityScheme = NewIdentityChallengeSchemeWithSigner(string(wn.peerID), wn.peerIDSigner) - } - if wn.config.PublicAddress != "" { - wn.identityScheme = NewIdentityChallengeScheme(wn.config.PublicAddress) - } + wn.identityScheme = NewIdentityChallengeScheme(NetIdentityDedupNames(wn.config.PublicAddress)) } wn.meshUpdateRequests <- meshRequest{false, nil} @@ -2305,7 +2295,7 @@ func (wn *WebsocketNetwork) SetPeerData(peer Peer, key string, value interface{} } // NewWebsocketNetwork constructor for websockets based gossip network -func NewWebsocketNetwork(log logging.Logger, config config.Local, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID, nodeInfo NodeInfo, peerID p2p.PeerID, idSigner identityChallengeSigner) (wn *WebsocketNetwork, err error) { +func NewWebsocketNetwork(log logging.Logger, config config.Local, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID, nodeInfo NodeInfo, identityOpts *identityOpts) (wn *WebsocketNetwork, err error) { pb := phonebook.MakePhonebook(config.ConnectionsRateLimitingCount, time.Duration(config.ConnectionsRateLimitingWindowSeconds)*time.Second) @@ -2324,8 +2314,6 @@ func NewWebsocketNetwork(log logging.Logger, config config.Local, phonebookAddre GenesisID: genesisID, NetworkID: networkID, nodeInfo: nodeInfo, - peerID: peerID, - peerIDSigner: idSigner, resolveSRVRecords: tools_network.ReadFromSRV, peerStater: peerConnectionStater{ log: log, @@ -2334,13 +2322,22 @@ func NewWebsocketNetwork(log logging.Logger, config config.Local, phonebookAddre }, } + // initialize net identity tracker either from the provided options or with a new one + if identityOpts != nil { + wn.identityScheme = identityOpts.scheme + wn.identityTracker = identityOpts.tracker + } + if wn.identityTracker == nil { + wn.identityTracker = NewIdentityTracker() + } + wn.setup() return wn, nil } // NewWebsocketGossipNode constructs a websocket network node and returns it as a GossipNode interface implementation func NewWebsocketGossipNode(log logging.Logger, config config.Local, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID) (gn GossipNode, err error) { - return NewWebsocketNetwork(log, config, phonebookAddresses, genesisID, networkID, nil, "", nil) + return NewWebsocketNetwork(log, config, phonebookAddresses, genesisID, networkID, nil, nil) } // SetPrioScheme specifies the network priority scheme for a network node diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go index 038a9d6e2d..6af3a697fc 100644 --- a/network/wsNetwork_test.go +++ b/network/wsNetwork_test.go @@ -128,12 +128,13 @@ func makeTestWebsocketNodeWithConfig(t testing.TB, conf config.Local, opts ...te log := logging.TestingLog(t) log.SetLevel(logging.Warn) wn := &WebsocketNetwork{ - log: log, - config: conf, - phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), - GenesisID: genesisID, - NetworkID: config.Devtestnet, - peerStater: peerConnectionStater{log: log}, + log: log, + config: conf, + phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), + GenesisID: genesisID, + NetworkID: config.Devtestnet, + peerStater: peerConnectionStater{log: log}, + identityTracker: NewIdentityTracker(), } // apply options to newly-created WebsocketNetwork, if provided for _, opt := range opts { @@ -1055,12 +1056,13 @@ func makeTestFilterWebsocketNode(t *testing.T, nodename string) *WebsocketNetwor dc.OutgoingMessageFilterBucketCount = 3 dc.OutgoingMessageFilterBucketSize = 128 wn := &WebsocketNetwork{ - log: logging.TestingLog(t).With("node", nodename), - config: dc, - phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), - GenesisID: genesisID, - NetworkID: config.Devtestnet, - peerStater: peerConnectionStater{log: logging.TestingLog(t).With("node", nodename)}, + log: logging.TestingLog(t).With("node", nodename), + config: dc, + phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), + GenesisID: genesisID, + NetworkID: config.Devtestnet, + peerStater: peerConnectionStater{log: logging.TestingLog(t).With("node", nodename)}, + identityTracker: noopIdentityTracker{}, } require.True(t, wn.config.EnableIncomingMessageFilter) wn.setup() @@ -1696,7 +1698,7 @@ type mockIdentityScheme struct { } func newMockIdentityScheme(t *testing.T) *mockIdentityScheme { - return &mockIdentityScheme{t: t, realScheme: NewIdentityChallengeScheme("any")} + return &mockIdentityScheme{t: t, realScheme: NewIdentityChallengeScheme(NetIdentityDedupNames("any"))} } func (i mockIdentityScheme) AttachChallenge(attach http.Header, addr string) identityChallengeValue { if i.attachChallenge != nil { @@ -1768,7 +1770,7 @@ func TestPeeringWithBadIdentityChallenge(t *testing.T) { { name: "incorrect address", attachChallenge: func(attach http.Header, addr string) identityChallengeValue { - s := NewIdentityChallengeScheme("does not matter") // make a scheme to use its keys + s := NewIdentityChallengeScheme(NetIdentityDedupNames("does not matter")) // make a scheme to use its keys c := identityChallenge{ Key: s.identityKeys.PublicKey(), Challenge: newIdentityChallengeValue(), @@ -1786,7 +1788,7 @@ func TestPeeringWithBadIdentityChallenge(t *testing.T) { { name: "bad signature", attachChallenge: func(attach http.Header, addr string) identityChallengeValue { - s := NewIdentityChallengeScheme("does not matter") // make a scheme to use its keys + s := NewIdentityChallengeScheme(NetIdentityDedupNames("does not matter")) // make a scheme to use its keys c := identityChallenge{ Key: s.identityKeys.PublicKey(), Challenge: newIdentityChallengeValue(), @@ -1901,7 +1903,7 @@ func TestPeeringWithBadIdentityChallengeResponse(t *testing.T) { { name: "incorrect original challenge", verifyAndAttachResponse: func(attach http.Header, h http.Header) (identityChallengeValue, crypto.PublicKey, error) { - s := NewIdentityChallengeScheme("does not matter") // make a scheme to use its keys + s := NewIdentityChallengeScheme(NetIdentityDedupNames("does not matter")) // make a scheme to use its keys // decode the header to an identityChallenge msg, _ := base64.StdEncoding.DecodeString(h.Get(IdentityChallengeHeader)) idChal := identityChallenge{} @@ -1924,7 +1926,7 @@ func TestPeeringWithBadIdentityChallengeResponse(t *testing.T) { { name: "bad signature", verifyAndAttachResponse: func(attach http.Header, h http.Header) (identityChallengeValue, crypto.PublicKey, error) { - s := NewIdentityChallengeScheme("does not matter") // make a scheme to use its keys + s := NewIdentityChallengeScheme(NetIdentityDedupNames("does not matter")) // make a scheme to use its keys // decode the header to an identityChallenge msg, _ := base64.StdEncoding.DecodeString(h.Get(IdentityChallengeHeader)) idChal := identityChallenge{} @@ -2056,7 +2058,7 @@ func TestPeeringWithBadIdentityVerification(t *testing.T) { resp := identityChallengeResponseSigned{} err = protocol.Decode(msg, &resp) require.NoError(t, err) - s := NewIdentityChallengeScheme("does not matter") // make a throwaway key + s := NewIdentityChallengeScheme(NetIdentityDedupNames("does not matter")) // make a throwaway key ver := identityVerificationMessageSigned{ // fill in correct ResponseChallenge field Msg: identityVerificationMessage{ResponseChallenge: resp.Msg.ResponseChallenge}, @@ -2074,7 +2076,7 @@ func TestPeeringWithBadIdentityVerification(t *testing.T) { // when the verification signature doesn't match the peer's expectation (the previously exchanged identity), peer is disconnected name: "bad signature", verifyResponse: func(t *testing.T, h http.Header, c identityChallengeValue) (crypto.PublicKey, []byte, error) { - s := NewIdentityChallengeScheme("does not matter") // make a throwaway key + s := NewIdentityChallengeScheme(NetIdentityDedupNames("does not matter")) // make a throwaway key ver := identityVerificationMessageSigned{ // fill in wrong ResponseChallenge field Msg: identityVerificationMessage{ResponseChallenge: newIdentityChallengeValue()}, @@ -2566,12 +2568,13 @@ func TestSlowPeerDisconnection(t *testing.T) { log := logging.TestingLog(t) log.SetLevel(logging.Info) wn := &WebsocketNetwork{ - log: log, - config: defaultConfig, - phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), - GenesisID: genesisID, - NetworkID: config.Devtestnet, - peerStater: peerConnectionStater{log: log}, + log: log, + config: defaultConfig, + phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), + GenesisID: genesisID, + NetworkID: config.Devtestnet, + peerStater: peerConnectionStater{log: log}, + identityTracker: noopIdentityTracker{}, } wn.setup() wn.broadcaster.slowWritingPeerMonitorInterval = time.Millisecond * 50 @@ -2642,12 +2645,13 @@ func TestForceMessageRelaying(t *testing.T) { log := logging.TestingLog(t) log.SetLevel(logging.Level(defaultConfig.BaseLoggerDebugLevel)) wn := &WebsocketNetwork{ - log: log, - config: defaultConfig, - phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), - GenesisID: genesisID, - NetworkID: config.Devtestnet, - peerStater: peerConnectionStater{log: log}, + log: log, + config: defaultConfig, + phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), + GenesisID: genesisID, + NetworkID: config.Devtestnet, + peerStater: peerConnectionStater{log: log}, + identityTracker: noopIdentityTracker{}, } wn.setup() wn.eventualReadyDelay = time.Second @@ -2737,12 +2741,13 @@ func TestCheckProtocolVersionMatch(t *testing.T) { log := logging.TestingLog(t) log.SetLevel(logging.Level(defaultConfig.BaseLoggerDebugLevel)) wn := &WebsocketNetwork{ - log: log, - config: defaultConfig, - phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), - GenesisID: genesisID, - NetworkID: config.Devtestnet, - peerStater: peerConnectionStater{log: log}, + log: log, + config: defaultConfig, + phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), + GenesisID: genesisID, + NetworkID: config.Devtestnet, + peerStater: peerConnectionStater{log: log}, + identityTracker: noopIdentityTracker{}, } wn.setup() wn.supportedProtocolVersions = []string{"2", "1"} @@ -4560,7 +4565,6 @@ func TestWsNetworkPhonebookMix(t *testing.T) { "test", "net", nil, - "", nil, ) require.NoError(t, err) diff --git a/node/follower_node.go b/node/follower_node.go index 117cc56e86..7d8fc64388 100644 --- a/node/follower_node.go +++ b/node/follower_node.go @@ -94,7 +94,7 @@ func MakeFollower(log logging.Logger, rootDir string, cfg config.Local, phoneboo node.config = cfg // tie network, block fetcher, and agreement services together - p2pNode, err := network.NewWebsocketNetwork(node.log, node.config, phonebookAddresses, genesis.ID(), genesis.Network, nil, "", nil) + p2pNode, err := network.NewWebsocketNetwork(node.log, node.config, phonebookAddresses, genesis.ID(), genesis.Network, nil, nil) if err != nil { log.Errorf("could not create websocket node: %v", err) return nil, err diff --git a/node/node.go b/node/node.go index 5f1baa56be..b6118aadc0 100644 --- a/node/node.go +++ b/node/node.go @@ -206,14 +206,14 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd return nil, err } } else if cfg.EnableP2P { - p2pNode, err = network.NewP2PNetwork(node.log, node.config, rootDir, phonebookAddresses, genesis.ID(), genesis.Network, node) + p2pNode, err = network.NewP2PNetwork(node.log, node.config, rootDir, phonebookAddresses, genesis.ID(), genesis.Network, node, nil) if err != nil { log.Errorf("could not create p2p node: %v", err) return nil, err } } else { var wsNode *network.WebsocketNetwork - wsNode, err = network.NewWebsocketNetwork(node.log, node.config, phonebookAddresses, genesis.ID(), genesis.Network, node, "", nil) + wsNode, err = network.NewWebsocketNetwork(node.log, node.config, phonebookAddresses, genesis.ID(), genesis.Network, node, nil) if err != nil { log.Errorf("could not create websocket node: %v", err) return nil, err diff --git a/node/node_test.go b/node/node_test.go index 3ea6d4a33d..e17e3e8d3f 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -875,6 +875,7 @@ func TestNodeHybridTopology(t *testing.T) { cfg.NetAddress = ni.wsNetAddr() cfg.EnableP2PHybridMode = true + cfg.PublicAddress = ni.wsNetAddr() cfg.EnableDHTProviders = true cfg.P2PPersistPeerID = true privKey, err := p2p.GetPrivKey(cfg, ni.rootDir) From d2de05cf0961da2195950932ee05c57dc043b0cf Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 21 Jun 2024 10:23:08 -0400 Subject: [PATCH 02/22] CR: fix identityOpts reuse concern extend error message in TestHybridNetwork_DuplicateConn --- network/hybridNetwork.go | 14 ++++++++------ network/hybridNetwork_test.go | 26 +++++++++++++++++++------- 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/network/hybridNetwork.go b/network/hybridNetwork.go index 72a634651d..d30e03cee2 100644 --- a/network/hybridNetwork.go +++ b/network/hybridNetwork.go @@ -42,15 +42,17 @@ func NewHybridP2PNetwork(log logging.Logger, cfg config.Local, datadir string, p // supply alternate NetAddress for P2P network p2pcfg := cfg p2pcfg.NetAddress = cfg.P2PNetAddress - identityOpts := identityOpts{ - tracker: NewIdentityTracker(), - } - p2pnet, err := NewP2PNetwork(log, p2pcfg, datadir, phonebookAddresses, genesisID, networkID, nodeInfo, &identityOpts) + identityTracker := NewIdentityTracker() + p2pnet, err := NewP2PNetwork(log, p2pcfg, datadir, phonebookAddresses, genesisID, networkID, nodeInfo, &identityOpts{tracker: identityTracker}) if err != nil { return nil, err } - identityOpts.scheme = NewIdentityChallengeScheme(NetIdentityDedupNames(cfg.PublicAddress, p2pnet.PeerID().String()), NetIdentitySigner(p2pnet.PeerIDSigner())) - wsnet, err := NewWebsocketNetwork(log, cfg, phonebookAddresses, genesisID, networkID, nodeInfo, &identityOpts) + + identOpts := identityOpts{ + tracker: identityTracker, + scheme: NewIdentityChallengeScheme(NetIdentityDedupNames(cfg.PublicAddress, p2pnet.PeerID().String()), NetIdentitySigner(p2pnet.PeerIDSigner())), + } + wsnet, err := NewWebsocketNetwork(log, cfg, phonebookAddresses, genesisID, networkID, nodeInfo, &identOpts) if err != nil { return nil, err } diff --git a/network/hybridNetwork_test.go b/network/hybridNetwork_test.go index 689f575cdf..6e65aa01fa 100644 --- a/network/hybridNetwork_test.go +++ b/network/hybridNetwork_test.go @@ -42,7 +42,7 @@ func TestHybridNetwork_DuplicateConn(t *testing.T) { log := logging.TestingLog(t) const p2pKeyDir = "" - identDiscValue := int(networkPeerIdentityDisconnect.GetUint64Value()) + identDiscValue := networkPeerIdentityDisconnect.GetUint64Value() relayCfg := cfg relayCfg.ForceRelayMessages = true @@ -101,7 +101,7 @@ func TestHybridNetwork_DuplicateConn(t *testing.T) { // ensure initial connections are done require.Eventually(t, func() bool { - return len(netA.GetPeers(PeersConnectedIn)) == 2+identDiscValue + return len(netA.GetPeers(PeersConnectedIn)) == 2 }, 3*time.Second, 50*time.Millisecond) // start the second half of the hybrid net @@ -113,24 +113,36 @@ func TestHybridNetwork_DuplicateConn(t *testing.T) { // wait for connection attempts. nodes need some time to make connections, // and instead of `time.Sleep(1 * time.Second)` the networkPeerIdentityDisconnect net identity counter is used. // Since this test is not parallel the networkPeerIdentityDisconnect should not be modified from outside. + // Both netB and netC are attempting to connect but netA could also open an outgoing stream in netB or netC connection. + // So, the counter should be at least 2+identDiscValue. require.Eventually(t, func() bool { - return networkPeerIdentityDisconnect.GetUint64Value() == 2 - }, 2*time.Second, 50*time.Millisecond) + return networkPeerIdentityDisconnect.GetUint64Value() >= 2+identDiscValue + }, 3*time.Second, 50*time.Millisecond) // now count connections // netA should have 2 connections, not 4 // netB should have 1 connection (via p2p) // netC should have 1 connection (via ws) + var netAIn, netAOut, netBIn, netBOut, netCIn, netCOut int require.Eventually(t, func() bool { return len(netB.GetPeers(PeersConnectedOut)) == 1 - }, time.Second, 50*time.Millisecond) + }, 3*time.Second, 50*time.Millisecond) require.Eventually(t, func() bool { return len(netC.GetPeers(PeersConnectedOut)) == 1 - }, time.Second, 50*time.Millisecond) + }, 3*time.Second, 50*time.Millisecond) require.Eventually(t, func() bool { - return len(netA.GetPeers(PeersConnectedIn)) == 2 + netAIn = len(netA.GetPeers(PeersConnectedIn)) + netAOut = len(netA.GetPeers(PeersConnectedOut)) + netBIn = len(netB.GetPeers(PeersConnectedIn)) + netBOut = len(netB.GetPeers(PeersConnectedOut)) + netCIn = len(netC.GetPeers(PeersConnectedIn)) + netCOut = len(netC.GetPeers(PeersConnectedOut)) + if netAIn != 2 { + log.Infof("netA in/out: %d/%d, netB in/out: %d/%d, netC in/out: %d/%d\n", netAIn, netAOut, netBIn, netBOut, netCIn, netCOut) + } + return netAIn == 2 }, 3*time.Second, 50*time.Millisecond) } From a1a33b8a3fb452063d4004554cf109c0f7a329ed Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 24 Jun 2024 14:17:50 -0400 Subject: [PATCH 03/22] gossipsub: fix pubsub.processLoop and test termination race * there is no way to say if pubsub.processLoop terminated so bump logging level to exclude it from logging. fix build error after Verify removal --- network/p2p/logger.go | 23 +++++++++++++++++++++-- network/p2p/p2p.go | 1 - network/p2p/peerID_test.go | 5 ----- network/p2pNetwork.go | 14 +++++++++++++- 4 files changed, 34 insertions(+), 9 deletions(-) diff --git a/network/p2p/logger.go b/network/p2p/logger.go index 26c738e1e1..741755745b 100644 --- a/network/p2p/logger.go +++ b/network/p2p/logger.go @@ -19,6 +19,7 @@ package p2p import ( + "errors" "runtime" "strings" @@ -55,19 +56,37 @@ type loggingCore struct { zapcore.Core } +// ErrInvalidLogLevel is returned when an invalid log level is provided. +var ErrInvalidLogLevel = errors.New("invalid log level") + // EnableP2PLogging enables libp2p logging into the provided logger with the provided level. -func EnableP2PLogging(log logging.Logger, l logging.Level) { +func EnableP2PLogging(log logging.Logger, l logging.Level) error { core := loggingCore{ log: log, level: l, } + err := SetP2PLogLevel(l) + if err != nil { + return err + } + p2plogging.SetPrimaryCore(&core) + return nil +} + +// SetP2PLogLevel sets the log level for libp2p logging. +func SetP2PLogLevel(l logging.Level) error { + var seen bool for p2pLevel, logLevel := range levelsMap { if logLevel == l { p2plogging.SetAllLoggers(p2plogging.LogLevel(p2pLevel)) + seen = true break } } - p2plogging.SetPrimaryCore(&core) + if !seen { + return ErrInvalidLogLevel + } + return nil } func (c *loggingCore) Enabled(l zapcore.Level) bool { diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index ac0489d5e1..6fe7aec796 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -155,7 +155,6 @@ func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h ho return nil, err } return &serviceImpl{ - log: log, listenAddr: listenAddr, host: h, diff --git a/network/p2p/peerID_test.go b/network/p2p/peerID_test.go index 6d9599ee33..beed18868c 100644 --- a/network/p2p/peerID_test.go +++ b/network/p2p/peerID_test.go @@ -26,7 +26,6 @@ import ( "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/crypto" - "github.com/algorand/go-algorand/data/hashable" "github.com/algorand/go-algorand/test/partitiontest" ) @@ -114,11 +113,7 @@ func TestPeerIDChallengeSigner(t *testing.T) { data := make([]byte, 111) crypto.RandBytes(data) - msg := hashable.Message{Message: string(data)} - signer := PeerIDChallengeSigner{key: privKey} - sig := signer.Sign(msg) - require.True(t, signer.Verify(msg, sig)) pubKey := privKey.GetPublic() pubKeyRaw, err := pubKey.Raw() require.NoError(t, err) diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index d43a62f56b..f7ef6e03bb 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -272,7 +272,10 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo net.identityTracker = noopIdentityTracker{} } - p2p.EnableP2PLogging(log, logging.Level(cfg.BaseLoggerDebugLevel)) + err = p2p.EnableP2PLogging(log, logging.Level(cfg.BaseLoggerDebugLevel)) + if err != nil { + return nil, err + } h, la, err := p2p.MakeHost(cfg, datadir, pstore) if err != nil { @@ -387,7 +390,16 @@ func (n *P2PNetwork) Stop() { n.wsPeersConnectivityCheckTicker = nil } n.innerStop() + + // This is a workaround for a race between PubSub.processLoop (triggered by context cancellation below) termination + // and this function returning that causes main goroutine to exit before + // PubSub.processLoop goroutine finishes logging its termination message + // to already closed logger. Not seen in wild, only in tests. + if n.log.GetLevel() >= logging.Warn { + _ = p2p.SetP2PLogLevel(logging.Warn) + } n.ctxCancel() + n.service.Close() n.bootstrapperStop() n.httpServer.Close() From 808ba6752bd0255e7b9ab4ad6d3933ff331ea274 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 27 Jun 2024 20:09:15 -0400 Subject: [PATCH 04/22] netdeploy: support hybrid p2p scenarios --- netdeploy/remote/deployedNetwork.go | 10 ++ netdeploy/remote/nodeConfig.go | 2 + netdeploy/remote/nodecfg/nodeConfigurator.go | 11 +- netdeploy/remote/nodecfg/nodeDir.go | 36 ++++- .../recipes/scenario1s-p2p/Makefile | 10 +- .../recipes/scenario1s-p2p/README.md | 9 +- .../scenario1s-p2p/copy-node-configs.py | 132 +++++++++++++++--- 7 files changed, 180 insertions(+), 30 deletions(-) diff --git a/netdeploy/remote/deployedNetwork.go b/netdeploy/remote/deployedNetwork.go index 25de422026..a58d8a15fb 100644 --- a/netdeploy/remote/deployedNetwork.go +++ b/netdeploy/remote/deployedNetwork.go @@ -1004,6 +1004,16 @@ func createHostSpec(host HostConfig, template cloudHost) (hostSpec cloudHostSpec ports[port] = true portList = append(portList, strconv.Itoa(port)) } + if node.P2PNetAddress != "" { + port, err = extractPublicPort(node.P2PNetAddress) + if err != nil { + return + } + if !ports[port] { + ports[port] = true + portList = append(portList, strconv.Itoa(port)) + } + } } // See if the APIEndpoint is open to the public, and if so add it diff --git a/netdeploy/remote/nodeConfig.go b/netdeploy/remote/nodeConfig.go index 4880d76eb9..bd4b63dac8 100644 --- a/netdeploy/remote/nodeConfig.go +++ b/netdeploy/remote/nodeConfig.go @@ -35,6 +35,8 @@ type NodeConfig struct { DeadlockOverride int `json:",omitempty"` // -1 = Disable deadlock detection, 0 = Use Default for build, 1 = Enable ConfigJSONOverride string `json:",omitempty"` // Raw json to merge into config.json after other modifications are complete P2PBootstrap bool // True if this node should be a p2p bootstrap node and registered in DNS + P2PNetAddress string `json:",omitempty"` + PublicAddress bool // NodeNameMatchRegex is tested against Name in generated configs and if matched the rest of the configs in this record are applied as a template NodeNameMatchRegex string `json:",omitempty"` diff --git a/netdeploy/remote/nodecfg/nodeConfigurator.go b/netdeploy/remote/nodecfg/nodeConfigurator.go index 842570bfc8..90c3f012ac 100644 --- a/netdeploy/remote/nodecfg/nodeConfigurator.go +++ b/netdeploy/remote/nodecfg/nodeConfigurator.go @@ -93,6 +93,10 @@ func (nc *nodeConfigurator) apply(rootConfigDir, rootNodeDir string) (err error) nc.genesisFile = filepath.Join(rootConfigDir, "genesisdata", config.GenesisJSONFile) nc.genesisData, err = bookkeeping.LoadGenesisFromFile(nc.genesisFile) + if err != nil { + return fmt.Errorf("error loading genesis from '%s': %v", nc.genesisFile, err) + + } nodeDirs, err := nc.prepareNodeDirs(nc.config.Nodes, rootConfigDir, rootNodeDir) if err != nil { return fmt.Errorf("error preparing node directories: %v", err) @@ -198,6 +202,11 @@ func (nc *nodeConfigurator) prepareNodeDirs(configs []remote.NodeConfig, rootCon return } +// getHostName creates a DNS name for a host +func (nc *nodeConfigurator) getNetworkHostName() string { + return nc.config.Name + "." + string(nc.genesisData.Network) + ".algodev.network" +} + func (nc *nodeConfigurator) registerDNSRecords() (err error) { cfZoneID, cfToken, err := getClouldflareCredentials() if err != nil { @@ -215,7 +224,7 @@ func (nc *nodeConfigurator) registerDNSRecords() (err error) { // If we need to register anything, first register a DNS entry // to map our network DNS name to our public name (or IP) provided to nodecfg // Network HostName = eg r1.testnet.algodev.network - networkHostName := nc.config.Name + "." + string(nc.genesisData.Network) + ".algodev.network" + networkHostName := nc.getNetworkHostName() isIP := net.ParseIP(nc.dnsName) != nil var recordType string if isIP { diff --git a/netdeploy/remote/nodecfg/nodeDir.go b/netdeploy/remote/nodecfg/nodeDir.go index bdfc037438..8d8787dd11 100644 --- a/netdeploy/remote/nodecfg/nodeDir.go +++ b/netdeploy/remote/nodecfg/nodeDir.go @@ -104,6 +104,11 @@ func (nd *nodeDir) configure() (err error) { return } + if err = nd.configurePublicAddress(nd.PublicAddress); err != nil { + fmt.Fprintf(os.Stdout, "Error during configurePublicAddress: %s\n", err) + return + } + if err = nd.configureP2PDNSBootstrap(nd.P2PBootstrap); err != nil { fmt.Fprintf(os.Stdout, "Error during configureP2PDNSBootstrap: %s\n", err) return @@ -160,10 +165,35 @@ func (nd *nodeDir) configureNetAddress() (err error) { nd.configurator.addRelaySrv(bootstrapRecord.PrimarySRVBootstrap, nd.NetAddress) } } + if nd.P2PNetAddress != "" { + fmt.Fprintf(os.Stdout, " - Assigning P2PNetAddress: %s\n", nd.P2PNetAddress) + nd.config.P2PNetAddress = nd.P2PNetAddress + } err = nd.saveConfig() return } +func (nd *nodeDir) configurePublicAddress(publicAddress bool) error { + if !publicAddress { + return nil + } + + if err := nd.ensureConfig(); err != nil { + return err + } + + if !nd.IsRelay() { + return errors.New("publicAddress is only valid for relay nodes") + } + + if nd.NetAddress[0] == ':' { + networkHostName := nd.configurator.getNetworkHostName() + nd.NetAddress + nd.config.PublicAddress = networkHostName + } + + return nil +} + func (nd *nodeDir) configureP2PDNSBootstrap(p2pBootstrap bool) error { if !p2pBootstrap { return nil @@ -179,7 +209,7 @@ func (nd *nodeDir) configureP2PDNSBootstrap(p2pBootstrap bool) error { if !nd.config.EnableP2P && !nd.config.EnableP2PHybridMode { return errors.New("p2p bootstrap requires EnableP2P or EnableP2PHybridMode to be set") } - if nd.NetAddress == "" && nd.config.P2PNetAddress == "" { + if nd.NetAddress == "" && nd.P2PNetAddress == "" { return errors.New("p2p bootstrap requires NetAddress or P2PNetAddress to be set") } if !nd.config.EnableGossipService { @@ -187,8 +217,8 @@ func (nd *nodeDir) configureP2PDNSBootstrap(p2pBootstrap bool) error { } netAddress := nd.NetAddress - if nd.config.P2PNetAddress != "" { - netAddress = nd.config.P2PNetAddress + if nd.P2PNetAddress != "" { + netAddress = nd.P2PNetAddress } key, err := p2p.GetPrivKey(config.Local{P2PPersistPeerID: true}, nd.dataDir) diff --git a/test/testdata/deployednettemplates/recipes/scenario1s-p2p/Makefile b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/Makefile index f4ec4b3c1f..7222fd3882 100644 --- a/test/testdata/deployednettemplates/recipes/scenario1s-p2p/Makefile +++ b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/Makefile @@ -3,15 +3,17 @@ PARAMS=-w 20 -R 8 -N 20 -n 20 --npn-algod-nodes 10 --node-template node.json --r .PHONY: clean all +HYBRID ?= no + all: net.json genesis.json topology.json -node.json nonPartNode.json relay.json: - python3 copy-node-configs.py +node.json nonPartNode.json relay.json: copy-node-configs.py + python3 copy-node-configs.py --hybrid=${HYBRID} -net.json: node.json nonPartNode.json relay.json ${GOPATH}/bin/netgoal Makefile +net.json: node.json nonPartNode.json relay.json Makefile netgoal generate -t net -r /tmp/wat -o net.json ${PARAMS} -genesis.json: ${GOPATH}/bin/netgoal Makefile +genesis.json: Makefile netgoal generate -t genesis -r /tmp/wat -o genesis.l.json ${PARAMS} jq '.LastPartKeyRound=5000|.NetworkName="s1s-p2p"|.ConsensusProtocol="future"' < genesis.l.json > genesis.json rm genesis.l.json diff --git a/test/testdata/deployednettemplates/recipes/scenario1s-p2p/README.md b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/README.md index 1cad95bc2d..04e8b986c7 100644 --- a/test/testdata/deployednettemplates/recipes/scenario1s-p2p/README.md +++ b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/README.md @@ -7,10 +7,17 @@ This is a copy of scenario1s with the following changes in nodes configuration: ## Build ```sh -export GOPATH=~/go make ``` +If want to configure a hybrid net, set the `HYBRID` mode parameter to: + - `p2p` meaning all nodes are p2pnet and 50% of them are hybrid + - `ws` meaning all nodes are wsnet and 50% of them are hybrid + +```sh +make -D HYBRID=p2p +``` + ## Run Run as usual cluster test scenario with algonet. diff --git a/test/testdata/deployednettemplates/recipes/scenario1s-p2p/copy-node-configs.py b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/copy-node-configs.py index 6ffbc01d8d..be0e6a1527 100644 --- a/test/testdata/deployednettemplates/recipes/scenario1s-p2p/copy-node-configs.py +++ b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/copy-node-configs.py @@ -5,14 +5,116 @@ 3. Set DNSSecurityFlags: 0 to all configs """ +import argparse +import copy import json import os CURRENT_DIR = os.path.dirname(os.path.realpath(__file__)) SCENARIO1S_DIR = os.path.join(CURRENT_DIR, "..", "scenario1s") +def make_p2p_net(*args): + """convert config to a pure p2p network""" + for config in args: + override_json = json.loads(config.get("ConfigJSONOverride", "{}")) + override_json["EnableP2P"] = True + override_json["DNSSecurityFlags"] = 0x8000 # set to some unused value otherwise 0 would be migrated to default that enables DNSSEC + config["ConfigJSONOverride"] = json.dumps(override_json) + + net_address = config.get("NetAddress") + if net_address: + config["P2PBootstrap"] = True + altconfigs = config.get("AltConfigs", []) + if altconfigs: + for i, altconfig in enumerate(altconfigs): + override_json = json.loads(altconfig.get("ConfigJSONOverride", "{}")) + override_json["EnableP2P"] = True + override_json["DNSSecurityFlags"] = 0x8000 # set to some unused value otherwise 0 would be migrated to default that enables DNSSEC + altconfigs[i]["ConfigJSONOverride"] = json.dumps(override_json) + config["AltConfigs"] = altconfigs + + +def make_hybrid_p2p_net(*args): + """convert config to a hybrid p2p network: + - half of relays become hybrid and receive public address + - half of non-relay nodes become hybrid + - AltConfigs are used for hybrid nodes with FractionApply=0.5 + - Only one AltConfigs is supported and its FractionApply is forced to 0.5 + """ + for config in args: + override_json = json.loads(config.get("ConfigJSONOverride", "{}")) + override_json["EnableP2P"] = True + override_json["DNSSecurityFlags"] = 0x8000 # set to some unused value otherwise 0 would be migrated to default that enables DNSSEC + config["ConfigJSONOverride"] = json.dumps(override_json) + + net_address = config.get("NetAddress") + altconfigs = config.get("AltConfigs") + altconfig = None + if altconfigs: + altconfig = altconfigs[0] + else: + altconfig = copy.deepcopy(config) + + override_json = json.loads(altconfig.get("ConfigJSONOverride", "{}")) + override_json["EnableP2PHybridMode"] = True + override_json["DNSSecurityFlags"] = 0x8000 # set to some unused value otherwise 0 would be migrated to default that enables DNSSEC + altconfig["ConfigJSONOverride"] = json.dumps(override_json) + if net_address: # relay, set public address + altconfig["P2PBootstrap"] = True + altconfig["P2PNetAddress"] = "{{NetworkPort1}}" + altconfig["PublicAddress"] = True + altconfig['FractionApply'] = 0.5 + + altconfigs = [altconfig] + config["AltConfigs"] = altconfigs + + +def make_hybrid_ws_net(*args): + """convert config to a hybrid ws network: + - half of relays become hybrid and receive public address + - half of non-relay nodes become hybrid + - AltConfigs are used for hybrid nodes with FractionApply=0.5 + - Only one AltConfigs is supported and its FractionApply is forced to 0.5 + """ + for config in args: + override_json = json.loads(config.get("ConfigJSONOverride", "{}")) + override_json["DNSSecurityFlags"] = 0x8000 # set to some unused value otherwise 0 would be migrated to default that enables DNSSEC + config["ConfigJSONOverride"] = json.dumps(override_json) + + net_address = config.get("NetAddress") + altconfigs = config.get("AltConfigs") + altconfig = None + if altconfigs: + altconfig = altconfigs[0] + else: + altconfig = copy.deepcopy(config) + + override_json = json.loads(altconfig.get("ConfigJSONOverride", "{}")) + override_json["EnableP2PHybridMode"] = True + override_json["DNSSecurityFlags"] = 0x8000 # set to some unused value otherwise 0 would be migrated to default that enables DNSSEC + altconfig["ConfigJSONOverride"] = json.dumps(override_json) + if net_address: # relay, set public address + altconfig["P2PBootstrap"] = True + altconfig["P2PNetAddress"] = "{{NetworkPort1}}" + altconfig["PublicAddress"] = True + altconfig['FractionApply'] = 0.5 + + altconfigs = [altconfig] + config["AltConfigs"] = altconfigs + + def main(): """main""" + ap = argparse.ArgumentParser() + ap.add_argument('--hybrid', type=str, help='Hybrid mode: p2p, ws') + args = ap.parse_args() + + hybrid_mode = args.hybrid + if hybrid_mode not in ("p2p", "ws"): + hybrid_mode = None + + print('Hybrid mode:', hybrid_mode) + with open(os.path.join(SCENARIO1S_DIR, "node.json"), "r") as f: node = json.load(f) with open(os.path.join(SCENARIO1S_DIR, "relay.json"), "r") as f: @@ -20,27 +122,15 @@ def main(): with open(os.path.join(SCENARIO1S_DIR, "nonPartNode.json"), "r") as f: non_part_node = json.load(f) - # make all relays P2PBootstrap'able - relay["P2PBootstrap"] = True - - # enable P2P for all configs - for config in (node, relay, non_part_node): - override = config.get("ConfigJSONOverride") - if override: - override_json = json.loads(override) - override_json["EnableP2P"] = True - override_json["DNSSecurityFlags"] = 0x8000 # set to some unused value otherwise 0 would be migrated to default that enables DNSSEC - config["ConfigJSONOverride"] = json.dumps(override_json) - altconfigs = config.get("AltConfigs", []) - if altconfigs: - for i, altconfig in enumerate(altconfigs): - override = altconfig.get("ConfigJSONOverride") - if override: - override_json = json.loads(override) - override_json["EnableP2P"] = True - override_json["DNSSecurityFlags"] = 0x8000 # set to some unused value otherwise 0 would be migrated to default that enables DNSSEC - altconfigs[i]["ConfigJSONOverride"] = json.dumps(override_json) - config["AltConfigs"] = altconfigs + # in p2p-only mode all relays are P2PBootstrap-able + if not hybrid_mode: + make_p2p_net(node, relay, non_part_node) + elif hybrid_mode == 'p2p': + make_hybrid_p2p_net(node, relay, non_part_node) + elif hybrid_mode == 'ws': + make_hybrid_ws_net(node, relay, non_part_node) + else: + raise ValueError(f"Invalid hybrid mode: { hybrid_mode }") with open("node.json", "w") as f: json.dump(node, f, indent=4) From 0c6c43e09f82defd0b1caabe6beec005bf87fc2e Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 1 Jul 2024 16:03:56 -0400 Subject: [PATCH 05/22] more logging into TestHybridNetwork_DuplicateConn --- network/hybridNetwork_test.go | 58 +++++++++++++++++++++++++++-------- 1 file changed, 46 insertions(+), 12 deletions(-) diff --git a/network/hybridNetwork_test.go b/network/hybridNetwork_test.go index 6e65aa01fa..dba860348e 100644 --- a/network/hybridNetwork_test.go +++ b/network/hybridNetwork_test.go @@ -115,34 +115,68 @@ func TestHybridNetwork_DuplicateConn(t *testing.T) { // Since this test is not parallel the networkPeerIdentityDisconnect should not be modified from outside. // Both netB and netC are attempting to connect but netA could also open an outgoing stream in netB or netC connection. // So, the counter should be at least 2+identDiscValue. + const waitFor = 3 * time.Second + const checkEvery = 50 * time.Millisecond + const maxTicks = int(waitFor / checkEvery) + const debugThreshold = maxTicks - maxTicks/20 // log last 5% of ticks + require.Greater(t, debugThreshold, 1) + require.Less(t, debugThreshold, maxTicks) + tickCounter := 0 require.Eventually(t, func() bool { + if tickCounter >= debugThreshold { + log.Infof("networkPeerIdentityDisconnect: %d\n", networkPeerIdentityDisconnect.GetUint64Value()) + } + tickCounter++ return networkPeerIdentityDisconnect.GetUint64Value() >= 2+identDiscValue - }, 3*time.Second, 50*time.Millisecond) + }, waitFor, checkEvery) // now count connections // netA should have 2 connections, not 4 // netB should have 1 connection (via p2p) // netC should have 1 connection (via ws) - var netAIn, netAOut, netBIn, netBOut, netCIn, netCOut int + tickCounter = 0 require.Eventually(t, func() bool { + if tickCounter >= debugThreshold { + netAIn := len(netA.GetPeers(PeersConnectedIn)) + netAOut := len(netA.GetPeers(PeersConnectedOut)) + netBIn := len(netB.GetPeers(PeersConnectedIn)) + netBOut := len(netB.GetPeers(PeersConnectedOut)) + netCIn := len(netC.GetPeers(PeersConnectedIn)) + netCOut := len(netC.GetPeers(PeersConnectedOut)) + log.Infof("netA in/out: %d/%d, netB in/out: %d/%d, netC in/out: %d/%d\n", netAIn, netAOut, netBIn, netBOut, netCIn, netCOut) + } + tickCounter++ return len(netB.GetPeers(PeersConnectedOut)) == 1 - }, 3*time.Second, 50*time.Millisecond) + }, waitFor, checkEvery) + tickCounter = 0 require.Eventually(t, func() bool { + if tickCounter >= debugThreshold { + netAIn := len(netA.GetPeers(PeersConnectedIn)) + netAOut := len(netA.GetPeers(PeersConnectedOut)) + netBIn := len(netB.GetPeers(PeersConnectedIn)) + netBOut := len(netB.GetPeers(PeersConnectedOut)) + netCIn := len(netC.GetPeers(PeersConnectedIn)) + netCOut := len(netC.GetPeers(PeersConnectedOut)) + log.Infof("netA in/out: %d/%d, netB in/out: %d/%d, netC in/out: %d/%d\n", netAIn, netAOut, netBIn, netBOut, netCIn, netCOut) + } + tickCounter++ return len(netC.GetPeers(PeersConnectedOut)) == 1 - }, 3*time.Second, 50*time.Millisecond) + }, waitFor, checkEvery) + tickCounter = 0 require.Eventually(t, func() bool { - netAIn = len(netA.GetPeers(PeersConnectedIn)) - netAOut = len(netA.GetPeers(PeersConnectedOut)) - netBIn = len(netB.GetPeers(PeersConnectedIn)) - netBOut = len(netB.GetPeers(PeersConnectedOut)) - netCIn = len(netC.GetPeers(PeersConnectedIn)) - netCOut = len(netC.GetPeers(PeersConnectedOut)) - if netAIn != 2 { + if tickCounter >= debugThreshold { + netAIn := len(netA.GetPeers(PeersConnectedIn)) + netAOut := len(netA.GetPeers(PeersConnectedOut)) + netBIn := len(netB.GetPeers(PeersConnectedIn)) + netBOut := len(netB.GetPeers(PeersConnectedOut)) + netCIn := len(netC.GetPeers(PeersConnectedIn)) + netCOut := len(netC.GetPeers(PeersConnectedOut)) log.Infof("netA in/out: %d/%d, netB in/out: %d/%d, netC in/out: %d/%d\n", netAIn, netAOut, netBIn, netBOut, netCIn, netCOut) } - return netAIn == 2 + tickCounter++ + return len(netA.GetPeers(PeersConnectedIn)) == 2 }, 3*time.Second, 50*time.Millisecond) } From 4350de3aa3f176e7c1d30dc59f3c36811bec74a2 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 1 Jul 2024 16:55:18 -0400 Subject: [PATCH 06/22] CR fixes --- config/config_test.go | 2 +- network/netidentity_test.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/config/config_test.go b/config/config_test.go index 7da3156157..4ce68c6a8e 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -143,7 +143,7 @@ func TestLocal_FixupConfig(t *testing.T) { EnableP2PHybridMode: true, } c2, err = fixupConfig(c1) - require.Error(t, err) + require.ErrorContains(t, err, "PublicAddress must be specified when EnableP2PHybridMode is set") c1 = Local{ EnableP2PHybridMode: true, diff --git a/network/netidentity_test.go b/network/netidentity_test.go index f40dec6832..a54628a6fe 100644 --- a/network/netidentity_test.go +++ b/network/netidentity_test.go @@ -35,10 +35,10 @@ func TestIdentityChallengeSchemeAttachIfEnabled(t *testing.T) { i0 := NewIdentityChallengeScheme() i := NewIdentityChallengeScheme(NetIdentityDedupNames("")) require.Equal(t, i0, i) - require.Empty(t, i) + require.Zero(t, *i) chal := i.AttachChallenge(h, "other") - require.Empty(t, h.Get(IdentityChallengeHeader)) - require.Empty(t, chal) + require.Zero(t, h.Get(IdentityChallengeHeader)) + require.Zero(t, chal) j := NewIdentityChallengeScheme(NetIdentityDedupNames("yes")) chal = j.AttachChallenge(h, "other") @@ -386,7 +386,7 @@ func TestNewIdentityChallengeScheme(t *testing.T) { require.Equal(t, s1, s2) require.Equal(t, s2, s3) require.Equal(t, s3, s4) - require.Empty(t, s1) + require.Zero(t, *s1) s1 = NewIdentityChallengeScheme(NetIdentityDedupNames("a", "a")) s2 = NewIdentityChallengeScheme(NetIdentityDedupNames("a"), NetIdentityDedupNames("a")) From 798722eea958fecf3207581752a2aed98fc6881b Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 1 Jul 2024 17:15:28 -0400 Subject: [PATCH 07/22] fix hybrid scenario generation --- .../recipes/scenario1s-p2p/copy-node-configs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/testdata/deployednettemplates/recipes/scenario1s-p2p/copy-node-configs.py b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/copy-node-configs.py index be0e6a1527..2bd6d8f1ed 100644 --- a/test/testdata/deployednettemplates/recipes/scenario1s-p2p/copy-node-configs.py +++ b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/copy-node-configs.py @@ -61,7 +61,7 @@ def make_hybrid_p2p_net(*args): altconfig["ConfigJSONOverride"] = json.dumps(override_json) if net_address: # relay, set public address altconfig["P2PBootstrap"] = True - altconfig["P2PNetAddress"] = "{{NetworkPort1}}" + altconfig["P2PNetAddress"] = "{{NetworkPort2}}" altconfig["PublicAddress"] = True altconfig['FractionApply'] = 0.5 @@ -95,7 +95,7 @@ def make_hybrid_ws_net(*args): altconfig["ConfigJSONOverride"] = json.dumps(override_json) if net_address: # relay, set public address altconfig["P2PBootstrap"] = True - altconfig["P2PNetAddress"] = "{{NetworkPort1}}" + altconfig["P2PNetAddress"] = "{{NetworkPort2}}" altconfig["PublicAddress"] = True altconfig['FractionApply'] = 0.5 From 2fbe97992905f5ffe965c76bb7a1c6b6ffe03368 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 1 Jul 2024 18:00:31 -0400 Subject: [PATCH 08/22] fixes to hybrid test scripts --- config/config.go | 2 +- config/config_test.go | 14 ++++++++++++++ .../recipes/scenario1s-p2p/copy-node-configs.py | 3 +++ 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/config/config.go b/config/config.go index 8f513f9aee..af4f38c307 100644 --- a/config/config.go +++ b/config/config.go @@ -167,7 +167,7 @@ func fixupConfig(source Local) (Local, error) { } // In hybrid mode we want to prevent connections from the same node over both P2P and WS. // The only way it is supported at the moment is to use net identity challenge that is based on PublicAddress. - if source.EnableP2PHybridMode && source.PublicAddress == "" { + if (source.NetAddress != "" || source.P2PNetAddress != "") && source.EnableP2PHybridMode && source.PublicAddress == "" { return source, errors.New("PublicAddress must be specified when EnableP2PHybridMode is set") } return source, nil diff --git a/config/config_test.go b/config/config_test.go index 4ce68c6a8e..2e0f6bad69 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -143,6 +143,20 @@ func TestLocal_FixupConfig(t *testing.T) { EnableP2PHybridMode: true, } c2, err = fixupConfig(c1) + require.NoError(t, err) + + c1 = Local{ + NetAddress: "test1", + EnableP2PHybridMode: true, + } + c2, err = fixupConfig(c1) + require.ErrorContains(t, err, "PublicAddress must be specified when EnableP2PHybridMode is set") + + c1 = Local{ + P2PNetAddress: "test1", + EnableP2PHybridMode: true, + } + c2, err = fixupConfig(c1) require.ErrorContains(t, err, "PublicAddress must be specified when EnableP2PHybridMode is set") c1 = Local{ diff --git a/test/testdata/deployednettemplates/recipes/scenario1s-p2p/copy-node-configs.py b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/copy-node-configs.py index 2bd6d8f1ed..ea5639ae72 100644 --- a/test/testdata/deployednettemplates/recipes/scenario1s-p2p/copy-node-configs.py +++ b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/copy-node-configs.py @@ -48,6 +48,9 @@ def make_hybrid_p2p_net(*args): config["ConfigJSONOverride"] = json.dumps(override_json) net_address = config.get("NetAddress") + if net_address: + config["P2PBootstrap"] = True + altconfigs = config.get("AltConfigs") altconfig = None if altconfigs: From d12f93f5e865aaa4d9d2dfed057b116f3b098c63 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 2 Jul 2024 13:25:29 -0400 Subject: [PATCH 09/22] do not assign NetAddress to SRV in p2p mode --- netdeploy/remote/nodecfg/nodeConfigurator.go | 11 +++++++---- netdeploy/remote/nodecfg/nodeDir.go | 13 +++++++++---- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/netdeploy/remote/nodecfg/nodeConfigurator.go b/netdeploy/remote/nodecfg/nodeConfigurator.go index 90c3f012ac..8e6bea9718 100644 --- a/netdeploy/remote/nodecfg/nodeConfigurator.go +++ b/netdeploy/remote/nodecfg/nodeConfigurator.go @@ -219,6 +219,7 @@ func (nc *nodeConfigurator) registerDNSRecords() (err error) { const weight = 1 const relayBootstrap = "_algobootstrap" const metricsSrv = "_metrics" + const tcpProto = "_tcp" const proxied = false // If we need to register anything, first register a DNS entry @@ -241,9 +242,10 @@ func (nc *nodeConfigurator) registerDNSRecords() (err error) { if parseErr != nil { return parseErr } - fmt.Fprintf(os.Stdout, "...... Adding Relay SRV Record '%s' -> '%s' .\n", entry.srvName, networkHostName) + fmt.Fprintf(os.Stdout, "...... Adding Relay SRV Record [%s.%s] '%s' [%d %d] -> '%s' .\n", + relayBootstrap, tcpProto, entry.srvName, priority, port, networkHostName) err = cloudflareDNS.SetSRVRecord(context.Background(), entry.srvName, networkHostName, - cloudflare.AutomaticTTL, priority, uint(port), relayBootstrap, "_tcp", weight) + cloudflare.AutomaticTTL, priority, uint(port), relayBootstrap, tcpProto, weight) if err != nil { return } @@ -255,9 +257,10 @@ func (nc *nodeConfigurator) registerDNSRecords() (err error) { fmt.Fprintf(os.Stdout, "Error parsing port for srv record: %s (port %v)\n", parseErr, entry) return parseErr } - fmt.Fprintf(os.Stdout, "...... Adding Metrics SRV Record '%s' -> '%s' .\n", entry.srvName, networkHostName) + fmt.Fprintf(os.Stdout, "...... Adding Metrics SRV Record [%s.%s] '%s' [%d %d] -> '%s' .\n", + metricsSrv, tcpProto, entry.srvName, priority, port, networkHostName) err = cloudflareDNS.SetSRVRecord(context.Background(), entry.srvName, networkHostName, - cloudflare.AutomaticTTL, priority, uint(port), metricsSrv, "_tcp", weight) + cloudflare.AutomaticTTL, priority, uint(port), metricsSrv, tcpProto, weight) if err != nil { fmt.Fprintf(os.Stdout, "Error creating srv record: %s (%v)\n", err, entry) return diff --git a/netdeploy/remote/nodecfg/nodeDir.go b/netdeploy/remote/nodecfg/nodeDir.go index 8d8787dd11..4ea42539a8 100644 --- a/netdeploy/remote/nodecfg/nodeDir.go +++ b/netdeploy/remote/nodecfg/nodeDir.go @@ -160,6 +160,9 @@ func (nd *nodeDir) configureNetAddress() (err error) { fmt.Fprintf(os.Stdout, " - Assigning NetAddress: %s\n", nd.NetAddress) nd.config.NetAddress = nd.NetAddress if nd.IsRelay() && nd.NetAddress[0] == ':' { + if nd.config.EnableP2P && !nd.config.EnableP2PHybridMode { + fmt.Fprintf(os.Stdout, " - skipping relay addresses - p2p mode\n") + } fmt.Fprintf(os.Stdout, " - adding to relay addresses\n") for _, bootstrapRecord := range nd.config.DNSBootstrapArray(nd.configurator.genesisData.Network) { nd.configurator.addRelaySrv(bootstrapRecord.PrimarySRVBootstrap, nd.NetAddress) @@ -177,15 +180,17 @@ func (nd *nodeDir) configurePublicAddress(publicAddress bool) error { if !publicAddress { return nil } + if !nd.IsRelay() { + return errors.New("publicAddress is only valid for relay nodes") + } + if nd.config.EnableP2P && !nd.config.EnableP2PHybridMode { + return errors.New("publicAddress is only valid websocket gossip node or a hybrid mode node") + } if err := nd.ensureConfig(); err != nil { return err } - if !nd.IsRelay() { - return errors.New("publicAddress is only valid for relay nodes") - } - if nd.NetAddress[0] == ':' { networkHostName := nd.configurator.getNetworkHostName() + nd.NetAddress nd.config.PublicAddress = networkHostName From 9b281bdd3dac19f62658e2115d431c40d42e0fa3 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 2 Jul 2024 15:08:47 -0400 Subject: [PATCH 10/22] fix p2p netaddress and publicaddress --- netdeploy/remote/nodecfg/nodeDir.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/netdeploy/remote/nodecfg/nodeDir.go b/netdeploy/remote/nodecfg/nodeDir.go index 4ea42539a8..63acf0a8e5 100644 --- a/netdeploy/remote/nodecfg/nodeDir.go +++ b/netdeploy/remote/nodecfg/nodeDir.go @@ -162,10 +162,11 @@ func (nd *nodeDir) configureNetAddress() (err error) { if nd.IsRelay() && nd.NetAddress[0] == ':' { if nd.config.EnableP2P && !nd.config.EnableP2PHybridMode { fmt.Fprintf(os.Stdout, " - skipping relay addresses - p2p mode\n") - } - fmt.Fprintf(os.Stdout, " - adding to relay addresses\n") - for _, bootstrapRecord := range nd.config.DNSBootstrapArray(nd.configurator.genesisData.Network) { - nd.configurator.addRelaySrv(bootstrapRecord.PrimarySRVBootstrap, nd.NetAddress) + } else { + fmt.Fprintf(os.Stdout, " - adding to relay addresses\n") + for _, bootstrapRecord := range nd.config.DNSBootstrapArray(nd.configurator.genesisData.Network) { + nd.configurator.addRelaySrv(bootstrapRecord.PrimarySRVBootstrap, nd.NetAddress) + } } } if nd.P2PNetAddress != "" { @@ -194,9 +195,9 @@ func (nd *nodeDir) configurePublicAddress(publicAddress bool) error { if nd.NetAddress[0] == ':' { networkHostName := nd.configurator.getNetworkHostName() + nd.NetAddress nd.config.PublicAddress = networkHostName + fmt.Fprintf(os.Stdout, " - Assigning PublicAddress: %s\n", networkHostName) } - - return nil + return nd.saveConfig() } func (nd *nodeDir) configureP2PDNSBootstrap(p2pBootstrap bool) error { From 8532f1f07d3c65d0d3b19a4fc050df5ca60dc9bc Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 2 Jul 2024 15:09:19 -0400 Subject: [PATCH 11/22] remove gossibSubPeer type --- network/p2pNetwork.go | 31 +++++-------------------------- network/p2pNetwork_test.go | 20 -------------------- 2 files changed, 5 insertions(+), 46 deletions(-) diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index f7ef6e03bb..f94f49a231 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -194,28 +194,6 @@ type p2pPeerStats struct { txReceived atomic.Uint64 } -// gossipSubPeer implements the DeadlineSettableConn, IPAddressable, and ErlClient interfaces. -type gossipSubPeer struct { - peerID peer.ID - net GossipNode - routingAddr [8]byte -} - -func (p gossipSubPeer) GetNetwork() GossipNode { return p.net } - -func (p gossipSubPeer) OnClose(f func()) { - net := p.GetNetwork().(*P2PNetwork) - net.wsPeersLock.Lock() - defer net.wsPeersLock.Unlock() - if wsp, ok := net.wsPeers[p.peerID]; ok { - wsp.OnClose(f) - } -} - -func (p gossipSubPeer) RoutingAddr() []byte { - return p.routingAddr[:] -} - // NewP2PNetwork returns an instance of GossipNode that uses the p2p.Service func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID, node NodeInfo, identityOpts *identityOpts) (*P2PNetwork, error) { const readBufferLen = 2048 @@ -559,8 +537,6 @@ func (n *P2PNetwork) Disconnect(badpeer DisconnectablePeer) { n.wsPeersLock.Lock() defer n.wsPeersLock.Unlock() switch p := badpeer.(type) { - case gossipSubPeer: // Disconnect came from a message received via GossipSub - peerID, wsp = p.peerID, n.wsPeers[p.peerID] case *wsPeer: // Disconnect came from a message received via wsPeer peerID, wsp = n.wsPeersToIDs[p], p default: @@ -952,7 +928,9 @@ func (n *P2PNetwork) txTopicHandleLoop() { func (n *P2PNetwork) txTopicValidator(ctx context.Context, peerID peer.ID, msg *pubsub.Message) pubsub.ValidationResult { var routingAddr [8]byte n.wsPeersLock.Lock() - if wsp, ok := n.wsPeers[peerID]; ok { + var wsp *wsPeer + var ok bool + if wsp, ok = n.wsPeers[peerID]; ok { copy(routingAddr[:], wsp.RoutingAddr()) } else { // well, otherwise use last 8 bytes of peerID @@ -961,7 +939,8 @@ func (n *P2PNetwork) txTopicValidator(ctx context.Context, peerID peer.ID, msg * n.wsPeersLock.Unlock() inmsg := IncomingMessage{ - Sender: gossipSubPeer{peerID: msg.ReceivedFrom, net: n, routingAddr: routingAddr}, + // Sender: gossipSubPeer{peerID: msg.ReceivedFrom, net: n, routingAddr: routingAddr}, + Sender: wsp, Tag: protocol.TxnTag, Data: msg.Data, Net: n, diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index c562bcfcd6..1070fece9e 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -36,7 +36,6 @@ import ( "github.com/algorand/go-algorand/network/phonebook" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/test/partitiontest" - "github.com/algorand/go-algorand/util" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/crypto" @@ -1119,22 +1118,3 @@ func TestMergeP2PAddrInfoResolvedAddresses(t *testing.T) { }) } } - -// TestP2PGossipSubPeerCasts checks that gossipSubPeer implements the ErlClient and IPAddressable interfaces -// needed by TxHandler -func TestP2PGossipSubPeerCasts(t *testing.T) { - partitiontest.PartitionTest(t) - t.Parallel() - - var g interface{} = gossipSubPeer{} - _, ok := g.(util.ErlClient) - require.True(t, ok) - - _, ok = g.(IPAddressable) - require.True(t, ok) - - // check that gossipSubPeer is hashable as ERL wants - var m map[util.ErlClient]struct{} - require.Equal(t, m[gossipSubPeer{}], struct{}{}) - require.Equal(t, m[g.(util.ErlClient)], struct{}{}) -} From 3584830911da0d4e6ab771b524a126c78771c10b Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 3 Jul 2024 11:10:12 -0400 Subject: [PATCH 12/22] fix letter case for public address in net identity --- config/config.go | 4 ++++ config/config_test.go | 7 +++++++ netdeploy/remote/nodecfg/nodeDir.go | 2 +- network/wsNetwork.go | 3 ++- 4 files changed, 14 insertions(+), 2 deletions(-) diff --git a/config/config.go b/config/config.go index af4f38c307..4f16437803 100644 --- a/config/config.go +++ b/config/config.go @@ -23,6 +23,7 @@ import ( "os" "os/user" "path/filepath" + "strings" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/util/codecs" @@ -170,6 +171,9 @@ func fixupConfig(source Local) (Local, error) { if (source.NetAddress != "" || source.P2PNetAddress != "") && source.EnableP2PHybridMode && source.PublicAddress == "" { return source, errors.New("PublicAddress must be specified when EnableP2PHybridMode is set") } + if source.PublicAddress != "" { + source.PublicAddress = strings.ToLower(source.PublicAddress) + } return source, nil } diff --git a/config/config_test.go b/config/config_test.go index 2e0f6bad69..ea5c495d64 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -168,6 +168,13 @@ func TestLocal_FixupConfig(t *testing.T) { require.Equal(t, c1, c2) require.True(t, c2.EnableP2PHybridMode) require.NotEmpty(t, c2.PublicAddress) + + c1 = Local{ + PublicAddress: "R1.test3.my-domain.tld", + } + c2, err = fixupConfig(c1) + require.NoError(t, err) + require.Equal(t, "r1.test3.my-domain.tld", c2.PublicAddress) } func saveFullPhonebook(phonebook phonebookBlackWhiteList, saveToDir string) error { diff --git a/netdeploy/remote/nodecfg/nodeDir.go b/netdeploy/remote/nodecfg/nodeDir.go index 63acf0a8e5..304fa4c636 100644 --- a/netdeploy/remote/nodecfg/nodeDir.go +++ b/netdeploy/remote/nodecfg/nodeDir.go @@ -194,7 +194,7 @@ func (nd *nodeDir) configurePublicAddress(publicAddress bool) error { if nd.NetAddress[0] == ':' { networkHostName := nd.configurator.getNetworkHostName() + nd.NetAddress - nd.config.PublicAddress = networkHostName + nd.config.PublicAddress = strings.ToLower(networkHostName) fmt.Fprintf(os.Stdout, " - Assigning PublicAddress: %s\n", networkHostName) } return nd.saveConfig() diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 80be0da43c..1c0f3e8676 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -2105,7 +2105,8 @@ func (wn *WebsocketNetwork) tryConnect(netAddr, gossipAddr string) { var idChallenge identityChallengeValue if wn.identityScheme != nil { - idChallenge = wn.identityScheme.AttachChallenge(requestHeader, netAddr) + theirAddr := strings.ToLower(netAddr) + idChallenge = wn.identityScheme.AttachChallenge(requestHeader, theirAddr) } // for backward compatibility, include the ProtocolVersion header as well. From c11a329cebf838dcb69d1b3b2af11382e12db520 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com> Date: Mon, 8 Jul 2024 16:14:09 -0400 Subject: [PATCH 13/22] Apply suggestions from code review Co-authored-by: Gary Malouf <982483+gmalouf@users.noreply.github.com> --- config/config.go | 6 ++---- config/config_test.go | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/config/config.go b/config/config.go index 4f16437803..658cfcddc8 100644 --- a/config/config.go +++ b/config/config.go @@ -155,7 +155,7 @@ func mergeConfigFromFile(configpath string, source Local) (Local, error) { // fixupConfig makes the following tweaks to the config: // - If NetAddress is set, enable the ledger and block services // - If EnableP2PHybridMode is set, require PublicAddress to be set -func fixupConfig(source Local) (Local, error) { +func enrichNetworkingConfig(source Local) (Local, error) { if source.NetAddress != "" { source.EnableLedgerService = true source.EnableBlockService = true @@ -171,9 +171,7 @@ func fixupConfig(source Local) (Local, error) { if (source.NetAddress != "" || source.P2PNetAddress != "") && source.EnableP2PHybridMode && source.PublicAddress == "" { return source, errors.New("PublicAddress must be specified when EnableP2PHybridMode is set") } - if source.PublicAddress != "" { - source.PublicAddress = strings.ToLower(source.PublicAddress) - } + source.PublicAddress = strings.ToLower(source.PublicAddress) return source, nil } diff --git a/config/config_test.go b/config/config_test.go index ea5c495d64..cc77fe226a 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -121,7 +121,7 @@ func TestLocal_MergeConfig(t *testing.T) { require.Equal(t, c1.GossipFanout, c2.GossipFanout) } -func TestLocal_FixupConfig(t *testing.T) { +func TestLocal_EnrichNetworkingConfig(t *testing.T) { partitiontest.PartitionTest(t) t.Parallel() From 57675547dd09fbbd06eaa24f0dcb67f51725488d Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 9 Jul 2024 16:16:54 -0400 Subject: [PATCH 14/22] fix build after applying GH suggestions --- config/config.go | 4 ++-- config/config_test.go | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/config/config.go b/config/config.go index 658cfcddc8..ffd2aa2795 100644 --- a/config/config.go +++ b/config/config.go @@ -148,11 +148,11 @@ func mergeConfigFromFile(configpath string, source Local) (Local, error) { if err != nil { return source, err } - source, err = fixupConfig(source) + source, err = enrichNetworkingConfig(source) return source, err } -// fixupConfig makes the following tweaks to the config: +// enrichNetworkingConfig makes the following tweaks to the config: // - If NetAddress is set, enable the ledger and block services // - If EnableP2PHybridMode is set, require PublicAddress to be set func enrichNetworkingConfig(source Local) (Local, error) { diff --git a/config/config_test.go b/config/config_test.go index cc77fe226a..c27cae77d0 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -129,7 +129,7 @@ func TestLocal_EnrichNetworkingConfig(t *testing.T) { NetAddress: "test1", GossipFanout: defaultLocal.GossipFanout, } - c2, err := fixupConfig(c1) + c2, err := enrichNetworkingConfig(c1) require.NoError(t, err) require.NotEqual(t, c1, c2) require.False(t, c1.EnableLedgerService) @@ -142,28 +142,28 @@ func TestLocal_EnrichNetworkingConfig(t *testing.T) { c1 = Local{ EnableP2PHybridMode: true, } - c2, err = fixupConfig(c1) + c2, err = enrichNetworkingConfig(c1) require.NoError(t, err) c1 = Local{ NetAddress: "test1", EnableP2PHybridMode: true, } - c2, err = fixupConfig(c1) + c2, err = enrichNetworkingConfig(c1) require.ErrorContains(t, err, "PublicAddress must be specified when EnableP2PHybridMode is set") c1 = Local{ P2PNetAddress: "test1", EnableP2PHybridMode: true, } - c2, err = fixupConfig(c1) + c2, err = enrichNetworkingConfig(c1) require.ErrorContains(t, err, "PublicAddress must be specified when EnableP2PHybridMode is set") c1 = Local{ EnableP2PHybridMode: true, PublicAddress: "test2", } - c2, err = fixupConfig(c1) + c2, err = enrichNetworkingConfig(c1) require.NoError(t, err) require.Equal(t, c1, c2) require.True(t, c2.EnableP2PHybridMode) @@ -172,7 +172,7 @@ func TestLocal_EnrichNetworkingConfig(t *testing.T) { c1 = Local{ PublicAddress: "R1.test3.my-domain.tld", } - c2, err = fixupConfig(c1) + c2, err = enrichNetworkingConfig(c1) require.NoError(t, err) require.Equal(t, "r1.test3.my-domain.tld", c2.PublicAddress) } From ef77424837fde1f52139b4714d4e5c90f2b787cc Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 9 Jul 2024 16:17:26 -0400 Subject: [PATCH 15/22] CR: hybrid cluster script update --- .../recipes/scenario1s-p2p/copy-node-configs.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/test/testdata/deployednettemplates/recipes/scenario1s-p2p/copy-node-configs.py b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/copy-node-configs.py index ea5639ae72..12da86f348 100644 --- a/test/testdata/deployednettemplates/recipes/scenario1s-p2p/copy-node-configs.py +++ b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/copy-node-configs.py @@ -49,6 +49,7 @@ def make_hybrid_p2p_net(*args): net_address = config.get("NetAddress") if net_address: + # in p2p-only mode all relays are P2PBootstrap-able config["P2PBootstrap"] = True altconfigs = config.get("AltConfigs") @@ -125,15 +126,15 @@ def main(): with open(os.path.join(SCENARIO1S_DIR, "nonPartNode.json"), "r") as f: non_part_node = json.load(f) - # in p2p-only mode all relays are P2PBootstrap-able - if not hybrid_mode: - make_p2p_net(node, relay, non_part_node) - elif hybrid_mode == 'p2p': + if hybrid_mode == 'p2p': + print('making hybrid p2p network...') make_hybrid_p2p_net(node, relay, non_part_node) elif hybrid_mode == 'ws': + print('making hybrid ws network...') make_hybrid_ws_net(node, relay, non_part_node) else: - raise ValueError(f"Invalid hybrid mode: { hybrid_mode }") + print('making pure p2p network...') + make_p2p_net(node, relay, non_part_node) with open("node.json", "w") as f: json.dump(node, f, indent=4) From 2d04b946af42846a1f6885a565dd5a01c3cdd148 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 10 Jul 2024 13:12:15 -0400 Subject: [PATCH 16/22] add a stub for connection limiting in p2p net --- network/p2p/p2p.go | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 6fe7aec796..144c2d2f26 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -38,6 +38,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" + rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" "github.com/libp2p/go-libp2p/p2p/muxer/yamux" "github.com/libp2p/go-libp2p/p2p/security/noise" "github.com/libp2p/go-libp2p/p2p/transport/tcp" @@ -115,15 +116,21 @@ func MakeHost(cfg config.Local, datadir string, pstore *pstore.PeerStore) (host. listenAddr = "/ip4/0.0.0.0/tcp/0" } - // the libp2p.NoListenAddrs builtin disables relays but this one does not + // the libp2p.NoListenAddrs builtin disables relays but this one does not. + // libp2p.Host is started with listening disabled to prevent listening before Start() is called. var noListenAddrs = func(cfg *libp2p.Config) error { cfg.ListenAddrs = []multiaddr.Multiaddr{} return nil } - var disableMetrics = func(cfg *libp2p.Config) error { return nil } + var enableMetrics = func(cfg *libp2p.Config) error { cfg.DisableMetrics = false; return nil } metrics.DefaultRegistry().Register(&metrics.PrometheusDefaultMetrics) + rm, err := configureResourceManager(cfg) + if err != nil { + return nil, "", err + } + host, err := libp2p.New( libp2p.Identity(privKey), libp2p.UserAgent(ua), @@ -132,11 +139,29 @@ func MakeHost(cfg config.Local, datadir string, pstore *pstore.PeerStore) (host. libp2p.Peerstore(pstore), noListenAddrs, libp2p.Security(noise.ID, noise.New), - disableMetrics, + enableMetrics, + libp2p.ResourceManager(rm), ) return host, listenAddr, err } +func configureResourceManager(cfg config.Local) (network.ResourceManager, error) { + // see https://github.com/libp2p/go-libp2p/tree/master/p2p/host/resource-manager for more details + scalingLimits := rcmgr.DefaultLimits + libp2p.SetDefaultServiceLimits(&scalingLimits) + scaledDefaultLimits := scalingLimits.AutoScale() + + limitConfig := rcmgr.PartialLimitConfig{ + System: rcmgr.ResourceLimits{ + Conns: rcmgr.LimitVal(cfg.IncomingConnectionsLimit), + }, + // Everything else is default. The exact values will come from `scaledDefaultLimits` above. + } + limiter := rcmgr.NewFixedLimiter(limitConfig.Build(scaledDefaultLimits)) + rm, err := rcmgr.NewResourceManager(limiter) + return rm, err +} + // MakeService creates a P2P service instance func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandler StreamHandler, bootstrapPeers []*peer.AddrInfo) (*serviceImpl, error) { From 16a096c61ebe3a8e8ac693e58185c8ed9bba7a57 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 12 Jul 2024 14:19:15 -0400 Subject: [PATCH 17/22] IsGossipServer takes into account p2p and hybrid options --- config/config_test.go | 44 +++++++++++++++++++++++++++++++++++++++++ config/localTemplate.go | 15 +++++++++++--- 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/config/config_test.go b/config/config_test.go index c27cae77d0..9fc832b385 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -615,9 +615,53 @@ func TestLocal_IsGossipServer(t *testing.T) { cfg := GetDefaultLocal() require.False(t, cfg.IsGossipServer()) + require.False(t, cfg.isWsGossipServer()) + require.False(t, cfg.isP2PGossipServer()) cfg.NetAddress = ":4160" require.True(t, cfg.IsGossipServer()) + require.True(t, cfg.isWsGossipServer()) + require.False(t, cfg.isP2PGossipServer()) + + cfg.EnableGossipService = false + // EnableGossipService does not matter + require.True(t, cfg.IsGossipServer()) + require.True(t, cfg.isWsGossipServer()) + require.False(t, cfg.isP2PGossipServer()) + + cfg.EnableP2P = true + cfg.NetAddress = ":4160" + require.True(t, cfg.IsGossipServer()) + require.False(t, cfg.isWsGossipServer()) + require.True(t, cfg.isP2PGossipServer()) + + cfg.EnableP2P = false + + cfg.EnableP2PHybridMode = true + // with net address set it is ws net gossip server + require.True(t, cfg.IsGossipServer()) + require.True(t, cfg.isWsGossipServer()) + require.False(t, cfg.isP2PGossipServer()) + + cfg.EnableP2PHybridMode = true + cfg.NetAddress = "" + require.False(t, cfg.IsGossipServer()) + require.False(t, cfg.isWsGossipServer()) + require.False(t, cfg.isP2PGossipServer()) + + cfg.EnableP2PHybridMode = true + cfg.P2PNetAddress = ":4190" + require.True(t, cfg.IsGossipServer()) + require.False(t, cfg.isWsGossipServer()) + require.True(t, cfg.isP2PGossipServer()) + + cfg.EnableP2PHybridMode = true + cfg.NetAddress = ":4160" + cfg.P2PNetAddress = ":4190" + require.True(t, cfg.IsGossipServer()) + require.True(t, cfg.isWsGossipServer()) + require.True(t, cfg.isP2PGossipServer()) + } func TestLocal_RecalculateConnectionLimits(t *testing.T) { diff --git a/config/localTemplate.go b/config/localTemplate.go index 534817c8a4..229c753496 100644 --- a/config/localTemplate.go +++ b/config/localTemplate.go @@ -735,10 +735,19 @@ func (cfg Local) TxFilterCanonicalEnabled() bool { return cfg.TxIncomingFilteringFlags&txFilterCanonical != 0 } -// IsGossipServer returns true if NetAddress is set and this node supposed -// to start websocket server +// IsGossipServer returns true if this node supposed to start websocket or p2p server func (cfg Local) IsGossipServer() bool { - return cfg.NetAddress != "" + return cfg.isWsGossipServer() || cfg.isP2PGossipServer() +} + +// isWsGossipServer returns true if a node configured to run a listening ws net +func (cfg Local) isWsGossipServer() bool { + return cfg.NetAddress != "" && !cfg.EnableP2P +} + +// isP2PGossipServer returns true if a node configured to run a listening p2p net +func (cfg Local) isP2PGossipServer() bool { + return cfg.EnableP2P && cfg.NetAddress != "" || cfg.EnableP2PHybridMode && cfg.P2PNetAddress != "" } // ensureAbsGenesisDir will convert a path to absolute, and will attempt to make a genesis directory there From dfdd101d8ac384e98ad7d0fb1d4032a7af761306 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 12 Jul 2024 15:13:08 -0400 Subject: [PATCH 18/22] config: Add P2PIncomingConnectionsLimit --- config/config_test.go | 102 +++++++++++++++----------- config/localTemplate.go | 33 ++++++--- config/local_defaults.go | 1 + daemon/algod/server.go | 24 +++++- installer/config.json.example | 1 + network/p2p/p2p.go | 2 +- test/testdata/configs/config-v34.json | 1 + 7 files changed, 106 insertions(+), 58 deletions(-) diff --git a/config/config_test.go b/config/config_test.go index 9fc832b385..5b7e86cfd1 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -615,52 +615,52 @@ func TestLocal_IsGossipServer(t *testing.T) { cfg := GetDefaultLocal() require.False(t, cfg.IsGossipServer()) - require.False(t, cfg.isWsGossipServer()) - require.False(t, cfg.isP2PGossipServer()) + require.False(t, cfg.IsWsGossipServer()) + require.False(t, cfg.IsP2PGossipServer()) cfg.NetAddress = ":4160" require.True(t, cfg.IsGossipServer()) - require.True(t, cfg.isWsGossipServer()) - require.False(t, cfg.isP2PGossipServer()) + require.True(t, cfg.IsWsGossipServer()) + require.False(t, cfg.IsP2PGossipServer()) cfg.EnableGossipService = false // EnableGossipService does not matter require.True(t, cfg.IsGossipServer()) - require.True(t, cfg.isWsGossipServer()) - require.False(t, cfg.isP2PGossipServer()) + require.True(t, cfg.IsWsGossipServer()) + require.False(t, cfg.IsP2PGossipServer()) cfg.EnableP2P = true cfg.NetAddress = ":4160" require.True(t, cfg.IsGossipServer()) - require.False(t, cfg.isWsGossipServer()) - require.True(t, cfg.isP2PGossipServer()) + require.False(t, cfg.IsWsGossipServer()) + require.True(t, cfg.IsP2PGossipServer()) cfg.EnableP2P = false cfg.EnableP2PHybridMode = true // with net address set it is ws net gossip server require.True(t, cfg.IsGossipServer()) - require.True(t, cfg.isWsGossipServer()) - require.False(t, cfg.isP2PGossipServer()) + require.True(t, cfg.IsWsGossipServer()) + require.False(t, cfg.IsP2PGossipServer()) cfg.EnableP2PHybridMode = true cfg.NetAddress = "" require.False(t, cfg.IsGossipServer()) - require.False(t, cfg.isWsGossipServer()) - require.False(t, cfg.isP2PGossipServer()) + require.False(t, cfg.IsWsGossipServer()) + require.False(t, cfg.IsP2PGossipServer()) cfg.EnableP2PHybridMode = true cfg.P2PNetAddress = ":4190" require.True(t, cfg.IsGossipServer()) - require.False(t, cfg.isWsGossipServer()) - require.True(t, cfg.isP2PGossipServer()) + require.False(t, cfg.IsWsGossipServer()) + require.True(t, cfg.IsP2PGossipServer()) cfg.EnableP2PHybridMode = true cfg.NetAddress = ":4160" cfg.P2PNetAddress = ":4190" require.True(t, cfg.IsGossipServer()) - require.True(t, cfg.isWsGossipServer()) - require.True(t, cfg.isP2PGossipServer()) + require.True(t, cfg.IsWsGossipServer()) + require.True(t, cfg.IsP2PGossipServer()) } @@ -669,45 +669,59 @@ func TestLocal_RecalculateConnectionLimits(t *testing.T) { t.Parallel() var tests = []struct { - maxFDs uint64 - reservedIn uint64 - restSoftIn uint64 - restHardIn uint64 - incomingIn int - - updated bool - restSoftExp uint64 - restHardExp uint64 - incomingExp int + maxFDs uint64 + reservedIn uint64 + restSoftIn uint64 + restHardIn uint64 + incomingIn int + p2pIncomingIn int + + updated bool + restSoftExp uint64 + restHardExp uint64 + incomingExp int + p2pIncomingExp int }{ - {100, 10, 20, 40, 50, false, 20, 40, 50}, // no change - {100, 10, 20, 50, 50, true, 20, 40, 50}, // borrow from rest - {100, 10, 25, 50, 50, true, 25, 40, 50}, // borrow from rest - {100, 10, 50, 50, 50, true, 40, 40, 50}, // borrow from rest, update soft - {100, 10, 9, 19, 81, true, 9, 10, 80}, // borrow from both rest and incoming - {100, 10, 10, 20, 80, true, 10, 10, 80}, // borrow from both rest and incoming - {100, 50, 10, 30, 40, true, 10, 10, 40}, // borrow from both rest and incoming - {100, 90, 10, 30, 40, true, 10, 10, 0}, // borrow from both rest and incoming, clear incoming - {4096, 256, 1024, 2048, 2400, true, 1024, 1440, 2400}, // real numbers - {5000, 256, 1024, 2048, 2400, false, 1024, 2048, 2400}, // real numbers + {100, 10, 20, 40, 50, 0, false, 20, 40, 50, 0}, // no change + {100, 10, 20, 50, 50, 0, true, 20, 40, 50, 0}, // borrow from rest + {100, 10, 25, 50, 50, 0, true, 25, 40, 50, 0}, // borrow from rest + {100, 10, 25, 50, 50, 50, true, 10, 10, 40, 40}, // borrow from rest for incoming and p2p incoming + {100, 10, 50, 50, 50, 0, true, 40, 40, 50, 0}, // borrow from rest, update soft + {100, 10, 50, 50, 40, 10, true, 40, 40, 40, 10}, // borrow from rest, update soft for incoming and p2p incoming + {100, 10, 9, 19, 81, 0, true, 9, 10, 80, 0}, // borrow from both rest and incoming + {100, 10, 9, 19, 41, 41, true, 9, 10, 40, 40}, // borrow from both rest and incoming for incoming and p2p incoming + {100, 90, 10, 30, 40, 0, true, 10, 10, 0, 0}, // borrow from both rest and incoming, clear incoming + {100, 90, 10, 30, 40, 40, true, 10, 10, 0, 0}, // borrow from both rest and incoming, clear incoming + {100, 90, 10, 30, 50, 40, true, 10, 10, 0, 0}, // borrow from both rest and incoming, clear incoming + {4096, 256, 1024, 2048, 2400, 0, true, 1024, 1440, 2400, 0}, // real numbers + {5000, 256, 1024, 2048, 2400, 0, false, 1024, 2048, 2400, 0}, // real numbers + {4096, 256, 1024, 2048, 2400, 1200, true, 240, 240, 2400, 1200}, // real numbers + {6000, 256, 1024, 2048, 2400, 1200, false, 1024, 2048, 2400, 1200}, // real numbers } for i, test := range tests { test := test - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + t.Run(fmt.Sprintf("test=%d", i), func(t *testing.T) { t.Parallel() c := Local{ - RestConnectionsSoftLimit: test.restSoftIn, - RestConnectionsHardLimit: test.restHardIn, - IncomingConnectionsLimit: test.incomingIn, + NetAddress: ":4160", + RestConnectionsSoftLimit: test.restSoftIn, + RestConnectionsHardLimit: test.restHardIn, + IncomingConnectionsLimit: test.incomingIn, + P2PIncomingConnectionsLimit: test.p2pIncomingIn, } - requireFDs := test.reservedIn + test.restHardIn + uint64(test.incomingIn) + if test.p2pIncomingIn > 0 { + c.EnableP2PHybridMode = true + c.P2PNetAddress = ":4190" + } + requireFDs := test.reservedIn + test.restHardIn + uint64(test.incomingIn) + uint64(test.p2pIncomingIn) res := c.AdjustConnectionLimits(requireFDs, test.maxFDs) require.Equal(t, test.updated, res) - require.Equal(t, test.restSoftExp, c.RestConnectionsSoftLimit) - require.Equal(t, test.restHardExp, c.RestConnectionsHardLimit) - require.Equal(t, test.incomingExp, c.IncomingConnectionsLimit) + require.Equal(t, int(test.restSoftExp), int(c.RestConnectionsSoftLimit)) + require.Equal(t, int(test.restHardExp), int(c.RestConnectionsHardLimit)) + require.Equal(t, int(test.incomingExp), int(c.IncomingConnectionsLimit)) + require.Equal(t, int(test.p2pIncomingExp), int(c.P2PIncomingConnectionsLimit)) }) } } diff --git a/config/localTemplate.go b/config/localTemplate.go index 229c753496..cff062b6bf 100644 --- a/config/localTemplate.go +++ b/config/localTemplate.go @@ -134,6 +134,8 @@ type Local struct { // Estimating 1.5MB per incoming connection, 1.5MB*2400 = 3.6GB IncomingConnectionsLimit int `version[0]:"-1" version[1]:"10000" version[17]:"800" version[27]:"2400"` + P2PIncomingConnectionsLimit int `version[34]:"1200"` + // BroadcastConnectionsLimit specifies the number of connections that // will receive broadcast (gossip) messages from this node. If the // node has more connections than this number, it will send broadcasts @@ -737,16 +739,16 @@ func (cfg Local) TxFilterCanonicalEnabled() bool { // IsGossipServer returns true if this node supposed to start websocket or p2p server func (cfg Local) IsGossipServer() bool { - return cfg.isWsGossipServer() || cfg.isP2PGossipServer() + return cfg.IsWsGossipServer() || cfg.IsP2PGossipServer() } -// isWsGossipServer returns true if a node configured to run a listening ws net -func (cfg Local) isWsGossipServer() bool { +// IsWsGossipServer returns true if a node configured to run a listening ws net +func (cfg Local) IsWsGossipServer() bool { return cfg.NetAddress != "" && !cfg.EnableP2P } -// isP2PGossipServer returns true if a node configured to run a listening p2p net -func (cfg Local) isP2PGossipServer() bool { +// IsP2PGossipServer returns true if a node configured to run a listening p2p net +func (cfg Local) IsP2PGossipServer() bool { return cfg.EnableP2P && cfg.NetAddress != "" || cfg.EnableP2PHybridMode && cfg.P2PNetAddress != "" } @@ -945,10 +947,23 @@ func (cfg *Local) AdjustConnectionLimits(requiredFDs, maxFDs uint64) bool { if cfg.RestConnectionsHardLimit <= diff+reservedRESTConns { restDelta := diff + reservedRESTConns - cfg.RestConnectionsHardLimit cfg.RestConnectionsHardLimit = reservedRESTConns - if cfg.IncomingConnectionsLimit > int(restDelta) { - cfg.IncomingConnectionsLimit -= int(restDelta) - } else { - cfg.IncomingConnectionsLimit = 0 + splitRatio := 1 + if cfg.IsWsGossipServer() && cfg.IsP2PGossipServer() { + splitRatio = 2 + } + if cfg.IsWsGossipServer() { + if cfg.IncomingConnectionsLimit > int(restDelta) { + cfg.IncomingConnectionsLimit -= int(restDelta) / splitRatio + } else { + cfg.IncomingConnectionsLimit = 0 + } + } + if cfg.IsP2PGossipServer() { + if cfg.P2PIncomingConnectionsLimit > int(restDelta) { + cfg.P2PIncomingConnectionsLimit -= int(restDelta) / splitRatio + } else { + cfg.P2PIncomingConnectionsLimit = 0 + } } } else { cfg.RestConnectionsHardLimit -= diff diff --git a/config/local_defaults.go b/config/local_defaults.go index ae2ed22ebf..57457531be 100644 --- a/config/local_defaults.go +++ b/config/local_defaults.go @@ -119,6 +119,7 @@ var defaultLocal = Local{ OptimizeAccountsDatabaseOnStartup: false, OutgoingMessageFilterBucketCount: 3, OutgoingMessageFilterBucketSize: 128, + P2PIncomingConnectionsLimit: 1200, P2PNetAddress: "", P2PPersistPeerID: false, P2PPrivateKeyLocation: "", diff --git a/daemon/algod/server.go b/daemon/algod/server.go index c43b0b0693..72102021fa 100644 --- a/daemon/algod/server.go +++ b/daemon/algod/server.go @@ -149,9 +149,21 @@ func (s *Server) Initialize(cfg config.Local, phonebookAddresses []string, genes if cfg.IsGossipServer() { var ot basics.OverflowTracker - fdRequired = ot.Add(fdRequired, uint64(cfg.IncomingConnectionsLimit)+network.ReservedHealthServiceConnections) + fdRequired = ot.Add(fdRequired, network.ReservedHealthServiceConnections) if ot.Overflowed { - return errors.New("Initialize() overflowed when adding up IncomingConnectionsLimit to the existing RLIMIT_NOFILE value; decrease RestConnectionsHardLimit or IncomingConnectionsLimit") + return errors.New("Initialize() overflowed when adding up ReservedHealthServiceConnections to the existing RLIMIT_NOFILE value; decrease RestConnectionsHardLimit") + } + if cfg.IsWsGossipServer() { + fdRequired = ot.Add(fdRequired, uint64(cfg.IncomingConnectionsLimit)) + if ot.Overflowed { + return errors.New("Initialize() overflowed when adding up IncomingConnectionsLimit to the existing RLIMIT_NOFILE value; decrease IncomingConnectionsLimit") + } + } + if cfg.IsP2PGossipServer() { + fdRequired = ot.Add(fdRequired, uint64(cfg.P2PIncomingConnectionsLimit)+network.ReservedHealthServiceConnections) + if ot.Overflowed { + return errors.New("Initialize() overflowed when adding up P2PIncomingConnectionsLimit to the existing RLIMIT_NOFILE value; decrease P2PIncomingConnectionsLimit") + } } _, hard, fdErr := util.GetFdLimits() if fdErr != nil { @@ -164,14 +176,18 @@ func (s *Server) Initialize(cfg config.Local, phonebookAddresses []string, genes // but try to keep cfg.ReservedFDs untouched by decreasing other limits if cfg.AdjustConnectionLimits(fdRequired, hard) { s.log.Warnf( - "Updated connection limits: RestConnectionsSoftLimit=%d, RestConnectionsHardLimit=%d, IncomingConnectionsLimit=%d", + "Updated connection limits: RestConnectionsSoftLimit=%d, RestConnectionsHardLimit=%d, IncomingConnectionsLimit=%d, P2PIncomingConnectionsLimit=%d", cfg.RestConnectionsSoftLimit, cfg.RestConnectionsHardLimit, cfg.IncomingConnectionsLimit, + cfg.P2PIncomingConnectionsLimit, ) - if cfg.IncomingConnectionsLimit == 0 { + if cfg.IsWsGossipServer() && cfg.IncomingConnectionsLimit == 0 { return errors.New("Initialize() failed to adjust connection limits") } + if cfg.IsP2PGossipServer() && cfg.P2PIncomingConnectionsLimit == 0 { + return errors.New("Initialize() failed to adjust p2p connection limits") + } } } fdErr = util.SetFdSoftLimit(maxFDs) diff --git a/installer/config.json.example b/installer/config.json.example index 7f16155303..3a9714bbfb 100644 --- a/installer/config.json.example +++ b/installer/config.json.example @@ -98,6 +98,7 @@ "OptimizeAccountsDatabaseOnStartup": false, "OutgoingMessageFilterBucketCount": 3, "OutgoingMessageFilterBucketSize": 128, + "P2PIncomingConnectionsLimit": 1200, "P2PNetAddress": "", "P2PPersistPeerID": false, "P2PPrivateKeyLocation": "", diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 6dceba0576..bd65f01d64 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -146,7 +146,7 @@ func configureResourceManager(cfg config.Local) (network.ResourceManager, error) limitConfig := rcmgr.PartialLimitConfig{ System: rcmgr.ResourceLimits{ - Conns: rcmgr.LimitVal(cfg.IncomingConnectionsLimit), + Conns: rcmgr.LimitVal(cfg.P2PIncomingConnectionsLimit), }, // Everything else is default. The exact values will come from `scaledDefaultLimits` above. } diff --git a/test/testdata/configs/config-v34.json b/test/testdata/configs/config-v34.json index 7f16155303..3a9714bbfb 100644 --- a/test/testdata/configs/config-v34.json +++ b/test/testdata/configs/config-v34.json @@ -98,6 +98,7 @@ "OptimizeAccountsDatabaseOnStartup": false, "OutgoingMessageFilterBucketCount": 3, "OutgoingMessageFilterBucketSize": 128, + "P2PIncomingConnectionsLimit": 1200, "P2PNetAddress": "", "P2PPersistPeerID": false, "P2PPrivateKeyLocation": "", From 1425b70f591f841354a310bc8160008d40d7d538 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 17 Jul 2024 16:17:37 -0400 Subject: [PATCH 19/22] CR fixes: fd calc and deploy net --- daemon/algod/server.go | 2 +- netdeploy/remote/deployedNetwork.go | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/daemon/algod/server.go b/daemon/algod/server.go index 72102021fa..309fdc5799 100644 --- a/daemon/algod/server.go +++ b/daemon/algod/server.go @@ -160,7 +160,7 @@ func (s *Server) Initialize(cfg config.Local, phonebookAddresses []string, genes } } if cfg.IsP2PGossipServer() { - fdRequired = ot.Add(fdRequired, uint64(cfg.P2PIncomingConnectionsLimit)+network.ReservedHealthServiceConnections) + fdRequired = ot.Add(fdRequired, uint64(cfg.P2PIncomingConnectionsLimit)) if ot.Overflowed { return errors.New("Initialize() overflowed when adding up P2PIncomingConnectionsLimit to the existing RLIMIT_NOFILE value; decrease P2PIncomingConnectionsLimit") } diff --git a/netdeploy/remote/deployedNetwork.go b/netdeploy/remote/deployedNetwork.go index a58d8a15fb..ce72071ff0 100644 --- a/netdeploy/remote/deployedNetwork.go +++ b/netdeploy/remote/deployedNetwork.go @@ -1004,15 +1004,15 @@ func createHostSpec(host HostConfig, template cloudHost) (hostSpec cloudHostSpec ports[port] = true portList = append(portList, strconv.Itoa(port)) } - if node.P2PNetAddress != "" { - port, err = extractPublicPort(node.P2PNetAddress) - if err != nil { - return - } - if !ports[port] { - ports[port] = true - portList = append(portList, strconv.Itoa(port)) - } + } + if node.P2PNetAddress != "" { + port, err = extractPublicPort(node.P2PNetAddress) + if err != nil { + return + } + if !ports[port] { + ports[port] = true + portList = append(portList, strconv.Itoa(port)) } } From f00275231e6ab497160f89142d8432cd06636d47 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 17 Jul 2024 16:31:53 -0400 Subject: [PATCH 20/22] CR fixes: IsGossipServer changes --- config/config_test.go | 15 +++++++++++++++ config/localTemplate.go | 7 +++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/config/config_test.go b/config/config_test.go index 5b7e86cfd1..20338766c3 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -662,6 +662,21 @@ func TestLocal_IsGossipServer(t *testing.T) { require.True(t, cfg.IsWsGossipServer()) require.True(t, cfg.IsP2PGossipServer()) + cfg.EnableP2PHybridMode = true + cfg.EnableP2P = true + cfg.NetAddress = ":4160" + cfg.P2PNetAddress = ":4190" + require.True(t, cfg.IsGossipServer()) + require.True(t, cfg.IsWsGossipServer()) + require.True(t, cfg.IsP2PGossipServer()) + + cfg.EnableP2PHybridMode = true + cfg.EnableP2P = true + cfg.NetAddress = ":4160" + cfg.P2PNetAddress = "" + require.True(t, cfg.IsGossipServer()) + require.True(t, cfg.IsWsGossipServer()) + require.False(t, cfg.IsP2PGossipServer()) } func TestLocal_RecalculateConnectionLimits(t *testing.T) { diff --git a/config/localTemplate.go b/config/localTemplate.go index cff062b6bf..ffe3e1a8e0 100644 --- a/config/localTemplate.go +++ b/config/localTemplate.go @@ -744,12 +744,14 @@ func (cfg Local) IsGossipServer() bool { // IsWsGossipServer returns true if a node configured to run a listening ws net func (cfg Local) IsWsGossipServer() bool { - return cfg.NetAddress != "" && !cfg.EnableP2P + // 1. NetAddress is set and EnableP2P is not set + // 2. NetAddress is set and EnableP2PHybridMode is set then EnableP2P is overridden by EnableP2PHybridMode + return cfg.NetAddress != "" && (!cfg.EnableP2P || cfg.EnableP2PHybridMode) } // IsP2PGossipServer returns true if a node configured to run a listening p2p net func (cfg Local) IsP2PGossipServer() bool { - return cfg.EnableP2P && cfg.NetAddress != "" || cfg.EnableP2PHybridMode && cfg.P2PNetAddress != "" + return cfg.EnableP2P && !cfg.EnableP2PHybridMode && cfg.NetAddress != "" || cfg.EnableP2PHybridMode && cfg.P2PNetAddress != "" } // ensureAbsGenesisDir will convert a path to absolute, and will attempt to make a genesis directory there @@ -949,6 +951,7 @@ func (cfg *Local) AdjustConnectionLimits(requiredFDs, maxFDs uint64) bool { cfg.RestConnectionsHardLimit = reservedRESTConns splitRatio := 1 if cfg.IsWsGossipServer() && cfg.IsP2PGossipServer() { + // split the rest of the delta between ws and p2p evenly splitRatio = 2 } if cfg.IsWsGossipServer() { From d0b973aef13ba174637b68d2ffa33439985779f4 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 18 Jul 2024 18:59:41 -0400 Subject: [PATCH 21/22] post merge fixes --- network/hybridNetwork_test.go | 1 + network/p2pNetwork_test.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/network/hybridNetwork_test.go b/network/hybridNetwork_test.go index dba860348e..7c76c1e38e 100644 --- a/network/hybridNetwork_test.go +++ b/network/hybridNetwork_test.go @@ -64,6 +64,7 @@ func TestHybridNetwork_DuplicateConn(t *testing.T) { // make it net address and restart the node relayCfg.NetAddress = addr relayCfg.PublicAddress = addr + relayCfg.P2PNetAddress = ":0" netA, err = NewHybridP2PNetwork(log.With("node", "netA"), relayCfg, p2pKeyDir, nil, genesisID, "net", &nopeNodeInfo{}) require.NoError(t, err) diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index 19506e7aa9..0eac398431 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -192,7 +192,7 @@ func TestP2PSubmitTXNoGossip(t *testing.T) { cfg.ForceFetchTransactions = false // Have to unset NetAddress to get IsGossipServer to return false cfg.NetAddress = "" - netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) netC.Start() defer netC.Stop() From c3cd19c4a72697e60e8fba32be469129155d5585 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com> Date: Fri, 19 Jul 2024 10:37:51 -0400 Subject: [PATCH 22/22] Update config/localTemplate.go Co-authored-by: Gary Malouf <982483+gmalouf@users.noreply.github.com> --- config/localTemplate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/localTemplate.go b/config/localTemplate.go index ffe3e1a8e0..9583a194cd 100644 --- a/config/localTemplate.go +++ b/config/localTemplate.go @@ -751,7 +751,7 @@ func (cfg Local) IsWsGossipServer() bool { // IsP2PGossipServer returns true if a node configured to run a listening p2p net func (cfg Local) IsP2PGossipServer() bool { - return cfg.EnableP2P && !cfg.EnableP2PHybridMode && cfg.NetAddress != "" || cfg.EnableP2PHybridMode && cfg.P2PNetAddress != "" + return (cfg.EnableP2P && !cfg.EnableP2PHybridMode && cfg.NetAddress != "") || (cfg.EnableP2PHybridMode && cfg.P2PNetAddress != "") } // ensureAbsGenesisDir will convert a path to absolute, and will attempt to make a genesis directory there