Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,34 @@ akka.persistence {
}
}
```
There is also an implementation using System.Text.Json in the plugin
that can be used. That can be configured like this:
**Using Akka Hosting:**
```csharp
var host = new HostBuilder()
.ConfigureServices((context, services) => {
services.AddAkka("my-system-name", (builder, provider) =>
{
builder.WithEventStorePersistence(
connectionString: _myConnectionString,
adapter: "system-text-json")
});
})
```

**Using Hocon:**
```
akka.persistence {
journal {
plugin = "akka.persistence.journal.eventstore""
eventstore {
class = "Akka.Persistence.EventStore.Journal.EventStoreJournal, Akka.Persistence.EventStore"
connection-string = "esdb://admin:changeit@localhost:2113"
adapter = "system-text-json"
}
}
}
```
# Projections

To support the Read Journal the plugin takes advantage of the [projections](https://developers.eventstore.com/server/v23.10/projections.html#introduction) feature
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using BenchmarkDotNet.Configs;
using BenchmarkDotNet.Diagnosers;
using BenchmarkDotNet.Loggers;
using JetBrains.Annotations;

namespace Akka.Persistence.EventStore.Benchmarks;

Expand All @@ -36,7 +37,9 @@ public Config()
[GlobalSetup]
public async Task Setup()
{
_sys = await EventStoreBenchmarkFixture.CreateActorSystemWithCleanDb("system");
_sys = await EventStoreBenchmarkFixture.CreateActorSystemWithCleanDb(
"system",
overrideSerializer: AdapterType);
}

[GlobalCleanup]
Expand Down Expand Up @@ -72,9 +75,12 @@ public void SetupActors()
: new SingleActorBenchmarkProxy(benchActor, testProbe, Configuration.Commands[^1]);
}

[ParamsSource(nameof(GetNumberOfEventsConfiguration))]
[ParamsSource(nameof(GetNumberOfEventsConfiguration)), PublicAPI]
public MessagesPerSecondConfiguration Configuration { get; set; } = null!;

[ParamsSource(nameof(GetAdapterTypes)), PublicAPI]
public string AdapterType { get; set; } = null!;

public static IImmutableList<MessagesPerSecondConfiguration> GetNumberOfEventsConfiguration()
{
const int numberOfEvents = 1000;
Expand All @@ -87,6 +93,13 @@ public static IImmutableList<MessagesPerSecondConfiguration> GetNumberOfEventsCo
.ToImmutableList();
}

public static IImmutableList<string> GetAdapterTypes()
{
return ImmutableList.Create(
"default",
"system-text-json");
}

protected async Task RunBenchmark(string mode)
{
Configuration.Commands.ForEach(cmd => _benchmarkProxy.Send(mode, cmd));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ public static async Task<ActorSystem> CreateActorSystemFromSeededData(string nam
return ActorSystem.Create(name, config);
}

public static async Task<CleanActorSystem> CreateActorSystemWithCleanDb(string name, Config? extraConfig = null)
public static async Task<CleanActorSystem> CreateActorSystemWithCleanDb(
string name,
Config? extraConfig = null,
string? overrideSerializer = null)
{
var eventStoreContainer = new EventStoreContainer();
await eventStoreContainer.InitializeAsync();
Expand All @@ -38,6 +41,12 @@ public static async Task<CleanActorSystem> CreateActorSystemWithCleanDb(string n
.WithFallback(extraConfig ?? "")
.WithFallback(Persistence.DefaultConfig())
.WithFallback(EventStorePersistence.DefaultConfiguration);

if (!string.IsNullOrEmpty(overrideSerializer))
{
config = config.WithFallback(
$"akka.persistence.journal.eventstore.adapter = \"{overrideSerializer}\"");
}

var actorSystem = ActorSystem.Create(name, config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using BenchmarkDotNet.Diagnosers;
using BenchmarkDotNet.Loggers;
using EventStore.Client;
using JetBrains.Annotations;

namespace Akka.Persistence.EventStore.Benchmarks;

Expand All @@ -23,7 +24,7 @@ public Config()

private readonly ComplexEvent _complexEvent = ComplexEvent.Create();

private DefaultMessageAdapter _adapter = null!;
private IMessageAdapter _adapter = null!;
private EventStoreBenchmarkFixture.CleanActorSystem? _sys;

private ResolvedEvent _serializedStringEvent;
Expand All @@ -32,11 +33,12 @@ public Config()
[GlobalSetup]
public async Task Setup()
{
_sys = await EventStoreBenchmarkFixture.CreateActorSystemWithCleanDb("system");
_sys = await EventStoreBenchmarkFixture.CreateActorSystemWithCleanDb("system", overrideSerializer: AdapterType);

_adapter = new DefaultMessageAdapter(
_sys.System.Serialization,
new EventStoreJournalSettings(_sys.System.Settings.Config.GetConfig("akka.persistence.journal.eventstore")));
var settings =
new EventStoreJournalSettings(_sys.System.Settings.Config.GetConfig("akka.persistence.journal.eventstore"));

_adapter = settings.FindEventAdapter(_sys.System);

var serializedStringEvent = await _adapter.Adapt(new Persistent("a"));
var serializedComplexEvent = await _adapter.Adapt(new Persistent(_complexEvent));
Expand Down Expand Up @@ -82,6 +84,16 @@ public async Task Cleanup()
if (_sys is not null)
await _sys.DisposeAsync();
}

[ParamsSource(nameof(GetAdapterTypes)), PublicAPI]
public string AdapterType { get; set; } = null!;

public static IImmutableList<string> GetAdapterTypes()
{
return ImmutableList.Create(
"default",
"system-text-json");
}

[Benchmark]
public async Task SerializeStringEvent()
Expand All @@ -107,6 +119,7 @@ public async Task DeSerializeComplexEvent()
await _adapter.AdaptEvent(_serializedComplexEvent);
}

[PublicAPI]
public record ComplexEvent(
string Name,
int Number,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public static class EventStoreConfiguration
public static Config Build(
EventStoreContainer eventStoreContainer,
string tenant,
Type? overrideSerializer = null)
string? overrideSerializer = null)
{
var config = ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
Expand Down Expand Up @@ -40,7 +40,7 @@ class = ""Akka.Persistence.EventStore.Query.EventStoreReadJournalProvider, Akka.
if (overrideSerializer != null)
{
config = config.WithFallback(
$"akka.persistence.journal.eventstore.adapter = \"{overrideSerializer.ToClrTypeName()}\"");
$"akka.persistence.journal.eventstore.adapter = \"{overrideSerializer}\"");
}

return config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public EventStoreJournalAltAdapterSpec(EventStoreContainer eventStoreContainer)
: base(EventStoreConfiguration.Build(
eventStoreContainer,
"alt-journal-spec",
typeof(TestMessageAdapter)), nameof(EventStoreJournalSpec))
typeof(TestMessageAdapter).ToClrTypeName()), nameof(EventStoreJournalAltAdapterSpec))
{
Initialize();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Akka.Persistence.TCK.Journal;
using Xunit;

namespace Akka.Persistence.EventStore.Tests;

[Collection(nameof(EventStoreTestsDatabaseCollection))]
public class EventStoreJournalSystemTextJsonAdapterSpec : JournalSpec
{
protected override bool SupportsRejectingNonSerializableObjects => false;

// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
protected override bool SupportsSerialization => false;

public EventStoreJournalSystemTextJsonAdapterSpec(EventStoreContainer eventStoreContainer)
: base(EventStoreConfiguration.Build(
eventStoreContainer,
"system-text-json-journal-spec",
"system-text-json"), nameof(EventStoreJournalSystemTextJsonAdapterSpec))
{
Initialize();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ protected virtual IStoredEventMetadata GetEventMetadata(
[PublicAPI]
protected virtual async Task<IStoredEventMetadata?> GetEventMetadataFrom(ResolvedEvent evnt)
{
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
if (evnt.Event == null)
return null;

Expand Down Expand Up @@ -180,11 +181,15 @@ public interface IStoredEventMetadata
string writerGuid { get; }
// ReSharper disable once InconsistentNaming
long? timestamp { get; }
// ReSharper disable once InconsistentNaming
string tenant { get; set; }
// ReSharper disable once InconsistentNaming
IImmutableSet<string> tags { get; set; }
}

[PublicAPI]
public class StoredEventMetadata : IStoredEventMetadata
{
[PublicAPI]
public StoredEventMetadata()
{

Expand All @@ -209,18 +214,13 @@ public StoredEventMetadata(

public string persistenceId { get; set; } = null!;
// ReSharper disable once InconsistentNaming
[PublicAPI]
public DateTimeOffset occurredOn { get; set; }
public string manifest { get; set; } = null!;
public long sequenceNr { get; set; }
public string writerGuid { get; set; } = null!;
public string journalType { get; set; } = null!;
public long? timestamp { get; set; }
// ReSharper disable once InconsistentNaming
[PublicAPI]
public string tenant { get; set; } = null!;
// ReSharper disable once InconsistentNaming
[PublicAPI]
public IImmutableSet<string> tags { get; set; } = ImmutableHashSet<string>.Empty;
public IActorRef? sender { get; set; }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Event;
using Akka.Persistence.EventStore.Configuration;
Expand All @@ -6,51 +7,60 @@ namespace Akka.Persistence.EventStore.Serialization;

public static class SettingsWithAdapterExtensions
{
private static readonly IImmutableDictionary<string, Type> AdapterOverrides = new Dictionary<string, Type>
{
["default"] = typeof(DefaultMessageAdapter),
["system-text-json"] = typeof(SystemTextJsonMessageAdapter)
}.ToImmutableDictionary();

public static IMessageAdapter FindEventAdapter(
this ISettingsWithAdapter settings,
ActorSystem actorSystem)
{
if (settings.Adapter == "default")
return GetDefaultAdapter();

var type = AdapterOverrides.TryGetValue(settings.Adapter, out var adapterType)
? adapterType
: Type.GetType(settings.Adapter);

return Create(type, settings, actorSystem);
}

private static IMessageAdapter Create(
Type? type,
ISettingsWithAdapter settings,
ActorSystem actorSystem)
{
var logger = actorSystem.Log;

if (type == null)
{
logger.Error(
"Unable to find type [{0}] Adapter. Is the assembly referenced properly? Falling back to default",
settings.Adapter);

return Create(AdapterOverrides["default"], settings, actorSystem);
}

try
{
var journalMessageAdapterType = Type.GetType(settings.Adapter);

if (journalMessageAdapterType == null)
{
logger.Error(
$"Unable to find type [{settings.Adapter}] Adapter. Is the assembly referenced properly? Falling back to default");

return GetDefaultAdapter();
}

var adapterConstructor =
journalMessageAdapterType.GetConstructor([typeof(Akka.Serialization.Serialization), typeof(ISettingsWithAdapter)]);
type.GetConstructor([typeof(Akka.Serialization.Serialization), typeof(ISettingsWithAdapter)]);

if ((adapterConstructor != null
? adapterConstructor.Invoke([actorSystem.Serialization, settings])
: Activator.CreateInstance(journalMessageAdapterType)) is IMessageAdapter adapter)
: Activator.CreateInstance(type)) is IMessageAdapter adapter)

return adapter;

logger.Error(
$"Unable to create instance of type [{journalMessageAdapterType.AssemblyQualifiedName}] Adapter. Do you have an empty constructor, or one that takes in Akka.Serialization.Serialization? Falling back to default.");
return GetDefaultAdapter();
$"Unable to create instance of type [{type.AssemblyQualifiedName}] Adapter. Do you have an empty constructor, or one that takes in Akka.Serialization.Serialization and Akka.Persistence.EventStore.Configuration.ISettingsWithAdapter? Falling back to default.");

return Create(AdapterOverrides["default"], settings, actorSystem);
}
catch (Exception e)
{
logger.Error(e, "Error loading Adapter. Falling back to default");

return GetDefaultAdapter();
}

IMessageAdapter GetDefaultAdapter()
{
return new DefaultMessageAdapter(actorSystem.Serialization, settings);
return Create(AdapterOverrides["default"], settings, actorSystem);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using System.Buffers;
using System.Text.Json;
using Akka.Persistence.EventStore.Configuration;

namespace Akka.Persistence.EventStore.Serialization;

public class SystemTextJsonMessageAdapter(
Akka.Serialization.Serialization serialization,
ISettingsWithAdapter settings) : DefaultMessageAdapter(serialization, settings)
{
protected override async Task<ReadOnlyMemory<byte>> Serialize(object data)
{
var buffer = new ArrayBufferWriter<byte>();
await using var writer = new Utf8JsonWriter(buffer);

JsonSerializer.Serialize(writer, data);

return buffer.WrittenMemory;
}

protected override Task<object?> DeSerialize(ReadOnlyMemory<byte> data, Type type)
{
return Task.FromResult(JsonSerializer.Deserialize(data.Span, type));
}
}