Skip to content

Commit d3c6900

Browse files
authored
Merge pull request #2845 from bplommer/timeout
Add methods to timeout on pulls
2 parents fb78701 + b38f1a7 commit d3c6900

File tree

1 file changed

+74
-0
lines changed

1 file changed

+74
-0
lines changed

core/shared/src/main/scala/fs2/Stream.scala

+74
Original file line numberDiff line numberDiff line change
@@ -2856,6 +2856,55 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
28562856
.widen[Either[Throwable, Unit]]
28572857
)
28582858

2859+
/** Fails this stream with a `TimeoutException` if it does not emit a new chunk within the
2860+
* given `timeout` after it is requested.
2861+
*/
2862+
def timeoutOnPull[F2[x] >: F[x]: Temporal](timeout: FiniteDuration): Stream[F2, O] =
2863+
timeoutOnPullTo(
2864+
timeout,
2865+
Stream.raiseError[F2](new TimeoutException(s"Timed out waiting for pull after $timeout"))
2866+
)
2867+
2868+
/** Stops pulling from this stream if it does not emit a new chunk within the
2869+
* given `timeout` after it is requested, and starts pulling from the `onTimeout` stream instead.
2870+
*
2871+
* @example {{{
2872+
* scala> import cats.effect.IO
2873+
* scala> import cats.effect.unsafe.implicits.global
2874+
* scala> import scala.concurrent.duration._
2875+
* scala> val s = Stream(1) ++ Stream.sleep_[IO](100.millis) ++ Stream(2).repeat.meteredStartImmediately[IO](200.millis)
2876+
* scala> s.timeoutOnPullTo(150.millis, Stream(3)).compile.toVector.unsafeRunSync()
2877+
* res0: Vector[Int] = Vector(1, 2, 3)
2878+
* }}}
2879+
*/
2880+
def timeoutOnPullTo[F2[x] >: F[x]: Temporal, O2 >: O](
2881+
timeout: FiniteDuration,
2882+
onTimeout: => Stream[F2, O2]
2883+
): Stream[F2, O2] =
2884+
timeoutOnPullWith[F2, O2](timeout)(_ => onTimeout)
2885+
2886+
/** Applies the pipe `f` if this stream does not emit a new chunk within the given `timeout` after
2887+
* it is requested.
2888+
*
2889+
* @example {{{
2890+
* scala> import cats.effect._
2891+
* scala> import cats.effect.unsafe.implicits.global
2892+
* scala> import scala.concurrent.duration._
2893+
* scala> val s = Stream(1) ++ Stream.sleep_[IO](100.millis) ++ Stream(2).repeat.meteredStartImmediately[IO](200.millis)
2894+
* scala> Ref[IO].of(0).flatTap { lateCount =>
2895+
* | s.take(4).timeoutOnPullWith(150.millis)(Stream.exec(lateCount.update(_ + 1)) ++ _).compile.drain
2896+
* | }.flatMap(_.get).unsafeRunSync()
2897+
* res0: Int = 2
2898+
* }}}
2899+
*/
2900+
def timeoutOnPullWith[F2[x] >: F[x]: Temporal, O2 >: O](timeout: FiniteDuration)(
2901+
f: Pipe[F2, O2, O2]
2902+
): Stream[F2, O2] = this
2903+
.covaryAll[F2, O2]
2904+
.pull
2905+
.timeoutWith(timeout)(_.stream.through(f).pull.echo)
2906+
.stream
2907+
28592908
/** Creates a [[Publisher]] from this [[Stream]].
28602909
*
28612910
* The stream is only ran when elements are requested.
@@ -4955,6 +5004,31 @@ object Stream extends StreamLowPriority {
49555004

49565005
pull(toTimedPull(output))
49575006
}
5007+
5008+
/** Transforms this pull with the function `f` whenever an element is not emitted within
5009+
* the duration `t`.
5010+
* @example {{{
5011+
* scala> import cats.effect.IO
5012+
* scala> import cats.effect.unsafe.implicits.global
5013+
* scala> import scala.concurrent.duration._
5014+
* scala> val s = (Stream("elem") ++ Stream.sleep_[IO](600.millis)).repeat.take(3)
5015+
* scala> s.pull.timeoutWith(450.millis)(Pull.output1("late!") >> _).stream.compile.toVector.unsafeRunSync()
5016+
* res0: Vector[String] = Vector(elem, late!, elem, late!, elem)
5017+
* }}}
5018+
*/
5019+
def timeoutWith[O2 >: O](t: FiniteDuration)(f: Pull[F, O2, Unit] => Pull[F, O2, Unit])(implicit
5020+
F: Temporal[F]
5021+
): Pull[F, O2, Unit] =
5022+
timed { timedPull =>
5023+
def go(timedPull: Pull.Timed[F, O]): Pull[F, O2, Unit] =
5024+
timedPull.timeout(t) >>
5025+
timedPull.uncons.flatMap {
5026+
case Some((Right(elems), next)) => Pull.output(elems) >> go(next)
5027+
case Some((Left(_), next)) => f(go(next))
5028+
case None => Pull.done
5029+
}
5030+
go(timedPull)
5031+
}
49585032
}
49595033

49605034
/** Projection of a `Stream` providing various ways to compile a `Stream[F,O]` to a `G[...]`. */

0 commit comments

Comments
 (0)