Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions docs/events/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,37 @@ var store = DocumentStore.For(opts =>
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.Testing/Examples/ConfiguringDocumentStore.cs#L235-L245' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_making_the_events_multi_tenanted' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Big Integer Event Sequences <Badge type="tip" text="8.x" />

By default, Marten's internal PostgreSQL functions (`mt_quick_append_events`) use standard 32-bit `int` types for
event version numbers, sequence values, and return types. This works well for the vast majority of systems, but
the `int` type has a maximum value of approximately 2.1 billion. For very high-volume event stores that may
exceed this threshold, Marten provides an opt-in flag to switch these functions to use 64-bit `bigint` types instead:

```cs
var store = DocumentStore.For(opts =>
{
opts.Connection("some connection string");

// Opt into bigint (64-bit) types for event sequences and versions
// in the PostgreSQL event append functions
opts.Events.EnableBigIntEvents = true;
});
```

When `EnableBigIntEvents` is `true`, the generated `mt_quick_append_events` function will declare its
`event_version`, `seq`, and `return_value` variables as `bigint` instead of `int`, and return `bigint[]`
instead of `int[]`. This prevents integer overflow errors when the global event sequence counter surpasses
the ~2.1 billion limit of a 32-bit integer.

::: warning
Enabling this flag will cause Marten to regenerate the `mt_quick_append_events` function with different
type signatures. This means a schema migration will be required when you first enable the flag. Marten's
normal schema migration tooling will handle this automatically.
:::

::: info
The `EnableBigIntEvents` flag is `false` by default in Marten 8.x for backward compatibility. Starting
in **Marten 9.0**, this flag will default to `true`.
:::
194 changes: 194 additions & 0 deletions src/EventSourcingTests/Bugs/Bug_4246_enable_bigint_events.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
using System;
using System.Threading.Tasks;
using JasperFx.Events;
using Marten;
using Marten.Events;
using Marten.Testing.Harness;
using Npgsql;
using Shouldly;
using Xunit;

namespace EventSourcingTests.Bugs;

/// <summary>
/// Tests for the EnableBigIntEvents flag that controls whether
/// mt_quick_append_events uses bigint (64-bit) types for version,
/// sequence, and return values. Without this flag, sequence values
/// exceeding int32 range (~2.1B) cause integer out of range errors.
/// </summary>
public class Bug_4246_enable_bigint_events : OneOffConfigurationsContext
{
[Fact]
public async Task events_work_normally_with_bigint_disabled()
{
StoreOptions(opts =>
{
opts.Events.AppendMode = EventAppendMode.Quick;
// EnableBigIntEvents is false by default
});

var streamId = Guid.NewGuid();
await using (var session = theStore.LightweightSession())
{
session.Events.StartStream(streamId,
new QuestStarted { Name = "Quest 1" },
new MembersJoined { Members = new[] { "Frodo", "Sam" } });
await session.SaveChangesAsync();
}

await using (var session = theStore.LightweightSession())
{
session.Events.Append(streamId,
new MembersDeparted { Members = new[] { "Sam" } });
await session.SaveChangesAsync();
}

await using (var query = theStore.QuerySession())
{
var events = await query.Events.FetchStreamAsync(streamId);
events.Count.ShouldBe(3);
events[0].Version.ShouldBe(1);
events[1].Version.ShouldBe(2);
events[2].Version.ShouldBe(3);
}
}

[Fact]
public async Task events_work_normally_with_bigint_enabled()
{
StoreOptions(opts =>
{
opts.Events.AppendMode = EventAppendMode.Quick;
opts.Events.EnableBigIntEvents = true;
});

var streamId = Guid.NewGuid();
await using (var session = theStore.LightweightSession())
{
session.Events.StartStream(streamId,
new QuestStarted { Name = "Quest 2" },
new MembersJoined { Members = new[] { "Aragorn", "Legolas" } });
await session.SaveChangesAsync();
}

await using (var session = theStore.LightweightSession())
{
session.Events.Append(streamId,
new MembersDeparted { Members = new[] { "Legolas" } });
await session.SaveChangesAsync();
}

await using (var query = theStore.QuerySession())
{
var events = await query.Events.FetchStreamAsync(streamId);
events.Count.ShouldBe(3);
events[0].Version.ShouldBe(1);
events[1].Version.ShouldBe(2);
events[2].Version.ShouldBe(3);
}
}

[Fact]
public async Task bigint_enabled_handles_sequences_above_int32_max()
{
StoreOptions(opts =>
{
opts.Events.AppendMode = EventAppendMode.Quick;
opts.Events.EnableBigIntEvents = true;
});

var schemaName = theStore.Options.DatabaseSchemaName;
const long largeSequence = 2_200_000_000L;

// Append first event at normal sequence
var streamId = Guid.NewGuid();
await using (var session = theStore.LightweightSession())
{
session.Events.StartStream(streamId,
new QuestStarted { Name = "BigInt Test" });
await session.SaveChangesAsync();
}

// Jump the sequence past int32 max
await using var conn = new NpgsqlConnection(ConnectionSource.ConnectionString);
await conn.OpenAsync();
var restartCmd = conn.CreateCommand();
restartCmd.CommandText = $"ALTER SEQUENCE {schemaName}.mt_events_sequence RESTART WITH {largeSequence}";
await restartCmd.ExecuteNonQueryAsync();
await conn.CloseAsync();

// Append second event — this would fail with int overflow without bigint
await using (var session = theStore.LightweightSession())
{
session.Events.Append(streamId,
new MembersJoined { Members = new[] { "Gandalf" } });
await session.SaveChangesAsync();
}

// Verify both events exist with correct sequences
await using (var query = theStore.QuerySession())
{
var events = await query.Events.FetchStreamAsync(streamId);
events.Count.ShouldBe(2);
events[0].Sequence.ShouldBe(1L);
events[1].Sequence.ShouldBe(largeSequence);
}
}

[Fact]
public void bigint_events_is_false_by_default()
{
var opts = new StoreOptions();
opts.Events.EnableBigIntEvents.ShouldBeFalse();
}

[Fact]
public async Task function_uses_int_when_flag_is_false()
{
StoreOptions(opts =>
{
opts.Events.AppendMode = EventAppendMode.Quick;
// EnableBigIntEvents defaults to false
});

await theStore.Storage.Database.EnsureStorageExistsAsync(typeof(IEvent), default);

// Check the function DDL contains int types
var ddl = theStore.Storage.Database.ToDatabaseScript();
// The function should use "int" not "bigint" when flag is off
// (DDL only shows pending changes, so verify via function definition)
await using var conn = theStore.Storage.Database.CreateConnection();
await conn.OpenAsync();
var schema = theStore.Options.DatabaseSchemaName;

await using var cmd = conn.CreateCommand();
cmd.CommandText = $"SELECT pg_get_functiondef(oid) FROM pg_proc WHERE proname = 'mt_quick_append_events' AND pronamespace = (SELECT oid FROM pg_namespace WHERE nspname = '{schema}')";
var funcDef = (string?)await cmd.ExecuteScalarAsync();

funcDef.ShouldNotBeNull();
funcDef.ShouldContain("integer[]"); // Returns int[] when bigint disabled
}

[Fact]
public async Task function_uses_bigint_when_flag_is_true()
{
StoreOptions(opts =>
{
opts.Events.AppendMode = EventAppendMode.Quick;
opts.Events.EnableBigIntEvents = true;
});

await theStore.Storage.Database.EnsureStorageExistsAsync(typeof(IEvent), default);

await using var conn = theStore.Storage.Database.CreateConnection();
await conn.OpenAsync();
var schema = theStore.Options.DatabaseSchemaName;

await using var cmd = conn.CreateCommand();
cmd.CommandText = $"SELECT pg_get_functiondef(oid) FROM pg_proc WHERE proname = 'mt_quick_append_events' AND pronamespace = (SELECT oid FROM pg_namespace WHERE nspname = '{schema}')";
var funcDef = (string?)await cmd.ExecuteScalarAsync();

funcDef.ShouldNotBeNull();
funcDef.ShouldContain("bigint[]"); // Returns bigint[] when enabled
}
}
8 changes: 8 additions & 0 deletions src/Marten/Events/EventGraph.cs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,14 @@ public override IEvent BuildEvent(object eventData)
/// </summary>
public bool EnableEventTypeIndex { get; set; } = false;

/// <summary>
/// Opt into using bigint (64-bit) types for event version, sequence, and return
/// values in the mt_quick_append_events and mt_get_next_hi PostgreSQL functions.
/// This prevents integer overflow when sequence values exceed int32 range (~2.1 billion).
/// Default is false for backward compatibility. Will become true by default in Marten 9.0.
/// </summary>
public bool EnableBigIntEvents { get; set; } = false;

public bool EnableSideEffectsOnInlineProjections { get; set; } = false;

/// <summary>
Expand Down
7 changes: 7 additions & 0 deletions src/Marten/Events/IEventStoreOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ public interface IEventStoreOptions
/// </summary>
public bool EnableEventTypeIndex { get; set; }

/// <summary>
/// Opt into using bigint (64-bit) types for event version, sequence, and return
/// values in PostgreSQL functions. Prevents integer overflow when sequence values
/// exceed int32 range. Default is false. Will become true in Marten 9.0.
/// </summary>
public bool EnableBigIntEvents { get; set; }

public EventAppendMode AppendMode { get; set; }

/// <summary>
Expand Down
13 changes: 9 additions & 4 deletions src/Marten/Events/Schema/QuickAppendEventFunction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,22 @@ public override void WriteCreateStatement(Migrator migrator, TextWriter writer)
}
}

// When EnableBigIntEvents is true, use bigint for version/sequence/return
// to prevent integer overflow when sequences exceed int32 range (~2.1B)
var intType = _events.EnableBigIntEvents ? "bigint" : "int";
var returnType = _events.EnableBigIntEvents ? "bigint[]" : "int[]";

writer.WriteLine($@"
CREATE OR REPLACE FUNCTION {Identifier}(stream {streamIdType}, stream_type varchar, tenantid varchar, event_ids uuid[], event_types varchar[], dotnet_types varchar[], bodies jsonb[]{metadataParameters}{tagParameters}) RETURNS bigint[] AS $$
CREATE OR REPLACE FUNCTION {Identifier}(stream {streamIdType}, stream_type varchar, tenantid varchar, event_ids uuid[], event_types varchar[], dotnet_types varchar[], bodies jsonb[]{metadataParameters}{tagParameters}) RETURNS {returnType} AS $$
DECLARE
event_version bigint;
event_version {intType};
event_type varchar;
event_id uuid;
body jsonb;
index int;
seq bigint;
seq {intType};
actual_tenant varchar;
return_value bigint[];
return_value {returnType};
BEGIN
select version into event_version from {databaseSchema}.mt_streams where {streamsWhere};
if event_version IS NULL then
Expand Down
Loading