From d879eb8c4ff7922339d6864e21506200905b711a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Wed, 16 Aug 2023 18:14:33 +0200 Subject: [PATCH] fix: Avoid turning all stream timeouts to TcpIdleTimeoutException --- .../tcp-idle-timeout.excludes | 2 ++ .../akka/stream/StreamTimeoutException.scala | 2 +- .../main/scala/akka/stream/impl/Timers.scala | 11 +++++++-- .../scala/akka/stream/impl/io/TcpStages.scala | 24 +++++++------------ .../main/scala/akka/stream/scaladsl/Tcp.scala | 3 +-- 5 files changed, 21 insertions(+), 21 deletions(-) create mode 100644 akka-stream/src/main/mima-filters/2.9.0.backwards.excludes/tcp-idle-timeout.excludes diff --git a/akka-stream/src/main/mima-filters/2.9.0.backwards.excludes/tcp-idle-timeout.excludes b/akka-stream/src/main/mima-filters/2.9.0.backwards.excludes/tcp-idle-timeout.excludes new file mode 100644 index 00000000000..23ccad56e8b --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.9.0.backwards.excludes/tcp-idle-timeout.excludes @@ -0,0 +1,2 @@ +# Internal API changed +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.Timers#IdleTimeoutBidi.this") diff --git a/akka-stream/src/main/scala/akka/stream/StreamTimeoutException.scala b/akka-stream/src/main/scala/akka/stream/StreamTimeoutException.scala index 36c59a83692..6375484d8b4 100644 --- a/akka-stream/src/main/scala/akka/stream/StreamTimeoutException.scala +++ b/akka-stream/src/main/scala/akka/stream/StreamTimeoutException.scala @@ -15,7 +15,7 @@ import akka.annotation.DoNotInherit * Not for user extension */ @DoNotInherit -sealed class StreamTimeoutException(msg: String) extends TimeoutException(msg) with NoStackTrace +class StreamTimeoutException(msg: String) extends TimeoutException(msg) with NoStackTrace final class InitialTimeoutException(msg: String) extends StreamTimeoutException(msg) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala index ba942ed2701..5c91b4649e4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala @@ -151,7 +151,14 @@ import akka.stream.stage._ } - final class IdleTimeoutBidi[I, O](val timeout: FiniteDuration) extends GraphStage[BidiShape[I, I, O, O]] { + private object IdleTimeoutBidi { + val defaultFailureCreator: FiniteDuration => Throwable = timeout => + new StreamIdleTimeoutException(s"No elements passed in the last ${timeout.toCoarsest}.") + } + final class IdleTimeoutBidi[I, O]( + val timeout: FiniteDuration, + failureCreator: FiniteDuration => Throwable = IdleTimeoutBidi.defaultFailureCreator) + extends GraphStage[BidiShape[I, I, O, O]] { val in1 = Inlet[I]("in1") val in2 = Inlet[O]("in2") val out1 = Outlet[I]("out1") @@ -170,7 +177,7 @@ import akka.stream.stage._ final override def onTimer(key: Any): Unit = if (nextDeadline - System.nanoTime < 0) - failStage(new StreamIdleTimeoutException(s"No elements passed in the last ${timeout.toCoarsest}.")) + failStage(failureCreator(timeout)) override def preStart(): Unit = scheduleWithFixedDelay(GraphStageLogicTimer, timeoutCheckInterval(timeout), timeoutCheckInterval(timeout)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala index b0e978df601..80c6ce8ab3a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala @@ -5,14 +5,11 @@ package akka.stream.impl.io import java.net.InetSocketAddress -import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong } - import scala.annotation.nowarn import scala.collection.immutable import scala.concurrent.{ Future, Promise } import scala.concurrent.duration.{ Duration, FiniteDuration } - import akka.{ Done, NotUsed } import akka.actor.{ ActorRef, Terminated } import akka.annotation.InternalApi @@ -22,6 +19,7 @@ import akka.io.Tcp import akka.io.Tcp._ import akka.stream._ import akka.stream.impl.ReactiveStreamsCompliance +import akka.stream.impl.Timers import akka.stream.impl.fusing.GraphStages.detacher import akka.stream.scaladsl.{ BidiFlow, Flow, TcpIdleTimeoutException, Tcp => StreamTcp } import akka.stream.scaladsl.Tcp.{ OutgoingConnection, ServerBinding } @@ -593,19 +591,13 @@ private[stream] object ConnectionSourceStage { case Some(address) => s" on connection to [$address]" case _ => "" } + BidiFlow.fromGraph( + new Timers.IdleTimeoutBidi( + idleTimeout, + failureCreator = _ => + new TcpIdleTimeoutException( + s"TCP idle-timeout encountered$connectionToString, no bytes passed in the last $idleTimeout", + idleTimeout))) - val toNetTimeout: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = - BidiFlow.fromFlows( - Flow[ByteString].mapError { - case _: TimeoutException => - new TcpIdleTimeoutException( - s"TCP idle-timeout encountered$connectionToString, no bytes passed in the last $idleTimeout", - idleTimeout) - }, - Flow[ByteString]) - val fromNetTimeout: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = - toNetTimeout.reversed // now the bottom flow transforms the exception, the top one doesn't (since that one is "fromNet") - - fromNetTimeout.atop(BidiFlow.bidirectionalIdleTimeout[ByteString, ByteString](idleTimeout)).atop(toNetTimeout) } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala index fb9d9f3613e..bca3449605d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala @@ -5,7 +5,6 @@ package akka.stream.scaladsl import java.net.InetSocketAddress -import java.util.concurrent.TimeoutException import javax.net.ssl.SSLEngine import javax.net.ssl.SSLSession @@ -394,7 +393,7 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { } final class TcpIdleTimeoutException(msg: String, @unused timeout: Duration) - extends TimeoutException(msg: String) + extends StreamTimeoutException(msg: String) with NoStackTrace // only used from a single stage object TcpAttributes {