Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using IntegrationTests;
using JasperFx.Core;
using JasperFx.Events;
using JasperFx.Events.Projections;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Polecat;
using PolecatTests.Distribution.TripDomain;
using Shouldly;
using Wolverine;
using Wolverine.Polecat;
using Wolverine.Runtime.Agents;
using Wolverine.Tracking;

namespace PolecatTests.Distribution;

// Regression for GH-3168: under UseWolverineManagedEventSubscriptionDistribution a Polecat host must not
// only SURFACE the async projection shards as agent URIs (polecat_managed_event_subscription_distribution)
// but actually START them on the node. Previously every start threw "Unknown event projection or
// subscription": the agent URI carries the store Type ("SqlServer") in its authority, which System.Uri
// lowercases, while the EventSubscriptionAgentFamily store map was keyed by the original-cased
// Identity.ToString() ("Polecat:SqlServer") — so the reverse lookup missed and no agent ran (node N: []).
// Marten dodged it only because its Type is already lowercase ("marten").
public class polecat_managed_distribution_starts_agents_3168 : IAsyncLifetime
{
private IHost _host = null!;

public async Task InitializeAsync()
{
_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
// Speed up the leader-election + assignment loops (mirrors the Marten harness).
opts.Durability.HealthCheckPollingTime = 1.Seconds();
opts.Durability.CheckAssignmentPeriod = 1.Seconds();

opts.Services.AddPolecat(m =>
{
m.ConnectionString = Servers.SqlServerConnectionString;
m.DatabaseSchemaName = "polecat_3168";

m.Projections.Add<TripProjection>(ProjectionLifecycle.Async);
m.Projections.Add<DayProjection>(ProjectionLifecycle.Async);
m.Projections.Add<DistanceProjection>(ProjectionLifecycle.Async);
})
.IntegrateWithWolverine(o => o.UseWolverineManagedEventSubscriptionDistribution = true)
.ApplyAllDatabaseChangesOnStartup();

opts.Discovery.DisableConventionalDiscovery();
}).StartAsync();
}

public async Task DisposeAsync()
{
_host.GetRuntime().Agents.DisableHealthChecks();
await _host.StopAsync();
_host.Dispose();
}

[Fact]
public async Task the_three_async_projection_agents_actually_run_on_one_node()
{
await _host.WaitUntilAssumesLeadershipAsync(5.Seconds());

// All three async projection shards must actually be running on the single node (GH-3168).
await _host.WaitUntilAssignmentsChangeTo(w =>
{
w.AgentScheme = EventSubscriptionAgentFamily.SchemeName;
w.ExpectRunningAgents(_host, 3);
}, 30.Seconds());

var running = _host.RunningAgents()
.Where(x => x.Scheme == EventSubscriptionAgentFamily.SchemeName)
.ToArray();

running.Length.ShouldBe(3);
}
}
12 changes: 9 additions & 3 deletions src/Wolverine/Runtime/Agents/EventSubscriptionAgentFamily.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ public static Uri UriFor(EventStoreIdentity storeIdentity, DatabaseId databaseId
return new Uri($"{SchemeName}://{storeIdentity.Type}/{storeIdentity.Name}/{databaseId}/{name.RelativeUrl}");
}

// The store Type rides in the agent URI's authority (host), and System.Uri lowercases the authority
// while EventStoreIdentity.ToString() preserves the original casing. Normalize both the stored key
// and the reverse lookup to one casing so a store whose Identity.Type has uppercase letters (e.g.
// Polecat's "SqlServer") still round-trips from URI back to its EventStoreAgents. See GH-3168.
private static string StoreKey(string identity) => identity.ToLowerInvariant();

/// <summary>
/// Resolve the agent <see cref="Uri" /> for a projection/subscription shard identified by its
/// <paramref name="shardIdentity" /> (the JasperFx <c>ShardName.Identity</c>, e.g. <c>"Trip:All"</c>)
Expand Down Expand Up @@ -105,7 +111,7 @@ public EventSubscriptionAgentFamily(IEnumerable<IEventStore> stores, IEnumerable

foreach (var store in stores)
{
_stores = _stores.AddOrUpdate(store.Identity.ToString(), new EventStoreAgents(store, _observers));
_stores = _stores.AddOrUpdate(StoreKey(store.Identity.ToString()), new EventStoreAgents(store, _observers));
}
}

Expand All @@ -123,7 +129,7 @@ public async ValueTask<IAgent> BuildAgentAsync(Uri uri, IWolverineRuntime wolver
// segments[2] database id
// other segments get you the relative path

var storeIdentity = $"{uri.Segments[1].Trim('/')}:{uri.Host}";
var storeIdentity = StoreKey($"{uri.Segments[1].Trim('/')}:{uri.Host}");
if (_stores.TryFind(storeIdentity, out var store))
{
var databaseId = DatabaseId.Parse(uri.Segments[2].Trim('/'));
Expand Down Expand Up @@ -164,7 +170,7 @@ public async ValueTask DisposeAsync()

internal EventStoreAgents FindStore(EventStoreIdentity identity)
{
if (_stores.TryFind(identity.ToString(), out var store))
if (_stores.TryFind(StoreKey(identity.ToString()), out var store))
{
return store;
}
Expand Down
Loading