Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@
JasperFx 2.9.2: jasperfx#431 — ProjectionErrorHandlingDescriptor on EventStoreUsage
so monitoring tools (CritterWatch) can read the daemon error-handling policy off
the wire instead of presuming skip-and-DLQ behaviour. See JasperFx/ProductSupport#3. -->
<PackageVersion Include="JasperFx" Version="2.9.2" />
<PackageVersion Include="JasperFx.Events" Version="2.9.2" />
<PackageVersion Include="JasperFx.Events.SourceGenerator" Version="2.9.2">
<PackageVersion Include="JasperFx" Version="2.9.5" />
<PackageVersion Include="JasperFx.Events" Version="2.9.5" />
<PackageVersion Include="JasperFx.Events.SourceGenerator" Version="2.9.5">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageVersion>
<PackageVersion Include="JasperFx.SourceGenerator" Version="2.9.2" />
<PackageVersion Include="JasperFx.SourceGenerator" Version="2.9.5" />
<PackageVersion Include="Jil" Version="3.0.0-alpha2" />
<PackageVersion Include="Lamar" Version="7.1.1" />
<PackageVersion Include="Lamar.Microsoft.DependencyInjection" Version="15.0.0" />
Expand Down
36 changes: 36 additions & 0 deletions src/Marten/Events/Daemon/HighWater/HighWaterDetector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,42 @@ public Task MarkHighWaterMarkInDatabaseAsync(long currentMark, CancellationToken
return _runner.Query(new MarkHighWaterQueryHandler(_graph, currentMark), token);
}

// #4717: persist a durable per-tenant high-water row (HighWaterMark:<tenant>) so each tenant's mark
// survives a daemon restart. Under per-tenant event partitioning the store-global mt_events_sequence
// is never advanced and a single HighWaterMark row cannot represent multiple tenants. Invoked by
// JasperFx's TenantedHighWaterCoordinator on each vectorized per-tenant poll.
public Task MarkHighWaterForTenantAsync(string tenantId, long sequence, CancellationToken token)
{
return _runner.Query(new MarkTenantHighWaterQueryHandler(_graph, tenantId, sequence), token);
}

public class MarkTenantHighWaterQueryHandler: ISingleQueryHandler<bool>
{
private readonly EventGraph _graph;
private readonly string _tenantId;
private readonly long _currentMark;

public MarkTenantHighWaterQueryHandler(EventGraph graph, string tenantId, long currentMark)
{
_graph = graph;
_tenantId = tenantId;
_currentMark = currentMark;
}

public NpgsqlCommand BuildCommand()
{
return new NpgsqlCommand(
$"select {_graph.DatabaseSchemaName}.mt_mark_event_progression(:name, :seq);")
.With("name", HighWaterShardIdentity.PerTenant(_tenantId))
.With("seq", _currentMark);
}

public Task<bool> HandleAsync(DbDataReader reader, CancellationToken token)
{
return Task.FromResult(true);
}
}

public class MarkHighWaterQueryHandler: ISingleQueryHandler<bool>
{
private readonly EventGraph _graph;
Expand Down
25 changes: 19 additions & 6 deletions src/Marten/Events/Daemon/HighWater/HighWaterStatisticsDetector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,25 @@ internal class HighWaterStatisticsDetector: ISingleQueryHandler<HighWaterStatist

public HighWaterStatisticsDetector(EventGraph graph)
{
// #4681: the literal 'HighWaterMark' name is produced by HighWaterShardIdentity so
// any future change to the grammar (e.g. an alternate store-global name) lands in
// one place rather than scattered SQL string literals.
// #4712 (same bug class as #4705): under per-tenant event partitioning the store-global
// mt_events_sequence is never advanced — each tenant draws seq_ids from its own
// mt_events_sequence_{suffix} — so `last_value` reports a stale 1 while the true height is
// far higher. That made the store-global high-water agent treat the store as perpetually
// Stale. Read the real height from mt_events instead (mirrors FetchHighestEventSequenceNumber).
var highestSequenceSql = graph.UseTenantPartitionedEvents
? $"select coalesce(max(seq_id), 0), transaction_timestamp() from {graph.DatabaseSchemaName}.mt_events"
: $"select last_value, transaction_timestamp() from {graph.DatabaseSchemaName}.mt_events_sequence";

// #4712: stamp Timestamp from THIS first result, which always returns exactly one row. Reading
// the timestamp off the second (mt_event_progression) result left Timestamp at
// default(DateTimeOffset) = 0001-01-01 whenever the store-global 'HighWaterMark' progression
// row was absent — and that default is exactly what produced the bogus SafeHarborTime
// (0001-01-01 + 3s threshold) that turned the gap-skip into a no-op and hung composite rebuilds.
// #4681: the literal 'HighWaterMark' name is produced by HighWaterShardIdentity so any future
// change to the grammar lands in one place rather than scattered SQL string literals.
_commandText = $@"
select last_value from {graph.DatabaseSchemaName}.mt_events_sequence;
select last_seq_id, last_updated, transaction_timestamp() as timestamp from {graph.DatabaseSchemaName}.mt_event_progression where name = '{HighWaterShardIdentity.StoreGlobal}';
{highestSequenceSql};
select last_seq_id, last_updated from {graph.DatabaseSchemaName}.mt_event_progression where name = '{HighWaterShardIdentity.StoreGlobal}';
".Trim();
}

Expand All @@ -35,6 +48,7 @@ public async Task<HighWaterStatistics> HandleAsync(DbDataReader reader, Cancella
if (await reader.ReadAsync(token).ConfigureAwait(false))
{
statistics.HighestSequence = await reader.GetFieldValueAsync<long>(0, token).ConfigureAwait(false);
statistics.Timestamp = await reader.GetFieldValueAsync<DateTimeOffset>(1, token).ConfigureAwait(false);
}

await reader.NextResultAsync(token).ConfigureAwait(false);
Expand All @@ -47,7 +61,6 @@ public async Task<HighWaterStatistics> HandleAsync(DbDataReader reader, Cancella
statistics.LastMark = statistics.SafeStartMark =
await reader.GetFieldValueAsync<long>(0, token).ConfigureAwait(false);
statistics.LastUpdated = await reader.GetFieldValueAsync<DateTimeOffset>(1, token).ConfigureAwait(false);
statistics.Timestamp = await reader.GetFieldValueAsync<DateTimeOffset>(2, token).ConfigureAwait(false);

return statistics;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Events;
using JasperFx.Events.Projections;
using Marten;
using Marten.Events;
using Marten.Events.Aggregation;
using Marten.Events.Daemon.HighWater;
using Marten.Storage;
using Marten.Testing.Harness;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Weasel.Core;
using Xunit;
using Xunit.Abstractions;

namespace TenantPartitionedEventsTests.Regressions;

/// <summary>
/// #4712 — composite projection rebuilds hang under per-tenant event partitioning because the
/// store-global high-water detector computes a bogus <c>SafeHarborTime</c> of <c>0001-01-01</c>
/// (≈ <c>DateTime.MinValue</c> + the 3s stale threshold).
///
/// <para>
/// Root cause (the #4705 bug class, one query that was missed): the store-global
/// <see cref="HighWaterStatisticsDetector"/> reads <c>select last_value from mt_events_sequence</c>
/// for <c>HighestSequence</c>. Under <c>UseTenantPartitionedEvents</c> the store-global
/// <c>mt_events_sequence</c> is never advanced (each tenant draws seq_ids from its own
/// <c>mt_events_sequence_{suffix}</c>), so <c>HighestSequence</c> reads 1 while the real high-water
/// mark is far higher. The store-global agent then treats the store as perpetually <c>Stale</c> and,
/// because no store-global <c>HighWaterMark</c> progression row was read, leaves
/// <see cref="HighWaterStatistics.Timestamp"/> at <c>default(DateTimeOffset)</c> = 0001-01-01 — the
/// source of the bogus SafeHarborTime. The fix mirrors #4705: read <c>coalesce(max(seq_id),0)</c>
/// from <c>mt_events</c> under per-tenant partitioning, and always stamp <c>Timestamp</c>.
/// </para>
///
/// <para>Single-DB, single tenant — per-tenant partitioning is the only load-bearing factor (the
/// sharded multi-composite hang in the report is the downstream symptom of this same wrong reading).</para>
/// </summary>
public partial class Bug_4712_safe_harbor_high_water
{
private readonly ITestOutputHelper _output;

public Bug_4712_safe_harbor_high_water(ITestOutputHelper output) => _output = output;

public class Bug4712Trip { public Guid Id { get; set; } public double Distance { get; set; } }

public record Bug4712Started(Guid Id);
public record Bug4712Leg(double Distance);

public partial class Bug4712TripProjection: SingleStreamProjection<Bug4712Trip, Guid>
{
public Bug4712TripProjection() => Name = "Bug4712Trip";
public void Apply(Bug4712Trip a, Bug4712Leg e) => a.Distance += e.Distance;
}

[Fact]
public async Task high_water_detect_reports_true_max_sequence_and_a_real_timestamp()
{
using var store = (DocumentStore)DocumentStore.For(o =>
{
o.Connection(ConnectionSource.ConnectionString);
o.DatabaseSchemaName = $"bug4712_p{Environment.ProcessId}";
o.Events.TenancyStyle = TenancyStyle.Conjoined;
o.Events.UseTenantPartitionedEvents = true;
o.Events.AppendMode = EventAppendMode.QuickWithServerTimestamps;
o.Policies.AllDocumentsAreMultiTenanted();
o.Schema.For<Bug4712Trip>().DocumentAlias("b4712_trip");
o.Projections.Add<Bug4712TripProjection>(ProjectionLifecycle.Async);
});

await store.Advanced.Clean.CompletelyRemoveAllAsync();
await store.Storage.Database.EnsureStorageExistsAsync(typeof(IEvent));
await store.Advanced.AddMartenManagedTenantsAsync(CancellationToken.None, "t4712");

long appended = 0;
await using (var session = store.LightweightSession("t4712"))
{
for (var s = 0; s < 10; s++)
{
var streamId = Guid.NewGuid();
session.Events.StartStream<Bug4712Trip>(streamId,
new Bug4712Started(streamId), new Bug4712Leg(1.0), new Bug4712Leg(2.0), new Bug4712Leg(3.0));
appended += 4;
}

await session.SaveChangesAsync();
}

var database = (MartenDatabase)store.Storage.Database;
var detector = new HighWaterDetector(database, store.Options.EventGraph, NullLogger.Instance);

var statistics = await detector.Detect(CancellationToken.None);

_output.WriteLine($"appended={appended}, HighestSequence={statistics.HighestSequence}, " +
$"CurrentMark={statistics.CurrentMark}, Timestamp={statistics.Timestamp:O}");

// The high-water detector must see the real sequence height. Without the fix this reads 1
// (the never-advanced store-global mt_events_sequence) and the store-global agent loops
// forever in the Stale branch.
statistics.HighestSequence.ShouldBe(appended);

// And it must carry a real timestamp — a default(DateTimeOffset) here is exactly what yields
// the 0001-01-01 SafeHarborTime that makes the gap-skip a no-op and hangs composite rebuilds.
statistics.Timestamp.ShouldNotBe(default);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
#nullable enable
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JasperFx;
using JasperFx.Core;
using JasperFx.Events;
using JasperFx.Events.Projections;
using Marten;
using Marten.Events;
using Marten.Events.Aggregation;
using Marten.Events.Projections;
using Marten.Testing.Harness;
using Npgsql;
using Shouldly;
using Weasel.Core;
using Xunit;
using Xunit.Abstractions;

namespace TenantPartitionedEventsTests.Regressions;

/// <summary>
/// #4717 — under per-tenant event partitioning the async daemon must persist PER-TENANT progression
/// records, not a single store-global <c>&lt;Projection&gt;:All</c> row. Each tenant's events come from
/// its own <c>mt_events_sequence_{suffix}</c> starting at 1, so with two+ tenants their seq_id ranges
/// overlap and a single <c>:All</c> shard cannot represent "how far each tenant has been projected".
///
/// <para>
/// This proves the requirement directly: two tenants with DIFFERENT event counts on one
/// per-tenant-partitioned database, running BOTH a composite projection and a standalone async
/// projection continuously, must produce a per-tenant progression row per (projection, tenant) whose
/// last_seq_id is that tenant's own height — plus a per-tenant high-water row per tenant.
/// </para>
/// </summary>
public partial class Bug_4717_per_tenant_progression
{
private readonly ITestOutputHelper _output;

public Bug_4717_per_tenant_progression(ITestOutputHelper output) => _output = output;

public class Bug4717Trip { public Guid Id { get; set; } public double Distance { get; set; } }
public class Bug4717Count { public Guid Id { get; set; } public int Count { get; set; } }
public class Bug4717Standalone { public Guid Id { get; set; } public double Total { get; set; } }

public record Bug4717Started(Guid Id);
public record Bug4717Leg(double Distance);

public partial class Bug4717TripProjection: SingleStreamProjection<Bug4717Trip, Guid>
{
public Bug4717TripProjection() => Name = "Bug4717Trip";
public void Apply(Bug4717Trip a, Bug4717Leg e) => a.Distance += e.Distance;
}

public partial class Bug4717CountProjection: SingleStreamProjection<Bug4717Count, Guid>
{
public Bug4717CountProjection() => Name = "Bug4717Count";
public void Apply(Bug4717Count a, Bug4717Leg e) => a.Count++;
}

public partial class Bug4717StandaloneProjection: SingleStreamProjection<Bug4717Standalone, Guid>
{
public Bug4717StandaloneProjection() => Name = "Bug4717Standalone";
public void Apply(Bug4717Standalone a, Bug4717Leg e) => a.Total += e.Distance;
}

private static readonly string Schema = $"bug4717_p{Environment.ProcessId}";

[Fact]
public async Task daemon_persists_per_tenant_progression_for_standalone_and_composite()
{
using var store = (DocumentStore)DocumentStore.For(o =>
{
o.Connection(ConnectionSource.ConnectionString);
o.DatabaseSchemaName = Schema;
o.Events.TenancyStyle = TenancyStyle.Conjoined;
o.Events.UseTenantPartitionedEvents = true;
o.Events.AppendMode = EventAppendMode.QuickWithServerTimestamps;
o.Policies.AllDocumentsAreMultiTenanted();

o.Schema.For<Bug4717Trip>().DocumentAlias("b4717_trip");
o.Schema.For<Bug4717Count>().DocumentAlias("b4717_cnt");
o.Schema.For<Bug4717Standalone>().DocumentAlias("b4717_std");

// A standalone async projection AND a composite bundle, both continuous.
o.Projections.Add<Bug4717StandaloneProjection>(ProjectionLifecycle.Async);
o.Projections.CompositeProjectionFor("bug4717-composite", c =>
{
c.Add<Bug4717TripProjection>(stageNumber: 1);
c.Add<Bug4717CountProjection>(stageNumber: 2);
});
});

await store.Advanced.Clean.CompletelyRemoveAllAsync();
await store.Storage.Database.EnsureStorageExistsAsync(typeof(IEvent));

// Two tenants with DIFFERENT event heights — independent per-tenant sequences.
var streamsPerTenant = new Dictionary<string, int> { ["t4717_a"] = 5, ["t4717_b"] = 3 };
var expectedSeq = streamsPerTenant.ToDictionary(p => p.Key, p => (long)(p.Value * 4)); // 4 events/stream
await store.Advanced.AddMartenManagedTenantsAsync(CancellationToken.None, streamsPerTenant.Keys.ToArray());

foreach (var (tenant, streams) in streamsPerTenant)
{
await using var session = store.LightweightSession(tenant);
for (var s = 0; s < streams; s++)
{
var id = Guid.NewGuid();
session.Events.StartStream<Bug4717Trip>(id,
new Bug4717Started(id), new Bug4717Leg(1.0), new Bug4717Leg(2.0), new Bug4717Leg(3.0));
}

await session.SaveChangesAsync();
}

using var daemon = await store.BuildProjectionDaemonAsync();
await daemon.StartAllAsync();

// Poll until every (projection, tenant) has a per-tenant progression row at the expected height.
var projections = new[] { "Bug4717Standalone", "Bug4717Trip", "Bug4717Count", "bug4717-composite" };
var sw = Stopwatch.StartNew();
List<(string Name, long Seq)> rows = new();
while (sw.Elapsed < 30.Seconds())
{
rows = await progressionRowsAsync();
var allReady = projections.All(p => streamsPerTenant.Keys.All(t =>
rows.Any(r => isPerTenantRow(r.Name, p, t) && r.Seq >= expectedSeq[t])));
if (allReady) break;
await Task.Delay(500);
}

await daemon.StopAllAsync();

_output.WriteLine("=== mt_event_progression ===");
foreach (var (name, seq) in rows.OrderBy(r => r.Name)) _output.WriteLine($"{seq,6} | {name}");

// Per-tenant PROJECTION progress: each (projection, tenant) tracked independently at the
// tenant's own height. Today only store-global "<Projection>:All" rows are written (#4717).
foreach (var projection in new[] { "Bug4717Standalone", "Bug4717Trip", "Bug4717Count" })
{
foreach (var (tenant, seq) in expectedSeq)
{
var row = rows.FirstOrDefault(r => isPerTenantRow(r.Name, projection, tenant));
row.Name.ShouldNotBeNull(
$"expected a per-tenant progression row for {projection} / tenant {tenant} " +
$"(rows: {string.Join("; ", rows.Select(r => $"{r.Name}={r.Seq}"))})");
row.Seq.ShouldBe(seq,
$"{projection} for tenant {tenant} should track its own sequence height {seq}");
}
}

// Per-tenant HIGH-WATER: one row per tenant, at the tenant's own height.
foreach (var (tenant, seq) in expectedSeq)
{
var hw = rows.FirstOrDefault(r => r.Name == $"HighWaterMark:{tenant}");
hw.Name.ShouldNotBeNull($"expected a per-tenant HighWaterMark row for tenant {tenant}");
hw.Seq.ShouldBe(seq, $"HighWaterMark for tenant {tenant} should be its own height {seq}");
}
}

// A per-tenant row carries the tenant in the trailing slot of the ShardName grammar
// ("<Projection>:<shardKey>:<tenant>"), e.g. "Bug4717Standalone:All:t4717_a".
private static bool isPerTenantRow(string name, string projection, string tenant) =>
name.StartsWith(projection + ":", StringComparison.Ordinal) &&
name.EndsWith(":" + tenant, StringComparison.Ordinal);

private static async Task<List<(string Name, long Seq)>> progressionRowsAsync()
{
await using var conn = new NpgsqlConnection(ConnectionSource.ConnectionString);
await conn.OpenAsync();
await using var cmd = conn.CreateCommand(
$"select name, coalesce(last_seq_id,0) from {Schema}.mt_event_progression order by name");
await using var reader = await cmd.ExecuteReaderAsync();
var rows = new List<(string, long)>();
while (await reader.ReadAsync()) rows.Add((reader.GetString(0), reader.GetInt64(1)));
return rows;
}
}
Loading
Loading