1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -23,6 +23,7 @@
* `cortex_prometheus_notifications_queue_length`
* `cortex_prometheus_notifications_queue_capacity`
* `cortex_prometheus_notifications_alertmanagers_discovered`
* [ENHANCEMENT] The behavior of the query frontend's `/ready` endpoint was changed to indicate when it is ready to accept queries. This is intended for use by a read-path load balancer that wants to wait for the frontend to have attached queriers before including it in the backend. #2733
* [ENHANCEMENT] Experimental Delete Series: Add support for deletion of chunks for remaining stores. #2801
* [ENHANCEMENT] Add `-modules` command line flag to list possible values for `-target`. Also, log warning if given target is internal component. #2752
* [ENHANCEMENT] Added `-ingester.flush-on-shutdown-with-wal-enabled` option to enable chunks flushing even when WAL is enabled. #2780
29 changes: 29 additions & 0 deletions docs/operations/scalable-query-frontend.md
@@ -0,0 +1,29 @@
---
title: "Scaling the Query Frontend"
linkTitle: "Scaling the Query Frontend"
weight: 5
slug: scaling-query-frontend
---

Historically, scaling the Cortex query frontend has [posed some challenges](https://cortexmetrics.io/docs/proposals/scalable-query-frontend/). This document details how to use the added configuration parameters to correctly scale the frontend. Note that these instructions apply in both the HA single-binary scenario and microservices mode.

## DNS Configuration / Readiness

When a new frontend is first created on scale-up, it will not immediately have queriers attached to it. The existing `/ready` endpoint was updated to return HTTP 200 only when the query frontend is ready to serve queries. Make sure to configure this endpoint as a healthcheck in your load balancer; otherwise, a scale-up event might result in failed queries or higher latency while queriers attach.
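
For example, on Kubernetes this maps naturally to a readiness probe. The following is a minimal sketch, assuming the frontend serves its HTTP API on port 80; adjust the port and timings to match your deployment:

```yaml
readinessProbe:
  httpGet:
    path: /ready
    port: 80
  initialDelaySeconds: 10
  periodSeconds: 10
```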

## Querier Max Concurrency

Make sure to configure the querier's frontend worker to match the frontend's maximum concurrency. This allows an operator to freely scale the frontend up and down without impacting the amount of work an individual querier attempts to perform. More details [here](https://cortexmetrics.io/docs/proposals/scalable-query-frontend/#dynamic-querier-concurrency).

### Example Configuration

**CLI**
```
-querier.worker-match-max-concurrent=true
```

**Config File**
```yaml
frontend_worker:
match_max_concurrent: true
```
7 changes: 4 additions & 3 deletions docs/proposals/scalable-query-frontend.md
@@ -213,7 +213,7 @@ Having a singleton queue is attractive because it is simple to reason about and

## Conclusion

In this document we reviewed the [reasons the frontend exists](#query frontend-role), [challenges and proposals to scaling the frontend](#challenges-and-proposals) and [an alternative architecture that avoids most problems but comes with its own challenges.](#alternative)
In this document we reviewed the [reasons the frontend exists](#query-frontend-role), [challenges and proposals to scaling the frontend](#challenges-and-proposals) and [an alternative architecture that avoids most problems but comes with its own challenges.](#alternative)

<table>
<tr>
@@ -246,22 +246,23 @@ In this document we reviewed the [reasons the frontend exists](#query frontend-r
<td>Operational/Configuration Issue. No Changes Proposed.
</td>
<td>
N/A
</td>
</tr>
<tr>
<td>Querier Discovery Lag
</td>
<td>Query Frontend HTTP Health Checks
</td>
<td>Proposed
<td><a href="https://github.com/cortexproject/cortex/pull/2733">Pull Request</a>
</td>
</tr>
<tr>
<td>Dilutes Tenant Fairness
</td>
<td>Round Robin with additional alternatives proposed
</td>
<td><a href="https://github.com/cortexproject/cortex/issues/2431">Issue</a>
<td><a href="https://github.com/cortexproject/cortex/pull/2553">Pull Request</a>
</td>
</tr>
</table>
15 changes: 8 additions & 7 deletions integration/query_frontend_test.go
@@ -131,18 +131,19 @@ func runQueryFrontendTest(t *testing.T, testMissingMetricName bool, setup queryF
	queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", configFile, flags, "")
	ingester := e2ecortex.NewIngesterWithConfigFile("ingester", consul.NetworkHTTPEndpoint(), configFile, flags, "")
	distributor := e2ecortex.NewDistributorWithConfigFile("distributor", consul.NetworkHTTPEndpoint(), configFile, flags, "")
	require.NoError(t, s.StartAndWaitReady(queryFrontend, distributor, ingester))

	// Check if we're discovering memcache or not.
	require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(1), "cortex_memcache_client_servers"))
	require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Greater(0), "cortex_dns_lookups_total"))
	require.NoError(t, s.Start(queryFrontend))

	// Start the querier after the query-frontend otherwise we're not
	// able to get the query-frontend network endpoint.
	querier := e2ecortex.NewQuerierWithConfigFile("querier", consul.NetworkHTTPEndpoint(), configFile, mergeFlags(flags, map[string]string{
		"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
	}), "")
	require.NoError(t, s.StartAndWaitReady(querier))

	require.NoError(t, s.StartAndWaitReady(querier, ingester, distributor))
	require.NoError(t, s.WaitReady(queryFrontend))

	// Check if we're discovering memcache or not.
	require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(1), "cortex_memcache_client_servers"))
	require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Greater(0), "cortex_dns_lookups_total"))

	// Wait until both the distributor and querier have updated the ring.
	require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
9 changes: 9 additions & 0 deletions pkg/cortex/cortex.go
@@ -401,6 +401,15 @@ func (t *Cortex) readyHandler(sm *services.Manager) http.HandlerFunc {
			}
		}

		// Query Frontend has a special check that makes sure that a querier is attached before it signals
		// itself as ready
		if t.Frontend != nil {
			if err := t.Frontend.CheckReady(r.Context()); err != nil {
				http.Error(w, "Query Frontend not ready: "+err.Error(), http.StatusServiceUnavailable)
				return
			}
		}

		http.Error(w, "ready", http.StatusOK)
	}
}
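
To make the new behavior concrete: a load balancer healthcheck polling `/ready` now receives HTTP 503 from the handler above until at least one querier attaches, and HTTP 200 afterwards. Below is a minimal sketch of such a probe (not part of this PR; the frontend address is a placeholder):

```go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Poll the frontend's /ready endpoint, as a load balancer healthcheck would.
	resp, err := http.Get("http://query-frontend:80/ready")
	if err != nil {
		fmt.Println("frontend unreachable:", err)
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusOK {
		fmt.Println("ready: safe to route queries to this frontend")
	} else {
		// 503 Service Unavailable until at least one querier connects.
		fmt.Println("not ready:", resp.Status)
	}
}
```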
28 changes: 28 additions & 0 deletions pkg/querier/frontend/frontend.go
@@ -3,6 +3,7 @@ package frontend
import (
	"bytes"
	"context"
	"errors"
	"flag"
	"fmt"
	"io"
@@ -23,6 +24,7 @@ import (
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/user"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/util"
)
@@ -65,6 +67,8 @@ type Frontend struct {
	cond   *sync.Cond
	queues *queueIterator

	connectedClients *atomic.Int32

	// Metrics.
	queueDuration prometheus.Histogram
	queueLength   prometheus.Gauge
@@ -97,6 +101,7 @@ func New(cfg Config, log log.Logger, registerer prometheus.Registerer) (*Fronten
Name: "query_frontend_queue_length",
Help: "Number of queries in the queue.",
}),
connectedClients: atomic.NewInt32(0),
}
f.cond = sync.NewCond(&f.mtx)

@@ -284,6 +289,9 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *ProcessRequest) (*Pro

// Process allows backends to pull requests from the frontend.
func (f *Frontend) Process(server Frontend_ProcessServer) error {
	f.connectedClients.Inc()
	defer f.connectedClients.Dec()

	// If the downstream request(from querier -> frontend) is cancelled,
	// we need to ping the condition variable to unblock getNextRequest.
	// Ideally we'd have ctx aware condition variables...
@@ -426,3 +434,23 @@ FindQueue:
	// and wait for more requests.
	goto FindQueue
}

// CheckReady determines if the query frontend is ready. Function parameters/return
// chosen to match the same method in the ingester.
func (f *Frontend) CheckReady(_ context.Context) error {
	// if the downstream url is configured the query frontend is not aware of the state
	// of the queriers and is therefore always ready
	if f.cfg.DownstreamURL != "" {
		return nil
	}

	// if we have at least one querier connected we will consider ourselves ready
	connectedClients := f.connectedClients.Load()
	if connectedClients > 0 {
		return nil
	}

	msg := fmt.Sprintf("not ready: number of queriers connected to query-frontend is %d", connectedClients)
	level.Info(f.log).Log("msg", msg)
	return errors.New(msg)
}
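
The readiness mechanism above boils down to a small counting pattern: each long-lived `Process` stream bumps an atomic counter on entry and decrements it on exit, and `CheckReady` just loads the current value. A condensed, self-contained sketch of that pattern (simplified types, not the real `Frontend`):

```go
package main

import (
	"errors"
	"fmt"

	"go.uber.org/atomic"
)

type frontend struct {
	connectedClients *atomic.Int32
}

// process stands in for the long-lived gRPC Process stream handler.
func (f *frontend) process(run func()) {
	f.connectedClients.Inc()
	defer f.connectedClients.Dec() // runs on every return path
	run()
}

func (f *frontend) checkReady() error {
	if f.connectedClients.Load() > 0 {
		return nil
	}
	return errors.New("not ready: no queriers connected")
}

func main() {
	f := &frontend{connectedClients: atomic.NewInt32(0)}
	fmt.Println(f.checkReady())                       // not ready: no queriers connected
	f.process(func() { fmt.Println(f.checkReady()) }) // <nil> while a client is connected
}
```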
48 changes: 43 additions & 5 deletions pkg/querier/frontend/frontend_test.go
@@ -13,6 +13,7 @@ import (
"time"

"github.com/go-kit/kit/log"
"github.com/gorilla/mux"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing-contrib/go-stdlib/nethttp"
opentracing "github.com/opentracing/opentracing-go"
@@ -24,6 +25,7 @@ import (
	httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
	"github.com/weaveworks/common/middleware"
	"github.com/weaveworks/common/user"
	uber_atomic "go.uber.org/atomic"
	"google.golang.org/grpc"

	"github.com/cortexproject/cortex/pkg/querier"
@@ -161,6 +163,38 @@ func TestFrontendCancelStatusCode(t *testing.T) {
	}
}

func TestFrontendCheckReady(t *testing.T) {
	for _, tt := range []struct {
		name             string
		downstreamURL    string
		connectedClients int32
		msg              string
		readyForRequests bool
	}{
		{"downstream url is always ready", "super url", 0, "", true},
		{"connected clients are ready", "", 3, "", true},
		{"no url, no clients is not ready", "", 0, "not ready: number of queriers connected to query-frontend is 0", false},
	} {
		t.Run(tt.name, func(t *testing.T) {
			f := &Frontend{
				connectedClients: uber_atomic.NewInt32(tt.connectedClients),
				log:              log.NewNopLogger(),
				cfg: Config{
					DownstreamURL: tt.downstreamURL,
				},
			}
			err := f.CheckReady(context.Background())
			errMsg := ""

			if err != nil {
				errMsg = err.Error()
			}

			require.Equal(t, tt.msg, errMsg)
		})
	}
}

func testFrontend(t *testing.T, handler http.Handler, test func(addr string), matchMaxConcurrency bool) {
	logger := log.NewNopLogger()

@@ -193,18 +227,22 @@

	RegisterFrontendServer(grpcServer, frontend)

	r := mux.NewRouter()
	r.PathPrefix("/").Handler(middleware.Merge(
		middleware.AuthenticateUser,
		middleware.Tracer{},
	).Wrap(frontend.Handler()))

	httpServer := http.Server{
		Handler: middleware.Merge(
			middleware.AuthenticateUser,
			middleware.Tracer{},
		).Wrap(frontend.Handler()),
		Handler: r,
	}
	defer httpServer.Shutdown(context.Background()) //nolint:errcheck

	go httpServer.Serve(httpListen) //nolint:errcheck
	go grpcServer.Serve(grpcListen) //nolint:errcheck

	worker, err := NewWorker(workerConfig, querierConfig, httpgrpc_server.NewServer(handler), logger)
	var worker services.Service
	worker, err = NewWorker(workerConfig, querierConfig, httpgrpc_server.NewServer(handler), logger)
	require.NoError(t, err)
	require.NoError(t, services.StartAndAwaitRunning(context.Background(), worker))
