Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 68 additions & 57 deletions src/DaemonTests/Composites/multi_stage_projections.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,35 +10,34 @@
using JasperFx.Core.Reflection;
using JasperFx.Events;
using JasperFx.Events.Descriptors;
using JasperFx.MultiTenancy;
using Marten;
using Marten.Events;
using Marten.Storage;
using Shouldly;
using Xunit;
using Xunit.Abstractions;

namespace DaemonTests.Composites;

public class multi_stage_projections: DaemonContext
public class multi_stage_projections(ITestOutputHelper output): DaemonContext(output)
{
protected readonly List<Guid> theBoards = new();
protected readonly List<Patient> thePatients = new();
protected readonly List<Provider> theProviders = new();

public multi_stage_projections(ITestOutputHelper output): base(output)
{
}
private readonly List<Guid> theBoards = new();
private readonly List<Patient> thePatients = new();
private readonly List<Provider> theProviders = new();
private IDocumentSession _compositeSession;

private async Task buildSpecialties()
{
theSession.Store(new Specialty { Code = "PED", Description = "Pediatrics" });
theSession.Store(new Specialty { Code = "GEN", Description = "General Practice" });
theSession.Store(new Specialty { Code = "ENT", Description = "Ear, Nose, and Throat" });
theSession.Store(new Specialty { Code = "ORTH", Description = "Orthopedics" });
_compositeSession.Store(new Specialty { Code = "PED", Description = "Pediatrics" });
_compositeSession.Store(new Specialty { Code = "GEN", Description = "General Practice" });
_compositeSession.Store(new Specialty { Code = "ENT", Description = "Ear, Nose, and Throat" });
_compositeSession.Store(new Specialty { Code = "ORTH", Description = "Orthopedics" });

await theSession.SaveChangesAsync();
await _compositeSession.SaveChangesAsync();
}

protected async Task buildProviders()
protected async Task buildProviders(string tenantId)
{
var faker = new Faker<Provider>()
.StrictMode(false)
Expand All @@ -54,12 +53,12 @@ protected async Task buildProviders()
theProviders.Add(faker.Generate());
}

await theStore.BulkInsertAsync(theProviders);
await theStore.BulkInsertAsync(tenantId, theProviders);
}

protected async Task startBoards()
{
var specialties = await theSession.Query<Specialty>().ToListAsync();
var specialties = await _compositeSession.Query<Specialty>().ToListAsync();
foreach (var specialty in specialties)
{
var tx = new BoardOpened($"Texas {specialty.Description}", DateOnly.FromDateTime(DateTime.Today),
Expand All @@ -75,25 +74,25 @@ protected async Task startBoards()
["AR"], [specialty.Code]);


theBoards.Add(theSession.Events.StartStream<Board>(tx).Id);
theBoards.Add(theSession.Events.StartStream<Board>(ar).Id);
theBoards.Add(theSession.Events.StartStream<Board>(ok).Id);
theBoards.Add(_compositeSession.Events.StartStream<Board>(tx).Id);
theBoards.Add(_compositeSession.Events.StartStream<Board>(ar).Id);
theBoards.Add(_compositeSession.Events.StartStream<Board>(ok).Id);
}

await theSession.SaveChangesAsync();
await _compositeSession.SaveChangesAsync();
}

protected async Task<List<Board>> fetchOpenBoards()
{
var list = new List<Board>();
foreach (var guid in theBoards) list.Add(await theSession.Events.FetchLatest<Board>(guid));
foreach (var guid in theBoards) list.Add(await _compositeSession.Events.FetchLatest<Board>(guid));

return list;
}

protected async Task startShifts()
{
var providers = await theSession.Query<Provider>().ToListAsync();
var providers = await _compositeSession.Query<Provider>().ToListAsync();
var boards = await fetchOpenBoards();

foreach (var provider in providers)
Expand All @@ -105,20 +104,20 @@ protected async Task startShifts()
{
if (Random.Shared.NextDouble() < .2)
{
theSession.Events.StartStream<ProviderShift>(new ProviderJoined(board.Id, provider.Id));
_compositeSession.Events.StartStream<ProviderShift>(new ProviderJoined(board.Id, provider.Id));
}
else
{
theSession.Events.StartStream<ProviderShift>(new ProviderJoined(board.Id, provider.Id),
_compositeSession.Events.StartStream<ProviderShift>(new ProviderJoined(board.Id, provider.Id),
new ProviderReady());
}
}
}

await theSession.SaveChangesAsync();
await _compositeSession.SaveChangesAsync();
}

protected async Task buildPatients()
protected async Task buildPatients(string tenantId)
{
var faker = new Faker<Patient>()
.StrictMode(false)
Expand All @@ -130,18 +129,18 @@ protected async Task buildPatients()
thePatients.Add(faker.Generate());
}

await theStore.BulkInsertAsync(thePatients);
await theStore.BulkInsertAsync(tenantId, thePatients);
}

private async Task startAppointments()
{
var patients = await theSession.Query<Patient>().ToListAsync();
var specialties = await theSession.Query<Specialty>().Select(x => x.Code).ToListAsync();
var patients = await _compositeSession.Query<Patient>().ToListAsync();
var specialties = await _compositeSession.Query<Specialty>().Select(x => x.Code).ToListAsync();
var states = new[] { "TX", "AR", "OK" };

var faker = new Faker();

var boards = await theSession.Query<Board>().ToListAsync();
var boards = await _compositeSession.Query<Board>().ToListAsync();

foreach (var patient in patients)
{
Expand All @@ -153,34 +152,45 @@ private async Task startAppointments()

if (board != null)
{
theSession.Events.StartStream<Appointment>(requested, new AppointmentRouted(board.Id));
_compositeSession.Events.StartStream<Appointment>(requested, new AppointmentRouted(board.Id));
}
else
{
theSession.Events.StartStream<Appointment>(requested);
_compositeSession.Events.StartStream<Appointment>(requested);
}
}

await theSession.SaveChangesAsync();
await _compositeSession.SaveChangesAsync();
}

private async Task setUpData()
private async Task setUpData(string tenantId)
{
await buildSpecialties();
await startBoards();
await buildPatients();
await buildProviders();
await buildPatients(tenantId);
await buildProviders(tenantId);

await startShifts();
}

[Fact]
public async Task end_to_end()
[Theory]
[InlineData(TenancyStyle.Single)]
[InlineData(TenancyStyle.Conjoined)]
public async Task end_to_end(TenancyStyle tenancyStyle)
{
StoreOptions(opts =>
{
#region sample_defining_a_composite_projection

if(tenancyStyle == TenancyStyle.Conjoined)
{
opts.Events.TenancyStyle = TenancyStyle.Conjoined;
opts.Policies.AllDocumentsAreMultiTenantedWithPartitioning(x =>
{
x.ByHash(Enumerable.Range(1, 2).Select(i => $"b_{i}").ToArray());
});
opts.Advanced.DefaultTenantUsageEnabled = false;
}
opts.Projections.CompositeProjectionFor("TeleHealth", projection =>
{
projection.Add<ProviderShiftProjection>();
Expand All @@ -194,7 +204,8 @@ public async Task end_to_end()

#endregion
});

var tenantId = tenancyStyle == TenancyStyle.Conjoined ? "some_tenant" : TenantId.DefaultTenantId;
_compositeSession = theStore.LightweightSession(tenantId);
var usage = await theStore.As<IEventStore>().TryCreateUsage(CancellationToken.None);
verifyDescription(usage);

Expand All @@ -203,39 +214,39 @@ public async Task end_to_end()
sql.ShouldContain("mt_doc_board");
sql.ShouldContain("mt_doc_boardsummary");

await setUpData();
await setUpData(tenantId);

using var daemon = await StartDaemon();
await daemon.StartAllAsync();

await daemon.WaitForNonStaleData(30.Seconds());

// All the Boards exist
(await theSession.Query<Board>().CountAsync()).ShouldBe(12);
(await _compositeSession.Query<Board>().CountAsync()).ShouldBe(12);

// Built up ProviderShifts
(await theSession.Query<ProviderShift>().CountAsync()).ShouldBeGreaterThan(0);
(await _compositeSession.Query<ProviderShift>().CountAsync()).ShouldBeGreaterThan(0);

await startAppointments();
await daemon.WaitForNonStaleData(30.Seconds());

// Got appointments
(await theSession.Query<Appointment>().CountAsync()).ShouldBeGreaterThan(0);
(await _compositeSession.Query<Appointment>().CountAsync()).ShouldBeGreaterThan(0);

// Got details from the 2nd stage projection!
(await theSession.Query<AppointmentDetails>().CountAsync()).ShouldBeGreaterThan(0);
(await _compositeSession.Query<AppointmentDetails>().CountAsync()).ShouldBeGreaterThan(0);

// See the downstream BoardSummary too!
(await theSession.Query<BoardSummary>().CountAsync()).ShouldBeGreaterThan(0);
foreach (var boardSummary in await theSession.Query<BoardSummary>().ToListAsync())
(await _compositeSession.Query<BoardSummary>().CountAsync()).ShouldBeGreaterThan(0);
foreach (var boardSummary in await _compositeSession.Query<BoardSummary>().ToListAsync())
{
boardSummary.Board.ShouldNotBeNull();
}

#region sample_querying_for_non_stale_projection_data

// theSession is an IDocumentSession
var summaries = await theSession
// _compositeSession is an IDocumentSession
var summaries = await _compositeSession
// This makes Marten "wait" until the async daemon progress for whatever projection
// is building the BoardSummary document to catch up to the point at which the
// event store was at when you first tired to execute the LINQ query
Expand All @@ -257,15 +268,15 @@ public async Task end_to_end()

await daemon.StartAllAsync();
// Now, let's cancel an appointment and see that AppointmentDetails is also deleted
var appointmentId = (await theSession.Query<Appointment>().FirstAsync()).Id;
var appointmentId = (await _compositeSession.Query<Appointment>().FirstAsync()).Id;

theSession.Events.Append(appointmentId, new AppointmentCancelled());
await theSession.SaveChangesAsync();
_compositeSession.Events.Append(appointmentId, new AppointmentCancelled());
await _compositeSession.SaveChangesAsync();

await theStore.WaitForNonStaleProjectionDataAsync(5.Seconds());

(await theSession.LoadAsync<Appointment>(appointmentId)).ShouldBeNull();
(await theSession.LoadAsync<AppointmentDetails>(appointmentId)).ShouldBeNull();
(await _compositeSession.LoadAsync<Appointment>(appointmentId)).ShouldBeNull();
(await _compositeSession.LoadAsync<AppointmentDetails>(appointmentId)).ShouldBeNull();



Expand All @@ -287,7 +298,7 @@ private static void verifyDescription(EventStoreUsage usage)

private async Task assignProvidersToAppointments()
{
var boards = await theSession.Query<BoardSummary>().ToListAsync();
var boards = await _compositeSession.Query<BoardSummary>().ToListAsync();

foreach (var board in boards)
{
Expand All @@ -297,12 +308,12 @@ private async Task assignProvidersToAppointments()
var provider = board.ActiveProviders.Values.Where(x => x.Status == ProviderStatus.Ready).FirstOrDefault();
if (provider != null)
{
theSession.Events.Append(provider.Id, new AppointmentAssigned(appointment.Id));
theSession.Events.Append(appointment.Id, new ProviderAssigned(provider.ProviderId));
_compositeSession.Events.Append(provider.Id, new AppointmentAssigned(appointment.Id));
_compositeSession.Events.Append(appointment.Id, new ProviderAssigned(provider.ProviderId));
}
}
}

await theSession.SaveChangesAsync();
await _compositeSession.SaveChangesAsync();
}
}
Loading