diff --git a/git/objects/submodule/base.py b/git/objects/submodule/base.py index c7e7856f0..13d897263 100644 --- a/git/objects/submodule/base.py +++ b/git/objects/submodule/base.py @@ -1,44 +1,37 @@ -# need a dict to set bloody .name field from io import BytesIO import logging import os +import os.path as osp import stat import uuid import git from git.cmd import Git -from git.compat import ( - defenc, - is_win, -) -from git.config import SectionConstraint, GitConfigParser, cp +from git.compat import defenc, is_win +from git.config import GitConfigParser, SectionConstraint, cp from git.exc import ( + BadName, InvalidGitRepositoryError, NoSuchPathError, RepositoryDirtyError, - BadName, ) from git.objects.base import IndexObject, Object from git.objects.util import TraversableIterableObj - from git.util import ( - join_path_native, - to_native_path_linux, + IterableList, RemoteProgress, + join_path_native, rmtree, + to_native_path_linux, unbare_repo, - IterableList, ) -from git.util import HIDE_WINDOWS_KNOWN_ERRORS - -import os.path as osp from .util import ( + SubmoduleConfigParser, + find_first_remote_branch, mkhead, sm_name, sm_section, - SubmoduleConfigParser, - find_first_remote_branch, ) @@ -1060,28 +1053,13 @@ def remove( import gc gc.collect() - try: - rmtree(str(wtd)) - except Exception as ex: - if HIDE_WINDOWS_KNOWN_ERRORS: - from unittest import SkipTest - - raise SkipTest("FIXME: fails with: PermissionError\n {}".format(ex)) from ex - raise + rmtree(str(wtd)) # END delete tree if possible # END handle force if not dry_run and osp.isdir(git_dir): self._clear_cache() - try: - rmtree(git_dir) - except Exception as ex: - if HIDE_WINDOWS_KNOWN_ERRORS: - from unittest import SkipTest - - raise SkipTest(f"FIXME: fails with: PermissionError\n {ex}") from ex - else: - raise + rmtree(git_dir) # end handle separate bare repository # END handle module deletion diff --git a/git/util.py b/git/util.py index 48901ba0c..97f461a83 100644 --- a/git/util.py +++ b/git/util.py @@ -5,24 +5,24 @@ # the BSD License: https://opensource.org/license/bsd-3-clause/ from abc import abstractmethod -import os.path as osp -from .compat import is_win import contextlib from functools import wraps import getpass import logging import os +import os.path as osp +import pathlib import platform -import subprocess import re import shutil import stat -from sys import maxsize +import subprocess +import sys import time from urllib.parse import urlsplit, urlunsplit import warnings -# from git.objects.util import Traversable +from .compat import is_win # typing --------------------------------------------------------- @@ -42,22 +42,17 @@ Tuple, TypeVar, Union, - cast, TYPE_CHECKING, + cast, overload, ) -import pathlib - if TYPE_CHECKING: from git.remote import Remote from git.repo.base import Repo from git.config import GitConfigParser, SectionConstraint from git import Git - # from git.objects.base import IndexObject - - from .types import ( Literal, SupportsIndex, @@ -75,7 +70,6 @@ # --------------------------------------------------------------------- - from gitdb.util import ( # NOQA @IgnorePep8 make_sha, LockedFD, # @UnusedImport @@ -88,7 +82,6 @@ hex_to_bin, # @UnusedImport ) - # NOTE: Some of the unused imports might be used/imported by others. # Handle once test-cases are back up and running. # Most of these are unused here, but are for use by git-python modules so these @@ -116,14 +109,33 @@ log = logging.getLogger(__name__) -# types############################################################ + +def _read_env_flag(name: str, default: bool) -> bool: + try: + value = os.environ[name] + except KeyError: + return default + + log.warning( + "The %s environment variable is deprecated. Its effect has never been documented and changes without warning.", + name, + ) + + adjusted_value = value.strip().lower() + + if adjusted_value in {"", "0", "false", "no"}: + return False + if adjusted_value in {"1", "true", "yes"}: + return True + log.warning("%s has unrecognized value %r, treating as %r.", name, value, default) + return default #: We need an easy way to see if Appveyor TCs start failing, #: so the errors marked with this var are considered "acknowledged" ones, awaiting remedy, #: till then, we wish to hide them. -HIDE_WINDOWS_KNOWN_ERRORS = is_win and os.environ.get("HIDE_WINDOWS_KNOWN_ERRORS", True) -HIDE_WINDOWS_FREEZE_ERRORS = is_win and os.environ.get("HIDE_WINDOWS_FREEZE_ERRORS", True) +HIDE_WINDOWS_KNOWN_ERRORS = is_win and _read_env_flag("HIDE_WINDOWS_KNOWN_ERRORS", True) +HIDE_WINDOWS_FREEZE_ERRORS = is_win and _read_env_flag("HIDE_WINDOWS_FREEZE_ERRORS", True) # { Utility Methods @@ -177,25 +189,29 @@ def patch_env(name: str, value: str) -> Generator[None, None, None]: def rmtree(path: PathLike) -> None: - """Remove the given recursively. + """Remove the given directory tree recursively. - :note: we use shutil rmtree but adjust its behaviour to see whether files that - couldn't be deleted are read-only. Windows will not remove them in that case""" + :note: We use :func:`shutil.rmtree` but adjust its behaviour to see whether files that + couldn't be deleted are read-only. Windows will not remove them in that case.""" - def onerror(func: Callable, path: PathLike, exc_info: str) -> None: - # Is the error an access error ? + def handler(function: Callable, path: PathLike, _excinfo: Any) -> None: + """Callback for :func:`shutil.rmtree`. Works either as ``onexc`` or ``onerror``.""" + # Is the error an access error? os.chmod(path, stat.S_IWUSR) try: - func(path) # Will scream if still not possible to delete. - except Exception as ex: + function(path) + except PermissionError as ex: if HIDE_WINDOWS_KNOWN_ERRORS: from unittest import SkipTest - raise SkipTest("FIXME: fails with: PermissionError\n {}".format(ex)) from ex + raise SkipTest(f"FIXME: fails with: PermissionError\n {ex}") from ex raise - return shutil.rmtree(path, False, onerror) + if sys.version_info >= (3, 12): + shutil.rmtree(path, onexc=handler) + else: + shutil.rmtree(path, onerror=handler) def rmfile(path: PathLike) -> None: @@ -995,7 +1011,7 @@ def __init__( self, file_path: PathLike, check_interval_s: float = 0.3, - max_block_time_s: int = maxsize, + max_block_time_s: int = sys.maxsize, ) -> None: """Configure the instance diff --git a/test-requirements.txt b/test-requirements.txt index 9414da09c..a69181be1 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -7,4 +7,5 @@ pre-commit pytest pytest-cov pytest-instafail +pytest-subtests pytest-sugar diff --git a/test/test_docs.py b/test/test_docs.py index 79e1f1be4..d1ed46926 100644 --- a/test/test_docs.py +++ b/test/test_docs.py @@ -21,7 +21,10 @@ def tearDown(self): gc.collect() - # @skipIf(HIDE_WINDOWS_KNOWN_ERRORS, ## ACTUALLY skipped by `git.submodule.base#L869`. + # ACTUALLY skipped by git.util.rmtree (in local onerror function), from the last call to it via + # git.objects.submodule.base.Submodule.remove (at "handle separate bare repository"), line 1062. + # + # @skipIf(HIDE_WINDOWS_KNOWN_ERRORS, # "FIXME: helper.wrapper fails with: PermissionError: [WinError 5] Access is denied: " # "'C:\\Users\\appveyor\\AppData\\Local\\Temp\\1\\test_work_tree_unsupportedryfa60di\\master_repo\\.git\\objects\\pack\\pack-bc9e0787aef9f69e1591ef38ea0a6f566ec66fe3.idx") # noqa E501 @with_rw_directory diff --git a/test/test_submodule.py b/test/test_submodule.py index 79ff2c5f2..31a555ce2 100644 --- a/test/test_submodule.py +++ b/test/test_submodule.py @@ -457,7 +457,10 @@ def _do_base_tests(self, rwrepo): True, ) - # @skipIf(HIDE_WINDOWS_KNOWN_ERRORS, ## ACTUALLY skipped by `git.submodule.base#L869`. + # ACTUALLY skipped by git.util.rmtree (in local onerror function), called via + # git.objects.submodule.base.Submodule.remove at "method(mp)", line 1011. + # + # @skipIf(HIDE_WINDOWS_KNOWN_ERRORS, # "FIXME: fails with: PermissionError: [WinError 32] The process cannot access the file because" # "it is being used by another process: " # "'C:\\Users\\ankostis\\AppData\\Local\\Temp\\tmp95c3z83bnon_bare_test_base_rw\\git\\ext\\gitdb\\gitdb\\ext\\smmap'") # noqa E501 @@ -819,9 +822,11 @@ def test_git_submodules_and_add_sm_with_new_commit(self, rwdir): assert commit_sm.binsha == sm_too.binsha assert sm_too.binsha != sm.binsha - # @skipIf(HIDE_WINDOWS_KNOWN_ERRORS, ## ACTUALLY skipped by `git.submodule.base#L869`. - # "FIXME: helper.wrapper fails with: PermissionError: [WinError 5] Access is denied: " - # "'C:\\Users\\appveyor\\AppData\\Local\\Temp\\1\\test_work_tree_unsupportedryfa60di\\master_repo\\.git\\objects\\pack\\pack-bc9e0787aef9f69e1591ef38ea0a6f566ec66fe3.idx") # noqa E501 + @pytest.mark.xfail( + HIDE_WINDOWS_KNOWN_ERRORS, + reason='"The process cannot access the file because it is being used by another process" on call to sm.move', + raises=PermissionError, + ) @with_rw_directory def test_git_submodule_compatibility(self, rwdir): parent = git.Repo.init(osp.join(rwdir, "parent")) diff --git a/test/test_util.py b/test/test_util.py index 2b1e518ed..f75231c98 100644 --- a/test/test_util.py +++ b/test/test_util.py @@ -4,13 +4,18 @@ # This module is part of GitPython and is released under # the BSD License: https://opensource.org/license/bsd-3-clause/ +import ast +import contextlib +from datetime import datetime import os +import pathlib import pickle +import stat +import subprocess import sys import tempfile import time -from unittest import mock, skipUnless -from datetime import datetime +from unittest import SkipTest, mock, skipIf, skipUnless import ddt import pytest @@ -19,71 +24,199 @@ from git.compat import is_win from git.objects.util import ( altz_to_utctz_str, - utctz_to_altz, - verify_utctz, + from_timestamp, parse_date, tzoffset, - from_timestamp, -) -from test.lib import ( - TestBase, - with_rw_repo, + utctz_to_altz, + verify_utctz, ) from git.util import ( - LockFile, - BlockingLockFile, - get_user_id, Actor, + BlockingLockFile, IterableList, + LockFile, cygpath, decygpath, + get_user_id, remove_password_if_present, + rmtree, ) +from test.lib import TestBase, with_rw_repo -_norm_cygpath_pairs = ( - (r"foo\bar", "foo/bar"), - (r"foo/bar", "foo/bar"), - (r"C:\Users", "/cygdrive/c/Users"), - (r"C:\d/e", "/cygdrive/c/d/e"), - ("C:\\", "/cygdrive/c/"), - (r"\\server\C$\Users", "//server/C$/Users"), - (r"\\server\C$", "//server/C$"), - ("\\\\server\\c$\\", "//server/c$/"), - (r"\\server\BAR/", "//server/BAR/"), - (r"D:/Apps", "/cygdrive/d/Apps"), - (r"D:/Apps\fOO", "/cygdrive/d/Apps/fOO"), - (r"D:\Apps/123", "/cygdrive/d/Apps/123"), -) +class _Member: + """A member of an IterableList.""" -_unc_cygpath_pairs = ( - (r"\\?\a:\com", "/cygdrive/a/com"), - (r"\\?\a:/com", "/cygdrive/a/com"), - (r"\\?\UNC\server\D$\Apps", "//server/D$/Apps"), -) + __slots__ = ("name",) + def __init__(self, name): + self.name = name -class TestIterableMember(object): + def __repr__(self): + return f"{type(self).__name__}({self.name!r})" - """A member of an iterable list""" - __slots__ = "name" +@contextlib.contextmanager +def _tmpdir_to_force_permission_error(): + """Context manager to test permission errors in situations where they are not overcome.""" + if sys.platform == "cygwin": + raise SkipTest("Cygwin can't set the permissions that make the test meaningful.") + if sys.version_info < (3, 8): + raise SkipTest("In 3.7, TemporaryDirectory doesn't clean up after weird permissions.") - def __init__(self, name): - self.name = name + with tempfile.TemporaryDirectory() as parent: + td = pathlib.Path(parent, "testdir") + td.mkdir() + (td / "x").write_bytes(b"") + (td / "x").chmod(stat.S_IRUSR) # Set up PermissionError on Windows. + td.chmod(stat.S_IRUSR | stat.S_IXUSR) # Set up PermissionError on Unix. + yield td - def __repr__(self): - return "TestIterableMember(%r)" % self.name + +@contextlib.contextmanager +def _tmpdir_for_file_not_found(): + """Context manager to test errors deleting a directory that are not due to permissions.""" + with tempfile.TemporaryDirectory() as parent: + yield pathlib.Path(parent, "testdir") # It is deliberately never created. @ddt.ddt class TestUtils(TestBase): - def setup(self): - self.testdict = { - "string": "42", - "int": 42, - "array": [42], - } + def test_rmtree_deletes_nested_dir_with_files(self): + with tempfile.TemporaryDirectory() as parent: + td = pathlib.Path(parent, "testdir") + for d in td, td / "q", td / "s": + d.mkdir() + for f in ( + td / "p", + td / "q" / "w", + td / "q" / "x", + td / "r", + td / "s" / "y", + td / "s" / "z", + ): + f.write_bytes(b"") + + try: + rmtree(td) + except SkipTest as ex: + self.fail(f"rmtree unexpectedly attempts skip: {ex!r}") + + self.assertFalse(td.exists()) + + @skipIf( + sys.platform == "cygwin", + "Cygwin can't set the permissions that make the test meaningful.", + ) + def test_rmtree_deletes_dir_with_readonly_files(self): + # Automatically works on Unix, but requires special handling on Windows. + # Not to be confused with what _tmpdir_to_force_permission_error sets up (see below). + with tempfile.TemporaryDirectory() as parent: + td = pathlib.Path(parent, "testdir") + for d in td, td / "sub": + d.mkdir() + for f in td / "x", td / "sub" / "y": + f.write_bytes(b"") + f.chmod(0) + + try: + rmtree(td) + except SkipTest as ex: + self.fail(f"rmtree unexpectedly attempts skip: {ex!r}") + + self.assertFalse(td.exists()) + + def test_rmtree_can_wrap_exceptions(self): + """rmtree wraps PermissionError when HIDE_WINDOWS_KNOWN_ERRORS is true.""" + with _tmpdir_to_force_permission_error() as td: + # Access the module through sys.modules so it is unambiguous which module's + # attribute we patch: the original git.util, not git.index.util even though + # git.index.util "replaces" git.util and is what "import git.util" gives us. + with mock.patch.object(sys.modules["git.util"], "HIDE_WINDOWS_KNOWN_ERRORS", True): + # Disable common chmod functions so the callback can't fix the problem. + with mock.patch.object(os, "chmod"), mock.patch.object(pathlib.Path, "chmod"): + # Now we can see how an intractable PermissionError is treated. + with self.assertRaises(SkipTest): + rmtree(td) + + @ddt.data( + (False, PermissionError, _tmpdir_to_force_permission_error), + (False, FileNotFoundError, _tmpdir_for_file_not_found), + (True, FileNotFoundError, _tmpdir_for_file_not_found), + ) + def test_rmtree_does_not_wrap_unless_called_for(self, case): + """rmtree doesn't wrap non-PermissionError, nor if HIDE_WINDOWS_KNOWN_ERRORS is false.""" + hide_windows_known_errors, exception_type, tmpdir_context_factory = case + + with tmpdir_context_factory() as td: + # See comments in test_rmtree_can_wrap_exceptions regarding the patching done here. + with mock.patch.object( + sys.modules["git.util"], + "HIDE_WINDOWS_KNOWN_ERRORS", + hide_windows_known_errors, + ): + with mock.patch.object(os, "chmod"), mock.patch.object(pathlib.Path, "chmod"): + with self.assertRaises(exception_type): + try: + rmtree(td) + except SkipTest as ex: + self.fail(f"rmtree unexpectedly attempts skip: {ex!r}") + + @ddt.data("HIDE_WINDOWS_KNOWN_ERRORS", "HIDE_WINDOWS_FREEZE_ERRORS") + def test_env_vars_for_windows_tests(self, name): + def run_parse(value): + command = [ + sys.executable, + "-c", + f"from git.util import {name}; print(repr({name}))", + ] + output = subprocess.check_output( + command, + env=None if value is None else dict(os.environ, **{name: value}), + text=True, + ) + return ast.literal_eval(output) + + for env_var_value, expected_truth_value in ( + (None, os.name == "nt"), # True on Windows when the environment variable is unset. + ("", False), + (" ", False), + ("0", False), + ("1", os.name == "nt"), + ("false", False), + ("true", os.name == "nt"), + ("False", False), + ("True", os.name == "nt"), + ("no", False), + ("yes", os.name == "nt"), + ("NO", False), + ("YES", os.name == "nt"), + (" no ", False), + (" yes ", os.name == "nt"), + ): + with self.subTest(env_var_value=env_var_value): + self.assertIs(run_parse(env_var_value), expected_truth_value) + + _norm_cygpath_pairs = ( + (R"foo\bar", "foo/bar"), + (R"foo/bar", "foo/bar"), + (R"C:\Users", "/cygdrive/c/Users"), + (R"C:\d/e", "/cygdrive/c/d/e"), + ("C:\\", "/cygdrive/c/"), + (R"\\server\C$\Users", "//server/C$/Users"), + (R"\\server\C$", "//server/C$"), + ("\\\\server\\c$\\", "//server/c$/"), + (R"\\server\BAR/", "//server/BAR/"), + (R"D:/Apps", "/cygdrive/d/Apps"), + (R"D:/Apps\fOO", "/cygdrive/d/Apps/fOO"), + (R"D:\Apps/123", "/cygdrive/d/Apps/123"), + ) + + _unc_cygpath_pairs = ( + (R"\\?\a:\com", "/cygdrive/a/com"), + (R"\\?\a:/com", "/cygdrive/a/com"), + (R"\\?\UNC\server\D$\Apps", "//server/D$/Apps"), + ) # FIXME: Mark only the /proc-prefixing cases xfail, somehow (or fix them). @pytest.mark.xfail( @@ -98,16 +231,16 @@ def test_cygpath_ok(self, case): self.assertEqual(cwpath, cpath, wpath) @pytest.mark.xfail( - reason=r'2nd example r".\bar" -> "bar" fails, returns "./bar"', + reason=R'2nd example r".\bar" -> "bar" fails, returns "./bar"', raises=AssertionError, ) @skipUnless(sys.platform == "cygwin", "Paths specifically for Cygwin.") @ddt.data( - (r"./bar", "bar"), - (r".\bar", "bar"), # FIXME: Mark only this one xfail, somehow (or fix it). - (r"../bar", "../bar"), - (r"..\bar", "../bar"), - (r"../bar/.\foo/../chu", "../bar/chu"), + (R"./bar", "bar"), + (R".\bar", "bar"), # FIXME: Mark only this one xfail, somehow (or fix it). + (R"../bar", "../bar"), + (R"..\bar", "../bar"), + (R"../bar/.\foo/../chu", "../bar/chu"), ) def test_cygpath_norm_ok(self, case): wpath, cpath = case @@ -116,12 +249,12 @@ def test_cygpath_norm_ok(self, case): @skipUnless(sys.platform == "cygwin", "Paths specifically for Cygwin.") @ddt.data( - r"C:", - r"C:Relative", - r"D:Apps\123", - r"D:Apps/123", - r"\\?\a:rel", - r"\\share\a:rel", + R"C:", + R"C:Relative", + R"D:Apps\123", + R"D:Apps/123", + R"\\?\a:rel", + R"\\share\a:rel", ) def test_cygpath_invalids(self, wpath): cwpath = cygpath(wpath) @@ -286,15 +419,18 @@ def test_actor_from_string(self): Actor("name last another", "some-very-long-email@example.com"), ) - @ddt.data(("name", ""), ("name", "prefix_")) + @ddt.data( + ("name", ""), + ("name", "prefix_"), + ) def test_iterable_list(self, case): name, prefix = case ilist = IterableList(name, prefix) name1 = "one" name2 = "two" - m1 = TestIterableMember(prefix + name1) - m2 = TestIterableMember(prefix + name2) + m1 = _Member(prefix + name1) + m2 = _Member(prefix + name2) ilist.extend((m1, m2))