diff --git a/pkg/queue/bounded_queue.go b/pkg/queue/bounded_queue.go
index 20d1e39e8e9..b26b2ce7733 100644
--- a/pkg/queue/bounded_queue.go
+++ b/pkg/queue/bounded_queue.go
@@ -19,6 +19,7 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
+	"unsafe"
 
 	"github.com/uber/jaeger-lib/metrics"
 )
@@ -29,22 +30,23 @@
 // channels, with a special Reaper goroutine that wakes up when the queue is full and consumers
 // the items from the top of the queue until its size drops back to maxSize
 type BoundedQueue struct {
-	capacity      int
 	size          int32
 	onDroppedItem func(item interface{})
-	items         chan interface{}
+	items         *chan interface{}
 	stopCh        chan struct{}
 	stopWG        sync.WaitGroup
 	stopped       int32
+	consumer      func(item interface{})
+	workers       int
 }
 
 // NewBoundedQueue constructs the new queue of specified capacity, and with an optional
 // callback for dropped items (e.g. useful to emit metrics).
 func NewBoundedQueue(capacity int, onDroppedItem func(item interface{})) *BoundedQueue {
+	queue := make(chan interface{}, capacity)
 	return &BoundedQueue{
-		capacity:      capacity,
 		onDroppedItem: onDroppedItem,
-		items:         make(chan interface{}, capacity),
+		items:         &queue,
 		stopCh:        make(chan struct{}),
 	}
 }
@@ -52,23 +54,31 @@ func NewBoundedQueue(capacity int, onDroppedItem func(item interface{})) *Bounde
 // StartConsumers starts a given number of goroutines consuming items from the queue
 // and passing them into the consumer callback.
func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{})) { + q.workers = num + q.consumer = consumer var startWG sync.WaitGroup - for i := 0; i < num; i++ { + for i := 0; i < q.workers; i++ { q.stopWG.Add(1) startWG.Add(1) - go func() { + go func(queue chan interface{}) { startWG.Done() defer q.stopWG.Done() for { select { - case item := <-q.items: - atomic.AddInt32(&q.size, -1) - consumer(item) + case item, ok := <-queue: + if ok { + atomic.AddInt32(&q.size, -1) + q.consumer(item) + } else { + // channel closed, finish worker + return + } case <-q.stopCh: + // the whole queue is closing, finish worker return } } - }() + }(*q.items) } startWG.Wait() } @@ -79,11 +89,23 @@ func (q *BoundedQueue) Produce(item interface{}) bool { q.onDroppedItem(item) return false } + + // we might have two concurrent backing queues at the moment + // their combined size is stored in q.size, and their combined capacity + // should match the capacity of the new queue + if q.Size() >= q.Capacity() && q.Capacity() > 0 { + // current consumers of the queue (like tests) expect the queue capacity = 0 to work + // so, we don't drop items when the capacity is 0 + q.onDroppedItem(item) + return false + } + select { - case q.items <- item: + case *q.items <- item: atomic.AddInt32(&q.size, 1) return true default: + // should not happen, as overflows should have been captured earlier if q.onDroppedItem != nil { q.onDroppedItem(item) } @@ -97,7 +119,7 @@ func (q *BoundedQueue) Stop() { atomic.StoreInt32(&q.stopped, 1) // disable producer close(q.stopCh) q.stopWG.Wait() - close(q.items) + close(*q.items) } // Size returns the current size of the queue @@ -107,7 +129,7 @@ func (q *BoundedQueue) Size() int { // Capacity returns capacity of the queue func (q *BoundedQueue) Capacity() int { - return q.capacity + return cap(*q.items) } // StartLengthReporting starts a timer-based goroutine that periodically reports @@ -127,3 +149,27 @@ func (q *BoundedQueue) 
StartLengthReporting(reportPeriod time.Duration, gauge me } }() } + +// Resize changes the capacity of the queue, returning whether the action was successful +func (q *BoundedQueue) Resize(capacity int) bool { + if capacity == cap(*q.items) { + // noop + return false + } + + previous := *q.items + queue := make(chan interface{}, capacity) + + // swap queues + // #nosec + swapped := atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&q.items)), unsafe.Pointer(q.items), unsafe.Pointer(&queue)) + if swapped { + // start a new set of consumers, based on the information given previously + q.StartConsumers(q.workers, q.consumer) + + // gracefully drain the existing queue + close(previous) + } + + return swapped +} diff --git a/pkg/queue/bounded_queue_test.go b/pkg/queue/bounded_queue_test.go index f54211a137b..983512d5a5a 100644 --- a/pkg/queue/bounded_queue_test.go +++ b/pkg/queue/bounded_queue_test.go @@ -16,6 +16,7 @@ package queue import ( + "fmt" "reflect" "sync" "sync/atomic" @@ -156,3 +157,173 @@ func (s *consumerState) assertConsumed(expected map[string]bool) { } assert.Equal(s.t, expected, s.snapshot()) } + +func TestResizeUp(t *testing.T) { + q := NewBoundedQueue(2, func(item interface{}) { + fmt.Printf("dropped: %v\n", item) + }) + + var firstConsumer, secondConsumer, releaseConsumers sync.WaitGroup + firstConsumer.Add(1) + secondConsumer.Add(1) + releaseConsumers.Add(1) + + released, resized := false, false + q.StartConsumers(1, func(item interface{}) { + if !resized { // we'll have a second consumer once the queue is resized + // signal that the worker is processing + firstConsumer.Done() + } else { + // once we release the lock, we might end up with multiple calls to reach this + if !released { + secondConsumer.Done() + } + } + + // wait until we are signaled that we can finish + releaseConsumers.Wait() + }) + + assert.True(t, q.Produce("a")) // in process + firstConsumer.Wait() + + assert.True(t, q.Produce("b")) // in queue + assert.True(t, 
q.Produce("c")) // in queue + assert.False(t, q.Produce("d")) // dropped + assert.EqualValues(t, 2, q.Capacity()) + assert.EqualValues(t, q.Capacity(), q.Size()) + assert.EqualValues(t, q.Capacity(), len(*q.items)) + + resized = true + assert.True(t, q.Resize(4)) + assert.True(t, q.Produce("e")) // in process by the second consumer + secondConsumer.Wait() + + assert.True(t, q.Produce("f")) // in the new queue + assert.True(t, q.Produce("g")) // in the new queue + assert.False(t, q.Produce("h")) // the new queue has the capacity, but the sum of queues doesn't + + assert.EqualValues(t, 4, q.Capacity()) + assert.EqualValues(t, q.Capacity(), q.Size()) // the combined queues are at the capacity right now + assert.EqualValues(t, 2, len(*q.items)) // the new internal queue should have two items only + + released = true + releaseConsumers.Done() +} + +func TestResizeDown(t *testing.T) { + q := NewBoundedQueue(4, func(item interface{}) { + fmt.Printf("dropped: %v\n", item) + }) + + var consumer, releaseConsumers sync.WaitGroup + consumer.Add(1) + releaseConsumers.Add(1) + + released := false + q.StartConsumers(1, func(item interface{}) { + // once we release the lock, we might end up with multiple calls to reach this + if !released { + // signal that the worker is processing + consumer.Done() + } + + // wait until we are signaled that we can finish + releaseConsumers.Wait() + }) + + assert.True(t, q.Produce("a")) // in process + consumer.Wait() + + assert.True(t, q.Produce("b")) // in queue + assert.True(t, q.Produce("c")) // in queue + assert.True(t, q.Produce("d")) // in queue + assert.True(t, q.Produce("e")) // dropped + assert.EqualValues(t, 4, q.Capacity()) + assert.EqualValues(t, q.Capacity(), q.Size()) + assert.EqualValues(t, q.Capacity(), len(*q.items)) + + assert.True(t, q.Resize(2)) + assert.False(t, q.Produce("f")) // dropped + + assert.EqualValues(t, 2, q.Capacity()) + assert.EqualValues(t, 4, q.Size()) // the queue will eventually drain, but it will live for a 
while over capacity + assert.EqualValues(t, 0, len(*q.items)) // the new queue is empty, as the old queue is still full and over capacity + + released = true + releaseConsumers.Done() +} + +func TestResizeOldQueueIsDrained(t *testing.T) { + q := NewBoundedQueue(2, func(item interface{}) { + fmt.Printf("dropped: %v\n", item) + }) + + var consumerReady, consumed, readyToConsume sync.WaitGroup + consumerReady.Add(1) + readyToConsume.Add(1) + consumed.Add(5) // we expect 5 items to be processed + + first := true + q.StartConsumers(1, func(item interface{}) { + // first run only + if first { + first = false + consumerReady.Done() + } + + readyToConsume.Wait() + consumed.Done() + }) + + assert.True(t, q.Produce("a")) + consumerReady.Wait() + + assert.True(t, q.Produce("b")) + assert.True(t, q.Produce("c")) + assert.False(t, q.Produce("d")) + + q.Resize(4) + + assert.True(t, q.Produce("e")) + assert.True(t, q.Produce("f")) + assert.False(t, q.Produce("g")) + + readyToConsume.Done() + consumed.Wait() // once this returns, we've consumed all items, meaning that both queues are drained +} + +func TestNoopResize(t *testing.T) { + q := NewBoundedQueue(2, func(item interface{}) { + }) + + assert.False(t, q.Resize(2)) +} + +func TestZeroSize(t *testing.T) { + q := NewBoundedQueue(0, func(item interface{}) { + }) + + var wg sync.WaitGroup + wg.Add(1) + q.StartConsumers(1, func(item interface{}) { + wg.Done() + }) + + assert.True(t, q.Produce("a")) // in process + wg.Wait() + + // if we didn't finish with a timeout, then we are good +} + +func BenchmarkBoundedQueue(b *testing.B) { + q := NewBoundedQueue(1000, func(item interface{}) { + }) + + q.StartConsumers(10, func(item interface{}) { + }) + + for n := 0; n < b.N; n++ { + q.Produce(n) + } +}