Skip to content

Commit eca6922

Browse files
support for cancellation of delete requests (#2555)
* support for cancellation of delete requests Signed-off-by: Sandeep Sukhani <[email protected]> * having deadline to cancel delete request instead of synchronizing with mutex Signed-off-by: Sandeep Sukhani <[email protected]> * changes suggested from PR review Signed-off-by: Sandeep Sukhani <[email protected]>
1 parent ab2d2cd commit eca6922

File tree

4 files changed

+76
-3
lines changed

4 files changed

+76
-3
lines changed

pkg/api/api.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,18 +177,20 @@ func (a *API) RegisterIngester(i *ingester.Ingester, pushConfig distributor.Conf
177177
a.RegisterRoute("/push", push.Handler(pushConfig, i.Push), true) // For testing and debugging.
178178
}
179179

180-
// RegisterPurger registers the endpoints associated with the Purger/DeleteStore. They do not exacty
180+
// RegisterPurger registers the endpoints associated with the Purger/DeleteStore. They do not exactly
181181
// match the Prometheus API but mirror it closely enough to justify their routing under the Prometheus
182182
// component/
183183
func (a *API) RegisterPurger(store *purger.DeleteStore) {
184184
deleteRequestHandler := purger.NewDeleteRequestHandler(store, prometheus.DefaultRegisterer)
185185

186186
a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/admin/tsdb/delete_series", http.HandlerFunc(deleteRequestHandler.AddDeleteRequestHandler), true, "PUT", "POST")
187187
a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/admin/tsdb/delete_series", http.HandlerFunc(deleteRequestHandler.GetAllDeleteRequestsHandler), true, "GET")
188+
a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/admin/tsdb/cancel_delete_request", http.HandlerFunc(deleteRequestHandler.CancelDeleteRequestHandler), true, "PUT", "POST")
188189

189190
// Legacy Routes
190191
a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/api/v1/admin/tsdb/delete_series", http.HandlerFunc(deleteRequestHandler.AddDeleteRequestHandler), true, "PUT", "POST")
191192
a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/api/v1/admin/tsdb/delete_series", http.HandlerFunc(deleteRequestHandler.GetAllDeleteRequestsHandler), true, "GET")
193+
a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/api/v1/admin/tsdb/cancel_delete_request", http.HandlerFunc(deleteRequestHandler.CancelDeleteRequestHandler), true, "PUT", "POST")
192194
}
193195

194196
// RegisterRuler registers routes associated with the Ruler service. If the

pkg/chunk/purger/delete_requests_store.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,25 @@ func (ds *DeleteStore) queryCacheGenerationNumber(ctx context.Context, userID st
293293
return genNumber, nil
294294
}
295295

296+
// RemoveDeleteRequest removes a delete request and increments cache gen number
297+
func (ds *DeleteStore) RemoveDeleteRequest(ctx context.Context, userID, requestID string, createdAt, startTime, endTime model.Time) error {
298+
userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID)
299+
300+
writeBatch := ds.indexClient.NewWriteBatch()
301+
writeBatch.Delete(ds.cfg.RequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID))
302+
303+
// Add another entry with additional details like creation time, time range of delete request and selectors in value
304+
rangeValue := fmt.Sprintf("%x:%x:%x", int64(createdAt), int64(startTime), int64(endTime))
305+
writeBatch.Delete(ds.cfg.RequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID),
306+
[]byte(rangeValue))
307+
308+
// we need to invalidate results cache since removal of delete request would cause query results to change
309+
writeBatch.Add(ds.cfg.RequestsTableName, fmt.Sprintf("%s:%s:%s", cacheGenNum, userID, CacheKindResults),
310+
nil, []byte(strconv.FormatInt(time.Now().Unix(), 10)))
311+
312+
return ds.indexClient.BatchWrite(ctx, writeBatch)
313+
}
314+
296315
func parseDeleteRequestTimestamps(rangeValue []byte, deleteRequest DeleteRequest) (DeleteRequest, error) {
297316
hexParts := strings.Split(string(rangeValue), ":")
298317
if len(hexParts) != 3 {

pkg/chunk/purger/purger.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@ import (
2424
"github.com/cortexproject/cortex/pkg/util/services"
2525
)
2626

27-
const millisecondPerDay = int64(24 * time.Hour / time.Millisecond)
27+
const (
28+
millisecondPerDay = int64(24 * time.Hour / time.Millisecond)
29+
deleteRequestCancellationDeadline = 24 * time.Hour
30+
)
2831

2932
type purgerMetrics struct {
3033
deleteRequestsProcessedTotal *prometheus.CounterVec
@@ -335,7 +338,8 @@ func (dp *DataPurger) pullDeleteRequestsToPlanDeletes() error {
335338
}
336339

337340
for _, deleteRequest := range deleteRequests {
338-
if deleteRequest.CreatedAt.Add(24 * time.Hour).After(model.Now()) {
341+
// adding an extra minute here to avoid a race between cancellation of request and picking of the request for processing
342+
if deleteRequest.CreatedAt.Add(deleteRequestCancellationDeadline).Add(time.Minute).After(model.Now()) {
339343
continue
340344
}
341345

pkg/chunk/purger/request_handler.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"fmt"
66
"net/http"
77

8+
"github.com/go-kit/kit/log/level"
9+
810
"github.com/prometheus/client_golang/prometheus"
911
"github.com/prometheus/client_golang/prometheus/promauto"
1012
"github.com/prometheus/common/model"
@@ -102,6 +104,7 @@ func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r
102104
}
103105

104106
if err := dm.deleteStore.AddDeleteRequest(ctx, userID, model.Time(startTime), model.Time(endTime), match); err != nil {
107+
level.Error(util.Logger).Log("msg", "error adding delete request to the store", "err", err)
105108
http.Error(w, err.Error(), http.StatusInternalServerError)
106109
return
107110
}
@@ -120,11 +123,56 @@ func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler(w http.ResponseWrite
120123

121124
deleteRequests, err := dm.deleteStore.GetAllDeleteRequestsForUser(ctx, userID)
122125
if err != nil {
126+
level.Error(util.Logger).Log("msg", "error getting delete requests from the store", "err", err)
123127
http.Error(w, err.Error(), http.StatusInternalServerError)
124128
return
125129
}
126130

127131
if err := json.NewEncoder(w).Encode(deleteRequests); err != nil {
132+
level.Error(util.Logger).Log("msg", "error marshalling response", "err", err)
128133
http.Error(w, fmt.Sprintf("Error marshalling response: %v", err), http.StatusInternalServerError)
129134
}
130135
}
136+
137+
// CancelDeleteRequestHandler handles delete request cancellation
138+
func (dm *DeleteRequestHandler) CancelDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
139+
ctx := r.Context()
140+
userID, err := user.ExtractOrgID(ctx)
141+
if err != nil {
142+
http.Error(w, err.Error(), http.StatusBadRequest)
143+
return
144+
}
145+
146+
params := r.URL.Query()
147+
requestID := params.Get("request_id")
148+
149+
deleteRequest, err := dm.deleteStore.GetDeleteRequest(ctx, userID, requestID)
150+
if err != nil {
151+
level.Error(util.Logger).Log("msg", "error getting delete request from the store", "err", err)
152+
http.Error(w, err.Error(), http.StatusInternalServerError)
153+
return
154+
}
155+
156+
if deleteRequest == nil {
157+
http.Error(w, "could not find delete request with given id", http.StatusBadRequest)
158+
return
159+
}
160+
161+
if deleteRequest.Status != StatusReceived {
162+
http.Error(w, "deletion of request which is in process or already processed is not allowed", http.StatusBadRequest)
163+
return
164+
}
165+
166+
if deleteRequest.CreatedAt.Add(deleteRequestCancellationDeadline).Before(model.Now()) {
167+
http.Error(w, fmt.Sprintf("deletion of request past the deadline of %s since its creation is not allowed", deleteRequestCancellationDeadline.String()), http.StatusBadRequest)
168+
return
169+
}
170+
171+
if err := dm.deleteStore.RemoveDeleteRequest(ctx, userID, requestID, deleteRequest.CreatedAt, deleteRequest.StartTime, deleteRequest.EndTime); err != nil {
172+
level.Error(util.Logger).Log("msg", "error cancelling the delete request", "err", err)
173+
http.Error(w, err.Error(), http.StatusInternalServerError)
174+
return
175+
}
176+
177+
w.WriteHeader(http.StatusOK)
178+
}

0 commit comments

Comments
 (0)