
Exponential Histogram implementation #568

Merged: 18 commits into develop on Nov 14, 2016

Conversation

@sritchie (Collaborator) commented Oct 29, 2016:

This PR implements the exponential histogram, or DGIM, algorithm from this paper: http://ilpubs.stanford.edu:8090/504/1/2001-34.pdf

The code for ExpHist and ExpHist.Canonical has a good description of what's going on and how the algorithm works. The supplied tut-based documentation describes the l-canonical representation procedure.
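To make the API concrete, here is a minimal usage sketch based on the signatures visible in the diff hunks below (Config(epsilon, windowSize), ExpHist.from, add, guess); the import path and the exact location of Config are assumptions and may differ from the merged code:

import com.twitter.algebird.ExpHist

// roughly 1% relative error over a sliding window of 100 time ticks
val conf = ExpHist.Config(epsilon = 0.01, windowSize = 100L)

val h = ExpHist.from(0L, 0L, conf) // empty histogram whose timestamp is 0
  .add(5L, 10L)                    // 5 items seen at timestamp 10
  .add(3L, 50L)                    // 3 more items at timestamp 50

h.guess // approximate count of the items inside the current window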

Next Steps

  • Add a Monoid implementation based on the algorithm presented here http://megaslides.com/doc/1974/ecm-sketches
  • Wrap this implementation with a queue that buffers N items before pushing them all into the ExpHist, and update the relevant error calculations.
  • Get some benchmarks going. I'm going to propose that we do this in the next PR.

cc @non @johnynek

@sritchie changed the title from "[WIP] Exponential Histogram implementation" to "Exponential Histogram implementation" on Nov 1, 2016
* of 2 matches `s`'s l-canonical representation (for the supplied
* l).
*/
def bucketsFromLong(s: Long, l: Int): Vector[Long] = {
@sritchie (Collaborator, Author):
@non @johnynek would love some advice on how to efficiently share the calculations going on here and in fromLong, without creating an intermediate return tuple for the values that both functions need.

That said... bucketsFromLong is the only thing we're actually using in the implementation. I could just rebuild fromLong from the results of bucketsFromLong.

@sritchie (Collaborator, Author):

this is exactly toBuckets(fromLong(s, k)) btw

Review comment:

can you add that comment: bucketsFromLong(s, l) == toBuckets(fromLong(s, k))

This is a different representation, right? Can we have a different AnyVal wrapper to distinguish?

@sritchie (Collaborator, Author):

For sure, I'll add some AnyVal wrappers now.

total = total + delta)
}

def oldestBucketSize: Long = if (total == 0) 0L else buckets.last.size
@sritchie (Collaborator, Author):

I wonder if it'd be a good idea to skip the zero checking by adding a trait ExpHist and a case object EmptyExpHist.

* window of size `windowSize`.
*/
case class Config(epsilon: Double, windowSize: Long) {
val k: Int = math.ceil(1 / epsilon).toInt
@sritchie (Collaborator, Author):

we never really need k, but it shows up in the paper.

val (b @ Bucket(count, _)) +: tail = input
(toDrop - count) match {
case 0 => tail
case x if x < 0 => b.copy(size = -x) +: tail
@sritchie (Collaborator, Author):

This pushing back into the vector is the only reason I'm not able to use an iterator. I wonder if we can implement this better with some carry Option. Thoughts?
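For reference, a rough sketch of the carry idea (not the merged code): walk the buckets with a plain loop, threading the count still to be dropped, and re-emit a partially drained bucket at most once. Bucket here stands in for the case class used in this PR.

case class Bucket(size: Long, timestamp: Long) // stand-in for the PR's Bucket

// Drain `toDrop` total count from the front of `input`, mirroring the
// pattern match above.
def dropCounts(toDrop: Long, input: Vector[Bucket]): Vector[Bucket] = {
  var remaining = toDrop
  val out = Vector.newBuilder[Bucket]
  input.foreach { b =>
    if (remaining <= 0L) out += b              // nothing left to drop
    else if (b.size > remaining) {             // partially drain this bucket
      out += b.copy(size = b.size - remaining)
      remaining = 0L
    } else remaining -= b.size                 // drop the whole bucket
  }
  out.result()
}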

if (delta == 0)
step(timestamp)
else {
addAllWithoutStep(sorted, delta).step(timestamp)
@sritchie (Collaborator, Author):

this was a little subtle. We have to step AFTER adding because the range of timestamps of the new items we're pushing in might be larger than windowSize.

def add(delta: Long, timestamp: Long): ExpHist = {
val self = step(timestamp)
if (delta == 0) self
else self.addAllWithoutStep(Vector(Bucket(delta, timestamp)), delta)
@sritchie (Collaborator, Author) commented Nov 1, 2016:

here we can step BEFORE, because we know that there's only a single timestamp coming in (vs addAll, see my comment below)

@oscar-stripe left a comment:

Can we add the benchmarks before committing? We don't need to optimize them, but at least add them.

Thanks for working so much on this. It could be really nice if we can figure out how to control the growth of error in the associative context (the free tricks, etc.).

def guess: Double =
if (total == 0) 0.0
else (total - (oldestBucketSize - 1) / 2.0)

Review comment:

can you add an Approximate[Long] return value? Those compose somewhat.

@sritchie (Collaborator, Author):

boom, will do

@sritchie (Collaborator, Author):

don't we want Approximate[Double], since we want the estimate to fall in the middle of the range?

@sritchie (Collaborator, Author):

let's talk about this one before I add it, so I can add appropriate tests as well.

@sritchie (Collaborator, Author):

okay, added!
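For the record, a hedged sketch of what such a method might look like (the merged name and signature may differ): in DGIM the true count lies between total - oldestBucketSize + 1 and total, so those bounds hold with probability 1, and guess above is exactly their midpoint.

def approximateSum: Approximate[Long] =
  if (total == 0) Approximate(0L, 0L, 0L, 1.0)
  else {
    val lower = total - oldestBucketSize + 1
    Approximate(lower, (lower + total) / 2, total, 1.0)
  }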

@inline private[this] def floorPowerOfTwo(x: Long): Int =
JLong.numberOfTrailingZeros(JLong.highestOneBit(x))

@inline private[this] def modPow2(i: Int, exp2: Int): Int = i & ((1 << exp2) - 1)

Review comment:

what about the inlining bugs you found? Are we sure none of these exhibits them? Downstream folks could still hit them if they compile with -optimize.
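Worked by hand (not part of the PR), to show what the two helpers compute:

floorPowerOfTwo(12L) // == 3: highestOneBit(12) == 8, whose trailing-zero count is 3, i.e. floor(log2(12))
modPow2(13, 3)       // == 5: 13 & ((1 << 3) - 1) == 13 & 7, i.e. 13 % 8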

* - ret(i) for all i < j == l or l + 1
* - ret(j) < l + 1
*/
def fromLong(s: Long, l: Int): Vector[Int] = {

Review comment:

no curlies...

@sritchie (Collaborator, Author):

removed

*
* the "l" means that
*
* - ret(i) for all i < j == l or l + 1

Review comment:

what is ret? The returned array? Can we add a comment saying so?

@sritchie (Collaborator, Author):

fixed up

* (i = vector index, j = index of last entry)
*
* returns a vector of the the coefficients of s^i in the
* l-canonical representation of s.

Review comment:

this confuses me. If you are returning the coefficients of s^i in the i'th position of the vector, then why isn't the vector infinite? We can take infinite powers?

@sritchie (Collaborator, Author):

huge typo! should have been 2^i. Fixed up the docs too.
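A worked example, derived from the constraints quoted in the doc above rather than taken from the PR: with l = 1, every entry but the last must be 1 or 2 and the last entry must be 1, so the only valid representation of s = 7 is Vector(1, 1, 1), since 1 * (1 << 0) + 1 * (1 << 1) + 1 * (1 << 2) == 7.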

* - ret(i) for all i < j == l or l + 1
* - ret(j) < l + 1
*/
def fromLong(s: Long, l: Int): Vector[Int] = {

Review comment:

what about final case class CanonicalRepresentation(toVector: Vector[Int]) extends AnyVal

Also, maybe add these specialized types under object ExpHist.

@sritchie (Collaborator, Author):

I like it. I moved the whole object inside ExpHist - will add these as well.

* @param rep l-canonical representation of some number s for some l
* @return The original s
*/
def toLong(rep: Vector[Int]): Long =

Review comment:

can this be a method on an AnyVal class?

@sritchie (Collaborator, Author):

boom, moved that and toBuckets
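A hedged sketch of that wrapper (names are illustrative and may differ from the merged code); per the earlier suggestion it would live under object ExpHist:

final case class Canonical(rep: Vector[Int]) extends AnyVal {
  // Reassemble the original s: the sum of rep(i) * 2^i.
  def toLong: Long =
    rep.iterator.zipWithIndex.map { case (i, exp) => i.toLong << exp }.sum

  // Expand each coefficient into that many buckets of size 2^i.
  def toBuckets: Vector[Long] =
    rep.iterator.zipWithIndex
      .flatMap { case (i, exp) => Iterator.fill(i)(1L << exp) }
      .toVector
}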


}
}

def isPowerOfTwo(i: Long): Boolean = (i & -i) == i

Review comment:

fancy! :) Hacker's Delight in the house.
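For readers who haven't seen the trick: in two's complement, i & -i isolates the lowest set bit of i, so it equals i exactly when i has a single set bit. Not from the PR, just a quick check:

isPowerOfTwo(8L) // (8 & -8) == 8 -> true
isPowerOfTwo(6L) // (6 & -6) == 2 -> false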

@codecov-io commented Nov 1, 2016:

Current coverage is 64.32% (diff: 96.62%)

Merging #568 into develop will increase coverage by 0.76%

@@            develop       #568   diff @@
==========================================
  Files           110        111     +1   
  Lines          4435       4524    +89   
  Methods        4041       4111    +70   
  Messages          0          0          
  Branches        355        374    +19   
==========================================
+ Hits           2819       2910    +91   
+ Misses         1616       1614     -2   
  Partials          0          0          

Last update 2c5aa7a...84b299b

@sritchie force-pushed the sritchie/exponential_histogram branch from 1679c94 to 04bcf47 on November 3, 2016 at 18:33
@sritchie (Collaborator, Author) commented Nov 4, 2016:

Btw, the test failure in the "update paper link" push is fixed by the sbt-mima-plugin bump to 0.1.11 in #556.

@sritchie force-pushed the sritchie/exponential_histogram branch from f50f1ae to 45f8c5a on November 8, 2016 at 18:57
@johnynek (Collaborator) left a comment:

A couple of comments; let me know what you think. I think it looks great.

Do you want to add the monoid or wait on that? It is a bummer that the monoid increases the error.

def addAll(unsorted: Vector[Bucket]): ExpHist =
if (unsorted.isEmpty) this
else {
val sorted = unsorted.sorted(Ordering[Bucket].reverse)
@johnynek (Collaborator):

can we move this inside the else branch below? We can compute delta before sorting and, if it is zero, just step to the max time.

@sritchie (Collaborator, Author):

done
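For reference, a hedged sketch of what the reworked addAll might look like (assuming the Bucket, step, and addAllWithoutStep members shown in the other hunks; the merged code may differ):

def addAll(unsorted: Vector[Bucket]): ExpHist =
  if (unsorted.isEmpty) this
  else {
    val delta = unsorted.iterator.map(_.size).sum
    val maxTimestamp = unsorted.iterator.map(_.timestamp).max
    if (delta == 0) step(maxTimestamp)
    else {
      val sorted = unsorted.sorted(Ordering[Bucket].reverse)
      addAllWithoutStep(sorted, delta).step(maxTimestamp)
    }
  }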

* the returned ExpHist will have the same timestamp, equal to
* `ts`.
*/
def from(i: Long, ts: Long, conf: Config): ExpHist = {
@johnynek (Collaborator):

I wonder if we should make a case class Timestamp(toLong: Long) extends AnyVal to prevent value/timestamp confusion. Seems really easy to get those wrong. What do you think?

@sritchie (Collaborator, Author):

yeah, let me thread it through and see what it looks like

else {
rep.iterator.zipWithIndex
.map { case (i, exp) => i.toLong << exp }
.reduce(_ + _)
@johnynek (Collaborator):

there is going to be a fair amount of boxing cost here. Actually calling Monoid.sum will avoid some of that, since Monoid is specialized on Long.

@sritchie (Collaborator, Author):

how about

Monoid.sum {
        rep.iterator.zipWithIndex
          .map { case (i, exp) => i.toLong << exp }
      }

@johnynek (Collaborator):

👍

* @return vector of powers of 2 (where ret.sum == the original s)
*/
def toBuckets: Vector[Long] =
rep.zipWithIndex.flatMap { case (i, exp) => List.fill(i)(1L << exp) }
@johnynek (Collaborator):

what about:

rep.iterator
  .zipWithIndex
  .flatMap { case (i, exp) => Iterator.fill(i)(1L << exp) }
  .toVector

@sritchie (Collaborator, Author):

done

* into this exponential histogram instance.
*/
def fold: Fold[Bucket, ExpHist] =
Fold.foldLeft(this) {
@johnynek (Collaborator):

I wonder about using a foldMutable here, accumulating a batch of Buckets and then doing an addAll, which might be a lot faster, but we can punt on this if you like.

@sritchie (Collaborator, Author):

added a test and this impl:

def fold: Fold[Bucket, ExpHist] =
    Fold.foldMutable[Builder[Bucket, Vector[Bucket]], Bucket, ExpHist](
      { case (b, bucket) => b += bucket },
      { _ => Vector.newBuilder[Bucket] },
      { x => addAll(x.result) })

way better imo.

@sritchie (Collaborator, Author):
Okay @johnynek, addressed all comments. I think we should add the Monoid in the next round, so we can experiment with controlling the relative error. We'll probably need to track the error separately from the calculation I have here...

We might need to create an ApproximateHistogram trait that we can use for the ExpHist, the Windows implementation, the ZeroExpHist and the ExpHist that keeps a queue of items around before adding.

@sritchie (Collaborator, Author):
Amazing, the docs build failed after my change!! Fixing now.

@oscar-stripe:
So good! Love tut docs! Thanks for an excellent example for us to follow, @sritchie!

👍

Any concerns, @isnotinvain?

@sritchie (Collaborator, Author):
Thanks for the review, @johnynek! @isnotinvain, would love a final sign off.

@johnynek (Collaborator):
I think the rules say it is fine to merge now.

@johnynek merged commit 9dd8f68 into develop on Nov 14, 2016
@johnynek deleted the sritchie/exponential_histogram branch on November 14, 2016 at 18:31