From 5d547cf46abb1773289e4c571ba71f4239a2e8b3 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 15 May 2024 20:18:08 -0700 Subject: [PATCH] rcmgr: Add conn_limiter to limit number of conns per ip cidr (#2788) * Add conn_limiter to limit number of conns per ip cidr * Handle the case where we want to call OpenConnection without an IP address * Delete key when count==0 --- p2p/host/resource-manager/conn_limiter.go | 141 ++++++++++++++++ .../resource-manager/conn_limiter_test.go | 158 ++++++++++++++++++ p2p/host/resource-manager/rcmgr.go | 56 ++++++- p2p/protocol/circuitv2/client/transport.go | 18 +- 4 files changed, 364 insertions(+), 9 deletions(-) create mode 100644 p2p/host/resource-manager/conn_limiter.go create mode 100644 p2p/host/resource-manager/conn_limiter_test.go diff --git a/p2p/host/resource-manager/conn_limiter.go b/p2p/host/resource-manager/conn_limiter.go new file mode 100644 index 0000000000..2cea6b30cd --- /dev/null +++ b/p2p/host/resource-manager/conn_limiter.go @@ -0,0 +1,141 @@ +package rcmgr + +import ( + "net/netip" + "sync" +) + +type ConnLimitPerCIDR struct { + // How many leading 1 bits in the mask + BitMask int + ConnCount int +} + +// 8 for now so that it matches the number of concurrent dials we may do +// in swarm_dial.go. With future smart dialing work we should bring this +// down +var defaultMaxConcurrentConns = 8 + +var defaultIP4Limit = ConnLimitPerCIDR{ + ConnCount: defaultMaxConcurrentConns, + BitMask: 32, +} +var defaultIP6Limits = []ConnLimitPerCIDR{ + { + ConnCount: defaultMaxConcurrentConns, + BitMask: 56, + }, + { + ConnCount: 8 * defaultMaxConcurrentConns, + BitMask: 48, + }, +} + +func WithLimitPeersPerCIDR(ipv4 []ConnLimitPerCIDR, ipv6 []ConnLimitPerCIDR) Option { + return func(rm *resourceManager) error { + if ipv4 != nil { + rm.connLimiter.connLimitPerCIDRIP4 = ipv4 + } + if ipv6 != nil { + rm.connLimiter.connLimitPerCIDRIP6 = ipv6 + } + return nil + } +} + +type connLimiter struct { + mu sync.Mutex + connLimitPerCIDRIP4 []ConnLimitPerCIDR + connLimitPerCIDRIP6 []ConnLimitPerCIDR + ip4connsPerLimit []map[string]int + ip6connsPerLimit []map[string]int +} + +func newConnLimiter() *connLimiter { + return &connLimiter{ + connLimitPerCIDRIP4: []ConnLimitPerCIDR{defaultIP4Limit}, + connLimitPerCIDRIP6: defaultIP6Limits, + } +} + +// addConn adds a connection for the given IP address. It returns true if the connection is allowed. +func (cl *connLimiter) addConn(ip netip.Addr) bool { + cl.mu.Lock() + defer cl.mu.Unlock() + limits := cl.connLimitPerCIDRIP4 + countsPerLimit := cl.ip4connsPerLimit + isIP6 := ip.Is6() + if isIP6 { + limits = cl.connLimitPerCIDRIP6 + countsPerLimit = cl.ip6connsPerLimit + } + + if len(countsPerLimit) == 0 && len(limits) > 0 { + countsPerLimit = make([]map[string]int, len(limits)) + if isIP6 { + cl.ip6connsPerLimit = countsPerLimit + } else { + cl.ip4connsPerLimit = countsPerLimit + } + } + + for i, limit := range limits { + prefix, err := ip.Prefix(limit.BitMask) + if err != nil { + return false + } + masked := prefix.String() + + counts, ok := countsPerLimit[i][masked] + if !ok { + if countsPerLimit[i] == nil { + countsPerLimit[i] = make(map[string]int) + } + countsPerLimit[i][masked] = 0 + } + if counts+1 > limit.ConnCount { + return false + } + } + + // All limit checks passed, now we update the counts + for i, limit := range limits { + prefix, _ := ip.Prefix(limit.BitMask) + masked := prefix.String() + countsPerLimit[i][masked]++ + } + + return true +} + +func (cl *connLimiter) rmConn(ip netip.Addr) { + cl.mu.Lock() + defer cl.mu.Unlock() + limits := cl.connLimitPerCIDRIP4 + countsPerLimit := cl.ip4connsPerLimit + isIP6 := ip.Is6() + if isIP6 { + limits = cl.connLimitPerCIDRIP6 + countsPerLimit = cl.ip6connsPerLimit + } + + for i, limit := range limits { + prefix, err := ip.Prefix(limit.BitMask) + if err != nil { + // Unexpected since we should have seen this IP before in addConn + log.Errorf("unexpected error getting prefix: %v", err) + continue + } + masked := prefix.String() + counts, ok := countsPerLimit[i][masked] + if !ok || counts == 0 { + // Unexpected, but don't panic + log.Errorf("unexpected conn count for %s ok=%v count=%v", masked, ok, counts) + continue + } + countsPerLimit[i][masked]-- + if countsPerLimit[i][masked] == 0 { + delete(countsPerLimit[i], masked) + } + } +} diff --git a/p2p/host/resource-manager/conn_limiter_test.go b/p2p/host/resource-manager/conn_limiter_test.go new file mode 100644 index 0000000000..9c871c4c69 --- /dev/null +++ b/p2p/host/resource-manager/conn_limiter_test.go @@ -0,0 +1,158 @@ +package rcmgr + +import ( + "encoding/binary" + "fmt" + "net" + "net/netip" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestItLimits(t *testing.T) { + t.Run("IPv4", func(t *testing.T) { + ip, err := netip.ParseAddr("1.2.3.4") + require.NoError(t, err) + cl := newConnLimiter() + cl.connLimitPerCIDRIP4[0].ConnCount = 1 + require.True(t, cl.addConn(ip)) + + // should fail the second time + require.False(t, cl.addConn(ip)) + + otherIP, err := netip.ParseAddr("1.2.3.5") + require.NoError(t, err) + require.True(t, cl.addConn(otherIP)) + }) + t.Run("IPv6", func(t *testing.T) { + ip, err := netip.ParseAddr("1:2:3:4::1") + require.NoError(t, err) + cl := newConnLimiter() + original := cl.connLimitPerCIDRIP6[0].ConnCount + cl.connLimitPerCIDRIP6[0].ConnCount = 1 + defer func() { + cl.connLimitPerCIDRIP6[0].ConnCount = original + }() + require.True(t, cl.addConn(ip)) + + // should fail the second time + require.False(t, cl.addConn(ip)) + otherIPSameSubnet := netip.MustParseAddr("1:2:3:4::2") + require.False(t, cl.addConn(otherIPSameSubnet)) + + otherIP := netip.MustParseAddr("2:2:3:4::2") + require.True(t, cl.addConn(otherIP)) + }) + + t.Run("IPv6 with multiple limits", func(t *testing.T) { + cl := newConnLimiter() + for i := 0; i < defaultMaxConcurrentConns; i++ { + ip := net.ParseIP("ff:2:3:4::1") + binary.BigEndian.PutUint16(ip[14:], uint16(i)) + ipAddr := netip.MustParseAddr(ip.String()) + require.True(t, cl.addConn(ipAddr)) + } + + // Next one should fail + ip := net.ParseIP("ff:2:3:4::1") + binary.BigEndian.PutUint16(ip[14:], uint16(defaultMaxConcurrentConns+1)) + require.False(t, cl.addConn(netip.MustParseAddr(ip.String()))) + + // But on a different root subnet should work + otherIP := netip.MustParseAddr("ffef:2:3::1") + require.True(t, cl.addConn(otherIP)) + + // But too many on the next subnet limit will fail too + for i := 0; i < defaultMaxConcurrentConns*8; i++ { + ip := net.ParseIP("ffef:2:3:4::1") + binary.BigEndian.PutUint16(ip[5:7], uint16(i)) + fmt.Println(ip.String()) + ipAddr := netip.MustParseAddr(ip.String()) + require.True(t, cl.addConn(ipAddr)) + } + + ip = net.ParseIP("ffef:2:3:4::1") + binary.BigEndian.PutUint16(ip[5:7], uint16(defaultMaxConcurrentConns*8+1)) + ipAddr := netip.MustParseAddr(ip.String()) + require.False(t, cl.addConn(ipAddr)) + }) +} + +func genIP(data *[]byte) (netip.Addr, bool) { + if len(*data) < 1 { + return netip.Addr{}, false + } + + genIP6 := (*data)[0]&0x01 == 1 + bytesRequired := 4 + if genIP6 { + bytesRequired = 16 + } + + if len((*data)[1:]) < bytesRequired { + return netip.Addr{}, false + } + + *data = (*data)[1:] + ip, ok := netip.AddrFromSlice((*data)[:bytesRequired]) + *data = (*data)[bytesRequired:] + return ip, ok +} + +func FuzzConnLimiter(f *testing.F) { + // The goal is to try to enter a state where the count is incorrectly 0 + f.Fuzz(func(t *testing.T, data []byte) { + ips := make([]netip.Addr, 0, len(data)/5) + for { + ip, ok := genIP(&data) + if !ok { + break + } + ips = append(ips, ip) + } + + cl := newConnLimiter() + addedConns := make([]netip.Addr, 0, len(ips)) + for _, ip := range ips { + if cl.addConn(ip) { + addedConns = append(addedConns, ip) + } + } + + addedCount := 0 + for _, ip := range cl.ip4connsPerLimit { + for _, count := range ip { + addedCount += count + } + } + for _, ip := range cl.ip6connsPerLimit { + for _, count := range ip { + addedCount += count + } + } + if addedCount == 0 && len(addedConns) > 0 { + t.Fatalf("added count: %d", addedCount) + } + + for _, ip := range addedConns { + cl.rmConn(ip) + } + + leftoverCount := 0 + for _, ip := range cl.ip4connsPerLimit { + for _, count := range ip { + leftoverCount += count + } + } + for _, ip := range cl.ip6connsPerLimit { + for _, count := range ip { + leftoverCount += count + } + } + if leftoverCount != 0 { + t.Fatalf("leftover count: %d", leftoverCount) + } + }) + +} diff --git a/p2p/host/resource-manager/rcmgr.go b/p2p/host/resource-manager/rcmgr.go index 188a171f56..dceb18bd8a 100644 --- a/p2p/host/resource-manager/rcmgr.go +++ b/p2p/host/resource-manager/rcmgr.go @@ -3,6 +3,7 @@ package rcmgr import ( "context" "fmt" + "net/netip" "strings" "sync" "time" @@ -13,6 +14,7 @@ import ( logging "github.com/ipfs/go-log/v2" "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" ) var log = logging.Logger("rcmgr") @@ -20,6 +22,8 @@ var log = logging.Logger("rcmgr") type resourceManager struct { limits Limiter + connLimiter *connLimiter + trace *trace metrics *metrics disableMetrics bool @@ -103,6 +107,7 @@ type connectionScope struct { rcmgr *resourceManager peer *peerScope endpoint multiaddr.Multiaddr + ip netip.Addr } var _ network.ConnScope = (*connectionScope)(nil) @@ -129,11 +134,12 @@ type Option func(*resourceManager) error func NewResourceManager(limits Limiter, opts ...Option) (network.ResourceManager, error) { allowlist := newAllowlist() r := &resourceManager{ - limits: limits, - allowlist: &allowlist, - svc: make(map[string]*serviceScope), - proto: make(map[protocol.ID]*protocolScope), - peer: make(map[peer.ID]*peerScope), + limits: limits, + connLimiter: newConnLimiter(), + allowlist: &allowlist, + svc: make(map[string]*serviceScope), + proto: make(map[protocol.ID]*protocolScope), + peer: make(map[peer.ID]*peerScope), } for _, opt := range opts { @@ -310,12 +316,38 @@ func (r *resourceManager) nextStreamId() int64 { return r.streamId } +// OpenConnectionNoIP is like OpenConnection, but does not use IP information. +// Used when we still want to limit the connection by other scopes, but don't +// have IP information like when we are relaying. +func (r *resourceManager) OpenConnectionNoIP(dir network.Direction, usefd bool, endpoint multiaddr.Multiaddr) (network.ConnManagementScope, error) { + return r.openConnection(dir, usefd, endpoint, netip.Addr{}) +} + func (r *resourceManager) OpenConnection(dir network.Direction, usefd bool, endpoint multiaddr.Multiaddr) (network.ConnManagementScope, error) { + ip, err := manet.ToIP(endpoint) + if err != nil { + return nil, err + } + + ipAddr, ok := netip.AddrFromSlice(ip) + if !ok { + return nil, fmt.Errorf("failed to convert ip to netip.Addr") + } + return r.openConnection(dir, usefd, endpoint, ipAddr) +} + +func (r *resourceManager) openConnection(dir network.Direction, usefd bool, endpoint multiaddr.Multiaddr, ip netip.Addr) (network.ConnManagementScope, error) { + if ip.IsValid() { + if ok := r.connLimiter.addConn(ip); !ok { + return nil, fmt.Errorf("connections per ip limit exceeded for %s", endpoint) + } + } + var conn *connectionScope - conn = newConnectionScope(dir, usefd, r.limits.GetConnLimits(), r, endpoint) + conn = newConnectionScope(dir, usefd, r.limits.GetConnLimits(), r, endpoint, ip) err := conn.AddConn(dir, usefd) - if err != nil { + if err != nil && ip.IsValid() { // Try again if this is an allowlisted connection // Failed to open connection, let's see if this was allowlisted and try again allowed := r.allowlist.Allowed(endpoint) @@ -476,7 +508,7 @@ func newPeerScope(p peer.ID, limit Limit, rcmgr *resourceManager) *peerScope { } } -func newConnectionScope(dir network.Direction, usefd bool, limit Limit, rcmgr *resourceManager, endpoint multiaddr.Multiaddr) *connectionScope { +func newConnectionScope(dir network.Direction, usefd bool, limit Limit, rcmgr *resourceManager, endpoint multiaddr.Multiaddr, ip netip.Addr) *connectionScope { return &connectionScope{ resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.transient.resourceScope, rcmgr.system.resourceScope}, @@ -485,6 +517,7 @@ func newConnectionScope(dir network.Direction, usefd bool, limit Limit, rcmgr *r usefd: usefd, rcmgr: rcmgr, endpoint: endpoint, + ip: ip, } } @@ -643,6 +676,13 @@ func (s *connectionScope) PeerScope() network.PeerScope { return s.peer } +func (s *connectionScope) Done() { + if s.ip.IsValid() { + s.rcmgr.connLimiter.rmConn(s.ip) + } + s.resourceScope.Done() +} + // transferAllowedToStandard transfers this connection scope from being part of // the allowlist set of scopes to being part of the standard set of scopes. // Happens when we first allowlisted this connection due to its IP, but later diff --git a/p2p/protocol/circuitv2/client/transport.go b/p2p/protocol/circuitv2/client/transport.go index e08d55707e..135fdcf017 100644 --- a/p2p/protocol/circuitv2/client/transport.go +++ b/p2p/protocol/circuitv2/client/transport.go @@ -48,8 +48,24 @@ func AddTransport(h host.Host, upgrader transport.Upgrader) error { var _ transport.Transport = (*Client)(nil) var _ io.Closer = (*Client)(nil) +// If the resource manager supports OpenConnectionNoIP, we'll use it to open connections. +// That's because the swarm is already limiting by IP address at the swarm +// level, and here we want to limit by everything but the IP. +// Some ResourceManager implementations may not care about IP addresses, so we +// do our own interface check to see if they provide this option. +type rcmgrOpenConnectionNoIPer interface { + OpenConnectionNoIP(network.Direction, bool, ma.Multiaddr) (network.ConnManagementScope, error) +} + func (c *Client) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (transport.CapableConn, error) { - connScope, err := c.host.Network().ResourceManager().OpenConnection(network.DirOutbound, false, a) + var connScope network.ConnManagementScope + var err error + if rcmgr, ok := c.host.Network().ResourceManager().(rcmgrOpenConnectionNoIPer); ok { + connScope, err = rcmgr.OpenConnectionNoIP(network.DirOutbound, false, a) + } else { + connScope, err = c.host.Network().ResourceManager().OpenConnection(network.DirOutbound, false, a) + } + if err != nil { return nil, err }