Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,13 @@ public async Task MigrateDefinedCronTickers((string Function, string Expression)
{
var now = _clock.UtcNow;
const string seedPrefix = "MemoryTicker_Seeded_";
var functions = cronTickers.Select(x => x.Function).ToHashSet(StringComparer.Ordinal);

// Load existing seeded cron tickers
// Build the complete set of registered function names to detect orphaned tickers.
// This covers functions whose InitIdentifier was cleared by a dashboard edit (#517).
var allRegisteredFunctions = TickerFunctionProvider.TickerFunctions.Keys
.ToHashSet(StringComparer.Ordinal);

// Load all existing cron tickers
var existingIds = await _db.SetMembersAsync(CronIdsKey).ConfigureAwait(false);
var existingList = new List<TCronTicker>();
foreach (var redisValue in existingIds)
Expand All @@ -352,17 +356,22 @@ public async Task MigrateDefinedCronTickers((string Function, string Expression)
if (cron != null) existingList.Add(cron);
}

var seededCron = existingList.Where(c => c.InitIdentifier != null && c.InitIdentifier.StartsWith(seedPrefix)).ToList();
var seededToDelete = seededCron.Where(c => !functions.Contains(c.Function)).Select(c => c.Id).ToArray();
// Delete all cron tickers whose function no longer exists in the code definitions
var orphanedToDelete = existingList
.Where(c => !allRegisteredFunctions.Contains(c.Function))
.Select(c => c.Id).ToArray();

foreach (var id in seededToDelete)
foreach (var id in orphanedToDelete)
{
await RemoveCronIndexesAsync(id).ConfigureAwait(false);
await RemoveCronOccurrenceIndexesAsyncForCron(id).ConfigureAwait(false);
await _db.KeyDeleteAsync(CronKey(id)).ConfigureAwait(false);
}

var existingByFunction = existingList.ToDictionary(c => c.Function, c => c, StringComparer.Ordinal);
var orphanedSet = orphanedToDelete.ToHashSet();
var existingByFunction = existingList
.Where(c => !orphanedSet.Contains(c.Id))
.ToDictionary(c => c.Function, c => c, StringComparer.Ordinal);
foreach (var (function, expression) in cronTickers)
{
if (existingByFunction.TryGetValue(function, out var cron))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,36 +248,36 @@ public async Task MigrateDefinedCronTickers((string Function, string Expression)
var functions = cronTickers.Select(x => x.Function).ToArray();
var cronSet = dbContext.Set<TCronTicker>();

// Identify seeded cron tickers (created from in-memory definitions)
const string seedPrefix = "MemoryTicker_Seeded_";

var seededCron = await cronSet
.Where(c => c.InitIdentifier != null && c.InitIdentifier.StartsWith(seedPrefix))
.ToListAsync(cancellationToken)
.ConfigureAwait(false);

var newFunctionSet = functions.ToHashSet(StringComparer.Ordinal);

// Delete seeded cron tickers whose function no longer exists in the code definitions
var seededToDelete = seededCron
.Where(c => !newFunctionSet.Contains(c.Function))
// Build the complete set of registered function names to detect orphaned tickers.
// This covers functions whose InitIdentifier was cleared by a dashboard edit (#517).
var allRegisteredFunctions = TickerFunctionProvider.TickerFunctions.Keys
.ToHashSet(StringComparer.Ordinal);

// Find all cron tickers whose function no longer exists in the code definitions.
// This includes seeded tickers (InitIdentifier = "MemoryTicker_Seeded_*") as well as
// previously-seeded tickers whose InitIdentifier was cleared by a dashboard update.
var orphanedCron = await cronSet
.Where(c => !allRegisteredFunctions.Contains(c.Function))
.Select(c => c.Id)
.ToArray();
.ToArrayAsync(cancellationToken)
.ConfigureAwait(false);

if (seededToDelete.Length > 0)
if (orphanedCron.Length > 0)
{
// Delete related occurrences first (if any), then the cron tickers
await dbContext.Set<CronTickerOccurrenceEntity<TCronTicker>>()
.Where(o => seededToDelete.Contains(o.CronTickerId))
.Where(o => orphanedCron.Contains(o.CronTickerId))
.ExecuteDeleteAsync(cancellationToken)
.ConfigureAwait(false);

await cronSet
.Where(c => seededToDelete.Contains(c.Id))
.Where(c => orphanedCron.Contains(c.Id))
.ExecuteDeleteAsync(cancellationToken)
.ConfigureAwait(false);
}

var newFunctionSet = functions.ToHashSet(StringComparer.Ordinal);

// Load existing (remaining) cron tickers for the current function set
var existing = await cronSet
.Where(c => functions.Contains(c.Function))
Expand Down
13 changes: 12 additions & 1 deletion src/TickerQ/Src/Provider/TickerInMemoryPersistenceProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,17 @@ public Task MigrateDefinedCronTickers((string Function, string Expression)[] cro
{
var now = _clock.UtcNow;

// Remove orphaned cron tickers whose function no longer exists in the code definitions (#517).
var allRegisteredFunctions = TickerFunctionProvider.TickerFunctions.Keys
.ToHashSet(StringComparer.Ordinal);

var snapshot = CronTickers.ToArray();
foreach (var (id, ticker) in snapshot)
{
if (!allRegisteredFunctions.Contains(ticker.Function))
CronTickers.TryRemove(id, out _);
}

foreach (var (function, expression) in cronTickers)
{
// Check if already exists (take snapshot for thread safety)
Expand All @@ -538,7 +549,7 @@ public Task MigrateDefinedCronTickers((string Function, string Expression)[] cro
UpdatedAt = now,
Request = Array.Empty<byte>()
};

CronTickers.TryAdd(id, cronTicker);
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/TickerQ/Src/TickerExecutionTaskHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ private async Task RunContextFunctionAsync(InternalFunctionContext context, bool

stopWatch.Start();

if (context.CachedDelegate is null)
throw new InvalidOperationException(
$"Ticker function '{context.FunctionName}' was not found in the registered functions. " +
"Ensure the function is properly decorated with [TickerFunction] attribute and the containing class is registered.");

// Create service scope - will be disposed automatically via await using
await using var scope = _serviceProvider.CreateAsyncScope();
tickerFunctionContext.SetServiceScope(scope);
Expand Down
Loading