Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@
<PackageVersion Include="SonarAnalyzer.CSharp" Version="10.8.0.113526" />
<PackageVersion Include="System.Linq.Async" Version="6.0.1" />
<PackageVersion Include="Microsoft.Sbom.Targets" Version="3.0.0" />
<PackageVersion Include="RocksDB" Version="9.10.0.55496" />
</ItemGroup>
</Project>
22 changes: 21 additions & 1 deletion MinimalKafka.sln
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.0.31903.59
Expand Down Expand Up @@ -42,6 +41,12 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{6E4099F2-3
docs\toc.yml = docs\toc.yml
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MinimalKafka.RocksDB", "src\MinimalKafka.RocksDB\MinimalKafka.RocksDB.csproj", "{73E4A183-B305-4450-AC5D-3CAD94FEFE0E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MinimalKafka.RockDB.Tests", "test\MinimalKafka.RockDB.Tests\MinimalKafka.RockDB.Tests.csproj", "{98C13E58-329B-4574-9DB0-A569DEC0ECC2}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaAdventure", "examples\KafkaAdventure\KafkaAdventure.csproj", "{36D263AF-C209-437D-9528-2DD7BF78B0D8}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -60,6 +65,18 @@ Global
{CC13EBD7-29C8-4619-8CCB-0D7DC0F8AD96}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CC13EBD7-29C8-4619-8CCB-0D7DC0F8AD96}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CC13EBD7-29C8-4619-8CCB-0D7DC0F8AD96}.Release|Any CPU.Build.0 = Release|Any CPU
{73E4A183-B305-4450-AC5D-3CAD94FEFE0E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{73E4A183-B305-4450-AC5D-3CAD94FEFE0E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{73E4A183-B305-4450-AC5D-3CAD94FEFE0E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{73E4A183-B305-4450-AC5D-3CAD94FEFE0E}.Release|Any CPU.Build.0 = Release|Any CPU
{98C13E58-329B-4574-9DB0-A569DEC0ECC2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{98C13E58-329B-4574-9DB0-A569DEC0ECC2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{98C13E58-329B-4574-9DB0-A569DEC0ECC2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{98C13E58-329B-4574-9DB0-A569DEC0ECC2}.Release|Any CPU.Build.0 = Release|Any CPU
{36D263AF-C209-437D-9528-2DD7BF78B0D8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{36D263AF-C209-437D-9528-2DD7BF78B0D8}.Debug|Any CPU.Build.0 = Debug|Any CPU
{36D263AF-C209-437D-9528-2DD7BF78B0D8}.Release|Any CPU.ActiveCfg = Release|Any CPU
{36D263AF-C209-437D-9528-2DD7BF78B0D8}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -68,6 +85,9 @@ Global
{AD3D9E12-10F9-463D-A918-DDCF6286409E} = {A09FD86F-4832-467D-96BD-74DDD5066B2E}
{C58EB934-4DB2-4572-AEAE-9329057A94A7} = {1C8F1D9A-608B-4F46-8A59-3CF48AA4AF9D}
{CC13EBD7-29C8-4619-8CCB-0D7DC0F8AD96} = {FF927BA2-E4C3-4D02-ACD4-251E68C149E4}
{73E4A183-B305-4450-AC5D-3CAD94FEFE0E} = {A09FD86F-4832-467D-96BD-74DDD5066B2E}
{98C13E58-329B-4574-9DB0-A569DEC0ECC2} = {1C8F1D9A-608B-4F46-8A59-3CF48AA4AF9D}
{36D263AF-C209-437D-9528-2DD7BF78B0D8} = {FF927BA2-E4C3-4D02-ACD4-251E68C149E4}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {E3EFB6EF-B785-4560-A9DB-E5BB7DADBFF4}
Expand Down
3 changes: 2 additions & 1 deletion examples/Examples/Examples.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
Expand All @@ -13,6 +13,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\MinimalKafka.RocksDB\MinimalKafka.RocksDB.csproj" />
<ProjectReference Include="..\..\src\MinimalKafka\MinimalKafka.csproj" />
</ItemGroup>

Expand Down
43 changes: 26 additions & 17 deletions examples/Examples/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,25 @@
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddMinimalKafka(config =>
{
config
.WithBootstrapServers("localhost:19092")
.WithOffsetReset(AutoOffsetReset.Earliest)
.WithPartitionAssignedHandler((_, p) => {
return p.Select(tp => new TopicPartitionOffset(tp, Offset.Beginning));
})
.WithJsonSerializers()
.WithInMemoryStore();
});
{
config
.WithConfiguration(builder.Configuration.GetSection("Kafka"))
.WithOffsetReset(AutoOffsetReset.Earliest)
.WithPartitionAssignedHandler((_, p) => p.Select(tp => new TopicPartitionOffset(tp, Offset.Beginning)))
.WithJsonSerializers()
.UseRocksDB();

});

var app = builder.Build();

app.MapTopic("my-topic", ([FromKey] string key, [FromValue] string value) =>
{
Console.WriteLine($"Received: {key} - {value}");

Console.WriteLine("##################");
Console.WriteLine("my-topic");
Console.WriteLine("##################");
});

app.MapStream<Guid, LeftObject>("left")
Expand All @@ -32,13 +35,22 @@
{
var (left, right) = v;

Console.WriteLine("##################");
Console.WriteLine("LEFT Join Right");
Console.WriteLine("##################");

return Task.CompletedTask;
}).WithGroupId("group1");

app.MapStream<Guid, LeftObject>("left")
.Into(async (c, k, v) =>
{
v = v with { RightObjectId = 2 };

Console.WriteLine("##################");
Console.WriteLine("LEFT INTO UPDATE");
Console.WriteLine("##################");

await c.ProduceAsync("left-update", k, v);
}).WithGroupId("group2");

Expand All @@ -49,18 +61,15 @@
{
var (left, right) = v;

Console.WriteLine("##################");
Console.WriteLine("RIGHT JOIN LEFT");
Console.WriteLine("##################");

return Task.CompletedTask;
})
.WithGroupId("group3");


app.MapStream<int, RightObject>("right")
.Join<Guid, LeftObject>("left").On((k, v) => k, (k, v) => v.RightObjectId)
.Into((c, k, v) =>
{
throw new InvalidOperationException("this will not commit");
})
.WithGroupId("group4");



Expand Down
14 changes: 14 additions & 0 deletions examples/KafkaAdventure/Extensions/KafkaBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using MinimalKafka.Builders;
using MinimalKafka.Extension;

namespace KafkaAdventure.Extensions;

public static class KafkaBuilderExtensions
{
public static IKafkaConventionBuilder AsFeature(this IKafkaConventionBuilder builder, string featureName)
{
builder.WithClientId(featureName);
builder.WithGroupId(featureName + "-" + Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT"));
return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
using KafkaAdventure.Extensions;
using MinimalKafka.Extension;
using MinimalKafka.Stream;

namespace KafkaAdventure.Features.CommandProcessor;

public static class ProcessorFeature
{
public static void MapProcessor(this WebApplication app)
{
Console.WriteLine("Starting Up ProcessorFeature");

app.MapStream<string, Command>("game-commands")
.SplitInto(x =>
{
x.Branch(
(k, v) => v.IsCommand("HELP"),
(c,k,v) => c.ProduceAsync("game-response", k,
new Response(v.Cmd, "Commands: go [north/south/east/west], look, take [item], inventory"))
Comment on lines +18 to +19
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for async operations.

The ProduceAsync calls lack error handling, which could lead to silent failures in production environments.

Consider adding try-catch blocks or using a consistent error handling pattern:

-x.Branch(
-    (k, v) => v.IsCommand("HELP"),
-    (c,k,v) => c.ProduceAsync("game-response", k, 
-    new Response(v.Cmd, "Commands: go [north/south/east/west], look, take [item], inventory"))
-);
+x.Branch(
+    (k, v) => v.IsCommand("HELP"),
+    async (c,k,v) => {
+        try {
+            await c.ProduceAsync("game-response", k, 
+                new Response(v.Cmd, "Commands: go [north/south/east/west], look, take [item], inventory"));
+        } catch (Exception ex) {
+            Console.WriteLine($"Error producing HELP response: {ex.Message}");
+        }
+    }
+);

Also applies to: 24-25, 30-32, 37-37, 42-43, 47-48

🤖 Prompt for AI Agents
In examples/KafkaAdventure/Features/CommandProcessor/ProcessorFeature.cs around
lines 18-19 and also lines 24-25, 30-32, 37, 42-43, and 47-48, the ProduceAsync
calls lack error handling which can cause silent failures. Wrap each
ProduceAsync call in a try-catch block to catch exceptions and handle or log
errors appropriately to ensure failures are detected and managed.

);

x.Branch(
(k, v) => v.IsCommand("GO"),
(c, k, v) => c.ProduceAsync("game-movement", k,
new { v.Cmd, Direction = string.Join("", v.Args) })
);
Comment on lines +24 to +26
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Review the direction concatenation logic.

The string.Join("", v.Args) concatenates direction arguments without spaces, which could cause issues with multi-word directions. For example, "go north east" would become "northeast" instead of "north east".

Consider using a space separator or taking only the first argument:

-new { v.Cmd, Direction = string.Join("", v.Args) })
+new { v.Cmd, Direction = string.Join(" ", v.Args) })

Or if only single-word directions are expected:

-new { v.Cmd, Direction = string.Join("", v.Args) })
+new { v.Cmd, Direction = v.Args.FirstOrDefault() ?? "" })
🤖 Prompt for AI Agents
In examples/KafkaAdventure/Features/CommandProcessor/ProcessorFeature.cs around
lines 24 to 26, the direction concatenation uses string.Join with an empty
separator, which merges multiple direction words without spaces. To fix this,
change the separator in string.Join from "" to " " so that multi-word directions
remain properly spaced, e.g., string.Join(" ", v.Args). Alternatively, if only
single-word directions are expected, use just the first argument from v.Args
instead of joining all.


x.Branch(
(k, v) => v.IsCommand("LOOK"),
(c, k, v) => c.ProduceAsync("game-movement", k,
new { Cmd = "GO", Direction = "LOOK" }
)
);

x.Branch(
(k, v) => v.IsCommand("INVENTORY"),
(c, k, v) => c.ProduceAsync("game-inventory", k, v)
);

x.Branch(
(k, v) => v.IsCommand("ECHO"),
(c, k, v) => c.ProduceAsync("game-response", k,
new Response(v.Cmd, $"ECHO: {string.Join(' ', v.Args)}"))
);

x.DefaultBranch((c, k, v)
=> c.ProduceAsync("game-response", k,
new Response(v.Cmd, $"The command '{v.Cmd}' is invalid!")));
})
.AsFeature("Commands");
}

public record Command(string Cmd, string[] Args)
{
public bool IsCommand(string s) => Cmd.StartsWith(s, StringComparison.InvariantCultureIgnoreCase);
};

public record Response(string Command, string Value);
}
26 changes: 26 additions & 0 deletions examples/KafkaAdventure/Features/Input/InputFeature.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using KafkaAdventure.Extensions;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.SignalR;
using MinimalKafka;
using MinimalKafka.Metadata;

namespace KafkaAdventure.Features.Input;

public static class InputFeature
{
public static void MapInput<T>(this T app)
where T : IEndpointRouteBuilder, IApplicationBuilder
{
Console.WriteLine("Starting Up InputFeature");
app.MapHub<InputHub>("/input");
app.MapTopic("game-response", HandleAsync)
.AsFeature("Input");
}

public static async Task HandleAsync([FromServices] IHubContext<InputHub> hub, [FromKey] string gameId, [FromValue] Response response)
{
await hub.Clients.Group(gameId).SendAsync("ReceiveMessage", response.Value);
}
Comment on lines +20 to +23
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling to the HandleAsync method

The HandleAsync method lacks error handling for the SignalR hub operation, which could lead to unhandled exceptions.

    public static async Task HandleAsync([FromServices] IHubContext<InputHub> hub, [FromKey] string gameId, [FromValue] Response response)
    {
+       try {
            await hub.Clients.Group(gameId).SendAsync("ReceiveMessage", response.Value);
+       } catch (Exception ex) {
+           // Get ILogger from DI or use a static logger
+           Console.Error.WriteLine($"Error sending message to group {gameId}: {ex.Message}");
+       }
    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public static async Task HandleAsync([FromServices] IHubContext<InputHub> hub, [FromKey] string gameId, [FromValue] Response response)
{
await hub.Clients.Group(gameId).SendAsync("ReceiveMessage", response.Value);
}
public static async Task HandleAsync([FromServices] IHubContext<InputHub> hub, [FromKey] string gameId, [FromValue] Response response)
{
try {
await hub.Clients.Group(gameId).SendAsync("ReceiveMessage", response.Value);
} catch (Exception ex) {
// Get ILogger from DI or use a static logger
Console.Error.WriteLine($"Error sending message to group {gameId}: {ex.Message}");
}
}


public record Response(string Command, string Value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Duplicate record definition - Response is already defined in InputHub.cs

The Response record is already defined in InputHub.cs. Having duplicate definitions can lead to confusion and potential bugs. Consider removing this definition and using the one from InputHub.cs.

-    public record Response(string Command, string Value);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public record Response(string Command, string Value);

}
36 changes: 36 additions & 0 deletions examples/KafkaAdventure/Features/Input/InputHub.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using Confluent.Kafka;
using Microsoft.AspNetCore.SignalR;

namespace KafkaAdventure.Features;

public class InputHub(
IProducer<string, Response> response,

Check warning on line 7 in examples/KafkaAdventure/Features/Input/InputHub.cs

View workflow job for this annotation

GitHub Actions / Build

Parameter 'response' is unread.

Check warning on line 7 in examples/KafkaAdventure/Features/Input/InputHub.cs

View workflow job for this annotation

GitHub Actions / Build

Parameter 'response' is unread.

Check warning on line 7 in examples/KafkaAdventure/Features/Input/InputHub.cs

View workflow job for this annotation

GitHub Actions / Build

Parameter 'response' is unread.

Check warning on line 7 in examples/KafkaAdventure/Features/Input/InputHub.cs

View workflow job for this annotation

GitHub Actions / Build

Parameter 'response' is unread.

Check warning on line 7 in examples/KafkaAdventure/Features/Input/InputHub.cs

View workflow job for this annotation

GitHub Actions / Analyze

Parameter 'response' is unread.

Check warning on line 7 in examples/KafkaAdventure/Features/Input/InputHub.cs

View workflow job for this annotation

GitHub Actions / Analyze

Parameter 'response' is unread.
IProducer<string, Command> command
) : Hub
Comment on lines +6 to +9
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Unused parameter warning - the 'response' parameter is never used

The constructor parameter response is injected but never used in any method. Consider removing it if not needed or using it for message responses.

public class InputHub(
-    IProducer<string, Response> response,
    IProducer<string, Command> command
) : Hub
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public class InputHub(
IProducer<string, Response> response,
IProducer<string, Command> command
) : Hub
public class InputHub(
IProducer<string, Command> command
) : Hub
🧰 Tools
🪛 GitHub Check: Build

[warning] 7-7:
Parameter 'response' is unread.


[warning] 7-7:
Parameter 'response' is unread.

{
public async Task JoinGame(string gameId)
{
await Groups.AddToGroupAsync(Context.ConnectionId, gameId);

await Clients.Group(gameId).SendAsync("ReceiveMessage", "Welcome to 'a Kafka Adventure'");
await Clients.Group(gameId).SendAsync("ReceiveMessage", "You are an aspiring adventurer in search of the legendary relic.");
await Clients.Group(gameId).SendAsync("ReceiveMessage", "Type your commands to explore the world. Type 'help' for a list of commands.");
}

public async Task SendMessage(string gameId, string message)
{
if(string.IsNullOrWhiteSpace(message))
{
return;
}

var cmd = message.Split(' ');
await command.ProduceAsync("game-commands", new()
{
Key = gameId,
Value = new Command(cmd.First(), [.. cmd.Skip(1)])
});
}
}
public record Response(string Command, string Value);
public record Command(string cmd, string[] Args);
9 changes: 9 additions & 0 deletions examples/KafkaAdventure/Features/Locations/Location.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace KafkaAdventure.Features.Locations;

public class Location
{
public int Id { get; set; }
public required string Name { get; set; }
public required string Description { get; set; }
public required Dictionary<string, string> Exits { get; set; }
}
28 changes: 28 additions & 0 deletions examples/KafkaAdventure/Features/Locations/LocationContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System.Reflection;
using System.Text.Json;

namespace KafkaAdventure.Features.Locations;

public class LocationContext
{
public LocationContext()
{
Locations = [
new Location() {
Id = 1,
Name = "The Forest",
Description = "You are in a dark forest. The trees are tall and the air is damp.",
Exits = new Dictionary<string, string> {
{ "north", "The Forest" },
{ "south", "The Forest" },
{ "east", "The Forest" },
{ "west", "The Forest" }
}
}];

}

public List<Location> Locations { get; set; }


}
Loading
Loading