From 25e190a359e5ff05e3e0bf532b5f4791b8279425 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Fri, 29 May 2026 14:24:14 -0700 Subject: [PATCH 1/4] fix(reminders): gate delivery on active silo Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../ReminderService/LocalReminderService.cs | 197 +++++++++++++++--- .../TestInternalGrains/ReminderTestGrain2.cs | 30 ++- 2 files changed, 190 insertions(+), 37 deletions(-) diff --git a/src/Orleans.Reminders/ReminderService/LocalReminderService.cs b/src/Orleans.Reminders/ReminderService/LocalReminderService.cs index 9acd2f22300..d816bdc1a9f 100644 --- a/src/Orleans.Reminders/ReminderService/LocalReminderService.cs +++ b/src/Orleans.Reminders/ReminderService/LocalReminderService.cs @@ -32,6 +32,10 @@ internal sealed partial class LocalReminderService : GrainService, IReminderServ private long localTableSequence; private uint initialReadCallCount = 0; private Task? runTask; + private readonly object _deliveryLock = new(); + private bool _isDeliveringReminders; + private int _activeReminderDeliveries; + private TaskCompletionSource? _deliveryQuiesced; public LocalReminderService( GrainReferenceActivator referenceActivator, @@ -71,7 +75,7 @@ void ILifecycleParticipant.Participate(ISiloLifecycle observer) nameof(LocalReminderService), ServiceLifecycleStage.Active, StartReminderService, - NoOpStop); + StopDeliveringReminders); async Task InitializeReminderService(CancellationToken ct) { @@ -106,16 +110,24 @@ async Task StartReminderService(CancellationToken ct) try { - await this.QueueTask(Start).WaitAsync(cts.Token); + await this.QueueTask(async () => + { + StartDeliveringReminders(); + await Start(); + }).WaitAsync(cts.Token); } catch (Exception exception) { + await StopDeliveringReminders(cts.Token); LogErrorStartingReminderService(exception); throw; } } - static Task NoOpStop(CancellationToken _) => Task.CompletedTask; + Task StopDeliveringReminders(CancellationToken ct) + { + return this.QueueTask(() => StopDeliveringRemindersAsync()).WaitAsync(ct); + } } /// @@ -124,6 +136,8 @@ async Task StartReminderService(CancellationToken ct) /// private async Task Initialize(CancellationToken cancellationToken) { + CheckRuntimeContext(); + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(this.reminderOptions.InitializationTimeout); @@ -133,29 +147,32 @@ private async Task Initialize(CancellationToken cancellationToken) public async override Task Stop() { - await base.Stop(); + await this.QueueTask(() => StopCoreAsync()); - listRefreshTimer.Dispose(); - if (this.runTask is { } task) + async Task StopCoreAsync() { - await task; - } + CheckRuntimeContext(); - var disposeTasks = new List(localReminders.Count); - foreach (LocalReminderData r in localReminders.Values) - { - disposeTasks.Add(r.StopAsync(ReminderEvents.LocalReminderStopReason.ServiceStopped)); - } + await StopDeliveringRemindersAsync(); + await base.Stop(); - await Task.WhenAll(disposeTasks); - await reminderTable.StopAsync(); + listRefreshTimer.Dispose(); + if (this.runTask is { } task) + { + await task; + } - // For a graceful shutdown, also handover reminder responsibilities to new owner, and update the ReminderTable - // currently, this is taken care of by periodically reading the reminder table + await reminderTable.StopAsync(); + + // For a graceful shutdown, also handover reminder responsibilities to new owner, and update the ReminderTable + // currently, this is taken care of by periodically reading the reminder table + } } public async Task RegisterOrUpdateReminder(GrainId grainId, string reminderName, TimeSpan dueTime, TimeSpan period) { + CheckRuntimeContext(); + var entry = new ReminderEntry { GrainId = grainId, @@ -192,6 +209,8 @@ public async Task RegisterOrUpdateReminder(GrainId grainId, stri /// public async Task UnregisterReminder(IGrainReminder reminder) { + CheckRuntimeContext(); + var remData = (ReminderData)reminder; LogDebugUnregisterReminder(reminder, localTableSequence); @@ -283,6 +302,8 @@ public async Task> GetReminders(GrainId grainId) /// private Task ReadAndUpdateReminders() { + CheckRuntimeContext(); + if (StoppedCancellationTokenSource.IsCancellationRequested) return Task.CompletedTask; var tasks = new List(); @@ -302,6 +323,8 @@ private Task ReadAndUpdateReminders() private void RemoveOutOfRangeReminders(List removedReminderTasks) { + CheckRuntimeContext(); + var remindersOutOfRange = 0; foreach (var r in localReminders) @@ -324,6 +347,8 @@ private void RemoveOutOfRangeReminders(List removedReminderTasks) public override Task OnRangeChange(IRingRange oldRange, IRingRange newRange, bool increased) { + CheckRuntimeContext(); + _ = base.OnRangeChange(oldRange, newRange, increased); if (Status == GrainServiceStatus.Started) return ReadAndUpdateReminders(); @@ -339,6 +364,8 @@ private async Task RunAsync() { try { + CheckRuntimeContext(); + overrideDelay = null; switch (Status) { @@ -363,18 +390,23 @@ private async Task RunAsync() protected override async Task StartInBackground() { + CheckRuntimeContext(); + await DoInitialReadAndUpdateReminders(); this.runTask = RunAsync(); } private async Task DoInitialReadAndUpdateReminders() { + CheckRuntimeContext(); + try { if (StoppedCancellationTokenSource.IsCancellationRequested) return; initialReadCallCount++; await this.ReadAndUpdateReminders(); + Status = GrainServiceStatus.Started; startedTask.TrySetResult(true); } @@ -396,6 +428,8 @@ private async Task DoInitialReadAndUpdateReminders() private async Task ReadTableAndStartTimers(ISingleRange range, int rangeSerialNumberCopy) { + CheckRuntimeContext(); + LogDebugReadingRows(range); localTableSequence++; long cachedSequence = localTableSequence; @@ -505,28 +539,122 @@ private async Task ReadTableAndStartTimers(ISingleRange range, int rangeSerialNu private void AddOrUpdateLocalReminder(ReminderEntry entry) { + CheckRuntimeContext(); + localTableSequence++; var key = new ReminderIdentity(entry.GrainId, entry.ReminderName); ref var reminderData = ref CollectionsMarshal.GetValueRefOrAddDefault(localReminders, key, out var exists); - if (exists && reminderData is { } existingReminder) + if (exists && reminderData is not null) { - existingReminder.LocalSequenceNumber = localTableSequence; - existingReminder.Update(entry); + reminderData.LocalSequenceNumber = localTableSequence; + reminderData.Update(entry); LogDebugUpdatedReminder(entry); - return; + } + else + { + reminderData = new LocalReminderData(entry, this) + { + LocalSequenceNumber = localTableSequence, + }; + + LogDebugStartedReminder(entry); } - var newReminder = new LocalReminderData(entry, this) + lock (_deliveryLock) { - LocalSequenceNumber = localTableSequence, - }; - newReminder.Start(); - reminderData = newReminder; - LogDebugStartedReminder(entry); + if (!_isDeliveringReminders) + { + return; + } + } + + reminderData.TryStart(); + } + + private void StartDeliveringReminders() + { + CheckRuntimeContext(); + + lock (_deliveryLock) + { + if (_isDeliveringReminders) + { + return; + } + + _isDeliveringReminders = true; + } + + foreach (var reminderData in localReminders.Values) + { + reminderData.TryStart(); + } + } + + private async Task StopDeliveringRemindersAsync() + { + CheckRuntimeContext(); + + Task? deliveryQuiescedTask = null; + lock (_deliveryLock) + { + _isDeliveringReminders = false; + if (_activeReminderDeliveries > 0) + { + _deliveryQuiesced ??= new(TaskCreationOptions.RunContinuationsAsynchronously); + deliveryQuiescedTask = _deliveryQuiesced.Task; + } + } + + // Stop all reminders. + var tasks = new List(localReminders.Count + (deliveryQuiescedTask is null ? 0 : 1)); + if (deliveryQuiescedTask is not null) + { + tasks.Add(deliveryQuiescedTask); + } + + foreach (var reminderData in localReminders.Values) + { + tasks.Add(reminderData.StopAsync(ReminderEvents.LocalReminderStopReason.ServiceStopped)); + } + + await Task.WhenAll(tasks); + } + + private bool TryBeginSingleReminderDelivery() + { + lock (_deliveryLock) + { + if (!_isDeliveringReminders) + { + return false; + } + + ++_activeReminderDeliveries; + return true; + } + } + + private void CompleteSingleReminderDelivery() + { + TaskCompletionSource? quiesced = null; + lock (_deliveryLock) + { + --_activeReminderDeliveries; + if (_activeReminderDeliveries == 0) + { + quiesced = _deliveryQuiesced; + _deliveryQuiesced = null; + } + } + + quiesced?.SetResult(); } private Task DoResponsibilitySanityCheck(GrainId grainId, string debugInfo) { + CheckRuntimeContext(); + switch (Status) { case GrainServiceStatus.Booting: @@ -568,6 +696,8 @@ async Task WaitForInitCompletion() void CheckRange() { + CheckRuntimeContext(); + if (!RingRange.InRange(grainId)) { LogWarningNotResponsible(debugInfo, grainId, RingRange); @@ -698,15 +828,15 @@ public bool IsRunning } } - public void Start() + public bool TryStart() { GrainId grainId; string reminderName; lock (_lock) { - if (_runTask is not null) + if (_runTask is not null || _stopReason is not (int)ReminderEvents.LocalReminderStopReason.Unknown) { - throw new InvalidOperationException($"{nameof(Start)} may only be called once per instance and has already been called on this instance."); + return false; } grainId = _entry.GrainId; @@ -716,6 +846,7 @@ public void Start() } ReminderEvents.EmitLocalReminderStarted(grainId, reminderName, this, _shared.Silo); + return true; } public void Update(ReminderEntry entry) @@ -778,7 +909,7 @@ private async Task RunAsync(GrainId grainId, string reminderName) while (await WaitForNextTick()) { var entry = PrepareTick(); - if (entry is null) + if (entry is null || !_shared.TryBeginSingleReminderDelivery()) { continue; } @@ -824,6 +955,10 @@ private async Task RunAsync(GrainId grainId, string reminderName) { LogWarningFiringReminder(_shared.logger, entry.ReminderName, entry.GrainId, exception); } + finally + { + _shared.CompleteSingleReminderDelivery(); + } } } finally diff --git a/test/Grains/TestInternalGrains/ReminderTestGrain2.cs b/test/Grains/TestInternalGrains/ReminderTestGrain2.cs index 53db189398b..89211772b18 100644 --- a/test/Grains/TestInternalGrains/ReminderTestGrain2.cs +++ b/test/Grains/TestInternalGrains/ReminderTestGrain2.cs @@ -81,7 +81,7 @@ public async Task StartReminder(string reminderName, TimeSpan? p return r; } - public Task ReceiveReminder(string reminderName, TickStatus status) + public async Task ReceiveReminder(string reminderName, TickStatus status) { // it can happen that due to failure, when a new activation is created, // it doesn't know which reminders were registered against the grain @@ -92,6 +92,15 @@ public Task ReceiveReminder(string reminderName, TickStatus status) this.sequence.Add(reminderName, 0); // we'll get upto date to the latest sequence number while processing this tick } + if (!this.allReminders.ContainsKey(reminderName)) + { + await GetMissingReminders(); + if (!this.allReminders.ContainsKey(reminderName)) + { + throw new OrleansException($"Could not find reminder {reminderName} in grain {this.IdentityString}"); + } + } + allReminders[reminderName].Fired.Add(status.CurrentTickTime); // calculating tick sequence number @@ -111,7 +120,7 @@ public Task ReceiveReminder(string reminderName, TickStatus status) if (sequenceNumber < this.sequence[reminderName]) { this.logger.LogInformation("ReceiveReminder: {Reminder} Incorrect tick {ExpectedSequenceNumber} vs. {SequenceNumber} with status {Status}.", reminderName, this.sequence[reminderName], sequenceNumber, status); - return Task.CompletedTask; + return; } this.sequence[reminderName] = sequenceNumber; this.logger.LogInformation("ReceiveReminder: {ReminderNAme} Sequence # {SequenceNumber} with status {Status}.", reminderName, this.sequence[reminderName], status); @@ -120,7 +129,7 @@ public Task ReceiveReminder(string reminderName, TickStatus status) string counterValue = this.sequence[reminderName].ToString(CultureInfo.InvariantCulture); File.WriteAllText(fileName, counterValue); - return Task.CompletedTask; + return; } public async Task StopReminder(string reminderName) @@ -295,7 +304,7 @@ public async Task StartReminder(string reminderName, TimeSpan? p return r; } - public Task ReceiveReminder(string reminderName, TickStatus status) + public async Task ReceiveReminder(string reminderName, TickStatus status) { // it can happen that due to failure, when a new activation is created, // it doesn't know which reminders were registered against the grain @@ -308,6 +317,15 @@ public Task ReceiveReminder(string reminderName, TickStatus status) this.sequence.Add(reminderName, 0); // we'll get upto date to the latest sequence number while processing this tick } + if (!this.allReminders.ContainsKey(reminderName)) + { + await GetMissingReminders(); + if (!this.allReminders.ContainsKey(reminderName)) + { + throw new OrleansException($"Could not find reminder {reminderName} in grain {this.IdentityString}"); + } + } + // calculating tick sequence number // we do all arithmetics on DateTime by converting into long because we dont have divide operation on DateTime @@ -325,7 +343,7 @@ public Task ReceiveReminder(string reminderName, TickStatus status) if (sequenceNumber < this.sequence[reminderName]) { this.logger.LogInformation("{ReminderName} Incorrect tick {ExpectedSequenceNumber} vs. {SequenceNumber} with status {Status}.", reminderName, this.sequence[reminderName], sequenceNumber, status); - return Task.CompletedTask; + return; } this.sequence[reminderName] = sequenceNumber; @@ -333,7 +351,7 @@ public Task ReceiveReminder(string reminderName, TickStatus status) File.WriteAllText(GetFileName(reminderName), this.sequence[reminderName].ToString()); - return Task.CompletedTask; + return; } public async Task StopReminder(string reminderName) From c8483faff7bf0eef35b1d67c9367bf8eb0442b4b Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Fri, 29 May 2026 15:05:15 -0700 Subject: [PATCH 2/4] fix(reminders): combine active lifecycle hooks Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../Hosting/SiloBuilderReminderExtensions.cs | 2 + .../ReminderService/LocalReminderService.cs | 176 +++++++----------- 2 files changed, 70 insertions(+), 108 deletions(-) diff --git a/src/Orleans.Reminders/Hosting/SiloBuilderReminderExtensions.cs b/src/Orleans.Reminders/Hosting/SiloBuilderReminderExtensions.cs index 4e7e2aa030c..f71c65c5a98 100644 --- a/src/Orleans.Reminders/Hosting/SiloBuilderReminderExtensions.cs +++ b/src/Orleans.Reminders/Hosting/SiloBuilderReminderExtensions.cs @@ -3,6 +3,7 @@ using Orleans.Configuration.Internal; using System.Linq; using Orleans.Runtime.ReminderService; +using Orleans.Services; using Orleans.Timers; namespace Orleans.Hosting; @@ -28,6 +29,7 @@ public static void AddReminders(this IServiceCollection services) } services.AddSingleton(); + services.AddFromExisting(); services.AddFromExisting(); services.AddFromExisting, LocalReminderService>(); services.AddSingleton(); diff --git a/src/Orleans.Reminders/ReminderService/LocalReminderService.cs b/src/Orleans.Reminders/ReminderService/LocalReminderService.cs index d816bdc1a9f..66231cd7263 100644 --- a/src/Orleans.Reminders/ReminderService/LocalReminderService.cs +++ b/src/Orleans.Reminders/ReminderService/LocalReminderService.cs @@ -68,20 +68,15 @@ void ILifecycleParticipant.Participate(ISiloLifecycle observer) { observer.Subscribe( nameof(LocalReminderService), - ServiceLifecycleStage.BecomeActive, - InitializeReminderService, - StopReminderService); - observer.Subscribe( - nameof(LocalReminderService), - ServiceLifecycleStage.Active, - StartReminderService, - StopDeliveringReminders); + ServiceLifecycleStage.RuntimeGrainServices, + StartReminderTable, + StopReminderTable); - async Task InitializeReminderService(CancellationToken ct) + async Task StartReminderTable(CancellationToken ct) { try { - await this.QueueTask(() => Initialize(ct)); + await this.QueueTask(() => StartReminderTableCoreAsync(ct)); } catch (Exception exception) { @@ -90,51 +85,25 @@ async Task InitializeReminderService(CancellationToken ct) } } - async Task StopReminderService(CancellationToken ct) - { - try - { - await this.QueueTask(Stop).WaitAsync(ct); - } - catch (Exception exception) - { - LogErrorStoppingReminderService(exception); - throw; - } - } - - async Task StartReminderService(CancellationToken ct) + async Task StopReminderTable(CancellationToken ct) { - using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct); - cts.CancelAfter(this.reminderOptions.InitializationTimeout); - try { - await this.QueueTask(async () => - { - StartDeliveringReminders(); - await Start(); - }).WaitAsync(cts.Token); + await this.QueueTask(() => reminderTable.StopAsync()).WaitAsync(ct); } catch (Exception exception) { - await StopDeliveringReminders(cts.Token); - LogErrorStartingReminderService(exception); + LogErrorActivatingReminderService(exception); throw; } } - - Task StopDeliveringReminders(CancellationToken ct) - { - return this.QueueTask(() => StopDeliveringRemindersAsync()).WaitAsync(ct); - } } /// /// Initializes the reminder table. /// /// - private async Task Initialize(CancellationToken cancellationToken) + async Task StartReminderTableCoreAsync(CancellationToken cancellationToken) { CheckRuntimeContext(); @@ -145,28 +114,75 @@ private async Task Initialize(CancellationToken cancellationToken) await reminderTable.StartAsync(cts.Token); } - public async override Task Stop() + public override async Task Start() { - await this.QueueTask(() => StopCoreAsync()); + CheckRuntimeContext(); - async Task StopCoreAsync() + try { - CheckRuntimeContext(); + lock (_deliveryLock) + { + if (_isDeliveringReminders) + { + return; + } - await StopDeliveringRemindersAsync(); - await base.Stop(); + _isDeliveringReminders = true; + } - listRefreshTimer.Dispose(); - if (this.runTask is { } task) + foreach (var reminderData in localReminders.Values) { - await task; + reminderData.TryStart(); } - await reminderTable.StopAsync(); + await base.Start(); + } + catch (Exception exception) + { + await Stop(); + LogErrorStartingReminderService(exception); + throw; + } + } - // For a graceful shutdown, also handover reminder responsibilities to new owner, and update the ReminderTable - // currently, this is taken care of by periodically reading the reminder table + public override async Task Stop() + { + CheckRuntimeContext(); + + Task? deliveryQuiescedTask = null; + lock (_deliveryLock) + { + _isDeliveringReminders = false; + if (_activeReminderDeliveries > 0) + { + _deliveryQuiesced ??= new(TaskCreationOptions.RunContinuationsAsynchronously); + deliveryQuiescedTask = _deliveryQuiesced.Task; + } } + + // Stop all reminders. + var tasks = new List(localReminders.Count + (deliveryQuiescedTask is null ? 0 : 1)); + if (deliveryQuiescedTask is not null) + { + tasks.Add(deliveryQuiescedTask); + } + + foreach (var reminderData in localReminders.Values) + { + tasks.Add(reminderData.StopAsync(ReminderEvents.LocalReminderStopReason.ServiceStopped)); + } + + await Task.WhenAll(tasks); + await base.Stop(); + + listRefreshTimer.Dispose(); + if (this.runTask is { } task) + { + await task; + } + + // For a graceful shutdown, also handover reminder responsibilities to new owner, and update the ReminderTable + // currently, this is taken care of by periodically reading the reminder table } public async Task RegisterOrUpdateReminder(GrainId grainId, string reminderName, TimeSpan dueTime, TimeSpan period) @@ -571,56 +587,6 @@ private void AddOrUpdateLocalReminder(ReminderEntry entry) reminderData.TryStart(); } - private void StartDeliveringReminders() - { - CheckRuntimeContext(); - - lock (_deliveryLock) - { - if (_isDeliveringReminders) - { - return; - } - - _isDeliveringReminders = true; - } - - foreach (var reminderData in localReminders.Values) - { - reminderData.TryStart(); - } - } - - private async Task StopDeliveringRemindersAsync() - { - CheckRuntimeContext(); - - Task? deliveryQuiescedTask = null; - lock (_deliveryLock) - { - _isDeliveringReminders = false; - if (_activeReminderDeliveries > 0) - { - _deliveryQuiesced ??= new(TaskCreationOptions.RunContinuationsAsynchronously); - deliveryQuiescedTask = _deliveryQuiesced.Task; - } - } - - // Stop all reminders. - var tasks = new List(localReminders.Count + (deliveryQuiescedTask is null ? 0 : 1)); - if (deliveryQuiescedTask is not null) - { - tasks.Add(deliveryQuiescedTask); - } - - foreach (var reminderData in localReminders.Values) - { - tasks.Add(reminderData.StopAsync(ReminderEvents.LocalReminderStopReason.ServiceStopped)); - } - - await Task.WhenAll(tasks); - } - private bool TryBeginSingleReminderDelivery() { lock (_deliveryLock) @@ -1115,12 +1081,6 @@ private readonly struct ReminderIdentity(GrainId grainId, string reminderName) : )] private partial void LogErrorActivatingReminderService(Exception exception); - [LoggerMessage( - Level = LogLevel.Error, - Message = "Error stopping reminder service." - )] - private partial void LogErrorStoppingReminderService(Exception exception); - [LoggerMessage( Level = LogLevel.Error, Message = "Error starting reminder service." From 40fd9d263ac3ba084b25aea66afce3fa5770996e Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Fri, 29 May 2026 15:06:57 -0700 Subject: [PATCH 3/4] fix(runtime): avoid inactive silo activations Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../ReminderService/LocalReminderService.cs | 32 ++++---- .../GrainDirectory/CachedGrainLocator.cs | 38 ++++++++- .../GrainDirectory/LocalGrainDirectory.cs | 81 +++++++++++++++++-- .../Directory/CachedGrainLocatorTests.cs | 75 ++++++++++++++++- .../LocalGrainDirectoryTests.cs | 36 +++++++++ 5 files changed, 235 insertions(+), 27 deletions(-) diff --git a/src/Orleans.Reminders/ReminderService/LocalReminderService.cs b/src/Orleans.Reminders/ReminderService/LocalReminderService.cs index 66231cd7263..871fbb08b07 100644 --- a/src/Orleans.Reminders/ReminderService/LocalReminderService.cs +++ b/src/Orleans.Reminders/ReminderService/LocalReminderService.cs @@ -83,6 +83,17 @@ async Task StartReminderTable(CancellationToken ct) LogErrorActivatingReminderService(exception); throw; } + + async Task StartReminderTableCoreAsync(CancellationToken cancellationToken) + { + CheckRuntimeContext(); + + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(this.reminderOptions.InitializationTimeout); + + // Confirm that it can access the underlying store, as after this the ReminderService will load in the background, without the opportunity to prevent the Silo from starting + await reminderTable.StartAsync(cts.Token); + } } async Task StopReminderTable(CancellationToken ct) @@ -99,21 +110,6 @@ async Task StopReminderTable(CancellationToken ct) } } - /// - /// Initializes the reminder table. - /// - /// - async Task StartReminderTableCoreAsync(CancellationToken cancellationToken) - { - CheckRuntimeContext(); - - using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - cts.CancelAfter(this.reminderOptions.InitializationTimeout); - - // Confirm that it can access the underlying store, as after this the ReminderService will load in the background, without the opportunity to prevent the Silo from starting - await reminderTable.StartAsync(cts.Token); - } - public override async Task Start() { CheckRuntimeContext(); @@ -1081,6 +1077,12 @@ private readonly struct ReminderIdentity(GrainId grainId, string reminderName) : )] private partial void LogErrorActivatingReminderService(Exception exception); + [LoggerMessage( + Level = LogLevel.Error, + Message = "Error stopping reminder service." + )] + private partial void LogErrorStoppingReminderService(Exception exception); + [LoggerMessage( Level = LogLevel.Error, Message = "Error starting reminder service." diff --git a/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs b/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs index 3f03e60ee3b..f43a5ddfb78 100644 --- a/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs +++ b/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs @@ -72,6 +72,10 @@ public async ValueTask Lookup(GrainId grainId) await GetGrainDirectory(grainId.Type).Unregister(entry); entry = null; } + else if (IsKnownNonActiveSilo(entry)) + { + entry = null; + } else { // Add to the local cache and return it @@ -112,9 +116,21 @@ public async Task Register(GrainAddress address, GrainAddress prev await GetGrainDirectory(grainType).Unregister(result); result = await GetGrainDirectory(grainType).Register(address, previousAddress); } + else if (IsKnownNonActiveSilo(result)) + { + result = await GetGrainDirectory(grainType).Register(address, result); + } // Cache update - this.cache.AddOrUpdate(result, (int)result.MembershipVersion.Value); + if (result is not null && IsKnownNonActiveSilo(result)) + { + return null; + } + + if (result is not null) + { + this.cache.AddOrUpdate(result, (int)result.MembershipVersion.Value); + } return result; @@ -200,11 +216,27 @@ private bool IsKnownDeadSilo(SiloAddress siloAddress, MembershipVersion membersh return siloAddress is null || current.GetSiloStatus(siloAddress, membershipVersion) == SiloStatus.Dead; } + private bool IsKnownNonActiveSilo(GrainAddress grainAddress) + => IsKnownNonActiveSilo(grainAddress.SiloAddress, grainAddress.MembershipVersion); + + private bool IsKnownNonActiveSilo(SiloAddress siloAddress, MembershipVersion membershipVersion) + { + var current = this.clusterMembershipService.CurrentSnapshot; + var status = current.GetSiloStatus(siloAddress, membershipVersion); + return siloAddress is null || (status != SiloStatus.None && status != SiloStatus.Active); + } + private static void ThrowUnsupportedGrainType(GrainId grainId) => throw new InvalidOperationException($"Unsupported grain type for grain {grainId}"); public void UpdateCache(GrainId grainId, SiloAddress siloAddress) { var membershipVersion = this.clusterMembershipService.CurrentSnapshot.Version; + if (IsKnownNonActiveSilo(siloAddress, membershipVersion)) + { + cache.Remove(grainId); + return; + } + cache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress, MembershipVersion = membershipVersion }, (int)membershipVersion.Value); } public void InvalidateCache(GrainId grainId) => cache.Remove(grainId); @@ -220,8 +252,8 @@ public bool TryLookupInCache(GrainId grainId, out GrainAddress address) DirectoryInstruments.LookupsCacheIssued.Add(1); if (this.cache.LookUp(grainId, out address, out _)) { - // If the silo is dead, remove the entry - if (IsKnownDeadSilo(address)) + // If the silo cannot currently host activations, remove the entry. + if (IsKnownNonActiveSilo(address)) { address = default; this.cache.Remove(grainId); diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index 6c7cf397aa2..5830d27a9b8 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -435,7 +435,7 @@ private void AdjustLocalCache(ClusterMembershipSnapshot snapshot, DirectoryMembe continue; } - if (IsDefunctActivation(activationAddress, snapshot)) + if (IsNonRoutableActivation(activationAddress, snapshot)) { DirectoryCache.Remove(activationAddress.GrainId); } @@ -452,6 +452,17 @@ internal static bool IsDefunctActivation(GrainAddress address, ClusterMembership return snapshot.GetSiloStatus(silo, address.MembershipVersion) == SiloStatus.Dead; } + internal static bool IsNonRoutableActivation(GrainAddress address, ClusterMembershipSnapshot snapshot) + { + if (address.SiloAddress is not { } silo) + { + return true; + } + + var status = snapshot.GetSiloStatus(silo, address.MembershipVersion); + return status != SiloStatus.None && status != SiloStatus.Active; + } + internal SiloAddress? FindPredecessor(SiloAddress silo) { var existing = directoryMembership.MembershipRingList; @@ -623,9 +634,22 @@ public async Task RegisterAsync(GrainAddress address, GrainAddres DirectoryInstruments.RegistrationsSingleActLocal.Add(1); var result = DirectoryPartition.AddSingleActivation(address, previousAddress); + if (!address.Equals(result.Address) && result.Address is not null && IsNonRoutableActivation(result.Address, clusterMembershipService.CurrentSnapshot)) + { + result = DirectoryPartition.AddSingleActivation(address, result.Address); + } // update the cache so next local lookup will find this ActivationAddress in the cache and we will save full lookup. - DirectoryCache.AddOrUpdate(result.Address, result.VersionTag); + if (result.Address is not null && IsNonRoutableActivation(result.Address, clusterMembershipService.CurrentSnapshot)) + { + return new(default, result.VersionTag); + } + + if (result.Address is not null) + { + DirectoryCache.AddOrUpdate(result.Address, result.VersionTag); + } + return result; } else @@ -640,7 +664,12 @@ public async Task RegisterAsync(GrainAddress address, GrainAddres // this way next local lookup will find this ActivationAddress in the cache and we will save a full lookup! if (result.Address == null) return result; - if (!address.Equals(result.Address) || IsDefunctActivation(address, clusterMembershipService.CurrentSnapshot)) return result; + if (IsNonRoutableActivation(result.Address, clusterMembershipService.CurrentSnapshot)) + { + return new(default, result.VersionTag); + } + + if (!address.Equals(result.Address)) return result; // update the cache so next local lookup will find this ActivationAddress in the cache and we will save full lookup. DirectoryCache.AddOrUpdate(result.Address, result.VersionTag); @@ -814,6 +843,12 @@ public bool TryLocalLookup(GrainId grainId, [NotNullWhen(true)] out GrainAddress return false; } + if (IsNonRoutableActivation(localResult.Address, clusterMembershipService.CurrentSnapshot)) + { + address = null; + return false; + } + address = localResult.Address; DirectoryInstruments.LookupsLocalDirectorySuccesses.Add(1); DirectoryInstruments.LookupsLocalSuccesses.Add(1); @@ -827,7 +862,14 @@ public bool TryLocalLookup(GrainId grainId, [NotNullWhen(true)] out GrainAddress return null; } - return IsDefunctActivation(cache, clusterMembershipService.CurrentSnapshot) ? null : cache; + var snapshot = clusterMembershipService.CurrentSnapshot; + if (IsNonRoutableActivation(cache, snapshot)) + { + DirectoryCache.Remove(grain); + return null; + } + + return cache; } public async Task LookupAsync(GrainId grainId, int hopCount = 0) @@ -874,6 +916,11 @@ public async Task LookupAsync(GrainId grainId, int hopCount = 0) return new(default, GrainInfo.NO_ETAG); } + if (IsNonRoutableActivation(localResult.Address, clusterMembershipService.CurrentSnapshot)) + { + return new(default, GrainInfo.NO_ETAG); + } + LogTraceFullLookupMine(grainId, localResult.Address); DirectoryInstruments.LookupsLocalDirectorySuccesses.Add(1); return localResult; @@ -890,8 +937,13 @@ public async Task LookupAsync(GrainId grainId, int hopCount = 0) var result = await GetDirectoryReference(forwardAddress).LookupAsync(grainId, hopCount + 1); // update the cache - if (result.Address is { } address && !IsDefunctActivation(address, clusterMembershipService.CurrentSnapshot)) + if (result.Address is { } address) { + if (IsNonRoutableActivation(address, clusterMembershipService.CurrentSnapshot)) + { + return new(default, result.VersionTag); + } + DirectoryCache.AddOrUpdate(address, result.VersionTag); } @@ -980,7 +1032,24 @@ private static int CompareSiloAddress(SiloAddress left, SiloAddress right) return hashComparison != 0 ? hashComparison : left.CompareTo(right); } - public void AddOrUpdateCacheEntry(GrainId grainId, SiloAddress siloAddress) => this.DirectoryCache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress }, 0); + public void AddOrUpdateCacheEntry(GrainId grainId, SiloAddress siloAddress) + { + var snapshot = clusterMembershipService.CurrentSnapshot; + var address = new GrainAddress + { + GrainId = grainId, + SiloAddress = siloAddress, + MembershipVersion = snapshot.Version + }; + + if (IsNonRoutableActivation(address, snapshot)) + { + DirectoryCache.Remove(grainId); + return; + } + + DirectoryCache.AddOrUpdate(address, (int)snapshot.Version.Value); + } public bool TryCachedLookup(GrainId grainId, [NotNullWhen(true)] out GrainAddress? address) { DirectoryInstruments.LookupsCacheIssued.Add(1); diff --git a/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs b/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs index 8c48f102379..4beca9fb770 100644 --- a/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs +++ b/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs @@ -446,6 +446,47 @@ public async Task RegisterWhenOtherEntryExistsButSiloIsDead() await this.lifecycle.OnStop(); } + [Theory] + [InlineData(SiloStatus.Created)] + [InlineData(SiloStatus.Joining)] + [InlineData(SiloStatus.ShuttingDown)] + [InlineData(SiloStatus.Stopping)] + public async Task RegisterWhenOtherEntryExistsButSiloIsNonActiveButNotDead(SiloStatus status) + { + var expectedSilo = GenerateSiloAddress(); + var outdatedSilo = GenerateSiloAddress(); + + this.mockMembershipService.UpdateSiloStatus(expectedSilo, SiloStatus.Active, "exp"); + this.mockMembershipService.UpdateSiloStatus(outdatedSilo, status, "old"); + await this.lifecycle.OnStart(); + await WaitUntilClusterChangePropagated(); + + var expectedAddr = GenerateGrainAddress(expectedSilo); + var outdatedAddr = new GrainAddress + { + ActivationId = ActivationId.NewId(), + SiloAddress = outdatedSilo, + GrainId = expectedAddr.GrainId, + MembershipVersion = expectedAddr.MembershipVersion + }; + + this.grainDirectory.Register(expectedAddr, previousAddress: null).Returns(outdatedAddr); + this.grainDirectory.Register(expectedAddr, previousAddress: outdatedAddr).Returns(expectedAddr); + + var actual = await this.grainLocator.Register(expectedAddr, previousAddress: null); + + Assert.Equal(expectedAddr, actual); + await this.grainDirectory.Received(1).Register(expectedAddr, previousAddress: null); + await this.grainDirectory.Received(1).Register(expectedAddr, previousAddress: outdatedAddr); + await this.grainDirectory.DidNotReceive().Unregister(outdatedAddr); + + Assert.True(this.grainLocator.TryLookupInCache(expectedAddr.GrainId, out var result)); + Assert.NotNull(result); + Assert.Equal(expectedAddr, result); + + await this.lifecycle.OnStop(); + } + [Fact] public async Task LookupPopulateTheCache() { @@ -521,9 +562,11 @@ public async Task LocalLookupWhenEntryExistsButSiloIsDead() } [Theory] + [InlineData(SiloStatus.Created)] + [InlineData(SiloStatus.Joining)] [InlineData(SiloStatus.ShuttingDown)] [InlineData(SiloStatus.Stopping)] - public async Task LocalLookupWhenCachedEntrySiloIsTerminatingButNotDead(SiloStatus status) + public async Task LocalLookupWhenCachedEntrySiloIsNonActiveButNotDead(SiloStatus status) { var silo = GenerateSiloAddress(); @@ -542,13 +585,39 @@ public async Task LocalLookupWhenCachedEntrySiloIsTerminatingButNotDead(SiloStat this.mockMembershipService.UpdateSiloStatus(silo, status, "silo"); await WaitUntilClusterChangePropagated(); - Assert.True(this.grainLocator.TryLookupInCache(address.GrainId, out cached)); - Assert.Equal(address, cached); + Assert.False(this.grainLocator.TryLookupInCache(address.GrainId, out cached)); await this.grainDirectory.DidNotReceive().UnregisterSilos(Arg.Any>()); await this.lifecycle.OnStop(); } + [Theory] + [InlineData(SiloStatus.Created)] + [InlineData(SiloStatus.Joining)] + [InlineData(SiloStatus.ShuttingDown)] + [InlineData(SiloStatus.Stopping)] + public async Task LookupWhenEntryExistsButSiloIsNonActiveButNotDead(SiloStatus status) + { + var silo = GenerateSiloAddress(); + + this.mockMembershipService.UpdateSiloStatus(silo, status, "silo"); + await this.lifecycle.OnStart(); + await WaitUntilClusterChangePropagated(); + + var address = GenerateGrainAddress(silo); + + this.grainDirectory.Lookup(address.GrainId).Returns(address); + + var actual = await this.grainLocator.Lookup(address.GrainId); + Assert.Null(actual); + + await this.grainDirectory.Received(1).Lookup(address.GrainId); + await this.grainDirectory.DidNotReceive().Unregister(address); + Assert.False(this.grainLocator.TryLookupInCache(address.GrainId, out _)); + + await this.lifecycle.OnStop(); + } + /// /// Tests that the locator properly cleans up cached entries when a silo dies. /// This is critical for preventing requests from being sent to dead silos. diff --git a/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs b/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs index 3b820aec7d7..17d829a1945 100644 --- a/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs +++ b/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs @@ -65,6 +65,42 @@ public void IsDefunctActivation_RemovesSiloReplacedBySuccessor() Assert.True(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); } + [Theory] + [InlineData(SiloStatus.Created)] + [InlineData(SiloStatus.Joining)] + [InlineData(SiloStatus.ShuttingDown)] + [InlineData(SiloStatus.Stopping)] + [InlineData(SiloStatus.Dead)] + public void IsNonRoutableActivation_RejectsKnownNonActiveSilos(SiloStatus status) + { + var silo = CreateSiloAddress(1); + var address = CreateGrainAddress(silo, membershipVersion: 2); + var snapshot = CreateSnapshot(new ClusterMember(silo, status, "silo"), version: 2); + + Assert.True(LocalGrainDirectory.IsNonRoutableActivation(address, snapshot)); + } + + [Fact] + public void IsNonRoutableActivation_AllowsActiveSilos() + { + var silo = CreateSiloAddress(1); + var address = CreateGrainAddress(silo, membershipVersion: 2); + var snapshot = CreateSnapshot(new ClusterMember(silo, SiloStatus.Active, "silo"), version: 2); + + Assert.False(LocalGrainDirectory.IsNonRoutableActivation(address, snapshot)); + } + + [Fact] + public void IsNonRoutableActivation_AllowsUnknownSiloWithoutNewerMembershipVersion() + { + var silo = CreateSiloAddress(1); + var unrelatedSilo = CreateSiloAddress(1, port: 11112); + var address = CreateGrainAddress(silo, membershipVersion: 2); + var snapshot = CreateSnapshot(new ClusterMember(unrelatedSilo, SiloStatus.Active, "other"), version: 2); + + Assert.False(LocalGrainDirectory.IsNonRoutableActivation(address, snapshot)); + } + private static ClusterMembershipSnapshot CreateSnapshot(ClusterMember member, long version) => new(ImmutableDictionary.Empty.Add(member.SiloAddress, member), new MembershipVersion(version)); From e925f7e4a77dd77414eb75f18a5bee13c40cf65a Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Fri, 29 May 2026 15:24:01 -0700 Subject: [PATCH 4/4] chore(pr): remove grain directory changes Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../GrainDirectory/CachedGrainLocator.cs | 38 +-------- .../GrainDirectory/LocalGrainDirectory.cs | 81 ++----------------- .../Directory/CachedGrainLocatorTests.cs | 75 +---------------- .../LocalGrainDirectoryTests.cs | 36 --------- 4 files changed, 12 insertions(+), 218 deletions(-) diff --git a/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs b/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs index f43a5ddfb78..3f03e60ee3b 100644 --- a/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs +++ b/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs @@ -72,10 +72,6 @@ public async ValueTask Lookup(GrainId grainId) await GetGrainDirectory(grainId.Type).Unregister(entry); entry = null; } - else if (IsKnownNonActiveSilo(entry)) - { - entry = null; - } else { // Add to the local cache and return it @@ -116,21 +112,9 @@ public async Task Register(GrainAddress address, GrainAddress prev await GetGrainDirectory(grainType).Unregister(result); result = await GetGrainDirectory(grainType).Register(address, previousAddress); } - else if (IsKnownNonActiveSilo(result)) - { - result = await GetGrainDirectory(grainType).Register(address, result); - } // Cache update - if (result is not null && IsKnownNonActiveSilo(result)) - { - return null; - } - - if (result is not null) - { - this.cache.AddOrUpdate(result, (int)result.MembershipVersion.Value); - } + this.cache.AddOrUpdate(result, (int)result.MembershipVersion.Value); return result; @@ -216,27 +200,11 @@ private bool IsKnownDeadSilo(SiloAddress siloAddress, MembershipVersion membersh return siloAddress is null || current.GetSiloStatus(siloAddress, membershipVersion) == SiloStatus.Dead; } - private bool IsKnownNonActiveSilo(GrainAddress grainAddress) - => IsKnownNonActiveSilo(grainAddress.SiloAddress, grainAddress.MembershipVersion); - - private bool IsKnownNonActiveSilo(SiloAddress siloAddress, MembershipVersion membershipVersion) - { - var current = this.clusterMembershipService.CurrentSnapshot; - var status = current.GetSiloStatus(siloAddress, membershipVersion); - return siloAddress is null || (status != SiloStatus.None && status != SiloStatus.Active); - } - private static void ThrowUnsupportedGrainType(GrainId grainId) => throw new InvalidOperationException($"Unsupported grain type for grain {grainId}"); public void UpdateCache(GrainId grainId, SiloAddress siloAddress) { var membershipVersion = this.clusterMembershipService.CurrentSnapshot.Version; - if (IsKnownNonActiveSilo(siloAddress, membershipVersion)) - { - cache.Remove(grainId); - return; - } - cache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress, MembershipVersion = membershipVersion }, (int)membershipVersion.Value); } public void InvalidateCache(GrainId grainId) => cache.Remove(grainId); @@ -252,8 +220,8 @@ public bool TryLookupInCache(GrainId grainId, out GrainAddress address) DirectoryInstruments.LookupsCacheIssued.Add(1); if (this.cache.LookUp(grainId, out address, out _)) { - // If the silo cannot currently host activations, remove the entry. - if (IsKnownNonActiveSilo(address)) + // If the silo is dead, remove the entry + if (IsKnownDeadSilo(address)) { address = default; this.cache.Remove(grainId); diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index 5830d27a9b8..6c7cf397aa2 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -435,7 +435,7 @@ private void AdjustLocalCache(ClusterMembershipSnapshot snapshot, DirectoryMembe continue; } - if (IsNonRoutableActivation(activationAddress, snapshot)) + if (IsDefunctActivation(activationAddress, snapshot)) { DirectoryCache.Remove(activationAddress.GrainId); } @@ -452,17 +452,6 @@ internal static bool IsDefunctActivation(GrainAddress address, ClusterMembership return snapshot.GetSiloStatus(silo, address.MembershipVersion) == SiloStatus.Dead; } - internal static bool IsNonRoutableActivation(GrainAddress address, ClusterMembershipSnapshot snapshot) - { - if (address.SiloAddress is not { } silo) - { - return true; - } - - var status = snapshot.GetSiloStatus(silo, address.MembershipVersion); - return status != SiloStatus.None && status != SiloStatus.Active; - } - internal SiloAddress? FindPredecessor(SiloAddress silo) { var existing = directoryMembership.MembershipRingList; @@ -634,22 +623,9 @@ public async Task RegisterAsync(GrainAddress address, GrainAddres DirectoryInstruments.RegistrationsSingleActLocal.Add(1); var result = DirectoryPartition.AddSingleActivation(address, previousAddress); - if (!address.Equals(result.Address) && result.Address is not null && IsNonRoutableActivation(result.Address, clusterMembershipService.CurrentSnapshot)) - { - result = DirectoryPartition.AddSingleActivation(address, result.Address); - } // update the cache so next local lookup will find this ActivationAddress in the cache and we will save full lookup. - if (result.Address is not null && IsNonRoutableActivation(result.Address, clusterMembershipService.CurrentSnapshot)) - { - return new(default, result.VersionTag); - } - - if (result.Address is not null) - { - DirectoryCache.AddOrUpdate(result.Address, result.VersionTag); - } - + DirectoryCache.AddOrUpdate(result.Address, result.VersionTag); return result; } else @@ -664,12 +640,7 @@ public async Task RegisterAsync(GrainAddress address, GrainAddres // this way next local lookup will find this ActivationAddress in the cache and we will save a full lookup! if (result.Address == null) return result; - if (IsNonRoutableActivation(result.Address, clusterMembershipService.CurrentSnapshot)) - { - return new(default, result.VersionTag); - } - - if (!address.Equals(result.Address)) return result; + if (!address.Equals(result.Address) || IsDefunctActivation(address, clusterMembershipService.CurrentSnapshot)) return result; // update the cache so next local lookup will find this ActivationAddress in the cache and we will save full lookup. DirectoryCache.AddOrUpdate(result.Address, result.VersionTag); @@ -843,12 +814,6 @@ public bool TryLocalLookup(GrainId grainId, [NotNullWhen(true)] out GrainAddress return false; } - if (IsNonRoutableActivation(localResult.Address, clusterMembershipService.CurrentSnapshot)) - { - address = null; - return false; - } - address = localResult.Address; DirectoryInstruments.LookupsLocalDirectorySuccesses.Add(1); DirectoryInstruments.LookupsLocalSuccesses.Add(1); @@ -862,14 +827,7 @@ public bool TryLocalLookup(GrainId grainId, [NotNullWhen(true)] out GrainAddress return null; } - var snapshot = clusterMembershipService.CurrentSnapshot; - if (IsNonRoutableActivation(cache, snapshot)) - { - DirectoryCache.Remove(grain); - return null; - } - - return cache; + return IsDefunctActivation(cache, clusterMembershipService.CurrentSnapshot) ? null : cache; } public async Task LookupAsync(GrainId grainId, int hopCount = 0) @@ -916,11 +874,6 @@ public async Task LookupAsync(GrainId grainId, int hopCount = 0) return new(default, GrainInfo.NO_ETAG); } - if (IsNonRoutableActivation(localResult.Address, clusterMembershipService.CurrentSnapshot)) - { - return new(default, GrainInfo.NO_ETAG); - } - LogTraceFullLookupMine(grainId, localResult.Address); DirectoryInstruments.LookupsLocalDirectorySuccesses.Add(1); return localResult; @@ -937,13 +890,8 @@ public async Task LookupAsync(GrainId grainId, int hopCount = 0) var result = await GetDirectoryReference(forwardAddress).LookupAsync(grainId, hopCount + 1); // update the cache - if (result.Address is { } address) + if (result.Address is { } address && !IsDefunctActivation(address, clusterMembershipService.CurrentSnapshot)) { - if (IsNonRoutableActivation(address, clusterMembershipService.CurrentSnapshot)) - { - return new(default, result.VersionTag); - } - DirectoryCache.AddOrUpdate(address, result.VersionTag); } @@ -1032,24 +980,7 @@ private static int CompareSiloAddress(SiloAddress left, SiloAddress right) return hashComparison != 0 ? hashComparison : left.CompareTo(right); } - public void AddOrUpdateCacheEntry(GrainId grainId, SiloAddress siloAddress) - { - var snapshot = clusterMembershipService.CurrentSnapshot; - var address = new GrainAddress - { - GrainId = grainId, - SiloAddress = siloAddress, - MembershipVersion = snapshot.Version - }; - - if (IsNonRoutableActivation(address, snapshot)) - { - DirectoryCache.Remove(grainId); - return; - } - - DirectoryCache.AddOrUpdate(address, (int)snapshot.Version.Value); - } + public void AddOrUpdateCacheEntry(GrainId grainId, SiloAddress siloAddress) => this.DirectoryCache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress }, 0); public bool TryCachedLookup(GrainId grainId, [NotNullWhen(true)] out GrainAddress? address) { DirectoryInstruments.LookupsCacheIssued.Add(1); diff --git a/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs b/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs index 4beca9fb770..8c48f102379 100644 --- a/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs +++ b/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs @@ -446,47 +446,6 @@ public async Task RegisterWhenOtherEntryExistsButSiloIsDead() await this.lifecycle.OnStop(); } - [Theory] - [InlineData(SiloStatus.Created)] - [InlineData(SiloStatus.Joining)] - [InlineData(SiloStatus.ShuttingDown)] - [InlineData(SiloStatus.Stopping)] - public async Task RegisterWhenOtherEntryExistsButSiloIsNonActiveButNotDead(SiloStatus status) - { - var expectedSilo = GenerateSiloAddress(); - var outdatedSilo = GenerateSiloAddress(); - - this.mockMembershipService.UpdateSiloStatus(expectedSilo, SiloStatus.Active, "exp"); - this.mockMembershipService.UpdateSiloStatus(outdatedSilo, status, "old"); - await this.lifecycle.OnStart(); - await WaitUntilClusterChangePropagated(); - - var expectedAddr = GenerateGrainAddress(expectedSilo); - var outdatedAddr = new GrainAddress - { - ActivationId = ActivationId.NewId(), - SiloAddress = outdatedSilo, - GrainId = expectedAddr.GrainId, - MembershipVersion = expectedAddr.MembershipVersion - }; - - this.grainDirectory.Register(expectedAddr, previousAddress: null).Returns(outdatedAddr); - this.grainDirectory.Register(expectedAddr, previousAddress: outdatedAddr).Returns(expectedAddr); - - var actual = await this.grainLocator.Register(expectedAddr, previousAddress: null); - - Assert.Equal(expectedAddr, actual); - await this.grainDirectory.Received(1).Register(expectedAddr, previousAddress: null); - await this.grainDirectory.Received(1).Register(expectedAddr, previousAddress: outdatedAddr); - await this.grainDirectory.DidNotReceive().Unregister(outdatedAddr); - - Assert.True(this.grainLocator.TryLookupInCache(expectedAddr.GrainId, out var result)); - Assert.NotNull(result); - Assert.Equal(expectedAddr, result); - - await this.lifecycle.OnStop(); - } - [Fact] public async Task LookupPopulateTheCache() { @@ -562,11 +521,9 @@ public async Task LocalLookupWhenEntryExistsButSiloIsDead() } [Theory] - [InlineData(SiloStatus.Created)] - [InlineData(SiloStatus.Joining)] [InlineData(SiloStatus.ShuttingDown)] [InlineData(SiloStatus.Stopping)] - public async Task LocalLookupWhenCachedEntrySiloIsNonActiveButNotDead(SiloStatus status) + public async Task LocalLookupWhenCachedEntrySiloIsTerminatingButNotDead(SiloStatus status) { var silo = GenerateSiloAddress(); @@ -585,39 +542,13 @@ public async Task LocalLookupWhenCachedEntrySiloIsNonActiveButNotDead(SiloStatus this.mockMembershipService.UpdateSiloStatus(silo, status, "silo"); await WaitUntilClusterChangePropagated(); - Assert.False(this.grainLocator.TryLookupInCache(address.GrainId, out cached)); + Assert.True(this.grainLocator.TryLookupInCache(address.GrainId, out cached)); + Assert.Equal(address, cached); await this.grainDirectory.DidNotReceive().UnregisterSilos(Arg.Any>()); await this.lifecycle.OnStop(); } - [Theory] - [InlineData(SiloStatus.Created)] - [InlineData(SiloStatus.Joining)] - [InlineData(SiloStatus.ShuttingDown)] - [InlineData(SiloStatus.Stopping)] - public async Task LookupWhenEntryExistsButSiloIsNonActiveButNotDead(SiloStatus status) - { - var silo = GenerateSiloAddress(); - - this.mockMembershipService.UpdateSiloStatus(silo, status, "silo"); - await this.lifecycle.OnStart(); - await WaitUntilClusterChangePropagated(); - - var address = GenerateGrainAddress(silo); - - this.grainDirectory.Lookup(address.GrainId).Returns(address); - - var actual = await this.grainLocator.Lookup(address.GrainId); - Assert.Null(actual); - - await this.grainDirectory.Received(1).Lookup(address.GrainId); - await this.grainDirectory.DidNotReceive().Unregister(address); - Assert.False(this.grainLocator.TryLookupInCache(address.GrainId, out _)); - - await this.lifecycle.OnStop(); - } - /// /// Tests that the locator properly cleans up cached entries when a silo dies. /// This is critical for preventing requests from being sent to dead silos. diff --git a/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs b/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs index 17d829a1945..3b820aec7d7 100644 --- a/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs +++ b/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs @@ -65,42 +65,6 @@ public void IsDefunctActivation_RemovesSiloReplacedBySuccessor() Assert.True(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); } - [Theory] - [InlineData(SiloStatus.Created)] - [InlineData(SiloStatus.Joining)] - [InlineData(SiloStatus.ShuttingDown)] - [InlineData(SiloStatus.Stopping)] - [InlineData(SiloStatus.Dead)] - public void IsNonRoutableActivation_RejectsKnownNonActiveSilos(SiloStatus status) - { - var silo = CreateSiloAddress(1); - var address = CreateGrainAddress(silo, membershipVersion: 2); - var snapshot = CreateSnapshot(new ClusterMember(silo, status, "silo"), version: 2); - - Assert.True(LocalGrainDirectory.IsNonRoutableActivation(address, snapshot)); - } - - [Fact] - public void IsNonRoutableActivation_AllowsActiveSilos() - { - var silo = CreateSiloAddress(1); - var address = CreateGrainAddress(silo, membershipVersion: 2); - var snapshot = CreateSnapshot(new ClusterMember(silo, SiloStatus.Active, "silo"), version: 2); - - Assert.False(LocalGrainDirectory.IsNonRoutableActivation(address, snapshot)); - } - - [Fact] - public void IsNonRoutableActivation_AllowsUnknownSiloWithoutNewerMembershipVersion() - { - var silo = CreateSiloAddress(1); - var unrelatedSilo = CreateSiloAddress(1, port: 11112); - var address = CreateGrainAddress(silo, membershipVersion: 2); - var snapshot = CreateSnapshot(new ClusterMember(unrelatedSilo, SiloStatus.Active, "other"), version: 2); - - Assert.False(LocalGrainDirectory.IsNonRoutableActivation(address, snapshot)); - } - private static ClusterMembershipSnapshot CreateSnapshot(ClusterMember member, long version) => new(ImmutableDictionary.Empty.Add(member.SiloAddress, member), new MembershipVersion(version));