From de1f7cef0bfac4d1abb08cabbc868485c714728c Mon Sep 17 00:00:00 2001 From: sa3eed3ed Date: Mon, 30 Dec 2024 14:25:17 +0000 Subject: [PATCH 1/4] Map Extracted Files to Artifact Definitions in image_export.py --- plaso/cli/image_export_tool.py | 73 ++++-- plaso/engine/artifact_filters.py | 219 +++++++++++++--- plaso/engine/artifacts_trie.py | 225 +++++++++++++++++ plaso/engine/engine.py | 32 ++- plaso/engine/path_helper.py | 51 ++++ plaso/filters/file_entry.py | 23 ++ test_data/artifacts/artifacts_filters.yaml | 15 ++ tests/cli/image_export_tool.py | 188 ++++++++++++++ tests/engine/artifact_filters.py | 175 ++++++++++++- tests/engine/artifacts_trie.py | 275 +++++++++++++++++++++ tests/engine/engine.py | 128 +++++++++- tests/engine/path_helper.py | 113 ++++++++- tests/filters/file_entry.py | 34 +++ 13 files changed, 1465 insertions(+), 86 deletions(-) create mode 100644 plaso/engine/artifacts_trie.py create mode 100644 tests/engine/artifacts_trie.py diff --git a/plaso/cli/image_export_tool.py b/plaso/cli/image_export_tool.py index a7589d6541..d86e0023fb 100644 --- a/plaso/cli/image_export_tool.py +++ b/plaso/cli/image_export_tool.py @@ -48,14 +48,6 @@ class ImageExportTool(storage_media_tool.StorageMediaTool): _COPY_BUFFER_SIZE = 32768 - _DIRTY_CHARACTERS = frozenset([ - '\x00', '\x01', '\x02', '\x03', '\x04', '\x05', '\x06', '\x07', - '\x08', '\x09', '\x0a', '\x0b', '\x0c', '\x0d', '\x0e', '\x0f', - '\x10', '\x11', '\x12', '\x13', '\x14', '\x15', '\x16', '\x17', - '\x18', '\x19', '\x1a', '\x1b', '\x1c', '\x1d', '\x1e', '\x1f', - os.path.sep, '!', '$', '%', '&', '*', '+', ':', ';', '<', '>', - '?', '@', '|', '~', '\x7f']) - _HASHES_FILENAME = 'hashes.json' _READ_BUFFER_SIZE = 4096 @@ -99,6 +91,8 @@ def __init__(self, input_reader=None, output_writer=None): self.has_filters = False self.list_signature_identifiers = False + self._enable_artifacts_map = False + self._artifacts_paths_map = collections.defaultdict(list) def _CalculateDigestHash(self, file_entry, 
data_stream_name): """Calculates a SHA-256 digest of the contents of the file entry. @@ -128,8 +122,11 @@ def _CalculateDigestHash(self, file_entry, data_stream_name): return hasher_object.GetStringDigest() def _CreateSanitizedDestination( - self, source_file_entry, file_system_path_spec, source_data_stream_name, - destination_path): + self, + source_file_entry, + file_system_path_spec, + source_data_stream_name, + destination_path): """Creates a sanitized path of both destination directory and filename. This function replaces non-printable and other characters defined in @@ -151,11 +148,7 @@ def _CreateSanitizedDestination( path = getattr(file_system_path_spec, 'location', None) path_segments = file_system.SplitPath(path) - # Sanitize each path segment. - for index, path_segment in enumerate(path_segments): - path_segments[index] = ''.join([ - character if character not in self._DIRTY_CHARACTERS else '_' - for character in path_segment]) + path_segments = path_helper.PathHelper.SanitizePathSegments(path_segments) target_filename = path_segments.pop() @@ -213,17 +206,9 @@ def _ExtractDataStream( target_directory, target_filename = self._CreateSanitizedDestination( file_entry, file_entry.path_spec, data_stream_name, destination_path) - - # If does not exist, append path separator to have consistent behaviour. 
- if not destination_path.endswith(os.path.sep): - destination_path = destination_path + os.path.sep - - # TODO: refactor - path = None - + path = path_helper.PathHelper.GetRelativePath( + target_directory, target_filename, destination_path) target_path = os.path.join(target_directory, target_filename) - if target_path.startswith(destination_path): - path = target_path[len(destination_path):] self._paths_by_hash[digest].append(path) @@ -247,6 +232,13 @@ def _ExtractDataStream( f'exists.')) return + # Generate a map between artifacts and extracted paths + if self._enable_artifacts_map: + for artifact_name in self._filter_collection.GetMatchingArtifacts( + path, os.sep): + self._artifacts_paths_map.setdefault( + artifact_name, []).append(path) + try: self._WriteFileEntry(file_entry, data_stream_name, target_path) except (IOError, dfvfs_errors.BackEndError) as exception: @@ -348,13 +340,19 @@ def _Extract( try: extraction_engine.BuildCollectionFilters( - environment_variables, user_accounts, + environment_variables, + user_accounts, artifact_filter_names=artifact_filters, - filter_file_path=filter_file) + filter_file_path=filter_file, + enable_artifacts_map=self._enable_artifacts_map) except errors.InvalidFilter as exception: raise errors.BadConfigOption( f'Unable to build collection filters with error: {exception!s}') + if self._enable_artifacts_map: + self._filter_collection.SetArtifactsTrie( + extraction_engine.GetArtifactsTrie()) + excluded_find_specs = extraction_engine.GetCollectionExcludedFindSpecs() included_find_specs = extraction_engine.GetCollectionIncludedFindSpecs() @@ -654,6 +652,12 @@ def ParseArguments(self, arguments): self.AddFilterOptions(argument_parser) + argument_parser.add_argument( + '--enable_artifacts_map', dest='enable_artifacts_map', + action='store_true', default=False, help=( + 'Output a JSON file mapping extracted files/directories to ' + 'artifact definitions.')) + argument_parser.add_argument( '-w', '--write', action='store', 
dest='path', type=str, metavar='PATH', default='export', help=( @@ -785,6 +789,9 @@ def ParseOptions(self, options): self._EnforceProcessMemoryLimit(self._process_memory_limit) + self._enable_artifacts_map = getattr( + options, 'enable_artifacts_map', False) + def PrintFilterCollection(self): """Prints the filter collection.""" self._filter_collection.Print(self._output_writer) @@ -799,6 +806,12 @@ def ProcessSource(self): """ try: self.ScanSource(self._source_path) + if self._source_type not in self._SOURCE_TYPES_TO_PREPROCESS: + self._output_writer.Write( + ( + f'Input must be in {list(self._SOURCE_TYPES_TO_PREPROCESS)} ' + f'the "{self._source_type}" type is not supported.\n')) + return except dfvfs_errors.UserAbort as exception: raise errors.UserAbort(exception) @@ -823,5 +836,11 @@ def ProcessSource(self): json_data.append({'sha256': sha256, 'paths': paths}) json.dump(json_data, file_object) + if self._enable_artifacts_map: + artifacts_map_file = os.path.join( + self._destination_path, 'artifacts_map.json') + with open(artifacts_map_file, 'w', encoding='utf-8') as file_object: + json.dump(self._artifacts_paths_map, file_object) + self._output_writer.Write('Export completed.\n') self._output_writer.Write('\n') diff --git a/plaso/engine/artifact_filters.py b/plaso/engine/artifact_filters.py index ba0c6d3f21..7223d58289 100644 --- a/plaso/engine/artifact_filters.py +++ b/plaso/engine/artifact_filters.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- """Helper to create filters based on forensic artifact definitions.""" +import os from artifacts import definitions as artifact_types from dfvfs.helpers import file_system_searcher as dfvfs_file_system_searcher @@ -9,6 +10,7 @@ from plaso.engine import logger from plaso.engine import path_helper +from plaso.engine import artifacts_trie class ArtifactDefinitionsFiltersHelper(object): @@ -28,6 +30,10 @@ class ArtifactDefinitionsFiltersHelper(object): generated Windows Registry find specifications. 
registry_find_specs (list[dfwinreg.FindSpec]): Windows Registry find specifications. + registry_find_specs_artifact_names (list[str]): Windows Registry artifact + names corresponding to the find specifications. + artifacts_trie (ArtifactsTrie): Trie structure for storing artifact + definition paths. """ _COMPATIBLE_REGISTRY_KEY_PATH_PREFIXES = frozenset([ @@ -52,9 +58,16 @@ def __init__(self, artifacts_registry): self.file_system_find_specs = [] self.registry_artifact_names = set() self.registry_find_specs = [] + self.registry_find_specs_artifact_names = [] + self.artifacts_trie = artifacts_trie.ArtifactsTrie() def _BuildFindSpecsFromArtifact( - self, definition, environment_variables, user_accounts): + self, + definition, + environment_variables, + user_accounts, + enable_artifacts_map=False, + original_registery_artifact_filter_names=None): """Builds find specifications from an artifact definition. Args: @@ -62,6 +75,11 @@ def _BuildFindSpecsFromArtifact( environment_variables (list[EnvironmentVariableArtifact]): environment variables. user_accounts (list[UserAccountArtifact]): user accounts. + enable_artifacts_map (Optional[bool]): True if the artifacts path map + should be generated. Defaults to False. + original_registery_artifact_filter_names (Optional[set[str]]): Set of + original registry filter names, used in case registry hive files + are being requested as a result of a previous filter. 
Returns: list[dfvfs.FindSpec|dfwinreg.FindSpec]: dfVFS or dfWinReg find @@ -72,8 +90,14 @@ def _BuildFindSpecsFromArtifact( if source.type_indicator == artifact_types.TYPE_INDICATOR_FILE: for path_entry in set(source.paths): specifications = self._BuildFindSpecsFromFileSourcePath( - path_entry, source.separator, environment_variables, - user_accounts) + definition.name, + path_entry, + source.separator, + environment_variables, + user_accounts, + enable_artifacts_map=enable_artifacts_map, + original_registery_artifact_filter_names=( + original_registery_artifact_filter_names)) find_specs.extend(specifications) self.file_system_artifact_names.add(definition.name) @@ -108,7 +132,12 @@ def _BuildFindSpecsFromArtifact( artifact_types.TYPE_INDICATOR_ARTIFACT_GROUP): for name in source.names: specifications = self._BuildFindSpecsFromGroupName( - name, environment_variables, user_accounts) + name, + environment_variables, + user_accounts, + enable_artifacts_map=enable_artifacts_map, + original_registery_artifact_filter_names=( + original_registery_artifact_filter_names)) find_specs.extend(specifications) else: @@ -119,7 +148,12 @@ def _BuildFindSpecsFromArtifact( return find_specs def _BuildFindSpecsFromGroupName( - self, group_name, environment_variables, user_accounts): + self, + group_name, + environment_variables, + user_accounts, + enable_artifacts_map=False, + original_registery_artifact_filter_names=None): """Builds find specifications from a artifact group name. Args: @@ -127,6 +161,11 @@ def _BuildFindSpecsFromGroupName( environment_variables (list[EnvironmentVariableArtifact]): environment variables. user_accounts (list[UserAccountArtifact]): user accounts. + enable_artifacts_map (Optional[bool]): True if the artifacts path map + should be generated. Defaults to False. + original_registery_artifact_filter_names (Optional[set[str]]): Set of + original registry filter names, used in case registry hive files + are being requested as a result of a previous filter. 
Returns: list[dfwinreg.FindSpec|dfvfs.FindSpec]: find specifications or None if no @@ -139,7 +178,12 @@ def _BuildFindSpecsFromGroupName( return None return self._BuildFindSpecsFromArtifact( - definition, environment_variables, user_accounts) + definition, + environment_variables, + user_accounts, + enable_artifacts_map=enable_artifacts_map, + original_registery_artifact_filter_names=( + original_registery_artifact_filter_names)) def _BuildFindSpecsFromRegistrySourceKey(self, key_path): """Build find specifications from a Windows Registry source type. @@ -163,7 +207,8 @@ def _BuildFindSpecsFromRegistrySourceKey(self, key_path): 'HKEY_LOCAL_MACHINE\\System\\ControlSet*', key_path_glob[43:]]) elif key_path_glob_upper.startswith('HKEY_USERS\\%%USERS.SID%%'): - key_path_glob = ''.join(['HKEY_CURRENT_USER', key_path_glob[26:]]) + # The escape character is not counted in the string index. + key_path_glob = ''.join(['HKEY_CURRENT_USER', key_path_glob[24:]]) find_spec = dfwinreg_registry_searcher.FindSpec( key_path_glob=key_path_glob) @@ -172,15 +217,28 @@ def _BuildFindSpecsFromRegistrySourceKey(self, key_path): return find_specs def _BuildFindSpecsFromFileSourcePath( - self, source_path, path_separator, environment_variables, user_accounts): + self, + artifact_name, + source_path, + path_separator, + environment_variables, + user_accounts, + enable_artifacts_map=False, + original_registery_artifact_filter_names=None): """Builds find specifications from a file source type. Args: + artifact_name (str): artifact name. source_path (str): file system path defined by the source. path_separator (str): file system path segment separator. environment_variables (list[EnvironmentVariableArtifact]): environment variables. user_accounts (list[UserAccountArtifact]): user accounts. + enable_artifacts_map (Optional[bool]): True if the artifacts path map + should be generated. Defaults to False. 
+ original_registery_artifact_filter_names (Optional[set[str]]): Set of + original registry filter names, used in case registry hive files + are being requested as a result of a previous filter. Returns: list[dfvfs.FindSpec]: find specifications for the file source type. @@ -194,34 +252,100 @@ def _BuildFindSpecsFromFileSourcePath( path_glob, path_separator, user_accounts): logger.debug(f'building find spec from path: {path:s}') - if '%' in path: - path = path_helper.PathHelper.ExpandWindowsPath( - path, environment_variables) - logger.debug(f'building find spec from expanded path: {path:s}') - - if not path.startswith(path_separator): - logger.warning(( - f'The path filter must be defined as an absolute path: ' - f'"{path:s}"')) + expanded_path = self._ExpandPathVariables( + path, environment_variables, path_separator) + if expanded_path is None: continue - try: - find_spec = dfvfs_file_system_searcher.FindSpec( - case_sensitive=False, location_glob=path, - location_separator=path_separator) - except ValueError as exception: - logger.error(( - f'Unable to build find specification for path: "{path:s}" with ' - f'error: {exception!s}')) + find_spec = self._CreateFindSpec(expanded_path, path_separator) + if find_spec is None: continue find_specs.append(find_spec) + if enable_artifacts_map: + self._AddToArtifactsTrie(artifact_name, + expanded_path, + original_registery_artifact_filter_names, + path_separator) + return find_specs + def _AddToArtifactsTrie( + self, + artifact_name, + path, + original_registery_artifact_filter_names, + path_separator): + """Adds a path to the artifacts trie. + + Args: + artifact_name (str): artifact name. + path (str): file system path. + original_registery_artifact_filter_names (Optional[set[str]]): Set of + original registry filter names. + path_separator (str): path separator. 
+ """ + normalized_path = path.replace(path_separator, os.sep) + self.artifacts_trie.AddPath(artifact_name, normalized_path, os.sep) + if original_registery_artifact_filter_names: + for name in original_registery_artifact_filter_names: + self.artifacts_trie.AddPath(name, normalized_path, os.sep) + + def _ExpandPathVariables(self, path, environment_variables, path_separator): + """Expands Windows paths and validates the result. + + Args: + path (str): file system path with environment variables + environment_variables (list[EnvironmentVariableArtifact]): + environment variables. + path_separator (str): file system path segment separator. + + Returns: + str: expanded path, or None if the path is invalid + """ + + if '%' in path: + path = path_helper.PathHelper.ExpandWindowsPath( + path, environment_variables) + logger.debug(f'building find spec from expanded path: {path:s}') + + if not path.startswith(path_separator): + logger.warning(( + f'The path filter must be defined as an absolute path: ' + f'"{path:s}"')) + return None + return path + + def _CreateFindSpec(self, path, path_separator): + """Creates a dfVFS find specification. + + Args: + path (str): Path to match. + path_separator (str): file system path segment separator. + + + Returns: + dfvfs.FindSpec: a find specification or None if one cannot be created. + """ + try: + find_spec = dfvfs_file_system_searcher.FindSpec( + case_sensitive=False, location_glob=path, + location_separator=path_separator) + return find_spec + except ValueError as exception: + logger.error(( + f'Unable to build find specification for path: "{path:s}" with ' + f'error: {exception!s}')) + return None + def BuildFindSpecs( - self, artifact_filter_names, environment_variables=None, - user_accounts=None): + self, + artifact_filter_names, + environment_variables=None, + user_accounts=None, + enable_artifacts_map=False, + original_registery_artifact_filter_names=None): """Builds find specifications from artifact definitions. 
Args: @@ -230,8 +354,13 @@ def BuildFindSpecs( environment_variables (list[EnvironmentVariableArtifact]): environment variables. user_accounts (Optional[list[UserAccountArtifact]]): user accounts. + enable_artifacts_map (Optional[bool]): True if the artifacts path map + should be generated. Defaults to False. + original_registery_artifact_filter_names (Optional[set[str]]): Set of + original registry filter names, used in case registry hive files + are being requested as a result of a previous filter. """ - find_specs = [] + find_specs = {} for name in artifact_filter_names: definition = self._artifacts_registry.GetDefinitionByName(name) if not definition: @@ -242,19 +371,27 @@ def BuildFindSpecs( logger.debug(f'building find spec from artifact definition: {name:s}') artifact_find_specs = self._BuildFindSpecsFromArtifact( - definition, environment_variables, user_accounts) - find_specs.extend(artifact_find_specs) - - for find_spec in find_specs: - if isinstance(find_spec, dfvfs_file_system_searcher.FindSpec): - self.file_system_find_specs.append(find_spec) - - elif isinstance(find_spec, dfwinreg_registry_searcher.FindSpec): - self.registry_find_specs.append(find_spec) - - else: - type_string = type(find_spec) - logger.warning(f'Unsupported find specification type: {type_string!s}') + definition, + environment_variables, + user_accounts, + enable_artifacts_map=enable_artifacts_map, + original_registery_artifact_filter_names=( + original_registery_artifact_filter_names)) + find_specs.setdefault(name, []).extend(artifact_find_specs) + + for name, find_spec_values in find_specs.items(): + for find_spec in find_spec_values: + if isinstance(find_spec, dfvfs_file_system_searcher.FindSpec): + self.file_system_find_specs.append(find_spec) + + elif isinstance(find_spec, dfwinreg_registry_searcher.FindSpec): + self.registry_find_specs.append(find_spec) + # Artifact names ordered similar to registry find specs + self.registry_find_specs_artifact_names.append(name) + else: + 
type_string = type(find_spec) + logger.warning( + f'Unsupported find specification type: {type_string!s}') @classmethod def CheckKeyCompatibility(cls, key_path): diff --git a/plaso/engine/artifacts_trie.py b/plaso/engine/artifacts_trie.py new file mode 100644 index 0000000000..e8e5b8fb91 --- /dev/null +++ b/plaso/engine/artifacts_trie.py @@ -0,0 +1,225 @@ +# -*- coding: utf-8 -*- +"""Trie structure for storing artifact paths.""" + +import fnmatch +import glob +import os + +from plaso.engine import logger + + +class TrieNode(object): + """Represents a node in the Trie. + + Attributes: + children (dict[str, TrieNode]): Child nodes, keyed by path segment. + artifacts_names (list[str]): Names of artifacts associated with this node. + path_separator (str): Path separator used in the Trie. + is_root (bool): True if this is the root node. + """ + + def __init__(self, path_separator=None, is_root=False): + """Initializes a trie node object. + + Args: + path_separator (str): the path separator used in paths stored in + the Trie, typically '/' or '\'. + is_root (bool): True if this node is the root node. + """ + super(TrieNode, self).__init__() + self.children = {} + self.artifacts_names = [] + self.path_separator = path_separator + self.is_root = is_root + + +class ArtifactsTrie(object): + """Trie structure for storing artifact paths. + + Attributes: + root (TrieNode): Root node of the Trie. + artifacts_paths (dict[str, list[str]]): Artifact paths for glob expansion, + keyed by artifact name. + """ + + def __init__(self): + """Initializes an artifact trie object.""" + super(ArtifactsTrie, self).__init__() + self.root = TrieNode(is_root=True) + self.artifacts_paths = {} # Store artifact paths for glob expansion + + def AddPath(self, artifact_name, path, path_separator): + """Adds a path from an artifact definition to the Trie. + + Args: + artifact_name (str): name of the artifact. + path (str): path from the artifact definition. + path_separator (str): path separator. 
+ """ + logger.debug(f'Adding path: "{path:s}" to artifact: "{artifact_name:s}"') + self.artifacts_paths.setdefault(artifact_name, []).append(path) + + # Start at the root + node = self.root + # Add a path separator node if this is a new separator + if path_separator not in node.children: + node.children[path_separator] = TrieNode(path_separator=path_separator) + node = node.children[path_separator] + # Handle the case when the input path is equal to the path_separator + if path == path_separator: + node.artifacts_names.append(artifact_name) + return + + path_segments = path.strip(path_separator).split(path_separator) + for segment in path_segments: + # Store the path_separator for each node. + if not hasattr(node, 'path_separator'): + node.path_separator = path_separator + + if segment not in node.children: + node.children[segment] = TrieNode(path_separator) + node = node.children[segment] + node.artifacts_names.append(artifact_name) + + def GetMatchingArtifacts(self, path, path_separator): + """Retrieves the artifact names that match the given path. + + Args: + path (str): path to match against the Trie. + path_separator (str): path separator. + + Returns: + list[str]: artifact names that match the path. + """ + # Start at the root's child that matches the path_separator + if path_separator not in self.root.children: + return [] + + sub_root_node = self.root.children[path_separator] + # Handle the case when the input path is equal to the path_separator + if path == path_separator: + matching_artifacts = set() + if sub_root_node.artifacts_names: + matching_artifacts.update(sub_root_node.artifacts_names) + return list(matching_artifacts) + path_segments = path.strip(path_separator).split(path_separator) + matching_artifacts = set() + # Update self.artifacts_paths before starting the search. 
+ self.artifacts_paths = self._GetArtifactsPaths(sub_root_node) + + def _search_trie(node, current_path, segments): + """Searches the trie for paths matching the given path segments. + + Args: + node (TrieNode): current trie node being traversed. + current_path (str): path represented by the current node. + segments (list[str]): remaining path segments to match. + """ + if node.artifacts_names: + for artifact_name in node.artifacts_names: + for artifact_path in self.artifacts_paths.get(artifact_name, []): + if glob.has_magic(artifact_path): + if self._MatchesGlobPattern( + artifact_path, current_path, node.path_separator): + matching_artifacts.add(artifact_name) + elif os.path.normpath(current_path).strip(os.sep).split( + os.sep) == os.path.normpath(artifact_path).strip( + node.path_separator).split(node.path_separator): + matching_artifacts.add(artifact_name) + + if not segments: + return + + segment = segments[0] + remaining_segments = segments[1:] + + # Handle glob characters in the current segment. + for child_segment, child_node in node.children.items(): + # If the child is a glob, see if it matches. + if glob.has_magic(child_segment): + if self._MatchesGlobPattern( + child_segment, segment, child_node.path_separator): + _search_trie(child_node, os.path.join( + current_path, segment), remaining_segments) + _search_trie(node, os.path.join( + current_path, segment), remaining_segments) + elif child_segment == segment: + # If the child is an exact match, continue traversal. + _search_trie(child_node, os.path.join( + current_path, segment), remaining_segments) + + _search_trie(sub_root_node, '', path_segments) + return list(matching_artifacts) + + def _GetArtifactsPaths(self, node): + """Retrieves a mapping of artifact names to their paths. + + Args: + node (TrieNode): current trie node being traversed. + + Returns: + dict: dictionary mapping artifact names to their paths. 
+ """ + artifacts_paths = {} + + def _collect_paths(node, current_path, artifacts): + """Collects paths from the trie. + + Args: + node (TrieNode): current node. + current_path (str): path leading to this node. + artifacts (dict): dictionary to store artifact paths. + """ + if node.artifacts_names: + for artifact_name in node.artifacts_names: + artifacts.setdefault(artifact_name, []).append(current_path) + + for segment, child_node in node.children.items(): + # Ensure the path_separator attribute exists. + if not hasattr(child_node, 'path_separator'): + child_node.path_separator = node.path_separator + + # Construct the next path segment. + if current_path == child_node.path_separator: + # Means it is the root folder, i.e. `/` + next_path = current_path + segment + else: + next_path = current_path + child_node.path_separator + segment + + _collect_paths(child_node, next_path, artifacts) + + _collect_paths(node, '', artifacts_paths) + return artifacts_paths + + def _MatchesGlobPattern(self, glob_pattern, path, path_separator): + """Checks if a path matches a given glob pattern. + + Args: + glob_pattern: The glob pattern to match against. + path: The path to check. + path_separator: The path separator used in the glob pattern. + + Returns: + True if the path matches the glob pattern, False otherwise. 
+ """ + # Normalize paths using the appropriate separators + glob_pattern = glob_pattern.strip(path_separator).split(path_separator) + path = path.strip(os.sep).split(os.sep) + + i = 0 + j = 0 + while i < len(glob_pattern) and j < len(path): + if glob_pattern[i] == '**': + # If ** is the last part, it matches everything remaining + if i == len(glob_pattern) - 1: + return True + i += 1 # Move to the next part after ** + while j < len(path) and not fnmatch.fnmatch(path[j], glob_pattern[i]): + j += 1 # Keep advancing in the path until the next part matches + elif not fnmatch.fnmatch(path[j], glob_pattern[i]): + return False # Mismatch + else: + i += 1 + j += 1 + + return i == len(glob_pattern) and j == len(path) diff --git a/plaso/engine/engine.py b/plaso/engine/engine.py index 05b0317506..d917cd88a1 100644 --- a/plaso/engine/engine.py +++ b/plaso/engine/engine.py @@ -53,6 +53,7 @@ def __init__(self): self._status_update_interval = 0.5 self._storage_profiler = None self._task_queue_profiler = None + self._artifacts_trie = None self.knowledge_base = knowledge_base.KnowledgeBase() @@ -166,8 +167,12 @@ def BuildArtifactsRegistry( self._artifacts_registry = registry def BuildCollectionFilters( - self, environment_variables, user_accounts, artifact_filter_names=None, - filter_file_path=None): + self, + environment_variables, + user_accounts, + artifact_filter_names=None, + filter_file_path=None, + enable_artifacts_map=False): """Builds collection filters from artifacts or filter file if available. Args: @@ -178,6 +183,8 @@ def BuildCollectionFilters( definitions that are used for filtering file system and Windows Registry key paths. filter_file_path (Optional[str]): path of filter file. + enable_artifacts_map (Optional[bool]): True if the artifacts path map + should be generated. Defaults to False. Raises: InvalidFilter: if no valid file system find specifications are built. 
@@ -191,8 +198,10 @@ def BuildCollectionFilters( filters_helper = artifact_filters.ArtifactDefinitionsFiltersHelper( self._artifacts_registry) filters_helper.BuildFindSpecs( - artifact_filter_names, environment_variables=environment_variables, - user_accounts=user_accounts) + artifact_filter_names, + environment_variables=environment_variables, + user_accounts=user_accounts, + enable_artifacts_map=enable_artifacts_map) # If the user selected Windows Registry artifacts we have to ensure # the Windows Registry files are parsed. @@ -200,7 +209,10 @@ def BuildCollectionFilters( filters_helper.BuildFindSpecs( self._WINDOWS_REGISTRY_FILES_ARTIFACT_NAMES, environment_variables=environment_variables, - user_accounts=user_accounts) + user_accounts=user_accounts, + enable_artifacts_map=enable_artifacts_map, + original_registery_artifact_filter_names=( + filters_helper.registry_find_specs_artifact_names)) if not filters_helper.file_system_find_specs: raise errors.InvalidFilter( @@ -211,6 +223,8 @@ def BuildCollectionFilters( filters_helper.file_system_find_specs) self._registry_find_specs = filters_helper.registry_find_specs + self._artifacts_trie = filters_helper.artifacts_trie + elif filter_file_path: logger.debug(( f'building find specification based on filter file: ' @@ -388,3 +402,11 @@ def SetStatusUpdateInterval(self, status_update_interval): status_update_interval (float): status update interval. """ self._status_update_interval = status_update_interval + + def GetArtifactsTrie(self): + """Retrieves the artifacts trie. + + Returns: + ArtifactsTrie: artifacts trie. 
+ """ + return self._artifacts_trie diff --git a/plaso/engine/path_helper.py b/plaso/engine/path_helper.py index 34e5a4d4e7..a9f59612a1 100644 --- a/plaso/engine/path_helper.py +++ b/plaso/engine/path_helper.py @@ -25,6 +25,13 @@ class PathHelper(object): ['%%users.userprofile%%', 'AppData', 'LocalLow']], '%%users.temp%%': [ ['%%users.localappdata%%', 'Temp']]} + _DIRTY_CHARACTERS = frozenset([ + '\x00', '\x01', '\x02', '\x03', '\x04', '\x05', '\x06', '\x07', + '\x08', '\x09', '\x0a', '\x0b', '\x0c', '\x0d', '\x0e', '\x0f', + '\x10', '\x11', '\x12', '\x13', '\x14', '\x15', '\x16', '\x17', + '\x18', '\x19', '\x1a', '\x1b', '\x1c', '\x1d', '\x1e', '\x1f', + os.path.sep, '!', '$', '%', '&', '*', '+', ':', ';', '<', '>', + '?', '@', '|', '~', '\x7f']) @classmethod def _ExpandUsersHomeDirectoryPathSegments( @@ -408,3 +415,47 @@ def GetWindowsSystemPath(cls, path, environment_variables): path = cls.ExpandWindowsPath(path, environment_variables) return path, filename + + @classmethod + def SanitizePathSegments(cls, path_segments): + """Sanitizes path segments. + + Replaces non-printable and other characters defined in _DIRTY_CHARACTERS + with an underscore "_". + + Args: + path_segments (list[str]): path segments. + + Returns: + list[str]: sanitized path segments. + """ + sanitized_path_segments = [] + for path_segment in path_segments: + sanitized_path_segment = ''.join([ + character if character not in cls._DIRTY_CHARACTERS else '_' + for character in path_segment]) + sanitized_path_segments.append(sanitized_path_segment) + return sanitized_path_segments + + @classmethod + def GetRelativePath( + cls, + target_directory, + target_filename, + destination_path): + """Retrieves the path of the target file relative to the destination path. + + Args: + target_directory (str): path of the target directory. + target_filename (str): name of the target file. + destination_path (str): destination path for the collected files. + Returns: + str: path of the target file relative to the destination path or None.
+ """ + + if not destination_path.endswith(os.path.sep): + destination_path = destination_path + os.path.sep + target_path = os.path.join(target_directory, target_filename) + if target_path.startswith(destination_path): + return target_path[len(destination_path):] + return None diff --git a/plaso/filters/file_entry.py b/plaso/filters/file_entry.py index 5b673f5f4d..14b233c88a 100644 --- a/plaso/filters/file_entry.py +++ b/plaso/filters/file_entry.py @@ -354,6 +354,7 @@ def __init__(self): """Initializes a file entry filter collection.""" super(FileEntryFilterCollection, self).__init__() self._filters = [] + self._artifacts_trie = None def AddFilter(self, file_entry_filter): """Adds a file entry filter to the collection. @@ -401,3 +402,25 @@ def Print(self, output_writer): output_writer.Write('Filters:\n') for file_entry_filter in self._filters: file_entry_filter.Print(output_writer) + + def SetArtifactsTrie(self, artifacts_trie): + """Sets the artifacts trie. + + Args: + artifacts_trie (ArtifactsTrie): artifacts trie. + """ + self._artifacts_trie = artifacts_trie + + def GetMatchingArtifacts(self, path, path_separator): + """Retrieves the artifacts that match the given path. + + Args: + path (str): The path of the extracted file. + path_separator (str): The path separator. + + Returns: + list[str]: A list of artifact names that match the path. + """ + if self._artifacts_trie: + return self._artifacts_trie.GetMatchingArtifacts(path, path_separator) + return [] diff --git a/test_data/artifacts/artifacts_filters.yaml b/test_data/artifacts/artifacts_filters.yaml index f212c6d8c4..76393886e4 100644 --- a/test_data/artifacts/artifacts_filters.yaml +++ b/test_data/artifacts/artifacts_filters.yaml @@ -110,3 +110,18 @@ sources: separator: '\' labels: [System] supported_os: [Windows] +--- +name: WindowsSystemRegistryFiles +doc: Windows system Registry files. 
+sources: +- type: FILE + attributes: + paths: + - '%%environ_systemdrive%%\System Volume Information\Syscache.hve' + - '%%environ_systemroot%%\System32\config\SAM' + - '%%environ_systemroot%%\System32\config\SECURITY' + - '%%environ_systemroot%%\System32\config\SOFTWARE' + - '%%environ_systemroot%%\System32\config\SYSTEM' + separator: '\' +supported_os: [Windows] +urls: ['https://artifacts-kb.readthedocs.io/en/latest/sources/windows/RegistryFiles.html'] diff --git a/tests/cli/image_export_tool.py b/tests/cli/image_export_tool.py index 76be9acb5e..87e14f0bb2 100644 --- a/tests/cli/image_export_tool.py +++ b/tests/cli/image_export_tool.py @@ -595,6 +595,194 @@ def testOutputJsonFile(self): expected_json_data.sort(key=lambda digest: digest['sha256']) self.assertEqual(json_data, expected_json_data) + def testProcessSourceEnableArtifactsMap(self): + """Tests the ProcessSource function with an artifacts filter file and + enable_artifacts_map flag. + + This test uses plaso/test_data/image.qcow2 which has directories matching + artifact filters: [TestGroupExport, TestFiles3, TestFiles4, + TestFilesImageExport] in plaso/test_data/artifacts/artifacts_filters.yaml + + plaso/test_data/image.qcow2 files and directories: + ├── a_directory/ + │ ├── a_file + │ └── another_file + ├── lost+found/ + └── passwords.txt + """ + test_artifacts_path = self._GetTestFilePath(['artifacts']) + self._SkipIfPathNotExists(test_artifacts_path) + + test_file_path = self._GetTestFilePath(['image.qcow2']) + self._SkipIfPathNotExists(test_file_path) + + output_writer = test_lib.TestOutputWriter(encoding='utf-8') + test_tool = image_export_tool.ImageExportTool(output_writer=output_writer) + + options = test_lib.TestOptions() + options.artifact_definitions_path = test_artifacts_path + options.image = test_file_path + options.quiet = True + options.artifact_filter_string = 'TestGroupExport' + options.enable_artifacts_map = True + + with shared_test_lib.TempDirectory() as temp_directory: + options.path = 
temp_directory + + test_tool.ParseOptions(options) + + test_tool.ProcessSource() + + expected_extracted_files = sorted([ + os.path.join(temp_directory, 'a_directory'), + os.path.join(temp_directory, 'a_directory', 'another_file'), + os.path.join(temp_directory, 'a_directory', 'a_file'), + os.path.join(temp_directory, 'passwords.txt'), + os.path.join(temp_directory, 'hashes.json'), + os.path.join(temp_directory, 'artifacts_map.json')]) + expected_json_data = { + 'TestFiles3': ['a_directory/another_file', 'a_directory/a_file'], + 'TestFiles4': ['a_directory/another_file', 'passwords.txt'] + } + + extracted_files = self._RecursiveList(temp_directory) + + self.assertEqual(sorted(extracted_files), expected_extracted_files) + + # Verify content of artifacts_map.json + artifacts_map_file_path = os.path.join( + temp_directory, 'artifacts_map.json') + with open(artifacts_map_file_path, 'r', encoding='utf-8') as file_object: + artifacts_map = json.load(file_object) + + self.assertTrue(isinstance(artifacts_map, dict)) + self.assertEqual(expected_json_data, artifacts_map) + + def testProcessSourceEnableArtifactsMap_NonMatchingPaths(self): + """Tests ProcessSource with artifacts map enabled and no matching paths. 
+ + This test uses plaso/test_data/image.qcow2 which has directories matching + artifact filters: [TestGroupExport, TestFiles3, TestFiles4, + TestFilesImageExport] in plaso/test_data/artifacts/artifacts_filters.yaml + + plaso/test_data/image.qcow2 files and directories: + ├── a_directory/ + │ ├── a_file + │ └── another_file + ├── lost+found/ + └── passwords.txt + """ + test_artifacts_path = self._GetTestFilePath(["artifacts"]) + self._SkipIfPathNotExists(test_artifacts_path) + + test_file_path = self._GetTestFilePath(["image.qcow2"]) + self._SkipIfPathNotExists(test_file_path) + + output_writer = test_lib.TestOutputWriter(encoding="utf-8") + test_tool = image_export_tool.ImageExportTool(output_writer=output_writer) + + options = test_lib.TestOptions() + options.artifact_definitions_path = test_artifacts_path + options.image = test_file_path + options.quiet = True + # Use a valid artifact that will not match anything in the test image. + options.artifact_filter_string = "TestGroupExtract" + options.enable_artifacts_map = True + + with shared_test_lib.TempDirectory() as temp_directory: + options.path = temp_directory + + test_tool.ParseOptions(options) + + test_tool.ProcessSource() + + # Verify that no files were extracted. + # Only artifacts_map.json and hashes.json should exist. + self.assertEqual( + os.listdir(temp_directory), ['hashes.json', 'artifacts_map.json'] + ) + + # Verify that the trie has no matching paths for image.qcow2. 
+ self.assertIsNotNone(test_tool._filter_collection._artifacts_trie) + self.assertNotIn('a_directory', + test_tool._filter_collection._artifacts_trie.root + .children['/'].children) + + # Verify that artifacts_map.json is created but empty + artifacts_map_file_path = os.path.join( + temp_directory, "artifacts_map.json" + ) + with open( + artifacts_map_file_path, "r", encoding="utf-8" + ) as file_object: + artifacts_map = json.load(file_object) + + self.assertEqual(artifacts_map, {}) + + def testProcessSourceEnableArtifactsMap_EmptyFilter(self): + """Tests ProcessSource with artifacts map enabled and an empty filter.""" + test_artifacts_path = self._GetTestFilePath(['artifacts']) + self._SkipIfPathNotExists(test_artifacts_path) + + test_file_path = self._GetTestFilePath(['image.qcow2']) + self._SkipIfPathNotExists(test_file_path) + + output_writer = test_lib.TestOutputWriter(encoding='utf-8') + test_tool = image_export_tool.ImageExportTool(output_writer=output_writer) + + options = test_lib.TestOptions() + options.artifact_definitions_path = test_artifacts_path + options.image = test_file_path + options.quiet = True + options.artifact_filter_string = '' # Empty filter + options.enable_artifacts_map = True + + with shared_test_lib.TempDirectory() as temp_directory: + options.path = temp_directory + + test_tool.ParseOptions(options) + + test_tool.ProcessSource() + + # Verify that artifacts_map.json is created but empty + artifacts_map_file_path = os.path.join( + temp_directory, 'artifacts_map.json') + with open(artifacts_map_file_path, 'r', encoding='utf-8') as file_object: + artifacts_map = json.load(file_object) + + self.assertEqual(artifacts_map, {}) + + def testProcessSourceWithFile(self): + """Tests the ProcessSource function with Registry files.""" + test_artifacts_path = self._GetTestFilePath(["artifacts"]) + self._SkipIfPathNotExists(test_artifacts_path) + + test_file_path = self._GetTestFilePath(["SYSTEM"]) + self._SkipIfPathNotExists(test_file_path) + + 
output_writer = test_lib.TestOutputWriter(encoding="utf-8") + test_tool = image_export_tool.ImageExportTool(output_writer=output_writer) + + options = test_lib.TestOptions() + options.artifact_definitions_path = test_artifacts_path + options.image = test_file_path + options.quiet = True + options.artifact_filter_string = "TestRegistry" + + with shared_test_lib.TempDirectory() as temp_directory: + options.path = temp_directory + + test_tool.ParseOptions(options) + test_tool.ProcessSource() + + output = output_writer.ReadOutput() + self.assertEqual( + output, + ('Input must be in ' + f'{list(test_tool._SOURCE_TYPES_TO_PREPROCESS)} ' + 'the "file" type is not supported.\n') + ) + if __name__ == '__main__': unittest.main() diff --git a/tests/engine/artifact_filters.py b/tests/engine/artifact_filters.py index 4ca36fa895..167e3709cd 100644 --- a/tests/engine/artifact_filters.py +++ b/tests/engine/artifact_filters.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- """Tests for the artifacts file filter functions.""" +import os import unittest from artifacts import reader as artifacts_reader @@ -210,11 +211,18 @@ def testBuildFindSpecsFromFileSourcePath(self): # Test expansion of environment variables. path_entry = '%%environ_systemroot%%\\test_data\\*.evtx' + artifact_name = 'Test' environment_variable = [artifacts.EnvironmentVariableArtifact( case_sensitive=False, name='SystemRoot', value='C:\\Windows')] find_specs = test_filter_file._BuildFindSpecsFromFileSourcePath( - path_entry, separator, environment_variable, test_user_accounts) + artifact_name, + path_entry, + separator, + environment_variable, + test_user_accounts, + enable_artifacts_map=True + ) # Should build 1 find_spec. self.assertEqual(len(find_specs), 1) @@ -229,8 +237,15 @@ def testBuildFindSpecsFromFileSourcePath(self): # Test expansion of globs. 
path_entry = '\\test_data\\**' + artifact_name = 'Test' find_specs = test_filter_file._BuildFindSpecsFromFileSourcePath( - path_entry, separator, environment_variable, test_user_accounts) + artifact_name, + path_entry, + separator, + environment_variable, + test_user_accounts, + enable_artifacts_map=True + ) # Glob expansion should by default recurse ten levels. self.assertEqual(len(find_specs), 10) @@ -255,8 +270,15 @@ def testBuildFindSpecsFromFileSourcePath(self): test_user_accounts = [test_user1, test_user2] path_entry = '%%users.homedir%%/.thumbnails/**3' + artifact_name = 'Test' find_specs = test_filter_file._BuildFindSpecsFromFileSourcePath( - path_entry, separator, environment_variable, test_user_accounts) + artifact_name, + path_entry, + separator, + environment_variable, + test_user_accounts, + enable_artifacts_map=True + ) # 6 find specs should be created for testuser1 and testuser2. self.assertEqual(len(find_specs), 6) @@ -278,8 +300,15 @@ def testBuildFindSpecsFromFileSourcePath(self): test_user_accounts = [test_user1, test_user2] path_entry = '%%users.userprofile%%\\AppData\\**4' + artifact_name = 'Test' find_specs = test_filter_file._BuildFindSpecsFromFileSourcePath( - path_entry, separator, environment_variable, test_user_accounts) + artifact_name, + path_entry, + separator, + environment_variable, + test_user_accounts, + enable_artifacts_map=True + ) # 8 find specs should be created for testuser1 and testuser2. 
self.assertEqual(len(find_specs), 8) @@ -291,8 +320,15 @@ def testBuildFindSpecsFromFileSourcePath(self): find_specs[7]._location_segments, expected_location_segments) path_entry = '%%users.localappdata%%\\Microsoft\\**4' + artifact_name = 'Test' find_specs = test_filter_file._BuildFindSpecsFromFileSourcePath( - path_entry, separator, environment_variable, test_user_accounts) + artifact_name, + path_entry, + separator, + environment_variable, + test_user_accounts, + enable_artifacts_map=True + ) # 16 find specs should be created for testuser1 and testuser2. self.assertEqual(len(find_specs), 16) @@ -304,7 +340,134 @@ def testBuildFindSpecsFromFileSourcePath(self): self.assertEqual( find_specs[15]._location_segments, expected_location_segments) - # TODO: add tests for _BuildFindSpecsFromRegistrySourceKey + # Test that paths are added to artifacts trie. + self.assertIn(os.sep, test_filter_file.artifacts_trie.root.children) + path_trie_node = test_filter_file.artifacts_trie.root.children[os.sep] + self.assertEqual( + path_trie_node.artifacts_names, []) + self.assertEqual(len(path_trie_node.children), 5) + self.assertIn('Windows', path_trie_node.children) + self.assertIn('test_data', path_trie_node.children) + self.assertIn('home', path_trie_node.children) + self.assertIn('Users', path_trie_node.children) + self.assertIn('homes', path_trie_node.children) + + self.assertEqual( + path_trie_node.children['Windows'].artifacts_names, []) + self.assertIn( + 'test_data', path_trie_node.children['Windows'].children) + + self.assertEqual( + path_trie_node.children['test_data'].artifacts_names, []) + self.assertEqual(len(path_trie_node.children['test_data'].children), 1) + self.assertIn('*', path_trie_node.children['test_data'].children) + + self.assertEqual( + path_trie_node.children['test_data'] + .children['*'].artifacts_names, + [artifact_name] + ) + self.assertEqual( + len(path_trie_node.children['test_data'].children['*'].children), 1) + self.assertIn( + '*', 
path_trie_node.children['test_data'].children['*'].children) + + self.assertEqual( + path_trie_node.children['test_data'] + .children['*'].children['*'].artifacts_names, + [artifact_name] + ) + self.assertEqual( + len( + path_trie_node.children['test_data'] + .children['*'].children['*'].children + ), + 1 + ) + self.assertIn( + '*', + path_trie_node.children['test_data'].children['*'] + .children['*'].children + ) + + self.assertEqual( + path_trie_node.children['home'].artifacts_names, []) + self.assertIn( + 'testuser2', path_trie_node.children['home'].children) + + self.assertEqual( + path_trie_node.children['Users'].artifacts_names, []) + self.assertIn( + 'testuser2', path_trie_node.children['Users'].children) + + self.assertEqual( + path_trie_node.children['homes'].artifacts_names, []) + self.assertIn( + 'testuser1', path_trie_node.children['homes'].children) + + def testBuildFindSpecsFromRegistrySourceKey(self): + """Tests the _BuildFindSpecsFromRegistrySourceKey function on Windows + Registry sources.""" + test_filter_file = self._CreateTestArtifactDefinitionsFiltersHelper() + + # Test expansion of multiple repeated stars. + key_path = 'HKEY_LOCAL_MACHINE\\SYSTEM\\CurrentControlSet\\Control\\**' + find_specs = test_filter_file._BuildFindSpecsFromRegistrySourceKey( + key_path) + + # Glob expansion should by default recurse ten levels. 
+ self.assertEqual(len(find_specs), 10) + + # The dfwinreg.FindSpec calls glob2regex, thus the dot in ControlSet.* + first_expected_key_path_segments = [ + 'HKEY_LOCAL_MACHINE', 'System', 'ControlSet.*', 'Control', '.*'] + last_expected_key_path_segments = [ + 'HKEY_LOCAL_MACHINE', 'System', 'ControlSet.*', 'Control', '.*', '.*', + '.*', '.*', '.*', '.*', '.*', '.*', '.*', '.*'] + + self.assertEqual( + find_specs[0]._key_path_segments, first_expected_key_path_segments) + self.assertEqual( + find_specs[-1]._key_path_segments, last_expected_key_path_segments) + + # Test CurrentControlSet + key_path = 'HKEY_LOCAL_MACHINE\\SYSTEM\\CurrentControlSet\\Control' + find_specs = test_filter_file._BuildFindSpecsFromRegistrySourceKey( + key_path) + + self.assertEqual(len(find_specs), 1) + + # The dfwinreg.FindSpec calls glob2regex, thus the dot in ControlSet.* + expected_key_path_segments = [ + 'HKEY_LOCAL_MACHINE', 'System', 'ControlSet.*', 'Control'] + + self.assertEqual( + find_specs[0]._key_path_segments, expected_key_path_segments) + + # Test expansion of user home directories + key_path = 'HKEY_USERS\\%%users.sid%%\\Software' + find_specs = test_filter_file._BuildFindSpecsFromRegistrySourceKey( + key_path) + + self.assertEqual(len(find_specs), 1) + + expected_key_path_segments = ['HKEY_CURRENT_USER', 'Software'] + + self.assertEqual( + find_specs[0]._key_path_segments, expected_key_path_segments) + + # Test expansion of single star + key_path = 'HKEY_LOCAL_MACHINE\\Software\\*\\Classes' + find_specs = test_filter_file._BuildFindSpecsFromRegistrySourceKey( + key_path) + + self.assertEqual(len(find_specs), 1) + + expected_key_path_segments = [ + 'HKEY_LOCAL_MACHINE', 'Software', '.*', 'Classes'] + + self.assertEqual( + find_specs[0]._key_path_segments, expected_key_path_segments) if __name__ == '__main__': diff --git a/tests/engine/artifacts_trie.py b/tests/engine/artifacts_trie.py new file mode 100644 index 0000000000..0e89e4155f --- /dev/null +++ 
b/tests/engine/artifacts_trie.py @@ -0,0 +1,275 @@ +# -*- coding: utf-8 -*- +"""Tests for the artifacts trie.""" + +import unittest + +from plaso.engine import artifacts_trie + + +class TrieNodeTest(unittest.TestCase): + """Tests for the TrieNode object.""" + + def test_initialization(self): + """Tests that the node can be initialized.""" + node = artifacts_trie.TrieNode() + self.assertIsNotNone(node) + self.assertEqual(node.children, {}) + self.assertEqual(node.artifacts_names, []) + self.assertIsNone(node.path_separator) + + # You can add more tests for TrieNode if needed, but it's a simple class. + + +class ArtifactsTrieTest(unittest.TestCase): + """Tests for the ArtifactsTrie object.""" + + def setUp(self): + """Sets up a trie for testing.""" + self.trie = artifacts_trie.ArtifactsTrie() + + def test_add_path(self): + """Tests the AddPath function.""" + self.trie.AddPath('artifact1', '/path/to/file.txt', '/') + self.trie.AddPath('artifact2', '/path/to/another.txt', '/') + + root = self.trie.root + self.assertIn('/', root.children) + self.assertIn('path', root.children['/'].children) + self.assertIn('to', root.children['/'].children['path'].children) + self.assertIn( + 'file.txt', root.children['/'].children['path'].children['to'].children) + self.assertIn( + 'another.txt', + root.children['/'].children['path'].children['to'].children + ) + self.assertEqual( + root.children['/'] + .children['path'] + .children['to'] + .children['file.txt'] + .artifacts_names, + ['artifact1'], + ) + self.assertEqual( + root.children['/'] + .children['path'] + .children['to'] + .children['another.txt'] + .artifacts_names, + ['artifact2'] + ) + + def test_add_path_with_glob(self): + """Tests the AddPath function with glob patterns.""" + self.trie.AddPath('artifact1', '/path/to/*.txt', '/') + self.trie.AddPath('artifact2', '/path/to/dir**', '/') + + root = self.trie.root + self.assertIn('/', root.children) + self.assertIn('path', root.children['/'].children) + self.assertIn('to',
root.children['/'].children['path'].children) + self.assertIn( + '*.txt', root.children['/'].children['path'].children['to'].children) + self.assertIn( + 'dir**', root.children['/'].children['path'].children['to'].children) + self.assertEqual( + root.children['/'] + .children['path'] + .children['to'] + .children['*.txt'] + .artifacts_names, + ['artifact1'] + ) + self.assertEqual( + root.children['/'] + .children['path'] + .children['to'] + .children['dir**'] + .artifacts_names, + ['artifact2'] + ) + + def test_get_matching_artifacts_no_glob(self): + """Tests GetMatchingArtifacts without glob.""" + self.trie.AddPath('artifact1', '/path/to/file.txt', '/') + self.trie.AddPath('artifact2', '/path/to/another.txt', '/') + + matches = self.trie.GetMatchingArtifacts('/path/to/file.txt', '/') + self.assertIn('artifact1', matches) + self.assertNotIn('artifact2', matches) + + def test_get_matching_artifacts_with_glob(self): + """Tests GetMatchingArtifacts with glob patterns.""" + self.trie.AddPath('artifact1', '/path/to/*.txt', '/') + self.trie.AddPath('artifact2', '/path/**/file.txt', '/') + # The trie structure will have these paths and children + # root + # | + # / + # | + # path + # | + # to, ** + # | | + # *.txt file.txt + + matches = self.trie.GetMatchingArtifacts('/path/to/file.txt', '/') + self.assertIn('artifact1', matches) + self.assertIn('artifact2', matches) + + matches = self.trie.GetMatchingArtifacts('/path/to/dir/file.txt', '/') + self.assertIn('artifact2', matches) + self.assertNotIn('artifact1', matches) + + def test_get_matching_artifacts_with_multiple_globs(self): + """Tests GetMatchingArtifacts with multiple consecutive glob patterns.""" + self.trie.AddPath('artifact1', '/**/**/file.txt', '/') + self.trie.AddPath('artifact2', '/**/**/another.txt', '/') + + # The trie structure will have these paths and children + # root + # | + # / + # | + # ** + # | + # **, file.txt, another.txt + + matches = self.trie.GetMatchingArtifacts( + 
'/path/to/dir/subdir/file.txt', '/') + self.assertIn('artifact1', matches) + self.assertNotIn('artifact2', matches) + + matches = self.trie.GetMatchingArtifacts( + '/path/to/dir/subdir/another.txt', '/') + self.assertIn('artifact2', matches) + self.assertNotIn('artifact1', matches) + + def test_get_matching_artifacts_single_asterisk(self): + """Tests GetMatchingArtifacts with single asterisk glob patterns.""" + self.trie.AddPath('artifact1', '/path/to/*/file.txt', '/') + self.trie.AddPath('artifact2', '/path/*/data/*.txt', '/') + self.trie.AddPath( + 'artifact3', '/home/*/.jupyter/jupyter_notebook_config.py', '/') + + matches = self.trie.GetMatchingArtifacts('/path/to/dir/file.txt', '/') + self.assertIn('artifact1', matches) + self.assertNotIn('artifact2', matches) + + matches = self.trie.GetMatchingArtifacts( + '/path/dir/data/test.txt', '/') + self.assertNotIn('artifact1', matches) + self.assertIn('artifact2', matches) + + matches = self.trie.GetMatchingArtifacts( + '/home/dummyuser/.jupyter/jupyter_notebook_config.py', '/') + self.assertNotIn('artifact1', matches) + self.assertNotIn('artifact2', matches) + self.assertIn('artifact3', matches) + + def test_get_matching_artifacts_windows_paths(self): + """Tests GetMatchingArtifacts with Windows paths.""" + self.trie.AddPath('artifact1', '\\Windows\\System32\\*.dll', '\\') + self.trie.AddPath('artifact2', '\\Users\\**\\AppData\\*', '\\') + + matches = self.trie.GetMatchingArtifacts( + '\\Windows\\System32\\kernel32.dll', '\\') + self.assertIn('artifact1', matches) + self.assertNotIn('artifact2', matches) + + matches = self.trie.GetMatchingArtifacts( + '\\Users\\test\\AppData\\Local', '\\') + self.assertIn('artifact2', matches) + self.assertNotIn('artifact1', matches) + + def test_get_matching_artifacts_negative_cases(self): + """Tests GetMatchingArtifacts with non-matching cases.""" + self.trie.AddPath('artifact1', '/path/to/file.txt', '/') + self.trie.AddPath('artifact2', '/path/to/*.txt', '/') + 
self.trie.AddPath('artifact3', '/path/**/file.txt', '/') + + matches = self.trie.GetMatchingArtifacts('/path/to/other.txt', '/') + self.assertNotIn('artifact1', matches) + self.assertNotIn('artifact3', matches) + + matches = self.trie.GetMatchingArtifacts('/path/dir/file.txt', '/') + self.assertNotIn('artifact1', matches) + self.assertNotIn('artifact2', matches) + + matches = self.trie.GetMatchingArtifacts('/path/to/dir/test/fi*.txt', '/') + self.assertNotIn('artifact1', matches) + self.assertNotIn('artifact2', matches) + + def test_add_path_with_mixed_separators(self): + """Tests the AddPath function with mixed path separators.""" + self.trie.AddPath('artifact1', '/mixed/path/style', '/') + self.trie.AddPath('artifact2', '\\another\\mixed\\style', '\\') + + root = self.trie.root + self.assertIn('/', root.children) + self.assertIn('mixed', root.children['/'].children) + + self.assertIn('\\', root.children) + self.assertIn('another', root.children['\\'].children) + + def test_get_matching_artifacts_mixed_separators(self): + """Tests GetMatchingArtifacts with mixed path separators.""" + self.trie.AddPath('artifact1', '/mixed/path/style', '/') + self.trie.AddPath('artifact2', '\\another\\mixed\\style', '\\') + + matches = self.trie.GetMatchingArtifacts('/mixed/path/style', '/') + self.assertIn('artifact1', matches) + + matches = self.trie.GetMatchingArtifacts('\\another\\mixed\\style', '\\') + self.assertIn('artifact2', matches) + + def test_get_matching_artifacts_same_path_different_artifacts(self): + """Tests GetMatchingArtifacts with same path for different artifacts.""" + self.trie.AddPath('artifact1', '/same/path/file.txt', '/') + self.trie.AddPath('artifact2', '/same/path/file.txt', '/') + + matches = self.trie.GetMatchingArtifacts('/same/path/file.txt', '/') + self.assertIn('artifact1', matches) + self.assertIn('artifact2', matches) + + def test_get_matching_artifacts_empty_path(self): + """Tests GetMatchingArtifacts with an empty path.""" + 
self.trie.AddPath('artifact1', '/path/to/file.txt', '/') + + matches = self.trie.GetMatchingArtifacts('', '/') + self.assertEqual(len(matches), 0) + + def test_get_matching_artifacts_root_path(self): + """Tests GetMatchingArtifacts with root path.""" + self.trie.AddPath('artifact1', '/', '/') + + matches = self.trie.GetMatchingArtifacts('/', '/') + self.assertIn('artifact1', matches) + + def test_get_matching_artifacts_nonexistent_path(self): + """Tests GetMatchingArtifacts with a nonexistent path.""" + self.trie.AddPath('artifact1', '/path/to/file.txt', '/') + + matches = self.trie.GetMatchingArtifacts('/nonexistent/path', '/') + self.assertEqual(len(matches), 0) + + def test_get_matching_artifacts_case_sensitivity(self): + """Tests GetMatchingArtifacts with different case sensitivity.""" + self.trie.AddPath('artifact1', '/path/to/file.txt', '/') + + matches = self.trie.GetMatchingArtifacts('/Path/To/File.TXT', '/') + self.assertNotIn('artifact1', matches) + + def test_get_matching_artifacts_special_characters(self): + """Tests GetMatchingArtifacts with special characters in path.""" + self.trie.AddPath('artifact1', '/path/to/file[0-9].txt', '/') + + matches = self.trie.GetMatchingArtifacts('/path/to/file1.txt', '/') + self.assertIn('artifact1', matches) + + matches = self.trie.GetMatchingArtifacts('/path/to/file.txt', '/') + self.assertNotIn('artifact1', matches) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/engine/engine.py b/tests/engine/engine.py index 439102dbda..4ce29714ff 100644 --- a/tests/engine/engine.py +++ b/tests/engine/engine.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- """Tests the engine.""" +import os import unittest from dfvfs.helpers import fake_file_system_builder @@ -11,8 +12,10 @@ from dfvfs.resolver import context from dfvfs.vfs import file_system as dfvfs_file_system +from plaso.containers import artifacts as containers_artifacts from plaso.engine import configurations from plaso.engine import engine +from plaso.lib 
import errors from plaso.storage.fake import writer as fake_writer from tests import test_lib as shared_test_lib @@ -93,7 +96,115 @@ def testBuildArtifactsRegistry(self): # TODO: add test that raises BadConfigOption - # TODO: add tests for BuildCollectionFilters. + def testBuildCollectionFilters(self): + """Tests the BuildCollectionFilters function.""" + test_artifacts_path = shared_test_lib.GetTestFilePath(['artifacts']) + self._SkipIfPathNotExists(test_artifacts_path) + + test_engine = TestEngine() + test_engine.BuildArtifactsRegistry(test_artifacts_path, None) + + # Test with artifact_filter_names + artifact_filter_names = ['TestFiles', 'TestFiles2'] + environment_variables = [ + containers_artifacts.EnvironmentVariableArtifact( + case_sensitive=False, name='systemdrive', value='C:' + ) + ] + test_user_accounts = [ + containers_artifacts.UserAccountArtifact( + identifier='1000', + path_separator='\\', + user_directory='C:\\Users\\testuser1', + username='testuser1', + ), + containers_artifacts.UserAccountArtifact( + identifier='1001', + path_separator='\\', + user_directory='%%environ_systemdrive%%\\Users\\testuser2', + username='testuser2', + ), + ] + + # Pass artifact_filter_names to CreateSession + session = test_engine.CreateSession( + artifact_filter_names=artifact_filter_names) + test_engine.BuildCollectionFilters( + environment_variables, + test_user_accounts, + artifact_filter_names=session.artifact_filters, + enable_artifacts_map=True + ) + + self.assertIsNotNone(test_engine._artifacts_trie) + # Verify content of the artifacts trie + self.assertIn(os.sep, test_engine._artifacts_trie.root.children) + trie_root = test_engine._artifacts_trie.root.children[os.sep] + self.assertIn( + 'test_data', + trie_root.children) + self.assertIn( + '*.evtx', + trie_root.children['test_data'].children) + self.assertIn( + 'Users', + trie_root.children) + self.assertIn( + 'testuser1', + trie_root.children['Users'].children) + self.assertIn( + 'testuser2', + 
trie_root.children['Users'].children) + self.assertIn( + 'Documents', + trie_root.children['Users'].children['testuser1'].children) + self.assertIn( + 'Documents', + trie_root.children['Users'].children['testuser2'].children) + self.assertIn( + 'WindowsPowerShell', + trie_root.children['Users'].children['testuser1'] + .children['Documents'].children) + self.assertIn( + 'WindowsPowerShell', + trie_root.children['Users'].children['testuser2'] + .children['Documents'].children) + self.assertIn( + 'profile.ps1', + trie_root.children['Users'].children['testuser1'] + .children['Documents'].children['WindowsPowerShell'].children) + self.assertIn( + 'profile.ps1', + trie_root.children['Users'].children['testuser2'] + .children['Documents'].children['WindowsPowerShell'].children) + + # Test with filter_file_path + test_filter_file_path = self._GetTestFilePath( + ['end_to_end', 'filter_file2.yaml']) + self._SkipIfPathNotExists(test_filter_file_path) + + test_engine.BuildCollectionFilters( + environment_variables, + test_user_accounts, + filter_file_path=test_filter_file_path, + enable_artifacts_map=True + ) + + self.assertIsNotNone(test_engine._included_file_system_find_specs) + self.assertIsNotNone(test_engine._excluded_file_system_find_specs) + + # Test specific file paths from filter file. 
+ included_find_specs = test_engine.GetCollectionIncludedFindSpecs() + self.assertGreater(len(included_find_specs), 0) + + # Test with invalid filter + with self.assertRaises(errors.InvalidFilter): + test_engine.BuildCollectionFilters( + environment_variables, + test_user_accounts, + artifact_filter_names=['NonExistentArtifact'], + enable_artifacts_map=True + ) def testCreateSession(self): """Tests the CreateSession function.""" @@ -116,8 +227,12 @@ def testGetSourceFileSystem(self): parent=os_path_spec) resolver_context = context.Context() - test_file_system, test_mount_point = test_engine.GetSourceFileSystem( - source_path_spec, resolver_context=resolver_context) + ( + test_file_system, + test_mount_point, + ) = test_engine.GetSourceFileSystem( + source_path_spec, resolver_context=resolver_context + ) self.assertIsNotNone(test_file_system) self.assertIsInstance(test_file_system, dfvfs_file_system.FileSystem) @@ -149,11 +264,12 @@ def testPreprocessSource(self): storage_writer.Open() source_configurations = test_engine.PreprocessSource( - [source_path_spec], storage_writer) + [source_path_spec], storage_writer + ) self.assertEqual(len(source_configurations), 1) - self.assertEqual(source_configurations[0].operating_system, 'Windows NT') + self.assertEqual(source_configurations[0].operating_system, "Windows NT") -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/engine/path_helper.py b/tests/engine/path_helper.py index 8170f60597..4f4adfedf1 100644 --- a/tests/engine/path_helper.py +++ b/tests/engine/path_helper.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- """Tests for the path helper.""" +import os import unittest from dfvfs.lib import definitions as dfvfs_definitions @@ -175,7 +176,8 @@ def testExpandGlobStars(self): self.assertEqual(paths, ['/etc/sysconfig/my**']) # Test globstar with suffix. 
- paths = path_helper.PathHelper.ExpandGlobStars('/etc/sysconfig/**.exe', '/') + paths = path_helper.PathHelper.ExpandGlobStars( + '/etc/sysconfig/**.exe', '/') self.assertEqual(len(paths), 1) @@ -417,6 +419,115 @@ def testGetRelativePathForPathSpec(self): qcow_path_spec) self.assertIsNone(display_name) + def testSanitizePathSegments(self): + """Tests the SanitizePathSegments function.""" + # Test with an empty list of path segments. + sanitized_path_segments = path_helper.PathHelper.SanitizePathSegments([]) + self.assertEqual(sanitized_path_segments, []) + + # Test with clean path segments. + path_segments = ['test', 'directory', 'file.txt'] + expected_sanitized_segments = ['test', 'directory', 'file.txt'] + sanitized_path_segments = path_helper.PathHelper.SanitizePathSegments( + path_segments) + self.assertEqual(sanitized_path_segments, expected_sanitized_segments) + + # Test with dirty path segments. + path_segments = ['test\x00dir', 'fi:le?.txt', 'temp\x1ffile'] + expected_sanitized_segments = ['test_dir', 'fi_le_.txt', 'temp_file'] + sanitized_path_segments = path_helper.PathHelper.SanitizePathSegments( + path_segments) + self.assertEqual(sanitized_path_segments, expected_sanitized_segments) + + # Test with a path segment containing the path separator. + path_segments = [f'test{os.sep}dir', 'file.txt'] + expected_sanitized_segments = ['test_dir', 'file.txt'] + sanitized_path_segments = path_helper.PathHelper.SanitizePathSegments( + path_segments) + self.assertEqual(sanitized_path_segments, expected_sanitized_segments) + + # Test with mixed clean and dirty path segments. + path_segments = ['clean', 'dir\x00ty', 'fi:le?.txt'] + expected_sanitized_segments = ['clean', 'dir_ty', 'fi_le_.txt'] + sanitized_path_segments = path_helper.PathHelper.SanitizePathSegments( + path_segments) + self.assertEqual(sanitized_path_segments, expected_sanitized_segments) + + # Test with path segments containing only dirty characters. 
+ path_segments = ['\x00\x01\x02', '::::', '!!!!'] + expected_sanitized_segments = ['___', '____', '____'] + sanitized_path_segments = path_helper.PathHelper.SanitizePathSegments( + path_segments) + self.assertEqual(sanitized_path_segments, expected_sanitized_segments) + + def testGetRelativePath(self): + """Tests the GetRelativePath function.""" + # Test with normal paths. + target_directory = '/home/user/output' + target_filename = 'file.txt' + destination_path = '/home/user/output/' + expected_relative_path = 'file.txt' + relative_path = path_helper.PathHelper.GetRelativePath( + target_directory, target_filename, destination_path) + self.assertEqual(relative_path, expected_relative_path) + + # Test with subdirectory. + target_directory = '/home/user/output/subdir' + target_filename = 'image.dd' + destination_path = '/home/user/output/' + expected_relative_path = os.path.join('subdir', 'image.dd') + relative_path = path_helper.PathHelper.GetRelativePath( + target_directory, target_filename, destination_path) + self.assertEqual(relative_path, expected_relative_path) + + # Test with a relative destination path. + target_directory = 'output/subdir' + target_filename = 'file.txt' + destination_path = 'output' + os.sep + expected_relative_path = os.path.join('subdir', 'file.txt') + relative_path = path_helper.PathHelper.GetRelativePath( + target_directory, target_filename, destination_path) + self.assertEqual(relative_path, expected_relative_path) + + # Test with no match. + target_directory = '/home/user/output/subdir' + target_filename = 'image.E01' + destination_path = '/another/directory/' + relative_path = path_helper.PathHelper.GetRelativePath( + target_directory, target_filename, destination_path) + self.assertIsNone(relative_path) + + # Test with empty inputs. + relative_path = path_helper.PathHelper.GetRelativePath('', '', '') + self.assertIsNone(relative_path) + + # Test with only destination path ending with separator. 
+ target_directory = '/home/user/output/subdir' + target_filename = 'data.txt' + destination_path = '/home/user/output/' + expected_relative_path = os.path.join('subdir', 'data.txt') + relative_path = path_helper.PathHelper.GetRelativePath( + target_directory, target_filename, destination_path) + self.assertEqual(relative_path, expected_relative_path) + + # Test with only destination path not ending with separator. + target_directory = '/home/user/output/subdir' + target_filename = 'file.txt' + destination_path = '/home/user/output' + expected_relative_path = os.path.join('subdir', 'file.txt') + relative_path = path_helper.PathHelper.GetRelativePath( + target_directory, target_filename, destination_path) + self.assertEqual(relative_path, expected_relative_path) + + # Test with target directory being root. + target_directory = '/' + target_filename = 'file.txt' + destination_path = '/' + expected_relative_path = 'file.txt' + relative_path = path_helper.PathHelper.GetRelativePath( + target_directory, target_filename, destination_path) + self.assertEqual(relative_path, expected_relative_path) + if __name__ == '__main__': unittest.main() diff --git a/tests/filters/file_entry.py b/tests/filters/file_entry.py index 9f621e9bcb..f1e320f7c1 100644 --- a/tests/filters/file_entry.py +++ b/tests/filters/file_entry.py @@ -2,12 +2,14 @@ # -*- coding: utf-8 -*- """Tests for the file entry filters.""" +import os import unittest from dfvfs.lib import definitions as dfvfs_definitions from dfvfs.path import factory as path_spec_factory from dfvfs.resolver import resolver as path_spec_resolver +from plaso.engine import artifacts_trie from plaso.filters import file_entry as file_entry_filters from plaso.lib import specification @@ -378,6 +380,38 @@ def testHasFilters(self): test_filter_collection.AddFilter(file_entry_filter) self.assertTrue(test_filter_collection.HasFilters()) + def testSetArtifactsTrie(self): + """Tests the SetArtifactsTrie function.""" + test_filter_collection = 
file_entry_filters.FileEntryFilterCollection() + trie = artifacts_trie.ArtifactsTrie() + test_filter_collection.SetArtifactsTrie(trie) + self.assertEqual(test_filter_collection._artifacts_trie, trie) + + def testGetMatchingArtifacts(self): + """Tests the GetMatchingArtifacts function.""" + test_filter_collection = file_entry_filters.FileEntryFilterCollection() + trie = artifacts_trie.ArtifactsTrie() + trie.AddPath('artifact1', '/path/to/file.txt', os.sep) + trie.AddPath('artifact2', '/path/to/dir/', os.sep) + test_filter_collection.SetArtifactsTrie(trie) + + # Test matching a file. + matches = test_filter_collection.GetMatchingArtifacts( + '/path/to/file.txt', os.sep) + self.assertIn('artifact1', matches) + self.assertNotIn('artifact2', matches) + + # Test matching a directory. + matches = test_filter_collection.GetMatchingArtifacts( + '/path/to/dir', os.sep) + self.assertIn('artifact2', matches) + self.assertNotIn('artifact1', matches) + + # Test non-matching path. + matches = test_filter_collection.GetMatchingArtifacts( + '/nonexistent/path', os.sep) + self.assertEqual(matches, []) + # TODO: add test for Matches. # TODO: add test for Print. 
From a545f8b3967133717600614005da32eab78b2b1d Mon Sep 17 00:00:00 2001 From: sa3eed3ed Date: Tue, 31 Dec 2024 00:43:18 +0000 Subject: [PATCH 2/4] remove os-dependent path operations in artifacts_trie --- plaso/engine/artifacts_trie.py | 48 ++++++++++++++++++++++++++++------ tests/cli/image_export_tool.py | 10 ++++--- tests/engine/artifacts_trie.py | 15 ++++++----- tests/engine/path_helper.py | 30 ++++++++++----------- 4 files changed, 71 insertions(+), 32 deletions(-) diff --git a/plaso/engine/artifacts_trie.py b/plaso/engine/artifacts_trie.py index e8e5b8fb91..ad6bea5520 100644 --- a/plaso/engine/artifacts_trie.py +++ b/plaso/engine/artifacts_trie.py @@ -122,9 +122,9 @@ def _search_trie(node, current_path, segments): if self._MatchesGlobPattern( artifact_path, current_path, node.path_separator): matching_artifacts.add(artifact_name) - elif os.path.normpath(current_path).strip(os.sep).split( - os.sep) == os.path.normpath(artifact_path).strip( - node.path_separator).split(node.path_separator): + elif self._GetNonEmptyPathSegments( + current_path, path_separator) == self._GetNonEmptyPathSegments( + artifact_path, node.path_separator): matching_artifacts.add(artifact_name) if not segments: @@ -139,18 +139,35 @@ def _search_trie(node, current_path, segments): if glob.has_magic(child_segment): if self._MatchesGlobPattern( child_segment, segment, child_node.path_separator): - _search_trie(child_node, os.path.join( - current_path, segment), remaining_segments) - _search_trie(node, os.path.join( + _search_trie(child_node, self._CustomPathJoin( + path_separator, current_path, segment), remaining_segments) + _search_trie( + node, + self._CustomPathJoin( + path_separator, + current_path, segment), remaining_segments) elif child_segment == segment: # If the child is an exact match, continue traversal. 
- _search_trie(child_node, os.path.join( + _search_trie(child_node, self._CustomPathJoin( + path_separator, current_path, segment), remaining_segments) _search_trie(sub_root_node, '', path_segments) return list(matching_artifacts) + def _GetNonEmptyPathSegments(self, path, separator): + """Splits a path into segments and remove non-empty segments. + + Args: + path (str): The path string to be split. + separator (str): The path separator. + + Returns: + list[str]: A list of non-empty path segments. + """ + return [s for s in path.split(separator) if s] + def _GetArtifactsPaths(self, node): """Retrieves a mapping of artifact names to their paths. @@ -191,6 +208,21 @@ def _collect_paths(node, current_path, artifacts): _collect_paths(node, '', artifacts_paths) return artifacts_paths + def _CustomPathJoin(self, separator, current_path, new_segment): + """Joins path components using a custom separator, replacing os.sep. + + Args: + separator (str): The custom separator to use. + current_path (str): The current path. + new_segment (str): The new segment to add to it. + + Returns: + str: The joined path with the custom separator. + """ + current_path = current_path.replace(separator, os.sep) + joined_path = os.path.join(current_path, new_segment) + return joined_path.replace(os.sep, separator) + def _MatchesGlobPattern(self, glob_pattern, path, path_separator): """Checks if a path matches a given glob pattern. 
@@ -204,7 +236,7 @@ def _MatchesGlobPattern(self, glob_pattern, path, path_separator): """ # Normalize paths using the appropriate separators glob_pattern = glob_pattern.strip(path_separator).split(path_separator) - path = path.strip(os.sep).split(os.sep) + path = path.strip(path_separator).split(path_separator) i = 0 j = 0 diff --git a/tests/cli/image_export_tool.py b/tests/cli/image_export_tool.py index 87e14f0bb2..2fabcb3e83 100644 --- a/tests/cli/image_export_tool.py +++ b/tests/cli/image_export_tool.py @@ -641,8 +641,11 @@ def testProcessSourceEnableArtifactsMap(self): os.path.join(temp_directory, 'hashes.json'), os.path.join(temp_directory, 'artifacts_map.json')]) expected_json_data = { - 'TestFiles3': ['a_directory/another_file', 'a_directory/a_file'], - 'TestFiles4': ['a_directory/another_file', 'passwords.txt'] + 'TestFiles3': [ + f'a_directory{os.sep}another_file', + f'a_directory{os.sep}a_file'], + 'TestFiles4': + [f'a_directory{os.sep}another_file', 'passwords.txt'] } extracted_files = self._RecursiveList(temp_directory) @@ -699,7 +702,8 @@ def testProcessSourceEnableArtifactsMap_NonMatchingPaths(self): # Verify that no files were extracted. # Only artifacts_map.json and hashes.json should exist. self.assertEqual( - os.listdir(temp_directory), ['hashes.json', 'artifacts_map.json'] + sorted(os.listdir(temp_directory)), sorted( + ['hashes.json', 'artifacts_map.json']) ) # Verify that the trie has no matching paths for image.qcow2. 
diff --git a/tests/engine/artifacts_trie.py b/tests/engine/artifacts_trie.py index 0e89e4155f..70402f62cf 100644 --- a/tests/engine/artifacts_trie.py +++ b/tests/engine/artifacts_trie.py @@ -202,8 +202,9 @@ def test_get_matching_artifacts_negative_cases(self): def test_add_path_with_mixed_separators(self): """Tests the AddPath function with mixed path separators.""" - self.trie.AddPath('artifact1', '/mixed/path/style', '/') - self.trie.AddPath('artifact2', '\\another\\mixed\\style', '\\') + self.trie.AddPath('artifact1', '/mixed/path/style_s', '/') + self.trie.AddPath( + 'artifact2', '\\another\\mixed path\\style to-match_it', '\\') root = self.trie.root self.assertIn('/', root.children) @@ -214,13 +215,15 @@ def test_add_path_with_mixed_separators(self): def test_get_matching_artifacts_mixed_separators(self): """Tests GetMatchingArtifacts with mixed path separators.""" - self.trie.AddPath('artifact1', '/mixed/path/style', '/') - self.trie.AddPath('artifact2', '\\another\\mixed\\style', '\\') + self.trie.AddPath('artifact1', '/mixed/path/style_s', '/') + self.trie.AddPath( + 'artifact2', '\\another\\mixed path\\style to-match_it', '\\') - matches = self.trie.GetMatchingArtifacts('/mixed/path/style', '/') + matches = self.trie.GetMatchingArtifacts('/mixed/path/style_s', '/') self.assertIn('artifact1', matches) - matches = self.trie.GetMatchingArtifacts('\\another\\mixed\\style', '\\') + matches = self.trie.GetMatchingArtifacts( + '\\another\\mixed path\\style to-match_it', '\\') self.assertIn('artifact2', matches) def test_get_matching_artifacts_same_path_different_artifacts(self): diff --git a/tests/engine/path_helper.py b/tests/engine/path_helper.py index 4f4adfedf1..bfb7ef415e 100644 --- a/tests/engine/path_helper.py +++ b/tests/engine/path_helper.py @@ -440,7 +440,7 @@ def testSanitizePathSegments(self): self.assertEqual(sanitized_path_segments, expected_sanitized_segments) # Test with a path segment containing the path separator. 
- path_segments = [f'test{os.sep}dir', 'file.txt'] + path_segments = [f'test{os.path.sep}dir', 'file.txt'] expected_sanitized_segments = ['test_dir', 'file.txt'] sanitized_path_segments = path_helper.PathHelper.SanitizePathSegments( path_segments) @@ -463,36 +463,36 @@ def testSanitizePathSegments(self): def testGetRelativePath(self): """Tests the GetRelativePath function.""" # Test with normal paths. - target_directory = '/home/user/output' + target_directory = '/home/user/output'.replace('/', os.path.sep) target_filename = 'file.txt' - destination_path = '/home/user/output/' + destination_path = '/home/user/output/'.replace('/', os.path.sep) expected_relative_path = 'file.txt' relative_path = path_helper.PathHelper.GetRelativePath( target_directory, target_filename, destination_path) self.assertEqual(relative_path, expected_relative_path) # Test with subdirectory. - target_directory = '/home/user/output/subdir' + target_directory = '/home/user/output/subdir'.replace('/', os.path.sep) target_filename = 'image.dd' - destination_path = '/home/user/output/' + destination_path = '/home/user/output/'.replace('/', os.path.sep) expected_relative_path = os.path.join('subdir', 'image.dd') relative_path = path_helper.PathHelper.GetRelativePath( target_directory, target_filename, destination_path) self.assertEqual(relative_path, expected_relative_path) # Test with a relative destination path. - target_directory = 'output/subdir' + target_directory = 'output/subdir'.replace('/', os.path.sep) target_filename = 'file.txt' - destination_path = 'output' + os.sep + destination_path = 'output' + os.path.sep expected_relative_path = os.path.join('subdir', 'file.txt') relative_path = path_helper.PathHelper.GetRelativePath( target_directory, target_filename, destination_path) self.assertEqual(relative_path, expected_relative_path) # Test with no match. 
- target_directory = '/home/user/output/subdir' + target_directory = '/home/user/output/subdir'.replace('/', os.path.sep) target_filename = 'image.E01' - destination_path = '/another/directory/' + destination_path = '/another/directory/'.replace('/', os.path.sep) relative_path = path_helper.PathHelper.GetRelativePath( target_directory, target_filename, destination_path) self.assertIsNone(relative_path) @@ -502,27 +502,27 @@ def testGetRelativePath(self): self.assertIsNone(relative_path) # Test with only destination path ending with separator. - target_directory = '/home/user/output/subdir' + target_directory = '/home/user/output/subdir'.replace('/', os.path.sep) target_filename = 'data.txt' - destination_path = '/home/user/output/' + destination_path = '/home/user/output/'.replace('/', os.path.sep) expected_relative_path = os.path.join('subdir', 'data.txt') relative_path = path_helper.PathHelper.GetRelativePath( target_directory, target_filename, destination_path) self.assertEqual(relative_path, expected_relative_path) # Test with only destination path not ending with separator. - target_directory = '/home/user/output/subdir' + target_directory = '/home/user/output/subdir'.replace('/', os.path.sep) target_filename = 'file.txt' - destination_path = '/home/user/output' + destination_path = '/home/user/output'.replace('/', os.path.sep) expected_relative_path = os.path.join('subdir', 'file.txt') relative_path = path_helper.PathHelper.GetRelativePath( target_directory, target_filename, destination_path) self.assertEqual(relative_path, expected_relative_path) # Test with target directory being root. 
- target_directory = '/' + target_directory = '/'.replace('/', os.path.sep) target_filename = 'file.txt' - destination_path = '/' + destination_path = '/'.replace('/', os.path.sep) expected_relative_path = 'file.txt' relative_path = path_helper.PathHelper.GetRelativePath( target_directory, target_filename, destination_path) From 2a7e31b1e63886866580119cc983eee4d826ada8 Mon Sep 17 00:00:00 2001 From: sa3eed3ed Date: Tue, 31 Dec 2024 01:23:44 +0000 Subject: [PATCH 3/4] nits --- tests/cli/image_export_tool.py | 2 +- tests/filters/file_entry.py | 12 +++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/cli/image_export_tool.py b/tests/cli/image_export_tool.py index 2fabcb3e83..cac181f7de 100644 --- a/tests/cli/image_export_tool.py +++ b/tests/cli/image_export_tool.py @@ -710,7 +710,7 @@ def testProcessSourceEnableArtifactsMap_NonMatchingPaths(self): self.assertIsNotNone(test_tool._filter_collection._artifacts_trie) self.assertNotIn('a_directory', test_tool._filter_collection._artifacts_trie.root - .children['/'].children) + .children[os.sep].children) # Verify that artifacts_map.json is created but empty artifacts_map_file_path = os.path.join( diff --git a/tests/filters/file_entry.py b/tests/filters/file_entry.py index f1e320f7c1..05e67469ec 100644 --- a/tests/filters/file_entry.py +++ b/tests/filters/file_entry.py @@ -391,25 +391,27 @@ def testGetMatchingArtifacts(self): """Tests the GetMatchingArtifacts function.""" test_filter_collection = file_entry_filters.FileEntryFilterCollection() trie = artifacts_trie.ArtifactsTrie() - trie.AddPath('artifact1', '/path/to/file.txt', os.sep) - trie.AddPath('artifact2', '/path/to/dir/', os.sep) + trie.AddPath( + 'artifact1', f'{os.sep}path{os.sep}to{os.sep}file.txt', os.sep) + trie.AddPath( + 'artifact2', f'{os.sep}path{os.sep}to{os.sep}dir{os.sep}', os.sep) test_filter_collection.SetArtifactsTrie(trie) # Test matching a file. 
matches = test_filter_collection.GetMatchingArtifacts( - '/path/to/file.txt', os.sep) + f'{os.sep}path{os.sep}to{os.sep}file.txt', os.sep) self.assertIn('artifact1', matches) self.assertNotIn('artifact2', matches) # Test matching a directory. matches = test_filter_collection.GetMatchingArtifacts( - '/path/to/dir', os.sep) + f'{os.sep}path{os.sep}to{os.sep}dir', os.sep) self.assertIn('artifact2', matches) self.assertNotIn('artifact1', matches) # Test non-matching path. matches = test_filter_collection.GetMatchingArtifacts( - '/nonexistent/path', os.sep) + f'{os.sep}nonexistent{os.sep}path', os.sep) self.assertEqual(matches, []) # TODO: add test for Matches. From 96a3bd10f0e78c20e8150db76fa8157fa473b779 Mon Sep 17 00:00:00 2001 From: sa3eed3ed Date: Thu, 2 Jan 2025 16:04:50 +0000 Subject: [PATCH 4/4] handle cases where path segments are sanitized at time when output is written --- plaso/engine/artifacts_trie.py | 62 ++++++++++++++++++++++++++++------ tests/engine/artifacts_trie.py | 21 ++++++++++++ 2 files changed, 72 insertions(+), 11 deletions(-) diff --git a/plaso/engine/artifacts_trie.py b/plaso/engine/artifacts_trie.py index ad6bea5520..2b5a57e7a6 100644 --- a/plaso/engine/artifacts_trie.py +++ b/plaso/engine/artifacts_trie.py @@ -6,6 +6,7 @@ import os from plaso.engine import logger +from plaso.engine import path_helper class TrieNode(object): @@ -118,14 +119,16 @@ def _search_trie(node, current_path, segments): if node.artifacts_names: for artifact_name in node.artifacts_names: for artifact_path in self.artifacts_paths.get(artifact_name, []): - if glob.has_magic(artifact_path): + if self._ComparePathIfSanitized( + current_path, + path_separator, + artifact_path, + node.path_separator): + matching_artifacts.add(artifact_name) + elif glob.has_magic(artifact_path): if self._MatchesGlobPattern( artifact_path, current_path, node.path_separator): matching_artifacts.add(artifact_name) - elif self._GetNonEmptyPathSegments( - current_path, path_separator) == 
self._GetNonEmptyPathSegments( - artifact_path, node.path_separator): - matching_artifacts.add(artifact_name) if not segments: return @@ -135,8 +138,20 @@ def _search_trie(node, current_path, segments): # Handle glob characters in the current segment. for child_segment, child_node in node.children.items(): + if ( + child_segment == segment or + # comparing the sanitized version of the path segment stored in + # the tree to the path segment from the tool output as it + # sanitizes path segments before writing data to disk. + path_helper.PathHelper.SanitizePathSegments( + [child_segment]).pop() == segment + ): + # If the child is an exact match, continue traversal. + _search_trie(child_node, self._CustomPathJoin( + path_separator, + current_path, child_segment), remaining_segments) # If the child is a glob, see if it matches. - if glob.has_magic(child_segment): + elif glob.has_magic(child_segment): if self._MatchesGlobPattern( child_segment, segment, child_node.path_separator): _search_trie(child_node, self._CustomPathJoin( @@ -147,15 +162,40 @@ def _search_trie(node, current_path, segments): self._CustomPathJoin( path_separator, current_path, segment), remaining_segments) - elif child_segment == segment: - # If the child is an exact match, continue traversal. - _search_trie(child_node, self._CustomPathJoin( - path_separator, - current_path, segment), remaining_segments) _search_trie(sub_root_node, '', path_segments) return list(matching_artifacts) + def _ComparePathIfSanitized( + self, + current_path, + path_separator, + artifact_path, + atrifact_path_seperator): + """Compares a current path with an artifact path, handling sanitization. + + This method checks if the current_path matches the artifact_path, + considering that current_path might be the sanitized artifact_path. + + Args: + current_path (str): The current path being checked. + path_separator (str): Path separator for the current path. + artifact_path (str): The artifact path to compare against. 
+ atrifact_path_seperator (str): Path separator for the artifact path. + + Returns: + bool: True if the current path matches the artifact path (or its + sanitized version), False otherwise. + """ + atrifact_path_segments = self._GetNonEmptyPathSegments( + artifact_path, atrifact_path_seperator) + return self._GetNonEmptyPathSegments( + current_path, path_separator) in [ + atrifact_path_segments, + path_helper.PathHelper.SanitizePathSegments( + atrifact_path_segments) + ] + def _GetNonEmptyPathSegments(self, path, separator): """Splits a path into segments and remove non-empty segments. diff --git a/tests/engine/artifacts_trie.py b/tests/engine/artifacts_trie.py index 70402f62cf..add63751d3 100644 --- a/tests/engine/artifacts_trie.py +++ b/tests/engine/artifacts_trie.py @@ -273,6 +273,27 @@ def test_get_matching_artifacts_special_characters(self): matches = self.trie.GetMatchingArtifacts('/path/to/file.txt', '/') self.assertNotIn('artifact1', matches) + def test_get_matching_artifacts_sanitized_paths(self): + """Tests GetMatchingArtifacts with sanitized paths.""" + self.trie.AddPath('artifact1', '/path/to/file\x00\x01.txt', '/') + self.trie.AddPath('artifact2', '/another/path/fo:r?/file.txt', '/') + + # Test path with dirty characters that would be sanitized. + matches = self.trie.GetMatchingArtifacts( + '/path/to/file__.txt', '/') + self.assertIn('artifact1', matches) + + # Test with a different path that sanitizes to the same value. + matches = self.trie.GetMatchingArtifacts( + '/another/path/fo_r_/file.txt', '/') + self.assertIn('artifact2', matches) + + # Negative test with non-matching sanitized path. + matches = self.trie.GetMatchingArtifacts( + '/nonexistent/path/fo_r_/file.txt', '/') + self.assertNotIn('artifact1', matches) + self.assertNotIn('artifact2', matches) + if __name__ == '__main__': unittest.main()