Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 46 additions & 4 deletions src/Orleans.Core/Diagnostics/Metrics/DirectoryInstruments.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
using System;
using System.Collections.Immutable;
using System.Diagnostics.Metrics;
using System.Threading;

#nullable disable
namespace Orleans.Runtime;

internal static class DirectoryInstruments
{
private static ImmutableArray<CacheSizeObserverRegistration> CacheSizeObservers = [];

internal static readonly Counter<int> LookupsLocalIssued = Instruments.Meter.CreateCounter<int>(InstrumentNames.DIRECTORY_LOOKUPS_LOCAL_ISSUED);
internal static readonly Counter<int> LookupsLocalSuccesses = Instruments.Meter.CreateCounter<int>(InstrumentNames.DIRECTORY_LOOKUPS_LOCAL_SUCCESSES);

Expand All @@ -19,7 +23,6 @@ internal static class DirectoryInstruments

internal static readonly Counter<int> LookupsCacheIssued = Instruments.Meter.CreateCounter<int>(InstrumentNames.DIRECTORY_LOOKUPS_CACHE_ISSUED);
internal static readonly Counter<int> LookupsCacheSuccesses = Instruments.Meter.CreateCounter<int>(InstrumentNames.DIRECTORY_LOOKUPS_CACHE_SUCCESSES);
internal static readonly Counter<int> ValidationsCacheSent = Instruments.Meter.CreateCounter<int>(InstrumentNames.DIRECTORY_VALIDATIONS_CACHE_SENT);
internal static readonly Counter<int> ValidationsCacheReceived = Instruments.Meter.CreateCounter<int>(InstrumentNames.DIRECTORY_VALIDATIONS_CACHE_RECEIVED);

internal static readonly Counter<int> SnapshotTransferCount = Instruments.Meter.CreateCounter<int>(InstrumentNames.DIRECTORY_RANGE_SNAPSHOT_TRANSFER_COUNT);
Expand All @@ -34,10 +37,14 @@ internal static void RegisterDirectoryPartitionSizeObserve(Func<int> observeValu
DirectoryPartitionSize = Instruments.Meter.CreateObservableGauge<int>(InstrumentNames.DIRECTORY_PARTITION_SIZE, observeValue);
}

internal static ObservableGauge<int> CacheSize;
internal static void RegisterCacheSizeObserve(Func<int> observeValue)
internal static readonly ObservableGauge<int> CacheSize = Instruments.Meter.CreateObservableGauge<int>(InstrumentNames.DIRECTORY_CACHE_SIZE, ObserveCacheSize);
internal static IDisposable RegisterCacheSizeObserve(Func<int> observeValue)
{
CacheSize = Instruments.Meter.CreateObservableGauge<int>(InstrumentNames.DIRECTORY_CACHE_SIZE, observeValue);
ArgumentNullException.ThrowIfNull(observeValue);

var registration = new CacheSizeObserverRegistration(observeValue);
ImmutableInterlocked.Update(ref CacheSizeObservers, static (observers, registration) => observers.Add(registration), registration);
return registration;
}

internal static ObservableGauge<int> RingSize;
Expand Down Expand Up @@ -75,4 +82,39 @@ internal static void RegisterMyPortionAverageRingPercentageObserve(Func<float> o
internal static readonly Counter<int> UnregistrationsManyIssued = Instruments.Meter.CreateCounter<int>(InstrumentNames.DIRECTORY_UNREGISTRATIONS_MANY_ISSUED);
internal static readonly Counter<int> UnregistrationsManyRemoteSent = Instruments.Meter.CreateCounter<int>(InstrumentNames.DIRECTORY_UNREGISTRATIONS_MANY_REMOTE_SENT);
internal static readonly Counter<int> UnregistrationsManyRemoteReceived = Instruments.Meter.CreateCounter<int>(InstrumentNames.DIRECTORY_UNREGISTRATIONS_MANY_REMOTE_RECEIVED);

private static int ObserveCacheSize()
{
var result = 0;
foreach (var observer in CacheSizeObservers)
{
result += observer.Observe();
}

return result;
Comment on lines +88 to +94
}

private sealed class CacheSizeObserverRegistration : IDisposable
{
private Func<int> observeValue;

public CacheSizeObserverRegistration(Func<int> observeValue)
{
this.observeValue = observeValue;
}

public int Observe()
{
var observer = Volatile.Read(ref observeValue);
return observer is null ? 0 : observer();
}

public void Dispose()
{
if (Interlocked.Exchange(ref observeValue, null) is not null)
{
ImmutableInterlocked.Update(ref CacheSizeObservers, static (observers, registration) => observers.Remove(registration), this);
}
}
}
}
5 changes: 0 additions & 5 deletions src/Orleans.Core/Diagnostics/Metrics/InstrumentNames.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,15 @@ internal static class InstrumentNames
public const string CATALOG_ACTIVATION_CONCURRENT_REGISTRATION_ATTEMPTS = "orleans-catalog-activation-concurrent-registration-attempts";

// Directory
// not used...
public const string DIRECTORY_LOOKUPS_LOCAL_ISSUED = "orleans-directory-lookups-local-issued";
// not used...
public const string DIRECTORY_LOOKUPS_LOCAL_SUCCESSES = "orleans-directory-lookups-local-successes";
public const string DIRECTORY_LOOKUPS_FULL_ISSUED = "orleans-directory-lookups-full-issued";
public const string DIRECTORY_LOOKUPS_REMOTE_SENT = "orleans-directory-lookups-remote-sent";
public const string DIRECTORY_LOOKUPS_REMOTE_RECEIVED = "orleans-directory-lookups-remote-received";
public const string DIRECTORY_LOOKUPS_LOCALDIRECTORY_ISSUED = "orleans-directory-lookups-local-directory-issued";
public const string DIRECTORY_LOOKUPS_LOCALDIRECTORY_SUCCESSES = "orleans-directory-lookups-local-directory-successes";
// not used
public const string DIRECTORY_LOOKUPS_CACHE_ISSUED = "orleans-directory-lookups-cache-issued";
// not used
public const string DIRECTORY_LOOKUPS_CACHE_SUCCESSES = "orleans-directory-lookups-cache-successes";
public const string DIRECTORY_VALIDATIONS_CACHE_SENT = "orleans-directory-validations-cache-sent";
public const string DIRECTORY_VALIDATIONS_CACHE_RECEIVED = "orleans-directory-validations-cache-received";
public const string DIRECTORY_PARTITION_SIZE = "orleans-directory-partition-size";
public const string DIRECTORY_CACHE_SIZE = "orleans-directory-cache-size";
Expand Down
2 changes: 2 additions & 0 deletions src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ public bool TryLookupInCache(GrainId grainId, out GrainAddress address)
ThrowUnsupportedGrainType(grainId);
}

DirectoryInstruments.LookupsCacheIssued.Add(1);
if (this.cache.LookUp(grainId, out address, out _))
{
// If the silo is dead, remove the entry
Expand All @@ -228,6 +229,7 @@ public bool TryLookupInCache(GrainId grainId, out GrainAddress address)
else
{
// Entry found and valid -> return it
DirectoryInstruments.LookupsCacheSuccesses.Add(1);
return true;
}
}
Expand Down
12 changes: 11 additions & 1 deletion src/Orleans.Runtime/GrainDirectory/DhtGrainLocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,17 @@ public static DhtGrainLocator FromLocalGrainDirectory(LocalGrainDirectory localG
public void UpdateCache(GrainId grainId, SiloAddress siloAddress) => _localGrainDirectory.AddOrUpdateCacheEntry(grainId, siloAddress);
public void InvalidateCache(GrainId grainId) => _localGrainDirectory.InvalidateCacheEntry(grainId);
public void InvalidateCache(GrainAddress address) => _localGrainDirectory.InvalidateCacheEntry(address);
public bool TryLookupInCache(GrainId grainId, out GrainAddress address) => _localGrainDirectory.TryCachedLookup(grainId, out address);
public bool TryLookupInCache(GrainId grainId, out GrainAddress address)
{
DirectoryInstruments.LookupsLocalIssued.Add(1);
if (!_localGrainDirectory.TryCachedLookup(grainId, out address))
{
return false;
}

DirectoryInstruments.LookupsLocalSuccesses.Add(1);
return true;
}

private class BatchedDeregistrationWorker
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ public async Task RegisterMany(List<GrainAddress> addresses)

public async Task<List<AddressAndTag>> LookUpMany(List<(GrainId GrainId, int Version)> grainAndETagList)
{
DirectoryInstruments.ValidationsCacheReceived.Add(1);
LogInformationLookUpManyReceived(_logger, Silo, grainAndETagList.Count);

using (var cts = CreateTimeoutCts(_directory.OnStoppedToken))
Expand Down
10 changes: 0 additions & 10 deletions src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,6 @@ internal interface ILocalGrainDirectory : IDhtGrainDirectory
/// <param name="origin"> the silo from which the message to the non-existing activation was sent</param>
Task UnregisterAfterNonexistingActivation(GrainAddress address, SiloAddress origin);

/// <summary>
/// Fetches locally known directory information for a grain.
/// If there is no local information, either in the cache or in this node's directory partition,
/// then this method will return false and leave the list empty.
/// </summary>
/// <param name="grain">The ID of the grain to look up.</param>
/// <param name="addresses">An output parameter that receives the list of locally-known activations of the grain.</param>
/// <returns>True if remote addresses are complete within freshness constraint</returns>
bool LocalLookup(GrainId grain, out AddressAndTag addresses);

/// <summary>
/// Invalidates cache entry for the given activation address.
/// This method is intended to be called whenever a directory client tries to access
Expand Down
99 changes: 11 additions & 88 deletions src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -788,63 +788,6 @@ public async Task UnregisterManyAsync(List<GrainAddress> addresses, Unregistrati
}


public bool LocalLookup(GrainId grain, out AddressAndTag result)
{
DirectoryInstruments.LookupsLocalIssued.Add(1);

var silo = CalculateGrainDirectoryPartition(grain);

LogDebugLocalLookupAttempt(
MyAddress,
grain,
silo,
new(grain),
new(silo));

//this will only happen if I'm the only silo in the cluster and I'm shutting down
if (silo == null)
{
LogTraceLocalLookupMineNull(grain);
result = default;
return false;
}

// handle cache
DirectoryInstruments.LookupsCacheIssued.Add(1);
var address = GetLocalCacheData(grain);
if (address != default)
{
result = new(address, 0);

LogTraceLocalLookupCache(grain, result.Address);
DirectoryInstruments.LookupsCacheSuccesses.Add(1);
DirectoryInstruments.LookupsLocalSuccesses.Add(1);
return true;
}

// check if we own the grain
if (silo.Equals(MyAddress))
{
DirectoryInstruments.LookupsLocalDirectoryIssued.Add(1);
result = GetLocalDirectoryData(grain);
if (result.Address == null)
{
// it can happen that we cannot find the grain in our partition if there were
// some recent changes in the membership
LogTraceLocalLookupMineNull(grain);
return false;
}
LogTraceLocalLookupMine(grain, result.Address);
DirectoryInstruments.LookupsLocalDirectorySuccesses.Add(1);
DirectoryInstruments.LookupsLocalSuccesses.Add(1);
return true;
}

LogTraceTryFullLookupElse(grain);
result = default;
return false;
}

public AddressAndTag GetLocalDirectoryData(GrainId grain) => DirectoryPartition.LookUpActivation(grain);

public GrainAddress? GetLocalCacheData(GrainId grain)
Expand Down Expand Up @@ -1008,7 +951,17 @@ private static int CompareSiloAddress(SiloAddress left, SiloAddress right)
}

public void AddOrUpdateCacheEntry(GrainId grainId, SiloAddress siloAddress) => this.DirectoryCache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress }, 0);
public bool TryCachedLookup(GrainId grainId, [NotNullWhen(true)] out GrainAddress? address) => (address = GetLocalCacheData(grainId)) is not null;
public bool TryCachedLookup(GrainId grainId, [NotNullWhen(true)] out GrainAddress? address)
{
DirectoryInstruments.LookupsCacheIssued.Add(1);
if ((address = GetLocalCacheData(grainId)) is null)
{
return false;
}

DirectoryInstruments.LookupsCacheSuccesses.Add(1);
return true;
}
void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)
{
lifecycle.Subscribe<LocalGrainDirectory>(ServiceLifecycleStage.RuntimeServices, (ct) => Task.Run(() => Start()), (ct) => Task.Run(() => StopAsync()));
Expand Down Expand Up @@ -1136,36 +1089,6 @@ private readonly struct SiloHashLogValue(SiloAddress? silo)
)]
private partial void LogWarningUnregisterManyAsyncNotOwner(int count, int hopCount);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Silo {SiloAddress} tries to lookup for {Grain}-->{PartitionOwner} ({GrainHashCode}-->{PartitionOwnerHashCode})"
)]
private partial void LogDebugLocalLookupAttempt(SiloAddress siloAddress, GrainId grain, SiloAddress? partitionOwner, GrainHashLogValue grainHashCode, SiloHashLogValue partitionOwnerHashCode);

[LoggerMessage(
Level = LogLevel.Trace,
Message = "LocalLookup mine {GrainId}=null"
)]
private partial void LogTraceLocalLookupMineNull(GrainId grainId);

[LoggerMessage(
Level = LogLevel.Trace,
Message = "LocalLookup cache {GrainId}={TargetAddress}"
)]
private partial void LogTraceLocalLookupCache(GrainId grainId, GrainAddress? targetAddress);

[LoggerMessage(
Level = LogLevel.Trace,
Message = "LocalLookup mine {GrainId}={Address}"
)]
private partial void LogTraceLocalLookupMine(GrainId grainId, GrainAddress? address);

[LoggerMessage(
Level = LogLevel.Trace,
Message = "TryFullLookup else {GrainId}=null"
)]
private partial void LogTraceTryFullLookupElse(GrainId grainId);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "LookupAsync - It seems we are not the owner of grain {GrainId} (hash: {Hash:X}), trying to forward it to {ForwardAddress} (hopCount={HopCount})"
Expand Down
32 changes: 24 additions & 8 deletions src/Orleans.Runtime/GrainDirectory/LruGrainDirectoryCache.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Orleans.Caching;

#nullable disable
namespace Orleans.Runtime.GrainDirectory;

internal sealed class LruGrainDirectoryCache(
int maxCacheSize,
TimeSpan maxCacheTTL,
TimeProvider timeProvider) : ConcurrentLruCache<GrainId, (GrainAddress ActivationAddress, int Version)>(
capacity: maxCacheSize,
comparer: null,
timeToLive: maxCacheTTL,
timeProvider: timeProvider), IGrainDirectoryCache
internal sealed class LruGrainDirectoryCache : ConcurrentLruCache<GrainId, (GrainAddress ActivationAddress, int Version)>, IGrainDirectoryCache, IAsyncDisposable
{
private static readonly Func<(GrainAddress Address, int Version), GrainAddress, bool> ActivationAddressesMatch = (value, state) => GrainAddress.MatchesGrainIdAndSilo(state, value.Address);
private readonly IDisposable _cacheSizeRegistration;

public LruGrainDirectoryCache(
int maxCacheSize,
TimeSpan maxCacheTTL,
TimeProvider timeProvider)
: base(
capacity: maxCacheSize,
comparer: null,
timeToLive: maxCacheTTL,
timeProvider: timeProvider)
{
_cacheSizeRegistration = DirectoryInstruments.RegisterCacheSizeObserve(() => Count);
}

public void AddOrUpdate(GrainAddress activationAddress, int version) => AddOrUpdate(activationAddress.GrainId, (activationAddress, version));

Expand Down Expand Up @@ -46,4 +54,12 @@ public bool LookUp(GrainId key, out GrainAddress result, out int version)
}
}
}

public new async ValueTask DisposeAsync()
{
_cacheSizeRegistration.Dispose();
await base.DisposeAsync();
}

async ValueTask IAsyncDisposable.DisposeAsync() => await DisposeAsync();
}
5 changes: 0 additions & 5 deletions test/Orleans.Core.Tests/Directory/MockLocalGrainDirectory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ public bool IsSiloInCluster(SiloAddress silo)
throw new NotImplementedException();
}

public bool LocalLookup(GrainId grain, out AddressAndTag addresses)
{
throw new NotImplementedException();
}

public Task<AddressAndTag> LookupAsync(GrainId grainId, int hopCount = 0)
{
throw new NotImplementedException();
Expand Down
Loading