Skip to content

Commit

Permalink
parallel upload
Browse files Browse the repository at this point in the history
  • Loading branch information
mplogas committed Mar 17, 2024
1 parent cb236af commit a14b905
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 30 deletions.
82 changes: 52 additions & 30 deletions KernelMemory.FileWatcher/Services/HttpWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Threading;
using Serilog.Data;

namespace KernelMemory.FileWatcher.Services
{
Expand All @@ -28,43 +30,63 @@ public async Task StartAsync(CancellationToken cancellationToken)
timer = new PeriodicTimer(options.Schedule);
while (await timer.WaitForNextTickAsync(cancellationToken))
{
while (store.HasNext())
var messages = store.TakeAll();
if (messages.Any())
{
var message = store.TakeNext();
if (message != null && message.Event?.EventType != FileEventType.Ignore)
{
var client = httpClientFactory.CreateClient("km-client");
string endpoint;
HttpResponseMessage response = null!;
var tasks = new List<Action>();

if (message.Event is { EventType: FileEventType.Upsert })
foreach (var message in messages)
{
tasks.Add(async () =>
{
endpoint = "/upload";
if (message.Event?.EventType != FileEventType.Ignore)
{
logger.LogInformation($"Processing message {message.DocumentId} for file {message.Event?.FileName} of type {message.Event?.EventType}");
var client = httpClientFactory.CreateClient("km-client");
string endpoint;
HttpResponseMessage? response = null;

var content = new MultipartFormDataContent();
var fileContent = new StreamContent(File.OpenRead(message.Event.Directory));
content.Add(fileContent, "file", message.Event.FileName);
content.Add(new StringContent(message.Index), "index");
content.Add(new StringContent(message.DocumentId), "documentid");
response = await client.PostAsync(endpoint, content, cancellationToken);
}
else if (message.Event is { EventType: FileEventType.Delete })
{
endpoint = $"/documents?index={message.Index}&documentId={message.DocumentId}";
response = await client.DeleteAsync(endpoint, cancellationToken);
}
if (message.Event is { EventType: FileEventType.Upsert })
{
endpoint = "/upload";

if (response.IsSuccessStatusCode)
{
logger.LogInformation($"Sent message {message.DocumentId} to {options.Endpoint}");
}
else
{
logger.LogError($"Failed to send message {message.DocumentId} to {options.Endpoint}");
}
var content = new MultipartFormDataContent();
var fileContent = new StreamContent(File.OpenRead(message.Event.Directory));
content.Add(fileContent, "file", message.Event.FileName);
content.Add(new StringContent(message.Index), "index");
content.Add(new StringContent(message.DocumentId), "documentid");
response = await client.PostAsync(endpoint, content);
}
else if (message.Event is { EventType: FileEventType.Delete })
{
endpoint = $"/documents?index={message.Index}&documentId={message.DocumentId}";
response = await client.DeleteAsync(endpoint);
}

if (response is { IsSuccessStatusCode: true })
{
logger.LogInformation($"Sent message {message.DocumentId} to {options.Endpoint}");
}
else
{
logger.LogError($"Failed to send message {message.DocumentId} to {options.Endpoint}");
}
}
});
}
}

var parallelOptions = new ParallelOptions
{
CancellationToken = cancellationToken,
MaxDegreeOfParallelism = options.ParallelUploads
};
Parallel.Invoke(parallelOptions, tasks.ToArray());

}
else
{
logger.LogInformation("Nothing to process");
}
}
}

Expand Down
13 changes: 13 additions & 0 deletions KernelMemory.FileWatcher/Services/MessageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ internal interface IMessageStore
{
Task Add(FileEvent fileEvent);
public Message? TakeNext();
public List<Message> TakeAll();
public bool HasNext();
}

Expand Down Expand Up @@ -63,6 +64,18 @@ public Task Add(FileEvent fileEvent)
return store.TryRemove(store.Keys.First(), out var message) ? message : null;
}

public List<Message> TakeAll()
{
var result = new List<Message>();
foreach (var key in store.Keys)
{
store.TryRemove(key, out var message);
if(message!=null) { result.Add(message); }
}

return result;
}

public bool HasNext()
{
return store.Count > 0;
Expand Down

0 comments on commit a14b905

Please sign in to comment.