Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions src/EventSourcingTests/replacing_events.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using JasperFx.Events;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;

namespace EventSourcingTests;

public class replacing_events : OneOffConfigurationsContext
{
[Fact]
public async Task simple_replacement_default_settings()
{
var streamId = Guid.NewGuid();
var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } };
var departed = new MembersDeparted { Members = new[] { "Thom" } };

theSession.Events.StartStream<Quest>(streamId, joined, departed);
await theSession.SaveChangesAsync();

var events = await theSession.Events.FetchStreamAsync(streamId);

var sequence = events.Last().Sequence;

var joined2 = new MembersJoined { Members = ["Moiraine", "Lan"] };
theSession.Events.CompletelyReplaceEvent(sequence, joined2);
await theSession.SaveChangesAsync();

var events2 = await theSession.Events.FetchStreamAsync(streamId);
var final = events2.Last().ShouldBeOfType<Event<MembersJoined>>();

// These should not change
final.Version.ShouldBe(events.Last().Version);
final.Sequence.ShouldBe(events.Last().Sequence);

// Id gets changed
final.Id.ShouldNotBe(events.Last().Id);

// These need to get changed
final.Data.Members.ShouldBe(["Moiraine", "Lan"]);
final.DotNetTypeName.ShouldBe("EventSourcingTests.MembersJoined, EventSourcingTests");
final.EventTypeName.ShouldBe("members_joined");

}

[Fact]
public async Task simple_replacement_all_metadata_turned_on()
{
StoreOptions(opts =>
{
opts.Events.MetadataConfig.HeadersEnabled = true;
opts.Events.MetadataConfig.CausationIdEnabled = true;
opts.Events.MetadataConfig.CorrelationIdEnabled = true;
});

var streamId = Guid.NewGuid();
var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } };
var departed = new MembersDeparted { Members = new[] { "Thom" } };

theSession.Events.StartStream<Quest>(streamId, joined, departed);
await theSession.SaveChangesAsync();

var events = await theSession.Events.FetchStreamAsync(streamId);

var sequence = events.Last().Sequence;

var joined2 = new MembersJoined { Members = ["Moiraine", "Lan"] };
theSession.Events.CompletelyReplaceEvent(sequence, joined2);
await theSession.SaveChangesAsync();

var events2 = await theSession.Events.FetchStreamAsync(streamId);
var final = events2.Last().ShouldBeOfType<Event<MembersJoined>>();

// These should not change
final.Version.ShouldBe(events.Last().Version);
final.Sequence.ShouldBe(events.Last().Sequence);

// Id gets changed
final.Id.ShouldNotBe(events.Last().Id);
final.Timestamp.ShouldNotBe(events.Last().Timestamp);

// These need to get changed
final.Data.Members.ShouldBe(["Moiraine", "Lan"]);
final.DotNetTypeName.ShouldBe("EventSourcingTests.MembersJoined, EventSourcingTests");
final.EventTypeName.ShouldBe("members_joined");

}
}
8 changes: 8 additions & 0 deletions src/Marten/Events/EventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,12 @@ public void OverwriteEvent(IEvent e)
var op = new OverwriteEventOperation(_store.Events, e);
_session.QueueOperation(op);
}

public Guid CompletelyReplaceEvent<T>(long sequence, T eventBody) where T : class
{
var op = new ReplaceEventOperation<T>(_store.Events, eventBody, sequence);
_session.QueueOperation(op);

return op.Id;
}
}
13 changes: 12 additions & 1 deletion src/Marten/Events/IEventStoreOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ Task WriteExclusivelyToAggregate<T>(string id, Func<IEventStream<T>, Task> writi

/// <summary>
/// Advanced usage! If you have some need to overwrite the data or headers of an existing event,
/// this registers an operation with the current session to do so
/// this registers an operation with the current session to do so.
/// </summary>
/// <param name="e"></param>
void OverwriteEvent(IEvent e);
Expand All @@ -369,4 +369,15 @@ Task WriteExclusivelyToAggregate<T>(string id, Func<IEventStream<T>, Task> writi
/// <typeparam name="T"></typeparam>
/// <returns></returns>
ValueTask<T?> FetchLatest<T>(string id, CancellationToken cancellation = default) where T : class;

/// <summary>
/// Completely replace event data at a specified spot in the event store without changing
/// any stream identity or version information. Replaces all header information with empty.
/// This was originally meant for stream compacting
/// </summary>
/// <param name="sequence"></param>
/// <param name="eventBody"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
Guid CompletelyReplaceEvent<T>(long sequence, T eventBody) where T : class;
}
79 changes: 79 additions & 0 deletions src/Marten/Events/Protected/ReplaceEventOperation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Core;
using JasperFx.Events;
using Marten.Internal;
using Marten.Internal.Operations;
using NpgsqlTypes;
using Weasel.Postgresql;

namespace Marten.Events.Protected;

internal class ReplaceEventOperation<T> : IStorageOperation where T : class
{
private readonly EventGraph _graph;
private readonly T _eventBody;
private readonly long _sequence;
private readonly string _eventTypeName;
private readonly string _dotNetType;

public ReplaceEventOperation(EventGraph graph, T eventBody, long sequence)
{
_graph = graph;
_eventBody = eventBody;
_sequence = sequence;
var mapping = graph.EventMappingFor<T>();
_eventTypeName = mapping.EventTypeName;
_dotNetType = mapping.DotNetTypeName;

Id = CombGuidIdGeneration.NewGuid();
}

public Guid Id { get; }

public void ConfigureCommand(ICommandBuilder builder, IMartenSession session)
{
builder.Append($"update {_graph.DatabaseSchemaName}.mt_events set data = ");
builder.AppendParameter(session.Serializer.ToJson(_eventBody), NpgsqlDbType.Jsonb);
builder.Append(", timestamp = now() at time zone 'utc', type = ");
builder.AppendParameter(_eventTypeName, NpgsqlDbType.Varchar);
builder.Append(", mt_dotnet_type = ");
builder.AppendParameter(_dotNetType, NpgsqlDbType.Varchar);
builder.Append(", id = ");
builder.AppendParameter(Id);

if (_graph.MetadataConfig.HeadersEnabled)
{
builder.Append(", headers = '{}'");
}

if (_graph.MetadataConfig.CausationIdEnabled)
{
builder.Append(", causation_id = NULL");
}

if (_graph.MetadataConfig.CorrelationIdEnabled)
{
builder.Append(", correlation_id = NULL");
}

builder.Append(" where seq_id = ");
builder.AppendParameter(_sequence);
}

public Type DocumentType => typeof(IEvent);
public void Postprocess(DbDataReader reader, IList<Exception> exceptions)
{
// Nothing
}

public Task PostprocessAsync(DbDataReader reader, IList<Exception> exceptions, CancellationToken token)
{
return Task.CompletedTask;
}

public OperationRole Role() => OperationRole.Events;
}
Loading