diff --git a/KernelMemory.FileWatcher/Services/HttpWorker.cs b/KernelMemory.FileWatcher/Services/HttpWorker.cs index c4440de..bbe40c4 100644 --- a/KernelMemory.FileWatcher/Services/HttpWorker.cs +++ b/KernelMemory.FileWatcher/Services/HttpWorker.cs @@ -3,6 +3,8 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using System.Threading; +using Serilog.Data; namespace KernelMemory.FileWatcher.Services { @@ -28,43 +30,63 @@ public async Task StartAsync(CancellationToken cancellationToken) timer = new PeriodicTimer(options.Schedule); while (await timer.WaitForNextTickAsync(cancellationToken)) { - while (store.HasNext()) + var messages = store.TakeAll(); + if (messages.Any()) { - var message = store.TakeNext(); - if (message != null && message.Event?.EventType != FileEventType.Ignore) - { - var client = httpClientFactory.CreateClient("km-client"); - string endpoint; - HttpResponseMessage response = null!; + var tasks = new List(); - if (message.Event is { EventType: FileEventType.Upsert }) + foreach (var message in messages) + { + tasks.Add(async () => { - endpoint = "/upload"; + if (message.Event?.EventType != FileEventType.Ignore) + { + logger.LogInformation($"Processing message {message.DocumentId} for file {message.Event?.FileName} of type {message.Event?.EventType}"); + var client = httpClientFactory.CreateClient("km-client"); + string endpoint; + HttpResponseMessage? response = null; - var content = new MultipartFormDataContent(); - var fileContent = new StreamContent(File.OpenRead(message.Event.Directory)); - content.Add(fileContent, "file", message.Event.FileName); - content.Add(new StringContent(message.Index), "index"); - content.Add(new StringContent(message.DocumentId), "documentid"); - response = await client.PostAsync(endpoint, content, cancellationToken); - } - else if (message.Event is { EventType: FileEventType.Delete }) - { - endpoint = $"/documents?index={message.Index}&documentId={message.DocumentId}"; - response = await client.DeleteAsync(endpoint, cancellationToken); - } + if (message.Event is { EventType: FileEventType.Upsert }) + { + endpoint = "/upload"; - if (response.IsSuccessStatusCode) - { - logger.LogInformation($"Sent message {message.DocumentId} to {options.Endpoint}"); - } - else - { - logger.LogError($"Failed to send message {message.DocumentId} to {options.Endpoint}"); - } + var content = new MultipartFormDataContent(); + var fileContent = new StreamContent(File.OpenRead(message.Event.Directory)); + content.Add(fileContent, "file", message.Event.FileName); + content.Add(new StringContent(message.Index), "index"); + content.Add(new StringContent(message.DocumentId), "documentid"); + response = await client.PostAsync(endpoint, content); + } + else if (message.Event is { EventType: FileEventType.Delete }) + { + endpoint = $"/documents?index={message.Index}&documentId={message.DocumentId}"; + response = await client.DeleteAsync(endpoint); + } + + if (response is { IsSuccessStatusCode: true }) + { + logger.LogInformation($"Sent message {message.DocumentId} to {options.Endpoint}"); + } + else + { + logger.LogError($"Failed to send message {message.DocumentId} to {options.Endpoint}"); + } + } + }); } - } + var parallelOptions = new ParallelOptions + { + CancellationToken = cancellationToken, + MaxDegreeOfParallelism = options.ParallelUploads + }; + Parallel.Invoke(parallelOptions, tasks.ToArray()); + + } + else + { + logger.LogInformation("Nothing to process"); + } } } diff --git a/KernelMemory.FileWatcher/Services/MessageStore.cs b/KernelMemory.FileWatcher/Services/MessageStore.cs index 01514d6..062a5d7 100644 --- a/KernelMemory.FileWatcher/Services/MessageStore.cs +++ b/KernelMemory.FileWatcher/Services/MessageStore.cs @@ -12,6 +12,7 @@ internal interface IMessageStore { Task Add(FileEvent fileEvent); public Message? TakeNext(); + public List TakeAll(); public bool HasNext(); } @@ -63,6 +64,18 @@ public Task Add(FileEvent fileEvent) return store.TryRemove(store.Keys.First(), out var message) ? message : null; } + public List TakeAll() + { + var result = new List(); + foreach (var key in store.Keys) + { + store.TryRemove(key, out var message); + if(message!=null) { result.Add(message); } + } + + return result; + } + public bool HasNext() { return store.Count > 0;