Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Marten Outbox Pattern/Subscription with custom Projection #104

Merged
merged 1 commit into from
Mar 3, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CQRS.Tests/CQRS.Tests.csproj
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@

<ItemGroup>
<PackageReference Include="FluentAssertions" Version="6.5.1" />
<PackageReference Include="Marten" Version="5.0.0-alpha.5" />
<PackageReference Include="Marten" Version="5.0.0-alpha.6" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.1.0" />
<PackageReference Include="MediatR" Version="10.0.1" />
<PackageReference Include="SharpTestsEx" Version="2.0.0" />
1 change: 1 addition & 0 deletions Core.Api.Testing/ApiFixture.cs
Original file line number Diff line number Diff line change
@@ -48,6 +48,7 @@ public virtual Task DisposeAsync()
return Task.CompletedTask;
}

// TODO: Add Poly here
public async Task<HttpResponseMessage> Get(string path = "", int maxNumberOfRetries = 0,
int retryIntervalInMs = 1000, Func<HttpResponseMessage, ValueTask<bool>>? check = null)
{
2 changes: 1 addition & 1 deletion Core.ElasticSearch/Indices/IndexNameMapper.cs
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@ public static void AddCustomMap(Type streamType, string mappedStreamName)

public static string ToIndexPrefix<TStream>() => ToIndexPrefix(typeof(TStream));

public static string ToIndexPrefix(Type streamType) => Instance.typeNameMap.GetOrAdd(streamType, (_) =>
public static string ToIndexPrefix(Type streamType) => Instance.typeNameMap.GetOrAdd(streamType, _ =>
{
var modulePrefix = streamType.Namespace!.Split(".").First();
return $"{modulePrefix}-{streamType.Name}".ToLower();
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ public class EventStoreDBSubscriptionToAllOptions

public class EventStoreDBSubscriptionToAll
{
private readonly INoMediatorEventBus noMediatorEventBus;
private readonly INoMediatorEventBus eventBus;
private readonly EventStoreClient eventStoreClient;
private readonly ISubscriptionCheckpointRepository checkpointRepository;
private readonly ILogger<EventStoreDBSubscriptionToAll> logger;
@@ -34,12 +34,12 @@ public class EventStoreDBSubscriptionToAll

public EventStoreDBSubscriptionToAll(
EventStoreClient eventStoreClient,
INoMediatorEventBus noMediatorEventBus,
INoMediatorEventBus eventBus,
ISubscriptionCheckpointRepository checkpointRepository,
ILogger<EventStoreDBSubscriptionToAll> logger
)
{
this.noMediatorEventBus = noMediatorEventBus ?? throw new ArgumentNullException(nameof(noMediatorEventBus));
this.eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
this.eventStoreClient = eventStoreClient ?? throw new ArgumentNullException(nameof(eventStoreClient));
this.checkpointRepository =
checkpointRepository ?? throw new ArgumentNullException(nameof(checkpointRepository));
@@ -98,7 +98,7 @@ private async Task HandleEvent(StreamSubscription subscription, ResolvedEvent re
}

// publish event to internal event bus
await noMediatorEventBus.Publish(streamEvent, ct);
await eventBus.Publish(streamEvent, ct);

await checkpointRepository.Store(SubscriptionId, resolvedEvent.Event.Position.CommitPosition, ct);
}
2 changes: 1 addition & 1 deletion Core.Kafka/Producers/KafkaProducer.cs
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ public async Task Publish(IExternalEvent @event, CancellationToken cancellationT
using var p = new ProducerBuilder<string, string>(config.ProducerConfig).Build();
await Task.Yield();
// publish event to kafka topic taken from config
var result = await p.ProduceAsync(config.Topic,
await p.ProduceAsync(config.Topic,
new Message<string, string>
{
// store event type name in message Key
56 changes: 22 additions & 34 deletions Core.Marten/Config.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
using Baseline.ImTools;
using Core.Events;
using Core.Ids;
using Core.Marten.Ids;
using Core.Marten.OptimisticConcurrency;
using Core.Marten.Subscriptions;
using Core.Threading;
using Marten;
using Marten.Events.Daemon.Resiliency;
using Marten.Events.Projections;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Weasel.Core;
@@ -37,45 +41,21 @@ public static IServiceCollection AddMarten(this IServiceCollection services, ICo
.AddScoped<IIdGenerator, MartenIdGenerator>()
.AddScoped<MartenOptimisticConcurrencyScope, MartenOptimisticConcurrencyScope>()
.AddScoped<MartenExpectedStreamVersionProvider, MartenExpectedStreamVersionProvider>()
.AddScoped<MartenNextStreamVersionProvider, MartenNextStreamVersionProvider>();

var documentStore = services
.AddMarten(options =>
{
SetStoreOptions(options, martenConfig, configureOptions);
})
.AddAsyncDaemon(DaemonMode.Solo)
.InitializeStore();

SetupSchema(documentStore, martenConfig, 1);
.AddScoped<MartenNextStreamVersionProvider, MartenNextStreamVersionProvider>()
.AddMarten(sp => SetStoreOptions(sp, martenConfig, configureOptions))
.ApplyAllDatabaseChangesOnStartup()
.AddAsyncDaemon(DaemonMode.Solo);

return services;
}

private static void SetupSchema(IDocumentStore documentStore, Config martenConfig, int retryLeft = 1)
{
try
{
if (martenConfig.ShouldRecreateDatabase)
documentStore.Advanced.Clean.CompletelyRemoveAll();

using (NoSynchronizationContextScope.Enter())
{
documentStore.Schema.ApplyAllConfiguredChangesToDatabaseAsync().Wait();
}
}
catch
{
if (retryLeft == 0) throw;

Thread.Sleep(1000);
SetupSchema(documentStore, martenConfig, --retryLeft);
}
}

private static void SetStoreOptions(StoreOptions options, Config config,
Action<StoreOptions>? configureOptions = null)
private static StoreOptions SetStoreOptions(
IServiceProvider serviceProvider,
Config config,
Action<StoreOptions>? configureOptions = null
)
{
var options = new StoreOptions();
options.Connection(config.ConnectionString);
options.AutoCreateSchemaObjects = AutoCreate.CreateOrUpdate;

@@ -88,6 +68,14 @@ private static void SetStoreOptions(StoreOptions options, Config config,
nonPublicMembersStorage: NonPublicMembersStorage.All
);

options.Projections.Add(
new MartenSubscription(new[] { new MartenEventPublisher(serviceProvider) }),
ProjectionLifecycle.Async,
"MartenSubscription"
);

configureOptions?.Invoke(options);

return options;
}
}
2 changes: 1 addition & 1 deletion Core.Marten/Core.Marten.csproj
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Marten" Version="5.0.0-alpha.5" />
<PackageReference Include="Marten" Version="5.0.0-alpha.6" />
<PackageReference Include="MediatR" Version="10.0.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="6.0.0" />
7 changes: 1 addition & 6 deletions Core.Marten/Repository/MartenRepository.cs
Original file line number Diff line number Diff line change
@@ -15,15 +15,12 @@ public interface IMartenRepository<T> where T : class, IAggregate
public class MartenRepository<T>: IMartenRepository<T> where T : class, IAggregate
{
private readonly IDocumentSession documentSession;
private readonly IEventBus eventBus;

public MartenRepository(
IDocumentSession documentSession,
IEventBus eventBus
IDocumentSession documentSession
)
{
this.documentSession = documentSession;
this.eventBus = eventBus;
}

public Task<T?> Find(Guid id, CancellationToken cancellationToken) =>
@@ -39,7 +36,6 @@ public async Task<long> Add(T aggregate, CancellationToken cancellationToken)
);

await documentSession.SaveChangesAsync(cancellationToken);
await eventBus.Publish(events, cancellationToken);

return events.Length;
}
@@ -59,7 +55,6 @@ public async Task<long> Update(T aggregate, long? expectedVersion = null, Cancel
);

await documentSession.SaveChangesAsync(cancellationToken);
await eventBus.Publish(events, cancellationToken);

return nextVersion;
}
42 changes: 42 additions & 0 deletions Core.Marten/Subscriptions/MartenEventPublisher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using Core.Events;
using Marten;
using Marten.Events;
using Microsoft.Extensions.DependencyInjection;
using IEvent = Core.Events.IEvent;

namespace Core.Marten.Subscriptions;

public class MartenEventPublisher: IMartenEventsConsumer
{
private readonly IServiceProvider serviceProvider;

public MartenEventPublisher(
IServiceProvider serviceProvider
)
{
this.serviceProvider = serviceProvider;
}

public async Task ConsumeAsync(IDocumentOperations documentOperations, IReadOnlyList<StreamAction> streamActions,
CancellationToken ct)
{
foreach (var @event in streamActions.SelectMany(streamAction => streamAction.Events))
{
// TODO: align all handlers to use StreamEvent
// var streamEvent = new StreamEvent(
// @event.Data,
// new EventMetadata(
// (ulong)@event.Version,
// (ulong)@event.Sequence
// )
// );

using var scope = serviceProvider.CreateScope();
var eventBus = scope.ServiceProvider.GetRequiredService<IEventBus>();

if (@event.Data is not IEvent mappedEvent) continue;

await eventBus.Publish(mappedEvent, ct);
}
}
}
43 changes: 43 additions & 0 deletions Core.Marten/Subscriptions/MartenSubscription.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using Marten;
using Marten.Events;
using Marten.Events.Projections;

namespace Core.Marten.Subscriptions;

public class MartenSubscription: IProjection
{
private readonly IEnumerable<IMartenEventsConsumer> consumers;

public MartenSubscription(IEnumerable<IMartenEventsConsumer> consumers)
{
this.consumers = consumers;
}

public void Apply(
IDocumentOperations operations,
IReadOnlyList<StreamAction> streams
) =>
throw new NotImplementedException("Subscriptions should work only in the async scope");

public async Task ApplyAsync(
IDocumentOperations operations,
IReadOnlyList<StreamAction> streams,
CancellationToken ct
)
{
foreach (var consumer in consumers)
{
await consumer.ConsumeAsync(operations, streams, ct);
}
}
}


public interface IMartenEventsConsumer
{
Task ConsumeAsync(
IDocumentOperations documentOperations,
IReadOnlyList<StreamAction> streamActions,
CancellationToken ct
);
}
38 changes: 35 additions & 3 deletions Core.Testing/ApiFixture.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using System.Linq.Expressions;
using Core.Api.Testing;
using Core.Commands;
using Core.Events;
using Core.Events.External;
using Core.Requests;
using FluentAssertions;
using MediatR;
using Microsoft.Extensions.DependencyInjection;
using IEventBus = Core.Events.IEventBus;
@@ -16,7 +18,7 @@ public abstract class ApiWithEventsFixture<TStartup>: ApiFixture<TStartup> where
private readonly DummyExternalCommandBus externalCommandBus = new();

public override TestContext CreateTestContext() =>
new TestContext<TStartup>(GetConfiguration, (services) =>
new TestContext<TStartup>(GetConfiguration, services =>
{
SetupServices?.Invoke(services);
services.AddSingleton(eventsLog);
@@ -46,10 +48,40 @@ public async Task PublishInternalEvent(IEvent @event, CancellationToken ct = def
await eventBus.Publish(@event, ct);
}

public IReadOnlyCollection<TEvent> PublishedInternalEventsOfType<TEvent>()
public IReadOnlyCollection<TEvent> PublishedInternalEventsOfType<TEvent>() =>
eventsLog.PublishedEvents.OfType<TEvent>().ToList();

// TODO: Add Poly here
public async Task ShouldPublishInternalEventOfType<TEvent>(
Expression<Func<TEvent, bool>> predicate,
int maxNumberOfRetries = 5,
int retryIntervalInMs = 1000)
{
return eventsLog.PublishedEvents.OfType<TEvent>().ToList();
var retryCount = maxNumberOfRetries;
var finished = false;

do
{
try
{
PublishedInternalEventsOfType<TEvent>().Should()
.HaveCount(1)
.And.Contain(predicate);

finished = true;
}
catch
{
if (retryCount == 0)
throw;
}

await Task.Delay(retryIntervalInMs);
retryCount--;
} while (!finished);
}


}

public abstract class ApiWithEventsFixture: ApiFixture
2 changes: 1 addition & 1 deletion Core.Tests/Core.Tests.csproj
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@

<ItemGroup>
<PackageReference Include="FluentAssertions" Version="6.5.1" />
<PackageReference Include="Marten" Version="5.0.0-alpha.5" />
<PackageReference Include="Marten" Version="5.0.0-alpha.6" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.1.0" />
<PackageReference Include="Microsoft.DotNet.InternalAbstractions" Version="1.0.500-preview2-1-003177" />
<PackageReference Include="SharpTestsEx" Version="2.0.0" />
8 changes: 3 additions & 5 deletions Core.WebApi/Tracing/Correlation/CorrelationIdMiddleware.cs
Original file line number Diff line number Diff line change
@@ -15,16 +15,14 @@ public class CorrelationIdMiddleware

private readonly RequestDelegate next;
private readonly ILogger<CorrelationIdMiddleware> logger;
private readonly Func<CorrelationId> correlationIdFactory;

public CorrelationIdMiddleware(RequestDelegate next, ILogger<CorrelationIdMiddleware> logger, Func<CorrelationId> correlationIdFactory)
public CorrelationIdMiddleware(RequestDelegate next, ILogger<CorrelationIdMiddleware> logger)
{
this.next = next ?? throw new ArgumentNullException(nameof(next));
this.logger = logger;
this.correlationIdFactory = correlationIdFactory;
}

public async Task Invoke(HttpContext context)
public async Task Invoke(HttpContext context, Func<CorrelationId> correlationIdFactory)
{
// get correlation id from header or generate a new one
context.TraceIdentifier =
@@ -54,8 +52,8 @@ public static class CorrelationIdMiddlewareConfig
{
public static IServiceCollection AddCorrelationIdMiddleware(this IServiceCollection services)
{
services.TryAddScoped<ICorrelationIdFactory, GuidCorrelationIdFactory>();
services.TryAddScoped<Func<CorrelationId>>(sp => sp.GetRequiredService<ICorrelationIdFactory>().New);
services.TryAddScoped<ICorrelationIdFactory, GuidCorrelationIdFactory>();

return services;
}
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@

<ItemGroup>
<PackageReference Include="FluentAssertions" Version="6.5.1" />
<PackageReference Include="Marten" Version="5.0.0-alpha.5" />
<PackageReference Include="Marten" Version="5.0.0-alpha.6" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.1.0" />
<PackageReference Include="MediatR" Version="10.0.1" />
<PackageReference Include="Microsoft.DotNet.InternalAbstractions" Version="1.0.500-preview2-1-003177" />
Loading