Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions src/EventSourcingTests/FetchForWriting/try_fast_forward_version.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
using System;
using System.Threading.Tasks;
using EventSourcingTests.Aggregation;
using JasperFx.Events.Projections;
using Marten.Events.Projections;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;

namespace EventSourcingTests.FetchForWriting;

public class try_fast_forward_version : OneOffConfigurationsContext
{
[Fact]
public async Task does_no_harm_event_when_called_early()
{
StoreOptions(opts => opts.Projections.Snapshot<SimpleAggregate>(SnapshotLifecycle.Inline));

var streamId = Guid.NewGuid();

var stream = await theSession.Events.FetchForWriting<SimpleAggregate>(streamId);

// Should be fine to call this on a brand new stream even though it does nothing
stream.TryFastForwardVersion();

stream.AppendOne(new AEvent());
stream.AppendMany(new BEvent(), new BEvent(), new BEvent());
stream.AppendMany(new CEvent(), new CEvent());

await theSession.SaveChangesAsync();

var document = await theSession.Events.AggregateStreamAsync<SimpleAggregate>(streamId);
document.ACount.ShouldBe(1);
document.BCount.ShouldBe(3);
document.CCount.ShouldBe(2);
}

[Fact]
public async Task append_successfully_after_initial_save_changes_async()
{
StoreOptions(opts => opts.Projections.Snapshot<SimpleAggregate>(SnapshotLifecycle.Inline));

var streamId = Guid.NewGuid();

var stream = await theSession.Events.FetchForWriting<SimpleAggregate>(streamId);

stream.AppendOne(new AEvent());
stream.AppendMany(new BEvent(), new BEvent(), new BEvent());
stream.AppendMany(new CEvent(), new CEvent());

await theSession.SaveChangesAsync();

// Advance the expected version now that we've committed the first batch
stream.TryFastForwardVersion();

stream.AppendOne(new AEvent());
stream.AppendOne(new AEvent());

await theSession.SaveChangesAsync();

var document = await theSession.Events.AggregateStreamAsync<SimpleAggregate>(streamId);
document.ACount.ShouldBe(3);
document.BCount.ShouldBe(3);
document.CCount.ShouldBe(2);
}

[Fact]
public async Task append_multiple_times()
{
StoreOptions(opts => opts.Projections.Snapshot<SimpleAggregate>(SnapshotLifecycle.Inline));

var streamId = Guid.NewGuid();

var stream = await theSession.Events.FetchForWriting<SimpleAggregate>(streamId);

stream.AppendOne(new AEvent());

await theSession.SaveChangesAsync();

stream.TryFastForwardVersion();

stream.AppendOne(new BEvent());

await theSession.SaveChangesAsync();

stream.TryFastForwardVersion();

stream.AppendOne(new CEvent());

await theSession.SaveChangesAsync();

var document = await theSession.Events.AggregateStreamAsync<SimpleAggregate>(streamId);
document.ACount.ShouldBe(1);
document.BCount.ShouldBe(1);
document.CCount.ShouldBe(1);
}
}
8 changes: 4 additions & 4 deletions src/Marten/Events/EventStore.FetchForWriting.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ IEventStream<TDoc> IEventIdentityStrategy<Guid>.StartStream<TDoc>(TDoc? document
action.AggregateType = typeof(TDoc);
action.ExpectedVersionOnServer = 0;

return new EventStream<TDoc>(_store.Events, id, document, cancellation, action);
return new EventStream<TDoc>(session, _store.Events, id, document, cancellation, action);
}

IEventStream<TDoc> IEventIdentityStrategy<Guid>.AppendToStream<TDoc>(TDoc? document, DocumentSessionBase session,
Guid id, long version, CancellationToken cancellation) where TDoc : class
{
var action = session.Events.Append(id);
action.ExpectedVersionOnServer = version;
return new EventStream<TDoc>(_store.Events, id, document, cancellation, action);
return new EventStream<TDoc>(session, _store.Events, id, document, cancellation, action);
}

IQueryHandler<IReadOnlyList<IEvent>> IEventIdentityStrategy<Guid>.BuildEventQueryHandler(bool isGlobal, Guid id,
Expand Down Expand Up @@ -104,15 +104,15 @@ IEventStream<TDoc> IEventIdentityStrategy<string>.StartStream<TDoc>(TDoc? docume
action.AggregateType = typeof(TDoc);
action.ExpectedVersionOnServer = 0;

return new EventStream<TDoc>(_store.Events, id, document, cancellation, action);
return new EventStream<TDoc>(session, _store.Events, id, document, cancellation, action);
}

IEventStream<TDoc> IEventIdentityStrategy<string>.AppendToStream<TDoc>(TDoc? document,
DocumentSessionBase session, string id, long version, CancellationToken cancellation) where TDoc : class
{
var action = session.Events.Append(id);
action.ExpectedVersionOnServer = version;
return new EventStream<TDoc>(_store.Events, id, document, cancellation, action);
return new EventStream<TDoc>(session, _store.Events, id, document, cancellation, action);
}

IQueryHandler<IReadOnlyList<IEvent>> IEventIdentityStrategy<string>.BuildEventQueryHandler(bool isGlobal, string id,
Expand Down
38 changes: 34 additions & 4 deletions src/Marten/Events/IEventStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,18 @@
using System.Linq;
using System.Threading;
using JasperFx.Events;
using Marten.Internal.Sessions;

namespace Marten.Events;

/// <summary>
/// Internal marker interface for event streams
/// </summary>
internal interface IEventStream
{
void TryFastForwardVersion();
}

public interface IEventStream<out T> where T: notnull
{
T? Aggregate { get; }
Expand All @@ -21,16 +30,25 @@ public interface IEventStream<out T> where T: notnull
void AppendOne(object @event);
void AppendMany(params object[] events);
void AppendMany(IEnumerable<object> events);

/// <summary>
/// Try to advance the expected starting version for optimistic concurrency checks to the current version
/// so that you can reuse a stream object for multiple units of work. This is meant to only be used in
/// very specific circumstances.
/// </summary>
void TryFastForwardVersion();
}

internal class EventStream<T>: IEventStream<T> where T: notnull
internal class EventStream<T>: IEventStream<T>, IEventStream where T: notnull
{
private readonly StreamAction _stream;
private StreamAction _stream;
private readonly Func<object, IEvent> _wrapper;
private readonly DocumentSessionBase _session;

public EventStream(EventGraph events, Guid streamId, T? aggregate, CancellationToken cancellation,
public EventStream(DocumentSessionBase session, EventGraph events, Guid streamId, T? aggregate, CancellationToken cancellation,
StreamAction stream)
{
_session = session;
_wrapper = o =>
{
var e = events.BuildEvent(o);
Expand All @@ -45,9 +63,10 @@ public EventStream(EventGraph events, Guid streamId, T? aggregate, CancellationT
Aggregate = aggregate;
}

public EventStream(EventGraph events, string streamKey, T? aggregate, CancellationToken cancellation,
public EventStream(DocumentSessionBase session, EventGraph events, string streamKey, T? aggregate, CancellationToken cancellation,
StreamAction stream)
{
_session = session;
_wrapper = o =>
{
var e = events.BuildEvent(o);
Expand Down Expand Up @@ -90,4 +109,15 @@ public void AppendMany(IEnumerable<object> events)
public CancellationToken Cancellation { get; }

public IReadOnlyList<IEvent> Events => _stream.Events;

public void TryFastForwardVersion()
{
if (_session.WorkTracker.Streams.Contains(_stream))
{
return;
}

_stream = _stream.FastForward();
_session.WorkTracker.Streams.Add(_stream);
}
}
8 changes: 8 additions & 0 deletions src/Marten/Events/StubEventStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,12 @@ public void AppendMany(IEnumerable<object> events)
public string Key { get; set; } = Guid.NewGuid().ToString();

public IReadOnlyList<IEvent> Events => EventsAppended.Select(x => EventGraph.BuildEvent(x)).ToList();

/// <summary>
/// No-op in the stub. This method is provided to satisfy the interface contract.
/// </summary>
public void TryFastForwardVersion()
{
// Intentionally does nothing in the stub
}
}
Loading