diff --git a/src/Marten.MemoryPack.Tests/MemoryPackEventTests.cs b/src/Marten.MemoryPack.Tests/MemoryPackEventTests.cs index bc51b043c9..9d27016efb 100644 --- a/src/Marten.MemoryPack.Tests/MemoryPackEventTests.cs +++ b/src/Marten.MemoryPack.Tests/MemoryPackEventTests.cs @@ -197,3 +197,147 @@ public async Task pre_existing_json_rows_still_read_after_feature_is_in_place() // pattern from PR #4576. [CollectionDefinition("Marten.MemoryPack", DisableParallelization = true)] public class MemoryPackCollection; + +// #4515 upgrade-path test: prove the *changeover* works on an existing +// store. Two DocumentStore instances point at the same database with the +// same schema name; the first is configured without any binary +// serialization, the second adds `opts.Events.UseMemoryPackSerializer()` +// (the only delta). The test isn't reusing the fixture's shared store +// because each phase needs its own DocumentStore lifecycle. +[Collection("Marten.MemoryPack")] +public class binary_serialization_upgrade_tests +{ + private const string Schema = "memorypack_upgrade"; + + [Fact] + public async Task enabling_binary_serialization_on_existing_store_still_reads_old_json_events_and_appends_new_binary() + { + var streamId = Guid.NewGuid(); + + // ----- Phase 1 --------------------------------------------------- + // Store with NO binary serialization wired. Uses a JSON-only event + // type. This is what an existing production system looks like before + // the binary opt-in is added. + await using (var storeBeforeBinary = DocumentStore.For(opts => + { + opts.Connection(ConnectionSource.ConnectionString); + opts.DatabaseSchemaName = Schema; + opts.AutoCreateSchemaObjects = JasperFx.AutoCreate.All; + // Rich on purpose so the second-phase store can also be Rich without + // re-doing the schema bind ordering — kept identical to make the + // "only delta" promise visible. (If a future user has Quick on + // Phase 1, they'd hit AssertNoBinaryEventsForQuickMode when adding + // binary events in Phase 2 — that's the documented Phase-2 follow-up.) + opts.Events.AppendMode = EventAppendMode.Rich; + })) + { + // Clean slate — drop any leftover state from a prior test run. + await storeBeforeBinary.Advanced.Clean.CompletelyRemoveAllAsync(); + await storeBeforeBinary.Storage.ApplyAllConfiguredChangesToDatabaseAsync(); + + await using var session = storeBeforeBinary.LightweightSession(); + session.Events.StartStream(streamId, + new TripCommentAdded(streamId, "from json-only store v1", DateTimeOffset.UtcNow), + new TripCommentAdded(streamId, "from json-only store v2", DateTimeOffset.UtcNow)); + await session.SaveChangesAsync(); + } + + // ----- Phase 2 --------------------------------------------------- + // Same config except UseMemoryPackSerializer() is now wired. The + // [BinaryEvent]-marked types (TripStarted / TripEnded) resolve to + // MemoryPack at EventMapping construction. + await using (var storeWithBinary = DocumentStore.For(opts => + { + opts.Connection(ConnectionSource.ConnectionString); + opts.DatabaseSchemaName = Schema; // SAME schema + opts.AutoCreateSchemaObjects = JasperFx.AutoCreate.All; + opts.Events.AppendMode = EventAppendMode.Rich; + opts.Events.UseMemoryPackSerializer(); // THE only delta + })) + { + // Schema apply — no-op for `data` / `bdata` (column already exists + // from Phase 1's migration; existing rows have bdata = NULL). + await storeWithBinary.Storage.ApplyAllConfiguredChangesToDatabaseAsync(); + + // Append new BINARY events to the SAME stream that already holds + // two JSON-serialized events from Phase 1. + await using (var session = storeWithBinary.LightweightSession()) + { + session.Events.Append(streamId, + new TripStarted(streamId, "Alice", DateTimeOffset.UtcNow), + new TripEnded(streamId, DateTimeOffset.UtcNow, 42.50m)); + await session.SaveChangesAsync(); + } + + // Read back the whole stream — must replay all four events + // in the correct order, JSON and binary deserializing through + // the per-row dispatch on bdata IS NULL. + await using (var query = storeWithBinary.QuerySession()) + { + var events = await query.Events.FetchStreamAsync(streamId); + + events.Count.ShouldBe(4); + + // Phase 1 JSON events + events[0].Data.ShouldBeOfType() + .Comment.ShouldBe("from json-only store v1"); + events[1].Data.ShouldBeOfType() + .Comment.ShouldBe("from json-only store v2"); + + // Phase 2 binary events + events[2].Data.ShouldBeOfType().DriverName.ShouldBe("Alice"); + events[3].Data.ShouldBeOfType().FareAmount.ShouldBe(42.50m); + + // Version numbers must be monotonic across the format switch + // — the Phase 2 store has to see the existing stream + // version (2) and assign 3, 4 to the new events. + events.Select(e => e.Version).ShouldBe(new long[] { 1, 2, 3, 4 }); + } + + // Belt-and-braces: also append a NEW JSON event (the JSON + // path is supposed to keep working in the same store, on the + // same stream, alongside binary appends). + await using (var session = storeWithBinary.LightweightSession()) + { + session.Events.Append(streamId, + new TripCommentAdded(streamId, "post-upgrade JSON", DateTimeOffset.UtcNow)); + await session.SaveChangesAsync(); + } + + await using (var query = storeWithBinary.QuerySession()) + { + var events = await query.Events.FetchStreamAsync(streamId); + events.Count.ShouldBe(5); + events[4].Data.ShouldBeOfType() + .Comment.ShouldBe("post-upgrade JSON"); + } + + // Verify the on-disk shape — the JSON rows have bdata NULL, + // the binary rows don't. This is the row-level discriminator + // the read path keys off of. + await using (var conn = storeWithBinary.Storage.Database.CreateConnection()) + { + await conn.OpenAsync(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = $"select type, bdata is null as bdata_is_null " + + $"from {Schema}.mt_events where stream_id = $1 order by version"; + var p = cmd.CreateParameter(); + p.Value = streamId; + cmd.Parameters.Add(p); + + var rows = new System.Collections.Generic.List<(string type, bool bdataIsNull)>(); + await using var reader = await cmd.ExecuteReaderAsync(); + while (await reader.ReadAsync()) + { + rows.Add((reader.GetString(0), reader.GetBoolean(1))); + } + + rows[0].ShouldBe(("trip_comment_added", true)); // phase 1 JSON + rows[1].ShouldBe(("trip_comment_added", true)); // phase 1 JSON + rows[2].ShouldBe(("trip_started", false)); // phase 2 binary + rows[3].ShouldBe(("trip_ended", false)); // phase 2 binary + rows[4].ShouldBe(("trip_comment_added", true)); // phase 2 JSON + } + } + } +}