From 0e708f480173a638d35d3a1901ae5b2f80531a3b Mon Sep 17 00:00:00 2001 From: Joachim Metz Date: Sat, 6 Feb 2021 08:32:16 +0100 Subject: [PATCH] Changes to text parser to handle decode errors #3301 (#3302) --- plaso/parsers/text_parser.py | 42 ++++++++- tests/parsers/text_parser.py | 170 +++++++++++++++++++++++++++++++++-- 2 files changed, 201 insertions(+), 11 deletions(-) diff --git a/plaso/parsers/text_parser.py b/plaso/parsers/text_parser.py index 209ee9d559..8f06c1e7e9 100644 --- a/plaso/parsers/text_parser.py +++ b/plaso/parsers/text_parser.py @@ -7,6 +7,7 @@ """ import abc +import codecs import pyparsing @@ -177,6 +178,34 @@ def __init__(self): # TODO: self._line_structures is a work-around and this needs # a structural fix. self._line_structures = list(self.LINE_STRUCTURES) + self._parser_mediator = None + + codecs.register_error('text_parser_handler', self._EncodingErrorHandler) + + def _EncodingErrorHandler(self, exception): + """Encoding error handler that escapes bytes that cannot be decoded. + + Args: + exception (UnicodeDecodeError): decoding error to handle. + + Returns: + tuple[str, int]: escaped representation of the offending byte and the + position where decoding should continue. + + Raises: + TypeError: if exception is not of type UnicodeDecodeError. + """ + if not isinstance(exception, UnicodeDecodeError): + raise TypeError('Unsupported exception type.') + + if self._parser_mediator: + self._parser_mediator.ProduceExtractionWarning( + 'error decoding 0x{0:02x} at offset: {1:d}'.format( + exception.object[exception.start], + self._current_offset + exception.start)) + + escaped = '\\x{0:02x}'.format(exception.object[exception.start]) + return (escaped, exception.start + 1) def _GetValueFromStructure(self, structure, name, default_value=None): """Retrieves a token value from a Pyparsing structure. @@ -279,7 +308,7 @@ def _ReadLine(self, text_file_object, max_len=None, depth=0): Raises: UnicodeDecodeError: if the text cannot be decoded using the specified - encoding. + encoding and encoding errors is set to strict. 
""" line = text_file_object.readline(size=max_len) @@ -312,6 +341,10 @@ def ParseFileObject(self, parser_mediator, file_object): 'Line structure undeclared, unable to proceed.') encoding = self._ENCODING or parser_mediator.codepage + + # Use strict encoding error handling in the verification step so that + # a text parser does not generate extraction warnings for encoding errors + # of unsupported files. text_file_object = text_file.TextFile(file_object, encoding=encoding) try: @@ -337,6 +370,12 @@ def ParseFileObject(self, parser_mediator, file_object): if not self.VerifyStructure(parser_mediator, line): raise errors.UnableToParseFile('Wrong file structure.') + self._parser_mediator = parser_mediator + + text_file_object = text_file.TextFile( + file_object, encoding=encoding, encoding_errors='text_parser_handler') + line = self._ReadLine(text_file_object, max_len=self.MAX_LINE_LENGTH) + consecutive_line_failures = 0 index = None # Set the offset to the beginning of the file. @@ -570,7 +609,6 @@ def ParseFileObject(self, parser_mediator, file_object): for key, structure in self.LINE_STRUCTURES: structure.parseWithTabs() - consecutive_line_failures = 0 # Read every line in the text file. 
while text_reader.lines: diff --git a/tests/parsers/text_parser.py b/tests/parsers/text_parser.py index c04e591f7c..5af577b001 100644 --- a/tests/parsers/text_parser.py +++ b/tests/parsers/text_parser.py @@ -2,15 +2,62 @@ # -*- coding: utf-8 -*- """This file contains the tests for the generic text parser.""" +import codecs import unittest import pyparsing +from dfvfs.file_io import fake_file_io +from dfvfs.helpers import text_file as dfvfs_text_file +from dfvfs.path import fake_path_spec +from dfvfs.resolver import context as dfvfs_context + from plaso.parsers import text_parser from tests.parsers import test_lib +class TestPyparsingSingleLineTextParser( + text_parser.PyparsingSingleLineTextParser): + """Single line PyParsing-based text parser for testing purposes.""" + + _ENCODING = 'utf-8' + + _LINE = pyparsing.Regex('.*') + pyparsing.lineEnd() + + LINE_STRUCTURES = [('line', _LINE)] + + def ParseRecord(self, parser_mediator, key, structure): + """Parses a log record structure and produces events. + + This function takes as an input a parsed pyparsing structure + and produces an EventObject if possible from that structure. + + Args: + parser_mediator (ParserMediator): mediates interactions between parsers + and other components, such as storage and dfvfs. + key (str): name of the parsed structure. + structure (pyparsing.ParseResults): tokens from a parsed log line. + """ + return + + def VerifyStructure(self, parser_mediator, line): + """Verify the structure of the file and return boolean based on that check. + + This function should read enough text from the text file to confirm + that the file is the correct one for this particular parser. + + Args: + parser_mediator (ParserMediator): mediates interactions between parsers + and other components, such as storage and dfvfs. + line (str): single line from the text file. + + Returns: + bool: True if this is the correct parser, False otherwise. 
+ """ + return True + + class PyparsingConstantsTest(test_lib.ParserTestCase): """Tests the PyparsingConstants text parser.""" @@ -52,32 +99,137 @@ def testConstantIPv4(self): text_parser.PyparsingConstants.IPV4_ADDRESS.parseString('34.258') -class PyparsingSingleLineTextParserTest(unittest.TestCase): +class PyparsingSingleLineTextParserTest(test_lib.ParserTestCase): """Tests for the single line PyParsing-based text parser.""" - # pylint: disable=protected-access + # pylint: disable=attribute-defined-outside-init,protected-access + + def _EncodingErrorHandler(self, exception): + """Encoding error handler. + + Args: + exception [UnicodeDecodeError]: exception. + + Returns: + tuple[str, int]: replacement string and number of bytes to skip. + + Raises: + TypeError: if exception is not of type UnicodeDecodeError. + """ + if not isinstance(exception, UnicodeDecodeError): + raise TypeError('Unsupported exception type.') + + self._encoding_errors.append( + (exception.start, exception.object[exception.start])) + escaped = '\\x{0:2x}'.format(exception.object[exception.start]) + return (escaped, exception.start + 1) def testIsText(self): """Tests the _IsText function.""" - parser = text_parser.PyparsingSingleLineTextParser() + test_parser = TestPyparsingSingleLineTextParser() bytes_in = b'this is My Weird ASCII and non whatever string.' 
- self.assertTrue(parser._IsText(bytes_in)) + self.assertTrue(test_parser._IsText(bytes_in)) bytes_in = 'Plaso Síar Og Raðar Þessu' - self.assertTrue(parser._IsText(bytes_in)) + self.assertTrue(test_parser._IsText(bytes_in)) bytes_in = b'\x01\\62LSO\xFF' - self.assertFalse(parser._IsText(bytes_in)) + self.assertFalse(test_parser._IsText(bytes_in)) bytes_in = b'T\x00h\x00i\x00s\x00\x20\x00' - self.assertTrue(parser._IsText(bytes_in)) + self.assertTrue(test_parser._IsText(bytes_in)) bytes_in = b'Ascii\x00' - self.assertTrue(parser._IsText(bytes_in)) + self.assertTrue(test_parser._IsText(bytes_in)) bytes_in = b'Ascii Open then...\x00\x99\x23' - self.assertFalse(parser._IsText(bytes_in)) + self.assertFalse(test_parser._IsText(bytes_in)) + + def testReadLine(self): + """Tests the _ReadLine function.""" + resolver_context = dfvfs_context.Context() + + test_path_spec = fake_path_spec.FakePathSpec(location='/file.txt') + data = b'This is another file.' + file_object = fake_file_io.FakeFile(resolver_context, test_path_spec, data) + file_object.Open() + + test_parser = TestPyparsingSingleLineTextParser() + test_text_file = dfvfs_text_file.TextFile(file_object, encoding='utf-8') + line = test_parser._ReadLine(test_text_file) + self.assertEqual(line, 'This is another file.') + + test_path_spec = fake_path_spec.FakePathSpec(location='/file.txt') + data = b'This is an\xbather file.'  # 0xba does not decode as UTF-8 + file_object = fake_file_io.FakeFile(resolver_context, test_path_spec, data) + file_object.Open() + + test_parser = TestPyparsingSingleLineTextParser() + test_text_file = dfvfs_text_file.TextFile(file_object, encoding='utf8') + with self.assertRaises(UnicodeDecodeError): + test_parser._ReadLine(test_text_file) + + test_path_spec = fake_path_spec.FakePathSpec(location='/file.txt') + data = b'This is an\xbather file.' 
+ file_object = fake_file_io.FakeFile(resolver_context, test_path_spec, data) + file_object.Open() + + test_parser = TestPyparsingSingleLineTextParser() + test_text_file = dfvfs_text_file.TextFile( + file_object, encoding='utf8', encoding_errors='replace') + line = test_parser._ReadLine(test_text_file) + self.assertEqual(line, 'This is an\ufffdther file.') + + self._encoding_errors = [] + codecs.register_error('test_handler', self._EncodingErrorHandler) + + test_path_spec = fake_path_spec.FakePathSpec(location='/file.txt') + data = b'This is an\xbather file.' + file_object = fake_file_io.FakeFile(resolver_context, test_path_spec, data) + file_object.Open() + + test_parser = TestPyparsingSingleLineTextParser() + test_text_file = dfvfs_text_file.TextFile( + file_object, encoding='utf8', encoding_errors='test_handler') + line = test_parser._ReadLine(test_text_file) + self.assertEqual(line, 'This is an\\xbather file.') + + self.assertEqual(len(self._encoding_errors), 1) + self.assertEqual(self._encoding_errors[0], (10, 0xba))  # (offset, byte) + + def testParseFileObject(self): + """Tests the ParseFileObject function.""" + storage_writer = self._CreateStorageWriter() + parser_mediator = self._CreateParserMediator(storage_writer) + resolver_context = dfvfs_context.Context() + + test_path_spec = fake_path_spec.FakePathSpec(location='/file.txt') + data = b'This is another file.\nWith two lines.\n' + file_object = fake_file_io.FakeFile(resolver_context, test_path_spec, data) + file_object.Open() + + test_parser = TestPyparsingSingleLineTextParser() + test_parser.ParseFileObject(parser_mediator, file_object) + + self.assertEqual(storage_writer.number_of_warnings, 0) + # The test parser does not generate events. 
+ self.assertEqual(storage_writer.number_of_events, 0) + + storage_writer = self._CreateStorageWriter() + parser_mediator = self._CreateParserMediator(storage_writer) + + test_path_spec = fake_path_spec.FakePathSpec(location='/file.txt') + data = b'This is another file.\nWith tw\xba lines.\n' + file_object = fake_file_io.FakeFile(resolver_context, test_path_spec, data) + file_object.Open() + + test_parser = TestPyparsingSingleLineTextParser() + test_parser.ParseFileObject(parser_mediator, file_object) + + self.assertEqual(storage_writer.number_of_warnings, 1)  # for the 0xba byte + # The test parser does not generate events. + self.assertEqual(storage_writer.number_of_events, 0) if __name__ == '__main__':