diff --git a/src/core/Akka.Streams.Tests/Dsl/QueueSourceSpec.cs b/src/core/Akka.Streams.Tests/Dsl/QueueSourceSpec.cs index 644f0a7328b..decd764adc9 100644 --- a/src/core/Akka.Streams.Tests/Dsl/QueueSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/QueueSourceSpec.cs @@ -41,18 +41,20 @@ private static void AssertSuccess(Task task) [Fact] public async Task QueueSource_should_emit_received_message_to_the_stream() { - var s = this.CreateManualSubscriberProbe(); - var queue = - Source.Queue(10, OverflowStrategy.Fail).To(Sink.FromSubscriber(s)).Run(_materializer); - var sub = await s.ExpectSubscriptionAsync(); - - sub.Request(2); - AssertSuccess(queue.OfferAsync(1)); - await s.ExpectNextAsync(1); - AssertSuccess(queue.OfferAsync(2)); - await s.ExpectNextAsync(2); - AssertSuccess(queue.OfferAsync(3)); - sub.Cancel(); + await this.AssertAllStagesStoppedAsync(async () => { + var s = this.CreateManualSubscriberProbe(); + var queue = + Source.Queue(10, OverflowStrategy.Fail).To(Sink.FromSubscriber(s)).Run(_materializer); + var sub = await s.ExpectSubscriptionAsync(); + + sub.Request(2); + AssertSuccess(queue.OfferAsync(1)); + await s.ExpectNextAsync(1); + AssertSuccess(queue.OfferAsync(2)); + await s.ExpectNextAsync(2); + AssertSuccess(queue.OfferAsync(3)); + sub.Cancel(); + }, _materializer); } [Fact] @@ -69,47 +71,53 @@ public void QueueSource_should_be_reusable() } [Fact] - public void QueueSource_should_reject_elements_when_backpressuring_with_maxBuffer_0() + public async Task QueueSource_should_reject_elements_when_backpressuring_with_maxBuffer_0() { - var t = - Source.Queue(0, OverflowStrategy.Backpressure) - .ToMaterialized(this.SinkProbe(), Keep.Both) - .Run(_materializer); - var source = t.Item1; - var probe = t.Item2; - var task = source.OfferAsync(42); - var ex = source.OfferAsync(43); - ex.Invoking(_ => _.Wait(TimeSpan.FromSeconds(3))) - .Should().Throw() - .And.Message.Should() - .Contain("have to wait"); - - probe.RequestNext().Should().Be(42); - task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); - task.Result.Should().Be(Enqueued.Instance); - + await this.AssertAllStagesStoppedAsync(async () => { + var t = + Source.Queue(0, OverflowStrategy.Backpressure) + .ToMaterialized(this.SinkProbe(), Keep.Both) + .Run(_materializer); + var source = t.Item1; + var probe = t.Item2; + var task = source.OfferAsync(42); + var ex = source.OfferAsync(43); + ex.Invoking(_ => _.Wait(TimeSpan.FromSeconds(3))) + .Should().Throw() + .And.Message.Should() + .Contain("have to wait"); + + await probe.RequestNextAsync(42); + task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); + task.Result.Should().Be(Enqueued.Instance); + + source.Complete(); + await probe.ExpectCompleteAsync(); + }, _materializer); } [Fact] public async Task QueueSource_should_buffer_when_needed() { - var s = this.CreateManualSubscriberProbe(); - var queue = - Source.Queue(100, OverflowStrategy.DropHead) - .To(Sink.FromSubscriber(s)) - .Run(_materializer); - var sub = await s.ExpectSubscriptionAsync(); - - for (var i = 1; i <= 20; i++) AssertSuccess(queue.OfferAsync(i)); - sub.Request(10); - for (var i = 1; i <= 10; i++) AssertSuccess(queue.OfferAsync(i)); - sub.Request(10); - for (var i = 11; i <= 20; i++) AssertSuccess(queue.OfferAsync(i)); - - for (var i = 200; i <= 399; i++) AssertSuccess(queue.OfferAsync(i)); - sub.Request(100); - for (var i = 300; i <= 399; i++) AssertSuccess(queue.OfferAsync(i)); - sub.Cancel(); + await this.AssertAllStagesStoppedAsync(async () => { + var s = this.CreateManualSubscriberProbe(); + var queue = + Source.Queue(100, OverflowStrategy.DropHead) + .To(Sink.FromSubscriber(s)) + .Run(_materializer); + var sub = await s.ExpectSubscriptionAsync(); + + for (var i = 1; i <= 20; i++) AssertSuccess(queue.OfferAsync(i)); + sub.Request(10); + for (var i = 1; i <= 10; i++) AssertSuccess(queue.OfferAsync(i)); + sub.Request(10); + for (var i = 11; i <= 20; i++) AssertSuccess(queue.OfferAsync(i)); + + for (var i = 200; i <= 399; i++) AssertSuccess(queue.OfferAsync(i)); + sub.Request(100); + for (var i = 300; i <= 399; i++) AssertSuccess(queue.OfferAsync(i)); + sub.Cancel(); + }, _materializer); } [Fact]