diff --git a/components/mocks/mockNetwork.go b/components/mocks/mockNetwork.go index 5d8ae9918c..a24cf8fbe8 100644 --- a/components/mocks/mockNetwork.go +++ b/components/mocks/mockNetwork.go @@ -18,6 +18,7 @@ package mocks import ( "context" + "errors" "net" "net/http" @@ -76,11 +77,6 @@ func (network *MockNetwork) GetPeers(options ...network.PeerOption) []network.Pe return nil } -// GetRoundTripper -- returns the network round tripper -func (network *MockNetwork) GetRoundTripper(peer network.Peer) http.RoundTripper { - return http.DefaultTransport -} - // Ready - always ready func (network *MockNetwork) Ready() chan struct{} { c := make(chan struct{}) @@ -115,3 +111,8 @@ func (network *MockNetwork) GetGenesisID() string { } return network.GenesisID } + +// GetHTTPClient returns a http.Client with a suitable for the network +func (network *MockNetwork) GetHTTPClient(p network.HTTPPeer) (*http.Client, error) { + return nil, errors.New("not implemented") +} diff --git a/network/addr_test.go b/network/addr_test.go index eec2eccc36..c373b2efc6 100644 --- a/network/addr_test.go +++ b/network/addr_test.go @@ -89,10 +89,12 @@ func TestParseHostOrURL(t *testing.T) { t.Run(addr, func(t *testing.T) { _, err := ParseHostOrURL(addr) require.Error(t, err, "url should fail", addr) + require.False(t, IsMultiaddr(addr)) }) t.Run(addr+"-multiaddr", func(t *testing.T) { _, err := ParseHostOrURLOrMultiaddr(addr) require.Error(t, err, "url should fail", addr) + require.False(t, IsMultiaddr(addr)) }) } diff --git a/network/gossipNode.go b/network/gossipNode.go index aa5b2741a9..d973ef8a00 100644 --- a/network/gossipNode.go +++ b/network/gossipNode.go @@ -83,8 +83,9 @@ type GossipNode interface { // ClearHandlers deregisters all the existing message handlers. ClearHandlers() - // GetRoundTripper returns a Transport that would limit the number of outgoing connections. - GetRoundTripper(peer Peer) http.RoundTripper + // GetHTTPClient returns a http.Client with a suitable for the network Transport + // that would also limit the number of outgoing connections. + GetHTTPClient(peer HTTPPeer) (*http.Client, error) // OnNetworkAdvance notifies the network library that the agreement protocol was able to make a notable progress. // this is the only indication that we have that we haven't formed a clique, where all incoming messages diff --git a/network/hybridNetwork.go b/network/hybridNetwork.go index b6168dc71e..69c09186aa 100644 --- a/network/hybridNetwork.go +++ b/network/hybridNetwork.go @@ -180,14 +180,14 @@ func (n *HybridP2PNetwork) ClearHandlers() { n.wsNetwork.ClearHandlers() } -// GetRoundTripper returns a Transport that would limit the number of outgoing connections. -func (n *HybridP2PNetwork) GetRoundTripper(peer Peer) http.RoundTripper { - // TODO today this is used by HTTPTxSync.Sync after calling GetPeers(network.PeersPhonebookRelays) - switch p := peer.(type) { +// GetHTTPClient returns a http.Client with a suitable for the network Transport +// that would also limit the number of outgoing connections. +func (n *HybridP2PNetwork) GetHTTPClient(peer HTTPPeer) (*http.Client, error) { + switch peer.(type) { case *wsPeer: - return p.net.GetRoundTripper(peer) - case gossipSubPeer: - return p.net.GetRoundTripper(peer) + return n.wsNetwork.GetHTTPClient(peer) + case *wsPeerCore: + return n.p2pNetwork.GetHTTPClient(peer) default: panic("unrecognized peer type") } diff --git a/network/p2p/capabilities_test.go b/network/p2p/capabilities_test.go index d6694e5fb2..08e04f6f62 100644 --- a/network/p2p/capabilities_test.go +++ b/network/p2p/capabilities_test.go @@ -98,7 +98,7 @@ func setupDHTHosts(t *testing.T, numHosts int) []*dht.IpfsDHT { require.NoError(t, err) // this is a workaround for the following issue // "failed to negotiate security protocol: error reading handshake message: noise: message is too short" - // it appears simultenous connectino attempts (dht.New() attempts to connect) causes this handshake error. + // it appears simultaneous connection attempts (dht.New() attempts to connect) causes this handshake error. // https://github.com/libp2p/go-libp2p-noise/issues/70 time.Sleep(200 * time.Millisecond) @@ -158,7 +158,7 @@ func setupCapDiscovery(t *testing.T, numHosts int, numBootstrapPeers int) []*Cap require.NoError(t, err) // this is a workaround for the following issue // "failed to negotiate security protocol: error reading handshake message: noise: message is too short" - // it appears simultenous connectino attempts (dht.New() attempts to connect) causes this handshake error. + // it appears simultaneous connection attempts (dht.New() attempts to connect) causes this handshake error. // https://github.com/libp2p/go-libp2p-noise/issues/70 time.Sleep(200 * time.Millisecond) diff --git a/network/p2p/http.go b/network/p2p/http.go index 1f39fa56c2..33a0ede570 100644 --- a/network/p2p/http.go +++ b/network/p2p/http.go @@ -18,17 +18,47 @@ package p2p import ( "net/http" + "sync" + "github.com/gorilla/mux" "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" libp2phttp "github.com/libp2p/go-libp2p/p2p/http" ) -// MakeHTTPClient creates a http.Client that uses libp2p transport for a goven protocol and peer address. -func MakeHTTPClient(protocolID string, addrInfo peer.AddrInfo) (http.Client, error) { +// algorandP2pHTTPProtocol defines a libp2p protocol name for algorand's http over p2p messages +const algorandP2pHTTPProtocol = "/algorand-http/1.0.0" + +// HTTPServer is a wrapper around libp2phttp.Host that allows registering http handlers with path parameters. +type HTTPServer struct { + libp2phttp.Host + p2phttpMux *mux.Router + p2phttpMuxRegistrarOnce sync.Once +} + +// MakeHTTPServer creates a new HTTPServer +func MakeHTTPServer(streamHost host.Host) *HTTPServer { + httpServer := HTTPServer{ + Host: libp2phttp.Host{StreamHost: streamHost}, + p2phttpMux: mux.NewRouter(), + } + return &httpServer +} + +// RegisterHTTPHandler registers a http handler with a given path. +func (s *HTTPServer) RegisterHTTPHandler(path string, handler http.Handler) { + s.p2phttpMux.Handle(path, handler) + s.p2phttpMuxRegistrarOnce.Do(func() { + s.Host.SetHTTPHandlerAtPath(algorandP2pHTTPProtocol, "/", s.p2phttpMux) + }) +} + +// MakeHTTPClient creates a http.Client that uses libp2p transport for a given protocol and peer address. +func MakeHTTPClient(addrInfo *peer.AddrInfo) (*http.Client, error) { clientStreamHost, err := libp2p.New(libp2p.NoListenAddrs) if err != nil { - return http.Client{}, err + return nil, err } client := libp2phttp.Host{StreamHost: clientStreamHost} @@ -37,10 +67,10 @@ func MakeHTTPClient(protocolID string, addrInfo peer.AddrInfo) (http.Client, err // to make a NamespaceRoundTripper that limits to specific URL paths. // First, we do not want make requests when listing peers (the main MakeHTTPClient invoker). // Secondly, this makes unit testing easier - no need to register fake handlers. - rt, err := client.NewConstrainedRoundTripper(addrInfo) + rt, err := client.NewConstrainedRoundTripper(*addrInfo) if err != nil { - return http.Client{}, err + return nil, err } - return http.Client{Transport: rt}, nil + return &http.Client{Transport: rt}, nil } diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 99f537ed36..7450f34794 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -75,9 +75,6 @@ type serviceImpl struct { // AlgorandWsProtocol defines a libp2p protocol name for algorand's websockets messages const AlgorandWsProtocol = "/algorand-ws/1.0.0" -// AlgorandP2pHTTPProtocol defines a libp2p protocol name for algorand's http over p2p messages -const AlgorandP2pHTTPProtocol = "/algorand-http/1.0.0" - const dialTimeout = 30 * time.Second // MakeHost creates a libp2p host but does not start listening. diff --git a/network/p2p/streams.go b/network/p2p/streams.go index 0e5c59d50c..160f273e17 100644 --- a/network/p2p/streams.go +++ b/network/p2p/streams.go @@ -80,7 +80,7 @@ func (n *streamManager) streamHandler(stream network.Stream) { if stream.Stat().Direction == network.DirUnknown { n.log.Warnf("Unknown direction for a steam %s to/from %s", stream.ID(), remotePeer) } else { - n.log.Warnf("Unexpected outgoing sream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String()) + n.log.Warnf("Unexpected outgoing stream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String()) } } n.handler(n.ctx, remotePeer, stream, incoming) @@ -98,7 +98,7 @@ func (n *streamManager) streamHandler(stream network.Stream) { if stream.Stat().Direction == network.DirUnknown { n.log.Warnf("streamHandler: unknown direction for a steam %s to/from %s", stream.ID(), remotePeer) } else { - n.log.Warnf("Unexpected outgoing sream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String()) + n.log.Warnf("Unexpected outgoing stream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String()) } } n.handler(n.ctx, remotePeer, stream, incoming) @@ -141,7 +141,7 @@ func (n *streamManager) Connected(net network.Network, conn network.Conn) { // a new stream created above, expected direction is outbound incoming := stream.Stat().Direction == network.DirInbound if incoming { - n.log.Warnf("Unexpected incoming sream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String()) + n.log.Warnf("Unexpected incoming stream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String()) } else { if stream.Stat().Direction == network.DirUnknown { n.log.Warnf("Connected: unknown direction for a steam %s to/from %s", stream.ID(), remotePeer) diff --git a/network/p2p/testing/httpNode.go b/network/p2p/testing/httpNode.go new file mode 100644 index 0000000000..f188abcb1a --- /dev/null +++ b/network/p2p/testing/httpNode.go @@ -0,0 +1,116 @@ +// Copyright (C) 2019-2024 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +// This package wraps and re-exports the libp2p functions on order to keep +// all go-libp2p imports in one place. + +package p2p + +import ( + "net/http" + "testing" + + "github.com/algorand/go-algorand/components/mocks" + "github.com/algorand/go-algorand/network" + "github.com/algorand/go-algorand/network/p2p" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" +) + +// HTTPNode is a mock network node that uses libp2p and http. +type HTTPNode struct { + mocks.MockNetwork + host.Host + httpServer *p2p.HTTPServer + peers []network.Peer + tb testing.TB + genesisID string +} + +// MakeHTTPNode returns a new P2PHTTPNode node. +func MakeHTTPNode(tb testing.TB) *HTTPNode { + p2pHost, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")) + require.NoError(tb, err) + + return &HTTPNode{ + Host: p2pHost, + httpServer: p2p.MakeHTTPServer(p2pHost), + tb: tb, + } +} + +// RegisterHTTPHandler registers a http handler with a given path. +func (p *HTTPNode) RegisterHTTPHandler(path string, handler http.Handler) { + p.httpServer.RegisterHTTPHandler(path, handler) +} + +// RegisterHandlers not implemented. +func (p *HTTPNode) RegisterHandlers(dispatch []network.TaggedMessageHandler) {} + +// Start starts http service +func (p *HTTPNode) Start() error { + go func() { + err := p.httpServer.Serve() + require.NoError(p.tb, err) + }() + return nil +} + +// Stop stops http service +func (p *HTTPNode) Stop() { + p.httpServer.Close() + p.Host.Close() +} + +// GetGenesisID returns genesisID +func (p *HTTPNode) GetGenesisID() string { return p.genesisID } + +// SetGenesisID sets genesisID +func (p *HTTPNode) SetGenesisID(genesisID string) { p.genesisID = genesisID } + +type httpPeer struct { + addrInfo peer.AddrInfo + tb testing.TB +} + +// GetAddress implements HTTPPeer interface returns the address of the peer +func (p httpPeer) GetAddress() string { + mas, err := peer.AddrInfoToP2pAddrs(&p.addrInfo) + require.NoError(p.tb, err) + require.Len(p.tb, mas, 1) + return mas[0].String() +} + +// GetAddress implements HTTPPeer interface and returns the http client for a peer +func (p httpPeer) GetHTTPClient() *http.Client { + c, err := p2p.MakeHTTPClient(&p.addrInfo) + require.NoError(p.tb, err) + return c +} + +// SetPeers sets peers +func (p *HTTPNode) SetPeers(other *HTTPNode) { + addrInfo := peer.AddrInfo{ID: other.ID(), Addrs: other.Addrs()} + hpeer := httpPeer{addrInfo, p.tb} + p.peers = append(p.peers, hpeer) +} + +// GetPeers returns peers +func (p *HTTPNode) GetPeers(options ...network.PeerOption) []network.Peer { + return p.peers +} diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index 4cfec68bad..d546f2fe05 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -32,12 +32,11 @@ import ( "github.com/algorand/go-algorand/network/p2p/peerstore" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-deadlock" - "github.com/gorilla/mux" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" - libp2phttp "github.com/libp2p/go-libp2p/p2p/http" + "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" ) @@ -72,10 +71,7 @@ type P2PNetwork struct { bootstrapper bootstrapper nodeInfo NodeInfo pstore *peerstore.PeerStore - httpServer libp2phttp.Host - - p2phttpMux *mux.Router - p2phttpMuxRegistarOnce sync.Once + httpServer *p2p.HTTPServer } type bootstrapper struct { @@ -171,7 +167,6 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo peerStats: make(map[peer.ID]*p2pPeerStats), nodeInfo: node, pstore: pstore, - p2phttpMux: mux.NewRouter(), } net.ctx, net.ctxCancel = context.WithCancel(context.Background()) net.handler = msgHandler{ @@ -215,9 +210,7 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo net.capabilitiesDiscovery = disc } - net.httpServer = libp2phttp.Host{ - StreamHost: h, - } + net.httpServer = p2p.MakeHTTPServer(h) err = net.setup() if err != nil { @@ -434,10 +427,7 @@ func (n *P2PNetwork) DisconnectPeers() { // RegisterHTTPHandler path accepts gorilla/mux path annotations func (n *P2PNetwork) RegisterHTTPHandler(path string, handler http.Handler) { - n.p2phttpMux.Handle(path, handler) - n.p2phttpMuxRegistarOnce.Do(func() { - n.httpServer.SetHTTPHandlerAtPath(p2p.AlgorandP2pHTTPProtocol, "/", n.p2phttpMux) - }) + n.httpServer.RegisterHTTPHandler(path, handler) } // RequestConnectOutgoing asks the system to actually connect to peers. @@ -479,7 +469,7 @@ func (n *P2PNetwork) GetPeers(options ...PeerOption) []Peer { n.wsPeersLock.RUnlock() case PeersPhonebookArchivalNodes: - // query known archvial nodes from DHT if enabled + // query known archival nodes from DHT if enabled if n.config.EnableDHTProviders { const nodesToFind = 5 info, err := n.capabilitiesDiscovery.PeersForCapability(p2p.Archival, nodesToFind) @@ -500,7 +490,7 @@ func (n *P2PNetwork) GetPeers(options ...PeerOption) []Peer { continue } addr := mas[0].String() - client, err := p2p.MakeHTTPClient(p2p.AlgorandP2pHTTPProtocol, addrInfo) + client, err := p2p.MakeHTTPClient(&info) if err != nil { n.log.Warnf("MakeHTTPClient failed: %v", err) continue @@ -552,9 +542,14 @@ func (n *P2PNetwork) ClearHandlers() { n.handler.ClearHandlers([]Tag{}) } -// GetRoundTripper returns a Transport that would limit the number of outgoing connections. -func (n *P2PNetwork) GetRoundTripper(peer Peer) http.RoundTripper { - return http.DefaultTransport +// GetHTTPClient returns a http.Client with a suitable for the network Transport +// that would also limit the number of outgoing connections. +func (n *P2PNetwork) GetHTTPClient(p HTTPPeer) (*http.Client, error) { + addrInfo, err := peer.AddrInfoFromString(p.GetAddress()) + if err != nil { + return nil, err + } + return p2p.MakeHTTPClient(addrInfo) } // OnNetworkAdvance notifies the network library that the agreement protocol was able to make a notable progress. @@ -569,7 +564,7 @@ func (n *P2PNetwork) GetHTTPRequestConnection(request *http.Request) (conn net.C // wsStreamHandler is a callback that the p2p package calls when a new peer connects and establishes a // stream for the websocket protocol. -func (n *P2PNetwork) wsStreamHandler(ctx context.Context, peer peer.ID, stream network.Stream, incoming bool) { +func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2ppeer peer.ID, stream network.Stream, incoming bool) { if stream.Protocol() != p2p.AlgorandWsProtocol { n.log.Warnf("unknown protocol %s", stream.Protocol()) return @@ -595,7 +590,7 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, peer peer.ID, stream n if numOutgoingPeers >= n.config.GossipFanout { // this appears to be some auxiliary connection made by libp2p itself like DHT connection. // skip this connection since there are already enough peers - n.log.Debugf("skipping outgoing connection to peer %s: num outgoing %d > fanout %d ", peer, numOutgoingPeers, n.config.GossipFanout) + n.log.Debugf("skipping outgoing connection to peer %s: num outgoing %d > fanout %d ", p2ppeer, numOutgoingPeers, n.config.GossipFanout) stream.Close() return } @@ -611,18 +606,23 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, peer peer.ID, stream n ma := stream.Conn().RemoteMultiaddr() addr := ma.String() if addr == "" { - n.log.Warnf("Could not get address for peer %s", peer) + n.log.Warnf("Could not get address for peer %s", p2ppeer) } // create a wsPeer for this stream and added it to the peers map. + client, err := p2p.MakeHTTPClient(&peer.AddrInfo{ID: p2ppeer, Addrs: []multiaddr.Multiaddr{ma}}) + if err != nil { + client = nil + } + peerCore := makePeerCoreWithClient(ctx, n, n.log, n.handler.readBuffer, addr, client, addr) wsp := &wsPeer{ - wsPeerCore: makePeerCore(ctx, n, n.log, n.handler.readBuffer, addr, n.GetRoundTripper(nil), addr), + wsPeerCore: peerCore, conn: &wsPeerConnP2PImpl{stream: stream}, outgoing: !incoming, } wsp.init(n.config, outgoingMessagesBufferSize) n.wsPeersLock.Lock() - n.wsPeers[peer] = wsp - n.wsPeersToIDs[wsp] = peer + n.wsPeers[p2ppeer] = wsp + n.wsPeersToIDs[wsp] = p2ppeer n.wsPeersLock.Unlock() n.wsPeersChangeCounter.Add(1) @@ -633,7 +633,7 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, peer peer.ID, stream n msg = "Accepted incoming connection from peer %s" } localAddr, _ := n.Address() - n.log.With("event", event).With("remote", addr).With("local", localAddr).Infof(msg, peer.String()) + n.log.With("event", event).With("remote", addr).With("local", localAddr).Infof(msg, p2ppeer.String()) if n.log.GetLevel() >= logging.Debug { n.log.Debugf("streams for %s conn %s ", stream.Conn().Stat().Direction.String(), stream.Conn().ID()) diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index 5cabaf7a57..49a717de92 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -36,8 +36,6 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" - peerstore "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" ) @@ -53,9 +51,7 @@ func TestP2PSubmitTX(t *testing.T) { defer netA.Stop() peerInfoA := netA.service.AddrInfo() - fmt.Print("peerInfoA is ", peerInfoA) - addrsA, err := peerstore.AddrInfoToP2pAddrs(&peerInfoA) - fmt.Printf("addrsA is %v\n", addrsA) + addrsA, err := peer.AddrInfoToP2pAddrs(&peerInfoA) require.NoError(t, err) require.NotZero(t, addrsA[0]) @@ -131,7 +127,7 @@ func TestP2PSubmitWS(t *testing.T) { defer netA.Stop() peerInfoA := netA.service.AddrInfo() - addrsA, err := peerstore.AddrInfoToP2pAddrs(&peerInfoA) + addrsA, err := peer.AddrInfoToP2pAddrs(&peerInfoA) require.NoError(t, err) require.NotZero(t, addrsA[0]) @@ -337,9 +333,9 @@ func (c *mockResolveController) Resolver() dnsaddr.Resolver { type mockResolver struct{} -func (r *mockResolver) Resolve(ctx context.Context, maddr multiaddr.Multiaddr) ([]multiaddr.Multiaddr, error) { - ma, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC") - return []multiaddr.Multiaddr{ma}, err +func (r *mockResolver) Resolve(ctx context.Context, _ ma.Multiaddr) ([]ma.Multiaddr, error) { + maddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC") + return []ma.Multiaddr{maddr}, err } func TestBootstrapFunc(t *testing.T) { @@ -454,7 +450,7 @@ func TestP2PNetworkDHTCapabilities(t *testing.T) { defer netA.Stop() peerInfoA := netA.service.AddrInfo() - addrsA, err := peerstore.AddrInfoToP2pAddrs(&peerInfoA) + addrsA, err := peer.AddrInfoToP2pAddrs(&peerInfoA) require.NoError(t, err) require.NotZero(t, addrsA[0]) @@ -535,9 +531,9 @@ func TestP2PNetworkDHTCapabilities(t *testing.T) { net := nets[idx] peers := net.GetPeers(PeersPhonebookArchivalNodes) uniquePeerIDs := make(map[peer.ID]struct{}) - for _, peer := range peers { - wsPeer := peer.(*wsPeerCore) - pi, err := peerstore.AddrInfoFromString(wsPeer.rootURL) + for _, p := range peers { + wsPeer := p.(*wsPeerCore) + pi, err := peer.AddrInfoFromString(wsPeer.rootURL) require.NoError(t, err) uniquePeerIDs[pi.ID] = struct{}{} } @@ -555,7 +551,7 @@ func TestMultiaddrConversionToFrom(t *testing.T) { t.Parallel() a := "/ip4/192.168.1.1/tcp/8180/p2p/Qmewz5ZHN1AAGTarRbMupNPbZRfg3p5jUGoJ3JYEatJVVk" - ma, err := multiaddr.NewMultiaddr(a) + ma, err := ma.NewMultiaddr(a) require.NoError(t, err) require.Equal(t, a, ma.String()) @@ -601,11 +597,11 @@ func TestP2PHTTPHandler(t *testing.T) { defer netA.Stop() peerInfoA := netA.service.AddrInfo() - addrsA, err := peerstore.AddrInfoToP2pAddrs(&peerInfoA) + addrsA, err := peer.AddrInfoToP2pAddrs(&peerInfoA) require.NoError(t, err) require.NotZero(t, addrsA[0]) - httpClient, err := p2p.MakeHTTPClient(p2p.AlgorandP2pHTTPProtocol, netA.service.AddrInfo()) + httpClient, err := p2p.MakeHTTPClient(&peerInfoA) require.NoError(t, err) resp, err := httpClient.Get("/test") require.NoError(t, err) @@ -615,7 +611,7 @@ func TestP2PHTTPHandler(t *testing.T) { require.NoError(t, err) require.Equal(t, "hello", string(body)) - httpClient, err = p2p.MakeHTTPClient(p2p.AlgorandP2pHTTPProtocol, netA.service.AddrInfo()) + httpClient, err = p2p.MakeHTTPClient(&peerInfoA) require.NoError(t, err) resp, err = httpClient.Get("/bar") require.NoError(t, err) diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 56fb64db99..be283439bd 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -553,14 +553,14 @@ func (wn *WebsocketNetwork) GetPeers(options ...PeerOption) []Peer { var addrs []string addrs = wn.phonebook.GetAddresses(1000, PhoneBookEntryRelayRole) for _, addr := range addrs { - peerCore := makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, addr, wn.GetRoundTripper(nil), "" /*origin address*/) + peerCore := makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, addr, wn.getRoundTripper(nil), "" /*origin address*/) outPeers = append(outPeers, &peerCore) } case PeersPhonebookArchivalNodes: var addrs []string addrs = wn.phonebook.GetAddresses(1000, PhoneBookEntryRelayRole) for _, addr := range addrs { - peerCore := makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, addr, wn.GetRoundTripper(nil), "" /*origin address*/) + peerCore := makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, addr, wn.getRoundTripper(nil), "" /*origin address*/) outPeers = append(outPeers, &peerCore) } case PeersPhonebookArchivers: @@ -568,7 +568,7 @@ func (wn *WebsocketNetwork) GetPeers(options ...PeerOption) []Peer { var addrs []string addrs = wn.phonebook.GetAddresses(1000, PhoneBookEntryArchiverRole) for _, addr := range addrs { - peerCore := makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, addr, wn.GetRoundTripper(nil), "" /*origin address*/) + peerCore := makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, addr, wn.getRoundTripper(nil), "" /*origin address*/) outPeers = append(outPeers, &peerCore) } case PeersConnectedIn: @@ -1095,7 +1095,7 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt } peer := &wsPeer{ - wsPeerCore: makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, trackedRequest.remoteAddress(), wn.GetRoundTripper(nil), trackedRequest.remoteHost), + wsPeerCore: makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, trackedRequest.remoteAddress(), wn.getRoundTripper(nil), trackedRequest.remoteHost), conn: wsPeerWebsocketConnImpl{conn}, outgoing: false, InstanceName: trackedRequest.otherInstanceName, @@ -2028,12 +2028,20 @@ func (wn *WebsocketNetwork) numOutgoingPending() int { return len(wn.tryConnectAddrs) } -// GetRoundTripper returns an http.Transport that limits the number of connection +// getRoundTripper returns an http.Transport that limits the number of connection // to comply with connectionsRateLimitingCount. -func (wn *WebsocketNetwork) GetRoundTripper(peer Peer) http.RoundTripper { +func (wn *WebsocketNetwork) getRoundTripper(peer Peer) http.RoundTripper { return &wn.transport } +// GetHTTPClient returns a http.Client with a suitable for the network Transport +// that would also limit the number of outgoing connections. +func (wn *WebsocketNetwork) GetHTTPClient(peer HTTPPeer) (*http.Client, error) { + return &http.Client{ + Transport: &wn.transport, + }, nil +} + // filterASCII filter out the non-ascii printable characters out of the given input string and // and replace these with unprintableCharacterGlyph. // It's used as a security qualifier before logging a network-provided data. @@ -2170,7 +2178,7 @@ func (wn *WebsocketNetwork) tryConnect(addr, gossipAddr string) { } peer := &wsPeer{ - wsPeerCore: makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, addr, wn.GetRoundTripper(nil), "" /* origin */), + wsPeerCore: makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, addr, wn.getRoundTripper(nil), "" /* origin */), conn: wsPeerWebsocketConnImpl{conn}, outgoing: true, incomingMsgFilter: wn.incomingMsgFilter, diff --git a/network/wsPeer.go b/network/wsPeer.go index 06af7c6bad..da5adb7407 100644 --- a/network/wsPeer.go +++ b/network/wsPeer.go @@ -167,7 +167,7 @@ type wsPeerCore struct { readBuffer chan<- IncomingMessage rootURL string originAddress string // incoming connection remote host - client http.Client + client *http.Client } type disconnectReason string @@ -348,11 +348,11 @@ type TCPInfoUnicastPeer interface { // Create a wsPeerCore object func makePeerCore(ctx context.Context, net GossipNode, log logging.Logger, readBuffer chan<- IncomingMessage, rootURL string, roundTripper http.RoundTripper, originAddress string) wsPeerCore { - return makePeerCoreWithClient(ctx, net, log, readBuffer, rootURL, http.Client{Transport: roundTripper}, originAddress) + return makePeerCoreWithClient(ctx, net, log, readBuffer, rootURL, &http.Client{Transport: roundTripper}, originAddress) } // Create a wsPeerCore object -func makePeerCoreWithClient(ctx context.Context, net GossipNode, log logging.Logger, readBuffer chan<- IncomingMessage, rootURL string, client http.Client, originAddress string) wsPeerCore { +func makePeerCoreWithClient(ctx context.Context, net GossipNode, log logging.Logger, readBuffer chan<- IncomingMessage, rootURL string, client *http.Client, originAddress string) wsPeerCore { return wsPeerCore{ net: net, netCtx: ctx, @@ -374,7 +374,7 @@ func (wp *wsPeerCore) GetAddress() string { // GetHTTPClient returns a client for this peer. // http.Client will maintain a cache of connections with some keepalive. func (wp *wsPeerCore) GetHTTPClient() *http.Client { - return &wp.client + return wp.client } func (wp *wsPeerCore) GetNetwork() GossipNode { diff --git a/rpcs/httpTxSync.go b/rpcs/httpTxSync.go index 2c90f40936..4d803bddda 100644 --- a/rpcs/httpTxSync.go +++ b/rpcs/httpTxSync.go @@ -103,19 +103,26 @@ func (hts *HTTPTxSync) Sync(ctx context.Context, bloom *bloom.Filter) (txgroups if !ok { return nil, fmt.Errorf("cannot HTTPTxSync non http peer %T %#v", peer, peer) } + var syncURL string hts.rootURL = hpeer.GetAddress() client := hpeer.GetHTTPClient() if client == nil { - client = &http.Client{} - client.Transport = hts.peers.GetRoundTripper(peer) + client, err = hts.peers.GetHTTPClient(hpeer) + if err != nil { + return nil, fmt.Errorf("HTTPTxSync cannot create a HTTP client for a peer %T %#v: %s", peer, peer, err.Error()) + } } - parsedURL, err := network.ParseHostOrURL(hts.rootURL) - if err != nil { - hts.log.Warnf("txSync bad url %v: %s", hts.rootURL, err) - return nil, err + if network.IsMultiaddr(hts.rootURL) { + syncURL = network.SubstituteGenesisID(hts.peers, path.Join("", TxServiceHTTPPath)) + } else { + parsedURL, err0 := network.ParseHostOrURL(hts.rootURL) + if err0 != nil { + hts.log.Warnf("txSync bad url %v: %s", hts.rootURL, err0) + return nil, err0 + } + parsedURL.Path = network.SubstituteGenesisID(hts.peers, path.Join(parsedURL.Path, TxServiceHTTPPath)) + syncURL = parsedURL.String() } - parsedURL.Path = network.SubstituteGenesisID(hts.peers, path.Join(parsedURL.Path, TxServiceHTTPPath)) - syncURL := parsedURL.String() hts.log.Infof("http sync from %s", syncURL) params := url.Values{} params.Set("bf", bloomParam) diff --git a/rpcs/txService_test.go b/rpcs/txService_test.go index 012d22082e..49cfa3bf49 100644 --- a/rpcs/txService_test.go +++ b/rpcs/txService_test.go @@ -33,6 +33,7 @@ import ( "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/network" + p2ptesting "github.com/algorand/go-algorand/network/p2p/testing" "github.com/algorand/go-algorand/test/partitiontest" "github.com/algorand/go-algorand/util/bloom" ) @@ -129,27 +130,84 @@ func nodePair() (*basicRPCNode, *basicRPCNode) { return nodeA, nodeB } +func nodePairP2p(tb testing.TB) (*p2ptesting.HTTPNode, *p2ptesting.HTTPNode) { + nodeA := p2ptesting.MakeHTTPNode(tb) + addrsA := nodeA.Addrs() + require.Greater(tb, len(addrsA), 0) + + nodeB := p2ptesting.MakeHTTPNode(tb) + addrsB := nodeA.Addrs() + require.Greater(tb, len(addrsB), 0) + + nodeA.SetPeers(nodeB) + nodeB.SetPeers(nodeA) + nodeA.SetGenesisID("test genesisID") + nodeB.SetGenesisID("test genesisID") + + nodeA.Start() + nodeB.Start() + + return nodeA, nodeB +} + +// TestTxSync checks txsync on a network with two nodes, A and B func TestTxSync(t *testing.T) { partitiontest.PartitionTest(t) - // A network with two nodes, A and B - nodeA, nodeB := nodePair() - defer nodeA.stop() - defer nodeB.stop() + type txSyncNode interface { + Registrar + network.GossipNode + } - pool := makeMockPendingTxAggregate(3) - RegisterTxService(pool, nodeA, "test genesisID", config.GetDefaultLocal().TxPoolSize, config.GetDefaultLocal().TxSyncServeResponseSize) + tests := []struct { + name string + setup func(t *testing.T) (txSyncNode, txSyncNode, func()) + }{ + { + name: "tcp", + setup: func(t *testing.T) (txSyncNode, txSyncNode, func()) { + nodeA, nodeB := nodePair() + cleanup := func() { + nodeA.stop() + nodeB.stop() + } + return nodeA, nodeB, cleanup + }, + }, + { + name: "p2p", + setup: func(t *testing.T) (txSyncNode, txSyncNode, func()) { + nodeA, nodeB := nodePairP2p(t) + cleanup := func() { + nodeA.Stop() + nodeB.Stop() + } + return nodeA, nodeB, cleanup + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // A network with two nodes, A and B + nodeA, nodeB, cleanupFn := test.setup(t) + defer cleanupFn() - // B tries to fetch block - handler := mockHandler{} - syncInterval := time.Second - syncTimeout := time.Second - syncerPool := makeMockPendingTxAggregate(0) - syncer := MakeTxSyncer(syncerPool, nodeB, &handler, syncInterval, syncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize) - // Since syncer is not Started, set the context here - syncer.ctx, syncer.cancel = context.WithCancel(context.Background()) - require.NoError(t, syncer.sync()) - require.Equal(t, int32(3), handler.messageCounter.Load()) + pool := makeMockPendingTxAggregate(3) + RegisterTxService(pool, nodeA, "test genesisID", config.GetDefaultLocal().TxPoolSize, config.GetDefaultLocal().TxSyncServeResponseSize) + + // B tries to fetch block + handler := mockHandler{} + syncInterval := time.Second + syncTimeout := time.Second + syncerPool := makeMockPendingTxAggregate(0) + syncer := MakeTxSyncer(syncerPool, nodeB, &handler, syncInterval, syncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize) + // Since syncer is not Started, set the context here + syncer.ctx, syncer.cancel = context.WithCancel(context.Background()) + require.NoError(t, syncer.sync()) + require.Equal(t, int32(3), handler.messageCounter.Load()) + }) + } } func BenchmarkTxSync(b *testing.B) { diff --git a/rpcs/txSyncer_test.go b/rpcs/txSyncer_test.go index 61b6a769aa..6ad820b0df 100644 --- a/rpcs/txSyncer_test.go +++ b/rpcs/txSyncer_test.go @@ -170,16 +170,8 @@ func (mca *mockClientAggregator) GetPeers(options ...network.PeerOption) []netwo return mca.peers } -const numberOfPeers = 10 - -func makeMockClientAggregator(t *testing.T, failWithNil bool, failWithError bool) *mockClientAggregator { - clients := make([]network.Peer, 0) - for i := 0; i < numberOfPeers; i++ { - runner := mockRunner{failWithNil: failWithNil, failWithError: failWithError, done: make(chan *rpc.Call)} - clients = append(clients, &mockRPCClient{client: &runner, log: logging.TestingLog(t)}) - } - t.Logf("len(mca.clients) = %d", len(clients)) - return &mockClientAggregator{peers: clients} +func (mca *mockClientAggregator) GetHTTPClient(peer network.HTTPPeer) (*http.Client, error) { + return &http.Client{Transport: http.DefaultTransport}, nil } func TestSyncFromClient(t *testing.T) {