Skip to content

Commit

Permalink
Merge pull request #7258 from Particular/publisher-metadata
Browse files Browse the repository at this point in the history
Consistently use PublisherMetadata to facilitate endpoint setup in Azure Service Bus
  • Loading branch information
danielmarbach authored Jan 9, 2025
2 parents 5632b27 + fa0c755 commit 9bb1bf2
Show file tree
Hide file tree
Showing 54 changed files with 250 additions and 188 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,12 @@ public static void RouteToEndpoint(this RoutingSettings routingSettings, Type me
var destinationEndpointAddress = Conventions.EndpointNamingConvention(destinationEndpointType);
routingSettings.RouteToEndpoint(messageType, destinationEndpointAddress);
}

public static void EnforcePublisherMetadataRegistration(this EndpointConfiguration config, string endpointName, PublisherMetadata publisherMetadata)
{
config.Pipeline.Register(new EnforcePublisherMetadataBehavior(endpointName, publisherMetadata),
"Enforces all published events have corresponding mappings in the PublisherMetadata");
config.Pipeline.Register(new EnforceSubscriptionPublisherMetadataBehavior(endpointName, publisherMetadata),
"Enforces all subscribed events have corresponding mappings in the PublisherMetadata");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace NServiceBus.AcceptanceTesting.Support;

using System;
using System.Threading.Tasks;
using Pipeline;

public class EnforcePublisherMetadataBehavior(string endpointName, PublisherMetadata publisherMetadata) : IBehavior<IOutgoingPublishContext, IOutgoingPublishContext>
{
public Task Invoke(IOutgoingPublishContext context, Func<IOutgoingPublishContext, Task> next)
{
var publisherDetails = publisherMetadata[endpointName];
if (!publisherDetails.Events.Contains(context.Message.MessageType))
{
throw new Exception($"The event '{context.Message.MessageType}' being published by '{endpointName}' does not have a corresponding mapping in the PublisherMetadata. Add the following code to the endpoint configuration builder: metadata.RegisterSelfAsPublisherFor<{context.Message.MessageType}>(this);");
}
return next(context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
namespace NServiceBus.AcceptanceTesting.Support;

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Pipeline;

public class EnforceSubscriptionPublisherMetadataBehavior(string endpointName, PublisherMetadata publisherMetadata) : IBehavior<ISubscribeContext, ISubscribeContext>
{
readonly HashSet<Type> eventTypeMap = publisherMetadata.Publishers.SelectMany(publisher => publisher.Events).ToHashSet();

public Task Invoke(ISubscribeContext context, Func<ISubscribeContext, Task> next)
{
var unmappedEventTypes = new List<Type>(context.EventTypes.Where(eventType => !eventTypeMap.Contains(eventType)));
if (unmappedEventTypes.Count == 0)
{
return next(context);
}

var builder = new StringBuilder();
_ = builder.AppendLine($"The following event(s) are being subscribed to by '{endpointName}' but do not have a corresponding mapping in the PublisherMetadata:");
foreach (var eventType in unmappedEventTypes)
{
_ = builder.AppendLine($"- metadata.RegisterPublisherFor<{eventType}>(\"typeof(PublisherEndpointToBeDetermined)\");");
}
throw new Exception(builder.ToString());
}
}
36 changes: 17 additions & 19 deletions src/NServiceBus.AcceptanceTesting/Support/PublisherMetadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,33 +23,31 @@ public void RegisterPublisherFor<T>(string endpointName)
publisher.RegisterOwnedEvent<T>();
}

public void RegisterPublisherFor<T>(Type endpointType)
{
RegisterPublisherFor<T>(Conventions.EndpointNamingConvention(endpointType));
}
public void RegisterSelfAsPublisherFor<TEventType>(EndpointConfigurationBuilder self) =>
RegisterPublisherFor<TEventType>(Conventions.EndpointNamingConvention(self.GetType()));

Dictionary<string, PublisherDetails> publisherDetails = [];
public void RegisterPublisherFor<TEventType, TPublisher>() where TPublisher : EndpointConfigurationBuilder =>
RegisterPublisherFor<TEventType>(Conventions.EndpointNamingConvention(typeof(TPublisher)));

public class PublisherDetails
{
public PublisherDetails(string publisherName)
{
PublisherName = publisherName;
}
public void RegisterPublisherFor<TEventType>(Type endpointType) =>
RegisterPublisherFor<TEventType>(Conventions.EndpointNamingConvention(endpointType));

public List<Type> Events { get; } = [];
public PublisherDetails this[string publisherName] =>
publisherDetails.TryGetValue(publisherName, out var publisherDetail)
? publisherDetail
: new PublisherDetails(publisherName);

public string PublisherName { get; }
readonly Dictionary<string, PublisherDetails> publisherDetails = [];

public class PublisherDetails(string publisherName)
{
public HashSet<Type> Events { get; } = [];

public string PublisherName { get; } = publisherName;

public void RegisterOwnedEvent<T>()
{
var eventType = typeof(T);

if (Events.Contains(eventType))
{
return;
}

Events.Add(eventType);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,18 @@
using System;
using System.IO;
using System.Threading.Tasks;
using AcceptanceTesting.Customization;
using NServiceBus;
using NServiceBus.AcceptanceTesting.Support;
using NUnit.Framework;

public class ConfigureEndpointAcceptanceTestingTransport : IConfigureEndpointTestExecution
public class ConfigureEndpointAcceptanceTestingTransport(
bool useNativePubSub,
bool useNativeDelayedDelivery,
TransportTransactionMode? transactionMode = null,
bool? enforcePublisherMetadata = null)
: IConfigureEndpointTestExecution
{
public ConfigureEndpointAcceptanceTestingTransport(bool useNativePubSub, bool useNativeDelayedDelivery, TransportTransactionMode? transactionMode = null)
{
this.useNativePubSub = useNativePubSub;
this.useNativeDelayedDelivery = useNativeDelayedDelivery;
this.transactionMode = transactionMode;
}

public Task Cleanup()
{
try
Expand All @@ -25,26 +24,22 @@ public Task Cleanup()
Directory.Delete(storageDir, true);
}
}
catch { }
catch
{
// ignored
}

return Task.CompletedTask;
}

public Task Configure(string endpointName, EndpointConfiguration configuration, RunSettings settings, PublisherMetadata publisherMetadata)
public Task Configure(string endpointName, EndpointConfiguration configuration, RunSettings settings,
PublisherMetadata publisherMetadata)
{
var testRunId = TestContext.CurrentContext.Test.ID;

string tempDir;

if (Environment.OSVersion.Platform == PlatformID.Win32NT)
{
string tempDir =
//can't use bin dir since that will be too long on the build agents
tempDir = @"c:\temp";
}
else
{
tempDir = Path.GetTempPath();
}
Environment.OSVersion.Platform == PlatformID.Win32NT ? @"c:\temp" : Path.GetTempPath();

storageDir = Path.Combine(tempDir, "acc", testRunId);

Expand All @@ -60,6 +55,11 @@ public Task Configure(string endpointName, EndpointConfiguration configuration,
acceptanceTestingTransport.TransportTransactionMode = transactionMode.Value;
}

if (enforcePublisherMetadata.GetValueOrDefault(false))
{
configuration.EnforcePublisherMetadataRegistration(endpointName, publisherMetadata);
}

var routing = configuration.UseTransport(acceptanceTestingTransport);

if (!useNativePubSub)
Expand All @@ -77,9 +77,5 @@ public Task Configure(string endpointName, EndpointConfiguration configuration,
return Task.CompletedTask;
}

readonly bool useNativePubSub;
readonly bool useNativeDelayedDelivery;
readonly TransportTransactionMode? transactionMode;

string storageDir;
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public Subscriber()
},
metadata =>
{
metadata.RegisterPublisherFor<EventToSubscribeTo>(typeof(Subscriber));
metadata.RegisterPublisherFor<EventToExclude>(typeof(Subscriber));
metadata.RegisterPublisherFor<EventToSubscribeTo, Subscriber>();
metadata.RegisterPublisherFor<EventToExclude, Subscriber>();
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public Subscriber()
EndpointSetup<DefaultServer>((c, r) => c.Pipeline.Register("SubscriptionSpy", new SubscriptionSpy((Context)r.ScenarioContext), "Spies on subscriptions made"),
metadata =>
{
metadata.RegisterPublisherFor<MyEventBase>(typeof(Subscriber));
metadata.RegisterPublisherFor<MyEvent>(typeof(Subscriber));
metadata.RegisterPublisherFor<MyEventBase, Subscriber>();
metadata.RegisterPublisherFor<MyEvent, Subscriber>();
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public Subscriber()
},
metadata =>
{
metadata.RegisterPublisherFor<MyEventWithParent>(typeof(Subscriber));
metadata.RegisterPublisherFor<MyEvent>(typeof(Subscriber));
metadata.RegisterPublisherFor<MyEventWithParent, Subscriber>();
metadata.RegisterPublisherFor<MyEvent, Subscriber>();
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public Subscriber()
},
metadata =>
{
metadata.RegisterPublisherFor<MyEvent>(typeof(Subscriber));
metadata.RegisterPublisherFor<MyEventWithNoHandler>(typeof(Subscriber));
metadata.RegisterPublisherFor<MyEvent, Subscriber>();
metadata.RegisterPublisherFor<MyEventWithNoHandler, Subscriber>();
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public Subscriber() =>
},
metadata =>
{
metadata.RegisterPublisherFor<SomeEvent>(typeof(Publisher));
metadata.RegisterPublisherFor<SomeEvent, Publisher>();
});

public class ThisHandlesSomethingHandler : IHandleMessages<SomeEvent>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public Subscriber() =>
},
metadata =>
{
metadata.RegisterPublisherFor<ThisIsAnEvent>(typeof(Publisher));
metadata.RegisterPublisherFor<ThisIsAnEvent, Publisher>();
});

public class ThisHandlesSomethingHandler : IHandleMessages<ThisIsAnEvent>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class SubscribingEndpoint : EndpointConfigurationBuilder
{
public SubscribingEndpoint() => EndpointSetup<OpenTelemetryEnabledEndpoint>(
c => c.DisableFeature<AutoSubscribe>(),
p => p.RegisterPublisherFor<DemoEvent>(typeof(PublishingEndpoint)));
p => p.RegisterPublisherFor<DemoEvent, PublishingEndpoint>());
}

class PublishingEndpoint : EndpointConfigurationBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class SubscriberEndpoint : EndpointConfigurationBuilder
{
public SubscriberEndpoint() => EndpointSetup<OpenTelemetryEnabledEndpoint>(
c => c.DisableFeature<AutoSubscribe>(),
p => p.RegisterPublisherFor<DemoEvent>(typeof(PublishingEndpoint)));
p => p.RegisterPublisherFor<DemoEvent, PublishingEndpoint>());
}

class PublishingEndpoint : EndpointConfigurationBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public class Subscriber1 : EndpointConfigurationBuilder
{
public Subscriber1()
{
EndpointSetup<DefaultServer>(builder => builder.DisableFeature<AutoSubscribe>(), metadata => metadata.RegisterPublisherFor<MyEvent>(typeof(Publisher)));
EndpointSetup<DefaultServer>(builder => builder.DisableFeature<AutoSubscribe>(), metadata => metadata.RegisterPublisherFor<MyEvent, Publisher>());
}

public class MyHandler : IHandleMessages<MyEvent>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public Subscriber()
routingSettings.DisablePublishing();
}, p =>
{
p.RegisterPublisherFor<ForbiddenEvent>(typeof(PublisherWithAuthorizer));
p.RegisterPublisherFor<AllowedEvent>(typeof(PublisherWithAuthorizer));
p.RegisterPublisherFor<ForbiddenEvent, PublisherWithAuthorizer>();
p.RegisterPublisherFor<AllowedEvent, PublisherWithAuthorizer>();
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public EndpointWithDisabledPublishing()
var routingSettings = new RoutingSettings<AcceptanceTestingTransport>(c.GetSettings());
routingSettings.DisablePublishing();
},
pm => pm.RegisterPublisherFor<TestEvent>(typeof(MessageDrivenPublisher)));
pm => pm.RegisterPublisherFor<TestEvent, MessageDrivenPublisher>());
}

class EventHandler : IHandleMessages<TestEvent>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public PublisherAndSubscriber()
context.EventSubscribed = true;
}
});
}, metadata => metadata.RegisterPublisherFor<Event>(typeof(PublisherAndSubscriber)));
}, metadata => metadata.RegisterPublisherFor<Event, PublisherAndSubscriber>());
}

public class Handler : IHandleMessages<Event>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
namespace NServiceBus.AcceptanceTests.Routing.MessageDrivenSubscriptions;
namespace NServiceBus.AcceptanceTests.Core.Routing.MessageDrivenSubscriptions;

using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using Configuration.AdvancedExtensibility;
using EndpointTemplates;
using Features;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTesting.Customization;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NServiceBus.Configuration.AdvancedExtensibility;
using NServiceBus.Features;
using NServiceBus.Routing.MessageDrivenSubscriptions;
using NUnit.Framework;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
namespace NServiceBus.AcceptanceTests.Routing.MessageDrivenSubscriptions;
namespace NServiceBus.AcceptanceTests.Core.Routing.MessageDrivenSubscriptions;

using System.Linq;
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using Features;
using Logging;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NServiceBus.Features;
using NServiceBus.Logging;
using NUnit.Framework;

public class Missing_pub_info : NServiceBusAcceptanceTest
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
namespace NServiceBus.AcceptanceTests.Routing.MessageDrivenSubscriptions;
namespace NServiceBus.AcceptanceTests.Core.Routing.MessageDrivenSubscriptions;

using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using Extensibility;
using Features;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NServiceBus.Extensibility;
using NServiceBus.Features;
using NServiceBus.Persistence;
using NServiceBus.Unicast.Subscriptions;
using NServiceBus.Unicast.Subscriptions.MessageDrivenSubscriptions;
using NUnit.Framework;
using Persistence;
using Unicast.Subscriptions;
using Unicast.Subscriptions.MessageDrivenSubscriptions;
using Conventions = AcceptanceTesting.Customization.Conventions;

public class Pub_from_sendonly : NServiceBusAcceptanceTest
Expand Down Expand Up @@ -46,7 +46,7 @@ public SendOnlyPublisher()
b.SendOnly();
b.UsePersistence(typeof(HardCodedPersistence));
b.DisableFeature<AutoSubscribe>();
});
}, metadata => metadata.RegisterSelfAsPublisherFor<MyEvent>(this));
}
}

Expand Down
Loading

0 comments on commit 9bb1bf2

Please sign in to comment.