-
Notifications
You must be signed in to change notification settings - Fork 28
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fixes #19
- Loading branch information
Showing
4 changed files
with
341 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
import pathlib | ||
import queue | ||
import stat | ||
import threading | ||
|
||
from mopidy import exceptions | ||
|
||
|
||
class FindError(exceptions.MopidyException): | ||
def __init__(self, message, errno=None): | ||
super().__init__(message, errno) | ||
self.errno = errno | ||
|
||
|
||
def find_mtimes(root, follow=False): | ||
results, errors = _find(root, relative=False, follow=follow) | ||
|
||
# return the mtimes as integer milliseconds | ||
mtimes = {f: int(st.st_mtime * 1000) for f, st in results.items()} | ||
|
||
return mtimes, errors | ||
|
||
|
||
def _find(root, thread_count=10, relative=False, follow=False): | ||
"""Threaded find implementation that provides stat results for files. | ||
Tries to protect against sym/hardlink loops by keeping an eye on parent | ||
(st_dev, st_ino) pairs. | ||
:param Path root: root directory to search from, may not be a file | ||
:param int thread_count: number of workers to use, mainly useful to | ||
mitigate network lag when scanning on NFS etc. | ||
:param bool relative: if results should be relative to root or absolute | ||
:param bool follow: if symlinks should be followed | ||
""" | ||
root = pathlib.Path(root).resolve() | ||
threads = [] | ||
results = {} | ||
errors = {} | ||
done = threading.Event() | ||
work = queue.Queue() | ||
work.put((root, [])) | ||
|
||
if not relative: | ||
root = None | ||
|
||
args = (root, follow, done, work, results, errors) | ||
for _ in range(thread_count): | ||
t = threading.Thread(target=_find_worker, args=args) | ||
t.daemon = True | ||
t.start() | ||
threads.append(t) | ||
|
||
work.join() | ||
done.set() | ||
for t in threads: | ||
t.join() | ||
return results, errors | ||
|
||
|
||
def _find_worker(root, follow, done, work, results, errors): | ||
"""Worker thread for collecting stat() results. | ||
:param Path root: directory to make results relative to | ||
:param bool follow: if symlinks should be followed | ||
:param threading.Event done: event indicating that all work has been done | ||
:param queue.Queue work: queue of paths to process | ||
:param dict results: shared dictionary for storing all the stat() results | ||
:param dict errors: shared dictionary for storing any per path errors | ||
""" | ||
while not done.is_set(): | ||
try: | ||
entry, parents = work.get(block=False) | ||
except queue.Empty: | ||
continue | ||
|
||
if root: | ||
path = entry.relative_to(root) | ||
else: | ||
path = entry | ||
|
||
try: | ||
if follow: | ||
st = entry.stat() | ||
else: | ||
st = entry.lstat() | ||
|
||
if (st.st_dev, st.st_ino) in parents: | ||
errors[path] = FindError("Sym/hardlink loop found.") | ||
continue | ||
|
||
if stat.S_ISDIR(st.st_mode): | ||
for e in entry.iterdir(): | ||
work.put((e, parents + [(st.st_dev, st.st_ino)])) | ||
elif stat.S_ISREG(st.st_mode): | ||
results[path] = st | ||
elif stat.S_ISLNK(st.st_mode): | ||
errors[path] = FindError("Not following symlinks.") | ||
else: | ||
errors[path] = FindError("Not a file or directory.") | ||
|
||
except OSError as exc: | ||
errors[path] = FindError(exc.strerror, exc.errno) | ||
finally: | ||
work.task_done() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,217 @@ | ||
import os | ||
import pathlib | ||
import shutil | ||
|
||
import pytest | ||
|
||
from mopidy import exceptions | ||
from mopidy_local import mtimes | ||
|
||
import tests | ||
|
||
|
||
@pytest.fixture | ||
def tmp_dir_path(tmp_path): | ||
yield tmp_path | ||
if tmp_path.is_dir(): | ||
shutil.rmtree(str(tmp_path)) | ||
|
||
|
||
def test_find_error_is_a_mopidy_exception(): | ||
assert issubclass(mtimes.FindError, exceptions.MopidyException) | ||
|
||
|
||
def test_find_error_can_store_an_errno(): | ||
exc = mtimes.FindError("msg", errno=1234) | ||
|
||
assert exc.message == "msg" | ||
assert exc.errno == 1234 | ||
|
||
|
||
def test_names_are_pathlib_objects(): | ||
result, errors = mtimes.find_mtimes(str(tests.path_to_data_dir(""))) | ||
|
||
for name in list(result.keys()) + list(errors.keys()): | ||
assert isinstance(name, pathlib.Path) | ||
|
||
|
||
def test_nonexistent_dir_is_an_error(tmp_dir_path): | ||
missing_path = tmp_dir_path / "does-not-exist" | ||
|
||
result, errors = mtimes.find_mtimes(missing_path) | ||
|
||
assert result == {} | ||
assert errors == {missing_path: tests.IsA(mtimes.FindError)} | ||
|
||
|
||
def test_empty_dirs_are_not_in_the_result(tmp_dir_path): | ||
"""Empty directories should not show up in results""" | ||
dir_path = tmp_dir_path / "empty" | ||
dir_path.mkdir() | ||
|
||
result, errors = mtimes.find_mtimes(dir_path) | ||
|
||
assert result == {} | ||
assert errors == {} | ||
|
||
|
||
def test_file_as_the_root_just_returns_the_file(tmp_dir_path): | ||
file_path = tmp_dir_path / "single" | ||
file_path.touch() | ||
|
||
result, errors = mtimes.find_mtimes(file_path) | ||
|
||
assert result == {file_path: tests.IsA(int)} | ||
assert errors == {} | ||
|
||
|
||
def test_nested_directories(tmp_dir_path): | ||
# Setup foo/bar and baz directories | ||
foo_path = tmp_dir_path / "foo" / "file" | ||
foo_path.parent.mkdir() | ||
foo_path.touch() | ||
foo_bar_path = tmp_dir_path / "foo" / "bar" / "filee" | ||
foo_bar_path.parent.mkdir() | ||
foo_bar_path.touch() | ||
baz_path = tmp_dir_path / "baz" / "file" | ||
baz_path.parent.mkdir() | ||
baz_path.touch() | ||
|
||
result, errors = mtimes.find_mtimes(tmp_dir_path) | ||
|
||
assert result == { | ||
foo_path: tests.IsA(int), | ||
foo_bar_path: tests.IsA(int), | ||
baz_path: tests.IsA(int), | ||
} | ||
assert errors == {} | ||
|
||
|
||
def test_missing_permission_to_file_is_not_an_error(tmp_dir_path): | ||
"""Missing permissions to a file is not a search error""" | ||
file_path = tmp_dir_path / "file" | ||
file_path.touch(mode=0o000) | ||
|
||
result, errors = mtimes.find_mtimes(tmp_dir_path) | ||
|
||
assert result == {file_path: tests.IsA(int)} | ||
assert errors == {} | ||
|
||
file_path.chmod(0o644) | ||
|
||
|
||
def test_missing_permission_to_directory_is_an_error(tmp_dir_path): | ||
dir_path = tmp_dir_path / "dir" | ||
dir_path.mkdir(mode=0o000) | ||
|
||
result, errors = mtimes.find_mtimes(tmp_dir_path) | ||
|
||
assert result == {} | ||
assert errors == {dir_path: tests.IsA(mtimes.FindError)} | ||
|
||
dir_path.chmod(0o755) | ||
|
||
|
||
def test_symlinks_are_by_default_an_error(tmp_dir_path): | ||
"""By default symlinks should be treated as an error""" | ||
file_path = tmp_dir_path / "file" | ||
file_path.touch() | ||
link_path = tmp_dir_path / "link" | ||
link_path.symlink_to(file_path) | ||
|
||
result, errors = mtimes.find_mtimes(tmp_dir_path) | ||
|
||
assert result == {file_path: tests.IsA(int)} | ||
assert errors == {link_path: tests.IsA(mtimes.FindError)} | ||
|
||
|
||
def test_with_follow_symlink_to_file_as_root_is_followed(tmp_dir_path): | ||
file_path = tmp_dir_path / "file" | ||
file_path.touch() | ||
link_path = tmp_dir_path / "link" | ||
link_path.symlink_to(file_path) | ||
|
||
result, errors = mtimes.find_mtimes(link_path, follow=True) | ||
|
||
assert result == {file_path: tests.IsA(int)} | ||
assert errors == {} | ||
|
||
|
||
def test_symlink_to_directory_is_followed(tmp_dir_path): | ||
file_path = tmp_dir_path / "dir" / "file" | ||
file_path.parent.mkdir() | ||
file_path.touch() | ||
link_path = tmp_dir_path / "link" | ||
link_path.symlink_to(file_path.parent, target_is_directory=True) | ||
|
||
result, errors = mtimes.find_mtimes(link_path, follow=True) | ||
|
||
assert result == {file_path: tests.IsA(int)} | ||
assert errors == {} | ||
|
||
|
||
def test_symlink_pointing_at_itself_fails(tmp_dir_path): | ||
link_path = tmp_dir_path / "link" | ||
link_path.symlink_to(link_path) | ||
|
||
result, errors = mtimes.find_mtimes(tmp_dir_path, follow=True) | ||
|
||
assert result == {} | ||
assert errors == {link_path: tests.IsA(mtimes.FindError)} | ||
|
||
|
||
def test_symlink_pointing_at_parent_fails(tmp_dir_path): | ||
"""We should detect a loop via the parent and give up on the branch""" | ||
|
||
link_path = tmp_dir_path / "link" | ||
link_path.symlink_to(tmp_dir_path, target_is_directory=True) | ||
|
||
result, errors = mtimes.find_mtimes(tmp_dir_path, follow=True) | ||
|
||
assert result == {} | ||
assert errors == {link_path: tests.IsA(Exception)} | ||
|
||
|
||
def test_indirect_symlink_loop(tmp_dir_path): | ||
"""More indirect loops should also be detected""" | ||
# Setup tmpdir/directory/loop where loop points to tmpdir | ||
link_path = tmp_dir_path / "dir" / "link" | ||
link_path.parent.mkdir() | ||
link_path.symlink_to(tmp_dir_path, target_is_directory=True) | ||
|
||
result, errors = mtimes.find_mtimes(tmp_dir_path, follow=True) | ||
|
||
assert result == {} | ||
assert errors == {link_path: tests.IsA(Exception)} | ||
|
||
|
||
def test_symlink_branches_are_not_excluded(tmp_dir_path): | ||
"""Using symlinks to make a file show up multiple times should work""" | ||
file_path = tmp_dir_path / "dir" / "file" | ||
file_path.parent.mkdir() | ||
file_path.touch() | ||
link1_path = tmp_dir_path / "link1" | ||
link1_path.symlink_to(file_path) | ||
link2_path = tmp_dir_path / "link2" | ||
link2_path.symlink_to(file_path) | ||
|
||
result, errors = mtimes.find_mtimes(tmp_dir_path, follow=True) | ||
|
||
assert result == { | ||
file_path: tests.IsA(int), | ||
link1_path: tests.IsA(int), | ||
link2_path: tests.IsA(int), | ||
} | ||
assert errors == {} | ||
|
||
|
||
def test_gives_mtime_in_milliseconds(tmp_dir_path): | ||
file_path = tmp_dir_path / "file" | ||
file_path.touch() | ||
|
||
os.utime(str(file_path), (1, 3.14159265)) | ||
|
||
result, errors = mtimes.find_mtimes(tmp_dir_path) | ||
|
||
assert result == {file_path: 3141} | ||
assert errors == {} |