@@ -6,18 +6,23 @@ package integration
66import (
77 "compress/gzip"
88 "fmt"
9+ "strings"
10+
911 "net/http"
12+ "net/url"
13+
1014 "testing"
1115 "time"
1216
17+ "github.com/prometheus/prometheus/prompb"
1318 "github.com/stretchr/testify/require"
1419
1520 "github.com/cortexproject/cortex/integration/e2e"
1621 e2edb "github.com/cortexproject/cortex/integration/e2e/db"
1722 "github.com/cortexproject/cortex/integration/e2ecortex"
1823)
1924
20- func TestQueryResponseCompression (t * testing.T ) {
25+ func TestQuerierResponseCompression (t * testing.T ) {
2126 s , err := e2e .NewScenario (networkName )
2227 require .NoError (t , err )
2328 defer s .Close ()
@@ -43,18 +48,127 @@ func TestQueryResponseCompression(t *testing.T) {
4348 c , err := e2ecortex .NewClient (distributor .HTTPEndpoint (), "" , "" , "" , "user-1" )
4449 require .NoError (t , err )
4550
46- series , _ := generateSeries ("series_1" , now )
47- res , err := c .Push (series )
48- require .NoError (t , err )
49- require .Equal (t , 200 , res .StatusCode )
51+ for i := 0 ; i < 200 ; i ++ {
52+ series , _ := generateSeries (
53+ fmt .Sprintf ("series_%d" , i ),
54+ now ,
55+ prompb.Label {Name : fmt .Sprintf ("label_%d" , i ), Value : strings .Repeat ("val_" , 10 )},
56+ )
57+ res , err := c .Push (series )
58+ require .NoError (t , err )
59+ require .Equal (t , 200 , res .StatusCode )
60+ }
5061
5162 querier := e2ecortex .NewQuerier ("querier" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), flags , "" )
5263 require .NoError (t , s .StartAndWaitReady (querier ))
5364
5465 // Wait until the querier has updated the ring.
5566 require .NoError (t , querier .WaitSumMetrics (e2e .Equals (512 ), "cortex_ring_tokens_total" ))
5667
57- endpoint := fmt .Sprintf ("http://%s/api/prom/api/v1/query?query=series_1" , querier .HTTPEndpoint ())
68+ query := `{__name__=~"series_.*"}`
69+ u := & url.URL {
70+ Scheme : "http" ,
71+ Path : fmt .Sprintf ("%s/api/prom/api/v1/query" , querier .HTTPEndpoint ()),
72+ }
73+ q := u .Query ()
74+ q .Set ("query" , query )
75+ q .Set ("time" , e2ecortex .FormatTime (now ))
76+ u .RawQuery = q .Encode ()
77+ endpoint := u .String ()
78+
79+ t .Run ("Compressed" , func (t * testing.T ) {
80+ req , err := http .NewRequest ("GET" , endpoint , nil )
81+ require .NoError (t , err )
82+ req .Header .Set ("X-Scope-OrgID" , "user-1" )
83+ req .Header .Set ("Accept-Encoding" , "gzip" )
84+
85+ resp , err := http .DefaultClient .Do (req )
86+ require .NoError (t , err )
87+
88+ defer resp .Body .Close ()
89+
90+ require .Equal (t , http .StatusOK , resp .StatusCode )
91+ require .Equal (t , "gzip" , resp .Header .Get ("Content-Encoding" ))
92+
93+ gzipReader , err := gzip .NewReader (resp .Body )
94+ require .NoError (t , err )
95+ defer gzipReader .Close ()
96+ })
97+
98+ t .Run ("Uncompressed" , func (t * testing.T ) {
99+ req , err := http .NewRequest ("GET" , endpoint , nil )
100+ require .NoError (t , err )
101+ req .Header .Set ("X-Scope-OrgID" , "user-1" )
102+
103+ resp , err := http .DefaultClient .Do (req )
104+ require .NoError (t , err )
105+ defer resp .Body .Close ()
106+
107+ require .Equal (t , http .StatusOK , resp .StatusCode )
108+ require .Empty (t , resp .Header .Get ("Content-Encoding" ))
109+ })
110+ }
111+
112+ func TestQueryFrontendResponseCompression (t * testing.T ) {
113+ s , err := e2e .NewScenario (networkName )
114+ require .NoError (t , err )
115+ defer s .Close ()
116+
117+ // Start dependencies.
118+ consul := e2edb .NewConsul ()
119+ minio := e2edb .NewMinio (9000 , bucketName )
120+ require .NoError (t , s .StartAndWaitReady (consul , minio ))
121+
122+ flags := mergeFlags (BlocksStorageFlags (), map [string ]string {
123+ "-api.response-compression-enabled" : "true" ,
124+ })
125+
126+ // Start the query-frontend.
127+ queryFrontend := e2ecortex .NewQueryFrontend ("query-frontend" , flags , "" )
128+ require .NoError (t , s .Start (queryFrontend ))
129+
130+ distributor := e2ecortex .NewDistributor ("distributor" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), flags , "" )
131+ ingester := e2ecortex .NewIngester ("ingester" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), flags , "" )
132+ require .NoError (t , s .StartAndWaitReady (distributor , ingester ))
133+
134+ // Wait until both the distributor updated the ring.
135+ require .NoError (t , distributor .WaitSumMetrics (e2e .Equals (512 ), "cortex_ring_tokens_total" ))
136+
137+ querier := e2ecortex .NewQuerier ("querierWithFrontend" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), mergeFlags (flags , map [string ]string {
138+ "-querier.frontend-address" : queryFrontend .NetworkGRPCEndpoint (),
139+ }), "" )
140+
141+ require .NoError (t , s .StartAndWaitReady (querier ))
142+ require .NoError (t , s .WaitReady (queryFrontend ))
143+
144+ now := time .Now ()
145+
146+ c , err := e2ecortex .NewClient (distributor .HTTPEndpoint (), queryFrontend .HTTPEndpoint (), "" , "" , "user-1" )
147+ require .NoError (t , err )
148+
149+ for i := 0 ; i < 200 ; i ++ {
150+ series , _ := generateSeries (
151+ fmt .Sprintf ("series_%d" , i ),
152+ now ,
153+ prompb.Label {Name : fmt .Sprintf ("label_%d" , i ), Value : strings .Repeat ("val_" , 10 )},
154+ )
155+ res , err := c .Push (series )
156+ require .NoError (t , err )
157+ require .Equal (t , 200 , res .StatusCode )
158+ }
159+
160+ require .NoError (t , querier .WaitSumMetrics (e2e .Equals (512 ), "cortex_ring_tokens_total" ))
161+
162+ query := `{__name__=~"series_.*"}`
163+ u := & url.URL {
164+ Scheme : "http" ,
165+ Path : fmt .Sprintf ("%s/api/prom/api/v1/query" , queryFrontend .HTTPEndpoint ()),
166+ }
167+ q := u .Query ()
168+ q .Set ("query" , query )
169+ q .Set ("time" , e2ecortex .FormatTime (now ))
170+ u .RawQuery = q .Encode ()
171+ endpoint := u .String ()
58172
59173 t .Run ("Compressed" , func (t * testing.T ) {
60174 req , err := http .NewRequest ("GET" , endpoint , nil )
0 commit comments