Skip to content

Commit

Permalink
chunk: Delete leaked objects if it's already deleted by other gorouti…
Browse files Browse the repository at this point in the history
…nes (#4748)

Signed-off-by: Changxin Miao <[email protected]>
  • Loading branch information
polyrabbit authored Apr 23, 2024
1 parent 421faea commit 54e0300
Showing 1 changed file with 43 additions and 25 deletions.
68 changes: 43 additions & 25 deletions pkg/chunk/cached_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,23 +204,7 @@ func (s *rSlice) ReadAt(ctx context.Context, page *Page, off int) (n int, err er

func (s *rSlice) delete(indx int) error {
key := s.key(indx)
st := time.Now()
var reqID string
err := utils.WithTimeout(func() error {
return s.store.storage.Delete(key, object.WithRequestID(&reqID))
}, s.store.conf.PutTimeout)
used := time.Since(st)
if err != nil && (strings.Contains(err.Error(), "NoSuchKey") ||
strings.Contains(err.Error(), "not found") ||
strings.Contains(err.Error(), "No such file")) {
err = nil
}
logRequest("DELETE", key, "", reqID, err, used)
s.store.objectReqsHistogram.WithLabelValues("DELETE", "").Observe(used.Seconds())
if err != nil {
s.store.objectReqErrors.Add(1)
}
return err
return s.store.delete(key)
}

func (s *rSlice) Remove() error {
Expand Down Expand Up @@ -364,6 +348,26 @@ func (store *cachedStore) put(key string, p *Page) error {
}, store.conf.PutTimeout)
}

func (store *cachedStore) delete(key string) error {
st := time.Now()
var reqID string
err := utils.WithTimeout(func() error {
return store.storage.Delete(key, object.WithRequestID(&reqID))
}, store.conf.PutTimeout)
used := time.Since(st)
if err != nil && (strings.Contains(err.Error(), "NoSuchKey") ||
strings.Contains(err.Error(), "not found") ||
strings.Contains(err.Error(), "No such file")) {
err = nil
}
logRequest("DELETE", key, "", reqID, err, used)
store.objectReqsHistogram.WithLabelValues("DELETE", "").Observe(used.Seconds())
if err != nil {
store.objectReqErrors.Add(1)
}
return err
}

func (store *cachedStore) upload(key string, block *Page, s *wSlice) error {
sync := s != nil
blen := len(block.Data)
Expand Down Expand Up @@ -947,10 +951,7 @@ func (store *cachedStore) uploadStagingFile(key string, stagingPath string) {
blen := parseObjOrigSize(key)
f, err := openCacheFile(stagingPath, blen, store.conf.CacheChecksum)
if err != nil {
store.pendingMutex.Lock()
_, ok = store.pendingKeys[key]
store.pendingMutex.Unlock()
if ok {
if store.isPendingValid(key) {
logger.Errorf("Open staging file %s: %s", stagingPath, err)
} else {
logger.Debugf("Key %s is not needed, drop it", key)
Expand All @@ -965,13 +966,23 @@ func (store *cachedStore) uploadStagingFile(key string, stagingPath string) {
logger.Errorf("Read staging file %s: %s", stagingPath, err)
return
}
if !store.isPendingValid(key) {
block.Release()
logger.Debugf("Key %s is not needed, drop it", key)
return
}

store.stageBlockDelay.Add(time.Since(item.ts).Seconds())
if err = store.upload(key, block, nil); err == nil {
store.bcache.uploaded(key, blen)
store.removePending(key)
if err := store.bcache.removeStage(key); err != nil {
logger.Warnf("failed to remove stage %s, in upload staging file", stagingPath)
if !store.isPendingValid(key) { // Delete leaked objects if it's already deleted by other goroutines
err := store.delete(key)
logger.Infof("Key %s is not needed, abandoned, err: %v", key, err)
} else {
store.bcache.uploaded(key, blen)
store.removePending(key)
if err := store.bcache.removeStage(key); err != nil {
logger.Warnf("failed to remove stage %s, in upload staging file", stagingPath)
}
}
} else {
item.uploading = false
Expand Down Expand Up @@ -1007,6 +1018,13 @@ func (store *cachedStore) removePending(key string) {
store.pendingMutex.Unlock()
}

func (store *cachedStore) isPendingValid(key string) bool {
store.pendingMutex.Lock()
defer store.pendingMutex.Unlock()
_, ok := store.pendingKeys[key]
return ok
}

func (store *cachedStore) scanDelayedStaging() {
if !store.canUpload() {
return
Expand Down

0 comments on commit 54e0300

Please sign in to comment.