Skip to content

Commit

Permalink
Pass affected labels to MemPostings.Delete() (prometheus#14307)
Browse files Browse the repository at this point in the history
* Pass affected labels to MemPostings.Delete

As suggested by @bboreham, we can track the labels of the deleted series
and avoid iterating through all the label/value combinations.

This looks much faster on the MemPostings.Delete call. We don't have a
benchmark on stripeSeries.gc() where we'll pay the price of iterating
the labels of each one of the deleted series.

Signed-off-by: Oleg Zaytsev <[email protected]>
  • Loading branch information
colega authored Jun 18, 2024
1 parent 4f78cc8 commit fd1a89b
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 112 deletions.
10 changes: 6 additions & 4 deletions tsdb/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -1552,7 +1552,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {

// Drop old chunks and remember series IDs and hashes if they can be
// deleted entirely.
deleted, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef)
deleted, affected, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef)
seriesRemoved := len(deleted)

h.metrics.seriesRemoved.Add(float64(seriesRemoved))
Expand All @@ -1561,7 +1561,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
h.numSeries.Sub(uint64(seriesRemoved))

// Remove deleted series IDs from the postings lists.
h.postings.Delete(deleted)
h.postings.Delete(deleted, affected)

// Remove tombstones referring to the deleted series.
h.tombstones.DeleteTombstones(deleted)
Expand Down Expand Up @@ -1869,9 +1869,10 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st
// but the returned map goes into postings.Delete() which expects a map[storage.SeriesRef]struct
// and there's no easy way to cast maps.
// minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series.
func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ int, _, _ int64, minMmapFile int) {
func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int, _, _ int64, minMmapFile int) {
var (
deleted = map[storage.SeriesRef]struct{}{}
affected = map[labels.Label]struct{}{}
rmChunks = 0
actualMint int64 = math.MaxInt64
minOOOTime int64 = math.MaxInt64
Expand Down Expand Up @@ -1927,6 +1928,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
}

deleted[storage.SeriesRef(series.ref)] = struct{}{}
series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} })
s.hashes[hashShard].del(hash, series.ref)
delete(s.series[refShard], series.ref)
deletedForCallback[series.ref] = series.lset
Expand All @@ -1938,7 +1940,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
actualMint = mint
}

return deleted, rmChunks, actualMint, minOOOTime, minMmapFile
return deleted, affected, rmChunks, actualMint, minOOOTime, minMmapFile
}

// The iterForDeletion function iterates through all series, invoking the checkDeletedFunc for each.
Expand Down
74 changes: 74 additions & 0 deletions tsdb/head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,80 @@ func TestHead_UnknownWALRecord(t *testing.T) {
require.NoError(t, head.Close())
}

// BenchmarkHead_Truncate is quite heavy, so consider running it with
// -benchtime=10x or similar to get more stable and comparable results.
func BenchmarkHead_Truncate(b *testing.B) {
const total = 1e6

prepare := func(b *testing.B, churn int) *Head {
h, _ := newTestHead(b, 1000, wlog.CompressionNone, false)
b.Cleanup(func() {
require.NoError(b, h.Close())
})

h.initTime(0)

internedItoa := map[int]string{}
var mtx sync.RWMutex
itoa := func(i int) string {
mtx.RLock()
s, ok := internedItoa[i]
mtx.RUnlock()
if ok {
return s
}
mtx.Lock()
s = strconv.Itoa(i)
internedItoa[i] = s
mtx.Unlock()
return s
}

allSeries := [total]labels.Labels{}
nameValues := make([]string, 0, 100)
for i := 0; i < total; i++ {
nameValues = nameValues[:0]

// A thousand labels like lbl_x_of_1000, each with total/1000 values
thousand := "lbl_" + itoa(i%1000) + "_of_1000"
nameValues = append(nameValues, thousand, itoa(i/1000))
// A hundred labels like lbl_x_of_100, each with total/100 values.
hundred := "lbl_" + itoa(i%100) + "_of_100"
nameValues = append(nameValues, hundred, itoa(i/100))

if i%13 == 0 {
ten := "lbl_" + itoa(i%10) + "_of_10"
nameValues = append(nameValues, ten, itoa(i%10))
}

allSeries[i] = labels.FromStrings(append(nameValues, "first", "a", "second", "a", "third", "a")...)
s, _, _ := h.getOrCreate(allSeries[i].Hash(), allSeries[i])
s.mmappedChunks = []*mmappedChunk{
{minTime: 1000 * int64(i/churn), maxTime: 999 + 1000*int64(i/churn)},
}
}

return h
}

for _, churn := range []int{10, 100, 1000} {
b.Run(fmt.Sprintf("churn=%d", churn), func(b *testing.B) {
if b.N > total/churn {
// Just to make sure that benchmark still makes sense.
panic("benchmark not prepared")
}
h := prepare(b, churn)
b.ResetTimer()

for i := 0; i < b.N; i++ {
require.NoError(b, h.Truncate(1000*int64(i)))
// Make sure the benchmark is meaningful and it's actually truncating the expected amount of series.
require.Equal(b, total-churn*i, int(h.NumSeries()))
}
})
}
}

func TestHead_Truncate(t *testing.T) {
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
defer func() {
Expand Down
97 changes: 21 additions & 76 deletions tsdb/index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,89 +288,34 @@ func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) {
}

// Delete removes all ids in the given map from the postings lists.
func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}) {
// We will take an optimistic read lock for the entire method,
// and only lock for writing when we actually find something to delete.
//
// Each SeriesRef can appear in several Postings.
// To change each one, we need to know the label name and value that it is indexed under.
// We iterate over all label names, then for each name all values,
// and look for individual series to be deleted.
p.mtx.RLock()
defer p.mtx.RUnlock()

// Collect all keys relevant for deletion once. New keys added afterwards
// can by definition not be affected by any of the given deletes.
keys := make([]string, 0, len(p.m))
maxVals := 0
for n := range p.m {
keys = append(keys, n)
if len(p.m[n]) > maxVals {
maxVals = len(p.m[n])
}
}

vals := make([]string, 0, maxVals)
for _, n := range keys {
// Copy the values and iterate the copy: if we unlock in the loop below,
// another goroutine might modify the map while we are part-way through it.
vals = vals[:0]
for v := range p.m[n] {
vals = append(vals, v)
}

// For each posting we first analyse whether the postings list is affected by the deletes.
// If no, we remove the label value from the vals list.
// This way we only need to Lock once later.
for i := 0; i < len(vals); {
found := false
refs := p.m[n][vals[i]]
for _, id := range refs {
if _, ok := deleted[id]; ok {
i++
found = true
break
}
}
// affectedLabels contains all the labels that are affected by the deletion, there's no need to check other labels.
func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected map[labels.Label]struct{}) {
p.mtx.Lock()
defer p.mtx.Unlock()

if !found {
// Didn't match, bring the last value to this position, make the slice shorter and check again.
// The order of the slice doesn't matter as it comes from a map iteration.
vals[i], vals = vals[len(vals)-1], vals[:len(vals)-1]
process := func(l labels.Label) {
orig := p.m[l.Name][l.Value]
repl := make([]storage.SeriesRef, 0, len(orig))
for _, id := range orig {
if _, ok := deleted[id]; !ok {
repl = append(repl, id)
}
}

// If no label values have deleted ids, just continue.
if len(vals) == 0 {
continue
}

// The only vals left here are the ones that contain deleted ids.
// Now we take the write lock and remove the ids.
p.mtx.RUnlock()
p.mtx.Lock()
for _, l := range vals {
repl := make([]storage.SeriesRef, 0, len(p.m[n][l]))

for _, id := range p.m[n][l] {
if _, ok := deleted[id]; !ok {
repl = append(repl, id)
}
}
if len(repl) > 0 {
p.m[n][l] = repl
} else {
delete(p.m[n], l)
if len(repl) > 0 {
p.m[l.Name][l.Value] = repl
} else {
delete(p.m[l.Name], l.Value)
// Delete the key if we removed all values.
if len(p.m[l.Name]) == 0 {
delete(p.m, l.Name)
}
}
}

// Delete the key if we removed all values.
if len(p.m[n]) == 0 {
delete(p.m, n)
}
p.mtx.Unlock()
p.mtx.RLock()
for l := range affected {
process(l)
}
process(allPostingsKey)
}

// Iter calls f for each postings list. It aborts if f returns an error and returns it.
Expand Down
67 changes: 35 additions & 32 deletions tsdb/index/postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,9 +979,13 @@ func TestMemPostings_Delete(t *testing.T) {
p.Add(3, labels.FromStrings("lbl2", "a"))

before := p.Get(allPostingsKey.Name, allPostingsKey.Value)
p.Delete(map[storage.SeriesRef]struct{}{
deletedRefs := map[storage.SeriesRef]struct{}{
2: {},
})
}
affectedLabels := map[labels.Label]struct{}{
{Name: "lbl1", Value: "b"}: {},
}
p.Delete(deletedRefs, affectedLabels)
after := p.Get(allPostingsKey.Name, allPostingsKey.Value)

// Make sure postings gotten before the delete have the old data when
Expand Down Expand Up @@ -1022,33 +1026,23 @@ func BenchmarkMemPostings_Delete(b *testing.B) {
}

const total = 1e6
prepare := func() *MemPostings {
var ref storage.SeriesRef
next := func() storage.SeriesRef {
ref++
return ref
allSeries := [total]labels.Labels{}
nameValues := make([]string, 0, 100)
for i := 0; i < total; i++ {
nameValues = nameValues[:0]

// A thousand labels like lbl_x_of_1000, each with total/1000 values
thousand := "lbl_" + itoa(i%1000) + "_of_1000"
nameValues = append(nameValues, thousand, itoa(i/1000))
// A hundred labels like lbl_x_of_100, each with total/100 values.
hundred := "lbl_" + itoa(i%100) + "_of_100"
nameValues = append(nameValues, hundred, itoa(i/100))

if i < 100 {
ten := "lbl_" + itoa(i%10) + "_of_10"
nameValues = append(nameValues, ten, itoa(i%10))
}

p := NewMemPostings()
nameValues := make([]string, 0, 100)
for i := 0; i < total; i++ {
nameValues = nameValues[:0]

// A thousand labels like lbl_x_of_1000, each with total/1000 values
thousand := "lbl_" + itoa(i%1000) + "_of_1000"
nameValues = append(nameValues, thousand, itoa(i/1000))
// A hundred labels like lbl_x_of_100, each with total/100 values.
hundred := "lbl_" + itoa(i%100) + "_of_100"
nameValues = append(nameValues, hundred, itoa(i/100))

if i < 100 {
ten := "lbl_" + itoa(i%10) + "_of_10"
nameValues = append(nameValues, ten, itoa(i%10))
}

p.Add(next(), labels.FromStrings(append(nameValues, "first", "a", "second", "a", "third", "a")...))
}
return p
allSeries[i] = labels.FromStrings(append(nameValues, "first", "a", "second", "a", "third", "a")...)
}

for _, refs := range []int{1, 100, 10_000} {
Expand All @@ -1060,7 +1054,11 @@ func BenchmarkMemPostings_Delete(b *testing.B) {
panic("benchmark not prepared")
}

p := prepare()
p := NewMemPostings()
for i := range allSeries {
p.Add(storage.SeriesRef(i), allSeries[i])
}

stop := make(chan struct{})
wg := sync.WaitGroup{}
for i := 0; i < reads; i++ {
Expand All @@ -1086,11 +1084,16 @@ func BenchmarkMemPostings_Delete(b *testing.B) {

b.ResetTimer()
for n := 0; n < b.N; n++ {
deleted := map[storage.SeriesRef]struct{}{}
deleted := make(map[storage.SeriesRef]struct{}, refs)
affected := make(map[labels.Label]struct{}, refs)
for i := 0; i < refs; i++ {
deleted[storage.SeriesRef(n*refs+i)] = struct{}{}
ref := storage.SeriesRef(n*refs + i)
deleted[ref] = struct{}{}
allSeries[ref].Range(func(l labels.Label) {
affected[l] = struct{}{}
})
}
p.Delete(deleted)
p.Delete(deleted, affected)
}
})
}
Expand Down

0 comments on commit fd1a89b

Please sign in to comment.