diff --git a/Directory.Packages.props b/Directory.Packages.props index 7472d9890a..0b5b1b4902 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -60,8 +60,8 @@ - - + + diff --git a/docs/configuration/multitenancy.md b/docs/configuration/multitenancy.md index 05acadc58f..c68394fe66 100644 --- a/docs/configuration/multitenancy.md +++ b/docs/configuration/multitenancy.md @@ -237,6 +237,214 @@ public async Task add_tenant_database_and_verify_the_daemon_projections_are_runn At runtime, if the Marten V7 version of the async daemon (our sub system for building asynchronous projections constantly in a background IHostedService) is constantly doing “health checks” to make sure that *some process* is running all known asynchronous projections on all known client databases. Long story, short, Marten 7 is able to detect new tenant databases and spin up the asynchronous projection handling for these new tenants with zero downtime. +## Sharded Multi-Tenancy with Database Pooling + +::: tip +This strategy was designed for extreme scalability scenarios targeting hundreds of billions of events across +many tenants. It combines database-level sharding, conjoined tenancy, and native PostgreSQL list partitioning +into a single cohesive multi-tenancy model. +::: + +For systems with very large numbers of tenants and massive data volumes, Marten provides a sharded tenancy model +that distributes tenants across a **pool of databases**. Within each database, tenant data is physically isolated +using native PostgreSQL LIST partitioning by tenant ID, while Marten's conjoined tenancy handles the query-level +filtering. + +This approach gives you: + +* **Horizontal scaling** — spread data across many databases to distribute I/O and storage +* **Physical tenant isolation** — each tenant has its own PostgreSQL partitions for both document and event tables +* **Dynamic tenant routing** — new tenants are automatically assigned to databases based on a pluggable strategy +* **Runtime expandability** — add new databases to the pool without downtime + +### Configuration + +```csharp +var builder = Host.CreateApplicationBuilder(); +builder.Services.AddMarten(opts => +{ + opts.MultiTenantedWithShardedDatabases(x => + { + // Connection to the master database that holds the pool registry + x.ConnectionString = masterConnectionString; + + // Schema for the registry tables in the master database + x.SchemaName = "tenants"; + + // Schema for the partition tracking table within each tenant database + x.PartitionSchemaName = "partitions"; + + // Seed the database pool on startup + x.AddDatabase("shard_01", shard1ConnectionString); + x.AddDatabase("shard_02", shard2ConnectionString); + x.AddDatabase("shard_03", shard3ConnectionString); + x.AddDatabase("shard_04", shard4ConnectionString); + + // Choose a tenant assignment strategy (see below) + x.UseHashAssignment(); // this is the default + }); +}); +``` + +Calling `MultiTenantedWithShardedDatabases()` automatically enables: + +* `Policies.AllDocumentsAreMultiTenanted()` — all document types use conjoined tenancy +* `Events.TenancyStyle = TenancyStyle.Conjoined` — events are partitioned by tenant +* `Policies.PartitionMultiTenantedDocumentsUsingMartenManagement()` — native PG list partitions are created per tenant + +### Tenant Assignment Strategies + +When a previously unknown tenant ID is encountered, Marten needs to decide which database in the pool +should host that tenant. Three built-in strategies are available, and you can provide your own. + +#### Hash Assignment (Default) + +```csharp +x.UseHashAssignment(); +``` + +Uses a deterministic FNV-1a hash of the tenant ID modulo the number of available (non-full) databases. +This is the fastest strategy and requires no database queries to make the assignment decision. The same +tenant ID will always hash to the same database, making it predictable and debuggable. + +Best for: systems where tenants are roughly equal in size and you want even distribution without any +management overhead. + +#### Smallest Database Assignment + +```csharp +x.UseSmallestDatabaseAssignment(); + +// Or with a custom sizing strategy +x.UseSmallestDatabaseAssignment(new MyCustomSizingStrategy()); +``` + +Assigns new tenants to the database with the fewest existing tenants. By default, "smallest" is +determined by the `tenant_count` column in the pool registry table. You can provide a custom +`IDatabaseSizingStrategy` implementation that queries actual row counts, disk usage, or any other +metric to determine database capacity. + +```csharp +public interface IDatabaseSizingStrategy +{ + ValueTask FindSmallestDatabaseAsync( + IReadOnlyList databases); +} +``` + +Best for: systems where tenants vary significantly in size and you want to balance load more carefully. + +#### Explicit Assignment + +```csharp +x.UseExplicitAssignment(); +``` + +Requires all tenants to be pre-assigned to a database via the admin API before they can be used. +Any attempt to use an unrecognized tenant ID throws an `UnknownTenantIdException`. This gives you +complete control over tenant placement at the cost of requiring an upfront registration step. + +Best for: regulated environments where tenant placement must be deliberate, or when you need to +co-locate related tenants in the same database. + +#### Custom Strategy + +```csharp +x.UseCustomAssignment(new MyStrategy()); +``` + +Implement the `ITenantAssignmentStrategy` interface from `Weasel.Core.MultiTenancy`: + +```csharp +public interface ITenantAssignmentStrategy +{ + ValueTask AssignTenantToDatabaseAsync( + string tenantId, + IReadOnlyList availableDatabases); +} +``` + +The strategy is called under a PostgreSQL advisory lock, so it does not need to handle concurrency +itself. The `availableDatabases` list only includes databases that are not marked as full. + +### Database Registry Tables + +The sharded tenancy model uses two tables in the master database to track the pool and tenant assignments: + +**`mt_database_pool`** — registry of all databases in the pool: + +| Column | Type | Description | +| ------ | ---- | ----------- | +| `database_id` | `VARCHAR` (PK) | Unique identifier for the database | +| `connection_string` | `VARCHAR NOT NULL` | PostgreSQL connection string | +| `is_full` | `BOOLEAN NOT NULL DEFAULT false` | When true, no new tenants are assigned here | +| `tenant_count` | `INTEGER NOT NULL DEFAULT 0` | Number of tenants currently assigned | + +**`mt_tenant_assignments`** — maps each tenant to its assigned database: + +| Column | Type | Description | +| ------ | ---- | ----------- | +| `tenant_id` | `VARCHAR` (PK) | The tenant identifier | +| `database_id` | `VARCHAR NOT NULL` (FK) | References `mt_database_pool.database_id` | +| `assigned_at` | `TIMESTAMPTZ NOT NULL DEFAULT now()` | When the assignment was made | + +These tables are created automatically when `AutoCreateSchemaObjects` is enabled. + +### Admin API + +Marten provides an admin API on `IDocumentStore.Advanced` for managing the database pool and tenant +assignments at runtime. All mutating operations acquire a PostgreSQL advisory lock on the master +database to prevent concurrent corruption. + +#### Adding Tenants + +```csharp +// Auto-assign a tenant using the configured strategy +// Returns the database_id the tenant was assigned to +var dbId = await store.Advanced.AddTenantToShardAsync("new-tenant", ct); + +// Explicitly assign a tenant to a specific database +await store.Advanced.AddTenantToShardAsync("vip-tenant", "shard_01", ct); +``` + +When a tenant is assigned, Marten automatically creates native PostgreSQL LIST partitions for that +tenant in the target database across all multi-tenanted document tables and event tables. + +#### Managing the Database Pool + +```csharp +// Add a new database to the pool at runtime +await store.Advanced.AddDatabaseToPoolAsync("shard_05", newConnectionString, ct); + +// Mark a database as full — no new tenants will be assigned to it +await store.Advanced.MarkDatabaseFullAsync("shard_01", ct); +``` + +Marking a database as full is useful when a database is approaching capacity limits. Existing tenants +in that database continue to work normally, but all new tenant assignments will go to other databases. + +#### Implicit Assignment + +If you are using the hash or smallest strategy, you do not need to explicitly add tenants. When a +session is opened for an unknown tenant ID, Marten will automatically: + +1. Acquire an advisory lock on the master database +2. Check if another process already assigned the tenant (double-check after lock) +3. Run the assignment strategy to pick a database +4. Write the assignment to `mt_tenant_assignments` +5. Create list partitions in the target database +6. Release the lock and return the session + +This means your application code can simply use `store.LightweightSession("any-tenant-id")` and +Marten handles the rest. + +### Async Daemon Support + +The async daemon automatically discovers all databases in the pool through `BuildDatabases()` and +runs asynchronous projections across all of them. When new databases or tenants are added at runtime, +the daemon's periodic health check picks them up and starts projection processing without any +downtime or reconfiguration. + ## Dynamically applying changes to tenants databases If you didn't call the `ApplyAllDatabaseChangesOnStartup` method, Marten would still try to create a database [upon the session creation](/documents/sessions). This action is invasive and can cause issues like timeouts, cold starts, or deadlocks. It also won't apply all defined changes upfront (so, e.g. [indexes](/documents/indexing/), [custom schema extensions](/schema/extensions)). diff --git a/src/Marten/AdvancedOperations.cs b/src/Marten/AdvancedOperations.cs index 4d2606f7b9..4ff6afdae4 100644 --- a/src/Marten/AdvancedOperations.cs +++ b/src/Marten/AdvancedOperations.cs @@ -418,6 +418,61 @@ public Task DeleteAllTenantDataAsync(string tenantId, CancellationToken token) return cleaner.ExecuteAsync(token); } + /// + /// Auto-assign a tenant to a database using the configured assignment strategy, + /// then create list partitions in the target database. Only available with sharded tenancy. + /// + /// The database_id the tenant was assigned to + public async Task AddTenantToShardAsync(string tenantId, CancellationToken ct) + { + var sharded = _store.Options.Tenancy as ShardedTenancy + ?? throw new InvalidOperationException( + "AddTenantToShardAsync is only available when using MultiTenantedWithShardedDatabases()"); + + await sharded.GetTenantAsync(tenantId).ConfigureAwait(false); + return await sharded.FindDatabaseForTenantAsync(tenantId, ct).ConfigureAwait(false) + ?? throw new InvalidOperationException($"Tenant '{tenantId}' was not assigned to any database"); + } + + /// + /// Explicitly assign a tenant to a specific database in the pool, + /// then create list partitions in the target database. Only available with sharded tenancy. + /// + public async Task AddTenantToShardAsync(string tenantId, string databaseId, CancellationToken ct) + { + var sharded = _store.Options.Tenancy as ShardedTenancy + ?? throw new InvalidOperationException( + "AddTenantToShardAsync is only available when using MultiTenantedWithShardedDatabases()"); + + await sharded.AssignTenantAsync(tenantId, databaseId, ct).ConfigureAwait(false); + } + + /// + /// Add a new database to the sharded tenancy pool at runtime. + /// Only available with sharded tenancy. + /// + public async Task AddDatabaseToPoolAsync(string databaseId, string connectionString, CancellationToken ct) + { + var sharded = _store.Options.Tenancy as ShardedTenancy + ?? throw new InvalidOperationException( + "AddDatabaseToPoolAsync is only available when using MultiTenantedWithShardedDatabases()"); + + await sharded.AddDatabaseAsync(databaseId, connectionString, ct).ConfigureAwait(false); + } + + /// + /// Mark a database as full so no new tenants will be assigned to it. + /// Only available with sharded tenancy. + /// + public async Task MarkDatabaseFullAsync(string databaseId, CancellationToken ct) + { + var sharded = _store.Options.Tenancy as ShardedTenancy + ?? throw new InvalidOperationException( + "MarkDatabaseFullAsync is only available when using MultiTenantedWithShardedDatabases()"); + + await sharded.MarkDatabaseFullAsync(databaseId, ct).ConfigureAwait(false); + } + /// /// Configure and execute a batch masking of protected data for a subset of the events /// in the event store diff --git a/src/Marten/Storage/ShardedTenancy.cs b/src/Marten/Storage/ShardedTenancy.cs new file mode 100644 index 0000000000..f1a7368e15 --- /dev/null +++ b/src/Marten/Storage/ShardedTenancy.cs @@ -0,0 +1,569 @@ +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using ImTools; +using JasperFx; +using JasperFx.Core; +using JasperFx.Descriptors; +using JasperFx.MultiTenancy; +using Marten.Schema; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Npgsql; +using Weasel.Core; +using Weasel.Core.Migrations; +using Weasel.Core.MultiTenancy; +using Weasel.Postgresql; +using Weasel.Postgresql.Tables; +using Weasel.Postgresql.Tables.Partitioning; +using Table = Weasel.Postgresql.Tables.Table; + +namespace Marten.Storage; + +/// +/// Multi-tenancy implementation that distributes tenants across a pool of databases +/// with conjoined tenancy and native PG list partitioning per tenant within each database. +/// +public class ShardedTenancy : ITenancy, ITenancyWithMasterDatabase, ITenantDatabasePool +{ + private readonly StoreOptions _options; + private readonly ShardedTenancyOptions _configuration; + private readonly Lazy _dataSource; + private readonly Lazy _poolDatabase; + private readonly string _schemaName; + + // tenant_id -> (MartenDatabase, databaseId) + private ImHashMap _tenantToDatabase = ImHashMap.Empty; + // database_id -> MartenDatabase + private ImHashMap _databasesById = ImHashMap.Empty; + + private bool _hasAppliedChanges; + private bool _hasSeeded; + + // Advisory lock key for tenant assignment serialization + private const int AdvisoryLockKey = 4173_0001; // Unique to sharded tenancy + + public ShardedTenancy(StoreOptions options, ShardedTenancyOptions configuration) + { + _options = options; + _configuration = configuration; + _schemaName = configuration.SchemaName; + + if (configuration.DataSource != null) + { + _dataSource = new Lazy(() => configuration.DataSource); + } + else if (configuration.ConnectionString.IsNotEmpty()) + { + _dataSource = new Lazy(() => + _options.NpgsqlDataSourceFactory.Create(configuration.ConnectionString)); + } + else + { + throw new ArgumentException( + "Either a ConnectionString or DataSource must be provided for ShardedTenancy"); + } + + Cleaner = new CompositeDocumentCleaner(this, _options); + + _poolDatabase = new Lazy(() => + new PoolLookupDatabase(_options, _dataSource.Value, _schemaName)); + } + + public void Dispose() + { + foreach (var entry in _databasesById.Enumerate()) entry.Value.Dispose(); + + if (_dataSource.IsValueCreated) + { + _dataSource.Value.Dispose(); + } + } + + #region ITenancy + + public Tenant Default => throw new NotSupportedException( + "Default tenant is not supported with sharded multi-tenancy. All operations require a tenant ID."); + + public IDocumentCleaner Cleaner { get; } + + public Tenant GetTenant(string tenantId) + { + tenantId = _options.TenantIdStyle.MaybeCorrectTenantId(tenantId); + if (_tenantToDatabase.TryFind(tenantId, out var database)) + { + return new Tenant(tenantId, database); + } + + database = findOrAssignTenantDatabaseAsync(tenantId).GetAwaiter().GetResult(); + return new Tenant(tenantId, database); + } + + public async ValueTask GetTenantAsync(string tenantId) + { + tenantId = _options.TenantIdStyle.MaybeCorrectTenantId(tenantId); + if (_tenantToDatabase.TryFind(tenantId, out var database)) + { + return new Tenant(tenantId, database); + } + + database = await findOrAssignTenantDatabaseAsync(tenantId).ConfigureAwait(false); + return new Tenant(tenantId, database); + } + + public async ValueTask FindOrCreateDatabase(string tenantIdOrDatabaseIdentifier) + { + tenantIdOrDatabaseIdentifier = _options.TenantIdStyle.MaybeCorrectTenantId(tenantIdOrDatabaseIdentifier); + + // Try tenant lookup first + if (_tenantToDatabase.TryFind(tenantIdOrDatabaseIdentifier, out var database)) + { + return database; + } + + // Try database id lookup + if (_databasesById.TryFind(tenantIdOrDatabaseIdentifier, out database)) + { + return database; + } + + return await findOrAssignTenantDatabaseAsync(tenantIdOrDatabaseIdentifier).ConfigureAwait(false); + } + + public async ValueTask FindDatabase(DatabaseId id) + { + var database = _databasesById.Enumerate().Select(x => x.Value).FirstOrDefault(x => x.Id == id); + if (database != null) return database; + + await BuildDatabases().ConfigureAwait(false); + + database = _databasesById.Enumerate().Select(x => x.Value).FirstOrDefault(x => x.Id == id); + if (database == null) + { + throw new ArgumentOutOfRangeException(nameof(id), $"Requested database {id.Identity} cannot be found"); + } + + return database; + } + + public bool IsTenantStoredInCurrentDatabase(IMartenDatabase database, string tenantId) + { + tenantId = _options.TenantIdStyle.MaybeCorrectTenantId(tenantId); + if (_tenantToDatabase.TryFind(tenantId, out var assignedDb)) + { + return assignedDb.Id == database.Id; + } + return false; + } + + public async ValueTask> BuildDatabases() + { + await maybeApplyChanges().ConfigureAwait(false); + await maybeSeedDatabases().ConfigureAwait(false); + + await using var conn = _dataSource.Value.CreateConnection(); + await conn.OpenAsync().ConfigureAwait(false); + + try + { + // Load all databases from pool + await using var poolReader = await ((DbCommand)conn + .CreateCommand($"select database_id, connection_string, is_full, tenant_count from {_schemaName}.{DatabasePoolTable.TableName}")) + .ExecuteReaderAsync().ConfigureAwait(false); + + while (await poolReader.ReadAsync().ConfigureAwait(false)) + { + var databaseId = await poolReader.GetFieldValueAsync(0).ConfigureAwait(false); + + if (_databasesById.TryFind(databaseId, out _)) continue; + + var connectionString = await poolReader.GetFieldValueAsync(1).ConfigureAwait(false); + connectionString = _configuration.CorrectedConnectionString(connectionString); + + var database = new MartenDatabase(_options, + _options.NpgsqlDataSourceFactory.Create(connectionString), databaseId); + + _databasesById = _databasesById.AddOrUpdate(databaseId, database); + } + + await poolReader.CloseAsync().ConfigureAwait(false); + + // Load all tenant assignments + await using var assignReader = await ((DbCommand)conn + .CreateCommand($"select tenant_id, database_id from {_schemaName}.{TenantAssignmentTable.TableName}")) + .ExecuteReaderAsync().ConfigureAwait(false); + + while (await assignReader.ReadAsync().ConfigureAwait(false)) + { + var tenantId = await assignReader.GetFieldValueAsync(0).ConfigureAwait(false); + tenantId = _options.TenantIdStyle.MaybeCorrectTenantId(tenantId); + var databaseId = await assignReader.GetFieldValueAsync(1).ConfigureAwait(false); + + if (_databasesById.TryFind(databaseId, out var database)) + { + database.TenantIds.Fill(tenantId); + _tenantToDatabase = _tenantToDatabase.AddOrUpdate(tenantId, database); + } + } + + await assignReader.CloseAsync().ConfigureAwait(false); + } + finally + { + await conn.CloseAsync().ConfigureAwait(false); + } + + var list = _databasesById.Enumerate().Select(x => x.Value).OfType().ToList(); + list.Insert(0, _poolDatabase.Value); + return list; + } + + #endregion + + #region ITenancyWithMasterDatabase + + public PostgresqlDatabase TenantDatabase => _poolDatabase.Value; + + #endregion + + #region ITenantDatabasePool + + public async ValueTask> ListDatabasesAsync(CancellationToken ct) + { + await maybeApplyChanges().ConfigureAwait(false); + await maybeSeedDatabases().ConfigureAwait(false); + + var list = new List(); + + await using var conn = _dataSource.Value.CreateConnection(); + await conn.OpenAsync(ct).ConfigureAwait(false); + + await using var reader = await ((DbCommand)conn + .CreateCommand($"select database_id, connection_string, is_full, tenant_count from {_schemaName}.{DatabasePoolTable.TableName}")) + .ExecuteReaderAsync(ct).ConfigureAwait(false); + + while (await reader.ReadAsync(ct).ConfigureAwait(false)) + { + list.Add(new PooledDatabase( + await reader.GetFieldValueAsync(0, ct).ConfigureAwait(false), + await reader.GetFieldValueAsync(1, ct).ConfigureAwait(false), + await reader.GetFieldValueAsync(2, ct).ConfigureAwait(false), + await reader.GetFieldValueAsync(3, ct).ConfigureAwait(false) + )); + } + + await conn.CloseAsync().ConfigureAwait(false); + return list; + } + + public async ValueTask AddDatabaseAsync(string databaseId, string connectionString, CancellationToken ct) + { + await maybeApplyChanges().ConfigureAwait(false); + + await _dataSource.Value + .CreateCommand( + $"insert into {_schemaName}.{DatabasePoolTable.TableName} (database_id, connection_string) values (:id, :conn) on conflict (database_id) do update set connection_string = :conn") + .With("id", databaseId) + .With("conn", connectionString) + .ExecuteNonQueryAsync(ct).ConfigureAwait(false); + + // Create and cache the MartenDatabase + var corrected = _configuration.CorrectedConnectionString(connectionString); + var database = new MartenDatabase(_options, + _options.NpgsqlDataSourceFactory.Create(corrected), databaseId); + _databasesById = _databasesById.AddOrUpdate(databaseId, database); + } + + public async ValueTask MarkDatabaseFullAsync(string databaseId, CancellationToken ct) + { + await maybeApplyChanges().ConfigureAwait(false); + await maybeSeedDatabases().ConfigureAwait(false); + + await _dataSource.Value + .CreateCommand( + $"update {_schemaName}.{DatabasePoolTable.TableName} set is_full = true where database_id = :id") + .With("id", databaseId) + .ExecuteNonQueryAsync(ct).ConfigureAwait(false); + } + + public async ValueTask FindDatabaseForTenantAsync(string tenantId, CancellationToken ct) + { + tenantId = _options.TenantIdStyle.MaybeCorrectTenantId(tenantId); + await maybeApplyChanges().ConfigureAwait(false); + + var result = await _dataSource.Value + .CreateCommand( + $"select database_id from {_schemaName}.{TenantAssignmentTable.TableName} where tenant_id = :id") + .With("id", tenantId) + .ExecuteScalarAsync(ct).ConfigureAwait(false); + + return result as string; + } + + public async ValueTask AssignTenantAsync(string tenantId, string databaseId, CancellationToken ct) + { + tenantId = _options.TenantIdStyle.MaybeCorrectTenantId(tenantId); + await maybeApplyChanges().ConfigureAwait(false); + await maybeSeedDatabases().ConfigureAwait(false); + + await using var conn = _dataSource.Value.CreateConnection(); + await conn.OpenAsync(ct).ConfigureAwait(false); + + // Acquire advisory lock + await conn.CreateCommand($"select pg_advisory_lock({AdvisoryLockKey})") + .ExecuteNonQueryAsync(ct).ConfigureAwait(false); + + try + { + await conn.CreateCommand( + $"insert into {_schemaName}.{TenantAssignmentTable.TableName} (tenant_id, database_id) values (:tid, :did) on conflict (tenant_id) do update set database_id = :did") + .With("tid", tenantId) + .With("did", databaseId) + .ExecuteNonQueryAsync(ct).ConfigureAwait(false); + + await conn.CreateCommand( + $"update {_schemaName}.{DatabasePoolTable.TableName} set tenant_count = (select count(*) from {_schemaName}.{TenantAssignmentTable.TableName} where database_id = :did) where database_id = :did") + .With("did", databaseId) + .ExecuteNonQueryAsync(ct).ConfigureAwait(false); + + // Create partition in the target database + if (_databasesById.TryFind(databaseId, out var database)) + { + database.TenantIds.Fill(tenantId); + _tenantToDatabase = _tenantToDatabase.AddOrUpdate(tenantId, database); + + await createPartitionsForTenant(database, tenantId, ct).ConfigureAwait(false); + } + } + finally + { + await conn.CreateCommand($"select pg_advisory_unlock({AdvisoryLockKey})") + .ExecuteNonQueryAsync(ct).ConfigureAwait(false); + await conn.CloseAsync().ConfigureAwait(false); + } + } + + public async ValueTask RemoveTenantAsync(string tenantId, CancellationToken ct) + { + tenantId = _options.TenantIdStyle.MaybeCorrectTenantId(tenantId); + await maybeApplyChanges().ConfigureAwait(false); + + await _dataSource.Value + .CreateCommand( + $"delete from {_schemaName}.{TenantAssignmentTable.TableName} where tenant_id = :id") + .With("id", tenantId) + .ExecuteNonQueryAsync(ct).ConfigureAwait(false); + } + + #endregion + +#pragma warning disable MA0032 + #region Internals + + private async Task findOrAssignTenantDatabaseAsync(string tenantId) + { + await maybeApplyChanges().ConfigureAwait(false); + await maybeSeedDatabases().ConfigureAwait(false); + + // Step 1: Check assignment table + var databaseId = await FindDatabaseForTenantAsync(tenantId, CancellationToken.None) + .ConfigureAwait(false); + + if (databaseId != null && _databasesById.TryFind(databaseId, out var database)) + { + database.TenantIds.Fill(tenantId); + _tenantToDatabase = _tenantToDatabase.AddOrUpdate(tenantId, database); + return database; + } + + // Step 2: Auto-assign under advisory lock + await using var conn = _dataSource.Value.CreateConnection(); + await conn.OpenAsync().ConfigureAwait(false); + + await conn.CreateCommand($"select pg_advisory_lock({AdvisoryLockKey})") + .ExecuteNonQueryAsync(CancellationToken.None).ConfigureAwait(false); + + try + { + // Double-check after acquiring lock + var existingDbId = (string?)await conn + .CreateCommand( + $"select database_id from {_schemaName}.{TenantAssignmentTable.TableName} where tenant_id = :id") + .With("id", tenantId) + .ExecuteScalarAsync(CancellationToken.None).ConfigureAwait(false); + + if (existingDbId != null && _databasesById.TryFind(existingDbId, out database)) + { + database.TenantIds.Fill(tenantId); + _tenantToDatabase = _tenantToDatabase.AddOrUpdate(tenantId, database); + return database; + } + + // Get available databases + var availableDatabases = new List(); + await using var reader = await ((DbCommand)conn + .CreateCommand( + $"select database_id, connection_string, is_full, tenant_count from {_schemaName}.{DatabasePoolTable.TableName} where is_full = false")) + .ExecuteReaderAsync(CancellationToken.None).ConfigureAwait(false); + + while (await reader.ReadAsync().ConfigureAwait(false)) + { + availableDatabases.Add(new PooledDatabase( + await reader.GetFieldValueAsync(0).ConfigureAwait(false), + await reader.GetFieldValueAsync(1).ConfigureAwait(false), + false, + await reader.GetFieldValueAsync(3).ConfigureAwait(false) + )); + } + + await reader.CloseAsync().ConfigureAwait(false); + + // Run assignment strategy + var assignedDbId = await _configuration.AssignmentStrategy + .AssignTenantToDatabaseAsync(tenantId, availableDatabases).ConfigureAwait(false); + + // Write assignment + await conn.CreateCommand( + $"insert into {_schemaName}.{TenantAssignmentTable.TableName} (tenant_id, database_id) values (:tid, :did) on conflict (tenant_id) do update set database_id = :did") + .With("tid", tenantId) + .With("did", assignedDbId) + .ExecuteNonQueryAsync(CancellationToken.None).ConfigureAwait(false); + + // Update tenant count + await conn.CreateCommand( + $"update {_schemaName}.{DatabasePoolTable.TableName} set tenant_count = tenant_count + 1 where database_id = :did") + .With("did", assignedDbId) + .ExecuteNonQueryAsync(CancellationToken.None).ConfigureAwait(false); + + // Ensure database is in cache + if (!_databasesById.TryFind(assignedDbId, out database)) + { + // Need to build it from the pool + await BuildDatabases().ConfigureAwait(false); + if (!_databasesById.TryFind(assignedDbId, out database)) + { + throw new InvalidOperationException( + $"Database '{assignedDbId}' was assigned but could not be found in the pool"); + } + } + + database.TenantIds.Fill(tenantId); + _tenantToDatabase = _tenantToDatabase.AddOrUpdate(tenantId, database); + + // Create partitions in the target database + await createPartitionsForTenant(database, tenantId, CancellationToken.None).ConfigureAwait(false); + + return database; + } + finally + { + await conn.CreateCommand($"select pg_advisory_unlock({AdvisoryLockKey})") + .ExecuteNonQueryAsync(CancellationToken.None).ConfigureAwait(false); + await conn.CloseAsync().ConfigureAwait(false); + } + } + + private async Task createPartitionsForTenant(MartenDatabase database, string tenantId, CancellationToken ct) + { + if (_options.TenantPartitions == null) return; + + var partitions = _options.TenantPartitions.Partitions; + var dict = new Dictionary { { tenantId, tenantId } }; + + await partitions.AddPartitionToAllTables( + NullLogger.Instance, database, dict, ct).ConfigureAwait(false); + } + + private async Task maybeApplyChanges() + { + if (!_hasAppliedChanges && + (_configuration.AutoCreate ?? _options.AutoCreateSchemaObjects) != AutoCreate.None) + { + await _poolDatabase.Value + .ApplyAllConfiguredChangesToDatabaseAsync(_options.AutoCreateSchemaObjects) + .ConfigureAwait(false); + _hasAppliedChanges = true; + } + } + + private async Task maybeSeedDatabases() + { + if (_hasSeeded || _configuration.SeedDatabases.Count == 0) return; + + foreach (var (databaseId, connectionString) in _configuration.SeedDatabases) + { + await AddDatabaseAsync(databaseId, connectionString, CancellationToken.None) + .ConfigureAwait(false); + } + + _hasSeeded = true; + } + +#pragma warning restore MA0032 + #endregion + + #region Descriptors + + public DatabaseCardinality Cardinality => DatabaseCardinality.DynamicMultiple; + + public async ValueTask DescribeDatabasesAsync(CancellationToken token) + { + await BuildDatabases().ConfigureAwait(false); + + var list = _databasesById.Enumerate().Select(pair => + { + var descriptor = pair.Value.Describe(); + descriptor.TenantIds.AddRange(pair.Value.TenantIds); + return descriptor; + }).ToList(); + + return new DatabaseUsage + { + Cardinality = DatabaseCardinality.DynamicMultiple, + Databases = list + }; + } + + #endregion + + #region Inner classes + + internal class PoolLookupDatabase : PostgresqlDatabase + { + private readonly PoolFeatureSchema _feature; + + public PoolLookupDatabase(StoreOptions options, NpgsqlDataSource dataSource, string schemaName) + : base(options, options.AutoCreateSchemaObjects, options.Advanced.Migrator, + "ShardedTenancyPool", dataSource) + { + _feature = new PoolFeatureSchema(schemaName, options); + } + + public override IFeatureSchema[] BuildFeatureSchemas() + { + return [_feature]; + } + } + + internal class PoolFeatureSchema : FeatureSchemaBase + { + private readonly string _schemaName; + + public PoolFeatureSchema(string schemaName, StoreOptions options) + : base("ShardedTenancyPool", options.Advanced.Migrator) + { + _schemaName = schemaName; + } + + protected override IEnumerable schemaObjects() + { + yield return new DatabasePoolTable(_schemaName); + yield return new TenantAssignmentTable(_schemaName); + } + } + + #endregion +} diff --git a/src/Marten/Storage/ShardedTenancyOptions.cs b/src/Marten/Storage/ShardedTenancyOptions.cs new file mode 100644 index 0000000000..2f8616aa2a --- /dev/null +++ b/src/Marten/Storage/ShardedTenancyOptions.cs @@ -0,0 +1,115 @@ +using System; +using System.Collections.Generic; +using JasperFx; +using Npgsql; +using Weasel.Core.MultiTenancy; + +namespace Marten.Storage; + +/// +/// Configuration options for sharded multi-tenancy where tenants are distributed +/// across multiple databases with native PG list partitioning per tenant within each database. +/// +public class ShardedTenancyOptions +{ + /// + /// Connection string to the master database that holds the pool registry + /// and tenant assignment tables. + /// + public string? ConnectionString { get; set; } + + /// + /// Pre-configured NpgsqlDataSource for the master database. + /// Takes precedence over ConnectionString if set. + /// + public NpgsqlDataSource? DataSource { get; set; } + + /// + /// Schema name for the pool registry and assignment tables in the master database. + /// Defaults to "public". + /// + public string SchemaName { get; set; } = "public"; + + /// + /// Schema name for the mt_tenant_partitions table within each tenant database. + /// Defaults to "tenants". + /// + public string PartitionSchemaName { get; set; } = "tenants"; + + /// + /// Override the AutoCreate setting for the master database tables. + /// If null, uses the store's AutoCreateSchemaObjects setting. + /// + public AutoCreate? AutoCreate { get; set; } + + /// + /// Application name tag for diagnostics in connection strings. + /// + public string? ApplicationName { get; set; } + + /// + /// The strategy used to assign new tenants to databases. + /// Defaults to . + /// + public ITenantAssignmentStrategy AssignmentStrategy { get; set; } = new HashTenantAssignment(); + + private readonly List<(string DatabaseId, string ConnectionString)> _seedDatabases = new(); + + /// + /// Seed databases that are registered in the pool on startup. + /// + public IReadOnlyList<(string DatabaseId, string ConnectionString)> SeedDatabases => _seedDatabases; + + /// + /// Register a database in the pool at startup. + /// + public void AddDatabase(string databaseId, string connectionString) + { + _seedDatabases.Add((databaseId, connectionString)); + } + + /// + /// Use hash-based tenant assignment (deterministic, FNV-1a hash % N). + /// This is the default. + /// + public void UseHashAssignment() + { + AssignmentStrategy = new HashTenantAssignment(); + } + + /// + /// Use smallest-database tenant assignment (picks database with lowest tenant count). + /// + public void UseSmallestDatabaseAssignment(IDatabaseSizingStrategy? sizing = null) + { + AssignmentStrategy = new SmallestTenantAssignment(sizing); + } + + /// + /// Use explicit-only tenant assignment. Unknown tenants throw UnknownTenantIdException. + /// All tenants must be pre-assigned via the admin API. + /// + public void UseExplicitAssignment() + { + AssignmentStrategy = new ExplicitTenantAssignment(); + } + + /// + /// Use a custom tenant assignment strategy. + /// + public void UseCustomAssignment(ITenantAssignmentStrategy strategy) + { + AssignmentStrategy = strategy ?? throw new ArgumentNullException(nameof(strategy)); + } + + internal string CorrectedConnectionString(string connectionString) + { + if (ApplicationName == null) return connectionString; + + var builder = new NpgsqlConnectionStringBuilder(connectionString) + { + ApplicationName = ApplicationName + }; + return builder.ConnectionString; + } +} diff --git a/src/Marten/StoreOptions.cs b/src/Marten/StoreOptions.cs index 76d57de7da..184cfc345d 100644 --- a/src/Marten/StoreOptions.cs +++ b/src/Marten/StoreOptions.cs @@ -980,6 +980,31 @@ public void MultiTenantedDatabasesWithMasterDatabaseTable(Action + /// Configure sharded multi-tenancy where tenants are distributed across a pool of databases + /// with conjoined tenancy and native PG list partitioning per tenant within each database. + /// + public void MultiTenantedWithShardedDatabases(Action configure) + { + var configuration = new ShardedTenancyOptions(); + configure(configuration); + + if (configuration.ConnectionString.IsEmpty() && configuration.DataSource == null) + { + throw new ArgumentOutOfRangeException(nameof(configure), + $"Either a {nameof(ShardedTenancyOptions.ConnectionString)} or {nameof(ShardedTenancyOptions.DataSource)} must be supplied"); + } + + // Set up conjoined tenancy + managed partitioning + Policies.AllDocumentsAreMultiTenanted(); + Events.TenancyStyle = TenancyStyle.Conjoined; + Policies.PartitionMultiTenantedDocumentsUsingMartenManagement(configuration.PartitionSchemaName); + + var tenancy = new ShardedTenancy(this, configuration); + Advanced.DefaultTenantUsageEnabled = false; + Tenancy = tenancy; + } + IDocumentSchemaResolver IReadOnlyStoreOptions.Schema => this; string IDocumentSchemaResolver.EventsSchemaName => Events.DatabaseSchemaName; diff --git a/src/MultiTenancyTests/sharded_tenancy_tests.cs b/src/MultiTenancyTests/sharded_tenancy_tests.cs new file mode 100644 index 0000000000..574e2488de --- /dev/null +++ b/src/MultiTenancyTests/sharded_tenancy_tests.cs @@ -0,0 +1,462 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using JasperFx.Events; +using Marten; +using Marten.Events; +using Marten.Storage; +using Marten.Testing.Documents; +using Marten.Testing.Harness; +using Npgsql; +using Shouldly; +using JasperFx.MultiTenancy; +using Weasel.Postgresql; +using Weasel.Postgresql.Migrations; +using Weasel.Postgresql.Tables; +using Weasel.Postgresql.Tables.Partitioning; +using Xunit; + +namespace MultiTenancyTests; + +[CollectionDefinition("sharded-tenancy", DisableParallelization = true)] +public class ShardedTenancyCollection : ICollectionFixture; + +public class ShardedTenancyFixture : IAsyncLifetime +{ + public string[] DbNames { get; private set; } = null!; + public Dictionary ConnectionStrings { get; private set; } = null!; + + public async Task InitializeAsync() + { + await using var conn = new NpgsqlConnection(ConnectionSource.ConnectionString); + await conn.OpenAsync(); + + DbNames = new[] { "marten_shard_a", "marten_shard_b", "marten_shard_c" }; + ConnectionStrings = new Dictionary(StringComparer.OrdinalIgnoreCase); + + foreach (var name in DbNames) + { + var exists = await conn.DatabaseExists(name); + if (!exists) + { + await new DatabaseSpecification().BuildDatabase(conn, name); + } + + var builder = new NpgsqlConnectionStringBuilder(ConnectionSource.ConnectionString) + { + Database = name + }; + ConnectionStrings[name] = builder.ConnectionString; + } + + // Clean up master database schemas + try { await conn.DropSchemaAsync("sharded"); } catch { } + + // Clean up tenant database schemas + foreach (var connStr in ConnectionStrings.Values) + { + await using var tenantConn = new NpgsqlConnection(connStr); + await tenantConn.OpenAsync(); + try { await tenantConn.DropSchemaAsync("tenants"); } catch { } + await cleanMartenObjectsInPublicSchema(tenantConn); + } + } + + internal static async Task cleanMartenObjectsInPublicSchema(NpgsqlConnection conn) + { + try + { + // Ensure public schema exists (may have been dropped by a previous test) + await conn.CreateCommand("CREATE SCHEMA IF NOT EXISTS public").ExecuteNonQueryAsync(); + + // Drop all mt_ tables + var tables = await conn.ExistingTablesAsync(); + foreach (var table in tables.Where(t => t.Schema == "public" && t.Name.StartsWith("mt_"))) + { + await conn.CreateCommand($"DROP TABLE IF EXISTS {table.QualifiedName} CASCADE").ExecuteNonQueryAsync(); + } + + // Drop all mt_ functions + var funcs = await conn.CreateCommand( + "SELECT proname FROM pg_proc p JOIN pg_namespace n ON p.pronamespace = n.oid WHERE n.nspname = 'public' AND p.proname LIKE 'mt_%'") + .ExecuteReaderAsync(); + var funcNames = new List(); + while (await funcs.ReadAsync()) funcNames.Add(await funcs.GetFieldValueAsync(0)); + await funcs.CloseAsync(); + foreach (var f in funcNames) + { + await conn.CreateCommand($"DROP FUNCTION IF EXISTS public.{f} CASCADE").ExecuteNonQueryAsync(); + } + + // Drop all mt_ sequences + await conn.CreateCommand( + "DO $$ DECLARE r RECORD; BEGIN FOR r IN (SELECT sequencename FROM pg_sequences WHERE schemaname = 'public' AND sequencename LIKE 'mt_%') LOOP EXECUTE 'DROP SEQUENCE IF EXISTS public.' || r.sequencename || ' CASCADE'; END LOOP; END $$") + .ExecuteNonQueryAsync(); + } + catch { } + } + + public Task DisposeAsync() => Task.CompletedTask; +} + +[Collection("sharded-tenancy")] +public class sharded_tenancy_tests : IAsyncLifetime +{ + private readonly ShardedTenancyFixture _fixture; + private IDocumentStore _store = null!; + + public sharded_tenancy_tests(ShardedTenancyFixture fixture) + { + _fixture = fixture; + } + + public async Task InitializeAsync() + { + // Clean schemas before each test + await using var conn = new NpgsqlConnection(ConnectionSource.ConnectionString); + await conn.OpenAsync(); + try { await conn.DropSchemaAsync("sharded"); } catch { } + + foreach (var connStr in _fixture.ConnectionStrings.Values) + { + await using var tenantConn = new NpgsqlConnection(connStr); + await tenantConn.OpenAsync(); + try { await tenantConn.DropSchemaAsync("tenants"); } catch { } + await ShardedTenancyFixture.cleanMartenObjectsInPublicSchema(tenantConn); + } + } + + public async Task DisposeAsync() + { + if (_store != null) + { + _store.Dispose(); + } + } + + private IDocumentStore CreateStore(Action? customConfig = null) + { + _store = DocumentStore.For(opts => + { + opts.MultiTenantedWithShardedDatabases(x => + { + x.ConnectionString = ConnectionSource.ConnectionString; + x.SchemaName = "sharded"; + x.PartitionSchemaName = "tenants"; + + foreach (var (dbName, connStr) in _fixture.ConnectionStrings) + { + x.AddDatabase(dbName, connStr); + } + + customConfig?.Invoke(x); + }); + + opts.AutoCreateSchemaObjects = JasperFx.AutoCreate.All; + + opts.RegisterDocumentType(); + opts.RegisterDocumentType(); + opts.Events.AddEventType(); + }); + + return _store; + } + + [Fact] + public async Task can_create_store_and_seed_pool() + { + CreateStore(); + + var sharded = (ShardedTenancy)_store.Options.Tenancy; + var databases = await sharded.ListDatabasesAsync(CancellationToken.None); + + databases.Count.ShouldBe(3); + databases.Select(d => d.DatabaseId).OrderBy(x => x) + .ShouldBe(_fixture.DbNames.OrderBy(x => x).ToArray()); + } + + [Fact] + public async Task hash_assignment_distributes_tenants_across_databases() + { + CreateStore(); // defaults to hash assignment + + var assignedDatabases = new HashSet(); + for (int i = 0; i < 50; i++) + { + var tenantId = $"tenant_{i:000}"; + var dbId = await _store.Advanced.AddTenantToShardAsync(tenantId, CancellationToken.None); + assignedDatabases.Add(dbId); + } + + // With 50 tenants and 3 databases, hash should distribute across multiple + assignedDatabases.Count.ShouldBeGreaterThan(1); + } + + [Fact] + public async Task smallest_assignment_picks_database_with_fewest_tenants() + { + CreateStore(x => x.UseSmallestDatabaseAssignment()); + + var sharded = (ShardedTenancy)_store.Options.Tenancy; + + // Assign 3 tenants to db A explicitly + await sharded.AssignTenantAsync("t1", _fixture.DbNames[0], CancellationToken.None); + await sharded.AssignTenantAsync("t2", _fixture.DbNames[0], CancellationToken.None); + await sharded.AssignTenantAsync("t3", _fixture.DbNames[0], CancellationToken.None); + + // Assign 1 to db B + await sharded.AssignTenantAsync("t4", _fixture.DbNames[1], CancellationToken.None); + + // Now auto-assign — should go to db C (0 tenants) + var tenant5 = await sharded.GetTenantAsync("t5"); + var dbForT5 = await sharded.FindDatabaseForTenantAsync("t5", CancellationToken.None); + + dbForT5.ShouldBe(_fixture.DbNames[2]); + } + + [Fact] + public async Task explicit_assignment_throws_for_unknown_tenant() + { + CreateStore(x => x.UseExplicitAssignment()); + + await Should.ThrowAsync(async () => + { + await _store.Options.Tenancy.GetTenantAsync("unknown_tenant"); + }); + } + + [Fact] + public async Task explicit_assignment_works_for_pre_assigned_tenant() + { + CreateStore(x => x.UseExplicitAssignment()); + + var sharded = (ShardedTenancy)_store.Options.Tenancy; + + // Pre-assign + await sharded.AssignTenantAsync("known_tenant", _fixture.DbNames[0], CancellationToken.None); + + // Should succeed now + var tenant = await sharded.GetTenantAsync("known_tenant"); + tenant.TenantId.ShouldBe("known_tenant"); + } + + [Fact] + public async Task partition_created_after_tenant_assignment() + { + CreateStore(); + + // Apply schema to all databases first + var databases = await _store.Options.Tenancy.BuildDatabases(); + foreach (var db in databases.OfType()) + { + await db.ApplyAllConfiguredChangesToDatabaseAsync(); + } + + // Assign a tenant + await _store.Advanced.AddTenantToShardAsync("partition_test_tenant", CancellationToken.None); + + // Find which database it was assigned to + var sharded = (ShardedTenancy)_store.Options.Tenancy; + var dbId = await sharded.FindDatabaseForTenantAsync("partition_test_tenant", CancellationToken.None); + dbId.ShouldNotBeNull(); + + // Check that PG partitions were created + await using var conn = new NpgsqlConnection(_fixture.ConnectionStrings[dbId]); + await conn.OpenAsync(); + + var tables = await conn.ExistingTablesAsync(); + // Should have a partition like mt_doc_target_partition_test_tenant + tables.Any(t => t.Name.Contains("partition_test_tenant")).ShouldBeTrue( + $"Expected partition for 'partition_test_tenant' in {dbId}. Tables: {string.Join(", ", tables.Select(t => t.QualifiedName))}"); + } + + [Fact] + public async Task mark_database_full_excludes_from_assignment() + { + CreateStore(); // hash assignment + + var sharded = (ShardedTenancy)_store.Options.Tenancy; + + // Mark all but one database as full + await sharded.MarkDatabaseFullAsync(_fixture.DbNames[0], CancellationToken.None); + await sharded.MarkDatabaseFullAsync(_fixture.DbNames[1], CancellationToken.None); + + // All new tenants should go to the remaining database + for (int i = 0; i < 10; i++) + { + var dbId = await _store.Advanced.AddTenantToShardAsync($"full_test_{i}", CancellationToken.None); + dbId.ShouldBe(_fixture.DbNames[2]); + } + } + + [Fact] + public async Task runtime_database_addition() + { + // Create store without seeding any databases + _store = DocumentStore.For(opts => + { + opts.MultiTenantedWithShardedDatabases(x => + { + x.ConnectionString = ConnectionSource.ConnectionString; + x.SchemaName = "sharded"; + x.PartitionSchemaName = "tenants"; + // No AddDatabase calls — empty pool + }); + + opts.AutoCreateSchemaObjects = JasperFx.AutoCreate.All; + opts.RegisterDocumentType(); + }); + + var sharded = (ShardedTenancy)_store.Options.Tenancy; + + // Add databases at runtime + foreach (var (dbName, connStr) in _fixture.ConnectionStrings) + { + await sharded.AddDatabaseAsync(dbName, connStr, CancellationToken.None); + } + + var databases = await sharded.ListDatabasesAsync(CancellationToken.None); + databases.Count.ShouldBe(3); + } + + [Fact] + public async Task document_crud_across_shards() + { + CreateStore(); + + // Apply schema + var databases = await _store.Options.Tenancy.BuildDatabases(); + foreach (var db in databases.OfType()) + { + await db.ApplyAllConfiguredChangesToDatabaseAsync(); + } + + // Add specific tenants + var sharded = (ShardedTenancy)_store.Options.Tenancy; + await sharded.AssignTenantAsync("alpha", _fixture.DbNames[0], CancellationToken.None); + await sharded.AssignTenantAsync("beta", _fixture.DbNames[1], CancellationToken.None); + + // Write documents per tenant + await using (var session = _store.LightweightSession("alpha")) + { + session.Store(new Target { Id = Guid.NewGuid(), String = "alpha_data" }); + await session.SaveChangesAsync(); + } + + await using (var session = _store.LightweightSession("beta")) + { + session.Store(new Target { Id = Guid.NewGuid(), String = "beta_data" }); + await session.SaveChangesAsync(); + } + + // Read back — isolation check + await using (var q1 = _store.QuerySession("alpha")) + { + var results = await q1.Query().ToListAsync(); + results.Count.ShouldBe(1); + results[0].String.ShouldBe("alpha_data"); + } + + await using (var q2 = _store.QuerySession("beta")) + { + var results = await q2.Query().ToListAsync(); + results.Count.ShouldBe(1); + results[0].String.ShouldBe("beta_data"); + } + } + + [Fact] + public async Task event_append_and_query_across_shards() + { + CreateStore(); + + var databases = await _store.Options.Tenancy.BuildDatabases(); + foreach (var db in databases.OfType()) + { + await db.ApplyAllConfiguredChangesToDatabaseAsync(); + } + + var sharded = (ShardedTenancy)_store.Options.Tenancy; + await sharded.AssignTenantAsync("ev_alpha", _fixture.DbNames[0], CancellationToken.None); + await sharded.AssignTenantAsync("ev_beta", _fixture.DbNames[1], CancellationToken.None); + + Guid streamA, streamB; + + // Append events + await using (var session = _store.LightweightSession("ev_alpha")) + { + streamA = session.Events.StartStream( + new ShardedTestEvent { Value = "a1" }, + new ShardedTestEvent { Value = "a2" }).Id; + await session.SaveChangesAsync(); + } + + await using (var session = _store.LightweightSession("ev_beta")) + { + streamB = session.Events.StartStream( + new ShardedTestEvent { Value = "b1" }).Id; + await session.SaveChangesAsync(); + } + + // Query events per tenant + await using (var q1 = _store.QuerySession("ev_alpha")) + { + var events = await q1.Events.FetchStreamAsync(streamA); + events.Count.ShouldBe(2); + } + + await using (var q2 = _store.QuerySession("ev_beta")) + { + var events = await q2.Events.FetchStreamAsync(streamB); + events.Count.ShouldBe(1); + } + } + + [Fact] + public async Task build_databases_returns_all_pool_databases() + { + CreateStore(); + + var databases = await _store.Options.Tenancy.BuildDatabases(); + + // Should include the pool lookup DB + 3 shard databases + databases.Count.ShouldBeGreaterThanOrEqualTo(4); + + var dbNames = databases + .Where(d => d.Identifier != "ShardedTenancyPool") + .Select(d => d.Identifier) + .OrderBy(x => x) + .ToArray(); + + dbNames.ShouldBe(_fixture.DbNames.OrderBy(x => x).ToArray()); + } + + [Fact] + public async Task tenant_count_updates_correctly() + { + CreateStore(); + + var sharded = (ShardedTenancy)_store.Options.Tenancy; + + await sharded.AssignTenantAsync("count_t1", _fixture.DbNames[0], CancellationToken.None); + await sharded.AssignTenantAsync("count_t2", _fixture.DbNames[0], CancellationToken.None); + await sharded.AssignTenantAsync("count_t3", _fixture.DbNames[1], CancellationToken.None); + + var databases = await sharded.ListDatabasesAsync(CancellationToken.None); + + var dbA = databases.First(d => d.DatabaseId == _fixture.DbNames[0]); + var dbB = databases.First(d => d.DatabaseId == _fixture.DbNames[1]); + var dbC = databases.First(d => d.DatabaseId == _fixture.DbNames[2]); + + dbA.TenantCount.ShouldBe(2); + dbB.TenantCount.ShouldBe(1); + dbC.TenantCount.ShouldBe(0); + } +} + +public class ShardedTestEvent +{ + public string Value { get; set; } = ""; +}