diff --git a/src/EventSourcingTests/FetchForWriting/try_fast_forward_version.cs b/src/EventSourcingTests/FetchForWriting/try_fast_forward_version.cs new file mode 100644 index 0000000000..733a14059a --- /dev/null +++ b/src/EventSourcingTests/FetchForWriting/try_fast_forward_version.cs @@ -0,0 +1,97 @@ +using System; +using System.Threading.Tasks; +using EventSourcingTests.Aggregation; +using JasperFx.Events.Projections; +using Marten.Events.Projections; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; + +namespace EventSourcingTests.FetchForWriting; + +public class try_fast_forward_version : OneOffConfigurationsContext +{ + [Fact] + public async Task does_no_harm_event_when_called_early() + { + StoreOptions(opts => opts.Projections.Snapshot(SnapshotLifecycle.Inline)); + + var streamId = Guid.NewGuid(); + + var stream = await theSession.Events.FetchForWriting(streamId); + + // Should be fine to call this on a brand new stream even though it does nothing + stream.TryFastForwardVersion(); + + stream.AppendOne(new AEvent()); + stream.AppendMany(new BEvent(), new BEvent(), new BEvent()); + stream.AppendMany(new CEvent(), new CEvent()); + + await theSession.SaveChangesAsync(); + + var document = await theSession.Events.AggregateStreamAsync(streamId); + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(3); + document.CCount.ShouldBe(2); + } + + [Fact] + public async Task append_successfully_after_initial_save_changes_async() + { + StoreOptions(opts => opts.Projections.Snapshot(SnapshotLifecycle.Inline)); + + var streamId = Guid.NewGuid(); + + var stream = await theSession.Events.FetchForWriting(streamId); + + stream.AppendOne(new AEvent()); + stream.AppendMany(new BEvent(), new BEvent(), new BEvent()); + stream.AppendMany(new CEvent(), new CEvent()); + + await theSession.SaveChangesAsync(); + + // Advance the expected version now that we've committed the first batch + stream.TryFastForwardVersion(); + + stream.AppendOne(new AEvent()); + stream.AppendOne(new AEvent()); + + await theSession.SaveChangesAsync(); + + var document = await theSession.Events.AggregateStreamAsync(streamId); + document.ACount.ShouldBe(3); + document.BCount.ShouldBe(3); + document.CCount.ShouldBe(2); + } + + [Fact] + public async Task append_multiple_times() + { + StoreOptions(opts => opts.Projections.Snapshot(SnapshotLifecycle.Inline)); + + var streamId = Guid.NewGuid(); + + var stream = await theSession.Events.FetchForWriting(streamId); + + stream.AppendOne(new AEvent()); + + await theSession.SaveChangesAsync(); + + stream.TryFastForwardVersion(); + + stream.AppendOne(new BEvent()); + + await theSession.SaveChangesAsync(); + + stream.TryFastForwardVersion(); + + stream.AppendOne(new CEvent()); + + await theSession.SaveChangesAsync(); + + var document = await theSession.Events.AggregateStreamAsync(streamId); + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(1); + document.CCount.ShouldBe(1); + } +} diff --git a/src/Marten/Events/EventStore.FetchForWriting.cs b/src/Marten/Events/EventStore.FetchForWriting.cs index 3d12b7f523..3f52dd04a1 100644 --- a/src/Marten/Events/EventStore.FetchForWriting.cs +++ b/src/Marten/Events/EventStore.FetchForWriting.cs @@ -39,7 +39,7 @@ IEventStream IEventIdentityStrategy.StartStream(TDoc? document action.AggregateType = typeof(TDoc); action.ExpectedVersionOnServer = 0; - return new EventStream(_store.Events, id, document, cancellation, action); + return new EventStream(session, _store.Events, id, document, cancellation, action); } IEventStream IEventIdentityStrategy.AppendToStream(TDoc? document, DocumentSessionBase session, @@ -47,7 +47,7 @@ IEventStream IEventIdentityStrategy.AppendToStream(TDoc? docum { var action = session.Events.Append(id); action.ExpectedVersionOnServer = version; - return new EventStream(_store.Events, id, document, cancellation, action); + return new EventStream(session, _store.Events, id, document, cancellation, action); } IQueryHandler> IEventIdentityStrategy.BuildEventQueryHandler(bool isGlobal, Guid id, @@ -104,7 +104,7 @@ IEventStream IEventIdentityStrategy.StartStream(TDoc? docume action.AggregateType = typeof(TDoc); action.ExpectedVersionOnServer = 0; - return new EventStream(_store.Events, id, document, cancellation, action); + return new EventStream(session, _store.Events, id, document, cancellation, action); } IEventStream IEventIdentityStrategy.AppendToStream(TDoc? document, @@ -112,7 +112,7 @@ IEventStream IEventIdentityStrategy.AppendToStream(TDoc? doc { var action = session.Events.Append(id); action.ExpectedVersionOnServer = version; - return new EventStream(_store.Events, id, document, cancellation, action); + return new EventStream(session, _store.Events, id, document, cancellation, action); } IQueryHandler> IEventIdentityStrategy.BuildEventQueryHandler(bool isGlobal, string id, diff --git a/src/Marten/Events/IEventStream.cs b/src/Marten/Events/IEventStream.cs index e94cf31a04..958cc2b074 100644 --- a/src/Marten/Events/IEventStream.cs +++ b/src/Marten/Events/IEventStream.cs @@ -3,9 +3,18 @@ using System.Linq; using System.Threading; using JasperFx.Events; +using Marten.Internal.Sessions; namespace Marten.Events; +/// +/// Internal marker interface for event streams +/// +internal interface IEventStream +{ + void TryFastForwardVersion(); +} + public interface IEventStream where T: notnull { T? Aggregate { get; } @@ -21,16 +30,25 @@ public interface IEventStream where T: notnull void AppendOne(object @event); void AppendMany(params object[] events); void AppendMany(IEnumerable events); + + /// + /// Try to advance the expected starting version for optimistic concurrency checks to the current version + /// so that you can reuse a stream object for multiple units of work. This is meant to only be used in + /// very specific circumstances. + /// + void TryFastForwardVersion(); } -internal class EventStream: IEventStream where T: notnull +internal class EventStream: IEventStream, IEventStream where T: notnull { - private readonly StreamAction _stream; + private StreamAction _stream; private readonly Func _wrapper; + private readonly DocumentSessionBase _session; - public EventStream(EventGraph events, Guid streamId, T? aggregate, CancellationToken cancellation, + public EventStream(DocumentSessionBase session, EventGraph events, Guid streamId, T? aggregate, CancellationToken cancellation, StreamAction stream) { + _session = session; _wrapper = o => { var e = events.BuildEvent(o); @@ -45,9 +63,10 @@ public EventStream(EventGraph events, Guid streamId, T? aggregate, CancellationT Aggregate = aggregate; } - public EventStream(EventGraph events, string streamKey, T? aggregate, CancellationToken cancellation, + public EventStream(DocumentSessionBase session, EventGraph events, string streamKey, T? aggregate, CancellationToken cancellation, StreamAction stream) { + _session = session; _wrapper = o => { var e = events.BuildEvent(o); @@ -90,4 +109,15 @@ public void AppendMany(IEnumerable events) public CancellationToken Cancellation { get; } public IReadOnlyList Events => _stream.Events; + + public void TryFastForwardVersion() + { + if (_session.WorkTracker.Streams.Contains(_stream)) + { + return; + } + + _stream = _stream.FastForward(); + _session.WorkTracker.Streams.Add(_stream); + } } diff --git a/src/Marten/Events/StubEventStream.cs b/src/Marten/Events/StubEventStream.cs index bee7548347..59a0d35da5 100644 --- a/src/Marten/Events/StubEventStream.cs +++ b/src/Marten/Events/StubEventStream.cs @@ -68,4 +68,12 @@ public void AppendMany(IEnumerable events) public string Key { get; set; } = Guid.NewGuid().ToString(); public IReadOnlyList Events => EventsAppended.Select(x => EventGraph.BuildEvent(x)).ToList(); + + /// + /// No-op in the stub. This method is provided to satisfy the interface contract. + /// + public void TryFastForwardVersion() + { + // Intentionally does nothing in the stub + } }