Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* [ENHANCEMENT] Querier: Apply bytes limiter to LabelNames and LabelValuesForLabelNames. #6568
* [ENHANCEMENT] Query Frontend: Add a `too_many_tenants` reason label value to the `cortex_rejected_queries_total` metric to track the number of queries rejected due to the limit on the number of tenants. #6569
* [ENHANCEMENT] Alertmanager: Add receiver validations for msteamsv2 and rocketchat. #6606
* [ENHANCEMENT] Query Frontend: Add a `-frontend.enabled-ruler-query-stats` flag to configure whether to report the query stats log for queries coming from the Ruler. #6504
* [ENHANCEMENT] OTLP: Support otlp metadata ingestion. #6617
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring. #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
Expand Down
6 changes: 6 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4175,6 +4175,12 @@ The `query_frontend_config` configures the Cortex query-frontend.
# CLI flag: -frontend.query-stats-enabled
[query_stats_enabled: <boolean> | default = false]

# If enabled, report the query stats log for queries coming from the ruler to
# evaluate rules. It only takes effect when '-ruler.frontend-address' is
# configured.
# CLI flag: -frontend.enabled-ruler-query-stats
[enabled_ruler_query_stats_log: <boolean> | default = false]

# If a querier disconnects without sending notification about graceful shutdown,
# the query-frontend will keep the querier in the tenant's shard until the
# forget delay has passed. This feature is useful to reduce the blast radius
Expand Down
36 changes: 22 additions & 14 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,17 @@ const (

// HandlerConfig holds the configuration for a Handler.
type HandlerConfig struct {
	// LogQueriesLongerThan is the threshold above which a query is logged as
	// slow. Per the flag help: 0 disables it, a negative value logs all queries.
	LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"`
	// MaxBodySize is the max body size for downstream prometheus.
	MaxBodySize int64 `yaml:"max_body_size"`
	// QueryStatsEnabled turns on query statistics tracking; when set, a
	// message with some statistics is logged for every query.
	QueryStatsEnabled bool `yaml:"query_stats_enabled"`
	// EnabledRulerQueryStatsLog controls whether the query stats log is also
	// reported for queries coming from the ruler to evaluate rules.
	EnabledRulerQueryStatsLog bool `yaml:"enabled_ruler_query_stats_log"`
}

// RegisterFlags registers the handler's command-line flags on f, binding each
// flag to the corresponding field of cfg.
func (cfg *HandlerConfig) RegisterFlags(f *flag.FlagSet) {
	// Slow-query logging threshold.
	f.DurationVar(
		&cfg.LogQueriesLongerThan,
		"frontend.log-queries-longer-than",
		0,
		"Log queries that are slower than the specified duration. Set to 0 to disable. Set to < 0 to enable on all queries.",
	)

	// Body size cap; the default works out to 10 MiB.
	f.Int64Var(
		&cfg.MaxBodySize,
		"frontend.max-body-size",
		10*1024*1024,
		"Max body size for downstream prometheus.",
	)

	// Per-query statistics logging, off by default.
	f.BoolVar(
		&cfg.QueryStatsEnabled,
		"frontend.query-stats-enabled",
		false,
		"True to enable query statistics tracking. When enabled, a message with some statistics is logged for every query.",
	)

	// Query stats logging for ruler-originated queries, off by default.
	f.BoolVar(
		&cfg.EnabledRulerQueryStatsLog,
		"frontend.enabled-ruler-query-stats",
		false,
		"If enabled, report the query stats log for queries coming from the ruler to evaluate rules. It only takes effect when '-ruler.frontend-address' is configured.",
	)
}

// Handler accepts queries and forwards them to RoundTripper. It can log slow queries,
Expand Down Expand Up @@ -245,10 +247,11 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Body = io.NopCloser(&buf)
}

source := tripperware.GetSource(r.Header.Get("User-Agent"))
// Log request
if f.cfg.QueryStatsEnabled {
queryString = f.parseRequestQueryString(r, buf)
f.logQueryRequest(r, queryString)
f.logQueryRequest(r, queryString, source)
}

startTime := time.Now()
Expand Down Expand Up @@ -281,7 +284,6 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

source := tripperware.GetSource(r.Header.Get("User-Agent"))
f.reportQueryStats(r, source, userID, queryString, queryResponseTime, stats, err, statusCode, resp)
}

Expand Down Expand Up @@ -322,7 +324,7 @@ func formatGrafanaStatsFields(r *http.Request) []interface{} {
}

// logQueryRequest logs query request before query execution.
func (f *Handler) logQueryRequest(r *http.Request, queryString url.Values) {
func (f *Handler) logQueryRequest(r *http.Request, queryString url.Values, source string) {
logMessage := []interface{}{
"msg", "query request",
"component", "query-frontend",
Expand All @@ -346,9 +348,11 @@ func (f *Handler) logQueryRequest(r *http.Request, queryString url.Values) {
logMessage = append(logMessage, "accept_encoding", acceptEncoding)
}

logMessage = append(logMessage, formatQueryString(queryString)...)

level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
shouldLog := source == tripperware.SourceAPI || (f.cfg.EnabledRulerQueryStatsLog && source == tripperware.SourceRuler)
if shouldLog {
logMessage = append(logMessage, formatQueryString(queryString)...)
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}
}

// reportSlowQuery reports slow queries.
Expand Down Expand Up @@ -473,11 +477,15 @@ func (f *Handler) reportQueryStats(r *http.Request, source, userID string, query
logMessage = append(logMessage, "error", s.Message())
}
}
logMessage = append(logMessage, formatQueryString(queryString)...)
if error != nil {
level.Error(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
} else {
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)

shouldLog := source == tripperware.SourceAPI || (f.cfg.EnabledRulerQueryStatsLog && source == tripperware.SourceRuler)
if shouldLog {
logMessage = append(logMessage, formatQueryString(queryString)...)
if error != nil {
level.Error(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
} else {
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}
}

var reason string
Expand Down
40 changes: 32 additions & 8 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,28 +417,31 @@ func TestHandler_ServeHTTP(t *testing.T) {
func TestReportQueryStatsFormat(t *testing.T) {
outputBuf := bytes.NewBuffer(nil)
logger := log.NewSyncLogger(log.NewLogfmtLogger(outputBuf))
handler := NewHandler(HandlerConfig{QueryStatsEnabled: true}, tenantfederation.Config{}, http.DefaultTransport, logger, nil)
userID := "fake"
req, _ := http.NewRequest(http.MethodGet, "http://localhost:8080/prometheus/api/v1/query", nil)
resp := &http.Response{ContentLength: 1000}
responseTime := time.Second
statusCode := http.StatusOK

type testCase struct {
queryString url.Values
queryStats *querier_stats.QueryStats
header http.Header
responseErr error
expectedLog string
queryString url.Values
queryStats *querier_stats.QueryStats
header http.Header
responseErr error
expectedLog string
enabledRulerQueryStatsLog bool
source string
}

tests := map[string]testCase{
"should not include query and header details if empty": {
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0`,
source: tripperware.SourceAPI,
},
"should include query length and string at the end": {
queryString: url.Values(map[string][]string{"query": {"up"}}),
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 query_length=2 param_query=up`,
source: tripperware.SourceAPI,
},
"should include query stats": {
queryStats: &querier_stats.QueryStats{
Expand All @@ -455,14 +458,17 @@ func TestReportQueryStatsFormat(t *testing.T) {
},
},
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 response_series_count=100 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 samples_scanned=0 query_storage_wall_time_seconds=6000`,
source: tripperware.SourceAPI,
},
"should include user agent": {
header: http.Header{"User-Agent": []string{"Grafana"}},
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 user_agent=Grafana`,
source: tripperware.SourceAPI,
},
"should include response error": {
responseErr: errors.New("foo_err"),
expectedLog: `level=error msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 error=foo_err`,
source: tripperware.SourceAPI,
},
"should include query priority": {
queryString: url.Values(map[string][]string{"query": {"up"}}),
Expand All @@ -471,6 +477,7 @@ func TestReportQueryStatsFormat(t *testing.T) {
PriorityAssigned: true,
},
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 query_length=2 priority=99 param_query=up`,
source: tripperware.SourceAPI,
},
"should include data fetch min and max time": {
queryString: url.Values(map[string][]string{"query": {"up"}}),
Expand All @@ -479,6 +486,7 @@ func TestReportQueryStatsFormat(t *testing.T) {
DataSelectMinTime: 1704067200000,
},
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 data_select_max_time=1704153600 data_select_min_time=1704067200 query_length=2 param_query=up`,
source: tripperware.SourceAPI,
},
"should include query stats with store gateway stats": {
queryStats: &querier_stats.QueryStats{
Expand All @@ -497,16 +505,32 @@ func TestReportQueryStatsFormat(t *testing.T) {
},
},
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 response_series_count=100 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 samples_scanned=0 store_gateway_touched_postings_count=20 store_gateway_touched_posting_bytes=200 query_storage_wall_time_seconds=6000`,
source: tripperware.SourceAPI,
},
"should not report a log": {
expectedLog: ``,
source: tripperware.SourceRuler,
enabledRulerQueryStatsLog: false,
},
"should report a log": {
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0`,
source: tripperware.SourceRuler,
enabledRulerQueryStatsLog: true,
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
handler := NewHandler(HandlerConfig{QueryStatsEnabled: true, EnabledRulerQueryStatsLog: testData.enabledRulerQueryStatsLog}, tenantfederation.Config{}, http.DefaultTransport, logger, nil)
req.Header = testData.header
handler.reportQueryStats(req, tripperware.SourceAPI, userID, testData.queryString, responseTime, testData.queryStats, testData.responseErr, statusCode, resp)
handler.reportQueryStats(req, testData.source, userID, testData.queryString, responseTime, testData.queryStats, testData.responseErr, statusCode, resp)
data, err := io.ReadAll(outputBuf)
require.NoError(t, err)
require.Equal(t, testData.expectedLog+"\n", string(data))
if testData.expectedLog == "" {
require.Empty(t, string(data))
} else {
require.Equal(t, testData.expectedLog+"\n", string(data))
}
})
}
}
Expand Down
Loading