diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala index 6e06e9f51d7..ecd04d91baf 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala @@ -217,7 +217,7 @@ class ReverseArrowSpec extends StreamSpec { src ~> f sink2 <~ f (the[IllegalArgumentException] thrownBy (s <~ f <~ src)).getMessage should include( - "[StatefulMapConcat.out] is already connected") + "[IterableSource.out] is already connected") ClosedShape }) .run(), diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index f3e4cbe5c67..c2dec99f35a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -301,19 +301,60 @@ class SourceSpec extends StreamSpec with DefaultTimeout { } "use decider when iterator throws" in { + + Source + .fromIterator(() => (1 to 5).toIterator.map(k => if (k != 3) k else throw TE("a"))) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .grouped(10) + .runWith(Sink.head) + .failed + .futureValue shouldBe an[TE] + + Source + .fromIterator(() => (1 to 5).toIterator.map(k => if (k != 3) k else throw TE("a"))) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .recoverWithRetries(1, { case _ => Source.empty }) + .grouped(10) + .runWith(Sink.head) + .futureValue shouldBe List(1, 2) + + Source + .fromIterator(() => (1 to 5).toIterator.map(k => if (k != 3) k else throw TE("a"))) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .grouped(10) + .runWith(Sink.head) + .futureValue should ===(List(1, 2, 4, 5)) + Source .fromIterator(() => (1 to 5).toIterator.map(k => if (k != 3) k else throw TE("a"))) .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) .grouped(10) .runWith(Sink.head) - .futureValue should ===(List(1, 2)) + .futureValue should ===(List(1, 2, 1, 2, 1, 2, 1, 2, 1, 2)) Source .fromIterator(() => (1 to 5).toIterator.map(_ => throw TE("b"))) .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) .grouped(10) .runWith(Sink.headOption) - .futureValue should ===(None) + .failed + .futureValue shouldBe an[TE] + + Source + .fromIterator(() => (1 to 5).toIterator.map(_ => throw TE("b"))) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .grouped(10) + .runWith(Sink.headOption) + .failed + .futureValue shouldBe an[TE] + + Source + .fromIterator(() => (1 to 5).toIterator.map(_ => throw TE("b"))) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .recoverWithRetries(1, { case _ => Source.empty }) + .grouped(10) + .runWith(Sink.headOption) + .futureValue shouldBe None } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 1e8d32578f4..f4dd6995ebf 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -4,26 +4,33 @@ package akka.stream.impl.fusing -import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference } - -import scala.annotation.unchecked.uncheckedVariance -import scala.concurrent.{ Future, Promise } -import scala.concurrent.duration.FiniteDuration -import scala.util.Try - import akka.Done import akka.actor.Cancellable import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts import akka.event.Logging -import akka.stream.{ Shape, _ } +import akka.stream.ActorAttributes.SupervisionStrategy import akka.stream.FlowMonitorState._ -import akka.stream.impl.{ ContextPropagation, LinearTraversalBuilder, ReactiveStreamsCompliance } +import akka.stream.Shape +import akka.stream._ +import akka.stream.impl.ContextPropagation +import akka.stream.impl.LinearTraversalBuilder +import akka.stream.impl.ReactiveStreamsCompliance import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout._ import akka.stream.scaladsl._ import akka.stream.stage._ +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference +import scala.annotation.unchecked.uncheckedVariance +import scala.collection.immutable +import scala.concurrent.Future +import scala.concurrent.Promise +import scala.concurrent.duration.FiniteDuration +import scala.util.Try +import scala.util.control.NonFatal + /** * INTERNAL API */ @@ -284,6 +291,51 @@ import akka.stream.stage._ override def toString: String = "SingleSource" } + final class IterableSource[T](val elements: immutable.Iterable[T]) extends GraphStage[SourceShape[T]] { + ReactiveStreamsCompliance.requireNonNullElement(elements) + override protected def initialAttributes: Attributes = DefaultAttributes.iterableSource + private val out = Outlet[T]("IterableSource.out") + override val shape: SourceShape[T] = SourceShape(out) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with OutHandler { + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider + private var currentIterator: Iterator[T] = _ + + override def onPull(): Unit = + try { + if (currentIterator eq null) { + currentIterator = elements.iterator + } + tryPushNextOrComplete() + } catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => failStage(ex) + case Supervision.Resume => tryPushNextOrComplete() + case Supervision.Restart => + currentIterator = elements.iterator + tryPushNextOrComplete() + } + } + + private def tryPushNextOrComplete(): Unit = + if (currentIterator.hasNext) { + if (isAvailable(out)) { + push(out, currentIterator.next()) + } + if (!currentIterator.hasNext) { + completeStage() + } + } else { + completeStage() + } + + setHandler(out, this) + } + + override def toString: String = "IterableSource" + } + final class FutureFlattenSource[T, M](futureSource: Future[Graph[SourceShape[T], M]]) extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] { ReactiveStreamsCompliance.requireNonNullElement(futureSource) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 9365a2fe7ea..c9b8467c3d7 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -346,7 +346,7 @@ object Source { * beginning) regardless of when they subscribed. */ def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] = - single(iterable).mapConcat(ConstantFun.scalaIdentityFunction).withAttributes(DefaultAttributes.iterableSource) + fromGraph(new IterableSource[T](iterable)) /** * Starts a new `Source` from the given `Future`. The stream will consist of @@ -409,8 +409,7 @@ object Source { * Create a `Source` that will continually emit the given element. */ def repeat[T](element: T): Source[T, NotUsed] = { - val next = Some((element, element)) - unfold(element)(_ => next).withAttributes(DefaultAttributes.repeat) + fromIterator(() => Iterator.continually(element)).withAttributes(DefaultAttributes.repeat) } /**