From 9284bfbd69dfe5a8067f86c14fed26972355289e Mon Sep 17 00:00:00 2001 From: Joachim Metz Date: Sun, 17 Jan 2021 12:11:08 +0100 Subject: [PATCH] Changed analysis plugins to directly read from storage #3439 --- plaso/analysis/interface.py | 69 ++++++++++- plaso/cli/psort_tool.py | 3 +- plaso/multi_process/analysis_engine.py | 158 ++++++++---------------- plaso/multi_process/analysis_process.py | 87 ++++++------- plaso/storage/file_interface.py | 8 +- plaso/storage/sqlite/sqlite_file.py | 18 ++- 6 files changed, 177 insertions(+), 166 deletions(-) diff --git a/plaso/analysis/interface.py b/plaso/analysis/interface.py index 03106012a6..1cb97458e6 100644 --- a/plaso/analysis/interface.py +++ b/plaso/analysis/interface.py @@ -11,10 +11,19 @@ from plaso.containers import events from plaso.containers import reports from plaso.lib import definitions +from plaso.storage import event_tag_index class AnalysisPlugin(object): - """Class that defines the analysis plugin interface.""" + """Class that defines the analysis plugin interface. + + Attributes: + number_of_consumed_events (int): number of events consumed by the analysis + plugin. + number_of_filtered_events (int): number of events filtered by the event + filter during analysis. + plugin_type (str): analysis plugin type. + """ # The name of the plugin. This is the name that is matched against when # loading plugins, so it is important that this name is short, concise and @@ -28,6 +37,10 @@ def __init__(self): """Initializes an analysis plugin.""" super(AnalysisPlugin, self).__init__() self._analysis_counter = collections.Counter() + self._event_tag_index = event_tag_index.EventTagIndex() + + self.number_of_consumed_events = 0 + self.number_of_filtered_events = 0 self.plugin_type = analysis_definitions.PLUGIN_TYPE_REPORT @property @@ -93,3 +106,57 @@ def ExamineEvent(self, mediator, event, event_data, event_data_stream): event_data (EventData): event data. event_data_stream (EventDataStream): event data stream. """ + + def ProcessEventStore(self, mediator, storage_reader, event_filter=None): + """Analyzes an event store. + + Args: + mediator (AnalysisMediator): mediates interactions between + analysis plugins and other components, such as storage and dfvfs. + storage_reader (StorageReader): storage reader. + event_filter (Optional[EventObjectFilter]): event filter. + """ + # TODO: determine if filter_limit makes sense for analysis plugins or + # that it should be removed. + filter_limit = getattr(event_filter, 'limit', None) + + # TODO: test if GetEvents is faster for analysis plugins that do not + # require the events to be in chronological order. 
+ # if event_filter: + # event_generator = storage_reader.GetSortedEvents() + # else: + # event_generator = storage_reader.GetEvents() + + for event in storage_reader.GetSortedEvents(): + if mediator.abort: + break + + event_data_identifier = event.GetEventDataIdentifier() + event_data = storage_reader.GetEventDataByIdentifier( + event_data_identifier) + + event_data_stream_identifier = event_data.GetEventDataStreamIdentifier() + event_data_stream = None + if event_data_stream_identifier: + event_data_stream = storage_reader.GetEventDataStreamByIdentifier( + event_data_stream_identifier) + + event_identifier = event.GetIdentifier() + event_tag = self._event_tag_index.GetEventTagByIdentifier( + storage_reader, event_identifier) + + filter_match = None + if event_filter: + filter_match = event_filter.Match( + event, event_data, event_data_stream, event_tag) + + # pylint: disable=singleton-comparison + if filter_match == False: + self.number_of_filtered_events += 1 + continue + + self.ExamineEvent(mediator, event, event_data, event_data_stream) + self.number_of_consumed_events += 1 + + if filter_limit and filter_limit == self.number_of_consumed_events: + break diff --git a/plaso/cli/psort_tool.py b/plaso/cli/psort_tool.py index 24460efe73..3a07b382d3 100644 --- a/plaso/cli/psort_tool.py +++ b/plaso/cli/psort_tool.py @@ -69,8 +69,8 @@ def __init__(self, input_reader=None, output_writer=None): self._analysis_plugins_output_format = None self._command_line_arguments = None self._deduplicate_events = True - self._event_filter_expression = None self._event_filter = None + self._event_filter_expression = None self._knowledge_base = knowledge_base.KnowledgeBase() self._number_of_analysis_reports = 0 self._preferred_language = 'en-US' @@ -527,7 +527,6 @@ def ProcessStorage(self): analysis_engine.AnalyzeEvents( session, self._knowledge_base, storage_writer, self._data_location, self._analysis_plugins, configuration, - event_filter=self._event_filter, event_filter_expression=self._event_filter_expression, status_update_callback=status_update_callback, storage_file_path=self._storage_file_path) diff --git a/plaso/multi_process/analysis_engine.py b/plaso/multi_process/analysis_engine.py index 3063336a47..56ad4e9547 100644 --- a/plaso/multi_process/analysis_engine.py +++ b/plaso/multi_process/analysis_engine.py @@ -47,7 +47,6 @@ def __init__(self, worker_memory_limit=None, worker_timeout=None): super(AnalysisMultiProcessEngine, self).__init__() self._analysis_plugins = {} - self._completed_analysis_processes = set() self._data_location = None self._event_filter_expression = None self._event_queues = {} @@ -74,14 +73,13 @@ def __init__(self, worker_memory_limit=None, worker_timeout=None): self._worker_memory_limit = worker_memory_limit self._worker_timeout = worker_timeout or definitions.DEFAULT_WORKER_TIMEOUT - def _AnalyzeEvents(self, storage_writer, analysis_plugins, event_filter=None): - """Analyzes events in a plaso storage. + def _AnalyzeEventStore(self, storage_writer, analysis_plugins): + """Analyzes an event store. Args: storage_writer (StorageWriter): storage writer. analysis_plugins (dict[str, AnalysisPlugin]): analysis plugins that should be run and their names. - event_filter (Optional[EventObjectFilter]): event filter. 
Returns: collections.Counter: counter containing information about the events @@ -102,60 +100,25 @@ def _AnalyzeEvents(self, storage_writer, analysis_plugins, event_filter=None): self._number_of_produced_event_tags = 0 self._number_of_produced_sources = 0 - number_of_filtered_events = 0 - - logger.debug('Processing events.') - - filter_limit = getattr(event_filter, 'limit', None) - - for event in storage_writer.GetSortedEvents(): - event_data_identifier = event.GetEventDataIdentifier() - event_data = storage_writer.GetEventDataByIdentifier( - event_data_identifier) - - event_data_stream_identifier = event_data.GetEventDataStreamIdentifier() - if event_data_stream_identifier: - event_data_stream = storage_writer.GetEventDataStreamByIdentifier( - event_data_stream_identifier) - else: - event_data_stream = None - - event_identifier = event.GetIdentifier() - event_tag = self._event_tag_index.GetEventTagByIdentifier( - storage_writer, event_identifier) - - if event_filter: - filter_match = event_filter.Match( - event, event_data, event_data_stream, event_tag) - else: - filter_match = None - - # pylint: disable=singleton-comparison - if filter_match == False: - number_of_filtered_events += 1 - continue - - for event_queue in self._event_queues.values(): - # TODO: Check for premature exit of analysis plugins. - event_queue.PushItem((event, event_data, event_data_stream)) - - self._number_of_consumed_events += 1 - - if (event_filter and filter_limit and - filter_limit == self._number_of_consumed_events): - break - - logger.debug('Finished pushing events to analysis plugins.') - # Signal that we have finished adding events. - for event_queue in self._event_queues.values(): - event_queue.PushItem(plaso_queue.QueueAbort(), block=False) + # TODO: change events into Progress and have analysis plugins report + # progress in percentage e.g. x out of y events or x out of y event data + # streams. Have main process report x out of y analysis results merged. logger.debug('Processing analysis plugin results.') # TODO: use a task based approach. plugin_names = list(analysis_plugins.keys()) - while plugin_names: - for plugin_name in list(plugin_names): + + analysis_tasks = [] + for plugin_name in analysis_plugins.keys(): + task = tasks.Task() + task.storage_format = definitions.STORAGE_FORMAT_SQLITE + task.identifier = plugin_name + + analysis_tasks.append(task) + + while analysis_tasks: + for task in list(analysis_tasks): if self._abort: break @@ -174,6 +137,7 @@ def _AnalyzeEvents(self, storage_writer, analysis_plugins, event_filter=None): event_queue = self._event_queues[plugin_name] del self._event_queues[plugin_name] + event_queue.PushItem(plaso_queue.QueueAbort()) event_queue.Close() storage_merge_reader = self._StartMergeTaskStorage( @@ -182,7 +146,7 @@ def _AnalyzeEvents(self, storage_writer, analysis_plugins, event_filter=None): storage_merge_reader.MergeAttributeContainers( callback=self._MergeEventTag) # TODO: temporary solution. - plugin_names.remove(plugin_name) + analysis_tasks.remove(task) self._status = definitions.STATUS_INDICATOR_RUNNING @@ -204,7 +168,6 @@ def _AnalyzeEvents(self, storage_writer, analysis_plugins, event_filter=None): logger.debug('Processing completed.') events_counter = collections.Counter() - events_counter['Events filtered'] = number_of_filtered_events events_counter['Events processed'] = self._number_of_consumed_events return events_counter @@ -222,60 +185,50 @@ def _CheckStatusAnalysisProcess(self, pid): # vs management). 
self._RaiseIfNotRegistered(pid) - if pid in self._completed_analysis_processes: - status_indicator = definitions.STATUS_INDICATOR_COMPLETED - process_status = { - 'processing_status': status_indicator} - used_memory = 0 + process = self._processes_per_pid[pid] + process_status = self._QueryProcessStatus(process) + if process_status is None: + process_is_alive = False else: - process = self._processes_per_pid[pid] + process_is_alive = True - process_status = self._QueryProcessStatus(process) - if process_status is None: - process_is_alive = False - else: - process_is_alive = True - - process_information = self._process_information_per_pid[pid] - used_memory = process_information.GetUsedMemory() or 0 - - if self._worker_memory_limit and used_memory > self._worker_memory_limit: - logger.warning(( - 'Process: {0:s} (PID: {1:d}) killed because it exceeded the ' - 'memory limit: {2:d}.').format( - process.name, pid, self._worker_memory_limit)) - self._KillProcess(pid) + process_information = self._process_information_per_pid[pid] + used_memory = process_information.GetUsedMemory() or 0 - if isinstance(process_status, dict): - self._rpc_errors_per_pid[pid] = 0 - status_indicator = process_status.get('processing_status', None) + if self._worker_memory_limit and used_memory > self._worker_memory_limit: + logger.warning(( + 'Process: {0:s} (PID: {1:d}) killed because it exceeded the ' + 'memory limit: {2:d}.').format( + process.name, pid, self._worker_memory_limit)) + self._KillProcess(pid) - if status_indicator == definitions.STATUS_INDICATOR_COMPLETED: - self._completed_analysis_processes.add(pid) + if isinstance(process_status, dict): + self._rpc_errors_per_pid[pid] = 0 + status_indicator = process_status.get('processing_status', None) - else: - rpc_errors = self._rpc_errors_per_pid.get(pid, 0) + 1 - self._rpc_errors_per_pid[pid] = rpc_errors + else: + rpc_errors = self._rpc_errors_per_pid.get(pid, 0) + 1 + self._rpc_errors_per_pid[pid] = rpc_errors - if rpc_errors > self._MAXIMUM_RPC_ERRORS: - process_is_alive = False + if rpc_errors > self._MAXIMUM_RPC_ERRORS: + process_is_alive = False - if process_is_alive: - rpc_port = process.rpc_port.value - logger.warning(( - 'Unable to retrieve process: {0:s} (PID: {1:d}) status via ' - 'RPC socket: http://localhost:{2:d}').format( - process.name, pid, rpc_port)) + if process_is_alive: + rpc_port = process.rpc_port.value + logger.warning(( + 'Unable to retrieve process: {0:s} (PID: {1:d}) status via ' + 'RPC socket: http://localhost:{2:d}').format( + process.name, pid, rpc_port)) - processing_status_string = 'RPC error' - status_indicator = definitions.STATUS_INDICATOR_RUNNING - else: - processing_status_string = 'killed' - status_indicator = definitions.STATUS_INDICATOR_KILLED + processing_status_string = 'RPC error' + status_indicator = definitions.STATUS_INDICATOR_RUNNING + else: + processing_status_string = 'killed' + status_indicator = definitions.STATUS_INDICATOR_KILLED - process_status = { - 'processing_status': processing_status_string} + process_status = { + 'processing_status': processing_status_string} self._UpdateProcessingStatus(pid, process_status, used_memory) @@ -552,7 +505,6 @@ def AnalyzeEvents( should be run and their names. processing_configuration (ProcessingConfiguration): processing configuration. - event_filter (Optional[EventObjectFilter]): event filter. event_filter_expression (Optional[str]): event filter expression. status_update_callback (Optional[function]): callback function for status updates. 
@@ -599,8 +551,7 @@ def AnalyzeEvents( try: storage_writer.WriteSessionConfiguration() - self._AnalyzeEvents( - storage_writer, analysis_plugins, event_filter=event_filter) + self._AnalyzeEventStore(storage_writer, analysis_plugins) self._status = definitions.STATUS_INDICATOR_FINALIZING @@ -661,6 +612,3 @@ def AnalyzeEvents( if keyboard_interrupt: raise KeyboardInterrupt - - if keyboard_interrupt: - raise KeyboardInterrupt diff --git a/plaso/multi_process/analysis_process.py b/plaso/multi_process/analysis_process.py index 8f62d61164..3164f3e95d 100644 --- a/plaso/multi_process/analysis_process.py +++ b/plaso/multi_process/analysis_process.py @@ -6,6 +6,7 @@ from plaso.analysis import mediator as analysis_mediator from plaso.containers import tasks from plaso.engine import plaso_queue +from plaso.filters import event_filter from plaso.lib import definitions from plaso.lib import errors from plaso.multi_process import task_process @@ -62,6 +63,10 @@ def _GetStatus(self): dict[str, object]: status attributes, indexed by name. """ logger.debug('Status update requested') + number_of_consumed_events = 0 + if self._analysis_plugin: + number_of_consumed_events = ( + self._analysis_plugin.number_of_consumed_events) if self._analysis_mediator: number_of_produced_event_tags = ( @@ -89,7 +94,7 @@ def _GetStatus(self): 'identifier': self._name, 'number_of_consumed_analysis_warnings': None, 'number_of_consumed_event_tags': None, - 'number_of_consumed_events': self._number_of_consumed_events, + 'number_of_consumed_events': number_of_consumed_events, 'number_of_consumed_reports': None, 'number_of_consumed_sources': None, 'number_of_produced_analysis_warnings': None, @@ -154,30 +159,14 @@ def _Main(self): task_storage_writer.WriteTaskStart() try: - logger.debug( - '{0!s} (PID: {1:d}) started monitoring event queue.'.format( - self._name, self._pid)) - - while not self._abort: - try: - queued_object = self._event_queue.PopItem() - - except (errors.QueueClose, errors.QueueEmpty) as exception: - logger.debug('ConsumeItems exiting with exception {0!s}.'.format( - type(exception))) - break - - if isinstance(queued_object, plaso_queue.QueueAbort): - logger.debug('ConsumeItems exiting, dequeued QueueAbort object.') - break - - self._ProcessEvent(self._analysis_mediator, *queued_object) + filter_object = None + if self._event_filter_expression: + filter_object = event_filter.EventObjectFilter() + filter_object.CompileFilter(self._event_filter_expression) - self._number_of_consumed_events += 1 - - logger.debug( - '{0!s} (PID: {1:d}) stopped monitoring event queue.'.format( - self._name, self._pid)) + with self._storage_writer.CreateStorageReader() as storage_reader: + self._analysis_plugin.ProcessEventStore( + self._analysis_mediator, storage_reader, event_filter=filter_object) if not self._abort: self._status = definitions.STATUS_INDICATOR_REPORTING @@ -187,6 +176,7 @@ def _Main(self): # All exceptions need to be caught here to prevent the process # from being killed by an uncaught exception. except Exception as exception: # pylint: disable=broad-except + # TODO: write analysis error and change logger to debug only. 
logger.warning( 'Unhandled exception in process: {0!s} (PID: {1:d}).'.format( self._name, self._pid)) @@ -212,11 +202,32 @@ def _Main(self): logger.warning('Unable to finalize task storage with error: {0!s}'.format( exception)) + if self._serializers_profiler: + self._storage_writer.SetSerializersProfiler(None) + + if self._storage_profiler: + self._storage_writer.SetStorageProfiler(None) + + self._StopProfiling() + if self._abort: self._status = definitions.STATUS_INDICATOR_ABORTED else: self._status = definitions.STATUS_INDICATOR_COMPLETED + while not self._abort: + try: + queued_object = self._event_queue.PopItem() + + except (errors.QueueClose, errors.QueueEmpty) as exception: + logger.debug('ConsumeItems exiting with exception {0!s}.'.format( + type(exception))) + break + + if isinstance(queued_object, plaso_queue.QueueAbort): + logger.debug('ConsumeItems exiting, dequeued QueueAbort object.') + break + logger.debug('Wait for foreman status wait event') self._foreman_status_wait_event.clear() self._foreman_status_wait_event.wait(self._FOREMAN_STATUS_WAIT) @@ -224,14 +235,6 @@ def _Main(self): logger.debug('Analysis plugin: {0!s} (PID: {1:d}) stopped'.format( self._name, self._pid)) - if self._serializers_profiler: - self._storage_writer.SetSerializersProfiler(None) - - if self._storage_profiler: - self._storage_writer.SetStorageProfiler(None) - - self._StopProfiling() - self._analysis_mediator = None self._foreman_status_wait_event = None self._storage_writer = None @@ -242,26 +245,6 @@ def _Main(self): except errors.QueueAlreadyClosed: logger.error('Queue for {0:s} was already closed.'.format(self.name)) - def _ProcessEvent(self, mediator, event, event_data, event_data_stream): - """Processes an event. - - Args: - mediator (AnalysisMediator): mediates interactions between - analysis plugins and other components, such as storage and dfvfs. - event (EventObject): event. - event_data (EventData): event data. - event_data_stream (EventDataStream): event data stream. - """ - try: - self._analysis_plugin.ExamineEvent( - mediator, event, event_data, event_data_stream) - - except Exception as exception: # pylint: disable=broad-except - # TODO: write analysis error and change logger to debug only. - - logger.warning('Unhandled exception while processing event object.') - logger.exception(exception) - def SignalAbort(self): """Signals the process to abort.""" self._abort = True diff --git a/plaso/storage/file_interface.py b/plaso/storage/file_interface.py index c07e76d283..6db0d7e784 100644 --- a/plaso/storage/file_interface.py +++ b/plaso/storage/file_interface.py @@ -484,13 +484,13 @@ class StorageFileWriter(interface.StorageWriter): """Defines an interface for a file-backed storage writer.""" def __init__( - self, session, output_file, - storage_type=definitions.STORAGE_TYPE_SESSION, task=None): + self, session, path, storage_type=definitions.STORAGE_TYPE_SESSION, + task=None): """Initializes a storage writer. Args: session (Session): session the storage changes are part of. - output_file (str): path to the output file. + path (str): path to the storage file. storage_type (Optional[str]): storage type. task(Optional[Task]): task. 
""" @@ -841,7 +841,7 @@ def Open(self, **unused_kwargs): if self._storage_profiler: self._storage_file.SetStorageProfiler(self._storage_profiler) - self._storage_file.Open(path=self._output_file, read_only=False) + self._storage_file.Open(path=self._path, read_only=False) self._first_written_event_source_index = ( self._storage_file.GetNumberOfEventSources()) diff --git a/plaso/storage/sqlite/sqlite_file.py b/plaso/storage/sqlite/sqlite_file.py index 0b9a53f373..697d2b7249 100644 --- a/plaso/storage/sqlite/sqlite_file.py +++ b/plaso/storage/sqlite/sqlite_file.py @@ -3,6 +3,7 @@ import collections import os +import pathlib import sqlite3 import zlib @@ -1146,8 +1147,21 @@ def Open(self, path=None, read_only=True, **unused_kwargs): path = os.path.abspath(path) - connection = sqlite3.connect( - path, detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES) + try: + path_uri = pathlib.Path(path).as_uri() + if read_only: + path_uri = '{0:s}?mode=ro'.format(path_uri) + + except ValueError: + path_uri = None + + if path_uri: + connection = sqlite3.connect( + path_uri, detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES, + uri=True) + else: + connection = sqlite3.connect( + path, detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES) cursor = connection.cursor() if not cursor: