Skip to content

Commit

Permalink
Added formatters configuration file reader log2timeline#444
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz committed Feb 11, 2020
1 parent 2658c80 commit 36ede14
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 2 deletions.
2 changes: 1 addition & 1 deletion plaso/engine/yaml_filter_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def _ReadFilterDefinition(self, filter_definition_values):
paths=paths)

def _ReadFromFileObject(self, file_object):
"""Reads the path filters from the YAML-based filter file-like object.
"""Reads the path filters from a file-like object.
Args:
file_object (file): filter file-like object.
Expand Down
145 changes: 145 additions & 0 deletions plaso/formatters/yaml_formatters_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# -*- coding: utf-8 -*-
"""YAML-based formatters file."""

from __future__ import unicode_literals

import io
import yaml

from plaso.formatters import interface
from plaso.lib import errors


class YAMLFormattersFile(object):
"""YAML-based formatters file.
A YAML-based formatters file contains one or more event formatters.
type: 'conditional'
data_type: 'fs:stat'
message:
- '{display_name}'
- 'Type: {file_entry_type}'
- '({unallocated})'
short_message:
- '{filename}'
short_source: 'FILE'
source: 'File system'
Where:
* type, defines the formatter data type, which can be "basic" or
"conditional";
* data_type, defines the corresponding event data type;
* message, defines a list of message string pieces;
* short_message, defines the short message string pieces;
* short_source, defines the short source;
* source, defines the source.
"""

_SUPPORTED_KEYS = frozenset([
'data_type',
'message',
'short_message',
'short_source',
'source',
'type'])

def _ReadFormatterDefinition(self, formatter_definition_values):
"""Reads an event formatter definition from a dictionary.
Args:
formatter_definition_values (dict[str, object]): formatter definition
values.
Returns:
EventFormatter: an event formatter.
Raises:
ParseError: if the format of the formatter definition is not set
or incorrect.
"""
if not formatter_definition_values:
raise errors.ParseError('Missing formatter definition values.')

different_keys = set(formatter_definition_values) - self._SUPPORTED_KEYS
if different_keys:
different_keys = ', '.join(different_keys)
raise errors.ParseError('Undefined keys: {0:s}'.format(different_keys))

formatter_type = formatter_definition_values.get('type', None)
if not formatter_type:
raise errors.ParseError(
'Invalid event formatter definition missing type.')

if formatter_type not in ('basic', 'conditional'):
raise errors.ParseError(
'Invalid event formatter definition unsupported type: {0!s}.'.format(
formatter_type))

data_type = formatter_definition_values.get('data_type', None)
if not data_type:
raise errors.ParseError(
'Invalid event formatter definition missing data type.')

message = formatter_definition_values.get('message', None)
if not message:
raise errors.ParseError(
'Invalid event formatter definition missing message.')

short_message = formatter_definition_values.get('short_message', None)
if not short_message:
raise errors.ParseError(
'Invalid event formatter definition missing short message.')

short_source = formatter_definition_values.get('short_source', None)
if not short_source:
raise errors.ParseError(
'Invalid event formatter definition missing short source.')

source = formatter_definition_values.get('source', None)
if not source:
raise errors.ParseError(
'Invalid event formatter definition missing source.')

if formatter_type == 'basic':
formatter = interface.EventFormatter()
# TODO: check if message and short_message are strings
formatter.FORMAT_STRING = message
formatter.FORMAT_STRING_SHORT = short_message

elif formatter_type == 'conditional':
formatter = interface.ConditionalEventFormatter()
# TODO: check if message and short_message are list of strings
formatter.FORMAT_STRING_PIECES = message
formatter.FORMAT_STRING_SHORT_PIECES = short_message

formatter.DATA_TYPE = data_type
formatter.SOURCE_LONG = source
formatter.SOURCE_SHORT = short_source

return formatter

def _ReadFromFileObject(self, file_object):
"""Reads the event formatters from a file-like object.
Args:
file_object (file): formatters file-like object.
Yields:
EventFormatter: event formatters.
"""
yaml_generator = yaml.safe_load_all(file_object)

for yaml_definition in yaml_generator:
yield self._ReadFormatterDefinition(yaml_definition)

def ReadFromFile(self, path):
"""Reads the event formatters from the YAML-based formatters file.
Args:
path (str): path to a formatters file.
Returns:
list[EventFormatter]: event formatters.
"""
with io.open(path, 'r', encoding='utf-8') as file_object:
return list(self._ReadFromFileObject(file_object))
12 changes: 12 additions & 0 deletions test_data/formatters/format_test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# YAML-based formatters file for testing format features.

type: 'conditional'
data_type: 'fs:stat'
message:
- '{display_name}'
- 'Type: {file_entry_type}'
- '({unallocated})'
short_message:
- '{filename}'
short_source: 'FILE'
source: 'File system'
2 changes: 1 addition & 1 deletion tests/formatters/init_imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class FormattersImportTest(test_lib.ImportCheckTestCase):
_CLI_HELPERS_PATH = os.path.join(os.getcwd(), 'plaso', 'formatters')
_IGNORABLE_FILES = frozenset([
'default.py', 'interface.py', 'logger.py', 'manager.py', 'mediator.py',
'winevt_rc.py'])
'winevt_rc.py', 'yaml_formatters_file.py'])

def testFormattersImported(self):
"""Tests that all parsers are imported."""
Expand Down
114 changes: 114 additions & 0 deletions tests/formatters/yaml_formatters_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Tests for the YAML-based formatters file."""

from __future__ import unicode_literals

import io
import unittest

from plaso.formatters import yaml_formatters_file
from plaso.lib import errors

from tests import test_lib as shared_test_lib


class YAMLFormattersFileTest(shared_test_lib.BaseTestCase):
"""Tests for the YAML-based formatters file."""

# pylint: disable=protected-access

def testReadFormatterDefinition(self):
"""Tests the _ReadFormatterDefinition function."""
test_formatters_file = yaml_formatters_file.YAMLFormattersFile()

formatter = test_formatters_file._ReadFormatterDefinition({
'type': 'conditional',
'data_type': 'fs:stat',
'message': [
'{display_name}',
'Type: {file_entry_type}',
'({unallocated})'],
'short_message': [
'{filename}'],
'short_source': 'FILE',
'source': 'File system'})

self.assertIsNotNone(formatter)
self.assertEqual(formatter.DATA_TYPE, 'fs:stat')

with self.assertRaises(errors.ParseError):
test_formatters_file._ReadFormatterDefinition({})

with self.assertRaises(errors.ParseError):
test_formatters_file._ReadFormatterDefinition({'type': 'bogus'})

with self.assertRaises(errors.ParseError):
test_formatters_file._ReadFormatterDefinition({'type': 'conditional'})

with self.assertRaises(errors.ParseError):
test_formatters_file._ReadFormatterDefinition({
'type': 'conditional',
'data_type': 'fs:stat'})

with self.assertRaises(errors.ParseError):
test_formatters_file._ReadFormatterDefinition({
'type': 'conditional',
'data_type': 'fs:stat',
'message': [
'{display_name}',
'Type: {file_entry_type}',
'({unallocated})']})

with self.assertRaises(errors.ParseError):
test_formatters_file._ReadFormatterDefinition({
'type': 'conditional',
'data_type': 'fs:stat',
'message': [
'{display_name}',
'Type: {file_entry_type}',
'({unallocated})'],
'short_message': [
'{filename}']})

with self.assertRaises(errors.ParseError):
test_formatters_file._ReadFormatterDefinition({
'type': 'conditional',
'data_type': 'fs:stat',
'message': [
'{display_name}',
'Type: {file_entry_type}',
'({unallocated})'],
'short_message': [
'{filename}'],
'short_source': 'FILE'})

with self.assertRaises(errors.ParseError):
test_formatters_file._ReadFormatterDefinition({'bogus': 'error'})

def testReadFromFileObject(self):
"""Tests the _ReadFromFileObject function."""
test_path = self._GetTestFilePath(['formatters', 'format_test.yaml'])
self._SkipIfPathNotExists(test_path)

test_formatters_file = yaml_formatters_file.YAMLFormattersFile()
with io.open(test_path, 'r', encoding='utf-8') as file_object:
formatters = list(test_formatters_file._ReadFromFileObject(file_object))

self.assertEqual(len(formatters), 1)

def testReadFromFile(self):
"""Tests the ReadFromFile function."""
test_path = self._GetTestFilePath(['formatters', 'format_test.yaml'])
self._SkipIfPathNotExists(test_path)

test_formatters_file = yaml_formatters_file.YAMLFormattersFile()
formatters = test_formatters_file.ReadFromFile(test_path)

self.assertEqual(len(formatters), 1)

self.assertEqual(formatters[0].DATA_TYPE, 'fs:stat')


if __name__ == '__main__':
unittest.main()

0 comments on commit 36ede14

Please sign in to comment.