-
Notifications
You must be signed in to change notification settings - Fork 364
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added formatters configuration file reader #444
- Loading branch information
1 parent
5157939
commit d6fe5cf
Showing
5 changed files
with
273 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
# -*- coding: utf-8 -*- | ||
"""YAML-based formatters file.""" | ||
|
||
from __future__ import unicode_literals | ||
|
||
import io | ||
import yaml | ||
|
||
from plaso.formatters import interface | ||
from plaso.lib import errors | ||
|
||
|
||
class YAMLFormattersFile(object): | ||
"""YAML-based formatters file. | ||
A YAML-based formatters file contains one or more event formatters. | ||
type: 'conditional' | ||
data_type: 'fs:stat' | ||
message: | ||
- '{display_name}' | ||
- 'Type: {file_entry_type}' | ||
- '({unallocated})' | ||
short_message: | ||
- '{filename}' | ||
short_source: 'FILE' | ||
source: 'File system' | ||
Where: | ||
* type, defines the formatter data type, which can be "basic" or | ||
"conditional"; | ||
* data_type, defines the corresponding event data type; | ||
* message, defines a list of message string pieces; | ||
* short_message, defines the short message string pieces; | ||
* short_source, defines the short source; | ||
* source, defines the source. | ||
""" | ||
|
||
_SUPPORTED_KEYS = frozenset([ | ||
'data_type', | ||
'message', | ||
'short_message', | ||
'short_source', | ||
'source', | ||
'type']) | ||
|
||
def _ReadFormatterDefinition(self, formatter_definition_values): | ||
"""Reads an event formatter definition from a dictionary. | ||
Args: | ||
formatter_definition_values (dict[str, object]): formatter definition | ||
values. | ||
Returns: | ||
EventFormatter: an event formatter. | ||
Raises: | ||
ParseError: if the format of the formatter definition is not set | ||
or incorrect. | ||
""" | ||
if not formatter_definition_values: | ||
raise errors.ParseError('Missing formatter definition values.') | ||
|
||
different_keys = set(formatter_definition_values) - self._SUPPORTED_KEYS | ||
if different_keys: | ||
different_keys = ', '.join(different_keys) | ||
raise errors.ParseError('Undefined keys: {0:s}'.format(different_keys)) | ||
|
||
formatter_type = formatter_definition_values.get('type', None) | ||
if not formatter_type: | ||
raise errors.ParseError( | ||
'Invalid event formatter definition missing type.') | ||
|
||
if formatter_type not in ('basic', 'conditional'): | ||
raise errors.ParseError( | ||
'Invalid event formatter definition unsupported type: {0!s}.'.format( | ||
formatter_type)) | ||
|
||
data_type = formatter_definition_values.get('data_type', None) | ||
if not data_type: | ||
raise errors.ParseError( | ||
'Invalid event formatter definition missing data type.') | ||
|
||
message = formatter_definition_values.get('message', None) | ||
if not message: | ||
raise errors.ParseError( | ||
'Invalid event formatter definition missing message.') | ||
|
||
short_message = formatter_definition_values.get('short_message', None) | ||
if not short_message: | ||
raise errors.ParseError( | ||
'Invalid event formatter definition missing short message.') | ||
|
||
short_source = formatter_definition_values.get('short_source', None) | ||
if not short_source: | ||
raise errors.ParseError( | ||
'Invalid event formatter definition missing short source.') | ||
|
||
source = formatter_definition_values.get('source', None) | ||
if not source: | ||
raise errors.ParseError( | ||
'Invalid event formatter definition missing source.') | ||
|
||
if formatter_type == 'basic': | ||
formatter = interface.EventFormatter() | ||
# TODO: check if message and short_message are strings | ||
formatter.FORMAT_STRING = message | ||
formatter.FORMAT_STRING_SHORT = short_message | ||
|
||
elif formatter_type == 'conditional': | ||
formatter = interface.ConditionalEventFormatter() | ||
# TODO: check if message and short_message are list of strings | ||
formatter.FORMAT_STRING_PIECES = message | ||
formatter.FORMAT_STRING_SHORT_PIECES = short_message | ||
|
||
formatter.DATA_TYPE = data_type | ||
formatter.SOURCE_LONG = source | ||
formatter.SOURCE_SHORT = short_source | ||
|
||
return formatter | ||
|
||
def _ReadFromFileObject(self, file_object): | ||
"""Reads the event formatters from a file-like object. | ||
Args: | ||
file_object (file): formatters file-like object. | ||
Yields: | ||
EventFormatter: event formatters. | ||
""" | ||
yaml_generator = yaml.safe_load_all(file_object) | ||
|
||
for yaml_definition in yaml_generator: | ||
yield self._ReadFormatterDefinition(yaml_definition) | ||
|
||
def ReadFromFile(self, path): | ||
"""Reads the event formatters from the YAML-based formatters file. | ||
Args: | ||
path (str): path to a formatters file. | ||
Returns: | ||
list[EventFormatter]: event formatters. | ||
""" | ||
with io.open(path, 'r', encoding='utf-8') as file_object: | ||
return list(self._ReadFromFileObject(file_object)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
# YAML-based formatters file for testing format features. | ||
|
||
type: 'conditional' | ||
data_type: 'fs:stat' | ||
message: | ||
- '{display_name}' | ||
- 'Type: {file_entry_type}' | ||
- '({unallocated})' | ||
short_message: | ||
- '{filename}' | ||
short_source: 'FILE' | ||
source: 'File system' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
#!/usr/bin/env python3 | ||
# -*- coding: utf-8 -*- | ||
"""Tests for the YAML-based formatters file.""" | ||
|
||
from __future__ import unicode_literals | ||
|
||
import io | ||
import unittest | ||
|
||
from plaso.formatters import yaml_formatters_file | ||
from plaso.lib import errors | ||
|
||
from tests import test_lib as shared_test_lib | ||
|
||
|
||
class YAMLFormattersFileTest(shared_test_lib.BaseTestCase): | ||
"""Tests for the YAML-based formatters file.""" | ||
|
||
# pylint: disable=protected-access | ||
|
||
def testReadFormatterDefinition(self): | ||
"""Tests the _ReadFormatterDefinition function.""" | ||
test_formatters_file = yaml_formatters_file.YAMLFormattersFile() | ||
|
||
formatter = test_formatters_file._ReadFormatterDefinition({ | ||
'type': 'conditional', | ||
'data_type': 'fs:stat', | ||
'message': [ | ||
'{display_name}', | ||
'Type: {file_entry_type}', | ||
'({unallocated})'], | ||
'short_message': [ | ||
'{filename}'], | ||
'short_source': 'FILE', | ||
'source': 'File system'}) | ||
|
||
self.assertIsNotNone(formatter) | ||
self.assertEqual(formatter.DATA_TYPE, 'fs:stat') | ||
|
||
with self.assertRaises(errors.ParseError): | ||
test_formatters_file._ReadFormatterDefinition({}) | ||
|
||
with self.assertRaises(errors.ParseError): | ||
test_formatters_file._ReadFormatterDefinition({'type': 'bogus'}) | ||
|
||
with self.assertRaises(errors.ParseError): | ||
test_formatters_file._ReadFormatterDefinition({'type': 'conditional'}) | ||
|
||
with self.assertRaises(errors.ParseError): | ||
test_formatters_file._ReadFormatterDefinition({ | ||
'type': 'conditional', | ||
'data_type': 'fs:stat'}) | ||
|
||
with self.assertRaises(errors.ParseError): | ||
test_formatters_file._ReadFormatterDefinition({ | ||
'type': 'conditional', | ||
'data_type': 'fs:stat', | ||
'message': [ | ||
'{display_name}', | ||
'Type: {file_entry_type}', | ||
'({unallocated})']}) | ||
|
||
with self.assertRaises(errors.ParseError): | ||
test_formatters_file._ReadFormatterDefinition({ | ||
'type': 'conditional', | ||
'data_type': 'fs:stat', | ||
'message': [ | ||
'{display_name}', | ||
'Type: {file_entry_type}', | ||
'({unallocated})'], | ||
'short_message': [ | ||
'{filename}']}) | ||
|
||
with self.assertRaises(errors.ParseError): | ||
test_formatters_file._ReadFormatterDefinition({ | ||
'type': 'conditional', | ||
'data_type': 'fs:stat', | ||
'message': [ | ||
'{display_name}', | ||
'Type: {file_entry_type}', | ||
'({unallocated})'], | ||
'short_message': [ | ||
'{filename}'], | ||
'short_source': 'FILE'}) | ||
|
||
with self.assertRaises(errors.ParseError): | ||
test_formatters_file._ReadFormatterDefinition({'bogus': 'error'}) | ||
|
||
@shared_test_lib.skipUnlessHasTestFile(['formatters', 'format_test.yaml']) | ||
def testReadFromFileObject(self): | ||
"""Tests the _ReadFromFileObject function.""" | ||
test_formatters_file = yaml_formatters_file.YAMLFormattersFile() | ||
|
||
test_path = self._GetTestFilePath(['formatters', 'format_test.yaml']) | ||
with io.open(test_path, 'r', encoding='utf-8') as file_object: | ||
formatters = list(test_formatters_file._ReadFromFileObject(file_object)) | ||
|
||
self.assertEqual(len(formatters), 1) | ||
|
||
@shared_test_lib.skipUnlessHasTestFile(['formatters', 'format_test.yaml']) | ||
def testReadFromFile(self): | ||
"""Tests the ReadFromFile function.""" | ||
test_formatters_file = yaml_formatters_file.YAMLFormattersFile() | ||
|
||
test_path = self._GetTestFilePath(['formatters', 'format_test.yaml']) | ||
formatters = test_formatters_file.ReadFromFile(test_path) | ||
|
||
self.assertEqual(len(formatters), 1) | ||
|
||
self.assertEqual(formatters[0].DATA_TYPE, 'fs:stat') | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |