Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions docs/events/projections/flat.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,36 @@ A couple notes on this version of the code:

The `FlatTableProjection` in its first incarnation is not yet able to use event metadata.

### Partial-Mapping Events (Update-Only) <Badge type="tip" text="8.x" />

When an event mapped into a `FlatTableProjection` does not populate every non-primary-key
column on the target table, Marten generates an **UPDATE-only** function for that event:

```sql
-- For an event that maps only the `field` column:
CREATE FUNCTION mt_upsert_proj_eventb(p_id uuid, p_field text) RETURNS void
LANGUAGE plpgsql AS $function$
BEGIN
UPDATE proj SET field = p_field WHERE id = p_id;
END;
$function$;
```

Events that map **every** non-PK column still use the original `INSERT … ON CONFLICT DO UPDATE`
form so they can both create and update rows.

This means partial-mapping events are **safe against NOT NULL constraints** on columns they
don't populate — they cannot create a half-populated row. It also means that if a partial
event fires for a stream whose row does not yet exist, the UPDATE matches zero rows and is
a no-op. Streams should therefore start with a full-mapping event that can create the row.

::: warning
Prior to Marten 8.x, all events generated `INSERT … ON CONFLICT DO UPDATE`. If your table
had NOT NULL columns not populated by every event, appending those events would raise
`23502: null value in column "…" violates not-null constraint`. The partial-mapping
UPDATE-only behavior resolves this.
:::

## Using EventProjection for Flat Tables

::: tip
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
using System;
using System.Threading.Tasks;
using JasperFx;
using JasperFx.Events.Projections;
using Marten.Events.Projections;
using Marten.Events.Projections.Flattened;
using Marten.Testing.Harness;
using Shouldly;
using Weasel.Core;
using Weasel.Postgresql;
using Xunit;

namespace EventSourcingTests.Projections.Flattened;

/// <summary>
/// Regression test for https://github.com/JasperFx/marten/issues/4255.
///
/// When a FlatTableProjection maps multiple events to the same table, and the table
/// has a NOT NULL column that is not populated by every event, partial-mapping events
/// previously produced an INSERT … ON CONFLICT DO UPDATE that violated the NOT NULL
/// constraint.
///
/// The fix: partial-mapping events now generate an UPDATE-only function. Full-mapping
/// events keep the original INSERT … ON CONFLICT DO UPDATE behavior.
/// </summary>
public class Bug_4255_flat_table_not_null_constraint : OneOffConfigurationsContext
{
[Fact]
public async Task partial_event_on_existing_row_updates_without_violating_not_null()
{
StoreOptions(opts =>
{
opts.Projections.Add<Bug4255Projection>(ProjectionLifecycle.Inline);
opts.AutoCreateSchemaObjects = AutoCreate.All;
});

await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync(AutoCreate.CreateOrUpdate);

// Add a NOT NULL constraint on other_id, simulating the scenario where a user
// has created the table out-of-band with stricter constraints than Marten infers.
await using (var conn = theStore.Storage.Database.CreateConnection())
{
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
cmd.CommandText = $"ALTER TABLE {SchemaName}.bug_4255_proj ALTER COLUMN other_id SET NOT NULL;";
await cmd.ExecuteNonQueryAsync();
}

var streamId = Guid.NewGuid();
var otherId = Guid.NewGuid();

// EventA maps every non-PK column (full-mapping) — creates the row via INSERT ON CONFLICT
await using (var session = theStore.LightweightSession())
{
session.Events.StartStream(streamId, new Bug4255EventA(streamId, otherId, "initial"));
await session.SaveChangesAsync();
}

// EventB maps only `field` (partial-mapping) — after fix, UPDATE-only so
// the NOT NULL constraint is not violated.
await using (var session = theStore.LightweightSession())
{
session.Events.Append(streamId, new Bug4255EventB("changed"));
await session.SaveChangesAsync();
}

// Verify the UPDATE happened: field changed, other_id preserved.
await using (var conn = theStore.Storage.Database.CreateConnection())
{
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
cmd.CommandText = $"SELECT other_id, field FROM {SchemaName}.bug_4255_proj WHERE id = @id";
cmd.AddNamedParameter("id", streamId);
await using var reader = await cmd.ExecuteReaderAsync();

(await reader.ReadAsync()).ShouldBeTrue();
reader.GetGuid(0).ShouldBe(otherId);
reader.GetString(1).ShouldBe("changed");
}
}

[Fact]
public async Task partial_event_on_new_stream_is_a_safe_noop()
{
StoreOptions(opts =>
{
opts.Projections.Add<Bug4255Projection>(ProjectionLifecycle.Inline);
opts.AutoCreateSchemaObjects = AutoCreate.All;
});

await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync(AutoCreate.CreateOrUpdate);

await using (var conn = theStore.Storage.Database.CreateConnection())
{
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
cmd.CommandText = $"ALTER TABLE {SchemaName}.bug_4255_proj ALTER COLUMN other_id SET NOT NULL;";
await cmd.ExecuteNonQueryAsync();
}

var newStreamId = Guid.NewGuid();

// Starting a new stream with a partial event: no row exists yet.
// Previously, this threw a NOT NULL violation. After the fix, the UPDATE
// statement matches zero rows and is a no-op.
await using (var session = theStore.LightweightSession())
{
session.Events.StartStream(newStreamId, new Bug4255EventB("first-is-b"));
await session.SaveChangesAsync();
}

// No row should be created because partial events are UPDATE-only.
await using (var conn = theStore.Storage.Database.CreateConnection())
{
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
cmd.CommandText = $"SELECT COUNT(*) FROM {SchemaName}.bug_4255_proj WHERE id = @id";
cmd.AddNamedParameter("id", newStreamId);
var count = (long)(await cmd.ExecuteScalarAsync())!;
count.ShouldBe(0L);
}
}

[Fact]
public async Task full_mapping_event_still_uses_insert_on_conflict()
{
// Sanity check: the existing INSERT … ON CONFLICT DO UPDATE path is preserved
// for events that map every non-PK column.
StoreOptions(opts =>
{
opts.Projections.Add<Bug4255Projection>(ProjectionLifecycle.Inline);
opts.AutoCreateSchemaObjects = AutoCreate.All;
});

await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync(AutoCreate.CreateOrUpdate);

var streamId = Guid.NewGuid();
var otherId = Guid.NewGuid();

await using (var session = theStore.LightweightSession())
{
session.Events.StartStream(streamId, new Bug4255EventA(streamId, otherId, "hello"));
await session.SaveChangesAsync();
}

await using (var conn = theStore.Storage.Database.CreateConnection())
{
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
cmd.CommandText = $"SELECT other_id, field FROM {SchemaName}.bug_4255_proj WHERE id = @id";
cmd.AddNamedParameter("id", streamId);
await using var reader = await cmd.ExecuteReaderAsync();

(await reader.ReadAsync()).ShouldBeTrue();
reader.GetGuid(0).ShouldBe(otherId);
reader.GetString(1).ShouldBe("hello");
}
}
}

public class Bug4255Projection : FlatTableProjection
{
public Bug4255Projection() : base("bug_4255_proj", SchemaNameSource.DocumentSchema)
{
Table.AddColumn<Guid>("id").AsPrimaryKey();
Table.AddColumn<Guid>("other_id");
Table.AddColumn<string>("field");

// EventA populates every non-PK column — full mapping, INSERT ON CONFLICT
Project<Bug4255EventA>(map =>
{
map.Map(e => e.OtherId, "other_id");
map.Map(e => e.Field, "field");
}, e => e.Id);

// EventB only populates `field` — partial mapping, UPDATE-only after fix
Project<Bug4255EventB>(map => { map.Map(e => e.Field, "field"); });
}
}

public record Bug4255EventA(Guid Id, Guid OtherId, string Field);
public record Bug4255EventB(string Field);
52 changes: 48 additions & 4 deletions src/Marten/Events/Projections/Flattened/FlatTableUpsertFunction.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
Expand All @@ -24,27 +25,70 @@ public FlatTableUpsertFunction(DbObjectName identifier, Table table, List<IColum
_columns = columns;
}

/// <summary>
/// True when this event maps only a subset of the table's non-primary-key
/// columns. Partial events generate UPDATE-only functions so that they cannot
/// violate NOT NULL constraints on columns they don't populate (#4255).
/// </summary>
internal bool IsPartialMapping
{
get
{
var mappedColumnNames = _columns.Select(x => x.ColumnName)
.ToHashSet(StringComparer.OrdinalIgnoreCase);

var pkColumnNames = _table.PrimaryKeyColumns
.ToHashSet(StringComparer.OrdinalIgnoreCase);

return _table.Columns
.Where(c => !pkColumnNames.Contains(c.Name))
.Any(c => !mappedColumnNames.Contains(c.Name));
}
}

public override void WriteCreateStatement(Migrator migrator, TextWriter writer)
{
var pkColumns = _table.PrimaryKeyColumns.Select(x => _table.ColumnFor(x)).ToArray();

var inserts = _table.PrimaryKeyColumns.Concat(_columns.Select(x => x.ColumnName)).Join(", ");

// Arguments
var argList = arguments(pkColumns).Join(", ");

if (IsPartialMapping)
{
// For partial-mapping events, only UPDATE the existing row. If no row exists,
// this is a no-op — which is safer than inserting a partially populated row
// that may violate NOT NULL constraints on unmapped columns (#4255).
var updates = _columns.Select(x => x.UpdateFieldSql(_table)).Join(", ");
var whereClause = _table.PrimaryKeyColumns
.Select(c => $"{c} = {_table.ColumnFor(c).ToArgumentName()}")
.Join(" AND ");

writer.WriteLine($@"
CREATE OR REPLACE FUNCTION {Identifier.QualifiedName}({argList}) RETURNS void LANGUAGE plpgsql
AS $function$
BEGIN
UPDATE {_table.Identifier.QualifiedName} SET {updates}
WHERE {whereClause};
END;
$function$;
");
return;
}

var inserts = _table.PrimaryKeyColumns.Concat(_columns.Select(x => x.ColumnName)).Join(", ");

// Insert values
var insertExpressions = insertValues(pkColumns).Join(", ");

var updates = _columns.Select(x => x.UpdateFieldSql(_table)).Join(", ");
var allUpdates = _columns.Select(x => x.UpdateFieldSql(_table)).Join(", ");

writer.WriteLine($@"
CREATE OR REPLACE FUNCTION {Identifier.QualifiedName}({argList}) RETURNS void LANGUAGE plpgsql
AS $function$
BEGIN
INSERT INTO {_table.Identifier.QualifiedName} ({inserts}) VALUES ({insertExpressions})
ON CONFLICT ON CONSTRAINT {_table.PrimaryKeyName}
DO UPDATE SET {updates};
DO UPDATE SET {allUpdates};
END;
$function$;
");
Expand Down
Loading