Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
<Project>
<PropertyGroup>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
<AkkaVersion>1.5.40</AkkaVersion>
<AkkaHostingVersion>1.5.40</AkkaHostingVersion>
<AkkaVersion>1.5.42</AkkaVersion>
<AkkaHostingVersion>1.5.42</AkkaHostingVersion>
<EventStoreVersion>23.3.8</EventStoreVersion>
<XunitVersion>2.9.3</XunitVersion>
<TestSdkVersion>17.13.0</TestSdkVersion>
Expand Down
37 changes: 25 additions & 12 deletions src/Akka.Persistence.EventStore/Journal/EventStoreJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public EventStoreJournal(Config journalConfig)
_tenantSettings = EventStoreTenantSettings.GetFrom(Context.System);
}

public override async Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr)
public override async Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, CancellationToken cancellationToken)
{
if (_writeInProgress.TryGetValue(persistenceId, out var wip))
{
Expand All @@ -59,18 +59,23 @@ public override async Task<long> ReadHighestSequenceNrAsync(string persistenceId
EventStoreEventStreamFilter.FromEnd(_settings.GetStreamName(persistenceId, _tenantSettings),
fromSequenceNr);

var lastMessage = await EventStoreSource
var (killSwitch, task) = EventStoreSource
.FromStream(_eventStoreClient, filter)
.DeSerializeEventWith(_adapter, _settings.Parallelism)
.Filter(filter)
.Take(1)
.RunWith(new FirstOrDefault<ReplayCompletion<IPersistentRepresentation>>(), _mat);
.ViaMaterialized(KillSwitches.Single<ReplayCompletion<IPersistentRepresentation>>(), Keep.Right)
.ToMaterialized(new FirstOrDefault<ReplayCompletion<IPersistentRepresentation>>(), Keep.Both)
.Run(_mat);

cancellationToken.Register(() => killSwitch.Abort(new TimeoutException()));
var lastMessage = await task;

if (lastMessage != null)
return lastMessage.Data.SequenceNr;

var metadata =
await _eventStoreClient.GetStreamMetadataAsync(_settings.GetStreamName(persistenceId, _tenantSettings));
await _eventStoreClient.GetStreamMetadataAsync(_settings.GetStreamName(persistenceId, _tenantSettings), cancellationToken: cancellationToken);

var customMetaData = metadata.Metadata.CustomMetadata?.Deserialize<Dictionary<string, object>>() ??
new Dictionary<string, object>();
Expand Down Expand Up @@ -110,7 +115,7 @@ await EventStoreSource
}

protected override Task<IImmutableList<Exception?>> WriteMessagesAsync(
IEnumerable<AtomicWrite> atomicWrites)
IEnumerable<AtomicWrite> atomicWrites, CancellationToken cancellationToken)
{
var messagesList = atomicWrites.ToImmutableList();
var persistenceId = messagesList.Head().PersistenceId;
Expand All @@ -123,6 +128,7 @@ await EventStoreSource

var currentTimestamp = DateTime.Now.Ticks;

using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _pendingWriteCts.Token);
var future = _writeQueue
.Write(
_settings.GetStreamName(persistenceId, _tenantSettings),
Expand All @@ -141,6 +147,7 @@ await EventStoreSource
})
.OrderBy(y => y.SequenceNr)
.ToImmutableList(),
cts.Token,
expectedVersion)
.ContinueWith(IImmutableList<Exception?> (result) =>
{
Expand All @@ -151,7 +158,7 @@ await EventStoreSource
.Select(_ => exception)
.ToImmutableList();
},
cancellationToken: _pendingWriteCts.Token,
cancellationToken: cts.Token,
continuationOptions: TaskContinuationOptions.ExecuteSynchronously,
scheduler: TaskScheduler.Default);

Expand All @@ -162,30 +169,35 @@ await EventStoreSource
// Sequence Number reads won't block/await/etc.
future.ContinueWith(
continuationAction: p => self.Tell(new WriteFinished(persistenceId, p)),
cancellationToken: _pendingWriteCts.Token,
cancellationToken: cts.Token,
continuationOptions: TaskContinuationOptions.ExecuteSynchronously,
scheduler: TaskScheduler.Default);

// But we still want to return the future from `AsyncWriteMessages`
return future;
}

protected override async Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr)
protected override async Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, CancellationToken cancellationToken)
{
var streamName = _settings.GetStreamName(persistenceId, _tenantSettings);

var filter = EventStoreEventStreamFilter.FromEnd(streamName, maxSequenceNumber: toSequenceNr);

var lastMessage = await EventStoreSource
var (killSwitch, task) = EventStoreSource
.FromStream(_eventStoreClient, filter)
.DeSerializeEventWith(_adapter, _settings.Parallelism)
.Filter(filter)
.Take(1)
.RunWith(new FirstOrDefault<ReplayCompletion<IPersistentRepresentation>>(), _mat);
.ViaMaterialized(KillSwitches.Single<ReplayCompletion<IPersistentRepresentation>>(), Keep.Right)
.ToMaterialized(new FirstOrDefault<ReplayCompletion<IPersistentRepresentation>>(), Keep.Both)
.Run(_mat);

cancellationToken.Register(() => killSwitch.Abort(new TimeoutException()));
var lastMessage = await task;

if (lastMessage != null)
{
var metadata = await _eventStoreClient.GetStreamMetadataAsync(streamName);
var metadata = await _eventStoreClient.GetStreamMetadataAsync(streamName, cancellationToken: cancellationToken);

var truncatePosition = lastMessage.Position + 1;

Expand All @@ -208,7 +220,8 @@ await _eventStoreClient
truncatePosition,
metadata.Metadata.CacheControl,
metadata.Metadata.Acl,
JsonSerializer.SerializeToDocument(customMetaData)));
JsonSerializer.SerializeToDocument(customMetaData)),
cancellationToken: cancellationToken);
}
}

Expand Down
37 changes: 24 additions & 13 deletions src/Akka.Persistence.EventStore/Snapshot/EventStoreSnapshotStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,28 +49,31 @@ public EventStoreSnapshotStore(Config snapshotConfig)

protected override async Task<SelectedSnapshot?> LoadAsync(
string persistenceId,
SnapshotSelectionCriteria criteria)
SnapshotSelectionCriteria criteria,
CancellationToken cancellationToken)
{
var result = await FindSnapshot(_settings.GetStreamName(persistenceId, _tenantSettings), criteria);
var result = await FindSnapshot(_settings.GetStreamName(persistenceId, _tenantSettings), criteria, cancellationToken);

return result?.Data;
}

protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot)
protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot, CancellationToken cancellationToken)
{
await _writeQueue.Write(
_settings.GetStreamName(metadata.PersistenceId, _tenantSettings),
ImmutableList.Create(new SelectedSnapshot(metadata, snapshot)));
ImmutableList.Create(new SelectedSnapshot(metadata, snapshot)),
cancellationToken);
}

protected override Task DeleteAsync(SnapshotMetadata metadata)
protected override Task DeleteAsync(SnapshotMetadata metadata, CancellationToken cancellationToken)
{
return DeleteAsync(
metadata.PersistenceId,
new SnapshotSelectionCriteria(metadata.SequenceNr, metadata.Timestamp));
new SnapshotSelectionCriteria(metadata.SequenceNr, metadata.Timestamp),
cancellationToken);
}

protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria)
protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria, CancellationToken cancellationToken)
{
if (criteria.Equals(SnapshotSelectionCriteria.None))
return;
Expand All @@ -79,12 +82,13 @@ protected override async Task DeleteAsync(string persistenceId, SnapshotSelectio

var snapshotToDelete = await FindSnapshot(
streamName,
criteria);
criteria,
cancellationToken);

if (snapshotToDelete == null)
return;

var currentMetaData = await _eventStoreClient.GetStreamMetadataAsync(streamName);
var currentMetaData = await _eventStoreClient.GetStreamMetadataAsync(streamName, cancellationToken: cancellationToken);

await _eventStoreClient.SetStreamMetadataAsync(
streamName,
Expand All @@ -95,12 +99,14 @@ await _eventStoreClient.SetStreamMetadataAsync(
snapshotToDelete.Position + 1,
currentMetaData.Metadata.CacheControl,
currentMetaData.Metadata.Acl,
currentMetaData.Metadata.CustomMetadata));
currentMetaData.Metadata.CustomMetadata),
cancellationToken: cancellationToken);
}

private async Task<ReplayCompletion<SelectedSnapshot>?> FindSnapshot(
string streamName,
SnapshotSelectionCriteria criteria)
SnapshotSelectionCriteria criteria,
CancellationToken cancellationToken)
{
if (criteria.Equals(SnapshotSelectionCriteria.None))
return null;
Expand All @@ -111,11 +117,16 @@ await _eventStoreClient.SetStreamMetadataAsync(
Direction.Backwards,
criteria);

return await EventStoreSource
var (killSwitch, task) = EventStoreSource
.FromStream(_eventStoreClient, filter)
.DeSerializeSnapshotWith(_messageAdapter, _settings.Parallelism)
.Filter(filter)
.Take(1)
.RunWith(new FirstOrDefault<ReplayCompletion<SelectedSnapshot>>(), _mat);
.ViaMaterialized(KillSwitches.Single<ReplayCompletion<SelectedSnapshot>>(), Keep.Right)
.ToMaterialized(new FirstOrDefault<ReplayCompletion<SelectedSnapshot>>(), Keep.Both)
.Run(_mat);

cancellationToken.Register(() => killSwitch.Abort(new TimeoutException()));
return await task;
}
}
10 changes: 7 additions & 3 deletions src/Akka.Persistence.EventStore/Streams/EventStoreWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ await client.AppendToStreamAsync(
x.StreamName,
x.ExpectedRevision.Value,
events,
configureOperationOptions: options => options.ThrowOnAppendFailure = true);
configureOperationOptions: options => options.ThrowOnAppendFailure = true,
cancellationToken: x.CancellationToken);
}
else
{
Expand All @@ -47,7 +48,8 @@ await client
x.StreamName,
StreamState.Any,
events,
configureOperationOptions: options => options.ThrowOnAppendFailure = true);
configureOperationOptions: options => options.ThrowOnAppendFailure = true,
cancellationToken: x.CancellationToken);
}

x.Ack.TrySetResult(NotUsed.Instance);
Expand All @@ -68,11 +70,12 @@ await client
public async Task Write(
string streamName,
IImmutableList<TSource> events,
CancellationToken cancellationToken,
StreamRevision? expectedRevision = null)
{
var promise = new TaskCompletionSource<NotUsed>(TaskCreationOptions.RunContinuationsAsynchronously);

var result = await _writeQueue.OfferAsync(new QueueItem(streamName, events, promise, expectedRevision));
var result = await _writeQueue.OfferAsync(new QueueItem(streamName, events, promise, cancellationToken, expectedRevision));

switch (result)
{
Expand Down Expand Up @@ -103,5 +106,6 @@ private record QueueItem(
string StreamName,
IImmutableList<TSource> Events,
TaskCompletionSource<NotUsed> Ack,
CancellationToken CancellationToken,
StreamRevision? ExpectedRevision = null);
}
Loading