diff --git a/src/core/Akka.Streams.Tests/Dsl/StreamRefsSpec.cs b/src/core/Akka.Streams.Tests/Dsl/StreamRefsSpec.cs index a25f9d8f197..0cb3b853d2f 100644 --- a/src/core/Akka.Streams.Tests/Dsl/StreamRefsSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/StreamRefsSpec.cs @@ -486,10 +486,86 @@ public void SinkRef_must_not_allow_materializing_multiple_times() var p1 = this.SourceProbe().To(sinkRef.Sink).Run(Materializer); p1.EnsureSubscription(); var req = p1.ExpectRequest(); - + var p2 = this.SourceProbe().To(sinkRef.Sink).Run(Materializer); p2.EnsureSubscription(); // will be cancelled immediately, since it's 2nd p2.ExpectCancellation(); } + + [Fact] + public async Task SourceRef_Source_property_should_be_idempotent_issue_7895() + { + // Reproduction test for issue #7895: https://github.com/akkadotnet/akka.net/issues/7895 + // The .Source property creates a new SourceRefStageImpl on every access, + // which is not idempotent behavior and can cause intermittent subscription timeouts + + // Create a SourceRef + var sourceRef = await Source.From(new[] { 1, 2, 3 }) + .ToMaterialized(StreamRefs.SourceRef(), Keep.Right) + .Run(Materializer); + + // Access .Source property twice (simulates multiple accesses) + // This could happen via debugger inspection, logging, serialization, etc. + var source1 = sourceRef.Source; + var source2 = sourceRef.Source; + + // BUG: They're NOT the same object (non-idempotent behavior) + // Each property access creates a new Source with a new SourceRefStageImpl + // When fixed, this assertion should PASS with ReferenceEquals(source1, source2) == true + ReferenceEquals(source1, source2).Should().BeTrue( + "Source property should be idempotent and return the same instance"); + } + + [Fact] + public async Task SourceRef_multiple_materializations_cause_timeout_issue_7895() + { + // Reproduction test for issue #7895: https://github.com/akkadotnet/akka.net/issues/7895 + // This test demonstrates the race condition from multiple .Source property accesses + // Multiple .Source property accesses create racing SourceRefStageImpl instances + + // Create a SourceRef with short timeout + var sourceRef = await Source.From(Enumerable.Range(1, 100)) + .ToMaterialized(StreamRefs.SourceRef(), Keep.Right) + .WithAttributes(StreamRefAttributes.CreateSubscriptionTimeout(TimeSpan.FromSeconds(3))) + .Run(Materializer); + + // Access .Source twice - creates TWO SourceRefStageImpl instances + var source1 = sourceRef.Source; + var source2 = sourceRef.Source; + + // Materialize both - they race for the same SinkRef handshake + var task1 = source1.RunWith(Sink.Seq(), Materializer); + var task2 = source2.RunWith(Sink.Seq(), Materializer); + + // Wait for both with timeout protection + var allTasks = Task.WhenAll( + task1.ContinueWith(t => t), + task2.ContinueWith(t => t) + ); + + try + { + await allTasks; + } + catch + { + // Expected: at least one should fail + } + + // Check results - at least one should have failed/timed out + var results = new[] { task1, task2 }; + var completedCount = results.Count(t => t.Status == TaskStatus.RanToCompletion); + var faultedCount = results.Count(t => t.Status == TaskStatus.Faulted); + + // Due to race condition: sometimes both fail, sometimes one succeeds + (completedCount + faultedCount).Should().Be(2, "Both tasks should have completed or faulted"); + + // At least one should have issues due to duplicate stage instances + if (faultedCount > 0) + { + var failedTask = results.First(t => t.Status == TaskStatus.Faulted); + failedTask.Exception.InnerException.Should().BeOfType(); + } + } } } diff --git a/src/core/Akka.Streams/Implementation/StreamRef/SinkRefImpl.cs b/src/core/Akka.Streams/Implementation/StreamRef/SinkRefImpl.cs index 47bd5ae26fe..483e82d1fe7 100644 --- a/src/core/Akka.Streams/Implementation/StreamRef/SinkRefImpl.cs +++ b/src/core/Akka.Streams/Implementation/StreamRef/SinkRefImpl.cs @@ -46,11 +46,19 @@ protected SinkRefImpl(IActorRef initialPartnerRef) [InternalApi] internal sealed class SinkRefImpl : SinkRefImpl, ISinkRef { - public SinkRefImpl(IActorRef initialPartnerRef) : base(initialPartnerRef) { } + private readonly Lazy> _sink; + + public SinkRefImpl(IActorRef initialPartnerRef) : base(initialPartnerRef) + { + _sink = new Lazy>(() => + Dsl.Sink.FromGraph(new SinkRefStageImpl(InitialPartnerRef)) + .MapMaterializedValue(_ => NotUsed.Instance)); + } + public override Type EventType => typeof(T); - public override ISurrogate ToSurrogate(ActorSystem system) => SerializationTools.ToSurrogate(this); + public Sink Sink => _sink.Value; - public Sink Sink => Dsl.Sink.FromGraph(new SinkRefStageImpl(InitialPartnerRef)).MapMaterializedValue(_ => NotUsed.Instance); + public override ISurrogate ToSurrogate(ActorSystem system) => SerializationTools.ToSurrogate(this); } /// diff --git a/src/core/Akka.Streams/Implementation/StreamRef/SourceRefImpl.cs b/src/core/Akka.Streams/Implementation/StreamRef/SourceRefImpl.cs index e9bef29f744..8964129a6cd 100644 --- a/src/core/Akka.Streams/Implementation/StreamRef/SourceRefImpl.cs +++ b/src/core/Akka.Streams/Implementation/StreamRef/SourceRefImpl.cs @@ -48,13 +48,20 @@ protected SourceRefImpl(IActorRef initialPartnerRef) /// /// INTERNAL API: Implementation class, not intended to be touched directly by end-users. /// - [InternalApi] + [InternalApi] internal sealed class SourceRefImpl : SourceRefImpl, ISourceRef { - public SourceRefImpl(IActorRef initialPartnerRef) : base(initialPartnerRef) { } + private readonly Lazy> _source; + + public SourceRefImpl(IActorRef initialPartnerRef) : base(initialPartnerRef) + { + _source = new Lazy>(() => + Dsl.Source.FromGraph(new SourceRefStageImpl(InitialPartnerRef)) + .MapMaterializedValue(_ => NotUsed.Instance)); + } + public override Type EventType => typeof(T); - public Source Source => - Dsl.Source.FromGraph(new SourceRefStageImpl(InitialPartnerRef)).MapMaterializedValue(_ => NotUsed.Instance); + public Source Source => _source.Value; public override ISurrogate ToSurrogate(ActorSystem system) => SerializationTools.ToSurrogate(this); }