diff --git a/CHANGELOG.md b/CHANGELOG.md index 80a5daa3b60..823e2d29635 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ * `cortex_prometheus_notifications_queue_length` * `cortex_prometheus_notifications_queue_capacity` * `cortex_prometheus_notifications_alertmanagers_discovered` +* [ENHANCEMENT] Experimental Delete Series: Add support for deletion of chunks for remaining stores. #2801 * [ENHANCEMENT] Add `-modules` command line flag to list possible values for `-target`. Also, log warning if given target is internal component. #2752 * [ENHANCEMENT] Added `-ingester.flush-on-shutdown-with-wal-enabled` option to enable chunks flushing even when WAL is enabled. #2780 * [ENHANCEMENT] Query-tee: Support for custom API prefix by using `-server.path-prefix` option. #2814 diff --git a/pkg/chunk/aws/dynamodb_storage_client.go b/pkg/chunk/aws/dynamodb_storage_client.go index 6d7ca71fb46..126de4e209d 100644 --- a/pkg/chunk/aws/dynamodb_storage_client.go +++ b/pkg/chunk/aws/dynamodb_storage_client.go @@ -539,11 +539,6 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c return result, nil } -func (a dynamoDBStorageClient) DeleteChunk(ctx context.Context, chunkID string) error { - // ToDo: implement this to support deleting chunks from DynamoDB - return chunk.ErrMethodNotImplemented -} - func processChunkResponse(response *dynamodb.BatchGetItemOutput, chunksByKey map[string]chunk.Chunk) ([]chunk.Chunk, error) { result := []chunk.Chunk{} decodeContext := chunk.NewDecodeContext() @@ -594,6 +589,22 @@ func (a dynamoDBStorageClient) PutChunks(ctx context.Context, chunks []chunk.Chu return a.BatchWrite(ctx, dynamoDBWrites) } +func (a dynamoDBStorageClient) DeleteChunk(ctx context.Context, userID, chunkID string) error { + chunkRef, err := chunk.ParseExternalKey(userID, chunkID) + if err != nil { + return err + } + + tableName, err := a.schemaCfg.ChunkTableFor(chunkRef.From) + if err != nil { + return err + } + + dynamoDBWrites := dynamoDBWriteBatch{} + dynamoDBWrites.Delete(tableName, chunkID, placeholder) + return a.BatchWrite(ctx, dynamoDBWrites) +} + func (a dynamoDBStorageClient) writesForChunks(chunks []chunk.Chunk) (dynamoDBWriteBatch, error) { var ( dynamoDBWrites = dynamoDBWriteBatch{} diff --git a/pkg/chunk/azure/blob_storage_client.go b/pkg/chunk/azure/blob_storage_client.go index fd856302ed4..2da1aa2683b 100644 --- a/pkg/chunk/azure/blob_storage_client.go +++ b/pkg/chunk/azure/blob_storage_client.go @@ -201,7 +201,12 @@ func (b *BlobStorage) List(ctx context.Context, prefix string) ([]chunk.StorageO return storageObjects, commonPrefixes, nil } -func (b *BlobStorage) DeleteObject(ctx context.Context, chunkID string) error { - // ToDo: implement this to support deleting chunks from Azure BlobStorage - return chunk.ErrMethodNotImplemented +func (b *BlobStorage) DeleteObject(ctx context.Context, blobID string) error { + blockBlobURL, err := b.getBlobURL(blobID) + if err != nil { + return err + } + + _, err = blockBlobURL.Delete(ctx, azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}) + return err } diff --git a/pkg/chunk/cassandra/storage_client.go b/pkg/chunk/cassandra/storage_client.go index 327938399ab..c4a0fa56f29 100644 --- a/pkg/chunk/cassandra/storage_client.go +++ b/pkg/chunk/cassandra/storage_client.go @@ -486,9 +486,24 @@ func (s *ObjectClient) getChunk(ctx context.Context, decodeContext *chunk.Decode return input, err } -func (s *ObjectClient) DeleteChunk(ctx context.Context, chunkID string) error { - // ToDo: implement this to support deleting chunks from Cassandra - return chunk.ErrMethodNotImplemented +func (s *ObjectClient) DeleteChunk(ctx context.Context, userID, chunkID string) error { + chunkRef, err := chunk.ParseExternalKey(userID, chunkID) + if err != nil { + return err + } + + tableName, err := s.schemaCfg.ChunkTableFor(chunkRef.From) + if err != nil { + return err + } + + q := s.writeSession.Query(fmt.Sprintf("DELETE FROM %s WHERE hash = ?", + tableName), chunkID) + if err := q.WithContext(ctx).Exec(); err != nil { + return errors.WithStack(err) + } + + return nil } // Stop implement chunk.ObjectClient. diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index 4363696cf0d..eadfa8121ee 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -627,7 +627,7 @@ func (c *baseStore) deleteChunk(ctx context.Context, return errors.Wrapf(err, "when deleting index entries for chunkID=%s", chunkID) } - err = c.chunks.DeleteChunk(ctx, chunkID) + err = c.chunks.DeleteChunk(ctx, userID, chunkID) if err != nil { if err == ErrStorageObjectNotFound { return nil diff --git a/pkg/chunk/gcp/bigtable_object_client.go b/pkg/chunk/gcp/bigtable_object_client.go index 46fbe2c2da9..a0cc62013b7 100644 --- a/pkg/chunk/gcp/bigtable_object_client.go +++ b/pkg/chunk/gcp/bigtable_object_client.go @@ -161,7 +161,19 @@ func (s *bigtableObjectClient) GetChunks(ctx context.Context, input []chunk.Chun return output, nil } -func (s *bigtableObjectClient) DeleteChunk(ctx context.Context, chunkID string) error { - // ToDo: implement this to support deleting chunks from Bigtable - return chunk.ErrMethodNotImplemented +func (s *bigtableObjectClient) DeleteChunk(ctx context.Context, userID, chunkID string) error { + chunkRef, err := chunk.ParseExternalKey(userID, chunkID) + if err != nil { + return err + } + + tableName, err := s.schemaCfg.ChunkTableFor(chunkRef.From) + if err != nil { + return err + } + + mut := bigtable.NewMutation() + mut.DeleteCellsInColumn(columnFamily, column) + + return s.client.Open(tableName).Apply(ctx, chunkID, mut) } diff --git a/pkg/chunk/grpc/grpc_client_test.go b/pkg/chunk/grpc/grpc_client_test.go index e97e34e69cb..2bd70d500df 100644 --- a/pkg/chunk/grpc/grpc_client_test.go +++ b/pkg/chunk/grpc/grpc_client_test.go @@ -134,7 +134,7 @@ func TestGrpcStore(t *testing.T) { _, err = storageClient.GetChunks(context.Background(), getChunksTestData) require.NoError(t, err) - err = storageClient.DeleteChunk(context.Background(), "") + err = storageClient.DeleteChunk(context.Background(), "", "") require.NoError(t, err) //rpc calls specific to indexClient diff --git a/pkg/chunk/grpc/storage_client.go b/pkg/chunk/grpc/storage_client.go index 1ece225cec0..99595f8c380 100644 --- a/pkg/chunk/grpc/storage_client.go +++ b/pkg/chunk/grpc/storage_client.go @@ -65,7 +65,7 @@ func (s *StorageClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) err return nil } -func (s *StorageClient) DeleteChunk(ctx context.Context, chunkID string) error { +func (s *StorageClient) DeleteChunk(ctx context.Context, userID, chunkID string) error { chunkInfo := &ChunkID{ChunkID: chunkID} _, err := s.client.DeleteChunks(ctx, chunkInfo) if err != nil { diff --git a/pkg/chunk/inmemory_storage_client.go b/pkg/chunk/inmemory_storage_client.go index 4c3a5bd6a42..1a7ae5c2dce 100644 --- a/pkg/chunk/inmemory_storage_client.go +++ b/pkg/chunk/inmemory_storage_client.go @@ -336,7 +336,7 @@ func (m *MockStorage) GetChunks(ctx context.Context, chunkSet []Chunk) ([]Chunk, } // DeleteChunk implements StorageClient. -func (m *MockStorage) DeleteChunk(ctx context.Context, chunkID string) error { +func (m *MockStorage) DeleteChunk(ctx context.Context, userID, chunkID string) error { return m.DeleteObject(ctx, chunkID) } diff --git a/pkg/chunk/objectclient/client.go b/pkg/chunk/objectclient/client.go index 70b2ff1128b..a3d3e24d262 100644 --- a/pkg/chunk/objectclient/client.go +++ b/pkg/chunk/objectclient/client.go @@ -110,6 +110,6 @@ func (o *Client) getChunk(ctx context.Context, decodeContext *chunk.DecodeContex } // GetChunks retrieves the specified chunks from the configured backend -func (o *Client) DeleteChunk(ctx context.Context, chunkID string) error { +func (o *Client) DeleteChunk(ctx context.Context, userID, chunkID string) error { return o.store.DeleteObject(ctx, chunkID) } diff --git a/pkg/chunk/storage/metrics.go b/pkg/chunk/storage/metrics.go index 28feece364c..628c8924517 100644 --- a/pkg/chunk/storage/metrics.go +++ b/pkg/chunk/storage/metrics.go @@ -105,6 +105,6 @@ func (c metricsChunkClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) return chks, nil } -func (c metricsChunkClient) DeleteChunk(ctx context.Context, chunkID string) error { - return c.client.DeleteChunk(ctx, chunkID) +func (c metricsChunkClient) DeleteChunk(ctx context.Context, userID, chunkID string) error { + return c.client.DeleteChunk(ctx, userID, chunkID) } diff --git a/pkg/chunk/storage_client.go b/pkg/chunk/storage_client.go index 84fc12d8cfb..bb25d4737a3 100644 --- a/pkg/chunk/storage_client.go +++ b/pkg/chunk/storage_client.go @@ -35,7 +35,7 @@ type Client interface { PutChunks(ctx context.Context, chunks []Chunk) error GetChunks(ctx context.Context, chunks []Chunk) ([]Chunk, error) - DeleteChunk(ctx context.Context, chunkID string) error + DeleteChunk(ctx context.Context, userID, chunkID string) error } // ObjectAndIndexClient allows optimisations where the same client handles both