-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathgitutil.py
182 lines (137 loc) · 5.42 KB
/
gitutil.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
#!/usr/bin/env python
# coding=utf8
import sys
import os
import re
from functools import wraps
import git # GitPython
import logbook
# Module-level logbook logger registered under the name 'gitutil'.
log = logbook.Logger('gitutil')
class HandlerException(Exception):
    """Exception type made available for remote-handler errors.

    Adds nothing on top of the built-in Exception; it only provides a
    distinct class that callers can raise and catch."""
class GitRemoteHandler(object):
    """A standalone git-remote-helper handler.

    Parses input from git and translates it into method calls on an instance,
    also handles the capabilities command.

    To use, subclass from GitRemoteHandler and call the run() method. The
    handler will read each command and call the corresponding git_ method,
    i.e. a push command will be handled by a method called git_push. Any
    space separated arguments are split and passed on.

    The handler also reads the commandline arguments and supplies two
    attributes, remote_name, containing the name of the remote, and
    remote_address, containing the remote URI.

    See http://www.kernel.org/pub/software/scm/git/docs/git-remote-helpers.html
    or git-remote-helpers (1) for details on git-remote-helpers."""

    supported_options = []
    """A list of options supported by the remote handler. The "option" command
    allows the client to set options, supported options are available through
    the options attribute, unsupported ones will be rejected."""

    def __init__(self):
        """Read the remote name and address from the commandline (sys.argv)."""
        self.args = sys.argv[1:]
        self.remote_name, self.remote_address = \
            self._parse_remote_args(self.args)
        self.options = {}
        self._log = logbook.Logger('gitutil.GitRemoteHandler')

    @staticmethod
    def _parse_remote_args(args):
        """Return a (remote_name, remote_address) tuple for the helper args.

        Two arguments are "<remote> <url>"; when both are identical the URL
        was given directly on the commandline and the name stays 'origin'.
        A single argument is the name of a configured remote without a URL.
        Any other argument count yields ('origin', None)."""
        remote_name = 'origin'  # no name happens on clone, default is origin
        remote_address = None

        if 2 == len(args):
            remote_address = args[1]
            if args[1] != args[0]:
                remote_name = args[0]
        elif 1 == len(args):
            # git omits the URL when only remote.<name>.vcs is configured
            remote_name = args[0]

        return remote_name, remote_address

    def handle_command(self, line):
        """Dispatch command.

        Called by run(), upon reading a command (line), call the corresponding
        function. The line is split on single spaces; the first word selects
        the git_<command> method, the remaining words become its positional
        arguments.

        Methods carrying a true is_git_multiline attribute receive a single
        list of lines instead: the given line followed by every line read
        from stdin up to (excluding) the next blank line or EOF.

        Tries to flush after a command returns.

        Raises AttributeError if there is no matching git_ method."""
        args = line.split(' ')
        command = args.pop(0)
        self._log.debug('on stdin: %s' % line)

        method_name = 'git_' + command
        if not hasattr(self, method_name):
            raise AttributeError(
                'git requested an unsupported command: %s' % command)
        func = getattr(self, method_name)

        if getattr(func, 'is_git_multiline', False):
            lines = []
            # read all until an empty line is encountered
            # NOTE(review): the first entry is passed in by the caller (run()
            # strips its line ending), while the following entries keep
            # theirs; kept as is, since subclasses may rely on it.
            while '' != line.rstrip():
                lines.append(line)
                line = sys.stdin.readline()
            func(lines)
        else:
            func(*args)

        try:
            sys.stdout.flush()
        except IOError:
            # git closed the connection
            pass

    def git_capabilities(self):
        """Capabilities command.

        Handles the "capabilities" command of git, every callable prefixed
        with "git_" is assumed to be a supported command and sent with the
        leading "git_" stripped. Callables that have a git_required attribute
        are sent with a leading "*". The list ends with a blank line."""
        caps = []
        for name in dir(self):
            # filter on the name first, so unrelated attributes (for example
            # properties of a subclass) are never evaluated
            if not name.startswith('git_') or 'git_capabilities' == name:
                continue
            attr = getattr(self, name)
            if callable(attr):
                short = name[4:]
                caps.append('*' + short if hasattr(attr, 'git_required')
                            else short)

        self._log.debug('sending capabilities: %s' % caps)
        # sys.stdout.write instead of the print statement: works unchanged on
        # Python 2 and Python 3
        for c in caps:
            sys.stdout.write(c + '\n')
        # end with a blank line
        sys.stdout.write('\n')

    def git_option(self, name, value):
        """Handle git options.

        Options set by the git client are stored in the options attribute and
        answered with "ok". Options not found in supported_options are
        rejected with an "unsupported" reply."""
        if name in self.supported_options:
            self.options[name] = value
            self._log.debug('option %s: %s' % (name, value))
            sys.stdout.write('ok\n')
        else:
            self._log.debug('option %s unsupported' % name)
            sys.stdout.write('unsupported\n')

    def run(self):
        """Run, waiting for input.

        Blocks and reads commands, until stdin is closed, EOF is encountered
        or an empty line is sent. If a command raises, the exception is
        logged, reported on stderr prefixed with the program name, and the
        loop ends."""
        self._log.debug('spawned process: %d' % os.getpid())
        self._log.debug('remote name: %s' % self.remote_name)
        self._log.debug('remote address: %s' % self.remote_address)

        while not sys.stdin.closed:
            line = sys.stdin.readline()
            # '' is EOF; a whitespace-only line ends the session as well
            if '' == line.strip():
                break

            try:
                self.handle_command(line.rstrip(os.linesep))
            except Exception as e:
                self._log.exception(e)
                sys.stderr.write(
                    '%s: %s\n' % (os.path.basename(sys.argv[0]), e))
                break
def multiline_command(f):
    """Decorator flagging a handler method as a multi-line command.

    Returns a new function that forwards all arguments to f unchanged and
    carries an is_git_multiline attribute set to True, which is what
    GitRemoteHandler.handle_command checks for. f itself is not modified."""
    @wraps(f)
    def flagged(*positional, **named):
        return f(*positional, **named)

    setattr(flagged, 'is_git_multiline', True)
    return flagged
s3_url_exp = (
    's3://'
    '(?:(?P<key>[^:@]+)(?::(?P<secret>[^@]*))?@)?'  # optional key[:secret]@
    '(?P<bucket>[^:@]+)'                            # bucket name
    '(?::(?P<prefix>[^@]*))?$'                      # optional :prefix
)
"""Pattern source for S3 URLs.

An S3 bucket URL looks like this::

    s3://access_key:secret_key@bucket_name:some_prefix

Both access_key and secret_key may be left out, so a URL does not have to
carry any secret information on the commandline::

    s3://access_key@some_bucket:/myprefix

The prefix is optional too; the shortest S3 URL is a bare bucket::

    s3://mahbukkit
"""

s3_url_re = re.compile(s3_url_exp)
"""s3_url_exp, compiled."""
def parse_s3_url(url):
    """Parse url with s3_url_re and return its named groups.

    Returns a dictionary with the keys 'key', 'secret', 'bucket' and
    'prefix'; parts missing from the url are None.

    Raises ValueError (a subclass of Exception) if the url does not match."""
    m = s3_url_re.match(url)
    if m is None:
        # 1-tuple, so a tuple passed in by mistake cannot break the formatting
        raise ValueError('Not a valid S3 URL: %s' % (url,))
    return m.groupdict()
def merge_git_config(config_files=('.git/config', '~/.gitconfig', '/etc/gitconfig')):
    """Loads git configuration files and merges them into a dictionary.

    config_files is an iterable of paths; "~" is expanded for each one.

    The returned dictionary has section names as keys, their values being a
    dictionary of the section's entries.

    Files are merged from back to front: entries from files listed later are
    overwritten by those from files listed before them."""
    conf = {}

    # list() lets any iterable be passed in, not just a sequence
    for cf in reversed(list(config_files)):
        parser = git.config.GitConfigParser(os.path.expanduser(cf))
        for sect in parser.sections():
            for key, value in parser.items(sect):
                conf.setdefault(sect, {})[key] = value

    return conf