Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/DbInteractions/DbBackgroundService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,18 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
bool result = await dbInteractionService.IsOfflocReadyForProcessing();
dbMessagingService.DbPublishResponse(new IsOfflocReadyForProcessingReturnMessage(result));
}, TDbQueue.IsOfflocReadyForProcessing);

dbMessagingService.SubscribeToDbRequest<GetLastProcessedOfflocFile>(async (message) =>
{
string? result = await dbInteractionService.GetLastProcessedOfflocFileName();
dbMessagingService.DbPublishResponse(new ResultGetLastProcessedOfflocFileMessage(result));
}, TDbQueue.GetLastProcessedOfflocFile);

dbMessagingService.SubscribeToDbRequest<GetLastProcessedDeliusFile>(async (message) =>
{
string? result = await dbInteractionService.GetLastProcessedDeliusFileName();
dbMessagingService.DbPublishResponse(new ResultGetLastProcessedDeliusFileMessage(result));
}, TDbQueue.GetLastProcessedDeliusFile);

}
}
64 changes: 64 additions & 0 deletions src/DbInteractions/Services/DbInteractionService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -522,4 +522,68 @@ WHERE Status <> 'Merged'
}
}
}

public async Task<string?> GetLastProcessedOfflocFileName()
{
SqlConnection offlocConn = new(offlocPictureConnString);

using (offlocConn)
{
await offlocConn.OpenAsync();

var command = new SqlCommand(@"
SELECT TOP 1 FileName
FROM OfflocRunningPicture.ProcessedFiles
ORDER BY ValidFrom DESC", offlocConn)
{
CommandType = CommandType.Text,
CommandTimeout = 1200
};

try
{
#pragma warning disable CS8605 // Unboxing a possibly null value.
string? result = (string?)await command.ExecuteScalarAsync();
#pragma warning restore CS8605 // Unboxing a possibly null value.
return result;
}
catch (SqlException e)
{
statusService.StatusPublish(new StatusUpdateMessage(e.Message));
return null;
}
}
}

public async Task<string?> GetLastProcessedDeliusFileName()
{
SqlConnection deliusConn = new(deliusPictureConnString);

using (deliusConn)
{
await deliusConn.OpenAsync();

var command = new SqlCommand(@"
SELECT TOP 1 FileName
FROM DeliusRunningPicture.ProcessedFiles
ORDER BY ValidFrom DESC", deliusConn)
{
CommandType = CommandType.Text,
CommandTimeout = 1200
};

try
{
#pragma warning disable CS8605 // Unboxing a possibly null value.
string? result = (string?)await command.ExecuteScalarAsync();
#pragma warning restore CS8605 // Unboxing a possibly null value.
return result;
}
catch (SqlException e)
{
statusService.StatusPublish(new StatusUpdateMessage(e.Message));
return null;
}
}
}
}
2 changes: 2 additions & 0 deletions src/DbInteractions/Services/IDbInteractionService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ public interface IDbInteractionService
Task AssociateOfflocFileWithArchive(string fileName, string archiveName);
Task<bool> IsDeliusReadyForProcessing();
Task<bool> IsOfflocReadyForProcessing();
Task<string?> GetLastProcessedOfflocFileName();
Task<string?> GetLastProcessedDeliusFileName();
}
12 changes: 12 additions & 0 deletions src/FileSync/Configuration/SyncOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,16 @@ public class SyncOptions
/// A value less than or equal to zero disables interval-based processing.
/// </value>
public required int ProcessTimerIntervalSeconds { get; init; }

/// <summary>
/// Indicates whether the application should allow processing of files with timestamps
/// older than previously processed files.
/// </summary>
/// <remarks>
/// When set to <c>false</c>, the application will throw an exception if it encounters a file
/// with a timestamp older than the most recently processed file, preventing potential
/// data inconsistencies.
/// When set to <c>true</c>, files will be processed regardless of their timestamp relative to previous runs.
/// </remarks>
public bool AllowProcessingOlderFiles { get; init; }
}
160 changes: 136 additions & 24 deletions src/FileSync/FileSyncBackgroundService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class FileSyncBackgroundService(
{
private Timer? timer = null;

private readonly SemaphoreSlim _lock = new(1, 1);

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
logger.LogInformation("Starting service...");
Expand All @@ -47,14 +49,19 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
if (syncOptions.Value is { ProcessOnTimer: true, ProcessTimerIntervalSeconds: > 0 })
{
timer = new Timer(
callback: async (state) => await ProcessAsync(stoppingToken),
callback: async (state) =>
{
logger.LogInformation("Timer elapsed, begin processing...");
await ProcessAsync(stoppingToken);
},
state: null,
dueTime: TimeSpan.FromSeconds(syncOptions.Value.ProcessTimerIntervalSeconds),
period: TimeSpan.FromSeconds(syncOptions.Value.ProcessTimerIntervalSeconds));
}

if (syncOptions.Value.ProcessOnStartup)
{
logger.LogInformation("Processing on startup configured, beginning processing...");
await ProcessAsync(stoppingToken);
}

Expand All @@ -73,36 +80,87 @@ async Task ProcessMessageAsync(Message message, CancellationToken cancellationTo

async Task ProcessAsync(CancellationToken cancellationToken = default)
{
var isDeliusReady = await IsDeliusReady();
var isOfflocReady = await IsOfflocReady();

var notReady = (isDeliusReady, isOfflocReady) switch
if (await _lock.WaitAsync(0, cancellationToken) is false)
{
(false, false) => "Delius and Offloc are not ready for processing.",
(false, true) => "Delius is not ready for processing.",
(true, false) => "Offloc is not ready for processing.",
_ => null
};

if (!string.IsNullOrEmpty(notReady))
{
logger.LogWarning($"{notReady} Exiting.");
logger.LogInformation("Previous processing still ongoing. Skipping this run.");
return;
}

await PreKickoffTasks();
try
{
logger.LogInformation("Processing...");

var isDeliusReady = await IsDeliusReady();
var isOfflocReady = await IsOfflocReady();

var unprocessedDeliusFile = await GetNextUnprocessedDeliusFileAsync(cancellationToken);
var unprocessedOfflocFile = await GetNextUnprocessedOfflocFileAsync(cancellationToken);
var notReady = (isDeliusReady, isOfflocReady) switch
{
(false, false) => "Delius and Offloc are not ready for processing.",
(false, true) => "Delius is not ready for processing.",
(true, false) => "Offloc is not ready for processing.",
_ => null
};

// No files to process
if (unprocessedDeliusFile is null || unprocessedOfflocFile is null)
if (!string.IsNullOrEmpty(notReady))
{
logger.LogWarning($"{notReady} Exiting.");
return;
}

await PreKickoffTasks();

var unprocessedDeliusFile = await GetNextUnprocessedDeliusFileAsync(cancellationToken);
var unprocessedOfflocFile = await GetNextUnprocessedOfflocFileAsync(cancellationToken);

// No files to process
if (unprocessedDeliusFile is null || unprocessedOfflocFile is null)
{
var notAvailable = (unprocessedDeliusFile, unprocessedOfflocFile) switch
{
(null, not null) => $"Delius file is not available, but Offloc file ({unprocessedOfflocFile.Name}) is ready.",
(not null, null) => $"Offloc file is not available, but Delius file ({unprocessedDeliusFile.Name}) is ready.",
_ => null
};

if (!string.IsNullOrEmpty(notAvailable))
{
logger.LogWarning($"{notAvailable} Exiting.");
}

return;
}

// Check to see if a file newer than the unprocessed files have already been processed.
if (syncOptions.Value.AllowProcessingOlderFiles is false)
{
var isDeliusFileNewerThanLastProcessed = await IsDeliusFileNewerThanLastProcessed(unprocessedDeliusFile, cancellationToken);
var isOfflocFileNewerThanLastProcessed = await IsOfflocFileNewerThanLastProcessed(unprocessedOfflocFile, cancellationToken);

var isEitherFileOutdated = (isDeliusFileNewerThanLastProcessed, isOfflocFileNewerThanLastProcessed) switch
{
(false, true) => "Delius file is older than the last processed Delius file.",
(true, false) => "Offloc file is older than the last processed Offloc file.",
(false, false) => "Both Delius and Offloc files are older than their last processed files.",
_ => null
};

if (!string.IsNullOrEmpty(isEitherFileOutdated))
{
logger.LogError($"{isEitherFileOutdated} Exiting.");
return;
}
}

logger.LogInformation($"Targeting: Delius file ({unprocessedDeliusFile.Name}), Offloc file ({unprocessedOfflocFile.Name})");

stagingMessagingService.StagingPublish(new DeliusDownloadFinishedMessage(unprocessedDeliusFile.Name, unprocessedDeliusFile.GetFileId()));
stagingMessagingService.StagingPublish(new OfflocDownloadFinished(unprocessedOfflocFile.Name, unprocessedOfflocFile.GetFileId()!.Value, unprocessedOfflocFile.ParentArchiveName));
}
finally
{
return;
logger.LogInformation("Processing complete.");
_lock.Release();
}

stagingMessagingService.StagingPublish(new DeliusDownloadFinishedMessage(unprocessedDeliusFile.Name, unprocessedDeliusFile.GetFileId()));
stagingMessagingService.StagingPublish(new OfflocDownloadFinished(unprocessedOfflocFile.Name, unprocessedOfflocFile.GetFileId()!.Value, unprocessedOfflocFile.ParentArchiveName));
}

private async Task<bool> IsOfflocReady()
Expand Down Expand Up @@ -160,7 +218,7 @@ private async Task<bool> IsDeliusReady()
// If the downloaded file is an archive, extract it and check the contained file.
// We only support archives containing a single Offloc (.dat) file.
logger.LogInformation("Extracting Offloc archive: " + unprocessedOfflocFile.Name);
await ZipFile.ExtractToDirectoryAsync(downloadedFile, fileLocations.offlocInput, cancellationToken);
await ZipFile.ExtractToDirectoryAsync(downloadedFile, fileLocations.offlocInput, overwriteFiles: true, cancellationToken);
File.Delete(downloadedFile);

var filePath = Directory.GetFiles(fileLocations.offlocInput)
Expand Down Expand Up @@ -229,6 +287,24 @@ private async Task PreKickoffTasks()
await dbMessagingService.SendDbRequestAndWaitForResponse<ClearDeliusStaging, ResultClearDeliusStaging>(new ClearDeliusStaging());
await dbMessagingService.SendDbRequestAndWaitForResponse<ClearOfflocStaging, ResultClearOfflocStaging>(new ClearOfflocStaging());
LogStatus("Staging database tear down complete.");

// Delete any files in input directories
LogStatus("Clearing input directories...");
int counter = 0;

foreach (var file in Directory.GetFiles(fileLocations.deliusInput))
{
File.Delete(file);
counter++;
}

foreach (var file in Directory.GetFiles(fileLocations.offlocInput))
{
File.Delete(file);
counter++;
}

LogStatus("Input directories cleared. Deleted " + counter + " file(s).");
}

void LogStatus(string message)
Expand All @@ -241,4 +317,40 @@ public override void Dispose()
{
timer?.Dispose();
}

public async Task<bool> IsDeliusFileNewerThanLastProcessed(DeliusFile unprocessedDeliusFile, CancellationToken cancellationToken = default)
{
var lastProcessedDeliusFileName = await dbMessagingService.SendDbRequestAndWaitForResponse<GetLastProcessedDeliusFile, ResultGetLastProcessedDeliusFileMessage>(new());

if(!string.IsNullOrEmpty(lastProcessedDeliusFileName.fileName))
{
var file = new DeliusFile(lastProcessedDeliusFileName.fileName);

if(file.GetDatestamp() >= unprocessedDeliusFile.GetDatestamp())
{
logger.LogWarning($"The last processed Delius file ({file.Name}) is newer than or the same as the targeted Delius file ({unprocessedDeliusFile.Name}).");
return false;
}
}

return true;
}

public async Task<bool> IsOfflocFileNewerThanLastProcessed(OfflocFile unprocessedOfflocFile, CancellationToken cancellationToken = default)
{
var lastProcessedOfflocFileName = await dbMessagingService.SendDbRequestAndWaitForResponse<GetLastProcessedOfflocFile, ResultGetLastProcessedOfflocFileMessage>(new());

if(!string.IsNullOrEmpty(lastProcessedOfflocFileName.fileName))
{
var file = new OfflocFile(lastProcessedOfflocFileName.fileName);

if(file.GetDatestamp() >= unprocessedOfflocFile.GetDatestamp())
{
logger.LogWarning($"The last processed Offloc file ({file.Name}) is newer than or the same as the targeted Offloc file ({unprocessedOfflocFile.Name}).");
return false;
}
}

return true;
}
}
3 changes: 2 additions & 1 deletion src/FileSync/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
"ProcessOnStartup": true,
"ProcessOnCompletion": true,
"ProcessOnTimer": true,
"ProcessTimerIntervalSeconds": 30
"ProcessTimerIntervalSeconds": 30,
"AllowProcessingOlderFiles": false
},
"FileSourceProvider": "MinIO", // S3, FileSystem, MinIO
// Required if using 'S3' file source provider
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using System.Text.Json.Serialization;
using Messaging.Messages.StatusMessages;
using Messaging.Queues;

namespace Messaging.Messages.DbMessages.Receiving;

public class ResultGetLastProcessedDeliusFileMessage : DbResponseMessage
{
public string? fileName;

public override StatusUpdateMessage StatusMessage => new();

[JsonConstructor]
public ResultGetLastProcessedDeliusFileMessage()
{
Queue = TDbQueue.ResultLastProcessedDeliusFile;
}

public ResultGetLastProcessedDeliusFileMessage(string? fileName) : this()
{
this.fileName = fileName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using System.Text.Json.Serialization;
using Messaging.Messages.StatusMessages;
using Messaging.Queues;

namespace Messaging.Messages.DbMessages.Receiving;

public class ResultGetLastProcessedOfflocFileMessage : DbResponseMessage
{
public string? fileName;

public override StatusUpdateMessage StatusMessage => new();

[JsonConstructor]
public ResultGetLastProcessedOfflocFileMessage()
{
Queue = TDbQueue.ResultLastProcessedOfflocFile;
}

public ResultGetLastProcessedOfflocFileMessage(string? fileName) : this()
{
this.fileName = fileName;
}
}
Loading