diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs index 9e11bf4d5fd..d962e6ee801 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs @@ -88,7 +88,7 @@ public async Task ReliableDelivery_with_Sharding_must_illustrate_Sharding_usage_ var consumerEndProbe = CreateTestProbe(); var region = await ClusterSharding.Get(Sys).StartAsync($"TestConsumer-{_idCount}", _ => ShardingConsumerController.Create(c => - PropsFor(DefaultConsumerDelay, 42, consumerEndProbe.Ref, c), + PropsFor(DefaultConsumerDelay, 42, consumerEndProbe.Ref, c, expectedProducerCount: 2), ShardingConsumerController.Settings.Create(Sys)), ClusterShardingSettings.Create(Sys), HashCodeMessageExtractor.Create(10, o => string.Empty, o => o)); @@ -108,7 +108,7 @@ public async Task ReliableDelivery_with_Sharding_must_illustrate_Sharding_usage_ $"p2-{_idCount}"); // expecting 3 end messages, one for each entity: "entity-0", "entity-1", "entity-2" - var endMessages = await consumerEndProbe.ReceiveNAsync(3, TimeSpan.FromSeconds(5)).ToListAsync(); + var endMessages = await consumerEndProbe.ReceiveNAsync(3, TimeSpan.FromSeconds(10)).ToListAsync(); var producerIds = endMessages.Cast().SelectMany(c => c.ProducerIds).ToList(); producerIds diff --git a/src/core/Akka.Tests/Delivery/TestConsumer.cs b/src/core/Akka.Tests/Delivery/TestConsumer.cs index da163d2ac11..461ca68504e 100644 --- a/src/core/Akka.Tests/Delivery/TestConsumer.cs +++ b/src/core/Akka.Tests/Delivery/TestConsumer.cs @@ -36,16 +36,19 @@ public sealed class TestConsumer : ReceiveActor, IWithTimers private ImmutableHashSet<(string, long)> _processed = ImmutableHashSet<(string, long)>.Empty; private readonly bool _supportRestarts = false; private int _messageCount = 0; + private readonly int _expectedProducerCount; + private ImmutableHashSet _completedProducers = ImmutableHashSet.Empty; public TestConsumer(TimeSpan delay, Func endCondition, IActorRef endReplyTo, - IActorRef consumerController, bool supportRestarts = false) + IActorRef consumerController, int expectedProducerCount = 1, bool supportRestarts = false) { Delay = delay; EndCondition = endCondition; EndReplyTo = endReplyTo; ConsumerController = consumerController; + _expectedProducerCount = expectedProducerCount; _supportRestarts = supportRestarts; - + Active(); } @@ -77,9 +80,27 @@ private void Active() if (EndCondition(job) && (_messageCount > 0 || _supportRestarts)) { - _log.Debug("End at [{0}]", job.SeqNr); - EndReplyTo.Tell(new Collected(_processed.Select(c => c.Item1).ToImmutableHashSet(), _messageCount + 1)); - Context.Stop(Self); + // Track that this producer has completed + if (!_completedProducers.Contains(job.ProducerId)) + { + _completedProducers = _completedProducers.Add(job.ProducerId); + _log.Debug("Producer [{0}] completed at seqNr [{1}]. {2}/{3} producers completed.", + job.ProducerId, job.SeqNr, _completedProducers.Count, _expectedProducerCount); + } + + // Only stop when all expected producers have completed + if (_completedProducers.Count >= _expectedProducerCount) + { + _log.Debug("All {0} producers completed. Stopping consumer.", _expectedProducerCount); + EndReplyTo.Tell(new Collected(_processed.Select(c => c.Item1).ToImmutableHashSet(), _messageCount + 1)); + Context.Stop(Self); + } + else + { + // Continue processing messages from other producers + _processed = cleanProcessed.Add(nextMsg); + _messageCount++; + } } else if (!_supportRestarts && EndCondition(job)) { @@ -188,12 +209,12 @@ public static ConsumerController.SequencedMessage SequencedMessage(string p private static Func ConsumerEndCondition(long seqNr) => msg => msg.SeqNr >= seqNr; - public static Props PropsFor(TimeSpan delay, long seqNr, IActorRef endReplyTo, IActorRef consumerController, bool supportsRestarts = false) => - Props.Create(() => new TestConsumer(delay, ConsumerEndCondition(seqNr), endReplyTo, consumerController, supportsRestarts)); + public static Props PropsFor(TimeSpan delay, long seqNr, IActorRef endReplyTo, IActorRef consumerController, int expectedProducerCount = 1, bool supportsRestarts = false) => + Props.Create(() => new TestConsumer(delay, ConsumerEndCondition(seqNr), endReplyTo, consumerController, expectedProducerCount, supportsRestarts)); public static Props PropsFor(TimeSpan delay, Func endCondition, IActorRef endReplyTo, - IActorRef consumerController, bool supportsRestarts = false) => - Props.Create(() => new TestConsumer(delay, endCondition, endReplyTo, consumerController, supportsRestarts)); + IActorRef consumerController, int expectedProducerCount = 1, bool supportsRestarts = false) => + Props.Create(() => new TestConsumer(delay, endCondition, endReplyTo, consumerController, expectedProducerCount, supportsRestarts)); public ITimerScheduler Timers { get; set; } = null!; }