Added 'resize' operation to BoundedQueue #1949
Conversation
Codecov Report
@@ Coverage Diff @@
## master #1949 +/- ##
==========================================
- Coverage 96.99% 96.94% -0.06%
==========================================
Files 203 203
Lines 10064 10082 +18
==========================================
+ Hits 9762 9774 +12
- Misses 264 269 +5
- Partials 38 39 +1
Continue to review full report at Codecov.
How is it supposed to be used? Who would call Resize()? Isn't it possible to just pre-determine the queue size upfront?
A change to the span processor will make it keep track of the number of spans and the total bytes it has seen during the process lifetime. Once enough spans have been recorded, the span processor starts to calculate the ideal queue size based on the average span size and the available memory. It can then decide to trigger a resize. I have a local branch that does the above, but I thought it would be cleaner to separate those two features into separate PRs.
When we know the expected average span size, yes. But I've not heard of any cases where people know the average span size in advance, which makes the queue size hard to calculate.
I just added a benchmark to the tests, to ensure that I'm not bringing performance problems to the queue. I'm updating the benchmark results: I was having some data races when reading the len/cap of the channel, so I changed it to use an RWMutex. I ran the benchmark a few times, as I'm getting a big standard deviation. Master:
This PR, which seems more consistent:
I am skeptical about relying on the average span size, because the actual size differences can be very large, 2-3 orders of magnitude, e.g. a basic span vs. a span with a stack trace vs. a span with the request payload added to the logs. I think if we want to go by memory size, we should go by the actual size of the spans in the queue. The proto/domain model already implements a Size() method IIRC, which could be used as an approximation of the actual memory size, and we would need to re-implement the queue using an explicit buffer rather than a bounded channel (might be worth looking around for an existing implementation).
You are right, but in reality this wouldn't be a big problem: the queue sizes will typically be big enough to make outliers non-relevant. In my working branch for #943, spans generated by … I would give this simple approach a try first and see how it works in the real world.
@yurishkuro any feedback on this one?
pkg/queue/bounded_queue.go
Outdated
q.resizeLock.Lock()
defer q.resizeLock.Unlock()

new := make(chan interface{}, newCap)
- what if newCap is less than currentCap and the current size? This will probably deadlock
- please don't use new as a variable name (it shadows a Go builtin)
Funny, I thought I had added a test for the downscale, but it looks like I haven't. Will add one.
I wonder if it's possible to avoid the mutex by approaching this differently. Goroutines are cheap to start, so rather than copying the messages into another queue, we could start another set of workers reading from a new channel, close the original channel, and let the old workers drain it naturally and exit.
That could work, let me give it a try.
PR updated to use a new channel for the resized queue.
@yurishkuro would you please re-review?
Signed-off-by: Juraci Paixão Kröhling <[email protected]>
pkg/queue/bounded_queue.go
Outdated
// we might have two concurrent backing queues at the moment
// their combined size is stored in q.size, and their combined capacity
// should match the capacity of the new queue
if atomic.LoadInt32(&q.size) >= int32(q.Capacity()) && q.Capacity() > 0 {
s/atomic.LoadInt32(&q.size)/int32(q.Size())/ to keep it consistent with the capacity
I am not sure I understand the && q.Capacity() > 0. Isn't the capacity always bigger than 0? Does it matter in this if?
There are consumers of this queue that are not setting the queue size. IIRC, one of them is in the span processor tests. Agree with your earlier suggestion though.
I see there is a test which sets the size to 0
released, resized := false, false
q.StartConsumers(1, func(item interface{}) {
	if !resized { // we'll have a second consumer once the queue is resized
I didn't find the second consumer
Around line 200:
jaeger/pkg/queue/bounded_queue_test.go
Lines 197 to 200 in a7819b8
resized = true
assert.True(t, q.Resize(4))
assert.True(t, q.Produce("e")) // in process by the second consumer
secondConsumer.Wait()
released := false
q.StartConsumers(1, func(item interface{}) {
	// once we release the lock, we might end up with multiple calls to reach this
	if !released {
The released flag does not seem to be needed.
Why's that? Note that there are quite a few goroutines just waiting for this lock to be released to start calling consumers with other items. If we have not released, we count one step down in the wait group. If we have released the consumer, we do not want to count down further, as we'd reach a negative count.
I mean you can remove this variable altogether. It's not changed during the test, only at the end, which does not affect the test, and the default value satisfies this if statement.
Removal of this variable does not have any effect on the tests. It seems redundant.
It needs to be there, otherwise the test will intermittently fail due to "negative counts" in the wait group.
assert.EqualValues(t, 2, q.Capacity())
assert.EqualValues(t, 4, q.Size()) // the queue will eventually drain, but it will live for a while over capacity
assert.EqualValues(t, 0, len(*q.items)) // the new queue is empty, as the old queue is still full and over capacity
Shouldn't we keep accepting here? There are free spots in the queue.
There are free spots in the individual channel, but if the queue size was set by the user to 3000, it's wrong to have 6000 items in the two queues combined.
Signed-off-by: Juraci Paixão Kröhling <[email protected]>
LGTM, let's see how this will be applied in the collector. But that is part of a separate issue.
Thanks!
size          int32
onDroppedItem func(item interface{})
items         chan interface{}
items         *chan interface{}
This is being updated with a CAS in one place but accessed directly in other places, which is technically not thread-safe. I would rather we used a normal mutex or atomic.Value.
Signed-off-by: Juraci Paixão Kröhling [email protected]
Which problem is this PR solving?
Short description of the changes
Adds a Resize(int) method to the BoundedQueue