diff --git a/src/TickerQ.EntityFrameworkCore/Infrastructure/BasePersistenceProvider.cs b/src/TickerQ.EntityFrameworkCore/Infrastructure/BasePersistenceProvider.cs index b04837bf..c562772d 100644 --- a/src/TickerQ.EntityFrameworkCore/Infrastructure/BasePersistenceProvider.cs +++ b/src/TickerQ.EntityFrameworkCore/Infrastructure/BasePersistenceProvider.cs @@ -339,7 +339,7 @@ public async Task ReleaseAcquiredCronTickerOccurrences(Guid[] occurrenceIds, Can ? dbContext.Set>() : dbContext.Set>().Where(x => occurrenceIds.Contains(x.Id)); - await baseQuery.Where(x => occurrenceIds.Contains(x.Id)) + await baseQuery .WhereCanAcquire(_lockHolder) .ExecuteUpdateAsync(setter => setter .SetProperty(x => x.LockHolder, _ => null) @@ -415,7 +415,7 @@ public async IAsyncEnumerable> QueueCron { Id = item.NextCronOccurrence.Id, CronTickerId = item.Id, - ExecutionTime = now, + ExecutionTime = executionTime, Status = TickerStatus.Queued, LockHolder = _lockHolder, LockedAt = now, @@ -464,12 +464,12 @@ public async Task GetCronTickerOccurrenceRequest(Guid tickerId, Cancella .ConfigureAwait(false); } - public async Task UpdateCronTickerOccurrencesWithUnifiedContext(Guid[] timeTickerIds, InternalFunctionContext functionContext, + public async Task UpdateCronTickerOccurrencesWithUnifiedContext(Guid[] cronOccurrenceIds, InternalFunctionContext functionContext, CancellationToken cancellationToken = default) { await using var dbContext = await DbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false);; await dbContext.Set>() - .Where(x => timeTickerIds.Contains(x.CronTickerId)) + .Where(x => cronOccurrenceIds.Contains(x.Id)) .ExecuteUpdateAsync(setter => setter.UpdateCronTickerOccurrence(functionContext), cancellationToken) .ConfigureAwait(false); } diff --git a/src/TickerQ.Utilities/Managers/InternalTickerManager.cs b/src/TickerQ.Utilities/Managers/InternalTickerManager.cs index d748f769..a5c6f840 100644 --- a/src/TickerQ.Utilities/Managers/InternalTickerManager.cs +++ b/src/TickerQ.Utilities/Managers/InternalTickerManager.cs @@ -28,132 +28,99 @@ public InternalTickerManager( _clock = clock ?? throw new ArgumentNullException(nameof(clock)); _notificationHubSender = notificationHubSender; } - + public async Task<(TimeSpan TimeRemaining, InternalFunctionContext[] Functions)> GetNextTickers(CancellationToken cancellationToken = default) { - while (true) - { - var minCronGroupTask = GetEarliestCronTickerGroupAsync(cancellationToken); - var minTimeTickersTask = _persistenceProvider.GetEarliestTimeTickers(cancellationToken); - - await Task.WhenAll(minCronGroupTask, minTimeTickersTask).ConfigureAwait(false); - - var (minCronGroup, minTimeTickers) = (await minCronGroupTask, await minTimeTickersTask); - - var minTimeTickerTime = minTimeTickers.Length != 0 - ? minTimeTickers[0].ExecutionTime ?? default - : default; + var now = _clock.UtcNow; - var minTimeRemaining = CalculateMinTimeRemaining(minCronGroup, minTimeTickerTime, out var typesToQueue); + var minCronGroupTask = GetEarliestCronTickerGroupAsync(cancellationToken); + var minTimeTickersTask = _persistenceProvider.GetEarliestTimeTickers(cancellationToken); - if (minTimeRemaining == Timeout.InfiniteTimeSpan) - return (Timeout.InfiniteTimeSpan, []); + await Task.WhenAll(minCronGroupTask, minTimeTickersTask).ConfigureAwait(false); - var nextTickers = await RetrieveEligibleTickersAsync(minCronGroup, minTimeTickers, typesToQueue, cancellationToken).ConfigureAwait(false); + var minCronGroup = await minCronGroupTask.ConfigureAwait(false); + var minTimeTickers = await minTimeTickersTask.ConfigureAwait(false); - if (nextTickers.Length != 0) - return (minTimeRemaining, nextTickers); - - if(typesToQueue.All(x => x == TickerType.CronTickerOccurrence)) - return (minTimeRemaining, nextTickers); - - await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationToken).ConfigureAwait(false); // Faster retry for time-sensitive tasks - } - } + var cronTime = minCronGroup?.Key; + var timeTickerTime = minTimeTickers.Length > 0 + ? minTimeTickers[0].ExecutionTime + : null; - private TimeSpan CalculateMinTimeRemaining( - (DateTime Key, InternalManagerContext[] Items)? minCronTicker, - DateTime minTimeTicker, - out TickerType[] sources) - { - var now = _clock.UtcNow; + if (cronTime is null && timeTickerTime is null) + return (Timeout.InfiniteTimeSpan, []); - DateTime? cron = minCronTicker?.Key; - DateTime? time = minTimeTicker == default ? null : minTimeTicker; + TimeSpan timeRemaining; + bool includeCron = false; + bool includeTimeTickers = false; - // no values - if (cron is null && time is null) + if (cronTime is null) { - sources = []; - return Timeout.InfiniteTimeSpan; + includeTimeTickers = true; + timeRemaining = SafeRemaining(timeTickerTime!.Value, now); } - - // only cron - if (time is null) + else if (timeTickerTime is null) { - sources = [TickerType.CronTickerOccurrence]; - var cronRemaining = cron.Value - now; - // Ensure we don't return negative values - schedule for immediate execution - return cronRemaining < TimeSpan.Zero ? TimeSpan.Zero : cronRemaining; + includeCron = true; + timeRemaining = SafeRemaining(cronTime.Value, now); } - - // only time - if (cron is null) + else { - sources = [TickerType.TimeTicker]; - var timeRemaining = time.Value - now; - // Ensure we don't return negative values - schedule for immediate execution - return timeRemaining < TimeSpan.Zero ? TimeSpan.Zero : timeRemaining; - } + var cronSecond = new DateTime(cronTime.Value.Year, cronTime.Value.Month, cronTime.Value.Day, + cronTime.Value.Hour, cronTime.Value.Minute, cronTime.Value.Second); + var timeSecond = new DateTime(timeTickerTime.Value.Year, timeTickerTime.Value.Month, timeTickerTime.Value.Day, + timeTickerTime.Value.Hour, timeTickerTime.Value.Minute, timeTickerTime.Value.Second); - // both present - check if they're in the exact same second (ignoring milliseconds) - var cronSecond = new DateTime(cron.Value.Year, cron.Value.Month, cron.Value.Day, - cron.Value.Hour, cron.Value.Minute, cron.Value.Second); - var timeSecond = new DateTime(time.Value.Year, time.Value.Month, time.Value.Day, - time.Value.Hour, time.Value.Minute, time.Value.Second); - - // Only batch if they're in the exact same second - if (cronSecond == timeSecond) - { - sources = [TickerType.CronTickerOccurrence, TickerType.TimeTicker]; - var earliest = cron < time ? cron.Value : time.Value; - var earliestRemaining = earliest - now; - // Ensure we don't return negative values - return earliestRemaining < TimeSpan.Zero ? TimeSpan.Zero : earliestRemaining; + if (cronSecond == timeSecond) + { + includeCron = true; + includeTimeTickers = true; + var earliest = cronTime < timeTickerTime ? cronTime.Value : timeTickerTime.Value; + timeRemaining = SafeRemaining(earliest, now); + } + else if (cronTime < timeTickerTime) + { + includeCron = true; + timeRemaining = SafeRemaining(cronTime.Value, now); + } + else + { + includeTimeTickers = true; + timeRemaining = SafeRemaining(timeTickerTime.Value, now); + } } - // Different seconds - only process the earliest one - if (cron < time) - { - sources = [TickerType.CronTickerOccurrence]; - var cronRemaining = cron.Value - now; - return cronRemaining < TimeSpan.Zero ? TimeSpan.Zero : cronRemaining; - } + if (!includeCron && !includeTimeTickers) + return (Timeout.InfiniteTimeSpan, []); - sources = [TickerType.TimeTicker]; - var finalTimeRemaining = time.Value - now; - return finalTimeRemaining < TimeSpan.Zero ? TimeSpan.Zero : finalTimeRemaining; - } + InternalFunctionContext[] cronFunctions = []; + InternalFunctionContext[] timeFunctions = []; - private async TaskRetrieveEligibleTickersAsync( - (DateTime Key, InternalManagerContext[] Items)? minCronTicker, - TimeTickerEntity[] minTimeTicker, - TickerType[] typesToQueue, - CancellationToken cancellationToken = default) - { - - if (typesToQueue.Contains(TickerType.CronTickerOccurrence) && typesToQueue.Contains(TickerType.TimeTicker)) - { - var nextCronTickersTask = QueueNextCronTickersAsync(minCronTicker!.Value, cancellationToken); - var nextTimeTickersTask = QueueNextTimeTickersAsync(minTimeTicker, cancellationToken); + if (includeCron && minCronGroup is not null) + cronFunctions = await QueueNextCronTickersAsync(minCronGroup.Value, cancellationToken).ConfigureAwait(false); - await Task.WhenAll(nextCronTickersTask, nextTimeTickersTask).ConfigureAwait(false); - - var (nextCronTickers, nextTimeTickers) = (await nextCronTickersTask, await nextTimeTickersTask); - - // Safety check for extremely large datasets - var totalLength = nextCronTickers.Length + nextTimeTickers.Length; - - var merged = new InternalFunctionContext[totalLength]; - nextCronTickers.AsSpan().CopyTo(merged.AsSpan(0, nextCronTickers.Length)); - nextTimeTickers.AsSpan().CopyTo(merged.AsSpan(nextCronTickers.Length, nextTimeTickers.Length)); - return merged; - } + if (includeTimeTickers && minTimeTickers.Length > 0) + timeFunctions = await QueueNextTimeTickersAsync(minTimeTickers, cancellationToken).ConfigureAwait(false); - if (typesToQueue.Contains(TickerType.TimeTicker)) - return await QueueNextTimeTickersAsync(minTimeTicker, cancellationToken).ConfigureAwait(false); - else - return await QueueNextCronTickersAsync(minCronTicker!.Value, cancellationToken).ConfigureAwait(false); + if (cronFunctions.Length == 0 && timeFunctions.Length == 0) + return (timeRemaining, []); + + if (cronFunctions.Length == 0) + return (timeRemaining, timeFunctions); + + if (timeFunctions.Length == 0) + return (timeRemaining, cronFunctions); + + var merged = new InternalFunctionContext[cronFunctions.Length + timeFunctions.Length]; + cronFunctions.AsSpan().CopyTo(merged.AsSpan(0, cronFunctions.Length)); + timeFunctions.AsSpan().CopyTo(merged.AsSpan(cronFunctions.Length, timeFunctions.Length)); + + return (timeRemaining, merged); + } + + private static TimeSpan SafeRemaining(DateTime target, DateTime now) + { + var remaining = target - now; + return remaining < TimeSpan.Zero ? TimeSpan.Zero : remaining; } private async Task QueueNextTimeTickersAsync(TimeTickerEntity[] minTimeTickers, CancellationToken cancellationToken = default) @@ -489,9 +456,9 @@ public async Task MigrateDefinedCronTickers((string, string)[] cronExpressions, public async Task DeleteTicker(Guid tickerId, TickerType type, CancellationToken cancellationToken = default) { if (type == TickerType.CronTickerOccurrence) - await _persistenceProvider.RemoveTimeTickers([tickerId], cancellationToken).ConfigureAwait(false); - else await _persistenceProvider.RemoveCronTickers([tickerId], cancellationToken).ConfigureAwait(false); + else + await _persistenceProvider.RemoveTimeTickers([tickerId], cancellationToken).ConfigureAwait(false); } public async Task ReleaseDeadNodeResources(string instanceIdentifier, CancellationToken cancellationToken = default) @@ -503,4 +470,4 @@ public async Task ReleaseDeadNodeResources(string instanceIdentifier, Cancellati await Task.WhenAll(cronOccurrence, timeTickers).ConfigureAwait(false); } } -} \ No newline at end of file +} diff --git a/src/TickerQ.Utilities/Models/InternalFunctionContext.cs b/src/TickerQ.Utilities/Models/InternalFunctionContext.cs index 95db2462..3166ce15 100644 --- a/src/TickerQ.Utilities/Models/InternalFunctionContext.cs +++ b/src/TickerQ.Utilities/Models/InternalFunctionContext.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; -using System.Linq; using System.Linq.Expressions; using System.Reflection; using TickerQ.Utilities.Enums; diff --git a/src/TickerQ/Src/BackgroundServices/TickerQFallbackBackgroundService.cs b/src/TickerQ/Src/BackgroundServices/TickerQFallbackBackgroundService.cs index c4590e98..fe5517d0 100644 --- a/src/TickerQ/Src/BackgroundServices/TickerQFallbackBackgroundService.cs +++ b/src/TickerQ/Src/BackgroundServices/TickerQFallbackBackgroundService.cs @@ -11,7 +11,6 @@ namespace TickerQ.BackgroundServices; internal class TickerQFallbackBackgroundService : BackgroundService { private int _started; - private PeriodicTimer _tickerFallbackJobPeriodicTimer; private readonly IInternalTickerManager _internalTickerManager; private readonly TickerExecutionTaskHandler _tickerExecutionTaskHandler; private readonly TickerQTaskScheduler _tickerQTaskScheduler; @@ -33,16 +32,8 @@ public override Task StartAsync(CancellationToken ct) protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - _tickerFallbackJobPeriodicTimer = new PeriodicTimer(TimeSpan.FromMilliseconds(10)); - await RunTickerQFallbackAsync(stoppingToken); - } - - private async Task RunTickerQFallbackAsync(CancellationToken stoppingToken) - { - while (await _tickerFallbackJobPeriodicTimer.WaitForNextTickAsync(stoppingToken)) + while (!stoppingToken.IsCancellationRequested) { - var oldPeriod = _tickerFallbackJobPeriodicTimer.Period; - var functions = await _internalTickerManager.RunTimedOutTickers(stoppingToken); if (functions.Length != 0) @@ -76,13 +67,12 @@ private async Task RunTickerQFallbackAsync(CancellationToken stoppingToken) await _tickerQTaskScheduler.QueueAsync(ct => _tickerExecutionTaskHandler.ExecuteTaskAsync(function, true, ct), function.CachedPriority, stoppingToken); } - _tickerFallbackJobPeriodicTimer.Period = TimeSpan.FromMilliseconds(10); + await Task.Delay(TimeSpan.FromMilliseconds(10), stoppingToken); } else - _tickerFallbackJobPeriodicTimer.Period = _fallbackJobPeriod; - - if(oldPeriod != _fallbackJobPeriod) - await _tickerFallbackJobPeriodicTimer.WaitForNextTickAsync(stoppingToken); + { + await Task.Delay(_fallbackJobPeriod, stoppingToken); + } } } @@ -92,4 +82,4 @@ public override async Task StopAsync(CancellationToken cancellationToken) Interlocked.Exchange(ref _started, 0); await base.StopAsync(cancellationToken); } -} \ No newline at end of file +} diff --git a/src/TickerQ/Src/BackgroundServices/TickerQSchedulerBackgroundService.cs b/src/TickerQ/Src/BackgroundServices/TickerQSchedulerBackgroundService.cs index fd515e74..caddef1a 100644 --- a/src/TickerQ/Src/BackgroundServices/TickerQSchedulerBackgroundService.cs +++ b/src/TickerQ/Src/BackgroundServices/TickerQSchedulerBackgroundService.cs @@ -13,7 +13,6 @@ namespace TickerQ.BackgroundServices; internal class TickerQSchedulerBackgroundService : BackgroundService, ITickerQHostScheduler { - private PeriodicTimer _tickerJobPeriodicTimer; private readonly RestartThrottleManager _restartThrottle; private readonly IInternalTickerManager _internalTickerManager; private readonly TickerExecutionContext _executionContext; @@ -60,7 +59,6 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) try { - _tickerJobPeriodicTimer = new PeriodicTimer(TimeSpan.FromMilliseconds(1)); await RunTickerQSchedulerAsync(stoppingToken, _schedulerLoopCancellationTokenSource.Token); } catch (OperationCanceledException) when (_schedulerLoopCancellationTokenSource.Token.IsCancellationRequested && !stoppingToken.IsCancellationRequested) @@ -85,10 +83,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) } finally { - // CRITICAL: Must dispose PeriodicTimer to prevent memory leak - _tickerJobPeriodicTimer?.Dispose(); - _tickerJobPeriodicTimer = null; - + _executionContext.SetFunctions(null); _schedulerLoopCancellationTokenSource?.Dispose(); _schedulerLoopCancellationTokenSource = null; } @@ -97,16 +92,16 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) private async Task RunTickerQSchedulerAsync(CancellationToken stoppingToken, CancellationToken cancellationToken) { - while (await _tickerJobPeriodicTimer.WaitForNextTickAsync(cancellationToken)) + while (!stoppingToken.IsCancellationRequested && !cancellationToken.IsCancellationRequested) { - var oldPeriod = _tickerJobPeriodicTimer.Period; - if (_executionContext.Functions.Length != 0) { await _internalTickerManager.SetTickersInProgress(_executionContext.Functions, cancellationToken); foreach (var function in _executionContext.Functions.OrderBy(x => x.CachedPriority)) _ = _taskScheduler.QueueAsync(async ct => await _taskHandler.ExecuteTaskAsync(function,false, ct), function.CachedPriority, stoppingToken); + + _executionContext.SetFunctions(null); } var (timeRemaining, functions) = @@ -114,24 +109,23 @@ private async Task RunTickerQSchedulerAsync(CancellationToken stoppingToken, Can _executionContext.SetFunctions(functions); - var sleepDuration = timeRemaining > TimeSpan.FromDays(1) || timeRemaining == Timeout.InfiniteTimeSpan - ? TimeSpan.FromDays(1) - : timeRemaining; - - if (sleepDuration <= TimeSpan.Zero) - sleepDuration = TimeSpan.FromMilliseconds(1); - - _tickerJobPeriodicTimer.Period = sleepDuration; - - if (timeRemaining == Timeout.InfiniteTimeSpan) + TimeSpan sleepDuration; + if (timeRemaining == Timeout.InfiniteTimeSpan || timeRemaining > TimeSpan.FromDays(1)) + { + sleepDuration = TimeSpan.FromDays(1); _executionContext.SetNextPlannedOccurrence(null); + } else + { + sleepDuration = timeRemaining <= TimeSpan.Zero + ? TimeSpan.FromMilliseconds(1) + : timeRemaining; _executionContext.SetNextPlannedOccurrence(DateTime.UtcNow.Add(sleepDuration)); + } _executionContext.NotifyCoreAction(_executionContext.GetNextPlannedOccurrence(), CoreNotifyActionType.NotifyNextOccurence); - - if(oldPeriod != _tickerJobPeriodicTimer.Period) - await _tickerJobPeriodicTimer.WaitForNextTickAsync(cancellationToken); + + await Task.Delay(sleepDuration, cancellationToken); } } @@ -153,11 +147,19 @@ public void RestartIfNeeded(DateTime? dateTime) // Restart if: // 1. No tasks are currently planned, OR - // 2. The new task should execute BEFORE or at the same time as the currently planned task, OR + // 2. The new task should execute at least 500ms earlier than the currently planned task, OR // 3. The new task is already due/overdue (ExecutionTime <= now) - if (nextPlannedOccurrence == null || - dateTime.Value <= nextPlannedOccurrence.Value || - dateTime.Value <= now) + if (nextPlannedOccurrence == null) + { + _restartThrottle.RequestRestart(); + return; + } + + var newTime = dateTime.Value; + var threshold = TimeSpan.FromMilliseconds(500); + var diff = nextPlannedOccurrence.Value - newTime; + + if (newTime <= now || diff > threshold) _restartThrottle.RequestRestart(); } diff --git a/src/TickerQ/Src/TickerQThreadPool/WorkItem.cs b/src/TickerQ/Src/TickerQThreadPool/WorkItem.cs index 893a1fd5..e3b51a39 100644 --- a/src/TickerQ/Src/TickerQThreadPool/WorkItem.cs +++ b/src/TickerQ/Src/TickerQThreadPool/WorkItem.cs @@ -18,3 +18,4 @@ public WorkItem(Func work, CancellationToken userToken) UserToken = userToken; } } +