diff --git a/src/Orleans.Reminders/Hosting/SiloBuilderReminderExtensions.cs b/src/Orleans.Reminders/Hosting/SiloBuilderReminderExtensions.cs index 4e7e2aa030c..f71c65c5a98 100644 --- a/src/Orleans.Reminders/Hosting/SiloBuilderReminderExtensions.cs +++ b/src/Orleans.Reminders/Hosting/SiloBuilderReminderExtensions.cs @@ -3,6 +3,7 @@ using Orleans.Configuration.Internal; using System.Linq; using Orleans.Runtime.ReminderService; +using Orleans.Services; using Orleans.Timers; namespace Orleans.Hosting; @@ -28,6 +29,7 @@ public static void AddReminders(this IServiceCollection services) } services.AddSingleton(); + services.AddFromExisting(); services.AddFromExisting(); services.AddFromExisting, LocalReminderService>(); services.AddSingleton(); diff --git a/src/Orleans.Reminders/ReminderService/LocalReminderService.cs b/src/Orleans.Reminders/ReminderService/LocalReminderService.cs index 9acd2f22300..871fbb08b07 100644 --- a/src/Orleans.Reminders/ReminderService/LocalReminderService.cs +++ b/src/Orleans.Reminders/ReminderService/LocalReminderService.cs @@ -32,6 +32,10 @@ internal sealed partial class LocalReminderService : GrainService, IReminderServ private long localTableSequence; private uint initialReadCallCount = 0; private Task? runTask; + private readonly object _deliveryLock = new(); + private bool _isDeliveringReminders; + private int _activeReminderDeliveries; + private TaskCompletionSource? _deliveryQuiesced; public LocalReminderService( GrainReferenceActivator referenceActivator, @@ -64,75 +68,107 @@ void ILifecycleParticipant.Participate(ISiloLifecycle observer) { observer.Subscribe( nameof(LocalReminderService), - ServiceLifecycleStage.BecomeActive, - InitializeReminderService, - StopReminderService); - observer.Subscribe( - nameof(LocalReminderService), - ServiceLifecycleStage.Active, - StartReminderService, - NoOpStop); + ServiceLifecycleStage.RuntimeGrainServices, + StartReminderTable, + StopReminderTable); - async Task InitializeReminderService(CancellationToken ct) + async Task StartReminderTable(CancellationToken ct) { try { - await this.QueueTask(() => Initialize(ct)); + await this.QueueTask(() => StartReminderTableCoreAsync(ct)); } catch (Exception exception) { LogErrorActivatingReminderService(exception); throw; } + + async Task StartReminderTableCoreAsync(CancellationToken cancellationToken) + { + CheckRuntimeContext(); + + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(this.reminderOptions.InitializationTimeout); + + // Confirm that it can access the underlying store, as after this the ReminderService will load in the background, without the opportunity to prevent the Silo from starting + await reminderTable.StartAsync(cts.Token); + } } - async Task StopReminderService(CancellationToken ct) + async Task StopReminderTable(CancellationToken ct) { try { - await this.QueueTask(Stop).WaitAsync(ct); + await this.QueueTask(() => reminderTable.StopAsync()).WaitAsync(ct); } catch (Exception exception) { - LogErrorStoppingReminderService(exception); + LogErrorActivatingReminderService(exception); throw; } } + } - async Task StartReminderService(CancellationToken ct) - { - using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct); - cts.CancelAfter(this.reminderOptions.InitializationTimeout); + public override async Task Start() + { + CheckRuntimeContext(); - try + try + { + lock (_deliveryLock) { - await this.QueueTask(Start).WaitAsync(cts.Token); + if (_isDeliveringReminders) + { + return; + } + + _isDeliveringReminders = true; } - catch (Exception exception) + + foreach (var reminderData in localReminders.Values) { - LogErrorStartingReminderService(exception); - throw; + reminderData.TryStart(); } - } - static Task NoOpStop(CancellationToken _) => Task.CompletedTask; + await base.Start(); + } + catch (Exception exception) + { + await Stop(); + LogErrorStartingReminderService(exception); + throw; + } } - /// - /// Initializes the reminder table. - /// - /// - private async Task Initialize(CancellationToken cancellationToken) + public override async Task Stop() { - using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - cts.CancelAfter(this.reminderOptions.InitializationTimeout); + CheckRuntimeContext(); - // Confirm that it can access the underlying store, as after this the ReminderService will load in the background, without the opportunity to prevent the Silo from starting - await reminderTable.StartAsync(cts.Token); - } + Task? deliveryQuiescedTask = null; + lock (_deliveryLock) + { + _isDeliveringReminders = false; + if (_activeReminderDeliveries > 0) + { + _deliveryQuiesced ??= new(TaskCreationOptions.RunContinuationsAsynchronously); + deliveryQuiescedTask = _deliveryQuiesced.Task; + } + } - public async override Task Stop() - { + // Stop all reminders. + var tasks = new List(localReminders.Count + (deliveryQuiescedTask is null ? 0 : 1)); + if (deliveryQuiescedTask is not null) + { + tasks.Add(deliveryQuiescedTask); + } + + foreach (var reminderData in localReminders.Values) + { + tasks.Add(reminderData.StopAsync(ReminderEvents.LocalReminderStopReason.ServiceStopped)); + } + + await Task.WhenAll(tasks); await base.Stop(); listRefreshTimer.Dispose(); @@ -141,21 +177,14 @@ public async override Task Stop() await task; } - var disposeTasks = new List(localReminders.Count); - foreach (LocalReminderData r in localReminders.Values) - { - disposeTasks.Add(r.StopAsync(ReminderEvents.LocalReminderStopReason.ServiceStopped)); - } - - await Task.WhenAll(disposeTasks); - await reminderTable.StopAsync(); - // For a graceful shutdown, also handover reminder responsibilities to new owner, and update the ReminderTable // currently, this is taken care of by periodically reading the reminder table } public async Task RegisterOrUpdateReminder(GrainId grainId, string reminderName, TimeSpan dueTime, TimeSpan period) { + CheckRuntimeContext(); + var entry = new ReminderEntry { GrainId = grainId, @@ -192,6 +221,8 @@ public async Task RegisterOrUpdateReminder(GrainId grainId, stri /// public async Task UnregisterReminder(IGrainReminder reminder) { + CheckRuntimeContext(); + var remData = (ReminderData)reminder; LogDebugUnregisterReminder(reminder, localTableSequence); @@ -283,6 +314,8 @@ public async Task> GetReminders(GrainId grainId) /// private Task ReadAndUpdateReminders() { + CheckRuntimeContext(); + if (StoppedCancellationTokenSource.IsCancellationRequested) return Task.CompletedTask; var tasks = new List(); @@ -302,6 +335,8 @@ private Task ReadAndUpdateReminders() private void RemoveOutOfRangeReminders(List removedReminderTasks) { + CheckRuntimeContext(); + var remindersOutOfRange = 0; foreach (var r in localReminders) @@ -324,6 +359,8 @@ private void RemoveOutOfRangeReminders(List removedReminderTasks) public override Task OnRangeChange(IRingRange oldRange, IRingRange newRange, bool increased) { + CheckRuntimeContext(); + _ = base.OnRangeChange(oldRange, newRange, increased); if (Status == GrainServiceStatus.Started) return ReadAndUpdateReminders(); @@ -339,6 +376,8 @@ private async Task RunAsync() { try { + CheckRuntimeContext(); + overrideDelay = null; switch (Status) { @@ -363,18 +402,23 @@ private async Task RunAsync() protected override async Task StartInBackground() { + CheckRuntimeContext(); + await DoInitialReadAndUpdateReminders(); this.runTask = RunAsync(); } private async Task DoInitialReadAndUpdateReminders() { + CheckRuntimeContext(); + try { if (StoppedCancellationTokenSource.IsCancellationRequested) return; initialReadCallCount++; await this.ReadAndUpdateReminders(); + Status = GrainServiceStatus.Started; startedTask.TrySetResult(true); } @@ -396,6 +440,8 @@ private async Task DoInitialReadAndUpdateReminders() private async Task ReadTableAndStartTimers(ISingleRange range, int rangeSerialNumberCopy) { + CheckRuntimeContext(); + LogDebugReadingRows(range); localTableSequence++; long cachedSequence = localTableSequence; @@ -505,28 +551,72 @@ private async Task ReadTableAndStartTimers(ISingleRange range, int rangeSerialNu private void AddOrUpdateLocalReminder(ReminderEntry entry) { + CheckRuntimeContext(); + localTableSequence++; var key = new ReminderIdentity(entry.GrainId, entry.ReminderName); ref var reminderData = ref CollectionsMarshal.GetValueRefOrAddDefault(localReminders, key, out var exists); - if (exists && reminderData is { } existingReminder) + if (exists && reminderData is not null) { - existingReminder.LocalSequenceNumber = localTableSequence; - existingReminder.Update(entry); + reminderData.LocalSequenceNumber = localTableSequence; + reminderData.Update(entry); LogDebugUpdatedReminder(entry); - return; } + else + { + reminderData = new LocalReminderData(entry, this) + { + LocalSequenceNumber = localTableSequence, + }; - var newReminder = new LocalReminderData(entry, this) + LogDebugStartedReminder(entry); + } + + lock (_deliveryLock) { - LocalSequenceNumber = localTableSequence, - }; - newReminder.Start(); - reminderData = newReminder; - LogDebugStartedReminder(entry); + if (!_isDeliveringReminders) + { + return; + } + } + + reminderData.TryStart(); + } + + private bool TryBeginSingleReminderDelivery() + { + lock (_deliveryLock) + { + if (!_isDeliveringReminders) + { + return false; + } + + ++_activeReminderDeliveries; + return true; + } + } + + private void CompleteSingleReminderDelivery() + { + TaskCompletionSource? quiesced = null; + lock (_deliveryLock) + { + --_activeReminderDeliveries; + if (_activeReminderDeliveries == 0) + { + quiesced = _deliveryQuiesced; + _deliveryQuiesced = null; + } + } + + quiesced?.SetResult(); } private Task DoResponsibilitySanityCheck(GrainId grainId, string debugInfo) { + CheckRuntimeContext(); + switch (Status) { case GrainServiceStatus.Booting: @@ -568,6 +658,8 @@ async Task WaitForInitCompletion() void CheckRange() { + CheckRuntimeContext(); + if (!RingRange.InRange(grainId)) { LogWarningNotResponsible(debugInfo, grainId, RingRange); @@ -698,15 +790,15 @@ public bool IsRunning } } - public void Start() + public bool TryStart() { GrainId grainId; string reminderName; lock (_lock) { - if (_runTask is not null) + if (_runTask is not null || _stopReason is not (int)ReminderEvents.LocalReminderStopReason.Unknown) { - throw new InvalidOperationException($"{nameof(Start)} may only be called once per instance and has already been called on this instance."); + return false; } grainId = _entry.GrainId; @@ -716,6 +808,7 @@ public void Start() } ReminderEvents.EmitLocalReminderStarted(grainId, reminderName, this, _shared.Silo); + return true; } public void Update(ReminderEntry entry) @@ -778,7 +871,7 @@ private async Task RunAsync(GrainId grainId, string reminderName) while (await WaitForNextTick()) { var entry = PrepareTick(); - if (entry is null) + if (entry is null || !_shared.TryBeginSingleReminderDelivery()) { continue; } @@ -824,6 +917,10 @@ private async Task RunAsync(GrainId grainId, string reminderName) { LogWarningFiringReminder(_shared.logger, entry.ReminderName, entry.GrainId, exception); } + finally + { + _shared.CompleteSingleReminderDelivery(); + } } } finally diff --git a/test/Grains/TestInternalGrains/ReminderTestGrain2.cs b/test/Grains/TestInternalGrains/ReminderTestGrain2.cs index 53db189398b..89211772b18 100644 --- a/test/Grains/TestInternalGrains/ReminderTestGrain2.cs +++ b/test/Grains/TestInternalGrains/ReminderTestGrain2.cs @@ -81,7 +81,7 @@ public async Task StartReminder(string reminderName, TimeSpan? p return r; } - public Task ReceiveReminder(string reminderName, TickStatus status) + public async Task ReceiveReminder(string reminderName, TickStatus status) { // it can happen that due to failure, when a new activation is created, // it doesn't know which reminders were registered against the grain @@ -92,6 +92,15 @@ public Task ReceiveReminder(string reminderName, TickStatus status) this.sequence.Add(reminderName, 0); // we'll get upto date to the latest sequence number while processing this tick } + if (!this.allReminders.ContainsKey(reminderName)) + { + await GetMissingReminders(); + if (!this.allReminders.ContainsKey(reminderName)) + { + throw new OrleansException($"Could not find reminder {reminderName} in grain {this.IdentityString}"); + } + } + allReminders[reminderName].Fired.Add(status.CurrentTickTime); // calculating tick sequence number @@ -111,7 +120,7 @@ public Task ReceiveReminder(string reminderName, TickStatus status) if (sequenceNumber < this.sequence[reminderName]) { this.logger.LogInformation("ReceiveReminder: {Reminder} Incorrect tick {ExpectedSequenceNumber} vs. {SequenceNumber} with status {Status}.", reminderName, this.sequence[reminderName], sequenceNumber, status); - return Task.CompletedTask; + return; } this.sequence[reminderName] = sequenceNumber; this.logger.LogInformation("ReceiveReminder: {ReminderNAme} Sequence # {SequenceNumber} with status {Status}.", reminderName, this.sequence[reminderName], status); @@ -120,7 +129,7 @@ public Task ReceiveReminder(string reminderName, TickStatus status) string counterValue = this.sequence[reminderName].ToString(CultureInfo.InvariantCulture); File.WriteAllText(fileName, counterValue); - return Task.CompletedTask; + return; } public async Task StopReminder(string reminderName) @@ -295,7 +304,7 @@ public async Task StartReminder(string reminderName, TimeSpan? p return r; } - public Task ReceiveReminder(string reminderName, TickStatus status) + public async Task ReceiveReminder(string reminderName, TickStatus status) { // it can happen that due to failure, when a new activation is created, // it doesn't know which reminders were registered against the grain @@ -308,6 +317,15 @@ public Task ReceiveReminder(string reminderName, TickStatus status) this.sequence.Add(reminderName, 0); // we'll get upto date to the latest sequence number while processing this tick } + if (!this.allReminders.ContainsKey(reminderName)) + { + await GetMissingReminders(); + if (!this.allReminders.ContainsKey(reminderName)) + { + throw new OrleansException($"Could not find reminder {reminderName} in grain {this.IdentityString}"); + } + } + // calculating tick sequence number // we do all arithmetics on DateTime by converting into long because we dont have divide operation on DateTime @@ -325,7 +343,7 @@ public Task ReceiveReminder(string reminderName, TickStatus status) if (sequenceNumber < this.sequence[reminderName]) { this.logger.LogInformation("{ReminderName} Incorrect tick {ExpectedSequenceNumber} vs. {SequenceNumber} with status {Status}.", reminderName, this.sequence[reminderName], sequenceNumber, status); - return Task.CompletedTask; + return; } this.sequence[reminderName] = sequenceNumber; @@ -333,7 +351,7 @@ public Task ReceiveReminder(string reminderName, TickStatus status) File.WriteAllText(GetFileName(reminderName), this.sequence[reminderName].ToString()); - return Task.CompletedTask; + return; } public async Task StopReminder(string reminderName)