Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 89 additions & 29 deletions src/Redis/Orleans.GrainDirectory.Redis/RedisGrainDirectory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Net;
using System.Text;
using System.Text.Json;
using System.Threading;
Expand All @@ -14,7 +15,7 @@

namespace Orleans.GrainDirectory.Redis
{
public class RedisGrainDirectory : IGrainDirectory, ILifecycleParticipant<ISiloLifecycle>
public partial class RedisGrainDirectory : IGrainDirectory, ILifecycleParticipant<ISiloLifecycle>
{
private readonly RedisGrainDirectoryOptions _directoryOptions;
private readonly ClusterOptions _clusterOptions;
Expand Down Expand Up @@ -44,8 +45,7 @@ public RedisGrainDirectory(
{
var result = (string?)await _database.StringGetAsync(GetKey(grainId));

if (_logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("Lookup {GrainId}: {Result}", grainId, string.IsNullOrWhiteSpace(result) ? "null" : result);
LogDebugLookup(grainId, string.IsNullOrWhiteSpace(result) ? "null" : result);

if (string.IsNullOrWhiteSpace(result))
return default;
Expand All @@ -54,7 +54,7 @@ public RedisGrainDirectory(
}
catch (Exception ex)
{
_logger.LogError(ex, "Lookup failed for {GrainId}", grainId);
LogErrorLookupFailed(ex, grainId);

if (IsRedisException(ex))
throw new OrleansException($"Lookup failed for {grainId} : {ex}");
Expand All @@ -64,12 +64,12 @@ public RedisGrainDirectory(
}

public Task<GrainAddress?> Register(GrainAddress address) => Register(address, null);

public async Task<GrainAddress?> Register(GrainAddress address, GrainAddress? previousAddress)
{
const string RegisterScript =
"""
local cur = redis.call('GET', KEYS[1])
local cur = redis.call('GET', KEYS[1])
local success = true
if cur ~= false then
local typedCur = cjson.decode(cur)
Expand All @@ -82,7 +82,7 @@ public RedisGrainDirectory(
redis.call('SET', KEYS[1], ARGV[1])
if ARGV[3] ~= '-1' then
redis.call('EXPIRE', KEYS[1], ARGV[3])
end
end
return nil
end

Expand All @@ -101,24 +101,18 @@ public RedisGrainDirectory(

if (entryString is null)
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Registered {GrainId} ({Address})", address.GrainId, value);
}
LogDebugRegistered(address.GrainId, value);

return address;
}

if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Failed to register {GrainId} ({Address}) in directory: Conflicted with existing value, {Result}", address.GrainId, value, entryString);
}
LogDebugRegisterFailed(address.GrainId, value, entryString);

return JsonSerializer.Deserialize<GrainAddress>(entryString);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to register {GrainId} ({Address}) in directory", address.GrainId, value);
LogErrorRegisterFailed(ex, address.GrainId, value);

if (IsRedisException(ex))
{
Expand All @@ -135,7 +129,7 @@ public async Task Unregister(GrainAddress address)
{
const string DeleteScript =
"""
local cur = redis.call('GET', KEYS[1])
local cur = redis.call('GET', KEYS[1])
if cur ~= false then
local typedCur = cjson.decode(cur)
if typedCur.ActivationId == ARGV[1] then
Expand All @@ -153,19 +147,14 @@ public async Task Unregister(GrainAddress address)
keys: new RedisKey[] { GetKey(address.GrainId) },
values: new RedisValue[] { address.ActivationId.ToString() });

if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Unregister {GrainId} ({Address}): {Result}", address.GrainId, JsonSerializer.Serialize(address), (result != 0) ? "OK" : "Conflict");
}
LogDebugUnregister(address.GrainId, new(address), (result != 0) ? "OK" : "Conflict");
}
catch (Exception ex)
{
var value = JsonSerializer.Serialize(address);

_logger.LogError(ex, "Unregister failed for {GrainId} ({Address})", address.GrainId, value);
LogErrorUnregisterFailed(ex, address.GrainId, new(address));

if (IsRedisException(ex))
throw new OrleansException($"Unregister failed for {address.GrainId} ({value}) : {ex}");
throw new OrleansException($"Unregister failed for {address.GrainId} ({JsonSerializer.Serialize(address)}) : {ex}");
else
throw;
}
Expand Down Expand Up @@ -209,16 +198,87 @@ private async Task Uninitialize(CancellationToken arg)

#region Logging
private void LogConnectionRestored(object? sender, ConnectionFailedEventArgs e)
=> _logger.LogInformation(e.Exception, "Connection to {EndPoint} failed: {FailureType}", e.EndPoint, e.FailureType);
=> LogInfoConnectionRestored(e.Exception, e.EndPoint, e.FailureType);

private void LogConnectionFailed(object? sender, ConnectionFailedEventArgs e)
=> _logger.LogError(e.Exception, "Connection to {EndPoint} failed: {FailureType}", e.EndPoint, e.FailureType);
=> LogErrorConnectionFailed(e.Exception, e.EndPoint, e.FailureType);

private void LogErrorMessage(object? sender, RedisErrorEventArgs e)
=> _logger.LogError(e.Message);
=> LogErrorRedisMessage(e.Message);

private void LogInternalError(object? sender, InternalErrorEventArgs e)
=> _logger.LogError(e.Exception, "Internal error");
=> LogErrorInternalError(e.Exception);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Lookup {GrainId}: {Result}"
)]
private partial void LogDebugLookup(GrainId grainId, string result);

[LoggerMessage(
Level = LogLevel.Error,
Message = "Lookup failed for {GrainId}"
)]
private partial void LogErrorLookupFailed(Exception exception, GrainId grainId);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Registered {GrainId} ({Address})"
)]
private partial void LogDebugRegistered(GrainId grainId, string address);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Failed to register {GrainId} ({Address}) in directory: Conflicted with existing value, {Result}"
)]
private partial void LogDebugRegisterFailed(GrainId grainId, string address, string result);

[LoggerMessage(
Level = LogLevel.Error,
Message = "Failed to register {GrainId} ({Address}) in directory"
)]
private partial void LogErrorRegisterFailed(Exception exception, GrainId grainId, string address);

private readonly struct GrainAddressLogRecord(GrainAddress address)
{
public override string ToString() => JsonSerializer.Serialize(address);
}

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Unregister {GrainId} ({Address}): {Result}"
)]
private partial void LogDebugUnregister(GrainId grainId, GrainAddressLogRecord address, string result);

[LoggerMessage(
Level = LogLevel.Error,
Message = "Unregister failed for {GrainId} ({Address})"
)]
private partial void LogErrorUnregisterFailed(Exception exception, GrainId grainId, GrainAddressLogRecord address);

[LoggerMessage(
Level = LogLevel.Information,
Message = "Connection to {EndPoint} restored: {FailureType}"
)]
private partial void LogInfoConnectionRestored(Exception? exception, EndPoint? endPoint, ConnectionFailureType failureType);

[LoggerMessage(
Level = LogLevel.Error,
Message = "Connection to {EndPoint} failed: {FailureType}"
)]
private partial void LogErrorConnectionFailed(Exception? exception, EndPoint? endPoint, ConnectionFailureType failureType);

[LoggerMessage(
Level = LogLevel.Error,
Message = "{Message}"
)]
private partial void LogErrorRedisMessage(string message);

[LoggerMessage(
Level = LogLevel.Error,
Message = "Internal error"
)]
private partial void LogErrorInternalError(Exception? exception);
#endregion

// These exceptions are not serializable by the client
Expand Down
76 changes: 39 additions & 37 deletions src/Redis/Orleans.Persistence.Redis/Storage/RedisGrainStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace Orleans.Persistence
/// <summary>
/// Redis-based grain storage provider
/// </summary>
public class RedisGrainStorage : IGrainStorage, ILifecycleParticipant<ISiloLifecycle>
public partial class RedisGrainStorage : IGrainStorage, ILifecycleParticipant<ISiloLifecycle>
{
private readonly string _serviceId;
private readonly RedisValue _ttl;
Expand Down Expand Up @@ -65,42 +65,22 @@ public void Participate(ISiloLifecycle lifecycle)

private async Task Init(CancellationToken cancellationToken)
{
var timer = Stopwatch.StartNew();
var startTime = Stopwatch.GetTimestamp();

try
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug(
"RedisGrainStorage {Name} is initializing: ServiceId={ServiceId} DeleteOnClear={DeleteOnClear}",
_name,
_serviceId,
_options.DeleteStateOnClear);
}
LogDebugInitializing(_name, _serviceId, _options.DeleteStateOnClear);

_connection = await _options.CreateMultiplexer(_options).ConfigureAwait(false);
_db = _connection.GetDatabase();

if (_logger.IsEnabled(LogLevel.Debug))
{
timer.Stop();
_logger.LogDebug(
"Init: Name={Name} ServiceId={ServiceId}, initialized in {ElapsedMilliseconds} ms",
_name,
_serviceId,
timer.Elapsed.TotalMilliseconds.ToString("0.00"));
}
var elapsed = Stopwatch.GetElapsedTime(startTime);
LogDebugInitialized(_name, _serviceId, elapsed.TotalMilliseconds);
}
catch (Exception ex)
{
timer.Stop();
_logger.LogError(
ex,
"Init: Name={Name} ServiceId={ServiceId}, errored in {ElapsedMilliseconds} ms.",
_name,
_serviceId,
timer.Elapsed.TotalMilliseconds.ToString("0.00"));

var elapsed = Stopwatch.GetElapsedTime(startTime);
LogErrorInitFailed(ex, _name, _serviceId, elapsed.TotalMilliseconds);
throw new RedisStorageException(Invariant($"{ex.GetType()}: {ex.Message}"));
}
}
Expand Down Expand Up @@ -139,11 +119,7 @@ public async Task ReadStateAsync<T>(string grainType, GrainId grainId, IGrainSta
}
catch (Exception exception)
{
_logger.LogError(
"Failed to read grain state for {GrainType} grain with ID {GrainId} and storage key {Key}.",
grainType,
grainId,
key);
LogErrorReadStateFailed(exception, grainType, grainId, key);
throw new RedisStorageException(Invariant($"Failed to read grain state for {grainType} with ID {grainId} and storage key {key}. {exception.GetType()}: {exception.Message}"));
}
}
Expand Down Expand Up @@ -186,11 +162,7 @@ public async Task WriteStateAsync<T>(string grainType, GrainId grainId, IGrainSt
}
catch (Exception exception) when (exception is not InconsistentStateException)
{
_logger.LogError(
"Failed to write grain state for {GrainType} grain with ID {GrainId} and storage key {Key}.",
grainType,
grainId,
key);
LogErrorWriteStateFailed(exception, grainType, grainId, key);
throw new RedisStorageException(
Invariant($"Failed to write grain state for {grainType} grain with ID {grainId} and storage key {key}. {exception.GetType()}: {exception.Message}"));
}
Expand Down Expand Up @@ -288,5 +260,35 @@ private async Task Close(CancellationToken cancellationToken)
}

private T CreateInstance<T>() => _activatorProvider.GetActivator<T>().Create();

[LoggerMessage(
Level = LogLevel.Debug,
Message = "RedisGrainStorage {Name} is initializing: ServiceId={ServiceId} DeleteOnClear={DeleteOnClear}"
)]
private partial void LogDebugInitializing(string name, string serviceId, bool deleteOnClear);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Init: Name={Name} ServiceId={ServiceId}, initialized in {ElapsedMilliseconds} ms"
)]
private partial void LogDebugInitialized(string name, string serviceId, double elapsedMilliseconds);

[LoggerMessage(
Level = LogLevel.Error,
Message = "Init: Name={Name} ServiceId={ServiceId}, errored in {ElapsedMilliseconds} ms."
)]
private partial void LogErrorInitFailed(Exception exception, string name, string serviceId, double elapsedMilliseconds);

[LoggerMessage(
Level = LogLevel.Error,
Message = "Failed to read grain state for {GrainType} grain with ID {GrainId} and storage key {Key}."
)]
private partial void LogErrorReadStateFailed(Exception exception, string grainType, GrainId grainId, RedisKey key);

[LoggerMessage(
Level = LogLevel.Error,
Message = "Failed to write grain state for {GrainType} grain with ID {GrainId} and storage key {Key}."
)]
private partial void LogErrorWriteStateFailed(Exception exception, string grainType, GrainId grainId, RedisKey key);
}
}
18 changes: 13 additions & 5 deletions src/Redis/Orleans.Reminders.Redis/Storage/RedisReminderTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

namespace Orleans.Reminders.Redis
{
internal class RedisReminderTable : IReminderTable
internal partial class RedisReminderTable : IReminderTable
{
private readonly RedisKey _hashSetKey;
private readonly RedisReminderTableOptions _redisOptions;
Expand Down Expand Up @@ -175,10 +175,7 @@ public async Task<string> UpsertRow(ReminderEntry entry)

try
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("UpsertRow entry = {Entry}, ETag = {ETag}", entry.ToString(), entry.ETag);
}
LogDebugUpsertRow(new(entry), entry.ETag);

var (newETag, value) = ConvertFromEntry(entry);
var (from, to) = GetFilter(entry.GrainId, entry.ReminderName);
Expand Down Expand Up @@ -247,5 +244,16 @@ private static ReminderEntry ConvertToEntry(string reminderValue)

return (eTag, JsonConvert.SerializeObject(segments, _jsonSettings)[1..^1]);
}

private readonly struct ReminderEntryLogValue(ReminderEntry entry)
{
public override string ToString() => entry.ToString();
}

[LoggerMessage(
Level = LogLevel.Debug,
Message = "UpsertRow entry = {Entry}, ETag = {ETag}"
)]
private partial void LogDebugUpsertRow(ReminderEntryLogValue entry, string eTag);
}
}
Loading