Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions src/DocumentDbTests/Bugs/Bug_4219_bulk_insert_composite_pk.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Marten;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;

namespace DocumentDbTests.Bugs;

public class Bug_4219_bulk_insert_composite_pk : OneOffConfigurationsContext
{
    /// <summary>
    /// Shared fixture setup for both regression tests: configures DailySnapshot
    /// with Id as the Marten identity and range-partitioning on Date (which makes
    /// the physical primary key the composite (id, date)), applies the schema,
    /// then seeds Jan + Feb 2026 rows for "sensor-1".
    /// </summary>
    private async Task configurePartitionedStoreAndSeedAsync()
    {
        StoreOptions(opts =>
        {
            opts.Schema.For<DailySnapshot>()
                .Identity(x => x.Id)
                .PartitionOn(x => x.Date, x =>
                {
                    x.ByRange()
                        .AddRange("q1_2026", new DateOnly(2026, 1, 1), new DateOnly(2026, 4, 1))
                        .AddRange("q2_2026", new DateOnly(2026, 4, 1), new DateOnly(2026, 7, 1));
                });
        });

        await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();

        var initial = new[]
        {
            new DailySnapshot { Id = "sensor-1", Date = new DateOnly(2026, 1, 15), Value = 100 },
            new DailySnapshot { Id = "sensor-1", Date = new DateOnly(2026, 2, 15), Value = 200 },
        };

        await theStore.BulkInsertDocumentsAsync(initial);
    }

    [Fact]
    public async Task bulk_insert_ignore_duplicates_with_range_partitioned_table()
    {
        await configurePartitionedStoreAndSeedAsync();

        // IgnoreDuplicates semantics against a composite PK:
        // - first two rows collide with existing (id, date) pairs → must be skipped
        // - third row has the same Id but a new Date (new composite PK) → must be inserted
        var second = new[]
        {
            new DailySnapshot { Id = "sensor-1", Date = new DateOnly(2026, 1, 15), Value = 999 },
            new DailySnapshot { Id = "sensor-1", Date = new DateOnly(2026, 2, 15), Value = 888 },
            new DailySnapshot { Id = "sensor-1", Date = new DateOnly(2026, 3, 15), Value = 300 },
        };

        await theStore.BulkInsertDocumentsAsync(second, BulkInsertMode.IgnoreDuplicates);

        await using var session = theStore.QuerySession();
        var results = await session.Query<DailySnapshot>()
            .Where(x => x.Id == "sensor-1")
            .OrderBy(x => x.Date)
            .ToListAsync();

        // Should have 3 rows: original Jan + Feb values preserved, new March added
        results.Count.ShouldBe(3);
        results[0].Value.ShouldBe(100); // original preserved
        results[1].Value.ShouldBe(200); // original preserved
        results[2].Value.ShouldBe(300); // new row
    }

    [Fact]
    public async Task bulk_insert_overwrite_with_range_partitioned_table()
    {
        await configurePartitionedStoreAndSeedAsync();

        // OverwriteExisting semantics: same composite PKs, different values —
        // both existing rows must be updated in place.
        var updated = new[]
        {
            new DailySnapshot { Id = "sensor-1", Date = new DateOnly(2026, 1, 15), Value = 999 },
            new DailySnapshot { Id = "sensor-1", Date = new DateOnly(2026, 2, 15), Value = 888 },
        };

        await theStore.BulkInsertDocumentsAsync(updated, BulkInsertMode.OverwriteExisting);

        await using var session = theStore.QuerySession();
        var results = await session.Query<DailySnapshot>()
            .Where(x => x.Id == "sensor-1")
            .OrderBy(x => x.Date)
            .ToListAsync();

        results.Count.ShouldBe(2);
        results[0].Value.ShouldBe(999);
        results[1].Value.ShouldBe(888);
    }
}

/// <summary>
/// Document type for the Bug 4219 regression tests. The tests configure
/// <c>Id</c> as the Marten identity and range-partition on <c>Date</c>,
/// so the backing table carries a composite primary key of (id, date).
/// </summary>
public class DailySnapshot
{
    // Logical document identity; deliberately reused across multiple dates
    // ("sensor-1") so that only the composite (id, date) pair is unique.
    public string Id { get; set; } = default!;

    // Range-partition column; participates in the table's composite primary key.
    public DateOnly Date { get; set; }

    // Payload value the tests assert on to distinguish inserted vs. overwritten rows.
    public decimal Value { get; set; }
}
40 changes: 25 additions & 15 deletions src/Marten/Internal/CodeGeneration/BulkLoaderBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,37 +110,36 @@ private List<UpsertArgument> orderArgumentsForBulkWriting(UpsertFunction upsertF
/// <summary>
/// Builds the SQL that copies only brand-new documents from the bulk-copy temp
/// table into the real storage table (BulkInsertMode.IgnoreDuplicates). Rows
/// whose full primary key — including any partition columns — already exists
/// are excluded via the anti-join (left join + IS NULL).
/// </summary>
public string CopyNewDocumentsFromTempTable()
{
    var table = _mapping.Schema.Table;

    var storageTable = table.Identifier.QualifiedName;
    // last_modified is excluded from the column lists because it is supplied
    // explicitly as transaction_timestamp() in the generated INSERT.
    // The \\\" escaping is intentional: this SQL is embedded into generated C#
    // source, so it must survive one extra level of string-literal escaping.
    var columns = table.Columns.Where(x => x.Name != SchemaConstants.LastModifiedColumn)
        .Select(x => $"\\\"{x.Name}\\\"").Join(", ");
    var selectColumns = table.Columns.Where(x => x.Name != SchemaConstants.LastModifiedColumn)
        .Select(x => $"{_tempTable}.\\\"{x.Name}\\\"").Join(", ");

    // Join on ALL primary key columns so composite PKs from partitioned
    // tables (partition columns are part of the PK) are matched correctly.
    var joinExpression = buildPrimaryKeyJoinExpression(table, _tempTable, storageTable);

    // Use the first PK column for the NULL check (any PK column works since they're all NOT NULL)
    var firstPkColumn = table.PrimaryKeyColumns.First();

    return
        $"insert into {storageTable} ({columns}, {SchemaConstants.LastModifiedColumn}) " +
        $"(select {selectColumns}, transaction_timestamp() " +
        $"from {_tempTable} left join {storageTable} on {joinExpression} " +
        $"where {storageTable}.{firstPkColumn} is null)";
}

/// <summary>
/// Builds the SQL that overwrites existing rows from the bulk-copy temp table
/// (BulkInsertMode.OverwriteExisting). Matches on the full primary key —
/// including partition columns for composite PKs — and never attempts to
/// update PK columns themselves (Postgres would reject moving a row's key
/// this way, and they are equal by the join anyway).
/// </summary>
public string OverwriteDuplicatesFromTempTable()
{
    var table = _mapping.Schema.Table;
    var storageTable = table.Identifier.QualifiedName;

    // Exclude every PK column (not just "id") plus last_modified, which is
    // set explicitly to transaction_timestamp() below.
    var pkColumns = table.PrimaryKeyColumns.ToArray();
    var updates = table.Columns
        .Where(x => !pkColumns.Contains(x.Name) && x.Name != SchemaConstants.LastModifiedColumn)
        .Select(x => $"{x.Name} = source.{x.Name}").Join(", ");

    // Join on ALL primary key columns so composite PKs from partitioned
    // tables are matched correctly (covers conjoined tenancy too, since
    // tenant_id is part of the PK in that case).
    var joinExpression = buildPrimaryKeyJoinExpression(table, "source", "target");

    return
        $"update {storageTable} target SET {updates}, {SchemaConstants.LastModifiedColumn} = transaction_timestamp() FROM {_tempTable} source WHERE {joinExpression}";
}
Expand All @@ -154,19 +153,30 @@ public string OverwriteDuplicatesFromTempTableWithVersionCheck()
}

var table = _mapping.Schema.Table;
var isMultiTenanted = _mapping.TenancyStyle == TenancyStyle.Conjoined;
var storageTable = table.Identifier.QualifiedName;

var updates = table.Columns.Where(x => x.Name != "id" && x.Name != SchemaConstants.LastModifiedColumn)
var pkColumns = table.PrimaryKeyColumns.ToArray();
var updates = table.Columns
.Where(x => !pkColumns.Contains(x.Name) && x.Name != SchemaConstants.LastModifiedColumn)
.Select(x => $"{x.Name} = source.{x.Name}").Join(", ");

var joinExpression = isMultiTenanted
? "source.id = target.id and source.tenant_id = target.tenant_id"
: "source.id = target.id";
var joinExpression = buildPrimaryKeyJoinExpression(table, "source", "target");

return $"update {storageTable} target SET {updates}, {SchemaConstants.LastModifiedColumn} = transaction_timestamp() FROM {_tempTable} source WHERE {joinExpression} and target.{SchemaConstants.VersionColumn} = source.{SchemaConstants.ExpectedVersionColumn}";
}

/// <summary>
/// Builds an equality join predicate over every primary key column of the
/// table, e.g. "a.id = b.id and a.date = b.date". Required for partitioned
/// tables, whose partition columns are folded into a composite primary key.
/// </summary>
private static string buildPrimaryKeyJoinExpression(Weasel.Postgresql.Tables.Table table, string leftAlias, string rightAlias)
{
    var conditions = table.PrimaryKeyColumns
        .Select(column => $"{leftAlias}.{column} = {rightAlias}.{column}");

    return string.Join(" and ", conditions);
}

public string CreateTempTableForCopying()
{
if (!needsExpectedVersion())
Expand Down
Loading