Skip to content

Commit

Permalink
The size encoded in a entry header contains the header it self. (#266)
Browse files Browse the repository at this point in the history
Co-authored-by: Fabian Gärtner <[email protected]>
Co-authored-by: Oleg Kovalov <[email protected]>
Co-authored-by: Tomasz Janiszewski <[email protected]>
  • Loading branch information
4 people authored Nov 18, 2021
1 parent 0c76355 commit 16df11e
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 27 deletions.
55 changes: 28 additions & 27 deletions queue/bytes_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,23 @@ type queueError struct {
message string
}

// getUvarintSize returns the number of bytes to encode x in uvarint format
func getUvarintSize(x uint32) int {
if x < 128 {
return 1
} else if x < 16384 {
return 2
} else if x < 2097152 {
return 3
} else if x < 268435456 {
return 4
} else {
return 5
// getNeededSize returns the number of bytes an entry of length need in the queue
func getNeededSize(length int) int {
var header int
switch {
case length < 127: // 1<<7-1
header = 1
case length < 16382: // 1<<14-2
header = 2
case length < 2097149: // 1<<21 -3
header = 3
case length < 268435452: // 1<<28 -4
header = 4
default:
header = 5
}

return length + header
}

// NewBytesQueue initialize new bytes queue.
Expand Down Expand Up @@ -82,22 +86,21 @@ func (q *BytesQueue) Reset() {
// Push copies entry at the end of queue and moves tail pointer. Allocates more space if needed.
// Returns index for pushed data or error if maximum size queue limit is reached.
func (q *BytesQueue) Push(data []byte) (int, error) {
dataLen := len(data)
headerEntrySize := getUvarintSize(uint32(dataLen))
neededSize := getNeededSize(len(data))

if !q.canInsertAfterTail(dataLen + headerEntrySize) {
if q.canInsertBeforeHead(dataLen + headerEntrySize) {
if !q.canInsertAfterTail(neededSize) {
if q.canInsertBeforeHead(neededSize) {
q.tail = leftMarginIndex
} else if q.capacity+headerEntrySize+dataLen >= q.maxCapacity && q.maxCapacity > 0 {
} else if q.capacity+neededSize >= q.maxCapacity && q.maxCapacity > 0 {
return -1, &queueError{"Full queue. Maximum size limit reached."}
} else {
q.allocateAdditionalMemory(dataLen + headerEntrySize)
q.allocateAdditionalMemory(neededSize)
}
}

index := q.tail

q.push(data, dataLen)
q.push(data, neededSize)

return index, nil
}
Expand All @@ -120,9 +123,8 @@ func (q *BytesQueue) allocateAdditionalMemory(minimum int) {

if q.tail <= q.head {
if q.tail != q.head {
headerEntrySize := getUvarintSize(uint32(q.head - q.tail))
emptyBlobLen := q.head - q.tail - headerEntrySize
q.push(make([]byte, emptyBlobLen), emptyBlobLen)
// created slice is slightly larger then need but this is fine after only the needed bytes are copied
q.push(make([]byte, q.head-q.tail), q.head-q.tail)
}

q.head = leftMarginIndex
Expand All @@ -141,7 +143,7 @@ func (q *BytesQueue) push(data []byte, len int) {
headerEntrySize := binary.PutUvarint(q.headerBuffer, uint64(len))
q.copy(q.headerBuffer, headerEntrySize)

q.copy(data, len)
q.copy(data, len-headerEntrySize)

if q.tail > q.head {
q.rightMargin = q.tail
Expand All @@ -159,13 +161,12 @@ func (q *BytesQueue) copy(data []byte, len int) {

// Pop reads the oldest entry from queue and moves head pointer to the next one
func (q *BytesQueue) Pop() ([]byte, error) {
data, headerEntrySize, err := q.peek(q.head)
data, blockSize, err := q.peek(q.head)
if err != nil {
return nil, err
}
size := len(data)

q.head += headerEntrySize + size
q.head += blockSize
q.count--

if q.head == q.rightMargin {
Expand Down Expand Up @@ -238,7 +239,7 @@ func (q *BytesQueue) peek(index int) ([]byte, int, error) {
}

blockSize, n := binary.Uvarint(q.array[index:])
return q.array[index+n : index+n+int(blockSize)], n, nil
return q.array[index+n : index+int(blockSize)], int(blockSize), nil
}

// canInsertAfterTail returns true if it's possible to insert an entry of size of need after the tail of the queue
Expand Down
25 changes: 25 additions & 0 deletions queue/bytes_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,31 @@ func TestAllocateAdditionalSpaceForInsufficientFreeFragmentedSpaceWhereTailIsBef
assertEqual(t, blob('d', 40), pop(queue))
}

func TestAllocateAdditionalSpaceForInsufficientFreeFragmentedSpaceWhereTailIsBeforeHead128(t *testing.T) {
t.Parallel()

// given
queue := NewBytesQueue(200, 0, false)

// when
queue.Push(blob('a', 30)) // header + entry + left margin = 32 bytes
queue.Push(blob('b', 1)) // 32 + 128 + 1 = 161 bytes
queue.Push(blob('b', 125)) // 32 + 128 + 1 = 161 bytes
queue.Push(blob('c', 20)) // 160 + 20 + 1 = 182
queue.Pop() // space freed at the beginning
queue.Pop() // free 2 bytes
queue.Pop() // free 126
queue.Push(blob('d', 30)) // 31 bytes used at the beginning, tail pointer is before head pointer, now free space is 128 bytes
queue.Push(blob('e', 160)) // invoke allocateAdditionalMemory but fill 127 bytes free space (It should be 128 bytes, but 127 are filled, leaving one byte unfilled)

// then
assertEqual(t, 400, queue.Capacity())
assertEqual(t, blob('d', 30), pop(queue))
assertEqual(t, blob(0, 126), pop(queue)) //126 bytes data with 2bytes header only possible as empty entry
assertEqual(t, blob('c', 20), pop(queue)) //The data is not expected
assertEqual(t, blob('e', 160), pop(queue))
}

func TestUnchangedEntriesIndexesAfterAdditionalMemoryAllocationWhereTailIsBeforeHead(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 16df11e

Please sign in to comment.