Skip to content

Commit

Permalink
fix: rollback on bad chunk
Browse files (browse the repository at this point in the history)
  • Loading branch information
dreulavelle committed Nov 14, 2024
1 parent 2b0af33 commit c0994f9
Showing 1 changed file with 72 additions and 59 deletions.
131 changes: 72 additions & 59 deletions src/program/program.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -393,70 +393,83 @@ def _init_db_from_symlinks(self):
return

logger.log("PROGRAM", "Collecting items from symlinks, this may take a while depending on library size")
items = self.services[SymlinkLibrary].run()
errors = []
added_items = set()

# Convert items to list and get total count
items_list = [item for item in items if isinstance(item, (Movie, Show))]
total_items = len(items_list)

progress, console = create_progress_bar(total_items)
task = progress.add_task("Enriching items with metadata", total=total_items, log="")
try:
items = self.services[SymlinkLibrary].run()
errors = []
added_items = set()

# Process in chunks of 100 items
chunk_size = 100
with Live(progress, console=console, refresh_per_second=10):
workers = int(os.getenv("SYMLINK_MAX_WORKERS", 4))
# Convert items to list and get total count
items_list = [item for item in items if isinstance(item, (Movie, Show))]
total_items = len(items_list)

for i in range(0, total_items, chunk_size):
chunk = items_list[i:i + chunk_size]
progress, console = create_progress_bar(total_items)
task = progress.add_task("Enriching items with metadata", total=total_items, log="")

# Process in chunks of 100 items
chunk_size = 100
with Live(progress, console=console, refresh_per_second=10):
workers = int(os.getenv("SYMLINK_MAX_WORKERS", 4))

with ThreadPoolExecutor(thread_name_prefix="EnhanceSymlinks", max_workers=workers) as executor:
future_to_item = {
executor.submit(self._enhance_item, item): item
for item in chunk
}

for future in as_completed(future_to_item):
item = future_to_item[future]
log_message = ""

try:
if not item or item.imdb_id in added_items:
errors.append(f"Duplicate symlink directory found for {item.log_string}")
continue

if db_functions.get_item_by_id(item.id, session=session):
errors.append(f"Duplicate item found in database for id: {item.id}")
continue

enhanced_item = future.result()
if not enhanced_item:
errors.append(f"Failed to enhance {item.log_string} ({item.imdb_id}) with Trakt Indexer")
continue

enhanced_item.store_state()
session.add(enhanced_item)
added_items.add(item.imdb_id)

log_message = f"Indexed IMDb Id: {enhanced_item.id} as {enhanced_item.type.title()}: {enhanced_item.log_string}"
except NotADirectoryError:
errors.append(f"Skipping {item.log_string} as it is not a valid directory")
except Exception as e:
logger.exception(f"Error processing {item.log_string}: {e}")
finally:
progress.update(task, advance=1, log=log_message)

# Commit after each chunk
session.commit()
for i in range(0, total_items, chunk_size):
chunk = items_list[i:i + chunk_size]

try:
with ThreadPoolExecutor(thread_name_prefix="EnhanceSymlinks", max_workers=workers) as executor:
future_to_item = {
executor.submit(self._enhance_item, item): item
for item in chunk
}

for future in as_completed(future_to_item):
item = future_to_item[future]
log_message = ""

try:
if not item or item.imdb_id in added_items:
errors.append(f"Duplicate symlink directory found for {item.log_string}")
continue

if db_functions.get_item_by_id(item.id, session=session):
errors.append(f"Duplicate item found in database for id: {item.id}")
continue

enhanced_item = future.result()
if not enhanced_item:
errors.append(f"Failed to enhance {item.log_string} ({item.imdb_id}) with Trakt Indexer")
continue

enhanced_item.store_state()
session.add(enhanced_item)
added_items.add(item.imdb_id)

log_message = f"Indexed IMDb Id: {enhanced_item.id} as {enhanced_item.type.title()}: {enhanced_item.log_string}"
except NotADirectoryError:
errors.append(f"Skipping {item.log_string} as it is not a valid directory")
except Exception as e:
logger.exception(f"Error processing {item.log_string}: {e}")
raise # Re-raise to trigger rollback
finally:
progress.update(task, advance=1, log=log_message)

# Only commit if the entire chunk was successful
session.commit()

except Exception as e:
session.rollback()
logger.error(f"Failed to process chunk {i//chunk_size + 1}, rolling back all changes: {str(e)}")
raise # Re-raise to abort the entire process

progress.update(task, log="Finished Indexing Symlinks!")
progress.update(task, log="Finished Indexing Symlinks!")

if errors:
logger.error("Errors encountered during initialization")
for error in errors:
logger.error(error)

if errors:
logger.error("Errors encountered during initialization")
for error in errors:
logger.error(error)
except Exception as e:
session.rollback()
logger.error(f"Failed to initialize database from symlinks: {str(e)}")
return

elapsed_time = datetime.now() - start_time
total_seconds = elapsed_time.total_seconds()
Expand Down

0 comments on commit c0994f9

Please sign in to comment.