diff --git a/Directory.Packages.props b/Directory.Packages.props index 5d98c605d8..030503e27e 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -24,6 +24,7 @@ + diff --git a/build/build.cs b/build/build.cs index 0d1a815f4c..a1049a1e0c 100644 --- a/build/build.cs +++ b/build/build.cs @@ -44,7 +44,8 @@ class Build : NukeBuild .DependsOn(TestNodaTime) .DependsOn(TestAspnetcore) .DependsOn(TestPostGIS) - .DependsOn(TestPgVector); + .DependsOn(TestPgVector) + .DependsOn(TestMemoryPack); Target Init => _ => _ .Executes(() => @@ -138,6 +139,18 @@ class Build : NukeBuild .SetFramework(Framework)); }); + Target TestMemoryPack => _ => _ + .ProceedAfterFailure() + .Executes(() => + { + DotNetTest(c => c + .SetProjectFile("src/Marten.MemoryPack.Tests") + .SetConfiguration(Configuration) + .EnableNoBuild() + .EnableNoRestore() + .SetFramework(Framework)); + }); + Target TestPostGIS => _ => _ .ProceedAfterFailure() .Executes(() => diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index 705a84dd28..252ee8db7c 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -223,6 +223,7 @@ const config: UserConfig = { { text: 'Natural Keys', link: '/events/natural-keys' }, { text: 'Dynamic Consistency Boundary', link: '/events/dcb' }, { text: 'Optimizing Performance', link: '/events/optimizing' }, + { text: 'Binary Event Serialization', link: '/events/binary-serialization' }, { text: 'Projections Overview', link: '/events/projections/', collapsed: true, items: [ diff --git a/docs/events/binary-serialization.md b/docs/events/binary-serialization.md new file mode 100644 index 0000000000..bd6aa85356 --- /dev/null +++ b/docs/events/binary-serialization.md @@ -0,0 +1,168 @@ +# Binary Event Serialization + +Marten can serialize individual event types to a binary wire format +([MemoryPack](https://github.com/Cysharp/MemoryPack), +[MessagePack](https://msgpack.org/), or anything else implementing +`IEventBinarySerializer`) instead of the default JSON, trading a few of JSON's +ergonomic wins for a meaningful throughput and storage-size improvement on +hot streams. See [#4515](https://github.com/JasperFx/marten/issues/4515) for +the design discussion. + +The opt-in is **per event type** — binary-serialized and JSON-serialized events +coexist in the same `mt_events` table, so the feature can be rolled out on an +existing store with **no migration of existing data**. + +## How it works + +A second column, `bdata bytea NULL`, sits alongside the existing `data jsonb NOT NULL` +on `mt_events`. The row-level discriminator is `bdata IS NULL`: + +| When | `data` | `bdata` | +| --- | --- | --- | +| Event uses the JSON serializer | full JSON payload | `NULL` | +| Event uses an `IEventBinarySerializer` | the placeholder `'{}'::jsonb` | the serialized bytes | + +On read, Marten inspects `bdata`: + +- `NULL` → existing JSON deserialization path. Pre-feature rows continue to work without conversion. +- non-null → `IEventBinarySerializer.Deserialize(eventType, bytes)`. + +Because the discriminator is on the row and the serializer is resolved per +event type, the same stream can carry rows of either format with no special +handling at the call site. + +## Quick start with `Marten.MemoryPack` + +The companion `Marten.MemoryPack` NuGet package ships a ready-to-use +`IEventBinarySerializer` over MemoryPack: + +```shell +dotnet add package Marten.MemoryPack +``` + +Mark event types you want to serialize as binary with both +`[BinaryEvent]` (so Marten picks them up) and `[MemoryPackable]` (so MemoryPack +can serialize them): + +```csharp +using Marten.Events; +using MemoryPack; + +[BinaryEvent] +[MemoryPackable] +public partial record TripStarted(Guid TripId, string DriverName, DateTimeOffset StartedAt); +``` + +Wire MemoryPack as the store-wide fallback for `[BinaryEvent]` types: + +```csharp +using Marten.MemoryPack; + +var store = DocumentStore.For(opts => +{ + opts.Connection(connectionString); + + // Phase 1 limitation — see "Constraints" below. + opts.Events.AppendMode = EventAppendMode.Rich; + + // Wire MemoryPack as DefaultBinarySerializer. [BinaryEvent]-marked + // event types resolve to this serializer on registration. + opts.Events.UseMemoryPackSerializer(); +}); +``` + +Now `TripStarted` writes through MemoryPack to `bdata`; un-marked events +continue to write JSON to `data`. + +## Registration ergonomics + +Two equivalent ways to opt an event type in: + +```csharp +// 1. Attribute-driven — uses opts.Events.DefaultBinarySerializer as the resolver. +[BinaryEvent] +[MemoryPackable] +public partial record TripEnded(Guid TripId, DateTimeOffset EndedAt); + +// 2. Fluent — wire an explicit per-type serializer (overrides any default). +opts.Events.UseBinarySerializer(new MemoryPackEventSerializer()); +``` + +Resolution order on `EventMapping` construction: + +1. Explicit `opts.Events.UseBinarySerializer(...)` for that type. +2. `[BinaryEvent]` attribute + `opts.Events.DefaultBinarySerializer`. +3. Otherwise, plain JSON (existing path). + +If a type carries `[BinaryEvent]` but no per-type serializer was wired AND +`DefaultBinarySerializer` is `null`, Marten throws at the first append with a +remediation message naming both registration entry points. + +## Bring your own serializer + +`IEventBinarySerializer` is small enough to implement directly against any +binary format — MessagePack, protobuf, etc.: + +```csharp +public interface IEventBinarySerializer +{ + byte[] Serialize(Type type, object data); + object Deserialize(Type type, byte[] data); +} +``` + +The serializer is a singleton — keep its state thread-safe. + +## On-disk shape + +For binary events, `data` holds the literal `{}` placeholder so the existing +`data jsonb NOT NULL` constraint stays intact (no schema relaxation): + +```sql +-- binary-serialized event +select type, data::text, bdata is null +from mt_events where seq_id = 42; +-- type | data | bdata is null +-- --------------|------|--------------- +-- trip_started | {} | false + +-- JSON-serialized event in the same stream +select type, data::text, bdata is null +from mt_events where seq_id = 43; +-- type | data | bdata is null +-- --------------------- |---------------------------------|--------------- +-- trip_comment_added | {"comment": "looking good", …} | true +``` + +## Migration + +Purely additive: the only schema change is `bdata bytea NULL` on `mt_events`. +Existing rows have `bdata = NULL` (the column's default for prior data) and +read through the JSON path. Marten's standard schema migration creates the +column for existing installations — no event data conversion required. + +## Constraints + +The 9.3 cut ships with deliberate scope: + +- **`EventAppendMode.Rich` only.** The default `QuickWithServerTimestamps` and + `Quick` modes go through the `mt_quick_append_events` PostgreSQL function, + whose signature would need a parallel `bdata bytea[]` parameter to carry + binary payloads. Until that lands, `BuildQuickDescriptor` / + `BuildQuickWithServerTimestampsDescriptor` throw at store-build time if any + binary event type is registered. Workaround: set + `opts.Events.AppendMode = EventAppendMode.Rich;`. +- **No bulk appender support.** `BulkEventAppender` uses Npgsql `COPY` with + the existing column shape; adding the `bdata` column to the COPY format + is part of the same Quick-mode follow-up. +- **No upcaster support.** Marten's + [event upcasters](/events/versioning) operate on JSON payloads and don't + generalize to a `byte[]` wire form. Binary event upcasters need their own + typed transform shape; tracked as a deferred follow-up. For now, design + binary event schemas with forward-compatibility in the serializer itself + (MemoryPack's `[MemoryPackOrder]` evolution, for example). + +## See also + +- [Optimizing Event Store Performance and Scalability](/events/optimizing) +- [Event Versioning](/events/versioning) (JSON upcasters) diff --git a/docs/events/optimizing.md b/docs/events/optimizing.md index 461f871c6a..cfb7c36df5 100644 --- a/docs/events/optimizing.md +++ b/docs/events/optimizing.md @@ -11,6 +11,13 @@ very large data loads. Marten has several options to potentially increase the performance and scalability of a system that uses the event sourcing functionality: +::: tip +For hot streams where JSON serialization overhead dominates, see +[Binary Event Serialization](/events/binary-serialization) — opt individual +event types into MemoryPack (or any `IEventBinarySerializer`) on a +per-type basis, with no migration of existing JSON-serialized events. +::: + ```cs diff --git a/src/Marten.MemoryPack.Tests/Marten.MemoryPack.Tests.csproj b/src/Marten.MemoryPack.Tests/Marten.MemoryPack.Tests.csproj new file mode 100644 index 0000000000..cbfc726dd9 --- /dev/null +++ b/src/Marten.MemoryPack.Tests/Marten.MemoryPack.Tests.csproj @@ -0,0 +1,22 @@ + + + false + enable + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + + diff --git a/src/Marten.MemoryPack.Tests/MemoryPackEventTests.cs b/src/Marten.MemoryPack.Tests/MemoryPackEventTests.cs new file mode 100644 index 0000000000..bc51b043c9 --- /dev/null +++ b/src/Marten.MemoryPack.Tests/MemoryPackEventTests.cs @@ -0,0 +1,199 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using JasperFx.Events; +using Marten; +using Marten.Events; +using Marten.MemoryPack; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; + +namespace Marten.MemoryPack.Tests; + +[Collection("Marten.MemoryPack")] +public class MemoryPackEventTests: IAsyncLifetime +{ + private DocumentStore _store = null!; + + public async Task InitializeAsync() + { + _store = DocumentStore.For(opts => + { + opts.Connection(ConnectionSource.ConnectionString); + opts.DatabaseSchemaName = "memorypack_events"; + opts.AutoCreateSchemaObjects = JasperFx.AutoCreate.All; + + // #4515 Phase 1: binary serialization currently only supported on + // the Rich append path. Force-Rich here; the Quick descriptor + // builders throw with a clear remediation message if a binary + // event type is registered while AppendMode is non-Rich. + opts.Events.AppendMode = EventAppendMode.Rich; + + // Wire MemoryPack as the store-wide fallback for [BinaryEvent] + // types. TripStarted/PassengerPickedUp/TripEnded resolve to this + // serializer through the attribute. + opts.Events.UseMemoryPackSerializer(); + }); + + await _store.Advanced.Clean.CompletelyRemoveAllAsync(); + await _store.Storage.ApplyAllConfiguredChangesToDatabaseAsync(); + } + + public Task DisposeAsync() + { + _store?.Dispose(); + return Task.CompletedTask; + } + + [Fact] + public async Task can_round_trip_a_single_binary_event() + { + var streamId = Guid.NewGuid(); + var started = new TripStarted(streamId, "Alice", DateTimeOffset.UtcNow); + + await using (var session = _store.LightweightSession()) + { + session.Events.StartStream(streamId, started); + await session.SaveChangesAsync(); + } + + await using var query = _store.QuerySession(); + var events = await query.Events.FetchStreamAsync(streamId); + + events.Count.ShouldBe(1); + var loaded = events[0].Data.ShouldBeOfType(); + loaded.TripId.ShouldBe(streamId); + loaded.DriverName.ShouldBe("Alice"); + loaded.StartedAt.ShouldBe(started.StartedAt); + } + + [Fact] + public async Task multiple_binary_events_replay_in_order() + { + var streamId = Guid.NewGuid(); + var startedAt = DateTimeOffset.UtcNow; + + await using (var session = _store.LightweightSession()) + { + session.Events.StartStream(streamId, + new TripStarted(streamId, "Bob", startedAt), + new PassengerPickedUp(streamId, "Carol", startedAt.AddMinutes(5)), + new TripEnded(streamId, startedAt.AddMinutes(30), 24.50m)); + await session.SaveChangesAsync(); + } + + await using var query = _store.QuerySession(); + var events = await query.Events.FetchStreamAsync(streamId); + + events.Count.ShouldBe(3); + events[0].Data.ShouldBeOfType().DriverName.ShouldBe("Bob"); + events[1].Data.ShouldBeOfType().PassengerName.ShouldBe("Carol"); + events[2].Data.ShouldBeOfType().FareAmount.ShouldBe(24.50m); + } + + // The flagship test: the same stream carries JSON-serialized events and + // binary-serialized events. Round-trip must work for both, demonstrating + // the per-row dispatch (bdata IS NULL ⇒ JSON path; non-null ⇒ binary). + [Fact] + public async Task json_and_binary_events_coexist_on_one_stream() + { + var streamId = Guid.NewGuid(); + + await using (var session = _store.LightweightSession()) + { + session.Events.StartStream(streamId, + new TripStarted(streamId, "Dana", DateTimeOffset.UtcNow), // binary + new TripCommentAdded(streamId, "looking good", DateTimeOffset.UtcNow), // JSON + new PassengerPickedUp(streamId, "Eli", DateTimeOffset.UtcNow), // binary + new TripCommentAdded(streamId, "passenger boarded", DateTimeOffset.UtcNow)); // JSON + await session.SaveChangesAsync(); + } + + await using var query = _store.QuerySession(); + var events = await query.Events.FetchStreamAsync(streamId); + + events.Count.ShouldBe(4); + events[0].Data.ShouldBeOfType().DriverName.ShouldBe("Dana"); + events[1].Data.ShouldBeOfType().Comment.ShouldBe("looking good"); + events[2].Data.ShouldBeOfType().PassengerName.ShouldBe("Eli"); + events[3].Data.ShouldBeOfType().Comment.ShouldBe("passenger boarded"); + } + + [Fact] + public async Task binary_events_land_in_bdata_column_jsoned_events_land_in_data_column() + { + var streamId = Guid.NewGuid(); + + await using (var session = _store.LightweightSession()) + { + session.Events.StartStream(streamId, + new TripStarted(streamId, "Frank", DateTimeOffset.UtcNow), // binary + new TripCommentAdded(streamId, "hello", DateTimeOffset.UtcNow)); // JSON + await session.SaveChangesAsync(); + } + + // Pull the raw column values to verify the on-disk shape — the + // discriminator is bdata IS NULL. + await using var conn = _store.Storage.Database.CreateConnection(); + await conn.OpenAsync(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "select type, bdata is null as bdata_is_null, data::text " + + "from memorypack_events.mt_events where stream_id = $1 order by version"; + var p = cmd.CreateParameter(); + p.Value = streamId; + cmd.Parameters.Add(p); + + await using var reader = await cmd.ExecuteReaderAsync(); + var rows = new System.Collections.Generic.List<(string type, bool bdataIsNull, string data)>(); + while (await reader.ReadAsync()) + { + rows.Add((reader.GetString(0), reader.GetBoolean(1), reader.GetString(2))); + } + + rows.Count.ShouldBe(2); + + // First event is binary — bdata IS NOT NULL, data is the {} placeholder + rows[0].type.ShouldBe("trip_started"); + rows[0].bdataIsNull.ShouldBeFalse(); + rows[0].data.ShouldBe("{}"); + + // Second event is JSON — bdata IS NULL, data has the real payload + rows[1].type.ShouldBe("trip_comment_added"); + rows[1].bdataIsNull.ShouldBeTrue(); + rows[1].data.ShouldContain("hello"); + } + + // Upgrade-backfill: simulate a row written *before* the bdata column + // existed (NULL bdata, real JSON in data) and confirm it still reads + // through the JSON path after the feature is in place. This is what + // makes the migration safe — no event data needs converting. + [Fact] + public async Task pre_existing_json_rows_still_read_after_feature_is_in_place() + { + var streamId = Guid.NewGuid(); + + // Write through the standard JSON path first (a non-binary event type). + await using (var session = _store.LightweightSession()) + { + session.Events.StartStream(streamId, + new TripCommentAdded(streamId, "pre-upgrade comment", DateTimeOffset.UtcNow)); + await session.SaveChangesAsync(); + } + + // The row is in mt_events with bdata = NULL. Read it back — the + // per-row dispatch should fall through to mapping.ReadEventData. + await using var query = _store.QuerySession(); + var events = await query.Events.FetchStreamAsync(streamId); + + events.Count.ShouldBe(1); + events[0].Data.ShouldBeOfType() + .Comment.ShouldBe("pre-upgrade comment"); + } +} + +// Forces both binary test classes into one collection so they don't +// race on the shared mt_events schema; mirrors the Marten.PgVector +// pattern from PR #4576. +[CollectionDefinition("Marten.MemoryPack", DisableParallelization = true)] +public class MemoryPackCollection; diff --git a/src/Marten.MemoryPack.Tests/TestEvents.cs b/src/Marten.MemoryPack.Tests/TestEvents.cs new file mode 100644 index 0000000000..474021464e --- /dev/null +++ b/src/Marten.MemoryPack.Tests/TestEvents.cs @@ -0,0 +1,24 @@ +using System; +using Marten.Events; +using MemoryPack; + +namespace Marten.MemoryPack.Tests; + +// Binary event types — opted in via attribute. With +// opts.Events.UseMemoryPackSerializer() set as the store-wide fallback, +// these resolve to the MemoryPack serializer on registration. +[BinaryEvent] +[MemoryPackable] +public partial record TripStarted(Guid TripId, string DriverName, DateTimeOffset StartedAt); + +[BinaryEvent] +[MemoryPackable] +public partial record PassengerPickedUp(Guid TripId, string PassengerName, DateTimeOffset PickedUpAt); + +[BinaryEvent] +[MemoryPackable] +public partial record TripEnded(Guid TripId, DateTimeOffset EndedAt, decimal FareAmount); + +// JSON event — coexists in the same store and is appended without +// MemoryPack on the wire. Used by the mixed-stream test. +public record TripCommentAdded(Guid TripId, string Comment, DateTimeOffset At); diff --git a/src/Marten.MemoryPack/Marten.MemoryPack.csproj b/src/Marten.MemoryPack/Marten.MemoryPack.csproj new file mode 100644 index 0000000000..bc2f902a41 --- /dev/null +++ b/src/Marten.MemoryPack/Marten.MemoryPack.csproj @@ -0,0 +1,23 @@ + + + MemoryPack binary event serialization for Marten — an IEventBinarySerializer implementation for the per-event-type binary serialization opt-in (#4515). + true + true + true + false + true + true + true + true + enable + Marten.MemoryPack + + + + + + + + + + diff --git a/src/Marten.MemoryPack/MemoryPackEventSerializer.cs b/src/Marten.MemoryPack/MemoryPackEventSerializer.cs new file mode 100644 index 0000000000..42bfa619b3 --- /dev/null +++ b/src/Marten.MemoryPack/MemoryPackEventSerializer.cs @@ -0,0 +1,34 @@ +using System; +using Marten.Events; +using MemoryPack; + +namespace Marten.MemoryPack; + +/// +/// MemoryPack-backed . Wire it up +/// either per event type via opts.Events.UseBinarySerializer<TEvent>(new MemoryPackEventSerializer()) +/// or store-wide via +/// +/// to make [BinaryEvent]-marked types resolve to this serializer. +/// +/// +/// Event types must be MemoryPack-serializable — typically a +/// partial type decorated with [MemoryPackable]. See the +/// MemoryPack docs. +/// +public sealed class MemoryPackEventSerializer: IEventBinarySerializer +{ + public byte[] Serialize(Type type, object data) + { + if (data is null) throw new ArgumentNullException(nameof(data)); + return MemoryPackSerializer.Serialize(type, data); + } + + public object Deserialize(Type type, byte[] data) + { + return MemoryPackSerializer.Deserialize(type, data) + ?? throw new InvalidOperationException( + $"MemoryPack deserialization returned null for type '{type.FullName}'. " + + $"Ensure the wire payload was produced by a compatible MemoryPack serializer."); + } +} diff --git a/src/Marten.MemoryPack/MemoryPackEventSerializerExtensions.cs b/src/Marten.MemoryPack/MemoryPackEventSerializerExtensions.cs new file mode 100644 index 0000000000..634ebf16cd --- /dev/null +++ b/src/Marten.MemoryPack/MemoryPackEventSerializerExtensions.cs @@ -0,0 +1,33 @@ +using Marten.Events; + +namespace Marten.MemoryPack; + +/// +/// Sugar over + +/// for the +/// common case of wiring MemoryPack as the binary serializer for an +/// event store. +/// +public static class MemoryPackEventSerializerExtensions +{ + /// + /// Register as the store-wide + /// fallback binary serializer. Combine with the + /// on individual event types to opt + /// them in without a per-type fluent call. + /// + public static IEventStoreOptions UseMemoryPackSerializer(this IEventStoreOptions events) + { + events.DefaultBinarySerializer = new MemoryPackEventSerializer(); + return events; + } + + /// + /// Register a single event type as MemoryPack-serialized — shorthand for + /// events.UseBinarySerializer<TEvent>(new MemoryPackEventSerializer()). + /// + public static IEventStoreOptions UseMemoryPackSerializer(this IEventStoreOptions events) + { + return events.UseBinarySerializer(new MemoryPackEventSerializer()); + } +} diff --git a/src/Marten.slnx b/src/Marten.slnx index b4e281e65d..4c5a4fe2e7 100644 --- a/src/Marten.slnx +++ b/src/Marten.slnx @@ -16,6 +16,10 @@ + + + + diff --git a/src/Marten/EventStorage/ClosedShapeEventDocumentStorage.cs b/src/Marten/EventStorage/ClosedShapeEventDocumentStorage.cs index 43c1cb2b6d..833e6e2f2e 100644 --- a/src/Marten/EventStorage/ClosedShapeEventDocumentStorage.cs +++ b/src/Marten/EventStorage/ClosedShapeEventDocumentStorage.cs @@ -59,13 +59,17 @@ public ClosedShapeEventDocumentStorage(StoreOptions options): base(options) : EventStorageBuilder.Build(Events, _serializer); // Read-side column list for ApplyReaderDataToEvent (#4411). Built off - // EventsTable.SelectColumns() with positions 0/1/2 stripped — those - // are read by the base ISelector. Identical across all three - // append-mode variants (read shape doesn't depend on write shape) so - // we build it here instead of routing through EventStorage. + // EventsTable.SelectColumns() with positions 0/1/2/3 stripped: + // - 0/1/2 (data/type/mt_dotnet_type) are read by the base ISelector. + // - 3 (bdata, #4515) is read inline in EventDocumentStorage.Resolve to + // pick the JSON-vs-binary deserialization path; the bdata column's + // own ReadValueSync is a no-op so including it here would be wasted. + // Identical across all three append-mode variants (read shape doesn't + // depend on write shape) so we build it here instead of routing + // through EventStorage. _readerColumns = new Marten.Events.Schema.EventsTable(Events) .SelectColumns() - .Skip(3) + .Skip(4) .ToArray(); } @@ -139,7 +143,10 @@ public override void ApplyReaderDataToEvent(DbDataReader reader, IEvent e) // (default interface method falls through to the parameterless // ReadValueSync). HeadersColumn overrides this to deserialize // jsonb → Dictionary via the session's serializer. - _readerColumns[i].ReadValueSync(reader, i + 3, e, _serializer); + // Offset is + 4: 0/1/2 are data/type/mt_dotnet_type (base + // ISelector); 3 is bdata (#4515 — consumed inline in + // Resolve to pick the deserialization path). + _readerColumns[i].ReadValueSync(reader, i + 4, e, _serializer); } } @@ -148,7 +155,7 @@ public override async Task ApplyReaderDataToEventAsync(DbDataReader reader, IEve for (var i = 0; i < _readerColumns.Count; i++) { await _readerColumns[i] - .ReadValueAsync(reader, i + 3, e, _serializer, token) + .ReadValueAsync(reader, i + 4, e, _serializer, token) .ConfigureAwait(false); } } diff --git a/src/Marten/EventStorage/Dialects/PostgresEventStoreDialect.cs b/src/Marten/EventStorage/Dialects/PostgresEventStoreDialect.cs index 249be18550..835d5d6a4a 100644 --- a/src/Marten/EventStorage/Dialects/PostgresEventStoreDialect.cs +++ b/src/Marten/EventStorage/Dialects/PostgresEventStoreDialect.cs @@ -58,7 +58,25 @@ public RichEventStorageDescriptor BuildRichDescriptor(EventGraph graph, ISeriali insertStreamSql: BuildInsertStreamSql(graph), updateStreamVersionSql: BuildUpdateStreamVersionSql(graph), streamStateSelectSql: Marten.EventStorage.StreamStateSql.Build(graph), - serializeEventData: e => serializer.ToJson(e.Data), + // #4515 dispatch: binary events write a {} placeholder to `data` + // (the canonical "valid but empty" jsonb) and the real payload to + // `bdata`. JSON events write JSON to `data` and NULL to `bdata`. + // The split lets a single mt_events table hold both serialization + // formats without a schema flip. + serializeEventData: e => + { + var mapping = graph.EventMappingFor(e.EventType); + return mapping?.IsBinary == true + ? "{}" + : serializer.ToJson(e.Data); + }, + serializeEventBdata: e => + { + var mapping = graph.EventMappingFor(e.EventType); + return mapping?.IsBinary == true + ? mapping.BinarySerializer!.Serialize(e.EventType, e.Data) + : null; + }, metadataBinders: metadataBinders) { IsTenancyConjoined = isConjoined, @@ -256,6 +274,15 @@ private static Action BuildUpdateStreamVersionCom public QuickEventStorageDescriptor BuildQuickDescriptor(EventGraph graph, ISerializer serializer) { + // #4515 Phase 1 limitation: binary event serialization is implemented + // for the Rich append path only. The Quick path goes through the + // mt_quick_append_events server function whose signature is + // generated against `bytea[]` (UTF-8 JSON) for the data column; adding + // a parallel `bdata bytea[]` parameter (+ the function regen + bulk + // appender) is the explicit Phase 2 scope. Fail loud at store-build + // time rather than silently writing JSON for an opted-in binary type. + AssertNoBinaryEventsForQuickMode(graph, EventAppendMode.Quick); + var (isConjoined, isGuid, hasCausation, hasCorrelation, hasHeaders, hasUserName, hasTagWrites) = ReadQuickFlags(graph); @@ -269,7 +296,12 @@ public QuickEventStorageDescriptor BuildQuickDescriptor(EventGraph graph, ISeria insertStreamSql: BuildInsertStreamSql(graph), updateStreamVersionSql: BuildUpdateStreamVersionSql(graph), streamStateSelectSql: Marten.EventStorage.StreamStateSql.Build(graph), - serializeEventData: e => serializer.ToJson(e.Data)) + serializeEventData: e => serializer.ToJson(e.Data), + // #4515: Quick mode rejects binary events at build time (see + // AssertNoBinaryEventsForQuickMode), so bdata is always NULL + // here. The slot still exists because mt_events.bdata is part + // of every full-shape INSERT column list. + serializeEventBdata: _ => null) { IsGuidStreamIdentity = isGuid, IsTenancyConjoined = isConjoined, @@ -288,9 +320,37 @@ public QuickEventStorageDescriptor BuildQuickDescriptor(EventGraph graph, ISeria }; } + /// + /// #4515: enforce the Phase 1 constraint that binary event serialization + /// only ships on the Rich append path. If the user registered any + /// binary event types and is still on the default + /// (or + /// explicit ), throw with the + /// remediation recipe instead of writing wrong data. + /// + private static void AssertNoBinaryEventsForQuickMode(EventGraph graph, EventAppendMode mode) + { + var binaryEventTypes = graph.AllEvents() + .Where(e => e.IsBinary) + .Select(e => e.DocumentType.FullName ?? e.DocumentType.Name) + .ToArray(); + + if (binaryEventTypes.Length == 0) return; + + throw new InvalidOperationException( + $"Binary event serialization (#4515) is currently only supported with EventAppendMode.Rich, " + + $"but AppendMode is {mode} and the following event types are opted in to binary serialization: " + + $"{string.Join(", ", binaryEventTypes)}. " + + $"Set `opts.Events.AppendMode = EventAppendMode.Rich;` to use binary event serialization. " + + $"(Quick-mode support is tracked as a follow-up — the mt_quick_append_events server function " + + $"needs a parallel `bdata bytea[]` parameter.)"); + } + public QuickWithServerTimestampsEventStorageDescriptor BuildQuickWithServerTimestampsDescriptor( EventGraph graph, ISerializer serializer) { + AssertNoBinaryEventsForQuickMode(graph, EventAppendMode.QuickWithServerTimestamps); + var (isConjoined, isGuid, hasCausation, hasCorrelation, hasHeaders, hasUserName, hasTagWrites) = ReadQuickFlags(graph); @@ -304,7 +364,9 @@ public QuickWithServerTimestampsEventStorageDescriptor BuildQuickWithServerTimes insertStreamSql: BuildInsertStreamSql(graph), updateStreamVersionSql: BuildUpdateStreamVersionSql(graph), streamStateSelectSql: Marten.EventStorage.StreamStateSql.Build(graph), - serializeEventData: e => serializer.ToJson(e.Data)) + serializeEventData: e => serializer.ToJson(e.Data), + // #4515: see notes on the parallel call in BuildQuickDescriptor. + serializeEventBdata: _ => null) { IsGuidStreamIdentity = isGuid, IsTenancyConjoined = isConjoined, @@ -516,7 +578,7 @@ private static IEventMetadataBinder[] SelectRichMetadataBinders(IReadOnlyList private static bool IsCoreColumn(IEventTableColumn column) => column.Name is "id" or "stream_id" or "stream_key" or "version" - or "data" or "type" or "tenant_id" or "mt_dotnet_type" + or "data" or "bdata" or "type" or "tenant_id" or "mt_dotnet_type" or "timestamp"; /// diff --git a/src/Marten/EventStorage/Quick/QuickAppendEventWithVersionOperation.cs b/src/Marten/EventStorage/Quick/QuickAppendEventWithVersionOperation.cs index 473359f9c3..05913cd179 100644 --- a/src/Marten/EventStorage/Quick/QuickAppendEventWithVersionOperation.cs +++ b/src/Marten/EventStorage/Quick/QuickAppendEventWithVersionOperation.cs @@ -42,6 +42,7 @@ internal sealed class QuickAppendEventWithVersionOperation: AppendEventOperation private readonly IEventMetadataBinder[] _metadataBinders; private readonly bool _isGuidStreamIdentity; private readonly System.Func _serializeEventData; + private readonly System.Func _serializeEventBdata; public QuickAppendEventWithVersionOperation( string appendEventSqlPrefix, @@ -49,6 +50,7 @@ public QuickAppendEventWithVersionOperation( IEventMetadataBinder[] metadataBinders, bool isGuidStreamIdentity, System.Func serializeEventData, + System.Func serializeEventBdata, StreamAction stream, IEvent e) : base(stream, e) @@ -58,6 +60,7 @@ public QuickAppendEventWithVersionOperation( _metadataBinders = metadataBinders; _isGuidStreamIdentity = isGuidStreamIdentity; _serializeEventData = serializeEventData; + _serializeEventBdata = serializeEventBdata; } public override void ConfigureCommand(ICommandBuilder builder, IMartenSession session) @@ -70,6 +73,14 @@ public override void ConfigureCommand(ICommandBuilder builder, IMartenSession se pb.AppendParameter(_serializeEventData(Event), NpgsqlDbType.Jsonb); pb.AppendParameter(Event.EventTypeName, NpgsqlDbType.Varchar); pb.AppendParameter(Event.DotNetTypeName, NpgsqlDbType.Varchar); + + // #4515: bdata bytea (nullable). NULL for JSON-serialized events; + // bytes for binary-serialized events. Pinned at SELECT position 3 + // (right after mt_dotnet_type) by EventsTable.SelectColumns, so the + // bind sequence here mirrors that position. + var bdataBytes = _serializeEventBdata(Event); + pb.AppendParameter(bdataBytes ?? (object)System.DBNull.Value, NpgsqlDbType.Bytea); + pb.AppendParameter(Event.Id, NpgsqlDbType.Uuid); if (_isGuidStreamIdentity) diff --git a/src/Marten/EventStorage/Quick/QuickEventStorage.cs b/src/Marten/EventStorage/Quick/QuickEventStorage.cs index 0fa6e7793e..5266b55ca5 100644 --- a/src/Marten/EventStorage/Quick/QuickEventStorage.cs +++ b/src/Marten/EventStorage/Quick/QuickEventStorage.cs @@ -35,6 +35,7 @@ public override IStorageOperation AppendEvent(IMartenSession session, StreamActi _descriptor.AppendEventFullMetadataBinders, _descriptor.IsGuidStreamIdentity, _descriptor.SerializeEventData, + _descriptor.SerializeEventBdata, stream, @event); @@ -45,6 +46,7 @@ public override IStorageOperation QuickAppendEventWithVersion(StreamAction strea _descriptor.MetadataBinders, _descriptor.IsGuidStreamIdentity, _descriptor.SerializeEventData, + _descriptor.SerializeEventBdata, stream, @event); diff --git a/src/Marten/EventStorage/Quick/QuickEventStorageDescriptor.cs b/src/Marten/EventStorage/Quick/QuickEventStorageDescriptor.cs index a2ead32694..66e39941b2 100644 --- a/src/Marten/EventStorage/Quick/QuickEventStorageDescriptor.cs +++ b/src/Marten/EventStorage/Quick/QuickEventStorageDescriptor.cs @@ -38,13 +38,15 @@ public QuickEventStorageDescriptor( string insertStreamSql, string updateStreamVersionSql, string streamStateSelectSql, - Func serializeEventData) + Func serializeEventData, + Func serializeEventBdata) { QuickAppendEventsSql = quickAppendEventsSql; InsertStreamSql = insertStreamSql; UpdateStreamVersionSql = updateStreamVersionSql; StreamStateSelectSql = streamStateSelectSql; SerializeEventData = serializeEventData; + SerializeEventBdata = serializeEventBdata; } /// @@ -62,6 +64,18 @@ public QuickEventStorageDescriptor( public string StreamStateSelectSql { get; } public Func SerializeEventData { get; } + /// + /// #4515: serializer for the bdata bytea column on the per-event + /// QuickWithVersion INSERT shape. In Quick modes, binary event types + /// are rejected at descriptor-build time (see + /// PostgresEventStoreDialect.AssertNoBinaryEventsForQuickMode), + /// so the dialect installs a closure that always returns null — + /// bdata binds as DBNull. The slot still exists because + /// mt_events.bdata is part of the column list on every + /// full-shape INSERT. + /// + public Func SerializeEventBdata { get; } + /// Guid stream identity (writeId) vs string identity (writeKey). public bool IsGuidStreamIdentity { get; init; } diff --git a/src/Marten/EventStorage/QuickWithServerTimestamps/QuickWithServerTimestampsEventStorage.cs b/src/Marten/EventStorage/QuickWithServerTimestamps/QuickWithServerTimestampsEventStorage.cs index 4de51e089c..d1bb550404 100644 --- a/src/Marten/EventStorage/QuickWithServerTimestamps/QuickWithServerTimestampsEventStorage.cs +++ b/src/Marten/EventStorage/QuickWithServerTimestamps/QuickWithServerTimestampsEventStorage.cs @@ -33,6 +33,7 @@ public override IStorageOperation AppendEvent(IMartenSession session, StreamActi _descriptor.AppendEventFullMetadataBinders, _descriptor.IsGuidStreamIdentity, _descriptor.SerializeEventData, + _descriptor.SerializeEventBdata, stream, @event); @@ -43,6 +44,7 @@ public override IStorageOperation QuickAppendEventWithVersion(StreamAction strea _descriptor.MetadataBinders, _descriptor.IsGuidStreamIdentity, _descriptor.SerializeEventData, + _descriptor.SerializeEventBdata, stream, @event); diff --git a/src/Marten/EventStorage/QuickWithServerTimestamps/QuickWithServerTimestampsEventStorageDescriptor.cs b/src/Marten/EventStorage/QuickWithServerTimestamps/QuickWithServerTimestampsEventStorageDescriptor.cs index ec80b74389..fe8200804e 100644 --- a/src/Marten/EventStorage/QuickWithServerTimestamps/QuickWithServerTimestampsEventStorageDescriptor.cs +++ b/src/Marten/EventStorage/QuickWithServerTimestamps/QuickWithServerTimestampsEventStorageDescriptor.cs @@ -29,13 +29,15 @@ public QuickWithServerTimestampsEventStorageDescriptor( string insertStreamSql, string updateStreamVersionSql, string streamStateSelectSql, - Func serializeEventData) + Func serializeEventData, + Func serializeEventBdata) { QuickAppendEventsWithServerTimestampsSql = quickAppendEventsWithServerTimestampsSql; InsertStreamSql = insertStreamSql; UpdateStreamVersionSql = updateStreamVersionSql; StreamStateSelectSql = streamStateSelectSql; SerializeEventData = serializeEventData; + SerializeEventBdata = serializeEventBdata; } /// @@ -49,6 +51,14 @@ public QuickWithServerTimestampsEventStorageDescriptor( public string StreamStateSelectSql { get; } public Func SerializeEventData { get; } + /// + /// #4515: serializer for the bdata bytea column on the per-event + /// QuickWithVersion INSERT shape. Binary event types are rejected at + /// descriptor-build time in Quick modes, so this always returns + /// null — see . + /// + public Func SerializeEventBdata { get; } + /// Guid stream identity (writeId) vs string identity (writeKey). public bool IsGuidStreamIdentity { get; init; } diff --git a/src/Marten/EventStorage/Rich/RichAppendEventOperation.cs b/src/Marten/EventStorage/Rich/RichAppendEventOperation.cs index 6da211de98..b4a3a1cf11 100644 --- a/src/Marten/EventStorage/Rich/RichAppendEventOperation.cs +++ b/src/Marten/EventStorage/Rich/RichAppendEventOperation.cs @@ -60,6 +60,14 @@ public override void ConfigureCommand(ICommandBuilder builder, IMartenSession se pb.AppendParameter(Event.EventTypeName, NpgsqlDbType.Varchar); pb.AppendParameter(Event.DotNetTypeName, NpgsqlDbType.Varchar); + + // #4515: bdata bytea (nullable). For JSON-serialized events this is + // NULL; for binary-serialized events it carries the payload. Column + // order matches EventsTable.SelectColumns — bdata is pinned at + // position 3 right after mt_dotnet_type. + var bdataBytes = _descriptor.SerializeEventBdata(Event); + pb.AppendParameter(bdataBytes ?? (object)System.DBNull.Value, NpgsqlDbType.Bytea); + pb.AppendParameter(Event.Id, NpgsqlDbType.Uuid); // stream_id — Guid streams use Stream.Id, string streams use Stream.Key. diff --git a/src/Marten/EventStorage/Rich/RichEventStorage.cs b/src/Marten/EventStorage/Rich/RichEventStorage.cs index ee3dfc5c2e..497a818a8b 100644 --- a/src/Marten/EventStorage/Rich/RichEventStorage.cs +++ b/src/Marten/EventStorage/Rich/RichEventStorage.cs @@ -44,6 +44,7 @@ public override IStorageOperation QuickAppendEventWithVersion(StreamAction strea _descriptor.MetadataBindersWithoutSequence, _descriptor.IsGuidStreamIdentity, _descriptor.SerializeEventData, + _descriptor.SerializeEventBdata, stream, @event); diff --git a/src/Marten/EventStorage/Rich/RichEventStorageDescriptor.cs b/src/Marten/EventStorage/Rich/RichEventStorageDescriptor.cs index 6a2cbdf16a..3a71eaf07f 100644 --- a/src/Marten/EventStorage/Rich/RichEventStorageDescriptor.cs +++ b/src/Marten/EventStorage/Rich/RichEventStorageDescriptor.cs @@ -26,6 +26,7 @@ public RichEventStorageDescriptor( string updateStreamVersionSql, string streamStateSelectSql, Func serializeEventData, + Func serializeEventBdata, IEventMetadataBinder[] metadataBinders) { AppendEventSqlPrefix = appendEventSqlPrefix; @@ -34,6 +35,7 @@ public RichEventStorageDescriptor( UpdateStreamVersionSql = updateStreamVersionSql; StreamStateSelectSql = streamStateSelectSql; SerializeEventData = serializeEventData; + SerializeEventBdata = serializeEventBdata; MetadataBinders = metadataBinders; } @@ -42,8 +44,23 @@ public RichEventStorageDescriptor( public string InsertStreamSql { get; } public string UpdateStreamVersionSql { get; } public string StreamStateSelectSql { get; } + + /// + /// Serializer for the data jsonb column. Returns the full JSON + /// payload for JSON-serialized events and the literal {} + /// placeholder for binary-serialized events (the real payload lives in + /// bdata in that case — see ). + /// public Func SerializeEventData { get; } + /// + /// #4515: serializer for the bdata bytea column. Returns the + /// serialized bytes for binary-serialized events; returns null + /// (bound as ) for JSON-serialized + /// events. + /// + public Func SerializeEventBdata { get; } + /// /// Ordered metadata-column binders. Rich-mode only — Quick-mode /// descriptors don't expose this because Quick's metadata binding is diff --git a/src/Marten/Events/BinaryEventAttribute.cs b/src/Marten/Events/BinaryEventAttribute.cs new file mode 100644 index 0000000000..97f5aa354a --- /dev/null +++ b/src/Marten/Events/BinaryEventAttribute.cs @@ -0,0 +1,32 @@ +#nullable enable +using System; + +namespace Marten.Events; + +/// +/// Marks an event type as binary-serialized — its data column in +/// mt_events is populated with the '{}'::jsonb placeholder and +/// the actual payload lives in bdata, serialized by an +/// . See +/// #4515. +/// +/// +/// +/// The serializer used for an attribute-marked type is resolved at +/// registration time: opts.Events.DefaultBinarySerializer is the +/// fallback when no explicit per-type serializer was wired via +/// opts.Events.UseBinarySerializer<TEvent>(serializer). If +/// the type is attribute-marked but neither a per-type nor a store-wide +/// serializer is configured, the store will throw at the first append. +/// +/// +/// JSON-serialized events and binary-serialized events coexist in the +/// same table on a per-event-type basis, so applying this attribute to +/// a single event type is a safe in-place change — existing JSON rows +/// continue to read through the JSON path. +/// +/// +[AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct, Inherited = false)] +public sealed class BinaryEventAttribute: Attribute +{ +} diff --git a/src/Marten/Events/EventDocumentStorage.cs b/src/Marten/Events/EventDocumentStorage.cs index b31f1ca8cc..8f7bad3d7d 100644 --- a/src/Marten/Events/EventDocumentStorage.cs +++ b/src/Marten/Events/EventDocumentStorage.cs @@ -321,7 +321,31 @@ public IEvent Resolve(DbDataReader reader) } } - var @event = mapping.ReadEventData(_serializer, reader); + // #4515: per-row JSON-vs-binary dispatch. EventsTable pins bdata at + // column ordinal 3 right after data/type/mt_dotnet_type. bdata IS NULL + // means JSON-serialized event (existing path); non-null bytes mean + // binary-serialized — deserialize via the mapping's registered + // IEventBinarySerializer. + IEvent @event; + if (!reader.IsDBNull(3)) + { + if (mapping.BinarySerializer is null) + { + throw new InvalidOperationException( + $"Event row at mt_events.bdata is non-null but no IEventBinarySerializer is registered " + + $"for type '{mapping.DocumentType.FullName}'. Configure with " + + $"opts.Events.UseBinarySerializer<{mapping.DocumentType.Name}>(...) " + + $"or set opts.Events.DefaultBinarySerializer."); + } + + var bytes = reader.GetFieldValue(3); + var data = mapping.BinarySerializer.Deserialize(mapping.DocumentType, bytes); + @event = mapping.Wrap(data); + } + else + { + @event = mapping.ReadEventData(_serializer, reader); + } ApplyReaderDataToEvent(reader, @event); @@ -354,11 +378,42 @@ public async Task ResolveAsync(DbDataReader reader, CancellationToken to IEvent @event; try { - @event = await mapping.ReadEventDataAsync(_serializer, reader, token).ConfigureAwait(false); + // #4515: same per-row dispatch as the sync Resolve overload — bdata + // at ordinal 3 picks JSON-vs-binary deserialization. + if (!await reader.IsDBNullAsync(3, token).ConfigureAwait(false)) + { + if (mapping.BinarySerializer is null) + { + throw new InvalidOperationException( + $"Event row at mt_events.bdata is non-null but no IEventBinarySerializer is registered " + + $"for type '{mapping.DocumentType.FullName}'. Configure with " + + $"opts.Events.UseBinarySerializer<{mapping.DocumentType.Name}>(...) " + + $"or set opts.Events.DefaultBinarySerializer."); + } + + var bytes = await reader.GetFieldValueAsync(3, token).ConfigureAwait(false); + var data = mapping.BinarySerializer.Deserialize(mapping.DocumentType, bytes); + @event = mapping.Wrap(data); + } + else + { + @event = await mapping.ReadEventDataAsync(_serializer, reader, token).ConfigureAwait(false); + } } catch (Exception e) { - var sequence = await reader.GetFieldValueAsync(3, token).ConfigureAwait(false); + // #4515: mt_events.seq_id shifted from ordinal 3 to ordinal 4 after + // bdata's insertion (EventsTable.SelectColumns now pins data, type, + // mt_dotnet_type, bdata, seq_id at 0..4). + long sequence; + try + { + sequence = await reader.GetFieldValueAsync(4, token).ConfigureAwait(false); + } + catch + { + sequence = -1; + } throw new EventDeserializationFailureException(sequence, mapping, e); } diff --git a/src/Marten/Events/EventGraph.cs b/src/Marten/Events/EventGraph.cs index acf5f5b8e2..664f820c18 100644 --- a/src/Marten/Events/EventGraph.cs +++ b/src/Marten/Events/EventGraph.cs @@ -333,6 +333,68 @@ public override void AddEventType(Type eventType) _events.FillDefault(eventType); } + // #4515: per-event-type binary serializer registry. EventMapping's + // constructor calls ResolveBinarySerializerFor when the mapping is built + // (lazily on first use); UseBinarySerializer populates this dictionary + // ahead of time so the resolution lands on the registered instance. + private readonly System.Collections.Concurrent.ConcurrentDictionary _binarySerializerByType = new(); + + /// + public IEventBinarySerializer? DefaultBinarySerializer { get; set; } + + /// + public IEventStoreOptions UseBinarySerializer(IEventBinarySerializer serializer) + { + if (serializer == null) throw new ArgumentNullException(nameof(serializer)); + + var eventType = typeof(TEvent); + _binarySerializerByType[eventType] = serializer; + + // Make sure the mapping exists and is wired with this serializer. + // EventMapping reads its BinarySerializer from + // ResolveBinarySerializerFor in its constructor, so if the mapping + // already exists (e.g. another store-options call referenced the type + // first) we need to refresh its serializer reference here too. + AddEventType(eventType); + var mapping = EventMappingFor(eventType); + if (mapping is not null) + { + mapping.BinarySerializer = serializer; + } + + return this; + } + + /// + /// #4515: resolve the binary serializer for an event type. Called from + /// 's constructor. Precedence: explicit + /// per-type registration via + /// beats + . + /// Returns null for plain JSON events. + /// + internal IEventBinarySerializer? ResolveBinarySerializerFor(Type eventType) + { + if (_binarySerializerByType.TryGetValue(eventType, out var explicitSerializer)) + { + return explicitSerializer; + } + + if (eventType.IsDefined(typeof(BinaryEventAttribute), inherit: false)) + { + if (DefaultBinarySerializer is null) + { + throw new InvalidOperationException( + $"Event type '{eventType.FullName}' is marked with [BinaryEvent] but no IEventBinarySerializer was registered. " + + $"Either call opts.Events.UseBinarySerializer<{eventType.Name}>(...) explicitly, " + + $"or set opts.Events.DefaultBinarySerializer to a store-wide fallback."); + } + + return DefaultBinarySerializer; + } + + return null; + } + /// /// Register an event type with Marten. This isn't strictly necessary for normal usage, /// but can help Marten with asynchronous projections where Marten hasn't yet encountered diff --git a/src/Marten/Events/EventMapping.cs b/src/Marten/Events/EventMapping.cs index 0ba723780f..a2dd09cebe 100644 --- a/src/Marten/Events/EventMapping.cs +++ b/src/Marten/Events/EventMapping.cs @@ -50,6 +50,12 @@ protected EventMapping(EventGraph parent, Type eventType) : base(eventType) _parent = parent; DocumentType = eventType; + // #4515: pick up binary-serializer wiring at construction. Either an + // explicit per-type registration via UseBinarySerializer(...) or + // the BinaryEventAttribute + store-wide DefaultBinarySerializer. + // Null = plain JSON-serialized event (the existing path). + BinarySerializer = parent.ResolveBinarySerializerFor(eventType); + IdMember = DocumentType.GetProperty(nameof(IEvent.Id))!; _inner = new DocumentMapping(eventType, parent.Options); @@ -66,6 +72,24 @@ protected EventMapping(EventGraph parent, Type eventType) : base(eventType) JsonTransformation(null); } + /// + /// #4515: binary serializer for this event type, or null for the + /// standard JSON path. When non-null, write operations route the payload + /// into mt_events.bdata (bytea); read operations dispatch on the + /// row's bdata IS NULL state so JSON-serialized rows for the same + /// event type still read correctly. + /// + [IgnoreDescription] + public IEventBinarySerializer? BinarySerializer { get; internal set; } + + /// + /// #4515: true when this event type is opted in to binary + /// serialization on the write path (a is + /// wired). Read-path dispatch remains row-by-row on bdata's NULL + /// state so pre-opt-in JSON rows continue to read through the JSON path. + /// + public bool IsBinary => BinarySerializer is not null; + [IgnoreDescription] public Func ReadEventData { get; private set; } diff --git a/src/Marten/Events/IEventBinarySerializer.cs b/src/Marten/Events/IEventBinarySerializer.cs new file mode 100644 index 0000000000..c57c1f79e9 --- /dev/null +++ b/src/Marten/Events/IEventBinarySerializer.cs @@ -0,0 +1,44 @@ +#nullable enable +using System; + +namespace Marten.Events; + +/// +/// Pluggable binary serializer for event data — addresses +/// #4515. +/// Allows individual event types to opt out of jsonb serialization +/// in favor of a binary wire format (MemoryPack, MessagePack, etc.). +/// +/// +/// +/// Binary serialization is enabled per event type, not +/// store-wide. A store can have JSON events and binary events mixed in +/// the same mt_events table; the row's serialization format is +/// determined by the bdata column being NULL (JSON) or +/// non-null (binary). This makes the feature safe to roll out on an +/// existing store with no migration of existing event data. +/// +/// +/// Opt in by either marking an event type with +/// or registering it through +/// opts.Events.UseBinarySerializer<TEvent>(serializer). +/// Event types without a per-type serializer fall back to the +/// store-wide opts.Events.DefaultBinarySerializer if one is set. +/// +/// +public interface IEventBinarySerializer +{ + /// + /// Serialize an event data instance to bytes. + /// + /// The runtime CLR type of the event data. + /// The event data to serialize. + byte[] Serialize(Type type, object data); + + /// + /// Deserialize bytes back into an event data instance. + /// + /// The target CLR type to deserialize into. + /// The bytes previously produced by . + object Deserialize(Type type, byte[] data); +} diff --git a/src/Marten/Events/IEventStoreOptions.cs b/src/Marten/Events/IEventStoreOptions.cs index cd1cf8c4dc..5f2f7a2b85 100644 --- a/src/Marten/Events/IEventStoreOptions.cs +++ b/src/Marten/Events/IEventStoreOptions.cs @@ -199,6 +199,27 @@ public interface IEventStoreOptions /// void AddEventTypes(IEnumerable types); + /// + /// Store-wide fallback used for event types + /// marked with when no explicit per-type + /// serializer was wired via . Default + /// is null; setting this is what makes attribute-only opt-in work for + /// the common case of one binary serializer per store. See #4515. + /// + public IEventBinarySerializer? DefaultBinarySerializer { get; set; } + + /// + /// Opt a single event type into binary serialization (#4515). The event's + /// payload is written to the bdata bytea column instead of the + /// data jsonb column; existing JSON rows for the same type continue + /// to read through the JSON path. Calling this also adds the event type to + /// the registry (no separate call needed). + /// + /// CLR event type to opt in. + /// Per-type serializer to use for this event. + /// Event store options, to allow fluent definition. + IEventStoreOptions UseBinarySerializer(IEventBinarySerializer serializer); + /// /// Maps CLR event type as particular event type name. This is useful for event type migration. /// See more in documentation diff --git a/src/Marten/Events/Schema/EventBdataColumn.cs b/src/Marten/Events/Schema/EventBdataColumn.cs new file mode 100644 index 0000000000..caa7143ee9 --- /dev/null +++ b/src/Marten/Events/Schema/EventBdataColumn.cs @@ -0,0 +1,54 @@ +#nullable enable +using System.Data.Common; +using JasperFx.Events; +using Marten.Services; +using Weasel.Postgresql.Tables; + +namespace Marten.Events.Schema; + +/// +/// Sibling to for binary event payloads +/// (#4515). +/// For event types opted in to binary serialization, the JSON data +/// column holds the literal '{}'::jsonb placeholder and the actual +/// payload lives here as raw bytes. +/// +/// +/// +/// Nullable on purpose: when an event row uses JSON serialization, +/// data holds the full payload and bdata is NULL. +/// The presence of a non-null bdata is the on-row discriminator. +/// This is what makes the feature additive — existing JSON rows in an +/// upgraded store have bdata = NULL and continue to read through +/// the JSON path. +/// +/// +/// Read-path note: the bytes are consumed up in +/// +/// / +/// +/// (the per-row JSON-vs-binary dispatch point), so this column's +/// ReadValueSync / ReadValueAsync are deliberately +/// no-ops in the per-column metadata loop — they'd otherwise re-read +/// the same bytes for nothing. +/// +/// +internal sealed class EventBdataColumn: TableColumn, IEventTableColumn +{ + public EventBdataColumn(): base("bdata", "bytea") + { + AllowNulls = true; + } + + public string ValueSql(EventGraph graph, AppendMode mode) => "?"; + + void IEventTableColumn.ReadValueSync(DbDataReader reader, int index, IEvent @event) + { + // No-op — consumed in EventDocumentStorage.Resolve to choose the + // JSON-vs-binary deserialization path. + } + + System.Threading.Tasks.Task IEventTableColumn.ReadValueAsync( + DbDataReader reader, int index, IEvent @event, System.Threading.CancellationToken cancellation) + => System.Threading.Tasks.Task.CompletedTask; +} diff --git a/src/Marten/Events/Schema/EventsTable.cs b/src/Marten/Events/Schema/EventsTable.cs index 5ed308c407..c42083ed8e 100644 --- a/src/Marten/Events/Schema/EventsTable.cs +++ b/src/Marten/Events/Schema/EventsTable.cs @@ -23,6 +23,10 @@ public EventsTable(EventGraph events): base(new PostgresqlObjectName(events.Data AddColumn(new VersionColumn()); AddColumn(); + // #4515: nullable sibling to `data` for binary event payloads. NULL + // for JSON-serialized events; bytes (e.g. MemoryPack) for events + // opted in via [BinaryEvent] / opts.Events.UseBinarySerializer(...). + AddColumn(); AddColumn(); AddColumn(new EventTableColumn("timestamp", x => x.Timestamp)) .NotNull().DefaultValueByString("(now())"); @@ -151,6 +155,11 @@ internal IList SelectColumns() var data = columns.OfType().Single(); var typeName = columns.OfType().Single(); var dotNetTypeName = columns.OfType().Single(); + // #4515: bdata (nullable bytea) for binary event payloads. Pinned at + // position 3 in the SELECT projection so EventDocumentStorage.Resolve + // can pick JSON-vs-binary deserialization from a stable ordinal + // before the per-column metadata loop runs. + var bdata = columns.OfType().Single(); columns.Remove(data); columns.Insert(0, data); @@ -158,6 +167,8 @@ internal IList SelectColumns() columns.Insert(1, typeName); columns.Remove(dotNetTypeName); columns.Insert(2, dotNetTypeName); + columns.Remove(bdata); + columns.Insert(3, bdata); return columns; }