Skip to content

Commit

Permalink
added method for getting instantiated aggregate from aggregate id, an…
Browse files Browse the repository at this point in the history
…d tests
  • Loading branch information
Marko Sego committed May 31, 2024
1 parent 2ff5084 commit 3300c74
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 4 deletions.
78 changes: 76 additions & 2 deletions Tacta.EventStore.Test/Repository/EventStoreRepositoryTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ public EventStoreRepositoryTest()
_eventStoreRepository = new EventStoreRepository(ConnectionFactory);
}


[Fact]
public async Task InsertAsync_GetAsync_SingleAggregate()
{
Expand All @@ -48,6 +47,32 @@ public async Task InsertAsync_GetAsync_SingleAggregate()
Assert.Equal(booCreated.CreatedAt.ToShortTimeString(), results.Single(x => x.AggregateId == booId).CreatedAt.ToShortTimeString());
Assert.Equal(booCreated.Id, results.Single(x => x.AggregateId == booId).Id);
}

[Fact]
public async Task InsertAsync_GetAsync_SingleAggregate_WhenGetAsAggregate()
{
// Given
var aggregateId = new BacklogItemId();
var summary = "summary";
var aggregate = BacklogItem.FromSummary(aggregateId, summary);
var taskTitle = "task-title";
aggregate.AddTask(taskTitle);

// When
var aggregateRecord = new AggregateRecord(aggregate.Id.ToString(), aggregate.GetType().Name, aggregate.Version);
var eventRecords = aggregate.DomainEvents.Select(@event =>
new EventRecord<IDomainEvent>(((DomainEvent)@event).Id, @event.CreatedAt, @event)).ToList().AsReadOnly();

await _eventStoreRepository.SaveAsync(aggregateRecord, eventRecords).ConfigureAwait(false);

// Then
var result = await _eventStoreRepository.GetAsync<DomainEvent, BacklogItem>(aggregateId.ToString()).ConfigureAwait(false);

Assert.NotNull(result);
Assert.Equal(summary, result.Summary);
Assert.Single(result.SubTasks);
Assert.Equal(taskTitle, result.SubTasks.Single().Title);
}

[Fact]
public async Task InsertAsync_GetAsync_SingleAggregate_WhenSavedAsCollection()
Expand Down Expand Up @@ -143,6 +168,37 @@ public async Task PassTransaction_InsertAsync_GetAsync_SingleAggregate()
Assert.Equal(booCreated.Id, results.Single(x => x.AggregateId == booId).Id);
}

[Fact]
public async Task PassTransaction_InsertAsync_GetAsync_GetAsAggregate()
{
using var connection = ConnectionFactory.Connection();
connection.Open();
var transaction = connection.BeginTransaction();

// Given
var aggregateId = new BacklogItemId();
var summary = "summary";
var aggregate = BacklogItem.FromSummary(aggregateId, summary);
var taskTitle = "task-title";
aggregate.AddTask(taskTitle);

// When
var aggregateRecord = new AggregateRecord(aggregate.Id.ToString(), aggregate.GetType().Name, aggregate.Version);
var eventRecords = aggregate.DomainEvents.Select(@event =>
new EventRecord<IDomainEvent>(((DomainEvent)@event).Id, @event.CreatedAt, @event)).ToList().AsReadOnly();

await _eventStoreRepository.SaveAsync(aggregateRecord, eventRecords).ConfigureAwait(false);
transaction.Commit();

// Then
var result = await _eventStoreRepository.GetAsync<DomainEvent, BacklogItem>(aggregateId.ToString()).ConfigureAwait(false);

Assert.NotNull(result);
Assert.Equal(summary, result.Summary);
Assert.Single(result.SubTasks);
Assert.Equal(taskTitle, result.SubTasks.Single().Title);
}

[Fact]
public async Task PassTransaction_InsertAsync_GetAsync_MultipleAggregates_WhenSavedAsCollection()
{
Expand Down Expand Up @@ -472,6 +528,24 @@ public async Task PropertiesCheck_WhenSavedAsAggregateRoot()
Assert.Equal(1, results.First().Version);
Assert.Equal(1, results.First().Sequence);
}

[Fact]
public async Task PropertiesCheck_WhenGetAsAggregate()
{
// Given
var aggregateId = new BacklogItemId();
var summary = "summary";
var aggregate = BacklogItem.FromSummary(aggregateId, summary);

await _eventStoreRepository.SaveAsync(aggregate).ConfigureAwait(false);

// When
var result = await _eventStoreRepository.GetAsync<DomainEvent, BacklogItem>(aggregateId.ToString()).ConfigureAwait(false);

// Then
Assert.Equal(summary, result.Summary);
Assert.Equal(1, result.Version);
}

[Fact]
public async Task ConcurrencyCheck()
Expand Down Expand Up @@ -611,7 +685,7 @@ public async Task GetUntilSequence_ShouldReturnAllAggregateEventStoreRecords()
// Then
Assert.Equal(2, eventStoreRecords.Count);
}

[Fact]
public async Task GetUntilSequence_WithSavedAsAggregateRoot_ShouldReturnAllAggregateEventStoreRecords()
{
Expand Down
14 changes: 14 additions & 0 deletions Tacta.EventStore/Repository/EventStoreRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,20 @@ public async Task<IReadOnlyCollection<EventStoreRecord<T>>> GetAsync<T>(string a
return await GetAsync<T>(StoredEvent.SelectQuery, param, cancellationToken).ConfigureAwait(false);
}

public async Task<TA> GetAsync<T, TA>(string aggregateId, CancellationToken cancellationToken = default) where T : DomainEvent where TA : class, IAggregateRoot<IEntityId>
{
if (string.IsNullOrWhiteSpace(aggregateId))
throw new InvalidAggregateIdException("Aggregate Id cannot be null or white space");

var param = new { AggregateId = aggregateId };

var records = await GetAsync<T>(StoredEvent.SelectQuery, param, cancellationToken).ConfigureAwait(false);
records.ToList().ForEach(x => x.Event.WithVersionAndSequence(x.Version, x.Sequence));
var domainEvents = records.Select(x => x.Event).ToList().AsReadOnly();

return domainEvents.Any() ? (TA)Activator.CreateInstance(typeof(TA), domainEvents) : null;
}

public async Task<IReadOnlyCollection<EventStoreRecord<T>>> GetFromSequenceAsync<T>(long sequence,
int? take = null, CancellationToken cancellationToken = default)
{
Expand Down
8 changes: 6 additions & 2 deletions Tacta.EventStore/Repository/IEventStoreRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Tacta.EventStore.Domain;
using Tacta.EventStore.Repository.Models;

namespace Tacta.EventStore.Repository
{
public interface IEventStoreRepository
{
Task SaveAsync<T>(AggregateRecord aggregateRecord, IReadOnlyCollection<EventRecord<T>> eventRecords,
CancellationToken cancellationToken = default);
Task SaveAsync<T>(AggregateRecord aggregateRecord, IReadOnlyCollection<EventRecord<T>> eventRecords, CancellationToken cancellationToken = default);
Task SaveAsync(IReadOnlyCollection<Aggregate> aggregates, CancellationToken cancellationToken = default);
Task SaveAsync<T>(T aggregateRoot) where T : IAggregateRoot<IEntityId>;
Task SaveAsync<T>(IEnumerable<T> aggregateRoots) where T : IAggregateRoot<IEntityId>;

Task<IReadOnlyCollection<EventStoreRecord<T>>> GetAsync<T>(string aggregateId, CancellationToken cancellationToken = default);
Task<TA> GetAsync<T, TA>(string aggregateId, CancellationToken cancellationToken = default) where T : DomainEvent where TA : class, IAggregateRoot<IEntityId>;
Task<IReadOnlyCollection<EventStoreRecord<T>>> GetFromSequenceAsync<T>(long sequence, int? take = null, CancellationToken cancellationToken = default);
Task<IReadOnlyCollection<EventStoreRecord<T>>> GetUntilAsync<T>(string aggregateId, Guid eventId, CancellationToken cancellationToken = default);
Task<IReadOnlyCollection<EventStoreRecord<T>>> GetUntilAsync<T>(string aggregateId, long sequence, CancellationToken cancellationToken = default);
Expand Down

0 comments on commit 3300c74

Please sign in to comment.