[BREAKING] support generic API #321

Merged: 29 commits, Sep 6, 2023

Changes from 8 commits

Commits (29)
fe3efab
add generics.
aryehlev Nov 25, 2022
4e36ce4
change module name.
aryehlev Nov 25, 2022
bd7c08e
export key type
aryehlev Nov 25, 2022
f92fd57
changeName
aryehlev Nov 25, 2022
40b7ca8
fix nils and make key type comparable.
aryehlev Nov 25, 2022
f1fb7b7
change nils
aryehlev Nov 25, 2022
8ec1534
try change 1
aryehlev Nov 25, 2022
d388a7e
replace names and fix tests to work with generics.
aryehlev Dec 11, 2022
e3ec3dc
Merge remote-tracking branch 'origin/main'
aryehlev Mar 21, 2023
25500b8
fix comments on PR and also fix KeyToHash to be able to deal with nils.
aryehlev Mar 21, 2023
fc1ddb8
remove random change and make version go 1.19 for performance.
aryehlev Mar 21, 2023
fd222a1
set the keyToHash function.
aryehlev Mar 25, 2023
23c117a
change keyToHash function to take K type
aryehlev Aug 31, 2023
7c98d35
merge cache.go
aryehlev Aug 31, 2023
cf986a0
merge policy.go
aryehlev Aug 31, 2023
4ef9df4
merge ttl.go
aryehlev Aug 31, 2023
6a17a19
Merge remote-tracking branch 'originFar/main'
aryehlev Aug 31, 2023
a8fffe8
resolve merge changes.
aryehlev Aug 31, 2023
e656080
merge cache_test.go.
aryehlev Aug 31, 2023
0d4debf
fix lint ci.
aryehlev Aug 31, 2023
2af8be1
align tests.
aryehlev Aug 31, 2023
ad223b0
Merge remote-tracking branch 'originFar/main'
aryehlev Sep 1, 2023
d59bbcf
put back regular config.KeyToHash.
aryehlev Sep 1, 2023
0a3c1dc
if key to hash isn't working, return error instead of fatal.
aryehlev Sep 1, 2023
a052959
Merge remote-tracking branch 'originFar/main'
aryehlev Sep 1, 2023
91a06d8
merge with main.
aryehlev Sep 1, 2023
d2fba36
fix comment spacing.
aryehlev Sep 4, 2023
f7faa68
put keyToHash without function getter.
aryehlev Sep 4, 2023
e5e54ea
fix typo.
aryehlev Sep 4, 2023
cache.go: 109 changes (59 additions, 50 deletions)
@@ -36,40 +36,47 @@ var (
setBufSize = 32 * 1024
)

type itemCallback func(*Item)
const itemSize = int64(unsafe.Sizeof(storeItem[any]{}))

const itemSize = int64(unsafe.Sizeof(storeItem{}))
//type HashableKey interface {
// KeyToHash() (uint64, uint64)
//}

func Zero[T any]() T {
var zero T
return zero
}

// Cache is a thread-safe implementation of a hashmap with a TinyLFU admission
// policy and a Sampled LFU eviction policy. You can use the same Cache instance
// from as many goroutines as you want.
type Cache struct {
type Cache[K any, V any] struct {
// store is the central concurrent hashmap where key-value items are stored.
store store
store store[V]
// policy determines what gets let in to the cache and what gets kicked out.
policy policy
policy policy[V]
// getBuf is a custom ring buffer implementation that gets pushed to when
// keys are read.
getBuf *ringBuffer
// setBuf is a buffer allowing us to batch/drop Sets during times of high
// contention.
setBuf chan *Item
setBuf chan *Item[V]
// onEvict is called for item evictions.
onEvict itemCallback
onEvict func(*Item[V])
// onReject is called when an item is rejected via admission policy.
onReject itemCallback
onReject func(*Item[V])
// onExit is called whenever a value goes out of scope from the cache.
onExit (func(interface{}))
onExit (func(V))
// KeyToHash function is used to customize the key hashing algorithm.
// Each key will be hashed using the provided function. If keyToHash value
// is not set, the default keyToHash function is used.
keyToHash func(interface{}) (uint64, uint64)
keyToHash func(K) (uint64, uint64)
// stop is used to stop the processItems goroutine.
stop chan struct{}
// indicates whether cache is closed.
isClosed bool
// cost calculates cost from a value.
cost func(value interface{}) int64
cost func(value V) int64
// ignoreInternalCost dictates whether to ignore the cost of internally storing
// the item in the cost calculation.
ignoreInternalCost bool
@@ -81,7 +88,7 @@ type Cache struct {
}

// Config is passed to NewCache for creating new Cache instances.
type Config struct {
type Config[K any, V any] struct {
// NumCounters determines the number of counters (keys) to keep that hold
// access frequency information. It's generally a good idea to have more
// counters than the max cache capacity, as this will improve eviction
@@ -115,21 +122,21 @@ type Config struct {
Metrics bool
// OnEvict is called for every eviction and passes the hashed key, value,
// and cost to the function.
OnEvict func(item *Item)
OnEvict func(item *Item[V])
// OnReject is called for every rejection done via the policy.
OnReject func(item *Item)
OnReject func(item *Item[V])
// OnExit is called whenever a value is removed from cache. This can be
// used to do manual memory deallocation. Would also be called on eviction
// and rejection of the value.
OnExit func(val interface{})
OnExit func(val V)
// KeyToHash function is used to customize the key hashing algorithm.
// Each key will be hashed using the provided function. If keyToHash value
// is not set, the default keyToHash function is used.
KeyToHash func(key interface{}) (uint64, uint64)
KeyToHash func(key K) (uint64, uint64)
// Cost evaluates a value and outputs a corresponding cost. This function
// is ran after Set is called for a new item or an item update with a cost
// param of 0.
Cost func(value interface{}) int64
Cost func(value V) int64
// IgnoreInternalCost set to true indicates to the cache that the cost of
// internally storing the value should be ignored. This is useful when the
// cost passed to set is not using bytes as units. Keep in mind that setting
@@ -146,18 +153,18 @@ const (
)

// Item is passed to setBuf so items can eventually be added to the cache.
type Item struct {
type Item[V any] struct {
flag itemFlag
Key uint64
Conflict uint64
Value interface{}
Value V
Cost int64
Expiration time.Time
wg *sync.WaitGroup
}

// NewCache returns a new Cache instance and any configuration errors, if any.
func NewCache(config *Config) (*Cache, error) {
func NewCache[K any, V any](config *Config[K, V]) (*Cache[K, V], error) {
switch {
case config.NumCounters == 0:
return nil, errors.New("NumCounters can't be zero")
@@ -166,37 +173,37 @@ func NewCache(config *Config) (*Cache, error) {
case config.BufferItems == 0:
return nil, errors.New("BufferItems can't be zero")
}
policy := newPolicy(config.NumCounters, config.MaxCost)
cache := &Cache{
store: newStore(),
policy := newPolicy[V](config.NumCounters, config.MaxCost)
cache := &Cache[K, V]{
store: newStore[V](),
policy: policy,
getBuf: newRingBuffer(policy, config.BufferItems),
setBuf: make(chan *Item, setBufSize),
setBuf: make(chan *Item[V], setBufSize),
keyToHash: config.KeyToHash,
stop: make(chan struct{}),
cost: config.Cost,
ignoreInternalCost: config.IgnoreInternalCost,
cleanupTicker: time.NewTicker(time.Duration(bucketDurationSecs) * time.Second / 2),
}
cache.onExit = func(val interface{}) {
if config.OnExit != nil && val != nil {
cache.onExit = func(val V) {
if config.OnExit != nil {
config.OnExit(val)
}
}
cache.onEvict = func(item *Item) {
cache.onEvict = func(item *Item[V]) {
if config.OnEvict != nil {
config.OnEvict(item)
}
cache.onExit(item.Value)
}
cache.onReject = func(item *Item) {
cache.onReject = func(item *Item[V]) {
if config.OnReject != nil {
config.OnReject(item)
}
cache.onExit(item.Value)
}
if cache.keyToHash == nil {
cache.keyToHash = z.KeyToHash
cache.keyToHash = z.KeyToHash[K]
}
if config.Metrics {
cache.collectMetrics()
@@ -205,25 +212,27 @@ func NewCache(config *Config) (*Cache, error) {
// goroutines we have running cache.processItems(), so 1 should
// usually be sufficient
go cache.processItems()

return cache, nil
}

func (c *Cache) Wait() {
func (c *Cache[K, V]) Wait() {
if c == nil || c.isClosed {
return
}
wg := &sync.WaitGroup{}
wg.Add(1)
c.setBuf <- &Item{wg: wg}
c.setBuf <- &Item[V]{wg: wg}
wg.Wait()
}

// Get returns the value (if any) and a boolean representing whether the
// value was found or not. The value can be nil and the boolean can be true at
// the same time.
func (c *Cache) Get(key interface{}) (interface{}, bool) {
if c == nil || c.isClosed || key == nil {
return nil, false
func (c *Cache[K, V]) Get(key K) (V, bool) {

if c == nil || c.isClosed || &key == nil {
return Zero[V](), false
}
keyHash, conflictHash := c.keyToHash(key)
c.getBuf.Push(keyHash)
@@ -245,16 +254,16 @@ func (c *Cache) Get(key interface{}) (interface{}, bool) {
// To dynamically evaluate the items cost using the Config.Coster function, set
// the cost parameter to 0 and Coster will be ran when needed in order to find
// the items true cost.
func (c *Cache) Set(key, value interface{}, cost int64) bool {
func (c *Cache[K, V]) Set(key K, value V, cost int64) bool {
return c.SetWithTTL(key, value, cost, 0*time.Second)
}

// SetWithTTL works like Set but adds a key-value pair to the cache that will expire
// after the specified TTL (time to live) has passed. A zero value means the value never
// expires, which is identical to calling Set. A negative value is a no-op and the value
// is discarded.
func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration) bool {
if c == nil || c.isClosed || key == nil {
func (c *Cache[K, V]) SetWithTTL(key K, value V, cost int64, ttl time.Duration) bool {
if c == nil || c.isClosed || &key == nil {
return false
}

@@ -271,7 +280,7 @@ func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration
}

keyHash, conflictHash := c.keyToHash(key)
i := &Item{
i := &Item[V]{
flag: itemNew,
Key: keyHash,
Conflict: conflictHash,
@@ -302,8 +311,8 @@ func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration
}

// Del deletes the key-value item from the cache if it exists.
func (c *Cache) Del(key interface{}) {
if c == nil || c.isClosed || key == nil {
func (c *Cache[K, V]) Del(key K) {
if c == nil || c.isClosed || &key == nil {
return
}
keyHash, conflictHash := c.keyToHash(key)
@@ -314,7 +323,7 @@ func (c *Cache) Del(key interface{}) {
// So we must push the same item to `setBuf` with the deletion flag.
// This ensures that if a set is followed by a delete, it will be
// applied in the correct order.
c.setBuf <- &Item{
c.setBuf <- &Item[V]{
flag: itemDelete,
Key: keyHash,
Conflict: conflictHash,
@@ -323,8 +332,8 @@ func (c *Cache) Del(key interface{}) {

// GetTTL returns the TTL for the specified key and a bool that is true if the
// item was found and is not expired.
func (c *Cache) GetTTL(key interface{}) (time.Duration, bool) {
if c == nil || key == nil {
func (c *Cache[K, V]) GetTTL(key K) (time.Duration, bool) {
if c == nil || &key == nil {
return 0, false
}

@@ -349,7 +358,7 @@ func (c *Cache) GetTTL(key interface{}) (time.Duration, bool) {
}

// Close stops all goroutines and closes all channels.
func (c *Cache) Close() {
func (c *Cache[K, V]) Close() {
if c == nil || c.isClosed {
return
}
@@ -366,7 +375,7 @@ func (c *Cache) Close() {
// Clear empties the hashmap and zeroes all policy counters. Note that this is
// not an atomic operation (but that shouldn't be a problem as it's assumed that
// Set/Get calls won't be occurring until after this).
func (c *Cache) Clear() {
func (c *Cache[K, V]) Clear() {
if c == nil || c.isClosed {
return
}
@@ -404,23 +413,23 @@ loop:
}

// MaxCost returns the max cost of the cache.
func (c *Cache) MaxCost() int64 {
func (c *Cache[K, V]) MaxCost() int64 {
if c == nil {
return 0
}
return c.policy.MaxCost()
}

// UpdateMaxCost updates the maxCost of an existing cache.
func (c *Cache) UpdateMaxCost(maxCost int64) {
func (c *Cache[K, V]) UpdateMaxCost(maxCost int64) {
if c == nil {
return
}
c.policy.UpdateMaxCost(maxCost)
}

// processItems is ran by goroutines processing the Set buffer.
func (c *Cache) processItems() {
func (c *Cache[K, V]) processItems() {
startTs := make(map[uint64]time.Time)
numToKeep := 100000 // TODO: Make this configurable via options.

@@ -438,7 +447,7 @@ func (c *Cache) processItems() {
}
}
}
onEvict := func(i *Item) {
onEvict := func(i *Item[V]) {
if ts, has := startTs[i.Key]; has {
c.Metrics.trackEviction(int64(time.Since(ts) / time.Second))
delete(startTs, i.Key)
@@ -497,7 +506,7 @@ func (c *Cache) processItems() {

// collectMetrics just creates a new *Metrics instance and adds the pointers
// to the cache and policy instances.
func (c *Cache) collectMetrics() {
func (c *Cache[K, V]) collectMetrics() {
c.Metrics = newMetrics()
c.policy.CollectMetrics(c.Metrics)
}
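
For reviewers who want to see the new surface area end to end, below is a minimal usage sketch of the generic API as it appears at this revision of the PR. The import path is an assumption (the PR renames the module and the final path is not shown in this diff), and the cost and counter values are illustrative only; the type parameters, Config fields, and method signatures follow the diff above.

package main

import (
	"fmt"
	"time"

	"github.com/dgraph-io/ristretto" // assumed import path; the PR changes the module name
)

func main() {
	// K = string, V = int: both type parameters are fixed by the Config.
	cache, err := ristretto.NewCache[string, int](&ristretto.Config[string, int]{
		NumCounters: 1e7,     // number of keys to track frequency of
		MaxCost:     1 << 30, // maximum total cost of the cache
		BufferItems: 64,      // number of keys per Get buffer
		// KeyToHash may be supplied as func(string) (uint64, uint64) to
		// customize hashing; when omitted, the default z.KeyToHash[K] is used.
	})
	if err != nil {
		panic(err)
	}
	defer cache.Close()

	// Set/SetWithTTL now take a typed key and value instead of interface{}.
	cache.SetWithTTL("answer", 42, 1, time.Minute)
	cache.Wait() // wait for the buffered set to be applied

	// Get returns (V, bool); no type assertion is needed on the value.
	if v, ok := cache.Get("answer"); ok {
		fmt.Println(v)
	}
}

Compared with the pre-generics API, the main differences visible in this diff are that misses and short-circuited calls return Zero[V]() rather than nil, and that the KeyToHash callback now receives the typed key K directly.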