Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions docs/guide/durability/sqlserver.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,43 @@ opts.ListenToSqlServerQueue("inbound").PollingInterval(2.Seconds());

When not set, the queue falls back to the global `DurabilitySettings.ScheduledJobPollingTime`.

### Optimizing Queue Throughput <Badge type="tip" text="6.16" />

By default the Sql Server queue and scheduled-message tables use a clustered primary key on the
message `id` (a random `Guid`). That layout is simple and safe, but on busy queues with non-trivial
depth it has two costs: inserts land on random pages (causing page splits and fragmentation), and
the dequeue query (`SELECT TOP(n) ... ORDER BY timestamp`) has no supporting index, so each poll
scans and sorts the whole table.

You can opt into a higher-throughput storage layout that mirrors the design used by mature SQL-based
queues (including Wolverine's own NServiceBus interop transport): the tables are **clustered on a
monotonic `seq` identity column** so dequeues are an ordered clustered-index seek and the matching
deletes remove physically contiguous rows, while the message `id` keeps a unique non-clustered index
so duplicate sends still fail fast for idempotency. A filtered index also speeds up the expiry sweep.

```cs
opts.UseSqlServerPersistenceAndTransport(connectionString)
// Use the clustered-identity queue table layout for much higher dequeue
// throughput and far more consistent tail latency on deep queues
.OptimizeQueueThroughput();
```

In Wolverine's own benchmarks this raised batched-drain throughput by well over an order of magnitude
and turned worst-case deep-queue dequeue latency from hundreds of milliseconds into single-digit
milliseconds.

::: warning
This setting changes the physical schema of the queue tables. Enabling it on an **existing** database
causes a one-time table rebuild the next time the schema is applied (drop the clustered primary key,
add the `seq` identity column, and build the new clustered/unique indexes). Queue tables are normally
transient, so this is usually cheap, but you should enable it during a maintenance window — ideally
with the queues drained. New applications can simply turn it on from the start.

This applies to Wolverine's native Sql Server queue tables only. The NServiceBus interoperability
transport already uses the equivalent clustered layout (it must match the NServiceBus on-disk schema),
so no opt-in is needed there.
:::

If you want to use Sql Server as a queueing mechanism between multiple applications, you'll need:

1. To target the same Sql Server database, even if the two applications target different database schemas
Expand Down
214 changes: 214 additions & 0 deletions src/Persistence/SqlServerTests/Transport/transport_perf_benchmark.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
using System.Diagnostics;
using IntegrationTests;
using Microsoft.Data.SqlClient;
using Shouldly;
using Weasel.SqlServer;
using Xunit;
using Xunit.Abstractions;

namespace SqlServerTests.Transport;

// Manual benchmark for the SQL Server queue transport schema decision. The benchmark itself
// (the run() test) is skipped by default and is not meant for CI; the
// verify_optimized_schema_provisions_and_roundtrips() test is a regular, unskipped fact.
// Compares three physical table designs head-to-head with raw DDL so the comparison is
// independent of Weasel migrations and Wolverine code:
//   A) baseline: clustered PRIMARY KEY on a random GUID, no timestamp index (current)
//   B) clustered GUID PK + nonclustered index on timestamp (index-only)
//   C) nonclustered GUID PK + clustered bigint IDENTITY, dequeue by seq (redesign)
// Run:
//   BENCH_SQLSERVER="Server=localhost,1443;..." dotnet test --filter "FullyQualifiedName~transport_perf_benchmark"
[Trait("Category", "Benchmark")]
public class transport_perf_benchmark
{
    private readonly ITestOutputHelper _output;

    public transport_perf_benchmark(ITestOutputHelper output) => _output = output;

    // BENCH_SQLSERVER overrides the connection string; otherwise the shared test server is used.
    private static string ConnString =>
        Environment.GetEnvironmentVariable("BENCH_SQLSERVER") ?? Servers.SqlServerConnectionString;

    // Size in bytes of each message body written by the benchmark.
    private const int BodySize = 250;

    // Number of single-row inserts timed in phase B1.
    private const int InsertCount = 3000;

    // TOP(n) batch size used by every pop (B2 drain and B3 deep-pop).
    private const int DrainBatch = 50;

    // Number of rows bulk-loaded before the B3 latency measurement.
    private const int Backlog = 30000;

    // Number of timed pops taken against the backlog in B3.
    private const int BacklogPops = 30;

    // One table design under test: display name, table name, the DDL that creates it, and the
    // ORDER BY expression its dequeue uses.
    private record Variant(string Name, string Table, string CreateDdl, string OrderBy);

    // The three table designs described in the header comment. The DDL is static text, so plain
    // verbatim strings are used (nothing is interpolated).
    private static Variant[] Variants() =>
    [
        new("A baseline (clustered GUID, no index)", "bench_a", @"
CREATE TABLE bench_a (
id uniqueidentifier NOT NULL CONSTRAINT pk_bench_a PRIMARY KEY,
body varbinary(max) NOT NULL,
message_type varchar(250) NOT NULL,
keep_until datetimeoffset NULL,
[timestamp] datetimeoffset NOT NULL DEFAULT SYSDATETIMEOFFSET());", "[timestamp]"),

        new("B clustered GUID + timestamp index", "bench_b", @"
CREATE TABLE bench_b (
id uniqueidentifier NOT NULL CONSTRAINT pk_bench_b PRIMARY KEY,
body varbinary(max) NOT NULL,
message_type varchar(250) NOT NULL,
keep_until datetimeoffset NULL,
[timestamp] datetimeoffset NOT NULL DEFAULT SYSDATETIMEOFFSET());
CREATE INDEX idx_bench_b_timestamp ON bench_b ([timestamp]);", "[timestamp]"),

        new("C nonclustered GUID PK + clustered identity seq", "bench_c", @"
CREATE TABLE bench_c (
id uniqueidentifier NOT NULL CONSTRAINT pk_bench_c PRIMARY KEY NONCLUSTERED,
seq bigint IDENTITY(1,1) NOT NULL,
body varbinary(max) NOT NULL,
message_type varchar(250) NOT NULL,
keep_until datetimeoffset NULL,
[timestamp] datetimeoffset NOT NULL DEFAULT SYSDATETIMEOFFSET());
CREATE UNIQUE CLUSTERED INDEX cidx_bench_c_seq ON bench_c (seq);", "seq")
    ];

    // Validates that the real OptimizeQueueThroughput() table definition provisions through Weasel
    // (fresh create) and that a send -> ordered pop roundtrip works against the clustered seq layout.
    [Fact]
    public async Task verify_optimized_schema_provisions_and_roundtrips()
    {
        const string schema = "benchopt";

        // Start from a clean slate so SetupAsync performs a fresh create.
        await using (var conn = new SqlConnection(ConnString))
        {
            await conn.OpenAsync();
            await conn.DropSchemaAsync(schema);
        }

        var transport = new Wolverine.SqlServer.Transport.SqlServerTransport(new Wolverine.RDBMS.DatabaseSettings
        {
            ConnectionString = ConnString,
            SchemaName = schema
        })
        {
            OptimizeQueueThroughput = true
        };

        var queue = transport.Queues["verify"];
        queue.Mode = Wolverine.Configuration.EndpointMode.BufferedInMemory;

        await queue.SetupAsync(Microsoft.Extensions.Logging.Abstractions.NullLogger.Instance);

        // Clustered index must be on seq, not the id PK.
        await using (var conn = new SqlConnection(ConnString))
        {
            await conn.OpenAsync();

            // Dispose the command like every other command in this class does.
            await using var cmd = conn.CreateCommand(
                $@"select c.name from sys.indexes i
join sys.index_columns ic on i.object_id=ic.object_id and i.index_id=ic.index_id
join sys.columns c on ic.object_id=c.object_id and ic.column_id=c.column_id
where i.object_id = object_id('{schema}.wolverine_queue_verify') and i.type_desc='CLUSTERED'");
            var clusteredCol = (string?)await cmd.ExecuteScalarAsync();
            clusteredCol.ShouldBe("seq");
        }

        for (var i = 0; i < 25; i++)
        {
            var e = Wolverine.ComplianceTests.ObjectMother.Envelope();
            await queue.SendAsync(e, CancellationToken.None);
        }

        // 25 sent, 10 popped -> 15 must remain.
        var popped = await queue.TryPopAsync(10, Microsoft.Extensions.Logging.Abstractions.NullLogger.Instance, CancellationToken.None);
        popped.Count.ShouldBe(10);
        (await queue.CountAsync()).ShouldBe(15);

        _output.WriteLine("Optimized schema provisioned; clustered on seq; roundtrip OK");
    }

    // Runs the three measurement phases (B1 insert, B2 drain, B3 deep-backlog pop latency) for
    // every variant on a single connection and writes the results to the test output.
    [Fact(Skip = "Manual benchmark; run explicitly with BENCH_SQLSERVER set")]
    public async Task run()
    {
        await using var conn = new SqlConnection(ConnString);
        await conn.OpenAsync();

        foreach (var v in Variants())
        {
            // Drop any table left behind by an earlier, aborted run before recreating it.
            await execAsync(conn, $"IF OBJECT_ID('{v.Table}') IS NOT NULL DROP TABLE {v.Table};");
            await execAsync(conn, v.CreateDdl);

            // B1: insert throughput (single connection, mirrors per-message INSERT)
            var insertSql = $"insert into {v.Table} (id, body, message_type, keep_until) values (@id, @body, 'bench', NULL)";
            var body = new byte[BodySize];
            var sw = Stopwatch.StartNew();
            for (var i = 0; i < InsertCount; i++)
            {
                await using var cmd = conn.CreateCommand(insertSql).With("id", Guid.NewGuid()).With("body", body);
                cmd.CommandTimeout = 120;
                await cmd.ExecuteNonQueryAsync();
            }
            sw.Stop();
            var insertRate = InsertCount / sw.Elapsed.TotalSeconds;

            // B2: ordered drain throughput
            sw.Restart();
            long drained = 0;
            while (true)
            {
                var n = await popAsync(conn, v, DrainBatch);
                if (n == 0)
                {
                    break;
                }

                drained += n;
            }
            sw.Stop();
            var drainRate = drained / sw.Elapsed.TotalSeconds;

            // B3: deep-backlog single-pop latency
            await bulkInsertAsync(conn, v, Backlog);
            var times = new List<double>();
            for (var i = 0; i < BacklogPops; i++)
            {
                var t = Stopwatch.StartNew();
                await popAsync(conn, v, DrainBatch);
                t.Stop();
                times.Add(t.Elapsed.TotalMilliseconds);
            }

            // Sorted ascending so median / p95 / max can be read by index.
            times.Sort();

            await execAsync(conn, $"DROP TABLE {v.Table};");

            _output.WriteLine(
                $"{v.Name}\n" +
                $" B1 insert: {insertRate:F0} msg/s\n" +
                $" B2 drain : {drainRate:F0} msg/s\n" +
                $" B3 deep-pop (batch {DrainBatch} over {Backlog}): median {times[times.Count / 2]:F1} ms, " +
                $"p95 {times[(int)(times.Count * 0.95)]:F1} ms, max {times[^1]:F1} ms\n");
        }
    }

    // Executes a non-query statement with a 120 second command timeout.
    private static async Task execAsync(SqlConnection conn, string sql)
    {
        await using var cmd = conn.CreateCommand(sql);
        cmd.CommandTimeout = 120;
        await cmd.ExecuteNonQueryAsync();
    }

    // Deletes up to `batch` rows in the variant's dequeue order and returns how many rows the
    // DELETE ... OUTPUT actually produced (0 means nothing was popped).
    private static async Task<long> popAsync(SqlConnection conn, Variant v, int batch)
    {
        var sql = $@"
SET NOCOUNT ON;
WITH message AS (
SELECT TOP(@count) body, keep_until
FROM {v.Table} WITH (UPDLOCK, READPAST, ROWLOCK)
ORDER BY {v.OrderBy})
DELETE FROM message OUTPUT deleted.body;";
        await using var cmd = conn.CreateCommand(sql).With("count", batch);
        cmd.CommandTimeout = 120;
        var rows = 0L;
        await using var reader = await cmd.ExecuteReaderAsync();
        while (await reader.ReadAsync())
        {
            rows++;
        }

        return rows;
    }

    // Loads `count` rows into the variant's table with one set-based INSERT; each row gets a
    // NEWID() id and a body of BodySize 'x' characters.
    private static async Task bulkInsertAsync(SqlConnection conn, Variant v, int count)
    {
        var sql = $@"
;WITH n AS (
SELECT TOP (@count) ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) r
FROM sys.all_objects a CROSS JOIN sys.all_objects b)
INSERT INTO {v.Table} (id, body, message_type, keep_until)
SELECT NEWID(), CONVERT(varbinary(max), REPLICATE(CAST('x' AS varchar(max)), @size)), 'bench', NULL
FROM n;";
        await using var cmd = conn.CreateCommand(sql).With("count", count).With("size", BodySize);
        cmd.CommandTimeout = 120;
        await cmd.ExecuteNonQueryAsync();
    }
}
43 changes: 42 additions & 1 deletion src/Persistence/Wolverine.SqlServer/Transport/QueueTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,51 @@ internal class QueueTable : Table
/// <summary>
/// Defines the schema of a Sql Server queue table. The physical layout depends on
/// <c>transport.OptimizeQueueThroughput</c>: either the default primary key on the message id
/// plus a timestamp index, or the opt-in layout clustered on an identity <c>seq</c> column.
/// </summary>
/// <param name="transport">Supplies the schema name and the OptimizeQueueThroughput flag.</param>
/// <param name="tableName">Unqualified table name; also used to build the index names.</param>
public QueueTable(SqlServerTransport transport, string tableName) : base(
    new DbObjectName(transport.TransportSchemaName, tableName))
{
    // Register the id column exactly once and leave the key decision to the branches below.
    // Marking it as the primary key here as well would register the column twice and would
    // force the primary key onto the optimized layout, which clusters on seq instead.
    var id = AddColumn<Guid>(DatabaseConstants.Id);
    AddColumn(DatabaseConstants.Body, "varbinary(max)").NotNull();
    AddColumn(DatabaseConstants.MessageType, "varchar(250)").NotNull();
    AddColumn<DateTimeOffset>(DatabaseConstants.KeepUntil);
    AddColumn<DateTimeOffset>("timestamp").DefaultValueByExpression("SYSDATETIMEOFFSET()");

    if (transport.OptimizeQueueThroughput)
    {
        // Opt-in high-throughput layout (see OptimizeQueueThroughput()): cluster on a monotonic
        // identity so the TOP(n) ... ORDER BY seq dequeue is a clustered seek and the matching
        // DELETE removes physically contiguous rows; keep the message id unique (non-clustered)
        // so duplicate sends still fail fast for idempotency; and use a filtered index for the
        // expiry sweep. Mirrors the proven NServiceBus SQL Server transport layout.
        id.NotNull();
        AddColumn<long>("seq").AutoNumber().NotNull();

        Indexes.Add(new IndexDefinition($"cidx_{tableName}_seq")
        {
            Columns = ["seq"],
            IsClustered = true
        });

        Indexes.Add(new IndexDefinition($"uidx_{tableName}_id")
        {
            Columns = [DatabaseConstants.Id],
            IsUnique = true
        });

        // Filtered: only rows that actually carry an expiry are indexed.
        Indexes.Add(new IndexDefinition($"idx_{tableName}_keepuntil")
        {
            Columns = [DatabaseConstants.KeepUntil],
            Predicate = $"{DatabaseConstants.KeepUntil} IS NOT NULL"
        });
    }
    else
    {
        // Default layout: clustered PK on id, plus an index on timestamp so the TOP(n) ...
        // ORDER BY timestamp dequeue is a seek rather than a full scan + sort (the clustered PK
        // on a random Guid is useless for that ordering). See GH perf review.
        id.AsPrimaryKey();

        Indexes.Add(new IndexDefinition($"idx_{tableName}_timestamp")
        {
            Columns = ["timestamp"]
        });
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ internal class ScheduledMessageTable : Table
public ScheduledMessageTable(SqlServerTransport transport, string tableName) : base(
new DbObjectName(transport.TransportSchemaName, tableName))
{
AddColumn<Guid>(DatabaseConstants.Id).AsPrimaryKey();
var id = AddColumn<Guid>(DatabaseConstants.Id);
AddColumn(DatabaseConstants.Body, "varbinary(max)").NotNull();
AddColumn(DatabaseConstants.MessageType, "varchar(250)").NotNull();
AddColumn<DateTimeOffset>(DatabaseConstants.ExecutionTime).NotNull();
Expand All @@ -21,5 +21,36 @@ public ScheduledMessageTable(SqlServerTransport transport, string tableName) : b
{
Columns = [DatabaseConstants.ExecutionTime]
});

if (transport.OptimizeQueueThroughput)
{
// Match the high-throughput layout of the ready queue table (see OptimizeQueueThroughput()):
// cluster on a monotonic identity, keep the id unique and non-clustered for idempotent
// sends, and use a filtered index for the expiry sweep.
id.NotNull();
AddColumn<long>("seq").AutoNumber().NotNull();

// Clustered index on the identity column; this layout declares no primary key on id.
Indexes.Add(new IndexDefinition($"cidx_{tableName}_seq")
{
Columns = ["seq"],
IsClustered = true
});

// Uniqueness of the message id is enforced by this index instead of a primary key.
Indexes.Add(new IndexDefinition($"uidx_{tableName}_id")
{
Columns = [DatabaseConstants.Id],
IsUnique = true
});

// Filtered index: only rows with a non-null keep-until value are indexed.
// NOTE(review): relies on a keep-until column being declared earlier in this constructor — confirm.
Indexes.Add(new IndexDefinition($"idx_{tableName}_keepuntil")
{
Columns = [DatabaseConstants.KeepUntil],
Predicate = $"{DatabaseConstants.KeepUntil} IS NOT NULL"
});
}
else
{
// Default layout: the message id stays the table's primary key.
id.AsPrimaryKey();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,21 @@ protected override SqlServerSubscriberConfiguration createSubscriberExpression(S
return new SqlServerSubscriberConfiguration(subscriberEndpoint);
}

/// <summary>
/// Switches the Sql Server queue and scheduled message tables to the higher-throughput storage
/// layout: clustered on a monotonic identity column (FIFO dequeue and contiguous deletes), with
/// the message id kept unique through a non-clustered index instead of a clustered primary key
/// on a random Guid. Dequeue throughput and tail latency improve markedly once queues have
/// non-trivial depth. The trade-off is a one-time table rebuild the first time the option is
/// enabled against an existing database, so existing applications should enable it during a
/// maintenance window (ideally with queues drained); new applications should simply turn it on.
/// </summary>
/// <returns>This expression, so further configuration calls can be chained.</returns>
public SqlServerPersistenceExpression OptimizeQueueThroughput()
{
    // Only records the opt-in on the transport; nothing else happens in this call.
    Transport.OptimizeQueueThroughput = true;

    return this;
}

/// <summary>
/// Disable inbox and outbox usage on all Sql Server Transport endpoints
/// </summary>
Expand Down
Loading
Loading