diff --git a/cmd/prometheus/main_test.go b/cmd/prometheus/main_test.go
index 4bd1c71b2de..956e52f4902 100644
--- a/cmd/prometheus/main_test.go
+++ b/cmd/prometheus/main_test.go
@@ -20,18 +20,22 @@ import (
 	"fmt"
 	"io"
 	"math"
+	"net/http"
+	"net/http/httptest"
 	"os"
 	"os/exec"
 	"path/filepath"
 	"runtime"
 	"strconv"
 	"strings"
+	"sync"
 	"syscall"
 	"testing"
 	"time"
 
 	"github.com/alecthomas/kingpin/v2"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/common/expfmt"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/common/promslog"
 	"github.com/stretchr/testify/require"
@@ -40,6 +44,7 @@ import (
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/notifier"
 	"github.com/prometheus/prometheus/rules"
+	"github.com/prometheus/prometheus/util/testutil"
 )
 
 func init() {
@@ -623,3 +628,149 @@ func TestRwProtoMsgFlagParser(t *testing.T) {
 		})
 	}
 }
+
+func getMetricValue(t *testing.T, body io.Reader, metricType model.MetricType, metricName string) (float64, error) {
+	t.Helper()
+
+	p := expfmt.TextParser{}
+	metricFamilies, err := p.TextToMetricFamilies(body)
+	if err != nil {
+		return 0, err
+	}
+	metricFamily, ok := metricFamilies[metricName]
+	if !ok {
+		return 0, errors.New("metric family not found")
+	}
+	metric := metricFamily.GetMetric()
+	if len(metric) != 1 {
+		return 0, errors.New("metric not found")
+	}
+	switch metricType {
+	case model.MetricTypeGauge:
+		return metric[0].GetGauge().GetValue(), nil
+	case model.MetricTypeCounter:
+		return metric[0].GetCounter().GetValue(), nil
+	default:
+		t.Fatalf("metric type %s not supported", metricType)
+	}
+
+	return 0, errors.New("cannot get value")
+}
+
+// This test verifies that the per-queue highest-timestamp metrics account for relabeling.
+// See: https://github.com/prometheus/prometheus/pull/17065.
+func TestRemoteWrite_PerQueueMetricsAfterRelabeling(t *testing.T) {
+	t.Parallel()
+
+	tmpDir := t.TempDir()
+	configFile := filepath.Join(tmpDir, "prometheus.yml")
+
+	port := testutil.RandomUnprivilegedPort(t)
+	targetPort := testutil.RandomUnprivilegedPort(t)
+
+	server := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
+		panic("should never be reached")
+	}))
+	t.Cleanup(server.Close)
+
+	// Simulate a remote write relabeling that doesn't yield any series.
+	config := fmt.Sprintf(`
+global:
+  scrape_interval: 1s
+scrape_configs:
+  - job_name: 'self'
+    static_configs:
+      - targets: ['localhost:%d']
+  - job_name: 'target'
+    static_configs:
+      - targets: ['localhost:%d']
+remote_write:
+  - url: %s
+    write_relabel_configs:
+      - source_labels: [job,__name__]
+        regex: 'target,special_metric'
+        action: keep
+`, port, targetPort, server.URL)
+	require.NoError(t, os.WriteFile(configFile, []byte(config), 0o777))
+
+	prom := prometheusCommandWithLogging(
+		t,
+		configFile,
+		port,
+		fmt.Sprintf("--storage.tsdb.path=%s", tmpDir),
+	)
+	require.NoError(t, prom.Start())
+
+	require.Eventually(t, func() bool {
+		r, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/metrics", port))
+		if err != nil {
+			return false
+		}
+		defer r.Body.Close()
+		if r.StatusCode != http.StatusOK {
+			return false
+		}
+
+		metrics, err := io.ReadAll(r.Body)
+		if err != nil {
+			return false
+		}
+
+		// The highest timestamp at the storage level sees all samples, so it should also account for the ones filtered out by relabeling.
+		gHighestTimestamp, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeGauge, "prometheus_remote_storage_highest_timestamp_in_seconds")
+		if err != nil || gHighestTimestamp == 0 {
+			return false
+		}
+
+		// The queue shouldn't see or send any samples: they are all dropped due to relabeling, and the metrics should reflect that.
+		droppedSamples, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeCounter, "prometheus_remote_storage_samples_dropped_total")
+		if err != nil || droppedSamples == 0 {
+			return false
+		}
+
+		highestTimestamp, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeGauge, "prometheus_remote_storage_queue_highest_timestamp_seconds")
+		require.NoError(t, err)
+		require.Zero(t, highestTimestamp)
+
+		highestSentTimestamp, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeGauge, "prometheus_remote_storage_queue_highest_sent_timestamp_seconds")
+		require.NoError(t, err)
+		require.Zero(t, highestSentTimestamp)
+		return true
+	}, 10*time.Second, 100*time.Millisecond)
+}
+
+func prometheusCommandWithLogging(t *testing.T, configFilePath string, port int, extraArgs ...string) *exec.Cmd {
+	stdoutPipe, stdoutWriter := io.Pipe()
+	stderrPipe, stderrWriter := io.Pipe()
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+
+	args := []string{
+		"-test.main",
+		"--config.file=" + configFilePath,
+		"--web.listen-address=0.0.0.0:" + strconv.Itoa(port),
+	}
+	args = append(args, extraArgs...)
+	prom := exec.Command(promPath, args...)
+	prom.Stdout = stdoutWriter
+	prom.Stderr = stderrWriter
+
+	go func() {
+		defer wg.Done()
+		captureLogsToTLog(t, stdoutPipe)
+	}()
+	go func() {
+		defer wg.Done()
+		captureLogsToTLog(t, stderrPipe)
+	}()
+
+	t.Cleanup(func() {
+		prom.Process.Kill()
+		prom.Wait()
+		stdoutWriter.Close()
+		stderrWriter.Close()
+		wg.Wait()
+	})
+	return prom
+}
diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index b274707bfff..ebb23606ce3 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -81,6 +81,7 @@ type queueManagerMetrics struct {
 	droppedHistogramsTotal *prometheus.CounterVec
 	enqueueRetriesTotal    prometheus.Counter
 	sentBatchDuration      prometheus.Histogram
+	highestTimestamp       *maxTimestamp
 	highestSentTimestamp   *maxTimestamp
 	pendingSamples         prometheus.Gauge
 	pendingExemplars       prometheus.Gauge
@@ -227,12 +228,21 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
 		NativeHistogramMaxBucketNumber:  100,
 		NativeHistogramMinResetDuration: 1 * time.Hour,
 	})
+	m.highestTimestamp = &maxTimestamp{
+		Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
+			Namespace:   namespace,
+			Subsystem:   subsystem,
+			Name:        "queue_highest_timestamp_seconds",
+			Help:        "Highest timestamp that was enqueued, in seconds since epoch. Initialized to 0 when no data has been received yet.",
+			ConstLabels: constLabels,
+		}),
+	}
 	m.highestSentTimestamp = &maxTimestamp{
 		Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
 			Namespace:   namespace,
 			Subsystem:   subsystem,
 			Name:        "queue_highest_sent_timestamp_seconds",
-			Help:        "Timestamp from a WAL sample, the highest timestamp successfully sent by this queue, in seconds since epoch. Initialized to 0 when no data has been sent yet.",
+			Help:        "Highest timestamp successfully sent by this queue, in seconds since epoch. Initialized to 0 when no data has been sent yet.",
 			ConstLabels: constLabels,
 		}),
 	}
@@ -337,6 +347,7 @@ func (m *queueManagerMetrics) register() {
 		m.droppedHistogramsTotal,
 		m.enqueueRetriesTotal,
 		m.sentBatchDuration,
+		m.highestTimestamp,
 		m.highestSentTimestamp,
 		m.pendingSamples,
 		m.pendingExemplars,
@@ -372,6 +383,7 @@ func (m *queueManagerMetrics) unregister() {
 	m.reg.Unregister(m.droppedHistogramsTotal)
 	m.reg.Unregister(m.enqueueRetriesTotal)
 	m.reg.Unregister(m.sentBatchDuration)
+	m.reg.Unregister(m.highestTimestamp)
 	m.reg.Unregister(m.highestSentTimestamp)
 	m.reg.Unregister(m.pendingSamples)
 	m.reg.Unregister(m.pendingExemplars)
@@ -440,9 +452,8 @@ type QueueManager struct {
 
 	dataIn, dataDropped, dataOut, dataOutDuration *ewmaRate
 
-	metrics              *queueManagerMetrics
-	interner             *pool
-	highestRecvTimestamp *maxTimestamp
+	metrics  *queueManagerMetrics
+	interner *pool
 }
 
 // NewQueueManager builds a new QueueManager and starts a new
@@ -464,7 +475,6 @@ func NewQueueManager(
 	client WriteClient,
 	flushDeadline time.Duration,
 	interner *pool,
-	highestRecvTimestamp *maxTimestamp,
 	sm ReadyScrapeManager,
 	enableExemplarRemoteWrite bool,
 	enableNativeHistogramRemoteWrite bool,
@@ -507,9 +517,8 @@ func NewQueueManager(
 		dataOut:         newEWMARate(ewmaWeight, shardUpdateDuration),
 		dataOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),
 
-		metrics:              metrics,
-		interner:             interner,
-		highestRecvTimestamp: highestRecvTimestamp,
+		metrics:  metrics,
+		interner: interner,
 
 		protoMsg: protoMsg,
 		enc:      SnappyBlockCompression, // Hardcoded for now, but scaffolding exists for likely future use.
@@ -1123,7 +1132,7 @@ func (t *QueueManager) calculateDesiredShards() int {
 		dataOutDuration = t.dataOutDuration.rate() / float64(time.Second)
 		dataPendingRate = dataInRate*dataKeptRatio - dataOutRate
 		highestSent     = t.metrics.highestSentTimestamp.Get()
-		highestRecv     = t.highestRecvTimestamp.Get()
+		highestRecv     = t.metrics.highestTimestamp.Get()
 		delay           = highestRecv - highestSent
 		dataPending     = delay * dataInRate * dataKeptRatio
 	)
@@ -1332,7 +1341,10 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data timeSeries) bool {
 		case tHistogram, tFloatHistogram:
 			s.qm.metrics.pendingHistograms.Inc()
 			s.enqueuedHistograms.Inc()
+		default:
+			return true
 		}
+		s.qm.metrics.highestTimestamp.Set(float64(data.timestamp / 1000))
 		return true
 	}
 }
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index 38eda81d977..fe0a409508e 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -56,17 +56,6 @@ import (
 
 const defaultFlushDeadline = 1 * time.Minute
 
-func newHighestTimestampMetric() *maxTimestamp {
-	return &maxTimestamp{
-		Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
-			Namespace: namespace,
-			Subsystem: subsystem,
-			Name:      "highest_timestamp_in_seconds",
-			Help:      "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch. Initialized to 0 when no data has been received yet",
-		}),
-	}
-}
-
 func TestBasicContentNegotiation(t *testing.T) {
 	queueConfig := config.DefaultQueueConfig
 	queueConfig.BatchSendDeadline = model.Duration(100 * time.Millisecond)
@@ -321,7 +310,7 @@ func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration, pro
 func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient, protoMsg config.RemoteWriteProtoMsg) *QueueManager {
 	dir := t.TempDir()
 	metrics := newQueueManagerMetrics(nil, "", "")
-	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg)
+	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), nil, false, false, protoMsg)
 	return m
 }
@@ -774,7 +763,7 @@ func TestDisableReshardOnRetry(t *testing.T) {
 		}
 	)
 
-	m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
+	m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), nil, false, false, config.RemoteWriteProtoMsgV1)
 	m.StoreSeries(fakeSeries, 0)
 
 	// Attempt to samples while the manager is running. We immediately stop the
@@ -1391,7 +1380,7 @@ func BenchmarkStoreSeries(b *testing.B) {
 			cfg := config.DefaultQueueConfig
 			mcfg := config.DefaultMetadataConfig
 			metrics := newQueueManagerMetrics(nil, "", "")
-			m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
+			m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), nil, false, false, config.RemoteWriteProtoMsgV1)
 			m.externalLabels = tc.externalLabels
 			m.relabelConfigs = tc.relabelConfigs
 
@@ -1431,7 +1420,7 @@ func BenchmarkStartup(b *testing.B) {
 		// todo: test with new proto type(s)
 		m := NewQueueManager(metrics, watcherMetrics, nil, logger, dir,
 			newEWMARate(ewmaWeight, shardUpdateDuration),
-			cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
+			cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), nil, false, false, config.RemoteWriteProtoMsgV1)
 		m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
 		m.watcher.MaxSegment = segments[len(segments)-2]
 		m.watcher.SetMetrics()
@@ -1532,7 +1521,7 @@ func TestCalculateDesiredShards(t *testing.T) {
 		samplesIn.incr(s)
 		samplesIn.tick()
 
-		m.highestRecvTimestamp.Set(float64(startedAt.Add(ts).Unix()))
+		m.metrics.highestTimestamp.Set(float64(startedAt.Add(ts).Unix()))
 	}
 
 	// helper function for sending samples.
@@ -1740,7 +1729,7 @@ func TestCalculateDesiredShardsDetail(t *testing.T) {
 			forceEMWA(m.dataOut, tc.dataOut*int64(shardUpdateDuration/time.Second))
 			forceEMWA(m.dataDropped, tc.dataDropped*int64(shardUpdateDuration/time.Second))
 			forceEMWA(m.dataOutDuration, int64(tc.dataOutDuration*float64(shardUpdateDuration)))
-			m.highestRecvTimestamp.value = tc.backlog // Not Set() because it can only increase value.
+			m.metrics.highestTimestamp.value = tc.backlog // Not Set() because it can only increase value.
 
 			require.Equal(t, tc.expectedShards, m.calculateDesiredShards())
 		})
diff --git a/storage/remote/write.go b/storage/remote/write.go
index 03630954447..9486e87ff7b 100644
--- a/storage/remote/write.go
+++ b/storage/remote/write.go
@@ -213,7 +213,6 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
 			c,
 			rws.flushDeadline,
 			rws.interner,
-			rws.highestTimestamp,
 			rws.scraper,
 			rwConf.SendExemplars,
 			rwConf.SendNativeHistograms,
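Note on the gauge semantics the tests above depend on: TestCalculateDesiredShardsDetail assigns m.metrics.highestTimestamp.value directly because, per its inline comment, the maxTimestamp Set() method only ever increases the recorded value. The implementation of maxTimestamp is not part of this diff; the following is a minimal sketch of such a forward-only gauge, with the name monotonicGauge and the package name chosen here purely for illustration, not taken from the Prometheus codebase.

// Sketch only: illustrates the forward-only semantics that the
// maxTimestamp gauges in this diff appear to rely on.
package sketch

import (
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

type monotonicGauge struct {
	mtx   sync.Mutex
	value float64
	prometheus.Gauge
}

// Set only moves the gauge forward; older timestamps are ignored.
func (g *monotonicGauge) Set(v float64) {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	if v > g.value {
		g.value = v
		g.Gauge.Set(v)
	}
}

// Get returns the highest value observed so far.
func (g *monotonicGauge) Get() float64 {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	return g.value
}

With a guarded Set() like this, a lower backlog timestamp cannot be injected through the public API, which is why the test writes the backing value field directly.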