From 3614a69db4a2e5be1067621e342a231ac9c99e95 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Tue, 22 Aug 2023 12:54:36 +0800 Subject: [PATCH] =str Add IterableSource. Signed-off-by: He-Pin --- .../stream/scaladsl/ReverseArrowSpec.scala | 2 +- .../pekko/stream/scaladsl/SourceSpec.scala | 52 ++++++++++++- .../stream/impl/fusing/IterableSource.scala | 76 +++++++++++++++++++ .../apache/pekko/stream/scaladsl/Source.scala | 7 +- 4 files changed, 130 insertions(+), 7 deletions(-) create mode 100644 stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/ReverseArrowSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/ReverseArrowSpec.scala index 9d52f1a18e6..9ca05e3b750 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/ReverseArrowSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/ReverseArrowSpec.scala @@ -227,7 +227,7 @@ class ReverseArrowSpec extends StreamSpec { src ~> f sink2 <~ f (the[IllegalArgumentException] thrownBy (s <~ f <~ src)).getMessage should include( - "[StatefulMapConcat.out] is already connected") + "[IterableSource.out] is already connected") ClosedShape }) .run(), diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala index 4e31769fcb4..a24e492269e 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala @@ -311,19 +311,67 @@ class SourceSpec extends StreamSpec with DefaultTimeout { } "use decider when iterator throws" in { + + // using stopping decider + Source + .fromIterator(() => (1 to 5).iterator.map(k => if (k != 3) k else throw TE("a"))) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .grouped(10) + .runWith(Sink.head) + .failed + .futureValue shouldBe a[TE] + + // using stopping decider with recover + Source + .fromIterator(() => (1 to 5).toIterator.map(k => if (k != 3) k else throw TE("a"))) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .recoverWithRetries(1, { case _ => Source.empty }) + .grouped(10) + .runWith(Sink.head) + .futureValue shouldBe List(1, 2) + + // failing on every elements, using stopping decider + Source + .fromIterator(() => (1 to 5).toIterator.map(_ => throw TE("b"))) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .grouped(10) + .runWith(Sink.headOption) + .failed + .futureValue shouldBe a[TE] + + // failing on every elements, using stopping decider and recover + Source + .fromIterator(() => (1 to 5).toIterator.map(_ => throw TE("b"))) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .recoverWithRetries(1, { case _ => Source.empty }) + .grouped(10) + .runWith(Sink.headOption) + .futureValue shouldBe None + + // using resuming decider + Source + .fromIterator(() => (1 to 5).toIterator.map(k => if (k != 3) k else throw TE("a"))) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .grouped(10) + .runWith(Sink.head) + .futureValue should ===(List(1, 2, 4, 5)) + + // using restarting decider Source .fromIterator(() => (1 to 5).toIterator.map(k => if (k != 3) k else throw TE("a"))) .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) .grouped(10) .runWith(Sink.head) - .futureValue should ===(List(1, 2)) + .futureValue should ===(List(1, 2, 1, 2, 1, 2, 1, 2, 1, 2)) + // with failing on every elements, using restarting decider Source .fromIterator(() => (1 to 5).toIterator.map(_ => throw TE("b"))) .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) .grouped(10) .runWith(Sink.headOption) - .futureValue should ===(None) + .failed + .futureValue shouldBe a[TE] } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala new file mode 100644 index 00000000000..0bf99be6e3a --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.impl.fusing + +import scala.collection.immutable +import scala.util.control.NonFatal + +import org.apache.pekko +import pekko.stream.{ Attributes, Outlet, SourceShape, Supervision } +import pekko.stream.ActorAttributes.SupervisionStrategy +import pekko.stream.impl.ReactiveStreamsCompliance +import pekko.stream.impl.Stages.DefaultAttributes +import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } + +private[pekko] final class IterableSource[T](val elements: immutable.Iterable[T]) extends GraphStage[SourceShape[T]] { + ReactiveStreamsCompliance.requireNonNullElement(elements) + + override protected def initialAttributes: Attributes = DefaultAttributes.iterableSource + + private val out = Outlet[T]("IterableSource.out") + override val shape: SourceShape[T] = SourceShape(out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with OutHandler { + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider + private var currentIterator: Iterator[T] = _ + + override def onPull(): Unit = + try { + if (currentIterator eq null) { + currentIterator = elements.iterator + } + tryPushNextOrComplete() + } catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => failStage(ex) + case Supervision.Resume => tryPushNextOrComplete() + case Supervision.Restart => + currentIterator = elements.iterator + tryPushNextOrComplete() + } + } + + private def tryPushNextOrComplete(): Unit = + if (currentIterator.hasNext) { + if (isAvailable(out)) { + push(out, currentIterator.next()) + if (!currentIterator.hasNext) { + completeStage() + } + } + } else { + completeStage() + } + + setHandler(out, this) + } + + override def toString: String = "IterableSource" +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index 1d935d990a7..cd33cfb799e 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -28,7 +28,7 @@ import pekko.annotation.InternalApi import pekko.stream.{ Outlet, SourceShape, _ } import pekko.stream.impl.{ PublisherSource, _ } import pekko.stream.impl.Stages.DefaultAttributes -import pekko.stream.impl.fusing.{ GraphStages, LazyFutureSource, LazySingleSource } +import pekko.stream.impl.fusing.{ GraphStages, IterableSource, LazyFutureSource, LazySingleSource } import pekko.stream.impl.fusing.GraphStages._ import pekko.stream.stage.GraphStageWithMaterializedValue import pekko.util.ConstantFun @@ -356,7 +356,7 @@ object Source { * beginning) regardless of when they subscribed. */ def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] = - single(iterable).mapConcat(ConstantFun.scalaIdentityFunction).withAttributes(DefaultAttributes.iterableSource) + fromGraph(new IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource) /** * Starts a new `Source` from the given `Future`. The stream will consist of @@ -419,8 +419,7 @@ object Source { * Create a `Source` that will continually emit the given element. */ def repeat[T](element: T): Source[T, NotUsed] = { - val next = Some((element, element)) - unfold(element)(_ => next).withAttributes(DefaultAttributes.repeat) + fromIterator(() => Iterator.continually(element)).withAttributes(DefaultAttributes.repeat) } /**