Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
80b2d5c
Add resource based throttling to ingesters and store gateways
justinjung04 Mar 25, 2025
2121845
doc
justinjung04 Mar 25, 2025
2b168fc
Add automaxprocs
justinjung04 Mar 25, 2025
56f8e57
nit
justinjung04 Mar 25, 2025
9efbbd9
Add test for monitor
justinjung04 Mar 26, 2025
30bbd3d
fix tests
justinjung04 Mar 26, 2025
fa56e65
changelog
justinjung04 Mar 26, 2025
a2ffcdd
Merge branch 'master' into resource-based-throttling
justinjung04 Mar 26, 2025
5cccd60
fix test
justinjung04 Mar 26, 2025
6e37330
remove interface
justinjung04 Mar 26, 2025
08a6adf
address comments
justinjung04 Mar 31, 2025
067478b
rename doc
justinjung04 Mar 31, 2025
18fdf37
Make monitor more generic + separate scanners
justinjung04 Apr 10, 2025
aa81155
fix tests
justinjung04 Apr 10, 2025
a528a7a
fix more tests
justinjung04 Apr 10, 2025
42e52b3
remove monitor_test.go
justinjung04 Apr 10, 2025
50993e1
move noop scanner to darwin scanner
justinjung04 Apr 10, 2025
e56431e
doc update
justinjung04 Apr 10, 2025
eae4df7
doc
justinjung04 Apr 10, 2025
fd19f5c
lint
justinjung04 Apr 10, 2025
f588d94
add debugging log on unsupported resource type
justinjung04 Apr 10, 2025
6138a9d
test
justinjung04 Apr 10, 2025
7bd7ab9
add more error handling + resource_based_limiter_limit metric
justinjung04 Apr 10, 2025
6da53e9
fix test
justinjung04 Apr 10, 2025
a8d4218
fix test
justinjung04 Apr 10, 2025
d6d3839
update changelog
justinjung04 Apr 10, 2025
c68bbd2
Move noopScanner to scanner.go and fix RegisterFlagsWithPrefix
justinjung04 Apr 15, 2025
025a93a
Add limit breached metric + wrap error with 429
justinjung04 Apr 16, 2025
6ffef63
Add more validation and test on instance_limits
justinjung04 Apr 16, 2025
7808940
Added _total to counter metric
justinjung04 Apr 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/configs/instance_limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,18 @@ func (cfg *InstanceLimits) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix strin
}

func (cfg *InstanceLimits) Validate(monitoredResources flagext.StringSliceCSV) error {
if cfg.CPUUtilization > 1 || cfg.CPUUtilization < 0 {
return errors.New("cpu_utilization must be between 0 and 1")
}

if cfg.CPUUtilization > 0 && !strings.Contains(monitoredResources.String(), string(resource.CPU)) {
return errors.New("monitored_resources config must include \"cpu\" as well")
}

if cfg.HeapUtilization > 1 || cfg.HeapUtilization < 0 {
return errors.New("heap_utilization must be between 0 and 1")
}

if cfg.HeapUtilization > 0 && !strings.Contains(monitoredResources.String(), string(resource.Heap)) {
return errors.New("monitored_resources config must include \"heap\" as well")
}
Expand Down
64 changes: 64 additions & 0 deletions pkg/configs/instance_limits_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package configs

import (
"errors"
"testing"

"github.com/stretchr/testify/require"
)

// Test_Validate exercises InstanceLimits.Validate with a table of
// utilization limits and monitored-resource lists, checking both that
// validation succeeds or fails as expected and, on failure, that the
// returned error carries the exact expected message.
func Test_Validate(t *testing.T) {
	for name, tc := range map[string]struct {
		instanceLimits     InstanceLimits
		monitoredResources []string
		err                error
	}{
		"correct config should pass validation": {
			instanceLimits: InstanceLimits{
				CPUUtilization:  0.5,
				HeapUtilization: 0.5,
			},
			monitoredResources: []string{"cpu", "heap"},
			err:                nil,
		},
		"utilization config less than 0 should fail validation": {
			instanceLimits: InstanceLimits{
				CPUUtilization:  -0.5,
				HeapUtilization: 0.5,
			},
			monitoredResources: []string{"cpu", "heap"},
			err:                errors.New("cpu_utilization must be between 0 and 1"),
		},
		"utilization config greater than 1 should fail validation": {
			instanceLimits: InstanceLimits{
				CPUUtilization:  0.5,
				HeapUtilization: 1.5,
			},
			monitoredResources: []string{"cpu", "heap"},
			err:                errors.New("heap_utilization must be between 0 and 1"),
		},
		"missing cpu in monitored_resources config should fail validation": {
			instanceLimits: InstanceLimits{
				CPUUtilization: 0.5,
			},
			monitoredResources: []string{"heap"},
			err:                errors.New("monitored_resources config must include \"cpu\" as well"),
		},
		"missing heap in monitored_resources config should fail validation": {
			instanceLimits: InstanceLimits{
				HeapUtilization: 0.5,
			},
			monitoredResources: []string{"cpu"},
			err:                errors.New("monitored_resources config must include \"heap\" as well"),
		},
	} {
		t.Run(name, func(t *testing.T) {
			err := tc.instanceLimits.Validate(tc.monitoredResources)
			if tc.err == nil {
				require.NoError(t, err)
				return
			}
			// require.Errorf only checks that err is non-nil (its extra
			// arguments are a failure-message format), so it would accept
			// any error. EqualError pins the message so a case cannot pass
			// on the wrong validation failure.
			require.EqualError(t, err, tc.err.Error())
		})
	}
}
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe
if cfg.DefaultLimits.HeapUtilization > 0 {
resourceLimits[resource.Heap] = cfg.DefaultLimits.HeapUtilization
}
i.resourceBasedLimiter, err = limiter.NewResourceBasedLimiter(resourceMonitor, resourceLimits, registerer)
i.resourceBasedLimiter, err = limiter.NewResourceBasedLimiter(resourceMonitor, resourceLimits, registerer, "ingester")
if err != nil {
return nil, errors.Wrap(err, "error creating resource based limiter")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3078,7 +3078,7 @@ func Test_Ingester_Query_ResourceThresholdBreached(t *testing.T) {
resource.CPU: 0.5,
resource.Heap: 0.5,
}
i.resourceBasedLimiter, err = limiter.NewResourceBasedLimiter(&mockResourceMonitor{cpu: 0.4, heap: 0.6}, limits, nil)
i.resourceBasedLimiter, err = limiter.NewResourceBasedLimiter(&mockResourceMonitor{cpu: 0.4, heap: 0.6}, limits, nil, "ingester")
require.NoError(t, err)

// Wait until it's ACTIVE
Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf
if gatewayCfg.InstanceLimits.HeapUtilization > 0 {
resourceLimits[resource.Heap] = gatewayCfg.InstanceLimits.HeapUtilization
}
g.resourceBasedLimiter, err = util_limiter.NewResourceBasedLimiter(resourceMonitor, resourceLimits, reg)
g.resourceBasedLimiter, err = util_limiter.NewResourceBasedLimiter(resourceMonitor, resourceLimits, reg, "store-gateway")
if err != nil {
return nil, errors.Wrap(err, "error creating resource based limiter")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1220,7 +1220,7 @@ func TestStoreGateway_SeriesThrottledByResourceMonitor(t *testing.T) {
resource.CPU: 0.5,
resource.Heap: 0.5,
}
g.resourceBasedLimiter, err = util_limiter.NewResourceBasedLimiter(&mockResourceMonitor{cpu: 0.4, heap: 0.6}, limits, nil)
g.resourceBasedLimiter, err = util_limiter.NewResourceBasedLimiter(&mockResourceMonitor{cpu: 0.4, heap: 0.6}, limits, nil, "store-gateway")
require.NoError(t, err)

srv := newBucketStoreSeriesServer(setUserIDToGRPCContext(ctx, userID))
Expand Down
10 changes: 6 additions & 4 deletions pkg/util/limiter/resource_based_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ type ResourceBasedLimiter struct {
limitBreachedCount *prometheus.CounterVec
}

func NewResourceBasedLimiter(resourceMonitor resource.IMonitor, limits map[resource.Type]float64, registerer prometheus.Registerer) (*ResourceBasedLimiter, error) {
func NewResourceBasedLimiter(resourceMonitor resource.IMonitor, limits map[resource.Type]float64, registerer prometheus.Registerer, component string) (*ResourceBasedLimiter, error) {
for resType, limit := range limits {
switch resType {
case resource.CPU, resource.Heap:
promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
Name: "cortex_resource_based_limiter_limit",
ConstLabels: map[string]string{"resource": string(resType)},
Help: "Limit set for the resource utilization.",
ConstLabels: map[string]string{"component": component},
}).Set(limit)
default:
return nil, fmt.Errorf("unsupported resource type: [%s]", resType)
Expand All @@ -41,8 +42,9 @@ func NewResourceBasedLimiter(resourceMonitor resource.IMonitor, limits map[resou
limits: limits,
limitBreachedCount: promauto.With(registerer).NewCounterVec(
prometheus.CounterOpts{
Name: "cortex_resource_based_limiter_limit_breached",
Help: "The total number of times resource based limiter was throttled.",
Name: "cortex_resource_based_limiter_limit_breached",
Help: "The total number of times resource based limiter was throttled.",
ConstLabels: map[string]string{"component": component},
},
[]string{"resource"},
),
Expand Down
Loading