diff --git a/Tacta.EventStore.Test/Repository/EventStoreRepositoryTest.cs b/Tacta.EventStore.Test/Repository/EventStoreRepositoryTest.cs index 89510f7..adba54e 100644 --- a/Tacta.EventStore.Test/Repository/EventStoreRepositoryTest.cs +++ b/Tacta.EventStore.Test/Repository/EventStoreRepositoryTest.cs @@ -23,7 +23,6 @@ public EventStoreRepositoryTest() _eventStoreRepository = new EventStoreRepository(ConnectionFactory); } - [Fact] public async Task InsertAsync_GetAsync_SingleAggregate() { @@ -48,6 +47,32 @@ public async Task InsertAsync_GetAsync_SingleAggregate() Assert.Equal(booCreated.CreatedAt.ToShortTimeString(), results.Single(x => x.AggregateId == booId).CreatedAt.ToShortTimeString()); Assert.Equal(booCreated.Id, results.Single(x => x.AggregateId == booId).Id); } + + [Fact] + public async Task InsertAsync_GetAsync_SingleAggregate_WhenGetAsAggregate() + { + // Given + var aggregateId = new BacklogItemId(); + var summary = "summary"; + var aggregate = BacklogItem.FromSummary(aggregateId, summary); + var taskTitle = "task-title"; + aggregate.AddTask(taskTitle); + + // When + var aggregateRecord = new AggregateRecord(aggregate.Id.ToString(), aggregate.GetType().Name, aggregate.Version); + var eventRecords = aggregate.DomainEvents.Select(@event => + new EventRecord(((DomainEvent)@event).Id, @event.CreatedAt, @event)).ToList().AsReadOnly(); + + await _eventStoreRepository.SaveAsync(aggregateRecord, eventRecords).ConfigureAwait(false); + + // Then + var result = await _eventStoreRepository.GetAsync(aggregateId.ToString()).ConfigureAwait(false); + + Assert.NotNull(result); + Assert.Equal(summary, result.Summary); + Assert.Single(result.SubTasks); + Assert.Equal(taskTitle, result.SubTasks.Single().Title); + } [Fact] public async Task InsertAsync_GetAsync_SingleAggregate_WhenSavedAsCollection() @@ -143,6 +168,37 @@ public async Task PassTransaction_InsertAsync_GetAsync_SingleAggregate() Assert.Equal(booCreated.Id, results.Single(x => x.AggregateId == booId).Id); } + [Fact] + public async Task PassTransaction_InsertAsync_GetAsync_GetAsAggregate() + { + using var connection = ConnectionFactory.Connection(); + connection.Open(); + var transaction = connection.BeginTransaction(); + + // Given + var aggregateId = new BacklogItemId(); + var summary = "summary"; + var aggregate = BacklogItem.FromSummary(aggregateId, summary); + var taskTitle = "task-title"; + aggregate.AddTask(taskTitle); + + // When + var aggregateRecord = new AggregateRecord(aggregate.Id.ToString(), aggregate.GetType().Name, aggregate.Version); + var eventRecords = aggregate.DomainEvents.Select(@event => + new EventRecord(((DomainEvent)@event).Id, @event.CreatedAt, @event)).ToList().AsReadOnly(); + + await _eventStoreRepository.SaveAsync(aggregateRecord, eventRecords).ConfigureAwait(false); + transaction.Commit(); + + // Then + var result = await _eventStoreRepository.GetAsync(aggregateId.ToString()).ConfigureAwait(false); + + Assert.NotNull(result); + Assert.Equal(summary, result.Summary); + Assert.Single(result.SubTasks); + Assert.Equal(taskTitle, result.SubTasks.Single().Title); + } + [Fact] public async Task PassTransaction_InsertAsync_GetAsync_MultipleAggregates_WhenSavedAsCollection() { @@ -472,6 +528,24 @@ public async Task PropertiesCheck_WhenSavedAsAggregateRoot() Assert.Equal(1, results.First().Version); Assert.Equal(1, results.First().Sequence); } + + [Fact] + public async Task PropertiesCheck_WhenGetAsAggregate() + { + // Given + var aggregateId = new BacklogItemId(); + var summary = "summary"; + var aggregate = BacklogItem.FromSummary(aggregateId, summary); + + await _eventStoreRepository.SaveAsync(aggregate).ConfigureAwait(false); + + // When + var result = await _eventStoreRepository.GetAsync(aggregateId.ToString()).ConfigureAwait(false); + + // Then + Assert.Equal(summary, result.Summary); + Assert.Equal(1, result.Version); + } [Fact] public async Task ConcurrencyCheck() @@ -611,7 +685,7 @@ public async Task GetUntilSequence_ShouldReturnAllAggregateEventStoreRecords() // Then Assert.Equal(2, eventStoreRecords.Count); } - + [Fact] public async Task GetUntilSequence_WithSavedAsAggregateRoot_ShouldReturnAllAggregateEventStoreRecords() { diff --git a/Tacta.EventStore/Repository/EventStoreRepository.cs b/Tacta.EventStore/Repository/EventStoreRepository.cs index 7747089..e4b4f83 100644 --- a/Tacta.EventStore/Repository/EventStoreRepository.cs +++ b/Tacta.EventStore/Repository/EventStoreRepository.cs @@ -131,6 +131,20 @@ public async Task>> GetAsync(string a return await GetAsync(StoredEvent.SelectQuery, param, cancellationToken).ConfigureAwait(false); } + public async Task GetAsync(string aggregateId, CancellationToken cancellationToken = default) where T : DomainEvent where TA : class, IAggregateRoot + { + if (string.IsNullOrWhiteSpace(aggregateId)) + throw new InvalidAggregateIdException("Aggregate Id cannot be null or white space"); + + var param = new { AggregateId = aggregateId }; + + var records = await GetAsync(StoredEvent.SelectQuery, param, cancellationToken).ConfigureAwait(false); + records.ToList().ForEach(x => x.Event.WithVersionAndSequence(x.Version, x.Sequence)); + var domainEvents = records.Select(x => x.Event).ToList().AsReadOnly(); + + return domainEvents.Any() ? (TA)Activator.CreateInstance(typeof(TA), domainEvents) : null; + } + public async Task>> GetFromSequenceAsync(long sequence, int? take = null, CancellationToken cancellationToken = default) { diff --git a/Tacta.EventStore/Repository/IEventStoreRepository.cs b/Tacta.EventStore/Repository/IEventStoreRepository.cs index 7135770..ddd065e 100644 --- a/Tacta.EventStore/Repository/IEventStoreRepository.cs +++ b/Tacta.EventStore/Repository/IEventStoreRepository.cs @@ -2,16 +2,20 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; +using Tacta.EventStore.Domain; using Tacta.EventStore.Repository.Models; namespace Tacta.EventStore.Repository { public interface IEventStoreRepository { - Task SaveAsync(AggregateRecord aggregateRecord, IReadOnlyCollection> eventRecords, - CancellationToken cancellationToken = default); + Task SaveAsync(AggregateRecord aggregateRecord, IReadOnlyCollection> eventRecords, CancellationToken cancellationToken = default); + Task SaveAsync(IReadOnlyCollection aggregates, CancellationToken cancellationToken = default); + Task SaveAsync(T aggregateRoot) where T : IAggregateRoot; + Task SaveAsync(IEnumerable aggregateRoots) where T : IAggregateRoot; Task>> GetAsync(string aggregateId, CancellationToken cancellationToken = default); + Task GetAsync(string aggregateId, CancellationToken cancellationToken = default) where T : DomainEvent where TA : class, IAggregateRoot; Task>> GetFromSequenceAsync(long sequence, int? take = null, CancellationToken cancellationToken = default); Task>> GetUntilAsync(string aggregateId, Guid eventId, CancellationToken cancellationToken = default); Task>> GetUntilAsync(string aggregateId, long sequence, CancellationToken cancellationToken = default);