diff --git a/src/EventSourcingTests/replacing_events.cs b/src/EventSourcingTests/replacing_events.cs new file mode 100644 index 0000000000..1013fc2fab --- /dev/null +++ b/src/EventSourcingTests/replacing_events.cs @@ -0,0 +1,90 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using JasperFx.Events; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; + +namespace EventSourcingTests; + +public class replacing_events : OneOffConfigurationsContext +{ + [Fact] + public async Task simple_replacement_default_settings() + { + var streamId = Guid.NewGuid(); + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + theSession.Events.StartStream(streamId, joined, departed); + await theSession.SaveChangesAsync(); + + var events = await theSession.Events.FetchStreamAsync(streamId); + + var sequence = events.Last().Sequence; + + var joined2 = new MembersJoined { Members = ["Moiraine", "Lan"] }; + theSession.Events.CompletelyReplaceEvent(sequence, joined2); + await theSession.SaveChangesAsync(); + + var events2 = await theSession.Events.FetchStreamAsync(streamId); + var final = events2.Last().ShouldBeOfType>(); + + // These should not change + final.Version.ShouldBe(events.Last().Version); + final.Sequence.ShouldBe(events.Last().Sequence); + + // Id gets changed + final.Id.ShouldNotBe(events.Last().Id); + + // These need to get changed + final.Data.Members.ShouldBe(["Moiraine", "Lan"]); + final.DotNetTypeName.ShouldBe("EventSourcingTests.MembersJoined, EventSourcingTests"); + final.EventTypeName.ShouldBe("members_joined"); + + } + + [Fact] + public async Task simple_replacement_all_metadata_turned_on() + { + StoreOptions(opts => + { + opts.Events.MetadataConfig.HeadersEnabled = true; + opts.Events.MetadataConfig.CausationIdEnabled = true; + opts.Events.MetadataConfig.CorrelationIdEnabled = true; + }); + + var streamId = Guid.NewGuid(); + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + theSession.Events.StartStream(streamId, joined, departed); + await theSession.SaveChangesAsync(); + + var events = await theSession.Events.FetchStreamAsync(streamId); + + var sequence = events.Last().Sequence; + + var joined2 = new MembersJoined { Members = ["Moiraine", "Lan"] }; + theSession.Events.CompletelyReplaceEvent(sequence, joined2); + await theSession.SaveChangesAsync(); + + var events2 = await theSession.Events.FetchStreamAsync(streamId); + var final = events2.Last().ShouldBeOfType>(); + + // These should not change + final.Version.ShouldBe(events.Last().Version); + final.Sequence.ShouldBe(events.Last().Sequence); + + // Id gets changed + final.Id.ShouldNotBe(events.Last().Id); + final.Timestamp.ShouldNotBe(events.Last().Timestamp); + + // These need to get changed + final.Data.Members.ShouldBe(["Moiraine", "Lan"]); + final.DotNetTypeName.ShouldBe("EventSourcingTests.MembersJoined, EventSourcingTests"); + final.EventTypeName.ShouldBe("members_joined"); + + } +} diff --git a/src/Marten/Events/EventStore.cs b/src/Marten/Events/EventStore.cs index b9ab8c77d9..35781d344a 100644 --- a/src/Marten/Events/EventStore.cs +++ b/src/Marten/Events/EventStore.cs @@ -25,4 +25,12 @@ public void OverwriteEvent(IEvent e) var op = new OverwriteEventOperation(_store.Events, e); _session.QueueOperation(op); } + + public Guid CompletelyReplaceEvent(long sequence, T eventBody) where T : class + { + var op = new ReplaceEventOperation(_store.Events, eventBody, sequence); + _session.QueueOperation(op); + + return op.Id; + } } diff --git a/src/Marten/Events/IEventStoreOperations.cs b/src/Marten/Events/IEventStoreOperations.cs index 072819e7f1..65bb13cd69 100644 --- a/src/Marten/Events/IEventStoreOperations.cs +++ b/src/Marten/Events/IEventStoreOperations.cs @@ -345,7 +345,7 @@ Task WriteExclusivelyToAggregate(string id, Func, Task> writi /// /// Advanced usage! If you have some need to overwrite the data or headers of an existing event, - /// this registers an operation with the current session to do so + /// this registers an operation with the current session to do so. /// /// void OverwriteEvent(IEvent e); @@ -369,4 +369,15 @@ Task WriteExclusivelyToAggregate(string id, Func, Task> writi /// /// ValueTask FetchLatest(string id, CancellationToken cancellation = default) where T : class; + + /// + /// Completely replace event data at a specified spot in the event store without changing + /// any stream identity or version information. Replaces all header information with empty. + /// This was originally meant for stream compacting + /// + /// + /// + /// + /// + Guid CompletelyReplaceEvent(long sequence, T eventBody) where T : class; } diff --git a/src/Marten/Events/Protected/ReplaceEventOperation.cs b/src/Marten/Events/Protected/ReplaceEventOperation.cs new file mode 100644 index 0000000000..d9b0a3a02b --- /dev/null +++ b/src/Marten/Events/Protected/ReplaceEventOperation.cs @@ -0,0 +1,79 @@ +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; +using JasperFx.Core; +using JasperFx.Events; +using Marten.Internal; +using Marten.Internal.Operations; +using NpgsqlTypes; +using Weasel.Postgresql; + +namespace Marten.Events.Protected; + +internal class ReplaceEventOperation : IStorageOperation where T : class +{ + private readonly EventGraph _graph; + private readonly T _eventBody; + private readonly long _sequence; + private readonly string _eventTypeName; + private readonly string _dotNetType; + + public ReplaceEventOperation(EventGraph graph, T eventBody, long sequence) + { + _graph = graph; + _eventBody = eventBody; + _sequence = sequence; + var mapping = graph.EventMappingFor(); + _eventTypeName = mapping.EventTypeName; + _dotNetType = mapping.DotNetTypeName; + + Id = CombGuidIdGeneration.NewGuid(); + } + + public Guid Id { get; } + + public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + builder.Append($"update {_graph.DatabaseSchemaName}.mt_events set data = "); + builder.AppendParameter(session.Serializer.ToJson(_eventBody), NpgsqlDbType.Jsonb); + builder.Append(", timestamp = now() at time zone 'utc', type = "); + builder.AppendParameter(_eventTypeName, NpgsqlDbType.Varchar); + builder.Append(", mt_dotnet_type = "); + builder.AppendParameter(_dotNetType, NpgsqlDbType.Varchar); + builder.Append(", id = "); + builder.AppendParameter(Id); + + if (_graph.MetadataConfig.HeadersEnabled) + { + builder.Append(", headers = '{}'"); + } + + if (_graph.MetadataConfig.CausationIdEnabled) + { + builder.Append(", causation_id = NULL"); + } + + if (_graph.MetadataConfig.CorrelationIdEnabled) + { + builder.Append(", correlation_id = NULL"); + } + + builder.Append(" where seq_id = "); + builder.AppendParameter(_sequence); + } + + public Type DocumentType => typeof(IEvent); + public void Postprocess(DbDataReader reader, IList exceptions) + { + // Nothing + } + + public Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) + { + return Task.CompletedTask; + } + + public OperationRole Role() => OperationRole.Events; +}