Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ public async Task ReleaseAcquiredCronTickerOccurrences(Guid[] occurrenceIds, Can
? dbContext.Set<CronTickerOccurrenceEntity<TCronTicker>>()
: dbContext.Set<CronTickerOccurrenceEntity<TCronTicker>>().Where(x => occurrenceIds.Contains(x.Id));

await baseQuery.Where(x => occurrenceIds.Contains(x.Id))
await baseQuery
.WhereCanAcquire(_lockHolder)
.ExecuteUpdateAsync(setter => setter
.SetProperty(x => x.LockHolder, _ => null)
Expand Down Expand Up @@ -415,7 +415,7 @@ public async IAsyncEnumerable<CronTickerOccurrenceEntity<TCronTicker>> QueueCron
{
Id = item.NextCronOccurrence.Id,
CronTickerId = item.Id,
ExecutionTime = now,
ExecutionTime = executionTime,
Status = TickerStatus.Queued,
LockHolder = _lockHolder,
LockedAt = now,
Expand Down Expand Up @@ -464,12 +464,12 @@ public async Task<byte[]> GetCronTickerOccurrenceRequest(Guid tickerId, Cancella
.ConfigureAwait(false);
}

public async Task UpdateCronTickerOccurrencesWithUnifiedContext(Guid[] timeTickerIds, InternalFunctionContext functionContext,
public async Task UpdateCronTickerOccurrencesWithUnifiedContext(Guid[] cronOccurrenceIds, InternalFunctionContext functionContext,
CancellationToken cancellationToken = default)
{
await using var dbContext = await DbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false);;
await dbContext.Set<CronTickerOccurrenceEntity<TCronTicker>>()
.Where(x => timeTickerIds.Contains(x.CronTickerId))
.Where(x => cronOccurrenceIds.Contains(x.Id))
.ExecuteUpdateAsync(setter => setter.UpdateCronTickerOccurrence<TCronTicker>(functionContext), cancellationToken)
.ConfigureAwait(false);
}
Expand Down
183 changes: 75 additions & 108 deletions src/TickerQ.Utilities/Managers/InternalTickerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,132 +28,99 @@ public InternalTickerManager(
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
_notificationHubSender = notificationHubSender;
}

public async Task<(TimeSpan TimeRemaining, InternalFunctionContext[] Functions)> GetNextTickers(CancellationToken cancellationToken = default)
{
while (true)
{
var minCronGroupTask = GetEarliestCronTickerGroupAsync(cancellationToken);
var minTimeTickersTask = _persistenceProvider.GetEarliestTimeTickers(cancellationToken);

await Task.WhenAll(minCronGroupTask, minTimeTickersTask).ConfigureAwait(false);

var (minCronGroup, minTimeTickers) = (await minCronGroupTask, await minTimeTickersTask);

var minTimeTickerTime = minTimeTickers.Length != 0
? minTimeTickers[0].ExecutionTime ?? default
: default;
var now = _clock.UtcNow;

var minTimeRemaining = CalculateMinTimeRemaining(minCronGroup, minTimeTickerTime, out var typesToQueue);
var minCronGroupTask = GetEarliestCronTickerGroupAsync(cancellationToken);
var minTimeTickersTask = _persistenceProvider.GetEarliestTimeTickers(cancellationToken);

if (minTimeRemaining == Timeout.InfiniteTimeSpan)
return (Timeout.InfiniteTimeSpan, []);
await Task.WhenAll(minCronGroupTask, minTimeTickersTask).ConfigureAwait(false);

var nextTickers = await RetrieveEligibleTickersAsync(minCronGroup, minTimeTickers, typesToQueue, cancellationToken).ConfigureAwait(false);
var minCronGroup = await minCronGroupTask.ConfigureAwait(false);
var minTimeTickers = await minTimeTickersTask.ConfigureAwait(false);

if (nextTickers.Length != 0)
return (minTimeRemaining, nextTickers);

if(typesToQueue.All(x => x == TickerType.CronTickerOccurrence))
return (minTimeRemaining, nextTickers);

await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationToken).ConfigureAwait(false); // Faster retry for time-sensitive tasks
}
}
var cronTime = minCronGroup?.Key;
var timeTickerTime = minTimeTickers.Length > 0
? minTimeTickers[0].ExecutionTime
: null;

private TimeSpan CalculateMinTimeRemaining(
(DateTime Key, InternalManagerContext[] Items)? minCronTicker,
DateTime minTimeTicker,
out TickerType[] sources)
{
var now = _clock.UtcNow;
if (cronTime is null && timeTickerTime is null)
return (Timeout.InfiniteTimeSpan, []);

DateTime? cron = minCronTicker?.Key;
DateTime? time = minTimeTicker == default ? null : minTimeTicker;
TimeSpan timeRemaining;
bool includeCron = false;
bool includeTimeTickers = false;

// no values
if (cron is null && time is null)
if (cronTime is null)
{
sources = [];
return Timeout.InfiniteTimeSpan;
includeTimeTickers = true;
timeRemaining = SafeRemaining(timeTickerTime!.Value, now);
}

// only cron
if (time is null)
else if (timeTickerTime is null)
{
sources = [TickerType.CronTickerOccurrence];
var cronRemaining = cron.Value - now;
// Ensure we don't return negative values - schedule for immediate execution
return cronRemaining < TimeSpan.Zero ? TimeSpan.Zero : cronRemaining;
includeCron = true;
timeRemaining = SafeRemaining(cronTime.Value, now);
}

// only time
if (cron is null)
else
{
sources = [TickerType.TimeTicker];
var timeRemaining = time.Value - now;
// Ensure we don't return negative values - schedule for immediate execution
return timeRemaining < TimeSpan.Zero ? TimeSpan.Zero : timeRemaining;
}
var cronSecond = new DateTime(cronTime.Value.Year, cronTime.Value.Month, cronTime.Value.Day,
cronTime.Value.Hour, cronTime.Value.Minute, cronTime.Value.Second);
var timeSecond = new DateTime(timeTickerTime.Value.Year, timeTickerTime.Value.Month, timeTickerTime.Value.Day,
timeTickerTime.Value.Hour, timeTickerTime.Value.Minute, timeTickerTime.Value.Second);

// both present - check if they're in the exact same second (ignoring milliseconds)
var cronSecond = new DateTime(cron.Value.Year, cron.Value.Month, cron.Value.Day,
cron.Value.Hour, cron.Value.Minute, cron.Value.Second);
var timeSecond = new DateTime(time.Value.Year, time.Value.Month, time.Value.Day,
time.Value.Hour, time.Value.Minute, time.Value.Second);

// Only batch if they're in the exact same second
if (cronSecond == timeSecond)
{
sources = [TickerType.CronTickerOccurrence, TickerType.TimeTicker];
var earliest = cron < time ? cron.Value : time.Value;
var earliestRemaining = earliest - now;
// Ensure we don't return negative values
return earliestRemaining < TimeSpan.Zero ? TimeSpan.Zero : earliestRemaining;
if (cronSecond == timeSecond)
{
includeCron = true;
includeTimeTickers = true;
var earliest = cronTime < timeTickerTime ? cronTime.Value : timeTickerTime.Value;
timeRemaining = SafeRemaining(earliest, now);
}
else if (cronTime < timeTickerTime)
{
includeCron = true;
timeRemaining = SafeRemaining(cronTime.Value, now);
}
else
{
includeTimeTickers = true;
timeRemaining = SafeRemaining(timeTickerTime.Value, now);
}
}

// Different seconds - only process the earliest one
if (cron < time)
{
sources = [TickerType.CronTickerOccurrence];
var cronRemaining = cron.Value - now;
return cronRemaining < TimeSpan.Zero ? TimeSpan.Zero : cronRemaining;
}
if (!includeCron && !includeTimeTickers)
return (Timeout.InfiniteTimeSpan, []);

sources = [TickerType.TimeTicker];
var finalTimeRemaining = time.Value - now;
return finalTimeRemaining < TimeSpan.Zero ? TimeSpan.Zero : finalTimeRemaining;
}
InternalFunctionContext[] cronFunctions = [];
InternalFunctionContext[] timeFunctions = [];

private async Task<InternalFunctionContext[]>RetrieveEligibleTickersAsync(
(DateTime Key, InternalManagerContext[] Items)? minCronTicker,
TimeTickerEntity[] minTimeTicker,
TickerType[] typesToQueue,
CancellationToken cancellationToken = default)
{

if (typesToQueue.Contains(TickerType.CronTickerOccurrence) && typesToQueue.Contains(TickerType.TimeTicker))
{
var nextCronTickersTask = QueueNextCronTickersAsync(minCronTicker!.Value, cancellationToken);
var nextTimeTickersTask = QueueNextTimeTickersAsync(minTimeTicker, cancellationToken);
if (includeCron && minCronGroup is not null)
cronFunctions = await QueueNextCronTickersAsync(minCronGroup.Value, cancellationToken).ConfigureAwait(false);

await Task.WhenAll(nextCronTickersTask, nextTimeTickersTask).ConfigureAwait(false);

var (nextCronTickers, nextTimeTickers) = (await nextCronTickersTask, await nextTimeTickersTask);

// Safety check for extremely large datasets
var totalLength = nextCronTickers.Length + nextTimeTickers.Length;

var merged = new InternalFunctionContext[totalLength];
nextCronTickers.AsSpan().CopyTo(merged.AsSpan(0, nextCronTickers.Length));
nextTimeTickers.AsSpan().CopyTo(merged.AsSpan(nextCronTickers.Length, nextTimeTickers.Length));
return merged;
}
if (includeTimeTickers && minTimeTickers.Length > 0)
timeFunctions = await QueueNextTimeTickersAsync(minTimeTickers, cancellationToken).ConfigureAwait(false);

if (typesToQueue.Contains(TickerType.TimeTicker))
return await QueueNextTimeTickersAsync(minTimeTicker, cancellationToken).ConfigureAwait(false);
else
return await QueueNextCronTickersAsync(minCronTicker!.Value, cancellationToken).ConfigureAwait(false);
if (cronFunctions.Length == 0 && timeFunctions.Length == 0)
return (timeRemaining, []);

if (cronFunctions.Length == 0)
return (timeRemaining, timeFunctions);

if (timeFunctions.Length == 0)
return (timeRemaining, cronFunctions);

var merged = new InternalFunctionContext[cronFunctions.Length + timeFunctions.Length];
cronFunctions.AsSpan().CopyTo(merged.AsSpan(0, cronFunctions.Length));
timeFunctions.AsSpan().CopyTo(merged.AsSpan(cronFunctions.Length, timeFunctions.Length));

return (timeRemaining, merged);
}

private static TimeSpan SafeRemaining(DateTime target, DateTime now)
{
var remaining = target - now;
return remaining < TimeSpan.Zero ? TimeSpan.Zero : remaining;
}

private async Task<InternalFunctionContext[]> QueueNextTimeTickersAsync(TimeTickerEntity[] minTimeTickers, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -489,9 +456,9 @@ public async Task MigrateDefinedCronTickers((string, string)[] cronExpressions,
public async Task DeleteTicker(Guid tickerId, TickerType type, CancellationToken cancellationToken = default)
{
if (type == TickerType.CronTickerOccurrence)
await _persistenceProvider.RemoveTimeTickers([tickerId], cancellationToken).ConfigureAwait(false);
else
await _persistenceProvider.RemoveCronTickers([tickerId], cancellationToken).ConfigureAwait(false);
else
await _persistenceProvider.RemoveTimeTickers([tickerId], cancellationToken).ConfigureAwait(false);
}

public async Task ReleaseDeadNodeResources(string instanceIdentifier, CancellationToken cancellationToken = default)
Expand All @@ -503,4 +470,4 @@ public async Task ReleaseDeadNodeResources(string instanceIdentifier, Cancellati
await Task.WhenAll(cronOccurrence, timeTickers).ConfigureAwait(false);
}
}
}
}
1 change: 0 additions & 1 deletion src/TickerQ.Utilities/Models/InternalFunctionContext.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using TickerQ.Utilities.Enums;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ namespace TickerQ.BackgroundServices;
internal class TickerQFallbackBackgroundService : BackgroundService
{
private int _started;
private PeriodicTimer _tickerFallbackJobPeriodicTimer;
private readonly IInternalTickerManager _internalTickerManager;
private readonly TickerExecutionTaskHandler _tickerExecutionTaskHandler;
private readonly TickerQTaskScheduler _tickerQTaskScheduler;
Expand All @@ -33,16 +32,8 @@ public override Task StartAsync(CancellationToken ct)

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_tickerFallbackJobPeriodicTimer = new PeriodicTimer(TimeSpan.FromMilliseconds(10));
await RunTickerQFallbackAsync(stoppingToken);
}

private async Task RunTickerQFallbackAsync(CancellationToken stoppingToken)
{
while (await _tickerFallbackJobPeriodicTimer.WaitForNextTickAsync(stoppingToken))
while (!stoppingToken.IsCancellationRequested)
{
var oldPeriod = _tickerFallbackJobPeriodicTimer.Period;

var functions = await _internalTickerManager.RunTimedOutTickers(stoppingToken);

if (functions.Length != 0)
Expand Down Expand Up @@ -76,13 +67,12 @@ private async Task RunTickerQFallbackAsync(CancellationToken stoppingToken)
await _tickerQTaskScheduler.QueueAsync(ct => _tickerExecutionTaskHandler.ExecuteTaskAsync(function, true, ct), function.CachedPriority, stoppingToken);
}

_tickerFallbackJobPeriodicTimer.Period = TimeSpan.FromMilliseconds(10);
await Task.Delay(TimeSpan.FromMilliseconds(10), stoppingToken);
}
else
_tickerFallbackJobPeriodicTimer.Period = _fallbackJobPeriod;

if(oldPeriod != _fallbackJobPeriod)
await _tickerFallbackJobPeriodicTimer.WaitForNextTickAsync(stoppingToken);
{
await Task.Delay(_fallbackJobPeriod, stoppingToken);
}
}
}

Expand All @@ -92,4 +82,4 @@ public override async Task StopAsync(CancellationToken cancellationToken)
Interlocked.Exchange(ref _started, 0);
await base.StopAsync(cancellationToken);
}
}
}
Loading
Loading