Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"regexp"
"strings"

"github.com/prometheus/client_golang/prometheus"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove new line please, and merge these two import groups.

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/route"
Expand Down Expand Up @@ -170,7 +172,7 @@ func (a *API) RegisterIngester(i *ingester.Ingester, pushConfig distributor.Conf
// match the Prometheus API but mirror it closely enough to justify their routing under the Prometheus
// component.
func (a *API) RegisterPurger(store *purger.DeleteStore) {
deleteRequestHandler := purger.NewDeleteRequestHandler(store)
deleteRequestHandler := purger.NewDeleteRequestHandler(store, prometheus.DefaultRegisterer)

a.registerRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/admin/tsdb/delete_series", http.HandlerFunc(deleteRequestHandler.AddDeleteRequestHandler), true, "PUT", "POST")
a.registerRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/admin/tsdb/delete_series", http.HandlerFunc(deleteRequestHandler.GetAllDeleteRequestsHandler), true, "GET")
Expand Down
73 changes: 67 additions & 6 deletions pkg/chunk/purger/purger.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql"
"github.com/weaveworks/common/user"
Expand All @@ -24,6 +25,42 @@ import (

const millisecondPerDay = int64(24 * time.Hour / time.Millisecond)

// purgerMetrics holds the per-user metrics exported by the DataPurger.
type purgerMetrics struct {
	deleteRequestsProcessedTotal *prometheus.CounterVec // delete requests fully processed, labelled by user
	deleteRequestsChunksSelectedTotal *prometheus.CounterVec // chunks selected while building delete plans, labelled by user
	deleteRequestsProcessingFailures *prometheus.GaugeVec // processing failures per user; set back to 0 when a request completes (see workerJobCleanup)
}

func newPurgerMetrics(r prometheus.Registerer) *purgerMetrics {
m := purgerMetrics{}

m.deleteRequestsProcessedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of calling r.MustRegister() below, could you create the metrics with promauto.With(r).NewCounterVec() (and same for NewGaugeVec())? It's the pattern we're heading to.

Namespace: "cortex",
Name: "purger_delete_requests_processed_total",
Help: "Number of delete requests processed per user",
}, []string{"user"})
m.deleteRequestsChunksSelectedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "purger_delete_requests_chunks_selected_total",
Help: "Number of chunks selected while building delete plans per user",
}, []string{"user"})
m.deleteRequestsProcessingFailures = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, this should be a counter.

Namespace: "cortex",
Name: "purger_delete_requests_processing_failure",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think to add an s to failure?

Suggested change
Name: "purger_delete_requests_processing_failure",
Name: "purger_delete_requests_processing_failures",

Help: "Delete requests processing failure for each user",
}, []string{"user"})

if r != nil {
r.MustRegister(
m.deleteRequestsProcessedTotal,
m.deleteRequestsChunksSelectedTotal,
m.deleteRequestsProcessingFailures,
)
}

return &m
}

type deleteRequestWithLogger struct {
DeleteRequest
logger log.Logger // logger is initialized with userID and requestID to add context to every log generated using this
Expand Down Expand Up @@ -58,6 +95,7 @@ type DataPurger struct {
deleteStore *DeleteStore
chunkStore chunk.Store
objectClient chunk.ObjectClient
metrics *purgerMetrics

executePlansChan chan deleteRequestWithLogger
workerJobChan chan workerJob
Expand All @@ -74,14 +112,15 @@ type DataPurger struct {
}

// NewDataPurger creates a new DataPurger
func NewDataPurger(cfg Config, deleteStore *DeleteStore, chunkStore chunk.Store, storageClient chunk.ObjectClient) (*DataPurger, error) {
func NewDataPurger(cfg Config, deleteStore *DeleteStore, chunkStore chunk.Store, storageClient chunk.ObjectClient, registerer prometheus.Registerer) (*DataPurger, error) {
util.WarnExperimentalUse("Delete series API")

dataPurger := DataPurger{
cfg: cfg,
deleteStore: deleteStore,
chunkStore: chunkStore,
objectClient: storageClient,
metrics: newPurgerMetrics(registerer),
executePlansChan: make(chan deleteRequestWithLogger, 50),
workerJobChan: make(chan workerJob, 50),
inProcessRequestIDs: map[string]string{},
Expand Down Expand Up @@ -140,12 +179,15 @@ func (dp *DataPurger) workerJobCleanup(job workerJob) {
level.Error(job.logger).Log("msg", "error updating delete request status to process", "err", err)
}

dp.metrics.deleteRequestsProcessedTotal.WithLabelValues(job.userID).Inc()
delete(dp.pendingPlansCount, job.deleteRequestID)
dp.pendingPlansCountMtx.Unlock()

dp.inProcessRequestIDsMtx.Lock()
delete(dp.inProcessRequestIDs, job.userID)
dp.inProcessRequestIDsMtx.Unlock()

dp.metrics.deleteRequestsProcessingFailures.WithLabelValues(job.userID).Set(0)
} else {
dp.pendingPlansCountMtx.Unlock()
}
Expand Down Expand Up @@ -182,6 +224,7 @@ func (dp *DataPurger) worker() {
for job := range dp.workerJobChan {
err := dp.executePlan(job.userID, job.deleteRequestID, job.planNo, job.logger)
if err != nil {
dp.metrics.deleteRequestsProcessingFailures.WithLabelValues(job.userID).Inc()
level.Error(job.logger).Log("msg", "error executing delete plan",
"plan_no", job.planNo, "err", err)
continue
Expand Down Expand Up @@ -267,7 +310,9 @@ func (dp *DataPurger) loadInprocessDeleteRequests() error {
dp.inProcessRequestIDs[deleteRequest.UserID] = deleteRequest.RequestID
err := dp.buildDeletePlan(req)
if err != nil {
dp.metrics.deleteRequestsProcessingFailures.WithLabelValues(deleteRequest.UserID).Inc()
level.Error(req.logger).Log("msg", "error building delete plan", "err", err)
continue
}

level.Info(req.logger).Log("msg", "sending delete request for execution")
Expand Down Expand Up @@ -329,6 +374,8 @@ func (dp *DataPurger) pullDeleteRequestsToPlanDeletes() error {

err := dp.buildDeletePlan(req)
if err != nil {
dp.metrics.deleteRequestsProcessingFailures.WithLabelValues(deleteRequest.UserID).Inc()

// We do not want to remove this delete request from inProcessRequestIDs to make sure
// we do not move multiple deleting requests in deletion process.
// None of the other delete requests from the user would be considered for processing until then.
Expand All @@ -355,6 +402,8 @@ func (dp *DataPurger) buildDeletePlan(req deleteRequestWithLogger) error {
level.Info(req.logger).Log("msg", "building delete plan", "num_plans", len(perDayTimeRange))

plans := make([][]byte, len(perDayTimeRange))
includedChunkIDs := map[string]struct{}{}

for i, planRange := range perDayTimeRange {
chunksGroups := []ChunksGroup{}

Expand All @@ -364,13 +413,17 @@ func (dp *DataPurger) buildDeletePlan(req deleteRequestWithLogger) error {
return err
}

// ToDo: remove duplicate chunks
chunks, err := dp.chunkStore.Get(ctx, req.UserID, planRange.Start, planRange.End, matchers...)
if err != nil {
return err
}

chunksGroups = append(chunksGroups, groupChunks(chunks, req.StartTime, req.EndTime)...)
var cg []ChunksGroup
cg, includedChunkIDs = groupChunks(chunks, req.StartTime, req.EndTime, includedChunkIDs)

if len(cg) != 0 {
chunksGroups = append(chunksGroups, cg...)
}
}

plan := DeletePlan{
Expand Down Expand Up @@ -399,6 +452,8 @@ func (dp *DataPurger) buildDeletePlan(req deleteRequestWithLogger) error {
return err
}

dp.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(req.UserID).Add(float64(len(includedChunkIDs)))

level.Info(req.logger).Log("msg", "built delete plans", "num_plans", len(perDayTimeRange))

return nil
Expand Down Expand Up @@ -482,10 +537,15 @@ func numPlans(start, end model.Time) int {

// groups chunks together by unique label sets i.e all the chunks with same labels would be stored in a group
// chunk details are stored in groups for each unique label set to avoid storing them repetitively for each chunk
func groupChunks(chunks []chunk.Chunk, deleteFrom, deleteThrough model.Time) []ChunksGroup {
func groupChunks(chunks []chunk.Chunk, deleteFrom, deleteThrough model.Time, includedChunkIDs map[string]struct{}) ([]ChunksGroup, map[string]struct{}) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it more like processedChunkIDs?

Also there is no need to return it back from the function – modifications are already visible to the caller. Function comment should however mention this fact (that passed map is modified).

metricToChunks := make(map[string]ChunksGroup)

for _, chk := range chunks {
chunkID := chk.ExternalKey()

if _, ok := includedChunkIDs[chunkID]; ok {
continue
}
// chunk.Metric are assumed to be sorted which should give same value from String() for same series.
// If they stop being sorted then in the worst case we would lose the benefit of grouping chunks to avoid storing labels repetitively.
metricString := chk.Metric.String()
Expand All @@ -494,7 +554,7 @@ func groupChunks(chunks []chunk.Chunk, deleteFrom, deleteThrough model.Time) []C
group = ChunksGroup{Labels: client.FromLabelsToLabelAdapters(chk.Metric)}
}

chunkDetails := ChunkDetails{ID: chk.ExternalKey()}
chunkDetails := ChunkDetails{ID: chunkID}

if deleteFrom > chk.From || deleteThrough < chk.Through {
partiallyDeletedInterval := Interval{StartTimestampMs: int64(chk.From), EndTimestampMs: int64(chk.Through)}
Expand All @@ -510,6 +570,7 @@ func groupChunks(chunks []chunk.Chunk, deleteFrom, deleteThrough model.Time) []C
}

group.Chunks = append(group.Chunks, chunkDetails)
includedChunkIDs[chunkID] = struct{}{}
metricToChunks[metricString] = group
}

Expand All @@ -519,7 +580,7 @@ func groupChunks(chunks []chunk.Chunk, deleteFrom, deleteThrough model.Time) []C
chunksGroups = append(chunksGroups, group)
}

return chunksGroups
return chunksGroups, includedChunkIDs
}

func isMissingChunkErr(err error) bool {
Expand Down
13 changes: 11 additions & 2 deletions pkg/chunk/purger/purger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func setupStoresAndPurger(t *testing.T) (*DeleteStore, chunk.Store, chunk.Object
var cfg Config
flagext.DefaultValues(&cfg)

dataPurger, err := NewDataPurger(cfg, deleteStore, chunkStore, storageClient)
dataPurger, err := NewDataPurger(cfg, deleteStore, chunkStore, storageClient, nil)
require.NoError(t, err)

return deleteStore, chunkStore, storageClient, dataPurger
Expand Down Expand Up @@ -149,6 +149,15 @@ var purgePlanTestCases = []struct {
firstChunkPartialDeletionInterval: &Interval{StartTimestampMs: int64(modelTimeDay.Add(-30 * time.Minute)),
EndTimestampMs: int64(modelTimeDay.Add(-15 * time.Minute))},
},
{
name: "building multi-day chunk and deleting part of it for each day",
chunkStoreDataInterval: model.Interval{Start: modelTimeDay.Add(-30 * time.Minute), End: modelTimeDay.Add(30 * time.Minute)},
deleteRequestInterval: model.Interval{Start: modelTimeDay.Add(-15 * time.Minute), End: modelTimeDay.Add(15 * time.Minute)},
expectedNumberOfPlans: 2,
numChunksToDelete: 1,
firstChunkPartialDeletionInterval: &Interval{StartTimestampMs: int64(modelTimeDay.Add(-15 * time.Minute)),
EndTimestampMs: int64(modelTimeDay.Add(15 * time.Minute))},
},
}

func TestDataPurger_BuildPlan(t *testing.T) {
Expand Down Expand Up @@ -327,7 +336,7 @@ func TestDataPurger_Restarts(t *testing.T) {
// create a new purger to check whether it picks up in process delete requests
var cfg Config
flagext.DefaultValues(&cfg)
newPurger, err := NewDataPurger(cfg, deleteStore, chunkStore, storageClient)
newPurger, err := NewDataPurger(cfg, deleteStore, chunkStore, storageClient, nil)
require.NoError(t, err)

// load in process delete requests by calling Run
Expand Down
30 changes: 29 additions & 1 deletion pkg/chunk/purger/request_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,48 @@ import (
"fmt"
"net/http"

"github.com/prometheus/client_golang/prometheus"

"github.com/cortexproject/cortex/pkg/util"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql"
"github.com/weaveworks/common/user"
)

// deleteRequestHandlerMetrics holds the metrics exported by DeleteRequestHandler.
type deleteRequestHandlerMetrics struct {
	deleteRequestsReceivedTotal *prometheus.CounterVec // delete requests received, labelled by user
}

func newDeleteRequestHandlerMetrics(r prometheus.Registerer) *deleteRequestHandlerMetrics {
m := deleteRequestHandlerMetrics{}

m.deleteRequestsReceivedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

promauto.With(r) here as well, please.

Namespace: "cortex",
Name: "purger_delete_requests_received_total",
Help: "Number of delete requests received per user",
}, []string{"user"})

if r != nil {
r.MustRegister(
m.deleteRequestsReceivedTotal,
)
}

return &m
}

// DeleteRequestHandler provides handlers for delete requests
type DeleteRequestHandler struct {
	deleteStore *DeleteStore // persistence layer for delete requests
	metrics *deleteRequestHandlerMetrics // request-count metrics, created in NewDeleteRequestHandler
}

// NewDeleteRequestHandler creates a DeleteRequestHandler
func NewDeleteRequestHandler(deleteStore *DeleteStore) *DeleteRequestHandler {
func NewDeleteRequestHandler(deleteStore *DeleteStore, registerer prometheus.Registerer) *DeleteRequestHandler {
deleteMgr := DeleteRequestHandler{
deleteStore: deleteStore,
metrics: newDeleteRequestHandlerMetrics(registerer),
}

return &deleteMgr
Expand Down Expand Up @@ -84,6 +110,8 @@ func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r
if err := dm.deleteStore.AddDeleteRequest(ctx, userID, model.Time(startTime), model.Time(endTime), match); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}

dm.metrics.deleteRequestsReceivedTotal.WithLabelValues(userID).Inc()
}

// GetAllDeleteRequestsHandler handles get all delete requests
Expand Down
2 changes: 1 addition & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ func (t *Cortex) initDataPurger(cfg *Config) (services.Service, error) {
return nil, err
}

t.dataPurger, err = purger.NewDataPurger(cfg.DataPurgerConfig, t.deletesStore, t.store, storageClient)
t.dataPurger, err = purger.NewDataPurger(cfg.DataPurgerConfig, t.deletesStore, t.store, storageClient, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
Expand Down