151 changes: 151 additions & 0 deletions cmd/prometheus/main_test.go
@@ -20,18 +20,22 @@ import (
"fmt"
"io"
"math"
"net/http"
"net/http/httptest"
"os"
"os/exec"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"syscall"
"testing"
"time"

"github.com/alecthomas/kingpin/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
"github.com/stretchr/testify/require"
@@ -40,6 +44,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/util/testutil"
)

func init() {
@@ -623,3 +628,149 @@ func TestRwProtoMsgFlagParser(t *testing.T) {
})
}
}

func getMetricValue(t *testing.T, body io.Reader, metricType model.MetricType, metricName string) (float64, error) {
t.Helper()

p := expfmt.TextParser{}
metricFamilies, err := p.TextToMetricFamilies(body)
if err != nil {
return 0, err
}
metricFamily, ok := metricFamilies[metricName]
if !ok {
return 0, errors.New("metric family not found")
}
metric := metricFamily.GetMetric()
if len(metric) != 1 {
return 0, errors.New("metric not found")
}
switch metricType {
case model.MetricTypeGauge:
return metric[0].GetGauge().GetValue(), nil
case model.MetricTypeCounter:
return metric[0].GetCounter().GetValue(), nil
default:
t.Fatalf("metric type %s not supported", metricType)
}

return 0, errors.New("cannot get value")
}
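A minimal usage sketch for the helper above, exercised with a hand-written payload in the text exposition format (the snippet assumes it runs inside a test where t is available; the payload and value are illustrative):

```go
// Canned /metrics payload; strings.NewReader satisfies the io.Reader
// parameter of getMetricValue.
payload := `# TYPE prometheus_remote_storage_samples_dropped_total counter
prometheus_remote_storage_samples_dropped_total 42
`
v, err := getMetricValue(t, strings.NewReader(payload),
	model.MetricTypeCounter, "prometheus_remote_storage_samples_dropped_total")
require.NoError(t, err)
require.Equal(t, 42.0, v)
```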

// This test verifies that metrics for the highest timestamps per queue account for relabeling.
// See: https://github.com/prometheus/prometheus/pull/17065.
func TestRemoteWrite_PerQueueMetricsAfterRelabeling(t *testing.T) {
t.Parallel()

tmpDir := t.TempDir()
configFile := filepath.Join(tmpDir, "prometheus.yml")

port := testutil.RandomUnprivilegedPort(t)
targetPort := testutil.RandomUnprivilegedPort(t)

server := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
panic("should never be reached")
}))
t.Cleanup(server.Close)

// Simulate a remote write relabeling that doesn't yield any series.
config := fmt.Sprintf(`
global:
scrape_interval: 1s
scrape_configs:
- job_name: 'self'
static_configs:
- targets: ['localhost:%d']
- job_name: 'target'
static_configs:
- targets: ['localhost:%d']
remote_write:
- url: %s
write_relabel_configs:
- source_labels: [job,__name__]
regex: 'target,special_metric'
action: keep
`, port, targetPort, server.URL)
require.NoError(t, os.WriteFile(configFile, []byte(config), 0o777))

prom := prometheusCommandWithLogging(
t,
configFile,
port,
fmt.Sprintf("--storage.tsdb.path=%s", tmpDir),
)
require.NoError(t, prom.Start())

require.Eventually(t, func() bool {
r, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/metrics", port))
if err != nil {
return false
}
defer r.Body.Close()
if r.StatusCode != http.StatusOK {
return false
}

metrics, err := io.ReadAll(r.Body)
if err != nil {
return false
}

gHighestTimestamp, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeGauge, "prometheus_remote_storage_highest_timestamp_in_seconds")
// The highest timestamp at the storage level sees all samples, including the ones that are filtered out by relabeling.
if err != nil || gHighestTimestamp == 0 {
return false
}

// The queue shouldn't see or send any samples: they are all dropped by relabeling, and the metrics should reflect that.
droppedSamples, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeCounter, "prometheus_remote_storage_samples_dropped_total")
if err != nil || droppedSamples == 0 {
return false
}

highestTimestamp, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeGauge, "prometheus_remote_storage_queue_highest_timestamp_seconds")
require.NoError(t, err)
require.Zero(t, highestTimestamp)

highestSentTimestamp, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeGauge, "prometheus_remote_storage_queue_highest_sent_timestamp_seconds")
require.NoError(t, err)
require.Zero(t, highestSentTimestamp)
return true
}, 10*time.Second, 100*time.Millisecond)
}

func prometheusCommandWithLogging(t *testing.T, configFilePath string, port int, extraArgs ...string) *exec.Cmd {
stdoutPipe, stdoutWriter := io.Pipe()
stderrPipe, stderrWriter := io.Pipe()

var wg sync.WaitGroup
wg.Add(2)

args := []string{
"-test.main",
"--config.file=" + configFilePath,
"--web.listen-address=0.0.0.0:" + strconv.Itoa(port),
}
args = append(args, extraArgs...)
prom := exec.Command(promPath, args...)
prom.Stdout = stdoutWriter
prom.Stderr = stderrWriter

go func() {
defer wg.Done()
captureLogsToTLog(t, stdoutPipe)
}()
go func() {
defer wg.Done()
captureLogsToTLog(t, stderrPipe)
}()

t.Cleanup(func() {
prom.Process.Kill()
prom.Wait()
stdoutWriter.Close()
stderrWriter.Close()
wg.Wait()
})
return prom
}
30 changes: 21 additions & 9 deletions storage/remote/queue_manager.go
@@ -81,6 +81,7 @@ type queueManagerMetrics struct {
droppedHistogramsTotal *prometheus.CounterVec
enqueueRetriesTotal prometheus.Counter
sentBatchDuration prometheus.Histogram
highestTimestamp *maxTimestamp
highestSentTimestamp *maxTimestamp
pendingSamples prometheus.Gauge
pendingExemplars prometheus.Gauge
@@ -227,12 +228,21 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1 * time.Hour,
})
m.highestTimestamp = &maxTimestamp{
Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "queue_highest_timestamp_seconds",
Help: "Highest timestamp that was enqueued, in seconds since epoch. Initialized to 0 when no data has been received yet.",
ConstLabels: constLabels,
}),
}
m.highestSentTimestamp = &maxTimestamp{
Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "queue_highest_sent_timestamp_seconds",
Help: "Timestamp from a WAL sample, the highest timestamp successfully sent by this queue, in seconds since epoch. Initialized to 0 when no data has been sent yet.",
Help: "Highest timestamp successfully sent by this queue, in seconds since epoch. Initialized to 0 when no data has been sent yet.",
ConstLabels: constLabels,
}),
}
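Both new gauges are of the remote-write package's maxTimestamp type, whose Set only ever moves forward, so concurrent shards can race to update it and the largest value wins. Roughly, paraphrasing storage/remote/max_timestamp.go (details may vary between versions):

```go
// maxTimestamp wraps a prometheus.Gauge so that Set is monotonic:
// a value smaller than the current maximum is ignored.
type maxTimestamp struct {
	mtx   sync.Mutex
	value float64
	prometheus.Gauge
}

func (m *maxTimestamp) Set(value float64) {
	m.mtx.Lock()
	defer m.mtx.Unlock()
	if value > m.value {
		m.value = value
		m.Gauge.Set(value)
	}
}
```

This is also why the test changes further down poke .value directly instead of calling Set() when they need to force a lower value.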
@@ -337,6 +347,7 @@ func (m *queueManagerMetrics) register() {
m.droppedHistogramsTotal,
m.enqueueRetriesTotal,
m.sentBatchDuration,
m.highestTimestamp,
m.highestSentTimestamp,
m.pendingSamples,
m.pendingExemplars,
@@ -372,6 +383,7 @@ func (m *queueManagerMetrics) unregister() {
m.reg.Unregister(m.droppedHistogramsTotal)
m.reg.Unregister(m.enqueueRetriesTotal)
m.reg.Unregister(m.sentBatchDuration)
m.reg.Unregister(m.highestTimestamp)
m.reg.Unregister(m.highestSentTimestamp)
m.reg.Unregister(m.pendingSamples)
m.reg.Unregister(m.pendingExemplars)
@@ -440,9 +452,8 @@ type QueueManager struct {

dataIn, dataDropped, dataOut, dataOutDuration *ewmaRate

metrics *queueManagerMetrics
interner *pool
highestRecvTimestamp *maxTimestamp
metrics *queueManagerMetrics
interner *pool
}

// NewQueueManager builds a new QueueManager and starts a new
@@ -464,7 +475,6 @@ func NewQueueManager(
client WriteClient,
flushDeadline time.Duration,
interner *pool,
highestRecvTimestamp *maxTimestamp,
sm ReadyScrapeManager,
enableExemplarRemoteWrite bool,
enableNativeHistogramRemoteWrite bool,
@@ -507,9 +517,8 @@ func NewQueueManager(
dataOut: newEWMARate(ewmaWeight, shardUpdateDuration),
dataOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),

metrics: metrics,
interner: interner,
highestRecvTimestamp: highestRecvTimestamp,
metrics: metrics,
interner: interner,

protoMsg: protoMsg,
enc: SnappyBlockCompression, // Hardcoded for now, but scaffolding exists for likely future use.
@@ -1123,7 +1132,7 @@ func (t *QueueManager) calculateDesiredShards() int {
dataOutDuration = t.dataOutDuration.rate() / float64(time.Second)
dataPendingRate = dataInRate*dataKeptRatio - dataOutRate
highestSent = t.metrics.highestSentTimestamp.Get()
highestRecv = t.highestRecvTimestamp.Get()
highestRecv = t.metrics.highestTimestamp.Get()
delay = highestRecv - highestSent
dataPending = delay * dataInRate * dataKeptRatio
)
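With the per-queue gauge, delay now reflects only data this queue has actually enqueued, rather than everything the storage layer received. A worked example of the arithmetic above, under assumed rates (the numbers are illustrative, not from the source):

```go
// Assume 1000 samples/s arrive, relabeling keeps 10%, and the queue
// sends 80 samples/s while trailing 10s behind the newest enqueued data.
dataInRate, dataKeptRatio, dataOutRate := 1000.0, 0.1, 80.0
highestRecv, highestSent := 1_700_000_010.0, 1_700_000_000.0 // unix seconds
dataPendingRate := dataInRate*dataKeptRatio - dataOutRate // 20 samples/s of backlog growth
delay := highestRecv - highestSent                        // 10 s
dataPending := delay * dataInRate * dataKeptRatio         // about 1000 samples outstanding
_, _ = dataPendingRate, dataPending
```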
@@ -1332,7 +1341,10 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data timeSeries) bool {
case tHistogram, tFloatHistogram:
s.qm.metrics.pendingHistograms.Inc()
s.enqueuedHistograms.Inc()
default:
return true
}
s.qm.metrics.highestTimestamp.Set(float64(data.timestamp / 1000))
return true
}
}
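The timestamp carried in timeSeries is int64 milliseconds since the Unix epoch, while the gauge is exported in seconds, hence the division by 1000. A small sketch of the truncation this implies (illustrative values):

```go
// Integer division happens before the float64 conversion, so sub-second
// precision is dropped, which is acceptable for a seconds-granularity gauge.
var tsMillis int64 = 1_700_000_000_123
seconds := float64(tsMillis / 1000) // 1.7e9; the trailing 123 ms are truncated
_ = seconds
```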
23 changes: 6 additions & 17 deletions storage/remote/queue_manager_test.go
@@ -56,17 +56,6 @@ import (

const defaultFlushDeadline = 1 * time.Minute

func newHighestTimestampMetric() *maxTimestamp {
return &maxTimestamp{
Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "highest_timestamp_in_seconds",
Help: "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch. Initialized to 0 when no data has been received yet",
}),
}
}

func TestBasicContentNegotiation(t *testing.T) {
queueConfig := config.DefaultQueueConfig
queueConfig.BatchSendDeadline = model.Duration(100 * time.Millisecond)
@@ -321,7 +310,7 @@ func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration, pro
func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient, protoMsg config.RemoteWriteProtoMsg) *QueueManager {
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), nil, false, false, protoMsg)

return m
}
@@ -774,7 +763,7 @@ func TestDisableReshardOnRetry(t *testing.T) {
}
)

m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), nil, false, false, config.RemoteWriteProtoMsgV1)
m.StoreSeries(fakeSeries, 0)

// Attempt to send samples while the manager is running. We immediately stop the
@@ -1391,7 +1380,7 @@ func BenchmarkStoreSeries(b *testing.B) {
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), nil, false, false, config.RemoteWriteProtoMsgV1)
m.externalLabels = tc.externalLabels
m.relabelConfigs = tc.relabelConfigs

@@ -1431,7 +1420,7 @@ func BenchmarkStartup(b *testing.B) {
// todo: test with new proto type(s)
m := NewQueueManager(metrics, watcherMetrics, nil, logger, dir,
newEWMARate(ewmaWeight, shardUpdateDuration),
cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), nil, false, false, config.RemoteWriteProtoMsgV1)
m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
m.watcher.MaxSegment = segments[len(segments)-2]
m.watcher.SetMetrics()
@@ -1532,7 +1521,7 @@ func TestCalculateDesiredShards(t *testing.T) {
samplesIn.incr(s)
samplesIn.tick()

m.highestRecvTimestamp.Set(float64(startedAt.Add(ts).Unix()))
m.metrics.highestTimestamp.Set(float64(startedAt.Add(ts).Unix()))
}

// helper function for sending samples.
@@ -1740,7 +1729,7 @@ func TestCalculateDesiredShardsDetail(t *testing.T) {
forceEMWA(m.dataOut, tc.dataOut*int64(shardUpdateDuration/time.Second))
forceEMWA(m.dataDropped, tc.dataDropped*int64(shardUpdateDuration/time.Second))
forceEMWA(m.dataOutDuration, int64(tc.dataOutDuration*float64(shardUpdateDuration)))
m.highestRecvTimestamp.value = tc.backlog // Not Set() because it can only increase value.
m.metrics.highestTimestamp.value = tc.backlog // Not Set(), because Set() only ever increases the value.

require.Equal(t, tc.expectedShards, m.calculateDesiredShards())
})
1 change: 0 additions & 1 deletion storage/remote/write.go
@@ -213,7 +213,6 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
c,
rws.flushDeadline,
rws.interner,
rws.highestTimestamp,
rws.scraper,
rwConf.SendExemplars,
rwConf.SendNativeHistograms,