Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 144 additions & 0 deletions src/Marten.MemoryPack.Tests/MemoryPackEventTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,147 @@ public async Task pre_existing_json_rows_still_read_after_feature_is_in_place()
// pattern from PR #4576.
[CollectionDefinition("Marten.MemoryPack", DisableParallelization = true)]
public class MemoryPackCollection;

// #4515 upgrade-path test: prove the *changeover* works on an existing
// store. Two DocumentStore instances point at the same database with the
// same schema name; the first is configured without any binary
// serialization, the second adds `opts.Events.UseMemoryPackSerializer()`
// (the only delta). The test isn't reusing the fixture's shared store
// because each phase needs its own DocumentStore lifecycle.
[Collection("Marten.MemoryPack")]
public class binary_serialization_upgrade_tests
{
private const string Schema = "memorypack_upgrade";

[Fact]
public async Task enabling_binary_serialization_on_existing_store_still_reads_old_json_events_and_appends_new_binary()
{
var streamId = Guid.NewGuid();

// ----- Phase 1 ---------------------------------------------------
// Store with NO binary serialization wired. Uses a JSON-only event
// type. This is what an existing production system looks like before
// the binary opt-in is added.
await using (var storeBeforeBinary = DocumentStore.For(opts =>
{
opts.Connection(ConnectionSource.ConnectionString);
opts.DatabaseSchemaName = Schema;
opts.AutoCreateSchemaObjects = JasperFx.AutoCreate.All;
// Rich on purpose so the second-phase store can also be Rich without
// re-doing the schema bind ordering — kept identical to make the
// "only delta" promise visible. (If a future user has Quick on
// Phase 1, they'd hit AssertNoBinaryEventsForQuickMode when adding
// binary events in Phase 2 — that's the documented Phase-2 follow-up.)
opts.Events.AppendMode = EventAppendMode.Rich;
}))
{
// Clean slate — drop any leftover state from a prior test run.
await storeBeforeBinary.Advanced.Clean.CompletelyRemoveAllAsync();
await storeBeforeBinary.Storage.ApplyAllConfiguredChangesToDatabaseAsync();

await using var session = storeBeforeBinary.LightweightSession();
session.Events.StartStream(streamId,
new TripCommentAdded(streamId, "from json-only store v1", DateTimeOffset.UtcNow),
new TripCommentAdded(streamId, "from json-only store v2", DateTimeOffset.UtcNow));
await session.SaveChangesAsync();
}

// ----- Phase 2 ---------------------------------------------------
// Same config except UseMemoryPackSerializer() is now wired. The
// [BinaryEvent]-marked types (TripStarted / TripEnded) resolve to
// MemoryPack at EventMapping construction.
await using (var storeWithBinary = DocumentStore.For(opts =>
{
opts.Connection(ConnectionSource.ConnectionString);
opts.DatabaseSchemaName = Schema; // SAME schema
opts.AutoCreateSchemaObjects = JasperFx.AutoCreate.All;
opts.Events.AppendMode = EventAppendMode.Rich;
opts.Events.UseMemoryPackSerializer(); // THE only delta
}))
{
// Schema apply — no-op for `data` / `bdata` (column already exists
// from Phase 1's migration; existing rows have bdata = NULL).
await storeWithBinary.Storage.ApplyAllConfiguredChangesToDatabaseAsync();

// Append new BINARY events to the SAME stream that already holds
// two JSON-serialized events from Phase 1.
await using (var session = storeWithBinary.LightweightSession())
{
session.Events.Append(streamId,
new TripStarted(streamId, "Alice", DateTimeOffset.UtcNow),
new TripEnded(streamId, DateTimeOffset.UtcNow, 42.50m));
await session.SaveChangesAsync();
}

// Read back the whole stream — must replay all four events
// in the correct order, JSON and binary deserializing through
// the per-row dispatch on bdata IS NULL.
await using (var query = storeWithBinary.QuerySession())
{
var events = await query.Events.FetchStreamAsync(streamId);

events.Count.ShouldBe(4);

// Phase 1 JSON events
events[0].Data.ShouldBeOfType<TripCommentAdded>()
.Comment.ShouldBe("from json-only store v1");
events[1].Data.ShouldBeOfType<TripCommentAdded>()
.Comment.ShouldBe("from json-only store v2");

// Phase 2 binary events
events[2].Data.ShouldBeOfType<TripStarted>().DriverName.ShouldBe("Alice");
events[3].Data.ShouldBeOfType<TripEnded>().FareAmount.ShouldBe(42.50m);

// Version numbers must be monotonic across the format switch
// — the Phase 2 store has to see the existing stream
// version (2) and assign 3, 4 to the new events.
events.Select(e => e.Version).ShouldBe(new long[] { 1, 2, 3, 4 });
}

// Belt-and-braces: also append a NEW JSON event (the JSON
// path is supposed to keep working in the same store, on the
// same stream, alongside binary appends).
await using (var session = storeWithBinary.LightweightSession())
{
session.Events.Append(streamId,
new TripCommentAdded(streamId, "post-upgrade JSON", DateTimeOffset.UtcNow));
await session.SaveChangesAsync();
}

await using (var query = storeWithBinary.QuerySession())
{
var events = await query.Events.FetchStreamAsync(streamId);
events.Count.ShouldBe(5);
events[4].Data.ShouldBeOfType<TripCommentAdded>()
.Comment.ShouldBe("post-upgrade JSON");
}

// Verify the on-disk shape — the JSON rows have bdata NULL,
// the binary rows don't. This is the row-level discriminator
// the read path keys off of.
await using (var conn = storeWithBinary.Storage.Database.CreateConnection())
{
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
cmd.CommandText = $"select type, bdata is null as bdata_is_null " +
$"from {Schema}.mt_events where stream_id = $1 order by version";
var p = cmd.CreateParameter();
p.Value = streamId;
cmd.Parameters.Add(p);

var rows = new System.Collections.Generic.List<(string type, bool bdataIsNull)>();
await using var reader = await cmd.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
rows.Add((reader.GetString(0), reader.GetBoolean(1)));
}

rows[0].ShouldBe(("trip_comment_added", true)); // phase 1 JSON
rows[1].ShouldBe(("trip_comment_added", true)); // phase 1 JSON
rows[2].ShouldBe(("trip_started", false)); // phase 2 binary
rows[3].ShouldBe(("trip_ended", false)); // phase 2 binary
rows[4].ShouldBe(("trip_comment_added", true)); // phase 2 JSON
}
}
}
}
Loading