diff --git a/README.md b/README.md index fd7d795..ad45a3e 100644 --- a/README.md +++ b/README.md @@ -171,7 +171,34 @@ akka.persistence { } } ``` +There is also an implementation using System.Text.Json in the plugin +that can be used. That can be configured like this: +**Using Akka Hosting:** +```csharp +var host = new HostBuilder() + .ConfigureServices((context, services) => { + services.AddAkka("my-system-name", (builder, provider) => + { + builder.WithEventStorePersistence( + connectionString: _myConnectionString, + adapter: "system-text-json") + }); + }) +``` +**Using Hocon:** +``` +akka.persistence { + journal { + plugin = "akka.persistence.journal.eventstore"" + eventstore { + class = "Akka.Persistence.EventStore.Journal.EventStoreJournal, Akka.Persistence.EventStore" + connection-string = "esdb://admin:changeit@localhost:2113" + adapter = "system-text-json" + } + } +} +``` # Projections To support the Read Journal the plugin takes advantage of the [projections](https://developers.eventstore.com/server/v23.10/projections.html#introduction) feature diff --git a/src/Akka.Persistence.EventStore.Benchmarks/BasePersistBenchmarks.cs b/src/Akka.Persistence.EventStore.Benchmarks/BasePersistBenchmarks.cs index 65312c8..5a97748 100644 --- a/src/Akka.Persistence.EventStore.Benchmarks/BasePersistBenchmarks.cs +++ b/src/Akka.Persistence.EventStore.Benchmarks/BasePersistBenchmarks.cs @@ -10,6 +10,7 @@ using BenchmarkDotNet.Configs; using BenchmarkDotNet.Diagnosers; using BenchmarkDotNet.Loggers; +using JetBrains.Annotations; namespace Akka.Persistence.EventStore.Benchmarks; @@ -36,7 +37,9 @@ public Config() [GlobalSetup] public async Task Setup() { - _sys = await EventStoreBenchmarkFixture.CreateActorSystemWithCleanDb("system"); + _sys = await EventStoreBenchmarkFixture.CreateActorSystemWithCleanDb( + "system", + overrideSerializer: AdapterType); } [GlobalCleanup] @@ -72,9 +75,12 @@ public void SetupActors() : new SingleActorBenchmarkProxy(benchActor, testProbe, Configuration.Commands[^1]); } - [ParamsSource(nameof(GetNumberOfEventsConfiguration))] + [ParamsSource(nameof(GetNumberOfEventsConfiguration)), PublicAPI] public MessagesPerSecondConfiguration Configuration { get; set; } = null!; + [ParamsSource(nameof(GetAdapterTypes)), PublicAPI] + public string AdapterType { get; set; } = null!; + public static IImmutableList GetNumberOfEventsConfiguration() { const int numberOfEvents = 1000; @@ -87,6 +93,13 @@ public static IImmutableList GetNumberOfEventsCo .ToImmutableList(); } + public static IImmutableList GetAdapterTypes() + { + return ImmutableList.Create( + "default", + "system-text-json"); + } + protected async Task RunBenchmark(string mode) { Configuration.Commands.ForEach(cmd => _benchmarkProxy.Send(mode, cmd)); diff --git a/src/Akka.Persistence.EventStore.Benchmarks/EventStoreBenchmarkFixture.cs b/src/Akka.Persistence.EventStore.Benchmarks/EventStoreBenchmarkFixture.cs index 50bf0c1..794480a 100644 --- a/src/Akka.Persistence.EventStore.Benchmarks/EventStoreBenchmarkFixture.cs +++ b/src/Akka.Persistence.EventStore.Benchmarks/EventStoreBenchmarkFixture.cs @@ -18,7 +18,10 @@ public static async Task CreateActorSystemFromSeededData(string nam return ActorSystem.Create(name, config); } - public static async Task CreateActorSystemWithCleanDb(string name, Config? extraConfig = null) + public static async Task CreateActorSystemWithCleanDb( + string name, + Config? extraConfig = null, + string? overrideSerializer = null) { var eventStoreContainer = new EventStoreContainer(); await eventStoreContainer.InitializeAsync(); @@ -38,6 +41,12 @@ public static async Task CreateActorSystemWithCleanDb(string n .WithFallback(extraConfig ?? "") .WithFallback(Persistence.DefaultConfig()) .WithFallback(EventStorePersistence.DefaultConfiguration); + + if (!string.IsNullOrEmpty(overrideSerializer)) + { + config = config.WithFallback( + $"akka.persistence.journal.eventstore.adapter = \"{overrideSerializer}\""); + } var actorSystem = ActorSystem.Create(name, config); diff --git a/src/Akka.Persistence.EventStore.Benchmarks/SerializationBenchmarks.cs b/src/Akka.Persistence.EventStore.Benchmarks/SerializationBenchmarks.cs index 05e710e..90a0adf 100644 --- a/src/Akka.Persistence.EventStore.Benchmarks/SerializationBenchmarks.cs +++ b/src/Akka.Persistence.EventStore.Benchmarks/SerializationBenchmarks.cs @@ -6,6 +6,7 @@ using BenchmarkDotNet.Diagnosers; using BenchmarkDotNet.Loggers; using EventStore.Client; +using JetBrains.Annotations; namespace Akka.Persistence.EventStore.Benchmarks; @@ -23,7 +24,7 @@ public Config() private readonly ComplexEvent _complexEvent = ComplexEvent.Create(); - private DefaultMessageAdapter _adapter = null!; + private IMessageAdapter _adapter = null!; private EventStoreBenchmarkFixture.CleanActorSystem? _sys; private ResolvedEvent _serializedStringEvent; @@ -32,11 +33,12 @@ public Config() [GlobalSetup] public async Task Setup() { - _sys = await EventStoreBenchmarkFixture.CreateActorSystemWithCleanDb("system"); + _sys = await EventStoreBenchmarkFixture.CreateActorSystemWithCleanDb("system", overrideSerializer: AdapterType); - _adapter = new DefaultMessageAdapter( - _sys.System.Serialization, - new EventStoreJournalSettings(_sys.System.Settings.Config.GetConfig("akka.persistence.journal.eventstore"))); + var settings = + new EventStoreJournalSettings(_sys.System.Settings.Config.GetConfig("akka.persistence.journal.eventstore")); + + _adapter = settings.FindEventAdapter(_sys.System); var serializedStringEvent = await _adapter.Adapt(new Persistent("a")); var serializedComplexEvent = await _adapter.Adapt(new Persistent(_complexEvent)); @@ -82,6 +84,16 @@ public async Task Cleanup() if (_sys is not null) await _sys.DisposeAsync(); } + + [ParamsSource(nameof(GetAdapterTypes)), PublicAPI] + public string AdapterType { get; set; } = null!; + + public static IImmutableList GetAdapterTypes() + { + return ImmutableList.Create( + "default", + "system-text-json"); + } [Benchmark] public async Task SerializeStringEvent() @@ -107,6 +119,7 @@ public async Task DeSerializeComplexEvent() await _adapter.AdaptEvent(_serializedComplexEvent); } + [PublicAPI] public record ComplexEvent( string Name, int Number, diff --git a/src/Akka.Persistence.EventStore.Tests/EventStoreConfiguration.cs b/src/Akka.Persistence.EventStore.Tests/EventStoreConfiguration.cs index d0deda4..4c972a4 100644 --- a/src/Akka.Persistence.EventStore.Tests/EventStoreConfiguration.cs +++ b/src/Akka.Persistence.EventStore.Tests/EventStoreConfiguration.cs @@ -7,7 +7,7 @@ public static class EventStoreConfiguration public static Config Build( EventStoreContainer eventStoreContainer, string tenant, - Type? overrideSerializer = null) + string? overrideSerializer = null) { var config = ConfigurationFactory.ParseString($@" akka.loglevel = INFO @@ -40,7 +40,7 @@ class = ""Akka.Persistence.EventStore.Query.EventStoreReadJournalProvider, Akka. if (overrideSerializer != null) { config = config.WithFallback( - $"akka.persistence.journal.eventstore.adapter = \"{overrideSerializer.ToClrTypeName()}\""); + $"akka.persistence.journal.eventstore.adapter = \"{overrideSerializer}\""); } return config; diff --git a/src/Akka.Persistence.EventStore.Tests/EventStoreJournalAltAdapterSpec.cs b/src/Akka.Persistence.EventStore.Tests/EventStoreJournalAltAdapterSpec.cs index 832d644..e9386b1 100644 --- a/src/Akka.Persistence.EventStore.Tests/EventStoreJournalAltAdapterSpec.cs +++ b/src/Akka.Persistence.EventStore.Tests/EventStoreJournalAltAdapterSpec.cs @@ -15,7 +15,7 @@ public EventStoreJournalAltAdapterSpec(EventStoreContainer eventStoreContainer) : base(EventStoreConfiguration.Build( eventStoreContainer, "alt-journal-spec", - typeof(TestMessageAdapter)), nameof(EventStoreJournalSpec)) + typeof(TestMessageAdapter).ToClrTypeName()), nameof(EventStoreJournalAltAdapterSpec)) { Initialize(); } diff --git a/src/Akka.Persistence.EventStore.Tests/EventStoreJournalSystemTextJsonAdapterSpec.cs b/src/Akka.Persistence.EventStore.Tests/EventStoreJournalSystemTextJsonAdapterSpec.cs new file mode 100644 index 0000000..e709358 --- /dev/null +++ b/src/Akka.Persistence.EventStore.Tests/EventStoreJournalSystemTextJsonAdapterSpec.cs @@ -0,0 +1,22 @@ +using Akka.Persistence.TCK.Journal; +using Xunit; + +namespace Akka.Persistence.EventStore.Tests; + +[Collection(nameof(EventStoreTestsDatabaseCollection))] +public class EventStoreJournalSystemTextJsonAdapterSpec : JournalSpec +{ + protected override bool SupportsRejectingNonSerializableObjects => false; + + // TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811 + protected override bool SupportsSerialization => false; + + public EventStoreJournalSystemTextJsonAdapterSpec(EventStoreContainer eventStoreContainer) + : base(EventStoreConfiguration.Build( + eventStoreContainer, + "system-text-json-journal-spec", + "system-text-json"), nameof(EventStoreJournalSystemTextJsonAdapterSpec)) + { + Initialize(); + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore/Serialization/DefaultMessageAdapter.cs b/src/Akka.Persistence.EventStore/Serialization/DefaultMessageAdapter.cs index 3afa55e..30e5330 100644 --- a/src/Akka.Persistence.EventStore/Serialization/DefaultMessageAdapter.cs +++ b/src/Akka.Persistence.EventStore/Serialization/DefaultMessageAdapter.cs @@ -139,6 +139,7 @@ protected virtual IStoredEventMetadata GetEventMetadata( [PublicAPI] protected virtual async Task GetEventMetadataFrom(ResolvedEvent evnt) { + // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract if (evnt.Event == null) return null; @@ -180,11 +181,15 @@ public interface IStoredEventMetadata string writerGuid { get; } // ReSharper disable once InconsistentNaming long? timestamp { get; } + // ReSharper disable once InconsistentNaming + string tenant { get; set; } + // ReSharper disable once InconsistentNaming + IImmutableSet tags { get; set; } } + [PublicAPI] public class StoredEventMetadata : IStoredEventMetadata { - [PublicAPI] public StoredEventMetadata() { @@ -209,18 +214,13 @@ public StoredEventMetadata( public string persistenceId { get; set; } = null!; // ReSharper disable once InconsistentNaming - [PublicAPI] public DateTimeOffset occurredOn { get; set; } public string manifest { get; set; } = null!; public long sequenceNr { get; set; } public string writerGuid { get; set; } = null!; public string journalType { get; set; } = null!; public long? timestamp { get; set; } - // ReSharper disable once InconsistentNaming - [PublicAPI] public string tenant { get; set; } = null!; - // ReSharper disable once InconsistentNaming - [PublicAPI] public IImmutableSet tags { get; set; } = ImmutableHashSet.Empty; public IActorRef? sender { get; set; } } diff --git a/src/Akka.Persistence.EventStore/Serialization/SettingsWithAdapterExtensions.cs b/src/Akka.Persistence.EventStore/Serialization/SettingsWithAdapterExtensions.cs index 8f4c051..245e358 100644 --- a/src/Akka.Persistence.EventStore/Serialization/SettingsWithAdapterExtensions.cs +++ b/src/Akka.Persistence.EventStore/Serialization/SettingsWithAdapterExtensions.cs @@ -1,3 +1,4 @@ +using System.Collections.Immutable; using Akka.Actor; using Akka.Event; using Akka.Persistence.EventStore.Configuration; @@ -6,51 +7,60 @@ namespace Akka.Persistence.EventStore.Serialization; public static class SettingsWithAdapterExtensions { + private static readonly IImmutableDictionary AdapterOverrides = new Dictionary + { + ["default"] = typeof(DefaultMessageAdapter), + ["system-text-json"] = typeof(SystemTextJsonMessageAdapter) + }.ToImmutableDictionary(); + public static IMessageAdapter FindEventAdapter( this ISettingsWithAdapter settings, ActorSystem actorSystem) { - if (settings.Adapter == "default") - return GetDefaultAdapter(); - + var type = AdapterOverrides.TryGetValue(settings.Adapter, out var adapterType) + ? adapterType + : Type.GetType(settings.Adapter); + + return Create(type, settings, actorSystem); + } + + private static IMessageAdapter Create( + Type? type, + ISettingsWithAdapter settings, + ActorSystem actorSystem) + { var logger = actorSystem.Log; + + if (type == null) + { + logger.Error( + "Unable to find type [{0}] Adapter. Is the assembly referenced properly? Falling back to default", + settings.Adapter); + + return Create(AdapterOverrides["default"], settings, actorSystem); + } try { - var journalMessageAdapterType = Type.GetType(settings.Adapter); - - if (journalMessageAdapterType == null) - { - logger.Error( - $"Unable to find type [{settings.Adapter}] Adapter. Is the assembly referenced properly? Falling back to default"); - - return GetDefaultAdapter(); - } - var adapterConstructor = - journalMessageAdapterType.GetConstructor([typeof(Akka.Serialization.Serialization), typeof(ISettingsWithAdapter)]); + type.GetConstructor([typeof(Akka.Serialization.Serialization), typeof(ISettingsWithAdapter)]); if ((adapterConstructor != null ? adapterConstructor.Invoke([actorSystem.Serialization, settings]) - : Activator.CreateInstance(journalMessageAdapterType)) is IMessageAdapter adapter) + : Activator.CreateInstance(type)) is IMessageAdapter adapter) return adapter; logger.Error( - $"Unable to create instance of type [{journalMessageAdapterType.AssemblyQualifiedName}] Adapter. Do you have an empty constructor, or one that takes in Akka.Serialization.Serialization? Falling back to default."); - - return GetDefaultAdapter(); + $"Unable to create instance of type [{type.AssemblyQualifiedName}] Adapter. Do you have an empty constructor, or one that takes in Akka.Serialization.Serialization and Akka.Persistence.EventStore.Configuration.ISettingsWithAdapter? Falling back to default."); + + return Create(AdapterOverrides["default"], settings, actorSystem); } catch (Exception e) { logger.Error(e, "Error loading Adapter. Falling back to default"); - return GetDefaultAdapter(); - } - - IMessageAdapter GetDefaultAdapter() - { - return new DefaultMessageAdapter(actorSystem.Serialization, settings); + return Create(AdapterOverrides["default"], settings, actorSystem); } } } \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore/Serialization/SystemTextJsonMessageAdapter.cs b/src/Akka.Persistence.EventStore/Serialization/SystemTextJsonMessageAdapter.cs new file mode 100644 index 0000000..f072513 --- /dev/null +++ b/src/Akka.Persistence.EventStore/Serialization/SystemTextJsonMessageAdapter.cs @@ -0,0 +1,25 @@ +using System.Buffers; +using System.Text.Json; +using Akka.Persistence.EventStore.Configuration; + +namespace Akka.Persistence.EventStore.Serialization; + +public class SystemTextJsonMessageAdapter( + Akka.Serialization.Serialization serialization, + ISettingsWithAdapter settings) : DefaultMessageAdapter(serialization, settings) +{ + protected override async Task> Serialize(object data) + { + var buffer = new ArrayBufferWriter(); + await using var writer = new Utf8JsonWriter(buffer); + + JsonSerializer.Serialize(writer, data); + + return buffer.WrittenMemory; + } + + protected override Task DeSerialize(ReadOnlyMemory data, Type type) + { + return Task.FromResult(JsonSerializer.Deserialize(data.Span, type)); + } +} \ No newline at end of file