From 65403fe48a31611edd86aa37131357419c7efc8c Mon Sep 17 00:00:00 2001 From: "wojciech.wentland" Date: Mon, 9 Feb 2026 10:39:34 +0100 Subject: [PATCH 1/3] Fix thread-safety bugs, resource leaks, and scheduling issues across core components Addresses multiple concurrency bugs, resource disposal issues, and adds performance optimizations across the scheduler, cancellation, caching, and persistence layers. Co-Authored-By: Claude Opus 4.6 --- .../TickerRedisPersistenceProvider.cs | 93 ++++++++++--------- .../NodeHeartBeatBackgroundService.cs | 33 +++++-- .../TickerQRedisContext.cs | 2 +- .../Infrastructure/BasePersistenceProvider.cs | 22 +++-- src/TickerQ.Utilities/CronScheduleCache.cs | 46 +++++++-- .../Managers/InternalTickerManager.cs | 10 +- .../Models/InternalFunctionContext.cs | 27 ++++-- .../TickerCancellationTokenManager.cs | 84 +++++++++-------- .../TickerQSchedulerBackgroundService.cs | 8 +- src/TickerQ/Src/RestartThrottleManager.cs | 19 +++- .../Src/SafeCancellationTokenSource.cs | 69 ++++++++++++-- src/TickerQ/Src/TickerExecutionTaskHandler.cs | 36 +++---- .../TickerQThreadPool/TickerQTaskScheduler.cs | 19 +++- 13 files changed, 314 insertions(+), 154 deletions(-) diff --git a/src/TickerQ.Caching.StackExchangeRedis/Infrastructure/TickerRedisPersistenceProvider.cs b/src/TickerQ.Caching.StackExchangeRedis/Infrastructure/TickerRedisPersistenceProvider.cs index a44b6202..c3b1af53 100644 --- a/src/TickerQ.Caching.StackExchangeRedis/Infrastructure/TickerRedisPersistenceProvider.cs +++ b/src/TickerQ.Caching.StackExchangeRedis/Infrastructure/TickerRedisPersistenceProvider.cs @@ -115,52 +115,71 @@ private static TimeTickerEntity MapForQueue(TTimeTicker ticker) }; } - private async Task AddTimeTickerIndexesAsync(TTimeTicker ticker) + // Batched index updates — single round-trip per call via IBatch + private Task AddTimeTickerIndexesAsync(TTimeTicker ticker) { - await _db.SetAddAsync(TimeTickerIdsKey, ticker.Id.ToString()).ConfigureAwait(false); + var batch = _db.CreateBatch(); + var idStr = (RedisValue)ticker.Id.ToString(); + + var t1 = batch.SetAddAsync(TimeTickerIdsKey, idStr); + Task t2; if (ticker.ExecutionTime.HasValue && CanAcquire(ticker.Status, ticker.LockHolder, _lockHolder)) - { - await _db.SortedSetAddAsync(TimeTickerPendingKey, ticker.Id.ToString(), ToScore(ticker.ExecutionTime.Value)).ConfigureAwait(false); - } + t2 = batch.SortedSetAddAsync(TimeTickerPendingKey, idStr, ToScore(ticker.ExecutionTime.Value)); else - { - await _db.SortedSetRemoveAsync(TimeTickerPendingKey, ticker.Id.ToString()).ConfigureAwait(false); - } + t2 = batch.SortedSetRemoveAsync(TimeTickerPendingKey, idStr); + + batch.Execute(); + return Task.WhenAll(t1, t2); } - private async Task RemoveTimeTickerIndexesAsync(Guid id) + private Task RemoveTimeTickerIndexesAsync(Guid id) { - await _db.SetRemoveAsync(TimeTickerIdsKey, id.ToString()).ConfigureAwait(false); - await _db.SortedSetRemoveAsync(TimeTickerPendingKey, id.ToString()).ConfigureAwait(false); + var batch = _db.CreateBatch(); + var idStr = (RedisValue)id.ToString(); + + var t1 = batch.SetRemoveAsync(TimeTickerIdsKey, idStr); + var t2 = batch.SortedSetRemoveAsync(TimeTickerPendingKey, idStr); + + batch.Execute(); + return Task.WhenAll(t1, t2); } - private async Task AddCronIndexesAsync(TCronTicker ticker) + private Task AddCronIndexesAsync(TCronTicker ticker) { - await _db.SetAddAsync(CronIdsKey, ticker.Id.ToString()).ConfigureAwait(false); + return _db.SetAddAsync(CronIdsKey, ticker.Id.ToString()); } - private async Task RemoveCronIndexesAsync(Guid id) + private Task RemoveCronIndexesAsync(Guid id) { - await _db.SetRemoveAsync(CronIdsKey, id.ToString()).ConfigureAwait(false); + return _db.SetRemoveAsync(CronIdsKey, id.ToString()); } - private async Task AddCronOccurrenceIndexesAsync(CronTickerOccurrenceEntity occurrence) + private Task AddCronOccurrenceIndexesAsync(CronTickerOccurrenceEntity occurrence) { - await _db.SetAddAsync(CronOccurrenceIdsKey, occurrence.Id.ToString()).ConfigureAwait(false); + var batch = _db.CreateBatch(); + var idStr = (RedisValue)occurrence.Id.ToString(); + + var t1 = batch.SetAddAsync(CronOccurrenceIdsKey, idStr); + Task t2; if (CanAcquire(occurrence.Status, occurrence.LockHolder, _lockHolder)) - { - await _db.SortedSetAddAsync(CronOccurrencePendingKey, occurrence.Id.ToString(), ToScore(occurrence.ExecutionTime)).ConfigureAwait(false); - } + t2 = batch.SortedSetAddAsync(CronOccurrencePendingKey, idStr, ToScore(occurrence.ExecutionTime)); else - { - await _db.SortedSetRemoveAsync(CronOccurrencePendingKey, occurrence.Id.ToString()).ConfigureAwait(false); - } + t2 = batch.SortedSetRemoveAsync(CronOccurrencePendingKey, idStr); + + batch.Execute(); + return Task.WhenAll(t1, t2); } - private async Task RemoveCronOccurrenceIndexesAsync(Guid id) + private Task RemoveCronOccurrenceIndexesAsync(Guid id) { - await _db.SetRemoveAsync(CronOccurrenceIdsKey, id.ToString()).ConfigureAwait(false); - await _db.SortedSetRemoveAsync(CronOccurrencePendingKey, id.ToString()).ConfigureAwait(false); + var batch = _db.CreateBatch(); + var idStr = (RedisValue)id.ToString(); + + var t1 = batch.SetRemoveAsync(CronOccurrenceIdsKey, idStr); + var t2 = batch.SortedSetRemoveAsync(CronOccurrencePendingKey, idStr); + + batch.Execute(); + return Task.WhenAll(t1, t2); } #endregion @@ -426,15 +445,10 @@ public async Task ReleaseDeadNodeTimeTickerResources(string instanceIdentifier, var ticker = await GetAsync(TimeTickerKey(id)).ConfigureAwait(false); if (ticker == null) continue; - if (ticker.LockHolder == instanceIdentifier && ticker.Status == TickerStatus.InProgress) - { - ticker.Status = TickerStatus.Skipped; - ticker.SkippedReason = "Node is not alive!"; - ticker.ExecutedAt = now; - ticker.UpdatedAt = now; - } - else if (ticker.LockHolder == instanceIdentifier && (ticker.Status == TickerStatus.Idle || ticker.Status == TickerStatus.Queued)) + if (ticker.LockHolder == instanceIdentifier && + ticker.Status is TickerStatus.InProgress or TickerStatus.Idle or TickerStatus.Queued) { + // Reset to Idle so another node can pick up the work ticker.LockHolder = null; ticker.LockedAt = null; ticker.Status = TickerStatus.Idle; @@ -623,15 +637,10 @@ public async Task ReleaseDeadNodeOccurrenceResources(string instanceIdentifier, var occurrence = await GetAsync>(CronOccurrenceKey(id)).ConfigureAwait(false); if (occurrence == null) continue; - if (occurrence.LockHolder == instanceIdentifier && occurrence.Status == TickerStatus.InProgress) - { - occurrence.Status = TickerStatus.Skipped; - occurrence.SkippedReason = "Node is not alive!"; - occurrence.ExecutedAt = now; - occurrence.UpdatedAt = now; - } - else if (occurrence.LockHolder == instanceIdentifier && (occurrence.Status == TickerStatus.Idle || occurrence.Status == TickerStatus.Queued)) + if (occurrence.LockHolder == instanceIdentifier && + occurrence.Status is TickerStatus.InProgress or TickerStatus.Idle or TickerStatus.Queued) { + // Reset to Idle so another node can pick up the work occurrence.LockHolder = null; occurrence.LockedAt = null; occurrence.Status = TickerStatus.Idle; diff --git a/src/TickerQ.Caching.StackExchangeRedis/NodeHeartBeatBackgroundService.cs b/src/TickerQ.Caching.StackExchangeRedis/NodeHeartBeatBackgroundService.cs index 79973b43..289dd4d7 100644 --- a/src/TickerQ.Caching.StackExchangeRedis/NodeHeartBeatBackgroundService.cs +++ b/src/TickerQ.Caching.StackExchangeRedis/NodeHeartBeatBackgroundService.cs @@ -31,13 +31,28 @@ public override Task StartAsync(CancellationToken ct) protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - try + while (!stoppingToken.IsCancellationRequested) { - await RunTickerQFallbackAsync(stoppingToken); - } - catch (Exception e) - { - _logger.LogError("Heartbeat background service failed: {Exception}", e); + try + { + await RunTickerQFallbackAsync(stoppingToken); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + break; + } + catch (Exception e) + { + _logger.LogError("Heartbeat background service failed: {Exception}. Retrying in 5 seconds...", e); + try + { + await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken); + } + catch (OperationCanceledException) + { + break; + } + } } } @@ -66,4 +81,10 @@ public override async Task StopAsync(CancellationToken cancellationToken) Interlocked.Exchange(ref _started, 0); await base.StopAsync(cancellationToken); } + + public override void Dispose() + { + _tickerHeartBeatPeriodicTimer.Dispose(); + base.Dispose(); + } } \ No newline at end of file diff --git a/src/TickerQ.Caching.StackExchangeRedis/TickerQRedisContext.cs b/src/TickerQ.Caching.StackExchangeRedis/TickerQRedisContext.cs index ef85c854..16f602d9 100644 --- a/src/TickerQ.Caching.StackExchangeRedis/TickerQRedisContext.cs +++ b/src/TickerQ.Caching.StackExchangeRedis/TickerQRedisContext.cs @@ -145,6 +145,6 @@ public async Task GetOrSetArrayAsync(string cacheKey, // ignored } - return null; + return result; } } \ No newline at end of file diff --git a/src/TickerQ.EntityFrameworkCore/Infrastructure/BasePersistenceProvider.cs b/src/TickerQ.EntityFrameworkCore/Infrastructure/BasePersistenceProvider.cs index 5f343752..44c0374c 100644 --- a/src/TickerQ.EntityFrameworkCore/Infrastructure/BasePersistenceProvider.cs +++ b/src/TickerQ.EntityFrameworkCore/Infrastructure/BasePersistenceProvider.cs @@ -199,10 +199,10 @@ await dbContext.Set() await dbContext.Set() .Where(x => x.LockHolder == instanceIdentifier && x.Status == TickerStatus.InProgress) .ExecuteUpdateAsync(setter => setter - .SetProperty(x => x.Status, TickerStatus.Skipped) - .SetProperty(x => x.SkippedReason, "Node is not alive!") - .SetProperty(x => x.ExecutedAt, now) - .SetProperty(x => x.UpdatedAt, now), cancellationToken) + .SetProperty(x => x.LockHolder, _ => null) + .SetProperty(x => x.LockedAt, _ => null) + .SetProperty(x => x.Status, TickerStatus.Idle) + .SetProperty(x => x.UpdatedAt, now), cancellationToken) .ConfigureAwait(false); } #endregion @@ -334,9 +334,11 @@ public async Task GetAllCronTickerExpressions(CancellationTo }, expiration: TimeSpan.FromMinutes(10), cancellationToken: cancellationToken); - - - await using var dbContext = await DbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false);; + + if (result != null) + return result; + + await using var dbContext = await DbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false); return await dbContext.Set() .AsNoTracking() .Select(MappingExtensions.ForCronTickerExpressions()) @@ -407,9 +409,9 @@ await dbContext.Set>() await dbContext.Set>() .Where(x => x.LockHolder == instanceIdentifier && x.Status == TickerStatus.InProgress) .ExecuteUpdateAsync(setter => setter - .SetProperty(x => x.Status, TickerStatus.Skipped) - .SetProperty(x => x.SkippedReason, "Node is not alive!") - .SetProperty(x => x.ExecutedAt, now) + .SetProperty(x => x.LockHolder, _ => null) + .SetProperty(x => x.LockedAt, _ => null) + .SetProperty(x => x.Status, TickerStatus.Idle) .SetProperty(x => x.UpdatedAt, now), cancellationToken) .ConfigureAwait(false); } diff --git a/src/TickerQ.Utilities/CronScheduleCache.cs b/src/TickerQ.Utilities/CronScheduleCache.cs index f355ff5b..efdec856 100644 --- a/src/TickerQ.Utilities/CronScheduleCache.cs +++ b/src/TickerQ.Utilities/CronScheduleCache.cs @@ -32,18 +32,48 @@ public static CrontabSchedule Get(string expression) public static DateTime? GetNextOccurrenceOrDefault(string expression, DateTime dateTime) { - var parsed = Get(Normalize(expression)); - + var parsed = Get(expression); + if (parsed == null) return null; - + var localTime = TimeZoneInfo.ConvertTimeFromUtc(dateTime, TimeZoneInfo); - + var nextOccurrence = parsed.GetNextOccurrence(localTime); - - var utcDateTime = TimeZoneInfo.ConvertTimeToUtc(nextOccurrence, TimeZoneInfo); - - return utcDateTime; + + try + { + var utcDateTime = TimeZoneInfo.ConvertTimeToUtc(nextOccurrence, TimeZoneInfo); + return utcDateTime; + } + catch (ArgumentException) + { + // DST gap: the local time produced by NCrontab doesn't exist + // (e.g., spring-forward skips 2:00→3:00). Advance past the gap + // by finding the next valid UTC instant after the invalid time, + // then ask NCrontab for the next occurrence from that point. + // Loop handles the (unlikely) case of consecutive gaps. + var candidate = nextOccurrence; + for (var i = 0; i < 24; i++) + { + candidate = candidate.AddHours(1); + try + { + var utc = TimeZoneInfo.ConvertTimeToUtc(candidate, TimeZoneInfo); + // candidate is valid — get the real next occurrence from here + var local = TimeZoneInfo.ConvertTimeFromUtc(utc, TimeZoneInfo); + var retryOccurrence = parsed.GetNextOccurrence(local); + return TimeZoneInfo.ConvertTimeToUtc(retryOccurrence, TimeZoneInfo); + } + catch (ArgumentException) + { + // Still in a gap — keep advancing + } + } + + // Exhausted retries — should never happen in practice + return null; + } } public static bool Invalidate(string expression) => diff --git a/src/TickerQ.Utilities/Managers/InternalTickerManager.cs b/src/TickerQ.Utilities/Managers/InternalTickerManager.cs index b0c43aa6..083def11 100644 --- a/src/TickerQ.Utilities/Managers/InternalTickerManager.cs +++ b/src/TickerQ.Utilities/Managers/InternalTickerManager.cs @@ -155,14 +155,14 @@ private async Task QueueNextTimeTickersAsync(TimeTick Retries = gch.Retries, RetryIntervals = gch.RetryIntervals, ParentId = gch.ParentId, - RunCondition = ch.RunCondition ?? RunCondition.OnAnyCompletedStatus + RunCondition = gch.RunCondition ?? RunCondition.OnAnyCompletedStatus }).ToList() }).ToList() }); - + await _notificationHubSender.UpdateTimeTickerNotifyAsync(updatedTimeTicker); } - + return results.ToArray(); } @@ -428,11 +428,11 @@ public async Task RunTimedOutTickers(CancellationToke Retries = gch.Retries, RetryIntervals = gch.RetryIntervals, ParentId = gch.ParentId, - RunCondition = ch.RunCondition ?? RunCondition.OnAnyCompletedStatus + RunCondition = gch.RunCondition ?? RunCondition.OnAnyCompletedStatus }).ToList() }).ToList() }); - + await _notificationHubSender.UpdateTimeTickerNotifyAsync(timedOutTimeTicker).ConfigureAwait(false); } diff --git a/src/TickerQ.Utilities/Models/InternalFunctionContext.cs b/src/TickerQ.Utilities/Models/InternalFunctionContext.cs index 49265af1..05d63a01 100644 --- a/src/TickerQ.Utilities/Models/InternalFunctionContext.cs +++ b/src/TickerQ.Utilities/Models/InternalFunctionContext.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Linq.Expressions; @@ -10,6 +11,9 @@ namespace TickerQ.Utilities.Models { public class InternalFunctionContext { + // Compiled setter cache to avoid reflection on every SetProperty call + private static readonly ConcurrentDictionary Setter, string Name)> SetterCache = new(); + public HashSet ParametersToUpdate { get; set; } = []; // Cached function delegate and priority for performance optimization // Eliminates dictionary lookups during execution @@ -36,15 +40,24 @@ public class InternalFunctionContext public InternalFunctionContext SetProperty(Expression> property, T value) { ParametersToUpdate ??= []; - - if (property.Body is MemberExpression { Member: PropertyInfo prop }) - { - prop.SetValue(this, value); - ParametersToUpdate.Add(prop.Name); - } - else + + if (property.Body is not MemberExpression { Member: PropertyInfo prop }) throw new ArgumentException("Expression must point to a property", nameof(property)); + var cached = SetterCache.GetOrAdd(prop.Name, _ => + { + var instance = Expression.Parameter(typeof(InternalFunctionContext), "obj"); + var val = Expression.Parameter(typeof(object), "val"); + var assign = Expression.Assign( + Expression.Property(instance, prop), + Expression.Convert(val, prop.PropertyType)); + var lambda = Expression.Lambda>(assign, instance, val); + return (lambda.Compile(), prop.Name); + }); + + cached.Setter(this, value); + ParametersToUpdate.Add(cached.Name); + return this; } diff --git a/src/TickerQ.Utilities/TickerCancellationTokenManager.cs b/src/TickerQ.Utilities/TickerCancellationTokenManager.cs index 2a2edf2f..28cf5335 100644 --- a/src/TickerQ.Utilities/TickerCancellationTokenManager.cs +++ b/src/TickerQ.Utilities/TickerCancellationTokenManager.cs @@ -53,22 +53,10 @@ internal static bool RemoveTickerCancellationToken(Guid tickerId) // Remove from parent index if it exists if (details.ParentId != Guid.Empty) { - if (ParentIdIndex.TryGetValue(details.ParentId, out var set)) - { - set.Remove(tickerId); - // Clean up empty sets - if (set.IsEmpty) - { - if (ParentIdIndex.TryRemove(details.ParentId, out var removedSet)) - { - // Dispose the ConcurrentHashSet to free ReaderWriterLockSlim - removedSet?.Dispose(); - } - } - } + RemoveFromParentIndex(details.ParentId, tickerId); } } - + return removed; } @@ -107,41 +95,61 @@ internal static void CleanUpTickerCancellationTokens() public static bool RequestTickerCancellationById(Guid tickerId) { - var existTickerCancellationToken = TickerCancellationTokens.TryRemove(tickerId, out var tickerCancellationToken); - - if(existTickerCancellationToken) + // Cancel while the entry is still tracked so IsParentRunning remains accurate + if (!TickerCancellationTokens.TryGetValue(tickerId, out var details)) + return false; + + // Signal cancellation while the entry is still tracked + try + { + details.CancellationSource?.Cancel(); + } + catch (ObjectDisposedException) + { + // Already disposed by another thread - safe to ignore + } + + // Now remove and dispose + if (TickerCancellationTokens.TryRemove(tickerId, out var removed)) { try { - tickerCancellationToken.CancellationSource.Cancel(); + removed.CancellationSource?.Dispose(); } - finally + catch { - // CRITICAL: Must dispose CancellationTokenSource to prevent memory leak - tickerCancellationToken.CancellationSource?.Dispose(); + // Ignore disposal errors } - - // Remove from parent index if it exists - if (tickerCancellationToken.ParentId != Guid.Empty) + + if (removed.ParentId != Guid.Empty) { - if (ParentIdIndex.TryGetValue(tickerCancellationToken.ParentId, out var set)) - { - set.Remove(tickerId); - if (set.IsEmpty) - { - if (ParentIdIndex.TryRemove(tickerCancellationToken.ParentId, out var removedSet)) - { - // Dispose the ConcurrentHashSet to free ReaderWriterLockSlim - removedSet?.Dispose(); - } - } - } + RemoveFromParentIndex(removed.ParentId, tickerId); } } - - return existTickerCancellationToken; + + return true; } + /// + /// Atomically removes a ticker from the parent index, cleaning up the set if empty. + /// Uses TryRemove with value comparison to avoid TOCTOU races. + /// + private static void RemoveFromParentIndex(Guid parentId, Guid tickerId) + { + if (!ParentIdIndex.TryGetValue(parentId, out var set)) + return; + + set.Remove(tickerId); + + // Only remove the set from the dictionary if it's still empty. + // Use the ICollection remove overload for atomic check-and-remove. + if (set.IsEmpty) + { + ((ICollection>>)ParentIdIndex) + .Remove(new KeyValuePair>(parentId, set)); + } + } + /// /// Fast O(1) lookup to check if any tickers are running for a given parent ID. /// This replaces the expensive LINQ Any() operation with a direct dictionary lookup. diff --git a/src/TickerQ/Src/BackgroundServices/TickerQSchedulerBackgroundService.cs b/src/TickerQ/Src/BackgroundServices/TickerQSchedulerBackgroundService.cs index ffaf02f6..cd4b8821 100644 --- a/src/TickerQ/Src/BackgroundServices/TickerQSchedulerBackgroundService.cs +++ b/src/TickerQ/Src/BackgroundServices/TickerQSchedulerBackgroundService.cs @@ -139,7 +139,7 @@ private async Task ReleaseAllResourcesAsync(Exception ex) if (ex != null && _executionContext.NotifyCoreAction != null) _executionContext.NotifyCoreAction(ex.ToString(), CoreNotifyActionType.NotifyHostExceptionMessage); - await _internalTickerManager.ReleaseAcquiredResources([], CancellationToken.None); + await _internalTickerManager.ReleaseAcquiredResources(null, CancellationToken.None); } public void RestartIfNeeded(DateTime? dateTime) @@ -179,4 +179,10 @@ public override async Task StopAsync(CancellationToken cancellationToken) Interlocked.Exchange(ref _started, 0); await base.StopAsync(cancellationToken); } + + public override void Dispose() + { + _restartThrottle.Dispose(); + base.Dispose(); + } } diff --git a/src/TickerQ/Src/RestartThrottleManager.cs b/src/TickerQ/Src/RestartThrottleManager.cs index d789e4da..11a154e1 100644 --- a/src/TickerQ/Src/RestartThrottleManager.cs +++ b/src/TickerQ/Src/RestartThrottleManager.cs @@ -3,13 +3,13 @@ namespace TickerQ; -public sealed class RestartThrottleManager +public sealed class RestartThrottleManager : IDisposable { private readonly Action _onRestartTriggered; private readonly object _lock = new(); private Timer _debounceTimer; private volatile bool _restartPending; - + private readonly TimeSpan _debounceWindow = TimeSpan.FromMilliseconds(50); public RestartThrottleManager(Action onRestartTriggered) @@ -22,11 +22,11 @@ public void RequestRestart() lock (_lock) { _restartPending = true; - + // Create timer only when first needed if (_debounceTimer == null) { - _debounceTimer = new Timer(OnTimerCallback, null, + _debounceTimer = new Timer(OnTimerCallback, null, _debounceWindow, Timeout.InfiniteTimeSpan); } else @@ -36,7 +36,7 @@ public void RequestRestart() } } } - + private void OnTimerCallback(object state) { lock (_lock) @@ -49,4 +49,13 @@ private void OnTimerCallback(object state) } } } + + public void Dispose() + { + lock (_lock) + { + _debounceTimer?.Dispose(); + _debounceTimer = null; + } + } } \ No newline at end of file diff --git a/src/TickerQ/Src/SafeCancellationTokenSource.cs b/src/TickerQ/Src/SafeCancellationTokenSource.cs index 58eaf386..72e5e31c 100644 --- a/src/TickerQ/Src/SafeCancellationTokenSource.cs +++ b/src/TickerQ/Src/SafeCancellationTokenSource.cs @@ -6,6 +6,7 @@ namespace TickerQ public sealed class SafeCancellationTokenSource : IDisposable { private readonly CancellationTokenSource _innerCts; + private int _disposed; private SafeCancellationTokenSource(CancellationTokenSource cts) { @@ -35,24 +36,74 @@ public static SafeCancellationTokenSource CreateLinked(params CancellationToken[ public bool IsCancellationRequested => _innerCts.IsCancellationRequested; - public bool IsDisposed { get; private set; } + public bool IsDisposed => Volatile.Read(ref _disposed) != 0; - public void Cancel(){ - if(!IsDisposed) + public void Cancel() + { + if (IsDisposed) + return; + + try + { _innerCts.Cancel(); + } + catch (ObjectDisposedException) + { + // Race between Cancel and Dispose — safe to ignore + } + } + + public void Cancel(bool throwOnFirstException) + { + if (IsDisposed) + return; + + try + { + _innerCts.Cancel(throwOnFirstException); + } + catch (ObjectDisposedException) + { + // Race between Cancel and Dispose — safe to ignore + } } - public void Cancel(bool throwOnFirstException) => _innerCts.Cancel(throwOnFirstException); + public void CancelAfter(TimeSpan delay) + { + if (IsDisposed) + return; + + try + { + _innerCts.CancelAfter(delay); + } + catch (ObjectDisposedException) + { + // Race between CancelAfter and Dispose — safe to ignore + } + } - public void CancelAfter(TimeSpan delay) => _innerCts.CancelAfter(delay); + public void CancelAfter(int millisecondsDelay) + { + if (IsDisposed) + return; - public void CancelAfter(int millisecondsDelay) => _innerCts.CancelAfter(millisecondsDelay); + try + { + _innerCts.CancelAfter(millisecondsDelay); + } + catch (ObjectDisposedException) + { + // Race between CancelAfter and Dispose — safe to ignore + } + } public void Dispose() { - if (IsDisposed) return; - IsDisposed = true; + if (Interlocked.Exchange(ref _disposed, 1) != 0) + return; + _innerCts.Dispose(); } } -} \ No newline at end of file +} diff --git a/src/TickerQ/Src/TickerExecutionTaskHandler.cs b/src/TickerQ/Src/TickerExecutionTaskHandler.cs index 534e8713..0a0c670e 100644 --- a/src/TickerQ/Src/TickerExecutionTaskHandler.cs +++ b/src/TickerQ/Src/TickerExecutionTaskHandler.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Linq; using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -18,6 +19,10 @@ namespace TickerQ; internal class TickerExecutionTaskHandler : ITickerExecutionTaskHandler { + // Cached lowercase TickerType names to avoid per-call ToString + ToLowerInvariant allocations + private static readonly Dictionary TickerTypeNamesLower = + Enum.GetValues().ToDictionary(t => t, t => t.ToString().ToLowerInvariant()); + private readonly IServiceProvider _serviceProvider; private readonly ITickerClock _clock; private readonly ITickerQInstrumentation _tickerQInstrumentation; @@ -39,8 +44,9 @@ public async Task ExecuteTaskAsync(InternalFunctionContext context, bool isDue, return; } - var childrenToRunAfter = new InternalFunctionContext[5]; - var tasksToRunNow = new Task[6]; + var childCount = context.TimeTickerChildren.Count; + var childrenToRunAfter = new InternalFunctionContext[childCount]; + var tasksToRunNow = new Task[childCount + 1]; var childrenToRunAfterCount = 0; var tasksToRunNowCount = 0; @@ -119,15 +125,14 @@ await _internalTickerManager.UpdateSkipTimeTickersWithUnifiedContextAsync( private async Task RunContextFunctionAsync(InternalFunctionContext context, bool isDue, CancellationToken cancellationToken, bool isChild = false) { - // Start OpenTelemetry activity for the entire job execution - using var jobActivity = _tickerQInstrumentation.StartJobActivity($"tickerq.job.execute.{context.Type.ToString().ToLowerInvariant()}", context); - - // Add additional tags to the activity + var typeName = TickerTypeNamesLower.GetValueOrDefault(context.Type, context.Type.ToString().ToLowerInvariant()); + + using var jobActivity = _tickerQInstrumentation.StartJobActivity($"tickerq.job.execute.{typeName}", context); + jobActivity?.SetTag("tickerq.job.is_due", isDue); jobActivity?.SetTag("tickerq.job.is_child", isChild); - - // Log job enqueued/started (using the available method) - _tickerQInstrumentation.LogJobEnqueued(context.Type.ToString(), context.FunctionName, context.TickerId, "ExecutionTaskHandler"); + + _tickerQInstrumentation.LogJobEnqueued(typeName, context.FunctionName, context.TickerId, "ExecutionTaskHandler"); context.SetProperty(x => x.Status, TickerStatus.InProgress); @@ -181,7 +186,7 @@ private async Task RunContextFunctionAsync(InternalFunctionContext context, bool { if (await WaitForRetry(context, cancellationToken, attempt, cancellationTokenSource)) break; - stopWatch.Start(); + stopWatch.Restart(); // Create service scope - will be disposed automatically via await using await using var scope = _serviceProvider.CreateAsyncScope(); @@ -212,7 +217,6 @@ private async Task RunContextFunctionAsync(InternalFunctionContext context, bool await _internalTickerManager.UpdateTickerAsync(context, cancellationToken); // Clean up and exit early on cancellation - cancellationTokenSource?.Dispose(); TickerCancellationTokenManager.RemoveTickerCancellationToken(context.TickerId); return; } @@ -232,17 +236,16 @@ private async Task RunContextFunctionAsync(InternalFunctionContext context, bool context.SetProperty(x => x.ExceptionDetails, ex.Message); jobActivity?.SetTag("tickerq.job.skip_reason", ex.Message); } - + // Add skip tags to activity jobActivity?.SetTag("tickerq.job.final_status", context.Status.ToString()); - + // Log job skipped _tickerQInstrumentation.LogJobSkipped(context.TickerId, context.FunctionName, ex.Message); await _internalTickerManager.UpdateTickerAsync(context, cancellationToken); - + // Clean up and exit early on termination - cancellationTokenSource?.Dispose(); TickerCancellationTokenManager.RemoveTickerCancellationToken(context.TickerId); return; } @@ -293,8 +296,7 @@ private async Task RunContextFunctionAsync(InternalFunctionContext context, bool } - // IMPORTANT: Always dispose CancellationTokenSource to prevent memory leaks - cancellationTokenSource?.Dispose(); + // Clean up: RemoveTickerCancellationToken handles disposal of the CTS TickerCancellationTokenManager.RemoveTickerCancellationToken(context.TickerId); } diff --git a/src/TickerQ/Src/TickerQThreadPool/TickerQTaskScheduler.cs b/src/TickerQ/Src/TickerQThreadPool/TickerQTaskScheduler.cs index f4f7b452..7bbc4c2f 100644 --- a/src/TickerQ/Src/TickerQThreadPool/TickerQTaskScheduler.cs +++ b/src/TickerQ/Src/TickerQThreadPool/TickerQTaskScheduler.cs @@ -325,7 +325,7 @@ private async Task ExecuteWorkAsync(WorkItem workItem) { // Decrement immediately after dequeue to keep counter in sync Interlocked.Decrement(ref _totalQueuedTasks); - + try { // Check cancellation before executing @@ -334,7 +334,7 @@ private async Task ExecuteWorkAsync(WorkItem workItem) // Start the work without awaiting it so this worker // can continue processing other items while the task awaits. var task = workItem.Work(workItem.UserToken); - + if (task == null) return; @@ -509,18 +509,27 @@ public async ValueTask DisposeAsync() { if (_disposed) return; - + _disposed = true; _isFrozen = true; // Prevent new tasks _shutdownCts.Cancel(); - + // Wait for workers to exit gracefully var timeout = DateTime.UtcNow.AddSeconds(5); while (_activeWorkers > 0 && DateTime.UtcNow < timeout) { await Task.Delay(100); } - + + // Drain remaining queued items to keep the counter accurate + for (int i = 0; i < _maxConcurrency; i++) + { + while (_workerQueues[i].TryDequeue(out _)) + { + Interlocked.Decrement(ref _totalQueuedTasks); + } + } + _notifyDebounce?.Dispose(); _shutdownCts?.Dispose(); } From 5fcb035813d3232b5f7d06c714fc8cc3746fc29d Mon Sep 17 00:00:00 2001 From: "wojciech.wentland" Date: Mon, 9 Feb 2026 10:53:44 +0100 Subject: [PATCH 2/3] Add tests for SafeCancellationTokenSource, CronScheduleCache DST handling, cancellation manager, and task scheduler Co-Authored-By: Claude Opus 4.6 --- .../CronScheduleCacheDstTests.cs | 57 ++++++++ .../SafeCancellationTokenSourceTests.cs | 95 ++++++++++++ .../TickerCancellationTokenManagerTests.cs | 135 ++++++++++++++++++ .../TickerQTaskSchedulerTests.cs | 84 +++++++++++ 4 files changed, 371 insertions(+) create mode 100644 tests/TickerQ.Tests/CronScheduleCacheDstTests.cs create mode 100644 tests/TickerQ.Tests/SafeCancellationTokenSourceTests.cs create mode 100644 tests/TickerQ.Tests/TickerCancellationTokenManagerTests.cs create mode 100644 tests/TickerQ.Tests/TickerQTaskSchedulerTests.cs diff --git a/tests/TickerQ.Tests/CronScheduleCacheDstTests.cs b/tests/TickerQ.Tests/CronScheduleCacheDstTests.cs new file mode 100644 index 00000000..6cdb357a --- /dev/null +++ b/tests/TickerQ.Tests/CronScheduleCacheDstTests.cs @@ -0,0 +1,57 @@ +using System; +using FluentAssertions; +using TickerQ.Utilities; +using Xunit; + +namespace TickerQ.Tests; + +public class CronScheduleCacheDstTests +{ + [Fact] + public void GetNextOccurrenceOrDefault_Handles_DST_Gap_Without_Throwing() + { + // Use US Eastern which has a spring-forward gap (2:00 AM → 3:00 AM) + var eastern = TimeZoneInfo.FindSystemTimeZoneById("America/New_York"); + CronScheduleCache.TimeZoneInfo = eastern; + + // Cron: every day at 2:30 AM — this time doesn't exist on spring-forward day + var expression = "0 30 2 * * *"; + + // March 9, 2025 is spring-forward in US Eastern (2:00 AM → 3:00 AM) + var beforeGap = new DateTime(2025, 3, 9, 6, 0, 0, DateTimeKind.Utc); // 1:00 AM ET + + var result = CronScheduleCache.GetNextOccurrenceOrDefault(expression, beforeGap); + + // Should not crash, and should return a valid UTC time after the gap + result.Should().NotBeNull(); + result!.Value.Kind.Should().Be(DateTimeKind.Utc); + + // The result should be after the gap (which ends at 3:00 AM ET = 7:00 AM UTC) + result.Value.Should().BeOnOrAfter(new DateTime(2025, 3, 9, 7, 0, 0, DateTimeKind.Utc)); + } + + [Fact] + public void GetNextOccurrenceOrDefault_Returns_Correct_Time_Outside_DST_Gap() + { + CronScheduleCache.TimeZoneInfo = TimeZoneInfo.Utc; + + // Every minute + var expression = "0 * * * * *"; + var now = new DateTime(2025, 6, 15, 12, 30, 0, DateTimeKind.Utc); + + var result = CronScheduleCache.GetNextOccurrenceOrDefault(expression, now); + + result.Should().NotBeNull(); + result!.Value.Should().Be(new DateTime(2025, 6, 15, 12, 31, 0, DateTimeKind.Utc)); + } + + [Fact] + public void GetNextOccurrenceOrDefault_Returns_Null_For_Unparseable_Expression() + { + CronScheduleCache.TimeZoneInfo = TimeZoneInfo.Utc; + + var result = CronScheduleCache.GetNextOccurrenceOrDefault("not a cron", DateTime.UtcNow); + + result.Should().BeNull(); + } +} diff --git a/tests/TickerQ.Tests/SafeCancellationTokenSourceTests.cs b/tests/TickerQ.Tests/SafeCancellationTokenSourceTests.cs new file mode 100644 index 00000000..08556ef1 --- /dev/null +++ b/tests/TickerQ.Tests/SafeCancellationTokenSourceTests.cs @@ -0,0 +1,95 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using FluentAssertions; +using Xunit; + +namespace TickerQ.Tests; + +public class SafeCancellationTokenSourceTests +{ + [Fact] + public void Dispose_Is_Idempotent() + { + var cts = new SafeCancellationTokenSource(); + + cts.Dispose(); + cts.Dispose(); // second dispose should not throw + + cts.IsDisposed.Should().BeTrue(); + } + + [Fact] + public void Cancel_After_Dispose_Does_Not_Throw() + { + var cts = new SafeCancellationTokenSource(); + cts.Dispose(); + + var act = () => cts.Cancel(); + + act.Should().NotThrow(); + } + + [Fact] + public void CancelAfter_TimeSpan_After_Dispose_Does_Not_Throw() + { + var cts = new SafeCancellationTokenSource(); + cts.Dispose(); + + var act = () => cts.CancelAfter(TimeSpan.FromSeconds(1)); + + act.Should().NotThrow(); + } + + [Fact] + public void CancelAfter_Int_After_Dispose_Does_Not_Throw() + { + var cts = new SafeCancellationTokenSource(); + cts.Dispose(); + + var act = () => cts.CancelAfter(1000); + + act.Should().NotThrow(); + } + + [Fact] + public void Cancel_Sets_Token_To_Cancelled() + { + var cts = new SafeCancellationTokenSource(); + var token = cts.Token; + + cts.Cancel(); + + cts.IsCancellationRequested.Should().BeTrue(); + token.IsCancellationRequested.Should().BeTrue(); + } + + [Fact] + public void CreateLinked_Cancels_When_Parent_Cancels() + { + using var parent = new CancellationTokenSource(); + var linked = SafeCancellationTokenSource.CreateLinked(parent.Token); + + parent.Cancel(); + + linked.IsCancellationRequested.Should().BeTrue(); + linked.Dispose(); + } + + [Fact] + public async Task Concurrent_Cancel_And_Dispose_Does_Not_Throw() + { + // Run many iterations to exercise the race window + for (int i = 0; i < 1000; i++) + { + var cts = new SafeCancellationTokenSource(); + + var t1 = Task.Run(() => cts.Cancel()); + var t2 = Task.Run(() => cts.Dispose()); + + await Task.WhenAll(t1, t2); + + cts.IsDisposed.Should().BeTrue(); + } + } +} diff --git a/tests/TickerQ.Tests/TickerCancellationTokenManagerTests.cs b/tests/TickerQ.Tests/TickerCancellationTokenManagerTests.cs new file mode 100644 index 00000000..24f758df --- /dev/null +++ b/tests/TickerQ.Tests/TickerCancellationTokenManagerTests.cs @@ -0,0 +1,135 @@ +using System; +using System.Threading; +using FluentAssertions; +using TickerQ.Utilities; +using TickerQ.Utilities.Enums; +using TickerQ.Utilities.Models; +using Xunit; + +namespace TickerQ.Tests; + +public class TickerCancellationTokenManagerTests : IDisposable +{ + public void Dispose() + { + TickerCancellationTokenManager.CleanUpTickerCancellationTokens(); + } + + [Fact] + public void RequestCancellationById_Cancels_The_Token() + { + var cts = new CancellationTokenSource(); + var tickerId = Guid.NewGuid(); + var context = MakeContext(tickerId); + + TickerCancellationTokenManager.AddTickerCancellationToken(cts, context, isDue: false); + + var token = cts.Token; + token.IsCancellationRequested.Should().BeFalse(); + + var result = TickerCancellationTokenManager.RequestTickerCancellationById(tickerId); + + result.Should().BeTrue(); + token.IsCancellationRequested.Should().BeTrue(); + } + + [Fact] + public void RequestCancellationById_Returns_False_For_Unknown_Id() + { + var result = TickerCancellationTokenManager.RequestTickerCancellationById(Guid.NewGuid()); + + result.Should().BeFalse(); + } + + [Fact] + public void IsParentRunning_Returns_True_When_Ticker_Registered() + { + var cts = new CancellationTokenSource(); + var parentId = Guid.NewGuid(); + var tickerId = Guid.NewGuid(); + var context = MakeContext(tickerId, parentId); + + TickerCancellationTokenManager.AddTickerCancellationToken(cts, context, isDue: false); + + TickerCancellationTokenManager.IsParentRunning(parentId).Should().BeTrue(); + } + + [Fact] + public void IsParentRunning_Returns_False_After_Removal() + { + var cts = new CancellationTokenSource(); + var parentId = Guid.NewGuid(); + var tickerId = Guid.NewGuid(); + var context = MakeContext(tickerId, parentId); + + TickerCancellationTokenManager.AddTickerCancellationToken(cts, context, isDue: false); + TickerCancellationTokenManager.RemoveTickerCancellationToken(tickerId); + + TickerCancellationTokenManager.IsParentRunning(parentId).Should().BeFalse(); + } + + [Fact] + public void IsParentRunningExcludingSelf_Returns_False_When_Only_Self() + { + var cts = new CancellationTokenSource(); + var parentId = Guid.NewGuid(); + var tickerId = Guid.NewGuid(); + var context = MakeContext(tickerId, parentId); + + TickerCancellationTokenManager.AddTickerCancellationToken(cts, context, isDue: false); + + TickerCancellationTokenManager.IsParentRunningExcludingSelf(parentId, tickerId) + .Should().BeFalse(); + } + + [Fact] + public void IsParentRunningExcludingSelf_Returns_True_When_Sibling_Exists() + { + var parentId = Guid.NewGuid(); + var ticker1 = Guid.NewGuid(); + var ticker2 = Guid.NewGuid(); + + TickerCancellationTokenManager.AddTickerCancellationToken( + new CancellationTokenSource(), MakeContext(ticker1, parentId), isDue: false); + TickerCancellationTokenManager.AddTickerCancellationToken( + new CancellationTokenSource(), MakeContext(ticker2, parentId), isDue: false); + + TickerCancellationTokenManager.IsParentRunningExcludingSelf(parentId, ticker1) + .Should().BeTrue(); + } + + [Fact] + public void CancelById_Keeps_Entry_Tracked_During_Cancellation() + { + // Verifies cancel-before-remove: IsParentRunning should be true + // at the moment Cancel fires on the CTS + var parentId = Guid.NewGuid(); + var tickerId = Guid.NewGuid(); + var cts = new CancellationTokenSource(); + var context = MakeContext(tickerId, parentId); + + bool wasTrackedDuringCancel = false; + cts.Token.Register(() => + { + wasTrackedDuringCancel = TickerCancellationTokenManager.IsParentRunning(parentId); + }); + + TickerCancellationTokenManager.AddTickerCancellationToken(cts, context, isDue: false); + + TickerCancellationTokenManager.RequestTickerCancellationById(tickerId); + + wasTrackedDuringCancel.Should().BeTrue( + "the entry should still be in the dictionary when Cancel fires"); + } + + private static InternalFunctionContext MakeContext(Guid tickerId, Guid? parentId = null) + { + return new InternalFunctionContext + { + TickerId = tickerId, + ParentId = parentId, + FunctionName = "Test", + Type = TickerType.CronTickerOccurrence + }; + } +} diff --git a/tests/TickerQ.Tests/TickerQTaskSchedulerTests.cs b/tests/TickerQ.Tests/TickerQTaskSchedulerTests.cs new file mode 100644 index 00000000..fae24e98 --- /dev/null +++ b/tests/TickerQ.Tests/TickerQTaskSchedulerTests.cs @@ -0,0 +1,84 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using FluentAssertions; +using TickerQ.TickerQThreadPool; +using TickerQ.Utilities.Enums; +using Xunit; + +namespace TickerQ.Tests; + +public class TickerQTaskSchedulerTests +{ + [Fact] + public async Task DisposeAsync_Drains_Queued_Tasks_And_Counter_Reaches_Zero() + { + var scheduler = new TickerQTaskScheduler(2); + + // Queue work that will never complete (blocks on a semaphore) + var blocker = new SemaphoreSlim(0); + await scheduler.QueueAsync(async ct => await blocker.WaitAsync(ct), TickerTaskPriority.Normal); + await scheduler.QueueAsync(async ct => await blocker.WaitAsync(ct), TickerTaskPriority.Normal); + + // Queue more items that pile up behind the blockers + for (int i = 0; i < 5; i++) + { + await scheduler.QueueAsync(_ => Task.CompletedTask, TickerTaskPriority.Normal); + } + + await scheduler.DisposeAsync(); + + scheduler.TotalQueuedTasks.Should().BeLessThanOrEqualTo(0); + scheduler.IsDisposed.Should().BeTrue(); + } + + [Fact] + public async Task QueueAsync_Throws_After_Dispose() + { + var scheduler = new TickerQTaskScheduler(1); + await scheduler.DisposeAsync(); + + Func act = () => scheduler.QueueAsync(_ => Task.CompletedTask, TickerTaskPriority.Normal).AsTask(); + + await act.Should().ThrowAsync(); + } + + [Fact] + public async Task QueueAsync_Throws_When_Frozen() + { + var scheduler = new TickerQTaskScheduler(1); + scheduler.Freeze(); + + Func act = () => scheduler.QueueAsync(_ => Task.CompletedTask, TickerTaskPriority.Normal).AsTask(); + + await act.Should().ThrowAsync(); + + await scheduler.DisposeAsync(); + } + + [Fact] + public async Task Freeze_And_Resume_Work() + { + var scheduler = new TickerQTaskScheduler(1); + + scheduler.Freeze(); + scheduler.IsFrozen.Should().BeTrue(); + + scheduler.Resume(); + scheduler.IsFrozen.Should().BeFalse(); + + // Should be able to queue after resume + var executed = false; + await scheduler.QueueAsync(_ => + { + executed = true; + return Task.CompletedTask; + }, TickerTaskPriority.Normal); + + // Give worker time to pick it up + await Task.Delay(200); + + executed.Should().BeTrue(); + await scheduler.DisposeAsync(); + } +} From e1099ad3ed13f1f8baf8445e6c84dad66782abcf Mon Sep 17 00:00:00 2001 From: "wojciech.wentland" Date: Mon, 9 Feb 2026 10:54:42 +0100 Subject: [PATCH 3/3] =?UTF-8?q?Fix=20BindingFlags=20in=20SetProperty=5FRei?= =?UTF-8?q?nitializes=20test=20=E2=80=94=20ParametersToUpdate=20is=20publi?= =?UTF-8?q?c?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.6 --- tests/TickerQ.Tests/InternalFunctionContextTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/TickerQ.Tests/InternalFunctionContextTests.cs b/tests/TickerQ.Tests/InternalFunctionContextTests.cs index 74256bf7..257d9559 100644 --- a/tests/TickerQ.Tests/InternalFunctionContextTests.cs +++ b/tests/TickerQ.Tests/InternalFunctionContextTests.cs @@ -70,7 +70,7 @@ public void SetProperty_Reinitializes_Tracking_Set_When_Null() // Simulate a null ParametersToUpdate set to verify the null-coalescing assignment. var parametersProperty = typeof(InternalFunctionContext) - .GetProperty("ParametersToUpdate", BindingFlags.Instance | BindingFlags.NonPublic); + .GetProperty("ParametersToUpdate", BindingFlags.Instance | BindingFlags.Public); parametersProperty.Should().NotBeNull(); parametersProperty!.SetValue(context, null);