Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/MinimalKafka.RocksDB/ByteSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@

namespace MinimalKafka.Stream.Storage.RocksDB;

internal class ByteSerializer : IByteSerializer
internal class ByteSerializer(JsonSerializerOptions options) : IByteSerializer
{
public byte[] Serialize<T>(T value)
{
if (value is null)
throw new ArgumentNullException(nameof(value));

return JsonSerializer.SerializeToUtf8Bytes(value);
return JsonSerializer.SerializeToUtf8Bytes(value, options);
}

public T Deserialize<T>(byte[]? bytes)
{
if (bytes == null || bytes.Length == 0)
throw new ArgumentNullException(nameof(bytes));

return JsonSerializer.Deserialize<T>(bytes) ?? throw new InvalidOperationException("Deserialization failed");
return JsonSerializer.Deserialize<T>(bytes, options) ?? throw new InvalidOperationException("Deserialization failed");
}
}
40 changes: 29 additions & 11 deletions src/MinimalKafka.RocksDB/KafkaBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
using Microsoft.Extensions.DependencyInjection;
using MinimalKafka.Builders;
using MinimalKafka.Builders;
using MinimalKafka.Extension;
using MinimalKafka.Stream;
using MinimalKafka.Stream.Storage.RocksDB;
using RocksDbSharp;
using System.Text.Json;

#pragma warning disable IDE0130 // Namespace does not match folder structure
namespace MinimalKafka;
Expand All @@ -18,17 +16,37 @@ public static class KafkaBuilderExtensions
/// Configures the <see cref="IAddKafkaBuilder"/> to use RocksDB as the stream store.
/// </summary>
/// <param name="builder"></param>
/// <param name="path"></param>
/// <param name="options"></param>
/// <returns></returns>
public static IAddKafkaBuilder UseRocksDB(this IAddKafkaBuilder builder, string? path = null)
public static IAddKafkaBuilder UseRocksDB(this IAddKafkaBuilder builder, Action<RocksDBOptions>? options = null)
{
var dataPath = path ??
Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData),
"RockDB");
var config = new RocksDBOptions();
options?.Invoke(config);

Directory.CreateDirectory(dataPath);
Directory.CreateDirectory(config.DataPath);

builder.WithStreamStoreFactory(new RocksDBStreamStoreFactory(dataPath));
builder.WithStreamStoreFactory(new RocksDBStreamStoreFactory(config));
return builder;
}

/// <summary>
/// Configures the specified <see cref="RocksDBOptions"/> to use a JSON-based serializer.
/// </summary>
/// <remarks>This method sets the <see cref="RocksDBOptions.Serializer"/> property to a serializer that
/// uses JSON serialization. Use the <paramref name="options"/> parameter to customize the behavior of the JSON
/// serializer, such as formatting or type handling.</remarks>
/// <param name="rockDBOptions">The <see cref="RocksDBOptions"/> instance to configure.</param>
/// <param name="options">An optional delegate to configure the <see cref="JsonSerializerOptions"/> used by the serializer. If not
/// provided, default options are used.</param>
/// <returns>The configured <see cref="RocksDBOptions"/> instance.</returns>
public static RocksDBOptions UseJsonSerializer(this RocksDBOptions rockDBOptions, Action<JsonSerializerOptions>? options = null)
{
var config = JsonSerializerOptions.Default;

options?.Invoke(config);

rockDBOptions.Serializer = new ByteSerializer(config);

return rockDBOptions;
}
}
22 changes: 22 additions & 0 deletions src/MinimalKafka.RocksDB/RocksDBOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System.Text.Json;

namespace MinimalKafka.Stream.Storage.RocksDB;

/// <summary>
/// Options for configuring RocksDB stream storage.
/// </summary>
public class RocksDBOptions
{
/// <summary>
/// Gets or sets the file system path where application data is stored.
/// </summary>
public string DataPath { get; set; } = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData),
"RockDB");

/// <summary>
/// Gets or sets the serializer used for converting objects to and from byte arrays.
/// </summary>
/// <remarks>The serializer can be customized to use different serialization formats or settings,
/// depending on the implementation of <see cref="IByteSerializer"/>.</remarks>
public IByteSerializer Serializer { get; set; } = new ByteSerializer(JsonSerializerOptions.Default);
}
17 changes: 10 additions & 7 deletions src/MinimalKafka.RocksDB/RocksDBStoreManager.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
using RocksDbSharp;
using Confluent.Kafka;
using RocksDbSharp;
using System.Collections.Concurrent;
using System.Text.Json;

namespace MinimalKafka.Stream.Storage.RocksDB;
internal sealed class RocksDBStreamStoreFactory : IDisposable, IStreamStoreFactory
{
private readonly RocksDb _db;
private readonly ConcurrentDictionary<string, ColumnFamilyHandle> _columnFamilies = new();

public RocksDBStreamStoreFactory(string dbPath)
private readonly RocksDBOptions _config;

public RocksDBStreamStoreFactory(RocksDBOptions config)
{
var options = new DbOptions()
.SetCreateIfMissing(true)
Expand All @@ -18,7 +21,7 @@ public RocksDBStreamStoreFactory(string dbPath)
string[] existingFamilies;
try
{
existingFamilies = [.. RocksDb.ListColumnFamilies(options, dbPath)];
existingFamilies = [.. RocksDb.ListColumnFamilies(options, config.DataPath)];
}
catch
{
Expand All @@ -32,15 +35,15 @@ public RocksDBStreamStoreFactory(string dbPath)
cfDescriptors.Add(name, new ColumnFamilyOptions());
}

_db = RocksDb.Open(options, dbPath, cfDescriptors);
_db = RocksDb.Open(options, config.DataPath, cfDescriptors);

// Store all handles
for (int i = 0; i < existingFamilies.Length; i++)
{
_columnFamilies[existingFamilies[i]] = _db.GetColumnFamily(existingFamilies[i]);
}


_config = config;
}

public void Dispose()
Expand All @@ -59,6 +62,6 @@ public IStreamStore<TKey, TValue> GetStreamStore<TKey, TValue>()
_columnFamilies[storeName] = cfHandle;
}

return new RocksDBStreamStore<TKey, TValue>(_db, cfHandle, new ByteSerializer());
return new RocksDBStreamStore<TKey, TValue>(_db, cfHandle, _config.Serializer);
}
}
26 changes: 21 additions & 5 deletions src/MinimalKafka/KafkaProducerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@ internal class KafkaProducerFactory<TKey, TValue> : IProducer<TKey, TValue>

public string Name => Producer.Name;

public IReadOnlyList<object> Metadata { get; }

public KafkaProducerFactory(IKafkaBuilder builder)
{
var c = builder.MetaData.OfType<IConfigurationMetadata>().FirstOrDefault()?.Configuration;
var keySerializer = builder.MetaData.OfType<KeySerializerMetadata>().First();
var valueSerializer = builder.MetaData.OfType<ValueSerializerMetadata>().First();

ProducerConfig config = c is null ? new() : new(c);
Metadata = builder.MetaData;

var config = BuildConfig();

_topicFormatter = builder.MetaData.OfType<ITopicFormatterMetadata>().First();

var keySerializer = builder.MetaData.OfType<KeySerializerMetadata>().First();
var valueSerializer = builder.MetaData.OfType<ValueSerializerMetadata>().First();

var serializerKey = ActivatorUtilities.CreateInstance(builder.ServiceProvider, keySerializer.GetSerializerType<TKey>());
var serializerValue = ActivatorUtilities.CreateInstance(builder.ServiceProvider, valueSerializer.GetSerializerType<TValue>());

Expand All @@ -35,6 +38,19 @@ public KafkaProducerFactory(IKafkaBuilder builder)
.Build();
}

private ProducerConfig BuildConfig()
{
var c = Metadata.OfType<IConfigurationMetadata>().FirstOrDefault()?.Configuration;

ProducerConfig config = c is null ? new() : new(c);

foreach (var item in Metadata.OfType<IProducerConfigMetadata>())
{
item.Set(config);
}
return config;
}

public Task<DeliveryResult<TKey, TValue>> ProduceAsync(string topic, Message<TKey, TValue> message, CancellationToken cancellationToken = default)
{
return Producer.ProduceAsync(_topicFormatter.Format(topic), message, cancellationToken);
Expand Down
12 changes: 11 additions & 1 deletion src/MinimalKafka/Stream/Storage/InMemoryStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,19 @@ namespace MinimalKafka.Stream.Storage;

internal sealed class InMemoryStreamStoreFactory : IStreamStoreFactory
{
private readonly List<object> _stores = [];

public IStreamStore<TKey, TValue> GetStreamStore<TKey, TValue>()
where TKey : notnull
=> new InMemoryStore<TKey, TValue>();
{
var item = _stores.OfType<InMemoryStore<TKey, TValue>>().FirstOrDefault();
if(item == null)
{
item = new InMemoryStore<TKey, TValue>();
_stores.Add(item);
}
return item;
}
}

internal sealed class InMemoryStore<TKey, TValue>() : BackgroundService, IStreamStore<TKey, TValue>
Expand Down
5 changes: 4 additions & 1 deletion test/MinimalKafka.RocksDB.Tests/StreamStore_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ public async Task AddOrUpdate_WithNewKey_ShouldAddValue()
var services = new ServiceCollection();
services.AddMinimalKafka(builder =>
{
builder.UseRocksDB(RocksDBHelper.DataPath);
builder.UseRocksDB(o =>
{
o.DataPath = RocksDBHelper.DataPath;
});
});
var provider = services.BuildServiceProvider();
var factory = provider.GetRequiredService<IStreamStoreFactory>();
Expand Down
Loading