20 changes: 16 additions & 4 deletions docs/events/dcb.md
@@ -276,6 +276,18 @@ catch (DcbConcurrencyException ex)
The consistency check only detects events that match the **same tag query**. Events appended to unrelated tags or streams will not cause a violation.
:::

### How the boundary check serializes <Badge type="tip" text="9.4" />

::: warning Upgrading from 9.3 or earlier
Marten 9.4 added a new schema object — `mt_dcb_tag_version` — to fix [#4591](https://github.com/JasperFx/marten/issues/4591). Deployments with `AutoCreate.None` must run `db-patch` / `db-apply` before deploying 9.4. See [Migration Guide → 9.4 schema migration](/migration-guide#required-schema-migration-dcb-tag-version-side-table).
:::

Internally, `FetchForWritingByTags` records the captured version of every tag value referenced by the query in a side table (`mt_dcb_tag_version`, one row per `(tag_table, tag_value, tenant_id)`). At `SaveChangesAsync` time Marten emits an `INSERT … ON CONFLICT DO UPDATE … WHERE version = $captured RETURNING 1` for each captured row. The row-level lock plus the captured-version predicate is the serialization point: two truly-concurrent appenders observing the same captured version both attempt to bump the row — the first wins, the second's `RETURNING` matches no rows and surfaces `DcbConcurrencyException`. This works at PostgreSQL's default `READ COMMITTED` isolation; no `SERIALIZABLE`, no advisory locks.

Every save that appends a tagged event — boundary or otherwise — also queues a producer-side bump against the same row. That's what keeps a plain `session.Events.Append(streamId, taggedEvent)` from silently committing past an in-flight boundary fetch held by another session: the version moves on every commit, not only on boundary saves.
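
The two paragraphs above can be read as a compare-and-swap on a versioned row. The following is a minimal in-memory sketch of that idea, not Marten's implementation: `TagVersionTable`, `Capture`, `TryBump`, and `Bump` are hypothetical names, a `lock` stands in for the PostgreSQL row lock, and treating a missing row as version 0 is an assumption.

```csharp
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the mt_dcb_tag_version side table.
// It illustrates the version-predicate idea only; it is not Marten's code.
public sealed class TagVersionTable
{
    private readonly object _gate = new();
    private readonly Dictionary<(string TagTable, string TagValue, string TenantId), long> _rows = new();

    // Boundary fetch: remember the version currently stored for the tag value.
    // Treating a missing row as version 0 is an assumption of this sketch.
    public long Capture(string tagTable, string tagValue, string tenantId)
    {
        lock (_gate)
        {
            return _rows.TryGetValue((tagTable, tagValue, tenantId), out var version) ? version : 0;
        }
    }

    // Boundary save: bump the row only if it still holds the captured version.
    // The lock stands in for the row lock; a false result corresponds to the
    // statement matching no rows (surfaced by Marten as DcbConcurrencyException).
    public bool TryBump(string tagTable, string tagValue, string tenantId, long captured)
    {
        lock (_gate)
        {
            var key = (tagTable, tagValue, tenantId);
            var current = _rows.TryGetValue(key, out var version) ? version : 0;
            if (current != captured) return false;
            _rows[key] = current + 1;
            return true;
        }
    }

    // Producer-side bump: any save that appends a tagged event moves the version,
    // whether or not it went through a boundary fetch.
    public void Bump(string tagTable, string tagValue, string tenantId)
    {
        lock (_gate)
        {
            var key = (tagTable, tagValue, tenantId);
            _rows[key] = (_rows.TryGetValue(key, out var version) ? version : 0) + 1;
        }
    }
}
```

In this model, two sessions that capture the same version and both call `TryBump` get one `true` and one `false`, and a plain `Bump` between a capture and its `TryBump` also makes the `TryBump` fail, which mirrors the producer-side behavior described above.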

The side table grows with **distinct boundary-tag values**, not with event volume, and is never deleted automatically — the same `StudentId` or `CourseId` reuses its row across every save. Avoid using ephemeral or one-shot values as DCB tags if you want to keep the table compact.

## Checking Event Existence

If you only need to know whether any events matching a tag query exist -- without loading or deserializing them -- use `EventsExistAsync`. This is a lightweight `SELECT EXISTS(...)` query that avoids the overhead of fetching and materializing event data:
@@ -384,17 +396,17 @@ A reproducible side-by-side benchmark lives at `src/DcbLoadTest` and can be re-r
Guidance:

- **Prefer HStore** when your DCB queries match on **two or more tag types** (the common case — most projection boundaries combine an aggregate-id tag with one or more domain tags). The JOIN cost on TagTables grows with each additional tag type; HStore stays flat.
-- **Prefer HStore** when your hot path is `FetchForWritingByTags` (consistency-boundary read-modify-write). Round-trip drops roughly in half because both the read and the consistency-check `EXISTS` become single-table lookups.
+- **Prefer HStore** when your hot path is `FetchForWritingByTags` (consistency-boundary read-modify-write). The fetch round-trip drops because the events `SELECT` is a single-table lookup instead of an N-way JOIN.
- **Stay on TagTables** if your DCB workload is dominated by **single-tag `EventsExistAsync` probes**. That case is what the per-type tables are optimized for — a primary-key lookup on a small dedicated table — and HStore's GIN containment is slightly slower per probe.
- **Either mode is fine** for append throughput. With tags, HStore is about 30% faster than TagTables because it issues one `UPDATE` per tagged event instead of one `INSERT` per `(event, tag)` pair.

If you're starting a new event store on Marten 9.0 and most of your projections key off `(aggregateId, someOtherTag)`, HStore is the recommended choice. If you're upgrading from Marten 8 and already have a populated TagTables-mode store, there is no compelling reason to switch.
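
As a rough illustration of the guidance above, the storage mode is selected in the store configuration. This sketch reuses only the option names that appear in this PR's test (`opts.Events.DcbStorageMode`, `RegisterTagType<T>`); the `StudentId` and `CourseId` records, the `EventStoreSetup` class, and the connection-string parameter are placeholders, and the `using` list is copied from the test file rather than verified against a specific Marten version.

```csharp
using System;
using JasperFx.Events;
using JasperFx.Events.Tags;
using Marten;
using Marten.Events;

// Placeholder strong-typed tag ids, shaped like the ones the test constructs.
public record StudentId(Guid Value);
public record CourseId(Guid Value);

public static class EventStoreSetup
{
    // Hypothetical helper: builds a store that uses HStore for DCB tags.
    public static IDocumentStore Build(string connectionString) =>
        DocumentStore.For(opts =>
        {
            opts.Connection(connectionString);

            // Swap in DcbStorageMode.TagTables if single-tag
            // EventsExistAsync probes dominate the workload.
            opts.Events.DcbStorageMode = DcbStorageMode.HStore;

            opts.Events.RegisterTagType<StudentId>("student");
            opts.Events.RegisterTagType<CourseId>("course");
        });
}
```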

-### Consistency Check
+### Consistency Check <Badge type="tip" text="9.4" />

-At `SaveChangesAsync` time, Marten executes an `EXISTS` query checking for new events matching the tag query with `seq_id > lastSeenSequence`. This runs in the same transaction as the event appends, providing serializable consistency for the tagged boundary.
+At `SaveChangesAsync` time, Marten emits a per-tag `UPDATE … WHERE version = $captured` against the `mt_dcb_tag_version` side table: one statement per distinct `(tag_table, tag_value)` tuple in the boundary query, in deterministic sort order. The row-level write lock plus the version predicate is the serialization point: two concurrent appenders capturing the same version both try to bump it; one wins, the other's `UPDATE` matches zero rows and surfaces `DcbConcurrencyException`. This works at PostgreSQL's default `READ COMMITTED` isolation, with no advisory locks and no `SERIALIZABLE` transactions.

-The shape of the `EXISTS` query depends on the storage mode (multi-table `INNER JOIN` for TagTables, single-table `@>` containment for HStore), but the behavior is identical from the caller's perspective.
+The check shape no longer depends on `DcbStorageMode`. Both `TagTables` and `HStore` share the same side-table mechanism for the consistency check; the storage mode only affects how tags are physically read at fetch time and written at append time.
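
The "deterministic sort order" mentioned above is a familiar lock-ordering technique: if every save touches its side-table rows in the same order, two saves that share several tag values are less likely to acquire row locks in opposite orders and deadlock. That motivation is an inference, not something the PR states. The sketch below shows one way to derive such an order; `BoundaryCheckOrdering.InLockOrder` is a hypothetical helper, and ordinal string ordering is an assumption about the sort key.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BoundaryCheckOrdering
{
    // Hypothetical helper: collapse duplicate (tagTable, tagValue) pairs and
    // return them in a stable, culture-independent order, so every caller
    // would issue its per-row statements in the same sequence.
    public static IReadOnlyList<(string TagTable, string TagValue)> InLockOrder(
        IEnumerable<(string TagTable, string TagValue)> captured) =>
        captured
            .Distinct()
            .OrderBy(t => t.TagTable, StringComparer.Ordinal)
            .ThenBy(t => t.TagValue, StringComparer.Ordinal)
            .ToList();
}
```

Two sessions that reference the same tag values in different orders end up with identical sequences after passing through `InLockOrder`.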

### Tag Routing

28 changes: 28 additions & 0 deletions docs/migration-guide.md
@@ -1,5 +1,33 @@
# Migration Guide

## Key Changes in 9.4.0

### Required schema migration — DCB tag-version side table <Badge type="warning" text="action required" />

If you register **any** DCB tag types via `Events.RegisterTagType<T>()` (or auto-discovery from `SingleStreamProjection<TDoc, TId>` with a strong-typed `TId`), Marten 9.4 introduces a new schema object: a side table `mt_dcb_tag_version` in your event-store schema. The table is created automatically on first save under the default `AutoCreate.CreateOrUpdate`, but **deployments that pin `AutoCreate.None` and ship schema changes via `db-patch` / `db-apply` must run the migration before deploying 9.4**, or saves with tagged events will fail with `relation "<schema>.mt_dcb_tag_version" does not exist`.

```bash
# Generate the patch against your production-equivalent DB
dotnet run -- db-patch ./schema-9.4.sql --drop ./schema-9.4.drop.sql

# Or apply directly if your deploy pipeline runs Marten with elevated DDL rights
dotnet run -- db-apply
```

#### What changes on every save (with DCB tags)

Beyond the new schema object, every save that appends a tagged event now also queues one extra `INSERT … ON CONFLICT DO UPDATE` against `mt_dcb_tag_version`. This is the *producer-side bump* that makes the boundary check serializable. The overhead is one row write per distinct `(tag_table, tag_value)` tuple referenced by the batch, typically one or two rows per save.

#### Why this lands as a required migration in a point release

This is the fix for [#4591](https://github.com/JasperFx/marten/issues/4591) — a correctness bug where truly-concurrent DCB tag-boundary appends could **both commit** when the contract demands exactly one. The check pre-9.4 emitted a `SELECT EXISTS (… FROM mt_events …)` as a separate non-locking statement before the INSERTs at `READ COMMITTED`, leaving an open race window. Two concurrent fetch→save sessions both ran the check before either committed, both saw no conflict, both inserted. The bug affected both `DcbStorageMode.HStore` and `DcbStorageMode.TagTables` — the predicate shape differed but the racy `SELECT-then-INSERT` pattern was identical.

The side-table mechanism converts the predicate read into a row-level write conflict, so concurrent boundary saves serialize on a row lock at `READ COMMITTED` — no `SERIALIZABLE`, no advisory locks. See [DCB → Consistency Check](/events/dcb#consistency-check) for the full mechanism.
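
To make the race concrete, here is a small deterministic model of the two shapes described above. It is a sketch, not Marten code: `BoundaryRace` and its methods are hypothetical, the interleaving is fixed by hand instead of using real threads, and the sequence numbers are made up.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class BoundaryRace
{
    // Pre-9.4 shape: a non-locking existence check followed by an insert.
    // Both sessions run their check before either one inserts.
    public static int CommitsWithSelectThenInsert()
    {
        var eventSeqs = new List<long> { 1 }; // seed event with seq_id = 1
        const long lastSeen = 1;              // both sessions fetched at seq_id 1

        var aSeesConflict = eventSeqs.Any(s => s > lastSeen);
        var bSeesConflict = eventSeqs.Any(s => s > lastSeen);

        var commits = 0;
        if (!aSeesConflict) { eventSeqs.Add(2); commits++; }
        if (!bSeesConflict) { eventSeqs.Add(3); commits++; }
        return commits;
    }

    // 9.4 shape: each save is a conditional write against one versioned row.
    // The row lock forces the writes to happen one after the other, so the
    // second save sees the version the first one already bumped.
    public static int CommitsWithVersionedRow()
    {
        long rowVersion = 1;
        const long captured = 1; // both sessions captured version 1

        var commits = 0;
        for (var session = 0; session < 2; session++)
        {
            if (rowVersion == captured) { rowVersion++; commits++; }
        }
        return commits;
    }
}
```

In this model the first method lets both sessions commit, while the second admits exactly one, which is the outcome the regression test in this PR asserts for eight racers.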

#### Growth and cleanup

The side table grows with **distinct boundary-tag values**, not with event volume — the same `StudentId` or `CourseId` reuses its row across every save. Rows are never deleted automatically; avoid using ephemeral or one-shot values as DCB tags if you want to keep the table compact.

## Key Changes in 9.0.0

### Platform support
@@ -0,0 +1,170 @@
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using JasperFx.Events;
using JasperFx.Events.Tags;
using Marten;
using Marten.Events;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;
using Xunit.Abstractions;

namespace EventSourcingTests.Dcb;

// #4591 regression. Pre-fix, AssertDcbConsistency emitted a `SELECT EXISTS (...)` over
// mt_events as a separate non-locking statement before the INSERTs, with the transaction
// at READ COMMITTED. Two concurrent fetch-and-save sessions both ran their EXISTS check
// before either committed, both saw no conflict, both committed → 5–7 of 8 racers
// committed when the contract demands exactly one.
//
// The fix (see DcbTagVersionTable + DcbTagVersionAssertion) replaces the predicate
// read with an UPDATE … WHERE version = $captured on a side table keyed by
// (tag_table, tag_value, tenant_id). The UPDATE acquires a row lock; the WHERE
// clause is the optimistic check; concurrent saves serialize on the row.
//
// Run-mode coverage:
// - [Theory] over DcbStorageMode (HStore + TagTables) so the side-table fix is
//   verified for both physical-storage shapes, not just the HStore one in the
//   reporter's environment.
// - Reporter's environment was specifically HStore + Conjoined + LightweightSession.
//   The Theory pins that variant and adds the TagTables variant alongside it.
[Collection("OneOffs")]
public class Bug_4591_truly_concurrent_dcb_tag_appends_both_commit: OneOffConfigurationsContext
{
    private readonly ITestOutputHelper _output;

    public Bug_4591_truly_concurrent_dcb_tag_appends_both_commit(ITestOutputHelper output)
    {
        _output = output;
    }

    private void ConfigureStore(DcbStorageMode storageMode)
    {
        StoreOptions(opts =>
        {
            opts.Events.AddEventType<StudentEnrolled>();
            opts.Events.AddEventType<AssignmentSubmitted>();

            // Reporter's environment for HStore variant; the side-table fix has to
            // hold for both DCB storage modes either way.
            opts.Events.DcbStorageMode = storageMode;
            opts.Events.TenancyStyle = JasperFx.MultiTenancy.TenancyStyle.Conjoined;
            opts.Policies.AllDocumentsAreMultiTenanted();

            opts.Events.RegisterTagType<StudentId>("student")
                .ForAggregate<StudentCourseEnrollment>();
            opts.Events.RegisterTagType<CourseId>("course")
                .ForAggregate<StudentCourseEnrollment>();

            opts.Projections.LiveStreamAggregation<StudentCourseEnrollment>();
        });
    }

    [Theory]
    [InlineData(DcbStorageMode.HStore)]
    [InlineData(DcbStorageMode.TagTables)]
    public async Task truly_concurrent_appends_sharing_a_tag_serialize_to_one_winner(DcbStorageMode storageMode)
    {
        ConfigureStore(storageMode);

        const int Racers = 8;
        const string TenantId = "acme";

        // The DCB *boundary* tag, shared by every racer. This is the tag value
        // the DCB consistency check queries against; the bug was a check-then-act
        // race on this tag.
        var courseId = new CourseId(Guid.NewGuid());

        // Each racer also carries its OWN StudentId. EventBoundary routes events
        // to a stream by the first tag with an AggregateType, so distinct
        // per-racer StudentIds mean distinct streams: the
        // (stream_id, version) unique constraint on mt_events does NOT serialize
        // them. The race must be caught by the DCB tag check alone.

        // Seed: one event under a DIFFERENT student stream but tagged with the
        // shared CourseId so lastSeenSequence > 0.
        var seedStudentId = new StudentId(Guid.NewGuid());
        var seedStreamId = seedStudentId.Value;
        await using (var seedSession = theStore.LightweightSession(TenantId))
        {
            var enrolled = seedSession.Events.BuildEvent(new StudentEnrolled("Seed", "Math"));
            enrolled.WithTag(seedStudentId, courseId);
            seedSession.Events.Append(seedStreamId, enrolled);
            await seedSession.SaveChangesAsync();
        }

        // Barrier-sync the fetch → save handoff. Every session fetches and then
        // awaits the same TCS; once all `Racers` sessions are past the fetch the
        // barrier completes and every session races into its SaveChangesAsync
        // simultaneously. The captured tag versions are therefore identical
        // across all racers.
        var allFetched = new TaskCompletionSource[Racers];
        for (var i = 0; i < Racers; i++) allFetched[i] = new TaskCompletionSource();
        var startSaves = new TaskCompletionSource();

        // Query is by the SHARED CourseId: that's the boundary the DCB check
        // must serialize.
        var query = new EventTagQuery().Or<CourseId>(courseId);

        var racerTasks = Enumerable.Range(0, Racers).Select(i => Task.Run(async () =>
        {
            // Each racer has its own StudentId → its own stream when routed
            // by EventBoundary.AppendOne.
            var racerStudentId = new StudentId(Guid.NewGuid());

            await using var session = theStore.LightweightSession(TenantId);
            var boundary = await session.Events.FetchForWritingByTags<StudentCourseEnrollment>(query);

            allFetched[i].SetResult();
            await startSaves.Task;

            var append = session.Events.BuildEvent(new AssignmentSubmitted($"HW-{i}", 80 + i));
            // studentId first → routes to studentId-stream (distinct per racer);
            // courseId second → recorded against the shared boundary.
            append.WithTag(racerStudentId, courseId);
            boundary.AppendOne(append);

            try
            {
                await session.SaveChangesAsync();
                return (Index: i, Committed: true, Exception: (Exception?)null);
            }
            catch (DcbConcurrencyException ex)
            {
                return (Index: i, Committed: false, Exception: (Exception?)ex);
            }
        })).ToArray();

        await Task.WhenAll(allFetched.Select(t => t.Task));
        startSaves.SetResult();

        var results = await Task.WhenAll(racerTasks);

        var committed = results.Count(r => r.Committed);
        var throws = results.Count(r => r.Exception is DcbConcurrencyException);
        var otherErrors = results.Where(r => r.Exception is not null and not DcbConcurrencyException).ToList();

        _output.WriteLine($"DCB storage mode: {storageMode}");
        _output.WriteLine($"Truly-concurrent racers: {Racers}");
        _output.WriteLine($" Committed: {committed}");
        _output.WriteLine($" DcbConcurrencyException: {throws}");
        if (otherErrors.Count > 0)
        {
            _output.WriteLine($" Other exceptions: {otherErrors.Count}");
            foreach (var er in otherErrors)
            {
                _output.WriteLine($" Racer {er.Index}: {er.Exception}");
            }
        }

        // Exactly one racer should commit; everyone else must observe the
        // DCB concurrency violation.
        committed.ShouldBe(1,
            $"Expected exactly one truly-concurrent append to commit; observed {committed} commits + {throws} DcbConcurrencyException throws (mode = {storageMode}).");
        throws.ShouldBe(Racers - 1);
        otherErrors.ShouldBeEmpty();
    }
}
@@ -17,7 +17,7 @@ namespace EventSourcingTests.Dcb;
/// <see cref="DcbStorageMode.HStore"/>. Validates that the per-tenant filters added
/// to the HStore branches in <c>EventStore.Dcb.BuildTagQuerySql</c>,
/// <c>EventsExistByTagsHandler</c>, <c>FetchForWritingByTagsHandler</c>,
-/// <c>AssertDcbConsistency</c>, and <c>SetEventTagsHstoreOperation</c> correctly
+/// <c>DcbTagVersionAssertion</c>, and <c>SetEventTagsHstoreOperation</c> correctly
/// isolate DCB query, exists, aggregate, fetch-for-writing, and consistency-check
/// results between tenants.
/// </summary>