Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build/build.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="JasperFx" Version="1.0.0" />
<PackageReference Include="JasperFx" Version="1.6.0-alpha-4" />
<PackageReference Include="Npgsql" Version="8.0.3" />
<PackageReference Include="Nuke.Common" Version="8.0.0" />
</ItemGroup>
Expand Down
8 changes: 4 additions & 4 deletions src/DaemonTests/Resiliency/when_skipping_events_in_daemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,14 @@ public async Task see_the_dead_letter_events()
theSession.Logger = new TestOutputMartenLogger(_output);
var skipped = await theSession.Query<DeadLetterEvent>().ToListAsync();

skipped.Where(x => x.ProjectionName == "CollateNames" && x.ShardName == "All")
.Select(x => x.EventSequence).OrderBy(x => x)
.ShouldHaveTheSameElementsAs(4, 5, 6, 7);

skipped.Where(x => x.ProjectionName == "NamedDocuments" && x.ShardName == "All")
.Select(x => x.EventSequence).OrderBy(x => x)
.ShouldHaveTheSameElementsAs(4, 5, 6, 7, 11, 14);

skipped.Where(x => x.ProjectionName == "CollateNames" && x.ShardName == "All")
.Select(x => x.EventSequence).OrderBy(x => x)
.ShouldHaveTheSameElementsAs(4, 5, 6, 7);

}
}

Expand Down
78 changes: 78 additions & 0 deletions src/DaemonTests/wait_for_non_stale_data_error_cases.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using System;
using System.Threading.Tasks;
using EventSourcingTests.Aggregation;
using JasperFx.Core;
using JasperFx.Events;
using JasperFx.Events.Daemon;
using JasperFx.Events.Projections;
using Marten.Events.Aggregation;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;

namespace DaemonTests;

public class wait_for_non_stale_data_error_cases : OneOffConfigurationsContext
{
[Fact]
public async Task get_a_good_timeout_exception()
{
StoreOptions(opts =>
{
opts.Projections.Errors.SkipApplyErrors = false;
opts.Projections.Add<SometimesFailingLetterCountsProjection>(ProjectionLifecycle.Async);
});

theSession.Events.StartStream<LetterCounts>(new AEvent(), new BEvent(), new CEvent(), new DEvent());
theSession.Events.StartStream<LetterCounts>(new AEvent(), new BEvent(), new CEvent(), new DEvent());
theSession.Events.StartStream<LetterCounts>(new AEvent(), new BEvent(), new CEvent(), new DEvent());
theSession.Events.StartStream<LetterCounts>(new AEvent(), new ThrowError(false), new CEvent(), new DEvent());
theSession.Events.StartStream<LetterCounts>(new AEvent(), new BEvent(), new CEvent(), new DEvent());
theSession.Events.StartStream<LetterCounts>(new AEvent(), new BEvent(), new ThrowError(true), new DEvent());
await theSession.SaveChangesAsync();

using var daemon = await theStore.BuildProjectionDaemonAsync();

await daemon.StartAllAsync();

var aggregated = await Should.ThrowAsync<AggregateException>(async () =>
{
await daemon.WaitForNonStaleData(5.Seconds());
});

aggregated.InnerExceptions[0].ShouldBeOfType<TimeoutException>();
aggregated.InnerExceptions[1].ShouldBeOfType<ApplyEventException>();


}
}

public record ThrowError(bool ShouldThrow);

public class SometimesFailingLetterCountsProjection: SingleStreamProjection<LetterCounts, Guid>
{
public override LetterCounts Evolve(LetterCounts snapshot, Guid id, IEvent e)
{
snapshot ??= new LetterCounts { Id = id };
switch (e.Data)
{
case AEvent _:
snapshot.ACount++;
break;
case BEvent _:
snapshot.BCount++;
break;
case CEvent _:
snapshot.CCount++;
break;
case DEvent _:
snapshot.DCount++;
break;
case ThrowError x:
if (x.ShouldThrow) throw new Exception("You stink!");
break;
}

return snapshot;
}
}
51 changes: 51 additions & 0 deletions src/EventSourcingTests/Bugs/Bug_3942_string_only_record.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using JasperFx.Core;
using JasperFx.Events;
using JasperFx.Events.Projections;
using Marten.Events.Aggregation;
using Marten.Testing.Harness;
using Xunit;
using System.Threading.Tasks;

namespace EventSourcingTests.Bugs;

public class single_property_async : BugIntegrationContext
{
public single_property_async()
{
StoreOptions(o =>
{
o.Events.StreamIdentity = StreamIdentity.AsString;
o.Projections.Add<SingleProjection>(ProjectionLifecycle.Async);
}, true);
}

[Fact]
public async Task start_and_append_events()
{
await using var session = theStore.LightweightSession();

var stream = session.Events.StartStream<SingleProp>("key", new SinglePropCreate());

await session.SaveChangesAsync();

var daemon = await theStore.BuildProjectionDaemonAsync();
await daemon.StartAllAsync();
await daemon.WaitForNonStaleData(20.Seconds());

var aggregate = await theSession.LoadAsync<SingleProp>(stream.Key!);

Assert.NotNull(aggregate);
}
}

public class SingleProjection: SingleStreamProjection<SingleProp, string>
{
public SingleProp Create(IEvent<SinglePropCreate> @event)
{
return new SingleProp(@event.StreamKey);
}
}

public record SingleProp(string Id);

public record SinglePropCreate;
42 changes: 42 additions & 0 deletions src/EventSourcingTests/determining_the_event_store_identity.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System.Linq;
using System.Threading.Tasks;
using JasperFx.Core.Reflection;
using JasperFx.Events;
using Marten;
using Marten.Testing.Harness;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Xunit;

namespace EventSourcingTests;

public class determining_the_event_store_identity
{
[Fact]
public async Task use_correct_identities()
{
using var host = await Host.CreateDefaultBuilder()
.ConfigureServices(services =>
{
services.AddMarten(m =>
{
m.Connection(ConnectionSource.ConnectionString);
m.DatabaseSchemaName = "es_identity";
});

services.AddMartenStore<IThingStore>(m =>
{
m.Connection(ConnectionSource.ConnectionString);
m.DatabaseSchemaName = "things";
});

}).StartAsync();

var stores = host.Services.GetServices<IEventStore>().ToArray();
stores.Single(x => x.GetType() == typeof(DocumentStore)).As<IEventStore>().Identity.ShouldBe(new EventStoreIdentity("main", "marten"));
stores.OfType<IThingStore>().Single().As<IEventStore>().Identity.ShouldBe(new EventStoreIdentity("ithingstore", "marten"));
}
}

public interface IThingStore: IDocumentStore;
2 changes: 1 addition & 1 deletion src/LinqTestsTypes/LinqTestsTypes.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@

<ItemGroup>
<PackageReference Include="FSharp.Core" Version="9.0.100" />
<PackageReference Include="JasperFx" Version="1.0.0" />
<PackageReference Include="JasperFx" Version="1.6.0-alpha-4" />
</ItemGroup>
</Project>
9 changes: 5 additions & 4 deletions src/Marten/DocumentStore.EventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ namespace Marten;

public partial class DocumentStore: IEventStore<IDocumentOperations, IQuerySession>, ISubscriptionRunner<ISubscription>
{

static DocumentStore()
{
ProjectionExceptions.RegisterTransientExceptionType<NpgsqlException>();
Expand All @@ -58,6 +57,8 @@ bool IEventStore.HasMultipleTenants
}
}

public EventStoreIdentity Identity { get; }

IEventRegistry IEventStore<IDocumentOperations, IQuerySession>.Registry => Options.EventGraph;

public Type IdentityTypeForProjectedType(Type aggregateType)
Expand Down Expand Up @@ -209,11 +210,11 @@ public async ValueTask<IProjectionBatch<IDocumentOperations, IQuerySession>> Sta
var projectionBatch = new ProjectionBatch(session, batch, mode);
if (range.SequenceFloor == 0)
{
batch.Queue.Post(new InsertProjectionProgress(session.Options.EventGraph, range));
await batch.Queue.PostAsync(new InsertProjectionProgress(session.Options.EventGraph, range)).ConfigureAwait(false);
}
else
{
batch.Queue.Post(new UpdateProjectionProgress(session.Options.EventGraph, range));
await batch.Queue.PostAsync(new UpdateProjectionProgress(session.Options.EventGraph, range)).ConfigureAwait(false);
}

return projectionBatch;
Expand Down Expand Up @@ -288,7 +289,7 @@ async Task ISubscriptionRunner<ISubscription>.ExecuteAsync(ISubscription subscri
};;

// Mark the progression
batch.Queue.Post(range.BuildProgressionOperation(Events));
await batch.Queue.PostAsync(range.BuildProgressionOperation(Events)).ConfigureAwait(false);

await using var session = new ProjectionDocumentSession(this, batch,
new SessionOptions
Expand Down
13 changes: 13 additions & 0 deletions src/Marten/DocumentStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public DocumentStore(StoreOptions options)

decorator.ReadEventTypes(options.EventGraph);
}

Identity = new(Options.StoreName.ToLowerInvariant(), "marten");
}

public ITenancy Tenancy => Options.Tenancy;
Expand Down Expand Up @@ -391,6 +393,17 @@ public async ValueTask<IProjectionDaemon> BuildProjectionDaemonAsync(
return database.As<MartenDatabase>().StartProjectionDaemon(this, logger);
}

public async ValueTask<IProjectionDaemon> BuildProjectionDaemonAsync(DatabaseId id)
{
var logger = Options.LogFactory?.CreateLogger<ProjectionDaemon>() ?? Options.DotNetLogger ?? NullLogger.Instance;

var database = await Tenancy.FindDatabase(id).ConfigureAwait(false);

await database.EnsureStorageExistsAsync(typeof(IEvent)).ConfigureAwait(false);

return database.As<MartenDatabase>().StartProjectionDaemon(this, logger);
}

[Obsolete(
"""
Opening a session without explicitly providing desired type may be dropped in next Marten version.
Expand Down
69 changes: 54 additions & 15 deletions src/Marten/Events/AsyncProjectionTestingExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.CommandLine.TextualDisplays;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using JasperFx.Events.Daemon;
using JasperFx.Events.Projections;
using Marten.Events.Daemon;
using Marten.Services;
using Marten.Storage;
using Microsoft.Extensions.Hosting;

Expand Down Expand Up @@ -85,6 +89,12 @@ public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase
// Number of active projection shards, plus the high water mark
var projectionsCount = database.As<MartenDatabase>().Options.Projections.AllShards().Count + 1;

// Just get out of there if there are no projections
if (projectionsCount == 1)
{
return;
}

using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(timeout);

Expand All @@ -105,33 +115,62 @@ public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase
throw new TimeoutException("No event activity was detected within the timeout span");
}

IReadOnlyList<ShardState> projections;
do
IReadOnlyList<ShardState> projections = [];
try
{
projections = await database.AllProjectionProgress(cancellationSource.Token).ConfigureAwait(false);
if ((projections.Count >= projectionsCount &&
projections.All(x => x.Sequence >= initial.EventSequenceNumber))
|| cancellationSource.IsCancellationRequested)
do
{
break;
}

await Task.Delay(250.Milliseconds(), cancellationSource.Token).ConfigureAwait(false);
} while (true);
projections = await database.AllProjectionProgress(cancellationSource.Token).ConfigureAwait(false);
if ((projections.Count >= projectionsCount &&
projections.All(x => x.Sequence >= initial.EventSequenceNumber))
|| cancellationSource.IsCancellationRequested)
{
break;
}

await Task.Delay(250.Milliseconds(), cancellationSource.Token).ConfigureAwait(false);
} while (true);
}
catch (TaskCanceledException)
{
// We just didn't finish
}

if (projections.Count < projectionsCount)
{
throw new TimeoutException(
$"The projection shards (in total of {projectionsCount}) haven't been completely started within the timeout span");
var writer = new StringWriter();
await writer.WriteLineAsync($"The projection shards (in total of {projectionsCount}) haven't been completely started within the timeout span").ConfigureAwait(false);
await writer.WriteLineAsync().ConfigureAwait(false);
await writer.WriteLineAsync(writeStatusMessage(projections)).ConfigureAwait(false);
await writer.WriteLineAsync().ConfigureAwait(false);

throw new TimeoutException(writer.ToString());
}

if (cancellationSource.IsCancellationRequested)
{
throw new TimeoutException(
$"The projections timed out before reaching the initial sequence of {initial.EventSequenceNumber}");
var writer = new StringWriter();
await writer.WriteLineAsync($"The projections timed out before reaching the initial sequence of {initial.EventSequenceNumber}").ConfigureAwait(false);
await writer.WriteLineAsync().ConfigureAwait(false);
await writer.WriteLineAsync(writeStatusMessage(projections)).ConfigureAwait(false);
await writer.WriteLineAsync().ConfigureAwait(false);

throw new TimeoutException(writer.ToString());
}
}

private static string writeStatusMessage(IReadOnlyList<ShardState> projections)
{

var grid = new Grid<ShardState>();
grid.AddColumn("Shard Name", x => x.ShardName);
grid.AddColumn("Sequence", x => x.Sequence.ToString(), true);

return grid.Write(projections);


}

private static bool isComplete(this Dictionary<string, long> tracking, long highWaterMark)
{
return tracking.Values.All(x => x >= highWaterMark);
Expand Down
Loading
Loading