Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
<!-- For more info on central package management go to https://devblogs.microsoft.com/nuget/introducing-central-package-management/ -->
<Project>
<PropertyGroup>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
</PropertyGroup>
<ItemGroup>
<PackageVersion Include="AwesomeAssertions" Version="8.1.0" />
<PackageVersion Include="coverlet.collector" Version="6.0.4" />
<PackageVersion Include="FluentAssertions" Version="7.0.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.13.0" />
<PackageVersion Include="NSubstitute" Version="5.3.0" />
<PackageVersion Include="xunit" Version="2.9.3" />
<PackageVersion Include="xunit.runner.visualstudio" Version="3.0.2" />
<PackageVersion Include="Confluent.Kafka" Version="2.10.0" />
<PackageVersion Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.3.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.Common" Version="4.10.0" />
<PackageVersion Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.2" />
<PackageVersion Include="SonarAnalyzer.CSharp" Version="10.8.0.113526" />
<PackageVersion Include="System.Linq.Async" Version="6.0.1" />
<PackageVersion Include="Microsoft.Sbom.Targets" Version="3.0.0" />
<PackageVersion Include="RocksDB" Version="9.10.0.55496" />
</ItemGroup>
<PropertyGroup>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
</PropertyGroup>
<ItemGroup>
<PackageVersion Include="AwesomeAssertions" Version="9.1.0" />
<PackageVersion Include="coverlet.collector" Version="6.0.4" />
<PackageVersion Include="FluentAssertions" Version="7.0.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.14.1" />
<PackageVersion Include="NSubstitute" Version="5.3.0" />
<PackageVersion Include="xunit" Version="2.9.3" />
<PackageVersion Include="xunit.runner.visualstudio" Version="3.1.1" />
<PackageVersion Include="Confluent.Kafka" Version="2.11.0" />
<PackageVersion Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.3.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.Common" Version="4.10.0" />
<PackageVersion Include="Microsoft.Extensions.Hosting" Version="9.0.7" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.7" />
<PackageVersion Include="SonarAnalyzer.CSharp" Version="10.13.0.120203" />
<PackageVersion Include="System.Linq.Async" Version="6.0.3" />
<PackageVersion Include="Microsoft.Sbom.Targets" Version="4.0.3" />
<PackageVersion Include="RocksDB" Version="10.2.1.58549" />
</ItemGroup>
</Project>
7 changes: 7 additions & 0 deletions MinimalKafka.sln
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaAdventure", "examples\
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MinimalKafka.RocksDB.Tests", "test\MinimalKafka.RocksDB.Tests\MinimalKafka.RocksDB.Tests.csproj", "{AFF59BB3-8ECD-AC56-2421-5251AD2F50E1}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MinimalKafka.Aggregates", "src\MinimalKafka.Aggregates\MinimalKafka.Aggregates.csproj", "{C96993B7-7A2E-4F79-8D05-357E72AABADF}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -77,6 +79,10 @@ Global
{AFF59BB3-8ECD-AC56-2421-5251AD2F50E1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AFF59BB3-8ECD-AC56-2421-5251AD2F50E1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AFF59BB3-8ECD-AC56-2421-5251AD2F50E1}.Release|Any CPU.Build.0 = Release|Any CPU
{C96993B7-7A2E-4F79-8D05-357E72AABADF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C96993B7-7A2E-4F79-8D05-357E72AABADF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C96993B7-7A2E-4F79-8D05-357E72AABADF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C96993B7-7A2E-4F79-8D05-357E72AABADF}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -88,6 +94,7 @@ Global
{73E4A183-B305-4450-AC5D-3CAD94FEFE0E} = {A09FD86F-4832-467D-96BD-74DDD5066B2E}
{36D263AF-C209-437D-9528-2DD7BF78B0D8} = {FF927BA2-E4C3-4D02-ACD4-251E68C149E4}
{AFF59BB3-8ECD-AC56-2421-5251AD2F50E1} = {1C8F1D9A-608B-4F46-8A59-3CF48AA4AF9D}
{C96993B7-7A2E-4F79-8D05-357E72AABADF} = {A09FD86F-4832-467D-96BD-74DDD5066B2E}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {E3EFB6EF-B785-4560-A9DB-E5BB7DADBFF4}
Expand Down
105 changes: 105 additions & 0 deletions examples/Examples/Aggregate/Test.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
using MinimalKafka.Aggregates;

namespace Examples.Aggregate;

/// <summary>
/// Example aggregate for testing purposes. Supports increment, decrement, and set operations on a counter.
/// </summary>
public record Test : IAggregate<Guid, Test, TestCommands>
{
/// <summary>
/// Gets the aggregate identifier.
/// </summary>
public Guid Id { get; init; }

/// <summary>
/// Gets the aggregate version.
/// </summary>
public int Version { get; init; }

/// <summary>
/// Gets the current counter value.
/// </summary>
public int Counter { get; init; }

/// <summary>
/// Creates a new instance of <see cref="Test"/> aggregate from a command.
/// </summary>
/// <param name="command">The command to initialize the aggregate.</param>
/// <returns>A new <see cref="Test"/> aggregate wrapped in a <see cref="Result{Test}"/>.</returns>
public static Result<Test> Create(TestCommands command)
=> new Test() { Id = command.Id, Version = 0 };

/// <summary>
/// Applies a command to the current state and returns the resulting state.
/// </summary>
/// <param name="state">The current aggregate state.</param>
/// <param name="command">The command to apply.</param>
/// <returns>The new state as a <see cref="Result{Test}"/>.</returns>
public static Result<Test> Apply(Test state, TestCommands command)
{
var result = command.CommandName switch
{
nameof(Create) => Create(command),
nameof(Increment) => state.Increment(),
nameof(Decrement) => state.Decrement(),
nameof(SetCounter) => state.SetCounter(command.SetCounter!),
_ => Result.Failed(state, "Unknown command: " + command.CommandName)
};

if (result.IsSuccess)
{
return result.State with { Version = state.Version + 1 };
}

return result;
}

/// <summary>
/// Increments the counter by one.
/// </summary>
/// <returns>New state if successful, or failed result if out of bounds.</returns>
public Result<Test> Increment()
{
if (Counter >= 100)
{
return Result.Failed(this, "Counter cannot exceed 100.");
}

return this with { Counter = Counter + 1 };
}

/// <summary>
/// Decrements the counter by one.
/// </summary>
/// <returns>New state if successful, or failed result if out of bounds.</returns>
public Result<Test> Decrement()
{
if (Counter <= 0)
{
return Result.Failed(this, "Counter cannot be less than 0.");
}

return this with { Counter = Counter - 1 };
}

/// <summary>
/// Sets the counter to a specific value using the <see cref="SetCounter"/> command.
/// </summary>
/// <param name="cmd">The command containing the new counter value.</param>
/// <returns>New state if within bounds, or failed result otherwise.</returns>
public Result<Test> SetCounter(SetCounter cmd)
{
if (cmd.Counter < 0)
{
return Result.Failed(this, "Counter cannot be less than 0.");
}

if (cmd.Counter > 100)
{
return Result.Failed(this, "Counter cannot be more than 100.");
}

return this with { Counter = cmd.Counter };
}
}
14 changes: 14 additions & 0 deletions examples/Examples/Aggregate/TestCommands.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using MinimalKafka.Aggregates;

namespace Examples.Aggregate;

public class TestCommands : IAggregateCommands<Guid>
{
public Guid Id { get; init; } = Guid.NewGuid();
public required int Version { get; init; }
public required string CommandName { get; init; }
public SetCounter? SetCounter { get; set; }

}

public record SetCounter(int Counter);
6 changes: 1 addition & 5 deletions examples/Examples/Examples.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@
</PropertyGroup>

<ItemGroup>
<Compile Remove="InnerJoin.cs" />
<Compile Remove="LeftJoin.cs" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\MinimalKafka.Aggregates\MinimalKafka.Aggregates.csproj" />
<ProjectReference Include="..\..\src\MinimalKafka.RocksDB\MinimalKafka.RocksDB.csproj" />
<ProjectReference Include="..\..\src\MinimalKafka\MinimalKafka.csproj" />
</ItemGroup>
Expand Down
97 changes: 57 additions & 40 deletions examples/Examples/Program.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
using Confluent.Kafka;
using Examples;
using Examples.Aggregate;
using MinimalKafka;
using MinimalKafka.Extension;
using MinimalKafka.Serializers;
using MinimalKafka.Aggregates;
using MinimalKafka.Stream;

var builder = WebApplication.CreateBuilder(args);
Expand All @@ -11,66 +11,83 @@
{
config
.WithConfiguration(builder.Configuration.GetSection("Kafka"))
.WithBootstrapServers("nas:9092")
.WithGroupId(AppDomain.CurrentDomain.FriendlyName)
.WithClientId(AppDomain.CurrentDomain.FriendlyName)
.WithTransactionalId(AppDomain.CurrentDomain.FriendlyName)
Comment on lines +15 to +17
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Using AppDomain.CurrentDomain.FriendlyName for multiple Kafka settings may cause issues.

Using the same value for GroupId, ClientId, and TransactionalId might not be optimal. These should typically be different values to avoid conflicts, especially in distributed environments.

-           .WithGroupId(AppDomain.CurrentDomain.FriendlyName)
-           .WithClientId(AppDomain.CurrentDomain.FriendlyName)
-           .WithTransactionalId(AppDomain.CurrentDomain.FriendlyName)
+           .WithGroupId($"{AppDomain.CurrentDomain.FriendlyName}-group")
+           .WithClientId($"{AppDomain.CurrentDomain.FriendlyName}-client")
+           .WithTransactionalId($"{AppDomain.CurrentDomain.FriendlyName}-tx")
🤖 Prompt for AI Agents
In examples/Examples/Program.cs around lines 13 to 15, the same value from
AppDomain.CurrentDomain.FriendlyName is used for GroupId, ClientId, and
TransactionalId, which can cause conflicts in Kafka settings. Update the code to
assign distinct and meaningful values to each of these properties to avoid
potential issues in distributed environments.

.WithOffsetReset(AutoOffsetReset.Earliest)
.WithPartitionAssignedHandler((_, p) => p.Select(tp => new TopicPartitionOffset(tp, Offset.Beginning)))
.WithJsonSerializers()
.UseRocksDB();
.UseRocksDB(x =>
{
x.DataPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "RocksDB");
});

});

var app = builder.Build();

app.MapTopic("my-topic", ([FromKey] string key, [FromValue] string value) =>
{
Console.WriteLine($"Received: {key} - {value}");

Console.WriteLine("##################");
Console.WriteLine("my-topic");
Console.WriteLine("##################");
});
app.MapAggregate<Test, Guid, TestCommands>("tests");

app.MapStream<Guid, LeftObject>("left")
.Join<int, RightObject>("right").On((l, r) => l.RightObjectId == r.Id)
.Into((c, v) =>
{
var (left, right) = v;

Console.WriteLine("##################");
Console.WriteLine("LEFT Join Right");
Console.WriteLine("##################");
//app.MapTopic("my-topic", ([FromKey] string key, [FromValue] string value) =>
//{
// Console.WriteLine($"Received: {key} - {value}");

return Task.CompletedTask;
}).WithGroupId("group1");
// Console.WriteLine("##################");
// Console.WriteLine("my-topic");
// Console.WriteLine("##################");
//});

app.MapStream<Guid, LeftObject>("left")
.Into(async (c, k, v) =>
{
v = v with { RightObjectId = 2 };

Console.WriteLine("##################");
Console.WriteLine("LEFT INTO UPDATE");
Console.WriteLine("##################");
//app.MapTopic("my-topic", ([FromKey] string key, [FromValue] string value) =>
//{
// Console.WriteLine($"Received: {key} - {value}");

await c.ProduceAsync("left-update", k, v);
}).WithGroupId("group2");
// Console.WriteLine("##################");
// Console.WriteLine("my-topic");
// Console.WriteLine("##################");
//});

//app.MapStream<Guid, LeftObject>("left")
// .Join<int, RightObject>("right").On((l, r) => l.RightObjectId == r.Id)
// .Into((c, v) =>
// {
// var (left, right) = v;

app.MapStream<int, RightObject>("right")
.Join<Guid, LeftObject>("left").On((k, v) => k, (k, v) => v.RightObjectId)
.Into((c, k, v) =>
{
var (left, right) = v;
// Console.WriteLine("##################");
// Console.WriteLine("LEFT Join Right");
// Console.WriteLine("##################");

Console.WriteLine("##################");
Console.WriteLine("RIGHT JOIN LEFT");
Console.WriteLine("##################");
// return Task.CompletedTask;
// });

return Task.CompletedTask;
})
.WithGroupId("group3");
//app.MapStream<Guid, LeftObject>("left")
// .Into(async (c, k, v) =>
// {
// v = v with { RightObjectId = 2 };

// Console.WriteLine("##################");
// Console.WriteLine("LEFT INTO UPDATE");
// Console.WriteLine("##################");

// await c.ProduceAsync("left-update", k, v);
// });


//app.MapStream<int, RightObject>("right")
// .Join<Guid, LeftObject>("left").On((k, v) => k, (k, v) => v.RightObjectId)
// .Into((c, k, v) =>
// {
// var (left, right) = v;

// Console.WriteLine("##################");
// Console.WriteLine("RIGHT JOIN LEFT");
// Console.WriteLine("##################");

// return Task.CompletedTask;
// });


await app.RunAsync();
14 changes: 0 additions & 14 deletions examples/KafkaAdventure/Extensions/KafkaBuilderExtensions.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using KafkaAdventure.Extensions;
using MinimalKafka.Extension;
using MinimalKafka;
using MinimalKafka.Stream;

namespace KafkaAdventure.Features.CommandProcessor;
Expand All @@ -16,7 +15,7 @@ public static void MapProcessor(this WebApplication app)
x.Branch(
(k, v) => v.IsCommand("HELP"),
(c,k,v) => c.ProduceAsync("game-response", k,
new Response(v.Cmd, "Commands: go [north/south/east/west], look, take [item], inventory"))
new Response(v.Cmd, "Commands: go [north/south/east/west], look, take [item], inventory"))
);

x.Branch(
Expand Down Expand Up @@ -45,9 +44,8 @@ public static void MapProcessor(this WebApplication app)

x.DefaultBranch((c, k, v)
=> c.ProduceAsync("game-response", k,
new Response(v.Cmd, $"The command '{v.Cmd}' is invalid!")));
})
.AsFeature("Commands");
new Response(v.Cmd, $"The command '{v.Cmd}' is invalid!")));
});
}

public record Command(string Cmd, string[] Args)
Expand Down
Loading
Loading