diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7c40da47b33..64a3fa98ae7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -24,6 +24,7 @@
 * [ENHANCEMENT] Experimental TSDB: Added `cortex_querier_blocks_meta_synced`, which reflects current state of synced blocks over all tenants. #2392
 * [ENHANCEMENT] Added `cortex_distributor_latest_seen_sample_timestamp_seconds` metric to see how far behind Prometheus servers are in sending data. #2371
 * [ENHANCEMENT] FIFO cache to support eviction based on memory usage. The `-.fifocache.size` CLI flag has been renamed to `-.fifocache.max-size-items` as well as its YAML config option `size` renamed to `max_size_items`. Added `-.fifocache.max-size-bytes` CLI flag and YAML config option `max_size_bytes` to specify memory limit of the cache. #2319
+* [ENHANCEMENT] Added `-querier.worker-match-max-concurrent`, which forces worker concurrency to match the `-querier.max-concurrent` option and overrides `-querier.worker-parallelism`. #2456
 * [ENHANCEMENT] Added the following metrics for monitoring delete requests: #2445
   - `cortex_purger_delete_requests_received_total`: Number of delete requests received per user.
   - `cortex_purger_delete_requests_processed_total`: Number of delete requests processed per user.
diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md
index 3c000361292..2b1b25b9cd9 100644
--- a/docs/configuration/arguments.md
+++ b/docs/configuration/arguments.md
@@ -16,7 +16,7 @@ Duration arguments should be specified with a unit like `5s` or `3h`. Valid time
 - `-querier.max-concurrent`
 
   The maximum number of top-level PromQL queries that will execute at the same time, per querier process.
-  If using the query frontend, this should be set to at least (`querier.worker-parallelism` * number of query frontend replicas). Otherwise queries may queue in the queriers and not the frontend, which will affect QoS.
+  If using the query frontend, this should be set to at least (`-querier.worker-parallelism` * number of query frontend replicas). Otherwise queries may queue in the queriers and not the frontend, which will affect QoS. Alternatively, consider using `-querier.worker-match-max-concurrent` to force worker parallelism to match `-querier.max-concurrent`.
 
 - `-querier.query-parallelism`
 
@@ -42,9 +42,15 @@ The next three options only apply when the querier is used together with the Que
 
 - `-querier.worker-parallelism`
 
-  Number of simultaneous queries to process, per worker process.
+  Number of simultaneous queries to process, per query frontend.
   See note on `-querier.max-concurrent`
 
+- `-querier.worker-match-max-concurrent`
+
+  Forces worker concurrency to match the `-querier.max-concurrent` option. Overrides `-querier.worker-parallelism`.
+  See note on `-querier.max-concurrent`
+
+
 ## Querier and Ruler
 
 The ingester query API was improved over time, but defaults to the old behaviour for backwards-compatibility. For best results both of these next two flags should be set to `true`:
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index b5dcfd6060d..0c4f745231b 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -1801,10 +1801,15 @@ The `frontend_worker_config` configures the worker - running within the Cortex q
 # CLI flag: -querier.frontend-address
 [frontend_address: | default = ""]
 
-# Number of simultaneous queries to process.
+# Number of simultaneous queries to process per query frontend.
# CLI flag: -querier.worker-parallelism [parallelism: | default = 10] +# Force worker concurrency to match the -querier.max-concurrent option. +# Overrides querier.worker-parallelism. +# CLI flag: -querier.worker-match-max-concurrent +[match_max_concurrent: | default = false] + # How often to query DNS. # CLI flag: -querier.dns-lookup-period [dns_lookup_duration: | default = 10s] diff --git a/docs/configuration/single-process-config-blocks-gossip-1.yaml b/docs/configuration/single-process-config-blocks-gossip-1.yaml index 1d75f91b4b0..6b4ae86b224 100644 --- a/docs/configuration/single-process-config-blocks-gossip-1.yaml +++ b/docs/configuration/single-process-config-blocks-gossip-1.yaml @@ -80,3 +80,6 @@ tsdb: backend: filesystem # s3, gcs, azure or filesystem are valid options filesystem: dir: /tmp/cortex/storage + +frontend_worker: + match_max_concurrent: true \ No newline at end of file diff --git a/docs/configuration/single-process-config-blocks-gossip-2.yaml b/docs/configuration/single-process-config-blocks-gossip-2.yaml index aa45fbb16a3..8c74f3f9d20 100644 --- a/docs/configuration/single-process-config-blocks-gossip-2.yaml +++ b/docs/configuration/single-process-config-blocks-gossip-2.yaml @@ -79,3 +79,6 @@ tsdb: backend: filesystem # s3, gcs, azure or filesystem are valid options filesystem: dir: /tmp/cortex/storage + +frontend_worker: + match_max_concurrent: true \ No newline at end of file diff --git a/docs/configuration/single-process-config-blocks.yaml b/docs/configuration/single-process-config-blocks.yaml index 05dcd5c7b25..033ec1618fa 100644 --- a/docs/configuration/single-process-config-blocks.yaml +++ b/docs/configuration/single-process-config-blocks.yaml @@ -82,3 +82,6 @@ compactor: sharding_ring: kvstore: store: inmemory + +frontend_worker: + match_max_concurrent: true \ No newline at end of file diff --git a/docs/configuration/single-process-config.md b/docs/configuration/single-process-config.md index 5ff7f281a45..291d10847b6 100644 --- a/docs/configuration/single-process-config.md +++ b/docs/configuration/single-process-config.md @@ -71,4 +71,9 @@ storage: filesystem: directory: /tmp/cortex/chunks + +# Configure the frontend worker in the querier to match worker count +# to max_concurrent on the queriers. +frontend_worker: + match_max_concurrent: true ``` diff --git a/docs/configuration/single-process-config.yaml b/docs/configuration/single-process-config.yaml index 03c5680690d..03377082fa7 100644 --- a/docs/configuration/single-process-config.yaml +++ b/docs/configuration/single-process-config.yaml @@ -75,3 +75,6 @@ storage: purger: enable: true object_store_type: filesystem + +frontend_worker: + match_max_concurrent: true \ No newline at end of file diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 27da98e26dc..f7c913eca90 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -210,7 +210,7 @@ func (t *Cortex) initQuerier(cfg *Config) (serv services.Service, err error) { // Query frontend worker will only be started after all its dependencies are started, not here. // Worker may also be nil, if not configured, which is OK. 
- worker, err := frontend.NewWorker(cfg.Worker, httpgrpc_server.NewServer(handler), util.Logger) + worker, err := frontend.NewWorker(cfg.Worker, cfg.Querier, httpgrpc_server.NewServer(handler), util.Logger) if err != nil { return } diff --git a/pkg/querier/frontend/frontend_test.go b/pkg/querier/frontend/frontend_test.go index 954ec828edd..19dd3c2655a 100644 --- a/pkg/querier/frontend/frontend_test.go +++ b/pkg/querier/frontend/frontend_test.go @@ -26,6 +26,7 @@ import ( "github.com/weaveworks/common/user" "google.golang.org/grpc" + "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -55,7 +56,8 @@ func TestFrontend(t *testing.T) { assert.Equal(t, "Hello World", string(body)) } - testFrontend(t, handler, test) + testFrontend(t, handler, test, false) + testFrontend(t, handler, test, true) } func TestFrontendPropagateTrace(t *testing.T) { @@ -104,7 +106,8 @@ func TestFrontendPropagateTrace(t *testing.T) { // Query should do one calls. assert.Equal(t, traceID, <-observedTraceID) } - testFrontend(t, handler, test) + testFrontend(t, handler, test, false) + testFrontend(t, handler, test, true) } // TestFrontendCancel ensures that when client requests are cancelled, @@ -135,7 +138,9 @@ func TestFrontendCancel(t *testing.T) { time.Sleep(100 * time.Millisecond) assert.Equal(t, int32(1), atomic.LoadInt32(&tries)) } - testFrontend(t, handler, test) + testFrontend(t, handler, test, false) + tries = 0 + testFrontend(t, handler, test, true) } func TestFrontendCancelStatusCode(t *testing.T) { @@ -156,15 +161,18 @@ func TestFrontendCancelStatusCode(t *testing.T) { } } -func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) { +func testFrontend(t *testing.T, handler http.Handler, test func(addr string), matchMaxConcurrency bool) { logger := log.NewNopLogger() var ( - config Config - workerConfig WorkerConfig + config Config + workerConfig WorkerConfig + querierConfig querier.Config ) flagext.DefaultValues(&config, &workerConfig) workerConfig.Parallelism = 1 + workerConfig.MatchMaxConcurrency = matchMaxConcurrency + querierConfig.MaxConcurrent = 1 // localhost:0 prevents firewall warnings on Mac OS X. 
grpcListen, err := net.Listen("tcp", "localhost:0") @@ -196,7 +204,7 @@ func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) { go httpServer.Serve(httpListen) //nolint:errcheck go grpcServer.Serve(grpcListen) //nolint:errcheck - worker, err := NewWorker(workerConfig, httpgrpc_server.NewServer(handler), logger) + worker, err := NewWorker(workerConfig, querierConfig, httpgrpc_server.NewServer(handler), logger) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), worker)) diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index d399697a52a..0b095458293 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -4,36 +4,28 @@ import ( "context" "flag" "fmt" - "net/http" - "sync" + "math/rand" "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" - "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/httpgrpc/server" "github.com/weaveworks/common/middleware" "google.golang.org/grpc" "google.golang.org/grpc/naming" - "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/util/grpcclient" "github.com/cortexproject/cortex/pkg/util/services" ) -var ( - backoffConfig = util.BackoffConfig{ - MinBackoff: 50 * time.Millisecond, - MaxBackoff: 1 * time.Second, - } -) - // WorkerConfig is config for a worker. type WorkerConfig struct { - Address string `yaml:"frontend_address"` - Parallelism int `yaml:"parallelism"` - DNSLookupDuration time.Duration `yaml:"dns_lookup_duration"` + Address string `yaml:"frontend_address"` + Parallelism int `yaml:"parallelism"` + MatchMaxConcurrency bool `yaml:"match_max_concurrent"` + DNSLookupDuration time.Duration `yaml:"dns_lookup_duration"` GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` } @@ -41,7 +33,8 @@ type WorkerConfig struct { // RegisterFlags adds the flags required to config this to the given FlagSet. func (cfg *WorkerConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.Address, "querier.frontend-address", "", "Address of query frontend service, in host:port format.") - f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 10, "Number of simultaneous queries to process.") + f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 10, "Number of simultaneous queries to process per query frontend.") + f.BoolVar(&cfg.MatchMaxConcurrency, "querier.worker-match-max-concurrent", false, "Force worker concurrency to match the -querier.max-concurrent option. Overrides querier.worker-parallelism.") f.DurationVar(&cfg.DNSLookupDuration, "querier.dns-lookup-period", 10*time.Second, "How often to query DNS.") cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f) @@ -49,17 +42,18 @@ func (cfg *WorkerConfig) RegisterFlags(f *flag.FlagSet) { // Worker is the counter-part to the frontend, actually processing requests. type worker struct { - cfg WorkerConfig - log log.Logger - server *server.Server + cfg WorkerConfig + querierCfg querier.Config + log log.Logger + server *server.Server - watcher naming.Watcher //nolint:staticcheck //Skipping for now. If you still see this more than likely issue https://github.com/cortexproject/cortex/issues/2015 has not yet been addressed. - wg sync.WaitGroup + watcher naming.Watcher //nolint:staticcheck //Skipping for now. If you still see this more than likely issue https://github.com/cortexproject/cortex/issues/2015 has not yet been addressed. 
+ managers map[string]*frontendManager } // NewWorker creates a new worker and returns a service that is wrapping it. // If no address is specified, it returns nil service (and no error). -func NewWorker(cfg WorkerConfig, server *server.Server, log log.Logger) (services.Service, error) { +func NewWorker(cfg WorkerConfig, querierCfg querier.Config, server *server.Server, log log.Logger) (services.Service, error) { if cfg.Address == "" { level.Info(log).Log("msg", "no address specified, not starting worker") return nil, nil @@ -76,17 +70,21 @@ func NewWorker(cfg WorkerConfig, server *server.Server, log log.Logger) (service } w := &worker{ - cfg: cfg, - log: log, - server: server, - watcher: watcher, + cfg: cfg, + querierCfg: querierCfg, + log: log, + server: server, + watcher: watcher, + managers: map[string]*frontendManager{}, } return services.NewBasicService(nil, w.watchDNSLoop, w.stopping), nil } func (w *worker) stopping(_ error) error { // wait until all per-address workers are done. This is only called after watchDNSLoop exits. - w.wg.Wait() + for _, mgr := range w.managers { + mgr.stop() + } return nil } @@ -101,8 +99,6 @@ func (w *worker) watchDNSLoop(servCtx context.Context) error { w.watcher.Close() }() - cancels := map[string]context.CancelFunc{} - for { updates, err := w.watcher.Next() if err != nil { @@ -118,116 +114,90 @@ func (w *worker) watchDNSLoop(servCtx context.Context) error { switch update.Op { case naming.Add: level.Debug(w.log).Log("msg", "adding connection", "addr", update.Addr) - ctx, cancel := context.WithCancel(servCtx) - cancels[update.Addr] = cancel - w.runMany(ctx, update.Addr) + client, err := w.connect(servCtx, update.Addr) + if err != nil { + level.Error(w.log).Log("msg", "error connecting", "addr", update.Addr, "err", err) + } + + w.managers[update.Addr] = newFrontendManager(servCtx, w.log, w.server, client, w.cfg.GRPCClientConfig) case naming.Delete: level.Debug(w.log).Log("msg", "removing connection", "addr", update.Addr) - if cancel, ok := cancels[update.Addr]; ok { - cancel() + if mgr, ok := w.managers[update.Addr]; ok { + mgr.stop() + delete(w.managers, update.Addr) } default: return fmt.Errorf("unknown op: %v", update.Op) } } + + w.resetConcurrency() } } -// runMany starts N runOne loops for a given address. -func (w *worker) runMany(ctx context.Context, address string) { - client, err := w.connect(address) - if err != nil { - level.Error(w.log).Log("msg", "error connecting", "addr", address, "err", err) - return +func (w *worker) connect(ctx context.Context, address string) (FrontendClient, error) { + opts := []grpc.DialOption{ + grpc.WithInsecure(), } + opts = append(opts, w.cfg.GRPCClientConfig.DialOption([]grpc.UnaryClientInterceptor{middleware.ClientUserHeaderInterceptor}, nil)...) - w.wg.Add(w.cfg.Parallelism) - for i := 0; i < w.cfg.Parallelism; i++ { - go w.runOne(ctx, client) + conn, err := grpc.DialContext(ctx, address, opts...) + if err != nil { + return nil, err } + return NewFrontendClient(conn), nil } -// runOne loops, trying to establish a stream to the frontend to begin -// request processing. 
-func (w *worker) runOne(ctx context.Context, client FrontendClient) { - defer w.wg.Done() +func (w *worker) resetConcurrency() { + addresses := make([]string, 0, len(w.managers)) + for addr := range w.managers { + addresses = append(addresses, addr) + } + rand.Shuffle(len(addresses), func(i, j int) { addresses[i], addresses[j] = addresses[j], addresses[i] }) - backoff := util.NewBackoff(ctx, backoffConfig) - for backoff.Ongoing() { - c, err := client.Process(ctx) - if err != nil { - level.Error(w.log).Log("msg", "error contacting frontend", "err", err) - backoff.Wait() - continue - } + totalConcurrency := 0 + for i, addr := range addresses { + concurrentRequests := w.concurrency(i, addr) + totalConcurrency += concurrentRequests - if err := w.process(c); err != nil { - level.Error(w.log).Log("msg", "error processing requests", "err", err) - backoff.Wait() - continue + if mgr, ok := w.managers[addr]; ok { + mgr.concurrentRequests(concurrentRequests) + } else { + level.Error(w.log).Log("msg", "address not found in managers map. this should not happen", "addr", addr) } + } - backoff.Reset() + if totalConcurrency > w.querierCfg.MaxConcurrent { + level.Warn(w.log).Log("msg", "total worker concurrency is greater than promql max concurrency. queries may be queued in the querier which reduces QOS") } } -// process loops processing requests on an established stream. -func (w *worker) process(c Frontend_ProcessClient) error { - // Build a child context so we can cancel querie when the stream is closed. - ctx, cancel := context.WithCancel(c.Context()) - defer cancel() +func (w *worker) concurrency(index int, addr string) int { + concurrentRequests := 0 - for { - request, err := c.Recv() - if err != nil { - return err - } - - // Handle the request on a "background" goroutine, so we go back to - // blocking on c.Recv(). This allows us to detect the stream closing - // and cancel the query. We don't actally handle queries in parallel - // here, as we're running in lock step with the server - each Recv is - // paired with a Send. - go func() { - response, err := w.server.Handle(ctx, request.HttpRequest) - if err != nil { - var ok bool - response, ok = httpgrpc.HTTPResponseFromError(err) - if !ok { - response = &httpgrpc.HTTPResponse{ - Code: http.StatusInternalServerError, - Body: []byte(err.Error()), - } - } - } - - // Ensure responses that are too big are not retried. - if len(response.Body) >= w.cfg.GRPCClientConfig.MaxSendMsgSize { - errMsg := fmt.Sprintf("response larger than the max (%d vs %d)", len(response.Body), w.cfg.GRPCClientConfig.MaxSendMsgSize) - response = &httpgrpc.HTTPResponse{ - Code: http.StatusRequestEntityTooLarge, - Body: []byte(errMsg), - } - level.Error(w.log).Log("msg", "error processing query", "err", errMsg) - } + if w.cfg.MatchMaxConcurrency { + concurrentRequests = w.querierCfg.MaxConcurrent / len(w.managers) - if err := c.Send(&ProcessResponse{ - HttpResponse: response, - }); err != nil { - level.Error(w.log).Log("msg", "error processing requests", "err", err) - } - }() + // If max concurrency does not evenly divide into our frontends a subset will be chosen + // to receive an extra connection. Frontend addresses were shuffled above so this will be a + // random selection of frontends. + if index < w.querierCfg.MaxConcurrent%len(w.managers) { + level.Warn(w.log).Log("msg", "max concurrency is not evenly divisible across query frontends. 
adding an extra connection", "addr", addr) + concurrentRequests++ + } + } else { + concurrentRequests = w.cfg.Parallelism } -} -func (w *worker) connect(address string) (FrontendClient, error) { - opts := []grpc.DialOption{grpc.WithInsecure()} - opts = append(opts, w.cfg.GRPCClientConfig.DialOption([]grpc.UnaryClientInterceptor{middleware.ClientUserHeaderInterceptor}, nil)...) - conn, err := grpc.Dial(address, opts...) - if err != nil { - return nil, err + // If concurrentRequests is 0 then w.querierCfg.MaxConcurrent is less than the total number of + // query frontends. In order to prevent accidentally starving a frontend we are just going to + // always connect once to every frontend. This is dangerous b/c we may start exceeding promql + // max concurrency. + if concurrentRequests == 0 { + concurrentRequests = 1 } - return NewFrontendClient(conn), nil + + return concurrentRequests } diff --git a/pkg/querier/frontend/worker_frontend_manager.go b/pkg/querier/frontend/worker_frontend_manager.go new file mode 100644 index 00000000000..2af3ce883d3 --- /dev/null +++ b/pkg/querier/frontend/worker_frontend_manager.go @@ -0,0 +1,152 @@ +package frontend + +import ( + "context" + "fmt" + "net/http" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/httpgrpc/server" + "go.uber.org/atomic" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/grpcclient" +) + +var ( + backoffConfig = util.BackoffConfig{ + MinBackoff: 50 * time.Millisecond, + MaxBackoff: 1 * time.Second, + } +) + +type frontendManager struct { + server *server.Server + client FrontendClient + clientCfg grpcclient.Config + + log log.Logger + + workerCancels []context.CancelFunc + serverCtx context.Context + wg sync.WaitGroup + currentProcessors *atomic.Int32 +} + +func newFrontendManager(serverCtx context.Context, log log.Logger, server *server.Server, client FrontendClient, clientCfg grpcclient.Config) *frontendManager { + f := &frontendManager{ + log: log, + client: client, + clientCfg: clientCfg, + server: server, + serverCtx: serverCtx, + currentProcessors: atomic.NewInt32(0), + } + + return f +} + +func (f *frontendManager) stop() { + f.concurrentRequests(0) + f.wg.Wait() +} + +func (f *frontendManager) concurrentRequests(n int) { + if n < 0 { + n = 0 + } + + for len(f.workerCancels) < n { + ctx, cancel := context.WithCancel(f.serverCtx) + f.workerCancels = append(f.workerCancels, cancel) + + go f.runOne(ctx) + } + + for len(f.workerCancels) > n { + var cancel context.CancelFunc + cancel, f.workerCancels = f.workerCancels[0], f.workerCancels[1:] + cancel() + } +} + +// runOne loops, trying to establish a stream to the frontend to begin +// request processing. +func (f *frontendManager) runOne(ctx context.Context) { + f.wg.Add(1) + defer f.wg.Done() + + f.currentProcessors.Inc() + defer f.currentProcessors.Dec() + + backoff := util.NewBackoff(ctx, backoffConfig) + for backoff.Ongoing() { + c, err := f.client.Process(ctx) + if err != nil { + level.Error(f.log).Log("msg", "error contacting frontend", "err", err) + backoff.Wait() + continue + } + + if err := f.process(ctx, c); err != nil { + level.Error(f.log).Log("msg", "error processing requests", "err", err) + backoff.Wait() + continue + } + + backoff.Reset() + } +} + +// process loops processing requests on an established stream. 
+func (f *frontendManager) process(ctx context.Context, c Frontend_ProcessClient) error {
+	// Build a child context so we can cancel a query when the stream is closed.
+	ctx, cancel := context.WithCancel(c.Context())
+	defer cancel()
+
+	for {
+		request, err := c.Recv()
+		if err != nil {
+			return err
+		}
+
+		// Handle the request on a "background" goroutine, so we go back to
+		// blocking on c.Recv(). This allows us to detect the stream closing
+		// and cancel the query. We don't actually handle queries in parallel
+		// here, as we're running in lock step with the server - each Recv is
+		// paired with a Send.
+		go func() {
+			response, err := f.server.Handle(ctx, request.HttpRequest)
+			if err != nil {
+				var ok bool
+				response, ok = httpgrpc.HTTPResponseFromError(err)
+				if !ok {
+					response = &httpgrpc.HTTPResponse{
+						Code: http.StatusInternalServerError,
+						Body: []byte(err.Error()),
+					}
+				}
+			}
+
+			// Ensure responses that are too big are not retried.
+			if len(response.Body) >= f.clientCfg.MaxSendMsgSize {
+				errMsg := fmt.Sprintf("response larger than the max (%d vs %d)", len(response.Body), f.clientCfg.MaxSendMsgSize)
+				response = &httpgrpc.HTTPResponse{
+					Code: http.StatusRequestEntityTooLarge,
+					Body: []byte(errMsg),
+				}
+				level.Error(f.log).Log("msg", "error processing query", "err", errMsg)
+			}
+
+			if err := c.Send(&ProcessResponse{
+				HttpResponse: response,
+			}); err != nil {
+				level.Error(f.log).Log("msg", "error processing requests", "err", err)
+			}
+		}()
+	}
+}
diff --git a/pkg/querier/frontend/worker_frontend_manager_test.go b/pkg/querier/frontend/worker_frontend_manager_test.go
new file mode 100644
index 00000000000..90131363502
--- /dev/null
+++ b/pkg/querier/frontend/worker_frontend_manager_test.go
@@ -0,0 +1,166 @@
+package frontend
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net/http"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/weaveworks/common/httpgrpc"
+	httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
+	"go.uber.org/atomic"
+	grpc "google.golang.org/grpc"
+
+	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/cortexproject/cortex/pkg/util/grpcclient"
+)
+
+type mockFrontendClient struct {
+	failRecv bool
+}
+
+func (m *mockFrontendClient) Process(ctx context.Context, opts ...grpc.CallOption) (Frontend_ProcessClient, error) {
+	return &mockFrontendProcessClient{
+		ctx:      ctx,
+		failRecv: m.failRecv,
+	}, nil
+}
+
+type mockFrontendProcessClient struct {
+	grpc.ClientStream
+
+	ctx      context.Context
+	failRecv bool
+	wg       sync.WaitGroup
+}
+
+func (m *mockFrontendProcessClient) Send(*ProcessResponse) error {
+	m.wg.Done()
+	return nil
+}
+func (m *mockFrontendProcessClient) Recv() (*ProcessRequest, error) {
+	m.wg.Wait()
+	m.wg.Add(1)
+
+	if m.ctx.Err() != nil {
+		return nil, m.ctx.Err()
+	}
+
+	if m.failRecv {
+		return nil, errors.New("wups")
+	}
+
+	return &ProcessRequest{
+		HttpRequest: &httpgrpc.HTTPRequest{},
+	}, nil
+}
+func (m *mockFrontendProcessClient) Context() context.Context {
+	return context.Background()
+}
+
+func TestConcurrency(t *testing.T) {
+	calls := atomic.NewInt32(0)
+	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		calls.Inc()
+		_, err := w.Write([]byte("Hello World"))
+		assert.NoError(t, err)
+	})
+
+	tests := []struct {
+		concurrency []int
+	}{
+		{
+			concurrency: []int{0},
+		},
+		{
+			concurrency: []int{1},
+		},
+		{
+			concurrency: []int{5},
+		},
+		{
+			concurrency: []int{5, 3, 7},
+		},
+		{
+			concurrency: []int{-1},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(fmt.Sprintf("Testing concurrency %v", tt.concurrency), func(t *testing.T) {
+			mgr := newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), &mockFrontendClient{}, grpcclient.Config{})
+
+			for _, c := range tt.concurrency {
+				calls.Store(0)
+				mgr.concurrentRequests(c)
+				time.Sleep(50 * time.Millisecond)
+
+				expected := int32(c)
+				if expected < 0 {
+					expected = 0
+				}
+				assert.Equal(t, expected, mgr.currentProcessors.Load())
+
+				if expected > 0 {
+					assert.Greater(t, calls.Load(), int32(0))
+				}
+			}
+
+			mgr.stop()
+			assert.Equal(t, int32(0), mgr.currentProcessors.Load())
+		})
+	}
+}
+
+func TestRecvFailDoesntCancelProcess(t *testing.T) {
+	calls := atomic.NewInt32(0)
+	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		calls.Inc()
+		_, err := w.Write([]byte("Hello World"))
+		assert.NoError(t, err)
+	})
+
+	client := &mockFrontendClient{
+		failRecv: true,
+	}
+
+	mgr := newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), client, grpcclient.Config{})
+
+	mgr.concurrentRequests(1)
+	time.Sleep(50 * time.Millisecond)
+	assert.Equal(t, int32(1), mgr.currentProcessors.Load())
+
+	mgr.stop()
+	assert.Equal(t, int32(0), mgr.currentProcessors.Load())
+	assert.Equal(t, int32(0), calls.Load())
+}
+
+func TestServeCancelStopsProcess(t *testing.T) {
+	calls := atomic.NewInt32(0)
+	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		calls.Inc()
+		_, err := w.Write([]byte("Hello World"))
+		assert.NoError(t, err)
+	})
+
+	client := &mockFrontendClient{
+		failRecv: true,
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	mgr := newFrontendManager(ctx, util.Logger, httpgrpc_server.NewServer(handler), client, grpcclient.Config{MaxSendMsgSize: 100000})
+
+	mgr.concurrentRequests(1)
+	time.Sleep(50 * time.Millisecond)
+	assert.Equal(t, int32(1), mgr.currentProcessors.Load())
+
+	cancel()
+	time.Sleep(50 * time.Millisecond)
+	assert.Equal(t, int32(0), mgr.currentProcessors.Load())
+
+	mgr.stop()
+	assert.Equal(t, int32(0), mgr.currentProcessors.Load())
+}
diff --git a/pkg/querier/frontend/worker_test.go b/pkg/querier/frontend/worker_test.go
new file mode 100644
index 00000000000..ba3ee655a26
--- /dev/null
+++ b/pkg/querier/frontend/worker_test.go
@@ -0,0 +1,108 @@
+package frontend
+
+import (
+	"context"
+	"net/http"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
+
+	"github.com/cortexproject/cortex/pkg/querier"
+	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/cortexproject/cortex/pkg/util/grpcclient"
+)
+
+func TestResetConcurrency(t *testing.T) {
+	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		_, err := w.Write([]byte("Hello World"))
+		assert.NoError(t, err)
+	})
+
+	tests := []struct {
+		name                string
+		parallelism         int
+		maxConcurrent       int
+		numManagers         int
+		expectedConcurrency int32
+	}{
+		{
+			name:                "Test create at least one worker per manager",
+			parallelism:         0,
+			maxConcurrent:       0,
+			numManagers:         2,
+			expectedConcurrency: 2,
+		},
+		{
+			name:                "Test concurrency per query frontend configuration",
+			parallelism:         4,
+			maxConcurrent:       0,
+			numManagers:         2,
+			expectedConcurrency: 8,
+		},
+		{
+			name:                "Test Total Parallelism with a remainder",
+			parallelism:         1,
+			maxConcurrent:       7,
+			numManagers:         4,
+			expectedConcurrency: 7,
+		},
+		{
+			name:                "Test Total Parallelism dividing evenly",
+			parallelism:         1,
+			maxConcurrent:       6,
+			numManagers:         2,
+			expectedConcurrency: 6,
+		},
+		{
+			name:                "Test Total Parallelism at least one worker per manager",
+			parallelism:         1,
+			maxConcurrent:       3,
+			numManagers:         6,
+			expectedConcurrency: 6,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			cfg := WorkerConfig{
+				Parallelism:         tt.parallelism,
+				MatchMaxConcurrency: tt.maxConcurrent > 0,
+			}
+			querierCfg := querier.Config{
+				MaxConcurrent: tt.maxConcurrent,
+			}
+
+			w := &worker{
+				cfg:        cfg,
+				querierCfg: querierCfg,
+				log:        util.Logger,
+				managers:   map[string]*frontendManager{},
+			}
+
+			for i := 0; i < tt.numManagers; i++ {
+				w.managers[strconv.Itoa(i)] = newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), &mockFrontendClient{}, grpcclient.Config{})
+			}
+
+			w.resetConcurrency()
+			time.Sleep(100 * time.Millisecond)
+
+			concurrency := int32(0)
+			for _, mgr := range w.managers {
+				concurrency += mgr.currentProcessors.Load()
+			}
+			assert.Equal(t, tt.expectedConcurrency, concurrency)
+
+			err := w.stopping(nil)
+			assert.NoError(t, err)
+
+			concurrency = int32(0)
+			for _, mgr := range w.managers {
+				concurrency += mgr.currentProcessors.Load()
+			}
+			assert.Equal(t, int32(0), concurrency)
+		})
+	}
+}
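The standalone sketch below (illustrative only; the function name `spreadConcurrency`, the example addresses, and the `main` driver are hypothetical and not part of this change) shows the distribution rule that `worker.resetConcurrency` and `worker.concurrency` implement when `-querier.worker-match-max-concurrent` is enabled: `-querier.max-concurrent` is divided evenly across the known query-frontend addresses, a randomly chosen subset of frontends absorbs any remainder, and every frontend always receives at least one connection even if that pushes the total above the PromQL concurrency limit.

```go
package main

import (
	"fmt"
	"math/rand"
)

// spreadConcurrency distributes maxConcurrent worker connections across the
// given frontend addresses: an even share each, the remainder to a random
// subset, and never fewer than one connection per frontend.
func spreadConcurrency(maxConcurrent int, addrs []string) map[string]int {
	out := make(map[string]int, len(addrs))
	if len(addrs) == 0 {
		return out
	}

	// Shuffle so the frontends that receive an extra "remainder" connection are random.
	shuffled := append([]string(nil), addrs...)
	rand.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] })

	for i, addr := range shuffled {
		n := maxConcurrent / len(shuffled)
		if i < maxConcurrent%len(shuffled) {
			n++ // this frontend absorbs part of the remainder
		}
		if n == 0 {
			n = 1 // never starve a frontend, even if the total now exceeds maxConcurrent
		}
		out[addr] = n
	}
	return out
}

func main() {
	// 7 total PromQL slots across 4 frontends: three frontends get 2 connections, one gets 1.
	fmt.Println(spreadConcurrency(7, []string{"fe-0:9095", "fe-1:9095", "fe-2:9095", "fe-3:9095"}))

	// 3 slots across 6 frontends: each still gets 1, so 6 workers run in total.
	fmt.Println(spreadConcurrency(3, []string{"fe-0", "fe-1", "fe-2", "fe-3", "fe-4", "fe-5"}))
}
```

The two calls in `main` mirror the "remainder" and "at least one worker per manager" cases exercised by `TestResetConcurrency` above.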