Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Orleans.Clustering.Cosmos;

internal class CosmosGatewayListProvider : IGatewayListProvider
internal partial class CosmosGatewayListProvider : IGatewayListProvider
{
private readonly ILogger _logger;
private readonly string _clusterId;
Expand Down Expand Up @@ -42,7 +42,7 @@ public async Task InitializeGatewayListProvider()
}
catch (Exception ex)
{
_logger.LogError(ex, "Error initializing Azure Cosmos DB gateway list provider");
LogErrorInitializingGatewayListProvider(ex);
throw;
}
}
Expand Down Expand Up @@ -70,11 +70,23 @@ public async Task<IList<Uri>> GetGateways()
}
catch (Exception ex)
{
_logger.LogError(ex, "Error reading gateway list from Azure Cosmos DB");
LogErrorReadingGatewayListFromCosmosDb(ex);
throw;
}
}

private static Uri ConvertToGatewayUri(SiloEntity gateway) =>
SiloAddress.New(new IPEndPoint(IPAddress.Parse(gateway.Address), gateway.ProxyPort!.Value), gateway.Generation).ToGatewayUri();
}

[LoggerMessage(
Level = LogLevel.Error,
Message = "Error initializing Azure Cosmos DB gateway list provider"
)]
private partial void LogErrorInitializingGatewayListProvider(Exception ex);

[LoggerMessage(
Level = LogLevel.Error,
Message = "Error reading gateway list from Azure Cosmos DB"
)]
private partial void LogErrorReadingGatewayListFromCosmosDb(Exception ex);
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public async Task<MembershipTableData> ReadRow(SiloAddress key)
}
else
{
_logger.LogError("Initial ClusterVersionEntity entity does not exist.");
LogErrorClusterVersionEntityDoesNotExist();
}

var memEntries = new List<Tuple<MembershipEntry, string>>
Expand Down Expand Up @@ -577,4 +577,4 @@ private readonly struct MembershipEntryLogValue(MembershipEntry membershipEntry)
Message = "Unable to query entry {Entry}"
)]
private partial void LogWarningUnableToQueryEntry(MembershipEntryLogValue entry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
namespace Orleans.Runtime.Host
{
/// <summary>
/// Interface exposed by ServiceRuntimeWrapper for functionality provided
/// Interface exposed by ServiceRuntimeWrapper for functionality provided
/// by Microsoft.WindowsAzure.ServiceRuntime.
/// </summary>
public interface IServiceRuntimeWrapper
Expand All @@ -33,7 +33,7 @@ public interface IServiceRuntimeWrapper
/// Update domain of the role instance
/// </summary>
int UpdateDomain { get; }

/// <summary>
/// Fault domain of the role instance
/// </summary>
Expand Down Expand Up @@ -75,16 +75,16 @@ public interface IServiceRuntimeWrapper


/// <summary>
/// The purpose of this class is to wrap the functionality provided
/// The purpose of this class is to wrap the functionality provided
/// by Microsoft.WindowsAzure.ServiceRuntime.dll, so that we can access it via Reflection,
/// and not have a compile-time dependency on it.
/// Microsoft.WindowsAzure.ServiceRuntime.dll doesn't have an official NuGet package.
/// By loading it via Reflection we solve this problem, and do not need an assembly
/// By loading it via Reflection we solve this problem, and do not need an assembly
/// binding redirect for it, as we can call any compatible version.
/// Microsoft.WindowsAzure.ServiceRuntime.dll hasn't changed in years, so the chance of a breaking change
/// is relatively low.
/// </summary>
internal class ServiceRuntimeWrapper : IServiceRuntimeWrapper, IDeploymentConfiguration
internal partial class ServiceRuntimeWrapper : IServiceRuntimeWrapper, IDeploymentConfiguration
{
private readonly ILogger logger;
private Assembly assembly;
Expand Down Expand Up @@ -130,7 +130,7 @@ public IList<string> GetAllSiloNames()
var list = new List<string>();
foreach(dynamic instance in instances)
list.Add(ExtractInstanceName(instance.Id,DeploymentId));

return list;
}

Expand All @@ -145,15 +145,8 @@ public IPEndPoint GetIPEndpoint(string endpointName)
}
catch (Exception exc)
{
var endpointNames = (string)string.Join(", ", instanceEndpoints);
logger.LogError(
(int)ErrorCode.SiloEndpointConfigError,
exc,
"Unable to obtain endpoint info for role {RoleName} from role config parameter {EndpointName} -- Endpoints defined = [{EndpointNames}]",
RoleName,
endpointName,
endpointNames);

var endpointNames = string.Join(", ", instanceEndpoints);
LogErrorUnableToObtainEndpointInfo(RoleName, endpointName, endpointNames);
throw new OrleansException(
$"Unable to obtain endpoint info for role {RoleName} from role config parameter {endpointName} -- Endpoints defined = [{endpointNames}]",
exc);
Expand All @@ -169,7 +162,7 @@ public void SubscribeForStoppingNotification(object handlerObject, EventHandler<
{
var handlerDelegate = handler.GetMethodInfo().CreateDelegate(stoppingEvent.EventHandlerType, handlerObject);
stoppingEventAdd.Invoke(null, new object[] { handlerDelegate });

}

public void UnsubscribeFromStoppingNotification(object handlerObject, EventHandler<object> handler)
Expand All @@ -187,18 +180,16 @@ private void Initialize()
// If we are runing within a worker role Microsoft.WindowsAzure.ServiceRuntime should already be loaded
if (assembly == null)
{
const string msg1 = "Microsoft.WindowsAzure.ServiceRuntime is not loaded. Trying to load it with Assembly.LoadWithPartialName().";
logger.LogWarning((int)ErrorCode.AzureServiceRuntime_NotLoaded, msg1);
LogWarningAzureServiceRuntimeNotLoaded();

// Microsoft.WindowsAzure.ServiceRuntime isn't loaded. We may be running within a web role or not in Azure.
#pragma warning disable 618
assembly = Assembly.Load(new AssemblyName("Microsoft.WindowsAzure.ServiceRuntime, Version = 2.7.0.0, Culture = neutral, PublicKeyToken = 31bf3856ad364e35"));
#pragma warning restore 618
if (assembly == null)
{
const string msg2 = "Failed to find or load Microsoft.WindowsAzure.ServiceRuntime.";
logger.LogError((int)ErrorCode.AzureServiceRuntime_FailedToLoad, msg2);
throw new OrleansException(msg2);
LogErrorAzureServiceRuntimeFailedToLoad();
throw new OrleansException("Failed to find or load Microsoft.WindowsAzure.ServiceRuntime.");
}
}

Expand Down Expand Up @@ -231,5 +222,26 @@ private static string ExtractInstanceName(string instanceId, string deploymentId
? instanceId[(deploymentId.Length + 1)..]
: instanceId;
}

[LoggerMessage(
Level = LogLevel.Error,
EventId = (int)ErrorCode.SiloEndpointConfigError,
Message = "Unable to obtain endpoint info for role {RoleName} from role config parameter {EndpointName} -- Endpoints defined = [{EndpointNames}]"
)]
private partial void LogErrorUnableToObtainEndpointInfo(string roleName, string endpointName, string endpointNames);

[LoggerMessage(
Level = LogLevel.Warning,
EventId = (int)ErrorCode.AzureServiceRuntime_NotLoaded,
Message = "Microsoft.WindowsAzure.ServiceRuntime is not loaded. Trying to load it with Assembly.LoadWithPartialName()."
)]
private partial void LogWarningAzureServiceRuntimeNotLoaded();

[LoggerMessage(
Level = LogLevel.Error,
EventId = (int)ErrorCode.AzureServiceRuntime_FailedToLoad,
Message = "Failed to find or load Microsoft.WindowsAzure.ServiceRuntime."
)]
private partial void LogErrorAzureServiceRuntimeFailedToLoad();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static (string LowerBound, string UpperBound) ConstructPartitionKeyBounds
public override string ToString() => $"Reminder [PartitionKey={PartitionKey} RowKey={RowKey} GrainId={GrainReference} ReminderName={ReminderName} Deployment={DeploymentId} ServiceId={ServiceId} StartAt={StartAt} Period={Period} GrainRefConsistentHash={GrainRefConsistentHash}]";
}

internal sealed class RemindersTableManager : AzureTableDataManager<ReminderTableEntry>
internal sealed partial class RemindersTableManager : AzureTableDataManager<ReminderTableEntry>
{
private readonly string _serviceId;
private readonly string _clusterId;
Expand Down Expand Up @@ -106,17 +106,15 @@ public RemindersTableManager(
}
}

var queryResults = await ReadTableEntriesAndEtagsAsync(query);
return queryResults.ToList();
return await ReadTableEntriesAndEtagsAsync(query);
}

internal async Task<List<(ReminderTableEntry Entity, string ETag)>> FindReminderEntries(GrainId grainId)
{
var partitionKey = ReminderTableEntry.ConstructPartitionKey(_serviceId, grainId);
var (rowKeyLowerBound, rowKeyUpperBound) = ReminderTableEntry.ConstructRowKeyBounds(grainId);
var query = TableClient.CreateQueryFilter($"(PartitionKey eq {partitionKey}) and ((RowKey gt {rowKeyLowerBound}) and (RowKey le {rowKeyUpperBound}))");
var queryResults = await ReadTableEntriesAndEtagsAsync(query);
return queryResults.ToList();
return await ReadTableEntriesAndEtagsAsync(query);
}

internal async Task<(ReminderTableEntry Entity, string ETag)> FindReminderEntry(GrainId grainId, string reminderName)
Expand All @@ -140,11 +138,9 @@ internal async Task<string> UpsertRow(ReminderTableEntry reminderEntry)
}
catch(Exception exc)
{
HttpStatusCode httpStatusCode;
string restStatus;
if (AzureTableUtils.EvaluateException(exc, out httpStatusCode, out restStatus))
if (AzureTableUtils.EvaluateException(exc, out var httpStatusCode, out var restStatus))
{
if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace("UpsertRow failed with HTTP status code: {HttpStatusCode}, REST status: {RestStatus}", httpStatusCode, restStatus);
LogTraceUpsertRowFailed(Logger, httpStatusCode, restStatus);
if (AzureTableUtils.IsContentionError(httpStatusCode)) return null; // false;
}
throw;
Expand All @@ -161,15 +157,9 @@ internal async Task<bool> DeleteReminderEntryConditionally(ReminderTableEntry re
}
catch(Exception exc)
{
HttpStatusCode httpStatusCode;
string restStatus;
if (AzureTableUtils.EvaluateException(exc, out httpStatusCode, out restStatus))
if (AzureTableUtils.EvaluateException(exc, out var httpStatusCode, out var restStatus))
{
if (Logger.IsEnabled(LogLevel.Trace))
Logger.LogTrace(
"DeleteReminderEntryConditionally failed with HTTP status code: {HttpStatusCode}, REST status: {RestStatus}",
httpStatusCode,
restStatus);
LogTraceDeleteReminderEntryConditionallyFailed(Logger, httpStatusCode, restStatus);
if (AzureTableUtils.IsContentionError(httpStatusCode)) return false;
}
throw;
Expand All @@ -189,13 +179,25 @@ internal async Task DeleteTableEntries()

foreach (var entriesPerPartition in groupedByHash.Values)
{
foreach (var batch in entriesPerPartition.BatchIEnumerable(this.StoragePolicyOptions.MaxBulkUpdateRows))
foreach (var batch in entriesPerPartition.BatchIEnumerable(this.StoragePolicyOptions.MaxBulkUpdateRows))
{
tasks.Add(DeleteTableEntriesAsync(batch));
}
}

await Task.WhenAll(tasks);
}

[LoggerMessage(
Level = LogLevel.Trace,
Message = "UpsertRow failed with HTTP status code: {HttpStatusCode}, REST status: {RestStatus}"
)]
private static partial void LogTraceUpsertRowFailed(ILogger logger, HttpStatusCode httpStatusCode, string restStatus);

[LoggerMessage(
Level = LogLevel.Trace,
Message = "DeleteReminderEntryConditionally failed with HTTP status code: {HttpStatusCode}, REST status: {RestStatus}"
)]
private static partial void LogTraceDeleteReminderEntryConditionallyFailed(ILogger logger, HttpStatusCode httpStatusCode, string restStatus);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Orleans.Providers.Streams.AzureQueue
/// <summary>
/// Receives batches of messages from a single partition of a message queue.
/// </summary>
internal class AzureQueueAdapterReceiver : IQueueAdapterReceiver
internal partial class AzureQueueAdapterReceiver : IQueueAdapterReceiver
{
private AzureQueueDataManager queue;
private long lastReadMessage;
Expand Down Expand Up @@ -131,9 +131,7 @@ public async Task MessagesDeliveredAsync(IList<IBatchContainer> messages)
}
catch (Exception exc)
{
logger.LogWarning((int)AzureQueueErrorCode.AzureQueue_15,
exc,
"Exception upon DeleteQueueMessage on queue {QueueName}. Ignoring.", this.azureQueueName);
LogWarningOnDeleteQueueMessage(exc, this.azureQueueName);
}
}
finally
Expand All @@ -142,6 +140,13 @@ public async Task MessagesDeliveredAsync(IList<IBatchContainer> messages)
}
}

[LoggerMessage(
Level = LogLevel.Warning,
EventId = (int)AzureQueueErrorCode.AzureQueue_15,
Message = "Exception upon DeleteQueueMessage on queue {QueueName}. Ignoring."
)]
private partial void LogWarningOnDeleteQueueMessage(Exception exception, string queueName);

private class PendingDelivery
{
public PendingDelivery(StreamSequenceToken token, QueueMessage message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ private ValueTask<QueueClient> GetQueueClient()
}

return GetQueueClientAsync();
async ValueTask<QueueClient> GetQueueClientAsync() => _queueClient ??= await GetCloudQueueClient(options, logger);
async ValueTask<QueueClient> GetQueueClientAsync() => _queueClient ??= await GetCloudQueueClient(options);
}

/// <summary>
Expand Down Expand Up @@ -368,11 +368,11 @@ private void ReportErrorAndRethrow(Exception exc, string operation, AzureQueueEr
var errMsg = string.Format(
"Error doing {0} for Azure storage queue {1} " + Environment.NewLine
+ "Exception = {2}", operation, QueueName, exc);
logger.LogError((int)errorCode, exc, "{Message}", errMsg);
logger.LogError((int)errorCode, exc, "{Message}", errMsg); // TODO: pending on https://github.com/dotnet/runtime/issues/110570
throw new AggregateException(errMsg, exc);
}

private async Task<QueueClient> GetCloudQueueClient(AzureQueueOptions options, ILogger logger)
private async Task<QueueClient> GetCloudQueueClient(AzureQueueOptions options)
{
try
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
using System;
using Microsoft.Extensions.Logging;

namespace Orleans.Streaming.EventHubs
{
/// <summary>
Expand All @@ -16,37 +13,4 @@ internal enum OrleansEventHubErrorCode
FailedPartitionRead = ServiceBus + 1,
RetryReceiverInit = ServiceBus + 2,
}

internal static class LoggerExtensions
{
internal static void Debug(this ILogger logger, OrleansEventHubErrorCode errorCode, string format, params object[] args)
{
logger.LogDebug((int)errorCode, format, args);
}

internal static void Trace(this ILogger logger, OrleansEventHubErrorCode errorCode, string format, params object[] args)
{
logger.LogTrace((int)errorCode, format, args);
}

internal static void Info(this ILogger logger, OrleansEventHubErrorCode errorCode, string format, params object[] args)
{
logger.LogInformation((int)errorCode, format, args);
}

internal static void Warn(this ILogger logger, OrleansEventHubErrorCode errorCode, string format, params object[] args)
{
logger.LogWarning((int)errorCode, format, args);
}

internal static void Warn(this ILogger logger, OrleansEventHubErrorCode errorCode, string message, Exception exception)
{
logger.LogWarning((int)errorCode, exception, message);
}

internal static void Error(this ILogger logger, OrleansEventHubErrorCode errorCode, string message, Exception exception = null)
{
logger.LogError((int)errorCode, exception, message);
}
}
}
Loading
Loading