diff --git a/src/Persistence/MartenTests/Distribution/Support/MultiTenantContext.cs b/src/Persistence/MartenTests/Distribution/Support/MultiTenantContext.cs index b16249e7d..fdca21dcd 100644 --- a/src/Persistence/MartenTests/Distribution/Support/MultiTenantContext.cs +++ b/src/Persistence/MartenTests/Distribution/Support/MultiTenantContext.cs @@ -35,7 +35,7 @@ public class MultiTenantContext : IAsyncLifetime protected string tenant3ConnectionString; protected string tenant4ConnectionString; protected IHost theOriginalHost; - internal ProjectionAgents theProjectionAgents; + internal EventSubscriptionAgentFamily theDistributor; public MultiTenantContext(ITestOutputHelper output) { @@ -69,7 +69,7 @@ public async Task InitializeAsync() theOriginalHost = await startHostAsync(); - theProjectionAgents = theOriginalHost.Services.GetServices().OfType().Single(); + theDistributor = theOriginalHost.Services.GetServices().OfType().Single(); } public async Task DisposeAsync() @@ -185,6 +185,6 @@ private async Task shutdownHostAsync(IHost host) protected Uri[] runningSubscriptions(IHost host) { var runtime = host.Services.GetRequiredService(); - return runtime.Agents.AllRunningAgentUris().Where(x => x.Scheme == ProjectionAgents.SchemeName).ToArray(); + return runtime.Agents.AllRunningAgentUris().Where(x => x.Scheme == EventSubscriptionAgentFamily.SchemeName).ToArray(); } } \ No newline at end of file diff --git a/src/Persistence/MartenTests/Distribution/Support/SingleTenantContext.cs b/src/Persistence/MartenTests/Distribution/Support/SingleTenantContext.cs index 927e9e43b..90470a152 100644 --- a/src/Persistence/MartenTests/Distribution/Support/SingleTenantContext.cs +++ b/src/Persistence/MartenTests/Distribution/Support/SingleTenantContext.cs @@ -27,7 +27,7 @@ public abstract class SingleTenantContext : IAsyncLifetime private readonly List _hosts = new(); private readonly ITestOutputHelper _output; protected IHost theOriginalHost; - internal ProjectionAgents theProjectionAgents; + internal EventSubscriptionAgentFamily theProjectionAgents; public SingleTenantContext(ITestOutputHelper output) { @@ -99,7 +99,7 @@ protected async Task startHostAsync() _hosts.Add(host); - theProjectionAgents ??= host.Services.GetServices().OfType().Single(); + theProjectionAgents ??= host.Services.GetServices().OfType().Single(); return host; } @@ -134,7 +134,7 @@ protected async Task startGreenHostAsync() _hosts.Add(host); - theProjectionAgents ??= host.Services.GetServices().OfType().Single(); + theProjectionAgents ??= host.Services.GetServices().OfType().Single(); return host; } diff --git a/src/Persistence/MartenTests/Distribution/basic_agent_mechanics_multiple_tenants.cs b/src/Persistence/MartenTests/Distribution/basic_agent_mechanics_multiple_tenants.cs index b0659380e..3a73b8ca5 100644 --- a/src/Persistence/MartenTests/Distribution/basic_agent_mechanics_multiple_tenants.cs +++ b/src/Persistence/MartenTests/Distribution/basic_agent_mechanics_multiple_tenants.cs @@ -19,7 +19,7 @@ public async Task start_with_multiple_databases_on_one_single_node() await theOriginalHost.WaitUntilAssignmentsChangeTo(w => { - w.AgentScheme = theProjectionAgents.Scheme; + w.AgentScheme = theDistributor.Scheme; // 3 projections x 2 databases = 6 total w.ExpectRunningAgents(theOriginalHost, 6); @@ -31,12 +31,12 @@ public async Task spread_databases_out_via_host() { await tenancy.AddDatabaseRecordAsync("tenant1", tenant1ConnectionString); await tenancy.AddDatabaseRecordAsync("tenant2", tenant2ConnectionString); - await tenancy.AddDatabaseRecordAsync("tenant3", tenant2ConnectionString); - await tenancy.AddDatabaseRecordAsync("tenant4", tenant2ConnectionString); + await tenancy.AddDatabaseRecordAsync("tenant3", tenant3ConnectionString); + await tenancy.AddDatabaseRecordAsync("tenant4", tenant4ConnectionString); await theOriginalHost.WaitUntilAssignmentsChangeTo(w => { - w.AgentScheme = theProjectionAgents.Scheme; + w.AgentScheme = theDistributor.Scheme; // 3 projections x 2 databases = 6 total w.ExpectRunningAgents(theOriginalHost, 12); @@ -59,7 +59,7 @@ await theOriginalHost.WaitUntilAssignmentsChangeTo(w => await theOriginalHost.WaitUntilAssignmentsChangeTo(w => { - w.AgentScheme = theProjectionAgents.Scheme; + w.AgentScheme = theDistributor.Scheme; // 3 projections x 2 databases = 6 total w.ExpectRunningAgents(theOriginalHost, 3); diff --git a/src/Persistence/MartenTests/Distribution/basic_agent_mechanics_single_tenant.cs b/src/Persistence/MartenTests/Distribution/basic_agent_mechanics_single_tenant.cs index 2f30ae117..672cda624 100644 --- a/src/Persistence/MartenTests/Distribution/basic_agent_mechanics_single_tenant.cs +++ b/src/Persistence/MartenTests/Distribution/basic_agent_mechanics_single_tenant.cs @@ -27,9 +27,14 @@ public async Task find_all_known_agents() var uris = await theProjectionAgents.AllKnownAgentsAsync(); uris.Count.ShouldBe(3); - uris.ShouldContain(ProjectionAgents.UriFor("Marten", "Day:All")); - uris.ShouldContain(ProjectionAgents.UriFor("Marten", "Trip:All")); - uris.ShouldContain(ProjectionAgents.UriFor("Marten", "Distance:All")); + + uris.OrderBy(x => x.ToString()).ShouldBe([ + new Uri("event-subscriptions://main:marten@localhost.postgres/day/all"), + new Uri("event-subscriptions://main:marten@localhost.postgres/distance/all"), + new Uri("event-subscriptions://main:marten@localhost.postgres/trip/all"), + + ]); + } [Fact] @@ -39,7 +44,7 @@ public async Task everything_is_started_up_on_one_node() await theOriginalHost.WaitUntilAssignmentsChangeTo(w => { - w.AgentScheme = ProjectionAgents.SchemeName; + w.AgentScheme = EventSubscriptionAgentFamily.SchemeName; w.ExpectRunningAgents(theOriginalHost, 3); }, 30.Seconds()); } @@ -55,7 +60,7 @@ public async Task spread_out_over_multiple_hosts() // Now, let's check that the load is redistributed! await theOriginalHost.WaitUntilAssignmentsChangeTo(w => { - w.AgentScheme = ProjectionAgents.SchemeName; + w.AgentScheme = EventSubscriptionAgentFamily.SchemeName; w.ExpectRunningAgents(theOriginalHost, 1); w.ExpectRunningAgents(host2, 1); w.ExpectRunningAgents(host3, 1); diff --git a/src/Persistence/MartenTests/Distribution/blue_green_deployment_with_single_tenant.cs b/src/Persistence/MartenTests/Distribution/blue_green_deployment_with_single_tenant.cs index 05b7f407c..0b241c6e8 100644 --- a/src/Persistence/MartenTests/Distribution/blue_green_deployment_with_single_tenant.cs +++ b/src/Persistence/MartenTests/Distribution/blue_green_deployment_with_single_tenant.cs @@ -19,7 +19,7 @@ public async Task spin_up_single_blue_and_single_green_host() await theOriginalHost.WaitUntilAssignmentsChangeTo(w => { - w.AgentScheme = ProjectionAgents.SchemeName; + w.AgentScheme = EventSubscriptionAgentFamily.SchemeName; w.ExpectRunningAgents(theOriginalHost, 3); w.ExpectRunningAgents(greenHost, 3); }, 30.Seconds()); diff --git a/src/Persistence/MartenTests/Distribution/parsing_uris.cs b/src/Persistence/MartenTests/Distribution/parsing_uris.cs deleted file mode 100644 index ceb6a4a69..000000000 --- a/src/Persistence/MartenTests/Distribution/parsing_uris.cs +++ /dev/null @@ -1,17 +0,0 @@ -using Shouldly; -using Wolverine.Marten.Distribution; - -namespace MartenTests.Distribution; - -public class parsing_uris -{ - [Fact] - public void round_trip_uri() - { - var uri = ProjectionAgents.UriFor("db1", "Proj1:V2:All"); - var (databaseName, identity) = ProjectionAgents.Parse(uri); - - databaseName.ShouldBe("db1"); - identity.ShouldBe("Proj1:V2:All"); - } -} \ No newline at end of file diff --git a/src/Persistence/Wolverine.Marten/AncillaryWolverineOptionsMartenExtensions.cs b/src/Persistence/Wolverine.Marten/AncillaryWolverineOptionsMartenExtensions.cs index f6060e64d..598783b82 100644 --- a/src/Persistence/Wolverine.Marten/AncillaryWolverineOptionsMartenExtensions.cs +++ b/src/Persistence/Wolverine.Marten/AncillaryWolverineOptionsMartenExtensions.cs @@ -4,6 +4,7 @@ using JasperFx.Core.Reflection; using JasperFx.Events.Subscriptions; using Marten; +using Marten.Events.Daemon.Coordination; using Marten.Internal; using Marten.Storage; using Marten.Subscriptions; @@ -14,6 +15,7 @@ using Weasel.Core; using Weasel.Core.Migrations; using Weasel.Postgresql; +using Wolverine.Marten.Distribution; using Wolverine.Marten.Publishing; using Wolverine.Marten.Subscriptions; using Wolverine.Persistence; @@ -82,6 +84,10 @@ public static MartenServiceCollectionExtensions.MartenStoreExpression Integra expression.Services.AddType(typeof(IDatabaseSource), typeof(MessageDatabaseDiscovery), ServiceLifetime.Singleton); + + // TODO -- watch the service registrations + expression.Services.AddSingleton(); + expression.Services.AddSingleton, WolverineProjectionCoordinator>(); // Limitation is that the wolverine objects go in the same schema diff --git a/src/Persistence/Wolverine.Marten/Distribution/EventStoreAgents.cs b/src/Persistence/Wolverine.Marten/Distribution/EventStoreAgents.cs new file mode 100644 index 000000000..563e003d8 --- /dev/null +++ b/src/Persistence/Wolverine.Marten/Distribution/EventStoreAgents.cs @@ -0,0 +1,163 @@ +using ImTools; +using JasperFx.Core; +using JasperFx.Descriptors; +using JasperFx.Events; +using JasperFx.Events.Daemon; +using JasperFx.Events.Projections; + +namespace Wolverine.Marten.Distribution; + +internal class EventStoreAgents : IAsyncDisposable +{ + private readonly IEventStore _store; + private readonly IObserver[] _observers; + private readonly SemaphoreSlim _daemonLock = new(1); + private ImHashMap _daemons = ImHashMap.Empty; + private readonly List _shardNames = new(); + + public EventStoreAgents(IEventStore store, IObserver[] observers) + { + _store = store; + _observers = observers ?? []; + Identity = _store.Identity; + } + + public EventStoreIdentity Identity { get; } + + public async ValueTask DisposeAsync() + { + foreach (var entry in _daemons.Enumerate()) + { + try + { + await entry.Value.StopAllAsync(); + entry.Value.SafeDispose(); + } + catch (Exception e) + { + // TODO -- probably want to log this just in case + } + } + } + + public async ValueTask FindDaemonAsync(DatabaseId databaseId) + { + if (_daemons.TryFind(databaseId, out var daemon)) + { + return daemon; + } + + await _daemonLock.WaitAsync(); + try + { + // Gotta do the double lock thing + if (_daemons.TryFind(databaseId, out daemon)) + { + return daemon; + } + + daemon = await _store.BuildProjectionDaemonAsync(databaseId); + foreach (var observer in _observers) + { + // TODO -- do we need to care about un-subscribing? + daemon.Tracker.Subscribe(observer); + } + + _daemons = _daemons.AddOrUpdate(databaseId, daemon); + } + finally + { + _daemonLock.Release(); + } + + return daemon; + } + + public async ValueTask> SupportedAgentsAsync(CancellationToken cancellation) + { + var list = new List(); + var usage = await _store.TryCreateUsage(cancellation); + if (usage == null) return list; + + // Using this to keep from double dipping + var databaseIds = new List(); + foreach (var database in usage.Database.Databases) + { + var id = new DatabaseId(database.ServerName, database.DatabaseName); + databaseIds.Add(id); + + foreach (var shardName in usage.Subscriptions.Where(x => x.Lifecycle == ProjectionLifecycle.Async).SelectMany(x => x.ShardNames)) + { + _shardNames.Fill(shardName); + var uri = EventSubscriptionAgentFamily.UriFor(_store.Identity, id, shardName); + list.Add(uri); + } + } + + if (usage.Database.MainDatabase != null) + { + var database = usage.Database.MainDatabase; + var id = new DatabaseId(database.ServerName, database.DatabaseName); + if (!databaseIds.Contains(id)) + { + foreach (var shardName in usage.Subscriptions.Where(x => x.Lifecycle == ProjectionLifecycle.Async).SelectMany(x => x.ShardNames)) + { + _shardNames.Fill(shardName); + var uri = EventSubscriptionAgentFamily.UriFor(_store.Identity, id, shardName); + list.Add(uri); + } + } + } + + return list; + } + + public async Task BuildAgentAsync(Uri uri, DatabaseId databaseId, string shardPath) + { + var shardName = _shardNames.FirstOrDefault(x => x.RelativeUrl == shardPath); + if (shardName == null) + { + throw new ArgumentOutOfRangeException(nameof(shardPath), $"Unable to find a shard with path '{shardPath}'"); + } + + var daemon = await FindDaemonAsync(databaseId); + + return new EventSubscriptionAgent(uri, shardName, daemon); + } + + public async Task StartAllAsync(CancellationToken cancellationToken) + { + var usage = await _store.TryCreateUsage(cancellationToken); + if (usage == null) + { + return; + } + + foreach (var database in usage.Database.Databases) + { + var id = new DatabaseId(database.ServerName, database.DatabaseName); + var daemon = await FindDaemonAsync(id); + + await daemon.StartAllAsync(); + } + } + + public async Task StopAllAsync(CancellationToken cancellationToken) + { + foreach (var kvEntry in _daemons.Enumerate()) + { + var daemon = kvEntry.Value; + await daemon.StopAllAsync(); + } + } + + public IProjectionDaemon DaemonForMainDatabase() + { + throw new NotSupportedException("This method is not supported with the Wolverine managed projection/subscription distribution"); + } + + public ValueTask DaemonForDatabase(string databaseIdentifier) + { + throw new NotSupportedException("This method is not supported with the Wolverine managed projection/subscription distribution"); + } +} \ No newline at end of file diff --git a/src/Persistence/Wolverine.Marten/Distribution/EventSubscriptionAgent.cs b/src/Persistence/Wolverine.Marten/Distribution/EventSubscriptionAgent.cs new file mode 100644 index 000000000..a2cf5626d --- /dev/null +++ b/src/Persistence/Wolverine.Marten/Distribution/EventSubscriptionAgent.cs @@ -0,0 +1,38 @@ +using JasperFx; +using JasperFx.Events.Daemon; +using JasperFx.Events.Projections; +using Wolverine.Runtime.Agents; +using ISubscriptionAgent = JasperFx.Events.Daemon.ISubscriptionAgent; + +namespace Wolverine.Marten.Distribution; + +internal class EventSubscriptionAgent : IAgent +{ + private readonly ShardName _shardName; + private readonly IProjectionDaemon _daemon; + private ISubscriptionAgent? _innerAgent; + + public EventSubscriptionAgent(Uri uri, ShardName shardName, IProjectionDaemon daemon) + { + _shardName = shardName; + _daemon = daemon; + Uri = uri; + } + + public async Task StartAsync(CancellationToken cancellationToken) + { + _innerAgent = await _daemon.StartAgentAsync(_shardName, cancellationToken); + Status = AgentStatus.Running; + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + await _daemon.StopAgentAsync(_shardName); + Status = AgentStatus.Stopped; + } + + public Uri Uri { get; } + + // Be nice for this to get the Paused too + public AgentStatus Status { get; private set; } = AgentStatus.Stopped; +} \ No newline at end of file diff --git a/src/Persistence/Wolverine.Marten/Distribution/EventSubscriptionAgentFamily.cs b/src/Persistence/Wolverine.Marten/Distribution/EventSubscriptionAgentFamily.cs new file mode 100644 index 000000000..4fea98b14 --- /dev/null +++ b/src/Persistence/Wolverine.Marten/Distribution/EventSubscriptionAgentFamily.cs @@ -0,0 +1,92 @@ +using ImTools; +using JasperFx.Core; +using JasperFx.Descriptors; +using JasperFx.Events; +using JasperFx.Events.Projections; +using Wolverine.Runtime; +using Wolverine.Runtime.Agents; + +namespace Wolverine.Marten.Distribution; + +public class EventSubscriptionAgentFamily : IStaticAgentFamily, IAsyncDisposable +{ + public const string SchemeName = "event-subscriptions"; + private ImHashMap _stores = ImHashMap.Empty; + private readonly IObserver[] _observers; + private readonly CancellationTokenSource _cancellation = new(); + + public static Uri UriFor(EventStoreIdentity storeIdentity, DatabaseId databaseId, ShardName name) + { + return new Uri($"{SchemeName}://{storeIdentity}@{databaseId}/{name.RelativeUrl}"); + } + + public EventSubscriptionAgentFamily(IEnumerable stores, IEnumerable> observers) + { + foreach (var store in stores) + { + _stores = _stores.AddOrUpdate(store.Identity.ToString(), new EventStoreAgents(store, _observers)); + } + + _observers = observers.ToArray(); + } + + public string Scheme => SchemeName; + public ValueTask> AllKnownAgentsAsync() + { + return SupportedAgentsAsync(); + } + + public async ValueTask BuildAgentAsync(Uri uri, IWolverineRuntime wolverineRuntime) + { + // First check that we aren't already running this! + + var storeIdentity = uri.UserInfo; + if (_stores.TryFind(storeIdentity, out var store)) + { + var databaseId = DatabaseId.Parse(uri.Host); + var shardPath = uri.AbsolutePath.Trim('/'); + + return await store.BuildAgentAsync(uri, databaseId, shardPath); + } + + throw new AgentStartingException(uri, wolverineRuntime.Options.UniqueNodeId, new ArgumentOutOfRangeException(nameof(uri), "Unknown event projection or subscription")); + } + + public async ValueTask> SupportedAgentsAsync() + { + var list = new List(); + + foreach (var entry in _stores.Enumerate()) + { + var store = entry.Value; + list.AddRange(await store.SupportedAgentsAsync(_cancellation.Token)); + } + + return list; + } + + public ValueTask EvaluateAssignmentsAsync(AssignmentGrid assignments) + { + assignments.DistributeEvenlyWithBlueGreenSemantics(SchemeName); + return new ValueTask(); + } + + public async ValueTask DisposeAsync() + { + foreach (var store in _stores.Enumerate()) + { + await store.Value.DisposeAsync(); + } + } + + internal EventStoreAgents FindStore(EventStoreIdentity identity) + { + if (_stores.TryFind(identity.ToString(), out var store)) + { + return store; + } + + throw new ArgumentOutOfRangeException(nameof(identity), + $"Unknown identity {identity}, known stores are {_stores.Enumerate().Select(x => x.Key).Join(", ")}"); + } +} \ No newline at end of file diff --git a/src/Persistence/Wolverine.Marten/Distribution/ProjectionAgent.cs b/src/Persistence/Wolverine.Marten/Distribution/ProjectionAgent.cs deleted file mode 100644 index d93dfe76f..000000000 --- a/src/Persistence/Wolverine.Marten/Distribution/ProjectionAgent.cs +++ /dev/null @@ -1,43 +0,0 @@ -using Marten.Events.Daemon.Coordination; -using Wolverine.Runtime.Agents; - -namespace Wolverine.Marten.Distribution; - -internal class ProjectionAgent : IAgent -{ - private readonly string _shardName; - private readonly string _databaseName; - private readonly IProjectionCoordinator _coordinator; - - public ProjectionAgent(Uri uri, IProjectionCoordinator coordinator) - { - (_databaseName, _shardName) = ProjectionAgents.Parse(uri); - _coordinator = coordinator; - - Uri = uri; - } - - public async Task StartAsync(CancellationToken cancellationToken) - { - var daemon = await _coordinator.DaemonForDatabase(_databaseName); - await daemon.StartAgentAsync(_shardName, cancellationToken); - Status = AgentStatus.Started; - } - - public async Task StopAsync(CancellationToken cancellationToken) - { - var daemon = await _coordinator.DaemonForDatabase(_databaseName); - await daemon.StopAgentAsync(_shardName); - Status = AgentStatus.Stopped; - } - - public async Task PauseAsync(CancellationToken cancellationToken) - { - var daemon = await _coordinator.DaemonForDatabase(_databaseName); - await daemon.StopAgentAsync(_shardName); - Status = AgentStatus.Paused; - } - - public Uri Uri { get; } - public AgentStatus Status { get; set; } = AgentStatus.Stopped; -} \ No newline at end of file diff --git a/src/Persistence/Wolverine.Marten/Distribution/ProjectionAgents.cs b/src/Persistence/Wolverine.Marten/Distribution/ProjectionAgents.cs deleted file mode 100644 index 78697bb6f..000000000 --- a/src/Persistence/Wolverine.Marten/Distribution/ProjectionAgents.cs +++ /dev/null @@ -1,140 +0,0 @@ -using ImTools; -using JasperFx.Core; -using JasperFx.Core.Reflection; -using JasperFx.Events.Daemon; -using JasperFx.Events.Projections; -using Marten; -using Marten.Events.Daemon.Coordination; -using Marten.Storage; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using Wolverine.Runtime; -using Wolverine.Runtime.Agents; -using AgentStatus = Wolverine.Runtime.Agents.AgentStatus; - -namespace Wolverine.Marten.Distribution; - -internal class ProjectionAgents : IStaticAgentFamily, IProjectionCoordinator -{ - public const string SchemeName = "event-subscriptions"; - - private readonly IDocumentStore _store; - private readonly IProjectionCoordinator _coordinator; - private ImHashMap _agents = ImHashMap.Empty; - private readonly object _agentLocker = new(); - - public static Uri UriFor(string databaseName, string shardName) - { - return $"{SchemeName}://{databaseName}/{shardName}".ToUri(); - } - - public static (string DatabaseName, string ProjectionName) Parse(Uri uri) - { - if (uri.Scheme != SchemeName) - throw new ArgumentOutOfRangeException(nameof(uri), $"{uri} is not a {SchemeName} Uri"); - - return (uri.Host, uri.Segments.Last().Trim('/')); - - } - - public ProjectionAgents(IDocumentStore store, ILogger logger) - { - _store = store; - _coordinator = new ProjectionCoordinator(store, logger); - } - - public ValueTask> AllKnownAgentsAsync() - { - return SupportedAgentsAsync(); - } - - private IEnumerable allAgentUris(IReadOnlyList databases, ShardName[] shards) - { - foreach (var database in databases) - { - foreach (var shard in shards) - { - yield return UriFor(database.Identifier, shard.Identity); - } - } - } - - public ValueTask BuildAgentAsync(Uri uri, IWolverineRuntime wolverineRuntime) - { - if (_agents.TryFind(uri, out var agent)) - { - return new ValueTask(agent); - } - - lock (_agentLocker) - { - if (_agents.TryFind(uri, out agent)) - { - return new ValueTask(agent); - } - - agent = new ProjectionAgent(uri, _coordinator); - _agents = _agents.AddOrUpdate(uri, agent); - } - - return new ValueTask(agent); - } - - public async ValueTask> SupportedAgentsAsync() - { - var databases = await _store.Storage.AllDatabases(); - var shardNames = _store.As().Options.Projections.AllShards().Select(x => x.Name).ToArray(); - - return allAgentUris(databases, shardNames).ToList(); - } - - public ValueTask EvaluateAssignmentsAsync(AssignmentGrid assignments) - { - assignments.DistributeEvenlyWithBlueGreenSemantics(SchemeName); - return new ValueTask(); - } - - public string Scheme => SchemeName; - - Task IHostedService.StartAsync(CancellationToken cancellationToken) - { - return Task.CompletedTask; - } - - Task IHostedService.StopAsync(CancellationToken cancellationToken) - { - return Task.CompletedTask; - } - - IProjectionDaemon IProjectionCoordinator.DaemonForMainDatabase() - { - return _coordinator.DaemonForMainDatabase(); - } - - ValueTask IProjectionCoordinator.DaemonForDatabase(string databaseIdentifier) - { - return _coordinator.DaemonForDatabase(databaseIdentifier); - } - - async Task IProjectionCoordinator.PauseAsync() - { - var active = _agents.Enumerate().Select(x => x.Value) - .Where(x => x.Status == AgentStatus.Started).ToArray(); - - foreach (var agent in active) - { - await agent.PauseAsync(CancellationToken.None); - } - } - - async Task IProjectionCoordinator.ResumeAsync() - { - var paused = _agents.Enumerate().Select(x => x.Value) - .Where(x => x.Status == AgentStatus.Paused).ToArray(); - - foreach (var agent in paused) - { - await agent.StartAsync(CancellationToken.None); - } - } -} \ No newline at end of file diff --git a/src/Persistence/Wolverine.Marten/Distribution/WolverineProjectionCoordinator.cs b/src/Persistence/Wolverine.Marten/Distribution/WolverineProjectionCoordinator.cs new file mode 100644 index 000000000..c48c39225 --- /dev/null +++ b/src/Persistence/Wolverine.Marten/Distribution/WolverineProjectionCoordinator.cs @@ -0,0 +1,58 @@ +using JasperFx.Core.Reflection; +using JasperFx.Events; +using JasperFx.Events.Daemon; +using Marten; +using Marten.Events.Daemon.Coordination; + +namespace Wolverine.Marten.Distribution; + +internal class WolverineProjectionCoordinator : WolverineProjectionCoordinator, IProjectionCoordinator where T : IDocumentStore +{ + public WolverineProjectionCoordinator(EventSubscriptionAgentFamily agents, T store) : base(agents, store) + { + } +} + +internal class WolverineProjectionCoordinator : IProjectionCoordinator +{ + private readonly EventSubscriptionAgentFamily _agents; + private readonly EventStoreIdentity _identity; + private readonly EventStoreAgents _storeAgents; + + public WolverineProjectionCoordinator(EventSubscriptionAgentFamily agents, IDocumentStore store) + { + _agents = agents; + _identity = store.As().Identity; + _storeAgents = _agents.FindStore(_identity); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + return _storeAgents.StartAllAsync(cancellationToken); + } + + public Task StopAsync(CancellationToken cancellationToken) + { + return _storeAgents.StopAllAsync(cancellationToken); + } + + public IProjectionDaemon DaemonForMainDatabase() + { + return _storeAgents.DaemonForMainDatabase(); + } + + public ValueTask DaemonForDatabase(string databaseIdentifier) + { + return _storeAgents.DaemonForDatabase(databaseIdentifier); + } + + public Task PauseAsync() + { + return StopAsync(CancellationToken.None); + } + + public Task ResumeAsync() + { + return StartAsync(CancellationToken.None); + } +} \ No newline at end of file diff --git a/src/Persistence/Wolverine.Marten/Distribution/notes.md b/src/Persistence/Wolverine.Marten/Distribution/notes.md deleted file mode 100644 index 80345ccbb..000000000 --- a/src/Persistence/Wolverine.Marten/Distribution/notes.md +++ /dev/null @@ -1,37 +0,0 @@ -# Scalable Projection Distributor - -## Notes - -* Projections that are revisioned, how do we discover them? -* Make Marten on activation, persist all shard name progress? Discover from there? -* All known supported Uri is passed to `AssignmentGrid`. Probably requires changes to Wolverine, but start from - integration tests w/ CritterStackPro - -## Next Steps - -1. Add `MultipleTenantContext`. Copy paste around code to create databases. Start by making it static -2. Test distribution by database -3. Add `startGreenHostAsync()` to new `TenantContext` with different `TripProjection`, see new versioned projections - running on green even if the original leader is "blue" -4. Unit test hard on assignment grid. Maybe a helper base class for that? -5. Take down nodes, see them move projections -6. Take down leader, see projections move -7. In blue/green with single tenant, take down leader -8. In blue/green with multiple tenant, take down leader -9. With multi-tenancy, add a new database at runtime and see that the agent is started *somewhere*. - -## Critter Stack Pro Use Cases (Phase 1) - -1. Single database, distribute running asynchronous projections evenly across running nodes -2. Multi-tenancy with separate databases, distribute running asynchronous projections by database across running nodes -3. In blue/green deployments, allow for running separate versions of the same named projection on the blue or green - nodes -4. In blue/green deployments, allow for all new projections -4. Discover new tenant databases at runtime and distribute async projection running for the new databases - -## Wolverine Agent Distribution - -* `AssignmentGrid` needs to declare/assert/find what nodes can support nodes. For the blue/green -* For the blue/green deployment, if there is any projection that can only be supported on a subset of nodes, spread - those - evenly. Use `IProjectionDistributor` otherwise diff --git a/src/Persistence/Wolverine.Marten/MartenMessageDatabaseSource.cs b/src/Persistence/Wolverine.Marten/MartenMessageDatabaseSource.cs index 3384f9ca8..59e5c4de5 100644 --- a/src/Persistence/Wolverine.Marten/MartenMessageDatabaseSource.cs +++ b/src/Persistence/Wolverine.Marten/MartenMessageDatabaseSource.cs @@ -162,6 +162,8 @@ private PostgresqlMessageStore createTenantWolverineStore(IMartenDatabase databa { Name = database.Identifier ?? new NpgsqlConnectionStringBuilder(settings.ConnectionString).Database }; + + store.TenantIds.AddRange(database.TenantIds.Distinct()); return store; } diff --git a/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj b/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj index fde96659f..94ed289ac 100644 --- a/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj +++ b/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj @@ -12,7 +12,7 @@ - + diff --git a/src/Persistence/Wolverine.Marten/WolverineOptionsMartenExtensions.cs b/src/Persistence/Wolverine.Marten/WolverineOptionsMartenExtensions.cs index 578220596..a44e431f4 100644 --- a/src/Persistence/Wolverine.Marten/WolverineOptionsMartenExtensions.cs +++ b/src/Persistence/Wolverine.Marten/WolverineOptionsMartenExtensions.cs @@ -123,9 +123,10 @@ public static MartenServiceCollectionExtensions.MartenConfigurationExpression In if (integration.UseWolverineManagedEventSubscriptionDistribution) { - expression.Services.AddSingleton(); - expression.Services.AddSingleton(s => s.GetRequiredService()); - expression.Services.AddSingleton(s => s.GetRequiredService()); + expression.Services.AddSingleton(); + expression.Services.AddSingleton(); + expression.Services.AddSingleton(s => s.GetRequiredService()); + expression.Services.AddSingleton(); } expression.Services.AddType(typeof(IDatabaseSource), typeof(MessageDatabaseDiscovery), diff --git a/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs b/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs index 6431a142f..e8283e2af 100644 --- a/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs +++ b/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs @@ -40,6 +40,9 @@ internal class PostgresqlMessageStore : MessageDatabase public PostgresqlMessageStore(DatabaseSettings databaseSettings, DurabilitySettings settings, NpgsqlDataSource dataSource, ILogger logger) : this(databaseSettings, settings, GetPrimaryNpgsqlNodeIfPossible(dataSource), logger, Array.Empty()) { + // ReSharper disable once VirtualMemberCallInConstructor + var descriptor = Describe(); + Id = new DatabaseId(descriptor.ServerName, descriptor.DatabaseName); } private static NpgsqlDataSource GetPrimaryNpgsqlNodeIfPossible(NpgsqlDataSource dataSource) @@ -351,6 +354,8 @@ public override DatabaseDescriptor Describe() SubjectUri = SubjectUri, Identifier = Identifier }; + + descriptor.TenantIds.AddRange(TenantIds); descriptor.Properties.Add(OptionsValue.Read(builder, x => x.Host)); descriptor.Properties.Add(OptionsValue.Read(builder, x => x.Port)); diff --git a/src/Persistence/Wolverine.Postgresql/PostgresqlTenantedMessageStore.cs b/src/Persistence/Wolverine.Postgresql/PostgresqlTenantedMessageStore.cs index 47342e26f..3f026f8cc 100644 --- a/src/Persistence/Wolverine.Postgresql/PostgresqlTenantedMessageStore.cs +++ b/src/Persistence/Wolverine.Postgresql/PostgresqlTenantedMessageStore.cs @@ -1,5 +1,6 @@ using ImTools; using JasperFx; +using JasperFx.Core; using JasperFx.Descriptors; using JasperFx.MultiTenancy; using Microsoft.Extensions.Logging; @@ -46,6 +47,8 @@ public async ValueTask FindAsync(string tenantId) var connectionString = await _persistence.ConnectionStringTenancy.FindAsync(tenantId); store = buildTenantStoreForConnectionString(connectionString); } + + store.TenantIds.Fill(tenantId); if (_runtime.Options.AutoBuildMessageStorageOnStartup != AutoCreate.None) { @@ -110,6 +113,7 @@ public async Task RefreshAsync() if (!_stores.Contains(assignment.TenantId)) { var store = buildTenantStoreForConnectionString(assignment.Value); + store.TenantIds.Fill(assignment.TenantId); if (_runtime.Options.AutoBuildMessageStorageOnStartup != AutoCreate.None) { @@ -130,6 +134,7 @@ public async Task RefreshAsync() if (!_stores.Contains(assignment.TenantId)) { var store = buildTenantStoreForDataSource(assignment.Value); + store.TenantIds.Fill(assignment.TenantId); if (_runtime.Options.AutoBuildMessageStorageOnStartup != AutoCreate.None) { diff --git a/src/Persistence/Wolverine.Postgresql/Transport/StickyPostgresqlQueueListenerAgent.cs b/src/Persistence/Wolverine.Postgresql/Transport/StickyPostgresqlQueueListenerAgent.cs index 081388026..1e7c34c97 100644 --- a/src/Persistence/Wolverine.Postgresql/Transport/StickyPostgresqlQueueListenerAgent.cs +++ b/src/Persistence/Wolverine.Postgresql/Transport/StickyPostgresqlQueueListenerAgent.cs @@ -1,3 +1,4 @@ +using JasperFx; using Wolverine.Runtime; using Wolverine.Runtime.Agents; @@ -19,7 +20,7 @@ public StickyPostgresqlQueueListenerAgent(IWolverineRuntime runtime, string queu Uri = new Uri($"{StickyPostgresqlQueueListenerAgentFamily.StickyListenerSchema}://{_queue}/{_databaseName}"); } - public AgentStatus Status { get; set; } = AgentStatus.Started; + public AgentStatus Status { get; set; } = AgentStatus.Running; public async Task StartAsync(CancellationToken cancellationToken) { diff --git a/src/Persistence/Wolverine.Postgresql/Wolverine.Postgresql.csproj b/src/Persistence/Wolverine.Postgresql/Wolverine.Postgresql.csproj index 297045e70..9bbcdc485 100644 --- a/src/Persistence/Wolverine.Postgresql/Wolverine.Postgresql.csproj +++ b/src/Persistence/Wolverine.Postgresql/Wolverine.Postgresql.csproj @@ -15,7 +15,7 @@ - + diff --git a/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs b/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs index e3814ec90..20214710a 100644 --- a/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs +++ b/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs @@ -1,3 +1,4 @@ +using JasperFx; using JasperFx.Blocks; using JasperFx.Core; using JasperFx.Core.Reflection; @@ -64,7 +65,7 @@ public DurabilityAgent(string databaseName, IWolverineRuntime runtime, IMessageD public bool AutoStartScheduledJobPolling { get; set; } = false; - public AgentStatus Status { get; set; } = AgentStatus.Started; + public AgentStatus Status { get; set; } = AgentStatus.Running; public Task StartAsync(CancellationToken cancellationToken) { diff --git a/src/Persistence/Wolverine.RDBMS/MessageDatabase.cs b/src/Persistence/Wolverine.RDBMS/MessageDatabase.cs index d2ecf4f6d..523b06683 100644 --- a/src/Persistence/Wolverine.RDBMS/MessageDatabase.cs +++ b/src/Persistence/Wolverine.RDBMS/MessageDatabase.cs @@ -61,6 +61,8 @@ protected MessageDatabase(DatabaseSettings databaseSettings, DbDataSource dataSo DataSource = dataSource; var descriptor = Describe(); + + Id = new DatabaseId(descriptor.ServerName, descriptor.DatabaseName); var parts = new List { diff --git a/src/Persistence/Wolverine.RDBMS/Wolverine.RDBMS.csproj b/src/Persistence/Wolverine.RDBMS/Wolverine.RDBMS.csproj index 573403b78..c9ecbc00e 100644 --- a/src/Persistence/Wolverine.RDBMS/Wolverine.RDBMS.csproj +++ b/src/Persistence/Wolverine.RDBMS/Wolverine.RDBMS.csproj @@ -14,7 +14,7 @@ - + diff --git a/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.cs b/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.cs index 626e99967..7651fc1ad 100644 --- a/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.cs +++ b/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.cs @@ -1,3 +1,4 @@ +using JasperFx; using JasperFx.Core; using JasperFx.Core.Reflection; using Microsoft.Extensions.Logging; diff --git a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.cs b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.cs index 40c7ec457..6d346b23d 100644 --- a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.cs +++ b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.cs @@ -30,6 +30,8 @@ public RavenDbMessageStore(IDocumentStore store, WolverineOptions options) public MessageStoreRole Role { get; set; } = MessageStoreRole.Main; + public List TenantIds { get; } = new(); + public void PromoteToMain(IWolverineRuntime runtime) { Role = MessageStoreRole.Main; diff --git a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs index e0c787959..dd8ca6d9a 100644 --- a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs +++ b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs @@ -50,6 +50,10 @@ public SqlServerMessageStore(DatabaseSettings database, DurabilitySettings setti var storage = typeof(DatabaseSagaSchema<,>).CloseAndBuildAs(sagaTableDefinition, _settings, sagaTableDefinition.IdMember.GetMemberType(), sagaTableDefinition.SagaType); _sagaStorage = _sagaStorage.AddOrUpdate(sagaTableDefinition.SagaType, storage); } + + // ReSharper disable once VirtualMemberCallInConstructor + var descriptor = Describe(); + Id = new DatabaseId(descriptor.ServerName, descriptor.DatabaseName); } protected override void writeMessageIdArrayQueryList(DbCommandBuilder builder, Guid[] messageIds) @@ -355,6 +359,8 @@ public override DatabaseDescriptor Describe() SchemaOrNamespace = _settings.SchemaName, SubjectUri = SubjectUri }; + + descriptor.TenantIds.AddRange(TenantIds); descriptor.Properties.Add(OptionsValue.Read(builder, x => x.ApplicationName)); descriptor.Properties.Add(OptionsValue.Read(builder, x => x.Enlist)); diff --git a/src/Persistence/Wolverine.SqlServer/SqlServerTenantedMessageStore.cs b/src/Persistence/Wolverine.SqlServer/SqlServerTenantedMessageStore.cs index c7a08f6bd..d0f81e26c 100644 --- a/src/Persistence/Wolverine.SqlServer/SqlServerTenantedMessageStore.cs +++ b/src/Persistence/Wolverine.SqlServer/SqlServerTenantedMessageStore.cs @@ -1,11 +1,11 @@ using ImTools; using JasperFx; +using JasperFx.Core; using JasperFx.Descriptors; using JasperFx.MultiTenancy; using Microsoft.Extensions.Logging; using Wolverine.Persistence.Durability; using Wolverine.RDBMS; -using Wolverine.RDBMS.MultiTenancy; using Wolverine.RDBMS.Sagas; using Wolverine.Runtime; using Wolverine.SqlServer.Persistence; @@ -45,6 +45,7 @@ public async ValueTask FindAsync(string tenantId) var connectionString = await _persistence.ConnectionStringTenancy!.FindAsync(tenantId); store = buildTenantStoreForConnectionString(connectionString); + store.TenantIds.Fill(tenantId); if (_runtime.Options.AutoBuildMessageStorageOnStartup != AutoCreate.None) { @@ -86,6 +87,7 @@ public async Task RefreshAsync() if (!_stores.Contains(assignment.TenantId)) { var store = buildTenantStoreForConnectionString(assignment.Value); + store.TenantIds.Fill(assignment.TenantId); if (_runtime.Options.AutoBuildMessageStorageOnStartup != AutoCreate.None) { diff --git a/src/Persistence/Wolverine.SqlServer/Transport/SqlServerTransportDatabase.cs b/src/Persistence/Wolverine.SqlServer/Transport/SqlServerTransportDatabase.cs index 943272ffc..cb438102b 100644 --- a/src/Persistence/Wolverine.SqlServer/Transport/SqlServerTransportDatabase.cs +++ b/src/Persistence/Wolverine.SqlServer/Transport/SqlServerTransportDatabase.cs @@ -26,6 +26,10 @@ internal static SqlConnection BuildConnection(IWolverineRuntime runtime) { _transport = runtime.Options.SqlServerTransport(); _runtime = runtime; + + // ReSharper disable once VirtualMemberCallInConstructor + var descriptor = Describe(); + Id = new DatabaseId(descriptor.ServerName, descriptor.DatabaseName); } public override DatabaseDescriptor Describe() @@ -39,6 +43,8 @@ public override DatabaseDescriptor Describe() Subject = GetType().FullNameInCode(), SchemaOrNamespace = _transport.TransportSchemaName }; + + descriptor.TenantIds.AddRange(TenantIds); descriptor.Properties.Add(OptionsValue.Read(builder, x => x.ApplicationName)); descriptor.Properties.Add(OptionsValue.Read(builder, x => x.Enlist)); diff --git a/src/Persistence/Wolverine.SqlServer/Wolverine.SqlServer.csproj b/src/Persistence/Wolverine.SqlServer/Wolverine.SqlServer.csproj index 554a7aed7..99b53bdd1 100644 --- a/src/Persistence/Wolverine.SqlServer/Wolverine.SqlServer.csproj +++ b/src/Persistence/Wolverine.SqlServer/Wolverine.SqlServer.csproj @@ -13,7 +13,7 @@ - + diff --git a/src/Testing/Wolverine.ComplianceTests/FakeAgent.cs b/src/Testing/Wolverine.ComplianceTests/FakeAgent.cs index 2e4a8ce9b..261f40a3a 100644 --- a/src/Testing/Wolverine.ComplianceTests/FakeAgent.cs +++ b/src/Testing/Wolverine.ComplianceTests/FakeAgent.cs @@ -1,3 +1,4 @@ +using JasperFx; using Wolverine.Runtime.Agents; namespace Wolverine.ComplianceTests; @@ -14,7 +15,7 @@ public FakeAgent(Uri uri) public Task StartAsync(CancellationToken cancellationToken) { IsRunning = true; - Status = AgentStatus.Started; + Status = AgentStatus.Running; return Task.CompletedTask; } @@ -25,7 +26,7 @@ public Task StopAsync(CancellationToken cancellationToken) return Task.CompletedTask; } - public AgentStatus Status { get; private set; } = AgentStatus.Started; + public AgentStatus Status { get; private set; } = AgentStatus.Running; public Uri Uri { get; } } \ No newline at end of file diff --git a/src/Transports/SignalR/Wolverine.SignalR/Client/SignalRClientEndpoint.cs b/src/Transports/SignalR/Wolverine.SignalR/Client/SignalRClientEndpoint.cs index e7e30341a..c02f4f70e 100644 --- a/src/Transports/SignalR/Wolverine.SignalR/Client/SignalRClientEndpoint.cs +++ b/src/Transports/SignalR/Wolverine.SignalR/Client/SignalRClientEndpoint.cs @@ -1,4 +1,6 @@ +using System.Diagnostics; using System.Text.Json; +using JasperFx.Core; using Microsoft.AspNetCore.SignalR.Client; using Microsoft.Extensions.Logging; using Wolverine.Configuration; @@ -30,10 +32,12 @@ public SignalRClientEndpoint(Uri uri, SignalRClientTransport parent) : base(Tran IsListener = true; Mode = EndpointMode.Inline; + + // Just to use the same defaults + JsonOptions = new SignalRTransport().JsonOptions; } - public JsonSerializerOptions JsonOptions { get; set; } = new JsonSerializerOptions - { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }; + public JsonSerializerOptions JsonOptions { get; set; } public Uri SignalRUri { get; } @@ -68,6 +72,11 @@ internal async Task ReceiveAsync(string json) { if (Receiver == null || _mapper == null) return; + if (json.IsEmpty()) + { + Logger?.LogError(new ArgumentOutOfRangeException(nameof(json)), "Received empty json into the SignalR client"); + } + try { var envelope = new Envelope(); @@ -135,6 +144,7 @@ public async ValueTask SendAsync(Envelope envelope) throw new InvalidOperationException($"SignalR Client {Uri} is not initialized"); var json = _mapper.WriteToString(envelope); - await _connection.InvokeAsync(nameof(WolverineHub.Receive), json); + + await _connection.InvokeAsync(nameof(WolverineHub.ReceiveMessage), json); } } \ No newline at end of file diff --git a/src/Transports/SignalR/Wolverine.SignalR/Internals/SignalRTransport.cs b/src/Transports/SignalR/Wolverine.SignalR/Internals/SignalRTransport.cs index 8aefb2378..4ebc4f0ec 100644 --- a/src/Transports/SignalR/Wolverine.SignalR/Internals/SignalRTransport.cs +++ b/src/Transports/SignalR/Wolverine.SignalR/Internals/SignalRTransport.cs @@ -1,4 +1,5 @@ using System.Text.Json; +using System.Text.Json.Serialization; using JasperFx.Core; using JasperFx.Resources; using Microsoft.AspNetCore.SignalR; @@ -21,6 +22,9 @@ public class SignalRTransport : Endpoint, ITransport, IListener, ISender public SignalRTransport() : base($"{ProtocolName}://wolverine".ToUri(), EndpointRole.Application) { IsListener = true; + + JsonOptions = new(JsonSerializerOptions.Web) { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }; + JsonOptions.Converters.Add(new JsonStringEnumConverter()); } protected override ISender CreateSender(IWolverineRuntime runtime) @@ -63,8 +67,7 @@ bool ITransport.TryBuildStatefulResource(IWolverineRuntime runtime, out IStatefu internal ILogger? Logger { get; set; } - public JsonSerializerOptions JsonOptions { get; set; } = new JsonSerializerOptions - { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }; + public JsonSerializerOptions JsonOptions { get; set; } public IReceiver? Receiver { get; private set; } @@ -78,6 +81,10 @@ internal async Task ReceiveAsync(HubCallerContext context, string json) "The SignalR Transport has not been initialized. Ensure that there is a WolverineOptions.UseSignalR() call in your configuration"); } + if (Logger?.IsEnabled(LogLevel.Debug) ?? false) + { + Logger.LogDebug("Received JSON from SignalR: {Json} ", json); + } var envelope = new SignalREnvelope(context, HubContext!); _mapper!.MapIncoming(envelope, json); @@ -155,6 +162,11 @@ public ValueTask SendAsync(Envelope envelope) var json = _mapper!.WriteToString(envelope); + if (Logger != null && Logger.IsEnabled(LogLevel.Debug)) + { + Logger.LogDebug("Sent JSON via SignalR: {Json}", json); + } + return new ValueTask(locator.Find(HubContext!).SendAsync(operation, json)); } } diff --git a/src/Transports/SignalR/Wolverine.SignalR/WolverineHub.cs b/src/Transports/SignalR/Wolverine.SignalR/WolverineHub.cs index f4a452758..cc4949560 100644 --- a/src/Transports/SignalR/Wolverine.SignalR/WolverineHub.cs +++ b/src/Transports/SignalR/Wolverine.SignalR/WolverineHub.cs @@ -20,7 +20,8 @@ public WolverineHub(SignalRTransport endpoint) _endpoint = endpoint; } - public Task Receive(string json) + [HubMethodName("ReceiveMessage")] + public Task ReceiveMessage(string json) { return _endpoint.ReceiveAsync(Context, json); } diff --git a/src/Wolverine/Persistence/Durability/DeadLetterEnvelope.cs b/src/Wolverine/Persistence/Durability/DeadLetterEnvelope.cs index 06cf21e31..cd819eb24 100644 --- a/src/Wolverine/Persistence/Durability/DeadLetterEnvelope.cs +++ b/src/Wolverine/Persistence/Durability/DeadLetterEnvelope.cs @@ -35,6 +35,10 @@ bool replayable public Envelope Envelope { get; } public string MessageType { get; } public string ReceivedAt { get; } + + /// + /// Service name of the original sending application + /// public string Source { get; } public string ExceptionType { get; } public string ExceptionMessage { get; } diff --git a/src/Wolverine/Persistence/Durability/IMessageStore.cs b/src/Wolverine/Persistence/Durability/IMessageStore.cs index 1ccc7998d..615bf44f0 100644 --- a/src/Wolverine/Persistence/Durability/IMessageStore.cs +++ b/src/Wolverine/Persistence/Durability/IMessageStore.cs @@ -71,6 +71,11 @@ public interface IMessageStore : IAsyncDisposable /// What is the role of this message store within the application? /// MessageStoreRole Role { get; } + + /// + /// In the case of multi-tenancy, this would hold one or more tenant ids + /// + List TenantIds { get; } /// /// Unique identifier for a message store in case of systems that use multiple message diff --git a/src/Wolverine/Persistence/Durability/MultiTenantedMessageDatabase.Agents.cs b/src/Wolverine/Persistence/Durability/MultiTenantedMessageDatabase.Agents.cs index a44055dea..d119073c0 100644 --- a/src/Wolverine/Persistence/Durability/MultiTenantedMessageDatabase.Agents.cs +++ b/src/Wolverine/Persistence/Durability/MultiTenantedMessageDatabase.Agents.cs @@ -1,3 +1,4 @@ +using JasperFx; using Wolverine.Runtime; using Wolverine.Runtime.Agents; @@ -63,7 +64,7 @@ public Task StopAsync(CancellationToken cancellationToken) return Task.CompletedTask; } - public AgentStatus Status { get; set; } = AgentStatus.Started; + public AgentStatus Status { get; set; } = AgentStatus.Running; public Uri Uri { get; } = new Uri("wolverinedb://"); } diff --git a/src/Wolverine/Persistence/Durability/MultiTenantedMessageStore.cs b/src/Wolverine/Persistence/Durability/MultiTenantedMessageStore.cs index 90bb41d94..42a202071 100644 --- a/src/Wolverine/Persistence/Durability/MultiTenantedMessageStore.cs +++ b/src/Wolverine/Persistence/Durability/MultiTenantedMessageStore.cs @@ -38,6 +38,8 @@ public MultiTenantedMessageStore(IMessageStore main, IWolverineRuntime runtime, Main = main; } + public List TenantIds { get; } = new(); + public void DemoteToAncillary() { Main.DemoteToAncillary(); diff --git a/src/Wolverine/Persistence/Durability/NullMessageStore.cs b/src/Wolverine/Persistence/Durability/NullMessageStore.cs index 66c3af8c7..192527193 100644 --- a/src/Wolverine/Persistence/Durability/NullMessageStore.cs +++ b/src/Wolverine/Persistence/Durability/NullMessageStore.cs @@ -22,6 +22,8 @@ public Task MarkIncomingEnvelopeAsHandledAsync(Envelope envelope) { return Task.CompletedTask; } + + public List TenantIds { get; } = new(); public string Name => "Nullo"; public void PromoteToMain(IWolverineRuntime runtime) diff --git a/src/Wolverine/Runtime/Agents/ExclusiveListenerFamily.cs b/src/Wolverine/Runtime/Agents/ExclusiveListenerFamily.cs index 60712096f..657e03f1c 100644 --- a/src/Wolverine/Runtime/Agents/ExclusiveListenerFamily.cs +++ b/src/Wolverine/Runtime/Agents/ExclusiveListenerFamily.cs @@ -1,3 +1,4 @@ +using JasperFx; using JasperFx.Core; using Wolverine.Configuration; @@ -29,7 +30,7 @@ public async Task StopAsync(CancellationToken cancellationToken) public Uri Uri { get; set; } - public AgentStatus Status { get; set; } = AgentStatus.Started; + public AgentStatus Status { get; set; } = AgentStatus.Running; } internal class ExclusiveListenerFamily : IStaticAgentFamily diff --git a/src/Wolverine/Runtime/Agents/IAgent.cs b/src/Wolverine/Runtime/Agents/IAgent.cs index 30d1fa22b..2d51c5526 100644 --- a/src/Wolverine/Runtime/Agents/IAgent.cs +++ b/src/Wolverine/Runtime/Agents/IAgent.cs @@ -1,4 +1,5 @@ using System; +using JasperFx; using Microsoft.Extensions.Hosting; namespace Wolverine.Runtime.Agents; @@ -44,7 +45,7 @@ public async Task StartAsync(CancellationToken cancellationToken) await agent.StartAsync(cancellationToken); } - Status = AgentStatus.Started; + Status = AgentStatus.Running; } public async Task StopAsync(CancellationToken cancellationToken) @@ -54,17 +55,10 @@ public async Task StopAsync(CancellationToken cancellationToken) await agent.StopAsync(cancellationToken); } - Status = AgentStatus.Started; + Status = AgentStatus.Running ; } public AgentStatus Status { get; private set; } = AgentStatus.Stopped; } -public enum AgentStatus -{ - Started, - Stopped, - Paused -} - #endregion \ No newline at end of file diff --git a/src/Wolverine/Runtime/Agents/IWolverineObserver.cs b/src/Wolverine/Runtime/Agents/IWolverineObserver.cs index 99bc30da5..20dbf06fc 100644 --- a/src/Wolverine/Runtime/Agents/IWolverineObserver.cs +++ b/src/Wolverine/Runtime/Agents/IWolverineObserver.cs @@ -87,6 +87,8 @@ public void MessageRouted(Type messageType, IMessageRouter router) public async Task AssignmentsChanged(AssignmentGrid grid, AgentCommands commands) { + if (!commands.Any()) return; + var records = commands.Select(x => new NodeRecord { NodeNumber = _runtime.Options.Durability.AssignedNodeNumber, diff --git a/src/Wolverine/Runtime/Agents/LeaderPinnedAgentFamily.cs b/src/Wolverine/Runtime/Agents/LeaderPinnedAgentFamily.cs index 5825b3315..4980f87d2 100644 --- a/src/Wolverine/Runtime/Agents/LeaderPinnedAgentFamily.cs +++ b/src/Wolverine/Runtime/Agents/LeaderPinnedAgentFamily.cs @@ -1,3 +1,4 @@ +using JasperFx; using JasperFx.Core; using Wolverine.Configuration; @@ -29,7 +30,7 @@ public async Task StopAsync(CancellationToken cancellationToken) public Uri Uri { get; set; } - public AgentStatus Status { get; set; } = AgentStatus.Started; + public AgentStatus Status { get; set; } = AgentStatus.Running; } public class LeaderPinnedListenerFamily : IStaticAgentFamily diff --git a/src/Wolverine/Runtime/Agents/NodeAgentController.cs b/src/Wolverine/Runtime/Agents/NodeAgentController.cs index 5f2361929..f3c595521 100644 --- a/src/Wolverine/Runtime/Agents/NodeAgentController.cs +++ b/src/Wolverine/Runtime/Agents/NodeAgentController.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using JasperFx; using JasperFx.Core; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -197,7 +198,7 @@ public async Task StopAgentAsync(Uri agentUri) public Uri[] AllRunningAgentUris() { - return Agents.Where(x => x.Value.Status == AgentStatus.Started).Select(x => x.Key).ToArray(); + return Agents.Where(x => x.Value.Status != AgentStatus.Stopped).Select(x => x.Key).ToArray(); } /// diff --git a/src/Wolverine/Runtime/Handlers/HandlerGraph.cs b/src/Wolverine/Runtime/Handlers/HandlerGraph.cs index c48ca039d..380d1801e 100644 --- a/src/Wolverine/Runtime/Handlers/HandlerGraph.cs +++ b/src/Wolverine/Runtime/Handlers/HandlerGraph.cs @@ -97,6 +97,13 @@ public void ConfigureHandlerForMessage(Type messageType, Action co internal void AddMessageHandler(Type messageType, IMessageHandler handler) { + // Makes error handling and other configuration work cleanly + if (handler is MessageHandler h) + { + var chain = new HandlerChain(messageType, this); + h.Chain = chain; + } + _handlers = _handlers.AddOrUpdate(messageType, handler); RegisterMessageType(messageType); } diff --git a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs index 047e2a336..6a1df99ea 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs @@ -213,6 +213,12 @@ public async Task StopAsync(CancellationToken cancellationToken) } DurabilitySettings.Cancel(); + + // ReSharper disable once SuspiciousTypeConversion.Global + if (Observer is IAsyncDisposable d) + { + await d.DisposeAsync(); + } } private void startInMemoryScheduledJobs() diff --git a/src/Wolverine/Tracking/Diagnostics.cs b/src/Wolverine/Tracking/Diagnostics.cs deleted file mode 100644 index 59ca392fb..000000000 --- a/src/Wolverine/Tracking/Diagnostics.cs +++ /dev/null @@ -1,121 +0,0 @@ -namespace Wolverine.Tracking; - -internal class Grid -{ - private readonly List> _columns = new(); - - public void AddColumn(string header, Func source, bool rightJustified = false) - { - var column = new Column(header, source) - { - RightJustified = rightJustified - }; - - _columns.Add(column); - } - - public string Write(IReadOnlyList items) - { - var writer = new StringWriter(); - - var totalWidth = determineWidths(items); - writer.WriteLine(); - writeSolidLine(writer, totalWidth); - - writeHeaderRow(writer); - - writeSolidLine(writer, totalWidth); - - foreach (var item in items) - { - writeBodyRow(writer, item); - } - - writeSolidLine(writer, totalWidth); - - return writer.ToString(); - } - - private int determineWidths(IReadOnlyList items) - { - foreach (var column in _columns) - { - column.DetermineWidth(items); - } - - var totalWidth = _columns.Sum(x => x.Width) + (_columns.Count) + 1; - return totalWidth; - } - - private void writeBodyRow(StringWriter writer, T item) - { - foreach (var column in _columns) - { - column.WriteLine(writer, item); - } - - writer.WriteLine('|'); - } - - private void writeHeaderRow(StringWriter writer) - { - foreach (var column in _columns) - { - column.WriteHeader(writer); - } - - writer.WriteLine('|'); - } - - private static void writeSolidLine(StringWriter writer, int totalWidth) - { - writer.WriteLine("".PadRight(totalWidth, '-')); - } -} - -internal class Column -{ - private readonly Func _source; - private int _width; - public string Header { get; } - - public Column(string header, Func source) - { - _source = source; - Header = header; - } - - public bool RightJustified { get; set; } - - public void DetermineWidth(IEnumerable items) - { - _width = items.Select(x => _source(x)).Where(x => x != null).Max(x => x.Length); - if (Header.Length > _width) _width = Header.Length; - _width += 4; - } - - public int Width => _width; - - public void WriteHeader(TextWriter writer) - { - writer.Write("| "); - writer.Write(Header); - writer.Write("".PadRight(_width - Header.Length - 1)); - } - - public void WriteLine(TextWriter writer, T item) - { - var value = _source(item) ?? string.Empty; - writer.Write("| "); - if (RightJustified) - { - writer.Write("".PadRight(_width - value.Length - 1)); - writer.Write(value); - } - else - { - writer.Write(value); - writer.Write("".PadRight(_width - value.Length - 1)); - } - } -} \ No newline at end of file diff --git a/src/Wolverine/Tracking/TrackedSession.cs b/src/Wolverine/Tracking/TrackedSession.cs index 1e83f9826..9962aa5fd 100644 --- a/src/Wolverine/Tracking/TrackedSession.cs +++ b/src/Wolverine/Tracking/TrackedSession.cs @@ -1,4 +1,5 @@ using System.Diagnostics; +using JasperFx.CommandLine.TextualDisplays; using JasperFx.Core; using JasperFx.Core.Reflection; using Microsoft.Extensions.DependencyInjection; @@ -29,6 +30,8 @@ internal class TrackedSession : ITrackedSession private readonly Stopwatch _stopwatch = new(); + private readonly List> _ignoreMessageRules = [t => t.CanBeCastTo()]; + private TrackingStatus _status = TrackingStatus.Active; public TrackedSession(IHost host) @@ -426,7 +429,8 @@ public void MaybeRecord(MessageEventType messageEventType, Envelope envelope, st } // Ignore these - if (envelope.Message is IAgentCommand) + var messageType = envelope.Message.GetType(); + if (_ignoreMessageRules.Any(x => x(messageType))) { return; } @@ -451,7 +455,8 @@ public void Record(MessageEventType eventType, Envelope envelope, string? servic } // Ignore these - if (envelope.Message is IAgentCommand) + var messageType = envelope.Message?.GetType(); + if (messageType != null && _ignoreMessageRules.Any(x => x(messageType))) { return; } @@ -523,6 +528,11 @@ public override string ToString() return $"{conditions}\n\n{activity}\\{exceptions}"; } + + public void IgnoreMessageTypes(Func filter) + { + _ignoreMessageRules.Add(filter); + } } internal class HostWrapper : IHost diff --git a/src/Wolverine/Tracking/TrackedSessionConfiguration.cs b/src/Wolverine/Tracking/TrackedSessionConfiguration.cs index 53355f7fc..be6578762 100644 --- a/src/Wolverine/Tracking/TrackedSessionConfiguration.cs +++ b/src/Wolverine/Tracking/TrackedSessionConfiguration.cs @@ -26,6 +26,29 @@ public TrackedSessionConfiguration Timeout(TimeSpan timeout) return this; } + /// + /// Do not track any messages of this type + /// Helpful for polling operations that maybe happening during your testing + /// + /// + /// + public TrackedSessionConfiguration IgnoreMessageType() + { + return IgnoreMessagesMatchingType(t => t == typeof(T)); + } + + /// + /// Do not track any messages where the message type matches this filter. + /// Helpful for polling operations that maybe happening during your testing + /// + /// + /// + public TrackedSessionConfiguration IgnoreMessagesMatchingType(Func filter) + { + Session.IgnoreMessageTypes(filter); + return this; + } + /// /// Track activity across an additional Wolverine application /// diff --git a/src/Wolverine/Wolverine.csproj b/src/Wolverine/Wolverine.csproj index 3fb299d8c..9bf4d9f29 100644 --- a/src/Wolverine/Wolverine.csproj +++ b/src/Wolverine/Wolverine.csproj @@ -4,7 +4,7 @@ WolverineFx - +