17 changes: 17 additions & 0 deletions CHANGELOG.md
@@ -18,6 +18,23 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Removed

## [v0.35.1](https://github.com/thanos-io/thanos/tree/release-0.35) - 28.05.2024

### Fixed

- [#7323](https://github.com/thanos-io/thanos/pull/7323) Sidecar: Wait for Prometheus on startup.
- [#6948](https://github.com/thanos-io/thanos/pull/6948) Receive: Fix goroutine leak during Series requests to the Thanos Store API.
- [#7382](https://github.com/thanos-io/thanos/pull/7382) *: Ensure objstore flag values are masked & disable debug/pprof/cmdline.
- [#7392](https://github.com/thanos-io/thanos/pull/7392) Query: Fix broken min and max for pre-0.34.1 sidecars.
- [#7373](https://github.com/thanos-io/thanos/pull/7373) Receive: Fix stats for remote write.
- [#7318](https://github.com/thanos-io/thanos/pull/7318) Compactor: Recover from panic to log block ID.

### Added

### Changed

### Removed

## [v0.35.0](https://github.com/thanos-io/thanos/tree/release-0.35) - 02.05.2024

### Fixed
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.35.0
0.35.1
5 changes: 5 additions & 0 deletions cmd/thanos/main.go
@@ -214,6 +214,11 @@ func getFlagsMap(flags []*kingpin.FlagModel) map[string]string {
if boilerplateFlags.GetFlag(f.Name) != nil {
continue
}
// Mask inline objstore flag which can have credentials.
if f.Name == "objstore.config" || f.Name == "objstore.config-file" {
flagsMap[f.Name] = "<REDACTED>"
continue
}
flagsMap[f.Name] = f.Value.String()
}

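A minimal, self-contained sketch of the masking pattern above. The `flagModel` struct is a stand-in for kingpin's `*kingpin.FlagModel`; the point is that flags which can embed credentials are replaced with a placeholder before the map is exposed, e.g. on the flags status page:

```go
package main

import "fmt"

// flagModel is a stand-in for kingpin's *kingpin.FlagModel.
type flagModel struct {
	name  string
	value string
}

// maskedFlags mirrors the pattern in getFlagsMap: flags whose values
// can carry inline credentials are redacted before the map is exposed.
func maskedFlags(flags []flagModel) map[string]string {
	out := make(map[string]string, len(flags))
	for _, f := range flags {
		if f.name == "objstore.config" || f.name == "objstore.config-file" {
			out[f.name] = "<REDACTED>"
			continue
		}
		out[f.name] = f.value
	}
	return out
}

func main() {
	fmt.Println(maskedFlags([]flagModel{
		{name: "objstore.config", value: "type: S3\nconfig:\n  secret_key: hunter2"},
		{name: "log.level", value: "info"},
	}))
}
```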
99 changes: 61 additions & 38 deletions cmd/thanos/sidecar.go
@@ -172,64 +172,87 @@ func runSidecar(
Help: "Boolean indicator whether the sidecar can reach its Prometheus peer.",
})

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
// Only check Prometheus's flags when upload is enabled.
if uploads {
// Check prometheus's flags to ensure same sidecar flags.
if err := validatePrometheus(ctx, m.client, logger, conf.shipper.ignoreBlockSize, m); err != nil {
return errors.Wrap(err, "validate Prometheus flags")
}
}
ctx := context.Background()
// Only check Prometheus's flags when upload is enabled.
if uploads {
// Check Prometheus's flags to ensure they are compatible with the sidecar.
// We retry infinitely until we have validated the Prometheus flags.
err := runutil.Retry(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
defer iterCancel()

// We retry infinitely until we reach and fetch BuildVersion from our Prometheus.
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
if err := m.BuildVersion(ctx); err != nil {
if err := validatePrometheus(iterCtx, m.client, logger, conf.shipper.ignoreBlockSize, m); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch prometheus version. Is Prometheus running? Retrying",
"msg", "failed to validate prometheus flags. Is Prometheus running? Retrying",
"err", err,
)
return err
}

level.Info(logger).Log(
"msg", "successfully loaded prometheus version",
"msg", "successfully validated prometheus flags",
)
return nil
})
if err != nil {
return errors.Wrap(err, "failed to get prometheus version")
return errors.Wrap(err, "failed to validate prometheus flags")
}
}

// Blocking query of external labels before joining as a Source Peer into gossip.
// We retry infinitely until we reach and fetch labels from our Prometheus.
err = runutil.Retry(2*time.Second, ctx.Done(), func() error {
if err := m.UpdateLabels(ctx); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
"err", err,
)
promUp.Set(0)
statusProber.NotReady(err)
return err
}
// We retry infinitely until we reach and fetch BuildVersion from our Prometheus.
err := runutil.Retry(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
defer iterCancel()

level.Info(logger).Log(
"msg", "successfully loaded prometheus external labels",
"external_labels", m.Labels().String(),
if err := m.BuildVersion(iterCtx); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch prometheus version. Is Prometheus running? Retrying",
"err", err,
)
promUp.Set(1)
statusProber.Ready()
return nil
})
if err != nil {
return errors.Wrap(err, "initial external labels query")
return err
}

if len(m.Labels()) == 0 {
return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured; see https://thanos.io/tip/thanos/storage.md#external-labels for details.")
level.Info(logger).Log(
"msg", "successfully loaded prometheus version",
)
return nil
})
if err != nil {
return errors.Wrap(err, "failed to get prometheus version")
}

// Blocking query of external labels before joining as a Source Peer into gossip.
// We retry infinitely until we reach and fetch labels from our Prometheus.
err = runutil.Retry(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
defer iterCancel()

if err := m.UpdateLabels(iterCtx); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
"err", err,
)
return err
}

level.Info(logger).Log(
"msg", "successfully loaded prometheus external labels",
"external_labels", m.Labels().String(),
)
return nil
})
if err != nil {
return errors.Wrap(err, "initial external labels query")
}

if len(m.Labels()) == 0 {
return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured; see https://thanos.io/tip/thanos/storage.md#external-labels for details.")
}
promUp.Set(1)
statusProber.Ready()

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
// Periodically query the Prometheus config. We use this as a heartbeat as well as for updating
// the external labels we apply.
return runutil.Repeat(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
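The sidecar change above moves the startup checks out of the run group and bounds every retry attempt with its own timeout (`conf.prometheus.getConfigTimeout`) instead of one shared long-lived context. A hedged sketch of that pattern, independent of Thanos' `runutil` helpers (all names here are illustrative):

```go
package retrysketch

import (
	"context"
	"time"
)

// retryWithIterTimeout retries do until it succeeds or stop is closed.
// Each attempt gets its own short-lived context, so a hung HTTP call to
// Prometheus cannot stall the loop — mirroring the iterCtx/iterCancel
// pattern in the diff above.
func retryWithIterTimeout(interval, timeout time.Duration, stop <-chan struct{}, do func(ctx context.Context) error) error {
	tick := time.NewTicker(interval)
	defer tick.Stop()
	for {
		iterCtx, iterCancel := context.WithTimeout(context.Background(), timeout)
		err := do(iterCtx)
		iterCancel()
		if err == nil {
			return nil
		}
		select {
		case <-stop:
			return err // give up when asked to stop; report the last error
		case <-tick.C:
		}
	}
}
```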
16 changes: 16 additions & 0 deletions pkg/compact/compact.go
@@ -10,6 +10,7 @@ import (
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"

@@ -871,6 +872,21 @@ func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp
return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir")
}

defer func() {
if p := recover(); p != nil {
var sb strings.Builder

cgIDs := cg.IDs()
for i, blid := range cgIDs {
_, _ = sb.WriteString(blid.String())
if i < len(cgIDs)-1 {
_, _ = sb.WriteString(",")
}
}
rerr = fmt.Errorf("paniced while compacting %s: %v", sb.String(), p)
}
}()

errChan := make(chan error, 1)
err := tracing.DoInSpanWithErr(ctx, "compaction_group", func(ctx context.Context) (err error) {
shouldRerun, compID, err = cg.compact(ctx, subDir, planner, comp, blockDeletableChecker, compactionLifecycleCallback, errChan)
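The deferred `recover` above converts a panic into the function's named return error, tagged with the IDs of the blocks being compacted so operators can identify (and, if needed, quarantine) the offending block. A condensed sketch of the same pattern, with `strings.Join` standing in for the manual `strings.Builder` loop:

```go
package compactsketch

import (
	"fmt"
	"strings"
)

// compactGroup shows the recover-to-error pattern: rerr is a named
// return value, so the deferred closure can overwrite it if work panics.
func compactGroup(blockIDs []string, work func() error) (rerr error) {
	defer func() {
		if p := recover(); p != nil {
			rerr = fmt.Errorf("panicked while compacting %s: %v", strings.Join(blockIDs, ","), p)
		}
	}()
	return work() // may panic deep inside the compactor
}
```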
15 changes: 0 additions & 15 deletions pkg/query/querier.go
@@ -241,20 +241,6 @@ func aggrsFromFunc(f string) []storepb.Aggr {
return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}
}

func storeHintsFromPromHints(hints *storage.SelectHints) *storepb.QueryHints {
return &storepb.QueryHints{
StepMillis: hints.Step,
Func: &storepb.Func{
Name: hints.Func,
},
Grouping: &storepb.Grouping{
By: hints.By,
Labels: hints.Grouping,
},
Range: &storepb.Range{Millis: hints.Range},
}
}

func (q *querier) Select(ctx context.Context, _ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
if hints == nil {
hints = &storage.SelectHints{
@@ -351,7 +337,6 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
ShardInfo: q.shardInfo,
PartialResponseStrategy: q.partialResponseStrategy,
SkipChunks: q.skipChunks,
QueryHints: storeHintsFromPromHints(hints),
}
if q.isDedupEnabled() {
// Soft ask to sort without replica labels and push them at the end of labelset.
35 changes: 16 additions & 19 deletions pkg/receive/handler.go
@@ -681,35 +681,32 @@ type remoteWriteParams struct {
alreadyReplicated bool
}

func (h *Handler) gatherWriteStats(writes ...map[endpointReplica]map[string]trackedSeries) tenantRequestStats {
func (h *Handler) gatherWriteStats(localWrites map[endpointReplica]map[string]trackedSeries) tenantRequestStats {
var stats tenantRequestStats = make(tenantRequestStats)

for _, write := range writes {
for er := range write {
for tenant, series := range write[er] {
samples := 0
for er := range localWrites {
for tenant, series := range localWrites[er] {
samples := 0

for _, ts := range series.timeSeries {
samples += len(ts.Samples)
}
for _, ts := range series.timeSeries {
samples += len(ts.Samples)
}

if st, ok := stats[tenant]; ok {
st.timeseries += len(series.timeSeries)
st.totalSamples += samples
if st, ok := stats[tenant]; ok {
st.timeseries += len(series.timeSeries)
st.totalSamples += samples

stats[tenant] = st
} else {
stats[tenant] = requestStats{
timeseries: len(series.timeSeries),
totalSamples: samples,
}
stats[tenant] = st
} else {
stats[tenant] = requestStats{
timeseries: len(series.timeSeries),
totalSamples: samples,
}
}
}
}

return stats
}

func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) (tenantRequestStats, error) {
@@ -739,7 +736,7 @@ func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) (
return stats, err
}

stats = h.gatherWriteStats(localWrites, remoteWrites)
stats = h.gatherWriteStats(localWrites)

// Prepare a buffered channel to receive the responses from the local and remote writes. Remote writes will all go
// asynchronously and with this capacity we will never block on writing to the channel.
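The handler now derives write stats from local writes only; forwarded (remote) writes are accounted for by the peer that ultimately persists them, which per #7373 keeps the reported counts from being inflated. A simplified sketch of the aggregation, with stand-in types replacing `trackedSeries` (here each tenant maps to a slice of per-series sample counts):

```go
package receivesketch

type requestStats struct {
	timeseries   int
	totalSamples int
}

// tenantStats tallies per-tenant series and sample counts from local
// writes only, mirroring gatherWriteStats above. The outer map key
// stands in for endpointReplica, the inner for the tenant.
func tenantStats(localWrites map[string]map[string][]int) map[string]requestStats {
	stats := make(map[string]requestStats)
	for _, byTenant := range localWrites {
		for tenant, seriesSamples := range byTenant {
			st := stats[tenant] // zero value when the tenant is new
			st.timeseries += len(seriesSamples)
			for _, n := range seriesSamples {
				st.totalSamples += n
			}
			stats[tenant] = st
		}
	}
	return stats
}
```

Reading the zero value out of the map collapses the if/else from the original into a single path; the diff keeps the explicit branch, but both are equivalent.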
1 change: 0 additions & 1 deletion pkg/server/http/http.go
@@ -117,7 +117,6 @@ func (s *Server) Handle(pattern string, handler http.Handler) {

func registerProfiler(mux *http.ServeMux) {
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
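Dropping the /debug/pprof/cmdline route complements the flag masking in main.go: `net/http/pprof`'s Cmdline handler responds with the process's full command line (`os.Args` joined by NUL bytes), which would leak any inline `objstore.config` passed as a flag even after the status page masks it. Roughly what the removed endpoint disclosed:

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

func main() {
	// pprof.Cmdline writes os.Args separated by NUL bytes; joined with
	// a space here so the output is readable in a terminal.
	fmt.Println(strings.Join(os.Args, " "))
}
```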
2 changes: 0 additions & 2 deletions pkg/store/bucket.go
@@ -1571,7 +1571,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
var resp respSet
if s.sortingStrategy == sortingStrategyStore {
resp = newEagerRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
@@ -1585,7 +1584,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
)
} else {
resp = newLazyRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
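Removing `srv.Context()` from the eager/lazy response-set constructors is part of the #6948 goroutine-leak fix. As a generic illustration of that leak class (deliberately not the Thanos code): a producer goroutine that only writes to a channel blocks forever once its reader is gone, unless it also watches a done signal it can trust for its whole lifetime:

```go
package main

import (
	"context"
	"fmt"
)

// produce sends values until its consumer goes away. Without the
// ctx.Done() case it would block forever on the unbuffered channel
// once nothing reads from it — a leaked goroutine per request.
func produce(ctx context.Context, out chan<- int) {
	for i := 0; ; i++ {
		select {
		case out <- i: // delivered to a live reader
		case <-ctx.Done(): // reader gone: exit instead of leaking
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan int)
	go produce(ctx, ch)
	fmt.Println(<-ch, <-ch)
	cancel() // without the Done case, produce would now leak
}
```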
14 changes: 1 addition & 13 deletions pkg/store/prometheus.go
@@ -163,19 +163,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto
// Don't ask for more than available time. This includes potential `minTime` flag limit.
availableMinTime, _ := p.timestamps()
if r.MinTime < availableMinTime {
// Align min time with the step to avoid missing data when it gets retrieved by the upper layer's PromQL engine.
// This also is necessary when Sidecar uploads a block and then availableMinTime
// becomes a fixed timestamp.
if r.QueryHints != nil && r.QueryHints.StepMillis != 0 {
diff := availableMinTime - r.MinTime
r.MinTime += (diff / r.QueryHints.StepMillis) * r.QueryHints.StepMillis
// Add one more to strictly fit within --min-time -> infinity.
if r.MinTime != availableMinTime {
r.MinTime += r.QueryHints.StepMillis
}
} else {
r.MinTime = availableMinTime
}
r.MinTime = availableMinTime
}

extLsetToRemove := map[string]struct{}{}
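With the step-alignment logic reverted (and its `QueryHints` plumbing removed from pkg/query/querier.go above), the sidecar store simply clamps the requested lower bound to what it has available; #7392 reports the alignment broke min and max over time for pre-0.34.1 sidecars. The remaining behavior in isolation:

```go
package promsketch

// clampMinTime raises the requested lower bound to the store's
// available minimum — the whole of the logic left after this revert.
func clampMinTime(reqMinTime, availableMinTime int64) int64 {
	if reqMinTime < availableMinTime {
		return availableMinTime
	}
	return reqMinTime
}
```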