22 changes: 13 additions & 9 deletions pkg/receive/expandedpostingscache/cache.go
@@ -43,7 +43,7 @@ type BlocksPostingsForMatchersCache struct {
 	postingsForMatchersFunc func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error)
 	timeNow func() time.Time
 
-	metrics *ExpandedPostingsCacheMetrics
+	metrics ExpandedPostingsCacheMetrics
 }
 
 var (
@@ -66,8 +66,8 @@ type ExpandedPostingsCacheMetrics struct {
 	NonCacheableQueries *prometheus.CounterVec
 }
 
-func NewPostingCacheMetrics(r prometheus.Registerer) *ExpandedPostingsCacheMetrics {
-	return &ExpandedPostingsCacheMetrics{
+func NewPostingCacheMetrics(r prometheus.Registerer) ExpandedPostingsCacheMetrics {
+	return ExpandedPostingsCacheMetrics{
 		CacheRequests: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
 			Name: "expanded_postings_cache_requests_total",
 			Help: "Total number of requests to the cache.",
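
Note on the pointer-to-value change above: the metrics fields visible in this diff are *prometheus.CounterVec pointers, so copying the struct only copies pointers and every copy still updates the same underlying collectors. A minimal standalone illustration (names here are made up for the example, not taken from the package):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// demoMetrics mirrors the shape of ExpandedPostingsCacheMetrics: the struct is
// cheap to pass by value because its fields point at the real collectors.
type demoMetrics struct {
	cacheRequests *prometheus.CounterVec
}

func main() {
	m := demoMetrics{cacheRequests: prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "demo_cache_requests_total", Help: "demo"},
		[]string{"cache"},
	)}
	copied := m // value copy; both structs share the same CounterVec
	copied.cacheRequests.WithLabelValues("head").Inc()
	fmt.Println(testutil.ToFloat64(m.cacheRequests.WithLabelValues("head"))) // prints 1
}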
@@ -87,11 +87,15 @@ func NewPostingCacheMetrics(r prometheus.Registerer) *ExpandedPostingsCacheMetri
 	}
 }
 
-func NewBlocksPostingsForMatchersCache(metrics *ExpandedPostingsCacheMetrics, headExpandedPostingsCacheSize uint64, blockExpandedPostingsCacheSize uint64) ExpandedPostingsCache {
+func NewBlocksPostingsForMatchersCache(metrics ExpandedPostingsCacheMetrics, headExpandedPostingsCacheSize uint64, blockExpandedPostingsCacheSize uint64, seedSize int64) *BlocksPostingsForMatchersCache {
+	if seedSize <= 0 {
+		seedSize = seedArraySize
+	}
+
 	return &BlocksPostingsForMatchersCache{
 		headCache: newFifoCache[[]storage.SeriesRef]("head", metrics, time.Now, headExpandedPostingsCacheSize),
 		blocksCache: newFifoCache[[]storage.SeriesRef]("block", metrics, time.Now, blockExpandedPostingsCacheSize),
-		headSeedByMetricName: make([]int, seedArraySize),
+		headSeedByMetricName: make([]int, seedSize),
 		strippedLock: make([]sync.RWMutex, numOfSeedsStripes),
 		postingsForMatchersFunc: tsdb.PostingsForMatchers,
 		timeNow: time.Now,
@@ -129,7 +133,7 @@ func (c *BlocksPostingsForMatchersCache) ExpireSeries(metric labels.Labels) {
 
 	h := MemHashString(metricName)
 	i := h % uint64(len(c.headSeedByMetricName))
-	l := h % uint64(len(c.strippedLock))
+	l := i % uint64(len(c.strippedLock))
 	c.strippedLock[l].Lock()
 	defer c.strippedLock[l].Unlock()
 	c.headSeedByMetricName[i]++
@@ -200,7 +204,7 @@ func (c *BlocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage.
 func (c *BlocksPostingsForMatchersCache) getSeedForMetricName(metricName string) string {
 	h := MemHashString(metricName)
 	i := h % uint64(len(c.headSeedByMetricName))
-	l := h % uint64(len(c.strippedLock))
+	l := i % uint64(len(c.strippedLock))
 	c.strippedLock[l].RLock()
 	defer c.strippedLock[l].RUnlock()
 	return strconv.Itoa(c.headSeedByMetricName[i])
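
In both hunks above, the lock stripe is now derived from the seed-slot index i rather than from the raw hash h. Under the old scheme, two metric names could map to the same slot i yet land on different stripes whenever the seed array and the stripe array have different lengths, so updates to headSeedByMetricName[i] were not always serialized; deriving l from i pins every access to a given slot to a single stripe. The new seedSize constructor argument makes mismatched lengths easy to produce, which is what the new test below exercises with a seed array of 3. A minimal standalone sketch of the pattern (illustrative names only, not the Thanos code):

package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

type stripedSeeds struct {
	seeds []int
	locks []sync.RWMutex
}

func (s *stripedSeeds) bump(name string) {
	h := fnv.New64a()
	h.Write([]byte(name))
	i := h.Sum64() % uint64(len(s.seeds))
	l := i % uint64(len(s.locks)) // stripe follows the slot, not the raw hash
	s.locks[l].Lock()
	defer s.locks[l].Unlock()
	s.seeds[i]++
}

func main() {
	// Deliberately mismatched lengths: 3 seed slots, 4 lock stripes.
	s := &stripedSeeds{seeds: make([]int, 3), locks: make([]sync.RWMutex, 4)}
	var wg sync.WaitGroup
	for n := 0; n < 100; n++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			s.bump(fmt.Sprintf("metric_%d", n))
		}(n)
	}
	wg.Wait()
	fmt.Println(s.seeds) // each slot was always updated under the same stripe
}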
@@ -276,13 +280,13 @@ type fifoCache[V any] struct {
 	cachedBytes int64
 }
 
-func newFifoCache[V any](name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time, maxBytes uint64) *fifoCache[V] {
+func newFifoCache[V any](name string, metrics ExpandedPostingsCacheMetrics, timeNow func() time.Time, maxBytes uint64) *fifoCache[V] {
 	return &fifoCache[V]{
 		cachedValues: new(sync.Map),
 		cached: list.New(),
 		timeNow: timeNow,
 		name: name,
-		metrics: *metrics,
+		metrics: metrics,
 		ttl: 10 * time.Minute,
 		maxBytes: int64(maxBytes),
 	}
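
Taken together, the cache.go changes alter the package's public surface: NewPostingCacheMetrics returns a value, and NewBlocksPostingsForMatchersCache takes that value plus a new seedSize argument and returns the concrete *BlocksPostingsForMatchersCache. A hedged usage sketch of the new signatures (the import path is assumed from the Thanos repository layout; the cache sizes are arbitrary):

package main

import (
	"github.com/prometheus/client_golang/prometheus"

	"github.com/thanos-io/thanos/pkg/receive/expandedpostingscache"
)

func main() {
	reg := prometheus.NewRegistry()
	// Now returns ExpandedPostingsCacheMetrics by value instead of a pointer.
	metrics := expandedpostingscache.NewPostingCacheMetrics(reg)
	// seedSize <= 0 falls back to the package default (seedArraySize), which is
	// what the multitsdb.go call site below relies on by passing 0.
	cache := expandedpostingscache.NewBlocksPostingsForMatchersCache(metrics, 1<<20, 1<<20, 0)
	_ = cache
}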
42 changes: 42 additions & 0 deletions pkg/receive/expandedpostingscache/cache_test.go
@@ -15,9 +15,11 @@ import (
"time"

"go.uber.org/atomic"
"golang.org/x/exp/rand"

"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
)

@@ -166,3 +168,43 @@ func repeatStringIfNeeded(seed string, length int) string {
 
 	return strings.Repeat(seed, 1+length/len(seed))[:max(length, len(seed))]
 }
+
+func TestLockRaceExpireSeries(t *testing.T) {
+	for j := 0; j < 10; j++ {
+		wg := &sync.WaitGroup{}
+
+		c := NewBlocksPostingsForMatchersCache(ExpandedPostingsCacheMetrics{}, 1<<7, 1<<7, 3)
+		for i := 0; i < 1000; i++ {
+			wg.Add(2)
+
+			go func() {
+				defer wg.Done()
+				for i := 0; i < 10; i++ {
+					c.ExpireSeries(
+						labels.FromMap(map[string]string{"__name__": randSeq(10)}),
+					)
+				}
+			}()
+
+			go func() {
+				defer wg.Done()
+
+				for i := 0; i < 10; i++ {
+					c.getSeedForMetricName(randSeq(10))
+				}
+			}()
+		}
+		wg.Wait()
+	}
+}
+
+var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
+
+func randSeq(n int) string {
+	b := make([]rune, n)
+	rand.Seed(uint64(time.Now().UnixNano()))
+	for i := range b {
+		b[i] = letters[rand.Intn(len(letters))]
+	}
+	return string(b)
+}
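
The new test relies on the race detector to surface the unsynchronized seed updates that the stripe fix addresses; a typical local run (standard Go tooling, not part of this diff) would be:

go test -race -run TestLockRaceExpireSeries ./pkg/receive/expandedpostingscache/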
2 changes: 1 addition & 1 deletion pkg/receive/multitsdb.go
@@ -709,7 +709,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
 	if t.headExpandedPostingsCacheSize > 0 || t.blockExpandedPostingsCacheSize > 0 {
 		var expandedPostingsCacheMetrics = expandedpostingscache.NewPostingCacheMetrics(extprom.WrapRegistererWithPrefix("thanos_", reg))
 
-		expandedPostingsCache = expandedpostingscache.NewBlocksPostingsForMatchersCache(expandedPostingsCacheMetrics, t.headExpandedPostingsCacheSize, t.blockExpandedPostingsCacheSize)
+		expandedPostingsCache = expandedpostingscache.NewBlocksPostingsForMatchersCache(expandedPostingsCacheMetrics, t.headExpandedPostingsCacheSize, t.blockExpandedPostingsCacheSize, 0)
 	}
 
 	opts := *t.tsdbOpts