Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions docs/guide/durability/sagas.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,62 @@ And lastly, Wolverine looks for a public member named `Id` like this one:
public record CompleteOrder(string Id);
```

## Strong-Typed Identifiers <Badge type="tip" text="5.x" />

Wolverine supports strong-typed identifiers (record structs or classes wrapping a primitive) as the saga identity.
The type must expose a `TryParse(string?, out T)` static method so Wolverine can recover the identity from the
envelope header when a message does not carry the ID directly on its body.

```csharp
// Strong-typed ID wrapping Guid
public record struct OrderSagaId(Guid Value)
{
public static OrderSagaId New() => new(Guid.NewGuid());

public static bool TryParse(string? input, out OrderSagaId result)
{
if (Guid.TryParse(input, out var guid))
{
result = new OrderSagaId(guid);
return true;
}
result = default;
return false;
}

public override string ToString() => Value.ToString();
}

public class OrderSaga : Saga
{
public OrderSagaId Id { get; set; }

public static OrderSaga Start(StartOrder cmd)
=> new() { Id = cmd.OrderId };

// Messages that carry the ID on the body work automatically
public void Handle(ShipOrder cmd) { /* ... */ }

// Messages without the ID field read it from the envelope header
public void Handle(OrderTimeout timeout) { /* ... */ }
}

public record StartOrder(OrderSagaId OrderId);
public record ShipOrder(OrderSagaId OrderSagaId);
public record OrderTimeout; // no saga ID field — read from envelope
```

::: tip
When the message type does not expose the saga ID as a field, Wolverine propagates the identity automatically through
the `SagaId` envelope header. The cascaded messages emitted from within a saga handler will have this header set
for you. In your own integration tests you can supply it via `envelope.SagaId = id.ToString()`.
:::

::: warning
Strong-typed identifiers backed by a third-party source-generator (e.g. [StronglyTypedId](https://github.com/andrewlock/StronglyTypedId))
are supported. The generated `TryParse` method on those types satisfies the requirement above.
:::

## Starting a Saga

::: tip
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using System.Collections.Concurrent;
using Shouldly;

namespace EfCoreTests;

public class parallel_tenant_initialization_tests
{
[Fact]
public async Task all_tenants_are_initialized()
{
var initialized = new ConcurrentBag<string>();
var tenantIds = Enumerable.Range(1, 20).Select(i => $"tenant{i}").ToList();

await Parallel.ForEachAsync(tenantIds, new ParallelOptions { MaxDegreeOfParallelism = 10 },
async (tenantId, ct) =>
{
await Task.Delay(5, ct);
initialized.Add(tenantId);
});

initialized.Count.ShouldBe(tenantIds.Count);
foreach (var tenantId in tenantIds)
{
initialized.ShouldContain(tenantId);
}
}

[Fact]
public async Task concurrent_initialization_is_faster_than_sequential()
{
const int tenantCount = 10;
const int perTenantDelayMs = 50;

var tenantIds = Enumerable.Range(1, tenantCount).Select(i => $"tenant{i}").ToList();

var sw = System.Diagnostics.Stopwatch.StartNew();
await Parallel.ForEachAsync(tenantIds, new ParallelOptions { MaxDegreeOfParallelism = 10 },
async (tenantId, ct) => await Task.Delay(perTenantDelayMs, ct));
sw.Stop();

// Sequential would take tenantCount * perTenantDelayMs = 500ms minimum.
// Parallel should complete much faster. Allow up to half the sequential time.
sw.ElapsedMilliseconds.ShouldBeLessThan(tenantCount * perTenantDelayMs / 2);
}

[Fact]
public async Task single_tenant_failure_surfaces_as_exception()
{
var tenantIds = Enumerable.Range(1, 5).Select(i => $"tenant{i}").ToList();

await Should.ThrowAsync<Exception>(async () =>
{
await Parallel.ForEachAsync(tenantIds, new ParallelOptions { MaxDegreeOfParallelism = 10 },
async (tenantId, ct) =>
{
await Task.Delay(1, ct);
if (tenantId == "tenant3")
throw new InvalidOperationException($"Database initialization failed for {tenantId}");
});
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ protected override IEnumerable<MySqlQueue> endpoints()

public override string SanitizeIdentifier(string identifier)
{
return identifier.Replace('-', '_').ToLower();
return identifier.Replace('-', '_').ToLowerInvariant();
}

protected override MySqlQueue findEndpointByUri(Uri uri)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Globalization;
using IntegrationTests;
using JasperFx.Core;
using Shouldly;
Expand All @@ -20,4 +21,25 @@ public void retrieve_queue_by_uri()
var queue = theTransport.GetOrCreateEndpoint("sqlserver://one".ToUri());
queue.ShouldBeOfType<SqlServerQueue>().Name.ShouldBe("one");
}

// Regression test for https://github.com/JasperFx/wolverine/issues/2472
// Turkish culture maps 'I'.ToLower() to dotless 'ı' instead of 'i',
// which corrupts SQL identifiers when SanitizeIdentifier uses ToLower().
[Fact]
public void sanitize_identifier_is_culture_invariant()
{
var originalCulture = CultureInfo.CurrentCulture;
try
{
CultureInfo.CurrentCulture = new CultureInfo("tr-TR");

// "INCOMING" contains 'I' — Turkish ToLower() would produce "ıncomıng"
theTransport.SanitizeIdentifier("INCOMING").ShouldBe("incoming");
theTransport.SanitizeIdentifier("My-Queue-Name").ShouldBe("my_queue_name");
}
finally
{
CultureInfo.CurrentCulture = originalCulture;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,16 @@ public async Task DeleteAllTenantDatabasesAsync()
public async Task EnsureAllTenantDatabasesCreatedAsync()
{
await _store.Source.RefreshAsync();
foreach (var assignment in _store.Source.AllActiveByTenant())
{
var dbContext = await BuildAsync(assignment.TenantId, CancellationToken.None);
await _serviceProvider.EnsureDatabaseExistsAsync(dbContext);
await using var migration = await _serviceProvider.CreateMigrationAsync(dbContext, CancellationToken.None);
await migration.ExecuteAsync(AutoCreate.CreateOrUpdate, CancellationToken.None);
}
var assignments = _store.Source.AllActiveByTenant().ToList();

await Parallel.ForEachAsync(assignments, new ParallelOptions { MaxDegreeOfParallelism = 10 },
async (assignment, ct) =>
{
var dbContext = await BuildAsync(assignment.TenantId, ct);
await _serviceProvider.EnsureDatabaseExistsAsync(dbContext, ct);
await using var migration = await _serviceProvider.CreateMigrationAsync(dbContext, ct);
await migration.ExecuteAsync(AutoCreate.CreateOrUpdate, ct);
});
}

public async Task ApplyAllChangesToDatabasesAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,16 @@ public async Task DeleteAllTenantDatabasesAsync()
public async Task EnsureAllTenantDatabasesCreatedAsync()
{
await _store.Source.RefreshAsync();
foreach (var assignment in _store.Source.AllActiveByTenant())
{
var dbContext = await BuildAsync(assignment.TenantId, CancellationToken.None);
await _serviceProvider.EnsureDatabaseExistsAsync(dbContext);
await using var migration = await _serviceProvider.CreateMigrationAsync(dbContext, CancellationToken.None);
await migration.ExecuteAsync(AutoCreate.CreateOrUpdate, CancellationToken.None);
}
var assignments = _store.Source.AllActiveByTenant().ToList();

await Parallel.ForEachAsync(assignments, new ParallelOptions { MaxDegreeOfParallelism = 10 },
async (assignment, ct) =>
{
var dbContext = await BuildAsync(assignment.TenantId, ct);
await _serviceProvider.EnsureDatabaseExistsAsync(dbContext, ct);
await using var migration = await _serviceProvider.CreateMigrationAsync(dbContext, ct);
await migration.ExecuteAsync(AutoCreate.CreateOrUpdate, ct);
});
}

private async Task<DbDataSource> findDataSource(string? tenantId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ protected override IEnumerable<PostgresqlQueue> endpoints()

public override string SanitizeIdentifier(string identifier)
{
return identifier.Replace('-', '_').ToLower();
return identifier.Replace('-', '_').ToLowerInvariant();
}

protected override PostgresqlQueue findEndpointByUri(Uri uri)
Expand Down
4 changes: 2 additions & 2 deletions src/Persistence/Wolverine.RDBMS/Sagas/SagaTableDefinition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ private static string defaultTableName(Type documentType)
nameToAlias = _aliasSanitizer.Replace(documentType.GetPrettyName(), string.Empty).Replace(",", "_");
}

var parts = new List<string> { nameToAlias.ToLower() };
var parts = new List<string> { nameToAlias.ToLowerInvariant() };
if (documentType.IsNested)
{
parts.Insert(0, documentType.DeclaringType!.Name.ToLower());
parts.Insert(0, documentType.DeclaringType!.Name.ToLowerInvariant());
}

return string.Join("_", parts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ protected override IEnumerable<Endpoint> explicitEndpoints()

public override string SanitizeIdentifier(string identifier)
{
return identifier.Replace('-', '_').ToLower();
return identifier.Replace('-', '_').ToLowerInvariant();
}

protected override SqlServerQueue findEndpointByUri(Uri uri)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ protected override IEnumerable<SqliteQueue> endpoints()

public override string SanitizeIdentifier(string identifier)
{
return identifier.Replace('-', '_').ToLower();
return identifier.Replace('-', '_').ToLowerInvariant();
}

protected override SqliteQueue findEndpointByUri(Uri uri)
Expand Down
116 changes: 116 additions & 0 deletions src/Testing/CoreTests/Bugs/Bug_2465_strong_typed_id_saga_codegen.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.Persistence.Sagas;
using Wolverine.Tracking;
using Xunit;

namespace CoreTests.Bugs;

// Reproduces https://github.com/JasperFx/wolverine/issues/2465
// Using a saga with a strong-typed ID caused CS0103/CS0246 code gen errors
// because PullSagaIdFromEnvelopeFrame used NameInCode() (bare type name) instead
// of FullNameInCode() (fully qualified name) when emitting TryParse calls for
// message types that carry no saga ID field (envelope path).

public class Bug_2465_strong_typed_id_saga_codegen : IAsyncLifetime
{
private IHost _host = null!;

public async Task InitializeAsync()
{
_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Discovery.DisableConventionalDiscovery()
.IncludeType(typeof(StrongIdSaga));
}).StartAsync();
}

public async Task DisposeAsync()
{
await _host.StopAsync();
_host.Dispose();
}

[Fact]
public void host_starts_successfully_with_strong_typed_saga_id()
{
// The host starting without exception verifies code generation succeeded.
// StrongIdSagaWork has no saga ID field so PullSagaIdFromEnvelopeFrame
// is used. Before the fix, it emitted `StrongSagaId` (bare) instead of
// `CoreTests.Bugs.StrongSagaId` (fully qualified), causing CS0246.
_host.ShouldNotBeNull();
}

[Fact]
public async Task can_start_saga_with_strong_typed_id()
{
var id = StrongSagaId.New();
await _host.InvokeMessageAndWaitAsync(new StartStrongIdSaga(id));

var persistor = _host.Services.GetRequiredService<InMemorySagaPersistor>();
var saga = persistor.Load<StrongIdSaga>(id);
saga.ShouldNotBeNull();
saga!.Id.ShouldBe(id);
}

[Fact]
public async Task handle_message_via_envelope_saga_id_path()
{
// StartStrongIdSaga.Start() cascades a StrongIdSagaWork message.
// That message has no saga ID field so Wolverine uses PullSagaIdFromEnvelopeFrame
// to parse the saga ID from envelope.SagaId. This is the code path that was broken.
var id = StrongSagaId.New();
await _host.SendMessageAndWaitAsync(new StartStrongIdSaga(id));

var persistor = _host.Services.GetRequiredService<InMemorySagaPersistor>();
var saga = persistor.Load<StrongIdSaga>(id);
saga.ShouldNotBeNull();
saga!.WorkDone.ShouldBeTrue();
}
}

// Strong-typed ID: a record struct wrapping Guid with TryParse, placed in this
// namespace so FullNameInCode() is required to reference it in generated code.
public record struct StrongSagaId(Guid Value)
{
public static StrongSagaId New() => new(Guid.NewGuid());

public static bool TryParse(string? input, out StrongSagaId result)
{
if (Guid.TryParse(input, out var guid))
{
result = new StrongSagaId(guid);
return true;
}

result = default;
return false;
}

public override string ToString() => Value.ToString();
}

public record StartStrongIdSaga(StrongSagaId Id);

// No saga ID field — forces Wolverine to use PullSagaIdFromEnvelopeFrame
public record StrongIdSagaWork;

public class StrongIdSaga : Saga
{
public StrongSagaId Id { get; set; }
public bool WorkDone { get; set; }

// Returns a cascaded StrongIdSagaWork; Wolverine will propagate SagaId on that envelope
public static (StrongIdSaga, StrongIdSagaWork) Start(StartStrongIdSaga cmd)
{
return (new StrongIdSaga { Id = cmd.Id }, new StrongIdSagaWork());
}

// StrongIdSagaWork has no saga ID → uses PullSagaIdFromEnvelopeFrame
public void Handle(StrongIdSagaWork work)
{
WorkDone = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public override void GenerateCode(GeneratedMethod method, ISourceWriter writer)
{
var typeNameInCode = SagaId.VariableType == typeof(Guid)
? typeof(Guid).FullName
: SagaId.VariableType.NameInCode();
: SagaId.VariableType.FullNameInCode();

writer.Write(
$"if (!{typeNameInCode}.TryParse({_envelope!.Usage}.{nameof(Envelope.SagaId)}, out {typeNameInCode} sagaId)) throw new {typeof(IndeterminateSagaStateIdException).FullName}({_envelope.Usage});");
Expand Down
Loading