Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ Linq
Liquibase
Marek
matchesBrics
maxage
MCPEXP
MediatR
Memberwise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,16 @@

builder.Services
.AddMessageBus()
// AddInstrumentation() registers the built-in OpenTelemetryDiagnosticObserver.
// Without this call, Mocha uses a no-op observer with zero overhead.
// AddInstrumentation() registers the built-in ActivityMessagingDiagnosticListener.
// Without this call, Mocha uses a no-op listener with zero overhead.
// Spans are emitted to the "Mocha" activity source - subscribe via AddSource("Mocha").
.AddInstrumentation()
// Register a custom listener alongside the built-in one for application-level telemetry.
// Multiple listeners compose automatically — no manual aggregation needed.
.AddDiagnosticEventListener<ConsoleDiagnosticObserver>()
.AddEventHandler<OrderPlacedHandler>()
.AddInMemory();

// Register a custom observer alongside the built-in one for application-level telemetry.
// The built-in observer emits OpenTelemetry spans. This observer logs to console.
builder.Services.AddSingleton<IBusDiagnosticObserver, ConsoleDiagnosticObserver>();

var app = builder.Build();

app.MapGet("/orders", async (IMessageBus bus) =>
Expand Down Expand Up @@ -95,14 +94,15 @@ public ValueTask HandleAsync(
}
}

// --- Custom diagnostic observer ---
// --- Custom diagnostic listener ---

// Implement IBusDiagnosticObserver to collect telemetry or integrate with a
// non-OpenTelemetry backend. Each method returns an IDisposable whose disposal
// marks the end of the observed scope, enabling duration measurement.
public sealed class ConsoleDiagnosticObserver : IBusDiagnosticObserver
// Extend MessagingDiagnosticEventListener to collect telemetry or integrate with a
// non-OpenTelemetry backend. Override only the methods you care about — the base
// class provides no-op defaults for the rest. Each scope method returns an
// IDisposable whose disposal marks the end of the observed scope.
public sealed class ConsoleDiagnosticObserver : MessagingDiagnosticEventListener
{
public IDisposable Dispatch(IDispatchContext context)
public override IDisposable Dispatch(IDispatchContext context)
{
var startTime = DateTimeOffset.UtcNow;
Console.WriteLine($"[Dispatch] -> {context.DestinationAddress}");
Expand All @@ -114,25 +114,25 @@ public IDisposable Dispatch(IDispatchContext context)
});
}

public IDisposable Receive(IReceiveContext context)
public override IDisposable Receive(IReceiveContext context)
{
Console.WriteLine($"[Receive] <- {context.Endpoint.Address}");
return new Scope(() => Console.WriteLine("[Receive] completed"));
}

public IDisposable Consume(IConsumeContext context)
public override IDisposable Consume(IConsumeContext context)
{
Console.WriteLine($"[Consume] message {context.MessageId}");
return new Scope(() => Console.WriteLine("[Consume] completed"));
}

public void OnDispatchError(IDispatchContext context, Exception exception)
public override void DispatchError(IDispatchContext context, Exception exception)
=> Console.WriteLine($"[Dispatch] error: {exception.Message}");

public void OnReceiveError(IReceiveContext context, Exception exception)
public override void ReceiveError(IReceiveContext context, Exception exception)
=> Console.WriteLine($"[Receive] error: {exception.Message}");

public void OnConsumeError(IConsumeContext context, Exception exception)
public override void ConsumeError(IConsumeContext context, Exception exception)
=> Console.WriteLine($"[Consume] error: {exception.Message}");

private sealed class Scope(Action onDispose) : IDisposable
Expand Down
12 changes: 9 additions & 3 deletions src/Mocha/src/Mocha/Builder/MessageBusBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,16 @@ private static void AddCoreServices(IServiceCollection services, IServiceProvide
services.AddSingleton(loggerFactory);
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));

var diagnosticObserver =
applicationServices.GetService<IBusDiagnosticObserver>() ?? NoOpBusDiagnosticObserver.Instance;
var listeners = applicationServices.GetServices<IMessagingDiagnosticEventListener>().ToArray();

services.AddSingleton(diagnosticObserver);
IMessagingDiagnosticEvents diagnosticEvents = listeners.Length switch
{
0 => NoopMessagingDiagnosticEvents.Instance,
1 => listeners[0],
_ => new AggregateMessagingDiagnosticEvents(listeners)
};

services.AddSingleton(diagnosticEvents);
Comment thread
PascalSenn marked this conversation as resolved.

var naming = applicationServices.GetService<IBusNamingConventions>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,50 +3,36 @@

namespace Mocha;

/// <summary>
/// Diagnostic observer that emits OpenTelemetry traces and metrics for dispatch, receive, and consume operations.
/// </summary>
/// <remarks>
/// Creates <see cref="Activity"/> spans for each pipeline stage and records exceptions as span events
/// with an error status. Trace context propagation is handled via message headers on the receive path,
/// enabling distributed tracing across transport boundaries.
/// </remarks>
public sealed class OpenTelemetryDiagnosticObserver : IBusDiagnosticObserver
internal sealed class ActivityMessagingDiagnosticListener : MessagingDiagnosticEventListener
{
/// <inheritdoc />
public IDisposable Dispatch(IDispatchContext context)
public override IDisposable Dispatch(IDispatchContext context)
{
return DispatchActivity.Create(context);
}

/// <inheritdoc />
public IDisposable Receive(IReceiveContext context)
public override void DispatchError(IDispatchContext context, Exception exception)
{
return ReceiveActivity.Create(context);
Activity.Current?.AddException(exception);
Activity.Current?.SetStatus(ActivityStatusCode.Error);
}

/// <inheritdoc />
public IDisposable Consume(IConsumeContext context)
public override IDisposable Receive(IReceiveContext context)
{
return ConsumerActivity.Create(context);
return ReceiveActivity.Create(context);
}

/// <inheritdoc />
public void OnReceiveError(IReceiveContext context, Exception exception)
public override void ReceiveError(IReceiveContext context, Exception exception)
{
Activity.Current?.AddException(exception);
Activity.Current?.SetStatus(ActivityStatusCode.Error);
}

/// <inheritdoc />
public void OnDispatchError(IDispatchContext context, Exception exception)
public override IDisposable Consume(IConsumeContext context)
{
Activity.Current?.AddException(exception);
Activity.Current?.SetStatus(ActivityStatusCode.Error);
return ConsumerActivity.Create(context);
}

/// <inheritdoc />
public void OnConsumeError(IConsumeContext context, Exception exception)
public override void ConsumeError(IConsumeContext context, Exception exception)
{
Activity.Current?.AddException(exception);
Activity.Current?.SetStatus(ActivityStatusCode.Error);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
using Mocha.Middlewares;

namespace Mocha;

internal sealed class AggregateMessagingDiagnosticEvents(IMessagingDiagnosticEventListener[] listeners)
: IMessagingDiagnosticEvents
{
public IDisposable Dispatch(IDispatchContext context)
{
var scopes = new IDisposable[listeners.Length];

for (var i = 0; i < listeners.Length; i++)
{
scopes[i] = listeners[i].Dispatch(context);
}

return new AggregateActivityScope(scopes);
}

public void DispatchError(IDispatchContext context, Exception exception)
{
for (var i = 0; i < listeners.Length; i++)
{
listeners[i].DispatchError(context, exception);
}
}

public IDisposable Receive(IReceiveContext context)
{
var scopes = new IDisposable[listeners.Length];

for (var i = 0; i < listeners.Length; i++)
{
scopes[i] = listeners[i].Receive(context);
}

return new AggregateActivityScope(scopes);
}

public void ReceiveError(IReceiveContext context, Exception exception)
{
for (var i = 0; i < listeners.Length; i++)
{
listeners[i].ReceiveError(context, exception);
}
}

public IDisposable Consume(IConsumeContext context)
{
var scopes = new IDisposable[listeners.Length];

for (var i = 0; i < listeners.Length; i++)
{
scopes[i] = listeners[i].Consume(context);
}

return new AggregateActivityScope(scopes);
}

public void ConsumeError(IConsumeContext context, Exception exception)
{
for (var i = 0; i < listeners.Length; i++)
{
listeners[i].ConsumeError(context, exception);
}
}

private sealed class AggregateActivityScope(IDisposable[] scopes) : IDisposable
{
private bool _disposed;

public void Dispose()
{
if (!_disposed)
{
for (var i = 0; i < scopes.Length; i++)
{
scopes[i].Dispose();
}

_disposed = true;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace Mocha;

/// <summary>
/// Register an implementation of this interface in the DI container to
/// listen to diagnostic events. Multiple implementations can be registered
/// and they will all be called in registration order.
/// </summary>
/// <seealso cref="MessagingDiagnosticEventListener"/>
public interface IMessagingDiagnosticEventListener : IMessagingDiagnosticEvents;
41 changes: 41 additions & 0 deletions src/Mocha/src/Mocha/Instrumentation/IMessagingDiagnosticEvents.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using Mocha.Middlewares;

namespace Mocha;

/// <summary>
/// Provides diagnostic events that can be triggered by the messaging pipeline.
/// These events allow monitoring and instrumentation of dispatch, receive, and consume operations.
/// </summary>
/// <seealso cref="IMessagingDiagnosticEventListener"/>
public interface IMessagingDiagnosticEvents
{
/// <summary>
/// Called when a message begins dispatching. Returns a disposable scope.
/// </summary>
IDisposable Dispatch(IDispatchContext context);

/// <summary>
/// Called when an exception occurs during dispatch.
/// </summary>
void DispatchError(IDispatchContext context, Exception exception);

/// <summary>
/// Called when a message begins being received. Returns a disposable scope.
/// </summary>
IDisposable Receive(IReceiveContext context);

/// <summary>
/// Called when an exception occurs during receive.
/// </summary>
void ReceiveError(IReceiveContext context, Exception exception);

/// <summary>
/// Called when a message begins being consumed. Returns a disposable scope.
/// </summary>
IDisposable Consume(IConsumeContext context);

/// <summary>
/// Called when an exception occurs during consume.
/// </summary>
void ConsumeError(IConsumeContext context, Exception exception);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using Mocha.Middlewares;

namespace Mocha;

/// <summary>
/// A base class for diagnostic event listeners with default no-op implementations.
/// Extend this class and override the methods you need.
/// </summary>
/// <seealso cref="IMessagingDiagnosticEventListener"/>
public class MessagingDiagnosticEventListener : IMessagingDiagnosticEventListener
{
protected MessagingDiagnosticEventListener() { }

/// <summary>
/// Gets a shared no-op <see cref="IDisposable"/> scope.
/// Calling <see cref="IDisposable.Dispose"/> on this instance is safe and performs no operation.
/// Use this as a default return value from diagnostic methods when no diagnostic activity is needed.
/// </summary>
protected internal static IDisposable EmptyScope { get; } = new EmptyActivityScope();

public virtual IDisposable Dispatch(IDispatchContext context) => EmptyScope;

public virtual void DispatchError(IDispatchContext context, Exception exception) { }

public virtual IDisposable Receive(IReceiveContext context) => EmptyScope;

public virtual void ReceiveError(IReceiveContext context, Exception exception) { }

public virtual IDisposable Consume(IConsumeContext context) => EmptyScope;

public virtual void ConsumeError(IConsumeContext context, Exception exception) { }

private sealed class EmptyActivityScope : IDisposable
{
public void Dispose() { }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace Mocha;

internal sealed class NoopMessagingDiagnosticEvents
: MessagingDiagnosticEventListener
{
private NoopMessagingDiagnosticEvents()
{
}

public static NoopMessagingDiagnosticEvents Instance { get; } = new();
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,29 @@ namespace Mocha.Middlewares;
/// Without this separation, high receive latency and high handler latency are hard to attribute and
/// tune independently.
/// </remarks>
internal sealed class ConsumerInstrumentationMiddleware(IBusDiagnosticObserver observer)
internal sealed class ConsumerInstrumentationMiddleware(IMessagingDiagnosticEvents events)
{
public async ValueTask InvokeAsync(IConsumeContext context, ConsumerDelegate next)
{
using var scope = observer.Consume(context);
using var scope = events.Consume(context);

await next(context);
try
{
await next(context);
}
catch (Exception ex)
{
events.ConsumeError(context, ex);
throw;
}
Comment thread
PascalSenn marked this conversation as resolved.
}

public static ConsumerMiddlewareConfiguration Create()
=> new(
static (context, next) =>
{
var observer = context.Services.GetRequiredService<IBusDiagnosticObserver>();
var middleware = new ConsumerInstrumentationMiddleware(observer);
var events = context.Services.GetRequiredService<IMessagingDiagnosticEvents>();
var middleware = new ConsumerInstrumentationMiddleware(events);
return ctx => middleware.InvokeAsync(ctx, next);
},
"Instrumentation");
Expand Down
Loading
Loading