Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,52 +115,71 @@ private static TimeTickerEntity MapForQueue(TTimeTicker ticker)
};
}

private async Task AddTimeTickerIndexesAsync(TTimeTicker ticker)
// Batched index updates — single round-trip per call via IBatch
private Task AddTimeTickerIndexesAsync(TTimeTicker ticker)
{
await _db.SetAddAsync(TimeTickerIdsKey, ticker.Id.ToString()).ConfigureAwait(false);
var batch = _db.CreateBatch();
var idStr = (RedisValue)ticker.Id.ToString();

var t1 = batch.SetAddAsync(TimeTickerIdsKey, idStr);
Task t2;
if (ticker.ExecutionTime.HasValue && CanAcquire(ticker.Status, ticker.LockHolder, _lockHolder))
{
await _db.SortedSetAddAsync(TimeTickerPendingKey, ticker.Id.ToString(), ToScore(ticker.ExecutionTime.Value)).ConfigureAwait(false);
}
t2 = batch.SortedSetAddAsync(TimeTickerPendingKey, idStr, ToScore(ticker.ExecutionTime.Value));
else
{
await _db.SortedSetRemoveAsync(TimeTickerPendingKey, ticker.Id.ToString()).ConfigureAwait(false);
}
t2 = batch.SortedSetRemoveAsync(TimeTickerPendingKey, idStr);

batch.Execute();
return Task.WhenAll(t1, t2);
}

private async Task RemoveTimeTickerIndexesAsync(Guid id)
private Task RemoveTimeTickerIndexesAsync(Guid id)
{
await _db.SetRemoveAsync(TimeTickerIdsKey, id.ToString()).ConfigureAwait(false);
await _db.SortedSetRemoveAsync(TimeTickerPendingKey, id.ToString()).ConfigureAwait(false);
var batch = _db.CreateBatch();
var idStr = (RedisValue)id.ToString();

var t1 = batch.SetRemoveAsync(TimeTickerIdsKey, idStr);
var t2 = batch.SortedSetRemoveAsync(TimeTickerPendingKey, idStr);

batch.Execute();
return Task.WhenAll(t1, t2);
}

private async Task AddCronIndexesAsync(TCronTicker ticker)
private Task AddCronIndexesAsync(TCronTicker ticker)
{
await _db.SetAddAsync(CronIdsKey, ticker.Id.ToString()).ConfigureAwait(false);
return _db.SetAddAsync(CronIdsKey, ticker.Id.ToString());
}

private async Task RemoveCronIndexesAsync(Guid id)
private Task RemoveCronIndexesAsync(Guid id)
{
await _db.SetRemoveAsync(CronIdsKey, id.ToString()).ConfigureAwait(false);
return _db.SetRemoveAsync(CronIdsKey, id.ToString());
}

private async Task AddCronOccurrenceIndexesAsync(CronTickerOccurrenceEntity<TCronTicker> occurrence)
private Task AddCronOccurrenceIndexesAsync(CronTickerOccurrenceEntity<TCronTicker> occurrence)
{
await _db.SetAddAsync(CronOccurrenceIdsKey, occurrence.Id.ToString()).ConfigureAwait(false);
var batch = _db.CreateBatch();
var idStr = (RedisValue)occurrence.Id.ToString();

var t1 = batch.SetAddAsync(CronOccurrenceIdsKey, idStr);
Task t2;
if (CanAcquire(occurrence.Status, occurrence.LockHolder, _lockHolder))
{
await _db.SortedSetAddAsync(CronOccurrencePendingKey, occurrence.Id.ToString(), ToScore(occurrence.ExecutionTime)).ConfigureAwait(false);
}
t2 = batch.SortedSetAddAsync(CronOccurrencePendingKey, idStr, ToScore(occurrence.ExecutionTime));
else
{
await _db.SortedSetRemoveAsync(CronOccurrencePendingKey, occurrence.Id.ToString()).ConfigureAwait(false);
}
t2 = batch.SortedSetRemoveAsync(CronOccurrencePendingKey, idStr);

batch.Execute();
return Task.WhenAll(t1, t2);
}

private async Task RemoveCronOccurrenceIndexesAsync(Guid id)
private Task RemoveCronOccurrenceIndexesAsync(Guid id)
{
await _db.SetRemoveAsync(CronOccurrenceIdsKey, id.ToString()).ConfigureAwait(false);
await _db.SortedSetRemoveAsync(CronOccurrencePendingKey, id.ToString()).ConfigureAwait(false);
var batch = _db.CreateBatch();
var idStr = (RedisValue)id.ToString();

var t1 = batch.SetRemoveAsync(CronOccurrenceIdsKey, idStr);
var t2 = batch.SortedSetRemoveAsync(CronOccurrencePendingKey, idStr);

batch.Execute();
return Task.WhenAll(t1, t2);
}
#endregion

Expand Down Expand Up @@ -426,15 +445,10 @@ public async Task ReleaseDeadNodeTimeTickerResources(string instanceIdentifier,
var ticker = await GetAsync<TTimeTicker>(TimeTickerKey(id)).ConfigureAwait(false);
if (ticker == null) continue;

if (ticker.LockHolder == instanceIdentifier && ticker.Status == TickerStatus.InProgress)
{
ticker.Status = TickerStatus.Skipped;
ticker.SkippedReason = "Node is not alive!";
ticker.ExecutedAt = now;
ticker.UpdatedAt = now;
}
else if (ticker.LockHolder == instanceIdentifier && (ticker.Status == TickerStatus.Idle || ticker.Status == TickerStatus.Queued))
if (ticker.LockHolder == instanceIdentifier &&
ticker.Status is TickerStatus.InProgress or TickerStatus.Idle or TickerStatus.Queued)
{
// Reset to Idle so another node can pick up the work
ticker.LockHolder = null;
ticker.LockedAt = null;
ticker.Status = TickerStatus.Idle;
Expand Down Expand Up @@ -623,15 +637,10 @@ public async Task ReleaseDeadNodeOccurrenceResources(string instanceIdentifier,
var occurrence = await GetAsync<CronTickerOccurrenceEntity<TCronTicker>>(CronOccurrenceKey(id)).ConfigureAwait(false);
if (occurrence == null) continue;

if (occurrence.LockHolder == instanceIdentifier && occurrence.Status == TickerStatus.InProgress)
{
occurrence.Status = TickerStatus.Skipped;
occurrence.SkippedReason = "Node is not alive!";
occurrence.ExecutedAt = now;
occurrence.UpdatedAt = now;
}
else if (occurrence.LockHolder == instanceIdentifier && (occurrence.Status == TickerStatus.Idle || occurrence.Status == TickerStatus.Queued))
if (occurrence.LockHolder == instanceIdentifier &&
occurrence.Status is TickerStatus.InProgress or TickerStatus.Idle or TickerStatus.Queued)
{
// Reset to Idle so another node can pick up the work
occurrence.LockHolder = null;
occurrence.LockedAt = null;
occurrence.Status = TickerStatus.Idle;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,28 @@ public override Task StartAsync(CancellationToken ct)

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
while (!stoppingToken.IsCancellationRequested)
{
await RunTickerQFallbackAsync(stoppingToken);
}
catch (Exception e)
{
_logger.LogError("Heartbeat background service failed: {Exception}", e);
try
{
await RunTickerQFallbackAsync(stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
catch (Exception e)
{
_logger.LogError("Heartbeat background service failed: {Exception}. Retrying in 5 seconds...", e);
try
{
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
catch (OperationCanceledException)
{
break;
}
}
}
}

Expand Down Expand Up @@ -66,4 +81,10 @@ public override async Task StopAsync(CancellationToken cancellationToken)
Interlocked.Exchange(ref _started, 0);
await base.StopAsync(cancellationToken);
}

public override void Dispose()
{
_tickerHeartBeatPeriodicTimer.Dispose();
base.Dispose();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,6 @@ public async Task<TResult[]> GetOrSetArrayAsync<TResult>(string cacheKey,
// ignored
}

return null;
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,10 @@ await dbContext.Set<TTimeTicker>()
await dbContext.Set<TTimeTicker>()
.Where(x => x.LockHolder == instanceIdentifier && x.Status == TickerStatus.InProgress)
.ExecuteUpdateAsync(setter => setter
.SetProperty(x => x.Status, TickerStatus.Skipped)
.SetProperty(x => x.SkippedReason, "Node is not alive!")
.SetProperty(x => x.ExecutedAt, now)
.SetProperty(x => x.UpdatedAt, now), cancellationToken)
.SetProperty(x => x.LockHolder, _ => null)
.SetProperty(x => x.LockedAt, _ => null)
.SetProperty(x => x.Status, TickerStatus.Idle)
.SetProperty(x => x.UpdatedAt, now), cancellationToken)
.ConfigureAwait(false);
}
#endregion
Expand Down Expand Up @@ -334,9 +334,11 @@ public async Task<CronTickerEntity[]> GetAllCronTickerExpressions(CancellationTo
},
expiration: TimeSpan.FromMinutes(10),
cancellationToken: cancellationToken);


await using var dbContext = await DbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false);;

if (result != null)
return result;

await using var dbContext = await DbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false);
return await dbContext.Set<TCronTicker>()
.AsNoTracking()
.Select(MappingExtensions.ForCronTickerExpressions<CronTickerEntity>())
Expand Down Expand Up @@ -407,9 +409,9 @@ await dbContext.Set<CronTickerOccurrenceEntity<TCronTicker>>()
await dbContext.Set<CronTickerOccurrenceEntity<TCronTicker>>()
.Where(x => x.LockHolder == instanceIdentifier && x.Status == TickerStatus.InProgress)
.ExecuteUpdateAsync(setter => setter
.SetProperty(x => x.Status, TickerStatus.Skipped)
.SetProperty(x => x.SkippedReason, "Node is not alive!")
.SetProperty(x => x.ExecutedAt, now)
.SetProperty(x => x.LockHolder, _ => null)
.SetProperty(x => x.LockedAt, _ => null)
.SetProperty(x => x.Status, TickerStatus.Idle)
.SetProperty(x => x.UpdatedAt, now), cancellationToken)
.ConfigureAwait(false);
}
Expand Down
46 changes: 38 additions & 8 deletions src/TickerQ.Utilities/CronScheduleCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,48 @@ public static CrontabSchedule Get(string expression)

public static DateTime? GetNextOccurrenceOrDefault(string expression, DateTime dateTime)
{
var parsed = Get(Normalize(expression));
var parsed = Get(expression);

if (parsed == null)
return null;

var localTime = TimeZoneInfo.ConvertTimeFromUtc(dateTime, TimeZoneInfo);

var nextOccurrence = parsed.GetNextOccurrence(localTime);

var utcDateTime = TimeZoneInfo.ConvertTimeToUtc(nextOccurrence, TimeZoneInfo);

return utcDateTime;

try
{
var utcDateTime = TimeZoneInfo.ConvertTimeToUtc(nextOccurrence, TimeZoneInfo);
return utcDateTime;
}
catch (ArgumentException)
{
// DST gap: the local time produced by NCrontab doesn't exist
// (e.g., spring-forward skips 2:00→3:00). Advance past the gap
// by finding the next valid UTC instant after the invalid time,
// then ask NCrontab for the next occurrence from that point.
// Loop handles the (unlikely) case of consecutive gaps.
var candidate = nextOccurrence;
for (var i = 0; i < 24; i++)
{
candidate = candidate.AddHours(1);
try
{
var utc = TimeZoneInfo.ConvertTimeToUtc(candidate, TimeZoneInfo);
// candidate is valid — get the real next occurrence from here
var local = TimeZoneInfo.ConvertTimeFromUtc(utc, TimeZoneInfo);
var retryOccurrence = parsed.GetNextOccurrence(local);
return TimeZoneInfo.ConvertTimeToUtc(retryOccurrence, TimeZoneInfo);
}
catch (ArgumentException)
{
// Still in a gap — keep advancing
}
}

// Exhausted retries — should never happen in practice
return null;
}
}

public static bool Invalidate(string expression) =>
Expand Down
10 changes: 5 additions & 5 deletions src/TickerQ.Utilities/Managers/InternalTickerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,14 @@ private async Task<InternalFunctionContext[]> QueueNextTimeTickersAsync(TimeTick
Retries = gch.Retries,
RetryIntervals = gch.RetryIntervals,
ParentId = gch.ParentId,
RunCondition = ch.RunCondition ?? RunCondition.OnAnyCompletedStatus
RunCondition = gch.RunCondition ?? RunCondition.OnAnyCompletedStatus
}).ToList()
}).ToList()
});

await _notificationHubSender.UpdateTimeTickerNotifyAsync(updatedTimeTicker);
}

return results.ToArray();
}

Expand Down Expand Up @@ -428,11 +428,11 @@ public async Task<InternalFunctionContext[]> RunTimedOutTickers(CancellationToke
Retries = gch.Retries,
RetryIntervals = gch.RetryIntervals,
ParentId = gch.ParentId,
RunCondition = ch.RunCondition ?? RunCondition.OnAnyCompletedStatus
RunCondition = gch.RunCondition ?? RunCondition.OnAnyCompletedStatus
}).ToList()
}).ToList()
});

await _notificationHubSender.UpdateTimeTickerNotifyAsync(timedOutTimeTicker).ConfigureAwait(false);
}

Expand Down
27 changes: 20 additions & 7 deletions src/TickerQ.Utilities/Models/InternalFunctionContext.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq.Expressions;
Expand All @@ -10,6 +11,9 @@ namespace TickerQ.Utilities.Models
{
public class InternalFunctionContext
{
// Compiled setter cache to avoid reflection on every SetProperty call
private static readonly ConcurrentDictionary<string, (Action<InternalFunctionContext, object> Setter, string Name)> SetterCache = new();

public HashSet<string> ParametersToUpdate { get; set; } = [];
// Cached function delegate and priority for performance optimization
// Eliminates dictionary lookups during execution
Expand All @@ -36,15 +40,24 @@ public class InternalFunctionContext
public InternalFunctionContext SetProperty<T>(Expression<Func<InternalFunctionContext, T>> property, T value)
{
ParametersToUpdate ??= [];

if (property.Body is MemberExpression { Member: PropertyInfo prop })
{
prop.SetValue(this, value);
ParametersToUpdate.Add(prop.Name);
}
else

if (property.Body is not MemberExpression { Member: PropertyInfo prop })
throw new ArgumentException("Expression must point to a property", nameof(property));

var cached = SetterCache.GetOrAdd(prop.Name, _ =>
{
var instance = Expression.Parameter(typeof(InternalFunctionContext), "obj");
var val = Expression.Parameter(typeof(object), "val");
var assign = Expression.Assign(
Expression.Property(instance, prop),
Expression.Convert(val, prop.PropertyType));
var lambda = Expression.Lambda<Action<InternalFunctionContext, object>>(assign, instance, val);
return (lambda.Compile(), prop.Name);
});

cached.Setter(this, value);
ParametersToUpdate.Add(cached.Name);

return this;
}

Expand Down
Loading
Loading