Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 55 additions & 29 deletions src/Orleans.Runtime/ConsistentRing/ConsistentRingProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace Orleans.Runtime.ConsistentRing
/// Note: MembershipOracle uses 'forward/counter-clockwise' definition to assign responsibilities.
/// E.g. in a ring of nodes {5, 10, 15}, the responsible of key 7 is node 5 (the node is responsible for its succeeding range).
/// </summary>
internal sealed class ConsistentRingProvider :
internal sealed partial class ConsistentRingProvider :
IConsistentRingProvider, ISiloStatusListener, IDisposable
{
// internal, so that unit tests can access them
Expand Down Expand Up @@ -81,7 +81,6 @@ internal void AddServer(SiloAddress silo)
if (!(membershipRingList.Count == 0 || myOldIndex != -1))
throw new OrleansException(string.Format("{0}: Couldn't find my position in the ring {1}.", MyAddress, Utils.EnumerableToString(membershipRingList)));


// insert new silo in the sorted order
int hash = silo.GetConsistentHashCode();

Expand All @@ -100,10 +99,7 @@ internal void AddServer(SiloAddress silo)
NotifyLocalRangeSubscribers(oldRange, myRange, false);
}

if (log.IsEnabled(LogLevel.Debug))
{
log.LogDebug("Added Server {SiloAddress}. Current view: {CurrentView}", silo.ToStringWithHashCode(), this.ToString());
}
LogDebugAddedServer(log, new(silo), this);
}
}

Expand Down Expand Up @@ -140,12 +136,12 @@ internal void RemoveServer(SiloAddress silo)
int myNewIndex = membershipRingList.IndexOf(MyAddress);

if (myNewIndex == -1)
throw new OrleansException($"{MyAddress}: Couldn't find my position in the ring {this.ToString()}.");
throw new OrleansException($"{MyAddress}: Couldn't find my position in the ring {this}.");

bool wasMyPred = ((myNewIndex == indexOfFailedSilo) || (myNewIndex == 0 && indexOfFailedSilo == membershipRingList.Count)); // no need for '- 1'
if (wasMyPred) // failed node was our predecessor
{
if (log.IsEnabled(LogLevel.Debug)) log.LogDebug("Failed server was my predecessor? {WasPredecessor}, updated view {CurrentView}", wasMyPred, this.ToString());
LogDebugFailedServerWasMyPredecessor(log, wasMyPred, this);

IRingRange oldRange = myRange;
if (membershipRingList.Count == 1) // i'm the only one left
Expand All @@ -163,14 +159,7 @@ internal void RemoveServer(SiloAddress silo)
}
}

if (log.IsEnabled(LogLevel.Debug))
{
log.LogDebug(
"Removed Server {SiloAddress} hash {Hash}. Current view {CurrentView}",
silo,
silo.GetConsistentHashCode(),
this.ToString());
}
LogDebugRemovedServer(log, silo, new(silo), this);
}
}

Expand Down Expand Up @@ -199,10 +188,7 @@ public bool UnSubscribeFromRangeChangeEvents(IRingRangeListener observer)

private void NotifyLocalRangeSubscribers(IRingRange old, IRingRange now, bool increased)
{
if (log.IsEnabled(LogLevel.Debug))
{
log.LogDebug("NotifyLocalRangeSubscribers about old {OldRange} new {NewRange} increased? {IsIncreased}", old, now, increased);
}
LogDebugNotifyLocalRangeSubscribers(log, old, now, increased);

IRingRangeListener[] copy;
lock (statusListeners)
Expand All @@ -219,14 +205,7 @@ private void NotifyLocalRangeSubscribers(IRingRange old, IRingRange now, bool in
}
catch (Exception exc)
{
log.LogWarning(
(int)ErrorCode.CRP_Local_Subscriber_Exception,
exc,
"Error notifying listener '{ListenerType}' of ring range {AdjustmentKind} from '{OldRange}' to '{NewRange}'.",
listener.GetType().FullName,
increased ? "expansion" : "contraction",
old,
now);
LogWarningErrorNotifyingListener(log, exc, listener.GetType().FullName, increased ? "expansion" : "contraction", old, now);
}
}
}
Expand Down Expand Up @@ -305,7 +284,7 @@ private SiloAddress CalculateTargetSilo(uint hash, bool excludeThisSiloIfStoppin
}
}

if (log.IsEnabled(LogLevel.Trace)) log.LogTrace("Silo {SiloAddress} calculated ring partition owner silo {OwnerAddress} for key {Key}: {Key} --> {OwnerHash}", MyAddress, siloAddress, hash, hash, siloAddress?.GetConsistentHashCode());
LogTraceCalculatedRingPartitionOwner(log, MyAddress, siloAddress, hash, new(siloAddress));
return siloAddress;
}

Expand All @@ -318,5 +297,52 @@ public void Dispose()
{
_siloStatusOracle.UnSubscribeFromSiloStatusEvents(this);
}

private readonly struct SiloAddressWithHashLogRecord(SiloAddress siloAddress)
{
public override string ToString() => siloAddress.ToStringWithHashCode();
}

private readonly struct ConsistentHashCodeLogRecord(SiloAddress siloAddress)
{
public override string ToString() => siloAddress?.GetConsistentHashCode().ToString();
}

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Added Server {SiloAddress}. Current view: {CurrentView}"
)]
private static partial void LogDebugAddedServer(ILogger logger, SiloAddressWithHashLogRecord siloAddress, ConsistentRingProvider currentView);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Failed server was my predecessor? {WasPredecessor}, updated view {CurrentView}"
)]
private static partial void LogDebugFailedServerWasMyPredecessor(ILogger logger, bool wasPredecessor, ConsistentRingProvider currentView);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Removed Server {SiloAddress} hash {Hash}. Current view {CurrentView}"
)]
private static partial void LogDebugRemovedServer(ILogger logger, SiloAddress siloAddress, SiloAddressWithHashLogRecord hash, ConsistentRingProvider currentView);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "NotifyLocalRangeSubscribers about old {OldRange} new {NewRange} increased? {IsIncreased}"
)]
private static partial void LogDebugNotifyLocalRangeSubscribers(ILogger logger, IRingRange oldRange, IRingRange newRange, bool isIncreased);

[LoggerMessage(
EventId = (int)ErrorCode.CRP_Local_Subscriber_Exception,
Level = LogLevel.Warning,
Message = "Error notifying listener '{ListenerType}' of ring range {AdjustmentKind} from '{OldRange}' to '{NewRange}'."
)]
private static partial void LogWarningErrorNotifyingListener(ILogger logger, Exception exception, string listenerType, string adjustmentKind, IRingRange oldRange, IRingRange newRange);

[LoggerMessage(
Level = LogLevel.Trace,
Message = "Silo {SiloAddress} calculated ring partition owner silo {OwnerAddress} for key {Key}: {Key} --> {OwnerHash}"
)]
private static partial void LogTraceCalculatedRingPartitionOwner(ILogger logger, SiloAddress siloAddress, SiloAddress ownerAddress, uint key, ConsistentHashCodeLogRecord ownerHash);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

namespace Orleans.Runtime.GrainDirectory
{
internal sealed class AdaptiveDirectoryCacheMaintainer
internal sealed partial class AdaptiveDirectoryCacheMaintainer
{
private static readonly TimeSpan SLEEP_TIME_BETWEEN_REFRESHES = Debugger.IsAttached ? TimeSpan.FromMinutes(5) : TimeSpan.FromMinutes(1); // this should be something like minTTL/4

Expand Down Expand Up @@ -135,14 +135,7 @@ private async Task Run()
}
}

if (Log.IsEnabled(LogLevel.Trace))
Log.LogTrace(
"Silo {SiloAddress} self-owned (and removed) {OwnedAndRemovedCount}, kept {KeptCount}, removed {RemovedCount} and tried to refresh {RefreshedCount} grains",
router.MyAddress,
ownedAndRemovedCount,
keptCount,
removedCount,
refreshedCount);
LogTraceSelfOwnedAndRemoved(Log, router.MyAddress, ownedAndRemovedCount, keptCount, removedCount, refreshedCount);

// Send batch requests
SendBatchCacheRefreshRequests(fetchInBatchList);
Expand Down Expand Up @@ -174,15 +167,15 @@ private void SendBatchCacheRefreshRequests(Dictionary<SiloAddress, List<GrainId>
ProcessCacheRefreshResponse(silo, response);
}).Ignore();

if (Log.IsEnabled(LogLevel.Trace)) Log.LogTrace("Silo {SiloAddress} is sending request to silo {OwnerSilo} with {Count} entries", router.MyAddress, silo, cachedGrainAndETagList.Count);
LogTraceSendingRequest(Log, router.MyAddress, silo, cachedGrainAndETagList.Count);
}
}

private void ProcessCacheRefreshResponse(
SiloAddress silo,
List<AddressAndTag> refreshResponse)
{
if (Log.IsEnabled(LogLevel.Trace)) Log.LogTrace("Silo {SiloAddress} received ProcessCacheRefreshResponse. #Response entries {Count}.", router.MyAddress, refreshResponse.Count);
LogTraceReceivedProcessCacheRefreshResponse(Log, router.MyAddress, refreshResponse.Count);

int otherSiloCount = 0, updatedCount = 0, unchangedCount = 0;

Expand Down Expand Up @@ -218,14 +211,7 @@ private void ProcessCacheRefreshResponse(
}
}

if (Log.IsEnabled(LogLevel.Trace))
Log.LogTrace(
"Silo {SiloAddress} processed refresh response from {OtherSilo} with {UpdatedCount} updated, {RemovedCount} removed, {UnchangedCount} unchanged grains",
router.MyAddress,
silo,
otherSiloCount,
updatedCount,
unchangedCount);
LogTraceProcessedRefreshResponse(Log, router.MyAddress, silo, otherSiloCount, updatedCount, unchangedCount);
}

/// <summary>
Expand Down Expand Up @@ -277,5 +263,29 @@ private void ProduceStats()
lastNumAccesses = curNumAccesses;
lastNumHits = curNumHits;
}

[LoggerMessage(
Level = LogLevel.Trace,
Message = "Silo {SiloAddress} self-owned (and removed) {OwnedAndRemovedCount}, kept {KeptCount}, removed {RemovedCount} and tried to refresh {RefreshedCount} grains"
)]
private static partial void LogTraceSelfOwnedAndRemoved(ILogger logger, SiloAddress siloAddress, int ownedAndRemovedCount, int keptCount, int removedCount, int refreshedCount);

[LoggerMessage(
Level = LogLevel.Trace,
Message = "Silo {SiloAddress} is sending request to silo {OwnerSilo} with {Count} entries"
)]
private static partial void LogTraceSendingRequest(ILogger logger, SiloAddress siloAddress, SiloAddress ownerSilo, int count);

[LoggerMessage(
Level = LogLevel.Trace,
Message = "Silo {SiloAddress} received ProcessCacheRefreshResponse. #Response entries {Count}."
)]
private static partial void LogTraceReceivedProcessCacheRefreshResponse(ILogger logger, SiloAddress siloAddress, int count);

[LoggerMessage(
Level = LogLevel.Trace,
Message = "Silo {SiloAddress} processed refresh response from {OtherSilo} with {UpdatedCount} updated, {RemovedCount} removed, {UnchangedCount} unchanged grains"
)]
private static partial void LogTraceProcessedRefreshResponse(ILogger logger, SiloAddress siloAddress, SiloAddress otherSilo, int updatedCount, int removedCount, int unchangedCount);
}
}
Loading
Loading