Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6216](https://github.com/thanos-io/thanos/pull/6216) Receiver: removed hard-coded value of EnableExemplarStorage flag and set it according to max-exemplar value.
- [#6222](https://github.com/thanos-io/thanos/pull/6222) mixin(Receive): Fix tenant series received charts.
- [#6218](https://github.com/thanos-io/thanos/pull/6218) mixin(Store): handle ResourceExhausted as a non-server error. As a consequence, this error won't contribute to Store's grpc errors alerts.
- [#6271](https://github.com/thanos-io/thanos/pull/6271) Receive: Fix segfault in `LabelValues` during head compaction.

### Changed
- [#6168](https://github.com/thanos-io/thanos/pull/6168) Receiver: Make ketama hashring fail early when configured with number of nodes lower than the replication factor.
Expand Down
86 changes: 81 additions & 5 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,25 @@ package receive
import (
"context"
"io"
"math"
"os"
"strings"
"testing"
"time"

"github.com/thanos-io/objstore"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/objstore"
"golang.org/x/sync/errgroup"

"github.com/efficientgo/core/testutil"
"google.golang.org/grpc"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store"
Expand Down Expand Up @@ -606,7 +607,61 @@ func TestMultiTSDBWithNilStore(t *testing.T) {
testutil.Ok(t, appendSample(m, tenantID, time.Now()))
}

type slowClient struct {
store.Client
}

func (s *slowClient) LabelValues(ctx context.Context, r *storepb.LabelValuesRequest, _ ...grpc.CallOption) (*storepb.LabelValuesResponse, error) {
<-time.After(10 * time.Millisecond)
return s.Client.LabelValues(ctx, r)
}

func TestProxyLabelValues(t *testing.T) {
dir := t.TempDir()
m := NewMultiTSDB(
dir, nil, prometheus.NewRegistry(), &tsdb.Options{
RetentionDuration: 10 * time.Minute.Milliseconds(),
MinBlockDuration: 5 * time.Minute.Milliseconds(),
MaxBlockDuration: 5 * time.Minute.Milliseconds(),
NoLockfile: true,
},
labels.FromStrings("replica", "01"),
"tenant_id",
nil,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
for {
select {
case <-ctx.Done():
return
default:
testutil.Ok(t, queryLabelValues(ctx, m))
}
}
}()

// Append several samples to a TSDB outside the retention period.
testutil.Ok(t, appendSampleWithLabels(m, "tenant-a", labels.FromStrings(labels.MetricName, "metric-a"), time.Now().Add(-5*time.Hour)))
testutil.Ok(t, appendSampleWithLabels(m, "tenant-a", labels.FromStrings(labels.MetricName, "metric-b"), time.Now().Add(-3*time.Hour)))
testutil.Ok(t, appendSampleWithLabels(m, "tenant-b", labels.FromStrings(labels.MetricName, "metric-c"), time.Now().Add(-1*time.Hour)))

// Append a sample within the retention period and flush all tenants.
// This will lead deletion of blocks that fall out of the retention period.
testutil.Ok(t, appendSampleWithLabels(m, "tenant-b", labels.FromStrings(labels.MetricName, "metric-d"), time.Now()))
testutil.Ok(t, m.Flush())
}

func appendSample(m *MultiTSDB, tenant string, timestamp time.Time) error {
return appendSampleWithLabels(m, tenant, labels.FromStrings("foo", "bar"), timestamp)
}

func appendSampleWithLabels(m *MultiTSDB, tenant string, lbls labels.Labels, timestamp time.Time) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

Expand All @@ -623,14 +678,35 @@ func appendSample(m *MultiTSDB, tenant string, timestamp time.Time) error {
return err
}

_, err = a.Append(0, labels.FromStrings("foo", "bar"), timestamp.UnixMilli(), 10)
_, err = a.Append(0, lbls, timestamp.UnixMilli(), 10)
if err != nil {
return err
}

return a.Commit()
}

func queryLabelValues(ctx context.Context, m *MultiTSDB) error {
proxy := store.NewProxyStore(nil, nil, func() []store.Client {
clients := m.TSDBLocalClients()
if len(clients) > 0 {
clients[0] = &slowClient{clients[0]}
}
return clients
}, component.Store, nil, 1*time.Minute, store.LazyRetrieval)

req := &storepb.LabelValuesRequest{
Label: labels.MetricName,
Start: math.MinInt64,
End: math.MaxInt64,
}
_, err := proxy.LabelValues(ctx, req)
if err != nil {
return err
}
return nil
}

func BenchmarkMultiTSDB(b *testing.B) {
dir := b.TempDir()

Expand Down
23 changes: 21 additions & 2 deletions pkg/store/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"math"
"sort"
"strings"
"sync"

"github.com/go-kit/log"
Expand Down Expand Up @@ -277,7 +278,16 @@ func (s *TSDBStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest
sort.Strings(res)
}

return &storepb.LabelNamesResponse{Names: res}, nil
// Label values can come from a postings table of a memory-mapped block which can be deleted during
// head compaction. Since we close the block querier before we return from the function,
// we need to copy label values to make sure the client still has access to the data when
// a block is deleted.
values := make([]string, len(res))
for i := range res {
values[i] = strings.Clone(res[i])
}

return &storepb.LabelNamesResponse{Names: values}, nil
}

// LabelValues returns all known label values for a given label name.
Expand Down Expand Up @@ -312,5 +322,14 @@ func (s *TSDBStore) LabelValues(ctx context.Context, r *storepb.LabelValuesReque
return nil, status.Error(codes.Internal, err.Error())
}

return &storepb.LabelValuesResponse{Values: res}, nil
// Label values can come from a postings table of a memory-mapped block which can be deleted during
// head compaction. Since we close the block querier before we return from the function,
// we need to copy label values to make sure the client still has access to the data when
// a block is deleted.
values := make([]string, len(res))
for i := range res {
values[i] = strings.Clone(res[i])
}

return &storepb.LabelValuesResponse{Values: values}, nil
}