Skip to content
Merged
21 changes: 21 additions & 0 deletions Orleans.sln
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ChaoticCluster.ServiceDefau
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestSerializerExternalModels", "test\Misc\TestSerializerExternalModels\TestSerializerExternalModels.csproj", "{5D587DDE-036D-4694-A314-8DDF270AC031}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Orleans.Journaling", "src\Orleans.Journaling\Orleans.Journaling.csproj", "{20EFDCFC-F3FE-5509-5950-516E90DE1E05}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Orleans.Journaling.Tests", "test\Orleans.Journaling.Tests\Orleans.Journaling.Tests.csproj", "{4A4D30F4-6D61-7A80-8352-D76BD29582E0}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Orleans.Journaling.AzureStorage", "src\Azure\Orleans.Journaling.AzureStorage\Orleans.Journaling.AzureStorage.csproj", "{E613A10D-757D-44BA-97C1-3D06C22BDB2E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -672,6 +678,18 @@ Global
{5D587DDE-036D-4694-A314-8DDF270AC031}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5D587DDE-036D-4694-A314-8DDF270AC031}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5D587DDE-036D-4694-A314-8DDF270AC031}.Release|Any CPU.Build.0 = Release|Any CPU
{20EFDCFC-F3FE-5509-5950-516E90DE1E05}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{20EFDCFC-F3FE-5509-5950-516E90DE1E05}.Debug|Any CPU.Build.0 = Debug|Any CPU
{20EFDCFC-F3FE-5509-5950-516E90DE1E05}.Release|Any CPU.ActiveCfg = Release|Any CPU
{20EFDCFC-F3FE-5509-5950-516E90DE1E05}.Release|Any CPU.Build.0 = Release|Any CPU
{4A4D30F4-6D61-7A80-8352-D76BD29582E0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4A4D30F4-6D61-7A80-8352-D76BD29582E0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4A4D30F4-6D61-7A80-8352-D76BD29582E0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4A4D30F4-6D61-7A80-8352-D76BD29582E0}.Release|Any CPU.Build.0 = Release|Any CPU
{E613A10D-757D-44BA-97C1-3D06C22BDB2E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E613A10D-757D-44BA-97C1-3D06C22BDB2E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E613A10D-757D-44BA-97C1-3D06C22BDB2E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E613A10D-757D-44BA-97C1-3D06C22BDB2E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -796,6 +814,9 @@ Global
{76A549FA-69F1-4967-82B6-161A8B52C86B} = {2579A7F6-EBE8-485A-BB20-A5D19DB5612B}
{4004A79F-B6BB-4472-891B-AD1348AE3E93} = {2579A7F6-EBE8-485A-BB20-A5D19DB5612B}
{5D587DDE-036D-4694-A314-8DDF270AC031} = {70BCC54E-1618-4742-A079-07588065E361}
{20EFDCFC-F3FE-5509-5950-516E90DE1E05} = {4CD3AA9E-D937-48CA-BB6C-158E12257D23}
{4A4D30F4-6D61-7A80-8352-D76BD29582E0} = {A6573187-FD0D-4DF7-91D1-03E07E470C0A}
{E613A10D-757D-44BA-97C1-3D06C22BDB2E} = {4C5D66BF-EE1C-4DD8-8551-D1B7F3768A34}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {7BFB3429-B5BB-4DB1-95B4-67D77A864952}
Expand Down
175 changes: 175 additions & 0 deletions src/Azure/Orleans.Journaling.AzureStorage/AzureAppendBlobLogStorage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
using Azure;
using Azure.Storage.Blobs.Specialized;
using Azure.Storage.Blobs.Models;
using System.Runtime.CompilerServices;
using Azure.Storage.Sas;
using Orleans.Serialization.Buffers;
using Microsoft.Extensions.Logging;

namespace Orleans.Journaling;

internal sealed class AzureAppendBlobLogStorage : IStateMachineStorage
{
private static readonly AppendBlobCreateOptions CreateOptions = new() { Conditions = new() { IfNoneMatch = ETag.All } };
private readonly AppendBlobClient _client;
private readonly ILogger<AzureAppendBlobLogStorage> _logger;
private readonly LogExtentBuilder.ReadOnlyStream _stream;
private readonly AppendBlobAppendBlockOptions _appendOptions;
private bool _exists;
private int _numBlocks;

public bool IsCompactionRequested => _numBlocks > 10;

public AzureAppendBlobLogStorage(AppendBlobClient client, ILogger<AzureAppendBlobLogStorage> logger)
{
_client = client;
_logger = logger;
_stream = new();

// For the first request, if we have not performed a read yet, we want to guard against clobbering an existing blob.
_appendOptions = new AppendBlobAppendBlockOptions() { Conditions = new AppendBlobRequestConditions { IfNoneMatch = ETag.All } };
}

public async ValueTask AppendAsync(LogExtentBuilder value, CancellationToken cancellationToken)
{
if (!_exists)
{
var response = await _client.CreateAsync(CreateOptions, cancellationToken);
_appendOptions.Conditions.IfNoneMatch = default;
_appendOptions.Conditions.IfMatch = response.Value.ETag;
_exists = true;
}

_stream.SetBuilder(value);
var result = await _client.AppendBlockAsync(_stream, _appendOptions, cancellationToken).ConfigureAwait(false);
if (_logger.IsEnabled(LogLevel.Debug))
{
var length = value.Length;
_logger.LogDebug("Appended {Length} bytes to blob \"{ContainerName}/{BlobName}\"", length, _client.BlobContainerName, _client.Name);
}

_stream.Reset();
_appendOptions.Conditions.IfNoneMatch = default;
_appendOptions.Conditions.IfMatch = result.Value.ETag;
_numBlocks = result.Value.BlobCommittedBlockCount;
}

public async ValueTask DeleteAsync(CancellationToken cancellationToken)
{
var conditions = new BlobRequestConditions { IfMatch = _appendOptions.Conditions.IfMatch };
await _client.DeleteAsync(conditions: conditions, cancellationToken: cancellationToken).ConfigureAwait(false);

// Expect no blob to have been created when we append to it.
_appendOptions.Conditions.IfNoneMatch = ETag.All;
_appendOptions.Conditions.IfMatch = default;
_numBlocks = 0;
}

public async IAsyncEnumerable<LogExtent> ReadAsync([EnumeratorCancellation] CancellationToken cancellationToken)
{
Response<BlobDownloadStreamingResult> result;
try
{
// If the blob was not newly created, then download the blob.
result = await _client.DownloadStreamingAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
}
catch (RequestFailedException exception) when (exception.Status is 404)
{
_exists = false;
yield break;
}

// If the blob has a size of zero, check for a snapshot and restore the blob from the snapshot if one exists.
if (result.Value.Details.ContentLength == 0)
{
if (result.Value.Details.Metadata.TryGetValue("snapshot", out var snapshot) && snapshot is { Length: > 0 })
{
result = await CopyFromSnapshotAsync(result.Value.Details.ETag, snapshot, cancellationToken).ConfigureAwait(false);
}
}

_numBlocks = result.Value.Details.BlobCommittedBlockCount;
_appendOptions.Conditions.IfNoneMatch = default;
_appendOptions.Conditions.IfMatch = result.Value.Details.ETag;
_exists = true;

// Read everything into a single log segment. We could change this to read in chunks,
// yielding when the stream does not return synchronously, if we wanted to support larger state machines.
var rawStream = result.Value.Content;
using var buffer = new ArcBufferWriter();
while (true)
{
var mem = buffer.GetMemory();
var bytesRead = await rawStream.ReadAsync(mem, cancellationToken);
if (bytesRead == 0)
{
if (buffer.Length > 0)
{
if (_logger.IsEnabled(LogLevel.Debug))
{
var length = buffer.Length;
_logger.LogDebug("Read {Length} bytes from blob \"{ContainerName}/{BlobName}\"", length, _client.BlobContainerName, _client.Name);
}

yield return new LogExtent(buffer.ConsumeSlice(buffer.Length));
}

yield break;
}

buffer.AdvanceWriter(bytesRead);
}
}

private async Task<Response<BlobDownloadStreamingResult>> CopyFromSnapshotAsync(ETag eTag, string snapshotDetail, CancellationToken cancellationToken)
{
// Read snapshot and append it to the blob.
var snapshot = _client.WithSnapshot(snapshotDetail);
var uri = snapshot.GenerateSasUri(permissions: BlobSasPermissions.Read, expiresOn: DateTimeOffset.UtcNow.AddHours(1));
var copyResult = await _client.SyncCopyFromUriAsync(
uri,
new BlobCopyFromUriOptions { DestinationConditions = new BlobRequestConditions { IfNoneMatch = eTag } },
cancellationToken).ConfigureAwait(false);
if (copyResult.Value.CopyStatus is not CopyStatus.Success)
{
throw new InvalidOperationException($"Copy did not complete successfully. Status: {copyResult.Value.CopyStatus}");
}

var result = await _client.DownloadStreamingAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
_exists = true;
return result;
}

public async ValueTask ReplaceAsync(LogExtentBuilder value, CancellationToken cancellationToken)
{
// Create a snapshot of the blob for recovery purposes.
var blobSnapshot = await _client.CreateSnapshotAsync(conditions: _appendOptions.Conditions, cancellationToken: cancellationToken).ConfigureAwait(false);

// Open the blob for writing, overwriting existing contents.
var createOptions = new AppendBlobCreateOptions()
{
Conditions = _appendOptions.Conditions,
Metadata = new Dictionary<string, string> { ["snapshot"] = blobSnapshot.Value.Snapshot },
};
var createResult = await _client.CreateAsync(createOptions, cancellationToken).ConfigureAwait(false);
_appendOptions.Conditions.IfMatch = createResult.Value.ETag;
_appendOptions.Conditions.IfNoneMatch = default;

// Write the state machine snapshot.
_stream.SetBuilder(value);
var result = await _client.AppendBlockAsync(_stream, _appendOptions, cancellationToken).ConfigureAwait(false);
if (_logger.IsEnabled(LogLevel.Debug))
{
var length = value.Length;
_logger.LogDebug("Replaced blob \"{ContainerName}/{BlobName}\", writing {Length} bytes", _client.BlobContainerName, _client.Name, length);
}

_stream.Reset();
_appendOptions.Conditions.IfNoneMatch = default;
_appendOptions.Conditions.IfMatch = result.Value.ETag;
_numBlocks = result.Value.BlobCommittedBlockCount;

// Delete the blob snapshot.
await _client.WithSnapshot(blobSnapshot.Value.Snapshot).DeleteAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage;
using Azure.Core;
using Orleans.Runtime;

namespace Orleans.Journaling;

/// <summary>
/// Options for configuring the Azure Append Blob state machine storage provider.
/// </summary>
public sealed class AzureAppendBlobStateMachineStorageOptions
{
private BlobServiceClient? _blobServiceClient;

/// <summary>
/// Container name where state machine state is stored.
/// </summary>
public string ContainerName { get; set; } = DEFAULT_CONTAINER_NAME;
public const string DEFAULT_CONTAINER_NAME = "state";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this container name have to be globally unique for the storage account? Should we prepend something here to avoid collisions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's configurable, so developers can configure it how they like (eg, prefixing it with a unique id, possibly based on ServiceId). They can also provide a factory via the BuildContainerFactory property below to customize it on a per-grain basis.

They have options already, but we could set a different default value or potentially prefix it with the ServiceId automatically. What do you prefer?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a collision error will be obvious, I guess it's fine to leave as is.


/// <summary>
/// Gets or sets the delegate used to generate the blob name for a given grain.
/// </summary>
public Func<GrainId, string> GetBlobName { get; set; } = DefaultGetBlobName;

private static readonly Func<GrainId, string> DefaultGetBlobName = static (GrainId grainId) => $"{grainId}.bin";

/// <summary>
/// Options to be used when configuring the blob storage client, or <see langword="null"/> to use the default options.
/// </summary>
public BlobClientOptions? ClientOptions { get; set; }

/// <summary>
/// Gets or sets the client used to access the Azure Blob Service.
/// </summary>
public BlobServiceClient? BlobServiceClient
{
get => _blobServiceClient;
set
{
ArgumentNullException.ThrowIfNull(value);
_blobServiceClient = value;
CreateClient = ct => Task.FromResult(value);
}
}

/// <summary>
/// The optional delegate used to create a <see cref="BlobServiceClient"/> instance.
/// </summary>
internal Func<CancellationToken, Task<BlobServiceClient>>? CreateClient { get; private set; }

/// <summary>
/// Stage of silo lifecycle where storage should be initialized. Storage must be initialized prior to use.
/// </summary>
public int InitStage { get; set; } = DEFAULT_INIT_STAGE;
public const int DEFAULT_INIT_STAGE = ServiceLifecycleStage.ApplicationServices;

/// <summary>
/// A function for building container factory instances.
/// </summary>
public Func<IServiceProvider, AzureAppendBlobStateMachineStorageOptions, IBlobContainerFactory> BuildContainerFactory { get; set; }
= static (provider, options) => new DefaultBlobContainerFactory(options);

/// <summary>
/// Configures the <see cref="BlobServiceClient"/> using a connection string.
/// </summary>
public void ConfigureBlobServiceClient(string connectionString)
{
ArgumentException.ThrowIfNullOrWhiteSpace(connectionString);
CreateClient = ct => Task.FromResult(new BlobServiceClient(connectionString, ClientOptions));
}

/// <summary>
/// Configures the <see cref="BlobServiceClient"/> using an authenticated service URI.
/// </summary>
public void ConfigureBlobServiceClient(Uri serviceUri)
{
ArgumentNullException.ThrowIfNull(serviceUri);
CreateClient = ct => Task.FromResult(new BlobServiceClient(serviceUri, ClientOptions));
}

/// <summary>
/// Configures the <see cref="BlobServiceClient"/> using the provided callback.
/// </summary>
public void ConfigureBlobServiceClient(Func<CancellationToken, Task<BlobServiceClient>> createClientCallback)
{
CreateClient = createClientCallback ?? throw new ArgumentNullException(nameof(createClientCallback));
}

/// <summary>
/// Configures the <see cref="BlobServiceClient"/> using an authenticated service URI and a <see cref="TokenCredential"/>.
/// </summary>
public void ConfigureBlobServiceClient(Uri serviceUri, TokenCredential tokenCredential)
{
ArgumentNullException.ThrowIfNull(serviceUri);
ArgumentNullException.ThrowIfNull(tokenCredential);
CreateClient = ct => Task.FromResult(new BlobServiceClient(serviceUri, tokenCredential, ClientOptions));
}

/// <summary>
/// Configures the <see cref="BlobServiceClient"/> using an authenticated service URI and a <see cref="AzureSasCredential"/>.
/// </summary>
public void ConfigureBlobServiceClient(Uri serviceUri, AzureSasCredential azureSasCredential)
{
ArgumentNullException.ThrowIfNull(serviceUri);
ArgumentNullException.ThrowIfNull(azureSasCredential);
CreateClient = ct => Task.FromResult(new BlobServiceClient(serviceUri, azureSasCredential, ClientOptions));
}

/// <summary>
/// Configures the <see cref="BlobServiceClient"/> using an authenticated service URI and a <see cref="StorageSharedKeyCredential"/>.
/// </summary>
public void ConfigureBlobServiceClient(Uri serviceUri, StorageSharedKeyCredential sharedKeyCredential)
{
ArgumentNullException.ThrowIfNull(serviceUri);
ArgumentNullException.ThrowIfNull(sharedKeyCredential);
CreateClient = ct => Task.FromResult(new BlobServiceClient(serviceUri, sharedKeyCredential, ClientOptions));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using Azure.Storage.Blobs.Specialized;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Logging;
using Orleans.Runtime;

namespace Orleans.Journaling;

internal sealed class AzureAppendBlobStateMachineStorageProvider(
IOptions<AzureAppendBlobStateMachineStorageOptions> options,
IServiceProvider serviceProvider,
ILogger<AzureAppendBlobLogStorage> logger) : IStateMachineStorageProvider, ILifecycleParticipant<ISiloLifecycle>
{
private readonly IBlobContainerFactory _containerFactory = options.Value.BuildContainerFactory(serviceProvider, options.Value);
private readonly AzureAppendBlobStateMachineStorageOptions _options = options.Value;

private async Task Initialize(CancellationToken cancellationToken)
{
var client = await _options.CreateClient!(cancellationToken);
await _containerFactory.InitializeAsync(client, cancellationToken).ConfigureAwait(false);
}

public IStateMachineStorage Create(IGrainContext grainContext)
{
var container = _containerFactory.GetBlobContainerClient(grainContext.GrainId);
var blobName = _options.GetBlobName(grainContext.GrainId);
var blobClient = container.GetAppendBlobClient(blobName);
return new AzureAppendBlobLogStorage(blobClient, logger);
}

public void Participate(ISiloLifecycle observer)
{
observer.Subscribe(
nameof(AzureAppendBlobStateMachineStorageProvider),
ServiceLifecycleStage.RuntimeInitialize,
onStart: Initialize);
}
}
Loading