Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* [BUGFIX] Querier: Fix marshal native histogram with empty bucket when protobuf codec is enabled. #6595
* [BUGFIX] Query Frontend: Fix samples scanned and peak samples query stats when query hits results cache. #6591
* [BUGFIX] Query Frontend: Fix panic caused by nil pointer dereference. #6609
* [BUGFIX] Ingester: Add check to avoid query 5xx when closing tsdb. #6616

## 1.19.0 in progress

Expand Down
52 changes: 51 additions & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ type userTSDB struct {
stateMtx sync.RWMutex
state tsdbState
pushesInFlight sync.WaitGroup // Increased with stateMtx read lock held, only if state == active or activeShipping.
readInFlight sync.WaitGroup // Increased with stateMtx read lock held, only if state == active, activeShipping or forceCompacting.

// Used to detect idle TSDBs.
lastUpdate atomic.Int64
Expand Down Expand Up @@ -1508,6 +1509,29 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
return &cortexpb.WriteResponse{}, nil
}

func (u *userTSDB) acquireReadLock() error {
u.stateMtx.RLock()
defer u.stateMtx.RUnlock()

switch u.state {
case active:
case activeShipping:
case forceCompacting:
// Read are allowed.
case closing:
return errors.New("TSDB is closing")
default:
return errors.New("TSDB is not active")
}

u.readInFlight.Add(1)
return nil
}

func (u *userTSDB) releaseReadLock() {
u.readInFlight.Done()
}

func (u *userTSDB) acquireAppendLock() error {
u.stateMtx.RLock()
defer u.stateMtx.RUnlock()
Expand Down Expand Up @@ -1555,6 +1579,11 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
return &client.ExemplarQueryResponse{}, nil
}

if err := db.acquireReadLock(); err != nil {
return &client.ExemplarQueryResponse{}, nil
}
defer db.releaseReadLock()

q, err := db.ExemplarQuerier(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1648,6 +1677,11 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
return &client.LabelValuesResponse{}, cleanup, nil
}

if err := db.acquireReadLock(); err != nil {
return &client.LabelValuesResponse{}, cleanup, nil
}
defer db.releaseReadLock()

mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db, i.cfg.QueryIngestersWithin)
if err != nil {
return nil, cleanup, err
Expand Down Expand Up @@ -1738,6 +1772,11 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR
return &client.LabelNamesResponse{}, cleanup, nil
}

if err := db.acquireReadLock(); err != nil {
return &client.LabelNamesResponse{}, cleanup, nil
}
defer db.releaseReadLock()

mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db, i.cfg.QueryIngestersWithin)
if err != nil {
return nil, cleanup, err
Expand Down Expand Up @@ -1836,6 +1875,11 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien
return cleanup, nil
}

if err := db.acquireReadLock(); err != nil {
return cleanup, nil
}
defer db.releaseReadLock()

// Parse the request
_, _, limit, matchersSet, err := client.FromMetricsForLabelMatchersRequest(i.matchersCache, req)
if err != nil {
Expand Down Expand Up @@ -2070,6 +2114,11 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
return nil
}

if err := db.acquireReadLock(); err != nil {
return nil
}
defer db.releaseReadLock()

numSamples := 0
numSeries := 0
totalDataBytes := 0
Expand Down Expand Up @@ -2857,8 +2906,9 @@ func (i *Ingester) closeAndDeleteUserTSDBIfIdle(userID string) tsdbCloseCheckRes
// If TSDB is fully closed, we will set state to 'closed', which will prevent this deferred closing -> active transition.
defer userDB.casState(closing, active)

// Make sure we don't ignore any possible inflight pushes.
// Make sure we don't ignore any possible inflight requests.
userDB.pushesInFlight.Wait()
userDB.readInFlight.Wait()

// Verify again, things may have changed during the checks and pushes.
tenantDeleted := false
Expand Down
57 changes: 57 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4327,6 +4327,63 @@ func TestIngester_idleCloseEmptyTSDB(t *testing.T) {
require.NotNil(t, db)
}

func TestIngester_ReadNotFailWhenTSDBIsBeingDeleted(t *testing.T) {

tc := map[string]struct {
state tsdbState
}{
"closingTsdb": {state: closing},
"closedTsdb": {state: closed},
}
for name, c := range tc {
t.Run(name, func(t *testing.T) {
ctx := context.Background()
cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.CloseIdleTSDBTimeout = 0 // Will not run the loop, but will allow us to close any TSDB fast.
cfg.BlocksStorageConfig.TSDB.KeepUserTSDBOpenOnShutdown = true

// Create ingester
i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry())
require.NoError(t, err)

require.NoError(t, services.StartAndAwaitRunning(ctx, i))
defer services.StopAndAwaitTerminated(ctx, i) //nolint:errcheck

// Wait until it's ACTIVE
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
return i.lifecycler.GetState()
})

pushSingleSampleAtTime(t, i, 1*time.Minute.Milliseconds())

db, err := i.getOrCreateTSDB(userID, true)
require.NoError(t, err)
require.NotNil(t, db)

err = db.Close()
require.NoError(t, err)

b := db.casState(active, c.state)
require.True(t, b)

// Mock request
ctx = user.InjectOrgID(context.Background(), userID)

err = i.QueryStream(&client.QueryRequest{EndTimestampMs: 10 * time.Minute.Milliseconds()}, &mockQueryStreamServer{ctx: ctx})
require.NoError(t, err)

_, err = i.LabelNames(ctx, &client.LabelNamesRequest{Limit: int64(1)})
require.NoError(t, err)

_, err = i.LabelValues(ctx, &client.LabelValuesRequest{Limit: int64(1)})
require.NoError(t, err)

_, err = i.MetricsForLabelMatchers(ctx, &client.MetricsForLabelMatchersRequest{Limit: int64(1)})
require.NoError(t, err)
})
}
}

type shipperMock struct {
mock.Mock
}
Expand Down
Loading