-
Notifications
You must be signed in to change notification settings - Fork 3
/
mv_sed_test.py
executable file
·181 lines (162 loc) · 6.95 KB
/
mv_sed_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#!/usr/bin/env python3
import collections
import functools
import os
import pathlib
import random
import tempfile
import unittest
import mv_sed
def random_path(rand, median_length, random_segment):
    """Build a random absolute POSIX path.

    Args:
        rand: Random source; passed to ``random_list`` and, through it, to
            ``random_segment``.
        median_length: Forwarded to ``random_list`` as the median number of
            path segments to generate.
        random_segment: Callable that takes ``rand`` and returns one path
            segment.

    Returns:
        A ``pathlib.PurePosixPath`` made of ``/`` followed by the generated
        segments.
    """
    segments = random_list(rand, median_length, random_segment)
    # Let PurePosixPath do the joining instead of os.path.join: the latter
    # uses the host platform's separator, so on Windows the segments would be
    # glued together with backslashes and then parsed as a single POSIX
    # path component.
    return pathlib.PurePosixPath("/", *segments)
def random_list(rand, median_length, random_element):
    """Generate a list of random elements with a random length.

    Before each element a number is drawn from ``rand.random()``; generation
    continues while that number is below ``p``. ``p`` is chosen so that
    ``p ** median_length == 0.5``, i.e. the list reaches ``median_length``
    elements about half of the time.

    Args:
        rand: Object providing ``random()``; it is also handed to
            ``random_element`` for every generated element.
        median_length: Desired median length. A falsy value (such as ``0``)
            always produces an empty list.
        random_element: Callable that takes ``rand`` and returns one element.

    Returns:
        A new list of the generated elements, in generation order.

    Raises:
        ValueError: If ``median_length`` is negative. The continuation
            probability would then exceed 1 and the loop would never stop.
    """
    if median_length and median_length < 0:
        raise ValueError(
            f"median_length must be non-negative, got {median_length!r}"
        )
    # Falsy median_length short-circuits to p == 0 and avoids dividing by 0.
    p = 0.5 ** (1 / median_length) if median_length else 0.0
    elements = []
    while rand.random() < p:
        elements.append(random_element(rand))
    return elements
class TestMvSed(unittest.TestCase):
    """Tests for the functions exposed by the ``mv_sed`` module."""

    def test_lookup_with_path_compression(self):
        # Each scenario: (mapping before, key to look up,
        #                 expected root, mapping after the lookup).
        scenarios = [
            ({0: 0, 1: 1}, 0, 0, {0: 0, 1: 1}),
            ({0: 0, 1: 0}, 1, 0, {0: 0, 1: 0}),
            (
                {0: 0, 1: 0, 2: 1, 3: 2, 4: 4},
                3,
                0,
                {0: 0, 1: 0, 2: 0, 3: 0, 4: 4},
            ),
        ]
        for mapping, key, expected_root, expected_mapping in scenarios:
            root = mv_sed.lookup_with_path_compression(mapping, key)
            self.assertEqual(root, expected_root)
            # The lookup mutates the mapping in place.
            self.assertEqual(mapping, expected_mapping)

    def _find_conflicting_paths(self, paths):
        """Call ``mv_sed.find_conflicting_paths`` and check its invariants.

        The conflict mapping (index -> index of the conflict's cause) is
        returned so that callers can compare it with an expected value.
        """
        conflicts = mv_sed.find_conflicting_paths(paths)

        group_sizes = collections.Counter(conflicts.values())
        for cause, count in group_sizes.items():
            with self.subTest(cause=cause):
                # a conflict group always has more than one member
                self.assertGreater(count, 1)
                # the root of a group maps to itself
                self.assertEqual(conflicts[cause], cause)

        for conflict, cause in conflicts.items():
            with self.subTest(conflict=conflict, cause=cause):
                member_path, root_path = paths[conflict], paths[cause]
                if member_path != root_path:
                    # distinct paths: the root must be an ancestor
                    self.assertIn(root_path, member_path.parents)
                else:
                    # equal paths: the root must be the lower index
                    self.assertLessEqual(cause, conflict)

        # idempotence: whatever is left over must be conflict-free
        survivors = [
            path for index, path in enumerate(paths)
            if index not in conflicts
        ]
        self.assertEqual(mv_sed.find_conflicting_paths(survivors), {})

        # the leftovers contain no duplicates and none of them is a
        # directory of another
        files = frozenset(survivors)
        self.assertCountEqual(files, survivors)
        dirs = frozenset().union(*(path.parents for path in files))
        self.assertEqual(dirs & files, frozenset())
        return conflicts

    def test_find_conflicting_paths(self):
        posix = pathlib.PurePosixPath
        windows = pathlib.PureWindowsPath

        self.assertEqual(self._find_conflicting_paths([]), {})

        # Each case: (input paths, expected conflict mapping).
        cases = [
            (
                [windows("a"), windows("A/b")],
                {0: 0, 1: 0},
            ),
            (
                [posix("a"), posix("a/b")],
                {0: 0, 1: 0},
            ),
            (
                [posix("a/b/c"), posix("a"), posix("ab")],
                {0: 1, 1: 1},
            ),
            (
                [posix("a"), posix("b"), posix("a"), posix("a/b")],
                {0: 0, 2: 0, 3: 0},
            ),
        ]
        for paths, expected in cases:
            self.assertEqual(self._find_conflicting_paths(paths), expected)

    def test_find_conflicting_paths_random(self):
        rand = random.Random(0x1a766736a8255aa6)
        segment_pool = ["a", "ab", "b", "c"]

        def pick_segment(r):
            return r.choice(segment_pool)

        max_num_segments = 30
        max_num_paths = 20
        n = 100
        for i in range(n):
            # Both medians grow linearly from 0 towards their maximum.
            fraction = i / n
            num_segments = fraction * max_num_segments
            num_paths = fraction * max_num_paths

            def make_path(r, _median=num_segments):
                return random_path(
                    r,
                    median_length=_median,
                    random_segment=pick_segment,
                )

            paths = random_list(rand, num_paths, make_path)
            with self.subTest(paths=paths):
                self._find_conflicting_paths(paths)

    def test_validate_mass_rename(self):
        with tempfile.TemporaryDirectory() as tmp_str:
            tmp = pathlib.Path(tmp_str)
            # Only these names exist on disk as (empty) files.
            for name in ["a", "b", "d", "e", "f", "g"]:
                (tmp / name).write_bytes(b"")
            conflict = f"DEST_CONFLICT:{tmp / 'c'}"

            source_names = ["a", "b", "c1", "c2", "d", "f", "g", "s", "m"]
            dest_names = ["a", "r", "c", "c", "e", "G/f", "g/f", "s"]
            sources = [tmp / name for name in source_names]
            dests = [tmp / name for name in dest_names]

            actual = mv_sed.validate_mass_rename(sources, dests)
            expected = [
                (mv_sed.Action.NOP, tmp / "a", tmp / "a"),
                (mv_sed.Action.RENAME, tmp / "b", tmp / "r"),
                (conflict, tmp / "c1", tmp / "c"),
                (conflict, tmp / "c2", tmp / "c"),
                ("DEST_EXISTS", tmp / "d", tmp / "e"),
                ("DEST_PARENT_NOT_EXIST", tmp / "f", tmp / "G/f"),
                ("DEST_PARENT_NOT_DIR", tmp / "g", tmp / "g/f"),
                ("SOURCE_NOT_EXIST", tmp / "s", tmp / "s"),
                # "m" has no matching destination entry
                ("DEST_NOT_SET", tmp / "m", tmp / "m"),
            ]
            self.assertEqual(actual, expected)

            # A destination without any source.
            lone_dest = pathlib.Path("a")
            self.assertEqual(
                mv_sed.validate_mass_rename([], [pathlib.Path("a")]),
                [("SOURCE_NOT_SET", lone_dest, pathlib.Path("a"))],
            )

    def test_date_substituter(self):
        # Both of these substituters are expected to return their input.
        unchanged_subs = [
            mv_sed.date_substituter("", ""),
            mv_sed.date_substituter(
                r"\A(?P<mygroup>.*)\Z",
                r"\g<mygroup>",
            ),
        ]
        for s in ["", "abc", "12345"]:
            with self.subTest(s=s):
                for substitute in unchanged_subs:
                    self.assertEqual(substitute(s), s)

        reorder_date = mv_sed.date_substituter(
            r"\A{t:%Y} {t:%m} {t:%d}\Z",
            "{t:%Y-%m-%d}",
        )
        self.assertEqual(reorder_date("2020 02 21"), "2020-02-21")

        # Two independent date variables (t and u) in one pattern.
        two_dates = mv_sed.date_substituter(
            r"\A{t:%b} {t:%d}, {t:%Y} {t:%H}:{t:%M} \({u:%Y}\)\Z",
            "file{u:%Y}.{t:%Y-%m-%d}_{t:%H%M}.txt",
        )
        self.assertEqual(
            two_dates("Jan 15, 2019 15:23 (2020)"),
            "file2020.2019-01-15_1523.txt",
        )
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()