Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: chunk initial symlinks on re-ingest #882

Merged
merged 2 commits into from
Nov 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 74 additions & 51 deletions src/program/program.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -393,63 +393,86 @@ def _init_db_from_symlinks(self):
return

logger.log("PROGRAM", "Collecting items from symlinks, this may take a while depending on library size")
items = self.services[SymlinkLibrary].run()
errors = []
added_items = set()

progress, console = create_progress_bar(len(items))
task = progress.add_task("Enriching items with metadata", total=len(items), log="")

with Live(progress, console=console, refresh_per_second=10):
workers = int(os.getenv("SYMLINK_MAX_WORKERS", 4))
with ThreadPoolExecutor(thread_name_prefix="EnhanceSymlinks", max_workers=workers) as executor:
future_to_item = {
executor.submit(self._enhance_item, item): item
for item in items
if isinstance(item, (Movie, Show))
}

for future in as_completed(future_to_item):
item = future_to_item[future]
log_message = ""

try:
items = self.services[SymlinkLibrary].run()
errors = []
added_items = set()

# Convert items to list and get total count
items_list = [item for item in items if isinstance(item, (Movie, Show))]
total_items = len(items_list)

progress, console = create_progress_bar(total_items)
task = progress.add_task("Enriching items with metadata", total=total_items, log="")

# Process in chunks of 100 items
chunk_size = 100
with Live(progress, console=console, refresh_per_second=10):
workers = int(os.getenv("SYMLINK_MAX_WORKERS", 4))

for i in range(0, total_items, chunk_size):
chunk = items_list[i:i + chunk_size]
try:
if not item or item.imdb_id in added_items:
errors.append(f"Duplicate symlink directory found for {item.log_string}")
continue

# Check for existing item using your db_functions
if db_functions.get_item_by_id(item.id, session=session):
errors.append(f"Duplicate item found in database for id: {item.id}")
continue

enhanced_item = future.result()
if not enhanced_item:
errors.append(f"Failed to enhance {item.log_string} ({item.imdb_id}) with Trakt Indexer")
continue

enhanced_item.store_state()
session.add(enhanced_item)
added_items.add(item.imdb_id)

log_message = f"Indexed IMDb Id: {enhanced_item.id} as {enhanced_item.type.title()}: {enhanced_item.log_string}"
except NotADirectoryError:
errors.append(f"Skipping {item.log_string} as it is not a valid directory")
with ThreadPoolExecutor(thread_name_prefix="EnhanceSymlinks", max_workers=workers) as executor:
future_to_item = {
executor.submit(self._enhance_item, item): item
for item in chunk
}

for future in as_completed(future_to_item):
item = future_to_item[future]
log_message = ""

try:
if not item or item.imdb_id in added_items:
errors.append(f"Duplicate symlink directory found for {item.log_string}")
continue

if db_functions.get_item_by_id(item.id, session=session):
errors.append(f"Duplicate item found in database for id: {item.id}")
continue

enhanced_item = future.result()
if not enhanced_item:
errors.append(f"Failed to enhance {item.log_string} ({item.imdb_id}) with Trakt Indexer")
continue

enhanced_item.store_state()
session.add(enhanced_item)
added_items.add(item.imdb_id)

log_message = f"Indexed IMDb Id: {enhanced_item.id} as {enhanced_item.type.title()}: {enhanced_item.log_string}"
except NotADirectoryError:
errors.append(f"Skipping {item.log_string} as it is not a valid directory")
except Exception as e:
logger.exception(f"Error processing {item.log_string}: {e}")
raise # Re-raise to trigger rollback
finally:
progress.update(task, advance=1, log=log_message)

# Only commit if the entire chunk was successful
session.commit()

except Exception as e:
logger.exception(f"Error processing {item.log_string}: {e}")
finally:
progress.update(task, advance=1, log=log_message)

session.rollback()
logger.error(f"Failed to process chunk {i//chunk_size + 1}, rolling back all changes: {str(e)}")
raise # Re-raise to abort the entire process
progress.update(task, log="Finished Indexing Symlinks!")
session.commit()

if errors:
logger.error("Errors encountered during initialization")
for error in errors:
logger.error(error)
if errors:
logger.error("Errors encountered during initialization")
for error in errors:
logger.error(error)

except Exception as e:
session.rollback()
logger.error(f"Failed to initialize database from symlinks: {str(e)}")
return

elapsed_time = datetime.now() - start_time
total_seconds = elapsed_time.total_seconds()
hours, remainder = divmod(total_seconds, 3600)
minutes, seconds = divmod(remainder, 60)
logger.success(f"Database initialized, time taken: h{int(hours):02d}:m{int(minutes):02d}:s{int(seconds):02d}")
logger.success(f"Database initialized, time taken: h{int(hours):02d}:m{int(minutes):02d}:s{int(seconds):02d}")
Loading