From 6f73bc6af1c3beef13c9049ffffb145a328b97c9 Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Wed, 15 May 2024 12:55:59 +0200 Subject: [PATCH] feat: improved resource manager based on Rainbow --- CHANGELOG.md | 4 +- docs/environment-variables.md | 35 ++++++++ go.mod | 5 +- go.sum | 2 + internal/fd/sys_not_unix.go | 7 ++ internal/fd/sys_unix.go | 17 ++++ internal/fd/sys_windows.go | 11 +++ main.go | 48 ++++++++++- rcmgr.go | 155 ++++++++++++++++++++++++++++++++++ server.go | 68 ++++++++------- 10 files changed, 315 insertions(+), 37 deletions(-) create mode 100644 internal/fd/sys_not_unix.go create mode 100644 internal/fd/sys_unix.go create mode 100644 internal/fd/sys_windows.go create mode 100644 rcmgr.go diff --git a/CHANGELOG.md b/CHANGELOG.md index fdc928b..5bd1bd8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,8 @@ The following emojis are used to highlight certain changes: ### Changed +- The resource manager's defaults have been improved based on Rainbow's and Kubo's defaults. In addition, you can now customize a few options using flags, or [environment variables](./docs/environment-variables.md). + ### Removed ### Fixed @@ -29,8 +31,6 @@ The following emojis are used to highlight certain changes: - The `/routing/v1/peers` endpoint correctly filters out private addresses. -### Security - ## [v0.2.1] ### Fixed diff --git a/docs/environment-variables.md b/docs/environment-variables.md index f4de147..d86a1b1 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -8,6 +8,11 @@ - [`SOMEGUY_PROVIDER_ENDPOINTS`](#someguy_provider_endpoints) - [`SOMEGUY_PEER_ENDPOINTS`](#someguy_peer_endpoints) - [`SOMEGUY_IPNS_ENDPOINTS`](#someguy_ipns_endpoints) + - [`SOMEGUY_CONNMGR_LOW`](#someguy_connmgr_low) + - [`SOMEGUY_CONNMGR_HIGH`](#someguy_connmgr_high) + - [`SOMEGUY_CONNMGR_GRACE_PERIOD`](#someguy_connmgr_grace_period) + - [`SOMEGUY_MAX_MEMORY`](#someguy_max_memory) + - [`SOMEGUY_MAX_FD`](#someguy_max_fd) - [Logging](#logging) - [`GOLOG_LOG_LEVEL`](#golog_log_level) - [`GOLOG_LOG_FMT`](#golog_log_fmt) @@ -46,6 +51,36 @@ Comma-separated list of other Delegated Routing V1 endpoints to proxy IPNS reque Default: none +### `SOMEGUY_CONNMGR_LOW` + +Minimum number of connections to keep. + +Default: 100 + +### `SOMEGUY_CONNMGR_HIGH` + +Maximum number of connections to keep. + +Default: 100 + +### `SOMEGUY_CONNMGR_GRACE_PERIOD` + +Minimum connection TTL. + +Default: 1m + +### `SOMEGUY_MAX_MEMORY` + +Maximum memory to use. + +Default: 0 (85% of the system's available RAM) + +### `SOMEGUY_MAX_FD` + +Maximum number of file descriptors. + +Default: 0 (50% of the process' limit) + ## Logging ### `GOLOG_LOG_LEVEL` diff --git a/go.mod b/go.mod index 859a27e..67b44d2 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.21 require ( github.com/CAFxX/httpcompression v0.0.9 + github.com/dustin/go-humanize v1.0.1 github.com/felixge/httpsnoop v1.0.4 github.com/ipfs/boxo v0.19.1-0.20240515083429-ac0bab3926a8 github.com/ipfs/go-cid v0.4.1 @@ -14,11 +15,13 @@ require ( github.com/multiformats/go-multiaddr v0.12.3 github.com/multiformats/go-multibase v0.2.0 github.com/multiformats/go-multihash v0.2.3 + github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 github.com/prometheus/client_golang v1.19.0 github.com/rs/cors v1.10.1 github.com/slok/go-http-metrics v0.11.0 github.com/stretchr/testify v1.9.0 github.com/urfave/cli/v2 v2.27.1 + golang.org/x/sys v0.19.0 ) require ( @@ -89,7 +92,6 @@ require ( github.com/onsi/ginkgo/v2 v2.17.1 // indirect github.com/opencontainers/runtime-spec v1.2.0 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect - github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/polydawn/refmt v0.89.0 // indirect @@ -120,7 +122,6 @@ require ( golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.24.0 // indirect golang.org/x/sync v0.7.0 // indirect - golang.org/x/sys v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.20.0 // indirect gonum.org/v1/gonum v0.15.0 // indirect diff --git a/go.sum b/go.sum index 3884381..f209a31 100644 --- a/go.sum +++ b/go.sum @@ -75,6 +75,8 @@ github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= github.com/elastic/gosigar v0.14.3 h1:xwkKwPia+hSfg9GqrCUKYdId102m9qTJIIr7egmK/uo= github.com/elastic/gosigar v0.14.3/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= diff --git a/internal/fd/sys_not_unix.go b/internal/fd/sys_not_unix.go new file mode 100644 index 0000000..c857987 --- /dev/null +++ b/internal/fd/sys_not_unix.go @@ -0,0 +1,7 @@ +//go:build !linux && !darwin && !windows + +package fd + +func GetNumFDs() int { + return 0 +} diff --git a/internal/fd/sys_unix.go b/internal/fd/sys_unix.go new file mode 100644 index 0000000..9c96576 --- /dev/null +++ b/internal/fd/sys_unix.go @@ -0,0 +1,17 @@ +//go:build linux || darwin +// +build linux darwin + +// Package fd provides filesystem descriptor count for different architectures. +package fd + +import ( + "golang.org/x/sys/unix" +) + +func GetNumFDs() int { + var l unix.Rlimit + if err := unix.Getrlimit(unix.RLIMIT_NOFILE, &l); err != nil { + return 0 + } + return int(l.Cur) +} diff --git a/internal/fd/sys_windows.go b/internal/fd/sys_windows.go new file mode 100644 index 0000000..eec17f3 --- /dev/null +++ b/internal/fd/sys_windows.go @@ -0,0 +1,11 @@ +//go:build windows + +package fd + +import ( + "math" +) + +func GetNumFDs() int { + return math.MaxInt +} diff --git a/main.go b/main.go index 3315803..5f640e3 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "errors" "log" "os" + "time" "github.com/ipfs/boxo/ipns" "github.com/ipfs/go-cid" @@ -53,9 +54,54 @@ func main() { EnvVars: []string{"SOMEGUY_IPNS_ENDPOINTS"}, Usage: "other Delegated Routing V1 endpoints to proxy IPNS requests to", }, + &cli.IntFlag{ + Name: "connmgr-low", + Value: 100, + EnvVars: []string{"SOMEGUY_CONNMGR_LOW"}, + Usage: "minimum number of connections to keep", + }, + &cli.IntFlag{ + Name: "connmgr-high", + Value: 3000, + EnvVars: []string{"SOMEGUY_CONNMGR_HIGH"}, + Usage: "maximum number of connections to keep", + }, + &cli.DurationFlag{ + Name: "connmgr-grace", + Value: time.Minute, + EnvVars: []string{"SOMEGUY_CONNMGR_GRACE_PERIOD"}, + Usage: "minimum connection TTL", + }, + &cli.Uint64Flag{ + Name: "max-memory", + Value: 0, + EnvVars: []string{"SOMEGUY_MAX_MEMORY"}, + Usage: "maximum memory to use. Defaults to 85% of the system's available RAM", + }, + &cli.Uint64Flag{ + Name: "max-fd", + Value: 0, + EnvVars: []string{"SOMEGUY_MAX_FD"}, + Usage: "maximum number of file descriptors. Defaults to 50% of the process' limit", + }, }, Action: func(ctx *cli.Context) error { - return start(ctx.Context, ctx.String("listen-address"), ctx.Bool("accelerated-dht"), ctx.StringSlice("provider-endpoints"), ctx.StringSlice("peer-endpoints"), ctx.StringSlice("ipns-endpoints")) + cfg := &config{ + listenAddress: ctx.String("listen-address"), + acceleratedDHTClient: ctx.Bool("accelerated-dht"), + + contentEndpoints: ctx.StringSlice("provider-endpoints"), + peerEndpoints: ctx.StringSlice("peer-endpoints"), + ipnsEndpoints: ctx.StringSlice("ipns-endpoints"), + + connMgrLow: ctx.Int("connmgr-low"), + connMgrHi: ctx.Int("connmgr-high"), + connMgrGrace: ctx.Duration("connmgr-grace"), + maxMemory: ctx.Uint64("max-memory"), + maxFD: ctx.Int("max-fd"), + } + + return start(ctx.Context, cfg) }, }, { diff --git a/rcmgr.go b/rcmgr.go new file mode 100644 index 0000000..9fdbc24 --- /dev/null +++ b/rcmgr.go @@ -0,0 +1,155 @@ +package main + +import ( + "log" + + "github.com/dustin/go-humanize" + "github.com/pbnjay/memory" + + "github.com/ipfs/someguy/internal/fd" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/network" + rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" +) + +// Note: this comes from rainbow/rcmgr.go with minimal adaptations. + +var infiniteResourceLimits = rcmgr.InfiniteLimits.ToPartialLimitConfig().System + +func makeResourceMgrs(maxMemory uint64, maxFD int, connMgrHighWater int) (rm network.ResourceManager, err error) { + if maxMemory == 0 { + maxMemory = uint64((float64(memory.TotalMemory()) * 0.85)) + } + if maxFD == 0 { + maxFD = fd.GetNumFDs() / 2 + } + return rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(makeResourceManagerConfig(maxMemory, maxFD, connMgrHighWater))) +} + +func makeResourceManagerConfig(maxMemory uint64, maxFD int, connMgrHighWater int) (limitConfig rcmgr.ConcreteLimitConfig) { + if maxMemory == 0 { + maxMemory = uint64((float64(memory.TotalMemory()) * 0.85)) + } + if maxFD == 0 { + maxFD = fd.GetNumFDs() / 2 + } + + maxMemoryMB := maxMemory / (1024 * 1024) + + // At least as of 2023-01-25, it's possible to open a connection that + // doesn't ask for any memory usage with the libp2p Resource Manager/Accountant + // (see https://github.com/libp2p/go-libp2p/issues/2010#issuecomment-1404280736). + // As a result, we can't currently rely on Memory limits to full protect us. + // Until https://github.com/libp2p/go-libp2p/issues/2010 is addressed, + // we take a proxy now of restricting to 1 inbound connection per MB. + // Note: this is more generous than go-libp2p's default autoscaled limits which do + // 64 connections per 1GB + // (see https://github.com/libp2p/go-libp2p/blob/master/p2p/host/resource-manager/limit_defaults.go#L357 ). + systemConnsInbound := int(1 * maxMemoryMB) + + partialLimits := rcmgr.PartialLimitConfig{ + System: rcmgr.ResourceLimits{ + Memory: rcmgr.LimitVal64(maxMemory), + FD: rcmgr.LimitVal(maxFD), + + Conns: rcmgr.Unlimited, + ConnsInbound: rcmgr.LimitVal(systemConnsInbound), + ConnsOutbound: rcmgr.Unlimited, + + Streams: rcmgr.Unlimited, + StreamsOutbound: rcmgr.Unlimited, + StreamsInbound: rcmgr.Unlimited, + }, + + // Transient connections won't cause any memory to be accounted for by the resource manager/accountant. + // Only established connections do. + // As a result, we can't rely on System.Memory to protect us from a bunch of transient connection being opened. + // We limit the same values as the System scope, but only allow the Transient scope to take 25% of what is allowed for the System scope. + Transient: rcmgr.ResourceLimits{ + Memory: rcmgr.LimitVal64(maxMemory / 4), + FD: rcmgr.LimitVal(maxFD / 4), + + Conns: rcmgr.Unlimited, + ConnsInbound: rcmgr.LimitVal(systemConnsInbound / 4), + ConnsOutbound: rcmgr.Unlimited, + + Streams: rcmgr.Unlimited, + StreamsInbound: rcmgr.Unlimited, + StreamsOutbound: rcmgr.Unlimited, + }, + + // Lets get out of the way of the allow list functionality. + // If someone specified "Swarm.ResourceMgr.Allowlist" we should let it go through. + AllowlistedSystem: infiniteResourceLimits, + + AllowlistedTransient: infiniteResourceLimits, + + // Keep it simple by not having Service, ServicePeer, Protocol, ProtocolPeer, Conn, or Stream limits. + ServiceDefault: infiniteResourceLimits, + + ServicePeerDefault: infiniteResourceLimits, + + ProtocolDefault: infiniteResourceLimits, + + ProtocolPeerDefault: infiniteResourceLimits, + + Conn: infiniteResourceLimits, + + Stream: infiniteResourceLimits, + + // Limit the resources consumed by a peer. + // This doesn't protect us against intentional DoS attacks since an attacker can easily spin up multiple peers. + // We specify this limit against unintentional DoS attacks (e.g., a peer has a bug and is sending too much traffic intentionally). + // In that case we want to keep that peer's resource consumption contained. + // To keep this simple, we only constrain inbound connections and streams. + PeerDefault: rcmgr.ResourceLimits{ + Memory: rcmgr.Unlimited64, + FD: rcmgr.Unlimited, + Conns: rcmgr.Unlimited, + ConnsInbound: rcmgr.DefaultLimit, + ConnsOutbound: rcmgr.Unlimited, + Streams: rcmgr.Unlimited, + StreamsInbound: rcmgr.DefaultLimit, + StreamsOutbound: rcmgr.Unlimited, + }, + } + + scalingLimitConfig := rcmgr.DefaultLimits + libp2p.SetDefaultServiceLimits(&scalingLimitConfig) + + // Anything set above in partialLimits that had a value of rcmgr.DefaultLimit will be overridden. + // Anything in scalingLimitConfig that wasn't defined in partialLimits above will be added (e.g., libp2p's default service limits). + partialLimits = partialLimits.Build(scalingLimitConfig.Scale(int64(maxMemory), maxFD)).ToPartialLimitConfig() + + // Simple checks to override autoscaling ensuring limits make sense versus the connmgr values. + // There are ways to break this, but this should catch most problems already. + // We might improve this in the future. + // See: https://github.com/ipfs/kubo/issues/9545 + if partialLimits.System.ConnsInbound > rcmgr.DefaultLimit { + maxInboundConns := int(partialLimits.System.ConnsInbound) + if connmgrHighWaterTimesTwo := connMgrHighWater * 2; maxInboundConns < connmgrHighWaterTimesTwo { + maxInboundConns = connmgrHighWaterTimesTwo + } + + if maxInboundConns < 800 { + maxInboundConns = 800 + } + + // Scale System.StreamsInbound as well, but use the existing ratio of StreamsInbound to ConnsInbound + if partialLimits.System.StreamsInbound > rcmgr.DefaultLimit { + partialLimits.System.StreamsInbound = rcmgr.LimitVal(int64(maxInboundConns) * int64(partialLimits.System.StreamsInbound) / int64(partialLimits.System.ConnsInbound)) + } + partialLimits.System.ConnsInbound = rcmgr.LimitVal(maxInboundConns) + } + + log.Printf(` + +go-libp2p Resource Manager limits based on: + - --max-memory: %s + - --max-fd: %d + +`, humanize.Bytes(maxMemory), maxFD) + + // We already have a complete value thus pass in an empty ConcreteLimitConfig. + return partialLimits.Build(rcmgr.ConcreteLimitConfig{}) +} diff --git a/server.go b/server.go index 41e985f..052d957 100644 --- a/server.go +++ b/server.go @@ -6,6 +6,7 @@ import ( "log" "net" "net/http" + "time" "github.com/CAFxX/httpcompression" "github.com/felixge/httpsnoop" @@ -16,7 +17,7 @@ import ( dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/routing" - rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" + "github.com/libp2p/go-libp2p/p2p/net/connmgr" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/cors" metrics "github.com/slok/go-http-metrics/metrics/prometheus" @@ -33,14 +34,29 @@ func withRequestLogger(next http.Handler) http.Handler { }) } -func start(ctx context.Context, listenAddress string, runAcceleratedDHTClient bool, contentEndpoints, peerEndpoints, ipnsEndpoints []string) error { - h, err := newHost(runAcceleratedDHTClient) +type config struct { + listenAddress string + acceleratedDHTClient bool + + contentEndpoints []string + peerEndpoints []string + ipnsEndpoints []string + + connMgrLow int + connMgrHi int + connMgrGrace time.Duration + maxMemory uint64 + maxFD int +} + +func start(ctx context.Context, cfg *config) error { + h, err := newHost(cfg) if err != nil { return err } var dhtRouting routing.Routing - if runAcceleratedDHTClient { + if cfg.acceleratedDHTClient { wrappedDHT, err := newBundledDHT(ctx, h) if err != nil { return err @@ -54,28 +70,28 @@ func start(ctx context.Context, listenAddress string, runAcceleratedDHTClient bo dhtRouting = standardDHT } - crRouters, err := getCombinedRouting(contentEndpoints, dhtRouting) + crRouters, err := getCombinedRouting(cfg.contentEndpoints, dhtRouting) if err != nil { return err } - prRouters, err := getCombinedRouting(peerEndpoints, dhtRouting) + prRouters, err := getCombinedRouting(cfg.peerEndpoints, dhtRouting) if err != nil { return err } - ipnsRouters, err := getCombinedRouting(ipnsEndpoints, dhtRouting) + ipnsRouters, err := getCombinedRouting(cfg.ipnsEndpoints, dhtRouting) if err != nil { return err } - _, port, err := net.SplitHostPort(listenAddress) + _, port, err := net.SplitHostPort(cfg.listenAddress) if err != nil { return err } log.Printf("Starting %s %s\n", name, version) - log.Printf("Listening on %s", listenAddress) + log.Printf("Listening on %s", cfg.listenAddress) log.Printf("Delegated Routing API on http://127.0.0.1:%s/routing/v1", port) mdlw := middleware.New(middleware.Config{ @@ -115,37 +131,25 @@ func start(ctx context.Context, listenAddress string, runAcceleratedDHTClient bo }) http.Handle("/", handler) - server := &http.Server{Addr: listenAddress, Handler: nil} + server := &http.Server{Addr: cfg.listenAddress, Handler: nil} return server.ListenAndServe() } -func newHost(highOutboundLimits bool) (host.Host, error) { - if !highOutboundLimits { - return libp2p.New() - } - - defaultLimits := rcmgr.DefaultLimits - libp2p.SetDefaultServiceLimits(&defaultLimits) - // Outbound conns and FDs are set very high to allow for the accelerated DHT client to (re)load its routing table. - // Currently it doesn't gracefully handle RM throttling--once it does we can lower these. - // High outbound conn limits are considered less of a DoS risk than high inbound conn limits. - // Also note that, due to the behavior of the accelerated DHT client, we don't need many streams, just conns. - if minOutbound := 65536; defaultLimits.SystemBaseLimit.ConnsOutbound < minOutbound { - defaultLimits.SystemBaseLimit.ConnsOutbound = minOutbound - if defaultLimits.SystemBaseLimit.Conns < defaultLimits.SystemBaseLimit.ConnsOutbound { - defaultLimits.SystemBaseLimit.Conns = defaultLimits.SystemBaseLimit.ConnsOutbound - } - } - if minFD := 4096; defaultLimits.SystemBaseLimit.FD < minFD { - defaultLimits.SystemBaseLimit.FD = minFD +func newHost(cfg *config) (host.Host, error) { + cmgr, err := connmgr.NewConnManager(cfg.connMgrLow, cfg.connMgrHi, connmgr.WithGracePeriod(cfg.connMgrGrace)) + if err != nil { + return nil, err } - defaultLimitConfig := defaultLimits.AutoScale() - rm, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(defaultLimitConfig)) + rcmgr, err := makeResourceMgrs(cfg.maxMemory, cfg.maxFD, cfg.connMgrHi) if err != nil { return nil, err } - h, err := libp2p.New(libp2p.ResourceManager(rm)) + + h, err := libp2p.New( + libp2p.ConnectionManager(cmgr), + libp2p.ResourceManager(rcmgr), + ) if err != nil { return nil, err }