diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 32e8454b26..b8541ad387 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -11655,6 +11655,38 @@ "fieldValue": null, "fieldDefaultValue": null }, + { + "kind": "block", + "name": "dynamic_replication", + "required": false, + "desc": "", + "blockEntries": [ + { + "kind": "field", + "name": "enabled", + "required": false, + "desc": "Use a higher number of replicas for recent blocks. Useful to spread query load more evenly at the cost of slightly higher disk usage.", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "store-gateway.dynamic-replication.enabled", + "fieldType": "boolean", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "max_time_threshold", + "required": false, + "desc": "Threshold of the most recent sample in a block used to determine it is eligible for higher than default replication. If a block has samples within this amount of time, it is considered recent and will be owned by more replicas.", + "fieldValue": null, + "fieldDefaultValue": 90000000000000, + "fieldFlag": "store-gateway.dynamic-replication.max-time-threshold", + "fieldType": "duration", + "fieldCategory": "experimental" + } + ], + "fieldValue": null, + "fieldDefaultValue": null + }, { "kind": "field", "name": "enabled_tenants", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 7f210fa904..46edd25960 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -3191,6 +3191,10 @@ Usage of ./cmd/mimir/mimir: How long to wait between SIGTERM and shutdown. After receiving SIGTERM, Mimir will report not-ready status via /ready endpoint. -store-gateway.disabled-tenants comma-separated-list-of-strings Comma separated list of tenants that cannot be loaded by the store-gateway. If specified, and the store-gateway would normally load a given tenant for (via -store-gateway.enabled-tenants or sharding), it will be ignored instead. + -store-gateway.dynamic-replication.enabled + [experimental] Use a higher number of replicas for recent blocks. Useful to spread query load more evenly at the cost of slightly higher disk usage. + -store-gateway.dynamic-replication.max-time-threshold duration + [experimental] Threshold of the most recent sample in a block used to determine it is eligible for higher than default replication. If a block has samples within this amount of time, it is considered recent and will be owned by more replicas. (default 25h0m0s) -store-gateway.enabled-tenants comma-separated-list-of-strings Comma separated list of tenants that can be loaded by the store-gateway. If specified, only blocks for these tenants will be loaded by the store-gateway, otherwise all tenants can be loaded. Subject to sharding. 
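As a quick illustration of how the two new flags are meant to work together (not part of the patch; the isRecent helper below is hypothetical), a block counts as recent, and therefore eligible for higher-than-default replication, when its newest sample is within the configured threshold of the current time:

```go
package main

import (
	"fmt"
	"time"
)

// isRecent is a hypothetical helper mirroring the rule described by
// -store-gateway.dynamic-replication.max-time-threshold: a block is "recent"
// when its newest sample (max time) is within the threshold of now.
func isRecent(blockMaxTime time.Time, threshold time.Duration, now time.Time) bool {
	return now.Sub(blockMaxTime) <= threshold
}

func main() {
	now := time.Now()
	threshold := 25 * time.Hour // the flag's default value

	fmt.Println(isRecent(now.Add(-12*time.Hour), threshold, now)) // true: owned by more replicas
	fmt.Println(isRecent(now.Add(-48*time.Hour), threshold, now)) // false: normal replication factor
}
```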
-store-gateway.sharding-ring.auto-forget-enabled diff --git a/development/mimir-microservices-mode/config/grafana-agent.yaml b/development/mimir-microservices-mode/config/grafana-agent.yaml index 62dcdc1942..14304e9695 100644 --- a/development/mimir-microservices-mode/config/grafana-agent.yaml +++ b/development/mimir-microservices-mode/config/grafana-agent.yaml @@ -23,10 +23,12 @@ prometheus: - 'ruler-2:8023' - 'compactor:8006' - 'query-frontend:8007' - - 'store-gateway-1:8008' - - 'store-gateway-2:8009' - - 'query-scheduler:8011' + - 'query-scheduler:8008' + - 'store-gateway-1:8011' + - 'store-gateway-2:8012' + - 'store-gateway-3:8013' - 'memcached-exporter:9150' + - 'continuous-test:8090' - 'load-generator:9900' labels: cluster: 'docker-compose' diff --git a/development/mimir-microservices-mode/config/mimir.yaml b/development/mimir-microservices-mode/config/mimir.yaml index 5d24599911..d90ea91fed 100644 --- a/development/mimir-microservices-mode/config/mimir.yaml +++ b/development/mimir-microservices-mode/config/mimir.yaml @@ -128,7 +128,7 @@ compactor: store_gateway: sharding_ring: - replication_factor: 1 + replication_factor: 3 heartbeat_period: 5s heartbeat_timeout: 15s wait_stability_min_duration: 0 diff --git a/development/mimir-microservices-mode/config/otel-collector.yaml b/development/mimir-microservices-mode/config/otel-collector.yaml index db98ceffe8..8d7634d310 100644 --- a/development/mimir-microservices-mode/config/otel-collector.yaml +++ b/development/mimir-microservices-mode/config/otel-collector.yaml @@ -57,20 +57,20 @@ receivers: cluster: 'docker-compose' namespace: 'mimir-microservices-mode' container: 'query-frontend' - - job_name: mimir-microservices-mode/store-gateway + - job_name: mimir-microservices-mode/query-scheduler static_configs: - - targets: ['store-gateway-1:8008', 'store-gateway-2:8009'] + - targets: ['query-scheduler:8008'] labels: cluster: 'docker-compose' namespace: 'mimir-microservices-mode' - container: 'store-gateway' - - job_name: mimir-microservices-mode/query-scheduler + container: 'query-scheduler' + - job_name: mimir-microservices-mode/store-gateway static_configs: - - targets: ['query-scheduler:8011'] + - targets: ['store-gateway-1:8011', 'store-gateway-2:8012', 'store-gateway-3:8013'] labels: cluster: 'docker-compose' namespace: 'mimir-microservices-mode' - container: 'query-scheduler' + container: 'store-gateway' - job_name: mimir-microservices-mode/memcached-exporter static_configs: - targets: ['memcached-exporter:9150'] diff --git a/development/mimir-microservices-mode/config/prometheus.yaml b/development/mimir-microservices-mode/config/prometheus.yaml index 760bc7843a..f2d30912e3 100644 --- a/development/mimir-microservices-mode/config/prometheus.yaml +++ b/development/mimir-microservices-mode/config/prometheus.yaml @@ -17,10 +17,13 @@ scrape_configs: - 'ruler-2:8023' - 'compactor:8006' - 'query-frontend:8007' - - 'store-gateway-1:8008' - - 'store-gateway-2:8009' - - 'query-scheduler:8011' + - 'query-scheduler:8008' + - 'store-gateway-1:8011' + - 'store-gateway-2:8012' + - 'store-gateway-3:8013' + - 'memcached-exporter:9150' - 'continuous-test:8090' + - 'load-generator:9900' labels: cluster: 'docker-compose' namespace: 'mimir-microservices-mode' diff --git a/development/mimir-microservices-mode/docker-compose.jsonnet b/development/mimir-microservices-mode/docker-compose.jsonnet index d99ce761c8..7d9aced4ff 100644 --- a/development/mimir-microservices-mode/docker-compose.jsonnet +++ 
b/development/mimir-microservices-mode/docker-compose.jsonnet @@ -45,7 +45,7 @@ std.manifestYamlDoc({ self.distributor + self.ingesters + self.read_components + // Querier, Frontend and query-scheduler, if enabled. - self.store_gateways + + self.store_gateways(3) + self.compactor + self.rulers(2) + self.alertmanagers(3) + @@ -113,7 +113,7 @@ std.manifestYamlDoc({ httpPort: 8005, extraArguments: // Use of scheduler is activated by `-querier.scheduler-address` option and setting -querier.frontend-address option to nothing. - if $._config.use_query_scheduler then '-querier.scheduler-address=query-scheduler:9011 -querier.frontend-address=' else '', + if $._config.use_query_scheduler then '-querier.scheduler-address=query-scheduler:9008 -querier.frontend-address=' else '', }), 'query-frontend': mimirService({ @@ -124,14 +124,14 @@ std.manifestYamlDoc({ extraArguments: '-query-frontend.max-total-query-length=8760h' + // Use of scheduler is activated by `-query-frontend.scheduler-address` option. - (if $._config.use_query_scheduler then ' -query-frontend.scheduler-address=query-scheduler:9011' else ''), + (if $._config.use_query_scheduler then ' -query-frontend.scheduler-address=query-scheduler:9008' else ''), }), } + ( if $._config.use_query_scheduler then { 'query-scheduler': mimirService({ name: 'query-scheduler', target: 'query-scheduler', - httpPort: 8011, + httpPort: 8008, extraArguments: '-query-frontend.max-total-query-length=8760h', }), } else {} @@ -167,20 +167,14 @@ std.manifestYamlDoc({ for id in std.range(1, count) }, - store_gateways:: { - 'store-gateway-1': mimirService({ - name: 'store-gateway-1', + store_gateways(count):: { + ['store-gateway-%d' % id]: mimirService({ + name: 'store-gateway-' + id, target: 'store-gateway', - httpPort: 8008, - jaegerApp: 'store-gateway-1', - }), - - 'store-gateway-2': mimirService({ - name: 'store-gateway-2', - target: 'store-gateway', - httpPort: 8009, - jaegerApp: 'store-gateway-2', - }), + httpPort: 8010 + id, + jaegerApp: 'store-gateway-%d' % id, + }) + for id in std.range(1, count) }, continuous_test:: { diff --git a/development/mimir-microservices-mode/docker-compose.yml b/development/mimir-microservices-mode/docker-compose.yml index 683a94e998..d59b39e809 100644 --- a/development/mimir-microservices-mode/docker-compose.yml +++ b/development/mimir-microservices-mode/docker-compose.yml @@ -365,7 +365,7 @@ "command": - "sh" - "-c" - - "sleep 3 && exec ./mimir -config.file=./config/mimir.yaml -target=querier -server.http-listen-port=8005 -server.grpc-listen-port=9005 -activity-tracker.filepath=/activity/querier-8005 -querier.scheduler-address=query-scheduler:9011 -querier.frontend-address= -memberlist.nodename=querier -memberlist.bind-port=10005 -ingester.ring.store=memberlist -distributor.ring.store=memberlist -compactor.ring.store=memberlist -store-gateway.sharding-ring.store=memberlist -ruler.ring.store=memberlist -alertmanager.sharding-ring.store=memberlist -blocks-storage.bucket-store.index-cache.backend=memcached -blocks-storage.bucket-store.chunks-cache.backend=memcached -blocks-storage.bucket-store.metadata-cache.backend=memcached -query-frontend.results-cache.backend=memcached -ruler-storage.cache.backend=memcached -blocks-storage.bucket-store.index-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.chunks-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.metadata-cache.memcached.addresses=dns+memcached:11211 -query-frontend.results-cache.memcached.addresses=dns+memcached:11211 
-ruler-storage.cache.memcached.addresses=dns+memcached:11211" + - "sleep 3 && exec ./mimir -config.file=./config/mimir.yaml -target=querier -server.http-listen-port=8005 -server.grpc-listen-port=9005 -activity-tracker.filepath=/activity/querier-8005 -querier.scheduler-address=query-scheduler:9008 -querier.frontend-address= -memberlist.nodename=querier -memberlist.bind-port=10005 -ingester.ring.store=memberlist -distributor.ring.store=memberlist -compactor.ring.store=memberlist -store-gateway.sharding-ring.store=memberlist -ruler.ring.store=memberlist -alertmanager.sharding-ring.store=memberlist -blocks-storage.bucket-store.index-cache.backend=memcached -blocks-storage.bucket-store.chunks-cache.backend=memcached -blocks-storage.bucket-store.metadata-cache.backend=memcached -query-frontend.results-cache.backend=memcached -ruler-storage.cache.backend=memcached -blocks-storage.bucket-store.index-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.chunks-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.metadata-cache.memcached.addresses=dns+memcached:11211 -query-frontend.results-cache.memcached.addresses=dns+memcached:11211 -ruler-storage.cache.memcached.addresses=dns+memcached:11211" "depends_on": - "minio" - "distributor-1" @@ -391,7 +391,7 @@ "command": - "sh" - "-c" - - "sleep 3 && exec ./mimir -config.file=./config/mimir.yaml -target=query-frontend -server.http-listen-port=8007 -server.grpc-listen-port=9007 -activity-tracker.filepath=/activity/query-frontend-8007 -query-frontend.max-total-query-length=8760h -query-frontend.scheduler-address=query-scheduler:9011 -memberlist.nodename=query-frontend -memberlist.bind-port=10007 -ingester.ring.store=memberlist -distributor.ring.store=memberlist -compactor.ring.store=memberlist -store-gateway.sharding-ring.store=memberlist -ruler.ring.store=memberlist -alertmanager.sharding-ring.store=memberlist -blocks-storage.bucket-store.index-cache.backend=memcached -blocks-storage.bucket-store.chunks-cache.backend=memcached -blocks-storage.bucket-store.metadata-cache.backend=memcached -query-frontend.results-cache.backend=memcached -ruler-storage.cache.backend=memcached -blocks-storage.bucket-store.index-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.chunks-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.metadata-cache.memcached.addresses=dns+memcached:11211 -query-frontend.results-cache.memcached.addresses=dns+memcached:11211 -ruler-storage.cache.memcached.addresses=dns+memcached:11211" + - "sleep 3 && exec ./mimir -config.file=./config/mimir.yaml -target=query-frontend -server.http-listen-port=8007 -server.grpc-listen-port=9007 -activity-tracker.filepath=/activity/query-frontend-8007 -query-frontend.max-total-query-length=8760h -query-frontend.scheduler-address=query-scheduler:9008 -memberlist.nodename=query-frontend -memberlist.bind-port=10007 -ingester.ring.store=memberlist -distributor.ring.store=memberlist -compactor.ring.store=memberlist -store-gateway.sharding-ring.store=memberlist -ruler.ring.store=memberlist -alertmanager.sharding-ring.store=memberlist -blocks-storage.bucket-store.index-cache.backend=memcached -blocks-storage.bucket-store.chunks-cache.backend=memcached -blocks-storage.bucket-store.metadata-cache.backend=memcached -query-frontend.results-cache.backend=memcached -ruler-storage.cache.backend=memcached -blocks-storage.bucket-store.index-cache.memcached.addresses=dns+memcached:11211 
-blocks-storage.bucket-store.chunks-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.metadata-cache.memcached.addresses=dns+memcached:11211 -query-frontend.results-cache.memcached.addresses=dns+memcached:11211 -ruler-storage.cache.memcached.addresses=dns+memcached:11211" "depends_on": - "minio" - "distributor-1" @@ -417,7 +417,7 @@ "command": - "sh" - "-c" - - "sleep 3 && exec ./mimir -config.file=./config/mimir.yaml -target=query-scheduler -server.http-listen-port=8011 -server.grpc-listen-port=9011 -activity-tracker.filepath=/activity/query-scheduler-8011 -query-frontend.max-total-query-length=8760h -memberlist.nodename=query-scheduler -memberlist.bind-port=10011 -ingester.ring.store=memberlist -distributor.ring.store=memberlist -compactor.ring.store=memberlist -store-gateway.sharding-ring.store=memberlist -ruler.ring.store=memberlist -alertmanager.sharding-ring.store=memberlist -blocks-storage.bucket-store.index-cache.backend=memcached -blocks-storage.bucket-store.chunks-cache.backend=memcached -blocks-storage.bucket-store.metadata-cache.backend=memcached -query-frontend.results-cache.backend=memcached -ruler-storage.cache.backend=memcached -blocks-storage.bucket-store.index-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.chunks-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.metadata-cache.memcached.addresses=dns+memcached:11211 -query-frontend.results-cache.memcached.addresses=dns+memcached:11211 -ruler-storage.cache.memcached.addresses=dns+memcached:11211" + - "sleep 3 && exec ./mimir -config.file=./config/mimir.yaml -target=query-scheduler -server.http-listen-port=8008 -server.grpc-listen-port=9008 -activity-tracker.filepath=/activity/query-scheduler-8008 -query-frontend.max-total-query-length=8760h -memberlist.nodename=query-scheduler -memberlist.bind-port=10008 -ingester.ring.store=memberlist -distributor.ring.store=memberlist -compactor.ring.store=memberlist -store-gateway.sharding-ring.store=memberlist -ruler.ring.store=memberlist -alertmanager.sharding-ring.store=memberlist -blocks-storage.bucket-store.index-cache.backend=memcached -blocks-storage.bucket-store.chunks-cache.backend=memcached -blocks-storage.bucket-store.metadata-cache.backend=memcached -query-frontend.results-cache.backend=memcached -ruler-storage.cache.backend=memcached -blocks-storage.bucket-store.index-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.chunks-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.metadata-cache.memcached.addresses=dns+memcached:11211 -query-frontend.results-cache.memcached.addresses=dns+memcached:11211 -ruler-storage.cache.memcached.addresses=dns+memcached:11211" "depends_on": - "minio" - "distributor-1" @@ -431,8 +431,8 @@ "hostname": "query-scheduler" "image": "mimir" "ports": - - "8011:8011" - - "10011:10011" + - "8008:8008" + - "10008:10008" "volumes": - "./config:/mimir/config" - "./activity:/activity" @@ -495,7 +495,7 @@ "command": - "sh" - "-c" - - "sleep 3 && exec ./mimir -config.file=./config/mimir.yaml -target=store-gateway -server.http-listen-port=8008 -server.grpc-listen-port=9008 -activity-tracker.filepath=/activity/store-gateway-8008 -memberlist.nodename=store-gateway-1 -memberlist.bind-port=10008 -ingester.ring.store=memberlist -distributor.ring.store=memberlist -compactor.ring.store=memberlist -store-gateway.sharding-ring.store=memberlist -ruler.ring.store=memberlist -alertmanager.sharding-ring.store=memberlist 
-blocks-storage.bucket-store.index-cache.backend=memcached -blocks-storage.bucket-store.chunks-cache.backend=memcached -blocks-storage.bucket-store.metadata-cache.backend=memcached -query-frontend.results-cache.backend=memcached -ruler-storage.cache.backend=memcached -blocks-storage.bucket-store.index-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.chunks-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.metadata-cache.memcached.addresses=dns+memcached:11211 -query-frontend.results-cache.memcached.addresses=dns+memcached:11211 -ruler-storage.cache.memcached.addresses=dns+memcached:11211" + - "sleep 3 && exec ./mimir -config.file=./config/mimir.yaml -target=store-gateway -server.http-listen-port=8011 -server.grpc-listen-port=9011 -activity-tracker.filepath=/activity/store-gateway-8011 -memberlist.nodename=store-gateway-1 -memberlist.bind-port=10011 -ingester.ring.store=memberlist -distributor.ring.store=memberlist -compactor.ring.store=memberlist -store-gateway.sharding-ring.store=memberlist -ruler.ring.store=memberlist -alertmanager.sharding-ring.store=memberlist -blocks-storage.bucket-store.index-cache.backend=memcached -blocks-storage.bucket-store.chunks-cache.backend=memcached -blocks-storage.bucket-store.metadata-cache.backend=memcached -query-frontend.results-cache.backend=memcached -ruler-storage.cache.backend=memcached -blocks-storage.bucket-store.index-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.chunks-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.metadata-cache.memcached.addresses=dns+memcached:11211 -query-frontend.results-cache.memcached.addresses=dns+memcached:11211 -ruler-storage.cache.memcached.addresses=dns+memcached:11211" "depends_on": - "minio" - "distributor-1" @@ -509,8 +509,8 @@ "hostname": "store-gateway-1" "image": "mimir" "ports": - - "8008:8008" - - "10008:10008" + - "8011:8011" + - "10011:10011" "volumes": - "./config:/mimir/config" - "./activity:/activity" @@ -521,7 +521,7 @@ "command": - "sh" - "-c" - - "sleep 3 && exec ./mimir -config.file=./config/mimir.yaml -target=store-gateway -server.http-listen-port=8009 -server.grpc-listen-port=9009 -activity-tracker.filepath=/activity/store-gateway-8009 -memberlist.nodename=store-gateway-2 -memberlist.bind-port=10009 -ingester.ring.store=memberlist -distributor.ring.store=memberlist -compactor.ring.store=memberlist -store-gateway.sharding-ring.store=memberlist -ruler.ring.store=memberlist -alertmanager.sharding-ring.store=memberlist -blocks-storage.bucket-store.index-cache.backend=memcached -blocks-storage.bucket-store.chunks-cache.backend=memcached -blocks-storage.bucket-store.metadata-cache.backend=memcached -query-frontend.results-cache.backend=memcached -ruler-storage.cache.backend=memcached -blocks-storage.bucket-store.index-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.chunks-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.metadata-cache.memcached.addresses=dns+memcached:11211 -query-frontend.results-cache.memcached.addresses=dns+memcached:11211 -ruler-storage.cache.memcached.addresses=dns+memcached:11211" + - "sleep 3 && exec ./mimir -config.file=./config/mimir.yaml -target=store-gateway -server.http-listen-port=8012 -server.grpc-listen-port=9012 -activity-tracker.filepath=/activity/store-gateway-8012 -memberlist.nodename=store-gateway-2 -memberlist.bind-port=10012 -ingester.ring.store=memberlist -distributor.ring.store=memberlist 
-compactor.ring.store=memberlist -store-gateway.sharding-ring.store=memberlist -ruler.ring.store=memberlist -alertmanager.sharding-ring.store=memberlist -blocks-storage.bucket-store.index-cache.backend=memcached -blocks-storage.bucket-store.chunks-cache.backend=memcached -blocks-storage.bucket-store.metadata-cache.backend=memcached -query-frontend.results-cache.backend=memcached -ruler-storage.cache.backend=memcached -blocks-storage.bucket-store.index-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.chunks-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.metadata-cache.memcached.addresses=dns+memcached:11211 -query-frontend.results-cache.memcached.addresses=dns+memcached:11211 -ruler-storage.cache.memcached.addresses=dns+memcached:11211" "depends_on": - "minio" - "distributor-1" @@ -535,8 +535,34 @@ "hostname": "store-gateway-2" "image": "mimir" "ports": - - "8009:8009" - - "10009:10009" + - "8012:8012" + - "10012:10012" + "volumes": + - "./config:/mimir/config" + - "./activity:/activity" + "store-gateway-3": + "build": + "context": "." + "dockerfile": "dev.dockerfile" + "command": + - "sh" + - "-c" + - "sleep 3 && exec ./mimir -config.file=./config/mimir.yaml -target=store-gateway -server.http-listen-port=8013 -server.grpc-listen-port=9013 -activity-tracker.filepath=/activity/store-gateway-8013 -memberlist.nodename=store-gateway-3 -memberlist.bind-port=10013 -ingester.ring.store=memberlist -distributor.ring.store=memberlist -compactor.ring.store=memberlist -store-gateway.sharding-ring.store=memberlist -ruler.ring.store=memberlist -alertmanager.sharding-ring.store=memberlist -blocks-storage.bucket-store.index-cache.backend=memcached -blocks-storage.bucket-store.chunks-cache.backend=memcached -blocks-storage.bucket-store.metadata-cache.backend=memcached -query-frontend.results-cache.backend=memcached -ruler-storage.cache.backend=memcached -blocks-storage.bucket-store.index-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.chunks-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.metadata-cache.memcached.addresses=dns+memcached:11211 -query-frontend.results-cache.memcached.addresses=dns+memcached:11211 -ruler-storage.cache.memcached.addresses=dns+memcached:11211" + "depends_on": + - "minio" + - "distributor-1" + "environment": + - "JAEGER_AGENT_HOST=jaeger" + - "JAEGER_AGENT_PORT=6831" + - "JAEGER_REPORTER_MAX_QUEUE_SIZE=1000" + - "JAEGER_SAMPLER_PARAM=1" + - "JAEGER_SAMPLER_TYPE=const" + - "JAEGER_TAGS=app=store-gateway-3" + "hostname": "store-gateway-3" + "image": "mimir" + "ports": + - "8013:8013" + - "10013:10013" "volumes": - "./config:/mimir/config" - "./activity:/activity" diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index cfd5a77453..7ebe325c96 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -4919,6 +4919,20 @@ sharding_ring: # CLI flag: -store-gateway.sharding-ring.unregister-on-shutdown [unregister_on_shutdown: | default = true] +# Experimental dynamic replication configuration. +dynamic_replication: + # (experimental) Use a higher number of replicas for recent blocks. Useful to + # spread query load more evenly at the cost of slightly higher disk usage. 
+ # CLI flag: -store-gateway.dynamic-replication.enabled + [enabled: | default = false] + + # (experimental) Threshold of the most recent sample in a block used to + # determine it is eligible for higher than default replication. If a block has + # samples within this amount of time, it is considered recent and will be + # owned by more replicas. + # CLI flag: -store-gateway.dynamic-replication.max-time-threshold + [max_time_threshold: | default = 25h] + # (advanced) Comma separated list of tenants that can be loaded by the # store-gateway. If specified, only blocks for these tenants will be loaded by # the store-gateway, otherwise all tenants can be loaded. Subject to sharding. diff --git a/go.mod b/go.mod index 84a05ee26e..705a726dfa 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/gopacket v1.1.19 github.com/gorilla/mux v1.8.1 - github.com/grafana/dskit v0.0.0-20250122122458-53db97b18080 + github.com/grafana/dskit v0.0.0-20250128193928-104df19e2080 github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/influxdata/influxdb/v2 v2.7.11 diff --git a/go.sum b/go.sum index d564af85b4..2c05c10b0f 100644 --- a/go.sum +++ b/go.sum @@ -1271,8 +1271,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= github.com/grafana/alerting v0.0.0-20250113170557-b4ab2ba363a8 h1:mdI6P22PgFD7bQ0Yf4h8cfHSldak4nxogvlsTHZyZmc= github.com/grafana/alerting v0.0.0-20250113170557-b4ab2ba363a8/go.mod h1:QsnoKX/iYZxA4Cv+H+wC7uxutBD8qi8ZW5UJvD2TYmU= -github.com/grafana/dskit v0.0.0-20250122122458-53db97b18080 h1:VrEVJtQtvq7fqWKAIHOtAEYa+kTrPImXB5/vM+QZOf0= -github.com/grafana/dskit v0.0.0-20250122122458-53db97b18080/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= +github.com/grafana/dskit v0.0.0-20250128193928-104df19e2080 h1:IHRsAMdemxPu9g9zPxTFcU3hLhfd5cl6W4fqRovAzkU= +github.com/grafana/dskit v0.0.0-20250128193928-104df19e2080/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI= github.com/grafana/franz-go v0.0.0-20241009100846-782ba1442937 h1:fwwnG/NcygoS6XbAaEyK2QzMXI/BZIEJvQ3CD+7XZm8= diff --git a/pkg/querier/blocks_consistency_checker.go b/pkg/querier/blocks_consistency_checker.go index 455524144e..cf8785d08f 100644 --- a/pkg/querier/blocks_consistency_checker.go +++ b/pkg/querier/blocks_consistency_checker.go @@ -41,7 +41,7 @@ func NewBlocksConsistency(uploadGracePeriod time.Duration, reg prometheus.Regist // NewTracker creates a consistency tracker from the known blocks. It filters out any block uploaded within uploadGracePeriod // and with a deletion mark within deletionGracePeriod. func (c *BlocksConsistency) NewTracker(knownBlocks bucketindex.Blocks, logger log.Logger) BlocksConsistencyTracker { - blocksToTrack := make(map[ulid.ULID]struct{}, len(knownBlocks)) + blocksToTrack := make(map[ulid.ULID]*bucketindex.Block, len(knownBlocks)) for _, block := range knownBlocks { // Some recently uploaded blocks, already discovered by the querier, may not have been discovered // and loaded by the store-gateway yet. 
In order to avoid false positives, we grant some time @@ -55,7 +55,7 @@ func (c *BlocksConsistency) NewTracker(knownBlocks bucketindex.Blocks, logger lo continue } - blocksToTrack[block.ID] = struct{}{} + blocksToTrack[block.ID] = block } return BlocksConsistencyTracker{ @@ -70,13 +70,13 @@ type BlocksConsistencyTracker struct { checksTotal prometheus.Counter checksFailed prometheus.Counter - tracked map[ulid.ULID]struct{} + tracked map[ulid.ULID]*bucketindex.Block queried map[ulid.ULID]struct{} } // Check takes a slice of blocks which can be all queried blocks so far or only blocks queried since the last call to Check. // Check returns the blocks which haven't been seen in any call to Check yet. -func (c BlocksConsistencyTracker) Check(queriedBlocks []ulid.ULID) (missingBlocks []ulid.ULID) { +func (c BlocksConsistencyTracker) Check(queriedBlocks []ulid.ULID) (missingBlocks bucketindex.Blocks) { // Make map of queried blocks, for quick lookup. for _, blockID := range queriedBlocks { if _, ok := c.tracked[blockID]; !ok { @@ -88,8 +88,8 @@ func (c BlocksConsistencyTracker) Check(queriedBlocks []ulid.ULID) (missingBlock } // Look for any missing blocks. - for block := range c.tracked { - if _, ok := c.queried[block]; !ok { + for id, block := range c.tracked { + if _, ok := c.queried[id]; !ok { missingBlocks = append(missingBlocks, block) } } diff --git a/pkg/querier/blocks_consistency_checker_test.go b/pkg/querier/blocks_consistency_checker_test.go index 24fb3b87ea..1cfa774a94 100644 --- a/pkg/querier/blocks_consistency_checker_test.go +++ b/pkg/querier/blocks_consistency_checker_test.go @@ -95,13 +95,16 @@ func TestBlocksConsistencyTracker_Check(t *testing.T) { reg := prometheus.NewPedanticRegistry() c := NewBlocksConsistency(uploadGracePeriod, reg) tracker := c.NewTracker(testData.knownBlocks, log.NewNopLogger()) - var missingBlocks []ulid.ULID + var missingBlocks bucketindex.Blocks for _, queriedBlocksAttempt := range testData.queriedBlocks { missingBlocks = tracker.Check(queriedBlocksAttempt) } tracker.Complete() - assert.Equal(t, testData.expectedMissingBlocks, missingBlocks) + // Note that we're comparing elements not the slices themselves because the + // zero value for a slice is nil but `GetULIDs()` returns a non-nil but possibly + // empty slice. + assert.ElementsMatch(t, testData.expectedMissingBlocks, missingBlocks.GetULIDs()) assert.Equal(t, float64(1), testutil.ToFloat64(c.checksTotal)) if len(testData.expectedMissingBlocks) > 0 { diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index d514983a9c..f91ad99f87 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -70,7 +70,7 @@ type BlocksStoreSet interface { // GetClientsFor returns the store gateway clients that should be used to // query the set of blocks in input. The exclude parameter is the map of // blocks -> store-gateway addresses that should be excluded. - GetClientsFor(userID string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) + GetClientsFor(userID string, blocks bucketindex.Blocks, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) } // BlocksFinder is the interface used to find blocks for a given user and time range. 
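Since the consistency tracker now keeps *bucketindex.Block values rather than empty structs, Check returns full block metadata for anything never seen. A rough, self-contained usage sketch under the API shown in this diff; the import paths and wiring details are assumptions:

```go
package main

import (
	"fmt"
	"time"

	"github.com/go-kit/log"
	"github.com/oklog/ulid"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/mimir/pkg/querier"
	"github.com/grafana/mimir/pkg/storage/tsdb/bucketindex"
)

func main() {
	uploadedAt := time.Now().Add(-2 * time.Hour).Unix()
	known := bucketindex.Blocks{
		{ID: ulid.MustNew(1, nil), UploadedAt: uploadedAt},
		{ID: ulid.MustNew(2, nil), UploadedAt: uploadedAt},
	}

	c := querier.NewBlocksConsistency(15*time.Minute, prometheus.NewRegistry())
	tracker := c.NewTracker(known, log.NewNopLogger())

	// Pretend store-gateways only returned the first block.
	missing := tracker.Check([]ulid.ULID{known[0].ID})
	tracker.Complete()

	// Callers now get full block metadata back, not just IDs.
	fmt.Println("missing:", missing.GetULIDs())
}
```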
@@ -244,7 +244,17 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa return nil, errors.Wrap(err, "failed to create store-gateway ring client") } - stores, err = newBlocksStoreReplicationSet(storesRing, randomLoadBalancing, limits, querierCfg.StoreGatewayClient, logger, reg) + var dynamicReplication storegateway.DynamicReplication = storegateway.NewNopDynamicReplication() + if gatewayCfg.DynamicReplication.Enabled { + dynamicReplication = storegateway.NewMaxTimeDynamicReplication( + gatewayCfg.DynamicReplication.MaxTimeThreshold, + // Keep syncing blocks to store-gateways for a grace period (3 times the sync interval) to + // ensure they are not unloaded while they are still being queried. + mimir_tsdb.NewBlockDiscoveryDelayMultiplier*storageCfg.BucketStore.SyncInterval, + ) + } + + stores, err = newBlocksStoreReplicationSet(storesRing, randomLoadBalancing, dynamicReplication, limits, querierCfg.StoreGatewayClient, logger, reg) if err != nil { return nil, errors.Wrap(err, "failed to create store set") } @@ -537,7 +547,7 @@ func (q *blocksStoreQuerier) queryWithConsistencyCheck( var ( // At the beginning the list of blocks to query are all known blocks. - remainingBlocks = knownBlocks.GetULIDs() + remainingBlocks = knownBlocks attemptedBlocks = map[ulid.ULID][]string{} touchedStores = map[string]struct{}{} ) @@ -597,11 +607,11 @@ func (q *blocksStoreQuerier) queryWithConsistencyCheck( return nil } - spanLog.DebugLog("msg", "couldn't query all blocks", "attempt", attempt, "missing blocks", strings.Join(convertULIDsToString(remainingBlocks), " ")) + spanLog.DebugLog("msg", "couldn't query all blocks", "attempt", attempt, "missing blocks", strings.Join(convertULIDsToString(remainingBlocks.GetULIDs()), " ")) } // We've not been able to query all expected blocks after all retries. 
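The queryable wiring above only builds a MaxTimeDynamicReplication when the store-gateway config enables it. A short sketch of what that policy decides for an individual block, using the types introduced in this diff (the threshold and grace period values are illustrative):

```go
package main

import (
	"fmt"
	"time"

	"github.com/grafana/mimir/pkg/storage/tsdb/bucketindex"
	"github.com/grafana/mimir/pkg/storegateway"
)

func main() {
	// 25h query threshold plus a grace period so blocks stay synced slightly
	// longer than they are queried (values are illustrative).
	dr := storegateway.NewMaxTimeDynamicReplication(25*time.Hour, 45*time.Minute)

	now := time.Now()
	recent := &bucketindex.Block{
		MinTime: now.Add(-24 * time.Hour).UnixMilli(),
		MaxTime: now.Add(-1 * time.Hour).UnixMilli(),
	}
	old := &bucketindex.Block{
		MinTime: now.Add(-72 * time.Hour).UnixMilli(),
		MaxTime: now.Add(-48 * time.Hour).UnixMilli(),
	}

	// *bucketindex.Block satisfies ReplicatedBlock via GetMinTime/GetMaxTime.
	fmt.Println(dr.EligibleForQuerying(recent), dr.EligibleForSync(recent)) // true true
	fmt.Println(dr.EligibleForQuerying(old), dr.EligibleForSync(old))       // false false
}
```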
- err = newStoreConsistencyCheckFailedError(remainingBlocks) + err = newStoreConsistencyCheckFailedError(remainingBlocks.GetULIDs()) level.Warn(util_log.WithContext(ctx, spanLog)).Log("msg", "failed consistency check after all attempts", "err", err) return err } diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 1d1710cea5..d692cd0d7e 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -3055,7 +3055,7 @@ type blocksStoreSetMock struct { nextResult int } -func (m *blocksStoreSetMock) GetClientsFor(_ string, _ []ulid.ULID, _ map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) { +func (m *blocksStoreSetMock) GetClientsFor(_ string, _ bucketindex.Blocks, _ map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) { if m.nextResult >= len(m.mockedResponses) { panic("not enough mocked results") } @@ -3191,9 +3191,10 @@ func (m *cancelerStoreGatewayClientMock) RemoteAddress() string { } type blocksStoreLimitsMock struct { - maxLabelsQueryLength time.Duration - maxChunksPerQuery int - storeGatewayTenantShardSize int + maxLabelsQueryLength time.Duration + maxChunksPerQuery int + storeGatewayTenantShardSize int + storeGatewayExpandedReplication bool } func (m *blocksStoreLimitsMock) MaxLabelsQueryLength(_ string) time.Duration { @@ -3208,6 +3209,10 @@ func (m *blocksStoreLimitsMock) StoreGatewayTenantShardSize(_ string) int { return m.storeGatewayTenantShardSize } +func (m *blocksStoreLimitsMock) StoreGatewayExpandedReplication(_ string) bool { + return m.storeGatewayExpandedReplication +} + func (m *blocksStoreLimitsMock) S3SSEType(_ string) string { return "" } diff --git a/pkg/querier/blocks_store_replicated_set.go b/pkg/querier/blocks_store_replicated_set.go index ead609729d..37f51431e8 100644 --- a/pkg/querier/blocks_store_replicated_set.go +++ b/pkg/querier/blocks_store_replicated_set.go @@ -19,6 +19,7 @@ import ( "github.com/prometheus/client_golang/prometheus" mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb" + "github.com/grafana/mimir/pkg/storage/tsdb/bucketindex" "github.com/grafana/mimir/pkg/storegateway" "github.com/grafana/mimir/pkg/util" ) @@ -35,10 +36,11 @@ const ( type blocksStoreReplicationSet struct { services.Service - storesRing *ring.Ring - clientsPool *client.Pool - balancingStrategy loadBalancingStrategy - limits BlocksStoreLimits + storesRing *ring.Ring + clientsPool *client.Pool + balancingStrategy loadBalancingStrategy + dynamicReplication storegateway.DynamicReplication + limits BlocksStoreLimits // Subservices manager. 
subservices *services.Manager @@ -48,6 +50,7 @@ type blocksStoreReplicationSet struct { func newBlocksStoreReplicationSet( storesRing *ring.Ring, balancingStrategy loadBalancingStrategy, + dynamicReplication storegateway.DynamicReplication, limits BlocksStoreLimits, clientConfig ClientConfig, logger log.Logger, @@ -56,6 +59,7 @@ func newBlocksStoreReplicationSet( s := &blocksStoreReplicationSet{ storesRing: storesRing, clientsPool: newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), clientConfig, logger, reg), + dynamicReplication: dynamicReplication, balancingStrategy: balancingStrategy, limits: limits, subservicesWatcher: services.NewFailureWatcher(), @@ -97,31 +101,34 @@ func (s *blocksStoreReplicationSet) stopping(_ error) error { return services.StopManagerAndAwaitStopped(context.Background(), s.subservices) } -func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) { - blocks := make(map[string][]ulid.ULID) +func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blocks bucketindex.Blocks, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) { + blocksByAddr := make(map[string][]ulid.ULID) instances := make(map[string]ring.InstanceDesc) userRing := storegateway.GetShuffleShardingSubring(s.storesRing, userID, s.limits) + replicationOption := ring.WithReplicationFactor(userRing.InstancesCount()) // Find the replication set of each block we need to query. - for _, blockID := range blockIDs { - // Do not reuse the same buffer across multiple Get() calls because we do retain the - // returned replication set. - bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet() + for _, block := range blocks { + var ringOpts []ring.Option + if s.dynamicReplication.EligibleForQuerying(block) { + ringOpts = append(ringOpts, replicationOption) + } - set, err := userRing.Get(mimir_tsdb.HashBlockID(blockID), storegateway.BlocksRead, bufDescs, bufHosts, bufZones) + // Note that we don't pass buffers since we retain instances from the returned replication set. + set, err := userRing.GetWithOptions(mimir_tsdb.HashBlockID(block.ID), storegateway.BlocksRead, ringOpts...) if err != nil { - return nil, errors.Wrapf(err, "failed to get store-gateway replication set owning the block %s", blockID.String()) + return nil, errors.Wrapf(err, "failed to get store-gateway replication set owning the block %s", block.ID) } // Pick a non excluded store-gateway instance. 
- inst := getNonExcludedInstance(set, exclude[blockID], s.balancingStrategy) + inst := getNonExcludedInstance(set, exclude[block.ID], s.balancingStrategy) if inst == nil { - return nil, fmt.Errorf("no store-gateway instance left after checking exclude for block %s", blockID.String()) + return nil, fmt.Errorf("no store-gateway instance left after checking exclude for block %s", block.ID) } instances[inst.Addr] = *inst - blocks[inst.Addr] = append(blocks[inst.Addr], blockID) + blocksByAddr[inst.Addr] = append(blocksByAddr[inst.Addr], block.ID) } clients := map[BlocksStoreClient][]ulid.ULID{} @@ -133,7 +140,7 @@ func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blockIDs []ulid return nil, errors.Wrapf(err, "failed to get store-gateway client for %s %s", instance.Id, addr) } - clients[c.(BlocksStoreClient)] = blocks[addr] + clients[c.(BlocksStoreClient)] = blocksByAddr[addr] } return clients, nil diff --git a/pkg/querier/blocks_store_replicated_set_test.go b/pkg/querier/blocks_store_replicated_set_test.go index 55896ace21..4f1012731e 100644 --- a/pkg/querier/blocks_store_replicated_set_test.go +++ b/pkg/querier/blocks_store_replicated_set_test.go @@ -26,21 +26,38 @@ import ( "github.com/stretchr/testify/require" mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb" + "github.com/grafana/mimir/pkg/storage/tsdb/bucketindex" "github.com/grafana/mimir/pkg/storegateway" ) +func newBlock(id ulid.ULID, minT time.Time, maxT time.Time) *bucketindex.Block { + return &bucketindex.Block{ + ID: id, + MinTime: minT.UnixMilli(), + MaxTime: maxT.UnixMilli(), + } +} + func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { // The following block IDs have been picked to have increasing hash values // in order to simplify the tests. - block1 := ulid.MustNew(1, nil) // hash: 283204220 - block2 := ulid.MustNew(2, nil) // hash: 444110359 - block3 := ulid.MustNew(5, nil) // hash: 2931974232 - block4 := ulid.MustNew(6, nil) // hash: 3092880371 + blockID1 := ulid.MustNew(1, nil) // hash: 283204220 + blockID2 := ulid.MustNew(2, nil) // hash: 444110359 + blockID3 := ulid.MustNew(5, nil) // hash: 2931974232 + blockID4 := ulid.MustNew(6, nil) // hash: 3092880371 - block1Hash := mimir_tsdb.HashBlockID(block1) - block2Hash := mimir_tsdb.HashBlockID(block2) - block3Hash := mimir_tsdb.HashBlockID(block3) - block4Hash := mimir_tsdb.HashBlockID(block4) + block1Hash := mimir_tsdb.HashBlockID(blockID1) + block2Hash := mimir_tsdb.HashBlockID(blockID2) + block3Hash := mimir_tsdb.HashBlockID(blockID3) + block4Hash := mimir_tsdb.HashBlockID(blockID4) + + minT := time.Now().Add(-5 * time.Hour) + maxT := minT.Add(2 * time.Hour) + + block1 := newBlock(blockID1, minT, maxT) + block2 := newBlock(blockID2, minT, maxT) + block3 := newBlock(blockID3, minT, maxT) + block4 := newBlock(blockID4, minT, maxT) userID := "user-A" registeredAt := time.Now() @@ -49,7 +66,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { tenantShardSize int replicationFactor int setup func(*ring.Desc) - queryBlocks []ulid.ULID + queryBlocks bucketindex.Blocks exclude map[ulid.ULID][]string expectedClients map[string][]ulid.ULID expectedErr error @@ -60,9 +77,9 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { setup: func(d *ring.Desc) { d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) }, - queryBlocks: []ulid.ULID{block1, block2}, + queryBlocks: []*bucketindex.Block{block1, block2}, expectedClients: map[string][]ulid.ULID{ - "127.0.0.1": 
{block1, block2}, + "127.0.0.1": {blockID1, blockID2}, }, }, "shard size 0, single instance in the ring with RF = 1 but excluded": { @@ -71,11 +88,11 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { setup: func(d *ring.Desc) { d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) }, - queryBlocks: []ulid.ULID{block1, block2}, + queryBlocks: []*bucketindex.Block{block1, block2}, exclude: map[ulid.ULID][]string{ - block1: {"127.0.0.1"}, + blockID1: {"127.0.0.1"}, }, - expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", block1.String()), + expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", blockID1.String()), }, "shard size 0, single instance in the ring with RF = 1 but excluded for non queried block": { tenantShardSize: 0, @@ -83,12 +100,12 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { setup: func(d *ring.Desc) { d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) }, - queryBlocks: []ulid.ULID{block1, block2}, + queryBlocks: []*bucketindex.Block{block1, block2}, exclude: map[ulid.ULID][]string{ - block3: {"127.0.0.1"}, + blockID3: {"127.0.0.1"}, }, expectedClients: map[string][]ulid.ULID{ - "127.0.0.1": {block1, block2}, + "127.0.0.1": {blockID1, blockID2}, }, }, "shard size 0, single instance in the ring with RF = 2": { @@ -97,9 +114,9 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { setup: func(d *ring.Desc) { d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) }, - queryBlocks: []ulid.ULID{block1, block2}, + queryBlocks: []*bucketindex.Block{block1, block2}, expectedClients: map[string][]ulid.ULID{ - "127.0.0.1": {block1, block2}, + "127.0.0.1": {blockID1, blockID2}, }, }, "shard size 0, multiple instances in the ring with each requested block belonging to a different store-gateway and RF = 1": { @@ -111,11 +128,11 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) }, - queryBlocks: []ulid.ULID{block1, block3, block4}, + queryBlocks: []*bucketindex.Block{block1, block3, block4}, expectedClients: map[string][]ulid.ULID{ - "127.0.0.1": {block1}, - "127.0.0.3": {block3}, - "127.0.0.4": {block4}, + "127.0.0.1": {blockID1}, + "127.0.0.3": {blockID3}, + "127.0.0.4": {blockID4}, }, }, "shard size 0, multiple instances in the ring with each requested block belonging to a different store-gateway and RF = 1 but excluded": { @@ -127,11 +144,11 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) }, - queryBlocks: []ulid.ULID{block1, block3, block4}, + queryBlocks: []*bucketindex.Block{block1, block3, block4}, exclude: map[ulid.ULID][]string{ - block3: {"127.0.0.3"}, + blockID3: {"127.0.0.3"}, }, - expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", block3.String()), + expectedErr: fmt.Errorf("no store-gateway instance left after 
checking exclude for block %s", blockID3.String()), }, "shard size 0, multiple instances in the ring with each requested block belonging to a different store-gateway and RF = 2": { tenantShardSize: 0, @@ -142,11 +159,11 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) }, - queryBlocks: []ulid.ULID{block1, block3, block4}, + queryBlocks: []*bucketindex.Block{block1, block3, block4}, expectedClients: map[string][]ulid.ULID{ - "127.0.0.1": {block1}, - "127.0.0.3": {block3}, - "127.0.0.4": {block4}, + "127.0.0.1": {blockID1}, + "127.0.0.3": {blockID3}, + "127.0.0.4": {blockID4}, }, }, "shard size 0, multiple instances in the ring with multiple requested blocks belonging to the same store-gateway and RF = 2": { @@ -156,10 +173,10 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) }, - queryBlocks: []ulid.ULID{block1, block2, block3, block4}, + queryBlocks: []*bucketindex.Block{block1, block2, block3, block4}, expectedClients: map[string][]ulid.ULID{ - "127.0.0.1": {block1, block4}, - "127.0.0.2": {block2, block3}, + "127.0.0.1": {blockID1, blockID4}, + "127.0.0.2": {blockID2, blockID3}, }, }, "shard size 0, multiple instances in the ring with each requested block belonging to a different store-gateway and RF = 2 and some blocks excluded but with replacement available": { @@ -171,14 +188,14 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) }, - queryBlocks: []ulid.ULID{block1, block3, block4}, + queryBlocks: []*bucketindex.Block{block1, block3, block4}, exclude: map[ulid.ULID][]string{ - block3: {"127.0.0.3"}, - block1: {"127.0.0.1"}, + blockID3: {"127.0.0.3"}, + blockID1: {"127.0.0.1"}, }, expectedClients: map[string][]ulid.ULID{ - "127.0.0.2": {block1}, - "127.0.0.4": {block3, block4}, + "127.0.0.2": {blockID1}, + "127.0.0.4": {blockID3, blockID4}, }, }, "shard size 0, multiple instances in the ring are JOINING, the requested block + its replicas only belongs to JOINING instances": { @@ -190,9 +207,9 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.JOINING, registeredAt, false, time.Time{}) d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) }, - queryBlocks: []ulid.ULID{block1}, + queryBlocks: []*bucketindex.Block{block1}, expectedClients: map[string][]ulid.ULID{ - "127.0.0.4": {block1}, + "127.0.0.4": {blockID1}, }, }, "shard size 1, single instance in the ring with RF = 1": { @@ -201,9 +218,9 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { setup: func(d *ring.Desc) { d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) }, - queryBlocks: []ulid.ULID{block1, block2}, + queryBlocks: 
[]*bucketindex.Block{block1, block2}, expectedClients: map[string][]ulid.ULID{ - "127.0.0.1": {block1, block2}, + "127.0.0.1": {blockID1, blockID2}, }, }, "shard size 1, single instance in the ring with RF = 1, but store-gateway excluded": { @@ -212,11 +229,11 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { setup: func(d *ring.Desc) { d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) }, - queryBlocks: []ulid.ULID{block1, block2}, + queryBlocks: []*bucketindex.Block{block1, block2}, exclude: map[ulid.ULID][]string{ - block1: {"127.0.0.1"}, + blockID1: {"127.0.0.1"}, }, - expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", block1.String()), + expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", blockID1.String()), }, "shard size 2, single instance in the ring with RF = 2": { tenantShardSize: 2, @@ -224,9 +241,9 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { setup: func(d *ring.Desc) { d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) }, - queryBlocks: []ulid.ULID{block1, block2}, + queryBlocks: []*bucketindex.Block{block1, block2}, expectedClients: map[string][]ulid.ULID{ - "127.0.0.1": {block1, block2}, + "127.0.0.1": {blockID1, blockID2}, }, }, "shard size 1, multiple instances in the ring with RF = 1": { @@ -238,9 +255,9 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) }, - queryBlocks: []ulid.ULID{block1, block2, block4}, + queryBlocks: []*bucketindex.Block{block1, block2, block4}, expectedClients: map[string][]ulid.ULID{ - "127.0.0.1": {block1, block2, block4}, + "127.0.0.1": {blockID1, blockID2, blockID4}, }, }, "shard size 2, shuffle sharding, multiple instances in the ring with RF = 1": { @@ -252,10 +269,10 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) }, - queryBlocks: []ulid.ULID{block1, block2, block4}, + queryBlocks: []*bucketindex.Block{block1, block2, block4}, expectedClients: map[string][]ulid.ULID{ - "127.0.0.1": {block1, block4}, - "127.0.0.3": {block2}, + "127.0.0.1": {blockID1, blockID4}, + "127.0.0.3": {blockID2}, }, }, "shard size 4, multiple instances in the ring with RF = 1": { @@ -267,11 +284,11 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) }, - queryBlocks: []ulid.ULID{block1, block2, block4}, + queryBlocks: []*bucketindex.Block{block1, block2, block4}, expectedClients: map[string][]ulid.ULID{ - "127.0.0.1": {block1}, - "127.0.0.2": {block2}, - "127.0.0.4": {block4}, + "127.0.0.1": {blockID1}, + "127.0.0.2": {blockID2}, + "127.0.0.4": {blockID4}, }, }, "shard size 2, multiple instances in the ring with RF = 2, with excluded blocks but some replacement 
available": { @@ -283,13 +300,13 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) }, - queryBlocks: []ulid.ULID{block1, block2}, + queryBlocks: []*bucketindex.Block{block1, block2}, exclude: map[ulid.ULID][]string{ - block1: {"127.0.0.1"}, - block2: {"127.0.0.1"}, + blockID1: {"127.0.0.1"}, + blockID2: {"127.0.0.1"}, }, expectedClients: map[string][]ulid.ULID{ - "127.0.0.3": {block1, block2}, + "127.0.0.3": {blockID1, blockID2}, }, }, "shard size 2, multiple instances in the ring with RF = 2, SS = 2 with excluded blocks and no replacement available": { @@ -301,12 +318,12 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) }, - queryBlocks: []ulid.ULID{block1, block2}, + queryBlocks: []*bucketindex.Block{block1, block2}, exclude: map[ulid.ULID][]string{ - block1: {"127.0.0.1", "127.0.0.3"}, - block2: {"127.0.0.1"}, + blockID1: {"127.0.0.1", "127.0.0.3"}, + blockID2: {"127.0.0.1"}, }, - expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", block1.String()), + expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", blockID1.String()), }, } @@ -339,7 +356,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { } reg := prometheus.NewPedanticRegistry() - s, err := newBlocksStoreReplicationSet(r, noLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg) + s, err := newBlocksStoreReplicationSet(r, noLoadBalancing, storegateway.NewNopDynamicReplication(), limits, ClientConfig{}, log.NewNopLogger(), reg) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, s)) defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck @@ -381,7 +398,11 @@ func TestBlocksStoreReplicationSet_GetClientsFor_ShouldSupportRandomLoadBalancin ctx := context.Background() userID := "user-A" registeredAt := time.Now() - block1 := ulid.MustNew(1, nil) + + minT := time.Now().Add(-5 * time.Hour) + maxT := minT.Add(2 * time.Hour) + blockID1 := ulid.MustNew(1, nil) + block1 := newBlock(blockID1, minT, maxT) // Create a ring. 
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) @@ -405,7 +426,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor_ShouldSupportRandomLoadBalancin limits := &blocksStoreLimitsMock{storeGatewayTenantShardSize: 0} reg := prometheus.NewPedanticRegistry() - s, err := newBlocksStoreReplicationSet(r, randomLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg) + s, err := newBlocksStoreReplicationSet(r, randomLoadBalancing, storegateway.NewNopDynamicReplication(), limits, ClientConfig{}, log.NewNopLogger(), reg) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, s)) defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck @@ -421,7 +442,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor_ShouldSupportRandomLoadBalancin distribution := map[string]int{} for n := 0; n < numRuns; n++ { - clients, err := s.GetClientsFor(userID, []ulid.ULID{block1}, nil) + clients, err := s.GetClientsFor(userID, []*bucketindex.Block{block1}, nil) require.NoError(t, err) defer func() { // Close all clients to ensure no goroutines are leaked. diff --git a/pkg/storage/tsdb/block/meta.go b/pkg/storage/tsdb/block/meta.go index faaa4b0f9c..431c809226 100644 --- a/pkg/storage/tsdb/block/meta.go +++ b/pkg/storage/tsdb/block/meta.go @@ -11,6 +11,7 @@ import ( "io" "os" "path/filepath" + "time" "github.com/go-kit/log" "github.com/grafana/dskit/runutil" @@ -49,7 +50,15 @@ type Meta struct { Thanos ThanosMeta `json:"thanos"` } -func (m *Meta) String() string { +func (m Meta) GetMinTime() time.Time { + return time.UnixMilli(m.MinTime) +} + +func (m Meta) GetMaxTime() time.Time { + return time.UnixMilli(m.MaxTime) +} + +func (m Meta) String() string { return fmt.Sprintf("%s (min time: %d, max time: %d)", m.ULID, m.MinTime, m.MaxTime) } diff --git a/pkg/storage/tsdb/bucketindex/index.go b/pkg/storage/tsdb/bucketindex/index.go index b864f731df..697dd2be2a 100644 --- a/pkg/storage/tsdb/bucketindex/index.go +++ b/pkg/storage/tsdb/bucketindex/index.go @@ -114,6 +114,14 @@ func (m *Block) GetUploadedAt() time.Time { return time.Unix(m.UploadedAt, 0) } +func (m *Block) GetMinTime() time.Time { + return time.UnixMilli(m.MinTime) +} + +func (m *Block) GetMaxTime() time.Time { + return time.UnixMilli(m.MaxTime) +} + // ThanosMeta returns a block meta based on the known information in the index. // The returned meta doesn't include all original meta.json data but only a subset // of it. diff --git a/pkg/storegateway/dynamic_replication.go b/pkg/storegateway/dynamic_replication.go new file mode 100644 index 0000000000..4571b09454 --- /dev/null +++ b/pkg/storegateway/dynamic_replication.go @@ -0,0 +1,99 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package storegateway + +import ( + "errors" + "flag" + "time" +) + +var ( + errInvalidDynamicReplicationMaxTimeThreshold = errors.New("invalid dynamic replication max time threshold, the value must be at least one hour") +) + +type DynamicReplicationConfig struct { + Enabled bool `yaml:"enabled" category:"experimental"` + MaxTimeThreshold time.Duration `yaml:"max_time_threshold" category:"experimental"` +} + +func (cfg *DynamicReplicationConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { + f.BoolVar(&cfg.Enabled, prefix+"dynamic-replication.enabled", false, "Use a higher number of replicas for recent blocks. 
Useful to spread query load more evenly at the cost of slightly higher disk usage.") + f.DurationVar(&cfg.MaxTimeThreshold, prefix+"dynamic-replication.max-time-threshold", 25*time.Hour, "Threshold of the most recent sample in a block used to determine it is eligible for higher than default replication. If a block has samples within this amount of time, it is considered recent and will be owned by more replicas.") +} + +func (cfg *DynamicReplicationConfig) Validate() error { + if cfg.Enabled && cfg.MaxTimeThreshold < time.Hour { + return errInvalidDynamicReplicationMaxTimeThreshold + } + + return nil +} + +// ReplicatedBlock is a TSDB block that may be eligible to be synced to and queried from +// more store-gateways than the configured replication factor based on metadata about the +// block. +type ReplicatedBlock interface { + GetMinTime() time.Time + GetMaxTime() time.Time +} + +// DynamicReplication determines if a TSDB block is eligible to be synced to and queried from more +// store-gateways than the configured replication factor based on metadata about the block. +type DynamicReplication interface { + // EligibleForSync returns true if the block can be synced to more than the configured (via + // replication factor) number of store-gateways, false otherwise. + EligibleForSync(b ReplicatedBlock) bool + + // EligibleForQuerying returns true if the block can be safely queried from more than the + // configured (via replication factor) number of store-gateways, false otherwise. + EligibleForQuerying(b ReplicatedBlock) bool +} + +func NewNopDynamicReplication() *NopDynamicReplication { + return &NopDynamicReplication{} +} + +// NopDynamicReplication is a DynamicReplication implementation that always returns false. +type NopDynamicReplication struct{} + +func (n NopDynamicReplication) EligibleForSync(ReplicatedBlock) bool { + return false +} + +func (n NopDynamicReplication) EligibleForQuerying(ReplicatedBlock) bool { + return false +} + +func NewMaxTimeDynamicReplication(maxTime time.Duration, gracePeriod time.Duration) *MaxTimeDynamicReplication { + return &MaxTimeDynamicReplication{ + maxTimeThreshold: maxTime, + gracePeriod: gracePeriod, + now: time.Now, + } +} + +// MaxTimeDynamicReplication is a DynamicReplication implementation that determines +// if a block is eligible for expanded replication based on how recent its MaxTime (most +// recent sample) is. A grace period can optionally be used to ensure that blocks are +// synced to store-gateways until they are no longer being queried. +type MaxTimeDynamicReplication struct { + maxTimeThreshold time.Duration + gracePeriod time.Duration + now func() time.Time +} + +func (e *MaxTimeDynamicReplication) EligibleForSync(b ReplicatedBlock) bool { + now := e.now() + maxTimeDelta := now.Sub(b.GetMaxTime()) + // We keep syncing blocks for `gracePeriod` after they are no longer eligible for + // querying to ensure that they are not unloaded by store-gateways while still being + // queried.
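+ // For example, with the 25h threshold and 45m grace period used by the tests in this
+ // change, a block whose newest sample is 25h15m old is still synced here, while
+ // EligibleForQuerying below already returns false for it.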
+ return maxTimeDelta <= (e.maxTimeThreshold + e.gracePeriod) +} + +func (e *MaxTimeDynamicReplication) EligibleForQuerying(b ReplicatedBlock) bool { + now := e.now() + maxTimeDelta := now.Sub(b.GetMaxTime()) + return maxTimeDelta <= e.maxTimeThreshold +} diff --git a/pkg/storegateway/dynamic_replication_test.go b/pkg/storegateway/dynamic_replication_test.go new file mode 100644 index 0000000000..0ded34bae7 --- /dev/null +++ b/pkg/storegateway/dynamic_replication_test.go @@ -0,0 +1,79 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package storegateway + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/grafana/mimir/pkg/storage/tsdb/bucketindex" +) + +func TestMaxTimeExpandedReplication(t *testing.T) { + // Round "now" to the nearest millisecond since we are using millisecond precision + // for min/max times for the blocks. + now := time.Now().Round(time.Millisecond) + replication := NewMaxTimeDynamicReplication(25*time.Hour, 45*time.Minute) + replication.now = func() time.Time { return now } + + type testCase struct { + block bucketindex.Block + expectedSync bool + expectedQuery bool + } + + testCases := map[string]testCase{ + "max time eligible": { + block: bucketindex.Block{ + MinTime: now.Add(-24 * time.Hour).UnixMilli(), + MaxTime: now.Add(-12 * time.Hour).UnixMilli(), + }, + expectedSync: true, + expectedQuery: true, + }, + "max time on boundary": { + block: bucketindex.Block{ + MinTime: now.Add(-49 * time.Hour).UnixMilli(), + MaxTime: now.Add(-25 * time.Hour).UnixMilli(), + }, + expectedSync: true, + expectedQuery: true, + }, + "max time on boundary including grace period": { + block: bucketindex.Block{ + MinTime: now.Add(-49 * time.Hour).UnixMilli(), + MaxTime: now.Add(-(25*time.Hour + 45*time.Minute)).UnixMilli(), + }, + expectedSync: true, + expectedQuery: false, + }, + "max time inside grace period": { + block: bucketindex.Block{ + MinTime: now.Add(-49 * time.Hour).UnixMilli(), + MaxTime: now.Add(-(25*time.Hour + 15*time.Minute)).UnixMilli(), + }, + expectedSync: true, + expectedQuery: false, + }, + "max time too old": { + block: bucketindex.Block{ + MinTime: now.Add(-72 * time.Hour).UnixMilli(), + MaxTime: now.Add(-48 * time.Hour).UnixMilli(), + }, + expectedSync: false, + expectedQuery: false, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + canSync := replication.EligibleForSync(&tc.block) + canQuery := replication.EligibleForQuerying(&tc.block) + + require.Equal(t, tc.expectedSync, canSync, "expected to be able/not-able to sync block %+v using %+v", tc.block, replication) + require.Equal(t, tc.expectedQuery, canQuery, "expected to be able/not-able to query block %+v using %+v", tc.block, replication) + }) + } +} diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index d7f66e02bf..167db8c41e 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -53,7 +53,8 @@ var ( // Config holds the store gateway config. type Config struct { - ShardingRing RingConfig `yaml:"sharding_ring" doc:"description=The hash ring configuration."` + ShardingRing RingConfig `yaml:"sharding_ring" doc:"description=The hash ring configuration."` + DynamicReplication DynamicReplicationConfig `yaml:"dynamic_replication" doc:"description=Experimental dynamic replication configuration." 
category:"experimental"` EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants" category:"advanced"` DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants" category:"advanced"` @@ -62,6 +63,7 @@ type Config struct { // RegisterFlags registers the Config flags. func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { cfg.ShardingRing.RegisterFlags(f, logger) + cfg.DynamicReplication.RegisterFlagsWithPrefix(f, "store-gateway.") f.Var(&cfg.EnabledTenants, "store-gateway.enabled-tenants", "Comma separated list of tenants that can be loaded by the store-gateway. If specified, only blocks for these tenants will be loaded by the store-gateway, otherwise all tenants can be loaded. Subject to sharding.") f.Var(&cfg.DisabledTenants, "store-gateway.disabled-tenants", "Comma separated list of tenants that cannot be loaded by the store-gateway. If specified, and the store-gateway would normally load a given tenant for (via -store-gateway.enabled-tenants or sharding), it will be ignored instead.") @@ -73,6 +75,10 @@ func (cfg *Config) Validate(limits validation.Limits) error { return errInvalidTenantShardSize } + if err := cfg.DynamicReplication.Validate(); err != nil { + return err + } + return nil } @@ -173,7 +179,17 @@ func newStoreGateway(gatewayCfg Config, storageCfg mimir_tsdb.BlocksStorageConfi return nil, errors.Wrap(err, "create ring client") } - shardingStrategy = NewShuffleShardingStrategy(g.ring, lifecyclerCfg.ID, lifecyclerCfg.Addr, limits, logger) + var dynamicReplication DynamicReplication = NewNopDynamicReplication() + if gatewayCfg.DynamicReplication.Enabled { + dynamicReplication = NewMaxTimeDynamicReplication( + gatewayCfg.DynamicReplication.MaxTimeThreshold, + // Keep syncing blocks to store-gateways for a grace period (3 times the sync interval) to + // ensure they are not unloaded while they are still being queried. + mimir_tsdb.NewBlockDiscoveryDelayMultiplier*storageCfg.BucketStore.SyncInterval, + ) + } + + shardingStrategy = NewShuffleShardingStrategy(g.ring, lifecyclerCfg.ID, lifecyclerCfg.Addr, dynamicReplication, limits, logger) allowedTenants := util.NewAllowedTenants(gatewayCfg.EnabledTenants, gatewayCfg.DisabledTenants) if len(gatewayCfg.EnabledTenants) > 0 { diff --git a/pkg/storegateway/sharding_strategy.go b/pkg/storegateway/sharding_strategy.go index dd93067598..d46ae0c8d2 100644 --- a/pkg/storegateway/sharding_strategy.go +++ b/pkg/storegateway/sharding_strategy.go @@ -46,21 +46,23 @@ type ShardingLimits interface { // ShuffleShardingStrategy is a shuffle sharding strategy, based on the hash ring formed by store-gateways, // where each tenant blocks are sharded across a subset of store-gateway instances. type ShuffleShardingStrategy struct { - r *ring.Ring - instanceID string - instanceAddr string - limits ShardingLimits - logger log.Logger + r *ring.Ring + instanceID string + instanceAddr string + dynamicReplication DynamicReplication + limits ShardingLimits + logger log.Logger } // NewShuffleShardingStrategy makes a new ShuffleShardingStrategy. 
-func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits ShardingLimits, logger log.Logger) *ShuffleShardingStrategy { +func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, dynamicReplication DynamicReplication, limits ShardingLimits, logger log.Logger) *ShuffleShardingStrategy { return &ShuffleShardingStrategy{ - r: r, - instanceID: instanceID, - instanceAddr: instanceAddr, - limits: limits, - logger: logger, + r: r, + instanceID: instanceID, + instanceAddr: instanceAddr, + dynamicReplication: dynamicReplication, + limits: limits, + logger: logger, } } @@ -111,20 +113,26 @@ func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string, } r := GetShuffleShardingSubring(s.r, userID, s.limits) + replicationOption := ring.WithReplicationFactor(r.InstancesCount()) bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet() + bufOption := ring.WithBuffers(bufDescs, bufHosts, bufZones) for blockID := range metas { - key := mimir_tsdb.HashBlockID(blockID) + ringOpts := []ring.Option{bufOption} + if s.dynamicReplication.EligibleForSync(metas[blockID]) { + ringOpts = append(ringOpts, replicationOption) + } // Check if the block is owned by the store-gateway - set, err := r.Get(key, BlocksOwnerSync, bufDescs, bufHosts, bufZones) + key := mimir_tsdb.HashBlockID(blockID) + set, err := r.GetWithOptions(key, BlocksOwnerSync, ringOpts...) // If an error occurs while checking the ring, we keep the previously loaded blocks. if err != nil { if _, ok := loaded[blockID]; ok { - level.Warn(s.logger).Log("msg", "failed to check block owner but block is kept because was previously loaded", "block", blockID.String(), "err", err) + level.Warn(s.logger).Log("msg", "failed to check block owner but block is kept because was previously loaded", "block", blockID, "err", err) } else { - level.Warn(s.logger).Log("msg", "failed to check block owner and block has been excluded because was not previously loaded", "block", blockID.String(), "err", err) + level.Warn(s.logger).Log("msg", "failed to check block owner and block has been excluded because was not previously loaded", "block", blockID, "err", err) // Skip the block. synced.WithLabelValues(shardExcludedMeta).Inc() @@ -143,8 +151,8 @@ func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string, // we can safely unload it only once at least 1 authoritative owner is available // for queries. if _, ok := loaded[blockID]; ok { - // The ring Get() returns an error if there's no available instance. - if _, err := r.Get(key, BlocksOwnerRead, bufDescs, bufHosts, bufZones); err != nil { + // The ring GetWithOptions() method returns an error if there's no available instance. + if _, err := r.GetWithOptions(key, BlocksOwnerRead, ringOpts...); err != nil { // Keep the block. 
continue } diff --git a/pkg/storegateway/sharding_strategy_test.go b/pkg/storegateway/sharding_strategy_test.go index 6e6c0fabea..958fe5b94e 100644 --- a/pkg/storegateway/sharding_strategy_test.go +++ b/pkg/storegateway/sharding_strategy_test.go @@ -17,6 +17,7 @@ import ( "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/tsdb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -56,16 +57,18 @@ func TestShuffleShardingStrategy(t *testing.T) { } tests := map[string]struct { - replicationFactor int - limits ShardingLimits - setupRing func(*ring.Desc) - prevLoadedBlocks map[string]map[ulid.ULID]struct{} - expectedUsers []usersExpectation - expectedBlocks []blocksExpectation + replicationFactor int + expandedReplication DynamicReplication + limits ShardingLimits + setupRing func(*ring.Desc) + prevLoadedBlocks map[string]map[ulid.ULID]struct{} + expectedUsers []usersExpectation + expectedBlocks []blocksExpectation }{ "one ACTIVE instance in the ring with RF = 1 and SS = 1": { - replicationFactor: 1, - limits: &shardingLimitsMock{storeGatewayTenantShardSize: 1}, + replicationFactor: 1, + expandedReplication: NewNopDynamicReplication(), + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 1}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{0}, ring.ACTIVE, registeredAt, false, time.Time{}) }, @@ -79,8 +82,9 @@ func TestShuffleShardingStrategy(t *testing.T) { }, }, "one ACTIVE instance in the ring with RF = 2 and SS = 1 (should still sync blocks on the only available instance)": { - replicationFactor: 1, - limits: &shardingLimitsMock{storeGatewayTenantShardSize: 1}, + replicationFactor: 1, + expandedReplication: NewNopDynamicReplication(), + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 1}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{0}, ring.ACTIVE, registeredAt, false, time.Time{}) }, @@ -94,8 +98,9 @@ func TestShuffleShardingStrategy(t *testing.T) { }, }, "one ACTIVE instance in the ring with RF = 2 and SS = 2 (should still sync blocks on the only available instance)": { - replicationFactor: 1, - limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, + replicationFactor: 1, + expandedReplication: NewNopDynamicReplication(), + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{0}, ring.ACTIVE, registeredAt, false, time.Time{}) }, @@ -109,8 +114,9 @@ func TestShuffleShardingStrategy(t *testing.T) { }, }, "two ACTIVE instances in the ring with RF = 1 and SS = 1 (should sync blocks on 1 instance because of the shard size)": { - replicationFactor: 1, - limits: &shardingLimitsMock{storeGatewayTenantShardSize: 1}, + replicationFactor: 1, + expandedReplication: NewNopDynamicReplication(), + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 1}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1, block4Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) @@ -125,8 +131,9 @@ func TestShuffleShardingStrategy(t *testing.T) { }, }, "two ACTIVE instances in the ring with RF = 1 and SS = 2 (should sync blocks on 2 instances because of the shard size)": { - replicationFactor: 1, - 
limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, + replicationFactor: 1, + expandedReplication: NewNopDynamicReplication(), + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1, block4Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) @@ -141,8 +148,9 @@ func TestShuffleShardingStrategy(t *testing.T) { }, }, "two ACTIVE instances in the ring with RF = 2 and SS = 1 (should sync blocks on 1 instance because of the shard size)": { - replicationFactor: 2, - limits: &shardingLimitsMock{storeGatewayTenantShardSize: 1}, + replicationFactor: 2, + expandedReplication: NewNopDynamicReplication(), + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 1}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1, block4Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) @@ -157,8 +165,9 @@ func TestShuffleShardingStrategy(t *testing.T) { }, }, "two ACTIVE instances in the ring with RF = 2 and SS = 2 (should sync all blocks on 2 instances)": { - replicationFactor: 2, - limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, + replicationFactor: 2, + expandedReplication: NewNopDynamicReplication(), + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1, block4Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) @@ -173,8 +182,9 @@ func TestShuffleShardingStrategy(t *testing.T) { }, }, "multiple ACTIVE instances in the ring with RF = 2 and SS = 3": { - replicationFactor: 2, - limits: &shardingLimitsMock{storeGatewayTenantShardSize: 3}, + replicationFactor: 2, + expandedReplication: NewNopDynamicReplication(), + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 3}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) @@ -191,9 +201,30 @@ func TestShuffleShardingStrategy(t *testing.T) { {instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{block4 /* replicated: */, block3}}, }, }, + "multiple ACTIVE instances in the ring with RF = 1 and SS = 3 and ER = true": { + replicationFactor: 1, + expandedReplication: NewMaxTimeDynamicReplication(25*time.Hour, 45*time.Minute), + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 3}, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) + r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) + r.AddIngester("instance-3", "127.0.0.3", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) + }, + expectedUsers: []usersExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, + 
{instanceID: "instance-2", instanceAddr: "127.0.0.2", users: []string{userID}}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", users: []string{userID}}, + }, + expectedBlocks: []blocksExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block3 /* extended replication: */, block4}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{block2 /* extended replication: */, block4}}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{block4}}, + }, + }, "one unhealthy instance in the ring with RF = 1, SS = 3 and NO previously loaded blocks": { - replicationFactor: 1, - limits: &shardingLimitsMock{storeGatewayTenantShardSize: 3}, + replicationFactor: 1, + expandedReplication: NewNopDynamicReplication(), + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 3}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) @@ -218,8 +249,9 @@ func TestShuffleShardingStrategy(t *testing.T) { }, }, "one unhealthy instance in the ring with RF = 2, SS = 3 and NO previously loaded blocks": { - replicationFactor: 2, - limits: &shardingLimitsMock{storeGatewayTenantShardSize: 3}, + replicationFactor: 2, + expandedReplication: NewNopDynamicReplication(), + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 3}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) @@ -243,8 +275,9 @@ func TestShuffleShardingStrategy(t *testing.T) { }, }, "one unhealthy instance in the ring with RF = 2, SS = 2 and NO previously loaded blocks": { - replicationFactor: 2, - limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, + replicationFactor: 2, + expandedReplication: NewNopDynamicReplication(), + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block4Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) @@ -268,8 +301,9 @@ func TestShuffleShardingStrategy(t *testing.T) { }, }, "one unhealthy instance in the ring with RF = 2, SS = 2 and some previously loaded blocks": { - replicationFactor: 2, - limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, + replicationFactor: 2, + expandedReplication: NewNopDynamicReplication(), + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block4Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) @@ -296,8 +330,9 @@ func TestShuffleShardingStrategy(t *testing.T) { }, }, "LEAVING instance in the ring should continue to keep its shard blocks and they should NOT be replicated to another instance": { - replicationFactor: 1, - limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, + replicationFactor: 1, + expandedReplication: NewNopDynamicReplication(), + 
limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) @@ -315,8 +350,9 @@ func TestShuffleShardingStrategy(t *testing.T) { }, }, "JOINING instance in the ring should get its shard blocks and they should not be replicated to another instance": { - replicationFactor: 1, - limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, + replicationFactor: 1, + expandedReplication: NewNopDynamicReplication(), + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) @@ -334,8 +370,9 @@ func TestShuffleShardingStrategy(t *testing.T) { }, }, "SS = 0 disables shuffle sharding": { - replicationFactor: 1, - limits: &shardingLimitsMock{storeGatewayTenantShardSize: 0}, + replicationFactor: 1, + expandedReplication: NewNopDynamicReplication(), + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 0}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1, block4Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) @@ -355,6 +392,7 @@ func TestShuffleShardingStrategy(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() + now := time.Now() ctx := context.Background() store, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) @@ -375,14 +413,16 @@ func TestShuffleShardingStrategy(t *testing.T) { r, err := ring.NewWithStoreClientAndStrategy(cfg, "test", "test", store, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), nil, log.NewNopLogger()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, r)) - defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck + t.Cleanup(func() { + _ = services.StopAndAwaitTerminated(ctx, r) + }) // Wait until the ring client has synced. require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE)) // Assert on filter users. for _, expected := range testData.expectedUsers { - filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.limits, log.NewNopLogger()) + filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.expandedReplication, testData.limits, log.NewNopLogger()) actualUsers, err := filter.FilterUsers(ctx, []string{userID}) assert.Equal(t, expected.err, err) assert.Equal(t, expected.users, actualUsers) @@ -390,15 +430,35 @@ func TestShuffleShardingStrategy(t *testing.T) { // Assert on filter blocks. 
for _, expected := range testData.expectedBlocks { - filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.limits, log.NewNopLogger()) + filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.expandedReplication, testData.limits, log.NewNopLogger()) synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"}) synced.WithLabelValues(shardExcludedMeta).Set(0) metas := map[ulid.ULID]*block.Meta{ - block1: {}, - block2: {}, - block3: {}, - block4: {}, + block1: { + BlockMeta: tsdb.BlockMeta{ + MinTime: now.Add(-5 * 24 * time.Hour).UnixMilli(), + MaxTime: now.Add(-4 * 24 * time.Hour).UnixMilli(), + }, + }, + block2: { + BlockMeta: tsdb.BlockMeta{ + MinTime: now.Add(-4 * 24 * time.Hour).UnixMilli(), + MaxTime: now.Add(-3 * 24 * time.Hour).UnixMilli(), + }, + }, + block3: { + BlockMeta: tsdb.BlockMeta{ + MinTime: now.Add(-3 * 24 * time.Hour).UnixMilli(), + MaxTime: now.Add(-2 * 24 * time.Hour).UnixMilli(), + }, + }, + block4: { + BlockMeta: tsdb.BlockMeta{ + MinTime: now.Add(-2 * 24 * time.Hour).UnixMilli(), + MaxTime: now.Add(-1 * 24 * time.Hour).UnixMilli(), + }, + }, } err = filter.FilterBlocks(ctx, userID, metas, testData.prevLoadedBlocks[expected.instanceID], synced) diff --git a/vendor/github.com/grafana/dskit/ring/replication_strategy.go b/vendor/github.com/grafana/dskit/ring/replication_strategy.go index db2b283548..8b9b501cad 100644 --- a/vendor/github.com/grafana/dskit/ring/replication_strategy.go +++ b/vendor/github.com/grafana/dskit/ring/replication_strategy.go @@ -7,10 +7,15 @@ import ( ) type ReplicationStrategy interface { - // Filter out unhealthy instances and checks if there're enough instances + // Filter out unhealthy instances and checks if there are enough instances // for an operation to succeed. Returns an error if there are not enough // instances. Filter(instances []InstanceDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration, zoneAwarenessEnabled bool) (healthy []InstanceDesc, maxFailures int, err error) + + // SupportsExpandedReplication returns true for replication strategies that + // support increasing the replication factor beyond a single instance per zone, + // false otherwise. + SupportsExpandedReplication() bool } type defaultReplicationStrategy struct{} @@ -70,6 +75,14 @@ func (s *defaultReplicationStrategy) Filter(instances []InstanceDesc, op Operati return instances, len(instances) - minSuccess, nil } +func (s *defaultReplicationStrategy) SupportsExpandedReplication() bool { + // defaultReplicationStrategy assumes that a single instance per zone is returned and that + // it can treat replication factor as equivalent to the number of zones. This doesn't work + // when a per-call replication factor increases it beyond the configured replication factor + // and the number of zones. 
+ return false +} + type ignoreUnhealthyInstancesReplicationStrategy struct{} func NewIgnoreUnhealthyInstancesReplicationStrategy() ReplicationStrategy { @@ -101,6 +114,10 @@ func (r *ignoreUnhealthyInstancesReplicationStrategy) Filter(instances []Instanc return instances, len(instances) - 1, nil } +func (r *ignoreUnhealthyInstancesReplicationStrategy) SupportsExpandedReplication() bool { + return true +} + func (r *Ring) IsHealthy(instance *InstanceDesc, op Operation, now time.Time) bool { return instance.IsHealthy(op, r.cfg.HeartbeatTimeout, now) } diff --git a/vendor/github.com/grafana/dskit/ring/ring.go b/vendor/github.com/grafana/dskit/ring/ring.go index 62a49a6d81..d8c793170f 100644 --- a/vendor/github.com/grafana/dskit/ring/ring.go +++ b/vendor/github.com/grafana/dskit/ring/ring.go @@ -34,6 +34,44 @@ const ( GetBufferSize = 5 ) +// Options are the result of Option instances that can be used to modify Ring.GetWithOptions behavior. +type Options struct { + ReplicationFactor int + BufDescs []InstanceDesc + BufHosts []string + BufZones []string +} + +// Option can be used to modify Ring behavior when calling Ring.GetWithOptions +type Option func(opts *Options) + +// WithBuffers creates an Option that will cause the given buffers to be used, avoiding allocations. +func WithBuffers(bufDescs []InstanceDesc, bufHosts, bufZones []string) Option { + return func(opts *Options) { + opts.BufDescs = bufDescs + opts.BufHosts = bufHosts + opts.BufZones = bufZones + } +} + +// WithReplicationFactor creates an Option that overrides the default replication factor for a single call. +// Note that the overridden replication factor must be a multiple of the number of zones. That is, there +// should be an identical number of instances in each zone. E.g. if Zones = 3 and Default RF = 3, overridden +// replication factor must be 6, 9, etc. +func WithReplicationFactor(replication int) Option { + return func(opts *Options) { + opts.ReplicationFactor = replication + } +} + +func collectOptions(opts ...Option) Options { + final := Options{} + for _, opt := range opts { + opt(&final) + } + return final +} + // ReadRing represents the read interface to the ring. // Support for read-only instances requires use of ShuffleShard or ShuffleShardWithLookback prior to getting a ReplicationSet. type ReadRing interface { @@ -42,13 +80,17 @@ type ReadRing interface { // to avoid memory allocation; can be nil, or created with ring.MakeBuffersForGet(). Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) + // GetWithOptions returns n (or more) instances which form the replicas for the given key + // with 0 or more Option instances to change the behavior of the method call. + GetWithOptions(key uint32, op Operation, opts ...Option) (ReplicationSet, error) + // GetAllHealthy returns all healthy instances in the ring, for the given operation. // This function doesn't check if the quorum is honored, so doesn't fail if the number // of unhealthy instances is greater than the tolerated max unavailable. GetAllHealthy(op Operation) (ReplicationSet, error) // GetReplicationSetForOperation returns all instances where the input operation should be executed. - // The resulting ReplicationSet doesn't necessarily contains all healthy instances + // The resulting ReplicationSet doesn't necessarily contain all healthy instances // in the ring, but could contain the minimum set of instances required to execute // the input operation. 
GetReplicationSetForOperation(op Operation) (ReplicationSet, error) @@ -424,19 +466,44 @@ func (r *Ring) setRingStateFromDesc(ringDesc *Desc, updateMetrics, updateRegiste } // Get returns n (or more) instances which form the replicas for the given key. -func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) { +func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, _ []string) (ReplicationSet, error) { + // Note that we purposefully aren't calling GetWithOptions here since the closures it + // uses result in heap allocations which we specifically avoid in this method since it's + // called in hot loops. + return r.getReplicationSetForKey(key, op, bufDescs, bufHosts, r.cfg.ReplicationFactor) +} + +// GetWithOptions returns n (or more) instances which form the replicas for the given key +// with 0 or more options to change the behavior of the method call. +func (r *Ring) GetWithOptions(key uint32, op Operation, opts ...Option) (ReplicationSet, error) { + options := collectOptions(opts...) + return r.getReplicationSetForKey(key, op, options.BufDescs, options.BufHosts, options.ReplicationFactor) +} + +func (r *Ring) getReplicationSetForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, replicationFactor int) (ReplicationSet, error) { r.mtx.RLock() defer r.mtx.RUnlock() if r.ringDesc == nil || len(r.ringTokens) == 0 { return ReplicationSet{}, ErrEmptyRing } - instances, err := r.findInstancesForKey(key, op, bufDescs, bufHosts, bufZones, nil) + if replicationFactor <= 0 || replicationFactor < r.cfg.ReplicationFactor { + replicationFactor = r.cfg.ReplicationFactor + } + + // Not all replication strategies support increasing the replication factor beyond + // the number of zones available. Return an error unless a ReplicationStrategy has + // explicitly opted into supporting this. + if replicationFactor > r.cfg.ReplicationFactor && !r.strategy.SupportsExpandedReplication() { + return ReplicationSet{}, fmt.Errorf("per-call replication factor %d cannot exceed the configured replication factor %d with this replication strategy", replicationFactor, r.cfg.ReplicationFactor) + } + + instances, err := r.findInstancesForKey(key, op, bufDescs, bufHosts, replicationFactor, nil) if err != nil { return ReplicationSet{}, err } - healthyInstances, maxFailure, err := r.strategy.Filter(instances, op, r.cfg.ReplicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled) + healthyInstances, maxFailure, err := r.strategy.Filter(instances, op, replicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled) if err != nil { return ReplicationSet{}, err } @@ -450,21 +517,34 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, // Returns instances for given key and operation. Instances are not filtered through ReplicationStrategy. // InstanceFilter can ignore uninteresting instances that would otherwise be part of the output, and can also stop search early. // This function needs to be called with read lock on the ring. 
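+// The replicationFactor parameter is the per-call value resolved by getReplicationSetForKey
+// above: it is never lower than the configured replication factor, and raising it beyond the
+// configured value is only permitted when the ring's ReplicationStrategy reports
+// SupportsExpandedReplication().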
-func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, bufZones []string, instanceFilter func(instanceID string) (include, keepGoing bool)) ([]InstanceDesc, error) { +func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, replicationFactor int, instanceFilter func(instanceID string) (include, keepGoing bool)) ([]InstanceDesc, error) { var ( - n = r.cfg.ReplicationFactor - instances = bufDescs[:0] - start = searchToken(r.ringTokens, key) - iterations = 0 - maxZones = len(r.ringTokensByZone) + n = replicationFactor + instances = bufDescs[:0] + start = searchToken(r.ringTokens, key) + iterations = 0 + // The configured replication factor is treated as the expected number of zones + // when zone-awareness is enabled. Per-call replication factor may increase the + // number of instances selected per zone, but the number of inferred zones does + // not change in this case. + maxZones = r.cfg.ReplicationFactor maxInstances = len(r.ringDesc.Ingesters) // We use a slice instead of a map because it's faster to search within a - // slice than lookup a map for a very low number of items. + // slice than lookup a map for a very low number of items, we only expect + // to have low single-digit number of hosts. distinctHosts = bufHosts[:0] - distinctZones = bufZones[:0] + + hostsPerZone = make(map[string]int) + targetHostsPerZone = max(1, replicationFactor/maxZones) ) - for i := start; len(distinctHosts) < min(maxInstances, n) && len(distinctZones) < maxZones && iterations < len(r.ringTokens); i++ { + + for i := start; len(distinctHosts) < min(maxInstances, n) && iterations < len(r.ringTokens); i++ { + // If we have the target number of instances in all zones, stop looking. + if r.cfg.ZoneAwarenessEnabled && haveTargetHostsInAllZones(hostsPerZone, targetHostsPerZone, maxZones) { + break + } + iterations++ // Wrap i around in the ring. i %= len(r.ringTokens) @@ -481,9 +561,9 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance continue } - // Ignore if the instances don't have a zone set. + // If we already have the required number of instances for this zone, skip. if r.cfg.ZoneAwarenessEnabled && info.Zone != "" { - if slices.Contains(distinctZones, info.Zone) { + if hostsPerZone[info.Zone] >= targetHostsPerZone { continue } } @@ -496,9 +576,9 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance if op.ShouldExtendReplicaSetOnState(instance.State) { n++ } else if r.cfg.ZoneAwarenessEnabled && info.Zone != "" { - // We should only add the zone if we are not going to extend, - // as we want to extend the instance in the same AZ. - distinctZones = append(distinctZones, info.Zone) + // We should only increment the count for this zone if we are not going to + // extend, as we want to extend the instance in the same AZ. + hostsPerZone[info.Zone]++ } include, keepGoing := true, true @@ -515,6 +595,20 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance return instances, nil } +func haveTargetHostsInAllZones(hostsByZone map[string]int, targetHostsPerZone int, maxZones int) bool { + if len(hostsByZone) != maxZones { + return false + } + + for _, count := range hostsByZone { + if count < targetHostsPerZone { + return false + } + } + + return true +} + // GetAllHealthy implements ReadRing. 
func (r *Ring) GetAllHealthy(op Operation) (ReplicationSet, error) { r.mtx.RLock() @@ -1335,36 +1429,3 @@ func (op Operation) ShouldExtendReplicaSetOnState(s InstanceState) bool { // All states are healthy, no states extend replica set. var allStatesRingOperation = Operation(0x0000ffff) - -// numberOfKeysOwnedByInstance returns how many of the supplied keys are owned by given instance. -func (r *Ring) numberOfKeysOwnedByInstance(keys []uint32, op Operation, instanceID string, bufDescs []InstanceDesc, bufHosts []string, bufZones []string) (int, error) { - r.mtx.RLock() - defer r.mtx.RUnlock() - - if r.ringDesc == nil || len(r.ringTokens) == 0 { - return 0, ErrEmptyRing - } - - // Instance is not in this ring, it can't own any key. - if _, ok := r.ringDesc.Ingesters[instanceID]; !ok { - return 0, nil - } - - owned := 0 - for _, tok := range keys { - i, err := r.findInstancesForKey(tok, op, bufDescs, bufHosts, bufZones, func(foundInstanceID string) (include, keepGoing bool) { - if foundInstanceID == instanceID { - // If we've found our instance, we can stop. - return true, false - } - return false, true - }) - if err != nil { - return 0, err - } - if len(i) > 0 { - owned++ - } - } - return owned, nil -} diff --git a/vendor/modules.txt b/vendor/modules.txt index dd1a96610a..4044ea8b04 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -644,7 +644,7 @@ github.com/grafana/alerting/receivers/webex github.com/grafana/alerting/receivers/webhook github.com/grafana/alerting/receivers/wecom github.com/grafana/alerting/templates -# github.com/grafana/dskit v0.0.0-20250122122458-53db97b18080 +# github.com/grafana/dskit v0.0.0-20250128193928-104df19e2080 ## explicit; go 1.21 github.com/grafana/dskit/backoff github.com/grafana/dskit/ballast
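Tying the vendored ring changes back to the store-gateway code earlier in this diff: with zone-awareness enabled, the configured replication factor is treated as the number of zones, so a per-call factor of 6 against a configured factor of 3 targets max(1, 6/3) = 2 instances per zone. The sketch below shows the caller-side pattern; the helper name and the expand flag are illustrative, and it assumes a ring whose ReplicationStrategy reports SupportsExpandedReplication (such as the ignore-unhealthy strategy), since the default strategy rejects per-call factors above the configured one.

package example

import (
	"github.com/grafana/dskit/ring"
)

// ownersForKey mirrors how the store-gateway sharding strategy above queries the ring:
// buffers are always supplied, and the per-call replication factor is raised to the
// instance count only for keys (blocks) eligible for dynamic replication.
func ownersForKey(r ring.ReadRing, key uint32, op ring.Operation, expand bool) (ring.ReplicationSet, error) {
	bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
	opts := []ring.Option{ring.WithBuffers(bufDescs, bufHosts, bufZones)}
	if expand {
		// Spread the key across every instance in the (sub)ring instead of only
		// the configured replication factor.
		opts = append(opts, ring.WithReplicationFactor(r.InstancesCount()))
	}
	return r.GetWithOptions(key, op, opts...)
}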