@@ -6,18 +6,23 @@ package integration
66import  (
77	"compress/gzip" 
88	"fmt" 
9+ 	"strings" 
10+ 
911	"net/http" 
12+ 	"net/url" 
13+ 
1014	"testing" 
1115	"time" 
1216
17+ 	"github.com/prometheus/prometheus/prompb" 
1318	"github.com/stretchr/testify/require" 
1419
1520	"github.com/cortexproject/cortex/integration/e2e" 
1621	e2edb "github.com/cortexproject/cortex/integration/e2e/db" 
1722	"github.com/cortexproject/cortex/integration/e2ecortex" 
1823)
1924
20- func  TestQueryResponseCompression (t  * testing.T ) {
25+ func  TestQuerierResponseCompression (t  * testing.T ) {
2126	s , err  :=  e2e .NewScenario (networkName )
2227	require .NoError (t , err )
2328	defer  s .Close ()
@@ -43,18 +48,127 @@ func TestQueryResponseCompression(t *testing.T) {
4348	c , err  :=  e2ecortex .NewClient (distributor .HTTPEndpoint (), "" , "" , "" , "user-1" )
4449	require .NoError (t , err )
4550
46- 	series , _  :=  generateSeries ("series_1" , now )
47- 	res , err  :=  c .Push (series )
48- 	require .NoError (t , err )
49- 	require .Equal (t , 200 , res .StatusCode )
51+ 	for  i  :=  0 ; i  <  200 ; i ++  {
52+ 		series , _  :=  generateSeries (
53+ 			fmt .Sprintf ("series_%d" , i ),
54+ 			now ,
55+ 			prompb.Label {Name : fmt .Sprintf ("label_%d" , i ), Value : strings .Repeat ("val_" , 10 )},
56+ 		)
57+ 		res , err  :=  c .Push (series )
58+ 		require .NoError (t , err )
59+ 		require .Equal (t , 200 , res .StatusCode )
60+ 	}
5061
5162	querier  :=  e2ecortex .NewQuerier ("querier" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), flags , "" )
5263	require .NoError (t , s .StartAndWaitReady (querier ))
5364
5465	// Wait until the querier has updated the ring. 
5566	require .NoError (t , querier .WaitSumMetrics (e2e .Equals (512 ), "cortex_ring_tokens_total" ))
5667
57- 	endpoint  :=  fmt .Sprintf ("http://%s/api/prom/api/v1/query?query=series_1" , querier .HTTPEndpoint ())
68+ 	query  :=  `{__name__=~"series_.*"}` 
69+ 	u  :=  & url.URL {
70+ 		Scheme : "http" ,
71+ 		Path :   fmt .Sprintf ("%s/api/prom/api/v1/query" , querier .HTTPEndpoint ()),
72+ 	}
73+ 	q  :=  u .Query ()
74+ 	q .Set ("query" , query )
75+ 	q .Set ("time" , e2ecortex .FormatTime (now ))
76+ 	u .RawQuery  =  q .Encode ()
77+ 	endpoint  :=  u .String ()
78+ 
79+ 	t .Run ("Compressed" , func (t  * testing.T ) {
80+ 		req , err  :=  http .NewRequest ("GET" , endpoint , nil )
81+ 		require .NoError (t , err )
82+ 		req .Header .Set ("X-Scope-OrgID" , "user-1" )
83+ 		req .Header .Set ("Accept-Encoding" , "gzip" )
84+ 
85+ 		resp , err  :=  http .DefaultClient .Do (req )
86+ 		require .NoError (t , err )
87+ 
88+ 		defer  resp .Body .Close ()
89+ 
90+ 		require .Equal (t , http .StatusOK , resp .StatusCode )
91+ 		require .Equal (t , "gzip" , resp .Header .Get ("Content-Encoding" ))
92+ 
93+ 		gzipReader , err  :=  gzip .NewReader (resp .Body )
94+ 		require .NoError (t , err )
95+ 		defer  gzipReader .Close ()
96+ 	})
97+ 
98+ 	t .Run ("Uncompressed" , func (t  * testing.T ) {
99+ 		req , err  :=  http .NewRequest ("GET" , endpoint , nil )
100+ 		require .NoError (t , err )
101+ 		req .Header .Set ("X-Scope-OrgID" , "user-1" )
102+ 
103+ 		resp , err  :=  http .DefaultClient .Do (req )
104+ 		require .NoError (t , err )
105+ 		defer  resp .Body .Close ()
106+ 
107+ 		require .Equal (t , http .StatusOK , resp .StatusCode )
108+ 		require .Empty (t , resp .Header .Get ("Content-Encoding" ))
109+ 	})
110+ }
111+ 
112+ func  TestQueryFrontendResponseCompression (t  * testing.T ) {
113+ 	s , err  :=  e2e .NewScenario (networkName )
114+ 	require .NoError (t , err )
115+ 	defer  s .Close ()
116+ 
117+ 	// Start dependencies. 
118+ 	consul  :=  e2edb .NewConsul ()
119+ 	minio  :=  e2edb .NewMinio (9000 , bucketName )
120+ 	require .NoError (t , s .StartAndWaitReady (consul , minio ))
121+ 
122+ 	flags  :=  mergeFlags (BlocksStorageFlags (), map [string ]string {
123+ 		"-api.response-compression-enabled" : "true" ,
124+ 	})
125+ 
126+ 	// Start the query-frontend. 
127+ 	queryFrontend  :=  e2ecortex .NewQueryFrontend ("query-frontend" , flags , "" )
128+ 	require .NoError (t , s .Start (queryFrontend ))
129+ 
130+ 	distributor  :=  e2ecortex .NewDistributor ("distributor" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), flags , "" )
131+ 	ingester  :=  e2ecortex .NewIngester ("ingester" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), flags , "" )
132+ 	require .NoError (t , s .StartAndWaitReady (distributor , ingester ))
133+ 
134+ 	// Wait until both the distributor updated the ring. 
135+ 	require .NoError (t , distributor .WaitSumMetrics (e2e .Equals (512 ), "cortex_ring_tokens_total" ))
136+ 
137+ 	querier  :=  e2ecortex .NewQuerier ("querierWithFrontend" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), mergeFlags (flags , map [string ]string {
138+ 		"-querier.frontend-address" : queryFrontend .NetworkGRPCEndpoint (),
139+ 	}), "" )
140+ 
141+ 	require .NoError (t , s .StartAndWaitReady (querier ))
142+ 	require .NoError (t , s .WaitReady (queryFrontend ))
143+ 
144+ 	now  :=  time .Now ()
145+ 
146+ 	c , err  :=  e2ecortex .NewClient (distributor .HTTPEndpoint (), queryFrontend .HTTPEndpoint (), "" , "" , "user-1" )
147+ 	require .NoError (t , err )
148+ 
149+ 	for  i  :=  0 ; i  <  200 ; i ++  {
150+ 		series , _  :=  generateSeries (
151+ 			fmt .Sprintf ("series_%d" , i ),
152+ 			now ,
153+ 			prompb.Label {Name : fmt .Sprintf ("label_%d" , i ), Value : strings .Repeat ("val_" , 10 )},
154+ 		)
155+ 		res , err  :=  c .Push (series )
156+ 		require .NoError (t , err )
157+ 		require .Equal (t , 200 , res .StatusCode )
158+ 	}
159+ 
160+ 	require .NoError (t , querier .WaitSumMetrics (e2e .Equals (512 ), "cortex_ring_tokens_total" ))
161+ 
162+ 	query  :=  `{__name__=~"series_.*"}` 
163+ 	u  :=  & url.URL {
164+ 		Scheme : "http" ,
165+ 		Path :   fmt .Sprintf ("%s/api/prom/api/v1/query" , queryFrontend .HTTPEndpoint ()),
166+ 	}
167+ 	q  :=  u .Query ()
168+ 	q .Set ("query" , query )
169+ 	q .Set ("time" , e2ecortex .FormatTime (now ))
170+ 	u .RawQuery  =  q .Encode ()
171+ 	endpoint  :=  u .String ()
58172
59173	t .Run ("Compressed" , func (t  * testing.T ) {
60174		req , err  :=  http .NewRequest ("GET" , endpoint , nil )
0 commit comments