72 changes: 29 additions & 43 deletions docs/events/projections/async-daemon.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
# Async Projections Daemon

::: tip
If you are experiencing any level of "stale high water" detection or getting log messages about "event skipping" with
Marten, you want to at least consider switching to the [QuickAppend](https://martendb.io/events/appending.html#rich-vs-quick-appends) option. The `QuickAppend`
mode is faster, and is substantially less likely to lead to gaps in the event sequence which in turn helps the async daemon
run more smoothly.
:::

The *Async Daemon* is the nickname for Marten's built-in asynchronous projection processing engine. From Marten V4 on, the async daemon requires no other infrastructure
besides PostgreSQL and Marten itself. The daemon runs inside an [IHostedService](https://docs.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-5.0&tabs=visual-studio) implementation in your application. The **daemon is disabled by default**.

@@ -24,12 +17,6 @@ There are only two basic things to configure the *Async Daemon*:
1. Register the projections that should run asynchronously
2. Set the `StoreOptions.AsyncMode` to either `Solo` or `HotCold` (more on what these options mean later in this page)

:::warning
The asynchronous daemon service registration is **opt in** starting with V5 and requires the chained call
to `AddAsyncDaemon()` shown below. This was done to alleviate user issues with Marten inside of Azure Functions
where the runtime was not compatible with the hosted service for the daemon.
:::

As an example, this configures the daemon to run in the current node with a single active projection:

<!-- snippet: sample_bootstrap_daemon_solo -->
@@ -77,6 +64,13 @@ var host = await Host.CreateDefaultBuilder()
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/CommandLineRunner/AsyncDaemonBootstrappingSamples.cs#L88-L106' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_bootstrap_daemon_hotcold' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

::: tip
If you are experiencing any level of "stale high water" detection or getting log messages about "event skipping" with
Marten, you want to at least consider switching to the [QuickAppend](https://martendb.io/events/appending.html#rich-vs-quick-appends) option. The `QuickAppend`
mode is faster, and is substantially less likely to lead to gaps in the event sequence which in turn helps the async daemon
run more smoothly.
:::

## How the Daemon Works

![How Aggregation Works](/images/aggregation-projection-flow.png "How Aggregation Projections Work")
@@ -90,25 +84,24 @@ the current progression point, the daemon

## Solo vs. HotCold

::: tip
Marten's leader election is done with Postgresql advisory locks, so there is no additional software infrastructure necessary other than
Postgresql and Marten itself.
:::

::: tip
The "HotCold" mode was substantially changed for Marten 7.0 and will potentially run projections across different nodes
:::

As of right now, the daemon can run as one of two modes:

1. *Solo* -- the daemon will be automatically started when the application is bootstrapped and all projections and projection shards will be started on that node. The assumption with Solo
is that there is never more than one running system node for your application.
1. *HotCold* -- the daemon will use a built in [leader election](https://en.wikipedia.org/wiki/Leader_election) function individually for each
projection on each tenant database and **ensure that each projection is running on exactly one running process**.

Regardless of how things are configured, the daemon is designed to detect when multiple running processes are updating the same projection shard and will shut down the process if concurrency issues persist.
::: tip
When running in `HotCold` mode, Marten monitors the Postgres advisory lock by running a `SELECT pg_catalog.pg_sleep(60)` query to detect whether the database restarts or fails over.
Without this monitoring, Marten will not be aware that the lock was lost, and multiple async daemons can start running concurrently across multiple nodes, causing application failure.

Some monitoring tools erroneously report this query as "load"; however, the query simply sleeps for 60 seconds and **does not** consume any database resources.
If this monitoring is undesirable for your scenario, you can opt out by setting `options.Events.UseMonitoredAdvisoryLock` to `false` when configuring Marten.
:::
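
As a minimal sketch of the opt-out described in the tip above, assuming a standard `AddMarten()` registration: the `MartenRegistration` class, the method name, and the `connectionString` parameter are hypothetical and only there for illustration. The async daemon registration itself is left out here.

```cs
using Marten;
using Microsoft.Extensions.DependencyInjection;

public static class MartenRegistration
{
    // Hypothetical helper; the name and parameters are illustrative only
    public static void AddMartenWithoutLockMonitoring(
        IServiceCollection services,
        string connectionString)
    {
        services.AddMarten(opts =>
        {
            opts.Connection(connectionString);

            // Monitoring is enabled by default. Setting this to false
            // stops the background pg_sleep(60) query, at the cost of
            // Marten no longer noticing a lost advisory lock after a
            // database restart or fail-over.
            opts.Events.UseMonitoredAdvisoryLock = false;
        });
    }
}
```

The setting is only relevant in `HotCold` mode, so leaving it at its default is probably the safer choice unless the sleeping query causes a concrete problem in your monitoring.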

## Projection Distribution

If your Marten store is only using a single database, Marten will distribute projection by projection. If your store is using
If your Marten store is only using a single database, Marten will distribute projections by projection type. If your store is using
[separate databases for multi-tenancy](/configuration/multitenancy), the async daemon will group all projections for a single
database on the same executing node as a purposeful strategy to reduce the total number of connections to the databases.

@@ -126,25 +119,18 @@ and be able to replay events later after the fix.

## PgBouncer

::: tip
If you are also using [Wolverine](https://wolverinefx.net), its ability to [distribute Marten projections and subscriptions](https://wolverinefx.net/guide/durability/marten/distribution.html) does not depend on advisory
locks and also spreads work out more evenly through a cluster.
:::

If you use Marten's async daemon feature *and* [PgBouncer](https://www.pgbouncer.org/), make sure you're aware of some
[Npgsql configuration settings](https://www.npgsql.org/doc/compatibility.html#pgbouncer) for best usage with Marten. Marten's
async daemon uses [PostgreSQL Advisory Locks](https://www.postgresql.org/docs/current/explicit-locking.html) to help distribute work across an application cluster, and PgBouncer can
throw off that functionality without the connection settings in the Npgsql documentation linked above.
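
One possible starting point, sketched under assumptions: the Npgsql compatibility page linked above recommends `No Reset On Close=true` when PgBouncer runs in transaction or statement pooling mode. The host, port, database, and user name below are placeholders, and this sketch does not claim to be the complete set of settings that Marten needs with PgBouncer, so check the linked page against your own pooling mode.

```cs
using Npgsql;

// Placeholder connection details pointing at PgBouncer rather than
// directly at PostgreSQL; substitute your own values
var builder = new NpgsqlConnectionStringBuilder(
    "Host=localhost;Port=6432;Database=marten_demo;Username=postgres")
{
    // Stops Npgsql from resetting session state when a pooled
    // connection is closed, as described in the Npgsql PgBouncer notes
    NoResetOnClose = true
};

// Pass this string to Marten, e.g. opts.Connection(connectionString)
string connectionString = builder.ConnectionString;
```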

## Error Handling

::: warning
The async daemon error handling was rewritten for Marten 7.0. The new model uses
[Polly](https://www.thepollyproject.org/) for typical transient errors like network hiccups or a database being too
busy. Marten does have some configuration to alternatively skip certain errors in normal background operation or while
doing rebuilds.
::: tip
If you are also using [Wolverine](https://wolverinefx.net), its ability to [distribute Marten projections and subscriptions](https://wolverinefx.net/guide/durability/marten/distribution.html) does not depend on advisory
locks and also spreads work out more evenly through a cluster.
:::

## Error Handling

**In all examples, `opts` is a `StoreOptions` object.** Besides the basic [Polly error handling](/configuration/retries#resiliency-policies),
you have these three options to configure error handling within your system's usage of asynchronous projections:

@@ -193,7 +179,7 @@ See the section on error handling. Poison event detection is a little more autom

## Accessing the Executing Async Daemon

New in Marten 7.0 is the ability to readily access the executing instance of the daemon for each database in your system.
Marten supports access to the executing instance of the daemon for each database in your system.
You can use this approach to track progress or start or stop individual projections like so:

<!-- snippet: sample_using_projection_coordinator -->
@@ -234,7 +220,7 @@ You can see the usage below from one of the Marten tests where we use that metho
daemon has caught up:

<!-- snippet: sample_using_WaitForNonStaleProjectionDataAsync -->
<a id='snippet-sample_using_waitfornonstaleprojectiondataasync'></a>
<a id='snippet-sample_using_WaitForNonStaleProjectionDataAsync'></a>
```cs
[Fact]
public async Task run_simultaneously()
@@ -255,7 +241,7 @@ public async Task run_simultaneously()
await CheckExpectedResults();
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DaemonTests/EventProjections/event_projections_end_to_end.cs#L28-L49' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_waitfornonstaleprojectiondataasync' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DaemonTests/EventProjections/event_projections_end_to_end.cs#L28-L49' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_WaitForNonStaleProjectionDataAsync' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

The basic idea in your tests is to:
@@ -303,7 +289,7 @@ public async Task run_simultaneously()
The following code shows the diagnostics support for the async daemon as it is today:

<!-- snippet: sample_DaemonDiagnostics -->
<a id='snippet-sample_daemondiagnostics'></a>
<a id='snippet-sample_DaemonDiagnostics'></a>
```cs
public static async Task ShowDaemonDiagnostics(IDocumentStore store)
{
@@ -322,7 +308,7 @@ public static async Task ShowDaemonDiagnostics(IDocumentStore store)
Console.WriteLine($"The daemon high water sequence mark is {daemonHighWaterMark}");
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/CommandLineRunner/AsyncDaemonBootstrappingSamples.cs#L109-L128' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_daemondiagnostics' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/CommandLineRunner/AsyncDaemonBootstrappingSamples.cs#L109-L128' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_DaemonDiagnostics' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Command Line Support
@@ -443,7 +429,7 @@ from systems using Marten.
If your system is configured to export metrics and Open Telemetry data from Marten like this:

<!-- snippet: sample_enabling_open_telemetry_exporting_from_Marten -->
<a id='snippet-sample_enabling_open_telemetry_exporting_from_marten'></a>
<a id='snippet-sample_enabling_open_telemetry_exporting_from_Marten'></a>
```cs
// This is passed in by Project Aspire. The exporter usage is a little
// different for other tools like Prometheus or SigNoz
@@ -462,7 +448,7 @@ builder.Services.AddOpenTelemetry()
metrics.AddMeter("Marten");
});
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/samples/AspireHeadlessTripService/Program.cs#L21-L40' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_enabling_open_telemetry_exporting_from_marten' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/samples/AspireHeadlessTripService/Program.cs#L21-L40' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_enabling_open_telemetry_exporting_from_Marten' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

*And* you are running the async daemon in your system, you should potentially see activities for each running projection
10 changes: 5 additions & 5 deletions src/DaemonTests/Coordination/AdvisoryLocksTest.cs
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@ public async Task get_lock_smoke_test()
await using var store = DocumentStore.For(ConnectionSource.ConnectionString);


await using var locks = new AdvisoryLock(((MartenDatabase)store.Storage.Database).DataSource, NullLogger.Instance, store.Storage.Database.Identifier);
await using var locks = new AdvisoryLock(((MartenDatabase)store.Storage.Database).DataSource, NullLogger.Instance, store.Storage.Database.Identifier, new AdvisoryLockOptions());

locks.HasLock(50).ShouldBeFalse();

@@ -33,8 +33,8 @@ public async Task get_exclusive_lock()
{
await using var store = DocumentStore.For(ConnectionSource.ConnectionString);

await using var locks = new AdvisoryLock(((MartenDatabase)store.Storage.Database).DataSource, NullLogger.Instance, store.Storage.Database.Identifier);
await using var locks2 = new AdvisoryLock(((MartenDatabase)store.Storage.Database).DataSource, NullLogger.Instance, store.Storage.Database.Identifier);
await using var locks = new AdvisoryLock(((MartenDatabase)store.Storage.Database).DataSource, NullLogger.Instance, store.Storage.Database.Identifier, new AdvisoryLockOptions());
await using var locks2 = new AdvisoryLock(((MartenDatabase)store.Storage.Database).DataSource, NullLogger.Instance, store.Storage.Database.Identifier, new AdvisoryLockOptions());

(await locks.TryAttainLockAsync(50, CancellationToken.None)).ShouldBeTrue();
(await locks2.TryAttainLockAsync(50, CancellationToken.None)).ShouldBeFalse();
@@ -51,7 +51,7 @@ public async Task get_multiple_locks()
{
await using var store = DocumentStore.For(ConnectionSource.ConnectionString);

await using var locks = new AdvisoryLock(((MartenDatabase)store.Storage.Database).DataSource, NullLogger.Instance, store.Storage.Database.Identifier);
await using var locks = new AdvisoryLock(((MartenDatabase)store.Storage.Database).DataSource, NullLogger.Instance, store.Storage.Database.Identifier, new AdvisoryLockOptions());

(await locks.TryAttainLockAsync(50, CancellationToken.None)).ShouldBeTrue();
(await locks.TryAttainLockAsync(51, CancellationToken.None)).ShouldBeTrue();
@@ -61,7 +61,7 @@ public async Task get_multiple_locks()
locks.HasLock(51).ShouldBeTrue();
locks.HasLock(52).ShouldBeTrue();

await using var locks2 = new AdvisoryLock(((MartenDatabase)store.Storage.Database).DataSource, NullLogger.Instance, store.Storage.Database.Identifier);
await using var locks2 = new AdvisoryLock(((MartenDatabase)store.Storage.Database).DataSource, NullLogger.Instance, store.Storage.Database.Identifier, new AdvisoryLockOptions());
(await locks2.TryAttainLockAsync(50, CancellationToken.None)).ShouldBeFalse();
(await locks2.TryAttainLockAsync(51, CancellationToken.None)).ShouldBeFalse();
(await locks2.TryAttainLockAsync(52, CancellationToken.None)).ShouldBeFalse();
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ public MultiTenantedProjectionDistributor(DocumentStore store)
var logger = _store.Options.LogFactory?.CreateLogger<AdvisoryLock>() ??
_store.Options.DotNetLogger ?? NullLogger<AdvisoryLock>.Instance;

_locks = new(db => new AdvisoryLock(((MartenDatabase)db).DataSource, logger, db.Id.Identity));
_locks = new(db => new AdvisoryLock(((MartenDatabase)db).DataSource, logger, db.Id.Identity, new AdvisoryLockOptions { LockMonitoringEnabled = store.Options.Events.UseMonitoredAdvisoryLock }));
}

public async ValueTask DisposeAsync()
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ public SingleTenantProjectionDistributor(DocumentStore store)
var logger = _store.Options.LogFactory?.CreateLogger<AdvisoryLock>() ??
_store.Options.DotNetLogger ?? NullLogger<AdvisoryLock>.Instance;

_locks = new(db => new AdvisoryLock(((MartenDatabase)db).DataSource, logger, db.Id.Identity));
_locks = new(db => new AdvisoryLock(((MartenDatabase)db).DataSource, logger, db.Id.Identity, new AdvisoryLockOptions { LockMonitoringEnabled = store.Options.Events.UseMonitoredAdvisoryLock }));
}

public ValueTask<IReadOnlyList<IProjectionSet>> BuildDistributionAsync()
43 changes: 2 additions & 41 deletions src/Marten/Events/EventGraph.cs
Original file line number Diff line number Diff line change
@@ -197,59 +197,20 @@ public IEvent BuildEvent(object eventData)

}

/// <summary>
/// Opt into some performance optimizations for projection rebuilds for both single stream and
/// multi-stream projections. This will result in new table columns and a potential database
/// migration. This will be a default in Marten 8.
/// </summary>
public bool UseOptimizedProjectionRebuilds { get; set; }

/// <summary>
/// Does Marten require a stream type for any new event streams? This will also
/// validate that an event stream already exists as part of appending events. Default in 7.0 is false,
/// but this will be true in 8.0
/// </summary>
public bool UseMandatoryStreamTypeDeclaration { get; set; }

public bool UseMonitoredAdvisoryLock { get; set; } = true;
public bool EnableAdvancedAsyncTracking { get; set; }

/// <summary>
/// This is an "opt in" feature to add the capability to mark some events as "skipped" in the database
/// meaning that they do not apply to projections or subscriptions. Use this to "cure" bad events
/// </summary>
public bool EnableEventSkippingInProjectionsOrSubscriptions { get; set; }

/// <summary>
/// Opt into using PostgreSQL list partitioning. This can have significant performance and scalability benefits
/// *if* you are also aggressively using event stream archiving
/// </summary>
public bool UseArchivedStreamPartitioning { get; set; }

/// <summary>
/// Optional extension point to receive published messages as a side effect from
/// aggregation projections
/// </summary>
public IMessageOutbox MessageOutbox { get; set; } = new NulloMessageOutbox();

/// <summary>
/// TimeProvider used for event timestamping metadata. Replace for controlling the timestamps
/// in testing
/// </summary>

[IgnoreDescription]
public TimeProvider TimeProvider { get; set; } = TimeProvider.System;

/// <summary>
/// Opt into having Marten create a unique index on Event.Id. The default is false. This may
/// be helpful if you need to create an external reference id to another system, or need to
/// load events by their Id
/// </summary>
public bool EnableUniqueIndexOnEventId { get; set; } = false;

/// <summary>
/// Opt into having Marten process "side effects" on aggregation projections
/// (SingleStreamProjection/MultiStreamProjection) while
/// running in an Inline lifecycle. Default is false;
/// </summary>
public bool EnableSideEffectsOnInlineProjections { get; set; } = false;

/// <summary>
9 changes: 9 additions & 0 deletions src/Marten/Events/IEventStoreOptions.cs
Original file line number Diff line number Diff line change
@@ -97,6 +97,15 @@ public interface IEventStoreOptions
/// </summary>
public bool UseMandatoryStreamTypeDeclaration { get; set; }

/// <summary>
/// Enables a background monitor to detect if the advisory lock is lost due to database restart or fail-over. Prevents situations where concurrent running of async daemons may occur on system recovery.
/// Only relevant when using the async daemon in HotCold mode. Enabled by default.
/// </summary>
/// <remarks>
/// This will show up as a SELECT SLEEP query with a 60-second sleep interval. This does not add any additional load to your database, regardless of what your monitoring tools might say.
/// </remarks>
public bool UseMonitoredAdvisoryLock { get; set; }

/// <summary>
/// Opt into different aliasing styles for .NET event types
/// </summary>
9 changes: 9 additions & 0 deletions src/Marten/Events/IReadOnlyEventStoreOptions.cs
Original file line number Diff line number Diff line change
@@ -87,6 +87,15 @@ public interface IReadOnlyEventStoreOptions
/// </summary>
bool UseMandatoryStreamTypeDeclaration { get; set; }

/// <summary>
/// Enables a background monitor to detect if the advisory lock is lost due to database restart or fail-over. Prevents situations where concurrent running of async daemons may occur on system recovery.
/// Only relevant when using the async daemon in HotCold mode. Enabled by default.
/// </summary>
/// <remarks>
/// This will show up as a SELECT SLEEP query with a 60-second sleep interval. This does not add any additional load to your database, regardless of what your monitoring tools might say.
/// </remarks>
public bool UseMonitoredAdvisoryLock { get; set; }

/// <summary>
/// Opt into different aliasing styles for .NET event types
/// </summary>
2 changes: 1 addition & 1 deletion src/Marten/Marten.csproj
Original file line number Diff line number Diff line change
@@ -43,7 +43,7 @@
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<!-- This is forced by Npgsql peer dependency -->
<PackageReference Include="Npgsql.Json.NET" Version="9.0.4" />
<PackageReference Include="Weasel.Postgresql" Version="8.4.2" />
<PackageReference Include="Weasel.Postgresql" Version="8.4.3" />
</ItemGroup>

