Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
<PackageVersion Include="Microsoft.Extensions.Logging" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="10.0.0" />
<PackageVersion Include="Microsoft.Extensions.Hosting" Version="10.0.0" />
<PackageVersion Include="Microsoft.Extensions.TimeProvider.Testing" Version="8.3.0" />
<PackageVersion Include="Microsoft.FeatureManagement" Version="3.2.0" />
<PackageVersion Include="Microsoft.IO.RecyclableMemoryStream" Version="3.0.1" />
Expand Down
47 changes: 47 additions & 0 deletions src/Marten.ScaleTesting/Commands/SeedCommand.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using JasperFx;
using JasperFx.CommandLine;
using Marten.ScaleTesting.Seeding;
using Spectre.Console;

namespace Marten.ScaleTesting.Commands;

[Description("Seed N tenants × M events each under conjoined multi-tenancy. Idempotent — rerun is a no-op if the target counts are already met.")]
public sealed class SeedCommand: JasperFxAsyncCommand<SeedInput>
{
public override async Task<bool> Execute(SeedInput input)
{
using var host = input.BuildHost();
var store = host.DocumentStore();

if (input.WipeFlag)
{
AnsiConsole.MarkupLine("[red]--wipe specified — destroying all data before seeding.[/]");
await store.Advanced.Clean.CompletelyRemoveAllAsync().ConfigureAwait(false);
}

await store.Storage.ApplyAllConfiguredChangesToDatabaseAsync().ConfigureAwait(false);

var options = input.ToSeedOptions();
var seeder = new EventSeeder(store, options);
var report = await seeder.RunAsync(CancellationToken.None).ConfigureAwait(false);

if (report.AlreadySeeded)
{
AnsiConsole.MarkupLine("[yellow]Nothing to do — the target tenants already have enough events. Pass --wipe to start over.[/]");
return true;
}

AnsiConsole.MarkupLine($"[green]Seed complete.[/]");
var table = new Table().AddColumn("Metric").AddColumn(new TableColumn("Value").RightAligned());
table.AddRow("Batches written", report.Batches.ToString("N0"));
table.AddRow("Events written", report.Events.ToString("N0"));
table.AddRow("Elapsed", $"{report.Elapsed.TotalSeconds:N1}s");
table.AddRow("Throughput (events/sec)", (report.Events / Math.Max(0.001, report.Elapsed.TotalSeconds)).ToString("N0"));
AnsiConsole.Write(table);

var stats = await store.Advanced.FetchEventStoreStatistics().ConfigureAwait(false);
AnsiConsole.MarkupLine($"[blue]Event store now holds {stats.EventCount:N0} events across {stats.StreamCount:N0} streams.[/]");

return true;
}
}
33 changes: 33 additions & 0 deletions src/Marten.ScaleTesting/Commands/SeedInput.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using JasperFx;
using JasperFx.CommandLine;
using Marten.ScaleTesting.Seeding;

namespace Marten.ScaleTesting.Commands;

public sealed class SeedInput: NetCoreInput
{
[Description("Number of tenants to seed under conjoined multi-tenancy. Default: 50.")]
public int TenantsFlag { get; set; } = 50;

[Description("Events per tenant. Default: 400,000 (×50 tenants ≈ 20M events).")]
public int EventsPerTenantFlag { get; set; } = 400_000;

[Description("Number of hash partition buckets for the conjoined tenancy. Default: 8.")]
public int BucketsFlag { get; set; } = 8;

[Description("Parallel writer task count. Default: 8.")]
public int WritersFlag { get; set; } = 8;

[Description("Root seed for deterministic stream generation. Default: 42.")]
public int SeedFlag { get; set; } = 42;

[Description("Wipe the event store schema before seeding. Default: false (idempotent rerun).")]
public bool WipeFlag { get; set; }

public SeedOptions ToSeedOptions() => new(
TenantCount: TenantsFlag,
EventsPerTenant: EventsPerTenantFlag,
HashBuckets: BucketsFlag,
WriterTasks: WritersFlag,
Seed: SeedFlag);
}
16 changes: 16 additions & 0 deletions src/Marten.ScaleTesting/ConnectionSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace Marten.ScaleTesting;

/// <summary>
/// Mirrors <c>DaemonTests.TeleHealth.ConnectionSource</c> but copy-pasted so the
/// dev-tool stays self-contained (no <c>ProjectReference</c> back into
/// DaemonTests). Override at runtime via the <c>marten_testing_database</c>
/// environment variable, same convention as the test harness.
/// </summary>
internal static class ConnectionSource
{
public const string DefaultConnectionString =
"Host=localhost;Port=5432;Database=marten_testing;Username=postgres;password=postgres";

public static string ConnectionString =>
Environment.GetEnvironmentVariable("marten_testing_database") ?? DefaultConnectionString;
}
93 changes: 93 additions & 0 deletions src/Marten.ScaleTesting/Domain/Appointments.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Lifted from src/DaemonTests/TeleHealth/Appointments.cs (#4666 Phase A).
// Copy-paste, not ProjectReference — the ScaleTesting harness owns its domain
// fork so we can extend without disturbing the DaemonTests fixtures.
using JasperFx.Events;
using Marten.Events.Aggregation;

namespace Marten.ScaleTesting.Domain;

public record AppointmentRequested(Guid PatientId, string StateCode, string SpecialtyCode);
public record AppointmentRouted(Guid BoardId, string ReasonCode);
public record AppointmentExternalIdentifierAssigned(Guid AppointmentId, Guid ExternalId);
public record ProviderAssigned(Guid ProviderId);
public record AppointmentStarted;
public record AppointmentCompleted;
public record AppointmentEstimated(DateTimeOffset Time);
public record AppointmentCancelled;

public enum AppointmentStatus
{
Requested,
Scheduled,
Started,
Completed
}

public class Appointment
{
public Guid Id { get; set; }
public long Version { get; set; }
public DateTimeOffset Created { get; set; }
public string SpecialtyCode { get; set; } = string.Empty;
public Licensing? Requirement { get; set; }
public AppointmentStatus Status { get; set; }
public Guid? ProviderId { get; set; }
public DateTimeOffset? EstimatedTime { get; set; }
public Guid? BoardId { get; set; }
public Guid PatientId { get; set; }
public DateTimeOffset? Started { get; set; }
public DateTimeOffset? Completed { get; set; }
}

public partial class AppointmentProjection: SingleStreamProjection<Appointment, Guid>
{
public AppointmentProjection()
{
Options.CacheLimitPerTenant = 1000;
}

public override Appointment? Evolve(Appointment? snapshot, Guid id, IEvent e)
{
switch (e.Data)
{
case AppointmentRequested requested:
snapshot = new Appointment
{
Status = AppointmentStatus.Requested,
Requirement = new Licensing(requested.SpecialtyCode, requested.StateCode),
PatientId = requested.PatientId,
Created = e.Timestamp,
SpecialtyCode = requested.SpecialtyCode
};
break;

case AppointmentRouted routed:
snapshot!.BoardId = routed.BoardId;
break;

case ProviderAssigned assigned:
snapshot!.ProviderId = assigned.ProviderId;
break;

case AppointmentEstimated estimated:
snapshot!.Status = AppointmentStatus.Scheduled;
snapshot.EstimatedTime = estimated.Time;
break;

case AppointmentStarted:
snapshot!.Status = AppointmentStatus.Started;
snapshot.Started = e.Timestamp;
break;

case AppointmentCompleted:
snapshot!.Status = AppointmentStatus.Completed;
snapshot.Completed = e.Timestamp;
break;

case AppointmentCancelled:
return null;
}

return snapshot;
}
}
67 changes: 67 additions & 0 deletions src/Marten.ScaleTesting/Domain/Boards.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Lifted from src/DaemonTests/TeleHealth/Boards.cs (#4666 Phase A).
using JasperFx.Core;

namespace Marten.ScaleTesting.Domain;

internal interface BoardStateEvent;

public record BoardOpened(string Name, DateOnly Date, DateTimeOffset Opened, string[] StateCodes, string[] SpecialtyCodes): BoardStateEvent;
public record BoardFinished(DateTimeOffset Timestamp): BoardStateEvent;
public record BoardClosed(DateTimeOffset Timestamp, string Reason): BoardStateEvent;
public record ShiftAdded(Guid ShiftId);
public record AlertRaised(string AlertCode);
public record AlertCleared(string AlertCode);
public record ShiftDropped(Guid ShiftId);

public class Board
{
public Board()
{
}

public Board(BoardOpened opened)
{
Name = opened.Name;
Activated = opened.Opened;
Date = opened.Date;

SpecialtyCodes = opened.SpecialtyCodes;
StateCodes = opened.StateCodes;
}

public void Apply(BoardFinished finished) => Finished = finished.Timestamp;

public void Apply(BoardClosed closed)
{
Closed = closed.Timestamp;
CloseReason = closed.Reason;
}

public void Apply(ShiftAdded added) => ActiveShifts.Fill(added.ShiftId);

public void Apply(ShiftDropped dropped) => ActiveShifts.Remove(dropped.ShiftId);

public void Apply(AlertRaised alert)
{
AlertCodes = AlertCodes.Concat([alert.AlertCode]).Distinct().ToArray();
}

public void Apply(AlertCleared cleared)
{
AlertCodes = AlertCodes.Where(x => x != cleared.AlertCode).ToArray();
}

public Guid Id { get; set; }
public string Name { get; } = string.Empty;
public DateTimeOffset Activated { get; set; }
public DateTimeOffset? Finished { get; set; }
public DateOnly Date { get; set; }
public DateTimeOffset? Closed { get; set; }

public string[] AlertCodes { get; set; } = [];
public string[] StateCodes { get; set; } = [];
public string[] SpecialtyCodes { get; set; } = [];

public string CloseReason { get; private set; } = string.Empty;
public List<Guid> ActiveShifts { get; set; } = new();
}
91 changes: 91 additions & 0 deletions src/Marten.ScaleTesting/Domain/ProviderShift.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Lifted from src/DaemonTests/TeleHealth/ProviderShift.cs (#4666 Phase A).
using JasperFx.Events;
using JasperFx.Events.Grouping;
using Marten.Events.Aggregation;

namespace Marten.ScaleTesting.Domain;

public class ProviderShift(Guid boardId, Provider provider)
{
public Guid Id { get; set; }
public long Version { get; set; }
public Guid BoardId { get; private set; } = boardId;
public Guid ProviderId => Provider.Id;
public ProviderStatus Status { get; set; } = ProviderStatus.Paused;
public string Name { get; init; } = string.Empty;
public Guid? AppointmentId { get; set; }

public Provider Provider { get; set; } = provider;
}

public enum ProviderStatus
{
Ready,
Assigned,
Charting,
Paused
}

public record ProviderScheduled(Guid ProviderId, DateTimeOffset ExpectedStart);
public record AppointmentAssigned(Guid AppointmentId);
public record ProviderJoined(Guid BoardId, Guid ProviderId);
public record EnhancedProviderJoined(Guid BoardId, Provider Provider);
public record ProviderReady;
public record ProviderPaused;
public record ProviderSignedOff;
public record ChartingFinished;
public record ChartingStarted;

public partial class ProviderShiftProjection: SingleStreamProjection<ProviderShift, Guid>
{
public ProviderShiftProjection()
{
Options.CacheLimitPerTenant = 1000;
}

public override async Task EnrichEventsAsync(SliceGroup<ProviderShift, Guid> group, IQuerySession querySession, CancellationToken cancellation)
{
await group
.EnrichWith<Provider>()
.ForEvent<ProviderJoined>()
.ForEntityId(x => x.ProviderId)
.EnrichAsync((slice, e, provider) =>
{
slice.ReplaceEvent(e, new EnhancedProviderJoined(e.Data.BoardId, provider));
});
}

public override ProviderShift? Evolve(ProviderShift? snapshot, Guid id, IEvent e)
{
switch (e.Data)
{
case EnhancedProviderJoined joined:
snapshot = new ProviderShift(joined.BoardId, joined.Provider)
{
Provider = joined.Provider,
Status = ProviderStatus.Ready
};
break;

case ProviderReady:
snapshot!.Status = ProviderStatus.Ready;
break;

case AppointmentAssigned assigned:
snapshot!.Status = ProviderStatus.Assigned;
snapshot.AppointmentId = assigned.AppointmentId;
break;

case ProviderPaused:
snapshot!.Status = ProviderStatus.Paused;
snapshot.AppointmentId = null;
break;

case ChartingStarted:
snapshot!.Status = ProviderStatus.Charting;
break;
}

return snapshot;
}
}
Loading
Loading