diff --git a/src/program/db/db_functions.py b/src/program/db/db_functions.py index 54cd8591..ed373a9a 100644 --- a/src/program/db/db_functions.py +++ b/src/program/db/db_functions.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING from loguru import logger -from sqlalchemy import delete, insert, inspect, select, text +from sqlalchemy import delete, exists, insert, inspect, or_, select, text from sqlalchemy.orm import Session, joinedload, selectinload import alembic diff --git a/src/program/program.py b/src/program/program.py index 8515f464..3ae7aa9f 100644 --- a/src/program/program.py +++ b/src/program/program.py @@ -39,7 +39,7 @@ if settings_manager.settings.tracemalloc: import tracemalloc -from sqlalchemy import func, select, text +from sqlalchemy import and_, exists, func, or_, select, text from program.db import db_functions from program.db.db import ( @@ -212,9 +212,39 @@ def _retry_library(self) -> None: for item_id in result.scalars(): self.em.add_event(Event(emitted_by="RetryLibrary", item_id=item_id)) + def _reindex_ongoing(self) -> None: + """Reindex ongoing items.""" + with db.Session() as session: + results = session.execute( + select(MediaItem.id) + .where(MediaItem.type.in_(["movie", "show"])) + .where(MediaItem.last_state.in_([States.Ongoing, States.Unreleased])) + .where(or_(MediaItem.aired_at <= datetime.now(), MediaItem.aired_at.is_(None))) + ).scalars().all() + + if len(results) == 0: + logger.log("PROGRAM", "No ongoing or unreleased items to reindex.") + return + + logger.log("PROGRAM", f"Reindexing {len(results)} unreleased and ongoing items.") + + for item_id in results: + try: + item = session.execute(select(MediaItem).filter_by(id=item_id)).unique().scalar_one_or_none() + if item: + for indexed_item in TraktIndexer().run(item): + indexed_item.store_state() + session.merge(indexed_item) + logger.debug(f"Reindexed {indexed_item.log_string} ({indexed_item.id})") + session.commit() + except Exception as e: + logger.error(f"Failed to reindex item with ID {item_id}: {e}") + logger.log("PROGRAM", "Reindexing completed.") + def _schedule_functions(self) -> None: """Schedule each service based on its update interval.""" scheduled_functions = { + self._reindex_ongoing: {"interval": 60 * 60 * 24}, self._retry_library: {"interval": 60 * 60 * 24}, log_cleaner: {"interval": 60 * 60}, vacuum_and_analyze_index_maintenance: {"interval": 60 * 60 * 24}, @@ -226,9 +256,6 @@ def _schedule_functions(self) -> None: "args": [settings_manager.settings.symlink.library_path, settings_manager.settings.symlink.rclone_path] } - # if settings_manager.settings.post_processing.subliminal.enabled: - # scheduled_functions[self._download_subtitles] = {"interval": 60 * 60 * 24} - for func, config in scheduled_functions.items(): self.scheduler.add_job( func, diff --git a/src/program/services/scrapers/mediafusion.py b/src/program/services/scrapers/mediafusion.py index f312bfc9..810f0906 100644 --- a/src/program/services/scrapers/mediafusion.py +++ b/src/program/services/scrapers/mediafusion.py @@ -141,7 +141,7 @@ def scrape(self, item: MediaItem) -> tuple[Dict[str, str], int]: for stream in response.data.streams: if not hasattr(stream, "description") and hasattr(stream, "title") and "rate-limit exceeded" in stream.title: - raise RateLimitExceeded + raise RateLimitExceeded(f"Mediafusion rate-limit exceeded for item: {item.log_string}") description_split = stream.description.replace("📂 ", "") raw_title = description_split.split("/")[0] or description_split.split("\n")[0] # we want the torrent name if possible info_hash = re.search(r"info_hash=([A-Za-z0-9]+)", stream.url).group(1)