Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor/validation #211

Merged
merged 6 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 39 additions & 38 deletions backend/program/content/listrr.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
"""Listrr content module"""
from time import time
from typing import Optional

from requests.exceptions import HTTPError

from program.settings.manager import settings_manager
from utils.logger import logger
from utils.request import get, ping
from program.media.container import MediaItemContainer
from program.updaters.trakt import Updater as Trakt, get_imdbid_from_tmdb, get_imdbid_from_tvdb
from program.updaters.trakt import Updater as Trakt, get_imdbid_from_tmdb


class Listrr:
Expand All @@ -19,15 +16,16 @@ def __init__(self, media_items: MediaItemContainer):
self.url = "https://listrr.pro/api"
self.settings = settings_manager.settings.content.listrr
self.headers = {"X-Api-Key": self.settings.api_key}
self.initialized = self.validate_settings()
self.initialized = self.validate()
if not self.initialized:
return
self.media_items = media_items
self.updater = Trakt()
self.not_found_ids = []
self.next_run_time = 0
logger.info("Listrr initialized!")

def validate_settings(self) -> bool:
def validate(self) -> bool:
"""Validate Listrr settings."""
if not self.settings.enabled:
logger.debug("Listrr is set to disabled.")
Expand Down Expand Up @@ -57,55 +55,58 @@ def validate_settings(self) -> bool:
return False

def run(self):
"""Fetch media from Listrr and add them to media_items attribute."""
"""Fetch new media from `Listrr`"""
if time() < self.next_run_time:
return
self.not_found_ids.clear()
self.next_run_time = time() + self.settings.update_interval
movie_items = self._get_items_from_Listrr("Movies", self.settings.movie_lists)
show_items = self._get_items_from_Listrr("Shows", self.settings.show_lists)
items = list(set(movie_items + show_items))
new_items = [item for item in items if item not in self.media_items]
items = set(movie_items + show_items)
new_items = [item for item in items if item not in self.media_items and not item]
if not new_items:
return
container = self.updater.create_items(new_items)
for item in container:
item.set("requested_by", "Listrr")
added_items = self.media_items.extend(container)
length = len(added_items)
if length >= 1 and length <= 5:
for item in added_items:
logger.info("Added %s", item.log_string)
elif length > 5:
logger.info("Added %s items", length)
previous_count = len(self.media_items)
self.media_items.extend(container)
if len(self.media_items) > 0:
if len(self.media_items) <= 5:
for item in container:
logger.info("Added %s", item.log_string)
else:
logger.info("Added %s items", len(self.media_items) - previous_count)
if self.not_found_ids:
logger.warn("Failed to process %s items, skipping.", len(self.not_found_ids))

def _get_items_from_Listrr(self, content_type, content_lists):
"""Fetch unique IMDb IDs from Listrr for a given type and list of content."""
unique_ids = set()
if not content_lists:
return list(unique_ids)

for list_id in content_lists:
page = 1
total_pages = 1
if not list_id or len(list_id) != 24:
continue # Skip invalid list IDs

page, total_pages = 1, 1
while page <= total_pages:
if list_id == "":
break
try:
response = get(
self.url + f"/List/{content_type}/{list_id}/ReleaseDate/Descending/{page}",
additional_headers=self.headers,
)
if response.is_ok:
total_pages = response.data.pages
for item in response.data.items:
imdb_id = item.imDbId
url = f"{self.url}/List/{content_type}/{list_id}/ReleaseDate/Descending/{page}"
response = get(url, additional_headers=self.headers).response
data = response.json()
total_pages = data.get('pages', 1)
for item in data.get('items', []):
imdb_id = item.get('imDbId')
if imdb_id:
unique_ids.add(imdb_id)
elif content_type == "Movies" and item.get('tmDbId'):
imdb_id = get_imdbid_from_tmdb(item['tmDbId'])
if imdb_id:
unique_ids.add(imdb_id)
# elif content_type == "Shows" and item.tvDbId:
# imdb_id = get_imdbid_from_tvdb(item.tvDbId)
# if imdb_id:
# unique_ids.add(imdb_id)
if not imdb_id and content_type == "Movies" and item.tmDbId:
imdb_id = get_imdbid_from_tmdb(item.tmDbId)
if imdb_id:
unique_ids.add(imdb_id)
else:
break
else:
self.not_found_ids.append(item['id'])
except HTTPError as e:
if e.response.status_code in [400, 404, 429, 500]:
break
Expand Down
11 changes: 7 additions & 4 deletions backend/program/content/mdblist.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Mdblist content module"""
from typing import Optional

from time import time
from program.settings.manager import settings_manager
from utils.logger import logger
from utils.request import RateLimitExceeded, RateLimiter, get, ping
Expand All @@ -14,16 +13,17 @@ class Mdblist:
def __init__(self, media_items: MediaItemContainer):
self.key = "mdblist"
self.settings = settings_manager.settings.content.mdblist
self.initialized = self.validate_settings()
self.initialized = self.validate()
if not self.initialized:
return
self.media_items = media_items
self.updater = Trakt()
self.next_run_time = 0
self.requests_per_2_minutes = self._calculate_request_time()
self.rate_limiter = RateLimiter(self.requests_per_2_minutes, 120, True)
logger.info("mdblist initialized")

def validate_settings(self):
def validate(self):
if not self.settings.enabled:
logger.debug("Mdblist is set to disabled.")
return False
Expand All @@ -42,6 +42,9 @@ def validate_settings(self):
def run(self):
"""Fetch media from mdblist and add them to media_items attribute
if they are not already there"""
if time() < self.next_run_time:
return
self.next_run_time = time() + self.settings.update_interval
try:
with self.rate_limiter:
items = []
Expand Down
43 changes: 25 additions & 18 deletions backend/program/content/overseerr.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
"""Mdblist content module"""
from typing import Optional


from time import time
from program.settings.manager import settings_manager
from utils.logger import logger
from utils.request import get, ping
from program.media.container import MediaItemContainer
from program.updaters.trakt import Updater as Trakt, get_imdbid_from_tmdb, get_imdbid_from_tvdb
from program.updaters.trakt import Updater as Trakt, get_imdbid_from_tmdb


class Overseerr:
Expand All @@ -16,15 +14,16 @@ def __init__(self, media_items: MediaItemContainer):
self.key = "overseerr"
self.settings = settings_manager.settings.content.overseerr
self.headers = {"X-Api-Key": self.settings.api_key}
self.initialized = self.validate_settings()
self.initialized = self.validate()
if not self.initialized:
return
self.media_items = media_items
self.updater = Trakt()
self.not_found_ids = []
self.next_run_time = 0
logger.info("Overseerr initialized!")

def validate_settings(self) -> bool:
def validate(self) -> bool:
if not self.settings.enabled:
logger.debug("Overseerr is set to disabled.")
return False
Expand All @@ -48,22 +47,30 @@ def validate_settings(self) -> bool:
return False

def run(self):
"""Fetch media from overseerr and add them to media_items attribute
if they are not already there"""
"""Fetch new media from `Overseerr`"""
if time() < self.next_run_time:
return
self.not_found_ids.clear()
self.next_run_time = time() + self.settings.update_interval
items = self._get_items_from_overseerr(10000)
new_items = [item for item in items if item not in self.media_items] or []
new_items = [item for item in items if item not in self.media_items and not item]
if not new_items:
return
container = self.updater.create_items(new_items)
for item in container:
item.set("requested_by", "Overseerr")
added_items = self.media_items.extend(container)
length = len(added_items)
if length >= 1 and length <= 5:
for item in added_items:
logger.info("Added %s", item.log_string)
elif length > 5:
logger.info("Added %s items", length)
previous_count = len(self.media_items)
self.media_items.extend(container)
if len(self.media_items) > 0:
if len(self.media_items) <= 5:
for item in container:
logger.info("Added %s", item.log_string)
else:
logger.info("Added %s items", len(self.media_items) - previous_count)
if self.not_found_ids:
logger.warn("Failed to process %s items, skipping.", len(self.not_found_ids))

def _get_items_from_overseerr(self, amount: int):
def _get_items_from_overseerr(self, amount: int) -> list[str]:
"""Fetch media from overseerr"""
response = get(
self.settings.url + f"/api/v1/request?take={amount}",
Expand All @@ -80,7 +87,7 @@ def _get_items_from_overseerr(self, amount: int):
ids.append(item.media.imdbId)
return ids

def get_imdb_id(self, overseerr_item):
def get_imdb_id(self, overseerr_item) -> str:
"""Get imdbId for item from overseerr"""
if overseerr_item.mediaType == "show":
external_id = overseerr_item.tvdbId
Expand Down
Loading
Loading