diff --git a/integration/alertmanager_test.go b/integration/alertmanager_test.go index 3069dc10b2a..2288ab879c1 100644 --- a/integration/alertmanager_test.go +++ b/integration/alertmanager_test.go @@ -5,8 +5,8 @@ package main import ( "context" "testing" - "time" + "github.com/prometheus/prometheus/pkg/labels" "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/integration/e2e" @@ -80,8 +80,9 @@ func TestAlertmanagerStoreAPI(t *testing.T) { err = c.SetAlertmanagerConfig(context.Background(), cortexAlertmanagerUserConfigYaml, map[string]string{}) require.NoError(t, err) - time.Sleep(2 * time.Second) - require.NoError(t, am.WaitSumMetrics(e2e.Equals(0), "cortex_alertmanager_config_invalid")) + require.NoError(t, am.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_alertmanager_config_invalid"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "user", "user-1")), + e2e.WaitMissingMetrics)) cfg, err := c.GetAlertmanagerConfig(context.Background()) require.NoError(t, err) @@ -97,7 +98,10 @@ func TestAlertmanagerStoreAPI(t *testing.T) { err = c.DeleteAlertmanagerConfig(context.Background()) require.NoError(t, err) - time.Sleep(2 * time.Second) + // The deleted config is applied asynchronously, so we should wait until the metric + // disappears for the specific user. 
+ require.NoError(t, am.WaitRemovedMetric("cortex_alertmanager_config_invalid", e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "user", "user-1")))) cfg, err = c.GetAlertmanagerConfig(context.Background()) require.Error(t, err) diff --git a/integration/api_ruler_test.go b/integration/api_ruler_test.go index 5c65c86be0e..f4799362689 100644 --- a/integration/api_ruler_test.go +++ b/integration/api_ruler_test.go @@ -6,6 +6,7 @@ import ( "path/filepath" "testing" + "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/rulefmt" "github.com/stretchr/testify/require" "gopkg.in/yaml.v3" @@ -137,6 +138,9 @@ func TestRulerAPISingleBinary(t *testing.T) { require.Equal(t, retrievedNamespace[0].Name, "rule") // Check to make sure prometheus engine metrics are available for both engine types - require.NoError(t, cortex.WaitForMetricWithLabels(e2e.EqualsSingle(0), "prometheus_engine_queries", map[string]string{"engine": "querier"})) - require.NoError(t, cortex.WaitForMetricWithLabels(e2e.EqualsSingle(0), "prometheus_engine_queries", map[string]string{"engine": "ruler"})) + require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"prometheus_engine_queries"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "engine", "querier")))) + + require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"prometheus_engine_queries"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "engine", "ruler")))) } diff --git a/integration/e2e/composite_service.go b/integration/e2e/composite_service.go index d1a5e9b082e..4b89d45af64 100644 --- a/integration/e2e/composite_service.go +++ b/integration/e2e/composite_service.go @@ -5,6 +5,8 @@ import ( "fmt" "time" + "github.com/pkg/errors" + "github.com/cortexproject/cortex/pkg/util" ) @@ -39,52 +41,41 @@ func (s *CompositeHTTPService) Instances() []*HTTPService { // WaitSumMetrics waits for at least one instance of each given metric names 
to be present and their sums, returning true // when passed to given isExpected(...). func (s *CompositeHTTPService) WaitSumMetrics(isExpected func(sums ...float64) bool, metricNames ...string) error { + return s.WaitSumMetricsWithOptions(isExpected, metricNames) +} + +func (s *CompositeHTTPService) WaitSumMetricsWithOptions(isExpected func(sums ...float64) bool, metricNames []string, opts ...MetricsOption) error { var ( - sums []float64 - err error + sums []float64 + err error + options = buildMetricsOptions(opts) ) for s.retryBackoff.Reset(); s.retryBackoff.Ongoing(); { - sums, err = s.SumMetrics(metricNames...) - if err != nil { - return err - } - - if isExpected(sums...) { - return nil + sums, err = s.SumMetrics(metricNames, opts...) + if options.WaitMissingMetrics && errors.Is(err, errMissingMetric) { + continue } - - s.retryBackoff.Wait() - } - - return fmt.Errorf("unable to find metrics %s with expected values. Last values: %v", metricNames, sums) -} - -func (s *CompositeHTTPService) WaitSumMetricWithLabels(isExpected func(sums float64) bool, metricName string, expectedLabels map[string]string) error { - lastSum := 0.0 - - for s.retryBackoff.Reset(); s.retryBackoff.Ongoing(); { - lastSum, err := s.SumMetricWithLabels(metricName, expectedLabels) if err != nil { return err } - if isExpected(lastSum) { + if isExpected(sums...) { return nil } s.retryBackoff.Wait() } - return fmt.Errorf("unable to find metric %s with labels %v with expected value. Last value: %v", metricName, expectedLabels, lastSum) + return fmt.Errorf("unable to find metrics %s with expected values. Last error: %v. Last values: %v", metricNames, err, sums) } // SumMetrics returns the sum of the values of each given metric names. 
-func (s *CompositeHTTPService) SumMetrics(metricNames ...string) ([]float64, error) { +func (s *CompositeHTTPService) SumMetrics(metricNames []string, opts ...MetricsOption) ([]float64, error) { sums := make([]float64, len(metricNames)) for _, service := range s.services { - partials, err := service.SumMetrics(metricNames...) + partials, err := service.SumMetrics(metricNames, opts...) if err != nil { return nil, err } @@ -100,19 +91,3 @@ func (s *CompositeHTTPService) SumMetrics(metricNames ...string) ([]float64, err return sums, nil } - -// SumMetricWithLabels returns the sum of the values of metric with matching labels across all services. -func (s *CompositeHTTPService) SumMetricWithLabels(metricName string, expectedLabels map[string]string) (float64, error) { - sum := 0.0 - - for _, service := range s.services { - s, err := service.SumMetricWithLabels(metricName, expectedLabels) - if err != nil { - return 0, err - } - - sum += s - } - - return sum, nil -} diff --git a/integration/e2e/metrics.go b/integration/e2e/metrics.go index 4e1b6ff5c4b..445cdcd24d8 100644 --- a/integration/e2e/metrics.go +++ b/integration/e2e/metrics.go @@ -6,7 +6,7 @@ import ( io_prometheus_client "github.com/prometheus/client_model/go" ) -func getValue(m *io_prometheus_client.Metric) float64 { +func getMetricValue(m *io_prometheus_client.Metric) float64 { if m.GetGauge() != nil { return m.GetGauge().GetValue() } else if m.GetCounter() != nil { @@ -20,10 +20,63 @@ func getValue(m *io_prometheus_client.Metric) float64 { } } -func sumValues(family *io_prometheus_client.MetricFamily) float64 { +func getMetricCount(m *io_prometheus_client.Metric) float64 { + if m.GetHistogram() != nil { + return float64(m.GetHistogram().GetSampleCount()) + } else if m.GetSummary() != nil { + return float64(m.GetSummary().GetSampleCount()) + } else { + return 0 + } +} + +func getValues(metrics []*io_prometheus_client.Metric, opts MetricsOptions) []float64 { + values := make([]float64, 0, len(metrics)) + for 
_, m := range metrics { + values = append(values, opts.GetValue(m)) + } + return values +} + +func filterMetrics(metrics []*io_prometheus_client.Metric, opts MetricsOptions) []*io_prometheus_client.Metric { + // If no label matcher is configured, then no filtering should be done. + if len(opts.LabelMatchers) == 0 { + return metrics + } + if len(metrics) == 0 { + return metrics + } + + filtered := make([]*io_prometheus_client.Metric, 0, len(metrics)) + + for _, m := range metrics { + metricLabels := map[string]string{} + for _, lp := range m.GetLabel() { + metricLabels[lp.GetName()] = lp.GetValue() + } + + matches := true + for _, matcher := range opts.LabelMatchers { + if !matcher.Matches(metricLabels[matcher.Name]) { + matches = false + break + } + } + + if !matches { + continue + } + + filtered = append(filtered, m) + } + + return filtered +} + +func sumValues(values []float64) float64 { sum := 0.0 - for _, m := range family.Metric { - sum += getValue(m) + for _, v := range values { + sum += v } return sum } diff --git a/integration/e2e/metrics_options.go b/integration/e2e/metrics_options.go new file mode 100644 index 00000000000..3ec98601538 --- /dev/null +++ b/integration/e2e/metrics_options.go @@ -0,0 +1,52 @@ +package e2e + +import ( + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/prometheus/prometheus/pkg/labels" +) + +var ( + DefaultMetricsOptions = MetricsOptions{ + GetValue: getMetricValue, + WaitMissingMetrics: false, + } +) + +// GetMetricValueFunc defines the signature of a function used to get the metric value. +type GetMetricValueFunc func(m *io_prometheus_client.Metric) float64 + +// MetricsOption defines the signature of a function used to manipulate options. +type MetricsOption func(*MetricsOptions) + +// MetricsOptions is the structure holding all options. 
+type MetricsOptions struct { + GetValue GetMetricValueFunc + LabelMatchers []*labels.Matcher + WaitMissingMetrics bool +} + +// WithMetricCount is an option to get the histogram/summary count as metric value. +func WithMetricCount(opts *MetricsOptions) { + opts.GetValue = getMetricCount +} + +// WithLabelMatchers is an option to filter only matching series. +func WithLabelMatchers(matchers ...*labels.Matcher) MetricsOption { + return func(opts *MetricsOptions) { + opts.LabelMatchers = matchers + } +} + +// WaitMissingMetrics is an option to wait whenever an expected metric is missing. If this +// option is not enabled, an error is returned on missing metrics. +func WaitMissingMetrics(opts *MetricsOptions) { + opts.WaitMissingMetrics = true +} + +func buildMetricsOptions(opts []MetricsOption) MetricsOptions { + result := DefaultMetricsOptions + for _, opt := range opts { + opt(&result) + } + return result +} diff --git a/integration/e2e/service.go b/integration/e2e/service.go index c5184de4c84..7ef3059642a 100644 --- a/integration/e2e/service.go +++ b/integration/e2e/service.go @@ -14,7 +14,6 @@ import ( "github.com/go-kit/kit/log" "github.com/pkg/errors" - dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" "github.com/thanos-io/thanos/pkg/runutil" @@ -23,6 +22,7 @@ import ( var ( dockerPortPattern = regexp.MustCompile(`^.*:(\d+)$`) + errMissingMetric = errors.New("metric not found") ) // ConcreteService represents microservice with optional ports which will be discoverable from docker @@ -522,13 +522,21 @@ func (s *HTTPService) NetworkHTTPEndpointFor(networkName string) string { // WaitSumMetrics waits for at least one instance of each given metric names to be present and their sums, returning true // when passed to given isExpected(...). 
func (s *HTTPService) WaitSumMetrics(isExpected func(sums ...float64) bool, metricNames ...string) error { + return s.WaitSumMetricsWithOptions(isExpected, metricNames) +} + +func (s *HTTPService) WaitSumMetricsWithOptions(isExpected func(sums ...float64) bool, metricNames []string, opts ...MetricsOption) error { var ( - sums []float64 - err error + sums []float64 + err error + options = buildMetricsOptions(opts) ) for s.retryBackoff.Reset(); s.retryBackoff.Ongoing(); { - sums, err = s.SumMetrics(metricNames...) + sums, err = s.SumMetrics(metricNames, opts...) + if options.WaitMissingMetrics && errors.Is(err, errMissingMetric) { + continue + } if err != nil { return err } @@ -540,11 +548,12 @@ func (s *HTTPService) WaitSumMetrics(isExpected func(sums ...float64) bool, metr s.retryBackoff.Wait() } - return fmt.Errorf("unable to find metrics %s with expected values. Last values: %v", metricNames, sums) + return fmt.Errorf("unable to find metrics %s with expected values. Last error: %v. Last values: %v", metricNames, err, sums) } // SumMetrics returns the sum of the values of each given metric names. -func (s *HTTPService) SumMetrics(metricNames ...string) ([]float64, error) { +func (s *HTTPService) SumMetrics(metricNames []string, opts ...MetricsOption) ([]float64, error) { + options := buildMetricsOptions(opts) sums := make([]float64, len(metricNames)) metrics, err := s.Metrics() @@ -561,89 +570,55 @@ func (s *HTTPService) SumMetrics(metricNames ...string) ([]float64, error) { for i, m := range metricNames { sums[i] = 0.0 - // Check if the metric is exported. - if mf, ok := families[m]; ok { - sums[i] = sumValues(mf) - continue + // Get the metric family. + mf, ok := families[m] + if !ok { + return nil, errors.Wrapf(errMissingMetric, "metric=%s service=%s", m, s.name) } - return nil, errors.Errorf("metric %s not found in %s metric page", m, s.name) + + // Filter metrics. 
+ metrics := filterMetrics(mf.GetMetric(), options) + if len(metrics) == 0 { + return nil, errors.Wrapf(errMissingMetric, "metric=%s service=%s", m, s.name) + } + + sums[i] = sumValues(getValues(metrics, options)) } return sums, nil } -// WaitForMetricWithLabels waits until given metric with matching labels passes `okFn`. If function returns false, -// wait continues. If no such matching metric can be found or wait times out, function returns error. -func (s *HTTPService) WaitForMetricWithLabels(okFn func(v float64) bool, metricName string, expectedLabels map[string]string) error { +// WaitRemovedMetric waits until a metric disappears from the list of metrics exported by the service. +func (s *HTTPService) WaitRemovedMetric(metricName string, opts ...MetricsOption) error { + options := buildMetricsOptions(opts) + for s.retryBackoff.Reset(); s.retryBackoff.Ongoing(); { - ms, err := s.getMetricsMatchingLabels(metricName, expectedLabels) + // Fetch metrics. + metrics, err := s.Metrics() if err != nil { return err } - for _, m := range ms { - if okFn(getValue(m)) { - return nil - } + // Parse metrics. + var tp expfmt.TextParser + families, err := tp.TextToMetricFamilies(strings.NewReader(metrics)) + if err != nil { + return err } - s.retryBackoff.Wait() - } - - return fmt.Errorf("unable to find metric %s with labels %v with expected value", metricName, expectedLabels) -} - -// Returns sum of all metrics matching given labels. 
-func (s *HTTPService) SumMetricWithLabels(metricName string, expectedLabels map[string]string) (float64, error) { - sum := 0.0 - ms, err := s.getMetricsMatchingLabels(metricName, expectedLabels) - if err != nil { - return 0, err - } - - for _, m := range ms { - sum += getValue(m) - } - return sum, nil -} - -func (s *HTTPService) getMetricsMatchingLabels(metricName string, expectedLabels map[string]string) ([]*dto.Metric, error) { - metrics, err := s.Metrics() - if err != nil { - return nil, err - } - - var tp expfmt.TextParser - families, err := tp.TextToMetricFamilies(strings.NewReader(metrics)) - if err != nil { - return nil, err - } - - mf, ok := families[metricName] - if !ok { - return nil, errors.Errorf("metric %s not found in %s metric page", metricName, s.name) - } - - result := []*dto.Metric(nil) - - for _, m := range mf.GetMetric() { - // check if some metric has all required labels - metricLabels := map[string]string{} - for _, lp := range m.GetLabel() { - metricLabels[lp.GetName()] = lp.GetValue() + // Get the metric family. + mf, ok := families[metricName] + if !ok { + return nil } - matches := true - for k, v := range expectedLabels { - if mv, ok := metricLabels[k]; !ok || mv != v { - matches = false - break - } + // Filter metrics. 
+ if len(filterMetrics(mf.GetMetric(), options)) == 0 { + return nil } - if matches { - result = append(result, m) - } + s.retryBackoff.Wait() } - return result, nil + + return fmt.Errorf("the metric %s is still exported by %s", metricName, s.name) } diff --git a/integration/getting_started_with_gossiped_ring_test.go b/integration/getting_started_with_gossiped_ring_test.go index 6c32a96bc61..8f18becd29c 100644 --- a/integration/getting_started_with_gossiped_ring_test.go +++ b/integration/getting_started_with_gossiped_ring_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -52,8 +53,13 @@ func TestGettingStartedWithGossipedRing(t *testing.T) { require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(2*512), "cortex_ring_tokens_total")) // We need two "ring members" visible from both Cortex instances - require.NoError(t, cortex1.WaitForMetricWithLabels(e2e.EqualsSingle(2), "cortex_ring_members", map[string]string{"name": "ingester", "state": "ACTIVE"})) - require.NoError(t, cortex2.WaitForMetricWithLabels(e2e.EqualsSingle(2), "cortex_ring_members", map[string]string{"name": "ingester", "state": "ACTIVE"})) + require.NoError(t, cortex1.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) + + require.NoError(t, cortex2.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) c1, err := e2ecortex.NewClient(cortex1.HTTPEndpoint(), cortex1.HTTPEndpoint(), "", "", "user-1") require.NoError(t, err) diff --git a/integration/ingester_hand_over_test.go b/integration/ingester_hand_over_test.go 
index a7a8209ed7d..91dd61c9cfc 100644 --- a/integration/ingester_hand_over_test.go +++ b/integration/ingester_hand_over_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -88,17 +89,30 @@ func runIngesterHandOverTest(t *testing.T, flags map[string]string, setup func(t // Wait a bit to make sure that querier is caught up. Otherwise, we may be querying for data, // while querier still knows about old ingester only. - require.NoError(t, querier.WaitForMetricWithLabels(e2e.EqualsSingle(1), "cortex_ring_members", map[string]string{"name": "ingester", "state": "ACTIVE"})) - require.NoError(t, querier.WaitForMetricWithLabels(e2e.EqualsSingle(1), "cortex_ring_members", map[string]string{"name": "ingester", "state": "PENDING"})) + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) + + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "PENDING")))) // Stop ingester-1. This function will return once the ingester-1 is successfully // stopped, which means the transfer to ingester-2 is completed. require.NoError(t, s.Stop(ingester1)) // Make sure querier now sees only new ingester. We check that by verifying that there is only one ACTIVE, but no PENDING or JOINING ingester. 
- require.NoError(t, querier.WaitForMetricWithLabels(e2e.EqualsSingle(1), "cortex_ring_members", map[string]string{"name": "ingester", "state": "ACTIVE"})) - require.NoError(t, querier.WaitForMetricWithLabels(e2e.EqualsSingle(0), "cortex_ring_members", map[string]string{"name": "ingester", "state": "JOINING"})) - require.NoError(t, querier.WaitForMetricWithLabels(e2e.EqualsSingle(0), "cortex_ring_members", map[string]string{"name": "ingester", "state": "PENDING"})) + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) + + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "JOINING")))) + + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "PENDING")))) // Query the series again. 
result, err = c.Query("series_1", now) diff --git a/integration/querier_test.go b/integration/querier_test.go index 752a253df3b..03830f33ae1 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -338,7 +339,8 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) { require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(2*cluster.NumInstances())), "cortex_ingester_memory_series_removed_total")) // Wait until the querier has discovered the uploaded blocks (discovered both by the querier and store-gateway). - require.NoError(t, cluster.WaitSumMetricWithLabels(e2e.EqualsSingle(float64(2*cluster.NumInstances()*2)), "cortex_blocks_meta_synced", map[string]string{"component": "querier"})) + require.NoError(t, cluster.WaitSumMetricsWithOptions(e2e.Equals(float64(2*cluster.NumInstances()*2)), []string{"cortex_blocks_meta_synced"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "component", "querier")))) // Wait until the store-gateway has synched the new uploaded blocks. const shippedBlocks = 2 diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index 8561075291f..34dc8903524 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -203,6 +203,11 @@ func runQueryFrontendTest(t *testing.T, testMissingMetricName bool, setup queryF } require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numUsers*numQueriesPerUser+extra), "cortex_query_frontend_queries_total")) + // The number of received requests is greater than the number of query requests because it includes + // requests to /metrics and /ready. 
+ require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount)) + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount)) + // Ensure no service-specific metrics prefix is used by the wrong service. assertServiceMetricsPrefixes(t, Distributor, distributor) assertServiceMetricsPrefixes(t, Ingester, ingester)