Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions src/Persistence/SqlServerTests/MultiTenancy/MultiTenancyContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
using IntegrationTests;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Hosting;
using Weasel.SqlServer;
using Weasel.SqlServer.Tables;
using Wolverine;
using Wolverine.RDBMS;

namespace SqlServerTests.MultiTenancy;

public abstract class MultiTenancyContext : SqlServerContext, IAsyncLifetime
{
protected IHost theHost;
protected string tenant1ConnectionString;
protected string tenant2ConnectionString;
protected string tenant3ConnectionString;

public new async Task InitializeAsync()
{
await using var conn = new SqlConnection(Servers.SqlServerConnectionString);
await conn.OpenAsync();

tenant1ConnectionString = await CreateDatabaseIfNotExists(conn, "db1");
tenant2ConnectionString = await CreateDatabaseIfNotExists(conn, "db2");
tenant3ConnectionString = await CreateDatabaseIfNotExists(conn, "db3");

await cleanItems(tenant1ConnectionString);
await cleanItems(tenant2ConnectionString);
await cleanItems(tenant3ConnectionString);

theHost = await Host.CreateDefaultBuilder()
.UseWolverine(opts => { configureWolverine(opts); }).StartAsync();

await onStartup();
}

protected abstract void configureWolverine(WolverineOptions opts);

protected virtual Task onStartup() => Task.CompletedTask;

public new async Task DisposeAsync()
{
await theHost.StopAsync();
}

private async Task<string> CreateDatabaseIfNotExists(SqlConnection conn, string databaseName)
{
var builder = new SqlConnectionStringBuilder(Servers.SqlServerConnectionString);

var exists = await DatabaseExistsAsync(conn, databaseName);
if (!exists)
{
await conn.CreateCommand($"CREATE DATABASE [{databaseName}]").ExecuteNonQueryAsync();
}

builder.InitialCatalog = databaseName;

return builder.ConnectionString;
}

private static async Task<bool> DatabaseExistsAsync(SqlConnection conn, string databaseName)
{
var cmd = conn.CreateCommand($"SELECT DB_ID(@databaseName)");
cmd.Parameters.AddWithValue("@databaseName", databaseName);
var result = await cmd.ExecuteScalarAsync();
return result != null && result != DBNull.Value;
}

private async Task cleanItems(string connectionString)
{
var table = new Table("items");
table.AddColumn<Guid>("Id").AsPrimaryKey();
table.AddColumn<string>("Name");

await using var conn = new SqlConnection(connectionString);
await conn.OpenAsync();

await table.ApplyChangesAsync(conn);

await conn.CreateCommand("delete from items").ExecuteNonQueryAsync();
await conn.CloseAsync();
}
}
159 changes: 159 additions & 0 deletions src/Persistence/SqlServerTests/MultiTenancy/static_multi_tenancy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
using IntegrationTests;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using JasperFx.Resources;
using Microsoft.Extensions.DependencyInjection;
using Shouldly;
using Weasel.Core.CommandLine;
using Weasel.Core.Migrations;
using Wolverine;
using Wolverine.Persistence.Durability;
using Wolverine.SqlServer;
using Wolverine.SqlServer.Persistence;
using Wolverine.RDBMS;
using Xunit.Abstractions;

namespace SqlServerTests.MultiTenancy;

public class static_multi_tenancy : MultiTenancyContext
{
private readonly ITestOutputHelper _output;

public static_multi_tenancy(ITestOutputHelper output)
{
_output = output;
}

protected override void configureWolverine(WolverineOptions opts)
{
opts.PersistMessagesWithSqlServer(Servers.SqlServerConnectionString, "static_multi_tenancy2")
.RegisterStaticTenants(tenants =>
{
tenants.Register("red", tenant1ConnectionString);
tenants.Register("blue", tenant2ConnectionString);
tenants.Register("green", tenant3ConnectionString);
});

opts.Services.AddResourceSetupOnStartup();

opts.AddSagaType<BlueSaga>("blues");
opts.AddSagaType<RedSaga>("reds");
}

[Fact]
public async Task registers_a_multi_tenanted_message_store()
{
var store = theHost.Services.GetRequiredService<IMessageStore>()
.ShouldBeOfType<MultiTenantedMessageStore>();

store.Main.Describe().DatabaseName.ShouldBe("master");

(await store.Source.FindAsync("red")).Describe().DatabaseName.ShouldBe("db1");
(await store.Source.FindAsync("blue")).Describe().DatabaseName.ShouldBe("db2");
(await store.Source.FindAsync("green")).Describe().DatabaseName.ShouldBe("db3");
}

[Fact]
public async Task exposes_every_database_in_all_active()
{
var store = theHost.Services.GetRequiredService<IMessageStore>()
.ShouldBeOfType<MultiTenantedMessageStore>();

var all = store.ActiveDatabases();
all.Count.ShouldBe(4);
}

[Fact]
public async Task all_databases_are_exposed_to_weasel()
{
var databases = await new WeaselInput().FilterDatabases(theHost);

var store = theHost.Services.GetRequiredService<IMessageStore>()
.ShouldBeOfType<MultiTenantedMessageStore>();

databases.ShouldContain((IDatabase)store.Main);
databases.ShouldContain((IDatabase)await store.Source.FindAsync("red"));
databases.ShouldContain((IDatabase)await store.Source.FindAsync("blue"));
databases.ShouldContain((IDatabase)await store.Source.FindAsync("green"));
}

[Fact]
public async Task the_main_database_tables_include_node_persistence()
{
var store = theHost.Services.GetRequiredService<IMessageStore>()
.ShouldBeOfType<MultiTenantedMessageStore>();
var tables = await store.Main.As<SqlServerMessageStore>().SchemaTables();

var expected = @"
static_multi_tenancy2.blues
static_multi_tenancy2.reds
static_multi_tenancy2.wolverine_agent_restrictions
static_multi_tenancy2.wolverine_control_queue
static_multi_tenancy2.wolverine_dead_letters
static_multi_tenancy2.wolverine_incoming_envelopes
static_multi_tenancy2.wolverine_node_assignments
static_multi_tenancy2.wolverine_node_records
static_multi_tenancy2.wolverine_nodes
static_multi_tenancy2.wolverine_outgoing_envelopes
".ReadLines().Where(x => x.IsNotEmpty()).ToArray();

tables.OrderBy(x => x.QualifiedName).Select(x => x.QualifiedName).ToArray()
.ShouldBe(expected);
}

[Fact]
public async Task the_tenant_databases_have_only_envelope_and_saga_tables()
{
var store = theHost.Services.GetRequiredService<IMessageStore>()
.ShouldBeOfType<MultiTenantedMessageStore>();

await store.Source.RefreshAsync();

var expected = @"
static_multi_tenancy2.blues
static_multi_tenancy2.reds
static_multi_tenancy2.wolverine_dead_letters
static_multi_tenancy2.wolverine_incoming_envelopes
static_multi_tenancy2.wolverine_outgoing_envelopes
".ReadLines().Where(x => x.IsNotEmpty()).ToArray();

foreach (var tenantId in new string[] { "red", "blue", "green" })
{
var messageStore = await store.Source.FindAsync(tenantId);
var tables = await messageStore.As<SqlServerMessageStore>().SchemaTables();

tables.OrderBy(x => x.QualifiedName).Select(x => x.QualifiedName).ToArray()
.ShouldBe(expected);
}
}

[Fact]
public async Task have_all_the_correct_durability_agents()
{
var store = theHost.Services.GetRequiredService<IMessageStore>()
.ShouldBeOfType<MultiTenantedMessageStore>();

var expected = @"
wolverinedb://sqlserver/localhost/master/static_multi_tenancy2
wolverinedb://sqlserver/localhost/db1/static_multi_tenancy2
wolverinedb://sqlserver/localhost/db3/static_multi_tenancy2
wolverinedb://sqlserver/localhost/db2/static_multi_tenancy2
".ReadLines().Where(x => x.IsNotEmpty()).Select(x => new Uri(x)).OrderBy(x => x.ToString()).ToArray();


var agents = await store.AllKnownAgentsAsync();
agents.OrderBy(x => x.ToString()).ToArray().ShouldBe(expected);
}
}

public class RedSaga : Saga
{
public Guid Id { get; set; }
public string Name { get; set; }
}

public class BlueSaga : Saga
{
public Guid Id { get; set; }
public string Name { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public async Task InitializeAsync()
[Fact]
public async Task cascaded_response_with_outbox()
{
var tracked = await _host.TrackActivity().WaitForMessageToBeReceivedAt<SqlServerPong>(_host).InvokeMessageAndWaitAsync(new SqlServerPing("first"));
var tracked = await _host.TrackActivity().Timeout(15.Seconds()).WaitForMessageToBeReceivedAt<SqlServerPong>(_host).InvokeMessageAndWaitAsync(new SqlServerPing("first"));

tracked.FindSingleTrackedMessageOfType<SqlServerPong>()
.Name.ShouldBe("first");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,40 @@ protected override void writeMessageIdArrayQueryList(DbCommandBuilder builder, G
}
}

/// <summary>
/// Fetch a list of the existing tables in the database filtered by schemas
/// </summary>
/// <param name="ct"></param>
/// <returns></returns>
// TODO -- get this moved to Weasel. Shouldn't be here, but Claude brute forced it
public async Task<IReadOnlyList<DbObjectName>> SchemaTables(CancellationToken ct = default)
{
var schemaNames = AllSchemaNames();

await using var conn = CreateConnection();
await conn.OpenAsync(ct).ConfigureAwait(false);

var tables = new List<DbObjectName>();
foreach (var schemaName in schemaNames)
{
var sql = $"SELECT table_schema, table_name FROM information_schema.tables WHERE table_schema = @schema";
var cmd = conn.CreateCommand(sql).With("schema", schemaName);

await using var reader = await cmd.ExecuteReaderAsync(ct).ConfigureAwait(false);
while (await reader.ReadAsync(ct).ConfigureAwait(false))
{
var schema = reader.GetString(0);
var name = reader.GetString(1);
tables.Add(new DbObjectName(schema, name));
}

await reader.CloseAsync().ConfigureAwait(false);
}

await conn.CloseAsync().ConfigureAwait(false);
return tables;
}

protected override INodeAgentPersistence? buildNodeStorage(DatabaseSettings databaseSettings,
DbDataSource dataSource)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
using Wolverine.Transports.Sending;

namespace Wolverine.SqlServer.Transport;

internal interface ISqlServerQueueSender : ISender
{
Task ScheduleRetryAsync(Envelope envelope, CancellationToken cancellationToken);
}
Loading
Loading