diff --git a/Directory.Build.props b/Directory.Build.props index c500e5541a..a01f6a7793 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -1,7 +1,7 @@ - 7.40.0 + 7.40.1 12.0 Jeremy D. Miller;Babu Annamalai;Oskar Dudycz;Joona-Pekka Kokko https://martendb.io/logo.png diff --git a/src/EventSourcingTests/Bugs/Bug_3769_query_for_non_stale_projection_with_custom_projection.cs b/src/EventSourcingTests/Bugs/Bug_3769_query_for_non_stale_projection_with_custom_projection.cs new file mode 100644 index 0000000000..89e47cbbfe --- /dev/null +++ b/src/EventSourcingTests/Bugs/Bug_3769_query_for_non_stale_projection_with_custom_projection.cs @@ -0,0 +1,76 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using EventSourcingTests.Aggregation; +using EventSourcingTests.FetchForWriting; +using JasperFx.Core; +using Marten; +using Marten.Events; +using Marten.Events.Projections; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; + +namespace EventSourcingTests.Bugs; + +public class Bug_3769_query_for_non_stale_projection_with_custom_projection : BugIntegrationContext +{ + [Fact] + public async Task can_use_custom_aggregation_with_non_stale_query_data() + { + StoreOptions(opts => + { + opts.Projections.Add(new CustomAggregateProjection(), ProjectionLifecycle.Async, asyncConfiguration:x => x.StorageTypes.Add(typeof(SimpleAggregate))); + }); + + var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + + theSession.Events.StartStream(new AEvent(), new AEvent(), new BEvent(), new CEvent()); + theSession.Events.StartStream(new AEvent(), new AEvent(), new CEvent(), new CEvent()); + theSession.Events.StartStream(new AEvent(), new BEvent(), new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var results = await theSession.QueryForNonStaleData(5.Seconds()) + .Where(x => x.CCount == 2).ToListAsync(); + + results.Count.ShouldBe(2); + } +} + +public class CustomAggregateProjection: IProjection +{ + public void Apply(IDocumentOperations operations, IReadOnlyList streams) + { + throw new System.NotImplementedException(); + } + + public async Task ApplyAsync(IDocumentOperations operations, IReadOnlyList streams, CancellationToken cancellation) + { + foreach (var stream in streams) + { + var aggregate = await operations.LoadAsync(stream.Id); + aggregate ??= new SimpleAggregate { Id = stream.Id }; + + foreach (var e in stream.Events) + { + switch (e.Data) + { + case AEvent: + aggregate.ACount++; + break; + case BEvent: + aggregate.BCount++; + break; + case CEvent: + aggregate.CCount++; + break; + } + } + + operations.Store(aggregate); + } + } +} diff --git a/src/Marten/Events/Projections/ProjectionWrapper.cs b/src/Marten/Events/Projections/ProjectionWrapper.cs index 40b57948ef..462a53029c 100644 --- a/src/Marten/Events/Projections/ProjectionWrapper.cs +++ b/src/Marten/Events/Projections/ProjectionWrapper.cs @@ -26,8 +26,10 @@ public ProjectionWrapper(IProjection projection, ProjectionLifecycle lifecycle) public IEnumerable PublishedTypes() { - // Really indeterminate - yield break; + foreach (var storageType in Options.StorageTypes) + { + yield return storageType; + } } public ProjectionLifecycle Lifecycle { get; set; }