From c5373a837d770173da4640067fffd983ae889988 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 30 Oct 2020 17:17:05 +0100 Subject: [PATCH] Fixed TestSchedulerEnqueueWithFrontendDisconnect flakiness Signed-off-by: Marco Pracucci --- pkg/querier/frontend2/scheduler.go | 63 ++++++++++++++++--------- pkg/querier/frontend2/scheduler_test.go | 13 ++++- 2 files changed, 53 insertions(+), 23 deletions(-) diff --git a/pkg/querier/frontend2/scheduler.go b/pkg/querier/frontend2/scheduler.go index 1d3070a7c16..25a6a12dbf0 100644 --- a/pkg/querier/frontend2/scheduler.go +++ b/pkg/querier/frontend2/scheduler.go @@ -42,9 +42,10 @@ type Scheduler struct { pendingRequests map[requestKey]*schedulerRequest // Request is kept in this map even after being dispatched to querier. It can still be canceled at that time. // Metrics. - connectedWorkers prometheus.GaugeFunc - queueDuration prometheus.Histogram - queueLength *prometheus.GaugeVec + connectedQuerierClients prometheus.GaugeFunc + connectedFrontendClients prometheus.GaugeFunc + queueDuration prometheus.Histogram + queueLength *prometheus.GaugeVec } type requestKey struct { @@ -71,33 +72,35 @@ func (cfg *SchedulerConfig) RegisterFlags(f *flag.FlagSet) { // NewScheduler creates a new Scheduler. 
func NewScheduler(cfg SchedulerConfig, limits Limits, log log.Logger, registerer prometheus.Registerer) (*Scheduler, error) { - connectedQuerierWorkers := atomic.NewInt32(0) s := &Scheduler{ log: log, limits: limits, - queues: newUserQueues(cfg.MaxOutstandingPerTenant), - pendingRequests: map[requestKey]*schedulerRequest{}, - - queueDuration: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ - Name: "cortex_query_scheduler_queue_duration_seconds", - Help: "Time spend by requests in queue before getting picked up by a querier.", - Buckets: prometheus.DefBuckets, - }), - connectedWorkers: promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{ - Name: "cortex_query_scheduler_connected_workers", - Help: "Number of querier worker clients currently connected to the query-scheduler.", - }, func() float64 { return float64(connectedQuerierWorkers.Load()) }), - queueLength: promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{ - Name: "cortex_query_scheduler_queue_length", - Help: "Number of queries in the queue.", - }, []string{"user"}), - + queues: newUserQueues(cfg.MaxOutstandingPerTenant), + pendingRequests: map[requestKey]*schedulerRequest{}, connectedFrontends: map[string]*connectedFrontend{}, - connectedQuerierWorkers: connectedQuerierWorkers, + connectedQuerierWorkers: atomic.NewInt32(0), } s.cond = sync.NewCond(&s.mtx) + s.queueDuration = promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ + Name: "cortex_query_scheduler_queue_duration_seconds", + Help: "Time spend by requests in queue before getting picked up by a querier.", + Buckets: prometheus.DefBuckets, + }) + s.connectedQuerierClients = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{ + Name: "cortex_query_scheduler_connected_querier_clients", + Help: "Number of querier worker clients currently connected to the query-scheduler.", + }, s.getConnectedQuerierClientsMetric) + s.connectedFrontendClients = 
promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{ + Name: "cortex_query_scheduler_connected_frontend_clients", + Help: "Number of query-frontend worker clients currently connected to the query-scheduler.", + }, s.getConnectedFrontendClientsMetric) + s.queueLength = promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_query_scheduler_queue_length", + Help: "Number of queries in the queue.", + }, []string{"user"}) + s.Service = services.NewIdleService(nil, s.stopping) return s, nil } @@ -515,3 +518,19 @@ func (s *Scheduler) unregisterQuerierConnection(querier string) { defer s.mtx.Unlock() s.queues.removeQuerierConnection(querier) } + +func (s *Scheduler) getConnectedQuerierClientsMetric() float64 { + return float64(s.connectedQuerierWorkers.Load()) +} + +func (s *Scheduler) getConnectedFrontendClientsMetric() float64 { + s.connectedFrontendsMu.Lock() + defer s.connectedFrontendsMu.Unlock() + + count := 0 + for _, workers := range s.connectedFrontends { + count += workers.connections + } + + return float64(count) +} diff --git a/pkg/querier/frontend2/scheduler_test.go b/pkg/querier/frontend2/scheduler_test.go index ae399503bbe..c141c65f38c 100644 --- a/pkg/querier/frontend2/scheduler_test.go +++ b/pkg/querier/frontend2/scheduler_test.go @@ -9,6 +9,7 @@ import ( "github.com/go-kit/kit/log" "github.com/opentracing/opentracing-go" + promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" "github.com/uber/jaeger-client-go/config" "github.com/weaveworks/common/httpgrpc" @@ -164,11 +165,21 @@ func TestSchedulerEnqueueWithFrontendDisconnect(t *testing.T) { HttpRequest: &httpgrpc.HTTPRequest{Method: "GET", Url: "/hello"}, }) - querierLoop := initQuerierLoop(t, querierClient, "querier-1") + // Wait until the frontend has connected to the scheduler. 
+ test.Poll(t, time.Second, float64(1), func() interface{} { + return promtest.ToFloat64(scheduler.connectedFrontendClients) + }) // Disconnect frontend. require.NoError(t, frontendLoop.CloseSend()) + // Wait until the frontend has disconnected. + test.Poll(t, time.Second, float64(0), func() interface{} { + return promtest.ToFloat64(scheduler.connectedFrontendClients) + }) + + querierLoop := initQuerierLoop(t, querierClient, "querier-1") + verifyQuerierDoesntReceiveRequest(t, querierLoop, 500*time.Millisecond) verifyNoPendingRequestsLeft(t, scheduler) }