Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 85 additions & 3 deletions src/EventTests/ShardNameTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,95 @@ public void clone_for_database()
var name = new ShardName("Foo", "Other", 2);
var database = new Uri("postgresql://server1/db1/schema1");
var clone = name.CloneForDatabase(database);

clone.ShouldNotBeSameAs(name);
clone.Database.ShouldBe(database);
clone.Name.ShouldBe("Foo");
clone.ShardKey.ShouldBe("Other");
clone.Version.ShouldBe((uint)2);
}



[Fact]
public void null_tenant_keeps_identity_and_url_unchanged()
{
// The whole grammar change is additive: a null tenant must look exactly like today.
var name = ShardName.Compose("Foo", "All");
name.TenantId.ShouldBeNull();
name.Identity.ShouldBe("Foo:All");
name.RelativeUrl.ShouldBe("foo/all");

var versioned = ShardName.Compose("Foo", "All", version: 2);
versioned.TenantId.ShouldBeNull();
versioned.Identity.ShouldBe("Foo:V2:All");
versioned.RelativeUrl.ShouldBe("foo/all/v2");
}

[Fact]
public void compose_with_tenant_appends_distinct_trailing_slot()
{
var name = ShardName.Compose("Foo", "All", "tenant1");
name.TenantId.ShouldBe("tenant1");
name.ShardKey.ShouldBe("All"); // tenant is NOT folded into the shard key
name.Identity.ShouldBe("Foo:All:tenant1");
name.RelativeUrl.ShouldBe("foo/all/tenant1");

var versioned = ShardName.Compose("Foo", "All", "tenant1", 2);
versioned.Identity.ShouldBe("Foo:V2:All:tenant1");
versioned.RelativeUrl.ShouldBe("foo/all/v2/tenant1");
}

[Fact]
public void compose_treats_empty_tenant_as_store_global()
{
ShardName.Compose("Foo", "All", "").TenantId.ShouldBeNull();
ShardName.Compose("Foo", "All", "").Identity.ShouldBe("Foo:All");
}

[Theory]
[InlineData("Foo", "All", null, 1u)] // 2-segment
[InlineData("Foo", "Other", null, 1u)] // 2-segment, custom key
[InlineData("Foo", "All", "tenant1", 1u)] // 3-segment tenant form
[InlineData("Foo", "All", null, 2u)] // 3-segment version form
[InlineData("Foo", "All", "tenant1", 3u)] // 4-segment version + tenant
[InlineData("Foo", "Other", "acme", 2u)]
public void try_parse_round_trips_compose(string name, string key, string? tenant, uint version)
{
var composed = ShardName.Compose(name, key, tenant, version);

ShardName.TryParse(composed.Identity, out var parsed).ShouldBeTrue();
parsed.ShouldNotBeNull();
parsed!.Name.ShouldBe(name);
parsed.ShardKey.ShouldBe(key);
parsed.TenantId.ShouldBe(tenant);
parsed.Version.ShouldBe(version);
parsed.Identity.ShouldBe(composed.Identity);
parsed.ShouldBe(composed); // equality is identity-based
}

[Fact]
public void try_parse_round_trips_high_water_mark()
{
ShardName.TryParse(ShardState.HighWaterMark, out var parsed).ShouldBeTrue();
parsed!.Identity.ShouldBe(ShardState.HighWaterMark);
}

[Theory]
[InlineData(null)]
[InlineData("")]
public void try_parse_rejects_empty(string? text)
{
ShardName.TryParse(text, out var parsed).ShouldBeFalse();
parsed.ShouldBeNull();
}

[Fact]
public void for_tenant_rebinds_to_tenant_partition()
{
var global = ShardName.Compose("Foo", "All");
var scoped = global.ForTenant("tenant1");

scoped.TenantId.ShouldBe("tenant1");
scoped.Identity.ShouldBe("Foo:All:tenant1");
scoped.ForTenant(null).TenantId.ShouldBeNull();
}
}
66 changes: 66 additions & 0 deletions src/EventTests/TenantIdSurfaceSerializationTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using System.Text.Json;
using JasperFx.Events.Daemon.HighWater;
using JasperFx.Events.Projections;
using Shouldly;

namespace EventTests;

// jasperfx#407 Phase 0: the new nullable TenantId slot on ShardState / HighWaterStatistics must be
// purely additive -- default to null and never break existing serialized payloads that predate it.
public class TenantIdSurfaceSerializationTests
{
[Fact]
public void shard_state_defaults_tenant_to_null()
{
new ShardState("Foo:All", 5).TenantId.ShouldBeNull();
}

[Fact]
public void shard_state_copies_tenant_from_shard_name()
{
var state = new ShardState(ShardName.Compose("Foo", "All", "tenant1"), 5);
state.TenantId.ShouldBe("tenant1");
}

[Fact]
public void shard_state_serializes_null_tenant_without_throwing()
{
// ShardState is published in-memory rather than JSON round-tripped, but writing it must
// still emit the additive slot as null so any consumer that does serialize it sees null.
var json = JsonSerializer.Serialize(new ShardState("Foo:All", 5));
using var doc = JsonDocument.Parse(json);
doc.RootElement.GetProperty("TenantId").ValueKind.ShouldBe(JsonValueKind.Null);
}

[Fact]
public void shard_state_serializes_set_tenant()
{
var json = JsonSerializer.Serialize(new ShardState("Foo:All", 5) { TenantId = "tenant1" });
using var doc = JsonDocument.Parse(json);
doc.RootElement.GetProperty("TenantId").GetString().ShouldBe("tenant1");
}

[Fact]
public void high_water_statistics_defaults_tenant_to_null()
{
new HighWaterStatistics().TenantId.ShouldBeNull();
}

[Fact]
public void high_water_statistics_round_trips_tenant()
{
var stats = new HighWaterStatistics { CurrentMark = 10, TenantId = "tenant1" };
var restored = JsonSerializer.Deserialize<HighWaterStatistics>(JsonSerializer.Serialize(stats));
restored!.TenantId.ShouldBe("tenant1");
restored.CurrentMark.ShouldBe(10);
}

[Fact]
public void existing_high_water_payload_without_tenant_still_deserializes()
{
const string legacy = """{"LastMark":3,"CurrentMark":7,"HighestSequence":7}""";
var restored = JsonSerializer.Deserialize<HighWaterStatistics>(legacy);
restored!.TenantId.ShouldBeNull();
restored.CurrentMark.ShouldBe(7);
}
}
7 changes: 7 additions & 0 deletions src/JasperFx.Events/Daemon/HighWater/HighWaterStatistics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ public class HighWaterStatistics
public bool IncludesSkipping { get; set; }
public DateTimeOffset Timestamp { get; set; } = default;

/// <summary>
/// Tenant partition this high water reading belongs to. Null means store-global, which is
/// the only behavior that existed before per-tenant partitioning. A vectorized high-water
/// agent emits one reading per assigned tenant; existing single-reading consumers leave this null.
/// </summary>
public string? TenantId { get; set; }

public HighWaterStatus InterpretStatus(HighWaterStatistics previous)
{
// Postgres sequences start w/ 1 by default. So the initial state is "HighestSequence = 1".
Expand Down
42 changes: 42 additions & 0 deletions src/JasperFx.Events/Daemon/IProjectionDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ public interface IProjectionDaemon: IDisposable
/// <returns></returns>
Task RebuildProjectionAsync(string projectionName, CancellationToken token);

/// <summary>
/// Rebuilds a single projection by projection name for a single tenant partition inline.
/// A null <paramref name="tenantId" /> is store-global and delegates to the tenant-less overload
/// (today's behavior). Daemons that implement per-tenant partitioning override this; the default
/// throws for a non-null tenant. See jasperfx#407.
/// </summary>
Task RebuildProjectionAsync(string projectionName, string? tenantId, CancellationToken token)
=> tenantId == null
? RebuildProjectionAsync(projectionName, token)
: throw new NotSupportedException(
"Per-tenant RebuildProjectionAsync is not implemented on this IProjectionDaemon. Use an event store that implements per-tenant partitioning.");


/// <summary>
/// Rebuilds a single projection by projection type inline.
Expand Down Expand Up @@ -72,6 +84,18 @@ public interface IProjectionDaemon: IDisposable
/// <returns></returns>
Task RebuildProjectionAsync(string projectionName, TimeSpan shardTimeout, CancellationToken token);

/// <summary>
/// Rebuilds a single projection by projection name for a single tenant partition inline.
/// A null <paramref name="tenantId" /> is store-global and delegates to the tenant-less overload
/// (today's behavior). Daemons that implement per-tenant partitioning override this; the default
/// throws for a non-null tenant. See jasperfx#407.
/// </summary>
Task RebuildProjectionAsync(string projectionName, string? tenantId, TimeSpan shardTimeout, CancellationToken token)
=> tenantId == null
? RebuildProjectionAsync(projectionName, shardTimeout, token)
: throw new NotSupportedException(
"Per-tenant RebuildProjectionAsync is not implemented on this IProjectionDaemon. Use an event store that implements per-tenant partitioning.");


/// <summary>
/// Rebuilds a single projection by projection type inline
Expand Down Expand Up @@ -202,4 +226,22 @@ public interface IProjectionDaemon: IDisposable
/// <returns></returns>
Task RewindSubscriptionAsync(string subscriptionName, CancellationToken token, long? sequenceFloor = 0,
DateTimeOffset? timestamp = null);

/// <summary>
/// Rewinds a subscription (or projection) for a single tenant partition to a certain point and
/// allows it to restart at that point. A null <paramref name="tenantId" /> is store-global and
/// delegates to the tenant-less overload (today's behavior). Daemons that implement per-tenant
/// partitioning override this; the default throws for a non-null tenant. See jasperfx#407.
/// </summary>
/// <param name="subscriptionName">Name of the subscription</param>
/// <param name="tenantId">Tenant partition to scope the rewind to. Null means store-global.</param>
/// <param name="token"></param>
/// <param name="sequenceFloor">The point at which to rewind the subscription. The default is zero</param>
/// <param name="timestamp">Optional parameter to rewind the subscription to rerun any events that were posted on or after this time. If the system cannot determine the sequence, it will do nothing</param>
Task RewindSubscriptionAsync(string subscriptionName, string? tenantId, CancellationToken token,
long? sequenceFloor = 0, DateTimeOffset? timestamp = null)
=> tenantId == null
? RewindSubscriptionAsync(subscriptionName, token, sequenceFloor, timestamp)
: throw new NotSupportedException(
"Per-tenant RewindSubscriptionAsync is not implemented on this IProjectionDaemon. Use an event store that implements per-tenant partitioning.");
}
30 changes: 28 additions & 2 deletions src/JasperFx.Events/IEventDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,19 @@ Task<long> ProjectionProgressFor(ShardName name,
/// <param name="token"></param>
/// <returns></returns>
Task<long?> FindEventStoreFloorAtTimeAsync(DateTimeOffset timestamp, CancellationToken token);


/// <summary>
/// Find the position of the event store sequence just below the supplied timestamp for a single
/// tenant partition. A null <paramref name="tenantId" /> is store-global and delegates to the
/// tenant-less overload (today's behavior). Event stores that implement per-tenant partitioning
/// override this; the default throws for a non-null tenant. See jasperfx#407.
/// </summary>
Task<long?> FindEventStoreFloorAtTimeAsync(DateTimeOffset timestamp, string? tenantId, CancellationToken token)
=> tenantId == null
? FindEventStoreFloorAtTimeAsync(timestamp, token)
: throw new NotSupportedException(
"Per-tenant FindEventStoreFloorAtTimeAsync is not implemented on this IEventDatabase. Use an event store that implements per-tenant partitioning.");

string StorageIdentifier { get; }
Task<long> FetchHighestEventSequenceNumber(CancellationToken token);

Expand All @@ -68,6 +80,20 @@ Task<long> ProjectionProgressFor(ShardName name,
Task<IReadOnlyList<ShardState>> AllProjectionProgress(
CancellationToken token = default);

/// <summary>
/// Check the current progress of all asynchronous projections for a single tenant partition.
/// A null <paramref name="tenantId" /> is store-global and delegates to the tenant-less overload
/// (today's behavior). Event stores that implement per-tenant partitioning override this; the
/// default throws for a non-null tenant. See jasperfx#407.
/// </summary>
/// <param name="tenantId">Tenant partition to scope progress to. Null means store-global.</param>
/// <param name="token"></param>
Task<IReadOnlyList<ShardState>> AllProjectionProgress(string? tenantId, CancellationToken token = default)
=> tenantId == null
? AllProjectionProgress(token)
: throw new NotSupportedException(
"Per-tenant AllProjectionProgress is not implemented on this IEventDatabase. Use an event store that implements per-tenant partitioning.");

/// <summary>
/// Count the stored dead letter events for a single projection/subscription shard. With
/// <c>SkipApplyErrors</c> on (the JasperFx.Events 2.0 default), a failed <c>Apply()</c> is
Expand All @@ -84,7 +110,7 @@ Task<long> CountDeadLetterEventsAsync(ShardName shard, CancellationToken token =
/// <summary>
/// Fetch the stored dead letter event counts for this database, one row per shard
/// (<see cref="DeadLetterShardCount.ProjectionName" /> + <see cref="DeadLetterShardCount.ShardKey" />).
/// Mirrors the "give me every row" shape of <see cref="AllProjectionProgress" />.
/// Mirrors the "give me every row" shape of <see cref="AllProjectionProgress(CancellationToken)" />.
/// The default implementation returns an empty list as a stand-in; event stores that
/// persist dead letters should override this. See jasperfx#356.
/// </summary>
Expand Down
27 changes: 27 additions & 0 deletions src/JasperFx.Events/IEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,20 @@ Task<IReadOnlyList<ProjectionStatus>> GetProjectionStatusesAsync(CancellationTok
=> throw new NotImplementedException(
"GetProjectionStatusesAsync is not implemented on this IEventStore. Use Marten or Polecat 6+ for the projections page.");

/// <summary>
/// Return a snapshot of every projection's status scoped to a single tenant partition.
/// A null <paramref name="tenantId"/> is store-global and delegates to the tenant-less
/// overload (today's behavior). Event stores that implement per-tenant partitioning
/// override this; the default throws for a non-null tenant. See jasperfx#407.
/// </summary>
/// <param name="tenantId">Tenant partition to scope statuses to. Null means store-global.</param>
/// <param name="ct">Cancellation token.</param>
Task<IReadOnlyList<ProjectionStatus>> GetProjectionStatusesAsync(string? tenantId, CancellationToken ct)
=> tenantId == null
? GetProjectionStatusesAsync(ct)
: throw new NotSupportedException(
"Per-tenant GetProjectionStatusesAsync is not implemented on this IEventStore. Use an event store that implements per-tenant partitioning.");

/// <summary>
/// Replay a projection over a fixed in-memory event list, returning
/// the per-step before/after state. Stateless — nothing is persisted
Expand Down Expand Up @@ -261,6 +275,19 @@ Task TeardownExistingProjectionStateAsync(IEventDatabase database, string subscr
Task DeleteProjectionProgressAsync(IEventDatabase database, string subscriptionName,
CancellationToken token);

/// <summary>
/// Delete *only* any persisted projection progress data for a single tenant partition.
/// A null <paramref name="tenantId"/> is store-global and delegates to the tenant-less
/// overload (today's behavior). Event stores that implement per-tenant partitioning
/// override this; the default throws for a non-null tenant. See jasperfx#407.
/// </summary>
Task DeleteProjectionProgressAsync(IEventDatabase database, string subscriptionName, string? tenantId,
CancellationToken token)
=> tenantId == null
? DeleteProjectionProgressAsync(database, subscriptionName, token)
: throw new NotSupportedException(
"Per-tenant DeleteProjectionProgressAsync is not implemented on this IEventStore. Use an event store that implements per-tenant partitioning.");

ValueTask<IProjectionBatch<TOperations, TQuerySession>> StartProjectionBatchAsync(EventRange range,
IEventDatabase database, ShardExecutionMode mode, AsyncOptions projectionOptions, CancellationToken token);

Expand Down
Loading
Loading