Skip to content

Commit

Permalink
warmup: evict sub command may remove stage incorrectly (#5095)
Browse files Browse the repository at this point in the history
Signed-off-by: jiefenghuang <[email protected]>
Co-authored-by: Davies Liu <[email protected]>
  • Loading branch information
jiefenghuang and davies authored Aug 23, 2024
1 parent d95a627 commit 293f224
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 12 deletions.
4 changes: 2 additions & 2 deletions pkg/chunk/cached_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (s *rSlice) Remove() error {
// any of them should succeed if any blocks is removed
key := s.key(i)
s.store.removePending(key)
s.store.bcache.remove(key)
s.store.bcache.remove(key, true)
}

var err error
Expand Down Expand Up @@ -1115,7 +1115,7 @@ func (store *cachedStore) EvictCache(id uint64, length uint32) error {
r := sliceForRead(id, int(length), store)
keys := r.keys()
for _, k := range keys {
store.bcache.remove(k)
store.bcache.remove(k, false)
}
return nil
}
Expand Down
23 changes: 15 additions & 8 deletions pkg/chunk/disk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ func (cache *cacheStore) getPathFromKey(k cacheKey) string {
}
}

func (cache *cacheStore) remove(key string) {
func (cache *cacheStore) remove(key string, staging bool) {
cache.state.beforeCacheOp()
defer cache.state.afterCacheOp()
if cache.state.checkCacheOp() != nil {
Expand All @@ -568,18 +568,25 @@ func (cache *cacheStore) remove(key string) {
if it, ok := cache.keys[k]; ok {
if it.size > 0 {
cache.used -= int64(it.size + 4096)
delete(cache.keys, k)
} else if !staging {
path = "" // for staging block
} else {
delete(cache.keys, k)
}
delete(cache.keys, k)
} else if cache.scanned {
path = "" // not existed
}
cache.Unlock()

if path != "" {
if err := cache.removeFile(path); err != nil && !os.IsNotExist(err) {
logger.Warnf("remove %s failed: %s", path, err)
}
if err := cache.removeStage(key); err != nil && !os.IsNotExist(err) {
logger.Warnf("remove %s failed: %s", cache.stagePath(key), err)
if staging {
if err := cache.removeStage(key); err != nil && !os.IsNotExist(err) {
logger.Warnf("remove stage %s failed: %s", cache.stagePath(key), err)
}
}
}
}
Expand Down Expand Up @@ -647,7 +654,7 @@ func (cache *cacheStore) flush() {
cache.Unlock()
w.page.Release()
if !ok {
cache.remove(w.key)
cache.remove(w.key, false)
}
}
}
Expand Down Expand Up @@ -1004,7 +1011,7 @@ func expandDir(pattern string) []string {

type CacheManager interface {
cache(key string, p *Page, force, dropCache bool)
remove(key string)
remove(key string, staging bool)
load(key string) (ReadCloser, error)
uploaded(key string, size int)
stage(key string, data []byte, keepCache bool) (string, error)
Expand Down Expand Up @@ -1185,10 +1192,10 @@ func (m *cacheManager) load(key string) (ReadCloser, error) {
return r, err
}

func (m *cacheManager) remove(key string) {
func (m *cacheManager) remove(key string, staging bool) {
store := m.getStore(key)
if store != nil {
store.remove(key)
store.remove(key, staging)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/chunk/disk_cache_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (dc *unstableDC) doProbe(key string, page *Page) {
}
defer reader.Close()
_, _ = reader.ReadAt(probeBuff, 0)
dc.cache.remove(key)
dc.cache.remove(key, false)
}

func (dc *unstableDC) beforeCacheOp() { dc.concurrency.Add(1) }
Expand Down
2 changes: 1 addition & 1 deletion pkg/chunk/mem_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (c *memcache) delete(key string, p *Page) {
delete(c.pages, key)
}

func (c *memcache) remove(key string) {
func (c *memcache) remove(key string, staging bool) {
c.Lock()
defer c.Unlock()
if item, ok := c.pages[key]; ok {
Expand Down

0 comments on commit 293f224

Please sign in to comment.