Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for saving multiple aggregates at once #19

Merged
merged 10 commits into from
May 29, 2024
2 changes: 1 addition & 1 deletion Tacta.EventStore/Domain/DomainEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace Tacta.EventStore.Domain
public abstract class DomainEvent : IDomainEvent
{
public Guid Id { get; }

public string AggregateId { get; }

[JsonIgnore] public int Version { get; private set; }
Expand Down
2 changes: 2 additions & 0 deletions Tacta.EventStore/Domain/IDomainEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ namespace Tacta.EventStore.Domain
{
public interface IDomainEvent
{
Guid Id { get; }

int Sequence { get; }

int Version { get; }
Expand Down
57 changes: 57 additions & 0 deletions Tacta.EventStore/Repository/Aggregate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
using System.Collections.Generic;
using System.Linq;
using Tacta.EventStore.Domain;
using Tacta.EventStore.Repository.Exceptions;

namespace Tacta.EventStore.Repository
{
public sealed class Aggregate
{
public AggregateRecord AggregateRecord { get; }
public IReadOnlyCollection<EventRecord<IDomainEvent>> EventRecords { get; }

public Aggregate(AggregateRecord aggregateRecord, IReadOnlyCollection<EventRecord<IDomainEvent>> eventRecords)
{
if (aggregateRecord == null) throw new InvalidAggregateRecordException("Aggregate record cannot be null");
if (eventRecords == null) throw new InvalidEventRecordException("Event records cannot be null");

AggregateRecord = aggregateRecord;
EventRecords = eventRecords;
}

public Aggregate(IAggregateRoot<IEntityId> aggregateRoot)
{
var aggregateRecord = new AggregateRecord(
aggregateRoot.Id.ToString(),
aggregateRoot.GetType().Name,
aggregateRoot.Version);
var eventRecords = aggregateRoot.DomainEvents
.Select(@event => new EventRecord<IDomainEvent>(
@event.Id,
@event.CreatedAt,
@event))
.ToList()
.AsReadOnly();

AggregateRecord = aggregateRecord;
EventRecords = eventRecords;
}

internal IEnumerable<StoredEvent> ToStoredEvents()
{
var version = AggregateRecord.Version;

return EventRecords.Select(eventRecord =>
new StoredEvent
{
AggregateId = AggregateRecord.Id,
Aggregate = AggregateRecord.Name,
Version = ++version,
CreatedAt = eventRecord.CreatedAt,
Payload = PayloadSerializer.Serialize(eventRecord.Event),
Id = eventRecord.Id,
Name = eventRecord.Event.GetType().Name
});
}
}
}
43 changes: 41 additions & 2 deletions Tacta.EventStore/Repository/EventStoreRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Dapper;
using Newtonsoft.Json;
using Tacta.Connection;
using Tacta.EventStore.Domain;
using Tacta.EventStore.Repository.Exceptions;

namespace Tacta.EventStore.Repository
Expand Down Expand Up @@ -53,7 +54,7 @@ public async Task SaveAsync<T>(AggregateRecord aggregateRecord,
Aggregate = aggregateRecord.Name,
Version = ++version,
CreatedAt = eventRecord.CreatedAt,
Payload = JsonConvert.SerializeObject(eventRecord.Event, Formatting.Indented, _jsonSerializerSettings),
SegoMarko marked this conversation as resolved.
Show resolved Hide resolved
Payload = PayloadSerializer.Serialize(eventRecord.Event),
Id = eventRecord.Id,
Name = eventRecord.Event.GetType().Name
});
Expand All @@ -75,6 +76,44 @@ await _sqlConnectionFactory.ExecuteWithTransactionIfExists(async (connection, tr
}
}

public async Task SaveAsync(IReadOnlyCollection<Aggregate> aggregates,
CancellationToken cancellationToken = default)
{
if (aggregates == null || !aggregates.Any()) return;

var records = aggregates.SelectMany(aggregate => aggregate.ToStoredEvents());

try
{
await _sqlConnectionFactory.ExecuteWithTransactionIfExists(async (connection, transaction) =>
{
await connection.ExecuteAsync(StoredEvent.InsertQuery, records, transaction).ConfigureAwait(false);
}, cancellationToken).ConfigureAwait(false);
}
catch (Exception e)
{
if ((e is System.Data.SqlClient.SqlException || e is Microsoft.Data.SqlClient.SqlException) &&
e.Message.Contains("ConcurrencyCheckIndex"))
throw new ConcurrencyCheckException(e.Message);

throw;
}
}

public async Task SaveAsync<T, TId>(T aggregateRoot) where T : AggregateRoot<TId> where TId : EntityId
{
var aggregate = new Aggregate(aggregateRoot);

await SaveAsync(aggregate.AggregateRecord, aggregate.EventRecords).ConfigureAwait(false);
}

public async Task SaveAsync<T, TId>(IEnumerable<T> aggregateRoots) where T : AggregateRoot<TId> where TId : EntityId
{
var aggregates = aggregateRoots.Select(ar => new Aggregate(ar)).ToList().AsReadOnly();

await SaveAsync(aggregates).ConfigureAwait(false);
}

public async Task<IReadOnlyCollection<EventStoreRecord<T>>> GetAsync<T>(string aggregateId, CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(aggregateId))
Expand Down Expand Up @@ -149,7 +188,7 @@ public async Task<IReadOnlyCollection<EventStoreRecord<T>>> GetAsync<T>(string q

return storedEvents.Select(@event => new EventStoreRecord<T>
{
Event = JsonConvert.DeserializeObject<T>(@event.Payload, _jsonSerializerSettings),
Event = PayloadSerializer.Deserialize<T>(@event),
AggregateId = @event.AggregateId,
CreatedAt = @event.CreatedAt,
Id = @event.Id,
Expand Down
24 changes: 24 additions & 0 deletions Tacta.EventStore/Repository/PayloadSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using Newtonsoft.Json;

namespace Tacta.EventStore.Repository
{
public static class PayloadSerializer
{
private static readonly JsonSerializerSettings JsonSerializerSettings = new JsonSerializerSettings
{
TypeNameHandling = TypeNameHandling.All,
SegoMarko marked this conversation as resolved.
Show resolved Hide resolved
NullValueHandling = NullValueHandling.Ignore,
MetadataPropertyHandling = MetadataPropertyHandling.ReadAhead
};

public static string Serialize<T>(T @event)
{
return JsonConvert.SerializeObject(@event, Formatting.Indented, JsonSerializerSettings);
}

internal static T Deserialize<T>(StoredEvent @event)
{
return JsonConvert.DeserializeObject<T>(@event.Payload, JsonSerializerSettings);
}
}
}
6 changes: 3 additions & 3 deletions Tacta.EventStore/Tacta.EventStore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
<PackageIcon></PackageIcon>
<PackageProjectUrl>https://tacta.io/</PackageProjectUrl>
<PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
<Version>1.5.1</Version>
<PackageVersion>1.5.1</PackageVersion>
<Version>1.5.2</Version>
<PackageVersion>1.5.2</PackageVersion>
<Title>Tacta EventStore Library</Title>
<PackageReleaseNotes>Projections Status Monitoring Page</PackageReleaseNotes>
<PackageReleaseNotes>Saving multiple aggregates at once</PackageReleaseNotes>
<Configurations>Debug;Release</Configurations>
<Platforms>AnyCPU</Platforms>
</PropertyGroup>
Expand Down