metrics for monitoring delete requests #2445
Changes from 2 commits
@@ -12,6 +12,8 @@ import (
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/gogo/protobuf/proto"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/promql"
 	"github.com/weaveworks/common/user"
@@ -24,6 +26,34 @@ import (
 const millisecondPerDay = int64(24 * time.Hour / time.Millisecond)
 
+type purgerMetrics struct {
+    deleteRequestsProcessedTotal      *prometheus.CounterVec
+    deleteRequestsChunksSelectedTotal *prometheus.CounterVec
+    deleteRequestsProcessingFailures  *prometheus.CounterVec
+}
+
+func newPurgerMetrics(r prometheus.Registerer) *purgerMetrics {
+    m := purgerMetrics{}
+
+    m.deleteRequestsProcessedTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+        Namespace: "cortex",
+        Name:      "purger_delete_requests_processed_total",
+        Help:      "Number of delete requests processed per user",
+    }, []string{"user"})
+    m.deleteRequestsChunksSelectedTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+        Namespace: "cortex",
+        Name:      "purger_delete_requests_chunks_selected_total",
+        Help:      "Number of chunks selected while building delete plans per user",
+    }, []string{"user"})
+    m.deleteRequestsProcessingFailures = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+        Namespace: "cortex",
+        Name:      "purger_delete_requests_processing_failures",
+        Help:      "Delete requests processing failure for each user",
+    }, []string{"user"})
+
+    return &m
+}
+
 type deleteRequestWithLogger struct {
     DeleteRequest
     logger log.Logger // logger is initialized with userID and requestID to add context to every log generated using this
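As background on the pattern used above (not part of this PR): a self-contained sketch showing how a per-user counter registered through promauto.With can be incremented and read back, for example in a unit test. The stand-alone registry, the label value, and the use of testutil here are illustrative assumptions.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Stand-alone registry; the purger instead receives a Registerer via NewDataPurger.
	reg := prometheus.NewRegistry()

	// Same pattern as newPurgerMetrics: a per-user counter registered through promauto.
	processed := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Namespace: "cortex",
		Name:      "purger_delete_requests_processed_total",
		Help:      "Number of delete requests processed per user",
	}, []string{"user"})

	// Incremented once per processed delete request for that user.
	processed.WithLabelValues("user-1").Inc()
	processed.WithLabelValues("user-1").Inc()

	// Read a single labelled child back, as a test might do.
	fmt.Println(testutil.ToFloat64(processed.WithLabelValues("user-1"))) // 2
}
```

With the cortex namespace, the counter is exposed as cortex_purger_delete_requests_processed_total with a user label.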
@@ -58,6 +88,7 @@ type DataPurger struct {
     deleteStore  *DeleteStore
     chunkStore   chunk.Store
     objectClient chunk.ObjectClient
+    metrics      *purgerMetrics
 
     executePlansChan chan deleteRequestWithLogger
     workerJobChan    chan workerJob
@@ -74,14 +105,15 @@ type DataPurger struct {
 }
 
 // NewDataPurger creates a new DataPurger
-func NewDataPurger(cfg Config, deleteStore *DeleteStore, chunkStore chunk.Store, storageClient chunk.ObjectClient) (*DataPurger, error) {
+func NewDataPurger(cfg Config, deleteStore *DeleteStore, chunkStore chunk.Store, storageClient chunk.ObjectClient, registerer prometheus.Registerer) (*DataPurger, error) {
     util.WarnExperimentalUse("Delete series API")
 
     dataPurger := DataPurger{
         cfg:                 cfg,
         deleteStore:         deleteStore,
         chunkStore:          chunkStore,
         objectClient:        storageClient,
+        metrics:             newPurgerMetrics(registerer),
         executePlansChan:    make(chan deleteRequestWithLogger, 50),
         workerJobChan:       make(chan workerJob, 50),
         inProcessRequestIDs: map[string]string{},
@@ -140,6 +172,7 @@ func (dp *DataPurger) workerJobCleanup(job workerJob) {
             level.Error(job.logger).Log("msg", "error updating delete request status to process", "err", err)
         }
 
+        dp.metrics.deleteRequestsProcessedTotal.WithLabelValues(job.userID).Inc()
         delete(dp.pendingPlansCount, job.deleteRequestID)
         dp.pendingPlansCountMtx.Unlock()
@@ -182,6 +215,7 @@ func (dp *DataPurger) worker() {
     for job := range dp.workerJobChan {
         err := dp.executePlan(job.userID, job.deleteRequestID, job.planNo, job.logger)
         if err != nil {
+            dp.metrics.deleteRequestsProcessingFailures.WithLabelValues(job.userID).Inc()
             level.Error(job.logger).Log("msg", "error executing delete plan",
                 "plan_no", job.planNo, "err", err)
             continue
@@ -267,7 +301,9 @@ func (dp *DataPurger) loadInprocessDeleteRequests() error {
         dp.inProcessRequestIDs[deleteRequest.UserID] = deleteRequest.RequestID
         err := dp.buildDeletePlan(req)
         if err != nil {
+            dp.metrics.deleteRequestsProcessingFailures.WithLabelValues(deleteRequest.UserID).Inc()
             level.Error(req.logger).Log("msg", "error building delete plan", "err", err)
+            continue
         }
 
         level.Info(req.logger).Log("msg", "sending delete request for execution")
@@ -329,6 +365,8 @@ func (dp *DataPurger) pullDeleteRequestsToPlanDeletes() error {
 
         err := dp.buildDeletePlan(req)
         if err != nil {
+            dp.metrics.deleteRequestsProcessingFailures.WithLabelValues(deleteRequest.UserID).Inc()
+
             // We do not want to remove this delete request from inProcessRequestIDs to make sure
             // we do not move multiple deleting requests in deletion process.
             // None of the other delete requests from the user would be considered for processing until then.
@@ -355,6 +393,8 @@ func (dp *DataPurger) buildDeletePlan(req deleteRequestWithLogger) error {
     level.Info(req.logger).Log("msg", "building delete plan", "num_plans", len(perDayTimeRange))
 
     plans := make([][]byte, len(perDayTimeRange))
+    includedChunkIDs := map[string]struct{}{}
+
     for i, planRange := range perDayTimeRange {
         chunksGroups := []ChunksGroup{}
@@ -364,13 +404,17 @@ func (dp *DataPurger) buildDeletePlan(req deleteRequestWithLogger) error {
                 return err
             }
 
-            // ToDo: remove duplicate chunks
             chunks, err := dp.chunkStore.Get(ctx, req.UserID, planRange.Start, planRange.End, matchers...)
             if err != nil {
                 return err
             }
 
-            chunksGroups = append(chunksGroups, groupChunks(chunks, req.StartTime, req.EndTime)...)
+            var cg []ChunksGroup
+            cg, includedChunkIDs = groupChunks(chunks, req.StartTime, req.EndTime, includedChunkIDs)
+
+            if len(cg) != 0 {
+                chunksGroups = append(chunksGroups, cg...)
+            }
         }
 
         plan := DeletePlan{
@@ -399,6 +443,8 @@ func (dp *DataPurger) buildDeletePlan(req deleteRequestWithLogger) error {
         return err
     }
 
+    dp.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(req.UserID).Add(float64(len(includedChunkIDs)))
+
     level.Info(req.logger).Log("msg", "built delete plans", "num_plans", len(perDayTimeRange))
 
     return nil
@@ -482,10 +528,15 @@ func numPlans(start, end model.Time) int {
 
 // groups chunks together by unique label sets i.e all the chunks with same labels would be stored in a group
 // chunk details are stored in groups for each unique label set to avoid storing them repetitively for each chunk
-func groupChunks(chunks []chunk.Chunk, deleteFrom, deleteThrough model.Time) []ChunksGroup {
+func groupChunks(chunks []chunk.Chunk, deleteFrom, deleteThrough model.Time, includedChunkIDs map[string]struct{}) ([]ChunksGroup, map[string]struct{}) {
(Review comment) Is it more like …? Also there is no need to return it back from the function – modifications are already visible to the caller. The function comment should however mention this fact (that the passed map is modified). See the sketch after the diff below.
     metricToChunks := make(map[string]ChunksGroup)
 
     for _, chk := range chunks {
+        chunkID := chk.ExternalKey()
+
+        if _, ok := includedChunkIDs[chunkID]; ok {
+            continue
+        }
         // chunk.Metric are assumed to be sorted which should give same value from String() for same series.
         // If they stop being sorted then in the worst case we would lose the benefit of grouping chunks to avoid storing labels repetitively.
         metricString := chk.Metric.String()
@@ -494,7 +545,7 @@ func groupChunks(chunks []chunk.Chunk, deleteFrom, deleteThrough model.Time) []ChunksGroup {
             group = ChunksGroup{Labels: client.FromLabelsToLabelAdapters(chk.Metric)}
         }
 
-        chunkDetails := ChunkDetails{ID: chk.ExternalKey()}
+        chunkDetails := ChunkDetails{ID: chunkID}
 
         if deleteFrom > chk.From || deleteThrough < chk.Through {
             partiallyDeletedInterval := Interval{StartTimestampMs: int64(chk.From), EndTimestampMs: int64(chk.Through)}
@@ -510,6 +561,7 @@ func groupChunks(chunks []chunk.Chunk, deleteFrom, deleteThrough model.Time) []ChunksGroup {
         }
 
         group.Chunks = append(group.Chunks, chunkDetails)
+        includedChunkIDs[chunkID] = struct{}{}
         metricToChunks[metricString] = group
     }
 
@@ -519,7 +571,7 @@ func groupChunks(chunks []chunk.Chunk, deleteFrom, deleteThrough model.Time) []ChunksGroup {
         chunksGroups = append(chunksGroups, group)
     }
 
-    return chunksGroups
+    return chunksGroups, includedChunkIDs
 }
 
 func isMissingChunkErr(err error) bool {
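To make the reviewer's point concrete, here is a minimal stand-alone sketch (the function and variable names are hypothetical, not the Cortex code) of deduplicating chunks across per-day plans with a set keyed by chunk ID. Because Go maps are reference types, additions made inside the function are visible to the caller, so the map does not need to be returned.

```go
package main

import "fmt"

// selectNewChunks returns only the IDs not seen before and records them in
// includedChunkIDs. The map is mutated in place, so the caller observes the
// additions without the map being returned.
func selectNewChunks(chunkIDs []string, includedChunkIDs map[string]struct{}) []string {
	var out []string
	for _, id := range chunkIDs {
		if _, ok := includedChunkIDs[id]; ok {
			continue // already part of a previous per-day plan
		}
		includedChunkIDs[id] = struct{}{}
		out = append(out, id)
	}
	return out
}

func main() {
	included := map[string]struct{}{}

	// Day 1: both chunks are new.
	fmt.Println(selectNewChunks([]string{"chunk-a", "chunk-b"}, included)) // [chunk-a chunk-b]

	// Day 2: "chunk-b" spans the day boundary and is returned by the store
	// again, but it is skipped here; only "chunk-c" is added.
	fmt.Println(selectNewChunks([]string{"chunk-b", "chunk-c"}, included)) // [chunk-c]

	fmt.Println(len(included)) // 3: modifications are visible to the caller
}
```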
(Review comment) Remove the new line please, and merge these two import groups.