Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using JasperFx.Events.Tags;
using Marten;
using Marten.Events.Schema;
using Marten.Testing.Harness;
using Shouldly;
using Weasel.Postgresql;
using Weasel.Postgresql.Tables;
using Xunit;

namespace CoreTests.Bugs;

public class Bug_4190_archived_partitioning_with_strong_typed_id : BugIntegrationContext
{
// Strongly-typed ID similar to Vogen-generated value types
public record EntityId(Guid Value);

[Fact]
public void event_tag_table_fk_correction_should_not_throw_with_archived_partitioning()
{
StoreOptions(opts =>
{
opts.Events.UseArchivedStreamPartitioning = true;
opts.Events.RegisterTagType<EntityId>("entity");
});

var events = theStore.Options.EventGraph;
var schemaObjects = ((Weasel.Core.Migrations.IFeatureSchema)events).Objects;

var tagTable = schemaObjects.OfType<Table>()
.FirstOrDefault(t => t.Identifier.Name.Contains("mt_event_tag"));
tagTable.ShouldNotBeNull();

var eventsTable = schemaObjects.OfType<Table>()
.FirstOrDefault(t => t.Identifier.Name == "mt_events");
eventsTable.ShouldNotBeNull();

// Verify the events table PK includes is_archived when partitioned
eventsTable.PrimaryKeyColumns.ShouldContain("is_archived",
"mt_events PK should include is_archived when UseArchivedStreamPartitioning is enabled");

// The tag table should have is_archived column to satisfy the FK correction
var isArchivedCol = tagTable.Columns.FirstOrDefault(c => c.Name == "is_archived");
isArchivedCol.ShouldNotBeNull(
"EventTagTable should have is_archived column when UseArchivedStreamPartitioning is enabled");

// Explicitly test PostProcess doesn't throw
Should.NotThrow(() => tagTable.PostProcess(schemaObjects));
}

[Fact]
public async Task can_create_schema_with_archived_partitioning_and_tag_type()
{
StoreOptions(opts =>
{
opts.Events.UseArchivedStreamPartitioning = true;
opts.Events.RegisterTagType<EntityId>("entity");
});

await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();
await theStore.Storage.Database.AssertDatabaseMatchesConfigurationAsync();
}

[Fact]
public async Task schema_is_idempotent_with_archived_partitioning_and_tag_type()
{
StoreOptions(opts =>
{
opts.Events.UseArchivedStreamPartitioning = true;
opts.Events.RegisterTagType<EntityId>("entity");
});

await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();

var store2 = SeparateStore(opts =>
{
opts.Events.UseArchivedStreamPartitioning = true;
opts.Events.RegisterTagType<EntityId>("entity");
});

await store2.Storage.Database.AssertDatabaseMatchesConfigurationAsync();
}

[Fact]
public async Task can_create_schema_with_archived_partitioning_conjoined_tenancy_and_tag_type()
{
StoreOptions(opts =>
{
opts.Events.UseArchivedStreamPartitioning = true;
opts.Events.TenancyStyle = Marten.Storage.TenancyStyle.Conjoined;
opts.Events.RegisterTagType<EntityId>("entity");
});

await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();
await theStore.Storage.Database.AssertDatabaseMatchesConfigurationAsync();
}
}
26 changes: 24 additions & 2 deletions src/Marten/Events/Schema/EventTagTable.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using JasperFx.Events.Tags;
using Marten.Events.Archiving;
using Weasel.Postgresql;
using Weasel.Postgresql.Tables;

Expand All @@ -14,8 +15,29 @@ public EventTagTable(EventGraph events, ITagTypeRegistration registration)

// Composite primary key with value first for query performance
AddColumn("value", pgType).NotNull().AsPrimaryKey();
AddColumn("seq_id", "bigint").NotNull().AsPrimaryKey()
.ForeignKeyTo(new PostgresqlObjectName(events.DatabaseSchemaName, "mt_events"), "seq_id");

if (events.UseArchivedStreamPartitioning)
{
// When mt_events is partitioned by is_archived, its PK includes is_archived.
// The FK must reference all PK columns, so we need is_archived in this table too.
AddColumn("seq_id", "bigint").NotNull().AsPrimaryKey();

var archiving = AddColumn<IsArchivedColumn>();
archiving.AsPrimaryKey();
archiving.PartitionByListValues().AddPartition("archived", true);

ForeignKeys.Add(new ForeignKey($"fkey_mt_event_tag_{registration.TableSuffix}_seq_id_is_archived")
{
ColumnNames = new[] { "seq_id", "is_archived" },
LinkedNames = new[] { "seq_id", "is_archived" },
LinkedTable = new PostgresqlObjectName(events.DatabaseSchemaName, "mt_events")
});
}
else
{
AddColumn("seq_id", "bigint").NotNull().AsPrimaryKey()
.ForeignKeyTo(new PostgresqlObjectName(events.DatabaseSchemaName, "mt_events"), "seq_id");
}

PrimaryKeyName = $"pk_mt_event_tag_{registration.TableSuffix}";
}
Expand Down
29 changes: 20 additions & 9 deletions src/Marten/Events/Schema/NaturalKeyTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,6 @@ public NaturalKeyTable(EventGraph events, NaturalKeyDefinition naturalKey)

AddColumn(streamCol, streamColType).NotNull();

// FK to mt_streams with CASCADE delete
ForeignKeys.Add(new ForeignKey($"fk_{Identifier.Name}_stream")
{
ColumnNames = new[] { streamCol },
LinkedNames = new[] { "id" },
LinkedTable = new PostgresqlObjectName(events.DatabaseSchemaName, StreamsTable.TableName),
OnDelete = CascadeAction.Cascade
});

// Tenancy support
if (events.TenancyStyle == TenancyStyle.Conjoined)
{
Expand All @@ -46,6 +37,26 @@ public NaturalKeyTable(EventGraph events, NaturalKeyDefinition naturalKey)
if (events.UseArchivedStreamPartitioning)
{
archiving.PartitionByListValues().AddPartition("archived", true);

// FK to mt_streams must include is_archived when streams table is partitioned
ForeignKeys.Add(new ForeignKey($"fk_{Identifier.Name}_stream_is_archived")
{
ColumnNames = new[] { streamCol, "is_archived" },
LinkedNames = new[] { "id", "is_archived" },
LinkedTable = new PostgresqlObjectName(events.DatabaseSchemaName, StreamsTable.TableName),
OnDelete = CascadeAction.Cascade
});
}
else
{
// FK to mt_streams with CASCADE delete
ForeignKeys.Add(new ForeignKey($"fk_{Identifier.Name}_stream")
{
ColumnNames = new[] { streamCol },
LinkedNames = new[] { "id" },
LinkedTable = new PostgresqlObjectName(events.DatabaseSchemaName, StreamsTable.TableName),
OnDelete = CascadeAction.Cascade
});
}

// Index on stream id/key for reverse lookups
Expand Down
Loading