Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix some memory leaks in TTL implementation #358

Merged
merged 3 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (sm *shardedMap[V]) Clear(onEvict func(item *Item[V])) {
for i := uint64(0); i < numShards; i++ {
sm.shards[i].Clear(onEvict)
}
sm.expiryMap.clear()
}

type lockedMap[V any] struct {
Expand Down
72 changes: 50 additions & 22 deletions ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ type bucket map[uint64]uint64
// expirationMap is a map of bucket number to the corresponding bucket.
type expirationMap[V any] struct {
sync.RWMutex
buckets map[int64]bucket
buckets map[int64]bucket
lastCleanedBucketNum int64
}

func newExpirationMap[V any]() *expirationMap[V] {
return &expirationMap[V]{
buckets: make(map[int64]bucket),
buckets: make(map[int64]bucket),
lastCleanedBucketNum: cleanupBucket(time.Now()),
}
}

Expand Down Expand Up @@ -87,6 +89,11 @@ func (m *expirationMap[_]) update(key, conflict uint64, oldExpTime, newExpTime t
delete(oldBucket, key)
}

// Items that don't expire don't need to be in the expiration map.
if newExpTime.IsZero() {
return
}

newBucketNum := storageBucket(newExpTime)
newBucket, ok := m.buckets[newBucketNum]
if !ok {
Expand Down Expand Up @@ -121,29 +128,50 @@ func (m *expirationMap[V]) cleanup(store store[V], policy policy[V], onEvict fun

m.Lock()
now := time.Now()
bucketNum := cleanupBucket(now)
keys := m.buckets[bucketNum]
delete(m.buckets, bucketNum)
currentBucketNum := cleanupBucket(now)
// Clean up all buckets up to and including currentBucketNum, starting from
// (but not including) the last one that was cleaned up
var buckets []bucket
for bucketNum := m.lastCleanedBucketNum + 1; bucketNum <= currentBucketNum; bucketNum++ {
buckets = append(buckets, m.buckets[bucketNum])
delete(m.buckets, bucketNum)
}
m.lastCleanedBucketNum = currentBucketNum
m.Unlock()

for key, conflict := range keys {
expr := store.Expiration(key)
// Sanity check. Verify that the store agrees that this key is expired.
if expr.After(now) {
continue
for _, keys := range buckets {
for key, conflict := range keys {
expr := store.Expiration(key)
// Sanity check. Verify that the store agrees that this key is expired.
if store.Expiration(key).After(now) {
mangalaman93 marked this conversation as resolved.
Show resolved Hide resolved
continue
}

cost := policy.Cost(key)
policy.Del(key)
_, value := store.Del(key, conflict)

if onEvict != nil {
onEvict(&Item[V]{Key: key,
Conflict: conflict,
Value: value,
Cost: cost,
Expiration: expr,
})
}
}
}
}

cost := policy.Cost(key)
policy.Del(key)
_, value := store.Del(key, conflict)

if onEvict != nil {
onEvict(&Item[V]{Key: key,
Conflict: conflict,
Value: value,
Cost: cost,
Expiration: expr,
})
}
// clear clears the expirationMap, the caller is responsible for properly
// evicting the referenced items
func (m *expirationMap[V]) clear() {
if m == nil {
return
}

m.Lock()
m.buckets = make(map[int64]bucket)
m.lastCleanedBucketNum = cleanupBucket(time.Now())
m.Unlock()
}
Loading