Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ public TripDistanceProjection()
public void Apply(TripDistance agg, TripLeg @event) => agg.Distance += @event.Distance;
}

[Fact(Skip = "#4679: JasperFx-side regression in catchUpPerTenantAsync. " +
"This minimal single-DB Conjoined + UseTenantPartitionedEvents " +
"reproduction does NOT trigger the 23505 in isolation -- the user's " +
"MultiTenantedWithShardedDatabases setup appears load-bearing. " +
"Pinned here as a permanent regression fixture; once the upstream fix lands + " +
"a tighter repro emerges, replace this Skip with an [Fact] and update the body.")]
// #4679 fixed in JasperFx.Events 2.9.0 (jasperfx#419 — composite member stages now compose
// ShardName with the parent's tenant id instead of a bare store-global name). Unskipped: this
// single-DB SingleStream repro never triggered the 23505 on its own (the real trigger was
// composite projections, covered by Sharded/Bug_4679_sharded_catch_up_23505), but it stays as
// a per-tenant catch-up advance + no-23505 guard.
[Fact]
public async Task force_catch_up_across_multiple_tenants_does_not_throw_23505()
{
using var store = DocumentStore.For(opts =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ namespace TenantPartitionedEventsTests.Sharded;
/// (A MultiStream <c>RollUpByTenant :All</c> projection added here instead surfaced a separate
/// <c>23514</c> doc-table partition-routing error — adjacent to #4648, not this issue.)
/// </para>
///
/// <para>
/// <b>Composite projections (the explicit hypothesis that composite support doesn't honor the
/// per-tenant ShardName) — also does NOT reproduce.</b> A multi-stage
/// <c>CompositeProjectionFor</c> caught up per-tenant writes correctly tenant-scoped rows for both
/// the composite's own shard (<c>Bug4679Composite:All:tA</c>) AND every member stage
/// (<c>Bug4679ShardedTrip:All:tA</c>, …) — see <see cref="composite_force_catch_up_with_multiple_tenants_on_one_shard"/>.
/// The composite catch-up honors <c>ShardName.ForTenant</c>; the store-global collision still
/// requires some other config detail.
/// </para>
/// </summary>
[Collection("sharded-tenant-partitioned")]
public partial class Bug_4679_sharded_catch_up_23505: IAsyncLifetime
Expand Down Expand Up @@ -208,6 +218,96 @@ public async Task force_catch_up_with_multiple_tenants_on_one_shard()
names.ShouldNotContain("Bug4679ShardedCount:All");
}

private void configureComposite(StoreOptions opts)
{
opts.MultiTenantedWithShardedDatabases(x =>
{
x.ConnectionString = ConnectionSource.ConnectionString;
x.SchemaName = "sharded";
x.PartitionSchemaName = "tenants";

foreach (var (dbName, connStr) in _fixture.ConnectionStrings)
{
x.AddDatabase(dbName, connStr);
}
});

opts.AutoCreateSchemaObjects = AutoCreate.All;
opts.Events.TenancyStyle = TenancyStyle.Conjoined;
opts.Events.AppendMode = EventAppendMode.QuickWithServerTimestamps;
opts.Events.UseTenantPartitionedEvents = true;
opts.Events.AddEventType<Bug4679TripLeg>();

// A multi-stage COMPOSITE projection — the path the user flagged. Members live in
// different stages; the composite owns a single store-global shard (Bug4679Composite:All).
opts.Projections.CompositeProjectionFor("Bug4679Composite", c =>
{
c.Add<Bug4679TripProjection>(stageNumber: 1);
c.Add<Bug4679CountProjection>(stageNumber: 2);
});

opts.Schema.For<Bug4679Trip>().DocumentAlias("b4679_trip");
opts.Schema.For<Bug4679Count>().DocumentAlias("b4679_cnt");
}

[Fact]
public async Task composite_force_catch_up_with_multiple_tenants_on_one_shard()
{
var tenants = new[] { "tA", "tB", "tC", "tD", "tE" };
var shardAssignment = new Dictionary<string, string>
{
["tA"] = _fixture.DbNames[0],
["tB"] = _fixture.DbNames[0],
["tC"] = _fixture.DbNames[0],
["tD"] = _fixture.DbNames[1],
["tE"] = _fixture.DbNames[2],
};

await using var store = (DocumentStore)DocumentStore.For(configureComposite);
foreach (var tenant in tenants)
{
await store.Advanced.AddTenantToShardAsync(tenant, shardAssignment[tenant], CancellationToken.None);
}

foreach (var tenant in tenants)
{
var streamId = Guid.NewGuid();
await using var session = store.LightweightSession(tenant);
session.Events.StartStream<Bug4679Trip>(streamId,
new Bug4679TripStarted(streamId),
new Bug4679TripLeg(1.0),
new Bug4679TripLeg(2.5));
await session.SaveChangesAsync();
}

using var daemon = await store.BuildProjectionDaemonAsync("tA");

Exception? thrown = null;
try
{
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await daemon.CatchUpAsync(cts.Token);
}
catch (Exception e)
{
thrown = e;
_output.WriteLine("=== CatchUpAsync threw ===");
_output.WriteLine(e.ToString());
}

var names = await progressionNamesAsync(_fixture.ConnectionStrings[shardAssignment["tA"]]);
_output.WriteLine("=== mt_event_progression rows on shard_a ===");
foreach (var n in names) _output.WriteLine(n);

// OBSERVE: does the composite write store-global (tenant-less) progression names? If the
// composite execution doesn't honor the per-tenant ShardName, these bare :All rows appear
// (and collide on the 2nd tenant under InsertProjectionProgress => 23505).
thrown.ShouldBeNull("composite catch-up threw — captured: " + thrown?.Message);
names.ShouldNotContain("Bug4679Composite:All");
names.ShouldNotContain("Bug4679ShardedTrip:All");
names.ShouldNotContain("Bug4679ShardedCount:All");
}

private static async Task<List<string>> progressionNamesAsync(string connectionString)
{
await using var conn = new NpgsqlConnection(connectionString);
Expand Down
Loading