diff --git a/Zilean.sln.DotSettings.user b/Zilean.sln.DotSettings.user index ca94492..bd69b9e 100644 --- a/Zilean.sln.DotSettings.user +++ b/Zilean.sln.DotSettings.user @@ -1,4 +1,5 @@  + ForceIncluded ForceIncluded ForceIncluded ForceIncluded diff --git a/src/Zilean.Database/Services/ITorrentInfoService.cs b/src/Zilean.Database/Services/ITorrentInfoService.cs index 59af965..fd4f897 100644 --- a/src/Zilean.Database/Services/ITorrentInfoService.cs +++ b/src/Zilean.Database/Services/ITorrentInfoService.cs @@ -5,4 +5,5 @@ public interface ITorrentInfoService Task StoreTorrentInfo(List torrents, int batchSize = 10000); Task SearchForTorrentInfoByOnlyTitle(string query); Task SearchForTorrentInfoFiltered(TorrentInfoFilter filter, int? limit = null); + Task> GetExistingInfoHashesAsync(List infoHashes); } diff --git a/src/Zilean.Database/Services/TorrentInfoService.cs b/src/Zilean.Database/Services/TorrentInfoService.cs index c81a8f2..5185d0d 100644 --- a/src/Zilean.Database/Services/TorrentInfoService.cs +++ b/src/Zilean.Database/Services/TorrentInfoService.cs @@ -189,5 +189,18 @@ FROM search_imdb_meta( return imdbRecord?.ImdbId; } + public async Task> GetExistingInfoHashesAsync(List infoHashes) + { + await using var serviceScope = serviceProvider.CreateAsyncScope(); + await using var dbContext = serviceScope.ServiceProvider.GetRequiredService(); + + var existingHashes = await dbContext.Torrents + .Where(t => infoHashes.Contains(t.InfoHash)) + .Select(t => t.InfoHash) + .ToListAsync(); + + return [..existingHashes]; + } + private void WriteProgress(decimal @decimal) => logger.LogInformation("Storing torrent info: {Percentage:P}", @decimal); } diff --git a/src/Zilean.Scraper/Features/Ingestion/GenericIngestionProcessor.cs b/src/Zilean.Scraper/Features/Ingestion/GenericIngestionProcessor.cs index 443cbf8..47c6475 100644 --- a/src/Zilean.Scraper/Features/Ingestion/GenericIngestionProcessor.cs +++ b/src/Zilean.Scraper/Features/Ingestion/GenericIngestionProcessor.cs @@ -7,9 +7,14 @@ public class GenericIngestionProcessor( ITorrentInfoService torrentInfoService, ZileanConfiguration configuration) { - public async Task ProcessTorrentsAsync(string url, CancellationToken cancellationToken = default) + private int _processedCount; + + public async Task ProcessTorrentsAsync(GenericEndpoint endpoint, CancellationToken cancellationToken = default) { - logger.LogInformation("Processing URL: {Url}", url); + var sw = Stopwatch.StartNew(); + logger.LogInformation("Processing URL: {@Url}", endpoint); + + Interlocked.Exchange(ref _processedCount, 0); var channel = Channel.CreateBounded>(new BoundedChannelOptions(configuration.Ingestion.MaxChannelSize) { @@ -18,17 +23,27 @@ public async Task ProcessTorrentsAsync(string url, CancellationToken cancellatio FullMode = BoundedChannelFullMode.Wait }); - var producerTask = ProduceAsync(url, channel.Writer, cancellationToken); + var producerTask = ProduceAsync(endpoint, channel.Writer, cancellationToken); var consumerTask = ConsumeAsync(channel.Reader, configuration.Ingestion.BatchSize, cancellationToken); await Task.WhenAll(producerTask, consumerTask); + + logger.LogInformation("Processed {Count} torrents for endpoint '{@Endpoint}' in {TimeTaken}s", _processedCount, endpoint, sw.Elapsed.TotalSeconds); + sw.Stop(); } - private async Task ProduceAsync(string url, ChannelWriter> writer, CancellationToken cancellationToken = default) + private async Task ProduceAsync(GenericEndpoint endpoint, ChannelWriter> writer, CancellationToken cancellationToken = default) { try { var httpClient = clientFactory.CreateClient(); - var response = await httpClient.GetAsync(url, HttpCompletionOption.ResponseHeadersRead, cancellationToken); + var fullUrl = endpoint.EndpointType switch + { + GenericEndpointType.Zurg => $"{endpoint.Url}/debug/torrents", + GenericEndpointType.Zilean => $"{endpoint.Url}/torrents/all", + _ => throw new InvalidOperationException("Unknown endpoint type") + }; + + var response = await httpClient.GetAsync(fullUrl, HttpCompletionOption.ResponseHeadersRead, cancellationToken); response.EnsureSuccessStatusCode(); var stream = await response.Content.ReadAsStreamAsync(cancellationToken); @@ -89,6 +104,7 @@ private async Task ProcessBatch(List> batch, CancellationTok { var current = await result; torrents.Add(ExtractedDmmEntry.FromStreamedEntry(current)); + Interlocked.Increment(ref _processedCount); } if (torrents.Count == 0 || cancellationToken.IsCancellationRequested) @@ -96,11 +112,22 @@ private async Task ProcessBatch(List> batch, CancellationTok return; } - logger.LogInformation("Processing batch of {Count} torrents", torrents.Count); + var infoHashes = torrents.Select(t => t.InfoHash!).ToList(); + + var existingInfoHashes = await torrentInfoService.GetExistingInfoHashesAsync(infoHashes); + + var newTorrents = torrents.Where(t => !existingInfoHashes.Contains(t.InfoHash)).ToList(); + logger.LogInformation("Filtered out {Count} torrents already in the database", torrents.Count - newTorrents.Count); + + if (newTorrents.Count == 0) + { + logger.LogInformation("No new torrents to process in this batch."); + return; + } if (torrents.Count != 0) { - var parsedTorrents = await parseTorrentNameService.ParseAndPopulateAsync(torrents); + var parsedTorrents = await parseTorrentNameService.ParseAndPopulateAsync(newTorrents); var finalizedTorrents = parsedTorrents.Where(torrentInfo => torrentInfo.WipeSomeTissue()).ToList(); logger.LogInformation("Parsed {Count} torrents", finalizedTorrents.Count); await torrentInfoService.StoreTorrentInfo(finalizedTorrents); diff --git a/src/Zilean.Scraper/Features/Ingestion/GenericIngestionScraping.cs b/src/Zilean.Scraper/Features/Ingestion/GenericIngestionScraping.cs index 17efb04..e3e718c 100644 --- a/src/Zilean.Scraper/Features/Ingestion/GenericIngestionScraping.cs +++ b/src/Zilean.Scraper/Features/Ingestion/GenericIngestionScraping.cs @@ -10,27 +10,16 @@ public async Task Execute(CancellationToken cancellationToken) { logger.LogInformation("Starting ingestion scraping"); - List urlsToProcess = []; + List urlsToProcess = []; - if (configuration.Ingestion.Kubernetes.EnableServiceDiscovery) - { - logger.LogInformation("Discovering URLs from Kubernetes services"); - var urls = await kubernetesServiceDiscovery.DiscoverUrlsAsync(cancellationToken); - logger.LogInformation("Discovered {Count} URLs from Kubernetes services", urls.Count); - urlsToProcess.AddRange(urls); - } + configuration.Ingestion.ZileanInstances.Add("http://localhost:8181"); + configuration.Ingestion.ZurgInstances.Add("http://experiments:19999"); - if (configuration.Ingestion.ZurgInstances.Count > 0) - { - logger.LogInformation("Adding Zurg instances to the list of URLs to process"); - urlsToProcess.AddRange(configuration.Ingestion.ZurgInstances); - } + await DiscoverUrlsFromKubernetesServices(cancellationToken, urlsToProcess); - if (configuration.Ingestion.ZileanInstances.Count > 0) - { - logger.LogInformation("Adding Zilean instances to the list of URLs to process"); - urlsToProcess.AddRange(configuration.Ingestion.ZileanInstances); - } + AddZurgInstancesToUrls(urlsToProcess); + + AddZileanInstancesToUrls(urlsToProcess); if (urlsToProcess.Count == 0) { @@ -56,7 +45,7 @@ public async Task Execute(CancellationToken cancellationToken) } catch (Exception ex) { - logger.LogError(ex, "Error processing URL: {Url}", url); + logger.LogError(ex, "Error processing URL: {@Url}", url); } } @@ -64,4 +53,41 @@ public async Task Execute(CancellationToken cancellationToken) return 0; } + + private void AddZileanInstancesToUrls(List urlsToProcess) + { + if (configuration.Ingestion.ZileanInstances.Count > 0) + { + logger.LogInformation("Adding Zilean instances to the list of URLs to process"); + urlsToProcess.AddRange(configuration.Ingestion.ZileanInstances.Select(url => new GenericEndpoint + { + EndpointType = GenericEndpointType.Zilean, + Url = url, + })); + } + } + + private void AddZurgInstancesToUrls(List urlsToProcess) + { + if (configuration.Ingestion.ZurgInstances.Count > 0) + { + logger.LogInformation("Adding Zurg instances to the list of URLs to process"); + urlsToProcess.AddRange(configuration.Ingestion.ZurgInstances.Select(url => new GenericEndpoint + { + EndpointType = GenericEndpointType.Zurg, + Url = url, + })); + } + } + + private async Task DiscoverUrlsFromKubernetesServices(CancellationToken cancellationToken, List urlsToProcess) + { + if (configuration.Ingestion.Kubernetes.EnableServiceDiscovery) + { + logger.LogInformation("Discovering URLs from Kubernetes services"); + var endpoints = await kubernetesServiceDiscovery.DiscoverUrlsAsync(cancellationToken); + logger.LogInformation("Discovered {Count} URLs from Kubernetes services", endpoints.Count); + urlsToProcess.AddRange(endpoints); + } + } } diff --git a/src/Zilean.Scraper/Features/Ingestion/KubernetesServiceDiscovery.cs b/src/Zilean.Scraper/Features/Ingestion/KubernetesServiceDiscovery.cs index d83021d..7b86ddf 100644 --- a/src/Zilean.Scraper/Features/Ingestion/KubernetesServiceDiscovery.cs +++ b/src/Zilean.Scraper/Features/Ingestion/KubernetesServiceDiscovery.cs @@ -4,9 +4,11 @@ public class KubernetesServiceDiscovery( ILogger logger, ZileanConfiguration configuration) { - public async Task> DiscoverUrlsAsync(CancellationToken cancellationToken = default) + private record DiscoveredService(V1Service Service, KubernetesSelector Selector); + + public async Task> DiscoverUrlsAsync(CancellationToken cancellationToken = default) { - var urls = new List(); + var urls = new List(); try { @@ -14,44 +16,55 @@ public async Task> DiscoverUrlsAsync(CancellationToken cancellation KubernetesClientConfiguration.BuildConfigFromConfigFile(configuration.Ingestion.Kubernetes.KubeConfigFile); var kubernetesClient = new Kubernetes(clientConfig); - var services = await kubernetesClient.CoreV1.ListServiceForAllNamespacesAsync( - labelSelector: configuration.Ingestion.Kubernetes.LabelSelector, - cancellationToken: cancellationToken); + List discoveredServices = []; + + foreach (var selector in configuration.Ingestion.Kubernetes.KubernetesSelectors) + { + var services = await kubernetesClient.CoreV1.ListServiceForAllNamespacesAsync( + labelSelector: selector.LabelSelector, + cancellationToken: cancellationToken); + + discoveredServices.AddRange(services.Items.Select(service => new DiscoveredService(service, selector))); + } - foreach (var service in services.Items) + foreach (var service in discoveredServices) { try { var url = BuildUrlFromService(service); if (!string.IsNullOrEmpty(url)) { - urls.Add(url); + urls.Add(new GenericEndpoint + { + EndpointType = service.Selector.EndpointType, + Url = url, + }); logger.LogInformation("Discovered service URL: {Url}", url); } } catch (Exception ex) { logger.LogError(ex, "Failed to build URL for service {ServiceName} in namespace {Namespace}", - service.Metadata.Name, service.Metadata.NamespaceProperty); + service.Service.Metadata.Name, service.Service.Metadata.NamespaceProperty); } } } catch (Exception ex) { - logger.LogError(ex, "Failed to list services with label selector {LabelSelector}", configuration.Ingestion.Kubernetes.LabelSelector); + logger.LogError(ex, "Failed to list services with label selectors {@LabelSelector}", configuration.Ingestion.Kubernetes.KubernetesSelectors); } return urls; } - private string BuildUrlFromService(V1Service service) + private static string BuildUrlFromService(DiscoveredService service) { - if (service.Metadata?.NamespaceProperty == null) + if (service.Service.Metadata?.NamespaceProperty == null) { throw new InvalidOperationException("Service metadata or namespace is missing."); } - var namespaceName = service.Metadata.NamespaceProperty; - return string.Format(configuration.Ingestion.Kubernetes.ZurgUrlTemplate, namespaceName); + var namespaceName = service.Service.Metadata.NamespaceProperty; + return string.Format(service.Selector.UrlTemplate, namespaceName); } } diff --git a/src/Zilean.Shared/Features/Configuration/IngestionConfiguration.cs b/src/Zilean.Shared/Features/Configuration/IngestionConfiguration.cs index a1204b8..dbb533f 100644 --- a/src/Zilean.Shared/Features/Configuration/IngestionConfiguration.cs +++ b/src/Zilean.Shared/Features/Configuration/IngestionConfiguration.cs @@ -6,8 +6,9 @@ public class IngestionConfiguration public List ZileanInstances { get; set; } = []; public bool EnableScraping { get; set; } = false; public KubernetesConfiguration Kubernetes { get; set; } = new(); - public int BatchSize { get; set; } = 1000; + public int BatchSize { get; set; } = 500; public int MaxChannelSize { get; set; } = 5000; - public string ScrapeSchedule { get; set; } = "0 * * * *"; + public string ZurgEndpointSuffix { get; set; } = "/debug/torrents"; + public string ZileanEndpointSuffix { get; set; } = "/torrents/all"; } diff --git a/src/Zilean.Shared/Features/Configuration/KubernetesConfiguration.cs b/src/Zilean.Shared/Features/Configuration/KubernetesConfiguration.cs index 27810af..8964c76 100644 --- a/src/Zilean.Shared/Features/Configuration/KubernetesConfiguration.cs +++ b/src/Zilean.Shared/Features/Configuration/KubernetesConfiguration.cs @@ -3,8 +3,7 @@ public class KubernetesConfiguration { public bool EnableServiceDiscovery { get; set; } = false; - public string ZurgUrlTemplate { get; set; } = "http://zurg.{0}:9999/debug/torrents"; - public string LabelSelector { get; set; } = "app.elfhosted.com/name=zurg"; + public List KubernetesSelectors { get; set; } = [new()]; public string KubeConfigFile { get; set; } = "/$HOME/.kube/config"; public bool IsConfigFile { get; set; } = false; } diff --git a/src/Zilean.Shared/Features/Configuration/KubernetesSelector.cs b/src/Zilean.Shared/Features/Configuration/KubernetesSelector.cs new file mode 100644 index 0000000..cfdf9cf --- /dev/null +++ b/src/Zilean.Shared/Features/Configuration/KubernetesSelector.cs @@ -0,0 +1,8 @@ +namespace Zilean.Shared.Features.Configuration; + +public class KubernetesSelector +{ + public string UrlTemplate { get; set; } = "http://zurg.{0}:9999/debug/torrents"; + public string LabelSelector { get; set; } = "app.elfhosted.com/name=zurg"; + public GenericEndpointType EndpointType { get; set; } = GenericEndpointType.Zurg; +} diff --git a/src/Zilean.Shared/Features/Dmm/DmmRecords.cs b/src/Zilean.Shared/Features/Dmm/DmmRecords.cs index 80bbd63..8305b40 100644 --- a/src/Zilean.Shared/Features/Dmm/DmmRecords.cs +++ b/src/Zilean.Shared/Features/Dmm/DmmRecords.cs @@ -1,5 +1,3 @@ -using Zilean.Shared.Features.Scraping; - namespace Zilean.Shared.Features.Dmm; public class ExtractedDmmEntry(string? infoHash, string? filename, long filesize, TorrentInfo? parseResponse) diff --git a/src/Zilean.Shared/Features/Scraping/GenericEndpoint.cs b/src/Zilean.Shared/Features/Scraping/GenericEndpoint.cs new file mode 100644 index 0000000..998135c --- /dev/null +++ b/src/Zilean.Shared/Features/Scraping/GenericEndpoint.cs @@ -0,0 +1,7 @@ +namespace Zilean.Shared.Features.Scraping; + +public class GenericEndpoint +{ + public required string Url { get; set; } + public required GenericEndpointType EndpointType { get; set; } +} diff --git a/src/Zilean.Shared/Features/Scraping/GenericEndpointType.cs b/src/Zilean.Shared/Features/Scraping/GenericEndpointType.cs new file mode 100644 index 0000000..2a0c04a --- /dev/null +++ b/src/Zilean.Shared/Features/Scraping/GenericEndpointType.cs @@ -0,0 +1,7 @@ +namespace Zilean.Shared.Features.Scraping; + +public enum GenericEndpointType +{ + Zilean, + Zurg +} diff --git a/src/Zilean.Shared/GlobalUsings.cs b/src/Zilean.Shared/GlobalUsings.cs index 97ca155..36e1084 100644 --- a/src/Zilean.Shared/GlobalUsings.cs +++ b/src/Zilean.Shared/GlobalUsings.cs @@ -17,6 +17,7 @@ global using Zilean.Shared.Features.Configuration; global using Zilean.Shared.Features.Dmm; global using Zilean.Shared.Features.Imdb; +global using Zilean.Shared.Features.Scraping; global using Zilean.Shared.Features.Torznab.Categories; global using Zilean.Shared.Features.Torznab.Info; global using Zilean.Shared.Features.Torznab.Parameters;