Skip to content

Commit 548e90e

Browse files
authored
Support an analogous Java Guava cache implementation (milvus-io#20831)
Signed-off-by: yun.zhang <[email protected]> Signed-off-by: yun.zhang <[email protected]>
1 parent 243d8cf commit 548e90e

15 files changed

+2263
-821
lines changed

internal/storage/vector_chunk_manager.go

+65-63
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,14 @@ var (
4040
type VectorChunkManager struct {
4141
cacheStorage ChunkManager
4242
vectorStorage ChunkManager
43-
cache *cache.LRU[string, *mmap.ReaderAt]
43+
cache cache.LoadingCache[string, *mmap.ReaderAt]
4444

4545
insertCodec *InsertCodec
4646

4747
cacheEnable bool
4848
cacheLimit int64
4949
cacheSize int64
5050
cacheSizeMutex sync.Mutex
51-
fixSize bool // Prevent cache capacity from changing too frequently
5251
}
5352

5453
var _ ChunkManager = (*VectorChunkManager)(nil)
@@ -64,31 +63,50 @@ func NewVectorChunkManager(ctx context.Context, cacheStorage ChunkManager, vecto
6463
cacheEnable: cacheEnable,
6564
cacheLimit: cacheLimit,
6665
}
67-
if cacheEnable {
68-
if cacheLimit <= 0 {
69-
return nil, errors.New("cache limit must be positive if cacheEnable")
66+
67+
err := vcm.initCache(ctx)
68+
if err != nil {
69+
return nil, err
70+
}
71+
72+
return vcm, nil
73+
}
74+
75+
func (vcm *VectorChunkManager) initCache(ctx context.Context) error {
76+
if !vcm.cacheEnable {
77+
return nil
78+
}
79+
80+
if vcm.cacheLimit <= 0 {
81+
return errors.New("cache limit must be positive if cacheEnable")
82+
}
83+
84+
loader := func(filePath string) (*mmap.ReaderAt, error) {
85+
return vcm.readFile(ctx, filePath)
86+
}
87+
88+
onRemoveFn := func(filePath string, v *mmap.ReaderAt) {
89+
size := v.Len()
90+
err := v.Close()
91+
if err != nil {
92+
log.Error("close mmap file failed", zap.Any("file", filePath))
7093
}
71-
c, err := cache.NewLRU(defaultLocalCacheSize, func(k string, v *mmap.ReaderAt) {
72-
size := v.Len()
73-
err := v.Close()
74-
if err != nil {
75-
log.Error("Unmmap file failed", zap.Any("file", k))
76-
}
77-
err = cacheStorage.Remove(ctx, k)
78-
if err != nil {
79-
log.Error("cache storage remove file failed", zap.Any("file", k))
80-
}
81-
vcm.cacheSizeMutex.Lock()
82-
vcm.cacheSize -= int64(size)
83-
vcm.cacheSizeMutex.Unlock()
84-
})
94+
err = vcm.cacheStorage.Remove(ctx, filePath)
8595
if err != nil {
86-
return nil, err
96+
log.Error("cache storage remove file failed", zap.Any("file", filePath))
8797
}
88-
vcm.cache = c
98+
99+
vcm.cacheSizeMutex.Lock()
100+
vcm.cacheSize -= int64(size)
101+
vcm.cacheSizeMutex.Unlock()
89102
}
90103

91-
return vcm, nil
104+
vcm.cache = cache.NewLoadingCache(loader,
105+
cache.WithRemovalListener[string, *mmap.ReaderAt](onRemoveFn),
106+
cache.WithMaximumSize[string, *mmap.ReaderAt](vcm.cacheLimit),
107+
)
108+
109+
return nil
92110
}
93111

94112
// For vector data, we will download vector file from storage. And we will
@@ -146,7 +164,7 @@ func (vcm *VectorChunkManager) Exist(ctx context.Context, filePath string) (bool
146164
return vcm.vectorStorage.Exist(ctx, filePath)
147165
}
148166

149-
func (vcm *VectorChunkManager) readWithCache(ctx context.Context, filePath string) ([]byte, error) {
167+
func (vcm *VectorChunkManager) readFile(ctx context.Context, filePath string) (*mmap.ReaderAt, error) {
150168
contents, err := vcm.vectorStorage.Read(ctx, filePath)
151169
if err != nil {
152170
return nil, err
@@ -159,45 +177,31 @@ func (vcm *VectorChunkManager) readWithCache(ctx context.Context, filePath strin
159177
if err != nil {
160178
return nil, err
161179
}
180+
162181
r, err := vcm.cacheStorage.Mmap(ctx, filePath)
163182
if err != nil {
164183
return nil, err
165184
}
166-
size, err := vcm.cacheStorage.Size(ctx, filePath)
167-
if err != nil {
168-
return nil, err
169-
}
170185
vcm.cacheSizeMutex.Lock()
171-
vcm.cacheSize += size
186+
vcm.cacheSize += int64(r.Len())
172187
vcm.cacheSizeMutex.Unlock()
173-
if !vcm.fixSize {
174-
if vcm.cacheSize < vcm.cacheLimit {
175-
if vcm.cache.Len() == vcm.cache.Capacity() {
176-
newSize := float32(vcm.cache.Capacity()) * 1.25
177-
vcm.cache.Resize(int(newSize))
178-
}
179-
} else {
180-
// +1 is for add current value
181-
vcm.cache.Resize(vcm.cache.Len() + 1)
182-
vcm.fixSize = true
183-
}
184-
}
185-
vcm.cache.Add(filePath, r)
186-
return results, nil
188+
return r, nil
187189
}
188190

189191
// Read reads the pure vector data. If cached, it reads from local.
190192
func (vcm *VectorChunkManager) Read(ctx context.Context, filePath string) ([]byte, error) {
191193
if vcm.cacheEnable {
192-
if r, ok := vcm.cache.Get(filePath); ok {
193-
p := make([]byte, r.Len())
194-
_, err := r.ReadAt(p, 0)
195-
if err != nil {
196-
return p, err
197-
}
198-
return p, nil
194+
r, err := vcm.cache.Get(filePath)
195+
if err != nil {
196+
return nil, err
199197
}
200-
return vcm.readWithCache(ctx, filePath)
198+
199+
p := make([]byte, r.Len())
200+
_, err = r.ReadAt(p, 0)
201+
if err != nil {
202+
return nil, err
203+
}
204+
return p, nil
201205
}
202206
contents, err := vcm.vectorStorage.Read(ctx, filePath)
203207
if err != nil {
@@ -238,7 +242,7 @@ func (vcm *VectorChunkManager) ListWithPrefix(ctx context.Context, prefix string
238242

239243
func (vcm *VectorChunkManager) Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error) {
240244
if vcm.cacheEnable && vcm.cache != nil {
241-
if r, ok := vcm.cache.Get(filePath); ok {
245+
if r, err := vcm.cache.Get(filePath); err == nil {
242246
return r, nil
243247
}
244248
}
@@ -252,19 +256,17 @@ func (vcm *VectorChunkManager) Reader(ctx context.Context, filePath string) (Fil
252256
// ReadAt reads specific position data of vector. If cached, it reads from local.
253257
func (vcm *VectorChunkManager) ReadAt(ctx context.Context, filePath string, off int64, length int64) ([]byte, error) {
254258
if vcm.cacheEnable {
255-
if r, ok := vcm.cache.Get(filePath); ok {
256-
p := make([]byte, length)
257-
_, err := r.ReadAt(p, off)
258-
if err != nil {
259-
return nil, err
260-
}
261-
return p, nil
259+
r, err := vcm.cache.Get(filePath)
260+
if err != nil {
261+
return nil, err
262262
}
263-
results, err := vcm.readWithCache(ctx, filePath)
263+
264+
p := make([]byte, length)
265+
_, err = r.ReadAt(p, off)
264266
if err != nil {
265267
return nil, err
266268
}
267-
return results[off : off+length], nil
269+
return p, nil
268270
}
269271
contents, err := vcm.vectorStorage.Read(ctx, filePath)
270272
if err != nil {
@@ -292,7 +294,7 @@ func (vcm *VectorChunkManager) Remove(ctx context.Context, filePath string) erro
292294
return err
293295
}
294296
if vcm.cacheEnable {
295-
vcm.cache.Remove(filePath)
297+
vcm.cache.Invalidate(filePath)
296298
}
297299
return nil
298300
}
@@ -304,7 +306,7 @@ func (vcm *VectorChunkManager) MultiRemove(ctx context.Context, filePaths []stri
304306
}
305307
if vcm.cacheEnable {
306308
for _, p := range filePaths {
307-
vcm.cache.Remove(p)
309+
vcm.cache.Invalidate(p)
308310
}
309311
}
310312
return nil
@@ -321,7 +323,7 @@ func (vcm *VectorChunkManager) RemoveWithPrefix(ctx context.Context, prefix stri
321323
return err
322324
}
323325
for _, p := range filePaths {
324-
vcm.cache.Remove(p)
326+
vcm.cache.Invalidate(p)
325327
}
326328
}
327329
return nil
+72
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Licensed to the LF AI & Data foundation under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package cache
18+
19+
// Cache implementation based on https://github.com/goburrow/cache, which
20+
// provides a partial implementation of Guava Cache, mainly supporting LRU.
21+
22+
// Cache is a key-value cache in which entries are added and stay in the
23+
// cache until they are either evicted or manually invalidated.
24+
// TODO: support async cleanup of expired data
25+
type Cache[K comparable, V any] interface {
26+
// GetIfPresent returns value associated with Key or (nil, false)
27+
// if there is no cached value for Key.
28+
GetIfPresent(K) (V, bool)
29+
30+
// Put associates value with Key. If a value is already associated
31+
// with Key, the old one will be replaced with Value.
32+
Put(K, V)
33+
34+
// Invalidate discards cached value of the given Key.
35+
Invalidate(K)
36+
37+
// InvalidateAll discards all entries.
38+
InvalidateAll()
39+
40+
// Scan walks the cache and applies a filter func to each element.
41+
Scan(func(K, V) bool) map[K]V
42+
43+
// Stats returns cache statistics.
44+
Stats() *Stats
45+
46+
// Close implements io.Closer for cleaning up all resources.
47+
// Users must ensure the cache is not being used before closing or
48+
// after closed.
49+
Close() error
50+
}
51+
52+
// Func is a generic callback for entry events in the cache.
53+
type Func[K comparable, V any] func(K, V)
54+
55+
// LoadingCache is a cache whose values are loaded automatically and stored
56+
// in the cache until either evicted or manually invalidated.
57+
type LoadingCache[K comparable, V any] interface {
58+
Cache[K, V]
59+
60+
// Get returns the value associated with Key, or calls the underlying
61+
// LoaderFunc to load the value if it is not present.
62+
Get(K) (V, error)
63+
64+
// Refresh loads a new value for Key. If the Key already exists, it is
65+
// refreshed synchronously; otherwise this function blocks until the value is loaded.
66+
Refresh(K) error
67+
}
68+
69+
// LoaderFunc retrieves the value corresponding to given Key.
70+
type LoaderFunc[K comparable, V any] func(K) (V, error)
71+
72+
type GetPreLoadDataFunc[K comparable, V any] func() (map[K]V, error)

0 commit comments

Comments
 (0)