From 4fba997c068d60561dfa48f51f3845783a024812 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Sun, 26 Apr 2026 21:05:28 -0500 Subject: [PATCH 1/3] =?UTF-8?q?Port=20recent=20Wolverine.Marten=20changes?= =?UTF-8?q?=20to=20Wolverine.Polecat=20+=20bump=20Polecat=201.6.1=20?= =?UTF-8?q?=E2=86=92=202.1.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Brings Wolverine.Polecat to parity with several improvements that landed in Wolverine.Marten over the past few weeks. All ports preserve existing public surface — additions only. ## Changes **Polecat NuGet 1.6.1 → 2.1.0** (Directory.Packages.props) **Wolverine.Polecat** - WriteAggregateAttribute.Modify() now respects an explicitly set ValueSource (e.g. via FromMethod/FromRoute/FromHeader/FromClaim) when resolving the aggregate id, falling back to the existing FindIdentity() logic. Mirrors Wolverine.Marten#2451 fix. - PolecatOps.StoreObjects(...) for storing mixed document types as a single side effect. Polecat's IDocumentOperations doesn't expose StoreObjects, so the operation dispatches each document by runtime type to the generic Store(document) method via cached MethodInfo. - New IDocumentsOp interface mirroring Marten's IDocumentsOp; DocumentsOp now implements it. - Fluent .With(document) / .With(documents[]) on StoreManyDocs and StoreObjects for incremental document accumulation. - Tenant-scoped StoreObjects(string tenantId, params object[]) overload. - New Codegen/TagAggregateOtelFrame injected by AggregateHandling.Apply() to emit wolverine.stream.id and wolverine.stream.type Activity tags for every Polecat aggregate handler workflow. Mirrors Wolverine.Marten#2470 with the value-type aggregate id codegen fix from 5.30.0. **PolecatTests** - New PolecatOps_store.cs covering StoreMany/.With/StoreObjects/.With + tenant-scoped overload + IDocumentsOp surface. - aggregate_handler_workflow gains generates_wolverine_stream_id_otel_tag and generates_wolverine_stream_type_otel_tag codegen assertions. **Docs** - docs/guide/durability/polecat/operations.md gains "Storing Multiple Documents" and "Tenant-Scoped Operations" sections mirroring the Marten operations docs. Co-Authored-By: Claude Opus 4.7 (1M context) --- Directory.Packages.props | 2 +- docs/guide/durability/polecat/operations.md | 70 ++++++++++++++- .../aggregate_handler_workflow.cs | 16 ++++ .../PolecatTests/PolecatOps_store.cs | 79 ++++++++++++++++ .../Wolverine.Polecat/AggregateHandling.cs | 1 + .../Codegen/TagAggregateOtelFrame.cs | 33 +++++++ .../Wolverine.Polecat/IPolecatOp.cs | 89 ++++++++++++++++++- .../WriteAggregateAttribute.cs | 11 ++- 8 files changed, 296 insertions(+), 5 deletions(-) create mode 100644 src/Persistence/PolecatTests/PolecatOps_store.cs create mode 100644 src/Persistence/Wolverine.Polecat/Codegen/TagAggregateOtelFrame.cs diff --git a/Directory.Packages.props b/Directory.Packages.props index 2a1d9cb1e..10570c744 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -35,7 +35,7 @@ - + diff --git a/docs/guide/durability/polecat/operations.md b/docs/guide/durability/polecat/operations.md index 3e2d88830..3fe143f80 100644 --- a/docs/guide/durability/polecat/operations.md +++ b/docs/guide/durability/polecat/operations.md @@ -34,8 +34,74 @@ public static IPolecatOp Pay([Entity] Invoice invoice) } ``` -There are existing Polecat ops for storing, inserting, updating, and deleting a document. There's also a specific -helper for starting a new event stream as shown below: +There are existing Polecat ops for storing, inserting, updating, and deleting a document. + +### Storing Multiple Documents + +Use `PolecatOps.StoreMany()` to store multiple documents of the same type, or `PolecatOps.StoreObjects()` to +store multiple documents of different types in a single side effect: + +```csharp +// Store multiple documents of the same type +public static StoreManyDocs Handle(BatchInvoiceCommand command) +{ + var invoices = command.Items.Select(i => new Invoice { Id = i.Id, Amount = i.Amount }); + return PolecatOps.StoreMany(invoices.ToArray()); +} + +// Store multiple documents of different types +public static StoreObjects Handle(CreateOrderCommand command) +{ + var order = new Order { Id = command.OrderId, Total = command.Total }; + var audit = new AuditLog { Action = "OrderCreated", EntityId = command.OrderId }; + return PolecatOps.StoreObjects(order, audit); +} +``` + +Both `StoreMany()` and `StoreObjects()` support fluent `With()` methods to incrementally add documents: + +```csharp +public static StoreObjects Handle(ComplexCommand command) +{ + return PolecatOps.StoreObjects(new Order { Id = command.OrderId }) + .With(new AuditLog { Action = "Created" }) + .With(new Notification { Message = "Order created" }); +} +``` + +### Tenant-Scoped Operations + +Every `PolecatOps` factory method has an overload that accepts a `tenantId` parameter. When provided, the +operation uses `IDocumentSession.ForTenant(tenantId)` to scope the write to a specific tenant. This is +useful in multi-tenant systems where a handler processing a message for one tenant needs to write data +to a different tenant's storage: + +```csharp +// Store a document in a specific tenant +public static StoreDoc Handle(CreateInvoiceForTenant command) +{ + var invoice = new Invoice { Id = command.InvoiceId, Amount = command.Amount }; + return PolecatOps.Store(invoice, command.TenantId); +} + +// Store many same-type documents in a specific tenant +public static StoreManyDocs Handle(BatchLineItems command) +{ + return PolecatOps.StoreMany(command.TenantId, command.Items.ToArray()); +} + +// Store mixed-type documents in a specific tenant +public static StoreObjects Handle(CrossTenantAudit command) +{ + return PolecatOps.StoreObjects(command.TargetTenantId, + new AuditRecord { Action = command.Action }, + new Notification { Message = command.Message }); +} +``` + +All existing method signatures are unchanged — the tenant overloads are purely additive. + +There's also a specific helper for starting a new event stream as shown below: ```cs public static class TodoListEndpoint diff --git a/src/Persistence/PolecatTests/AggregateHandlerWorkflow/aggregate_handler_workflow.cs b/src/Persistence/PolecatTests/AggregateHandlerWorkflow/aggregate_handler_workflow.cs index 5e1035cb4..8334f45a4 100644 --- a/src/Persistence/PolecatTests/AggregateHandlerWorkflow/aggregate_handler_workflow.cs +++ b/src/Persistence/PolecatTests/AggregateHandlerWorkflow/aggregate_handler_workflow.cs @@ -98,6 +98,22 @@ public void automatically_adding_stream_id_to_the_audit_members() chain.SourceCode.ShouldContain("System.Diagnostics.Activity.Current?.SetTag(\"letter.aggregate.id\", raiseABC.LetterAggregateId);"); } + [Fact] + public void generates_wolverine_stream_id_otel_tag() + { + var chain = theHost.GetRuntime().Handlers.ChainFor(); + + chain!.SourceCode!.ShouldContain($"SetTag(\"{Wolverine.Runtime.WolverineTracing.StreamId}\""); + } + + [Fact] + public void generates_wolverine_stream_type_otel_tag() + { + var chain = theHost.GetRuntime().Handlers.ChainFor(); + + chain!.SourceCode!.ShouldContain($"SetTag(\"{Wolverine.Runtime.WolverineTracing.StreamType}\", \"{typeof(LetterAggregate).FullName}\""); + } + [Fact] public async Task events_then_response_invoke_with_return() { diff --git a/src/Persistence/PolecatTests/PolecatOps_store.cs b/src/Persistence/PolecatTests/PolecatOps_store.cs new file mode 100644 index 000000000..beed57e4a --- /dev/null +++ b/src/Persistence/PolecatTests/PolecatOps_store.cs @@ -0,0 +1,79 @@ +using Shouldly; +using Wolverine.Polecat; + +namespace PolecatTests; + +public record StoreTestDoc1(string Name); +public record StoreTestDoc2(string Label); + +public class PolecatOps_store +{ + [Fact] + public void StoreMany() + { + var op = PolecatOps.StoreMany(new StoreTestDoc1("Test1")); + + op.Documents.Count.ShouldBe(1); + op.Documents[0].ShouldBeOfType(); + + op.With(new StoreTestDoc1("Test2")); + + op.Documents.Count.ShouldBe(2); + + op.With([new StoreTestDoc1("Test3"), new StoreTestDoc1("Test4")]); + + op.Documents.Count.ShouldBe(4); + + op = PolecatOps.StoreMany(new StoreTestDoc1("Test5"), new StoreTestDoc1("Test6")); + + op.Documents.Count.ShouldBe(2); + } + + [Fact] + public void StoreObjects() + { + var op = PolecatOps.StoreObjects(new StoreTestDoc1("Test1")); + + op.Documents.Count.ShouldBe(1); + op.Documents[0].ShouldBeOfType(); + + op.With(new StoreTestDoc2("Test2")); + + op.Documents.Count.ShouldBe(2); + op.Documents[1].ShouldBeOfType(); + + op.With([new StoreTestDoc1("Test3"), new StoreTestDoc2("Test4")]); + + op.Documents.Count.ShouldBe(4); + op.Documents[2].ShouldBeOfType(); + op.Documents[3].ShouldBeOfType(); + + op = PolecatOps.StoreObjects(new StoreTestDoc1("Test5"), new StoreTestDoc2("Test6")); + + op.Documents.Count.ShouldBe(2); + op.Documents[0].ShouldBeOfType(); + op.Documents[1].ShouldBeOfType(); + + op = PolecatOps.StoreObjects([new StoreTestDoc1("Test7"), new StoreTestDoc2("Test8")]); + + op.Documents.Count.ShouldBe(2); + op.Documents[0].ShouldBeOfType(); + op.Documents[1].ShouldBeOfType(); + } + + [Fact] + public void StoreObjects_with_tenantId() + { + var op = PolecatOps.StoreObjects("tenant-1", new StoreTestDoc1("Test1"), new StoreTestDoc2("Test2")); + + op.Documents.Count.ShouldBe(2); + op.TenantId.ShouldBe("tenant-1"); + } + + [Fact] + public void IDocumentsOp_exposes_documents_readonly() + { + IDocumentsOp op = PolecatOps.StoreObjects(new StoreTestDoc1("Test1"), new StoreTestDoc2("Test2")); + op.Documents.Count.ShouldBe(2); + } +} diff --git a/src/Persistence/Wolverine.Polecat/AggregateHandling.cs b/src/Persistence/Wolverine.Polecat/AggregateHandling.cs index 5d7f7d035..7ed024834 100644 --- a/src/Persistence/Wolverine.Polecat/AggregateHandling.cs +++ b/src/Persistence/Wolverine.Polecat/AggregateHandling.cs @@ -41,6 +41,7 @@ public Variable Apply(IChain chain, IServiceContainer container) var loader = new LoadAggregateFrame(this); chain.Middleware.Add(loader); + chain.Middleware.Add(new TagAggregateOtelFrame(AggregateType, AggregateId)); var firstCall = chain.HandlerCalls().First(); diff --git a/src/Persistence/Wolverine.Polecat/Codegen/TagAggregateOtelFrame.cs b/src/Persistence/Wolverine.Polecat/Codegen/TagAggregateOtelFrame.cs new file mode 100644 index 000000000..5c0f8027c --- /dev/null +++ b/src/Persistence/Wolverine.Polecat/Codegen/TagAggregateOtelFrame.cs @@ -0,0 +1,33 @@ +using System.Diagnostics; +using JasperFx.CodeGeneration; +using JasperFx.CodeGeneration.Frames; +using JasperFx.CodeGeneration.Model; +using JasperFx.Core.Reflection; +using Wolverine.Runtime; + +namespace Wolverine.Polecat.Codegen; + +/// +/// Frame that generates code to tag the current OpenTelemetry activity with +/// the aggregate stream ID and aggregate type when processing a Polecat +/// aggregate handler workflow. +/// +internal class TagAggregateOtelFrame : SyncFrame +{ + private readonly Type _aggregateType; + private readonly Variable _aggregateId; + + public TagAggregateOtelFrame(Type aggregateType, Variable aggregateId) + { + _aggregateType = aggregateType; + _aggregateId = aggregateId; + uses.Add(aggregateId); + } + + public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) + { + writer.WriteLine($"{typeof(Activity).FullNameInCode()}.{nameof(Activity.Current)}?.{nameof(Activity.SetTag)}(\"{WolverineTracing.StreamId}\", {_aggregateId.Usage}.ToString());"); + writer.WriteLine($"{typeof(Activity).FullNameInCode()}.{nameof(Activity.Current)}?.{nameof(Activity.SetTag)}(\"{WolverineTracing.StreamType}\", \"{_aggregateType.FullName}\");"); + Next?.GenerateCode(method, writer); + } +} diff --git a/src/Persistence/Wolverine.Polecat/IPolecatOp.cs b/src/Persistence/Wolverine.Polecat/IPolecatOp.cs index 75d1093ad..708e53d62 100644 --- a/src/Persistence/Wolverine.Polecat/IPolecatOp.cs +++ b/src/Persistence/Wolverine.Polecat/IPolecatOp.cs @@ -83,6 +83,18 @@ public static StoreManyDocs StoreMany(params T[] documents) where T : notn return new StoreManyDocs(documents); } + /// + /// Return a side effect of storing an enumerable of potentially mixed document types in Polecat + /// + /// + /// + /// + public static StoreObjects StoreObjects(params object[] documents) + { + if (documents == null) throw new ArgumentNullException(nameof(documents)); + return new StoreObjects(documents); + } + public static InsertDoc Insert(T document) where T : notnull { if (document == null) throw new ArgumentNullException(nameof(document)); @@ -174,6 +186,16 @@ public static StoreManyDocs StoreMany(string tenantId, params T[] document return new StoreManyDocs(tenantId, documents); } + /// + /// Return a side effect of storing an enumerable of potentially mixed document types, scoped to a specific tenant + /// + public static StoreObjects StoreObjects(string tenantId, params object[] documents) + { + if (tenantId == null) throw new ArgumentNullException(nameof(tenantId)); + if (documents == null) throw new ArgumentNullException(nameof(documents)); + return new StoreObjects(tenantId, documents); + } + /// /// Return a side effect of inserting the specified document, scoped to a specific tenant /// @@ -385,9 +407,67 @@ public class StoreManyDocs : DocumentsOp where T : notnull public StoreManyDocs(params T[] documents) : base(documents.Cast().ToArray()) { } public StoreManyDocs(IList documents) : this(documents.ToArray()) { } public StoreManyDocs(string tenantId, params T[] documents) : base(tenantId, documents.Cast().ToArray()) { } + + public StoreManyDocs With(T[] documents) + { + Documents.AddRange(documents.Cast()); + return this; + } + + public StoreManyDocs With(T document) + { + Documents.Add(document); + return this; + } + public override void Execute(IDocumentSession session) { ResolveSession(session).Store(Documents.Cast()); } } +public class StoreObjects : DocumentsOp +{ + private static readonly System.Collections.Concurrent.ConcurrentDictionary _storeMethods = new(); + + public StoreObjects(params object[] documents) : base(documents) { } + + public StoreObjects(IList documents) : this(documents.ToArray()) { } + + public StoreObjects(string tenantId, params object[] documents) : base(tenantId, documents) { } + + public StoreObjects With(object[] documents) + { + Documents.AddRange(documents); + return this; + } + + public StoreObjects With(object document) + { + Documents.Add(document); + return this; + } + + public override void Execute(IDocumentSession session) + { + // Polecat does not have a single StoreObjects(IEnumerable) method like Marten, + // so we dispatch each document to Store by its runtime type. + var target = ResolveSession(session); + foreach (var document in Documents) + { + if (document is null) continue; + var docType = document.GetType(); + var method = _storeMethods.GetOrAdd(docType, t => + { + var open = typeof(IDocumentOperations).GetMethods() + .First(m => m.Name == nameof(IDocumentOperations.Store) + && m.IsGenericMethodDefinition + && m.GetParameters().Length == 1 + && !m.GetParameters()[0].ParameterType.IsArray); + return open.MakeGenericMethod(t); + }); + method.Invoke(target, new[] { document }); + } + } +} + public class InsertDoc : DocumentOp where T : notnull { private readonly T _document; @@ -480,7 +560,12 @@ protected IDocumentOperations ResolveSession(IDocumentSession session) public abstract void Execute(IDocumentSession session); } -public abstract class DocumentsOp : IPolecatOp +public interface IDocumentsOp : IPolecatOp +{ + IReadOnlyList Documents { get; } +} + +public abstract class DocumentsOp : IDocumentsOp { public List Documents { get; } = new(); @@ -501,4 +586,6 @@ protected IDocumentOperations ResolveSession(IDocumentSession session) } public abstract void Execute(IDocumentSession session); + + IReadOnlyList IDocumentsOp.Documents => Documents; } diff --git a/src/Persistence/Wolverine.Polecat/WriteAggregateAttribute.cs b/src/Persistence/Wolverine.Polecat/WriteAggregateAttribute.cs index 921ad261d..017707a90 100644 --- a/src/Persistence/Wolverine.Polecat/WriteAggregateAttribute.cs +++ b/src/Persistence/Wolverine.Polecat/WriteAggregateAttribute.cs @@ -62,7 +62,16 @@ public override Variable Modify(IChain chain, ParameterInfo parameter, IServiceC var idProp = aggregateType.GetProperty("Id", BindingFlags.Public | BindingFlags.Instance); var idType = idProp?.PropertyType ?? typeof(Guid); - var identity = FindIdentity(aggregateType, idType, chain); + // If a specific ValueSource has been set (e.g. via FromMethod, FromRoute, FromHeader, FromClaim), + // use the base class identity resolution which respects that ValueSource + Variable? identity = null; + if (ValueSource != ValueSource.InputMember && ArgumentName.IsNotEmpty()) + { + tryFindIdentityVariable(chain, parameter, idType, out identity); + } + + // Fall back to WriteAggregate's standard identity resolution + identity ??= FindIdentity(aggregateType, idType, chain); var isNaturalKey = false; // If standard identity resolution failed, check for natural key support From 1e904ff8629d9804376b61e25ef3b5766666ac05 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Sun, 26 Apr 2026 21:53:51 -0500 Subject: [PATCH 2/3] Bump CI sqlserver image to mssql/server:2025-latest for Polecat 2.x MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Polecat 2.0 changed the default JSON storage to use SQL Server 2025's native `json` column type (StoreOptions.UseNativeJsonType defaults to true). The previous Azure SQL Edge image only emulates SQL Server 2022, which fails Polecat schema migrations with: Microsoft.Data.SqlClient.SqlException : Column, parameter, or variable #6: Cannot find data type json. This was causing every PolecatTests integration test to fail in CI. Switch the shared docker-compose sqlserver service to the official mcr.microsoft.com/mssql/server:2025-latest image (matches Polecat's own docker-compose recommendation) and add MSSQL_SA_PASSWORD alongside the legacy SA_PASSWORD env var for forward compatibility. Existing Wolverine.SqlServer / RDBMS tests continue to work — SQL Server 2025 is backward-compatible at the schema level. Documents the Apple Silicon caveat in a comment: ARM Mac users may want to override locally via docker-compose.override.yml + UseNativeJsonType = false on test fixtures, since SQL Server 2025 under Rosetta can be flaky on memory-constrained machines. Co-Authored-By: Claude Opus 4.7 (1M context) --- docker-compose.yml | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/docker-compose.yml b/docker-compose.yml index 9dd9c35d8..b4bc3abe6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -49,13 +49,23 @@ services: - "5672:5672" - "15672:15672" + # SQL Server 2025 is required by Polecat 2.0+ (UseNativeJsonType defaults to true, + # which uses the native `json` column type introduced in SQL Server 2025). + # + # On Apple Silicon Macs, SQL Server 2025 under Rosetta may crash due to memory + # constraints. If you're working locally on ARM and don't need Polecat tests, + # you can override this service to use `mcr.microsoft.com/azure-sql-edge:latest` + # via a docker-compose.override.yml — but Polecat integration tests will fail + # against Azure SQL Edge unless you also set `m.UseNativeJsonType = false` in + # the test fixture. sqlserver: - image: "mcr.microsoft.com/azure-sql-edge:latest" + image: "mcr.microsoft.com/mssql/server:2025-latest" ports: - "1434:1433" environment: - "ACCEPT_EULA=Y" - "SA_PASSWORD=P@55w0rd" + - "MSSQL_SA_PASSWORD=P@55w0rd" - "MSSQL_PID=Developer" pulsar: From ca7bbd8f4924a03130e4b6fd4830047b4e498927 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Mon, 27 Apr 2026 13:11:09 -0500 Subject: [PATCH 3/3] Fix CI: trigger chain compilation in OTEL tag tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The two `generates_wolverine_stream_*_otel_tag` tests on the Polecat port were failing in CI with `NullReferenceException` from `Shouldly.StringHelpers.Clip` because `chain.SourceCode` was null — the chain hadn't been compiled yet. The Marten originals these tests were ported from call `theHost.GetRuntime().Handlers.HandlerFor()` first to trigger compilation, then `ChainFor()` to inspect the resulting chain. That setup line was dropped during the port. Restore it. Companion `automatically_adding_stream_id_to_the_audit_members` test (which was passing) followed the same pattern incidentally because it accesses `chain.AuditedMembers` which triggers compilation as a side effect. The SQL Server image is already on `mssql/server:2025-latest` (commit 1e904ff8 on this branch), so the native JSON type Polecat 2.x emits is available. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../AggregateHandlerWorkflow/aggregate_handler_workflow.cs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/Persistence/PolecatTests/AggregateHandlerWorkflow/aggregate_handler_workflow.cs b/src/Persistence/PolecatTests/AggregateHandlerWorkflow/aggregate_handler_workflow.cs index 8334f45a4..0fbb96f46 100644 --- a/src/Persistence/PolecatTests/AggregateHandlerWorkflow/aggregate_handler_workflow.cs +++ b/src/Persistence/PolecatTests/AggregateHandlerWorkflow/aggregate_handler_workflow.cs @@ -101,6 +101,10 @@ public void automatically_adding_stream_id_to_the_audit_members() [Fact] public void generates_wolverine_stream_id_otel_tag() { + // Resolving the handler triggers chain compilation; without this the + // chain's generated SourceCode is null and the assertion below NREs. + // Mirrors the equivalent Marten test (MartenTests/AggregateHandlerWorkflow). + var handler = theHost.GetRuntime().Handlers.HandlerFor(); var chain = theHost.GetRuntime().Handlers.ChainFor(); chain!.SourceCode!.ShouldContain($"SetTag(\"{Wolverine.Runtime.WolverineTracing.StreamId}\""); @@ -109,6 +113,7 @@ public void generates_wolverine_stream_id_otel_tag() [Fact] public void generates_wolverine_stream_type_otel_tag() { + var handler = theHost.GetRuntime().Handlers.HandlerFor(); var chain = theHost.GetRuntime().Handlers.ChainFor(); chain!.SourceCode!.ShouldContain($"SetTag(\"{Wolverine.Runtime.WolverineTracing.StreamType}\", \"{typeof(LetterAggregate).FullName}\"");