Changed analysis plugins to directly read from storage (log2timeline#3439)
joachimmetz committed Jun 19, 2021
1 parent 26742d5 commit 9284bfb
Showing 6 changed files with 177 additions and 166 deletions.
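Before this change the multi-process analysis engine iterated the event store itself and pushed (event, event_data, event_data_stream) tuples onto one queue per analysis plugin, doing the event filtering in that loop. With this change each plugin reads events directly from a storage reader through the new AnalysisPlugin.ProcessEventStore method and keeps its own consumed/filtered counters. A minimal sketch of the resulting call pattern; the driver function below and its mediator and storage_reader arguments are illustrative only and not part of this diff:

# Illustrative driver only: in plaso this is wired up through analysis worker
# processes and per-plugin task storage, not a plain function call.


def RunAnalysisPlugin(plugin, mediator, storage_reader, event_filter=None):
  """Runs a single analysis plugin against an already opened storage reader.

  Args:
    plugin (AnalysisPlugin): analysis plugin to run.
    mediator (AnalysisMediator): analysis mediator.
    storage_reader (StorageReader): storage reader to read events from.
    event_filter (Optional[EventObjectFilter]): event filter.

  Returns:
    tuple[int, int]: number of consumed and number of filtered events.
  """
  # The plugin now iterates the event store itself instead of consuming
  # events from a queue filled by the engine.
  plugin.ProcessEventStore(mediator, storage_reader, event_filter=event_filter)
  return plugin.number_of_consumed_events, plugin.number_of_filtered_events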
69 changes: 68 additions & 1 deletion plaso/analysis/interface.py
@@ -11,10 +11,19 @@
from plaso.containers import events
from plaso.containers import reports
from plaso.lib import definitions
from plaso.storage import event_tag_index


class AnalysisPlugin(object):
"""Class that defines the analysis plugin interface."""
"""Class that defines the analysis plugin interface.
Attributes:
number_of_consumed_events (int): number of events consumed by the analysis
plugin.
number_of_filtered_events (int): number of events filtered by the event
filter during analysis.
plugin_type (str): analysis plugin type.
"""

# The name of the plugin. This is the name that is matched against when
# loading plugins, so it is important that this name is short, concise and
@@ -28,6 +37,10 @@ def __init__(self):
"""Initializes an analysis plugin."""
super(AnalysisPlugin, self).__init__()
self._analysis_counter = collections.Counter()
self._event_tag_index = event_tag_index.EventTagIndex()

self.number_of_consumed_events = 0
self.number_of_filtered_events = 0
self.plugin_type = analysis_definitions.PLUGIN_TYPE_REPORT

@property
@@ -93,3 +106,57 @@ def ExamineEvent(self, mediator, event, event_data, event_data_stream):
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
"""

def ProcessEventStore(self, mediator, storage_reader, event_filter=None):
"""Analyzes an event store.
Args:
mediator (AnalysisMediator): mediates interactions between
analysis plugins and other components, such as storage and dfvfs.
storage_reader (StorageReader): storage reader.
event_filter (Optional[EventObjectFilter]): event filter.
"""
# TODO: determine whether filter_limit makes sense for analysis plugins or
# should be removed.
filter_limit = getattr(event_filter, 'limit', None)

# TODO: test if GetEvents is faster for analysis plugins that do not
# require the events to be in chronological order.
# if event_filter:
# event_generator = storage_reader.GetSortedEvents()
# else:
# event_generator = storage_reader.GetEvents()

for event in storage_reader.GetSortedEvents():
if mediator.abort:
break

event_data_identifier = event.GetEventDataIdentifier()
event_data = storage_reader.GetEventDataByIdentifier(
event_data_identifier)

event_data_stream_identifier = event_data.GetEventDataStreamIdentifier()
event_data_stream = None
if event_data_stream_identifier:
event_data_stream = storage_reader.GetEventDataStreamByIdentifier(
event_data_stream_identifier)

event_identifier = event.GetIdentifier()
event_tag = self._event_tag_index.GetEventTagByIdentifier(
storage_reader, event_identifier)

filter_match = None
if event_filter:
filter_match = event_filter.Match(
event, event_data, event_data_stream, event_tag)

# pylint: disable=singleton-comparison
if filter_match == False:
self.number_of_filtered_events += 1
continue

self.ExamineEvent(mediator, event, event_data, event_data_stream)
self.number_of_consumed_events += 1

if filter_limit and filter_limit == self.number_of_consumed_events:
break
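For reference, a concrete plugin only needs to implement ExamineEvent (and CompileReport); ProcessEventStore supplies the iteration over the event store, the event data and event tag lookups, and the filter handling shown above. A toy example under those assumptions — the plugin below is hypothetical and only meant to show the call order:

from plaso.analysis import interface
from plaso.containers import reports


class EventCountAnalysisPlugin(interface.AnalysisPlugin):
  """Hypothetical analysis plugin that counts events per data type."""

  NAME = 'event_count'

  def __init__(self):
    """Initializes the event count analysis plugin."""
    super(EventCountAnalysisPlugin, self).__init__()
    self._counts_per_data_type = {}

  def ExamineEvent(self, mediator, event, event_data, event_data_stream):
    """Counts a single event; called by ProcessEventStore for every event
    that was not skipped by the event filter."""
    data_type = getattr(event_data, 'data_type', 'unknown')
    self._counts_per_data_type[data_type] = (
        self._counts_per_data_type.get(data_type, 0) + 1)

  def CompileReport(self, mediator):
    """Compiles an analysis report that summarizes the counts."""
    lines_of_text = [
        '{0:s}: {1:d}'.format(data_type, count)
        for data_type, count in sorted(self._counts_per_data_type.items())]
    return reports.AnalysisReport(
        plugin_name=self.NAME, text='\n'.join(lines_of_text))

Note the tri-state filter handling in ProcessEventStore: filter_match stays None when there is no filter, and event_filter.Match returns True or False; only an explicit False skips the event, which is why the comparison is written as filter_match == False under the pylint singleton-comparison suppression.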
3 changes: 1 addition & 2 deletions plaso/cli/psort_tool.py
@@ -69,8 +69,8 @@ def __init__(self, input_reader=None, output_writer=None):
self._analysis_plugins_output_format = None
self._command_line_arguments = None
self._deduplicate_events = True
self._event_filter_expression = None
self._event_filter = None
self._event_filter_expression = None
self._knowledge_base = knowledge_base.KnowledgeBase()
self._number_of_analysis_reports = 0
self._preferred_language = 'en-US'
@@ -527,7 +527,6 @@ def ProcessStorage(self):
analysis_engine.AnalyzeEvents(
session, self._knowledge_base, storage_writer, self._data_location,
self._analysis_plugins, configuration,
event_filter=self._event_filter,
event_filter_expression=self._event_filter_expression,
status_update_callback=status_update_callback,
storage_file_path=self._storage_file_path)
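In psort the compiled event filter is no longer handed to AnalyzeEvents; only the filter expression string still travels with that call, and the plugin-side filtering introduced above takes an EventObjectFilter directly. A small sketch of compiling such a filter; the module path plaso.filters.event_filter and the example expression are assumptions and may differ between plaso versions:

from plaso.filters import event_filter as event_filter_module

# Example expression only; any supported event filter expression works here.
filter_expression = 'data_type == "windows:evtx:record"'

# Compile the expression into an EventObjectFilter (assumed API, see above).
test_event_filter = event_filter_module.EventObjectFilter()
test_event_filter.CompileFilter(filter_expression)

# The compiled filter exposes Match(event, event_data, event_data_stream,
# event_tag), which ProcessEventStore calls for every event.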
158 changes: 53 additions & 105 deletions plaso/multi_process/analysis_engine.py
@@ -47,7 +47,6 @@ def __init__(self, worker_memory_limit=None, worker_timeout=None):

super(AnalysisMultiProcessEngine, self).__init__()
self._analysis_plugins = {}
self._completed_analysis_processes = set()
self._data_location = None
self._event_filter_expression = None
self._event_queues = {}
@@ -74,14 +73,13 @@ def __init__(self, worker_memory_limit=None, worker_timeout=None):
self._worker_memory_limit = worker_memory_limit
self._worker_timeout = worker_timeout or definitions.DEFAULT_WORKER_TIMEOUT

def _AnalyzeEvents(self, storage_writer, analysis_plugins, event_filter=None):
"""Analyzes events in a plaso storage.
def _AnalyzeEventStore(self, storage_writer, analysis_plugins):
"""Analyzes an event store.
Args:
storage_writer (StorageWriter): storage writer.
analysis_plugins (dict[str, AnalysisPlugin]): analysis plugins that
should be run and their names.
event_filter (Optional[EventObjectFilter]): event filter.
Returns:
collections.Counter: counter containing information about the events
@@ -102,60 +100,25 @@ def _AnalyzeEvents(self, storage_writer, analysis_plugins, event_filter=None):
self._number_of_produced_event_tags = 0
self._number_of_produced_sources = 0

number_of_filtered_events = 0

logger.debug('Processing events.')

filter_limit = getattr(event_filter, 'limit', None)

for event in storage_writer.GetSortedEvents():
event_data_identifier = event.GetEventDataIdentifier()
event_data = storage_writer.GetEventDataByIdentifier(
event_data_identifier)

event_data_stream_identifier = event_data.GetEventDataStreamIdentifier()
if event_data_stream_identifier:
event_data_stream = storage_writer.GetEventDataStreamByIdentifier(
event_data_stream_identifier)
else:
event_data_stream = None

event_identifier = event.GetIdentifier()
event_tag = self._event_tag_index.GetEventTagByIdentifier(
storage_writer, event_identifier)

if event_filter:
filter_match = event_filter.Match(
event, event_data, event_data_stream, event_tag)
else:
filter_match = None

# pylint: disable=singleton-comparison
if filter_match == False:
number_of_filtered_events += 1
continue

for event_queue in self._event_queues.values():
# TODO: Check for premature exit of analysis plugins.
event_queue.PushItem((event, event_data, event_data_stream))

self._number_of_consumed_events += 1

if (event_filter and filter_limit and
filter_limit == self._number_of_consumed_events):
break

logger.debug('Finished pushing events to analysis plugins.')
# Signal that we have finished adding events.
for event_queue in self._event_queues.values():
event_queue.PushItem(plaso_queue.QueueAbort(), block=False)
# TODO: change events into Progress and have analysis plugins report
# progress in percentage e.g. x out of y events or x out of y event data
# streams. Have main process report x out of y analysis results merged.

logger.debug('Processing analysis plugin results.')

# TODO: use a task based approach.
plugin_names = list(analysis_plugins.keys())
while plugin_names:
for plugin_name in list(plugin_names):

analysis_tasks = []
for plugin_name in analysis_plugins.keys():
task = tasks.Task()
task.storage_format = definitions.STORAGE_FORMAT_SQLITE
task.identifier = plugin_name

analysis_tasks.append(task)

while analysis_tasks:
for task in list(analysis_tasks):
if self._abort:
break

@@ -174,6 +137,7 @@ def _AnalyzeEvents(self, storage_writer, analysis_plugins, event_filter=None):
event_queue = self._event_queues[plugin_name]
del self._event_queues[plugin_name]

event_queue.PushItem(plaso_queue.QueueAbort())
event_queue.Close()

storage_merge_reader = self._StartMergeTaskStorage(
@@ -182,7 +146,7 @@ def _AnalyzeEvents(self, storage_writer, analysis_plugins, event_filter=None):
storage_merge_reader.MergeAttributeContainers(
callback=self._MergeEventTag)
# TODO: temporary solution.
plugin_names.remove(plugin_name)
analysis_tasks.remove(task)

self._status = definitions.STATUS_INDICATOR_RUNNING

Expand All @@ -204,7 +168,6 @@ def _AnalyzeEvents(self, storage_writer, analysis_plugins, event_filter=None):
logger.debug('Processing completed.')

events_counter = collections.Counter()
events_counter['Events filtered'] = number_of_filtered_events
events_counter['Events processed'] = self._number_of_consumed_events

return events_counter
Expand All @@ -222,60 +185,50 @@ def _CheckStatusAnalysisProcess(self, pid):
# vs management).
self._RaiseIfNotRegistered(pid)

if pid in self._completed_analysis_processes:
status_indicator = definitions.STATUS_INDICATOR_COMPLETED
process_status = {
'processing_status': status_indicator}
used_memory = 0
process = self._processes_per_pid[pid]

process_status = self._QueryProcessStatus(process)
if process_status is None:
process_is_alive = False
else:
process = self._processes_per_pid[pid]
process_is_alive = True

process_status = self._QueryProcessStatus(process)
if process_status is None:
process_is_alive = False
else:
process_is_alive = True

process_information = self._process_information_per_pid[pid]
used_memory = process_information.GetUsedMemory() or 0

if self._worker_memory_limit and used_memory > self._worker_memory_limit:
logger.warning((
'Process: {0:s} (PID: {1:d}) killed because it exceeded the '
'memory limit: {2:d}.').format(
process.name, pid, self._worker_memory_limit))
self._KillProcess(pid)
process_information = self._process_information_per_pid[pid]
used_memory = process_information.GetUsedMemory() or 0

if isinstance(process_status, dict):
self._rpc_errors_per_pid[pid] = 0
status_indicator = process_status.get('processing_status', None)
if self._worker_memory_limit and used_memory > self._worker_memory_limit:
logger.warning((
'Process: {0:s} (PID: {1:d}) killed because it exceeded the '
'memory limit: {2:d}.').format(
process.name, pid, self._worker_memory_limit))
self._KillProcess(pid)

if status_indicator == definitions.STATUS_INDICATOR_COMPLETED:
self._completed_analysis_processes.add(pid)
if isinstance(process_status, dict):
self._rpc_errors_per_pid[pid] = 0
status_indicator = process_status.get('processing_status', None)

else:
rpc_errors = self._rpc_errors_per_pid.get(pid, 0) + 1
self._rpc_errors_per_pid[pid] = rpc_errors
else:
rpc_errors = self._rpc_errors_per_pid.get(pid, 0) + 1
self._rpc_errors_per_pid[pid] = rpc_errors

if rpc_errors > self._MAXIMUM_RPC_ERRORS:
process_is_alive = False
if rpc_errors > self._MAXIMUM_RPC_ERRORS:
process_is_alive = False

if process_is_alive:
rpc_port = process.rpc_port.value
logger.warning((
'Unable to retrieve process: {0:s} (PID: {1:d}) status via '
'RPC socket: http://localhost:{2:d}').format(
process.name, pid, rpc_port))
if process_is_alive:
rpc_port = process.rpc_port.value
logger.warning((
'Unable to retrieve process: {0:s} (PID: {1:d}) status via '
'RPC socket: http://localhost:{2:d}').format(
process.name, pid, rpc_port))

processing_status_string = 'RPC error'
status_indicator = definitions.STATUS_INDICATOR_RUNNING
else:
processing_status_string = 'killed'
status_indicator = definitions.STATUS_INDICATOR_KILLED
processing_status_string = 'RPC error'
status_indicator = definitions.STATUS_INDICATOR_RUNNING
else:
processing_status_string = 'killed'
status_indicator = definitions.STATUS_INDICATOR_KILLED

process_status = {
'processing_status': processing_status_string}
process_status = {
'processing_status': processing_status_string}

self._UpdateProcessingStatus(pid, process_status, used_memory)

@@ -552,7 +505,6 @@ def AnalyzeEvents(
should be run and their names.
processing_configuration (ProcessingConfiguration): processing
configuration.
event_filter (Optional[EventObjectFilter]): event filter.
event_filter_expression (Optional[str]): event filter expression.
status_update_callback (Optional[function]): callback function for status
updates.
@@ -599,8 +551,7 @@ def AnalyzeEvents(
try:
storage_writer.WriteSessionConfiguration()

self._AnalyzeEvents(
storage_writer, analysis_plugins, event_filter=event_filter)
self._AnalyzeEventStore(storage_writer, analysis_plugins)

self._status = definitions.STATUS_INDICATOR_FINALIZING

@@ -661,6 +612,3 @@ def AnalyzeEvents(

if keyboard_interrupt:
raise KeyboardInterrupt

if keyboard_interrupt:
raise KeyboardInterrupt