diff --git a/.github/workflows/efcore.yml b/.github/workflows/efcore.yml
new file mode 100644
index 000000000..f73bbfffb
--- /dev/null
+++ b/.github/workflows/efcore.yml
@@ -0,0 +1,79 @@
+name: efcore
+
+on:
+ push:
+ branches: [ main ]
+ pull_request:
+ branches: [ main ]
+
+env:
+ config: Release
+ disable_test_parallelization: true
+
+jobs:
+ test:
+ runs-on: ubuntu-latest
+ timeout-minutes: 30
+
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v3
+ with:
+ fetch-depth: 0
+
+ - name: Setup .NET 8
+ uses: actions/setup-dotnet@v1
+ with:
+ dotnet-version: 8.0.x
+
+ - name: Setup .NET 9
+ uses: actions/setup-dotnet@v1
+ with:
+ dotnet-version: 9.0.x
+
+ - name: Setup .NET 10
+ uses: actions/setup-dotnet@v1
+ with:
+ dotnet-version: 10.0.x
+
+ - name: Start containers
+ run: docker compose up -d postgresql sqlserver
+
+ - name: Build
+ run: |
+ dotnet build src/Persistence/EfCoreTests/EfCoreTests.csproj --configuration ${{ env.config }} --framework net9.0
+ dotnet build src/Persistence/EfCoreTests.MultiTenancy/EfCoreTests.MultiTenancy.csproj --configuration ${{ env.config }} --framework net9.0
+
+ - name: Wait for PostgreSQL
+ run: |
+ echo "Waiting for PostgreSQL to be ready..."
+ for i in {1..30}; do
+ if docker compose exec -T postgresql pg_isready -U postgres; then
+ echo "PostgreSQL is ready"
+ break
+ fi
+ echo "Attempt $i: PostgreSQL not ready yet, waiting..."
+ sleep 2
+ done
+
+ - name: Wait for SQL Server
+ run: |
+ echo "Waiting for SQL Server to be ready..."
+ for i in {1..30}; do
+ if docker compose exec -T sqlserver /opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P 'P@55w0rd' -C -Q "SELECT 1" > /dev/null 2>&1; then
+ echo "SQL Server is ready"
+ break
+ fi
+ echo "Attempt $i: SQL Server not ready yet, waiting..."
+ sleep 2
+ done
+
+ - name: Test EfCoreTests
+ run: dotnet test src/Persistence/EfCoreTests/EfCoreTests.csproj --configuration ${{ env.config }} --framework net9.0 --no-build --logger "GitHubActions;summary.includePassedTests=true;summary.includeSkippedTests=true"
+
+ - name: Test EfCoreTests.MultiTenancy
+ run: dotnet test src/Persistence/EfCoreTests.MultiTenancy/EfCoreTests.MultiTenancy.csproj --configuration ${{ env.config }} --framework net9.0 --no-build --logger "GitHubActions;summary.includePassedTests=true;summary.includeSkippedTests=true"
+
+ - name: Stop containers
+ if: always()
+ run: docker compose down
diff --git a/Directory.Packages.props b/Directory.Packages.props
index c245bca20..97141e45c 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -21,8 +21,8 @@
-
-
+
+
@@ -79,13 +79,13 @@
-
-
-
-
-
-
-
+
+
+
+
+
+
+
diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts
index 34523c2be..3e50c066f 100644
--- a/docs/.vitepress/config.mts
+++ b/docs/.vitepress/config.mts
@@ -271,8 +271,9 @@ const config: UserConfig = {
{text: 'Operation Side Effects', link: '/guide/durability/efcore/operations'},
{text: 'Saga Storage', link: '/guide/durability/efcore/sagas'},
{text: 'Multi-Tenancy', link: '/guide/durability/efcore/multi-tenancy'},
- {text: 'Domain Events', link: '/guide/durability/efcore/domain-events'}
-
+ {text: 'Domain Events', link: '/guide/durability/efcore/domain-events'},
+ {text: 'Database Migrations', link: '/guide/durability/efcore/migrations'}
+
]},
{text: 'Managing Message Storage', link: '/guide/durability/managing'},
{text: 'Dead Letter Storage', link: '/guide/durability/dead-letter-storage'},
diff --git a/docs/guide/durability/efcore/migrations.md b/docs/guide/durability/efcore/migrations.md
new file mode 100644
index 000000000..db7aee88f
--- /dev/null
+++ b/docs/guide/durability/efcore/migrations.md
@@ -0,0 +1,108 @@
+# Database Migrations
+
+Wolverine uses [Weasel](https://github.com/JasperFx/weasel) for schema management of EF Core `DbContext` types rather than EF Core's own migration system. This approach provides a consistent schema management experience across the entire "critter stack" (Wolverine + Marten) and avoids issues with EF Core's `Database.EnsureCreatedAsync()` bypassing migration history.
+
+## How It Works
+
+When you register a `DbContext` with Wolverine using `AddDbContextWithWolverineIntegration()` or call `UseEntityFrameworkCoreWolverineManagedMigrations()`, Wolverine will:
+
+1. **Read the EF Core model** — Wolverine inspects your `DbContext`'s entity types, properties, and relationships to build a Weasel schema representation
+2. **Compare against the actual database** — Weasel connects to the database and compares the expected schema with the current state
+3. **Apply deltas** — Only the necessary changes (new tables, added columns, foreign keys) are applied
+
+This all happens automatically at application startup when you use `UseResourceSetupOnStartup()` or through Wolverine's resource management commands.
+
+## Enabling Weasel-Managed Migrations
+
+To opt into Weasel-managed migrations for your EF Core `DbContext` types, add this to your Wolverine configuration:
+
+```csharp
+builder.UseWolverine(opts =>
+{
+ opts.PersistMessagesWithSqlServer(connectionString);
+
+ opts.Services.AddDbContextWithWolverineIntegration(
+ x => x.UseSqlServer(connectionString));
+
+ // Enable Weasel-managed migrations for all registered DbContext types
+ opts.UseEntityFrameworkCoreWolverineManagedMigrations();
+});
+```
+
+With this in place, Wolverine will create and update your EF Core tables using Weasel at startup, alongside any Wolverine envelope storage tables.
+
+## What Gets Migrated
+
+Weasel will manage the following schema elements from your EF Core model:
+
+- **Tables** — Created from entity types registered in `DbSet` properties
+- **Columns** — Mapped from entity properties, including types, nullability, and default values
+- **Primary keys** — Derived from `DbContext` key configuration
+- **Foreign keys** — Including cascade delete behavior
+- **Schema names** — Respects EF Core's `ToSchema()` configuration
+
+Entity types excluded from migrations via EF Core's `ExcludeFromMigrations()` are also excluded from Weasel management.
+
+## Programmatic Migration
+
+You can also trigger migrations programmatically using the Weasel extension methods on `IServiceProvider`:
+
+```csharp
+// Create a migration plan for a specific DbContext
+await using var migration = await serviceProvider
+ .CreateMigrationAsync(dbContext, CancellationToken.None);
+
+// Apply the migration (only applies if there are actual differences)
+await migration.ExecuteAsync(AutoCreate.CreateOrUpdate, CancellationToken.None);
+```
+
+The `CreateMigrationAsync()` method compares the EF Core model against the actual database schema and produces a `DbContextMigration` object. Calling `ExecuteAsync()` applies any necessary changes.
+
+### Creating the Database
+
+If you need to ensure the database itself exists (not just the tables), use:
+
+```csharp
+await serviceProvider.EnsureDatabaseExistsAsync(dbContext);
+```
+
+This uses Weasel's provider-specific database creation logic, which only creates the database catalog — it does not create any tables or schema objects.
+
+## Multi-Tenancy
+
+For multi-tenant setups where each tenant has its own database, Wolverine will automatically ensure each tenant database exists and apply schema migrations when using the tenanted `DbContext` builder. See [Multi-Tenancy](./multi-tenancy) for details.
+
+## Weasel vs EF Core Migrations
+
+| Feature | Weasel (Wolverine) | EF Core Migrations |
+|---------|-------------------|-------------------|
+| Migration tracking | Compares live schema | Migration history table |
+| Code generation | None needed | `dotnet ef migrations add` |
+| Additive changes | Automatic | Requires new migration |
+| Works with Marten | Yes, unified approach | No |
+| Rollback support | No | Yes, via `Down()` method |
+
+::: tip
+Weasel migrations are **additive** — they can create tables and add columns, but will not drop columns or tables automatically. This makes them safe for `CreateOrUpdate` scenarios in production.
+:::
+
+::: warning
+If you are already using EF Core's migration system (`dotnet ef migrations add`, `Database.MigrateAsync()`), you should choose one approach or the other. Mixing EF Core migrations with Weasel-managed migrations can lead to conflicts. Wolverine's Weasel-managed approach is recommended for applications in the "critter stack" ecosystem.
+:::
+
+## CLI Commands
+
+When Weasel-managed migrations are enabled, you can use Wolverine's built-in resource management:
+
+```bash
+# Apply all pending schema changes
+dotnet run -- resources setup
+
+# Check current database status
+dotnet run -- resources list
+
+# Reset all state (development only!)
+dotnet run -- resources clear
+```
+
+These commands manage both Wolverine's internal tables and your EF Core entity tables together.
diff --git a/src/Persistence/EfCoreTests.MultiTenancy/EfCoreTests.MultiTenancy.csproj b/src/Persistence/EfCoreTests.MultiTenancy/EfCoreTests.MultiTenancy.csproj
new file mode 100644
index 000000000..4c606c150
--- /dev/null
+++ b/src/Persistence/EfCoreTests.MultiTenancy/EfCoreTests.MultiTenancy.csproj
@@ -0,0 +1,47 @@
+
+
+
+ false
+ true
+ net9.0
+
+
+
+
+
+
+
+
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+ all
+
+
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+ all
+
+
+
+
+
+
+
+
+
+
+
+ Servers.cs
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Persistence/EfCoreTests.MultiTenancy/GlobalUsings.cs b/src/Persistence/EfCoreTests.MultiTenancy/GlobalUsings.cs
new file mode 100644
index 000000000..c802f4480
--- /dev/null
+++ b/src/Persistence/EfCoreTests.MultiTenancy/GlobalUsings.cs
@@ -0,0 +1 @@
+global using Xunit;
diff --git a/src/Persistence/EfCoreTests/MultiTenancy/MultiTenancyCompliance.cs b/src/Persistence/EfCoreTests.MultiTenancy/MultiTenancyCompliance.cs
similarity index 96%
rename from src/Persistence/EfCoreTests/MultiTenancy/MultiTenancyCompliance.cs
rename to src/Persistence/EfCoreTests.MultiTenancy/MultiTenancyCompliance.cs
index 52698a60a..9e285b6b8 100644
--- a/src/Persistence/EfCoreTests/MultiTenancy/MultiTenancyCompliance.cs
+++ b/src/Persistence/EfCoreTests.MultiTenancy/MultiTenancyCompliance.cs
@@ -83,9 +83,29 @@ public async Task InitializeAsync()
theBuilder = theHost.Services.GetRequiredService>();
}
- public Task DisposeAsync()
+ public async Task DisposeAsync()
{
- return theHost.StopAsync();
+ try
+ {
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+ await theHost.StopAsync(cts.Token);
+ }
+ catch (Exception)
+ {
+ // Swallow shutdown errors - host may have already stopped or timed out
+ }
+
+ try
+ {
+ theHost.Dispose();
+ }
+ catch (Exception)
+ {
+ // Swallow errors from inner WebApplicationFactory dispose
+ }
+
+ NpgsqlConnection.ClearAllPools();
+ SqlConnection.ClearAllPools();
}
public abstract void Configure(WolverineOptions options);
diff --git a/src/Persistence/EfCoreTests/MultiTenancy/MultiTenancyDocumentationSamples.cs b/src/Persistence/EfCoreTests.MultiTenancy/MultiTenancyDocumentationSamples.cs
similarity index 99%
rename from src/Persistence/EfCoreTests/MultiTenancy/MultiTenancyDocumentationSamples.cs
rename to src/Persistence/EfCoreTests.MultiTenancy/MultiTenancyDocumentationSamples.cs
index 89d2cc206..6aa2e6309 100644
--- a/src/Persistence/EfCoreTests/MultiTenancy/MultiTenancyDocumentationSamples.cs
+++ b/src/Persistence/EfCoreTests.MultiTenancy/MultiTenancyDocumentationSamples.cs
@@ -219,3 +219,5 @@ public async Task HandleAsync(CreateItem command, TenantId tenantId, Cancellatio
#endregion
+public record CreateItem(string Name);
+
diff --git a/src/Persistence/EfCoreTests.MultiTenancy/NoParallelization.cs b/src/Persistence/EfCoreTests.MultiTenancy/NoParallelization.cs
new file mode 100644
index 000000000..e5cc5d402
--- /dev/null
+++ b/src/Persistence/EfCoreTests.MultiTenancy/NoParallelization.cs
@@ -0,0 +1 @@
+[assembly: CollectionBehavior(DisableTestParallelization = true)]
diff --git a/src/Persistence/EfCoreTests/MultiTenancy/multi_tenancy_with_marten_managed_multi_tenancy.cs b/src/Persistence/EfCoreTests.MultiTenancy/multi_tenancy_with_marten_managed_multi_tenancy.cs
similarity index 97%
rename from src/Persistence/EfCoreTests/MultiTenancy/multi_tenancy_with_marten_managed_multi_tenancy.cs
rename to src/Persistence/EfCoreTests.MultiTenancy/multi_tenancy_with_marten_managed_multi_tenancy.cs
index 1f47abdc9..98c17c3c3 100644
--- a/src/Persistence/EfCoreTests/MultiTenancy/multi_tenancy_with_marten_managed_multi_tenancy.cs
+++ b/src/Persistence/EfCoreTests.MultiTenancy/multi_tenancy_with_marten_managed_multi_tenancy.cs
@@ -48,6 +48,5 @@ public override void Configure(WolverineOptions opts)
opts.Services.RemoveAll(typeof(OrdersDbContext));
opts.AddSagaType();
- opts.Services.AddResourceSetupOnStartup();
}
}
\ No newline at end of file
diff --git a/src/Persistence/EfCoreTests/MultiTenancy/multi_tenancy_with_master_table_approach_postgresql.cs b/src/Persistence/EfCoreTests.MultiTenancy/multi_tenancy_with_master_table_approach_postgresql.cs
similarity index 96%
rename from src/Persistence/EfCoreTests/MultiTenancy/multi_tenancy_with_master_table_approach_postgresql.cs
rename to src/Persistence/EfCoreTests.MultiTenancy/multi_tenancy_with_master_table_approach_postgresql.cs
index 4be00f170..c724228f8 100644
--- a/src/Persistence/EfCoreTests/MultiTenancy/multi_tenancy_with_master_table_approach_postgresql.cs
+++ b/src/Persistence/EfCoreTests.MultiTenancy/multi_tenancy_with_master_table_approach_postgresql.cs
@@ -38,6 +38,5 @@ public override void Configure(WolverineOptions opts)
builder.UseNpgsql(connectionString.Value, b => b.MigrationsAssembly("MultiTenantedEfCoreWithPostgreSQL"));
}, AutoCreate.CreateOrUpdate);
- opts.Services.AddResourceSetupOnStartup();
}
}
\ No newline at end of file
diff --git a/src/Persistence/EfCoreTests/MultiTenancy/multi_tenancy_with_master_table_approach_sqlserver.cs b/src/Persistence/EfCoreTests.MultiTenancy/multi_tenancy_with_master_table_approach_sqlserver.cs
similarity index 96%
rename from src/Persistence/EfCoreTests/MultiTenancy/multi_tenancy_with_master_table_approach_sqlserver.cs
rename to src/Persistence/EfCoreTests.MultiTenancy/multi_tenancy_with_master_table_approach_sqlserver.cs
index 1e1fd720e..dc1232996 100644
--- a/src/Persistence/EfCoreTests/MultiTenancy/multi_tenancy_with_master_table_approach_sqlserver.cs
+++ b/src/Persistence/EfCoreTests.MultiTenancy/multi_tenancy_with_master_table_approach_sqlserver.cs
@@ -40,6 +40,5 @@ public override void Configure(WolverineOptions opts)
b => b.MigrationsAssembly("MultiTenantedEfCoreWithSqlServer"));
}, AutoCreate.CreateOrUpdate);
- opts.Services.AddResourceSetupOnStartup();
}
}
\ No newline at end of file
diff --git a/src/Persistence/EfCoreTests/MultiTenancy/multi_tenancy_with_shared_database_between_tenants_sql_server.cs b/src/Persistence/EfCoreTests.MultiTenancy/multi_tenancy_with_shared_database_between_tenants_sql_server.cs
similarity index 97%
rename from src/Persistence/EfCoreTests/MultiTenancy/multi_tenancy_with_shared_database_between_tenants_sql_server.cs
rename to src/Persistence/EfCoreTests.MultiTenancy/multi_tenancy_with_shared_database_between_tenants_sql_server.cs
index 6050d5567..7c83f1f6b 100644
--- a/src/Persistence/EfCoreTests/MultiTenancy/multi_tenancy_with_shared_database_between_tenants_sql_server.cs
+++ b/src/Persistence/EfCoreTests.MultiTenancy/multi_tenancy_with_shared_database_between_tenants_sql_server.cs
@@ -43,8 +43,6 @@ public override void Configure(WolverineOptions opts)
builder.UseSqlServer(connectionString.Value, b => b.MigrationsAssembly("MultiTenantedEfCoreWithSqlServer"));
}, AutoCreate.CreateOrUpdate);
- opts.Services.AddResourceSetupOnStartup();
-
}
[Fact]
diff --git a/src/Persistence/EfCoreTests/MultiTenancy/multi_tenancy_with_static_tenants_and_connection_strings_for_postgresql.cs b/src/Persistence/EfCoreTests.MultiTenancy/multi_tenancy_with_static_tenants_and_connection_strings_for_postgresql.cs
similarity index 98%
rename from src/Persistence/EfCoreTests/MultiTenancy/multi_tenancy_with_static_tenants_and_connection_strings_for_postgresql.cs
rename to src/Persistence/EfCoreTests.MultiTenancy/multi_tenancy_with_static_tenants_and_connection_strings_for_postgresql.cs
index 363d04c69..1cd6c5394 100644
--- a/src/Persistence/EfCoreTests/MultiTenancy/multi_tenancy_with_static_tenants_and_connection_strings_for_postgresql.cs
+++ b/src/Persistence/EfCoreTests.MultiTenancy/multi_tenancy_with_static_tenants_and_connection_strings_for_postgresql.cs
@@ -46,9 +46,8 @@ public override void Configure(WolverineOptions opts)
builder.UseNpgsql(connectionString.Value, b => b.MigrationsAssembly("MultiTenantedEfCoreWithPostgreSQL"));
}, AutoCreate.CreateOrUpdate);
- opts.Services.AddResourceSetupOnStartup();
}
-
+
[Fact]
public async Task opens_the_db_context_to_the_correct_database_1()
{
diff --git a/src/Persistence/EfCoreTests/MultiTenancy/multi_tenancy_with_static_tenants_and_connection_strings_for_sqlserver.cs b/src/Persistence/EfCoreTests.MultiTenancy/multi_tenancy_with_static_tenants_and_connection_strings_for_sqlserver.cs
similarity index 98%
rename from src/Persistence/EfCoreTests/MultiTenancy/multi_tenancy_with_static_tenants_and_connection_strings_for_sqlserver.cs
rename to src/Persistence/EfCoreTests.MultiTenancy/multi_tenancy_with_static_tenants_and_connection_strings_for_sqlserver.cs
index 6f0e64e89..65c40cd3b 100644
--- a/src/Persistence/EfCoreTests/MultiTenancy/multi_tenancy_with_static_tenants_and_connection_strings_for_sqlserver.cs
+++ b/src/Persistence/EfCoreTests.MultiTenancy/multi_tenancy_with_static_tenants_and_connection_strings_for_sqlserver.cs
@@ -44,9 +44,6 @@ public override void Configure(WolverineOptions opts)
builder.UseSqlServer(connectionString.Value, b => b.MigrationsAssembly("MultiTenantedEfCoreWithSqlServer"));
}, AutoCreate.CreateOrUpdate);
- opts.Services.AddResourceSetupOnStartup();
-
-
}
[Fact]
diff --git a/src/Persistence/EfCoreTests/MultiTenancy/multi_tenancy_with_static_tenants_and_data_sources_for_postgresql.cs b/src/Persistence/EfCoreTests.MultiTenancy/multi_tenancy_with_static_tenants_and_data_sources_for_postgresql.cs
similarity index 98%
rename from src/Persistence/EfCoreTests/MultiTenancy/multi_tenancy_with_static_tenants_and_data_sources_for_postgresql.cs
rename to src/Persistence/EfCoreTests.MultiTenancy/multi_tenancy_with_static_tenants_and_data_sources_for_postgresql.cs
index 32c768293..70706cdb6 100644
--- a/src/Persistence/EfCoreTests/MultiTenancy/multi_tenancy_with_static_tenants_and_data_sources_for_postgresql.cs
+++ b/src/Persistence/EfCoreTests.MultiTenancy/multi_tenancy_with_static_tenants_and_data_sources_for_postgresql.cs
@@ -45,9 +45,8 @@ public override void Configure(WolverineOptions opts)
builder.UseNpgsql((DbDataSource)dataSource, b => b.MigrationsAssembly("MultiTenantedEfCoreWithPostgreSQL"));
}, AutoCreate.CreateOrUpdate);
- opts.Services.AddResourceSetupOnStartup();
}
-
+
[Fact]
public async Task opens_the_db_context_to_the_correct_database_1()
{
diff --git a/src/Persistence/EfCoreTests/Bug_252_codegen_issue.cs b/src/Persistence/EfCoreTests/Bug_252_codegen_issue.cs
index 5d4e934cf..8995721c8 100644
--- a/src/Persistence/EfCoreTests/Bug_252_codegen_issue.cs
+++ b/src/Persistence/EfCoreTests/Bug_252_codegen_issue.cs
@@ -1,4 +1,3 @@
-using EfCoreTests.MultiTenancy;
using IntegrationTests;
using JasperFx;
using JasperFx.Core;
@@ -65,10 +64,6 @@ public async Task use_the_saga_type_to_determine_the_correct_DbContext_type()
var migration = await SchemaMigration.DetermineAsync(conn, table);
await new SqlServerMigrator().ApplyAllAsync(conn, migration, AutoCreate.All);
- using var scope = host.Services.CreateScope();
- var dbContext = scope.ServiceProvider.GetRequiredService();
- await dbContext.Database.EnsureCreatedAsync();
-
await conn.CloseAsync();
await host.InvokeMessageAndWaitAsync(new OrderCreated(Guid.NewGuid()));
@@ -108,10 +103,6 @@ public async Task bug_256_message_bus_should_be_in_outbox_transaction()
await conn.CloseAsync();
- using var scope = host.Services.CreateScope();
- var dbContext = scope.ServiceProvider.GetRequiredService();
- await dbContext.Database.EnsureCreatedAsync();
-
var chain = host.Services.GetRequiredService().HandlerFor().As().Chain;
var lines = chain.SourceCode.ReadLines();
diff --git a/src/Persistence/EfCoreTests/Bug_661_postgresql_with_ef_core.cs b/src/Persistence/EfCoreTests/Bug_661_postgresql_with_ef_core.cs
index 7dfd27950..21cd44bc8 100644
--- a/src/Persistence/EfCoreTests/Bug_661_postgresql_with_ef_core.cs
+++ b/src/Persistence/EfCoreTests/Bug_661_postgresql_with_ef_core.cs
@@ -1,4 +1,3 @@
-using EfCoreTests.MultiTenancy;
using IntegrationTests;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
diff --git a/src/Persistence/EfCoreTests/Bugs/Bug_1846_duplicate_execution_of_scheduled_jobs.cs b/src/Persistence/EfCoreTests/Bugs/Bug_1846_duplicate_execution_of_scheduled_jobs.cs
index 3872fbbd0..e8340bd4c 100644
--- a/src/Persistence/EfCoreTests/Bugs/Bug_1846_duplicate_execution_of_scheduled_jobs.cs
+++ b/src/Persistence/EfCoreTests/Bugs/Bug_1846_duplicate_execution_of_scheduled_jobs.cs
@@ -13,6 +13,7 @@
namespace EfCoreTests.Bugs;
+[Collection("sqlserver")]
public class Bug_1846_duplicate_execution_of_scheduled_jobs
{
private readonly ITestOutputHelper _output;
diff --git a/src/Persistence/EfCoreTests/Bugs/Bug_2075_separated_behavior_and_scheduled_messages.cs b/src/Persistence/EfCoreTests/Bugs/Bug_2075_separated_behavior_and_scheduled_messages.cs
index 1c0fcb655..0b1d9b42e 100644
--- a/src/Persistence/EfCoreTests/Bugs/Bug_2075_separated_behavior_and_scheduled_messages.cs
+++ b/src/Persistence/EfCoreTests/Bugs/Bug_2075_separated_behavior_and_scheduled_messages.cs
@@ -16,6 +16,7 @@
namespace EfCoreTests.Bugs;
+[Collection("postgresql")]
public class Bug_2075_separated_behavior_and_scheduled_messages(ITestOutputHelper Output)
{
[Fact]
diff --git a/src/Persistence/EfCoreTests/DomainEvents/configuration_of_domain_events_scrapers.cs b/src/Persistence/EfCoreTests/DomainEvents/configuration_of_domain_events_scrapers.cs
index a1dd37887..c94024108 100644
--- a/src/Persistence/EfCoreTests/DomainEvents/configuration_of_domain_events_scrapers.cs
+++ b/src/Persistence/EfCoreTests/DomainEvents/configuration_of_domain_events_scrapers.cs
@@ -4,6 +4,7 @@
using JasperFx.Resources;
using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore;
+using Npgsql;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using SharedPersistenceModels.Items;
@@ -40,6 +41,7 @@ public async ValueTask DisposeAsync()
{
await theHost.StopAsync();
theHost.Dispose();
+ NpgsqlConnection.ClearAllPools();
}
public async Task startHostAsync(Action configure)
diff --git a/src/Persistence/EfCoreTests/EfCoreCompilationScenarios.cs b/src/Persistence/EfCoreTests/EfCoreCompilationScenarios.cs
index 60abd5b08..d575e6c09 100644
--- a/src/Persistence/EfCoreTests/EfCoreCompilationScenarios.cs
+++ b/src/Persistence/EfCoreTests/EfCoreCompilationScenarios.cs
@@ -1,4 +1,3 @@
-using EfCoreTests.MultiTenancy;
using JasperFx.CodeGeneration;
using Microsoft.Extensions.DependencyInjection;
using NSubstitute;
diff --git a/src/Persistence/EfCoreTests/EfCoreTests.csproj b/src/Persistence/EfCoreTests/EfCoreTests.csproj
index d0fa35e94..25a72c0f0 100644
--- a/src/Persistence/EfCoreTests/EfCoreTests.csproj
+++ b/src/Persistence/EfCoreTests/EfCoreTests.csproj
@@ -19,6 +19,7 @@
runtime; build; native; contentfiles; analyzers; buildtransitiveall
+
@@ -32,17 +33,12 @@
Servers.cs
-
-
-
-
-
diff --git a/src/Persistence/EfCoreTests/Migrations/with_one_postgresql_context.cs b/src/Persistence/EfCoreTests/Migrations/with_one_postgresql_context.cs
index dbc1f69d1..ca97e85f3 100644
--- a/src/Persistence/EfCoreTests/Migrations/with_one_postgresql_context.cs
+++ b/src/Persistence/EfCoreTests/Migrations/with_one_postgresql_context.cs
@@ -53,6 +53,8 @@ public async Task InitializeAsync()
public async Task DisposeAsync()
{
await _host.StopAsync();
+ _host.Dispose();
+ NpgsqlConnection.ClearAllPools();
}
[Fact]
diff --git a/src/Persistence/EfCoreTests/Migrations/with_one_sqlserver_context.cs b/src/Persistence/EfCoreTests/Migrations/with_one_sqlserver_context.cs
index 05f33749f..35c82022b 100644
--- a/src/Persistence/EfCoreTests/Migrations/with_one_sqlserver_context.cs
+++ b/src/Persistence/EfCoreTests/Migrations/with_one_sqlserver_context.cs
@@ -52,6 +52,8 @@ public async Task InitializeAsync()
public async Task DisposeAsync()
{
await _host.StopAsync();
+ _host.Dispose();
+ SqlConnection.ClearAllPools();
}
[Fact]
diff --git a/src/Persistence/EfCoreTests/Optimistic_concurrency_with_ef_core.cs b/src/Persistence/EfCoreTests/Optimistic_concurrency_with_ef_core.cs
index dc1264767..b985ddce2 100644
--- a/src/Persistence/EfCoreTests/Optimistic_concurrency_with_ef_core.cs
+++ b/src/Persistence/EfCoreTests/Optimistic_concurrency_with_ef_core.cs
@@ -1,4 +1,3 @@
-using EfCoreTests.MultiTenancy;
using IntegrationTests;
using JasperFx;
using JasperFx.Core;
@@ -62,11 +61,10 @@ public async Task detect_concurrency_exception_as_SagaConcurrencyException()
var migration = await SchemaMigration.DetermineAsync(conn, table);
await new SqlServerMigrator().ApplyAllAsync(conn, migration, AutoCreate.All);
+ await conn.CloseAsync();
+
using var scope = host.Services.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService();
- await dbContext.Database.EnsureCreatedAsync();
-
- await conn.CloseAsync();
await dbContext.ConcurrencyTestSagas.AddAsync(new()
{
diff --git a/src/Persistence/EfCoreTests/end_to_end_efcore_persistence.cs b/src/Persistence/EfCoreTests/end_to_end_efcore_persistence.cs
index b909c0118..2543da4eb 100644
--- a/src/Persistence/EfCoreTests/end_to_end_efcore_persistence.cs
+++ b/src/Persistence/EfCoreTests/end_to_end_efcore_persistence.cs
@@ -1,4 +1,3 @@
-using EfCoreTests.MultiTenancy;
using IntegrationTests;
using JasperFx;
using JasperFx.Core.Reflection;
@@ -200,8 +199,6 @@ public async Task persist_an_outgoing_envelope_raw()
var messaging = nested.ServiceProvider.GetRequiredService>()
.ShouldBeOfType>();
- await messaging.DbContext.Database.EnsureCreatedAsync();
-
await messaging.Transaction.PersistOutgoingAsync(envelope);
messaging.DbContext.Items.Add(new Item { Id = Guid.NewGuid(), Name = Guid.NewGuid().ToString() });
@@ -245,8 +242,6 @@ public async Task persist_an_outgoing_envelope_mapped()
var messaging = nested.ServiceProvider.GetRequiredService>()
.ShouldBeOfType>();
- await messaging.DbContext.Database.EnsureCreatedAsync();
-
await messaging.Transaction.PersistOutgoingAsync(envelope);
messaging.DbContext.Items.Add(new Item { Id = Guid.NewGuid(), Name = Guid.NewGuid().ToString() });
diff --git a/src/Persistence/EfCoreTests/idempotency_with_inline_or_buffered_endpoints_end_to_end.cs b/src/Persistence/EfCoreTests/idempotency_with_inline_or_buffered_endpoints_end_to_end.cs
index 80eebba37..d4429cbb4 100644
--- a/src/Persistence/EfCoreTests/idempotency_with_inline_or_buffered_endpoints_end_to_end.cs
+++ b/src/Persistence/EfCoreTests/idempotency_with_inline_or_buffered_endpoints_end_to_end.cs
@@ -3,7 +3,6 @@
using JasperFx.CodeGeneration.Frames;
using JasperFx.Core;
using JasperFx.Resources;
-using Marten;
using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
@@ -15,7 +14,6 @@
using Weasel.SqlServer.Tables;
using Wolverine;
using Wolverine.EntityFrameworkCore;
-using Wolverine.Marten;
using Wolverine.Persistence;
using Wolverine.Runtime;
using Wolverine.SqlServer;
@@ -52,6 +50,7 @@ private static async Task buildSqlServer()
public Task DisposeAsync()
{
+ SqlConnection.ClearAllPools();
return Task.CompletedTask;
}
diff --git a/src/Persistence/EfCoreTests/persisting_envelopes_with_sqlserver.cs b/src/Persistence/EfCoreTests/persisting_envelopes_with_sqlserver.cs
index 32ec50b6e..763514195 100644
--- a/src/Persistence/EfCoreTests/persisting_envelopes_with_sqlserver.cs
+++ b/src/Persistence/EfCoreTests/persisting_envelopes_with_sqlserver.cs
@@ -11,6 +11,8 @@
using Wolverine.Persistence;
using Wolverine.Persistence.Durability;
using Wolverine.Runtime;
+using Microsoft.Data.SqlClient;
+using Npgsql;
using Wolverine.SqlServer;
using Wolverine.Tracking;
@@ -93,6 +95,8 @@ public async Task DisposeAsync()
{
await _host.StopAsync();
_host.Dispose();
+ SqlConnection.ClearAllPools();
+ NpgsqlConnection.ClearAllPools();
}
[Fact]
diff --git a/src/Persistence/EfCoreTests/using_add_dbcontext_with_wolverine_integration.cs b/src/Persistence/EfCoreTests/using_add_dbcontext_with_wolverine_integration.cs
index 77f9af6f6..7c323503c 100644
--- a/src/Persistence/EfCoreTests/using_add_dbcontext_with_wolverine_integration.cs
+++ b/src/Persistence/EfCoreTests/using_add_dbcontext_with_wolverine_integration.cs
@@ -31,6 +31,8 @@ public class using_add_dbcontext_with_wolverine_integration : IAsyncLifetime
public async Task InitializeAsync()
{
+ SqlConnection.ClearAllPools();
+
_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
@@ -49,6 +51,7 @@ public async Task DisposeAsync()
{
await _host.StopAsync();
_host.Dispose();
+ SqlConnection.ClearAllPools();
}
public Table ItemsTable { get; }
diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/Durability/CosmosDbDurabilityAgent.cs b/src/Persistence/Wolverine.CosmosDb/Internals/Durability/CosmosDbDurabilityAgent.cs
index f42dd7adb..344ce7bcb 100644
--- a/src/Persistence/Wolverine.CosmosDb/Internals/Durability/CosmosDbDurabilityAgent.cs
+++ b/src/Persistence/Wolverine.CosmosDb/Internals/Durability/CosmosDbDurabilityAgent.cs
@@ -51,7 +51,7 @@ public Task StartAsync(CancellationToken cancellationToken)
internal void StartTimers()
{
- _metrics = new PersistenceMetrics(_runtime.Meter, _settings, null);
+ _metrics = new PersistenceMetrics(_runtime, _settings, null);
if (_settings.DurabilityMetricsEnabled)
{
diff --git a/src/Persistence/Wolverine.EntityFrameworkCore/AssemblyAttributes.cs b/src/Persistence/Wolverine.EntityFrameworkCore/AssemblyAttributes.cs
index 27c35eb3a..e30dfe826 100644
--- a/src/Persistence/Wolverine.EntityFrameworkCore/AssemblyAttributes.cs
+++ b/src/Persistence/Wolverine.EntityFrameworkCore/AssemblyAttributes.cs
@@ -1,4 +1,5 @@
using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("PersistenceTests")]
-[assembly: InternalsVisibleTo("EfCoreTests")]
\ No newline at end of file
+[assembly: InternalsVisibleTo("EfCoreTests")]
+[assembly: InternalsVisibleTo("EfCoreTests.MultiTenancy")]
\ No newline at end of file
diff --git a/src/Persistence/Wolverine.EntityFrameworkCore/DbContextExtensions.cs b/src/Persistence/Wolverine.EntityFrameworkCore/DbContextExtensions.cs
new file mode 100644
index 000000000..bbe2300b8
--- /dev/null
+++ b/src/Persistence/Wolverine.EntityFrameworkCore/DbContextExtensions.cs
@@ -0,0 +1,61 @@
+using System.Data.Common;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Infrastructure;
+using Microsoft.EntityFrameworkCore.Storage;
+using Weasel.EntityFrameworkCore;
+
+namespace Wolverine.EntityFrameworkCore;
+
+public static class WolverineDbContextExtensions
+{
+ ///
+ /// Ensures the database referenced by the DbContext's connection exists, creating it if necessary.
+ /// TODO: Move this method to Weasel.EntityFrameworkCore.DbContextExtensions in a future Weasel release.
+ ///
+ public static async Task EnsureDatabaseExistsAsync(
+ this IServiceProvider services,
+ DbContext context,
+ CancellationToken ct = default)
+ {
+ var (conn, migrator) = services.FindMigratorForDbContext(context);
+
+ // When credentials are available in the connection string, use the standard Weasel path.
+ // This is the common case for connection-string-based configurations.
+ if (HasCredentials(conn.ConnectionString))
+ {
+ await migrator!.EnsureDatabaseExistsAsync(conn, ct).ConfigureAwait(false);
+ return;
+ }
+
+ // When using a DbDataSource (e.g., UseNpgsql(dataSource)), the connection's
+ // ConnectionString may strip credentials (Npgsql does this for security).
+ // Try the DbContext's configured connection string which may preserve them.
+ var dbContextConnStr = context.Database.GetConnectionString();
+ if (dbContextConnStr != null && HasCredentials(dbContextConnStr))
+ {
+ await using var credConn = (DbConnection)Activator.CreateInstance(conn.GetType(), dbContextConnStr)!;
+ await migrator!.EnsureDatabaseExistsAsync(credConn, ct).ConfigureAwait(false);
+ return;
+ }
+
+ // Last resort for DataSource-based connections (e.g., Marten multi-tenancy with
+ // NpgsqlDataSource): the data source manages authentication internally, so credentials
+ // aren't exposed in the connection string. Use EF Core's IRelationalDatabaseCreator
+ // which has internal access to the data source and can create databases.
+ var creator = context.Database.GetService();
+ if (!await creator.ExistsAsync(ct).ConfigureAwait(false))
+ {
+ await creator.CreateAsync(ct).ConfigureAwait(false);
+ }
+ }
+
+ private static bool HasCredentials(string? connectionString)
+ {
+ if (string.IsNullOrEmpty(connectionString)) return false;
+
+ return connectionString.Contains("Password", StringComparison.OrdinalIgnoreCase)
+ || connectionString.Contains("Pwd", StringComparison.OrdinalIgnoreCase)
+ || connectionString.Contains("Integrated Security", StringComparison.OrdinalIgnoreCase)
+ || connectionString.Contains("Trusted_Connection", StringComparison.OrdinalIgnoreCase);
+ }
+}
diff --git a/src/Persistence/Wolverine.EntityFrameworkCore/Internals/Migrations/EntityFrameworkCoreSystemPart.cs b/src/Persistence/Wolverine.EntityFrameworkCore/Internals/Migrations/EntityFrameworkCoreSystemPart.cs
index ff19ad75c..c2bb80616 100644
--- a/src/Persistence/Wolverine.EntityFrameworkCore/Internals/Migrations/EntityFrameworkCoreSystemPart.cs
+++ b/src/Persistence/Wolverine.EntityFrameworkCore/Internals/Migrations/EntityFrameworkCoreSystemPart.cs
@@ -53,10 +53,30 @@ public async Task WriteToConsole()
public async ValueTask> FindResources()
{
- var databases = await BuildDatabases();
- var resources = databases.Select(x => new DatabaseResource(x, new Uri("efcore://"))).ToList();
+ // Only create DatabaseResource instances for non-tenanted DbContexts.
+ // Tenanted DbContexts are managed by TenantedDbContextInitializer which handles
+ // both database creation and schema migration via IResourceCreator.
+ var dbContextTypes = _container
+ .FindMatchingServices(type => type.CanBeCastTo())
+ .Where(x => !x.IsKeyedService)
+ .Select(x => x.ServiceType)
+ .ToArray();
+
+ var list = new List();
+ using var scope = _container.GetInstance().CreateScope();
- return resources;
+ foreach (var dbContextType in dbContextTypes)
+ {
+ var matching = _sources.FirstOrDefault(x => x.DbContextType == dbContextType);
+ if (matching == null)
+ {
+ var context = (DbContext)scope.ServiceProvider.GetRequiredService(dbContextType);
+ var database = _container.Services.CreateDatabase(context, dbContextType.FullNameInCode());
+ list.Add(new DatabaseResource(database, SubjectUri));
+ }
+ }
+
+ return list;
}
public async Task AssertEnvironmentAsync(IServiceProvider services, EnvironmentCheckResults results, CancellationToken token)
@@ -110,7 +130,7 @@ public async ValueTask> BuildDatabases()
var contexts = await matching.FindAllAsync();
foreach (var dbContext in contexts)
{
- await dbContext.Database.EnsureCreatedAsync();
+ await scope.ServiceProvider.EnsureDatabaseExistsAsync(dbContext);
var database = _container.Services.CreateDatabase(dbContext, dbContextType.FullNameInCode());
list.Add(database);
}
diff --git a/src/Persistence/Wolverine.EntityFrameworkCore/Internals/TenantedDbContextBuilderByConnectionString.cs b/src/Persistence/Wolverine.EntityFrameworkCore/Internals/TenantedDbContextBuilderByConnectionString.cs
index 2a0f80822..8c9f1f4a8 100644
--- a/src/Persistence/Wolverine.EntityFrameworkCore/Internals/TenantedDbContextBuilderByConnectionString.cs
+++ b/src/Persistence/Wolverine.EntityFrameworkCore/Internals/TenantedDbContextBuilderByConnectionString.cs
@@ -8,6 +8,7 @@
using JasperFx.MultiTenancy;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
+using Weasel.Core;
using Weasel.EntityFrameworkCore;
using Wolverine.EntityFrameworkCore.Internals.Migrations;
using Wolverine.Persistence.Durability;
@@ -112,7 +113,9 @@ public async Task EnsureAllTenantDatabasesCreatedAsync()
foreach (var assignment in _store.Source.AllActiveByTenant())
{
var dbContext = await BuildAsync(assignment.TenantId, CancellationToken.None);
- await dbContext.Database.EnsureCreatedAsync();
+ await _serviceProvider.EnsureDatabaseExistsAsync(dbContext);
+ await using var migration = await _serviceProvider.CreateMigrationAsync(dbContext, CancellationToken.None);
+ await migration.ExecuteAsync(AutoCreate.CreateOrUpdate, CancellationToken.None);
}
}
@@ -122,9 +125,9 @@ public async Task ApplyAllChangesToDatabasesAsync()
foreach (var context in contexts)
{
- await context.Database.EnsureCreatedAsync();
+ await _serviceProvider.EnsureDatabaseExistsAsync(context);
await using var migration = await _serviceProvider.CreateMigrationAsync(context, CancellationToken.None);
-
+
// TODO -- add some logging here!
await migration.ExecuteAsync(AutoCreate.CreateOrUpdate, CancellationToken.None);
}
@@ -136,8 +139,9 @@ public async Task EnsureAllDatabasesAreCreatedAsync()
foreach (var context in contexts)
{
- // TODO -- let's put some debug logging here!!!!
- await context.Database.EnsureCreatedAsync();
+ // Only ensure the database catalog exists here. Table creation/migration
+ // is handled by ApplyAllChangesToDatabasesAsync() via Setup().
+ await _serviceProvider.EnsureDatabaseExistsAsync(context);
}
}
diff --git a/src/Persistence/Wolverine.EntityFrameworkCore/Internals/TenantedDbContextBuilderByDbDataSource.cs b/src/Persistence/Wolverine.EntityFrameworkCore/Internals/TenantedDbContextBuilderByDbDataSource.cs
index fedc89564..8bd3007f7 100644
--- a/src/Persistence/Wolverine.EntityFrameworkCore/Internals/TenantedDbContextBuilderByDbDataSource.cs
+++ b/src/Persistence/Wolverine.EntityFrameworkCore/Internals/TenantedDbContextBuilderByDbDataSource.cs
@@ -8,8 +8,8 @@
using JasperFx.MultiTenancy;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
-using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.Extensions.DependencyInjection;
+using Weasel.Core;
using Weasel.EntityFrameworkCore;
using Wolverine.Persistence.Durability;
using Wolverine.RDBMS;
@@ -84,42 +84,69 @@ public ValueTask BuildAsync(CancellationToken cancellationToken)
public async Task ApplyAllChangesToDatabasesAsync()
{
+ // For data source connections, ensure all tenant databases exist FIRST before
+ // building DbContexts. NpgsqlDataSource strips credentials from its ConnectionString
+ // property, so Weasel's EnsureDatabaseExistsAsync cannot create admin connections.
+ // We use the admin data source directly instead.
+ await ensureAllTenantDatabasesExistAsync();
+
var contexts = await BuildAllAsync();
foreach (var context in contexts)
{
- var pending = (await context.Database.GetPendingMigrationsAsync()).ToArray();
- var applied = (await context.Database.GetAppliedMigrationsAsync()).ToArray();
-
- if (pending.All(x => applied.Contains(x))) return;
-
- var migrator = context.Database.GetInfrastructure().GetRequiredService();
- await migrator.MigrateAsync();
+ await using var migration = await _serviceProvider.CreateMigrationAsync(context, CancellationToken.None);
+ await migration.ExecuteAsync(AutoCreate.CreateOrUpdate, CancellationToken.None);
}
}
public async Task EnsureAllDatabasesAreCreatedAsync()
{
- var list = new List();
- list.Add((T)BuildForMain());
+ // For data source connections, ensure all tenant databases exist FIRST before
+ // building DbContexts. NpgsqlDataSource strips credentials from its ConnectionString
+ // property, so Weasel's EnsureDatabaseExistsAsync cannot create admin connections.
+ await ensureAllTenantDatabasesExistAsync();
+ }
+
+ private async Task ensureAllTenantDatabasesExistAsync()
+ {
+ var adminDataSource = _store.Main.As().Settings.DataSource;
+ if (adminDataSource == null) return;
await _store.Source.RefreshLiteAsync();
-
+
+ var databaseNames = new HashSet(StringComparer.OrdinalIgnoreCase);
foreach (var assignment in _store.Source.AllActiveByTenant())
{
- var dbContext = await BuildAsync(assignment.TenantId, CancellationToken.None);
- list.Add(dbContext);
+ var settings = assignment.Value.As().Settings;
+ var connStr = settings.ConnectionString ?? settings.DataSource?.ConnectionString;
+ if (string.IsNullOrEmpty(connStr)) continue;
+
+ var builder = new DbConnectionStringBuilder { ConnectionString = connStr };
+ if (builder.TryGetValue("Database", out var dbObj) && dbObj is string dbName && !string.IsNullOrEmpty(dbName))
+ {
+ databaseNames.Add(dbName);
+ }
}
- // Filter out duplicates when multiple tenants address the same database
- var contexts = list.GroupBy(x => x.Database.GetConnectionString()).Select(x => x.First()).ToList();
+ if (databaseNames.Count == 0) return;
- foreach (var context in contexts)
+ await using var adminConn = adminDataSource.CreateConnection();
+ await adminConn.OpenAsync();
+
+ foreach (var dbName in databaseNames)
{
- await context.Database.EnsureCreatedAsync();
- await using var migration = await _serviceProvider.CreateMigrationAsync(context, CancellationToken.None);
- await migration.ExecuteAsync(AutoCreate.CreateOrUpdate, CancellationToken.None);
- // TODO -- let's put some debug logging here!!!!
+ await using var checkCmd = adminConn.CreateCommand();
+ checkCmd.CommandText = "SELECT 1 FROM pg_database WHERE datname = @dbname";
+ var param = checkCmd.CreateParameter();
+ param.ParameterName = "dbname";
+ param.Value = dbName;
+ checkCmd.Parameters.Add(param);
+
+ if (await checkCmd.ExecuteScalarAsync() != null) continue;
+
+ await using var createCmd = adminConn.CreateCommand();
+ createCmd.CommandText = $"CREATE DATABASE \"{dbName}\"";
+ await createCmd.ExecuteNonQueryAsync();
}
}
@@ -195,7 +222,9 @@ public async Task EnsureAllTenantDatabasesCreatedAsync()
foreach (var assignment in _store.Source.AllActiveByTenant())
{
var dbContext = await BuildAsync(assignment.TenantId, CancellationToken.None);
- await dbContext.Database.EnsureCreatedAsync();
+ await _serviceProvider.EnsureDatabaseExistsAsync(dbContext);
+ await using var migration = await _serviceProvider.CreateMigrationAsync(dbContext, CancellationToken.None);
+ await migration.ExecuteAsync(AutoCreate.CreateOrUpdate, CancellationToken.None);
}
}
diff --git a/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs b/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs
index dbcb0d29f..75f2b3934 100644
--- a/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs
+++ b/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs
@@ -180,29 +180,13 @@ public override async Task FetchCountsAsync()
{
var counts = new PersistedCounts();
- await using (var reader = await CreateCommand( $"select status, count(*) from {SchemaName}.{DatabaseConstants.IncomingTable} group by status")
- .ExecuteReaderAsync())
+ if (Durability.EnableInboxPartitioning)
{
- while (await reader.ReadAsync())
- {
- var status = Enum.Parse(await reader.GetFieldValueAsync(0));
- var count = await reader.GetFieldValueAsync(1);
-
- if (status == EnvelopeStatus.Incoming)
- {
- counts.Incoming = count;
- }
- else if (status == EnvelopeStatus.Handled)
- {
- counts.Handled = count;
- }
- else if (status == EnvelopeStatus.Scheduled)
- {
- counts.Scheduled = count;
- }
- }
-
- await reader.CloseAsync();
+ await fetchCountsWithPartitionEstimates(counts);
+ }
+ else
+ {
+ await fetchCountsWithGroupBy(counts);
}
var longCount = await CreateCommand($"select count(*) from {SchemaName}.{DatabaseConstants.OutgoingTable}")
@@ -218,6 +202,102 @@ public override async Task FetchCountsAsync()
return counts;
}
+ private async Task fetchCountsWithGroupBy(PersistedCounts counts)
+ {
+ await using var reader = await CreateCommand(
+ $"select status, count(*) from {SchemaName}.{DatabaseConstants.IncomingTable} group by status")
+ .ExecuteReaderAsync();
+
+ while (await reader.ReadAsync())
+ {
+ var status = Enum.Parse(await reader.GetFieldValueAsync(0));
+ var count = await reader.GetFieldValueAsync(1);
+
+ if (status == EnvelopeStatus.Incoming)
+ {
+ counts.Incoming = count;
+ }
+ else if (status == EnvelopeStatus.Handled)
+ {
+ counts.Handled = count;
+ }
+ else if (status == EnvelopeStatus.Scheduled)
+ {
+ counts.Scheduled = count;
+ }
+ }
+
+ await reader.CloseAsync();
+ }
+
+ private async Task fetchCountsWithPartitionEstimates(PersistedCounts counts)
+ {
+ // Use pg_class reltuples to estimate row counts per partition.
+ // This is the "Safe and Explicit" approach from
+ // https://stackoverflow.com/questions/7943233: handles never-vacuumed
+ // tables (reltuples < 0) and empty tables (relpages = 0), then
+ // scales the estimate by the current relation size.
+ // If any partition has data (pg_relation_size > 0) but stale stats
+ // (reltuples <= 0), we fall back to exact GROUP BY count.
+ var sql = $@"
+select p.partition_name, c.reltuples,
+ pg_catalog.pg_relation_size(c.oid) as relation_size,
+ (case when c.reltuples < 0 then 0
+ when c.relpages = 0 then 0
+ else (c.reltuples / c.relpages)
+ * (pg_catalog.pg_relation_size(c.oid)
+ / pg_catalog.current_setting('block_size')::int)
+ end)::bigint as estimated_count
+from pg_catalog.pg_class c
+join (values
+ ('{DatabaseConstants.IncomingTable}_incoming', 'Incoming'),
+ ('{DatabaseConstants.IncomingTable}_scheduled', 'Scheduled'),
+ ('{DatabaseConstants.IncomingTable}_handled', 'Handled')
+) as p(relname, partition_name) on c.relname = p.relname
+join pg_catalog.pg_namespace n on n.oid = c.relnamespace and n.nspname = '{SchemaName}';";
+
+ var needsFallback = false;
+
+ await using (var reader = await CreateCommand(sql).ExecuteReaderAsync())
+ {
+ while (await reader.ReadAsync())
+ {
+ var partitionName = await reader.GetFieldValueAsync(0);
+ var reltuples = await reader.GetFieldValueAsync(1);
+ var relationSize = await reader.GetFieldValueAsync(2);
+ var estimate = await reader.GetFieldValueAsync(3);
+
+ // If the partition has physical data but reltuples hasn't been
+ // updated by VACUUM/ANALYZE, the estimate will be wrong.
+ if (reltuples <= 0 && relationSize > 0)
+ {
+ needsFallback = true;
+ break;
+ }
+
+ switch (partitionName)
+ {
+ case "Incoming":
+ counts.Incoming = (int)estimate;
+ break;
+ case "Scheduled":
+ counts.Scheduled = (int)estimate;
+ break;
+ case "Handled":
+ counts.Handled = (int)estimate;
+ break;
+ }
+ }
+
+ await reader.CloseAsync();
+ }
+
+ if (needsFallback)
+ {
+ await fetchCountsWithGroupBy(counts);
+ }
+ }
+
public override async Task DiscardAndReassignOutgoingAsync(Envelope[] discards, Envelope[] reassigned, int nodeId)
{
await using var cmd = CreateCommand(_discardAndReassignOutgoingSql)
diff --git a/src/Persistence/Wolverine.Postgresql/PostgresqlTenantedMessageStore.cs b/src/Persistence/Wolverine.Postgresql/PostgresqlTenantedMessageStore.cs
index caa16c4a7..bc2c24f9b 100644
--- a/src/Persistence/Wolverine.Postgresql/PostgresqlTenantedMessageStore.cs
+++ b/src/Persistence/Wolverine.Postgresql/PostgresqlTenantedMessageStore.cs
@@ -147,7 +147,7 @@ public async Task RefreshAsync(bool withMigration)
var store = buildTenantStoreForDataSource(assignment.Value);
store.TenantIds.Fill(assignment.TenantId);
- if (_runtime.Options.AutoBuildMessageStorageOnStartup != AutoCreate.None)
+ if (withMigration && _runtime.Options.AutoBuildMessageStorageOnStartup != AutoCreate.None)
{
await store.Admin.MigrateAsync();
}
diff --git a/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs b/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs
index 4be891070..f87cf9cb2 100644
--- a/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs
+++ b/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs
@@ -67,7 +67,7 @@ public DurabilityAgent(IWolverineRuntime runtime, IMessageDatabase database)
public Task StartAsync(CancellationToken cancellationToken)
{
- _metrics = new PersistenceMetrics(_runtime.Meter, _settings, _database.Name);
+ _metrics = new PersistenceMetrics(_runtime, _settings, _database.Name);
if (_settings.DurabilityMetricsEnabled)
{
diff --git a/src/Persistence/Wolverine.RDBMS/MessageDatabase.cs b/src/Persistence/Wolverine.RDBMS/MessageDatabase.cs
index eb2d7b91a..339a216e5 100644
--- a/src/Persistence/Wolverine.RDBMS/MessageDatabase.cs
+++ b/src/Persistence/Wolverine.RDBMS/MessageDatabase.cs
@@ -14,7 +14,6 @@
using Wolverine.RDBMS.Transport;
using Wolverine.Runtime;
using Wolverine.Runtime.Agents;
-using Wolverine.Runtime.WorkerQueues;
using Wolverine.Transports;
using DbCommandBuilder = Weasel.Core.DbCommandBuilder;
diff --git a/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.cs b/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.cs
index 7651fc1ad..e39134d3c 100644
--- a/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.cs
+++ b/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.cs
@@ -53,7 +53,7 @@ public Task StartAsync(CancellationToken cancellationToken)
internal void StartTimers()
{
- _metrics = new PersistenceMetrics(_runtime.Meter, _settings, null);
+ _metrics = new PersistenceMetrics(_runtime, _settings, null);
if (_settings.DurabilityMetricsEnabled)
{
diff --git a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs
index a3ca7e3d0..29c1e0d92 100644
--- a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs
+++ b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs
@@ -330,20 +330,20 @@ public override ISchemaObject AddExternalMessageTable(ExternalMessageTable defin
protected override Task deleteMany(DbTransaction tx, Guid[] ids, DbObjectName tableName, string idColumnName)
{
- var builder = new CommandBuilder();
+ var builder = new BatchBuilder();
foreach (var id in ids)
{
+ builder.StartNewCommand();
builder.Append($"delete from {tableName.QualifiedName} where {idColumnName} = ");
builder.AppendParameter(id);
- builder.Append(";");
}
- var command = builder.Compile();
- command.Connection = (SqlConnection)tx.Connection;
- command.Transaction = (SqlTransaction)tx;
+ var batch = builder.Compile();
+ batch.Connection = (SqlConnection)tx.Connection;
+ batch.Transaction = (SqlTransaction)tx;
- return command.ExecuteNonQueryAsync();
+ return batch.ExecuteNonQueryAsync();
}
protected override Task TryAttainLockAsync(int lockId, SqlConnection connection, CancellationToken token)
diff --git a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerNodePersistence.cs b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerNodePersistence.cs
index dfe49f4bb..1a470bd34 100644
--- a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerNodePersistence.cs
+++ b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerNodePersistence.cs
@@ -1,4 +1,3 @@
-using System.Data;
using System.Data.Common;
using JasperFx.Core;
using Microsoft.Data.SqlClient;
@@ -119,36 +118,33 @@ public async Task> LoadAllNodesAsync(CancellationTo
public async Task PersistAgentRestrictionsAsync(IReadOnlyList restrictions,
CancellationToken cancellationToken)
{
- var builder = new CommandBuilder();
+ var builder = new BatchBuilder();
foreach (var restriction in restrictions)
{
- builder.Append($"delete from {_restrictionTable} where id = ");
- builder.AppendParameter(restriction.Id);
- builder.Append(";");
- if (restriction.Type != AgentRestrictionType.None)
+ builder.StartNewCommand();
+
+ if (restriction.Type == AgentRestrictionType.None)
+ {
+ builder.Append($"delete from {_restrictionTable} where id = ");
+ builder.AppendParameter(restriction.Id);
+ }
+ else
{
+ builder.Append($"delete from {_restrictionTable} where id = ");
+ builder.AppendParameter(restriction.Id);
+ builder.StartNewCommand();
builder.Append(
$"insert into {_restrictionTable} (id, uri, type, node) values (");
- var parameters = builder.AppendWithParameters("?, ?, ?, ?");
- parameters[0].Value = restriction.Id;
- parameters[0].SqlDbType = SqlDbType.UniqueIdentifier;
- parameters[1].Value = restriction.AgentUri.ToString();
- parameters[1].SqlDbType = SqlDbType.VarChar;
- parameters[2].Value = restriction.Type.ToString();
- parameters[2].SqlDbType = SqlDbType.VarChar;
- parameters[3].Value = restriction.NodeNumber;
- parameters[3].SqlDbType = SqlDbType.Int;
-
- builder.Append(");");
+ builder.AppendParameters(restriction.Id, restriction.AgentUri.ToString(), restriction.Type.ToString(), restriction.NodeNumber);
+ builder.Append(")");
}
-
}
- var cmd = builder.Compile();
+ var batch = builder.Compile();
await using var conn = new SqlConnection(_settings.ConnectionString);
await conn.OpenAsync(cancellationToken);
- cmd.Connection = conn;
- await cmd.ExecuteNonQueryAsync(cancellationToken);
+ batch.Connection = conn;
+ await batch.ExecuteNonQueryAsync(cancellationToken);
await conn.CloseAsync();
}
@@ -292,19 +288,22 @@ public async Task AssignAgentsAsync(Guid nodeId, IReadOnlyList agents, Canc
await using var conn = new SqlConnection(_settings.ConnectionString);
await conn.OpenAsync(cancellationToken);
- var builder = new CommandBuilder();
- var nodeParameter = builder.AddNamedParameter("node", nodeId, SqlDbType.UniqueIdentifier);
+ var builder = new BatchBuilder();
foreach (var agent in agents)
{
- var parameter = builder.AddParameter(agent.ToString());
- builder.Append($"delete from {_assignmentTable} where id = @{parameter.ParameterName};insert into {_assignmentTable} (id, node_id) values (@{parameter.ParameterName}, @{nodeParameter.ParameterName});");
+ builder.StartNewCommand();
+ builder.Append($"delete from {_assignmentTable} where id = ");
+ builder.AppendParameter(agent.ToString());
+ builder.StartNewCommand();
+ builder.Append($"insert into {_assignmentTable} (id, node_id) values (");
+ builder.AppendParameters(agent.ToString(), nodeId);
+ builder.Append(")");
}
- var command = builder.Compile();
- command.Connection = conn;
- await command.ExecuteNonQueryAsync(cancellationToken);
-
+ var batch = builder.Compile();
+ batch.Connection = conn;
+ await batch.ExecuteNonQueryAsync(cancellationToken);
await conn.CloseAsync();
}
diff --git a/src/Testing/BackPressureTests/XUnitObserver.cs b/src/Testing/BackPressureTests/XUnitObserver.cs
index 87175c08f..aac5f01fe 100644
--- a/src/Testing/BackPressureTests/XUnitObserver.cs
+++ b/src/Testing/BackPressureTests/XUnitObserver.cs
@@ -1,5 +1,7 @@
using Wolverine.Configuration;
+using Wolverine.Logging;
using Wolverine.Runtime.Agents;
+using Wolverine.Runtime.Metrics;
using Wolverine.Runtime.Routing;
using Wolverine.Transports;
using Xunit.Abstractions;
@@ -81,4 +83,14 @@ public Task BackPressureLifted(Endpoint endpoint)
Lifted?.TrySetResult();
return Task.CompletedTask;
}
+
+ public void PersistedCounts(Uri storeUri, PersistedCounts counts)
+ {
+ // Nothing here...
+ }
+
+ public void MessageHandlingMetricsExported(MessageHandlingMetrics metrics)
+ {
+ // Nothing here...
+ }
}
\ No newline at end of file
diff --git a/src/Testing/CoreTests/Acceptance/exporting_service_capabilities.cs b/src/Testing/CoreTests/Acceptance/exporting_service_capabilities.cs
index e741b745b..d5f70f3b9 100644
--- a/src/Testing/CoreTests/Acceptance/exporting_service_capabilities.cs
+++ b/src/Testing/CoreTests/Acceptance/exporting_service_capabilities.cs
@@ -1,3 +1,4 @@
+using Wolverine.Configuration;
using Wolverine.Configuration.Capabilities;
using Wolverine.Tracking;
using Xunit;
@@ -35,7 +36,35 @@ public void the_application_version()
{
theCapabilities.Version.ShouldBe(GetType().Assembly.GetName().Version);
}
-
-
+ [Fact]
+ public void should_not_include_messages_from_wolverine_assembly()
+ {
+ var wolverineAssemblyName = typeof(WolverineOptions).Assembly.GetName().Name;
+ theCapabilities.Messages.ShouldNotBeEmpty();
+ theCapabilities.Messages.ShouldAllBe(m => m.Type.AssemblyName != wolverineAssemblyName);
+ }
+
+ [Fact]
+ public void should_not_include_system_endpoints()
+ {
+ var systemEndpointUris = Host.GetRuntime().Options.Transports
+ .AllEndpoints()
+ .Where(e => e.Role == EndpointRole.System)
+ .Select(e => e.Uri)
+ .ToList();
+
+ systemEndpointUris.ShouldNotBeEmpty();
+ foreach (var uri in systemEndpointUris)
+ {
+ theCapabilities.MessagingEndpoints.ShouldNotContain(e => e.Uri == uri);
+ }
+ }
+
+ [Fact]
+ public void should_include_application_messages()
+ {
+ var appAssemblyName = GetType().Assembly.GetName().Name;
+ theCapabilities.Messages.ShouldContain(m => m.Type.AssemblyName == appAssemblyName);
+ }
}
\ No newline at end of file
diff --git a/src/Testing/CoreTests/Runtime/Metrics/MessageHandlingMetricsSumTests.cs b/src/Testing/CoreTests/Runtime/Metrics/MessageHandlingMetricsSumTests.cs
new file mode 100644
index 000000000..b8eb21a91
--- /dev/null
+++ b/src/Testing/CoreTests/Runtime/Metrics/MessageHandlingMetricsSumTests.cs
@@ -0,0 +1,473 @@
+using JasperFx.Core;
+using Wolverine.Runtime.Metrics;
+using Xunit;
+
+namespace CoreTests.Runtime.Metrics;
+
+public class MessageHandlingMetricsSumTests
+{
+ private static readonly Uri Destination = new("tcp://localhost:5000");
+ private const string MessageType = "MyApp.MyMessage";
+
+ [Fact]
+ public void sum_empty_collection_returns_zeroed_metrics()
+ {
+ var result = MessageHandlingMetrics.Sum(MessageType, Destination, []);
+
+ result.MessageType.ShouldBe(MessageType);
+ result.Destination.ShouldBe(Destination);
+ result.PerTenant.ShouldBeEmpty();
+ result.Range.From.ShouldBeNull();
+ result.Range.To.ShouldBeNull();
+ }
+
+ [Fact]
+ public void sum_single_element()
+ {
+ var from = DateTimeOffset.UtcNow.Subtract(5.Minutes());
+ var to = DateTimeOffset.UtcNow;
+
+ var metrics = new MessageHandlingMetrics(MessageType, Destination,
+ new TimeRange(from, to),
+ [new PerTenantMetrics("tenant1",
+ new Executions(10, 500),
+ new EffectiveTime(8, 120.5),
+ [new ExceptionCounts("System.InvalidOperationException", 2, 1)])]);
+
+ var result = MessageHandlingMetrics.Sum(MessageType, Destination, [metrics]);
+
+ result.Range.From.ShouldBe(from);
+ result.Range.To.ShouldBe(to);
+ result.PerTenant.Length.ShouldBe(1);
+ result.PerTenant[0].TenantId.ShouldBe("tenant1");
+ result.PerTenant[0].Executions.Count.ShouldBe(10);
+ result.PerTenant[0].Executions.TotalTime.ShouldBe(500);
+ result.PerTenant[0].EffectiveTime.Count.ShouldBe(8);
+ result.PerTenant[0].EffectiveTime.TotalTime.ShouldBe(120.5);
+ result.PerTenant[0].Exceptions.Length.ShouldBe(1);
+ result.PerTenant[0].Exceptions[0].ExceptionType.ShouldBe("System.InvalidOperationException");
+ result.PerTenant[0].Exceptions[0].Failures.ShouldBe(2);
+ result.PerTenant[0].Exceptions[0].DeadLetters.ShouldBe(1);
+ }
+
+ [Fact]
+ public void time_range_uses_earliest_from_and_latest_to()
+ {
+ var earliest = DateTimeOffset.UtcNow.Subtract(10.Minutes());
+ var middle = DateTimeOffset.UtcNow.Subtract(5.Minutes());
+ var latest = DateTimeOffset.UtcNow;
+
+ var m1 = new MessageHandlingMetrics(MessageType, Destination,
+ new TimeRange(middle, latest),
+ [new PerTenantMetrics("t", new Executions(1, 1), new EffectiveTime(1, 1.0), [])]);
+
+ var m2 = new MessageHandlingMetrics(MessageType, Destination,
+ new TimeRange(earliest, middle),
+ [new PerTenantMetrics("t", new Executions(1, 1), new EffectiveTime(1, 1.0), [])]);
+
+ var result = MessageHandlingMetrics.Sum(MessageType, Destination, [m1, m2]);
+
+ result.Range.From.ShouldBe(earliest);
+ result.Range.To.ShouldBe(latest);
+ }
+
+ [Fact]
+ public void time_range_handles_null_from_and_to()
+ {
+ var m1 = new MessageHandlingMetrics(MessageType, Destination,
+ new TimeRange(null, DateTimeOffset.UtcNow),
+ [new PerTenantMetrics("t", new Executions(1, 1), new EffectiveTime(1, 1.0), [])]);
+
+ var m2 = new MessageHandlingMetrics(MessageType, Destination,
+ new TimeRange(null, null),
+ [new PerTenantMetrics("t", new Executions(1, 1), new EffectiveTime(1, 1.0), [])]);
+
+ var result = MessageHandlingMetrics.Sum(MessageType, Destination, [m1, m2]);
+
+ result.Range.From.ShouldBeNull();
+ result.Range.To.ShouldBe(m1.Range.To);
+ }
+
+ [Fact]
+ public void aggregates_per_tenant_metrics_by_tenant_id()
+ {
+ var range = new TimeRange(DateTimeOffset.UtcNow.Subtract(1.Minutes()), DateTimeOffset.UtcNow);
+
+ var m1 = new MessageHandlingMetrics(MessageType, Destination, range,
+ [
+ new PerTenantMetrics("tenantA", new Executions(10, 500), new EffectiveTime(8, 100.0), []),
+ new PerTenantMetrics("tenantB", new Executions(5, 200), new EffectiveTime(4, 50.0), [])
+ ]);
+
+ var m2 = new MessageHandlingMetrics(MessageType, Destination, range,
+ [
+ new PerTenantMetrics("tenantA", new Executions(20, 1000), new EffectiveTime(15, 300.0), []),
+ new PerTenantMetrics("tenantC", new Executions(3, 100), new EffectiveTime(2, 25.0), [])
+ ]);
+
+ var result = MessageHandlingMetrics.Sum(MessageType, Destination, [m1, m2]);
+
+ result.PerTenant.Length.ShouldBe(3);
+
+ var tenantA = result.PerTenant.Single(t => t.TenantId == "tenantA");
+ tenantA.Executions.Count.ShouldBe(30);
+ tenantA.Executions.TotalTime.ShouldBe(1500);
+ tenantA.EffectiveTime.Count.ShouldBe(23);
+ tenantA.EffectiveTime.TotalTime.ShouldBe(400.0);
+
+ var tenantB = result.PerTenant.Single(t => t.TenantId == "tenantB");
+ tenantB.Executions.Count.ShouldBe(5);
+ tenantB.Executions.TotalTime.ShouldBe(200);
+
+ var tenantC = result.PerTenant.Single(t => t.TenantId == "tenantC");
+ tenantC.Executions.Count.ShouldBe(3);
+ }
+
+ [Fact]
+ public void aggregates_exception_counts_by_exception_type()
+ {
+ var range = new TimeRange(DateTimeOffset.UtcNow.Subtract(1.Minutes()), DateTimeOffset.UtcNow);
+
+ var m1 = new MessageHandlingMetrics(MessageType, Destination, range,
+ [
+ new PerTenantMetrics("t1",
+ new Executions(10, 500),
+ new EffectiveTime(8, 100.0),
+ [
+ new ExceptionCounts("System.InvalidOperationException", 3, 1),
+ new ExceptionCounts("System.TimeoutException", 1, 0)
+ ])
+ ]);
+
+ var m2 = new MessageHandlingMetrics(MessageType, Destination, range,
+ [
+ new PerTenantMetrics("t1",
+ new Executions(20, 1000),
+ new EffectiveTime(15, 200.0),
+ [
+ new ExceptionCounts("System.InvalidOperationException", 5, 2),
+ new ExceptionCounts("System.ArgumentException", 2, 1)
+ ])
+ ]);
+
+ var result = MessageHandlingMetrics.Sum(MessageType, Destination, [m1, m2]);
+
+ var tenant = result.PerTenant.Single(t => t.TenantId == "t1");
+ tenant.Exceptions.Length.ShouldBe(3);
+
+ var invalidOp = tenant.Exceptions.Single(e => e.ExceptionType == "System.InvalidOperationException");
+ invalidOp.Failures.ShouldBe(8);
+ invalidOp.DeadLetters.ShouldBe(3);
+
+ var timeout = tenant.Exceptions.Single(e => e.ExceptionType == "System.TimeoutException");
+ timeout.Failures.ShouldBe(1);
+ timeout.DeadLetters.ShouldBe(0);
+
+ var argEx = tenant.Exceptions.Single(e => e.ExceptionType == "System.ArgumentException");
+ argEx.Failures.ShouldBe(2);
+ argEx.DeadLetters.ShouldBe(1);
+ }
+
+ [Fact]
+ public void sum_across_multiple_nodes()
+ {
+ var from1 = DateTimeOffset.UtcNow.Subtract(10.Minutes());
+ var to1 = DateTimeOffset.UtcNow.Subtract(5.Minutes());
+ var from2 = DateTimeOffset.UtcNow.Subtract(8.Minutes());
+ var to2 = DateTimeOffset.UtcNow;
+
+ var m1 = new MessageHandlingMetrics(MessageType, Destination,
+ new TimeRange(from1, to1),
+ [new PerTenantMetrics("t", new Executions(100, 5000), new EffectiveTime(90, 4000.0),
+ [new ExceptionCounts("System.Exception", 10, 5)])]);
+
+ var m2 = new MessageHandlingMetrics(MessageType, Destination,
+ new TimeRange(from2, to2),
+ [new PerTenantMetrics("t", new Executions(200, 8000), new EffectiveTime(180, 7000.0),
+ [new ExceptionCounts("System.Exception", 20, 8)])]);
+
+ var result = MessageHandlingMetrics.Sum(MessageType, Destination, [m1, m2]);
+
+ result.Range.From.ShouldBe(from1);
+ result.Range.To.ShouldBe(to2);
+
+ var tenant = result.PerTenant.Single();
+ tenant.Executions.Count.ShouldBe(300);
+ tenant.Executions.TotalTime.ShouldBe(13000);
+ tenant.EffectiveTime.Count.ShouldBe(270);
+ tenant.EffectiveTime.TotalTime.ShouldBe(11000.0);
+
+ var exceptions = tenant.Exceptions.Single();
+ exceptions.Failures.ShouldBe(30);
+ exceptions.DeadLetters.ShouldBe(13);
+ }
+
+ [Fact]
+ public void sum_by_destination_groups_across_message_types()
+ {
+ var range = new TimeRange(DateTimeOffset.UtcNow.Subtract(1.Minutes()), DateTimeOffset.UtcNow);
+ var dest1 = new Uri("tcp://localhost:5000");
+ var dest2 = new Uri("tcp://localhost:6000");
+
+ var metrics = new[]
+ {
+ new MessageHandlingMetrics("App.OrderPlaced", dest1, range,
+ [new PerTenantMetrics("t1", new Executions(10, 500), new EffectiveTime(8, 100.0), [])]),
+ new MessageHandlingMetrics("App.OrderShipped", dest1, range,
+ [new PerTenantMetrics("t1", new Executions(20, 1000), new EffectiveTime(15, 200.0), [])]),
+ new MessageHandlingMetrics("App.OrderPlaced", dest2, range,
+ [new PerTenantMetrics("t1", new Executions(5, 250), new EffectiveTime(4, 50.0), [])])
+ };
+
+ var results = MessageHandlingMetrics.SumByDestination(metrics);
+
+ results.Length.ShouldBe(2);
+
+ var forDest1 = results.Single(r => r.Destination == dest1);
+ forDest1.MessageType.ShouldBe("*");
+ var tenant1 = forDest1.PerTenant.Single(t => t.TenantId == "t1");
+ tenant1.Executions.Count.ShouldBe(30);
+ tenant1.Executions.TotalTime.ShouldBe(1500);
+ tenant1.EffectiveTime.Count.ShouldBe(23);
+ tenant1.EffectiveTime.TotalTime.ShouldBe(300.0);
+
+ var forDest2 = results.Single(r => r.Destination == dest2);
+ forDest2.MessageType.ShouldBe("*");
+ var tenant2 = forDest2.PerTenant.Single(t => t.TenantId == "t1");
+ tenant2.Executions.Count.ShouldBe(5);
+ tenant2.Executions.TotalTime.ShouldBe(250);
+ }
+
+ [Fact]
+ public void sum_by_destination_aggregates_tenants_across_message_types()
+ {
+ var range = new TimeRange(DateTimeOffset.UtcNow.Subtract(1.Minutes()), DateTimeOffset.UtcNow);
+ var dest = new Uri("tcp://localhost:5000");
+
+ var metrics = new[]
+ {
+ new MessageHandlingMetrics("App.OrderPlaced", dest, range,
+ [
+ new PerTenantMetrics("tenantA", new Executions(10, 500), new EffectiveTime(8, 100.0), []),
+ new PerTenantMetrics("tenantB", new Executions(5, 200), new EffectiveTime(4, 50.0), [])
+ ]),
+ new MessageHandlingMetrics("App.OrderShipped", dest, range,
+ [
+ new PerTenantMetrics("tenantA", new Executions(20, 1000), new EffectiveTime(15, 300.0), []),
+ new PerTenantMetrics("tenantC", new Executions(3, 100), new EffectiveTime(2, 25.0), [])
+ ])
+ };
+
+ var results = MessageHandlingMetrics.SumByDestination(metrics);
+
+ results.Length.ShouldBe(1);
+ var result = results[0];
+ result.MessageType.ShouldBe("*");
+ result.PerTenant.Length.ShouldBe(3);
+
+ var tenantA = result.PerTenant.Single(t => t.TenantId == "tenantA");
+ tenantA.Executions.Count.ShouldBe(30);
+
+ var tenantB = result.PerTenant.Single(t => t.TenantId == "tenantB");
+ tenantB.Executions.Count.ShouldBe(5);
+
+ var tenantC = result.PerTenant.Single(t => t.TenantId == "tenantC");
+ tenantC.Executions.Count.ShouldBe(3);
+ }
+
+ [Fact]
+ public void sum_by_destination_merges_time_ranges()
+ {
+ var earliest = DateTimeOffset.UtcNow.Subtract(10.Minutes());
+ var middle = DateTimeOffset.UtcNow.Subtract(5.Minutes());
+ var latest = DateTimeOffset.UtcNow;
+ var dest = new Uri("tcp://localhost:5000");
+
+ var metrics = new[]
+ {
+ new MessageHandlingMetrics("App.Msg1", dest, new TimeRange(earliest, middle),
+ [new PerTenantMetrics("t", new Executions(1, 1), new EffectiveTime(1, 1.0), [])]),
+ new MessageHandlingMetrics("App.Msg2", dest, new TimeRange(middle, latest),
+ [new PerTenantMetrics("t", new Executions(1, 1), new EffectiveTime(1, 1.0), [])])
+ };
+
+ var results = MessageHandlingMetrics.SumByDestination(metrics);
+
+ results.Length.ShouldBe(1);
+ results[0].Range.From.ShouldBe(earliest);
+ results[0].Range.To.ShouldBe(latest);
+ }
+
+ [Fact]
+ public void sum_by_message_type_groups_across_destinations()
+ {
+ var range = new TimeRange(DateTimeOffset.UtcNow.Subtract(1.Minutes()), DateTimeOffset.UtcNow);
+ var dest1 = new Uri("tcp://localhost:5000");
+ var dest2 = new Uri("tcp://localhost:6000");
+ var allDestination = new Uri("all://");
+
+ var metrics = new[]
+ {
+ new MessageHandlingMetrics("App.OrderPlaced", dest1, range,
+ [new PerTenantMetrics("t1", new Executions(10, 500), new EffectiveTime(8, 100.0), [])]),
+ new MessageHandlingMetrics("App.OrderPlaced", dest2, range,
+ [new PerTenantMetrics("t1", new Executions(20, 1000), new EffectiveTime(15, 200.0), [])]),
+ new MessageHandlingMetrics("App.OrderShipped", dest1, range,
+ [new PerTenantMetrics("t1", new Executions(5, 250), new EffectiveTime(4, 50.0), [])])
+ };
+
+ var results = MessageHandlingMetrics.SumByMessageType(metrics);
+
+ results.Length.ShouldBe(2);
+
+ var forOrderPlaced = results.Single(r => r.MessageType == "App.OrderPlaced");
+ forOrderPlaced.Destination.ShouldBe(allDestination);
+ var tenant1 = forOrderPlaced.PerTenant.Single(t => t.TenantId == "t1");
+ tenant1.Executions.Count.ShouldBe(30);
+ tenant1.Executions.TotalTime.ShouldBe(1500);
+ tenant1.EffectiveTime.Count.ShouldBe(23);
+ tenant1.EffectiveTime.TotalTime.ShouldBe(300.0);
+
+ var forOrderShipped = results.Single(r => r.MessageType == "App.OrderShipped");
+ forOrderShipped.Destination.ShouldBe(allDestination);
+ var tenant2 = forOrderShipped.PerTenant.Single(t => t.TenantId == "t1");
+ tenant2.Executions.Count.ShouldBe(5);
+ tenant2.Executions.TotalTime.ShouldBe(250);
+ }
+
+ [Fact]
+ public void sum_by_message_type_aggregates_tenants_across_destinations()
+ {
+ var range = new TimeRange(DateTimeOffset.UtcNow.Subtract(1.Minutes()), DateTimeOffset.UtcNow);
+ var dest1 = new Uri("tcp://localhost:5000");
+ var dest2 = new Uri("tcp://localhost:6000");
+ var allDestination = new Uri("all://");
+
+ var metrics = new[]
+ {
+ new MessageHandlingMetrics("App.OrderPlaced", dest1, range,
+ [
+ new PerTenantMetrics("tenantA", new Executions(10, 500), new EffectiveTime(8, 100.0), []),
+ new PerTenantMetrics("tenantB", new Executions(5, 200), new EffectiveTime(4, 50.0), [])
+ ]),
+ new MessageHandlingMetrics("App.OrderPlaced", dest2, range,
+ [
+ new PerTenantMetrics("tenantA", new Executions(20, 1000), new EffectiveTime(15, 300.0), []),
+ new PerTenantMetrics("tenantC", new Executions(3, 100), new EffectiveTime(2, 25.0), [])
+ ])
+ };
+
+ var results = MessageHandlingMetrics.SumByMessageType(metrics);
+
+ results.Length.ShouldBe(1);
+ var result = results[0];
+ result.Destination.ShouldBe(allDestination);
+ result.PerTenant.Length.ShouldBe(3);
+
+ var tenantA = result.PerTenant.Single(t => t.TenantId == "tenantA");
+ tenantA.Executions.Count.ShouldBe(30);
+
+ var tenantB = result.PerTenant.Single(t => t.TenantId == "tenantB");
+ tenantB.Executions.Count.ShouldBe(5);
+
+ var tenantC = result.PerTenant.Single(t => t.TenantId == "tenantC");
+ tenantC.Executions.Count.ShouldBe(3);
+ }
+
+ [Fact]
+ public void sum_by_message_type_merges_time_ranges()
+ {
+ var earliest = DateTimeOffset.UtcNow.Subtract(10.Minutes());
+ var middle = DateTimeOffset.UtcNow.Subtract(5.Minutes());
+ var latest = DateTimeOffset.UtcNow;
+ var allDestination = new Uri("all://");
+
+ var metrics = new[]
+ {
+ new MessageHandlingMetrics("App.Msg1", new Uri("tcp://localhost:5000"), new TimeRange(earliest, middle),
+ [new PerTenantMetrics("t", new Executions(1, 1), new EffectiveTime(1, 1.0), [])]),
+ new MessageHandlingMetrics("App.Msg1", new Uri("tcp://localhost:6000"), new TimeRange(middle, latest),
+ [new PerTenantMetrics("t", new Executions(1, 1), new EffectiveTime(1, 1.0), [])])
+ };
+
+ var results = MessageHandlingMetrics.SumByMessageType(metrics);
+
+ results.Length.ShouldBe(1);
+ results[0].Destination.ShouldBe(allDestination);
+ results[0].Range.From.ShouldBe(earliest);
+ results[0].Range.To.ShouldBe(latest);
+ }
+
+ [Fact]
+ public void weight_of_one_returns_same_instance()
+ {
+ var range = new TimeRange(DateTimeOffset.UtcNow.Subtract(1.Minutes()), DateTimeOffset.UtcNow);
+ var metrics = new MessageHandlingMetrics(MessageType, Destination, range,
+ [new PerTenantMetrics("t1", new Executions(10, 500), new EffectiveTime(8, 100.0),
+ [new ExceptionCounts("System.Exception", 2, 1)])]);
+
+ var result = metrics.Weight(1);
+
+ result.ShouldBeSameAs(metrics);
+ }
+
+ [Fact]
+ public void weight_multiplies_all_numeric_values()
+ {
+ var range = new TimeRange(DateTimeOffset.UtcNow.Subtract(1.Minutes()), DateTimeOffset.UtcNow);
+ var metrics = new MessageHandlingMetrics(MessageType, Destination, range,
+ [
+ new PerTenantMetrics("t1",
+ new Executions(10, 500),
+ new EffectiveTime(8, 100.0),
+ [new ExceptionCounts("System.Exception", 3, 1)]),
+ new PerTenantMetrics("t2",
+ new Executions(5, 200),
+ new EffectiveTime(4, 50.0),
+ [new ExceptionCounts("System.ArgumentException", 2, 0)])
+ ]);
+
+ var result = metrics.Weight(3);
+
+ result.MessageType.ShouldBe(MessageType);
+ result.Destination.ShouldBe(Destination);
+ result.Range.ShouldBe(range);
+ result.PerTenant.Length.ShouldBe(2);
+
+ var t1 = result.PerTenant.Single(t => t.TenantId == "t1");
+ t1.Executions.Count.ShouldBe(30);
+ t1.Executions.TotalTime.ShouldBe(1500);
+ t1.EffectiveTime.Count.ShouldBe(24);
+ t1.EffectiveTime.TotalTime.ShouldBe(300.0);
+ t1.Exceptions.Length.ShouldBe(1);
+ t1.Exceptions[0].ExceptionType.ShouldBe("System.Exception");
+ t1.Exceptions[0].Failures.ShouldBe(9);
+ t1.Exceptions[0].DeadLetters.ShouldBe(3);
+
+ var t2 = result.PerTenant.Single(t => t.TenantId == "t2");
+ t2.Executions.Count.ShouldBe(15);
+ t2.Executions.TotalTime.ShouldBe(600);
+ t2.EffectiveTime.Count.ShouldBe(12);
+ t2.EffectiveTime.TotalTime.ShouldBe(150.0);
+ t2.Exceptions[0].Failures.ShouldBe(6);
+ t2.Exceptions[0].DeadLetters.ShouldBe(0);
+ }
+
+ [Fact]
+ public void weight_throws_for_zero()
+ {
+ var metrics = new MessageHandlingMetrics(MessageType, Destination,
+ new TimeRange(null, null), []);
+
+ Should.Throw(() => metrics.Weight(0));
+ }
+
+ [Fact]
+ public void weight_throws_for_negative()
+ {
+ var metrics = new MessageHandlingMetrics(MessageType, Destination,
+ new TimeRange(null, null), []);
+
+ Should.Throw(() => metrics.Weight(-1));
+ }
+}
diff --git a/src/Testing/Wolverine.ComplianceTests/DeadLetterAdminCompliance.cs b/src/Testing/Wolverine.ComplianceTests/DeadLetterAdminCompliance.cs
index 693d48ab5..7d11b81e4 100644
--- a/src/Testing/Wolverine.ComplianceTests/DeadLetterAdminCompliance.cs
+++ b/src/Testing/Wolverine.ComplianceTests/DeadLetterAdminCompliance.cs
@@ -1,3 +1,4 @@
+using JasperFx.Core;
using JasperFx.Core.Reflection;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
diff --git a/src/Wolverine/AssemblyAttributes.cs b/src/Wolverine/AssemblyAttributes.cs
index b5bda4701..d5cfad4e6 100644
--- a/src/Wolverine/AssemblyAttributes.cs
+++ b/src/Wolverine/AssemblyAttributes.cs
@@ -4,6 +4,7 @@
using Wolverine.Attributes;
[assembly: JasperFx.Core.TypeScanning.IgnoreAssembly]
+[assembly: ExcludeFromServiceCapabilities]
[assembly: JasperFxAssembly]
[assembly: WolverineFeature]
diff --git a/src/Wolverine/Attributes/ExcludeFromServiceCapabilitiesAttribute.cs b/src/Wolverine/Attributes/ExcludeFromServiceCapabilitiesAttribute.cs
new file mode 100644
index 000000000..21cd0fd22
--- /dev/null
+++ b/src/Wolverine/Attributes/ExcludeFromServiceCapabilitiesAttribute.cs
@@ -0,0 +1,8 @@
+namespace Wolverine.Attributes;
+
+///
+/// When applied at the assembly level, tells Wolverine to exclude all message types
+/// and handlers from this assembly when building ServiceCapabilities descriptions.
+///
+[AttributeUsage(AttributeTargets.Assembly)]
+public class ExcludeFromServiceCapabilitiesAttribute : Attribute;
diff --git a/src/Wolverine/Configuration/Capabilities/PersistenceCountsUpdated.cs b/src/Wolverine/Configuration/Capabilities/PersistenceCountsUpdated.cs
new file mode 100644
index 000000000..0c13a80ff
--- /dev/null
+++ b/src/Wolverine/Configuration/Capabilities/PersistenceCountsUpdated.cs
@@ -0,0 +1,5 @@
+using Wolverine.Logging;
+
+namespace Wolverine.Configuration.Capabilities;
+
+public record PersistenceCountsUpdated(Uri DatabaseUri, PersistedCounts Counts);
\ No newline at end of file
diff --git a/src/Wolverine/Configuration/Capabilities/ServiceCapabilities.cs b/src/Wolverine/Configuration/Capabilities/ServiceCapabilities.cs
index 907dac614..03dc5f86b 100644
--- a/src/Wolverine/Configuration/Capabilities/ServiceCapabilities.cs
+++ b/src/Wolverine/Configuration/Capabilities/ServiceCapabilities.cs
@@ -2,6 +2,7 @@
using System.Text.Json.Serialization;
using JasperFx.Core.Reflection;
using JasperFx.Descriptors;
+using Wolverine.Attributes;
using JasperFx.Events;
using JasperFx.Events.Descriptors;
using Microsoft.Extensions.DependencyInjection;
@@ -23,8 +24,11 @@ public ServiceCapabilities(WolverineOptions options) : base(options)
}
public DateTimeOffset Evaluated { get; set; } = DateTimeOffset.UtcNow;
+
+ [JsonConverter(typeof(VersionJsonConverter))]
public Version Version { get; set; }
+ [JsonConverter(typeof(VersionJsonConverter))]
public Version? WolverineVersion { get; set; }
public List EventStores { get; set; } = [];
@@ -69,6 +73,7 @@ private static void readEndpoints(IWolverineRuntime runtime, ServiceCapabilities
{
foreach (var endpoint in runtime.Options.Transports.AllEndpoints().OrderBy(x => x.Uri.ToString()))
{
+ if (endpoint.Role == EndpointRole.System) continue;
capabilities.MessagingEndpoints.Add(new EndpointDescriptor(endpoint));
}
}
@@ -78,6 +83,7 @@ private static void readMessageTypes(IWolverineRuntime runtime, ServiceCapabilit
var messageTypes = runtime.Options.Discovery.FindAllMessages(runtime.Options.HandlerGraph);
foreach (var messageType in messageTypes.OrderBy(x => x.FullNameInCode()))
{
+ if (messageType.Assembly.HasAttribute()) continue;
capabilities.Messages.Add(new MessageDescriptor(messageType, runtime));
}
}
diff --git a/src/Wolverine/Persistence/Durability/DeadLetterManagement/DeadLetterEnvelopeQuery.cs b/src/Wolverine/Persistence/Durability/DeadLetterManagement/DeadLetterEnvelopeQuery.cs
index b7150e9ba..6ca70f295 100644
--- a/src/Wolverine/Persistence/Durability/DeadLetterManagement/DeadLetterEnvelopeQuery.cs
+++ b/src/Wolverine/Persistence/Durability/DeadLetterManagement/DeadLetterEnvelopeQuery.cs
@@ -1,3 +1,5 @@
+using JasperFx.Core;
+
namespace Wolverine.Persistence.Durability.DeadLetterManagement;
public class DeadLetterEnvelopeQuery
diff --git a/src/Wolverine/Persistence/Durability/DeadLetterManagement/TimeRange.cs b/src/Wolverine/Persistence/Durability/DeadLetterManagement/TimeRange.cs
deleted file mode 100644
index f5f8ed15d..000000000
--- a/src/Wolverine/Persistence/Durability/DeadLetterManagement/TimeRange.cs
+++ /dev/null
@@ -1,6 +0,0 @@
-namespace Wolverine.Persistence.Durability.DeadLetterManagement;
-
-public record TimeRange(DateTimeOffset? From, DateTimeOffset? To)
-{
- public static TimeRange AllTime() => new TimeRange(null, null);
-}
\ No newline at end of file
diff --git a/src/Wolverine/Persistence/Durability/IDeadLetters.cs b/src/Wolverine/Persistence/Durability/IDeadLetters.cs
index 6f095ce20..245e604f2 100644
--- a/src/Wolverine/Persistence/Durability/IDeadLetters.cs
+++ b/src/Wolverine/Persistence/Durability/IDeadLetters.cs
@@ -1,3 +1,4 @@
+using JasperFx.Core;
using Wolverine.Persistence.Durability.DeadLetterManagement;
namespace Wolverine.Persistence.Durability;
diff --git a/src/Wolverine/Persistence/Durability/NullMessageStore.cs b/src/Wolverine/Persistence/Durability/NullMessageStore.cs
index 010c4c5fc..67d22b51d 100644
--- a/src/Wolverine/Persistence/Durability/NullMessageStore.cs
+++ b/src/Wolverine/Persistence/Durability/NullMessageStore.cs
@@ -1,3 +1,4 @@
+using JasperFx.Core;
using JasperFx.Descriptors;
using Wolverine.Logging;
using Wolverine.Persistence.Durability.DeadLetterManagement;
diff --git a/src/Wolverine/Persistence/PersistenceMetrics.cs b/src/Wolverine/Persistence/PersistenceMetrics.cs
index e981bb7db..f6b793f13 100644
--- a/src/Wolverine/Persistence/PersistenceMetrics.cs
+++ b/src/Wolverine/Persistence/PersistenceMetrics.cs
@@ -3,6 +3,8 @@
using Microsoft.Extensions.Logging;
using Wolverine.Logging;
using Wolverine.Persistence.Durability;
+using Wolverine.Runtime;
+using Wolverine.Runtime.Agents;
namespace Wolverine.Persistence;
@@ -14,11 +16,14 @@ public class PersistenceMetrics : IDisposable
private readonly ObservableGauge _scheduled;
private CancellationTokenSource _cancellation;
private Task _task;
+ private readonly IWolverineObserver _observer;
- public PersistenceMetrics(Meter meter, DurabilitySettings settings, string? databaseName)
+ public PersistenceMetrics(IWolverineRuntime runtime, DurabilitySettings settings, string? databaseName)
{
_settings = settings;
_cancellation = CancellationTokenSource.CreateLinkedTokenSource(settings.Cancellation);
+ _observer = runtime.Observer;
+ var meter = runtime.Meter;
if (databaseName.IsEmpty())
{
@@ -53,6 +58,7 @@ public void StartPolling(ILogger logger, IMessageStore store)
try
{
Counts = await store.Admin.FetchCountsAsync();
+ _observer.PersistedCounts(store.Uri, Counts);
}
catch (TaskCanceledException)
{
diff --git a/src/Wolverine/Runtime/Agents/IWolverineObserver.cs b/src/Wolverine/Runtime/Agents/IWolverineObserver.cs
index 813d10de1..2dc5e99a8 100644
--- a/src/Wolverine/Runtime/Agents/IWolverineObserver.cs
+++ b/src/Wolverine/Runtime/Agents/IWolverineObserver.cs
@@ -1,4 +1,6 @@
using Wolverine.Configuration;
+using Wolverine.Logging;
+using Wolverine.Runtime.Metrics;
using Wolverine.Runtime.Routing;
using Wolverine.Transports;
@@ -23,6 +25,8 @@ public interface IWolverineObserver
Task BackPressureTriggered(Endpoint endpoint, IListeningAgent agent);
Task BackPressureLifted(Endpoint endpoint);
+ void PersistedCounts(Uri storeUri, PersistedCounts counts);
+ void MessageHandlingMetricsExported(MessageHandlingMetrics metrics);
}
internal class PersistenceWolverineObserver : IWolverineObserver
@@ -44,6 +48,16 @@ public Task BackPressureLifted(Endpoint endpoint)
return Task.CompletedTask;
}
+ public void PersistedCounts(Uri storeUri, PersistedCounts counts)
+ {
+ // Nothing
+ }
+
+ public void MessageHandlingMetricsExported(MessageHandlingMetrics metrics)
+ {
+ // Nothing
+ }
+
public async Task AssumedLeadership()
{
await _runtime.Storage.Nodes.LogRecordsAsync(NodeRecord.For(_runtime.Options,
diff --git a/src/Wolverine/Runtime/HandlerPipeline.cs b/src/Wolverine/Runtime/HandlerPipeline.cs
index 151810954..6e595a39a 100644
--- a/src/Wolverine/Runtime/HandlerPipeline.cs
+++ b/src/Wolverine/Runtime/HandlerPipeline.cs
@@ -109,6 +109,8 @@ public async Task InvokeAsync(Envelope envelope, IChannelCallback channel, Activ
public async ValueTask TryDeserializeEnvelope(Envelope envelope)
{
+ if (envelope.Message != null) return NullContinuation.Instance;
+
// Try to deserialize
try
{
diff --git a/src/Wolverine/Runtime/Metrics/IHandlerMetricsData.cs b/src/Wolverine/Runtime/Metrics/IHandlerMetricsData.cs
index f9a37c3bb..b3f36055b 100644
--- a/src/Wolverine/Runtime/Metrics/IHandlerMetricsData.cs
+++ b/src/Wolverine/Runtime/Metrics/IHandlerMetricsData.cs
@@ -1,7 +1,22 @@
namespace Wolverine.Runtime.Metrics;
+///
+/// Represents a single metrics data point captured during message handling. Implementations
+/// are posted to the batching pipeline where they
+/// are applied to the appropriate counters.
+///
public interface IHandlerMetricsData
{
+ ///
+ /// The tenant identifier for multi-tenant routing. When null, the data point
+ /// is assigned to StorageConstants.DefaultTenantId during accumulation.
+ ///
string TenantId { get; }
+
+ ///
+ /// Applies this data point to the mutable per-tenant tracking counters.
+ /// Called under a lock within .
+ ///
+ /// The per-tenant tracking instance whose counters will be mutated.
void Apply(PerTenantTracking tracking);
-}
\ No newline at end of file
+}
diff --git a/src/Wolverine/Runtime/Metrics/MessageHandlingCounts.cs b/src/Wolverine/Runtime/Metrics/MessageHandlingCounts.cs
index 55579f23b..642b9b2cb 100644
--- a/src/Wolverine/Runtime/Metrics/MessageHandlingCounts.cs
+++ b/src/Wolverine/Runtime/Metrics/MessageHandlingCounts.cs
@@ -3,33 +3,62 @@
namespace Wolverine.Runtime.Metrics;
-
+///
+/// Maintains a set of instances for a specific message type
+/// and destination combination. When an record arrives,
+/// resolves the appropriate tenant tracker (creating one on first access)
+/// and delegates to . Null tenant IDs are mapped to
+/// StorageConstants.DefaultTenantId.
+///
public class MessageHandlingCounts
{
+ ///
+ /// The fully-qualified CLR message type name this instance tracks.
+ ///
public string MessageType { get; }
+
+ ///
+ /// The destination endpoint URI this instance tracks.
+ ///
public Uri Destination { get; }
+ ///
+ /// Creates a new counts tracker for a specific message type and destination.
+ ///
+ /// The fully-qualified CLR message type name.
+ /// The destination endpoint URI.
public MessageHandlingCounts(string messageType, Uri destination)
{
MessageType = messageType;
Destination = destination;
}
+ ///
+ /// Lazily-populated cache of instances keyed by tenant ID.
+ /// New tenants are automatically created on first access.
+ ///
public LightweightCache PerTenant { get; } =
new(tenantId => new PerTenantTracking(tenantId));
+ ///
+ /// Routes a metrics data point to the correct per-tenant tracker and applies it.
+ ///
+ /// The metrics data point to accumulate.
public void Increment(IHandlerMetricsData metricsData)
{
var perTenant = PerTenant[metricsData.TenantId ?? StorageConstants.DefaultTenantId];
metricsData.Apply(perTenant);
}
+ ///
+ /// Clears all per-tenant counters without removing the tenant entries.
+ ///
public void Clear()
{
foreach (var tracking in PerTenant)
{
tracking.Clear();
}
-
+
}
-}
\ No newline at end of file
+}
diff --git a/src/Wolverine/Runtime/Metrics/MessageHandlingMetrics.cs b/src/Wolverine/Runtime/Metrics/MessageHandlingMetrics.cs
index b266eaefe..425a75530 100644
--- a/src/Wolverine/Runtime/Metrics/MessageHandlingMetrics.cs
+++ b/src/Wolverine/Runtime/Metrics/MessageHandlingMetrics.cs
@@ -1,18 +1,245 @@
-using Wolverine.Persistence.Durability.DeadLetterManagement;
+using JasperFx.Core;
namespace Wolverine.Runtime.Metrics;
+///
+/// Immutable snapshot of message handling metrics for a specific message type and destination
+/// combination over a time range. Produced by
+/// on each sampling period. Contains per-tenant breakdowns of execution counts, effective times,
+/// and exception counts.
+///
+/// The fully-qualified CLR message type name. Set to "*" in
+/// results to indicate aggregation across all message types.
+/// The destination endpoint URI. Set to all:// in
+/// results to indicate aggregation across all destinations.
+/// The time window this snapshot covers, from the start of accumulation to the export timestamp.
+/// Per-tenant metrics breakdowns. Empty when no messages were processed in the time range.
public record MessageHandlingMetrics(
- int NodeNumber,
string MessageType,
Uri Destination,
TimeRange Range,
- PerTenantMetrics[] PerTenant);
+ PerTenantMetrics[] PerTenant // TODO -- we can remove this
+)
+{
+ ///
+ /// Returns a grouping key in the format "MessageType@Destination" for use in
+ /// downstream aggregation and lookup scenarios.
+ ///
+ /// A composite key string.
+ public string Key() => $"{MessageType}@{Destination}";
-public record PerTenantMetrics(string TenantId, Executions Executions, EffectiveTime EffectiveTime, ExceptionCounts[] Exceptions);
+ ///
+ /// Combines multiple snapshots into a single aggregate.
+ /// The time range spans the earliest From to the latest To across all inputs.
+ /// Per-tenant data is grouped by tenant ID, with execution counts, effective times, and
+ /// exception counts summed within each tenant.
+ ///
+ /// The message type to assign to the resulting aggregate.
+ /// The destination to assign to the resulting aggregate.
+ /// The metrics snapshots to aggregate.
+ /// A single aggregated snapshot. Returns an empty all-time snapshot if the input is empty.
+ public static MessageHandlingMetrics Sum(string messageType, Uri destination, IReadOnlyList metrics)
+ {
+ if (metrics.Count == 0)
+ {
+ return new MessageHandlingMetrics(messageType, destination, TimeRange.AllTime(), []);
+ }
-public record Executions(int Count, long TotalTime);
+ var from = metrics.Where(m => m.Range.From.HasValue).Select(m => m.Range.From!.Value).DefaultIfEmpty().Min();
+ var to = metrics.Where(m => m.Range.To.HasValue).Select(m => m.Range.To!.Value).DefaultIfEmpty().Max();
+ var range = new TimeRange(
+ metrics.Any(m => m.Range.From.HasValue) ? from : null,
+ metrics.Any(m => m.Range.To.HasValue) ? to : null);
-public record EffectiveTime(int Count, double TotalTime);
+ var perTenant = metrics
+ .SelectMany(m => m.PerTenant)
+ .GroupBy(t => t.TenantId)
+ .Select(PerTenantMetrics.Sum)
+ .ToArray();
-public record ExceptionCounts(string ExceptionType, int Failures, int DeadLetters);
+ return new MessageHandlingMetrics(messageType, destination, range, perTenant);
+ }
+
+ ///
+ /// Combines multiple snapshots that share the same
+ /// message type and destination. Takes the message type and destination from the first element.
+ ///
+ /// The metrics snapshots to aggregate. Must contain at least one element.
+ /// A single aggregated snapshot.
+ public static MessageHandlingMetrics Sum(MessageHandlingMetrics[] metrics)
+ {
+ return Sum(metrics[0].MessageType, metrics[0].Destination, metrics);
+ }
+
+ ///
+ /// Groups the input metrics by and sums each group, collating
+ /// across message types. Each resulting snapshot has its set to
+ /// "*" to indicate aggregation across all message types for that destination.
+ ///
+ /// The metrics snapshots to group and aggregate.
+ /// One aggregated snapshot per unique destination URI.
+ public static MessageHandlingMetrics[] SumByDestination(MessageHandlingMetrics[] metrics)
+ {
+ return metrics
+ .GroupBy(m => m.Destination)
+ .Select(g => Sum("*", g.Key, g.ToList()))
+ .ToArray();
+ }
+
+ ///
+ /// Groups the input metrics by and sums each group, collating
+ /// across destinations. Each resulting snapshot has its set to
+ /// all:// to indicate aggregation across all destinations for that message type.
+ ///
+ /// The metrics snapshots to group and aggregate.
+ /// One aggregated snapshot per unique message type.
+ public static MessageHandlingMetrics[] SumByMessageType(MessageHandlingMetrics[] metrics)
+ {
+ var allDestination = new Uri("all://");
+ return metrics
+ .GroupBy(m => m.MessageType)
+ .Select(g => Sum(g.Key, allDestination, g.ToList()))
+ .ToArray();
+ }
+
+ ///
+ /// Multiplies every numeric value in this snapshot and all nested records by the given
+ /// . Returns this unchanged when
+ /// is 1. Intended for building weighted average calculations where snapshots from different
+ /// time periods or nodes need proportional scaling before summation.
+ ///
+ /// The multiplier to apply. Must be a positive integer.
+ /// The same instance if weight is 1, otherwise a new weighted copy.
+ /// Thrown when is zero or negative.
+ public MessageHandlingMetrics Weight(int weight)
+ {
+ ArgumentOutOfRangeException.ThrowIfNegativeOrZero(weight);
+
+ if (weight == 1) return this;
+
+ return new MessageHandlingMetrics(MessageType, Destination, Range,
+ PerTenant.Select(t => t.Weight(weight)).ToArray());
+ }
+}
+
+///
+/// Immutable snapshot of metrics for a single tenant within a
+/// record. Compiled from during each export cycle.
+/// Contains execution statistics, effective time statistics, and per-exception-type error counts.
+///
+/// The tenant identifier. Defaults to StorageConstants.DefaultTenantId for
+/// messages without an explicit tenant.
+/// Handler execution count and total execution time in milliseconds.
+/// Message completion count and total end-to-end time in milliseconds.
+/// Per-exception-type counts of failures and dead letters, ordered alphabetically by type name.
+public record PerTenantMetrics(string TenantId, Executions Executions, EffectiveTime EffectiveTime, ExceptionCounts[] Exceptions)
+{
+ ///
+ /// Sums a group of records sharing the same tenant ID.
+ /// Execution counts and times are summed directly. Exception counts are grouped by
+ /// exception type and summed via .
+ ///
+ /// A grouping of per-tenant metrics keyed by tenant ID.
+ /// A single aggregated per-tenant snapshot.
+ public static PerTenantMetrics Sum(IGrouping group)
+ {
+ var tenantId = group.Key;
+ var executions = new Executions(
+ group.Sum(t => t.Executions.Count),
+ group.Sum(t => t.Executions.TotalTime));
+ var effectiveTime = new EffectiveTime(
+ group.Sum(t => t.EffectiveTime.Count),
+ group.Sum(t => t.EffectiveTime.TotalTime));
+ var exceptions = group
+ .SelectMany(t => t.Exceptions)
+ .GroupBy(e => e.ExceptionType)
+ .Select(ExceptionCounts.Sum)
+ .ToArray();
+
+ return new PerTenantMetrics(tenantId, executions, effectiveTime, exceptions);
+ }
+
+ ///
+ /// Multiplies all numeric values (execution counts and times, effective time counts and times,
+ /// and all exception failure/dead-letter counts) by the given weight.
+ ///
+ /// The multiplier to apply.
+ /// A new weighted copy of this per-tenant snapshot.
+ public PerTenantMetrics Weight(int weight)
+ {
+ return new PerTenantMetrics(TenantId,
+ Executions.Weight(weight),
+ EffectiveTime.Weight(weight),
+ Exceptions.Select(e => e.Weight(weight)).ToArray());
+ }
+}
+
+///
+/// Handler execution statistics: the number of executions and total wall-clock execution time.
+/// Accumulated from data points where each handler invocation
+/// increments by one and adds its duration to .
+/// Average execution time can be calculated as TotalTime / Count.
+///
+/// The number of handler executions.
+/// The sum of handler execution durations in milliseconds.
+public record Executions(int Count, long TotalTime)
+{
+ ///
+ /// Multiplies both and by the given weight.
+ ///
+ /// The multiplier to apply.
+ /// A new weighted copy.
+ public Executions Weight(int weight) => new(Count * weight, TotalTime * weight);
+}
+
+///
+/// End-to-end (effective) time statistics: the number of completed messages and their total
+/// elapsed time from send to completion. Accumulated from
+/// data points where each completion increments by one and adds its
+/// elapsed time to . This measures latency from Envelope.SentAt
+/// to processing completion, capturing queueing, transport, and handler execution time combined.
+/// Average effective time can be calculated as TotalTime / Count.
+///
+/// The number of messages that completed processing.
+/// The sum of end-to-end elapsed times in milliseconds.
+public record EffectiveTime(int Count, double TotalTime)
+{
+ ///
+ /// Multiplies both and by the given weight.
+ ///
+ /// The multiplier to apply.
+ /// A new weighted copy.
+ public EffectiveTime Weight(int weight) => new(Count * weight, TotalTime * weight);
+}
+
+///
+/// Per-exception-type error counts within a tenant. Tracks both transient failures
+/// (from ) where the message may be retried, and dead letters
+/// (from ) where the message was moved to the error queue
+/// after exhausting retry policies.
+///
+/// The fully-qualified CLR exception type name (e.g. "System.InvalidOperationException").
+/// The number of handler failures (exceptions thrown) for this exception type.
+/// The number of messages dead-lettered due to this exception type.
+public record ExceptionCounts(string ExceptionType, int Failures, int DeadLetters)
+{
+ ///
+ /// Sums a group of records sharing the same exception type.
+ ///
+ /// A grouping of exception counts keyed by exception type name.
+ /// A single aggregated exception count.
+ public static ExceptionCounts Sum(IGrouping group)
+ {
+ return new ExceptionCounts(
+ group.Key,
+ group.Sum(e => e.Failures),
+ group.Sum(e => e.DeadLetters));
+ }
+
+ ///
+ /// Multiplies both and by the given weight.
+ ///
+ /// The multiplier to apply.
+ /// A new weighted copy.
+ public ExceptionCounts Weight(int weight) => new(ExceptionType, Failures * weight, DeadLetters * weight);
+}
diff --git a/src/Wolverine/Runtime/Metrics/MessageMetrics.cs b/src/Wolverine/Runtime/Metrics/MessageMetrics.cs
index 236565ab7..ae754bf68 100644
--- a/src/Wolverine/Runtime/Metrics/MessageMetrics.cs
+++ b/src/Wolverine/Runtime/Metrics/MessageMetrics.cs
@@ -3,6 +3,8 @@
namespace Wolverine.Runtime.Metrics;
///
-/// Message type to send remotely
+/// Envelope message type used to transmit an array of
+/// snapshots, typically for remote publishing or aggregation across nodes.
///
-public record MessageMetrics(MessageHandlingMetrics[] Handled);
\ No newline at end of file
+/// The array of metrics snapshots to transmit.
+public record MessageMetrics(MessageHandlingMetrics[] Handled);
diff --git a/src/Wolverine/Runtime/Metrics/MessageTypeMetricsAccumulator.cs b/src/Wolverine/Runtime/Metrics/MessageTypeMetricsAccumulator.cs
index 6f7b698bd..31303362f 100644
--- a/src/Wolverine/Runtime/Metrics/MessageTypeMetricsAccumulator.cs
+++ b/src/Wolverine/Runtime/Metrics/MessageTypeMetricsAccumulator.cs
@@ -4,13 +4,34 @@
namespace Wolverine.Runtime.Metrics;
+///
+/// Accumulates handler metrics for a single message type and destination combination using
+/// a batching pipeline. records are posted to
+/// which batches them (up to 500 items or 250ms) before applying to the underlying
+/// . On each sampling period,
+/// snapshots the accumulated counters into an immutable
+/// record and resets the counters for the next period.
+///
public class MessageTypeMetricsAccumulator
{
private readonly object _syncLock = new();
-
+
+ ///
+ /// The fully-qualified CLR message type name being tracked.
+ ///
public string MessageType { get; }
+
+ ///
+ /// The destination endpoint URI being tracked.
+ ///
public Uri Destination { get; }
+ ///
+ /// Creates a new accumulator for a specific message type and destination. Initializes the
+ /// batching pipeline that feeds into .
+ ///
+ /// The fully-qualified CLR message type name.
+ /// The destination endpoint URI.
public MessageTypeMetricsAccumulator(string messageType, Uri destination)
{
MessageType = messageType;
@@ -20,13 +41,29 @@ public MessageTypeMetricsAccumulator(string messageType, Uri destination)
var processor = new Block(Process);
EntryPoint = processor.BatchUpstream(250.Milliseconds(), 500);
}
-
+
+ ///
+ /// The start of the current accumulation time window. Reset to DateTimeOffset.UtcNow
+ /// after each call.
+ ///
public DateTimeOffset Starting { get; private set; } = DateTimeOffset.UtcNow;
+ ///
+ /// The underlying mutable counter storage for this message type and destination.
+ ///
public MessageHandlingCounts Counts { get; }
+ ///
+ /// The entry point for the batching pipeline. Post records
+ /// here; they will be batched and forwarded to for accumulation.
+ ///
public IBlock EntryPoint { get; }
+ ///
+ /// Processes a batch of metrics data points by applying each to the underlying
+ /// under a lock. Called by the batching pipeline.
+ ///
+ /// A batch of metrics data points to accumulate.
public void Process(IHandlerMetricsData[] instruments)
{
lock (_syncLock)
@@ -45,15 +82,21 @@ public void Process(IHandlerMetricsData[] instruments)
}
}
+ ///
+ /// Snapshots the accumulated counters into an immutable
+ /// record spanning from to now, then resets the counters and advances
+ /// to the current time for the next accumulation window. Called by
+ /// on each sampling period.
+ ///
+ /// The assigned node number for this Wolverine instance.
+ /// An immutable metrics snapshot for the completed accumulation window.
public MessageHandlingMetrics TriggerExport(int nodeNumber)
{
lock (_syncLock)
{
var time = DateTimeOffset.UtcNow;
-
- var metrics = new MessageHandlingMetrics(
- nodeNumber,
- MessageType,
+
+ var metrics = new MessageHandlingMetrics(MessageType,
Destination,
new TimeRange(Starting, time),
Counts.PerTenant.OrderBy(x => x.TenantId).Select(x => x.CompileAndReset()).ToArray());
@@ -63,4 +106,4 @@ public MessageHandlingMetrics TriggerExport(int nodeNumber)
return metrics;
}
}
-}
\ No newline at end of file
+}
diff --git a/src/Wolverine/Runtime/Metrics/MetricsAccumulator.cs b/src/Wolverine/Runtime/Metrics/MetricsAccumulator.cs
index 0c2fb29e2..496f96663 100644
--- a/src/Wolverine/Runtime/Metrics/MetricsAccumulator.cs
+++ b/src/Wolverine/Runtime/Metrics/MetricsAccumulator.cs
@@ -5,26 +5,52 @@
namespace Wolverine.Runtime.Metrics;
+///
+/// Top-level coordinator for message handling metrics collection. Maintains an immutable
+/// collection of instances, one per unique
+/// (message type, destination) pair. Runs a background loop that periodically (every
+/// WolverineOptions.Metrics.SamplingPeriod, default 5 seconds) triggers each
+/// accumulator to export its snapshot via .
+/// Only snapshots with at least one tenant data point are published.
+///
// TODO -- make this lazy on WolverineRuntime
public class MetricsAccumulator : IAsyncDisposable
{
private readonly IWolverineRuntime _runtime;
private readonly object _syncLock = new();
-
+
private ImmutableArray _accumulators = ImmutableArray.Empty;
private Task _runner;
+ ///
+ /// Creates a new metrics accumulator bound to the given runtime.
+ ///
+ /// The Wolverine runtime providing cancellation, options, and observer access.
public MetricsAccumulator(IWolverineRuntime runtime)
{
_runtime = runtime;
}
+ ///
+ /// Finds or creates a for the given message type
+ /// and endpoint. Uses lock-free reads with a fallback to a locked creation path.
+ ///
+ /// The fully-qualified CLR message type name.
+ /// The endpoint whose URI identifies the destination.
+ /// The accumulator for the given message type and destination.
public MessageTypeMetricsAccumulator FindAccumulator(string messageTypeName, Endpoint endpoint)
{
var endpointUri = endpoint.Uri;
return FindAccumulator(messageTypeName, endpointUri);
}
+ ///
+ /// Finds or creates a for the given message type
+ /// and destination URI. Uses lock-free reads with a fallback to a locked creation path.
+ ///
+ /// The fully-qualified CLR message type name.
+ /// The destination endpoint URI.
+ /// The accumulator for the given message type and destination.
public MessageTypeMetricsAccumulator FindAccumulator(string messageTypeName, Uri endpointUri)
{
var snapshot = _accumulators;
@@ -55,14 +81,26 @@ public MessageTypeMetricsAccumulator FindAccumulator(string messageTypeName, Uri
}
}
+ ///
+ /// Drains all pending batched metrics data points across all accumulators by waiting
+ /// for their batching pipelines to complete.
+ ///
public async ValueTask DrainAsync()
{
var tasks = _accumulators.Select(x => x.EntryPoint.WaitForCompletionAsync());
await Task.WhenAll(tasks);
}
-
+
+ ///
+ /// The start time of the overall metrics collection. Set when the accumulator is created.
+ ///
public DateTimeOffset From { get; private set; } = DateTimeOffset.UtcNow;
+ ///
+ /// Starts the background export loop. On each SamplingPeriod tick, iterates all
+ /// accumulators, triggers export, and publishes non-empty snapshots to the runtime's
+ /// .
+ ///
public void Start()
{
_runner = Task.Run(async () =>
@@ -74,15 +112,13 @@ public void Start()
await Task.Delay(_runtime.Options.Metrics.SamplingPeriod);
try
{
- var bus = new MessageBus(_runtime);
-
foreach (var accumulator in _accumulators)
{
var metrics = accumulator.TriggerExport(_runtime.DurabilitySettings.AssignedNodeNumber);
-
+
if (metrics.PerTenant.Length > 0)
{
- await bus.PublishAsync(metrics);
+ _runtime.Observer.MessageHandlingMetricsExported(metrics);
}
}
}
@@ -99,9 +135,10 @@ public void Start()
}, _runtime.Cancellation);
}
+ ///
public ValueTask DisposeAsync()
{
_runner.SafeDispose();
return new ValueTask();
}
-}
\ No newline at end of file
+}
diff --git a/src/Wolverine/Runtime/Metrics/PerTenantTracking.cs b/src/Wolverine/Runtime/Metrics/PerTenantTracking.cs
index a15edda3d..f8473be31 100644
--- a/src/Wolverine/Runtime/Metrics/PerTenantTracking.cs
+++ b/src/Wolverine/Runtime/Metrics/PerTenantTracking.cs
@@ -1,23 +1,68 @@
namespace Wolverine.Runtime.Metrics;
+///
+/// Mutable, in-process counters that accumulate raw metrics data for a single tenant within
+/// a instance. records
+/// mutate these counters via . On each sampling period
+/// export, snapshots the counters into an immutable
+/// record and resets all values to zero.
+///
public class PerTenantTracking
{
+ ///
+ /// The tenant identifier this tracking instance accumulates data for.
+ ///
public string TenantId { get; }
+ ///
+ /// Creates a new per-tenant tracking instance.
+ ///
+ /// The tenant identifier.
public PerTenantTracking(string tenantId)
{
TenantId = tenantId;
}
-
+
+ ///
+ /// The number of handler executions recorded. Incremented by .
+ ///
public int Executions { get; set; }
+
+ ///
+ /// The sum of handler execution durations in milliseconds. Incremented by .
+ ///
public long TotalExecutionTime { get; set; }
-
+
+ ///
+ /// The number of messages that completed processing (success or dead-letter).
+ /// Incremented by .
+ ///
public int Completions { get; set; }
+
+ ///
+ /// The sum of effective (end-to-end) times in milliseconds, measured from envelope
+ /// SentAt to processing completion. Incremented by .
+ ///
public double TotalEffectiveTime { get; set; }
+ ///
+ /// Dead-letter counts keyed by fully-qualified exception type name. Incremented
+ /// by .
+ ///
public Dictionary DeadLetterCounts { get; } = new();
+
+ ///
+ /// Failure counts keyed by fully-qualified exception type name. Incremented
+ /// by .
+ ///
public Dictionary Failures { get; } = new();
+ ///
+ /// Snapshots the current counter values into an immutable
+ /// record, then resets all counters to zero. The exception counts are compiled by taking
+ /// the union of exception types across and .
+ ///
+ /// An immutable snapshot of the accumulated metrics for this tenant.
public PerTenantMetrics CompileAndReset()
{
var exceptionTypes = DeadLetterCounts.Keys.Union(Failures.Keys).ToArray();
@@ -36,12 +81,15 @@ public PerTenantMetrics CompileAndReset()
return new ExceptionCounts(exceptionType, failures, deadLetters);
}).ToArray()
);
-
+
Clear();
return response;
}
-
+
+ ///
+ /// Resets all counters and dictionaries to their initial zero state.
+ ///
public void Clear()
{
Executions = 0;
@@ -52,4 +100,4 @@ public void Clear()
Failures.Clear();
}
-}
\ No newline at end of file
+}
diff --git a/src/Wolverine/Runtime/Metrics/RecordDeadLetter.cs b/src/Wolverine/Runtime/Metrics/RecordDeadLetter.cs
index 944383fd4..e1f80835b 100644
--- a/src/Wolverine/Runtime/Metrics/RecordDeadLetter.cs
+++ b/src/Wolverine/Runtime/Metrics/RecordDeadLetter.cs
@@ -1,7 +1,16 @@
namespace Wolverine.Runtime.Metrics;
+///
+/// Records that a message was moved to the dead-letter queue after exhausting all retry
+/// policies. Posted when MovedToErrorQueue or MessageFailed is called.
+/// Increments the dead-letter count for in
+/// .
+///
+/// The fully-qualified CLR type name of the exception that caused the dead letter.
+/// The tenant identifier from the envelope.
public record RecordDeadLetter(string ExceptionType, string TenantId) : IHandlerMetricsData
{
+ ///
public void Apply(PerTenantTracking tracking)
{
if (!tracking.DeadLetterCounts.TryAdd(ExceptionType, 1))
@@ -9,4 +18,4 @@ public void Apply(PerTenantTracking tracking)
tracking.DeadLetterCounts[ExceptionType] += 1;
}
}
-}
\ No newline at end of file
+}
diff --git a/src/Wolverine/Runtime/Metrics/RecordEffectiveTime.cs b/src/Wolverine/Runtime/Metrics/RecordEffectiveTime.cs
index dfda4405d..9dbcf4aef 100644
--- a/src/Wolverine/Runtime/Metrics/RecordEffectiveTime.cs
+++ b/src/Wolverine/Runtime/Metrics/RecordEffectiveTime.cs
@@ -1,10 +1,20 @@
namespace Wolverine.Runtime.Metrics;
+///
+/// Records the effective (end-to-end) time of a message from when it was sent to when processing
+/// completed. Posted on both successful completion and dead-letter outcomes. Calculated as
+/// DateTimeOffset.UtcNow - envelope.SentAt in milliseconds. Increments
+/// by one and adds to
+/// .
+///
+/// The elapsed time in milliseconds from the envelope's SentAt timestamp to now.
+/// The tenant identifier from the envelope.
public record RecordEffectiveTime(double Time, string TenantId) : IHandlerMetricsData
{
+ ///
public void Apply(PerTenantTracking tracking)
{
tracking.Completions++;
tracking.TotalEffectiveTime += Time;
}
-}
\ No newline at end of file
+}
diff --git a/src/Wolverine/Runtime/Metrics/RecordExecutionTime.cs b/src/Wolverine/Runtime/Metrics/RecordExecutionTime.cs
index 6907c9169..8aebf823c 100644
--- a/src/Wolverine/Runtime/Metrics/RecordExecutionTime.cs
+++ b/src/Wolverine/Runtime/Metrics/RecordExecutionTime.cs
@@ -1,10 +1,19 @@
namespace Wolverine.Runtime.Metrics;
+///
+/// Records the wall-clock execution time of a single message handler invocation. Posted when
+/// Envelope.StopTiming() returns a positive value after handler execution completes.
+/// Increments by one and adds
+/// to .
+///
+/// The handler execution duration in milliseconds, from Envelope.StopTiming().
+/// The tenant identifier from the envelope.
public record RecordExecutionTime(long Time, string TenantId) : IHandlerMetricsData
{
+ ///
public void Apply(PerTenantTracking tracking)
{
tracking.Executions++;
tracking.TotalExecutionTime += Time;
}
-}
\ No newline at end of file
+}
diff --git a/src/Wolverine/Runtime/Metrics/RecordFailure.cs b/src/Wolverine/Runtime/Metrics/RecordFailure.cs
index a4804c8c3..b87e5617e 100644
--- a/src/Wolverine/Runtime/Metrics/RecordFailure.cs
+++ b/src/Wolverine/Runtime/Metrics/RecordFailure.cs
@@ -1,7 +1,15 @@
namespace Wolverine.Runtime.Metrics;
+///
+/// Records that a message handler threw an exception during execution. Posted alongside
+/// when the handler faults. Increments the failure count
+/// for in .
+///
+/// The fully-qualified CLR type name of the exception (e.g. "System.InvalidOperationException").
+/// The tenant identifier from the envelope.
public record RecordFailure(string ExceptionType, string TenantId) : IHandlerMetricsData
{
+ ///
public void Apply(PerTenantTracking tracking)
{
if (!tracking.Failures.TryAdd(ExceptionType, 1))
@@ -9,4 +17,4 @@ public void Apply(PerTenantTracking tracking)
tracking.Failures[ExceptionType] += 1;
}
}
-}
\ No newline at end of file
+}
diff --git a/src/Wolverine/Wolverine.csproj b/src/Wolverine/Wolverine.csproj
index ffff864b9..2769426b6 100644
--- a/src/Wolverine/Wolverine.csproj
+++ b/src/Wolverine/Wolverine.csproj
@@ -4,9 +4,6 @@
WolverineFx
-
-
-
@@ -22,6 +19,15 @@
+
+
+
+
+
+
+
+
+
diff --git a/wolverine.sln b/wolverine.sln
index c001e37a4..4ec695cee 100644
--- a/wolverine.sln
+++ b/wolverine.sln
@@ -357,6 +357,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wolverine.CosmosDb", "src\P
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CosmosDbTests", "src\Persistence\CosmosDbTests\CosmosDbTests.csproj", "{E0D51CAE-97CF-48A8-879E-149A4E69BEE2}"
EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{827E0CD3-B72D-47B6-A68D-7590B98EB39B}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Persistence", "Persistence", "{3BA5312F-4CB7-F189-D5EC-6A9485924027}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EfCoreTests.MultiTenancy", "src\Persistence\EfCoreTests.MultiTenancy\EfCoreTests.MultiTenancy.csproj", "{ACB9EEA0-A545-4D02-A040-B1AE3CEF83ED}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -1987,6 +1993,18 @@ Global
{E0D51CAE-97CF-48A8-879E-149A4E69BEE2}.Release|x64.Build.0 = Release|Any CPU
{E0D51CAE-97CF-48A8-879E-149A4E69BEE2}.Release|x86.ActiveCfg = Release|Any CPU
{E0D51CAE-97CF-48A8-879E-149A4E69BEE2}.Release|x86.Build.0 = Release|Any CPU
+ {ACB9EEA0-A545-4D02-A040-B1AE3CEF83ED}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {ACB9EEA0-A545-4D02-A040-B1AE3CEF83ED}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {ACB9EEA0-A545-4D02-A040-B1AE3CEF83ED}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {ACB9EEA0-A545-4D02-A040-B1AE3CEF83ED}.Debug|x64.Build.0 = Debug|Any CPU
+ {ACB9EEA0-A545-4D02-A040-B1AE3CEF83ED}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {ACB9EEA0-A545-4D02-A040-B1AE3CEF83ED}.Debug|x86.Build.0 = Debug|Any CPU
+ {ACB9EEA0-A545-4D02-A040-B1AE3CEF83ED}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {ACB9EEA0-A545-4D02-A040-B1AE3CEF83ED}.Release|Any CPU.Build.0 = Release|Any CPU
+ {ACB9EEA0-A545-4D02-A040-B1AE3CEF83ED}.Release|x64.ActiveCfg = Release|Any CPU
+ {ACB9EEA0-A545-4D02-A040-B1AE3CEF83ED}.Release|x64.Build.0 = Release|Any CPU
+ {ACB9EEA0-A545-4D02-A040-B1AE3CEF83ED}.Release|x86.ActiveCfg = Release|Any CPU
+ {ACB9EEA0-A545-4D02-A040-B1AE3CEF83ED}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -2151,6 +2169,8 @@ Global
{68B94BE1-185D-D133-8A8C-EFE0C95F2BC7} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210}
{9DBA7EBE-C6E6-4F26-87E8-D87A6CDDE737} = {68B94BE1-185D-D133-8A8C-EFE0C95F2BC7}
{E0D51CAE-97CF-48A8-879E-149A4E69BEE2} = {68B94BE1-185D-D133-8A8C-EFE0C95F2BC7}
+ {3BA5312F-4CB7-F189-D5EC-6A9485924027} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B}
+ {ACB9EEA0-A545-4D02-A040-B1AE3CEF83ED} = {3BA5312F-4CB7-F189-D5EC-6A9485924027}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {30422362-0D90-4DBE-8C97-DD2B5B962768}