diff --git a/docs/events/configuration.md b/docs/events/configuration.md index fe0e58d8af..768a3b504a 100644 --- a/docs/events/configuration.md +++ b/docs/events/configuration.md @@ -62,3 +62,37 @@ var store = DocumentStore.For(opts => ``` snippet source | anchor + +## Big Integer Event Sequences + +By default, Marten's internal PostgreSQL functions (`mt_quick_append_events`) use standard 32-bit `int` types for +event version numbers, sequence values, and return types. This works well for the vast majority of systems, but +the `int` type has a maximum value of approximately 2.1 billion. For very high-volume event stores that may +exceed this threshold, Marten provides an opt-in flag to switch these functions to use 64-bit `bigint` types instead: + +```cs +var store = DocumentStore.For(opts => +{ + opts.Connection("some connection string"); + + // Opt into bigint (64-bit) types for event sequences and versions + // in the PostgreSQL event append functions + opts.Events.EnableBigIntEvents = true; +}); +``` + +When `EnableBigIntEvents` is `true`, the generated `mt_quick_append_events` function will declare its +`event_version`, `seq`, and `return_value` variables as `bigint` instead of `int`, and return `bigint[]` +instead of `int[]`. This prevents integer overflow errors when the global event sequence counter surpasses +the ~2.1 billion limit of a 32-bit integer. + +::: warning +Enabling this flag will cause Marten to regenerate the `mt_quick_append_events` function with different +type signatures. This means a schema migration will be required when you first enable the flag. Marten's +normal schema migration tooling will handle this automatically. +::: + +::: info +The `EnableBigIntEvents` flag is `false` by default in Marten 8.x for backward compatibility. Starting +in **Marten 9.0**, this flag will default to `true`. +::: diff --git a/src/EventSourcingTests/Bugs/Bug_4246_enable_bigint_events.cs b/src/EventSourcingTests/Bugs/Bug_4246_enable_bigint_events.cs new file mode 100644 index 0000000000..507c99c13c --- /dev/null +++ b/src/EventSourcingTests/Bugs/Bug_4246_enable_bigint_events.cs @@ -0,0 +1,194 @@ +using System; +using System.Threading.Tasks; +using JasperFx.Events; +using Marten; +using Marten.Events; +using Marten.Testing.Harness; +using Npgsql; +using Shouldly; +using Xunit; + +namespace EventSourcingTests.Bugs; + +/// +/// Tests for the EnableBigIntEvents flag that controls whether +/// mt_quick_append_events uses bigint (64-bit) types for version, +/// sequence, and return values. Without this flag, sequence values +/// exceeding int32 range (~2.1B) cause integer out of range errors. +/// +public class Bug_4246_enable_bigint_events : OneOffConfigurationsContext +{ + [Fact] + public async Task events_work_normally_with_bigint_disabled() + { + StoreOptions(opts => + { + opts.Events.AppendMode = EventAppendMode.Quick; + // EnableBigIntEvents is false by default + }); + + var streamId = Guid.NewGuid(); + await using (var session = theStore.LightweightSession()) + { + session.Events.StartStream(streamId, + new QuestStarted { Name = "Quest 1" }, + new MembersJoined { Members = new[] { "Frodo", "Sam" } }); + await session.SaveChangesAsync(); + } + + await using (var session = theStore.LightweightSession()) + { + session.Events.Append(streamId, + new MembersDeparted { Members = new[] { "Sam" } }); + await session.SaveChangesAsync(); + } + + await using (var query = theStore.QuerySession()) + { + var events = await query.Events.FetchStreamAsync(streamId); + events.Count.ShouldBe(3); + events[0].Version.ShouldBe(1); + events[1].Version.ShouldBe(2); + events[2].Version.ShouldBe(3); + } + } + + [Fact] + public async Task events_work_normally_with_bigint_enabled() + { + StoreOptions(opts => + { + opts.Events.AppendMode = EventAppendMode.Quick; + opts.Events.EnableBigIntEvents = true; + }); + + var streamId = Guid.NewGuid(); + await using (var session = theStore.LightweightSession()) + { + session.Events.StartStream(streamId, + new QuestStarted { Name = "Quest 2" }, + new MembersJoined { Members = new[] { "Aragorn", "Legolas" } }); + await session.SaveChangesAsync(); + } + + await using (var session = theStore.LightweightSession()) + { + session.Events.Append(streamId, + new MembersDeparted { Members = new[] { "Legolas" } }); + await session.SaveChangesAsync(); + } + + await using (var query = theStore.QuerySession()) + { + var events = await query.Events.FetchStreamAsync(streamId); + events.Count.ShouldBe(3); + events[0].Version.ShouldBe(1); + events[1].Version.ShouldBe(2); + events[2].Version.ShouldBe(3); + } + } + + [Fact] + public async Task bigint_enabled_handles_sequences_above_int32_max() + { + StoreOptions(opts => + { + opts.Events.AppendMode = EventAppendMode.Quick; + opts.Events.EnableBigIntEvents = true; + }); + + var schemaName = theStore.Options.DatabaseSchemaName; + const long largeSequence = 2_200_000_000L; + + // Append first event at normal sequence + var streamId = Guid.NewGuid(); + await using (var session = theStore.LightweightSession()) + { + session.Events.StartStream(streamId, + new QuestStarted { Name = "BigInt Test" }); + await session.SaveChangesAsync(); + } + + // Jump the sequence past int32 max + await using var conn = new NpgsqlConnection(ConnectionSource.ConnectionString); + await conn.OpenAsync(); + var restartCmd = conn.CreateCommand(); + restartCmd.CommandText = $"ALTER SEQUENCE {schemaName}.mt_events_sequence RESTART WITH {largeSequence}"; + await restartCmd.ExecuteNonQueryAsync(); + await conn.CloseAsync(); + + // Append second event — this would fail with int overflow without bigint + await using (var session = theStore.LightweightSession()) + { + session.Events.Append(streamId, + new MembersJoined { Members = new[] { "Gandalf" } }); + await session.SaveChangesAsync(); + } + + // Verify both events exist with correct sequences + await using (var query = theStore.QuerySession()) + { + var events = await query.Events.FetchStreamAsync(streamId); + events.Count.ShouldBe(2); + events[0].Sequence.ShouldBe(1L); + events[1].Sequence.ShouldBe(largeSequence); + } + } + + [Fact] + public void bigint_events_is_false_by_default() + { + var opts = new StoreOptions(); + opts.Events.EnableBigIntEvents.ShouldBeFalse(); + } + + [Fact] + public async Task function_uses_int_when_flag_is_false() + { + StoreOptions(opts => + { + opts.Events.AppendMode = EventAppendMode.Quick; + // EnableBigIntEvents defaults to false + }); + + await theStore.Storage.Database.EnsureStorageExistsAsync(typeof(IEvent), default); + + // Check the function DDL contains int types + var ddl = theStore.Storage.Database.ToDatabaseScript(); + // The function should use "int" not "bigint" when flag is off + // (DDL only shows pending changes, so verify via function definition) + await using var conn = theStore.Storage.Database.CreateConnection(); + await conn.OpenAsync(); + var schema = theStore.Options.DatabaseSchemaName; + + await using var cmd = conn.CreateCommand(); + cmd.CommandText = $"SELECT pg_get_functiondef(oid) FROM pg_proc WHERE proname = 'mt_quick_append_events' AND pronamespace = (SELECT oid FROM pg_namespace WHERE nspname = '{schema}')"; + var funcDef = (string?)await cmd.ExecuteScalarAsync(); + + funcDef.ShouldNotBeNull(); + funcDef.ShouldContain("integer[]"); // Returns int[] when bigint disabled + } + + [Fact] + public async Task function_uses_bigint_when_flag_is_true() + { + StoreOptions(opts => + { + opts.Events.AppendMode = EventAppendMode.Quick; + opts.Events.EnableBigIntEvents = true; + }); + + await theStore.Storage.Database.EnsureStorageExistsAsync(typeof(IEvent), default); + + await using var conn = theStore.Storage.Database.CreateConnection(); + await conn.OpenAsync(); + var schema = theStore.Options.DatabaseSchemaName; + + await using var cmd = conn.CreateCommand(); + cmd.CommandText = $"SELECT pg_get_functiondef(oid) FROM pg_proc WHERE proname = 'mt_quick_append_events' AND pronamespace = (SELECT oid FROM pg_namespace WHERE nspname = '{schema}')"; + var funcDef = (string?)await cmd.ExecuteScalarAsync(); + + funcDef.ShouldNotBeNull(); + funcDef.ShouldContain("bigint[]"); // Returns bigint[] when enabled + } +} diff --git a/src/Marten/Events/EventGraph.cs b/src/Marten/Events/EventGraph.cs index 5d1f066bcc..57e1832dc9 100644 --- a/src/Marten/Events/EventGraph.cs +++ b/src/Marten/Events/EventGraph.cs @@ -220,6 +220,14 @@ public override IEvent BuildEvent(object eventData) /// public bool EnableEventTypeIndex { get; set; } = false; + /// + /// Opt into using bigint (64-bit) types for event version, sequence, and return + /// values in the mt_quick_append_events and mt_get_next_hi PostgreSQL functions. + /// This prevents integer overflow when sequence values exceed int32 range (~2.1 billion). + /// Default is false for backward compatibility. Will become true by default in Marten 9.0. + /// + public bool EnableBigIntEvents { get; set; } = false; + public bool EnableSideEffectsOnInlineProjections { get; set; } = false; /// diff --git a/src/Marten/Events/IEventStoreOptions.cs b/src/Marten/Events/IEventStoreOptions.cs index 526d8233bf..273c7d620f 100644 --- a/src/Marten/Events/IEventStoreOptions.cs +++ b/src/Marten/Events/IEventStoreOptions.cs @@ -71,6 +71,13 @@ public interface IEventStoreOptions /// public bool EnableEventTypeIndex { get; set; } + /// + /// Opt into using bigint (64-bit) types for event version, sequence, and return + /// values in PostgreSQL functions. Prevents integer overflow when sequence values + /// exceed int32 range. Default is false. Will become true in Marten 9.0. + /// + public bool EnableBigIntEvents { get; set; } + public EventAppendMode AppendMode { get; set; } /// diff --git a/src/Marten/Events/Schema/QuickAppendEventFunction.cs b/src/Marten/Events/Schema/QuickAppendEventFunction.cs index b51e6b441a..bb1e9b6af4 100644 --- a/src/Marten/Events/Schema/QuickAppendEventFunction.cs +++ b/src/Marten/Events/Schema/QuickAppendEventFunction.cs @@ -97,17 +97,22 @@ public override void WriteCreateStatement(Migrator migrator, TextWriter writer) } } + // When EnableBigIntEvents is true, use bigint for version/sequence/return + // to prevent integer overflow when sequences exceed int32 range (~2.1B) + var intType = _events.EnableBigIntEvents ? "bigint" : "int"; + var returnType = _events.EnableBigIntEvents ? "bigint[]" : "int[]"; + writer.WriteLine($@" -CREATE OR REPLACE FUNCTION {Identifier}(stream {streamIdType}, stream_type varchar, tenantid varchar, event_ids uuid[], event_types varchar[], dotnet_types varchar[], bodies jsonb[]{metadataParameters}{tagParameters}) RETURNS bigint[] AS $$ +CREATE OR REPLACE FUNCTION {Identifier}(stream {streamIdType}, stream_type varchar, tenantid varchar, event_ids uuid[], event_types varchar[], dotnet_types varchar[], bodies jsonb[]{metadataParameters}{tagParameters}) RETURNS {returnType} AS $$ DECLARE - event_version bigint; + event_version {intType}; event_type varchar; event_id uuid; body jsonb; index int; - seq bigint; + seq {intType}; actual_tenant varchar; - return_value bigint[]; + return_value {returnType}; BEGIN select version into event_version from {databaseSchema}.mt_streams where {streamsWhere}; if event_version IS NULL then