Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 24 additions & 25 deletions network/limitcaller/rateLimitingTransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

"github.com/algorand/go-algorand/util"
"github.com/libp2p/go-libp2p/core/peer"
)

// ConnectionTimeStore is a subset of the phonebook that is used to store the connection times.
Expand All @@ -31,12 +30,12 @@ type ConnectionTimeStore interface {
UpdateConnectionTime(addrOrPeerID string, provisionalTime time.Time) bool
}

// RateLimitingTransport is the transport for execute a single HTTP transaction, obtaining the Response for a given Request.
type RateLimitingTransport struct {
// RateLimitingBoundTransport is the transport for execute a single HTTP transaction, obtaining the Response for a given Request.
type RateLimitingBoundTransport struct {
phonebook ConnectionTimeStore
innerTransport http.RoundTripper
queueingTimeout time.Duration
targetAddr interface{} // target address for the p2p http request
addrOrPeerID string
}

// DefaultQueueingTimeout is the default timeout for queueing the request.
Expand All @@ -46,9 +45,10 @@ const DefaultQueueingTimeout = 10 * time.Second
// queueing the current request before the request attempt could be made.
var ErrConnectionQueueingTimeout = errors.New("rateLimitingTransport: queueing timeout")

// MakeRateLimitingTransport creates a rate limiting http transport that would limit the requests rate
// according to the entries in the phonebook.
func MakeRateLimitingTransport(phonebook ConnectionTimeStore, queueingTimeout time.Duration, dialer *Dialer, maxIdleConnsPerHost int) RateLimitingTransport {
// MakeRateLimitingBoundTransport creates a rate limiting http transport that that:
// 1. would limit the requests rate according to the entries in the phonebook.
// 2. is bound to a specific target.
func MakeRateLimitingBoundTransport(phonebook ConnectionTimeStore, queueingTimeout time.Duration, dialer *Dialer, maxIdleConnsPerHost int, target string) RateLimitingBoundTransport {
defaultTransport := http.DefaultTransport.(*http.Transport)
innerTransport := &http.Transport{
Proxy: defaultTransport.Proxy,
Expand All @@ -59,37 +59,36 @@ func MakeRateLimitingTransport(phonebook ConnectionTimeStore, queueingTimeout ti
ExpectContinueTimeout: defaultTransport.ExpectContinueTimeout,
MaxIdleConnsPerHost: maxIdleConnsPerHost,
}
return MakeRateLimitingTransportWithRoundTripper(phonebook, queueingTimeout, innerTransport, nil, maxIdleConnsPerHost)
return MakeRateLimitingBoundTransportWithRoundTripper(phonebook, queueingTimeout, innerTransport, target)
}

// MakeRateLimitingTransportWithRoundTripper creates a rate limiting http transport that would limit the requests rate
// according to the entries in the phonebook.
func MakeRateLimitingTransportWithRoundTripper(phonebook ConnectionTimeStore, queueingTimeout time.Duration, rt http.RoundTripper, target interface{}, maxIdleConnsPerHost int) RateLimitingTransport {
Comment thread
cce marked this conversation as resolved.
return RateLimitingTransport{
// MakeRateLimitingBoundTransportWithRoundTripper creates a rate limiting http transport that:
// 1. would limit the requests rate according to the entries in the phonebook.
// 2. is bound to a specific target.
func MakeRateLimitingBoundTransportWithRoundTripper(phonebook ConnectionTimeStore, queueingTimeout time.Duration, rt http.RoundTripper, target string) RateLimitingBoundTransport {
return RateLimitingBoundTransport{
phonebook: phonebook,
innerTransport: rt,
queueingTimeout: queueingTimeout,
targetAddr: target,
addrOrPeerID: target,
}
}

// RoundTrip connects to the address on the named network using the provided context.
// It waits if needed not to exceed connectionsRateLimitingCount.
func (r *RateLimitingTransport) RoundTrip(req *http.Request) (res *http.Response, err error) {
func (r *RateLimitingBoundTransport) RoundTrip(req *http.Request) (res *http.Response, err error) {
var waitTime time.Duration
var provisionalTime time.Time
queueingDeadline := time.Now().Add(r.queueingTimeout)
addrOrPeerID := req.Host
// p2p/http clients have per-connection transport and address info so use that
if len(req.Host) == 0 && req.URL != nil && len(req.URL.Host) == 0 {
Comment thread
cce marked this conversation as resolved.
addrInfo, ok := r.targetAddr.(*peer.AddrInfo)
if !ok {
return nil, errors.New("rateLimitingTransport: request without Host/URL and targetAddr is not a peer.AddrInfo")
}
addrOrPeerID = string(addrInfo.ID)
if r.addrOrPeerID == "" {
return nil, errors.New("rateLimitingTransport: target not set")
}
if req.URL != nil && req.URL.Host != "" && req.URL.Host != r.addrOrPeerID {
Comment thread
gmalouf marked this conversation as resolved.
return nil, errors.New("rateLimitingTransport: request URL host does not match the target")
}

queueingDeadline := time.Now().Add(r.queueingTimeout)
for {
_, waitTime, provisionalTime = r.phonebook.GetConnectionWaitTime(addrOrPeerID)
_, waitTime, provisionalTime = r.phonebook.GetConnectionWaitTime(r.addrOrPeerID)
if waitTime == 0 {
break // break out of the loop and proceed to the connection
}
Expand All @@ -101,6 +100,6 @@ func (r *RateLimitingTransport) RoundTrip(req *http.Request) (res *http.Response
return nil, ErrConnectionQueueingTimeout
}
res, err = r.innerTransport.RoundTrip(req)
r.phonebook.UpdateConnectionTime(addrOrPeerID, provisionalTime)
r.phonebook.UpdateConnectionTime(r.addrOrPeerID, provisionalTime)
return
}
72 changes: 72 additions & 0 deletions network/limitcaller/rateLimitingTransport_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (C) 2019-2024 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package limitcaller

import (
"net/http"
"testing"
"time"

"github.com/algorand/go-algorand/test/partitiontest"
"github.com/stretchr/testify/require"
)

type ctStore struct {
t *testing.T
getCnt uint64
}

func (c *ctStore) GetConnectionWaitTime(addrOrPeerID string) (bool, time.Duration, time.Time) {
require.NotEmpty(c.t, addrOrPeerID)
c.getCnt++
return false, 0, time.Time{}
}

func (c *ctStore) UpdateConnectionTime(addrOrPeerID string, provisionalTime time.Time) bool {
require.NotEmpty(c.t, addrOrPeerID)
return false
}

type emptyRoundTripper struct{}

func (e *emptyRoundTripper) RoundTrip(*http.Request) (*http.Response, error) { return nil, nil }

func TestRoundTrip(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()

ctStore := ctStore{t: t}
rtt := MakeRateLimitingBoundTransportWithRoundTripper(&ctStore, 0, &emptyRoundTripper{}, "")
req := &http.Request{}
_, err := rtt.RoundTrip(req)
require.ErrorContains(t, err, "target not set")
require.Equal(t, uint64(0), ctStore.getCnt)

rtt = MakeRateLimitingBoundTransportWithRoundTripper(&ctStore, 0, &emptyRoundTripper{}, "mytarget")
req, err = http.NewRequest("GET", "https://example.com/test", nil)
require.NoError(t, err)
_, err = rtt.RoundTrip(req)
require.ErrorContains(t, err, "URL host does not match the target")
require.Equal(t, uint64(0), ctStore.getCnt)

rtt = MakeRateLimitingBoundTransportWithRoundTripper(&ctStore, 0, &emptyRoundTripper{}, "mytarget")
req, err = http.NewRequest("GET", "/test", nil)
require.NoError(t, err)
_, err = rtt.RoundTrip(req)
require.NoError(t, err)
require.Equal(t, uint64(1), ctStore.getCnt)
}
6 changes: 3 additions & 3 deletions network/p2p/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ func MakeHTTPClient(addrInfo *peer.AddrInfo) (*http.Client, error) {
}

// MakeHTTPClientWithRateLimit creates a http.Client that uses libp2p transport for a given protocol and peer address.
func MakeHTTPClientWithRateLimit(addrInfo *peer.AddrInfo, pstore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration, maxIdleConnsPerHost int) (*http.Client, error) {
func MakeHTTPClientWithRateLimit(addrInfo *peer.AddrInfo, pstore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) {
cl, err := MakeHTTPClient(addrInfo)
if err != nil {
return nil, err
}
rlrt := limitcaller.MakeRateLimitingTransportWithRoundTripper(pstore, queueingTimeout, cl.Transport, addrInfo, maxIdleConnsPerHost)
cl.Transport = &rlrt
rltr := limitcaller.MakeRateLimitingBoundTransportWithRoundTripper(pstore, queueingTimeout, cl.Transport, string(addrInfo.ID))
cl.Transport = &rltr
return cl, nil

}
9 changes: 3 additions & 6 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,8 +613,7 @@ func addrInfoToWsPeerCore(n *P2PNetwork, addrInfo *peer.AddrInfo) (wsPeerCore, b
}
addr := mas[0].String()

maxIdleConnsPerHost := int(n.config.ConnectionsRateLimitingCount)
client, err := p2p.MakeHTTPClientWithRateLimit(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout, maxIdleConnsPerHost)
client, err := p2p.MakeHTTPClientWithRateLimit(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)
if err != nil {
n.log.Warnf("MakeHTTPClient failed: %v", err)
return wsPeerCore{}, false
Expand Down Expand Up @@ -720,8 +719,7 @@ func (n *P2PNetwork) GetHTTPClient(address string) (*http.Client, error) {
if err != nil {
return nil, err
}
maxIdleConnsPerHost := int(n.config.ConnectionsRateLimitingCount)
return p2p.MakeHTTPClientWithRateLimit(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout, maxIdleConnsPerHost)
return p2p.MakeHTTPClientWithRateLimit(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)
}

// OnNetworkAdvance notifies the network library that the agreement protocol was able to make a notable progress.
Expand Down Expand Up @@ -774,8 +772,7 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea

// create a wsPeer for this stream and added it to the peers map.
addrInfo := &peer.AddrInfo{ID: p2pPeer, Addrs: []multiaddr.Multiaddr{ma}}
maxIdleConnsPerHost := int(n.config.ConnectionsRateLimitingCount)
client, err := p2p.MakeHTTPClientWithRateLimit(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout, maxIdleConnsPerHost)
client, err := p2p.MakeHTTPClientWithRateLimit(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)
if err != nil {
n.log.Warnf("Cannot construct HTTP Client for %s: %v", p2pPeer, err)
client = nil
Expand Down
2 changes: 1 addition & 1 deletion network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ func TestP2PHTTPHandler(t *testing.T) {
pstore, err := peerstore.MakePhonebook(0, 10*time.Second)
require.NoError(t, err)
pstore.AddPersistentPeers([]*peer.AddrInfo{&peerInfoA}, "net", phonebook.PhoneBookEntryRelayRole)
httpClient, err = p2p.MakeHTTPClientWithRateLimit(&peerInfoA, pstore, 1*time.Second, 1)
httpClient, err = p2p.MakeHTTPClientWithRateLimit(&peerInfoA, pstore, 1*time.Second)
require.NoError(t, err)
_, err = httpClient.Get("/test")
require.ErrorIs(t, err, limitcaller.ErrConnectionQueueingTimeout)
Expand Down
19 changes: 13 additions & 6 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,9 @@ type WebsocketNetwork struct {
// number of throttled outgoing connections "slots" needed to be populated.
throttledOutgoingConnections atomic.Int32

// transport and dialer are customized to limit the number of
// dialer is customized to limit the number of
// connection in compliance with connectionsRateLimitingCount.
transport limitcaller.RateLimitingTransport
dialer limitcaller.Dialer
dialer limitcaller.Dialer

// messagesOfInterest specifies the message types that this node
// wants to receive. nil means default. non-nil causes this
Expand Down Expand Up @@ -565,9 +564,7 @@ func (wn *WebsocketNetwork) setup() {
if wn.nodeInfo == nil {
wn.nodeInfo = &nopeNodeInfo{}
}
maxIdleConnsPerHost := int(wn.config.ConnectionsRateLimitingCount)
wn.dialer = limitcaller.MakeRateLimitingDialer(wn.phonebook, preferredResolver)
wn.transport = limitcaller.MakeRateLimitingTransport(wn.phonebook, limitcaller.DefaultQueueingTimeout, &wn.dialer, maxIdleConnsPerHost)

wn.upgrader.ReadBufferSize = 4096
wn.upgrader.WriteBufferSize = 4096
Expand Down Expand Up @@ -1975,8 +1972,18 @@ func (wn *WebsocketNetwork) numOutgoingPending() int {
// GetHTTPClient returns a http.Client with a suitable for the network Transport
// that would also limit the number of outgoing connections.
func (wn *WebsocketNetwork) GetHTTPClient(address string) (*http.Client, error) {
url, err := addr.ParseHostOrURL(address)
if err != nil {
return nil, err
}

maxIdleConnsPerHost := int(wn.config.ConnectionsRateLimitingCount)
Comment thread
gmalouf marked this conversation as resolved.
rltr := limitcaller.MakeRateLimitingBoundTransport(wn.phonebook, limitcaller.DefaultQueueingTimeout, &wn.dialer, maxIdleConnsPerHost, url.Host)
return &http.Client{
Transport: &HTTPPAddressBoundTransport{address, &wn.transport},
Transport: &HTTPPAddressBoundTransport{
address,
&rltr,
},
}, nil
}

Expand Down
39 changes: 39 additions & 0 deletions network/wsNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4601,3 +4601,42 @@ func TestHTTPPAddressBoundTransport(t *testing.T) {
}
}
}

// TestWebsocketNetworkHTTPClient checks ws net HTTP client can connect to another node
// with out unexpected errors
func TestWebsocketNetworkHTTPClient(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()

netA := makeTestWebsocketNode(t)
err := netA.Start()
require.NoError(t, err)
defer netStop(t, netA, "A")

netB := makeTestWebsocketNodeWithConfig(t, defaultConfig)

addr, ok := netA.Address()
require.True(t, ok)

c, err := netB.GetHTTPClient(addr)
require.NoError(t, err)

netA.RegisterHTTPHandlerFunc("/handled", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})

resp, err := c.Do(&http.Request{URL: &url.URL{Path: "/handled"}})
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)

resp, err = c.Do(&http.Request{URL: &url.URL{Path: "/test"}})
require.NoError(t, err)
require.Equal(t, http.StatusNotFound, resp.StatusCode) // no such handler

resp, err = c.Do(&http.Request{URL: &url.URL{Path: "/v1/" + genesisID + "/gossip"}})
require.NoError(t, err)
require.Equal(t, http.StatusPreconditionFailed, resp.StatusCode) // not enough ws peer headers

_, err = netB.GetHTTPClient("invalid")
require.Error(t, err)
}