Skip to content

Commit f1aad13

Browse files
fixes after review
1 parent f217517 commit f1aad13

File tree

2 files changed

+27
-11
lines changed

2 files changed

+27
-11
lines changed

examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala

+13-5
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//> using dep org.apache.pekko::pekko-stream:1.1.2
33
//> using dep org.typelevel::cats-effect:3.5.7
44
//> using dep com.softwaremill.sttp.client3::core:3.10.2
5-
//> using dep com.softwaremill.sttp.client3::pekko-http-backend:3.10.1
5+
//> using dep com.softwaremill.sttp.client3::pekko-http-backend:3.10.2
66

77
package sttp.tapir.examples.streaming
88

@@ -28,14 +28,22 @@ import scala.concurrent.duration.FiniteDuration
2828
object longLastingClient extends IOApp:
2929
implicit val actorSystem: ActorSystem = ActorSystem("longLastingClient")
3030

31+
private val givenLength: Long = 10000
32+
private val chunkSize = 100
33+
private val noChunks = givenLength / chunkSize
34+
3135
private def makeRequest(backend: SttpBackend[Future, PekkoStreams & WebSockets]): Future[Response[Either[String, String]]] =
32-
val stream: Source[ByteString, Any] = Source.tick(1.seconds, 1.seconds, ByteString(Array.fill(10)('A').map(_.toByte))).map { elem =>
33-
println(s"$elem ${java.time.LocalTime.now()}"); elem
34-
}
36+
val stream: Source[ByteString, Any] =
37+
Source.tick(1.seconds, 1.seconds, ByteString(Array.fill(chunkSize)('A').map(_.toByte)))
38+
.zipWithIndex
39+
.take(noChunks)
40+
.map { case (chunk, idx) =>
41+
println(s"Chunk ${idx + 1} sent ${java.time.LocalTime.now()}"); chunk
42+
}
3543

3644
basicRequest
3745
.post(uri"http://localhost:9000/chunks")
38-
.header(Header(HeaderNames.ContentLength, "10000"))
46+
.header(Header(HeaderNames.ContentLength, givenLength.toString))
3947
.streamBody(PekkoStreams)(stream)
4048
.send(backend)
4149

examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala

+14-6
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import sttp.tapir.*
2323
import scala.concurrent.{ExecutionContext, Future}
2424
import scala.util.*
2525
import org.apache.pekko
26-
import pekko.stream.scaladsl.{Flow, Source}
26+
import pekko.stream.scaladsl.{Flow, Source, Sink}
2727
import pekko.util.ByteString
2828
import sttp.tapir.server.play.PlayServerOptions
2929

@@ -41,18 +41,26 @@ def handleErrors[T](f: Future[T]): Future[Either[ErrorInfo, T]] =
4141
Success(Left(e.getMessage))
4242
}
4343

44-
def logic(s: (Long, Source[ByteString, Any])): Future[(Long, Source[ByteString, Any])] = {
44+
def logic(s: (Long, Source[ByteString, Any])): Future[String] = {
4545
val (length, stream) = s
46-
println(s"Received $length bytes, ${stream.map(_.length)} bytes in total")
47-
Future.successful((length, stream))
46+
println(s"Transmitting $length bytes...")
47+
val result = stream
48+
.runFold(List.empty[ByteString])((acc, byteS) => acc :+ byteS)
49+
.map(_.reduce(_ ++ _).decodeString("UTF-8"))
50+
result.onComplete {
51+
case Failure(ex) =>
52+
println(s"Stream failed with exception: $ex" )
53+
case Success(s) =>
54+
println(s"Stream finished: ${s.length}/$length transmitted")
55+
}
56+
result
4857
}
4958

5059
val e = endpoint.post
5160
.in("chunks")
5261
.in(header[Long](HeaderNames.ContentLength))
5362
.in(streamTextBody(PekkoStreams)(CodecFormat.TextPlain()))
54-
.out(header[Long](HeaderNames.ContentLength))
55-
.out(streamTextBody(PekkoStreams)(CodecFormat.TextPlain()))
63+
.out(stringBody)
5664
.errorOut(plainBody[ErrorInfo])
5765
.serverLogic(logic.andThen(handleErrors))
5866

0 commit comments

Comments
 (0)