Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ES-2] Sequence change int to long #17

Merged
merged 9 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
CSPROJ_PATH: ${{ vars.CSPROJ_PATH }}

build:
runs-on: windows-2019
runs-on: windows-2022
needs: [ validate ]
steps:
- uses: actions/checkout@v3
Expand All @@ -35,8 +35,9 @@ jobs:

- name: Install LocalDb
run: |
choco install mssqlserver2014-sqllocaldb
choco install sqllocaldb
powershell -Command "Set-ExecutionPolicy Bypass"
sqllocaldb start "MSSQLLocalDB"

- name: Restore dependencies
run: dotnet restore
Expand Down
2 changes: 1 addition & 1 deletion EventStore.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ BEGIN
[AggregateId] NVARCHAR(100) NOT NULL,
[Aggregate] NVARCHAR(100) NOT NULL,
[Version] INT NOT NULL,
[Sequence] INT IDENTITY(1,1) NOT NULL,
[Sequence] BIGINT IDENTITY(1,1) NOT NULL,
[CreatedAt] DATETIME2(7) NOT NULL,
[Payload] NVARCHAR(MAX) NOT NULL) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY];

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ When creating a read model, do not forget ``` Sequence ``` property as it is man
```c#
public sealed class TicketReadModel
{
public int Sequence { get; set; } // required
public long Sequence { get; set; } // required
public string TicketId { get; set; }
public string? SprintId { get; set; }
public string BoardId { get; set; }
Expand Down
4 changes: 2 additions & 2 deletions Tacta.EventStore.Test/Projector/Projections/UserProjection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,24 @@
{
public class UserProjection : Projection
{
public List<int> AppliedSequences { get; }
public List<long> AppliedSequences { get; }

public UserProjection(IProjectionRepository projectionRepository) : base(projectionRepository)
{
AppliedSequences = new List<int>();
AppliedSequences = new List<long>();
}

public async Task On(UserRegistered @event)

Check warning on line 19 in Tacta.EventStore.Test/Projector/Projections/UserProjection.cs

View workflow job for this annotation

GitHub Actions / build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 19 in Tacta.EventStore.Test/Projector/Projections/UserProjection.cs

View workflow job for this annotation

GitHub Actions / build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 19 in Tacta.EventStore.Test/Projector/Projections/UserProjection.cs

View workflow job for this annotation

GitHub Actions / build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 19 in Tacta.EventStore.Test/Projector/Projections/UserProjection.cs

View workflow job for this annotation

GitHub Actions / build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.
{
AppliedSequences.Add(@event.Sequence);
}

public async Task On(UserBanned @event)

Check warning on line 24 in Tacta.EventStore.Test/Projector/Projections/UserProjection.cs

View workflow job for this annotation

GitHub Actions / build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 24 in Tacta.EventStore.Test/Projector/Projections/UserProjection.cs

View workflow job for this annotation

GitHub Actions / build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 24 in Tacta.EventStore.Test/Projector/Projections/UserProjection.cs

View workflow job for this annotation

GitHub Actions / build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 24 in Tacta.EventStore.Test/Projector/Projections/UserProjection.cs

View workflow job for this annotation

GitHub Actions / build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.
{
AppliedSequences.Add(@event.Sequence);
}

public async Task On(UserVerified @event)

Check warning on line 29 in Tacta.EventStore.Test/Projector/Projections/UserProjection.cs

View workflow job for this annotation

GitHub Actions / build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 29 in Tacta.EventStore.Test/Projector/Projections/UserProjection.cs

View workflow job for this annotation

GitHub Actions / build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 29 in Tacta.EventStore.Test/Projector/Projections/UserProjection.cs

View workflow job for this annotation

GitHub Actions / build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 29 in Tacta.EventStore.Test/Projector/Projections/UserProjection.cs

View workflow job for this annotation

GitHub Actions / build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.
{
if (DateTime.Now.Second % 3 == 0) throw new TimeoutException();

Expand Down
4 changes: 2 additions & 2 deletions Tacta.EventStore.Test/Projector/ResilienceTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ public async Task WhileRebuildInProgress_ProcessShouldWait()

public class VerySlowUserProjection : Projection
{
public List<int> AppliedSequences { get; }
public List<long> AppliedSequences { get; }

public VerySlowUserProjection(IProjectionRepository projectionRepository) : base(projectionRepository)
{
AppliedSequences = new List<int>();
AppliedSequences = new List<long>();
}

public async Task On(UserRegistered @event)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public async Task ConcurrencyCheck()
[InlineData(4, 50, 0)]
[InlineData(4, 1, 0)]
[InlineData(2, null, 2)]
public async Task GetFromSequenceAsync(int sequence, int? take, int count)
public async Task GetFromSequenceAsync(long sequence, int? take, int count)
{
// Given
await StoreBooCreatedAndActivated("booId1");
Expand Down
10 changes: 5 additions & 5 deletions Tacta.EventStore.Test/Repository/ExceptionHandlingTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ await Assert.ThrowsAsync<InvalidAggregateIdException>(() =>
public async Task GivenSequenceLessThenZero_WhenGetFromSequenceAsync_ShouldThrowInvalidSequenceException()
{
// Given
const int sequence = -1;
const long sequence = -1;

// When + Then
await Assert.ThrowsAsync<InvalidSequenceException>(() =>
Expand All @@ -201,7 +201,7 @@ await Assert.ThrowsAsync<InvalidSequenceException>(() =>
public async Task GivenNullAggregateId_WhenGetUntilAsync_ShouldThrowInvalidAggregateIdException()
{
// Given
const int sequence = 2;
const long sequence = 2;
const string aggregateId = null;

// When + Then
Expand All @@ -213,7 +213,7 @@ await Assert.ThrowsAsync<InvalidAggregateIdException>(() =>
public async Task GivenEmptyAggregateId_WhenGetUntilAsync_ShouldThrowInvalidAggregateIdException()
{
// Given
const int sequence = 2;
const long sequence = 2;
var aggregateId = string.Empty;

// When + Then
Expand All @@ -225,7 +225,7 @@ await Assert.ThrowsAsync<InvalidAggregateIdException>(() =>
public async Task GivenSequenceLessThenZero_WhenGetUntilAsync_ShouldThrowInvalidSequenceException()
{
// Given
const int sequence = -1;
const long sequence = -1;
const string aggregateId = "AggregateId";

// When + Then
Expand All @@ -237,7 +237,7 @@ await Assert.ThrowsAsync<InvalidSequenceException>(() =>
public async Task GivenZeroSequence_WhenGetUntilAsync_ShouldThrowInvalidSequenceException()
{
// Given
const int sequence = 0;
const long sequence = 0;
const string aggregateId = "AggregateId";

// When + Then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ public class UserReadModel
{
public Guid Id { get; set; }
public DateTime UpdatedAt { get; set; }
public int Sequence { get; set; }
public long Sequence { get; set; }
public Guid EventId { get; set; }
public string Name { get; set; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public class UserReadModelTestBuilder
private Guid _id;
private Guid _eventId;
private string _name;
private int _sequence;
private long _sequence;
private DateTime _updatedAt;

public static UserReadModelTestBuilder Default() => new UserReadModelTestBuilder()
Expand Down Expand Up @@ -35,7 +35,7 @@ public UserReadModelTestBuilder WithName(string name)
return this;
}

public UserReadModelTestBuilder WithSequence(int sequence)
public UserReadModelTestBuilder WithSequence(long sequence)
{
_sequence = sequence;
return this;
Expand Down
26 changes: 12 additions & 14 deletions Tacta.EventStore.Test/Repository/SqlBaseTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,28 @@ namespace Tacta.EventStore.Test.Repository
{
public abstract class SqlBaseTest : IDisposable
{
public const string EventStoreTableName = "[dbo].[EventStore]";
public const string UserReadModelTableName = "[dbo].[UserReadModel]";

private const string EventStoreTableName = "[dbo].[EventStore]";

private const string MasterConnectionString =
@"Server=(localdb)\mssqlLocaldb; Database=master; Trusted_Connection=True;";

private readonly string _connectionString;
private readonly string _dbName;

protected IConnectionFactory ConnectionFactory;
protected readonly IConnectionFactory ConnectionFactory;

protected SqlBaseTest()
{
_dbName = $"TmpTestDb{Guid.NewGuid().ToString("n").Substring(0, 8)}";
_connectionString = $@"Server=(localdb)\mssqlLocaldb; Database={_dbName}; Trusted_Connection=True";
_connectionString =
$@"Server=(localdb)\mssqlLocaldb;Database={_dbName};Trusted_Connection=True;Max Pool Size=200;Connect Timeout=60";
ConnectionFactory = new SqlConnectionFactory(_connectionString);

CreateDatabase();
CreateTables();
}

public void Dispose()
{
DeleteDatabase();
}
public void Dispose() => DeleteDatabase();

private void CreateDatabase()
{
Expand All @@ -50,9 +47,8 @@ private void CreateDatabase()

private void CreateTables()
{
using var connection = new SqlConnection(_connectionString);
CreateEventStoreTable(connection);
CreateUserReadModelTable(connection);
CreateEventStoreTable();
CreateUserReadModelTable();
}

private void DeleteDatabase()
Expand All @@ -67,8 +63,9 @@ private void DeleteDatabase()
connection.Execute(dropDatabase);
}

private void CreateEventStoreTable(SqlConnection connection)
private void CreateEventStoreTable()
{
using var connection = new SqlConnection(_connectionString);
var sqlScript =
$@"CREATE TABLE {EventStoreTableName} (
[Id] UNIQUEIDENTIFIER NOT NULL,
Expand Down Expand Up @@ -101,8 +98,9 @@ CREATE NONCLUSTERED INDEX [AggregateIdIndex] ON [dbo].[EventStore]
connection.Execute(sqlScript);
}

private void CreateUserReadModelTable(SqlConnection connection)
private void CreateUserReadModelTable()
{
using var connection = new SqlConnection(_connectionString);
var sqlScript =
$@"CREATE TABLE {UserReadModelTableName}(
[Id] [uniqueidentifier] NOT NULL,
Expand Down
4 changes: 2 additions & 2 deletions Tacta.EventStore/Domain/DomainEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public abstract class DomainEvent : IDomainEvent

[JsonIgnore] public int Version { get; private set; }

[JsonIgnore] public int Sequence { get; private set; }
[JsonIgnore] public long Sequence { get; private set; }

public DateTime CreatedAt { get; set; }

Expand All @@ -27,7 +27,7 @@ protected DomainEvent(Guid id, string aggregateId, DateTime createdAt)
(Id, AggregateId, CreatedAt) = (id, aggregateId, createdAt);
}

public void WithVersionAndSequence(int version, int sequence)
public void WithVersionAndSequence(int version, long sequence)
{
(Version, Sequence) = (version, sequence);
}
Expand Down
2 changes: 1 addition & 1 deletion Tacta.EventStore/Domain/IDomainEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace Tacta.EventStore.Domain
{
public interface IDomainEvent
{
int Sequence { get; }
long Sequence { get; }

int Version { get; }

Expand Down
2 changes: 1 addition & 1 deletion Tacta.EventStore/Projector/IProjection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public interface IProjection

Task Initialize();

int GetSequence();
long GetSequence();

Task Rebuild();
}
Expand Down
4 changes: 2 additions & 2 deletions Tacta.EventStore/Projector/Projection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Tacta.EventStore.Projector
{
public abstract class Projection : IProjection
{
private int _sequence;
private long _sequence;

private readonly IProjectionRepository _projectionRepository;

Expand All @@ -33,7 +33,7 @@ public async Task Apply(IReadOnlyCollection<IDomainEvent> events)

public async Task On(IDomainEvent @event) => await Task.FromResult(_sequence = @event.Sequence).ConfigureAwait(false);

public int GetSequence() => _sequence;
public long GetSequence() => _sequence;

public async Task Rebuild()
{
Expand Down
2 changes: 1 addition & 1 deletion Tacta.EventStore/Projector/ProjectionProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class ProjectionProcessor : IProjectionProcessor
private readonly IEventStoreRepository _eventStoreRepository;
private readonly AsyncRetryPolicy _retryPolicy;
private bool _isInitialized;
private int _pivot;
private long _pivot;
private readonly SemaphoreSlim _processingSemaphore = new SemaphoreSlim(1, 1);

public ProjectionProcessor(IEnumerable<IProjection> projections, IEventStoreRepository eventStoreRepository)
Expand Down
2 changes: 1 addition & 1 deletion Tacta.EventStore/Repository/EventStoreRecord.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ public sealed class EventStoreRecord<T>
{
public string AggregateId { get; set; }
public int Version { get; set; }
public int Sequence { get; set; }
public long Sequence { get; set; }
public Guid Id { get; set; }
public DateTime CreatedAt { get; set; }
public T Event { get; set; }
Expand Down
6 changes: 3 additions & 3 deletions Tacta.EventStore/Repository/EventStoreRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public async Task<IReadOnlyCollection<EventStoreRecord<T>>> GetAsync<T>(string a
return await GetAsync<T>(StoredEvent.SelectQuery, param, cancellationToken).ConfigureAwait(false);
}

public async Task<IReadOnlyCollection<EventStoreRecord<T>>> GetFromSequenceAsync<T>(int sequence,
public async Task<IReadOnlyCollection<EventStoreRecord<T>>> GetFromSequenceAsync<T>(long sequence,
int? take = null, CancellationToken cancellationToken = default)
{
if (sequence < 0)
Expand Down Expand Up @@ -113,7 +113,7 @@ public async Task<IReadOnlyCollection<EventStoreRecord<T>>> GetUntilAsync<T>(str
return await GetAsync<T>(StoredEvent.SelectUntilEventQuery, param, cancellationToken).ConfigureAwait(false);
}

public async Task<IReadOnlyCollection<EventStoreRecord<T>>> GetUntilAsync<T>(string aggregateId, int sequence, CancellationToken cancellationToken = default)
public async Task<IReadOnlyCollection<EventStoreRecord<T>>> GetUntilAsync<T>(string aggregateId, long sequence, CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(aggregateId))
throw new InvalidAggregateIdException("Aggregate Id cannot be null or white space");
Expand All @@ -126,7 +126,7 @@ public async Task<IReadOnlyCollection<EventStoreRecord<T>>> GetUntilAsync<T>(str
return await GetAsync<T>(StoredEvent.SelectUntilSequenceQuery, param, cancellationToken).ConfigureAwait(false);
}

public async Task<int> GetLatestSequence(CancellationToken cancellationToken = default)
public async Task<long> GetLatestSequence(CancellationToken cancellationToken = default)
{
return await _sqlConnectionFactory.ExecuteWithTransactionIfExists(async (connection, transaction) =>
{
Expand Down
6 changes: 3 additions & 3 deletions Tacta.EventStore/Repository/IEventStoreRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ Task SaveAsync<T>(AggregateRecord aggregateRecord, IReadOnlyCollection<EventReco
CancellationToken cancellationToken = default);

Task<IReadOnlyCollection<EventStoreRecord<T>>> GetAsync<T>(string aggregateId, CancellationToken cancellationToken = default);
Task<IReadOnlyCollection<EventStoreRecord<T>>> GetFromSequenceAsync<T>(int sequence, int? take = null, CancellationToken cancellationToken = default);
Task<IReadOnlyCollection<EventStoreRecord<T>>> GetFromSequenceAsync<T>(long sequence, int? take = null, CancellationToken cancellationToken = default);
Task<IReadOnlyCollection<EventStoreRecord<T>>> GetUntilAsync<T>(string aggregateId, Guid eventId, CancellationToken cancellationToken = default);
Task<IReadOnlyCollection<EventStoreRecord<T>>> GetUntilAsync<T>(string aggregateId, int sequence, CancellationToken cancellationToken = default);
Task<int> GetLatestSequence(CancellationToken cancellationToken = default);
Task<IReadOnlyCollection<EventStoreRecord<T>>> GetUntilAsync<T>(string aggregateId, long sequence, CancellationToken cancellationToken = default);
Task<long> GetLatestSequence(CancellationToken cancellationToken = default);
Task<IReadOnlyCollection<EventStoreRecord<T>>> GetAsync<T>(string query, object param, CancellationToken cancellationToken = default);
}
}
2 changes: 1 addition & 1 deletion Tacta.EventStore/Repository/IProjectionRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace Tacta.EventStore.Repository
{
public interface IProjectionRepository
{
Task<int> GetSequenceAsync();
Task<long> GetSequenceAsync();
Task DeleteAllAsync();
}
}
4 changes: 2 additions & 2 deletions Tacta.EventStore/Repository/ProjectionRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ protected ProjectionRepository(IConnectionFactory connectionFactory, string tabl
_table = table;
}

public async Task<int> GetSequenceAsync()
public async Task<long> GetSequenceAsync()
{
using (var connection = _connectionFactory.Connection())
{
var query = $"SELECT MAX (Sequence) FROM {_table}";

var sequence = await connection.QuerySingleOrDefaultAsync<int?>(query).ConfigureAwait(false);
var sequence = await connection.QuerySingleOrDefaultAsync<long?>(query).ConfigureAwait(false);

return sequence ?? default;
}
Expand Down
2 changes: 1 addition & 1 deletion Tacta.EventStore/Repository/StoredEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ internal sealed class StoredEvent
public string AggregateId { get; set; }
public string Aggregate { get; set; }
public int Version { get; set; }
public int Sequence { get; set; }
public long Sequence { get; set; }
public Guid Id { get; set; }
public string Name { get; set; }
public DateTime CreatedAt { get; set; }
Expand Down
6 changes: 3 additions & 3 deletions Tacta.EventStore/Tacta.EventStore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
<PackageIcon></PackageIcon>
<PackageProjectUrl>https://tacta.io/</PackageProjectUrl>
<PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
<Version>1.6.1</Version>
<PackageVersion>1.6.1</PackageVersion>
<Version>1.6.2</Version>
<PackageVersion>1.6.2</PackageVersion>
<Title>Tacta EventStore Library</Title>
<PackageReleaseNotes>Added support for rebuilding projections</PackageReleaseNotes>
<PackageReleaseNotes>Sequence type changed from int to long</PackageReleaseNotes>
<Configurations>Debug;Release</Configurations>
<Platforms>AnyCPU</Platforms>
</PropertyGroup>
Expand Down
Loading