diff --git a/src/EventSourcingTests/FetchForWriting/try_fast_forward_version.cs b/src/EventSourcingTests/FetchForWriting/try_fast_forward_version.cs new file mode 100644 index 0000000000..9df174f710 --- /dev/null +++ b/src/EventSourcingTests/FetchForWriting/try_fast_forward_version.cs @@ -0,0 +1,90 @@ +using System; +using System.Threading.Tasks; +using EventSourcingTests.Aggregation; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; + +namespace EventSourcingTests.FetchForWriting; + +public class reentrant_usage_with_fetch_for_writing : IntegrationContext +{ + public reentrant_usage_with_fetch_for_writing(DefaultStoreFixture fixture) : base(fixture) + { + } + + [Fact] + public async Task does_no_harm_event_when_called_early() + { + var streamId = Guid.NewGuid(); + + var stream = await theSession.Events.FetchForWriting(streamId); + stream.Aggregate.ShouldBeNull(); + stream.CurrentVersion.ShouldBe(0); + + stream.TryFastForwardVersion(); + + stream.AppendOne(new AEvent()); + stream.AppendMany(new BEvent(), new BEvent(), new BEvent()); + stream.AppendMany(new CEvent(), new CEvent()); + + await theSession.SaveChangesAsync(); + + var document = await theSession.Events.AggregateStreamAsync(streamId); + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(3); + document.CCount.ShouldBe(2); + } + + [Fact] + public async Task append_successfully_after_initial_save_changes_async() + { + var streamId = Guid.NewGuid(); + + var stream = await theSession.Events.FetchForWriting(streamId); + stream.Aggregate.ShouldBeNull(); + stream.CurrentVersion.ShouldBe(0); + + await theSession.SaveChangesAsync(); + + stream.TryFastForwardVersion(); + + stream.AppendOne(new AEvent()); + stream.AppendMany(new BEvent(), new BEvent(), new BEvent()); + stream.AppendMany(new CEvent(), new CEvent()); + + await theSession.SaveChangesAsync(); + + var document = await theSession.Events.AggregateStreamAsync(streamId); + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(3); + document.CCount.ShouldBe(2); + } + + [Fact] + public async Task append_multiple_times() + { + var streamId = Guid.NewGuid(); + + var stream = await theSession.Events.FetchForWriting(streamId); + stream.Aggregate.ShouldBeNull(); + stream.CurrentVersion.ShouldBe(0); + + await theSession.SaveChangesAsync(); + + stream.AppendOne(new AEvent()); + + stream.TryFastForwardVersion(); + + + stream.AppendMany(new BEvent(), new BEvent(), new BEvent()); + stream.AppendMany(new CEvent(), new CEvent()); + + await theSession.SaveChangesAsync(); + + var document = await theSession.Events.AggregateStreamAsync(streamId); + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(3); + document.CCount.ShouldBe(2); + } +} diff --git a/src/Marten/Events/EventStore.FetchForWriting.cs b/src/Marten/Events/EventStore.FetchForWriting.cs index 2c648d4380..9e7d8760ca 100644 --- a/src/Marten/Events/EventStore.FetchForWriting.cs +++ b/src/Marten/Events/EventStore.FetchForWriting.cs @@ -33,7 +33,7 @@ IEventStream IEventIdentityStrategy.StartStream(TDoc document, action.AggregateType = typeof(TDoc); action.ExpectedVersionOnServer = 0; - return new EventStream(_store.Events, id, document, cancellation, action); + return new EventStream(_session, _store.Events, id, document, cancellation, action); } IEventStream IEventIdentityStrategy.AppendToStream(TDoc document, DocumentSessionBase session, @@ -41,7 +41,7 @@ IEventStream IEventIdentityStrategy.AppendToStream(TDoc docume { var action = session.Events.Append(id); action.ExpectedVersionOnServer = version; - return new EventStream(_store.Events, id, document, cancellation, action); + return new EventStream(_session, _store.Events, id, document, cancellation, action); } IQueryHandler> IEventIdentityStrategy.BuildEventQueryHandler(Guid id, @@ -72,7 +72,7 @@ IEventStream IEventIdentityStrategy.StartStream(TDoc documen action.AggregateType = typeof(TDoc); action.ExpectedVersionOnServer = 0; - return new EventStream(_store.Events, id, document, cancellation, action); + return new EventStream(_session, _store.Events, id, document, cancellation, action); } IEventStream IEventIdentityStrategy.AppendToStream(TDoc document, @@ -80,7 +80,7 @@ IEventStream IEventIdentityStrategy.AppendToStream(TDoc docu { var action = session.Events.Append(id); action.ExpectedVersionOnServer = version; - return new EventStream(_store.Events, id, document, cancellation, action); + return new EventStream(_session, _store.Events, id, document, cancellation, action); } IQueryHandler> IEventIdentityStrategy.BuildEventQueryHandler(string id, diff --git a/src/Marten/Events/IEventStream.cs b/src/Marten/Events/IEventStream.cs index 27b195980a..0d7bdefd99 100644 --- a/src/Marten/Events/IEventStream.cs +++ b/src/Marten/Events/IEventStream.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading; +using Marten.Internal.Sessions; namespace Marten.Events; @@ -20,14 +21,32 @@ public interface IEventStream void AppendOne(object @event); void AppendMany(params object[] events); void AppendMany(IEnumerable events); + + /// + /// If you need to reuse this IEventStream after committing the active + /// unit of work, call this method to "forward" the expected version + /// for the next usage in a subsequent unit of work. + /// + /// This will do no harm if the event stream has never been committed and can + /// be used safely regardless of the EventStream state + /// + /// This is probably mostly useful for legacy code + /// + void TryFastForwardVersion(); +} + +internal interface IEventStream +{ + } -internal class EventStream: IEventStream +internal class EventStream: IEventStream, IEventStream { - private readonly StreamAction _stream; + private readonly DocumentSessionBase _session; + private StreamAction _stream; private readonly Func _wrapper; - public EventStream(EventGraph events, Guid streamId, T aggregate, CancellationToken cancellation, + public EventStream(DocumentSessionBase session, EventGraph events, Guid streamId, T aggregate, CancellationToken cancellation, StreamAction stream) { _wrapper = o => @@ -37,6 +56,7 @@ public EventStream(EventGraph events, Guid streamId, T aggregate, CancellationTo return e; }; + _session = session; _stream = stream; _stream.AggregateType = typeof(T); @@ -44,7 +64,7 @@ public EventStream(EventGraph events, Guid streamId, T aggregate, CancellationTo Aggregate = aggregate; } - public EventStream(EventGraph events, string streamKey, T aggregate, CancellationToken cancellation, + public EventStream(DocumentSessionBase session, EventGraph events, string streamKey, T aggregate, CancellationToken cancellation, StreamAction stream) { _wrapper = o => @@ -54,6 +74,7 @@ public EventStream(EventGraph events, string streamKey, T aggregate, Cancellatio return e; }; + _session = session; _stream = stream; _stream.AggregateType = typeof(T); @@ -61,6 +82,17 @@ public EventStream(EventGraph events, string streamKey, T aggregate, Cancellatio Aggregate = aggregate; } + public void TryFastForwardVersion() + { + if (_session.WorkTracker.Streams.Contains(_stream)) + { + return; + } + + _stream = _stream.FastForward(); + _session.WorkTracker.Streams.Add(_stream); + } + public Guid Id => _stream.Id; public string Key => _stream.Key; diff --git a/src/Marten/Events/StreamAction.cs b/src/Marten/Events/StreamAction.cs index b7fa58113b..bfde7f3a00 100644 --- a/src/Marten/Events/StreamAction.cs +++ b/src/Marten/Events/StreamAction.cs @@ -52,6 +52,11 @@ protected StreamAction(Guid id, string key, StreamActionType actionType) ActionType = actionType; } + internal StreamAction Clone() + { + return (StreamAction)MemberwiseClone(); + } + /// /// Identity of the stream if using Guid's as the identity /// @@ -444,4 +449,14 @@ public bool IsStarting() { return ActionType == StreamActionType.Start || Events.First().Version == 1; } + + internal StreamAction FastForward() + { + if (!_events.Any()) return this; + + return new StreamAction(Id, Key, StreamActionType.Append) + { + TenantId = TenantId, Id = Id, ExpectedVersionOnServer = Version + }; + } }