Skip to content

Commit

Permalink
Added memory test analysis plugin (#3307)
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz authored Feb 6, 2021
1 parent 9e8fa18 commit 7904d4d
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 14 deletions.
1 change: 1 addition & 0 deletions plaso/analysis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from plaso.analysis import nsrlsvr
from plaso.analysis import sessionize
from plaso.analysis import tagging
from plaso.analysis import test_memory
from plaso.analysis import unique_domains_visited
from plaso.analysis import viper
from plaso.analysis import virustotal
Expand Down
3 changes: 3 additions & 0 deletions plaso/analysis/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ class AnalysisPlugin(object):
# explains the nature of the plugin easily. It also needs to be unique.
NAME = 'analysis_plugin'

# Flag to indicate the analysis is for testing purposes only.
TEST_PLUGIN = False

def __init__(self):
"""Initializes an analysis plugin."""
super(AnalysisPlugin, self).__init__()
Expand Down
13 changes: 7 additions & 6 deletions plaso/analysis/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ def GetAllPluginInformation(cls):
"""
results = []
for plugin_class in cls._plugin_classes.values():
plugin_object = plugin_class()
# TODO: Use a specific description variable, not the docstring.
doc_string, _, _ = plugin_class.__doc__.partition('\n')
type_string = cls._PLUGIN_TYPE_STRINGS.get(plugin_object.plugin_type)
information_tuple = (plugin_object.NAME, doc_string, type_string)
results.append(information_tuple)
if not plugin_class.TEST_PLUGIN:
plugin_object = plugin_class()
# TODO: Use a specific description variable, not the docstring.
doc_string, _, _ = plugin_class.__doc__.partition('\n')
type_string = cls._PLUGIN_TYPE_STRINGS.get(plugin_object.plugin_type)
information_tuple = (plugin_object.NAME, doc_string, type_string)
results.append(information_tuple)

return sorted(results)

Expand Down
49 changes: 49 additions & 0 deletions plaso/analysis/test_memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
"""Analysis plugin for testing exceeding memory consumption."""

from __future__ import unicode_literals

from plaso.analysis import interface
from plaso.analysis import manager
from plaso.containers import reports


class TestMemoryAnalysisPlugin(interface.AnalysisPlugin):
"""Analysis plugin for testing memory consumption."""

NAME = 'test_memory'

TEST_PLUGIN = True

def __init__(self):
"""Initializes an analysis plugin for testing memory consumption."""
super(TestMemoryAnalysisPlugin, self).__init__()
self._objects = []

def CompileReport(self, mediator):
"""Compiles an analysis report.
Args:
mediator (AnalysisMediator): mediates interactions between
analysis plugins and other components, such as storage and dfvfs.
Returns:
AnalysisReport: analysis report.
"""
return reports.AnalysisReport(
plugin_name=self.NAME, text='TestMemory report')

def ExamineEvent(self, mediator, event, event_data, event_data_stream):
"""Analyzes an event.
Args:
mediator (AnalysisMediator): mediates interactions between
analysis plugins and other components, such as storage and dfvfs.
event (EventObject): event.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
"""
self._objects.append(list(range(1024)))


manager.AnalysisPluginManager.RegisterPlugin(TestMemoryAnalysisPlugin)
30 changes: 22 additions & 8 deletions plaso/multi_processing/psort.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from plaso.engine import zeromq_queue
from plaso.lib import bufferlib
from plaso.lib import definitions
from plaso.lib import errors
from plaso.multi_processing import analysis_process
from plaso.multi_processing import engine as multi_process_engine
from plaso.multi_processing import logger
Expand Down Expand Up @@ -916,7 +917,9 @@ def AnalyzeEvents(
if not analysis_plugins:
return

abort_kill = False
keyboard_interrupt = False
queue_full = False

self._analysis_plugins = {}
self._data_location = data_location
Expand Down Expand Up @@ -952,15 +955,20 @@ def AnalyzeEvents(

self._status = definitions.STATUS_INDICATOR_FINALIZING

except errors.QueueFull:
queue_full = True
self._abort = True

except KeyboardInterrupt:
keyboard_interrupt = True
self._abort = True

self._processing_status.aborted = True
if self._status_update_callback:
self._status_update_callback(self._processing_status)

finally:
if self._abort:
self._processing_status.aborted = True
if self._status_update_callback:
self._status_update_callback(self._processing_status)

storage_writer.WriteSessionCompletion(aborted=self._abort)

storage_writer.Close()
Expand All @@ -970,12 +978,18 @@ def AnalyzeEvents(
# so we include the storage sync to disk in the status updates.
self._StopStatusUpdateThread()

try:
self._StopAnalysisProcesses(abort=self._abort)
if queue_full:
# TODO: handle abort on queue full more elegant.
abort_kill = True
else:
try:
self._StopAnalysisProcesses(abort=self._abort)

except KeyboardInterrupt:
keyboard_interrupt = True
except KeyboardInterrupt:
keyboard_interrupt = True
abort_kill = True

if abort_kill:
self._AbortKill()

# The abort can leave the main process unresponsive
Expand Down

0 comments on commit 7904d4d

Please sign in to comment.