From 925eca4745c92b1f2c7dd7ea7c1200c161b4127d Mon Sep 17 00:00:00 2001
From: Max Froehlich
Date: Fri, 17 Oct 2025 21:37:22 +0200
Subject: [PATCH 1/2] handle if frontend INIT is not replied to

---
 docs/sources/shared/configuration.md          |  5 ++
 pkg/lokifrontend/frontend/v2/frontend.go      |  1 +
 .../frontend/v2/frontend_scheduler_worker.go  | 51 +++++++++++++-
 pkg/lokifrontend/frontend/v2/frontend_test.go | 67 +++++++++++++++++--
 4 files changed, 115 insertions(+), 9 deletions(-)

diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md
index 0116c2523a7a8..106afc58661ef 100644
--- a/docs/sources/shared/configuration.md
+++ b/docs/sources/shared/configuration.md
@@ -3219,6 +3219,11 @@ The `frontend` block configures the Loki query-frontend.
 # CLI flag: -frontend.scheduler-worker-concurrency
 [scheduler_worker_concurrency: <int> | default = 5]
 
+# Time to wait for a worker to initialize its connection to the query-scheduler.
+# If set to 0 (the default), no timeout is applied.
+# If the timeout is reached, the worker init fails, backs off, and tries again.
+[scheduler_worker_init_timeout: <duration> | default = 0s]
+
 # The grpc_client block configures the gRPC client used to communicate between a
 # client and server component in Loki.
 # The CLI flags prefix for this block configuration is:
diff --git a/pkg/lokifrontend/frontend/v2/frontend.go b/pkg/lokifrontend/frontend/v2/frontend.go
index 1bf36d6cb4e57..42220e325993e 100644
--- a/pkg/lokifrontend/frontend/v2/frontend.go
+++ b/pkg/lokifrontend/frontend/v2/frontend.go
@@ -47,6 +47,7 @@ type Config struct {
     SchedulerAddress string `yaml:"scheduler_address"`
     DNSLookupPeriod time.Duration `yaml:"scheduler_dns_lookup_period"`
     WorkerConcurrency int `yaml:"scheduler_worker_concurrency"`
+    WorkerInitTimeout time.Duration `yaml:"scheduler_worker_init_timeout"`
     GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
     GracefulShutdownTimeout time.Duration `yaml:"graceful_shutdown_timeout"`
 
diff --git a/pkg/lokifrontend/frontend/v2/frontend_scheduler_worker.go b/pkg/lokifrontend/frontend/v2/frontend_scheduler_worker.go
index 585fce797818f..725d1382af428 100644
--- a/pkg/lokifrontend/frontend/v2/frontend_scheduler_worker.go
+++ b/pkg/lokifrontend/frontend/v2/frontend_scheduler_worker.go
@@ -35,6 +35,8 @@ type frontendSchedulerWorkers struct {
     mu sync.Mutex
     // Set to nil when stop is called... no more workers are created afterwards.
     workers map[string]*frontendSchedulerWorker
+
+    workerInitTimeout time.Duration
 }
 
 func newFrontendSchedulerWorkers(cfg Config, frontendAddress string, ring ring.ReadRing, requestsCh <-chan *frontendRequest, logger log.Logger) (*frontendSchedulerWorkers, error) {
@@ -44,6 +46,8 @@ func newFrontendSchedulerWorkers(cfg Config, frontendAddress string, ring ring.R
         frontendAddress: frontendAddress,
         requestsCh: requestsCh,
         workers: map[string]*frontendSchedulerWorker{},
+
+        workerInitTimeout: cfg.WorkerInitTimeout,
     }
 
     switch {
@@ -111,7 +115,7 @@ func (f *frontendSchedulerWorkers) AddressAdded(address string) {
     }
 
     // No worker for this address yet, start a new one.
-    w = newFrontendSchedulerWorker(conn, address, f.frontendAddress, f.requestsCh, f.cfg.WorkerConcurrency, f.logger)
+    w = newFrontendSchedulerWorker(conn, address, f.frontendAddress, f.requestsCh, f.cfg.WorkerConcurrency, f.logger, f.workerInitTimeout)
 
     f.mu.Lock()
     defer f.mu.Unlock()
@@ -187,9 +191,11 @@ type frontendSchedulerWorker struct {
     // Cancellation requests for this scheduler are received via this channel. It is passed to frontend after
     // query has been enqueued to scheduler.
     cancelCh chan uint64
+
+    workerInitTimeout time.Duration
 }
 
-func newFrontendSchedulerWorker(conn *grpc.ClientConn, schedulerAddr string, frontendAddr string, requestCh <-chan *frontendRequest, concurrency int, log log.Logger) *frontendSchedulerWorker {
+func newFrontendSchedulerWorker(conn *grpc.ClientConn, schedulerAddr string, frontendAddr string, requestCh <-chan *frontendRequest, concurrency int, log log.Logger, workerInitTimeout time.Duration) *frontendSchedulerWorker {
     w := &frontendSchedulerWorker{
         log: log,
         conn: conn,
@@ -199,6 +205,8 @@ func newFrontendSchedulerWorker(conn *grpc.ClientConn, schedulerAddr string, fro
         requestCh: requestCh,
         // Allow to enqueue enough cancellation requests. ~ 8MB memory size.
         cancelCh: make(chan uint64, 1000000),
+
+        workerInitTimeout: workerInitTimeout,
     }
 
     w.ctx, w.cancel = context.WithCancel(context.Background())
@@ -262,7 +270,10 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
         return err
     }
 
-    if resp, err := loop.Recv(); err != nil || resp.Status != schedulerpb.OK {
+    if resp, err := w.receiveInitResponse(loop); err != nil || resp.Status != schedulerpb.OK {
+        if errors.Is(err, context.DeadlineExceeded) {
+            return errors.New("timeout waiting for INIT response from scheduler")
+        }
         if err != nil {
             return err
         }
@@ -355,3 +366,37 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
         }
     }
 }
+
+func (w *frontendSchedulerWorker) receiveInitResponse(loop schedulerpb.SchedulerForFrontend_FrontendLoopClient) (*schedulerpb.SchedulerToFrontend, error) {
+    if w.workerInitTimeout > 0 {
+        initRespCtx, cancel := context.WithTimeout(loop.Context(), w.workerInitTimeout)
+        defer cancel()
+        return w.receiveWithContext(initRespCtx, loop)
+    }
+    return loop.Recv()
+}
+
+// receiveWithContext calls loop.Recv() with a timeout.
+// It is only used to bound the wait for the INIT response.
+func (w *frontendSchedulerWorker) receiveWithContext(ctx context.Context, loop schedulerpb.SchedulerForFrontend_FrontendLoopClient) (*schedulerpb.SchedulerToFrontend, error) {
+
+    receiveCh := make(chan struct {
+        resp *schedulerpb.SchedulerToFrontend
+        err  error
+    }, 1)
+
+    go func() {
+        resp, err := loop.Recv()
+        receiveCh <- struct {
+            resp *schedulerpb.SchedulerToFrontend
+            err  error
+        }{resp, err}
+    }()
+
+    select {
+    case initResultOrErr := <-receiveCh:
+        return initResultOrErr.resp, initResultOrErr.err
+    case <-ctx.Done():
+        return nil, ctx.Err()
+    }
+}
diff --git a/pkg/lokifrontend/frontend/v2/frontend_test.go b/pkg/lokifrontend/frontend/v2/frontend_test.go
index baf62348216f5..28586bc812352 100644
--- a/pkg/lokifrontend/frontend/v2/frontend_test.go
+++ b/pkg/lokifrontend/frontend/v2/frontend_test.go
@@ -32,6 +32,9 @@ import (
 const testFrontendWorkerConcurrency = 5
 
 func setupFrontend(t *testing.T, cfg Config, schedulerReplyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend) (*Frontend, *mockScheduler) {
+    return setupFrontendWithSchedulerWithIsReadyState(t, cfg, true, schedulerReplyFunc)
+}
+func setupFrontendWithSchedulerWithIsReadyState(t *testing.T, cfg Config, isReady bool, schedulerReplyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend) (*Frontend, *mockScheduler) {
     l, err := net.Listen("tcp", "")
     require.NoError(t, err)
 
@@ -54,7 +57,7 @@ func setupFrontend(t *testing.T, cfg Config, schedulerReplyFunc func(f *Frontend
 
     frontendv2pb.RegisterFrontendForQuerierServer(server, f)
 
-    ms := newMockScheduler(t, f, schedulerReplyFunc)
+    ms := newMockScheduler(t, f, isReady, schedulerReplyFunc)
     schedulerpb.RegisterSchedulerForFrontendServer(server, ms)
 
     require.NoError(t, services.StartAndAwaitRunning(context.Background(), f))
@@ -120,7 +123,40 @@ func TestFrontendBasicWorkflow(t *testing.T) {
     require.Equal(t, int32(200), resp.Code)
     require.Equal(t, []byte(body), resp.Body)
 }
+func TestFrontendWorkflowWithMissingInitialInitResponse(t *testing.T) {
+
+    // Simulate the scheduler not being ready when the frontend connects.
+    // In that case the INIT response is not sent (current behaviour of the real scheduler).
+    // Same test as TestFrontendBasicWorkflow, but with delayed scheduler readiness.
+
+    const (
+        body   = "all fine here"
+        userID = "test"
+    )
+
+    cfg := Config{
+        WorkerInitTimeout: 1 * time.Second, // enable worker init timeout for test
+    }
+    flagext.DefaultValues(&cfg)
+    f, mockScheduler := setupFrontendWithSchedulerWithIsReadyState(t, cfg, false, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
+        go sendResponseWithDelay(f, 100*time.Millisecond, userID, msg.QueryID, &httpgrpc.HTTPResponse{
+            Code: 200,
+            Body: []byte(body),
+        })
+        return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
+    })
+    go func() {
+        // Simulate the scheduler becoming ready to accept requests after some delay.
+        time.Sleep(500 * time.Millisecond)
+        mockScheduler.setReady(true)
+    }()
+
+    resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{})
+    require.NoError(t, err)
+    require.Equal(t, int32(200), resp.Code)
+    require.Equal(t, []byte(body), resp.Body)
+}
 
 func TestFrontendBasicWorkflowProto(t *testing.T) {
     const (
         userID = "test"
@@ -426,6 +462,9 @@ type mockScheduler struct {
     t *testing.T
     f *Frontend
 
+    isReadyMu sync.RWMutex // not sure if this is needed in tests
+    isReady   bool
+
     replyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend
 
     mu sync.Mutex
@@ -433,8 +472,8 @@ type mockScheduler struct {
     msgs []*schedulerpb.FrontendToScheduler
 }
 
-func newMockScheduler(t *testing.T, f *Frontend, replyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend) *mockScheduler {
-    return &mockScheduler{t: t, f: f, frontendAddr: map[string]int{}, replyFunc: replyFunc}
+func newMockScheduler(t *testing.T, f *Frontend, isReady bool, replyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend) *mockScheduler {
+    return &mockScheduler{t: t, f: f, frontendAddr: map[string]int{}, replyFunc: replyFunc, isReady: isReady}
 }
 
 func (m *mockScheduler) checkWithLock(fn func()) {
@@ -443,6 +482,18 @@ func (m *mockScheduler) checkWithLock(fn func()) {
     fn()
 }
 
+func (m *mockScheduler) setReady(ready bool) {
+    m.isReadyMu.Lock()
+    defer m.isReadyMu.Unlock()
+
+    m.isReady = ready
+}
+func (m *mockScheduler) shouldRun() bool {
+    m.isReadyMu.RLock()
+    defer m.isReadyMu.RUnlock()
+
+    return m.isReady
+}
 func (m *mockScheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_FrontendLoopServer) error {
     init, err := frontend.Recv()
@@ -454,10 +505,14 @@ func (m *mockScheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_F
     m.frontendAddr[init.FrontendAddress]++
     m.mu.Unlock()
 
-    // Ack INIT from frontend.
-    if err := frontend.Send(&schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}); err != nil {
-        return err
+    if m.shouldRun() { // see shouldRun in the scheduler (state == running and shouldRun == true)
+        // Ack INIT from frontend.
+        if err := frontend.Send(&schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}); err != nil {
+            return err
+        }
     }
+    // TODO: the other fix, in the scheduler, handles this and sends an error if not ready.
+    // Should this behavior be added here too?
 
     for {
         msg, err := frontend.Recv()

From 8d1d23886f525720e4a4522a522ac52bf03f96c8 Mon Sep 17 00:00:00 2001
From: Max Froehlich
Date: Fri, 17 Oct 2025 21:38:06 +0200
Subject: [PATCH 2/2] handle running scheduler but not yet in shouldRun state

---
 pkg/scheduler/scheduler.go      |  4 +++
 pkg/scheduler/scheduler_test.go | 58 ++++++++++++++++++++++++++++++++-
 2 files changed, 61 insertions(+), 1 deletion(-)

diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 74c03bf31021b..0e884d91ad49a 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -250,6 +250,10 @@ func (s *Scheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_Front
         if err := frontend.Send(&schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}); err != nil {
             return err
         }
+    } else if s.State() == services.Running && !s.shouldRun.Load() {
+        // Scheduler is "RUNNING" but should not run (yet).
+        level.Info(s.log).Log("msg", "scheduler is not in ReplicationSet, sending ERROR so frontend can try another scheduler", "frontend", frontendAddress)
+        return frontend.Send(&schedulerpb.SchedulerToFrontend{Status: schedulerpb.ERROR})
     }
 
     // We stop accepting new queries in Stopping state. By returning quickly, we disconnect frontends, which in turns
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 939aa2b18bb93..2412e65ce8370 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -6,10 +6,12 @@ import (
     "testing"
 
     "github.com/grafana/dskit/httpgrpc"
+    "github.com/grafana/dskit/services"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/client_golang/prometheus/promauto"
     "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
     "google.golang.org/grpc/metadata"
 
     "github.com/grafana/loki/v3/pkg/scheduler/schedulerpb"
@@ -63,6 +65,56 @@ func TestScheduler_setRunState(t *testing.T) {
 
     s.setRunState(false)
     assert.Nil(t, mock.msg)
+}
+func TestFrontendConnectsToRunningSchedulerButBeforeShouldRun(t *testing.T) {
+
+    // This test is even a bit cruder than the one above, as we inject a noop BasicService
+    // to have a way to transition into the RUNNING state.
+    // This scheduler starts with no frontends connected.
+
+    s := Scheduler{
+        log: util_log.Logger,
+        schedulerRunning: promauto.With(nil).NewGauge(prometheus.GaugeOpts{
+            Name: "cortex_query_scheduler_running",
+            Help: "Value will be 1 if the scheduler is in the ReplicationSet and actively receiving/processing requests",
+        }),
+        Service: services.NewBasicService(func(serviceContext context.Context) error {
+            return nil
+        }, func(serviceContext context.Context) error {
+            <-serviceContext.Done()
+            return serviceContext.Err()
+        }, func(failureCase error) error {
+            return nil
+        }),
+    }
+    require.NoError(t, s.StartAsync(t.Context()))
+    require.NoError(t, s.AwaitRunning(t.Context()))
+    require.Equal(t, services.Running, s.State())
+    mock := &mockSchedulerForFrontendFrontendLoopServer{
+        recvFn: func() (*schedulerpb.FrontendToScheduler, error) {
+            return &schedulerpb.FrontendToScheduler{
+                Type:            schedulerpb.INIT,
+                FrontendAddress: "127.0.0.1:9095",
+            }, nil
+        },
+    }
+    s.connectedFrontends = map[string]*connectedFrontend{}
+
+    // not_running, shouldRun == false
+    assert.False(t, s.shouldRun.Load())
+
+    err := s.FrontendLoop(mock)
+    assert.NoError(t, err)
+
+    // not_running -> running, shouldRun == true
+    // to simulate the last "setRunState(true)" happening after FrontendLoop started
+    s.setRunState(true)
+    assert.True(t, s.shouldRun.Load())
+
+    // Now we expect the scheduler to have sent an ERROR message to the frontend,
+    // so the frontend retries connecting instead of waiting for an INIT response that never comes.
+    assert.Equal(t, schedulerpb.ERROR, mock.msg.Status)
+
 }
 
 func TestProtobufBackwardsCompatibility(t *testing.T) {
@@ -114,7 +166,8 @@ func TestProtobufBackwardsCompatibility(t *testing.T) {
 }
 
 type mockSchedulerForFrontendFrontendLoopServer struct {
-    msg *schedulerpb.SchedulerToFrontend
+    msg    *schedulerpb.SchedulerToFrontend
+    recvFn func() (*schedulerpb.FrontendToScheduler, error)
 }
 
 func (m *mockSchedulerForFrontendFrontendLoopServer) Send(frontend *schedulerpb.SchedulerToFrontend) error {
@@ -123,6 +176,9 @@ func (m *mockSchedulerForFrontendFrontendLoopServer) Send(frontend *schedulerpb.
 }
 
 func (m mockSchedulerForFrontendFrontendLoopServer) Recv() (*schedulerpb.FrontendToScheduler, error) {
+    if m.recvFn != nil {
+        return m.recvFn()
+    }
     panic("implement me")
 }