diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json
index 6e8ef5403e..70263e4a67 100644
--- a/cmd/mimir/config-descriptor.json
+++ b/cmd/mimir/config-descriptor.json
@@ -11582,7 +11582,7 @@
       },
       {
         "kind": "block",
-        "name": "expanded_replication",
+        "name": "dynamic_replication",
         "required": false,
         "desc": "",
         "blockEntries": [
@@ -11593,7 +11593,7 @@
             "desc": "Use a higher number of replicas for recent blocks. Useful to spread query load more evenly at the cost of slightly higher disk usage.",
             "fieldValue": null,
             "fieldDefaultValue": false,
-            "fieldFlag": "store-gateway.expanded-replication.enabled",
+            "fieldFlag": "store-gateway.dynamic-replication.enabled",
             "fieldType": "boolean",
             "fieldCategory": "experimental"
           },
@@ -11604,7 +11604,7 @@
             "desc": "Threshold of the most recent sample in a block used to determine it is eligible for higher than default replication. If a block has samples within this amount of time, it is considered recent and will be owned by more replicas.",
             "fieldValue": null,
             "fieldDefaultValue": 90000000000000,
-            "fieldFlag": "store-gateway.expanded-replication.max-time-threshold",
+            "fieldFlag": "store-gateway.dynamic-replication.max-time-threshold",
             "fieldType": "duration",
             "fieldCategory": "experimental"
           }
diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl
index c19da8b228..2abe13571f 100644
--- a/cmd/mimir/help-all.txt.tmpl
+++ b/cmd/mimir/help-all.txt.tmpl
@@ -3181,12 +3181,12 @@ Usage of ./cmd/mimir/mimir:
     	How long to wait between SIGTERM and shutdown. After receiving SIGTERM, Mimir will report not-ready status via /ready endpoint.
   -store-gateway.disabled-tenants comma-separated-list-of-strings
     	Comma separated list of tenants that cannot be loaded by the store-gateway. If specified, and the store-gateway would normally load a given tenant for (via -store-gateway.enabled-tenants or sharding), it will be ignored instead.
-  -store-gateway.enabled-tenants comma-separated-list-of-strings
-    	Comma separated list of tenants that can be loaded by the store-gateway. If specified, only blocks for these tenants will be loaded by the store-gateway, otherwise all tenants can be loaded. Subject to sharding.
-  -store-gateway.expanded-replication.enabled
+  -store-gateway.dynamic-replication.enabled
     	[experimental] Use a higher number of replicas for recent blocks. Useful to spread query load more evenly at the cost of slightly higher disk usage.
-  -store-gateway.expanded-replication.max-time-threshold duration
+  -store-gateway.dynamic-replication.max-time-threshold duration
     	[experimental] Threshold of the most recent sample in a block used to determine it is eligible for higher than default replication. If a block has samples within this amount of time, it is considered recent and will be owned by more replicas. (default 25h0m0s)
+  -store-gateway.enabled-tenants comma-separated-list-of-strings
+    	Comma separated list of tenants that can be loaded by the store-gateway. If specified, only blocks for these tenants will be loaded by the store-gateway, otherwise all tenants can be loaded. Subject to sharding.
   -store-gateway.sharding-ring.auto-forget-enabled
     	When enabled, a store-gateway is automatically removed from the ring after failing to heartbeat the ring for a period longer than 10 times the configured -store-gateway.sharding-ring.heartbeat-timeout. (default true)
   -store-gateway.sharding-ring.consul.acl-token string
diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md
index e6ffb13eb0..286dce48fb 100644
--- a/docs/sources/mimir/configure/configuration-parameters/index.md
+++ b/docs/sources/mimir/configure/configuration-parameters/index.md
@@ -4849,18 +4849,18 @@ sharding_ring:
   # CLI flag: -store-gateway.sharding-ring.unregister-on-shutdown
   [unregister_on_shutdown: <boolean> | default = true]
 
-# Experimental expanded replication configuration.
-expanded_replication:
+# Experimental dynamic replication configuration.
+dynamic_replication:
   # (experimental) Use a higher number of replicas for recent blocks. Useful to
   # spread query load more evenly at the cost of slightly higher disk usage.
-  # CLI flag: -store-gateway.expanded-replication.enabled
+  # CLI flag: -store-gateway.dynamic-replication.enabled
   [enabled: <boolean> | default = false]
 
   # (experimental) Threshold of the most recent sample in a block used to
   # determine it is eligible for higher than default replication. If a block has
   # samples within this amount of time, it is considered recent and will be
   # owned by more replicas.
-  # CLI flag: -store-gateway.expanded-replication.max-time-threshold
+  # CLI flag: -store-gateway.dynamic-replication.max-time-threshold
   [max_time_threshold: <duration> | default = 25h]
 
 # (advanced) Comma separated list of tenants that can be loaded by the
diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go
index f14ae48fb0..f54b15f3d9 100644
--- a/pkg/querier/blocks_store_queryable.go
+++ b/pkg/querier/blocks_store_queryable.go
@@ -244,17 +244,17 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
 		return nil, errors.Wrap(err, "failed to create store-gateway ring client")
 	}
 
-	var expandedReplication storegateway.ExpandedReplication = storegateway.NewNopExpandedReplication()
-	if gatewayCfg.ExpandedReplication.Enabled {
-		expandedReplication = storegateway.NewMaxTimeExpandedReplication(
-			gatewayCfg.ExpandedReplication.MaxTimeThreshold,
+	var dynamicReplication storegateway.DynamicReplication = storegateway.NewNopDynamicReplication()
+	if gatewayCfg.DynamicReplication.Enabled {
+		dynamicReplication = storegateway.NewMaxTimeDynamicReplication(
+			gatewayCfg.DynamicReplication.MaxTimeThreshold,
 			// Exclude blocks which have recently become eligible for expanded replication, in order to give
 			// enough time to store-gateways to discover and load them (3 times the sync interval)
 			mimir_tsdb.NewBlockDiscoveryDelayMultiplier*storageCfg.BucketStore.SyncInterval,
 		)
 	}
 
-	stores, err = newBlocksStoreReplicationSet(storesRing, randomLoadBalancing, expandedReplication, limits, querierCfg.StoreGatewayClient, logger, reg)
+	stores, err = newBlocksStoreReplicationSet(storesRing, randomLoadBalancing, dynamicReplication, limits, querierCfg.StoreGatewayClient, logger, reg)
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to create store set")
 	}
diff --git a/pkg/querier/blocks_store_replicated_set.go b/pkg/querier/blocks_store_replicated_set.go
index 1e7cdc6eb7..37f51431e8 100644
--- a/pkg/querier/blocks_store_replicated_set.go
+++ b/pkg/querier/blocks_store_replicated_set.go
@@ -36,11 +36,11 @@
 type blocksStoreReplicationSet struct {
 	services.Service
 
-	storesRing          *ring.Ring
-	clientsPool         *client.Pool
-	balancingStrategy   loadBalancingStrategy
-	expandedReplication storegateway.ExpandedReplication
-	limits              BlocksStoreLimits
+	storesRing         *ring.Ring
+	clientsPool        *client.Pool
+	balancingStrategy  loadBalancingStrategy
+	dynamicReplication storegateway.DynamicReplication
+	limits             BlocksStoreLimits
 
 	// Subservices manager.
 	subservices        *services.Manager
@@ -50,19 +50,19 @@ type blocksStoreReplicationSet struct {
 func newBlocksStoreReplicationSet(
 	storesRing *ring.Ring,
 	balancingStrategy loadBalancingStrategy,
-	expandedReplication storegateway.ExpandedReplication,
+	dynamicReplication storegateway.DynamicReplication,
 	limits BlocksStoreLimits,
 	clientConfig ClientConfig,
 	logger log.Logger,
 	reg prometheus.Registerer,
 ) (*blocksStoreReplicationSet, error) {
 	s := &blocksStoreReplicationSet{
-		storesRing:          storesRing,
-		clientsPool:         newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), clientConfig, logger, reg),
-		expandedReplication: expandedReplication,
-		balancingStrategy:   balancingStrategy,
-		limits:              limits,
-		subservicesWatcher:  services.NewFailureWatcher(),
+		storesRing:         storesRing,
+		clientsPool:        newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), clientConfig, logger, reg),
+		dynamicReplication: dynamicReplication,
+		balancingStrategy:  balancingStrategy,
+		limits:             limits,
+		subservicesWatcher: services.NewFailureWatcher(),
 	}
 
 	var err error
@@ -106,13 +106,14 @@ func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blocks bucketin
 	instances := make(map[string]ring.InstanceDesc)
 	userRing := storegateway.GetShuffleShardingSubring(s.storesRing, userID, s.limits)
 
-	expandedReplicationOption := ring.WithReplicationFactor(userRing.InstancesCount())
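+	// Blocks eligible for dynamic replication can be queried from any store-gateway in the tenant's subring.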
+	replicationOption := ring.WithReplicationFactor(userRing.InstancesCount())
 
 	// Find the replication set of each block we need to query.
 	for _, block := range blocks {
 		var ringOpts []ring.Option
-		if s.expandedReplication.EligibleForQuerying(block) {
-			ringOpts = append(ringOpts, expandedReplicationOption)
+		if s.dynamicReplication.EligibleForQuerying(block) {
+			ringOpts = append(ringOpts, replicationOption)
 		}
 
 		// Note that we don't pass buffers since we retain instances from the returned replication set.
diff --git a/pkg/querier/blocks_store_replicated_set_test.go b/pkg/querier/blocks_store_replicated_set_test.go
index b9b446a4da..4f1012731e 100644
--- a/pkg/querier/blocks_store_replicated_set_test.go
+++ b/pkg/querier/blocks_store_replicated_set_test.go
@@ -356,7 +356,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) {
 			}
 
 			reg := prometheus.NewPedanticRegistry()
-			s, err := newBlocksStoreReplicationSet(r, noLoadBalancing, storegateway.NewNopExpandedReplication(), limits, ClientConfig{}, log.NewNopLogger(), reg)
+			s, err := newBlocksStoreReplicationSet(r, noLoadBalancing, storegateway.NewNopDynamicReplication(), limits, ClientConfig{}, log.NewNopLogger(), reg)
 			require.NoError(t, err)
 			require.NoError(t, services.StartAndAwaitRunning(ctx, s))
 			defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck
@@ -426,7 +426,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor_ShouldSupportRandomLoadBalancin
 	limits := &blocksStoreLimitsMock{storeGatewayTenantShardSize: 0}
 
 	reg := prometheus.NewPedanticRegistry()
-	s, err := newBlocksStoreReplicationSet(r, randomLoadBalancing, storegateway.NewNopExpandedReplication(), limits, ClientConfig{}, log.NewNopLogger(), reg)
+	s, err := newBlocksStoreReplicationSet(r, randomLoadBalancing, storegateway.NewNopDynamicReplication(), limits, ClientConfig{}, log.NewNopLogger(), reg)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(ctx, s))
 	defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck
diff --git a/pkg/storegateway/expanded_replication.go b/pkg/storegateway/dynamic_replication.go
similarity index 56%
rename from pkg/storegateway/expanded_replication.go
rename to pkg/storegateway/dynamic_replication.go
index 9581729d75..29dc7bc32b 100644
--- a/pkg/storegateway/expanded_replication.go
+++ b/pkg/storegateway/dynamic_replication.go
@@ -12,17 +12,17 @@ var (
 	errInvalidExpandedReplicationMaxTimeThreshold = errors.New("invalid expanded replication max time threshold, the value must be at least one hour")
 )
 
-type ExpandedReplicationConfig struct {
+type DynamicReplicationConfig struct {
 	Enabled          bool          `yaml:"enabled" category:"experimental"`
 	MaxTimeThreshold time.Duration `yaml:"max_time_threshold" category:"experimental"`
 }
 
-func (cfg *ExpandedReplicationConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
-	f.BoolVar(&cfg.Enabled, prefix+"expanded-replication.enabled", false, "Use a higher number of replicas for recent blocks. Useful to spread query load more evenly at the cost of slightly higher disk usage.")
-	f.DurationVar(&cfg.MaxTimeThreshold, prefix+"expanded-replication.max-time-threshold", 25*time.Hour, "Threshold of the most recent sample in a block used to determine it is eligible for higher than default replication. If a block has samples within this amount of time, it is considered recent and will be owned by more replicas.")
+func (cfg *DynamicReplicationConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
+	f.BoolVar(&cfg.Enabled, prefix+"dynamic-replication.enabled", false, "Use a higher number of replicas for recent blocks. Useful to spread query load more evenly at the cost of slightly higher disk usage.")
+	f.DurationVar(&cfg.MaxTimeThreshold, prefix+"dynamic-replication.max-time-threshold", 25*time.Hour, "Threshold of the most recent sample in a block used to determine it is eligible for higher than default replication. If a block has samples within this amount of time, it is considered recent and will be owned by more replicas.")
 }
 
-func (cfg *ExpandedReplicationConfig) Validate() error {
+func (cfg *DynamicReplicationConfig) Validate() error {
 	if cfg.Enabled && cfg.MaxTimeThreshold < time.Hour {
 		return errInvalidExpandedReplicationMaxTimeThreshold
 	}
@@ -38,9 +38,9 @@ type ReplicatedBlock interface {
 	GetMaxTime() time.Time
 }
 
-// ExpandedReplication determines if a TSDB block is eligible to be sync to and queried from more
+// DynamicReplication determines if a TSDB block is eligible to be synced to and queried from more
 // store-gateways than the configured replication factor based on metadata about the block.
-type ExpandedReplication interface {
+type DynamicReplication interface {
 	// EligibleForSync returns true if the block can be synced to more than the configured (via
 	// replication factor) number of store-gateways, false otherwise.
 	EligibleForSync(b ReplicatedBlock) bool
@@ -50,40 +50,41 @@
 	EligibleForQuerying(b ReplicatedBlock) bool
 }
 
-func NewNopExpandedReplication() *NopExpandedReplication {
-	return &NopExpandedReplication{}
+func NewNopDynamicReplication() *NopDynamicReplication {
+	return &NopDynamicReplication{}
 }
 
-// NopExpandedReplication is an ExpandedReplication implementation that always returns false.
-type NopExpandedReplication struct{}
+// NopDynamicReplication is a DynamicReplication implementation that always returns false.
+type NopDynamicReplication struct{}
 
-func (n NopExpandedReplication) EligibleForSync(ReplicatedBlock) bool {
+func (n NopDynamicReplication) EligibleForSync(ReplicatedBlock) bool {
 	return false
 }
 
-func (n NopExpandedReplication) EligibleForQuerying(ReplicatedBlock) bool {
+func (n NopDynamicReplication) EligibleForQuerying(ReplicatedBlock) bool {
 	return false
 }
 
-func NewMaxTimeExpandedReplication(maxTime time.Duration, gracePeriod time.Duration) *MaxTimeExpandedReplication {
-	return &MaxTimeExpandedReplication{
+func NewMaxTimeDynamicReplication(maxTime time.Duration, gracePeriod time.Duration) *MaxTimeDynamicReplication {
+	return &MaxTimeDynamicReplication{
 		maxTime:     maxTime,
 		gracePeriod: gracePeriod,
 		now:         time.Now,
 	}
 }
 
-// MaxTimeExpandedReplication is an ExpandedReplication implementation that determines
+// MaxTimeDynamicReplication is a DynamicReplication implementation that determines
 // if a block is eligible for expanded replication based on how recent its MaxTime (most
 // recent sample) is. An upload grace period can optionally be used to ensure that blocks
 // are synced to store-gateways before they are expected to be available by queriers.
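+// The grace period is only applied when deciding whether to sync a block, not when querying it.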
-type MaxTimeExpandedReplication struct {
+type MaxTimeDynamicReplication struct {
 	maxTime     time.Duration
 	gracePeriod time.Duration
 	now         func() time.Time
 }
 
-func (e *MaxTimeExpandedReplication) EligibleForSync(b ReplicatedBlock) bool {
+func (e *MaxTimeDynamicReplication) EligibleForSync(b ReplicatedBlock) bool {
 	now := e.now()
 	maxTimeDelta := now.Sub(b.GetMaxTime())
 	// We start syncing blocks `gracePeriod` before they become eligible for querying to
@@ -91,7 +92,7 @@ func (e *MaxTimeExpandedReplication) EligibleForSync(b ReplicatedBlock) bool {
 	return maxTimeDelta <= (e.maxTime + e.gracePeriod)
 }
 
-func (e *MaxTimeExpandedReplication) EligibleForQuerying(b ReplicatedBlock) bool {
+func (e *MaxTimeDynamicReplication) EligibleForQuerying(b ReplicatedBlock) bool {
 	now := e.now()
 	maxTimeDelta := now.Sub(b.GetMaxTime())
 	return maxTimeDelta <= e.maxTime
diff --git a/pkg/storegateway/expanded_replication_test.go b/pkg/storegateway/dynamic_replication_test.go
similarity index 96%
rename from pkg/storegateway/expanded_replication_test.go
rename to pkg/storegateway/dynamic_replication_test.go
index a8bd0bdecb..0ded34bae7 100644
--- a/pkg/storegateway/expanded_replication_test.go
+++ b/pkg/storegateway/dynamic_replication_test.go
@@ -15,7 +15,7 @@ func TestMaxTimeExpandedReplication(t *testing.T) {
 	// Round "now" to the nearest millisecond since we are using millisecond precision
 	// for min/max times for the blocks.
 	now := time.Now().Round(time.Millisecond)
-	replication := NewMaxTimeExpandedReplication(25*time.Hour, 45*time.Minute)
+	replication := NewMaxTimeDynamicReplication(25*time.Hour, 45*time.Minute)
 	replication.now = func() time.Time { return now }
 
 	type testCase struct {
diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go
index 161fc30965..d0ca043b58 100644
--- a/pkg/storegateway/gateway.go
+++ b/pkg/storegateway/gateway.go
@@ -53,8 +53,8 @@ var (
 
 // Config holds the store gateway config.
 type Config struct {
-	ShardingRing        RingConfig                `yaml:"sharding_ring" doc:"description=The hash ring configuration."`
-	ExpandedReplication ExpandedReplicationConfig `yaml:"expanded_replication" doc:"description=Experimental expanded replication configuration." category:"experimental"`
+	ShardingRing       RingConfig               `yaml:"sharding_ring" doc:"description=The hash ring configuration."`
+	DynamicReplication DynamicReplicationConfig `yaml:"dynamic_replication" doc:"description=Experimental dynamic replication configuration." category:"experimental"`
 
 	EnabledTenants  flagext.StringSliceCSV `yaml:"enabled_tenants" category:"advanced"`
 	DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants" category:"advanced"`
@@ -63,7 +63,7 @@
 // RegisterFlags registers the Config flags.
 func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
 	cfg.ShardingRing.RegisterFlags(f, logger)
-	cfg.ExpandedReplication.RegisterFlagsWithPrefix(f, "store-gateway.")
+	cfg.DynamicReplication.RegisterFlagsWithPrefix(f, "store-gateway.")
 
 	f.Var(&cfg.EnabledTenants, "store-gateway.enabled-tenants", "Comma separated list of tenants that can be loaded by the store-gateway. If specified, only blocks for these tenants will be loaded by the store-gateway, otherwise all tenants can be loaded. Subject to sharding.")
 	f.Var(&cfg.DisabledTenants, "store-gateway.disabled-tenants", "Comma separated list of tenants that cannot be loaded by the store-gateway. If specified, and the store-gateway would normally load a given tenant for (via -store-gateway.enabled-tenants or sharding), it will be ignored instead.")
@@ -75,7 +75,7 @@
 		return errInvalidTenantShardSize
 	}
 
-	if err := cfg.ExpandedReplication.Validate(); err != nil {
+	if err := cfg.DynamicReplication.Validate(); err != nil {
 		return err
 	}
 
@@ -179,17 +179,17 @@ func newStoreGateway(gatewayCfg Config, storageCfg mimir_tsdb.BlocksStorageConfi
 		return nil, errors.Wrap(err, "create ring client")
 	}
 
-	var expandedReplication ExpandedReplication = NewNopExpandedReplication()
-	if gatewayCfg.ExpandedReplication.Enabled {
-		expandedReplication = NewMaxTimeExpandedReplication(
-			gatewayCfg.ExpandedReplication.MaxTimeThreshold,
+	var dynamicReplication DynamicReplication = NewNopDynamicReplication()
+	if gatewayCfg.DynamicReplication.Enabled {
+		dynamicReplication = NewMaxTimeDynamicReplication(
+			gatewayCfg.DynamicReplication.MaxTimeThreshold,
 			// Exclude blocks which have recently become eligible for expanded replication, in order to give
 			// enough time to store-gateways to discover and load them (3 times the sync interval)
 			mimir_tsdb.NewBlockDiscoveryDelayMultiplier*storageCfg.BucketStore.SyncInterval,
 		)
 	}
 
-	shardingStrategy = NewShuffleShardingStrategy(g.ring, lifecyclerCfg.ID, lifecyclerCfg.Addr, expandedReplication, limits, logger)
+	shardingStrategy = NewShuffleShardingStrategy(g.ring, lifecyclerCfg.ID, lifecyclerCfg.Addr, dynamicReplication, limits, logger)
 
 	allowedTenants := util.NewAllowedTenants(gatewayCfg.EnabledTenants, gatewayCfg.DisabledTenants)
 	if len(gatewayCfg.EnabledTenants) > 0 {
diff --git a/pkg/storegateway/sharding_strategy.go b/pkg/storegateway/sharding_strategy.go
index cebe891fd4..d46ae0c8d2 100644
--- a/pkg/storegateway/sharding_strategy.go
+++ b/pkg/storegateway/sharding_strategy.go
@@ -46,23 +46,24 @@ type ShardingLimits interface {
 
 // ShuffleShardingStrategy is a shuffle sharding strategy, based on the hash ring formed by store-gateways,
 // where each tenant blocks are sharded across a subset of store-gateway instances.
 type ShuffleShardingStrategy struct {
-	r                   *ring.Ring
-	instanceID          string
-	instanceAddr        string
-	expandedReplication ExpandedReplication
-	limits              ShardingLimits
-	logger              log.Logger
+	r                  *ring.Ring
+	instanceID         string
+	instanceAddr       string
+	dynamicReplication DynamicReplication
+	limits             ShardingLimits
+	logger             log.Logger
 }
 
 // NewShuffleShardingStrategy makes a new ShuffleShardingStrategy.
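+// The dynamicReplication argument determines which blocks are synced to more store-gateways than the configured replication factor.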
-func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, expandedReplication ExpandedReplication, limits ShardingLimits, logger log.Logger) *ShuffleShardingStrategy {
+func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, dynamicReplication DynamicReplication, limits ShardingLimits, logger log.Logger) *ShuffleShardingStrategy {
 	return &ShuffleShardingStrategy{
-		r:                   r,
-		instanceID:          instanceID,
-		instanceAddr:        instanceAddr,
-		expandedReplication: expandedReplication,
-		limits:              limits,
-		logger:              logger,
+		r:                  r,
+		instanceID:         instanceID,
+		instanceAddr:       instanceAddr,
+		dynamicReplication: dynamicReplication,
+		limits:             limits,
+		logger:             logger,
 	}
 }
@@ -113,14 +114,14 @@ func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string,
 	}
 
 	r := GetShuffleShardingSubring(s.r, userID, s.limits)
-	expandedReplicationOption := ring.WithReplicationFactor(r.InstancesCount())
+	replicationOption := ring.WithReplicationFactor(r.InstancesCount())
 	bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
 	bufOption := ring.WithBuffers(bufDescs, bufHosts, bufZones)
 
 	for blockID := range metas {
 		ringOpts := []ring.Option{bufOption}
-		if s.expandedReplication.EligibleForSync(metas[blockID]) {
-			ringOpts = append(ringOpts, expandedReplicationOption)
+		if s.dynamicReplication.EligibleForSync(metas[blockID]) {
+			ringOpts = append(ringOpts, replicationOption)
 		}
 
 		// Check if the block is owned by the store-gateway
diff --git a/pkg/storegateway/sharding_strategy_test.go b/pkg/storegateway/sharding_strategy_test.go
index fdc14904c7..958fe5b94e 100644
--- a/pkg/storegateway/sharding_strategy_test.go
+++ b/pkg/storegateway/sharding_strategy_test.go
@@ -58,7 +58,7 @@
 func TestShuffleShardingStrategy(t *testing.T) {
 	tests := map[string]struct {
 		replicationFactor   int
-		expandedReplication ExpandedReplication
+		expandedReplication DynamicReplication
 		limits              ShardingLimits
 		setupRing           func(*ring.Desc)
 		prevLoadedBlocks    map[string]map[ulid.ULID]struct{}
@@ -67,7 +67,7 @@
 	}{
 		"one ACTIVE instance in the ring with RF = 1 and SS = 1": {
 			replicationFactor:   1,
-			expandedReplication: NewNopExpandedReplication(),
+			expandedReplication: NewNopDynamicReplication(),
 			limits:              &shardingLimitsMock{storeGatewayTenantShardSize: 1},
 			setupRing: func(r *ring.Desc) {
 				r.AddIngester("instance-1", "127.0.0.1", "", []uint32{0}, ring.ACTIVE, registeredAt, false, time.Time{})
@@ -83,7 +83,7 @@
 		},
 		"one ACTIVE instance in the ring with RF = 2 and SS = 1 (should still sync blocks on the only available instance)": {
 			replicationFactor:   1,
-			expandedReplication: NewNopExpandedReplication(),
+			expandedReplication: NewNopDynamicReplication(),
 			limits:              &shardingLimitsMock{storeGatewayTenantShardSize: 1},
 			setupRing: func(r *ring.Desc) {
 				r.AddIngester("instance-1", "127.0.0.1", "", []uint32{0}, ring.ACTIVE, registeredAt, false, time.Time{})
@@ -99,7 +99,7 @@
 		},
 		"one ACTIVE instance in the ring with RF = 2 and SS = 2 (should still sync blocks on the only available instance)": {
 			replicationFactor:   1,
-			expandedReplication: NewNopExpandedReplication(),
+			expandedReplication: NewNopDynamicReplication(),
 			limits:              &shardingLimitsMock{storeGatewayTenantShardSize: 2},
 			setupRing: func(r *ring.Desc) {
 				r.AddIngester("instance-1", "127.0.0.1", "", []uint32{0}, ring.ACTIVE, registeredAt, false, time.Time{})
@@ -115,7 +115,7 @@
 		},
"two ACTIVE instances in the ring with RF = 1 and SS = 1 (should sync blocks on 1 instance because of the shard size)": { replicationFactor: 1, - expandedReplication: NewNopExpandedReplication(), + expandedReplication: NewNopDynamicReplication(), limits: &shardingLimitsMock{storeGatewayTenantShardSize: 1}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) @@ -132,7 +132,7 @@ func TestShuffleShardingStrategy(t *testing.T) { }, "two ACTIVE instances in the ring with RF = 1 and SS = 2 (should sync blocks on 2 instances because of the shard size)": { replicationFactor: 1, - expandedReplication: NewNopExpandedReplication(), + expandedReplication: NewNopDynamicReplication(), limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) @@ -149,7 +149,7 @@ func TestShuffleShardingStrategy(t *testing.T) { }, "two ACTIVE instances in the ring with RF = 2 and SS = 1 (should sync blocks on 1 instance because of the shard size)": { replicationFactor: 2, - expandedReplication: NewNopExpandedReplication(), + expandedReplication: NewNopDynamicReplication(), limits: &shardingLimitsMock{storeGatewayTenantShardSize: 1}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) @@ -166,7 +166,7 @@ func TestShuffleShardingStrategy(t *testing.T) { }, "two ACTIVE instances in the ring with RF = 2 and SS = 2 (should sync all blocks on 2 instances)": { replicationFactor: 2, - expandedReplication: NewNopExpandedReplication(), + expandedReplication: NewNopDynamicReplication(), limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) @@ -183,7 +183,7 @@ func TestShuffleShardingStrategy(t *testing.T) { }, "multiple ACTIVE instances in the ring with RF = 2 and SS = 3": { replicationFactor: 2, - expandedReplication: NewNopExpandedReplication(), + expandedReplication: NewNopDynamicReplication(), limits: &shardingLimitsMock{storeGatewayTenantShardSize: 3}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) @@ -203,7 +203,7 @@ func TestShuffleShardingStrategy(t *testing.T) { }, "multiple ACTIVE instances in the ring with RF = 1 and SS = 3 and ER = true": { replicationFactor: 1, - expandedReplication: NewMaxTimeExpandedReplication(25*time.Hour, 45*time.Minute), + expandedReplication: NewMaxTimeDynamicReplication(25*time.Hour, 45*time.Minute), limits: &shardingLimitsMock{storeGatewayTenantShardSize: 3}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{}) @@ -223,7 +223,7 @@ func TestShuffleShardingStrategy(t *testing.T) { }, "one unhealthy instance in the ring with RF = 1, SS = 3 and NO previously loaded blocks": { replicationFactor: 1, - expandedReplication: NewNopExpandedReplication(), + expandedReplication: NewNopDynamicReplication(), limits: &shardingLimitsMock{storeGatewayTenantShardSize: 3}, setupRing: func(r *ring.Desc) { 
 				r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{})
@@ -250,7 +250,7 @@
 		},
 		"one unhealthy instance in the ring with RF = 2, SS = 3 and NO previously loaded blocks": {
 			replicationFactor:   2,
-			expandedReplication: NewNopExpandedReplication(),
+			expandedReplication: NewNopDynamicReplication(),
 			limits:              &shardingLimitsMock{storeGatewayTenantShardSize: 3},
 			setupRing: func(r *ring.Desc) {
 				r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{})
@@ -276,7 +276,7 @@
 		},
 		"one unhealthy instance in the ring with RF = 2, SS = 2 and NO previously loaded blocks": {
 			replicationFactor:   2,
-			expandedReplication: NewNopExpandedReplication(),
+			expandedReplication: NewNopDynamicReplication(),
 			limits:              &shardingLimitsMock{storeGatewayTenantShardSize: 2},
 			setupRing: func(r *ring.Desc) {
 				r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block4Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{})
@@ -302,7 +302,7 @@
 		},
 		"one unhealthy instance in the ring with RF = 2, SS = 2 and some previously loaded blocks": {
 			replicationFactor:   2,
-			expandedReplication: NewNopExpandedReplication(),
+			expandedReplication: NewNopDynamicReplication(),
 			limits:              &shardingLimitsMock{storeGatewayTenantShardSize: 2},
 			setupRing: func(r *ring.Desc) {
 				r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block4Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{})
@@ -331,7 +331,7 @@
 		},
 		"LEAVING instance in the ring should continue to keep its shard blocks and they should NOT be replicated to another instance": {
 			replicationFactor:   1,
-			expandedReplication: NewNopExpandedReplication(),
+			expandedReplication: NewNopDynamicReplication(),
 			limits:              &shardingLimitsMock{storeGatewayTenantShardSize: 2},
 			setupRing: func(r *ring.Desc) {
 				r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{})
@@ -351,7 +351,7 @@
 		},
 		"JOINING instance in the ring should get its shard blocks and they should not be replicated to another instance": {
 			replicationFactor:   1,
-			expandedReplication: NewNopExpandedReplication(),
+			expandedReplication: NewNopDynamicReplication(),
 			limits:              &shardingLimitsMock{storeGatewayTenantShardSize: 2},
 			setupRing: func(r *ring.Desc) {
 				r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{})
@@ -371,7 +371,7 @@
 		},
 		"SS = 0 disables shuffle sharding": {
 			replicationFactor:   1,
-			expandedReplication: NewNopExpandedReplication(),
+			expandedReplication: NewNopDynamicReplication(),
 			limits:              &shardingLimitsMock{storeGatewayTenantShardSize: 0},
 			setupRing: func(r *ring.Desc) {
 				r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{})