diff --git a/Directory.Build.props b/Directory.Build.props index 994f9229d4..c2016bcedd 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -1,7 +1,7 @@ - 9.0.0-alpha.4 + 9.0.0-alpha.5 13.0 Jeremy D. Miller;Babu Annamalai;Jaedyn Tonee https://martendb.io/logo.png diff --git a/Directory.Packages.props b/Directory.Packages.props index c660c49fc5..b9c3dc9bee 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -13,13 +13,13 @@ - - - + + + all runtime; build; native; contentfiles; analyzers; buildtransitive - + @@ -66,8 +66,8 @@ - - + + diff --git a/src/Marten/Events/Daemon/Coordination/IProjectionDistributor.cs b/src/Marten/Events/Daemon/Coordination/IProjectionDistributor.cs deleted file mode 100644 index 5b6aed229c..0000000000 --- a/src/Marten/Events/Daemon/Coordination/IProjectionDistributor.cs +++ /dev/null @@ -1,19 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; - -namespace Marten.Events.Daemon.Coordination; - -public interface IProjectionDistributor : IAsyncDisposable -{ - ValueTask> BuildDistributionAsync(); - Task RandomWait(CancellationToken token); - - bool HasLock(IProjectionSet set); - Task TryAttainLockAsync(IProjectionSet set, CancellationToken token); - - Task ReleaseLockAsync(IProjectionSet set); - - Task ReleaseAllLocks(); -} diff --git a/src/Marten/Events/Daemon/Coordination/IProjectionSet.cs b/src/Marten/Events/Daemon/Coordination/IProjectionSet.cs deleted file mode 100644 index 5820764c44..0000000000 --- a/src/Marten/Events/Daemon/Coordination/IProjectionSet.cs +++ /dev/null @@ -1,14 +0,0 @@ -using System.Collections.Generic; -using JasperFx.Events.Daemon; -using JasperFx.Events.Projections; -using Marten.Storage; - -namespace Marten.Events.Daemon.Coordination; - -public interface IProjectionSet -{ - int LockId { get; } - IMartenDatabase Database { get; } - IProjectionDaemon BuildDaemon(); - IReadOnlyList Names { get; } -} diff --git a/src/Marten/Events/Daemon/Coordination/MultiTenantedProjectionDistributor.cs b/src/Marten/Events/Daemon/Coordination/MultiTenantedProjectionDistributor.cs deleted file mode 100644 index 1ba4bcd98d..0000000000 --- a/src/Marten/Events/Daemon/Coordination/MultiTenantedProjectionDistributor.cs +++ /dev/null @@ -1,85 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using JasperFx.Core; -using Marten.Storage; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Logging.Abstractions; -using Weasel.Postgresql; - -namespace Marten.Events.Daemon.Coordination; - -public class MultiTenantedProjectionDistributor: IProjectionDistributor -{ - private readonly DocumentStore _store; - private readonly Cache _locks; - - public MultiTenantedProjectionDistributor(DocumentStore store) - { - _store = store; - - var logger = _store.Options.LogFactory?.CreateLogger() ?? - _store.Options.DotNetLogger ?? NullLogger.Instance; - - _locks = new(db => new AdvisoryLock(((MartenDatabase)db).DataSource, logger, db.Id.Identity, new AdvisoryLockOptions { LockMonitoringEnabled = store.Options.Events.UseMonitoredAdvisoryLock, TransactionalLockEnabled = store.Options.Events.UseAdvisoryLockTransaction })); - } - - public async ValueTask DisposeAsync() - { - foreach (var advisoryLock in _locks) - { - await advisoryLock.DisposeAsync().ConfigureAwait(false); - } - } - - public async ValueTask> BuildDistributionAsync() - { - var databases = await _store.Storage.AllDatabases().ConfigureAwait(false); - return databases.OfType().Select(db => - { - var projectionOptions = _store.Options.Projections; - return new ProjectionSet(projectionOptions.DaemonLockId, _store, db, - projectionOptions.AllShards().Select(x => x.Name).ToList()); - }).OrderBy(x => Random.Shared.NextDouble()).ToList(); - } - - public virtual Task RandomWait(CancellationToken token) - { - return Task.Delay(Random.Shared.Next(0, 500).Milliseconds(), token); - } - - public bool HasLock(IProjectionSet set) - { - return _locks[set.Database].HasLock(set.LockId); - } - - public Task TryAttainLockAsync(IProjectionSet set, CancellationToken token) - { - return _locks[set.Database].TryAttainLockAsync(set.LockId, token); - } - - public Task ReleaseLockAsync(IProjectionSet set) - { - return _locks[set.Database].ReleaseLockAsync(set.LockId); - } - - public async Task ReleaseAllLocks() - { - foreach (var @lock in _locks) - { - try - { - await @lock.DisposeAsync().ConfigureAwait(false); - } - catch (Exception) - { - // Need to swallow shutdown failures. It's from the advisory locks hanging - // in test harnesses - } - } - - _locks.ClearAll(); - } -} diff --git a/src/Marten/Events/Daemon/Coordination/ProjectionCoordinator.cs b/src/Marten/Events/Daemon/Coordination/ProjectionCoordinator.cs index 42366295ff..9da0d75455 100644 --- a/src/Marten/Events/Daemon/Coordination/ProjectionCoordinator.cs +++ b/src/Marten/Events/Daemon/Coordination/ProjectionCoordinator.cs @@ -10,7 +10,9 @@ using JasperFx.Events.Projections; using Marten.Storage; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using Polly; +using Weasel.Postgresql; namespace Marten.Events.Daemon.Coordination; @@ -40,21 +42,7 @@ public ProjectionCoordinator(IDocumentStore documentStore, ILogger> allShards = () => projections.AllShards().Select(x => x.Name); + + Func, int, IProjectionSet> setFactory = + (db, names, lockId) => new ProjectionSet(lockId, (MartenDatabase)db, names); + + Func>> allDatabases = async () => + { + var databases = await store.Storage.AllDatabases().ConfigureAwait(false); + return databases.OfType().ToList(); + }; + + switch (projections.AsyncMode) + { + case DaemonMode.Solo: + return new SoloProjectionDistributor(allDatabases, allShards, setFactory, baseLockId); + + case DaemonMode.HotCold: + var lockFactory = buildLockFactory(store); + if (store.Options.Tenancy is DefaultTenancy) + { + return new SingleTenantProjectionDistributor( + () => (IProjectionDatabase)store.Storage.Database, + allShards, lockFactory, setFactory, + store.Options.EventGraph.DatabaseSchemaName, baseLockId); + } + + return new MultiTenantedProjectionDistributor(allDatabases, allShards, lockFactory, setFactory, + baseLockId); + + default: + return null; + } + } + + private static Func buildLockFactory(DocumentStore store) + { + ILogger logger = store.Options.LogFactory?.CreateLogger() ?? + store.Options.DotNetLogger ?? NullLogger.Instance; + + return db => new AdvisoryLock(((MartenDatabase)db).DataSource, logger, ((MartenDatabase)db).Id.Identity, + new AdvisoryLockOptions + { + LockMonitoringEnabled = store.Options.Events.UseMonitoredAdvisoryLock, + TransactionalLockEnabled = store.Options.Events.UseAdvisoryLockTransaction + }); + } + public DaemonMode Mode { get; } public DocumentStore Store { get; } diff --git a/src/Marten/Events/Daemon/Coordination/ProjectionSet.cs b/src/Marten/Events/Daemon/Coordination/ProjectionSet.cs index 1d816c785e..6d2454706e 100644 --- a/src/Marten/Events/Daemon/Coordination/ProjectionSet.cs +++ b/src/Marten/Events/Daemon/Coordination/ProjectionSet.cs @@ -7,23 +7,16 @@ namespace Marten.Events.Daemon.Coordination; public class ProjectionSet: IProjectionSet { - private readonly DocumentStore _store; private readonly MartenDatabase _database; - public ProjectionSet(int lockId, DocumentStore store, MartenDatabase database, IReadOnlyList names) + public ProjectionSet(int lockId, MartenDatabase database, IReadOnlyList names) { - _store = store; LockId = lockId; _database = database; Names = names; } public int LockId { get; } - public IMartenDatabase Database => _database; - public IProjectionDaemon BuildDaemon() - { - return _database.StartProjectionDaemon(_store); - } - + public IProjectionDatabase Database => _database; public IReadOnlyList Names { get; } } diff --git a/src/Marten/Events/Daemon/Coordination/SingleTenantProjectionDistributor.cs b/src/Marten/Events/Daemon/Coordination/SingleTenantProjectionDistributor.cs deleted file mode 100644 index 2cd5871b1f..0000000000 --- a/src/Marten/Events/Daemon/Coordination/SingleTenantProjectionDistributor.cs +++ /dev/null @@ -1,102 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using JasperFx.Core; -using Marten.Storage; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Logging.Abstractions; -using Weasel.Postgresql; - -namespace Marten.Events.Daemon.Coordination; - -public class SingleTenantProjectionDistributor : IProjectionDistributor -{ - private readonly DocumentStore _store; - private readonly Cache _locks; - - public SingleTenantProjectionDistributor(DocumentStore store) - { - _store = store; - - var logger = _store.Options.LogFactory?.CreateLogger() ?? - _store.Options.DotNetLogger ?? NullLogger.Instance; - - _locks = new(db => new AdvisoryLock(((MartenDatabase)db).DataSource, logger, db.Id.Identity, new AdvisoryLockOptions { LockMonitoringEnabled = store.Options.Events.UseMonitoredAdvisoryLock, TransactionalLockEnabled = store.Options.Events.UseAdvisoryLockTransaction })); - } - - public ValueTask> BuildDistributionAsync() - { - var database = _store.Storage.Database; - var projectionShards = _store.Options.Projections.AllShards(); - - IReadOnlyList sets = projectionShards.Select(shard => - { - // Make deterministic for each projection name - var lockId = - Math.Abs($"{_store.Options.EventGraph.DatabaseSchemaName}:{shard.Name.Identity}" - .GetDeterministicHashCode()) + _store.Options.Projections.DaemonLockId; - - return new ProjectionSet(lockId, _store, (MartenDatabase)database, - new[] { shard.Name }); - }).OrderBy(x => Random.Shared.NextDouble()).ToList(); - - return ValueTask.FromResult(sets); - } - - public virtual Task RandomWait(CancellationToken token) - { - return Task.Delay(Random.Shared.Next(0, 500).Milliseconds(), token); - } - - public async ValueTask DisposeAsync() - { - foreach (var advisoryLock in _locks) - { - try - { - await advisoryLock.DisposeAsync().ConfigureAwait(false); - } - catch (Exception e) - { - var logger = _store.Options.LogFactory?.CreateLogger() ?? - _store.Options.DotNetLogger ?? NullLogger.Instance; - - logger.LogError(e, "Error while trying to dispose SingleTenantProjectionDistributor"); - } - } - } - - public bool HasLock(IProjectionSet set) - { - return _locks[set.Database].HasLock(set.LockId); - } - - public Task TryAttainLockAsync(IProjectionSet set, CancellationToken token) - { - return _locks[set.Database].TryAttainLockAsync(set.LockId, token); - } - - public Task ReleaseLockAsync(IProjectionSet set) - { - return _locks[set.Database].ReleaseLockAsync(set.LockId); - } - - public async Task ReleaseAllLocks() - { - foreach (var @lock in _locks) - { - try - { - await @lock.DisposeAsync().ConfigureAwait(false); - } - catch (Exception) - { - // We need to swallow any exceptions here - } - } - - _locks.ClearAll(); - } -} diff --git a/src/Marten/Events/Daemon/Coordination/SoloProjectionDistributor.cs b/src/Marten/Events/Daemon/Coordination/SoloProjectionDistributor.cs deleted file mode 100644 index 06bc806b96..0000000000 --- a/src/Marten/Events/Daemon/Coordination/SoloProjectionDistributor.cs +++ /dev/null @@ -1,58 +0,0 @@ -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using Marten.Storage; - -namespace Marten.Events.Daemon.Coordination; - -public class SoloProjectionDistributor: IProjectionDistributor -{ - private readonly DocumentStore _store; - - public SoloProjectionDistributor(DocumentStore store) - { - _store = store; - } - - public ValueTask DisposeAsync() - { - return new ValueTask(); - } - - public async ValueTask> BuildDistributionAsync() - { - var databases = await _store.Storage.AllDatabases().ConfigureAwait(false); - return databases.OfType().Select(db => - { - var projectionOptions = _store.Options.Projections; - return new ProjectionSet(projectionOptions.DaemonLockId, _store, db, - projectionOptions.AllShards().Select(x => x.Name).ToList()); - }).ToList(); - } - - public Task RandomWait(CancellationToken token) - { - return Task.CompletedTask; - } - - public bool HasLock(IProjectionSet set) - { - return true; - } - - public Task TryAttainLockAsync(IProjectionSet set, CancellationToken token) - { - return Task.FromResult(true); - } - - public Task ReleaseLockAsync(IProjectionSet set) - { - return Task.CompletedTask; - } - - public Task ReleaseAllLocks() - { - return Task.CompletedTask; - } -} diff --git a/src/Marten/Storage/MartenDatabase.cs b/src/Marten/Storage/MartenDatabase.cs index a5c112eca5..e4ba81e7e5 100644 --- a/src/Marten/Storage/MartenDatabase.cs +++ b/src/Marten/Storage/MartenDatabase.cs @@ -20,7 +20,7 @@ namespace Marten.Storage; -public partial class MartenDatabase: PostgresqlDatabase, IMartenDatabase +public partial class MartenDatabase: PostgresqlDatabase, IMartenDatabase, IProjectionDatabase { private readonly StorageFeatures _features; @@ -49,6 +49,11 @@ string identifier public StoreOptions Options { get; } + // #4500-family dedupe: IProjectionDatabase contract for the lifted JasperFx.Events + // projection distributors. Identifier comes from the Weasel DatabaseBase; the URI is + // the canonical descriptor URI used for telemetry — never hit on the daemon hot path. + Uri IProjectionDatabase.DatabaseUri => Describe().DatabaseUri(); + public ISequences Sequences => _sequences.Value; public IProviderGraph Providers { get; }