diff --git a/src/EventSourcingTests/Dcb/conjoined_tenancy_dcb_tag_tests.cs b/src/EventSourcingTests/Dcb/conjoined_tenancy_dcb_tag_tests.cs new file mode 100644 index 0000000000..a07b3852bd --- /dev/null +++ b/src/EventSourcingTests/Dcb/conjoined_tenancy_dcb_tag_tests.cs @@ -0,0 +1,298 @@ +#nullable enable +using System; +using System.Linq; +using System.Threading.Tasks; +using JasperFx.Events; +using JasperFx.Events.Tags; +using Marten; +using Marten.Events; +using Marten.Events.Dcb; +using Marten.Storage; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; + +namespace EventSourcingTests.Dcb; + +[Collection("OneOffs")] +public class conjoined_tenancy_dcb_tag_tests: OneOffConfigurationsContext +{ + private const string TenantA = "tenant-a"; + private const string TenantB = "tenant-b"; + + private void ConfigureConjoinedStore() + { + StoreOptions(opts => + { + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + + opts.Events.AddEventType(); + opts.Events.AddEventType(); + opts.Events.AddEventType(); + opts.Events.AddEventType(); + + opts.Events.RegisterTagType("student") + .ForAggregate(); + opts.Events.RegisterTagType("course") + .ForAggregate(); + + opts.Projections.LiveStreamAggregation(); + }); + } + + [Fact] + public async Task can_create_schema_with_conjoined_tenancy_and_tag_types() + { + ConfigureConjoinedStore(); + + await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync(); + await theStore.Storage.Database.AssertDatabaseMatchesConfigurationAsync(); + } + + [Fact] + public async Task schema_is_idempotent_with_conjoined_tenancy_and_tag_types() + { + ConfigureConjoinedStore(); + + await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync(); + + var store2 = SeparateStore(opts => + { + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Events.AddEventType(); + opts.Events.RegisterTagType("student"); + opts.Events.RegisterTagType("course"); + }); + + await store2.Storage.Database.AssertDatabaseMatchesConfigurationAsync(); + } + + [Fact] + public async Task tag_queries_are_isolated_by_tenant() + { + ConfigureConjoinedStore(); + + var studentId = new StudentId(Guid.NewGuid()); + var courseId = new CourseId(Guid.NewGuid()); + + // Tenant A appends an event + await using var sessionA = theStore.LightweightSession(TenantA); + var streamA = Guid.NewGuid(); + var enrolledA = sessionA.Events.BuildEvent(new StudentEnrolled("Alice", "Math")); + enrolledA.WithTag(studentId, courseId); + sessionA.Events.Append(streamA, enrolledA); + await sessionA.SaveChangesAsync(); + + // Tenant B appends same tags but different event + await using var sessionB = theStore.LightweightSession(TenantB); + var streamB = Guid.NewGuid(); + var enrolledB = sessionB.Events.BuildEvent(new StudentEnrolled("Bob", "Math")); + enrolledB.WithTag(studentId, courseId); + sessionB.Events.Append(streamB, enrolledB); + await sessionB.SaveChangesAsync(); + + // Query from Tenant A should only see Tenant A's event + await using var queryA = theStore.LightweightSession(TenantA); + var query = new EventTagQuery().Or(studentId); + var eventsA = await queryA.Events.QueryByTagsAsync(query); + eventsA.Count.ShouldBe(1); + eventsA[0].Data.ShouldBeOfType().StudentName.ShouldBe("Alice"); + + // Query from Tenant B should only see Tenant B's event + await using var queryB = theStore.LightweightSession(TenantB); + var eventsB = await queryB.Events.QueryByTagsAsync(query); + eventsB.Count.ShouldBe(1); + eventsB[0].Data.ShouldBeOfType().StudentName.ShouldBe("Bob"); + } + + [Fact] + public async Task events_exist_is_isolated_by_tenant() + { + ConfigureConjoinedStore(); + + var studentId = new StudentId(Guid.NewGuid()); + var courseId = new CourseId(Guid.NewGuid()); + + // Only Tenant A has the event + await using var sessionA = theStore.LightweightSession(TenantA); + var enrolled = sessionA.Events.BuildEvent(new StudentEnrolled("Alice", "Math")); + enrolled.WithTag(studentId, courseId); + sessionA.Events.Append(Guid.NewGuid(), enrolled); + await sessionA.SaveChangesAsync(); + + var query = new EventTagQuery().Or(studentId); + + // Tenant A should see it + await using var queryA = theStore.LightweightSession(TenantA); + (await queryA.Events.EventsExistAsync(query)).ShouldBeTrue(); + + // Tenant B should NOT see it + await using var queryB = theStore.LightweightSession(TenantB); + (await queryB.Events.EventsExistAsync(query)).ShouldBeFalse(); + } + + [Fact] + public async Task aggregate_by_tags_is_isolated_by_tenant() + { + ConfigureConjoinedStore(); + + var studentId = new StudentId(Guid.NewGuid()); + var courseId = new CourseId(Guid.NewGuid()); + + // Tenant A: Alice with assignment + await using var sessionA = theStore.LightweightSession(TenantA); + var streamA = Guid.NewGuid(); + var enrolledA = sessionA.Events.BuildEvent(new StudentEnrolled("Alice", "Math")); + enrolledA.WithTag(studentId, courseId); + var hwA = sessionA.Events.BuildEvent(new AssignmentSubmitted("HW-A", 95)); + hwA.WithTag(studentId, courseId); + sessionA.Events.Append(streamA, enrolledA, hwA); + await sessionA.SaveChangesAsync(); + + // Tenant B: Bob with different assignment + await using var sessionB = theStore.LightweightSession(TenantB); + var streamB = Guid.NewGuid(); + var enrolledB = sessionB.Events.BuildEvent(new StudentEnrolled("Bob", "Math")); + enrolledB.WithTag(studentId, courseId); + var hwB = sessionB.Events.BuildEvent(new AssignmentSubmitted("HW-B", 80)); + hwB.WithTag(studentId, courseId); + sessionB.Events.Append(streamB, enrolledB, hwB); + await sessionB.SaveChangesAsync(); + + var query = new EventTagQuery() + .Or(studentId) + .Or(courseId); + + // Aggregate for Tenant A + await using var queryA = theStore.LightweightSession(TenantA); + var aggA = await queryA.Events.AggregateByTagsAsync(query); + aggA.ShouldNotBeNull(); + aggA.StudentName.ShouldBe("Alice"); + aggA.Assignments.ShouldContain("HW-A"); + aggA.Assignments.ShouldNotContain("HW-B"); + + // Aggregate for Tenant B + await using var queryB = theStore.LightweightSession(TenantB); + var aggB = await queryB.Events.AggregateByTagsAsync(query); + aggB.ShouldNotBeNull(); + aggB.StudentName.ShouldBe("Bob"); + aggB.Assignments.ShouldContain("HW-B"); + aggB.Assignments.ShouldNotContain("HW-A"); + } + + [Fact] + public async Task fetch_for_writing_by_tags_is_isolated_by_tenant() + { + ConfigureConjoinedStore(); + + var studentId = new StudentId(Guid.NewGuid()); + var courseId = new CourseId(Guid.NewGuid()); + + // Tenant A seeds an event + await using var sessionA = theStore.LightweightSession(TenantA); + var streamA = Guid.NewGuid(); + var enrolled = sessionA.Events.BuildEvent(new StudentEnrolled("Alice", "Math")); + enrolled.WithTag(studentId, courseId); + sessionA.Events.Append(streamA, enrolled); + await sessionA.SaveChangesAsync(); + + // Tenant B fetches for writing with the same tag - should see empty + await using var sessionB = theStore.LightweightSession(TenantB); + var query = new EventTagQuery().Or(studentId); + var boundary = await sessionB.Events.FetchForWritingByTags(query); + boundary.Aggregate.ShouldBeNull(); + boundary.Events.Count.ShouldBe(0); + } + + [Fact] + public async Task dcb_concurrency_check_is_isolated_by_tenant() + { + ConfigureConjoinedStore(); + + var studentId = new StudentId(Guid.NewGuid()); + var courseId = new CourseId(Guid.NewGuid()); + + // Tenant A seeds + await using var sessionA1 = theStore.LightweightSession(TenantA); + var streamA = Guid.NewGuid(); + var enrolled = sessionA1.Events.BuildEvent(new StudentEnrolled("Alice", "Math")); + enrolled.WithTag(studentId, courseId); + sessionA1.Events.Append(streamA, enrolled); + await sessionA1.SaveChangesAsync(); + + // Tenant A fetches for writing + await using var sessionA2 = theStore.LightweightSession(TenantA); + var query = new EventTagQuery().Or(studentId); + var boundary = await sessionA2.Events.FetchForWritingByTags(query); + + // Tenant B appends with the same tag - should NOT cause a concurrency violation in Tenant A + await using var sessionB = theStore.LightweightSession(TenantB); + var streamB = Guid.NewGuid(); + var enrolledB = sessionB.Events.BuildEvent(new StudentEnrolled("Bob", "Math")); + enrolledB.WithTag(studentId, courseId); + sessionB.Events.Append(streamB, enrolledB); + await sessionB.SaveChangesAsync(); + + // Tenant A saves - should NOT throw because Tenant B's event is not visible + var hw = sessionA2.Events.BuildEvent(new AssignmentSubmitted("HW1", 90)); + hw.WithTag(studentId, courseId); + boundary.AppendOne(hw); + + await sessionA2.SaveChangesAsync(); // Should succeed without DcbConcurrencyException + } + + [Fact] + public async Task dcb_concurrency_detects_same_tenant_conflict() + { + ConfigureConjoinedStore(); + + var studentId = new StudentId(Guid.NewGuid()); + var courseId = new CourseId(Guid.NewGuid()); + + // Tenant A seeds + await using var sessionA1 = theStore.LightweightSession(TenantA); + var streamA = Guid.NewGuid(); + var enrolled = sessionA1.Events.BuildEvent(new StudentEnrolled("Alice", "Math")); + enrolled.WithTag(studentId, courseId); + sessionA1.Events.Append(streamA, enrolled); + await sessionA1.SaveChangesAsync(); + + // Tenant A, session 1: fetch for writing + await using var session1 = theStore.LightweightSession(TenantA); + var query = new EventTagQuery().Or(studentId); + var boundary = await session1.Events.FetchForWritingByTags(query); + + // Tenant A, session 2: appends conflicting event + await using var session2 = theStore.LightweightSession(TenantA); + var conflicting = session2.Events.BuildEvent(new AssignmentSubmitted("HW-conflict", 50)); + conflicting.WithTag(studentId, courseId); + session2.Events.Append(streamA, conflicting); + await session2.SaveChangesAsync(); + + // Session 1: save should throw + var hw = session1.Events.BuildEvent(new AssignmentSubmitted("HW1", 90)); + hw.WithTag(studentId, courseId); + boundary.AppendOne(hw); + + await Should.ThrowAsync(async () => + { + await session1.SaveChangesAsync(); + }); + } + + [Fact] + public async Task can_create_schema_with_conjoined_tenancy_archived_partitioning_and_tags() + { + StoreOptions(opts => + { + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Events.UseArchivedStreamPartitioning = true; + + opts.Events.RegisterTagType("student"); + opts.Events.RegisterTagType("course"); + }); + + await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync(); + await theStore.Storage.Database.AssertDatabaseMatchesConfigurationAsync(); + } +} diff --git a/src/EventSourcingTests/FetchForWriting/conjoined_tenancy_natural_key_tests.cs b/src/EventSourcingTests/FetchForWriting/conjoined_tenancy_natural_key_tests.cs new file mode 100644 index 0000000000..86adeb0a9b --- /dev/null +++ b/src/EventSourcingTests/FetchForWriting/conjoined_tenancy_natural_key_tests.cs @@ -0,0 +1,248 @@ +using System; +using System.Threading.Tasks; +using JasperFx.Events; +using Marten; +using Marten.Events.Projections; +using Marten.Storage; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; + +namespace EventSourcingTests.FetchForWriting; + +[Collection("OneOffs")] +public class conjoined_tenancy_natural_key_tests: OneOffConfigurationsContext +{ + private const string TenantA = "tenant-a"; + private const string TenantB = "tenant-b"; + + [Fact] + public async Task can_create_schema_with_conjoined_tenancy_and_natural_keys() + { + StoreOptions(opts => + { + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Policies.AllDocumentsAreMultiTenanted(); + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + }); + + await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync(); + await theStore.Storage.Database.AssertDatabaseMatchesConfigurationAsync(); + } + + [Fact] + public async Task schema_is_idempotent_with_conjoined_tenancy_and_natural_keys() + { + StoreOptions(opts => + { + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Policies.AllDocumentsAreMultiTenanted(); + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + }); + + await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync(); + + var store2 = SeparateStore(opts => + { + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Policies.AllDocumentsAreMultiTenanted(); + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + }); + + await store2.Storage.Database.AssertDatabaseMatchesConfigurationAsync(); + } + + [Fact] + public async Task can_create_schema_with_conjoined_tenancy_archived_partitioning_and_natural_keys() + { + StoreOptions(opts => + { + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Events.UseArchivedStreamPartitioning = true; + opts.Policies.AllDocumentsAreMultiTenanted(); + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + }); + + await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync(); + await theStore.Storage.Database.AssertDatabaseMatchesConfigurationAsync(); + } + + [Fact] + public async Task same_natural_key_in_different_tenants() + { + StoreOptions(opts => + { + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Policies.AllDocumentsAreMultiTenanted(); + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + }); + + var orderNumber = new OrderNumber("ORD-001"); + + // Tenant A creates a stream with this natural key + await using var sessionA = theStore.LightweightSession(TenantA); + var streamA = Guid.NewGuid(); + sessionA.Events.StartStream(streamA, + new OrderCreated(orderNumber, "Alice")); + await sessionA.SaveChangesAsync(); + + // Tenant B creates a different stream with the SAME natural key + await using var sessionB = theStore.LightweightSession(TenantB); + var streamB = Guid.NewGuid(); + sessionB.Events.StartStream(streamB, + new OrderCreated(orderNumber, "Bob")); + await sessionB.SaveChangesAsync(); + + // Fetch by natural key from Tenant A + await using var queryA = theStore.LightweightSession(TenantA); + var aggA = await queryA.Events.FetchForWriting(orderNumber); + aggA.Aggregate.ShouldNotBeNull(); + aggA.Aggregate.CustomerName.ShouldBe("Alice"); + + // Fetch by natural key from Tenant B + await using var queryB = theStore.LightweightSession(TenantB); + var aggB = await queryB.Events.FetchForWriting(orderNumber); + aggB.Aggregate.ShouldNotBeNull(); + aggB.Aggregate.CustomerName.ShouldBe("Bob"); + } + + [Fact] + public async Task fetch_latest_by_natural_key_is_tenant_isolated() + { + StoreOptions(opts => + { + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Policies.AllDocumentsAreMultiTenanted(); + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + }); + + var orderNumber = new OrderNumber("ORD-002"); + + // Tenant A creates order + await using var sessionA = theStore.LightweightSession(TenantA); + sessionA.Events.StartStream(Guid.NewGuid(), + new OrderCreated(orderNumber, "Alice"), + new OrderItemAdded("Widget", 10.00m)); + await sessionA.SaveChangesAsync(); + + // Tenant B creates order with same number + await using var sessionB = theStore.LightweightSession(TenantB); + sessionB.Events.StartStream(Guid.NewGuid(), + new OrderCreated(orderNumber, "Bob"), + new OrderItemAdded("Gadget", 20.00m)); + await sessionB.SaveChangesAsync(); + + // FetchLatest from Tenant A + await using var queryA = theStore.LightweightSession(TenantA); + var aggA = await queryA.Events.FetchLatest(orderNumber); + aggA.ShouldNotBeNull(); + aggA.CustomerName.ShouldBe("Alice"); + aggA.TotalAmount.ShouldBe(10.00m); + + // FetchLatest from Tenant B + await using var queryB = theStore.LightweightSession(TenantB); + var aggB = await queryB.Events.FetchLatest(orderNumber); + aggB.ShouldNotBeNull(); + aggB.CustomerName.ShouldBe("Bob"); + aggB.TotalAmount.ShouldBe(20.00m); + } + + [Fact] + public async Task natural_key_returns_null_for_nonexistent_key_in_tenant() + { + StoreOptions(opts => + { + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Policies.AllDocumentsAreMultiTenanted(); + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + }); + + var orderNumber = new OrderNumber("ORD-003"); + + // Tenant A has the order + await using var sessionA = theStore.LightweightSession(TenantA); + sessionA.Events.StartStream(Guid.NewGuid(), + new OrderCreated(orderNumber, "Alice")); + await sessionA.SaveChangesAsync(); + + // Tenant B should NOT find it + await using var queryB = theStore.LightweightSession(TenantB); + var aggB = await queryB.Events.FetchLatest(orderNumber); + aggB.ShouldBeNull(); + } + + [Fact] + public async Task fetch_for_writing_appends_to_correct_tenant_stream() + { + StoreOptions(opts => + { + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Policies.AllDocumentsAreMultiTenanted(); + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + }); + + var orderNumber = new OrderNumber("ORD-004"); + + // Create streams in both tenants + await using var sessionA = theStore.LightweightSession(TenantA); + sessionA.Events.StartStream(Guid.NewGuid(), + new OrderCreated(orderNumber, "Alice")); + await sessionA.SaveChangesAsync(); + + await using var sessionB = theStore.LightweightSession(TenantB); + sessionB.Events.StartStream(Guid.NewGuid(), + new OrderCreated(orderNumber, "Bob")); + await sessionB.SaveChangesAsync(); + + // Fetch for writing in Tenant A and append + await using var writeA = theStore.LightweightSession(TenantA); + var streamA = await writeA.Events.FetchForWriting(orderNumber); + streamA.AppendOne(new OrderItemAdded("Widget", 10.00m)); + await writeA.SaveChangesAsync(); + + // Verify Tenant A has the new event, Tenant B doesn't + await using var verifyA = theStore.LightweightSession(TenantA); + var aggA = await verifyA.Events.FetchLatest(orderNumber); + aggA.ShouldNotBeNull(); + aggA.TotalAmount.ShouldBe(10.00m); + + await using var verifyB = theStore.LightweightSession(TenantB); + var aggB = await verifyB.Events.FetchLatest(orderNumber); + aggB.ShouldNotBeNull(); + aggB.TotalAmount.ShouldBe(0m); + } + + [Fact] + public async Task live_aggregation_with_conjoined_tenancy_and_natural_key() + { + StoreOptions(opts => + { + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Policies.AllDocumentsAreMultiTenanted(); + opts.Projections.LiveStreamAggregation(); + }); + + var orderNumber = new OrderNumber("ORD-LIVE-001"); + + // Create stream in Tenant A + await using var sessionA = theStore.LightweightSession(TenantA); + sessionA.Events.StartStream(Guid.NewGuid(), + new OrderCreated(orderNumber, "Alice"), + new OrderItemAdded("Widget", 15.00m)); + await sessionA.SaveChangesAsync(); + + // Create stream in Tenant B with same key + await using var sessionB = theStore.LightweightSession(TenantB); + sessionB.Events.StartStream(Guid.NewGuid(), + new OrderCreated(orderNumber, "Bob"), + new OrderItemAdded("Gadget", 25.00m)); + await sessionB.SaveChangesAsync(); + + // Live aggregation via FetchForWriting from Tenant A + await using var queryA = theStore.LightweightSession(TenantA); + var streamA = await queryA.Events.FetchForWriting(orderNumber); + streamA.Aggregate.ShouldNotBeNull(); + streamA.Aggregate.CustomerName.ShouldBe("Alice"); + streamA.Aggregate.TotalAmount.ShouldBe(15.00m); + } +} diff --git a/src/Marten/Events/Dcb/AssertDcbConsistency.cs b/src/Marten/Events/Dcb/AssertDcbConsistency.cs index 23336258c7..016ad043e3 100644 --- a/src/Marten/Events/Dcb/AssertDcbConsistency.cs +++ b/src/Marten/Events/Dcb/AssertDcbConsistency.cs @@ -8,6 +8,7 @@ using JasperFx.Events.Tags; using Marten.Internal; using Marten.Internal.Operations; +using Marten.Storage; using Weasel.Postgresql; namespace Marten.Events.Dcb; @@ -110,7 +111,16 @@ public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) builder.Append(")"); } - builder.Append(") limit 1)"); + builder.Append(")"); + + // Filter by tenant_id for conjoined tenancy + if (_events.TenancyStyle == TenancyStyle.Conjoined) + { + builder.Append(" and t0.tenant_id = "); + builder.AppendParameter(session.TenantId); + } + + builder.Append(" limit 1)"); } public Type DocumentType => typeof(IEvent); diff --git a/src/Marten/Events/Dcb/EventsExistByTagsHandler.cs b/src/Marten/Events/Dcb/EventsExistByTagsHandler.cs index 6d1f2cb922..1f3c06c67c 100644 --- a/src/Marten/Events/Dcb/EventsExistByTagsHandler.cs +++ b/src/Marten/Events/Dcb/EventsExistByTagsHandler.cs @@ -10,6 +10,7 @@ using Marten.Internal; using Marten.Internal.Sessions; using Marten.Linq.QueryHandlers; +using Marten.Storage; using Weasel.Postgresql; namespace Marten.Events.Dcb; @@ -109,7 +110,16 @@ public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) builder.Append(")"); } - builder.Append(") limit 1)"); + builder.Append(")"); + + // Filter by tenant_id for conjoined tenancy + if (_store.Events.TenancyStyle == TenancyStyle.Conjoined) + { + builder.Append(" and t0.tenant_id = "); + builder.AppendParameter(session.TenantId); + } + + builder.Append(" limit 1)"); } public bool Handle(DbDataReader reader, IMartenSession session) diff --git a/src/Marten/Events/Dcb/FetchForWritingByTagsHandler.cs b/src/Marten/Events/Dcb/FetchForWritingByTagsHandler.cs index 063cf6c28e..f80849b5ac 100644 --- a/src/Marten/Events/Dcb/FetchForWritingByTagsHandler.cs +++ b/src/Marten/Events/Dcb/FetchForWritingByTagsHandler.cs @@ -12,6 +12,7 @@ using Marten.Internal; using Marten.Internal.Sessions; using Marten.Linq.QueryHandlers; +using Marten.Storage; using NpgsqlTypes; using Weasel.Postgresql; @@ -94,6 +95,13 @@ public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) builder.Append(")"); + // Filter by tenant_id for conjoined tenancy + if (_store.Events.TenancyStyle == TenancyStyle.Conjoined) + { + builder.Append(" and e.tenant_id = "); + builder.AppendParameter(session.TenantId); + } + // If the aggregator has event type filtering, apply it to limit the returned events var eventTypeNames = resolveAggregatorEventTypeNames(); if (eventTypeNames != null) diff --git a/src/Marten/Events/EventStore.Dcb.cs b/src/Marten/Events/EventStore.Dcb.cs index 089f6d1693..87b1c21d6a 100644 --- a/src/Marten/Events/EventStore.Dcb.cs +++ b/src/Marten/Events/EventStore.Dcb.cs @@ -11,6 +11,7 @@ using JasperFx.Events.Tags; using Marten.Events.Dcb; using Marten.Internal.Sessions; +using Marten.Storage; using Npgsql; namespace Marten.Events; @@ -185,6 +186,14 @@ private static async Task> ReadEventsFromReaderAsync(DbDat sb.Append(')'); + // Filter by tenant_id for conjoined tenancy + if (_store.Events.TenancyStyle == TenancyStyle.Conjoined) + { + sb.Append(" and e.tenant_id = @p"); + sb.Append(paramValues.Count); + paramValues.Add(_session.TenantId); + } + // If the aggregator has event type filtering, apply it to limit the returned events if (aggregatorEventTypeNames is { Length: > 0 }) { diff --git a/src/Marten/Events/EventStore.cs b/src/Marten/Events/EventStore.cs index 13478b9caf..f8bb65299e 100644 --- a/src/Marten/Events/EventStore.cs +++ b/src/Marten/Events/EventStore.cs @@ -64,7 +64,8 @@ public void AssignTagWhere(Expression> expression, object tag _ => CompoundWhereFragment.And(holder.Fragments) }; - var op = new AssignTagWhereOperation(schema, registration, value, whereFragment); + var isConjoined = _store.Events.TenancyStyle == Storage.TenancyStyle.Conjoined; + var op = new AssignTagWhereOperation(schema, registration, value, whereFragment, isConjoined); _session.QueueOperation(op); } diff --git a/src/Marten/Events/Operations/AssignTagWhereOperation.cs b/src/Marten/Events/Operations/AssignTagWhereOperation.cs index aca9fd9b2a..47da53c6aa 100644 --- a/src/Marten/Events/Operations/AssignTagWhereOperation.cs +++ b/src/Marten/Events/Operations/AssignTagWhereOperation.cs @@ -24,14 +24,16 @@ internal class AssignTagWhereOperation: IStorageOperation private readonly ITagTypeRegistration _registration; private readonly object _value; private readonly ISqlFragment _whereFragment; + private readonly bool _isConjoined; public AssignTagWhereOperation(string schemaName, ITagTypeRegistration registration, object value, - ISqlFragment whereFragment) + ISqlFragment whereFragment, bool isConjoined = false) { _schemaName = schemaName; _registration = registration; _value = value; _whereFragment = whereFragment; + _isConjoined = isConjoined; } public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) @@ -40,9 +42,22 @@ public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) builder.Append(_schemaName); builder.Append(".mt_event_tag_"); builder.Append(_registration.TableSuffix); - builder.Append(" (value, seq_id) select "); - builder.AppendParameter(_value); - builder.Append(", d.seq_id from "); + + if (_isConjoined) + { + builder.Append(" (value, tenant_id, seq_id) select "); + builder.AppendParameter(_value); + builder.Append(", "); + builder.AppendParameter(session.TenantId); + builder.Append(", d.seq_id from "); + } + else + { + builder.Append(" (value, seq_id) select "); + builder.AppendParameter(_value); + builder.Append(", d.seq_id from "); + } + builder.Append(_schemaName); builder.Append(".mt_events as d where "); _whereFragment.Apply(builder); diff --git a/src/Marten/Events/Operations/EventTagOperations.cs b/src/Marten/Events/Operations/EventTagOperations.cs index 45741e21a1..96f96a7285 100644 --- a/src/Marten/Events/Operations/EventTagOperations.cs +++ b/src/Marten/Events/Operations/EventTagOperations.cs @@ -1,5 +1,6 @@ using JasperFx.Events; using Marten.Internal.Sessions; +using Marten.Storage; namespace Marten.Events.Operations; @@ -13,6 +14,7 @@ public static void QueueTagOperations(EventGraph eventGraph, DocumentSessionBase if (eventGraph.TagTypes.Count == 0) return; var schema = eventGraph.DatabaseSchemaName; + var isConjoined = eventGraph.TenancyStyle == TenancyStyle.Conjoined; foreach (var @event in stream.Events) { @@ -24,7 +26,7 @@ public static void QueueTagOperations(EventGraph eventGraph, DocumentSessionBase var registration = eventGraph.FindTagType(tag.TagType); if (registration == null) continue; - session.QueueOperation(new InsertEventTagOperation(schema, registration, @event.Sequence, tag.Value)); + session.QueueOperation(new InsertEventTagOperation(schema, registration, @event.Sequence, tag.Value, isConjoined)); } } } @@ -37,6 +39,7 @@ public static void QueueTagOperationsByEventId(EventGraph eventGraph, DocumentSe if (eventGraph.TagTypes.Count == 0) return; var schema = eventGraph.DatabaseSchemaName; + var isConjoined = eventGraph.TenancyStyle == TenancyStyle.Conjoined; foreach (var @event in stream.Events) { @@ -48,7 +51,7 @@ public static void QueueTagOperationsByEventId(EventGraph eventGraph, DocumentSe var registration = eventGraph.FindTagType(tag.TagType); if (registration == null) continue; - session.QueueOperation(new InsertEventTagByEventIdOperation(schema, registration, @event.Id, tag.Value)); + session.QueueOperation(new InsertEventTagByEventIdOperation(schema, registration, @event.Id, tag.Value, isConjoined)); } } } diff --git a/src/Marten/Events/Operations/InsertEventTagByEventIdOperation.cs b/src/Marten/Events/Operations/InsertEventTagByEventIdOperation.cs index 279f2c0831..da49bac4b6 100644 --- a/src/Marten/Events/Operations/InsertEventTagByEventIdOperation.cs +++ b/src/Marten/Events/Operations/InsertEventTagByEventIdOperation.cs @@ -21,13 +21,15 @@ internal class InsertEventTagByEventIdOperation: IStorageOperation private readonly ITagTypeRegistration _registration; private readonly Guid _eventId; private readonly object _value; + private readonly bool _isConjoined; - public InsertEventTagByEventIdOperation(string schemaName, ITagTypeRegistration registration, Guid eventId, object tagValue) + public InsertEventTagByEventIdOperation(string schemaName, ITagTypeRegistration registration, Guid eventId, object tagValue, bool isConjoined = false) { _schemaName = schemaName; _registration = registration; _eventId = eventId; _value = registration.ExtractValue(tagValue); + _isConjoined = isConjoined; } public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) @@ -36,9 +38,22 @@ public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) builder.Append(_schemaName); builder.Append(".mt_event_tag_"); builder.Append(_registration.TableSuffix); - builder.Append(" (value, seq_id) select "); - builder.AppendParameter(_value); - builder.Append(", seq_id from "); + + if (_isConjoined) + { + builder.Append(" (value, tenant_id, seq_id) select "); + builder.AppendParameter(_value); + builder.Append(", "); + builder.AppendParameter(session.TenantId); + builder.Append(", seq_id from "); + } + else + { + builder.Append(" (value, seq_id) select "); + builder.AppendParameter(_value); + builder.Append(", seq_id from "); + } + builder.Append(_schemaName); builder.Append(".mt_events where id = "); builder.AppendParameter(_eventId); diff --git a/src/Marten/Events/Operations/InsertEventTagOperation.cs b/src/Marten/Events/Operations/InsertEventTagOperation.cs index 6fa8622675..de08730de6 100644 --- a/src/Marten/Events/Operations/InsertEventTagOperation.cs +++ b/src/Marten/Events/Operations/InsertEventTagOperation.cs @@ -17,13 +17,15 @@ internal class InsertEventTagOperation: IStorageOperation private readonly ITagTypeRegistration _registration; private readonly long _seqId; private readonly object _value; + private readonly bool _isConjoined; - public InsertEventTagOperation(string schemaName, ITagTypeRegistration registration, long seqId, object tagValue) + public InsertEventTagOperation(string schemaName, ITagTypeRegistration registration, long seqId, object tagValue, bool isConjoined = false) { _schemaName = schemaName; _registration = registration; _seqId = seqId; _value = registration.ExtractValue(tagValue); + _isConjoined = isConjoined; } public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) @@ -32,10 +34,24 @@ public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) builder.Append(_schemaName); builder.Append(".mt_event_tag_"); builder.Append(_registration.TableSuffix); - builder.Append(" (value, seq_id) values ("); - builder.AppendParameter(_value); - builder.Append(", "); - builder.AppendParameter(_seqId); + + if (_isConjoined) + { + builder.Append(" (value, tenant_id, seq_id) values ("); + builder.AppendParameter(_value); + builder.Append(", "); + builder.AppendParameter(session.TenantId); + builder.Append(", "); + builder.AppendParameter(_seqId); + } + else + { + builder.Append(" (value, seq_id) values ("); + builder.AppendParameter(_value); + builder.Append(", "); + builder.AppendParameter(_seqId); + } + builder.Append(") on conflict do nothing"); } diff --git a/src/Marten/Events/Projections/NaturalKeyProjection.cs b/src/Marten/Events/Projections/NaturalKeyProjection.cs index 98f8392bc6..57c4e70e53 100644 --- a/src/Marten/Events/Projections/NaturalKeyProjection.cs +++ b/src/Marten/Events/Projections/NaturalKeyProjection.cs @@ -69,9 +69,9 @@ private void queueUpsertSql(IDocumentOperations operations, StreamAction stream, { var sql = $"INSERT INTO {_tableName} (natural_key_value, {streamCol}, tenant_id, is_archived) " + $"VALUES (?, ?, ?, false) " + - $"ON CONFLICT (natural_key_value) DO UPDATE SET {streamCol} = ?, tenant_id = ?, is_archived = false"; + $"ON CONFLICT (natural_key_value, tenant_id) DO UPDATE SET {streamCol} = ?, is_archived = false"; operations.QueueSqlCommand(sql, innerValue, streamId, stream.TenantId, - streamId, stream.TenantId); + streamId); } else { diff --git a/src/Marten/Events/Schema/EventTagTable.cs b/src/Marten/Events/Schema/EventTagTable.cs index 07630e37ee..88fc49ab8d 100644 --- a/src/Marten/Events/Schema/EventTagTable.cs +++ b/src/Marten/Events/Schema/EventTagTable.cs @@ -1,6 +1,8 @@ using System; using JasperFx.Events.Tags; using Marten.Events.Archiving; +using Marten.Storage; +using Marten.Storage.Metadata; using Weasel.Postgresql; using Weasel.Postgresql.Tables; @@ -12,10 +14,17 @@ public EventTagTable(EventGraph events, ITagTypeRegistration registration) : base(new PostgresqlObjectName(events.DatabaseSchemaName, $"mt_event_tag_{registration.TableSuffix}")) { var pgType = PostgresqlTypeFor(registration.SimpleType); + var isConjoined = events.TenancyStyle == TenancyStyle.Conjoined; // Composite primary key with value first for query performance AddColumn("value", pgType).NotNull().AsPrimaryKey(); + // Add tenant_id to PK for conjoined tenancy to enable tenant-scoped tag queries + if (isConjoined) + { + AddColumn().AsPrimaryKey(); + } + if (events.UseArchivedStreamPartitioning) { // When mt_events is partitioned by is_archived, its PK includes is_archived. diff --git a/src/Marten/Events/Schema/NaturalKeyTable.cs b/src/Marten/Events/Schema/NaturalKeyTable.cs index 44b32875e2..d47a925827 100644 --- a/src/Marten/Events/Schema/NaturalKeyTable.cs +++ b/src/Marten/Events/Schema/NaturalKeyTable.cs @@ -26,10 +26,11 @@ public NaturalKeyTable(EventGraph events, NaturalKeyDefinition naturalKey) AddColumn(streamCol, streamColType).NotNull(); - // Tenancy support - if (events.TenancyStyle == TenancyStyle.Conjoined) + // Tenancy support - tenant_id is part of PK so same natural key can exist in different tenants + var isConjoined = events.TenancyStyle == TenancyStyle.Conjoined; + if (isConjoined) { - AddColumn(); + AddColumn().AsPrimaryKey(); } // Archive support @@ -38,11 +39,36 @@ public NaturalKeyTable(EventGraph events, NaturalKeyDefinition naturalKey) { archiving.PartitionByListValues().AddPartition("archived", true); - // FK to mt_streams must include is_archived when streams table is partitioned - ForeignKeys.Add(new ForeignKey($"fk_{Identifier.Name}_stream_is_archived") + if (isConjoined) { - ColumnNames = new[] { streamCol, "is_archived" }, - LinkedNames = new[] { "id", "is_archived" }, + // FK must include tenant_id and is_archived to match mt_streams composite PK + ForeignKeys.Add(new ForeignKey($"fk_{Identifier.Name}_stream_tenant_is_archived") + { + ColumnNames = new[] { streamCol, TenantIdColumn.Name, "is_archived" }, + LinkedNames = new[] { "id", TenantIdColumn.Name, "is_archived" }, + LinkedTable = new PostgresqlObjectName(events.DatabaseSchemaName, StreamsTable.TableName), + OnDelete = CascadeAction.Cascade + }); + } + else + { + // FK to mt_streams must include is_archived when streams table is partitioned + ForeignKeys.Add(new ForeignKey($"fk_{Identifier.Name}_stream_is_archived") + { + ColumnNames = new[] { streamCol, "is_archived" }, + LinkedNames = new[] { "id", "is_archived" }, + LinkedTable = new PostgresqlObjectName(events.DatabaseSchemaName, StreamsTable.TableName), + OnDelete = CascadeAction.Cascade + }); + } + } + else if (isConjoined) + { + // FK must include tenant_id to match mt_streams composite PK (tenant_id, id) + ForeignKeys.Add(new ForeignKey($"fk_{Identifier.Name}_stream_tenant") + { + ColumnNames = new[] { streamCol, TenantIdColumn.Name }, + LinkedNames = new[] { "id", TenantIdColumn.Name }, LinkedTable = new PostgresqlObjectName(events.DatabaseSchemaName, StreamsTable.TableName), OnDelete = CascadeAction.Cascade }); diff --git a/src/Marten/Events/Schema/QuickAppendEventFunction.cs b/src/Marten/Events/Schema/QuickAppendEventFunction.cs index c1e97efd7f..b098708816 100644 --- a/src/Marten/Events/Schema/QuickAppendEventFunction.cs +++ b/src/Marten/Events/Schema/QuickAppendEventFunction.cs @@ -81,10 +81,20 @@ public override void WriteCreateStatement(Migrator migrator, TextWriter writer) var paramName = $"tag_{tagType.TableSuffix}_values"; tagParameters += $", {paramName} varchar[]"; - tagInserts += $@" + if (tenancyStyle == TenancyStyle.Conjoined) + { + tagInserts += $@" + IF {paramName}[index] IS NOT NULL THEN + INSERT INTO {databaseSchema}.mt_event_tag_{tagType.TableSuffix} (value, tenant_id, seq_id) VALUES ({paramName}[index]::{PostgresqlTypeFor(tagType.SimpleType)}, tenantid, seq) ON CONFLICT DO NOTHING; + END IF;"; + } + else + { + tagInserts += $@" IF {paramName}[index] IS NOT NULL THEN INSERT INTO {databaseSchema}.mt_event_tag_{tagType.TableSuffix} (value, seq_id) VALUES ({paramName}[index]::{PostgresqlTypeFor(tagType.SimpleType)}, seq) ON CONFLICT DO NOTHING; END IF;"; + } } writer.WriteLine($@"