|  | 
|  | 1 | +//go:build requires_docker | 
|  | 2 | +// +build requires_docker | 
|  | 3 | + | 
|  | 4 | +package integration | 
|  | 5 | + | 
|  | 6 | +import ( | 
|  | 7 | +	"fmt" | 
|  | 8 | +	"strings" | 
|  | 9 | +	"testing" | 
|  | 10 | + | 
|  | 11 | +	"github.com/prometheus/prometheus/model/labels" | 
|  | 12 | +	"github.com/prometheus/prometheus/prompb" | 
|  | 13 | +	"github.com/stretchr/testify/require" | 
|  | 14 | + | 
|  | 15 | +	"github.com/cortexproject/cortex/integration/e2e" | 
|  | 16 | +	e2edb "github.com/cortexproject/cortex/integration/e2e/db" | 
|  | 17 | +	"github.com/cortexproject/cortex/integration/e2ecortex" | 
|  | 18 | +) | 
|  | 19 | + | 
|  | 20 | +func TestIngesterMetadata(t *testing.T) { | 
|  | 21 | +	s, err := e2e.NewScenario(networkName) | 
|  | 22 | +	require.NoError(t, err) | 
|  | 23 | +	defer s.Close() | 
|  | 24 | + | 
|  | 25 | +	// Start dependencies. | 
|  | 26 | +	consul := e2edb.NewConsul() | 
|  | 27 | +	require.NoError(t, s.StartAndWaitReady(consul)) | 
|  | 28 | + | 
|  | 29 | +	baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags()) | 
|  | 30 | + | 
|  | 31 | +	minio := e2edb.NewMinio(9000, baseFlags["-blocks-storage.s3.bucket-name"]) | 
|  | 32 | +	require.NoError(t, s.StartAndWaitReady(minio)) | 
|  | 33 | + | 
|  | 34 | +	flags := mergeFlags(baseFlags, map[string]string{ | 
|  | 35 | +		// alert manager | 
|  | 36 | +		"-alertmanager.web.external-url": "http://localhost/alertmanager", | 
|  | 37 | +		// consul | 
|  | 38 | +		"-ring.store":      "consul", | 
|  | 39 | +		"-consul.hostname": consul.NetworkHTTPEndpoint(), | 
|  | 40 | +	}) | 
|  | 41 | + | 
|  | 42 | +	// Start Cortex components | 
|  | 43 | +	distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") | 
|  | 44 | +	ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") | 
|  | 45 | +	querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") | 
|  | 46 | +	require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier)) | 
|  | 47 | + | 
|  | 48 | +	// Wait until distributor has updated the ring. | 
|  | 49 | +	require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( | 
|  | 50 | +		labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), | 
|  | 51 | +		labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) | 
|  | 52 | + | 
|  | 53 | +	// Wait until querier has updated the ring. | 
|  | 54 | +	require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( | 
|  | 55 | +		labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), | 
|  | 56 | +		labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) | 
|  | 57 | + | 
|  | 58 | +	client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID) | 
|  | 59 | +	require.NoError(t, err) | 
|  | 60 | + | 
|  | 61 | +	metadataMetricNum := 5 | 
|  | 62 | +	metadataPerMetrics := 2 | 
|  | 63 | +	metadata := make([]prompb.MetricMetadata, 0, metadataMetricNum) | 
|  | 64 | +	for i := 0; i < metadataMetricNum; i++ { | 
|  | 65 | +		for j := 0; j < metadataPerMetrics; j++ { | 
|  | 66 | +			metadata = append(metadata, prompb.MetricMetadata{ | 
|  | 67 | +				MetricFamilyName: fmt.Sprintf("metadata_name_%d", i), | 
|  | 68 | +				Help:             fmt.Sprintf("metadata_help_%d_%d", i, j), | 
|  | 69 | +				Unit:             fmt.Sprintf("metadata_unit_%d_%d", i, j), | 
|  | 70 | +			}) | 
|  | 71 | +		} | 
|  | 72 | +	} | 
|  | 73 | +	res, err := client.Push(nil, metadata...) | 
|  | 74 | +	require.NoError(t, err) | 
|  | 75 | +	require.Equal(t, 200, res.StatusCode) | 
|  | 76 | + | 
|  | 77 | +	testMetadataQueryParams(t, client, metadataMetricNum, metadataPerMetrics) | 
|  | 78 | +} | 
|  | 79 | + | 
|  | 80 | +func TestIngesterMetadataWithTenantFederation(t *testing.T) { | 
|  | 81 | +	s, err := e2e.NewScenario(networkName) | 
|  | 82 | +	require.NoError(t, err) | 
|  | 83 | +	defer s.Close() | 
|  | 84 | + | 
|  | 85 | +	// Start dependencies. | 
|  | 86 | +	consul := e2edb.NewConsul() | 
|  | 87 | +	require.NoError(t, s.StartAndWaitReady(consul)) | 
|  | 88 | + | 
|  | 89 | +	baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags()) | 
|  | 90 | + | 
|  | 91 | +	minio := e2edb.NewMinio(9000, baseFlags["-blocks-storage.s3.bucket-name"]) | 
|  | 92 | +	require.NoError(t, s.StartAndWaitReady(minio)) | 
|  | 93 | + | 
|  | 94 | +	flags := mergeFlags(baseFlags, map[string]string{ | 
|  | 95 | +		// tenant federation | 
|  | 96 | +		"-tenant-federation.enabled": "true", | 
|  | 97 | +		// alert manager | 
|  | 98 | +		"-alertmanager.web.external-url": "http://localhost/alertmanager", | 
|  | 99 | +		// consul | 
|  | 100 | +		"-ring.store":      "consul", | 
|  | 101 | +		"-consul.hostname": consul.NetworkHTTPEndpoint(), | 
|  | 102 | +	}) | 
|  | 103 | + | 
|  | 104 | +	// Start Cortex components | 
|  | 105 | +	distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") | 
|  | 106 | +	ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") | 
|  | 107 | +	querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") | 
|  | 108 | +	require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier)) | 
|  | 109 | + | 
|  | 110 | +	// Wait until distributor has updated the ring. | 
|  | 111 | +	require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( | 
|  | 112 | +		labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), | 
|  | 113 | +		labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) | 
|  | 114 | + | 
|  | 115 | +	// Wait until querier has updated the ring. | 
|  | 116 | +	require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( | 
|  | 117 | +		labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), | 
|  | 118 | +		labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) | 
|  | 119 | + | 
|  | 120 | +	metadataMetricNum := 5 | 
|  | 121 | +	metadataPerMetrics := 2 | 
|  | 122 | +	metadata := make([]prompb.MetricMetadata, 0, metadataMetricNum) | 
|  | 123 | +	for i := 0; i < metadataMetricNum; i++ { | 
|  | 124 | +		for j := 0; j < metadataPerMetrics; j++ { | 
|  | 125 | +			metadata = append(metadata, prompb.MetricMetadata{ | 
|  | 126 | +				MetricFamilyName: fmt.Sprintf("metadata_name_%d", i), | 
|  | 127 | +				Help:             fmt.Sprintf("metadata_help_%d_%d", i, j), | 
|  | 128 | +				Unit:             fmt.Sprintf("metadata_unit_%d_%d", i, j), | 
|  | 129 | +			}) | 
|  | 130 | +		} | 
|  | 131 | +	} | 
|  | 132 | + | 
|  | 133 | +	numUsers := 2 | 
|  | 134 | +	tenantIDs := make([]string, numUsers) | 
|  | 135 | +	for u := 0; u < numUsers; u++ { | 
|  | 136 | +		tenantIDs[u] = fmt.Sprintf("user-%d", u) | 
|  | 137 | +		c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", tenantIDs[u]) | 
|  | 138 | +		require.NoError(t, err) | 
|  | 139 | + | 
|  | 140 | +		res, err := c.Push(nil, metadata...) | 
|  | 141 | +		require.NoError(t, err) | 
|  | 142 | +		require.Equal(t, 200, res.StatusCode) | 
|  | 143 | +	} | 
|  | 144 | + | 
|  | 145 | +	client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", strings.Join(tenantIDs, "|")) | 
|  | 146 | +	require.NoError(t, err) | 
|  | 147 | + | 
|  | 148 | +	testMetadataQueryParams(t, client, metadataMetricNum, metadataPerMetrics) | 
|  | 149 | +} | 
|  | 150 | + | 
|  | 151 | +func testMetadataQueryParams(t *testing.T, client *e2ecortex.Client, metadataMetricNum, metadataPerMetrics int) { | 
|  | 152 | +	t.Run("test no parameter", func(t *testing.T) { | 
|  | 153 | +		result, err := client.Metadata("", "") | 
|  | 154 | +		require.NoError(t, err) | 
|  | 155 | +		require.Equal(t, metadataMetricNum, len(result)) | 
|  | 156 | + | 
|  | 157 | +		for _, v := range result { | 
|  | 158 | +			require.Equal(t, metadataPerMetrics, len(v)) | 
|  | 159 | +		} | 
|  | 160 | +	}) | 
|  | 161 | + | 
|  | 162 | +	t.Run("test name parameter", func(t *testing.T) { | 
|  | 163 | +		t.Run("existing name", func(t *testing.T) { | 
|  | 164 | +			name := "metadata_name_0" | 
|  | 165 | +			result, err := client.Metadata(name, "") | 
|  | 166 | +			require.NoError(t, err) | 
|  | 167 | +			m, ok := result[name] | 
|  | 168 | +			require.True(t, ok) | 
|  | 169 | +			require.Equal(t, metadataPerMetrics, len(m)) | 
|  | 170 | +		}) | 
|  | 171 | +		t.Run("existing name with limit 0", func(t *testing.T) { | 
|  | 172 | +			name := "metadata_name_0" | 
|  | 173 | +			result, err := client.Metadata(name, "0") | 
|  | 174 | +			require.NoError(t, err) | 
|  | 175 | +			require.Equal(t, 0, len(result)) | 
|  | 176 | +		}) | 
|  | 177 | +		t.Run("non-existing name", func(t *testing.T) { | 
|  | 178 | +			result, err := client.Metadata("dummy", "") | 
|  | 179 | +			require.NoError(t, err) | 
|  | 180 | +			require.Equal(t, 0, len(result)) | 
|  | 181 | +		}) | 
|  | 182 | +	}) | 
|  | 183 | + | 
|  | 184 | +	t.Run("test limit parameter", func(t *testing.T) { | 
|  | 185 | +		t.Run("less than length of metadata", func(t *testing.T) { | 
|  | 186 | +			result, err := client.Metadata("", "3") | 
|  | 187 | +			require.NoError(t, err) | 
|  | 188 | +			require.Equal(t, 3, len(result)) | 
|  | 189 | +		}) | 
|  | 190 | +		t.Run("limit: 0", func(t *testing.T) { | 
|  | 191 | +			result, err := client.Metadata("", "0") | 
|  | 192 | +			require.NoError(t, err) | 
|  | 193 | +			require.Equal(t, 0, len(result)) | 
|  | 194 | +		}) | 
|  | 195 | +		t.Run("invalid limit", func(t *testing.T) { | 
|  | 196 | +			_, err := client.Metadata("", "dummy") | 
|  | 197 | +			require.Error(t, err) | 
|  | 198 | +		}) | 
|  | 199 | +	}) | 
|  | 200 | +} | 
0 commit comments