Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 278 additions & 0 deletions src/CoreTests/Bugs/Bug_4614_revision_column_int_for_IRevisioned.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using JasperFx;
using JasperFx.Events;
using JasperFx.Events.Projections;
using Marten;
using Marten.Events.Aggregation;
using Marten.Events.Projections;
using Marten.Metadata;
using Marten.Storage.Metadata;
using Marten.Testing.Harness;
using Npgsql;
using Shouldly;
using Weasel.Core;
using Weasel.Postgresql;
using Xunit;

namespace CoreTests.Bugs;

/// <summary>
/// #4614 — Marten 8 → 9 upgrade was migrating the mt_version column on
/// SingleStreamProjection aggregate document tables from <c>integer</c> to
/// <c>bigint</c>. That widening was a side-effect of <c>RevisionColumn</c>
/// becoming <c>MetadataColumn&lt;long&gt;</c> in #3733; the .NET-side
/// <c>IRevisioned.Version</c> was reverted to int in #4533, but the column
/// width was not. The fix splits the variant in two: IRevisioned-backed
/// documents (the V8 default; what SingleStreamProjection aggregates use)
/// get <c>integer</c>; ILongVersioned-backed documents (MultiStreamProjection's
/// shape, where Version is a global event-sequence number) keep <c>bigint</c>.
///
/// <para>Migration is non-destructive in both directions:</para>
/// <list type="bullet">
/// <item>V8 schema (<c>integer</c>) + IRevisioned (desired <c>integer</c>) — no migration.</item>
/// <item>V8 schema (<c>integer</c>) + ILongVersioned (desired <c>bigint</c>) — widen to bigint.</item>
/// <item>9.x deployment already migrated to <c>bigint</c> + IRevisioned (desired <c>integer</c>)
/// — tolerated, no force-narrow (a USING cast would risk silent data loss).</item>
/// </list>
/// </summary>
public class Bug_4614_revision_column_int_for_IRevisioned: OneOffConfigurationsContext
{
// ---- Fresh-creation tests: new tables match the V8 column width ----

[Fact]
public async Task fresh_table_for_IRevisioned_uses_integer_column()
{
StoreOptions(opts => opts.Schema.For<RevisionedDoc>());
await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();

var actualType = await readVersionColumnType(typeof(RevisionedDoc));
actualType.ShouldBe("integer");
}

[Fact]
public async Task fresh_table_for_ILongVersioned_uses_bigint_column()
{
StoreOptions(opts => opts.Schema.For<LongVersionedDoc>());
await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();

var actualType = await readVersionColumnType(typeof(LongVersionedDoc));
actualType.ShouldBe("bigint");
}

// ---- Migration tolerance — the regression's user-visible surface ----

[Fact]
public async Task V8_integer_column_for_IRevisioned_is_not_migrated_to_bigint()
{
// Stage 1: stand up the schema, then deliberately seed the V8 shape — a real V8
// deployment's table for an IRevisioned document had `mt_version integer`. On the
// current code (post-fix), the desired type is integer too, so apply should detect
// no work to do and leave the column alone — i.e. NO `ALTER COLUMN … TYPE bigint`
// gets emitted on the V8 → V9 upgrade.
StoreOptions(opts => opts.Schema.For<RevisionedDoc>());
await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();

await using (var conn = new NpgsqlConnection(ConnectionSource.ConnectionString))
{
await conn.OpenAsync();
await conn.CreateCommand(
$"alter table {SchemaName}.mt_doc_revisioneddoc alter column mt_version type integer using mt_version::integer")
.ExecuteNonQueryAsync();
}

// Second apply must be a no-op for this column — the desired-vs-actual diff sees
// "integer integer", not "bigint integer", and emits nothing.
await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();

(await readVersionColumnType(typeof(RevisionedDoc))).ShouldBe("integer");
}

[Fact]
public async Task V8_integer_column_for_ILongVersioned_still_widens_to_bigint()
{
// The legitimate V8 → V9 widening path is preserved for ILongVersioned-typed
// documents (the only docs that legitimately need a bigint column going forward).
StoreOptions(opts => opts.Schema.For<LongVersionedDoc>());
await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();

await using (var conn = new NpgsqlConnection(ConnectionSource.ConnectionString))
{
await conn.OpenAsync();
await conn.CreateCommand(
$"alter table {SchemaName}.mt_doc_longversioneddoc alter column mt_version type integer using mt_version::integer")
.ExecuteNonQueryAsync();
}

await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();

(await readVersionColumnType(typeof(LongVersionedDoc))).ShouldBe("bigint");
}

[Fact]
public async Task existing_9x_bigint_column_for_IRevisioned_is_tolerated_not_narrowed()
{
// The reverse-direction safety: a deployment that already migrated to V9-with-bigint
// (before this fix) MUST NOT get force-narrowed to integer on the next apply — a
// `USING mt_version::integer` cast would silently truncate any out-of-range value.
// The diff treats bigint-actual + integer-desired as compatible (no SQL emitted).
StoreOptions(opts => opts.Schema.For<RevisionedDoc>());
await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();

await using (var conn = new NpgsqlConnection(ConnectionSource.ConnectionString))
{
await conn.OpenAsync();
await conn.CreateCommand(
$"alter table {SchemaName}.mt_doc_revisioneddoc alter column mt_version type bigint")
.ExecuteNonQueryAsync();
}

await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();

(await readVersionColumnType(typeof(RevisionedDoc))).ShouldBe("bigint");
}

// ---- CRUD round-trip on both shapes ----

[Fact]
public async Task IRevisioned_round_trip_insert_update_read()
{
StoreOptions(opts => opts.Schema.For<RevisionedDoc>());

var doc = new RevisionedDoc { Id = Guid.NewGuid(), Name = "alpha" };

await using (var session = theStore.LightweightSession())
{
session.UpdateRevision(doc, 1);
await session.SaveChangesAsync();
}

await using (var session = theStore.LightweightSession())
{
doc.Name = "beta";
session.UpdateRevision(doc, 2);
await session.SaveChangesAsync();
}

await using (var query = theStore.QuerySession())
{
var loaded = await query.LoadAsync<RevisionedDoc>(doc.Id);
loaded.ShouldNotBeNull();
loaded.Name.ShouldBe("beta");
loaded.Version.ShouldBe(2);
}
}

[Fact]
public async Task ILongVersioned_round_trip_insert_update_read()
{
StoreOptions(opts => opts.Schema.For<LongVersionedDoc>());

var doc = new LongVersionedDoc { Id = Guid.NewGuid(), Name = "alpha" };

await using (var session = theStore.LightweightSession())
{
session.UpdateRevision(doc, 1);
await session.SaveChangesAsync();
}

await using (var session = theStore.LightweightSession())
{
doc.Name = "beta";
session.UpdateRevision(doc, 2);
await session.SaveChangesAsync();
}

await using (var query = theStore.QuerySession())
{
var loaded = await query.LoadAsync<LongVersionedDoc>(doc.Id);
loaded.ShouldNotBeNull();
loaded.Name.ShouldBe("beta");
loaded.Version.ShouldBe(2L);
}
}

// ---- SingleStreamProjection registration auto-picks the right variant ----

[Fact]
public async Task SingleStreamProjection_of_IRevisioned_document_gets_integer_column()
{
StoreOptions(opts =>
{
opts.Events.AddEventType<NameChanged>();
opts.Projections.Add<NamedAggregateProjection>(ProjectionLifecycle.Inline);
});

await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();

(await readVersionColumnType(typeof(NamedAggregate))).ShouldBe("integer");

// Round-trip through the actual projection path to prove the integer column
// works end-to-end: append event → inline projection writes the doc → read back.
var streamId = Guid.NewGuid();
await using (var session = theStore.LightweightSession())
{
session.Events.StartStream<NamedAggregate>(streamId, new NameChanged("first"));
await session.SaveChangesAsync();
}

await using (var session = theStore.LightweightSession())
{
session.Events.Append(streamId, new NameChanged("second"));
await session.SaveChangesAsync();
}

await using (var query = theStore.QuerySession())
{
var agg = await query.LoadAsync<NamedAggregate>(streamId);
agg.ShouldNotBeNull();
agg.Name.ShouldBe("second");
agg.Version.ShouldBe(2);
}
}

// ---- helper ----

private async Task<string> readVersionColumnType(Type docType)
{
await using var conn = new NpgsqlConnection(ConnectionSource.ConnectionString);
await conn.OpenAsync();
var tableName = "mt_doc_" + docType.Name.ToLowerInvariant();
var dataType = (string?)await conn.CreateCommand(
"select data_type from information_schema.columns where table_schema = :s and table_name = :t and column_name = 'mt_version'")
.With("s", SchemaName)
.With("t", tableName)
.ExecuteScalarAsync();
return dataType ?? throw new InvalidOperationException(
$"mt_version column not found on {SchemaName}.{tableName}");
}
}

public class RevisionedDoc: IRevisioned
{
public Guid Id { get; set; }
public string Name { get; set; } = string.Empty;
public int Version { get; set; }
}

public class LongVersionedDoc: ILongVersioned
{
public Guid Id { get; set; }
public string Name { get; set; } = string.Empty;
public long Version { get; set; }
}

public record NameChanged(string Name);

public class NamedAggregate: IRevisioned
{
public Guid Id { get; set; }
public string Name { get; set; } = string.Empty;
public int Version { get; set; }
}

public partial class NamedAggregateProjection: SingleStreamProjection<NamedAggregate, Guid>
{
public void Apply(NameChanged @event, NamedAggregate agg) => agg.Name = @event.Name;
}
13 changes: 9 additions & 4 deletions src/Marten/Internal/ClosedShape/ClosedShapeInsertOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,13 @@ private int BindBinder(NpgsqlParameter[] parameters, int slot, IDocumentMetadata
// THEN COALESCE((select version from <events>.mt_streams where id = ?), 1)
// ELSE ? END (3 slots: ?=0 check, subquery id, explicit revision)
// UseVersionFromMatchingStream + conjoined: same with extra ? for tenant_id (4 slots)
parameters[slot].Value = Revision;
parameters[slot].NpgsqlDbType = NpgsqlDbType.Bigint;
// #4614: parameter type follows the column width (integer vs bigint).
var revisionDbType = _descriptor.RevisionBinder.ColumnDbType;
var revisionValue = revisionDbType == NpgsqlDbType.Integer
? (object)checked((int)Revision)
: Revision;
parameters[slot].Value = revisionValue;
parameters[slot].NpgsqlDbType = revisionDbType;
slot++;

if (_descriptor.UseVersionFromMatchingStream)
Expand All @@ -168,8 +173,8 @@ private int BindBinder(NpgsqlParameter[] parameters, int slot, IDocumentMetadata
}
}

parameters[slot].Value = Revision;
parameters[slot].NpgsqlDbType = NpgsqlDbType.Bigint;
parameters[slot].Value = revisionValue;
parameters[slot].NpgsqlDbType = revisionDbType;
return slot + 1;
}

Expand Down
29 changes: 19 additions & 10 deletions src/Marten/Internal/ClosedShape/ClosedShapeUpdateOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,16 @@ public void ConfigureCommand(ICommandBuilder builder, IMartenSession session)
}
else if (_descriptor.ConcurrencyMode == ConcurrencyMode.Numeric)
{
// WHERE (? = 0 or {table}.mt_version < ?) — bind raw
// Revision to both slots.
parameters[slot].Value = Revision;
parameters[slot].NpgsqlDbType = NpgsqlDbType.Bigint;
parameters[slot + 1].Value = Revision;
parameters[slot + 1].NpgsqlDbType = NpgsqlDbType.Bigint;
// WHERE (? = 0 or {table}.mt_version < ?) — bind raw Revision to both slots.
// #4614: parameter type tracks the column width (integer/bigint).
var revisionDbType = _descriptor.RevisionBinder!.ColumnDbType;
var revisionValue = revisionDbType == NpgsqlDbType.Integer
? (object)checked((int)Revision)
: Revision;
parameters[slot].Value = revisionValue;
parameters[slot].NpgsqlDbType = revisionDbType;
parameters[slot + 1].Value = revisionValue;
parameters[slot + 1].NpgsqlDbType = revisionDbType;
}
}

Expand Down Expand Up @@ -177,10 +181,15 @@ private int BindBinder(NpgsqlParameter[] parameters, int slot, IDocumentMetadata
ReferenceEquals(binder, _descriptor.RevisionBinder))
{
// SET mt_version = CASE WHEN ? = 0 THEN current+1 ELSE ? END
parameters[slot].Value = Revision;
parameters[slot].NpgsqlDbType = NpgsqlDbType.Bigint;
parameters[slot + 1].Value = Revision;
parameters[slot + 1].NpgsqlDbType = NpgsqlDbType.Bigint;
// #4614: parameter type tracks the column width (integer/bigint).
var revisionDbType = _descriptor.RevisionBinder.ColumnDbType;
var revisionValue = revisionDbType == NpgsqlDbType.Integer
? (object)checked((int)Revision)
: Revision;
parameters[slot].Value = revisionValue;
parameters[slot].NpgsqlDbType = revisionDbType;
parameters[slot + 1].Value = revisionValue;
parameters[slot + 1].NpgsqlDbType = revisionDbType;
return slot + 2;
}

Expand Down
14 changes: 10 additions & 4 deletions src/Marten/Internal/ClosedShape/ClosedShapeUpsertOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,14 @@ private int BindBinder(NpgsqlParameter[] parameters, int slot, IDocumentMetadata
// UseVersionFromMatchingStream + conjoined: same with extra ? for tenant_id (4 slots)
// The ON CONFLICT SET / WHERE branches always reference {table}.id
// directly so they keep their 4 revision slots downstream.
parameters[slot].Value = Revision;
parameters[slot].NpgsqlDbType = NpgsqlDbType.Bigint;
// #4614: the parameter type follows the column width (integer for IRevisioned,
// bigint for ILongVersioned) so the CASE expression's branch types align.
var revisionDbType = _descriptor.RevisionBinder.ColumnDbType;
var revisionValue = revisionDbType == NpgsqlDbType.Integer
? (object)checked((int)Revision)
: Revision;
parameters[slot].Value = revisionValue;
parameters[slot].NpgsqlDbType = revisionDbType;
slot++;

if (_descriptor.UseVersionFromMatchingStream)
Expand All @@ -206,8 +212,8 @@ private int BindBinder(NpgsqlParameter[] parameters, int slot, IDocumentMetadata
}
}

parameters[slot].Value = Revision;
parameters[slot].NpgsqlDbType = NpgsqlDbType.Bigint;
parameters[slot].Value = revisionValue;
parameters[slot].NpgsqlDbType = revisionDbType;
return slot + 1;
}

Expand Down
Loading
Loading