Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
* [ENHANCEMENT] Store Gateway/Querier/Compactor: Handling CMK Access Denied errors. #5420 #5442 #5446
* [ENHANCEMENT] Store Gateway: Implementing multi level index cache. #5451
* [ENHANCEMENT] Alertmanager: Add the alert name in error log when it get throttled. #5456
* [ENHANCEMENT] DDBKV: Change metric name from dynamodb_kv_read_capacity_total to dynamodb_kv_consumed_capacity_total and include Delete, Put, Batch dimension. #5481
* [ENHANCEMENT] Querier: Retry store gateway on different zones when zone awareness is enabled. #5476
* [ENHANCEMENT] DDBKV: Change metric name from dynamodb_kv_read_capacity_total to dynamodb_kv_consumed_capacity_total and include Delete, Put, Batch dimension. #5481
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293
Expand Down
4 changes: 2 additions & 2 deletions integration/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,11 +297,11 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
}
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2), "thanos_store_index_cache_hits_total")) // this time has used the index cache

if strings.Contains(testCfg.indexCacheBackend, tsdb.IndexCacheBackendInMemory) {
if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory {
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items")) // as before
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items_added_total")) // as before
}
if strings.Contains(testCfg.indexCacheBackend, tsdb.IndexCacheBackendMemcached) {
if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(23-l0CacheHits), "thanos_memcached_operations_total")) // as before + 2 gets - cache hits
}

Expand Down
7 changes: 3 additions & 4 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,11 +451,10 @@ func (t *Cortex) initDeleteRequestsStore() (serv services.Service, err error) {
// to optimize Prometheus query requests.
func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err error) {
queryAnalyzer := querysharding.NewQueryAnalyzer()
defaultSubQueryInterval := t.Cfg.Querier.DefaultEvaluationInterval
// PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses.
prometheusCodec := queryrange.NewPrometheusCodec(false, defaultSubQueryInterval)
prometheusCodec := queryrange.NewPrometheusCodec(false)
// ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats)
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, defaultSubQueryInterval)
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true)

queryRangeMiddlewares, cache, err := queryrange.Middlewares(
t.Cfg.QueryRange,
Expand Down Expand Up @@ -486,7 +485,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
instantquery.InstantQueryCodec,
t.Overrides,
queryAnalyzer,
defaultSubQueryInterval,
t.Cfg.Querier.DefaultEvaluationInterval,
)

return services.NewIdleService(nil, func(_ error) error {
Expand Down
7 changes: 1 addition & 6 deletions pkg/querier/tripperware/instantquery/instant_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ func (r *PrometheusRequest) WithStats(stats string) tripperware.Request {

type instantQueryCodec struct {
tripperware.Codec
now func() time.Time
noStepSubQueryInterval time.Duration
now func() time.Time
}

func newInstantQueryCodec() instantQueryCodec {
Expand Down Expand Up @@ -139,10 +138,6 @@ func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, for
}

result.Query = r.FormValue("query")
if err := tripperware.SubQueryStepSizeCheck(result.Query, c.noStepSubQueryInterval, tripperware.MaxStep); err != nil {
return nil, err
}

result.Stats = r.FormValue("stats")
result.Path = r.URL.Path

Expand Down
3 changes: 1 addition & 2 deletions pkg/querier/tripperware/instantquery/shard_by_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ package instantquery

import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
)

func Test_shardQuery(t *testing.T) {
t.Parallel()
tripperware.TestQueryShardQuery(t, InstantQueryCodec, queryrange.NewPrometheusCodec(true, time.Minute))
tripperware.TestQueryShardQuery(t, InstantQueryCodec, queryrange.NewPrometheusCodec(true))
}
13 changes: 2 additions & 11 deletions pkg/querier/tripperware/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,10 @@ var (

type prometheusCodec struct {
sharded bool

noStepSubQueryInterval time.Duration
}

func NewPrometheusCodec(sharded bool, noStepSubQueryInterval time.Duration) *prometheusCodec { //nolint:revive
return &prometheusCodec{
sharded: sharded,
noStepSubQueryInterval: noStepSubQueryInterval,
}
func NewPrometheusCodec(sharded bool) *prometheusCodec { //nolint:revive
return &prometheusCodec{sharded: sharded}
}

// WithStartEnd clones the current `PrometheusRequest` with a new `start` and `end` timestamp.
Expand Down Expand Up @@ -203,10 +198,6 @@ func (c prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forwa
}

result.Query = r.FormValue("query")
if err := tripperware.SubQueryStepSizeCheck(result.Query, c.noStepSubQueryInterval, tripperware.MaxStep); err != nil {
return nil, err
}

result.Stats = r.FormValue("stats")
result.Path = r.URL.Path

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
)

var (
PrometheusCodec = NewPrometheusCodec(false, time.Minute)
ShardedPrometheusCodec = NewPrometheusCodec(false, time.Minute)
PrometheusCodec = NewPrometheusCodec(false)
ShardedPrometheusCodec = NewPrometheusCodec(false)
)

func TestRoundTrip(t *testing.T) {
Expand Down
4 changes: 0 additions & 4 deletions pkg/querier/tripperware/queryrange/query_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ func TestRequest(t *testing.T) {
url: "api/v1/query_range?start=0&end=11001&step=1",
expectedErr: errStepTooSmall,
},
{
url: "/api/v1/query?query=up%5B30d%3A%5D&start=123&end=456&step=10",
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, tripperware.ErrSubQueryStepTooSmall, 11000),
},
} {
tc := tc
t.Run(tc.url, func(t *testing.T) {
Expand Down
12 changes: 8 additions & 4 deletions pkg/querier/tripperware/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,19 @@ func NewQueryTripperware(
activeUsers.UpdateUserTimestamp(userStr, time.Now())
queriesPerTenant.WithLabelValues(op, userStr).Inc()

if isQueryRange {
return queryrange.RoundTrip(r)
} else if isQuery {
// If the given query is not shardable, use downstream roundtripper.
if isQuery || isQueryRange {
query := r.FormValue("query")
// Check subquery step size.
if err := SubQueryStepSizeCheck(query, defaultSubQueryInterval, MaxStep); err != nil {
return nil, err
}
}

if isQueryRange {
return queryrange.RoundTrip(r)
} else if isQuery {
// If the given query is not shardable, use downstream roundtripper.
query := r.FormValue("query")

// If vertical sharding is not enabled for the tenant, use downstream roundtripper.
numShards := validation.SmallestPositiveIntPerTenant(tenantIDs, limits.QueryVerticalShardSize)
Expand Down
34 changes: 23 additions & 11 deletions pkg/ring/kv/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,28 +157,37 @@ func (kv dynamodbKV) Query(ctx context.Context, key dynamodbKey, isPrefix bool)
return keys, totalCapacity, nil
}

func (kv dynamodbKV) Delete(ctx context.Context, key dynamodbKey) error {
func (kv dynamodbKV) Delete(ctx context.Context, key dynamodbKey) (float64, error) {
input := &dynamodb.DeleteItemInput{
TableName: kv.tableName,
Key: generateItemKey(key),
}
_, err := kv.ddbClient.DeleteItemWithContext(ctx, input)
return err
totalCapacity := float64(0)
output, err := kv.ddbClient.DeleteItemWithContext(ctx, input)
if err != nil {
totalCapacity = getCapacityUnits(output.ConsumedCapacity)
}
return totalCapacity, err
}

func (kv dynamodbKV) Put(ctx context.Context, key dynamodbKey, data []byte) error {
func (kv dynamodbKV) Put(ctx context.Context, key dynamodbKey, data []byte) (float64, error) {
input := &dynamodb.PutItemInput{
TableName: kv.tableName,
Item: kv.generatePutItemRequest(key, data),
}
_, err := kv.ddbClient.PutItemWithContext(ctx, input)
return err
totalCapacity := float64(0)
output, err := kv.ddbClient.PutItemWithContext(ctx, input)
if err != nil {
totalCapacity = getCapacityUnits(output.ConsumedCapacity)
}
return totalCapacity, err
}

func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) (float64, error) {
totalCapacity := float64(0)
writeRequestSize := len(put) + len(delete)
if writeRequestSize == 0 {
return nil
return totalCapacity, nil
}

writeRequestsSlices := make([][]*dynamodb.WriteRequest, int(math.Ceil(float64(writeRequestSize)/float64(DdbBatchSizeLimit))))
Expand Down Expand Up @@ -220,15 +229,18 @@ func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, dele

resp, err := kv.ddbClient.BatchWriteItemWithContext(ctx, input)
if err != nil {
return err
return totalCapacity, err
}
for _, consumedCapacity := range resp.ConsumedCapacity {
totalCapacity += getCapacityUnits(consumedCapacity)
}

if resp.UnprocessedItems != nil && len(resp.UnprocessedItems) > 0 {
return fmt.Errorf("error processing batch request for %s requests", resp.UnprocessedItems)
return totalCapacity, fmt.Errorf("error processing batch request for %s requests", resp.UnprocessedItems)
}
}

return nil
return totalCapacity, nil
}

func (kv dynamodbKV) generatePutItemRequest(key dynamodbKey, data []byte) map[string]*dynamodb.AttributeValue {
Expand Down
14 changes: 7 additions & 7 deletions pkg/ring/kv/dynamodb/dynamodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func Test_TTLDisabled(t *testing.T) {
}

ddb := newDynamodbClientMock("TEST", ddbClientMock, 0)
err := ddb.Put(context.TODO(), dynamodbKey{primaryKey: "test", sortKey: "test1"}, []byte("TEST"))
_, err := ddb.Put(context.TODO(), dynamodbKey{primaryKey: "test", sortKey: "test1"}, []byte("TEST"))
require.NoError(t, err)

}
Expand All @@ -41,7 +41,7 @@ func Test_TTL(t *testing.T) {
}

ddb := newDynamodbClientMock("TEST", ddbClientMock, 5*time.Hour)
err := ddb.Put(context.TODO(), dynamodbKey{primaryKey: "test", sortKey: "test1"}, []byte("TEST"))
_, err := ddb.Put(context.TODO(), dynamodbKey{primaryKey: "test", sortKey: "test1"}, []byte("TEST"))
require.NoError(t, err)
}

Expand Down Expand Up @@ -72,7 +72,7 @@ func Test_Batch(t *testing.T) {
}

ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour)
err := ddb.Batch(context.TODO(), update, delete)
_, err := ddb.Batch(context.TODO(), update, delete)
require.NoError(t, err)
}

Expand Down Expand Up @@ -120,7 +120,7 @@ func Test_BatchSlices(t *testing.T) {
delete = append(delete, ddbKeyDelete)
}

err := ddb.Batch(context.TODO(), nil, delete)
_, err := ddb.Batch(context.TODO(), nil, delete)
require.NoError(t, err)
require.EqualValues(t, tc.expectedCalls, numOfCalls)

Expand All @@ -134,7 +134,7 @@ func Test_EmptyBatch(t *testing.T) {
ddbClientMock := &mockDynamodb{}

ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour)
err := ddb.Batch(context.TODO(), nil, nil)
_, err := ddb.Batch(context.TODO(), nil, nil)
require.NoError(t, err)
}

Expand All @@ -159,7 +159,7 @@ func Test_Batch_UnprocessedItems(t *testing.T) {
}

ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour)
err := ddb.Batch(context.TODO(), nil, delete)
_, err := ddb.Batch(context.TODO(), nil, delete)
require.Errorf(t, err, "error processing batch dynamodb")
}

Expand All @@ -178,7 +178,7 @@ func Test_Batch_Error(t *testing.T) {
}

ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour)
err := ddb.Batch(context.TODO(), nil, delete)
_, err := ddb.Batch(context.TODO(), nil, delete)
require.Errorf(t, err, "mocked error")
}

Expand Down
16 changes: 11 additions & 5 deletions pkg/ring/kv/dynamodb/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func newDynamoDbMetrics(registerer prometheus.Registerer) *dynamodbMetrics {
}, []string{"operation", "status_code"}))

dynamodbUsageMetrics := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "dynamodb_kv_read_capacity_total",
Help: "Total used read capacity on dynamodb",
Name: "dynamodb_kv_consumed_capacity_total",
Help: "Total consumed capacity on dynamodb",
}, []string{"operation"})

dynamodbMetrics := dynamodbMetrics{
Expand Down Expand Up @@ -66,19 +66,25 @@ func (d dynamodbInstrumentation) Query(ctx context.Context, key dynamodbKey, isP

func (d dynamodbInstrumentation) Delete(ctx context.Context, key dynamodbKey) error {
return instrument.CollectedRequest(ctx, "Delete", d.ddbMetrics.dynamodbRequestDuration, errorCode, func(ctx context.Context) error {
return d.kv.Delete(ctx, key)
totalCapacity, err := d.kv.Delete(ctx, key)
d.ddbMetrics.dynamodbUsageMetrics.WithLabelValues("Delete").Add(totalCapacity)
return err
})
}

func (d dynamodbInstrumentation) Put(ctx context.Context, key dynamodbKey, data []byte) error {
return instrument.CollectedRequest(ctx, "Put", d.ddbMetrics.dynamodbRequestDuration, errorCode, func(ctx context.Context) error {
return d.kv.Put(ctx, key, data)
totalCapacity, err := d.kv.Put(ctx, key, data)
d.ddbMetrics.dynamodbUsageMetrics.WithLabelValues("Put").Add(totalCapacity)
return err
})
}

func (d dynamodbInstrumentation) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
return instrument.CollectedRequest(ctx, "Batch", d.ddbMetrics.dynamodbRequestDuration, errorCode, func(ctx context.Context) error {
return d.kv.Batch(ctx, put, delete)
totalCapacity, err := d.kv.Batch(ctx, put, delete)
d.ddbMetrics.dynamodbUsageMetrics.WithLabelValues("Batch").Add(totalCapacity)
return err
})
}

Expand Down
Loading