Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
<!-- For more info on central package management go to https://devblogs.microsoft.com/nuget/introducing-central-package-management/ -->
<Project>
<PropertyGroup>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
</PropertyGroup>
<ItemGroup>
<PackageVersion Include="AwesomeAssertions" Version="8.1.0" />
<PackageVersion Include="coverlet.collector" Version="6.0.4" />
<PackageVersion Include="FluentAssertions" Version="7.0.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.13.0" />
<PackageVersion Include="NSubstitute" Version="5.3.0" />
<PackageVersion Include="xunit" Version="2.9.3" />
<PackageVersion Include="xunit.runner.visualstudio" Version="3.0.2" />
<PackageVersion Include="Confluent.Kafka" Version="2.10.0" />
<PackageVersion Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.3.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.Common" Version="4.10.0" />
<PackageVersion Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.2" />
<PackageVersion Include="SonarAnalyzer.CSharp" Version="10.8.0.113526" />
<PackageVersion Include="System.Linq.Async" Version="6.0.1" />
<PackageVersion Include="Microsoft.Sbom.Targets" Version="3.0.0" />
<PackageVersion Include="RocksDB" Version="9.10.0.55496" />
</ItemGroup>
<PropertyGroup>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
</PropertyGroup>
<ItemGroup>
<PackageVersion Include="AwesomeAssertions" Version="9.1.0" />
<PackageVersion Include="coverlet.collector" Version="6.0.4" />
<PackageVersion Include="FluentAssertions" Version="7.0.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.14.1" />
<PackageVersion Include="NSubstitute" Version="5.3.0" />
<PackageVersion Include="xunit" Version="2.9.3" />
<PackageVersion Include="xunit.runner.visualstudio" Version="3.1.1" />
<PackageVersion Include="Confluent.Kafka" Version="2.11.0" />
<PackageVersion Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.3.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.Common" Version="4.10.0" />
<PackageVersion Include="Microsoft.Extensions.Hosting" Version="9.0.7" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.7" />
<PackageVersion Include="SonarAnalyzer.CSharp" Version="10.13.0.120203" />
<PackageVersion Include="System.Linq.Async" Version="6.0.3" />
<PackageVersion Include="Microsoft.Sbom.Targets" Version="4.0.3" />
<PackageVersion Include="RocksDB" Version="10.2.1.58549" />
</ItemGroup>
</Project>
92 changes: 92 additions & 0 deletions examples/Examples/Aggregate/AggregateExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
using MinimalKafka;
using MinimalKafka.Stream;

namespace Examples.Aggregate;

public static class AggregateExtensions
{
/// <summary>
/// Maps an aggregate command stream to an aggregate state stream.
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TCommand"></typeparam>
/// <typeparam name="TAgrregate"></typeparam>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix typo in generic parameter name

The generic parameter is misspelled as TAgrregate instead of TAggregate in multiple locations.

-    /// <typeparam name="TAgrregate"></typeparam>
+    /// <typeparam name="TAggregate"></typeparam>
-    public static IKafkaConventionBuilder MapAggregate<TKey, TCommand, TAgrregate>(this IApplicationBuilder builder, string topic)
+    public static IKafkaConventionBuilder MapAggregate<TKey, TCommand, TAggregate>(this IApplicationBuilder builder, string topic)
-        where TAgrregate : IAggregate<TKey, TAgrregate, TCommand>
+        where TAggregate : IAggregate<TKey, TAggregate, TCommand>
-        return sb.MapAggregate<TKey, TCommand, TAgrregate>(topic);
+        return sb.MapAggregate<TKey, TCommand, TAggregate>(topic);
-    public static IKafkaConventionBuilder MapAggregate<TKey, TCommand, TAgrregate>(this IKafkaBuilder builder, string name)
+    public static IKafkaConventionBuilder MapAggregate<TKey, TCommand, TAggregate>(this IKafkaBuilder builder, string name)
-        where TAgrregate : IAggregate<TKey, TAgrregate, TCommand>
+        where TAggregate : IAggregate<TKey, TAggregate, TCommand>
-                state ??= TAgrregate.Create(cmd);
+                state ??= TAggregate.Create(cmd);
-                var result = TAgrregate.Apply(state, cmd);
+                var result = TAggregate.Apply(state, cmd);

Also applies to: 17-17, 35-35, 37-37

🤖 Prompt for AI Agents
In examples/Examples/Aggregate/AggregateExtensions.cs at lines 13, 17, 35, and
37, the generic parameter name is misspelled as TAgrregate. Correct the spelling
to TAggregate consistently in all these locations to fix the typo.

/// <param name="builder"></param>
/// <param name="topic"></param>
/// <returns></returns>
public static IKafkaConventionBuilder MapAggregate<TKey, TCommand, TAgrregate>(this IApplicationBuilder builder, string topic)
where TKey : notnull
where TAgrregate : IAggregate<TKey, TAgrregate, TCommand>
where TCommand : IAggregateCommands<TKey>
{
var sb = builder.ApplicationServices.GetRequiredService<IKafkaBuilder>();
return sb.MapAggregate<TKey, TCommand, TAgrregate>(topic);
}

/// <summary>
/// Maps an aggregate command stream to an aggregate state stream.
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TCommand"></typeparam>
/// <typeparam name="TState"></typeparam>
/// <param name="builder"></param>
/// <param name="name"></param>
/// <returns></returns>
public static IKafkaConventionBuilder MapAggregate<TKey, TCommand, TAgrregate>(this IKafkaBuilder builder, string name)
where TKey : notnull
where TAgrregate : IAggregate<TKey, TAgrregate, TCommand>
where TCommand : IAggregateCommands<TKey>
{
return builder.MapStream<TKey, TCommand>($"{name}-commands")
.Join<TKey, TAgrregate>(name).OnKey()
.Into(async (c, key, join) =>
{
var (cmd, state) = join;

if (cmd is null || c.ConsumerKey.TopicName == name)
{
return;
}

state ??= TAgrregate.Create(cmd).State;

if (cmd.Version != state.Version)
{
await c.ProduceAsync($"{name}-errors",
key,
CommandResult.Create(Result.Failed(state, $"Invalid command version: {cmd.Version}, expected: {state.Version}"), cmd));
return;
}

var result = TAgrregate.Apply(state, cmd);

if (result.IsSuccess)
{
await c.ProduceAsync(name, key, result.State);
}
else
{
await c.ProduceAsync($"{name}-errors", key,
CommandResult.Create(result, cmd));
}
});
}
}

internal class CommandResult
{
public static CommandResult<T, TCmd> Create<T, TCmd>(Result<T> result, TCmd command)
=> new()
{
Command = command,
State = result.State,
IsSuccess = result.IsSuccess,
ErrorMessage = result.ErrorMessage,
};
}

internal class CommandResult<TState, TCommand> : Result<TState>
{
public required TCommand Command { get; init; }

}
10 changes: 10 additions & 0 deletions examples/Examples/Aggregate/IAggregate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace Examples.Aggregate;

public interface IAggregate<TKey, TState, TCommand>
{
TKey Id { get; }
int Version { get; }

abstract static Result<TState> Apply(TState state, TCommand command);
abstract static Result<TState> Create(TCommand command);
}
8 changes: 8 additions & 0 deletions examples/Examples/Aggregate/IAggregateCommands.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Examples.Aggregate;

public interface IAggregateCommands<TKey>
{
TKey Id { get; }
int Version { get; }
string CommandName { get; }
}
26 changes: 26 additions & 0 deletions examples/Examples/Aggregate/Result.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
namespace Examples.Aggregate;

public static class Result
{
public static Result<TResult> Failed<TResult>(TResult value, params string[] errorMessage)
=> new()
{
State = value,
ErrorMessage = errorMessage,
IsSuccess = false
};
}

public class Result<T>
{
public required T State { get; init; }
public bool IsSuccess { get; init; } = true;
public string[] ErrorMessage { get; init; } = [];

public static implicit operator Result<T>(T value)
=> new()
{
State = value,
IsSuccess = true
};
}
87 changes: 87 additions & 0 deletions examples/Examples/Aggregate/TestAggregate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
namespace Examples.Aggregate;

public class TestCommands : IAggregateCommands<Guid>
{
public Guid Id { get; init; } = Guid.NewGuid();
public required int Version { get; init; }
public required string CommandName { get; init; }

public SetCounter? SetCounter { get; set; }

}

public record SetCounter(int Counter);


public record Test : IAggregate<Guid, Test, TestCommands>
{
public Guid Id { get; init; }
public int Version { get; init; }
public int Counter { get; init; }

public static Result<Test> Create(TestCommands command)
=> new Test() { Id = command.Id, Version = 0 };

public static Result<Test> Apply(Test state, TestCommands command)
{
var result = command.CommandName switch
{
nameof(Create) => Create(command),
nameof(Increment) => state.Increment(),
nameof(Decrement) => state.Decrement(),
nameof(SetCounter) => state.SetCounter(command.SetCounter!),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add null check for SetCounter command

The null-forgiving operator ! is used on command.SetCounter, but the SetCounter property is nullable. This could cause a NullReferenceException if the property is null when the SetCounter command is processed.

-            nameof(SetCounter) => state.SetCounter(command.SetCounter!),
+            nameof(SetCounter) => command.SetCounter is not null 
+                ? state.SetCounter(command.SetCounter) 
+                : Result.Failed(state, "SetCounter command payload is required"),

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In examples/Examples/Aggregate/TestAggregate.cs at line 32, the code uses the
null-forgiving operator on the nullable property command.SetCounter, which risks
a NullReferenceException if it is null. Modify the code to add an explicit null
check for command.SetCounter before calling state.SetCounter, and handle the
null case appropriately to avoid exceptions.

_ => Result.Failed(state, "Unknown command: " + command.CommandName)
};

if (result.IsSuccess)
{
return result.State with { Version = state.Version + 1 };
}

return result;
}

public Result<Test> Increment()
{
if (Counter >= 100)
{
return Result.Failed(this, "Counter cannot exceed 100.");
}

return this with
{
Counter = Counter + 1
};
}

public Result<Test> Decrement()
{
if (Counter <= 0)
{
return Result.Failed(this, "Counter cannot be less than 0.");
}

return this with
{
Counter = Counter - 1
};
}

public Result<Test> SetCounter(SetCounter cmd)
{
if(cmd.Counter < 0)
{
return Result.Failed(this, "Counter cannot be less than 0.");
}

if(cmd.Counter > 100)
{
return Result.Failed(this, "Counter connot be more then 100.");
}

return this with
{
Counter = cmd.Counter
};
}
}
5 changes: 0 additions & 5 deletions examples/Examples/Examples.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@
<UserSecretsId>4ccc947d-36d3-4464-b8f8-695155c5984f</UserSecretsId>
</PropertyGroup>

<ItemGroup>
<Compile Remove="InnerJoin.cs" />
<Compile Remove="LeftJoin.cs" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\MinimalKafka.RocksDB\MinimalKafka.RocksDB.csproj" />
<ProjectReference Include="..\..\src\MinimalKafka\MinimalKafka.csproj" />
Expand Down
Loading
Loading