diff --git a/docs/articles/persistence/custom-persistence-provider.md b/docs/articles/persistence/custom-persistence-provider.md index 519c3208611..ca30c385622 100644 --- a/docs/articles/persistence/custom-persistence-provider.md +++ b/docs/articles/persistence/custom-persistence-provider.md @@ -217,6 +217,11 @@ akka.persistence.journal-plugin-fallback { # Dispatcher for message replay. replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher" + # journal supervisor strategy used. + # It needs to be a subclass of Akka.Actor.SupervisorStrategyConfigurator. And have a parameterless constructor + # by default it restarts the journal on crash + supervisor-strategy = "Akka.Actor.DefaultSupervisorStrategy" + # Default serializer used as manifest serializer when applicable # and payload serializer when no specific binding overrides are specified serializer = "json" @@ -406,6 +411,11 @@ akka.persistence.snapshot-store-plugin-fallback { # Dispatcher for the plugin actor. plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + # snapshot-store supervisor strategy used. + # It needs to be a subclass of Akka.Actor.SupervisorStrategyConfigurator. And have a parameterless constructor + # by default it restarts the snapshot-store on crash + supervisor-strategy = "Akka.Actor.DefaultSupervisorStrategy" + # Default serializer used as manifest serializer when applicable # and payload serializer when no specific binding overrides are specified serializer = "json" diff --git a/docs/articles/persistence/storage-plugins.md b/docs/articles/persistence/storage-plugins.md index aa8c63c35bc..d2ba4983100 100644 --- a/docs/articles/persistence/storage-plugins.md +++ b/docs/articles/persistence/storage-plugins.md @@ -36,3 +36,49 @@ akka { } } ``` + +### Controlling Journal or Snapshot Crash Behavior + +By default the base implementations upon which all journal or snapshot-store implementations are build upon provides out of the box behavior for dealing with errors that occur during the writing or reading of data from the underlying store. Errors that occur will be communicated with the persistent actor that is using them at that time. +So in general once started successfully the journal or snapshot-store will be ready and available for the duration of your application, and won't crash. However in the case they do crash, due to unforeseen circumstances the default behavior is to immediately restart them. This is generally the behavior you want. +But in case you do want to customize how the system handles the crashing of the journal or snapshot-store. You can specify your own supervision strategy using the `supervisor-strategy` property. +This class needs to inherit from `Akka.Actor.SupervisorStrategyConfigurator` and have a parameter-less constructor. +Configuration example: + +```hocon +akka { + persistence { + journal { + plugin = "akka.persistence.journal.sqlite" + auto-start-journals = ["akka.persistence.journal.sqlite"] + supervisor-strategy = "My.full.namespace.CustomSupervisorStrategyConfigurator" + } + snapshot-store { + plugin = "akka.persistence.snapshot-store.sqlite" + auto-start-snapshot-stores = ["akka.persistence.snapshot-store.sqlite"] + supervisor-strategy = "My.full.namespace.CustomSupervisorStrategyConfigurator" + } + } +} +``` + +One such case could be to detect and handle misconfigured application settings during startup. For example if your using a SQL based journal and you misconfigured the connection string you might opt to return a supervision strategy that detects certain network connection errors, and after a few retries signals your application to shutdown instead of continue running with a journal or snapshot-store that in all likelihood will never be able to recover, forever stuck in a restart loop while your application is running. + +An example of what this could look like is this: + +```csharp + + public class MyCustomSupervisorConfigurator : SupervisorStrategyConfigurator + { + public override SupervisorStrategy Create() + { + //optionally only stop if the error occurs more then x times in y period + //this will be highly likely if its an unrecoverable error during start/init of the journal/snapshot store + return new OneForOneStrategy(10,TimeSpan.FromSeconds(5),ex => + { + //detect unrecoverable exception here + return Directive.Stop; + }); + } + } +``` diff --git a/src/core/Akka.Persistence.Tests/PersistenceConfigSpec.cs b/src/core/Akka.Persistence.Tests/PersistenceConfigSpec.cs index 4223bbf522e..7c9b9e3a285 100644 --- a/src/core/Akka.Persistence.Tests/PersistenceConfigSpec.cs +++ b/src/core/Akka.Persistence.Tests/PersistenceConfigSpec.cs @@ -5,11 +5,15 @@ // //----------------------------------------------------------------------- +using System; +using System.Collections.Generic; using Akka.Actor; +using Akka.Actor.Internal; using Akka.Configuration; using Akka.Persistence.Journal; using Akka.Persistence.Snapshot; using Akka.TestKit; +using Akka.Util; using Xunit; using Xunit.Abstractions; @@ -68,6 +72,32 @@ protected internal override bool AroundReceive(Receive receive, object message) } } + public class TestSupervisorConfigurator : SupervisorStrategyConfigurator + { + public override SupervisorStrategy Create() + { + return new CustomStrategy(10,TimeSpan.FromSeconds(5),ex => + { + //detect unrecoverable exception here + return Directive.Stop; + }); + } + } + + public class CustomStrategy : OneForOneStrategy + { + public CustomStrategy(int? maxNrOfRetries, TimeSpan? withinTimeRange, Func localOnlyDecider) : base(maxNrOfRetries, withinTimeRange, localOnlyDecider) + { + } + + public override void HandleChildTerminated(IActorContext actorContext, IActorRef child, IEnumerable children) + { + //because the journal does not has child actors, the ref is always the actor itself. So optionally do something special here + //to indicate to the system that the journal crashed in an unrecoverable way. + } + } + + #endregion private static readonly string SpecConfig = @" @@ -82,6 +112,12 @@ class = ""Akka.Persistence.Tests.PersistenceConfigSpec+TestJournal, Akka.Persist plugin-dispatcher = ""akka.actor.default-dispatcher"" test-value = ""B"" } + test3 { + class = ""Akka.Persistence.Tests.PersistenceConfigSpec+TestJournal, Akka.Persistence.Tests"" + plugin-dispatcher = ""akka.actor.default-dispatcher"" + test-value = ""B"" + supervisor-strategy = ""Akka.Persistence.Tests.PersistenceConfigSpec+TestSupervisorConfigurator, Akka.Persistence.Tests"" + } } akka.persistence.snapshot-store { test1 { @@ -100,6 +136,51 @@ public PersistenceConfigSpec(ITestOutputHelper output) : base(SpecConfig, output { } + /// + /// Verify that the journal config contains the expected default from our fallback configs + /// No spec for when the user overrides that because its not the goal to test the hocon config system. + /// Merely that the plugin system here properly applies the fallback config for this config value. + /// + [Fact] + public void Journal_has_supervision_strategy_configured() + { + var persistence = Persistence.Instance.Apply(Sys); + + var config = persistence.JournalConfigFor("akka.persistence.journal.test2"); + var defaultstrategy = config.GetString("supervisor-strategy"); + defaultstrategy.ShouldBe(typeof(Akka.Actor.DefaultSupervisorStrategy).FullName); + } + + /// + /// Verify that the snapshot config contains the expected default from our fallback configs + /// No spec for when the user overrides that because its not the goal to test the hocon config system. + /// Merely that the plugin system here properly applies the fallback config for this config value. + /// + [Fact] + public void Snapshot_has_supervision_strategy_configured() + { + var persistence = Persistence.Instance.Apply(Sys); + + var config = persistence.JournalConfigFor("akka.persistence.snapshot-store.test1"); + var defaultstrategy = config.GetString("supervisor-strategy"); + defaultstrategy.ShouldBe(typeof(Akka.Actor.DefaultSupervisorStrategy).FullName); + } + + [Fact] + public void Journal_has_custom_supervision_strategy_applied() + { + var persistence = Persistence.Instance.Apply(Sys); + var journal = persistence.JournalFor("akka.persistence.journal.test3"); //get our journal with the custom configuration + + //waves magic wand + var magicref = journal as ActorRefWithCell; + var appliedStrat = magicref.Underlying.Props.SupervisorStrategy; + //because the configured value for our supervisor strategy is our CustomStrategy + //we verify that the strat returned is the same as currently applied + var customstrategy = new TestSupervisorConfigurator().Create(); + appliedStrat.GetType().ShouldBe(customstrategy.GetType()); + } + [Fact] public void Persistence_should_use_inmem_journal_by_default() { diff --git a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs index 55b09aefd1e..317d2acf187 100644 --- a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs +++ b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs @@ -189,12 +189,7 @@ protected sealed override bool Receive(object message) { return ReceiveWriteJournal(message) || ReceivePluginInternal(message); } - - /// - /// TBD - /// - /// TBD - /// TBD + protected bool ReceiveWriteJournal(object message) { switch (message) diff --git a/src/core/Akka.Persistence/Journal/WriteJournal.cs b/src/core/Akka.Persistence/Journal/WriteJournal.cs index f0de1e73496..871e9047a27 100644 --- a/src/core/Akka.Persistence/Journal/WriteJournal.cs +++ b/src/core/Akka.Persistence/Journal/WriteJournal.cs @@ -14,7 +14,7 @@ namespace Akka.Persistence.Journal { /// - /// TBD + /// Base class for the journal persistence /// public abstract class WriteJournalBase : ActorBase { @@ -30,10 +30,11 @@ protected WriteJournalBase() } /// - /// TBD + /// Creates a sequence of write actions to be executed based on the given messages. + /// Applies any registered EventAdapters to the payloads. /// - /// TBD - /// TBD + /// list of messages to write + /// protected IEnumerable PreparePersistentBatch(IEnumerable resequenceables) { foreach (var resequenceable in resequenceables) @@ -53,7 +54,7 @@ protected IEnumerable PreparePersistentBatch(IEnumerable - /// INTERNAL API + /// Apply registered eventadapter to the data payload /// [InternalApi] protected IEnumerable AdaptFromJournal(IPersistentRepresentation representation) @@ -65,7 +66,7 @@ protected IEnumerable AdaptFromJournal(IPersistentRep } /// - /// INTERNAL API + /// Apply any registered eventadapter to the data payload /// protected IPersistentRepresentation AdaptToJournal(IPersistentRepresentation representation) { diff --git a/src/core/Akka.Persistence/Persistence.cs b/src/core/Akka.Persistence/Persistence.cs index d6fc71537b4..d6d8f052719 100644 --- a/src/core/Akka.Persistence/Persistence.cs +++ b/src/core/Akka.Persistence/Persistence.cs @@ -302,11 +302,17 @@ private static IActorRef CreatePlugin(ExtendedActorSystem system, string configP var pluginType = Type.GetType(pluginTypeName, true); var pluginDispatcherId = pluginConfig.GetString("plugin-dispatcher", null); object[] pluginActorArgs = pluginType.GetConstructor(new[] { typeof(Config) }) != null ? new object[] { pluginConfig } : null; - var pluginActorProps = new Props(pluginType, pluginActorArgs).WithDispatcher(pluginDispatcherId); - + + //todo wrap in backoffsupervisor ? + + //supervisor-strategy is defined by default in the fallback configs. So we always expect to get a value here even if the user has not explicitly defined anything + var configurator = SupervisorStrategyConfigurator.CreateConfigurator(pluginConfig.GetString("supervisor-strategy")); + + var pluginActorProps = new Props(pluginType, pluginActorArgs).WithDispatcher(pluginDispatcherId).WithSupervisorStrategy(configurator.Create()); + return system.SystemActorOf(pluginActorProps, pluginActorName); } - + private static EventAdapters CreateAdapters(ExtendedActorSystem system, string configPath) { var pluginConfig = system.Settings.Config.GetConfig(configPath); diff --git a/src/core/Akka.Persistence/persistence.conf b/src/core/Akka.Persistence/persistence.conf index 3c1273edca5..af28d4800e7 100644 --- a/src/core/Akka.Persistence/persistence.conf +++ b/src/core/Akka.Persistence/persistence.conf @@ -114,7 +114,11 @@ akka.persistence { # Dispatcher for message replay. replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher" - + # journal supervisor strategy used. + # It needs to be a subclass of Akka.Actor.SupervisorStrategyConfigurator. And have a parameterless constructor + # by default it restarts the journal on crash + supervisor-strategy = "Akka.Actor.DefaultSupervisorStrategy" + # Default serializer used as manifest serializer when applicable and payload serializer when no specific binding overrides are specified serializer = "json" @@ -173,7 +177,12 @@ akka.persistence { # Dispatcher for the plugin actor. plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" - + + # snapshot supervisor strategy used. + # It needs to be a subclass of Akka.Actor.SupervisorStrategyConfigurator. And have a parameterless constructor + # by default it restarts the snapshot on crash + supervisor-strategy = "Akka.Actor.DefaultSupervisorStrategy" + # Default serializer used as manifest serializer when applicable and payload serializer when no specific binding overrides are specified serializer = "json" circuit-breaker { diff --git a/src/core/Akka/Actor/SupervisorStrategy.cs b/src/core/Akka/Actor/SupervisorStrategy.cs index d5246feb48d..84ebbcabde0 100644 --- a/src/core/Akka/Actor/SupervisorStrategy.cs +++ b/src/core/Akka/Actor/SupervisorStrategy.cs @@ -615,15 +615,7 @@ protected override Directive Handle(IActorRef child, Exception exception) return Decider.Decide(exception); } - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD + /// public override void ProcessFailure(IActorContext context, bool restart, IActorRef child, Exception cause, ChildRestartStats stats, IReadOnlyCollection children) { if (children.Count > 0) @@ -645,12 +637,7 @@ public override void ProcessFailure(IActorContext context, bool restart, IActorR } } - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD + /// public override void HandleChildTerminated(IActorContext actorContext, IActorRef child, IEnumerable children) { //Intentionally left blank @@ -998,14 +985,10 @@ public override int GetHashCode() } /// - /// TBD + /// Base configurator class used for configuring the guardian-supervisor-strategy /// public abstract class SupervisorStrategyConfigurator { - /// - /// TBD - /// - /// TBD public abstract SupervisorStrategy Create(); /// @@ -1037,30 +1020,20 @@ public static SupervisorStrategyConfigurator CreateConfigurator(string typeName) } } - /// - /// TBD - /// + public class DefaultSupervisorStrategy : SupervisorStrategyConfigurator { - /// - /// TBD - /// - /// TBD + public override SupervisorStrategy Create() { return SupervisorStrategy.DefaultStrategy; } } - /// - /// TBD - /// + public class StoppingSupervisorStrategy : SupervisorStrategyConfigurator { - /// - /// TBD - /// - /// TBD + public override SupervisorStrategy Create() { return SupervisorStrategy.StoppingStrategy;