diff --git a/Directory.Packages.props b/Directory.Packages.props index 0b5b1b4902..dbefb2d640 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -56,6 +56,7 @@ + diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index 930d1b3948..45b3e8d5d0 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -169,6 +169,7 @@ const config: UserConfig = { { text: 'Quick Start', link: '/events/quickstart' }, { text: 'Storage', link: '/events/storage' }, { text: 'Appending Events', link: '/events/appending' }, + { text: 'Bulk Appending Events', link: '/events/bulk-appending' }, { text: 'Querying Events', link: '/events/querying' }, { text: 'Metadata', link: '/events/metadata' }, { text: 'Archiving Streams', link: '/events/archiving' }, diff --git a/docs/events/bulk-appending.md b/docs/events/bulk-appending.md new file mode 100644 index 0000000000..dc636ad3bd --- /dev/null +++ b/docs/events/bulk-appending.md @@ -0,0 +1,212 @@ +# Bulk Appending Events + + + +::: tip +This feature is intended for data seeding, migration from other event stores, load testing, and importing +events from external systems. For normal application event appending, use the standard +[Appending Events](/events/appending) API instead. +::: + +Marten provides a high-throughput bulk event append API that uses PostgreSQL's `COPY ... FROM STDIN BINARY` +protocol to efficiently load large numbers of events into the event store. This bypasses the normal append +pipeline for maximum speed, making it suitable for scenarios where you need to insert millions or even +billions of events. + +## How It Works + +The bulk append API: + +1. Pre-allocates event sequence numbers from the `mt_events_sequence` +2. Uses `NpgsqlBinaryImporter` to COPY stream records into `mt_streams` +3. Uses `NpgsqlBinaryImporter` to COPY event records into `mt_events` +4. Updates the high water mark in `mt_event_progression` so the async daemon knows where to start + +This approach is significantly faster than the normal append path because it avoids per-row function +calls, version checking, and individual INSERT statements. 
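To make step 1 concrete: the sequence pre-allocation is a single round trip that reserves a contiguous block of values from `mt_events_sequence`. The snippet below is only an illustrative sketch of that idea, not the internal implementation; it assumes Marten's default `public` schema and a caller-supplied connection string.

```cs
// Illustrative sketch of step 1: reserve a contiguous block of sequence numbers.
// Assumes the default "public" schema; connectionString is supplied by the caller.
public static async Task<Queue<long>> PreallocateSequencesSketch(string connectionString, int count)
{
    await using var conn = new NpgsqlConnection(connectionString);
    await conn.OpenAsync();

    await using var cmd = conn.CreateCommand();
    cmd.CommandText =
        $"select nextval('public.mt_events_sequence') from generate_series(1, {count})";

    var sequences = new Queue<long>(count);
    await using var reader = await cmd.ExecuteReaderAsync();
    while (await reader.ReadAsync())
    {
        // Each reserved value becomes the seq_id of one event in the COPY that follows
        sequences.Enqueue(reader.GetInt64(0));
    }

    return sequences;
}
```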
+ +## Basic Usage + +Build a list of `StreamAction` objects representing new event streams, then call `BulkInsertEventsAsync` +on the document store: + + + +```cs +public static async Task BulkAppendBasicExample(DocumentStore store) +{ + // Build up a list of stream actions with events + var streams = new List(); + + for (int i = 0; i < 1000; i++) + { + var streamId = Guid.NewGuid(); + var events = new object[] + { + new BulkOrderPlaced(streamId, "Widget", 5), + new BulkOrderShipped(streamId, $"TRACK-{i}"), + new BulkOrderDelivered(streamId, DateTimeOffset.UtcNow) + }; + + streams.Add(StreamAction.Start(store.Events, streamId, events)); + } + + // Bulk insert all events using PostgreSQL COPY for maximum throughput + await store.BulkInsertEventsAsync(streams); +} +``` +snippet source | anchor + + +## Multi-Tenancy + +When using [conjoined multi-tenancy](/events/multitenancy), use the tenant-specific overload: + + + +```cs +public static async Task BulkAppendWithTenantExample(DocumentStore store) +{ + var streams = new List(); + + for (int i = 0; i < 500; i++) + { + var streamId = Guid.NewGuid(); + var events = new object[] + { + new BulkOrderPlaced(streamId, "Gadget", 2), + new BulkOrderShipped(streamId, $"TRACK-{i}") + }; + + streams.Add(StreamAction.Start(store.Events, streamId, events)); + } + + // Bulk insert events for a specific tenant when using conjoined tenancy + await store.BulkInsertEventsAsync("tenant-abc", streams); +} +``` +snippet source | anchor + + +## Event Metadata + +You can set metadata on individual events before bulk inserting. This works with any combination +of enabled metadata columns (correlation ID, causation ID, headers, user name): + + + +```cs +public static async Task BulkAppendWithMetadataExample(DocumentStore store) +{ + var streamId = Guid.NewGuid(); + var events = new object[] + { + new BulkOrderPlaced(streamId, "Widget", 10), + new BulkOrderShipped(streamId, "TRACK-123") + }; + + var action = StreamAction.Start(store.Events, streamId, events); + + // Set metadata on individual events before bulk inserting + foreach (var e in action.Events) + { + e.CorrelationId = "import-batch-42"; + e.CausationId = "migration-job"; + e.SetHeader("source", "legacy-system"); + } + + await store.BulkInsertEventsAsync(new[] { action }); +} +``` +snippet source | anchor + + +## Controlling Batch Size + +For very large imports, you can control the COPY batch size. Each batch is a separate PostgreSQL +COPY operation, which helps manage memory usage: + + + +```cs +public static async Task BulkAppendWithBatchSizeExample(DocumentStore store) +{ + var streams = new List(); + + // Generate a large number of streams + for (int i = 0; i < 100_000; i++) + { + var streamId = Guid.NewGuid(); + streams.Add(StreamAction.Start(store.Events, streamId, + new object[] { new BulkOrderPlaced(streamId, "Item", 1) })); + } + + // Control the COPY batch size for memory management. + // Each batch is a separate PostgreSQL COPY operation. 
+ await store.BulkInsertEventsAsync(streams, batchSize: 5000); +} +``` +snippet source | anchor + + +## String Stream Identity + +The bulk append API works with both Guid and string stream identities: + + + +```cs +public static async Task BulkAppendWithStringIdentityExample(DocumentStore store) +{ + // When using StreamIdentity.AsString, use string-keyed stream actions + var streams = new List(); + + for (int i = 0; i < 100; i++) + { + var key = $"order-{Guid.NewGuid():N}"; + var events = new object[] + { + new BulkOrderPlaced(Guid.NewGuid(), "Widget", 1), + new BulkOrderShipped(Guid.NewGuid(), $"TRACK-{i}") + }; + + streams.Add(StreamAction.Start(store.Events, key, events)); + } + + await store.BulkInsertEventsAsync(streams); +} +``` +snippet source | anchor + + +## Supported Configurations + +The bulk append API supports all combinations of: + +| Configuration | Options | +| ------------- | ------- | +| Stream identity | `AsGuid`, `AsString` | +| Tenancy | Single, Conjoined | +| Archived stream partitioning | On, Off | +| Metadata columns | Correlation ID, Causation ID, Headers, User Name (any combination) | + +## Limitations + +The bulk append API intentionally trades off features for throughput: + +- **No inline projections** -- events are written directly without triggering inline projections. Use + [async projections](/events/projections/async-daemon) and rebuild after bulk loading. +- **No optimistic concurrency** -- there is no version checking against existing streams. This API is + designed for initial data loading, not concurrent writes. +- **New streams only** -- bulk append creates new streams. It does not support appending to existing streams. +- **No event tags** -- DCB tag operations are not included in the COPY pipeline. Tags would need to be + handled separately after bulk loading. + +## Performance + +In local benchmarks, the bulk append API achieves approximately **80,000-110,000 events/second** +depending on event complexity and PostgreSQL configuration. This compares to approximately +**60,000-80,000 events/second** using Marten's QuickAppend mode with parallel sessions. + +The bulk append approach is especially advantageous when loading tens of millions of events or more, +where the reduced per-event overhead of PostgreSQL COPY becomes significant. 
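## Rebuilding Projections After a Bulk Load

Because the bulk path skips inline projections entirely (see the limitations above), read models need to be rebuilt once the import completes. Below is a minimal sketch of that follow-up step using the async daemon's rebuild API; `OrderSummaryProjection` is a hypothetical projection assumed to be registered as an async projection in `StoreOptions`.

```cs
public static async Task BulkLoadThenRebuild(DocumentStore store, IReadOnlyList<StreamAction> streams)
{
    // Load the events first -- no projections run during this step
    await store.BulkInsertEventsAsync(streams);

    // Then rebuild the affected read models with the async daemon.
    // OrderSummaryProjection is a hypothetical async projection registered elsewhere.
    using var daemon = await store.BuildProjectionDaemonAsync();
    await daemon.RebuildProjectionAsync<OrderSummaryProjection>(CancellationToken.None);
}
```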
diff --git a/src/EventSourcingTests/BulkEventAppendTests.cs b/src/EventSourcingTests/BulkEventAppendTests.cs new file mode 100644 index 0000000000..bf85941bdc --- /dev/null +++ b/src/EventSourcingTests/BulkEventAppendTests.cs @@ -0,0 +1,696 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using JasperFx.Events; +using Marten; +using Marten.Events; +using Marten.Storage; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; + +namespace EventSourcingTests; + +public class BulkEventAppendTests: OneOffConfigurationsContext +{ + #region Helpers + + private static List createQuestStreams(EventGraph events, int count) + { + var actions = new List(); + for (int i = 0; i < count; i++) + { + var id = Guid.NewGuid(); + var questEvents = new object[] + { + new QuestStarted { Name = $"Quest {i}" }, + new MembersJoined(1, "Somewhere", "Frodo", "Sam"), + new MembersJoined(2, "Rivendell", "Aragorn"), + new QuestEnded { Name = $"Quest {i}" } + }; + + actions.Add(StreamAction.Start(events, id, questEvents)); + } + + return actions; + } + + private static List createStringKeyedStreams(EventGraph events, int count) + { + var actions = new List(); + for (int i = 0; i < count; i++) + { + var key = $"quest-{Guid.NewGuid():N}"; + var questEvents = new object[] + { + new QuestStarted { Name = $"Quest {i}" }, + new MembersJoined(1, "Shire", "Bilbo"), + new QuestEnded { Name = $"Quest {i}" } + }; + + actions.Add(StreamAction.Start(events, key, questEvents)); + } + + return actions; + } + + private static StreamAction createSingleStream(EventGraph events, Guid id) + { + var questEvents = new object[] + { + new QuestStarted { Name = "Test Quest" }, + new MembersJoined(1, "Shire", "Frodo"), + new MembersJoined(2, "Rivendell", "Aragorn"), + new QuestEnded { Name = "Test Quest" } + }; + return StreamAction.Start(events, id, questEvents); + } + + private static StreamAction createSingleStringStream(EventGraph events, string key) + { + var questEvents = new object[] + { + new QuestStarted { Name = "Test Quest" }, + new MembersJoined(1, "Shire", "Bilbo"), + new QuestEnded { Name = "Test Quest" } + }; + return StreamAction.Start(events, key, questEvents); + } + + #endregion + + #region Stream Identity: Guid + + [Fact] + public async Task guid_identity_single_tenant() + { + var store = StoreOptions(opts => + { + opts.Events.StreamIdentity = StreamIdentity.AsGuid; + }); + + var actions = createQuestStreams(store.Events, 20); + await store.BulkInsertEventsAsync(actions); + + await using var session = store.LightweightSession(); + var stats = await store.Advanced.FetchEventStoreStatistics(); + + stats.StreamCount.ShouldBe(20); + stats.EventCount.ShouldBe(80); + + var firstId = actions[0].Id; + var events = await session.Events.FetchStreamAsync(firstId); + events.Count.ShouldBe(4); + + // Verify event types are correct + events[0].Data.ShouldBeOfType(); + events[1].Data.ShouldBeOfType(); + events[3].Data.ShouldBeOfType(); + } + + [Fact] + public async Task guid_identity_conjoined_tenancy() + { + var store = StoreOptions(opts => + { + opts.Events.StreamIdentity = StreamIdentity.AsGuid; + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + }); + + var actions1 = createQuestStreams(store.Events, 10); + await store.BulkInsertEventsAsync("tenant-1", actions1); + + var actions2 = createQuestStreams(store.Events, 5); + await store.BulkInsertEventsAsync("tenant-2", actions2); + + await using var session1 = store.LightweightSession("tenant-1"); + var allEvents1 = await 
session1.Events.QueryAllRawEvents().ToListAsync(); + allEvents1.ShouldAllBe(e => e.TenantId == "tenant-1"); + allEvents1.Count.ShouldBe(40); + + await using var session2 = store.LightweightSession("tenant-2"); + var allEvents2 = await session2.Events.QueryAllRawEvents().ToListAsync(); + allEvents2.ShouldAllBe(e => e.TenantId == "tenant-2"); + allEvents2.Count.ShouldBe(20); + + // Verify streams are isolated per tenant + var stream1 = await session1.Events.FetchStreamAsync(actions1[0].Id); + stream1.Count.ShouldBe(4); + + var stream2CrossTenant = await session2.Events.FetchStreamAsync(actions1[0].Id); + stream2CrossTenant.Count.ShouldBe(0); + } + + #endregion + + #region Stream Identity: String + + [Fact] + public async Task string_identity_single_tenant() + { + var store = StoreOptions(opts => + { + opts.Events.StreamIdentity = StreamIdentity.AsString; + }); + + var actions = createStringKeyedStreams(store.Events, 10); + await store.BulkInsertEventsAsync(actions); + + await using var session = store.LightweightSession(); + var stats = await store.Advanced.FetchEventStoreStatistics(); + stats.StreamCount.ShouldBe(10); + + var firstKey = actions[0].Key; + var events = await session.Events.FetchStreamAsync(firstKey!); + events.Count.ShouldBe(3); + } + + [Fact] + public async Task string_identity_conjoined_tenancy() + { + var store = StoreOptions(opts => + { + opts.Events.StreamIdentity = StreamIdentity.AsString; + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + }); + + var actions1 = createStringKeyedStreams(store.Events, 8); + await store.BulkInsertEventsAsync("tenant-a", actions1); + + var actions2 = createStringKeyedStreams(store.Events, 4); + await store.BulkInsertEventsAsync("tenant-b", actions2); + + await using var sessionA = store.LightweightSession("tenant-a"); + var eventsA = await sessionA.Events.QueryAllRawEvents().ToListAsync(); + eventsA.ShouldAllBe(e => e.TenantId == "tenant-a"); + eventsA.Count.ShouldBe(24); // 8 streams * 3 events + + await using var sessionB = store.LightweightSession("tenant-b"); + var eventsB = await sessionB.Events.QueryAllRawEvents().ToListAsync(); + eventsB.ShouldAllBe(e => e.TenantId == "tenant-b"); + eventsB.Count.ShouldBe(12); + + // Cross-tenant isolation + var streamA = await sessionA.Events.FetchStreamAsync(actions1[0].Key!); + streamA.Count.ShouldBe(3); + + var streamACrossTenant = await sessionB.Events.FetchStreamAsync(actions1[0].Key!); + streamACrossTenant.Count.ShouldBe(0); + } + + #endregion + + #region Metadata: Individual Columns + + [Fact] + public async Task metadata_correlation_id_only() + { + var store = StoreOptions(opts => + { + opts.Events.MetadataConfig.CorrelationIdEnabled = true; + }); + + var streamId = Guid.NewGuid(); + var action = createSingleStream(store.Events, streamId); + for (int i = 0; i < action.Events.Count; i++) + { + action.Events[i].CorrelationId = $"corr-{i}"; + } + + await store.BulkInsertEventsAsync(new[] { action }); + + await using var session = store.LightweightSession(); + var loaded = await session.Events.FetchStreamAsync(streamId); + + loaded.Count.ShouldBe(4); + loaded[0].CorrelationId.ShouldBe("corr-0"); + loaded[1].CorrelationId.ShouldBe("corr-1"); + loaded[3].CorrelationId.ShouldBe("corr-3"); + } + + [Fact] + public async Task metadata_causation_id_only() + { + var store = StoreOptions(opts => + { + opts.Events.MetadataConfig.CausationIdEnabled = true; + }); + + var streamId = Guid.NewGuid(); + var action = createSingleStream(store.Events, streamId); + for (int i = 0; i < action.Events.Count; i++) 
+ { + action.Events[i].CausationId = $"cause-{i}"; + } + + await store.BulkInsertEventsAsync(new[] { action }); + + await using var session = store.LightweightSession(); + var loaded = await session.Events.FetchStreamAsync(streamId); + + loaded[0].CausationId.ShouldBe("cause-0"); + loaded[2].CausationId.ShouldBe("cause-2"); + } + + [Fact] + public async Task metadata_headers_only() + { + var store = StoreOptions(opts => + { + opts.Events.MetadataConfig.HeadersEnabled = true; + }); + + var streamId = Guid.NewGuid(); + var action = createSingleStream(store.Events, streamId); + action.Events[0].SetHeader("key-a", "value-a"); + action.Events[1].SetHeader("key-b", 42); + + await store.BulkInsertEventsAsync(new[] { action }); + + await using var session = store.LightweightSession(); + var loaded = await session.Events.FetchStreamAsync(streamId); + + loaded[0].GetHeader("key-a").ShouldBe("value-a"); + loaded[1].GetHeader("key-b").ShouldNotBeNull(); + } + + [Fact] + public async Task metadata_user_name_only() + { + var store = StoreOptions(opts => + { + opts.Events.MetadataConfig.UserNameEnabled = true; + }); + + var streamId = Guid.NewGuid(); + var action = createSingleStream(store.Events, streamId); + for (int i = 0; i < action.Events.Count; i++) + { + action.Events[i].UserName = $"user-{i}"; + } + + await store.BulkInsertEventsAsync(new[] { action }); + + await using var session = store.LightweightSession(); + var loaded = await session.Events.FetchStreamAsync(streamId); + + loaded[0].UserName.ShouldBe("user-0"); + loaded[3].UserName.ShouldBe("user-3"); + } + + #endregion + + #region Metadata: All Columns Combined + + [Fact] + public async Task metadata_all_columns_enabled() + { + var store = StoreOptions(opts => + { + opts.Events.MetadataConfig.CorrelationIdEnabled = true; + opts.Events.MetadataConfig.CausationIdEnabled = true; + opts.Events.MetadataConfig.HeadersEnabled = true; + opts.Events.MetadataConfig.UserNameEnabled = true; + }); + + var streamId = Guid.NewGuid(); + var action = createSingleStream(store.Events, streamId); + for (int i = 0; i < action.Events.Count; i++) + { + action.Events[i].CorrelationId = $"corr-{i}"; + action.Events[i].CausationId = $"cause-{i}"; + action.Events[i].SetHeader("idx", i); + action.Events[i].UserName = $"user-{i}"; + } + + await store.BulkInsertEventsAsync(new[] { action }); + + await using var session = store.LightweightSession(); + var loaded = await session.Events.FetchStreamAsync(streamId); + + loaded.Count.ShouldBe(4); + for (int i = 0; i < loaded.Count; i++) + { + loaded[i].CorrelationId.ShouldBe($"corr-{i}"); + loaded[i].CausationId.ShouldBe($"cause-{i}"); + loaded[i].GetHeader("idx").ShouldNotBeNull(); + loaded[i].UserName.ShouldBe($"user-{i}"); + } + } + + [Fact] + public async Task metadata_all_columns_with_nulls() + { + var store = StoreOptions(opts => + { + opts.Events.MetadataConfig.CorrelationIdEnabled = true; + opts.Events.MetadataConfig.CausationIdEnabled = true; + opts.Events.MetadataConfig.HeadersEnabled = true; + opts.Events.MetadataConfig.UserNameEnabled = true; + }); + + var streamId = Guid.NewGuid(); + var action = createSingleStream(store.Events, streamId); + // Only set metadata on first event, leave rest null + action.Events[0].CorrelationId = "corr-only"; + action.Events[0].CausationId = "cause-only"; + action.Events[0].UserName = "user-only"; + + await store.BulkInsertEventsAsync(new[] { action }); + + await using var session = store.LightweightSession(); + var loaded = await session.Events.FetchStreamAsync(streamId); + + 
loaded.Count.ShouldBe(4); + loaded[0].CorrelationId.ShouldBe("corr-only"); + loaded[0].CausationId.ShouldBe("cause-only"); + loaded[0].UserName.ShouldBe("user-only"); + + // Other events should have null metadata + loaded[1].CorrelationId.ShouldBeNull(); + loaded[1].CausationId.ShouldBeNull(); + loaded[1].UserName.ShouldBeNull(); + } + + #endregion + + #region Metadata + String Identity + + [Fact] + public async Task string_identity_with_all_metadata() + { + var store = StoreOptions(opts => + { + opts.Events.StreamIdentity = StreamIdentity.AsString; + opts.Events.MetadataConfig.CorrelationIdEnabled = true; + opts.Events.MetadataConfig.CausationIdEnabled = true; + opts.Events.MetadataConfig.HeadersEnabled = true; + opts.Events.MetadataConfig.UserNameEnabled = true; + }); + + var key = "meta-quest-string"; + var action = createSingleStringStream(store.Events, key); + for (int i = 0; i < action.Events.Count; i++) + { + action.Events[i].CorrelationId = $"corr-{i}"; + action.Events[i].CausationId = $"cause-{i}"; + action.Events[i].UserName = $"user-{i}"; + action.Events[i].SetHeader("pos", i); + } + + await store.BulkInsertEventsAsync(new[] { action }); + + await using var session = store.LightweightSession(); + var loaded = await session.Events.FetchStreamAsync(key); + + loaded.Count.ShouldBe(3); + loaded[0].CorrelationId.ShouldBe("corr-0"); + loaded[0].UserName.ShouldBe("user-0"); + loaded[2].CausationId.ShouldBe("cause-2"); + } + + #endregion + + #region Metadata + Conjoined Tenancy + + [Fact] + public async Task conjoined_tenancy_with_all_metadata() + { + var store = StoreOptions(opts => + { + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Events.MetadataConfig.CorrelationIdEnabled = true; + opts.Events.MetadataConfig.CausationIdEnabled = true; + opts.Events.MetadataConfig.HeadersEnabled = true; + opts.Events.MetadataConfig.UserNameEnabled = true; + }); + + var streamId = Guid.NewGuid(); + var action = createSingleStream(store.Events, streamId); + for (int i = 0; i < action.Events.Count; i++) + { + action.Events[i].CorrelationId = $"tenant-corr-{i}"; + action.Events[i].CausationId = $"tenant-cause-{i}"; + action.Events[i].UserName = $"tenant-user-{i}"; + } + + await store.BulkInsertEventsAsync("my-tenant", new[] { action }); + + await using var session = store.LightweightSession("my-tenant"); + var loaded = await session.Events.FetchStreamAsync(streamId); + + loaded.Count.ShouldBe(4); + loaded.ShouldAllBe(e => e.TenantId == "my-tenant"); + loaded[0].CorrelationId.ShouldBe("tenant-corr-0"); + loaded[0].UserName.ShouldBe("tenant-user-0"); + } + + #endregion + + #region Archived Stream Partitioning Combos + + [Fact] + public async Task archived_partitioning_with_guid_identity() + { + var store = StoreOptions(opts => + { + opts.Events.UseArchivedStreamPartitioning = true; + }); + + var actions = createQuestStreams(store.Events, 10); + await store.BulkInsertEventsAsync(actions); + + var stats = await store.Advanced.FetchEventStoreStatistics(); + stats.StreamCount.ShouldBe(10); + stats.EventCount.ShouldBe(40); + } + + [Fact] + public async Task archived_partitioning_with_string_identity() + { + var store = StoreOptions(opts => + { + opts.Events.StreamIdentity = StreamIdentity.AsString; + opts.Events.UseArchivedStreamPartitioning = true; + }); + + var actions = createStringKeyedStreams(store.Events, 10); + await store.BulkInsertEventsAsync(actions); + + var stats = await store.Advanced.FetchEventStoreStatistics(); + stats.StreamCount.ShouldBe(10); + stats.EventCount.ShouldBe(30); + } + 
+ [Fact] + public async Task archived_partitioning_with_conjoined_tenancy() + { + var store = StoreOptions(opts => + { + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Events.UseArchivedStreamPartitioning = true; + }); + + var actions = createQuestStreams(store.Events, 8); + await store.BulkInsertEventsAsync("partitioned-tenant", actions); + + await using var session = store.LightweightSession("partitioned-tenant"); + var events = await session.Events.QueryAllRawEvents().ToListAsync(); + events.Count.ShouldBe(32); + events.ShouldAllBe(e => e.TenantId == "partitioned-tenant"); + } + + [Fact] + public async Task archived_partitioning_with_conjoined_tenancy_and_string_identity() + { + var store = StoreOptions(opts => + { + opts.Events.StreamIdentity = StreamIdentity.AsString; + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Events.UseArchivedStreamPartitioning = true; + }); + + var actions = createStringKeyedStreams(store.Events, 6); + await store.BulkInsertEventsAsync("combined-tenant", actions); + + await using var session = store.LightweightSession("combined-tenant"); + var events = await session.Events.QueryAllRawEvents().ToListAsync(); + events.Count.ShouldBe(18); + events.ShouldAllBe(e => e.TenantId == "combined-tenant"); + + var stream = await session.Events.FetchStreamAsync(actions[0].Key!); + stream.Count.ShouldBe(3); + } + + #endregion + + #region Version and Sequence Correctness + + [Fact] + public async Task versions_correct_with_guid_identity() + { + var store = StoreOptions(opts => { }); + + var streamId = Guid.NewGuid(); + var action = createSingleStream(store.Events, streamId); + await store.BulkInsertEventsAsync(new[] { action }); + + await using var session = store.LightweightSession(); + var loaded = await session.Events.FetchStreamAsync(streamId); + + for (int i = 0; i < loaded.Count; i++) + { + loaded[i].Version.ShouldBe(i + 1); + } + + var state = await session.Events.FetchStreamStateAsync(streamId); + state.ShouldNotBeNull(); + state!.Version.ShouldBe(4); + } + + [Fact] + public async Task versions_correct_with_string_identity() + { + var store = StoreOptions(opts => + { + opts.Events.StreamIdentity = StreamIdentity.AsString; + }); + + var key = "version-test-stream"; + var action = createSingleStringStream(store.Events, key); + await store.BulkInsertEventsAsync(new[] { action }); + + await using var session = store.LightweightSession(); + var loaded = await session.Events.FetchStreamAsync(key); + + for (int i = 0; i < loaded.Count; i++) + { + loaded[i].Version.ShouldBe(i + 1); + } + + var state = await session.Events.FetchStreamStateAsync(key); + state.ShouldNotBeNull(); + state!.Version.ShouldBe(3); + } + + [Fact] + public async Task sequences_are_globally_unique_and_ordered() + { + var store = StoreOptions(opts => { }); + + var actions = createQuestStreams(store.Events, 20); + await store.BulkInsertEventsAsync(actions); + + await using var session = store.LightweightSession(); + var allEvents = await session.Events.QueryAllRawEvents() + .OrderBy(e => e.Sequence) + .ToListAsync(); + + // All sequences should be unique + var sequences = allEvents.Select(e => e.Sequence).ToList(); + sequences.Distinct().Count().ShouldBe(sequences.Count); + + // Should be in ascending order + for (int i = 1; i < sequences.Count; i++) + { + sequences[i].ShouldBeGreaterThan(sequences[i - 1]); + } + } + + [Fact] + public async Task high_water_mark_updated() + { + var store = StoreOptions(opts => { }); + + var actions = createQuestStreams(store.Events, 10); + var 
totalEvents = actions.Sum(a => a.Events.Count); + + await store.BulkInsertEventsAsync(actions); + + var stats = await store.Advanced.FetchEventStoreStatistics(); + stats.EventCount.ShouldBe(totalEvents); + } + + #endregion + + #region Batching + + [Fact] + public async Task multiple_copy_batches() + { + var store = StoreOptions(opts => { }); + + var actions = createQuestStreams(store.Events, 50); + await store.BulkInsertEventsAsync(actions, batchSize: 10); + + var stats = await store.Advanced.FetchEventStoreStatistics(); + stats.StreamCount.ShouldBe(50); + stats.EventCount.ShouldBe(200); + } + + [Fact] + public async Task batch_size_of_one() + { + var store = StoreOptions(opts => { }); + + var actions = createQuestStreams(store.Events, 5); + await store.BulkInsertEventsAsync(actions, batchSize: 1); + + var stats = await store.Advanced.FetchEventStoreStatistics(); + stats.StreamCount.ShouldBe(5); + stats.EventCount.ShouldBe(20); + } + + #endregion + + #region Full Combination: Conjoined + String + All Metadata + Archived Partitioning + + [Fact] + public async Task full_combination_all_options_enabled() + { + var store = StoreOptions(opts => + { + opts.Events.StreamIdentity = StreamIdentity.AsString; + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Events.UseArchivedStreamPartitioning = true; + opts.Events.MetadataConfig.CorrelationIdEnabled = true; + opts.Events.MetadataConfig.CausationIdEnabled = true; + opts.Events.MetadataConfig.HeadersEnabled = true; + opts.Events.MetadataConfig.UserNameEnabled = true; + }); + + var key = "full-combo-quest"; + var action = createSingleStringStream(store.Events, key); + for (int i = 0; i < action.Events.Count; i++) + { + action.Events[i].CorrelationId = $"fc-corr-{i}"; + action.Events[i].CausationId = $"fc-cause-{i}"; + action.Events[i].UserName = $"fc-user-{i}"; + action.Events[i].SetHeader("full-combo", true); + } + + await store.BulkInsertEventsAsync("full-combo-tenant", new[] { action }); + + await using var session = store.LightweightSession("full-combo-tenant"); + var loaded = await session.Events.FetchStreamAsync(key); + + loaded.Count.ShouldBe(3); + loaded.ShouldAllBe(e => e.TenantId == "full-combo-tenant"); + + for (int i = 0; i < loaded.Count; i++) + { + loaded[i].Version.ShouldBe(i + 1); + loaded[i].CorrelationId.ShouldBe($"fc-corr-{i}"); + loaded[i].CausationId.ShouldBe($"fc-cause-{i}"); + loaded[i].UserName.ShouldBe($"fc-user-{i}"); + loaded[i].GetHeader("full-combo").ShouldNotBeNull(); + } + + var state = await session.Events.FetchStreamStateAsync(key); + state.ShouldNotBeNull(); + state!.Version.ShouldBe(3); + } + + #endregion +} diff --git a/src/EventSourcingTests/Examples/BulkAppendSamples.cs b/src/EventSourcingTests/Examples/BulkAppendSamples.cs new file mode 100644 index 0000000000..4b17acd25c --- /dev/null +++ b/src/EventSourcingTests/Examples/BulkAppendSamples.cs @@ -0,0 +1,137 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using JasperFx.Events; +using Marten; +using Marten.Events; + +namespace EventSourcingTests.Examples; + +public static class BulkAppendSamples +{ + #region sample_bulk_append_events_basic + + public static async Task BulkAppendBasicExample(DocumentStore store) + { + // Build up a list of stream actions with events + var streams = new List(); + + for (int i = 0; i < 1000; i++) + { + var streamId = Guid.NewGuid(); + var events = new object[] + { + new BulkOrderPlaced(streamId, "Widget", 5), + new BulkOrderShipped(streamId, $"TRACK-{i}"), + new 
BulkOrderDelivered(streamId, DateTimeOffset.UtcNow) + }; + + streams.Add(StreamAction.Start(store.Events, streamId, events)); + } + + // Bulk insert all events using PostgreSQL COPY for maximum throughput + await store.BulkInsertEventsAsync(streams); + } + + #endregion + + #region sample_bulk_append_events_with_tenant + + public static async Task BulkAppendWithTenantExample(DocumentStore store) + { + var streams = new List(); + + for (int i = 0; i < 500; i++) + { + var streamId = Guid.NewGuid(); + var events = new object[] + { + new BulkOrderPlaced(streamId, "Gadget", 2), + new BulkOrderShipped(streamId, $"TRACK-{i}") + }; + + streams.Add(StreamAction.Start(store.Events, streamId, events)); + } + + // Bulk insert events for a specific tenant when using conjoined tenancy + await store.BulkInsertEventsAsync("tenant-abc", streams); + } + + #endregion + + #region sample_bulk_append_events_with_metadata + + public static async Task BulkAppendWithMetadataExample(DocumentStore store) + { + var streamId = Guid.NewGuid(); + var events = new object[] + { + new BulkOrderPlaced(streamId, "Widget", 10), + new BulkOrderShipped(streamId, "TRACK-123") + }; + + var action = StreamAction.Start(store.Events, streamId, events); + + // Set metadata on individual events before bulk inserting + foreach (var e in action.Events) + { + e.CorrelationId = "import-batch-42"; + e.CausationId = "migration-job"; + e.SetHeader("source", "legacy-system"); + } + + await store.BulkInsertEventsAsync(new[] { action }); + } + + #endregion + + #region sample_bulk_append_events_with_batch_size + + public static async Task BulkAppendWithBatchSizeExample(DocumentStore store) + { + var streams = new List(); + + // Generate a large number of streams + for (int i = 0; i < 100_000; i++) + { + var streamId = Guid.NewGuid(); + streams.Add(StreamAction.Start(store.Events, streamId, + new object[] { new BulkOrderPlaced(streamId, "Item", 1) })); + } + + // Control the COPY batch size for memory management. + // Each batch is a separate PostgreSQL COPY operation. 
+ await store.BulkInsertEventsAsync(streams, batchSize: 5000); + } + + #endregion + + #region sample_bulk_append_events_string_identity + + public static async Task BulkAppendWithStringIdentityExample(DocumentStore store) + { + // When using StreamIdentity.AsString, use string-keyed stream actions + var streams = new List(); + + for (int i = 0; i < 100; i++) + { + var key = $"order-{Guid.NewGuid():N}"; + var events = new object[] + { + new BulkOrderPlaced(Guid.NewGuid(), "Widget", 1), + new BulkOrderShipped(Guid.NewGuid(), $"TRACK-{i}") + }; + + streams.Add(StreamAction.Start(store.Events, key, events)); + } + + await store.BulkInsertEventsAsync(streams); + } + + #endregion +} + +// Sample event types for documentation +public record BulkOrderPlaced(Guid OrderId, string Product, int Quantity); +public record BulkOrderShipped(Guid OrderId, string TrackingNumber); +public record BulkOrderDelivered(Guid OrderId, DateTimeOffset DeliveredAt); diff --git a/src/Marten/DocumentStore.cs b/src/Marten/DocumentStore.cs index a7a3f400fc..e8f0d79c65 100644 --- a/src/Marten/DocumentStore.cs +++ b/src/Marten/DocumentStore.cs @@ -165,6 +165,65 @@ public async Task BulkInsertDocumentsAsync(string tenantId, IEnumerable await bulkInsertion.BulkInsertDocumentsAsync(documents, mode, batchSize, cancellation).ConfigureAwait(false); } + public async Task BulkInsertEventsAsync(IReadOnlyList streams, int batchSize = 1000, + CancellationToken cancellation = default) + { + await Storage.ApplyAllConfiguredChangesToDatabaseAsync().ConfigureAwait(false); + + var tenant = Tenancy.Default; + var appender = new BulkEventAppender(Events, Options.Serializer()); + + await using var conn = tenant.Database.CreateConnection(); + await conn.OpenAsync(cancellation).ConfigureAwait(false); + var tx = await conn.BeginTransactionAsync(cancellation).ConfigureAwait(false); + + try + { + await appender.BulkInsertAsync(conn, streams, batchSize, cancellation).ConfigureAwait(false); + await tx.CommitAsync(cancellation).ConfigureAwait(false); + } + catch + { + await tx.RollbackAsync(cancellation).ConfigureAwait(false); + throw; + } + } + + public async Task BulkInsertEventsAsync(string tenantId, IReadOnlyList streams, + int batchSize = 1000, CancellationToken cancellation = default) + { + await Storage.ApplyAllConfiguredChangesToDatabaseAsync().ConfigureAwait(false); + + var tenant = await Tenancy.GetTenantAsync(Options.TenantIdStyle.MaybeCorrectTenantId(tenantId)) + .ConfigureAwait(false); + var appender = new BulkEventAppender(Events, Options.Serializer()); + + // Set tenant on all streams + foreach (var stream in streams) + { + stream.TenantId = tenantId; + foreach (var e in stream.Events) + { + e.TenantId = tenantId; + } + } + + await using var conn = tenant.Database.CreateConnection(); + await conn.OpenAsync(cancellation).ConfigureAwait(false); + var tx = await conn.BeginTransactionAsync(cancellation).ConfigureAwait(false); + + try + { + await appender.BulkInsertAsync(conn, streams, batchSize, cancellation).ConfigureAwait(false); + await tx.CommitAsync(cancellation).ConfigureAwait(false); + } + catch + { + await tx.RollbackAsync(cancellation).ConfigureAwait(false); + throw; + } + } + public IDiagnostics Diagnostics { get; } public IDocumentSession OpenSession(SessionOptions options) diff --git a/src/Marten/Events/BulkEventAppender.cs b/src/Marten/Events/BulkEventAppender.cs new file mode 100644 index 0000000000..d54a11a190 --- /dev/null +++ b/src/Marten/Events/BulkEventAppender.cs @@ -0,0 +1,410 @@ +using System; +using 
System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using JasperFx; +using JasperFx.Events; +using Marten.Events.Schema; +using Marten.Storage; +using Npgsql; +using NpgsqlTypes; + +namespace Marten.Events; + +/// +/// Uses PostgreSQL COPY binary import to efficiently bulk-load large numbers of events. +/// This bypasses the normal append pipeline (no inline projections, no optimistic concurrency) +/// and is intended for seeding, migration, and load testing scenarios. +/// +internal class BulkEventAppender +{ + private readonly EventGraph _events; + private readonly ISerializer _serializer; + + public BulkEventAppender(EventGraph events, ISerializer serializer) + { + _events = events; + _serializer = serializer; + } + + public async Task BulkInsertAsync( + NpgsqlConnection conn, + IReadOnlyList streams, + int batchSize, + CancellationToken cancellation) + { + if (streams.Count == 0) return; + + var totalEvents = streams.Sum(s => s.Events.Count); + if (totalEvents == 0) return; + + var schema = _events.DatabaseSchemaName; + + // Step 1: Pre-allocate sequence numbers + var sequences = await fetchSequences(conn, schema, totalEvents, cancellation).ConfigureAwait(false); + + // Step 2: Assign versions and sequences to all events + assignVersionsAndSequences(streams, sequences); + + // Step 3: COPY streams + await copyStreams(conn, schema, streams, batchSize, cancellation).ConfigureAwait(false); + + // Step 4: COPY events + await copyEvents(conn, schema, streams, batchSize, cancellation).ConfigureAwait(false); + + // Step 5: Update high water mark + await updateHighWaterMark(conn, schema, streams, cancellation).ConfigureAwait(false); + } + + private async Task> fetchSequences( + NpgsqlConnection conn, + string schema, + int count, + CancellationToken cancellation) + { + var sql = $"select nextval('{schema}.mt_events_sequence') from generate_series(1,{count})"; + await using var cmd = conn.CreateCommand(); + cmd.CommandText = sql; + await using var reader = await cmd.ExecuteReaderAsync(cancellation).ConfigureAwait(false); + + var queue = new Queue(count); + while (await reader.ReadAsync(cancellation).ConfigureAwait(false)) + { + queue.Enqueue(reader.GetInt64(0)); + } + + return queue; + } + + private void assignVersionsAndSequences(IReadOnlyList streams, Queue sequences) + { + foreach (var stream in streams) + { + long version = 0; + foreach (var e in stream.Events) + { + version++; + e.Version = version; + e.Sequence = sequences.Dequeue(); + + // Ensure event type metadata is populated + if (string.IsNullOrEmpty(e.EventTypeName)) + { + var mapping = _events.EventMappingFor(e.EventType); + e.EventTypeName = mapping.EventTypeName; + e.DotNetTypeName = mapping.DotNetTypeName; + } + } + + stream.Version = version; + } + } + + private async Task copyStreams( + NpgsqlConnection conn, + string schema, + IReadOnlyList streams, + int batchSize, + CancellationToken cancellation) + { + var columns = buildStreamColumns(); + var copySql = $"COPY {schema}.mt_streams({string.Join(", ", columns.Select(c => $"\"{c}\""))}) FROM STDIN BINARY"; + + var batch = 0; + NpgsqlBinaryImporter? 
writer = null; + + try + { + foreach (var stream in streams) + { + if (batch % batchSize == 0) + { + if (writer != null) + { + await writer.CompleteAsync(cancellation).ConfigureAwait(false); + await writer.DisposeAsync().ConfigureAwait(false); + } + + writer = await conn.BeginBinaryImportAsync(copySql, cancellation).ConfigureAwait(false); + } + + await writer!.StartRowAsync(cancellation).ConfigureAwait(false); + await writeStreamRow(writer, stream, cancellation).ConfigureAwait(false); + batch++; + } + + if (writer != null) + { + await writer.CompleteAsync(cancellation).ConfigureAwait(false); + } + } + finally + { + if (writer != null) + { + await writer.DisposeAsync().ConfigureAwait(false); + } + } + } + + private List buildStreamColumns() + { + var columns = new List(); + + if (_events.TenancyStyle == TenancyStyle.Conjoined) + { + columns.Add("tenant_id"); + } + + columns.Add("id"); + columns.Add("type"); + columns.Add("version"); + columns.Add("is_archived"); + + return columns; + } + + private async Task writeStreamRow(NpgsqlBinaryImporter writer, StreamAction stream, + CancellationToken cancellation) + { + if (_events.TenancyStyle == TenancyStyle.Conjoined) + { + await writer.WriteAsync(stream.TenantId ?? StorageConstants.DefaultTenantId, NpgsqlDbType.Varchar, + cancellation).ConfigureAwait(false); + } + + if (_events.StreamIdentity == StreamIdentity.AsGuid) + { + await writer.WriteAsync(stream.Id, NpgsqlDbType.Uuid, cancellation).ConfigureAwait(false); + } + else + { + await writer.WriteAsync(stream.Key!, NpgsqlDbType.Varchar, cancellation).ConfigureAwait(false); + } + + if (stream.AggregateTypeName != null) + { + await writer.WriteAsync(stream.AggregateTypeName, NpgsqlDbType.Varchar, cancellation) + .ConfigureAwait(false); + } + else + { + await writer.WriteNullAsync(cancellation).ConfigureAwait(false); + } + + await writer.WriteAsync(stream.Version, NpgsqlDbType.Bigint, cancellation).ConfigureAwait(false); + await writer.WriteAsync(false, NpgsqlDbType.Boolean, cancellation).ConfigureAwait(false); + } + + private async Task copyEvents( + NpgsqlConnection conn, + string schema, + IReadOnlyList streams, + int batchSize, + CancellationToken cancellation) + { + var columns = buildEventColumns(); + var copySql = $"COPY {schema}.mt_events({string.Join(", ", columns.Select(c => $"\"{c}\""))}) FROM STDIN BINARY"; + + var batch = 0; + NpgsqlBinaryImporter? 
writer = null; + + try + { + foreach (var stream in streams) + { + foreach (var e in stream.Events) + { + if (batch % batchSize == 0) + { + if (writer != null) + { + await writer.CompleteAsync(cancellation).ConfigureAwait(false); + await writer.DisposeAsync().ConfigureAwait(false); + } + + writer = await conn.BeginBinaryImportAsync(copySql, cancellation).ConfigureAwait(false); + } + + await writer!.StartRowAsync(cancellation).ConfigureAwait(false); + await writeEventRow(writer, stream, e, cancellation).ConfigureAwait(false); + batch++; + } + } + + if (writer != null) + { + await writer.CompleteAsync(cancellation).ConfigureAwait(false); + } + } + finally + { + if (writer != null) + { + await writer.DisposeAsync().ConfigureAwait(false); + } + } + } + + private List buildEventColumns() + { + var columns = new List + { + "seq_id", + "id", + "stream_id", + "version", + "data", + "type", + "timestamp", + "tenant_id", + "mt_dotnet_type", + "is_archived" + }; + + if (_events.Metadata.CorrelationId.Enabled) + { + columns.Add("correlation_id"); + } + + if (_events.Metadata.CausationId.Enabled) + { + columns.Add("causation_id"); + } + + if (_events.Metadata.Headers.Enabled) + { + columns.Add("headers"); + } + + if (_events.Metadata.UserName.Enabled) + { + columns.Add("user_name"); + } + + return columns; + } + + private async Task writeEventRow(NpgsqlBinaryImporter writer, StreamAction stream, IEvent e, + CancellationToken cancellation) + { + // seq_id + await writer.WriteAsync(e.Sequence, NpgsqlDbType.Bigint, cancellation).ConfigureAwait(false); + + // id + await writer.WriteAsync(e.Id, NpgsqlDbType.Uuid, cancellation).ConfigureAwait(false); + + // stream_id + if (_events.StreamIdentity == StreamIdentity.AsGuid) + { + await writer.WriteAsync(stream.Id, NpgsqlDbType.Uuid, cancellation).ConfigureAwait(false); + } + else + { + await writer.WriteAsync(stream.Key!, NpgsqlDbType.Varchar, cancellation).ConfigureAwait(false); + } + + // version + await writer.WriteAsync(e.Version, NpgsqlDbType.Bigint, cancellation).ConfigureAwait(false); + + // data (jsonb) + var json = _serializer.ToJson(e.Data); + await writer.WriteAsync(json, NpgsqlDbType.Jsonb, cancellation).ConfigureAwait(false); + + // type + await writer.WriteAsync(e.EventTypeName, NpgsqlDbType.Varchar, cancellation).ConfigureAwait(false); + + // timestamp + await writer.WriteAsync(e.Timestamp != default ? e.Timestamp : DateTimeOffset.UtcNow, + NpgsqlDbType.TimestampTz, cancellation).ConfigureAwait(false); + + // tenant_id + await writer.WriteAsync(e.TenantId ?? stream.TenantId ?? 
StorageConstants.DefaultTenantId, + NpgsqlDbType.Varchar, cancellation).ConfigureAwait(false); + + // mt_dotnet_type + if (!string.IsNullOrEmpty(e.DotNetTypeName)) + { + await writer.WriteAsync(e.DotNetTypeName, NpgsqlDbType.Varchar, cancellation).ConfigureAwait(false); + } + else + { + await writer.WriteNullAsync(cancellation).ConfigureAwait(false); + } + + // is_archived + await writer.WriteAsync(e.IsArchived, NpgsqlDbType.Boolean, cancellation).ConfigureAwait(false); + + // Optional metadata columns + if (_events.Metadata.CorrelationId.Enabled) + { + if (e.CorrelationId != null) + { + await writer.WriteAsync(e.CorrelationId, NpgsqlDbType.Varchar, cancellation).ConfigureAwait(false); + } + else + { + await writer.WriteNullAsync(cancellation).ConfigureAwait(false); + } + } + + if (_events.Metadata.CausationId.Enabled) + { + if (e.CausationId != null) + { + await writer.WriteAsync(e.CausationId, NpgsqlDbType.Varchar, cancellation).ConfigureAwait(false); + } + else + { + await writer.WriteNullAsync(cancellation).ConfigureAwait(false); + } + } + + if (_events.Metadata.Headers.Enabled) + { + if (e.Headers != null) + { + var headersJson = _serializer.ToJson(e.Headers); + await writer.WriteAsync(headersJson, NpgsqlDbType.Jsonb, cancellation).ConfigureAwait(false); + } + else + { + await writer.WriteNullAsync(cancellation).ConfigureAwait(false); + } + } + + if (_events.Metadata.UserName.Enabled) + { + if (e.UserName != null) + { + await writer.WriteAsync(e.UserName, NpgsqlDbType.Varchar, cancellation).ConfigureAwait(false); + } + else + { + await writer.WriteNullAsync(cancellation).ConfigureAwait(false); + } + } + } + + private async Task updateHighWaterMark( + NpgsqlConnection conn, + string schema, + IReadOnlyList streams, + CancellationToken cancellation) + { + var maxSequence = streams + .SelectMany(s => s.Events) + .Max(e => e.Sequence); + + var sql = $@" +INSERT INTO {schema}.mt_event_progression (name, last_seq_id) +VALUES ('HighWaterMark', @seq) +ON CONFLICT (name) DO UPDATE SET last_seq_id = GREATEST({schema}.mt_event_progression.last_seq_id, @seq)"; + + await using var cmd = conn.CreateCommand(); + cmd.CommandText = sql; + cmd.Parameters.AddWithValue("seq", maxSequence); + await cmd.ExecuteNonQueryAsync(cancellation).ConfigureAwait(false); + } +} diff --git a/src/Marten/IDocumentStore.cs b/src/Marten/IDocumentStore.cs index e2695a4751..9987167e98 100644 --- a/src/Marten/IDocumentStore.cs +++ b/src/Marten/IDocumentStore.cs @@ -4,6 +4,7 @@ using System.Threading; using System.Threading.Tasks; using System.Transactions; +using JasperFx.Events; using JasperFx.Events.Daemon; using Marten.Events.Daemon; using Marten.Services; @@ -300,6 +301,29 @@ Task BulkInsertDocumentsAsync(string tenantId, IEnumerable documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000, CancellationToken cancellation = default); + /// + /// Uses PostgreSQL's COPY ... FROM STDIN BINARY feature to efficiently bulk-load + /// a large number of events into the event store. This bypasses the normal append pipeline + /// (no inline projections, no optimistic concurrency) and is intended for seeding, + /// migration, and load testing scenarios. + /// + /// The stream actions containing events to insert + /// Number of rows per COPY batch + /// + Task BulkInsertEventsAsync(IReadOnlyList streams, int batchSize = 1000, + CancellationToken cancellation = default); + + /// + /// Uses PostgreSQL's COPY ... 
FROM STDIN BINARY feature to efficiently bulk-load + /// a large number of events into the event store for a specific tenant. + /// + /// The tenant to insert events for + /// The stream actions containing events to insert + /// Number of rows per COPY batch + /// + Task BulkInsertEventsAsync(string tenantId, IReadOnlyList streams, int batchSize = 1000, + CancellationToken cancellation = default); + /// /// Build a new instance of the asynchronous projection daemon to use interactively /// in your own code