Skip to content

Commit

Permalink
Changes to text parser to handle decode errors log2timeline#3301
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz committed Jan 31, 2021
1 parent a6627f4 commit 95cea5c
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 11 deletions.
42 changes: 40 additions & 2 deletions plaso/parsers/text_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"""

import abc
import codecs

import pyparsing

Expand Down Expand Up @@ -177,6 +178,34 @@ def __init__(self):
# TODO: self._line_structures is a work-around and this needs
# a structural fix.
self._line_structures = list(self.LINE_STRUCTURES)
self._parser_mediator = None

codecs.register_error('text_parser_handler', self._EncodingErrorHandler)

def _EncodingErrorHandler(self, exception):
"""Encoding error handler.
Args:
exception [UnicodeDecodeError]: exception.
Returns:
tuple[str, int]: replacement string and a position where encoding should
continue.
Raises:
TypeError: if exception is not of type UnicodeDecodeError.
"""
if not isinstance(exception, UnicodeDecodeError):
raise TypeError('Unsupported exception type.')

if self._parser_mediator:
self._parser_mediator.ProduceExtractionWarning(
'error decoding 0x{0:02x} at offset: {1:d}'.format(
exception.object[exception.start],
self._current_offset + exception.start))

escaped = '\\x{0:2x}'.format(exception.object[exception.start])
return (escaped, exception.start + 1)

def _GetValueFromStructure(self, structure, name, default_value=None):
"""Retrieves a token value from a Pyparsing structure.
Expand Down Expand Up @@ -279,7 +308,7 @@ def _ReadLine(self, text_file_object, max_len=None, depth=0):
Raises:
UnicodeDecodeError: if the text cannot be decoded using the specified
encoding.
encoding and encoding errors is set to strict.
"""
line = text_file_object.readline(size=max_len)

Expand Down Expand Up @@ -312,6 +341,10 @@ def ParseFileObject(self, parser_mediator, file_object):
'Line structure undeclared, unable to proceed.')

encoding = self._ENCODING or parser_mediator.codepage

# Use strict encoding error handling in the verification step so that
# a text parser does not generate extraction warning for encoding errors
# of unsupported files.
text_file_object = text_file.TextFile(file_object, encoding=encoding)

try:
Expand All @@ -337,6 +370,12 @@ def ParseFileObject(self, parser_mediator, file_object):
if not self.VerifyStructure(parser_mediator, line):
raise errors.UnableToParseFile('Wrong file structure.')

self._parser_mediator = parser_mediator

text_file_object = text_file.TextFile(
file_object, encoding=encoding, encoding_errors='text_parser_handler')
line = self._ReadLine(text_file_object, max_len=self.MAX_LINE_LENGTH)

consecutive_line_failures = 0
index = None
# Set the offset to the beginning of the file.
Expand Down Expand Up @@ -570,7 +609,6 @@ def ParseFileObject(self, parser_mediator, file_object):
for key, structure in self.LINE_STRUCTURES:
structure.parseWithTabs()


consecutive_line_failures = 0
# Read every line in the text file.
while text_reader.lines:
Expand Down
170 changes: 161 additions & 9 deletions tests/parsers/text_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,62 @@
# -*- coding: utf-8 -*-
"""This file contains the tests for the generic text parser."""

import codecs
import unittest

import pyparsing

from dfvfs.file_io import fake_file_io
from dfvfs.helpers import text_file as dfvfs_text_file
from dfvfs.path import fake_path_spec
from dfvfs.resolver import context as dfvfs_context

from plaso.parsers import text_parser

from tests.parsers import test_lib


class TestPyparsingSingleLineTextParser(
text_parser.PyparsingSingleLineTextParser):
"""Single line PyParsing-based text parser for testing purposes."""

_ENCODING = 'utf-8'

_LINE = pyparsing.Regex('.*') + pyparsing.lineEnd()

LINE_STRUCTURES = [('line', _LINE)]

def ParseRecord(self, parser_mediator, key, structure):
"""Parses a log record structure and produces events.
This function takes as an input a parsed pyparsing structure
and produces an EventObject if possible from that structure.
Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfvfs.
key (str): name of the parsed structure.
structure (pyparsing.ParseResults): tokens from a parsed log line.
"""
return

def VerifyStructure(self, parser_mediator, line):
"""Verify the structure of the file and return boolean based on that check.
This function should read enough text from the text file to confirm
that the file is the correct one for this particular parser.
Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfvfs.
line (str): single line from the text file.
Returns:
bool: True if this is the correct parser, False otherwise.
"""
return True


class PyparsingConstantsTest(test_lib.ParserTestCase):
"""Tests the PyparsingConstants text parser."""

Expand Down Expand Up @@ -52,32 +99,137 @@ def testConstantIPv4(self):
text_parser.PyparsingConstants.IPV4_ADDRESS.parseString('34.258')


class PyparsingSingleLineTextParserTest(unittest.TestCase):
class PyparsingSingleLineTextParserTest(test_lib.ParserTestCase):
"""Tests for the single line PyParsing-based text parser."""

# pylint: disable=protected-access
# pylint: disable=attribute-defined-outside-init,protected-access

def _EncodingErrorHandler(self, exception):
"""Encoding error handler.
Args:
exception [UnicodeDecodeError]: exception.
Returns:
tuple[str, int]: replacement string and number of bytes to skip.
Raises:
TypeError: if exception is not of type UnicodeDecodeError.
"""
if not isinstance(exception, UnicodeDecodeError):
raise TypeError('Unsupported exception type.')

self._encoding_errors.append(
(exception.start, exception.object[exception.start]))
escaped = '\\x{0:2x}'.format(exception.object[exception.start])
return (escaped, exception.start + 1)

def testIsText(self):
"""Tests the _IsText function."""
parser = text_parser.PyparsingSingleLineTextParser()
test_parser = TestPyparsingSingleLineTextParser()

bytes_in = b'this is My Weird ASCII and non whatever string.'
self.assertTrue(parser._IsText(bytes_in))
self.assertTrue(test_parser._IsText(bytes_in))

bytes_in = 'Plaso Síar Og Raðar Þessu'
self.assertTrue(parser._IsText(bytes_in))
self.assertTrue(test_parser._IsText(bytes_in))

bytes_in = b'\x01\\62LSO\xFF'
self.assertFalse(parser._IsText(bytes_in))
self.assertFalse(test_parser._IsText(bytes_in))

bytes_in = b'T\x00h\x00i\x00s\x00\x20\x00'
self.assertTrue(parser._IsText(bytes_in))
self.assertTrue(test_parser._IsText(bytes_in))

bytes_in = b'Ascii\x00'
self.assertTrue(parser._IsText(bytes_in))
self.assertTrue(test_parser._IsText(bytes_in))

bytes_in = b'Ascii Open then...\x00\x99\x23'
self.assertFalse(parser._IsText(bytes_in))
self.assertFalse(test_parser._IsText(bytes_in))

def testReadLine(self):
"""Tests the _ReadLine function."""
resolver_context = dfvfs_context.Context()

test_path_spec = fake_path_spec.FakePathSpec(location='/file.txt')
data = b'This is another file.'
file_object = fake_file_io.FakeFile(resolver_context, test_path_spec, data)
file_object.Open()

test_parser = TestPyparsingSingleLineTextParser()
test_text_file = dfvfs_text_file.TextFile(file_object, encoding='utf-8')
line = test_parser._ReadLine(test_text_file)
self.assertEqual(line, 'This is another file.')

test_path_spec = fake_path_spec.FakePathSpec(location='/file.txt')
data = b'This is an\xbather file.'
file_object = fake_file_io.FakeFile(resolver_context, test_path_spec, data)
file_object.Open()

test_parser = TestPyparsingSingleLineTextParser()
test_text_file = dfvfs_text_file.TextFile(file_object, encoding='utf8')
with self.assertRaises(UnicodeDecodeError):
test_parser._ReadLine(test_text_file)

test_path_spec = fake_path_spec.FakePathSpec(location='/file.txt')
data = b'This is an\xbather file.'
file_object = fake_file_io.FakeFile(resolver_context, test_path_spec, data)
file_object.Open()

test_parser = TestPyparsingSingleLineTextParser()
test_text_file = dfvfs_text_file.TextFile(
file_object, encoding='utf8', encoding_errors='replace')
line = test_parser._ReadLine(test_text_file)
self.assertEqual(line, 'This is an\ufffdther file.')

self._encoding_errors = []
codecs.register_error('test_handler', self._EncodingErrorHandler)

test_path_spec = fake_path_spec.FakePathSpec(location='/file.txt')
data = b'This is an\xbather file.'
file_object = fake_file_io.FakeFile(resolver_context, test_path_spec, data)
file_object.Open()

test_parser = TestPyparsingSingleLineTextParser()
test_text_file = dfvfs_text_file.TextFile(
file_object, encoding='utf8', encoding_errors='test_handler')
line = test_parser._ReadLine(test_text_file)
self.assertEqual(line, 'This is an\\xbather file.')

self.assertEqual(len(self._encoding_errors), 1)
self.assertEqual(self._encoding_errors[0], (10, 0xba))

def testParseFileObject(self):
"""Tests the ParseFileObject function."""
storage_writer = self._CreateStorageWriter()
parser_mediator = self._CreateParserMediator(storage_writer)
resolver_context = dfvfs_context.Context()

test_path_spec = fake_path_spec.FakePathSpec(location='/file.txt')
data = b'This is another file.\nWith two lines.\n'
file_object = fake_file_io.FakeFile(resolver_context, test_path_spec, data)
file_object.Open()

test_parser = TestPyparsingSingleLineTextParser()
test_parser.ParseFileObject(parser_mediator, file_object)

self.assertEqual(storage_writer.number_of_warnings, 0)
# The test parser does not generate events.
self.assertEqual(storage_writer.number_of_events, 0)

storage_writer = self._CreateStorageWriter()
parser_mediator = self._CreateParserMediator(storage_writer)

test_path_spec = fake_path_spec.FakePathSpec(location='/file.txt')
data = b'This is another file.\nWith tw\xba lines.\n'
file_object = fake_file_io.FakeFile(resolver_context, test_path_spec, data)
file_object.Open()

test_parser = TestPyparsingSingleLineTextParser()
test_parser.ParseFileObject(parser_mediator, file_object)

self.assertEqual(storage_writer.number_of_warnings, 1)
# The test parser does not generate events.
self.assertEqual(storage_writer.number_of_events, 0)


if __name__ == '__main__':
Expand Down

0 comments on commit 95cea5c

Please sign in to comment.