diff --git a/src/Umbraco.Core/Configuration/Models/DistributedJobSettings.cs b/src/Umbraco.Core/Configuration/Models/DistributedJobSettings.cs index cafe666b7bb3..af1417c067ea 100644 --- a/src/Umbraco.Core/Configuration/Models/DistributedJobSettings.cs +++ b/src/Umbraco.Core/Configuration/Models/DistributedJobSettings.cs @@ -10,6 +10,7 @@ public class DistributedJobSettings { internal const string StaticPeriod = "00:00:05"; internal const string StaticDelay = "00:01:00"; + internal const string StaticMaxExecutionTime = "00:05:00"; /// /// Gets or sets a value for the period of checking if there are any runnable distributed jobs. @@ -22,4 +23,11 @@ public class DistributedJobSettings /// [DefaultValue(StaticDelay)] public TimeSpan Delay { get; set; } = TimeSpan.Parse(StaticDelay); + + /// + /// Gets or sets the maximum execution time for a distributed job before it is considered timed out. + /// When a job exceeds this time, it is considered stale and can be picked up by another server for recovery and restarted. + /// + [DefaultValue(StaticMaxExecutionTime)] + public TimeSpan MaximumExecutionTime { get; set; } = TimeSpan.Parse(StaticMaxExecutionTime); } diff --git a/src/Umbraco.Infrastructure/BackgroundJobs/DistributedBackgroundJobHostedService.cs b/src/Umbraco.Infrastructure/BackgroundJobs/DistributedBackgroundJobHostedService.cs index 01711d64ef2f..6cc629bba0af 100644 --- a/src/Umbraco.Infrastructure/BackgroundJobs/DistributedBackgroundJobHostedService.cs +++ b/src/Umbraco.Infrastructure/BackgroundJobs/DistributedBackgroundJobHostedService.cs @@ -1,5 +1,4 @@ -using System.Diagnostics; -using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Umbraco.Cms.Core; @@ -22,10 +21,6 @@ public class DistributedBackgroundJobHostedService : BackgroundService /// /// Initializes a new instance of the class. /// - /// - /// - /// - /// public DistributedBackgroundJobHostedService( ILogger logger, IRuntimeState runtimeState, diff --git a/src/Umbraco.Infrastructure/Persistence/Repositories/IDistributedJobRepository.cs b/src/Umbraco.Infrastructure/Persistence/Repositories/IDistributedJobRepository.cs index 2566a5b9b989..33f8932b2052 100644 --- a/src/Umbraco.Infrastructure/Persistence/Repositories/IDistributedJobRepository.cs +++ b/src/Umbraco.Infrastructure/Persistence/Repositories/IDistributedJobRepository.cs @@ -33,4 +33,30 @@ public interface IDistributedJobRepository /// Deletes a job. /// void Delete(DistributedBackgroundJobModel distributedBackgroundJob); + + /// + /// Adds multiple jobs in a single batch operation. + /// + /// The jobs to add. + void Add(IEnumerable jobs) + { + // TODO: Delete default implementation in V18 + foreach (DistributedBackgroundJobModel job in jobs) + { + Add(job); + } + } + + /// + /// Deletes multiple jobs in a single batch operation. + /// + /// The jobs to delete. + void Delete(IEnumerable jobs) + { + // TODO: Delete default implementation in V18 + foreach (DistributedBackgroundJobModel job in jobs) + { + Delete(job); + } + } } diff --git a/src/Umbraco.Infrastructure/Persistence/Repositories/Implement/DistributedJobRepository.cs b/src/Umbraco.Infrastructure/Persistence/Repositories/Implement/DistributedJobRepository.cs index 4e6502dc7d11..73550577af8a 100644 --- a/src/Umbraco.Infrastructure/Persistence/Repositories/Implement/DistributedJobRepository.cs +++ b/src/Umbraco.Infrastructure/Persistence/Repositories/Implement/DistributedJobRepository.cs @@ -87,6 +87,40 @@ public void Delete(DistributedBackgroundJobModel distributedBackgroundJob) } } + /// + public void Add(IEnumerable jobs) + { + if (scopeAccessor.AmbientScope is null) + { + throw new InvalidOperationException("No scope, could not add distributed jobs"); + } + + IEnumerable dtos = jobs.Select(MapToDto); + scopeAccessor.AmbientScope.Database.InsertBulk(dtos); + } + + /// + public void Delete(IEnumerable jobs) + { + if (scopeAccessor.AmbientScope is null) + { + throw new InvalidOperationException("No scope, could not delete distributed jobs"); + } + + var jobIds = jobs.Select(x => x.Id).ToArray(); + if (jobIds.Length is 0) + { + return; + } + + Sql sql = scopeAccessor.AmbientScope.SqlContext.Sql() + .Delete() + .From() + .WhereIn(x => x.Id, jobIds); + + scopeAccessor.AmbientScope.Database.Execute(sql); + } + private DistributedJobDto MapToDto(DistributedBackgroundJobModel model) => new() { diff --git a/src/Umbraco.Infrastructure/Services/Implement/DistributedJobService.cs b/src/Umbraco.Infrastructure/Services/Implement/DistributedJobService.cs index 286567842605..bf3967ee852b 100644 --- a/src/Umbraco.Infrastructure/Services/Implement/DistributedJobService.cs +++ b/src/Umbraco.Infrastructure/Services/Implement/DistributedJobService.cs @@ -1,5 +1,9 @@ -using Microsoft.Extensions.Logging; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using Umbraco.Cms.Core; +using Umbraco.Cms.Core.Configuration.Models; +using Umbraco.Cms.Core.DependencyInjection; using Umbraco.Cms.Core.Scoping; using Umbraco.Cms.Infrastructure.BackgroundJobs; using Umbraco.Cms.Infrastructure.Models; @@ -14,24 +18,41 @@ public class DistributedJobService : IDistributedJobService private readonly IDistributedJobRepository _distributedJobRepository; private readonly IEnumerable _distributedBackgroundJobs; private readonly ILogger _logger; + private readonly DistributedJobSettings _settings; /// /// Initializes a new instance of the class. /// - /// - /// - /// - /// + [Obsolete("Use the constructor that accepts IOptions. Scheduled for removal in V18.")] public DistributedJobService( ICoreScopeProvider coreScopeProvider, IDistributedJobRepository distributedJobRepository, IEnumerable distributedBackgroundJobs, ILogger logger) + : this( + coreScopeProvider, + distributedJobRepository, + distributedBackgroundJobs, + logger, + StaticServiceProvider.Instance.GetRequiredService>()) + { + } + + /// + /// Initializes a new instance of the class. + /// + public DistributedJobService( + ICoreScopeProvider coreScopeProvider, + IDistributedJobRepository distributedJobRepository, + IEnumerable distributedBackgroundJobs, + ILogger logger, + IOptions settings) { _coreScopeProvider = coreScopeProvider; _distributedJobRepository = distributedJobRepository; _distributedBackgroundJobs = distributedBackgroundJobs; _logger = logger; + _settings = settings.Value; } /// @@ -42,7 +63,8 @@ public DistributedJobService( scope.EagerWriteLock(Constants.Locks.DistributedJobs); IEnumerable jobs = _distributedJobRepository.GetAll(); - DistributedBackgroundJobModel? job = jobs.FirstOrDefault(x => x.LastRun < DateTime.UtcNow - x.Period); + DistributedBackgroundJobModel? job = jobs.FirstOrDefault(x => x.LastRun < DateTime.UtcNow - x.Period + && (x.IsRunning is false || x.LastAttemptedRun < DateTime.UtcNow - x.Period - _settings.MaximumExecutionTime)); if (job is null) { @@ -93,45 +115,64 @@ public async Task FinishAsync(string jobName) /// public async Task EnsureJobsAsync() { + // Pre-compute registered job data outside the lock to minimize lock hold time + var registeredJobsByName = _distributedBackgroundJobs.ToDictionary(x => x.Name, x => x.Period); + + // Early exit if no registered jobs + if (registeredJobsByName.Count is 0) + { + return; + } + using ICoreScope scope = _coreScopeProvider.CreateCoreScope(); scope.WriteLock(Constants.Locks.DistributedJobs); DistributedBackgroundJobModel[] existingJobs = _distributedJobRepository.GetAll().ToArray(); var existingJobsByName = existingJobs.ToDictionary(x => x.Name); - foreach (IDistributedBackgroundJob registeredJob in _distributedBackgroundJobs) + // Collect all changes first, then execute - minimizes time spent in the critical section + var jobsToAdd = new List(); + DateTime utcNow = DateTime.UtcNow; + + foreach (KeyValuePair registeredJob in registeredJobsByName) { - if (existingJobsByName.TryGetValue(registeredJob.Name, out DistributedBackgroundJobModel? existingJob)) + if (existingJobsByName.TryGetValue(registeredJob.Key, out DistributedBackgroundJobModel? existingJob)) { - // Update if period has changed - if (existingJob.Period != registeredJob.Period) + // Update only if period has actually changed + if (existingJob.Period != registeredJob.Value) { - existingJob.Period = registeredJob.Period; + existingJob.Period = registeredJob.Value; _distributedJobRepository.Update(existingJob); } } else { - // Add new job (fresh install or newly registered job) - var newJob = new DistributedBackgroundJobModel + // Collect new jobs for batch insert + jobsToAdd.Add(new DistributedBackgroundJobModel { - Name = registeredJob.Name, - Period = registeredJob.Period, - LastRun = DateTime.UtcNow, + Name = registeredJob.Key, + Period = registeredJob.Value, + LastRun = utcNow, IsRunning = false, - LastAttemptedRun = DateTime.UtcNow, - }; - _distributedJobRepository.Add(newJob); + LastAttemptedRun = utcNow, + }); } } - // Remove jobs that are no longer registered in code - var registeredJobNames = _distributedBackgroundJobs.Select(x => x.Name).ToHashSet(); - IEnumerable jobsToRemove = existingJobs.Where(x => registeredJobNames.Contains(x.Name) is false); + // Batch insert new jobs + if (jobsToAdd.Count > 0) + { + _distributedJobRepository.Add(jobsToAdd); + } + + // Batch delete jobs that are no longer registered + var jobsToRemove = existingJobs + .Where(x => registeredJobsByName.ContainsKey(x.Name) is false) + .ToList(); - foreach (DistributedBackgroundJobModel jobToRemove in jobsToRemove) + if (jobsToRemove.Count > 0) { - _distributedJobRepository.Delete(jobToRemove); + _distributedJobRepository.Delete(jobsToRemove); } scope.Complete(); diff --git a/tests/Umbraco.Tests.Integration/Umbraco.Infrastructure/Services/DistributedJobServiceTests.cs b/tests/Umbraco.Tests.Integration/Umbraco.Infrastructure/Services/DistributedJobServiceTests.cs new file mode 100644 index 000000000000..8c972356c93f --- /dev/null +++ b/tests/Umbraco.Tests.Integration/Umbraco.Infrastructure/Services/DistributedJobServiceTests.cs @@ -0,0 +1,333 @@ +using Microsoft.Extensions.DependencyInjection; +using NUnit.Framework; +using Umbraco.Cms.Core.Configuration.Models; +using Umbraco.Cms.Infrastructure.BackgroundJobs; +using Umbraco.Cms.Infrastructure.Persistence.Dtos; +using Umbraco.Cms.Infrastructure.Services; +using Umbraco.Cms.Tests.Common.Testing; +using Umbraco.Cms.Tests.Integration.Testing; + +namespace Umbraco.Cms.Tests.Integration.Umbraco.Infrastructure.Services; + +/// +/// Integration tests for . +/// +[TestFixture] +[UmbracoTest(Database = UmbracoTestOptions.Database.NewSchemaPerTest)] +internal sealed class DistributedJobServiceTests : UmbracoIntegrationTest +{ + private const string TestJobName = "TestDistributedJob"; + private static readonly TimeSpan TestJobPeriod = TimeSpan.FromMinutes(5); + private static readonly TimeSpan TestMaxExecutionTime = TimeSpan.FromMinutes(10); + + private IDistributedJobService DistributedJobService => GetRequiredService(); + + protected override void CustomTestSetup(IUmbracoBuilder builder) + { + // Register a test job + builder.Services.AddSingleton(); + + // Configure settings with a known MaximumExecutionTime for testing + builder.Services.PostConfigure(options => + { + options.MaximumExecutionTime = TestMaxExecutionTime; + }); + } + + [Test] + public async Task TryTakeRunnableAsync_JobIsDueAndNotRunning_ReturnsJob() + { + // Arrange - Ensure jobs are registered + await DistributedJobService.EnsureJobsAsync(); + + // Set the job's LastRun to be older than the period + SetJobState(TestJobName, lastRun: DateTime.UtcNow - TestJobPeriod - TimeSpan.FromMinutes(1), isRunning: false); + + // Act + var job = await DistributedJobService.TryTakeRunnableAsync(); + + // Assert + Assert.IsNotNull(job); + Assert.AreEqual(TestJobName, job!.Name); + + // Verify the job is now marked as running + var jobState = GetJobState(TestJobName); + Assert.IsTrue(jobState.IsRunning); + } + + [Test] + public async Task TryTakeRunnableAsync_JobIsNotDue_ReturnsNull() + { + // Arrange - Ensure jobs are registered + await DistributedJobService.EnsureJobsAsync(); + + // Set the job's LastRun to be recent (not due yet) + SetJobState(TestJobName, lastRun: DateTime.UtcNow, isRunning: false); + + // Act + var job = await DistributedJobService.TryTakeRunnableAsync(); + + // Assert + Assert.IsNull(job); + } + + [Test] + public async Task TryTakeRunnableAsync_JobIsRunningAndNotTimedOut_ReturnsNull() + { + // Arrange - Ensure jobs are registered + await DistributedJobService.EnsureJobsAsync(); + + // Set the job as running with a recent LastAttemptedRun (not timed out) + SetJobState( + TestJobName, + lastRun: DateTime.UtcNow - TestJobPeriod - TimeSpan.FromMinutes(1), + isRunning: true, + lastAttemptedRun: DateTime.UtcNow); + + // Act + var job = await DistributedJobService.TryTakeRunnableAsync(); + + // Assert - Should NOT pick up the job because it's running and hasn't timed out + Assert.IsNull(job); + } + + [Test] + public async Task TryTakeRunnableAsync_JobIsRunningButTimedOut_ReturnsJob() + { + // Arrange - Ensure jobs are registered + await DistributedJobService.EnsureJobsAsync(); + + // Set the job as running but with LastAttemptedRun older than Period + MaxExecutionTime (timed out) + var timedOutTime = DateTime.UtcNow - TestJobPeriod - TestMaxExecutionTime - TimeSpan.FromMinutes(1); + SetJobState( + TestJobName, + lastRun: timedOutTime - TimeSpan.FromMinutes(1), + isRunning: true, + lastAttemptedRun: timedOutTime); + + // Act + var job = await DistributedJobService.TryTakeRunnableAsync(); + + // Assert - Should pick up the job because it has timed out (stale recovery) + Assert.IsNotNull(job); + Assert.AreEqual(TestJobName, job!.Name); + } + + [Test] + public async Task FinishAsync_MarksJobAsNotRunningAndUpdatesLastRun() + { + // Arrange - Ensure jobs are registered and take a job + await DistributedJobService.EnsureJobsAsync(); + SetJobState(TestJobName, lastRun: DateTime.UtcNow - TestJobPeriod - TimeSpan.FromMinutes(1), isRunning: false); + + var job = await DistributedJobService.TryTakeRunnableAsync(); + Assert.IsNotNull(job); + + var beforeFinish = DateTime.UtcNow; + + // Act + await DistributedJobService.FinishAsync(job!.Name); + + // Assert + var jobState = GetJobState(TestJobName); + Assert.IsFalse(jobState.IsRunning); + Assert.GreaterOrEqual(jobState.LastRun, beforeFinish); + Assert.GreaterOrEqual(jobState.LastAttemptedRun, beforeFinish); + } + + [Test] + public async Task EnsureJobsAsync_RegistersNewJobs() + { + // Act + await DistributedJobService.EnsureJobsAsync(); + + // Assert + var jobState = GetJobState(TestJobName); + Assert.IsNotNull(jobState); + Assert.AreEqual(TestJobPeriod.Ticks, jobState.Period); + } + + [Test] + public async Task TryTakeRunnableAsync_MultipleJobsDue_ReturnsFirstDueJob() + { + // Arrange - Ensure jobs are registered + await DistributedJobService.EnsureJobsAsync(); + + // Set the job's LastRun to be older than the period + SetJobState(TestJobName, lastRun: DateTime.UtcNow - TestJobPeriod - TimeSpan.FromMinutes(1), isRunning: false); + + // Act - Take the first job + var job1 = await DistributedJobService.TryTakeRunnableAsync(); + Assert.IsNotNull(job1); + + // Act - Try to take another job (same job is now running, so should return null if no other jobs are due) + var job2 = await DistributedJobService.TryTakeRunnableAsync(); + + // Assert - Should be null because the only test job is now running + Assert.IsNull(job2); + } + + [Test] + public async Task TryTakeRunnableAsync_JobRunningJustBeforeTimeout_ReturnsNull() + { + // Arrange - Ensure jobs are registered + await DistributedJobService.EnsureJobsAsync(); + + // Set the job as running with LastAttemptedRun just slightly before the timeout threshold + // This tests the boundary condition + var justBeforeTimeout = DateTime.UtcNow - TestJobPeriod - TestMaxExecutionTime + TimeSpan.FromSeconds(30); + SetJobState( + TestJobName, + lastRun: justBeforeTimeout - TestJobPeriod, + isRunning: true, + lastAttemptedRun: justBeforeTimeout); + + // Act + var job = await DistributedJobService.TryTakeRunnableAsync(); + + // Assert - Should NOT pick up because it hasn't quite timed out yet + Assert.IsNull(job); + } + + [Test] + public async Task EnsureJobsAsync_RemovesUnregisteredJobs() + { + // Arrange - First ensure our test job is registered + await DistributedJobService.EnsureJobsAsync(); + + // Add an orphaned job directly to the database (simulating a job that was removed from code) + var orphanedJobName = "OrphanedJob"; + InsertJob(orphanedJobName, TimeSpan.FromHours(1)); + + // Verify both jobs exist + Assert.IsNotNull(GetJobStateOrDefault(TestJobName)); + Assert.IsNotNull(GetJobStateOrDefault(orphanedJobName)); + + // Act - EnsureJobsAsync should remove the orphaned job + await DistributedJobService.EnsureJobsAsync(); + + // Assert + Assert.IsNotNull(GetJobStateOrDefault(TestJobName)); // Registered job still exists + Assert.IsNull(GetJobStateOrDefault(orphanedJobName)); // Orphaned job was removed + } + + [Test] + public async Task EnsureJobsAsync_UpdatesJobPeriodWhenChanged() + { + // Arrange - First ensure job is registered + await DistributedJobService.EnsureJobsAsync(); + + // Manually change the period in the database to simulate a mismatch + var originalPeriod = TestJobPeriod; + var differentPeriod = TimeSpan.FromHours(99); + UpdateJobPeriod(TestJobName, differentPeriod); + + // Verify the period was changed + var beforeUpdate = GetJobState(TestJobName); + Assert.AreEqual(differentPeriod.Ticks, beforeUpdate.Period); + + // Act - EnsureJobsAsync should update the period back to the registered value + await DistributedJobService.EnsureJobsAsync(); + + // Assert - Period should be restored to the registered value + var afterUpdate = GetJobState(TestJobName); + Assert.AreEqual(originalPeriod.Ticks, afterUpdate.Period); + } + + [Test] + public async Task EnsureJobsAsync_IsIdempotent() + { + // Act - Call EnsureJobsAsync multiple times + await DistributedJobService.EnsureJobsAsync(); + var afterFirst = GetJobState(TestJobName); + + await DistributedJobService.EnsureJobsAsync(); + var afterSecond = GetJobState(TestJobName); + + await DistributedJobService.EnsureJobsAsync(); + var afterThird = GetJobState(TestJobName); + + // Assert - Job should still exist with same properties + Assert.AreEqual(afterFirst.Id, afterSecond.Id); + Assert.AreEqual(afterSecond.Id, afterThird.Id); + Assert.AreEqual(TestJobPeriod.Ticks, afterThird.Period); + } + + private void SetJobState(string jobName, DateTime lastRun, bool isRunning, DateTime? lastAttemptedRun = null) + { + using var scope = ScopeProvider.CreateScope(); + var sql = ScopeAccessor.AmbientScope!.SqlContext.Sql() + .Update(u => u + .Set(x => x.LastRun, lastRun) + .Set(x => x.IsRunning, isRunning) + .Set(x => x.LastAttemptedRun, lastAttemptedRun ?? lastRun)) + .Where(x => x.Name == jobName); + + ScopeAccessor.AmbientScope.Database.Execute(sql); + scope.Complete(); + } + + private void InsertJob(string jobName, TimeSpan period) + { + using var scope = ScopeProvider.CreateScope(); + var dto = new DistributedJobDto + { + Name = jobName, + Period = period.Ticks, + LastRun = DateTime.UtcNow, + IsRunning = false, + LastAttemptedRun = DateTime.UtcNow, + }; + ScopeAccessor.AmbientScope!.Database.Insert(dto); + scope.Complete(); + } + + private void UpdateJobPeriod(string jobName, TimeSpan period) + { + using var scope = ScopeProvider.CreateScope(); + var sql = ScopeAccessor.AmbientScope!.SqlContext.Sql() + .Update(u => u.Set(x => x.Period, period.Ticks)) + .Where(x => x.Name == jobName); + + ScopeAccessor.AmbientScope.Database.Execute(sql); + scope.Complete(); + } + + private DistributedJobDto GetJobState(string jobName) + { + using var scope = ScopeProvider.CreateScope(); + var sql = ScopeAccessor.AmbientScope!.SqlContext.Sql() + .Select() + .From() + .Where(x => x.Name == jobName); + + var result = ScopeAccessor.AmbientScope.Database.Single(sql); + scope.Complete(); + return result; + } + + private DistributedJobDto? GetJobStateOrDefault(string jobName) + { + using var scope = ScopeProvider.CreateScope(); + var sql = ScopeAccessor.AmbientScope!.SqlContext.Sql() + .Select() + .From() + .Where(x => x.Name == jobName); + + var result = ScopeAccessor.AmbientScope.Database.FirstOrDefault(sql); + scope.Complete(); + return result; + } + + /// + /// A simple test implementation of . + /// + private sealed class TestDistributedJob : IDistributedBackgroundJob + { + public string Name => TestJobName; + + public TimeSpan Period => TestJobPeriod; + + public Task ExecuteAsync() => Task.CompletedTask; + } +}