Skip to content

Commit

Permalink
feat: service discovery and manual config
Browse files Browse the repository at this point in the history
  • Loading branch information
iPromKnight committed Nov 16, 2024
1 parent 79e12e1 commit b39b6a3
Show file tree
Hide file tree
Showing 19 changed files with 297 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ namespace Zilean.ApiService.Features.Torrents;
public static class TorrentsEndpoints
{
private const string GroupName = "torrents";
private const string Scrape = "/scrape";
private const string Scrape = "/all";

public static WebApplication MapTorrentsEndpoints(this WebApplication app, ZileanConfiguration configuration)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
using Zilean.Scraper.Features.Dmm;
using Zilean.Scraper.Features.Imdb;
using Zilean.Scraper.Features.PythonSupport;

namespace Zilean.Scraper.Features.Bootstrapping;

public static class ServiceCollectionExtensions
{
public static void AddDmmScraper(this IServiceCollection services, IConfiguration configuration)
public static void AddScrapers(this IServiceCollection services, IConfiguration configuration)
{
var zileanConfiguration = configuration.GetZileanConfiguration();

services.AddSingleton(zileanConfiguration);
services.AddImdbServices();
services.AddDmmServices();
services.AddGenericServices();
services.AddZileanDataServices(zileanConfiguration);
services.AddSingleton<ParseTorrentNameService>();
services.AddHostedService<ServiceLifetime>();
Expand All @@ -27,6 +24,13 @@ private static void AddDmmServices(this IServiceCollection services)
services.AddTransient<DmmService>();
}

// Registers the generic ingestion pipeline (scraper + processor) and the
// Kubernetes service-discovery helper; all are stateless coordinators,
// hence singleton lifetime.
private static void AddGenericServices(this IServiceCollection services)
{
    services
        .AddSingleton<GenericIngestionScraping>()
        .AddSingleton<GenericIngestionProcessor>()
        .AddSingleton<KubernetesServiceDiscovery>();
}

private static void AddImdbServices(this IServiceCollection services)
{
services.AddSingleton<ImdbFileService>();
Expand Down
3 changes: 0 additions & 3 deletions src/Zilean.Scraper/Features/Bootstrapping/ServiceLifetime.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
using Zilean.Scraper.Features.Dmm;
using Zilean.Scraper.Features.Imdb;

namespace Zilean.Scraper.Features.Bootstrapping;

public class ServiceLifetime(ImdbMetadataLoader metadataLoader, DmmScraping dmmScraper, IServiceProvider serviceProvider, ILogger<ServiceLifetime> logger, ZileanConfiguration configuration) : IHostedLifecycleService
Expand Down
96 changes: 0 additions & 96 deletions src/Zilean.Scraper/Features/Dmm/ZileanScraper.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Zilean.Scraper.Features.Dmm;
namespace Zilean.Scraper.Features.Ingestion;

public class DmmFileDownloader(ILogger<DmmFileDownloader> logger, ZileanConfiguration configuration)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using Zilean.Scraper.Features.LzString;

namespace Zilean.Scraper.Features.Dmm;
namespace Zilean.Scraper.Features.Ingestion;

public partial class DmmPageProcessor(DmmSyncState state)
: IDisposable
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using Zilean.Scraper.Features.PythonSupport;

namespace Zilean.Scraper.Features.Dmm;
namespace Zilean.Scraper.Features.Ingestion;

public class DmmScraping(
DmmSyncState dmmState,
Expand Down Expand Up @@ -111,7 +109,7 @@ await AnsiConsole.Progress()

var parsedTorrents = await parseTorrentNameService.ParseAndPopulateAsync(distinctTorrents);

var finalizedTorrents = parsedTorrents.Where(WipeSomeTissue).ToList();
var finalizedTorrents = parsedTorrents.Where(torrentInfo => torrentInfo.WipeSomeTissue()).ToList();

await torrentInfoService.StoreTorrentInfo(finalizedTorrents);
}
Expand Down Expand Up @@ -156,7 +154,7 @@ await Parallel.ForEachAsync(files, parallelOptions, async (file, ct) =>

var parsedTorrents = await parseTorrentNameService.ParseAndPopulateAsync(distinctTorrents);

var finalizedTorrents = parsedTorrents.Where(WipeSomeTissue).ToList();
var finalizedTorrents = parsedTorrents.Where(torrentInfo => torrentInfo.WipeSomeTissue()).ToList();

logger.LogInformation("Parsed {Count} torrents", finalizedTorrents.Count);

Expand Down Expand Up @@ -191,9 +189,4 @@ private static async IAsyncEnumerable<ExtractedDmmEntry> ProcessFileAsync(string
yield return torrent;
}
}

private static bool WipeSomeTissue(TorrentInfo torrent) =>
!((torrent.RawTitle.Contains(" xxx ", StringComparison.OrdinalIgnoreCase) ||
torrent.RawTitle.Contains(" xx ", StringComparison.OrdinalIgnoreCase)) &&
!torrent.ParsedTitle.Contains("XXX", StringComparison.OrdinalIgnoreCase));
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Zilean.Scraper.Features.Dmm;
namespace Zilean.Scraper.Features.Ingestion;

public class DmmSyncState(ILogger<DmmSyncState> logger, DmmService dmmService)
{
Expand Down
115 changes: 115 additions & 0 deletions src/Zilean.Scraper/Features/Ingestion/GenericIngestionProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
namespace Zilean.Scraper.Features.Ingestion;

public class GenericIngestionProcessor(
    IHttpClientFactory clientFactory,
    ILogger<GenericIngestionProcessor> logger,
    ParseTorrentNameService parseTorrentNameService,
    TorrentInfoService torrentInfoService,
    ZileanConfiguration configuration)
{
    /// <summary>
    /// Streams torrent entries from <paramref name="url"/> and stores them in batches.
    /// A bounded producer/consumer channel caps in-flight items at
    /// <c>Ingestion.MaxChannelSize</c> while the HTTP response is deserialized incrementally.
    /// </summary>
    /// <param name="url">Endpoint returning a JSON array of <c>StreamedEntry</c> items.</param>
    /// <param name="cancellationToken">Propagated to the HTTP request, channel, and batch processing.</param>
    public async Task ProcessTorrentsAsync(string url, CancellationToken cancellationToken = default)
    {
        logger.LogInformation("Processing URL: {Url}", url);

        // Items are fully materialized before being written, so the channel carries
        // StreamedEntry directly; the previous Task<StreamedEntry> wrapping
        // (Task.FromResult per item + Task.WhenEach in the consumer) added an
        // allocation per entry without any concurrency benefit.
        var channel = Channel.CreateBounded<StreamedEntry>(new BoundedChannelOptions(configuration.Ingestion.MaxChannelSize)
        {
            SingleReader = true,
            SingleWriter = false,
            FullMode = BoundedChannelFullMode.Wait
        });

        var producerTask = ProduceAsync(url, channel.Writer, cancellationToken);
        var consumerTask = ConsumeAsync(channel.Reader, configuration.Ingestion.BatchSize, cancellationToken);
        await Task.WhenAll(producerTask, consumerTask);
    }

    // Downloads the payload and writes each deserialized entry into the channel.
    // The writer is always completed (finally) so the consumer can drain and finish
    // even when the producer faults; the fault still propagates via Task.WhenAll.
    private async Task ProduceAsync(string url, ChannelWriter<StreamedEntry> writer, CancellationToken cancellationToken = default)
    {
        try
        {
            var httpClient = clientFactory.CreateClient();
            // ResponseHeadersRead lets deserialization start before the body finishes downloading.
            var response = await httpClient.GetAsync(url, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
            response.EnsureSuccessStatusCode();

            var stream = await response.Content.ReadAsStreamAsync(cancellationToken);
            var options = new JsonSerializerOptions
            {
                PropertyNameCaseInsensitive = true
            };

            await foreach (var item in JsonSerializer.DeserializeAsyncEnumerable<StreamedEntry>(stream, options, cancellationToken))
            {
                if (item is not null)
                {
                    await writer.WriteAsync(item, cancellationToken);
                }
            }
        }
        catch (Exception ex)
        {
            // Pass the exception to the logger so the failure cause is not lost.
            logger.LogWarning(ex, "Error processing item");
            throw;
        }
        finally
        {
            writer.Complete();
        }
    }

    // Drains the channel, accumulating entries into fixed-size batches; a trailing
    // partial batch is flushed after the producer completes the channel.
    private async Task ConsumeAsync(ChannelReader<StreamedEntry> reader, int batchSize, CancellationToken cancellationToken = default)
    {
        var batch = new List<StreamedEntry>(batchSize);

        await foreach (var entry in reader.ReadAllAsync(cancellationToken))
        {
            batch.Add(entry);

            if (batch.Count < batchSize)
            {
                continue;
            }

            await ProcessBatch(batch, cancellationToken);
            batch.Clear();
        }

        if (batch.Count > 0)
        {
            await ProcessBatch(batch, cancellationToken);
        }
    }

    // Converts a batch to ExtractedDmmEntry, parses titles, filters, and stores.
    // Failures are logged with the exception and rethrown to fault the consumer.
    private async Task ProcessBatch(List<StreamedEntry> batch, CancellationToken cancellationToken = default)
    {
        try
        {
            if (batch.Count == 0 || cancellationToken.IsCancellationRequested)
            {
                return;
            }

            // Copy out of `batch` — the caller reuses and clears that list.
            var torrents = batch.ConvertAll(ExtractedDmmEntry.FromStreamedEntry);

            logger.LogInformation("Processing batch of {Count} torrents", torrents.Count);

            var parsedTorrents = await parseTorrentNameService.ParseAndPopulateAsync(torrents);
            var finalizedTorrents = parsedTorrents.Where(torrentInfo => torrentInfo.WipeSomeTissue()).ToList();
            logger.LogInformation("Parsed {Count} torrents", finalizedTorrents.Count);
            await torrentInfoService.StoreTorrentInfo(finalizedTorrents);
        }
        catch (Exception ex)
        {
            logger.LogWarning(ex, "Error processing batch of torrents. Batch size: {BatchSize}", batch.Count);
            throw;
        }
    }
}
67 changes: 67 additions & 0 deletions src/Zilean.Scraper/Features/Ingestion/GenericIngestionScraping.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
namespace Zilean.Scraper.Features.Ingestion;

public class GenericIngestionScraping(
    ZileanConfiguration configuration,
    GenericIngestionProcessor ingestionProcessor,
    ILogger<GenericIngestionScraping> logger,
    KubernetesServiceDiscovery kubernetesServiceDiscovery)
{
    /// <summary>
    /// Collects ingestion URLs — Kubernetes service discovery (when enabled) plus the
    /// manually configured Zurg and Zilean instances — and processes each sequentially.
    /// Per-URL failures are logged and skipped so one bad endpoint does not abort the run.
    /// </summary>
    /// <returns>Always 0 (success exit code); progress is reported via logging.</returns>
    public async Task<int> Execute(CancellationToken cancellationToken)
    {
        logger.LogInformation("Starting ingestion scraping");

        List<string> urlsToProcess = [];

        if (configuration.Ingestion.Kubernetes.EnableServiceDiscovery)
        {
            logger.LogInformation("Discovering URLs from Kubernetes services");
            var urls = await kubernetesServiceDiscovery.DiscoverUrlsAsync(cancellationToken);
            logger.LogInformation("Discovered {Count} URLs from Kubernetes services", urls.Count);
            urlsToProcess.AddRange(urls);
        }

        if (configuration.Ingestion.EnableZurgIngestion)
        {
            logger.LogInformation("Adding Zurg instances to the list of URLs to process");
            urlsToProcess.AddRange(configuration.Ingestion.ZurgInstances);
        }

        if (configuration.Ingestion.EnableZileanIngestion)
        {
            logger.LogInformation("Adding Zilean instances to the list of URLs to process");
            urlsToProcess.AddRange(configuration.Ingestion.ZileanInstances);
        }

        if (urlsToProcess.Count == 0)
        {
            logger.LogInformation("No URLs to process, exiting");
            return 0;
        }

        var completedCount = 0;

        foreach (var url in urlsToProcess)
        {
            // Stop gracefully between URLs, mirroring the OperationCanceledException
            // handling below; previously ThrowIfCancellationRequested() sat outside
            // the try and threw out of Execute instead of logging and breaking.
            if (cancellationToken.IsCancellationRequested)
            {
                logger.LogInformation("Ingestion scraping cancelled");
                break;
            }

            try
            {
                await ingestionProcessor.ProcessTorrentsAsync(url, cancellationToken);
                completedCount++;
            }
            catch (OperationCanceledException)
            {
                logger.LogInformation("Ingestion scraping cancelled");
                break;
            }
            catch (Exception ex)
            {
                // Best-effort per URL: log and continue with the remaining endpoints.
                logger.LogError(ex, "Error processing URL: {Url}", url);
            }
        }

        logger.LogInformation("Ingestion scraping completed for {Count} URLs", completedCount);

        return 0;
    }
}
Loading

0 comments on commit b39b6a3

Please sign in to comment.