Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/Mocha/Mocha.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
<Project Path="src/Mocha.Mediator.Abstractions/Mocha.Mediator.Abstractions.csproj" />
<Project Path="src/Mocha.Mediator/Mocha.Mediator.csproj" />
<Project Path="src/Mocha.Outbox/Mocha.Outbox.csproj" />
<Project Path="src/Mocha.Scheduling/Mocha.Scheduling.csproj" />
<Project Path="src/Mocha.Threading/Mocha.Threading.csproj" />
<Project Path="src/Mocha.Mediator/Mocha.Mediator.csproj" />
<Project Path="src/Mocha.Analyzers/Mocha.Analyzers.csproj" />
<Project Path="src/Mocha.Transport.InMemory/Mocha.Transport.InMemory.csproj" />
<Project Path="src/Mocha.Transport.Postgres/Mocha.Transport.Postgres.csproj" />
<Project Path="src/Mocha.Transport.RabbitMQ/Mocha.Transport.RabbitMQ.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
<ProjectReference Include="..\Mocha\Mocha.csproj" />
<ProjectReference Include="..\Mocha.EntityFrameworkCore\Mocha.EntityFrameworkCore.csproj" />
<ProjectReference Include="..\Mocha.Inbox\Mocha.Inbox.csproj" />
<ProjectReference Include="..\Mocha.Scheduling\Mocha.Scheduling.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,32 @@ namespace Mocha.Scheduling;
/// Implements <see cref="IScheduledMessageStore"/> for Postgres by inserting serialized message envelopes
/// into the scheduled messages table using raw SQL through the DbContext Npgsql connection.
/// </summary>
internal sealed class EfCoreScheduledMessageStore : IScheduledMessageStore, IDisposable
/// <remarks>
/// Creates a new <see cref="EfCoreScheduledMessageStore"/> using the provided DbContext connection,
/// scheduler signal, and pre-built SQL statements.
/// </remarks>
/// <param name="originalDbContext">The DbContext whose underlying Npgsql connection is used for operations.</param>
/// <param name="signal">The signal used to wake the scheduler after a message is persisted.</param>
/// <param name="insertSql">The parameterized SQL insert statement for the scheduled messages table.</param>
/// <param name="cancelSql">The parameterized SQL delete statement for cancelling a scheduled message.</param>
internal sealed class EfCoreScheduledMessageStore(
DbContext originalDbContext,
ISchedulerSignal signal,
string insertSql,
string cancelSql) : IScheduledMessageStore, IDisposable
{
private readonly DbContext _originalDbContext;
private readonly ISchedulerSignal _signal;
private const string ProviderPrefix = "postgres-scheduler:";
private readonly SemaphoreSlim _semaphore = new(1, 1);
private readonly string? _insertSql;
private PooledArrayWriter? _arrayWriter;

/// <summary>
/// Creates a new <see cref="EfCoreScheduledMessageStore"/> using the provided DbContext connection,
/// scheduler signal, and pre-built insert SQL.
/// </summary>
/// <param name="originalDbContext">The DbContext whose underlying Npgsql connection is used for inserts.</param>
/// <param name="signal">The signal used to wake the scheduler after a message is persisted.</param>
/// <param name="insertSql">The parameterized SQL insert statement for the scheduled messages table.</param>
public EfCoreScheduledMessageStore(DbContext originalDbContext, ISchedulerSignal signal, string insertSql)
{
_originalDbContext = originalDbContext;
_signal = signal;
_insertSql = insertSql;
}

/// <summary>
/// Serializes the message envelope and inserts it into the Postgres scheduled messages table.
/// </summary>
/// <param name="envelope">The message envelope to persist.</param>
/// <param name="scheduledTime">The time at which the message should be dispatched.</param>
/// <param name="cancellationToken">A token to observe for cancellation.</param>
public async ValueTask PersistAsync(
/// <returns>An opaque token string for later cancellation.</returns>
public async ValueTask<string> PersistAsync(
MessageEnvelope envelope,
DateTimeOffset scheduledTime,
CancellationToken cancellationToken)
Expand All @@ -54,27 +51,28 @@ public async ValueTask PersistAsync(
{
_arrayWriter ??= new PooledArrayWriter();

var connection = (NpgsqlConnection)_originalDbContext.Database.GetDbConnection();
var connection = (NpgsqlConnection)originalDbContext.Database.GetDbConnection();

if (connection.State != ConnectionState.Open)
{
await connection.OpenAsync(cancellationToken);
}

var transaction = _originalDbContext.Database.CurrentTransaction?.GetDbTransaction() as NpgsqlTransaction;
var transaction = originalDbContext.Database.CurrentTransaction?.GetDbTransaction() as NpgsqlTransaction;

await using var writer = new Utf8JsonWriter(_arrayWriter);
writer.WriteEnvelope(envelope);
writer.Flush(); // we know it's not async

// Execute the INSERT command
var id = NewVersion();
await using var command = connection.CreateCommand();
command.CommandText = _insertSql;
command.CommandText = insertSql;
if (transaction is not null)
{
command.Transaction = transaction;
}
command.Parameters.AddWithValue("@id", NewVersion());
command.Parameters.AddWithValue("@id", id);
command.Parameters.Add(
new NpgsqlParameter("@envelope", NpgsqlDbType.Json) { Value = _arrayWriter.WrittenMemory });
command.Parameters.AddWithValue("@scheduled_time", scheduledTime.UtcDateTime);
Expand All @@ -84,8 +82,10 @@ public async ValueTask PersistAsync(

if (transaction is null)
{
_signal.Notify(scheduledTime);
signal.Notify(scheduledTime);
}

return $"{ProviderPrefix}{id}";
}
finally
{
Expand All @@ -94,6 +94,54 @@ public async ValueTask PersistAsync(
}
}

/// <summary>
/// Cancels a scheduled message by deleting it from the store.
/// </summary>
/// <param name="token">The opaque scheduling token returned by <see cref="PersistAsync"/>.</param>
/// <param name="cancellationToken">A token to observe for cancellation.</param>
/// <returns><c>true</c> if the message was cancelled; <c>false</c> if not found or already dispatched.</returns>
public async ValueTask<bool> CancelAsync(string token, CancellationToken cancellationToken)
{
var value = token.StartsWith(ProviderPrefix, StringComparison.Ordinal)
? token[ProviderPrefix.Length..]
: token;

if (!Guid.TryParse(value, out var id))
{
return false;
}

await _semaphore.WaitAsync(cancellationToken);

try
{
var connection = (NpgsqlConnection)originalDbContext.Database.GetDbConnection();

if (connection.State != ConnectionState.Open)
{
await connection.OpenAsync(cancellationToken);
}

var transaction = originalDbContext.Database.CurrentTransaction?.GetDbTransaction() as NpgsqlTransaction;

await using var command = connection.CreateCommand();
command.CommandText = cancelSql;
if (transaction is not null)
{
command.Transaction = transaction;
}
command.Parameters.AddWithValue("@id", id);
await command.PrepareAsync(cancellationToken);

var result = await command.ExecuteScalarAsync(cancellationToken);
return result is not null and not DBNull;
}
finally
{
_semaphore.Release();
}
}

private static Guid NewVersion()
{
#if NET9_0_OR_GREATER
Expand Down Expand Up @@ -127,8 +175,9 @@ public static EfCoreScheduledMessageStore Create(Type contextType, string option
var optionsMonitor = services.GetRequiredService<IOptionsMonitor<PostgresScheduledMessageOptions>>();
var options = optionsMonitor.Get(optionsName);
var insertSql = options.Queries.InsertMessage;
var cancelSql = options.Queries.CancelMessage;

return new EfCoreScheduledMessageStore(dbContext, signal, insertSql);
return new EfCoreScheduledMessageStore(dbContext, signal, insertSql, cancelSql);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ internal sealed class ScheduledMessageQueries
/// </summary>
public string UpdateLastError { get; set; } = null!;

/// <summary>
/// Gets or sets the SQL statement to cancel a scheduled message by deleting it if it has not
/// exceeded its maximum delivery attempts.
/// </summary>
public string CancelMessage { get; set; } = null!;

/// <summary>
/// Creates a new <see cref="ScheduledMessageQueries"/> instance with SQL queries built from the provided table metadata.
/// </summary>
Expand Down Expand Up @@ -82,6 +88,13 @@ DELETE FROM {t.QualifiedTableName}
UPDATE {t.QualifiedTableName}
SET "{t.LastError}" = @last_error::jsonb
WHERE "{t.Id}" = @id;
""",

CancelMessage = $"""
DELETE FROM {t.QualifiedTableName}
WHERE "{t.Id}" = @id
AND "{t.TimesSent}" < "{t.MaxAttempts}"
RETURNING "{t.Id}";
"""
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
<ProjectReference Include="..\Mocha.Abstractions\Mocha.Abstractions.csproj" />
<ProjectReference Include="..\Mocha.Mediator\Mocha.Mediator.csproj" />
<ProjectReference Include="..\Mocha.Outbox\Mocha.Outbox.csproj" />
<ProjectReference Include="..\Mocha.Scheduling\Mocha.Scheduling.csproj" />
<ProjectReference Include="..\Mocha\Mocha.csproj" />
</ItemGroup>
<ItemGroup>
Expand Down
3 changes: 0 additions & 3 deletions src/Mocha/src/Mocha.Scheduling/Assembly.cs

This file was deleted.

10 changes: 0 additions & 10 deletions src/Mocha/src/Mocha.Scheduling/Mocha.Scheduling.csproj

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
<ItemGroup>
<ProjectReference Include="..\Mocha.Abstractions\Mocha.Abstractions.csproj" />
<ProjectReference Include="..\Mocha\Mocha.csproj" />
<ProjectReference Include="..\Mocha.Scheduling\Mocha.Scheduling.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Npgsql" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ protected override void OnAfterInitialized(IMessagingSetupContext context)
_topology.AddSubscription(subscription);
}

Features.GetOrSet<SchedulingTransportFeature>().SupportsSchedulingNatively = true;
Features.Configure<SchedulingTransportFeature>(f => f.SupportsSchedulingNatively = true);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,24 @@ public static TFeature GetOrSet<TFeature, TState>(
return feature;
}

/// <summary>
/// Gets or creates a feature and applies a configuration action to it.
/// </summary>
/// <typeparam name="TFeature">The feature key.</typeparam>
/// <param name="featureCollection">The feature collection.</param>
/// <param name="configure">The action to apply to the feature.</param>
/// <returns>The configured feature.</returns>
public static TFeature Configure<TFeature>(
this IFeatureCollection featureCollection,
Action<TFeature> configure) where TFeature : new()
{
ArgumentNullException.ThrowIfNull(configure);

var feature = featureCollection.GetOrSet<TFeature>();
configure(feature);
return feature;
}

/// <summary>
/// Updates a feature in the collection by applying a transformation function to the existing value.
/// </summary>
Expand Down
27 changes: 27 additions & 0 deletions src/Mocha/src/Mocha/Features/ScheduledMessageFeature.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using Mocha.Features;

namespace Mocha;

/// <summary>
/// A pooled feature that carries the scheduling token assigned by the scheduled message store
/// back to the message bus after pipeline execution.
/// </summary>
public sealed class ScheduledMessageFeature : IPooledFeature
{
/// <summary>
/// Gets or sets the opaque scheduling token returned by the store after persistence.
/// </summary>
public string? Token { get; set; }

/// <inheritdoc />
public void Initialize(object state)
{
Token = null;
}

/// <inheritdoc />
public void Reset()
{
Token = null;
}
}
65 changes: 65 additions & 0 deletions src/Mocha/src/Mocha/IMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,69 @@ ValueTask<TResponse> RequestAsync<TResponse>(
/// <returns>A task that completes when the reply has been handed off to the transport.</returns>
ValueTask ReplyAsync<TResponse>(TResponse response, ReplyOptions options, CancellationToken cancellationToken)
where TResponse : notnull;

/// <summary>
/// Publishes a message scheduled for delivery at the specified time.
/// </summary>
/// <typeparam name="T">The type of the message to publish.</typeparam>
/// <param name="message">The message instance to publish.</param>
/// <param name="scheduledTime">The absolute time at which the message should be delivered.</param>
/// <param name="cancellationToken">A token to cancel the publish operation.</param>
/// <returns>A scheduling result containing the cancellation token and metadata.</returns>
ValueTask<SchedulingResult> SchedulePublishAsync<T>(
T message,
DateTimeOffset scheduledTime,
CancellationToken cancellationToken) where T : notnull;

/// <summary>
/// Publishes a message scheduled for delivery at the specified time with additional options.
/// </summary>
/// <typeparam name="T">The type of the message to publish.</typeparam>
/// <param name="message">The message instance to publish.</param>
/// <param name="scheduledTime">The absolute time at which the message should be delivered.</param>
/// <param name="options">Options controlling publish behavior such as headers and expiration.</param>
/// <param name="cancellationToken">A token to cancel the publish operation.</param>
/// <returns>A scheduling result containing the cancellation token and metadata.</returns>
ValueTask<SchedulingResult> SchedulePublishAsync<T>(
T message,
DateTimeOffset scheduledTime,
PublishOptions options,
CancellationToken cancellationToken) where T : notnull;

/// <summary>
/// Sends a message scheduled for delivery at the specified time.
/// </summary>
/// <param name="message">The message instance to send.</param>
/// <param name="scheduledTime">The absolute time at which the message should be delivered.</param>
/// <param name="cancellationToken">A token to cancel the send operation.</param>
/// <returns>A scheduling result containing the cancellation token and metadata.</returns>
ValueTask<SchedulingResult> ScheduleSendAsync(
object message,
DateTimeOffset scheduledTime,
CancellationToken cancellationToken);

/// <summary>
/// Sends a message scheduled for delivery at the specified time with additional options.
/// </summary>
/// <param name="message">The message instance to send.</param>
/// <param name="scheduledTime">The absolute time at which the message should be delivered.</param>
/// <param name="options">Options controlling send behavior such as headers and expiration.</param>
/// <param name="cancellationToken">A token to cancel the send operation.</param>
/// <returns>A scheduling result containing the cancellation token and metadata.</returns>
ValueTask<SchedulingResult> ScheduleSendAsync(
object message,
DateTimeOffset scheduledTime,
SendOptions options,
CancellationToken cancellationToken);

/// <summary>
/// Cancels a previously scheduled message. Returns <c>true</c> if the message was cancelled,
/// <c>false</c> if it was already dispatched, already cancelled, or not found.
/// </summary>
/// <param name="token">The opaque scheduling token returned by a prior schedule operation.</param>
/// <param name="cancellationToken">A token to cancel the cancellation operation.</param>
/// <returns><c>true</c> if the scheduled message was cancelled; otherwise <c>false</c>.</returns>
ValueTask<bool> CancelScheduledMessageAsync(
string token,
CancellationToken cancellationToken);
}
Loading
Loading