Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/DaemonTests/Internals/HighWaterAgentTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ public async Task will_not_go_in_loop_when_sequence_is_advanced_but_gaps_from_hi
using (var conn = theStore.Storage.Database.CreateConnection())
{
await conn.OpenAsync();
await conn.CreateCommand($"SELECT setval('daemon.mt_events_sequence', {NumberOfEvents + 5});").ExecuteNonQueryAsync();

// 32 is a magic number, that's the page size of the PostgreSQL sequence size
await conn.CreateCommand($"SELECT setval('daemon.mt_events_sequence', {NumberOfEvents + 37});").ExecuteNonQueryAsync();
await conn.CloseAsync();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using JasperFx.Core;
using JasperFx.Events;
using JasperFx.Events.Projections;
using Marten.Events.Aggregation;
using Marten.Testing.Harness;
using Npgsql;
using Shouldly;
using Xunit;

namespace EventSourcingTests.Bugs;

public class Bug_3874_able_to_read_archived_and_tombstone_from_older_names : BugIntegrationContext
{
[Fact]
public async Task write_then_read_archived()
{
var streamId = Guid.NewGuid();
theSession.Events.StartStream(streamId, new Archived("Old"));

await theSession.SaveChangesAsync();

theSession.QueueSqlCommand("update bugs.mt_events set mt_dotnet_type = 'Marten.Events.Archived, Marten'");
await theSession.SaveChangesAsync();

var events = await theSession.Events.FetchStreamAsync(streamId);

events.Single().Data.ShouldBeOfType<Archived>().Reason.ShouldBe("Old");
}

[Fact]
public async Task write_then_read_tombstone()
{
var streamId = Guid.NewGuid();
theSession.Events.StartStream(streamId, new Tombstone());

await theSession.SaveChangesAsync();

theSession.QueueSqlCommand("update bugs.mt_events set mt_dotnet_type = 'Marten.Events.Operations.Tombstone, Marten'");
await theSession.SaveChangesAsync();

var events = await theSession.Events.FetchStreamAsync(streamId);

events.Single().Data.ShouldBeOfType<Tombstone>();
}

[Fact]
public async Task reproduction()
{
StoreOptions(opts =>
{
opts.Projections.LiveStreamAggregation<ReproLiveAgg>();
opts.Projections.Add<ReproSimpleProjection>(ProjectionLifecycle.Inline);
opts.Projections.Add<ReproArchivedProjection>(ProjectionLifecycle.Async);
});

using var daemon = await theStore.BuildProjectionDaemonAsync();
await daemon.StartAllAsync();

Guid id = CombGuidIdGeneration.NewGuid();
theSession.Events.StartStream<ReproLiveAgg>(id, new ReproEvent.ReproCreated("some name"));
await theSession.SaveChangesAsync();

theSession.Events.Append(id, new ReproEvent.ReproUpdated("other name"));
await theSession.SaveChangesAsync();

theSession.Events.Append(id, new Archived("Archived"));
await theSession.SaveChangesAsync();

await daemon.WaitForNonStaleData(10.Seconds());

(await theSession.LoadAsync<ReproSimpleDetails>(id)).ShouldBeNull();
(await theSession.LoadAsync<ReproArchivedDetails>(id)).ShouldNotBeNull().Status.ShouldBe("Archived");

}
}

public record ReproSimpleDetails(Guid Id, string Name, string Status);
public record ReproArchivedDetails(Guid Id, string Name, string Status);

public record ReproEvent
{
public record ReproCreated(string Name) : ReproEvent;
public record ReproUpdated(string UpdatedName) : ReproEvent;
}

public record ReproLiveAgg(Guid Id, string Name)
{
public static ReproLiveAgg Create(IEvent<ReproEvent.ReproCreated> @event) => new(@event.Id, @event.Data.Name);
public static ReproLiveAgg Apply(ReproEvent.ReproUpdated @event, ReproLiveAgg current) => current with { Name = @event.UpdatedName };
}

public class ReproSimpleProjection : SingleStreamProjection<ReproSimpleDetails, Guid>
{
public static ReproSimpleDetails Create(IEvent<ReproEvent.ReproCreated> @event) => new(@event.Id, @event.Data.Name, "Created");
public static ReproSimpleDetails Apply(ReproEvent.ReproUpdated @event, ReproSimpleDetails current) => current with { Name = @event.UpdatedName, Status = "Updated" };
public static bool ShouldDelete(Archived _) => true;
}

public class ReproArchivedProjection : SingleStreamProjection<ReproArchivedDetails, Guid>
{
public ReproArchivedProjection()
{
IncludeArchivedEvents = true;
}

public static ReproArchivedDetails Create(IEvent<ReproEvent.ReproCreated> @event) => new(@event.Id, @event.Data.Name, "Created");
public static ReproArchivedDetails Apply(ReproEvent.ReproUpdated @event, ReproArchivedDetails current) => current with { Name = @event.UpdatedName, Status = "Updated" };
public static ReproArchivedDetails Apply(Archived _, ReproArchivedDetails current) => current with { Status = "Archived" };
}
8 changes: 8 additions & 0 deletions src/EventSourcingTests/EventGraphTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Marten;
using Marten.Events;
using Marten.Events.Aggregation;
using Marten.Events.Operations;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;
Expand All @@ -20,6 +21,13 @@ public EventGraphTests()
theGraph = new StoreOptions().EventGraph;
}

[Fact]
public void get_backwards_compatible_name_for_archived()
{
theGraph.TypeForDotNetName("Marten.Events.Archived, Marten").ShouldBe(typeof(Archived));
theGraph.TypeForDotNetName("Marten.Events.Operations.Tombstone, Marten").ShouldBe(typeof(Tombstone));
}

[Fact]
public void use_optimized_projection_rebuilds_is_false_by_default()
{
Expand Down
1 change: 1 addition & 0 deletions src/Marten/Events/Daemon/Internals/ResilientEventLoader.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using JasperFx;
using JasperFx.Events.Daemon;
using JasperFx.Events.Projections;
using Polly;
Expand Down
15 changes: 14 additions & 1 deletion src/Marten/Events/EventGraph.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ internal EventGraph(StoreOptions options)
_aggregateTypeByName = new Cache<string, Type>(findAggregateType);

AddEventType<Archived>();

}

internal NpgsqlDbType StreamIdDbType { get; private set; }
Expand Down Expand Up @@ -476,7 +477,19 @@ internal Type TypeForDotNetName(string assemblyQualifiedName)
{
if (!_nameToType.Value.TryFind(assemblyQualifiedName, out var value))
{
value = Type.GetType(assemblyQualifiedName);
if (assemblyQualifiedName.Contains(".Archived"))
{
value = typeof(Archived);
}
else if (assemblyQualifiedName.Contains(".Tombstone"))
{
value = typeof(Tombstone);
}
else
{
value = Type.GetType(assemblyQualifiedName);
}

if (value == null)
{
throw new UnknownEventTypeException($"Unable to load event type '{assemblyQualifiedName}'.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JasperFx;
using JasperFx.Descriptors;
using Marten.Events.Operations;
using Marten.Internal.OpenTelemetry;
Expand Down
5 changes: 3 additions & 2 deletions src/Marten/Marten.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@

<ItemGroup>
<PackageReference Include="FSharp.Core" Version="9.0.100" />
<PackageReference Include="JasperFx" Version="1.2.0" />
<PackageReference Include="JasperFx.Events" Version="1.2.1" />
<PackageReference Include="JasperFx" Version="1.2.2" />
<PackageReference Include="JasperFx.Events" Version="1.3.1" />
<PackageReference Include="JasperFx.RuntimeCompiler" Version="4.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<!-- This is forced by Npgsql peer dependency -->
<PackageReference Include="Npgsql.Json.NET" Version="9.0.2" />
<PackageReference Include="Polly.Core" Version="8.5.2" />
<PackageReference Include="Weasel.Postgresql" Version="8.1.1" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="9.0.0" />
</ItemGroup>

<Import Project="../../Analysis.Build.props" />
Expand Down
Loading