Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions src/DocumentDbTests/Bugs/Bug_4223_update_partitioned_composite_pk.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Marten;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;

namespace DocumentDbTests.Bugs;

public class Bug4223Snapshot
{
public string Id { get; set; } = "";
public DateOnly Date { get; set; }
public decimal Quantity { get; set; }
}

public class Bug_4223_update_partitioned_composite_pk : OneOffConfigurationsContext
{
public Bug_4223_update_partitioned_composite_pk()
{
_schemaName = "bug4223";
}

[Fact]
public async Task update_with_same_id_different_partition_key()
{
StoreOptions(opts =>
{
opts.Schema.For<Bug4223Snapshot>()
.Identity(x => x.Id)
.Duplicate(x => x.Date)
.PartitionOn(x => x.Date, x =>
{
x.ByRange()
.AddRange("q1_2026", new DateOnly(2026, 1, 1), new DateOnly(2026, 4, 1))
.AddRange("q2_2026", new DateOnly(2026, 4, 1), new DateOnly(2026, 7, 1));
});
});

await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();

// Insert two documents with same id, different date (valid per composite PK)
var d1 = new Bug4223Snapshot { Id = "sensor-1", Date = new DateOnly(2026, 1, 15), Quantity = 100 };
var d2 = new Bug4223Snapshot { Id = "sensor-1", Date = new DateOnly(2026, 3, 15), Quantity = 200 };

await using (var session = theStore.LightweightSession())
{
session.Store(d1);
session.Store(d2);
await session.SaveChangesAsync();
}

// Update only the first one — should NOT affect the second
await using (var session = theStore.LightweightSession())
{
d1.Quantity = 999;
session.Update(d1);
await session.SaveChangesAsync();
}

// Verify: d1 updated, d2 unchanged
await using (var query = theStore.QuerySession())
{
var results = await query.Query<Bug4223Snapshot>()
.Where(x => x.Id == "sensor-1")
.OrderBy(x => x.Date)
.ToListAsync();

results.Count.ShouldBe(2);
results[0].Quantity.ShouldBe(999m); // updated
results[1].Quantity.ShouldBe(200m); // unchanged
}
}

[Fact]
public async Task upsert_with_same_id_different_partition_key()
{
StoreOptions(opts =>
{
opts.Schema.For<Bug4223Snapshot>()
.Identity(x => x.Id)
.Duplicate(x => x.Date)
.PartitionOn(x => x.Date, x =>
{
x.ByRange()
.AddRange("q1_2026", new DateOnly(2026, 1, 1), new DateOnly(2026, 4, 1))
.AddRange("q2_2026", new DateOnly(2026, 4, 1), new DateOnly(2026, 7, 1));
});
});

await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();

// Store two documents with same id, different dates
await using (var session = theStore.LightweightSession())
{
session.Store(new Bug4223Snapshot { Id = "sensor-1", Date = new DateOnly(2026, 1, 15), Quantity = 100 });
session.Store(new Bug4223Snapshot { Id = "sensor-1", Date = new DateOnly(2026, 3, 15), Quantity = 200 });
await session.SaveChangesAsync();
}

// Upsert the first one with new quantity
await using (var session = theStore.LightweightSession())
{
session.Store(new Bug4223Snapshot { Id = "sensor-1", Date = new DateOnly(2026, 1, 15), Quantity = 777 });
await session.SaveChangesAsync();
}

await using (var query = theStore.QuerySession())
{
var results = await query.Query<Bug4223Snapshot>()
.Where(x => x.Id == "sensor-1")
.OrderBy(x => x.Date)
.ToListAsync();

results.Count.ShouldBe(2);
results[0].Quantity.ShouldBe(777m); // upserted
results[1].Quantity.ShouldBe(200m); // unchanged
}
}
}
10 changes: 5 additions & 5 deletions src/Marten/Storage/UpdateFunction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ protected override void writeFunction(TextWriter writer, string argList, string
string updates)
{
var statement = updates.Contains("where")
? $"UPDATE {_tableName} SET {updates} and id = docId;"
: $"UPDATE {_tableName} SET {updates} where id = docId;";
? $"UPDATE {_tableName} SET {updates} and id = docId{_andPartitionWhereClause};"
: $"UPDATE {_tableName} SET {updates} where id = docId{_andPartitionWhereClause};";

if (_mapping.Metadata.Revision.Enabled)
{
Expand All @@ -29,15 +29,15 @@ protected override void writeFunction(TextWriter writer, string argList, string
current_version INTEGER;
BEGIN
if revision <= 1 then
SELECT mt_version FROM {_tableName.QualifiedName} into current_version WHERE id = docId {_andTenantWhereClause};
SELECT mt_version FROM {_tableName.QualifiedName} into current_version WHERE id = docId {_andTenantWhereClause}{_andPartitionWhereClause};
if current_version is not null then
revision = current_version + 1;
end if;
end if;

{statement}

SELECT mt_version FROM {_tableName} into final_version WHERE id = docId {_andTenantWhereClause};
SELECT mt_version FROM {_tableName} into final_version WHERE id = docId {_andTenantWhereClause}{_andPartitionWhereClause};
RETURN final_version;
END;
$function$;
Expand All @@ -54,7 +54,7 @@ protected override void writeFunction(TextWriter writer, string argList, string
BEGIN
{statement}

SELECT mt_version FROM {_tableName} into final_version WHERE id = docId {_andTenantWhereClause};
SELECT mt_version FROM {_tableName} into final_version WHERE id = docId {_andTenantWhereClause}{_andPartitionWhereClause};
RETURN final_version;
END;
$function$;
Expand Down
30 changes: 27 additions & 3 deletions src/Marten/Storage/UpsertFunction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ internal class UpsertFunction: Function

public readonly IList<UpsertArgument> Arguments = new List<UpsertArgument>();
protected readonly string _primaryKeyFields;
protected string _andPartitionWhereClause = "";
protected readonly string _versionSourceTable;
protected readonly string _versionColumnName;
protected readonly string _tenantVersionWhereClause;
Expand Down Expand Up @@ -112,6 +113,29 @@ public UpsertFunction(DocumentMapping mapping, DbObjectName? identifier = null,
}

_primaryKeyFields = table.Columns.Where(x => x.IsPrimaryKey).Select(x => x.Name).Join(", ");

// Build WHERE clause fragment for partition columns (columns that are part of the PK
// but are not 'id' or 'tenant_id'). These must be included in UPDATE WHERE and
// SELECT WHERE clauses for partitioned tables to correctly target a single row.
if (table.Partitioning != null)
{
var partitionClauses = new List<string>();
foreach (var colName in table.Partitioning.Columns)
{
// Find the matching argument for this partition column
var arg = Arguments.FirstOrDefault(a =>
a.Column.Equals(colName, StringComparison.OrdinalIgnoreCase));
if (arg != null)
{
partitionClauses.Add($"{colName} = {arg.Arg}");
}
}

if (partitionClauses.Any())
{
_andPartitionWhereClause = " and " + partitionClauses.Join(" and ");
}
}
}

public void AddIfActive(MetadataColumn column)
Expand Down Expand Up @@ -200,7 +224,7 @@ protected virtual void writeFunction(TextWriter writer, string argList, string s
current_version INTEGER;
BEGIN

SELECT {_versionColumnName} into current_version FROM {_versionSourceTable} WHERE id = docId {_andTenantVersionWhereClause};
SELECT {_versionColumnName} into current_version FROM {_versionSourceTable} WHERE id = docId {_andTenantVersionWhereClause}{_andPartitionWhereClause};
if revision = 0 then
if current_version is not null then
{revisionModification}
Expand All @@ -219,7 +243,7 @@ protected virtual void writeFunction(TextWriter writer, string argList, string s
ON CONFLICT ({_primaryKeyFields})
DO UPDATE SET {updates};

SELECT mt_version into final_version FROM {_tableName.QualifiedName} WHERE id = docId {_andTenantWhereClause};
SELECT mt_version into final_version FROM {_tableName.QualifiedName} WHERE id = docId {_andTenantWhereClause}{_andPartitionWhereClause};
RETURN final_version;
END;
$function$;
Expand All @@ -239,7 +263,7 @@ ON CONFLICT ({_primaryKeyFields})
ON CONFLICT ({_primaryKeyFields})
DO UPDATE SET {updates};

SELECT mt_version FROM {_tableName.QualifiedName} into final_version WHERE id = docId {_andTenantWhereClause};
SELECT mt_version FROM {_tableName.QualifiedName} into final_version WHERE id = docId {_andTenantWhereClause}{_andPartitionWhereClause};
RETURN final_version;
END;
$function$;
Expand Down
Loading