Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/Mocha/Mocha.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
<Project Path="src/Mocha.Mediator.Abstractions/Mocha.Mediator.Abstractions.csproj" />
<Project Path="src/Mocha.Mediator/Mocha.Mediator.csproj" />
<Project Path="src/Mocha.Outbox/Mocha.Outbox.csproj" />
<Project Path="src/Mocha.Scheduling/Mocha.Scheduling.csproj" />
<Project Path="src/Mocha.Threading/Mocha.Threading.csproj" />
<Project Path="src/Mocha.Mediator/Mocha.Mediator.csproj" />
<Project Path="src/Mocha.Analyzers/Mocha.Analyzers.csproj" />
<Project Path="src/Mocha.Transport.InMemory/Mocha.Transport.InMemory.csproj" />
<Project Path="src/Mocha.Transport.Postgres/Mocha.Transport.Postgres.csproj" />
<Project Path="src/Mocha.Transport.RabbitMQ/Mocha.Transport.RabbitMQ.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
<ProjectReference Include="..\Mocha\Mocha.csproj" />
<ProjectReference Include="..\Mocha.EntityFrameworkCore\Mocha.EntityFrameworkCore.csproj" />
<ProjectReference Include="..\Mocha.Inbox\Mocha.Inbox.csproj" />
<ProjectReference Include="..\Mocha.Scheduling\Mocha.Scheduling.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,23 @@ internal sealed class EfCoreScheduledMessageStore : IScheduledMessageStore, IDis
private readonly ISchedulerSignal _signal;
private readonly SemaphoreSlim _semaphore = new(1, 1);
private readonly string? _insertSql;
private readonly string _cancelSql;
private PooledArrayWriter? _arrayWriter;

/// <summary>
/// Creates a new <see cref="EfCoreScheduledMessageStore"/> using the provided DbContext connection,
/// scheduler signal, and pre-built insert SQL.
/// scheduler signal, and pre-built SQL statements.
/// </summary>
/// <param name="originalDbContext">The DbContext whose underlying Npgsql connection is used for inserts.</param>
/// <param name="originalDbContext">The DbContext whose underlying Npgsql connection is used for operations.</param>
/// <param name="signal">The signal used to wake the scheduler after a message is persisted.</param>
/// <param name="insertSql">The parameterized SQL insert statement for the scheduled messages table.</param>
public EfCoreScheduledMessageStore(DbContext originalDbContext, ISchedulerSignal signal, string insertSql)
/// <param name="cancelSql">The parameterized SQL delete statement for cancelling a scheduled message.</param>
public EfCoreScheduledMessageStore(DbContext originalDbContext, ISchedulerSignal signal, string insertSql, string cancelSql)
{
_originalDbContext = originalDbContext;
_signal = signal;
_insertSql = insertSql;
_cancelSql = cancelSql;
}

/// <summary>
Expand All @@ -43,7 +46,8 @@ public EfCoreScheduledMessageStore(DbContext originalDbContext, ISchedulerSignal
/// <param name="envelope">The message envelope to persist.</param>
/// <param name="scheduledTime">The time at which the message should be dispatched.</param>
/// <param name="cancellationToken">A token to observe for cancellation.</param>
public async ValueTask PersistAsync(
/// <returns>An opaque token string for later cancellation.</returns>
public async ValueTask<string> PersistAsync(
MessageEnvelope envelope,
DateTimeOffset scheduledTime,
CancellationToken cancellationToken)
Expand All @@ -68,13 +72,14 @@ public async ValueTask PersistAsync(
writer.Flush(); // we know it's not async

// Execute the INSERT command
var id = NewVersion();
await using var command = connection.CreateCommand();
command.CommandText = _insertSql;
if (transaction is not null)
{
command.Transaction = transaction;
}
command.Parameters.AddWithValue("@id", NewVersion());
command.Parameters.AddWithValue("@id", id);
command.Parameters.Add(
new NpgsqlParameter("@envelope", NpgsqlDbType.Json) { Value = _arrayWriter.WrittenMemory });
command.Parameters.AddWithValue("@scheduled_time", scheduledTime.UtcDateTime);
Expand All @@ -86,6 +91,8 @@ public async ValueTask PersistAsync(
{
_signal.Notify(scheduledTime);
}

return $"postgres-scheduler:{id}";
}
finally
{
Expand All @@ -94,6 +101,41 @@ public async ValueTask PersistAsync(
}
}

/// <summary>
/// Cancels a scheduled message by deleting it from the store.
/// </summary>
/// <param name="value">The store-specific identifier (GUID) extracted from the scheduling token.</param>
/// <param name="cancellationToken">A token to observe for cancellation.</param>
/// <returns><c>true</c> if the message was cancelled; <c>false</c> if not found or already dispatched.</returns>
public async ValueTask<bool> CancelAsync(string value, CancellationToken cancellationToken)
{
if (!Guid.TryParse(value, out var id))
{
return false;
}

var connection = (NpgsqlConnection)_originalDbContext.Database.GetDbConnection();

if (connection.State != ConnectionState.Open)
{
await connection.OpenAsync(cancellationToken);
}

var transaction = _originalDbContext.Database.CurrentTransaction?.GetDbTransaction() as NpgsqlTransaction;

await using var command = connection.CreateCommand();
command.CommandText = _cancelSql;
if (transaction is not null)
{
command.Transaction = transaction;
}
command.Parameters.AddWithValue("@id", id);
await command.PrepareAsync(cancellationToken);

var result = await command.ExecuteScalarAsync(cancellationToken);
return result is not null and not DBNull;
Comment thread
PascalSenn marked this conversation as resolved.
Outdated
}

private static Guid NewVersion()
{
#if NET9_0_OR_GREATER
Expand Down Expand Up @@ -127,8 +169,9 @@ public static EfCoreScheduledMessageStore Create(Type contextType, string option
var optionsMonitor = services.GetRequiredService<IOptionsMonitor<PostgresScheduledMessageOptions>>();
var options = optionsMonitor.Get(optionsName);
var insertSql = options.Queries.InsertMessage;
var cancelSql = options.Queries.CancelMessage;

return new EfCoreScheduledMessageStore(dbContext, signal, insertSql);
return new EfCoreScheduledMessageStore(dbContext, signal, insertSql, cancelSql);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ internal sealed class ScheduledMessageQueries
/// </summary>
public string UpdateLastError { get; set; } = null!;

/// <summary>
/// Gets or sets the SQL statement to cancel a scheduled message by deleting it if it has not
/// exceeded its maximum delivery attempts.
/// </summary>
public string CancelMessage { get; set; } = null!;

/// <summary>
/// Creates a new <see cref="ScheduledMessageQueries"/> instance with SQL queries built from the provided table metadata.
/// </summary>
Expand Down Expand Up @@ -82,6 +88,13 @@ DELETE FROM {t.QualifiedTableName}
UPDATE {t.QualifiedTableName}
SET "{t.LastError}" = @last_error::jsonb
WHERE "{t.Id}" = @id;
""",

CancelMessage = $"""
DELETE FROM {t.QualifiedTableName}
WHERE "{t.Id}" = @id
AND "{t.TimesSent}" < "{t.MaxAttempts}"
RETURNING "{t.Id}";
"""
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
<ProjectReference Include="..\Mocha.Abstractions\Mocha.Abstractions.csproj" />
<ProjectReference Include="..\Mocha.Mediator\Mocha.Mediator.csproj" />
<ProjectReference Include="..\Mocha.Outbox\Mocha.Outbox.csproj" />
<ProjectReference Include="..\Mocha.Scheduling\Mocha.Scheduling.csproj" />
<ProjectReference Include="..\Mocha\Mocha.csproj" />
</ItemGroup>
<ItemGroup>
Expand Down
3 changes: 0 additions & 3 deletions src/Mocha/src/Mocha.Scheduling/Assembly.cs

This file was deleted.

10 changes: 0 additions & 10 deletions src/Mocha/src/Mocha.Scheduling/Mocha.Scheduling.csproj

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
<ItemGroup>
<ProjectReference Include="..\Mocha.Abstractions\Mocha.Abstractions.csproj" />
<ProjectReference Include="..\Mocha\Mocha.csproj" />
<ProjectReference Include="..\Mocha.Scheduling\Mocha.Scheduling.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Npgsql" />
Expand Down
27 changes: 27 additions & 0 deletions src/Mocha/src/Mocha/Features/ScheduledMessageFeature.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using Mocha.Features;

namespace Mocha;

/// <summary>
/// A pooled feature that carries the scheduling token assigned by the scheduled message store
/// back to the message bus after pipeline execution.
/// </summary>
public sealed class ScheduledMessageFeature : IPooledFeature
{
/// <summary>
/// Gets or sets the opaque scheduling token returned by the store after persistence.
/// </summary>
public string? Token { get; set; }

/// <inheritdoc />
public void Initialize(object state)
{
Token = null;
}

/// <inheritdoc />
public void Reset()
{
Token = null;
}
}
65 changes: 65 additions & 0 deletions src/Mocha/src/Mocha/IMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,69 @@ ValueTask<TResponse> RequestAsync<TResponse>(
/// <returns>A task that completes when the reply has been handed off to the transport.</returns>
ValueTask ReplyAsync<TResponse>(TResponse response, ReplyOptions options, CancellationToken cancellationToken)
where TResponse : notnull;

/// <summary>
/// Publishes a message scheduled for delivery at the specified time.
/// </summary>
/// <typeparam name="T">The type of the message to publish.</typeparam>
/// <param name="message">The message instance to publish.</param>
/// <param name="scheduledTime">The absolute time at which the message should be delivered.</param>
/// <param name="cancellationToken">A token to cancel the publish operation.</param>
/// <returns>A scheduling result containing the cancellation token and metadata.</returns>
ValueTask<SchedulingResult> SchedulePublishAsync<T>(
T message,
DateTimeOffset scheduledTime,
CancellationToken cancellationToken) where T : notnull;

/// <summary>
/// Publishes a message scheduled for delivery at the specified time with additional options.
/// </summary>
/// <typeparam name="T">The type of the message to publish.</typeparam>
/// <param name="message">The message instance to publish.</param>
/// <param name="scheduledTime">The absolute time at which the message should be delivered.</param>
/// <param name="options">Options controlling publish behavior such as headers and expiration.</param>
/// <param name="cancellationToken">A token to cancel the publish operation.</param>
/// <returns>A scheduling result containing the cancellation token and metadata.</returns>
ValueTask<SchedulingResult> SchedulePublishAsync<T>(
T message,
DateTimeOffset scheduledTime,
PublishOptions options,
CancellationToken cancellationToken) where T : notnull;

/// <summary>
/// Sends a message scheduled for delivery at the specified time.
/// </summary>
/// <param name="message">The message instance to send.</param>
/// <param name="scheduledTime">The absolute time at which the message should be delivered.</param>
/// <param name="cancellationToken">A token to cancel the send operation.</param>
/// <returns>A scheduling result containing the cancellation token and metadata.</returns>
ValueTask<SchedulingResult> ScheduleSendAsync(
object message,
DateTimeOffset scheduledTime,
CancellationToken cancellationToken);

/// <summary>
/// Sends a message scheduled for delivery at the specified time with additional options.
/// </summary>
/// <param name="message">The message instance to send.</param>
/// <param name="scheduledTime">The absolute time at which the message should be delivered.</param>
/// <param name="options">Options controlling send behavior such as headers and expiration.</param>
/// <param name="cancellationToken">A token to cancel the send operation.</param>
/// <returns>A scheduling result containing the cancellation token and metadata.</returns>
ValueTask<SchedulingResult> ScheduleSendAsync(
object message,
DateTimeOffset scheduledTime,
SendOptions options,
CancellationToken cancellationToken);

/// <summary>
/// Cancels a previously scheduled message. Returns <c>true</c> if the message was cancelled,
/// <c>false</c> if it was already dispatched, already cancelled, or not found.
/// </summary>
/// <param name="token">The opaque scheduling token returned by a prior schedule operation.</param>
/// <param name="cancellationToken">A token to cancel the cancellation operation.</param>
/// <returns><c>true</c> if the scheduled message was cancelled; otherwise <c>false</c>.</returns>
ValueTask<bool> CancelScheduledMessageAsync(
string token,
CancellationToken cancellationToken);
}
39 changes: 0 additions & 39 deletions src/Mocha/src/Mocha/MessageBusSchedulingExtensions.cs

This file was deleted.

Loading
Loading