diff --git a/src/DaemonTests/Internals/HighWaterAgentTests.cs b/src/DaemonTests/Internals/HighWaterAgentTests.cs index 97612820c3..c15ec96dc3 100644 --- a/src/DaemonTests/Internals/HighWaterAgentTests.cs +++ b/src/DaemonTests/Internals/HighWaterAgentTests.cs @@ -96,7 +96,9 @@ public async Task will_not_go_in_loop_when_sequence_is_advanced_but_gaps_from_hi using (var conn = theStore.Storage.Database.CreateConnection()) { await conn.OpenAsync(); - await conn.CreateCommand($"SELECT setval('daemon.mt_events_sequence', {NumberOfEvents + 5});").ExecuteNonQueryAsync(); + + // 32 is a magic number, that's the page size of the PostgreSQL sequence size + await conn.CreateCommand($"SELECT setval('daemon.mt_events_sequence', {NumberOfEvents + 37});").ExecuteNonQueryAsync(); await conn.CloseAsync(); } diff --git a/src/EventSourcingTests/Bugs/Bug_3874_able_to_read_archived_and_tombstone_from_older_names.cs b/src/EventSourcingTests/Bugs/Bug_3874_able_to_read_archived_and_tombstone_from_older_names.cs new file mode 100644 index 0000000000..fab6e36507 --- /dev/null +++ b/src/EventSourcingTests/Bugs/Bug_3874_able_to_read_archived_and_tombstone_from_older_names.cs @@ -0,0 +1,112 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using JasperFx.Core; +using JasperFx.Events; +using JasperFx.Events.Projections; +using Marten.Events.Aggregation; +using Marten.Testing.Harness; +using Npgsql; +using Shouldly; +using Xunit; + +namespace EventSourcingTests.Bugs; + +public class Bug_3874_able_to_read_archived_and_tombstone_from_older_names : BugIntegrationContext +{ + [Fact] + public async Task write_then_read_archived() + { + var streamId = Guid.NewGuid(); + theSession.Events.StartStream(streamId, new Archived("Old")); + + await theSession.SaveChangesAsync(); + + theSession.QueueSqlCommand("update bugs.mt_events set mt_dotnet_type = 'Marten.Events.Archived, Marten'"); + await theSession.SaveChangesAsync(); + + var events = await theSession.Events.FetchStreamAsync(streamId); + + events.Single().Data.ShouldBeOfType().Reason.ShouldBe("Old"); + } + + [Fact] + public async Task write_then_read_tombstone() + { + var streamId = Guid.NewGuid(); + theSession.Events.StartStream(streamId, new Tombstone()); + + await theSession.SaveChangesAsync(); + + theSession.QueueSqlCommand("update bugs.mt_events set mt_dotnet_type = 'Marten.Events.Operations.Tombstone, Marten'"); + await theSession.SaveChangesAsync(); + + var events = await theSession.Events.FetchStreamAsync(streamId); + + events.Single().Data.ShouldBeOfType(); + } + + [Fact] + public async Task reproduction() + { + StoreOptions(opts => + { + opts.Projections.LiveStreamAggregation(); + opts.Projections.Add(ProjectionLifecycle.Inline); + opts.Projections.Add(ProjectionLifecycle.Async); + }); + + using var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + + Guid id = CombGuidIdGeneration.NewGuid(); + theSession.Events.StartStream(id, new ReproEvent.ReproCreated("some name")); + await theSession.SaveChangesAsync(); + + theSession.Events.Append(id, new ReproEvent.ReproUpdated("other name")); + await theSession.SaveChangesAsync(); + + theSession.Events.Append(id, new Archived("Archived")); + await theSession.SaveChangesAsync(); + + await daemon.WaitForNonStaleData(10.Seconds()); + + (await theSession.LoadAsync(id)).ShouldBeNull(); + (await theSession.LoadAsync(id)).ShouldNotBeNull().Status.ShouldBe("Archived"); + + } +} + +public record ReproSimpleDetails(Guid Id, string Name, string Status); +public record ReproArchivedDetails(Guid Id, string Name, string Status); + +public record ReproEvent +{ + public record ReproCreated(string Name) : ReproEvent; + public record ReproUpdated(string UpdatedName) : ReproEvent; +} + +public record ReproLiveAgg(Guid Id, string Name) +{ + public static ReproLiveAgg Create(IEvent @event) => new(@event.Id, @event.Data.Name); + public static ReproLiveAgg Apply(ReproEvent.ReproUpdated @event, ReproLiveAgg current) => current with { Name = @event.UpdatedName }; +} + +public class ReproSimpleProjection : SingleStreamProjection +{ + public static ReproSimpleDetails Create(IEvent @event) => new(@event.Id, @event.Data.Name, "Created"); + public static ReproSimpleDetails Apply(ReproEvent.ReproUpdated @event, ReproSimpleDetails current) => current with { Name = @event.UpdatedName, Status = "Updated" }; + public static bool ShouldDelete(Archived _) => true; +} + +public class ReproArchivedProjection : SingleStreamProjection +{ + public ReproArchivedProjection() + { + IncludeArchivedEvents = true; + } + + public static ReproArchivedDetails Create(IEvent @event) => new(@event.Id, @event.Data.Name, "Created"); + public static ReproArchivedDetails Apply(ReproEvent.ReproUpdated @event, ReproArchivedDetails current) => current with { Name = @event.UpdatedName, Status = "Updated" }; + public static ReproArchivedDetails Apply(Archived _, ReproArchivedDetails current) => current with { Status = "Archived" }; +} diff --git a/src/EventSourcingTests/EventGraphTests.cs b/src/EventSourcingTests/EventGraphTests.cs index 1ba5803e40..cd844db78d 100644 --- a/src/EventSourcingTests/EventGraphTests.cs +++ b/src/EventSourcingTests/EventGraphTests.cs @@ -5,6 +5,7 @@ using Marten; using Marten.Events; using Marten.Events.Aggregation; +using Marten.Events.Operations; using Marten.Testing.Harness; using Shouldly; using Xunit; @@ -20,6 +21,13 @@ public EventGraphTests() theGraph = new StoreOptions().EventGraph; } + [Fact] + public void get_backwards_compatible_name_for_archived() + { + theGraph.TypeForDotNetName("Marten.Events.Archived, Marten").ShouldBe(typeof(Archived)); + theGraph.TypeForDotNetName("Marten.Events.Operations.Tombstone, Marten").ShouldBe(typeof(Tombstone)); + } + [Fact] public void use_optimized_projection_rebuilds_is_false_by_default() { diff --git a/src/Marten/Events/Daemon/Internals/ResilientEventLoader.cs b/src/Marten/Events/Daemon/Internals/ResilientEventLoader.cs index 5225b93203..3a739b7c7f 100644 --- a/src/Marten/Events/Daemon/Internals/ResilientEventLoader.cs +++ b/src/Marten/Events/Daemon/Internals/ResilientEventLoader.cs @@ -1,6 +1,7 @@ using System; using System.Threading; using System.Threading.Tasks; +using JasperFx; using JasperFx.Events.Daemon; using JasperFx.Events.Projections; using Polly; diff --git a/src/Marten/Events/EventGraph.cs b/src/Marten/Events/EventGraph.cs index 6987c47baa..cacf362161 100644 --- a/src/Marten/Events/EventGraph.cs +++ b/src/Marten/Events/EventGraph.cs @@ -75,6 +75,7 @@ internal EventGraph(StoreOptions options) _aggregateTypeByName = new Cache(findAggregateType); AddEventType(); + } internal NpgsqlDbType StreamIdDbType { get; private set; } @@ -476,7 +477,19 @@ internal Type TypeForDotNetName(string assemblyQualifiedName) { if (!_nameToType.Value.TryFind(assemblyQualifiedName, out var value)) { - value = Type.GetType(assemblyQualifiedName); + if (assemblyQualifiedName.Contains(".Archived")) + { + value = typeof(Archived); + } + else if (assemblyQualifiedName.Contains(".Tombstone")) + { + value = typeof(Tombstone); + } + else + { + value = Type.GetType(assemblyQualifiedName); + } + if (value == null) { throw new UnknownEventTypeException($"Unable to load event type '{assemblyQualifiedName}'."); diff --git a/src/Marten/Internal/Sessions/EventTracingConnectionLifetime.cs b/src/Marten/Internal/Sessions/EventTracingConnectionLifetime.cs index 07a61e30f8..b9abeb7942 100644 --- a/src/Marten/Internal/Sessions/EventTracingConnectionLifetime.cs +++ b/src/Marten/Internal/Sessions/EventTracingConnectionLifetime.cs @@ -6,6 +6,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; +using JasperFx; using JasperFx.Descriptors; using Marten.Events.Operations; using Marten.Internal.OpenTelemetry; diff --git a/src/Marten/Marten.csproj b/src/Marten/Marten.csproj index 9cb6479086..aefcce75e1 100644 --- a/src/Marten/Marten.csproj +++ b/src/Marten/Marten.csproj @@ -33,14 +33,15 @@ - - + + +