diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d599ea8d04..89b87452dbb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ * [ENHANCEMENT] Store Gateway/Querier/Compactor: Handling CMK Access Denied errors. #5420 #5442 #5446 * [ENHANCEMENT] Store Gateway: Implementing multi level index cache. #5451 * [ENHANCEMENT] Alertmanager: Add the alert name in error log when it get throttled. #5456 +* [ENHANCEMENT] Querier: Retry store gateway on different zones when zone awareness is enabled. #5476 * [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265 * [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286 * [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293 diff --git a/pkg/querier/blocks_store_balanced_set.go b/pkg/querier/blocks_store_balanced_set.go index 73278b0619f..44b52eff8c0 100644 --- a/pkg/querier/blocks_store_balanced_set.go +++ b/pkg/querier/blocks_store_balanced_set.go @@ -60,7 +60,7 @@ func (s *blocksStoreBalancedSet) resolve(ctx context.Context) error { return nil } -func (s *blocksStoreBalancedSet) GetClientsFor(_ string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) { +func (s *blocksStoreBalancedSet) GetClientsFor(_ string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string, _ map[ulid.ULID]map[string]int) (map[BlocksStoreClient][]ulid.ULID, error) { addresses := s.dnsProvider.Addresses() if len(addresses) == 0 { return nil, fmt.Errorf("no address resolved for the store-gateway service addresses %s", strings.Join(s.serviceAddresses, ",")) diff --git a/pkg/querier/blocks_store_balanced_set_test.go b/pkg/querier/blocks_store_balanced_set_test.go index 084f3c51422..8f6ca606efe 100644 --- a/pkg/querier/blocks_store_balanced_set_test.go +++ b/pkg/querier/blocks_store_balanced_set_test.go @@ -34,7 +34,7 @@ func TestBlocksStoreBalancedSet_GetClientsFor(t *testing.T) { clientsCount := map[string]int{} for i := 0; i < numGets; i++ { - clients, err := s.GetClientsFor("", []ulid.ULID{block1}, map[ulid.ULID][]string{}) + clients, err := s.GetClientsFor("", []ulid.ULID{block1}, map[ulid.ULID][]string{}, nil) require.NoError(t, err) require.Len(t, clients, 1) @@ -141,7 +141,7 @@ func TestBlocksStoreBalancedSet_GetClientsFor_Exclude(t *testing.T) { require.NoError(t, services.StartAndAwaitRunning(ctx, s)) defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck - clients, err := s.GetClientsFor("", testData.queryBlocks, testData.exclude) + clients, err := s.GetClientsFor("", testData.queryBlocks, testData.exclude, nil) assert.Equal(t, testData.expectedErr, err) if testData.expectedErr == nil { diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index c355be998cc..fc194932f2e 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -70,7 +70,7 @@ type BlocksStoreSet interface { // GetClientsFor returns the store gateway clients that should be used to // query the set of blocks in input. The exclude parameter is the map of // blocks -> store-gateway addresses that should be excluded. 
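+ // The attemptedBlocksZones parameter is the map of blocks -> zone -> number of
+ // attempts already made against instances in that zone, and is used to spread
+ // retries across zones when zone awareness is enabled.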
- GetClientsFor(userID string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) + GetClientsFor(userID string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string, attemptedBlocksZones map[ulid.ULID]map[string]int) (map[BlocksStoreClient][]ulid.ULID, error) } // BlocksFinder is the interface used to find blocks for a given user and time range. @@ -228,7 +228,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa return nil, errors.Wrap(err, "failed to create store-gateway ring client") } - stores, err = newBlocksStoreReplicationSet(storesRing, gatewayCfg.ShardingStrategy, randomLoadBalancing, limits, querierCfg.StoreGatewayClient, logger, reg) + stores, err = newBlocksStoreReplicationSet(storesRing, gatewayCfg.ShardingStrategy, randomLoadBalancing, limits, querierCfg.StoreGatewayClient, logger, reg, storesRingCfg.ZoneAwarenessEnabled) if err != nil { return nil, errors.Wrap(err, "failed to create store set") } @@ -499,13 +499,14 @@ func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logg attemptedBlocks = map[ulid.ULID][]string{} touchedStores = map[string]struct{}{} - resQueriedBlocks = []ulid.ULID(nil) + resQueriedBlocks = []ulid.ULID(nil) + attemptedBlocksZones = make(map[ulid.ULID]map[string]int, len(remainingBlocks)) ) for attempt := 1; attempt <= maxFetchSeriesAttempts; attempt++ { // Find the set of store-gateway instances having the blocks. The exclude parameter is the // map of blocks queried so far, with the list of store-gateway addresses for each block. - clients, err := q.stores.GetClientsFor(q.userID, remainingBlocks, attemptedBlocks) + clients, err := q.stores.GetClientsFor(q.userID, remainingBlocks, attemptedBlocks, attemptedBlocksZones) if err != nil { // If it's a retry and we get an error, it means there are no more store-gateways left // from which running another attempt, so we're just stopping retrying. diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index c4d925d4caf..26935a1f397 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -1515,7 +1515,7 @@ type blocksStoreSetMock struct { nextResult int } -func (m *blocksStoreSetMock) GetClientsFor(_ string, _ []ulid.ULID, _ map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) { +func (m *blocksStoreSetMock) GetClientsFor(_ string, _ []ulid.ULID, _ map[ulid.ULID][]string, _ map[ulid.ULID]map[string]int) (map[BlocksStoreClient][]ulid.ULID, error) { if m.nextResult >= len(m.mockedResponses) { panic("not enough mocked results") } diff --git a/pkg/querier/blocks_store_replicated_set.go b/pkg/querier/blocks_store_replicated_set.go index bd63fddf025..cb2bdd3863e 100644 --- a/pkg/querier/blocks_store_replicated_set.go +++ b/pkg/querier/blocks_store_replicated_set.go @@ -3,6 +3,7 @@ package querier import ( "context" "fmt" + "math" "math/rand" "github.com/go-kit/log" @@ -36,6 +37,8 @@ type blocksStoreReplicationSet struct { balancingStrategy loadBalancingStrategy limits BlocksStoreLimits + zoneAwarenessEnabled bool + // Subservices manager. 
subservices *services.Manager subservicesWatcher *services.FailureWatcher @@ -49,13 +52,15 @@ func newBlocksStoreReplicationSet( clientConfig ClientConfig, logger log.Logger, reg prometheus.Registerer, + zoneAwarenessEnabled bool, ) (*blocksStoreReplicationSet, error) { s := &blocksStoreReplicationSet{ - storesRing: storesRing, - clientsPool: newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), clientConfig, logger, reg), - shardingStrategy: shardingStrategy, - balancingStrategy: balancingStrategy, - limits: limits, + storesRing: storesRing, + clientsPool: newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), clientConfig, logger, reg), + shardingStrategy: shardingStrategy, + balancingStrategy: balancingStrategy, + limits: limits, + zoneAwarenessEnabled: zoneAwarenessEnabled, } var err error @@ -94,7 +99,7 @@ func (s *blocksStoreReplicationSet) stopping(_ error) error { return services.StopManagerAndAwaitStopped(context.Background(), s.subservices) } -func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) { +func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string, attemptedBlocksZones map[ulid.ULID]map[string]int) (map[BlocksStoreClient][]ulid.ULID, error) { shards := map[string][]ulid.ULID{} // If shuffle sharding is enabled, we should build a subring for the user, @@ -118,12 +123,19 @@ func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blockIDs []ulid } // Pick a non excluded store-gateway instance. - addr := getNonExcludedInstanceAddr(set, exclude[blockID], s.balancingStrategy) - if addr == "" { + instance := getNonExcludedInstance(set, exclude[blockID], s.balancingStrategy, s.zoneAwarenessEnabled, attemptedBlocksZones[blockID]) + // A valid instance should have a non-empty address. + if instance.Addr == "" { return nil, fmt.Errorf("no store-gateway instance left after checking exclude for block %s", blockID.String()) } - shards[addr] = append(shards[addr], blockID) + shards[instance.Addr] = append(shards[instance.Addr], blockID) + if s.zoneAwarenessEnabled { + if _, ok := attemptedBlocksZones[blockID]; !ok { + attemptedBlocksZones[blockID] = make(map[string]int, 0) + } + attemptedBlocksZones[blockID][instance.Zone]++ + } } clients := map[BlocksStoreClient][]ulid.ULID{} @@ -141,7 +153,7 @@ func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blockIDs []ulid return clients, nil } -func getNonExcludedInstanceAddr(set ring.ReplicationSet, exclude []string, balancingStrategy loadBalancingStrategy) string { +func getNonExcludedInstance(set ring.ReplicationSet, exclude []string, balancingStrategy loadBalancingStrategy, zoneAwarenessEnabled bool, attemptedZones map[string]int) ring.InstanceDesc { if balancingStrategy == randomLoadBalancing { // Randomize the list of instances to not always query the same one. rand.Shuffle(len(set.Instances), func(i, j int) { @@ -149,11 +161,30 @@ func getNonExcludedInstanceAddr(set ring.ReplicationSet, exclude []string, balan }) } + minAttempt := math.MaxInt + numOfZone := set.GetNumOfZones() + // There are still unattempted zones so we know min is 0. + if len(attemptedZones) < numOfZone { + minAttempt = 0 + } else { + // Iterate over attempted zones and find the min attempts. 
+ for _, c := range attemptedZones { + if c < minAttempt { + minAttempt = c + } + } + } for _, instance := range set.Instances { - if !util.StringsContain(exclude, instance.Addr) { - return instance.Addr + if util.StringsContain(exclude, instance.Addr) { + continue + } + // If zone awareness is not enabled, pick first non-excluded instance. + // Otherwise, keep iterating until we find an instance in a zone where + // we have the least retries. + if !zoneAwarenessEnabled || attemptedZones[instance.Zone] == minAttempt { + return instance } } - return "" + return ring.InstanceDesc{} } diff --git a/pkg/querier/blocks_store_replicated_set_test.go b/pkg/querier/blocks_store_replicated_set_test.go index 4c1b9a529d7..8d780694e35 100644 --- a/pkg/querier/blocks_store_replicated_set_test.go +++ b/pkg/querier/blocks_store_replicated_set_test.go @@ -3,6 +3,7 @@ package querier import ( "context" "fmt" + "strconv" "strings" "testing" "time" @@ -28,28 +29,51 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { // The following block IDs have been picked to have increasing hash values // in order to simplify the tests. - block1 := ulid.MustNew(1, nil) // hash: 283204220 - block2 := ulid.MustNew(2, nil) // hash: 444110359 - block3 := ulid.MustNew(5, nil) // hash: 2931974232 - block4 := ulid.MustNew(6, nil) // hash: 3092880371 + block1 := ulid.MustNew(4, nil) // hash: 122298081 + block2 := ulid.MustNew(1, nil) // hash: 283204220 + block3 := ulid.MustNew(2, nil) // hash: 444110359 + block4 := ulid.MustNew(12, nil) // hash: 1124870809 + block5 := ulid.MustNew(9, nil) // hash: 1285776948 + block6 := ulid.MustNew(10, nil) // hash: 1446683087 + block7 := ulid.MustNew(7, nil) // hash: 1607589226 + block8 := ulid.MustNew(8, nil) // hash: 1285776948 + block9 := ulid.MustNew(5, nil) // hash: 2931974232 + block10 := ulid.MustNew(6, nil) // hash: 3092880371 + block11 := ulid.MustNew(3, nil) // hash: 3092880371 block1Hash := cortex_tsdb.HashBlockID(block1) block2Hash := cortex_tsdb.HashBlockID(block2) block3Hash := cortex_tsdb.HashBlockID(block3) block4Hash := cortex_tsdb.HashBlockID(block4) + block5Hash := cortex_tsdb.HashBlockID(block5) + _ = block5Hash + block6Hash := cortex_tsdb.HashBlockID(block6) + _ = block6Hash + block7Hash := cortex_tsdb.HashBlockID(block7) + _ = block7Hash + block8Hash := cortex_tsdb.HashBlockID(block8) + _ = block8Hash + block9Hash := cortex_tsdb.HashBlockID(block9) + _ = block9Hash + block10Hash := cortex_tsdb.HashBlockID(block10) + _ = block10Hash + block11Hash := cortex_tsdb.HashBlockID(block11) + _ = block11Hash userID := "user-A" registeredAt := time.Now() tests := map[string]struct { - shardingStrategy string - tenantShardSize float64 - replicationFactor int - setup func(*ring.Desc) - queryBlocks []ulid.ULID - exclude map[ulid.ULID][]string - expectedClients map[string][]ulid.ULID - expectedErr error + shardingStrategy string + tenantShardSize float64 + replicationFactor int + setup func(*ring.Desc) + queryBlocks []ulid.ULID + exclude map[ulid.ULID][]string + attemptedBlocksZones map[ulid.ULID]map[string]int + zoneAwarenessEnabled bool + expectedClients map[string][]ulid.ULID + expectedErr error }{ // // Sharding strategy: default @@ -263,7 +287,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { queryBlocks: []ulid.ULID{block1, block2, block4}, expectedClients: map[string][]ulid.ULID{ "127.0.0.1": {block1, block4}, - "127.0.0.3": {block2}, + "127.0.0.2": {block2}, }, }, "shuffle sharding, multiple instances in the ring with RF = 1, SS = 4": { @@ 
-299,7 +323,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { block2: {"127.0.0.1"}, }, expectedClients: map[string][]ulid.ULID{ - "127.0.0.3": {block1, block2}, + "127.0.0.2": {block1, block2}, }, }, "shuffle sharding, multiple instances in the ring with RF = 2, SS = 2 with excluded blocks and no replacement available": { @@ -314,11 +338,220 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { }, queryBlocks: []ulid.ULID{block1, block2}, exclude: map[ulid.ULID][]string{ - block1: {"127.0.0.1", "127.0.0.3"}, + block1: {"127.0.0.1", "127.0.0.2"}, block2: {"127.0.0.1"}, }, expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", block1.String()), }, + "shuffle sharding, multiple instances in the ring with RF = 3, SS = 3 and zone awareness enabled": { + shardingStrategy: util.ShardingStrategyShuffle, + tenantShardSize: 3, + replicationFactor: 3, + setup: func(d *ring.Desc) { + d.AddIngester("instance-1", "127.0.0.1", "1", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-2", "127.0.0.2", "2", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-3", "127.0.0.3", "3", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-4", "127.0.0.4", "1", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-5", "127.0.0.5", "2", []uint32{block5Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-6", "127.0.0.6", "3", []uint32{block6Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-7", "127.0.0.7", "1", []uint32{block7Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-8", "127.0.0.8", "2", []uint32{block8Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-9", "127.0.0.9", "3", []uint32{block9Hash + 1}, ring.ACTIVE, registeredAt) + }, + queryBlocks: []ulid.ULID{block1, block2}, + zoneAwarenessEnabled: true, + attemptedBlocksZones: make(map[ulid.ULID]map[string]int, 0), + expectedClients: map[string][]ulid.ULID{ + "127.0.0.1": {block1}, + "127.0.0.6": {block2}, + }, + }, + "shuffle sharding, multiple instances in the ring with RF = 3, SS = 3, exclude and zone awareness enabled": { + shardingStrategy: util.ShardingStrategyShuffle, + tenantShardSize: 3, + replicationFactor: 3, + setup: func(d *ring.Desc) { + d.AddIngester("instance-1", "127.0.0.1", "1", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-2", "127.0.0.2", "2", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-3", "127.0.0.3", "3", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-4", "127.0.0.4", "1", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-5", "127.0.0.5", "2", []uint32{block5Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-6", "127.0.0.6", "3", []uint32{block6Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-7", "127.0.0.7", "1", []uint32{block7Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-8", "127.0.0.8", "2", []uint32{block8Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-9", "127.0.0.9", "3", []uint32{block9Hash + 1}, ring.ACTIVE, registeredAt) + }, + queryBlocks: []ulid.ULID{block1}, + zoneAwarenessEnabled: true, + exclude: map[ulid.ULID][]string{ + block1: {"127.0.0.1"}, + }, + attemptedBlocksZones: map[ulid.ULID]map[string]int{ + block1: {"1": 1}, + }, + expectedClients: map[string][]ulid.ULID{ + "127.0.0.6": {block1}, + }, + }, + 
"shuffle sharding, multiple instances in the ring with RF = 3, SS = 3, exclude 2 blocks and zone awareness enabled": { + shardingStrategy: util.ShardingStrategyShuffle, + tenantShardSize: 3, + replicationFactor: 3, + setup: func(d *ring.Desc) { + d.AddIngester("instance-1", "127.0.0.1", "1", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-2", "127.0.0.2", "2", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-3", "127.0.0.3", "3", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-4", "127.0.0.4", "1", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-5", "127.0.0.5", "2", []uint32{block5Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-6", "127.0.0.6", "3", []uint32{block6Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-7", "127.0.0.7", "1", []uint32{block7Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-8", "127.0.0.8", "2", []uint32{block8Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-9", "127.0.0.9", "3", []uint32{block9Hash + 1}, ring.ACTIVE, registeredAt) + }, + queryBlocks: []ulid.ULID{block1}, + zoneAwarenessEnabled: true, + exclude: map[ulid.ULID][]string{ + block1: {"127.0.0.1", "127.0.0.6"}, + }, + attemptedBlocksZones: map[ulid.ULID]map[string]int{ + block1: {"1": 1, "3": 1}, + }, + expectedClients: map[string][]ulid.ULID{ + "127.0.0.8": {block1}, + }, + }, + "shuffle sharding, multiple instances in the ring with RF = 3, SS = 3, exclude 3 blocks and zone awareness enabled": { + shardingStrategy: util.ShardingStrategyShuffle, + tenantShardSize: 3, + replicationFactor: 3, + setup: func(d *ring.Desc) { + d.AddIngester("instance-1", "127.0.0.1", "1", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-2", "127.0.0.2", "2", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-3", "127.0.0.3", "3", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-4", "127.0.0.4", "1", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-5", "127.0.0.5", "2", []uint32{block5Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-6", "127.0.0.6", "3", []uint32{block6Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-7", "127.0.0.7", "1", []uint32{block7Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-8", "127.0.0.8", "2", []uint32{block8Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-9", "127.0.0.9", "3", []uint32{block9Hash + 1}, ring.ACTIVE, registeredAt) + }, + queryBlocks: []ulid.ULID{block1}, + zoneAwarenessEnabled: true, + exclude: map[ulid.ULID][]string{ + block1: {"127.0.0.1", "127.0.0.6", "127.0.0.8"}, + }, + attemptedBlocksZones: map[ulid.ULID]map[string]int{ + block1: {"1": 1, "2": 1, "3": 1}, + }, + expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", block1.String()), + }, + "shuffle sharding, multiple instances in the ring with RF = 6, SS = 6, exclude 3 blocks and zone awareness enabled": { + shardingStrategy: util.ShardingStrategyShuffle, + tenantShardSize: 6, + replicationFactor: 6, + setup: func(d *ring.Desc) { + d.AddIngester("instance-1", "127.0.0.1", "1", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-2", "127.0.0.2", "2", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-3", "127.0.0.3", "3", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt) + 
d.AddIngester("instance-4", "127.0.0.4", "1", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-5", "127.0.0.5", "2", []uint32{block5Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-6", "127.0.0.6", "3", []uint32{block6Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-7", "127.0.0.7", "1", []uint32{block7Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-8", "127.0.0.8", "2", []uint32{block8Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-9", "127.0.0.9", "3", []uint32{block9Hash + 1}, ring.ACTIVE, registeredAt) + }, + queryBlocks: []ulid.ULID{block1}, + zoneAwarenessEnabled: true, + exclude: map[ulid.ULID][]string{ + block1: {"127.0.0.1", "127.0.0.6", "127.0.0.8"}, + }, + attemptedBlocksZones: map[ulid.ULID]map[string]int{ + block1: {"1": 1, "2": 1, "3": 1}, + }, + expectedClients: map[string][]ulid.ULID{ + "127.0.0.2": {block1}, + }, + }, + "shuffle sharding, multiple instances in the ring with RF = 6, SS = 6, exclude 2 blocks and zone awareness enabled": { + shardingStrategy: util.ShardingStrategyShuffle, + tenantShardSize: 6, + replicationFactor: 6, + setup: func(d *ring.Desc) { + d.AddIngester("instance-1", "127.0.0.1", "1", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-2", "127.0.0.2", "2", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-3", "127.0.0.3", "3", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-4", "127.0.0.4", "1", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-5", "127.0.0.5", "2", []uint32{block5Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-6", "127.0.0.6", "3", []uint32{block6Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-7", "127.0.0.7", "1", []uint32{block7Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-8", "127.0.0.8", "2", []uint32{block8Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-9", "127.0.0.9", "3", []uint32{block9Hash + 1}, ring.ACTIVE, registeredAt) + }, + queryBlocks: []ulid.ULID{block1}, + zoneAwarenessEnabled: true, + exclude: map[ulid.ULID][]string{ + block1: {"127.0.0.1", "127.0.0.6"}, + }, + attemptedBlocksZones: map[ulid.ULID]map[string]int{ + block1: {"1": 1, "3": 1}, + }, + expectedClients: map[string][]ulid.ULID{ + "127.0.0.2": {block1}, + }, + }, + // This should never happen, just to test the attemptedZoneMap works correctly. 
+ "shuffle sharding, multiple instances in the ring with RF = 6, SS = 6, no exclude blocks but attempted 2 zones and zone awareness enabled": { + shardingStrategy: util.ShardingStrategyShuffle, + tenantShardSize: 6, + replicationFactor: 6, + setup: func(d *ring.Desc) { + d.AddIngester("instance-1", "127.0.0.1", "1", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-2", "127.0.0.2", "2", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-3", "127.0.0.3", "3", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-4", "127.0.0.4", "1", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-5", "127.0.0.5", "2", []uint32{block5Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-6", "127.0.0.6", "3", []uint32{block6Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-7", "127.0.0.7", "1", []uint32{block7Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-8", "127.0.0.8", "2", []uint32{block8Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-9", "127.0.0.9", "3", []uint32{block9Hash + 1}, ring.ACTIVE, registeredAt) + }, + queryBlocks: []ulid.ULID{block1}, + zoneAwarenessEnabled: true, + attemptedBlocksZones: map[ulid.ULID]map[string]int{ + block1: {"1": 1, "3": 1}, + }, + expectedClients: map[string][]ulid.ULID{ + "127.0.0.2": {block1}, + }, + }, + // This should never happen, just to test the attemptedZoneMap works correctly. + "shuffle sharding, multiple instances in the ring with RF = 6, SS = 6, one exclude block but attempted 2 zones and zone awareness enabled": { + shardingStrategy: util.ShardingStrategyShuffle, + tenantShardSize: 6, + replicationFactor: 6, + setup: func(d *ring.Desc) { + d.AddIngester("instance-1", "127.0.0.1", "1", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-2", "127.0.0.2", "2", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-3", "127.0.0.3", "3", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-4", "127.0.0.4", "1", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-5", "127.0.0.5", "2", []uint32{block5Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-6", "127.0.0.6", "3", []uint32{block6Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-7", "127.0.0.7", "1", []uint32{block7Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-8", "127.0.0.8", "2", []uint32{block8Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-9", "127.0.0.9", "3", []uint32{block9Hash + 1}, ring.ACTIVE, registeredAt) + }, + queryBlocks: []ulid.ULID{block1}, + zoneAwarenessEnabled: true, + exclude: map[ulid.ULID][]string{ + block1: {"127.0.0.2"}, + }, + attemptedBlocksZones: map[ulid.ULID]map[string]int{ + block1: {"1": 1, "3": 1}, + }, + expectedClients: map[string][]ulid.ULID{ + "127.0.0.8": {block1}, + }, + }, } for testName, testData := range tests { @@ -342,6 +575,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { ringCfg := ring.Config{} flagext.DefaultValues(&ringCfg) ringCfg.ReplicationFactor = testData.replicationFactor + ringCfg.HeartbeatTimeout = time.Hour r, err := ring.NewWithStoreClientAndStrategy(ringCfg, "test", "test", ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), nil, nil) require.NoError(t, err) @@ -351,7 +585,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { } reg := prometheus.NewPedanticRegistry() - 
s, err := newBlocksStoreReplicationSet(r, testData.shardingStrategy, noLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg) + s, err := newBlocksStoreReplicationSet(r, testData.shardingStrategy, noLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg, testData.zoneAwarenessEnabled) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, s)) defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck @@ -362,7 +596,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { return err == nil && len(all.Instances) > 0 }) - clients, err := s.GetClientsFor(userID, testData.queryBlocks, testData.exclude) + clients, err := s.GetClientsFor(userID, testData.queryBlocks, testData.exclude, testData.attemptedBlocksZones) assert.Equal(t, testData.expectedErr, err) if testData.expectedErr == nil { @@ -413,7 +647,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor_ShouldSupportRandomLoadBalancin limits := &blocksStoreLimitsMock{} reg := prometheus.NewPedanticRegistry() - s, err := newBlocksStoreReplicationSet(r, util.ShardingStrategyDefault, randomLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg) + s, err := newBlocksStoreReplicationSet(r, util.ShardingStrategyDefault, randomLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg, false) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, s)) defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck @@ -429,7 +663,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor_ShouldSupportRandomLoadBalancin distribution := map[string]int{} for n := 0; n < numRuns; n++ { - clients, err := s.GetClientsFor(userID, []ulid.ULID{block1}, nil) + clients, err := s.GetClientsFor(userID, []ulid.ULID{block1}, nil, nil) require.NoError(t, err) require.Len(t, clients, 1) @@ -446,6 +680,78 @@ func TestBlocksStoreReplicationSet_GetClientsFor_ShouldSupportRandomLoadBalancin } } +func TestBlocksStoreReplicationSet_GetClientsFor_ZoneAwareness(t *testing.T) { + t.Parallel() + + const ( + numRuns = 1000 + numInstances = 9 + ) + + ctx := context.Background() + userID := "user-A" + registeredAt := time.Now() + block1 := ulid.MustNew(1, nil) + + // Create a ring. + ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + require.NoError(t, ringStore.CAS(ctx, "test", func(in interface{}) (interface{}, bool, error) { + d := ring.NewDesc() + for n := 1; n <= numInstances; n++ { + zone := strconv.Itoa((n-1)%3 + 1) + d.AddIngester(fmt.Sprintf("instance-%d", n), fmt.Sprintf("127.0.0.%d", n), zone, []uint32{uint32(n)}, ring.ACTIVE, registeredAt) + } + return d, true, nil + })) + + // Configure a replication factor equal to the number of instances, so that every store-gateway gets all blocks. 
+ ringCfg := ring.Config{} + flagext.DefaultValues(&ringCfg) + ringCfg.ReplicationFactor = numInstances + + r, err := ring.NewWithStoreClientAndStrategy(ringCfg, "test", "test", ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), nil, nil) + require.NoError(t, err) + + limits := &blocksStoreLimitsMock{} + reg := prometheus.NewPedanticRegistry() + s, err := newBlocksStoreReplicationSet(r, util.ShardingStrategyDefault, randomLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg, true) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(ctx, s)) + defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck + + // Wait until the ring client has initialised the state. + test.Poll(t, time.Second, true, func() interface{} { + all, err := r.GetAllHealthy(ring.Read) + return err == nil && len(all.Instances) > 0 + }) + + // Target hit shouldn't exist in the blocksMap. + targets := [3]int{3, 2, 1} + for i := 0; i < numRuns; i++ { + blocksMap := [3]map[string]int{ + {"1": 1, "2": 1}, + {"1": 1, "3": 1}, + {"2": 1, "3": 1}, + } + attemptedBlocksZone := map[ulid.ULID]map[string]int{ + block1: blocksMap[i%3], + } + clients, err := s.GetClientsFor(userID, []ulid.ULID{block1}, nil, attemptedBlocksZone) + require.NoError(t, err) + require.Len(t, clients, 1) + for c := range clients { + addr := c.RemoteAddress() + parts := strings.Split(addr, ".") + require.True(t, len(parts) > 3) + id, err := strconv.Atoi(parts[3]) + require.NoError(t, err) + require.Equal(t, targets[i%3], (id-1)%3+1) + } + } +} + func getStoreGatewayClientAddrs(clients map[BlocksStoreClient][]ulid.ULID) map[string][]ulid.ULID { addrs := map[string][]ulid.ULID{} for c, blockIDs := range clients { diff --git a/pkg/ring/replication_set.go b/pkg/ring/replication_set.go index c39f253dae8..67630bf53cd 100644 --- a/pkg/ring/replication_set.go +++ b/pkg/ring/replication_set.go @@ -127,6 +127,15 @@ func (r ReplicationSet) GetAddressesWithout(exclude string) []string { return addrs } +// GetNumOfZones returns number of distinct zones. +func (r ReplicationSet) GetNumOfZones() int { + set := make(map[string]struct{}) + for _, instance := range r.Instances { + set[instance.GetZone()] = struct{}{} + } + return len(set) +} + // HasReplicationSetChanged returns false if two replications sets are the same (with possibly different timestamps), // true if they differ in any way (number of instances, instance states, tokens, zones, ...). func HasReplicationSetChanged(before, after ReplicationSet) bool {
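For reference, below is a minimal standalone sketch of the zone-aware selection that getNonExcludedInstance implements in this patch. The Instance type, the contains helper, and pickInstance are simplified illustrations and not the actual Cortex ring API; the real code operates on ring.ReplicationSet and ring.InstanceDesc.

package main

import (
	"fmt"
	"math"
)

// Instance is a simplified stand-in for ring.InstanceDesc.
type Instance struct {
	Addr string
	Zone string
}

func contains(list []string, s string) bool {
	for _, v := range list {
		if v == s {
			return true
		}
	}
	return false
}

// pickInstance mirrors the zone-aware selection: skip excluded addresses and,
// when zone awareness is enabled, only accept an instance whose zone has the
// fewest attempts so far.
func pickInstance(instances []Instance, exclude []string, attemptedZones map[string]int, numZones int, zoneAware bool) (Instance, bool) {
	minAttempt := math.MaxInt
	if len(attemptedZones) < numZones {
		// At least one zone has never been attempted, so the minimum is 0.
		minAttempt = 0
	} else {
		for _, c := range attemptedZones {
			if c < minAttempt {
				minAttempt = c
			}
		}
	}

	for _, inst := range instances {
		if contains(exclude, inst.Addr) {
			continue
		}
		if !zoneAware || attemptedZones[inst.Zone] == minAttempt {
			return inst, true
		}
	}
	// No eligible instance left; the caller turns this into an error.
	return Instance{}, false
}

func main() {
	instances := []Instance{
		{Addr: "127.0.0.1", Zone: "1"},
		{Addr: "127.0.0.2", Zone: "2"},
		{Addr: "127.0.0.3", Zone: "3"},
	}
	// Zone "1" was already attempted once, so the retry goes to zone "2" or "3".
	attempted := map[string]int{"1": 1}
	inst, ok := pickInstance(instances, []string{"127.0.0.1"}, attempted, 3, true)
	fmt.Println(inst.Addr, ok) // 127.0.0.2 true
}

The key property is that each retry prefers a zone with the fewest prior attempts; when zone awareness is disabled, the first non-excluded instance is returned, matching the previous behavior.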