From 44f2bce7452d62d0a442fec573e8e5becf842abb Mon Sep 17 00:00:00 2001 From: jiefenghuang Date: Mon, 19 Aug 2024 11:18:24 +0800 Subject: [PATCH 1/3] fix: warmup may remove stage incorrectly Signed-off-by: jiefenghuang --- pkg/chunk/cached_store.go | 2 +- pkg/chunk/disk_cache.go | 44 ++++++++++++++++++++++++++++++++------- pkg/chunk/mem_cache.go | 4 ++++ 3 files changed, 41 insertions(+), 9 deletions(-) diff --git a/pkg/chunk/cached_store.go b/pkg/chunk/cached_store.go index b0cb41a0fc2f..27dc6becf292 100644 --- a/pkg/chunk/cached_store.go +++ b/pkg/chunk/cached_store.go @@ -1115,7 +1115,7 @@ func (store *cachedStore) EvictCache(id uint64, length uint32) error { r := sliceForRead(id, int(length), store) keys := r.keys() for _, k := range keys { - store.bcache.remove(k) + store.bcache.removeReadCache(k) } return nil } diff --git a/pkg/chunk/disk_cache.go b/pkg/chunk/disk_cache.go index 44d21690fd1d..7588de4a7285 100644 --- a/pkg/chunk/disk_cache.go +++ b/pkg/chunk/disk_cache.go @@ -561,6 +561,33 @@ func (cache *cacheStore) remove(key string) { return } + path := cache.doRemove(key) + if path != "" { + if err := cache.removeFile(path); err != nil && !os.IsNotExist(err) { + logger.Warnf("remove %s failed: %s", path, err) + } + if err := cache.removeStage(key); err != nil && !os.IsNotExist(err) { + logger.Warnf("remove stage %s failed: %s", cache.stagePath(key), err) + } + } +} + +func (cache *cacheStore) removeReadCache(key string) { + cache.state.beforeCacheOp() + defer cache.state.afterCacheOp() + if cache.state.checkCacheOp() != nil { + return + } + + path := cache.doRemove(key) + if path != "" { + if err := cache.removeFile(path); err != nil && !os.IsNotExist(err) { + logger.Warnf("remove %s failed: %s", path, err) + } + } +} + +func (cache *cacheStore) doRemove(key string) string { cache.Lock() delete(cache.pages, key) path := cache.cachePath(key) @@ -574,14 +601,7 @@ func (cache *cacheStore) remove(key string) { path = "" // not existed } cache.Unlock() - if path != "" { - if err := cache.removeFile(path); err != nil && !os.IsNotExist(err) { - logger.Warnf("remove %s failed: %s", path, err) - } - if err := cache.removeStage(key); err != nil && !os.IsNotExist(err) { - logger.Warnf("remove %s failed: %s", cache.stagePath(key), err) - } - } + return path } func (cache *cacheStore) load(key string) (ReadCloser, error) { @@ -1005,6 +1025,7 @@ func expandDir(pattern string) []string { type CacheManager interface { cache(key string, p *Page, force, dropCache bool) remove(key string) + removeReadCache(key string) // remove read cache only load(key string) (ReadCloser, error) uploaded(key string, size int) stage(key string, data []byte, keepCache bool) (string, error) @@ -1192,6 +1213,13 @@ func (m *cacheManager) remove(key string) { } } +func (m *cacheManager) removeReadCache(key string) { + store := m.getStore(key) + if store != nil { + store.removeReadCache(key) + } +} + func (m *cacheManager) stage(key string, data []byte, keepCache bool) (string, error) { store := m.getStore(key) if store != nil { diff --git a/pkg/chunk/mem_cache.go b/pkg/chunk/mem_cache.go index e7fe195c17fd..be7c04676da1 100644 --- a/pkg/chunk/mem_cache.go +++ b/pkg/chunk/mem_cache.go @@ -111,6 +111,10 @@ func (c *memcache) remove(key string) { } } +func (c *memcache) removeReadCache(key string) { + c.remove(key) +} + func (c *memcache) load(key string) (ReadCloser, error) { c.Lock() defer c.Unlock() From 3a6ba15211407e2d0756ce85520360f9ab8c5fda Mon Sep 17 00:00:00 2001 From: jiefenghuang Date: Fri, 23 Aug 2024 10:37:04 +0800 Subject: [PATCH 2/3] fix: skip stage in evit Signed-off-by: jiefenghuang --- pkg/chunk/disk_cache.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/chunk/disk_cache.go b/pkg/chunk/disk_cache.go index 7588de4a7285..e310b43bd369 100644 --- a/pkg/chunk/disk_cache.go +++ b/pkg/chunk/disk_cache.go @@ -579,6 +579,11 @@ func (cache *cacheStore) removeReadCache(key string) { return } + // skip stage + if utils.Exists(cache.stagePath(key)) { + return + } + path := cache.doRemove(key) if path != "" { if err := cache.removeFile(path); err != nil && !os.IsNotExist(err) { From b15421ef19285d2eefa60bac5119f4692658e3bc Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 23 Aug 2024 17:25:51 +0800 Subject: [PATCH 3/3] review --- pkg/chunk/cached_store.go | 4 +-- pkg/chunk/disk_cache.go | 68 +++++++++++------------------------ pkg/chunk/disk_cache_state.go | 2 +- pkg/chunk/mem_cache.go | 6 +--- 4 files changed, 25 insertions(+), 55 deletions(-) diff --git a/pkg/chunk/cached_store.go b/pkg/chunk/cached_store.go index 27dc6becf292..dbf73848faff 100644 --- a/pkg/chunk/cached_store.go +++ b/pkg/chunk/cached_store.go @@ -222,7 +222,7 @@ func (s *rSlice) Remove() error { // any of them should succeed if any blocks is removed key := s.key(i) s.store.removePending(key) - s.store.bcache.remove(key) + s.store.bcache.remove(key, true) } var err error @@ -1115,7 +1115,7 @@ func (store *cachedStore) EvictCache(id uint64, length uint32) error { r := sliceForRead(id, int(length), store) keys := r.keys() for _, k := range keys { - store.bcache.removeReadCache(k) + store.bcache.remove(k, false) } return nil } diff --git a/pkg/chunk/disk_cache.go b/pkg/chunk/disk_cache.go index e310b43bd369..223a7d1b4671 100644 --- a/pkg/chunk/disk_cache.go +++ b/pkg/chunk/disk_cache.go @@ -554,45 +554,13 @@ func (cache *cacheStore) getPathFromKey(k cacheKey) string { } } -func (cache *cacheStore) remove(key string) { +func (cache *cacheStore) remove(key string, staging bool) { cache.state.beforeCacheOp() defer cache.state.afterCacheOp() if cache.state.checkCacheOp() != nil { return } - path := cache.doRemove(key) - if path != "" { - if err := cache.removeFile(path); err != nil && !os.IsNotExist(err) { - logger.Warnf("remove %s failed: %s", path, err) - } - if err := cache.removeStage(key); err != nil && !os.IsNotExist(err) { - logger.Warnf("remove stage %s failed: %s", cache.stagePath(key), err) - } - } -} - -func (cache *cacheStore) removeReadCache(key string) { - cache.state.beforeCacheOp() - defer cache.state.afterCacheOp() - if cache.state.checkCacheOp() != nil { - return - } - - // skip stage - if utils.Exists(cache.stagePath(key)) { - return - } - - path := cache.doRemove(key) - if path != "" { - if err := cache.removeFile(path); err != nil && !os.IsNotExist(err) { - logger.Warnf("remove %s failed: %s", path, err) - } - } -} - -func (cache *cacheStore) doRemove(key string) string { cache.Lock() delete(cache.pages, key) path := cache.cachePath(key) @@ -600,13 +568,27 @@ func (cache *cacheStore) doRemove(key string) string { if it, ok := cache.keys[k]; ok { if it.size > 0 { cache.used -= int64(it.size + 4096) + delete(cache.keys, k) + } else if !staging { + path = "" // for staging block + } else { + delete(cache.keys, k) } - delete(cache.keys, k) } else if cache.scanned { path = "" // not existed } cache.Unlock() - return path + + if path != "" { + if err := cache.removeFile(path); err != nil && !os.IsNotExist(err) { + logger.Warnf("remove %s failed: %s", path, err) + } + if staging { + if err := cache.removeStage(key); err != nil && !os.IsNotExist(err) { + logger.Warnf("remove stage %s failed: %s", cache.stagePath(key), err) + } + } + } } func (cache *cacheStore) load(key string) (ReadCloser, error) { @@ -672,7 +654,7 @@ func (cache *cacheStore) flush() { cache.Unlock() w.page.Release() if !ok { - cache.remove(w.key) + cache.remove(w.key, false) } } } @@ -1029,8 +1011,7 @@ func expandDir(pattern string) []string { type CacheManager interface { cache(key string, p *Page, force, dropCache bool) - remove(key string) - removeReadCache(key string) // remove read cache only + remove(key string, staging bool) load(key string) (ReadCloser, error) uploaded(key string, size int) stage(key string, data []byte, keepCache bool) (string, error) @@ -1211,17 +1192,10 @@ func (m *cacheManager) load(key string) (ReadCloser, error) { return r, err } -func (m *cacheManager) remove(key string) { - store := m.getStore(key) - if store != nil { - store.remove(key) - } -} - -func (m *cacheManager) removeReadCache(key string) { +func (m *cacheManager) remove(key string, staging bool) { store := m.getStore(key) if store != nil { - store.removeReadCache(key) + store.remove(key, staging) } } diff --git a/pkg/chunk/disk_cache_state.go b/pkg/chunk/disk_cache_state.go index d466d96a2c6b..20f614d835af 100644 --- a/pkg/chunk/disk_cache_state.go +++ b/pkg/chunk/disk_cache_state.go @@ -240,7 +240,7 @@ func (dc *unstableDC) doProbe(key string, page *Page) { } defer reader.Close() _, _ = reader.ReadAt(probeBuff, 0) - dc.cache.remove(key) + dc.cache.remove(key, false) } func (dc *unstableDC) beforeCacheOp() { dc.concurrency.Add(1) } diff --git a/pkg/chunk/mem_cache.go b/pkg/chunk/mem_cache.go index be7c04676da1..0c66480e9ae8 100644 --- a/pkg/chunk/mem_cache.go +++ b/pkg/chunk/mem_cache.go @@ -102,7 +102,7 @@ func (c *memcache) delete(key string, p *Page) { delete(c.pages, key) } -func (c *memcache) remove(key string) { +func (c *memcache) remove(key string, staging bool) { c.Lock() defer c.Unlock() if item, ok := c.pages[key]; ok { @@ -111,10 +111,6 @@ func (c *memcache) remove(key string) { } } -func (c *memcache) removeReadCache(key string) { - c.remove(key) -} - func (c *memcache) load(key string) (ReadCloser, error) { c.Lock() defer c.Unlock()