Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add a retry middleware to all the stats handlers #13584

Merged
merged 3 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions pkg/querier/queryrange/queryrangebase/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package queryrangebase

import (
"context"
"reflect"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -95,6 +96,7 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) {
level.Error(util_log.WithContext(ctx, r.log)).Log(
"msg", "error processing request",
"try", tries,
"type", logImplementingType(req),
"query", query,
"query_hash", util.HashedQuery(query),
"start", start.Format(time.RFC3339Nano),
Expand All @@ -113,3 +115,18 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) {
}
return nil, lastErr
}

func logImplementingType(i Request) string {
if i == nil {
return "nil"
}

t := reflect.TypeOf(i)

// Check if it's a pointer and get the underlying type if so
if t.Kind() == reflect.Ptr {
t = t.Elem()
}

return t.String()
}
23 changes: 14 additions & 9 deletions pkg/querier/queryrange/querysharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func NewQueryShardMiddleware(
limits Limits,
maxShards int,
statsHandler queryrangebase.Handler,
retryNextHandler queryrangebase.Handler,
shardAggregation []string,
) queryrangebase.Middleware {
noshards := !hasShards(confs)
Expand All @@ -56,7 +57,7 @@ func NewQueryShardMiddleware(
}

mapperware := queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
return newASTMapperware(confs, engineOpts, next, statsHandler, logger, shardingMetrics, limits, maxShards, shardAggregation)
return newASTMapperware(confs, engineOpts, next, retryNextHandler, statsHandler, logger, shardingMetrics, limits, maxShards, shardAggregation)
})

return queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
Expand All @@ -76,6 +77,7 @@ func newASTMapperware(
confs ShardingConfigs,
engineOpts logql.EngineOpts,
next queryrangebase.Handler,
retryNextHandler queryrangebase.Handler,
statsHandler queryrangebase.Handler,
logger log.Logger,
metrics *logql.MapperMetrics,
Expand All @@ -88,6 +90,7 @@ func newASTMapperware(
logger: log.With(logger, "middleware", "QueryShard.astMapperware"),
limits: limits,
next: next,
retryNextHandler: retryNextHandler,
statsHandler: next,
ng: logql.NewDownstreamEngine(engineOpts, DownstreamHandler{next: next, limits: limits}, limits, logger),
metrics: metrics,
Expand All @@ -103,14 +106,15 @@ func newASTMapperware(
}

type astMapperware struct {
confs ShardingConfigs
logger log.Logger
limits Limits
next queryrangebase.Handler
statsHandler queryrangebase.Handler
ng *logql.DownstreamEngine
metrics *logql.MapperMetrics
maxShards int
confs ShardingConfigs
logger log.Logger
limits Limits
next queryrangebase.Handler
retryNextHandler queryrangebase.Handler
statsHandler queryrangebase.Handler
ng *logql.DownstreamEngine
metrics *logql.MapperMetrics
maxShards int

// Feature flag for sharding range and vector aggregations such as
// quantile_ver_time with probabilistic data structures.
Expand Down Expand Up @@ -191,6 +195,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que
ast.maxShards,
r,
ast.statsHandler,
ast.retryNextHandler,
ast.next,
ast.limits,
)
Expand Down
32 changes: 32 additions & 0 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ func NewMiddleware(
func NewDetectedLabelsTripperware(cfg Config, opts logql.EngineOpts, logger log.Logger, l Limits, schema config.SchemaConfig, metrics *Metrics, mw base.Middleware, namespace string, merger base.Merger, limits Limits, iqo util.IngesterQueryOptions) (base.Middleware, error) {
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
statsHandler := mw.Wrap(next)
if cfg.MaxRetries > 0 {
rm := base.NewRetryMiddleware(logger, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, namespace)
statsHandler = rm.Wrap(statsHandler)
}
splitter := newDefaultSplitter(limits, iqo)

queryRangeMiddleware := []base.Middleware{
Expand Down Expand Up @@ -553,6 +557,12 @@ func getOperation(path string) string {
func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, iqo util.IngesterQueryOptions, c cache.Cache, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) {
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
statsHandler := indexStatsTripperware.Wrap(next)
retryNextHandler := next
if cfg.MaxRetries > 0 {
rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
statsHandler = rm.Wrap(statsHandler)
retryNextHandler = rm.Wrap(next)
}

queryRangeMiddleware := []base.Middleware{
QueryMetricsMiddleware(metrics.QueryMetrics),
Expand Down Expand Up @@ -592,6 +602,7 @@ func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Lo
limits,
0, // 0 is unlimited shards
statsHandler,
retryNextHandler,
cfg.ShardAggregations,
),
)
Expand All @@ -618,6 +629,12 @@ func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Lo
func NewLimitedTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, metrics *Metrics, merger base.Merger, iqo util.IngesterQueryOptions, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) {
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
statsHandler := indexStatsTripperware.Wrap(next)
retryNextHandler := next
if cfg.MaxRetries > 0 {
rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
statsHandler = rm.Wrap(statsHandler)
retryNextHandler = rm.Wrap(next)
}

queryRangeMiddleware := []base.Middleware{
StatsCollectorMiddleware(),
Expand All @@ -639,6 +656,7 @@ func NewLimitedTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logg
// and overwhelming the frontend, therefore we fix the number of shards to prevent this.
32, // 0 is unlimited shards
statsHandler,
retryNextHandler,
cfg.ShardAggregations,
),
)
Expand Down Expand Up @@ -854,6 +872,12 @@ func NewMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logge

return base.MiddlewareFunc(func(next base.Handler) base.Handler {
statsHandler := indexStatsTripperware.Wrap(next)
retryNextHandler := next
if cfg.MaxRetries > 0 {
rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
statsHandler = rm.Wrap(statsHandler)
retryNextHandler = rm.Wrap(next)
}

queryRangeMiddleware := []base.Middleware{
QueryMetricsMiddleware(metrics.QueryMetrics),
Expand Down Expand Up @@ -895,6 +919,7 @@ func NewMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logge
limits,
0, // 0 is unlimited shards
statsHandler,
retryNextHandler,
cfg.ShardAggregations,
),
)
Expand Down Expand Up @@ -976,6 +1001,12 @@ func NewInstantMetricTripperware(

return base.MiddlewareFunc(func(next base.Handler) base.Handler {
statsHandler := indexStatsTripperware.Wrap(next)
retryNextHandler := next
if cfg.MaxRetries > 0 {
rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
statsHandler = rm.Wrap(statsHandler)
retryNextHandler = rm.Wrap(next)
}

queryRangeMiddleware := []base.Middleware{
StatsCollectorMiddleware(),
Expand Down Expand Up @@ -1003,6 +1034,7 @@ func NewInstantMetricTripperware(
limits,
0, // 0 is unlimited shards
statsHandler,
retryNextHandler,
cfg.ShardAggregations,
),
)
Expand Down
35 changes: 19 additions & 16 deletions pkg/querier/queryrange/shard_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,22 @@ func shardResolverForConf(
maxParallelism int,
maxShards int,
r queryrangebase.Request,
statsHandler, next queryrangebase.Handler,
statsHandler, next, retryNext queryrangebase.Handler,
limits Limits,
) (logql.ShardResolver, bool) {
if conf.IndexType == types.TSDBType {
return &dynamicShardResolver{
ctx: ctx,
logger: logger,
statsHandler: statsHandler,
next: next,
limits: limits,
from: model.Time(r.GetStart().UnixMilli()),
through: model.Time(r.GetEnd().UnixMilli()),
maxParallelism: maxParallelism,
maxShards: maxShards,
defaultLookback: defaultLookback,
ctx: ctx,
logger: logger,
statsHandler: statsHandler,
retryNextHandler: retryNext,
next: next,
limits: limits,
from: model.Time(r.GetStart().UnixMilli()),
through: model.Time(r.GetEnd().UnixMilli()),
maxParallelism: maxParallelism,
maxShards: maxShards,
defaultLookback: defaultLookback,
}, true
}
if conf.RowShards < 2 {
Expand All @@ -64,10 +65,11 @@ type dynamicShardResolver struct {
ctx context.Context
// TODO(owen-d): shouldn't have to fork handlers here -- one should just transparently handle the right logic
// depending on the underlying type?
statsHandler queryrangebase.Handler // index stats handler (hooked up to results cache, etc)
next queryrangebase.Handler // next handler in the chain (used for non-stats reqs)
logger log.Logger
limits Limits
statsHandler queryrangebase.Handler // index stats handler (hooked up to results cache, etc)
retryNextHandler queryrangebase.Handler // next handler wrapped with retries
next queryrangebase.Handler // next handler in the chain (used for non-stats reqs)
logger log.Logger
limits Limits

from, through model.Time
maxParallelism int
Expand Down Expand Up @@ -251,7 +253,8 @@ func (r *dynamicShardResolver) ShardingRanges(expr syntax.Expr, targetBytesPerSh
exprStr := expr.String()
// try to get shards for the given expression
// if it fails, fallback to linearshards based on stats
resp, err := r.next.Do(r.ctx, &logproto.ShardsRequest{
// use the retry handler here to retry transient errors
resp, err := r.retryNextHandler.Do(r.ctx, &logproto.ShardsRequest{
From: adjustedFrom,
Through: r.through,
Query: expr.String(),
Expand Down
Loading