Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 41 additions & 10 deletions network/p2p/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,44 @@ func (s *HTTPServer) RegisterHTTPHandlerFunc(path string, handler func(http.Resp
})
}

// MakeHTTPClient creates a http.Client that uses libp2p transport for a given protocol and peer address.
func MakeHTTPClient(addrInfo *peer.AddrInfo) (*http.Client, error) {
clientStreamHost, err := libp2p.New(libp2p.NoListenAddrs)
if err != nil {
return nil, err
type httpClientConfig struct {
host host.Host
}

type httpClientOption func(*httpClientConfig)

// WithHost sets the libp2p host for the http client.
func WithHost(h host.Host) httpClientOption {
return func(o *httpClientConfig) {
o.host = h
}
}

// MakeTestHTTPClient creates a http.Client that uses libp2p transport for a given protocol and peer address.
// This exported method is only used in tests.
func MakeTestHTTPClient(addrInfo *peer.AddrInfo, opts ...httpClientOption) (*http.Client, error) {
return makeHTTPClient(addrInfo, opts...)
}

// makeHTTPClient creates a http.Client that uses libp2p transport for a given protocol and peer address.
// If service is nil, a new libp2p host is created.
func makeHTTPClient(addrInfo *peer.AddrInfo, opts ...httpClientOption) (*http.Client, error) {
var config httpClientConfig
for _, opt := range opts {
opt(&config)
}

var clientStreamHost host.Host
if config.host != nil {
Comment thread
gmalouf marked this conversation as resolved.
clientStreamHost = config.host
} else {
var err error
clientStreamHost, err = libp2p.New(libp2p.NoListenAddrs)
if err != nil {
return nil, err
}
logging.Base().Debugf("MakeHTTPClient made a new P2P host %s for %s", clientStreamHost.ID(), addrInfo.String())
}
logging.Base().Debugf("MakeHTTPClient made a new P2P host %s for %s", clientStreamHost.ID(), addrInfo.String())

client := libp2phttp.Host{StreamHost: clientStreamHost}

Expand All @@ -97,13 +128,13 @@ func MakeHTTPClient(addrInfo *peer.AddrInfo) (*http.Client, error) {
return &http.Client{Transport: rt}, nil
}

// MakeHTTPClientWithRateLimit creates a http.Client that uses libp2p transport for a given protocol and peer address.
func MakeHTTPClientWithRateLimit(addrInfo *peer.AddrInfo, pstore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) {
cl, err := MakeHTTPClient(addrInfo)
// makeHTTPClientWithRateLimit creates a http.Client that uses libp2p transport for a given protocol and peer address.
func makeHTTPClientWithRateLimit(addrInfo *peer.AddrInfo, p2pService *serviceImpl, connTimeStore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) {
cl, err := makeHTTPClient(addrInfo, WithHost(p2pService.host))
if err != nil {
return nil, err
}
rltr := limitcaller.MakeRateLimitingBoundTransportWithRoundTripper(pstore, queueingTimeout, cl.Transport, string(addrInfo.ID))
rltr := limitcaller.MakeRateLimitingBoundTransportWithRoundTripper(connTimeStore, queueingTimeout, cl.Transport, string(addrInfo.ID))
cl.Transport = &rltr
return cl, nil

Expand Down
10 changes: 10 additions & 0 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
"encoding/base32"
"fmt"
"net"
"net/http"
"runtime"
"strings"
"time"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/network/limitcaller"
pstore "github.com/algorand/go-algorand/network/p2p/peerstore"
"github.com/algorand/go-algorand/network/phonebook"
"github.com/algorand/go-algorand/util/metrics"
Expand Down Expand Up @@ -69,6 +71,9 @@ type Service interface {
ListPeersForTopic(topic string) []peer.ID
Subscribe(topic string, val pubsub.ValidatorEx) (SubNextCancellable, error)
Publish(ctx context.Context, topic string, data []byte) error

// GetHTTPClient returns a rate-limiting libp2p-streaming http client that can be used to make requests to the given peer
GetHTTPClient(addrInfo *peer.AddrInfo, connTimeStore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error)
}

// serviceImpl manages integration with libp2p and implements the Service interface
Expand Down Expand Up @@ -412,3 +417,8 @@ func addressFilter(addrs []multiaddr.Multiaddr) []multiaddr.Multiaddr {
}
return res
}

// GetHTTPClient returns a libp2p-streaming http client that can be used to make requests to the given peer
func (s *serviceImpl) GetHTTPClient(addrInfo *peer.AddrInfo, connTimeStore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) {
return makeHTTPClientWithRateLimit(addrInfo, s, connTimeStore, queueingTimeout)
}
2 changes: 1 addition & 1 deletion network/p2p/testing/httpNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (p httpPeer) GetAddress() string {

// GetAddress implements HTTPPeer interface and returns the http client for a peer
func (p httpPeer) GetHTTPClient() *http.Client {
c, err := p2p.MakeHTTPClient(&p.addrInfo)
c, err := p2p.MakeTestHTTPClient(&p.addrInfo)
require.NoError(p.tb, err)
return c
}
Expand Down
6 changes: 3 additions & 3 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ func addrInfoToWsPeerCore(n *P2PNetwork, addrInfo *peer.AddrInfo) (wsPeerCore, b
}
addr := mas[0].String()

client, err := p2p.MakeHTTPClientWithRateLimit(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)
client, err := n.service.GetHTTPClient(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)
if err != nil {
n.log.Warnf("MakeHTTPClient failed: %v", err)
return wsPeerCore{}, false
Expand Down Expand Up @@ -718,7 +718,7 @@ func (n *P2PNetwork) GetHTTPClient(address string) (*http.Client, error) {
if err != nil {
return nil, err
}
return p2p.MakeHTTPClientWithRateLimit(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)
return n.service.GetHTTPClient(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)
}

// OnNetworkAdvance notifies the network library that the agreement protocol was able to make a notable progress.
Expand Down Expand Up @@ -771,7 +771,7 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea

// create a wsPeer for this stream and added it to the peers map.
addrInfo := &peer.AddrInfo{ID: p2pPeer, Addrs: []multiaddr.Multiaddr{ma}}
client, err := p2p.MakeHTTPClientWithRateLimit(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)
client, err := n.service.GetHTTPClient(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)
if err != nil {
n.log.Warnf("Cannot construct HTTP Client for %s: %v", p2pPeer, err)
client = nil
Expand Down
14 changes: 10 additions & 4 deletions network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,10 @@ func (s *mockService) Publish(ctx context.Context, topic string, data []byte) er
return nil
}

func (s *mockService) GetHTTPClient(addrInfo *peer.AddrInfo, connTimeStore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) {
return nil, nil
}

func makeMockService(id peer.ID, addrs []ma.Multiaddr) *mockService {
return &mockService{
id: id,
Expand Down Expand Up @@ -757,7 +761,7 @@ func TestP2PHTTPHandler(t *testing.T) {
require.NoError(t, err)
require.NotZero(t, addrsA[0])

httpClient, err := p2p.MakeHTTPClient(&peerInfoA)
httpClient, err := p2p.MakeTestHTTPClient(&peerInfoA)
require.NoError(t, err)
resp, err := httpClient.Get("/test")
require.NoError(t, err)
Expand All @@ -768,7 +772,7 @@ func TestP2PHTTPHandler(t *testing.T) {
require.Equal(t, "hello", string(body))

// check another endpoint that also access the underlying connection/stream
httpClient, err = p2p.MakeHTTPClient(&peerInfoA)
httpClient, err = p2p.MakeTestHTTPClient(&peerInfoA)
require.NoError(t, err)
resp, err = httpClient.Get("/check-conn")
require.NoError(t, err)
Expand All @@ -780,10 +784,12 @@ func TestP2PHTTPHandler(t *testing.T) {

// check rate limiting client:
// zero clients allowed, rate limiting window (10s) is greater than queue deadline (1s)
netB, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil)
require.NoError(t, err)
pstore, err := peerstore.MakePhonebook(0, 10*time.Second)
require.NoError(t, err)
pstore.AddPersistentPeers([]*peer.AddrInfo{&peerInfoA}, "net", phonebook.PhoneBookEntryRelayRole)
httpClient, err = p2p.MakeHTTPClientWithRateLimit(&peerInfoA, pstore, 1*time.Second)
httpClient, err = netB.service.GetHTTPClient(&peerInfoA, pstore, 1*time.Second)
require.NoError(t, err)
_, err = httpClient.Get("/test")
require.ErrorIs(t, err, limitcaller.ErrConnectionQueueingTimeout)
Expand Down Expand Up @@ -815,7 +821,7 @@ func TestP2PHTTPHandlerAllInterfaces(t *testing.T) {
require.NotZero(t, addrsB[0])

t.Logf("peerInfoB: %s", peerInfoA)
httpClient, err := p2p.MakeHTTPClient(&peerInfoA)
httpClient, err := p2p.MakeTestHTTPClient(&peerInfoA)
require.NoError(t, err)
resp, err := httpClient.Get("/test")
require.NoError(t, err)
Expand Down