diff --git a/plaso/analysis/interface.py b/plaso/analysis/interface.py
index 89475902e3..7b38ad27a8 100644
--- a/plaso/analysis/interface.py
+++ b/plaso/analysis/interface.py
@@ -11,10 +11,19 @@
 from plaso.containers import events
 from plaso.containers import reports
 from plaso.lib import definitions
+from plaso.storage import event_tag_index
 
 
 class AnalysisPlugin(object):
-  """Class that defines the analysis plugin interface."""
+  """Class that defines the analysis plugin interface.
+
+  Attributes:
+    number_of_consumed_events (int): number of events consumed by the analysis
+        plugin.
+    number_of_filtered_events (int): number of events filtered by the event
+        filter during analysis.
+    plugin_type (str): analysis plugin type.
+  """
 
   # The name of the plugin. This is the name that is matched against when
   # loading plugins, so it is important that this name is short, concise and
@@ -25,6 +34,10 @@ def __init__(self):
     """Initializes an analysis plugin."""
     super(AnalysisPlugin, self).__init__()
     self._analysis_counter = collections.Counter()
+    self._event_tag_index = event_tag_index.EventTagIndex()
+
+    self.number_of_consumed_events = 0
+    self.number_of_filtered_events = 0
     self.plugin_type = analysis_definitions.PLUGIN_TYPE_REPORT
 
   @property
@@ -90,3 +103,57 @@ def ExamineEvent(self, mediator, event, event_data, event_data_stream):
       event_data (EventData): event data.
       event_data_stream (EventDataStream): event data stream.
     """
+
+  def ProcessEventStore(self, mediator, storage_reader, event_filter=None):
+    """Analyzes an event store.
+
+    Args:
+      mediator (AnalysisMediator): mediates interactions between
+          analysis plugins and other components, such as storage and dfvfs.
+      storage_reader (StorageReader): storage reader.
+      event_filter (Optional[EventObjectFilter]): event filter.
+    """
+    # TODO: determine if filter_limit makes sense for analysis plugins or
+    # whether it should be removed.
+    filter_limit = getattr(event_filter, 'limit', None)
+
+    # TODO: test if GetEvents is faster for analysis plugins that do not
+    # require the events to be in chronological order.
+    # if event_filter:
+    #   event_generator = storage_reader.GetSortedEvents()
+    # else:
+    #   event_generator = storage_reader.GetEvents()
+
+    for event in storage_reader.GetSortedEvents():
+      if mediator.abort:
+        break
+
+      event_data_identifier = event.GetEventDataIdentifier()
+      event_data = storage_reader.GetEventDataByIdentifier(
+          event_data_identifier)
+
+      event_data_stream_identifier = event_data.GetEventDataStreamIdentifier()
+      event_data_stream = None
+      if event_data_stream_identifier:
+        event_data_stream = storage_reader.GetEventDataStreamByIdentifier(
+            event_data_stream_identifier)
+
+      event_identifier = event.GetIdentifier()
+      event_tag = self._event_tag_index.GetEventTagByIdentifier(
+          storage_reader, event_identifier)
+
+      filter_match = None
+      if event_filter:
+        filter_match = event_filter.Match(
+            event, event_data, event_data_stream, event_tag)
+
+      # pylint: disable=singleton-comparison
+      if filter_match == False:
+        self.number_of_filtered_events += 1
+        continue
+
+      self.ExamineEvent(mediator, event, event_data, event_data_stream)
+      self.number_of_consumed_events += 1
+
+      if filter_limit and filter_limit == self.number_of_consumed_events:
+        break
diff --git a/plaso/cli/psort_tool.py b/plaso/cli/psort_tool.py
index 098c15b499..e2f61c66aa 100644
--- a/plaso/cli/psort_tool.py
+++ b/plaso/cli/psort_tool.py
@@ -70,8 +70,8 @@ def __init__(self, input_reader=None, output_writer=None):
     self._analysis_plugins_output_format = None
     self._command_line_arguments = None
     self._deduplicate_events = True
-    self._event_filter_expression = None
     self._event_filter = None
+    self._event_filter_expression = None
     self._knowledge_base = knowledge_base.KnowledgeBase()
     self._number_of_analysis_reports = 0
     self._output_time_zone = None
@@ -567,7 +567,6 @@ def ProcessStorage(self):
       analysis_engine.AnalyzeEvents(
           self._knowledge_base, storage_writer, self._data_location,
           self._analysis_plugins, configuration,
-          event_filter=self._event_filter,
           event_filter_expression=self._event_filter_expression,
           status_update_callback=status_update_callback)
 
diff --git a/plaso/multi_processing/analysis_process.py b/plaso/multi_processing/analysis_process.py
index 6dc277c4fc..c0c0c69d6e 100644
--- a/plaso/multi_processing/analysis_process.py
+++ b/plaso/multi_processing/analysis_process.py
@@ -6,6 +6,7 @@
 from plaso.analysis import mediator as analysis_mediator
 from plaso.containers import tasks
 from plaso.engine import plaso_queue
+from plaso.filters import event_filter
 from plaso.lib import definitions
 from plaso.lib import errors
 from plaso.multi_processing import base_process
@@ -49,7 +50,6 @@ def __init__(
     self._event_queue = event_queue
     self._foreman_status_wait_event = None
     self._knowledge_base = knowledge_base
-    self._number_of_consumed_events = 0
     self._status = definitions.STATUS_INDICATOR_INITIALIZED
     self._storage_writer = storage_writer
     self._task = None
@@ -60,6 +60,11 @@ def _GetStatus(self):
     Returns:
       dict[str, object]: status attributes, indexed by name.
""" + number_of_consumed_events = 0 + if self._analysis_plugin: + number_of_consumed_events = ( + self._analysis_plugin.number_of_consumed_events) + if self._analysis_mediator: number_of_produced_event_tags = ( self._analysis_mediator.number_of_produced_event_tags) @@ -85,7 +90,7 @@ def _GetStatus(self): 'display_name': '', 'identifier': self._name, 'number_of_consumed_event_tags': None, - 'number_of_consumed_events': self._number_of_consumed_events, + 'number_of_consumed_events': number_of_consumed_events, 'number_of_consumed_reports': None, 'number_of_consumed_sources': None, 'number_of_consumed_warnings': None, @@ -98,11 +103,6 @@ def _GetStatus(self): 'task_identifier': None, 'used_memory': used_memory} - if self._status in ( - definitions.STATUS_INDICATOR_ABORTED, - definitions.STATUS_INDICATOR_COMPLETED): - self._foreman_status_wait_event.set() - return status def _Main(self): @@ -149,30 +149,14 @@ def _Main(self): storage_writer.WriteTaskStart() try: - logger.debug( - '{0!s} (PID: {1:d}) started monitoring event queue.'.format( - self._name, self._pid)) - - while not self._abort: - try: - queued_object = self._event_queue.PopItem() + filter_object = None + if self._event_filter_expression: + filter_object = event_filter.EventObjectFilter() + filter_object.CompileFilter(self._event_filter_expression) - except (errors.QueueClose, errors.QueueEmpty) as exception: - logger.debug('ConsumeItems exiting with exception {0!s}.'.format( - type(exception))) - break - - if isinstance(queued_object, plaso_queue.QueueAbort): - logger.debug('ConsumeItems exiting, dequeued QueueAbort object.') - break - - self._ProcessEvent(self._analysis_mediator, *queued_object) - - self._number_of_consumed_events += 1 - - logger.debug( - '{0!s} (PID: {1:d}) stopped monitoring event queue.'.format( - self._name, self._pid)) + with self._storage_writer.CreateStorageReader() as storage_reader: + self._analysis_plugin.ProcessEventStore( + self._analysis_mediator, storage_reader, event_filter=filter_object) if not self._abort: self._status = definitions.STATUS_INDICATOR_REPORTING @@ -182,6 +166,7 @@ def _Main(self): # All exceptions need to be caught here to prevent the process # from being killed by an uncaught exception. except Exception as exception: # pylint: disable=broad-except + # TODO: write analysis error and change logger to debug only. 
       logger.warning(
           'Unhandled exception in process: {0!s} (PID: {1:d}).'.format(
               self._name, self._pid))
@@ -206,23 +191,38 @@
       logger.warning('Unable to finalize task storage with error: {0!s}'.format(
           exception))
 
+    if self._serializers_profiler:
+      self._storage_writer.SetSerializersProfiler(None)
+
+    if self._storage_profiler:
+      self._storage_writer.SetStorageProfiler(None)
+
+    self._StopProfiling()
+
     if self._abort:
       self._status = definitions.STATUS_INDICATOR_ABORTED
     else:
       self._status = definitions.STATUS_INDICATOR_COMPLETED
 
-    self._foreman_status_wait_event.wait(self._FOREMAN_STATUS_WAIT)
+    while not self._abort:
+      try:
+        queued_object = self._event_queue.PopItem()
 
-    logger.debug('Analysis plugin: {0!s} (PID: {1:d}) stopped'.format(
-        self._name, self._pid))
+      except (errors.QueueClose, errors.QueueEmpty) as exception:
+        logger.debug('ConsumeItems exiting with exception {0!s}.'.format(
+            type(exception)))
+        break
 
-    if self._serializers_profiler:
-      self._storage_writer.SetSerializersProfiler(None)
+      if isinstance(queued_object, plaso_queue.QueueAbort):
+        logger.debug('ConsumeItems exiting, dequeued QueueAbort object.')
+        break
 
-    if self._storage_profiler:
-      self._storage_writer.SetStorageProfiler(None)
+    # TODO: is this wait event still needed?
+    self._foreman_status_wait_event.set()
+    self._foreman_status_wait_event.wait(self._FOREMAN_STATUS_WAIT)
 
-    self._StopProfiling()
+    logger.debug('Analysis plugin: {0!s} (PID: {1:d}) stopped'.format(
+        self._name, self._pid))
 
     self._analysis_mediator = None
     self._foreman_status_wait_event = None
@@ -234,26 +234,6 @@ def _Main(self):
     except errors.QueueAlreadyClosed:
       logger.error('Queue for {0:s} was already closed.'.format(self.name))
 
-  def _ProcessEvent(self, mediator, event, event_data, event_data_stream):
-    """Processes an event.
-
-    Args:
-      mediator (AnalysisMediator): mediates interactions between
-          analysis plugins and other components, such as storage and dfvfs.
-      event (EventObject): event.
-      event_data (EventData): event data.
-      event_data_stream (EventDataStream): event data stream.
-    """
-    try:
-      self._analysis_plugin.ExamineEvent(
-          mediator, event, event_data, event_data_stream)
-
-    except Exception as exception:  # pylint: disable=broad-except
-      # TODO: write analysis error and change logger to debug only.
-
-      logger.warning('Unhandled exception while processing event object.')
-      logger.exception(exception)
-
   def SignalAbort(self):
     """Signals the process to abort."""
     self._abort = True
diff --git a/plaso/multi_processing/psort.py b/plaso/multi_processing/psort.py
index 4f52e9e6f0..b49123457d 100644
--- a/plaso/multi_processing/psort.py
+++ b/plaso/multi_processing/psort.py
@@ -224,7 +224,6 @@ def __init__(self, worker_memory_limit=None, worker_timeout=None):
 
     super(PsortMultiProcessEngine, self).__init__()
     self._analysis_plugins = {}
-    self._completed_analysis_processes = set()
     self._data_location = None
     self._event_filter_expression = None
     self._event_queues = {}
@@ -255,14 +254,13 @@ def __init__(self, worker_memory_limit=None, worker_timeout=None):
     self._worker_memory_limit = worker_memory_limit
     self._worker_timeout = worker_timeout
 
-  def _AnalyzeEvents(self, storage_writer, analysis_plugins, event_filter=None):
-    """Analyzes events in a plaso storage.
+  def _AnalyzeEventStore(self, storage_writer, analysis_plugins):
+    """Analyzes an event store.
 
     Args:
       storage_writer (StorageWriter): storage writer.
       analysis_plugins (dict[str, AnalysisPlugin]): analysis plugins that
           should be run and their names.
-      event_filter (Optional[EventObjectFilter]): event filter.
 
     Returns:
       collections.Counter: counter containing information about the events
@@ -281,53 +279,9 @@ def _AnalyzeEvents(self, storage_writer, analysis_plugins, event_filter=None):
     self._number_of_produced_sources = 0
     self._number_of_produced_warnings = 0
 
-    number_of_filtered_events = 0
-
-    logger.debug('Processing events.')
-
-    filter_limit = getattr(event_filter, 'limit', None)
-
-    for event in storage_writer.GetSortedEvents():
-      event_data_identifier = event.GetEventDataIdentifier()
-      event_data = storage_writer.GetEventDataByIdentifier(
-          event_data_identifier)
-
-      event_data_stream_identifier = event_data.GetEventDataStreamIdentifier()
-      if event_data_stream_identifier:
-        event_data_stream = storage_writer.GetEventDataStreamByIdentifier(
-            event_data_stream_identifier)
-      else:
-        event_data_stream = None
-
-      event_identifier = event.GetIdentifier()
-      event_tag = self._event_tag_index.GetEventTagByIdentifier(
-          storage_writer, event_identifier)
-
-      if event_filter:
-        filter_match = event_filter.Match(
-            event, event_data, event_data_stream, event_tag)
-      else:
-        filter_match = None
-
-      # pylint: disable=singleton-comparison
-      if filter_match == False:
-        number_of_filtered_events += 1
-        continue
-
-      for event_queue in self._event_queues.values():
-        # TODO: Check for premature exit of analysis plugins.
-        event_queue.PushItem((event, event_data, event_data_stream))
-
-      self._number_of_consumed_events += 1
-
-      if (event_filter and filter_limit and
-          filter_limit == self._number_of_consumed_events):
-        break
-
-    logger.debug('Finished pushing events to analysis plugins.')
-    # Signal that we have finished adding events.
-    for event_queue in self._event_queues.values():
-      event_queue.PushItem(plaso_queue.QueueAbort(), block=False)
+    # TODO: change events into Progress and have analysis plugins report
+    # progress in percentage e.g. x out of y events or x out of y event data
+    # streams. Have main process report x out of y analysis results merged.
 
     logger.debug('Processing analysis plugin results.')
 
@@ -351,6 +305,7 @@ def _AnalyzeEvents(self, storage_writer, analysis_plugins, event_filter=None):
 
         event_queue = self._event_queues[plugin_name]
         del self._event_queues[plugin_name]
+        event_queue.PushItem(plaso_queue.QueueAbort())
         event_queue.Close()
 
         storage_merge_reader = storage_writer.StartMergeTaskStorage(task)
@@ -379,7 +334,6 @@ def _AnalyzeEvents(self, storage_writer, analysis_plugins, event_filter=None):
     logger.debug('Processing completed.')
 
     events_counter = collections.Counter()
-    events_counter['Events filtered'] = number_of_filtered_events
     events_counter['Events processed'] = self._number_of_consumed_events
 
     return events_counter
@@ -397,60 +351,50 @@ def _CheckStatusAnalysisProcess(self, pid):
     # vs management).
     self._RaiseIfNotRegistered(pid)
 
-    if pid in self._completed_analysis_processes:
-      status_indicator = definitions.STATUS_INDICATOR_COMPLETED
-      process_status = {
-          'processing_status': status_indicator}
-      used_memory = 0
+    process = self._processes_per_pid[pid]
 
+    process_status = self._QueryProcessStatus(process)
+    if process_status is None:
+      process_is_alive = False
     else:
-      process = self._processes_per_pid[pid]
-
-      process_status = self._QueryProcessStatus(process)
-      if process_status is None:
-        process_is_alive = False
-      else:
-        process_is_alive = True
+      process_is_alive = True
 
-      process_information = self._process_information_per_pid[pid]
-      used_memory = process_information.GetUsedMemory() or 0
+    process_information = self._process_information_per_pid[pid]
+    used_memory = process_information.GetUsedMemory() or 0
 
-      if self._worker_memory_limit and used_memory > self._worker_memory_limit:
-        logger.warning((
-            'Process: {0:s} (PID: {1:d}) killed because it exceeded the '
-            'memory limit: {2:d}.').format(
-                process.name, pid, self._worker_memory_limit))
-        self._KillProcess(pid)
-
-      if isinstance(process_status, dict):
-        self._rpc_errors_per_pid[pid] = 0
-        status_indicator = process_status.get('processing_status', None)
+    if self._worker_memory_limit and used_memory > self._worker_memory_limit:
+      logger.warning((
+          'Process: {0:s} (PID: {1:d}) killed because it exceeded the '
+          'memory limit: {2:d}.').format(
+              process.name, pid, self._worker_memory_limit))
+      self._KillProcess(pid)
 
-        if status_indicator == definitions.STATUS_INDICATOR_COMPLETED:
-          self._completed_analysis_processes.add(pid)
+    if isinstance(process_status, dict):
+      self._rpc_errors_per_pid[pid] = 0
+      status_indicator = process_status.get('processing_status', None)
 
-      else:
-        rpc_errors = self._rpc_errors_per_pid.get(pid, 0) + 1
-        self._rpc_errors_per_pid[pid] = rpc_errors
+    else:
+      rpc_errors = self._rpc_errors_per_pid.get(pid, 0) + 1
+      self._rpc_errors_per_pid[pid] = rpc_errors
 
-        if rpc_errors > self._MAXIMUM_RPC_ERRORS:
-          process_is_alive = False
+      if rpc_errors > self._MAXIMUM_RPC_ERRORS:
+        process_is_alive = False
 
-        if process_is_alive:
-          rpc_port = process.rpc_port.value
-          logger.warning((
-              'Unable to retrieve process: {0:s} (PID: {1:d}) status via '
-              'RPC socket: http://localhost:{2:d}').format(
-                  process.name, pid, rpc_port))
+      if process_is_alive:
+        rpc_port = process.rpc_port.value
+        logger.warning((
+            'Unable to retrieve process: {0:s} (PID: {1:d}) status via '
+            'RPC socket: http://localhost:{2:d}').format(
+                process.name, pid, rpc_port))
 
-          processing_status_string = 'RPC error'
-          status_indicator = definitions.STATUS_INDICATOR_RUNNING
-        else:
-          processing_status_string = 'killed'
-          status_indicator = definitions.STATUS_INDICATOR_KILLED
+        processing_status_string = 'RPC error'
+        status_indicator = definitions.STATUS_INDICATOR_RUNNING
+      else:
+        processing_status_string = 'killed'
+        status_indicator = definitions.STATUS_INDICATOR_KILLED
 
-          process_status = {
-              'processing_status': processing_status_string}
+      process_status = {
+          'processing_status': processing_status_string}
 
     self._UpdateProcessingStatus(pid, process_status, used_memory)
 
@@ -888,8 +832,8 @@ def _StartWorkerProcess(self, process_name, storage_writer):
 
   def AnalyzeEvents(
       self, knowledge_base_object, storage_writer, data_location,
-      analysis_plugins, processing_configuration, event_filter=None,
-      event_filter_expression=None, status_update_callback=None):
+      analysis_plugins, processing_configuration, event_filter_expression=None,
+      status_update_callback=None):
     """Analyzes events in a plaso storage.
 
     Args:
@@ -902,7 +846,6 @@ def AnalyzeEvents(
           should be run and their names.
       processing_configuration (ProcessingConfiguration): processing
          configuration.
-      event_filter (Optional[EventObjectFilter]): event filter.
       event_filter_expression (Optional[str]): event filter expression.
       status_update_callback (Optional[function]): callback function for status
          updates.
@@ -944,8 +887,7 @@ def AnalyzeEvents(
     try:
       storage_writer.WriteSessionConfiguration()
 
-      self._AnalyzeEvents(
-          storage_writer, analysis_plugins, event_filter=event_filter)
+      self._AnalyzeEventStore(storage_writer, analysis_plugins)
 
       self._status = definitions.STATUS_INDICATOR_FINALIZING
 
@@ -992,9 +934,6 @@ def AnalyzeEvents(
     if keyboard_interrupt:
       raise KeyboardInterrupt
 
-    if keyboard_interrupt:
-      raise KeyboardInterrupt
-
   def ExportEvents(
       self, knowledge_base_object, storage_reader, output_module,
       processing_configuration, deduplicate_events=True, event_filter=None,
diff --git a/plaso/storage/file_interface.py b/plaso/storage/file_interface.py
index 20afc9b8f4..533da2a3bd 100644
--- a/plaso/storage/file_interface.py
+++ b/plaso/storage/file_interface.py
@@ -456,20 +456,20 @@ class StorageFileWriter(interface.StorageWriter):
   """Defines an interface for a file-backed storage writer."""
 
   def __init__(
-      self, session, output_file,
-      storage_type=definitions.STORAGE_TYPE_SESSION, task=None):
+      self, session, path, storage_type=definitions.STORAGE_TYPE_SESSION,
+      task=None):
     """Initializes a storage writer.
 
     Args:
       session (Session): session the storage changes are part of.
-      output_file (str): path to the output file.
+      path (str): path to the storage file.
       storage_type (Optional[str]): storage type.
       task(Optional[Task]): task.
     """
     super(StorageFileWriter, self).__init__(
         session, storage_type=storage_type, task=task)
     self._merge_task_storage_path = ''
-    self._output_file = output_file
+    self._path = path
     self._processed_task_storage_path = ''
     self._storage_file = None
     self._task_storage_path = None
@@ -912,7 +912,7 @@ def Open(self, **unused_kwargs):
     if self._storage_profiler:
       self._storage_file.SetStorageProfiler(self._storage_profiler)
 
-    self._storage_file.Open(path=self._output_file, read_only=False)
+    self._storage_file.Open(path=self._path, read_only=False)
 
     self._first_written_event_source_index = (
         self._storage_file.GetNumberOfEventSources())
@@ -1053,7 +1053,7 @@ def StartTaskStorage(self):
     if self._task_storage_path:
       raise IOError('Task storage path already exists.')
 
-    output_directory = os.path.dirname(self._output_file)
+    output_directory = os.path.dirname(self._path)
     self._task_storage_path = tempfile.mkdtemp(dir=output_directory)
 
     self._merge_task_storage_path = os.path.join(
diff --git a/plaso/storage/interface.py b/plaso/storage/interface.py
index c6b63d984e..42239712ae 100644
--- a/plaso/storage/interface.py
+++ b/plaso/storage/interface.py
@@ -1208,6 +1208,17 @@ def CheckTaskReadyForMerge(self, task):
       bool: True if the task is ready to be merged.
     """
 
+  def CreateStorageReader(self):
+    """Creates a session storage reader.
+
+    Returns:
+      StorageReader: storage reader for the session store.
+
+    Raises:
+      NotImplementedError: since there is no implementation.
+    """
+    raise NotImplementedError()
+
   # pylint: disable=unused-argument
   def CreateTaskStorage(self, task, task_storage_format):
     """Creates a task store.
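Reviewer sketch, not part of the patch: StorageWriter gains CreateStorageReader() as a default-raising method that concrete writers override, so an analysis worker can open a reader on the same session store its writer is bound to instead of consuming an event queue. The stand-in class names below are illustrative only and assume the writer keeps the storage path in self._path as introduced above.

class FakeStorageWriter(object):
  """Illustrative stand-in for plaso's StorageWriter interface."""

  def CreateStorageReader(self):
    """Creates a session storage reader."""
    raise NotImplementedError()


class FakeSQLiteStorageWriter(FakeStorageWriter):
  """Illustrative stand-in for SQLiteStorageFileWriter."""

  def __init__(self, path):
    super(FakeSQLiteStorageWriter, self).__init__()
    self._path = path

  def CreateStorageReader(self):
    """Creates a reader bound to the same storage file as the writer."""
    return 'reader for: {0:s}'.format(self._path)


writer = FakeSQLiteStorageWriter('/tmp/example.plaso')
print(writer.CreateStorageReader())
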
diff --git a/plaso/storage/sqlite/sqlite_file.py b/plaso/storage/sqlite/sqlite_file.py
index f60dbca435..9fc8ba5657 100644
--- a/plaso/storage/sqlite/sqlite_file.py
+++ b/plaso/storage/sqlite/sqlite_file.py
@@ -2,6 +2,7 @@
 """SQLite-based storage."""
 
 import os
+import pathlib
 import sqlite3
 import zlib
 
@@ -1020,8 +1021,21 @@ def Open(self, path=None, read_only=True, **unused_kwargs):
 
     path = os.path.abspath(path)
 
-    connection = sqlite3.connect(
-        path, detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
+    try:
+      path_uri = pathlib.Path(path).as_uri()
+      if read_only:
+        path_uri = '{0:s}?mode=ro'.format(path_uri)
+
+    except ValueError:
+      path_uri = None
+
+    if path_uri:
+      connection = sqlite3.connect(
+          path_uri, detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES,
+          uri=True)
+    else:
+      connection = sqlite3.connect(
+          path, detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
 
     cursor = connection.cursor()
     if not cursor:
diff --git a/plaso/storage/sqlite/writer.py b/plaso/storage/sqlite/writer.py
index 68bf91d695..3772c3458b 100644
--- a/plaso/storage/sqlite/writer.py
+++ b/plaso/storage/sqlite/writer.py
@@ -6,6 +6,7 @@
 from plaso.lib import definitions
 from plaso.storage import file_interface
 from plaso.storage.sqlite import merge_reader
+from plaso.storage.sqlite import reader
 from plaso.storage.sqlite import sqlite_file
 from plaso.storage.redis import merge_reader as redis_merge_reader
 from plaso.storage.redis import writer as redis_writer
@@ -15,6 +16,14 @@
 class SQLiteStorageFileWriter(file_interface.StorageFileWriter):
   """SQLite-based storage file writer."""
 
+  def CreateStorageReader(self):
+    """Creates a session storage reader.
+
+    Returns:
+      StorageReader: storage reader for the session store.
+    """
+    return reader.SQLiteStorageFileReader(self._path)
+
   def CreateTaskStorage(self, task, task_storage_format):
     """Creates a task storage.
 
@@ -92,10 +101,10 @@ def _CheckRedisTaskStoreReadyForMerge(self, task):
     Returns:
       bool: True if the task is ready to be merged.
     """
-    reader = redis_reader.RedisStorageReader(task)
-    reader.Open()
-    is_ready = reader.IsFinalized()
-    reader.Close()
+    storage_reader = redis_reader.RedisStorageReader(task)
+    storage_reader.Open()
+    is_ready = storage_reader.IsFinalized()
+    storage_reader.Close()
 
     task.storage_file_size = 1000
     return is_ready
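Reviewer sketch, not part of the patch: the sqlite_file.py change opens the database through a file: URI so that read_only can be mapped to SQLite's mode=ro query parameter, falling back to the plain path when as_uri() raises ValueError (for example on a relative path). A self-contained illustration of that connection behavior with plain sqlite3; the temporary file name is illustrative.

import pathlib
import sqlite3
import tempfile

# Illustrative storage file; any writable location works.
path = pathlib.Path(tempfile.gettempdir()) / 'example.plaso'

connection = sqlite3.connect(str(path))
connection.execute('CREATE TABLE IF NOT EXISTS metadata (key TEXT, value TEXT)')
connection.commit()
connection.close()

# Same connection style as the patch: URI form with mode=ro for read-only.
path_uri = '{0:s}?mode=ro'.format(path.as_uri())
readonly_connection = sqlite3.connect(path_uri, uri=True)

try:
  readonly_connection.execute("INSERT INTO metadata VALUES ('key', 'value')")
except sqlite3.OperationalError as exception:
  # SQLite rejects writes on a mode=ro connection.
  print('Write rejected: {0!s}'.format(exception))

readonly_connection.close()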