Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
* [ENHANCEMENT] Store Gateway/Querier/Compactor: Handling CMK Access Denied errors. #5420 #5442 #5446
* [ENHANCEMENT] Store Gateway: Implementing multi level index cache. #5451
* [ENHANCEMENT] Alertmanager: Add the alert name in error log when it gets throttled. #5456
* [ENHANCEMENT] Querier: Retry store gateway on different zones when zone awareness is enabled. #5476
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_balanced_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (s *blocksStoreBalancedSet) resolve(ctx context.Context) error {
return nil
}

func (s *blocksStoreBalancedSet) GetClientsFor(_ string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) {
func (s *blocksStoreBalancedSet) GetClientsFor(_ string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string, _ map[ulid.ULID]map[string]int) (map[BlocksStoreClient][]ulid.ULID, error) {
addresses := s.dnsProvider.Addresses()
if len(addresses) == 0 {
return nil, fmt.Errorf("no address resolved for the store-gateway service addresses %s", strings.Join(s.serviceAddresses, ","))
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/blocks_store_balanced_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestBlocksStoreBalancedSet_GetClientsFor(t *testing.T) {
clientsCount := map[string]int{}

for i := 0; i < numGets; i++ {
clients, err := s.GetClientsFor("", []ulid.ULID{block1}, map[ulid.ULID][]string{})
clients, err := s.GetClientsFor("", []ulid.ULID{block1}, map[ulid.ULID][]string{}, nil)
require.NoError(t, err)
require.Len(t, clients, 1)

Expand Down Expand Up @@ -141,7 +141,7 @@ func TestBlocksStoreBalancedSet_GetClientsFor_Exclude(t *testing.T) {
require.NoError(t, services.StartAndAwaitRunning(ctx, s))
defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck

clients, err := s.GetClientsFor("", testData.queryBlocks, testData.exclude)
clients, err := s.GetClientsFor("", testData.queryBlocks, testData.exclude, nil)
assert.Equal(t, testData.expectedErr, err)

if testData.expectedErr == nil {
Expand Down
9 changes: 5 additions & 4 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type BlocksStoreSet interface {
// GetClientsFor returns the store gateway clients that should be used to
// query the set of blocks in input. The exclude parameter is the map of
// blocks -> store-gateway addresses that should be excluded.
GetClientsFor(userID string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error)
GetClientsFor(userID string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string, attemptedBlocksZones map[ulid.ULID]map[string]int) (map[BlocksStoreClient][]ulid.ULID, error)
}

// BlocksFinder is the interface used to find blocks for a given user and time range.
Expand Down Expand Up @@ -228,7 +228,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
return nil, errors.Wrap(err, "failed to create store-gateway ring client")
}

stores, err = newBlocksStoreReplicationSet(storesRing, gatewayCfg.ShardingStrategy, randomLoadBalancing, limits, querierCfg.StoreGatewayClient, logger, reg)
stores, err = newBlocksStoreReplicationSet(storesRing, gatewayCfg.ShardingStrategy, randomLoadBalancing, limits, querierCfg.StoreGatewayClient, logger, reg, storesRingCfg.ZoneAwarenessEnabled)
if err != nil {
return nil, errors.Wrap(err, "failed to create store set")
}
Expand Down Expand Up @@ -499,13 +499,14 @@ func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logg
attemptedBlocks = map[ulid.ULID][]string{}
touchedStores = map[string]struct{}{}

resQueriedBlocks = []ulid.ULID(nil)
resQueriedBlocks = []ulid.ULID(nil)
attemptedBlocksZones = make(map[ulid.ULID]map[string]int, len(remainingBlocks))
)

for attempt := 1; attempt <= maxFetchSeriesAttempts; attempt++ {
// Find the set of store-gateway instances having the blocks. The exclude parameter is the
// map of blocks queried so far, with the list of store-gateway addresses for each block.
clients, err := q.stores.GetClientsFor(q.userID, remainingBlocks, attemptedBlocks)
clients, err := q.stores.GetClientsFor(q.userID, remainingBlocks, attemptedBlocks, attemptedBlocksZones)
if err != nil {
// If it's a retry and we get an error, it means there are no store-gateways left
// against which to run another attempt, so we stop retrying.
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1515,7 +1515,7 @@ type blocksStoreSetMock struct {
nextResult int
}

func (m *blocksStoreSetMock) GetClientsFor(_ string, _ []ulid.ULID, _ map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) {
func (m *blocksStoreSetMock) GetClientsFor(_ string, _ []ulid.ULID, _ map[ulid.ULID][]string, _ map[ulid.ULID]map[string]int) (map[BlocksStoreClient][]ulid.ULID, error) {
if m.nextResult >= len(m.mockedResponses) {
panic("not enough mocked results")
}
Expand Down
57 changes: 44 additions & 13 deletions pkg/querier/blocks_store_replicated_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package querier
import (
"context"
"fmt"
"math"
"math/rand"

"github.com/go-kit/log"
Expand Down Expand Up @@ -36,6 +37,8 @@ type blocksStoreReplicationSet struct {
balancingStrategy loadBalancingStrategy
limits BlocksStoreLimits

zoneAwarenessEnabled bool

// Subservices manager.
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
Expand All @@ -49,13 +52,15 @@ func newBlocksStoreReplicationSet(
clientConfig ClientConfig,
logger log.Logger,
reg prometheus.Registerer,
zoneAwarenessEnabled bool,
) (*blocksStoreReplicationSet, error) {
s := &blocksStoreReplicationSet{
storesRing: storesRing,
clientsPool: newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), clientConfig, logger, reg),
shardingStrategy: shardingStrategy,
balancingStrategy: balancingStrategy,
limits: limits,
storesRing: storesRing,
clientsPool: newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), clientConfig, logger, reg),
shardingStrategy: shardingStrategy,
balancingStrategy: balancingStrategy,
limits: limits,
zoneAwarenessEnabled: zoneAwarenessEnabled,
}

var err error
Expand Down Expand Up @@ -94,7 +99,7 @@ func (s *blocksStoreReplicationSet) stopping(_ error) error {
return services.StopManagerAndAwaitStopped(context.Background(), s.subservices)
}

func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) {
func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string, attemptedBlocksZones map[ulid.ULID]map[string]int) (map[BlocksStoreClient][]ulid.ULID, error) {
shards := map[string][]ulid.ULID{}

// If shuffle sharding is enabled, we should build a subring for the user,
Expand All @@ -118,12 +123,19 @@ func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blockIDs []ulid
}

// Pick a non excluded store-gateway instance.
addr := getNonExcludedInstanceAddr(set, exclude[blockID], s.balancingStrategy)
if addr == "" {
instance := getNonExcludedInstance(set, exclude[blockID], s.balancingStrategy, s.zoneAwarenessEnabled, attemptedBlocksZones[blockID])
// A valid instance should have a non-empty address.
if instance.Addr == "" {
return nil, fmt.Errorf("no store-gateway instance left after checking exclude for block %s", blockID.String())
}

shards[addr] = append(shards[addr], blockID)
shards[instance.Addr] = append(shards[instance.Addr], blockID)
if s.zoneAwarenessEnabled {
if _, ok := attemptedBlocksZones[blockID]; !ok {
attemptedBlocksZones[blockID] = make(map[string]int, 0)
}
attemptedBlocksZones[blockID][instance.Zone]++
}
}

clients := map[BlocksStoreClient][]ulid.ULID{}
Expand All @@ -141,19 +153,38 @@ func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blockIDs []ulid
return clients, nil
}

// getNonExcludedInstance picks one instance from set whose address is not
// listed in exclude.
//
// When zoneAwarenessEnabled is false, the first non-excluded instance in
// iteration order is returned. When it is true, a non-excluded instance is
// returned only if its zone's counter in attemptedZones equals the smallest
// attempt count computed below, so zones that were tried least are preferred.
//
// If no instance qualifies, the zero-value ring.InstanceDesc is returned; the
// caller in this file detects that case by checking for an empty Addr.
//
// NOTE(review): this span is a rendered diff. Where a line was changed, the
// previous version (e.g. the getNonExcludedInstanceAddr signature returning a
// string) appears directly above its replacement.
func getNonExcludedInstanceAddr(set ring.ReplicationSet, exclude []string, balancingStrategy loadBalancingStrategy) string {
func getNonExcludedInstance(set ring.ReplicationSet, exclude []string, balancingStrategy loadBalancingStrategy, zoneAwarenessEnabled bool, attemptedZones map[string]int) ring.InstanceDesc {
if balancingStrategy == randomLoadBalancing {
// Randomize the list of instances to not always query the same one.
// The swap is done on set.Instances itself; presumably this is a slice, in
// which case the caller's backing array is reordered too. TODO confirm.
rand.Shuffle(len(set.Instances), func(i, j int) {
set.Instances[i], set.Instances[j] = set.Instances[j], set.Instances[i]
})
}

// minAttempt is the lowest per-zone attempt count; it is computed even when
// zone awareness is disabled, although it is only consulted when enabled.
minAttempt := math.MaxInt
// presumably the number of distinct zones among set's instances; verify
// against ring.ReplicationSet.GetNumOfZones.
numOfZone := set.GetNumOfZones()
// There are still unattempted zones so we know min is 0.
// A nil attemptedZones has length 0, so it takes this branch whenever
// numOfZone is positive.
if len(attemptedZones) < numOfZone {
minAttempt = 0
} else {
// Iterate over attempted zones and find the min attempts.
// If attemptedZones is empty here (numOfZone is 0), minAttempt stays at
// math.MaxInt.
for _, c := range attemptedZones {
if c < minAttempt {
minAttempt = c
}
}
}
for _, instance := range set.Instances {
if !util.StringsContain(exclude, instance.Addr) {
return instance.Addr
if util.StringsContain(exclude, instance.Addr) {
continue
}
// If zone awareness is not enabled, pick the first non-excluded instance.
// Otherwise, keep iterating until we find an instance in a zone with the
// fewest attempts so far. Looking up a zone that is missing from
// attemptedZones yields 0.
// NOTE(review): with zone awareness enabled, a non-excluded instance whose
// zone count is above minAttempt is never returned, even if it is the only
// candidate left — confirm this is intended.
if !zoneAwarenessEnabled || attemptedZones[instance.Zone] == minAttempt {
return instance
}
}

// No eligible instance was found.
return ""
return ring.InstanceDesc{}
}
Loading