A simple functional reactive library for ScalaJS. Colibri is an implementation of the Observable, Observer and Subject reactive concepts.
If you're new to these concepts, here is a nice introduction from RxJS: https://rxjs.dev/guide/overview. Another good resource is this set of visualizations for common reactive operators: https://rxmarbles.com/.
This library includes:
- A (minimal and performant) reactive library based on JavaScript native operations like setTimeout, setInterval, setImmediate, queueMicrotask
- Typeclasses to integrate with other streaming libraries
Reactive core library with typeclasses:
libraryDependencies += "com.github.cornerman" %%% "colibri" % "0.8.3"
import colibri._
Reactive variables with lazy, distinct, shared state variables (a bit like scala-rx, but lazy):
libraryDependencies += "com.github.cornerman" %%% "colibri-reactive" % "0.8.3"
import colibri.reactive._
For jsdom-based operations in the browser (EventObservable, Storage):
libraryDependencies += "com.github.cornerman" %%% "colibri-jsdom" % "0.8.3"
import colibri.jsdom._
For scala.rx support (only Scala 2.x):
libraryDependencies += "com.github.cornerman" %%% "colibri-rx" % "0.8.3"
import colibri.ext.rx._
For airstream support:
libraryDependencies += "com.github.cornerman" %%% "colibri-airstream" % "0.8.3"
import colibri.ext.airstream._
For zio support:
libraryDependencies += "com.github.cornerman" %%% "colibri-zio" % "0.8.3"
import colibri.ext.zio._
For fs2 support (Source only):
libraryDependencies += "com.github.cornerman" %%% "colibri-fs2" % "0.8.3"
import colibri.ext.fs2._
The implementation follows the reactive design:
- An observable is a stream to which you can subscribe with an Observer.
- An observer is basically a callback which can receive a value or an error from an Observable.
- A Subject is both an observable and an observer, receiving values and errors from the outside and distributing them to all subscribing observers.
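To make the three roles concrete, here is a minimal sketch in plain Scala. It is not colibri's implementation: the names MiniObserver, MiniObservable, MiniSubject and MiniDemo are hypothetical and exist only for this illustration, and error handling and completion are left out.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical names for illustration only; this is NOT colibri's code.
trait MiniObserver[-A] {
  def onNext(value: A): Unit
  def onError(error: Throwable): Unit
}

trait MiniObservable[+A] {
  // Subscribing returns a function that cancels the subscription.
  def subscribe(observer: MiniObserver[A]): () => Unit
}

// A subject is both: it receives values from the outside and
// distributes them to every currently subscribed observer.
final class MiniSubject[A] extends MiniObservable[A] with MiniObserver[A] {
  private val observers = ArrayBuffer.empty[MiniObserver[A]]

  def subscribe(observer: MiniObserver[A]): () => Unit = {
    observers += observer
    () => { observers -= observer; () }
  }

  // toList takes a snapshot, so observers may unsubscribe while being notified.
  def onNext(value: A): Unit = observers.toList.foreach(_.onNext(value))
  def onError(error: Throwable): Unit = observers.toList.foreach(_.onError(error))
}

object MiniDemo {
  def run(): List[Int] = {
    val received = ArrayBuffer.empty[Int]
    val subject = new MiniSubject[Int]
    val cancel = subject.subscribe(new MiniObserver[Int] {
      def onNext(value: Int): Unit = { received += value; () }
      def onError(error: Throwable): Unit = ()
    })
    subject.onNext(1)
    subject.onNext(2)
    cancel()          // after cancelling, the observer no longer receives values
    subject.onNext(3)
    received.toList
  }
}
```

In this sketch, MiniDemo.run() collects only the values sent while the subscription was active.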
Observables in colibri are lazy, which means nothing starts until you call unsafeSubscribe on an Observable (or any unsafe* method).
We integrate with effect types by means of typeclasses (see below). Colibri supports cats.effect.IO, cats.effect.SyncIO, cats.Eval and cats.effect.Resource out of the box, as well as zio.Task (with colibri-zio).
Example Observables:
import colibri._
import scala.concurrent.duration._
import cats.effect.IO
val observable = Observable
  .interval(1.second)
  .mapEffect[IO](i => myCount(i))
  .distinctOnEquals
  .tapEffect[IO](c => myLog(c))
  .mapResource(x => myResource(x))
  .switchMap(x => myObservable(x))
  .debounceMillis(1000)
val observer = Observer.foreach[Int](println(_))
val subscription: Cancelable = observable.unsafeSubscribe(observer)
val subscriptionIO: IO[Cancelable] = observable.subscribeF[IO](observer)
Example Subjects:
import colibri._
import cats.effect.IO
val subject = Subject.publish[Int]() // or Subject.behavior(seed) or Subject.replayLast or Subject.replayAll
val subscription: Cancelable = subject.unsafeForeach(println(_))
subject.unsafeOnNext(1)
val myEffect: IO[Unit] = subject.onNextF[IO](2)
Every subscription that is created inside of colibri methods is returned to the user. For example, unsafeSubscribe or subscribeF returns a Cancelable. That means the caller is responsible for cleaning up the subscription by calling Cancelable#unsafeCancel() or Cancelable#cancelF.
If you are working with Outwatch, you can just use Observable without ever subscribing yourself. Then all memory management is handled for you automatically. No memory leaks.
We have prepared typeclasses for integrating with other streaming libraries. The most important ones are Sink and Source. Source is a typeclass for Observables, Sink is a typeclass for Observers:
- Sink[G[_]] can send values and errors into G; it has an onNext and an onError method.
- Source[H[_]] can unsafely subscribe to H with a Sink (returns a cancelable subscription)
- CanCancel[T] can cancel T to stop a subscription
- LiftSink[G[_]] can lift a Sink into type G
- LiftSource[H[_]] can lift a Source into type H
- SubscriptionOwner[T] can let type T own a subscription
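The typeclass approach can be sketched in plain Scala as follows. This is a simplified illustration, not colibri's actual definitions: MiniSink, Recorder and SinkDemo are hypothetical names, and the real Sink signatures may differ. The point is that generic code only asks for an instance of the typeclass, so any third-party type can take part once an instance exists.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified typeclass in the spirit of Sink.
// colibri's real signatures may differ.
trait MiniSink[G[_]] {
  def onNext[A](sink: G[A])(value: A): Unit
  def onError[A](sink: G[A])(error: Throwable): Unit
}

// Stand-in for a type from another library that can receive values.
final class Recorder[A] {
  val values = ArrayBuffer.empty[A]
  val errors = ArrayBuffer.empty[Throwable]
}

object Recorder {
  // The instance teaches generic code how to push into a Recorder.
  implicit val recorderSink: MiniSink[Recorder] = new MiniSink[Recorder] {
    def onNext[A](sink: Recorder[A])(value: A): Unit = { sink.values += value; () }
    def onError[A](sink: Recorder[A])(error: Throwable): Unit = { sink.errors += error; () }
  }
}

object SinkDemo {
  // Works for any G that has a MiniSink instance, not only Recorder.
  def emitAll[G[_], A](sink: G[A], values: List[A])(implicit instance: MiniSink[G]): Unit =
    values.foreach(v => instance.onNext(sink)(v))

  def run(): List[Int] = {
    val recorder = new Recorder[Int]
    emitAll(recorder, List(1, 2, 3))
    recorder.values.toList
  }
}
```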
In order to work with effects inside our Observable, we have defined the following two typeclasses similar to Effect in cats-effect 2:
- RunEffect[F[_]] can unsafely run an effect F[_] asynchronously, potentially starting synchronously until reaching an async boundary.
- RunSyncEffect[F[_]] can unsafely run an effect F[_] synchronously.
You can convert any Source into an Observable with Observable.lift(source). The same works for Sink and Observer with Observer.lift(sink).
The module colibri-reactive exposes reactive variables. These are lazy, distinct, shared state variables (internally using observables) that always have a value. These reactive variables are meant for managing state, as opposed to managing events, which is a perfect fit for the lazy Observable in the core colibri library.
This module behaves similarly to scala-rx, though variables are not hot and it is built on top of colibri Observables for seamless integration and powerful operators.
The whole thing is not entirely glitch-free, as invalid state can appear in operators like map or foreach. But you always have a consistent state in now(), and it reduces the number of intermediate triggers or glitches. You can become completely glitch-free by converting back to an observable and using dropSyncGlitches, which will introduce an async boundary (micro-task).
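For readers unfamiliar with the term, the following plain-Scala sketch shows what a glitch is. It uses a deliberately naive push-based cell (the hypothetical names Cell and GlitchDemo are not part of colibri, and this is not how colibri-reactive propagates changes). The value d depends on a through two paths, so a single update of a makes d briefly observe a mix of new and stale inputs.

```scala
import scala.collection.mutable.ArrayBuffer

// Naive push-based cell, for illustration only. NOT colibri-reactive.
final class Cell[A](initial: A) {
  private var current = initial
  private val listeners = ArrayBuffer.empty[A => Unit]
  def now(): A = current
  def foreach(f: A => Unit): Unit = { listeners += f; () }
  def set(value: A): Unit = {
    current = value
    listeners.toList.foreach(listener => listener(value))
  }
}

object GlitchDemo {
  def run(): List[Int] = {
    val a = new Cell(1)
    val b = new Cell(a.now() + 1)       // b = a + 1
    val c = new Cell(a.now() * 2)       // c = a * 2
    val d = new Cell(b.now() + c.now()) // d = b + c

    a.foreach(v => b.set(v + 1))
    a.foreach(v => c.set(v * 2))
    b.foreach(_ => d.set(b.now() + c.now()))
    c.foreach(_ => d.set(b.now() + c.now()))

    val seen = ArrayBuffer.empty[Int]
    d.foreach(v => { seen += v; () })

    a.set(2) // b is updated first, so d is recomputed once with a stale c
    seen.toList
  }
}
```

Here a single a.set(2) makes d emit 5 (new b = 3, stale c = 2) before settling on the consistent value 7. The intermediate 5 is the glitch.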
A state variable is of type Var[A] extends Rx[A] with RxWriter[A].
The laziness of variables means that the current value is only tracked if anyone subscribes to the Rx[A]. So an Rx does not compute anything on its own. You can still always call now() on it: if it is currently not subscribed, it will lazily calculate the current value.
Example:
import colibri.reactive._
val variable = Var(1)
val variable2 = Var("Test")
val rx = Rx {
  s"${variable()} - ${variable2()}"
}
val cancelable = rx.unsafeForeach(println(_))
println(variable.now()) // 1
println(variable2.now()) // "Test"
println(rx.now()) // "1 - Test"
variable.set(2) // println("2 - Test")
println(variable.now()) // 2
println(variable2.now()) // "Test"
println(rx.now()) // "2 - Test"
variable2.set("Foo") // println("2 - Foo")
println(variable.now()) // 2
println(variable2.now()) // "Foo"
println(rx.now()) // "2 - Foo"
cancelable.unsafeCancel()
println(variable.now()) // 2
println(variable2.now()) // "Foo"
println(rx.now()) // "2 - Foo"
variable.set(3) // no println
// now calculates new value lazily
println(variable.now()) // 3
println(variable2.now()) // "Foo"
println(rx.now()) // "3 - Foo"
Apart from Rx, which always has an initial value, there is RxLater (and VarLater), which will eventually have a value (both extend RxState which extends RxSource). It is also meant for representing state, just without an initial state. It is lazy, distinct and has shared execution just like Rx.
import colibri.reactive._
import scala.concurrent.Future
val variable = VarLater[Int]()
val stream1 = RxLater.empty
val stream2 = RxLater.future(Future.successful(1)).map(_ + 1)
val cancelable = variable.unsafeForeach(println(_))
val cancelable1 = stream1.unsafeForeach(println(_))
val cancelable2 = stream2.unsafeForeach(println(_))
println(variable.toRx.now()) // None
println(stream1.toRx.now()) // None
println(stream2.toRx.now()) // Some(2)
variable.set(13)
println(variable.toRx.now()) // Some(13)
There also exist RxEvent and VarEvent, which are event observables with shared execution. That is, they behave like Rx and Var in that transformations are only applied once and not per subscription. But RxEvent and VarEvent are not distinct and have no current value. They should be used for event streams.
import colibri.reactive._
val variable = VarEvent[Int]()
val stream = RxEvent.empty
val mapped = RxEvent.merge(variable.tap(println(_)).map(_ + 1), stream)
val cancelable = mapped.unsafeForeach(println(_))
Outwatch works perfectly with Rx (or RxLater, RxEvent which all extend RxSource) - just like Observable.
import outwatch._
import outwatch.dsl._
import colibri.reactive._
import cats.effect.SyncIO
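val component: VModifier = {
  val variable = Var(1)
  // derive from the variable; the original referenced rx before it was defined
  val mapped = variable.map(_ + 1)
  val rx = Rx {
    // the s interpolator is required for ${...} to be evaluated
    s"Hallo: ${mapped()}"
  }
  div(rx)
}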
The same principles as for Observables hold. Any cancelable that is returned from the API needs to be handled by the caller. Best practice: use subscribe/foreach as rarely as possible, only in selected spots or within a library.
If you are working with Outwatch, you can just use Rx without ever subscribing yourself. Then all memory management is handled for you automatically. No memory leaks.
Throughout the library, the type parameters for the Sink and Source typeclasses are named consistently to avoid naming ambiguity when working with F[_] in the same context:
F[_] : RunEffect
G[_] : Sink
H[_] : Source
Source Code: Source.scala, Sink.scala, RunEffect.scala
In general, we take a middle ground with pure functional programming. We focus on performance and ease of use. Internally, the code is mutable for performance reasons. Externally, we try to expose a typesafe, immutable, and mostly pure interface to the user. There are some impure methods, for example for subscribing to observables, which potentially execute side effects. These impure methods are named unsafe*. There are normally pure alias methods returning an effect type for public use. The unsafe methods exist so they can be used internally, where we try to keep extra allocations to a minimum.
Types like Observable are conceptually very similar to IO; the difference is that they can emit more than zero or one value. They are also lazy, and operations like map/flatMap/filter/... do not actually do anything. It is only after you unsafely run or subscribe to an Observable that it actually starts evaluating.
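The laziness described above can be illustrated with a small plain-Scala sketch. It is an assumption-laden toy, not colibri's Observable: LazyStream and LazyDemo are hypothetical names, and the stream is synchronous and cannot be cancelled. It only shows that map builds a description and that the mapping function runs once a subscriber arrives.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy lazy stream for illustration only; NOT colibri's Observable.
final class LazyStream[A](private val produce: (A => Unit) => Unit) {
  // map only wraps the producer; nothing is evaluated here.
  def map[B](f: A => B): LazyStream[B] =
    new LazyStream[B](emit => produce(a => emit(f(a))))

  // Evaluation starts when someone subscribes.
  def subscribe(observer: A => Unit): Unit = produce(observer)
}

object LazyStream {
  def fromList[A](values: List[A]): LazyStream[A] =
    new LazyStream[A](emit => values.foreach(emit))
}

object LazyDemo {
  // Returns (map calls before subscribing, map calls after, received values).
  def run(): (Int, Int, List[Int]) = {
    var mapCalls = 0
    val stream = LazyStream.fromList(List(1, 2, 3)).map { i =>
      mapCalls += 1
      i * 10
    }
    val callsBeforeSubscribe = mapCalls

    val received = ArrayBuffer.empty[Int]
    stream.subscribe(v => { received += v; () })

    (callsBeforeSubscribe, mapCalls, received.toList)
  }
}
```

In this sketch the mapping function has not run at all before subscribe is called, and runs once per element afterwards.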