Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<Project>
<PropertyGroup>
<Version>9.0.0-alpha.4</Version>
<Version>9.0.0-alpha.5</Version>
<LangVersion>13.0</LangVersion>
<Authors>Jeremy D. Miller;Babu Annamalai;Jaedyn Tonee</Authors>
<PackageIconUrl>https://martendb.io/logo.png</PackageIconUrl>
Expand Down
12 changes: 6 additions & 6 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
<PackageVersion Include="FluentAssertions" Version="6.12.0" />
<PackageVersion Include="FSharp.Core" Version="9.0.100" />
<PackageVersion Include="FSharp.SystemTextJson" Version="1.3.13" />
<PackageVersion Include="JasperFx" Version="2.0.0-alpha.17" />
<PackageVersion Include="JasperFx.Events" Version="2.0.0-alpha.16" />
<PackageVersion Include="JasperFx.Events.SourceGenerator" Version="2.0.0-alpha.9">
<PackageVersion Include="JasperFx" Version="2.0.0-alpha.20" />
<PackageVersion Include="JasperFx.Events" Version="2.0.0-alpha.20" />
<PackageVersion Include="JasperFx.Events.SourceGenerator" Version="2.0.0-alpha.12">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageVersion>
<PackageVersion Include="JasperFx.SourceGeneration" Version="2.0.0-alpha.6" />
<PackageVersion Include="JasperFx.SourceGeneration" Version="2.0.0-alpha.9" />
<PackageVersion Include="Jil" Version="3.0.0-alpha2" />
<PackageVersion Include="Lamar" Version="7.1.1" />
<PackageVersion Include="Lamar.Microsoft.DependencyInjection" Version="15.0.0" />
Expand Down Expand Up @@ -66,8 +66,8 @@
<PackageVersion Include="Swashbuckle.AspNetCore" Version="6.5.0" />
<PackageVersion Include="System.IO.Hashing" Version="10.0.3" />
<PackageVersion Include="Vogen" Version="7.0.0" />
<PackageVersion Include="Weasel.EntityFrameworkCore" Version="9.0.0-alpha.6" />
<PackageVersion Include="Weasel.Postgresql" Version="9.0.0-alpha.6" />
<PackageVersion Include="Weasel.EntityFrameworkCore" Version="9.0.0-alpha.7" />
<PackageVersion Include="Weasel.Postgresql" Version="9.0.0-alpha.7" />
<PackageVersion Include="WolverineFx.Marten" Version="4.2.0" />
<PackageVersion Include="xunit" Version="2.9.3" />
<PackageVersion Include="xunit.runner.visualstudio" Version="3.1.5" />
Expand Down
19 changes: 0 additions & 19 deletions src/Marten/Events/Daemon/Coordination/IProjectionDistributor.cs

This file was deleted.

14 changes: 0 additions & 14 deletions src/Marten/Events/Daemon/Coordination/IProjectionSet.cs

This file was deleted.

This file was deleted.

75 changes: 60 additions & 15 deletions src/Marten/Events/Daemon/Coordination/ProjectionCoordinator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
using JasperFx.Events.Projections;
using Marten.Storage;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Polly;
using Weasel.Postgresql;

namespace Marten.Events.Daemon.Coordination;

Expand Down Expand Up @@ -40,21 +42,7 @@ public ProjectionCoordinator(IDocumentStore documentStore, ILogger<ProjectionCoo

Mode = store.Options.Projections.AsyncMode;

if (store.Options.Projections.AsyncMode == DaemonMode.Solo)
{
Distributor = new SoloProjectionDistributor(store);
}
else if (store.Options.Projections.AsyncMode == DaemonMode.HotCold)
{
if (store.Options.Tenancy is DefaultTenancy)
{
Distributor = new SingleTenantProjectionDistributor(store);
}
else
{
Distributor = new MultiTenantedProjectionDistributor(store);
}
}
Distributor = BuildDistributor(store);

_options = store.Options;
_logger = logger;
Expand All @@ -63,6 +51,63 @@ public ProjectionCoordinator(IDocumentStore documentStore, ILogger<ProjectionCoo
Store = store;
}

// 9.0 (#4349 dedupe): the Solo / SingleTenant / MultiTenanted distributors now live in
// JasperFx.Events. Marten wires them with closures over its own tenancy, shard, and lock
// surfaces. ProjectionSet (Marten-side) remains the IProjectionSet implementation, and the
// Postgres lock factory hands back Weasel's AdvisoryLock — which implements
// JasperFx.Events.Daemon.IAdvisoryLock directly as of Weasel 9.0.0-alpha.7.
private static IProjectionDistributor BuildDistributor(DocumentStore store)
{
var projections = store.Options.Projections;
var baseLockId = projections.DaemonLockId;

Func<IEnumerable<ShardName>> allShards = () => projections.AllShards().Select(x => x.Name);

Func<IProjectionDatabase, IReadOnlyList<ShardName>, int, IProjectionSet> setFactory =
(db, names, lockId) => new ProjectionSet(lockId, (MartenDatabase)db, names);

Func<ValueTask<IReadOnlyList<IProjectionDatabase>>> allDatabases = async () =>
{
var databases = await store.Storage.AllDatabases().ConfigureAwait(false);
return databases.OfType<IProjectionDatabase>().ToList();
};

switch (projections.AsyncMode)
{
case DaemonMode.Solo:
return new SoloProjectionDistributor(allDatabases, allShards, setFactory, baseLockId);

case DaemonMode.HotCold:
var lockFactory = buildLockFactory(store);
if (store.Options.Tenancy is DefaultTenancy)
{
return new SingleTenantProjectionDistributor(
() => (IProjectionDatabase)store.Storage.Database,
allShards, lockFactory, setFactory,
store.Options.EventGraph.DatabaseSchemaName, baseLockId);
}

return new MultiTenantedProjectionDistributor(allDatabases, allShards, lockFactory, setFactory,
baseLockId);

default:
return null;
}
}

private static Func<IProjectionDatabase, IAdvisoryLock> buildLockFactory(DocumentStore store)
{
ILogger logger = store.Options.LogFactory?.CreateLogger<AdvisoryLock>() ??
store.Options.DotNetLogger ?? NullLogger<AdvisoryLock>.Instance;

return db => new AdvisoryLock(((MartenDatabase)db).DataSource, logger, ((MartenDatabase)db).Id.Identity,
new AdvisoryLockOptions
{
LockMonitoringEnabled = store.Options.Events.UseMonitoredAdvisoryLock,
TransactionalLockEnabled = store.Options.Events.UseAdvisoryLockTransaction
});
}

public DaemonMode Mode { get; }

public DocumentStore Store { get; }
Expand Down
11 changes: 2 additions & 9 deletions src/Marten/Events/Daemon/Coordination/ProjectionSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,16 @@ namespace Marten.Events.Daemon.Coordination;

public class ProjectionSet: IProjectionSet
{
private readonly DocumentStore _store;
private readonly MartenDatabase _database;

public ProjectionSet(int lockId, DocumentStore store, MartenDatabase database, IReadOnlyList<ShardName> names)
public ProjectionSet(int lockId, MartenDatabase database, IReadOnlyList<ShardName> names)
{
_store = store;
LockId = lockId;
_database = database;
Names = names;
}

public int LockId { get; }
public IMartenDatabase Database => _database;
public IProjectionDaemon BuildDaemon()
{
return _database.StartProjectionDaemon(_store);
}

public IProjectionDatabase Database => _database;
public IReadOnlyList<ShardName> Names { get; }
}

This file was deleted.

Loading
Loading