Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ class ReverseArrowSpec extends StreamSpec {
src ~> f
sink2 <~ f
(the[IllegalArgumentException] thrownBy (s <~ f <~ src)).getMessage should include(
"[StatefulMapConcat.out] is already connected")
"[IterableSource.out] is already connected")
ClosedShape
})
.run(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,19 +301,60 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
}

"use decider when iterator throws" in {

Source
.fromIterator(() => (1 to 5).toIterator.map(k => if (k != 3) k else throw TE("a")))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
.grouped(10)
.runWith(Sink.head)
.failed
.futureValue shouldBe an[TE]

Source
.fromIterator(() => (1 to 5).toIterator.map(k => if (k != 3) k else throw TE("a")))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
.recoverWithRetries(1, { case _ => Source.empty })
.grouped(10)
.runWith(Sink.head)
.futureValue shouldBe List(1, 2)

Source
.fromIterator(() => (1 to 5).toIterator.map(k => if (k != 3) k else throw TE("a")))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.grouped(10)
.runWith(Sink.head)
.futureValue should ===(List(1, 2, 4, 5))

Source
.fromIterator(() => (1 to 5).toIterator.map(k => if (k != 3) k else throw TE("a")))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.grouped(10)
.runWith(Sink.head)
.futureValue should ===(List(1, 2))
.futureValue should ===(List(1, 2, 1, 2, 1, 2, 1, 2, 1, 2))

Source
.fromIterator(() => (1 to 5).toIterator.map(_ => throw TE("b")))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.grouped(10)
.runWith(Sink.headOption)
.futureValue should ===(None)
.failed
.futureValue shouldBe an[TE]

Source
.fromIterator(() => (1 to 5).toIterator.map(_ => throw TE("b")))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
.grouped(10)
.runWith(Sink.headOption)
.failed
.futureValue shouldBe an[TE]

Source
.fromIterator(() => (1 to 5).toIterator.map(_ => throw TE("b")))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
.recoverWithRetries(1, { case _ => Source.empty })
.grouped(10)
.runWith(Sink.headOption)
.futureValue shouldBe None
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,33 @@

package akka.stream.impl.fusing

import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference }

import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

import akka.Done
import akka.actor.Cancellable
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.event.Logging
import akka.stream.{ Shape, _ }
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.FlowMonitorState._
import akka.stream.impl.{ ContextPropagation, LinearTraversalBuilder, ReactiveStreamsCompliance }
import akka.stream.Shape
import akka.stream._
import akka.stream.impl.ContextPropagation
import akka.stream.impl.LinearTraversalBuilder
import akka.stream.impl.ReactiveStreamsCompliance
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.StreamLayout._
import akka.stream.scaladsl._
import akka.stream.stage._

import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration.FiniteDuration
import scala.util.Try
import scala.util.control.NonFatal

/**
* INTERNAL API
*/
Expand Down Expand Up @@ -284,6 +291,51 @@ import akka.stream.stage._
override def toString: String = "SingleSource"
}

final class IterableSource[T](val elements: immutable.Iterable[T]) extends GraphStage[SourceShape[T]] {
ReactiveStreamsCompliance.requireNonNullElement(elements)
override protected def initialAttributes: Attributes = DefaultAttributes.iterableSource
private val out = Outlet[T]("IterableSource.out")
override val shape: SourceShape[T] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler {
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
private var currentIterator: Iterator[T] = _

override def onPull(): Unit =
try {
if (currentIterator eq null) {
currentIterator = elements.iterator
}
tryPushNextOrComplete()
} catch {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop => failStage(ex)
case Supervision.Resume => tryPushNextOrComplete()
case Supervision.Restart =>
currentIterator = elements.iterator
tryPushNextOrComplete()
}
}

private def tryPushNextOrComplete(): Unit =
if (currentIterator.hasNext) {
if (isAvailable(out)) {
push(out, currentIterator.next())
}
if (!currentIterator.hasNext) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why call hasNext here,is it ok not to to call here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it just pushed the last element it can immediately complete the stage after that instead of waiting for next pull to make that decision.

completeStage()
}
} else {
completeStage()
}

setHandler(out, this)
}

override def toString: String = "IterableSource"
}

final class FutureFlattenSource[T, M](futureSource: Future[Graph[SourceShape[T], M]])
extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] {
ReactiveStreamsCompliance.requireNonNullElement(futureSource)
Expand Down
5 changes: 2 additions & 3 deletions akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ object Source {
* beginning) regardless of when they subscribed.
*/
def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] =
single(iterable).mapConcat(ConstantFun.scalaIdentityFunction).withAttributes(DefaultAttributes.iterableSource)
fromGraph(new IterableSource[T](iterable))

/**
* Starts a new `Source` from the given `Future`. The stream will consist of
Expand Down Expand Up @@ -409,8 +409,7 @@ object Source {
* Create a `Source` that will continually emit the given element.
*/
def repeat[T](element: T): Source[T, NotUsed] = {
val next = Some((element, element))
unfold(element)(_ => next).withAttributes(DefaultAttributes.repeat)
fromIterator(() => Iterator.continually(element)).withAttributes(DefaultAttributes.repeat)
}

/**
Expand Down