Skip to content

Commit 18e6e4d

Browse files
authored
Add Label values Batch, Delete, Put for ddb metrics. (#5487)
* add write consumed capacity to ddb metrics Signed-off-by: Wen Xu <[email protected]> * add changelog and rename metric Signed-off-by: Wen Xu <[email protected]> --------- Signed-off-by: Wen Xu <[email protected]>
1 parent 3e43a4b commit 18e6e4d

File tree

4 files changed

+42
-23
lines changed

4 files changed

+42
-23
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
* [ENHANCEMENT] Store Gateway: Implementing multi level index cache. #5451
4444
* [ENHANCEMENT] Alertmanager: Add the alert name in error log when it get throttled. #5456
4545
* [ENHANCEMENT] Querier: Retry store gateway on different zones when zone awareness is enabled. #5476
46+
* [ENHANCEMENT] DDBKV: Change metric name from dynamodb_kv_read_capacity_total to dynamodb_kv_consumed_capacity_total and include Delete, Put, Batch dimension. #5481
4647
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
4748
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
4849
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293

pkg/ring/kv/dynamodb/dynamodb.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -157,28 +157,37 @@ func (kv dynamodbKV) Query(ctx context.Context, key dynamodbKey, isPrefix bool)
157157
return keys, totalCapacity, nil
158158
}
159159

160-
func (kv dynamodbKV) Delete(ctx context.Context, key dynamodbKey) error {
160+
func (kv dynamodbKV) Delete(ctx context.Context, key dynamodbKey) (float64, error) {
161161
input := &dynamodb.DeleteItemInput{
162162
TableName: kv.tableName,
163163
Key: generateItemKey(key),
164164
}
165-
_, err := kv.ddbClient.DeleteItemWithContext(ctx, input)
166-
return err
165+
totalCapacity := float64(0)
166+
output, err := kv.ddbClient.DeleteItemWithContext(ctx, input)
167+
if err != nil {
168+
totalCapacity = getCapacityUnits(output.ConsumedCapacity)
169+
}
170+
return totalCapacity, err
167171
}
168172

169-
func (kv dynamodbKV) Put(ctx context.Context, key dynamodbKey, data []byte) error {
173+
func (kv dynamodbKV) Put(ctx context.Context, key dynamodbKey, data []byte) (float64, error) {
170174
input := &dynamodb.PutItemInput{
171175
TableName: kv.tableName,
172176
Item: kv.generatePutItemRequest(key, data),
173177
}
174-
_, err := kv.ddbClient.PutItemWithContext(ctx, input)
175-
return err
178+
totalCapacity := float64(0)
179+
output, err := kv.ddbClient.PutItemWithContext(ctx, input)
180+
if err != nil {
181+
totalCapacity = getCapacityUnits(output.ConsumedCapacity)
182+
}
183+
return totalCapacity, err
176184
}
177185

178-
func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
186+
func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) (float64, error) {
187+
totalCapacity := float64(0)
179188
writeRequestSize := len(put) + len(delete)
180189
if writeRequestSize == 0 {
181-
return nil
190+
return totalCapacity, nil
182191
}
183192

184193
writeRequestsSlices := make([][]*dynamodb.WriteRequest, int(math.Ceil(float64(writeRequestSize)/float64(DdbBatchSizeLimit))))
@@ -220,15 +229,18 @@ func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, dele
220229

221230
resp, err := kv.ddbClient.BatchWriteItemWithContext(ctx, input)
222231
if err != nil {
223-
return err
232+
return totalCapacity, err
233+
}
234+
for _, consumedCapacity := range resp.ConsumedCapacity {
235+
totalCapacity += getCapacityUnits(consumedCapacity)
224236
}
225237

226238
if resp.UnprocessedItems != nil && len(resp.UnprocessedItems) > 0 {
227-
return fmt.Errorf("error processing batch request for %s requests", resp.UnprocessedItems)
239+
return totalCapacity, fmt.Errorf("error processing batch request for %s requests", resp.UnprocessedItems)
228240
}
229241
}
230242

231-
return nil
243+
return totalCapacity, nil
232244
}
233245

234246
func (kv dynamodbKV) generatePutItemRequest(key dynamodbKey, data []byte) map[string]*dynamodb.AttributeValue {

pkg/ring/kv/dynamodb/dynamodb_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func Test_TTLDisabled(t *testing.T) {
2323
}
2424

2525
ddb := newDynamodbClientMock("TEST", ddbClientMock, 0)
26-
err := ddb.Put(context.TODO(), dynamodbKey{primaryKey: "test", sortKey: "test1"}, []byte("TEST"))
26+
_, err := ddb.Put(context.TODO(), dynamodbKey{primaryKey: "test", sortKey: "test1"}, []byte("TEST"))
2727
require.NoError(t, err)
2828

2929
}
@@ -41,7 +41,7 @@ func Test_TTL(t *testing.T) {
4141
}
4242

4343
ddb := newDynamodbClientMock("TEST", ddbClientMock, 5*time.Hour)
44-
err := ddb.Put(context.TODO(), dynamodbKey{primaryKey: "test", sortKey: "test1"}, []byte("TEST"))
44+
_, err := ddb.Put(context.TODO(), dynamodbKey{primaryKey: "test", sortKey: "test1"}, []byte("TEST"))
4545
require.NoError(t, err)
4646
}
4747

@@ -72,7 +72,7 @@ func Test_Batch(t *testing.T) {
7272
}
7373

7474
ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour)
75-
err := ddb.Batch(context.TODO(), update, delete)
75+
_, err := ddb.Batch(context.TODO(), update, delete)
7676
require.NoError(t, err)
7777
}
7878

@@ -120,7 +120,7 @@ func Test_BatchSlices(t *testing.T) {
120120
delete = append(delete, ddbKeyDelete)
121121
}
122122

123-
err := ddb.Batch(context.TODO(), nil, delete)
123+
_, err := ddb.Batch(context.TODO(), nil, delete)
124124
require.NoError(t, err)
125125
require.EqualValues(t, tc.expectedCalls, numOfCalls)
126126

@@ -134,7 +134,7 @@ func Test_EmptyBatch(t *testing.T) {
134134
ddbClientMock := &mockDynamodb{}
135135

136136
ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour)
137-
err := ddb.Batch(context.TODO(), nil, nil)
137+
_, err := ddb.Batch(context.TODO(), nil, nil)
138138
require.NoError(t, err)
139139
}
140140

@@ -159,7 +159,7 @@ func Test_Batch_UnprocessedItems(t *testing.T) {
159159
}
160160

161161
ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour)
162-
err := ddb.Batch(context.TODO(), nil, delete)
162+
_, err := ddb.Batch(context.TODO(), nil, delete)
163163
require.Errorf(t, err, "error processing batch dynamodb")
164164
}
165165

@@ -178,7 +178,7 @@ func Test_Batch_Error(t *testing.T) {
178178
}
179179

180180
ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour)
181-
err := ddb.Batch(context.TODO(), nil, delete)
181+
_, err := ddb.Batch(context.TODO(), nil, delete)
182182
require.Errorf(t, err, "mocked error")
183183
}
184184

pkg/ring/kv/dynamodb/metrics.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ func newDynamoDbMetrics(registerer prometheus.Registerer) *dynamodbMetrics {
2929
}, []string{"operation", "status_code"}))
3030

3131
dynamodbUsageMetrics := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
32-
Name: "dynamodb_kv_read_capacity_total",
33-
Help: "Total used read capacity on dynamodb",
32+
Name: "dynamodb_kv_consumed_capacity_total",
33+
Help: "Total consumed capacity on dynamodb",
3434
}, []string{"operation"})
3535

3636
dynamodbMetrics := dynamodbMetrics{
@@ -66,19 +66,25 @@ func (d dynamodbInstrumentation) Query(ctx context.Context, key dynamodbKey, isP
6666

6767
func (d dynamodbInstrumentation) Delete(ctx context.Context, key dynamodbKey) error {
6868
return instrument.CollectedRequest(ctx, "Delete", d.ddbMetrics.dynamodbRequestDuration, errorCode, func(ctx context.Context) error {
69-
return d.kv.Delete(ctx, key)
69+
totalCapacity, err := d.kv.Delete(ctx, key)
70+
d.ddbMetrics.dynamodbUsageMetrics.WithLabelValues("Delete").Add(totalCapacity)
71+
return err
7072
})
7173
}
7274

7375
func (d dynamodbInstrumentation) Put(ctx context.Context, key dynamodbKey, data []byte) error {
7476
return instrument.CollectedRequest(ctx, "Put", d.ddbMetrics.dynamodbRequestDuration, errorCode, func(ctx context.Context) error {
75-
return d.kv.Put(ctx, key, data)
77+
totalCapacity, err := d.kv.Put(ctx, key, data)
78+
d.ddbMetrics.dynamodbUsageMetrics.WithLabelValues("Put").Add(totalCapacity)
79+
return err
7680
})
7781
}
7882

7983
func (d dynamodbInstrumentation) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
8084
return instrument.CollectedRequest(ctx, "Batch", d.ddbMetrics.dynamodbRequestDuration, errorCode, func(ctx context.Context) error {
81-
return d.kv.Batch(ctx, put, delete)
85+
totalCapacity, err := d.kv.Batch(ctx, put, delete)
86+
d.ddbMetrics.dynamodbUsageMetrics.WithLabelValues("Batch").Add(totalCapacity)
87+
return err
8288
})
8389
}
8490

0 commit comments

Comments
 (0)