Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,32 @@ public async Task with_inline_endpoint_explicit_scheduling()
await host.StopAsync();
}

[Fact]
public async Task schedule_to_topic_with_subscription_listener()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseAzureServiceBusTesting()
.AutoProvision().AutoPurgeOnStartup();

opts.PublishMessage<AsbMessage1>().ToAzureServiceBusTopic("scheduled-topic");
opts.ListenToAzureServiceBusSubscription("scheduled-sub")
.FromTopic("scheduled-topic")
.ProcessInline();
}).StartAsync();

var session = await host.TrackActivity()
.IncludeExternalTransports()
.Timeout(20.Seconds())
.ExecuteAndWaitAsync(c => c.ScheduleAsync(new AsbMessage1("topic scheduled"), 3.Seconds()));

session.Received.SingleMessage<AsbMessage1>()
.Name.ShouldBe("topic scheduled");

await host.StopAsync();
}

[Fact]
public async Task with_buffered_endpoint() // durable would have similar mechanics
{
Expand Down
8 changes: 4 additions & 4 deletions src/Wolverine/Configuration/EndpointCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public interface IEndpointCollection : IAsyncDisposable
Task StartListenerAsync(Endpoint endpoint, CancellationToken cancellationToken);
Task StopListenerAsync(Endpoint endpoint, CancellationToken cancellationToken);

IListenerCircuit FindListenerCircuit(Uri address);
IListenerCircuit? FindListenerCircuit(Uri address);
}

public class EndpointCollection : IEndpointCollection
Expand Down Expand Up @@ -258,15 +258,15 @@ public async Task StopListenerAsync(Endpoint endpoint, CancellationToken cancell
}
}

public IListenerCircuit FindListenerCircuit(Uri address)
public IListenerCircuit? FindListenerCircuit(Uri address)
{
if (address.Scheme == TransportConstants.Local)
{
return (IListenerCircuit)GetOrBuildSendingAgent(address);
}

return (FindListeningAgent(address) ??
FindListeningAgent(TransportConstants.Durable))!;
return FindListeningAgent(address) ??
FindListeningAgent(TransportConstants.Durable);
}

public async Task StartListenerAsync(Endpoint endpoint, CancellationToken cancellationToken)
Expand Down
16 changes: 15 additions & 1 deletion src/Wolverine/Runtime/WolverineRuntime.EnqueueDirectly.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,21 @@ public async ValueTask EnqueueDirectlyAsync(IReadOnlyList<Envelope> envelopes)
var groups = envelopes.GroupBy(x => x.Destination ?? TransportConstants.LocalUri).ToArray();
foreach (var group in groups)
{
await Endpoints.FindListenerCircuit(group.Key).EnqueueDirectlyAsync(group);
var listener = Endpoints.FindListenerCircuit(group.Key);
if (listener != null)
{
await listener.EnqueueDirectlyAsync(group);
}
else
{
// For send-only endpoints (e.g. Azure Service Bus topics),
// there is no listener circuit. Send through the sending agent instead.
var sender = Endpoints.GetOrBuildSendingAgent(group.Key);
foreach (var envelope in group)
{
await sender.EnqueueOutgoingAsync(envelope);
}
}
}
}
}
Loading