Stack safety and constant factor improvements of Catenable #784

Merged

Changes from 2 commits
36 changes: 36 additions & 0 deletions benchmark/src/main/scala/fs2/benchmark/CatenableBenchmark.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package fs2
package benchmark

import org.openjdk.jmh.annotations.{Benchmark, State, Scope}

import fs2.util.Catenable

@State(Scope.Thread)
class CatenableBenchmark extends BenchmarkUtils {

val smallCatenable = Catenable(1, 2, 3, 4, 5)
val smallVector = Vector(1, 2, 3, 4, 5)

val largeCatenable = Catenable.fromSeq(0 to 1000000)
val largeVector = (0 to 1000000).toVector

@Benchmark def mapSmallCatenable = smallCatenable.map(_ + 1)
@Benchmark def mapSmallVector = smallVector.map(_ + 1)
@Benchmark def mapLargeCatenable = largeCatenable.map(_ + 1)
@Benchmark def mapLargeVector = largeVector.map(_ + 1)

@Benchmark def foldLeftSmallCatenable = smallCatenable.foldLeft(0)(_ + _)
@Benchmark def foldLeftSmallVector = smallVector.foldLeft(0)(_ + _)
@Benchmark def foldLeftLargeCatenable = largeCatenable.foldLeft(0)(_ + _)
@Benchmark def foldLeftLargeVector = largeVector.foldLeft(0)(_ + _)

@Benchmark def consSmallCatenable = 0 +: smallCatenable
@Benchmark def consSmallVector = 0 +: smallVector
@Benchmark def consLargeCatenable = 0 +: largeCatenable
@Benchmark def consLargeVector = 0 +: largeVector

@Benchmark def createTinyCatenable = Catenable(1)
@Benchmark def createTinyVector = Vector(1)
@Benchmark def createSmallCatenable = Catenable(1, 2, 3, 4, 5)
Contributor:

Not saying do this, but if you added a Many(seq: Seq[A]) constructor to Catenable, it would obviously speed up fromSeq since it would just be a no-op. However it would probably slow down other operations and would make the implementation more complicated.
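For illustration, here is a minimal sketch of what such a constructor could look like. The names (`Cat`, `CEmpty`, `CSingle`, `CAppend`, `CMany`) are a simplified stand-in ADT, not the actual fs2 types, and the recursion in `uncons` is not stack-safe; the point is only to show where the extra complexity lands:

```scala
// Hypothetical Many(seq) constructor: fromSeq becomes a no-op wrap,
// but every traversal now needs an extra case.
sealed trait Cat[+A]
case object CEmpty extends Cat[Nothing]
final case class CSingle[A](a: A) extends Cat[A]
final case class CAppend[A](left: Cat[A], right: Cat[A]) extends Cat[A]
final case class CMany[A](seq: Seq[A]) extends Cat[A] // wraps the Seq directly

object Cat {
  // fromSeq is now O(1): just wrap the sequence
  def fromSeq[A](s: Seq[A]): Cat[A] = if (s.isEmpty) CEmpty else CMany(s)

  // ...but uncons (and map, foldLeft, etc.) must handle the new case;
  // note s.tail may itself be O(n) for some Seq implementations
  def uncons[A](c: Cat[A]): Option[(A, Cat[A])] = c match {
    case CEmpty     => None
    case CSingle(a) => Some((a, CEmpty))
    case CMany(s)   => Some((s.head, fromSeq(s.tail)))
    case CAppend(l, r) => uncons(l) match {
      case Some((h, t)) => Some((h, CAppend(t, r)))
      case None         => uncons(r)
    }
  }
}
```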

Member Author:

I thought the same thing actually. Before I went down that path, I wanted to look into the paper I linked above.

Contributor:

IMO that paper is overkill for this use case (amortized is totally fine) and I'm sure the constant factors will be worse. If you think about it, the implementation of append for Catenable is literally the fastest it could possibly be since it does absolutely nothing. And the reassociating is also going to be very hard to beat in terms of speed since it can be done with a mutable data structure as I suggested above, rather than some fancy deamortized functional version of a similar algorithm.

Paper might still be worth reading just for fun though, don't want to discourage that. :) Also maybe I am totally wrong in my intuition here. :)

@mpilquist I want to look into adding this data structure into Scala with 2.13, if that's alright with you. @pchiusano I agree that append is as fast as it could be, but I wonder about adding Many or looking at the balanced-tree creation approach in scalaz/scalaz#1022 which may make reassociating more performant.

I am not an expert on the subject, but Ed Kmett said that amortized constant time is not enough for reflection without remorse, though I am not sure why. Either way it's much more complicated, but I would be very interested in a Scala implementation of this paper regardless, for real-time applications.
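As a hedged sketch of the balanced-tree creation idea mentioned above (adapted to a simplified stand-in ADT; the names are illustrative and this is not the scalaz code itself), building a balanced `Append` tree keeps traversal depth at O(log n) instead of O(n) for a fully nested chain:

```scala
// Balanced construction: split the input in half and recurse, so the
// resulting Append tree has depth O(log n).
sealed trait Cat[+A]
case object CEmpty extends Cat[Nothing]
final case class CSingle[A](a: A) extends Cat[A]
final case class CAppend[A](left: Cat[A], right: Cat[A]) extends Cat[A]

def fromSeqBalanced[A](s: IndexedSeq[A]): Cat[A] =
  s.length match {
    case 0 => CEmpty
    case 1 => CSingle(s(0))
    case n =>
      val (l, r) = s.splitAt(n / 2)
      CAppend(fromSeqBalanced(l), fromSeqBalanced(r))
  }
```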

Member:

@pchiusano @edmundnoble I've done a lot of work with amortized factors in functional data structures (a few years ago). The problem with amortization arguments in all contexts, but especially for functional data structures, is that they often require enormously high values of n to come into play. Additionally, amortization arguments often ignore constant factors (in fact, that's literally what the argument is doing, in an inductive fashion), which is not a fair argument to make when your constant factors are extremely high.

There are two excellent and classic examples of this problem: BankersQueue and FingerTree. BankersQueue is basically just a lazy version of the eager "front/back list queue" thing that everyone's tried at least once. And there is a proof that it is more efficient than the eager version… for some arbitrarily large value of n. It turns out that, if you benchmark it, the eager version is almost twice as fast (in Scala) for any queue size that fits into a 32-bit address space, which is sort of insane. FingerTree is a similar, even more dramatic example. FingerTree is an amortized-constant double-ended queue, which is something the eager banker's queue can't provide, so in a very real sense it is offering (and proving) better asymptotics, not just better amortization costs. But on the JVM, and for queue sizes less than the billions, it is massively, hilariously slower than the alternatives.

So we have to be careful about this stuff. Amortization arguments discard, by definition, performance factors that are relevant and even dominant in practical workloads. And this is especially true on the JVM and with a functional style, where amortization often relies on thunking and other pipeline- and PIC-defeating constructs that deoptimize code (by a constant factor) in ways below the level of algorithmic analysis.
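For reference, the eager "front/back list" queue being compared against can be sketched as follows (a minimal illustration, not any particular library's implementation):

```scala
// Eager two-list queue: enqueue conses onto back, dequeue pops from
// front, reversing back into front only when front runs empty. Each
// element is reversed at most once, giving amortized O(1) operations
// with no laziness involved.
final case class EagerQueue[A](front: List[A], back: List[A]) {
  def enqueue(a: A): EagerQueue[A] = copy(back = a :: back)
  def dequeue: Option[(A, EagerQueue[A])] = front match {
    case h :: t => Some((h, EagerQueue(t, back)))
    case Nil => back.reverse match {
      case h :: t => Some((h, EagerQueue(t, Nil)))
      case Nil    => None
    }
  }
}
object EagerQueue { def empty[A]: EagerQueue[A] = EagerQueue(Nil, Nil) }
```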

@Benchmark def createSmallVector = Vector(1, 2, 3, 4, 5)
}
12 changes: 6 additions & 6 deletions core/shared/src/main/scala/fs2/StreamCore.scala
Expand Up @@ -247,7 +247,7 @@ private[fs2] object StreamCore {
type O0 = O
def push[G[_],O2](u: NT[F,G], stack: Stack[G,O,O2]) =
Scope.pure { stack pushSegments (NT.convert(s)(u)) }
def render = "Segments(" + s.toStream.toList.mkString(", ") + ")"
def render = "Segments(" + s.toList.mkString(", ") + ")"
}

def scope[F[_],O](s: StreamCore[F,O]): StreamCore[F,O] = StreamCore.evalScope(Scope.snapshot).flatMap { tokens =>
Expand Down Expand Up @@ -374,11 +374,11 @@ private[fs2] object StreamCore {
private final case class Segments[F[_],O](segments: Catenable[Segment[F,O]]) extends Stack[F,O,O] {

def render = {
val segmentsList = segments.toStream.toList
val segmentsList = segments.toList
List(s"Segments (${segmentsList.size})\n"+segmentsList.zipWithIndex.map { case (s, idx) => s" s$idx: $s" }.mkString("\n"))
}
def pushNonEmptySegment(s: Segment[F,O]): Stack[F,O,O] =
Segments(s :: segments)
Segments(s +: segments)

def pushNonEmptySegments(s: Catenable[Segment[F,O]]): Stack[F,O,O] =
Segments(s ++ segments)
Expand Down Expand Up @@ -410,7 +410,7 @@ private[fs2] object StreamCore {
def render = "Map" :: stack.render

def pushNonEmptySegment(s: Segment[F,O1]): Stack[F,O1,O2] =
Map(s :: segments, f, stack)
Map(s +: segments, f, stack)

def pushNonEmptySegments(s: Catenable[Segment[F,O1]]): Stack[F,O1,O2] =
Map(s ++ segments, f, stack)
Expand Down Expand Up @@ -452,7 +452,7 @@ private[fs2] object StreamCore {
def render = "Bind" :: stack.render

def pushNonEmptySegment(s: Segment[F,O1]): Stack[F,O1,O2] =
Bind(s :: segments, f, stack)
Bind(s +: segments, f, stack)

def pushNonEmptySegments(s: Catenable[Segment[F,O1]]): Stack[F,O1,O2] =
Bind(s ++ segments, f, stack)
Expand All @@ -466,7 +466,7 @@ private[fs2] object StreamCore {
case None => stack.pushBind(f).pushSegments(segments).step
case Some((hd,tl)) =>
val segs2 =
(if (tl.isEmpty) segments else segments.push(Segment.Emit(tl))).map(_.interpretBind(f))
(if (tl.isEmpty) segments else segments.cons(Segment.Emit(tl))).map(_.interpretBind(f))
val stack2 = stack.pushSegments(segs2)
(try stack2.pushAppend(f(hd)) catch { case NonFatal(t) => stack2.pushFail(t) }).step
}
Expand Down
136 changes: 100 additions & 36 deletions core/shared/src/main/scala/fs2/util/Catenable.scala
Expand Up @@ -5,70 +5,134 @@ import Catenable._
/**
* Trivial catenable sequence. Supports O(1) append, and (amortized)
* O(1) `uncons`, such that walking the sequence via N successive `uncons`
* steps takes O(N). Like a difference list, conversion to a `Stream[A]`
* steps takes O(N). Like a difference list, conversion to a `Seq[A]`
* takes linear time, regardless of how the sequence is built up.
*/
sealed abstract class Catenable[+A] {

/** Returns the head and tail of this catenable if non empty, none otherwise. Amortized O(1). */
def uncons: Option[(A, Catenable[A])] = {
@annotation.tailrec
def go(c: Catenable[A], rights: List[Catenable[A]]): Option[(A,Catenable[A])] = c match {
case Empty => rights match {
case Nil => None
case c :: rights => go(c, rights)
final def uncons: Option[(A, Catenable[A])] = {
var c: Catenable[A] = this
var rights: List[Catenable[A]] = Nil
Contributor:

Did you look into using an ArrayList or something similar here? Advantage would be less allocation, and no need to reverse it to do the reduce right, since you have random access.

If you're going to go imperative, go big. :)

Member Author:

I didn't benchmark it but I figured it would be significantly slower since whatever we use here needs O(1) cons/uncons too.

Contributor:

Here's what I was thinking -

  • On each step through the loop, you snoc onto the end of the ArrayList. No consing.
  • Now you get to the end, and do a foldLeft over your ArrayList. Notice that the first element of the ArrayList is the first element you snoc'd (whereas the first element of the List you are building here is the last element you pushed); that is, the ArrayList is already in exactly the right order to implement the reduceRight operation via a foldLeft or reduceLeft! (Assuming I haven't mixed this up...)
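A sketch of this suggestion, using `scala.collection.mutable.ArrayBuffer` as the ArrayList stand-in and a simplified ADT (names are illustrative, not the actual fs2 types):

```scala
import scala.collection.mutable.ArrayBuffer

sealed trait Cat[+A]
case object CEmpty extends Cat[Nothing]
final case class CSingle[A](a: A) extends Cat[A]
final case class CAppend[A](left: Cat[A], right: Cat[A]) extends Cat[A]

def uncons[A](self: Cat[A]): Option[(A, Cat[A])] = {
  val rights = new ArrayBuffer[Cat[A]] // right branches, outermost first
  var c = self
  while (true) {
    c match {
      case CEmpty =>
        if (rights.isEmpty) return None
        c = rights.last                 // pop most recently pushed branch
        rights.remove(rights.size - 1)  // O(1): decrements internal size
      case CSingle(a) =>
        // Walk the buffer backwards by index: same result as
        // rights.reverse.reduceLeft((x, y) => CAppend(y, x)),
        // but with random access and no reverse allocation.
        var i = rights.size - 1
        var next: Cat[A] = if (i >= 0) rights(i) else CEmpty
        i -= 1
        while (i >= 0) { next = CAppend(next, rights(i)); i -= 1 }
        return Some((a, next))
      case CAppend(l, r) =>
        rights += r                     // snoc onto the end, no consing
        c = l
    }
  }
  sys.error("unreachable")
}
```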

var result: Option[(A, Catenable[A])] = null
while (result eq null) {
c match {
case Empty =>
rights match {
Contributor:

With the mutable approach, this is going to become an isEmpty check, then a call to .last, and then a popping off of the last element of the ArrayList (just by mutably decrementing the internal max index).

case Nil => result = None
case h :: t => c = h; rights = t
}
case Single(a) =>
val next = if (rights.isEmpty) empty else rights.reverse.reduceLeft((x, y) => Append(y,x))
Contributor:

Basically, you could avoid this reverse call.

result = Some(a -> next)
case Append(l, r) => c = l; rights = r :: rights
}
case Single(a) => Some(a -> (if (rights.isEmpty) empty else rights.reduceRight(Append(_,_))))
case Append(l,r) => go(l, r :: rights)
}
go(this, List())
result
}

def isEmpty: Boolean = this match {
case Empty => true
case _ => false // okay since `append` smart constructor guarantees each branch non-empty
}
/** Returns true if there are no elements in this collection. */
def isEmpty: Boolean

/** Concatenates this with `c` in O(1) runtime. */
def ++[A2>:A](c: Catenable[A2])(implicit T: RealSupertype[A,A2]): Catenable[A2] =
final def ++[A2>:A](c: Catenable[A2])(implicit T: RealSupertype[A,A2]): Catenable[A2] =
append(this, c)

/** Alias for [[push]]. */
def ::[A2>:A](a: A2)(implicit T: RealSupertype[A,A2]): Catenable[A2] =
this.push(a)

/** Returns a new catenable consisting of `a` followed by this. O(1) runtime. */
def push[A2>:A](a: A2)(implicit T: RealSupertype[A,A2]): Catenable[A2] =
final def cons[A2>:A](a: A2)(implicit T: RealSupertype[A,A2]): Catenable[A2] =
append(single(a), this)

def toStream: Stream[A] = uncons match {
case None => Stream.empty
case Some((hd, tl)) => hd #:: tl.toStream
/** Alias for [[cons]]. */
final def +:[A2>:A](a: A2)(implicit T: RealSupertype[A,A2]): Catenable[A2] =
cons(a)

/** Returns a new catenable consisting of this followed by `a`. O(1) runtime. */
final def snoc[A2>:A](a: A2)(implicit T: RealSupertype[A,A2]): Catenable[A2] =
append(this, single(a))

/** Alias for [[snoc]]. */
final def :+[A2>:A](a: A2)(implicit T: RealSupertype[A,A2]): Catenable[A2] =
snoc(a)

/** Applies the supplied function to each element and returns a new catenable. */
final def map[B](f: A => B): Catenable[B] =
foldLeft(empty: Catenable[B])((acc, a) => acc :+ f(a))

/** Folds over the elements from left to right using the supplied initial value and function. */
final def foldLeft[B](z: B)(f: (B, A) => B): B = {
var result = z
foreach(a => result = f(result, a))
result
}

/** Applies the supplied function to each element, left to right. */
final def foreach(f: A => Unit): Unit = {
var c: Catenable[A] = this
var rights: List[Catenable[A]] = Nil
while (c ne null) {
c match {
case Empty =>
rights match {
case Nil => c = null
case h :: t => c = h; rights = t
}
case Single(a) =>
f(a)
c = if (rights.isEmpty) Empty else rights.reverse.reduceLeft((x, y) => Append(y,x))
rights = Nil
case Append(l, r) => c = l; rights = r :: rights
}
}
}

def map[B](f: A => B): Catenable[B] = Catenable.fromSeq(toStream.map(f))
/** Converts to a list. */
final def toList: List[A] = {
val builder = List.newBuilder[A]
foreach { a => builder += a; () }
builder.result
}

override def toString = "Catenable(..)"
}

object Catenable {
private case object Empty extends Catenable[Nothing]
private final case class Single[A](a: A) extends Catenable[A]
private final case class Append[A](left: Catenable[A], right: Catenable[A]) extends Catenable[A]
private final case object Empty extends Catenable[Nothing] {
def isEmpty: Boolean = true
}
private final case class Single[A](a: A) extends Catenable[A] {
def isEmpty: Boolean = false
}
private final case class Append[A](left: Catenable[A], right: Catenable[A]) extends Catenable[A] {
def isEmpty: Boolean = false // b/c `append` constructor doesn't allow either branch to be empty
}

/** Empty catenable. */
val empty: Catenable[Nothing] = Empty

/** Creates a catenable of 1 element. */
def single[A](a: A): Catenable[A] = Single(a)

def append[A](c: Catenable[A], c2: Catenable[A]): Catenable[A] = c match {
case Empty => c2
case _ => c2 match {
case Empty => c
case _ => Append(c,c2)
}
}
/** Appends two catenables. */
def append[A](c: Catenable[A], c2: Catenable[A]): Catenable[A] =
if (c.isEmpty) c2
else if (c2.isEmpty) c
else Append(c, c2)

/** Creates a catenable from the specified sequence. */
def fromSeq[A](s: Seq[A]): Catenable[A] =
if (s.isEmpty) empty
else s.view.map(single).reduceRight(Append(_,_))
else s.view.reverse.map(single).reduceLeft((x, y) => Append(y, x))

def apply[A](as: A*): Catenable[A] = fromSeq(as)
/** Creates a catenable from the specified elements. */
def apply[A](as: A*): Catenable[A] = {
// Assumption: `as` is small enough that calling size doesn't outweigh benefit of calling empty/single
as.size match {
case 0 => empty
case 1 => single(as.head)
case n if n <= 1024 =>
Contributor:

isn't in practice an A* passed as something with random access? (Like an ArraySeq?) Maybe exploit this...

The 1024 limit might blow some stacks. Makes me a little nervous.

Member Author:

Yeah, scala.collection.mutable.WrappedArray. Maybe 1024 is too high -- we could limit to 16 or 32 and cover 99.9% of cases.

Contributor (@pchiusano, Dec 2, 2016):

I'd imagine that exploiting the WrappedArray is going to be faster even for small n, since it's a loop vs a bunch of function calls that build and then tear down the call stack...
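A sketch of what exploiting indexed access could look like (simplified stand-in ADT; not the actual fs2 code): loop backwards over indices and build the right-nested chain directly, with no view, no reverse, and no intermediate collection.

```scala
sealed trait Cat[+A]
case object CEmpty extends Cat[Nothing]
final case class CSingle[A](a: A) extends Cat[A]
final case class CAppend[A](left: Cat[A], right: Cat[A]) extends Cat[A]

// Build the same right-nested chain as
// as.view.map(CSingle(_)).reduceRight(CAppend(_, _)),
// using a plain index loop over the (random-access) input.
def fromIndexed[A](as: IndexedSeq[A]): Cat[A] =
  if (as.isEmpty) CEmpty
  else {
    var acc: Cat[A] = CSingle(as(as.length - 1))
    var i = as.length - 2
    while (i >= 0) { acc = CAppend(CSingle(as(i)), acc); i -= 1 }
    acc
  }
```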

// nb: avoid cost of reversing input if collection is small
as.view.map(single).reduceRight(Append(_, _))
case n => as.view.reverse.map(single).reduceLeft((x, y) => Append(y, x))
}
}
}