Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Add dequeueAll to Queue #386

Merged
merged 1 commit into from
Dec 4, 2013
Merged

Add dequeueAll to Queue #386

merged 1 commit into from
Dec 4, 2013

Conversation

johnynek
Copy link
Collaborator

Useful to clear out finished futures if we enqueue them.

@@ -76,6 +76,12 @@ abstract class Queue[T] {

private val count = new AtomicInteger(0)

def dequeueAll(fn: T => Boolean): Seq[T] = {
val (result, putBack) = toSeq.partition(fn)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of partitioning and putting back, will using a version of trimTo that takes in a predicate work better?
That version could trim all elements that satisfy the predicate.

Or am I missing any queue semantics here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue is that for the items that fail the predicate need to be put back into the queue. This cannot be done better than we have here if we only have poll and add.

The better solution is to atomically swap the tail out with an empty queue, filter the existing items and push those failing the filter onto a new queue, then move into a phase of draining the tail. When the tail is drained, atomically swap the refilled queue with old location. I don't think this is needed for the case we are considering because there is only one thread processing input tuples and that is the same thread that will cass this dequeueAll. So, there is no (or little) concern with out-of-order issues.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Thanks for the explanation.

ianoc added a commit that referenced this pull request Dec 4, 2013
@ianoc ianoc merged commit eedb870 into develop Dec 4, 2013
@ianoc ianoc deleted the queueAll branch December 4, 2013 04:12
snoble pushed a commit to snoble/summingbird that referenced this pull request Sep 8, 2017
applyCumulative method on Aggregator
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants