From fc0d43660a488b274d8f3eb620b4eb5e1987530a Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Thu, 16 Apr 2026 12:43:37 -0500 Subject: [PATCH] Fix FlatTableProjection partial-mapping events violating NOT NULL (#4255) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, every event mapped into a FlatTableProjection generated an INSERT … ON CONFLICT DO UPDATE function using only the columns that event populates. When the target table had a NOT NULL column not populated by every event, the partial-mapping event's INSERT would violate the constraint (PostgreSQL validates the proposed INSERT row against constraints before ON CONFLICT resolves). Fix: detect events that map a strict subset of the table's non-PK columns and generate an UPDATE-only function for them. Full-mapping events keep the original INSERT … ON CONFLICT DO UPDATE form, so they can still create rows. Semantics: partial events update existing rows. If a stream starts with a partial event before any full-mapping event, the UPDATE matches zero rows and is a safe no-op. Co-Authored-By: Claude Opus 4.6 (1M context) --- docs/events/projections/flat.md | 30 +++ ...Bug_4255_flat_table_not_null_constraint.cs | 182 ++++++++++++++++++ .../Flattened/FlatTableUpsertFunction.cs | 52 ++++- 3 files changed, 260 insertions(+), 4 deletions(-) create mode 100644 src/EventSourcingTests/Projections/Flattened/Bug_4255_flat_table_not_null_constraint.cs diff --git a/docs/events/projections/flat.md b/docs/events/projections/flat.md index 50250325f5..c70955f85c 100644 --- a/docs/events/projections/flat.md +++ b/docs/events/projections/flat.md @@ -112,6 +112,36 @@ A couple notes on this version of the code: The `FlatTableProjection` in its first incarnation is not yet able to use event metadata. +### Partial-Mapping Events (Update-Only) + +When an event mapped into a `FlatTableProjection` does not populate every non-primary-key +column on the target table, Marten generates an **UPDATE-only** function for that event: + +```sql +-- For an event that maps only the `field` column: +CREATE FUNCTION mt_upsert_proj_eventb(p_id uuid, p_field text) RETURNS void +LANGUAGE plpgsql AS $function$ +BEGIN + UPDATE proj SET field = p_field WHERE id = p_id; +END; +$function$; +``` + +Events that map **every** non-PK column still use the original `INSERT … ON CONFLICT DO UPDATE` +form so they can both create and update rows. + +This means partial-mapping events are **safe against NOT NULL constraints** on columns they +don't populate — they cannot create a half-populated row. It also means that if a partial +event fires for a stream whose row does not yet exist, the UPDATE matches zero rows and is +a no-op. Streams should therefore start with a full-mapping event that can create the row. + +::: warning +Prior to Marten 8.x, all events generated `INSERT … ON CONFLICT DO UPDATE`. If your table +had NOT NULL columns not populated by every event, appending those events would raise +`23502: null value in column "…" violates not-null constraint`. The partial-mapping +UPDATE-only behavior resolves this. +::: + ## Using EventProjection for Flat Tables ::: tip diff --git a/src/EventSourcingTests/Projections/Flattened/Bug_4255_flat_table_not_null_constraint.cs b/src/EventSourcingTests/Projections/Flattened/Bug_4255_flat_table_not_null_constraint.cs new file mode 100644 index 0000000000..72ab0e2cc2 --- /dev/null +++ b/src/EventSourcingTests/Projections/Flattened/Bug_4255_flat_table_not_null_constraint.cs @@ -0,0 +1,182 @@ +using System; +using System.Threading.Tasks; +using JasperFx; +using JasperFx.Events.Projections; +using Marten.Events.Projections; +using Marten.Events.Projections.Flattened; +using Marten.Testing.Harness; +using Shouldly; +using Weasel.Core; +using Weasel.Postgresql; +using Xunit; + +namespace EventSourcingTests.Projections.Flattened; + +/// +/// Regression test for https://github.com/JasperFx/marten/issues/4255. +/// +/// When a FlatTableProjection maps multiple events to the same table, and the table +/// has a NOT NULL column that is not populated by every event, partial-mapping events +/// previously produced an INSERT … ON CONFLICT DO UPDATE that violated the NOT NULL +/// constraint. +/// +/// The fix: partial-mapping events now generate an UPDATE-only function. Full-mapping +/// events keep the original INSERT … ON CONFLICT DO UPDATE behavior. +/// +public class Bug_4255_flat_table_not_null_constraint : OneOffConfigurationsContext +{ + [Fact] + public async Task partial_event_on_existing_row_updates_without_violating_not_null() + { + StoreOptions(opts => + { + opts.Projections.Add(ProjectionLifecycle.Inline); + opts.AutoCreateSchemaObjects = AutoCreate.All; + }); + + await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync(AutoCreate.CreateOrUpdate); + + // Add a NOT NULL constraint on other_id, simulating the scenario where a user + // has created the table out-of-band with stricter constraints than Marten infers. + await using (var conn = theStore.Storage.Database.CreateConnection()) + { + await conn.OpenAsync(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = $"ALTER TABLE {SchemaName}.bug_4255_proj ALTER COLUMN other_id SET NOT NULL;"; + await cmd.ExecuteNonQueryAsync(); + } + + var streamId = Guid.NewGuid(); + var otherId = Guid.NewGuid(); + + // EventA maps every non-PK column (full-mapping) — creates the row via INSERT ON CONFLICT + await using (var session = theStore.LightweightSession()) + { + session.Events.StartStream(streamId, new Bug4255EventA(streamId, otherId, "initial")); + await session.SaveChangesAsync(); + } + + // EventB maps only `field` (partial-mapping) — after fix, UPDATE-only so + // the NOT NULL constraint is not violated. + await using (var session = theStore.LightweightSession()) + { + session.Events.Append(streamId, new Bug4255EventB("changed")); + await session.SaveChangesAsync(); + } + + // Verify the UPDATE happened: field changed, other_id preserved. + await using (var conn = theStore.Storage.Database.CreateConnection()) + { + await conn.OpenAsync(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = $"SELECT other_id, field FROM {SchemaName}.bug_4255_proj WHERE id = @id"; + cmd.AddNamedParameter("id", streamId); + await using var reader = await cmd.ExecuteReaderAsync(); + + (await reader.ReadAsync()).ShouldBeTrue(); + reader.GetGuid(0).ShouldBe(otherId); + reader.GetString(1).ShouldBe("changed"); + } + } + + [Fact] + public async Task partial_event_on_new_stream_is_a_safe_noop() + { + StoreOptions(opts => + { + opts.Projections.Add(ProjectionLifecycle.Inline); + opts.AutoCreateSchemaObjects = AutoCreate.All; + }); + + await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync(AutoCreate.CreateOrUpdate); + + await using (var conn = theStore.Storage.Database.CreateConnection()) + { + await conn.OpenAsync(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = $"ALTER TABLE {SchemaName}.bug_4255_proj ALTER COLUMN other_id SET NOT NULL;"; + await cmd.ExecuteNonQueryAsync(); + } + + var newStreamId = Guid.NewGuid(); + + // Starting a new stream with a partial event: no row exists yet. + // Previously, this threw a NOT NULL violation. After the fix, the UPDATE + // statement matches zero rows and is a no-op. + await using (var session = theStore.LightweightSession()) + { + session.Events.StartStream(newStreamId, new Bug4255EventB("first-is-b")); + await session.SaveChangesAsync(); + } + + // No row should be created because partial events are UPDATE-only. + await using (var conn = theStore.Storage.Database.CreateConnection()) + { + await conn.OpenAsync(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = $"SELECT COUNT(*) FROM {SchemaName}.bug_4255_proj WHERE id = @id"; + cmd.AddNamedParameter("id", newStreamId); + var count = (long)(await cmd.ExecuteScalarAsync())!; + count.ShouldBe(0L); + } + } + + [Fact] + public async Task full_mapping_event_still_uses_insert_on_conflict() + { + // Sanity check: the existing INSERT … ON CONFLICT DO UPDATE path is preserved + // for events that map every non-PK column. + StoreOptions(opts => + { + opts.Projections.Add(ProjectionLifecycle.Inline); + opts.AutoCreateSchemaObjects = AutoCreate.All; + }); + + await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync(AutoCreate.CreateOrUpdate); + + var streamId = Guid.NewGuid(); + var otherId = Guid.NewGuid(); + + await using (var session = theStore.LightweightSession()) + { + session.Events.StartStream(streamId, new Bug4255EventA(streamId, otherId, "hello")); + await session.SaveChangesAsync(); + } + + await using (var conn = theStore.Storage.Database.CreateConnection()) + { + await conn.OpenAsync(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = $"SELECT other_id, field FROM {SchemaName}.bug_4255_proj WHERE id = @id"; + cmd.AddNamedParameter("id", streamId); + await using var reader = await cmd.ExecuteReaderAsync(); + + (await reader.ReadAsync()).ShouldBeTrue(); + reader.GetGuid(0).ShouldBe(otherId); + reader.GetString(1).ShouldBe("hello"); + } + } +} + +public class Bug4255Projection : FlatTableProjection +{ + public Bug4255Projection() : base("bug_4255_proj", SchemaNameSource.DocumentSchema) + { + Table.AddColumn("id").AsPrimaryKey(); + Table.AddColumn("other_id"); + Table.AddColumn("field"); + + // EventA populates every non-PK column — full mapping, INSERT ON CONFLICT + Project(map => + { + map.Map(e => e.OtherId, "other_id"); + map.Map(e => e.Field, "field"); + }, e => e.Id); + + // EventB only populates `field` — partial mapping, UPDATE-only after fix + Project(map => { map.Map(e => e.Field, "field"); }); + } +} + +public record Bug4255EventA(Guid Id, Guid OtherId, string Field); +public record Bug4255EventB(string Field); diff --git a/src/Marten/Events/Projections/Flattened/FlatTableUpsertFunction.cs b/src/Marten/Events/Projections/Flattened/FlatTableUpsertFunction.cs index 4dd72863c2..497dbd0393 100644 --- a/src/Marten/Events/Projections/Flattened/FlatTableUpsertFunction.cs +++ b/src/Marten/Events/Projections/Flattened/FlatTableUpsertFunction.cs @@ -1,3 +1,4 @@ +using System; using System.Collections.Generic; using System.IO; using System.Linq; @@ -24,19 +25,62 @@ public FlatTableUpsertFunction(DbObjectName identifier, Table table, List + /// True when this event maps only a subset of the table's non-primary-key + /// columns. Partial events generate UPDATE-only functions so that they cannot + /// violate NOT NULL constraints on columns they don't populate (#4255). + /// + internal bool IsPartialMapping + { + get + { + var mappedColumnNames = _columns.Select(x => x.ColumnName) + .ToHashSet(StringComparer.OrdinalIgnoreCase); + + var pkColumnNames = _table.PrimaryKeyColumns + .ToHashSet(StringComparer.OrdinalIgnoreCase); + + return _table.Columns + .Where(c => !pkColumnNames.Contains(c.Name)) + .Any(c => !mappedColumnNames.Contains(c.Name)); + } + } + public override void WriteCreateStatement(Migrator migrator, TextWriter writer) { var pkColumns = _table.PrimaryKeyColumns.Select(x => _table.ColumnFor(x)).ToArray(); - var inserts = _table.PrimaryKeyColumns.Concat(_columns.Select(x => x.ColumnName)).Join(", "); - // Arguments var argList = arguments(pkColumns).Join(", "); + if (IsPartialMapping) + { + // For partial-mapping events, only UPDATE the existing row. If no row exists, + // this is a no-op — which is safer than inserting a partially populated row + // that may violate NOT NULL constraints on unmapped columns (#4255). + var updates = _columns.Select(x => x.UpdateFieldSql(_table)).Join(", "); + var whereClause = _table.PrimaryKeyColumns + .Select(c => $"{c} = {_table.ColumnFor(c).ToArgumentName()}") + .Join(" AND "); + + writer.WriteLine($@" +CREATE OR REPLACE FUNCTION {Identifier.QualifiedName}({argList}) RETURNS void LANGUAGE plpgsql +AS $function$ +BEGIN +UPDATE {_table.Identifier.QualifiedName} SET {updates} + WHERE {whereClause}; +END; +$function$; +"); + return; + } + + var inserts = _table.PrimaryKeyColumns.Concat(_columns.Select(x => x.ColumnName)).Join(", "); + // Insert values var insertExpressions = insertValues(pkColumns).Join(", "); - var updates = _columns.Select(x => x.UpdateFieldSql(_table)).Join(", "); + var allUpdates = _columns.Select(x => x.UpdateFieldSql(_table)).Join(", "); writer.WriteLine($@" CREATE OR REPLACE FUNCTION {Identifier.QualifiedName}({argList}) RETURNS void LANGUAGE plpgsql @@ -44,7 +88,7 @@ public override void WriteCreateStatement(Migrator migrator, TextWriter writer) BEGIN INSERT INTO {_table.Identifier.QualifiedName} ({inserts}) VALUES ({insertExpressions}) ON CONFLICT ON CONSTRAINT {_table.PrimaryKeyName} - DO UPDATE SET {updates}; + DO UPDATE SET {allUpdates}; END; $function$; ");