diff --git a/docs/events/dcb.md b/docs/events/dcb.md
index 0dd286cefe..7b6a3ccf7b 100644
--- a/docs/events/dcb.md
+++ b/docs/events/dcb.md
@@ -276,6 +276,18 @@ catch (DcbConcurrencyException ex)
The consistency check only detects events that match the **same tag query**. Events appended to unrelated tags or streams will not cause a violation.
:::
+### How the boundary check serializes
+
+::: warning Upgrading from 9.3 or earlier
+Marten 9.4 added a new schema object — `mt_dcb_tag_version` — to fix [#4591](https://github.com/JasperFx/marten/issues/4591). Deployments with `AutoCreate.None` must run `db-patch` / `db-apply` before deploying 9.4. See [Migration Guide → 9.4 schema migration](/migration-guide#required-schema-migration-dcb-tag-version-side-table).
+:::
+
+Internally, `FetchForWritingByTags` records the captured version of every tag value referenced by the query in a side table (`mt_dcb_tag_version`, one row per `(tag_table, tag_value, tenant_id)`). At `SaveChangesAsync` time Marten emits an `INSERT … ON CONFLICT DO UPDATE … WHERE version = $captured RETURNING 1` for each captured row. The row-level lock plus the captured-version predicate is the serialization point: two truly-concurrent appenders observing the same captured version both attempt to bump the row — the first wins, the second's `RETURNING` matches no rows and surfaces `DcbConcurrencyException`. This works at PostgreSQL's default `READ COMMITTED` isolation; no `SERIALIZABLE`, no advisory locks.
+
+Every save that appends a tagged event — boundary or otherwise — also queues a producer-side bump against the same row. That's what keeps a plain `session.Events.Append(streamId, taggedEvent)` from silently committing past an in-flight boundary fetch held by another session: the version moves on every commit, not only on boundary saves.
+
+The side table grows with **distinct boundary-tag values**, not with event volume, and is never deleted automatically — the same `StudentId` or `CourseId` reuses its row across every save. Avoid using ephemeral or one-shot values as DCB tags if you want to keep the table compact.
+
## Checking Event Existence
If you only need to know whether any events matching a tag query exist -- without loading or deserializing them -- use `EventsExistAsync`. This is a lightweight `SELECT EXISTS(...)` query that avoids the overhead of fetching and materializing event data:
@@ -384,17 +396,17 @@ A reproducible side-by-side benchmark lives at `src/DcbLoadTest` and can be re-r
Guidance:
- **Prefer HStore** when your DCB queries match on **two or more tag types** (the common case — most projection boundaries combine an aggregate-id tag with one or more domain tags). The JOIN cost on TagTables grows with each additional tag type; HStore stays flat.
-- **Prefer HStore** when your hot path is `FetchForWritingByTags` (consistency-boundary read-modify-write). Round-trip drops roughly in half because both the read and the consistency-check `EXISTS` become single-table lookups.
+- **Prefer HStore** when your hot path is `FetchForWritingByTags` (consistency-boundary read-modify-write). The fetch round-trip drops because the events `SELECT` is a single-table lookup instead of an N-way JOIN.
- **Stay on TagTables** if your DCB workload is dominated by **single-tag `EventsExistAsync` probes**. That case is what the per-type tables are optimized for — a primary-key lookup on a small dedicated table — and HStore's GIN containment is slightly slower per probe.
- **Either mode is fine** for append throughput. With tags, HStore is about 30% faster than TagTables because it issues one `UPDATE` per tagged event instead of one `INSERT` per `(event, tag)` pair.
If you're starting a new event store on Marten 9.0 and most of your projections key off `(aggregateId, someOtherTag)`, HStore is the recommended choice. If you're upgrading from Marten 8 and already have a populated TagTables-mode store, there is no compelling reason to switch.
-### Consistency Check
+### Consistency Check
-At `SaveChangesAsync` time, Marten executes an `EXISTS` query checking for new events matching the tag query with `seq_id > lastSeenSequence`. This runs in the same transaction as the event appends, providing serializable consistency for the tagged boundary.
+At `SaveChangesAsync` time, Marten emits a per-tag `UPDATE … WHERE version = $captured` against the `mt_dcb_tag_version` side table — one statement per distinct `(tag_table, tag_value)` tuple in the boundary query, in deterministic sort order. The row-level write lock plus the version predicate is the serialization point: two concurrent appenders capturing the same version both try to bump it; one wins, the other's `UPDATE` matches zero rows and surfaces `DcbConcurrencyException`. This works at PostgreSQL's default `READ COMMITTED` isolation — no advisory locks, no `SERIALIZABLE` transactions.
-The shape of the `EXISTS` query depends on the storage mode (multi-table `INNER JOIN` for TagTables, single-table `@>` containment for HStore), but the behavior is identical from the caller's perspective.
+The check shape no longer depends on `DcbStorageMode`. Both `TagTables` and `HStore` share the same side-table mechanism for the consistency check; the storage mode only affects how tags are physically read at fetch time and written at append time.
### Tag Routing
diff --git a/docs/migration-guide.md b/docs/migration-guide.md
index 1531f55d26..7414c2ab4e 100644
--- a/docs/migration-guide.md
+++ b/docs/migration-guide.md
@@ -1,5 +1,33 @@
# Migration Guide
+## Key Changes in 9.4.0
+
+### Required schema migration — DCB tag-version side table
+
+If you register **any** DCB tag types via `Events.RegisterTagType()` (or auto-discovery from `SingleStreamProjection` with a strong-typed `TId`), Marten 9.4 introduces a new schema object: a side table `mt_dcb_tag_version` in your event-store schema. The table is created automatically on first save under the default `AutoCreate.CreateOrUpdate`, but **deployments that pin `AutoCreate.None` and ship schema changes via `db-patch` / `db-apply` must run the migration before deploying 9.4**, or saves with tagged events will fail with `relation ".mt_dcb_tag_version" does not exist`.
+
+```bash
+# Generate the patch against your production-equivalent DB
+dotnet run -- db-patch ./schema-9.4.sql --drop ./schema-9.4.drop.sql
+
+# Or apply directly if your deploy pipeline runs Marten with elevated DDL rights
+dotnet run -- db-apply
+```
+
+#### What changes on every save (with DCB tags)
+
+Beyond the new schema object, every save that appends a tagged event now also queues one extra `INSERT … ON CONFLICT DO UPDATE` against `mt_dcb_tag_version`. This is the *producer-side bump* that makes the boundary check serializable. The overhead is one row write per distinct `(tag_type, tag_value)` tuple referenced by the batch — typically one or two rows per save.
+
+#### Why this lands as a required migration in a point release
+
+This is the fix for [#4591](https://github.com/JasperFx/marten/issues/4591) — a correctness bug where truly-concurrent DCB tag-boundary appends could **both commit** when the contract demands exactly one. The check pre-9.4 emitted a `SELECT EXISTS (… FROM mt_events …)` as a separate non-locking statement before the INSERTs at `READ COMMITTED`, leaving an open race window. Two concurrent fetch→save sessions both ran the check before either committed, both saw no conflict, both inserted. The bug affected both `DcbStorageMode.HStore` and `DcbStorageMode.TagTables` — the predicate shape differed but the racy `SELECT-then-INSERT` pattern was identical.
+
+The side-table mechanism converts the predicate read into a row-level write conflict, so concurrent boundary saves serialize on a row lock at `READ COMMITTED` — no `SERIALIZABLE`, no advisory locks. See [DCB → Consistency Check](/events/dcb#consistency-check) for the full mechanism.
+
+#### Growth and cleanup
+
+The side table grows with **distinct boundary-tag values**, not with event volume — the same `StudentId` or `CourseId` reuses its row across every save. Rows are never deleted automatically; avoid using ephemeral or one-shot values as DCB tags if you want to keep the table compact.
+
## Key Changes in 9.0.0
### Platform support
diff --git a/src/EventSourcingTests/Dcb/Bug_4591_truly_concurrent_dcb_tag_appends_both_commit.cs b/src/EventSourcingTests/Dcb/Bug_4591_truly_concurrent_dcb_tag_appends_both_commit.cs
new file mode 100644
index 0000000000..2714d278d7
--- /dev/null
+++ b/src/EventSourcingTests/Dcb/Bug_4591_truly_concurrent_dcb_tag_appends_both_commit.cs
@@ -0,0 +1,170 @@
+#nullable enable
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using JasperFx.Events;
+using JasperFx.Events.Tags;
+using Marten;
+using Marten.Events;
+using Marten.Testing.Harness;
+using Shouldly;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace EventSourcingTests.Dcb;
+
+// #4591 regression. Pre-fix, AssertDcbConsistency emitted a `SELECT EXISTS (...)` over
+// mt_events as a separate non-locking statement before the INSERTs, with the transaction
+// at READ COMMITTED. Two concurrent fetch-and-save sessions both ran their EXISTS check
+// before either committed, both saw no conflict, both committed → 5–7 of 8 racers
+// committed when the contract demands exactly one.
+//
+// The fix (see DcbTagVersionTable + DcbTagVersionAssertion) replaces the predicate
+// read with an UPDATE … WHERE version = $captured on a side table keyed by
+// (tag_table, tag_value, tenant_id). The UPDATE acquires a row lock; the WHERE
+// clause is the optimistic check; concurrent saves serialize on the row.
+//
+// Run-mode coverage:
+// - [Theory] over DcbStorageMode (HStore + TagTables) so the side-table fix is
+// verified for both physical-storage shapes, not just the HStore one in the
+// reporter's environment.
+// - Reporter's environment was specifically HStore + Conjoined + LightweightSession.
+// The Theory pins that variant and adds the TagTables variant alongside it.
+[Collection("OneOffs")]
+public class Bug_4591_truly_concurrent_dcb_tag_appends_both_commit: OneOffConfigurationsContext
+{
+ private readonly ITestOutputHelper _output;
+
+ public Bug_4591_truly_concurrent_dcb_tag_appends_both_commit(ITestOutputHelper output)
+ {
+ _output = output;
+ }
+
+ private void ConfigureStore(DcbStorageMode storageMode)
+ {
+ StoreOptions(opts =>
+ {
+ opts.Events.AddEventType();
+ opts.Events.AddEventType();
+
+ // Reporter's environment for HStore variant; the side-table fix has to
+ // hold for both DCB storage modes either way.
+ opts.Events.DcbStorageMode = storageMode;
+ opts.Events.TenancyStyle = JasperFx.MultiTenancy.TenancyStyle.Conjoined;
+ opts.Policies.AllDocumentsAreMultiTenanted();
+
+ opts.Events.RegisterTagType("student")
+ .ForAggregate();
+ opts.Events.RegisterTagType("course")
+ .ForAggregate();
+
+ opts.Projections.LiveStreamAggregation();
+ });
+ }
+
+ [Theory]
+ [InlineData(DcbStorageMode.HStore)]
+ [InlineData(DcbStorageMode.TagTables)]
+ public async Task truly_concurrent_appends_sharing_a_tag_serialize_to_one_winner(DcbStorageMode storageMode)
+ {
+ ConfigureStore(storageMode);
+
+ const int Racers = 8;
+ const string TenantId = "acme";
+
+ // The DCB *boundary* tag — shared by every racer. This is the tag value
+ // the DCB consistency check queries against; the bug was a check-then-act
+ // race on this tag.
+ var courseId = new CourseId(Guid.NewGuid());
+
+ // Each racer also carries its OWN StudentId. EventBoundary routes events
+ // to a stream by the first tag with an AggregateType, so distinct
+ // per-racer StudentIds mean distinct streams — the
+ // (stream_id, version) unique constraint on mt_events does NOT serialize
+ // them. The race must be caught by the DCB tag check alone.
+
+ // Seed: one event under a DIFFERENT student stream but tagged with the
+ // shared CourseId so lastSeenSequence > 0.
+ var seedStudentId = new StudentId(Guid.NewGuid());
+ var seedStreamId = seedStudentId.Value;
+ await using (var seedSession = theStore.LightweightSession(TenantId))
+ {
+ var enrolled = seedSession.Events.BuildEvent(new StudentEnrolled("Seed", "Math"));
+ enrolled.WithTag(seedStudentId, courseId);
+ seedSession.Events.Append(seedStreamId, enrolled);
+ await seedSession.SaveChangesAsync();
+ }
+
+ // Barrier-sync the fetch → save handoff. Every session fetches and then
+ // awaits the same TCS; once all `Racers` sessions are past the fetch the
+ // barrier completes and every session races into its SaveChangesAsync
+ // simultaneously. The captured tag versions are therefore identical
+ // across all racers.
+ var allFetched = new TaskCompletionSource[Racers];
+ for (var i = 0; i < Racers; i++) allFetched[i] = new TaskCompletionSource();
+ var startSaves = new TaskCompletionSource();
+
+ // Query is by the SHARED CourseId — that's the boundary the DCB check
+ // must serialize.
+ var query = new EventTagQuery().Or(courseId);
+
+ var racerTasks = Enumerable.Range(0, Racers).Select(i => Task.Run(async () =>
+ {
+ // Each racer has its own StudentId → its own stream when routed
+ // by EventBoundary.AppendOne.
+ var racerStudentId = new StudentId(Guid.NewGuid());
+
+ await using var session = theStore.LightweightSession(TenantId);
+ var boundary = await session.Events.FetchForWritingByTags(query);
+
+ allFetched[i].SetResult();
+ await startSaves.Task;
+
+ var append = session.Events.BuildEvent(new AssignmentSubmitted($"HW-{i}", 80 + i));
+ // studentId first → routes to studentId-stream (distinct per racer);
+ // courseId second → recorded against the shared boundary.
+ append.WithTag(racerStudentId, courseId);
+ boundary.AppendOne(append);
+
+ try
+ {
+ await session.SaveChangesAsync();
+ return (Index: i, Committed: true, Exception: (Exception?)null);
+ }
+ catch (DcbConcurrencyException ex)
+ {
+ return (Index: i, Committed: false, Exception: (Exception?)ex);
+ }
+ })).ToArray();
+
+ await Task.WhenAll(allFetched.Select(t => t.Task));
+ startSaves.SetResult();
+
+ var results = await Task.WhenAll(racerTasks);
+
+ var committed = results.Count(r => r.Committed);
+ var throws = results.Count(r => r.Exception is DcbConcurrencyException);
+ var otherErrors = results.Where(r => r.Exception is not null and not DcbConcurrencyException).ToList();
+
+ _output.WriteLine($"DCB storage mode: {storageMode}");
+ _output.WriteLine($"Truly-concurrent racers: {Racers}");
+ _output.WriteLine($" Committed: {committed}");
+ _output.WriteLine($" DcbConcurrencyException: {throws}");
+ if (otherErrors.Count > 0)
+ {
+ _output.WriteLine($" Other exceptions: {otherErrors.Count}");
+ foreach (var er in otherErrors)
+ {
+ _output.WriteLine($" Racer {er.Index}: {er.Exception}");
+ }
+ }
+
+ // Exactly one racer should commit; everyone else must observe the
+ // DCB concurrency violation.
+ committed.ShouldBe(1,
+ $"Expected exactly one truly-concurrent append to commit; observed {committed} commits + {throws} DcbConcurrencyException throws (mode = {storageMode}).");
+ throws.ShouldBe(Racers - 1);
+ otherErrors.ShouldBeEmpty();
+ }
+}
diff --git a/src/EventSourcingTests/Dcb/hstore_conjoined_tenancy_dcb_tag_tests.cs b/src/EventSourcingTests/Dcb/hstore_conjoined_tenancy_dcb_tag_tests.cs
index 3b46bada54..970a31257d 100644
--- a/src/EventSourcingTests/Dcb/hstore_conjoined_tenancy_dcb_tag_tests.cs
+++ b/src/EventSourcingTests/Dcb/hstore_conjoined_tenancy_dcb_tag_tests.cs
@@ -17,7 +17,7 @@ namespace EventSourcingTests.Dcb;
/// . Validates that the per-tenant filters added
/// to the HStore branches in EventStore.Dcb.BuildTagQuerySql,
/// EventsExistByTagsHandler, FetchForWritingByTagsHandler,
-/// AssertDcbConsistency, and SetEventTagsHstoreOperation correctly
+/// DcbTagVersionAssertion, and SetEventTagsHstoreOperation correctly
/// isolate DCB query, exists, aggregate, fetch-for-writing, and consistency-check
/// results between tenants.
///
diff --git a/src/Marten/Events/Dcb/AssertDcbConsistency.cs b/src/Marten/Events/Dcb/AssertDcbConsistency.cs
deleted file mode 100644
index b184144d9c..0000000000
--- a/src/Marten/Events/Dcb/AssertDcbConsistency.cs
+++ /dev/null
@@ -1,158 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Data.Common;
-using System.Linq;
-using System.Threading;
-using System.Threading.Tasks;
-using JasperFx.Events;
-using JasperFx.Events.Tags;
-using Marten.Internal;
-using Marten.Internal.Operations;
-using Marten.Storage;
-using Weasel.Postgresql;
-
-namespace Marten.Events.Dcb;
-
-internal class AssertDcbConsistency: IStorageOperation
-{
- private readonly EventGraph _events;
- private readonly EventTagQuery _query;
- private readonly long _lastSeenSequence;
-
- public AssertDcbConsistency(EventGraph events, EventTagQuery query, long lastSeenSequence)
- {
- _events = events;
- _query = query;
- _lastSeenSequence = lastSeenSequence;
- }
-
- public void ConfigureCommand(ICommandBuilder builder, IMartenSession session)
- {
- var conditions = _query.Conditions;
-
- if (_events.DcbStorageMode == DcbStorageMode.HStore)
- {
- builder.Append("select exists (select 1 from ");
- builder.Append(_events.DatabaseSchemaName);
- builder.Append(".mt_events e where e.seq_id > ");
- builder.AppendParameter(_lastSeenSequence);
- builder.Append(" and ");
- HStoreDcbQueryFragment.AppendOrPredicate(builder, _events, conditions, "e");
-
- if (_events.TenancyStyle == TenancyStyle.Conjoined)
- {
- builder.Append(" and e.tenant_id = ");
- builder.AppendParameter(session.TenantId);
- }
-
- builder.Append(" limit 1)");
- return;
- }
-
- // Build EXISTS query to check if any new matching events have been appended
- // since our last seen sequence
- builder.Append("select exists (select 1 from ");
-
- var distinctTagTypes = conditions.Select(c => c.TagType).Distinct().ToList();
-
- // Start with the first tag table
- var first = true;
- for (var i = 0; i < distinctTagTypes.Count; i++)
- {
- var tagType = distinctTagTypes[i];
- var registration = _events.FindTagType(tagType)
- ?? throw new InvalidOperationException(
- $"Tag type '{tagType.Name}' is not registered. Call RegisterTagType<{tagType.Name}>() first.");
-
- var alias = $"t{i}";
- if (first)
- {
- builder.Append(_events.DatabaseSchemaName);
- builder.Append(".mt_event_tag_");
- builder.Append(registration.TableSuffix);
- builder.Append(" ");
- builder.Append(alias);
- first = false;
- }
- else
- {
- builder.Append(" inner join ");
- builder.Append(_events.DatabaseSchemaName);
- builder.Append(".mt_event_tag_");
- builder.Append(registration.TableSuffix);
- builder.Append(" ");
- builder.Append(alias);
- builder.Append(" on t0.seq_id = ");
- builder.Append(alias);
- builder.Append(".seq_id");
- }
- }
-
- // Join to mt_events only if we need event type filtering
- var hasEventTypeFilter = conditions.Any(c => c.EventType != null);
- if (hasEventTypeFilter)
- {
- builder.Append(" inner join ");
- builder.Append(_events.DatabaseSchemaName);
- builder.Append(".mt_events e on t0.seq_id = e.seq_id");
- }
-
- builder.Append(" where t0.seq_id > ");
- builder.AppendParameter(_lastSeenSequence);
-
- // Build OR conditions
- builder.Append(" and (");
- for (var i = 0; i < conditions.Count; i++)
- {
- if (i > 0)
- {
- builder.Append(" or ");
- }
-
- var condition = conditions[i];
- var tagIndex = distinctTagTypes.IndexOf(condition.TagType);
- var alias = $"t{tagIndex}";
-
- builder.Append("(");
- builder.Append(alias);
- builder.Append(".value = ");
-
- var registration = _events.FindTagType(condition.TagType)!;
- var value = registration.ExtractValue(condition.TagValue);
- builder.AppendParameter(value);
-
- if (condition.EventType != null)
- {
- builder.Append(" and e.type = ");
- var eventTypeName = _events.EventMappingFor(condition.EventType).EventTypeName;
- builder.AppendParameter(eventTypeName);
- }
-
- builder.Append(")");
- }
-
- builder.Append(")");
-
- // Filter by tenant_id for conjoined tenancy
- if (_events.TenancyStyle == TenancyStyle.Conjoined)
- {
- builder.Append(" and t0.tenant_id = ");
- builder.AppendParameter(session.TenantId);
- }
-
- builder.Append(" limit 1)");
- }
-
- public Type DocumentType => typeof(IEvent);
-
- public async Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token)
- {
- if (await reader.ReadAsync(token).ConfigureAwait(false) &&
- await reader.GetFieldValueAsync(0, token).ConfigureAwait(false))
- {
- exceptions.Add(new DcbConcurrencyException(_query, _lastSeenSequence));
- }
- }
-
- public OperationRole Role() => OperationRole.Events;
-}
diff --git a/src/Marten/Events/Dcb/DcbTagVersionAssertion.cs b/src/Marten/Events/Dcb/DcbTagVersionAssertion.cs
new file mode 100644
index 0000000000..bcd6c22b23
--- /dev/null
+++ b/src/Marten/Events/Dcb/DcbTagVersionAssertion.cs
@@ -0,0 +1,146 @@
+#nullable enable
+using System;
+using System.Collections.Generic;
+using System.Data.Common;
+using System.Threading;
+using System.Threading.Tasks;
+using JasperFx.Events;
+using JasperFx.Events.Tags;
+using Marten.Internal;
+using Marten.Internal.Operations;
+using Weasel.Postgresql;
+
+namespace Marten.Events.Dcb;
+
+///
+/// One captured-and-bumped tag-version row carried from FetchForWritingByTags
+/// through to SaveChangesAsync.
+///
+///
+/// The tag type's — matches the
+/// mt_event_tag_{suffix} table name and is the discriminator stored in
+/// mt_dcb_tag_version.tag_table.
+///
+/// Canonical string form of the tag value (see ).
+/// The version observed at fetch time. The save's UPDATE WHERE version = $captured is the optimistic check.
+internal readonly record struct DcbTagVersionEntry(string TagTable, string TagValue, long CapturedVersion);
+
+///
+/// Storage operation that enforces the DCB tag boundary by bumping the
+/// mt_dcb_tag_version rows captured at fetch time. Replaces the racy
+/// SELECT-EXISTS over mt_events that
+/// emitted. Each UPDATE is the serialization point: at READ COMMITTED, the
+/// row-level lock + version = $captured predicate converts what was a
+/// predicate read into a row-level write conflict. Fixes #4591.
+///
+///
+/// Multi-tag queries emit one UPDATE per (tag_table, tag_value) tuple. The
+/// tuples are sorted by (tag_table, tag_value) before SQL is built so two
+/// concurrent appenders touching the same tag set acquire locks in identical
+/// order — no risk of deadlock from cross-locking.
+///
+internal class DcbTagVersionAssertion: IStorageOperation
+{
+ private readonly EventGraph _events;
+ private readonly EventTagQuery _query;
+ private readonly long _lastSeenSequence;
+ private readonly IReadOnlyList _orderedEntries;
+
+ public DcbTagVersionAssertion(
+ EventGraph events,
+ EventTagQuery query,
+ long lastSeenSequence,
+ IReadOnlyList capturedEntries)
+ {
+ _events = events;
+ _query = query;
+ _lastSeenSequence = lastSeenSequence;
+
+ // Sort once, here — both ConfigureCommand and PostprocessAsync iterate
+ // in the same order, and the deterministic order is what keeps two
+ // concurrent appenders touching the same tag rows from deadlocking.
+ var sorted = new DcbTagVersionEntry[capturedEntries.Count];
+ for (var i = 0; i < capturedEntries.Count; i++) sorted[i] = capturedEntries[i];
+ Array.Sort(sorted, static (a, b) =>
+ {
+ var byTable = string.CompareOrdinal(a.TagTable, b.TagTable);
+ return byTable != 0 ? byTable : string.CompareOrdinal(a.TagValue, b.TagValue);
+ });
+ _orderedEntries = sorted;
+ }
+
+ public void ConfigureCommand(ICommandBuilder builder, IMartenSession session)
+ {
+ var schema = _events.DatabaseSchemaName;
+ var tenantId = session.TenantId;
+
+ for (var i = 0; i < _orderedEntries.Count; i++)
+ {
+ // StartNewCommand (not `; `) so Npgsql sends each statement
+ // as a separate batched command — multiple `;`-separated
+ // statements in a single prepared statement raise Postgres
+ // SQLSTATE 42601.
+ if (i > 0) builder.StartNewCommand();
+
+ var entry = _orderedEntries[i];
+
+ // INSERT … ON CONFLICT DO UPDATE WHERE handles the two cases the
+ // fetch path delegates to us:
+ // - captured = 0 + row missing → INSERT(version=1) succeeds; first save wins.
+ // - row exists → ON CONFLICT branch; the WHERE filters to captured-match
+ // only, so any save that observed a stale version returns no row.
+ // INSERT … ON CONFLICT waits on the conflicting row's xact, so two
+ // first-time creators serialize on the unique-PK insert path the same
+ // way subsequent versioned-update saves serialize on the row lock.
+ builder.Append("insert into ");
+ builder.Append(schema);
+ builder.Append(".mt_dcb_tag_version (tag_table, tag_value, tenant_id, version) values (");
+ builder.AppendParameter(entry.TagTable);
+ builder.Append(", ");
+ builder.AppendParameter(entry.TagValue);
+ builder.Append(", ");
+ builder.AppendParameter(tenantId);
+ builder.Append(", ");
+ builder.AppendParameter(entry.CapturedVersion + 1);
+ // ON CONFLICT DO UPDATE references the existing row via the
+ // unqualified table name (`mt_dcb_tag_version.version`) — Postgres
+ // does not accept the schema prefix in this clause.
+ builder.Append(") on conflict (tag_table, tag_value, tenant_id) do update set version = mt_dcb_tag_version.version + 1 where mt_dcb_tag_version.version = ");
+ builder.AppendParameter(entry.CapturedVersion);
+ builder.Append(" returning 1");
+ }
+ }
+
+ public Type DocumentType => typeof(IEvent);
+
+ public async Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token)
+ {
+ var conflictDetected = false;
+
+ for (var i = 0; i < _orderedEntries.Count; i++)
+ {
+ if (i > 0)
+ {
+ // Advance past the previous statement's result set. The outer
+ // OperationPage advances past the LAST result set on its own.
+ await reader.NextResultAsync(token).ConfigureAwait(false);
+ }
+
+ var hasRow = await reader.ReadAsync(token).ConfigureAwait(false);
+ if (!hasRow)
+ {
+ conflictDetected = true;
+ // Keep iterating so we consume the remaining result sets — the
+ // batch protocol requires every statement's result set to be
+ // walked before the next operation can read its own.
+ }
+ }
+
+ if (conflictDetected)
+ {
+ exceptions.Add(new DcbConcurrencyException(_query, _lastSeenSequence));
+ }
+ }
+
+ public OperationRole Role() => OperationRole.Events;
+}
diff --git a/src/Marten/Events/Dcb/DcbTagVersionBumpOperation.cs b/src/Marten/Events/Dcb/DcbTagVersionBumpOperation.cs
new file mode 100644
index 0000000000..fee468d416
--- /dev/null
+++ b/src/Marten/Events/Dcb/DcbTagVersionBumpOperation.cs
@@ -0,0 +1,104 @@
+#nullable enable
+using System;
+using System.Collections.Generic;
+using System.Data.Common;
+using System.Threading;
+using System.Threading.Tasks;
+using JasperFx.Events;
+using Marten.Internal;
+using Marten.Internal.Operations;
+using Marten.Services;
+using NpgsqlTypes;
+using Weasel.Postgresql;
+
+namespace Marten.Events.Dcb;
+
+///
+/// Producer-side bump for the DCB tag-version side table. Every save that appends
+/// a tagged event (boundary or not) queues one of these per distinct
+/// (tag_table, tag_value) tuple so the side table reflects every commit, not only
+/// boundary saves. Without this hook, a plain
+/// session.Events.Append(streamId, taggedEvent) would commit silently
+/// without invalidating any in-flight DCB boundary that captured the prior
+/// version — see #4591.
+///
+///
+/// Emits a single multi-row INSERT … ON CONFLICT DO UPDATE statement (mirroring
+/// the per-statement
+/// shape: no RETURNING, empty PostprocessAsync) — that keeps the batch protocol
+/// trivial. A previous design used N separate statements with RETURNING which
+/// interleaved the batched result sets with neighbouring ops' result sets and
+/// shifted reader positions in subtle ways.
+///
+internal class DcbTagVersionBumpOperation: IStorageOperation, NoDataReturnedCall
+{
+ private readonly EventGraph _events;
+ private readonly IReadOnlyList<(string TagTable, string TagValue)> _orderedEntries;
+
+ public DcbTagVersionBumpOperation(
+ EventGraph events,
+ IReadOnlyList<(string TagTable, string TagValue)> entries)
+ {
+ _events = events;
+
+ // Deterministic order across concurrent appenders → no deadlocks when
+ // two saves touch overlapping tag rows. Same rationale as
+ // DcbTagVersionAssertion.
+ var sorted = new (string, string)[entries.Count];
+ for (var i = 0; i < entries.Count; i++) sorted[i] = entries[i];
+ Array.Sort(sorted, static (a, b) =>
+ {
+ var byTable = string.CompareOrdinal(a.Item1, b.Item1);
+ return byTable != 0 ? byTable : string.CompareOrdinal(a.Item2, b.Item2);
+ });
+ _orderedEntries = sorted;
+ }
+
+ public void ConfigureCommand(ICommandBuilder builder, IMartenSession session)
+ {
+ var schema = _events.DatabaseSchemaName;
+ var tenantId = session.TenantId;
+
+ // One multi-row INSERT — VALUES list feeds straight into the
+ // ON CONFLICT DO UPDATE that bumps each row by 1. Row-level locks are
+ // taken in (tag_table, tag_value) sorted order via the index seek, so
+ // concurrent appenders touching the same tag set can't deadlock.
+ builder.Append("insert into ");
+ builder.Append(schema);
+ builder.Append(".mt_dcb_tag_version (tag_table, tag_value, tenant_id, version) values ");
+
+ for (var i = 0; i < _orderedEntries.Count; i++)
+ {
+ if (i > 0) builder.Append(", ");
+ builder.Append("(");
+
+ var tableParam = builder.AppendParameter(_orderedEntries[i].TagTable);
+ tableParam.NpgsqlDbType = NpgsqlDbType.Varchar;
+ builder.Append(", ");
+
+ var valueParam = builder.AppendParameter(_orderedEntries[i].TagValue);
+ valueParam.NpgsqlDbType = NpgsqlDbType.Text;
+ builder.Append(", ");
+
+ var tenantParam = builder.AppendParameter(tenantId);
+ tenantParam.NpgsqlDbType = NpgsqlDbType.Varchar;
+ builder.Append(", 1)");
+ }
+
+ // ON CONFLICT DO UPDATE references the existing row via the
+ // unqualified table name (`mt_dcb_tag_version.version`) — Postgres
+ // does not accept the schema prefix in this clause.
+ builder.Append(" on conflict (tag_table, tag_value, tenant_id) do update set version = mt_dcb_tag_version.version + 1");
+ }
+
+ public Type DocumentType => typeof(IEvent);
+
+ public Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token)
+ {
+ // No RETURNING; mirror InsertEventTagOperation's empty PostprocessAsync.
+ // The outer OperationPage advances past our single result set on its own.
+ return Task.CompletedTask;
+ }
+
+ public OperationRole Role() => OperationRole.Events;
+}
diff --git a/src/Marten/Events/Dcb/EventBoundary.cs b/src/Marten/Events/Dcb/EventBoundary.cs
index 9ca19bb111..0da082a902 100644
--- a/src/Marten/Events/Dcb/EventBoundary.cs
+++ b/src/Marten/Events/Dcb/EventBoundary.cs
@@ -11,8 +11,8 @@ namespace Marten.Events.Dcb;
///
/// Marten's concrete implementation of the lifted
/// contract. Boundary
-/// consistency is enforced by at
-/// SaveChangesAsync() time.
+/// consistency is enforced by at
+/// SaveChangesAsync() time — see #4591.
///
internal class EventBoundary: IEventBoundary where T : class
{
diff --git a/src/Marten/Events/Dcb/FetchForWritingByTagsHandler.cs b/src/Marten/Events/Dcb/FetchForWritingByTagsHandler.cs
index b00dbd81ff..add6164c82 100644
--- a/src/Marten/Events/Dcb/FetchForWritingByTagsHandler.cs
+++ b/src/Marten/Events/Dcb/FetchForWritingByTagsHandler.cs
@@ -23,6 +23,12 @@ internal class FetchForWritingByTagsHandler: IQueryHandler>
private readonly DocumentStore _store;
private readonly EventTagQuery _query;
+ // Distinct (TagTable, TagValue) targets for the side-table UPSERT + capture
+ // step appended at the tail of ConfigureCommand. Populated there, replayed
+ // in HandleAsync so the captured versions land in the same slots the
+ // assertion expects. See #4591.
+ private List<(string TagTable, string TagValue)>? _versionTargets;
+
public FetchForWritingByTagsHandler(DocumentStore store, EventTagQuery query)
{
_store = store;
@@ -122,6 +128,77 @@ public void ConfigureCommand(ICommandBuilder builder, IMartenSession session)
}
builder.Append(" order by e.seq_id");
+
+ // #4591: append the DCB tag-version capture step as a second statement
+ // in the same command. The events SELECT above is unchanged; this just
+ // SELECTs the current version for each (tag_table, tag_value) in the
+ // query so the captured version flows into DcbTagVersionAssertion's
+ // INSERT … ON CONFLICT DO UPDATE WHERE at SaveChangesAsync time.
+ AppendVersionCapture(builder, session);
+ }
+
+ private void AppendVersionCapture(ICommandBuilder builder, IMartenSession session)
+ {
+ var conditions = _query.Conditions;
+ var schema = _store.Events.DatabaseSchemaName;
+
+ // Distinct by (TagTable, TagValue) — the side table is keyed by table
+ // suffix + canonical string value (see TagValueStringifier), not by the
+ // condition's optional EventType filter.
+ var targets = new List<(string TagTable, string TagValue)>(conditions.Count);
+ var seen = new HashSet<(string, string)>();
+ foreach (var condition in conditions)
+ {
+ var registration = _store.Events.FindTagType(condition.TagType)
+ ?? throw new InvalidOperationException(
+ $"Tag type '{condition.TagType.Name}' is not registered.");
+ var raw = registration.ExtractValue(condition.TagValue);
+ var tagValue = TagValueStringifier.Stringify(raw);
+ var key = (registration.TableSuffix, tagValue);
+ if (seen.Add(key))
+ {
+ targets.Add((registration.TableSuffix, tagValue));
+ }
+ }
+
+ _versionTargets = targets;
+
+ var tenantId = session.TenantId;
+
+ // Use StartNewCommand (not `; `) so Npgsql sends a separate
+ // batched command. Multiple `;`-separated statements in a single
+ // prepared statement raise Postgres SQLSTATE 42601.
+ builder.StartNewCommand();
+
+ // Plain SELECT — no INSERT here. The fetch query shares the session
+ // connection, and any INSERT-OR-IGNORE here would hold the row lock
+ // until SaveChangesAsync, deadlocking concurrent fetchers that all
+ // target the same boundary row. Row creation is deferred to
+ // DcbTagVersionAssertion's INSERT … ON CONFLICT DO UPDATE WHERE, which
+ // runs inside the save transaction where holding the lock is correct.
+ //
+ // Missing rows return no row from this SELECT; HandleAsync substitutes
+ // captured_version = 0, and the save's INSERT path takes care of the
+ // first-time create (with concurrent first-time creators racing on the
+ // unique-PK constraint, exactly as desired).
+ builder.Append("select tag_table, tag_value, version from ");
+ builder.Append(schema);
+ builder.Append(".mt_dcb_tag_version where tenant_id = ");
+ var tenantParam = builder.AppendParameter(tenantId);
+ tenantParam.NpgsqlDbType = NpgsqlDbType.Varchar;
+ builder.Append(" and (tag_table, tag_value) in (");
+ for (var i = 0; i < targets.Count; i++)
+ {
+ if (i > 0) builder.Append(", ");
+ builder.Append("(");
+ var tableParam = builder.AppendParameter(targets[i].TagTable);
+ tableParam.NpgsqlDbType = NpgsqlDbType.Varchar;
+ builder.Append(", ");
+ var valueParam = builder.AppendParameter(targets[i].TagValue);
+ valueParam.NpgsqlDbType = NpgsqlDbType.Text;
+ builder.Append(")");
+ }
+ builder.Append(")");
}
private string[]? resolveAggregatorEventTypeNames()
@@ -154,6 +231,14 @@ public async Task> HandleAsync(DbDataReader reader, IMartenSes
var lastSeenSequence = events.Count > 0 ? events.Max(e => e.Sequence) : 0;
+ // #4591: second result set from ConfigureCommand — per-tag captured
+ // versions for the side-table assertion. Always present because
+ // AppendVersionUpsertAndCapture always runs (any DCB tag query
+ // necessarily references at least one tag).
+ await reader.NextResultAsync(token).ConfigureAwait(false);
+
+ var capturedVersions = await readCapturedVersions(reader, token).ConfigureAwait(false);
+
T? aggregate = default;
if (events.Count > 0)
{
@@ -167,12 +252,40 @@ public async Task> HandleAsync(DbDataReader reader, IMartenSes
aggregate = await aggregator.BuildAsync(events, docSession, default, token).ConfigureAwait(false);
}
- var assertion = new AssertDcbConsistency(_store.Events, _query, lastSeenSequence);
+ var assertion = new DcbTagVersionAssertion(_store.Events, _query, lastSeenSequence, capturedVersions);
docSession.QueueOperation(assertion);
return new EventBoundary(docSession, _store.Events, aggregate, events, lastSeenSequence);
}
+ private async Task> readCapturedVersions(
+ DbDataReader reader, CancellationToken token)
+ {
+ // SELECT returns only the rows that exist. A missing row means no prior
+ // save has touched this tag boundary yet — captured version = 0, and the
+ // assertion's INSERT … ON CONFLICT will create the row at save time
+ // (with concurrent first-time creators racing on the unique-PK insert).
+ var byKey = new Dictionary<(string, string), long>();
+ while (await reader.ReadAsync(token).ConfigureAwait(false))
+ {
+ var tagTable = await reader.GetFieldValueAsync(0, token).ConfigureAwait(false);
+ var tagValue = await reader.GetFieldValueAsync(1, token).ConfigureAwait(false);
+ var version = await reader.GetFieldValueAsync(2, token).ConfigureAwait(false);
+ byKey[(tagTable, tagValue)] = version;
+ }
+
+ var targets = _versionTargets!;
+ var entries = new DcbTagVersionEntry[targets.Count];
+ for (var i = 0; i < targets.Count; i++)
+ {
+ var key = (targets[i].TagTable, targets[i].TagValue);
+ byKey.TryGetValue(key, out var version); // missing → version = 0
+ entries[i] = new DcbTagVersionEntry(targets[i].TagTable, targets[i].TagValue, version);
+ }
+
+ return entries;
+ }
+
public Task StreamJson(Stream stream, DbDataReader reader, CancellationToken token)
{
throw new NotSupportedException();
diff --git a/src/Marten/Events/Dcb/HStoreDcbQueryFragment.cs b/src/Marten/Events/Dcb/HStoreDcbQueryFragment.cs
index d7316f12d1..12271ae273 100644
--- a/src/Marten/Events/Dcb/HStoreDcbQueryFragment.cs
+++ b/src/Marten/Events/Dcb/HStoreDcbQueryFragment.cs
@@ -10,8 +10,8 @@ namespace Marten.Events.Dcb;
/// Shared SQL-emission helpers for DCB queries when
/// is in effect. The same containment predicate shape is needed by
/// , ,
-/// , and the inline query builder on
-/// ; this helper keeps the SQL identical across all four.
+/// and the inline query builder on ; this helper keeps the
+/// SQL identical across all three.
///
internal static class HStoreDcbQueryFragment
{
diff --git a/src/Marten/Events/Dcb/TagValueStringifier.cs b/src/Marten/Events/Dcb/TagValueStringifier.cs
new file mode 100644
index 0000000000..b156cee869
--- /dev/null
+++ b/src/Marten/Events/Dcb/TagValueStringifier.cs
@@ -0,0 +1,38 @@
+using System;
+using System.Globalization;
+
+namespace Marten.Events.Dcb;
+
+///
+/// Converts a tag value (the primitive returned by
+/// ) to its
+/// canonical string form for storage in the heterogeneous
+/// mt_dcb_tag_version side table. The DCB tag-version side table is keyed
+/// across every registered tag type, so values are stringified to text rather
+/// than stored in their native types.
+///
+///
+/// Stable formatting matters: the same tag value must produce the same string
+/// across processes and across .NET runtimes so concurrent appenders agree on
+/// which row to lock.
+///
+/// - : "d" (lowercase, hyphenated) format.
+/// - : passed through.
+/// - Numeric primitives: .
+///
+///
+internal static class TagValueStringifier
+{
+ public static string Stringify(object value)
+ {
+ ArgumentNullException.ThrowIfNull(value);
+
+ return value switch
+ {
+ string s => s,
+ Guid g => g.ToString("d"),
+ IFormattable f => f.ToString(null, CultureInfo.InvariantCulture),
+ _ => value.ToString() ?? string.Empty
+ };
+ }
+}
diff --git a/src/Marten/Events/EventGraph.FeatureSchema.cs b/src/Marten/Events/EventGraph.FeatureSchema.cs
index c3c5281dab..11a15f3de2 100644
--- a/src/Marten/Events/EventGraph.FeatureSchema.cs
+++ b/src/Marten/Events/EventGraph.FeatureSchema.cs
@@ -124,5 +124,14 @@ private IEnumerable createAllSchemaObjects()
yield return new EventTagTable(this, tagRegistration);
}
}
+
+ // #4591: side-table that serializes truly-concurrent DCB tag appends.
+ // Only relevant if at least one DCB tag type is registered (since the
+ // table is keyed by tag_table/tag_value and would be dead weight in
+ // event stores that don't use tags at all).
+ if (_tagTypes.Count > 0)
+ {
+ yield return new DcbTagVersionTable(this);
+ }
}
}
diff --git a/src/Marten/Events/EventStore.Dcb.cs b/src/Marten/Events/EventStore.Dcb.cs
index cc1edd1582..65431fdf3b 100644
--- a/src/Marten/Events/EventStore.Dcb.cs
+++ b/src/Marten/Events/EventStore.Dcb.cs
@@ -61,21 +61,22 @@ private async Task> QueryByTagsAsync(EventTagQuery query,
public async Task> FetchForWritingByTags(EventTagQuery query,
CancellationToken cancellation = default) where T : class
{
- var eventTypeNames = ResolveAggregatorEventTypeNames();
- var events = await QueryByTagsAsync(query, eventTypeNames, cancellation).ConfigureAwait(false);
- var lastSeenSequence = events.Count > 0 ? events.Max(e => e.Sequence) : 0;
-
- T? aggregate = default;
- if (events.Count > 0)
+ if (query.Conditions.Count == 0)
{
- aggregate = await AggregateEventsAsync(events, cancellation).ConfigureAwait(false);
+ throw new ArgumentException("EventTagQuery must have at least one condition.");
}
- // Register the DCB assertion to run at SaveChangesAsync time
- var assertion = new AssertDcbConsistency(_store.Events, query, lastSeenSequence);
- _session.QueueOperation(assertion);
+ await _session.Database.EnsureStorageExistsAsync(typeof(IEvent), cancellation).ConfigureAwait(false);
- return new EventBoundary(_session, _store.Events, aggregate, events, lastSeenSequence);
+ // #4591: route the non-batched path through the same handler used by
+ // BatchedQuery.FetchForWritingByTags. The handler is now responsible
+ // for (a) selecting events, (b) reading the captured per-tag versions
+ // from the mt_dcb_tag_version side table, and (c) queuing the
+ // DcbTagVersionAssertion that bumps those rows at SaveChangesAsync
+ // time. Keeping the two entry points pointed at one implementation
+ // means the boundary serialization can't drift between them.
+ var handler = new FetchForWritingByTagsHandler(_store, query);
+ return await _session.ExecuteHandlerAsync(handler, cancellation).ConfigureAwait(false);
}
private async Task AggregateEventsAsync(IReadOnlyList events,
@@ -221,19 +222,4 @@ private static async Task> ReadEventsFromReaderAsync(DbDat
return (sb.ToString(), paramValues);
}
-
- private string[]? ResolveAggregatorEventTypeNames() where T : class
- {
- var aggregator = _store.Options.Projections.AggregatorFor();
- if (aggregator is not EventFilterable filterable) return null;
-
- var includedTypes = filterable.IncludedEventTypes;
- if (includedTypes.Count == 0 || includedTypes.Any(x => x.IsAbstract || x.IsInterface)) return null;
-
- var additionalAliases = _store.Events.AliasesForEvents(includedTypes);
- return includedTypes
- .Select(x => _store.Events.EventMappingFor(x).Alias)
- .Union(additionalAliases)
- .ToArray();
- }
}
diff --git a/src/Marten/Events/Operations/EventTagOperations.cs b/src/Marten/Events/Operations/EventTagOperations.cs
index 5d0cadb9e4..b664db6d8e 100644
--- a/src/Marten/Events/Operations/EventTagOperations.cs
+++ b/src/Marten/Events/Operations/EventTagOperations.cs
@@ -1,5 +1,6 @@
using System.Collections.Generic;
using JasperFx.Events;
+using Marten.Events.Dcb;
using Marten.Internal.Sessions;
using Marten.Storage;
@@ -7,6 +8,36 @@ namespace Marten.Events.Operations;
internal static class EventTagOperations
{
+ ///
+ /// #4591: queue the producer-side bump of mt_dcb_tag_version for every
+ /// distinct (tag_table, tag_value) tuple appearing on the stream's tagged
+ /// events. Must be called for every save that may write tagged events,
+ /// regardless of whether tags are persisted via per-type tables (TagTables),
+ /// the HStore column, or the bulk PostgreSQL function — without this, plain
+ /// session.Events.Append commits silently bypass the DCB boundary
+ /// check held by another in-flight session.
+ ///
+ public static void QueueDcbVersionBumpIfNeeded(EventGraph eventGraph, DocumentSessionBase session, StreamAction stream)
+ {
+ if (eventGraph.TagTypes.Count == 0) return;
+
+ var seen = new HashSet<(string, string)>();
+ var entries = new List<(string TagTable, string TagValue)>();
+
+ foreach (var @event in stream.Events)
+ {
+ var tags = @event.Tags;
+ if (tags == null || tags.Count == 0) continue;
+
+ CollectDcbVersionTargets(eventGraph, tags, seen, entries);
+ }
+
+ if (entries.Count > 0)
+ {
+ session.QueueOperation(new DcbTagVersionBumpOperation(eventGraph, entries));
+ }
+ }
+
///
/// Queue tag inserts using pre-assigned sequence numbers (Rich append mode).
///
@@ -79,6 +110,31 @@ public static void QueueTagOperationsByEventId(EventGraph eventGraph, DocumentSe
}
}
+ // #4591: collect canonical (tag_table, tag_value) tuples for the
+ // mt_dcb_tag_version producer-bump operation. Skips tag types that aren't
+ // registered for storage and dedupes tuples already seen in this save.
+ private static void CollectDcbVersionTargets(EventGraph eventGraph,
+ IReadOnlyList tags,
+ HashSet<(string, string)> seen,
+ List<(string TagTable, string TagValue)> entries)
+ {
+ foreach (var tag in tags)
+ {
+ var registration = eventGraph.FindTagType(tag.TagType);
+ if (registration == null) continue;
+
+ var raw = registration.ExtractValue(tag.Value);
+ if (raw == null) continue;
+
+ var canonical = TagValueStringifier.Stringify(raw);
+ var key = (registration.TableSuffix, canonical);
+ if (seen.Add(key))
+ {
+ entries.Add((registration.TableSuffix, canonical));
+ }
+ }
+ }
+
///
/// Build an HSTORE-compatible Dictionary<string, string> from an event's
/// tag bag. Tags whose type isn't registered are skipped (mirrors the per-tag-table
diff --git a/src/Marten/Events/QuickEventAppender.cs b/src/Marten/Events/QuickEventAppender.cs
index a85eae9fff..19330faab5 100644
--- a/src/Marten/Events/QuickEventAppender.cs
+++ b/src/Marten/Events/QuickEventAppender.cs
@@ -98,6 +98,13 @@ private static void registerOperationsForStreams(EventGraph eventGraph, Document
}
}
}
+
+ // #4591: queue the DCB tag-version producer-bump once per stream,
+ // regardless of which tag-write path above ran. The non-HStore
+ // bulk QuickAppend code path writes tags inside the PostgreSQL
+ // function — without this hook, those commits would silently
+ // bypass any in-flight DCB boundary check.
+ EventTagOperations.QueueDcbVersionBumpIfNeeded(eventGraph, session, stream);
}
}
diff --git a/src/Marten/Events/RichEventAppender.cs b/src/Marten/Events/RichEventAppender.cs
index 9784858caa..14bfab6e80 100644
--- a/src/Marten/Events/RichEventAppender.cs
+++ b/src/Marten/Events/RichEventAppender.cs
@@ -66,6 +66,12 @@ public async Task ProcessEventsAsync(EventGraph eventGraph, DocumentSessionBase
}
EventTagOperations.QueueTagOperations(eventGraph, session, stream);
+
+ // #4591: always queue the DCB tag-version producer-bump separately
+ // from the per-storage-mode tag writes. Boundary-fetch sessions in
+ // flight rely on this row-level bump to invalidate their captured
+ // versions.
+ EventTagOperations.QueueDcbVersionBumpIfNeeded(eventGraph, session, stream);
}
// Queue AssertStreamVersion operations for streams with AlwaysEnforceConsistency but no events
diff --git a/src/Marten/Events/Schema/DcbTagVersionTable.cs b/src/Marten/Events/Schema/DcbTagVersionTable.cs
new file mode 100644
index 0000000000..5e8ebd4cdc
--- /dev/null
+++ b/src/Marten/Events/Schema/DcbTagVersionTable.cs
@@ -0,0 +1,46 @@
+using Weasel.Postgresql;
+using Weasel.Postgresql.Tables;
+
+namespace Marten.Events.Schema;
+
+///
+/// Side-table that converts the DCB tag-boundary check from a racy predicate read
+/// (SELECT EXISTS over mt_events) into a row-level write conflict. One row per
+/// (tag_table, tag_value, tenant_id); the row's version is bumped on every
+/// FetchForWritingByTags → SaveChangesAsync round trip, and the save's
+/// UPDATE … WHERE version = $captured is the serialization point. See
+/// . Fixes #4591.
+///
+internal class DcbTagVersionTable: Table
+{
+ public DcbTagVersionTable(EventGraph events)
+ : base(new PostgresqlObjectName(events.DatabaseSchemaName, "mt_dcb_tag_version"))
+ {
+ // text columns — the table is keyed across heterogeneous tag types, so
+ // values are stringified to a canonical form (see TagValueStringifier).
+ AddColumn("tag_table", "varchar").NotNull().AsPrimaryKey();
+ AddColumn("tag_value", "text").NotNull().AsPrimaryKey();
+
+ // Always present, regardless of tenancy style. In single-tenant stores
+ // every row uses the default tenant id ('*DEFAULT*'); in conjoined
+ // stores tenant_id is part of the PK so tenant A's bumps never collide
+ // with tenant B's.
+ // Weasel's DefaultValueByString quotes the value itself, so pass the
+ // raw literal *DEFAULT* (not '*DEFAULT*') to get `DEFAULT '*DEFAULT*'`.
+ AddColumn("tenant_id", "varchar").DefaultValueByString("*DEFAULT*").NotNull().AsPrimaryKey();
+
+ // No index on version — keeps UPDATEs HOT-eligible (no index touch on
+ // bump), which matters because this table sees an UPDATE on every
+ // boundary save.
+ AddColumn("version", "bigint").DefaultValueByExpression("0").NotNull();
+
+ PrimaryKeyName = "pk_mt_dcb_tag_version";
+
+ // No index on version — keeping that column unindexed means UPDATEs
+ // remain HOT-eligible (only the PK index lookup, no index-entry rewrite
+ // on bump). A lower heap fillfactor would help further by leaving free
+ // space in each page for in-place updates, but Weasel.Postgresql's Table
+ // doesn't currently expose `WITH (fillfactor = N)` on the heap table —
+ // tracked as a follow-up.
+ }
+}