pkg/frontend/v2/frontend.go — 131 changes: 77 additions & 54 deletions

```diff
@@ -184,78 +184,101 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	freq := &frontendRequest{
-		queryID:      f.lastQueryID.Inc(),
-		request:      req,
-		userID:       userID,
-		statsEnabled: stats.IsEnabled(ctx),
+	return f.withRetry(func() (*httpgrpc.HTTPResponse, error) {
+		freq := &frontendRequest{
+			queryID:      f.lastQueryID.Inc(),
+			request:      req,
+			userID:       userID,
+			statsEnabled: stats.IsEnabled(ctx),
 
-		cancel: cancel,
+			cancel: cancel,
 
-		// Buffer of 1 to ensure response or error can be written to the channel
-		// even if this goroutine goes away due to client context cancellation.
-		enqueue:  make(chan enqueueResult, 1),
-		response: make(chan *frontendv2pb.QueryResultRequest, 1),
+			// Buffer of 1 to ensure response or error can be written to the channel
+			// even if this goroutine goes away due to client context cancellation.
+			enqueue:  make(chan enqueueResult, 1),
+			response: make(chan *frontendv2pb.QueryResultRequest, 1),
 
-		retryOnTooManyOutstandingRequests: f.cfg.RetryOnTooManyOutstandingRequests && f.schedulerWorkers.getWorkersCount() > 1,
-	}
+			retryOnTooManyOutstandingRequests: f.cfg.RetryOnTooManyOutstandingRequests && f.schedulerWorkers.getWorkersCount() > 1,
+		}
 
-	f.requests.put(freq)
-	defer f.requests.delete(freq.queryID)
+		f.requests.put(freq)
+		defer f.requests.delete(freq.queryID)
 
-	retries := f.cfg.WorkerConcurrency + 1 // To make sure we hit at least two different schedulers.
+		retries := f.cfg.WorkerConcurrency + 1 // To make sure we hit at least two different schedulers.
 
-enqueueAgain:
-	select {
-	case <-ctx.Done():
-		return nil, ctx.Err()
-
-	case f.requestsCh <- freq:
-		// Enqueued, let's wait for response.
-	}
+	enqueueAgain:
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+
+		case f.requestsCh <- freq:
+			// Enqueued, let's wait for response.
+		}
 
-	var cancelCh chan<- uint64
+		var cancelCh chan<- uint64
 
-	select {
-	case <-ctx.Done():
-		return nil, ctx.Err()
-
-	case enqRes := <-freq.enqueue:
-		if enqRes.status == waitForResponse {
-			cancelCh = enqRes.cancelCh
-			break // go wait for response.
-		} else if enqRes.status == failed {
-			retries--
-			if retries > 0 {
-				goto enqueueAgain
-			}
-		}
-
-		return nil, httpgrpc.Errorf(http.StatusInternalServerError, "failed to enqueue request")
-	}
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+
+		case enqRes := <-freq.enqueue:
+			if enqRes.status == waitForResponse {
+				cancelCh = enqRes.cancelCh
+				break // go wait for response.
+			} else if enqRes.status == failed {
+				retries--
+				if retries > 0 {
+					goto enqueueAgain
+				}
+			}
+
+			return nil, httpgrpc.Errorf(http.StatusInternalServerError, "failed to enqueue request")
+		}
 
-	select {
-	case <-ctx.Done():
-		if cancelCh != nil {
-			select {
-			case cancelCh <- freq.queryID:
-				// cancellation sent.
-			default:
-				// failed to cancel, log it.
-				level.Warn(util_log.WithContext(ctx, f.log)).Log("msg", "failed to enqueue cancellation signal", "query_id", freq.queryID)
-				f.cancelFailedQueries.Inc()
-			}
-		}
-		return nil, ctx.Err()
-
-	case resp := <-freq.response:
-		if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) {
-			stats := stats.FromContext(ctx)
-			stats.Merge(resp.Stats) // Safe if stats is nil.
-		}
-
-		return resp.HttpResponse, nil
-	}
+		select {
+		case <-ctx.Done():
+			if cancelCh != nil {
+				select {
+				case cancelCh <- freq.queryID:
+					// cancellation sent.
+				default:
+					// failed to cancel, log it.
+					level.Warn(util_log.WithContext(ctx, f.log)).Log("msg", "failed to enqueue cancellation signal", "query_id", freq.queryID)
+					f.cancelFailedQueries.Inc()
+				}
+			}
+			return nil, ctx.Err()
+
+		case resp := <-freq.response:
+			if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) {
+				stats := stats.FromContext(ctx)
+				stats.Merge(resp.Stats) // Safe if stats is nil.
+			}
+
+			return resp.HttpResponse, nil
+		}
+	})
 }
 
+func (f *Frontend) withRetry(do func() (*httpgrpc.HTTPResponse, error)) (*httpgrpc.HTTPResponse, error) {
+	tries := 3
+	var (
+		resp *httpgrpc.HTTPResponse
+		err  error
+	)
+	for tries > 0 {
+		tries--
+		resp, err = do()
+
+		if err != nil && err != context.Canceled {
+			continue // Retryable
+		} else if resp != nil && resp.Code/100 == 5 {
+			continue // Retryable
+		} else {
+			break
+		}
+	}
+	return resp, err
+}
+
 func (f *Frontend) QueryResult(ctx context.Context, qrReq *frontendv2pb.QueryResultRequest) (*frontendv2pb.QueryResultResponse, error) {
```
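Read in isolation, the retry policy introduced above is: call `do()` up to three times, retrying whenever it returns an error other than `context.Canceled` or a response with a 5xx status code; if all three tries fail, the last response (or error) is returned to the caller as-is. Below is a minimal, self-contained sketch of that policy — a local `response` struct stands in for `httpgrpc.HTTPResponse`, and `errors.Is` is used where the PR compares the error directly, so it compiles and runs without the Cortex dependencies:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// response is a local stand-in for httpgrpc.HTTPResponse.
type response struct {
	Code int32
	Body []byte
}

// withRetry mirrors the policy from the diff above: up to three tries,
// retrying on any error except context cancellation and on 5xx responses.
func withRetry(do func() (*response, error)) (*response, error) {
	tries := 3
	var (
		resp *response
		err  error
	)
	for tries > 0 {
		tries--
		resp, err = do()

		if err != nil && !errors.Is(err, context.Canceled) {
			continue // retryable: transport-level failure
		} else if resp != nil && resp.Code/100 == 5 {
			continue // retryable: server-side error
		}
		break // success, non-5xx response, or cancellation
	}
	return resp, err
}

func main() {
	calls := 0
	resp, err := withRetry(func() (*response, error) {
		calls++
		if calls < 3 {
			return &response{Code: 500}, nil // fails twice...
		}
		return &response{Code: 200}, nil // ...then succeeds
	})
	fmt.Printf("calls=%d code=%d err=%v\n", calls, resp.Code, err) // calls=3 code=200 err=<nil>
}
```

One consequence worth noting: when every try returns 5xx, the caller receives the final 500 response with a nil error, which preserves the querier's original status code instead of masking it with a generic retry error.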
pkg/frontend/v2/frontend_test.go — 30 changes: 30 additions & 0 deletions

```diff
@@ -115,6 +115,36 @@ func TestFrontendBasicWorkflow(t *testing.T) {
 	require.Equal(t, []byte(body), resp.Body)
 }
 
+func TestFrontendRetryRequest(t *testing.T) {
+	// withRetry allows up to three tries: respond with 500 to the first
+	// two attempts, then 200 on the third.
+	tries := atomic.NewInt64(3)
+	const (
+		body   = "hello world"
+		userID = "test"
+	)
+
+	f, _ := setupFrontend(t, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
+		try := tries.Dec()
+		if try > 0 {
+			go sendResponseWithDelay(f, 100*time.Millisecond, userID, msg.QueryID, &httpgrpc.HTTPResponse{
+				Code: 500,
+				Body: []byte(body),
+			})
+		} else {
+			go sendResponseWithDelay(f, 100*time.Millisecond, userID, msg.QueryID, &httpgrpc.HTTPResponse{
+				Code: 200,
+				Body: []byte(body),
+			})
+		}
+
+		return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
+	})
+
+	res, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{})
+	require.NoError(t, err)
+	require.Equal(t, int32(200), res.Code)
+}
+
 func TestFrontendRetryEnqueue(t *testing.T) {
 	// Frontend uses worker concurrency to compute number of retries. We use one less failure.
 	failures := atomic.NewInt64(testFrontendWorkerConcurrency - 1)
```
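The new test exercises the success-on-third-try path. Two points that follow from the code but are not covered here: first, the enqueue-level retries (`f.cfg.WorkerConcurrency + 1` attempts per `do()` call) sit inside the response-level retries, so in the worst case a query can be enqueued up to three times that number; second, if every try returns 5xx, `withRetry` surfaces the last 500 rather than an error. A hypothetical test for that exhaustion case, written against the standalone sketch earlier (it assumes those `response`/`withRetry` definitions plus `import "testing"`, and is not part of the PR):

```go
func TestWithRetryExhausted(t *testing.T) {
	calls := 0
	resp, err := withRetry(func() (*response, error) {
		calls++
		return &response{Code: 500}, nil // every try fails
	})
	// All three tries are consumed and the last 5xx response is returned as-is.
	if err != nil || calls != 3 || resp.Code != 500 {
		t.Fatalf("want 3 calls and final 500, got calls=%d code=%d err=%v", calls, resp.Code, err)
	}
}
```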