diff --git a/src/EventTests/ShardNameTests.cs b/src/EventTests/ShardNameTests.cs index 55486109..77a2af7b 100644 --- a/src/EventTests/ShardNameTests.cs +++ b/src/EventTests/ShardNameTests.cs @@ -56,13 +56,95 @@ public void clone_for_database() var name = new ShardName("Foo", "Other", 2); var database = new Uri("postgresql://server1/db1/schema1"); var clone = name.CloneForDatabase(database); - + clone.ShouldNotBeSameAs(name); clone.Database.ShouldBe(database); clone.Name.ShouldBe("Foo"); clone.ShardKey.ShouldBe("Other"); clone.Version.ShouldBe((uint)2); } - - + + [Fact] + public void null_tenant_keeps_identity_and_url_unchanged() + { + // The whole grammar change is additive: a null tenant must look exactly like today. + var name = ShardName.Compose("Foo", "All"); + name.TenantId.ShouldBeNull(); + name.Identity.ShouldBe("Foo:All"); + name.RelativeUrl.ShouldBe("foo/all"); + + var versioned = ShardName.Compose("Foo", "All", version: 2); + versioned.TenantId.ShouldBeNull(); + versioned.Identity.ShouldBe("Foo:V2:All"); + versioned.RelativeUrl.ShouldBe("foo/all/v2"); + } + + [Fact] + public void compose_with_tenant_appends_distinct_trailing_slot() + { + var name = ShardName.Compose("Foo", "All", "tenant1"); + name.TenantId.ShouldBe("tenant1"); + name.ShardKey.ShouldBe("All"); // tenant is NOT folded into the shard key + name.Identity.ShouldBe("Foo:All:tenant1"); + name.RelativeUrl.ShouldBe("foo/all/tenant1"); + + var versioned = ShardName.Compose("Foo", "All", "tenant1", 2); + versioned.Identity.ShouldBe("Foo:V2:All:tenant1"); + versioned.RelativeUrl.ShouldBe("foo/all/v2/tenant1"); + } + + [Fact] + public void compose_treats_empty_tenant_as_store_global() + { + ShardName.Compose("Foo", "All", "").TenantId.ShouldBeNull(); + ShardName.Compose("Foo", "All", "").Identity.ShouldBe("Foo:All"); + } + + [Theory] + [InlineData("Foo", "All", null, 1u)] // 2-segment + [InlineData("Foo", "Other", null, 1u)] // 2-segment, custom key + [InlineData("Foo", "All", "tenant1", 1u)] // 3-segment tenant form + [InlineData("Foo", "All", null, 2u)] // 3-segment version form + [InlineData("Foo", "All", "tenant1", 3u)] // 4-segment version + tenant + [InlineData("Foo", "Other", "acme", 2u)] + public void try_parse_round_trips_compose(string name, string key, string? tenant, uint version) + { + var composed = ShardName.Compose(name, key, tenant, version); + + ShardName.TryParse(composed.Identity, out var parsed).ShouldBeTrue(); + parsed.ShouldNotBeNull(); + parsed!.Name.ShouldBe(name); + parsed.ShardKey.ShouldBe(key); + parsed.TenantId.ShouldBe(tenant); + parsed.Version.ShouldBe(version); + parsed.Identity.ShouldBe(composed.Identity); + parsed.ShouldBe(composed); // equality is identity-based + } + + [Fact] + public void try_parse_round_trips_high_water_mark() + { + ShardName.TryParse(ShardState.HighWaterMark, out var parsed).ShouldBeTrue(); + parsed!.Identity.ShouldBe(ShardState.HighWaterMark); + } + + [Theory] + [InlineData(null)] + [InlineData("")] + public void try_parse_rejects_empty(string? text) + { + ShardName.TryParse(text, out var parsed).ShouldBeFalse(); + parsed.ShouldBeNull(); + } + + [Fact] + public void for_tenant_rebinds_to_tenant_partition() + { + var global = ShardName.Compose("Foo", "All"); + var scoped = global.ForTenant("tenant1"); + + scoped.TenantId.ShouldBe("tenant1"); + scoped.Identity.ShouldBe("Foo:All:tenant1"); + scoped.ForTenant(null).TenantId.ShouldBeNull(); + } } \ No newline at end of file diff --git a/src/EventTests/TenantIdSurfaceSerializationTests.cs b/src/EventTests/TenantIdSurfaceSerializationTests.cs new file mode 100644 index 00000000..84cfaeb0 --- /dev/null +++ b/src/EventTests/TenantIdSurfaceSerializationTests.cs @@ -0,0 +1,66 @@ +using System.Text.Json; +using JasperFx.Events.Daemon.HighWater; +using JasperFx.Events.Projections; +using Shouldly; + +namespace EventTests; + +// jasperfx#407 Phase 0: the new nullable TenantId slot on ShardState / HighWaterStatistics must be +// purely additive -- default to null and never break existing serialized payloads that predate it. +public class TenantIdSurfaceSerializationTests +{ + [Fact] + public void shard_state_defaults_tenant_to_null() + { + new ShardState("Foo:All", 5).TenantId.ShouldBeNull(); + } + + [Fact] + public void shard_state_copies_tenant_from_shard_name() + { + var state = new ShardState(ShardName.Compose("Foo", "All", "tenant1"), 5); + state.TenantId.ShouldBe("tenant1"); + } + + [Fact] + public void shard_state_serializes_null_tenant_without_throwing() + { + // ShardState is published in-memory rather than JSON round-tripped, but writing it must + // still emit the additive slot as null so any consumer that does serialize it sees null. + var json = JsonSerializer.Serialize(new ShardState("Foo:All", 5)); + using var doc = JsonDocument.Parse(json); + doc.RootElement.GetProperty("TenantId").ValueKind.ShouldBe(JsonValueKind.Null); + } + + [Fact] + public void shard_state_serializes_set_tenant() + { + var json = JsonSerializer.Serialize(new ShardState("Foo:All", 5) { TenantId = "tenant1" }); + using var doc = JsonDocument.Parse(json); + doc.RootElement.GetProperty("TenantId").GetString().ShouldBe("tenant1"); + } + + [Fact] + public void high_water_statistics_defaults_tenant_to_null() + { + new HighWaterStatistics().TenantId.ShouldBeNull(); + } + + [Fact] + public void high_water_statistics_round_trips_tenant() + { + var stats = new HighWaterStatistics { CurrentMark = 10, TenantId = "tenant1" }; + var restored = JsonSerializer.Deserialize(JsonSerializer.Serialize(stats)); + restored!.TenantId.ShouldBe("tenant1"); + restored.CurrentMark.ShouldBe(10); + } + + [Fact] + public void existing_high_water_payload_without_tenant_still_deserializes() + { + const string legacy = """{"LastMark":3,"CurrentMark":7,"HighestSequence":7}"""; + var restored = JsonSerializer.Deserialize(legacy); + restored!.TenantId.ShouldBeNull(); + restored.CurrentMark.ShouldBe(7); + } +} diff --git a/src/JasperFx.Events/Daemon/HighWater/HighWaterStatistics.cs b/src/JasperFx.Events/Daemon/HighWater/HighWaterStatistics.cs index 536f54ac..1d9769d2 100644 --- a/src/JasperFx.Events/Daemon/HighWater/HighWaterStatistics.cs +++ b/src/JasperFx.Events/Daemon/HighWater/HighWaterStatistics.cs @@ -16,6 +16,13 @@ public class HighWaterStatistics public bool IncludesSkipping { get; set; } public DateTimeOffset Timestamp { get; set; } = default; + /// + /// Tenant partition this high water reading belongs to. Null means store-global, which is + /// the only behavior that existed before per-tenant partitioning. A vectorized high-water + /// agent emits one reading per assigned tenant; existing single-reading consumers leave this null. + /// + public string? TenantId { get; set; } + public HighWaterStatus InterpretStatus(HighWaterStatistics previous) { // Postgres sequences start w/ 1 by default. So the initial state is "HighestSequence = 1". diff --git a/src/JasperFx.Events/Daemon/IProjectionDaemon.cs b/src/JasperFx.Events/Daemon/IProjectionDaemon.cs index f39c796e..10220869 100644 --- a/src/JasperFx.Events/Daemon/IProjectionDaemon.cs +++ b/src/JasperFx.Events/Daemon/IProjectionDaemon.cs @@ -35,6 +35,18 @@ public interface IProjectionDaemon: IDisposable /// Task RebuildProjectionAsync(string projectionName, CancellationToken token); + /// + /// Rebuilds a single projection by projection name for a single tenant partition inline. + /// A null is store-global and delegates to the tenant-less overload + /// (today's behavior). Daemons that implement per-tenant partitioning override this; the default + /// throws for a non-null tenant. See jasperfx#407. + /// + Task RebuildProjectionAsync(string projectionName, string? tenantId, CancellationToken token) + => tenantId == null + ? RebuildProjectionAsync(projectionName, token) + : throw new NotSupportedException( + "Per-tenant RebuildProjectionAsync is not implemented on this IProjectionDaemon. Use an event store that implements per-tenant partitioning."); + /// /// Rebuilds a single projection by projection type inline. @@ -72,6 +84,18 @@ public interface IProjectionDaemon: IDisposable /// Task RebuildProjectionAsync(string projectionName, TimeSpan shardTimeout, CancellationToken token); + /// + /// Rebuilds a single projection by projection name for a single tenant partition inline. + /// A null is store-global and delegates to the tenant-less overload + /// (today's behavior). Daemons that implement per-tenant partitioning override this; the default + /// throws for a non-null tenant. See jasperfx#407. + /// + Task RebuildProjectionAsync(string projectionName, string? tenantId, TimeSpan shardTimeout, CancellationToken token) + => tenantId == null + ? RebuildProjectionAsync(projectionName, shardTimeout, token) + : throw new NotSupportedException( + "Per-tenant RebuildProjectionAsync is not implemented on this IProjectionDaemon. Use an event store that implements per-tenant partitioning."); + /// /// Rebuilds a single projection by projection type inline @@ -202,4 +226,22 @@ public interface IProjectionDaemon: IDisposable /// Task RewindSubscriptionAsync(string subscriptionName, CancellationToken token, long? sequenceFloor = 0, DateTimeOffset? timestamp = null); + + /// + /// Rewinds a subscription (or projection) for a single tenant partition to a certain point and + /// allows it to restart at that point. A null is store-global and + /// delegates to the tenant-less overload (today's behavior). Daemons that implement per-tenant + /// partitioning override this; the default throws for a non-null tenant. See jasperfx#407. + /// + /// Name of the subscription + /// Tenant partition to scope the rewind to. Null means store-global. + /// + /// The point at which to rewind the subscription. The default is zero + /// Optional parameter to rewind the subscription to rerun any events that were posted on or after this time. If the system cannot determine the sequence, it will do nothing + Task RewindSubscriptionAsync(string subscriptionName, string? tenantId, CancellationToken token, + long? sequenceFloor = 0, DateTimeOffset? timestamp = null) + => tenantId == null + ? RewindSubscriptionAsync(subscriptionName, token, sequenceFloor, timestamp) + : throw new NotSupportedException( + "Per-tenant RewindSubscriptionAsync is not implemented on this IProjectionDaemon. Use an event store that implements per-tenant partitioning."); } diff --git a/src/JasperFx.Events/IEventDatabase.cs b/src/JasperFx.Events/IEventDatabase.cs index 2ebb1029..e79d82ed 100644 --- a/src/JasperFx.Events/IEventDatabase.cs +++ b/src/JasperFx.Events/IEventDatabase.cs @@ -52,7 +52,19 @@ Task ProjectionProgressFor(ShardName name, /// /// Task FindEventStoreFloorAtTimeAsync(DateTimeOffset timestamp, CancellationToken token); - + + /// + /// Find the position of the event store sequence just below the supplied timestamp for a single + /// tenant partition. A null is store-global and delegates to the + /// tenant-less overload (today's behavior). Event stores that implement per-tenant partitioning + /// override this; the default throws for a non-null tenant. See jasperfx#407. + /// + Task FindEventStoreFloorAtTimeAsync(DateTimeOffset timestamp, string? tenantId, CancellationToken token) + => tenantId == null + ? FindEventStoreFloorAtTimeAsync(timestamp, token) + : throw new NotSupportedException( + "Per-tenant FindEventStoreFloorAtTimeAsync is not implemented on this IEventDatabase. Use an event store that implements per-tenant partitioning."); + string StorageIdentifier { get; } Task FetchHighestEventSequenceNumber(CancellationToken token); @@ -68,6 +80,20 @@ Task ProjectionProgressFor(ShardName name, Task> AllProjectionProgress( CancellationToken token = default); + /// + /// Check the current progress of all asynchronous projections for a single tenant partition. + /// A null is store-global and delegates to the tenant-less overload + /// (today's behavior). Event stores that implement per-tenant partitioning override this; the + /// default throws for a non-null tenant. See jasperfx#407. + /// + /// Tenant partition to scope progress to. Null means store-global. + /// + Task> AllProjectionProgress(string? tenantId, CancellationToken token = default) + => tenantId == null + ? AllProjectionProgress(token) + : throw new NotSupportedException( + "Per-tenant AllProjectionProgress is not implemented on this IEventDatabase. Use an event store that implements per-tenant partitioning."); + /// /// Count the stored dead letter events for a single projection/subscription shard. With /// SkipApplyErrors on (the JasperFx.Events 2.0 default), a failed Apply() is @@ -84,7 +110,7 @@ Task CountDeadLetterEventsAsync(ShardName shard, CancellationToken token = /// /// Fetch the stored dead letter event counts for this database, one row per shard /// ( + ). - /// Mirrors the "give me every row" shape of . + /// Mirrors the "give me every row" shape of . /// The default implementation returns an empty list as a stand-in; event stores that /// persist dead letters should override this. See jasperfx#356. /// diff --git a/src/JasperFx.Events/IEventStore.cs b/src/JasperFx.Events/IEventStore.cs index 3d3f8fff..a077c64e 100644 --- a/src/JasperFx.Events/IEventStore.cs +++ b/src/JasperFx.Events/IEventStore.cs @@ -174,6 +174,20 @@ Task> GetProjectionStatusesAsync(CancellationTok => throw new NotImplementedException( "GetProjectionStatusesAsync is not implemented on this IEventStore. Use Marten or Polecat 6+ for the projections page."); + /// + /// Return a snapshot of every projection's status scoped to a single tenant partition. + /// A null is store-global and delegates to the tenant-less + /// overload (today's behavior). Event stores that implement per-tenant partitioning + /// override this; the default throws for a non-null tenant. See jasperfx#407. + /// + /// Tenant partition to scope statuses to. Null means store-global. + /// Cancellation token. + Task> GetProjectionStatusesAsync(string? tenantId, CancellationToken ct) + => tenantId == null + ? GetProjectionStatusesAsync(ct) + : throw new NotSupportedException( + "Per-tenant GetProjectionStatusesAsync is not implemented on this IEventStore. Use an event store that implements per-tenant partitioning."); + /// /// Replay a projection over a fixed in-memory event list, returning /// the per-step before/after state. Stateless — nothing is persisted @@ -261,6 +275,19 @@ Task TeardownExistingProjectionStateAsync(IEventDatabase database, string subscr Task DeleteProjectionProgressAsync(IEventDatabase database, string subscriptionName, CancellationToken token); + /// + /// Delete *only* any persisted projection progress data for a single tenant partition. + /// A null is store-global and delegates to the tenant-less + /// overload (today's behavior). Event stores that implement per-tenant partitioning + /// override this; the default throws for a non-null tenant. See jasperfx#407. + /// + Task DeleteProjectionProgressAsync(IEventDatabase database, string subscriptionName, string? tenantId, + CancellationToken token) + => tenantId == null + ? DeleteProjectionProgressAsync(database, subscriptionName, token) + : throw new NotSupportedException( + "Per-tenant DeleteProjectionProgressAsync is not implemented on this IEventStore. Use an event store that implements per-tenant partitioning."); + ValueTask> StartProjectionBatchAsync(EventRange range, IEventDatabase database, ShardExecutionMode mode, AsyncOptions projectionOptions, CancellationToken token); diff --git a/src/JasperFx.Events/Projections/ShardName.cs b/src/JasperFx.Events/Projections/ShardName.cs index 7a749564..7786df27 100644 --- a/src/JasperFx.Events/Projections/ShardName.cs +++ b/src/JasperFx.Events/Projections/ShardName.cs @@ -9,25 +9,32 @@ public class ShardName { public const string All = "All"; - + [JsonConstructor] - public ShardName(string name, string shardKey, uint version) + public ShardName(string name, string shardKey, uint version, string? tenantId) { Name = name; ShardKey = shardKey; Version = version; + TenantId = string.IsNullOrEmpty(tenantId) ? null : tenantId; + + // The tenant is a distinct, trailing slot in the grammar -- it is NEVER folded + // into the ShardKey. A null/empty tenant means "store-global" and keeps the + // Identity/RelativeUrl byte-for-byte identical to the pre-tenancy behavior. + var identitySuffix = TenantId == null ? string.Empty : $":{TenantId}"; + var urlSuffix = TenantId == null ? string.Empty : $"/{TenantId}"; if (version > 1) { - Identity = $"{name}:V{version}:{shardKey}"; - RelativeUrl = $"{name}/{shardKey}/v{version}".ToLowerInvariant(); + Identity = $"{name}:V{version}:{shardKey}{identitySuffix}"; + RelativeUrl = $"{name}/{shardKey}/v{version}{urlSuffix}".ToLowerInvariant(); } else { - Identity = $"{name}:{shardKey}"; - RelativeUrl = $"{name}/{shardKey}".ToLowerInvariant(); + Identity = $"{name}:{shardKey}{identitySuffix}"; + RelativeUrl = $"{name}/{shardKey}{urlSuffix}".ToLowerInvariant(); } - + if (name == ShardState.HighWaterMark) { Identity = ShardState.HighWaterMark; @@ -35,15 +42,102 @@ public ShardName(string name, string shardKey, uint version) } - public ShardName(string name): this(name, All, 1) + public ShardName(string name, string shardKey, uint version): this(name, shardKey, version, null) + { + } + + public ShardName(string name): this(name, All, 1, null) + { + } + + /// + /// Compose a from its parts. The optional + /// occupies a distinct, trailing slot in the shard grammar and is never folded into the + /// . A null/empty tenant is store-global (today's default behavior). + /// + /// Projection or composite-group identity + /// Identity of the shard within the projection. Defaults to "All". + /// Optional tenant partition suffix. Null/empty means store-global. + /// Projection version. Defaults to 1. + public static ShardName Compose(string name, string? shardKey = All, string? tenantId = null, uint version = 1) + { + return new ShardName(name, string.IsNullOrEmpty(shardKey) ? All : shardKey, version, tenantId); + } + + /// + /// Parse a shard string back into a . + /// Understands every form produced by /: + /// Name:ShardKey, Name:ShardKey:Tenant, Name:V{n}:ShardKey, and + /// Name:V{n}:ShardKey:Tenant. A leading V{digits} segment is interpreted as a + /// version marker; otherwise the trailing segment of a 3-part identity is the tenant. + /// + public static bool TryParse(string? text, out ShardName? shardName) + { + shardName = null; + if (string.IsNullOrEmpty(text)) + { + return false; + } + + if (text == ShardState.HighWaterMark) + { + shardName = new ShardName(ShardState.HighWaterMark); + return true; + } + + var parts = text.Split(':'); + switch (parts.Length) + { + case 2: + // Name:ShardKey + shardName = new ShardName(parts[0], parts[1], 1, null); + return true; + + case 3 when TryParseVersion(parts[1], out var versionedKeyVersion): + // Name:V{n}:ShardKey + shardName = new ShardName(parts[0], parts[2], versionedKeyVersion, null); + return true; + + case 3: + // Name:ShardKey:Tenant + shardName = new ShardName(parts[0], parts[1], 1, parts[2]); + return true; + + case 4 when TryParseVersion(parts[1], out var versionedTenantVersion): + // Name:V{n}:ShardKey:Tenant + shardName = new ShardName(parts[0], parts[2], versionedTenantVersion, parts[3]); + return true; + + default: + return false; + } + } + + private static bool TryParseVersion(string segment, out uint version) { + version = 1; + if (segment.Length < 2 || segment[0] != 'V') + { + return false; + } + + return uint.TryParse(segment.AsSpan(1), out version); } - + public string RelativeUrl { get; } public ShardName CloneForDatabase(Uri database) { - return new ShardName(Name, ShardKey, Version) { Database = database }; + return new ShardName(Name, ShardKey, Version, TenantId) { Database = database }; + } + + /// + /// Return an equivalent bound to the supplied tenant. A null/empty + /// tenant yields the store-global shard. + /// + public ShardName ForTenant(string? tenantId) + { + return new ShardName(Name, ShardKey, Version, tenantId) { Database = Database }; } public Uri Database { get; set; } = new Uri("database://default"); @@ -59,6 +153,13 @@ public ShardName CloneForDatabase(Uri database) /// public string ShardKey { get; } + /// + /// Optional tenant partition this shard is scoped to. Null means store-global -- the only + /// behavior that existed before per-tenant partitioning. This is a distinct slot in the shard + /// grammar and is never folded into . + /// + public string? TenantId { get; } + /// /// {ProjectionName}:{Key}. Single identity string that should be unique within this application /// diff --git a/src/JasperFx.Events/Projections/ShardState.cs b/src/JasperFx.Events/Projections/ShardState.cs index 118d1c3c..ad28e907 100644 --- a/src/JasperFx.Events/Projections/ShardState.cs +++ b/src/JasperFx.Events/Projections/ShardState.cs @@ -26,8 +26,16 @@ public ShardState(string shardName, long sequence) public ShardState(ShardName shardName, long sequence): this(shardName.Identity, sequence) { + TenantId = shardName.TenantId; } + /// + /// Tenant partition this shard state belongs to. Null means store-global, which is the + /// only behavior that existed before per-tenant partitioning. Serializes as null for + /// existing consumers that never set it. + /// + public string? TenantId { get; set; } + public long RebuildThreshold { get; set; } public ShardMode Mode { get; set; } = ShardMode.continuous;