5 changes: 5 additions & 0 deletions docs/sources/shared/configuration.md
@@ -3219,6 +3219,11 @@ The `frontend` block configures the Loki query-frontend.
# CLI flag: -frontend.scheduler-worker-concurrency
[scheduler_worker_concurrency: <int> | default = 5]
# Time to wait for a worker to initialize its connection to the query-scheduler.
# If left empty (the default of 0s), no timeout is applied.
# If the timeout is reached, the worker init fails, backs off, and tries again.
[scheduler_worker_init_timeout: <duration> | default = 0s]
# The grpc_client block configures the gRPC client used to communicate between a
# client and server component in Loki.
# The CLI flags prefix for this block configuration is:
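As an aside, the option follows the common Go convention that a zero duration disables the timeout, which is why the worker only wraps the INIT receive in a deadline when the value is positive. A minimal, self-contained sketch of that convention (the names `waitForInit`, `ready`, and `initTimeout` are illustrative, not the Loki implementation):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitForInit blocks until ready is closed. When initTimeout > 0 the wait is
// bounded by a context deadline; a zero value means "wait indefinitely",
// matching the documented default of 0s.
func waitForInit(ready <-chan struct{}, initTimeout time.Duration) error {
	ctx := context.Background()
	if initTimeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, initTimeout)
		defer cancel()
	}

	select {
	case <-ready:
		return nil
	case <-ctx.Done():
		return fmt.Errorf("timeout waiting for INIT response: %w", ctx.Err())
	}
}

func main() {
	ready := make(chan struct{}) // never closed: simulates a scheduler that is not ready
	fmt.Println(waitForInit(ready, 100*time.Millisecond))
}
```

With `initTimeout` set to 0 the select simply waits on `ready`, mirroring the "no timeout is applied" behaviour documented above.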
1 change: 1 addition & 0 deletions pkg/lokifrontend/frontend/v2/frontend.go
@@ -47,6 +47,7 @@ type Config struct {
SchedulerAddress string `yaml:"scheduler_address"`
DNSLookupPeriod time.Duration `yaml:"scheduler_dns_lookup_period"`
WorkerConcurrency int `yaml:"scheduler_worker_concurrency"`
WorkerInitTimeout time.Duration `yaml:"scheduler_worker_init_timeout"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
GracefulShutdownTimeout time.Duration `yaml:"graceful_shutdown_timeout"`

51 changes: 48 additions & 3 deletions pkg/lokifrontend/frontend/v2/frontend_scheduler_worker.go
@@ -35,6 +35,8 @@ type frontendSchedulerWorkers struct {
mu sync.Mutex
// Set to nil when stop is called... no more workers are created afterwards.
workers map[string]*frontendSchedulerWorker

workerInitTimeout time.Duration
}

func newFrontendSchedulerWorkers(cfg Config, frontendAddress string, ring ring.ReadRing, requestsCh <-chan *frontendRequest, logger log.Logger) (*frontendSchedulerWorkers, error) {
@@ -44,6 +46,8 @@ func newFrontendSchedulerWorkers(cfg Config, frontendAddress string, ring ring.R
frontendAddress: frontendAddress,
requestsCh: requestsCh,
workers: map[string]*frontendSchedulerWorker{},

workerInitTimeout: cfg.WorkerInitTimeout,
}

switch {
@@ -111,7 +115,7 @@ func (f *frontendSchedulerWorkers) AddressAdded(address string) {
}

// No worker for this address yet, start a new one.
w = newFrontendSchedulerWorker(conn, address, f.frontendAddress, f.requestsCh, f.cfg.WorkerConcurrency, f.logger)
w = newFrontendSchedulerWorker(conn, address, f.frontendAddress, f.requestsCh, f.cfg.WorkerConcurrency, f.logger, f.workerInitTimeout)

f.mu.Lock()
defer f.mu.Unlock()
@@ -187,9 +191,11 @@ type frontendSchedulerWorker struct {
// Cancellation requests for this scheduler are received via this channel. It is passed to frontend after
// query has been enqueued to scheduler.
cancelCh chan uint64

workerInitTimeout time.Duration
}

func newFrontendSchedulerWorker(conn *grpc.ClientConn, schedulerAddr string, frontendAddr string, requestCh <-chan *frontendRequest, concurrency int, log log.Logger) *frontendSchedulerWorker {
func newFrontendSchedulerWorker(conn *grpc.ClientConn, schedulerAddr string, frontendAddr string, requestCh <-chan *frontendRequest, concurrency int, log log.Logger, workerInitTimeout time.Duration) *frontendSchedulerWorker {
w := &frontendSchedulerWorker{
log: log,
conn: conn,
@@ -199,6 +205,8 @@ func newFrontendSchedulerWorker(conn *grpc.ClientConn, schedulerAddr string, fro
requestCh: requestCh,
// Allow to enqueue enough cancellation requests. ~ 8MB memory size.
cancelCh: make(chan uint64, 1000000),

workerInitTimeout: workerInitTimeout,
}
w.ctx, w.cancel = context.WithCancel(context.Background())

@@ -262,7 +270,10 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
return err
}

if resp, err := loop.Recv(); err != nil || resp.Status != schedulerpb.OK {
if resp, err := w.receiveInitResponse(loop); err != nil || resp.Status != schedulerpb.OK {
if errors.Is(err, context.DeadlineExceeded) {
return errors.New("timeout waiting for INIT response from scheduler")
}
if err != nil {
return err
}
@@ -355,3 +366,37 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
}
}
}

func (w *frontendSchedulerWorker) receiveInitResponse(loop schedulerpb.SchedulerForFrontend_FrontendLoopClient) (*schedulerpb.SchedulerToFrontend, error) {
if w.workerInitTimeout > 0 {
initRespCtx, cancel := context.WithTimeout(loop.Context(), w.workerInitTimeout)
defer cancel()
return w.receiveWithContext(initRespCtx, loop)
}
return loop.Recv()
}

// receiveWithContext calls loop.Recv() with a timeout.
// It is only used to bound the wait for the scheduler's INIT response.
func (w *frontendSchedulerWorker) receiveWithContext(ctx context.Context, loop schedulerpb.SchedulerForFrontend_FrontendLoopClient) (*schedulerpb.SchedulerToFrontend, error) {

receiveCh := make(chan struct {
resp *schedulerpb.SchedulerToFrontend
err error
}, 1)

go func() {
resp, err := loop.Recv()
receiveCh <- struct {
resp *schedulerpb.SchedulerToFrontend
err error
}{resp, err}
}()

select {
case initResultOrErr := <-receiveCh:
return initResultOrErr.resp, initResultOrErr.err
case <-ctx.Done():
return nil, ctx.Err()
}
}
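The `receiveWithContext` helper above uses a common Go idiom for putting a deadline on a blocking call that does not itself accept a context: run the call in a goroutine, deliver its result on a buffered channel, and select between that channel and `ctx.Done()`. A self-contained sketch of the same idiom with a generic blocking function (illustrative only, not the Loki code):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// recvWithContext runs the blocking recv function in a goroutine and returns
// either its result or ctx.Err(), whichever comes first. The result channel is
// buffered so the goroutine can still deliver a late result and exit after the
// caller has already given up.
func recvWithContext[T any](ctx context.Context, recv func() (T, error)) (T, error) {
	type result struct {
		val T
		err error
	}
	ch := make(chan result, 1)

	go func() {
		v, err := recv()
		ch <- result{val: v, err: err}
	}()

	select {
	case r := <-ch:
		return r.val, r.err
	case <-ctx.Done():
		var zero T
		return zero, ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	// Simulate a scheduler that does not ack INIT within the deadline.
	_, err := recvWithContext(ctx, func() (string, error) {
		time.Sleep(time.Second)
		return "INIT ack", nil
	})
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}
```

The buffer of size 1 matters: it lets the goroutine complete its send and exit even after the caller has returned, so a late result is simply dropped rather than leaking a goroutine blocked on the channel.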
67 changes: 61 additions & 6 deletions pkg/lokifrontend/frontend/v2/frontend_test.go
@@ -32,6 +32,9 @@ import (
const testFrontendWorkerConcurrency = 5

func setupFrontend(t *testing.T, cfg Config, schedulerReplyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend) (*Frontend, *mockScheduler) {
return setupFrontendWithSchedulerWithIsReadyState(t, cfg, true, schedulerReplyFunc)
}
func setupFrontendWithSchedulerWithIsReadyState(t *testing.T, cfg Config, isReady bool, schedulerReplyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend) (*Frontend, *mockScheduler) {
l, err := net.Listen("tcp", "")
require.NoError(t, err)

@@ -54,7 +57,7 @@ func setupFrontend(t *testing.T, cfg Config, schedulerReplyFunc func(f *Frontend

frontendv2pb.RegisterFrontendForQuerierServer(server, f)

ms := newMockScheduler(t, f, schedulerReplyFunc)
ms := newMockScheduler(t, f, isReady, schedulerReplyFunc)
schedulerpb.RegisterSchedulerForFrontendServer(server, ms)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), f))
@@ -120,7 +123,40 @@ func TestFrontendBasicWorkflow(t *testing.T) {
require.Equal(t, int32(200), resp.Code)
require.Equal(t, []byte(body), resp.Body)
}
func TestFrontendWorkflowWithMissingInitialInitResponse(t *testing.T) {

// Simulate the scheduler not being ready at the time the frontend connects.
// Therefore the INIT response is not sent (current behaviour of the real scheduler).
// Same test as TestFrontendBasicWorkflow, but with delayed scheduler readiness.

const (
body = "all fine here"
userID = "test"
)

cfg := Config{
WorkerInitTimeout: 1 * time.Second, // enable worker init timeout for test
}
flagext.DefaultValues(&cfg)
f, mockScheduler := setupFrontendWithSchedulerWithIsReadyState(t, cfg, false, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
go sendResponseWithDelay(f, 100*time.Millisecond, userID, msg.QueryID, &httpgrpc.HTTPResponse{
Code: 200,
Body: []byte(body),
})
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
})

go func() {
// Simulate scheduler is now ready to accept requests after some delay
time.Sleep(500 * time.Millisecond)
mockScheduler.setReady(true)
}()

resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{})
require.NoError(t, err)
require.Equal(t, int32(200), resp.Code)
require.Equal(t, []byte(body), resp.Body)
}
func TestFrontendBasicWorkflowProto(t *testing.T) {
const (
userID = "test"
@@ -426,15 +462,18 @@ type mockScheduler struct {
t *testing.T
f *Frontend

isReadyMu sync.RWMutex // guards isReady, which is flipped from a test goroutine while FrontendLoop reads it
isReady bool

replyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend

mu sync.Mutex
frontendAddr map[string]int
msgs []*schedulerpb.FrontendToScheduler
}

func newMockScheduler(t *testing.T, f *Frontend, replyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend) *mockScheduler {
return &mockScheduler{t: t, f: f, frontendAddr: map[string]int{}, replyFunc: replyFunc}
func newMockScheduler(t *testing.T, f *Frontend, isReady bool, replyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend) *mockScheduler {
return &mockScheduler{t: t, f: f, frontendAddr: map[string]int{}, replyFunc: replyFunc, isReady: isReady}
}

func (m *mockScheduler) checkWithLock(fn func()) {
@@ -443,6 +482,18 @@ func (m *mockScheduler) checkWithLock(fn func()) {

fn()
}
func (m *mockScheduler) setReady(ready bool) {
m.isReadyMu.Lock()
defer m.isReadyMu.Unlock()

m.isReady = ready
}
func (m *mockScheduler) shouldRun() bool {
m.isReadyMu.RLock()
defer m.isReadyMu.RUnlock()

return m.isReady
}

func (m *mockScheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_FrontendLoopServer) error {
init, err := frontend.Recv()
@@ -454,10 +505,14 @@ func (m *mockScheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_F
m.frontendAddr[init.FrontendAddress]++
m.mu.Unlock()

// Ack INIT from frontend.
if err := frontend.Send(&schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}); err != nil {
return err
if m.shouldRun() { // mirror the real scheduler: only ack INIT when state == Running and shouldRun == true
// Ack INIT from frontend.
if err := frontend.Send(&schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}); err != nil {
return err
}
}
// TODO: the scheduler-side fix handles this case by sending an ERROR when it is not ready;
// should that behaviour be added here too?

for {
msg, err := frontend.Recv()
4 changes: 4 additions & 0 deletions pkg/scheduler/scheduler.go
@@ -250,6 +250,10 @@ func (s *Scheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_Front
if err := frontend.Send(&schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}); err != nil {
return err
}
} else if s.State() == services.Running && !s.shouldRun.Load() {
// Scheduler is "RUNNING" should not run (yet)
level.Info(s.log).Log("msg", "scheduler is not in ReplicationSet, sending ERROR so frontend can try another scheduler", "frontend", frontendAddress)
return frontend.Send(&schedulerpb.SchedulerToFrontend{Status: schedulerpb.ERROR})
}

// We stop accepting new queries in Stopping state. By returning quickly, we disconnect frontends, which in turns
58 changes: 57 additions & 1 deletion pkg/scheduler/scheduler_test.go
@@ -6,10 +6,12 @@ import (
"testing"

"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/metadata"

"github.com/grafana/loki/v3/pkg/scheduler/schedulerpb"
@@ -63,6 +65,56 @@ func TestScheduler_setRunState(t *testing.T) {
s.setRunState(false)
assert.Nil(t, mock.msg)

}
func TestFrontendConnectsToRunningSchedulerButBeforeShouldRun(t *testing.T) {

// This test is even a bit cruder than the one above, as we inject a noop BasicService
// to have a way to transition into the RUNNING state.
// This scheduler starts with no frontends connected.

s := Scheduler{
log: util_log.Logger,
schedulerRunning: promauto.With(nil).NewGauge(prometheus.GaugeOpts{
Name: "cortex_query_scheduler_running",
Help: "Value will be 1 if the scheduler is in the ReplicationSet and actively receiving/processing requests",
}),
Service: services.NewBasicService(func(serviceContext context.Context) error {
return nil
}, func(serviceContext context.Context) error {
<-serviceContext.Done()
return serviceContext.Err()
}, func(failureCase error) error {
return nil
}),
}
require.NoError(t, s.StartAsync(t.Context()))
require.NoError(t, s.AwaitRunning(t.Context()))
require.Equal(t, services.Running, s.State())
mock := &mockSchedulerForFrontendFrontendLoopServer{
recvFn: func() (*schedulerpb.FrontendToScheduler, error) {
return &schedulerpb.FrontendToScheduler{
Type: schedulerpb.INIT,
FrontendAddress: "127.0.0.1:9095",
}, nil
},
}
s.connectedFrontends = map[string]*connectedFrontend{}

// not_running, shouldRun == false
assert.False(t, s.shouldRun.Load())

err := s.FrontendLoop(mock)
assert.NoError(t, err)

// not_running -> running, shouldRun == true
// to simulate last "setRunState(true)" happening after FrontendLoop started
s.setRunState(true)
assert.True(t, s.shouldRun.Load())

// Now we expect the scheduler to have sent an ERROR message to the frontend,
// so the frontend will retry connecting instead of waiting for an INIT response that never arrives.
assert.Equal(t, schedulerpb.ERROR, mock.msg.Status)

}

func TestProtobufBackwardsCompatibility(t *testing.T) {
@@ -114,7 +166,8 @@ func TestProtobufBackwardsCompatibility(t *testing.T) {
}

type mockSchedulerForFrontendFrontendLoopServer struct {
msg *schedulerpb.SchedulerToFrontend
msg *schedulerpb.SchedulerToFrontend
recvFn func() (*schedulerpb.FrontendToScheduler, error)
}

func (m *mockSchedulerForFrontendFrontendLoopServer) Send(frontend *schedulerpb.SchedulerToFrontend) error {
@@ -123,6 +176,9 @@ func (m *mockSchedulerForFrontendFrontendLoopServer) Send(frontend *schedulerpb.
}

func (m mockSchedulerForFrontendFrontendLoopServer) Recv() (*schedulerpb.FrontendToScheduler, error) {
if m.recvFn != nil {
return m.recvFn()
}
panic("implement me")
}
