diff --git a/bigcache_148_test.go b/bigcache_148_test.go new file mode 100644 index 00000000..139b570b --- /dev/null +++ b/bigcache_148_test.go @@ -0,0 +1,113 @@ +package bigcache + +import ( + "bytes" + "math/rand" + "runtime" + "strconv" + "testing" + "time" +) + +func Test_issue_148(t *testing.T) { + const n = 2070400 + var message = bytes.Repeat([]byte{0}, 2<<10) + cache, _ := NewBigCache(Config{ + Shards: 1, + LifeWindow: time.Hour, + MaxEntriesInWindow: 10, + MaxEntrySize: len(message), + HardMaxCacheSize: 2 << 13, + }) + for i := 0; i < n; i++ { + err := cache.Set(strconv.Itoa(i), message) + if err != nil { + t.Fatal(err) + } + } + + err := cache.Set(strconv.Itoa(n), message) + if err != nil { + t.Fatal(err) + } + + cache.Get(strconv.Itoa(n)) + + i := 0 + defer func() { + if r := recover(); r != nil { + t.Log("Element: ", i) + t.Fatal(r) + } + }() + + for ; i < n; i++ { + v, err := cache.Get(strconv.Itoa(i)) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(v, message) { + t.Fatal("Should be equal", i, v, message) + } + } +} + +const ( + entries = 3000 + repeats = 1000 +) + +var valBig = append(make([]byte, 100*1024), 1) +var valMed = append(make([]byte, 1024), 1) +var valSmall = append(make([]byte, 2), 1) + +func getValue() []byte { + x := rand.Float64() + if x < 0.7 { + return valSmall + } else if x < 0.9 { + return valMed + } + return valBig +} + +// https://github.com/allegro/bigcache/issues/234#issuecomment-673895517 +func Test_issue_234(t *testing.T) { + t.Log("Number of entries: ", entries) + printAllocs(t) + rand.Seed(1) + + config := Config{ + Shards: 128, + LifeWindow: time.Hour, + CleanWindow: time.Second, + MaxEntriesInWindow: entries, + MaxEntrySize: 1024, + Verbose: true, + HardMaxCacheSize: 128, + OnRemoveWithReason: func(key string, entry []byte, reason RemoveReason) { + t.Log("Evicted:", len(entry), " reason: ", reason) + }, + } + + bigcache, err := NewBigCache(config) + if err != nil { + panic(err) + } + for i := 0; i < repeats; i++ { + printAllocs(t) + for j := 0; j < entries; j++ { + key := strconv.FormatInt(int64(j), 10) + err := bigcache.Set(key, getValue()) + if err != nil { + t.Fatal(err) + } + } + } +} + +func printAllocs(t *testing.T) { + var m runtime.MemStats + runtime.ReadMemStats(&m) + t.Logf("Alloc: %6d MB \n", m.Alloc/1e6) +} diff --git a/queue/bytes_queue.go b/queue/bytes_queue.go index 927a1783..04926ed7 100644 --- a/queue/bytes_queue.go +++ b/queue/bytes_queue.go @@ -24,15 +24,15 @@ var ( type BytesQueue struct { full bool array []byte - capacity int - maxCapacity int - head int - tail int + capacity uint64 + maxCapacity uint64 + head uint64 + tail uint64 count int - rightMargin int + rightMargin uint64 headerBuffer []byte verbose bool - initialCapacity int + initialCapacity uint64 } type queueError struct { @@ -40,7 +40,7 @@ type queueError struct { } // getUvarintSize returns the number of bytes to encode x in uvarint format -func getUvarintSize(x uint32) int { +func getUvarintSize(x uint32) uint64 { if x < 128 { return 1 } else if x < 16384 { @@ -60,14 +60,14 @@ func getUvarintSize(x uint32) int { func NewBytesQueue(initialCapacity int, maxCapacity int, verbose bool) *BytesQueue { return &BytesQueue{ array: make([]byte, initialCapacity), - capacity: initialCapacity, - maxCapacity: maxCapacity, + capacity: uint64(initialCapacity), + maxCapacity: uint64(maxCapacity), headerBuffer: make([]byte, binary.MaxVarintLen32), tail: leftMarginIndex, head: leftMarginIndex, rightMargin: leftMarginIndex, verbose: verbose, - initialCapacity: initialCapacity, + initialCapacity: uint64(initialCapacity), } } @@ -84,14 +84,14 @@ func (q *BytesQueue) Reset() { // Push copies entry at the end of queue and moves tail pointer. Allocates more space if needed. // Returns index for pushed data or error if maximum size queue limit is reached. func (q *BytesQueue) Push(data []byte) (int, error) { - dataLen := len(data) + dataLen := uint64(len(data)) headerEntrySize := getUvarintSize(uint32(dataLen)) if !q.canInsertAfterTail(dataLen + headerEntrySize) { if q.canInsertBeforeHead(dataLen + headerEntrySize) { q.tail = leftMarginIndex } else if q.capacity+headerEntrySize+dataLen >= q.maxCapacity && q.maxCapacity > 0 { - return -1, &queueError{"Full queue. Maximum size limit reached."} + return 0, &queueError{"Full queue. Maximum size limit reached."} } else { q.allocateAdditionalMemory(dataLen + headerEntrySize) } @@ -101,10 +101,10 @@ func (q *BytesQueue) Push(data []byte) (int, error) { q.push(data, dataLen) - return index, nil + return int(index), nil } -func (q *BytesQueue) allocateAdditionalMemory(minimum int) { +func (q *BytesQueue) allocateAdditionalMemory(minimum uint64) { start := time.Now() if q.capacity < minimum { q.capacity += minimum @@ -136,8 +136,8 @@ func (q *BytesQueue) allocateAdditionalMemory(minimum int) { } } -func (q *BytesQueue) push(data []byte, len int) { - headerEntrySize := binary.PutUvarint(q.headerBuffer, uint64(len)) +func (q *BytesQueue) push(data []byte, len uint64) { + headerEntrySize := uint64(binary.PutUvarint(q.headerBuffer, len)) q.copy(q.headerBuffer, headerEntrySize) q.copy(data, len) @@ -152,8 +152,8 @@ func (q *BytesQueue) push(data []byte, len int) { q.count++ } -func (q *BytesQueue) copy(data []byte, len int) { - q.tail += copy(q.array[q.tail:], data[:len]) +func (q *BytesQueue) copy(data []byte, len uint64) { + q.tail += uint64(copy(q.array[q.tail:], data[:len])) } // Pop reads the oldest entry from queue and moves head pointer to the next one @@ -162,7 +162,7 @@ func (q *BytesQueue) Pop() ([]byte, error) { if err != nil { return nil, err } - size := len(data) + size := uint64(len(data)) q.head += headerEntrySize + size q.count-- @@ -188,18 +188,18 @@ func (q *BytesQueue) Peek() ([]byte, error) { // Get reads entry from index func (q *BytesQueue) Get(index int) ([]byte, error) { - data, _, err := q.peek(index) + data, _, err := q.peek(uint64(index)) return data, err } // CheckGet checks if an entry can be read from index func (q *BytesQueue) CheckGet(index int) error { - return q.peekCheckErr(index) + return q.peekCheckErr(uint64(index)) } // Capacity returns number of allocated bytes for queue func (q *BytesQueue) Capacity() int { - return q.capacity + return int(q.capacity) } // Len returns number of entries kept in queue @@ -213,7 +213,7 @@ func (e *queueError) Error() string { } // peekCheckErr is identical to peek, but does not actually return any data -func (q *BytesQueue) peekCheckErr(index int) error { +func (q *BytesQueue) peekCheckErr(index uint64) error { if q.count == 0 { return errEmptyQueue @@ -223,25 +223,26 @@ func (q *BytesQueue) peekCheckErr(index int) error { return errInvalidIndex } - if index >= len(q.array) { + if index >= uint64(len(q.array)) { return errIndexOutOfBounds } return nil } // peek returns the data from index and the number of bytes to encode the length of the data in uvarint format -func (q *BytesQueue) peek(index int) ([]byte, int, error) { +func (q *BytesQueue) peek(index uint64) ([]byte, uint64, error) { err := q.peekCheckErr(index) if err != nil { return nil, 0, err } blockSize, n := binary.Uvarint(q.array[index:]) - return q.array[index+n : index+n+int(blockSize)], n, nil + un := uint64(n) + return q.array[index+un : index+un+blockSize], un, nil } // canInsertAfterTail returns true if it's possible to insert an entry of size of need after the tail of the queue -func (q *BytesQueue) canInsertAfterTail(need int) bool { +func (q *BytesQueue) canInsertAfterTail(need uint64) bool { if q.full { return false } @@ -256,7 +257,7 @@ func (q *BytesQueue) canInsertAfterTail(need int) bool { } // canInsertBeforeHead returns true if it's possible to insert an entry of size of need before the head of the queue -func (q *BytesQueue) canInsertBeforeHead(need int) bool { +func (q *BytesQueue) canInsertBeforeHead(need uint64) bool { if q.full { return false } diff --git a/shard.go b/shard.go index a11e7060..b7cbb057 100644 --- a/shard.go +++ b/shard.go @@ -16,7 +16,7 @@ type Metadata struct { } type cacheShard struct { - hashmap map[uint64]uint32 + hashmap map[uint64]uint64 entries queue.BytesQueue lock sync.RWMutex entryBuffer []byte @@ -135,7 +135,7 @@ func (s *cacheShard) set(key string, hashedKey uint64, entry []byte) error { for { if index, err := s.entries.Push(w); err == nil { - s.hashmap[hashedKey] = uint32(index) + s.hashmap[hashedKey] = uint64(index) s.lock.Unlock() return nil } @@ -163,7 +163,7 @@ func (s *cacheShard) setWithoutLock(key string, hashedKey uint64, entry []byte) for { if index, err := s.entries.Push(w); err == nil { - s.hashmap[hashedKey] = uint32(index) + s.hashmap[hashedKey] = uint64(index) return nil } if s.removeOldestEntry(NoSpace) != nil { @@ -310,7 +310,7 @@ func (s *cacheShard) removeOldestEntry(reason RemoveReason) error { func (s *cacheShard) reset(config Config) { s.lock.Lock() - s.hashmap = make(map[uint64]uint32, config.initialShardSize()) + s.hashmap = make(map[uint64]uint64, config.initialShardSize()) s.entryBuffer = make([]byte, config.MaxEntrySize+headersSizeInBytes) s.entries.Reset() s.lock.Unlock() @@ -390,7 +390,7 @@ func (s *cacheShard) collision() { func initNewShard(config Config, callback onRemoveCallback, clock clock) *cacheShard { return &cacheShard{ - hashmap: make(map[uint64]uint32, config.initialShardSize()), + hashmap: make(map[uint64]uint64, config.initialShardSize()), hashmapStats: make(map[uint64]uint32, config.initialShardSize()), entries: *queue.NewBytesQueue(config.initialShardSize()*config.MaxEntrySize, config.maximumShardSize(), config.Verbose), entryBuffer: make([]byte, config.MaxEntrySize+headersSizeInBytes),