Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added 'resize' operation to BoundedQueue #1949

Merged
merged 2 commits into from
Dec 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 59 additions & 13 deletions pkg/queue/bounded_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/uber/jaeger-lib/metrics"
)
Expand All @@ -29,46 +30,55 @@ import (
// channels, with a special Reaper goroutine that wakes up when the queue is full and consumes
// the items from the top of the queue until its size drops back to maxSize
type BoundedQueue struct {
capacity int
size int32
onDroppedItem func(item interface{})
items chan interface{}
items *chan interface{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is being updated with a CAS in one place but accessed directly in other places, which is technically not thread safe. I would rather we used a normal mutex or atomic.Value.

stopCh chan struct{}
stopWG sync.WaitGroup
stopped int32
jpkrohling marked this conversation as resolved.
Show resolved Hide resolved
consumer func(item interface{})
workers int
}

// NewBoundedQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewBoundedQueue(capacity int, onDroppedItem func(item interface{})) *BoundedQueue {
queue := make(chan interface{}, capacity)
return &BoundedQueue{
capacity: capacity,
onDroppedItem: onDroppedItem,
items: make(chan interface{}, capacity),
items: &queue,
stopCh: make(chan struct{}),
}
}

// StartConsumers starts a given number of goroutines consuming items from the queue
// and passing them into the consumer callback.
func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{})) {
q.workers = num
q.consumer = consumer
var startWG sync.WaitGroup
for i := 0; i < num; i++ {
for i := 0; i < q.workers; i++ {
q.stopWG.Add(1)
startWG.Add(1)
go func() {
go func(queue chan interface{}) {
startWG.Done()
defer q.stopWG.Done()
for {
select {
case item := <-q.items:
atomic.AddInt32(&q.size, -1)
consumer(item)
case item, ok := <-queue:
if ok {
atomic.AddInt32(&q.size, -1)
q.consumer(item)
} else {
// channel closed, finish worker
return
}
case <-q.stopCh:
// the whole queue is closing, finish worker
return
}
}
}()
}(*q.items)
jpkrohling marked this conversation as resolved.
Show resolved Hide resolved
}
startWG.Wait()
}
Expand All @@ -79,11 +89,23 @@ func (q *BoundedQueue) Produce(item interface{}) bool {
q.onDroppedItem(item)
return false
}

// we might have two concurrent backing queues at the moment
// their combined size is stored in q.size, and their combined capacity
// should match the capacity of the new queue
if q.Size() >= q.Capacity() && q.Capacity() > 0 {
// current consumers of the queue (like tests) expect the queue capacity = 0 to work
// so, we don't drop items when the capacity is 0
q.onDroppedItem(item)
return false
}
jpkrohling marked this conversation as resolved.
Show resolved Hide resolved

select {
case q.items <- item:
case *q.items <- item:
atomic.AddInt32(&q.size, 1)
return true
default:
// should not happen, as overflows should have been captured earlier
if q.onDroppedItem != nil {
q.onDroppedItem(item)
}
Expand All @@ -97,7 +119,7 @@ func (q *BoundedQueue) Stop() {
atomic.StoreInt32(&q.stopped, 1) // disable producer
close(q.stopCh)
q.stopWG.Wait()
close(q.items)
close(*q.items)
}

// Size returns the current size of the queue
Expand All @@ -107,7 +129,7 @@ func (q *BoundedQueue) Size() int {

// Capacity returns capacity of the queue
func (q *BoundedQueue) Capacity() int {
return q.capacity
return cap(*q.items)
}

// StartLengthReporting starts a timer-based goroutine that periodically reports
Expand All @@ -127,3 +149,27 @@ func (q *BoundedQueue) StartLengthReporting(reportPeriod time.Duration, gauge me
}
}()
}

// Resize changes the capacity of the queue, returning whether the action was
// successful. Resizing to the current capacity is a no-op and returns false.
//
// On success, a fresh backing channel is installed, a new set of workers is
// started for it (using the worker count and consumer callback recorded by
// StartConsumers), and the previous channel is closed so the old workers
// drain its remaining items and then exit.
//
// NOTE(review): q.items is swapped here via CAS, but other methods dereference
// *q.items without synchronization — technically not thread-safe; a mutex or
// atomic.Value would be safer. Confirm the intended concurrency model.
func (q *BoundedQueue) Resize(capacity int) bool {
	if capacity == cap(*q.items) {
		// noop — the backing channel already has the requested capacity
		return false
	}

	// keep a reference to the current channel so it can be closed/drained below
	previous := *q.items
	queue := make(chan interface{}, capacity)

	// swap queues: atomically repoint q.items at the new channel, but only if
	// nobody else swapped it between our read of q.items and this CAS
	// #nosec
	swapped := atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&q.items)), unsafe.Pointer(q.items), unsafe.Pointer(&queue))
	if swapped {
		// start a new set of consumers, based on the information given previously
		q.StartConsumers(q.workers, q.consumer)

		// gracefully drain the existing queue
		close(previous)
	}

	return swapped
}
171 changes: 171 additions & 0 deletions pkg/queue/bounded_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package queue

import (
"fmt"
"reflect"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -156,3 +157,173 @@ func (s *consumerState) assertConsumed(expected map[string]bool) {
}
assert.Equal(s.t, expected, s.snapshot())
}

func TestResizeUp(t *testing.T) {
q := NewBoundedQueue(2, func(item interface{}) {
fmt.Printf("dropped: %v\n", item)
})

var firstConsumer, secondConsumer, releaseConsumers sync.WaitGroup
firstConsumer.Add(1)
secondConsumer.Add(1)
releaseConsumers.Add(1)

released, resized := false, false
q.StartConsumers(1, func(item interface{}) {
if !resized { // we'll have a second consumer once the queue is resized
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't find the second consumer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Around line 200:

resized = true
assert.True(t, q.Resize(4))
assert.True(t, q.Produce("e")) // in process by the second consumer
secondConsumer.Wait()

// signal that the worker is processing
firstConsumer.Done()
} else {
// once we release the lock, we might end up with multiple calls to reach this
if !released {
secondConsumer.Done()
}
}

// wait until we are signaled that we can finish
releaseConsumers.Wait()
})

assert.True(t, q.Produce("a")) // in process
firstConsumer.Wait()

assert.True(t, q.Produce("b")) // in queue
assert.True(t, q.Produce("c")) // in queue
assert.False(t, q.Produce("d")) // dropped
assert.EqualValues(t, 2, q.Capacity())
assert.EqualValues(t, q.Capacity(), q.Size())
assert.EqualValues(t, q.Capacity(), len(*q.items))

resized = true
assert.True(t, q.Resize(4))
assert.True(t, q.Produce("e")) // in process by the second consumer
secondConsumer.Wait()

assert.True(t, q.Produce("f")) // in the new queue
assert.True(t, q.Produce("g")) // in the new queue
assert.False(t, q.Produce("h")) // the new queue has the capacity, but the sum of queues doesn't

assert.EqualValues(t, 4, q.Capacity())
assert.EqualValues(t, q.Capacity(), q.Size()) // the combined queues are at the capacity right now
assert.EqualValues(t, 2, len(*q.items)) // the new internal queue should have two items only

released = true
releaseConsumers.Done()
}

func TestResizeDown(t *testing.T) {
q := NewBoundedQueue(4, func(item interface{}) {
fmt.Printf("dropped: %v\n", item)
})

var consumer, releaseConsumers sync.WaitGroup
consumer.Add(1)
releaseConsumers.Add(1)

released := false
q.StartConsumers(1, func(item interface{}) {
// once we release the lock, we might end up with multiple calls to reach this
if !released {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Released does not seem to be needed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why's that? Note that there's quite a bunch of go funcs just waiting for this lock to be released to start calling consumers with other items. If we have not released, we mark one step down in the work group. If we have released the consumer, we do not want to count down further, as we'd reach a negative count.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean you can remove this parameter altogether. It's not changed during the test - only at the end which does not affect tests and the default value satisfy this if statement.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removal of this variable does not have any effect on the tests. It seems redundant

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needs to be there, otherwise the test will intermittently fail due to "negative counts" in the work group.

// signal that the worker is processing
consumer.Done()
}

// wait until we are signaled that we can finish
releaseConsumers.Wait()
})

assert.True(t, q.Produce("a")) // in process
consumer.Wait()

assert.True(t, q.Produce("b")) // in queue
assert.True(t, q.Produce("c")) // in queue
assert.True(t, q.Produce("d")) // in queue
assert.True(t, q.Produce("e")) // dropped
assert.EqualValues(t, 4, q.Capacity())
assert.EqualValues(t, q.Capacity(), q.Size())
assert.EqualValues(t, q.Capacity(), len(*q.items))

assert.True(t, q.Resize(2))
assert.False(t, q.Produce("f")) // dropped

assert.EqualValues(t, 2, q.Capacity())
assert.EqualValues(t, 4, q.Size()) // the queue will eventually drain, but it will live for a while over capacity
assert.EqualValues(t, 0, len(*q.items)) // the new queue is empty, as the old queue is still full and over capacity
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we keep accepting here? There are free spots in the queue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are free spots in the individual channel, but if the queue size was set by the user to 3000, it's wrong to have 6000 items in the two queues combined.


released = true
releaseConsumers.Done()
}

// TestResizeOldQueueIsDrained verifies that items sitting in the pre-resize
// channel are still consumed after Resize installs a new backing channel.
func TestResizeOldQueueIsDrained(t *testing.T) {
	q := NewBoundedQueue(2, func(item interface{}) {
		fmt.Printf("dropped: %v\n", item)
	})

	var workerReady, allConsumed, gate sync.WaitGroup
	workerReady.Add(1)
	gate.Add(1)
	allConsumed.Add(5) // five items are expected to be processed overall

	firstRun := true
	q.StartConsumers(1, func(item interface{}) {
		// signal readiness exactly once, on the first invocation
		if firstRun {
			firstRun = false
			workerReady.Done()
		}

		gate.Wait()
		allConsumed.Done()
	})

	assert.True(t, q.Produce("a"))
	workerReady.Wait()

	assert.True(t, q.Produce("b"))
	assert.True(t, q.Produce("c"))
	assert.False(t, q.Produce("d"))

	q.Resize(4)

	assert.True(t, q.Produce("e"))
	assert.True(t, q.Produce("f"))
	assert.False(t, q.Produce("g"))

	gate.Done()
	allConsumed.Wait() // returns only once both the old and new channels are fully drained
}

// TestNoopResize checks that resizing to the current capacity is rejected.
func TestNoopResize(t *testing.T) {
	onDropped := func(item interface{}) {}
	q := NewBoundedQueue(2, onDropped)

	assert.False(t, q.Resize(2))
}

// TestZeroSize ensures a zero-capacity queue still hands items to a consumer.
func TestZeroSize(t *testing.T) {
	q := NewBoundedQueue(0, func(item interface{}) {})

	var processed sync.WaitGroup
	processed.Add(1)
	q.StartConsumers(1, func(item interface{}) {
		processed.Done()
	})

	assert.True(t, q.Produce("a")) // handed straight to the worker
	processed.Wait()

	// reaching this point without a test timeout means the item went through
}

// BenchmarkBoundedQueue measures Produce throughput against ten no-op consumers.
func BenchmarkBoundedQueue(b *testing.B) {
	noop := func(item interface{}) {}
	q := NewBoundedQueue(1000, noop)
	q.StartConsumers(10, noop)

	for i := 0; i < b.N; i++ {
		q.Produce(i)
	}
}