From 7319db2397cf91a958d9661f37d1625f0977c9fe Mon Sep 17 00:00:00 2001 From: Morgen Peschke Date: Tue, 6 May 2025 11:41:20 -0700 Subject: [PATCH 1/5] Add a test Console --- .../cats/effect/testkit/TestConsole.scala | 350 +++++++++++++++++ .../cats/effect/testkit/TestConsoleSpec.scala | 355 ++++++++++++++++++ 2 files changed, 705 insertions(+) create mode 100644 testkit/shared/src/main/scala/cats/effect/testkit/TestConsole.scala create mode 100644 tests/shared/src/test/scala/cats/effect/testkit/TestConsoleSpec.scala diff --git a/testkit/shared/src/main/scala/cats/effect/testkit/TestConsole.scala b/testkit/shared/src/main/scala/cats/effect/testkit/TestConsole.scala new file mode 100644 index 0000000000..3573c2cceb --- /dev/null +++ b/testkit/shared/src/main/scala/cats/effect/testkit/TestConsole.scala @@ -0,0 +1,350 @@ +/* + * Copyright 2020-2025 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.testkit + +import cats.{Parallel, Show} +import cats.data.{Chain, NonEmptyChain} +import cats.effect.Concurrent +import cats.effect.kernel.{Deferred, Ref, Resource} +import cats.effect.std.{Console, Semaphore} +import cats.effect.testkit.TestConsole.{ConsoleClosedException, TestStdInState} +import cats.effect.testkit.TestConsole.TestStdInState._ +import cats.syntax.all._ + +import scala.annotation.tailrec +import scala.util.control.NoStackTrace + +import java.io.EOFException +import java.nio.charset.Charset + +/** + * Implement a test version of [[cats.effect.std.Console]] + */ +final class TestConsole[F[_]: Parallel]( + stdInSemaphore: Semaphore[F], + stdInStateRef: Ref[F, TestStdInState[F]], + stdOutRef: Ref[F, Chain[String]], + stdErrRef: Ref[F, Chain[String]], + logsRef: Ref[F, Chain[String]], + readIdRef: Ref[F, Int] +)(implicit F: Concurrent[F]) + extends Console[F] { + private val defaultCharset = Charset.defaultCharset() + private def streamClosed = new EOFException("End Of File") + private def log(msg: String): F[Unit] = logsRef.update(_.append(msg)) + + /** + * Write a string to the simulated stdIn + * + * Blocked calls to [[readLineWithCharset]] will be woken up if `str` contains one or more + * lines. + * + * @note + * Blocked calls will be woken in a first-in-first-out order. + */ + def write[A](value: A, charset: Charset = defaultCharset)(implicit S: Show[A]): F[Unit] = + log(show"Writing to stdin: $value") *> writeImpl(Chunk(value.show, charset)) + + /** + * Write a string and a newline to the simulated stdIn + * + * At least one blocked call to [[readLineWithCharset]] will be woken up, if it exists. + * + * @note + * Blocked calls will be woken in a first-in-first-out order. + */ + def writeln[A](value: A, charset: Charset = defaultCharset)(implicit S: Show[A]): F[Unit] = + log(show"Writing line to stdin: $value") *> writeImpl(Chunk(show"$value\n", charset)) + + private def writeImpl(chunk: Chunk): F[Unit] = + if (chunk.isEmpty) F.unit + else + stdInSemaphore.permit.use { _ => + stdInStateRef.get.flatMap { + case Closed() => F.raiseError(ConsoleClosedException()) + case Ready(lines, partial) => + val (newLines, newPartial) = partial.append(chunk) + stdInStateRef.set(Ready[F](lines.appendChain(newLines), newPartial)) + case Waiting(requests, buffer) => + val (lines, partial) = buffer.append(chunk) + if (lines.isEmpty) + stdInStateRef.set(Waiting[F](requests, partial)) + else { + def loop( + remainingLines: Chain[Line], + remainingRequests: Chain[Deferred[F, Either[Throwable, Array[Byte]]]]) + : F[TestStdInState[F]] = + (remainingLines.uncons, remainingRequests.uncons) match { + case (None, None) => + Waiting[F](Chain.empty, PartialLine.empty).pure[F].widen + case (None, Some(_)) => + Waiting[F](remainingRequests, partial).pure[F].widen + case (Some((nextLine, otherLines)), None) => + Ready[F](NonEmptyChain.fromChainPrepend(nextLine, otherLines), partial) + .pure[F] + .widen + case (Some((nextLine, otherLines)), Some((nextRequest, otherRequests))) => + nextRequest + .complete(nextLine.bytes.asRight) >> loop(otherLines, otherRequests) + } + + loop(lines, requests).flatMap(stdInStateRef.set) + } + } + } + + override def readLineWithCharset(charset: Charset): F[String] = + readIdRef.getAndUpdate(_ + 1).flatMap { readId => + stdInSemaphore + .permit + .use { _ => + log(s"Reading stdIn [id: $readId]") *> + stdInStateRef.get.flatMap { + case Closed() => + F.raiseError[Deferred[F, Either[Throwable, Array[Byte]]]](streamClosed) + case Ready(lines, partial) => + val newState = + NonEmptyChain + .fromChain(lines.tail) + .fold[TestStdInState[F]](Waiting(Chain.empty, PartialLine.empty))( + Ready(_, partial)) + + stdInStateRef.set(newState) *> + Deferred[F, Either[Throwable, Array[Byte]]].flatTap( + _.complete(lines.head.bytes.asRight)) + case Waiting(requests, buffer) => + Deferred[F, Either[Throwable, Array[Byte]]].flatTap(d => + stdInStateRef.set(Waiting(requests.append(d), buffer))) + } + } + .flatMap(_.get) + .flatMap(_.traverse(bytes => Concurrent[F].catchNonFatal(new String(bytes, charset)))) + .flatTap { + case Left(ex) => log(s"Read from stdin failed [id: $readId]: $ex") + case Right(line) => log(s"Read from stdin [id: $readId]: $line") + } + .rethrow + } + + override def print[A](a: A)(implicit S: Show[A]): F[Unit] = + log(show"print($a)") *> stdOutRef.update(_.append(a.show)) + + override def println[A](a: A)(implicit S: Show[A]): F[Unit] = + log(show"println($a)") *> stdOutRef.update(_.append(a.show).append("\n")) + + override def error[A](a: A)(implicit S: Show[A]): F[Unit] = + log(show"error($a)") *> stdErrRef.update(_.append(a.show)) + + override def errorln[A](a: A)(implicit S: Show[A]): F[Unit] = + log(show"errorln($a)") *> stdErrRef.update(_.append(a.show).append("\n")) + + /** + * Close the TestConsole + * + * Any blocked calls to [[readLineWithCharset]] terminate with a raised + * [[java.io.EOFException]] + */ + def close: F[Unit] = stdInSemaphore.permit.use { _ => + stdInStateRef + .get + .flatTap(_ => log("Closing")) + .flatMap { + case Closed() => F.unit + case Ready(lines, partial) => + log(s"Discarding ${lines.length} lines and ${partial.chunks.length} bytes from stdIn") *> + stdInStateRef.set(Closed[F]()) + case Waiting(requests, buffer) => + log(s"Discarding ${buffer.chunks.length} bytes from stdIn") + .unlessA(buffer.chunks.isEmpty) *> + log(s"Notifying ${requests.length} pending read requests") + .unlessA(requests.isEmpty) *> + stdInStateRef.set(Closed[F]()) *> requests.parTraverse_( + _.complete(streamClosed.asLeft)) + } + .flatTap(_ => log("Closed")) + } + + /** + * @return + * The current contents of stdOut + */ + def stdOutContents: F[String] = stdOutRef.get.map(_.mkString_("")) + + /** + * @return + * The current contents of stdErr + */ + def stdErrContents: F[String] = stdErrRef.get.map(_.mkString_("")) + + /** + * @return + * A human-readable description of the activity log and current status of this instance. + * + * Handy for debugging failing or blocked tests. + */ + def activityLog: F[String] = + (logsRef.get.map(_.mkString_("\n")), stdStateDescription).mapN { (logStr, stateStr) => + s"""|=== Activity Log === + |$logStr + |=== Current State === + |$stateStr""".stripMargin + } + + /** + * Clear the human-readable activity log + */ + def clearLog: F[Unit] = logsRef.set(Chain.empty) + + private def stdStateDescription: F[String] = stdInStateRef.get.map { + case Closed() => "Closed" + case Ready(lines, partial) => + val linesStr = lines.mkString_("\n") + val partialStr = + if (partial.isEmpty) "No partial line" + else s"Partial line: '${partial.render}'" + + s"""Ready for read + |$partialStr + |--- Complete Lines --- + |$linesStr""".stripMargin + case Waiting(requests, buffer) => + val bufferStr = + if (buffer.isEmpty) "No partial line" + else s"Partial line: '${buffer.render}'" + s"""Waiting for read + |Pending requests: ${requests.length} + |$bufferStr""".stripMargin + } +} +object TestConsole { + + /** + * Create a [[TestConsole]] instance without lifecycle management + * + * CAUTION + * + * Be careful to ensure that [[TestConsole.close]] is called before the end of the test, to + * make sure that no fibers are blocked waiting on a call to + * [[TestConsole.readLineWithCharset]] + */ + def unsafe[F[_]: Concurrent: Parallel]: F[TestConsole[F]] = + ( + Semaphore[F](1L), + Ref.of[F, TestStdInState[F]](TestStdInState.Waiting[F](Chain.empty, PartialLine.empty)), + Ref.empty[F, Chain[String]], + Ref.empty[F, Chain[String]], + Ref.empty[F, Chain[String]], + Ref.of[F, Int](0) + ).mapN(new TestConsole[F](_, _, _, _, _, _)) + + /** + * Create a resource which instantiates and closes a [[TestConsole]] + * + * This is the preferred usage pattern, as it ensures that no fibers are left blocked on calls + * to [[TestConsole.readLineWithCharset]] + */ + def resource[F[_]: Concurrent: Parallel]: Resource[F, TestConsole[F]] = + Resource.make[F, TestConsole[F]](unsafe[F])(_.close.recover { + case ConsoleClosedException() => () + }) + + private[testkit] final case class ConsoleClosedException() + extends IllegalStateException("Console is closed") + with NoStackTrace + + private[testkit] sealed trait TestStdInState[F[_]] + private[testkit] object TestStdInState { + final case class Chunk(value: String, charset: Charset) { + def bytes: Array[Byte] = value.getBytes(charset) + def isEmpty: Boolean = value.isEmpty + def modify(f: String => String): Chunk = Chunk(f(value), charset) + def split(char: Char): Option[(Chunk, Chunk)] = { + val idx = value.indexOf(char.toInt) + if (idx === -1) None + else { + val (head, tail) = value.splitAt(idx) + Some((Chunk(head, charset), Chunk(tail.drop(1), charset))) + } + } + } + object Chunk { + implicit val show: Show[Chunk] = Show.show(_.value) + } + + final case class PartialLine(chunks: Chain[Chunk]) { + def isEmpty: Boolean = chunks.forall(_.isEmpty) + + def render: String = chunks.mkString_("") + + def toLine: Line = Line(chunks) + + def append(chunk: Chunk): (Chain[Line], PartialLine) = + if (chunk.value.startsWith("\n")) + PartialLine.empty.append(chunk.modify(_.drop(1))).leftMap(_.prepend(toLine)) + else if (chunk.value.endsWith("\n")) { + val (lines, lastLine) = append(chunk.modify(_.dropRight(1))) + lines.append(lastLine.toLine) -> PartialLine.empty + } else { + if (chunk.isEmpty) (Chain.empty, this) + else { + @tailrec + def loop(accum: Chain[Line], remaining: Chunk): (Chain[Line], PartialLine) = + if (remaining.isEmpty) (accum, PartialLine.empty) + else { + remaining.split('\n') match { + case None => (accum, PartialLine.one(remaining)) + case Some((head, tail)) => loop(accum.append(Line.one(head)), tail) + } + } + + chunk.split('\n') match { + case Some((head, tail)) => + loop(Chain.one(Line(chunks.append(head))), tail) + case None => + if (isEmpty) (Chain.empty, PartialLine.one(chunk)) + else (Chain.empty, PartialLine(chunks.append(chunk))) + } + } + } + } + object PartialLine { + def one(c: Chunk): PartialLine = PartialLine(Chain.one(c)) + def empty: PartialLine = PartialLine(Chain.empty) + } + + final case class Line(chunks: Chain[Chunk]) { + def isEmpty: Boolean = chunks.forall(_.isEmpty) + def render: String = chunks.mkString_("") + def bytes: Array[Byte] = + chunks.map(_.bytes).toVector.toArray.flatten + } + object Line { + def one(chunk: Chunk): Line = Line(Chain.one(chunk)) + def empty: Line = Line(Chain.empty) + implicit val show: Show[Line] = Show.show(_.render) + } + + final case class Closed[F[_]]() extends TestStdInState[F] + final case class Ready[F[_]](lines: NonEmptyChain[Line], partial: PartialLine) + extends TestStdInState[F] + + final case class Waiting[F[_]]( + requests: Chain[Deferred[F, Either[Throwable, Array[Byte]]]], + buffer: PartialLine) + extends TestStdInState[F] + } +} diff --git a/tests/shared/src/test/scala/cats/effect/testkit/TestConsoleSpec.scala b/tests/shared/src/test/scala/cats/effect/testkit/TestConsoleSpec.scala new file mode 100644 index 0000000000..4eb708d40b --- /dev/null +++ b/tests/shared/src/test/scala/cats/effect/testkit/TestConsoleSpec.scala @@ -0,0 +1,355 @@ +/* + * Copyright 2020-2025 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package testkit + +import cats.data.Chain +import cats.effect.testkit.TestConsole.TestStdInState.{Chunk, Line, PartialLine} +import cats.syntax.all._ + +import scala.concurrent.duration.DurationInt + +import java.nio.charset.StandardCharsets + +class TestConsoleSpec extends BaseSuite { + private def chunk(value: String): Chunk = Chunk(value, StandardCharsets.UTF_8) + + real("TestConsole.print should append the formatted value to stdOut") { + TestConsole.resource[IO].use { console => + for { + _ <- console.println(120) + _ <- console.print("foo") + stdOut <- console.stdOutContents + } yield assertEquals(stdOut, "120\nfoo") + } + } + + real("TestConsole.error should append the formatted value to stdErr") { + TestConsole.resource[IO].use { console => + for { + _ <- console.errorln(120) + _ <- console.error("foo") + stdOut <- console.stdErrContents + } yield assertEquals(stdOut, "120\nfoo") + } + } + + real("TestConsole.write should fail when closed") { + TestConsole.resource[IO].use { console => + for { + _ <- console.close + result <- console.write("foo").attempt + } yield assertEquals(result.leftMap(_.getMessage), Left("Console is closed")) + } + } + + real("TestConsole.write should remain waiting when waiting and line is not complete") { + TestConsole.resource[IO].use { console => + for { + _ <- console.write("foo") + _ <- console.write("bar") + state <- console.activityLog + } yield assertEquals( + state, + """|=== Activity Log === + |Writing to stdin: foo + |Writing to stdin: bar + |=== Current State === + |Waiting for read + |Pending requests: 0 + |Partial line: 'foobar'""".stripMargin + ) + } + } + + real("TestConsole.write should be ready when waiting and line is complete") { + TestConsole.resource[IO].use { console => + for { + _ <- console.writeln("foo") + state <- console.activityLog + } yield assertEquals( + state, + """|=== Activity Log === + |Writing line to stdin: foo + |=== Current State === + |Ready for read + |No partial line + |--- Complete Lines --- + |foo""".stripMargin + ) + } + } + + real("TestConsole.write should stay ready when already ready") { + TestConsole.resource[IO].use { console => + for { + _ <- console.writeln("foo") + _ <- console.writeln("bar") + state <- console.activityLog + } yield assertEquals( + state, + """|=== Activity Log === + |Writing line to stdin: foo + |Writing line to stdin: bar + |=== Current State === + |Ready for read + |No partial line + |--- Complete Lines --- + |foo + |bar""".stripMargin + ) + } + } + + real("TestConsole.write should accumulate partial lines when already ready") { + TestConsole.resource[IO].use { console => + for { + _ <- console.writeln("foo") + _ <- console.write("bar") + _ <- console.write("baz") + state <- console.activityLog + } yield assertEquals( + state, + """|=== Activity Log === + |Writing line to stdin: foo + |Writing to stdin: bar + |Writing to stdin: baz + |=== Current State === + |Ready for read + |Partial line: 'barbaz' + |--- Complete Lines --- + |foo""".stripMargin + ) + } + } + + real( + "TestConsole.write should retain partial lines when writing a new line and already ready") { + TestConsole.resource[IO].use { console => + for { + _ <- console.writeln("foo") + _ <- console.write("bar") + _ <- console.write("baz") + _ <- console.writeln("qux") + state <- console.activityLog + } yield assertEquals( + state, + """|=== Activity Log === + |Writing line to stdin: foo + |Writing to stdin: bar + |Writing to stdin: baz + |Writing line to stdin: qux + |=== Current State === + |Ready for read + |No partial line + |--- Complete Lines --- + |foo + |barbazqux""".stripMargin + ) + } + } + + real("TestConsole.write should wake up multiple reads if the line has embedded newlines") { + TestConsole.resource[IO].use { console => + TestControl.executeEmbed { + ( + console.readLineWithCharset(StandardCharsets.UTF_8), + console.readLineWithCharset(StandardCharsets.UTF_8).delayBy(10.millis), + console.writeln("foo\nbar").delayBy(20.millis) + ).parMapN { + case (fooRead, barRead, _) => + assertEquals((fooRead, barRead), ("foo", "bar")) + } *> console.activityLog.flatMap { log => + // Non-determinism is fun. + // The reads receive the right values, but they may wake up out of + // order. + if (log == """|=== Activity Log === + |Reading stdIn [id: 0] + |Reading stdIn [id: 1] + |Writing line to stdin: foo + |bar + |Read from stdin [id: 0]: foo + |Read from stdin [id: 1]: bar + |=== Current State === + |Waiting for read + |Pending requests: 0 + |No partial line""".stripMargin) IO.unit + else if (log == """|=== Activity Log === + |Reading stdIn [id: 0] + |Reading stdIn [id: 1] + |Writing line to stdin: foo + |bar + |Read from stdin [id: 1]: bar + |Read from stdin [id: 0]: foo + |=== Current State === + |Waiting for read + |Pending requests: 0 + |No partial line""".stripMargin) IO.unit + else IO(fail("Unexpected activity log", clues(log))) + } + } + } + } + + real("TestConsole.readLineWithCharset should read only a single line from stdIn") { + TestConsole.resource[IO].use { console => + for { + _ <- console.writeln("foo") + _ <- console.writeln("bar") + actual <- console.readLineWithCharset(StandardCharsets.UTF_8) + } yield assertEquals(actual, "foo") + } + } + + real("TestConsole.readLineWithCharset should block if the write isn't ready") { + TestConsole.resource[IO].use { console => + TestControl.executeEmbed { + ( + console.readLineWithCharset(StandardCharsets.UTF_8), + console.readLineWithCharset(StandardCharsets.UTF_8).delayBy(10.millis), + console.writeln("foo").delayBy(20.millis), + console.writeln("bar").delayBy(30.millis) + ).parMapN { + case (fooRead, barRead, _, _) => + assertEquals((fooRead, barRead), ("foo", "bar")) + } *> console.activityLog.map { + assertEquals( + _, + """|=== Activity Log === + |Reading stdIn [id: 0] + |Reading stdIn [id: 1] + |Writing line to stdin: foo + |Read from stdin [id: 0]: foo + |Writing line to stdin: bar + |Read from stdin [id: 1]: bar + |=== Current State === + |Waiting for read + |Pending requests: 0 + |No partial line""".stripMargin + ) + } + } + } + } + + real("TestConsole.close should clean up any blocked reads") { + TestConsole.resource[IO].use { console => + TestControl.executeEmbed { + ( + console.readLineWithCharset(StandardCharsets.UTF_8).attempt, + console + .readLineWithCharset(StandardCharsets.UTF_8) + .attempt + .map(_.leftMap(_.getMessage)) + .delayBy(10.millis), + console.writeln("foo").attempt.delayBy(20.millis), + console.close.delayBy(30.millis) + ).parMapN { + case (fooRead, barRead, write, _) => + assertEquals( + (fooRead, barRead, write), + (Right("foo"), Left("End Of File"), Right(())) + ) + } *> console.activityLog.flatMap { log => + // Non-determinism is fun. + // The result itself is stable, but the failure may be logged before + // or after the transition to 'Closed' is logged + if (log == """|=== Activity Log === + |Reading stdIn [id: 0] + |Reading stdIn [id: 1] + |Writing line to stdin: foo + |Read from stdin [id: 0]: foo + |Closing + |Notifying 1 pending read requests + |Read from stdin failed [id: 1]: java.io.EOFException: End Of File + |Closed + |=== Current State === + |Closed""".stripMargin) IO.unit + else if (log == """|=== Activity Log === + |Reading stdIn [id: 0] + |Reading stdIn [id: 1] + |Writing line to stdin: foo + |Read from stdin [id: 0]: foo + |Closing + |Notifying 1 pending read requests + |Closed + |Read from stdin failed [id: 1]: java.io.EOFException: End Of File + |=== Current State === + |Closed""".stripMargin) IO.unit + else IO(fail("Unexpected activity log", clues(log))) + } + } + } + } + + test("PartialLine.append should not change when appending an empty chunk") { + assertEquals( + PartialLine.one(chunk("foo")).append(chunk("")), + (Chain.empty[Line], PartialLine.one(chunk("foo"))) + ) + } + + test("PartialLine.append should replace when appending to an empty chunk") { + assertEquals( + PartialLine.one(chunk("")).append(chunk("foo")), + (Chain.empty[Line], PartialLine.one(chunk("foo"))) + ) + } + + test( + "PartialLine.append should append the incoming chunk when it does not contain a newline") { + assertEquals( + PartialLine.one(chunk("foo")).append(chunk("bar")), + (Chain.empty[Line], PartialLine(Chain(chunk("foo"), chunk("bar")))) + ) + } + + test( + "PartialLine.append should emit and replace when the incoming chunk starts with a newline") { + assertEquals( + PartialLine.one(chunk("foo")).append(chunk("\nbar")), + (Chain.one(Line.one(chunk("foo"))), PartialLine.one(chunk("bar"))) + ) + } + + test("append the incoming chunk when it does not end with a newline") { + assertEquals( + PartialLine.one(chunk("foo")).append(chunk("bar")), + (Chain.empty, PartialLine(Chain(chunk("foo"), chunk("bar")))) + ) + } + + test("emit when the incomming chunk ends with a newline") { + assertEquals( + PartialLine.one(chunk("foo")).append(chunk("bar\n")), + (Chain.one(Line(Chain(chunk("foo"), chunk("bar")))), PartialLine.empty) + ) + } + + test("emit multiple if the incoming chunk has multiple newlines") { + assertEquals( + PartialLine.one(chunk("foo")).append(chunk("bar\nbaz\nqux")), + ( + Chain( + Line(Chain(chunk("foo"), chunk("bar"))), + Line.one(chunk("baz")) + ), + PartialLine.one(chunk("qux")) + ) + ) + } +} From f433ae7ce504baa4e8727a0dafc0da893a28e274 Mon Sep 17 00:00:00 2001 From: Morgen Peschke Date: Mon, 26 May 2025 10:41:45 -0700 Subject: [PATCH 2/5] Hide TestConsole.close --- .../cats/effect/testkit/TestConsole.scala | 29 ++--- .../cats/effect/testkit/TestConsoleSpec.scala | 104 +++++++++--------- 2 files changed, 63 insertions(+), 70 deletions(-) diff --git a/testkit/shared/src/main/scala/cats/effect/testkit/TestConsole.scala b/testkit/shared/src/main/scala/cats/effect/testkit/TestConsole.scala index 3573c2cceb..363e50ff54 100644 --- a/testkit/shared/src/main/scala/cats/effect/testkit/TestConsole.scala +++ b/testkit/shared/src/main/scala/cats/effect/testkit/TestConsole.scala @@ -158,7 +158,7 @@ final class TestConsole[F[_]: Parallel]( * Any blocked calls to [[readLineWithCharset]] terminate with a raised * [[java.io.EOFException]] */ - def close: F[Unit] = stdInSemaphore.permit.use { _ => + private def close: F[Unit] = stdInSemaphore.permit.use { _ => stdInStateRef .get .flatTap(_ => log("Closing")) @@ -233,15 +233,17 @@ final class TestConsole[F[_]: Parallel]( object TestConsole { /** - * Create a [[TestConsole]] instance without lifecycle management - * - * CAUTION + * Create a resource which instantiates and closes a [[TestConsole]] * - * Be careful to ensure that [[TestConsole.close]] is called before the end of the test, to - * make sure that no fibers are blocked waiting on a call to - * [[TestConsole.readLineWithCharset]] + * This is the preferred usage pattern, as it ensures that no fibers are left blocked on calls + * to [[TestConsole.readLineWithCharset]] */ - def unsafe[F[_]: Concurrent: Parallel]: F[TestConsole[F]] = + def resource[F[_]: Concurrent: Parallel]: Resource[F, TestConsole[F]] = + Resource.make[F, TestConsole[F]](unsafe[F])(_.close.recover { + case ConsoleClosedException() => () + }) + + private def unsafe[F[_]: Concurrent: Parallel]: F[TestConsole[F]] = ( Semaphore[F](1L), Ref.of[F, TestStdInState[F]](TestStdInState.Waiting[F](Chain.empty, PartialLine.empty)), @@ -251,17 +253,6 @@ object TestConsole { Ref.of[F, Int](0) ).mapN(new TestConsole[F](_, _, _, _, _, _)) - /** - * Create a resource which instantiates and closes a [[TestConsole]] - * - * This is the preferred usage pattern, as it ensures that no fibers are left blocked on calls - * to [[TestConsole.readLineWithCharset]] - */ - def resource[F[_]: Concurrent: Parallel]: Resource[F, TestConsole[F]] = - Resource.make[F, TestConsole[F]](unsafe[F])(_.close.recover { - case ConsoleClosedException() => () - }) - private[testkit] final case class ConsoleClosedException() extends IllegalStateException("Console is closed") with NoStackTrace diff --git a/tests/shared/src/test/scala/cats/effect/testkit/TestConsoleSpec.scala b/tests/shared/src/test/scala/cats/effect/testkit/TestConsoleSpec.scala index 4eb708d40b..70ab629f4c 100644 --- a/tests/shared/src/test/scala/cats/effect/testkit/TestConsoleSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/testkit/TestConsoleSpec.scala @@ -49,11 +49,12 @@ class TestConsoleSpec extends BaseSuite { } real("TestConsole.write should fail when closed") { - TestConsole.resource[IO].use { console => - for { - _ <- console.close - result <- console.write("foo").attempt - } yield assertEquals(result.leftMap(_.getMessage), Left("Console is closed")) + TestConsole.resource[IO].allocated.flatMap { + case (console, close) => + for { + _ <- close + result <- console.write("foo").attempt + } yield assertEquals(result.leftMap(_.getMessage), Left("Console is closed")) } } @@ -246,53 +247,54 @@ class TestConsoleSpec extends BaseSuite { } } - real("TestConsole.close should clean up any blocked reads") { - TestConsole.resource[IO].use { console => - TestControl.executeEmbed { - ( - console.readLineWithCharset(StandardCharsets.UTF_8).attempt, - console - .readLineWithCharset(StandardCharsets.UTF_8) - .attempt - .map(_.leftMap(_.getMessage)) - .delayBy(10.millis), - console.writeln("foo").attempt.delayBy(20.millis), - console.close.delayBy(30.millis) - ).parMapN { - case (fooRead, barRead, write, _) => - assertEquals( - (fooRead, barRead, write), - (Right("foo"), Left("End Of File"), Right(())) - ) - } *> console.activityLog.flatMap { log => - // Non-determinism is fun. - // The result itself is stable, but the failure may be logged before - // or after the transition to 'Closed' is logged - if (log == """|=== Activity Log === - |Reading stdIn [id: 0] - |Reading stdIn [id: 1] - |Writing line to stdin: foo - |Read from stdin [id: 0]: foo - |Closing - |Notifying 1 pending read requests - |Read from stdin failed [id: 1]: java.io.EOFException: End Of File - |Closed - |=== Current State === - |Closed""".stripMargin) IO.unit - else if (log == """|=== Activity Log === - |Reading stdIn [id: 0] - |Reading stdIn [id: 1] - |Writing line to stdin: foo - |Read from stdin [id: 0]: foo - |Closing - |Notifying 1 pending read requests - |Closed - |Read from stdin failed [id: 1]: java.io.EOFException: End Of File - |=== Current State === - |Closed""".stripMargin) IO.unit - else IO(fail("Unexpected activity log", clues(log))) + real("TestConsole should clean up any blocked reads when released") { + TestConsole.resource[IO].allocated.flatMap { + case (console, close) => + TestControl.executeEmbed { + ( + console.readLineWithCharset(StandardCharsets.UTF_8).attempt, + console + .readLineWithCharset(StandardCharsets.UTF_8) + .attempt + .map(_.leftMap(_.getMessage)) + .delayBy(10.millis), + console.writeln("foo").attempt.delayBy(20.millis), + close.delayBy(30.millis) + ).parMapN { + case (fooRead, barRead, write, _) => + assertEquals( + (fooRead, barRead, write), + (Right("foo"), Left("End Of File"), Right(())) + ) + } *> console.activityLog.flatMap { log => + // Non-determinism is fun. + // The result itself is stable, but the failure may be logged before + // or after the transition to 'Closed' is logged + if (log == """|=== Activity Log === + |Reading stdIn [id: 0] + |Reading stdIn [id: 1] + |Writing line to stdin: foo + |Read from stdin [id: 0]: foo + |Closing + |Notifying 1 pending read requests + |Read from stdin failed [id: 1]: java.io.EOFException: End Of File + |Closed + |=== Current State === + |Closed""".stripMargin) IO.unit + else if (log == """|=== Activity Log === + |Reading stdIn [id: 0] + |Reading stdIn [id: 1] + |Writing line to stdin: foo + |Read from stdin [id: 0]: foo + |Closing + |Notifying 1 pending read requests + |Closed + |Read from stdin failed [id: 1]: java.io.EOFException: End Of File + |=== Current State === + |Closed""".stripMargin) IO.unit + else IO(fail("Unexpected activity log", clues(log))) + } } - } } } From 4f6898ecb64d1afacf38bf77eb126c0677d4b3a3 Mon Sep 17 00:00:00 2001 From: Morgen Peschke Date: Wed, 16 Jul 2025 15:26:06 -0700 Subject: [PATCH 3/5] Separate inspection and stdin control from TestConsole --- .../cats/effect/testkit/TestConsole.scala | 642 +++++++++++------- .../cats/effect/testkit/TestConsoleSpec.scala | 390 +++++------ 2 files changed, 582 insertions(+), 450 deletions(-) diff --git a/testkit/shared/src/main/scala/cats/effect/testkit/TestConsole.scala b/testkit/shared/src/main/scala/cats/effect/testkit/TestConsole.scala index 363e50ff54..eb70b7fbef 100644 --- a/testkit/shared/src/main/scala/cats/effect/testkit/TestConsole.scala +++ b/testkit/shared/src/main/scala/cats/effect/testkit/TestConsole.scala @@ -16,13 +16,12 @@ package cats.effect.testkit -import cats.{Parallel, Show} +import cats.Show import cats.data.{Chain, NonEmptyChain} import cats.effect.Concurrent import cats.effect.kernel.{Deferred, Ref, Resource} import cats.effect.std.{Console, Semaphore} -import cats.effect.testkit.TestConsole.{ConsoleClosedException, TestStdInState} -import cats.effect.testkit.TestConsole.TestStdInState._ +import cats.effect.testkit.TestConsole.TestStdIn import cats.syntax.all._ import scala.annotation.tailrec @@ -34,111 +33,16 @@ import java.nio.charset.Charset /** * Implement a test version of [[cats.effect.std.Console]] */ -final class TestConsole[F[_]: Parallel]( - stdInSemaphore: Semaphore[F], - stdInStateRef: Ref[F, TestStdInState[F]], +final class TestConsole[F[_]: Concurrent]( + stdIn: TestStdIn[F], stdOutRef: Ref[F, Chain[String]], stdErrRef: Ref[F, Chain[String]], - logsRef: Ref[F, Chain[String]], - readIdRef: Ref[F, Int] -)(implicit F: Concurrent[F]) - extends Console[F] { - private val defaultCharset = Charset.defaultCharset() - private def streamClosed = new EOFException("End Of File") - private def log(msg: String): F[Unit] = logsRef.update(_.append(msg)) - - /** - * Write a string to the simulated stdIn - * - * Blocked calls to [[readLineWithCharset]] will be woken up if `str` contains one or more - * lines. - * - * @note - * Blocked calls will be woken in a first-in-first-out order. - */ - def write[A](value: A, charset: Charset = defaultCharset)(implicit S: Show[A]): F[Unit] = - log(show"Writing to stdin: $value") *> writeImpl(Chunk(value.show, charset)) - - /** - * Write a string and a newline to the simulated stdIn - * - * At least one blocked call to [[readLineWithCharset]] will be woken up, if it exists. - * - * @note - * Blocked calls will be woken in a first-in-first-out order. - */ - def writeln[A](value: A, charset: Charset = defaultCharset)(implicit S: Show[A]): F[Unit] = - log(show"Writing line to stdin: $value") *> writeImpl(Chunk(show"$value\n", charset)) - - private def writeImpl(chunk: Chunk): F[Unit] = - if (chunk.isEmpty) F.unit - else - stdInSemaphore.permit.use { _ => - stdInStateRef.get.flatMap { - case Closed() => F.raiseError(ConsoleClosedException()) - case Ready(lines, partial) => - val (newLines, newPartial) = partial.append(chunk) - stdInStateRef.set(Ready[F](lines.appendChain(newLines), newPartial)) - case Waiting(requests, buffer) => - val (lines, partial) = buffer.append(chunk) - if (lines.isEmpty) - stdInStateRef.set(Waiting[F](requests, partial)) - else { - def loop( - remainingLines: Chain[Line], - remainingRequests: Chain[Deferred[F, Either[Throwable, Array[Byte]]]]) - : F[TestStdInState[F]] = - (remainingLines.uncons, remainingRequests.uncons) match { - case (None, None) => - Waiting[F](Chain.empty, PartialLine.empty).pure[F].widen - case (None, Some(_)) => - Waiting[F](remainingRequests, partial).pure[F].widen - case (Some((nextLine, otherLines)), None) => - Ready[F](NonEmptyChain.fromChainPrepend(nextLine, otherLines), partial) - .pure[F] - .widen - case (Some((nextLine, otherLines)), Some((nextRequest, otherRequests))) => - nextRequest - .complete(nextLine.bytes.asRight) >> loop(otherLines, otherRequests) - } - - loop(lines, requests).flatMap(stdInStateRef.set) - } - } - } + inspector: TestConsole.Inspector[F] +) extends Console[F] { + import inspector.log override def readLineWithCharset(charset: Charset): F[String] = - readIdRef.getAndUpdate(_ + 1).flatMap { readId => - stdInSemaphore - .permit - .use { _ => - log(s"Reading stdIn [id: $readId]") *> - stdInStateRef.get.flatMap { - case Closed() => - F.raiseError[Deferred[F, Either[Throwable, Array[Byte]]]](streamClosed) - case Ready(lines, partial) => - val newState = - NonEmptyChain - .fromChain(lines.tail) - .fold[TestStdInState[F]](Waiting(Chain.empty, PartialLine.empty))( - Ready(_, partial)) - - stdInStateRef.set(newState) *> - Deferred[F, Either[Throwable, Array[Byte]]].flatTap( - _.complete(lines.head.bytes.asRight)) - case Waiting(requests, buffer) => - Deferred[F, Either[Throwable, Array[Byte]]].flatTap(d => - stdInStateRef.set(Waiting(requests.append(d), buffer))) - } - } - .flatMap(_.get) - .flatMap(_.traverse(bytes => Concurrent[F].catchNonFatal(new String(bytes, charset)))) - .flatTap { - case Left(ex) => log(s"Read from stdin failed [id: $readId]: $ex") - case Right(line) => log(s"Read from stdin [id: $readId]: $line") - } - .rethrow - } + stdIn.readLine(charset) override def print[A](a: A)(implicit S: Show[A]): F[Unit] = log(show"print($a)") *> stdOutRef.update(_.append(a.show)) @@ -151,84 +55,6 @@ final class TestConsole[F[_]: Parallel]( override def errorln[A](a: A)(implicit S: Show[A]): F[Unit] = log(show"errorln($a)") *> stdErrRef.update(_.append(a.show).append("\n")) - - /** - * Close the TestConsole - * - * Any blocked calls to [[readLineWithCharset]] terminate with a raised - * [[java.io.EOFException]] - */ - private def close: F[Unit] = stdInSemaphore.permit.use { _ => - stdInStateRef - .get - .flatTap(_ => log("Closing")) - .flatMap { - case Closed() => F.unit - case Ready(lines, partial) => - log(s"Discarding ${lines.length} lines and ${partial.chunks.length} bytes from stdIn") *> - stdInStateRef.set(Closed[F]()) - case Waiting(requests, buffer) => - log(s"Discarding ${buffer.chunks.length} bytes from stdIn") - .unlessA(buffer.chunks.isEmpty) *> - log(s"Notifying ${requests.length} pending read requests") - .unlessA(requests.isEmpty) *> - stdInStateRef.set(Closed[F]()) *> requests.parTraverse_( - _.complete(streamClosed.asLeft)) - } - .flatTap(_ => log("Closed")) - } - - /** - * @return - * The current contents of stdOut - */ - def stdOutContents: F[String] = stdOutRef.get.map(_.mkString_("")) - - /** - * @return - * The current contents of stdErr - */ - def stdErrContents: F[String] = stdErrRef.get.map(_.mkString_("")) - - /** - * @return - * A human-readable description of the activity log and current status of this instance. - * - * Handy for debugging failing or blocked tests. - */ - def activityLog: F[String] = - (logsRef.get.map(_.mkString_("\n")), stdStateDescription).mapN { (logStr, stateStr) => - s"""|=== Activity Log === - |$logStr - |=== Current State === - |$stateStr""".stripMargin - } - - /** - * Clear the human-readable activity log - */ - def clearLog: F[Unit] = logsRef.set(Chain.empty) - - private def stdStateDescription: F[String] = stdInStateRef.get.map { - case Closed() => "Closed" - case Ready(lines, partial) => - val linesStr = lines.mkString_("\n") - val partialStr = - if (partial.isEmpty) "No partial line" - else s"Partial line: '${partial.render}'" - - s"""Ready for read - |$partialStr - |--- Complete Lines --- - |$linesStr""".stripMargin - case Waiting(requests, buffer) => - val bufferStr = - if (buffer.isEmpty) "No partial line" - else s"Partial line: '${buffer.render}'" - s"""Waiting for read - |Pending requests: ${requests.length} - |$bufferStr""".stripMargin - } } object TestConsole { @@ -238,104 +64,400 @@ object TestConsole { * This is the preferred usage pattern, as it ensures that no fibers are left blocked on calls * to [[TestConsole.readLineWithCharset]] */ - def resource[F[_]: Concurrent: Parallel]: Resource[F, TestConsole[F]] = - Resource.make[F, TestConsole[F]](unsafe[F])(_.close.recover { - case ConsoleClosedException() => () - }) - - private def unsafe[F[_]: Concurrent: Parallel]: F[TestConsole[F]] = - ( - Semaphore[F](1L), - Ref.of[F, TestStdInState[F]](TestStdInState.Waiting[F](Chain.empty, PartialLine.empty)), - Ref.empty[F, Chain[String]], - Ref.empty[F, Chain[String]], - Ref.empty[F, Chain[String]], - Ref.of[F, Int](0) - ).mapN(new TestConsole[F](_, _, _, _, _, _)) + def resource[F[_]: Concurrent]: Resource[F, (TestConsole[F], TestStdIn[F], Inspector[F])] = + Resource.make[F, (TestConsole[F], TestStdIn[F], Inspector[F])](unsafe[F]) { + case (_, stdIn, _) => stdIn.close.recover { case ConsoleClosedException() => () } + } + + private def unsafe[F[_]: Concurrent]: F[(TestConsole[F], TestStdIn[F], Inspector[F])] = + for { + stdInStateRef <- Ref.of[F, TestStdIn.State[F]](TestStdIn.State.waiting[F]) + stdOutRef <- Ref.empty[F, Chain[String]] + stdErrRef <- Ref.empty[F, Chain[String]] + inspector <- Inspector.default(stdInStateRef, stdOutRef, stdErrRef) + stdIn <- TestStdIn.default(stdInStateRef, inspector.log) + } yield (new TestConsole[F](stdIn, stdOutRef, stdErrRef, inspector), stdIn, inspector) private[testkit] final case class ConsoleClosedException() extends IllegalStateException("Console is closed") with NoStackTrace - private[testkit] sealed trait TestStdInState[F[_]] - private[testkit] object TestStdInState { - final case class Chunk(value: String, charset: Charset) { - def bytes: Array[Byte] = value.getBytes(charset) - def isEmpty: Boolean = value.isEmpty - def modify(f: String => String): Chunk = Chunk(f(value), charset) - def split(char: Char): Option[(Chunk, Chunk)] = { - val idx = value.indexOf(char.toInt) - if (idx === -1) None - else { - val (head, tail) = value.splitAt(idx) - Some((Chunk(head, charset), Chunk(tail.drop(1), charset))) + /** + * Allows inspection of the state of a [[TestConsole]] + */ + trait Inspector[F[_]] { + + /** + * @return + * The current contents of the associated [[TestConsole]]'s stdOut + */ + def stdOutContents: F[String] + + /** + * @return + * The current contents of the associated [[TestConsole]]'s stdErr + */ + def stdErrContents: F[String] + + /** + * @return + * A human-readable description of the activity log and current status of this instance. + * + * Handy for debugging failing or blocked tests. + */ + def description: F[String] + + private[testkit] def log(msg: String): F[Unit] + } + object Inspector { + def default[F[_]: Concurrent]( + stdInStateRef: Ref[F, TestStdIn.State[F]], + stdOutRef: Ref[F, Chain[String]], + stdErrRef: Ref[F, Chain[String]]): F[Inspector[F]] = + Ref.empty[F, Chain[String]].map { logsRef => + new Inspector[F] { + override def stdOutContents: F[String] = stdOutRef.get.map(_.mkString_("")) + + override def stdErrContents: F[String] = stdErrRef.get.map(_.mkString_("")) + + override def description: F[String] = + (logsRef.get.map(_.mkString_("\n")), stdInStateRef.get.map(_.describe)).mapN { + (logStr, stateStr) => + s"""|=== Activity Log === + |$logStr + |=== Current State === + |$stateStr""".stripMargin + } + + override private[testkit] def log(msg: String): F[Unit] = + logsRef.update(_.append(msg)) } } + } + + trait TestStdIn[F[_]] { + + /** + * Write a string to the simulated stdIn using the system-default charset + * + * Blocked calls to [[TestConsole.readLineWithCharset]] will be woken up if `str` contains + * one or more lines. + * + * @note + * Blocked calls will be woken in a first-in-first-out order. + */ + def write[A](value: A)(implicit S: Show[A]): F[Unit] + + /** + * Write a string to the simulated stdIn + * + * Blocked calls to [[TestConsole.readLineWithCharset]] will be woken up if `str` contains + * one or more lines. + * + * @note + * Blocked calls will be woken in a first-in-first-out order. + */ + def write[A](value: A, charset: Charset)(implicit S: Show[A]): F[Unit] + + /** + * Write a string and a newline to the simulated stdIn with the system-default charset + * + * At least one blocked call to [[TestConsole.readLineWithCharset]] will be woken up, if it + * exists. + * + * @note + * Blocked calls will be woken in a first-in-first-out order. + */ + def writeln[A](value: A)(implicit S: Show[A]): F[Unit] + + /** + * Write a string and a newline to the simulated stdIn + * + * At least one blocked call to [[TestConsole.readLineWithCharset]] will be woken up, if it + * exists. + * + * @note + * Blocked calls will be woken in a first-in-first-out order. + */ + def writeln[A](value: A, charset: Charset)(implicit S: Show[A]): F[Unit] + + private[testkit] def readLine(charset: Charset): F[String] + private[testkit] def close: F[Unit] + } + + object TestStdIn { + def default[F[_]](stateRef: Ref[F, TestStdIn.State[F]], log: String => F[Unit])( + implicit F: Concurrent[F]): F[TestStdIn[F]] = + (Semaphore[F](1L), Ref.of[F, Int](0)).mapN { (semaphore, readIdRef) => + new TestStdIn[F] { + private val defaultCharset = Charset.defaultCharset() + + private def streamClosed = new EOFException("End Of File") + + override def write[A](value: A)(implicit S: Show[A]): F[Unit] = + write(value, defaultCharset) + + override def write[A](value: A, charset: Charset)(implicit S: Show[A]): F[Unit] = + log(show"Writing to stdIn: $value") *> writeImpl(State.Chunk(value.show, charset)) + + override def writeln[A](value: A)(implicit S: Show[A]): F[Unit] = + writeln(value, defaultCharset) + + override def writeln[A](value: A, charset: Charset)(implicit S: Show[A]): F[Unit] = + log(show"Writing line to stdIn: $value") *> writeImpl( + State.Chunk(show"$value\n", charset)) + + private def writeImpl(chunk: State.Chunk): F[Unit] = + if (chunk.isEmpty) F.unit + else + semaphore.permit.use { _ => + stateRef.get.flatMap { + case State.Closed() => F.raiseError(ConsoleClosedException()) + case ready: TestStdIn.State.Ready[F] => stateRef.set(ready.push(chunk)) + case oldState: TestStdIn.State.Waiting[F] => + val (lines, newBuffer) = oldState.buffer.append(chunk) + if (lines.isEmpty) stateRef.set(oldState.replaceBuffer(newBuffer)) + else { + def loop( + remainingLines: Chain[State.Line], + remainingRequests: Chain[Deferred[F, Either[Throwable, Array[Byte]]]]) + : F[State[F]] = + (remainingLines.uncons, remainingRequests.uncons) match { + case (None, None) => State.waiting[F].pure[F] + case (None, Some(_)) => + State.waiting[F](remainingRequests, newBuffer).pure[F] + case (Some((nextLine, otherLines)), None) => + State.ready[F](nextLine, otherLines, newBuffer).pure[F] + case ( + Some((nextLine, otherLines)), + Some((nextRequest, otherRequests))) => + nextRequest.complete(nextLine.bytes.asRight) >> loop( + otherLines, + otherRequests) + } + + loop(lines, oldState.requests).flatMap(stateRef.set) + } + } + } + + override private[testkit] def readLine(charset: Charset): F[String] = + readIdRef.getAndUpdate(_ + 1).flatMap { readId => + semaphore + .permit + .use { _ => + log(s"Reading stdIn [id: $readId]") *> + stateRef.get.flatMap { + case State.Closed() => + F.raiseError[Deferred[F, Either[Throwable, Array[Byte]]]](streamClosed) + case ready: TestStdIn.State.Ready[F] => + val (line, newState) = ready.shift + + stateRef.set(newState) *> + Deferred[F, Either[Throwable, Array[Byte]]].flatTap( + _.complete(line.bytes.asRight)) + case waiting: TestStdIn.State.Waiting[F] => + Deferred[F, Either[Throwable, Array[Byte]]].flatTap(d => + stateRef.set(waiting.addRequest(d))) + } + } + .flatMap(_.get) + .flatMap(_.traverse(bytes => + Concurrent[F].catchNonFatal(new String(bytes, charset)))) + .flatTap { + case Left(ex) => log(s"Read from stdIn failed [id: $readId]: $ex") + case Right(line) => log(s"Read from stdIn [id: $readId]: $line") + } + .rethrow + } + + override private[testkit] def close: F[Unit] = + semaphore.permit.use { _ => + stateRef + .get + .flatTap(_ => log("Closing stdIn")) + .flatMap { + case State.Closed() => F.unit + case State.Ready(lines, partial) => + log(s"Discarding ${lines.length} lines and ${partial.chunks.length} bytes from stdIn") *> + stateRef.set(State.closed) + case State.Waiting(requests, buffer) => + log(s"Discarding ${buffer.chunks.length} bytes from stdIn") + .unlessA(buffer.chunks.isEmpty) *> + log(s"Notifying ${requests.length} pending read requests") + .unlessA(requests.isEmpty) *> + stateRef.set(State.closed) *> + requests.traverse_(_.complete(streamClosed.asLeft)) + } + .flatTap(_ => log("Closed stdIn")) + } + } + } + + private[testkit] sealed trait State[F[_]] { + def describe: String = this match { + case State.Closed() => "Closed" + case State.Ready(lines, partial) => + val linesStr = lines.mkString_("\n") + val partialStr = + if (partial.isEmpty) "No partial line" + else s"Partial line: '${partial.render}'" + + s"""Ready for read + |$partialStr + |--- Complete Lines --- + |$linesStr""".stripMargin + case State.Waiting(requests, buffer) => + val bufferStr = + if (buffer.isEmpty) "No partial line" + else s"Partial line: '${buffer.render}'" + s"""Waiting for write + |Pending requests: ${requests.length} + |$bufferStr""".stripMargin + } } - object Chunk { - implicit val show: Show[Chunk] = Show.show(_.value) - } + private[testkit] object State { + def closed[F[_]]: State[F] = Closed() + + def ready[F[_]]( + firstLine: Line, + otherLines: Chain[Line], + partial: PartialLine): State[F] = + Ready(NonEmptyChain.fromChainPrepend(firstLine, otherLines), partial) - final case class PartialLine(chunks: Chain[Chunk]) { - def isEmpty: Boolean = chunks.forall(_.isEmpty) + def waiting[F[_]]: State[F] = Waiting(Chain.empty, PartialLine.empty) + def waiting[F[_]](buffer: PartialLine): State[F] = Waiting(Chain.empty, buffer) + def waiting[F[_]]( + requests: Chain[Deferred[F, Either[Throwable, Array[Byte]]]], + buffer: PartialLine): State[F] = Waiting(requests, buffer) - def render: String = chunks.mkString_("") + final case class Chunk(value: String, charset: Charset) { + def bytes: Array[Byte] = value.getBytes(charset) - def toLine: Line = Line(chunks) + def isEmpty: Boolean = value.isEmpty - def append(chunk: Chunk): (Chain[Line], PartialLine) = - if (chunk.value.startsWith("\n")) - PartialLine.empty.append(chunk.modify(_.drop(1))).leftMap(_.prepend(toLine)) - else if (chunk.value.endsWith("\n")) { - val (lines, lastLine) = append(chunk.modify(_.dropRight(1))) - lines.append(lastLine.toLine) -> PartialLine.empty - } else { - if (chunk.isEmpty) (Chain.empty, this) + def modify(f: String => String): Chunk = Chunk(f(value), charset) + + def split(char: Char): Option[(Chunk, Chunk)] = { + val idx = value.indexOf(char.toInt) + if (idx === -1) None else { - @tailrec - def loop(accum: Chain[Line], remaining: Chunk): (Chain[Line], PartialLine) = - if (remaining.isEmpty) (accum, PartialLine.empty) - else { - remaining.split('\n') match { - case None => (accum, PartialLine.one(remaining)) - case Some((head, tail)) => loop(accum.append(Line.one(head)), tail) + val (head, tail) = value.splitAt(idx) + Some((Chunk(head, charset), Chunk(tail.drop(1), charset))) + } + } + } + + object Chunk { + implicit val show: Show[Chunk] = Show.show(_.value) + } + + /** + * Chunks of a line which cannot be read from stdIn until a newline is written + */ + final case class PartialLine(chunks: Chain[Chunk]) { + def isEmpty: Boolean = chunks.forall(_.isEmpty) + + def render: String = chunks.mkString_("") + + def toLine: Line = Line(chunks) + + def append(chunk: Chunk): (Chain[Line], PartialLine) = + if (chunk.value.startsWith("\n")) + PartialLine.empty.append(chunk.modify(_.drop(1))).leftMap(_.prepend(toLine)) + else if (chunk.value.endsWith("\n")) { + val (lines, lastLine) = append(chunk.modify(_.dropRight(1))) + lines.append(lastLine.toLine) -> PartialLine.empty + } else { + if (chunk.isEmpty) (Chain.empty, this) + else { + @tailrec + def loop(accum: Chain[Line], remaining: Chunk): (Chain[Line], PartialLine) = + if (remaining.isEmpty) (accum, PartialLine.empty) + else { + remaining.split('\n') match { + case None => (accum, PartialLine.one(remaining)) + case Some((head, tail)) => loop(accum.append(Line.one(head)), tail) + } } - } - chunk.split('\n') match { - case Some((head, tail)) => - loop(Chain.one(Line(chunks.append(head))), tail) - case None => - if (isEmpty) (Chain.empty, PartialLine.one(chunk)) - else (Chain.empty, PartialLine(chunks.append(chunk))) + chunk.split('\n') match { + case Some((head, tail)) => + loop(Chain.one(Line(chunks.append(head))), tail) + case None => + if (isEmpty) (Chain.empty, PartialLine.one(chunk)) + else (Chain.empty, PartialLine(chunks.append(chunk))) + } } } + } + + object PartialLine { + def one(c: Chunk): PartialLine = PartialLine(Chain.one(c)) + + def empty: PartialLine = PartialLine(Chain.empty) + } + + /** + * Lines ready to be read from stdIn + */ + final case class Line(chunks: Chain[Chunk]) { + def isEmpty: Boolean = chunks.forall(_.isEmpty) + + def render: String = chunks.mkString_("") + + def bytes: Array[Byte] = + chunks.map(_.bytes).toVector.toArray.flatten + } + + object Line { + def one(chunk: Chunk): Line = Line(Chain.one(chunk)) + + def empty: Line = Line(Chain.empty) + + implicit val show: Show[Line] = Show.show(_.render) + } + + /** + * StdIn will reject reads and writes + */ + final case class Closed[F[_]]() extends State[F] + + /** + * StdIn has at least one line ready to be read + * + * Transitions to Waiting when `lines` can no longer be created + */ + final case class Ready[F[_]](lines: NonEmptyChain[Line], partial: PartialLine) + extends State[F] { + def push(chunk: Chunk): State[F] = { + val (newLines, newPartial) = partial.append(chunk) + Ready[F](lines.appendChain(newLines), newPartial) } - } - object PartialLine { - def one(c: Chunk): PartialLine = PartialLine(Chain.one(c)) - def empty: PartialLine = PartialLine(Chain.empty) - } - final case class Line(chunks: Chain[Chunk]) { - def isEmpty: Boolean = chunks.forall(_.isEmpty) - def render: String = chunks.mkString_("") - def bytes: Array[Byte] = - chunks.map(_.bytes).toVector.toArray.flatten - } - object Line { - def one(chunk: Chunk): Line = Line(Chain.one(chunk)) - def empty: Line = Line(Chain.empty) - implicit val show: Show[Line] = Show.show(_.render) - } + def shift: (Line, State[F]) = { + val newState = + NonEmptyChain.fromChain(lines.tail).fold(waiting[F](partial))(Ready(_, partial)) - final case class Closed[F[_]]() extends TestStdInState[F] - final case class Ready[F[_]](lines: NonEmptyChain[Line], partial: PartialLine) - extends TestStdInState[F] + (lines.head, newState) + } + } - final case class Waiting[F[_]]( - requests: Chain[Deferred[F, Either[Throwable, Array[Byte]]]], - buffer: PartialLine) - extends TestStdInState[F] + /** + * StdIn cannot accept writes because it doesn't have at least one complete line + * + * Transitions to Ready when a newline is written to the stream + */ + final case class Waiting[F[_]]( + requests: Chain[Deferred[F, Either[Throwable, Array[Byte]]]], + buffer: PartialLine) + extends State[F] { + def replaceBuffer(newBuffer: PartialLine): State[F] = + Waiting(requests, newBuffer) + + def addRequest(request: Deferred[F, Either[Throwable, Array[Byte]]]): State[F] = + Waiting(requests.append(request), buffer) + } + } } } diff --git a/tests/shared/src/test/scala/cats/effect/testkit/TestConsoleSpec.scala b/tests/shared/src/test/scala/cats/effect/testkit/TestConsoleSpec.scala index 70ab629f4c..930e17f194 100644 --- a/tests/shared/src/test/scala/cats/effect/testkit/TestConsoleSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/testkit/TestConsoleSpec.scala @@ -18,7 +18,7 @@ package cats.effect package testkit import cats.data.Chain -import cats.effect.testkit.TestConsole.TestStdInState.{Chunk, Line, PartialLine} +import cats.effect.testkit.TestConsole.TestStdIn.State.{Chunk, Line, PartialLine} import cats.syntax.all._ import scala.concurrent.duration.DurationInt @@ -29,227 +29,237 @@ class TestConsoleSpec extends BaseSuite { private def chunk(value: String): Chunk = Chunk(value, StandardCharsets.UTF_8) real("TestConsole.print should append the formatted value to stdOut") { - TestConsole.resource[IO].use { console => - for { - _ <- console.println(120) - _ <- console.print("foo") - stdOut <- console.stdOutContents - } yield assertEquals(stdOut, "120\nfoo") + TestConsole.resource[IO].use { + case (console, _, inspector) => + for { + _ <- console.println(120) + _ <- console.print("foo") + stdOut <- inspector.stdOutContents + } yield assertEquals(stdOut, "120\nfoo") } } real("TestConsole.error should append the formatted value to stdErr") { - TestConsole.resource[IO].use { console => - for { - _ <- console.errorln(120) - _ <- console.error("foo") - stdOut <- console.stdErrContents - } yield assertEquals(stdOut, "120\nfoo") + TestConsole.resource[IO].use { + case (console, _, inspector) => + for { + _ <- console.errorln(120) + _ <- console.error("foo") + stdOut <- inspector.stdErrContents + } yield assertEquals(stdOut, "120\nfoo") } } - real("TestConsole.write should fail when closed") { + real("TestStdIn.write should fail when closed") { TestConsole.resource[IO].allocated.flatMap { - case (console, close) => + case ((_, stdIn, _), close) => for { _ <- close - result <- console.write("foo").attempt + result <- stdIn.write("foo").attempt } yield assertEquals(result.leftMap(_.getMessage), Left("Console is closed")) } } - real("TestConsole.write should remain waiting when waiting and line is not complete") { - TestConsole.resource[IO].use { console => - for { - _ <- console.write("foo") - _ <- console.write("bar") - state <- console.activityLog - } yield assertEquals( - state, - """|=== Activity Log === - |Writing to stdin: foo - |Writing to stdin: bar - |=== Current State === - |Waiting for read - |Pending requests: 0 - |Partial line: 'foobar'""".stripMargin - ) + real("TestStdIn.write should remain waiting when waiting and line is not complete") { + TestConsole.resource[IO].use { + case (_, stdIn, inspector) => + for { + _ <- stdIn.write("foo") + _ <- stdIn.write("bar") + state <- inspector.description + } yield assertEquals( + state, + """|=== Activity Log === + |Writing to stdIn: foo + |Writing to stdIn: bar + |=== Current State === + |Waiting for write + |Pending requests: 0 + |Partial line: 'foobar'""".stripMargin + ) } } - real("TestConsole.write should be ready when waiting and line is complete") { - TestConsole.resource[IO].use { console => - for { - _ <- console.writeln("foo") - state <- console.activityLog - } yield assertEquals( - state, - """|=== Activity Log === - |Writing line to stdin: foo - |=== Current State === - |Ready for read - |No partial line - |--- Complete Lines --- - |foo""".stripMargin - ) + real("TestStdIn.write should be ready when waiting and line is complete") { + TestConsole.resource[IO].use { + case (_, stdIn, inspector) => + for { + _ <- stdIn.writeln("foo") + state <- inspector.description + } yield assertEquals( + state, + """|=== Activity Log === + |Writing line to stdIn: foo + |=== Current State === + |Ready for read + |No partial line + |--- Complete Lines --- + |foo""".stripMargin + ) } } - real("TestConsole.write should stay ready when already ready") { - TestConsole.resource[IO].use { console => - for { - _ <- console.writeln("foo") - _ <- console.writeln("bar") - state <- console.activityLog - } yield assertEquals( - state, - """|=== Activity Log === - |Writing line to stdin: foo - |Writing line to stdin: bar - |=== Current State === - |Ready for read - |No partial line - |--- Complete Lines --- - |foo - |bar""".stripMargin - ) + real("TestStdIn.write should stay ready when already ready") { + TestConsole.resource[IO].use { + case (_, stdIn, inspector) => + for { + _ <- stdIn.writeln("foo") + _ <- stdIn.writeln("bar") + state <- inspector.description + } yield assertEquals( + state, + """|=== Activity Log === + |Writing line to stdIn: foo + |Writing line to stdIn: bar + |=== Current State === + |Ready for read + |No partial line + |--- Complete Lines --- + |foo + |bar""".stripMargin + ) } } - real("TestConsole.write should accumulate partial lines when already ready") { - TestConsole.resource[IO].use { console => - for { - _ <- console.writeln("foo") - _ <- console.write("bar") - _ <- console.write("baz") - state <- console.activityLog - } yield assertEquals( - state, - """|=== Activity Log === - |Writing line to stdin: foo - |Writing to stdin: bar - |Writing to stdin: baz - |=== Current State === - |Ready for read - |Partial line: 'barbaz' - |--- Complete Lines --- - |foo""".stripMargin - ) + real("TestStdIn.write should accumulate partial lines when already ready") { + TestConsole.resource[IO].use { + case (_, stdIn, inspector) => + for { + _ <- stdIn.writeln("foo") + _ <- stdIn.write("bar") + _ <- stdIn.write("baz") + state <- inspector.description + } yield assertEquals( + state, + """|=== Activity Log === + |Writing line to stdIn: foo + |Writing to stdIn: bar + |Writing to stdIn: baz + |=== Current State === + |Ready for read + |Partial line: 'barbaz' + |--- Complete Lines --- + |foo""".stripMargin + ) } } real( - "TestConsole.write should retain partial lines when writing a new line and already ready") { - TestConsole.resource[IO].use { console => - for { - _ <- console.writeln("foo") - _ <- console.write("bar") - _ <- console.write("baz") - _ <- console.writeln("qux") - state <- console.activityLog - } yield assertEquals( - state, - """|=== Activity Log === - |Writing line to stdin: foo - |Writing to stdin: bar - |Writing to stdin: baz - |Writing line to stdin: qux - |=== Current State === - |Ready for read - |No partial line - |--- Complete Lines --- - |foo - |barbazqux""".stripMargin - ) + "TestStdIn.write should retain partial lines when writing a new line and already ready") { + TestConsole.resource[IO].use { + case (_, stdIn, inspector) => + for { + _ <- stdIn.writeln("foo") + _ <- stdIn.write("bar") + _ <- stdIn.write("baz") + _ <- stdIn.writeln("qux") + state <- inspector.description + } yield assertEquals( + state, + """|=== Activity Log === + |Writing line to stdIn: foo + |Writing to stdIn: bar + |Writing to stdIn: baz + |Writing line to stdIn: qux + |=== Current State === + |Ready for read + |No partial line + |--- Complete Lines --- + |foo + |barbazqux""".stripMargin + ) } } - real("TestConsole.write should wake up multiple reads if the line has embedded newlines") { - TestConsole.resource[IO].use { console => - TestControl.executeEmbed { - ( - console.readLineWithCharset(StandardCharsets.UTF_8), - console.readLineWithCharset(StandardCharsets.UTF_8).delayBy(10.millis), - console.writeln("foo\nbar").delayBy(20.millis) - ).parMapN { - case (fooRead, barRead, _) => - assertEquals((fooRead, barRead), ("foo", "bar")) - } *> console.activityLog.flatMap { log => - // Non-determinism is fun. - // The reads receive the right values, but they may wake up out of - // order. - if (log == """|=== Activity Log === - |Reading stdIn [id: 0] - |Reading stdIn [id: 1] - |Writing line to stdin: foo - |bar - |Read from stdin [id: 0]: foo - |Read from stdin [id: 1]: bar - |=== Current State === - |Waiting for read - |Pending requests: 0 - |No partial line""".stripMargin) IO.unit - else if (log == """|=== Activity Log === - |Reading stdIn [id: 0] - |Reading stdIn [id: 1] - |Writing line to stdin: foo - |bar - |Read from stdin [id: 1]: bar - |Read from stdin [id: 0]: foo - |=== Current State === - |Waiting for read - |Pending requests: 0 - |No partial line""".stripMargin) IO.unit - else IO(fail("Unexpected activity log", clues(log))) + real("TestStdIn.write should wake up multiple reads if the line has embedded newlines") { + TestConsole.resource[IO].use { + case (console, stdIn, inspector) => + TestControl.executeEmbed { + ( + console.readLineWithCharset(StandardCharsets.UTF_8), + console.readLineWithCharset(StandardCharsets.UTF_8).delayBy(10.millis), + stdIn.writeln("foo\nbar").delayBy(20.millis) + ).parMapN { + case (fooRead, barRead, _) => + assertEquals((fooRead, barRead), ("foo", "bar")) + } *> inspector.description.flatMap { log => + // Non-determinism is fun. + // The reads receive the right values, but they may wake up out of + // order. + if (log == """|=== Activity Log === + |Reading stdIn [id: 0] + |Reading stdIn [id: 1] + |Writing line to stdIn: foo + |bar + |Read from stdIn [id: 0]: foo + |Read from stdIn [id: 1]: bar + |=== Current State === + |Waiting for write + |Pending requests: 0 + |No partial line""".stripMargin) IO.unit + else if (log == """|=== Activity Log === + |Reading stdIn [id: 0] + |Reading stdIn [id: 1] + |Writing line to stdIn: foo + |bar + |Read from stdIn [id: 1]: bar + |Read from stdIn [id: 0]: foo + |=== Current State === + |Waiting for write + |Pending requests: 0 + |No partial line""".stripMargin) IO.unit + else IO(fail("Unexpected activity log", clues(log))) + } } - } } } real("TestConsole.readLineWithCharset should read only a single line from stdIn") { - TestConsole.resource[IO].use { console => - for { - _ <- console.writeln("foo") - _ <- console.writeln("bar") - actual <- console.readLineWithCharset(StandardCharsets.UTF_8) - } yield assertEquals(actual, "foo") + TestConsole.resource[IO].use { + case (console, stdIn, _) => + for { + _ <- stdIn.writeln("foo") + _ <- stdIn.writeln("bar") + actual <- console.readLineWithCharset(StandardCharsets.UTF_8) + } yield assertEquals(actual, "foo") } } real("TestConsole.readLineWithCharset should block if the write isn't ready") { - TestConsole.resource[IO].use { console => - TestControl.executeEmbed { - ( - console.readLineWithCharset(StandardCharsets.UTF_8), - console.readLineWithCharset(StandardCharsets.UTF_8).delayBy(10.millis), - console.writeln("foo").delayBy(20.millis), - console.writeln("bar").delayBy(30.millis) - ).parMapN { - case (fooRead, barRead, _, _) => - assertEquals((fooRead, barRead), ("foo", "bar")) - } *> console.activityLog.map { - assertEquals( - _, - """|=== Activity Log === - |Reading stdIn [id: 0] - |Reading stdIn [id: 1] - |Writing line to stdin: foo - |Read from stdin [id: 0]: foo - |Writing line to stdin: bar - |Read from stdin [id: 1]: bar - |=== Current State === - |Waiting for read - |Pending requests: 0 - |No partial line""".stripMargin - ) + TestConsole.resource[IO].use { + case (console, stdIn, inspector) => + TestControl.executeEmbed { + ( + console.readLineWithCharset(StandardCharsets.UTF_8), + console.readLineWithCharset(StandardCharsets.UTF_8).delayBy(10.millis), + stdIn.writeln("foo").delayBy(20.millis), + stdIn.writeln("bar").delayBy(30.millis) + ).parMapN { + case (fooRead, barRead, _, _) => + assertEquals((fooRead, barRead), ("foo", "bar")) + } *> inspector.description.map { + assertEquals( + _, + """|=== Activity Log === + |Reading stdIn [id: 0] + |Reading stdIn [id: 1] + |Writing line to stdIn: foo + |Read from stdIn [id: 0]: foo + |Writing line to stdIn: bar + |Read from stdIn [id: 1]: bar + |=== Current State === + |Waiting for write + |Pending requests: 0 + |No partial line""".stripMargin + ) + } } - } } } real("TestConsole should clean up any blocked reads when released") { TestConsole.resource[IO].allocated.flatMap { - case (console, close) => + case ((console, stdIn, inspector), close) => TestControl.executeEmbed { ( console.readLineWithCharset(StandardCharsets.UTF_8).attempt, @@ -258,7 +268,7 @@ class TestConsoleSpec extends BaseSuite { .attempt .map(_.leftMap(_.getMessage)) .delayBy(10.millis), - console.writeln("foo").attempt.delayBy(20.millis), + stdIn.writeln("foo").attempt.delayBy(20.millis), close.delayBy(30.millis) ).parMapN { case (fooRead, barRead, write, _) => @@ -266,30 +276,30 @@ class TestConsoleSpec extends BaseSuite { (fooRead, barRead, write), (Right("foo"), Left("End Of File"), Right(())) ) - } *> console.activityLog.flatMap { log => + } *> inspector.description.flatMap { log => // Non-determinism is fun. // The result itself is stable, but the failure may be logged before // or after the transition to 'Closed' is logged if (log == """|=== Activity Log === |Reading stdIn [id: 0] |Reading stdIn [id: 1] - |Writing line to stdin: foo - |Read from stdin [id: 0]: foo - |Closing + |Writing line to stdIn: foo + |Read from stdIn [id: 0]: foo + |Closing stdIn |Notifying 1 pending read requests - |Read from stdin failed [id: 1]: java.io.EOFException: End Of File - |Closed + |Read from stdIn failed [id: 1]: java.io.EOFException: End Of File + |Closed stdIn |=== Current State === |Closed""".stripMargin) IO.unit else if (log == """|=== Activity Log === |Reading stdIn [id: 0] |Reading stdIn [id: 1] - |Writing line to stdin: foo - |Read from stdin [id: 0]: foo - |Closing + |Writing line to stdIn: foo + |Read from stdIn [id: 0]: foo + |Closing stdIn |Notifying 1 pending read requests - |Closed - |Read from stdin failed [id: 1]: java.io.EOFException: End Of File + |Closed stdIn + |Read from stdIn failed [id: 1]: java.io.EOFException: End Of File |=== Current State === |Closed""".stripMargin) IO.unit else IO(fail("Unexpected activity log", clues(log))) @@ -335,7 +345,7 @@ class TestConsoleSpec extends BaseSuite { ) } - test("emit when the incomming chunk ends with a newline") { + test("emit when the incoming chunk ends with a newline") { assertEquals( PartialLine.one(chunk("foo")).append(chunk("bar\n")), (Chain.one(Line(Chain(chunk("foo"), chunk("bar")))), PartialLine.empty) From 8cba2970af05e10769e8e5aa8c3ebfd1ecaa7dee Mon Sep 17 00:00:00 2001 From: Morgen Peschke Date: Wed, 16 Jul 2025 17:23:13 -0700 Subject: [PATCH 4/5] Provide an alternate low-level verification path for TestConsole --- .../cats/effect/testkit/TestConsole.scala | 252 +++++++++++++----- .../cats/effect/testkit/TestConsoleSpec.scala | 21 +- 2 files changed, 189 insertions(+), 84 deletions(-) diff --git a/testkit/shared/src/main/scala/cats/effect/testkit/TestConsole.scala b/testkit/shared/src/main/scala/cats/effect/testkit/TestConsole.scala index eb70b7fbef..689b45bca5 100644 --- a/testkit/shared/src/main/scala/cats/effect/testkit/TestConsole.scala +++ b/testkit/shared/src/main/scala/cats/effect/testkit/TestConsole.scala @@ -16,12 +16,12 @@ package cats.effect.testkit -import cats.Show +import cats.{Applicative, ApplicativeThrow, Functor, Semigroupal, Show} import cats.data.{Chain, NonEmptyChain} import cats.effect.Concurrent import cats.effect.kernel.{Deferred, Ref, Resource} import cats.effect.std.{Console, Semaphore} -import cats.effect.testkit.TestConsole.TestStdIn +import cats.effect.testkit.TestConsole.{ConsoleClosedException, Op, TestStdIn} import cats.syntax.all._ import scala.annotation.tailrec @@ -33,28 +33,20 @@ import java.nio.charset.Charset /** * Implement a test version of [[cats.effect.std.Console]] */ -final class TestConsole[F[_]: Concurrent]( +final class TestConsole[F[_]: ApplicativeThrow]( stdIn: TestStdIn[F], - stdOutRef: Ref[F, Chain[String]], - stdErrRef: Ref[F, Chain[String]], inspector: TestConsole.Inspector[F] ) extends Console[F] { import inspector.log - override def readLineWithCharset(charset: Charset): F[String] = - stdIn.readLine(charset) + override def readLineWithCharset(charset: Charset): F[String] = stdIn.readLine(charset) + override def print[A](a: A)(implicit S: Show[A]): F[Unit] = log(Op.Print(a.show)) + override def println[A](a: A)(implicit S: Show[A]): F[Unit] = log(Op.Println(a.show)) + override def error[A](a: A)(implicit S: Show[A]): F[Unit] = log(Op.Error(a.show)) + override def errorln[A](a: A)(implicit S: Show[A]): F[Unit] = log(Op.Errorln(a.show)) - override def print[A](a: A)(implicit S: Show[A]): F[Unit] = - log(show"print($a)") *> stdOutRef.update(_.append(a.show)) - - override def println[A](a: A)(implicit S: Show[A]): F[Unit] = - log(show"println($a)") *> stdOutRef.update(_.append(a.show).append("\n")) - - override def error[A](a: A)(implicit S: Show[A]): F[Unit] = - log(show"error($a)") *> stdErrRef.update(_.append(a.show)) - - override def errorln[A](a: A)(implicit S: Show[A]): F[Unit] = - log(show"errorln($a)") *> stdErrRef.update(_.append(a.show).append("\n")) + private[testkit] def close: F[Unit] = + stdIn.close.recover { case ConsoleClosedException() => () } *> inspector.log(Op.Closed) } object TestConsole { @@ -65,18 +57,14 @@ object TestConsole { * to [[TestConsole.readLineWithCharset]] */ def resource[F[_]: Concurrent]: Resource[F, (TestConsole[F], TestStdIn[F], Inspector[F])] = - Resource.make[F, (TestConsole[F], TestStdIn[F], Inspector[F])](unsafe[F]) { - case (_, stdIn, _) => stdIn.close.recover { case ConsoleClosedException() => () } - } + Resource.make[F, (TestConsole[F], TestStdIn[F], Inspector[F])](unsafe[F])(_._1.close) private def unsafe[F[_]: Concurrent]: F[(TestConsole[F], TestStdIn[F], Inspector[F])] = for { stdInStateRef <- Ref.of[F, TestStdIn.State[F]](TestStdIn.State.waiting[F]) - stdOutRef <- Ref.empty[F, Chain[String]] - stdErrRef <- Ref.empty[F, Chain[String]] - inspector <- Inspector.default(stdInStateRef, stdOutRef, stdErrRef) - stdIn <- TestStdIn.default(stdInStateRef, inspector.log) - } yield (new TestConsole[F](stdIn, stdOutRef, stdErrRef, inspector), stdIn, inspector) + inspector <- Inspector.default(stdInStateRef) + stdIn <- TestStdIn.default(stdInStateRef, inspector) + } yield (new TestConsole[F](stdIn, inspector), stdIn, inspector) private[testkit] final case class ConsoleClosedException() extends IllegalStateException("Console is closed") @@ -91,13 +79,13 @@ object TestConsole { * @return * The current contents of the associated [[TestConsole]]'s stdOut */ - def stdOutContents: F[String] + def stdOut: F[String] /** * @return * The current contents of the associated [[TestConsole]]'s stdErr */ - def stdErrContents: F[String] + def stdErr: F[String] /** * @return @@ -107,32 +95,152 @@ object TestConsole { */ def description: F[String] - private[testkit] def log(msg: String): F[Unit] + /** + * Provides access to lower level inspections. + * + * This is generally discouraged as it tends to make tests more brittle. + */ + def lowLevel: Inspector.LowLevel[F] + + private[testkit] def log(operation: Op): F[Unit] + private[testkit] def freeze: F[Inspector[F]] } object Inspector { - def default[F[_]: Concurrent]( - stdInStateRef: Ref[F, TestStdIn.State[F]], - stdOutRef: Ref[F, Chain[String]], - stdErrRef: Ref[F, Chain[String]]): F[Inspector[F]] = - Ref.empty[F, Chain[String]].map { logsRef => - new Inspector[F] { - override def stdOutContents: F[String] = stdOutRef.get.map(_.mkString_("")) - - override def stdErrContents: F[String] = stdErrRef.get.map(_.mkString_("")) - - override def description: F[String] = - (logsRef.get.map(_.mkString_("\n")), stdInStateRef.get.map(_.describe)).mapN { - (logStr, stateStr) => - s"""|=== Activity Log === - |$logStr - |=== Current State === - |$stateStr""".stripMargin - } + trait LowLevel[F[_]] extends Inspector[F] { + + /** + * @return + * The list of user operations on stdOut + */ + def stdOutOperations: F[List[Op.StdOutOp]] - override private[testkit] def log(msg: String): F[Unit] = - logsRef.update(_.append(msg)) + /** + * @return + * The list of user operations on stdErr + */ + def stdErrOperations: F[List[Op.StdErrOp]] + + /** + * @return + * The list of user operations on stdIn + */ + def stdInOperations: F[List[Op.StdInOp]] + + /** + * @return + * The list of all operations + */ + def operationsLog: F[List[Op]] + } + + def default[F[_]: Concurrent](stdInStateRef: Ref[F, TestStdIn.State[F]]): F[Inspector[F]] = + Ref.empty[F, Chain[Op]].map { operationRef => + new Default[F](operationRef.get, stdInStateRef.get) { + override private[testkit] def log(operation: Op): F[Unit] = + operationRef.update(_.append(operation)) + + override private[testkit] def freeze: F[Inspector[F]] = + (stdInStateRef.get, operationRef.get).mapN(frozen(_, _)) } } + + def frozen[F[_]: Applicative]( + finalState: TestStdIn.State[F], + logs: Chain[Op]): Inspector[F] = + new Default[F](logs.pure[F], finalState.pure[F]) { + override private[testkit] def log(operation: Op): F[Unit] = Applicative[F].unit + + override private[testkit] def freeze: F[Inspector[F]] = this.pure[F].widen + } + + private abstract class Default[F[_]: Functor: Semigroupal]( + operations: F[Chain[Op]], + state: F[TestStdIn.State[F]]) + extends Inspector[F] + with LowLevel[F] { + override def stdOut: F[String] = operations.map { opLog => + opLog + .flatMap { + case Op.Print(value) => Chain.one(value) + case Op.Println(value) => Chain(value, "\n") + case _ => Chain.empty + } + .mkString_("") + } + + override def stdErr: F[String] = operations.map { opLog => + opLog + .flatMap { + case Op.Error(value) => Chain.one(value) + case Op.Errorln(value) => Chain(value, "\n") + case _ => Chain.empty + } + .mkString_("") + } + + override def description: F[String] = + (operations.map(_.mkString_("\n")), state.map(_.describe)).mapN { (logStr, stateStr) => + s"""|=== Activity Log === + |$logStr + |=== Current State === + |$stateStr""".stripMargin + } + + override def lowLevel: LowLevel[F] = this + + override def stdOutOperations: F[List[Op.StdOutOp]] = operations.map(_.flatMap { + case op: Op.StdOutOp => Chain.one(op) + case _ => Chain.empty + }.toList) + + override def stdErrOperations: F[List[Op.StdErrOp]] = operations.map(_.flatMap { + case op: Op.StdErrOp => Chain.one(op) + case _ => Chain.empty + }.toList) + + override def stdInOperations: F[List[Op.StdInOp]] = operations.map(_.flatMap { + case op: Op.StdInOp => Chain.one(op) + case _ => Chain.empty + }.toList) + + override def operationsLog: F[List[Op]] = operations.map(_.toList) + } + } + + sealed trait Op + object Op { + sealed trait StdOutOp + sealed trait StdErrOp + sealed trait StdInOp + + final case class Error(value: String) extends Op with StdErrOp + final case class Errorln(value: String) extends Op with StdErrOp + final case class Print(value: String) extends Op with StdOutOp + final case class Println(value: String) extends Op with StdOutOp + final case class Write(value: String) extends Op with StdInOp + final case class Writeln(value: String) extends Op with StdInOp + final case class ReadAttempted(id: Int) extends Op with StdInOp + final case class ReadSuccess(id: Int, line: String) extends Op with StdInOp + final case class ReadFailure(id: Int, throwable: Throwable) extends Op with StdInOp + final case class DiscardStdInContents(lines: Long, bytes: Long) extends Op + final case class NotifyPendingReads(requests: Long) extends Op + case object Closed extends Op + + implicit val show: Show[Op] = Show.show { + case Error(value) => s"error($value)" + case Errorln(value) => s"errorln($value)" + case Print(value) => s"print($value)" + case Println(value) => s"println($value)" + case Write(value) => s"Writing to stdIn: $value" + case Writeln(value) => s"Writing line to stdIn: $value" + case ReadAttempted(id) => s"Reading stdIn [id: $id]" + case ReadSuccess(id, line) => s"Read from stdIn [id: $id]: $line" + case ReadFailure(id, throwable) => s"Read from stdIn failed [id: $id]: $throwable" + case DiscardStdInContents(lines, bytes) => + s"Discarded $lines lines and $bytes bytes from stdIn" + case NotifyPendingReads(requests) => s"Notified $requests pending read requests" + case Closed => "Closed" + } } trait TestStdIn[F[_]] { @@ -186,10 +294,12 @@ object TestConsole { } object TestStdIn { - def default[F[_]](stateRef: Ref[F, TestStdIn.State[F]], log: String => F[Unit])( + def default[F[_]](stateRef: Ref[F, TestStdIn.State[F]], inspector: Inspector[F])( implicit F: Concurrent[F]): F[TestStdIn[F]] = (Semaphore[F](1L), Ref.of[F, Int](0)).mapN { (semaphore, readIdRef) => new TestStdIn[F] { + import inspector.log + private val defaultCharset = Charset.defaultCharset() private def streamClosed = new EOFException("End Of File") @@ -198,14 +308,13 @@ object TestConsole { write(value, defaultCharset) override def write[A](value: A, charset: Charset)(implicit S: Show[A]): F[Unit] = - log(show"Writing to stdIn: $value") *> writeImpl(State.Chunk(value.show, charset)) + log(Op.Write(value.show)) *> writeImpl(State.Chunk(value.show, charset)) override def writeln[A](value: A)(implicit S: Show[A]): F[Unit] = writeln(value, defaultCharset) override def writeln[A](value: A, charset: Charset)(implicit S: Show[A]): F[Unit] = - log(show"Writing line to stdIn: $value") *> writeImpl( - State.Chunk(show"$value\n", charset)) + log(Op.Writeln(value.show)) *> writeImpl(State.Chunk(show"$value\n", charset)) private def writeImpl(chunk: State.Chunk): F[Unit] = if (chunk.isEmpty) F.unit @@ -246,7 +355,7 @@ object TestConsole { semaphore .permit .use { _ => - log(s"Reading stdIn [id: $readId]") *> + log(Op.ReadAttempted(readId)) *> stateRef.get.flatMap { case State.Closed() => F.raiseError[Deferred[F, Either[Throwable, Array[Byte]]]](streamClosed) @@ -265,31 +374,26 @@ object TestConsole { .flatMap(_.traverse(bytes => Concurrent[F].catchNonFatal(new String(bytes, charset)))) .flatTap { - case Left(ex) => log(s"Read from stdIn failed [id: $readId]: $ex") - case Right(line) => log(s"Read from stdIn [id: $readId]: $line") + case Left(ex) => log(Op.ReadFailure(readId, ex)) + case Right(line) => log(Op.ReadSuccess(readId, line)) } .rethrow } override private[testkit] def close: F[Unit] = semaphore.permit.use { _ => - stateRef - .get - .flatTap(_ => log("Closing stdIn")) - .flatMap { - case State.Closed() => F.unit - case State.Ready(lines, partial) => - log(s"Discarding ${lines.length} lines and ${partial.chunks.length} bytes from stdIn") *> - stateRef.set(State.closed) - case State.Waiting(requests, buffer) => - log(s"Discarding ${buffer.chunks.length} bytes from stdIn") - .unlessA(buffer.chunks.isEmpty) *> - log(s"Notifying ${requests.length} pending read requests") - .unlessA(requests.isEmpty) *> - stateRef.set(State.closed) *> - requests.traverse_(_.complete(streamClosed.asLeft)) - } - .flatTap(_ => log("Closed stdIn")) + stateRef.get.flatMap { + case State.Closed() => F.unit + case State.Ready(lines, partial) => + log(Op.DiscardStdInContents(lines.length, partial.length)) + .unlessA(partial.isEmpty) *> + stateRef.set(State.closed) + case State.Waiting(requests, buffer) => + log(Op.DiscardStdInContents(0, buffer.length)).unlessA(buffer.isEmpty) *> + log(Op.NotifyPendingReads(requests.length)).unlessA(requests.isEmpty) *> + requests.traverse_(_.complete(streamClosed.asLeft).attempt) *> + stateRef.set(State.closed) + } } } } @@ -358,6 +462,8 @@ object TestConsole { final case class PartialLine(chunks: Chain[Chunk]) { def isEmpty: Boolean = chunks.forall(_.isEmpty) + def length: Long = chunks.map(_.bytes.length.toLong).combineAll + def render: String = chunks.mkString_("") def toLine: Line = Line(chunks) diff --git a/tests/shared/src/test/scala/cats/effect/testkit/TestConsoleSpec.scala b/tests/shared/src/test/scala/cats/effect/testkit/TestConsoleSpec.scala index 930e17f194..29770da958 100644 --- a/tests/shared/src/test/scala/cats/effect/testkit/TestConsoleSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/testkit/TestConsoleSpec.scala @@ -34,7 +34,7 @@ class TestConsoleSpec extends BaseSuite { for { _ <- console.println(120) _ <- console.print("foo") - stdOut <- inspector.stdOutContents + stdOut <- inspector.stdOut } yield assertEquals(stdOut, "120\nfoo") } } @@ -45,7 +45,7 @@ class TestConsoleSpec extends BaseSuite { for { _ <- console.errorln(120) _ <- console.error("foo") - stdOut <- inspector.stdErrContents + stdOut <- inspector.stdErr } yield assertEquals(stdOut, "120\nfoo") } } @@ -285,10 +285,9 @@ class TestConsoleSpec extends BaseSuite { |Reading stdIn [id: 1] |Writing line to stdIn: foo |Read from stdIn [id: 0]: foo - |Closing stdIn - |Notifying 1 pending read requests + |Notified 1 pending read requests |Read from stdIn failed [id: 1]: java.io.EOFException: End Of File - |Closed stdIn + |Closed |=== Current State === |Closed""".stripMargin) IO.unit else if (log == """|=== Activity Log === @@ -296,9 +295,8 @@ class TestConsoleSpec extends BaseSuite { |Reading stdIn [id: 1] |Writing line to stdIn: foo |Read from stdIn [id: 0]: foo - |Closing stdIn - |Notifying 1 pending read requests - |Closed stdIn + |Notified 1 pending read requests + |Closed |Read from stdIn failed [id: 1]: java.io.EOFException: End Of File |=== Current State === |Closed""".stripMargin) IO.unit @@ -338,21 +336,22 @@ class TestConsoleSpec extends BaseSuite { ) } - test("append the incoming chunk when it does not end with a newline") { + test( + "PartialLine.append should append the incoming chunk when it does not end with a newline") { assertEquals( PartialLine.one(chunk("foo")).append(chunk("bar")), (Chain.empty, PartialLine(Chain(chunk("foo"), chunk("bar")))) ) } - test("emit when the incoming chunk ends with a newline") { + test("PartialLine.append should emit when the incoming chunk ends with a newline") { assertEquals( PartialLine.one(chunk("foo")).append(chunk("bar\n")), (Chain.one(Line(Chain(chunk("foo"), chunk("bar")))), PartialLine.empty) ) } - test("emit multiple if the incoming chunk has multiple newlines") { + test("PartialLine.append should emit multiple if the incoming chunk has multiple newlines") { assertEquals( PartialLine.one(chunk("foo")).append(chunk("bar\nbaz\nqux")), ( From 80aa810eb23ff52442aa051d236e76204dcf25ae Mon Sep 17 00:00:00 2001 From: Morgen Peschke Date: Wed, 23 Jul 2025 09:51:53 -0700 Subject: [PATCH 5/5] Add default values for Show constraints --- .../cats/effect/testkit/TestConsole.scala | 54 ++++++------------- 1 file changed, 16 insertions(+), 38 deletions(-) diff --git a/testkit/shared/src/main/scala/cats/effect/testkit/TestConsole.scala b/testkit/shared/src/main/scala/cats/effect/testkit/TestConsole.scala index 689b45bca5..65b5eb73f5 100644 --- a/testkit/shared/src/main/scala/cats/effect/testkit/TestConsole.scala +++ b/testkit/shared/src/main/scala/cats/effect/testkit/TestConsole.scala @@ -40,10 +40,14 @@ final class TestConsole[F[_]: ApplicativeThrow]( import inspector.log override def readLineWithCharset(charset: Charset): F[String] = stdIn.readLine(charset) - override def print[A](a: A)(implicit S: Show[A]): F[Unit] = log(Op.Print(a.show)) - override def println[A](a: A)(implicit S: Show[A]): F[Unit] = log(Op.Println(a.show)) - override def error[A](a: A)(implicit S: Show[A]): F[Unit] = log(Op.Error(a.show)) - override def errorln[A](a: A)(implicit S: Show[A]): F[Unit] = log(Op.Errorln(a.show)) + override def print[A](a: A)(implicit S: Show[A] = Show.fromToString[A]): F[Unit] = log( + Op.Print(a.show)) + override def println[A](a: A)(implicit S: Show[A] = Show.fromToString[A]): F[Unit] = log( + Op.Println(a.show)) + override def error[A](a: A)(implicit S: Show[A] = Show.fromToString[A]): F[Unit] = log( + Op.Error(a.show)) + override def errorln[A](a: A)(implicit S: Show[A] = Show.fromToString[A]): F[Unit] = log( + Op.Errorln(a.show)) private[testkit] def close: F[Unit] = stdIn.close.recover { case ConsoleClosedException() => () } *> inspector.log(Op.Closed) @@ -245,17 +249,6 @@ object TestConsole { trait TestStdIn[F[_]] { - /** - * Write a string to the simulated stdIn using the system-default charset - * - * Blocked calls to [[TestConsole.readLineWithCharset]] will be woken up if `str` contains - * one or more lines. - * - * @note - * Blocked calls will be woken in a first-in-first-out order. - */ - def write[A](value: A)(implicit S: Show[A]): F[Unit] - /** * Write a string to the simulated stdIn * @@ -265,18 +258,8 @@ object TestConsole { * @note * Blocked calls will be woken in a first-in-first-out order. */ - def write[A](value: A, charset: Charset)(implicit S: Show[A]): F[Unit] - - /** - * Write a string and a newline to the simulated stdIn with the system-default charset - * - * At least one blocked call to [[TestConsole.readLineWithCharset]] will be woken up, if it - * exists. - * - * @note - * Blocked calls will be woken in a first-in-first-out order. - */ - def writeln[A](value: A)(implicit S: Show[A]): F[Unit] + def write[A](value: A, charset: Charset = Charset.defaultCharset())( + implicit S: Show[A] = Show.fromToString[A]): F[Unit] /** * Write a string and a newline to the simulated stdIn @@ -287,7 +270,8 @@ object TestConsole { * @note * Blocked calls will be woken in a first-in-first-out order. */ - def writeln[A](value: A, charset: Charset)(implicit S: Show[A]): F[Unit] + def writeln[A](value: A, charset: Charset = Charset.defaultCharset())( + implicit S: Show[A] = Show.fromToString[A]): F[Unit] private[testkit] def readLine(charset: Charset): F[String] private[testkit] def close: F[Unit] @@ -300,20 +284,14 @@ object TestConsole { new TestStdIn[F] { import inspector.log - private val defaultCharset = Charset.defaultCharset() - private def streamClosed = new EOFException("End Of File") - override def write[A](value: A)(implicit S: Show[A]): F[Unit] = - write(value, defaultCharset) - - override def write[A](value: A, charset: Charset)(implicit S: Show[A]): F[Unit] = + override def write[A](value: A, charset: Charset = Charset.defaultCharset())( + implicit S: Show[A] = Show.fromToString[A]): F[Unit] = log(Op.Write(value.show)) *> writeImpl(State.Chunk(value.show, charset)) - override def writeln[A](value: A)(implicit S: Show[A]): F[Unit] = - writeln(value, defaultCharset) - - override def writeln[A](value: A, charset: Charset)(implicit S: Show[A]): F[Unit] = + override def writeln[A](value: A, charset: Charset = Charset.defaultCharset())( + implicit S: Show[A] = Show.fromToString[A]): F[Unit] = log(Op.Writeln(value.show)) *> writeImpl(State.Chunk(show"$value\n", charset)) private def writeImpl(chunk: State.Chunk): F[Unit] =