1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -114,6 +114,7 @@
* [ENHANCEMENT] Experimental TSDB: Applied jitter to the periodic bucket scans in order to better distribute bucket operations over time and increase the probability of hitting the shared cache (if configured). #2693
* [ENHANCEMENT] Experimental TSDB: Series limits per user and per metric now work in TSDB blocks. #2676
* [ENHANCEMENT] Experimental Memberlist: Added ability to periodically rejoin the memberlist cluster. #2724
* [ENHANCEMENT] A new endpoint `/query-frontend/ready` was added to the query frontend to indicate when it is ready to accept queries. This is intended for use by a read-path load balancer that wants to wait for the frontend to have attached queriers before including it in the backend. #2733
* [BUGFIX] Ruler: Ensure temporary rule files with special characters are properly mapped and cleaned up. #2506
* [BUGFIX] Fixes #2411: ensure requests are properly routed to the Prometheus API embedded in the query if `-server.path-prefix` is set. #2372
* [BUGFIX] Experimental TSDB: fixed chunk data corruption when querying back series using the experimental blocks storage. #2400
12 changes: 11 additions & 1 deletion docs/apis.md
@@ -376,4 +376,14 @@ Note that setting a new config will effectively "re-enable" the Rules and Alertm

#### Testing APIs

`POST /push` - Push samples directly to ingesters. Accepts requests in Prometheus remote write format. Intended for performance testing and debugging.
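
For illustration, here is a minimal Go sketch of pushing a single sample to this endpoint. The address `localhost:9009`, the `fake` tenant ID, and the `prompb`/`snappy` dependencies are assumptions of the example; the only requirement from the API is a snappy-compressed Prometheus `WriteRequest` protobuf:

```go
package main

import (
	"bytes"
	"log"
	"net/http"
	"time"

	"github.com/gogo/protobuf/proto"
	"github.com/golang/snappy"
	"github.com/prometheus/prometheus/prompb"
)

func main() {
	// One series with a single sample; remote write timestamps are milliseconds since epoch.
	wr := &prompb.WriteRequest{
		Timeseries: []prompb.TimeSeries{{
			Labels:  []prompb.Label{{Name: "__name__", Value: "test_metric"}},
			Samples: []prompb.Sample{{Value: 1, Timestamp: time.Now().UnixNano() / int64(time.Millisecond)}},
		}},
	}

	data, err := proto.Marshal(wr)
	if err != nil {
		log.Fatal(err)
	}

	// Remote write bodies are snappy-compressed protobuf.
	body := snappy.Encode(nil, data)

	// localhost:9009 is a hypothetical Cortex address; adjust to your deployment.
	req, err := http.NewRequest("POST", "http://localhost:9009/push", bytes.NewReader(body))
	if err != nil {
		log.Fatal(err)
	}
	req.Header.Set("Content-Encoding", "snappy")
	req.Header.Set("Content-Type", "application/x-protobuf")
	req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
	req.Header.Set("X-Scope-OrgID", "fake") // tenant ID; required when auth is enabled

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	log.Println(resp.Status)
}
```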

## Query Frontend

### Readiness

This readiness handler indicates when the frontend component is ready to receive queries. The exact condition depends on configuration, but readiness usually takes as long as a querier needs to discover the frontend and attach to it.

```
GET /query-frontend/ready
```
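
The endpoint returns HTTP 200 once at least one querier has attached to the frontend (or when a downstream URL is configured), and HTTP 503 otherwise. The response body carries a short status message, for example:

```
200 OK                    ready: connected clients 1
503 Service Unavailable   not ready: connected clients 0
```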
29 changes: 29 additions & 0 deletions docs/operations/scalable-query-frontend.md
@@ -0,0 +1,29 @@
---
title: "Scaling the Query Frontend"
linkTitle: "Scaling the Query Frontend"
weight: 5
slug: scaling-query-frontend
---

Historically, scaling the Cortex query frontend has [posed some challenges](https://cortexmetrics.io/docs/proposals/scalable-query-frontend/). This document details how to use the newly added configuration parameters to scale the frontend correctly. Note that these instructions apply in both the HA single-binary scenario and microservices mode.

## DNS Configuration / Readiness

When a new frontend is first created on scale-up, it will not immediately have queriers attached to it. The new `/query-frontend/ready` endpoint returns HTTP 200 only when the query frontend is ready to serve queries. Make sure to configure this endpoint as a healthcheck in your load balancer; otherwise, a query frontend scale-up event may briefly result in failed queries while queriers attach.
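
For example, on Kubernetes the healthcheck can be expressed as a readiness probe. This is a minimal sketch; the port and timings are assumptions and must match your frontend's HTTP server configuration (e.g. `-server.http-listen-port`):

```yaml
readinessProbe:
  httpGet:
    path: /query-frontend/ready
    port: 8080          # assumed HTTP listen port
  initialDelaySeconds: 10
  periodSeconds: 5
```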

## Querier Max Concurrency

Make sure to configure the querier's frontend worker to match the querier's maximum concurrency. This allows the operator to freely scale the frontend up and down without impacting the amount of work an individual querier attempts to perform. More details [here](https://cortexmetrics.io/docs/proposals/scalable-query-frontend/#dynamic-querier-concurrency).

### Example Configuration

**CLI**
```
-querier.worker-match-max-concurrent
```

**Config File**
```yaml
frontend_worker:
match_max_concurrent: true
```
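
When this is enabled, the frontend worker spreads the querier's total concurrency (`-querier.max-concurrent`) across all discovered frontends rather than opening a fixed number of connections per frontend, so adding frontends does not multiply the work a querier accepts.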
7 changes: 4 additions & 3 deletions docs/proposals/scalable-query-frontend.md
@@ -213,7 +213,7 @@ Having a singleton queue is attractive because it is simple to reason about and

## Conclusion

In this document we reviewed the [reasons the frontend exists](#query-frontend-role), [challenges and proposals to scaling the frontend](#challenges-and-proposals), and [an alternative architecture that avoids most problems but comes with its own challenges](#alternative).

<table>
<tr>
@@ -246,22 +246,23 @@
<td>Operational/Configuration Issue. No Changes Proposed.
</td>
<td>
N/A
</td>
</tr>
<tr>
<td>Querier Discovery Lag
</td>
<td>Query Frontend HTTP Health Checks
</td>
<td><a href="https://github.com/cortexproject/cortex/pull/2733">Pull Request</a>
</td>
</tr>
<tr>
<td>Dilutes Tenant Fairness
</td>
<td>Round Robin with additional alternatives proposed
</td>
<td><a href="https://github.com/cortexproject/cortex/pull/2553">Pull Request</a>
</td>
</tr>
</table>
11 changes: 11 additions & 0 deletions integration/query_frontend_test.go
@@ -4,6 +4,7 @@ package main

import (
"fmt"
"net/http"
"os/exec"
"sync"
"testing"
@@ -137,6 +138,11 @@ func runQueryFrontendTest(t *testing.T, testMissingMetricName bool, setup queryF
require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(1), "cortex_memcache_client_servers"))
require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Greater(0), "cortex_dns_lookups_total"))

// Confirm that the query-frontend is not ready because the querier is not connected yet.
resp, err := http.Get("http://" + queryFrontend.HTTPEndpoint() + "/query-frontend/ready")
require.NoError(t, err)
require.Equal(t, 503, resp.StatusCode)

// Start the querier after the query-frontend otherwise we're not
// able to get the query-frontend network endpoint.
querier := e2ecortex.NewQuerierWithConfigFile("querier", consul.NetworkHTTPEndpoint(), configFile, mergeFlags(flags, map[string]string{
@@ -207,4 +213,9 @@ func runQueryFrontendTest(t *testing.T, testMissingMetricName bool, setup queryF
assertServiceMetricsPrefixes(t, Ingester, ingester)
assertServiceMetricsPrefixes(t, Querier, querier)
assertServiceMetricsPrefixes(t, QueryFrontend, queryFrontend)

// Confirm that the query-frontend is ready because the querier is connected.
resp, err = http.Get("http://" + queryFrontend.HTTPEndpoint() + "/query-frontend/ready")
require.NoError(t, err)
require.Equal(t, 200, resp.StatusCode)
}
3 changes: 3 additions & 0 deletions pkg/api/api.go
@@ -372,6 +372,9 @@ func (a *API) registerQueryAPI(handler http.Handler) {
func (a *API) RegisterQueryFrontend(f *frontend.Frontend) {
frontend.RegisterFrontendServer(a.server.GRPC, f)
a.registerQueryAPI(f.Handler())

// Readiness
a.RegisterRoute("/query-frontend/ready", http.HandlerFunc(f.ReadinessHandler), false, "GET")
}

// RegisterServiceMapHandler registers the Cortex structs service handler
37 changes: 37 additions & 0 deletions pkg/querier/frontend/frontend.go
@@ -12,6 +12,7 @@ import (
"path"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/NYTimes/gziphandler"
@@ -65,6 +66,8 @@ type Frontend struct {
cond *sync.Cond
queues *queueIterator

connectedClients int32

// Metrics.
queueDuration prometheus.Histogram
queueLength prometheus.Gauge
@@ -284,6 +287,9 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *ProcessRequest) (*Pro

// Process allows backends to pull requests from the frontend.
func (f *Frontend) Process(server Frontend_ProcessServer) error {
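// Track connected querier workers so the readiness check can report whether any are attached.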
atomic.AddInt32(&f.connectedClients, 1)
defer atomic.AddInt32(&f.connectedClients, -1)

// If the downstream request(from querier -> frontend) is cancelled,
// we need to ping the condition variable to unblock getNextRequest.
// Ideally we'd have ctx aware condition variables...
@@ -426,3 +432,34 @@ FindQueue:
// and wait for more requests.
goto FindQueue
}

// ReadinessHandler is an http.HandlerFunc that indicates whether this frontend is ready to
// receive requests. It returns HTTP 200 if at least one querier is attached (or if a
// downstream URL is configured) and HTTP 503 otherwise.
func (f *Frontend) ReadinessHandler(w http.ResponseWriter, r *http.Request) {
status := http.StatusServiceUnavailable
ready, msg := f.readyForRequests()

if ready {
status = http.StatusOK
}

http.Error(w, msg, status)
}

func (f *Frontend) readyForRequests() (bool, string) {
// If the downstream URL is configured, the query frontend is not aware of the state
// of the queriers and is therefore always considered ready.
if f.cfg.DownstreamURL != "" {
return true, "ready: downstream url set"
}

// If at least one querier is connected we consider ourselves ready.
connectedClients := atomic.LoadInt32(&f.connectedClients)
if connectedClients > 0 {
return true, fmt.Sprintf("ready: connected clients %d", connectedClients)
}

msg := fmt.Sprintf("not ready: connected clients %d", connectedClients)
level.Warn(f.log).Log("msg", msg)
return false, msg
}
93 changes: 85 additions & 8 deletions pkg/querier/frontend/frontend_test.go
@@ -13,6 +13,7 @@ import (
"time"

"github.com/go-kit/kit/log"
"github.com/gorilla/mux"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing-contrib/go-stdlib/nethttp"
opentracing "github.com/opentracing/opentracing-go"
@@ -60,6 +61,42 @@ func TestFrontend(t *testing.T) {
testFrontend(t, handler, test, true)
}

func TestFrontendReady(t *testing.T) {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := w.Write([]byte("Hello World"))
require.NoError(t, err)
})
test := func(addr string) {
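// Wait briefly for the worker to connect to the frontend before checking readiness.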
time.Sleep(100 * time.Millisecond)

req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/query-frontend/ready", addr), nil)
require.NoError(t, err)

resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, 200, resp.StatusCode)
}
testFrontend(t, handler, test, false)
testFrontend(t, handler, test, true)
}

func TestFrontendNotReady(t *testing.T) {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := w.Write([]byte("Hello World"))
require.NoError(t, err)
})
test := func(addr string) {
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/query-frontend/ready", addr), nil)
require.NoError(t, err)

resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, 503, resp.StatusCode)
}
testFrontendRunWorker(t, handler, test, false, false)
testFrontendRunWorker(t, handler, test, true, false)
}

func TestFrontendPropagateTrace(t *testing.T) {
closer, err := config.Configuration{}.InitGlobalTracer("test")
require.NoError(t, err)
@@ -161,7 +198,38 @@ func TestFrontendCancelStatusCode(t *testing.T) {
}
}

func TestFrontendReadyForRequests(t *testing.T) {
for _, tt := range []struct {
name string
downstreamURL string
connectedClients int32
msg string
readyForRequests bool
}{
{"downstream url is always ready", "super url", 0, "ready: downstream url set", true},
{"connected clients are ready", "", 3, "ready: connected clients 3", true},
{"no url, no clients is not ready", "", 0, "not ready: connected clients 0", false},
} {
t.Run(tt.name, func(t *testing.T) {
f := &Frontend{
connectedClients: tt.connectedClients,
log: log.NewNopLogger(),
cfg: Config{
DownstreamURL: tt.downstreamURL,
},
}
ready, msg := f.readyForRequests()
require.Equal(t, tt.readyForRequests, ready)
require.Equal(t, tt.msg, msg)
})
}
}

func testFrontend(t *testing.T, handler http.Handler, test func(addr string), matchMaxConcurrency bool) {
testFrontendRunWorker(t, handler, test, matchMaxConcurrency, true)
}

func testFrontendRunWorker(t *testing.T, handler http.Handler, test func(addr string), matchMaxConcurrency bool, runWorker bool) {
logger := log.NewNopLogger()

var (
@@ -193,22 +261,31 @@ func testFrontend(t *testing.T, handler http.Handler, test func(addr string), ma

RegisterFrontendServer(grpcServer, frontend)

r := mux.NewRouter()
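// Register the readiness route outside the auth middleware, mirroring how the API registers it.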
r.HandleFunc("/query-frontend/ready", frontend.ReadinessHandler)
r.PathPrefix("/").Handler(middleware.Merge(
middleware.AuthenticateUser,
middleware.Tracer{},
).Wrap(frontend.Handler()))

httpServer := http.Server{
Handler: middleware.Merge(
middleware.AuthenticateUser,
middleware.Tracer{},
).Wrap(frontend.Handler()),
Handler: r,
}
defer httpServer.Shutdown(context.Background()) //nolint:errcheck

go httpServer.Serve(httpListen) //nolint:errcheck
go grpcServer.Serve(grpcListen) //nolint:errcheck

worker, err := NewWorker(workerConfig, querierConfig, httpgrpc_server.NewServer(handler), logger)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), worker))
var worker services.Service
if runWorker {
worker, err = NewWorker(workerConfig, querierConfig, httpgrpc_server.NewServer(handler), logger)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), worker))
}

test(httpListen.Addr().String())

require.NoError(t, services.StopAndAwaitTerminated(context.Background(), worker))
if runWorker {
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), worker))
}
}