Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<PackageVersion Include="Lamar.Microsoft.DependencyInjection" Version="15.0.1" />
<PackageVersion Include="Marten" Version="8.32.0" />
<PackageVersion Include="Microsoft.Data.SqlClient" Version="6.1.3" />
<PackageVersion Include="Polecat" Version="1.6.1" />
<PackageVersion Include="Polecat" Version="2.1.0" />
<PackageVersion Include="Microsoft.Azure.Cosmos" Version="3.46.1" />
<PackageVersion Include="Marten.AspNetCore" Version="8.32.0" />
<PackageVersion Include="MemoryPack" Version="1.21.3" />
Expand Down
12 changes: 11 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,23 @@ services:
- "5672:5672"
- "15672:15672"

# SQL Server 2025 is required by Polecat 2.0+ (UseNativeJsonType defaults to true,
# which uses the native `json` column type introduced in SQL Server 2025).
#
# On Apple Silicon Macs, SQL Server 2025 under Rosetta may crash due to memory
# constraints. If you're working locally on ARM and don't need Polecat tests,
# you can override this service to use `mcr.microsoft.com/azure-sql-edge:latest`
# via a docker-compose.override.yml — but Polecat integration tests will fail
# against Azure SQL Edge unless you also set `m.UseNativeJsonType = false` in
# the test fixture.
sqlserver:
image: "mcr.microsoft.com/azure-sql-edge:latest"
image: "mcr.microsoft.com/mssql/server:2025-latest"
ports:
- "1434:1433"
environment:
- "ACCEPT_EULA=Y"
- "SA_PASSWORD=P@55w0rd"
- "MSSQL_SA_PASSWORD=P@55w0rd"
- "MSSQL_PID=Developer"

pulsar:
Expand Down
70 changes: 68 additions & 2 deletions docs/guide/durability/polecat/operations.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,74 @@ public static IPolecatOp Pay([Entity] Invoice invoice)
}
```

There are existing Polecat ops for storing, inserting, updating, and deleting a document. There's also a specific
helper for starting a new event stream as shown below:
There are existing Polecat ops for storing, inserting, updating, and deleting a document.

### Storing Multiple Documents

Use `PolecatOps.StoreMany()` to store multiple documents of the same type, or `PolecatOps.StoreObjects()` to
store multiple documents of different types in a single side effect:

```csharp
// Store multiple documents of the same type
public static StoreManyDocs<Invoice> Handle(BatchInvoiceCommand command)
{
var invoices = command.Items.Select(i => new Invoice { Id = i.Id, Amount = i.Amount });
return PolecatOps.StoreMany(invoices.ToArray());
}

// Store multiple documents of different types
public static StoreObjects Handle(CreateOrderCommand command)
{
var order = new Order { Id = command.OrderId, Total = command.Total };
var audit = new AuditLog { Action = "OrderCreated", EntityId = command.OrderId };
return PolecatOps.StoreObjects(order, audit);
}
```

Both `StoreMany()` and `StoreObjects()` support fluent `With()` methods to incrementally add documents:

```csharp
public static StoreObjects Handle(ComplexCommand command)
{
return PolecatOps.StoreObjects(new Order { Id = command.OrderId })
.With(new AuditLog { Action = "Created" })
.With(new Notification { Message = "Order created" });
}
```

### Tenant-Scoped Operations

Every `PolecatOps` factory method has an overload that accepts a `tenantId` parameter. When provided, the
operation uses `IDocumentSession.ForTenant(tenantId)` to scope the write to a specific tenant. This is
useful in multi-tenant systems where a handler processing a message for one tenant needs to write data
to a different tenant's storage:

```csharp
// Store a document in a specific tenant
public static StoreDoc<Invoice> Handle(CreateInvoiceForTenant command)
{
var invoice = new Invoice { Id = command.InvoiceId, Amount = command.Amount };
return PolecatOps.Store(invoice, command.TenantId);
}

// Store many same-type documents in a specific tenant
public static StoreManyDocs<LineItem> Handle(BatchLineItems command)
{
return PolecatOps.StoreMany(command.TenantId, command.Items.ToArray());
}

// Store mixed-type documents in a specific tenant
public static StoreObjects Handle(CrossTenantAudit command)
{
return PolecatOps.StoreObjects(command.TargetTenantId,
new AuditRecord { Action = command.Action },
new Notification { Message = command.Message });
}
```

All existing method signatures are unchanged — the tenant overloads are purely additive.

There's also a specific helper for starting a new event stream as shown below:

```cs
public static class TodoListEndpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,27 @@ public void automatically_adding_stream_id_to_the_audit_members()
chain.SourceCode.ShouldContain("System.Diagnostics.Activity.Current?.SetTag(\"letter.aggregate.id\", raiseABC.LetterAggregateId);");
}

[Fact]
public void generates_wolverine_stream_id_otel_tag()
{
// Resolving the handler triggers chain compilation; without this the
// chain's generated SourceCode is null and the assertion below NREs.
// Mirrors the equivalent Marten test (MartenTests/AggregateHandlerWorkflow).
var handler = theHost.GetRuntime().Handlers.HandlerFor<RaiseABC>();
var chain = theHost.GetRuntime().Handlers.ChainFor<RaiseABC>();

chain!.SourceCode!.ShouldContain($"SetTag(\"{Wolverine.Runtime.WolverineTracing.StreamId}\"");
}

[Fact]
public void generates_wolverine_stream_type_otel_tag()
{
var handler = theHost.GetRuntime().Handlers.HandlerFor<RaiseABC>();
var chain = theHost.GetRuntime().Handlers.ChainFor<RaiseABC>();

chain!.SourceCode!.ShouldContain($"SetTag(\"{Wolverine.Runtime.WolverineTracing.StreamType}\", \"{typeof(LetterAggregate).FullName}\"");
}

[Fact]
public async Task events_then_response_invoke_with_return()
{
Expand Down
79 changes: 79 additions & 0 deletions src/Persistence/PolecatTests/PolecatOps_store.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
using Shouldly;
using Wolverine.Polecat;

namespace PolecatTests;

public record StoreTestDoc1(string Name);
public record StoreTestDoc2(string Label);

public class PolecatOps_store
{
[Fact]
public void StoreMany()
{
var op = PolecatOps.StoreMany(new StoreTestDoc1("Test1"));

op.Documents.Count.ShouldBe(1);
op.Documents[0].ShouldBeOfType<StoreTestDoc1>();

op.With(new StoreTestDoc1("Test2"));

op.Documents.Count.ShouldBe(2);

op.With([new StoreTestDoc1("Test3"), new StoreTestDoc1("Test4")]);

op.Documents.Count.ShouldBe(4);

op = PolecatOps.StoreMany(new StoreTestDoc1("Test5"), new StoreTestDoc1("Test6"));

op.Documents.Count.ShouldBe(2);
}

[Fact]
public void StoreObjects()
{
var op = PolecatOps.StoreObjects(new StoreTestDoc1("Test1"));

op.Documents.Count.ShouldBe(1);
op.Documents[0].ShouldBeOfType<StoreTestDoc1>();

op.With(new StoreTestDoc2("Test2"));

op.Documents.Count.ShouldBe(2);
op.Documents[1].ShouldBeOfType<StoreTestDoc2>();

op.With([new StoreTestDoc1("Test3"), new StoreTestDoc2("Test4")]);

op.Documents.Count.ShouldBe(4);
op.Documents[2].ShouldBeOfType<StoreTestDoc1>();
op.Documents[3].ShouldBeOfType<StoreTestDoc2>();

op = PolecatOps.StoreObjects(new StoreTestDoc1("Test5"), new StoreTestDoc2("Test6"));

op.Documents.Count.ShouldBe(2);
op.Documents[0].ShouldBeOfType<StoreTestDoc1>();
op.Documents[1].ShouldBeOfType<StoreTestDoc2>();

op = PolecatOps.StoreObjects([new StoreTestDoc1("Test7"), new StoreTestDoc2("Test8")]);

op.Documents.Count.ShouldBe(2);
op.Documents[0].ShouldBeOfType<StoreTestDoc1>();
op.Documents[1].ShouldBeOfType<StoreTestDoc2>();
}

[Fact]
public void StoreObjects_with_tenantId()
{
var op = PolecatOps.StoreObjects("tenant-1", new StoreTestDoc1("Test1"), new StoreTestDoc2("Test2"));

op.Documents.Count.ShouldBe(2);
op.TenantId.ShouldBe("tenant-1");
}

[Fact]
public void IDocumentsOp_exposes_documents_readonly()
{
IDocumentsOp op = PolecatOps.StoreObjects(new StoreTestDoc1("Test1"), new StoreTestDoc2("Test2"));
op.Documents.Count.ShouldBe(2);
}
}
1 change: 1 addition & 0 deletions src/Persistence/Wolverine.Polecat/AggregateHandling.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public Variable Apply(IChain chain, IServiceContainer container)

var loader = new LoadAggregateFrame(this);
chain.Middleware.Add(loader);
chain.Middleware.Add(new TagAggregateOtelFrame(AggregateType, AggregateId));

var firstCall = chain.HandlerCalls().First();

Expand Down
33 changes: 33 additions & 0 deletions src/Persistence/Wolverine.Polecat/Codegen/TagAggregateOtelFrame.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System.Diagnostics;
using JasperFx.CodeGeneration;
using JasperFx.CodeGeneration.Frames;
using JasperFx.CodeGeneration.Model;
using JasperFx.Core.Reflection;
using Wolverine.Runtime;

namespace Wolverine.Polecat.Codegen;

/// <summary>
/// Frame that generates code to tag the current OpenTelemetry activity with
/// the aggregate stream ID and aggregate type when processing a Polecat
/// aggregate handler workflow.
/// </summary>
internal class TagAggregateOtelFrame : SyncFrame
{
private readonly Type _aggregateType;
private readonly Variable _aggregateId;

public TagAggregateOtelFrame(Type aggregateType, Variable aggregateId)
{
_aggregateType = aggregateType;
_aggregateId = aggregateId;
uses.Add(aggregateId);
}

public override void GenerateCode(GeneratedMethod method, ISourceWriter writer)
{
writer.WriteLine($"{typeof(Activity).FullNameInCode()}.{nameof(Activity.Current)}?.{nameof(Activity.SetTag)}(\"{WolverineTracing.StreamId}\", {_aggregateId.Usage}.ToString());");
writer.WriteLine($"{typeof(Activity).FullNameInCode()}.{nameof(Activity.Current)}?.{nameof(Activity.SetTag)}(\"{WolverineTracing.StreamType}\", \"{_aggregateType.FullName}\");");
Next?.GenerateCode(method, writer);
}
}
89 changes: 88 additions & 1 deletion src/Persistence/Wolverine.Polecat/IPolecatOp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,18 @@ public static StoreManyDocs<T> StoreMany<T>(params T[] documents) where T : notn
return new StoreManyDocs<T>(documents);
}

/// <summary>
/// Return a side effect of storing an enumerable of potentially mixed document types in Polecat
/// </summary>
/// <param name="documents"></param>
/// <returns></returns>
/// <exception cref="ArgumentNullException"></exception>
public static StoreObjects StoreObjects(params object[] documents)
{
if (documents == null) throw new ArgumentNullException(nameof(documents));
return new StoreObjects(documents);
}

public static InsertDoc<T> Insert<T>(T document) where T : notnull
{
if (document == null) throw new ArgumentNullException(nameof(document));
Expand Down Expand Up @@ -174,6 +186,16 @@ public static StoreManyDocs<T> StoreMany<T>(string tenantId, params T[] document
return new StoreManyDocs<T>(tenantId, documents);
}

/// <summary>
/// Return a side effect of storing an enumerable of potentially mixed document types, scoped to a specific tenant
/// </summary>
public static StoreObjects StoreObjects(string tenantId, params object[] documents)
{
if (tenantId == null) throw new ArgumentNullException(nameof(tenantId));
if (documents == null) throw new ArgumentNullException(nameof(documents));
return new StoreObjects(tenantId, documents);
}

/// <summary>
/// Return a side effect of inserting the specified document, scoped to a specific tenant
/// </summary>
Expand Down Expand Up @@ -385,9 +407,67 @@ public class StoreManyDocs<T> : DocumentsOp where T : notnull
public StoreManyDocs(params T[] documents) : base(documents.Cast<object>().ToArray()) { }
public StoreManyDocs(IList<T> documents) : this(documents.ToArray()) { }
public StoreManyDocs(string tenantId, params T[] documents) : base(tenantId, documents.Cast<object>().ToArray()) { }

public StoreManyDocs<T> With(T[] documents)
{
Documents.AddRange(documents.Cast<object>());
return this;
}

public StoreManyDocs<T> With(T document)
{
Documents.Add(document);
return this;
}

public override void Execute(IDocumentSession session) { ResolveSession(session).Store(Documents.Cast<T>()); }
}

public class StoreObjects : DocumentsOp
{
private static readonly System.Collections.Concurrent.ConcurrentDictionary<Type, System.Reflection.MethodInfo> _storeMethods = new();

public StoreObjects(params object[] documents) : base(documents) { }

public StoreObjects(IList<object> documents) : this(documents.ToArray()) { }

public StoreObjects(string tenantId, params object[] documents) : base(tenantId, documents) { }

public StoreObjects With(object[] documents)
{
Documents.AddRange(documents);
return this;
}

public StoreObjects With(object document)
{
Documents.Add(document);
return this;
}

public override void Execute(IDocumentSession session)
{
// Polecat does not have a single StoreObjects(IEnumerable<object>) method like Marten,
// so we dispatch each document to Store<T> by its runtime type.
var target = ResolveSession(session);
foreach (var document in Documents)
{
if (document is null) continue;
var docType = document.GetType();
var method = _storeMethods.GetOrAdd(docType, t =>
{
var open = typeof(IDocumentOperations).GetMethods()
.First(m => m.Name == nameof(IDocumentOperations.Store)
&& m.IsGenericMethodDefinition
&& m.GetParameters().Length == 1
&& !m.GetParameters()[0].ParameterType.IsArray);
return open.MakeGenericMethod(t);
});
method.Invoke(target, new[] { document });
}
}
}

public class InsertDoc<T> : DocumentOp where T : notnull
{
private readonly T _document;
Expand Down Expand Up @@ -480,7 +560,12 @@ protected IDocumentOperations ResolveSession(IDocumentSession session)
public abstract void Execute(IDocumentSession session);
}

public abstract class DocumentsOp : IPolecatOp
public interface IDocumentsOp : IPolecatOp
{
IReadOnlyList<object> Documents { get; }
}

public abstract class DocumentsOp : IDocumentsOp
{
public List<object> Documents { get; } = new();

Expand All @@ -501,4 +586,6 @@ protected IDocumentOperations ResolveSession(IDocumentSession session)
}

public abstract void Execute(IDocumentSession session);

IReadOnlyList<object> IDocumentsOp.Documents => Documents;
}
Loading
Loading