Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,22 @@ public interface IInMemoryMessagingTransportDescriptor : IMessagingTransportDesc
ReceiveMiddlewareConfiguration configuration,
string? before = null,
string? after = null);

/// <summary>
/// Claims a handler for this in-memory transport and returns a configurator for its receive endpoint.
/// The handler will be bound to a convention-named endpoint on this transport during initialization.
/// </summary>
/// <typeparam name="THandler">The handler type implementing <see cref="IHandler"/>.</typeparam>
/// <returns>A configurator that allows configuring the handler's receive endpoint.</returns>
new IHandlerConfigurator<IInMemoryReceiveEndpointDescriptor> Handler<THandler>()
where THandler : class, IHandler;

/// <summary>
/// Claims a consumer for this in-memory transport and returns a configurator for its receive endpoint.
/// The consumer will be bound to a convention-named endpoint on this transport during initialization.
/// </summary>
/// <typeparam name="TConsumer">The consumer type implementing <see cref="IConsumer"/>.</typeparam>
/// <returns>A configurator that allows configuring the consumer's receive endpoint.</returns>
new IConsumerConfigurator<IInMemoryReceiveEndpointDescriptor> Consumer<TConsumer>()
where TConsumer : class, IConsumer;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ public interface IInMemoryReceiveEndpointDescriptor : IReceiveEndpointDescriptor
/// <inheritdoc />
new IInMemoryReceiveEndpointDescriptor Handler<THandler>() where THandler : class, IHandler;

/// <inheritdoc />
new IInMemoryReceiveEndpointDescriptor Handler(Type handlerType);

/// <inheritdoc />
new IInMemoryReceiveEndpointDescriptor Consumer<TConsumer>() where TConsumer : class, IConsumer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ private InMemoryDispatchEndpointDescriptor(IMessagingConfigurationContext contex
Configuration = new InMemoryDispatchEndpointConfiguration { Name = name, TopicName = name };
}

protected override InMemoryDispatchEndpointConfiguration Configuration { get; set; }
protected internal override InMemoryDispatchEndpointConfiguration Configuration { get; protected set; }

public IInMemoryDispatchEndpointDescriptor ToQueue(string name)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public InMemoryMessagingTransportDescriptor(IMessagingSetupContext discoveryCont
Configuration = new InMemoryTransportConfiguration();
}

protected override InMemoryTransportConfiguration Configuration { get; set; }
protected internal override InMemoryTransportConfiguration Configuration { get; protected set; }

/// <inheritdoc />
public new IInMemoryMessagingTransportDescriptor ModifyOptions(Action<TransportOptions> configure)
Expand Down Expand Up @@ -108,6 +108,24 @@ public InMemoryMessagingTransportDescriptor(IMessagingSetupContext discoveryCont
return this;
}

/// <inheritdoc />
public new IHandlerConfigurator<IInMemoryReceiveEndpointDescriptor> Handler<THandler>()
where THandler : class, IHandler
{
var claim = new HandlerClaim { HandlerType = typeof(THandler) };
HandlerClaims.Add(claim);
return new HandlerConfigurator<IInMemoryReceiveEndpointDescriptor>(claim);
}

/// <inheritdoc />
public new IConsumerConfigurator<IInMemoryReceiveEndpointDescriptor> Consumer<TConsumer>()
where TConsumer : class, IConsumer
{
var claim = new HandlerClaim { HandlerType = typeof(TConsumer) };
HandlerClaims.Add(claim);
return new ConsumerConfigurator<IInMemoryReceiveEndpointDescriptor>(claim);
}

/// <inheritdoc />
public IInMemoryReceiveEndpointDescriptor Endpoint(string name)
{
Expand Down Expand Up @@ -184,6 +202,15 @@ public IInMemoryBindingDescriptor DeclareBinding(string exchange, string queue)
/// <returns>The fully populated transport configuration ready for runtime initialization.</returns>
public InMemoryTransportConfiguration CreateConfiguration()
{
foreach (var claim in HandlerClaims)
{
var name = Context.Naming.GetReceiveEndpointName(
claim.HandlerType, ReceiveEndpointKind.Default);
var endpoint = (InMemoryReceiveEndpointDescriptor)Endpoint(name);
Comment thread
PascalSenn marked this conversation as resolved.
Outdated
endpoint.Handler(claim.HandlerType);
claim.ConfigureEndpoint?.Invoke(endpoint);
}
Comment thread
PascalSenn marked this conversation as resolved.
Outdated

Configuration.ReceiveEndpoints = _receiveEndpoints
.Select(ReceiveEndpointConfiguration (e) => e.CreateConfiguration())
.ToList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ internal InMemoryReceiveEndpointDescriptor(IMessagingConfigurationContext discov
return this;
}

public new IInMemoryReceiveEndpointDescriptor Handler(Type handlerType)
{
base.Handler(handlerType);

return this;
}

public new IInMemoryReceiveEndpointDescriptor Consumer<TConsumer>() where TConsumer : class, IConsumer
{
base.Consumer<TConsumer>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public InMemoryBindingDescriptor(IMessagingConfigurationContext context, string
}

/// <inheritdoc />
protected override InMemoryBindingConfiguration Configuration { get; set; }
protected internal override InMemoryBindingConfiguration Configuration { get; protected set; }

/// <inheritdoc />
public IInMemoryBindingDescriptor Source(string topicName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public InMemoryQueueDescriptor(IMessagingConfigurationContext context, string na
}

/// <inheritdoc />
protected override InMemoryQueueConfiguration Configuration { get; set; }
protected internal override InMemoryQueueConfiguration Configuration { get; protected set; }

/// <inheritdoc />
public IInMemoryQueueDescriptor Name(string name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public InMemoryTopicDescriptor(IMessagingConfigurationContext context, string na
}

/// <inheritdoc />
protected override InMemoryTopicConfiguration Configuration { get; set; }
protected internal override InMemoryTopicConfiguration Configuration { get; protected set; }

/// <inheritdoc />
public IInMemoryTopicDescriptor Name(string name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,12 @@ public interface IPostgresMessagingTransportDescriptor
ReceiveMiddlewareConfiguration configuration,
string? before = null,
string? after = null);

/// <inheritdoc cref="IMessagingTransportDescriptor.Handler{THandler}"/>
new IHandlerConfigurator<IPostgresReceiveEndpointDescriptor> Handler<THandler>()
where THandler : class, IHandler;

/// <inheritdoc cref="IMessagingTransportDescriptor.Consumer{TConsumer}"/>
new IConsumerConfigurator<IPostgresReceiveEndpointDescriptor> Consumer<TConsumer>()
where TConsumer : class, IConsumer;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ public interface IPostgresReceiveEndpointDescriptor : IReceiveEndpointDescriptor
/// <inheritdoc cref="IReceiveEndpointDescriptor{T}.Handler{THandler}"/>
new IPostgresReceiveEndpointDescriptor Handler<THandler>() where THandler : class, IHandler;

/// <inheritdoc cref="IReceiveEndpointDescriptor{T}.Handler(Type)"/>
new IPostgresReceiveEndpointDescriptor Handler(Type handlerType);

/// <inheritdoc cref="IReceiveEndpointDescriptor{T}.Consumer{TConsumer}"/>
new IPostgresReceiveEndpointDescriptor Consumer<TConsumer>() where TConsumer : class, IConsumer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,24 @@ public PostgresMessagingTransportDescriptor(IMessagingSetupContext discoveryCont
return this;
}

/// <inheritdoc />
public new IHandlerConfigurator<IPostgresReceiveEndpointDescriptor> Handler<THandler>()
where THandler : class, IHandler
{
var claim = new HandlerClaim { HandlerType = typeof(THandler) };
HandlerClaims.Add(claim);
return new HandlerConfigurator<IPostgresReceiveEndpointDescriptor>(claim);
}

/// <inheritdoc />
public new IConsumerConfigurator<IPostgresReceiveEndpointDescriptor> Consumer<TConsumer>()
where TConsumer : class, IConsumer
{
var claim = new HandlerClaim { HandlerType = typeof(TConsumer) };
HandlerClaims.Add(claim);
return new ConsumerConfigurator<IPostgresReceiveEndpointDescriptor>(claim);
}

/// <inheritdoc />
public IPostgresMessagingTransportDescriptor AutoProvision(bool autoProvision = true)
{
Expand Down Expand Up @@ -210,6 +228,15 @@ public IPostgresSubscriptionDescriptor DeclareSubscription(string topic, string
/// <returns>The fully populated transport configuration ready for runtime initialization.</returns>
public PostgresTransportConfiguration CreateConfiguration()
{
foreach (var claim in HandlerClaims)
{
var name = Context.Naming.GetReceiveEndpointName(
claim.HandlerType, ReceiveEndpointKind.Default);
var endpoint = Endpoint(name);
endpoint.Handler(claim.HandlerType);
claim.ConfigureEndpoint?.Invoke(endpoint);
}
Comment thread
PascalSenn marked this conversation as resolved.
Outdated

Configuration.ReceiveEndpoints = _receiveEndpoints
.Select(ReceiveEndpointConfiguration (e) => e.CreateConfiguration())
.ToList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ internal PostgresReceiveEndpointDescriptor(IMessagingConfigurationContext discov
return this;
}

/// <inheritdoc />
public new IPostgresReceiveEndpointDescriptor Handler(Type handlerType)
{
base.Handler(handlerType);

return this;
}

/// <inheritdoc />
public new IPostgresReceiveEndpointDescriptor Consumer<TConsumer>() where TConsumer : class, IConsumer
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,12 @@ IRabbitMQMessagingTransportDescriptor ConnectionProvider(
ReceiveMiddlewareConfiguration configuration,
string? before = null,
string? after = null);

/// <inheritdoc cref="IMessagingTransportDescriptor.Handler{THandler}" />
new IHandlerConfigurator<IRabbitMQReceiveEndpointDescriptor> Handler<THandler>()
where THandler : class, IHandler;

/// <inheritdoc cref="IMessagingTransportDescriptor.Consumer{TConsumer}" />
new IConsumerConfigurator<IRabbitMQReceiveEndpointDescriptor> Consumer<TConsumer>()
where TConsumer : class, IConsumer;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ public interface IRabbitMQReceiveEndpointDescriptor : IReceiveEndpointDescriptor
/// <inheritdoc cref="IReceiveEndpointDescriptor{TConfiguration}.Handler{THandler}" />
new IRabbitMQReceiveEndpointDescriptor Handler<THandler>() where THandler : class, IHandler;

/// <inheritdoc cref="IReceiveEndpointDescriptor{TConfiguration}.Handler(Type)" />
new IRabbitMQReceiveEndpointDescriptor Handler(Type handlerType);

/// <inheritdoc cref="IReceiveEndpointDescriptor{TConfiguration}.Consumer{TConsumer}" />
new IRabbitMQReceiveEndpointDescriptor Consumer<TConsumer>() where TConsumer : class, IConsumer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,24 @@ public RabbitMQMessagingTransportDescriptor(IMessagingSetupContext discoveryCont
return this;
}

/// <inheritdoc />
public new IHandlerConfigurator<IRabbitMQReceiveEndpointDescriptor> Handler<THandler>()
where THandler : class, IHandler
{
var claim = new HandlerClaim { HandlerType = typeof(THandler) };
HandlerClaims.Add(claim);
return new HandlerConfigurator<IRabbitMQReceiveEndpointDescriptor>(claim);
}

/// <inheritdoc />
public new IConsumerConfigurator<IRabbitMQReceiveEndpointDescriptor> Consumer<TConsumer>()
where TConsumer : class, IConsumer
{
var claim = new HandlerClaim { HandlerType = typeof(TConsumer) };
HandlerClaims.Add(claim);
return new ConsumerConfigurator<IRabbitMQReceiveEndpointDescriptor>(claim);
}

/// <inheritdoc />
public IRabbitMQMessagingTransportDescriptor AutoProvision(bool autoProvision = true)
{
Expand Down Expand Up @@ -202,6 +220,15 @@ public IRabbitMQBindingDescriptor DeclareBinding(string exchange, string queue)
/// <returns>A fully populated <see cref="RabbitMQTransportConfiguration"/> ready for transport initialization.</returns>
public RabbitMQTransportConfiguration CreateConfiguration()
{
foreach (var claim in HandlerClaims)
{
var name = Context.Naming.GetReceiveEndpointName(
claim.HandlerType, ReceiveEndpointKind.Default);
var endpoint = Endpoint(name);
endpoint.Handler(claim.HandlerType);
claim.ConfigureEndpoint?.Invoke(endpoint);
}
Comment thread
PascalSenn marked this conversation as resolved.
Outdated

Configuration.ReceiveEndpoints = _receiveEndpoints
.Select(ReceiveEndpointConfiguration (e) => e.CreateConfiguration())
.ToList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ private RabbitMQReceiveEndpointDescriptor(IMessagingConfigurationContext discove
return this;
}

public new IRabbitMQReceiveEndpointDescriptor Handler(Type handlerType)
{
base.Handler(handlerType);

return this;
}
Comment thread
PascalSenn marked this conversation as resolved.

public new IRabbitMQReceiveEndpointDescriptor Consumer<TConsumer>() where TConsumer : class, IConsumer
{
base.Consumer<TConsumer>();
Expand Down
1 change: 1 addition & 0 deletions src/Mocha/src/Mocha/Assembly.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
[assembly: InternalsVisibleTo("Mocha.EntityFrameworkCore")]
[assembly: InternalsVisibleTo("Mocha.Transport.RabbitMQ")]
[assembly: InternalsVisibleTo("Mocha.Transport.Postgres")]
[assembly: InternalsVisibleTo("Mocha.Transport.InMemory")]
[assembly: InternalsVisibleTo("Mocha.Tests")]
[assembly: InternalsVisibleTo("Mocha.Sagas.TestHelpers")]
[assembly: InternalsVisibleTo("Mocha.Sagas.Tests")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ public interface IReceiveEndpointDescriptor<out TConfiguration>
/// <returns>The descriptor instance for method chaining.</returns>
IReceiveEndpointDescriptor<TConfiguration> Handler<THandler>() where THandler : class, IHandler;

/// <summary>
/// Binds a handler to this receive endpoint by its runtime type.
/// </summary>
/// <param name="handlerType">The handler type to bind.</param>
/// <returns>The descriptor instance for method chaining.</returns>
IReceiveEndpointDescriptor<TConfiguration> Handler(Type handlerType);
Comment thread
PascalSenn marked this conversation as resolved.

/// <summary>
/// Binds a consumer to this receive endpoint, ensuring its messages are consumed on this
/// endpoint.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@ public IReceiveEndpointDescriptor<T> Consumer<TConsumer>() where TConsumer : cla
return this;
}

/// <summary>
/// Binds a handler to this receive endpoint by its runtime type.
/// </summary>
/// <param name="handlerType">The handler type to bind.</param>
/// <returns>The descriptor instance for method chaining.</returns>
public IReceiveEndpointDescriptor<T> Handler(Type handlerType)
{
Configuration.ConsumerIdentities.Add(handlerType);
return this;
}

public IReceiveEndpointDescriptor<T> Kind(ReceiveEndpointKind kind)
{
Configuration.Kind = kind;
Expand Down
25 changes: 25 additions & 0 deletions src/Mocha/src/Mocha/Transport/ConsumerConfigurator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
namespace Mocha;

/// <summary>
/// Internal implementation of <see cref="IConsumerConfigurator{TEndpointDescriptor}"/> that
/// captures endpoint configuration actions on the underlying <see cref="HandlerClaim"/>.
/// </summary>
/// <typeparam name="TEndpointDescriptor">The endpoint descriptor type exposed by the transport.</typeparam>
internal sealed class ConsumerConfigurator<TEndpointDescriptor>
: IConsumerConfigurator<TEndpointDescriptor>
{
private readonly HandlerClaim _claim;

internal ConsumerConfigurator(HandlerClaim claim) => _claim = claim;

/// <inheritdoc />
public IConsumerConfigurator<TEndpointDescriptor> ConfigureEndpoint(
Action<TEndpointDescriptor> configure)
{
var prev = _claim.ConfigureEndpoint;
_claim.ConfigureEndpoint = prev is null
? obj => configure((TEndpointDescriptor)obj)
: obj => { prev(obj); configure((TEndpointDescriptor)obj); };
return this;
}
}
20 changes: 20 additions & 0 deletions src/Mocha/src/Mocha/Transport/HandlerClaim.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
namespace Mocha;

/// <summary>
/// Stores a transport-level handler claim, capturing the handler type and an optional
/// endpoint configuration action to apply when the claim is materialized.
/// </summary>
internal sealed class HandlerClaim
{
/// <summary>
/// Gets the handler type that is claimed by the transport.
/// </summary>
public required Type HandlerType { get; init; }

/// <summary>
/// Gets or sets an optional configuration action applied to the receive endpoint descriptor
/// when the claim is materialized. The delegate accepts the endpoint descriptor as <see cref="object"/>
/// and casts internally to the transport-specific type.
/// </summary>
public Action<object>? ConfigureEndpoint { get; set; }
}
Loading
Loading