diff --git a/README.md b/README.md index 64fa64f7..7ca64778 100644 --- a/README.md +++ b/README.md @@ -299,8 +299,9 @@ Changelog Unreleased ----- -- ... - +- Refactor parser to fix parsing inconsistencies ([@bbc2])([#170]). + - Interpret escapes as control characters only in double-quoted strings. + - Interpret `#` as start of comment only if preceded by whitespace. 0.10.2 ----- @@ -428,6 +429,7 @@ Unreleased [#172]: https://github.com/theskumar/python-dotenv/issues/172 [#121]: https://github.com/theskumar/python-dotenv/issues/121 [#176]: https://github.com/theskumar/python-dotenv/issues/176 +[#170]: https://github.com/theskumar/python-dotenv/issues/170 [@asyncee]: https://github.com/asyncee [@greyli]: https://github.com/greyli diff --git a/setup.cfg b/setup.cfg index 7f784591..f0847b32 100644 --- a/setup.cfg +++ b/setup.cfg @@ -5,6 +5,9 @@ universal = 1 max-line-length = 120 exclude = .tox,.git,docs,venv,.venv +[mypy] +ignore_missing_imports = true + [metadata] description-file = README.rst diff --git a/src/dotenv/compat.py b/src/dotenv/compat.py index 99ffb39b..1a145345 100644 --- a/src/dotenv/compat.py +++ b/src/dotenv/compat.py @@ -1,4 +1,5 @@ import sys +from typing import Text if sys.version_info >= (3, 0): from io import StringIO # noqa @@ -6,3 +7,17 @@ from StringIO import StringIO # noqa PY2 = sys.version_info[0] == 2 # type: bool + + +def to_text(string): + # type: (str) -> Text + """ + Make a string Unicode if it isn't already. + + This is useful for defining raw unicode strings because `ur"foo"` isn't valid in + Python 3. + """ + if PY2: + return string.decode("utf-8") + else: + return string diff --git a/src/dotenv/main.py b/src/dotenv/main.py index 08122825..5b619b11 100644 --- a/src/dotenv/main.py +++ b/src/dotenv/main.py @@ -1,7 +1,6 @@ # -*- coding: utf-8 -*- from __future__ import absolute_import, print_function, unicode_literals -import codecs import io import os import re @@ -9,13 +8,14 @@ import sys from subprocess import Popen import tempfile -from typing import (Any, Dict, Iterator, List, Match, NamedTuple, Optional, # noqa - Pattern, Union, TYPE_CHECKING, Text, IO, Tuple) # noqa +from typing import (Dict, Iterator, List, Match, Optional, # noqa + Pattern, Union, TYPE_CHECKING, Text, IO, Tuple) import warnings from collections import OrderedDict from contextlib import contextmanager from .compat import StringIO, PY2 +from .parser import parse_stream if TYPE_CHECKING: # pragma: no cover if sys.version_info >= (3, 6): @@ -30,84 +30,6 @@ __posix_variable = re.compile(r'\$\{[^\}]*\}') # type: Pattern[Text] -_binding = re.compile( - r""" - ( - \s* # leading whitespace - (?:export{0}+)? # export - - ( '[^']+' # single-quoted key - | [^=\#\s]+ # or unquoted key - )? - - (?: - (?:{0}*={0}*) # equal sign - - ( '(?:\\'|[^'])*' # single-quoted value - | "(?:\\"|[^"])*" # or double-quoted value - | [^\#\r\n]* # or unquoted value - ) - )? - - \s* # trailing whitespace - (?:\#[^\r\n]*)? # comment - (?:\r|\n|\r\n)? # newline - ) - """.format(r'[^\S\r\n]'), - re.MULTILINE | re.VERBOSE, -) # type: Pattern[Text] - -_escape_sequence = re.compile(r"\\[\\'\"abfnrtv]") # type: Pattern[Text] - - -Binding = NamedTuple("Binding", [("key", Optional[Text]), - ("value", Optional[Text]), - ("original", Text)]) - - -def decode_escapes(string): - # type: (Text) -> Text - def decode_match(match): - # type: (Match[Text]) -> Text - return codecs.decode(match.group(0), 'unicode-escape') # type: ignore - - return _escape_sequence.sub(decode_match, string) - - -def is_surrounded_by(string, char): - # type: (Text, Text) -> bool - return ( - len(string) > 1 - and string[0] == string[-1] == char - ) - - -def parse_binding(string, position): - # type: (Text, int) -> Tuple[Binding, int] - match = _binding.match(string, position) - assert match is not None - (matched, key, value) = match.groups() - if key is None or value is None: - key = None - value = None - else: - value_quoted = is_surrounded_by(value, "'") or is_surrounded_by(value, '"') - if value_quoted: - value = decode_escapes(value[1:-1]) - else: - value = value.strip() - return (Binding(key=key, value=value, original=matched), match.end()) - - -def parse_stream(stream): - # type:(IO[Text]) -> Iterator[Binding] - string = stream.read() - position = 0 - length = len(string) - while position < length: - (binding, position) = parse_binding(string, position) - yield binding - def to_env(text): # type: (Text) -> str diff --git a/src/dotenv/parser.py b/src/dotenv/parser.py new file mode 100644 index 00000000..b63cb3a0 --- /dev/null +++ b/src/dotenv/parser.py @@ -0,0 +1,147 @@ +import codecs +import re +from typing import (IO, Iterator, Match, NamedTuple, Optional, Pattern, # noqa + Sequence, Text) + +from .compat import to_text + + +def make_regex(string, extra_flags=0): + # type: (str, int) -> Pattern[Text] + return re.compile(to_text(string), re.UNICODE | extra_flags) + + +_whitespace = make_regex(r"\s*", extra_flags=re.MULTILINE) +_export = make_regex(r"(?:export[^\S\r\n]+)?") +_single_quoted_key = make_regex(r"'([^']+)'") +_unquoted_key = make_regex(r"([^=\#\s]+)") +_equal_sign = make_regex(r"[^\S\r\n]*=[^\S\r\n]*") +_single_quoted_value = make_regex(r"'((?:\\'|[^'])*)'") +_double_quoted_value = make_regex(r'"((?:\\"|[^"])*)"') +_unquoted_value_part = make_regex(r"([^ \r\n]*)") +_comment = make_regex(r"(?:\s*#[^\r\n]*)?") +_end_of_line = make_regex(r"[^\S\r\n]*(?:\r\n|\n|\r)?") +_rest_of_line = make_regex(r"[^\r\n]*(?:\r|\n|\r\n)?") +_double_quote_escapes = make_regex(r"\\[\\'\"abfnrtv]") +_single_quote_escapes = make_regex(r"\\[\\']") + +Binding = NamedTuple("Binding", [("key", Optional[Text]), + ("value", Optional[Text]), + ("original", Text)]) + + +class Error(Exception): + pass + + +class Reader: + def __init__(self, stream): + # type: (IO[Text]) -> None + self.string = stream.read() + self.position = 0 + self.mark = 0 + + def has_next(self): + # type: () -> bool + return self.position < len(self.string) + + def set_mark(self): + # type: () -> None + self.mark = self.position + + def get_marked(self): + # type: () -> Text + return self.string[self.mark:self.position] + + def peek(self, count): + # type: (int) -> Text + return self.string[self.position:self.position + count] + + def read(self, count): + # type: (int) -> Text + result = self.string[self.position:self.position + count] + if len(result) < count: + raise Error("read: End of string") + self.position += count + return result + + def read_regex(self, regex): + # type: (Pattern[Text]) -> Sequence[Text] + match = regex.match(self.string, self.position) + if match is None: + raise Error("read_regex: Pattern not found") + self.position = match.end() + return match.groups() + + +def decode_escapes(regex, string): + # type: (Pattern[Text], Text) -> Text + def decode_match(match): + # type: (Match[Text]) -> Text + return codecs.decode(match.group(0), 'unicode-escape') # type: ignore + + return regex.sub(decode_match, string) + + +def parse_key(reader): + # type: (Reader) -> Text + char = reader.peek(1) + if char == "'": + (key,) = reader.read_regex(_single_quoted_key) + else: + (key,) = reader.read_regex(_unquoted_key) + return key + + +def parse_unquoted_value(reader): + # type: (Reader) -> Text + value = u"" + while True: + (part,) = reader.read_regex(_unquoted_value_part) + value += part + after = reader.peek(2) + if len(after) < 2 or after[0] in u"\r\n" or after[1] in u" #\r\n": + return value + value += reader.read(2) + + +def parse_value(reader): + # type: (Reader) -> Text + char = reader.peek(1) + if char == u"'": + (value,) = reader.read_regex(_single_quoted_value) + return decode_escapes(_single_quote_escapes, value) + elif char == u'"': + (value,) = reader.read_regex(_double_quoted_value) + return decode_escapes(_double_quote_escapes, value) + elif char in (u"", u"\n", u"\r"): + return u"" + else: + return parse_unquoted_value(reader) + + +def parse_binding(reader): + # type: (Reader) -> Binding + reader.set_mark() + try: + reader.read_regex(_whitespace) + reader.read_regex(_export) + key = parse_key(reader) + reader.read_regex(_equal_sign) + value = parse_value(reader) + reader.read_regex(_comment) + reader.read_regex(_end_of_line) + return Binding(key=key, value=value, original=reader.get_marked()) + except Error: + reader.read_regex(_rest_of_line) + return Binding(key=None, value=None, original=reader.get_marked()) + + +def parse_stream(stream): + # type:(IO[Text]) -> Iterator[Binding] + reader = Reader(stream) + while reader.has_next(): + try: + yield parse_binding(reader) + except Error: + return diff --git a/tests/test_core.py b/tests/test_core.py index daf0f59e..349c58b8 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -13,7 +13,6 @@ from dotenv import dotenv_values, find_dotenv, load_dotenv, set_key from dotenv.compat import PY2, StringIO -from dotenv.main import Binding, parse_stream @contextlib.contextmanager @@ -26,81 +25,6 @@ def restore_os_environ(): os.environ.update(environ) -@pytest.mark.parametrize("test_input,expected", [ - ("", []), - ("a=b", [Binding(key="a", value="b", original="a=b")]), - ("'a'=b", [Binding(key="'a'", value="b", original="'a'=b")]), - ("[=b", [Binding(key="[", value="b", original="[=b")]), - (" a = b ", [Binding(key="a", value="b", original=" a = b ")]), - ("export a=b", [Binding(key="a", value="b", original="export a=b")]), - (" export 'a'=b", [Binding(key="'a'", value="b", original=" export 'a'=b")]), - (" export 'a'=b", [Binding(key="'a'", value="b", original=" export 'a'=b")]), - ("# a=b", [Binding(key=None, value=None, original="# a=b")]), - ('a=b # comment', [Binding(key="a", value="b", original="a=b # comment")]), - ("a=b space ", [Binding(key="a", value="b space", original="a=b space ")]), - ("a='b space '", [Binding(key="a", value="b space ", original="a='b space '")]), - ('a="b space "', [Binding(key="a", value="b space ", original='a="b space "')]), - ("export export_a=1", [Binding(key="export_a", value="1", original="export export_a=1")]), - ("export port=8000", [Binding(key="port", value="8000", original="export port=8000")]), - ('a="b\nc"', [Binding(key="a", value="b\nc", original='a="b\nc"')]), - ("a='b\nc'", [Binding(key="a", value="b\nc", original="a='b\nc'")]), - ('a="b\nc"', [Binding(key="a", value="b\nc", original='a="b\nc"')]), - ('a="b\\nc"', [Binding(key="a", value='b\nc', original='a="b\\nc"')]), - ('a="b\\"c"', [Binding(key="a", value='b"c', original='a="b\\"c"')]), - ("a='b\\'c'", [Binding(key="a", value="b'c", original="a='b\\'c'")]), - ("a=à", [Binding(key="a", value="à", original="a=à")]), - ('a="à"', [Binding(key="a", value="à", original='a="à"')]), - ('garbage', [Binding(key=None, value=None, original="garbage")]), - ( - "a=b\nc=d", - [ - Binding(key="a", value="b", original="a=b\n"), - Binding(key="c", value="d", original="c=d"), - ], - ), - ( - "a=b\r\nc=d", - [ - Binding(key="a", value="b", original="a=b\r\n"), - Binding(key="c", value="d", original="c=d"), - ], - ), - ( - 'a=\nb=c', - [ - Binding(key="a", value='', original='a=\n'), - Binding(key="b", value='c', original="b=c"), - ] - ), - ( - 'a="\nb=c', - [ - Binding(key="a", value='"', original='a="\n'), - Binding(key="b", value='c', original="b=c"), - ] - ), - ( - '# comment\na="b\nc"\nd=e\n', - [ - Binding(key=None, value=None, original="# comment\n"), - Binding(key="a", value="b\nc", original='a="b\nc"\n'), - Binding(key="d", value="e", original="d=e\n"), - ], - ), - ( - 'garbage[%$#\na=b', - [ - Binding(key=None, value=None, original="garbage[%$#\n"), - Binding(key="a", value="b", original='a=b'), - ], - ), -]) -def test_parse_stream(test_input, expected): - result = parse_stream(StringIO(test_input)) - - assert list(result) == expected - - def test_warns_if_file_does_not_exist(): with warnings.catch_warnings(record=True) as w: load_dotenv('.does_not_exist', verbose=True) diff --git a/tests/test_parser.py b/tests/test_parser.py new file mode 100644 index 00000000..f191f902 --- /dev/null +++ b/tests/test_parser.py @@ -0,0 +1,88 @@ +# -*- coding: utf-8 -*- +import pytest + +from dotenv.compat import StringIO +from dotenv.parser import Binding, parse_stream + + +@pytest.mark.parametrize("test_input,expected", [ + (u"", []), + (u"a=b", [Binding(key=u"a", value=u"b", original=u"a=b")]), + (u"'a'=b", [Binding(key=u"a", value=u"b", original=u"'a'=b")]), + (u"[=b", [Binding(key=u"[", value=u"b", original=u"[=b")]), + (u" a = b ", [Binding(key=u"a", value=u"b", original=u" a = b ")]), + (u"export a=b", [Binding(key=u"a", value=u"b", original=u"export a=b")]), + (u" export 'a'=b", [Binding(key=u"a", value=u"b", original=u" export 'a'=b")]), + (u"# a=b", [Binding(key=None, value=None, original=u"# a=b")]), + (u"a=b#c", [Binding(key=u"a", value=u"b#c", original=u"a=b#c")]), + (u'a=b # comment', [Binding(key=u"a", value=u"b", original=u"a=b # comment")]), + (u"a=b space ", [Binding(key=u"a", value=u"b space", original=u"a=b space ")]), + (u"a='b space '", [Binding(key=u"a", value=u"b space ", original=u"a='b space '")]), + (u'a="b space "', [Binding(key=u"a", value=u"b space ", original=u'a="b space "')]), + (u"export export_a=1", [Binding(key=u"export_a", value=u"1", original=u"export export_a=1")]), + (u"export port=8000", [Binding(key=u"port", value=u"8000", original=u"export port=8000")]), + (u'a="b\nc"', [Binding(key=u"a", value=u"b\nc", original=u'a="b\nc"')]), + (u"a='b\nc'", [Binding(key=u"a", value=u"b\nc", original=u"a='b\nc'")]), + (u'a="b\nc"', [Binding(key=u"a", value=u"b\nc", original=u'a="b\nc"')]), + (u'a="b\\nc"', [Binding(key=u"a", value=u'b\nc', original=u'a="b\\nc"')]), + (u"a='b\\nc'", [Binding(key=u"a", value=u'b\\nc', original=u"a='b\\nc'")]), + (u'a="b\\"c"', [Binding(key=u"a", value=u'b"c', original=u'a="b\\"c"')]), + (u"a='b\\'c'", [Binding(key=u"a", value=u"b'c", original=u"a='b\\'c'")]), + (u"a=à", [Binding(key=u"a", value=u"à", original=u"a=à")]), + (u'a="à"', [Binding(key=u"a", value=u"à", original=u'a="à"')]), + (u'garbage', [Binding(key=None, value=None, original=u"garbage")]), + ( + u"a=b\nc=d", + [ + Binding(key=u"a", value=u"b", original=u"a=b\n"), + Binding(key=u"c", value=u"d", original=u"c=d"), + ], + ), + ( + u"a=b\r\nc=d", + [ + Binding(key=u"a", value=u"b", original=u"a=b\r\n"), + Binding(key=u"c", value=u"d", original=u"c=d"), + ], + ), + ( + u'a=\nb=c', + [ + Binding(key=u"a", value=u'', original=u'a=\n'), + Binding(key=u"b", value=u'c', original=u"b=c"), + ] + ), + ( + u'a=b\n\nc=d', + [ + Binding(key=u"a", value=u"b", original=u"a=b\n"), + Binding(key=u"c", value=u"d", original=u"\nc=d"), + ] + ), + ( + u'a="\nb=c', + [ + Binding(key=None, value=None, original=u'a="\n'), + Binding(key=u"b", value=u"c", original=u"b=c"), + ] + ), + ( + u'# comment\na="b\nc"\nd=e\n', + [ + Binding(key=None, value=None, original=u"# comment\n"), + Binding(key=u"a", value=u"b\nc", original=u'a="b\nc"\n'), + Binding(key=u"d", value=u"e", original=u"d=e\n"), + ], + ), + ( + u'garbage[%$#\na=b', + [ + Binding(key=None, value=None, original=u"garbage[%$#\n"), + Binding(key=u"a", value=u"b", original=u'a=b'), + ], + ), +]) +def test_parse_stream(test_input, expected): + result = parse_stream(StringIO(test_input)) + + assert list(result) == expected diff --git a/tox.ini b/tox.ini index 56c8732a..077780f4 100644 --- a/tox.ini +++ b/tox.ini @@ -19,11 +19,11 @@ deps = mypy commands = flake8 src tests - mypy --python-version=3.7 src - mypy --python-version=3.6 src - mypy --python-version=3.5 src - mypy --python-version=3.4 src - mypy --python-version=2.7 src + mypy --python-version=3.7 src tests + mypy --python-version=3.6 src tests + mypy --python-version=3.5 src tests + mypy --python-version=3.4 src tests + mypy --python-version=2.7 src tests [testenv:manifest] deps = check-manifest