diff --git a/src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs b/src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs
index 2bdc59b4..3ff209c7 100644
--- a/src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs
+++ b/src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs
@@ -9,84 +9,199 @@
using Akka.Persistence.Azure.Journal;
using Akka.Persistence.Azure.Query;
using Akka.Persistence.Azure.Snapshot;
+using Azure.Data.Tables;
+using Azure.Identity;
+using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using FluentAssertions;
+using FluentAssertions.Extensions;
using Xunit;
namespace Akka.Persistence.Azure.Tests
{
public class AzurePersistenceConfigSpec
{
+ private const string SnapshotStorePath = "akka.persistence.snapshot-store.azure-blob-store";
+ private const string JournalPath = "akka.persistence.journal.azure-table";
+
+ private static readonly AzureBlobSnapshotStoreSettings DefaultSnapshotSettings =
+ AzureBlobSnapshotStoreSettings.Create(AzurePersistence.DefaultConfig.GetConfig(SnapshotStorePath));
+
+ private static readonly AzureTableStorageJournalSettings DefaultJournalSettings =
+ AzureTableStorageJournalSettings.Create(AzurePersistence.DefaultConfig.GetConfig(JournalPath));
+
[Fact]
public void ShouldLoadDefaultConfig()
{
- var defaultConfig = AzurePersistence.DefaultConfig;
- defaultConfig.HasPath("akka.persistence.journal.azure-table").Should().BeTrue();
- defaultConfig.HasPath("akka.persistence.snapshot-store.azure-blob-store").Should().BeTrue();
+ AzurePersistence.DefaultConfig.HasPath(SnapshotStorePath).Should().BeTrue();
+ AzurePersistence.DefaultConfig.HasPath(JournalPath).Should().BeTrue();
+ AzurePersistence.DefaultConfig.HasPath(AzureTableStorageReadJournal.Identifier).Should().BeTrue();
}
[Fact]
public void ShouldParseDefaultSnapshotConfig()
{
- var blobSettings =
- AzureBlobSnapshotStoreSettings.Create(
- ConfigurationFactory.ParseString(@"akka.persistence.snapshot-store.azure-blob-store{
- connection-string = foo
- container-name = bar
- }").WithFallback(AzurePersistence.DefaultConfig)
- .GetConfig("akka.persistence.snapshot-store.azure-blob-store"));
+ var settings =
+ AzureBlobSnapshotStoreSettings.Create(AzurePersistence.DefaultConfig.GetConfig(SnapshotStorePath));
- blobSettings.ContainerName.Should().Be("bar");
- blobSettings.ConnectionString.Should().Be("foo");
- blobSettings.ConnectTimeout.Should().Be(TimeSpan.FromSeconds(3));
- blobSettings.RequestTimeout.Should().Be(TimeSpan.FromSeconds(3));
- blobSettings.VerboseLogging.Should().BeFalse();
- blobSettings.ContainerPublicAccessType.Should().Be(PublicAccessType.None);
+ settings.ConnectionString.Should().BeEmpty();
+ settings.ContainerName.Should().Be("akka-persistence-default-container");
+ settings.ConnectTimeout.Should().Be(3.Seconds());
+ settings.RequestTimeout.Should().Be(3.Seconds());
+ settings.VerboseLogging.Should().BeFalse();
+ settings.Development.Should().BeFalse();
+ settings.AutoInitialize.Should().BeTrue();
+ settings.ContainerPublicAccessType.Should().Be(PublicAccessType.None);
+ settings.ServiceUri.Should().BeNull();
+ settings.DefaultAzureCredential.Should().BeNull();
+ settings.BlobClientOptions.Should().BeNull();
}
- [Fact]
- public void ShouldProvideDefaultContainerNameValue()
+ [Fact(DisplayName = "AzureBlobSnapshotStoreSettings With overrides should override default values")]
+ public void SnapshotSettingsWithMethodsTest()
{
- var blobSettings =
- AzureBlobSnapshotStoreSettings.Create(
- ConfigurationFactory.ParseString(@"akka.persistence.snapshot-store.azure-blob-store{
- connection-string = foo
- }").WithFallback(AzurePersistence.DefaultConfig)
- .GetConfig("akka.persistence.snapshot-store.azure-blob-store"));
+ var uri = new Uri("https://whatever.com");
+ var credentials = new DefaultAzureCredential();
+ var options = new BlobClientOptions();
+ var settings = DefaultSnapshotSettings
+ .WithConnectionString("abc")
+ .WithContainerName("bcd")
+ .WithConnectTimeout(1.Seconds())
+ .WithRequestTimeout(2.Seconds())
+ .WithVerboseLogging(true)
+ .WithDevelopment(true)
+ .WithAutoInitialize(false)
+ .WithContainerPublicAccessType(PublicAccessType.Blob)
+ .WithAzureCredential(uri, credentials, options);
+
+ settings.ConnectionString.Should().Be("abc");
+ settings.ContainerName.Should().Be("bcd");
+ settings.ConnectTimeout.Should().Be(1.Seconds());
+ settings.RequestTimeout.Should().Be(2.Seconds());
+ settings.VerboseLogging.Should().BeTrue();
+ settings.Development.Should().BeTrue();
+ settings.AutoInitialize.Should().BeFalse();
+ settings.ContainerPublicAccessType.Should().Be(PublicAccessType.Blob);
+ settings.ServiceUri.Should().Be(uri);
+ settings.DefaultAzureCredential.Should().Be(credentials);
+ settings.BlobClientOptions.Should().Be(options);
+ }
- blobSettings.ContainerName.Should().Be("akka-persistence-default-container");
+ [Fact(DisplayName = "AzureBlobSnapshotStoreSetup should override settings values")]
+ public void SnapshotSetupTest()
+ {
+ var uri = new Uri("https://whatever.com");
+ var credentials = new DefaultAzureCredential();
+ var options = new BlobClientOptions();
+ var setup = new AzureBlobSnapshotSetup
+ {
+ ConnectionString = "abc",
+ ContainerName = "bcd",
+ ConnectTimeout = 1.Seconds(),
+ RequestTimeout = 2.Seconds(),
+ VerboseLogging = true,
+ Development = true,
+ AutoInitialize = false,
+ ContainerPublicAccessType = PublicAccessType.Blob,
+ ServiceUri = uri,
+ DefaultAzureCredential = credentials,
+ BlobClientOptions = options
+ };
+
+ var settings = setup.Apply(DefaultSnapshotSettings);
+
+ settings.ConnectionString.Should().Be("abc");
+ settings.ContainerName.Should().Be("bcd");
+ settings.ConnectTimeout.Should().Be(1.Seconds());
+ settings.RequestTimeout.Should().Be(2.Seconds());
+ settings.VerboseLogging.Should().BeTrue();
+ settings.Development.Should().BeTrue();
+ settings.AutoInitialize.Should().BeFalse();
+ settings.ContainerPublicAccessType.Should().Be(PublicAccessType.Blob);
+ settings.ServiceUri.Should().Be(uri);
+ settings.DefaultAzureCredential.Should().Be(credentials);
+ settings.BlobClientOptions.Should().Be(options);
}
[Fact]
public void ShouldParseTableConfig()
{
- var tableSettings =
- AzureTableStorageJournalSettings.Create(
- ConfigurationFactory.ParseString(@"akka.persistence.journal.azure-table{
- connection-string = foo
- table-name = bar
- }").WithFallback(AzurePersistence.DefaultConfig)
- .GetConfig("akka.persistence.journal.azure-table"));
+ var settings = DefaultJournalSettings;
- tableSettings.TableName.Should().Be("bar");
- tableSettings.ConnectionString.Should().Be("foo");
- tableSettings.ConnectTimeout.Should().Be(TimeSpan.FromSeconds(3));
- tableSettings.RequestTimeout.Should().Be(TimeSpan.FromSeconds(3));
- tableSettings.VerboseLogging.Should().BeFalse();
+ settings.ConnectionString.Should().BeEmpty();
+ settings.TableName.Should().Be("AkkaPersistenceDefaultTable");
+ settings.ConnectTimeout.Should().Be(3.Seconds());
+ settings.RequestTimeout.Should().Be(3.Seconds());
+ settings.VerboseLogging.Should().BeFalse();
+ settings.Development.Should().BeFalse();
+ settings.AutoInitialize.Should().BeTrue();
+ settings.ServiceUri.Should().BeNull();
+ settings.DefaultAzureCredential.Should().BeNull();
+ settings.TableClientOptions.Should().BeNull();
}
- [Fact]
- public void ShouldProvideDefaultTableNameValue()
+ [Fact(DisplayName = "AzureTableStorageJournalSettings With overrides should override default values")]
+ public void JournalSettingsWithMethodsTest()
{
- var tableSettings =
- AzureTableStorageJournalSettings.Create(
- ConfigurationFactory.ParseString(@"akka.persistence.journal.azure-table{
- connection-string = foo
- }").WithFallback(AzurePersistence.DefaultConfig)
- .GetConfig("akka.persistence.journal.azure-table"));
- tableSettings.TableName.Should().Be("AkkaPersistenceDefaultTable");
+ var uri = new Uri("https://whatever.com");
+ var credentials = new DefaultAzureCredential();
+ var options = new TableClientOptions();
+ var settings = DefaultJournalSettings
+ .WithConnectionString("abc")
+ .WithTableName("bcd")
+ .WithConnectTimeout(1.Seconds())
+ .WithRequestTimeout(2.Seconds())
+ .WithVerboseLogging(true)
+ .WithDevelopment(true)
+ .WithAutoInitialize(false)
+ .WithAzureCredential(uri, credentials, options);
+
+ settings.ConnectionString.Should().Be("abc");
+ settings.TableName.Should().Be("bcd");
+ settings.ConnectTimeout.Should().Be(1.Seconds());
+ settings.RequestTimeout.Should().Be(2.Seconds());
+ settings.VerboseLogging.Should().BeTrue();
+ settings.Development.Should().BeTrue();
+ settings.AutoInitialize.Should().BeFalse();
+ settings.ServiceUri.Should().Be(uri);
+ settings.DefaultAzureCredential.Should().Be(credentials);
+ settings.TableClientOptions.Should().Be(options);
}
+ [Fact(DisplayName = "AzureTableStorageJournalSetup should override settings values")]
+ public void JournalSetupTest()
+ {
+ var uri = new Uri("https://whatever.com");
+ var credentials = new DefaultAzureCredential();
+ var options = new TableClientOptions();
+ var setup = new AzureTableStorageJournalSetup
+ {
+ ConnectionString = "abc",
+ TableName = "bcd",
+ ConnectTimeout = 1.Seconds(),
+ RequestTimeout = 2.Seconds(),
+ VerboseLogging = true,
+ Development = true,
+ AutoInitialize = false,
+ ServiceUri = uri,
+ DefaultAzureCredential = credentials,
+ TableClientOptions = options
+ };
+
+ var settings = setup.Apply(DefaultJournalSettings);
+
+ settings.ConnectionString.Should().Be("abc");
+ settings.TableName.Should().Be("bcd");
+ settings.ConnectTimeout.Should().Be(1.Seconds());
+ settings.RequestTimeout.Should().Be(2.Seconds());
+ settings.VerboseLogging.Should().BeTrue();
+ settings.Development.Should().BeTrue();
+ settings.AutoInitialize.Should().BeFalse();
+ settings.ServiceUri.Should().Be(uri);
+ settings.DefaultAzureCredential.Should().Be(credentials);
+ settings.TableClientOptions.Should().Be(options);
+ }
+
[Theory]
[InlineData("fo", "Invalid table name length")]
[InlineData("1foo", "Invalid table name")]
diff --git a/src/Akka.Persistence.Azure/AzurePersistence.cs b/src/Akka.Persistence.Azure/AzurePersistence.cs
index 478f8a57..7abeb4bb 100644
--- a/src/Akka.Persistence.Azure/AzurePersistence.cs
+++ b/src/Akka.Persistence.Azure/AzurePersistence.cs
@@ -34,7 +34,7 @@ public AzurePersistence(ActorSystem system, AzureTableStorageJournalSettings tab
///
/// The default HOCON configuration for .
///
- public static Config DefaultConfig =>
+ public static readonly Config DefaultConfig =
ConfigurationFactory.FromResource("Akka.Persistence.Azure.reference.conf");
///
diff --git a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs
index 18915695..3b925498 100644
--- a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs
+++ b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs
@@ -61,8 +61,26 @@ public AzureTableStorageJournal(Config config = null)
AzurePersistence.Get(Context.System).TableSettings :
AzureTableStorageJournalSettings.Create(config);
+ var setup = Context.System.Settings.Setup.Get();
+ if (setup.HasValue)
+ _settings = setup.Value.Apply(_settings);
+
_serialization = new SerializationHelper(Context.System);
- _tableServiceClient = new TableServiceClient(_settings.ConnectionString);
+
+ if (_settings.Development)
+ {
+ _tableServiceClient = new TableServiceClient(connectionString: "UseDevelopmentStorage=true");
+ }
+ else
+ {
+ // Use DefaultAzureCredential if both ServiceUri and DefaultAzureCredential are populated in the settings
+ _tableServiceClient = _settings.ServiceUri != null && _settings.DefaultAzureCredential != null
+ ? new TableServiceClient(
+ endpoint: _settings.ServiceUri,
+ tokenCredential: _settings.DefaultAzureCredential,
+ options: _settings.TableClientOptions)
+ : new TableServiceClient(connectionString: _settings.ConnectionString);
+ }
}
public TableClient Table
diff --git a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs
index 8ac6f0ba..95ed994e 100644
--- a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs
+++ b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs
@@ -6,8 +6,11 @@
using System;
using System.Linq;
+using Akka.Actor;
using Akka.Configuration;
using Akka.Persistence.Azure.Util;
+using Azure.Data.Tables;
+using Azure.Identity;
namespace Akka.Persistence.Azure.Journal
{
@@ -18,6 +21,7 @@ public sealed class AzureTableStorageJournalSettings
{
private static readonly string[] ReservedTableNames = {"tables"};
+ [Obsolete]
public AzureTableStorageJournalSettings(
string connectionString,
string tableName,
@@ -26,6 +30,30 @@ public AzureTableStorageJournalSettings(
bool verboseLogging,
bool development,
bool autoInitialize)
+ : this(
+ connectionString: connectionString,
+ tableName: tableName,
+ connectTimeout: connectTimeout,
+ requestTimeout: requestTimeout,
+ verboseLogging: verboseLogging,
+ development: development,
+ autoInitialize: autoInitialize,
+ serviceUri: null,
+ defaultAzureCredential: null,
+ tableClientOptions: null)
+ { }
+
+ public AzureTableStorageJournalSettings(
+ string connectionString,
+ string tableName,
+ TimeSpan connectTimeout,
+ TimeSpan requestTimeout,
+ bool verboseLogging,
+ bool development,
+ bool autoInitialize,
+ Uri serviceUri,
+ DefaultAzureCredential defaultAzureCredential,
+ TableClientOptions tableClientOptions)
{
if(string.IsNullOrWhiteSpace(tableName))
throw new ConfigurationException("[AzureTableStorageJournal] Table name is null or empty.");
@@ -45,6 +73,9 @@ public AzureTableStorageJournalSettings(
VerboseLogging = verboseLogging;
Development = development;
AutoInitialize = autoInitialize;
+ ServiceUri = serviceUri;
+ DefaultAzureCredential = defaultAzureCredential;
+ TableClientOptions = tableClientOptions;
}
///
@@ -72,10 +103,97 @@ public AzureTableStorageJournalSettings(
///
public bool VerboseLogging { get; }
+ ///
+ /// Flag that we're running in development mode. When this is set, and
+ /// will be ignored, replaced with "UseDevelopmentStorage=true" for local
+ /// connection to Azurite.
+ ///
public bool Development { get; }
+ ///
+ /// Automatically create the Table Storage table if no existing table is found
+ ///
public bool AutoInitialize { get; }
+ ///
+ /// A referencing the blob service.
+ /// This is likely to be similar to "https://{account_name}.table.core.windows.net".
+ ///
+ public Uri ServiceUri { get; }
+
+ ///
+ /// The used to sign API requests.
+ ///
+ public DefaultAzureCredential DefaultAzureCredential { get; }
+
+ ///
+ /// Optional client options that define the transport pipeline policies for authentication,
+ /// retries, etc., that are applied to every request.
+ ///
+ public TableClientOptions TableClientOptions { get; }
+
+ public AzureTableStorageJournalSettings WithConnectionString(string connectionString)
+ => Copy(connectionString: connectionString);
+ public AzureTableStorageJournalSettings WithTableName(string tableName)
+ => Copy(tableName: tableName);
+ public AzureTableStorageJournalSettings WithConnectTimeout(TimeSpan connectTimeout)
+ => Copy(connectTimeout: connectTimeout);
+ public AzureTableStorageJournalSettings WithRequestTimeout(TimeSpan requestTimeout)
+ => Copy(requestTimeout: requestTimeout);
+ public AzureTableStorageJournalSettings WithVerboseLogging(bool verboseLogging)
+ => Copy(verboseLogging: verboseLogging);
+ public AzureTableStorageJournalSettings WithDevelopment(bool development)
+ => Copy(development: development);
+ public AzureTableStorageJournalSettings WithAutoInitialize(bool autoInitialize)
+ => Copy(autoInitialize: autoInitialize);
+ public AzureTableStorageJournalSettings WithAzureCredential(
+ Uri serviceUri,
+ DefaultAzureCredential defaultAzureCredential,
+ TableClientOptions tableClientOptions = null)
+ => Copy(
+ serviceUri: serviceUri,
+ defaultAzureCredential: defaultAzureCredential,
+ tableClientOptions: tableClientOptions);
+
+ private AzureTableStorageJournalSettings Copy(
+ string connectionString = null,
+ string tableName = null,
+ TimeSpan? connectTimeout = null,
+ TimeSpan? requestTimeout = null,
+ bool? verboseLogging = null,
+ bool? development = null,
+ bool? autoInitialize = null,
+ Uri serviceUri = null,
+ DefaultAzureCredential defaultAzureCredential = null,
+ TableClientOptions tableClientOptions = null)
+ => new AzureTableStorageJournalSettings(
+ connectionString: connectionString ?? ConnectionString,
+ tableName: tableName ?? TableName,
+ connectTimeout: connectTimeout ?? ConnectTimeout,
+ requestTimeout: requestTimeout ?? RequestTimeout,
+ verboseLogging: verboseLogging ?? VerboseLogging,
+ development: development ?? Development,
+ autoInitialize: autoInitialize ?? AutoInitialize,
+ serviceUri: serviceUri ?? ServiceUri,
+ defaultAzureCredential: defaultAzureCredential ?? DefaultAzureCredential,
+ tableClientOptions: tableClientOptions ?? TableClientOptions);
+
+ ///
+ /// Creates an instance using the
+ /// `akka.persistence.journal.azure-table` HOCON configuration section inside
+ /// the settings.
+ ///
+ /// The to extract the configuration from
+ /// A new settings instance.
+ public static AzureTableStorageJournalSettings Create(ActorSystem system)
+ {
+ var config = system.Settings.Config.GetConfig("akka.persistence.journal.azure-table");
+ if (config is null)
+ throw new ConfigurationException(
+ "Could not find HOCON config at path 'akka.persistence.journal.azure-table'");
+ return Create(config);
+ }
+
///
/// Creates an instance using the
/// `akka.persistence.journal.azure-table` HOCON configuration section.
@@ -84,6 +202,9 @@ public AzureTableStorageJournalSettings(
/// A new settings instance.
public static AzureTableStorageJournalSettings Create(Config config)
{
+ if (config is null)
+ throw new ArgumentNullException(nameof(config));
+
var connectionString = config.GetString("connection-string");
var tableName = config.GetString("table-name");
var connectTimeout = config.GetTimeSpan("connect-timeout", TimeSpan.FromSeconds(3));
@@ -93,13 +214,16 @@ public static AzureTableStorageJournalSettings Create(Config config)
var autoInitialize = config.GetBoolean("auto-initialize", true);
return new AzureTableStorageJournalSettings(
- connectionString,
- tableName,
- connectTimeout,
- requestTimeout,
- verbose,
- development,
- autoInitialize);
+ connectionString: connectionString,
+ tableName: tableName,
+ connectTimeout: connectTimeout,
+ requestTimeout: requestTimeout,
+ verboseLogging: verbose,
+ development: development,
+ autoInitialize: autoInitialize,
+ serviceUri: null,
+ defaultAzureCredential: null,
+ tableClientOptions: null);
}
}
}
\ No newline at end of file
diff --git a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSetup.cs b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSetup.cs
new file mode 100644
index 00000000..3e42ad35
--- /dev/null
+++ b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSetup.cs
@@ -0,0 +1,92 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2015 - 2018 Petabridge, LLC
+//
+// -----------------------------------------------------------------------
+
+using System;
+using Akka.Actor.Setup;
+using Azure.Data.Tables;
+using Azure.Identity;
+
+namespace Akka.Persistence.Azure.Journal
+{
+ public class AzureTableStorageJournalSetup : Setup
+ {
+ ///
+ /// The connection string for connecting to Windows Azure table storage.
+ ///
+ public string ConnectionString { get; set; }
+
+ ///
+ /// The table of the table we'll be connecting to.
+ ///
+ public string TableName { get; set; }
+
+ ///
+ /// Initial timeout to use when connecting to Azure Table Storage for the first time.
+ ///
+ public TimeSpan? ConnectTimeout { get; set; }
+
+ ///
+ /// Timeouts for individual read, write, and delete requests to Azure Table Storage.
+ ///
+ public TimeSpan? RequestTimeout { get; set; }
+
+ ///
+ /// For debugging purposes only. Logs every individual operation to Azure table storage.
+ ///
+ public bool? VerboseLogging { get; set; }
+
+ ///
+ /// Flag that we're running in development mode. When this is set, and
+ /// will be ignored, replaced with "UseDevelopmentStorage=true" for local
+ /// connection to Azurite.
+ ///
+ public bool? Development { get; set; }
+
+ ///
+ /// Automatically create the Table Storage table if no existing table is found
+ ///
+ public bool? AutoInitialize { get; set; }
+
+ ///
+ /// A referencing the blob service.
+ /// This is likely to be similar to "https://{account_name}.table.core.windows.net".
+ ///
+ public Uri ServiceUri { get; set; }
+
+ ///
+ /// The used to sign API requests.
+ ///
+ public DefaultAzureCredential DefaultAzureCredential { get; set; }
+
+ ///
+ /// Optional client options that define the transport pipeline policies for authentication,
+ /// retries, etc., that are applied to every request.
+ ///
+ public TableClientOptions TableClientOptions { get; set; }
+
+ internal AzureTableStorageJournalSettings Apply(AzureTableStorageJournalSettings settings)
+ {
+ if (ConnectionString != null)
+ settings = settings.WithConnectionString(ConnectionString);
+ if (TableName != null)
+ settings = settings.WithTableName(TableName);
+ if (ConnectTimeout != null)
+ settings = settings.WithConnectTimeout(ConnectTimeout.Value);
+ if (RequestTimeout != null)
+ settings = settings.WithRequestTimeout(RequestTimeout.Value);
+ if (VerboseLogging != null)
+ settings = settings.WithVerboseLogging(VerboseLogging.Value);
+ if (Development != null)
+ settings = settings.WithDevelopment(Development.Value);
+ if (AutoInitialize != null)
+ settings = settings.WithAutoInitialize(AutoInitialize.Value);
+ if (ServiceUri != null && DefaultAzureCredential != null)
+ settings = settings.WithAzureCredential(ServiceUri, DefaultAzureCredential, TableClientOptions);
+
+ return settings;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournal.cs b/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournal.cs
index ac619942..aaa35d23 100644
--- a/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournal.cs
+++ b/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournal.cs
@@ -17,26 +17,37 @@ public class AzureTableStorageReadJournal : IReadJournal,
IEventsByTagQuery,
ICurrentEventsByTagQuery
{
- public static string Identifier = "akka.persistence.query.journal.azure-table";
+ public const string Identifier = "akka.persistence.query.journal.azure-table";
private readonly int _maxBufferSize;
private readonly TimeSpan _refreshInterval;
private readonly string _writeJournalPluginId;
///
- /// Returns a default query configuration for akka persistence SQLite-based journals and snapshot stores.
+ /// Returns a default query configuration for akka persistence Azure-based journals and snapshot stores.
///
///
- public static Config DefaultConfiguration()
- {
- return ConfigurationFactory.FromResource("Akka.Persistence.Azure.reference.conf");
- }
+ // NOTE: Do NOT remove this method, this is being called through reflection magic code
+ // in Akka.Persistence.Query.PersistenceQuery.GetDefaultConfig()
+ public static Config DefaultConfiguration() => AzurePersistence.DefaultConfig;
public AzureTableStorageReadJournal(ExtendedActorSystem system, Config config)
{
_maxBufferSize = config.GetInt("max-buffer-size");
_refreshInterval = config.GetTimeSpan("refresh-interval");
_writeJournalPluginId = config.GetString("write-plugin");
+
+ var setupOption = system.Settings.Setup.Get();
+ if (setupOption.HasValue)
+ {
+ var setup = setupOption.Value;
+ if (setup.MaxBufferSize != null)
+ _maxBufferSize = setup.MaxBufferSize.Value;
+ if (setup.RefreshInterval != null)
+ _refreshInterval = setup.RefreshInterval.Value;
+ if (!string.IsNullOrWhiteSpace(setup.WritePluginId))
+ _writeJournalPluginId = setup.WritePluginId;
+ }
}
///
diff --git a/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournalSetup.cs b/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournalSetup.cs
new file mode 100644
index 00000000..733a07f8
--- /dev/null
+++ b/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournalSetup.cs
@@ -0,0 +1,35 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2015 - 2018 Petabridge, LLC
+//
+// -----------------------------------------------------------------------
+
+using System;
+using Akka.Actor.Setup;
+
+namespace Akka.Persistence.Azure.Query
+{
+ public class AzureTableStorageReadJournalSetup: Setup
+ {
+ ///
+ /// How many events to fetch in one query (replay) and keep buffered until they
+ /// are delivered downstream.
+ ///
+ public int? MaxBufferSize { get; set; }
+
+ ///
+ /// The Azure Table write journal is notifying the query side as soon as things
+ /// are persisted, but for efficiency reasons the query side retrieves the events
+ /// in batches that sometimes can be delayed up to the configured .
+ ///
+ public TimeSpan? RefreshInterval { get; set; }
+
+ ///
+ /// Absolute path to the write journal plugin configuration entry that this
+ /// query journal will connect to.
+ /// If undefined (or "") it will connect to the default journal as specified by the
+ /// akka.persistence.journal.plugin property.
+ ///
+ public string WritePluginId { get; set; }
+ }
+}
\ No newline at end of file
diff --git a/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotSetup.cs b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotSetup.cs
index c7c26a9e..f6d10054 100644
--- a/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotSetup.cs
+++ b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotSetup.cs
@@ -1,12 +1,21 @@
-using System;
-using System.Collections.Generic;
-using System.Text;
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2015 - 2018 Petabridge, LLC
+//
+// -----------------------------------------------------------------------
+
+using System;
using Akka.Actor.Setup;
using Azure.Identity;
using Azure.Storage.Blobs;
+using Azure.Storage.Blobs.Models;
namespace Akka.Persistence.Azure.Snapshot
{
+ ///
+ /// Setup class for .
+ /// Any populated properties will override its respective HOCON setting.
+ ///
public class AzureBlobSnapshotSetup : Setup
{
///
@@ -28,22 +37,94 @@ public static AzureBlobSnapshotSetup Create(
Uri serviceUri,
DefaultAzureCredential defaultAzureCredential,
BlobClientOptions blobClientOptions = default)
- => new AzureBlobSnapshotSetup(serviceUri, defaultAzureCredential, blobClientOptions);
+ => new AzureBlobSnapshotSetup
+ {
+ ServiceUri = serviceUri,
+ DefaultAzureCredential = defaultAzureCredential,
+ BlobClientOptions = blobClientOptions
+ };
- private AzureBlobSnapshotSetup(
- Uri serviceUri,
- DefaultAzureCredential azureCredential,
- BlobClientOptions blobClientOptions)
- {
- ServiceUri = serviceUri;
- DefaultAzureCredential = azureCredential;
- BlobClientOptions = blobClientOptions;
- }
+ ///
+ /// The connection string for connecting to Windows Azure blob storage account.
+ ///
+ public string ConnectionString { get; set; }
+
+ ///
+ /// The table of the container we'll be using to serialize these blobs.
+ ///
+ public string ContainerName { get; set; }
+
+ ///
+ /// Initial timeout to use when connecting to Azure Container Storage for the first time.
+ ///
+ public TimeSpan? ConnectTimeout { get; set; }
+
+ ///
+ /// Timeouts for individual read, write, and delete requests to Azure Container Storage.
+ ///
+ public TimeSpan? RequestTimeout { get; set; }
+
+ ///
+ /// For debugging purposes only. Logs every individual operation to Azure table storage.
+ ///
+ public bool? VerboseLogging { get; set; }
+
+ ///
+ /// Flag that we're running in development mode. When this is set, and
+ /// will be ignored, replaced with "UseDevelopmentStorage=true" for local
+ /// connection to Azurite.
+ ///
+ public bool? Development { get; set; }
+
+ ///
+ /// Automatically create the Blog Storage container if no existing Blob container is found
+ ///
+ public bool? AutoInitialize { get; set; }
+
+ ///
+ /// The public access type of the auto-initialized Blob Storage container
+ ///
+ public PublicAccessType? ContainerPublicAccessType { get; set; }
+
+ ///
+ /// A referencing the blob service.
+ /// This is likely to be similar to "https://{account_name}.blob.core.windows.net".
+ ///
+ public Uri ServiceUri { get; set; }
- public Uri ServiceUri { get; }
+ ///
+ /// The used to sign API requests.
+ ///
+ public DefaultAzureCredential DefaultAzureCredential { get; set; }
+
+ ///
+ /// Optional client options that define the transport pipeline policies for authentication,
+ /// retries, etc., that are applied to every request.
+ ///
+ public BlobClientOptions BlobClientOptions { get; set; }
- public DefaultAzureCredential DefaultAzureCredential { get; }
+ internal AzureBlobSnapshotStoreSettings Apply(AzureBlobSnapshotStoreSettings settings)
+ {
+ if (ConnectionString != null)
+ settings = settings.WithConnectionString(ConnectionString);
+ if (ContainerName != null)
+ settings = settings.WithContainerName(ContainerName);
+ if (ConnectTimeout != null)
+ settings = settings.WithConnectTimeout(ConnectTimeout.Value);
+ if (RequestTimeout != null)
+ settings = settings.WithRequestTimeout(RequestTimeout.Value);
+ if (VerboseLogging != null)
+ settings = settings.WithVerboseLogging(VerboseLogging.Value);
+ if (Development != null)
+ settings = settings.WithDevelopment(Development.Value);
+ if (AutoInitialize != null)
+ settings = settings.WithAutoInitialize(AutoInitialize.Value);
+ if (ContainerPublicAccessType != null)
+ settings = settings.WithContainerPublicAccessType(ContainerPublicAccessType.Value);
+ if (ServiceUri != null && DefaultAzureCredential != null)
+ settings = settings.WithAzureCredential(ServiceUri, DefaultAzureCredential, BlobClientOptions);
- public BlobClientOptions BlobClientOptions { get; }
+ return settings;
+ }
}
}
diff --git a/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStore.cs b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStore.cs
index 2afe505b..488b04f8 100644
--- a/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStore.cs
+++ b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStore.cs
@@ -54,24 +54,22 @@ public AzureBlobSnapshotStore(Config config = null)
? AzurePersistence.Get(Context.System).BlobSettings
: AzureBlobSnapshotStoreSettings.Create(config);
+ var setup = Context.System.Settings.Setup.Get();
+ if (setup.HasValue)
+ _settings = setup.Value.Apply(_settings);
+
if (_settings.Development)
{
- _serviceClient = new BlobServiceClient("UseDevelopmentStorage=true");
+ _serviceClient = new BlobServiceClient(connectionString: "UseDevelopmentStorage=true");
}
else
{
- var credentialSetup = Context.System.Settings.Setup.Get();
- if (credentialSetup.HasValue)
- {
- _serviceClient = new BlobServiceClient(
- credentialSetup.Value.ServiceUri,
- credentialSetup.Value.DefaultAzureCredential,
- credentialSetup.Value.BlobClientOptions);
- }
- else
- {
- _serviceClient = new BlobServiceClient(_settings.ConnectionString);
- }
+ _serviceClient = _settings.ServiceUri != null && _settings.DefaultAzureCredential != null
+ ? _serviceClient = new BlobServiceClient(
+ serviceUri: _settings.ServiceUri,
+ credential: _settings.DefaultAzureCredential,
+ options: _settings.BlobClientOptions)
+ : _serviceClient = new BlobServiceClient(connectionString: _settings.ConnectionString);
}
_containerClient = new Lazy(() => InitCloudStorage(5).Result);
diff --git a/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStoreSettings.cs b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStoreSettings.cs
index f7c5aa26..25ca31e2 100644
--- a/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStoreSettings.cs
+++ b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStoreSettings.cs
@@ -5,8 +5,11 @@
// -----------------------------------------------------------------------
using System;
+using Akka.Actor;
using Akka.Configuration;
using Akka.Persistence.Azure.Util;
+using Azure.Identity;
+using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
namespace Akka.Persistence.Azure.Snapshot
@@ -29,6 +32,7 @@ public AzureBlobSnapshotStoreSettings(
: this(connectionString, containerName, connectTimeout, requestTimeout, verboseLogging, development, autoInitialize, PublicAccessType.BlobContainer)
{ }
+ [Obsolete]
public AzureBlobSnapshotStoreSettings(
string connectionString,
string containerName,
@@ -38,6 +42,32 @@ public AzureBlobSnapshotStoreSettings(
bool development,
bool autoInitialize,
PublicAccessType containerPublicAccessType)
+ : this(
+ connectionString: connectionString,
+ containerName: containerName,
+ connectTimeout: connectTimeout,
+ requestTimeout: requestTimeout,
+ verboseLogging: verboseLogging,
+ development: development,
+ autoInitialize: autoInitialize,
+ containerPublicAccessType: containerPublicAccessType,
+ serviceUri: null,
+ defaultAzureCredential: null,
+ blobClientOption: null)
+ { }
+
+ public AzureBlobSnapshotStoreSettings(
+ string connectionString,
+ string containerName,
+ TimeSpan connectTimeout,
+ TimeSpan requestTimeout,
+ bool verboseLogging,
+ bool development,
+ bool autoInitialize,
+ PublicAccessType containerPublicAccessType,
+ Uri serviceUri,
+ DefaultAzureCredential defaultAzureCredential,
+ BlobClientOptions blobClientOption)
{
if (string.IsNullOrWhiteSpace(containerName))
throw new ConfigurationException("[AzureBlobSnapshotStore] Container name is null or empty.");
@@ -51,6 +81,9 @@ public AzureBlobSnapshotStoreSettings(
Development = development;
AutoInitialize = autoInitialize;
ContainerPublicAccessType = containerPublicAccessType;
+ ServiceUri = serviceUri;
+ DefaultAzureCredential = defaultAzureCredential;
+ BlobClientOptions = blobClientOption;
}
///
@@ -78,12 +111,106 @@ public AzureBlobSnapshotStoreSettings(
///
public bool VerboseLogging { get; }
+ ///
+ /// Flag that we're running in development mode. When this is set, and
+ /// will be ignored, replaced with "UseDevelopmentStorage=true" for local
+ /// connection to Azurite.
+ ///
public bool Development { get; }
+ ///
+ /// Automatically create the Blog Storage container if no existing Blob container is found
+ ///
public bool AutoInitialize { get; }
+ ///
+ /// The public access type of the auto-initialized Blob Storage container
+ ///
public PublicAccessType ContainerPublicAccessType { get; }
+ ///
+ /// A referencing the blob service.
+ /// This is likely to be similar to "https://{account_name}.blob.core.windows.net".
+ ///
+ public Uri ServiceUri { get; }
+
+ ///
+ /// The used to sign API requests.
+ ///
+ public DefaultAzureCredential DefaultAzureCredential { get; }
+
+ ///
+ /// Optional client options that define the transport pipeline policies for authentication,
+ /// retries, etc., that are applied to every request.
+ ///
+ public BlobClientOptions BlobClientOptions { get; }
+
+ public AzureBlobSnapshotStoreSettings WithConnectionString(string connectionString)
+ => Copy(connectionString: connectionString);
+ public AzureBlobSnapshotStoreSettings WithContainerName(string containerName)
+ => Copy(containerName: containerName);
+ public AzureBlobSnapshotStoreSettings WithConnectTimeout(TimeSpan connectTimeout)
+ => Copy(connectTimeout: connectTimeout);
+ public AzureBlobSnapshotStoreSettings WithRequestTimeout(TimeSpan requestTimeout)
+ => Copy(requestTimeout: requestTimeout);
+ public AzureBlobSnapshotStoreSettings WithVerboseLogging(bool verboseLogging)
+ => Copy(verboseLogging: verboseLogging);
+ public AzureBlobSnapshotStoreSettings WithDevelopment(bool development)
+ => Copy(development: development);
+ public AzureBlobSnapshotStoreSettings WithAutoInitialize(bool autoInitialize)
+ => Copy(autoInitialize: autoInitialize);
+ public AzureBlobSnapshotStoreSettings WithContainerPublicAccessType(PublicAccessType containerPublicAccessType)
+ => Copy(containerPublicAccessType: containerPublicAccessType);
+ public AzureBlobSnapshotStoreSettings WithAzureCredential(
+ Uri serviceUri,
+ DefaultAzureCredential defaultAzureCredential,
+ BlobClientOptions blobClientOption = null)
+ => Copy(
+ serviceUri: serviceUri,
+ defaultAzureCredential: defaultAzureCredential,
+ blobClientOption: blobClientOption);
+
+ private AzureBlobSnapshotStoreSettings Copy(
+ string connectionString = null,
+ string containerName = null,
+ TimeSpan? connectTimeout = null,
+ TimeSpan? requestTimeout = null,
+ bool? verboseLogging = null,
+ bool? development = null,
+ bool? autoInitialize = null,
+ PublicAccessType? containerPublicAccessType = null,
+ Uri serviceUri = null,
+ DefaultAzureCredential defaultAzureCredential = null,
+ BlobClientOptions blobClientOption = null)
+ => new AzureBlobSnapshotStoreSettings(
+ connectionString ?? ConnectionString,
+ containerName ?? ContainerName,
+ connectTimeout ?? ConnectTimeout,
+ requestTimeout ?? RequestTimeout,
+ verboseLogging ?? VerboseLogging,
+ development ?? Development,
+ autoInitialize ?? AutoInitialize,
+ containerPublicAccessType ?? ContainerPublicAccessType,
+ serviceUri ?? ServiceUri,
+ defaultAzureCredential ?? DefaultAzureCredential,
+ blobClientOption ?? BlobClientOptions);
+
+ ///
+ /// Creates an instance using the
+ /// `akka.persistence.snapshot-store.azure-blob-store` HOCON configuration section inside
+ /// the settings.
+ ///
+ /// The to extract the configuration from
+ /// A new settings instance.
+ public static AzureBlobSnapshotStoreSettings Create(ActorSystem system)
+ {
+ var config = system.Settings.Config.GetConfig("akka.persistence.snapshot-store.azure-blob-store");
+ if (config is null)
+ throw new ConfigurationException(
+ "Could not find HOCON config at path 'akka.persistence.snapshot-store.azure-blob-store'");
+ return Create(config);
+ }
+
///
/// Creates an instance using the
/// `akka.persistence.snapshot-store.azure-blob-store` HOCON configuration section.
@@ -92,6 +219,9 @@ public AzureBlobSnapshotStoreSettings(
/// A new settings instance.
public static AzureBlobSnapshotStoreSettings Create(Config config)
{
+ if (config is null)
+ throw new ArgumentNullException(nameof(config));
+
var connectionString = config.GetString("connection-string");
var containerName = config.GetString("container-name");
var connectTimeout = config.GetTimeSpan("connect-timeout", TimeSpan.FromSeconds(3));