1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -21,6 +21,7 @@
* `cortex_prometheus_notifications_queue_length`
* `cortex_prometheus_notifications_queue_capacity`
* `cortex_prometheus_notifications_alertmanagers_discovered`
+* [ENHANCEMENT] Experimental Delete Series: Add support for deletion of chunks for remaining stores. #2801
* [BUGFIX] Fixed a bug in the index intersect code causing storage to return more chunks/series than required. #2796
* [BUGFIX] Fixed the number of reported keys in the background cache queue. #2764
* [BUGFIX] Fix race in processing of headers in sharded queries. #2762
21 changes: 16 additions & 5 deletions pkg/chunk/aws/dynamodb_storage_client.go
@@ -539,11 +539,6 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c
return result, nil
}

-func (a dynamoDBStorageClient) DeleteChunk(ctx context.Context, chunkID string) error {
-	// ToDo: implement this to support deleting chunks from DynamoDB
-	return chunk.ErrMethodNotImplemented
-}
-
func processChunkResponse(response *dynamodb.BatchGetItemOutput, chunksByKey map[string]chunk.Chunk) ([]chunk.Chunk, error) {
result := []chunk.Chunk{}
decodeContext := chunk.NewDecodeContext()
@@ -594,6 +589,22 @@ func (a dynamoDBStorageClient) PutChunks(ctx context.Context, chunks []chunk.Chu
return a.BatchWrite(ctx, dynamoDBWrites)
}

+func (a dynamoDBStorageClient) DeleteChunk(ctx context.Context, userID, chunkID string) error {
+	chunkRef, err := chunk.ParseExternalKey(userID, chunkID)
+	if err != nil {
+		return err
+	}
+
+	tableName, err := a.schemaCfg.ChunkTableFor(chunkRef.From)
+	if err != nil {
+		return err
+	}
+
+	dynamoDBWrites := dynamoDBWriteBatch{}
+	dynamoDBWrites.Delete(tableName, chunkID, placeholder)
+	return a.BatchWrite(ctx, dynamoDBWrites)
+}
+
func (a dynamoDBStorageClient) writesForChunks(chunks []chunk.Chunk) (dynamoDBWriteBatch, error) {
var (
dynamoDBWrites = dynamoDBWriteBatch{}
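The DynamoDB implementation above, and the Cassandra and Bigtable ones further down, share the same first two steps: parse the external chunk key back into a chunk ref, then resolve the per-period table that holds it. A minimal sketch of that shared step, assuming a chunk.SchemaConfig loaded from the usual schema config (the helper name is illustrative, not part of the PR):

```go
package example

import (
	"github.com/cortexproject/cortex/pkg/chunk"
)

// tableForChunk is an illustrative helper, not PR code: it mirrors the first
// two steps of each new DeleteChunk implementation, turning an external chunk
// key back into the name of the period table that stores the chunk.
func tableForChunk(schemaCfg chunk.SchemaConfig, userID, chunkID string) (string, error) {
	chunkRef, err := chunk.ParseExternalKey(userID, chunkID)
	if err != nil {
		return "", err
	}
	return schemaCfg.ChunkTableFor(chunkRef.From)
}
```

The store-specific part is then only the final delete: a DynamoDB batch write here, a CQL DELETE for Cassandra, and a cell deletion for Bigtable.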
9 changes: 7 additions & 2 deletions pkg/chunk/azure/blob_storage_client.go
@@ -202,6 +202,11 @@ func (b *BlobStorage) List(ctx context.Context, prefix string) ([]chunk.StorageO
}

func (b *BlobStorage) DeleteObject(ctx context.Context, chunkID string) error {
-	// ToDo: implement this to support deleting chunks from Azure BlobStorage
-	return chunk.ErrMethodNotImplemented
+	blockBlobURL, err := b.getBlobURL(chunkID)
+	if err != nil {
+		return err
+	}
+
+	_, err = blockBlobURL.Delete(ctx, azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{})
+	return err
}
21 changes: 18 additions & 3 deletions pkg/chunk/cassandra/storage_client.go
@@ -486,9 +486,24 @@ func (s *ObjectClient) getChunk(ctx context.Context, decodeContext *chunk.Decode
return input, err
}

-func (s *ObjectClient) DeleteChunk(ctx context.Context, chunkID string) error {
-	// ToDo: implement this to support deleting chunks from Cassandra
-	return chunk.ErrMethodNotImplemented
+func (s *ObjectClient) DeleteChunk(ctx context.Context, userID, chunkID string) error {
+	chunkRef, err := chunk.ParseExternalKey(userID, chunkID)
+	if err != nil {
+		return err
+	}
+
+	tableName, err := s.schemaCfg.ChunkTableFor(chunkRef.From)
+	if err != nil {
+		return err
+	}
+
+	q := s.writeSession.Query(fmt.Sprintf("DELETE FROM %s WHERE hash = ?",
+		tableName), chunkID)
+	if err := q.WithContext(ctx).Exec(); err != nil {
+		return errors.WithStack(err)
+	}
+
+	return nil
}

// Stop implement chunk.ObjectClient.
2 changes: 1 addition & 1 deletion pkg/chunk/chunk_store.go
@@ -627,7 +627,7 @@ func (c *baseStore) deleteChunk(ctx context.Context,
return errors.Wrapf(err, "when deleting index entries for chunkID=%s", chunkID)
}

-	err = c.chunks.DeleteChunk(ctx, chunkID)
+	err = c.chunks.DeleteChunk(ctx, userID, chunkID)
if err != nil {
if err == ErrStorageObjectNotFound {
return nil
18 changes: 15 additions & 3 deletions pkg/chunk/gcp/bigtable_object_client.go
@@ -161,7 +161,19 @@ func (s *bigtableObjectClient) GetChunks(ctx context.Context, input []chunk.Chun
return output, nil
}

-func (s *bigtableObjectClient) DeleteChunk(ctx context.Context, chunkID string) error {
-	// ToDo: implement this to support deleting chunks from Bigtable
-	return chunk.ErrMethodNotImplemented
+func (s *bigtableObjectClient) DeleteChunk(ctx context.Context, userID, chunkID string) error {
+	chunkRef, err := chunk.ParseExternalKey(userID, chunkID)
+	if err != nil {
+		return err
+	}
+
+	tableName, err := s.schemaCfg.ChunkTableFor(chunkRef.From)
+	if err != nil {
+		return err
+	}
+
+	mut := bigtable.NewMutation()
+	mut.DeleteCellsInColumn(columnFamily, column)
+
+	return s.client.Open(tableName).Apply(ctx, chunkID, mut)
}
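For context, this client stores each chunk as a single cell in a row keyed by the chunk ID, so clearing that one column removes the chunk. A standalone sketch of the same mutation against the cloud.google.com/go/bigtable client (the family and column names here are placeholders for the package-level constants Cortex uses, not part of the PR):

```go
package example

import (
	"context"

	"cloud.google.com/go/bigtable"
)

// deleteChunkRow is a sketch, not PR code: it mirrors the mutation built in
// bigtable_object_client.go above, clearing the single cell that holds one
// chunk. The caller supplies an opened table plus the family/column names.
func deleteChunkRow(ctx context.Context, tbl *bigtable.Table, family, column, rowKey string) error {
	mut := bigtable.NewMutation()
	mut.DeleteCellsInColumn(family, column)
	return tbl.Apply(ctx, rowKey, mut)
}
```

A table handle would come from bigtable.NewClient(ctx, project, instance) followed by client.Open(tableName), which is what the object client above does through s.client.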
2 changes: 1 addition & 1 deletion pkg/chunk/grpc/grpc_client_test.go
@@ -134,7 +134,7 @@ func TestGrpcStore(t *testing.T) {
_, err = storageClient.GetChunks(context.Background(), getChunksTestData)
require.NoError(t, err)

-	err = storageClient.DeleteChunk(context.Background(), "")
+	err = storageClient.DeleteChunk(context.Background(), "", "")
require.NoError(t, err)

//rpc calls specific to indexClient
2 changes: 1 addition & 1 deletion pkg/chunk/grpc/storage_client.go
@@ -65,7 +65,7 @@ func (s *StorageClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) err
return nil
}

-func (s *StorageClient) DeleteChunk(ctx context.Context, chunkID string) error {
+func (s *StorageClient) DeleteChunk(ctx context.Context, userID, chunkID string) error {
chunkInfo := &ChunkID{ChunkID: chunkID}
_, err := s.client.DeleteChunks(ctx, chunkInfo)
if err != nil {
2 changes: 1 addition & 1 deletion pkg/chunk/inmemory_storage_client.go
@@ -336,7 +336,7 @@ func (m *MockStorage) GetChunks(ctx context.Context, chunkSet []Chunk) ([]Chunk,
}

// DeleteChunk implements StorageClient.
-func (m *MockStorage) DeleteChunk(ctx context.Context, chunkID string) error {
+func (m *MockStorage) DeleteChunk(ctx context.Context, userID, chunkID string) error {
return m.DeleteObject(ctx, chunkID)
}

2 changes: 1 addition & 1 deletion pkg/chunk/objectclient/client.go
@@ -110,6 +110,6 @@ func (o *Client) getChunk(ctx context.Context, decodeContext *chunk.DecodeContex
}

// GetChunks retrieves the specified chunks from the configured backend
-func (o *Client) DeleteChunk(ctx context.Context, chunkID string) error {
+func (o *Client) DeleteChunk(ctx context.Context, userID, chunkID string) error {
return o.store.DeleteObject(ctx, chunkID)
}
4 changes: 2 additions & 2 deletions pkg/chunk/storage/metrics.go
@@ -105,6 +105,6 @@ func (c metricsChunkClient) GetChunks(ctx context.Context, chunks []chunk.Chunk)
return chks, nil
}

-func (c metricsChunkClient) DeleteChunk(ctx context.Context, chunkID string) error {
-	return c.client.DeleteChunk(ctx, chunkID)
+func (c metricsChunkClient) DeleteChunk(ctx context.Context, userID, chunkID string) error {
+	return c.client.DeleteChunk(ctx, userID, chunkID)
}
2 changes: 1 addition & 1 deletion pkg/chunk/storage_client.go
@@ -35,7 +35,7 @@ type Client interface {

PutChunks(ctx context.Context, chunks []Chunk) error
GetChunks(ctx context.Context, chunks []Chunk) ([]Chunk, error)
-	DeleteChunk(ctx context.Context, chunkID string) error
+	DeleteChunk(ctx context.Context, userID, chunkID string) error
}

// ObjectAndIndexClient allows optimisations where the same client handles both
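With the interface widened, every call site has to supply the tenant as well as the chunk key. A minimal sketch of a caller under that assumption (deleteChunks is illustrative, not part of the PR; the chunk IDs would be whatever the caller looked up from the index while processing a delete request):

```go
package example

import (
	"context"
	"fmt"

	"github.com/cortexproject/cortex/pkg/chunk"
)

// deleteChunks is an illustrative helper, not PR code: it deletes a batch of
// chunks for one tenant through the widened chunk.Client interface.
func deleteChunks(ctx context.Context, client chunk.Client, userID string, chunkIDs []string) error {
	for _, id := range chunkIDs {
		if err := client.DeleteChunk(ctx, userID, id); err != nil {
			return fmt.Errorf("deleting chunk %s for user %s: %w", id, userID, err)
		}
	}
	return nil
}
```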