Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_balanced_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (s *blocksStoreBalancedSet) resolve(ctx context.Context) error {
return nil
}

func (s *blocksStoreBalancedSet) GetClientsFor(_ string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) {
func (s *blocksStoreBalancedSet) GetClientsFor(_ string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string, _ map[ulid.ULID]map[string]int) (map[BlocksStoreClient][]ulid.ULID, error) {
addresses := s.dnsProvider.Addresses()
if len(addresses) == 0 {
return nil, fmt.Errorf("no address resolved for the store-gateway service addresses %s", strings.Join(s.serviceAddresses, ","))
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/blocks_store_balanced_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestBlocksStoreBalancedSet_GetClientsFor(t *testing.T) {
clientsCount := map[string]int{}

for i := 0; i < numGets; i++ {
clients, err := s.GetClientsFor("", []ulid.ULID{block1}, map[ulid.ULID][]string{})
clients, err := s.GetClientsFor("", []ulid.ULID{block1}, map[ulid.ULID][]string{}, nil)
require.NoError(t, err)
require.Len(t, clients, 1)

Expand Down Expand Up @@ -141,7 +141,7 @@ func TestBlocksStoreBalancedSet_GetClientsFor_Exclude(t *testing.T) {
require.NoError(t, services.StartAndAwaitRunning(ctx, s))
defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck

clients, err := s.GetClientsFor("", testData.queryBlocks, testData.exclude)
clients, err := s.GetClientsFor("", testData.queryBlocks, testData.exclude, nil)
assert.Equal(t, testData.expectedErr, err)

if testData.expectedErr == nil {
Expand Down
7 changes: 4 additions & 3 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type BlocksStoreSet interface {
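// GetClientsFor returns the store-gateway clients that should be used to
// query the set of blocks in input. The exclude parameter is the map of
// blocks -> store-gateway addresses that should be excluded. The
// attemptedBlocksZones parameter is the map of blocks -> zone -> number of
// attempts already made in that zone; implementations may use it to prefer
// the zones with the fewest attempts, or ignore it.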
GetClientsFor(userID string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error)
GetClientsFor(userID string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string, attemptedBlocksZones map[ulid.ULID]map[string]int) (map[BlocksStoreClient][]ulid.ULID, error)
}

// BlocksFinder is the interface used to find blocks for a given user and time range.
Expand Down Expand Up @@ -228,7 +228,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
return nil, errors.Wrap(err, "failed to create store-gateway ring client")
}

stores, err = newBlocksStoreReplicationSet(storesRing, gatewayCfg.ShardingStrategy, randomLoadBalancing, limits, querierCfg.StoreGatewayClient, logger, reg)
stores, err = newBlocksStoreReplicationSet(storesRing, gatewayCfg.ShardingStrategy, randomLoadBalancing, limits, querierCfg.StoreGatewayClient, logger, reg, storesRingCfg.ZoneAwarenessEnabled)
if err != nil {
return nil, errors.Wrap(err, "failed to create store set")
}
Expand Down Expand Up @@ -502,10 +502,11 @@ func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logg
resQueriedBlocks = []ulid.ULID(nil)
)

attemptedBlocksZones := make(map[ulid.ULID]map[string]int, len(remainingBlocks))
for attempt := 1; attempt <= maxFetchSeriesAttempts; attempt++ {
// Find the set of store-gateway instances having the blocks. The exclude parameter is the
// map of blocks queried so far, with the list of store-gateway addresses for each block.
clients, err := q.stores.GetClientsFor(q.userID, remainingBlocks, attemptedBlocks)
clients, err := q.stores.GetClientsFor(q.userID, remainingBlocks, attemptedBlocks, attemptedBlocksZones)
if err != nil {
// If it's a retry and we get an error, it means there are no more store-gateways left
// on which to run another attempt, so we just stop retrying.
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1515,7 +1515,7 @@ type blocksStoreSetMock struct {
nextResult int
}

func (m *blocksStoreSetMock) GetClientsFor(_ string, _ []ulid.ULID, _ map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) {
func (m *blocksStoreSetMock) GetClientsFor(_ string, _ []ulid.ULID, _ map[ulid.ULID][]string, _ map[ulid.ULID]map[string]int) (map[BlocksStoreClient][]ulid.ULID, error) {
if m.nextResult >= len(m.mockedResponses) {
panic("not enough mocked results")
}
Expand Down
57 changes: 44 additions & 13 deletions pkg/querier/blocks_store_replicated_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package querier
import (
"context"
"fmt"
"math"
"math/rand"

"github.com/go-kit/log"
Expand Down Expand Up @@ -36,6 +37,8 @@ type blocksStoreReplicationSet struct {
balancingStrategy loadBalancingStrategy
limits BlocksStoreLimits

zoneAwarenessEnabled bool

// Subservices manager.
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
Expand All @@ -49,13 +52,15 @@ func newBlocksStoreReplicationSet(
clientConfig ClientConfig,
logger log.Logger,
reg prometheus.Registerer,
zoneAwarenessEnabled bool,
) (*blocksStoreReplicationSet, error) {
s := &blocksStoreReplicationSet{
storesRing: storesRing,
clientsPool: newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), clientConfig, logger, reg),
shardingStrategy: shardingStrategy,
balancingStrategy: balancingStrategy,
limits: limits,
storesRing: storesRing,
clientsPool: newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), clientConfig, logger, reg),
shardingStrategy: shardingStrategy,
balancingStrategy: balancingStrategy,
limits: limits,
zoneAwarenessEnabled: zoneAwarenessEnabled,
}

var err error
Expand Down Expand Up @@ -94,7 +99,7 @@ func (s *blocksStoreReplicationSet) stopping(_ error) error {
return services.StopManagerAndAwaitStopped(context.Background(), s.subservices)
}

func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) {
func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string, attemptedBlocksZones map[ulid.ULID]map[string]int) (map[BlocksStoreClient][]ulid.ULID, error) {
shards := map[string][]ulid.ULID{}

// If shuffle sharding is enabled, we should build a subring for the user,
Expand All @@ -118,12 +123,19 @@ func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blockIDs []ulid
}

// Pick a non excluded store-gateway instance.
addr := getNonExcludedInstanceAddr(set, exclude[blockID], s.balancingStrategy)
if addr == "" {
instance := getNonExcludedInstance(set, exclude[blockID], s.balancingStrategy, s.zoneAwarenessEnabled, attemptedBlocksZones[blockID])
// A valid instance should have a non-empty address.
if instance.Addr == "" {
return nil, fmt.Errorf("no store-gateway instance left after checking exclude for block %s", blockID.String())
}

shards[addr] = append(shards[addr], blockID)
shards[instance.Addr] = append(shards[instance.Addr], blockID)
if s.zoneAwarenessEnabled {
if _, ok := attemptedBlocksZones[blockID]; !ok {
attemptedBlocksZones[blockID] = make(map[string]int, 0)
}
attemptedBlocksZones[blockID][instance.Zone]++
}
}

clients := map[BlocksStoreClient][]ulid.ULID{}
Expand All @@ -141,19 +153,38 @@ func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blockIDs []ulid
return clients, nil
}

func getNonExcludedInstanceAddr(set ring.ReplicationSet, exclude []string, balancingStrategy loadBalancingStrategy) string {
func getNonExcludedInstance(set ring.ReplicationSet, exclude []string, balancingStrategy loadBalancingStrategy, zoneAwarenessEnabled bool, attemptedZones map[string]int) ring.InstanceDesc {
if balancingStrategy == randomLoadBalancing {
// Randomize the list of instances to not always query the same one.
rand.Shuffle(len(set.Instances), func(i, j int) {
set.Instances[i], set.Instances[j] = set.Instances[j], set.Instances[i]
})
}

minAttempt := math.MaxInt
numOfZone := set.GetNumOfZones()
// There are still unattempted zones so we know min is 0.
if len(attemptedZones) < numOfZone {
minAttempt = 0
} else {
// Iterate over attempted zones and find the min attempts.
for _, c := range attemptedZones {
if c < minAttempt {
minAttempt = c
}
}
}
for _, instance := range set.Instances {
if !util.StringsContain(exclude, instance.Addr) {
return instance.Addr
if util.StringsContain(exclude, instance.Addr) {
continue
}
// If zone awareness is not enabled, pick the first non-excluded instance.
// Otherwise, keep iterating until we find a non-excluded instance in a zone
// with the fewest attempts so far.
if !zoneAwarenessEnabled || attemptedZones[instance.Zone] == minAttempt {
return instance
}
}

return ""
return ring.InstanceDesc{}
}
Loading