forked from angeloxx/swagger2modsec
-
Notifications
You must be signed in to change notification settings - Fork 0
/
swagger.py
90 lines (66 loc) · 3.15 KB
/
swagger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#!/usr/bin/python
import logging, sys, re, os, json, requests, io, yaml, coloredlogs
from py_essentials import hashing as hs
class Swagger:
    """In-memory view of a Swagger JSON document.

    The document is loaded once in the constructor; the other methods answer
    questions about its ``paths`` section (endpoints, methods, parameters) and
    build request-URI match expressions from the path templates.
    """

    def __init__(self, filename, logger):
        """Load ``filename`` as JSON and collect its endpoints.

        Args:
            filename: Path of the Swagger JSON file to read.
            logger: Object exposing an ``error`` method, used to report
                failures.

        Any error while opening or parsing the file is logged and terminates
        the process with exit status 1.
        """
        self.endpoints = []
        self.checksum = ""
        self.filename = filename
        self.logger = logger

        try:
            # JSON documents are UTF-8; do not depend on the locale default.
            with open(self.filename, encoding="utf-8") as json_file:
                self.swagger = json.load(json_file)
        except Exception as e:
            self.logger.error("Error reading file: {0}".format(str(e)))
            sys.exit(1)

        # Presumably the SHA-256 digest of the file; the exact return format
        # is defined by py_essentials.hashing.fileChecksum.
        self.checksum = hs.fileChecksum(self.filename, "sha256")
        self.__getEndpoints()

    def __getEndpoints(self):
        """Append every key of the document's ``paths`` to ``self.endpoints``."""
        self.endpoints.extend(self.swagger["paths"])

    def getEndpointMethods(self, endpoint):
        """Return the keys declared under ``endpoint`` in ``paths``.

        Returns an empty list when the endpoint is not present.
        """
        return list(self.swagger["paths"].get(endpoint, {}))

    def getEndpoints(self):
        """Return the list of endpoint paths collected at construction."""
        return self.endpoints

    def getEndpointURIParameterValidator(self, _endpoint, _parameter, _method=""):
        """Return a regex fragment validating one URI parameter.

        NOTE: the validator for a URI parameter SHOULD be the same for every
        method, so when ``_method`` is empty the FIRST matching declaration
        found under the endpoint is used. ``required: false`` is not
        supported.

        Args:
            _endpoint: Path template, a key of the document's ``paths``.
            _parameter: Name of the parameter to look up.
            _method: Restrict the lookup to this method key; an empty string
                (the default) searches every method of the endpoint.

        Returns:
            The regex fragment for ``integer``, ``string`` and
            ``number``/``double`` parameters, or ``""`` when no supported
            declaration is found.

        Any unexpected error during the lookup (for example an unknown
        endpoint) is logged and terminates the process with exit status 1.
        """
        try:
            for method, operation in self.swagger["paths"][_endpoint].items():
                # Empty _method means "any method"; otherwise only the
                # requested one is inspected.
                if _method != "" and method != _method:
                    continue
                # "parameters" is optional on an operation.
                # NOTE(review): a path-level "parameters" list is not an
                # operation mapping; when no _method filter is given it still
                # ends in the error path below.
                for parameterValue in operation.get("parameters", []):
                    if parameterValue["name"] != _parameter or "type" not in parameterValue:
                        continue
                    parameterType = parameterValue["type"]
                    if parameterType == "integer":
                        return "[0-9]+"
                    if parameterType == "string":
                        return r"[\w\s\d]+"
                    # "format" is optional, so a missing one must not raise.
                    if parameterType == "number" and parameterValue.get("format") == "double":
                        return r"(-?)(0|([1-9][0-9]*))(\.[0-9]+)?"
        except Exception as e:
            self.logger.error("getEndpointURIParameterValidator({0},{1},{2})".format(_endpoint, _parameter, _method))
            self.logger.error("{0}".format(e))
            sys.exit(1)
        return ""

    def getEndpointArguments(self, _endpoint, _method):
        """Return the names of the parameters declared for an operation.

        An operation without a ``parameters`` entry yields an empty list.
        Raises ``KeyError`` when the endpoint or method is not in the document.
        """
        parameters = self.swagger["paths"][_endpoint][_method].get("parameters", [])
        return [parameterValue["name"] for parameterValue in parameters]

    def endpointRequestURI(self, endpoint):
        """Build the request-URI match expression for ``endpoint``.

        A path without ``{`` yields ``"@streq <endpoint>"``. Otherwise the
        result is an anchored regular expression (``^...$``) in which each
        ``{name}`` placeholder is replaced by the validator returned by
        ``getEndpointURIParameterValidator`` and every ``/`` is written as
        ``\\/``.
        """
        if "{" not in endpoint:
            return "@streq {}".format(endpoint)

        # With one capture group, re.split alternates literal text (even
        # indexes) with the captured parameter names (odd indexes).
        pieces = re.split(r"\{(\w+)\}", endpoint)
        parts = []
        for index, piece in enumerate(pieces):
            if index % 2:
                parts.append(self.getEndpointURIParameterValidator(endpoint, piece))
            else:
                # Escape regex metacharacters in the literal text so that,
                # e.g., the "." in "/v1.0" only matches a dot.
                parts.append(r"\/".join(re.escape(segment) for segment in piece.split("/")))
        return "^{}$".format("".join(parts))

    def exportYaml(self, yamlfile):
        """Write the loaded document to ``yamlfile`` as block-style YAML."""
        yamlfilecontent = yaml.dump(self.swagger, default_flow_style=False)
        with open(yamlfile, 'w') as f:
            f.write(yamlfilecontent)