This repository was archived by the owner on Oct 10, 2022. It is now read-only.
forked from mozilla/agithub
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathGitHub.py
190 lines (163 loc) · 7.19 KB
/
GitHub.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# Copyright 2012-2016 Jonathan Paugh and contributors
# See COPYING for license details
import base64
import time
import re
import logging
from agithub.base import API, ConnectionProperties, Client, RequestBody, ResponseBody
logger = logging.getLogger(__name__)
class GitHub(API):
"""
The agnostic GitHub API. It doesn't know, and you don't care.
>>> from agithub.GitHub import GitHub
>>> g = GitHub('user', 'pass')
>>> status, data = g.issues.get(filter='subscribed')
>>> data
[ list_, of, stuff ]
>>> status, data = g.repos.jpaugh.repla.issues[1].get()
>>> data
{ 'dict': 'my issue data', }
>>> name, repo = 'jpaugh', 'repla'
>>> status, data = g.repos[name][repo].issues[1].get()
same thing
>>> status, data = g.funny.I.donna.remember.that.one.get()
>>> status
404
That's all there is to it. (blah.post() should work, too.)
NOTE: It is up to you to spell things correctly. A GitHub object
doesn't even try to validate the url you feed it. On the other hand,
it automatically supports the full API--so why should you care?
"""
def __init__(self, username=None, password=None, token=None,
*args, **kwargs):
extraHeaders = {'accept': 'application/vnd.github.v3+json'}
auth = self.generateAuthHeader(username, password, token)
if auth is not None:
extraHeaders['authorization'] = auth
props = ConnectionProperties(
api_url=kwargs.pop('api_url', 'api.github.com'),
secure_http=True,
extra_headers=extraHeaders
)
self.setClient(GitHubClient(*args, **kwargs))
self.setConnectionProperties(props)
def generateAuthHeader(self, username=None, password=None, token=None):
if token is not None:
if password is not None:
raise TypeError(
"You cannot use both password and oauth token "
"authenication"
)
return 'Token %s' % token
elif username is not None:
if password is None:
raise TypeError(
"You need a password to authenticate as " + username
)
self.username = username
return self.hash_pass(password)
def hash_pass(self, password):
auth_str = ('%s:%s' % (self.username, password)).encode('utf-8')
return 'Basic '.encode('utf-8') + base64.b64encode(auth_str).strip()
class GitHubClient(Client):
def __init__(self, username=None, password=None, token=None,
connection_properties=None, paginate=False,
sleep_on_ratelimit=True):
super(GitHubClient, self).__init__()
self.paginate = paginate
self.sleep_on_ratelimit = sleep_on_ratelimit
def request(self, method, url, bodyData, headers):
"""Low-level networking. All HTTP-method methods call this"""
headers = self._fix_headers(headers)
url = self.prop.constructUrl(url)
if bodyData is None:
# Sending a content-type w/o the body might break some
# servers. Maybe?
if 'content-type' in headers:
del headers['content-type']
#TODO: Context manager
requestBody = RequestBody(bodyData, headers)
if self.sleep_on_ratelimit and self.no_ratelimit_remaining():
self.sleep_until_more_ratelimit()
while True:
conn = self.get_connection()
conn.request(method, url, requestBody.process(), headers)
response = conn.getresponse()
status = response.status
content = ResponseBody(response)
self.headers = response.getheaders()
conn.close()
if status == 403 and self.sleep_on_ratelimit and self.no_ratelimit_remaining():
self.sleep_until_more_ratelimit()
else:
data = content.processBody()
if self.paginate and type(data) == list:
data.extend(self.get_additional_pages(method, bodyData, headers))
return status, data
def get_additional_pages(self, method, bodyData, headers):
data = []
url = self.get_next_link_url()
if not url:
return data
logger.debug(
'Fetching an additional paginated GitHub response page at '
'{}'.format(url))
status, data = self.request(method, url, bodyData, headers)
if type(data) == list:
data.extend(self.get_additional_pages(method, bodyData, headers))
return data
elif (status == 403 and self.no_ratelimit_remaining()
and not self.sleep_on_ratelimit):
raise TypeError(
'While fetching paginated GitHub response pages, the GitHub '
'ratelimit was reached but sleep_on_ratelimit is disabled. '
'Either enable sleep_on_ratelimit or disable paginate.')
else:
raise TypeError(
'While fetching a paginated GitHub response page, a non-list '
'was returned with status {}: {}'.format(status, data))
def no_ratelimit_remaining(self):
headers = dict(self.headers if self.headers is not None else [])
ratelimit_remaining = int(
headers.get('X-RateLimit-Remaining'.lower(), 1))
return ratelimit_remaining == 0
def ratelimit_seconds_remaining(self):
ratelimit_reset = int(dict(self.headers).get(
'X-RateLimit-Reset'.lower(), 0))
return max(0, int(ratelimit_reset - time.time()) + 1)
def sleep_until_more_ratelimit(self):
logger.debug(
'No GitHub ratelimit remaining. Sleeping for {} seconds until {} '
'before trying API call again.'.format(
self.ratelimit_seconds_remaining(),
time.strftime(
"%H:%M:%S", time.localtime(
time.time() + self.ratelimit_seconds_remaining()))
))
time.sleep(self.ratelimit_seconds_remaining())
def get_next_link_url(self):
"""Given a set of HTTP headers find the RFC 5988 Link header field,
determine if it contains a relation type indicating a next resource and
if so return the URL of the next resource, otherwise return an empty
string."""
# From https://github.com/requests/requests/blob/master/requests/utils.py
for value in [x[1] for x in self.headers if x[0].lower() == 'link']:
replace_chars = ' \'"'
value = value.strip(replace_chars)
if not value:
return ''
for val in re.split(', *<', value):
try:
url, params = val.split(';', 1)
except ValueError:
url, params = val, ''
link = {'url': url.strip('<> \'"')}
for param in params.split(';'):
try:
key, value = param.split('=')
except ValueError:
break
link[key.strip(replace_chars)] = value.strip(replace_chars)
if link.get('rel') == 'next':
return link['url']
return ''