Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* `cortex_ruler_client_request_duration_seconds`
* [ENHANCEMENT] Query-frontend/scheduler: added querier forget delay (`-query-frontend.querier-forget-delay` and `-query-scheduler.querier-forget-delay`) to mitigate the blast radius in the event queriers crash because of a repeatedly sent "query of death" when shuffle-sharding is enabled. #3901
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
* [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently #3959

## 1.8.0 in progress

Expand Down
13 changes: 10 additions & 3 deletions pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -1894,9 +1894,16 @@ func (i *Ingester) closeAndDeleteUserTSDBIfIdle(userID string) tsdbCloseCheckRes
// This will prevent going back to "active" state in deferred statement.
userDB.casState(closing, closed)

i.userStatesMtx.Lock()
delete(i.TSDBState.dbs, userID)
i.userStatesMtx.Unlock()
// Only remove user from TSDBState when everything is cleaned up
// This will prevent concurrency problems when cortex are trying to open new TSDB - Ie: New request for a given tenant
// came in - while closing the tsdb for the same tenant.
// If this happens now, the request will get reject as the push will not be able to acquire the lock as the tsdb will be
// in closed state
defer func() {
i.userStatesMtx.Lock()
delete(i.TSDBState.dbs, userID)
i.userStatesMtx.Unlock()
}()

i.metrics.memUsers.Dec()
i.TSDBState.tsdbMetrics.removeRegistryForUser(userID)
Expand Down
51 changes: 51 additions & 0 deletions pkg/ingester/ingester_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2051,6 +2051,57 @@ func TestIngester_closeAndDeleteUserTSDBIfIdle_shouldNotCloseTSDBIfShippingIsInP
assert.Equal(t, tsdbNotActive, i.closeAndDeleteUserTSDBIfIdle(userID))
}

func TestIngester_closingAndOpeningTsdbConcurrently(t *testing.T) {
ctx := context.Background()
cfg := defaultIngesterTestConfig()
cfg.BlocksStorageConfig.TSDB.CloseIdleTSDBTimeout = 0 // Will not run the loop, but will allow us to close any TSDB fast.

// Create ingester
i, err := prepareIngesterWithBlocksStorage(t, cfg, nil)
require.NoError(t, err)

require.NoError(t, services.StartAndAwaitRunning(ctx, i))
defer services.StopAndAwaitTerminated(ctx, i) //nolint:errcheck

// Wait until it's ACTIVE
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
return i.lifecycler.GetState()
})

_, err = i.getOrCreateTSDB(userID, false)
require.NoError(t, err)

iterations := 5000
chanErr := make(chan error, 1)
quit := make(chan bool)

go func() {
for {
select {
case <-quit:
return
default:
_, err = i.getOrCreateTSDB(userID, false)
if err != nil {
chanErr <- err
}
}
}
}()

for k := 0; k < iterations; k++ {
i.closeAndDeleteUserTSDBIfIdle(userID)
}

select {
case err := <-chanErr:
assert.Fail(t, err.Error())
quit <- true
default:
quit <- true
}
}

func TestIngester_idleCloseEmptyTSDB(t *testing.T) {
ctx := context.Background()
cfg := defaultIngesterTestConfig()
Expand Down