Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion examples/KafkaAdventure/Extensions/KafkaBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
using MinimalKafka.Builders;
using MinimalKafka.Builders;
using MinimalKafka.Extension;

namespace KafkaAdventure.Extensions;

public static class KafkaBuilderExtensions
{
/// <summary>
/// Configures the Kafka convention builder with a client ID set to the specified feature name and a group ID combining the feature name and the current ASP.NET Core environment.
/// </summary>
/// <param name="featureName">The name to use as the Kafka client ID and as part of the group ID.</param>
/// <returns>The modified <see cref="IKafkaConventionBuilder"/> instance.</returns>
public static IKafkaConventionBuilder AsFeature(this IKafkaConventionBuilder builder, string featureName)
{
builder.WithClientId(featureName);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
using KafkaAdventure.Extensions;
using KafkaAdventure.Extensions;
using MinimalKafka.Extension;
using MinimalKafka.Stream;

namespace KafkaAdventure.Features.CommandProcessor;

public static class ProcessorFeature
{
/// <summary>
/// Configures the Kafka stream processor for handling game commands on the web application.
/// </summary>
/// <remarks>
/// Sets up processing for the "game-commands" Kafka topic, routing incoming commands to appropriate topics based on their type:
/// - "HELP" commands produce a list of available commands to "game-response".
/// - "GO" and "LOOK" commands produce movement instructions to "game-movement".
/// - "INVENTORY" commands are forwarded to "game-inventory".
/// - "ECHO" commands produce an echo response to "game-response".
/// - Unrecognized commands produce an error response to "game-response".
/// Registers the entire pipeline as a feature named "Commands".
/// </remarks>
public static void MapProcessor(this WebApplication app)
{
Console.WriteLine("Starting Up ProcessorFeature");
Expand Down Expand Up @@ -52,7 +64,12 @@ public static void MapProcessor(this WebApplication app)

public record Command(string Cmd, string[] Args)
{
public bool IsCommand(string s) => Cmd.StartsWith(s, StringComparison.InvariantCultureIgnoreCase);
/// <summary>
/// Determines whether the command string starts with the specified value, ignoring case.
/// </summary>
/// <param name="s">The string to compare against the start of the command.</param>
/// <returns>True if the command starts with the specified string; otherwise, false.</returns>
public bool IsCommand(string s) => Cmd.StartsWith(s, StringComparison.InvariantCultureIgnoreCase);
};

public record Response(string Command, string Value);
Expand Down
11 changes: 10 additions & 1 deletion examples/KafkaAdventure/Features/Input/InputFeature.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using KafkaAdventure.Extensions;
using KafkaAdventure.Extensions;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.SignalR;
using MinimalKafka;
Expand All @@ -8,6 +8,10 @@ namespace KafkaAdventure.Features.Input;

public static class InputFeature
{
/// <summary>
/// Configures the application to map the InputHub SignalR hub and the "game-response" Kafka topic for input handling.
/// </summary>
/// <typeparam name="T">The application type implementing both IEndpointRouteBuilder and IApplicationBuilder.</typeparam>
public static void MapInput<T>(this T app)
where T : IEndpointRouteBuilder, IApplicationBuilder
{
Expand All @@ -17,6 +21,11 @@ public static void MapInput<T>(this T app)
.AsFeature("Input");
}

/// <summary>
/// Sends a response message to all SignalR clients in the specified game group.
/// </summary>
/// <param name="gameId">The identifier of the SignalR group representing the game session.</param>
/// <param name="response">The response containing the message to send to clients.</param>
public static async Task HandleAsync([FromServices] IHubContext<InputHub> hub, [FromKey] string gameId, [FromValue] Response response)
{
await hub.Clients.Group(gameId).SendAsync("ReceiveMessage", response.Value);
Expand Down
11 changes: 10 additions & 1 deletion examples/KafkaAdventure/Features/Input/InputHub.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Confluent.Kafka;
using Confluent.Kafka;
using Microsoft.AspNetCore.SignalR;

namespace KafkaAdventure.Features;
Expand All @@ -8,6 +8,10 @@ public class InputHub(
IProducer<string, Command> command
) : Hub
{
/// <summary>
/// Adds the current connection to the specified game group and sends introductory messages to all clients in that group.
/// </summary>
/// <param name="gameId">The identifier of the game group to join.</param>
public async Task JoinGame(string gameId)
{
await Groups.AddToGroupAsync(Context.ConnectionId, gameId);
Expand All @@ -17,6 +21,11 @@ public async Task JoinGame(string gameId)
await Clients.Group(gameId).SendAsync("ReceiveMessage", "Type your commands to explore the world. Type 'help' for a list of commands.");
}

/// <summary>
/// Processes a client message by parsing it into a command and arguments, then produces a Command message to the "game-commands" Kafka topic for the specified game.
/// </summary>
/// <param name="gameId">The identifier of the game to which the command applies.</param>
/// <param name="message">The message from the client, expected to contain a command and optional arguments separated by spaces.</param>
public async Task SendMessage(string gameId, string message)
{
if(string.IsNullOrWhiteSpace(message))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
using System.Reflection;
using System.Reflection;
using System.Text.Json;

namespace KafkaAdventure.Features.Locations;

public class LocationContext
{
/// <summary>
/// Initializes the LocationContext with a default list containing a single forest location and its exits.
/// </summary>
public LocationContext()
{
Locations = [
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Confluent.Kafka;
using Confluent.Kafka;
using KafkaAdventure.Extensions;
using Microsoft.AspNetCore.Mvc;
using MinimalKafka.Extension;
Expand All @@ -8,6 +8,12 @@ namespace KafkaAdventure.Features.Locations;

public static class LocationsFeature
{
/// <summary>
/// Configures HTTP and Kafka stream endpoints for managing location data.
/// </summary>
/// <remarks>
/// Maps a POST endpoint at <c>/locations</c> that accepts an array of <see cref="Location"/> objects and produces them to the "game-locations" Kafka topic. Also sets up a Kafka stream consumer for the "game-locations" topic, updating or adding locations in the <see cref="LocationContext"/> based on incoming messages.
/// </remarks>
public static void MapLocations<T>(this T app)
where T : IEndpointRouteBuilder, IApplicationBuilder
{
Expand Down
13 changes: 11 additions & 2 deletions examples/KafkaAdventure/Features/Movement/MovementFeature.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using KafkaAdventure.Extensions;
using KafkaAdventure.Extensions;
using KafkaAdventure.Features.Locations;
using MinimalKafka.Extension;
using MinimalKafka.Stream;
Expand All @@ -7,6 +7,10 @@ namespace KafkaAdventure.Features.Movement;

public static class MovementFeature
{
/// <summary>
/// Configures a Kafka stream processing pipeline for handling player movement commands in a game, joining movement events with player positions and producing appropriate responses and location updates.
/// </summary>
/// <typeparam name="T">The application type implementing both <see cref="IEndpointRouteBuilder"/> and <see cref="IApplicationBuilder"/>.</typeparam>
public static void MapMovement<T>(this T app)
where T : IEndpointRouteBuilder, IApplicationBuilder
{
Expand Down Expand Up @@ -48,7 +52,12 @@ public record Position(int X, int Y);

public record class Movement(string Cmd, string Direction)
{
public bool IsCommand(string s) => Cmd.StartsWith(s, StringComparison.InvariantCultureIgnoreCase);
/// <summary>
/// Determines whether the movement command starts with the specified string, using a case-insensitive comparison.
/// </summary>
/// <param name="s">The string to compare against the start of the command.</param>
/// <returns>True if the command starts with the specified string; otherwise, false.</returns>
public bool IsCommand(string s) => Cmd.StartsWith(s, StringComparison.InvariantCultureIgnoreCase);
};

public record Response(string Command, string Value);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using KafkaAdventure.Extensions;
using KafkaAdventure.Extensions;
using KafkaAdventure.Features.Locations;
using MinimalKafka.Extension;
using MinimalKafka.Metadata;
Expand All @@ -8,6 +8,10 @@ namespace KafkaAdventure.Features.PlayerLocation;

public static class PlayerLocations
{
/// <summary>
/// Configures a Kafka stream processing pipeline that listens for player location updates and produces descriptive responses about the player's current location and available exits.
/// </summary>
/// <typeparam name="T">A type that implements <see cref="IApplicationBuilder"/>.</typeparam>
public static void MapPlayerLocations<T>(this T app)
where T : IApplicationBuilder
{
Expand Down
19 changes: 17 additions & 2 deletions examples/KafkaAdventure/StreamStore.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Caching.Memory;
using MinimalKafka.Stream;

namespace KafkaAdventure;

public class StreamStore<TKey, TValue>(IMemoryCache cache) : IStreamStore<TKey, TValue>
{
/// <summary>
/// Adds a new value to the cache for the specified key or updates the existing value using the provided functions.
/// </summary>
/// <param name="key">The key associated with the value to add or update.</param>
/// <param name="create">A function to create a new value if the key does not exist.</param>
/// <param name="update">A function to update the existing value if the key is found.</param>
/// <returns>A <see cref="ValueTask{TValue}"/> containing the added or updated value.</returns>
public ValueTask<TValue> AddOrUpdate(TKey key, Func<TKey, TValue> create, Func<TKey, TValue, TValue> update)
{
if(cache.TryGetValue(key!, out TValue? value) ){
Expand All @@ -15,9 +22,17 @@ public ValueTask<TValue> AddOrUpdate(TKey key, Func<TKey, TValue> create, Func<T
};
}

public IAsyncEnumerable<TValue> FindAsync(Func<TValue, bool> predicate)
/// <summary>
/// Returns an empty asynchronous sequence of values, ignoring the provided predicate.
/// </summary>
public IAsyncEnumerable<TValue> FindAsync(Func<TValue, bool> predicate)
=> AsyncEnumerable.Empty<TValue>();

/// <summary>
/// Retrieves the value associated with the specified key from the cache asynchronously.
/// </summary>
/// <param name="key">The key whose value should be retrieved.</param>
/// <returns>A ValueTask containing the value if found; otherwise, null.</returns>
public ValueTask<TValue?> FindByIdAsync(TKey key)
{
return new ValueTask<TValue?>(cache.Get<TValue>(key!));
Expand Down
28 changes: 27 additions & 1 deletion examples/KafkaAdventure/wwwroot/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const inputField = document.getElementById("commandInput");
const inputField = document.getElementById("commandInput");
const terminal = document.getElementById("terminal");
const gamewindow = document.getElementById("gamewindow");

Expand All @@ -18,11 +18,18 @@ inputField.addEventListener("keydown", function (event) {
});


/**
* Scrolls the element with ID "body" to its bottom.
*/
function scrollToBottom() {
const bdy = document.getElementById("body");
bdy.scrollTop = bdy.scrollHeight;
}

/**
* Sends a user command to the server and displays it in the terminal.
* @param {string} command - The command string entered by the user.
*/
function executeCommand(command) {
const gameId = localStorage.getItem('gameid');
connection.send("SendMessage", gameId, command);
Expand All @@ -33,6 +40,10 @@ function executeCommand(command) {

scrollToBottom();
}
/**
* Displays a server response in the terminal with a typewriter animation effect.
* @param {string} res - The response text to display.
*/
function addResponse(res) {
const response = document.createElement("p");
response.className = "text-green-400";
Expand All @@ -51,6 +62,12 @@ function addResponse(res) {
scrollToBottom();
}

/**
* Animates text into a specified DOM element, displaying one character at a time at a given speed.
* @param {string} elementId - The ID of the DOM element where the text will be displayed.
* @param {string} text - The text to animate.
* @param {number} speed - The delay in milliseconds between each character.
*/
function slowType(elementId, text, speed) {
let i = 0;
const element = document.getElementById(elementId);
Expand All @@ -65,6 +82,10 @@ function slowType(elementId, text, speed) {

typeWriter();
}
/**
* Generates a version 4 UUID string using timestamp and randomization.
* @return {string} A randomly generated UUID in the format xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx.
*/
function generateUUID() { // Public Domain/MIT
var d = new Date().getTime();//Timestamp
var d2 = ((typeof performance !== 'undefined') && performance.now && (performance.now() * 1000)) || 0;//Time in microseconds since page-load or 0 if unsupported
Expand All @@ -82,6 +103,11 @@ function generateUUID() { // Public Domain/MIT
}


/**
* Initializes and maintains the SignalR connection for the game session.
*
* Retrieves or generates a unique game ID, establishes the SignalR connection, sets up a handler for incoming messages, and joins the game session. If the connection fails, it retries after a delay.
*/
async function start() {
try {
let gameId = localStorage.getItem('gameid');
Expand Down
15 changes: 14 additions & 1 deletion src/MinimalKafka.RocksDB/ByteSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
using System.Text.Json;
using System.Text.Json;

namespace MinimalKafka.Stream.Storage.RocksDB;

internal class ByteSerializer : IByteSerializer
{
/// <summary>
/// Serializes the specified object to a UTF-8 encoded JSON byte array.
/// </summary>
/// <param name="value">The object to serialize. Must not be null.</param>
/// <returns>A byte array containing the UTF-8 encoded JSON representation of the object.</returns>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="value"/> is null.</exception>
public byte[] Serialize<T>(T value)
{
if (value is null)
Expand All @@ -12,6 +18,13 @@ public byte[] Serialize<T>(T value)
return JsonSerializer.SerializeToUtf8Bytes(value);
}

/// <summary>
/// Deserializes a UTF-8 encoded JSON byte array into an object of type <typeparamref name="T"/>.
/// </summary>
/// <param name="bytes">The byte array containing the JSON data to deserialize.</param>
/// <returns>The deserialized object of type <typeparamref name="T"/>.</returns>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="bytes"/> is null or empty.</exception>
/// <exception cref="InvalidOperationException">Thrown if deserialization fails and returns null.</exception>
public T Deserialize<T>(byte[]? bytes)
{
if (bytes == null || bytes.Length == 0)
Expand Down
15 changes: 12 additions & 3 deletions src/MinimalKafka.RocksDB/IByteSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace MinimalKafka.Stream.Storage.RocksDB;
namespace MinimalKafka.Stream.Storage.RocksDB;

/// <summary>
/// Interface for serializing and deserializing byte arrays.
Expand All @@ -10,14 +10,23 @@ public interface IByteSerializer
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="value"></param>
/// <returns></returns>
/// <summary>
/// Serializes the specified value into a byte array.
/// </summary>
/// <typeparam name="T">The type of the value to serialize.</typeparam>
/// <param name="value">The value to serialize.</param>
/// <returns>A byte array representing the serialized value.</returns>
byte[] Serialize<T>(T value);

/// <summary>
/// Deserializes a byte array to an object of type T.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="bytes"></param>
/// <returns></returns>
/// <summary>
/// Deserializes a byte array into an object of type <typeparamref name="T"/>.
/// </summary>
/// <param name="bytes">The byte array to deserialize, or null.</param>
/// <returns>The deserialized object of type <typeparamref name="T"/>.</returns>
T Deserialize<T>(byte[]? bytes);
}
8 changes: 6 additions & 2 deletions src/MinimalKafka.RocksDB/KafkaBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;
using MinimalKafka.Builders;
using MinimalKafka.Extension;
using MinimalKafka.Stream.Storage.RocksDB;
Expand All @@ -18,7 +18,11 @@ public static class KafkaBuilderExtensions
/// </summary>
/// <param name="builder"></param>
/// <param name="options"></param>
/// <returns></returns>
/// <summary>
/// Configures the Kafka builder to use RocksDB as the stream storage backend.
/// </summary>
/// <param name="options">A delegate to configure the <see cref="RocksDBOptions"/> for the RocksDB instance.</param>
/// <returns>The configured <see cref="IAddKafkaBuilder"/> instance for fluent chaining.</returns>
public static IAddKafkaBuilder UseRocksDB(this IAddKafkaBuilder builder, Action<RocksDBOptions> options)
{
var o = new RocksDBOptions();
Expand Down
Loading
Loading