diff --git a/Directory.Packages.props b/Directory.Packages.props index 2a1d9cb1e..10570c744 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -35,7 +35,7 @@ - + diff --git a/docker-compose.yml b/docker-compose.yml index 9dd9c35d8..b4bc3abe6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -49,13 +49,23 @@ services: - "5672:5672" - "15672:15672" + # SQL Server 2025 is required by Polecat 2.0+ (UseNativeJsonType defaults to true, + # which uses the native `json` column type introduced in SQL Server 2025). + # + # On Apple Silicon Macs, SQL Server 2025 under Rosetta may crash due to memory + # constraints. If you're working locally on ARM and don't need Polecat tests, + # you can override this service to use `mcr.microsoft.com/azure-sql-edge:latest` + # via a docker-compose.override.yml — but Polecat integration tests will fail + # against Azure SQL Edge unless you also set `m.UseNativeJsonType = false` in + # the test fixture. sqlserver: - image: "mcr.microsoft.com/azure-sql-edge:latest" + image: "mcr.microsoft.com/mssql/server:2025-latest" ports: - "1434:1433" environment: - "ACCEPT_EULA=Y" - "SA_PASSWORD=P@55w0rd" + - "MSSQL_SA_PASSWORD=P@55w0rd" - "MSSQL_PID=Developer" pulsar: diff --git a/docs/guide/durability/polecat/operations.md b/docs/guide/durability/polecat/operations.md index 3e2d88830..3fe143f80 100644 --- a/docs/guide/durability/polecat/operations.md +++ b/docs/guide/durability/polecat/operations.md @@ -34,8 +34,74 @@ public static IPolecatOp Pay([Entity] Invoice invoice) } ``` -There are existing Polecat ops for storing, inserting, updating, and deleting a document. There's also a specific -helper for starting a new event stream as shown below: +There are existing Polecat ops for storing, inserting, updating, and deleting a document. + +### Storing Multiple Documents + +Use `PolecatOps.StoreMany()` to store multiple documents of the same type, or `PolecatOps.StoreObjects()` to +store multiple documents of different types in a single side effect: + +```csharp +// Store multiple documents of the same type +public static StoreManyDocs Handle(BatchInvoiceCommand command) +{ + var invoices = command.Items.Select(i => new Invoice { Id = i.Id, Amount = i.Amount }); + return PolecatOps.StoreMany(invoices.ToArray()); +} + +// Store multiple documents of different types +public static StoreObjects Handle(CreateOrderCommand command) +{ + var order = new Order { Id = command.OrderId, Total = command.Total }; + var audit = new AuditLog { Action = "OrderCreated", EntityId = command.OrderId }; + return PolecatOps.StoreObjects(order, audit); +} +``` + +Both `StoreMany()` and `StoreObjects()` support fluent `With()` methods to incrementally add documents: + +```csharp +public static StoreObjects Handle(ComplexCommand command) +{ + return PolecatOps.StoreObjects(new Order { Id = command.OrderId }) + .With(new AuditLog { Action = "Created" }) + .With(new Notification { Message = "Order created" }); +} +``` + +### Tenant-Scoped Operations + +Every `PolecatOps` factory method has an overload that accepts a `tenantId` parameter. When provided, the +operation uses `IDocumentSession.ForTenant(tenantId)` to scope the write to a specific tenant. This is +useful in multi-tenant systems where a handler processing a message for one tenant needs to write data +to a different tenant's storage: + +```csharp +// Store a document in a specific tenant +public static StoreDoc Handle(CreateInvoiceForTenant command) +{ + var invoice = new Invoice { Id = command.InvoiceId, Amount = command.Amount }; + return PolecatOps.Store(invoice, command.TenantId); +} + +// Store many same-type documents in a specific tenant +public static StoreManyDocs Handle(BatchLineItems command) +{ + return PolecatOps.StoreMany(command.TenantId, command.Items.ToArray()); +} + +// Store mixed-type documents in a specific tenant +public static StoreObjects Handle(CrossTenantAudit command) +{ + return PolecatOps.StoreObjects(command.TargetTenantId, + new AuditRecord { Action = command.Action }, + new Notification { Message = command.Message }); +} +``` + +All existing method signatures are unchanged — the tenant overloads are purely additive. + +There's also a specific helper for starting a new event stream as shown below: ```cs public static class TodoListEndpoint diff --git a/src/Persistence/PolecatTests/AggregateHandlerWorkflow/aggregate_handler_workflow.cs b/src/Persistence/PolecatTests/AggregateHandlerWorkflow/aggregate_handler_workflow.cs index 5e1035cb4..0fbb96f46 100644 --- a/src/Persistence/PolecatTests/AggregateHandlerWorkflow/aggregate_handler_workflow.cs +++ b/src/Persistence/PolecatTests/AggregateHandlerWorkflow/aggregate_handler_workflow.cs @@ -98,6 +98,27 @@ public void automatically_adding_stream_id_to_the_audit_members() chain.SourceCode.ShouldContain("System.Diagnostics.Activity.Current?.SetTag(\"letter.aggregate.id\", raiseABC.LetterAggregateId);"); } + [Fact] + public void generates_wolverine_stream_id_otel_tag() + { + // Resolving the handler triggers chain compilation; without this the + // chain's generated SourceCode is null and the assertion below NREs. + // Mirrors the equivalent Marten test (MartenTests/AggregateHandlerWorkflow). + var handler = theHost.GetRuntime().Handlers.HandlerFor(); + var chain = theHost.GetRuntime().Handlers.ChainFor(); + + chain!.SourceCode!.ShouldContain($"SetTag(\"{Wolverine.Runtime.WolverineTracing.StreamId}\""); + } + + [Fact] + public void generates_wolverine_stream_type_otel_tag() + { + var handler = theHost.GetRuntime().Handlers.HandlerFor(); + var chain = theHost.GetRuntime().Handlers.ChainFor(); + + chain!.SourceCode!.ShouldContain($"SetTag(\"{Wolverine.Runtime.WolverineTracing.StreamType}\", \"{typeof(LetterAggregate).FullName}\""); + } + [Fact] public async Task events_then_response_invoke_with_return() { diff --git a/src/Persistence/PolecatTests/PolecatOps_store.cs b/src/Persistence/PolecatTests/PolecatOps_store.cs new file mode 100644 index 000000000..beed57e4a --- /dev/null +++ b/src/Persistence/PolecatTests/PolecatOps_store.cs @@ -0,0 +1,79 @@ +using Shouldly; +using Wolverine.Polecat; + +namespace PolecatTests; + +public record StoreTestDoc1(string Name); +public record StoreTestDoc2(string Label); + +public class PolecatOps_store +{ + [Fact] + public void StoreMany() + { + var op = PolecatOps.StoreMany(new StoreTestDoc1("Test1")); + + op.Documents.Count.ShouldBe(1); + op.Documents[0].ShouldBeOfType(); + + op.With(new StoreTestDoc1("Test2")); + + op.Documents.Count.ShouldBe(2); + + op.With([new StoreTestDoc1("Test3"), new StoreTestDoc1("Test4")]); + + op.Documents.Count.ShouldBe(4); + + op = PolecatOps.StoreMany(new StoreTestDoc1("Test5"), new StoreTestDoc1("Test6")); + + op.Documents.Count.ShouldBe(2); + } + + [Fact] + public void StoreObjects() + { + var op = PolecatOps.StoreObjects(new StoreTestDoc1("Test1")); + + op.Documents.Count.ShouldBe(1); + op.Documents[0].ShouldBeOfType(); + + op.With(new StoreTestDoc2("Test2")); + + op.Documents.Count.ShouldBe(2); + op.Documents[1].ShouldBeOfType(); + + op.With([new StoreTestDoc1("Test3"), new StoreTestDoc2("Test4")]); + + op.Documents.Count.ShouldBe(4); + op.Documents[2].ShouldBeOfType(); + op.Documents[3].ShouldBeOfType(); + + op = PolecatOps.StoreObjects(new StoreTestDoc1("Test5"), new StoreTestDoc2("Test6")); + + op.Documents.Count.ShouldBe(2); + op.Documents[0].ShouldBeOfType(); + op.Documents[1].ShouldBeOfType(); + + op = PolecatOps.StoreObjects([new StoreTestDoc1("Test7"), new StoreTestDoc2("Test8")]); + + op.Documents.Count.ShouldBe(2); + op.Documents[0].ShouldBeOfType(); + op.Documents[1].ShouldBeOfType(); + } + + [Fact] + public void StoreObjects_with_tenantId() + { + var op = PolecatOps.StoreObjects("tenant-1", new StoreTestDoc1("Test1"), new StoreTestDoc2("Test2")); + + op.Documents.Count.ShouldBe(2); + op.TenantId.ShouldBe("tenant-1"); + } + + [Fact] + public void IDocumentsOp_exposes_documents_readonly() + { + IDocumentsOp op = PolecatOps.StoreObjects(new StoreTestDoc1("Test1"), new StoreTestDoc2("Test2")); + op.Documents.Count.ShouldBe(2); + } +} diff --git a/src/Persistence/Wolverine.Polecat/AggregateHandling.cs b/src/Persistence/Wolverine.Polecat/AggregateHandling.cs index 5d7f7d035..7ed024834 100644 --- a/src/Persistence/Wolverine.Polecat/AggregateHandling.cs +++ b/src/Persistence/Wolverine.Polecat/AggregateHandling.cs @@ -41,6 +41,7 @@ public Variable Apply(IChain chain, IServiceContainer container) var loader = new LoadAggregateFrame(this); chain.Middleware.Add(loader); + chain.Middleware.Add(new TagAggregateOtelFrame(AggregateType, AggregateId)); var firstCall = chain.HandlerCalls().First(); diff --git a/src/Persistence/Wolverine.Polecat/Codegen/TagAggregateOtelFrame.cs b/src/Persistence/Wolverine.Polecat/Codegen/TagAggregateOtelFrame.cs new file mode 100644 index 000000000..5c0f8027c --- /dev/null +++ b/src/Persistence/Wolverine.Polecat/Codegen/TagAggregateOtelFrame.cs @@ -0,0 +1,33 @@ +using System.Diagnostics; +using JasperFx.CodeGeneration; +using JasperFx.CodeGeneration.Frames; +using JasperFx.CodeGeneration.Model; +using JasperFx.Core.Reflection; +using Wolverine.Runtime; + +namespace Wolverine.Polecat.Codegen; + +/// +/// Frame that generates code to tag the current OpenTelemetry activity with +/// the aggregate stream ID and aggregate type when processing a Polecat +/// aggregate handler workflow. +/// +internal class TagAggregateOtelFrame : SyncFrame +{ + private readonly Type _aggregateType; + private readonly Variable _aggregateId; + + public TagAggregateOtelFrame(Type aggregateType, Variable aggregateId) + { + _aggregateType = aggregateType; + _aggregateId = aggregateId; + uses.Add(aggregateId); + } + + public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) + { + writer.WriteLine($"{typeof(Activity).FullNameInCode()}.{nameof(Activity.Current)}?.{nameof(Activity.SetTag)}(\"{WolverineTracing.StreamId}\", {_aggregateId.Usage}.ToString());"); + writer.WriteLine($"{typeof(Activity).FullNameInCode()}.{nameof(Activity.Current)}?.{nameof(Activity.SetTag)}(\"{WolverineTracing.StreamType}\", \"{_aggregateType.FullName}\");"); + Next?.GenerateCode(method, writer); + } +} diff --git a/src/Persistence/Wolverine.Polecat/IPolecatOp.cs b/src/Persistence/Wolverine.Polecat/IPolecatOp.cs index 75d1093ad..708e53d62 100644 --- a/src/Persistence/Wolverine.Polecat/IPolecatOp.cs +++ b/src/Persistence/Wolverine.Polecat/IPolecatOp.cs @@ -83,6 +83,18 @@ public static StoreManyDocs StoreMany(params T[] documents) where T : notn return new StoreManyDocs(documents); } + /// + /// Return a side effect of storing an enumerable of potentially mixed document types in Polecat + /// + /// + /// + /// + public static StoreObjects StoreObjects(params object[] documents) + { + if (documents == null) throw new ArgumentNullException(nameof(documents)); + return new StoreObjects(documents); + } + public static InsertDoc Insert(T document) where T : notnull { if (document == null) throw new ArgumentNullException(nameof(document)); @@ -174,6 +186,16 @@ public static StoreManyDocs StoreMany(string tenantId, params T[] document return new StoreManyDocs(tenantId, documents); } + /// + /// Return a side effect of storing an enumerable of potentially mixed document types, scoped to a specific tenant + /// + public static StoreObjects StoreObjects(string tenantId, params object[] documents) + { + if (tenantId == null) throw new ArgumentNullException(nameof(tenantId)); + if (documents == null) throw new ArgumentNullException(nameof(documents)); + return new StoreObjects(tenantId, documents); + } + /// /// Return a side effect of inserting the specified document, scoped to a specific tenant /// @@ -385,9 +407,67 @@ public class StoreManyDocs : DocumentsOp where T : notnull public StoreManyDocs(params T[] documents) : base(documents.Cast().ToArray()) { } public StoreManyDocs(IList documents) : this(documents.ToArray()) { } public StoreManyDocs(string tenantId, params T[] documents) : base(tenantId, documents.Cast().ToArray()) { } + + public StoreManyDocs With(T[] documents) + { + Documents.AddRange(documents.Cast()); + return this; + } + + public StoreManyDocs With(T document) + { + Documents.Add(document); + return this; + } + public override void Execute(IDocumentSession session) { ResolveSession(session).Store(Documents.Cast()); } } +public class StoreObjects : DocumentsOp +{ + private static readonly System.Collections.Concurrent.ConcurrentDictionary _storeMethods = new(); + + public StoreObjects(params object[] documents) : base(documents) { } + + public StoreObjects(IList documents) : this(documents.ToArray()) { } + + public StoreObjects(string tenantId, params object[] documents) : base(tenantId, documents) { } + + public StoreObjects With(object[] documents) + { + Documents.AddRange(documents); + return this; + } + + public StoreObjects With(object document) + { + Documents.Add(document); + return this; + } + + public override void Execute(IDocumentSession session) + { + // Polecat does not have a single StoreObjects(IEnumerable) method like Marten, + // so we dispatch each document to Store by its runtime type. + var target = ResolveSession(session); + foreach (var document in Documents) + { + if (document is null) continue; + var docType = document.GetType(); + var method = _storeMethods.GetOrAdd(docType, t => + { + var open = typeof(IDocumentOperations).GetMethods() + .First(m => m.Name == nameof(IDocumentOperations.Store) + && m.IsGenericMethodDefinition + && m.GetParameters().Length == 1 + && !m.GetParameters()[0].ParameterType.IsArray); + return open.MakeGenericMethod(t); + }); + method.Invoke(target, new[] { document }); + } + } +} + public class InsertDoc : DocumentOp where T : notnull { private readonly T _document; @@ -480,7 +560,12 @@ protected IDocumentOperations ResolveSession(IDocumentSession session) public abstract void Execute(IDocumentSession session); } -public abstract class DocumentsOp : IPolecatOp +public interface IDocumentsOp : IPolecatOp +{ + IReadOnlyList Documents { get; } +} + +public abstract class DocumentsOp : IDocumentsOp { public List Documents { get; } = new(); @@ -501,4 +586,6 @@ protected IDocumentOperations ResolveSession(IDocumentSession session) } public abstract void Execute(IDocumentSession session); + + IReadOnlyList IDocumentsOp.Documents => Documents; } diff --git a/src/Persistence/Wolverine.Polecat/WriteAggregateAttribute.cs b/src/Persistence/Wolverine.Polecat/WriteAggregateAttribute.cs index 921ad261d..017707a90 100644 --- a/src/Persistence/Wolverine.Polecat/WriteAggregateAttribute.cs +++ b/src/Persistence/Wolverine.Polecat/WriteAggregateAttribute.cs @@ -62,7 +62,16 @@ public override Variable Modify(IChain chain, ParameterInfo parameter, IServiceC var idProp = aggregateType.GetProperty("Id", BindingFlags.Public | BindingFlags.Instance); var idType = idProp?.PropertyType ?? typeof(Guid); - var identity = FindIdentity(aggregateType, idType, chain); + // If a specific ValueSource has been set (e.g. via FromMethod, FromRoute, FromHeader, FromClaim), + // use the base class identity resolution which respects that ValueSource + Variable? identity = null; + if (ValueSource != ValueSource.InputMember && ArgumentName.IsNotEmpty()) + { + tryFindIdentityVariable(chain, parameter, idType, out identity); + } + + // Fall back to WriteAggregate's standard identity resolution + identity ??= FindIdentity(aggregateType, idType, chain); var isNaturalKey = false; // If standard identity resolution failed, check for natural key support