Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/DocumentDbTests/MultiTenancy/conjoined_multi_tenancy.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DocumentDbTests.SessionMechanics;
using Marten;
using Marten.Schema;
using Marten.Storage;
Expand Down Expand Up @@ -37,12 +39,15 @@ public async Task InitializeAsync()
using (var session = theStore.LightweightSession("Red"))
{
session.Store(targetRed1, targetRed2);
session.Store(new User(), new User());

await session.SaveChangesAsync();
}

using (var session = theStore.LightweightSession("Blue"))
{
session.Store(targetBlue1, targetBlue2);
session.Store(new User(), new User());
await session.SaveChangesAsync();
}
}
Expand All @@ -52,6 +57,24 @@ public Task DisposeAsync()
return Task.CompletedTask;
}

[Fact]
public async Task delete_all_data_for_a_tenant()
{
await theStore.Advanced.DeleteAllTenantDataAsync("Red", CancellationToken.None);

using (var session = theStore.LightweightSession("Red"))
{
(await session.Query<User>().AnyAsync()).ShouldBeFalse();
(await session.Query<Target>().AnyAsync()).ShouldBeFalse();
}

using (var session = theStore.LightweightSession("Blue"))
{
(await session.Query<User>().AnyAsync()).ShouldBeTrue();
(await session.Query<Target>().AnyAsync()).ShouldBeTrue();
}
}

[Fact]
public void cannot_load_by_id_across_tenants()
{
Expand Down
28 changes: 28 additions & 0 deletions src/EventSourcingTests/multi_tenancy_and_event_capture.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EventSourcingTests.Aggregation;
using Marten;
Expand Down Expand Up @@ -50,6 +51,33 @@ public async Task capture_events_for_multiple_tenants_in_one_session_as_string_i
eventsTwo.Count.ShouldBe(3);
}

[Fact]
public async Task delete_all_tenant_data_catches_event_data()
{
StoreOptions(opts =>
{
opts.Events.TenancyStyle = TenancyStyle.Conjoined;
opts.Events.StreamIdentity = StreamIdentity.AsString;

}, true);

theSession.Logger = new TestOutputMartenLogger(_output);
theSession.ForTenant("one").Events.StartStream("s1", new AEvent(), new BEvent());
theSession.ForTenant("two").Events.StartStream("s1", new CEvent(), new DEvent(), new QuestStarted());

await theSession.SaveChangesAsync();

await theStore.Advanced.DeleteAllTenantDataAsync("one", CancellationToken.None);

using var queryOne = theStore.QuerySession("one");
(await queryOne.Events.QueryAllRawEvents().AnyAsync()).ShouldBeFalse();

await using var queryTwo = theStore.QuerySession("two");
var eventsTwo = await queryTwo.Events.FetchStreamAsync("s1");

eventsTwo.Count.ShouldBe(3);
}

[Fact]
public async Task capture_events_for_multiple_tenants_in_one_session_as_guid_identified()
{
Expand Down
14 changes: 14 additions & 0 deletions src/Marten/AdvancedOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Marten.Events.Projections;
using Marten.Events.Protected;
using Marten.Events.TestSupport;
using Marten.Internal;
using Marten.Schema;
using Marten.Storage;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -302,6 +303,19 @@ await _store.Options.TenantPartitions.Partitions.DropPartitionFromAllTables(data
token).ConfigureAwait(false);
}

/// <summary>
/// Delete all data for a given tenant id and drop any partitions for that tenant id if
/// using by tenant partitioning managed by Marten
/// </summary>
/// <param name="tenantId"></param>
/// <param name="token"></param>
/// <returns></returns>
public Task DeleteAllTenantDataAsync(string tenantId, CancellationToken token)
{
var cleaner = new TenantDataCleaner(tenantId, _store);
return cleaner.ExecuteAsync(token);
}

/// <summary>
/// Configure and execute a batch masking of protected data for a subset of the events
/// in the event store
Expand Down
152 changes: 152 additions & 0 deletions src/Marten/Internal/TenantDataCleaner.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Core;
using Marten.Events.Schema;
using Marten.Schema;
using Marten.Services;
using Marten.Storage;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Weasel.Core;
using Weasel.Postgresql;
using Weasel.Postgresql.Tables.Partitioning;

namespace Marten.Internal;

internal class TenantDataCleaner
{
private readonly string _tenantId;
private readonly DocumentStore _store;

public TenantDataCleaner(string tenantId, DocumentStore store)
{
_tenantId = tenantId;
_store = store;
}

public async Task ExecuteAsync(CancellationToken token)
{
var logger = _store.Options.LogFactory?.CreateLogger<DocumentStore>() ?? NullLogger<DocumentStore>.Instance;

var database = await _store.Storage.FindOrCreateDatabase(_tenantId).ConfigureAwait(false);
if (_store.Options is { TenantPartitions: not null})
{
await _store.Options.TenantPartitions.Partitions.DropPartitionFromAllTablesForValue(
(PostgresqlDatabase)database, logger, _tenantId, token).ConfigureAwait(false);
}

var tables = await database.SchemaTables(token).ConfigureAwait(false);

var builder = new BatchBuilder();

// Clear events out of a single database
if (_store.Options.Events.TenancyStyle == TenancyStyle.Single && _store.Options.Tenancy is not DefaultTenancy)
{
// if all tenant data is in a specific database, just wipe it here
logger.LogInformation("Deleting all event data in the database '{DbIdentifier}' for tenant '{TenantId}'", database.Identifier, _tenantId);
await database.DeleteAllEventDataAsync(token).ConfigureAwait(false);
}

bool foundTables = false;
Action<DbObjectName> deleteTenantedDataIfExists = tableName =>
{
if (tables.Contains(tableName))
{
foundTables = true;
logger.LogInformation("Trying to delete tenant information from table {Table} for tenant {TenantId}", tableName.QualifiedName, _tenantId);
builder.StartNewCommand();
builder.Append($"delete from {tableName.QualifiedName} where tenant_id = ");
builder.AppendParameter(_tenantId);
}
};

Action<DbObjectName> deleteDataIfExists = tableName =>
{
if (tables.Contains(tableName))
{
foundTables = true;
logger.LogInformation("Trying to delete tenant information from table {Table} for tenant {TenantId}", tableName.QualifiedName, _tenantId);
builder.StartNewCommand();
builder.Append($"delete from {tableName.QualifiedName}");
}
};

if (_store.Options.Events.TenancyStyle == TenancyStyle.Conjoined)
{
var eventsSchema = _store.Options.Events.DatabaseSchemaName;
deleteTenantedDataIfExists(new DbObjectName(eventsSchema, "mt_events"));
deleteTenantedDataIfExists(new DbObjectName(eventsSchema, "mt_streams"));
}

if (_store.Options.Tenancy is DefaultTenancy)
{
deleteDataForSingleDatabase(deleteTenantedDataIfExists);
}
else
{
deleteDataForTenantDatabase(deleteTenantedDataIfExists, deleteDataIfExists);
}

if (!foundTables) return;

var batch = builder.Compile();
await using var conn = database.CreateConnection();
await conn.OpenAsync(token).ConfigureAwait(false);
batch.Connection = conn;
await batch.ExecuteNonQueryAsync(token).ConfigureAwait(false);
await conn.CloseAsync().ConfigureAwait(false);
}

private void deleteDataForTenantDatabase(Action<DbObjectName> deleteTenantedDataIfExists, Action<DbObjectName> deleteDataIfExists)
{
var allTypes = _store.Options.Storage.AllDocumentMappings.OfType<IDocumentType>()
.Select(x => x.DocumentType)
.ToList();

var types = allTypes
.TopologicalSort(type => _store.Options.Storage.GetTypeDependencies(type))

// Need to delete data from the downstream tables first!
.Reverse()
.ToArray();

foreach (var type in types)
{
var mapping = _store.Options.Storage.MappingFor(type);
if (mapping.TenancyStyle == TenancyStyle.Conjoined)
{
deleteTenantedDataIfExists(mapping.TableName);
}
else
{
deleteDataIfExists(mapping.TableName);
}
}
}

private void deleteDataForSingleDatabase(Action<DbObjectName> deleteTenantedDataIfExists)
{
var tenantedTypes = _store.Options.Storage.AllDocumentMappings.OfType<IDocumentType>()
.Where(x => x.TenancyStyle == TenancyStyle.Conjoined)
.Select(x => x.DocumentType)
.ToList();

var types = tenantedTypes
.TopologicalSort(type => _store.Options.Storage.GetTypeDependencies(type))

// Need to delete data from the downstream tables first!
.Reverse()
.ToArray();

foreach (var type in types)
{
var mapping = _store.Options.Storage.MappingFor(type);
if (mapping.TenancyStyle == TenancyStyle.Conjoined)
{
deleteTenantedDataIfExists(mapping.TableName);
}
}
}
}
2 changes: 1 addition & 1 deletion src/Marten/Marten.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
<PackageReference Include="Polly.Core" Version="8.5.0" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="8.0.0" />
<PackageReference Include="System.Text.Json" Version="8.0.5" />
<PackageReference Include="Weasel.Postgresql" Version="7.13.5" />
<PackageReference Include="Weasel.Postgresql" Version="7.14.0" />
</ItemGroup>

<!--SourceLink specific settings-->
Expand Down
2 changes: 1 addition & 1 deletion src/Marten/StoreOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,7 @@ public void MultiTenantedDatabasesWithMasterDatabaseTable(Action<MasterTableTena
IDocumentSchemaResolver IReadOnlyStoreOptions.Schema => this;

string IDocumentSchemaResolver.EventsSchemaName => Events.DatabaseSchemaName;
internal MartenManagedTenantListPartitions TenantPartitions { get; set; }
internal MartenManagedTenantListPartitions? TenantPartitions { get; set; }

string IDocumentSchemaResolver.For<TDocument>(bool qualified)
{
Expand Down
44 changes: 44 additions & 0 deletions src/MultiTenancyTests/marten_managed_tenant_id_partitioning.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
using System.Threading;
using System.Threading.Tasks;
using Marten;
using Marten.Events;
using Marten.Metadata;
using Marten.Schema;
using Marten.Storage;
using Marten.Testing.Documents;
using Marten.Testing.Harness;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.FSharp.Control;
using Npgsql;
using Shouldly;
using Weasel.Postgresql;
Expand Down Expand Up @@ -89,6 +91,8 @@ public async Task add_then_remove_tenants_at_runtime()
opts.Schema.For<User>();
}, true);

await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();

var statuses = await theStore
.Advanced
// This is ensuring that there are tenant id partitions for all multi-tenanted documents
Expand All @@ -112,6 +116,46 @@ public async Task add_then_remove_tenants_at_runtime()
assertTableHasTenantPartitions(userTable, "a1", "a3");
}

[Fact]
public async Task delete_all_tenant_data_will_drop_partitions()
{
StoreOptions(opts =>
{
opts.Events.TenancyStyle = TenancyStyle.Conjoined;

opts.Policies.AllDocumentsAreMultiTenanted();
opts.Policies.PartitionMultiTenantedDocumentsUsingMartenManagement("tenants");

opts.Schema.For<Target>();
opts.Schema.For<User>();
}, true);

await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();
await theStore.Storage.Database.EnsureStorageExistsAsync(typeof(IEvent));

var statuses = await theStore
.Advanced
// This is ensuring that there are tenant id partitions for all multi-tenanted documents
// with the named tenant ids
.AddMartenManagedTenantsAsync(CancellationToken.None, "a1", "a2", "a3");

foreach (var status in statuses)
{
status.Status.ShouldBe(PartitionMigrationStatus.Complete);
}

await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();
await theStore.Storage.Database.AssertDatabaseMatchesConfigurationAsync();

await theStore.Advanced.DeleteAllTenantDataAsync("a2", CancellationToken.None);

var targetTable = await theStore.Storage.Database.ExistingTableFor(typeof(Target));
assertTableHasTenantPartitions(targetTable, "a1", "a3");

var userTable = await theStore.Storage.Database.ExistingTableFor(typeof(User));
assertTableHasTenantPartitions(userTable, "a1", "a3");
}



[Fact]
Expand Down
Loading