diff --git a/pyomo/common/formatting.py b/pyomo/common/formatting.py index 710116a98fe..1d290d115d5 100644 --- a/pyomo/common/formatting.py +++ b/pyomo/common/formatting.py @@ -18,8 +18,12 @@ StreamIndenter """ +import io import re import types + +from typing import Iterable + from pyomo.common.sorting import sorted_robust @@ -204,42 +208,59 @@ class StreamIndenter: StreamIndenter objects may be arbitrarily nested. """ - def __init__(self, ostream, indent=' ' * 4): - self.os = ostream - self.indent = indent - self.stripped_indent = indent.rstrip() - self.newline = True - - def __getattr__(self, name): - return getattr(self.os, name) - - def write(self, data): - if not len(data): - return - lines = data.split('\n') - if self.newline: - if lines[0]: - self.os.write(self.indent + lines[0]) - else: - self.os.write(self.stripped_indent) + _newline_re = re.compile('\n([^\n])') + _blankline_re = re.compile('\n\n') + + def __init__(self, ostream: io.TextIOBase, indent: str = ' ' * 4): + super().__setattr__('wrapped_os', ostream) + # The following is a "cute" trick: because of the __getattr__ / + # __setattr__ overloads, nested StreamIndenter instances all + # print directly to the underlying stream object, and all share + # a common `newline` flag. + if isinstance(ostream, StreamIndenter): + super().__setattr__('target_os', ostream.target_os) + indent = ostream.indent + indent else: - self.os.write(lines[0]) - if len(lines) < 2: - self.newline = False - return - for line in lines[1:-1]: - if line: - self.os.write("\n" + self.indent + line) - else: - self.os.write("\n" + self.stripped_indent) - if lines[-1]: - self.os.write("\n" + self.indent + lines[-1]) - self.newline = False + super().__setattr__('target_os', ostream) + # We will assume the last thing written to the stream we + # are wrapping ended with a newline + super().__setattr__('newline', True) + super().__setattr__('indent', indent) + super().__setattr__('indent_match', f'\n{indent}\\1') + super().__setattr__('stripped_indent', indent.rstrip()) + if self.stripped_indent: + super().__setattr__('blankline_match', f'\n{self.stripped_indent}\n') + + def __getattr__(self, name: str): + return getattr(self.wrapped_os, name) + + def __setattr__(self, name: str, val): + if name in self.__dict__: + super().__setattr__(name, val) else: - self.os.write("\n") - self.newline = True + self.wrapped_os.__setattr__(name, val) - def writelines(self, sequence): + def write(self, data: str) -> int: + if not data: + return 0 + written = 0 + if self.newline: + if data[0] != '\n': + written += self.target_os.write(self.indent) + elif self.stripped_indent: + written += self.target_os.write(self.stripped_indent) + data = self._newline_re.sub(self.indent_match, data) + if self.stripped_indent: + data, n = self._blankline_re.subn(self.blankline_match, data) + # If we replaced any blank lines, then we need to check + # again to catch cases like "\n\n\n" + if n: + data = self._blankline_re.sub(self.blankline_match, data) + written += self.target_os.write(data) + self.newline = data.endswith('\n') + return written + + def writelines(self, sequence: Iterable[str]) -> None: for x in sequence: self.write(x) diff --git a/pyomo/common/tee.py b/pyomo/common/tee.py index 0711d831bf4..ddd2a9ee359 100644 --- a/pyomo/common/tee.py +++ b/pyomo/common/tee.py @@ -77,8 +77,7 @@ def _retry(self, fcn, *args, retries=10): failCount = 0 while 1: try: - fcn(*args) - break + return fcn(*args) except (OSError, BlockingIOError): failCount += 1 if failCount >= retries: @@ -89,10 +88,12 @@ def flush(self): self._retry(self._ostream.flush) self._handle.flush = True - def write(self, data): + def write(self, data: str) -> int: + ans = 0 chunksize = _pipe_buffersize >> 1 # 1/2 the buffer size for i in range(0, len(data), chunksize): - self._retry(self._ostream.write, data[i : i + chunksize]) + ans += self._retry(self._ostream.write, data[i : i + chunksize]) + return ans def writelines(self, data): for line in data: @@ -116,9 +117,10 @@ class _AutoFlush(_SignalFlush): # Because we define write() and writelines() under windows, we # need to make sure that _AutoFlush calls them - def write(self, data): - super().write(data) + def write(self, data: str) -> int: + ans = super().write(data) self.flush() + return ans def writelines(self, data): super().writelines(data) @@ -126,9 +128,10 @@ def writelines(self, data): else: - def write(self, data): - self._ostream.write(data) + def write(self, data: str) -> int: + ans = self._ostream.write(data) self.flush() + return ans def writelines(self, data): self._ostream.writelines(data) diff --git a/pyomo/common/tests/test_formatting.py b/pyomo/common/tests/test_formatting.py index 0798a80589c..84298955f07 100644 --- a/pyomo/common/tests/test_formatting.py +++ b/pyomo/common/tests/test_formatting.py @@ -195,18 +195,26 @@ def _data_gen(i, j): class TestStreamIndenter(unittest.TestCase): + def test_empty(self): + OUT1 = StringIO() + OUT2 = StreamIndenter(OUT1) + self.assertEqual(0, OUT2.write('')) + self.assertEqual('', OUT2.getvalue()) + def test_noprefix(self): OUT1 = StringIO() OUT2 = StreamIndenter(OUT1) - OUT2.write('Hello?\nHello, world!') + self.assertEqual(28, OUT2.write('Hello?\nHello, world!')) self.assertEqual(' Hello?\n Hello, world!', OUT2.getvalue()) def test_prefix(self): - prefix = 'foo:' + prefix = 'foo: ' OUT1 = StringIO() OUT2 = StreamIndenter(OUT1, prefix) - OUT2.write('Hello?\nHello, world!') - self.assertEqual('foo:Hello?\nfoo:Hello, world!', OUT2.getvalue()) + OUT2.write('Hello?\nText\n\nHello, world!') + self.assertEqual( + 'foo: Hello?\nfoo: Text\nfoo:\nfoo: Hello, world!', OUT2.getvalue() + ) def test_blank_lines(self): OUT1 = StringIO() @@ -214,8 +222,45 @@ def test_blank_lines(self): OUT2.write('Hello?\n\nText\n\nHello, world!') self.assertEqual(' Hello?\n\n Text\n\n Hello, world!', OUT2.getvalue()) + def test_blank_lines_nonwhitespace_indent(self): + OUT1 = StringIO() + OUT2 = StreamIndenter(OUT1, " | ") + OUT2.write('Hello?\n\nText\n') + OUT2.write('\n') + OUT2.write('Hello, world!') + self.assertEqual( + ' | Hello?\n |\n | Text\n |\n | Hello, world!', OUT2.getvalue() + ) + def test_writelines(self): OUT1 = StringIO() OUT2 = StreamIndenter(OUT1) OUT2.writelines(['Hello?\n', '\n', 'Text\n', '\n', 'Hello, world!']) self.assertEqual(' Hello?\n\n Text\n\n Hello, world!', OUT2.getvalue()) + + def test_nested(self): + OUT1 = StringIO() + OUT2 = StreamIndenter(OUT1) + OUT3 = StreamIndenter(OUT2) + self.assertIs(OUT3.target_os, OUT2.target_os) + self.assertIs(OUT3.target_os, OUT1) + OUT3.write('Hello?\n\nText\n\nHello, world!') + self.assertEqual( + ' Hello?\n\n Text\n\n Hello, world!', OUT1.getvalue() + ) + + def test_nested_interleave(self): + OUT1 = StringIO() + OUT2 = StreamIndenter(OUT1) + OUT3 = StreamIndenter(OUT2) + self.assertIs(OUT3.target_os, OUT2.target_os) + self.assertIs(OUT3.target_os, OUT1) + OUT3.write('Hello?') + OUT2.write('\n\n') + OUT3.write('Text\n') + OUT2.write('Hi\n') + OUT3.write('Hello, world!') + self.assertEqual( + ' Hello?\n\n Text\n Hi\n Hello, world!', + OUT1.getvalue(), + ) diff --git a/pyomo/core/base/component.py b/pyomo/core/base/component.py index 8bbdc4d571b..6b28d10bca5 100644 --- a/pyomo/core/base/component.py +++ b/pyomo/core/base/component.py @@ -366,12 +366,6 @@ def _pprint_base_impl( if not _attr and self.parent_block() is None: _name = '' - # We only indent everything if we printed the header - if _attr or _name or _doc: - ostream = StreamIndenter(ostream, self._PPRINT_INDENT) - # The first line should be a hanging indent (i.e., not indented) - ostream.newline = False - if self.is_reference(): _attr = list(_attr) if _attr else [] _attr.append(('ReferenceTo', self.referent)) @@ -379,11 +373,13 @@ def _pprint_base_impl( if _name: ostream.write(_name + " : ") if _doc: - ostream.write(_doc + '\n') + ostream.write(_doc + '\n' + self._PPRINT_INDENT) if _attr: ostream.write(", ".join("%s=%s" % (k, v) for k, v in _attr)) if _attr or _name or _doc: ostream.write("\n") + # We only indent everything if we printed the header + ostream = StreamIndenter(ostream, self._PPRINT_INDENT) if not _constructed: # HACK: for backwards compatibility, Abstract blocks will