Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions src/EventSourcingTests/FetchForWriting/try_fast_forward_version.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
using System;
using System.Threading.Tasks;
using EventSourcingTests.Aggregation;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;

namespace EventSourcingTests.FetchForWriting;

public class reentrant_usage_with_fetch_for_writing : IntegrationContext
{
public reentrant_usage_with_fetch_for_writing(DefaultStoreFixture fixture) : base(fixture)
{
}

[Fact]
public async Task does_no_harm_event_when_called_early()
{
var streamId = Guid.NewGuid();

var stream = await theSession.Events.FetchForWriting<SimpleAggregate>(streamId);
stream.Aggregate.ShouldBeNull();
stream.CurrentVersion.ShouldBe(0);

stream.TryFastForwardVersion();

stream.AppendOne(new AEvent());
stream.AppendMany(new BEvent(), new BEvent(), new BEvent());
stream.AppendMany(new CEvent(), new CEvent());

await theSession.SaveChangesAsync();

var document = await theSession.Events.AggregateStreamAsync<SimpleAggregate>(streamId);
document.ACount.ShouldBe(1);
document.BCount.ShouldBe(3);
document.CCount.ShouldBe(2);
}

[Fact]
public async Task append_successfully_after_initial_save_changes_async()
{
var streamId = Guid.NewGuid();

var stream = await theSession.Events.FetchForWriting<SimpleAggregate>(streamId);
stream.Aggregate.ShouldBeNull();
stream.CurrentVersion.ShouldBe(0);

await theSession.SaveChangesAsync();

stream.TryFastForwardVersion();

stream.AppendOne(new AEvent());
stream.AppendMany(new BEvent(), new BEvent(), new BEvent());
stream.AppendMany(new CEvent(), new CEvent());

await theSession.SaveChangesAsync();

var document = await theSession.Events.AggregateStreamAsync<SimpleAggregate>(streamId);
document.ACount.ShouldBe(1);
document.BCount.ShouldBe(3);
document.CCount.ShouldBe(2);
}

[Fact]
public async Task append_multiple_times()
{
var streamId = Guid.NewGuid();

var stream = await theSession.Events.FetchForWriting<SimpleAggregate>(streamId);
stream.Aggregate.ShouldBeNull();
stream.CurrentVersion.ShouldBe(0);

await theSession.SaveChangesAsync();

stream.AppendOne(new AEvent());

stream.TryFastForwardVersion();


stream.AppendMany(new BEvent(), new BEvent(), new BEvent());
stream.AppendMany(new CEvent(), new CEvent());

await theSession.SaveChangesAsync();

var document = await theSession.Events.AggregateStreamAsync<SimpleAggregate>(streamId);
document.ACount.ShouldBe(1);
document.BCount.ShouldBe(3);
document.CCount.ShouldBe(2);
}
}
8 changes: 4 additions & 4 deletions src/Marten/Events/EventStore.FetchForWriting.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ IEventStream<TDoc> IEventIdentityStrategy<Guid>.StartStream<TDoc>(TDoc document,
action.AggregateType = typeof(TDoc);
action.ExpectedVersionOnServer = 0;

return new EventStream<TDoc>(_store.Events, id, document, cancellation, action);
return new EventStream<TDoc>(_session, _store.Events, id, document, cancellation, action);
}

IEventStream<TDoc> IEventIdentityStrategy<Guid>.AppendToStream<TDoc>(TDoc document, DocumentSessionBase session,
Guid id, long version, CancellationToken cancellation)
{
var action = session.Events.Append(id);
action.ExpectedVersionOnServer = version;
return new EventStream<TDoc>(_store.Events, id, document, cancellation, action);
return new EventStream<TDoc>(_session, _store.Events, id, document, cancellation, action);
}

IQueryHandler<IReadOnlyList<IEvent>> IEventIdentityStrategy<Guid>.BuildEventQueryHandler(Guid id,
Expand Down Expand Up @@ -72,15 +72,15 @@ IEventStream<TDoc> IEventIdentityStrategy<string>.StartStream<TDoc>(TDoc documen
action.AggregateType = typeof(TDoc);
action.ExpectedVersionOnServer = 0;

return new EventStream<TDoc>(_store.Events, id, document, cancellation, action);
return new EventStream<TDoc>(_session, _store.Events, id, document, cancellation, action);
}

IEventStream<TDoc> IEventIdentityStrategy<string>.AppendToStream<TDoc>(TDoc document,
DocumentSessionBase session, string id, long version, CancellationToken cancellation)
{
var action = session.Events.Append(id);
action.ExpectedVersionOnServer = version;
return new EventStream<TDoc>(_store.Events, id, document, cancellation, action);
return new EventStream<TDoc>(_session, _store.Events, id, document, cancellation, action);
}

IQueryHandler<IReadOnlyList<IEvent>> IEventIdentityStrategy<string>.BuildEventQueryHandler(string id,
Expand Down
40 changes: 36 additions & 4 deletions src/Marten/Events/IEventStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Marten.Internal.Sessions;

namespace Marten.Events;

Expand All @@ -20,14 +21,32 @@ public interface IEventStream<T>
void AppendOne(object @event);
void AppendMany(params object[] events);
void AppendMany(IEnumerable<object> events);

/// <summary>
/// If you need to reuse this IEventStream after committing the active
/// unit of work, call this method to "forward" the expected version
/// for the next usage in a subsequent unit of work.
///
/// This will do no harm if the event stream has never been committed and can
/// be used safely regardless of the EventStream state
///
/// This is probably mostly useful for legacy code
/// </summary>
void TryFastForwardVersion();
}

internal interface IEventStream
{

}

internal class EventStream<T>: IEventStream<T>
internal class EventStream<T>: IEventStream<T>, IEventStream
{
private readonly StreamAction _stream;
private readonly DocumentSessionBase _session;
private StreamAction _stream;
private readonly Func<object, IEvent> _wrapper;

public EventStream(EventGraph events, Guid streamId, T aggregate, CancellationToken cancellation,
public EventStream(DocumentSessionBase session, EventGraph events, Guid streamId, T aggregate, CancellationToken cancellation,
StreamAction stream)
{
_wrapper = o =>
Expand All @@ -37,14 +56,15 @@ public EventStream(EventGraph events, Guid streamId, T aggregate, CancellationTo
return e;
};

_session = session;
_stream = stream;
_stream.AggregateType = typeof(T);

Cancellation = cancellation;
Aggregate = aggregate;
}

public EventStream(EventGraph events, string streamKey, T aggregate, CancellationToken cancellation,
public EventStream(DocumentSessionBase session, EventGraph events, string streamKey, T aggregate, CancellationToken cancellation,
StreamAction stream)
{
_wrapper = o =>
Expand All @@ -54,13 +74,25 @@ public EventStream(EventGraph events, string streamKey, T aggregate, Cancellatio
return e;
};

_session = session;
_stream = stream;
_stream.AggregateType = typeof(T);

Cancellation = cancellation;
Aggregate = aggregate;
}

public void TryFastForwardVersion()
{
if (_session.WorkTracker.Streams.Contains(_stream))
{
return;
}

_stream = _stream.FastForward();
_session.WorkTracker.Streams.Add(_stream);
}

public Guid Id => _stream.Id;
public string Key => _stream.Key;

Expand Down
15 changes: 15 additions & 0 deletions src/Marten/Events/StreamAction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ protected StreamAction(Guid id, string key, StreamActionType actionType)
ActionType = actionType;
}

internal StreamAction Clone()
{
return (StreamAction)MemberwiseClone();
}

/// <summary>
/// Identity of the stream if using Guid's as the identity
/// </summary>
Expand Down Expand Up @@ -444,4 +449,14 @@ public bool IsStarting()
{
return ActionType == StreamActionType.Start || Events.First().Version == 1;
}

internal StreamAction FastForward()
{
if (!_events.Any()) return this;

return new StreamAction(Id, Key, StreamActionType.Append)
{
TenantId = TenantId, Id = Id, ExpectedVersionOnServer = Version
};
}
}