From 732dab602e618bd0b84c479e21d3439fe57f0b1e Mon Sep 17 00:00:00 2001 From: Daniel Deluiggi Date: Thu, 27 Feb 2025 16:35:21 -0800 Subject: [PATCH 1/2] Add a ingester readLock Signed-off-by: Daniel Deluiggi --- pkg/ingester/ingester.go | 52 +++++++++++++++++++++++++++++++- pkg/ingester/ingester_test.go | 57 +++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 1 deletion(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index f3960813ca3..78097ee945f 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -305,6 +305,7 @@ type userTSDB struct { stateMtx sync.RWMutex state tsdbState pushesInFlight sync.WaitGroup // Increased with stateMtx read lock held, only if state == active or activeShipping. + readInFlight sync.WaitGroup // Increased with stateMtx read lock held, only if state == active, activeShipping or forceCompacting. // Used to detect idle TSDBs. lastUpdate atomic.Int64 @@ -1508,6 +1509,29 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte return &cortexpb.WriteResponse{}, nil } +func (u *userTSDB) acquireReadLock() error { + u.stateMtx.RLock() + defer u.stateMtx.RUnlock() + + switch u.state { + case active: + case activeShipping: + case forceCompacting: + // Read are allowed. + case closing: + return errors.New("TSDB is closing") + default: + return errors.New("TSDB is not active") + } + + u.readInFlight.Add(1) + return nil +} + +func (u *userTSDB) releaseReadLock() { + u.readInFlight.Done() +} + func (u *userTSDB) acquireAppendLock() error { u.stateMtx.RLock() defer u.stateMtx.RUnlock() @@ -1555,6 +1579,11 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery return &client.ExemplarQueryResponse{}, nil } + if err := db.acquireReadLock(); err != nil { + return &client.ExemplarQueryResponse{}, nil + } + defer db.releaseReadLock() + q, err := db.ExemplarQuerier(ctx) if err != nil { return nil, err @@ -1648,6 +1677,11 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu return &client.LabelValuesResponse{}, cleanup, nil } + if err := db.acquireReadLock(); err != nil { + return &client.LabelValuesResponse{}, cleanup, nil + } + defer db.releaseReadLock() + mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db, i.cfg.QueryIngestersWithin) if err != nil { return nil, cleanup, err @@ -1738,6 +1772,11 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR return &client.LabelNamesResponse{}, cleanup, nil } + if err := db.acquireReadLock(); err != nil { + return &client.LabelNamesResponse{}, cleanup, nil + } + defer db.releaseReadLock() + mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db, i.cfg.QueryIngestersWithin) if err != nil { return nil, cleanup, err @@ -1836,6 +1875,11 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien return cleanup, nil } + if err := db.acquireReadLock(); err != nil { + return cleanup, nil + } + defer db.releaseReadLock() + // Parse the request _, _, limit, matchersSet, err := client.FromMetricsForLabelMatchersRequest(i.matchersCache, req) if err != nil { @@ -2070,6 +2114,11 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ return nil } + if err := db.acquireReadLock(); err != nil { + return nil + } + defer db.releaseReadLock() + numSamples := 0 numSeries := 0 totalDataBytes := 0 @@ -2857,8 +2906,9 @@ func (i *Ingester) closeAndDeleteUserTSDBIfIdle(userID string) tsdbCloseCheckRes // If TSDB is fully closed, we will set state to 'closed', which will prevent this deferred closing -> active transition. defer userDB.casState(closing, active) - // Make sure we don't ignore any possible inflight pushes. + // Make sure we don't ignore any possible inflight requests. userDB.pushesInFlight.Wait() + userDB.readInFlight.Wait() // Verify again, things may have changed during the checks and pushes. tenantDeleted := false diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 758c3009781..f42d8a25d7d 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -4327,6 +4327,63 @@ func TestIngester_idleCloseEmptyTSDB(t *testing.T) { require.NotNil(t, db) } +func TestIngester_ReadNotFailWhenTSDBIsBeingDeleted(t *testing.T) { + + tc := map[string]struct { + state tsdbState + }{ + "closingTsdb": {state: closing}, + "closedTsdb": {state: closed}, + } + for name, c := range tc { + t.Run(name, func(t *testing.T) { + ctx := context.Background() + cfg := defaultIngesterTestConfig(t) + cfg.BlocksStorageConfig.TSDB.CloseIdleTSDBTimeout = 0 // Will not run the loop, but will allow us to close any TSDB fast. + cfg.BlocksStorageConfig.TSDB.KeepUserTSDBOpenOnShutdown = true + + // Create ingester + i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry()) + require.NoError(t, err) + + require.NoError(t, services.StartAndAwaitRunning(ctx, i)) + defer services.StopAndAwaitTerminated(ctx, i) //nolint:errcheck + + // Wait until it's ACTIVE + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + pushSingleSampleAtTime(t, i, 1*time.Minute.Milliseconds()) + + db, err := i.getOrCreateTSDB(userID, true) + require.NoError(t, err) + require.NotNil(t, db) + + err = db.Close() + require.NoError(t, err) + + b := db.casState(active, c.state) + require.True(t, b) + + // Mock request + ctx = user.InjectOrgID(context.Background(), userID) + + err = i.QueryStream(&client.QueryRequest{EndTimestampMs: 10 * time.Minute.Milliseconds()}, &mockQueryStreamServer{ctx: ctx}) + require.NoError(t, err) + + _, err = i.LabelNames(ctx, &client.LabelNamesRequest{Limit: int64(1)}) + require.NoError(t, err) + + _, err = i.LabelValues(ctx, &client.LabelValuesRequest{Limit: int64(1)}) + require.NoError(t, err) + + _, err = i.MetricsForLabelMatchers(ctx, &client.MetricsForLabelMatchersRequest{Limit: int64(1)}) + require.NoError(t, err) + }) + } +} + type shipperMock struct { mock.Mock } From 2515d7e8b8416e322bf11073391b606c662d36a3 Mon Sep 17 00:00:00 2001 From: Daniel Deluiggi Date: Thu, 27 Feb 2025 16:39:19 -0800 Subject: [PATCH 2/2] changelog Signed-off-by: Daniel Deluiggi --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 522365a8669..b47193c5643 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ * [BUGFIX] Querier: Fix marshal native histogram with empty bucket when protobuf codec is enabled. #6595 * [BUGFIX] Query Frontend: Fix samples scanned and peak samples query stats when query hits results cache. #6591 * [BUGFIX] Query Frontend: Fix panic caused by nil pointer dereference. #6609 +* [BUGFIX] Ingester: Add check to avoid query 5xx when closing tsdb. #6616 ## 1.19.0 in progress