diff --git a/src/All.slnx b/src/All.slnx
index c856c1f63a2..558f38eacd7 100644
--- a/src/All.slnx
+++ b/src/All.slnx
@@ -352,6 +352,7 @@
+
diff --git a/src/Mocha/Mocha.slnx b/src/Mocha/Mocha.slnx
index 452ec2e12d9..cf3a4327070 100644
--- a/src/Mocha/Mocha.slnx
+++ b/src/Mocha/Mocha.slnx
@@ -18,6 +18,7 @@
+
diff --git a/src/Mocha/benchmarks/Mocha.Mediator.Benchmarks/Internal/PoolingBenchmarks.cs b/src/Mocha/benchmarks/Mocha.Mediator.Benchmarks/Internal/PoolingBenchmarks.cs
index 9c8b7c03fed..e74c3b7806f 100644
--- a/src/Mocha/benchmarks/Mocha.Mediator.Benchmarks/Internal/PoolingBenchmarks.cs
+++ b/src/Mocha/benchmarks/Mocha.Mediator.Benchmarks/Internal/PoolingBenchmarks.cs
@@ -73,7 +73,7 @@ public void Cleanup()
obj.Message = s_message;
obj.MessageType = s_type;
var result = obj.Message;
- // No return — GC collects it
+ // No return - GC collects it
return result;
}
diff --git a/src/Mocha/src/Demo/Demo.Billing/Data/BillingDbContext.cs b/src/Mocha/src/Demo/Demo.Billing/Data/BillingDbContext.cs
index 3424f1a454f..36804a6efb0 100644
--- a/src/Mocha/src/Demo/Demo.Billing/Data/BillingDbContext.cs
+++ b/src/Mocha/src/Demo/Demo.Billing/Data/BillingDbContext.cs
@@ -3,6 +3,7 @@
using Microsoft.EntityFrameworkCore.Design;
using Mocha.Inbox;
using Mocha.Outbox;
+using Mocha.Scheduling;
namespace Demo.Billing.Data;
@@ -20,6 +21,7 @@ protected override void OnModelCreating(ModelBuilder modelBuilder)
modelBuilder.AddPostgresOutbox();
modelBuilder.AddPostgresInbox();
+ modelBuilder.AddPostgresScheduledMessages();
modelBuilder.Entity(entity =>
{
diff --git a/src/Mocha/src/Demo/Demo.Billing/Handlers/OrderPlacedEventHandler.cs b/src/Mocha/src/Demo/Demo.Billing/Handlers/OrderPlacedEventHandler.cs
index 27b7d4357ad..1ac8bc6b7e2 100644
--- a/src/Mocha/src/Demo/Demo.Billing/Handlers/OrderPlacedEventHandler.cs
+++ b/src/Mocha/src/Demo/Demo.Billing/Handlers/OrderPlacedEventHandler.cs
@@ -70,5 +70,21 @@ await messageBus.PublishAsync(
cancellationToken);
logger.LogInformation("PaymentCompletedEvent published for order {OrderId}", message.OrderId);
+
+ // Schedule a payment reminder 30 seconds from now
+ await messageBus.SchedulePublishAsync(
+ new PaymentReminderEvent
+ {
+ InvoiceId = invoice.Id,
+ OrderId = message.OrderId,
+ CustomerId = message.CustomerId,
+ Amount = invoice.Amount
+ },
+ DateTimeOffset.UtcNow.AddSeconds(30),
+ cancellationToken);
+
+ logger.LogInformation(
+ "Payment reminder scheduled for order {OrderId} in 30 seconds",
+ message.OrderId);
}
}
diff --git a/src/Mocha/src/Demo/Demo.Billing/Handlers/PaymentReminderEventHandler.cs b/src/Mocha/src/Demo/Demo.Billing/Handlers/PaymentReminderEventHandler.cs
new file mode 100644
index 00000000000..b9cf56a73b1
--- /dev/null
+++ b/src/Mocha/src/Demo/Demo.Billing/Handlers/PaymentReminderEventHandler.cs
@@ -0,0 +1,40 @@
+using Demo.Billing.Data;
+using Demo.Billing.Entities;
+using Demo.Contracts.Events;
+using Microsoft.EntityFrameworkCore;
+using Mocha;
+
+namespace Demo.Billing.Handlers;
+
+public class PaymentReminderEventHandler(
+ BillingDbContext db,
+ ILogger logger) : IEventHandler
+{
+ public async ValueTask HandleAsync(PaymentReminderEvent message, CancellationToken cancellationToken)
+ {
+ var invoice = await db.Invoices.FirstOrDefaultAsync(i => i.Id == message.InvoiceId, cancellationToken);
+
+ if (invoice is null)
+ {
+ logger.LogWarning("Payment reminder for invoice {InvoiceId} - invoice not found", message.InvoiceId);
+ return;
+ }
+
+ if (invoice.Status == InvoiceStatus.Pending)
+ {
+ logger.LogWarning(
+ "Payment reminder: Invoice {InvoiceId} for order {OrderId} is still pending. Amount: {Amount}",
+ message.InvoiceId,
+ message.OrderId,
+ message.Amount);
+ }
+ else
+ {
+ logger.LogInformation(
+ "Payment reminder: Invoice {InvoiceId} for order {OrderId} is already {Status}",
+ message.InvoiceId,
+ message.OrderId,
+ invoice.Status);
+ }
+ }
+}
diff --git a/src/Mocha/src/Demo/Demo.Billing/Migrations/20260328113825_AddScheduledMessages.Designer.cs b/src/Mocha/src/Demo/Demo.Billing/Migrations/20260328113825_AddScheduledMessages.Designer.cs
new file mode 100644
index 00000000000..8e18568861d
--- /dev/null
+++ b/src/Mocha/src/Demo/Demo.Billing/Migrations/20260328113825_AddScheduledMessages.Designer.cs
@@ -0,0 +1,335 @@
+//
+using System;
+using System.Text.Json;
+using Demo.Billing.Data;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Infrastructure;
+using Microsoft.EntityFrameworkCore.Migrations;
+using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
+using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
+
+#nullable disable
+
+namespace HotChocolate.Demo.Billing.Migrations
+{
+ [DbContext(typeof(BillingDbContext))]
+ [Migration("20260328113825_AddScheduledMessages")]
+ partial class AddScheduledMessages
+ {
+ ///
+ protected override void BuildTargetModel(ModelBuilder modelBuilder)
+ {
+#pragma warning disable 612, 618
+ modelBuilder
+ .HasAnnotation("ProductVersion", "10.0.1")
+ .HasAnnotation("Relational:MaxIdentifierLength", 63);
+
+ NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
+
+ modelBuilder.Entity("Demo.Billing.Entities.Invoice", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("uuid");
+
+ b.Property("Amount")
+ .HasPrecision(18, 2)
+ .HasColumnType("numeric(18,2)");
+
+ b.Property("CreatedAt")
+ .HasColumnType("timestamp with time zone");
+
+ b.Property("CustomerId")
+ .IsRequired()
+ .HasMaxLength(100)
+ .HasColumnType("character varying(100)");
+
+ b.Property("OrderId")
+ .HasColumnType("uuid");
+
+ b.Property("Status")
+ .HasColumnType("integer");
+
+ b.Property("UpdatedAt")
+ .HasColumnType("timestamp with time zone");
+
+ b.HasKey("Id");
+
+ b.HasIndex("CustomerId");
+
+ b.HasIndex("OrderId")
+ .IsUnique();
+
+ b.HasIndex("Status");
+
+ b.ToTable("Invoices");
+ });
+
+ modelBuilder.Entity("Demo.Billing.Entities.Payment", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("uuid");
+
+ b.Property("Amount")
+ .HasPrecision(18, 2)
+ .HasColumnType("numeric(18,2)");
+
+ b.Property("InvoiceId")
+ .HasColumnType("uuid");
+
+ b.Property("Method")
+ .IsRequired()
+ .HasMaxLength(50)
+ .HasColumnType("character varying(50)");
+
+ b.Property("ProcessedAt")
+ .HasColumnType("timestamp with time zone");
+
+ b.Property("Status")
+ .HasColumnType("integer");
+
+ b.HasKey("Id");
+
+ b.HasIndex("InvoiceId");
+
+ b.HasIndex("Status");
+
+ b.ToTable("Payments");
+ });
+
+ modelBuilder.Entity("Demo.Billing.Entities.Refund", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("uuid");
+
+ b.Property("CreatedAt")
+ .HasColumnType("timestamp with time zone");
+
+ b.Property("CustomerId")
+ .IsRequired()
+ .HasMaxLength(100)
+ .HasColumnType("character varying(100)");
+
+ b.Property("InvoiceId")
+ .HasColumnType("uuid");
+
+ b.Property("OrderId")
+ .HasColumnType("uuid");
+
+ b.Property("OriginalAmount")
+ .HasPrecision(18, 2)
+ .HasColumnType("numeric(18,2)");
+
+ b.Property("ProcessedAt")
+ .HasColumnType("timestamp with time zone");
+
+ b.Property("Reason")
+ .IsRequired()
+ .HasMaxLength(500)
+ .HasColumnType("character varying(500)");
+
+ b.Property("RefundPercentage")
+ .HasPrecision(5, 2)
+ .HasColumnType("numeric(5,2)");
+
+ b.Property("RefundedAmount")
+ .HasPrecision(18, 2)
+ .HasColumnType("numeric(18,2)");
+
+ b.Property("Status")
+ .HasColumnType("integer");
+
+ b.Property("Type")
+ .HasColumnType("integer");
+
+ b.HasKey("Id");
+
+ b.HasIndex("InvoiceId");
+
+ b.HasIndex("OrderId");
+
+ b.HasIndex("Status");
+
+ b.ToTable("Refunds");
+ });
+
+ modelBuilder.Entity("Demo.Billing.Entities.RevenueSummary", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("uuid");
+
+ b.Property("AverageOrderAmount")
+ .HasPrecision(18, 2)
+ .HasColumnType("numeric(18,2)");
+
+ b.Property("CompletionMode")
+ .IsRequired()
+ .HasMaxLength(50)
+ .HasColumnType("character varying(50)");
+
+ b.Property("CreatedAt")
+ .HasColumnType("timestamp with time zone");
+
+ b.Property("OrderCount")
+ .HasColumnType("integer");
+
+ b.Property("PeriodEnd")
+ .HasColumnType("timestamp with time zone");
+
+ b.Property("PeriodStart")
+ .HasColumnType("timestamp with time zone");
+
+ b.Property("TotalItemsSold")
+ .HasColumnType("integer");
+
+ b.Property("TotalRevenue")
+ .HasPrecision(18, 2)
+ .HasColumnType("numeric(18,2)");
+
+ b.HasKey("Id");
+
+ b.ToTable("RevenueSummaries");
+ });
+
+ modelBuilder.Entity("Mocha.Inbox.InboxMessage", b =>
+ {
+ b.Property("MessageId")
+ .HasMaxLength(512)
+ .HasColumnType("character varying(512)")
+ .HasColumnName("message_id");
+
+ b.Property("ConsumerType")
+ .HasMaxLength(512)
+ .HasColumnType("character varying(512)")
+ .HasColumnName("consumer_type");
+
+ b.Property("MessageType")
+ .IsRequired()
+ .HasMaxLength(512)
+ .HasColumnType("character varying(512)")
+ .HasColumnName("message_type");
+
+ b.Property("ProcessedAt")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("processed_at")
+ .HasDefaultValueSql("NOW()");
+
+ b.HasKey("MessageId", "ConsumerType")
+ .HasName("ix_inbox_messages_primary_key");
+
+ b.HasIndex("ProcessedAt")
+ .HasDatabaseName("ix_inbox_messages_processed_at");
+
+ b.ToTable("inbox_messages", (string)null);
+ });
+
+ modelBuilder.Entity("Mocha.Outbox.OutboxMessage", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("uuid")
+ .HasColumnName("id");
+
+ b.Property("CreatedAt")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("created_at");
+
+ b.Property("Envelope")
+ .IsRequired()
+ .HasColumnType("json")
+ .HasColumnName("envelope");
+
+ b.Property("TimesSent")
+ .HasColumnType("integer")
+ .HasColumnName("times_sent");
+
+ b.HasKey("Id")
+ .HasName("ix_outbox_messages_primary_key");
+
+ b.HasIndex("CreatedAt")
+ .IsDescending()
+ .HasDatabaseName("ix_outbox_messages_created_at");
+
+ b.HasIndex("TimesSent")
+ .HasDatabaseName("ix_outbox_messages_times_sent");
+
+ b.ToTable("outbox_messages", (string)null);
+ });
+
+ modelBuilder.Entity("Mocha.Scheduling.ScheduledMessage", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("uuid")
+ .HasColumnName("id");
+
+ b.Property("CreatedAt")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("created_at");
+
+ b.Property("Envelope")
+ .IsRequired()
+ .HasColumnType("json")
+ .HasColumnName("envelope");
+
+ b.Property("LastError")
+ .HasColumnType("jsonb")
+ .HasColumnName("last_error");
+
+ b.Property("MaxAttempts")
+ .HasColumnType("integer")
+ .HasColumnName("max_attempts");
+
+ b.Property("ScheduledTime")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("scheduled_time");
+
+ b.Property("TimesSent")
+ .HasColumnType("integer")
+ .HasColumnName("times_sent");
+
+ b.HasKey("Id")
+ .HasName("ix_scheduled_messages_primary_key");
+
+ b.HasIndex("ScheduledTime")
+ .HasDatabaseName("ix_scheduled_messages_scheduled_time")
+ .HasFilter("\"times_sent\" < \"max_attempts\"");
+
+ b.HasIndex("TimesSent")
+ .HasDatabaseName("ix_scheduled_messages_times_sent");
+
+ b.ToTable("scheduled_messages", (string)null);
+ });
+
+ modelBuilder.Entity("Demo.Billing.Entities.Payment", b =>
+ {
+ b.HasOne("Demo.Billing.Entities.Invoice", "Invoice")
+ .WithMany("Payments")
+ .HasForeignKey("InvoiceId")
+ .OnDelete(DeleteBehavior.Cascade)
+ .IsRequired();
+
+ b.Navigation("Invoice");
+ });
+
+ modelBuilder.Entity("Demo.Billing.Entities.Refund", b =>
+ {
+ b.HasOne("Demo.Billing.Entities.Invoice", "Invoice")
+ .WithMany()
+ .HasForeignKey("InvoiceId");
+
+ b.Navigation("Invoice");
+ });
+
+ modelBuilder.Entity("Demo.Billing.Entities.Invoice", b =>
+ {
+ b.Navigation("Payments");
+ });
+#pragma warning restore 612, 618
+ }
+ }
+}
diff --git a/src/Mocha/src/Demo/Demo.Billing/Migrations/20260328113825_AddScheduledMessages.cs b/src/Mocha/src/Demo/Demo.Billing/Migrations/20260328113825_AddScheduledMessages.cs
new file mode 100644
index 00000000000..3857340cbe3
--- /dev/null
+++ b/src/Mocha/src/Demo/Demo.Billing/Migrations/20260328113825_AddScheduledMessages.cs
@@ -0,0 +1,51 @@
+using System;
+using System.Text.Json;
+using Microsoft.EntityFrameworkCore.Migrations;
+
+#nullable disable
+
+namespace HotChocolate.Demo.Billing.Migrations
+{
+ ///
+ public partial class AddScheduledMessages : Migration
+ {
+ ///
+ protected override void Up(MigrationBuilder migrationBuilder)
+ {
+ migrationBuilder.CreateTable(
+ name: "scheduled_messages",
+ columns: table => new
+ {
+ id = table.Column(type: "uuid", nullable: false),
+ envelope = table.Column(type: "json", nullable: false),
+ scheduled_time = table.Column(type: "timestamp with time zone", nullable: false),
+ times_sent = table.Column(type: "integer", nullable: false),
+ max_attempts = table.Column(type: "integer", nullable: false),
+ last_error = table.Column(type: "jsonb", nullable: true),
+ created_at = table.Column(type: "timestamp with time zone", nullable: false)
+ },
+ constraints: table =>
+ {
+ table.PrimaryKey("ix_scheduled_messages_primary_key", x => x.id);
+ });
+
+ migrationBuilder.CreateIndex(
+ name: "ix_scheduled_messages_scheduled_time",
+ table: "scheduled_messages",
+ column: "scheduled_time",
+ filter: "\"times_sent\" < \"max_attempts\"");
+
+ migrationBuilder.CreateIndex(
+ name: "ix_scheduled_messages_times_sent",
+ table: "scheduled_messages",
+ column: "times_sent");
+ }
+
+ ///
+ protected override void Down(MigrationBuilder migrationBuilder)
+ {
+ migrationBuilder.DropTable(
+ name: "scheduled_messages");
+ }
+ }
+}
diff --git a/src/Mocha/src/Demo/Demo.Billing/Migrations/BillingDbContextModelSnapshot.cs b/src/Mocha/src/Demo/Demo.Billing/Migrations/BillingDbContextModelSnapshot.cs
index 606e3a622e9..ea22fd19d7d 100644
--- a/src/Mocha/src/Demo/Demo.Billing/Migrations/BillingDbContextModelSnapshot.cs
+++ b/src/Mocha/src/Demo/Demo.Billing/Migrations/BillingDbContextModelSnapshot.cs
@@ -257,6 +257,51 @@ protected override void BuildModel(ModelBuilder modelBuilder)
b.ToTable("outbox_messages", (string)null);
});
+ modelBuilder.Entity("Mocha.Scheduling.ScheduledMessage", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("uuid")
+ .HasColumnName("id");
+
+ b.Property("CreatedAt")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("created_at");
+
+ b.Property("Envelope")
+ .IsRequired()
+ .HasColumnType("json")
+ .HasColumnName("envelope");
+
+ b.Property("LastError")
+ .HasColumnType("jsonb")
+ .HasColumnName("last_error");
+
+ b.Property("MaxAttempts")
+ .HasColumnType("integer")
+ .HasColumnName("max_attempts");
+
+ b.Property("ScheduledTime")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("scheduled_time");
+
+ b.Property("TimesSent")
+ .HasColumnType("integer")
+ .HasColumnName("times_sent");
+
+ b.HasKey("Id")
+ .HasName("ix_scheduled_messages_primary_key");
+
+ b.HasIndex("ScheduledTime")
+ .HasDatabaseName("ix_scheduled_messages_scheduled_time")
+ .HasFilter("\"times_sent\" < \"max_attempts\"");
+
+ b.HasIndex("TimesSent")
+ .HasDatabaseName("ix_scheduled_messages_times_sent");
+
+ b.ToTable("scheduled_messages", (string)null);
+ });
+
modelBuilder.Entity("Demo.Billing.Entities.Payment", b =>
{
b.HasOne("Demo.Billing.Entities.Invoice", "Invoice")
diff --git a/src/Mocha/src/Demo/Demo.Billing/Program.cs b/src/Mocha/src/Demo/Demo.Billing/Program.cs
index 80ddecaaf7a..98757e06b61 100644
--- a/src/Mocha/src/Demo/Demo.Billing/Program.cs
+++ b/src/Mocha/src/Demo/Demo.Billing/Program.cs
@@ -8,6 +8,7 @@
using Mocha.Inbox;
using Mocha.Mediator;
using Mocha.Outbox;
+using Mocha.Scheduling;
using Mocha.Transport.RabbitMQ;
var builder = WebApplication.CreateBuilder(args);
@@ -48,6 +49,7 @@
p.UseResilience();
p.UseTransaction();
p.UsePostgresInbox();
+ p.UsePostgresScheduling();
})
.AddRabbitMQ();
diff --git a/src/Mocha/src/Demo/Demo.Contracts/Events/PaymentReminderEvent.cs b/src/Mocha/src/Demo/Demo.Contracts/Events/PaymentReminderEvent.cs
new file mode 100644
index 00000000000..e579347d2fd
--- /dev/null
+++ b/src/Mocha/src/Demo/Demo.Contracts/Events/PaymentReminderEvent.cs
@@ -0,0 +1,12 @@
+namespace Demo.Contracts.Events;
+
+///
+/// Published by Billing as a scheduled reminder to check invoice payment status.
+///
+public sealed class PaymentReminderEvent
+{
+ public required Guid InvoiceId { get; init; }
+ public required Guid OrderId { get; init; }
+ public required string CustomerId { get; init; }
+ public required decimal Amount { get; init; }
+}
diff --git a/src/Mocha/src/Examples/MediatorShowcase/Handlers.cs b/src/Mocha/src/Examples/MediatorShowcase/Handlers.cs
index e6767120a1e..2ee9d154a9a 100644
--- a/src/Mocha/src/Examples/MediatorShowcase/Handlers.cs
+++ b/src/Mocha/src/Examples/MediatorShowcase/Handlers.cs
@@ -122,7 +122,7 @@ public sealed class OrderShippedEmailHandler(ILogger l
{
public ValueTask HandleAsync(OrderShippedNotification notification, CancellationToken cancellationToken)
{
- logger.LogInformation("[Email] Order {OrderId} shipped — email sent to customer", notification.OrderId);
+ logger.LogInformation("[Email] Order {OrderId} shipped - email sent to customer", notification.OrderId);
return ValueTask.CompletedTask;
}
}
@@ -135,7 +135,7 @@ public sealed class OrderShippedAnalyticsHandler(ILogger
{
await sender.SendAsync(new CreateProductCommand(req.Name, req.Price));
diff --git a/src/Mocha/src/Examples/Transports/RabbitMQ/RabbitMQ.cs b/src/Mocha/src/Examples/Transports/RabbitMQ/RabbitMQ.cs
index 908cd48e969..4383d691306 100644
--- a/src/Mocha/src/Examples/Transports/RabbitMQ/RabbitMQ.cs
+++ b/src/Mocha/src/Examples/Transports/RabbitMQ/RabbitMQ.cs
@@ -46,7 +46,7 @@
.Handler();
// Declare a quorum queue explicitly with durable flag.
- // Quorum queues require durable=true — non-durable quorum queues are not supported.
+ // Quorum queues require durable=true - non-durable quorum queues are not supported.
transport.DeclareQueue("orders.processing")
.Durable()
.AutoProvision()
diff --git a/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Mocha.EntityFrameworkCore.Postgres.csproj b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Mocha.EntityFrameworkCore.Postgres.csproj
index 2022bdad08d..558a513a246 100644
--- a/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Mocha.EntityFrameworkCore.Postgres.csproj
+++ b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Mocha.EntityFrameworkCore.Postgres.csproj
@@ -8,6 +8,7 @@
+
diff --git a/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Outbox/OutboxProcessor.cs b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Outbox/OutboxProcessor.cs
index dacb86037f0..cf8368158b0 100644
--- a/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Outbox/OutboxProcessor.cs
+++ b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Outbox/OutboxProcessor.cs
@@ -158,7 +158,7 @@ private async Task ProcessEventAsync(NpgsqlConnection connection, Cancella
if (await reader.ReadAsync(cancellationToken))
{
var id = reader.GetGuid(0);
- var envelope = Serializer.ReadMessageEnvelopeSafe(reader, 1);
+ var envelope = Serializer.ReadMessageEnvelopeSafe(reader, 1, _logger);
var messageType = GetMessageType(envelope?.MessageType);
var isReply = envelope?.Headers?.IsReply() ?? false;
var endpoint = isReply
@@ -192,13 +192,23 @@ private async Task ProcessEventAsync(NpgsqlConnection connection, Cancella
}
finally
{
- await transaction.CommitAsync(cancellationToken);
+ try
+ {
+ await transaction.CommitAsync(cancellationToken);
+ }
+ catch
+ {
+ // Commit failed (e.g., connection lost). Attempt rollback.
+ // If commit actually succeeded server-side, the message stays
+ // with times_sent incremented - safe, just causes a retry.
+ try { await transaction.RollbackAsync(CancellationToken.None); } catch { /* swallow */ }
+ }
}
}
catch (Exception ex)
{
+ // Log only - no RollbackAsync here (commit already handled in finally)
_logger.UnexpectedErrorWhileProcessingOutboxEvent(ex);
- await transaction.RollbackAsync(cancellationToken);
throw;
}
}
@@ -274,6 +284,8 @@ private async ValueTask SendAsync(
ActivityKind.Client,
parentContext);
+ activity?.SetMessageId(envelope.MessageId);
+
activity?.Start();
}
}
@@ -339,7 +351,7 @@ internal static partial class Logs
file static class Serializer
{
- public static MessageEnvelope? ReadMessageEnvelopeSafe(NpgsqlDataReader reader, int ordinal)
+ public static MessageEnvelope? ReadMessageEnvelopeSafe(NpgsqlDataReader reader, int ordinal, ILogger logger)
{
try
{
@@ -348,7 +360,7 @@ file static class Serializer
}
catch (Exception ex)
{
- Console.WriteLine($"Error reading message envelope: {ex.Message}");
+ logger.LogError(ex, "Error reading message envelope");
return null;
}
}
diff --git a/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/PostgresTableInfo.cs b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/PostgresTableInfo.cs
index ef4f290d852..80e0673b141 100644
--- a/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/PostgresTableInfo.cs
+++ b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/PostgresTableInfo.cs
@@ -20,4 +20,9 @@ public sealed class PostgresTableInfo
/// Gets or sets the table and column metadata for the inbox messages table.
///
public InboxTableInfo Inbox { get; set; } = new();
+
+ ///
+ /// Gets or sets the table and column metadata for the scheduled messages table.
+ ///
+ public ScheduledMessageTableInfo ScheduledMessage { get; set; } = new();
}
diff --git a/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/ScheduledMessageTableInfo.cs b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/ScheduledMessageTableInfo.cs
new file mode 100644
index 00000000000..068fe43b649
--- /dev/null
+++ b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/ScheduledMessageTableInfo.cs
@@ -0,0 +1,76 @@
+namespace Mocha.EntityFrameworkCore.Postgres;
+
+///
+/// Table and column information for the scheduled messages table.
+///
+public sealed class ScheduledMessageTableInfo
+{
+ ///
+ /// Gets or sets the database schema for the scheduled messages table. Defaults to "public".
+ ///
+ public string Schema { get; set; } = "public";
+
+ ///
+ /// Gets or sets the table name for scheduled messages. Defaults to "scheduled_messages".
+ ///
+ public string Table { get; set; } = "scheduled_messages";
+
+ ///
+ /// Gets or sets the column name for the scheduled message identifier. Defaults to "id".
+ ///
+ public string Id { get; set; } = "id";
+
+ ///
+ /// Gets or sets the column name for the serialized message envelope. Defaults to "envelope".
+ ///
+ public string Envelope { get; set; } = "envelope";
+
+ ///
+ /// Gets or sets the column name for the scheduled delivery time. Defaults to "scheduled_time".
+ ///
+ public string ScheduledTime { get; set; } = "scheduled_time";
+
+ ///
+ /// Gets or sets the column name tracking how many times dispatch has been attempted. Defaults
+ /// to "times_sent".
+ ///
+ public string TimesSent { get; set; } = "times_sent";
+
+ ///
+ /// Gets or sets the column name for the message creation timestamp. Defaults to "created_at".
+ ///
+ public string CreatedAt { get; set; } = "created_at";
+
+ ///
+ /// Gets or sets the column name for the maximum number of dispatch attempts. Defaults to "max_attempts".
+ ///
+ public string MaxAttempts { get; set; } = "max_attempts";
+
+ ///
+ /// Gets or sets the column name for the last error encountered during dispatch. Defaults to "last_error".
+ ///
+ public string LastError { get; set; } = "last_error";
+
+ ///
+ /// Gets or sets the name of the primary key index. Defaults to "ix_scheduled_messages_primary_key".
+ ///
+ public string IxPrimaryKey { get; set; } = "ix_scheduled_messages_primary_key";
+
+ ///
+ /// Gets or sets the name of the scheduled-time index used for dispatch ordering. Defaults to
+ /// "ix_scheduled_messages_scheduled_time".
+ ///
+ public string IxScheduledTime { get; set; } = "ix_scheduled_messages_scheduled_time";
+
+ ///
+ /// Gets or sets the name of the times-sent index used for retry filtering. Defaults to
+ /// "ix_scheduled_messages_times_sent".
+ ///
+ public string IxTimesSent { get; set; } = "ix_scheduled_messages_times_sent";
+
+ ///
+ /// Gets the fully qualified table name including schema if not public.
+ ///
+ public string QualifiedTableName
+ => string.IsNullOrEmpty(Schema) || Schema == "public" ? $"\"{Table}\"" : $"\"{Schema}\".\"{Table}\"";
+}
diff --git a/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/EfCoreScheduledMessageStore.cs b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/EfCoreScheduledMessageStore.cs
new file mode 100644
index 00000000000..b09b5e5d192
--- /dev/null
+++ b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/EfCoreScheduledMessageStore.cs
@@ -0,0 +1,142 @@
+using System.Data;
+using System.Text.Json;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Storage;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
+using Mocha.Middlewares;
+using Mocha.Utils;
+using Npgsql;
+using NpgsqlTypes;
+
+namespace Mocha.Scheduling;
+
+///
+/// Implements for Postgres by inserting serialized message envelopes
+/// into the scheduled messages table using raw SQL through the DbContext Npgsql connection.
+///
+internal sealed class EfCoreScheduledMessageStore : IScheduledMessageStore, IDisposable
+{
+ private readonly DbContext _originalDbContext;
+ private readonly ISchedulerSignal _signal;
+ private readonly SemaphoreSlim _semaphore = new(1, 1);
+ private readonly string? _insertSql;
+ private PooledArrayWriter? _arrayWriter;
+
+ ///
+ /// Creates a new using the provided DbContext connection,
+ /// scheduler signal, and pre-built insert SQL.
+ ///
+ /// The DbContext whose underlying Npgsql connection is used for inserts.
+ /// The signal used to wake the scheduler after a message is persisted.
+ /// The parameterized SQL insert statement for the scheduled messages table.
+ public EfCoreScheduledMessageStore(DbContext originalDbContext, ISchedulerSignal signal, string insertSql)
+ {
+ _originalDbContext = originalDbContext;
+ _signal = signal;
+ _insertSql = insertSql;
+ }
+
+ ///
+ /// Serializes the message envelope and inserts it into the Postgres scheduled messages table.
+ ///
+ /// The message envelope to persist.
+ /// The time at which the message should be dispatched.
+ /// A token to observe for cancellation.
+ public async ValueTask PersistAsync(
+ MessageEnvelope envelope,
+ DateTimeOffset scheduledTime,
+ CancellationToken cancellationToken)
+ {
+ await _semaphore.WaitAsync(cancellationToken);
+
+ try
+ {
+ _arrayWriter ??= new PooledArrayWriter();
+
+ var connection = (NpgsqlConnection)_originalDbContext.Database.GetDbConnection();
+
+ if (connection.State != ConnectionState.Open)
+ {
+ await connection.OpenAsync(cancellationToken);
+ }
+
+ var transaction = _originalDbContext.Database.CurrentTransaction?.GetDbTransaction() as NpgsqlTransaction;
+
+ await using var writer = new Utf8JsonWriter(_arrayWriter);
+ writer.WriteEnvelope(envelope);
+ writer.Flush(); // we know it's not async
+
+ // Execute the INSERT command
+ await using var command = connection.CreateCommand();
+ command.CommandText = _insertSql;
+ if (transaction is not null)
+ {
+ command.Transaction = transaction;
+ }
+ command.Parameters.AddWithValue("@id", NewVersion());
+ command.Parameters.Add(
+ new NpgsqlParameter("@envelope", NpgsqlDbType.Json) { Value = _arrayWriter.WrittenMemory });
+ command.Parameters.AddWithValue("@scheduled_time", scheduledTime.UtcDateTime);
+ await command.PrepareAsync(cancellationToken);
+
+ await command.ExecuteNonQueryAsync(cancellationToken);
+
+ if (transaction is null)
+ {
+ _signal.Notify(scheduledTime);
+ }
+ }
+ finally
+ {
+ _arrayWriter?.Reset();
+ _semaphore.Release();
+ }
+ }
+
+ private static Guid NewVersion()
+ {
+#if NET9_0_OR_GREATER
+ return Guid.CreateVersion7();
+#else
+ return Guid.NewGuid();
+#endif
+ }
+
+ ///
+ /// Releases the semaphore and pooled array writer used for scheduled message serialization.
+ ///
+ public void Dispose()
+ {
+ _semaphore.Dispose();
+ _arrayWriter?.Dispose();
+ }
+
+ ///
+ /// Creates a new by resolving the DbContext, scheduler signal,
+ /// and named options from the scoped service provider.
+ ///
+ /// The of the DbContext to resolve.
+ /// The named options key used to retrieve .
+ /// The scoped service provider used to resolve dependencies.
+ /// A new configured for the specified DbContext.
+ public static EfCoreScheduledMessageStore Create(Type contextType, string optionsName, IServiceProvider services)
+ {
+ var dbContext = (DbContext)services.GetRequiredService(contextType);
+ var signal = services.GetRequiredService();
+ var optionsMonitor = services.GetRequiredService>();
+ var options = optionsMonitor.Get(optionsName);
+ var insertSql = options.Queries.InsertMessage;
+
+ return new EfCoreScheduledMessageStore(dbContext, signal, insertSql);
+ }
+}
+
+file static class Extensions
+{
+ public static void WriteEnvelope(this Utf8JsonWriter writer, MessageEnvelope envelope)
+ {
+ var envelopeWriter = new MessageEnvelopeWriter(writer);
+ envelopeWriter.WriteMessage(envelope);
+ }
+}
diff --git a/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/PostgresScheduledMessageEntityConfiguration.cs b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/PostgresScheduledMessageEntityConfiguration.cs
new file mode 100644
index 00000000000..97205f71ab9
--- /dev/null
+++ b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/PostgresScheduledMessageEntityConfiguration.cs
@@ -0,0 +1,46 @@
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+using Mocha.EntityFrameworkCore.Postgres;
+
+namespace Mocha.Scheduling;
+
+///
+/// Configures the EF Core entity mapping for using default Postgres
+/// table and column names from .
+///
+internal sealed class PostgresScheduledMessageEntityConfiguration : IEntityTypeConfiguration
+{
+ // Use default values from ScheduledMessageTableInfo as the source of truth
+ private static readonly ScheduledMessageTableInfo s_defaults = new();
+
+ ///
+ /// Gets the shared singleton instance of the scheduled message entity configuration.
+ ///
+ public static PostgresScheduledMessageEntityConfiguration Instance { get; } = new();
+
+ ///
+ /// Configures the scheduled message entity with table name, primary key, indexes, and column mappings.
+ ///
+ /// The entity type builder for .
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable(s_defaults.Table);
+
+ builder.HasKey(e => e.Id).HasName(s_defaults.IxPrimaryKey);
+
+ builder.HasIndex(x => x.ScheduledTime)
+ .HasDatabaseName(s_defaults.IxScheduledTime)
+ // Only consider messages that are not yet due or have remaining retry attempts.
+ .HasFilter($"\"{s_defaults.TimesSent}\" < \"{s_defaults.MaxAttempts}\"");
+
+ builder.HasIndex(x => x.TimesSent).HasDatabaseName(s_defaults.IxTimesSent);
+
+ builder.Property(x => x.Id).HasColumnName(s_defaults.Id);
+ builder.Property(x => x.Envelope).HasColumnName(s_defaults.Envelope).HasColumnType("json");
+ builder.Property(x => x.ScheduledTime).HasColumnName(s_defaults.ScheduledTime);
+ builder.Property(x => x.TimesSent).HasColumnName(s_defaults.TimesSent);
+ builder.Property(x => x.MaxAttempts).HasColumnName(s_defaults.MaxAttempts);
+ builder.Property(x => x.LastError).HasColumnName(s_defaults.LastError).HasColumnType("jsonb");
+ builder.Property(x => x.CreatedAt).HasColumnName(s_defaults.CreatedAt);
+ }
+}
diff --git a/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/PostgresScheduledMessageOptions.cs b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/PostgresScheduledMessageOptions.cs
new file mode 100644
index 00000000000..f3f80488600
--- /dev/null
+++ b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/PostgresScheduledMessageOptions.cs
@@ -0,0 +1,18 @@
+namespace Mocha.Scheduling;
+
+///
+/// Configuration options for the Postgres scheduled message store, including pre-built SQL queries
+/// and the connection string used by the scheduled message dispatcher.
+///
+internal sealed class PostgresScheduledMessageOptions
+{
+ ///
+ /// Gets or sets the pre-built SQL queries used for scheduled message insert, poll, process, and delete operations.
+ ///
+ public ScheduledMessageQueries Queries { get; set; } = null!;
+
+ ///
+ /// Gets or sets the Postgres connection string used by the dispatcher to open a dedicated connection.
+ ///
+ public string ConnectionString { get; set; } = null!;
+}
diff --git a/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/PostgresScheduledMessagePersistenceModelBuilderExtensions.cs b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/PostgresScheduledMessagePersistenceModelBuilderExtensions.cs
new file mode 100644
index 00000000000..61b09e62ced
--- /dev/null
+++ b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/PostgresScheduledMessagePersistenceModelBuilderExtensions.cs
@@ -0,0 +1,20 @@
+using Microsoft.EntityFrameworkCore;
+
+namespace Mocha.Scheduling;
+
+///
+/// Provides extension methods on for applying the Postgres scheduled message
+/// entity configuration to the EF Core model.
+///
+public static class PostgresScheduledMessagePersistenceModelBuilderExtensions
+{
+ ///
+ /// Applies the entity type configuration to the model,
+ /// mapping it to the Postgres scheduled messages table with default column names and indexes.
+ ///
+ /// The EF Core model builder to configure.
+ public static void AddPostgresScheduledMessages(this ModelBuilder modelBuilder)
+ {
+ modelBuilder.ApplyConfiguration(PostgresScheduledMessageEntityConfiguration.Instance);
+ }
+}
diff --git a/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/ScheduledMessage.cs b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/ScheduledMessage.cs
new file mode 100644
index 00000000000..bb62e650158
--- /dev/null
+++ b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/ScheduledMessage.cs
@@ -0,0 +1,46 @@
+using System.Text.Json;
+
+namespace Mocha.Scheduling;
+
+///
+/// Represents a message stored in the Postgres scheduled messages table, awaiting dispatch at the scheduled time.
+///
+/// The unique identifier for this scheduled message.
+/// The serialized message envelope containing headers, body, and routing information.
+public sealed class ScheduledMessage(Guid id, JsonDocument envelope)
+{
+ ///
+ /// Gets the unique identifier for this scheduled message.
+ ///
+ public Guid Id { get; private set; } = id;
+
+ ///
+ /// Gets the serialized message envelope containing headers, body, and routing information.
+ ///
+ public JsonDocument Envelope { get; private set; } = envelope;
+
+ ///
+ /// Gets the UTC time at which the message should be dispatched.
+ ///
+ public DateTime ScheduledTime { get; private set; }
+
+ ///
+ /// Gets the number of times the scheduler has attempted to dispatch this message.
+ ///
+ public int TimesSent { get; private set; }
+
+ ///
+ /// Gets the maximum number of times dispatch will be attempted before the message is considered dead.
+ ///
+ public int MaxAttempts { get; private set; } = 10;
+
+ ///
+ /// Gets the last error encountered during dispatch, stored as a JSON document.
+ ///
+ public JsonDocument? LastError { get; private set; }
+
+ ///
+ /// Gets the UTC timestamp when this scheduled message was created.
+ ///
+ public DateTime CreatedAt { get; private set; } = DateTime.UtcNow;
+}
diff --git a/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/ScheduledMessageDispatcher.cs b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/ScheduledMessageDispatcher.cs
new file mode 100644
index 00000000000..c6b990e2e4e
--- /dev/null
+++ b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/ScheduledMessageDispatcher.cs
@@ -0,0 +1,400 @@
+using System.Diagnostics;
+using System.Text.Json;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.ObjectPool;
+using Mocha.Middlewares;
+using Mocha.Outbox;
+using Npgsql;
+using NpgsqlTypes;
+
+namespace Mocha.Scheduling;
+
+///
+/// Continuously polls the Postgres scheduled messages table for messages that are due and dispatches them
+/// through the messaging runtime, using the scheduler signal to sleep efficiently between polls.
+///
+public sealed class ScheduledMessageDispatcher
+{
+ private readonly ILogger _logger;
+ private readonly IServiceProvider _services;
+ private readonly IMessagingRuntime _runtime;
+ private readonly ISchedulerSignal _signal;
+ private readonly ObjectPool _contextPool;
+ private readonly ScheduledMessageQueries _queries;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The logger used to record scheduled message processing diagnostics and errors.
+ ///
+ ///
+ /// The service provider used to create scoped services for each dispatched message.
+ ///
+ ///
+ /// The messaging runtime used to resolve message types and dispatch endpoints.
+ ///
+ ///
+ /// The pool provider supplying reusable instances to reduce allocations.
+ ///
+ ///
+ /// The scheduler signal used to sleep efficiently and wake when new messages are scheduled.
+ ///
+ ///
+ /// The SQL query definitions for Postgres scheduled messages table operations.
+ ///
+ internal ScheduledMessageDispatcher(
+ ILogger logger,
+ IServiceProvider services,
+ IMessagingRuntime runtime,
+ IMessagingPools pools,
+ ISchedulerSignal signal,
+ ScheduledMessageQueries queries)
+ {
+ _logger = logger;
+ _services = services;
+ _runtime = runtime;
+ _signal = signal;
+ _contextPool = pools.DispatchContext;
+ _queries = queries;
+ }
+
+ ///
+ /// Runs the scheduled message processing loop, dispatching one message per iteration and sleeping
+ /// until the next message is due or a signal is received.
+ ///
+ ///
+ /// The loop continues until is cancelled. Each iteration
+ /// locks a single row using FOR UPDATE SKIP LOCKED, dispatches the envelope,
+ /// and deletes the row on success. Messages that fail are retried with exponential backoff
+ /// up to 10 attempts before being dropped.
+ ///
+ /// An open Postgres connection to use for scheduled message queries.
+ /// A token that signals when the dispatcher should stop.
+ public async Task ProcessAsync(NpgsqlConnection connection, CancellationToken cancellationToken)
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ try
+ {
+ var result = await ProcessMessageAsync(connection, cancellationToken);
+
+ if (!result)
+ {
+ var nextWakeTime = await GetNextWakeTimeAsync(connection, cancellationToken);
+
+ if (nextWakeTime is not null)
+ {
+ _logger.SchedulerSleepingUntil(nextWakeTime.Value);
+ await _signal.WaitUntilAsync(nextWakeTime.Value, cancellationToken);
+ }
+ else
+ {
+ // No scheduled messages - sleep until notified.
+ await _signal.WaitUntilAsync(DateTimeOffset.MaxValue, cancellationToken);
+ }
+ }
+ }
+ catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+ {
+ // Normal shutdown.
+ }
+ }
+ }
+
+ private async Task GetNextWakeTimeAsync(
+ NpgsqlConnection connection,
+ CancellationToken cancellationToken)
+ {
+ await using var command = connection.CreateCommand();
+ command.CommandText = _queries.NextWakeTime;
+ await command.PrepareAsync(cancellationToken);
+
+ var result = await command.ExecuteScalarAsync(cancellationToken);
+
+ return result is not null and not DBNull
+ ? new DateTimeOffset((DateTime)result, TimeSpan.Zero)
+ : null;
+ }
+
+ private async Task ProcessMessageAsync(NpgsqlConnection connection, CancellationToken cancellationToken)
+ {
+ using var activity = OpenTelemetry.Source.StartActivity(
+ "Process Scheduled Message Event",
+ ActivityKind.Producer,
+ new ActivityContext());
+
+ await using var transaction = await connection.BeginTransactionAsync(cancellationToken);
+
+ try
+ {
+ await using var command = connection.CreateCommand();
+ command.Transaction = transaction;
+ command.CommandText = _queries.ProcessMessage;
+
+ await command.PrepareAsync(cancellationToken);
+
+ try
+ {
+ await using var reader = await command.ExecuteReaderAsync(cancellationToken);
+
+ if (await reader.ReadAsync(cancellationToken))
+ {
+ var id = reader.GetGuid(0);
+ var envelope = Serializer.ReadMessageEnvelopeSafe(reader, 1, _logger);
+ var timesSent = reader.GetInt32(2);
+ var maxAttempts = reader.GetInt32(3);
+ var messageType = GetMessageType(envelope?.MessageType);
+ var isReply = envelope?.Headers?.IsReply() ?? false;
+ var endpoint = isReply
+ ? GetReplyDispatchEndpoint(envelope?.DestinationAddress)
+ : GetDispatchEndpoint(envelope?.DestinationAddress);
+
+ if (envelope is null || messageType is null || endpoint is null)
+ {
+ _logger.CouldNotDeserializeScheduledMessageBody(id);
+
+ await reader.CloseAsync();
+
+ await DeleteMessageAsync(connection, id, transaction, cancellationToken);
+
+ // we skipped this message, still have to check for the next ones
+ return true;
+ }
+
+ try
+ {
+ await SendAsync(envelope, endpoint, messageType, cancellationToken);
+
+ await reader.CloseAsync();
+
+ await DeleteMessageAsync(connection, id, transaction, cancellationToken);
+ }
+ catch (Exception ex) when (ex is not OperationCanceledException)
+ {
+ _logger.ScheduledMessageDispatchFailed(id, ex);
+
+ await reader.CloseAsync();
+
+ await UpdateLastErrorAsync(connection, id, ex, transaction, cancellationToken);
+
+ if (timesSent >= maxAttempts)
+ {
+ _logger.ScheduledMessageExhausted(id, maxAttempts);
+ }
+ }
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ finally
+ {
+ try
+ {
+ await transaction.CommitAsync(cancellationToken);
+ }
+ catch
+ {
+ // Commit failed (e.g., connection lost). Attempt rollback.
+ // If commit actually succeeded server-side, the message stays
+ // with times_sent incremented - safe, just causes a retry.
+ try { await transaction.RollbackAsync(CancellationToken.None); }
+ catch
+ {
+ /* swallow */
+ }
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ // Log only - no RollbackAsync here (commit already handled in finally)
+ _logger.UnexpectedErrorWhileProcessingScheduledMessage(ex);
+ throw;
+ }
+ }
+
+ private MessageType? GetMessageType(string? messageType)
+ {
+ try
+ {
+ if (messageType is null)
+ {
+ return null;
+ }
+
+ return _runtime.Messages.GetMessageType(messageType);
+ }
+ catch
+ {
+ return null;
+ }
+ }
+
+ private DispatchEndpoint? GetReplyDispatchEndpoint(string? destinationAddress)
+ {
+ try
+ {
+ if (!Uri.TryCreate(destinationAddress, UriKind.Absolute, out var uri))
+ {
+ return null;
+ }
+
+ return _runtime.GetTransport(uri)?.ReplyDispatchEndpoint;
+ }
+ catch
+ {
+ return null;
+ }
+ }
+
+ private DispatchEndpoint? GetDispatchEndpoint(string? destinationAddress)
+ {
+ try
+ {
+ if (destinationAddress is null || !Uri.TryCreate(destinationAddress, UriKind.Absolute, out var uri))
+ {
+ return null;
+ }
+
+ return _runtime.GetDispatchEndpoint(uri);
+ }
+ catch
+ {
+ return null;
+ }
+ }
+
+ private async ValueTask SendAsync(
+ MessageEnvelope envelope,
+ DispatchEndpoint endpoint,
+ MessageType messageType,
+ CancellationToken cancellationToken)
+ {
+ Activity? activity = null;
+ var traceparent = envelope.Headers?.Get(MessageHeaders.Traceparent);
+
+ if (!string.IsNullOrEmpty(traceparent))
+ {
+ var tracestate = envelope.Headers?.Get(MessageHeaders.Tracestate);
+ if (ActivityContext.TryParse(traceparent, tracestate, out var parentContext))
+ {
+ activity = OpenTelemetry.Source.CreateActivity(
+ "scheduler send",
+ ActivityKind.Client,
+ parentContext);
+
+ activity?.SetMessageId(envelope.MessageId);
+
+ activity?.Start();
+ }
+ }
+
+ var context = _contextPool.Get();
+ try
+ {
+ await using var scope = _services.CreateAsyncScope();
+
+ context.Initialize(scope.ServiceProvider, endpoint, _runtime, messageType, cancellationToken);
+
+ context.SkipScheduler();
+ context.SkipOutbox();
+
+ context.Envelope = envelope;
+
+ await endpoint.ExecuteAsync(context);
+ }
+ finally
+ {
+ _contextPool.Return(context);
+ activity?.Dispose();
+ }
+ }
+
+ private async Task DeleteMessageAsync(
+ NpgsqlConnection connection,
+ Guid eventId,
+ NpgsqlTransaction transaction,
+ CancellationToken cancellationToken)
+ {
+ await using var command = connection.CreateCommand();
+ command.CommandText = _queries.DeleteMessage;
+ command.Connection = connection;
+ command.Transaction = transaction;
+ command.Parameters.AddWithValue("@id", eventId);
+
+ await command.PrepareAsync(cancellationToken);
+
+ await command.ExecuteNonQueryAsync(cancellationToken);
+ }
+
+ private async Task UpdateLastErrorAsync(
+ NpgsqlConnection connection,
+ Guid id,
+ Exception exception,
+ NpgsqlTransaction transaction,
+ CancellationToken cancellationToken)
+ {
+ using var errorJson = JsonSerializer.SerializeToDocument(
+ new
+ {
+ message = exception.Message,
+ exceptionType = exception.GetType().FullName
+ });
+
+ await using var command = connection.CreateCommand();
+ command.CommandText = _queries.UpdateLastError;
+ command.Connection = connection;
+ command.Transaction = transaction;
+ command.Parameters.AddWithValue("@id", id);
+ command.Parameters.AddWithValue("@last_error", NpgsqlDbType.Jsonb, errorJson);
+
+ await command.PrepareAsync(cancellationToken);
+
+ await command.ExecuteNonQueryAsync(cancellationToken);
+ }
+}
+
+internal static partial class SchedulerLogs
+{
+ [LoggerMessage(
+ 1,
+ LogLevel.Critical,
+ "Could not deserialize message body for scheduled message with ID {Id}. Message Dropped.")]
+ public static partial void CouldNotDeserializeScheduledMessageBody(this ILogger logger, Guid id);
+
+ [LoggerMessage(2, LogLevel.Information, "Scheduler sleeping until {WakeTime}.")]
+ public static partial void SchedulerSleepingUntil(this ILogger logger, DateTimeOffset wakeTime);
+
+ [LoggerMessage(3, LogLevel.Error, "An unexpected error occurred while processing scheduled message")]
+ public static partial void UnexpectedErrorWhileProcessingScheduledMessage(this ILogger logger, Exception exception);
+
+ [LoggerMessage(4, LogLevel.Warning, "Failed to dispatch scheduled message {Id}. Error recorded for retry.")]
+ public static partial void ScheduledMessageDispatchFailed(this ILogger logger, Guid id, Exception exception);
+
+ [LoggerMessage(5, LogLevel.Warning,
+ "Scheduled message {Id} exhausted all {MaxAttempts} retry attempts and will not be retried.")]
+ public static partial void ScheduledMessageExhausted(this ILogger logger, Guid id, int maxAttempts);
+}
+
+file static class Serializer
+{
+ public static MessageEnvelope? ReadMessageEnvelopeSafe(NpgsqlDataReader reader, int ordinal, ILogger logger)
+ {
+ try
+ {
+ var envelope = reader.GetFieldValue>(ordinal);
+ return MessageEnvelopeReader.Parse(envelope);
+ }
+ catch (Exception ex)
+ {
+ logger.LogError(ex, "Error reading message envelope");
+ return null;
+ }
+ }
+}
diff --git a/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/ScheduledMessageQueries.cs b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/ScheduledMessageQueries.cs
new file mode 100644
index 00000000000..7a975019b2b
--- /dev/null
+++ b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/ScheduledMessageQueries.cs
@@ -0,0 +1,88 @@
+using Mocha.EntityFrameworkCore.Postgres;
+
+namespace Mocha.Scheduling;
+
+///
+/// Holds pre-built SQL query strings for Postgres scheduled message operations, generated from
+/// column and table metadata.
+///
+internal sealed class ScheduledMessageQueries
+{
+ ///
+ /// Gets or sets the SQL statement to insert a new scheduled message into the table.
+ ///
+ public string InsertMessage { get; set; } = null!;
+
+ ///
+ /// Gets or sets the SQL query to retrieve the earliest scheduled time that is due for dispatch.
+ ///
+ public string NextWakeTime { get; set; } = null!;
+
+ ///
+ /// Gets or sets the SQL statement that locks a single scheduled message row for processing,
+ /// increments the times-sent counter, and returns the id and envelope.
+ ///
+ public string ProcessMessage { get; set; } = null!;
+
+ ///
+ /// Gets or sets the SQL statement to delete a dispatched scheduled message by its identifier.
+ ///
+ public string DeleteMessage { get; set; } = null!;
+
+ ///
+ /// Gets or sets the SQL statement to update the last error for a scheduled message.
+ ///
+ public string UpdateLastError { get; set; } = null!;
+
+ ///
+ /// Creates a new instance with SQL queries built from the provided table metadata.
+ ///
+ /// The scheduled message table info containing column and table names.
+ /// A fully initialized instance.
+ public static ScheduledMessageQueries From(ScheduledMessageTableInfo t)
+ {
+ return new ScheduledMessageQueries
+ {
+ InsertMessage = $"""
+ INSERT INTO {t.QualifiedTableName} ("{t.Id}", "{t.Envelope}", "{t.ScheduledTime}", "{t.TimesSent}", "{t.MaxAttempts}", "{t.CreatedAt}")
+ VALUES (@id, @envelope, @scheduled_time, 0, 10, NOW());
+ """,
+
+ NextWakeTime = $"""
+ SELECT MIN("{t.ScheduledTime}" + INTERVAL '1 second' * POWER(2, "{t.TimesSent}")) AS "NextWakeTime"
+ FROM {t.QualifiedTableName}
+ WHERE "{t.TimesSent}" < "{t.MaxAttempts}";
+ """,
+
+ ProcessMessage = $"""
+ UPDATE {t.QualifiedTableName}
+ SET "{t.TimesSent}" = "{t.TimesSent}" + 1,
+ "{t.ScheduledTime}" = NOW() + INTERVAL '1 second' * POWER(2, "{t.TimesSent}")
+ WHERE "{t.Id}" = (
+ SELECT "{t.Id}" FROM {t.QualifiedTableName}
+ WHERE "{t.TimesSent}" < "{t.MaxAttempts}"
+ AND "{t.ScheduledTime}" <= NOW()
+ ORDER BY "{t.ScheduledTime}"
+ FOR UPDATE SKIP LOCKED
+ LIMIT 1
+ )
+ RETURNING
+ "{t.Id}",
+ "{t.Envelope}",
+ "{t.TimesSent}",
+ "{t.MaxAttempts}";
+ """,
+
+ DeleteMessage = $"""
+ DELETE FROM {t.QualifiedTableName}
+ WHERE "{t.Id}" = @id;
+ """,
+
+ UpdateLastError = $"""
+ UPDATE {t.QualifiedTableName}
+ SET "{t.LastError}" = @last_error::jsonb
+ WHERE "{t.Id}" = @id;
+ """
+ };
+ }
+}
diff --git a/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/ScheduledMessageWorker.cs b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/ScheduledMessageWorker.cs
new file mode 100644
index 00000000000..4a7031d6cb6
--- /dev/null
+++ b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/ScheduledMessageWorker.cs
@@ -0,0 +1,67 @@
+using Microsoft.Extensions.Hosting;
+using Mocha.Threading;
+using Npgsql;
+
+namespace Mocha.Scheduling;
+
+///
+/// A hosted service that manages the lifecycle of the Postgres scheduled message dispatcher,
+/// opening a dedicated Npgsql connection and running the processing loop as a continuous background task.
+///
+/// The scheduled message options containing the Postgres connection string.
+/// The dispatcher that performs the scheduled message dispatch loop.
+internal sealed class ScheduledMessageWorker(
+ PostgresScheduledMessageOptions options,
+ ScheduledMessageDispatcher dispatcher)
+ : IHostedService
+{
+ private NpgsqlDataSource? _dataSource;
+ private ContinuousTask? _task;
+
+ ///
+ /// Starts the scheduled message processing background task.
+ ///
+ /// A token that signals when startup should be aborted.
+ /// A completed task once the background loop has been initiated.
+ /// Thrown if the worker is already running.
+ public Task StartAsync(CancellationToken cancellationToken)
+ {
+ if (_task is not null)
+ {
+ throw new InvalidOperationException("The worker is already running.");
+ }
+
+ _dataSource = NpgsqlDataSource.Create(options.ConnectionString);
+ _task = new ContinuousTask(ProcessAsync);
+
+ return Task.CompletedTask;
+ }
+
+ ///
+ /// Stops the scheduled message processing background task and waits for it to complete gracefully.
+ ///
+ /// A token that signals when shutdown should be forced.
+ public async Task StopAsync(CancellationToken cancellationToken)
+ {
+ if (_task is null)
+ {
+ return;
+ }
+
+ await _task.DisposeAsync();
+ _task = null;
+
+ if (_dataSource is not null)
+ {
+ await _dataSource.DisposeAsync();
+ _dataSource = null;
+ }
+ }
+
+ private async Task ProcessAsync(CancellationToken stoppingToken)
+ {
+ await using var connection = await _dataSource!.OpenConnectionAsync(stoppingToken);
+
+ await dispatcher.ProcessAsync(connection, stoppingToken);
+ }
+}
diff --git a/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/SchedulingServiceCollectionExtensions.cs b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/SchedulingServiceCollectionExtensions.cs
new file mode 100644
index 00000000000..da157047d94
--- /dev/null
+++ b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/SchedulingServiceCollectionExtensions.cs
@@ -0,0 +1,187 @@
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using Mocha.EntityFrameworkCore;
+using Mocha.EntityFrameworkCore.Postgres;
+
+namespace Mocha.Scheduling;
+
+///
+/// Provides extension methods on for registering
+/// the Postgres scheduling infrastructure including the dispatcher, worker, and message persistence.
+///
+public static class SchedulingServiceCollectionExtensions
+{
+ ///
+ /// Registers the full Postgres scheduling pipeline: table info discovery from the EF Core model,
+ /// the , a hosted background worker, and a scoped
+ /// backed by direct Npgsql inserts.
+ ///
+ ///
+ /// This method also calls
+ /// to register the EF Core interceptors that signal the scheduler on save and commit.
+ ///
+ /// The Entity Framework Core builder to configure.
+ /// The same instance for chaining.
+ public static IEntityFrameworkCoreBuilder UsePostgresScheduling(this IEntityFrameworkCoreBuilder builder)
+ {
+ var contextType = builder.ContextType;
+
+ builder
+ .Services.AddOptions(builder.Name)
+ .Configure((options, sp) =>
+ {
+ using var scope = sp.CreateScope();
+ var dbContext = (DbContext)scope.ServiceProvider.GetRequiredService(contextType);
+ var model = dbContext.Model;
+
+ ConfigureScheduledMessageTableInfo(options.ScheduledMessage, model);
+ });
+
+ builder
+ .Services.AddOptions(builder.Name)
+ .Configure>((options, postgresOptions,
+ tableInfoMonitor) =>
+ {
+ using var scope = postgresOptions.CreateScope();
+ var dbContext = (DbContext)scope.ServiceProvider.GetRequiredService(contextType);
+ options.ConnectionString =
+ dbContext.Database.GetConnectionString() ??
+ throw new InvalidOperationException(
+ $"Could not read the connection string from {contextType.Name}");
+ var tableInfo = tableInfoMonitor.Get(builder.Name);
+ options.Queries = ScheduledMessageQueries.From(tableInfo.ScheduledMessage);
+ });
+
+ builder.Services.AddSingleton(sp =>
+ {
+ var optionsMonitor = sp.GetRequiredService>();
+ var options = optionsMonitor.Get(builder.Name);
+ return new ScheduledMessageDispatcher(
+ sp.GetRequiredService>(),
+ sp,
+ sp.GetRequiredService(),
+ sp.GetRequiredService(),
+ sp.GetRequiredService(),
+ options.Queries);
+ });
+
+ builder.Services.AddSingleton(sp =>
+ {
+ var optionsMonitor = sp.GetRequiredService>();
+ var options = optionsMonitor.Get(builder.Name);
+ return new ScheduledMessageWorker(options, sp.GetRequiredService());
+ });
+
+ builder.Services.AddHostedService(sp => sp.GetRequiredService());
+
+ builder.Services.TryAddScoped(sp =>
+ EfCoreScheduledMessageStore.Create(contextType, builder.Name, sp)
+ );
+
+ builder.UseSchedulingCore();
+
+ return builder;
+ }
+
+ private static void ConfigureScheduledMessageTableInfo(ScheduledMessageTableInfo scheduledMessage, IModel model)
+ {
+ var entity = model.FindEntityType(typeof(ScheduledMessage));
+ if (entity is null)
+ {
+ return;
+ }
+
+ var tableName = entity.GetTableName();
+ var schema = entity.GetSchema();
+
+ if (tableName is not null)
+ {
+ scheduledMessage.Table = tableName;
+ }
+
+ if (schema is not null)
+ {
+ scheduledMessage.Schema = schema;
+ }
+
+ var storeObject = StoreObjectIdentifier.Create(entity, StoreObjectType.Table);
+ if (storeObject is null)
+ {
+ return;
+ }
+
+ var idProperty = entity.FindProperty(nameof(ScheduledMessage.Id));
+ if (idProperty is not null)
+ {
+ var columnName = idProperty.GetColumnName(storeObject.Value);
+ if (columnName is not null)
+ {
+ scheduledMessage.Id = columnName;
+ }
+ }
+
+ var envelopeProperty = entity.FindProperty(nameof(ScheduledMessage.Envelope));
+ if (envelopeProperty is not null)
+ {
+ var columnName = envelopeProperty.GetColumnName(storeObject.Value);
+ if (columnName is not null)
+ {
+ scheduledMessage.Envelope = columnName;
+ }
+ }
+
+ var scheduledTimeProperty = entity.FindProperty(nameof(ScheduledMessage.ScheduledTime));
+ if (scheduledTimeProperty is not null)
+ {
+ var columnName = scheduledTimeProperty.GetColumnName(storeObject.Value);
+ if (columnName is not null)
+ {
+ scheduledMessage.ScheduledTime = columnName;
+ }
+ }
+
+ var timesSentProperty = entity.FindProperty(nameof(ScheduledMessage.TimesSent));
+ if (timesSentProperty is not null)
+ {
+ var columnName = timesSentProperty.GetColumnName(storeObject.Value);
+ if (columnName is not null)
+ {
+ scheduledMessage.TimesSent = columnName;
+ }
+ }
+
+ var createdAtProperty = entity.FindProperty(nameof(ScheduledMessage.CreatedAt));
+ if (createdAtProperty is not null)
+ {
+ var columnName = createdAtProperty.GetColumnName(storeObject.Value);
+ if (columnName is not null)
+ {
+ scheduledMessage.CreatedAt = columnName;
+ }
+ }
+
+ var maxAttemptsProperty = entity.FindProperty(nameof(ScheduledMessage.MaxAttempts));
+ if (maxAttemptsProperty is not null)
+ {
+ var columnName = maxAttemptsProperty.GetColumnName(storeObject.Value);
+ if (columnName is not null)
+ {
+ scheduledMessage.MaxAttempts = columnName;
+ }
+ }
+
+ var lastErrorProperty = entity.FindProperty(nameof(ScheduledMessage.LastError));
+ if (lastErrorProperty is not null)
+ {
+ var columnName = lastErrorProperty.GetColumnName(storeObject.Value);
+ if (columnName is not null)
+ {
+ scheduledMessage.LastError = columnName;
+ }
+ }
+ }
+}
diff --git a/src/Mocha/src/Mocha.EntityFrameworkCore/Assembly.cs b/src/Mocha/src/Mocha.EntityFrameworkCore/Assembly.cs
new file mode 100644
index 00000000000..e830a08b3a0
--- /dev/null
+++ b/src/Mocha/src/Mocha.EntityFrameworkCore/Assembly.cs
@@ -0,0 +1,3 @@
+using System.Runtime.CompilerServices;
+
+[assembly: InternalsVisibleTo("Mocha.Tests")]
diff --git a/src/Mocha/src/Mocha.EntityFrameworkCore/Mocha.EntityFrameworkCore.csproj b/src/Mocha/src/Mocha.EntityFrameworkCore/Mocha.EntityFrameworkCore.csproj
index 6e1118ae5b5..54d0c229cf2 100644
--- a/src/Mocha/src/Mocha.EntityFrameworkCore/Mocha.EntityFrameworkCore.csproj
+++ b/src/Mocha/src/Mocha.EntityFrameworkCore/Mocha.EntityFrameworkCore.csproj
@@ -7,6 +7,7 @@
+
diff --git a/src/Mocha/src/Mocha.EntityFrameworkCore/Scheduling/SchedulingDbTransactionInterceptor.cs b/src/Mocha/src/Mocha.EntityFrameworkCore/Scheduling/SchedulingDbTransactionInterceptor.cs
new file mode 100644
index 00000000000..8f19bd8d792
--- /dev/null
+++ b/src/Mocha/src/Mocha.EntityFrameworkCore/Scheduling/SchedulingDbTransactionInterceptor.cs
@@ -0,0 +1,31 @@
+using System.Data.Common;
+using Microsoft.EntityFrameworkCore.Diagnostics;
+using Mocha.Scheduling;
+
+namespace Mocha.EntityFrameworkCore;
+
+///
+/// Intercepts Entity Framework Core database transaction commit events to signal the scheduler
+/// that messages are ready for dispatch.
+///
+internal sealed class SchedulingDbTransactionInterceptor(ISchedulerSignal signal, TimeProvider timeProvider)
+ : DbTransactionInterceptor
+ , ISingletonInterceptor
+{
+ ///
+ public override Task TransactionCommittedAsync(
+ DbTransaction transaction,
+ TransactionEndEventData eventData,
+ CancellationToken cancellationToken = default)
+ {
+ TransactionCommitted(transaction, eventData);
+
+ return Task.CompletedTask;
+ }
+
+ ///
+ public override void TransactionCommitted(DbTransaction transaction, TransactionEndEventData eventData)
+ {
+ signal.Notify(timeProvider.GetUtcNow());
+ }
+}
diff --git a/src/Mocha/src/Mocha.EntityFrameworkCore/Scheduling/SchedulingEntityFrameworkCorePersistenceBuilderExtensions.cs b/src/Mocha/src/Mocha.EntityFrameworkCore/Scheduling/SchedulingEntityFrameworkCorePersistenceBuilderExtensions.cs
new file mode 100644
index 00000000000..6c834d3da07
--- /dev/null
+++ b/src/Mocha/src/Mocha.EntityFrameworkCore/Scheduling/SchedulingEntityFrameworkCorePersistenceBuilderExtensions.cs
@@ -0,0 +1,36 @@
+using Microsoft.EntityFrameworkCore.Diagnostics;
+using Microsoft.Extensions.DependencyInjection;
+using Mocha.Scheduling;
+
+namespace Mocha.EntityFrameworkCore;
+
+///
+/// Provides extension methods on for registering
+/// core scheduling interceptors that signal the scheduler after save and transaction commit.
+///
+public static class SchedulingEntityFrameworkCorePersistenceBuilderExtensions
+{
+ ///
+ /// Registers the core scheduling infrastructure, including EF Core interceptors that signal the
+ /// scheduler when changes are saved or transactions are committed.
+ ///
+ /// The Entity Framework Core builder to configure.
+ /// The same instance for chaining.
+ public static IEntityFrameworkCoreBuilder UseSchedulingCore(this IEntityFrameworkCoreBuilder builder)
+ {
+ builder.HostBuilder.UseSchedulerCore();
+
+ builder.ConfigureEntityFrameworkServices((sp, services) =>
+ {
+ var signal = sp.GetService();
+
+ if (signal is not null)
+ {
+ var timeProvider = sp.GetService() ?? TimeProvider.System;
+ services.AddSingleton(new SchedulingDbTransactionInterceptor(signal, timeProvider));
+ services.AddSingleton(new SchedulingSaveChangesInterceptor(signal, timeProvider));
+ }
+ });
+ return builder;
+ }
+}
diff --git a/src/Mocha/src/Mocha.EntityFrameworkCore/Scheduling/SchedulingSaveChangesInterceptor.cs b/src/Mocha/src/Mocha.EntityFrameworkCore/Scheduling/SchedulingSaveChangesInterceptor.cs
new file mode 100644
index 00000000000..a682e56f22c
--- /dev/null
+++ b/src/Mocha/src/Mocha.EntityFrameworkCore/Scheduling/SchedulingSaveChangesInterceptor.cs
@@ -0,0 +1,40 @@
+using Microsoft.EntityFrameworkCore.Diagnostics;
+using Mocha.Scheduling;
+
+namespace Mocha.EntityFrameworkCore;
+
+///
+/// Intercepts Entity Framework Core save changes events to signal the scheduler
+/// that messages are ready for dispatch.
+///
+internal sealed class SchedulingSaveChangesInterceptor(
+ ISchedulerSignal signal,
+ TimeProvider timeProvider)
+ : SaveChangesInterceptor
+ , ISingletonInterceptor
+{
+ ///
+ public override ValueTask SavedChangesAsync(
+ SaveChangesCompletedEventData eventData,
+ int result,
+ CancellationToken cancellationToken = default)
+ {
+ return new(SavedChanges(eventData, result));
+ }
+
+ ///
+ public override int SavedChanges(SaveChangesCompletedEventData eventData, int result)
+ {
+ if (eventData.Context is not { } context)
+ {
+ return result;
+ }
+
+ if (context.Database.CurrentTransaction is null)
+ {
+ signal.Notify(timeProvider.GetUtcNow());
+ }
+
+ return result;
+ }
+}
diff --git a/src/Mocha/src/Mocha.Mediator/MediatorMiddlewareFactoryContextExtensions.cs b/src/Mocha/src/Mocha.Mediator/MediatorMiddlewareFactoryContextExtensions.cs
index 8d5bfa72fbb..0605707f264 100644
--- a/src/Mocha/src/Mocha.Mediator/MediatorMiddlewareFactoryContextExtensions.cs
+++ b/src/Mocha/src/Mocha.Mediator/MediatorMiddlewareFactoryContextExtensions.cs
@@ -4,7 +4,7 @@ namespace Mocha.Mediator;
/// Extension methods for that allow middleware
/// factories to inspect the pipeline being compiled and decide whether to participate.
/// Returning next from a middleware factory when these checks fail eliminates the
-/// middleware from that pipeline entirely — zero runtime cost.
+/// middleware from that pipeline entirely - zero runtime cost.
///
public static class MediatorMiddlewareFactoryContextExtensions
{
diff --git a/src/Mocha/src/Mocha.Scheduling/Assembly.cs b/src/Mocha/src/Mocha.Scheduling/Assembly.cs
new file mode 100644
index 00000000000..e830a08b3a0
--- /dev/null
+++ b/src/Mocha/src/Mocha.Scheduling/Assembly.cs
@@ -0,0 +1,3 @@
+using System.Runtime.CompilerServices;
+
+[assembly: InternalsVisibleTo("Mocha.Tests")]
diff --git a/src/Mocha/src/Mocha.Scheduling/DispatchSchedulingMiddleware.cs b/src/Mocha/src/Mocha.Scheduling/DispatchSchedulingMiddleware.cs
new file mode 100644
index 00000000000..10c2b2893d1
--- /dev/null
+++ b/src/Mocha/src/Mocha.Scheduling/DispatchSchedulingMiddleware.cs
@@ -0,0 +1,84 @@
+using Microsoft.Extensions.DependencyInjection;
+using Mocha.Features;
+using Mocha.Middlewares;
+
+namespace Mocha.Scheduling;
+
+///
+/// Dispatch middleware that intercepts outgoing messages with a scheduled time and persists them
+/// to the scheduled message store instead of forwarding them to the next pipeline stage.
+///
+///
+/// Messages without a and dispatches marked with
+/// pass through to the next middleware.
+/// When the transport registers , this middleware is skipped
+/// entirely during pipeline construction.
+///
+public sealed class DispatchSchedulingMiddleware
+{
+ ///
+ /// Evaluates whether the message should be persisted to the scheduled message store or
+ /// forwarded down the pipeline.
+ ///
+ /// The current dispatch context containing the message envelope and metadata.
+ /// The next middleware delegate in the dispatch pipeline.
+ /// A value task that completes when the message has been persisted or forwarded.
+ public async ValueTask InvokeAsync(IDispatchContext context, DispatchDelegate next)
+ {
+ if (context.ScheduledTime is not { } scheduledTime)
+ {
+ await next(context);
+ return;
+ }
+
+ var feature = context.Features.GetOrSet();
+
+ if (feature.SkipScheduler)
+ {
+ await next(context);
+ return;
+ }
+
+ if (context.Envelope is null)
+ {
+ await next(context);
+ return;
+ }
+
+ var store = context.Services.GetRequiredService();
+ await store.PersistAsync(context.Envelope, scheduledTime, context.CancellationToken);
+ }
+
+ ///
+ /// Creates the middleware configuration that wires the scheduling middleware into the dispatch
+ /// pipeline.
+ ///
+ ///
+ /// If the transport declares with
+ /// set to true,
+ /// the middleware is not installed and the next delegate is returned directly.
+ ///
+ ///
+ /// A named "Scheduling" for pipeline registration.
+ ///
+ public static DispatchMiddlewareConfiguration Create()
+ => new(
+ static (context, next) =>
+ {
+ if (context.Transport.Features.Get()?.SupportsSchedulingNatively is true)
+ {
+ return next;
+ }
+
+ var appServices = context.Services.GetApplicationServices();
+ var isService = appServices.GetService();
+ if (isService?.IsService(typeof(IScheduledMessageStore)) is not true)
+ {
+ return next;
+ }
+
+ var middleware = new DispatchSchedulingMiddleware();
+ return ctx => middleware.InvokeAsync(ctx, next);
+ },
+ "Scheduling");
+}
diff --git a/src/Mocha/src/Mocha.Scheduling/IScheduledMessageStore.cs b/src/Mocha/src/Mocha.Scheduling/IScheduledMessageStore.cs
new file mode 100644
index 00000000000..55392c8879b
--- /dev/null
+++ b/src/Mocha/src/Mocha.Scheduling/IScheduledMessageStore.cs
@@ -0,0 +1,25 @@
+using Mocha.Middlewares;
+
+namespace Mocha.Scheduling;
+
+///
+/// Defines the contract for persisting outgoing message envelopes to a durable scheduled message store.
+///
+///
+/// Implementations are responsible for transactionally storing envelopes so they can be
+/// dispatched at the specified scheduled time, providing at-least-once delivery guarantees.
+///
+public interface IScheduledMessageStore
+{
+ ///
+ /// Persists the specified message envelope to the scheduled message store for future delivery.
+ ///
+ /// The message envelope to persist, containing headers and payload.
+ /// The time at which the message should be dispatched.
+ /// A token to cancel the persistence operation.
+ /// A value task that completes when the envelope has been durably stored.
+ ValueTask PersistAsync(
+ MessageEnvelope envelope,
+ DateTimeOffset scheduledTime,
+ CancellationToken cancellationToken);
+}
diff --git a/src/Mocha/src/Mocha.Scheduling/ISchedulerSignal.cs b/src/Mocha/src/Mocha.Scheduling/ISchedulerSignal.cs
new file mode 100644
index 00000000000..772566db57f
--- /dev/null
+++ b/src/Mocha/src/Mocha.Scheduling/ISchedulerSignal.cs
@@ -0,0 +1,23 @@
+namespace Mocha.Scheduling;
+
+///
+/// Represents a signal that can be used to notify a waiting thread that a scheduled message has been persisted.
+///
+public interface ISchedulerSignal
+{
+ ///
+ /// Called by the scheduling middleware after persisting a message.
+ /// The scheduler wakes only if is earlier than its current wake target.
+ ///
+ /// The time the persisted message is scheduled for delivery.
+ void Notify(DateTimeOffset scheduledTime);
+
+ ///
+ /// Sleeps until arrives, or a call with a time earlier than
+ /// is received, or the is cancelled.
+ ///
+ /// The time to wake up at if no earlier notification arrives.
+ /// A token to cancel the wait.
+ /// A task that completes when the signal wakes.
+ Task WaitUntilAsync(DateTimeOffset wakeTime, CancellationToken cancellationToken);
+}
diff --git a/src/Mocha/src/Mocha.Scheduling/MessageBusSchedulerSignal.cs b/src/Mocha/src/Mocha.Scheduling/MessageBusSchedulerSignal.cs
new file mode 100644
index 00000000000..410761b3ef8
--- /dev/null
+++ b/src/Mocha/src/Mocha.Scheduling/MessageBusSchedulerSignal.cs
@@ -0,0 +1,74 @@
+namespace Mocha.Scheduling;
+
+internal sealed class MessageBusSchedulerSignal(TimeProvider timeProvider)
+ : IDisposable
+ , ISchedulerSignal
+{
+ private static readonly TimeSpan s_maxDelay = TimeSpan.FromMinutes(5);
+
+ private readonly object _lock = new();
+
+ private DateTimeOffset _target = DateTimeOffset.MaxValue;
+ private CancellationTokenSource? _delayCts;
+
+ ///
+ public void Notify(DateTimeOffset scheduledTime)
+ {
+ lock (_lock)
+ {
+ if (scheduledTime >= _target)
+ {
+ return;
+ }
+
+ _target = scheduledTime;
+ _delayCts?.Cancel();
+ }
+ }
+
+ ///
+ public async Task WaitUntilAsync(DateTimeOffset wakeTime, CancellationToken cancellationToken)
+ {
+ CancellationTokenSource delayCts;
+
+ lock (_lock)
+ {
+ _target = wakeTime;
+ _delayCts?.Dispose();
+ _delayCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ delayCts = _delayCts;
+ }
+
+ var delay = wakeTime - timeProvider.GetUtcNow();
+
+ if (delay <= TimeSpan.Zero)
+ {
+ return;
+ }
+
+ if (delay > s_maxDelay)
+ {
+ delay = s_maxDelay;
+ }
+
+ try
+ {
+ await Task.Delay(delay, timeProvider, delayCts.Token);
+ }
+ catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
+ {
+ // Woken by Notify - return to dispatcher, which will re-query and re-sleep.
+ }
+ }
+
+ ///
+ public void Dispose()
+ {
+ lock (_lock)
+ {
+ _delayCts?.Cancel();
+ _delayCts?.Dispose();
+ _delayCts = null;
+ }
+ }
+}
diff --git a/src/Mocha/src/Mocha.Scheduling/Mocha.Scheduling.csproj b/src/Mocha/src/Mocha.Scheduling/Mocha.Scheduling.csproj
new file mode 100644
index 00000000000..ec3dc85c831
--- /dev/null
+++ b/src/Mocha/src/Mocha.Scheduling/Mocha.Scheduling.csproj
@@ -0,0 +1,10 @@
+
+
+ Mocha.Scheduling
+ Mocha.Scheduling
+
+
+
+
+
+
diff --git a/src/Mocha/src/Mocha.Scheduling/SchedulerCoreServiceCollectionExtensions.cs b/src/Mocha/src/Mocha.Scheduling/SchedulerCoreServiceCollectionExtensions.cs
new file mode 100644
index 00000000000..76d81d1a91e
--- /dev/null
+++ b/src/Mocha/src/Mocha.Scheduling/SchedulerCoreServiceCollectionExtensions.cs
@@ -0,0 +1,30 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+
+namespace Mocha.Scheduling;
+
+///
+/// Provides extension methods to register scheduling infrastructure on .
+///
+public static class SchedulerCoreServiceCollectionExtensions
+{
+ ///
+ /// Registers the core scheduling services and inserts the scheduling dispatch middleware into
+ /// the message bus pipeline.
+ ///
+ ///
+ /// Adds as a singleton and configures the dispatch pipeline
+ /// to persist outgoing messages with a
+ /// through instead of dispatching them directly.
+ ///
+ /// The message bus host builder to configure.
+ /// The same instance for chaining.
+ public static IMessageBusHostBuilder UseSchedulerCore(this IMessageBusHostBuilder builder)
+ {
+ builder.Services.TryAddSingleton(sp =>
+ new MessageBusSchedulerSignal(sp.GetService() ?? TimeProvider.System));
+ builder.ConfigureMessageBus(x => x.UseDispatch(DispatchSchedulingMiddleware.Create(), after: "Serialization"));
+
+ return builder;
+ }
+}
diff --git a/src/Mocha/src/Mocha.Scheduling/SchedulingDispatchContextExtensions.cs b/src/Mocha/src/Mocha.Scheduling/SchedulingDispatchContextExtensions.cs
new file mode 100644
index 00000000000..f997d0f0b4c
--- /dev/null
+++ b/src/Mocha/src/Mocha.Scheduling/SchedulingDispatchContextExtensions.cs
@@ -0,0 +1,22 @@
+using Mocha.Features;
+using Mocha.Middlewares;
+
+namespace Mocha.Scheduling;
+
+///
+/// Provides convenience methods on for scheduling control.
+///
+public static class SchedulingDispatchContextExtensions
+{
+ ///
+ /// Marks the current dispatch context to bypass the scheduler, causing the message to be sent
+ /// directly to the transport.
+ ///
+ /// The dispatch context to modify.
+ public static void SkipScheduler(this IDispatchContext context)
+ {
+ var feature = context.Features.GetOrSet();
+
+ feature.SkipScheduler = true;
+ }
+}
diff --git a/src/Mocha/src/Mocha.Scheduling/SchedulingMiddlewareFeature.cs b/src/Mocha/src/Mocha.Scheduling/SchedulingMiddlewareFeature.cs
new file mode 100644
index 00000000000..e114f8ac94b
--- /dev/null
+++ b/src/Mocha/src/Mocha.Scheduling/SchedulingMiddlewareFeature.cs
@@ -0,0 +1,37 @@
+using Mocha.Features;
+
+namespace Mocha.Scheduling;
+
+///
+/// A pooled feature that controls whether the scheduling middleware should be bypassed for a given dispatch.
+///
+///
+/// Attach this feature to the dispatch context's feature collection and set
+/// to true to send the message directly without persisting it to the scheduled message store.
+/// The feature is pooled and automatically reset between uses.
+///
+public sealed class SchedulingMiddlewareFeature : IPooledFeature
+{
+ ///
+ /// Gets or sets a value indicating whether the scheduling persistence step should be skipped
+ /// for the current dispatch.
+ ///
+ public bool SkipScheduler { get; set; }
+
+ ///
+ /// Initializes the feature from the pool, resetting to false.
+ ///
+ /// The initialization state provided by the feature pool (unused).
+ public void Initialize(object state)
+ {
+ SkipScheduler = false;
+ }
+
+ ///
+ /// Resets the feature state before returning it to the pool, clearing to false.
+ ///
+ public void Reset()
+ {
+ SkipScheduler = false;
+ }
+}
diff --git a/src/Mocha/src/Mocha.Scheduling/SchedulingTransportFeature.cs b/src/Mocha/src/Mocha.Scheduling/SchedulingTransportFeature.cs
new file mode 100644
index 00000000000..6e27414c543
--- /dev/null
+++ b/src/Mocha/src/Mocha.Scheduling/SchedulingTransportFeature.cs
@@ -0,0 +1,13 @@
+namespace Mocha.Scheduling;
+
+///
+/// Feature that contains metadata about the transport's scheduling capabilities.
+///
+public sealed class SchedulingTransportFeature
+{
+ ///
+ /// Indicates that the transport supports native scheduling, and that the dispatch scheduling
+ /// middleware should be skipped.
+ ///
+ public bool SupportsSchedulingNatively { get; set; }
+}
diff --git a/src/Mocha/src/Mocha.Transport.Postgres/Connection/PostgresMessageStore.cs b/src/Mocha/src/Mocha.Transport.Postgres/Connection/PostgresMessageStore.cs
index 523228ebf99..303250f19c7 100644
--- a/src/Mocha/src/Mocha.Transport.Postgres/Connection/PostgresMessageStore.cs
+++ b/src/Mocha/src/Mocha.Transport.Postgres/Connection/PostgresMessageStore.cs
@@ -30,6 +30,7 @@ public async Task PublishAsync(
ReadOnlyMemory body,
ReadOnlyMemory headers,
string topicName,
+ DateTimeOffset? scheduledTime,
CancellationToken cancellationToken)
{
await using var connection = await _connectionManager.OpenConnectionAsync(cancellationToken);
@@ -37,8 +38,8 @@ public async Task PublishAsync(
command.CommandText = $"""
WITH inserted_messages AS (
- INSERT INTO {_schemaOptions.MessageTable} (body, headers, queue_id)
- SELECT @body, @headers, qs.destination_id
+ INSERT INTO {_schemaOptions.MessageTable} (body, headers, scheduled_time, queue_id)
+ SELECT @body, @headers, @scheduled_time, qs.destination_id
FROM {_schemaOptions.QueueSubscriptionTable} qs
INNER JOIN {_schemaOptions.TopicTable} t ON qs.source_id = t.id
WHERE t.name = @topic_name
@@ -56,6 +57,11 @@ FROM inserted_messages
command.Parameters.Add(
new NpgsqlParameter("headers", NpgsqlDbType.Jsonb) { Value = !headers.IsEmpty ? headers : DBNull.Value });
command.Parameters.Add(new NpgsqlParameter("topic_name", NpgsqlDbType.Text) { Value = topicName });
+ command.Parameters.Add(
+ new NpgsqlParameter("scheduled_time", NpgsqlDbType.TimestampTz)
+ {
+ Value = scheduledTime.HasValue ? scheduledTime.Value.UtcDateTime : DBNull.Value
+ });
await command.ExecuteNonQueryAsync(cancellationToken);
}
@@ -67,6 +73,7 @@ public async Task SendAsync(
ReadOnlyMemory body,
ReadOnlyMemory headers,
string queueName,
+ DateTimeOffset? scheduledTime,
CancellationToken cancellationToken)
{
await using var connection = await _connectionManager.OpenConnectionAsync(cancellationToken);
@@ -77,8 +84,8 @@ WITH queue_info AS (
SELECT id FROM {_schemaOptions.QueueTable} WHERE name = @queue_name LIMIT 1
),
inserted_message AS (
- INSERT INTO {_schemaOptions.MessageTable} (body, headers, queue_id)
- SELECT @body, @headers, queue_info.id
+ INSERT INTO {_schemaOptions.MessageTable} (body, headers, scheduled_time, queue_id)
+ SELECT @body, @headers, @scheduled_time, queue_info.id
FROM queue_info
RETURNING queue_id
)
@@ -94,6 +101,11 @@ FROM inserted_message
command.Parameters.Add(
new NpgsqlParameter("headers", NpgsqlDbType.Jsonb) { Value = !headers.IsEmpty ? headers : DBNull.Value });
command.Parameters.Add(new NpgsqlParameter("queue_name", NpgsqlDbType.Text) { Value = queueName });
+ command.Parameters.Add(
+ new NpgsqlParameter("scheduled_time", NpgsqlDbType.TimestampTz)
+ {
+ Value = scheduledTime.HasValue ? scheduledTime.Value.UtcDateTime : DBNull.Value
+ });
await command.ExecuteNonQueryAsync(cancellationToken);
}
diff --git a/src/Mocha/src/Mocha.Transport.Postgres/Mocha.Transport.Postgres.csproj b/src/Mocha/src/Mocha.Transport.Postgres/Mocha.Transport.Postgres.csproj
index a2cb0472642..e9a1153a4f4 100644
--- a/src/Mocha/src/Mocha.Transport.Postgres/Mocha.Transport.Postgres.csproj
+++ b/src/Mocha/src/Mocha.Transport.Postgres/Mocha.Transport.Postgres.csproj
@@ -6,6 +6,7 @@
+
diff --git a/src/Mocha/src/Mocha.Transport.Postgres/PostgresDispatchEndpoint.cs b/src/Mocha/src/Mocha.Transport.Postgres/PostgresDispatchEndpoint.cs
index 56c0f6e0a9c..d62cc1a24cf 100644
--- a/src/Mocha/src/Mocha.Transport.Postgres/PostgresDispatchEndpoint.cs
+++ b/src/Mocha/src/Mocha.Transport.Postgres/PostgresDispatchEndpoint.cs
@@ -52,6 +52,7 @@ protected override async ValueTask DispatchAsync(IDispatchContext context)
var feature = context.Features.GetOrSet();
var headers = WriteHeadersJson(feature, envelope);
var body = envelope.Body;
+ var scheduledTime = envelope.ScheduledTime;
if (Kind == DispatchEndpointKind.Reply)
{
@@ -71,13 +72,13 @@ protected override async ValueTask DispatchAsync(IDispatchContext context)
if (kind is "t")
{
- await messageStore.PublishAsync(body, headers, new string(name), cancellationToken);
+ await messageStore.PublishAsync(body, headers, new string(name), scheduledTime, cancellationToken);
return;
}
if (kind is "q")
{
- await messageStore.SendAsync(body, headers, new string(name), cancellationToken);
+ await messageStore.SendAsync(body, headers, new string(name), scheduledTime, cancellationToken);
return;
}
}
@@ -88,11 +89,11 @@ protected override async ValueTask DispatchAsync(IDispatchContext context)
if (Topic is not null)
{
- await messageStore.PublishAsync(body, headers, Topic.Name, cancellationToken);
+ await messageStore.PublishAsync(body, headers, Topic.Name, scheduledTime, cancellationToken);
}
else if (Queue is not null)
{
- await messageStore.SendAsync(body, headers, Queue.Name, cancellationToken);
+ await messageStore.SendAsync(body, headers, Queue.Name, scheduledTime, cancellationToken);
}
else
{
@@ -198,6 +199,11 @@ private static ReadOnlyMemory WriteHeadersJson(JsonHeadersFeature feature,
writer.WriteString(PostgresMessageHeaders.DeliverBy, deliverBy.ToString("O"));
}
+ if (envelope.ScheduledTime is { } scheduledTime)
+ {
+ writer.WriteString(PostgresMessageHeaders.ScheduledTime, scheduledTime.ToString("O"));
+ }
+
if (envelope.Headers is not null)
{
foreach (var header in envelope.Headers)
diff --git a/src/Mocha/src/Mocha.Transport.Postgres/PostgresMessageEnvelopeParser.cs b/src/Mocha/src/Mocha.Transport.Postgres/PostgresMessageEnvelopeParser.cs
index 35b031e4447..6c32c021b9b 100644
--- a/src/Mocha/src/Mocha.Transport.Postgres/PostgresMessageEnvelopeParser.cs
+++ b/src/Mocha/src/Mocha.Transport.Postgres/PostgresMessageEnvelopeParser.cs
@@ -30,6 +30,7 @@ public MessageEnvelope Parse(PostgresMessageItem messageItem)
string? contentType = null;
string? messageType = null;
DateTimeOffset? deliverBy = null;
+ DateTimeOffset? scheduledTime = null;
ImmutableArray? enclosedMessageTypes = null;
Headers? customHeaders = null;
@@ -121,11 +122,21 @@ public MessageEnvelope Parse(PostgresMessageItem messageItem)
reader.Read();
if (reader.TokenType == JsonTokenType.String
- && DateTimeOffset.TryParse(reader.GetString(), out var parsed))
+ && reader.TryGetDateTimeOffset(out var parsed))
{
deliverBy = parsed;
}
}
+ else if (reader.ValueTextEquals(PostgresMessageHeaders.ScheduledTime))
+ {
+ reader.Read();
+
+ if (reader.TokenType == JsonTokenType.String
+ && reader.TryGetDateTimeOffset(out var parsed))
+ {
+ scheduledTime = parsed;
+ }
+ }
else
{
var key = reader.GetString()!;
@@ -177,6 +188,7 @@ public MessageEnvelope Parse(PostgresMessageItem messageItem)
Headers = customHeaders ?? Headers.Empty(),
SentAt = new DateTimeOffset(messageItem.SentTime, TimeSpan.Zero),
DeliverBy = deliverBy,
+ ScheduledTime = scheduledTime,
DeliveryCount = messageItem.DeliveryCount,
Body = messageItem.Body
};
diff --git a/src/Mocha/src/Mocha.Transport.Postgres/PostgresMessageHeaders.cs b/src/Mocha/src/Mocha.Transport.Postgres/PostgresMessageHeaders.cs
index a577f6d6190..eb4eaf98700 100644
--- a/src/Mocha/src/Mocha.Transport.Postgres/PostgresMessageHeaders.cs
+++ b/src/Mocha/src/Mocha.Transport.Postgres/PostgresMessageHeaders.cs
@@ -64,4 +64,9 @@ internal static class PostgresMessageHeaders
/// Header key for the deadline by which the message must be delivered and processed.
///
public static ReadOnlySpan DeliverBy => "deliverBy"u8;
+
+ ///
+ /// Header key for the earliest time at which the message should be made available for consumption.
+ ///
+ public static ReadOnlySpan ScheduledTime => "scheduledTime"u8;
}
diff --git a/src/Mocha/src/Mocha.Transport.Postgres/PostgresMessagingTransport.cs b/src/Mocha/src/Mocha.Transport.Postgres/PostgresMessagingTransport.cs
index a7d50c3346e..224f0899bb8 100644
--- a/src/Mocha/src/Mocha.Transport.Postgres/PostgresMessagingTransport.cs
+++ b/src/Mocha/src/Mocha.Transport.Postgres/PostgresMessagingTransport.cs
@@ -1,6 +1,8 @@
using System.Diagnostics.CodeAnalysis;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
+using Mocha.Features;
+using Mocha.Scheduling;
using Mocha.Transport.Postgres.Tasks;
using static System.StringSplitOptions;
@@ -111,6 +113,8 @@ protected override void OnAfterInitialized(IMessagingSetupContext context)
{
_topology.AddSubscription(subscription);
}
+
+ Features.GetOrSet().SupportsSchedulingNatively = true;
}
///
diff --git a/src/Mocha/src/Mocha/Abstractions/Context/IDispatchContext.cs b/src/Mocha/src/Mocha/Abstractions/Context/IDispatchContext.cs
index 5d90db11674..39b3b80f0b2 100644
--- a/src/Mocha/src/Mocha/Abstractions/Context/IDispatchContext.cs
+++ b/src/Mocha/src/Mocha/Abstractions/Context/IDispatchContext.cs
@@ -88,6 +88,11 @@ public interface IDispatchContext : IExecutionContext, IFeatureProvider
///
DateTimeOffset? DeliverBy { get; set; }
+ ///
+ /// Gets or sets the optional time at which the message should be made available for consumption.
+ ///
+ DateTimeOffset? ScheduledTime { get; set; }
+
///
/// Gets the writable memory buffer used to hold the serialized message body.
///
diff --git a/src/Mocha/src/Mocha/Execution/PublishOptions.cs b/src/Mocha/src/Mocha/Execution/PublishOptions.cs
index f32e0d2edd4..9f7ec417216 100644
--- a/src/Mocha/src/Mocha/Execution/PublishOptions.cs
+++ b/src/Mocha/src/Mocha/Execution/PublishOptions.cs
@@ -6,7 +6,7 @@ namespace Mocha;
public readonly struct PublishOptions
{
///
- /// TODO this is currently not wired up
+ /// Gets the scheduled delivery time, or null for immediate delivery.
///
public DateTimeOffset? ScheduledTime { get; init; }
diff --git a/src/Mocha/src/Mocha/Extensions/TimeProviderExtensions.cs b/src/Mocha/src/Mocha/Extensions/TimeProviderExtensions.cs
new file mode 100644
index 00000000000..308ff77bf0d
--- /dev/null
+++ b/src/Mocha/src/Mocha/Extensions/TimeProviderExtensions.cs
@@ -0,0 +1,13 @@
+using Microsoft.Extensions.DependencyInjection;
+
+namespace Mocha;
+
+internal static class TimeProviderExtensions
+{
+ public static TimeProvider GetTimeProvider(this IServiceProvider serviceProvider)
+ {
+ var timeProvider = serviceProvider.GetService();
+
+ return timeProvider ?? TimeProvider.System;
+ }
+}
diff --git a/src/Mocha/src/Mocha/MessageBusSchedulingExtensions.cs b/src/Mocha/src/Mocha/MessageBusSchedulingExtensions.cs
new file mode 100644
index 00000000000..1d4f8ef2b29
--- /dev/null
+++ b/src/Mocha/src/Mocha/MessageBusSchedulingExtensions.cs
@@ -0,0 +1,39 @@
+namespace Mocha;
+
+///
+/// Provides convenience extension methods on for scheduling messages.
+///
+public static class MessageBusSchedulingExtensions
+{
+ ///
+ /// Sends a message scheduled for delivery at the specified absolute time.
+ ///
+ /// The message bus to send through.
+ /// The message instance to send.
+ /// The absolute time at which the message should be delivered.
+ /// A token to cancel the send operation.
+ /// A task that completes when the message has been handed off to the dispatch pipeline.
+ public static ValueTask ScheduleSendAsync(
+ this IMessageBus bus,
+ object message,
+ DateTimeOffset scheduledTime,
+ CancellationToken cancellationToken = default)
+ => bus.SendAsync(message, new SendOptions { ScheduledTime = scheduledTime }, cancellationToken);
+
+ ///
+ /// Publishes a message scheduled for delivery at the specified absolute time.
+ ///
+ /// The type of the message to publish.
+ /// The message bus to publish through.
+ /// The message instance to publish.
+ /// The absolute time at which the message should be delivered.
+ /// A token to cancel the publish operation.
+ /// A task that completes when the message has been handed off to the dispatch pipeline.
+ public static ValueTask SchedulePublishAsync(
+ this IMessageBus bus,
+ T message,
+ DateTimeOffset scheduledTime,
+ CancellationToken cancellationToken = default)
+ where T : notnull
+ => bus.PublishAsync(message, new PublishOptions { ScheduledTime = scheduledTime }, cancellationToken);
+}
diff --git a/src/Mocha/src/Mocha/Middlewares/DefaultMessageBus.cs b/src/Mocha/src/Mocha/Middlewares/DefaultMessageBus.cs
index b4e72a62cff..d83dc989013 100644
--- a/src/Mocha/src/Mocha/Middlewares/DefaultMessageBus.cs
+++ b/src/Mocha/src/Mocha/Middlewares/DefaultMessageBus.cs
@@ -65,6 +65,7 @@ public async ValueTask PublishAsync(T message, PublishOptions options, Cancel
context.Message = message;
context.AddHeaders(options.Headers);
context.Headers.SetMessageKind(MessageKind.Publish);
+ context.ScheduledTime = options.ScheduledTime;
context.DeliverBy = options.ExpirationTime;
await endpoint.ExecuteAsync(context);
@@ -118,8 +119,7 @@ public async ValueTask SendAsync(object message, SendOptions options, Cancellati
context.Headers.SetMessageKind(MessageKind.Send);
context.ResponseAddress = replyEndpoint;
context.FaultAddress = faultEndpoint;
- // TODO scheduling is currenlty not supported
- //context.ScheduledTime = options.ScheduledTime;
+ context.ScheduledTime = options.ScheduledTime;
context.DeliverBy = options.ExpirationTime;
await endpoint.ExecuteAsync(context);
@@ -259,8 +259,6 @@ private async ValueTask RequestAndWaitAsync(
// var operationName = $"send {endpoint}";
var correlationId = Guid.NewGuid().ToString();
- // var scheduledTime = options.ScheduledTime;
-
var headers = options.Headers;
var waitHandle = _deferredResponseManager.AddPromise(correlationId);
@@ -277,6 +275,7 @@ private async ValueTask RequestAndWaitAsync(
context.Headers.SetMessageKind(MessageKind.Request);
context.ResponseAddress = replyEndpoint ?? endpoint.Transport.ReplyReceiveEndpoint?.Source.Address;
context.FaultAddress = faultEndpoint;
+ context.ScheduledTime = options.ScheduledTime;
context.DeliverBy = options.ExpirationTime;
await endpoint.ExecuteAsync(context);
diff --git a/src/Mocha/src/Mocha/Middlewares/Dispatch/DispatchSerializerMiddleware.cs b/src/Mocha/src/Mocha/Middlewares/Dispatch/DispatchSerializerMiddleware.cs
index c2de9906a70..808d4707a6e 100644
--- a/src/Mocha/src/Mocha/Middlewares/Dispatch/DispatchSerializerMiddleware.cs
+++ b/src/Mocha/src/Mocha/Middlewares/Dispatch/DispatchSerializerMiddleware.cs
@@ -87,6 +87,7 @@ public static MessageEnvelope CreateEnvelope(this IDispatchContext context)
Host = context.Host,
SentAt = context.SentAt,
DeliverBy = context.DeliverBy,
+ ScheduledTime = context.ScheduledTime,
DeliveryCount = 0,
Headers = context.Headers,
Body = context.Body.WrittenMemory
diff --git a/src/Mocha/src/Mocha/Middlewares/DispatchContext.cs b/src/Mocha/src/Mocha/Middlewares/DispatchContext.cs
index 2a2c81def99..264c0a1cb54 100644
--- a/src/Mocha/src/Mocha/Middlewares/DispatchContext.cs
+++ b/src/Mocha/src/Mocha/Middlewares/DispatchContext.cs
@@ -151,6 +151,11 @@ public DispatchContext()
///
public DateTimeOffset? DeliverBy { get; set; }
+ ///
+ /// Gets or sets the optional time at which the message should be made available for consumption.
+ ///
+ public DateTimeOffset? ScheduledTime { get; set; }
+
///
/// Gets or sets the serialized message envelope, available after serialization middleware runs.
///
@@ -190,6 +195,7 @@ public void Reset()
FaultAddress = null!;
SentAt = DateTimeOffset.UtcNow;
DeliverBy = null;
+ ScheduledTime = null;
Host = null!;
Envelope = null!;
_headers.Clear();
diff --git a/src/Mocha/src/Mocha/Observability/SemanticConventions.cs b/src/Mocha/src/Mocha/Observability/SemanticConventions.cs
index a83971c046a..8276ddcb09d 100644
--- a/src/Mocha/src/Mocha/Observability/SemanticConventions.cs
+++ b/src/Mocha/src/Mocha/Observability/SemanticConventions.cs
@@ -263,8 +263,13 @@ public static Activity SetConversationId(this Activity activity, string? value)
/// The activity to enrich.
/// The message identifier to record.
/// The same instance for fluent chaining.
- public static Activity SetMessageId(this Activity activity, string value)
+ public static Activity SetMessageId(this Activity activity, string? value)
{
+ if (value is null)
+ {
+ return activity;
+ }
+
activity.SetTag(MessagingMessageId, value);
return activity;
}
diff --git a/src/Mocha/src/Mocha/Sagas/Descriptors/SagaLifeCycleDescriptorExtensions.cs b/src/Mocha/src/Mocha/Sagas/Descriptors/SagaLifeCycleDescriptorExtensions.cs
index 44f8a2bdcc3..f07e4391b13 100644
--- a/src/Mocha/src/Mocha/Sagas/Descriptors/SagaLifeCycleDescriptorExtensions.cs
+++ b/src/Mocha/src/Mocha/Sagas/Descriptors/SagaLifeCycleDescriptorExtensions.cs
@@ -22,16 +22,13 @@ public static ISagaLifeCycleDescriptor ScheduledPublish new PublishOptions { ScheduledTime = DateTimeOffset.UtcNow.Add(delay) }
- // };
+ var options = new SagaPublishOptions
+ {
+ ConfigureOptions = (ctx, _) =>
+ new PublishOptions { ScheduledTime = ctx.Services.GetTimeProvider().GetUtcNow().Add(delay) }
+ };
- // return descriptor.Publish((_, state) => factory(state), options);
-
- // TODO for this we need scheduling
- throw new NotImplementedException(
- "Scheduled publish is not yet implemented. This requires support for delayed message dispatching in the underlying messaging system.");
+ return descriptor.Publish((_, state) => factory(state), options);
}
///
@@ -50,15 +47,13 @@ public static ISagaLifeCycleDescriptor ScheduledSend(
where TMessage : notnull
where TState : SagaStateBase
{
- // var options = new SagaSendOptions
- // {
- // ConfigureOptions = (_, _) => new SendOptions { ScheduledTime = DateTimeOffset.UtcNow.Add(delay) }
- // };
+ var options = new SagaSendOptions
+ {
+ ConfigureOptions = (ctx, _) =>
+ new SendOptions { ScheduledTime = ctx.Services.GetTimeProvider().GetUtcNow().Add(delay) }
+ };
- // return descriptor.Send((_, state) => factory(state), options);
- // TODO for this we need scheduling
- throw new NotImplementedException(
- "Scheduled send is not yet implemented. This requires support for delayed message dispatching in the underlying messaging system.");
+ return descriptor.Send((_, state) => factory(state), options);
}
///
diff --git a/src/Mocha/src/Mocha/Sagas/Descriptors/SagaTransitionDescriptorExtensions.cs b/src/Mocha/src/Mocha/Sagas/Descriptors/SagaTransitionDescriptorExtensions.cs
index 3fbddd0ac2b..4912db80224 100644
--- a/src/Mocha/src/Mocha/Sagas/Descriptors/SagaTransitionDescriptorExtensions.cs
+++ b/src/Mocha/src/Mocha/Sagas/Descriptors/SagaTransitionDescriptorExtensions.cs
@@ -22,15 +22,13 @@ public static ISagaTransitionDescriptor ScheduledPublish factory)
where TMessage : notnull
{
- // var options = new SagaPublishOptions
- // {
- // ConfigureOptions = (_, _) => new PublishOptions { ScheduledTime = DateTimeOffset.UtcNow.Add(delay) }
- // };
+ var options = new SagaPublishOptions
+ {
+ ConfigureOptions = (ctx, _) =>
+ new PublishOptions { ScheduledTime = ctx.Services.GetTimeProvider().GetUtcNow().Add(delay) }
+ };
- // return descriptor.Publish((_, state) => factory(state), options);
- // TODO for this we need scheduling
- throw new NotImplementedException(
- "Scheduled publish is not yet implemented. This requires support for delayed message dispatching in the underlying messaging system.");
+ return descriptor.Publish((_, state) => factory(state), options);
}
///
@@ -49,15 +47,13 @@ public static ISagaTransitionDescriptor ScheduledSend factory)
where TMessage : notnull
{
- // var options = new SagaSendOptions
- // {
- // ConfigureOptions = (_, _) => new SendOptions { ScheduledTime = DateTimeOffset.UtcNow.Add(delay) }
- // };
+ var options = new SagaSendOptions
+ {
+ ConfigureOptions = (ctx, _) =>
+ new SendOptions { ScheduledTime = ctx.Services.GetTimeProvider().GetUtcNow().Add(delay) }
+ };
- // return descriptor.Send((_, state) => factory(state), options);
- // TODO for this we need scheduling
- throw new NotImplementedException(
- "Scheduled send is not yet implemented. This requires support for delayed message dispatching in the underlying messaging system.");
+ return descriptor.Send((_, state) => factory(state), options);
}
///
diff --git a/src/Mocha/src/Mocha/Transport/MessageEnvelope.cs b/src/Mocha/src/Mocha/Transport/MessageEnvelope.cs
index f5633848f3d..931ed6ff75b 100644
--- a/src/Mocha/src/Mocha/Transport/MessageEnvelope.cs
+++ b/src/Mocha/src/Mocha/Transport/MessageEnvelope.cs
@@ -36,6 +36,7 @@ public MessageEnvelope(MessageEnvelope envelope)
MessageType = envelope.MessageType;
SentAt = envelope.SentAt;
DeliverBy = envelope.DeliverBy;
+ ScheduledTime = envelope.ScheduledTime;
DeliveryCount = envelope.DeliveryCount;
Headers = envelope.Headers is not null ? new Headers(envelope.Headers) : null;
Body = envelope.Body;
@@ -105,6 +106,11 @@ public MessageEnvelope(MessageEnvelope envelope)
///
public DateTimeOffset? DeliverBy { get; init; }
+ ///
+ /// The earliest time at which the message should be made available for consumption.
+ ///
+ public DateTimeOffset? ScheduledTime { get; init; }
+
///
/// Delivery attempt counter.
///
@@ -172,6 +178,9 @@ public static class Properties
/// Property name for .
public const string DeliverBy = "deliverBy";
+ /// Property name for .
+ public const string ScheduledTime = "scheduledTime";
+
/// Property name for .
public const string DeliveryCount = "deliveryCount";
diff --git a/src/Mocha/src/Mocha/Transport/MessageEnvelopeReader.cs b/src/Mocha/src/Mocha/Transport/MessageEnvelopeReader.cs
index 35402c72c84..918f648710e 100644
--- a/src/Mocha/src/Mocha/Transport/MessageEnvelopeReader.cs
+++ b/src/Mocha/src/Mocha/Transport/MessageEnvelopeReader.cs
@@ -87,6 +87,11 @@ public void WriteMessage(MessageEnvelope envelope)
writer.WriteString(MessageEnvelope.Properties.DeliverBy, envelope.DeliverBy.Value);
}
+ if (envelope.ScheduledTime is not null)
+ {
+ writer.WriteString(MessageEnvelope.Properties.ScheduledTime, envelope.ScheduledTime.Value);
+ }
+
if (envelope.DeliveryCount is not null)
{
writer.WriteNumber(MessageEnvelope.Properties.DeliveryCount, envelope.DeliveryCount.Value);
@@ -138,6 +143,7 @@ public static MessageEnvelope Parse(ReadOnlyMemory body)
private ImmutableArray? _enclosedMessageTypes;
private DateTimeOffset? _sentAt;
private DateTimeOffset? _deliverBy;
+ private DateTimeOffset? _scheduledTime;
private int _attempt;
private IHeaders? _headers;
private ReadOnlyMemory _body;
@@ -220,6 +226,11 @@ public MessageEnvelope ReadMessage()
reader.Read();
_deliverBy = reader.GetDateTimeOffset();
}
+ else if (reader.ValueTextEquals(MessageEnvelope.Properties.ScheduledTime))
+ {
+ reader.Read();
+ _scheduledTime = reader.GetDateTimeOffset();
+ }
else if (reader.ValueTextEquals(MessageEnvelope.Properties.DeliveryCount))
{
reader.Read();
@@ -276,6 +287,7 @@ public MessageEnvelope ReadMessage()
EnclosedMessageTypes = _enclosedMessageTypes,
SentAt = _sentAt,
DeliverBy = _deliverBy,
+ ScheduledTime = _scheduledTime,
DeliveryCount = _attempt,
Headers = _headers,
Body = _body
diff --git a/src/Mocha/test/Mocha.EntityFrameworkCore.Postgres.Tests/Helpers/TestDbContext.cs b/src/Mocha/test/Mocha.EntityFrameworkCore.Postgres.Tests/Helpers/TestDbContext.cs
index 960aa91014b..0a571ca148c 100644
--- a/src/Mocha/test/Mocha.EntityFrameworkCore.Postgres.Tests/Helpers/TestDbContext.cs
+++ b/src/Mocha/test/Mocha.EntityFrameworkCore.Postgres.Tests/Helpers/TestDbContext.cs
@@ -2,6 +2,7 @@
using Mocha.Inbox;
using Mocha.Outbox;
using Mocha.Sagas.EfCore;
+using Mocha.Scheduling;
namespace Mocha.EntityFrameworkCore.Postgres.Tests.Helpers;
@@ -12,5 +13,6 @@ protected override void OnModelCreating(ModelBuilder modelBuilder)
modelBuilder.AddPostgresInbox();
modelBuilder.AddPostgresOutbox();
modelBuilder.AddPostgresSagas();
+ modelBuilder.AddPostgresScheduledMessages();
}
}
diff --git a/src/Mocha/test/Mocha.EntityFrameworkCore.Postgres.Tests/Mocha.EntityFrameworkCore.Postgres.Tests.csproj b/src/Mocha/test/Mocha.EntityFrameworkCore.Postgres.Tests/Mocha.EntityFrameworkCore.Postgres.Tests.csproj
index 1db0c7b2bef..47af3cd4731 100644
--- a/src/Mocha/test/Mocha.EntityFrameworkCore.Postgres.Tests/Mocha.EntityFrameworkCore.Postgres.Tests.csproj
+++ b/src/Mocha/test/Mocha.EntityFrameworkCore.Postgres.Tests/Mocha.EntityFrameworkCore.Postgres.Tests.csproj
@@ -16,5 +16,6 @@
+
diff --git a/src/Mocha/test/Mocha.EntityFrameworkCore.Postgres.Tests/PostgresOutboxIntegrationTests.cs b/src/Mocha/test/Mocha.EntityFrameworkCore.Postgres.Tests/PostgresOutboxIntegrationTests.cs
index 162918a9316..39ae7974e5a 100644
--- a/src/Mocha/test/Mocha.EntityFrameworkCore.Postgres.Tests/PostgresOutboxIntegrationTests.cs
+++ b/src/Mocha/test/Mocha.EntityFrameworkCore.Postgres.Tests/PostgresOutboxIntegrationTests.cs
@@ -1,5 +1,6 @@
using System.Collections.Concurrent;
using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Diagnostics;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Mocha.EntityFrameworkCore.Postgres.Tests.Helpers;
@@ -102,7 +103,8 @@ public async Task Outbox_Should_ProcessPendingMessages_When_WorkerStartsAfterPer
var services = new ServiceCollection();
services.AddSingleton(recorder);
services.AddLogging();
- services.AddDbContext(o => o.UseNpgsql(connectionString));
+ services.AddDbContext(o => o.UseNpgsql(connectionString)
+ .ConfigureWarnings(w => w.Ignore(CoreEventId.ManyServiceProvidersCreatedWarning)));
services.AddSingleton();
var builder = services.AddMessageBus();
@@ -178,7 +180,8 @@ public async Task Outbox_Should_ResumeProcessing_When_WorkerRestartedAfterInterr
var services = new ServiceCollection();
services.AddSingleton(recorder);
services.AddLogging();
- services.AddDbContext(o => o.UseNpgsql(connectionString));
+ services.AddDbContext(o => o.UseNpgsql(connectionString)
+ .ConfigureWarnings(w => w.Ignore(CoreEventId.ManyServiceProvidersCreatedWarning)));
services.AddSingleton();
var builder = services.AddMessageBus();
@@ -329,7 +332,8 @@ private async Task CreateBusWithOutboxAsync(MessageRecorder rec
var services = new ServiceCollection();
services.AddSingleton(recorder);
services.AddLogging();
- services.AddDbContext(o => o.UseNpgsql(connectionString));
+ services.AddDbContext(o => o.UseNpgsql(connectionString)
+ .ConfigureWarnings(w => w.Ignore(CoreEventId.ManyServiceProvidersCreatedWarning)));
// Register the resilient signal BEFORE UsePostgresOutbox() so that
// TryAddSingleton in AddOutboxCore() is a no-op.
diff --git a/src/Mocha/test/Mocha.EntityFrameworkCore.Postgres.Tests/PostgresSchedulingIntegrationTests.cs b/src/Mocha/test/Mocha.EntityFrameworkCore.Postgres.Tests/PostgresSchedulingIntegrationTests.cs
new file mode 100644
index 00000000000..8c9e6a8f1c3
--- /dev/null
+++ b/src/Mocha/test/Mocha.EntityFrameworkCore.Postgres.Tests/PostgresSchedulingIntegrationTests.cs
@@ -0,0 +1,548 @@
+using System.Collections.Concurrent;
+using System.Text.Json;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Diagnostics;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Mocha.EntityFrameworkCore.Postgres.Tests.Helpers;
+using Mocha.Scheduling;
+using Mocha.Transport.InMemory;
+
+namespace Mocha.EntityFrameworkCore.Postgres.Tests;
+
+public sealed class PostgresSchedulingIntegrationTests(PostgresFixture fixture) : IClassFixture
+{
+ private static readonly TimeSpan s_timeout = TimeSpan.FromSeconds(30);
+
+ [Fact]
+ public async Task Scheduler_Should_DispatchMessage_When_ScheduledTimeReached()
+ {
+ // Arrange
+ var recorder = new MessageRecorder();
+ await using var env = await CreateBusWithSchedulingAsync(recorder);
+
+ using var scope = env.Provider.CreateScope();
+ var bus = scope.ServiceProvider.GetRequiredService();
+
+ // Act
+ await bus.PublishAsync(
+ new TestEvent { Payload = "hello" },
+ new PublishOptions { ScheduledTime = TimeProvider.System.GetUtcNow() },
+ default);
+
+ // Assert
+ Assert.True(await recorder.WaitAsync(s_timeout), "Handler should have received the scheduled message");
+ var received = Assert.Single(recorder.Messages.OfType());
+ Assert.Equal("hello", received.Payload);
+ }
+
+ [Fact]
+ public async Task Scheduler_Should_DispatchAllMessages_When_MultipleMessagesScheduled()
+ {
+ // Arrange
+ const int count = 5;
+ var recorder = new MessageRecorder();
+ await using var env = await CreateBusWithSchedulingAsync(recorder);
+
+ using var scope = env.Provider.CreateScope();
+ var bus = scope.ServiceProvider.GetRequiredService();
+
+ // Act
+ for (var i = 0; i < count; i++)
+ {
+ await bus.PublishAsync(
+ new TestEvent { Payload = $"msg-{i}" },
+ new PublishOptions { ScheduledTime = TimeProvider.System.GetUtcNow() },
+ default);
+ }
+
+ // Assert
+ Assert.True(
+ await recorder.WaitAsync(s_timeout, count),
+ $"Handler should have received all {count} scheduled messages");
+ var payloads = recorder.Messages.OfType().Select(e => e.Payload).OrderBy(p => p).ToList();
+ Assert.Equal(count, payloads.Count);
+ for (var i = 0; i < count; i++)
+ {
+ Assert.Contains($"msg-{i}", payloads);
+ }
+ }
+
+ [Fact]
+ public async Task Scheduler_Should_ProcessPendingMessages_When_WorkerStartsAfterPersist()
+ {
+ // Arrange - persist messages before the worker starts
+ const int count = 3;
+ var connectionString = await fixture.CreateDatabaseAsync();
+ var recorder = new MessageRecorder();
+
+ // Phase 1: build bus but don't start the hosted services (worker)
+ var services = new ServiceCollection();
+ services.AddSingleton(recorder);
+ services.AddLogging();
+ services.AddDbContext(o => o.UseNpgsql(connectionString)
+ .ConfigureWarnings(w => w.Ignore(CoreEventId.ManyServiceProvidersCreatedWarning)));
+ services.AddSingleton(new ResilientSchedulerSignal());
+
+ var builder = services.AddMessageBus();
+ builder.AddEntityFramework(ef => ef.UsePostgresScheduling());
+ builder.AddEventHandler();
+ builder.AddInMemory();
+
+ var provider = services.BuildServiceProvider();
+ var runtime = (MessagingRuntime)provider.GetRequiredService();
+ await runtime.StartAsync(default);
+
+ // Ensure schema exists
+ using (var scope = provider.CreateScope())
+ {
+ var db = scope.ServiceProvider.GetRequiredService();
+ await db.Database.EnsureCreatedAsync(default);
+ }
+
+ // Persist messages via IMessageBus (scheduling middleware captures them)
+ for (var i = 0; i < count; i++)
+ {
+ using var scope = provider.CreateScope();
+ var bus = scope.ServiceProvider.GetRequiredService();
+ await bus.PublishAsync(
+ new TestEvent { Payload = $"pending-{i}" },
+ new PublishOptions { ScheduledTime = TimeProvider.System.GetUtcNow() },
+ default);
+ }
+
+ // Phase 2: start the scheduling worker (hosted services)
+ var hostedServices = provider.GetServices().ToList();
+ foreach (var svc in hostedServices)
+ {
+ await svc.StartAsync(default);
+ }
+
+ try
+ {
+ // Assert - all pre-existing messages are processed
+ Assert.True(
+ await recorder.WaitAsync(s_timeout, count),
+ "Worker should process messages that were persisted before it started");
+
+ var payloads = recorder.Messages.OfType().Select(e => e.Payload).ToHashSet();
+ Assert.Equal(count, payloads.Count);
+ }
+ finally
+ {
+ foreach (var svc in hostedServices)
+ {
+ await svc.StopAsync(default);
+ }
+
+ // Allow in-flight processor transactions to drain (see TestEnvironment comment)
+ await Task.Delay(250, default);
+
+ await provider.DisposeAsync();
+ }
+ }
+
+ [Fact]
+ public async Task Scheduler_Should_DeleteMessage_When_DispatchSucceeds()
+ {
+ // Arrange
+ var recorder = new MessageRecorder();
+ await using var env = await CreateBusWithSchedulingAsync(recorder);
+
+ using var scope = env.Provider.CreateScope();
+ var bus = scope.ServiceProvider.GetRequiredService();
+
+ // Act
+ await bus.PublishAsync(
+ new TestEvent { Payload = "delete-me" },
+ new PublishOptions { ScheduledTime = TimeProvider.System.GetUtcNow() },
+ default);
+
+ // Wait for handler to receive the message
+ Assert.True(await recorder.WaitAsync(s_timeout), "Handler should have received the scheduled message");
+
+ // Assert - the row should have been deleted after successful dispatch.
+ // Give a brief moment for the DELETE to commit after the handler returns.
+ await Task.Delay(500);
+
+ using var verifyScope = env.Provider.CreateScope();
+ var db = verifyScope.ServiceProvider.GetRequiredService();
+ var remaining = await db.Database
+ .SqlQueryRaw(
+ "SELECT CAST(COUNT(*) AS INTEGER) AS \"Value\" FROM \"scheduled_messages\"")
+ .SingleAsync();
+ Assert.Equal(0, remaining);
+ }
+
+ [Fact]
+ public async Task Scheduler_Should_RecordLastError_When_DispatchFails()
+ {
+ // Arrange - use a dispatch middleware that always throws AFTER the scheduling middleware.
+ // During initial publish the scheduling middleware intercepts (never calling next),
+ // so the throwing middleware is inactive. During re-dispatch from the worker the
+ // scheduling middleware is skipped, causing the throwing middleware to fire.
+ var recorder = new MessageRecorder();
+ await using var env = await CreateBusWithSchedulingAsync(recorder, AddFailingDispatchMiddleware);
+
+ using (var scope = env.Provider.CreateScope())
+ {
+ var bus = scope.ServiceProvider.GetRequiredService();
+ await bus.PublishAsync(
+ new TestEvent { Payload = "will-fail" },
+ new PublishOptions { ScheduledTime = TimeProvider.System.GetUtcNow() },
+ default);
+ }
+
+ // Wait for at least one dispatch attempt to record the error
+ using var waitCts = new CancellationTokenSource(s_timeout);
+
+ while (!waitCts.Token.IsCancellationRequested)
+ {
+ await Task.Delay(250, waitCts.Token);
+
+ using var scope = env.Provider.CreateScope();
+ var db = scope.ServiceProvider.GetRequiredService();
+ var rows = await db.Database
+ .SqlQueryRaw(
+ "SELECT \"last_error\"::text AS \"Value\" FROM \"scheduled_messages\" LIMIT 1")
+ .ToListAsync(waitCts.Token);
+
+ if (rows.Count > 0 && rows[0] is not null)
+ {
+ // Assert - parse the JSON error
+ using var doc = JsonDocument.Parse(rows[0]!);
+ Assert.True(
+ doc.RootElement.TryGetProperty("message", out _),
+ "last_error should contain 'message'");
+ Assert.True(
+ doc.RootElement.TryGetProperty("exceptionType", out _),
+ "last_error should contain 'exceptionType'");
+ return;
+ }
+ }
+
+ Assert.Fail("Timed out waiting for last_error to be recorded");
+ }
+
+ [Fact]
+ public async Task Scheduler_Should_IncrementTimesSent_When_DispatchFails()
+ {
+ // Arrange - same failing middleware approach as RecordLastError test
+ var recorder = new MessageRecorder();
+ await using var env = await CreateBusWithSchedulingAsync(recorder, AddFailingDispatchMiddleware);
+
+ using (var scope = env.Provider.CreateScope())
+ {
+ var bus = scope.ServiceProvider.GetRequiredService();
+ await bus.PublishAsync(
+ new TestEvent { Payload = "always-fails" },
+ new PublishOptions { ScheduledTime = TimeProvider.System.GetUtcNow() },
+ default);
+ }
+
+ // Wait for at least 2 dispatch attempts
+ using var waitCts = new CancellationTokenSource(s_timeout);
+
+ while (!waitCts.Token.IsCancellationRequested)
+ {
+ await Task.Delay(250, waitCts.Token);
+
+ using var scope = env.Provider.CreateScope();
+ var db = scope.ServiceProvider.GetRequiredService();
+ var timesSent = await db.Database
+ .SqlQueryRaw(
+ "SELECT CAST(\"times_sent\" AS INTEGER) AS \"Value\" FROM \"scheduled_messages\" LIMIT 1")
+ .FirstOrDefaultAsync(waitCts.Token);
+
+ if (timesSent >= 2)
+ {
+ return;
+ }
+ }
+
+ Assert.Fail("Timed out waiting for times_sent to reach 2");
+ }
+
+ private static void AddFailingDispatchMiddleware(IMessageBusHostBuilder builder)
+ {
+ builder.ConfigureMessageBus(h =>
+ h.UseDispatch(new DispatchMiddlewareConfiguration(
+ static (_, _) => static _ =>
+ throw new InvalidOperationException("Simulated dispatch failure"),
+ "FailingTransport")));
+ }
+
+ [Fact]
+ public async Task Scheduler_Should_ProcessNewMessages_When_PublishedWhileWorkerRunning()
+ {
+ // Arrange
+ var recorder = new MessageRecorder();
+ await using var env = await CreateBusWithSchedulingAsync(recorder);
+
+ // Act - publish messages at intervals while worker is running
+ for (var i = 0; i < 5; i++)
+ {
+ using var scope = env.Provider.CreateScope();
+ var bus = scope.ServiceProvider.GetRequiredService();
+ await bus.PublishAsync(
+ new TestEvent { Payload = $"live-{i}" },
+ new PublishOptions { ScheduledTime = TimeProvider.System.GetUtcNow() },
+ default);
+
+ // Wait for this message to be delivered before publishing the next
+ Assert.True(
+ await recorder.WaitAsync(s_timeout),
+ $"Message live-{i} should be delivered while worker is running");
+ }
+
+ // Assert
+ var payloads = recorder.Messages.OfType().Select(e => e.Payload).ToHashSet();
+ Assert.Equal(5, payloads.Count);
+ }
+
+ [Fact]
+ public async Task Scheduler_Should_HandleConcurrentPublishers_When_MultipleScopes()
+ {
+ // Arrange
+ const int scopeCount = 10;
+ const int messagesPerScope = 5;
+ const int totalMessages = scopeCount * messagesPerScope;
+ var recorder = new MessageRecorder();
+ await using var env = await CreateBusWithSchedulingAsync(recorder);
+
+ // Act - multiple scopes publishing simultaneously
+ var tasks = Enumerable
+ .Range(0, scopeCount)
+ .Select(async scopeIndex =>
+ {
+ for (var i = 0; i < messagesPerScope; i++)
+ {
+ using var scope = env.Provider.CreateScope();
+ var bus = scope.ServiceProvider.GetRequiredService();
+ await bus.PublishAsync(
+ new TestEvent { Payload = $"scope-{scopeIndex}-msg-{i}" },
+ new PublishOptions { ScheduledTime = TimeProvider.System.GetUtcNow() },
+ default);
+ }
+ });
+ await Task.WhenAll(tasks);
+
+ // Assert
+ Assert.True(
+ await recorder.WaitAsync(s_timeout, totalMessages),
+ $"All {totalMessages} messages from {scopeCount} scopes should be delivered");
+
+ var payloads = recorder.Messages.OfType().Select(e => e.Payload).ToHashSet();
+ Assert.Equal(totalMessages, payloads.Count);
+ }
+
+ private async Task CreateBusWithSchedulingAsync(
+ MessageRecorder recorder,
+ Action? configure = null)
+ {
+ var connectionString = await fixture.CreateDatabaseAsync();
+
+ var services = new ServiceCollection();
+ services.AddSingleton(recorder);
+ services.AddLogging();
+ services.AddDbContext(o => o.UseNpgsql(connectionString)
+ .ConfigureWarnings(w => w.Ignore(CoreEventId.ManyServiceProvidersCreatedWarning)));
+
+ // Register the resilient signal BEFORE UsePostgresScheduling() so that
+ // TryAddSingleton in UseSchedulerCore() is a no-op.
+ // This prevents ObjectDisposedException during teardown when the
+ // dispatcher's own transactions fire the interceptor.
+ services.AddSingleton(new ResilientSchedulerSignal());
+
+ var builder = services.AddMessageBus();
+ builder.AddEntityFramework(ef => ef.UsePostgresScheduling());
+ builder.AddEventHandler();
+ builder.AddInMemory();
+
+ configure?.Invoke(builder);
+
+ var provider = services.BuildServiceProvider();
+ var runtime = (MessagingRuntime)provider.GetRequiredService();
+ await runtime.StartAsync(default);
+
+ // Ensure schema exists
+ using (var scope = provider.CreateScope())
+ {
+ var db = scope.ServiceProvider.GetRequiredService();
+ await db.Database.EnsureCreatedAsync(default);
+ }
+
+ // Start hosted services (scheduling worker)
+ var hostedServices = provider.GetServices().ToList();
+ foreach (var svc in hostedServices)
+ {
+ await svc.StartAsync(default);
+ }
+
+ return new TestEnvironment(provider, hostedServices);
+ }
+
+ public sealed class TestEvent
+ {
+ public required string Payload { get; init; }
+ }
+
+ public sealed class TestEventHandler(MessageRecorder recorder) : IEventHandler
+ {
+ public ValueTask HandleAsync(TestEvent message, CancellationToken cancellationToken)
+ {
+ recorder.Record(message);
+ return default;
+ }
+ }
+
+ public sealed class MessageRecorder
+ {
+ private readonly SemaphoreSlim _semaphore = new(0);
+
+ public ConcurrentBag