-
Notifications
You must be signed in to change notification settings - Fork 347
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Dynamic Summer, may use heuristics to decide not to keep a tuple in a buffer for aggregation. #314
Conversation
…d harder. Have options to decide what should be a HH, if we should reset our metrics, and what frequency to update the mutable state
backingSummer: AsyncSummer[(Key, Value), Iterable[(Key, Value)]]) | ||
extends AsyncSummer[(Key, Value), Iterable[(Key, Value)]] | ||
with WithFlushConditions[(Key, Value), Iterable[(Key, Value)]] { | ||
type T = (Key, Value) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sometimes you reference T, sometimes (Key, Value). Is there a reason for that? flush is an example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lazyness since I often deal with it as a tuple rather than the Key, Value indvidually. I'll collapse it to one or the other this is messy
updated for comments |
case class HeavyHittersPercent(toFloat: Float) | ||
|
||
object DynamicSummer { | ||
// 1% |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's up here?
LGTM pending @johnynek 's comments |
Move the key type to be an Int rather than a Long
hh.containsKey(t) | ||
} | ||
|
||
private[this] final def wrapTraversableOnce(t: Iterator[T]): (ListBuffer[T], Iterator[T]) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is really filtering an Iterator and all the ones that pass wind up in the result iterator, all the ones that fail the filter wind up in the ListBuffer, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we rename (where is TraversableOnce in this picture)? And can we get more descriptive name and comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually why not work on TraversableOnce?
def filter(to: TraversableOnce[T])(pred: T => Boolean): (ListBuffer[T], TraversableOnce[T]) = {
val failed = ListBuffer[T]()
val passed = new TraversableOnce[T] {
def foreach(fn: T => Unit): Unit = {
to.foreach { t => if(pred(t)) fn(t) else failed += t }
}
}
(failed, passed)
}
Then we don't need to do the toIterator conversion when we use this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TraversableOnce has a much wider set of functions required:
method toTraversable in trait TraversableOnce of type => Traversable[T] is not defined
method copyToArray in trait TraversableOnce of type [B >: T](xs: Array[B], start: Int, len: Int)Unit is not defined
method find in trait TraversableOnce of type (p: T => Boolean)Option[T] is not defined
method exists in trait TraversableOnce of type (p: T => Boolean)Boolean is not defined
method forall in trait TraversableOnce of type (p: T => Boolean)Boolean is not defined
method seq in trait TraversableOnce of type => scala.collection.TraversableOnce[T] is not defined
method hasDefiniteSize in trait TraversableOnce of type => Boolean is not defined
method isEmpty in trait TraversableOnce of type => Boolean is not defined
iterators have a much simpler interface.
} | ||
} | ||
|
||
object DynamicSummer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we rename this: HeavyHittersCachingSummer
? I think Dynamic is not descriptive enough.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any comment on this particular request?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I actually just missed this comment entirely. Rename sounds good. will do that now
Can we merge with develop and get the tests green? This could be a big win for summingbird (or scalding). |
Conflicts: algebird-util/src/main/scala/com/twitter/algebird/util/summer/AsyncSummer.scala
Merged/tested and green |
Dynamic Summer, may use heuristics to decide not to keep a tuple in a buffer for aggregation.
No description provided.