Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class MultiTenantContext : IAsyncLifetime
protected string tenant3ConnectionString;
protected string tenant4ConnectionString;
protected IHost theOriginalHost;
internal ProjectionAgents theProjectionAgents;
internal EventSubscriptionAgentFamily theDistributor;

public MultiTenantContext(ITestOutputHelper output)
{
Expand Down Expand Up @@ -69,7 +69,7 @@ public async Task InitializeAsync()

theOriginalHost = await startHostAsync();

theProjectionAgents = theOriginalHost.Services.GetServices<IAgentFamily>().OfType<ProjectionAgents>().Single();
theDistributor = theOriginalHost.Services.GetServices<IAgentFamily>().OfType<EventSubscriptionAgentFamily>().Single();
}

public async Task DisposeAsync()
Expand Down Expand Up @@ -185,6 +185,6 @@ private async Task shutdownHostAsync(IHost host)
protected Uri[] runningSubscriptions(IHost host)
{
var runtime = host.Services.GetRequiredService<IWolverineRuntime>();
return runtime.Agents.AllRunningAgentUris().Where(x => x.Scheme == ProjectionAgents.SchemeName).ToArray();
return runtime.Agents.AllRunningAgentUris().Where(x => x.Scheme == EventSubscriptionAgentFamily.SchemeName).ToArray();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public abstract class SingleTenantContext : IAsyncLifetime
private readonly List<IHost> _hosts = new();
private readonly ITestOutputHelper _output;
protected IHost theOriginalHost;
internal ProjectionAgents theProjectionAgents;
internal EventSubscriptionAgentFamily theProjectionAgents;

public SingleTenantContext(ITestOutputHelper output)
{
Expand Down Expand Up @@ -99,7 +99,7 @@ protected async Task<IHost> startHostAsync()

_hosts.Add(host);

theProjectionAgents ??= host.Services.GetServices<IAgentFamily>().OfType<ProjectionAgents>().Single();
theProjectionAgents ??= host.Services.GetServices<IAgentFamily>().OfType<EventSubscriptionAgentFamily>().Single();

return host;
}
Expand Down Expand Up @@ -134,7 +134,7 @@ protected async Task<IHost> startGreenHostAsync()

_hosts.Add(host);

theProjectionAgents ??= host.Services.GetServices<IAgentFamily>().OfType<ProjectionAgents>().Single();
theProjectionAgents ??= host.Services.GetServices<IAgentFamily>().OfType<EventSubscriptionAgentFamily>().Single();

return host;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public async Task start_with_multiple_databases_on_one_single_node()

await theOriginalHost.WaitUntilAssignmentsChangeTo(w =>
{
w.AgentScheme = theProjectionAgents.Scheme;
w.AgentScheme = theDistributor.Scheme;

// 3 projections x 2 databases = 6 total
w.ExpectRunningAgents(theOriginalHost, 6);
Expand All @@ -31,12 +31,12 @@ public async Task spread_databases_out_via_host()
{
await tenancy.AddDatabaseRecordAsync("tenant1", tenant1ConnectionString);
await tenancy.AddDatabaseRecordAsync("tenant2", tenant2ConnectionString);
await tenancy.AddDatabaseRecordAsync("tenant3", tenant2ConnectionString);
await tenancy.AddDatabaseRecordAsync("tenant4", tenant2ConnectionString);
await tenancy.AddDatabaseRecordAsync("tenant3", tenant3ConnectionString);
await tenancy.AddDatabaseRecordAsync("tenant4", tenant4ConnectionString);

await theOriginalHost.WaitUntilAssignmentsChangeTo(w =>
{
w.AgentScheme = theProjectionAgents.Scheme;
w.AgentScheme = theDistributor.Scheme;

// 3 projections x 2 databases = 6 total
w.ExpectRunningAgents(theOriginalHost, 12);
Expand All @@ -59,7 +59,7 @@ await theOriginalHost.WaitUntilAssignmentsChangeTo(w =>

await theOriginalHost.WaitUntilAssignmentsChangeTo(w =>
{
w.AgentScheme = theProjectionAgents.Scheme;
w.AgentScheme = theDistributor.Scheme;

// 3 projections x 2 databases = 6 total
w.ExpectRunningAgents(theOriginalHost, 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@ public async Task find_all_known_agents()
var uris = await theProjectionAgents.AllKnownAgentsAsync();

uris.Count.ShouldBe(3);
uris.ShouldContain(ProjectionAgents.UriFor("Marten", "Day:All"));
uris.ShouldContain(ProjectionAgents.UriFor("Marten", "Trip:All"));
uris.ShouldContain(ProjectionAgents.UriFor("Marten", "Distance:All"));

uris.OrderBy(x => x.ToString()).ShouldBe([
new Uri("event-subscriptions://main:marten@localhost.postgres/day/all"),
new Uri("event-subscriptions://main:marten@localhost.postgres/distance/all"),
new Uri("event-subscriptions://main:marten@localhost.postgres/trip/all"),

]);

}

[Fact]
Expand All @@ -39,7 +44,7 @@ public async Task everything_is_started_up_on_one_node()

await theOriginalHost.WaitUntilAssignmentsChangeTo(w =>
{
w.AgentScheme = ProjectionAgents.SchemeName;
w.AgentScheme = EventSubscriptionAgentFamily.SchemeName;
w.ExpectRunningAgents(theOriginalHost, 3);
}, 30.Seconds());
}
Expand All @@ -55,7 +60,7 @@ public async Task spread_out_over_multiple_hosts()
// Now, let's check that the load is redistributed!
await theOriginalHost.WaitUntilAssignmentsChangeTo(w =>
{
w.AgentScheme = ProjectionAgents.SchemeName;
w.AgentScheme = EventSubscriptionAgentFamily.SchemeName;
w.ExpectRunningAgents(theOriginalHost, 1);
w.ExpectRunningAgents(host2, 1);
w.ExpectRunningAgents(host3, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public async Task spin_up_single_blue_and_single_green_host()

await theOriginalHost.WaitUntilAssignmentsChangeTo(w =>
{
w.AgentScheme = ProjectionAgents.SchemeName;
w.AgentScheme = EventSubscriptionAgentFamily.SchemeName;
w.ExpectRunningAgents(theOriginalHost, 3);
w.ExpectRunningAgents(greenHost, 3);
}, 30.Seconds());
Expand Down
17 changes: 0 additions & 17 deletions src/Persistence/MartenTests/Distribution/parsing_uris.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using JasperFx.Core.Reflection;
using JasperFx.Events.Subscriptions;
using Marten;
using Marten.Events.Daemon.Coordination;
using Marten.Internal;
using Marten.Storage;
using Marten.Subscriptions;
Expand All @@ -14,6 +15,7 @@
using Weasel.Core;
using Weasel.Core.Migrations;
using Weasel.Postgresql;
using Wolverine.Marten.Distribution;
using Wolverine.Marten.Publishing;
using Wolverine.Marten.Subscriptions;
using Wolverine.Persistence;
Expand Down Expand Up @@ -82,6 +84,10 @@ public static MartenServiceCollectionExtensions.MartenStoreExpression<T> Integra

expression.Services.AddType(typeof(IDatabaseSource), typeof(MessageDatabaseDiscovery),
ServiceLifetime.Singleton);

// TODO -- watch the service registrations
expression.Services.AddSingleton<EventSubscriptionAgentFamily>();
expression.Services.AddSingleton<IProjectionCoordinator<T>, WolverineProjectionCoordinator<T>>();

// Limitation is that the wolverine objects go in the same schema

Expand Down
163 changes: 163 additions & 0 deletions src/Persistence/Wolverine.Marten/Distribution/EventStoreAgents.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
using ImTools;
using JasperFx.Core;
using JasperFx.Descriptors;
using JasperFx.Events;
using JasperFx.Events.Daemon;
using JasperFx.Events.Projections;

namespace Wolverine.Marten.Distribution;

internal class EventStoreAgents : IAsyncDisposable
{
private readonly IEventStore _store;
private readonly IObserver<ShardState>[] _observers;
private readonly SemaphoreSlim _daemonLock = new(1);
private ImHashMap<DatabaseId, IProjectionDaemon> _daemons = ImHashMap<DatabaseId, IProjectionDaemon>.Empty;
private readonly List<ShardName> _shardNames = new();

public EventStoreAgents(IEventStore store, IObserver<ShardState>[] observers)
{
_store = store;
_observers = observers ?? [];
Identity = _store.Identity;
}

public EventStoreIdentity Identity { get; }

public async ValueTask DisposeAsync()
{
foreach (var entry in _daemons.Enumerate())
{
try
{
await entry.Value.StopAllAsync();
entry.Value.SafeDispose();
}
catch (Exception e)
{
// TODO -- probably want to log this just in case
}
}
}

public async ValueTask<IProjectionDaemon> FindDaemonAsync(DatabaseId databaseId)
{
if (_daemons.TryFind(databaseId, out var daemon))
{
return daemon;
}

await _daemonLock.WaitAsync();
try
{
// Gotta do the double lock thing
if (_daemons.TryFind(databaseId, out daemon))
{
return daemon;
}

daemon = await _store.BuildProjectionDaemonAsync(databaseId);
foreach (var observer in _observers)
{
// TODO -- do we need to care about un-subscribing?
daemon.Tracker.Subscribe(observer);
}

_daemons = _daemons.AddOrUpdate(databaseId, daemon);
}
finally
{
_daemonLock.Release();
}

return daemon;
}

public async ValueTask<IReadOnlyList<Uri>> SupportedAgentsAsync(CancellationToken cancellation)
{
var list = new List<Uri>();
var usage = await _store.TryCreateUsage(cancellation);
if (usage == null) return list;

// Using this to keep from double dipping
var databaseIds = new List<DatabaseId>();
foreach (var database in usage.Database.Databases)
{
var id = new DatabaseId(database.ServerName, database.DatabaseName);
databaseIds.Add(id);

foreach (var shardName in usage.Subscriptions.Where(x => x.Lifecycle == ProjectionLifecycle.Async).SelectMany(x => x.ShardNames))
{
_shardNames.Fill(shardName);
var uri = EventSubscriptionAgentFamily.UriFor(_store.Identity, id, shardName);
list.Add(uri);
}
}

if (usage.Database.MainDatabase != null)
{
var database = usage.Database.MainDatabase;
var id = new DatabaseId(database.ServerName, database.DatabaseName);
if (!databaseIds.Contains(id))
{
foreach (var shardName in usage.Subscriptions.Where(x => x.Lifecycle == ProjectionLifecycle.Async).SelectMany(x => x.ShardNames))
{
_shardNames.Fill(shardName);
var uri = EventSubscriptionAgentFamily.UriFor(_store.Identity, id, shardName);
list.Add(uri);
}
}
}

return list;
}

public async Task<EventSubscriptionAgent> BuildAgentAsync(Uri uri, DatabaseId databaseId, string shardPath)
{
var shardName = _shardNames.FirstOrDefault(x => x.RelativeUrl == shardPath);
if (shardName == null)
{
throw new ArgumentOutOfRangeException(nameof(shardPath), $"Unable to find a shard with path '{shardPath}'");
}

var daemon = await FindDaemonAsync(databaseId);

return new EventSubscriptionAgent(uri, shardName, daemon);
}

public async Task StartAllAsync(CancellationToken cancellationToken)
{
var usage = await _store.TryCreateUsage(cancellationToken);
if (usage == null)
{
return;
}

foreach (var database in usage.Database.Databases)
{
var id = new DatabaseId(database.ServerName, database.DatabaseName);
var daemon = await FindDaemonAsync(id);

await daemon.StartAllAsync();
}
}

public async Task StopAllAsync(CancellationToken cancellationToken)
{
foreach (var kvEntry in _daemons.Enumerate())
{
var daemon = kvEntry.Value;
await daemon.StopAllAsync();
}
}

public IProjectionDaemon DaemonForMainDatabase()
{
throw new NotSupportedException("This method is not supported with the Wolverine managed projection/subscription distribution");
}

public ValueTask<IProjectionDaemon> DaemonForDatabase(string databaseIdentifier)
{
throw new NotSupportedException("This method is not supported with the Wolverine managed projection/subscription distribution");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using JasperFx;
using JasperFx.Events.Daemon;
using JasperFx.Events.Projections;
using Wolverine.Runtime.Agents;
using ISubscriptionAgent = JasperFx.Events.Daemon.ISubscriptionAgent;

namespace Wolverine.Marten.Distribution;

internal class EventSubscriptionAgent : IAgent
{
private readonly ShardName _shardName;
private readonly IProjectionDaemon _daemon;
private ISubscriptionAgent? _innerAgent;

public EventSubscriptionAgent(Uri uri, ShardName shardName, IProjectionDaemon daemon)
{
_shardName = shardName;
_daemon = daemon;
Uri = uri;
}

public async Task StartAsync(CancellationToken cancellationToken)
{
_innerAgent = await _daemon.StartAgentAsync(_shardName, cancellationToken);
Status = AgentStatus.Running;
}

public async Task StopAsync(CancellationToken cancellationToken)
{
await _daemon.StopAgentAsync(_shardName);
Status = AgentStatus.Stopped;
}

public Uri Uri { get; }

// Be nice for this to get the Paused too
public AgentStatus Status { get; private set; } = AgentStatus.Stopped;
}
Loading
Loading