diff --git a/src/DaemonTests/Composites/multi_stage_projections.cs b/src/DaemonTests/Composites/multi_stage_projections.cs index 3976a2c1fe..08c2ec612e 100644 --- a/src/DaemonTests/Composites/multi_stage_projections.cs +++ b/src/DaemonTests/Composites/multi_stage_projections.cs @@ -10,35 +10,34 @@ using JasperFx.Core.Reflection; using JasperFx.Events; using JasperFx.Events.Descriptors; +using JasperFx.MultiTenancy; using Marten; using Marten.Events; +using Marten.Storage; using Shouldly; using Xunit; using Xunit.Abstractions; namespace DaemonTests.Composites; -public class multi_stage_projections: DaemonContext +public class multi_stage_projections(ITestOutputHelper output): DaemonContext(output) { - protected readonly List theBoards = new(); - protected readonly List thePatients = new(); - protected readonly List theProviders = new(); - - public multi_stage_projections(ITestOutputHelper output): base(output) - { - } + private readonly List theBoards = new(); + private readonly List thePatients = new(); + private readonly List theProviders = new(); + private IDocumentSession _compositeSession; private async Task buildSpecialties() { - theSession.Store(new Specialty { Code = "PED", Description = "Pediatrics" }); - theSession.Store(new Specialty { Code = "GEN", Description = "General Practice" }); - theSession.Store(new Specialty { Code = "ENT", Description = "Ear, Nose, and Throat" }); - theSession.Store(new Specialty { Code = "ORTH", Description = "Orthopedics" }); + _compositeSession.Store(new Specialty { Code = "PED", Description = "Pediatrics" }); + _compositeSession.Store(new Specialty { Code = "GEN", Description = "General Practice" }); + _compositeSession.Store(new Specialty { Code = "ENT", Description = "Ear, Nose, and Throat" }); + _compositeSession.Store(new Specialty { Code = "ORTH", Description = "Orthopedics" }); - await theSession.SaveChangesAsync(); + await _compositeSession.SaveChangesAsync(); } - protected async Task buildProviders() + protected async Task buildProviders(string tenantId) { var faker = new Faker() .StrictMode(false) @@ -54,12 +53,12 @@ protected async Task buildProviders() theProviders.Add(faker.Generate()); } - await theStore.BulkInsertAsync(theProviders); + await theStore.BulkInsertAsync(tenantId, theProviders); } protected async Task startBoards() { - var specialties = await theSession.Query().ToListAsync(); + var specialties = await _compositeSession.Query().ToListAsync(); foreach (var specialty in specialties) { var tx = new BoardOpened($"Texas {specialty.Description}", DateOnly.FromDateTime(DateTime.Today), @@ -75,25 +74,25 @@ protected async Task startBoards() ["AR"], [specialty.Code]); - theBoards.Add(theSession.Events.StartStream(tx).Id); - theBoards.Add(theSession.Events.StartStream(ar).Id); - theBoards.Add(theSession.Events.StartStream(ok).Id); + theBoards.Add(_compositeSession.Events.StartStream(tx).Id); + theBoards.Add(_compositeSession.Events.StartStream(ar).Id); + theBoards.Add(_compositeSession.Events.StartStream(ok).Id); } - await theSession.SaveChangesAsync(); + await _compositeSession.SaveChangesAsync(); } protected async Task> fetchOpenBoards() { var list = new List(); - foreach (var guid in theBoards) list.Add(await theSession.Events.FetchLatest(guid)); + foreach (var guid in theBoards) list.Add(await _compositeSession.Events.FetchLatest(guid)); return list; } protected async Task startShifts() { - var providers = await theSession.Query().ToListAsync(); + var providers = await _compositeSession.Query().ToListAsync(); var boards = await fetchOpenBoards(); foreach (var provider in providers) @@ -105,20 +104,20 @@ protected async Task startShifts() { if (Random.Shared.NextDouble() < .2) { - theSession.Events.StartStream(new ProviderJoined(board.Id, provider.Id)); + _compositeSession.Events.StartStream(new ProviderJoined(board.Id, provider.Id)); } else { - theSession.Events.StartStream(new ProviderJoined(board.Id, provider.Id), + _compositeSession.Events.StartStream(new ProviderJoined(board.Id, provider.Id), new ProviderReady()); } } } - await theSession.SaveChangesAsync(); + await _compositeSession.SaveChangesAsync(); } - protected async Task buildPatients() + protected async Task buildPatients(string tenantId) { var faker = new Faker() .StrictMode(false) @@ -130,18 +129,18 @@ protected async Task buildPatients() thePatients.Add(faker.Generate()); } - await theStore.BulkInsertAsync(thePatients); + await theStore.BulkInsertAsync(tenantId, thePatients); } private async Task startAppointments() { - var patients = await theSession.Query().ToListAsync(); - var specialties = await theSession.Query().Select(x => x.Code).ToListAsync(); + var patients = await _compositeSession.Query().ToListAsync(); + var specialties = await _compositeSession.Query().Select(x => x.Code).ToListAsync(); var states = new[] { "TX", "AR", "OK" }; var faker = new Faker(); - var boards = await theSession.Query().ToListAsync(); + var boards = await _compositeSession.Query().ToListAsync(); foreach (var patient in patients) { @@ -153,34 +152,45 @@ private async Task startAppointments() if (board != null) { - theSession.Events.StartStream(requested, new AppointmentRouted(board.Id)); + _compositeSession.Events.StartStream(requested, new AppointmentRouted(board.Id)); } else { - theSession.Events.StartStream(requested); + _compositeSession.Events.StartStream(requested); } } - await theSession.SaveChangesAsync(); + await _compositeSession.SaveChangesAsync(); } - private async Task setUpData() + private async Task setUpData(string tenantId) { await buildSpecialties(); await startBoards(); - await buildPatients(); - await buildProviders(); + await buildPatients(tenantId); + await buildProviders(tenantId); await startShifts(); } - [Fact] - public async Task end_to_end() + [Theory] + [InlineData(TenancyStyle.Single)] + [InlineData(TenancyStyle.Conjoined)] + public async Task end_to_end(TenancyStyle tenancyStyle) { StoreOptions(opts => { #region sample_defining_a_composite_projection + if(tenancyStyle == TenancyStyle.Conjoined) + { + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Policies.AllDocumentsAreMultiTenantedWithPartitioning(x => + { + x.ByHash(Enumerable.Range(1, 2).Select(i => $"b_{i}").ToArray()); + }); + opts.Advanced.DefaultTenantUsageEnabled = false; + } opts.Projections.CompositeProjectionFor("TeleHealth", projection => { projection.Add(); @@ -194,7 +204,8 @@ public async Task end_to_end() #endregion }); - + var tenantId = tenancyStyle == TenancyStyle.Conjoined ? "some_tenant" : TenantId.DefaultTenantId; + _compositeSession = theStore.LightweightSession(tenantId); var usage = await theStore.As().TryCreateUsage(CancellationToken.None); verifyDescription(usage); @@ -203,7 +214,7 @@ public async Task end_to_end() sql.ShouldContain("mt_doc_board"); sql.ShouldContain("mt_doc_boardsummary"); - await setUpData(); + await setUpData(tenantId); using var daemon = await StartDaemon(); await daemon.StartAllAsync(); @@ -211,31 +222,31 @@ public async Task end_to_end() await daemon.WaitForNonStaleData(30.Seconds()); // All the Boards exist - (await theSession.Query().CountAsync()).ShouldBe(12); + (await _compositeSession.Query().CountAsync()).ShouldBe(12); // Built up ProviderShifts - (await theSession.Query().CountAsync()).ShouldBeGreaterThan(0); + (await _compositeSession.Query().CountAsync()).ShouldBeGreaterThan(0); await startAppointments(); await daemon.WaitForNonStaleData(30.Seconds()); // Got appointments - (await theSession.Query().CountAsync()).ShouldBeGreaterThan(0); + (await _compositeSession.Query().CountAsync()).ShouldBeGreaterThan(0); // Got details from the 2nd stage projection! - (await theSession.Query().CountAsync()).ShouldBeGreaterThan(0); + (await _compositeSession.Query().CountAsync()).ShouldBeGreaterThan(0); // See the downstream BoardSummary too! - (await theSession.Query().CountAsync()).ShouldBeGreaterThan(0); - foreach (var boardSummary in await theSession.Query().ToListAsync()) + (await _compositeSession.Query().CountAsync()).ShouldBeGreaterThan(0); + foreach (var boardSummary in await _compositeSession.Query().ToListAsync()) { boardSummary.Board.ShouldNotBeNull(); } #region sample_querying_for_non_stale_projection_data - // theSession is an IDocumentSession - var summaries = await theSession + // _compositeSession is an IDocumentSession + var summaries = await _compositeSession // This makes Marten "wait" until the async daemon progress for whatever projection // is building the BoardSummary document to catch up to the point at which the // event store was at when you first tired to execute the LINQ query @@ -257,15 +268,15 @@ public async Task end_to_end() await daemon.StartAllAsync(); // Now, let's cancel an appointment and see that AppointmentDetails is also deleted - var appointmentId = (await theSession.Query().FirstAsync()).Id; + var appointmentId = (await _compositeSession.Query().FirstAsync()).Id; - theSession.Events.Append(appointmentId, new AppointmentCancelled()); - await theSession.SaveChangesAsync(); + _compositeSession.Events.Append(appointmentId, new AppointmentCancelled()); + await _compositeSession.SaveChangesAsync(); await theStore.WaitForNonStaleProjectionDataAsync(5.Seconds()); - (await theSession.LoadAsync(appointmentId)).ShouldBeNull(); - (await theSession.LoadAsync(appointmentId)).ShouldBeNull(); + (await _compositeSession.LoadAsync(appointmentId)).ShouldBeNull(); + (await _compositeSession.LoadAsync(appointmentId)).ShouldBeNull(); @@ -287,7 +298,7 @@ private static void verifyDescription(EventStoreUsage usage) private async Task assignProvidersToAppointments() { - var boards = await theSession.Query().ToListAsync(); + var boards = await _compositeSession.Query().ToListAsync(); foreach (var board in boards) { @@ -297,12 +308,12 @@ private async Task assignProvidersToAppointments() var provider = board.ActiveProviders.Values.Where(x => x.Status == ProviderStatus.Ready).FirstOrDefault(); if (provider != null) { - theSession.Events.Append(provider.Id, new AppointmentAssigned(appointment.Id)); - theSession.Events.Append(appointment.Id, new ProviderAssigned(provider.ProviderId)); + _compositeSession.Events.Append(provider.Id, new AppointmentAssigned(appointment.Id)); + _compositeSession.Events.Append(appointment.Id, new ProviderAssigned(provider.ProviderId)); } } } - await theSession.SaveChangesAsync(); + await _compositeSession.SaveChangesAsync(); } }