Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public async Task ReliableDelivery_with_Sharding_must_illustrate_Sharding_usage_
var consumerEndProbe = CreateTestProbe();
var region = await ClusterSharding.Get(Sys).StartAsync($"TestConsumer-{_idCount}", _ =>
ShardingConsumerController.Create<Job>(c =>
PropsFor(DefaultConsumerDelay, 42, consumerEndProbe.Ref, c),
PropsFor(DefaultConsumerDelay, 42, consumerEndProbe.Ref, c, expectedProducerCount: 2),
ShardingConsumerController.Settings.Create(Sys)), ClusterShardingSettings.Create(Sys),
HashCodeMessageExtractor.Create(10,
o => string.Empty, o => o));
Expand All @@ -108,7 +108,7 @@ public async Task ReliableDelivery_with_Sharding_must_illustrate_Sharding_usage_
$"p2-{_idCount}");

// expecting 3 end messages, one for each entity: "entity-0", "entity-1", "entity-2"
var endMessages = await consumerEndProbe.ReceiveNAsync(3, TimeSpan.FromSeconds(5)).ToListAsync();
var endMessages = await consumerEndProbe.ReceiveNAsync(3, TimeSpan.FromSeconds(10)).ToListAsync();

var producerIds = endMessages.Cast<Collected>().SelectMany(c => c.ProducerIds).ToList();
producerIds
Expand Down
39 changes: 30 additions & 9 deletions src/core/Akka.Tests/Delivery/TestConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,19 @@ public sealed class TestConsumer : ReceiveActor, IWithTimers
private ImmutableHashSet<(string, long)> _processed = ImmutableHashSet<(string, long)>.Empty;
private readonly bool _supportRestarts = false;
private int _messageCount = 0;
private readonly int _expectedProducerCount;
private ImmutableHashSet<string> _completedProducers = ImmutableHashSet<string>.Empty;

public TestConsumer(TimeSpan delay, Func<SomeAsyncJob, bool> endCondition, IActorRef endReplyTo,
IActorRef consumerController, bool supportRestarts = false)
IActorRef consumerController, int expectedProducerCount = 1, bool supportRestarts = false)
{
Delay = delay;
EndCondition = endCondition;
EndReplyTo = endReplyTo;
ConsumerController = consumerController;
_expectedProducerCount = expectedProducerCount;
_supportRestarts = supportRestarts;

Active();
}

Expand Down Expand Up @@ -77,9 +80,27 @@ private void Active()

if (EndCondition(job) && (_messageCount > 0 || _supportRestarts))
{
_log.Debug("End at [{0}]", job.SeqNr);
EndReplyTo.Tell(new Collected(_processed.Select(c => c.Item1).ToImmutableHashSet(), _messageCount + 1));
Context.Stop(Self);
// Track that this producer has completed
if (!_completedProducers.Contains(job.ProducerId))

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the key fix - need to track completions individually per-producer if there are more than 1 of them messaging the same TestConsumer

{
_completedProducers = _completedProducers.Add(job.ProducerId);
_log.Debug("Producer [{0}] completed at seqNr [{1}]. {2}/{3} producers completed.",
job.ProducerId, job.SeqNr, _completedProducers.Count, _expectedProducerCount);
}

// Only stop when all expected producers have completed
if (_completedProducers.Count >= _expectedProducerCount)
{
_log.Debug("All {0} producers completed. Stopping consumer.", _expectedProducerCount);
EndReplyTo.Tell(new Collected(_processed.Select(c => c.Item1).ToImmutableHashSet(), _messageCount + 1));
Context.Stop(Self);
}
else
{
// Continue processing messages from other producers
_processed = cleanProcessed.Add(nextMsg);
_messageCount++;
}
}
else if (!_supportRestarts && EndCondition(job))
{
Expand Down Expand Up @@ -188,12 +209,12 @@ public static ConsumerController.SequencedMessage<Job> SequencedMessage(string p

private static Func<SomeAsyncJob, bool> ConsumerEndCondition(long seqNr) => msg => msg.SeqNr >= seqNr;

public static Props PropsFor(TimeSpan delay, long seqNr, IActorRef endReplyTo, IActorRef consumerController, bool supportsRestarts = false) =>
Props.Create(() => new TestConsumer(delay, ConsumerEndCondition(seqNr), endReplyTo, consumerController, supportsRestarts));
public static Props PropsFor(TimeSpan delay, long seqNr, IActorRef endReplyTo, IActorRef consumerController, int expectedProducerCount = 1, bool supportsRestarts = false) =>
Props.Create(() => new TestConsumer(delay, ConsumerEndCondition(seqNr), endReplyTo, consumerController, expectedProducerCount, supportsRestarts));

public static Props PropsFor(TimeSpan delay, Func<SomeAsyncJob, bool> endCondition, IActorRef endReplyTo,
IActorRef consumerController, bool supportsRestarts = false) =>
Props.Create(() => new TestConsumer(delay, endCondition, endReplyTo, consumerController, supportsRestarts));
IActorRef consumerController, int expectedProducerCount = 1, bool supportsRestarts = false) =>
Props.Create(() => new TestConsumer(delay, endCondition, endReplyTo, consumerController, expectedProducerCount, supportsRestarts));

public ITimerScheduler Timers { get; set; } = null!;
}
Expand Down