From 85bd97fd0b1d6b1955981b1c4b421d553b036cce Mon Sep 17 00:00:00 2001 From: "Xiaochao Dong (@damnever)" Date: Wed, 17 Jul 2024 16:13:01 +0800 Subject: [PATCH] Reduce distributor memory usage when error volume is high Signed-off-by: Xiaochao Dong (@damnever) --- CHANGELOG.md | 1 + pkg/ring/batch.go | 15 ++++++++++----- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ef662a6269e..f5c8a8a9c21 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ * [ENHANCEMENT] Ingester: Add link to renew 10% of the ingesters tokens in the admin page. #6063 * [ENHANCEMENT] Ruler: Add support for filtering by `state` and `health` field on Rules API. #6040 * [ENHANCEMENT] Ruler: Add support for filtering by `match` field on Rules API. #6083 +* [ENHANCEMENT] Distributor: Reduce memory usage when error volume is high. #6095 * [BUGFIX] Configsdb: Fix endline issue in db password. #5920 * [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952 * [BUGFIX] Querier: Enforce max query length check for `/api/v1/series` API even though `ignoreMaxQueryLength` is set to true. #6018 diff --git a/pkg/ring/batch.go b/pkg/ring/batch.go index 95914cda366..7f063c20b03 100644 --- a/pkg/ring/batch.go +++ b/pkg/ring/batch.go @@ -152,8 +152,7 @@ func (b *batchTracker) record(instance instance, err error) { if err != nil { // Track the number of errors by error family, and if it exceeds maxFailures // shortcut the waiting rpc. - wrappedErr := httpgrpcutil.WrapHTTPGrpcError(err, "addr=%s state=%s zone=%s", instance.desc.Addr, instance.desc.State, instance.desc.Zone) - errCount := instance.itemTrackers[i].recordError(wrappedErr) + errCount := instance.itemTrackers[i].recordError(err) // We should return an error if we reach the maxFailure (quorum) on a given error family OR // we dont have any remaining ingesters to try // Ex: 2xx, 4xx, 5xx -> return 4xx @@ -162,13 +161,17 @@ func (b *batchTracker) record(instance instance, err error) { // Ex: 5xx, _, 5xx -> return 5xx if errCount > int32(sampleTrackers[i].maxFailures) { if b.rpcsFailed.Inc() == 1 { - b.err <- httpgrpcutil.WrapHTTPGrpcError(sampleTrackers[i].getError(), "maxFailure (quorum) on a given error family") + b.err <- httpgrpcutil.WrapHTTPGrpcError( + sampleTrackers[i].getError(), "maxFailure (quorum) on a given error family, addr=%s state=%s zone=%s", + instance.desc.Addr, instance.desc.State, instance.desc.Zone) } continue } if sampleTrackers[i].remaining.Dec() == 0 { if b.rpcsFailed.Inc() == 1 { - b.err <- httpgrpcutil.WrapHTTPGrpcError(sampleTrackers[i].getError(), "not enough remaining instances to try") + b.err <- httpgrpcutil.WrapHTTPGrpcError( + sampleTrackers[i].getError(), "not enough remaining instances to try, addr=%s state=%s zone=%s", + instance.desc.Addr, instance.desc.State, instance.desc.Zone) } continue } @@ -187,7 +190,9 @@ func (b *batchTracker) record(instance instance, err error) { // Ex: 4xx, 5xx, 2xx if sampleTrackers[i].remaining.Dec() == 0 { if b.rpcsFailed.Inc() == 1 { - b.err <- httpgrpcutil.WrapHTTPGrpcError(sampleTrackers[i].getError(), "not enough remaining instances to try") + b.err <- httpgrpcutil.WrapHTTPGrpcError( + sampleTrackers[i].getError(), "not enough remaining instances to try, addr=%s state=%s zone=%s", + instance.desc.Addr, instance.desc.State, instance.desc.Zone) } } }