diff --git a/Directory.Packages.props b/Directory.Packages.props index d35ce2ad62..5394958e90 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -45,6 +45,7 @@ + diff --git a/src/Marten.ScaleTesting/Commands/SeedCommand.cs b/src/Marten.ScaleTesting/Commands/SeedCommand.cs new file mode 100644 index 0000000000..0041e62501 --- /dev/null +++ b/src/Marten.ScaleTesting/Commands/SeedCommand.cs @@ -0,0 +1,47 @@ +using JasperFx; +using JasperFx.CommandLine; +using Marten.ScaleTesting.Seeding; +using Spectre.Console; + +namespace Marten.ScaleTesting.Commands; + +[Description("Seed N tenants × M events each under conjoined multi-tenancy. Idempotent — rerun is a no-op if the target counts are already met.")] +public sealed class SeedCommand: JasperFxAsyncCommand +{ + public override async Task Execute(SeedInput input) + { + using var host = input.BuildHost(); + var store = host.DocumentStore(); + + if (input.WipeFlag) + { + AnsiConsole.MarkupLine("[red]--wipe specified — destroying all data before seeding.[/]"); + await store.Advanced.Clean.CompletelyRemoveAllAsync().ConfigureAwait(false); + } + + await store.Storage.ApplyAllConfiguredChangesToDatabaseAsync().ConfigureAwait(false); + + var options = input.ToSeedOptions(); + var seeder = new EventSeeder(store, options); + var report = await seeder.RunAsync(CancellationToken.None).ConfigureAwait(false); + + if (report.AlreadySeeded) + { + AnsiConsole.MarkupLine("[yellow]Nothing to do — the target tenants already have enough events. Pass --wipe to start over.[/]"); + return true; + } + + AnsiConsole.MarkupLine($"[green]Seed complete.[/]"); + var table = new Table().AddColumn("Metric").AddColumn(new TableColumn("Value").RightAligned()); + table.AddRow("Batches written", report.Batches.ToString("N0")); + table.AddRow("Events written", report.Events.ToString("N0")); + table.AddRow("Elapsed", $"{report.Elapsed.TotalSeconds:N1}s"); + table.AddRow("Throughput (events/sec)", (report.Events / Math.Max(0.001, report.Elapsed.TotalSeconds)).ToString("N0")); + AnsiConsole.Write(table); + + var stats = await store.Advanced.FetchEventStoreStatistics().ConfigureAwait(false); + AnsiConsole.MarkupLine($"[blue]Event store now holds {stats.EventCount:N0} events across {stats.StreamCount:N0} streams.[/]"); + + return true; + } +} diff --git a/src/Marten.ScaleTesting/Commands/SeedInput.cs b/src/Marten.ScaleTesting/Commands/SeedInput.cs new file mode 100644 index 0000000000..cad3a51af6 --- /dev/null +++ b/src/Marten.ScaleTesting/Commands/SeedInput.cs @@ -0,0 +1,33 @@ +using JasperFx; +using JasperFx.CommandLine; +using Marten.ScaleTesting.Seeding; + +namespace Marten.ScaleTesting.Commands; + +public sealed class SeedInput: NetCoreInput +{ + [Description("Number of tenants to seed under conjoined multi-tenancy. Default: 50.")] + public int TenantsFlag { get; set; } = 50; + + [Description("Events per tenant. Default: 400,000 (×50 tenants ≈ 20M events).")] + public int EventsPerTenantFlag { get; set; } = 400_000; + + [Description("Number of hash partition buckets for the conjoined tenancy. Default: 8.")] + public int BucketsFlag { get; set; } = 8; + + [Description("Parallel writer task count. Default: 8.")] + public int WritersFlag { get; set; } = 8; + + [Description("Root seed for deterministic stream generation. Default: 42.")] + public int SeedFlag { get; set; } = 42; + + [Description("Wipe the event store schema before seeding. Default: false (idempotent rerun).")] + public bool WipeFlag { get; set; } + + public SeedOptions ToSeedOptions() => new( + TenantCount: TenantsFlag, + EventsPerTenant: EventsPerTenantFlag, + HashBuckets: BucketsFlag, + WriterTasks: WritersFlag, + Seed: SeedFlag); +} diff --git a/src/Marten.ScaleTesting/ConnectionSource.cs b/src/Marten.ScaleTesting/ConnectionSource.cs new file mode 100644 index 0000000000..6baf492bbe --- /dev/null +++ b/src/Marten.ScaleTesting/ConnectionSource.cs @@ -0,0 +1,16 @@ +namespace Marten.ScaleTesting; + +/// +/// Mirrors DaemonTests.TeleHealth.ConnectionSource but copy-pasted so the +/// dev-tool stays self-contained (no ProjectReference back into +/// DaemonTests). Override at runtime via the marten_testing_database +/// environment variable, same convention as the test harness. +/// +internal static class ConnectionSource +{ + public const string DefaultConnectionString = + "Host=localhost;Port=5432;Database=marten_testing;Username=postgres;password=postgres"; + + public static string ConnectionString => + Environment.GetEnvironmentVariable("marten_testing_database") ?? DefaultConnectionString; +} diff --git a/src/Marten.ScaleTesting/Domain/Appointments.cs b/src/Marten.ScaleTesting/Domain/Appointments.cs new file mode 100644 index 0000000000..128b021e08 --- /dev/null +++ b/src/Marten.ScaleTesting/Domain/Appointments.cs @@ -0,0 +1,93 @@ +// Lifted from src/DaemonTests/TeleHealth/Appointments.cs (#4666 Phase A). +// Copy-paste, not ProjectReference — the ScaleTesting harness owns its domain +// fork so we can extend without disturbing the DaemonTests fixtures. +using JasperFx.Events; +using Marten.Events.Aggregation; + +namespace Marten.ScaleTesting.Domain; + +public record AppointmentRequested(Guid PatientId, string StateCode, string SpecialtyCode); +public record AppointmentRouted(Guid BoardId, string ReasonCode); +public record AppointmentExternalIdentifierAssigned(Guid AppointmentId, Guid ExternalId); +public record ProviderAssigned(Guid ProviderId); +public record AppointmentStarted; +public record AppointmentCompleted; +public record AppointmentEstimated(DateTimeOffset Time); +public record AppointmentCancelled; + +public enum AppointmentStatus +{ + Requested, + Scheduled, + Started, + Completed +} + +public class Appointment +{ + public Guid Id { get; set; } + public long Version { get; set; } + public DateTimeOffset Created { get; set; } + public string SpecialtyCode { get; set; } = string.Empty; + public Licensing? Requirement { get; set; } + public AppointmentStatus Status { get; set; } + public Guid? ProviderId { get; set; } + public DateTimeOffset? EstimatedTime { get; set; } + public Guid? BoardId { get; set; } + public Guid PatientId { get; set; } + public DateTimeOffset? Started { get; set; } + public DateTimeOffset? Completed { get; set; } +} + +public partial class AppointmentProjection: SingleStreamProjection +{ + public AppointmentProjection() + { + Options.CacheLimitPerTenant = 1000; + } + + public override Appointment? Evolve(Appointment? snapshot, Guid id, IEvent e) + { + switch (e.Data) + { + case AppointmentRequested requested: + snapshot = new Appointment + { + Status = AppointmentStatus.Requested, + Requirement = new Licensing(requested.SpecialtyCode, requested.StateCode), + PatientId = requested.PatientId, + Created = e.Timestamp, + SpecialtyCode = requested.SpecialtyCode + }; + break; + + case AppointmentRouted routed: + snapshot!.BoardId = routed.BoardId; + break; + + case ProviderAssigned assigned: + snapshot!.ProviderId = assigned.ProviderId; + break; + + case AppointmentEstimated estimated: + snapshot!.Status = AppointmentStatus.Scheduled; + snapshot.EstimatedTime = estimated.Time; + break; + + case AppointmentStarted: + snapshot!.Status = AppointmentStatus.Started; + snapshot.Started = e.Timestamp; + break; + + case AppointmentCompleted: + snapshot!.Status = AppointmentStatus.Completed; + snapshot.Completed = e.Timestamp; + break; + + case AppointmentCancelled: + return null; + } + + return snapshot; + } +} diff --git a/src/Marten.ScaleTesting/Domain/Boards.cs b/src/Marten.ScaleTesting/Domain/Boards.cs new file mode 100644 index 0000000000..864a040531 --- /dev/null +++ b/src/Marten.ScaleTesting/Domain/Boards.cs @@ -0,0 +1,67 @@ +// Lifted from src/DaemonTests/TeleHealth/Boards.cs (#4666 Phase A). +using JasperFx.Core; + +namespace Marten.ScaleTesting.Domain; + +internal interface BoardStateEvent; + +public record BoardOpened(string Name, DateOnly Date, DateTimeOffset Opened, string[] StateCodes, string[] SpecialtyCodes): BoardStateEvent; +public record BoardFinished(DateTimeOffset Timestamp): BoardStateEvent; +public record BoardClosed(DateTimeOffset Timestamp, string Reason): BoardStateEvent; +public record ShiftAdded(Guid ShiftId); +public record AlertRaised(string AlertCode); +public record AlertCleared(string AlertCode); +public record ShiftDropped(Guid ShiftId); + +public class Board +{ + public Board() + { + } + + public Board(BoardOpened opened) + { + Name = opened.Name; + Activated = opened.Opened; + Date = opened.Date; + + SpecialtyCodes = opened.SpecialtyCodes; + StateCodes = opened.StateCodes; + } + + public void Apply(BoardFinished finished) => Finished = finished.Timestamp; + + public void Apply(BoardClosed closed) + { + Closed = closed.Timestamp; + CloseReason = closed.Reason; + } + + public void Apply(ShiftAdded added) => ActiveShifts.Fill(added.ShiftId); + + public void Apply(ShiftDropped dropped) => ActiveShifts.Remove(dropped.ShiftId); + + public void Apply(AlertRaised alert) + { + AlertCodes = AlertCodes.Concat([alert.AlertCode]).Distinct().ToArray(); + } + + public void Apply(AlertCleared cleared) + { + AlertCodes = AlertCodes.Where(x => x != cleared.AlertCode).ToArray(); + } + + public Guid Id { get; set; } + public string Name { get; } = string.Empty; + public DateTimeOffset Activated { get; set; } + public DateTimeOffset? Finished { get; set; } + public DateOnly Date { get; set; } + public DateTimeOffset? Closed { get; set; } + + public string[] AlertCodes { get; set; } = []; + public string[] StateCodes { get; set; } = []; + public string[] SpecialtyCodes { get; set; } = []; + + public string CloseReason { get; private set; } = string.Empty; + public List ActiveShifts { get; set; } = new(); +} diff --git a/src/Marten.ScaleTesting/Domain/ProviderShift.cs b/src/Marten.ScaleTesting/Domain/ProviderShift.cs new file mode 100644 index 0000000000..0559b1fb49 --- /dev/null +++ b/src/Marten.ScaleTesting/Domain/ProviderShift.cs @@ -0,0 +1,91 @@ +// Lifted from src/DaemonTests/TeleHealth/ProviderShift.cs (#4666 Phase A). +using JasperFx.Events; +using JasperFx.Events.Grouping; +using Marten.Events.Aggregation; + +namespace Marten.ScaleTesting.Domain; + +public class ProviderShift(Guid boardId, Provider provider) +{ + public Guid Id { get; set; } + public long Version { get; set; } + public Guid BoardId { get; private set; } = boardId; + public Guid ProviderId => Provider.Id; + public ProviderStatus Status { get; set; } = ProviderStatus.Paused; + public string Name { get; init; } = string.Empty; + public Guid? AppointmentId { get; set; } + + public Provider Provider { get; set; } = provider; +} + +public enum ProviderStatus +{ + Ready, + Assigned, + Charting, + Paused +} + +public record ProviderScheduled(Guid ProviderId, DateTimeOffset ExpectedStart); +public record AppointmentAssigned(Guid AppointmentId); +public record ProviderJoined(Guid BoardId, Guid ProviderId); +public record EnhancedProviderJoined(Guid BoardId, Provider Provider); +public record ProviderReady; +public record ProviderPaused; +public record ProviderSignedOff; +public record ChartingFinished; +public record ChartingStarted; + +public partial class ProviderShiftProjection: SingleStreamProjection +{ + public ProviderShiftProjection() + { + Options.CacheLimitPerTenant = 1000; + } + + public override async Task EnrichEventsAsync(SliceGroup group, IQuerySession querySession, CancellationToken cancellation) + { + await group + .EnrichWith() + .ForEvent() + .ForEntityId(x => x.ProviderId) + .EnrichAsync((slice, e, provider) => + { + slice.ReplaceEvent(e, new EnhancedProviderJoined(e.Data.BoardId, provider)); + }); + } + + public override ProviderShift? Evolve(ProviderShift? snapshot, Guid id, IEvent e) + { + switch (e.Data) + { + case EnhancedProviderJoined joined: + snapshot = new ProviderShift(joined.BoardId, joined.Provider) + { + Provider = joined.Provider, + Status = ProviderStatus.Ready + }; + break; + + case ProviderReady: + snapshot!.Status = ProviderStatus.Ready; + break; + + case AppointmentAssigned assigned: + snapshot!.Status = ProviderStatus.Assigned; + snapshot.AppointmentId = assigned.AppointmentId; + break; + + case ProviderPaused: + snapshot!.Status = ProviderStatus.Paused; + snapshot.AppointmentId = null; + break; + + case ChartingStarted: + snapshot!.Status = ProviderStatus.Charting; + break; + } + + return snapshot; + } +} diff --git a/src/Marten.ScaleTesting/Domain/ReferenceData.cs b/src/Marten.ScaleTesting/Domain/ReferenceData.cs new file mode 100644 index 0000000000..d87f4cbe51 --- /dev/null +++ b/src/Marten.ScaleTesting/Domain/ReferenceData.cs @@ -0,0 +1,46 @@ +// Lifted from src/DaemonTests/TeleHealth/{Patient,Providers,RoutingReason,Specialty}.cs (#4666 Phase A). +using Marten.Schema; + +namespace Marten.ScaleTesting.Domain; + +public class Patient +{ + public Guid Id { get; set; } + public string FirstName { get; set; } = string.Empty; + public string LastName { get; set; } = string.Empty; +} + +public record Licensing(string SpecialtyCode, string StateCode); + +public enum ProviderRole +{ + Physician, + PhysicianAssistant, + Nurse +} + +public class Provider +{ + public Guid Id { get; set; } + public string FirstName { get; set; } = string.Empty; + public string LastName { get; set; } = string.Empty; + public ProviderRole Role { get; set; } + + public List Licensing { get; set; } = []; +} + +public class RoutingReason +{ + public Guid Id { get; set; } + public string Code { get; set; } = string.Empty; + public string Description { get; set; } = string.Empty; + public bool IsActive { get; set; } + public int Severity { get; set; } +} + +public class Specialty +{ + [Identity] + public string Code { get; set; } = string.Empty; + public string Description { get; set; } = string.Empty; +} diff --git a/src/Marten.ScaleTesting/Marten.ScaleTesting.csproj b/src/Marten.ScaleTesting/Marten.ScaleTesting.csproj new file mode 100644 index 0000000000..924579fa26 --- /dev/null +++ b/src/Marten.ScaleTesting/Marten.ScaleTesting.csproj @@ -0,0 +1,27 @@ + + + + Exe + net10.0 + enable + enable + Marten.ScaleTesting + + false + + false + + + + + + + + + + + + + diff --git a/src/Marten.ScaleTesting/Program.cs b/src/Marten.ScaleTesting/Program.cs new file mode 100644 index 0000000000..6fb483f844 --- /dev/null +++ b/src/Marten.ScaleTesting/Program.cs @@ -0,0 +1,51 @@ +using JasperFx; +using JasperFx.Events; +using Marten; +using Marten.Events; +using Marten.ScaleTesting; +using Marten.Storage; +using Microsoft.Extensions.Hosting; + +// #4666 Phase A — host bootstrap. Mirrors the conjoined-multi-tenancy config +// from src/DaemonTests/Composites/multi_stage_projections.cs:246-254 but +// keeps the bucket count + tenancy style parametrisable per subcommand. +// +// Projections are registered at host build time so the Async daemon (Phase B) +// has them on hand; Phase A doesn't start the daemon, only seeds events +// against the same schema shape the rebuild will use. + +var builder = Host.CreateDefaultBuilder(args); +builder.ConfigureServices(services => +{ + services.AddMarten(opts => + { + opts.Connection(ConnectionSource.ConnectionString); + opts.DisableNpgsqlLogging = true; + + // Quick append is the realistic shape for the high-throughput seed. + opts.Events.AppendMode = EventAppendMode.Quick; + + // Conjoined tenancy with hash partitioning — bucket count is fixed + // at 8 here so the schema is stable across seed/rebuild runs. The + // CLI seed subcommand still threads its own bucket value but the + // host-level config wins for partition DDL. + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Policies.AllDocumentsAreMultiTenantedWithPartitioning(x => + { + x.ByHash(Enumerable.Range(1, 8).Select(i => $"b_{i}").ToArray()); + }); + opts.Advanced.DefaultTenantUsageEnabled = false; + + // Phase A intentionally does NOT register the snapshot projections — + // the seeder only writes raw events. Registering Snapshot would + // require the JasperFx.Events.SourceGenerator to emit the dispatcher + // for each aggregate's partial class, which is fine when we wire up + // the composite projection in Phase B but is dead weight for a + // pure-seeding run. Without registration, StartStream on the + // seeder just tags the stream with the aggregate type name; that + // doesn't trigger any projection machinery and Phase B's rebuild + // builds the snapshots from the seeded events. + }); +}); + +return await builder.RunJasperFxCommands(args).ConfigureAwait(false); diff --git a/src/Marten.ScaleTesting/README.md b/src/Marten.ScaleTesting/README.md new file mode 100644 index 0000000000..0f693360df --- /dev/null +++ b/src/Marten.ScaleTesting/README.md @@ -0,0 +1,49 @@ +# Marten.ScaleTesting + +Long-running load test harness for Marten 9's async daemon projection rebuilds +([#4666](https://github.com/JasperFx/marten/issues/4666)). Interactive / dev-box +only — **not** packaged, **not** wired into CI. + +Drives the daemon-thread-safety synthesis fixes (#4657 → #4658 → #4667 phases) +against realistic conjoined-multi-tenant event interleaving at the 20M+ event +scale, beyond what unit-test fixtures cover. + +## Usage + +```bash +# Seed N tenants × M events per tenant under conjoined multi-tenancy +dotnet run --project src/Marten.ScaleTesting -- seed --tenants 50 --events-per-tenant 400000 --buckets 8 --seed 42 + +# Rebuild the TeleHealth composite projection against the seeded data +# (Phase B — not yet implemented) +dotnet run --project src/Marten.ScaleTesting -- rebuild --projection composite --report metrics.json + +# Validate aggregates against a single-shard baseline (Phase C — not yet implemented) +dotnet run --project src/Marten.ScaleTesting -- validate --baseline baseline.json +``` + +Connect string defaults to the standard Marten test connection +(`Host=localhost;Port=5432;Database=marten_testing;Username=postgres;password=postgres`). +Override with the `marten_testing_database` env var. + +## Project layout + +| Directory | Contents | +|---|---| +| `Domain/` | TeleHealth events / aggregates / reference data **lifted** (copied) from `src/DaemonTests/TeleHealth/`. Self-contained so we can extend without touching test fixtures. | +| `Seeding/` | Producer-consumer event seeder: per-stream `IEnumerable` generators, weighted-random k-way merge, `Channel` pipeline. | +| `Commands/` | JasperFx.CommandLine subcommands. Modelled on `src/EventAppenderPerfTester/`. | + +## Phases + +* **Phase A (this PR):** project scaffold + lifted Telehealth domain + event seeder + `seed` subcommand. Idempotent via `mt_events` row count check. +* **Phase B:** 4+2+2 composite topology (stage 1: single-stream snapshots + `AppointmentMetricsProjection`; stage 2: `AppointmentDetailsProjection` + `BoardSummaryProjection`; stage 3: NEW `ProviderUtilizationProjection` + `TenantDailyRollupProjection`) + `rebuild` subcommand on the single-pass `CompositeReplayExecutor` path. +* **Phase C:** `validate` (single-shard baseline diff) + `stress` (chain `seed` + `rebuild` + `validate`) + JSON metrics sink. +* **Phase D:** use it. Drive the daemon-thread-safety fixes against the harness — each fix should hold the crash gate AND not regress rebuild time. + +## Non-goals + +* Not a microbenchmark — `src/MartenBenchmarks/` covers per-method timings. +* Not a NuGet package — internal tool only. +* Not wired into CI. +* Not sharded-PG or distributed. diff --git a/src/Marten.ScaleTesting/Seeding/EventBatch.cs b/src/Marten.ScaleTesting/Seeding/EventBatch.cs new file mode 100644 index 0000000000..817c4a0b7f --- /dev/null +++ b/src/Marten.ScaleTesting/Seeding/EventBatch.cs @@ -0,0 +1,26 @@ +namespace Marten.ScaleTesting.Seeding; + +/// +/// One stream's complete event sequence for a single tenant. Each batch is +/// the complete contents of a single StartStream<TAggregate> + +/// SaveChangesAsync call — no inter-batch coordination is required, +/// so the writer pool can fan out fully in parallel without colliding on the +/// per-stream version sequence. +/// +/// +/// Cross-stream interleaving at the mt_events table level still +/// happens via the producer's draw order: it picks the next stream to emit +/// using a weighted random across stream types per tenant, then round-robins +/// across tenants. Writers commit roughly in producer order, so the events +/// table ends up with the same interleaving shape a real workload exhibits. +/// +/// +/// Conjoined tenant id; never null. +/// Stream id; stable so a given stream id always maps to one StartStream call. +/// Aggregate the stream rolls up to (Appointment / Board / ProviderShift). +/// The complete event sequence for the stream, in append order. +internal sealed record EventBatch( + string TenantId, + Guid StreamId, + Type AggregateType, + IReadOnlyList Events); diff --git a/src/Marten.ScaleTesting/Seeding/EventInterleaver.cs b/src/Marten.ScaleTesting/Seeding/EventInterleaver.cs new file mode 100644 index 0000000000..a4bf36d41a --- /dev/null +++ b/src/Marten.ScaleTesting/Seeding/EventInterleaver.cs @@ -0,0 +1,100 @@ +using Marten.ScaleTesting.Domain; + +namespace Marten.ScaleTesting.Seeding; + +/// +/// Per-tenant batch interleaver. Holds the in-progress event sequence for +/// every stream that hasn't yet drained and emits one +/// at a time, drawn from a stream chosen at random with weights tuned for a +/// realistic Telehealth event mix. +/// +/// +/// The weights are blended Appointment-heavy because that's the production +/// reality (boards exist for the day, providers for the shift, but +/// appointments churn constantly). Tune from if +/// the harness ever needs a different stress shape. +/// +/// +internal sealed class EventInterleaver +{ + public sealed record StreamWeights(double Appointment, double Board, double ProviderShift) + { + public static readonly StreamWeights Default = new(Appointment: 70, Board: 5, ProviderShift: 25); + } + + private readonly string _tenantId; + private readonly Random _rng; + private readonly StreamWeights _weights; + private readonly List _appointments = []; + private readonly List _boards = []; + private readonly List _shifts = []; + + public EventInterleaver(string tenantId, Random rng, StreamWeights? weights = null) + { + _tenantId = tenantId; + _rng = rng; + _weights = weights ?? StreamWeights.Default; + } + + public void AddAppointmentStream(Guid streamId, List events) => add(_appointments, streamId, typeof(Appointment), events); + public void AddBoardStream(Guid streamId, List events) => add(_boards, streamId, typeof(Board), events); + public void AddProviderShiftStream(Guid streamId, List events) => add(_shifts, streamId, typeof(ProviderShift), events); + + private static void add(List bucket, Guid streamId, Type aggregateType, List events) + { + if (events.Count == 0) return; + bucket.Add(new PendingStream(streamId, aggregateType, events)); + } + + /// + /// Drains all queued streams as emissions, one + /// batch per stream (each batch is the complete stream). The weighted + /// random pick across stream types still produces cross-stream + /// interleaving at the events-table level once writers commit, without + /// any per-stream version-sequence races. + /// + public IEnumerable Drain() + { + while (_appointments.Count > 0 || _boards.Count > 0 || _shifts.Count > 0) + { + var bucket = pickBucket(); + if (bucket.Count == 0) continue; // weighted pick can land on an empty bucket; retry. + + var idx = _rng.Next(bucket.Count); + var stream = bucket[idx]; + bucket.RemoveAt(idx); + + yield return new EventBatch(_tenantId, stream.StreamId, stream.AggregateType, stream.Events); + } + } + + /// + /// Weighted pick across the three bucket types. Returns the bucket the + /// RNG selected; caller must handle the empty-bucket case (drained + /// streams get pruned out of band). + /// + private List pickBucket() + { + // Skip drained buckets so the weighted draw doesn't waste rolls + // late in seeding when one type runs out. + var total = 0.0; + if (_appointments.Count > 0) total += _weights.Appointment; + if (_boards.Count > 0) total += _weights.Board; + if (_shifts.Count > 0) total += _weights.ProviderShift; + + var roll = _rng.NextDouble() * total; + if (_appointments.Count > 0) + { + if (roll < _weights.Appointment) return _appointments; + roll -= _weights.Appointment; + } + if (_boards.Count > 0) + { + if (roll < _weights.Board) return _boards; + roll -= _weights.Board; + } + return _shifts; + } + + private sealed record PendingStream(Guid StreamId, Type AggregateType, List Events); +} diff --git a/src/Marten.ScaleTesting/Seeding/EventSeeder.cs b/src/Marten.ScaleTesting/Seeding/EventSeeder.cs new file mode 100644 index 0000000000..039cc08d58 --- /dev/null +++ b/src/Marten.ScaleTesting/Seeding/EventSeeder.cs @@ -0,0 +1,294 @@ +using System.Diagnostics; +using System.Threading.Channels; +using JasperFx; +using Marten.ScaleTesting.Domain; +using Spectre.Console; + +namespace Marten.ScaleTesting.Seeding; + +/// +/// Coordinates the full seed: reference data per tenant + interleaved event +/// batches → bounded → N parallel writer tasks each +/// owning a session-per-tenant + SaveChangesAsync per batch. +/// +/// +/// Idempotent. Before doing any work, queries mt_streams per tenant; +/// if the existing event count already meets the target the seeder exits +/// early. +/// +/// +internal sealed class EventSeeder +{ + private readonly IDocumentStore _store; + private readonly SeedOptions _options; + + public EventSeeder(IDocumentStore store, SeedOptions options) + { + _store = store; + _options = options; + } + + public async Task RunAsync(CancellationToken token) + { + var sw = Stopwatch.StartNew(); + + // Idempotency gate. If every tenant already has at least the target + // event count, exit. We don't try to be cute about "partial" seeding — + // either the tenant has enough events or it doesn't. + var existing = await ExistingEventCountsAsync(token).ConfigureAwait(false); + var tenantsNeedingWork = Enumerable.Range(0, _options.TenantCount) + .Where(i => existing.GetValueOrDefault(_options.TenantId(i)) < _options.EventsPerTenant) + .ToArray(); + + if (tenantsNeedingWork.Length == 0) + { + AnsiConsole.MarkupLine($"[yellow]Seed skipped — all {_options.TenantCount} tenants already have ≥ {_options.EventsPerTenant} events.[/]"); + return new SeedReport(0, 0, sw.Elapsed, AlreadySeeded: true); + } + + AnsiConsole.MarkupLine($"[blue]Seeding {tenantsNeedingWork.Length} tenant(s) × ~{_options.EventsPerTenant:N0} events each.[/]"); + AnsiConsole.MarkupLine($"[grey]Writers: {_options.WriterTasks} · batch buffer: {_options.BatchBufferCapacity} · seed: {_options.Seed}[/]"); + + // Reference data per tenant. Synchronous-sequential so the writer + // pool isn't pummeled with parallel schema-creation contention on + // the first tenant. + var refData = new Dictionary(tenantsNeedingWork.Length); + foreach (var tenantIdx in tenantsNeedingWork) + { + var tenantId = _options.TenantId(tenantIdx); + refData[tenantId] = await ReferenceDataSeeder.SeedAsync(_store, tenantId, tenantIdx, _options.Seed, token).ConfigureAwait(false); + } + + var channel = Channel.CreateBounded(new BoundedChannelOptions(_options.BatchBufferCapacity) + { + SingleReader = false, + SingleWriter = false, + FullMode = BoundedChannelFullMode.Wait + }); + + long batchesWritten = 0; + long eventsWritten = 0; + + // Consumers fan out so multiple sessions write in parallel — that's the + // realistic shape for a production async-daemon-backed app. + var consumers = new Task[_options.WriterTasks]; + for (var i = 0; i < _options.WriterTasks; i++) + { + consumers[i] = Task.Run(async () => + { + await foreach (var batch in channel.Reader.ReadAllAsync(token).ConfigureAwait(false)) + { + await WriteBatchAsync(batch, token).ConfigureAwait(false); + Interlocked.Increment(ref batchesWritten); + Interlocked.Add(ref eventsWritten, batch.Events.Count); + } + }, token); + } + + // Reporter ticker — every 5s, print throughput so a 15-min seed isn't + // silent. + using var reporterCts = CancellationTokenSource.CreateLinkedTokenSource(token); + var reporter = Task.Run(async () => + { + var lastEvents = 0L; + var ticker = TimeSpan.FromSeconds(5); + while (!reporterCts.IsCancellationRequested) + { + try + { + await Task.Delay(ticker, reporterCts.Token).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + return; + } + var snap = Interlocked.Read(ref eventsWritten); + var delta = snap - lastEvents; + lastEvents = snap; + AnsiConsole.MarkupLine($"[grey] ... {snap:N0} events written ({(delta / ticker.TotalSeconds):N0}/sec)[/]"); + } + }, reporterCts.Token); + + // Producer is single-threaded but processes tenants round-robin — + // keeps the channel fed from multiple tenants concurrently which the + // writers can then route to per-tenant sessions. + await ProduceAsync(tenantsNeedingWork, refData, channel.Writer, token).ConfigureAwait(false); + channel.Writer.Complete(); + + await Task.WhenAll(consumers).ConfigureAwait(false); + await reporterCts.CancelAsync().ConfigureAwait(false); + try { await reporter.ConfigureAwait(false); } catch (OperationCanceledException) { } + + sw.Stop(); + return new SeedReport(batchesWritten, eventsWritten, sw.Elapsed, AlreadySeeded: false); + } + + private async Task ProduceAsync(int[] tenantsToSeed, IReadOnlyDictionary refData, ChannelWriter writer, CancellationToken token) + { + // Generate the per-tenant interleavers upfront — cheap, in-memory. + // The actual draining is the heavy bit and pushes through the channel. + var interleavers = new List<(EventInterleaver Interleaver, IEnumerator Cursor)>(tenantsToSeed.Length); + var clock = DateTimeOffset.UtcNow.Date.AddYears(-1); // start a year ago + + foreach (var tenantIdx in tenantsToSeed) + { + var tenantId = _options.TenantId(tenantIdx); + var interleaver = BuildInterleaver(tenantId, tenantIdx, refData[tenantId], clock); + interleavers.Add((interleaver, interleaver.Drain().GetEnumerator())); + } + + // Round-robin across tenants — keeps the channel "mixed" rather than + // tenant-by-tenant, which more realistically exercises the daemon's + // tenant-keyed slice fan-out. + var alive = interleavers.Count; + var i = 0; + while (alive > 0 && !token.IsCancellationRequested) + { + var idx = i++ % interleavers.Count; + var (_, cursor) = interleavers[idx]; + if (cursor is null) continue; + if (cursor.MoveNext()) + { + await writer.WriteAsync(cursor.Current, token).ConfigureAwait(false); + } + else + { + cursor.Dispose(); + interleavers[idx] = (interleavers[idx].Interleaver, null!); + alive--; + } + } + } + + private EventInterleaver BuildInterleaver(string tenantId, int tenantIdx, TenantReferenceData refData, DateTimeOffset clock) + { + var rng = new Random(HashCode.Combine(_options.Seed, tenantIdx, "tenant")); + var interleaver = new EventInterleaver(tenantId, rng); + + // We size streams to roughly land at the per-tenant event target. The + // realistic mix is appointment-dominated; weights inside the + // interleaver bias the per-batch *order*, the per-stream counts here + // bias the per-stream *volume*. + // + // Average event-count-per-stream heuristics from StreamGenerators: + // Appointment ≈ 6.5 evts/stream + // Board ≈ 9 evts/stream + // ProviderShift ≈ 7 evts/stream + var (apptStreams, boardStreams, shiftStreams) = sizeStreams(_options.EventsPerTenant); + + var boardIds = new Guid[boardStreams]; + for (var b = 0; b < boardStreams; b++) + { + boardIds[b] = Guid.NewGuid(); + var boardRng = new Random(HashCode.Combine(_options.Seed, tenantIdx, "board", b)); + interleaver.AddBoardStream(boardIds[b], StreamGenerators.Board(boardRng, DateOnly.FromDateTime(clock.UtcDateTime.AddDays(b % 30)), clock.AddDays(b % 30))); + } + + for (var s = 0; s < shiftStreams; s++) + { + var shiftRng = new Random(HashCode.Combine(_options.Seed, tenantIdx, "shift", s)); + var boardId = boardIds[s % Math.Max(1, boardStreams)]; + var providerId = refData.Providers[s % refData.Providers.Length]; + interleaver.AddProviderShiftStream(Guid.NewGuid(), StreamGenerators.ProviderShift(shiftRng, boardId, providerId)); + } + + for (var a = 0; a < apptStreams; a++) + { + var apptRng = new Random(HashCode.Combine(_options.Seed, tenantIdx, "appt", a)); + var patient = refData.Patients[a % refData.Patients.Length]; + var board = boardIds[a % Math.Max(1, boardStreams)]; + var provider = refData.Providers[a % refData.Providers.Length]; + interleaver.AddAppointmentStream(Guid.NewGuid(), StreamGenerators.Appointment(apptRng, patient, board, provider, clock)); + } + + return interleaver; + } + + /// + /// Solves "how many streams of each kind do I need to roughly hit + /// ?" using the StreamGenerators averages. + /// 70 / 25 / 5 split for appointments / shifts / boards aligns with the + /// interleaver's default per-batch weights. + /// + private static (int Appointments, int Boards, int Shifts) sizeStreams(int eventsTarget) + { + const double apptAvg = 6.5; + const double boardAvg = 9.0; + const double shiftAvg = 7.0; + + var apptEvents = eventsTarget * 0.70; + var shiftEvents = eventsTarget * 0.25; + var boardEvents = eventsTarget * 0.05; + + return ( + Math.Max(1, (int)Math.Round(apptEvents / apptAvg)), + Math.Max(1, (int)Math.Round(boardEvents / boardAvg)), + Math.Max(1, (int)Math.Round(shiftEvents / shiftAvg)) + ); + } + + private async Task WriteBatchAsync(EventBatch batch, CancellationToken token) + { + // One stream per batch (interleaver enforces this), so always StartStream. + // StartStream tags the stream with the aggregate type name + // even though no projection registration is active in this Phase A + // bootstrap — Phase B's rebuild reads the type tag back when picking + // the right SingleStreamProjection. + await using var session = _store.LightweightSession(batch.TenantId); + startStream(session, batch); + await session.SaveChangesAsync(token).ConfigureAwait(false); + } + + private static void startStream(IDocumentSession session, EventBatch batch) + { + if (batch.AggregateType == typeof(Appointment)) + { + session.Events.StartStream(batch.StreamId, batch.Events); + } + else if (batch.AggregateType == typeof(Board)) + { + session.Events.StartStream(batch.StreamId, batch.Events); + } + else if (batch.AggregateType == typeof(ProviderShift)) + { + session.Events.StartStream(batch.StreamId, batch.Events); + } + else + { + throw new InvalidOperationException($"Unrecognised aggregate type {batch.AggregateType.Name} in seed batch."); + } + } + + private async Task> ExistingEventCountsAsync(CancellationToken token) + { + // Cross-tenant rollup, so we can't use a per-tenant QuerySession (the + // store has DefaultTenantUsageEnabled = false). Open a raw connection + // off the configured database instead. + var counts = new Dictionary(); + var database = await _store.Storage.FindOrCreateDatabase(StorageConstants.DefaultTenantId).ConfigureAwait(false); + await using var conn = database.CreateConnection(); + await conn.OpenAsync(token).ConfigureAwait(false); + try + { + await using var cmd = conn.CreateCommand(); + cmd.CommandText = $"select tenant_id, count(*) from {_store.Options.Events.DatabaseSchemaName}.mt_events group by tenant_id"; + await using var reader = await cmd.ExecuteReaderAsync(token).ConfigureAwait(false); + while (await reader.ReadAsync(token).ConfigureAwait(false)) + { + counts[reader.GetString(0)] = reader.GetInt64(1); + } + } + catch (Npgsql.PostgresException ex) when (ex.SqlState == "42P01") + { + // mt_events doesn't exist yet — first-ever seed. Treat as "no + // tenant has any events" so the caller proceeds. + } + finally + { + await conn.CloseAsync().ConfigureAwait(false); + } + return counts; + } +} + +internal sealed record SeedReport(long Batches, long Events, TimeSpan Elapsed, bool AlreadySeeded); diff --git a/src/Marten.ScaleTesting/Seeding/ReferenceDataSeeder.cs b/src/Marten.ScaleTesting/Seeding/ReferenceDataSeeder.cs new file mode 100644 index 0000000000..431bc75e28 --- /dev/null +++ b/src/Marten.ScaleTesting/Seeding/ReferenceDataSeeder.cs @@ -0,0 +1,104 @@ +using Marten.ScaleTesting.Domain; + +namespace Marten.ScaleTesting.Seeding; + +/// +/// Bulk-inserts the reference documents (Patient / Provider / RoutingReason / +/// Specialty) the composite projections enrich against. Idempotent — if the +/// expected counts are already present per tenant we skip the inserts. +/// +internal static class ReferenceDataSeeder +{ + private const int PatientsPerTenant = 200; + private const int ProvidersPerTenant = 50; + + private static readonly (string Code, string Description, int Severity)[] s_routingReasons = + [ + ("ROUTINE", "Routine appointment", 1), + ("URGENT", "Urgent care needed", 5), + ("FOLLOWUP", "Follow up from prior visit", 2), + ("CRITICAL", "Critical / emergency", 9) + ]; + + private static readonly (string Code, string Description)[] s_specialties = + [ + ("CARD", "Cardiology"), + ("DERM", "Dermatology"), + ("ENT", "Ear, Nose, Throat"), + ("GP", "General Practice"), + ("NEURO", "Neurology"), + ("ORTHO", "Orthopedics"), + ("PED", "Pediatrics"), + ("PSYCH", "Psychiatry") + ]; + + /// + /// Deterministic per-tenant reference data. Returns a snapshot of the + /// generated Patient + Provider ids so the stream seeder can pick from + /// them without re-querying. + /// + public static async Task SeedAsync( + IDocumentStore store, + string tenantId, + int tenantIndex, + int rootSeed, + CancellationToken token) + { + var rng = new Random(HashCode.Combine(rootSeed, tenantIndex)); + + // Specialty docs are tiny and global-ish, but conjoined tenancy means + // every tenant gets its own row. Idempotent by code (primary key). + await using (var session = store.LightweightSession(tenantId)) + { + foreach (var (code, desc) in s_specialties) + { + session.Store(new Specialty { Code = code, Description = desc }); + } + foreach (var (code, desc, sev) in s_routingReasons) + { + session.Store(new RoutingReason + { + Id = Guid.NewGuid(), + Code = code, + Description = desc, + Severity = sev, + IsActive = true + }); + } + await session.SaveChangesAsync(token).ConfigureAwait(false); + } + + var patients = new Guid[PatientsPerTenant]; + var providers = new Guid[ProvidersPerTenant]; + + await using (var session = store.LightweightSession(tenantId)) + { + for (var i = 0; i < PatientsPerTenant; i++) + { + patients[i] = Guid.NewGuid(); + session.Store(new Patient + { + Id = patients[i], + FirstName = $"P{tenantIndex:D4}-{i:D4}-F", + LastName = $"P{tenantIndex:D4}-{i:D4}-L" + }); + } + for (var i = 0; i < ProvidersPerTenant; i++) + { + providers[i] = Guid.NewGuid(); + session.Store(new Provider + { + Id = providers[i], + FirstName = $"Dr{tenantIndex:D4}-{i:D3}-F", + LastName = $"Dr{tenantIndex:D4}-{i:D3}-L", + Role = (ProviderRole)(rng.Next() % 3) + }); + } + await session.SaveChangesAsync(token).ConfigureAwait(false); + } + + return new TenantReferenceData(patients, providers); + } +} + +internal sealed record TenantReferenceData(Guid[] Patients, Guid[] Providers); diff --git a/src/Marten.ScaleTesting/Seeding/SeedOptions.cs b/src/Marten.ScaleTesting/Seeding/SeedOptions.cs new file mode 100644 index 0000000000..326bb5dc98 --- /dev/null +++ b/src/Marten.ScaleTesting/Seeding/SeedOptions.cs @@ -0,0 +1,24 @@ +namespace Marten.ScaleTesting.Seeding; + +/// +/// Tunable inputs for the event seeder. Defaults match the #4666 issue +/// targets (50 tenants × 400K events/tenant = 20M events under 8 hash +/// buckets). Override from the CLI for smaller dev-loop runs. +/// +public sealed record SeedOptions( + int TenantCount = 50, + int EventsPerTenant = 400_000, + int HashBuckets = 8, + int WriterTasks = 8, + int Seed = 42, + string TenantPrefix = "tenant_", + int BatchBufferCapacity = 1024) +{ + public int TotalEvents => TenantCount * EventsPerTenant; + + /// + /// Tenant id at a given index. Stable for a given + /// so reruns are idempotent. + /// + public string TenantId(int index) => $"{TenantPrefix}{index:D4}"; +} diff --git a/src/Marten.ScaleTesting/Seeding/StreamGenerators.cs b/src/Marten.ScaleTesting/Seeding/StreamGenerators.cs new file mode 100644 index 0000000000..da7e5d9288 --- /dev/null +++ b/src/Marten.ScaleTesting/Seeding/StreamGenerators.cs @@ -0,0 +1,142 @@ +using Marten.ScaleTesting.Domain; + +namespace Marten.ScaleTesting.Seeding; + +/// +/// Per-stream generators. Each returns the full ordered event sequence for one +/// stream of the given aggregate; the chops it +/// into emissions and weaves them with peers under +/// the same tenant. +/// +/// Deterministic for a fixed : callers pass a per-stream +/// RNG seeded from (rootSeed, tenantIndex, streamIndex) so reruns +/// produce byte-identical event payloads. +/// +internal static class StreamGenerators +{ + private static readonly string[] s_states = ["AZ", "CA", "FL", "GA", "IL", "MA", "NC", "NY", "OH", "PA", "TX", "WA"]; + private static readonly string[] s_specialties = ["CARD", "DERM", "ENT", "GP", "NEURO", "ORTHO", "PED", "PSYCH"]; + private static readonly string[] s_alertCodes = ["LOW_STAFF", "SURGE", "HIGH_WAIT", "ESCALATION"]; + private static readonly string[] s_closeReasons = ["EndOfShift", "PolicyClose", "EmergencyStop"]; + + /// + /// Generates the Appointment event stream — single happy-path with a small + /// cancellation tail. 4–8 events per stream, weighted heavily toward + /// scheduled+started+completed (the realistic mix). About 5% of streams + /// terminate early with a cancel. + /// + public static List Appointment(Random rng, Guid patientId, Guid boardId, Guid providerId, DateTimeOffset clock) + { + var state = s_states[rng.Next(s_states.Length)]; + var specialty = s_specialties[rng.Next(s_specialties.Length)]; + + var events = new List(8) + { + new AppointmentRequested(patientId, state, specialty), + new AppointmentRouted(boardId, "ROUTINE") + }; + + // 5% early cancel — exercise the SingleStreamProjection delete path. + if (rng.NextDouble() < 0.05) + { + events.Add(new AppointmentCancelled()); + return events; + } + + events.Add(new ProviderAssigned(providerId)); + events.Add(new AppointmentEstimated(clock.AddMinutes(rng.Next(5, 120)))); + events.Add(new AppointmentStarted()); + events.Add(new AppointmentCompleted()); + + // 30% of completed appointments also emit the external identifier event — + // exercises the AppointmentByExternalIdentifierProjection's Identity<> + // routing. + if (rng.NextDouble() < 0.3) + { + events.Add(new AppointmentExternalIdentifierAssigned(Guid.NewGuid(), Guid.NewGuid())); + } + + return events; + } + + /// + /// Generates the Board event stream — open → some shifts join / drop with + /// alerts raised in between → finish → close. 6–14 events typical. + /// + public static List Board(Random rng, DateOnly date, DateTimeOffset clock) + { + var stateCount = rng.Next(1, 4); + var states = pickN(rng, s_states, stateCount); + var specCount = rng.Next(1, 3); + var specs = pickN(rng, s_specialties, specCount); + + var events = new List(14) + { + new BoardOpened($"Board-{Guid.NewGuid():N}".Substring(0, 16), date, clock, states, specs) + }; + + // 1–4 shift adds, optionally an alert raise/clear pair, then finish+close. + var shiftAdds = rng.Next(1, 5); + for (var i = 0; i < shiftAdds; i++) + { + events.Add(new ShiftAdded(Guid.NewGuid())); + } + + if (rng.NextDouble() < 0.4) + { + var alert = s_alertCodes[rng.Next(s_alertCodes.Length)]; + events.Add(new AlertRaised(alert)); + events.Add(new AlertCleared(alert)); + } + + // 70% finish, 100% close (closed boards are the steady-state). + if (rng.NextDouble() < 0.7) + { + events.Add(new BoardFinished(clock.AddHours(rng.Next(1, 8)))); + } + events.Add(new BoardClosed(clock.AddHours(rng.Next(8, 12)), s_closeReasons[rng.Next(s_closeReasons.Length)])); + + return events; + } + + /// + /// Generates the ProviderShift event stream — join board → ready/assigned + /// oscillation → charting → paused → sign-off. 4–10 events. + /// + public static List ProviderShift(Random rng, Guid boardId, Guid providerId) + { + var events = new List(10) + { + new ProviderJoined(boardId, providerId), + new ProviderReady() + }; + + // 1–3 appointment cycles per shift. + var cycles = rng.Next(1, 4); + for (var i = 0; i < cycles; i++) + { + events.Add(new AppointmentAssigned(Guid.NewGuid())); + events.Add(new ChartingStarted()); + events.Add(new ChartingFinished()); + events.Add(new ProviderReady()); + } + + events.Add(new ProviderPaused()); + events.Add(new ProviderSignedOff()); + return events; + } + + private static string[] pickN(Random rng, string[] source, int n) + { + if (n >= source.Length) return source.ToArray(); + var pool = source.ToList(); + var picked = new string[n]; + for (var i = 0; i < n; i++) + { + var idx = rng.Next(pool.Count); + picked[i] = pool[idx]; + pool.RemoveAt(idx); + } + return picked; + } +} diff --git a/src/Marten.slnx b/src/Marten.slnx index 797ec0519d..9c2bfdc563 100644 --- a/src/Marten.slnx +++ b/src/Marten.slnx @@ -82,6 +82,7 @@ +