Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/.vitepress/config.mts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ const config: UserConfig<DefaultTheme.Config> = {
text: 'Message Handlers', link: '/guide/handlers/', items: [
{text: 'Discovery', link: '/guide/handlers/discovery'},
{text: 'Error Handling', link: '/guide/handlers/error-handling'},
{text: 'Rate Limiting', link: '/guide/handlers/rate-limiting'},
{text: 'Return Values', link: '/guide/handlers/return-values'},
{text: 'Cascading Messages', link: '/guide/handlers/cascading'},
{text: 'Side Effects', link: '/guide/handlers/side-effects'},
Expand Down
59 changes: 59 additions & 0 deletions docs/guide/handlers/rate-limiting.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Rate Limiting

Wolverine can enforce distributed rate limits for message handlers by re-queuing and pausing the listener when limits are exceeded. This is intended for external API usage limits that must be respected across multiple worker nodes.

## Message Type Rate Limits

Use `RateLimit` on a message type policy to set a default limit and optional time-of-day overrides:

```cs
using Wolverine;
using Wolverine.RateLimiting;

opts.Policies.ForMessagesOfType<SendToExternalApi>()
.RateLimit(RateLimit.PerMinute(900), schedule =>
{
schedule.TimeZone = TimeZoneInfo.Utc;
schedule.AddWindow(new TimeOnly(8, 0), new TimeOnly(17, 0), RateLimit.PerMinute(400));
});
```

The middleware enforces the limit before handler execution. If the limit is exceeded, Wolverine re-schedules the message and pauses the listener for the computed delay.

## Endpoint Rate Limits

You can also rate limit an entire listening endpoint:

```cs
using Wolverine;
using Wolverine.RateLimiting;

opts.RateLimitEndpoint(new Uri("rabbitmq://queue/critical"), RateLimit.PerMinute(400));
```

Endpoint limits take precedence over message type limits when both are configured.

## Distributed Store

Rate limiting relies on a shared store. By default, Wolverine registers an in-memory store for tests and local development. For production, register a shared store implementation.

### SQL Server

```cs
using Wolverine;
using Wolverine.SqlServer;

opts.PersistMessagesWithSqlServer(connectionString)
.UseSqlServerRateLimiting();
```

This uses the Wolverine message storage schema by default (same schema as the inbox/outbox tables).

## Scheduling Requirements

Rate limiting re-schedules messages through Wolverine's scheduling pipeline. For external listeners, Wolverine requires durable inboxes to ensure rescheduled messages are persisted correctly.

```cs
opts.ListenToRabbitQueue("critical").UseDurableInbox();
// or: opts.Policies.UseDurableInboxOnAllListeners();
```
111 changes: 111 additions & 0 deletions src/Persistence/SqlServerTests/rate_limiting_storage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
using IntegrationTests;
using JasperFx.Core;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Weasel.SqlServer;
using JasperFx.Resources;
using Wolverine;
using Wolverine.ComplianceTests.RateLimiting;
using Wolverine.Persistence.Durability;
using Wolverine.RateLimiting;
using Wolverine.RDBMS;
using Wolverine.RDBMS.Sagas;
using Wolverine.SqlServer;
using Wolverine.SqlServer.Persistence;
using Wolverine.SqlServer.RateLimiting;
using Wolverine.SqlServer.Schema;
using Xunit;

namespace SqlServerTests;

public class rate_limiting_storage : RateLimitStoreCompliance
{
private readonly string _schemaName = $"rate_limits_{Guid.NewGuid():N}";
private IHost? _host;

protected override async Task<IRateLimitStore> BuildStoreAsync()
{
await waitForSqlServerAsync();
using var conn = new SqlConnection(Servers.SqlServerConnectionString);
await conn.OpenAsync();
await conn.DropSchemaAsync(_schemaName);
await conn.CloseAsync();

_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.PersistMessagesWithSqlServer(Servers.SqlServerConnectionString, _schemaName)
.UseSqlServerRateLimiting();
opts.Services.AddResourceSetupOnStartup();
}).StartAsync();

var settings = new DatabaseSettings
{
ConnectionString = Servers.SqlServerConnectionString,
SchemaName = _schemaName
};

var persistence = new SqlServerMessageStore(settings, new DurabilitySettings(),
NullLogger<SqlServerMessageStore>.Instance, Array.Empty<SagaTableDefinition>());
persistence.AddTable(new RateLimitTable(_schemaName, "wolverine_rate_limits"));
await persistence.RebuildAsync();

return new SqlServerRateLimitStore(settings, new SqlServerRateLimitOptions { SchemaName = _schemaName });
}

protected override async Task DisposeStoreAsync(IRateLimitStore store)
{
if (_host != null)
{
await _host.StopAsync();
_host.Dispose();
}
}

private static async Task waitForSqlServerAsync()
{
const int maxAttempts = 15;
var delay = TimeSpan.FromSeconds(1);
for (var attempt = 1; attempt <= maxAttempts; attempt++)
{
try
{
await using var conn = new SqlConnection(Servers.SqlServerConnectionString);
await conn.OpenAsync();
await conn.CloseAsync();
return;
}
catch (SqlException) when (attempt < maxAttempts)
{
await Task.Delay(delay);
}
}
}

[Fact]
public async Task creates_rate_limit_table_on_startup()
{
using var conn = new SqlConnection(Servers.SqlServerConnectionString);
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
cmd.CommandText =
"SELECT table_schema, table_name FROM information_schema.tables WHERE table_schema = @schema AND table_name = @name";
cmd.Parameters.Add(new SqlParameter("@schema", _schemaName));
cmd.Parameters.Add(new SqlParameter("@name", "wolverine_rate_limits"));

var found = false;
await using (var reader = await cmd.ExecuteReaderAsync())
{
if (await reader.ReadAsync())
{
found = true;
}
}

await conn.CloseAsync();

found.ShouldBeTrue();
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Data.Common;
using System.Data.Common;
using JasperFx;
using JasperFx.Core;
using JasperFx.MultiTenancy;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Data;
using System.Data;
using System.Data.Common;
using ImTools;
using JasperFx;
Expand Down Expand Up @@ -519,6 +519,11 @@ public override IEnumerable<ISchemaObject> AllObjects()
}
}

public void AddTable(Table table)
{
_externalTables.Add(table);
}

public override IDatabaseSagaSchema<TId, TSaga> SagaSchemaFor<TSaga, TId>()
{
if (_sagaStorage.TryFind(typeof(TSaga), out var raw))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using System.Data;
using Microsoft.Data.SqlClient;
using Weasel.Core;
using Wolverine.RateLimiting;
using Wolverine.RDBMS;
using Wolverine.SqlServer.Schema;

namespace Wolverine.SqlServer.RateLimiting;

public sealed class SqlServerRateLimitOptions
{
public string? SchemaName { get; set; }
public string TableName { get; set; } = "wolverine_rate_limits";
}

public sealed class SqlServerRateLimitStore : IRateLimitStore
{
private readonly string _connectionString;
private readonly string _qualifiedTable;
private readonly SqlServerRateLimitOptions _options;

public SqlServerRateLimitStore(DatabaseSettings settings, SqlServerRateLimitOptions options)
{
_options = options;
_connectionString = settings.ConnectionString ?? throw new InvalidOperationException("Connection string is required.");

var schemaName = _options.SchemaName ?? settings.SchemaName ?? "dbo";
_qualifiedTable = new DbObjectName(schemaName, _options.TableName).QualifiedName;
}

public async ValueTask<RateLimitStoreResult> TryAcquireAsync(RateLimitStoreRequest request,
CancellationToken cancellationToken)
{
await using var conn = new SqlConnection(_connectionString);
await conn.OpenAsync(cancellationToken).ConfigureAwait(false);

await using var cmd = conn.CreateCommand();
cmd.CommandText = $@"
MERGE {_qualifiedTable} WITH (HOLDLOCK) AS target
USING (SELECT @key AS {RateLimitTableColumns.Key}, @windowStart AS {RateLimitTableColumns.WindowStart}) AS source
ON target.{RateLimitTableColumns.Key} = source.{RateLimitTableColumns.Key}
AND target.{RateLimitTableColumns.WindowStart} = source.{RateLimitTableColumns.WindowStart}
WHEN MATCHED THEN
UPDATE SET {RateLimitTableColumns.CurrentCount} = target.{RateLimitTableColumns.CurrentCount} + @quantity,
{RateLimitTableColumns.WindowEnd} = @windowEnd,
{RateLimitTableColumns.Limit} = @limitPerWindow
WHEN NOT MATCHED THEN
INSERT ({RateLimitTableColumns.Key}, {RateLimitTableColumns.WindowStart}, {RateLimitTableColumns.WindowEnd}, {RateLimitTableColumns.Limit}, {RateLimitTableColumns.CurrentCount})
VALUES (@key, @windowStart, @windowEnd, @limitPerWindow, @quantity)
OUTPUT inserted.{RateLimitTableColumns.CurrentCount};
";

cmd.Parameters.Add(new SqlParameter("@key", SqlDbType.VarChar, 500) { Value = request.Key });
cmd.Parameters.Add(new SqlParameter("@windowStart", SqlDbType.DateTimeOffset) { Value = request.Bucket.WindowStart });
cmd.Parameters.Add(new SqlParameter("@windowEnd", SqlDbType.DateTimeOffset) { Value = request.Bucket.WindowEnd });
cmd.Parameters.Add(new SqlParameter("@limitPerWindow", SqlDbType.Int) { Value = request.Bucket.Limit });
cmd.Parameters.Add(new SqlParameter("@quantity", SqlDbType.Int) { Value = request.Quantity });

var current = (int)await cmd.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
var allowed = current <= request.Bucket.Limit;

return new RateLimitStoreResult(allowed, current);
}
}
25 changes: 25 additions & 0 deletions src/Persistence/Wolverine.SqlServer/Schema/RateLimitTable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using Weasel.Core;
using Weasel.SqlServer.Tables;

namespace Wolverine.SqlServer.Schema;

internal static class RateLimitTableColumns
{
public const string Key = "rate_limit_key";
public const string WindowStart = "window_start";
public const string WindowEnd = "window_end";
public const string Limit = "limit_per_window";
public const string CurrentCount = "current_count";
}

internal class RateLimitTable : Table
{
public RateLimitTable(string schemaName, string tableName) : base(new DbObjectName(schemaName, tableName))
{
AddColumn(RateLimitTableColumns.Key, "varchar(500)").NotNull().AsPrimaryKey();
AddColumn<DateTimeOffset>(RateLimitTableColumns.WindowStart).NotNull().AsPrimaryKey();
AddColumn<DateTimeOffset>(RateLimitTableColumns.WindowEnd).NotNull();
AddColumn<int>(RateLimitTableColumns.Limit).NotNull();
AddColumn<int>(RateLimitTableColumns.CurrentCount).NotNull();
}
}
29 changes: 27 additions & 2 deletions src/Persistence/Wolverine.SqlServer/SqlServerBackedPersistence.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Data.Common;
using System.Data.Common;
using JasperFx;
using JasperFx.CodeGeneration.Model;
using JasperFx.Core;
Expand Down Expand Up @@ -112,12 +112,20 @@ public interface ISqlServerBackedPersistence
internal class SqlServerBackedPersistence : IWolverineExtension, ISqlServerBackedPersistence
{
private readonly WolverineOptions _options;
private readonly List<Action<SqlServerMessageStore>> _storeConfigurations = new();

public SqlServerBackedPersistence(WolverineOptions options)
{
_options = options;
}

internal WolverineOptions Options => _options;

internal void AddStoreConfiguration(Action<SqlServerMessageStore> configuration)
{
_storeConfigurations.Add(configuration);
}

public string? ConnectionString { get; set; }

public string EnvelopeStorageSchemaName { get; set; } = "wolverine";
Expand Down Expand Up @@ -194,6 +202,7 @@ public IMessageStore BuildMessageStore(IWolverineRuntime runtime)
{
var defaultStore = new SqlServerMessageStore(settings, runtime.DurabilitySettings,
logger, sagaTables);
applyStoreConfigurations(defaultStore);

ConnectionStringTenancy = new MasterTenantSource(defaultStore, runtime.Options);

Expand All @@ -205,15 +214,31 @@ public IMessageStore BuildMessageStore(IWolverineRuntime runtime)
{
var defaultStore = new SqlServerMessageStore(settings, runtime.DurabilitySettings,
logger, sagaTables);
applyStoreConfigurations(defaultStore);

return new MultiTenantedMessageStore(defaultStore, runtime,
new SqlServerTenantedMessageStore(runtime, this, sagaTables){DataSource = ConnectionStringTenancy});
}

settings.Role = Role;

return new SqlServerMessageStore(settings, runtime.DurabilitySettings,
var store = new SqlServerMessageStore(settings, runtime.DurabilitySettings,
logger, sagaTables);
applyStoreConfigurations(store);
return store;
}

internal void ApplyStoreConfigurations(SqlServerMessageStore store)
{
applyStoreConfigurations(store);
}

private void applyStoreConfigurations(SqlServerMessageStore store)
{
foreach (var configuration in _storeConfigurations)
{
configuration(store);
}
}

private DatabaseSettings buildMainDatabaseSettings()
Expand Down
Loading
Loading