Skip to content

Commit 986b7c2

Browse files
committed
handle running scheduler but not yet in shouldRun state
1 parent b3e8026 commit 986b7c2

File tree

2 files changed

+61
-1
lines changed

2 files changed

+61
-1
lines changed

pkg/scheduler/scheduler.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,10 @@ func (s *Scheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_Front
250250
if err := frontend.Send(&schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}); err != nil {
251251
return err
252252
}
253+
} else if s.State() == services.Running && !s.shouldRun.Load() {
254+
// Scheduler is "RUNNING" but should not run (yet)
255+
level.Info(s.log).Log("msg", "scheduler is not in ReplicationSet, sending ERROR so frontend can try another scheduler", "frontend", frontendAddress)
256+
return frontend.Send(&schedulerpb.SchedulerToFrontend{Status: schedulerpb.ERROR})
253257
}
254258

255259
// We stop accepting new queries in Stopping state. By returning quickly, we disconnect frontends, which in turns

pkg/scheduler/scheduler_test.go

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ import (
66
"testing"
77

88
"github.com/grafana/dskit/httpgrpc"
9+
"github.com/grafana/dskit/services"
910
"github.com/prometheus/client_golang/prometheus"
1011
"github.com/prometheus/client_golang/prometheus/promauto"
1112

1213
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/require"
1315
"google.golang.org/grpc/metadata"
1416

1517
"github.com/grafana/loki/v3/pkg/scheduler/schedulerpb"
@@ -63,6 +65,56 @@ func TestScheduler_setRunState(t *testing.T) {
6365
s.setRunState(false)
6466
assert.Nil(t, mock.msg)
6567

68+
}
69+
// TestFrontendConnectsToRunningSchedulerButBeforeShouldRun checks the case where a
// frontend sends INIT to a scheduler whose service state is Running while its
// shouldRun flag is still false: FrontendLoop must return without error and the
// frontend must have been sent an ERROR status.
func TestFrontendConnectsToRunningSchedulerButBeforeShouldRun(t *testing.T) {
70+
71+
// This test is even a bit cruder than the one above, as we inject a no-op BasicService
72+
// to have a way to transition into the RUNNING state.
73+
// This scheduler starts with no frontends connected.
74+
75+
s := Scheduler{
76+
log: util_log.Logger,
77+
schedulerRunning: promauto.With(nil).NewGauge(prometheus.GaugeOpts{
78+
Name: "cortex_query_scheduler_running",
79+
Help: "Value will be 1 if the scheduler is in the ReplicationSet and actively receiving/processing requests",
80+
}),
81+
Service: services.NewBasicService(func(serviceContext context.Context) error {
82+
return nil
83+
}, func(serviceContext context.Context) error {
84+
<-serviceContext.Done()
85+
return serviceContext.Err()
86+
}, func(failureCase error) error {
87+
return nil
88+
}),
89+
}
90+
require.NoError(t, s.StartAsync(t.Context()))
91+
require.NoError(t, s.AwaitRunning(t.Context()))
92+
require.Equal(t, services.Running, s.State())
93+
mock := &mockSchedulerForFrontendFrontendLoopServer{
94+
recvFn: func() (*schedulerpb.FrontendToScheduler, error) {
95+
return &schedulerpb.FrontendToScheduler{
96+
Type: schedulerpb.INIT,
97+
FrontendAddress: "127.0.0.1:9095",
98+
}, nil
99+
},
100+
}
101+
s.connectedFrontends = map[string]*connectedFrontend{}
102+
103+
// Initial condition: the service state is RUNNING, but shouldRun is still false.
104+
assert.False(t, s.shouldRun.Load())
105+
106+
err := s.FrontendLoop(mock)
107+
assert.NoError(t, err)
108+
109+
// Flip shouldRun to true
110+
// to simulate the last "setRunState(true)" happening after FrontendLoop started.
111+
s.setRunState(true)
112+
assert.True(t, s.shouldRun.Load())
113+
114+
// Now we expect the scheduler to have sent an ERROR message to the frontend,
115+
// so the frontend will retry connecting and is not left waiting for an INIT response.
116+
// NOTE(review): mock.msg is dereferenced without a nil check; if nothing was sent this panics instead of failing cleanly.
assert.Equal(t, schedulerpb.ERROR, mock.msg.Status)
117+
66118
}
67119

68120
func TestProtobufBackwardsCompatibility(t *testing.T) {
@@ -114,7 +166,8 @@ func TestProtobufBackwardsCompatibility(t *testing.T) {
114166
}
115167

116168
// mockSchedulerForFrontendFrontendLoopServer is the test double passed to
// Scheduler.FrontendLoop in these tests.
// msg is inspected by the tests after the call (presumably it is recorded by Send,
// whose body is not shown in this view — confirm).
// recvFn, when non-nil, supplies the result of Recv.
type mockSchedulerForFrontendFrontendLoopServer struct {
117-
msg *schedulerpb.SchedulerToFrontend
169+
msg *schedulerpb.SchedulerToFrontend
170+
recvFn func() (*schedulerpb.FrontendToScheduler, error)
118171
}
119172

120173
func (m *mockSchedulerForFrontendFrontendLoopServer) Send(frontend *schedulerpb.SchedulerToFrontend) error {
@@ -123,6 +176,9 @@ func (m *mockSchedulerForFrontendFrontendLoopServer) Send(frontend *schedulerpb.
123176
}
124177

125178
// Recv returns whatever recvFn returns when recvFn is set; with no recvFn
// configured it panics with "implement me".
// NOTE(review): Recv uses a value receiver while Send uses a pointer receiver;
// consider making the receivers consistent.
func (m mockSchedulerForFrontendFrontendLoopServer) Recv() (*schedulerpb.FrontendToScheduler, error) {
179+
if m.recvFn != nil {
180+
return m.recvFn()
181+
}
126182
panic("implement me")
127183
}
128184

0 commit comments

Comments
 (0)