Skip to content

Commit

Permalink
chore: Refactor plex.py for improved matching and logging
Browse files Browse the repository at this point in the history
  • Loading branch information
Gyarbij committed Aug 31, 2024
1 parent 02bbd4a commit 0c0cac8
Showing 1 changed file with 80 additions and 58 deletions.
138 changes: 80 additions & 58 deletions plexist/modules/plex.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,28 @@

DB_PATH = os.getenv('DB_PATH', 'plexist.db')

def initialize_db():
conn = sqlite3.connect('plexist.db')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS plexist (
title TEXT,
artist TEXT,
album TEXT,
year INTEGER,
genre TEXT,
plex_id INTEGER
)
''')
conn.commit()
conn.close()
def initialize_db():
conn = sqlite3.connect('plexist.db')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS plexist (
title TEXT,
artist TEXT,
album TEXT,
year INTEGER,
genre TEXT,
plex_id INTEGER
)
''')
conn.commit()
conn.close()

def _get_available_plex_tracks(plex: PlexServer, tracks: List[Track]) -> List:
with ThreadPoolExecutor() as executor:
results = list(executor.map(lambda track: _match_single_track(plex, track), tracks))
plex_tracks = [result[0] for result in results if result[0]]
missing_tracks = [result[1] for result in results if result[1]]
return plex_tracks, missing_tracks

def _match_single_track(plex, track, year=None, genre=None):
plex_id = get_matched_song(track.title, track.artist, track.album)
Expand All @@ -40,9 +47,9 @@ def _match_single_track(plex, track, year=None, genre=None):
def similarity(a, b):
return SequenceMatcher(None, a.lower(), b.lower()).ratio()

def search_and_score(query, threshold):
def search_and_score(query, threshold, year_weight=0, genre_weight=0):
try:
search = plex.search(query, mediatype="track", limit=10)
search = plex.search(query, mediatype="track", limit=20) # Increased limit
except BadRequest:
logging.info(f"Failed to search {query} on Plex")
return None, 0
Expand All @@ -55,7 +62,7 @@ def search_and_score(query, threshold):
score += similarity(s.title, track.title) * 0.4
score += similarity(s.artist().title, track.artist) * 0.3
score += similarity(s.album().title, track.album) * 0.2

# Check for version in parentheses
if '(' in track.title and '(' in s.title:
version_similarity = similarity(
Expand All @@ -64,50 +71,63 @@ def search_and_score(query, threshold):
)
score += version_similarity * 0.1

# Year and Genre Matching (if available)
if year and s.year:
score += (s.year == year) * year_weight
if genre and s.genres:
genre_matches = any(similarity(g.tag, genre) > 0.8 for g in s.genres)
score += genre_matches * genre_weight

if score > best_score:
best_score = score
best_match = s

return (best_match, best_score) if best_score >= threshold else (None, 0)

# Stage 1: Strict matching
# Stage 1: Strict matching (including year and genre if available)
query = f"{track.title} {track.artist} {track.album}"
match, score = search_and_score(query, 0.8)
match, score = search_and_score(query, 0.85, year_weight=0.2, genre_weight=0.1) # Adjusted weights
if match:
insert_matched_song(track.title, track.artist, track.album, match.ratingKey)
return match, None

# Stage 2: Relax album requirement
query = f"{track.title} {track.artist}"
match, score = search_and_score(query, 0.7)
match, score = search_and_score(query, 0.75) # Slightly higher threshold
if match:
logging.info(f"Matched '{track.title}' by '{track.artist}' with relaxed album criteria. Score: {score}")
insert_matched_song(track.title, track.artist, track.album, match.ratingKey)
return match, None

# Stage 3: Further relaxation
# Stage 3: Further relaxation (partial title)
words = track.title.split()
if len(words) > 1:
query = f"{' '.join(words[:2])} {track.artist}"
match, score = search_and_score(query, 0.4)
match, score = search_and_score(query, 0.6) # Increased threshold
if match:
logging.info(f"Matched '{track.title}' by '{track.artist}' with partial title. Score: {score}")
insert_matched_song(track.title, track.artist, track.album, match.ratingKey)
return match, None

# Stage 4: Artist Only Match (for compilations, soundtracks, etc.)
query = f"{track.artist}"
match, score = search_and_score(query, 0.65)
if match:
logging.info(f"Matched '{track.title}' by '{track.artist}' with artist only. Score: {score}")
insert_matched_song(track.title, track.artist, track.album, match.ratingKey)
return match, None

# Stage 5: Title Only Match (last resort)
query = f"{track.title}"
match, score = search_and_score(query, 0.55)
if match:
logging.info(f"Matched '{track.title}' by '{track.artist}' with title only. Score: {score}")
insert_matched_song(track.title, track.artist, track.album, match.ratingKey)
return match, None

logging.info(f"No match found for track {track.title} by {track.artist}.")
return None, track

def insert_matched_song(title, artist, album, plex_id):
conn = sqlite3.connect('plexist.db')
cursor = conn.cursor()
cursor.execute('''
INSERT INTO plexist (title, artist, album, plex_id)
VALUES (?, ?, ?, ?)
''', (title, artist, album, plex_id))
conn.commit()
conn.close()


def get_matched_song(title, artist, album):
conn = sqlite3.connect('plexist.db')
cursor = conn.cursor()
Expand All @@ -119,30 +139,15 @@ def get_matched_song(title, artist, album):
conn.close()
return result[0] if result else None


def _write_csv(tracks: List[Track], name: str, path: str = "/data") -> None:
data_folder = pathlib.Path(path)
data_folder.mkdir(parents=True, exist_ok=True)
file = data_folder / f"{name}.csv"
with open(file, "w", encoding="utf-8") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(Track.__annotations__.keys())
for track in tracks:
writer.writerow(
[track.title, track.artist, track.album, track.url]
)

def _delete_csv(name: str, path: str = "/data") -> None:
data_folder = pathlib.Path(path)
file = data_folder / f"{name}.csv"
file.unlink()

def _get_available_plex_tracks(plex: PlexServer, tracks: List[Track]) -> List:
with ThreadPoolExecutor() as executor:
results = list(executor.map(lambda track: _match_single_track(plex, track), tracks))
plex_tracks = [result[0] for result in results if result[0]]
missing_tracks = [result[1] for result in results if result[1]]
return plex_tracks, missing_tracks
def insert_matched_song(title, artist, album, plex_id):
conn = sqlite3.connect('plexist.db')
cursor = conn.cursor()
cursor.execute('''
INSERT INTO plexist (title, artist, album, plex_id)
VALUES (?, ?, ?, ?)
''', (title, artist, album, plex_id))
conn.commit()
conn.close()

def _update_plex_playlist(
plex: PlexServer,
Expand Down Expand Up @@ -213,6 +218,23 @@ def update_or_create_plex_playlist(
except Exception as e:
logging.error("Failed to delete %s.csv: %s", playlist.name, str(e))

def _write_csv(tracks: List[Track], name: str, path: str = "/data") -> None:
data_folder = pathlib.Path(path)
data_folder.mkdir(parents=True, exist_ok=True)
file = data_folder / f"{name}.csv"
with open(file, "w", encoding="utf-8") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(Track.__annotations__.keys())
for track in tracks:
writer.writerow(
[track.title, track.artist, track.album, track.url]
)

def _delete_csv(name: str, path: str = "/data") -> None:
data_folder = pathlib.Path(path)
file = data_folder / f"{name}.csv"
file.unlink()

def end_session():
if 'conn' in locals() or 'conn' in globals():
conn.close()

0 comments on commit 0c0cac8

Please sign in to comment.