Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1078] Streaming with Monix's Observable #1112

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ lazy val iterateeVersion = "0.18.0"
lazy val refinedVersion = "0.9.5"
lazy val catsEffectVersion = "1.3.0"
lazy val fs2Version = "1.0.4"
lazy val monixVersion = "3.0.0-RC2"

lazy val compilerOptions = Seq(
"-deprecation",
Expand Down Expand Up @@ -236,7 +237,7 @@ lazy val finch = project.in(file("."))
"io.circe" %% "circe-generic" % circeVersion
))
.aggregate(
core, fs2, iteratee, generic, argonaut, circe, benchmarks, test, jsonTest, examples, refined
core, fs2, iteratee, monix, generic, argonaut, circe, benchmarks, test, jsonTest, examples, refined
)
.dependsOn(core, iteratee, generic, circe)

Expand Down Expand Up @@ -264,6 +265,16 @@ lazy val fs2 = project
)
.dependsOn(core % "compile->compile;test->test")

lazy val monix = project
.settings(moduleName := "finchx-monix")
.settings(allSettings)
.settings(
libraryDependencies ++= Seq(
"io.monix" %% "monix-reactive" % monixVersion
)
)
.dependsOn(core % "compile->compile;test->test")

lazy val generic = project
.settings(moduleName := "finchx-generic")
.settings(allSettings)
Expand Down Expand Up @@ -306,10 +317,11 @@ lazy val circe = project
"io.circe" %% "circe-iteratee" % circeIterateeVersion,
"io.circe" %% "circe-fs2" % circeFs2Version,
"io.circe" %% "circe-jawn" % circeVersion,
"io.monix" %% "monix-circe" % "0.0.1",
"io.circe" %% "circe-generic" % circeVersion % "test"
)
)
.dependsOn(core, jsonTest % "test")
.dependsOn(core, monix, jsonTest % "test")

lazy val refined = project
.settings(moduleName := "finchx-refined")
Expand Down Expand Up @@ -357,7 +369,7 @@ lazy val examples = project
"com.twitter" %% "twitter-server" % twitterVersion
)
)
.dependsOn(core, circe, iteratee)
.dependsOn(core, circe, iteratee, monix)

lazy val benchmarks = project
.settings(moduleName := "finchx-benchmarks")
Expand Down
16 changes: 16 additions & 0 deletions circe/src/main/scala/io/finch/circe/Decoders.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.circe.iteratee
import io.circe.jawn._
import io.finch.{Application, Decode, DecodeStream}
import io.finch.internal.HttpContent
import io.finch.monix.ObservableF
import io.iteratee.Enumerator
import java.nio.charset.StandardCharsets

Expand Down Expand Up @@ -53,4 +54,19 @@ trait Decoders {
}
parsed.through(fs2.decoder[F, A])
})

implicit def monixCirce[F[_], A: Decoder]: DecodeStream.Json[ObservableF, F, A] =
DecodeStream.instance[ObservableF, F, A, Application.Json]((stream, cs) => {
val parsed = cs match {
case StandardCharsets.UTF_8 =>
stream
.map(_.asByteArray)
.liftByOperator(monix.circe.byteStreamParser)
case _ =>
stream
.map(_.asString(cs))
.liftByOperator(monix.circe.stringStreamParser)
}
monix.circe.decoder(parsed)
})
}
84 changes: 84 additions & 0 deletions examples/src/main/scala/io/finch/monix/Main.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package io.finch.monix

import _root_.monix.eval.{Task, TaskApp}
import _root_.monix.execution.Scheduler
import _root_.monix.reactive.Observable
import cats.effect.{ExitCode, Resource}
import cats.implicits._
import com.twitter.finagle.{Http, ListeningServer}
import com.twitter.util.Future
import io.circe.generic.auto._
import io.finch._
import io.finch.circe._
import scala.util.Random

/**
* A Finch application featuring Monix Observable-based streaming support.
* This approach is more advanced and performant then basic [[com.twitter.concurrent.AsyncStream]]
*
* There are three endpoints in this example:
*
* 1. `sumJson` - streaming request
* 2. `streamJson` - streaming response
* 3. `isPrime` - end-to-end (request - response) streaming
*
* Use the following sbt command to run the application.
*
* {{{
* $ sbt 'examples/runMain io.finch.monix.Main'
* }}}
*
* Use the following HTTPie/curl commands to test endpoints.
*
* {{{
* $ curl -X POST --header "Transfer-Encoding: chunked" -d '{"i": 40} {"i": 2}' localhost:8081/sumJson
*
* $ http --stream GET :8081/streamJson
*
* $ curl -X POST --header "Transfer-Encoding: chunked" -d '{"i": 40} {"i": 42}' localhost:8081/streamPrime
* }}}
*/
object Main extends TaskApp with EndpointModule[Task] {

override implicit def scheduler: Scheduler = super.scheduler

final case class Result(result: Int) {
def add(n: Number): Result = copy(result = result + n.i)
}

final case class Number(i: Int) {
def isPrime: IsPrime = IsPrime(!(2 :: (3 to Math.sqrt(i.toDouble).toInt by 2).toList exists (i % _ == 0)))
}

final case class IsPrime(isPrime: Boolean)

private def stream: Stream[Int] = Stream.continually(Random.nextInt())

val sumJson: Endpoint[Task, Result] = post("sumJson" :: jsonBodyStream[ObservableF, Number]) {
o: Observable[Number] =>
o.foldLeftL(Result(0))(_ add _).map(Ok)
}

val streamJson: Endpoint[Task, ObservableF[Task, Number]] = get("streamJson") {
Ok(Observable.fromIterable(stream).map(Number.apply))
}

val isPrime: Endpoint[Task, ObservableF[Task, IsPrime]] =
post("streamPrime" :: jsonBodyStream[ObservableF, Number]) { o: Observable[Number] =>
Ok(o.map(_.isPrime))
}

def serve: Task[ListeningServer] = Task(
Http.server
.withStreaming(enabled = true)
.serve(":8081", (sumJson :+: streamJson :+: isPrime).toServiceAs[Application.Json])
)

def run(args: List[String]): Task[ExitCode] = {
val server = Resource.make(serve)(s =>
Task.suspend(implicitly[ToAsync[Future, Task]].apply(s.close()))
)

server.use(_ => Task.never).as(ExitCode.Success)
}
}
154 changes: 154 additions & 0 deletions monix/src/main/scala/io/finch/monix/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package io.finch

import java.nio.charset.Charset

import _root_.monix.eval.TaskLift
import _root_.monix.reactive.Observable
import cats.effect._
import com.twitter.io.{Buf, Pipe, Reader}
import com.twitter.util.Future
import io.finch.internal.newLine
import io.finch.monix.ObservableF

package object monix extends ObservableConcurrentEffectInstances {

type ObservableF[F[_], A] = Observable[A]

implicit def aliasResponseToRealResponse[F[_], A, CT <: Application.Json](implicit
tr: ToResponse.Aux[F, ObservableF[F, A], CT]
): ToResponse.Aux[F, Observable[A], CT] = tr

implicit def observableLiftReader[F[_]](implicit
F: Effect[F],
TA: ToAsync[Future, F]
): LiftReader[ObservableF, F] =
new LiftReader[ObservableF, F] {
final def apply[A](reader: Reader[Buf], process: Buf => A): ObservableF[F, A] = {
Observable
.repeatEvalF(F.suspend(TA(reader.read())))
.takeWhile(_.isDefined)
.collect { case Some(buf) => process(buf) }
.guaranteeF(F.delay(reader.discard()))
}
}

implicit def encodeBufConcurrentEffectObservable[F[_] : ConcurrentEffect : TaskLift, CT <: String]: EncodeStream.Aux[F, ObservableF, Buf, CT] =
new EncodeConcurrentEffectObservable[F, Buf, CT] {
protected def encodeChunk(chunk: Buf, cs: Charset): Buf = chunk
}
}

trait ObservableConcurrentEffectInstances extends ObservableEffectInstances {

implicit def encodeJsonConcurrentObservable[F[_] : ConcurrentEffect : TaskLift, A](implicit
A: Encode.Json[A]
): EncodeStream.Json[F, ObservableF, A] =
new EncodeNewLineDelimitedConcurrentEffectObservable[F, A, Application.Json]

implicit def encodeSseConcurrentEffectObservable[F[_] : ConcurrentEffect : TaskLift, A](implicit
A: Encode.Aux[A, Text.EventStream]
): EncodeStream.Aux[F, ObservableF, A, Text.EventStream] =
new EncodeNewLineDelimitedConcurrentEffectObservable[F, A, Text.EventStream]

implicit def encodeTextConcurrentEffectObservable[F[_] : ConcurrentEffect : TaskLift, A](implicit
A: Encode.Text[A]
): EncodeStream.Text[F, ObservableF, A] =
new EncodeConcurrentEffectObservable[F, A, Text.Plain] {
override protected def encodeChunk(chunk: A, cs: Charset): Buf =
A(chunk, cs)
}

implicit def encodeBufEffectObservable[F[_] : Effect : TaskLift, CT <: String]: EncodeStream.Aux[F, ObservableF, Buf, CT] =
new EncodeEffectObservable[F, Buf, CT] {
protected def encodeChunk(chunk: Buf, cs: Charset): Buf = chunk
}
}

trait ObservableEffectInstances extends ObservableInstances {

implicit def encodeJsonEffectObservable[F[_] : Effect : TaskLift, A](implicit
A: Encode.Json[A]
): EncodeStream.Json[F, ObservableF, A] =
new EncodeNewLineDelimitedEffectObservable[F, A, Application.Json]

implicit def encodeSseEffectObservable[F[_] : Effect : TaskLift, A](implicit
A: Encode.Aux[A, Text.EventStream]
): EncodeStream.Aux[F, ObservableF, A, Text.EventStream] =
new EncodeNewLineDelimitedEffectObservable[F, A, Text.EventStream]

implicit def encodeTextEffectObservable[F[_] : Effect : TaskLift, A](implicit
A: Encode.Text[A]
): EncodeStream.Text[F, ObservableF, A] =
new EncodeEffectObservable[F, A, Text.Plain] {
override protected def encodeChunk(chunk: A, cs: Charset): Buf =
A(chunk, cs)
}
}

trait ObservableInstances {

protected final class EncodeNewLineDelimitedConcurrentEffectObservable[F[_] : ConcurrentEffect : TaskLift, A, CT <: String](implicit
A: Encode.Aux[A, CT]
) extends EncodeConcurrentEffectObservable[F, A, CT] {
protected def encodeChunk(chunk: A, cs: Charset): Buf =
A(chunk, cs).concat(newLine(cs))
}

protected final class EncodeNewLineDelimitedEffectObservable[F[_] : Effect : TaskLift, A, CT <: String](implicit
A: Encode.Aux[A, CT]
) extends EncodeEffectObservable[F, A, CT] {
protected def encodeChunk(chunk: A, cs: Charset): Buf =
A(chunk, cs).concat(newLine(cs))
}

protected abstract class EncodeConcurrentEffectObservable[F[_] : TaskLift, A, CT <: String](implicit
F: ConcurrentEffect[F],
TA: ToAsync[Future, F]
) extends EncodeObservable[F, A, CT] {
protected def dispatch(reader: Reader[Buf], run: F[Unit]): F[Reader[Buf]] =
F.bracketCase(F.start(run))(_ => F.pure(reader)) {
case (f, ExitCase.Canceled) => f.cancel
case _ => F.unit
}
}

protected abstract class EncodeEffectObservable[F[_]: TaskLift, A, CT <: String](implicit
F: Effect[F],
TA: ToAsync[Future, F]
) extends EncodeObservable[F, A, CT] with (Either[Throwable, Unit] => IO[Unit]) {

def apply(cb: Either[Throwable, Unit]): IO[Unit] = IO.unit

protected def dispatch(reader: Reader[Buf], run: F[Unit]): F[Reader[Buf]] =
F.productR(F.runAsync(run)(this).to[F])(F.pure(reader))
}

protected abstract class EncodeObservable[F[_]: TaskLift, A, CT <: String](implicit
F: Effect[F],
TA: ToAsync[Future, F]
) extends EncodeStream[F, ObservableF, A] {

type ContentType = CT

protected def encodeChunk(chunk: A, cs: Charset): Buf

protected def dispatch(reader: Reader[Buf], run: F[Unit]): F[Reader[Buf]]

override def apply(s: ObservableF[F, A], cs: Charset): F[Reader[Buf]] =
F.suspend {
val p = new Pipe[Buf]

val run = s
.map(chunk => encodeChunk(chunk, cs))
.mapEvalF(chunk => TA(p.write(chunk)))
.guaranteeF(F.suspend(TA(p.close())))
.completedL
.to[F]

dispatch(p, run)

}
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.finch.monix

import cats.effect.{ConcurrentEffect, Effect, IO}
import com.twitter.io.Buf
import io.finch.{FinchSpec, StreamingLaws}
import monix.eval.TaskLift
import monix.execution.Scheduler
import monix.reactive.Observable
import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks

class MonixObservableStreamingSpec extends FinchSpec with ScalaCheckDrivenPropertyChecks {

implicit val s = Scheduler.global

checkEffect[IO]
checkConcurrentEffect[IO]

def checkEffect[F[_]: TaskLift](implicit F: Effect[F]): Unit =
checkAll("monixObservable.streamBody[F[_]: Effect]", StreamingLaws[ObservableF, F](
list => Observable(list:_*),
stream => F.toIO(stream.map(array => Buf.ByteArray.Owned(array)).toListL.to[F]).unsafeRunSync()
).all)

def checkConcurrentEffect[F[_]: TaskLift](implicit F: ConcurrentEffect[F]): Unit =
checkAll("monixObservable.streamBody[F[_]: ConcurrentEffect]", StreamingLaws[ObservableF, F](
list => Observable(list:_*),
stream => F.toIO(stream.map(array => Buf.ByteArray.Owned(array)).toListL.to[F]).unsafeRunSync()
).all)
}