Skip to content

Commit

Permalink
Added 'resize' operation to BoundedQueue (#1949)
Browse files Browse the repository at this point in the history
Signed-off-by: Juraci Paixão Kröhling <[email protected]>
  • Loading branch information
jpkrohling authored Dec 19, 2019
1 parent 46d7f17 commit 3fa8a08
Show file tree
Hide file tree
Showing 2 changed files with 230 additions and 13 deletions.
72 changes: 59 additions & 13 deletions pkg/queue/bounded_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/uber/jaeger-lib/metrics"
)
Expand All @@ -29,46 +30,55 @@ import (
// channels, with a special Reaper goroutine that wakes up when the queue is full and consumers
// the items from the top of the queue until its size drops back to maxSize
type BoundedQueue struct {
capacity int
size int32
onDroppedItem func(item interface{})
items chan interface{}
items *chan interface{}
stopCh chan struct{}
stopWG sync.WaitGroup
stopped int32
consumer func(item interface{})
workers int
}

// NewBoundedQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewBoundedQueue(capacity int, onDroppedItem func(item interface{})) *BoundedQueue {
queue := make(chan interface{}, capacity)
return &BoundedQueue{
capacity: capacity,
onDroppedItem: onDroppedItem,
items: make(chan interface{}, capacity),
items: &queue,
stopCh: make(chan struct{}),
}
}

// StartConsumers starts a given number of goroutines consuming items from the queue
// and passing them into the consumer callback.
func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{})) {
q.workers = num
q.consumer = consumer
var startWG sync.WaitGroup
for i := 0; i < num; i++ {
for i := 0; i < q.workers; i++ {
q.stopWG.Add(1)
startWG.Add(1)
go func() {
go func(queue chan interface{}) {
startWG.Done()
defer q.stopWG.Done()
for {
select {
case item := <-q.items:
atomic.AddInt32(&q.size, -1)
consumer(item)
case item, ok := <-queue:
if ok {
atomic.AddInt32(&q.size, -1)
q.consumer(item)
} else {
// channel closed, finish worker
return
}
case <-q.stopCh:
// the whole queue is closing, finish worker
return
}
}
}()
}(*q.items)
}
startWG.Wait()
}
Expand All @@ -79,11 +89,23 @@ func (q *BoundedQueue) Produce(item interface{}) bool {
q.onDroppedItem(item)
return false
}

// we might have two concurrent backing queues at the moment
// their combined size is stored in q.size, and their combined capacity
// should match the capacity of the new queue
if q.Size() >= q.Capacity() && q.Capacity() > 0 {
// current consumers of the queue (like tests) expect the queue capacity = 0 to work
// so, we don't drop items when the capacity is 0
q.onDroppedItem(item)
return false
}

select {
case q.items <- item:
case *q.items <- item:
atomic.AddInt32(&q.size, 1)
return true
default:
// should not happen, as overflows should have been captured earlier
if q.onDroppedItem != nil {
q.onDroppedItem(item)
}
Expand All @@ -97,7 +119,7 @@ func (q *BoundedQueue) Stop() {
atomic.StoreInt32(&q.stopped, 1) // disable producer
close(q.stopCh)
q.stopWG.Wait()
close(q.items)
close(*q.items)
}

// Size returns the current size of the queue
Expand All @@ -107,7 +129,7 @@ func (q *BoundedQueue) Size() int {

// Capacity returns capacity of the queue
func (q *BoundedQueue) Capacity() int {
return q.capacity
return cap(*q.items)
}

// StartLengthReporting starts a timer-based goroutine that periodically reports
Expand All @@ -127,3 +149,27 @@ func (q *BoundedQueue) StartLengthReporting(reportPeriod time.Duration, gauge me
}
}()
}

// Resize changes the capacity of the queue, returning whether the action was successful
func (q *BoundedQueue) Resize(capacity int) bool {
if capacity == cap(*q.items) {
// noop
return false
}

previous := *q.items
queue := make(chan interface{}, capacity)

// swap queues
// #nosec
swapped := atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&q.items)), unsafe.Pointer(q.items), unsafe.Pointer(&queue))
if swapped {
// start a new set of consumers, based on the information given previously
q.StartConsumers(q.workers, q.consumer)

// gracefully drain the existing queue
close(previous)
}

return swapped
}
171 changes: 171 additions & 0 deletions pkg/queue/bounded_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package queue

import (
"fmt"
"reflect"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -156,3 +157,173 @@ func (s *consumerState) assertConsumed(expected map[string]bool) {
}
assert.Equal(s.t, expected, s.snapshot())
}

func TestResizeUp(t *testing.T) {
q := NewBoundedQueue(2, func(item interface{}) {
fmt.Printf("dropped: %v\n", item)
})

var firstConsumer, secondConsumer, releaseConsumers sync.WaitGroup
firstConsumer.Add(1)
secondConsumer.Add(1)
releaseConsumers.Add(1)

released, resized := false, false
q.StartConsumers(1, func(item interface{}) {
if !resized { // we'll have a second consumer once the queue is resized
// signal that the worker is processing
firstConsumer.Done()
} else {
// once we release the lock, we might end up with multiple calls to reach this
if !released {
secondConsumer.Done()
}
}

// wait until we are signaled that we can finish
releaseConsumers.Wait()
})

assert.True(t, q.Produce("a")) // in process
firstConsumer.Wait()

assert.True(t, q.Produce("b")) // in queue
assert.True(t, q.Produce("c")) // in queue
assert.False(t, q.Produce("d")) // dropped
assert.EqualValues(t, 2, q.Capacity())
assert.EqualValues(t, q.Capacity(), q.Size())
assert.EqualValues(t, q.Capacity(), len(*q.items))

resized = true
assert.True(t, q.Resize(4))
assert.True(t, q.Produce("e")) // in process by the second consumer
secondConsumer.Wait()

assert.True(t, q.Produce("f")) // in the new queue
assert.True(t, q.Produce("g")) // in the new queue
assert.False(t, q.Produce("h")) // the new queue has the capacity, but the sum of queues doesn't

assert.EqualValues(t, 4, q.Capacity())
assert.EqualValues(t, q.Capacity(), q.Size()) // the combined queues are at the capacity right now
assert.EqualValues(t, 2, len(*q.items)) // the new internal queue should have two items only

released = true
releaseConsumers.Done()
}

func TestResizeDown(t *testing.T) {
q := NewBoundedQueue(4, func(item interface{}) {
fmt.Printf("dropped: %v\n", item)
})

var consumer, releaseConsumers sync.WaitGroup
consumer.Add(1)
releaseConsumers.Add(1)

released := false
q.StartConsumers(1, func(item interface{}) {
// once we release the lock, we might end up with multiple calls to reach this
if !released {
// signal that the worker is processing
consumer.Done()
}

// wait until we are signaled that we can finish
releaseConsumers.Wait()
})

assert.True(t, q.Produce("a")) // in process
consumer.Wait()

assert.True(t, q.Produce("b")) // in queue
assert.True(t, q.Produce("c")) // in queue
assert.True(t, q.Produce("d")) // in queue
assert.True(t, q.Produce("e")) // dropped
assert.EqualValues(t, 4, q.Capacity())
assert.EqualValues(t, q.Capacity(), q.Size())
assert.EqualValues(t, q.Capacity(), len(*q.items))

assert.True(t, q.Resize(2))
assert.False(t, q.Produce("f")) // dropped

assert.EqualValues(t, 2, q.Capacity())
assert.EqualValues(t, 4, q.Size()) // the queue will eventually drain, but it will live for a while over capacity
assert.EqualValues(t, 0, len(*q.items)) // the new queue is empty, as the old queue is still full and over capacity

released = true
releaseConsumers.Done()
}

func TestResizeOldQueueIsDrained(t *testing.T) {
q := NewBoundedQueue(2, func(item interface{}) {
fmt.Printf("dropped: %v\n", item)
})

var consumerReady, consumed, readyToConsume sync.WaitGroup
consumerReady.Add(1)
readyToConsume.Add(1)
consumed.Add(5) // we expect 5 items to be processed

first := true
q.StartConsumers(1, func(item interface{}) {
// first run only
if first {
first = false
consumerReady.Done()
}

readyToConsume.Wait()
consumed.Done()
})

assert.True(t, q.Produce("a"))
consumerReady.Wait()

assert.True(t, q.Produce("b"))
assert.True(t, q.Produce("c"))
assert.False(t, q.Produce("d"))

q.Resize(4)

assert.True(t, q.Produce("e"))
assert.True(t, q.Produce("f"))
assert.False(t, q.Produce("g"))

readyToConsume.Done()
consumed.Wait() // once this returns, we've consumed all items, meaning that both queues are drained
}

func TestNoopResize(t *testing.T) {
q := NewBoundedQueue(2, func(item interface{}) {
})

assert.False(t, q.Resize(2))
}

func TestZeroSize(t *testing.T) {
q := NewBoundedQueue(0, func(item interface{}) {
})

var wg sync.WaitGroup
wg.Add(1)
q.StartConsumers(1, func(item interface{}) {
wg.Done()
})

assert.True(t, q.Produce("a")) // in process
wg.Wait()

// if we didn't finish with a timeout, then we are good
}

func BenchmarkBoundedQueue(b *testing.B) {
q := NewBoundedQueue(1000, func(item interface{}) {
})

q.StartConsumers(10, func(item interface{}) {
})

for n := 0; n < b.N; n++ {
q.Produce(n)
}
}

0 comments on commit 3fa8a08

Please sign in to comment.