Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config: better defaults for querier max concurrency #10785

Merged
merged 4 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
* [10655](https://github.com/grafana/loki/pull/10655) **chaudum** Remove legacy ingester shutdown handler `/ingester/flush_shutdown`.
* [10736](https://github.com/grafana/loki/pull/10736) **ashwanthgoli** Deprecate write dedupe cache as this is not required by the newer single store indexes (tsdb and boltdb-shipper).
* [10693](https://github.com/grafana/loki/pull/10693) **ashwanthgoli** Embedded cache: Updates the metric prefix from `querier_cache_` to `loki_embeddedcache_` and removes duplicate metrics.
* [10785](https://github.com/grafana/loki/pull/10785) **ashwanthgoli** Config: Removes `querier.worker-parallelism` and updates default value of `querier.max-concurrent` to 4.

##### Fixes

Expand Down
15 changes: 3 additions & 12 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -544,9 +544,10 @@ engine:
# CLI flag: -querier.engine.max-lookback-period
[max_look_back_period: <duration> | default = 30s]

# The maximum number of concurrent queries allowed.
# The maximum number of queries that can be simultaneously processed by the
# querier.
# CLI flag: -querier.max-concurrent
[max_concurrent: <int> | default = 10]
[max_concurrent: <int> | default = 4]

# Only query the store, and not attempt any ingesters. This is useful for
# running a standalone querier pool operating only against stored data.
Expand Down Expand Up @@ -2750,16 +2751,6 @@ The `frontend_worker` configures the worker - running within the Loki querier -
# CLI flag: -querier.dns-lookup-period
[dns_lookup_duration: <duration> | default = 3s]

# Number of simultaneous queries to process per query-frontend or
# query-scheduler.
# CLI flag: -querier.worker-parallelism
[parallelism: <int> | default = 10]

# Force worker concurrency to match the -querier.max-concurrent option.
# Overrides querier.worker-parallelism.
# CLI flag: -querier.worker-match-max-concurrent
[match_max_concurrent: <boolean> | default = true]

# Querier ID, sent to frontend service to identify requests from the same
# querier. Defaults to hostname.
# CLI flag: -querier.id
Expand Down
18 changes: 11 additions & 7 deletions docs/sources/setup/upgrade/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,15 @@ The previous default value `false` is applied.
#### Deprecated configuration options are removed

1. Removes already deprecated `-querier.engine.timeout` CLI flag and the corresponding YAML setting.
2. Also removes the `query_timeout` from the querier YAML section. Instead of configuring `query_timeout` under `querier`, you now configure it in [Limits Config](/docs/loki/latest/configuration/#limits_config).
3. `s3.sse-encryption` is removed. AWS now defaults encryption of all buckets to SSE-S3. Use `sse.type` to set SSE type.
4. `ruler.wal-cleaer.period` is removed. Use `ruler.wal-cleaner.period` instead.
5. `experimental.ruler.enable-api` is removed. Use `ruler.enable-api` instead.
6. `split_queries_by_interval` is removed from `query_range` YAML section. You can instead configure it in [Limits Config](/docs/loki/latest/configuration/#limits_config).
7. `frontend.forward-headers-list` CLI flag and its corresponding YAML setting are removed.
8. `frontend.cache-split-interval` CLI flag is removed. Results caching interval is now determined by `querier.split-queries-by-interval`.
1. Also removes the `query_timeout` from the querier YAML section. Instead of configuring `query_timeout` under `querier`, you now configure it in [Limits Config](/docs/loki/latest/configuration/#limits_config).
1. `s3.sse-encryption` is removed. AWS now defaults encryption of all buckets to SSE-S3. Use `sse.type` to set SSE type.
1. `ruler.wal-cleaer.period` is removed. Use `ruler.wal-cleaner.period` instead.
1. `experimental.ruler.enable-api` is removed. Use `ruler.enable-api` instead.
1. `split_queries_by_interval` is removed from `query_range` YAML section. You can instead configure it in [Limits Config](/docs/loki/latest/configuration/#limits_config).
1. `frontend.forward-headers-list` CLI flag and its corresponding YAML setting are removed.
1. `frontend.cache-split-interval` CLI flag is removed. Results caching interval is now determined by `querier.split-queries-by-interval`.
1. `querier.worker-parallelism` CLI flag and its corresponding yaml setting are now removed as it does not offer additional value to already existing `querier.max-concurrent`.
We recommend configuring `querier.max-concurrent` to limit the max concurrent requests processed by the queriers.

#### Legacy ingester shutdown handler is removed

Expand All @@ -67,6 +69,8 @@ This new metric will provide a more clear signal that there is an issue with ing

#### Changes to default configuration values

1. `querier.max-concurrent` now defaults to 4. Consider increasing this if queriers have access to more CPU resources.
Note that you risk running into out of memory errors if you set this to a very high value.
1. `frontend.embedded-cache.max-size-mb` Embedded results cache size now defaults to 100MB.

#### Write dedupe cache is deprecated
Expand Down
5 changes: 2 additions & 3 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,8 @@ func (t *Loki) initQuerier() (services.Service, error) {
if t.Cfg.Ingester.QueryStoreMaxLookBackPeriod != 0 {
t.Cfg.Querier.IngesterQueryStoreMaxLookback = t.Cfg.Ingester.QueryStoreMaxLookBackPeriod
}
// Querier worker's max concurrent requests must be the same as the querier setting
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent
// Querier worker's max concurrent must be the same as the querier setting
t.Cfg.Worker.MaxConcurrent = t.Cfg.Querier.MaxConcurrent

deleteStore, err := t.deleteRequestsClient("querier", t.Overrides)
if err != nil {
Expand All @@ -357,7 +357,6 @@ func (t *Loki) initQuerier() (services.Service, error) {
ReadEnabled: t.Cfg.isModuleEnabled(Read),
GrpcListenAddress: t.Cfg.Server.GRPCListenAddress,
GrpcListenPort: t.Cfg.Server.GRPCListenPort,
QuerierMaxConcurrent: t.Cfg.Querier.MaxConcurrent,
QuerierWorkerConfig: &t.Cfg.Worker,
QueryFrontendEnabled: t.Cfg.isModuleEnabled(QueryFrontend),
QuerySchedulerEnabled: t.Cfg.isModuleEnabled(QueryScheduler),
Expand Down
4 changes: 1 addition & 3 deletions pkg/lokifrontend/frontend/v1/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,7 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a

var workerConfig querier_worker.Config
flagext.DefaultValues(&workerConfig)
workerConfig.Parallelism = 1
workerConfig.MatchMaxConcurrency = matchMaxConcurrency
workerConfig.MaxConcurrentRequests = 1
workerConfig.MaxConcurrent = 1

// localhost:0 prevents firewall warnings on Mac OS X.
grpcListen, err := net.Listen("tcp", "localhost:0")
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.TailMaxDuration, "querier.tail-max-duration", 1*time.Hour, "Maximum duration for which the live tailing requests are served.")
f.DurationVar(&cfg.ExtraQueryDelay, "querier.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 3*time.Hour, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.")
f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 10, "The maximum number of concurrent queries allowed.")
f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 4, "The maximum number of queries that can be simultaneously processed by the querier.")
f.BoolVar(&cfg.QueryStoreOnly, "querier.query-store-only", false, "Only query the store, and not attempt any ingesters. This is useful for running a standalone querier pool operating only against stored data.")
f.BoolVar(&cfg.QueryIngesterOnly, "querier.query-ingester-only", false, "When true, queriers only query the ingesters, and not stored data. This is useful when the object store is unavailable.")
f.BoolVar(&cfg.MultiTenantQueriesEnabled, "querier.multi-tenant-queries-enabled", false, "When true, allow queries to span multiple tenants.")
Expand Down
41 changes: 13 additions & 28 deletions pkg/querier/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ type Config struct {
FrontendAddress string `yaml:"frontend_address"`
SchedulerAddress string `yaml:"scheduler_address"`
DNSLookupPeriod time.Duration `yaml:"dns_lookup_duration"`

Parallelism int `yaml:"parallelism"`
MatchMaxConcurrency bool `yaml:"match_max_concurrent"`
MaxConcurrentRequests int `yaml:"-"` // Must be same as passed to LogQL Engine.
MaxConcurrent int `yaml:"-"` // same as querier.max-concurrent.

QuerierID string `yaml:"id"`

Expand All @@ -38,11 +35,7 @@ type Config struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.SchedulerAddress, "querier.scheduler-address", "", "Hostname (and port) of scheduler that querier will periodically resolve, connect to and receive queries from. Only one of -querier.frontend-address or -querier.scheduler-address can be set. If neither is set, queries are only received via HTTP endpoint.")
f.StringVar(&cfg.FrontendAddress, "querier.frontend-address", "", "Address of query frontend service, in host:port format. If -querier.scheduler-address is set as well, querier will use scheduler instead. Only one of -querier.frontend-address or -querier.scheduler-address can be set. If neither is set, queries are only received via HTTP endpoint.")

f.DurationVar(&cfg.DNSLookupPeriod, "querier.dns-lookup-period", 3*time.Second, "How often to query DNS for query-frontend or query-scheduler address. Also used to determine how often to poll the scheduler-ring for addresses if the scheduler-ring is configured.")

f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 10, "Number of simultaneous queries to process per query-frontend or query-scheduler.")
f.BoolVar(&cfg.MatchMaxConcurrency, "querier.worker-match-max-concurrent", true, "Force worker concurrency to match the -querier.max-concurrent option. Overrides querier.worker-parallelism.")
f.StringVar(&cfg.QuerierID, "querier.id", "", "Querier ID, sent to frontend service to identify requests from the same querier. Defaults to hostname.")

cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f)
Expand Down Expand Up @@ -235,27 +228,23 @@ func (w *querierWorker) AddressRemoved(address string) {

// Must be called with lock.
func (w *querierWorker) resetConcurrency() {
totalConcurrency := 0
var (
index, totalConcurrency int
)

defer func() {
w.metrics.concurrentWorkers.Set(float64(totalConcurrency))
}()
index := 0

for _, m := range w.managers {
concurrency := 0

if w.cfg.MatchMaxConcurrency {
concurrency = w.cfg.MaxConcurrentRequests / len(w.managers)

// If max concurrency does not evenly divide into our frontends a subset will be chosen
// to receive an extra connection. Frontend addresses were shuffled above so this will be a
// random selection of frontends.
if index < w.cfg.MaxConcurrentRequests%len(w.managers) {
level.Warn(w.logger).Log("msg", "max concurrency is not evenly divisible across targets, adding an extra connection", "addr", m.address)
concurrency++
}
} else {
concurrency = w.cfg.Parallelism
concurrency := w.cfg.MaxConcurrent / len(w.managers)

// If max concurrency does not evenly divide into our frontends a subset will be chosen
// to receive an extra connection. Frontend addresses were shuffled above so this will be a
// random selection of frontends.
if index < w.cfg.MaxConcurrent%len(w.managers) {
level.Warn(w.logger).Log("msg", "max concurrency is not evenly divisible across targets, adding an extra connection", "addr", m.address)
concurrency++
}

// If concurrency is 0 then MaxConcurrentRequests is less than the total number of
Expand All @@ -270,10 +259,6 @@ func (w *querierWorker) resetConcurrency() {
m.concurrency(concurrency)
index++
}

if totalConcurrency > w.cfg.MaxConcurrentRequests {
level.Warn(w.logger).Log("msg", "total worker concurrency is greater than logql max concurrency. Queries may be queued in the querier which reduces QOS")
}
}

func (w *querierWorker) connect(ctx context.Context, address string) (*grpc.ClientConn, error) {
Expand Down
24 changes: 5 additions & 19 deletions pkg/querier/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,30 @@ import (
func TestResetConcurrency(t *testing.T) {
tests := []struct {
name string
parallelism int
maxConcurrent int
numTargets int
expectedConcurrency int
}{
{
name: "Test create at least one processor per target",
parallelism: 0,
name: "create at least one processor per target",
maxConcurrent: 0,
numTargets: 2,
expectedConcurrency: 2,
},
{
name: "Test concurrency equal to parallelism * target when MatchMaxConcurrency is false",
parallelism: 4,
maxConcurrent: 0,
numTargets: 2,
expectedConcurrency: 8,
},
{
name: "Test concurrency is correct when numTargets does not divide evenly into maxConcurrent",
parallelism: 1,
name: "concurrency is correct when numTargets does not divide evenly into maxConcurrent",
maxConcurrent: 7,
numTargets: 4,
expectedConcurrency: 7,
},
{
name: "Test Total Parallelism dividing evenly",
parallelism: 1,
name: "total concurrency dividing evenly",
maxConcurrent: 6,
numTargets: 2,
expectedConcurrency: 6,
},
{
name: "Test Total Parallelism at least one worker per target",
parallelism: 1,
name: "total concurrency at least one processor per target",
maxConcurrent: 3,
numTargets: 6,
expectedConcurrency: 6,
Expand All @@ -63,9 +51,7 @@ func TestResetConcurrency(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := Config{
Parallelism: tt.parallelism,
MatchMaxConcurrency: tt.maxConcurrent > 0,
MaxConcurrentRequests: tt.maxConcurrent,
MaxConcurrent: tt.maxConcurrent,
}

w, err := newQuerierWorkerWithProcessor(cfg, NewMetrics(cfg, nil), log.NewNopLogger(), &mockProcessor{}, "", nil, nil)
Expand Down
1 change: 0 additions & 1 deletion pkg/querier/worker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ type WorkerServiceConfig struct {
ReadEnabled bool
GrpcListenAddress string
GrpcListenPort int
QuerierMaxConcurrent int
QuerierWorkerConfig *querier_worker.Config
QueryFrontendEnabled bool
QuerySchedulerEnabled bool
Expand Down