diff --git a/tagstudio/src/core/library.py b/tagstudio/src/core/library.py index 107b88cfb..9a8ef4900 100644 --- a/tagstudio/src/core/library.py +++ b/tagstudio/src/core/library.py @@ -14,7 +14,7 @@ from enum import Enum from pathlib import Path -from typing import cast, Generator +from typing import DefaultDict, cast, Generator from typing_extensions import Self from src.core.enums import FieldID @@ -43,6 +43,15 @@ class ItemType(Enum): logging.basicConfig(format="%(message)s", level=logging.INFO) +class KeyNameConstants: + """Class, containing constants for special keys in search query, that are not supposed to be visible to user.""" + + # FILENAME_KEYNAME = "filename" + # TAG_ID_KEYNAME = "tag_id" + UNBOUND_QUERY_ARGUMENTS_KEYNAME = "UNBOUND" + EMPTY_FIELD_QUERY_KEYNAME = "EMPTY" + + class Entry: """A Library Entry Object. Referenced by ID.""" @@ -433,7 +442,7 @@ def create_library(self, path: Path) -> int: self.verify_ts_folders() self.save_library_to_disk() self.open_library(self.library_dir) - except: + except Exception: traceback.print_exc() return 2 @@ -790,7 +799,7 @@ def to_json(self): def save_library_to_disk(self): """Saves the Library to disk at the default TagStudio folder location.""" - logging.info(f"[LIBRARY] Saving Library to Disk...") + logging.info("[LIBRARY] Saving Library to Disk...") start_time = time.time() filename = "ts_library.json" @@ -817,7 +826,7 @@ def save_library_backup_to_disk(self) -> str: Saves a backup file of the Library to disk at the default TagStudio folder location. Returns the filename used, including the date and time.""" - logging.info(f"[LIBRARY] Saving Library Backup to Disk...") + logging.info("[LIBRARY] Saving Library Backup to Disk...") start_time = time.time() filename = f'ts_library_backup_{datetime.datetime.utcnow().strftime("%F_%T").replace(":", "")}.json' @@ -1320,6 +1329,64 @@ def get_entry_id_from_filepath(self, filename: Path): except KeyError: return -1 + def parse_metadata( + self, query: str | None = None + ) -> list[dict[str, str | list[str]]]: + """ + Splits query into several maps 'meta_key -> value'\n + Values without specified key parsed as tags and put in 'unbound' key \n + example query1: + "meta_first: value; meta_second: value; tag1; | meta_first: value; meta_second: value; notag;" + example query2: + "tag1 | notag | tag2" + """ + unbound_keyname = KeyNameConstants.UNBOUND_QUERY_ARGUMENTS_KEYNAME + if query is None: + return [] + meta_list: list = [] + meta_conditions = query.strip().split("|") + for meta_condition in meta_conditions: + query_part: dict = {} + field_data_list = meta_condition.strip().split(";") + for field_data in field_data_list: + field_parsed = field_data.strip().split(":") + # if no ':' found, process this part as tags and flags + if len(field_parsed) == 1: + unbound_values = field_parsed[0].strip().casefold().split() + if query_part.get(unbound_keyname) is None: + query_part[unbound_keyname] = unbound_values + else: + query_part[unbound_keyname].extend(unbound_values) + continue + if len(field_parsed) > 2: + logging.warning("""[ERROR] Attempt to parse mutiple fields\ + as one! Do not forget to specify ';'\ + between different meta fields""") + query_part[field_parsed[0].strip().casefold()] = ( + field_parsed[1].strip().casefold() + ) + negative_flags: list = [] + # selecting null fields are done with '-field' word in unbound part + if unbound_keyname in query_part: + flags: list = query_part.get(unbound_keyname, []) + for flag in flags.copy(): + if flag[0] == "-": + negative_flag = flag[1:] + negative_flags.append(negative_flag) + flags.remove(flag) + if negative_flags: + query_part[KeyNameConstants.EMPTY_FIELD_QUERY_KEYNAME] = ( + negative_flags + ) + if flags: + query_part[unbound_keyname] = flags + else: + query_part.pop(unbound_keyname) + meta_list.append(query_part) + + logging.info(f"Parsed values: {meta_list}") + return meta_list + def search_library( self, query: str = None, @@ -1332,229 +1399,37 @@ def search_library( Uses a search query to generate a filtered results list. Returns a list of (str, int) tuples consisting of a result type and ID. """ - # self.filtered_entries.clear() - results: list[tuple[ItemType, int]] = [] - collations_added = [] - # print(f"Searching Library with query: {query} search_mode: {search_mode}") + results: list[tuple[ItemType, int]] | None = [] + collations_added: list = [] if query: - # start_time = time.time() - query = query.strip().lower() - query_words: list[str] = query.split(" ") - all_tag_terms: list[str] = [] - only_untagged: bool = "untagged" in query or "no tags" in query - only_empty: bool = "empty" in query or "no fields" in query - only_missing: bool = "missing" in query or "no file" in query - allow_adv: bool = "filename:" in query_words - tag_only: bool = "tag_id:" in query_words - if allow_adv: - query_words.remove("filename:") - if tag_only: - query_words.remove("tag_id:") - # TODO: Expand this to allow for dynamic fields to work. - only_no_author: bool = "no author" in query or "no artist" in query - - # Preprocess the Tag terms. - if query_words: - # print(query_words, self._tag_strings_to_id_map) - for i, term in enumerate(query_words): - for j, term in enumerate(query_words): - if ( - query_words[i : j + 1] - and " ".join(query_words[i : j + 1]) - in self._tag_strings_to_id_map - ): - all_tag_terms.append(" ".join(query_words[i : j + 1])) - # print(all_tag_terms) - - # This gets rid of any accidental term inclusions because they were words - # in another term. Ex. "3d" getting added in "3d art" - for i, term in enumerate(all_tag_terms): - for j, term2 in enumerate(all_tag_terms): - if i != j and all_tag_terms[i] in all_tag_terms[j]: - # print( - # f'removing {all_tag_terms[i]} because {all_tag_terms[i]} was in {all_tag_terms[j]}') - all_tag_terms.remove(all_tag_terms[i]) - break - - # print(all_tag_terms) - - # non_entry_count = 0 - # Iterate over all Entries ============================================================= - for entry in self.entries: - allowed_ext: bool = entry.filename.suffix.lower() not in self.ext_list - # try: - # entry: Entry = self.entries[self.file_to_library_index_map[self._source_filenames[i]]] - # print(f'{entry}') - - if allowed_ext == self.is_exclude_list: - # If the entry has tags of any kind, append them to this main tag list. - entry_tags: list[int] = [] - entry_authors: list[str] = [] - if entry.fields: - for field in entry.fields: - field_id = list(field.keys())[0] - if self.get_field_obj(field_id)["type"] == "tag_box": - entry_tags.extend(field[field_id]) - if self.get_field_obj(field_id)["name"] == "Author": - entry_authors.extend(field[field_id]) - if self.get_field_obj(field_id)["name"] == "Artist": - entry_authors.extend(field[field_id]) - - # print(f'Entry Tags: {entry_tags}') - - # Add Entries from special flags ------------------------------- - # TODO: Come up with a more user-resistent way to 'archived' and 'favorite' tags. - if only_untagged: - if not entry_tags: - results.append((ItemType.ENTRY, entry.id)) - elif only_no_author: - if not entry_authors: - results.append((ItemType.ENTRY, entry.id)) - elif only_empty: - if not entry.fields: - results.append((ItemType.ENTRY, entry.id)) - elif only_missing: - if ( - self.library_dir / entry.path / entry.filename - ).resolve() in self.missing_files: - results.append((ItemType.ENTRY, entry.id)) - - # elif query == "archived": - # if entry.tags and self._tag_names_to_tag_id_map[self.archived_word.lower()][0] in entry.tags: - # self.filtered_file_list.append(file) - # pb.value = len(self.filtered_file_list) - # elif query in entry.path.lower(): - - # NOTE: This searches path and filenames. - - if allow_adv: - if [q for q in query_words if (q in str(entry.path).lower())]: - results.append((ItemType.ENTRY, entry.id)) - elif [ - q for q in query_words if (q in str(entry.filename).lower()) - ]: - results.append((ItemType.ENTRY, entry.id)) - elif tag_only: - if entry.has_tag(self, int(query_words[0])): - results.append((ItemType.ENTRY, entry.id)) - - # elif query in entry.filename.lower(): - # self.filtered_entries.append(index) - elif entry_tags: - # function to add entry to results - def add_entry(entry: Entry): - # self.filter_entries.append() - # self.filtered_file_list.append(file) - # results.append((SearchItemType.ENTRY, entry.id)) - added = False - for f in entry.fields: - if self.get_field_attr(f, "type") == "collation": - if ( - self.get_field_attr(f, "content") - not in collations_added - ): - results.append( - ( - ItemType.COLLATION, - self.get_field_attr(f, "content"), - ) - ) - collations_added.append( - self.get_field_attr(f, "content") - ) - added = True - - if not added: - results.append((ItemType.ENTRY, entry.id)) - - if search_mode == SearchMode.AND: # Include all terms - # For each verified, extracted Tag term. - failure_to_union_terms = False - for term in all_tag_terms: - # If the term from the previous loop was already verified: - if not failure_to_union_terms: - cluster: set = set() - # Add the immediate associated Tags to the set (ex. Name, Alias hits) - # Since this term could technically map to multiple IDs, iterate over it - # (You're 99.9999999% likely to just get 1 item) - for id in self._tag_strings_to_id_map[term]: - cluster.add(id) - cluster = cluster.union( - set(self.get_tag_cluster(id)) - ) - # print(f'Full Cluster: {cluster}') - # For each of the Tag IDs in the term's ID cluster: - for t in cluster: - # Assume that this ID from the cluster is not in the Entry. - # Wait to see if proven wrong. - failure_to_union_terms = True - # If the ID actually is in the Entry, - if t in entry_tags: - # There wasn't a failure to find one of the term's cluster IDs in the Entry. - # There is also no more need to keep checking the rest of the terms in the cluster. - failure_to_union_terms = False - # print(f"FOUND MATCH: {t}") - break - # print(f'\tFailure to Match: {t}') - # # failure_to_union_terms is used to determine if all terms in the query were found in the entry. - # # If there even were tag terms to search through AND they all match an entry - if all_tag_terms and not failure_to_union_terms: - add_entry(entry) - - if search_mode == SearchMode.OR: # Include any terms - # For each verified, extracted Tag term. - for term in all_tag_terms: - # Add the immediate associated Tags to the set (ex. Name, Alias hits) - # Since this term could technically map to multiple IDs, iterate over it - # (You're 99.9999999% likely to just get 1 item) - for id in self._tag_strings_to_id_map[term]: - # If the ID actually is in the Entry, - if id in entry_tags: - # check if result already contains the entry - if (ItemType.ENTRY, entry.id) not in results: - add_entry(entry) - break - - # sys.stdout.write( - # f'\r[INFO][FILTER]: {len(self.filtered_file_list)} matches found') - # sys.stdout.flush() - - # except: - # # # Put this here to have new non-registered images show up - # # if query == "untagged" or query == "no author" or query == "no artist": - # # self.filtered_file_list.append(file) - # # non_entry_count = non_entry_count + 1 - # pass - - # end_time = time.time() - # print( - # f'[INFO][FILTER]: {len(self.filtered_entries)} matches found ({(end_time - start_time):.3f} seconds)') + filter = Filter(self) + processed_query = self.parse_metadata(query) + results = filter.filter_results(query, processed_query, search_mode) + if results is None: + return [] + else: + return results - # if non_entry_count: - # print( - # f'[INFO][FILTER]: There are {non_entry_count} new files in {self.source_dir} that do not have entries. These will not appear in most filtered results.') - # if not self.filtered_entries: - # print("[INFO][FILTER]: Filter returned no results.") else: for entry in self.entries: added = False allowed_ext = entry.filename.suffix.lower() not in self.ext_list if allowed_ext == self.is_exclude_list: - for f in entry.fields: - if self.get_field_attr(f, "type") == "collation": + for field in entry.fields: + if self.get_field_attr(field, "type") == "collation": if ( - self.get_field_attr(f, "content") + self.get_field_attr(field, "content") not in collations_added ): results.append( ( ItemType.COLLATION, - self.get_field_attr(f, "content"), + self.get_field_attr(field, "content"), ) ) collations_added.append( - self.get_field_attr(f, "content") + self.get_field_attr(field, "content") ) added = True @@ -2315,3 +2190,352 @@ def sort_fields(self, entry_id: int, order: list[int]) -> None: entry.fields = sorted( entry.fields, key=lambda x: order.index(self.get_field_attr(x, "id")) ) + + +class SpecialFlag: + only_untagged: bool + only_no_author: bool + only_empty: bool + only_missing: bool + + def __init__(self, query: str): + self.only_no_author: bool = "noauthor" in query or "noartist" in query + self.only_untagged: bool = "untagged" in query or "notags" in query + self.only_empty: bool = "empty" in query or "nofields" in query + self.only_missing: bool = "missing" in query or "nofile" in query + + +class Filter: + def __init__(self, lib: Library) -> None: + # self.lib = lib + self.ext_list = lib.ext_list + self.entries = lib.entries + self.is_exclude_list = lib.is_exclude_list + self.get_field_attr = lib.get_field_attr + self._tag_strings_to_id_map = lib._tag_strings_to_id_map + self.get_tag_cluster = lib.get_tag_cluster + self.library_dir = lib.library_dir + self.get_field_obj = lib.get_field_obj + self.missing_files = lib.missing_files + self.default_fields = lib.default_fields + + def generate_field_id_to_name_map(self) -> dict[int, str]: + map: dict = {} + for field in self.default_fields: + field_id = field.get("id") + if field_id is None: + logging.error(f"Field {field} is not found in default fields!") + continue + map[field_id] = field.get("name").lower() + return map + + self._field_id_to_name_map: dict = generate_field_id_to_name_map(self) + + def filter_results( + self, query: str, split_query: list[dict], search_mode: SearchMode + ) -> list[tuple[ItemType, int]]: + # start_time = time.time() + query = query.strip().casefold() + + pre_results: list[list[tuple[ItemType, int]]] = [] + + for query_part in split_query: + all_tag_terms: list[str] = [] + query_words = query_part.get( + KeyNameConstants.UNBOUND_QUERY_ARGUMENTS_KEYNAME + ) + + if isinstance(query_words, list): + all_tag_terms.extend(self.preprocess_tag_terms(query_words)) + + filtered_entries: list = [] + + # Iterate over all Entries ============================================================= + for entry in self.entries: + entry_tuple: tuple | None = None + allowed_ext: bool = entry.filename.suffix not in self.ext_list + if allowed_ext != self.is_exclude_list: + continue + (entry_tags, entry_authors) = self.populate_tags(entry) + is_selected: bool = True + # Remap entry.fields from list of dicts, to dict for ease of work + entry_fields: dict = self.remap_fields(entry) + for key, value in query_part.items(): + if isinstance(value, str): + value = value.strip().casefold() + entry_value = entry_fields.get(key) + + # special treatment for filename + if key == "filename" and isinstance(value, str): + is_selected = is_selected and self.check_filename(entry, value) + # all usual tags handling, as if search was in AND mode + elif key == "tag_id": + if not self.handle_tag_id(value, entry_tags): + is_selected = False + break + elif key == KeyNameConstants.UNBOUND_QUERY_ARGUMENTS_KEYNAME: + entry_tuple = None + # HACK + if isinstance(query_words, str): + query_words = [query_words] + if isinstance(query_words, list): + entry_tuple = self.handle_unbound( + entry, + all_tag_terms, + entry_tags, + entry_authors, + query_words, + ) + if entry_tuple is None: + is_selected = False + break + elif key == KeyNameConstants.EMPTY_FIELD_QUERY_KEYNAME: + empty_fields = value + if not isinstance(empty_fields, list): + empty_fields = [empty_fields] + if not self.required_fields_empty(entry_fields, empty_fields): + is_selected = False + break + else: + negative: bool = False + if key[0] == "-": + entry_value = entry_fields.get(key[1:]) + negative = True + if not self.handle_common_field(entry_value, value, negative): + is_selected = False + break + + if is_selected and entry_tuple is None: + filtered_entries.append((ItemType.ENTRY, entry.id)) + elif is_selected and entry_tuple is not None: + filtered_entries.append(entry_tuple) + pre_results.append(filtered_entries) + + # Entries should match all parts separated by '|' + result_set: set = set() + if search_mode == SearchMode.AND: + if len(pre_results) == 1: + return pre_results[0] + elif len(pre_results) == 0: + return [] + result_set = set(pre_results[0]) + # entries should be found in every part of query + for and_result in pre_results[1:]: + set_copy: list = list(result_set) + for recorded_entry in set_copy: + entry_detected: bool = False + for new_entry in and_result: + if recorded_entry == new_entry: + entry_detected = True + break + if not entry_detected: + result_set.remove(recorded_entry) + return list(result_set) + # Entries should match any of parts separated by '|' + elif search_mode == SearchMode.OR: + for or_result in pre_results: + for or_entry in or_result: + result_set.add(or_entry) + return list(result_set) + + def handle_common_field( + self, entry_value: list[str] | str | None, value: str, query_is_negative: bool + ) -> bool: + if isinstance(entry_value, list): + entry_value = "".join(entry_value) + + if entry_value is None: + return query_is_negative + else: + entry_value = entry_value.casefold() + + if value in entry_value: + return not query_is_negative + + return query_is_negative + + def handle_tag_id(self, query: str, entry_tags: list[int]) -> bool: + try: + id_query = int(query) + except Exception: + return False + return id_query in entry_tags + + def handle_unbound( + self, + entry: Entry, + all_tag_terms: list[str], + entry_tags: list[int], + entry_authors: list[list[str]], + unbound: list[str], + ) -> tuple[ItemType, int] | None: + collations_added: list = [] + + special_flags = SpecialFlag("".join(unbound)) + + if self.add_entries_from_special_flags( + special_flags, entry_tags, entry_authors, entry + ): + return (ItemType.ENTRY, entry.id) + + def add_entry(entry: Entry) -> tuple[ItemType, int] | None: + # self.filter_entries.append() + # self.filtered_file_list.append(file) + # results.append((SearchItemType.ENTRY, entry.id)) + added = False + for f in entry.fields: + if self.get_field_attr(f, "type") == "collation": + if self.get_field_attr(f, "content") not in collations_added: + collations_added.append(self.get_field_attr(f, "content")) + return ( + ItemType.COLLATION, + self.get_field_attr(f, "content"), + ) + added = True + + if not added: + return (ItemType.ENTRY, entry.id) + else: + return None + + # For each verified, extracted Tag term. + failure_to_union_terms = False + for term in all_tag_terms: + # If the term from the previous loop was already verified: + if not failure_to_union_terms: + cluster: set = set() + # Add the immediate associated Tags to the set (ex. Name, Alias hits) + # Since this term could technically map to multiple IDs, iterate over it + # (You're 99.9999999% likely to just get 1 item) + for id in self._tag_strings_to_id_map[term]: + cluster.add(id) + cluster = cluster.union(set(self.get_tag_cluster(id))) + # For each of the Tag IDs in the term's ID cluster: + for t in cluster: + # Assume that this ID from the cluster is not in the Entry. + # Wait to see if proven wrong. + failure_to_union_terms = True + # If the ID actually is in the Entry, + if t in entry_tags: + # There wasn't a failure to find one of the term's cluster IDs in the Entry. + # There is also no more need to keep checking the rest of the terms in the cluster. + failure_to_union_terms = False + # print(f"FOUND MATCH: {t}") + break + # print(f'\tFailure to Match: {t}') + # # failure_to_union_terms is used to determine if all terms in the query were found in the entry. + # # If there even were tag terms to search through AND they all match an entry + if all_tag_terms and not failure_to_union_terms: + return add_entry(entry) + else: + return None + + def add_entries_from_special_flags( + self, + special_flags: SpecialFlag, + entry_tags: list[int], + entry_authors: list[list[str]], + entry: Entry, + ): + if special_flags.only_untagged and not entry_tags: + return True + elif special_flags.only_no_author and not entry_authors: + return True + elif special_flags.only_empty and (not (entry.fields and entry.fields[0])): + return True + elif ( + special_flags.only_missing + and (self.library_dir / entry.path / entry.filename).resolve() + in self.missing_files + ): + return True + return False + + def remap_fields(self, entry: Entry) -> dict[str | int, str]: + entry_fields: dict = DefaultDict(list) + for field in entry.fields: + for key, value in field.items(): + key_mapped = self._field_id_to_name_map.get(int(key)) + key_type = self.get_field_obj(key)["type"] + if isinstance(value, str): + value = value.casefold() + elif isinstance(value, list): + for i in value: + if isinstance(i, str): + i = i.casefold() + if ( + key_type != "tag_box" + and key_mapped != "author" + and key_mapped != "artist" + ): + entry_fields[key_mapped].append(value) + else: + entry_fields[key_mapped].extend(value) + return entry_fields + + def populate_tags(self, entry: Entry) -> tuple[list[int], list[list[str]]]: + """Parse tags from entry.fields\n + returns tuple (entry_tags, entry_authors)""" + entry_tags: list[int] = [] + entry_authors: list[list[str]] = [] + if entry.fields: + for field in entry.fields: + if not field: + continue + field_id = list(field.keys())[0] + if self.get_field_obj(field_id)["type"] == "tag_box": + entry_tags.extend(field[field_id]) + if self.get_field_obj(field_id)["name"] == "Author": + entry_authors.extend(field[field_id]) + if self.get_field_obj(field_id)["name"] == "Artist": + entry_authors.extend(field[field_id]) + return (entry_tags, entry_authors) + + def check_filename(self, entry: Entry, query: str | None) -> bool: + """Checks if entry's path string contains Characters from 'filename: ' query part. + Method is coded in a way to look up individual characters, not substrings.""" + + if not query: + return False + + file_path_fuzzy: list = [] + file_filename_fuzzy: list = [] + for char in query: + if char in str(entry.path).casefold(): + file_path_fuzzy.append(char) + if char in str(entry.filename).casefold(): + file_filename_fuzzy.append(char) + + if file_path_fuzzy: + return True + elif file_filename_fuzzy: + return True + return False + + def required_fields_empty( + self, entry_fields: dict, empty_fields: list[str] + ) -> bool: + for field in empty_fields: + if entry_fields.get(field) is not None: + return False + return True + + def preprocess_tag_terms(self, query_words: list[str]): + all_tag_terms: list[str] = [] + for i, term in enumerate(query_words): + for j, term in enumerate(query_words): + query_words_sublist = query_words[i : j + 1] + query_words_single_string = " ".join(query_words_sublist) + if ( + query_words_sublist + and query_words_single_string in self._tag_strings_to_id_map + ): + all_tag_terms.append(query_words_single_string) + # This gets rid of any accidental term inclusions because they were words + # in another term. Ex. "3d" getting added in "3d art" + for i, term in enumerate(all_tag_terms): + for j, term2 in enumerate(all_tag_terms): + if i != j and all_tag_terms[i] in all_tag_terms[j]: + all_tag_terms.remove(all_tag_terms[i]) + break + return all_tag_terms diff --git a/tagstudio/tests/core/test_lib.py b/tagstudio/tests/core/test_lib.py index 997598f22..596696b82 100644 --- a/tagstudio/tests/core/test_lib.py +++ b/tagstudio/tests/core/test_lib.py @@ -10,7 +10,7 @@ def test_open_library(test_library, snapshot_json): [ ("First",), ("Second",), - ("--nomatch--",), + ("/--nomatch--/",), ], ) def test_library_search(test_library, query, snapshot_json): diff --git a/tagstudio/tests/core/test_search.py b/tagstudio/tests/core/test_search.py new file mode 100644 index 000000000..591a8d34d --- /dev/null +++ b/tagstudio/tests/core/test_search.py @@ -0,0 +1,270 @@ +from src.core.library import ( + ItemType, + Library, + Filter, + Entry, + SpecialFlag, + KeyNameConstants, +) +from src.core.enums import SearchMode +import pytest + +key_unbound = KeyNameConstants.UNBOUND_QUERY_ARGUMENTS_KEYNAME +key_empty = KeyNameConstants.EMPTY_FIELD_QUERY_KEYNAME + +test_library = Library() + +test_entry_one = Entry( + id=0, + filename="test_file1.png", + path=".", + fields=[{"6": [1000, 1001]}, {"1": ["James"]}], +) +test_entry_two = Entry(id=1, filename="test_file2.png", path="test_folder", fields=[{}]) +test_entry_three = Entry( + id=2, + filename="test_file3.png", + path="test_folder", + fields=[{"6": [1001]}, {"4": "Foo description"}], +) +test_entry_four = Entry( + id=3, + filename="test_file4.png", + path="test_folder", + fields=[{"1": ["Victor"]}, {"4": "description"}], +) +test_entry_five = Entry( + id=4, + filename="test_file5.png", + path="test_folder", + fields=[{"1": ["Victor"]}, {"1": ["James"]}], +) +test_entry_six = Entry( + id=5, + filename="test_file6.png", + path=".", + fields=[{"4": "description"}, {"4": "foo"}], +) + +test_library.entries = [ + test_entry_one, + test_entry_two, + test_entry_three, + test_entry_four, + test_entry_five, + test_entry_six, +] + +filter = Filter(test_library) + +### CASES ### + + +decomposition_cases: list[tuple] = [ + ("tag1 tag2", [{key_unbound: ["tag1", "tag2"]}]), + ("tag1 | tag2", [{key_unbound: ["tag1"]}, {key_unbound: ["tag2"]}]), + ("tag1 tag2 | tag3", [{key_unbound: ["tag1", "tag2"]}, {key_unbound: ["tag3"]}]), + ("tag1; description: desc", [{key_unbound: ["tag1"], "description": "desc"}]), + ("tag1; description: desc", [{key_unbound: ["tag1"], "description": "desc"}]), + ( + "tag1 -description | description: desc", + [{key_unbound: ["tag1"], key_empty: ["description"]}, {"description": "desc"}], + ), + ("; no author", [{key_unbound: ["no", "author"]}]), + ("-author", [{key_empty: ["author"]}]), + ("description: Foo", [{"description": "foo"}]), +] + +remap_cases: list[tuple] = [ + (test_entry_one, {"tags": [1000, 1001], "author": ["James"]}), + (test_entry_two, {}), + (test_entry_three, {"tags": [1001], "description": ["foo description"]}), + (test_entry_four, {"author": ["Victor"], "description": ["description"]}), + (test_entry_five, {"author": ["Victor", "James"]}), +] + +filename_cases: list[tuple] = [ + (test_entry_two, "2", True), + (test_entry_one, ".png", True), + (test_entry_three, "test_folder", True), + (test_entry_one, "file", True), + (test_entry_four, None, False), +] + +populate_tags_cases: list[tuple] = [ + (test_entry_one, ([1000, 1001], ["James"])), + (test_entry_two, ([], [])), + (test_entry_three, ([1001], [])), + (test_entry_four, ([], ["Victor"])), +] + +# no_author, untagged, empty, missing +special_flag_cases: list[tuple] = [ + ("noauthoruntagged", (True, True, False, False)), + ("emptynofile", (False, False, True, True)), + ("missinguntaggednoartist", (True, True, False, True)), + ("noartist", (True, False, False, False)), +] + +add_entries_from_special_cases: list[tuple] = [ + (test_entry_one, "noauthor", False), + (test_entry_two, "empty", True), + (test_entry_three, "noauthor", True), + (test_entry_four, "untagged", True), +] + +required_fields_empty_cases: list[tuple] = [ + (test_entry_one, ["author"], False), + (test_entry_one, ["description"], True), + (test_entry_two, ["description", "author"], True), + (test_entry_three, ["author"], True), +] + +filter_case_one: tuple = ( + [{key_unbound: ["no", "author"], "description": "des"}], + SearchMode.OR, + [(ItemType.ENTRY, 2), (ItemType.ENTRY, 5)], +) +filter_case_two: tuple = ( + [{key_unbound: ["no", "tags"]}, {"description": "des"}], + SearchMode.OR, + [ + (ItemType.ENTRY, 1), + (ItemType.ENTRY, 2), + (ItemType.ENTRY, 3), + (ItemType.ENTRY, 4), + (ItemType.ENTRY, 5), + ], +) +filter_case_three: tuple = ( + [{"tag_id": "1000"}, {key_unbound: "no author"}], + SearchMode.AND, + [], +) +filter_case_four: tuple = ( + [{"tag_id": "1001", key_unbound: ["no", "author"]}], + SearchMode.OR, + [(ItemType.ENTRY, 2)], +) + +filter_case_five: tuple = ( + [{"tag_id": "1000"}, {key_unbound: ["no", "author"]}], + SearchMode.OR, + [ + (ItemType.ENTRY, 0), + (ItemType.ENTRY, 1), + (ItemType.ENTRY, 2), + (ItemType.ENTRY, 5), + ], +) + +filter_case_six: tuple = ( + [{"description": "foo"}], + SearchMode.OR, + [(ItemType.ENTRY, 2), (ItemType.ENTRY, 5)], +) + +negative_filter_case_one: tuple = ( + [{key_empty: "description"}], + SearchMode.OR, + [(ItemType.ENTRY, 0), (ItemType.ENTRY, 1), (ItemType.ENTRY, 4)], +) + +negative_filter_case_two: tuple = ( + [{key_empty: "author"}], + SearchMode.OR, + [(ItemType.ENTRY, 1), (ItemType.ENTRY, 2), (ItemType.ENTRY, 5)], +) + +negative_filter_case_three: tuple = ( + [{"-description": "foo"}], + SearchMode.OR, + [ + (ItemType.ENTRY, 0), + (ItemType.ENTRY, 1), + (ItemType.ENTRY, 3), + (ItemType.ENTRY, 4), + ], +) + +negative_filter_case_four: tuple = ( + [{"-description": "desc", "description": "foo"}], + SearchMode.OR, + [], +) + + +filter_results_cases: list[tuple] = [ + filter_case_one, + filter_case_two, + filter_case_three, + filter_case_four, + filter_case_five, + filter_case_six, + negative_filter_case_one, + negative_filter_case_two, + negative_filter_case_three, + negative_filter_case_four, +] + +### TESTS ### + + +@pytest.mark.parametrize("input,expected", decomposition_cases) +def test_query_decomposition(input: str, expected: list[dict]): + assert test_library.parse_metadata(input) == expected + pass + + +@pytest.mark.parametrize("entry,expected", remap_cases) +def test_remap_one(entry: Entry, expected: dict): + assert filter.remap_fields(entry) == expected + + +@pytest.mark.parametrize("entry,query,result", filename_cases) +def test_filename(entry: Entry, query: str, result: bool): + assert filter.check_filename(entry, query) == result + + +@pytest.mark.parametrize("entry,expected", populate_tags_cases) +def test_populate_tags(entry: Entry, expected: tuple): + assert filter.populate_tags(entry) == expected + pass + + +@pytest.mark.parametrize("query,flags", special_flag_cases) +def test_special_flag(query: str, flags: tuple[bool, bool, bool, bool]): + special_flags = SpecialFlag(query) + result = ( + special_flags.only_no_author, + special_flags.only_untagged, + special_flags.only_empty, + special_flags.only_missing, + ) + assert result == flags + + +@pytest.mark.parametrize("entry,unbound_query,expected", add_entries_from_special_cases) +def test_add_entries_from_special(entry: Entry, unbound_query: str, expected: bool): + special_flags = SpecialFlag(unbound_query) + (entry_tags, entry_authors) = filter.populate_tags(entry) + result = filter.add_entries_from_special_flags( + special_flags, entry_tags, entry_authors, entry + ) + assert result == expected + + +@pytest.mark.parametrize("entry,empty_fields,expected", required_fields_empty_cases) +def test_required_fields_empty(entry: Entry, empty_fields: list[str], expected: bool): + entry_fields: dict = filter.remap_fields(entry) + result = filter.required_fields_empty(entry_fields, empty_fields) + assert result == expected + + +@pytest.mark.parametrize("split_query,search_mode,expected", filter_results_cases) +def test_filter_results( + split_query: list[dict], search_mode: SearchMode, expected: list[tuple | None] +): + # result_set: set = set() + # expected_set: set = set() + assert set(filter.filter_results("", split_query, search_mode)) == set(expected)