diff --git a/CHANGELOG.md b/CHANGELOG.md index 13b91dd1dd7..12eedba80b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ * `cortex_prometheus_notifications_queue_length` * `cortex_prometheus_notifications_queue_capacity` * `cortex_prometheus_notifications_alertmanagers_discovered` +* [ENHANCEMENT] The behavior of the `/ready` endpoint was changed for the query frontend to indicate when it is ready to accept queries. This is intended for use by a read-path load balancer that wants to wait for the frontend to have attached queriers before including it in the backend. #2733 * [ENHANCEMENT] Experimental Delete Series: Add support for deletion of chunks for remaining stores. #2801 * [ENHANCEMENT] Add `-modules` command line flag to list possible values for `-target`. Also, log warning if given target is internal component. #2752 * [ENHANCEMENT] Added `-ingester.flush-on-shutdown-with-wal-enabled` option to enable chunks flushing even when WAL is enabled. #2780 diff --git a/docs/operations/scalable-query-frontend.md b/docs/operations/scalable-query-frontend.md new file mode 100644 index 00000000000..15a801b782b --- /dev/null +++ b/docs/operations/scalable-query-frontend.md @@ -0,0 +1,29 @@ +--- +title: "Scaling the Query Frontend" +linkTitle: "Scaling the Query Frontend" +weight: 5 +slug: scaling-query-frontend +--- + +Historically, scaling the Cortex query frontend has [posed some challenges](https://cortexmetrics.io/docs/proposals/scalable-query-frontend/). This document details how to use the added configuration parameters to scale the frontend correctly. Note that these instructions apply to both the HA single-binary and microservices deployment modes. + +## DNS Configuration / Readiness + +When a new query frontend is first created during scale-up, it will not immediately have queriers attached to it. The existing `/ready` endpoint was updated to return HTTP 200 only when the query frontend is ready to serve queries. Make sure to configure this endpoint as a healthcheck in your load balancer; otherwise, a query frontend scale-up event might result in failed queries or higher latency while queriers attach (see the healthcheck sketch below). + +## Querier Max Concurrency + +Make sure to configure the querier frontend worker to match the querier's maximum concurrency. This allows the operator to freely scale the frontend up and down without impacting the amount of work an individual querier attempts to perform. More details [here](https://cortexmetrics.io/docs/proposals/scalable-query-frontend/#dynamic-querier-concurrency).
+ +### Example Configuration + +**CLI** +``` +-querier.worker-match-max-concurrent=true +``` + +**Config File** +```yaml +frontend_worker: + match_max_concurrent: true +``` \ No newline at end of file diff --git a/docs/proposals/scalable-query-frontend.md b/docs/proposals/scalable-query-frontend.md index b33ad61e956..7688d2c5c6f 100644 --- a/docs/proposals/scalable-query-frontend.md +++ b/docs/proposals/scalable-query-frontend.md @@ -213,7 +213,7 @@ Having a singleton queue is attractive because it is simple to reason about and ## Conclusion -In this document we reviewed the [reasons the frontend exists](#query frontend-role), [challenges and proposals to scaling the frontend](#challenges-and-proposals) and [an alternative architecture that avoids most problems but comes with its own challenges.](#alternative) +In this document we reviewed the [reasons the frontend exists](#query-frontend-role), [challenges and proposals to scaling the frontend](#challenges-and-proposals) and [an alternative architecture that avoids most problems but comes with its own challenges.](#alternative) @@ -246,6 +246,7 @@ In this document we reviewed the [reasons the frontend exists](#query frontend-r @@ -253,7 +254,7 @@ In this document we reviewed the [reasons the frontend exists](#query frontend-r - @@ -261,7 +262,7 @@ In this document we reviewed the [reasons the frontend exists](#query frontend-r -
Operational/Configuration Issue. No Changes Proposed. + N/A
Query Frontend HTTP Health Checks Proposed + Pull Request
Round Robin with additional alternatives proposed Issue + Pull Request
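To make the readiness behaviour described in the new `docs/operations/scalable-query-frontend.md` page concrete, here is a minimal, hypothetical sketch of the healthcheck loop a load balancer or deployment script can run against a frontend before routing queries to it. The `waitUntilReady` helper, base URL, timeout, and polling interval are illustrative assumptions, not values taken from Cortex.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// waitUntilReady polls the query frontend's /ready endpoint until it returns
// HTTP 200 or the timeout expires. This mirrors what a load balancer
// healthcheck does before adding the instance to the read path.
func waitUntilReady(baseURL string, timeout, interval time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		resp, err := http.Get(baseURL + "/ready")
		if err == nil {
			resp.Body.Close()
			if resp.StatusCode == http.StatusOK {
				return nil
			}
		}
		time.Sleep(interval)
	}
	return fmt.Errorf("query frontend at %s not ready within %s", baseURL, timeout)
}

func main() {
	// Hypothetical address; use your query frontend's HTTP listen address.
	if err := waitUntilReady("http://query-frontend:8080", 2*time.Minute, time.Second); err != nil {
		fmt.Println(err)
	}
}
```

Once `/ready` returns HTTP 200, the frontend either has at least one querier attached or a downstream URL configured, and can safely be added to the read-path backend.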
diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index 9118f85817b..ba79ed9e1de 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -131,18 +131,19 @@ func runQueryFrontendTest(t *testing.T, testMissingMetricName bool, setup queryF queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", configFile, flags, "") ingester := e2ecortex.NewIngesterWithConfigFile("ingester", consul.NetworkHTTPEndpoint(), configFile, flags, "") distributor := e2ecortex.NewDistributorWithConfigFile("distributor", consul.NetworkHTTPEndpoint(), configFile, flags, "") - require.NoError(t, s.StartAndWaitReady(queryFrontend, distributor, ingester)) - // Check if we're discovering memcache or not. - require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(1), "cortex_memcache_client_servers")) - require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Greater(0), "cortex_dns_lookups_total")) + require.NoError(t, s.Start(queryFrontend)) - // Start the querier after the query-frontend otherwise we're not - // able to get the query-frontend network endpoint. querier := e2ecortex.NewQuerierWithConfigFile("querier", consul.NetworkHTTPEndpoint(), configFile, mergeFlags(flags, map[string]string{ "-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(), }), "") - require.NoError(t, s.StartAndWaitReady(querier)) + + require.NoError(t, s.StartAndWaitReady(querier, ingester, distributor)) + require.NoError(t, s.WaitReady(queryFrontend)) + + // Check if we're discovering memcache or not. + require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(1), "cortex_memcache_client_servers")) + require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Greater(0), "cortex_dns_lookups_total")) // Wait until both the distributor and querier have updated the ring. require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 98c71fe12f8..23b9689496f 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -401,6 +401,15 @@ func (t *Cortex) readyHandler(sm *services.Manager) http.HandlerFunc { } } + // Query Frontend has a special check that makes sure that a querier is attached before it signals + // itself as ready + if t.Frontend != nil { + if err := t.Frontend.CheckReady(r.Context()); err != nil { + http.Error(w, "Query Frontend not ready: "+err.Error(), http.StatusServiceUnavailable) + return + } + } + http.Error(w, "ready", http.StatusOK) } } diff --git a/pkg/querier/frontend/frontend.go b/pkg/querier/frontend/frontend.go index 5d1e5fc81bb..3f42b030d18 100644 --- a/pkg/querier/frontend/frontend.go +++ b/pkg/querier/frontend/frontend.go @@ -3,6 +3,7 @@ package frontend import ( "bytes" "context" + "errors" "flag" "fmt" "io" @@ -23,6 +24,7 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/httpgrpc/server" "github.com/weaveworks/common/user" + "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/util" ) @@ -65,6 +67,8 @@ type Frontend struct { cond *sync.Cond queues *queueIterator + connectedClients *atomic.Int32 + // Metrics. 
queueDuration prometheus.Histogram queueLength prometheus.Gauge @@ -97,6 +101,7 @@ func New(cfg Config, log log.Logger, registerer prometheus.Registerer) (*Fronten Name: "query_frontend_queue_length", Help: "Number of queries in the queue.", }), + connectedClients: atomic.NewInt32(0), } f.cond = sync.NewCond(&f.mtx) @@ -284,6 +289,9 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *ProcessRequest) (*Pro // Process allows backends to pull requests from the frontend. func (f *Frontend) Process(server Frontend_ProcessServer) error { + f.connectedClients.Inc() + defer f.connectedClients.Dec() + // If the downstream request(from querier -> frontend) is cancelled, // we need to ping the condition variable to unblock getNextRequest. // Ideally we'd have ctx aware condition variables... @@ -426,3 +434,23 @@ FindQueue: // and wait for more requests. goto FindQueue } + +// CheckReady determines if the query frontend is ready. Function parameters/return values are +// chosen to match the same method in the ingester. +func (f *Frontend) CheckReady(_ context.Context) error { + // if the downstream url is configured the query frontend is not aware of the state + // of the queriers and is therefore always ready + if f.cfg.DownstreamURL != "" { + return nil + } + + // if we have at least one querier connected we will consider ourselves ready + connectedClients := f.connectedClients.Load() + if connectedClients > 0 { + return nil + } + + msg := fmt.Sprintf("not ready: number of queriers connected to query-frontend is %d", connectedClients) + level.Info(f.log).Log("msg", msg) + return errors.New(msg) +} diff --git a/pkg/querier/frontend/frontend_test.go b/pkg/querier/frontend/frontend_test.go index 19dd3c2655a..af52c30ac33 100644 --- a/pkg/querier/frontend/frontend_test.go +++ b/pkg/querier/frontend/frontend_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/go-kit/kit/log" + "github.com/gorilla/mux" otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing-contrib/go-stdlib/nethttp" opentracing "github.com/opentracing/opentracing-go" @@ -24,6 +25,7 @@ import ( httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" + uber_atomic "go.uber.org/atomic" "google.golang.org/grpc" "github.com/cortexproject/cortex/pkg/querier" @@ -161,6 +163,38 @@ func TestFrontendCancelStatusCode(t *testing.T) { } } +func TestFrontendCheckReady(t *testing.T) { + for _, tt := range []struct { + name string + downstreamURL string + connectedClients int32 + msg string + readyForRequests bool + }{ + {"downstream url is always ready", "super url", 0, "", true}, + {"connected clients are ready", "", 3, "", true}, + {"no url, no clients is not ready", "", 0, "not ready: number of queriers connected to query-frontend is 0", false}, + } { + t.Run(tt.name, func(t *testing.T) { + f := &Frontend{ + connectedClients: uber_atomic.NewInt32(tt.connectedClients), + log: log.NewNopLogger(), + cfg: Config{ + DownstreamURL: tt.downstreamURL, + }, + } + err := f.CheckReady(context.Background()) + errMsg := "" + + if err != nil { + errMsg = err.Error() + } + + require.Equal(t, tt.msg, errMsg) + }) + } +} + func testFrontend(t *testing.T, handler http.Handler, test func(addr string), matchMaxConcurrency bool) { logger := log.NewNopLogger() @@ -193,18 +227,22 @@ func testFrontend(t *testing.T, handler http.Handler, test func(addr string), ma RegisterFrontendServer(grpcServer, frontend) + r := mux.NewRouter() + 
r.PathPrefix("/").Handler(middleware.Merge( + middleware.AuthenticateUser, + middleware.Tracer{}, + ).Wrap(frontend.Handler())) + httpServer := http.Server{ - Handler: middleware.Merge( - middleware.AuthenticateUser, - middleware.Tracer{}, - ).Wrap(frontend.Handler()), + Handler: r, } defer httpServer.Shutdown(context.Background()) //nolint:errcheck go httpServer.Serve(httpListen) //nolint:errcheck go grpcServer.Serve(grpcListen) //nolint:errcheck - worker, err := NewWorker(workerConfig, querierConfig, httpgrpc_server.NewServer(handler), logger) + var worker services.Service + worker, err = NewWorker(workerConfig, querierConfig, httpgrpc_server.NewServer(handler), logger) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), worker))