Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
ο»Ώ<Project>

<PropertyGroup>
<TargetFramework>net9.0</TargetFramework>
<RepositoryUrl>https://github.com/arcenox-co/TickerQ</RepositoryUrl>
Expand All @@ -8,7 +9,7 @@
<PackageIcon>icon.jpg</PackageIcon>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<Version>9.1.0</Version>
<DotNetVersion>[9.0.0,10.0.0)</DotNetVersion>
<DotNetVersion>[10.0.0,11.0.0)</DotNetVersion>
<LangVersion>default</LangVersion>
</PropertyGroup>

Expand All @@ -17,5 +18,4 @@
</ItemGroup>

<Import Project="$([MSBuild]::GetPathOfFileAbove('Directory.Build.props', '$(MSBuildThisFileDirectory)../'))" />

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,10 @@ public async Task ReleaseAcquiredTimeTickers(Guid[] timeTickerIds, CancellationT
await using var dbContext = await DbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false);;
var now = _clock.UtcNow;

var baseQuery = timeTickerIds.Length == 0
var idList = timeTickerIds.ToList();
var baseQuery = idList.Count == 0
? dbContext.Set<TTimeTicker>()
: dbContext.Set<TTimeTicker>().Where(x => timeTickerIds.Contains(x.Id));
: dbContext.Set<TTimeTicker>().Where(x => idList.Contains(x.Id));

await baseQuery
.WhereCanAcquire(_lockHolder)
Expand All @@ -127,9 +128,15 @@ public async Task<int> UpdateTimeTicker(InternalFunctionContext functionContexts
public async Task UpdateTimeTickersWithUnifiedContext(Guid[] timeTickerIds, InternalFunctionContext functionContext, CancellationToken cancellationToken = default)
{
await using var dbContext = await DbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false);;
var idList = timeTickerIds.ToList();
await dbContext.Set<TTimeTicker>()
<<<<<<< HEAD
.Where(x => timeTickerIds.Contains(x.Id))
.ExecuteUpdateAsync(MappingExtensions.UpdateTimeTicker<TTimeTicker>(functionContext, _clock.UtcNow), cancellationToken).ConfigureAwait(false);
=======
.Where(x => idList.Contains(x.Id))
.ExecuteUpdateAsync(setter => setter.UpdateTimeTicker<TTimeTicker>(functionContext, _clock.UtcNow), cancellationToken).ConfigureAwait(false);
>>>>>>> 39b9b90 (Fix .NET 9+ EF Core query failures caused by ReadOnlySpan array.Contains() (#574))
}

public async Task<TimeTickerEntity[]> GetEarliestTimeTickers(CancellationToken cancellationToken)
Expand Down Expand Up @@ -214,10 +221,11 @@ public async Task<TimeTickerEntity[]> AcquireImmediateTimeTickersAsync(Guid[] id

await using var dbContext = await DbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false);
var now = _clock.UtcNow;
var idList = ids.ToList();

// Acquire and mark InProgress in a single update
var affected = await dbContext.Set<TTimeTicker>()
.Where(x => ids.Contains(x.Id))
.Where(x => idList.Contains(x.Id))
.WhereCanAcquire(_lockHolder)
.ExecuteUpdateAsync(setter => setter
.SetProperty(x => x.LockHolder, _lockHolder)
Expand All @@ -232,7 +240,7 @@ public async Task<TimeTickerEntity[]> AcquireImmediateTimeTickersAsync(Guid[] id
// Return the acquired tickers for immediate execution, with children
return await dbContext.Set<TTimeTicker>()
.AsNoTracking()
.Where(x => ids.Contains(x.Id) && x.LockHolder == _lockHolder && x.Status == TickerStatus.InProgress)
.Where(x => idList.Contains(x.Id) && x.LockHolder == _lockHolder && x.Status == TickerStatus.InProgress)
.Include(x => x.Children.Where(y => y.ExecutionTime == null))
.Select(MappingExtensions.ForQueueTimeTickers<TTimeTicker>())
.ToArrayAsync(cancellationToken)
Expand All @@ -245,7 +253,7 @@ public async Task MigrateDefinedCronTickers((string Function, string Expression)
await using var dbContext = await DbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false);
var now = _clock.UtcNow;

var functions = cronTickers.Select(x => x.Function).ToArray();
var functions = cronTickers.Select(x => x.Function).ToList();
var cronSet = dbContext.Set<TCronTicker>();

// Build the complete set of registered function names to detect orphaned tickers.
Expand All @@ -262,16 +270,17 @@ public async Task MigrateDefinedCronTickers((string Function, string Expression)
.ToArrayAsync(cancellationToken)
.ConfigureAwait(false);

if (orphanedCron.Length > 0)
var orphanedCronList = orphanedCron.ToList();
if (orphanedCronList.Count > 0)
{
// Delete related occurrences first (if any), then the cron tickers
await dbContext.Set<CronTickerOccurrenceEntity<TCronTicker>>()
.Where(o => orphanedCron.Contains(o.CronTickerId))
.Where(o => orphanedCronList.Contains(o.CronTickerId))
.ExecuteDeleteAsync(cancellationToken)
.ConfigureAwait(false);

await cronSet
.Where(c => orphanedCron.Contains(c.Id))
.Where(c => orphanedCronList.Contains(c.Id))
.ExecuteDeleteAsync(cancellationToken)
.ConfigureAwait(false);
}
Expand Down Expand Up @@ -421,9 +430,10 @@ public async Task ReleaseAcquiredCronTickerOccurrences(Guid[] occurrenceIds, Can
var now = _clock.UtcNow;
await using var dbContext = await DbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false);;

var baseQuery = occurrenceIds.Length == 0
? dbContext.Set<CronTickerOccurrenceEntity<TCronTicker>>()
: dbContext.Set<CronTickerOccurrenceEntity<TCronTicker>>().Where(x => occurrenceIds.Contains(x.Id));
var idList = occurrenceIds.ToList();
var baseQuery = idList.Count == 0
? dbContext.Set<CronTickerOccurrenceEntity<TCronTicker>>()
: dbContext.Set<CronTickerOccurrenceEntity<TCronTicker>>().Where(x => idList.Contains(x.Id));

await baseQuery
.WhereCanAcquire(_lockHolder)
Expand Down Expand Up @@ -525,11 +535,12 @@ public async Task<CronTickerOccurrenceEntity<TCronTicker>> GetEarliestAvailableC
{
var now = _clock.UtcNow;
var mainSchedulerThreshold = now.AddSeconds(-1);
var idList = ids.ToList();
await using var dbContext = await DbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false);;
return await dbContext.Set<CronTickerOccurrenceEntity<TCronTicker>>()
.AsNoTracking()
.Include(x => x.CronTicker)
.Where(x => ids.Contains(x.CronTickerId))
.Where(x => idList.Contains(x.CronTickerId))
.Where(x => x.ExecutionTime >= mainSchedulerThreshold) // Only items within the 1-second main scheduler window
.WhereCanAcquire(_lockHolder)
.OrderBy(x => x.ExecutionTime)
Expand All @@ -554,9 +565,15 @@ public async Task UpdateCronTickerOccurrencesWithUnifiedContext(Guid[] cronOccur
CancellationToken cancellationToken = default)
{
await using var dbContext = await DbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false);;
var idList = cronOccurrenceIds.ToList();
await dbContext.Set<CronTickerOccurrenceEntity<TCronTicker>>()
<<<<<<< HEAD
.Where(x => cronOccurrenceIds.Contains(x.Id))
.ExecuteUpdateAsync(MappingExtensions.UpdateCronTickerOccurrence<TCronTicker>(functionContext), cancellationToken)
=======
.Where(x => idList.Contains(x.Id))
.ExecuteUpdateAsync(setter => setter.UpdateCronTickerOccurrence<TCronTicker>(functionContext), cancellationToken)
>>>>>>> 39b9b90 (Fix .NET 9+ EF Core query failures caused by ReadOnlySpan array.Contains() (#574))
.ConfigureAwait(false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,11 @@ public async Task<int> RemoveTimeTickers(Guid[] timeTickerIds, CancellationToken
await using var dbContext = await DbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false);;

// Load the entities to be deleted (including children for cascade delete)
var idList = timeTickerIds.ToList();
var tickersToDelete = await dbContext.Set<TTimeTicker>()
.Include(x => x.Children)
.ThenInclude(x => x.Children) // Include grandchildren if needed
.Where(x => timeTickerIds.Contains(x.Id))
.Where(x => idList.Contains(x.Id))
.ToListAsync(cancellationToken)
.ConfigureAwait(false);

Expand Down Expand Up @@ -189,7 +190,8 @@ public async Task<int> UpdateCronTickers(TCronTicker[] cronTickers, Cancellation
public async Task<int> RemoveCronTickers(Guid[] cronTickerIds, CancellationToken cancellationToken)
{
await using var dbContext = await DbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false);
var result = await dbContext.Set<TCronTicker>().Where(x => cronTickerIds.Contains(x.Id))
var idList = cronTickerIds.ToList();
var result = await dbContext.Set<TCronTicker>().Where(x => idList.Contains(x.Id))
.ExecuteDeleteAsync(cancellationToken).ConfigureAwait(false);

if(RedisContext.HasRedisConnection)
Expand Down Expand Up @@ -246,8 +248,9 @@ public async Task<int> InsertCronTickerOccurrences(CronTickerOccurrenceEntity<TC
public async Task<int> RemoveCronTickerOccurrences(Guid[] cronTickerOccurrences, CancellationToken cancellationToken = default)
{
await using var dbContext = await DbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false);
var idList = cronTickerOccurrences.ToList();
return await dbContext.Set<CronTickerOccurrenceEntity<TCronTicker>>()
.Where(x => cronTickerOccurrences.Contains(x.Id))
.Where(x => idList.Contains(x.Id))
.ExecuteDeleteAsync(cancellationToken).ConfigureAwait(false);
}

Expand All @@ -258,10 +261,11 @@ public async Task<CronTickerOccurrenceEntity<TCronTicker>[]> AcquireImmediateCro

await using var dbContext = await DbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false);
var now = _clock.UtcNow;
var idList = occurrenceIds.ToList();

// Only acquire occurrences that are acquirable (Idle/Queued and not locked by another node)
var query = dbContext.Set<CronTickerOccurrenceEntity<TCronTicker>>()
.Where(x => occurrenceIds.Contains(x.Id))
.Where(x => idList.Contains(x.Id))
.WhereCanAcquire(_lockHolder);

// Lock and mark InProgress
Expand All @@ -279,7 +283,7 @@ public async Task<CronTickerOccurrenceEntity<TCronTicker>[]> AcquireImmediateCro
// Return acquired occurrences with CronTicker populated
return await dbContext.Set<CronTickerOccurrenceEntity<TCronTicker>>()
.AsNoTracking()
.Where(x => occurrenceIds.Contains(x.Id) && x.LockHolder == _lockHolder && x.Status == TickerStatus.InProgress)
.Where(x => idList.Contains(x.Id) && x.LockHolder == _lockHolder && x.Status == TickerStatus.InProgress)
.Include(x => x.CronTicker)
.ToArrayAsync(cancellationToken)
.ConfigureAwait(false);
Expand Down
Loading