From 43a24eced7cf9882a26f25f7fc70580a1c439f32 Mon Sep 17 00:00:00 2001 From: CasVT Date: Tue, 10 Dec 2024 14:00:22 +0100 Subject: [PATCH] Refactored conversion.py & converters.py --- backend/base/definitions.py | 24 ++ backend/base/files.py | 31 ++- backend/features/tasks.py | 24 +- backend/implementations/conversion.py | 273 ++++++++++------------ backend/implementations/converters.py | 318 ++++++++++---------------- backend/internals/settings.py | 4 +- frontend/api.py | 6 +- frontend/static/js/view_volume.js | 18 +- 8 files changed, 329 insertions(+), 369 deletions(-) diff --git a/backend/base/definitions.py b/backend/base/definitions.py index cd2ce80..beb8beb 100644 --- a/backend/base/definitions.py +++ b/backend/base/definitions.py @@ -31,6 +31,12 @@ class Constants: LOGGER_FILENAME = "Kapowarr.log" ARCHIVE_EXTRACT_FOLDER = '.archive_extract' + ZIP_MIN_MOD_TIME = 315619200 + RAR_EXECUTABLES = { + 'linux': 'rar_linux_64', + 'darwin': 'rar_bsd_64', + 'win32': 'rar_windows_64.exe' + } DEFAULT_USERAGENT = "Kapowarr" TOTAL_RETRIES = 5 @@ -486,3 +492,21 @@ class DBMigrator(ABC): @abstractmethod def run(self) -> None: ... + + +class FileConverter(ABC): + source_format: str + target_format: str + + @staticmethod + @abstractmethod + def convert(file: str) -> List[str]: + """Convert a file from source_format to target_format. + + Args: + file (str): Filepath to the source file, should be in source_format. + + Returns: + List[str]: The resulting files or directories, in target_format. + """ + ... diff --git a/backend/base/files.py b/backend/base/files.py index cff2fd5..45b8be6 100644 --- a/backend/base/files.py +++ b/backend/base/files.py @@ -7,12 +7,12 @@ from collections import deque from os import listdir, makedirs, remove, scandir from os.path import (abspath, basename, commonpath, dirname, isdir, - isfile, join, relpath, samefile, splitext) + isfile, join, relpath, samefile, sep, splitext) from re import compile from shutil import copytree, move, rmtree from typing import Deque, Dict, Iterable, List, Sequence, Set -from backend.base.definitions import CharConstants +from backend.base.definitions import CharConstants, Constants from backend.base.helpers import check_filter, force_suffix from backend.base.logging import LOGGER @@ -196,6 +196,33 @@ def propose_basefolder_change( return file_changes +def generate_archive_folder( + volume_folder: str, + archive_file: str +) -> str: + """Generate a folder in which the given archive file can be extracted. + + Args: + volume_folder (str): The volume folder that the archive file is in. + archive_file (str): The filepath of the archive file itself. + + Returns: + str: The folder in which the archive file can be extracted. + """ + return join( + volume_folder, + Constants.ARCHIVE_EXTRACT_FOLDER, + splitext( + '_'.join( + relpath( + archive_file, + volume_folder + ).split(sep) + ) + )[0] + ) + + # region Creation def create_folder(folder: str) -> None: """Create a folder diff --git a/backend/features/tasks.py b/backend/features/tasks.py index 6f049de..2f617a2 100644 --- a/backend/features/tasks.py +++ b/backend/features/tasks.py @@ -131,15 +131,15 @@ def __init__( self, volume_id: int, issue_id: int, - filepath_filter: Union[List[str], None] = [] + filepath_filter: List[str] = [] ) -> None: """Create the task Args: volume_id (int): The ID of the volume for which to perform the task. issue_id (int): The ID of the issue for which to perform the task. - filepath_filter (Union[List[str], None], optional): Only rename - files in this list. + filepath_filter (List[str], optional): Only rename files in this + list. Defaults to []. """ self._volume_id = volume_id @@ -184,15 +184,15 @@ def __init__( self, volume_id: int, issue_id: int, - filepath_filter: Union[List[str], None] = [] + filepath_filter: List[str] = [] ) -> None: """Create the task Args: volume_id (int): The ID of the volume for which to perform the task. issue_id (int): The ID of the issue for which to perform the task. - filepath_filter (Union[List[str], None], optional): Only rename - files in this list. + filepath_filter (List[str], optional): Only rename files in this + list. Defaults to []. """ self._volume_id = volume_id @@ -320,14 +320,14 @@ def issue_id(self) -> None: def __init__( self, volume_id: int, - filepath_filter: Union[List[str], None] = [] + filepath_filter: List[str] = [] ) -> None: """Create the task Args: volume_id (int): The ID of the volume for which to perform the task. - filepath_filter (Union[List[str], None], optional): Only rename - files in this list. + filepath_filter (List[str], optional): Only rename files in this + list. Defaults to []. """ self._volume_id = volume_id @@ -368,14 +368,14 @@ def issue_id(self) -> None: def __init__( self, volume_id: int, - filepath_filter: Union[List[str], None] = [] + filepath_filter: List[str] = [] ) -> None: """Create the task Args: volume_id (int): The ID of the volume for which to perform the task. - filepath_filter (Union[List[str], None], optional): Only convert - files in this list. + filepath_filter (List[str], optional): Only convert files in this + list. Defaults to []. """ self._volume_id = volume_id diff --git a/backend/implementations/conversion.py b/backend/implementations/conversion.py index ec91517..1f3cb79 100644 --- a/backend/implementations/conversion.py +++ b/backend/implementations/conversion.py @@ -4,104 +4,21 @@ Converting files to a different format """ +from functools import lru_cache from itertools import chain from os.path import splitext -from sys import platform -from typing import Dict, Iterable, List, Set, Tuple, Type, Union +from typing import Dict, List, Set, Type, Union from zipfile import ZipFile -from backend.base.definitions import FileConstants -from backend.base.helpers import PortablePool -from backend.implementations.converters import (FileConverter, get_rar_output, - rar_executables) +from backend.base.definitions import FileConstants, FileConverter +from backend.base.helpers import PortablePool, filtered_iter +from backend.base.logging import LOGGER +from backend.implementations.converters import run_rar from backend.implementations.volumes import Volume, scan_files from backend.internals.db import commit from backend.internals.server import WebSocket from backend.internals.settings import Settings -conversion_methods: Dict[str, Dict[str, Type[FileConverter]]] = {} -"source_format -> target_format -> conversion class" -for fc in FileConverter.__subclasses__(): - conversion_methods.setdefault(fc.source_format, {})[fc.target_format] = fc - - -def get_available_formats() -> Set[str]: - """Get all available formats that can be converted to. - - Returns: - Set[str]: The list with all formats - """ - return set(chain.from_iterable(conversion_methods.values())) - - -def find_target_format_file( - file: str, - formats: Iterable[str] -) -> Union[Type[FileConverter], None]: - """Get a FileConverter class based on source format and desired formats. - - Args: - file (str): The file to get the converter for. - formats (Iterable[str]): The formats to convert to, in order of preference. - - Returns: - Union[Type[FileConverter], None]: The converter class that is possible - and most prefered. - In case of no possible conversion, `None` is returned. - """ - source_format = splitext(file)[1].lstrip('.').lower() - - if source_format not in conversion_methods: - return - - if ( - source_format in ('rar', 'cbr') - and platform not in rar_executables - ): - return - - available_formats = conversion_methods[source_format] - - for format in formats: - if source_format == format: - break - - if format in available_formats: - return available_formats[format] - - return - - -def convert_file( - file: str, - formats: Iterable[str] -) -> Union[str, List[str]]: - """Convert a file from one format to another. - - Args: - file (str): The file to convert. - - formats (Iterable[str]): A iterable of formats to convert the file to. - Order of list is preference of format (left to right). - - Should be key `conversion.conversion_methods` -> source_format dict. - - Returns: - Union[str, List[str]]: The path of the converted file. - """ - conversion_class = find_target_format_file( - file, - formats - ) - if conversion_class is not None: - return conversion_class.convert(file) - else: - return file - - -def map_convert_file(a: Tuple[str, Iterable[str]]) -> Union[str, List[str]]: - return convert_file(*a) - def archive_contains_issues(archive_file: str) -> bool: """Check if an archive file contains full issues or if the whole archive @@ -112,7 +29,7 @@ def archive_contains_issues(archive_file: str) -> bool: extension. Returns: - bool: Whether or not the archive file contains issue files. + bool: Whether the archive file contains issue files. """ ext = splitext(archive_file)[1].lower() @@ -120,11 +37,11 @@ def archive_contains_issues(archive_file: str) -> bool: with ZipFile(archive_file) as zip: namelist = zip.namelist() - elif ext == '.rar' and platform in rar_executables: - namelist = get_rar_output([ - "lb", - archive_file - ]).split("\n")[:-1] + elif ext == '.rar': + namelist = run_rar([ + "lb", # List archive contents bare + archive_file # Archive to list contents of + ]).stdout.split("\n")[:-1] else: return False @@ -135,11 +52,95 @@ def archive_contains_issues(archive_file: str) -> bool: ) +class FileConversionHandler: + @staticmethod + @lru_cache(1) + def get_conversion_methods() -> Dict[str, Dict[str, Type[FileConverter]]]: + """Get all converters. + + Returns: + Dict[str, Dict[str, Type[FileConverter]]]: Mapping of source_format + to target_format to conversion class. + """ + conversion_methods = {} + for fc in FileConverter.__subclasses__(): + conversion_methods.setdefault( + fc.source_format, {} + )[fc.target_format] = fc + return conversion_methods + + @staticmethod + @lru_cache(1) + def get_available_formats() -> Set[str]: + """Get all available formats that can be converted to. + + Returns: + Set[str]: The list with all formats. + """ + return set(chain.from_iterable( + FileConversionHandler.get_conversion_methods().values() + )) + + def __init__( + self, + file: str, + format_preference: Union[List[str], None] = None + ) -> None: + """Prepare file for conversion. + + Args: + file (str): The file to convert. + format_preference (Union[List[str], None], optional): Custom format + preference to use, or `None` to use the one from the settings. + Defaults to None. + """ + self.file = file + self.fp = format_preference or Settings().sv.format_preference + self.source_format = splitext(file)[1].lstrip('.').lower() + self.target_format = self.source_format + self.converter = None + + conversion_methods = self.get_conversion_methods() + + if self.source_format not in conversion_methods: + return + + available_formats = conversion_methods[self.source_format] + for format in self.fp: + if self.source_format == format: + break + + if format in available_formats: + self.target_format = available_formats[format].target_format + self.converter = available_formats[format] + break + + return + + +def convert_file(converter: FileConversionHandler) -> List[str]: + """Convert a file. + + Args: + converter (FileConversionHandler): The file converter. + + Returns: + List[str]: The resulting files from the conversion. + """ + if not converter.converter: + return [converter.file] + + LOGGER.info( + f"Converting file from {converter.source_format} to {converter.target_format}: {converter.file}" + ) + return converter.converter.convert(converter.file) + + def preview_mass_convert( volume_id: int, issue_id: Union[int, None] = None -) -> List[Dict[str, str]]: - """Get a list of suggested conversions for a volume or issue +) -> Dict[str, str]: + """Get a list of suggested conversions for a volume or issue. Args: volume_id (int): The ID of the volume to check for. @@ -147,17 +148,15 @@ def preview_mass_convert( Defaults to None. Returns: - List[Dict[str, str]]: The list of suggestions. - Dicts have the keys `before` and `after`. + Dict[str, str]: Mapping of filename before to after conversion. """ settings = Settings().get_settings() volume = Volume(volume_id) - format_preference = settings.format_preference extract_issue_ranges = settings.extract_issue_ranges volume_folder = volume.vd.folder - result = [] + result = {} for f in sorted(( f["filepath"] for f in ( @@ -173,28 +172,16 @@ def preview_mass_convert( and splitext(f)[1].lower() in FileConstants.EXTRACTABLE_EXTENSIONS and archive_contains_issues(f) ): - converter = find_target_format_file( - f, - ['folder'] - ) + converter = FileConversionHandler(f, ['folder']).converter if converter is None: - converter = find_target_format_file( - f, - format_preference - ) + converter = FileConversionHandler(f).converter if converter is not None: if converter.target_format == 'folder': - result.append({ - 'before': f, - 'after': volume_folder - }) + result[f] = volume_folder else: - result.append({ - 'before': f, - 'after': splitext(f)[0] + '.' + converter.target_format - }) + result[f] = splitext(f)[0] + '.' + converter.target_format return result @@ -202,7 +189,7 @@ def preview_mass_convert( def mass_convert( volume_id: int, issue_id: Union[int, None] = None, - filepath_filter: Union[List[str], None] = None, + filepath_filter: List[str] = [], update_websocket: bool = False ) -> None: """Convert files for a volume or issue. @@ -213,58 +200,48 @@ def mass_convert( issue_id (Union[int, None], optional): The ID of the issue to convert for. Defaults to None. - filepath_filter (Union[List[str], None], optional): Only convert files + filepath_filter (List[str], optional): Only convert files mentioned in this list. - Defaults to None. + Defaults to []. update_websocket (bool, optional): Send task progress updates over the websocket. Defaults to False. """ - # We're checking a lot if strings are in this list, - # so making it a set will increase performance (due to hashing). - hashed_files = set(filepath_filter or []) - settings = Settings().get_settings() volume = Volume(volume_id) - format_preference = settings.format_preference extract_issue_ranges = settings.extract_issue_ranges - planned_conversions: List[Tuple[str, List[str]]] = [] - for f in ( - f["filepath"] + planned_conversions: List[FileConversionHandler] = [] + for f in filtered_iter( + (f["filepath"] for f in ( volume.get_all_files() if not issue_id else volume.get_issue(issue_id).get_files() - ) + )), + set(filepath_filter) ): - if hashed_files and f not in hashed_files: - continue - converted = False if ( extract_issue_ranges and splitext(f)[1].lower() in FileConstants.EXTRACTABLE_EXTENSIONS and archive_contains_issues(f) ): - converter = find_target_format_file( - f, - ['folder'] - ) - if converter is not None: - resulting_files = converter.convert(f) + converter = FileConversionHandler(f, ['folder']) + if converter.converter is not None: + resulting_files = convert_file(converter) for file in resulting_files: - planned_conversions.append( - (file, format_preference) - ) + fch = FileConversionHandler(file) + if fch.converter is not None: + planned_conversions.append(fch) converted = True if not converted: - planned_conversions.append( - (f, format_preference) - ) + fch = FileConversionHandler(f) + if fch.converter is not None: + planned_conversions.append(fch) total_count = len(planned_conversions) @@ -273,9 +250,7 @@ def mass_convert( elif total_count == 1: # Avoid mp overhead when we're only converting one file - convert_file( - *planned_conversions[0] - ) + convert_file(planned_conversions[0]) else: # Commit changes because new connections are opened in the processes @@ -287,14 +262,14 @@ def mass_convert( message=f'Converted 0/{total_count}' ) for idx, _ in enumerate(pool.imap_unordered( - map_convert_file, + convert_file, planned_conversions )): ws.update_task_status( message=f'Converted {idx+1}/{total_count}' ) else: - pool.starmap( + pool.map( convert_file, planned_conversions ) diff --git a/backend/implementations/converters.py b/backend/implementations/converters.py index 3e3d30f..b368edc 100644 --- a/backend/implementations/converters.py +++ b/backend/implementations/converters.py @@ -4,33 +4,47 @@ Contains all the converters for converting from one format to another """ -from abc import ABC, abstractmethod +from __future__ import annotations + from os import utime -from os.path import basename, dirname, getmtime, join, relpath, sep, splitext +from os.path import basename, dirname, getmtime, join, splitext from shutil import make_archive -from subprocess import call as spc, run as sprun +from subprocess import run from sys import platform -from typing import List, Union +from typing import TYPE_CHECKING, List, final from zipfile import ZipFile -from backend.base.definitions import (SCANNABLE_EXTENSIONS, - Constants, FileConstants) +from backend.base.definitions import (SCANNABLE_EXTENSIONS, Constants, + FileConstants, FileConverter) from backend.base.file_extraction import extract_filename_data from backend.base.files import (create_folder, delete_empty_parent_folders, delete_file_folder, folder_path, - list_files, rename_file) + generate_archive_folder, list_files, + rename_file) from backend.base.logging import LOGGER from backend.implementations.matching import folder_extraction_filter from backend.implementations.naming import mass_rename from backend.implementations.volumes import Volume, scan_files from backend.internals.db_models import FilesDB -rar_executables = { - 'linux': folder_path('backend', 'lib', 'rar_linux_64'), - 'darwin': folder_path('backend', 'lib', 'rar_bsd_64'), - 'win32': folder_path('backend', 'lib', 'rar_windows_64.exe') -} -"Maps a platform name to it's rar executable" +if TYPE_CHECKING: + from subprocess import CompletedProcess + + +def run_rar(args: List[str]) -> CompletedProcess[str]: + """Run rar executable. Platform is taken care of inside function. + + Args: + args (List[str]): The arguments to give to the executable. + + Raises: + KeyError: Platform not supported. + + Returns: + CompletedProcess[str]: The result of the process. + """ + exe = folder_path('backend', 'lib', Constants.RAR_EXECUTABLES[platform]) + return run([exe, *args], capture_output=True, text=True) def extract_files_from_folder( @@ -63,17 +77,13 @@ def extract_files_from_folder( c for c in folder_contents if ( - not 'variant cover' in c.lower() - and ( - basename(c).lower() in FileConstants.METADATA_FILES - or - folder_extraction_filter( - extract_filename_data(c, False), - volume_data, - volume_issues, - end_year - ) + folder_extraction_filter( + extract_filename_data(c, False), + volume_data, + volume_issues, + end_year ) + and 'variant cover' not in c.lower() ) ] LOGGER.debug(f'Relevant files: {rel_files}') @@ -94,108 +104,48 @@ def extract_files_from_folder( return result -def _run_rar(args: List[str]) -> int: - """ - Run rar executable. This function takes care of the platform. - Note: It is already expected when this function is called - that the platform is supported. The check should be done outside. - - Args: - args (List[str]): The arguments to give to the executable. - - Returns: - int: The exit code of the executable. - """ - exe = rar_executables[platform] - - return spc([exe, *args]) - - -def get_rar_output(args: List[str]) -> str: - """ - Run rar executable and return stdout. This function takes care of - the platform. Note: It is already expected when this function is called - that the platform is supported. The check should be done outside. - - Args: - args (List[str]): The arguments to give to the executable. - - Returns: - str: The stdout of the executable. - """ - exe = rar_executables[platform] - - return sprun([exe, *args], capture_output=True, text=True).stdout - - -class FileConverter(ABC): - source_format: str - target_format: str - - @staticmethod - @abstractmethod - def convert(file: str) -> Union[str, List[str]]: - """Convert a file from source_format to target_format. - - Args: - file (str): Filepath to the source file, should be in source_format. - - Returns: - Union[str, List[str]]: The resulting files or directories, in target_format. - """ - ... - # ===================== -# ZIP +# region ZIP # ===================== - - +@final class ZIPtoCBZ(FileConverter): source_format = 'zip' target_format = 'cbz' @staticmethod - def convert(file: str) -> str: + def convert(file: str) -> List[str]: target = splitext(file)[0] + '.cbz' rename_file( file, target ) - return target + return [target] +@final class ZIPtoRAR(FileConverter): source_format = 'zip' target_format = 'rar' @staticmethod - def convert(file: str) -> str: + def convert(file: str) -> List[str]: volume_id = FilesDB.volume_of_file(file) if not volume_id: # File not matched to volume - return file + return [file] + volume_folder = Volume(volume_id).vd.folder - archive_folder = join( - volume_folder, - Constants.ARCHIVE_EXTRACT_FOLDER, - splitext( - '_'.join( - relpath( - file, - volume_folder - ).split(sep) - ) - )[0] - ) + archive_folder = generate_archive_folder(volume_folder, file) with ZipFile(file, 'r') as zip: zip.extractall(archive_folder) - _run_rar([ - 'a', - '-ep', '-inul', - splitext(file)[0], - archive_folder + run_rar([ + 'a', # Add files to archive + '-ep', # Exclude paths from names + '-inul', # Disable all messages + splitext(file)[0], # Ext-less target filename of created archive + archive_folder # Source folder ]) delete_file_folder(archive_folder) @@ -203,20 +153,22 @@ def convert(file: str) -> str: delete_empty_parent_folders(dirname(file), volume_folder) delete_empty_parent_folders(dirname(archive_folder), volume_folder) - return splitext(file)[0] + '.rar' + return [splitext(file)[0] + '.rar'] +@final class ZIPtoCBR(FileConverter): source_format = 'zip' target_format = 'cbr' @staticmethod - def convert(file: str) -> str: - rar_file = ZIPtoRAR.convert(file) + def convert(file: str) -> List[str]: + rar_file = ZIPtoRAR.convert(file)[0] cbr_file = RARtoCBR.convert(rar_file) return cbr_file +@final class ZIPtoFOLDER(FileConverter): source_format = 'zip' target_format = 'folder' @@ -227,25 +179,15 @@ def convert(file: str) -> List[str]: if not volume_id: # File not matched to volume return [file] + volume_folder = Volume(volume_id).vd.folder - zip_folder = join( - volume_folder, - Constants.ARCHIVE_EXTRACT_FOLDER, - splitext( - '_'.join( - relpath( - file, - volume_folder - ).split(sep) - ) - )[0], - ) + archive_folder = generate_archive_folder(volume_folder, file) with ZipFile(file, 'r') as zip: - zip.extractall(zip_folder) + zip.extractall(archive_folder) resulting_files = extract_files_from_folder( - zip_folder, + archive_folder, volume_id ) @@ -256,52 +198,55 @@ def convert(file: str) -> List[str]: filepath_filter=resulting_files ) - delete_file_folder(zip_folder) + delete_file_folder(archive_folder) delete_file_folder(file) delete_empty_parent_folders(dirname(file), volume_folder) - delete_empty_parent_folders(dirname(zip_folder), volume_folder) + delete_empty_parent_folders(dirname(archive_folder), volume_folder) return resulting_files + # ===================== -# CBZ +# region CBZ # ===================== - - +@final class CBZtoZIP(FileConverter): source_format = 'cbz' target_format = 'zip' @staticmethod - def convert(file: str) -> str: + def convert(file: str) -> List[str]: target = splitext(file)[0] + '.zip' rename_file( file, target ) - return target + return [target] +@final class CBZtoRAR(FileConverter): source_format = 'cbz' target_format = 'rar' @staticmethod - def convert(file: str) -> str: + def convert(file: str) -> List[str]: return ZIPtoRAR.convert(file) +@final class CBZtoCBR(FileConverter): source_format = 'cbz' target_format = 'cbr' @staticmethod - def convert(file: str) -> str: - rar_file = ZIPtoRAR.convert(file) + def convert(file: str) -> List[str]: + rar_file = ZIPtoRAR.convert(file)[0] cbr_file = RARtoCBR.convert(rar_file) return cbr_file +@final class CBZtoFOLDER(FileConverter): source_format = 'cbz' target_format = 'folder' @@ -310,84 +255,81 @@ class CBZtoFOLDER(FileConverter): def convert(file: str) -> List[str]: return ZIPtoFOLDER.convert(file) + # ===================== -# RAR +# region RAR # ===================== - - +@final class RARtoCBR(FileConverter): source_format = 'rar' target_format = 'cbr' @staticmethod - def convert(file: str) -> str: + def convert(file: str) -> List[str]: target = splitext(file)[0] + '.cbr' rename_file( file, target ) - return target + return [target] +@final class RARtoZIP(FileConverter): source_format = 'rar' target_format = 'zip' @staticmethod - def convert(file: str) -> str: + def convert(file: str) -> List[str]: volume_id = FilesDB.volume_of_file(file) if not volume_id: # File not matched to volume - return file - volume_folder = Volume(volume_id).vd.folder - rar_folder = join( - volume_folder, - Constants.ARCHIVE_EXTRACT_FOLDER, - splitext( - '_'.join( - relpath( - file, - volume_folder - ).split(sep) - ) - )[0] - ) - - create_folder(rar_folder) + return [file] - _run_rar([ - 'x', - '-inul', - file, - rar_folder + volume_folder = Volume(volume_id).vd.folder + archive_folder = generate_archive_folder(volume_folder, file) + create_folder(archive_folder) + + run_rar([ + 'x', # Extract files with full path + '-inul', # Disable all messages + file, # Source archive file + archive_folder # Target folder to extract into ]) - for f in list_files(rar_folder): - if getmtime(f) <= 315619200: - utime(f, (315619200, 315619200)) + # Files that are put in a ZIP file have to have a minimum last + # modification time. + for f in list_files(archive_folder): + if getmtime(f) <= Constants.ZIP_MIN_MOD_TIME: + utime( + f, + (Constants.ZIP_MIN_MOD_TIME, Constants.ZIP_MIN_MOD_TIME) + ) target_file = splitext(file)[0] - target_archive = make_archive(target_file, 'zip', rar_folder) + target_archive = make_archive(target_file, 'zip', archive_folder) - delete_file_folder(rar_folder) + delete_file_folder(archive_folder) delete_file_folder(file) delete_empty_parent_folders(dirname(file), volume_folder) - delete_empty_parent_folders(dirname(rar_folder), volume_folder) + delete_empty_parent_folders(dirname(archive_folder), volume_folder) - return target_archive + return [target_archive] +@final class RARtoCBZ(FileConverter): source_format = 'rar' target_format = 'cbz' @staticmethod - def convert(file: str) -> str: - zip_file = RARtoZIP.convert(file) + def convert(file: str) -> List[str]: + zip_file = RARtoZIP.convert(file)[0] cbz_file = ZIPtoCBZ.convert(zip_file) return cbz_file +@final class RARtoFOLDER(FileConverter): source_format = 'rar' target_format = 'folder' @@ -398,31 +340,20 @@ def convert(file: str) -> List[str]: if not volume_id: # File not matched to volume return [file] - volume_folder = Volume(volume_id).vd.folder - rar_folder = join( - volume_folder, - Constants.ARCHIVE_EXTRACT_FOLDER, - splitext( - '_'.join( - relpath( - file, - volume_folder - ).split(sep) - ) - )[0] - ) - - create_folder(rar_folder) - _run_rar([ - 'x', - '-inul', - file, - rar_folder + volume_folder = Volume(volume_id).vd.folder + archive_folder = generate_archive_folder(volume_folder, file) + create_folder(archive_folder) + + run_rar([ + 'x', # Extract files with full path + '-inul', # Disable all messages + file, # Source archive file + archive_folder # Target folder to extract into ]) resulting_files = extract_files_from_folder( - rar_folder, + archive_folder, volume_id ) @@ -433,52 +364,55 @@ def convert(file: str) -> List[str]: filepath_filter=resulting_files ) - delete_file_folder(rar_folder) + delete_file_folder(archive_folder) delete_file_folder(file) delete_empty_parent_folders(dirname(file), volume_folder) - delete_empty_parent_folders(dirname(rar_folder), volume_folder) + delete_empty_parent_folders(dirname(archive_folder), volume_folder) return resulting_files + # ===================== -# CBR +# region CBR # ===================== - - +@final class CBRtoRAR(FileConverter): source_format = 'cbr' target_format = 'rar' @staticmethod - def convert(file: str) -> str: + def convert(file: str) -> List[str]: target = splitext(file)[0] + '.rar' rename_file( file, target ) - return target + return [target] +@final class CBRtoZIP(FileConverter): source_format = 'cbr' target_format = 'zip' @staticmethod - def convert(file: str) -> str: + def convert(file: str) -> List[str]: return RARtoZIP.convert(file) +@final class CBRtoCBZ(FileConverter): source_format = 'cbr' target_format = 'cbz' @staticmethod - def convert(file: str) -> str: - zip_file = RARtoZIP.convert(file) + def convert(file: str) -> List[str]: + zip_file = RARtoZIP.convert(file)[0] cbz_file = ZIPtoCBZ.convert(zip_file) return cbz_file +@final class CBRtoFOLDER(FileConverter): source_format = 'cbr' target_format = 'folder' diff --git a/backend/internals/settings.py b/backend/internals/settings.py index ba9d3f5..132dadb 100644 --- a/backend/internals/settings.py +++ b/backend/internals/settings.py @@ -367,9 +367,9 @@ def __format_value(self, key: str, value: Any) -> Any: elif key == 'format_preference': from backend.implementations.conversion import \ - get_available_formats + FileConversionHandler - available = get_available_formats() + available = FileConversionHandler.get_available_formats() for entry in value: if entry not in available: raise InvalidSettingValue(key, value) diff --git a/frontend/api.py b/frontend/api.py index 8aca45f..c7de149 100644 --- a/frontend/api.py +++ b/frontend/api.py @@ -53,7 +53,7 @@ get_blocklist, get_blocklist_entry) from backend.implementations.comicvine import ComicVine -from backend.implementations.conversion import (get_available_formats, +from backend.implementations.conversion import (FileConversionHandler, preview_mass_convert) from backend.implementations.download_direct_clients import credentials from backend.implementations.download_torrent_clients import (TorrentClients, @@ -375,7 +375,7 @@ def api_tasks(): or isinstance(filepath_filter, list) ): raise InvalidKeyValue('filepath_filter', filepath_filter) - kwargs['filepath_filter'] = filepath_filter + kwargs['filepath_filter'] = filepath_filter or [] if task.action == 'update_all': allow_skipping = data.get('allow_skipping', False) @@ -476,7 +476,7 @@ def api_settings_api_key(): @error_handler @auth def api_settings_available_formats(): - result = list(get_available_formats()) + result = list(FileConversionHandler.get_available_formats()) return return_api(result) diff --git a/frontend/static/js/view_volume.js b/frontend/static/js/view_volume.js index 05c815f..76e72ae 100644 --- a/frontend/static/js/view_volume.js +++ b/frontend/static/js/view_volume.js @@ -541,19 +541,19 @@ function showConvert(api_key, issue_id=null) { const table = table_container.querySelector('tbody'); table.innerHTML = ''; - if (!json.result.length) { + if (!Object.keys(json.result).length) { hide([table_container, convert_button], [empty_rename]); } else { hide([empty_rename], [table_container, convert_button]); - json.result.forEach(convert_entry => { - const before = ViewEls.pre_build.rename_before.cloneNode(true); - table.appendChild(before); - const after = ViewEls.pre_build.rename_after.cloneNode(true); - table.appendChild(after); - - before.querySelector('td:last-child').innerText = convert_entry.before; - after.querySelector('td:last-child').innerText = convert_entry.after; + Object.entries(json.result).forEach(mapping => { + const before_row = ViewEls.pre_build.rename_before.cloneNode(true); + table.appendChild(before_row); + const after_row = ViewEls.pre_build.rename_after.cloneNode(true); + table.appendChild(after_row); + + before_row.querySelector('td:last-child').innerText = mapping[0]; + after_row.querySelector('td:last-child').innerText = mapping[1]; }); }; showWindow('convert-window');