diff --git a/build.sbt b/build.sbt index df3c905456..f5ce144209 100644 --- a/build.sbt +++ b/build.sbt @@ -1104,7 +1104,10 @@ lazy val std = crossProject(JSPlatform, JVMPlatform, NativePlatform) ProblemFilters.exclude[FinalMethodProblem]( "cats.effect.std.Dispatcher#RegState#Unstarted.toString"), ProblemFilters.exclude[DirectMissingMethodProblem]( - "cats.effect.std.Dispatcher#Registration#Primary.*") + "cats.effect.std.Dispatcher#Registration#Primary.*"), + // #4500, private class: + ProblemFilters.exclude[ReversedMissingMethodProblem]( + "cats.effect.std.Supervisor#State.numberOfFibers") ) ) .jsSettings( diff --git a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala index b47a583673..2302f11c2d 100644 --- a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala +++ b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala @@ -141,7 +141,7 @@ object Supervisor { def apply[F[_]: Concurrent]: Resource[F, Supervisor[F]] = apply[F](false) - private sealed abstract class State[F[_]] { + private[std] sealed abstract class State[F[_]] { def remove(token: Unique.Token): F[Unit] @@ -150,6 +150,8 @@ object Supervisor { */ def add(token: Unique.Token, fiber: Fiber[F, Throwable, ?]): F[Boolean] + private[std] def numberOfFibers: F[Int] // for testing + // these are allowed to destroy the state, since they're only called during closing: val joinAll: F[Unit] val cancelAll: F[Unit] @@ -169,152 +171,161 @@ object Supervisor { case (st, _) => doneR.set(true) *> st.cancelAll } - } yield new Supervisor[F] { - - def supervise[A](fa: F[A]): F[Fiber[F, Throwable, A]] = - F.uncancelable { _ => - val monitor: (F[A], F[Unit]) => F[Fiber[F, Throwable, A]] = checkRestart match { - case Some(restart) => { (fa, fin) => - F.deferred[Outcome[F, Throwable, A]] flatMap { resultR => - F.ref(false) flatMap { canceledR => - F.deferred[Fiber[F, Throwable, A]].flatMap { firstCurrent => - // `currentR` holds (a `Deferred` to) the current - // incarnation of the fiber executing `fa`: - F.ref(firstCurrent).flatMap { currentR => - def action(current: Deferred[F, Fiber[F, Throwable, A]]): F[Unit] = { - F uncancelable { _ => - val started = F start { - fa guaranteeCase { oc => - F.deferred[Fiber[F, Throwable, A]].flatMap { newCurrent => - // we're replacing the `Deferred` holding - // the current fiber with a new one before - // the current fiber finishes, and even - // before we check for the cancel signal; - // this guarantees, that the fiber reachable - // through `currentR` is the last one (or - // null, see below): - currentR.set(newCurrent) *> { - canceledR.get flatMap { canceled => - doneR.get flatMap { done => - if (!canceled && !done && restart(oc)) { - action(newCurrent) - } else { - // we must complete `newCurrent`, - // because `cancel` below may wait - // for it; we signal that it is not - // restarted with `null`: - newCurrent.complete(null) *> fin.guarantee( - resultR.complete(oc).void) - } + } yield new SupervisorImpl[F](checkRestart, doneR, state) + } + + private[std] final class SupervisorImpl[F[_]]( + checkRestart: Option[Outcome[F, Throwable, ?] => Boolean], + doneR: Ref[F, Boolean], + private[std] val state: State[F] + )(implicit F: Concurrent[F]) + extends Supervisor[F] { + + def supervise[A](fa: F[A]): F[Fiber[F, Throwable, A]] = + F.uncancelable { _ => + val monitor: (F[A], F[Unit]) => F[Fiber[F, Throwable, A]] = checkRestart match { + case Some(restart) => { (fa, fin) => + F.deferred[Outcome[F, Throwable, A]] flatMap { resultR => + F.ref(false) flatMap { canceledR => + F.deferred[Fiber[F, Throwable, A]].flatMap { firstCurrent => + // `currentR` holds (a `Deferred` to) the current + // incarnation of the fiber executing `fa`: + F.ref(firstCurrent).flatMap { currentR => + def action(current: Deferred[F, Fiber[F, Throwable, A]]): F[Unit] = { + F uncancelable { _ => + val started = F start { + fa guaranteeCase { oc => + F.deferred[Fiber[F, Throwable, A]].flatMap { newCurrent => + // we're replacing the `Deferred` holding + // the current fiber with a new one before + // the current fiber finishes, and even + // before we check for the cancel signal; + // this guarantees, that the fiber reachable + // through `currentR` is the last one (or + // null, see below): + currentR.set(newCurrent) *> { + canceledR.get flatMap { canceled => + doneR.get flatMap { done => + if (!canceled && !done && restart(oc)) { + action(newCurrent) + } else { + // we must complete `newCurrent`, + // because `cancel` below may wait + // for it; we signal that it is not + // restarted with `null`: + newCurrent.complete(null) *> fin.guarantee( + resultR.complete(oc).void) } } } } } } - - started flatMap { f => current.complete(f).void } } + + started flatMap { f => current.complete(f).void } } + } - action(firstCurrent).as( - new Fiber[F, Throwable, A] { - - private[this] val delegateF = currentR.get.flatMap(_.get) - - val cancel: F[Unit] = F uncancelable { _ => - // after setting `canceledR`, at - // most one restart happens, and - // the fiber we get through `delegateF` - // is the final one: - canceledR.set(true) *> delegateF flatMap { - case null => - // ok, task wasn't restarted, but we - // wait for the result to be completed - // (and the finalizer to run): - resultR.get.void - case fiber => - fiber.cancel *> fiber.join flatMap { - case Outcome.Canceled() => - // cancel successful (or self-canceled), - // but we don't know if the `guaranteeCase` - // above ran so we need to double check: - delegateF.flatMap { - case null => - // ok, the `guaranteeCase` - // certainly executed/ing: - resultR.get.void - case fiber2 => - // we cancelled the fiber before it did - // anything, so the finalizer didn't run, - // we need to do it now: - val cleanup = fin.guarantee( - resultR.complete(Outcome.Canceled()).void - ) - if (fiber2 eq fiber) { - cleanup - } else { - // this should never happen - cleanup *> F.raiseError(new AssertionError( - "unexpected fiber (this is a bug in Supervisor)")) - } - } - case _ => - // finished in error/success, - // the outcome will certainly - // be completed: - resultR.get.void - } - } + action(firstCurrent).as( + new Fiber[F, Throwable, A] { + + private[this] val delegateF = currentR.get.flatMap(_.get) + + val cancel: F[Unit] = F uncancelable { _ => + // after setting `canceledR`, at + // most one restart happens, and + // the fiber we get through `delegateF` + // is the final one: + canceledR.set(true) *> delegateF flatMap { + case null => + // ok, task wasn't restarted, but we + // wait for the result to be completed + // (and the finalizer to run): + resultR.get.void + case fiber => + fiber.cancel *> fiber.join flatMap { + case Outcome.Canceled() => + // cancel successful (or self-canceled), + // but we don't know if the `guaranteeCase` + // above ran so we need to double check: + delegateF.flatMap { + case null => + // ok, the `guaranteeCase` + // certainly executed/ing: + resultR.get.void + case fiber2 => + // we cancelled the fiber before it did + // anything, so the finalizer didn't run, + // we need to do it now: + val cleanup = fin.guarantee( + resultR.complete(Outcome.Canceled()).void + ) + if (fiber2 eq fiber) { + cleanup + } else { + // this should never happen + cleanup *> F.raiseError(new AssertionError( + "unexpected fiber (this is a bug in Supervisor)")) + } + } + case _ => + // finished in error/success, + // the outcome will certainly + // be completed: + resultR.get.void + } } - - def join = resultR.get } - ) - } + + def join = resultR.get + } + ) } } } } + } - case None => (fa, fin) => F.start(fa.guarantee(fin)) + case None => { (fa, fin) => + F.start(fa).flatMap { fib => F.start(fib.join.guarantee(fin)).as(fib) } } + } - for { - done <- F.ref(false) - insertResult <- F.deferred[Boolean] - token <- F.unique - cleanup = state.remove(token) - fiber <- monitor( - // if the supervisor have been (or is now) - // shutting down, inserting into state will - // fail; so we need to wait for the positive result - // of inserting, before actually doing the task: - insertResult - .get - .ifM( - fa, - F.canceled *> F.raiseError[A](new AssertionError( - "supervised fiber couldn't cancel (this is a bug in Supervisor)")) - ), - done.set(true) *> cleanup - ) - insertOk <- state.add(token, fiber) - _ <- insertResult.complete(insertOk) - // `cleanup` could run BEFORE the `state.add` - // (if `fa` is very fast), in which case it doesn't - // remove the fiber from the state, so we re-check: - _ <- done.get.ifM(cleanup, F.unit) - _ <- { - if (!insertOk) { - F.raiseError(new IllegalStateException("supervisor already shutdown")) - } else { - F.unit - } + for { + done <- F.ref(false) + insertResult <- F.deferred[Boolean] + token <- F.unique + cleanup = state.remove(token) + fiber <- monitor( + // if the supervisor have been (or is now) + // shutting down, inserting into state will + // fail; so we need to wait for the positive result + // of inserting, before actually doing the task: + insertResult + .get + .ifM( + fa, + F.canceled *> F.raiseError[A](new AssertionError( + "supervised fiber couldn't cancel (this is a bug in Supervisor)")) + ), + done.set(true) *> cleanup + ) + insertOk <- state.add(token, fiber) + _ <- insertResult.complete(insertOk) + // `cleanup` could run BEFORE the `state.add` + // (if `fa` is very fast), in which case it doesn't + // remove the fiber from the state, so we re-check: + _ <- done.get.ifM(cleanup, F.unit) + _ <- { + if (!insertOk) { + F.raiseError(new IllegalStateException("supervisor already shutdown")) + } else { + F.unit } - } yield fiber - } - } + } + } yield fiber + } } private[effect] def applyForConcurrent[F[_]]( @@ -335,6 +346,9 @@ object Supervisor { case map => (map.updated(token, fiber), true) } + private[std] final override def numberOfFibers: F[Int] = // for testing + stateRef.get.map(_.size) + private[this] val allFibers: F[List[Fiber[F, Throwable, ?]]] = { // we're closing, so we won't need the state any more, // so we're using `null` as a sentinel to reject later @@ -371,6 +385,9 @@ object Supervisor { F.delay(state.put(token, fiber)) *> doneR.get.map(!_) } + private[std] final override def numberOfFibers: F[Int] = // for testing + F.delay { state.size() } + private[this] val allFibers: F[List[Fiber[F, Throwable, ?]]] = F delay { val fibers = ListBuffer.empty[Fiber[F, Throwable, ?]] diff --git a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala index 0065f87b33..db6c0fa609 100644 --- a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala @@ -294,5 +294,34 @@ class SupervisorSpec extends BaseSpec with DetectPlatform { tsk.parReplicateA_(if (isJVM) 1000 else 1).as(ok) } + + def superviseCancelRace(mkSupervisor: Resource[IO, Supervisor[IO]]) = { + val N = if (isJVM) 1000 else 5 + val M = if (isJVM) 20 else 2 + val tsk = mkSupervisor.use { supervisor => + supervisor + .supervise(IO.unit) + .flatMap(_.cancel) + .replicateA_(N) + .parReplicateA_(M) + .flatMap { _ => + // let's wait a bit (for cleanup to happen): + IO.sleep(0.2.second) *> { + val st = supervisor.asInstanceOf[Supervisor.SupervisorImpl[IO]].state + // the supervised fibers must've been cleaned up from the internal state: + st.numberOfFibers.flatMap { numFibs => IO(numFibs mustEqual 0) } + } + } + } + tsk.as(ok) + } + + "supervise / cancel race cleanup" in real { + superviseCancelRace(constructor(false, None)) + } + + "supervise / cancel race cleanup (with restart)" in real { + superviseCancelRace(constructor(false, Some(_ => true))) + } } }