-
Notifications
You must be signed in to change notification settings - Fork 335
/
inputs.py
610 lines (472 loc) · 18 KB
/
inputs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
# -*- coding: utf-8 -*-
"""
This module provides some helpers for advanced types parsing.
You can define your own parser using the same pattern:
.. code-block:: python
def my_type(value):
if not condition:
raise ValueError('This is not my type')
return parse(value)
# Swagger documentation
my_type.__schema__ = {'type': 'string', 'format': 'my-custom-format'}
The last line allows you to document properly the type in the Swagger documentation.
"""
from __future__ import unicode_literals
import re
import socket
from datetime import datetime, time, timedelta
from email.utils import parsedate_tz, mktime_tz
from six.moves.urllib.parse import urlparse
import aniso8601
import pytz
# Constants for upgrading date-based intervals to full datetimes.
# START_OF_DAY is midnight tagged as UTC; _normalize_interval combines it
# with plain dates to build timezone-aware datetimes.
START_OF_DAY = time(0, 0, 0, tzinfo=pytz.UTC)
# END_OF_DAY is the last microsecond of a day, tagged as UTC.
END_OF_DAY = time(23, 59, 59, 999999, tzinfo=pytz.UTC)
# Pattern for the network-location part of a URL: optional "user[:password]@"
# credentials, then exactly one of localhost / IPv4 / IPv6 / domain name,
# then an optional ":port".  It is compiled case-insensitively and ends with
# "$"; callers use ``.match`` so it is effectively anchored on both sides.
# The named groups (auth, localhost, ipv4, ipv6, domain, port) are read by
# ``URL.__call__`` through ``groupdict()``.
netloc_regex = re.compile(
r"(?:(?P<auth>[^:@]+?(?::[^:@]*?)?)@)?" # basic auth
r"(?:"
r"(?P<localhost>localhost)|" # localhost...
r"(?P<ipv4>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|" # ...or ipv4
r"(?:\[?(?P<ipv6>[A-F0-9]*:[A-F0-9:]+)\]?)|" # ...or ipv6
r"(?P<domain>(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?))" # domain...
r")"
r"(?::(?P<port>\d+))?" # optional port
r"$",
re.IGNORECASE,
)
# Pattern splitting an email address into a ``local`` part (which may not end
# with a dot) and a ``server`` part, separated by a single "@".
email_regex = re.compile(
r"^" "(?P<local>[^@]*[^@.])" r"@" r"(?P<server>[^@]+(?:\.[^@]+)*)" r"$",
re.IGNORECASE,
)
# Detects an "hh:mm"-looking fragment; datetime_from_rfc822 uses it to decide
# whether the input already carries a time component.
time_regex = re.compile(r"\d{2}:\d{2}")
def ipv4(value):
    """
    Validate an IPv4 address.

    :param str value: The candidate address
    :return: ``value`` unchanged when it is a four-part dotted IPv4 address
    :raises ValueError: if ``value`` is not a valid IPv4 address
    """
    try:
        socket.inet_aton(value)
    except socket.error:
        well_formed = False
    else:
        # inet_aton also accepts shortened notations (fewer than three dots),
        # so additionally insist on the full four-part form.
        well_formed = value.count(".") == 3
    if not well_formed:
        raise ValueError("{0} is not a valid ipv4 address".format(value))
    return value


ipv4.__schema__ = {"type": "string", "format": "ipv4"}
def ipv6(value):
    """
    Validate an IPv6 address.

    :param str value: The candidate address
    :return: ``value`` unchanged when it is a valid IPv6 address
    :raises ValueError: if ``value`` is not a valid IPv6 address
    """
    try:
        socket.inet_pton(socket.AF_INET6, value)
    except socket.error:
        # The message must name the address family that was actually checked.
        raise ValueError("{0} is not a valid ipv6 address".format(value))
    return value


ipv6.__schema__ = {"type": "string", "format": "ipv6"}
def ip(value):
    """
    Validate an IP address, accepting either IPv4 or IPv6 notation.

    The value is tried as IPv4 first and as IPv6 only if that fails.

    :param str value: The candidate address
    :return: ``value`` unchanged when one of the two validators accepts it
    :raises ValueError: if neither validator accepts ``value``
    """
    try:
        accepted = ipv4(value)
    except ValueError:
        try:
            accepted = ipv6(value)
        except ValueError:
            raise ValueError("{0} is not a valid ip".format(value))
    return accepted


ip.__schema__ = {"type": "string", "format": "ip"}
class URL(object):
    """
    Validate an URL.

    Example::

        parser = reqparse.RequestParser()
        parser.add_argument('url', type=inputs.URL(schemes=['http', 'https']))

    Input to the ``URL`` argument will be rejected
    if it does not match an URL with specified constraints.
    If ``check`` is True it will also be rejected if the domain does not exist.

    :param bool check: Check the domain exists (perform a DNS resolution)
    :param bool ip: Allow IP (both ipv4/ipv6) as domain
    :param bool local: Allow localhost (both string or ip) as domain
    :param bool port: Allow a port to be present
    :param bool auth: Allow authentication to be present
    :param list|tuple schemes: Restrict valid schemes to this list
    :param list|tuple domains: Restrict valid domains to this list
    :param list|tuple exclude: Exclude some domains
    """

    def __init__(
        self,
        check=False,
        ip=False,
        local=False,
        port=False,
        auth=False,
        schemes=None,
        domains=None,
        exclude=None,
    ):
        self.check = check
        self.ip = ip
        self.local = local
        self.port = port
        self.auth = auth
        self.schemes = schemes
        self.domains = domains
        self.exclude = exclude

    def error(self, value, details=None):
        """
        Raise the ``ValueError`` reported for a rejected URL.

        :param str value: The rejected input
        :param str details: Optional extra sentence appended to the message;
            it may itself contain a ``{0}`` placeholder, which is filled
            with ``value``
        :raises ValueError: always
        """
        msg = "{0} is not a valid URL"
        if details:
            msg = ". ".join((msg, details))
        raise ValueError(msg.format(value))

    def __call__(self, value):
        """
        Validate ``value`` against the configured constraints.

        :param str value: The URL to validate
        :return: ``value`` unchanged when every check passes
        :raises ValueError: if the URL is malformed or violates a constraint
        """
        parsed = urlparse(value)
        netloc_match = netloc_regex.match(parsed.netloc)
        if not all((parsed.scheme, parsed.netloc)):
            # A scheme-less input such as "example.com/path" ends up in
            # ``path``; if its first segment looks like a host, suggest the
            # probable fix in the error message.
            if netloc_regex.match(
                parsed.netloc or parsed.path.split("/", 1)[0].split("?", 1)[0]
            ):
                self.error(value, "Did you mean: http://{0}")
            self.error(value)
        if parsed.scheme and self.schemes and parsed.scheme not in self.schemes:
            self.error(value, "Protocol is not allowed")
        if not netloc_match:
            self.error(value)
        data = netloc_match.groupdict()
        if data["ipv4"] or data["ipv6"]:
            if not self.ip:
                self.error(value, "IP is not allowed")
            else:
                # The regex only checks the rough shape; validate for real.
                try:
                    ip(data["ipv4"] or data["ipv6"])
                except ValueError as e:
                    self.error(value, str(e))
            if not self.local:
                if data["ipv4"] and data["ipv4"].startswith("127."):
                    self.error(value, "Localhost is not allowed")
                elif data["ipv6"] == "::1":
                    self.error(value, "Localhost is not allowed")
        if data["auth"] and not self.auth:
            self.error(value, "Authentication is not allowed")
        if data["localhost"] and not self.local:
            self.error(value, "Localhost is not allowed")
        if data["port"]:
            if not self.port:
                self.error(value, "Custom port is not allowed")
            else:
                port = int(data["port"])
                # Port numbers are 16-bit: 65535 is the highest valid port
                # and must be accepted; 0 stays rejected.
                if not 0 < port <= 65535:
                    self.error(value, "Port is out of range")
        if data["domain"]:
            if self.domains and data["domain"] not in self.domains:
                self.error(value, "Domain is not allowed")
            elif self.exclude and data["domain"] in self.exclude:
                self.error(value, "Domain is not allowed")
            if self.check:
                # Optional DNS resolution, only performed for domain names.
                try:
                    socket.getaddrinfo(data["domain"], None)
                except socket.error:
                    self.error(value, "Domain does not exists")
        return value

    @property
    def __schema__(self):
        """Swagger schema fragment describing this type."""
        return {
            "type": "string",
            "format": "url",
        }
#: Validate an URL
#:
#: Legacy validator: allows auth, port, ip and local hosts.
#: Only allows schemes 'http', 'https', 'ftp' and 'ftps'
url = URL(
    schemes=("http", "https", "ftp", "ftps"),
    ip=True,
    local=True,
    port=True,
    auth=True,
)
class email(object):
    """
    Validate an email.

    Example::

        parser = reqparse.RequestParser()
        parser.add_argument('email', type=inputs.email(check=True))

    Input to the ``email`` argument will be rejected if it does not match an
    email and, when ``check`` is set, if its domain does not resolve.

    :param bool check: Check the domain exists (perform a DNS resolution)
    :param bool ip: Allow IP (both ipv4/ipv6) as domain
    :param bool local: Allow localhost (both string or ip) as domain
    :param list|tuple domains: Restrict valid domains to this list
    :param list|tuple exclude: Exclude some domains
    """

    def __init__(self, check=False, ip=False, local=False, domains=None, exclude=None):
        self.check, self.ip, self.local = check, ip, local
        self.domains, self.exclude = domains, exclude

    def error(self, value, msg=None):
        """
        Raise the ``ValueError`` reported for a rejected email.

        :param str value: The rejected input
        :param str msg: Optional message template with a ``{0}`` placeholder
            for ``value``; a generic message is used when omitted
        :raises ValueError: always
        """
        template = msg if msg else "{0} is not a valid email"
        raise ValueError(template.format(value))

    def is_ip(self, value):
        """Return True when ``value`` passes the ``ip`` validator."""
        try:
            ip(value)
        except ValueError:
            return False
        return True

    def __call__(self, value):
        """
        Validate ``value`` against the configured constraints.

        :param str value: The email address to validate
        :return: ``value`` unchanged when every check passes
        :raises ValueError: if the address is malformed or violates a constraint
        """
        found = email_regex.match(value)
        if found is None or ".." in value:
            self.error(value)
        host = found.group("server")

        if self.check:
            # Optional DNS resolution of the server part.
            try:
                socket.getaddrinfo(host, None)
            except socket.error:
                self.error(value)

        if self.domains and host not in self.domains:
            self.error(value, "{0} does not belong to the authorized domains")
        if self.exclude and host in self.exclude:
            self.error(value, "{0} belongs to a forbidden domain")

        looks_local = host in ("localhost", "::1") or host.startswith("127.")
        if looks_local and not self.local:
            self.error(value)
        if not self.ip and self.is_ip(host):
            self.error(value)
        return value

    @property
    def __schema__(self):
        """Swagger schema fragment describing this type."""
        return {
            "type": "string",
            "format": "email",
        }
class regex(object):
    """
    Validate a string based on a regular expression.

    Example::

        parser = reqparse.RequestParser()
        parser.add_argument('example', type=inputs.regex('^[0-9]+$'))

    Input to the ``example`` argument will be rejected if it contains anything
    but numbers.

    The pattern is applied with ``search``, so it must carry its own ``^``/``$``
    anchors when the whole input has to match.

    :param str pattern: The regular expression the input must match
    """

    def __init__(self, pattern):
        self.re = re.compile(pattern)
        self.pattern = pattern

    def __call__(self, value):
        """
        Return ``value`` when the pattern is found in it.

        :raises ValueError: if the pattern does not match
        """
        if self.re.search(value):
            return value
        raise ValueError('Value does not match pattern: "{0}"'.format(self.pattern))

    def __deepcopy__(self, memo):
        # Rebuild from the pattern text instead of copying the compiled object.
        return regex(self.pattern)

    @property
    def __schema__(self):
        """Swagger schema fragment describing this type."""
        return {
            "type": "string",
            "pattern": self.pattern,
        }
def _normalize_interval(start, end, value):
    """
    Normalize datetime intervals.

    Given a pair of datetime.date or datetime.datetime objects,
    returns a 2-tuple of tz-aware UTC datetimes spanning the same interval.

    For datetime.date objects, the returned interval starts at 00:00:00.0
    on the first date and ends at 00:00:00.0 on the second.

    Naive datetimes are upgraded to UTC.

    Timezone-aware datetimes are normalized to the UTC tzdata.

    Params:
        - start: A date or datetime
        - end: A date or datetime
        - value: The original input string (not used by this function)
    """
    if not isinstance(start, datetime):
        # Plain dates: pin both bounds to midnight UTC.
        start, end = (datetime.combine(day, START_OF_DAY) for day in (start, end))

    # Only ``start`` is inspected; ``end`` gets the same treatment.
    if start.tzinfo is None:
        return pytz.UTC.localize(start), pytz.UTC.localize(end)
    return start.astimezone(pytz.UTC), end.astimezone(pytz.UTC)
def _expand_datetime(start, value):
    """
    Compute the exclusive end of the interval implied by a single date(time).

    A plain date spans the whole day.  A datetime spans one unit of the
    finest resolution written in the original input string (hour, minute or
    second).  Both the extended (``12:30:15``) and the basic (``123015``)
    ISO 8601 time notations are recognised.

    :param start: The parsed date or datetime
    :param str value: The original input string ``start`` was parsed from;
        for a datetime it must contain the ``T`` separator
    :return: ``start`` shifted by one day, hour, minute or second
    """
    if not isinstance(start, datetime):
        # Expand a single date object to be the interval spanning
        # that entire day.
        return start + timedelta(days=1)

    # Expand a datetime based on the finest resolution provided in the
    # original input string.  Only the leading clock digits matter: the match
    # stops at a fraction separator, a "Z" or a numeric UTC offset.
    clock = value.split("T")[1]
    digits = re.match(r"[0-9:]*", clock).group(0).replace(":", "")

    # Counting digits rather than ":" separators makes "1230" (basic format)
    # resolve to minutes exactly like "12:30" does.
    if len(digits) <= 2:
        # Hour resolution
        step = timedelta(hours=1)
    elif len(digits) <= 4:
        # Minute resolution
        step = timedelta(minutes=1)
    else:
        # Second resolution
        step = timedelta(seconds=1)
    return start + step
def _parse_interval(value):
    """
    Parse ``value`` as an interval, falling back to a single date(time).

    The string is tried as an ISO 8601 interval first, then as a datetime,
    and finally as a date.

    :param str value: The raw input string
    :return: The two sorted interval bounds, or ``(parsed, None)`` when the
        input is a single datetime or date
    :raises ValueError: if none of the three parsers accepts ``value``
    """
    try:
        bounds = aniso8601.parse_interval(value)
    except ValueError:
        # Not an interval: fall back to a lone datetime, then to a lone date.
        try:
            single = aniso8601.parse_datetime(value)
        except ValueError:
            single = aniso8601.parse_date(value)
        return single, None
    return sorted(bounds)
def iso8601interval(value, argument="argument"):
    """
    Parses ISO 8601-formatted datetime intervals into tuples of datetimes.

    Accepts both a single date(time) or a full interval using either start/end
    or start/duration notation, with the following behavior:

    - Intervals are defined as inclusive start, exclusive end
    - Single datetimes are translated into the interval spanning the
      largest resolution not specified in the input value, up to the day.
    - The smallest accepted resolution is 1 second.
    - All timezones are accepted as values; returned datetimes are
      localized to UTC. Naive inputs and date inputs are assumed UTC.

    Examples::

        "2013-01-01" -> datetime(2013, 1, 1), datetime(2013, 1, 2)
        "2013-01-01T12" -> datetime(2013, 1, 1, 12), datetime(2013, 1, 1, 13)
        "2013-01-01/2013-02-28" -> datetime(2013, 1, 1), datetime(2013, 2, 28)
        "2013-01-01/P3D" -> datetime(2013, 1, 1), datetime(2013, 1, 4)
        "2013-01-01T12:00/PT30M" -> datetime(2013, 1, 1, 12), datetime(2013, 1, 1, 12, 30)
        "2013-01-01T06:00/2013-01-01T12:00" -> datetime(2013, 1, 1, 6), datetime(2013, 1, 1, 12)

    :param str value: The ISO8601 date time as a string
    :param str argument: Name used for the input in the error message
    :return: Two UTC datetimes, the start and the end of the specified interval
    :rtype: A tuple (datetime, datetime)
    :raises ValueError: if the interval is invalid.
    """
    if not value:
        raise ValueError("Expected a valid ISO8601 date/time interval.")

    try:
        lower, upper = _parse_interval(value)
        if upper is None:
            # Single date(time): derive the end from the written resolution.
            upper = _expand_datetime(lower, value)
        lower, upper = _normalize_interval(lower, upper, value)
    except ValueError:
        template = (
            "Invalid {arg}: {value}. {arg} must be a valid ISO8601 date/time interval."
        )
        raise ValueError(template.format(arg=argument, value=value))

    return lower, upper


iso8601interval.__schema__ = {"type": "string", "format": "iso8601-interval"}
def date(value):
    """
    Parse a valid looking date in the format YYYY-mm-dd.

    :param str value: The date string
    :return: The ``datetime`` produced by ``datetime.strptime``
    :raises ValueError: if ``value`` does not match the format
    """
    return datetime.strptime(value, "%Y-%m-%d")


date.__schema__ = {"type": "string", "format": "date"}
def _get_integer(value):
    """
    Convert ``value`` with ``int()``.

    :return: The converted integer
    :raises ValueError: if ``int()`` rejects the value (a ``TypeError`` is
        reported as ``ValueError`` as well)
    """
    try:
        number = int(value)
    except (TypeError, ValueError):
        raise ValueError("{0} is not a valid integer".format(value))
    return number
def natural(value, argument="argument"):
    """
    Restrict input type to the natural numbers (0, 1, 2, 3...).

    :param value: The raw input, converted with ``_get_integer``
    :param str argument: Name used for the input in the error message
    :return: The converted integer
    :raises ValueError: if the value is not an integer or is negative
    """
    number = _get_integer(value)
    if number >= 0:
        return number
    template = "Invalid {arg}: {value}. {arg} must be a non-negative integer"
    raise ValueError(template.format(arg=argument, value=number))


natural.__schema__ = {"type": "integer", "minimum": 0}
def positive(value, argument="argument"):
    """
    Restrict input type to the positive integers (1, 2, 3...).

    :param value: The raw input, converted with ``_get_integer``
    :param str argument: Name used for the input in the error message
    :return: The converted integer
    :raises ValueError: if the value is not an integer or is lower than 1
    """
    number = _get_integer(value)
    if number >= 1:
        return number
    template = "Invalid {arg}: {value}. {arg} must be a positive integer"
    raise ValueError(template.format(arg=argument, value=number))


positive.__schema__ = {"type": "integer", "minimum": 0, "exclusiveMinimum": True}
class int_range(object):
    """
    Restrict input to an integer in a range (inclusive).

    :param low: Lowest accepted value
    :param high: Highest accepted value
    :param str argument: Name used for the input in the error message
    """

    def __init__(self, low, high, argument="argument"):
        self.low, self.high = low, high
        self.argument = argument

    def __call__(self, value):
        """
        Convert ``value`` to an integer and check it against both bounds.

        :return: The converted integer
        :raises ValueError: if the value is not an integer or is out of range
        """
        number = _get_integer(value)
        out_of_range = number < self.low or number > self.high
        if not out_of_range:
            return number
        template = "Invalid {arg}: {val}. {arg} must be within the range {lo} - {hi}"
        raise ValueError(
            template.format(arg=self.argument, val=number, lo=self.low, hi=self.high)
        )

    @property
    def __schema__(self):
        """Swagger schema fragment exposing both bounds."""
        return {
            "type": "integer",
            "minimum": self.low,
            "maximum": self.high,
        }
def boolean(value):
    """
    Parse the string ``"true"`` or ``"false"`` as a boolean (case insensitive).

    Also accepts ``"1"`` and ``"on"`` as ``True`` and ``"0"`` as ``False``.
    Any other falsy, non-null input (such as an empty string) yields ``False``.

    If the input is from the request JSON body, the type is already a native python boolean,
    and will be passed through without further parsing.

    :raises ValueError: if the boolean value is invalid
    """
    if isinstance(value, bool):
        return value
    if value is None:
        raise ValueError("boolean type must be non-null")
    if not value:
        return False

    literals = {"true": True, "1": True, "on": True, "false": False, "0": False}
    text = str(value).lower()
    if text in literals:
        return literals[text]
    # The message reports the lower-cased text, not the raw input.
    raise ValueError("Invalid literal for boolean(): {0}".format(text))


boolean.__schema__ = {"type": "boolean"}
def datetime_from_rfc822(value):
    """
    Turns an RFC822 formatted date into a datetime object.

    Example::

        inputs.datetime_from_rfc822('Wed, 02 Oct 2002 08:00:00 EST')

    When no ``hh:mm`` fragment is found in the input, ``00:00:00`` is appended
    before parsing. The returned datetime always carries the UTC tzinfo.

    :param str value: The RFC822-complying string to transform
    :return: The parsed datetime
    :rtype: datetime
    :raises ValueError: if value is an invalid date literal
    """
    original = value
    if time_regex.search(value) is None:
        # Date-only input: give the parser an explicit midnight time.
        value = " ".join((value, "00:00:00"))
    try:
        fields = parsedate_tz(value)
        epoch = mktime_tz(fields)
        if fields[-1] is not None:
            # An explicit offset was parsed: convert the instant to UTC.
            return datetime.fromtimestamp(epoch, pytz.utc)
        # No offset in the input: build a naive datetime and tag it as UTC.
        return datetime.fromtimestamp(epoch).replace(tzinfo=pytz.utc)
    except Exception:
        raise ValueError('Invalid date literal "{0}"'.format(original))
def datetime_from_iso8601(value):
    """
    Turns an ISO8601 formatted date into a datetime object.

    Example::

        inputs.datetime_from_iso8601("2012-01-01T23:30:00+02:00")

    A date-only input is accepted as well and yields a datetime at midnight.

    :param str value: The ISO8601-complying string to transform
    :return: A datetime
    :rtype: datetime
    :raises ValueError: if value is an invalid date literal
    """
    try:
        try:
            parsed = aniso8601.parse_datetime(value)
        except ValueError:
            # Not a full datetime: retry as a plain date.
            day = aniso8601.parse_date(value)
            parsed = datetime(day.year, day.month, day.day)
    except Exception:
        raise ValueError('Invalid date literal "{0}"'.format(value))
    return parsed


datetime_from_iso8601.__schema__ = {"type": "string", "format": "date-time"}
def date_from_iso8601(value):
    """
    Turns an ISO8601 formatted date into a date object.

    Example::

        inputs.date_from_iso8601("2012-01-01")

    :param str value: The ISO8601-complying string to transform
    :return: A date
    :rtype: date
    :raises ValueError: if value is an invalid date literal
    """
    moment = datetime_from_iso8601(value)
    return moment.date()


date_from_iso8601.__schema__ = {"type": "string", "format": "date"}