diff --git a/src/DbInteractions/DbBackgroundService.cs b/src/DbInteractions/DbBackgroundService.cs index 611f4b9..0ee7a08 100644 --- a/src/DbInteractions/DbBackgroundService.cs +++ b/src/DbInteractions/DbBackgroundService.cs @@ -123,6 +123,18 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) bool result = await dbInteractionService.IsOfflocReadyForProcessing(); dbMessagingService.DbPublishResponse(new IsOfflocReadyForProcessingReturnMessage(result)); }, TDbQueue.IsOfflocReadyForProcessing); + + dbMessagingService.SubscribeToDbRequest(async (message) => + { + string? result = await dbInteractionService.GetLastProcessedOfflocFileName(); + dbMessagingService.DbPublishResponse(new ResultGetLastProcessedOfflocFileMessage(result)); + }, TDbQueue.GetLastProcessedOfflocFile); + + dbMessagingService.SubscribeToDbRequest(async (message) => + { + string? result = await dbInteractionService.GetLastProcessedDeliusFileName(); + dbMessagingService.DbPublishResponse(new ResultGetLastProcessedDeliusFileMessage(result)); + }, TDbQueue.GetLastProcessedDeliusFile); } } diff --git a/src/DbInteractions/Services/DbInteractionService.cs b/src/DbInteractions/Services/DbInteractionService.cs index 83f2b0e..4c6b73e 100644 --- a/src/DbInteractions/Services/DbInteractionService.cs +++ b/src/DbInteractions/Services/DbInteractionService.cs @@ -522,4 +522,68 @@ WHERE Status <> 'Merged' } } } + + public async Task GetLastProcessedOfflocFileName() + { + SqlConnection offlocConn = new(offlocPictureConnString); + + using (offlocConn) + { + await offlocConn.OpenAsync(); + + var command = new SqlCommand(@" + SELECT TOP 1 FileName + FROM OfflocRunningPicture.ProcessedFiles + ORDER BY ValidFrom DESC", offlocConn) + { + CommandType = CommandType.Text, + CommandTimeout = 1200 + }; + + try + { +#pragma warning disable CS8605 // Unboxing a possibly null value. + string? result = (string?)await command.ExecuteScalarAsync(); +#pragma warning restore CS8605 // Unboxing a possibly null value. + return result; + } + catch (SqlException e) + { + statusService.StatusPublish(new StatusUpdateMessage(e.Message)); + return null; + } + } + } + + public async Task GetLastProcessedDeliusFileName() + { + SqlConnection deliusConn = new(deliusPictureConnString); + + using (deliusConn) + { + await deliusConn.OpenAsync(); + + var command = new SqlCommand(@" + SELECT TOP 1 FileName + FROM DeliusRunningPicture.ProcessedFiles + ORDER BY ValidFrom DESC", deliusConn) + { + CommandType = CommandType.Text, + CommandTimeout = 1200 + }; + + try + { +#pragma warning disable CS8605 // Unboxing a possibly null value. + string? result = (string?)await command.ExecuteScalarAsync(); +#pragma warning restore CS8605 // Unboxing a possibly null value. + return result; + } + catch (SqlException e) + { + statusService.StatusPublish(new StatusUpdateMessage(e.Message)); + return null; + } + } + } } diff --git a/src/DbInteractions/Services/IDbInteractionService.cs b/src/DbInteractions/Services/IDbInteractionService.cs index 66f3f11..b2efff2 100644 --- a/src/DbInteractions/Services/IDbInteractionService.cs +++ b/src/DbInteractions/Services/IDbInteractionService.cs @@ -22,4 +22,6 @@ public interface IDbInteractionService Task AssociateOfflocFileWithArchive(string fileName, string archiveName); Task IsDeliusReadyForProcessing(); Task IsOfflocReadyForProcessing(); + Task GetLastProcessedOfflocFileName(); + Task GetLastProcessedDeliusFileName(); } diff --git a/src/FileSync/Configuration/SyncOptions.cs b/src/FileSync/Configuration/SyncOptions.cs index d1dfbbb..08cf935 100644 --- a/src/FileSync/Configuration/SyncOptions.cs +++ b/src/FileSync/Configuration/SyncOptions.cs @@ -42,4 +42,16 @@ public class SyncOptions /// A value less than or equal to zero disables interval-based processing. /// public required int ProcessTimerIntervalSeconds { get; init; } + + /// + /// Indicates whether the application should allow processing of files with timestamps + /// older than previously processed files. + /// + /// + /// When set to false, the application will throw an exception if it encounters a file + /// with a timestamp older than the most recently processed file, preventing potential + /// data inconsistencies. + /// When set to true, files will be processed regardless of their timestamp relative to previous runs. + /// + public bool AllowProcessingOlderFiles { get; init; } } \ No newline at end of file diff --git a/src/FileSync/FileSyncBackgroundService.cs b/src/FileSync/FileSyncBackgroundService.cs index 09e622f..c83541b 100644 --- a/src/FileSync/FileSyncBackgroundService.cs +++ b/src/FileSync/FileSyncBackgroundService.cs @@ -29,6 +29,8 @@ public class FileSyncBackgroundService( { private Timer? timer = null; + private readonly SemaphoreSlim _lock = new(1, 1); + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { logger.LogInformation("Starting service..."); @@ -47,7 +49,11 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) if (syncOptions.Value is { ProcessOnTimer: true, ProcessTimerIntervalSeconds: > 0 }) { timer = new Timer( - callback: async (state) => await ProcessAsync(stoppingToken), + callback: async (state) => + { + logger.LogInformation("Timer elapsed, begin processing..."); + await ProcessAsync(stoppingToken); + }, state: null, dueTime: TimeSpan.FromSeconds(syncOptions.Value.ProcessTimerIntervalSeconds), period: TimeSpan.FromSeconds(syncOptions.Value.ProcessTimerIntervalSeconds)); @@ -55,6 +61,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) if (syncOptions.Value.ProcessOnStartup) { + logger.LogInformation("Processing on startup configured, beginning processing..."); await ProcessAsync(stoppingToken); } @@ -73,36 +80,87 @@ async Task ProcessMessageAsync(Message message, CancellationToken cancellationTo async Task ProcessAsync(CancellationToken cancellationToken = default) { - var isDeliusReady = await IsDeliusReady(); - var isOfflocReady = await IsOfflocReady(); - - var notReady = (isDeliusReady, isOfflocReady) switch + if (await _lock.WaitAsync(0, cancellationToken) is false) { - (false, false) => "Delius and Offloc are not ready for processing.", - (false, true) => "Delius is not ready for processing.", - (true, false) => "Offloc is not ready for processing.", - _ => null - }; - - if (!string.IsNullOrEmpty(notReady)) - { - logger.LogWarning($"{notReady} Exiting."); + logger.LogInformation("Previous processing still ongoing. Skipping this run."); return; } - await PreKickoffTasks(); + try + { + logger.LogInformation("Processing..."); + + var isDeliusReady = await IsDeliusReady(); + var isOfflocReady = await IsOfflocReady(); - var unprocessedDeliusFile = await GetNextUnprocessedDeliusFileAsync(cancellationToken); - var unprocessedOfflocFile = await GetNextUnprocessedOfflocFileAsync(cancellationToken); + var notReady = (isDeliusReady, isOfflocReady) switch + { + (false, false) => "Delius and Offloc are not ready for processing.", + (false, true) => "Delius is not ready for processing.", + (true, false) => "Offloc is not ready for processing.", + _ => null + }; - // No files to process - if (unprocessedDeliusFile is null || unprocessedOfflocFile is null) + if (!string.IsNullOrEmpty(notReady)) + { + logger.LogWarning($"{notReady} Exiting."); + return; + } + + await PreKickoffTasks(); + + var unprocessedDeliusFile = await GetNextUnprocessedDeliusFileAsync(cancellationToken); + var unprocessedOfflocFile = await GetNextUnprocessedOfflocFileAsync(cancellationToken); + + // No files to process + if (unprocessedDeliusFile is null || unprocessedOfflocFile is null) + { + var notAvailable = (unprocessedDeliusFile, unprocessedOfflocFile) switch + { + (null, not null) => $"Delius file is not available, but Offloc file ({unprocessedOfflocFile.Name}) is ready.", + (not null, null) => $"Offloc file is not available, but Delius file ({unprocessedDeliusFile.Name}) is ready.", + _ => null + }; + + if (!string.IsNullOrEmpty(notAvailable)) + { + logger.LogWarning($"{notAvailable} Exiting."); + } + + return; + } + + // Check to see if a file newer than the unprocessed files have already been processed. + if (syncOptions.Value.AllowProcessingOlderFiles is false) + { + var isDeliusFileNewerThanLastProcessed = await IsDeliusFileNewerThanLastProcessed(unprocessedDeliusFile, cancellationToken); + var isOfflocFileNewerThanLastProcessed = await IsOfflocFileNewerThanLastProcessed(unprocessedOfflocFile, cancellationToken); + + var isEitherFileOutdated = (isDeliusFileNewerThanLastProcessed, isOfflocFileNewerThanLastProcessed) switch + { + (false, true) => "Delius file is older than the last processed Delius file.", + (true, false) => "Offloc file is older than the last processed Offloc file.", + (false, false) => "Both Delius and Offloc files are older than their last processed files.", + _ => null + }; + + if (!string.IsNullOrEmpty(isEitherFileOutdated)) + { + logger.LogError($"{isEitherFileOutdated} Exiting."); + return; + } + } + + logger.LogInformation($"Targeting: Delius file ({unprocessedDeliusFile.Name}), Offloc file ({unprocessedOfflocFile.Name})"); + + stagingMessagingService.StagingPublish(new DeliusDownloadFinishedMessage(unprocessedDeliusFile.Name, unprocessedDeliusFile.GetFileId())); + stagingMessagingService.StagingPublish(new OfflocDownloadFinished(unprocessedOfflocFile.Name, unprocessedOfflocFile.GetFileId()!.Value, unprocessedOfflocFile.ParentArchiveName)); + } + finally { - return; + logger.LogInformation("Processing complete."); + _lock.Release(); } - - stagingMessagingService.StagingPublish(new DeliusDownloadFinishedMessage(unprocessedDeliusFile.Name, unprocessedDeliusFile.GetFileId())); - stagingMessagingService.StagingPublish(new OfflocDownloadFinished(unprocessedOfflocFile.Name, unprocessedOfflocFile.GetFileId()!.Value, unprocessedOfflocFile.ParentArchiveName)); } private async Task IsOfflocReady() @@ -160,7 +218,7 @@ private async Task IsDeliusReady() // If the downloaded file is an archive, extract it and check the contained file. // We only support archives containing a single Offloc (.dat) file. logger.LogInformation("Extracting Offloc archive: " + unprocessedOfflocFile.Name); - await ZipFile.ExtractToDirectoryAsync(downloadedFile, fileLocations.offlocInput, cancellationToken); + await ZipFile.ExtractToDirectoryAsync(downloadedFile, fileLocations.offlocInput, overwriteFiles: true, cancellationToken); File.Delete(downloadedFile); var filePath = Directory.GetFiles(fileLocations.offlocInput) @@ -229,6 +287,24 @@ private async Task PreKickoffTasks() await dbMessagingService.SendDbRequestAndWaitForResponse(new ClearDeliusStaging()); await dbMessagingService.SendDbRequestAndWaitForResponse(new ClearOfflocStaging()); LogStatus("Staging database tear down complete."); + + // Delete any files in input directories + LogStatus("Clearing input directories..."); + int counter = 0; + + foreach (var file in Directory.GetFiles(fileLocations.deliusInput)) + { + File.Delete(file); + counter++; + } + + foreach (var file in Directory.GetFiles(fileLocations.offlocInput)) + { + File.Delete(file); + counter++; + } + + LogStatus("Input directories cleared. Deleted " + counter + " file(s)."); } void LogStatus(string message) @@ -241,4 +317,40 @@ public override void Dispose() { timer?.Dispose(); } + + public async Task IsDeliusFileNewerThanLastProcessed(DeliusFile unprocessedDeliusFile, CancellationToken cancellationToken = default) + { + var lastProcessedDeliusFileName = await dbMessagingService.SendDbRequestAndWaitForResponse(new()); + + if(!string.IsNullOrEmpty(lastProcessedDeliusFileName.fileName)) + { + var file = new DeliusFile(lastProcessedDeliusFileName.fileName); + + if(file.GetDatestamp() >= unprocessedDeliusFile.GetDatestamp()) + { + logger.LogWarning($"The last processed Delius file ({file.Name}) is newer than or the same as the targeted Delius file ({unprocessedDeliusFile.Name})."); + return false; + } + } + + return true; + } + + public async Task IsOfflocFileNewerThanLastProcessed(OfflocFile unprocessedOfflocFile, CancellationToken cancellationToken = default) + { + var lastProcessedOfflocFileName = await dbMessagingService.SendDbRequestAndWaitForResponse(new()); + + if(!string.IsNullOrEmpty(lastProcessedOfflocFileName.fileName)) + { + var file = new OfflocFile(lastProcessedOfflocFileName.fileName); + + if(file.GetDatestamp() >= unprocessedOfflocFile.GetDatestamp()) + { + logger.LogWarning($"The last processed Offloc file ({file.Name}) is newer than or the same as the targeted Offloc file ({unprocessedOfflocFile.Name})."); + return false; + } + } + + return true; + } } diff --git a/src/FileSync/appsettings.json b/src/FileSync/appsettings.json index 23eda9e..3ee8180 100644 --- a/src/FileSync/appsettings.json +++ b/src/FileSync/appsettings.json @@ -3,7 +3,8 @@ "ProcessOnStartup": true, "ProcessOnCompletion": true, "ProcessOnTimer": true, - "ProcessTimerIntervalSeconds": 30 + "ProcessTimerIntervalSeconds": 30, + "AllowProcessingOlderFiles": false }, "FileSourceProvider": "MinIO", // S3, FileSystem, MinIO // Required if using 'S3' file source provider diff --git a/src/Messaging/Messages/DbMessages/Receiving/ResultGetLastProcessedDeliusFileMessage.cs b/src/Messaging/Messages/DbMessages/Receiving/ResultGetLastProcessedDeliusFileMessage.cs new file mode 100644 index 0000000..e200fc2 --- /dev/null +++ b/src/Messaging/Messages/DbMessages/Receiving/ResultGetLastProcessedDeliusFileMessage.cs @@ -0,0 +1,23 @@ +using System.Text.Json.Serialization; +using Messaging.Messages.StatusMessages; +using Messaging.Queues; + +namespace Messaging.Messages.DbMessages.Receiving; + +public class ResultGetLastProcessedDeliusFileMessage : DbResponseMessage +{ + public string? fileName; + + public override StatusUpdateMessage StatusMessage => new(); + + [JsonConstructor] + public ResultGetLastProcessedDeliusFileMessage() + { + Queue = TDbQueue.ResultLastProcessedDeliusFile; + } + + public ResultGetLastProcessedDeliusFileMessage(string? fileName) : this() + { + this.fileName = fileName; + } +} diff --git a/src/Messaging/Messages/DbMessages/Receiving/ResultGetLastProcessedOfflocFileMessage.cs b/src/Messaging/Messages/DbMessages/Receiving/ResultGetLastProcessedOfflocFileMessage.cs new file mode 100644 index 0000000..975ad1f --- /dev/null +++ b/src/Messaging/Messages/DbMessages/Receiving/ResultGetLastProcessedOfflocFileMessage.cs @@ -0,0 +1,23 @@ +using System.Text.Json.Serialization; +using Messaging.Messages.StatusMessages; +using Messaging.Queues; + +namespace Messaging.Messages.DbMessages.Receiving; + +public class ResultGetLastProcessedOfflocFileMessage : DbResponseMessage +{ + public string? fileName; + + public override StatusUpdateMessage StatusMessage => new(); + + [JsonConstructor] + public ResultGetLastProcessedOfflocFileMessage() + { + Queue = TDbQueue.ResultLastProcessedOfflocFile; + } + + public ResultGetLastProcessedOfflocFileMessage(string? fileName) : this() + { + this.fileName = fileName; + } +} diff --git a/src/Messaging/Messages/DbMessages/Sending/GetLastProcessedDeliusFile.cs b/src/Messaging/Messages/DbMessages/Sending/GetLastProcessedDeliusFile.cs new file mode 100644 index 0000000..3781c47 --- /dev/null +++ b/src/Messaging/Messages/DbMessages/Sending/GetLastProcessedDeliusFile.cs @@ -0,0 +1,15 @@ +using Messaging.Messages.StatusMessages; +using Messaging.Queues; + +namespace Messaging.Messages.DbMessages.Sending; + +public class GetLastProcessedDeliusFile : DbRequestMessage +{ + public override StatusUpdateMessage StatusMessage => new("Getting last processed delius file."); + + public GetLastProcessedDeliusFile() + { + Queue = TDbQueue.GetLastProcessedDeliusFile; + ReplyQueue = TDbQueue.ResultLastProcessedDeliusFile; + } +} \ No newline at end of file diff --git a/src/Messaging/Messages/DbMessages/Sending/GetLastProcessedOfflocFile.cs b/src/Messaging/Messages/DbMessages/Sending/GetLastProcessedOfflocFile.cs new file mode 100644 index 0000000..8dfab26 --- /dev/null +++ b/src/Messaging/Messages/DbMessages/Sending/GetLastProcessedOfflocFile.cs @@ -0,0 +1,16 @@ +using Messaging.Messages.StatusMessages; +using Messaging.Queues; + +namespace Messaging.Messages.DbMessages.Sending; + +public class GetLastProcessedOfflocFile : DbRequestMessage +{ + public override StatusUpdateMessage StatusMessage => new("Getting last processed offloc file."); + + public GetLastProcessedOfflocFile() + { + Queue = TDbQueue.GetLastProcessedOfflocFile; + ReplyQueue = TDbQueue.ResultLastProcessedOfflocFile; + } + +} \ No newline at end of file diff --git a/src/Messaging/Queues/TDbQueue.cs b/src/Messaging/Queues/TDbQueue.cs index 3d589f5..ccd511a 100644 --- a/src/Messaging/Queues/TDbQueue.cs +++ b/src/Messaging/Queues/TDbQueue.cs @@ -22,6 +22,8 @@ public enum TDbQueue DeliusFileProcessingStarted, IsDeliusReadyForProcessing, IsOfflocReadyForProcessing, + GetLastProcessedDeliusFile, + GetLastProcessedOfflocFile, //Incoming requests. ReturnedOfflocFiles, ReturnedOfflocFileDates, @@ -38,5 +40,7 @@ public enum TDbQueue ResultDeliusFileProcessingStarted, ResultAssociateOfflocFileWithArchive, IsDeliusReadyForProcessingResult, - IsOfflocReadyForProcessingResult + IsOfflocReadyForProcessingResult, + ResultLastProcessedDeliusFile, + ResultLastProcessedOfflocFile }