forked from Udzu/pudzu
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
407 lines (341 loc) · 15.5 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
import bisect
import datetime
import hashlib
import itertools
import logging
import math
import operator as op
import os.path
import random
from collections import abc, OrderedDict, Iterable, Mapping, Counter, namedtuple
from collections.abc import Sequence
from functools import wraps, partial
from importlib import import_module
from inspect import signature
from math import log10, floor, ceil
from time import sleep
from urllib.parse import urlparse
from toolz.dicttoolz import *
from toolz.functoolz import identity
# Configure logging
# Emit "[HH:MM:SS] logger:LEVEL - message" records at INFO level and above.
logging.basicConfig(
    format='[%(asctime)s] %(name)s:%(levelname)s - %(message)s',
    datefmt='%H:%M:%S',
    level=logging.INFO,
)
# Restrict the "urllib3" and "requests" loggers to warnings and above.
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("requests").setLevel(logging.WARNING)
# Classes
class MissingModule(object):
    """Placeholder standing in for a module that could not be imported.

    Instances are falsy, expose the supplied bindings as attributes and raise
    ImportError when any other attribute is looked up: see optional_import.
    """

    def __init__(self, module, bindings):
        self._module = module
        for name, value in bindings.items():
            setattr(self, name, value)

    def __repr__(self):
        return "<MissingModule: {}>".format(self._module)

    def __bool__(self):
        return False

    def __getattr__(self, k):
        # Only invoked when normal attribute lookup fails, i.e. for names
        # that were not supplied as bindings.
        message = "Missing module: {}".format(self._module)
        raise ImportError(message)
def optional_import(module, **bindings):
    """Import and return the named module if possible.

    If the import raises ImportError, a MissingModule carrying the given
    keyword bindings as attributes is returned instead.
    """
    try:
        loaded = import_module(module)
    except ImportError:
        loaded = MissingModule(module, bindings)
    return loaded
def optional_import_from(module, identifier, default=None):
    """Look up an identifier in the named module's namespace, falling back to
    the default value when the module or the identifier is unavailable."""
    namespace = optional_import(module).__dict__
    return namespace.get(identifier, default)
class ValueCache():
    """A minimal value holder whose set() method stores a value and returns it,
    allowing assignment inside an expression. Unary plus reads the value back."""

    def __init__(self, value=None):
        self.value = value

    def set(self, value):
        # Store first, then hand the same value back to the caller.
        self.value = value
        return value

    def __pos__(self):
        return self.value

    def __repr__(self):
        return "ValueCache({})".format(self.value)
# Decorators
def number_of_args(fn):
    """Return the number of positional arguments for a function, or None if the number is variable.

    Looks inside any decorated functions (following ``__wrapped__``). Counts
    positional-only and positional-or-keyword parameters; a ``*args``
    parameter makes the count variable.

    Raises NotImplementedError if the callable's signature cannot be
    introspected and it is not one of the explicitly listed built-ins.
    """
    try:
        if hasattr(fn, '__wrapped__'):
            return number_of_args(fn.__wrapped__)
        # Introspect once and reuse the parameter list for both checks.
        params = list(signature(fn).parameters.values())
    except ValueError:
        # signatures don't work for some built-in operators, so check for a few explicitly
        unary_ops = [len, op.not_, op.truth, op.abs, op.index, op.inv, op.invert, op.neg, op.pos]
        binary_ops = [op.lt, op.le, op.gt, op.ge, op.eq, op.ne, op.is_, op.is_not, op.add, op.and_,
                      op.floordiv, op.lshift, op.mod, op.mul, op.or_, op.pow, op.rshift, op.sub,
                      op.truediv, op.xor, op.concat, op.contains, op.countOf, op.delitem,
                      op.getitem, op.indexOf]
        ternary_ops = [op.setitem]
        for arity, known in ((1, unary_ops), (2, binary_ops), (3, ternary_ops)):
            if fn in known:
                return arity
        raise NotImplementedError("Built-in operator {} not supported".format(fn))
    if any(p.kind == p.VAR_POSITIONAL for p in params):
        return None
    return sum(p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) for p in params)
def all_keyword_args(fn):
    """List the names of every parameter of a function that can be passed by
    keyword, or return None if it accepts arbitrary keywords (**kwargs).
    Decorated functions are inspected through their __wrapped__ attribute."""
    try:
        if hasattr(fn, '__wrapped__'):
            return all_keyword_args(fn.__wrapped__)
        params = tuple(signature(fn).parameters.values())
    except ValueError:
        # signature() fails for some built-ins; none are special-cased yet,
        # so anything not in the (empty) table is treated as keyword-less.
        known_builtins = { }
        return known_builtins.get(fn, [])
    names = []
    for param in params:
        if param.kind == param.VAR_KEYWORD:
            return None
        if param.kind in (param.KEYWORD_ONLY, param.POSITIONAL_OR_KEYWORD):
            names.append(param.name)
    return names
def ignoring_extra_args(fn):
    """Function decorator that silently drops surplus arguments: positional
    arguments beyond those the wrapped function accepts, and keyword
    arguments it does not name, are discarded before the call."""
    max_positional = number_of_args(fn)
    accepted_keywords = all_keyword_args(fn)

    @wraps(fn)
    def wrapper(*args, **kwargs):
        if accepted_keywords is None:
            # The function takes **kwargs, so every keyword is passed through.
            kept = dict(kwargs)
        else:
            kept = {k: v for k, v in kwargs.items() if k in accepted_keywords}
        # A max_positional of None (variadic function) keeps every positional argument.
        return fn(*args[:max_positional], **kept)

    return wrapper
def ignoring_exceptions(fn, value_on_throw=None, exceptions=Exception):
    """Function decorator that catches the given exceptions raised by the
    wrapped function, returning value_on_throw instead of propagating them."""
    @wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            result = fn(*args, **kwargs)
        except exceptions:
            result = value_on_throw
        return result
    return wrapper
def with_retries(fn, max_retries=None, max_duration=None, interval=0.5, exceptions=Exception):
    """Function decorator that retries the function when exceptions are raised.

    fn: the function to wrap.
    max_retries: maximum number of attempts, or None for no limit. With a
        value of 0 or less the function is never called and the wrapper
        returns None.
    max_duration: number of seconds, measured from the start of the call,
        after which a failure is re-raised rather than retried; None for no
        time limit.
    interval: seconds to sleep between attempts.
    exceptions: exception class (or tuple of classes) that triggers a retry;
        anything else propagates immediately.

    The last exception is re-raised once either limit has been reached.
    """
    @wraps(fn)
    def wrapper(*args, **kwargs):
        if max_duration is None:
            end_time = datetime.datetime.max
        else:
            end_time = datetime.datetime.now() + datetime.timedelta(seconds=max_duration)
        attempts = itertools.count() if max_retries is None else range(max_retries)
        for i in attempts:
            try:
                return fn(*args, **kwargs)
            except exceptions:
                # Give up on the final permitted attempt or once the deadline
                # has passed (compare against end_time, not datetime.max).
                if i + 1 == max_retries or datetime.datetime.now() > end_time:
                    raise
                sleep(interval)
    return wrapper
class cached_property(object):
    """Descriptor that caches a method's result on the instance.

    The cached value is recomputed once expires_after seconds have elapsed
    (never, if expires_after is None) or after the attribute is deleted.
    Values and expiry times are kept in two per-instance dicts keyed by the
    wrapped function's name.
    """

    def __init__(self, fn, expires_after=None):
        self.fn = fn
        self.name = fn.__name__
        self.__doc__ = fn.__doc__
        self.expires_after = expires_after

    def __get__(self, obj, owner=None):
        # Class-level access returns the descriptor itself.
        if obj is None:
            return self
        # Lazily create the per-instance bookkeeping dicts.
        for attr in ('_property_cache_expiry_times', '_property_cache_values'):
            if not hasattr(obj, attr):
                setattr(obj, attr, {})
        expiries = obj._property_cache_expiry_times
        values = obj._property_cache_values
        deadline = expiries.get(self.name)
        if deadline is None or datetime.datetime.now() > deadline:
            values[self.name] = self.fn(obj)
            if self.expires_after is not None:
                lifetime = datetime.timedelta(seconds=self.expires_after)
                expiries[self.name] = datetime.datetime.now() + lifetime
            else:
                expiries[self.name] = datetime.datetime.max
        return values[self.name]

    def __delete__(self, obj):
        # Forget both the value and its expiry so the next read recomputes.
        for attr in ('_property_cache_expiry_times', '_property_cache_values'):
            getattr(obj, attr, {}).pop(self.name, None)
def cached_property_expires_after(expires_after):
    """Return a cached_property decorator preconfigured with the given
    expires_after value (passed through to cached_property as seconds)."""
    decorator = partial(cached_property, expires_after=expires_after)
    return decorator
# Iterables
def non_string_iterable(v):
    """Tell whether the object is an Iterable of any kind except str."""
    if not isinstance(v, Iterable):
        return False
    return not isinstance(v, str)
def make_iterable(v):
    """Coerce an object to an iterable: non-string iterables are returned
    unchanged, None becomes an empty tuple and anything else is wrapped in
    a one-element tuple."""
    if non_string_iterable(v):
        return v
    if v is None:
        return ()
    return (v,)
def non_string_sequence(v, types=None):
    """Return whether the object is a Sequence other than str, optionally
    with the given element types.

    v: the object to test.
    types: None to skip element checks, or a type / iterable of types; every
        element of v must then be an instance of at least one of them.
    """
    # str is itself a Sequence, so it has to be excluded explicitly.
    if isinstance(v, str) or not isinstance(v, Sequence):
        return False
    if types is None:
        return True
    # Materialise once so one-shot iterables of types work for every element.
    allowed = tuple(make_iterable(types))
    return all(any(isinstance(x, t) for t in allowed) for x in v)
def make_sequence(v):
    """Coerce an object to a sequence: values accepted by non_string_sequence
    are returned unchanged, None becomes an empty tuple and anything else is
    wrapped in a one-element tuple."""
    if non_string_sequence(v):
        return v
    if v is None:
        return ()
    return (v,)
def batch_iterable(iterable, batch_size):
    """Generator that yields the elements of an iterable n at a time.

    Each batch is a list of up to batch_size elements; only the last batch
    may be shorter. Nothing is yielded for an empty iterable.
    """
    sourceiter = iter(iterable)
    while True:
        # islice lives in itertools; the bare name is not imported by this module.
        batch = list(itertools.islice(sourceiter, batch_size))
        if not batch:
            break
        yield batch
def repeat_each(iterable, repeats):
    """Return a generator yielding every element of an iterable the given
    number of times in a row, in the original order."""
    # Pair each element with a repetition counter and keep only the element.
    pairs = itertools.product(iterable, range(repeats))
    return (element for element, _ in pairs)
def generate_ngrams(iterable, n):
    """Return an iterator over the n-grams (as tuples of n consecutive
    elements) of a sequence."""
    shifted = []
    # The i-th copy of the input is advanced by i elements, so zipping the
    # copies together lines up n consecutive items.
    for offset, branch in enumerate(itertools.tee(iterable, n)):
        shifted.append(itertools.islice(branch, offset, None))
    return zip(*shifted)
def leafs(iterable):
    """Generator that walks a nested iterable depth-first and yields every
    leaf, i.e. every element that is not itself a non-string iterable."""
    for node in iterable:
        if not non_string_iterable(node):
            yield node
            continue
        yield from leafs(node)
def remove_duplicates(seq, key=lambda v:v, keep_last=False):
    """Return a tuple of the items of an iterable with duplicates removed,
    where two items are duplicates if the key function maps them to the same
    value. By default the first occurrence is kept, in its position; with
    keep_last the last occurrence is kept, in its position."""
    unique = OrderedDict()
    if keep_last:
        for item in seq:
            ident = key(item)
            # Drop any earlier occurrence so the new one moves to the end.
            unique.pop(ident, None)
            unique[ident] = item
    else:
        for item in seq:
            unique.setdefault(key(item), item)
    return tuple(unique.values())
def first_or_none(seq):
    """Return the first element of a sequence, or None when it has no elements."""
    if len(seq) > 0:
        return seq[0]
    return None
def is_in(x, l):
    """Whether some member of l is the very same object as x (identity, not equality)."""
    for candidate in l:
        if x is candidate:
            return True
    return False
def update_sequence(s, n, x):
    """Return a tuple copy of s in which the element at index n (negative
    indices count from the end) is replaced by x. Raises IndexError if n is
    out of range."""
    items = tuple(s)
    size = len(items)
    if not (-size <= n < size):
        raise IndexError("sequence index out of range")
    # Normalise a negative index so plain slicing works on both sides.
    index = n + size if n < 0 else n
    return items[:index] + (x,) + items[index + 1:]
# Mappings
def make_mapping(v, key_fn=identity):
    """Coerce an object to a mapping.

    Mappings are returned unchanged. Non-string iterables become a dict with
    one entry per element, keyed by key_fn(index, element). Any other object
    becomes a single-entry dict keyed by key_fn(None, v). key_fn is called
    through ignoring_extra_args, so it may take fewer arguments.
    """
    if isinstance(v, Mapping):
        return v
    if non_string_iterable(v):
        result = {}
        for index, element in enumerate(v):
            result[ignoring_extra_args(key_fn)(index, element)] = element
        return result
    return { ignoring_extra_args(key_fn)(None, v) : v }
def merge_dicts(*dicts, merge_fn=lambda k, *vs: vs[-1]):
    """Combine several dicts into one. A key present in only one dict keeps
    its value; for a key present in several, merge_fn(key, *values) decides
    the result (by default the last value wins)."""
    def resolve(entry):
        field, candidates = entry
        if len(candidates) == 1:
            return (field, candidates[0])
        return (field, merge_fn(field, *candidates))
    # Gather every value seen for each key into a list, then resolve each list.
    grouped = merge_with(list, *dicts)
    return itemmap(resolve, grouped)
# Functions
def papply(func, *args, **kwargs):
    """Like functools.partial, but positional arguments given as Ellipsis (...) are
    placeholders filled in, left to right, at call time: e.g.
    papply(print, ..., 2, ..., 4)(1, 3, 5) prints 1 2 3 4 5. Surplus call-time
    positional arguments are appended, and call-time keywords override bound ones."""
    min_args = sum(1 for a in args if a is Ellipsis)
    def newfunc(*fargs, **fkwargs):
        if len(fargs) < min_args:
            raise TypeError("Partial application expects at least {} positional arguments but {} were given".format(min_args, len(fargs)))
        merged_kwargs = dict(kwargs)
        merged_kwargs.update(fkwargs)
        supplied = iter(fargs)
        # Each placeholder consumes the next call-time argument; the length
        # check above guarantees there are enough of them.
        merged_args = [next(supplied) if a is Ellipsis else a for a in args]
        merged_args.extend(supplied)
        return func(*merged_args, **merged_kwargs)
    return newfunc
def artial(func, *args, **kwargs):
    """Like functools.partial, but the bound positional arguments are inserted
    after the first call-time positional argument, which is therefore required.
    Call-time keywords override bound ones."""
    def newfunc(*fargs, **fkwargs):
        if not fargs:
            raise TypeError("Partial application expects at least 1 positional arguments but 0 were given")
        head, tail = fargs[0], fargs[1:]
        merged_kwargs = dict(kwargs)
        merged_kwargs.update(fkwargs)
        return func(head, *(args + tail), **merged_kwargs)
    return newfunc
# Data structures
class CaseInsensitiveDict(abc.MutableMapping):
    """Case-insensitive dict.

    Keys must support .lower() (i.e. be strings). Lookups ignore case, while
    iteration and repr report each key with the casing it was last set with.

    d: optional initial contents, either a mapping or an iterable of
        (key, value) pairs.
    base_factory: zero-argument callable producing the underlying storage
        mapping (e.g. a defaultdict factory); defaults to dict.
    """

    def __init__(self, d=None, base_factory=dict):
        # Remember the factory so copy() can build the same kind of storage.
        self._base_factory = base_factory
        self._d = base_factory()  # lower-cased key -> value
        self._k = {}              # lower-cased key -> key as last written
        if d is None:
            d = ()
        if isinstance(d, abc.Mapping):
            for k, v in d.items():
                self.__setitem__(k, v)
        elif isinstance(d, abc.Iterable):
            for (k, v) in d:
                self.__setitem__(k, v)

    def __getitem__(self, k):
        was_missing = k.lower() not in self._d
        v = self._d[k.lower()]
        if was_missing and k.lower() in self._d:
            # must be using a defaultdict of some kind
            self._k[k.lower()] = k
        return v

    def __setitem__(self, k, v):
        self._d[k.lower()] = v
        self._k[k.lower()] = k

    def __delitem__(self, k):
        del self._d[k.lower()]
        del self._k[k.lower()]

    def __contains__(self, k):
        # Test the storage directly: the inherited implementation goes through
        # __getitem__, which would insert the key when the base is a defaultdict.
        return k.lower() in self._d

    def __iter__(self):
        return (self._k[k] for k in self._d)

    def __len__(self):
        return len(self._d)

    def __repr__(self):
        return "{" + ", ".join("%r: %r" % (self._k[k], v) for (k, v) in self._d.items()) + "}"

    def copy(self):
        """Return a shallow copy that uses the same base_factory."""
        return CaseInsensitiveDict(self, base_factory=self._base_factory)
# Numeric
def sign(x):
    """Return 1 for a positive number, -1 for a negative one and 0 otherwise."""
    if x > 0:
        return 1
    if x < 0:
        return -1
    return 0
def round_significant(x, n=1):
    """Round x to n significant digits (zero is returned as 0)."""
    if x == 0:
        return 0
    # Position of the leading digit relative to the decimal point.
    magnitude = int(floor(log10(abs(x))))
    return round(x, (n - 1) - magnitude)
def floor_digits(x, n=0):
    """Round x down to n decimal digits."""
    scale = 10**n
    return floor(x * scale) / scale
def floor_significant(x, n=1):
    """Round x down to n significant digits (zero is returned as 0)."""
    if x == 0:
        return 0
    # Position of the leading digit relative to the decimal point.
    magnitude = int(floor(log10(abs(x))))
    return floor_digits(x, (n - 1) - magnitude)
def ceil_digits(x, n=0):
    """Round x up to n decimal digits."""
    scale = 10**n
    return ceil(x * scale) / scale
def ceil_significant(x, n=1):
    """Round x up to n significant digits (zero is returned as 0)."""
    if x == 0:
        return 0
    # Position of the leading digit relative to the decimal point.
    magnitude = int(floor(log10(abs(x))))
    return ceil_digits(x, (n - 1) - magnitude)
def delimit(x, low, high):
    """Clamp x to the range bounded by the low and high marks."""
    capped = min(x, high)
    return max(low, capped)
def weighted_choices(seq, weights, n):
    """Draw n random elements (with replacement) from a sequence, each chosen
    with probability proportional to its relative weight."""
    # Running totals of the weights; the last entry is the overall total.
    cumulative = list(itertools.accumulate(weights, op.add))
    picks = []
    for _ in range(n):
        point = random.uniform(0, cumulative[-1])
        picks.append(seq[bisect.bisect_left(cumulative, point)])
    return picks
def weighted_choice(seq, weights):
    """Draw one random element from a sequence, chosen with probability
    proportional to its relative weight."""
    (pick,) = weighted_choices(seq, weights, n=1)
    return pick
def _Counter_randoms(self, n):
    """Draw n random elements from the Counter collection, each weighted by its count."""
    elements, counts = list(self.keys()), list(self.values())
    return weighted_choices(elements, counts, n=n)
def _Counter_random(self):
    """Draw one random element from the Counter collection, weighted by its count."""
    elements, counts = list(self.keys()), list(self.values())
    return weighted_choice(elements, counts)
# Attach the weighted random pickers to collections.Counter as methods, so
# that any Counter instance gains .random() and .randoms(n).
Counter.random = _Counter_random
Counter.randoms = _Counter_randoms
# Network/io
def printed(o, **kwargs):
    """Print an object (forwarding any keyword arguments to print) and return it."""
    print(o, **kwargs)
    return o
def url_to_filepath(url):
    """Convert a url to a relative filepath of the form netloc/hash.extension.

    The directory is the url's network location exactly as parsed (so it
    includes any port), the filename is the SHA-1 hex digest of the url path
    without its extension, and the path's extension is kept. The scheme,
    query and fragment do not affect the result.
    """
    parts = urlparse(url)
    stem, extension = os.path.splitext(parts.path)
    digest = hashlib.sha1(stem.encode('utf-8')).hexdigest()
    filename = digest + extension
    return os.path.join(parts.netloc, filename)