From 1a93c67a2f085c6f744ba83aa4336ceadc76ab12 Mon Sep 17 00:00:00 2001 From: Tochemey Date: Thu, 12 Dec 2024 16:06:51 +0000 Subject: [PATCH] refactor: refactor events stream queue implementation --- internal/queue/queue.go | 106 ++++++++++++++++++++++------------------ 1 file changed, 59 insertions(+), 47 deletions(-) diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 7b866e5..85af643 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -34,7 +34,7 @@ import ( type Queue struct { head unsafe.Pointer // pointer to the head of the queue tail unsafe.Pointer // pointer to the tail of the queue - len uint64 // length of the queue + len int64 // length of the queue pool sync.Pool } @@ -51,6 +51,7 @@ func NewQueue() *Queue { return &Queue{ head: unsafe.Pointer(dummy), // both head and tail point to the dummy node tail: unsafe.Pointer(dummy), + len: 0, pool: sync.Pool{ New: func() interface{} { return &item{} @@ -61,25 +62,31 @@ func NewQueue() *Queue { // Enqueue adds a value to the tail of the queue. func (q *Queue) Enqueue(v interface{}) { - xitem := q.pool.Get().(*item) - xitem.next = nil - xitem.v = v + // Get a node from the pool + newNode := q.getItem() + newNode.v = v + newNodePtr := unsafe.Pointer(newNode) + for { - last := loadItem(&q.tail) // Load current tail - lastNext := loadItem(&last.next) // Load the next pointer of tail - if last == loadItem(&q.tail) { // Check tail consistency - if lastNext == nil { // Is tail really pointing to the last node? - // Try to link the new item at the end - if casItem(&last.next, nil, xitem) { - // Enqueue successful, now try to move the tail pointer - casItem(&q.tail, last, xitem) - atomic.AddUint64(&q.len, 1) - return - } - } else { - // Tail was pointing to an intermediate node, help move it forward - casItem(&q.tail, last, lastNext) - } + tail := (*item)(atomic.LoadPointer(&q.tail)) + next := atomic.LoadPointer(&tail.next) + + // Another thread might have already enqueued a node + if next != nil { + // Try to help advance the tail + atomic.CompareAndSwapPointer(&q.tail, unsafe.Pointer(tail), next) + continue + } + + // Try to link the new node + if atomic.CompareAndSwapPointer(&tail.next, nil, newNodePtr) { + // Successfully linked, now try to advance tail + atomic.CompareAndSwapPointer(&q.tail, unsafe.Pointer(tail), newNodePtr) + + // Increment length atomically + atomic.AddInt64(&q.len, 1) + + return } } } @@ -88,46 +95,51 @@ func (q *Queue) Enqueue(v interface{}) { // It returns nil if the queue is empty. func (q *Queue) Dequeue() interface{} { for { - head := loadItem(&q.head) // Load the current head - tail := loadItem(&q.tail) // Load the current tail - next := loadItem(&head.next) // Load the next node after head - if head == loadItem(&q.head) { // Check head consistency - if head == tail { // Is the queue empty? - if next == nil { // Confirm that queue is empty - return nil - } - // Tail is lagging behind, move it forward - casItem(&q.tail, tail, next) - } else { - // Get the value before CAS to avoid freeing the node too early - v := next.v - // Try to swing the head to the next node - if casItem(&q.head, head, next) { - atomic.AddUint64(&q.len, ^uint64(0)) // decrement length - q.pool.Put(next) - return v // return the dequeued value - } - } + head := (*item)(atomic.LoadPointer(&q.head)) + next := atomic.LoadPointer(&head.next) + + // Queue is empty + if next == nil { + return nil + } + + nextNode := (*item)(next) + + // Try to advance the head + if atomic.CompareAndSwapPointer(&q.head, unsafe.Pointer(head), next) { + // Get the value before potentially releasing the node + value := nextNode.v + + // Release the old head node back to the pool + q.releaseItem(head) + + // Decrement length atomically + atomic.AddInt64(&q.len, -1) + + return value } } } // Length returns the number of items in the queue. func (q *Queue) Length() uint64 { - return atomic.LoadUint64(&q.len) + return uint64(atomic.LoadInt64(&q.len)) } // IsEmpty returns true when the queue is empty func (q *Queue) IsEmpty() bool { - return atomic.LoadUint64(&q.len) == 0 + return atomic.LoadInt64(&q.len) == 0 } -// loadItem atomically loads an item pointer from the given unsafe pointer. -func loadItem(p *unsafe.Pointer) *item { - return (*item)(atomic.LoadPointer(p)) +// getItem retrieves a node from the pool or creates a new one +func (q *Queue) getItem() *item { + return q.pool.Get().(*item) } -// casItem performs an atomic compare-and-swap on an unsafe pointer. -func casItem(p *unsafe.Pointer, old, new *item) bool { - return atomic.CompareAndSwapPointer(p, unsafe.Pointer(old), unsafe.Pointer(new)) +// releaseItem returns a node to the pool for reuse +func (q *Queue) releaseItem(i *item) { + // Reset i to prevent memory leaks + i.v = nil + i.next = nil + q.pool.Put(i) }