diff --git a/src/MinimalKafka.RocksDB/ByteSerializer.cs b/src/MinimalKafka.RocksDB/ByteSerializer.cs index 2d7099c..0c0d295 100644 --- a/src/MinimalKafka.RocksDB/ByteSerializer.cs +++ b/src/MinimalKafka.RocksDB/ByteSerializer.cs @@ -2,14 +2,14 @@ namespace MinimalKafka.Stream.Storage.RocksDB; -internal class ByteSerializer : IByteSerializer +internal class ByteSerializer(JsonSerializerOptions options) : IByteSerializer { public byte[] Serialize(T value) { if (value is null) throw new ArgumentNullException(nameof(value)); - return JsonSerializer.SerializeToUtf8Bytes(value); + return JsonSerializer.SerializeToUtf8Bytes(value, options); } public T Deserialize(byte[]? bytes) @@ -17,6 +17,6 @@ public T Deserialize(byte[]? bytes) if (bytes == null || bytes.Length == 0) throw new ArgumentNullException(nameof(bytes)); - return JsonSerializer.Deserialize(bytes) ?? throw new InvalidOperationException("Deserialization failed"); + return JsonSerializer.Deserialize(bytes, options) ?? throw new InvalidOperationException("Deserialization failed"); } } \ No newline at end of file diff --git a/src/MinimalKafka.RocksDB/KafkaBuilderExtensions.cs b/src/MinimalKafka.RocksDB/KafkaBuilderExtensions.cs index 867f128..92dd760 100644 --- a/src/MinimalKafka.RocksDB/KafkaBuilderExtensions.cs +++ b/src/MinimalKafka.RocksDB/KafkaBuilderExtensions.cs @@ -1,9 +1,7 @@ -using Microsoft.Extensions.DependencyInjection; -using MinimalKafka.Builders; +using MinimalKafka.Builders; using MinimalKafka.Extension; -using MinimalKafka.Stream; using MinimalKafka.Stream.Storage.RocksDB; -using RocksDbSharp; +using System.Text.Json; #pragma warning disable IDE0130 // Namespace does not match folder structure namespace MinimalKafka; @@ -18,17 +16,36 @@ public static class KafkaBuilderExtensions /// Configures the to use RocksDB as the stream store. /// /// - /// + /// /// - public static IAddKafkaBuilder UseRocksDB(this IAddKafkaBuilder builder, string? path = null) + public static IAddKafkaBuilder UseRocksDB(this IAddKafkaBuilder builder, Action? options = null) { - var dataPath = path ?? - Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData), - "RockDB"); + var config = new RocksDBOptions(); + options?.Invoke(config); - Directory.CreateDirectory(dataPath); + Directory.CreateDirectory(config.DataPath); - builder.WithStreamStoreFactory(new RocksDBStreamStoreFactory(dataPath)); + builder.WithStreamStoreFactory(new RocksDBStreamStoreFactory(config)); return builder; } + + /// + /// Configures the specified to use a JSON-based serializer. + /// + /// This method sets the property to a serializer that + /// uses JSON serialization. Use the parameter to customize the behavior of the JSON + /// serializer, such as formatting or type handling. + /// The instance to configure. + /// An optional delegate to configure the used by the serializer. If not + /// provided, default options are used. + /// The configured instance. + public static RocksDBOptions UseJsonSerializer(this RocksDBOptions rockDBOptions, Action? options = null) + { + var config = new JsonSerializerOptions(); + options?.Invoke(config); + + rockDBOptions.Serializer = new ByteSerializer(config); + + return rockDBOptions; + } } diff --git a/src/MinimalKafka.RocksDB/RocksDBOptions.cs b/src/MinimalKafka.RocksDB/RocksDBOptions.cs new file mode 100644 index 0000000..b76316d --- /dev/null +++ b/src/MinimalKafka.RocksDB/RocksDBOptions.cs @@ -0,0 +1,22 @@ +using System.Text.Json; + +namespace MinimalKafka.Stream.Storage.RocksDB; + +/// +/// Options for configuring RocksDB stream storage. +/// +public class RocksDBOptions +{ + /// + /// Gets or sets the file system path where application data is stored. + /// + public string DataPath { get; set; } = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData), + "RocksDB"); + + /// + /// Gets or sets the serializer used for converting objects to and from byte arrays. + /// + /// The serializer can be customized to use different serialization formats or settings, + /// depending on the implementation of . + public IByteSerializer Serializer { get; set; } = new ByteSerializer(JsonSerializerOptions.Default); +} diff --git a/src/MinimalKafka.RocksDB/RocksDBStoreManager.cs b/src/MinimalKafka.RocksDB/RocksDBStoreManager.cs index dc0396c..eae0f5e 100644 --- a/src/MinimalKafka.RocksDB/RocksDBStoreManager.cs +++ b/src/MinimalKafka.RocksDB/RocksDBStoreManager.cs @@ -1,14 +1,20 @@ -using RocksDbSharp; +using Confluent.Kafka; +using RocksDbSharp; using System.Collections.Concurrent; +using System.Text.Json; namespace MinimalKafka.Stream.Storage.RocksDB; internal sealed class RocksDBStreamStoreFactory : IDisposable, IStreamStoreFactory { private readonly RocksDb _db; private readonly ConcurrentDictionary _columnFamilies = new(); - - public RocksDBStreamStoreFactory(string dbPath) + private readonly RocksDBOptions _config; + + public RocksDBStreamStoreFactory(RocksDBOptions config) { + ArgumentNullException.ThrowIfNull(config); + _config = config; + var options = new DbOptions() .SetCreateIfMissing(true) .SetCreateMissingColumnFamilies(true); @@ -18,7 +24,7 @@ public RocksDBStreamStoreFactory(string dbPath) string[] existingFamilies; try { - existingFamilies = [.. RocksDb.ListColumnFamilies(options, dbPath)]; + existingFamilies = [.. RocksDb.ListColumnFamilies(options, _config.DataPath)]; } catch { @@ -32,15 +38,13 @@ public RocksDBStreamStoreFactory(string dbPath) cfDescriptors.Add(name, new ColumnFamilyOptions()); } - _db = RocksDb.Open(options, dbPath, cfDescriptors); + _db = RocksDb.Open(options, _config.DataPath, cfDescriptors); // Store all handles for (int i = 0; i < existingFamilies.Length; i++) { _columnFamilies[existingFamilies[i]] = _db.GetColumnFamily(existingFamilies[i]); } - - } public void Dispose() @@ -59,6 +63,6 @@ public IStreamStore GetStreamStore() _columnFamilies[storeName] = cfHandle; } - return new RocksDBStreamStore(_db, cfHandle, new ByteSerializer()); + return new RocksDBStreamStore(_db, cfHandle, _config.Serializer); } } \ No newline at end of file diff --git a/src/MinimalKafka/KafkaProducerFactory.cs b/src/MinimalKafka/KafkaProducerFactory.cs index 1ed22f9..1ad46ff 100644 --- a/src/MinimalKafka/KafkaProducerFactory.cs +++ b/src/MinimalKafka/KafkaProducerFactory.cs @@ -16,16 +16,19 @@ internal class KafkaProducerFactory : IProducer public string Name => Producer.Name; + public IReadOnlyList Metadata { get; } + public KafkaProducerFactory(IKafkaBuilder builder) { - var c = builder.MetaData.OfType().FirstOrDefault()?.Configuration; - var keySerializer = builder.MetaData.OfType().First(); - var valueSerializer = builder.MetaData.OfType().First(); - - ProducerConfig config = c is null ? new() : new(c); + Metadata = builder.MetaData; + + var config = BuildConfig(); _topicFormatter = builder.MetaData.OfType().First(); + var keySerializer = builder.MetaData.OfType().First(); + var valueSerializer = builder.MetaData.OfType().First(); + var serializerKey = ActivatorUtilities.CreateInstance(builder.ServiceProvider, keySerializer.GetSerializerType()); var serializerValue = ActivatorUtilities.CreateInstance(builder.ServiceProvider, valueSerializer.GetSerializerType()); @@ -35,6 +38,19 @@ public KafkaProducerFactory(IKafkaBuilder builder) .Build(); } + private ProducerConfig BuildConfig() + { + var c = Metadata.OfType().FirstOrDefault()?.Configuration; + + ProducerConfig config = c is null ? new() : new(c); + + foreach (var item in Metadata.OfType()) + { + item.Set(config); + } + return config; + } + public Task> ProduceAsync(string topic, Message message, CancellationToken cancellationToken = default) { return Producer.ProduceAsync(_topicFormatter.Format(topic), message, cancellationToken); diff --git a/src/MinimalKafka/Stream/Storage/InMemoryStore.cs b/src/MinimalKafka/Stream/Storage/InMemoryStore.cs index 2bdff0b..bfce380 100644 --- a/src/MinimalKafka/Stream/Storage/InMemoryStore.cs +++ b/src/MinimalKafka/Stream/Storage/InMemoryStore.cs @@ -1,12 +1,27 @@ using Microsoft.Extensions.Hosting; +using System.Collections.Concurrent; namespace MinimalKafka.Stream.Storage; internal sealed class InMemoryStreamStoreFactory : IStreamStoreFactory { + private readonly ConcurrentBag _stores = []; + private readonly object _lock = new(); + public IStreamStore GetStreamStore() where TKey : notnull - => new InMemoryStore(); + { + lock (_lock) + { + var item = _stores.OfType>().FirstOrDefault(); + if(item == null) + { + item = new InMemoryStore(); + _stores.Add(item); + } + return item; + } + } } internal sealed class InMemoryStore() : BackgroundService, IStreamStore diff --git a/test/MinimalKafka.RocksDB.Tests/StreamStore_Tests.cs b/test/MinimalKafka.RocksDB.Tests/StreamStore_Tests.cs index 58d2fbe..ea98ae2 100644 --- a/test/MinimalKafka.RocksDB.Tests/StreamStore_Tests.cs +++ b/test/MinimalKafka.RocksDB.Tests/StreamStore_Tests.cs @@ -17,7 +17,10 @@ public async Task AddOrUpdate_WithNewKey_ShouldAddValue() var services = new ServiceCollection(); services.AddMinimalKafka(builder => { - builder.UseRocksDB(RocksDBHelper.DataPath); + builder.UseRocksDB(o => + { + o.DataPath = RocksDBHelper.DataPath; + }); }); var provider = services.BuildServiceProvider(); var factory = provider.GetRequiredService();