Skip to content
Merged
1 change: 1 addition & 0 deletions src/All.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@
<Project Path="Mocha/src/Mocha.EntityFrameworkCore/Mocha.EntityFrameworkCore.csproj" />
<Project Path="Mocha/src/Mocha.Hosting/Mocha.Hosting.csproj" />
<Project Path="Mocha/src/Mocha.Outbox/Mocha.Outbox.csproj" />
<Project Path="Mocha/src/Mocha.Scheduling/Mocha.Scheduling.csproj" />
<Project Path="Mocha/src/Mocha.Threading/Mocha.Threading.csproj" />
<Project Path="Mocha/src/Mocha.Transport.InMemory/Mocha.Transport.InMemory.csproj" />
<Project Path="Mocha/src/Mocha.Transport.Postgres/Mocha.Transport.Postgres.csproj" />
Expand Down
1 change: 1 addition & 0 deletions src/Mocha/Mocha.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
<Project Path="src/Mocha.Hosting/Mocha.Hosting.csproj" />
<Project Path="src/Mocha.Inbox/Mocha.Inbox.csproj" />
<Project Path="src/Mocha.Outbox/Mocha.Outbox.csproj" />
<Project Path="src/Mocha.Scheduling/Mocha.Scheduling.csproj" />
<Project Path="src/Mocha.Threading/Mocha.Threading.csproj" />
<Project Path="src/Mocha.Transport.InMemory/Mocha.Transport.InMemory.csproj" />
<Project Path="src/Mocha.Transport.RabbitMQ/Mocha.Transport.RabbitMQ.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void Cleanup()
obj.Message = s_message;
obj.MessageType = s_type;
var result = obj.Message;
// No return GC collects it
// No return - GC collects it
return result;
}

Expand Down
4 changes: 2 additions & 2 deletions src/Mocha/src/Examples/MediatorShowcase/Handlers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public sealed class OrderShippedEmailHandler(ILogger<OrderShippedEmailHandler> l
{
public ValueTask HandleAsync(OrderShippedNotification notification, CancellationToken cancellationToken)
{
logger.LogInformation("[Email] Order {OrderId} shipped email sent to customer", notification.OrderId);
logger.LogInformation("[Email] Order {OrderId} shipped - email sent to customer", notification.OrderId);
return ValueTask.CompletedTask;
}
}
Expand All @@ -135,7 +135,7 @@ public sealed class OrderShippedAnalyticsHandler(ILogger<OrderShippedAnalyticsHa
{
public ValueTask HandleAsync(OrderShippedNotification notification, CancellationToken cancellationToken)
{
logger.LogInformation("[Analytics] Order {OrderId} shipped metrics recorded", notification.OrderId);
logger.LogInformation("[Analytics] Order {OrderId} shipped - metrics recorded", notification.OrderId);
return ValueTask.CompletedTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// Commands
// ──────────────────────────────────────────────────

// Void command no return value
// Void command - no return value
app.MapPost("/api/products", async (CreateProductRequest req, ISender sender) =>
{
await sender.SendAsync(new CreateProductCommand(req.Name, req.Price));
Expand Down
2 changes: 1 addition & 1 deletion src/Mocha/src/Examples/Transports/RabbitMQ/RabbitMQ.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
.Handler<OrderPlacedHandler>();

// Declare a quorum queue explicitly with durable flag.
// Quorum queues require durable=true non-durable quorum queues are not supported.
// Quorum queues require durable=true - non-durable quorum queues are not supported.
transport.DeclareQueue("orders.processing")
.Durable()
.AutoProvision()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
<ProjectReference Include="..\Mocha\Mocha.csproj" />
<ProjectReference Include="..\Mocha.EntityFrameworkCore\Mocha.EntityFrameworkCore.csproj" />
<ProjectReference Include="..\Mocha.Inbox\Mocha.Inbox.csproj" />
<ProjectReference Include="..\Mocha.Scheduling\Mocha.Scheduling.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ private async Task<bool> ProcessEventAsync(NpgsqlConnection connection, Cancella
if (await reader.ReadAsync(cancellationToken))
{
var id = reader.GetGuid(0);
var envelope = Serializer.ReadMessageEnvelopeSafe(reader, 1);
var envelope = Serializer.ReadMessageEnvelopeSafe(reader, 1, _logger);
var messageType = GetMessageType(envelope?.MessageType);
var isReply = envelope?.Headers?.IsReply() ?? false;
var endpoint = isReply
Expand Down Expand Up @@ -192,13 +192,23 @@ private async Task<bool> ProcessEventAsync(NpgsqlConnection connection, Cancella
}
finally
{
await transaction.CommitAsync(cancellationToken);
try
{
await transaction.CommitAsync(cancellationToken);
}
catch
{
// Commit failed (e.g., connection lost). Attempt rollback.
// If commit actually succeeded server-side, the message stays
// with times_sent incremented - safe, just causes a retry.
try { await transaction.RollbackAsync(CancellationToken.None); } catch { /* swallow */ }
}
}
}
catch (Exception ex)
{
// Log only - no RollbackAsync here (commit already handled in finally)
_logger.UnexpectedErrorWhileProcessingOutboxEvent(ex);
await transaction.RollbackAsync(cancellationToken);
throw;
}
}
Expand Down Expand Up @@ -270,10 +280,10 @@ private async ValueTask SendAsync(
if (ActivityContext.TryParse(traceparent, tracestate, out var parentContext))
{
activity = OpenTelemetry.Source.CreateActivity(
$"outbox send {envelope.MessageId}",
"outbox send",
ActivityKind.Client,
parentContext);

activity?.SetTag("messaging.message_id", envelope.MessageId);
activity?.Start();
}
}
Expand Down Expand Up @@ -339,7 +349,7 @@ internal static partial class Logs

file static class Serializer
{
public static MessageEnvelope? ReadMessageEnvelopeSafe(NpgsqlDataReader reader, int ordinal)
public static MessageEnvelope? ReadMessageEnvelopeSafe(NpgsqlDataReader reader, int ordinal, ILogger logger)
{
try
{
Expand All @@ -348,7 +358,7 @@ file static class Serializer
}
catch (Exception ex)
{
Console.WriteLine($"Error reading message envelope: {ex.Message}");
logger.LogError(ex, "Error reading message envelope");
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,9 @@ public sealed class PostgresTableInfo
/// Gets or sets the table and column metadata for the inbox messages table.
/// </summary>
public InboxTableInfo Inbox { get; set; } = new();

/// <summary>
/// Gets or sets the table and column metadata for the scheduled messages table.
/// </summary>
public ScheduledMessageTableInfo ScheduledMessage { get; set; } = new();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
namespace Mocha.EntityFrameworkCore.Postgres;

/// <summary>
/// Table and column information for the scheduled messages table.
/// </summary>
public sealed class ScheduledMessageTableInfo
{
/// <summary>
/// Gets or sets the database schema for the scheduled messages table. Defaults to <c>"public"</c>.
/// </summary>
public string Schema { get; set; } = "public";

/// <summary>
/// Gets or sets the table name for scheduled messages. Defaults to <c>"scheduled_messages"</c>.
/// </summary>
public string Table { get; set; } = "scheduled_messages";

/// <summary>
/// Gets or sets the column name for the scheduled message identifier. Defaults to <c>"id"</c>.
/// </summary>
public string Id { get; set; } = "id";

/// <summary>
/// Gets or sets the column name for the serialized message envelope. Defaults to <c>"envelope"</c>.
/// </summary>
public string Envelope { get; set; } = "envelope";

/// <summary>
/// Gets or sets the column name for the scheduled delivery time. Defaults to <c>"scheduled_time"</c>.
/// </summary>
public string ScheduledTime { get; set; } = "scheduled_time";

/// <summary>
/// Gets or sets the column name tracking how many times dispatch has been attempted. Defaults
/// to <c>"times_sent"</c>.
/// </summary>
public string TimesSent { get; set; } = "times_sent";

/// <summary>
/// Gets or sets the column name for the message creation timestamp. Defaults to <c>"created_at"</c>.
/// </summary>
public string CreatedAt { get; set; } = "created_at";

/// <summary>
/// Gets or sets the column name for the maximum number of dispatch attempts. Defaults to <c>"max_attempts"</c>.
/// </summary>
public string MaxAttempts { get; set; } = "max_attempts";

/// <summary>
/// Gets or sets the column name for the last error encountered during dispatch. Defaults to <c>"last_error"</c>.
/// </summary>
public string LastError { get; set; } = "last_error";

/// <summary>
/// Gets or sets the name of the primary key index. Defaults to <c>"ix_scheduled_messages_primary_key"</c>.
/// </summary>
public string IxPrimaryKey { get; set; } = "ix_scheduled_messages_primary_key";

/// <summary>
/// Gets or sets the name of the scheduled-time index used for dispatch ordering. Defaults to
/// <c>"ix_scheduled_messages_scheduled_time"</c>.
/// </summary>
public string IxScheduledTime { get; set; } = "ix_scheduled_messages_scheduled_time";

/// <summary>
/// Gets or sets the name of the times-sent index used for retry filtering. Defaults to
/// <c>"ix_scheduled_messages_times_sent"</c>.
/// </summary>
public string IxTimesSent { get; set; } = "ix_scheduled_messages_times_sent";

/// <summary>
/// Gets the fully qualified table name including schema if not public.
/// </summary>
public string QualifiedTableName
=> string.IsNullOrEmpty(Schema) || Schema == "public" ? $"\"{Table}\"" : $"\"{Schema}\".\"{Table}\"";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
using System.Text.Json;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Mocha.Middlewares;
using Mocha.Utils;
using Npgsql;
using NpgsqlTypes;

namespace Mocha.Scheduling;

/// <summary>
/// Implements <see cref="IScheduledMessageStore"/> for Postgres by inserting serialized message envelopes
/// into the scheduled messages table using raw SQL through the DbContext Npgsql connection.
/// </summary>
internal sealed class EfCoreScheduledMessageStore : IScheduledMessageStore, IDisposable
{
private readonly DbContext _originalDbContext;
private readonly ISchedulerSignal _signal;
private readonly SemaphoreSlim _semaphore = new(1, 1);
private readonly string? _insertSql;
private PooledArrayWriter? _arrayWriter;

/// <summary>
/// Creates a new <see cref="EfCoreScheduledMessageStore"/> using the provided DbContext connection,
/// scheduler signal, and pre-built insert SQL.
/// </summary>
/// <param name="originalDbContext">The DbContext whose underlying Npgsql connection is used for inserts.</param>
/// <param name="signal">The signal used to wake the scheduler after a message is persisted.</param>
/// <param name="insertSql">The parameterized SQL insert statement for the scheduled messages table.</param>
public EfCoreScheduledMessageStore(DbContext originalDbContext, ISchedulerSignal signal, string insertSql)
{
_originalDbContext = originalDbContext;
_signal = signal;
_insertSql = insertSql;
}

/// <summary>
/// Serializes the message envelope and inserts it into the Postgres scheduled messages table.
/// </summary>
/// <param name="envelope">The message envelope to persist.</param>
/// <param name="scheduledTime">The time at which the message should be dispatched.</param>
/// <param name="cancellationToken">A token to observe for cancellation.</param>
public async ValueTask PersistAsync(
MessageEnvelope envelope,
DateTimeOffset scheduledTime,
CancellationToken cancellationToken)
{
await _semaphore.WaitAsync(cancellationToken);

try
{
_arrayWriter ??= new PooledArrayWriter();

var connection = (NpgsqlConnection)_originalDbContext.Database.GetDbConnection();

if (connection.State != System.Data.ConnectionState.Open)
{
await connection.OpenAsync(cancellationToken);
}

var transaction = _originalDbContext.Database.CurrentTransaction?.GetDbTransaction() as NpgsqlTransaction;

await using var writer = new Utf8JsonWriter(_arrayWriter);
writer.WriteEnvelope(envelope);
writer.Flush(); // we know it's not async

// Execute the INSERT command
await using var command = connection.CreateCommand();
command.CommandText = _insertSql;
command.Parameters.AddWithValue("@id", NewVersion());
command.Parameters.Add(
new NpgsqlParameter("@envelope", NpgsqlDbType.Json) { Value = _arrayWriter.WrittenMemory });
command.Parameters.AddWithValue("@scheduled_time", scheduledTime.UtcDateTime);
await command.PrepareAsync(cancellationToken);

await command.ExecuteNonQueryAsync(cancellationToken);

if (transaction is null)
{
_signal.Notify(scheduledTime);
}
Comment thread
PascalSenn marked this conversation as resolved.
}
finally
{
_arrayWriter?.Reset();
_semaphore.Release();
}
}

private static Guid NewVersion()
{
#if NET9_0_OR_GREATER
return Guid.CreateVersion7();
#else
return Guid.NewGuid();
#endif
}

/// <summary>
/// Releases the semaphore and pooled array writer used for scheduled message serialization.
/// </summary>
public void Dispose()
{
_semaphore.Dispose();
_arrayWriter?.Dispose();
}

/// <summary>
/// Creates a new <see cref="EfCoreScheduledMessageStore"/> by resolving the DbContext, scheduler signal,
/// and named options from the scoped service provider.
/// </summary>
/// <param name="contextType">The <see cref="Type"/> of the DbContext to resolve.</param>
/// <param name="optionsName">The named options key used to retrieve <see cref="PostgresScheduledMessageOptions"/>.</param>
/// <param name="services">The scoped service provider used to resolve dependencies.</param>
/// <returns>A new <see cref="EfCoreScheduledMessageStore"/> configured for the specified DbContext.</returns>
public static EfCoreScheduledMessageStore Create(Type contextType, string optionsName, IServiceProvider services)
{
var dbContext = (DbContext)services.GetRequiredService(contextType);
var signal = services.GetRequiredService<ISchedulerSignal>();
var optionsMonitor = services.GetRequiredService<IOptionsMonitor<PostgresScheduledMessageOptions>>();
var options = optionsMonitor.Get(optionsName);
var insertSql = options.Queries.InsertMessage;

return new EfCoreScheduledMessageStore(dbContext, signal, insertSql);
}
}

file static class Extensions
{
public static void WriteEnvelope(this Utf8JsonWriter writer, MessageEnvelope envelope)
{
var envelopeWriter = new MessageEnvelopeWriter(writer);
envelopeWriter.WriteMessage(envelope);
}
}
Loading
Loading