diff --git a/src/Mocha/Mocha.slnx b/src/Mocha/Mocha.slnx
index cf3a4327070..a7a7157c4f5 100644
--- a/src/Mocha/Mocha.slnx
+++ b/src/Mocha/Mocha.slnx
@@ -18,10 +18,7 @@
-
-
-
diff --git a/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Mocha.EntityFrameworkCore.Postgres.csproj b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Mocha.EntityFrameworkCore.Postgres.csproj
index 558a513a246..2022bdad08d 100644
--- a/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Mocha.EntityFrameworkCore.Postgres.csproj
+++ b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Mocha.EntityFrameworkCore.Postgres.csproj
@@ -8,7 +8,6 @@
-
diff --git a/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/EfCoreScheduledMessageStore.cs b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/EfCoreScheduledMessageStore.cs
index b09b5e5d192..d3c0d40a723 100644
--- a/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/EfCoreScheduledMessageStore.cs
+++ b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/EfCoreScheduledMessageStore.cs
@@ -15,35 +15,32 @@ namespace Mocha.Scheduling;
/// Implements for Postgres by inserting serialized message envelopes
/// into the scheduled messages table using raw SQL through the DbContext Npgsql connection.
///
-internal sealed class EfCoreScheduledMessageStore : IScheduledMessageStore, IDisposable
+///
+/// Creates a new using the provided DbContext connection,
+/// scheduler signal, and pre-built SQL statements.
+///
+/// The DbContext whose underlying Npgsql connection is used for operations.
+/// The signal used to wake the scheduler after a message is persisted.
+/// The parameterized SQL insert statement for the scheduled messages table.
+/// The parameterized SQL delete statement for cancelling a scheduled message.
+internal sealed class EfCoreScheduledMessageStore(
+ DbContext originalDbContext,
+ ISchedulerSignal signal,
+ string insertSql,
+ string cancelSql) : IScheduledMessageStore, IDisposable
{
- private readonly DbContext _originalDbContext;
- private readonly ISchedulerSignal _signal;
+ private const string ProviderPrefix = "postgres-scheduler:";
private readonly SemaphoreSlim _semaphore = new(1, 1);
- private readonly string? _insertSql;
private PooledArrayWriter? _arrayWriter;
- ///
- /// Creates a new using the provided DbContext connection,
- /// scheduler signal, and pre-built insert SQL.
- ///
- /// The DbContext whose underlying Npgsql connection is used for inserts.
- /// The signal used to wake the scheduler after a message is persisted.
- /// The parameterized SQL insert statement for the scheduled messages table.
- public EfCoreScheduledMessageStore(DbContext originalDbContext, ISchedulerSignal signal, string insertSql)
- {
- _originalDbContext = originalDbContext;
- _signal = signal;
- _insertSql = insertSql;
- }
-
///
/// Serializes the message envelope and inserts it into the Postgres scheduled messages table.
///
/// The message envelope to persist.
/// The time at which the message should be dispatched.
/// A token to observe for cancellation.
- public async ValueTask PersistAsync(
+ /// An opaque token string for later cancellation.
+ public async ValueTask PersistAsync(
MessageEnvelope envelope,
DateTimeOffset scheduledTime,
CancellationToken cancellationToken)
@@ -54,27 +51,28 @@ public async ValueTask PersistAsync(
{
_arrayWriter ??= new PooledArrayWriter();
- var connection = (NpgsqlConnection)_originalDbContext.Database.GetDbConnection();
+ var connection = (NpgsqlConnection)originalDbContext.Database.GetDbConnection();
if (connection.State != ConnectionState.Open)
{
await connection.OpenAsync(cancellationToken);
}
- var transaction = _originalDbContext.Database.CurrentTransaction?.GetDbTransaction() as NpgsqlTransaction;
+ var transaction = originalDbContext.Database.CurrentTransaction?.GetDbTransaction() as NpgsqlTransaction;
await using var writer = new Utf8JsonWriter(_arrayWriter);
writer.WriteEnvelope(envelope);
writer.Flush(); // we know it's not async
// Execute the INSERT command
+ var id = NewVersion();
await using var command = connection.CreateCommand();
- command.CommandText = _insertSql;
+ command.CommandText = insertSql;
if (transaction is not null)
{
command.Transaction = transaction;
}
- command.Parameters.AddWithValue("@id", NewVersion());
+ command.Parameters.AddWithValue("@id", id);
command.Parameters.Add(
new NpgsqlParameter("@envelope", NpgsqlDbType.Json) { Value = _arrayWriter.WrittenMemory });
command.Parameters.AddWithValue("@scheduled_time", scheduledTime.UtcDateTime);
@@ -84,8 +82,10 @@ public async ValueTask PersistAsync(
if (transaction is null)
{
- _signal.Notify(scheduledTime);
+ signal.Notify(scheduledTime);
}
+
+ return $"{ProviderPrefix}{id}";
}
finally
{
@@ -94,6 +94,54 @@ public async ValueTask PersistAsync(
}
}
+ ///
+ /// Cancels a scheduled message by deleting it from the store.
+ ///
+ /// The opaque scheduling token returned by .
+ /// A token to observe for cancellation.
+ /// true if the message was cancelled; false if not found or already dispatched.
+ public async ValueTask CancelAsync(string token, CancellationToken cancellationToken)
+ {
+ var value = token.StartsWith(ProviderPrefix, StringComparison.Ordinal)
+ ? token[ProviderPrefix.Length..]
+ : token;
+
+ if (!Guid.TryParse(value, out var id))
+ {
+ return false;
+ }
+
+ await _semaphore.WaitAsync(cancellationToken);
+
+ try
+ {
+ var connection = (NpgsqlConnection)originalDbContext.Database.GetDbConnection();
+
+ if (connection.State != ConnectionState.Open)
+ {
+ await connection.OpenAsync(cancellationToken);
+ }
+
+ var transaction = originalDbContext.Database.CurrentTransaction?.GetDbTransaction() as NpgsqlTransaction;
+
+ await using var command = connection.CreateCommand();
+ command.CommandText = cancelSql;
+ if (transaction is not null)
+ {
+ command.Transaction = transaction;
+ }
+ command.Parameters.AddWithValue("@id", id);
+ await command.PrepareAsync(cancellationToken);
+
+ var result = await command.ExecuteScalarAsync(cancellationToken);
+ return result is not null and not DBNull;
+ }
+ finally
+ {
+ _semaphore.Release();
+ }
+ }
+
private static Guid NewVersion()
{
#if NET9_0_OR_GREATER
@@ -127,8 +175,9 @@ public static EfCoreScheduledMessageStore Create(Type contextType, string option
var optionsMonitor = services.GetRequiredService>();
var options = optionsMonitor.Get(optionsName);
var insertSql = options.Queries.InsertMessage;
+ var cancelSql = options.Queries.CancelMessage;
- return new EfCoreScheduledMessageStore(dbContext, signal, insertSql);
+ return new EfCoreScheduledMessageStore(dbContext, signal, insertSql, cancelSql);
}
}
diff --git a/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/ScheduledMessageQueries.cs b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/ScheduledMessageQueries.cs
index 7a975019b2b..2e707bd70bd 100644
--- a/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/ScheduledMessageQueries.cs
+++ b/src/Mocha/src/Mocha.EntityFrameworkCore.Postgres/Scheduling/ScheduledMessageQueries.cs
@@ -34,6 +34,12 @@ internal sealed class ScheduledMessageQueries
///
public string UpdateLastError { get; set; } = null!;
+ ///
+ /// Gets or sets the SQL statement to cancel a scheduled message by deleting it if it has not
+ /// exceeded its maximum delivery attempts.
+ ///
+ public string CancelMessage { get; set; } = null!;
+
///
/// Creates a new instance with SQL queries built from the provided table metadata.
///
@@ -82,6 +88,13 @@ DELETE FROM {t.QualifiedTableName}
UPDATE {t.QualifiedTableName}
SET "{t.LastError}" = @last_error::jsonb
WHERE "{t.Id}" = @id;
+ """,
+
+ CancelMessage = $"""
+ DELETE FROM {t.QualifiedTableName}
+ WHERE "{t.Id}" = @id
+ AND "{t.TimesSent}" < "{t.MaxAttempts}"
+ RETURNING "{t.Id}";
"""
};
}
diff --git a/src/Mocha/src/Mocha.EntityFrameworkCore/Mocha.EntityFrameworkCore.csproj b/src/Mocha/src/Mocha.EntityFrameworkCore/Mocha.EntityFrameworkCore.csproj
index 54d0c229cf2..6e1118ae5b5 100644
--- a/src/Mocha/src/Mocha.EntityFrameworkCore/Mocha.EntityFrameworkCore.csproj
+++ b/src/Mocha/src/Mocha.EntityFrameworkCore/Mocha.EntityFrameworkCore.csproj
@@ -7,7 +7,6 @@
-
diff --git a/src/Mocha/src/Mocha.Scheduling/Assembly.cs b/src/Mocha/src/Mocha.Scheduling/Assembly.cs
deleted file mode 100644
index e830a08b3a0..00000000000
--- a/src/Mocha/src/Mocha.Scheduling/Assembly.cs
+++ /dev/null
@@ -1,3 +0,0 @@
-using System.Runtime.CompilerServices;
-
-[assembly: InternalsVisibleTo("Mocha.Tests")]
diff --git a/src/Mocha/src/Mocha.Scheduling/Mocha.Scheduling.csproj b/src/Mocha/src/Mocha.Scheduling/Mocha.Scheduling.csproj
deleted file mode 100644
index ec3dc85c831..00000000000
--- a/src/Mocha/src/Mocha.Scheduling/Mocha.Scheduling.csproj
+++ /dev/null
@@ -1,10 +0,0 @@
-
-
- Mocha.Scheduling
- Mocha.Scheduling
-
-
-
-
-
-
diff --git a/src/Mocha/src/Mocha.Transport.Postgres/Mocha.Transport.Postgres.csproj b/src/Mocha/src/Mocha.Transport.Postgres/Mocha.Transport.Postgres.csproj
index e9a1153a4f4..a2cb0472642 100644
--- a/src/Mocha/src/Mocha.Transport.Postgres/Mocha.Transport.Postgres.csproj
+++ b/src/Mocha/src/Mocha.Transport.Postgres/Mocha.Transport.Postgres.csproj
@@ -6,7 +6,6 @@
-
diff --git a/src/Mocha/src/Mocha.Transport.Postgres/PostgresMessagingTransport.cs b/src/Mocha/src/Mocha.Transport.Postgres/PostgresMessagingTransport.cs
index 224f0899bb8..434a2e3cf1d 100644
--- a/src/Mocha/src/Mocha.Transport.Postgres/PostgresMessagingTransport.cs
+++ b/src/Mocha/src/Mocha.Transport.Postgres/PostgresMessagingTransport.cs
@@ -114,7 +114,7 @@ protected override void OnAfterInitialized(IMessagingSetupContext context)
_topology.AddSubscription(subscription);
}
- Features.GetOrSet().SupportsSchedulingNatively = true;
+ Features.Configure(f => f.SupportsSchedulingNatively = true);
}
///
diff --git a/src/Mocha/src/Mocha.Utilities/Features/FeatureCollectionExtensions.cs b/src/Mocha/src/Mocha.Utilities/Features/FeatureCollectionExtensions.cs
index 288e4442928..f40bbcb9f70 100644
--- a/src/Mocha/src/Mocha.Utilities/Features/FeatureCollectionExtensions.cs
+++ b/src/Mocha/src/Mocha.Utilities/Features/FeatureCollectionExtensions.cs
@@ -83,6 +83,24 @@ public static TFeature GetOrSet(
return feature;
}
+ ///
+ /// Gets or creates a feature and applies a configuration action to it.
+ ///
+ /// The feature key.
+ /// The feature collection.
+ /// The action to apply to the feature.
+ /// The configured feature.
+ public static TFeature Configure(
+ this IFeatureCollection featureCollection,
+ Action configure) where TFeature : new()
+ {
+ ArgumentNullException.ThrowIfNull(configure);
+
+ var feature = featureCollection.GetOrSet();
+ configure(feature);
+ return feature;
+ }
+
///
/// Updates a feature in the collection by applying a transformation function to the existing value.
///
diff --git a/src/Mocha/src/Mocha/Features/ScheduledMessageFeature.cs b/src/Mocha/src/Mocha/Features/ScheduledMessageFeature.cs
new file mode 100644
index 00000000000..e312b2efdb2
--- /dev/null
+++ b/src/Mocha/src/Mocha/Features/ScheduledMessageFeature.cs
@@ -0,0 +1,27 @@
+using Mocha.Features;
+
+namespace Mocha;
+
+///
+/// A pooled feature that carries the scheduling token assigned by the scheduled message store
+/// back to the message bus after pipeline execution.
+///
+public sealed class ScheduledMessageFeature : IPooledFeature
+{
+ ///
+ /// Gets or sets the opaque scheduling token returned by the store after persistence.
+ ///
+ public string? Token { get; set; }
+
+ ///
+ public void Initialize(object state)
+ {
+ Token = null;
+ }
+
+ ///
+ public void Reset()
+ {
+ Token = null;
+ }
+}
diff --git a/src/Mocha/src/Mocha/IMessageBus.cs b/src/Mocha/src/Mocha/IMessageBus.cs
index b1ee5d48472..c74207a6611 100644
--- a/src/Mocha/src/Mocha/IMessageBus.cs
+++ b/src/Mocha/src/Mocha/IMessageBus.cs
@@ -94,4 +94,69 @@ ValueTask RequestAsync(
/// A task that completes when the reply has been handed off to the transport.
ValueTask ReplyAsync(TResponse response, ReplyOptions options, CancellationToken cancellationToken)
where TResponse : notnull;
+
+ ///
+ /// Publishes a message scheduled for delivery at the specified time.
+ ///
+ /// The type of the message to publish.
+ /// The message instance to publish.
+ /// The absolute time at which the message should be delivered.
+ /// A token to cancel the publish operation.
+ /// A scheduling result containing the cancellation token and metadata.
+ ValueTask SchedulePublishAsync(
+ T message,
+ DateTimeOffset scheduledTime,
+ CancellationToken cancellationToken) where T : notnull;
+
+ ///
+ /// Publishes a message scheduled for delivery at the specified time with additional options.
+ ///
+ /// The type of the message to publish.
+ /// The message instance to publish.
+ /// The absolute time at which the message should be delivered.
+ /// Options controlling publish behavior such as headers and expiration.
+ /// A token to cancel the publish operation.
+ /// A scheduling result containing the cancellation token and metadata.
+ ValueTask SchedulePublishAsync(
+ T message,
+ DateTimeOffset scheduledTime,
+ PublishOptions options,
+ CancellationToken cancellationToken) where T : notnull;
+
+ ///
+ /// Sends a message scheduled for delivery at the specified time.
+ ///
+ /// The message instance to send.
+ /// The absolute time at which the message should be delivered.
+ /// A token to cancel the send operation.
+ /// A scheduling result containing the cancellation token and metadata.
+ ValueTask ScheduleSendAsync(
+ object message,
+ DateTimeOffset scheduledTime,
+ CancellationToken cancellationToken);
+
+ ///
+ /// Sends a message scheduled for delivery at the specified time with additional options.
+ ///
+ /// The message instance to send.
+ /// The absolute time at which the message should be delivered.
+ /// Options controlling send behavior such as headers and expiration.
+ /// A token to cancel the send operation.
+ /// A scheduling result containing the cancellation token and metadata.
+ ValueTask ScheduleSendAsync(
+ object message,
+ DateTimeOffset scheduledTime,
+ SendOptions options,
+ CancellationToken cancellationToken);
+
+ ///
+ /// Cancels a previously scheduled message. Returns true if the message was cancelled,
+ /// false if it was already dispatched, already cancelled, or not found.
+ ///
+ /// The opaque scheduling token returned by a prior schedule operation.
+ /// A token to cancel the cancellation operation.
+ /// true if the scheduled message was cancelled; otherwise false.
+ ValueTask CancelScheduledMessageAsync(
+ string token,
+ CancellationToken cancellationToken);
}
diff --git a/src/Mocha/src/Mocha/MessageBusSchedulingExtensions.cs b/src/Mocha/src/Mocha/MessageBusSchedulingExtensions.cs
deleted file mode 100644
index 1d4f8ef2b29..00000000000
--- a/src/Mocha/src/Mocha/MessageBusSchedulingExtensions.cs
+++ /dev/null
@@ -1,39 +0,0 @@
-namespace Mocha;
-
-///
-/// Provides convenience extension methods on for scheduling messages.
-///
-public static class MessageBusSchedulingExtensions
-{
- ///
- /// Sends a message scheduled for delivery at the specified absolute time.
- ///
- /// The message bus to send through.
- /// The message instance to send.
- /// The absolute time at which the message should be delivered.
- /// A token to cancel the send operation.
- /// A task that completes when the message has been handed off to the dispatch pipeline.
- public static ValueTask ScheduleSendAsync(
- this IMessageBus bus,
- object message,
- DateTimeOffset scheduledTime,
- CancellationToken cancellationToken = default)
- => bus.SendAsync(message, new SendOptions { ScheduledTime = scheduledTime }, cancellationToken);
-
- ///
- /// Publishes a message scheduled for delivery at the specified absolute time.
- ///
- /// The type of the message to publish.
- /// The message bus to publish through.
- /// The message instance to publish.
- /// The absolute time at which the message should be delivered.
- /// A token to cancel the publish operation.
- /// A task that completes when the message has been handed off to the dispatch pipeline.
- public static ValueTask SchedulePublishAsync(
- this IMessageBus bus,
- T message,
- DateTimeOffset scheduledTime,
- CancellationToken cancellationToken = default)
- where T : notnull
- => bus.PublishAsync(message, new PublishOptions { ScheduledTime = scheduledTime }, cancellationToken);
-}
diff --git a/src/Mocha/src/Mocha/Middlewares/DefaultMessageBus.cs b/src/Mocha/src/Mocha/Middlewares/DefaultMessageBus.cs
index d83dc989013..c3996080adb 100644
--- a/src/Mocha/src/Mocha/Middlewares/DefaultMessageBus.cs
+++ b/src/Mocha/src/Mocha/Middlewares/DefaultMessageBus.cs
@@ -1,6 +1,7 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.ObjectPool;
using Mocha.Middlewares;
+using Mocha.Scheduling;
namespace Mocha;
@@ -21,8 +22,7 @@ public sealed class DefaultMessageBus(
IMessagingRuntime runtime,
IServiceProvider services,
IMessagingPools pools,
- ConsumeContextAccessor consumeContextAccessor)
- : IMessageBus
+ ConsumeContextAccessor consumeContextAccessor) : IMessageBus
{
private readonly ObjectPool _contextPool = pools.DispatchContext;
@@ -294,6 +294,139 @@ private async ValueTask RequestAndWaitAsync(
throw ThrowHelper.UnexpectedResponseType();
}
+ ///
+ /// Publishes a message scheduled for delivery at the specified time using default options.
+ ///
+ public async ValueTask SchedulePublishAsync(
+ T message,
+ DateTimeOffset scheduledTime,
+ CancellationToken cancellationToken)
+ where T : notnull
+ {
+ return await SchedulePublishAsync(message, scheduledTime, PublishOptions.Default, cancellationToken);
+ }
+
+ ///
+ /// Publishes a message scheduled for delivery at the specified time with additional options.
+ ///
+ public async ValueTask SchedulePublishAsync(
+ T message,
+ DateTimeOffset scheduledTime,
+ PublishOptions options,
+ CancellationToken cancellationToken)
+ where T : notnull
+ {
+ var messageType = runtime.GetMessageType(message!.GetType());
+ var endpoint = runtime.GetPublishEndpoint(messageType);
+
+ var context = _contextPool.Get();
+ try
+ {
+ PropagateCorrelationIds(context);
+ context.Initialize(services, endpoint, runtime, messageType, cancellationToken);
+ context.Message = message;
+ context.AddHeaders(options.Headers);
+ context.Headers.SetMessageKind(MessageKind.Publish);
+ context.ScheduledTime = scheduledTime;
+ context.DeliverBy = options.ExpirationTime;
+
+ await endpoint.ExecuteAsync(context);
+
+ var feature = context.Features.Get();
+
+ return new SchedulingResult
+ {
+ Token = feature?.Token,
+ ScheduledTime = scheduledTime,
+ IsCancellable = feature?.Token is not null
+ };
+ }
+ finally
+ {
+ _contextPool.Return(context);
+ }
+ }
+
+ ///
+ /// Sends a message scheduled for delivery at the specified time using default options.
+ ///
+ public async ValueTask ScheduleSendAsync(
+ object message,
+ DateTimeOffset scheduledTime,
+ CancellationToken cancellationToken)
+ {
+ return await ScheduleSendAsync(message, scheduledTime, SendOptions.Default, cancellationToken);
+ }
+
+ ///
+ /// Sends a message scheduled for delivery at the specified time with additional options.
+ ///
+ public async ValueTask ScheduleSendAsync(
+ object message,
+ DateTimeOffset scheduledTime,
+ SendOptions options,
+ CancellationToken cancellationToken)
+ {
+ var messageType = runtime.GetMessageType(message.GetType());
+ var endpoint = options.Endpoint is { } address
+ ? runtime.GetDispatchEndpoint(address)
+ : runtime.GetSendEndpoint(messageType);
+
+ var replyEndpoint = options.ReplyEndpoint;
+ var faultEndpoint = options.FaultEndpoint;
+ var headers = options.Headers;
+
+ var context = _contextPool.Get();
+ try
+ {
+ PropagateCorrelationIds(context);
+ context.Initialize(services, endpoint, runtime, messageType, cancellationToken);
+
+ context.Message = message;
+ context.AddHeaders(headers);
+ context.Headers.SetMessageKind(MessageKind.Send);
+ context.ResponseAddress = replyEndpoint;
+ context.FaultAddress = faultEndpoint;
+ context.ScheduledTime = scheduledTime;
+ context.DeliverBy = options.ExpirationTime;
+
+ await endpoint.ExecuteAsync(context);
+
+ var feature = context.Features.Get();
+
+ return new SchedulingResult
+ {
+ Token = feature?.Token,
+ ScheduledTime = scheduledTime,
+ IsCancellable = feature?.Token is not null
+ };
+ }
+ finally
+ {
+ _contextPool.Return(context);
+ }
+ }
+
+ ///
+ /// Cancels a previously scheduled message by forwarding the opaque token
+ /// to the registered scheduling store.
+ ///
+ public async ValueTask CancelScheduledMessageAsync(string token, CancellationToken cancellationToken)
+ {
+ if (string.IsNullOrEmpty(token))
+ {
+ return false;
+ }
+
+ var store = services.GetService();
+ if (store is null)
+ {
+ return false;
+ }
+
+ return await store.CancelAsync(token, cancellationToken);
+ }
+
private void PropagateCorrelationIds(DispatchContext context)
{
if (consumeContextAccessor.Context is { } ambient)
diff --git a/src/Mocha/src/Mocha.Scheduling/DispatchSchedulingMiddleware.cs b/src/Mocha/src/Mocha/Scheduling/DispatchSchedulingMiddleware.cs
similarity index 94%
rename from src/Mocha/src/Mocha.Scheduling/DispatchSchedulingMiddleware.cs
rename to src/Mocha/src/Mocha/Scheduling/DispatchSchedulingMiddleware.cs
index 10c2b2893d1..d7a899ed346 100644
--- a/src/Mocha/src/Mocha.Scheduling/DispatchSchedulingMiddleware.cs
+++ b/src/Mocha/src/Mocha/Scheduling/DispatchSchedulingMiddleware.cs
@@ -46,7 +46,9 @@ public async ValueTask InvokeAsync(IDispatchContext context, DispatchDelegate ne
}
var store = context.Services.GetRequiredService();
- await store.PersistAsync(context.Envelope, scheduledTime, context.CancellationToken);
+ var token = await store.PersistAsync(context.Envelope, scheduledTime, context.CancellationToken);
+
+ context.Features.Configure(f => f.Token = token);
}
///
diff --git a/src/Mocha/src/Mocha.Scheduling/IScheduledMessageStore.cs b/src/Mocha/src/Mocha/Scheduling/IScheduledMessageStore.cs
similarity index 57%
rename from src/Mocha/src/Mocha.Scheduling/IScheduledMessageStore.cs
rename to src/Mocha/src/Mocha/Scheduling/IScheduledMessageStore.cs
index 55392c8879b..76496915124 100644
--- a/src/Mocha/src/Mocha.Scheduling/IScheduledMessageStore.cs
+++ b/src/Mocha/src/Mocha/Scheduling/IScheduledMessageStore.cs
@@ -17,9 +17,20 @@ public interface IScheduledMessageStore
/// The message envelope to persist, containing headers and payload.
/// The time at which the message should be dispatched.
/// A token to cancel the persistence operation.
- /// A value task that completes when the envelope has been durably stored.
- ValueTask PersistAsync(
+ ///
+ /// An opaque token string in the format "provider:value" that can be used to cancel
+ /// the scheduled message.
+ ///
+ ValueTask PersistAsync(
MessageEnvelope envelope,
DateTimeOffset scheduledTime,
CancellationToken cancellationToken);
+
+ ///
+ /// Cancels a scheduled message using the opaque token returned by .
+ ///
+ /// The opaque scheduling token returned by a prior call.
+ /// A token to cancel the operation.
+ /// true if the message was cancelled; false if not found or already dispatched.
+ ValueTask CancelAsync(string token, CancellationToken cancellationToken);
}
diff --git a/src/Mocha/src/Mocha.Scheduling/ISchedulerSignal.cs b/src/Mocha/src/Mocha/Scheduling/ISchedulerSignal.cs
similarity index 100%
rename from src/Mocha/src/Mocha.Scheduling/ISchedulerSignal.cs
rename to src/Mocha/src/Mocha/Scheduling/ISchedulerSignal.cs
diff --git a/src/Mocha/src/Mocha.Scheduling/MessageBusSchedulerSignal.cs b/src/Mocha/src/Mocha/Scheduling/MessageBusSchedulerSignal.cs
similarity index 100%
rename from src/Mocha/src/Mocha.Scheduling/MessageBusSchedulerSignal.cs
rename to src/Mocha/src/Mocha/Scheduling/MessageBusSchedulerSignal.cs
diff --git a/src/Mocha/src/Mocha.Scheduling/SchedulerCoreServiceCollectionExtensions.cs b/src/Mocha/src/Mocha/Scheduling/SchedulerCoreServiceCollectionExtensions.cs
similarity index 99%
rename from src/Mocha/src/Mocha.Scheduling/SchedulerCoreServiceCollectionExtensions.cs
rename to src/Mocha/src/Mocha/Scheduling/SchedulerCoreServiceCollectionExtensions.cs
index 76d81d1a91e..c31d22082a4 100644
--- a/src/Mocha/src/Mocha.Scheduling/SchedulerCoreServiceCollectionExtensions.cs
+++ b/src/Mocha/src/Mocha/Scheduling/SchedulerCoreServiceCollectionExtensions.cs
@@ -23,6 +23,7 @@ public static IMessageBusHostBuilder UseSchedulerCore(this IMessageBusHostBuilde
{
builder.Services.TryAddSingleton(sp =>
new MessageBusSchedulerSignal(sp.GetService() ?? TimeProvider.System));
+
builder.ConfigureMessageBus(x => x.UseDispatch(DispatchSchedulingMiddleware.Create(), after: "Serialization"));
return builder;
diff --git a/src/Mocha/src/Mocha.Scheduling/SchedulingDispatchContextExtensions.cs b/src/Mocha/src/Mocha/Scheduling/SchedulingDispatchContextExtensions.cs
similarity index 82%
rename from src/Mocha/src/Mocha.Scheduling/SchedulingDispatchContextExtensions.cs
rename to src/Mocha/src/Mocha/Scheduling/SchedulingDispatchContextExtensions.cs
index f997d0f0b4c..91e10a8b0d1 100644
--- a/src/Mocha/src/Mocha.Scheduling/SchedulingDispatchContextExtensions.cs
+++ b/src/Mocha/src/Mocha/Scheduling/SchedulingDispatchContextExtensions.cs
@@ -15,8 +15,6 @@ public static class SchedulingDispatchContextExtensions
/// The dispatch context to modify.
public static void SkipScheduler(this IDispatchContext context)
{
- var feature = context.Features.GetOrSet();
-
- feature.SkipScheduler = true;
+ context.Features.Configure(feature => feature.SkipScheduler = true);
}
}
diff --git a/src/Mocha/src/Mocha.Scheduling/SchedulingMiddlewareFeature.cs b/src/Mocha/src/Mocha/Scheduling/SchedulingMiddlewareFeature.cs
similarity index 100%
rename from src/Mocha/src/Mocha.Scheduling/SchedulingMiddlewareFeature.cs
rename to src/Mocha/src/Mocha/Scheduling/SchedulingMiddlewareFeature.cs
diff --git a/src/Mocha/src/Mocha.Scheduling/SchedulingTransportFeature.cs b/src/Mocha/src/Mocha/Scheduling/SchedulingTransportFeature.cs
similarity index 100%
rename from src/Mocha/src/Mocha.Scheduling/SchedulingTransportFeature.cs
rename to src/Mocha/src/Mocha/Scheduling/SchedulingTransportFeature.cs
diff --git a/src/Mocha/src/Mocha/SchedulingResult.cs b/src/Mocha/src/Mocha/SchedulingResult.cs
new file mode 100644
index 00000000000..823a2fce2bd
--- /dev/null
+++ b/src/Mocha/src/Mocha/SchedulingResult.cs
@@ -0,0 +1,24 @@
+namespace Mocha;
+
+///
+/// Represents the result of a scheduling operation, containing the opaque cancellation token
+/// and metadata about the scheduled message.
+///
+public sealed record SchedulingResult
+{
+ ///
+ /// Gets the opaque token for cancelling this message, or null if the scheduling
+ /// path does not support cancellation.
+ ///
+ public string? Token { get; init; }
+
+ ///
+ /// Gets the time at which the message is scheduled for delivery.
+ ///
+ public DateTimeOffset ScheduledTime { get; init; }
+
+ ///
+ /// Gets a value indicating whether the scheduled message can be cancelled via the token.
+ ///
+ public bool IsCancellable { get; init; }
+}
diff --git a/src/Mocha/test/Mocha.EntityFrameworkCore.Postgres.Tests/Mocha.EntityFrameworkCore.Postgres.Tests.csproj b/src/Mocha/test/Mocha.EntityFrameworkCore.Postgres.Tests/Mocha.EntityFrameworkCore.Postgres.Tests.csproj
index 47af3cd4731..1db0c7b2bef 100644
--- a/src/Mocha/test/Mocha.EntityFrameworkCore.Postgres.Tests/Mocha.EntityFrameworkCore.Postgres.Tests.csproj
+++ b/src/Mocha/test/Mocha.EntityFrameworkCore.Postgres.Tests/Mocha.EntityFrameworkCore.Postgres.Tests.csproj
@@ -16,6 +16,5 @@
-
diff --git a/src/Mocha/test/Mocha.EntityFrameworkCore.Postgres.Tests/PostgresSchedulingIntegrationTests.cs b/src/Mocha/test/Mocha.EntityFrameworkCore.Postgres.Tests/PostgresSchedulingIntegrationTests.cs
index 8c9e6a8f1c3..c326790906e 100644
--- a/src/Mocha/test/Mocha.EntityFrameworkCore.Postgres.Tests/PostgresSchedulingIntegrationTests.cs
+++ b/src/Mocha/test/Mocha.EntityFrameworkCore.Postgres.Tests/PostgresSchedulingIntegrationTests.cs
@@ -263,6 +263,158 @@ await bus.PublishAsync(
Assert.Fail("Timed out waiting for times_sent to reach 2");
}
+ [Fact]
+ public async Task SchedulePublishAsync_Should_ReturnCancellableResult_When_PostgresSchedulingConfigured()
+ {
+ // Arrange
+ var recorder = new MessageRecorder();
+ await using var env = await CreateBusWithSchedulingAsync(recorder);
+
+ using var scope = env.Provider.CreateScope();
+ var bus = scope.ServiceProvider.GetRequiredService();
+ var scheduledTime = DateTimeOffset.UtcNow.AddMinutes(10);
+
+ // Act
+ var result = await bus.SchedulePublishAsync(
+ new TestEvent { Payload = "cancellable" },
+ scheduledTime,
+ default);
+
+ // Assert
+ Assert.True(result.IsCancellable);
+ Assert.NotNull(result.Token);
+ Assert.StartsWith("postgres-scheduler:", result.Token);
+ Assert.Equal(scheduledTime, result.ScheduledTime);
+
+ // Verify row exists in the database
+ using var verifyScope = env.Provider.CreateScope();
+ var db = verifyScope.ServiceProvider.GetRequiredService();
+ var count = await db.Database
+ .SqlQueryRaw(
+ "SELECT CAST(COUNT(*) AS INTEGER) AS \"Value\" FROM \"scheduled_messages\"")
+ .SingleAsync();
+ Assert.Equal(1, count);
+ }
+
+ [Fact]
+ public async Task CancelScheduledMessageAsync_Should_DeleteRow_When_ValidToken()
+ {
+ // Arrange
+ var recorder = new MessageRecorder();
+ await using var env = await CreateBusWithSchedulingAsync(recorder);
+
+ using var scope = env.Provider.CreateScope();
+ var bus = scope.ServiceProvider.GetRequiredService();
+ var scheduledTime = DateTimeOffset.UtcNow.AddMinutes(10);
+
+ var result = await bus.SchedulePublishAsync(
+ new TestEvent { Payload = "to-cancel" },
+ scheduledTime,
+ default);
+
+ // Act
+ var cancelled = await bus.CancelScheduledMessageAsync(result.Token!, default);
+
+ // Assert
+ Assert.True(cancelled);
+
+ using var verifyScope = env.Provider.CreateScope();
+ var db = verifyScope.ServiceProvider.GetRequiredService();
+ var remaining = await db.Database
+ .SqlQueryRaw(
+ "SELECT CAST(COUNT(*) AS INTEGER) AS \"Value\" FROM \"scheduled_messages\"")
+ .SingleAsync();
+ Assert.Equal(0, remaining);
+ }
+
+ [Fact]
+ public async Task CancelScheduledMessageAsync_Should_ReturnFalse_When_AlreadyDispatched()
+ {
+ // Arrange
+ var recorder = new MessageRecorder();
+ await using var env = await CreateBusWithSchedulingAsync(recorder);
+
+ using var scope = env.Provider.CreateScope();
+ var bus = scope.ServiceProvider.GetRequiredService();
+
+ var result = await bus.SchedulePublishAsync(
+ new TestEvent { Payload = "dispatch-then-cancel" },
+ DateTimeOffset.UtcNow,
+ default);
+
+ // Wait for handler to receive the message
+ Assert.True(await recorder.WaitAsync(s_timeout), "Handler should have received the scheduled message");
+
+ // Wait for row to be deleted after dispatch
+ using var waitCts = new CancellationTokenSource(s_timeout);
+
+ while (!waitCts.Token.IsCancellationRequested)
+ {
+ await Task.Delay(250, waitCts.Token);
+
+ using var verifyScope = env.Provider.CreateScope();
+ var db = verifyScope.ServiceProvider.GetRequiredService();
+ var count = await db.Database
+ .SqlQueryRaw(
+ "SELECT CAST(COUNT(*) AS INTEGER) AS \"Value\" FROM \"scheduled_messages\"")
+ .SingleAsync(waitCts.Token);
+
+ if (count == 0)
+ {
+ break;
+ }
+ }
+
+ // Act
+ var cancelled = await bus.CancelScheduledMessageAsync(result.Token!, default);
+
+ // Assert
+ Assert.False(cancelled);
+ }
+
+ [Fact]
+ public async Task CancelScheduledMessageAsync_Should_ReturnFalse_When_AlreadyCancelled()
+ {
+ // Arrange
+ var recorder = new MessageRecorder();
+ await using var env = await CreateBusWithSchedulingAsync(recorder);
+
+ using var scope = env.Provider.CreateScope();
+ var bus = scope.ServiceProvider.GetRequiredService();
+ var scheduledTime = DateTimeOffset.UtcNow.AddMinutes(10);
+
+ var result = await bus.SchedulePublishAsync(
+ new TestEvent { Payload = "double-cancel" },
+ scheduledTime,
+ default);
+
+ // Act
+ var firstCancel = await bus.CancelScheduledMessageAsync(result.Token!, default);
+ var secondCancel = await bus.CancelScheduledMessageAsync(result.Token!, default);
+
+ // Assert
+ Assert.True(firstCancel);
+ Assert.False(secondCancel);
+ }
+
+ [Fact]
+ public async Task CancelScheduledMessageAsync_Should_ReturnFalse_When_InvalidToken()
+ {
+ // Arrange
+ var recorder = new MessageRecorder();
+ await using var env = await CreateBusWithSchedulingAsync(recorder);
+
+ using var scope = env.Provider.CreateScope();
+ var bus = scope.ServiceProvider.GetRequiredService();
+ var invalidToken = $"postgres-scheduler:{Guid.NewGuid()}";
+
+ // Act
+ var cancelled = await bus.CancelScheduledMessageAsync(invalidToken, default);
+
+ // Assert
+ Assert.False(cancelled);
+ }
+
private static void AddFailingDispatchMiddleware(IMessageBusHostBuilder builder)
{
builder.ConfigureMessageBus(h =>
diff --git a/src/Mocha/test/Mocha.Sagas.TestHelpers/SagaTester.cs b/src/Mocha/test/Mocha.Sagas.TestHelpers/SagaTester.cs
index be7ff5528d7..ed4408f1f8d 100644
--- a/src/Mocha/test/Mocha.Sagas.TestHelpers/SagaTester.cs
+++ b/src/Mocha/test/Mocha.Sagas.TestHelpers/SagaTester.cs
@@ -73,8 +73,8 @@ public async Task ExecuteAsync(object @event)
ResponseAddress = new Uri($"queue://test/{SagaTester.Defaults.ReplyEndpoint}")
};
- context.Features.GetOrSet().Message = @event;
- context.Features.GetOrSet().Store = Store;
+ context.Features.Configure(f => f.Message = @event);
+ context.Features.Configure(f => f.Store = Store);
if (State != null)
{
diff --git a/src/Mocha/test/Mocha.Sagas.TestHelpers/TestMessageBus.cs b/src/Mocha/test/Mocha.Sagas.TestHelpers/TestMessageBus.cs
index d43c82ab736..32da210583c 100644
--- a/src/Mocha/test/Mocha.Sagas.TestHelpers/TestMessageBus.cs
+++ b/src/Mocha/test/Mocha.Sagas.TestHelpers/TestMessageBus.cs
@@ -64,4 +64,51 @@ public ValueTask ReplyAsync(
outbox.Messages.Add(new TestMessageOutbox.Operation(TestMessageOutbox.OperationKind.Reply, response, options));
return ValueTask.CompletedTask;
}
+
+ public ValueTask SchedulePublishAsync(
+ T message,
+ DateTimeOffset scheduledTime,
+ CancellationToken cancellationToken) where T : notnull
+ {
+ outbox.Messages.Add(
+ new TestMessageOutbox.Operation(TestMessageOutbox.OperationKind.Publish, message, null));
+ return ValueTask.FromResult(new SchedulingResult { ScheduledTime = scheduledTime });
+ }
+
+ public ValueTask SchedulePublishAsync(
+ T message,
+ DateTimeOffset scheduledTime,
+ PublishOptions options,
+ CancellationToken cancellationToken) where T : notnull
+ {
+ outbox.Messages.Add(
+ new TestMessageOutbox.Operation(TestMessageOutbox.OperationKind.Publish, message, options));
+ return ValueTask.FromResult(new SchedulingResult { ScheduledTime = scheduledTime });
+ }
+
+ public ValueTask ScheduleSendAsync(
+ object message,
+ DateTimeOffset scheduledTime,
+ CancellationToken cancellationToken)
+ {
+ outbox.Messages.Add(
+ new TestMessageOutbox.Operation(TestMessageOutbox.OperationKind.Send, message, null));
+ return ValueTask.FromResult(new SchedulingResult { ScheduledTime = scheduledTime });
+ }
+
+ public ValueTask ScheduleSendAsync(
+ object message,
+ DateTimeOffset scheduledTime,
+ SendOptions options,
+ CancellationToken cancellationToken)
+ {
+ outbox.Messages.Add(
+ new TestMessageOutbox.Operation(TestMessageOutbox.OperationKind.Send, message, options));
+ return ValueTask.FromResult(new SchedulingResult { ScheduledTime = scheduledTime });
+ }
+
+ public ValueTask CancelScheduledMessageAsync(string token, CancellationToken cancellationToken)
+ {
+ return ValueTask.FromResult(false);
+ }
}
diff --git a/src/Mocha/test/Mocha.Sagas.Tests/SagaSchedulingTests.cs b/src/Mocha/test/Mocha.Sagas.Tests/SagaSchedulingTests.cs
index b79e63f534c..9d5af9693d8 100644
--- a/src/Mocha/test/Mocha.Sagas.Tests/SagaSchedulingTests.cs
+++ b/src/Mocha/test/Mocha.Sagas.Tests/SagaSchedulingTests.cs
@@ -182,8 +182,8 @@ private TestConsumeContext CreateContext(Saga saga, object message)
Runtime = s_runtime
};
- context.Features.GetOrSet().Message = message;
- context.Features.GetOrSet().Store = _store;
+ context.Features.Configure(f => f.Message = message);
+ context.Features.Configure(f => f.Store = _store);
return context;
}
diff --git a/src/Mocha/test/Mocha.Tests/Consumers/Batching/ConsumeContextTests.cs b/src/Mocha/test/Mocha.Tests/Consumers/Batching/ConsumeContextTests.cs
index 0914a9c9f76..6675dd0c38d 100644
--- a/src/Mocha/test/Mocha.Tests/Consumers/Batching/ConsumeContextTests.cs
+++ b/src/Mocha/test/Mocha.Tests/Consumers/Batching/ConsumeContextTests.cs
@@ -146,7 +146,7 @@ public StubConsumeContext()
public void SetMessage(object message)
{
- _features.GetOrSet().Message = message;
+ _features.Configure(f => f.Message = message);
}
public IFeatureCollection Features { get; }
diff --git a/src/Mocha/test/Mocha.Tests/Mocha.Tests.csproj b/src/Mocha/test/Mocha.Tests/Mocha.Tests.csproj
index caa3e151ee5..c9910c68c23 100644
--- a/src/Mocha/test/Mocha.Tests/Mocha.Tests.csproj
+++ b/src/Mocha/test/Mocha.Tests/Mocha.Tests.csproj
@@ -20,7 +20,6 @@
-
diff --git a/src/Mocha/test/Mocha.Tests/Scheduling/CancelScheduledMessageTests.cs b/src/Mocha/test/Mocha.Tests/Scheduling/CancelScheduledMessageTests.cs
new file mode 100644
index 00000000000..17556a44da1
--- /dev/null
+++ b/src/Mocha/test/Mocha.Tests/Scheduling/CancelScheduledMessageTests.cs
@@ -0,0 +1,120 @@
+using Microsoft.Extensions.DependencyInjection;
+using Mocha.Middlewares;
+using Mocha.Scheduling;
+using Mocha.Transport.InMemory;
+
+namespace Mocha.Tests.Scheduling;
+
+public class CancelScheduledMessageTests
+{
+ [Fact]
+ public async Task CancelScheduledMessageAsync_Should_ReturnFalse_When_TokenIsNull()
+ {
+ // arrange
+ await using var provider = await CreateBusAsync();
+ using var scope = provider.CreateScope();
+ var bus = scope.ServiceProvider.GetRequiredService();
+
+ // act
+ var result = await bus.CancelScheduledMessageAsync(null!, CancellationToken.None);
+
+ // assert
+ Assert.False(result);
+ }
+
+ [Fact]
+ public async Task CancelScheduledMessageAsync_Should_ReturnFalse_When_TokenIsEmpty()
+ {
+ // arrange
+ await using var provider = await CreateBusAsync();
+ using var scope = provider.CreateScope();
+ var bus = scope.ServiceProvider.GetRequiredService();
+
+ // act
+ var result = await bus.CancelScheduledMessageAsync("", CancellationToken.None);
+
+ // assert
+ Assert.False(result);
+ }
+
+ [Fact]
+ public async Task CancelScheduledMessageAsync_Should_ReturnFalse_When_NoStoreRegistered()
+ {
+ // arrange
+ await using var provider = await CreateBusAsync();
+ using var scope = provider.CreateScope();
+ var bus = scope.ServiceProvider.GetRequiredService();
+
+ // act
+ var result = await bus.CancelScheduledMessageAsync(
+ "some-provider:some-value",
+ CancellationToken.None);
+
+ // assert
+ Assert.False(result);
+ }
+
+ [Fact]
+ public async Task CancelScheduledMessageAsync_Should_DelegateToStore_When_TokenIsValid()
+ {
+ // arrange
+ var spy = new SpyScheduledMessageStore();
+ await using var provider = await CreateBusAsync(spy);
+ using var scope = provider.CreateScope();
+ var bus = scope.ServiceProvider.GetRequiredService();
+
+ // act
+ var result = await bus.CancelScheduledMessageAsync(
+ "test-provider:my-cancel-value",
+ CancellationToken.None);
+
+ // assert
+ Assert.True(result);
+ Assert.Equal("test-provider:my-cancel-value", spy.LastCancelledValue);
+ }
+
+ private static async Task CreateBusAsync(
+ SpyScheduledMessageStore? spyStore = null)
+ {
+ var services = new ServiceCollection();
+ var builder = services.AddMessageBus();
+ builder.AddEventHandler();
+ builder.AddInMemory();
+ builder.UseSchedulerCore();
+
+ if (spyStore is not null)
+ {
+ services.AddScoped(_ => spyStore);
+ }
+
+ var provider = services.BuildServiceProvider();
+ var runtime = (MessagingRuntime)provider.GetRequiredService();
+ await runtime.StartAsync(CancellationToken.None);
+ return provider;
+ }
+
+ private sealed class SpyScheduledMessageStore : IScheduledMessageStore
+ {
+ public string? LastCancelledValue { get; private set; }
+
+ public ValueTask PersistAsync(
+ MessageEnvelope envelope,
+ DateTimeOffset scheduledTime,
+ CancellationToken cancellationToken) =>
+ ValueTask.FromResult("test-provider:test-id");
+
+ public ValueTask CancelAsync(string token, CancellationToken cancellationToken)
+ {
+ LastCancelledValue = token;
+ return ValueTask.FromResult(true);
+ }
+ }
+
+ private sealed class StubEvent;
+
+ private sealed class StubEventHandler : IEventHandler
+ {
+ public ValueTask HandleAsync(StubEvent message, CancellationToken cancellationToken) =>
+ default;
+ }
+}
diff --git a/src/Mocha/test/Mocha.Tests/Scheduling/DispatchSchedulingMiddlewareTests.cs b/src/Mocha/test/Mocha.Tests/Scheduling/DispatchSchedulingMiddlewareTests.cs
index 118a7269111..63442175a4c 100644
--- a/src/Mocha/test/Mocha.Tests/Scheduling/DispatchSchedulingMiddlewareTests.cs
+++ b/src/Mocha/test/Mocha.Tests/Scheduling/DispatchSchedulingMiddlewareTests.cs
@@ -129,7 +129,7 @@ public async Task InvokeAsync_Should_CallNext_When_SkipSchedulerIsTrue()
};
// Set SkipScheduler flag
- context.Features.GetOrSet().SkipScheduler = true;
+ context.Features.Configure(f => f.SkipScheduler = true);
var nextCalled = false;
DispatchDelegate next = _ =>
@@ -281,13 +281,19 @@ private sealed class InMemoryScheduledMessageStore : IScheduledMessageStore
{
public ConcurrentBag<(MessageEnvelope Envelope, DateTimeOffset ScheduledTime)> Entries { get; } = [];
- public ValueTask PersistAsync(
+ public ValueTask PersistAsync(
MessageEnvelope envelope,
DateTimeOffset scheduledTime,
CancellationToken cancellationToken)
{
Entries.Add((envelope, scheduledTime));
- return ValueTask.CompletedTask;
+ var token = $"in-memory:{Guid.NewGuid()}";
+ return ValueTask.FromResult(token);
+ }
+
+ public ValueTask CancelAsync(string token, CancellationToken cancellationToken)
+ {
+ return ValueTask.FromResult(false);
}
}
}
diff --git a/src/Mocha/test/Mocha.Tests/Scheduling/MessageBusSchedulingExtensionsTests.cs b/src/Mocha/test/Mocha.Tests/Scheduling/MessageBusSchedulingExtensionsTests.cs
index d92e308af2d..54f055d4344 100644
--- a/src/Mocha/test/Mocha.Tests/Scheduling/MessageBusSchedulingExtensionsTests.cs
+++ b/src/Mocha/test/Mocha.Tests/Scheduling/MessageBusSchedulingExtensionsTests.cs
@@ -1,11 +1,9 @@
-using Microsoft.Extensions.Time.Testing;
-
namespace Mocha.Tests.Scheduling;
-public class MessageBusSchedulingExtensionsTests
+public class MessageBusSchedulingTests
{
[Fact]
- public async Task ScheduleSendAsync_WithAbsoluteTime_Should_DelegateToSendAsync_When_Called()
+ public async Task ScheduleSendAsync_Should_RecordMessage_When_Called()
{
// arrange
var spy = new SpyMessageBus();
@@ -13,17 +11,18 @@ public async Task ScheduleSendAsync_WithAbsoluteTime_Should_DelegateToSendAsync_
var message = new TestMessage("send-abs");
// act
- await spy.ScheduleSendAsync(message, scheduledTime);
+ var result = await spy.ScheduleSendAsync(message, scheduledTime, CancellationToken.None);
// assert
- Assert.Single(spy.SentMessages);
- var (sentMsg, sentOptions) = spy.SentMessages[0];
+ Assert.Single(spy.ScheduledSendMessages);
+ var (sentMsg, sentTime) = spy.ScheduledSendMessages[0];
Assert.Same(message, sentMsg);
- Assert.Equal(scheduledTime, sentOptions.ScheduledTime);
+ Assert.Equal(scheduledTime, sentTime);
+ Assert.Equal(scheduledTime, result.ScheduledTime);
}
[Fact]
- public async Task SchedulePublishAsync_WithAbsoluteTime_Should_DelegateToPublishAsync_When_Called()
+ public async Task SchedulePublishAsync_Should_RecordMessage_When_Called()
{
// arrange
var spy = new SpyMessageBus();
@@ -31,39 +30,34 @@ public async Task SchedulePublishAsync_WithAbsoluteTime_Should_DelegateToPublish
var message = new TestMessage("pub-abs");
// act
- await spy.SchedulePublishAsync(message, scheduledTime);
+ var result = await spy.SchedulePublishAsync(message, scheduledTime, CancellationToken.None);
// assert
- Assert.Single(spy.PublishedMessages);
- var (pubMsg, pubOptions) = spy.PublishedMessages[0];
+ Assert.Single(spy.ScheduledPublishMessages);
+ var (pubMsg, pubTime) = spy.ScheduledPublishMessages[0];
Assert.Same(message, pubMsg);
- Assert.Equal(scheduledTime, pubOptions.ScheduledTime);
+ Assert.Equal(scheduledTime, pubTime);
+ Assert.Equal(scheduledTime, result.ScheduledTime);
}
private sealed record TestMessage(string Payload);
private sealed class SpyMessageBus : IMessageBus
{
- public List<(object Message, SendOptions Options)> SentMessages { get; } = [];
- public List<(object Message, PublishOptions Options)> PublishedMessages { get; } = [];
+ public List<(object Message, DateTimeOffset ScheduledTime)> ScheduledSendMessages { get; } = [];
+ public List<(object Message, DateTimeOffset ScheduledTime)> ScheduledPublishMessages { get; } = [];
public ValueTask SendAsync(object message, CancellationToken cancellationToken) =>
ValueTask.CompletedTask;
- public ValueTask SendAsync(object message, SendOptions options, CancellationToken cancellationToken)
- {
- SentMessages.Add((message, options));
- return ValueTask.CompletedTask;
- }
+ public ValueTask SendAsync(object message, SendOptions options, CancellationToken cancellationToken) =>
+ ValueTask.CompletedTask;
public ValueTask PublishAsync(T message, CancellationToken cancellationToken) =>
ValueTask.CompletedTask;
- public ValueTask PublishAsync(T message, PublishOptions options, CancellationToken cancellationToken)
- {
- PublishedMessages.Add((message!, options));
- return ValueTask.CompletedTask;
- }
+ public ValueTask PublishAsync(T message, PublishOptions options, CancellationToken cancellationToken) =>
+ ValueTask.CompletedTask;
public ValueTask RequestAsync(
IEventRequest message,
@@ -87,5 +81,46 @@ public ValueTask ReplyAsync(
ReplyOptions options,
CancellationToken cancellationToken) where TResponse : notnull =>
throw new NotSupportedException();
+
+ public ValueTask SchedulePublishAsync(
+ T message,
+ DateTimeOffset scheduledTime,
+ CancellationToken cancellationToken) where T : notnull
+ {
+ ScheduledPublishMessages.Add((message, scheduledTime));
+ return ValueTask.FromResult(new SchedulingResult { ScheduledTime = scheduledTime });
+ }
+
+ public ValueTask SchedulePublishAsync(
+ T message,
+ DateTimeOffset scheduledTime,
+ PublishOptions options,
+ CancellationToken cancellationToken) where T : notnull
+ {
+ ScheduledPublishMessages.Add((message, scheduledTime));
+ return ValueTask.FromResult(new SchedulingResult { ScheduledTime = scheduledTime });
+ }
+
+ public ValueTask ScheduleSendAsync(
+ object message,
+ DateTimeOffset scheduledTime,
+ CancellationToken cancellationToken)
+ {
+ ScheduledSendMessages.Add((message, scheduledTime));
+ return ValueTask.FromResult(new SchedulingResult { ScheduledTime = scheduledTime });
+ }
+
+ public ValueTask ScheduleSendAsync(
+ object message,
+ DateTimeOffset scheduledTime,
+ SendOptions options,
+ CancellationToken cancellationToken)
+ {
+ ScheduledSendMessages.Add((message, scheduledTime));
+ return ValueTask.FromResult(new SchedulingResult { ScheduledTime = scheduledTime });
+ }
+
+ public ValueTask CancelScheduledMessageAsync(string token, CancellationToken cancellationToken) =>
+ ValueTask.FromResult(false);
}
}
diff --git a/src/Mocha/test/Mocha.Tests/Scheduling/SchedulingMiddlewareIntegrationTests.cs b/src/Mocha/test/Mocha.Tests/Scheduling/SchedulingMiddlewareIntegrationTests.cs
index 89651356c56..4a64914dfd7 100644
--- a/src/Mocha/test/Mocha.Tests/Scheduling/SchedulingMiddlewareIntegrationTests.cs
+++ b/src/Mocha/test/Mocha.Tests/Scheduling/SchedulingMiddlewareIntegrationTests.cs
@@ -134,6 +134,134 @@ await bus.PublishAsync(
Assert.Empty(store.Entries);
}
+ [Fact]
+ public async Task SchedulePublishAsync_Should_ReturnCancellableResult_When_StoreRegistered()
+ {
+ // arrange
+ var timeProvider = new FakeTimeProvider();
+ var store = new InMemoryScheduledMessageStore();
+ await using var provider = await CreateBusWithSchedulingAsync(
+ store,
+ _ => { },
+ timeProvider);
+
+ using var scope = provider.CreateScope();
+ var bus = scope.ServiceProvider.GetRequiredService();
+
+ var scheduledTime = timeProvider.GetUtcNow().AddMinutes(10);
+
+ // act
+ var result = await bus.SchedulePublishAsync(
+ new SchedulingTestEvent { Payload = "schedule-me" },
+ scheduledTime,
+ CancellationToken.None);
+
+ // assert
+ Assert.True(result.IsCancellable);
+ Assert.NotNull(result.Token);
+ Assert.Equal(scheduledTime, result.ScheduledTime);
+ await WaitUntilAsync(() => !store.Entries.IsEmpty, s_timeout);
+ Assert.Single(store.Entries);
+ }
+
+ [Fact]
+ public async Task CancelScheduledMessageAsync_Should_RemoveFromStore_When_ValidToken()
+ {
+ // arrange
+ var timeProvider = new FakeTimeProvider();
+ var store = new InMemoryScheduledMessageStore();
+ await using var provider = await CreateBusWithSchedulingAndProviderAsync(
+ store,
+ _ => { },
+ timeProvider);
+
+ using var scope = provider.CreateScope();
+ var bus = scope.ServiceProvider.GetRequiredService();
+
+ var scheduledTime = timeProvider.GetUtcNow().AddMinutes(10);
+ var result = await bus.SchedulePublishAsync(
+ new SchedulingTestEvent { Payload = "cancel-me" },
+ scheduledTime,
+ CancellationToken.None);
+
+ await WaitUntilAsync(() => store.TrackedCount > 0, s_timeout);
+ Assert.Equal(1, store.TrackedCount);
+
+ // act
+ var cancelled = await bus.CancelScheduledMessageAsync(result.Token!, CancellationToken.None);
+
+ // assert
+ Assert.True(cancelled);
+ Assert.Equal(0, store.TrackedCount);
+ }
+
+ [Fact]
+ public async Task CancelScheduledMessageAsync_Should_ReturnFalse_When_AlreadyCancelled()
+ {
+ // arrange
+ var timeProvider = new FakeTimeProvider();
+ var store = new InMemoryScheduledMessageStore();
+ await using var provider = await CreateBusWithSchedulingAndProviderAsync(
+ store,
+ _ => { },
+ timeProvider);
+
+ using var scope = provider.CreateScope();
+ var bus = scope.ServiceProvider.GetRequiredService();
+
+ var scheduledTime = timeProvider.GetUtcNow().AddMinutes(10);
+ var result = await bus.SchedulePublishAsync(
+ new SchedulingTestEvent { Payload = "cancel-twice" },
+ scheduledTime,
+ CancellationToken.None);
+
+ await WaitUntilAsync(() => store.TrackedCount > 0, s_timeout);
+
+ // act
+ var firstCancel = await bus.CancelScheduledMessageAsync(result.Token!, CancellationToken.None);
+ var secondCancel = await bus.CancelScheduledMessageAsync(result.Token!, CancellationToken.None);
+
+ // assert
+ Assert.True(firstCancel);
+ Assert.False(secondCancel);
+ }
+
+ [Fact]
+ public async Task SchedulePublishAsync_Should_ReturnNonCancellable_When_NoStoreRegistered()
+ {
+ // arrange
+ var timeProvider = new FakeTimeProvider();
+
+ var services = new ServiceCollection();
+ services.AddSingleton(timeProvider);
+
+ var builder = services.AddMessageBus();
+ builder.UseSchedulerCore();
+ builder.AddInMemory();
+
+ var provider = services.BuildServiceProvider();
+ var runtime = (MessagingRuntime)provider.GetRequiredService();
+ await runtime.StartAsync(CancellationToken.None);
+
+ await using (provider)
+ {
+ using var scope = provider.CreateScope();
+ var bus = scope.ServiceProvider.GetRequiredService();
+
+ var scheduledTime = timeProvider.GetUtcNow().AddMinutes(10);
+
+ // act
+ var result = await bus.SchedulePublishAsync(
+ new SchedulingTestEvent { Payload = "no-store" },
+ scheduledTime,
+ CancellationToken.None);
+
+ // assert
+ Assert.False(result.IsCancellable);
+ Assert.Null(result.Token);
+ }
+ }
+
[Fact]
public async Task Scheduling_Should_PassThrough_When_NoStoreRegistered()
{
@@ -221,6 +349,34 @@ private static async Task CreateBusWithSchedulingAsync(
return provider;
}
+ private static async Task CreateBusWithSchedulingAndProviderAsync(
+ InMemoryScheduledMessageStore store,
+ Action configure,
+ TimeProvider? timeProvider = null)
+ {
+ var services = new ServiceCollection();
+ services.AddScoped(_ => store);
+ services.AddSingleton();
+
+ if (timeProvider is not null)
+ {
+ services.AddSingleton(timeProvider);
+ }
+
+ var builder = services.AddMessageBus();
+
+ // Register middleware directly (bypassing factory-time DI check)
+ builder.ConfigureMessageBus(x => x.UseDispatch(CreateSchedulingMiddleware()));
+
+ configure(builder);
+ builder.AddInMemory();
+
+ var provider = services.BuildServiceProvider();
+ var runtime = (MessagingRuntime)provider.GetRequiredService();
+ await runtime.StartAsync(CancellationToken.None);
+ return provider;
+ }
+
public sealed class SchedulingTestEvent
{
public required string Payload { get; init; }
@@ -242,6 +398,7 @@ public ValueTask HandleAsync(SchedulingTestEvent message, CancellationToken canc
public sealed class InMemoryScheduledMessageStore : IScheduledMessageStore
{
private readonly ISchedulerSignal? _signal;
+ private readonly ConcurrentDictionary _entriesById = new();
public InMemoryScheduledMessageStore(ISchedulerSignal? signal = null)
{
@@ -250,12 +407,28 @@ public InMemoryScheduledMessageStore(ISchedulerSignal? signal = null)
public ConcurrentBag<(MessageEnvelope Envelope, DateTimeOffset ScheduledTime)> Entries { get; } = [];
- public ValueTask PersistAsync(MessageEnvelope envelope, DateTimeOffset scheduledTime, CancellationToken cancellationToken)
+ public ValueTask PersistAsync(MessageEnvelope envelope, DateTimeOffset scheduledTime, CancellationToken cancellationToken)
{
+ var id = Guid.NewGuid().ToString();
Entries.Add((envelope, scheduledTime));
+ _entriesById[id] = (envelope, scheduledTime);
_signal?.Notify(scheduledTime);
- return ValueTask.CompletedTask;
+ var token = $"in-memory:{id}";
+ return ValueTask.FromResult(token);
}
+
+ public ValueTask CancelAsync(string token, CancellationToken cancellationToken)
+ {
+ var value = token.StartsWith("in-memory:", StringComparison.Ordinal)
+ ? token["in-memory:".Length..]
+ : token;
+ var removed = _entriesById.TryRemove(value, out _);
+ return ValueTask.FromResult(removed);
+ }
+
+ public bool HasEntry(string id) => _entriesById.ContainsKey(id);
+
+ public int TrackedCount => _entriesById.Count;
}
///
diff --git a/website/src/docs/mocha/v1/scheduling.md b/website/src/docs/mocha/v1/scheduling.md
index 5217fa570d8..38fc77c9475 100644
--- a/website/src/docs/mocha/v1/scheduling.md
+++ b/website/src/docs/mocha/v1/scheduling.md
@@ -1,24 +1,26 @@
---
title: "Scheduling"
-description: "Schedule messages for future delivery in Mocha using absolute times or relative delays, with durable Postgres persistence or in-memory scheduling for development."
+description: "Schedule messages for future delivery in Mocha using absolute times or relative delays, with durable Postgres persistence or in-memory scheduling for development. Cancel scheduled messages before they are dispatched."
---
# Scheduling
-Sometimes a message should not be delivered right now. A welcome email goes out 30 minutes after signup. A payment retry fires 24 hours after the first failure. A saga timeout triggers if no response arrives within 5 minutes. Scheduling lets you hand a message to the bus with a future delivery time, and the infrastructure takes care of the rest.
+Sometimes a message should not be delivered right now. A welcome email goes out 30 minutes after signup. A payment retry fires 24 hours after the first failure. A saga timeout triggers if no response arrives within 5 minutes. Scheduling lets you hand a message to the bus with a future delivery time, and the infrastructure takes care of the rest. If plans change, you can cancel a scheduled message before it is dispatched.
```csharp
-await bus.SchedulePublishAsync(
+var result = await bus.SchedulePublishAsync(
new SendWelcomeEmail { UserId = userId },
DateTimeOffset.UtcNow.AddMinutes(30),
cancellationToken);
+
+// result.Token can be used to cancel the message later
```
-The call returns immediately. The message is persisted and delivered when the scheduled time arrives.
+The call returns immediately with a `SchedulingResult`. The message is persisted and delivered when the scheduled time arrives. If you need to revoke the message before delivery, pass the token to `CancelScheduledMessageAsync`.
# Schedule a message
-Mocha provides convenience extension methods on `IMessageBus` for scheduling with an absolute `DateTimeOffset`.
+Mocha provides scheduling methods on `IMessageBus` for scheduling with an absolute `DateTimeOffset`.
## Schedule with an absolute time
@@ -28,28 +30,30 @@ Use a `DateTimeOffset` when you know the exact delivery time:
var scheduledTime = DateTimeOffset.UtcNow.AddHours(24);
// Schedule a publish (fan-out to all subscribers)
-await bus.SchedulePublishAsync(
+var publishResult = await bus.SchedulePublishAsync(
new PaymentRetryEvent { OrderId = orderId },
scheduledTime,
cancellationToken);
// Schedule a send (directed to a single handler)
-await bus.ScheduleSendAsync(
+var sendResult = await bus.ScheduleSendAsync(
new CleanupExpiredSessionsCommand { CutoffTime = cutoff },
scheduledTime,
cancellationToken);
```
-# Schedule with options
+Both methods return a `SchedulingResult` with a `Token` you can use for cancellation and an `IsCancellable` flag that tells you whether cancellation is supported by the current scheduling infrastructure.
+
+## Schedule with options
-The convenience methods are wrappers around `PublishOptions` and `SendOptions`. If you need to combine scheduling with other options like expiration or custom headers, set `ScheduledTime` directly on the options struct:
+The scheduling methods also accept an options overload. If you need to combine scheduling with other options like expiration or custom headers, pass a `PublishOptions` or `SendOptions` struct:
```csharp
-await bus.PublishAsync(
+var result = await bus.SchedulePublishAsync(
new PaymentRetryEvent { OrderId = orderId },
+ DateTimeOffset.UtcNow.AddHours(24),
new PublishOptions
{
- ScheduledTime = DateTimeOffset.UtcNow.AddHours(24),
ExpirationTime = DateTimeOffset.UtcNow.AddHours(48),
Headers = new Dictionary { ["priority"] = "high" }
},
@@ -57,17 +61,117 @@ await bus.PublishAsync(
```
```csharp
-await bus.SendAsync(
+var result = await bus.ScheduleSendAsync(
new RetryPaymentCommand { PaymentId = paymentId },
+ DateTimeOffset.UtcNow.AddMinutes(30),
new SendOptions
{
- ScheduledTime = DateTimeOffset.UtcNow.AddMinutes(30),
ExpirationTime = DateTimeOffset.UtcNow.AddHours(1)
},
cancellationToken);
```
-This gives full control over the dispatch options while still routing through the scheduling middleware.
+You can also set `ScheduledTime` directly on options when calling `PublishAsync` or `SendAsync`. This approach does not return a `SchedulingResult`, so you cannot cancel the message later:
+
+```csharp
+await bus.PublishAsync(
+ new PaymentRetryEvent { OrderId = orderId },
+ new PublishOptions
+ {
+ ScheduledTime = DateTimeOffset.UtcNow.AddHours(24),
+ ExpirationTime = DateTimeOffset.UtcNow.AddHours(48),
+ },
+ cancellationToken);
+```
+
+# Cancel a scheduled message
+
+When a scheduled message is no longer needed, cancel it before the scheduled time arrives. The `SchedulingResult` returned by `SchedulePublishAsync` and `ScheduleSendAsync` contains the token you need.
+
+```csharp
+// Schedule a payment reminder
+var result = await bus.SchedulePublishAsync(
+ new PaymentReminderEvent { OrderId = orderId },
+ DateTimeOffset.UtcNow.AddHours(24),
+ cancellationToken);
+
+// Customer pays before the reminder fires - cancel it
+var cancelled = await bus.CancelScheduledMessageAsync(
+ result.Token!,
+ cancellationToken);
+```
+
+`CancelScheduledMessageAsync` returns `true` if the message was successfully cancelled and `false` otherwise.
+
+## When cancellation returns false
+
+A `false` return does not necessarily mean something went wrong. It means the message is no longer in the store:
+
+- **Already dispatched.** The scheduled time passed and the message was delivered. The cancellation window has closed.
+- **Already cancelled.** A previous call already removed the message. Cancelling twice is safe - the second call returns `false`.
+- **Token not found.** The token does not match any message in the store.
+
+## SchedulingResult
+
+Every call to `SchedulePublishAsync` or `ScheduleSendAsync` returns a `SchedulingResult`:
+
+```csharp
+var result = await bus.SchedulePublishAsync(message, scheduledTime, cancellationToken);
+
+if (result.IsCancellable)
+{
+ // Store the token so you can cancel later
+ await SaveTokenAsync(result.Token!);
+}
+```
+
+| Property | Type | Description |
+| --------------- | ---------------- | ----------------------------------------------------------------------------------------- |
+| `Token` | `string?` | An opaque token for cancelling this message, or `null` if cancellation is not supported. |
+| `ScheduledTime` | `DateTimeOffset` | The time at which the message is scheduled for delivery. |
+| `IsCancellable` | `bool` | `true` when the scheduling infrastructure supports cancellation and a token was assigned. |
+
+`IsCancellable` is `true` when a store-based scheduling provider (like Postgres) is registered. If no store is registered, the message is still scheduled (through the transport's native scheduling), but cancellation is not available.
+
+## Real-world example: cancellable reminder
+
+A common pattern is scheduling a reminder that should be revoked when the user completes the expected action.
+
+```csharp
+public class OrderService(IMessageBus bus, IOrderRepository orders)
+{
+ public async Task PlaceOrderAsync(Order order, CancellationToken ct)
+ {
+ await orders.SaveAsync(order, ct);
+
+ // Remind the customer to pay in 24 hours
+ var result = await bus.SchedulePublishAsync(
+ new PaymentReminderEvent { OrderId = order.Id },
+ DateTimeOffset.UtcNow.AddHours(24),
+ ct);
+
+ // Persist the token so we can cancel later
+ if (result.IsCancellable)
+ {
+ order.ReminderToken = result.Token;
+ await orders.SaveAsync(order, ct);
+ }
+ }
+
+ public async Task ConfirmPaymentAsync(Guid orderId, CancellationToken ct)
+ {
+ var order = await orders.GetAsync(orderId, ct);
+
+ // Payment received - cancel the reminder
+ if (order.ReminderToken is not null)
+ {
+ await bus.CancelScheduledMessageAsync(order.ReminderToken, ct);
+ order.ReminderToken = null;
+ await orders.SaveAsync(order, ct);
+ }
+ }
+}
+```
# Set up store-based scheduling for RabbitMQ
@@ -104,17 +208,14 @@ builder.Services
.AddPostgres(connectionString);
```
-| Call | Purpose |
-| -------------------------------- | -------------------------------------------------------------------------------------------------------- |
-| `UsePostgresScheduling()` | Registers the background worker, scheduled message store, dispatch middleware, and EF Core interceptors. |
-| `AddPostgresScheduledMessages()` | Adds the `ScheduledMessage` entity configuration to the EF Core model. |
+| Call | Purpose |
+| -------------------------------- | --------------------------------------------------------------------------------------------------------------------------- |
+| `UsePostgresScheduling()` | Registers everything needed for durable scheduling with Postgres, including the background worker and EF Core interceptors. |
+| `AddPostgresScheduledMessages()` | Adds the `ScheduledMessage` entity configuration to the EF Core model. |
-`UsePostgresScheduling()` wires up the full pipeline:
+`UsePostgresScheduling()` sets up everything needed to persist scheduled messages in Postgres and dispatch them at the right time. Outgoing messages with a `ScheduledTime` are intercepted and written to the `scheduled_messages` table instead of being sent to the transport. A background worker continuously polls for due messages and dispatches them through the bus. EF Core interceptors signal the worker when `SaveChanges` or a transaction commit occurs, enabling low-latency wake-up.
-- A **dispatch middleware** that intercepts outgoing messages with a `ScheduledTime` and persists them to the store instead of sending them to the transport.
-- An **`IScheduledMessageStore`** implementation that writes scheduled message rows using direct Npgsql inserts within the current EF Core transaction.
-- A **background worker** that continuously polls for due messages and dispatches them through the bus.
-- **EF Core interceptors** that signal the scheduler when `SaveChanges` or a transaction commit occurs, enabling low-latency wake-up.
+When `UsePostgresScheduling()` is configured, `SchedulePublishAsync` and `ScheduleSendAsync` return cancellable results with tokens you can pass to `CancelScheduledMessageAsync`.
**4. Create the database migration.**
@@ -127,19 +228,19 @@ dotnet ef database update
# Transport scheduling behavior
-Each transport handles scheduling differently. The dispatch scheduling middleware adapts automatically based on what the transport supports.
+Each transport handles scheduling differently. Mocha adapts automatically based on what the transport supports.
-| Transport | Scheduling type | Durability | Setup required |
-| ---------- | ------------------------------------- | ---------------------------- | ----------------------------------------- |
-| InMemory | Native (in-process scheduler) | Non-durable, lost on restart | None |
-| PostgreSQL | Native (scheduled_time column) | Durable, survives restarts | None |
-| RabbitMQ | Store-based (via Postgres middleware) | Durable with Postgres store | `UsePostgresScheduling()` + EF Core model |
+| Transport | Scheduling type | Durability | Cancellation support | Setup required |
+| ---------- | ------------------------------------- | ---------------------------- | ------------------------------------ | ----------------------------------------- |
+| InMemory | Native (in-process scheduler) | Non-durable, lost on restart | No | None |
+| PostgreSQL | Native (scheduled_time column) | Durable, survives restarts | No | None |
+| RabbitMQ | Store-based (via Postgres middleware) | Durable with Postgres store | Yes (with `UsePostgresScheduling()`) | `UsePostgresScheduling()` + EF Core model |
-**InMemory:** The transport schedules messages natively using an internal scheduler. Messages scheduled for a time in the past are delivered immediately. Scheduled messages are lost if the process restarts.
+**InMemory:** The transport schedules messages natively using an internal scheduler. Messages scheduled for a time in the past are delivered immediately. Scheduled messages are lost if the process restarts. Cancellation is not supported.
-**PostgreSQL:** The transport handles scheduling natively. When you set `ScheduledTime`, the transport writes a `scheduled_time` column alongside the message. Messages are only delivered to consumers after the scheduled time has passed. No additional setup is required beyond the standard [PostgreSQL transport configuration](/docs/mocha/v1/transports/postgres).
+**PostgreSQL:** The transport handles scheduling natively. When you set `ScheduledTime`, the transport writes a `scheduled_time` column alongside the message. Messages are only delivered to consumers after the scheduled time has passed. No additional setup is required beyond the standard [PostgreSQL transport configuration](/docs/mocha/v1/transports/postgres). Cancellation is not supported with native scheduling.
-**RabbitMQ:** RabbitMQ does not support native message scheduling. To enable scheduling, register `UsePostgresScheduling()` with an EF Core DbContext. The dispatch middleware intercepts scheduled messages before they reach the RabbitMQ transport and persists them to a Postgres `scheduled_messages` table. A background worker dispatches them at the scheduled time, routing through the RabbitMQ transport.
+**RabbitMQ:** RabbitMQ does not support native message scheduling. To enable scheduling, register `UsePostgresScheduling()` with an EF Core DbContext. Scheduled messages are intercepted before they reach the RabbitMQ transport and persisted to a Postgres `scheduled_messages` table. A background worker dispatches them at the scheduled time, routing through the RabbitMQ transport. Cancellation is fully supported - the `SchedulingResult` contains a token you can use with `CancelScheduledMessageAsync`.
## Retry behavior
@@ -151,7 +252,7 @@ When multiple instances of your service are running, each scheduled message is p
## Outbox integration
-When both the transactional outbox and scheduling are configured, scheduled messages participate in the transaction correctly. The scheduling middleware runs in the dispatch pipeline before the outbox middleware. Messages with a `ScheduledTime` are intercepted by the scheduler and never reach the outbox. Messages dispatched by the background worker skip both the scheduler and the outbox, going directly to the transport. See [Reliability](/docs/mocha/v1/reliability) for outbox configuration.
+When both the transactional outbox and scheduling are configured, scheduled messages participate in the transaction correctly. Messages with a `ScheduledTime` are intercepted by the scheduler and never reach the outbox. Messages dispatched by the background worker skip both the scheduler and the outbox, going directly to the transport. See [Reliability](/docs/mocha/v1/reliability) for outbox configuration.
# Schedule messages in sagas
@@ -199,14 +300,23 @@ See [Sagas](/docs/mocha/v1/sagas) for the full saga configuration guide.
# API reference
-## Extension methods on `IMessageBus`
+## Scheduling methods on `IMessageBus`
-| Method | Parameters | Description |
-| ------------------------- | -------------------------------------------------------------------- | ----------------------------------------------------- |
-| `SchedulePublishAsync` | `T message, DateTimeOffset scheduledTime, CancellationToken ct` | Publishes a message for delivery at an absolute time. |
-| `ScheduleSendAsync` | `object message, DateTimeOffset scheduledTime, CancellationToken ct` | Sends a message for delivery at an absolute time. |
+| Method | Parameters | Returns | Description |
+| ----------------------------- | ----------------------------------------------------------------------------------------- | ----------------------------- | ------------------------------------------------------------------------------------------------------------------------- |
+| `SchedulePublishAsync` | `T message, DateTimeOffset scheduledTime, CancellationToken ct` | `ValueTask` | Publishes a message for delivery at an absolute time. |
+| `SchedulePublishAsync` | `T message, DateTimeOffset scheduledTime, PublishOptions options, CancellationToken ct` | `ValueTask` | Publishes a message with scheduling and publish options. |
+| `ScheduleSendAsync` | `object message, DateTimeOffset scheduledTime, CancellationToken ct` | `ValueTask` | Sends a message for delivery at an absolute time. |
+| `ScheduleSendAsync` | `object message, DateTimeOffset scheduledTime, SendOptions options, CancellationToken ct` | `ValueTask` | Sends a message with scheduling and send options. |
+| `CancelScheduledMessageAsync` | `string token, CancellationToken ct` | `ValueTask` | Cancels a scheduled message. Returns `true` if cancelled, `false` if already dispatched, already cancelled, or not found. |
-All methods return `ValueTask` and complete when the message has been handed to the scheduling infrastructure.
+## `SchedulingResult`
+
+| Property | Type | Description |
+| --------------- | ---------------- | ----------------------------------------------------------------------------------------- |
+| `Token` | `string?` | An opaque token for cancelling this message, or `null` if cancellation is not supported. |
+| `ScheduledTime` | `DateTimeOffset` | The time at which the message is scheduled for delivery. |
+| `IsCancellable` | `bool` | `true` when the scheduling infrastructure supports cancellation and a token was assigned. |
## Scheduling properties on options
@@ -244,9 +354,9 @@ All methods return `ValueTask` and complete when the message has been handed to
## Scheduling service registration
-| Method | Description |
-| ------------------------- | ----------------------------------------------------------------------------------------------------------- |
-| `UsePostgresScheduling()` | Registers the Postgres scheduling pipeline: store, dispatcher, background worker, and EF Core interceptors. |
+| Method | Description |
+| ------------------------- | ------------------------------------------------------------------------------------------------------- |
+| `UsePostgresScheduling()` | Registers the Postgres scheduling pipeline: message store, background worker, and EF Core interceptors. |
# Troubleshooting
@@ -271,6 +381,12 @@ WHERE times_sent >= max_attempts;
**Multiple service instances dispatch the same message.**
This does not happen. The dispatcher uses row-level locking to ensure each message is processed by exactly one instance.
+**Cancellation returns false even though I have a valid token.**
+The message was already dispatched before the cancellation request reached the store. Once the background worker picks up a message and delivers it, the row is deleted and cancellation is no longer possible. If you need a wider cancellation window, schedule messages further in the future or check `SchedulingResult.IsCancellable` to confirm the infrastructure supports cancellation.
+
+**`SchedulingResult.IsCancellable` is false.**
+No store-based scheduling provider is registered. Cancellation requires a provider like `UsePostgresScheduling()` that persists messages to a store. Transports with native scheduling (InMemory, PostgreSQL) do not support cancellation. If you need cancellation support, configure `UsePostgresScheduling()` with an EF Core DbContext.
+
# Next steps
- [**Reliability**](/docs/mocha/v1/reliability) - Configure the transactional outbox and inbox for guaranteed delivery alongside scheduling.