diff --git a/src/DocumentDbTests/Bugs/Bug_4219_bulk_insert_composite_pk.cs b/src/DocumentDbTests/Bugs/Bug_4219_bulk_insert_composite_pk.cs new file mode 100644 index 0000000000..5e931b5959 --- /dev/null +++ b/src/DocumentDbTests/Bugs/Bug_4219_bulk_insert_composite_pk.cs @@ -0,0 +1,116 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Marten; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; + +namespace DocumentDbTests.Bugs; + +public class Bug_4219_bulk_insert_composite_pk : OneOffConfigurationsContext +{ + [Fact] + public async Task bulk_insert_ignore_duplicates_with_range_partitioned_table() + { + StoreOptions(opts => + { + opts.Schema.For<DailySnapshot>() + .Identity(x => x.Id) + .PartitionOn(x => x.Date, x => + { + x.ByRange() + .AddRange("q1_2026", new DateOnly(2026, 1, 1), new DateOnly(2026, 4, 1)) + .AddRange("q2_2026", new DateOnly(2026, 4, 1), new DateOnly(2026, 7, 1)); + }); + }); + + await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync(); + + // Insert initial data + var initial = new[] + { + new DailySnapshot { Id = "sensor-1", Date = new DateOnly(2026, 1, 15), Value = 100 }, + new DailySnapshot { Id = "sensor-1", Date = new DateOnly(2026, 2, 15), Value = 200 }, + }; + + await theStore.BulkInsertDocumentsAsync(initial); + + // Now bulk insert with IgnoreDuplicates: + // - first two have same composite PKs as existing (should be skipped) + // - third has same ID but different date (new composite PK — should be inserted) + var second = new[] + { + new DailySnapshot { Id = "sensor-1", Date = new DateOnly(2026, 1, 15), Value = 999 }, + new DailySnapshot { Id = "sensor-1", Date = new DateOnly(2026, 2, 15), Value = 888 }, + new DailySnapshot { Id = "sensor-1", Date = new DateOnly(2026, 3, 15), Value = 300 }, + }; + + await theStore.BulkInsertDocumentsAsync(second, BulkInsertMode.IgnoreDuplicates); + + await using var session = theStore.QuerySession(); + var results = await session.Query<DailySnapshot>() + .Where(x => x.Id == "sensor-1") + 
.OrderBy(x => x.Date) + .ToListAsync(); + + // Should have 3 rows: original Jan + Feb values preserved, new March added + results.Count.ShouldBe(3); + results[0].Value.ShouldBe(100); // original preserved + results[1].Value.ShouldBe(200); // original preserved + results[2].Value.ShouldBe(300); // new row + } + + [Fact] + public async Task bulk_insert_overwrite_with_range_partitioned_table() + { + StoreOptions(opts => + { + opts.Schema.For<DailySnapshot>() + .Identity(x => x.Id) + .PartitionOn(x => x.Date, x => + { + x.ByRange() + .AddRange("q1_2026", new DateOnly(2026, 1, 1), new DateOnly(2026, 4, 1)) + .AddRange("q2_2026", new DateOnly(2026, 4, 1), new DateOnly(2026, 7, 1)); + }); + }); + + await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync(); + + // Insert initial data + var initial = new[] + { + new DailySnapshot { Id = "sensor-1", Date = new DateOnly(2026, 1, 15), Value = 100 }, + new DailySnapshot { Id = "sensor-1", Date = new DateOnly(2026, 2, 15), Value = 200 }, + }; + + await theStore.BulkInsertDocumentsAsync(initial); + + // Overwrite with updated values — same composite PKs, different values + var updated = new[] + { + new DailySnapshot { Id = "sensor-1", Date = new DateOnly(2026, 1, 15), Value = 999 }, + new DailySnapshot { Id = "sensor-1", Date = new DateOnly(2026, 2, 15), Value = 888 }, + }; + + await theStore.BulkInsertDocumentsAsync(updated, BulkInsertMode.OverwriteExisting); + + await using var session = theStore.QuerySession(); + var results = await session.Query<DailySnapshot>() + .Where(x => x.Id == "sensor-1") + .OrderBy(x => x.Date) + .ToListAsync(); + + results.Count.ShouldBe(2); + results[0].Value.ShouldBe(999); + results[1].Value.ShouldBe(888); + } +} + +public class DailySnapshot +{ + public string Id { get; set; } = default!; + public DateOnly Date { get; set; } + public decimal Value { get; set; } +} diff --git a/src/Marten/Internal/CodeGeneration/BulkLoaderBuilder.cs b/src/Marten/Internal/CodeGeneration/BulkLoaderBuilder.cs index c7982f0ac9..101b435a2a 
100644 --- a/src/Marten/Internal/CodeGeneration/BulkLoaderBuilder.cs +++ b/src/Marten/Internal/CodeGeneration/BulkLoaderBuilder.cs @@ -110,7 +110,6 @@ private List orderArgumentsForBulkWriting(UpsertFunction upsertF public string CopyNewDocumentsFromTempTable() { var table = _mapping.Schema.Table; - var isMultiTenanted = _mapping.TenancyStyle == TenancyStyle.Conjoined; var storageTable = table.Identifier.QualifiedName; var columns = table.Columns.Where(x => x.Name != SchemaConstants.LastModifiedColumn) @@ -118,29 +117,29 @@ public string CopyNewDocumentsFromTempTable() var selectColumns = table.Columns.Where(x => x.Name != SchemaConstants.LastModifiedColumn) .Select(x => $"{_tempTable}.\\\"{x.Name}\\\"").Join(", "); - var joinExpression = isMultiTenanted - ? $"{_tempTable}.id = {storageTable}.id and {_tempTable}.tenant_id = {storageTable}.tenant_id" - : $"{_tempTable}.id = {storageTable}.id"; + var joinExpression = buildPrimaryKeyJoinExpression(table, _tempTable, storageTable); + + // Use the first PK column for the NULL check (any PK column works since they're all NOT NULL) + var firstPkColumn = table.PrimaryKeyColumns.First(); return $"insert into {storageTable} ({columns}, {SchemaConstants.LastModifiedColumn}) " + $"(select {selectColumns}, transaction_timestamp() " + $"from {_tempTable} left join {storageTable} on {joinExpression} " + - $"where {storageTable}.id is null)"; + $"where {storageTable}.{firstPkColumn} is null)"; } public string OverwriteDuplicatesFromTempTable() { var table = _mapping.Schema.Table; - var isMultiTenanted = _mapping.TenancyStyle == TenancyStyle.Conjoined; var storageTable = table.Identifier.QualifiedName; - var updates = table.Columns.Where(x => x.Name != "id" && x.Name != SchemaConstants.LastModifiedColumn) + var pkColumns = table.PrimaryKeyColumns.ToArray(); + var updates = table.Columns + .Where(x => !pkColumns.Contains(x.Name) && x.Name != SchemaConstants.LastModifiedColumn) .Select(x => $"{x.Name} = source.{x.Name}").Join(", "); 
- var joinExpression = isMultiTenanted - ? "source.id = target.id and source.tenant_id = target.tenant_id" - : "source.id = target.id"; + var joinExpression = buildPrimaryKeyJoinExpression(table, "source", "target"); return $"update {storageTable} target SET {updates}, {SchemaConstants.LastModifiedColumn} = transaction_timestamp() FROM {_tempTable} source WHERE {joinExpression}"; @@ -154,19 +153,30 @@ public string OverwriteDuplicatesFromTempTableWithVersionCheck() } var table = _mapping.Schema.Table; - var isMultiTenanted = _mapping.TenancyStyle == TenancyStyle.Conjoined; var storageTable = table.Identifier.QualifiedName; - var updates = table.Columns.Where(x => x.Name != "id" && x.Name != SchemaConstants.LastModifiedColumn) + var pkColumns = table.PrimaryKeyColumns.ToArray(); + var updates = table.Columns + .Where(x => !pkColumns.Contains(x.Name) && x.Name != SchemaConstants.LastModifiedColumn) .Select(x => $"{x.Name} = source.{x.Name}").Join(", "); - var joinExpression = isMultiTenanted - ? "source.id = target.id and source.tenant_id = target.tenant_id" - : "source.id = target.id"; + var joinExpression = buildPrimaryKeyJoinExpression(table, "source", "target"); return $"update {storageTable} target SET {updates}, {SchemaConstants.LastModifiedColumn} = transaction_timestamp() FROM {_tempTable} source WHERE {joinExpression} and target.{SchemaConstants.VersionColumn} = source.{SchemaConstants.ExpectedVersionColumn}"; } + /// <summary> + /// Build a join expression using all primary key columns from the table. + /// This handles composite PKs from partitioned tables where partition columns + /// are part of the primary key. + /// </summary> + private static string buildPrimaryKeyJoinExpression(Weasel.Postgresql.Tables.Table table, string leftAlias, string rightAlias) + { + return table.PrimaryKeyColumns + .Select(col => $"{leftAlias}.{col} = {rightAlias}.{col}") + .Join(" and "); + } + public string CreateTempTableForCopying() { if (!needsExpectedVersion())