-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgit_http_backend.py
355 lines (312 loc) · 12.8 KB
/
git_http_backend.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
import os
import sys
import json
import atexit
import asyncio
import base64
import pprint
import warnings
from aiohttp import web
from pathlib import Path
from io import BytesIO
from typing import Optional
import zipfile
import hashlib
import configparser
import argparse
from pydantic import BaseModel, Field
from atproto import Client, models
import keyring
import snoop
# Name of the hashlib algorithm used to fingerprint each uploaded file.
# TODO Make hash_alg configurable
hash_alg = 'sha384'
# Override HOME with the directory that contains this script. The "~" paths
# expanded later in this file are therefore presumably resolved relative to
# the script directory instead of the real home directory (expanduser()
# honours HOME on POSIX) — verify on other platforms.
# TODO DEBUG REMOVE
os.environ["HOME"] = str(Path(__file__).parent.resolve())
# Command-line interface. Note that arguments are parsed at import time.
parser = argparse.ArgumentParser(prog='atproto-git', usage='%(prog)s [options]')
parser.add_argument('--repos-directory', dest="repos_directory", help='directory for local copies of git repos')
args = parser.parse_args()
# Refuse to start unless --repos-directory was given and already exists.
if not args.repos_directory or not Path(args.repos_directory).exists():
    raise ValueError(f"--repos-directory does not exist {args.repos_directory!r}")
# Read the user's identity from the [user] section of ~/.gitconfig. Both the
# "atproto" and "email" keys are required; a missing one raises KeyError.
config = configparser.ConfigParser()
config.read(str(Path("~", ".gitconfig").expanduser()))
atproto_handle = config["user"]["atproto"]
# The handle is split on ".": the first label is kept as the username and the
# remaining labels form the host of the service URL
# (e.g. "alice.example.com" -> "alice" and "https://example.com").
atproto_handle_username = atproto_handle.split(".")[0]
atproto_base_url = "https://" + ".".join(atproto_handle.split(".")[1:])
atproto_email = config["user"]["email"]
# Look the password up through keyring, passing the e-mail address as the
# first argument and "password.<handle>" as the second.
atproto_password = keyring.get_password(
    atproto_email,
    ".".join(["password", atproto_handle]),
)
class CacheATProtoIndex(BaseModel):
    """One node of the locally cached tree of ATProto index posts.

    A node optionally records the owning profile and a reference to the post
    it stands for, and maps child entry names to further nodes of the same
    type, which makes the structure recursive.
    """

    # Profile of the account that owns the index; None until it is fetched.
    owner_profile: Optional[models.app.bsky.actor.defs.ProfileViewDetailed] = None
    # Record reference of the post backing this node; None until it is known.
    root: Optional[models.base.RecordModelBase] = None
    # Child nodes keyed by entry name; every instance gets its own fresh dict.
    entries: dict[str, 'CacheATProtoIndex'] = Field(default_factory=dict)
# Start from an empty in-memory index; it is replaced below when a cache file
# already exists on disk.
atproto_index = CacheATProtoIndex()
atproto_index_path = Path("~", ".cache", "atproto_vcs_git_cache.json").expanduser()
atproto_index_path.parent.mkdir(parents=True, exist_ok=True)
# Persist the index as JSON when the interpreter exits. The lambda looks up
# the module-level name at exit time, so it writes whichever object
# `atproto_index` is bound to by then.
atexit.register(
    lambda: atproto_index_path.write_text(
        atproto_index.model_dump_json(),
    )
)
# Reuse the cached index from a previous run when there is one.
if atproto_index_path.exists():
    atproto_index = CacheATProtoIndex.model_validate_json(atproto_index_path.read_text())
# Create the client against the host derived from the handle and log in.
# This performs network I/O at import time.
client = Client(
    base_url=atproto_base_url,
)
client.login(
    atproto_handle,
    atproto_password,
)
# Fetch the owner's profile once and take its pinned post as the index root.
# NOTE(review): the nesting of the next two `if` blocks is inferred — confirm
# that the root assignment belongs inside this branch.
if atproto_index.owner_profile is None:
    atproto_index.owner_profile = client.get_profile(atproto_handle)
    atproto_index.root = atproto_index.owner_profile.pinned_post
# Without a pinned post, fall back to a hard-coded post reference and emit
# warnings for the steps that are still unimplemented.
if atproto_index.root is None:
    # TODO Learn how to pin post
    warnings.warn("TODO Learn how to pin post")
    # TODO Create index post if not exists
    warnings.warn("TODO Create index post if not exists")
    # post = client.send_post('index')
    # snoop.pp(post)
    atproto_index.root = models.base.RecordModelBase(
        # uri='at://did:plc:vjnm5ukoaxy4fi4clcqhagud/app.bsky.feed.post/3lbnnsi6vzc2l',
        # cid='bafyreifu2tccoiq3ylpc3qhnbwdgxfnxwcnphgyptxkbxkovi7d5c7hwo4',
        uri="at://did:plc:vjnm5ukoaxy4fi4clcqhagud/app.bsky.feed.post/3lbnvyk3dgk2l",
        cid="bafyreigs4ihxc55x7qyw2epffa6duphyh2kmcbwe634jmg3ccy3brcw7ma",
    )
def atproto_index_read(client, index, depth: Optional[int] = None):
    """Populate ``index.entries`` from the replies under the post ``index.root``.

    The thread rooted at ``index.root.uri`` is fetched through ``client``.
    Every reply written by the index owner becomes a child index keyed by the
    reply's text, and each child is read recursively in the same way. Posts
    by other authors are ignored.

    Args:
        client: Object exposing ``get_post_thread(uri, depth=...)`` whose
            result iterates as ``(field_name, value)`` pairs.
        index: Index node to fill in; its ``root`` and ``owner_profile`` must
            be set. ``index.entries`` is mutated in place.
        depth: Passed through unchanged to ``get_post_thread``.

    Returns:
        None.
    """
    owner_did = index.owner_profile.did
    for index_type, index_entry in client.get_post_thread(
        index.root.uri,
        depth=depth,
    ):
        if index_type == 'threadgate':
            continue
        if index_type != 'thread':
            warnings.warn(f"Unknown get_post_thread().index_type: {index_type!r}: {pprint.pformat(index_entry)}")
            continue
        # The thread may be a placeholder (e.g. a not-found or blocked view)
        # that carries no ``post``; there is nothing to index in that case.
        thread_post = getattr(index_entry, 'post', None)
        if thread_post is None or thread_post.author.did != owner_did:
            continue
        pprint.pprint(json.loads(index_entry.model_dump_json()))
        # ``replies`` can be None when the post has no replies.
        for reply in index_entry.replies or ():
            reply_post = getattr(reply, 'post', None)
            if reply_post is None or reply_post.author.did != owner_did:
                continue
            sub_index = index.__class__(
                owner_profile=index.owner_profile,
                root=models.base.RecordModelBase(
                    uri=reply_post.uri,
                    cid=reply_post.cid,
                )
            )
            atproto_index_read(client, sub_index, depth=depth)
            entry_key = reply_post.record.text
            if entry_key in index.entries:
                # Keep the already-known node and merge in newly found children.
                index.entries[entry_key].entries.update(sub_index.entries)
            else:
                index.entries[entry_key] = sub_index
# Fill the in-memory index from the network, starting at the root post.
atproto_index_read(client, atproto_index)
# NOTE(review): this unconditional exit ends the script right here, so none of
# the statements below this line run as the file stands. Looks like a
# debugging leftover — confirm before removing.
sys.exit(0)
def atproto_index_create(
    index,
    index_entry_key,
    data_as_image: Optional[bytes] = None,
    data_as_image_hash: Optional[str] = None,
):
    """Create the index entry ``index_entry_key`` under ``index`` if it is missing.

    A reply to the post ``index.root`` is published through the module-level
    ``client`` with ``index_entry_key`` as its text, and a child index node
    pointing at the new post is stored in ``index.entries``. Nothing happens
    when the key already exists.

    Args:
        index: Parent index node; ``index.entries`` is mutated in place.
        index_entry_key: Entry name, also used as the text of the new post.
        data_as_image: Optional image bytes. When given, the post is sent
            with ``client.send_image`` instead of ``client.send_post``.
        data_as_image_hash: Optional alt text for the image. It is ignored
            when no image is supplied.

    Returns:
        None.
    """
    if index_entry_key in index.entries:
        return
    # NOTE(review): the reply's root and parent both point at ``index.root``,
    # also for nodes deeper than one level — confirm this is intended.
    parent_ref = models.create_strong_ref(index.root)
    root_ref = models.create_strong_ref(index.root)
    if data_as_image is None:
        # Plain text post; send_post takes no alt text, so the hash is dropped.
        send = client.send_post
        extra_kwargs = {}
    else:
        # send_image expects an alt text, so always pass one (empty if absent).
        send = client.send_image
        extra_kwargs = {
            "image": data_as_image,
            "image_alt": data_as_image_hash if data_as_image_hash is not None else "",
        }
    post = send(
        text=index_entry_key,
        reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref),
        **extra_kwargs,
    )
    index.entries[index_entry_key] = index.__class__(
        owner_profile=index.owner_profile,
        root=models.base.RecordModelBase(
            uri=post.uri,
            cid=post.cid,
        )
    )
# Refresh the index, make sure the "vcs" entry and its "git" child exist
# (each is created as a post when missing), then read what is stored under
# "vcs" -> "git".
atproto_index_read(client, atproto_index)
atproto_index_create(atproto_index, "vcs")
atproto_index_create(atproto_index.entries["vcs"], "git")
atproto_index_read(client, atproto_index.entries["vcs"].entries["git"])
# Configuration
# Both values are passed to `git http-backend` as environment variables of
# the same name by the request handler.
GIT_PROJECT_ROOT = args.repos_directory
GIT_HTTP_EXPORT_ALL = "1"
# Ensure the project root exists
os.makedirs(GIT_PROJECT_ROOT, exist_ok=True)
# Debug output: print the name of every entry found under "vcs" -> "git".
for repo_name in atproto_index.entries["vcs"].entries["git"].entries:
    snoop.pp(repo_name)
# NOTE(review): unconditional exit; as written, nothing below this line runs.
# Its placement after (rather than inside) the loop is assumed — confirm.
sys.exit(0)
# Utility to list all internal files in a Git repository
def list_git_internal_files(repo_path):
    """Yield every regular file found under ``repo_path``, recursively.

    Args:
        repo_path: Directory to walk, as a string or ``Path``.

    Yields:
        ``Path`` objects for files only; directories are skipped. The order
        is whatever ``Path.rglob`` produces.
    """
    for candidate in Path(repo_path).rglob("*"):
        if candidate.is_file():
            yield candidate
# Create a zip archive containing the internal files
def create_zip_of_files(files):
    """Return the bytes of an in-memory deflate-compressed zip of ``files``.

    Args:
        files: Iterable of paths (``Path`` objects or plain strings).

    Returns:
        The complete zip archive as ``bytes``. Each member is stored under
        its path with the anchor (filesystem root or drive) removed, so an
        absolute ``/srv/repo/HEAD`` becomes ``srv/repo/HEAD``.
    """
    archive = BytesIO()
    with zipfile.ZipFile(archive, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for file in files:
            # Normalise first so plain string paths work as well as Path objects.
            file = Path(file)
            zipf.write(file, arcname=str(file.relative_to(file.anchor)))
    # getvalue() returns the whole buffer regardless of the stream position.
    return archive.getvalue()
# Create a PNG image that also contains the zip archive
def create_png_with_zip(zip_data):
    """Return a minimal PNG image with ``zip_data`` appended after it.

    The result starts with a complete 1x1, 8-bit truecolour PNG (signature,
    IHDR, IDAT and IEND chunks) and is immediately followed by the bytes of
    ``zip_data``.

    Args:
        zip_data: Raw bytes of a zip archive.

    Returns:
        ``bytes`` made of the PNG followed by ``zip_data``.
    """
    import struct
    import zlib

    def png_chunk(chunk_type, payload):
        # A chunk is: 4-byte big-endian payload length, 4-byte type, payload,
        # then a CRC-32 computed over the type and the payload.
        checksum = zlib.crc32(chunk_type + payload) & 0xFFFFFFFF
        return (
            struct.pack(">I", len(payload))
            + chunk_type
            + payload
            + struct.pack(">I", checksum)
        )

    # Width 1, height 1, bit depth 8, colour type 2 (truecolour), then
    # compression, filter and interlace methods all 0.
    ihdr = struct.pack(">IIBBBBB", 1, 1, 8, 2, 0, 0, 0)
    # One scanline: filter-type byte 0 followed by a single black RGB pixel.
    # The amount of data must match what IHDR declares or decoders reject it.
    scanline = b"\x00" + b"\x00\x00\x00"
    png_image = (
        b"\x89PNG\r\n\x1a\n"  # PNG signature
        + png_chunk(b"IHDR", ihdr)
        + png_chunk(b"IDAT", zlib.compress(scanline))
        + png_chunk(b"IEND", b"")
    )
    # Combine the PNG image and the zip data
    return png_image + zip_data
# Handle Git HTTP Backend requests
async def handle_git_backend_request(request):
    """Proxy one HTTP request to ``git http-backend`` and mirror pushes.

    The request is translated into CGI environment variables, its body is
    streamed to a ``git http-backend`` subprocess, and the subprocess output
    (CGI headers followed by the body) is streamed back to the client. When
    the request path ends in ``git-receive-pack``, every file of the pushed
    repository that is not yet in the ATProto index is then uploaded as an
    image post carrying a zip of that file.

    Args:
        request: The incoming aiohttp request; the route's ``path`` match
            group becomes ``PATH_INFO``.

    Returns:
        The ``web.StreamResponse`` that was streamed to the client.

    Note:
        The index uploads after a push are synchronous calls made inside this
        coroutine, so they block the event loop while they run.
    """
    path_info = request.match_info.get("path", "")
    # The transport (or its peer name) can be missing once the peer is gone.
    transport = request.transport
    peername = transport.get_extra_info("peername") if transport is not None else None
    remote_addr = peername[0] if isinstance(peername, (tuple, list)) and peername else ""
    env = {
        "GIT_PROJECT_ROOT": GIT_PROJECT_ROOT,
        "GIT_HTTP_EXPORT_ALL": GIT_HTTP_EXPORT_ALL,
        "PATH_INFO": f"/{path_info}",
        # NOTE(review): the peer address is used as the user name; presumably
        # this is only here so the backend accepts pushes — there is no real
        # authentication.
        "REMOTE_USER": request.remote or "",
        "REMOTE_ADDR": remote_addr,
        "REQUEST_METHOD": request.method,
        "QUERY_STRING": request.query_string,
        "CONTENT_TYPE": request.headers.get("Content-Type", ""),
    }
    # Copy relevant HTTP headers to environment variables
    for header in ("Content-Type", "User-Agent", "Accept-Encoding", "Pragma"):
        header_value = request.headers.get(header)
        if header_value:
            env["HTTP_" + header.upper().replace("-", "_")] = header_value

    # Prepare the subprocess to run git http-backend
    proc = await asyncio.create_subprocess_exec(
        "git", "http-backend",
        env=env,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=sys.stderr,  # Output stderr to the server's stderr
    )

    # Forward the request body to git http-backend
    async def write_to_git(stdin):
        try:
            async for chunk in request.content.iter_chunked(4096):
                stdin.write(chunk)
                await stdin.drain()
        except Exception as e:
            # Best effort: report the failure but still close stdin below so
            # the backend sees end-of-input.
            print(f"Error writing to git http-backend: {e}", file=sys.stderr)
        finally:
            if not stdin.is_closing():
                stdin.close()

    # Apply the CGI header block produced by git http-backend to the response
    def apply_cgi_headers(header_data, response):
        for line in header_data.split('\r\n'):
            key, separator, value = line.partition(':')
            if not separator:
                # Blank or malformed line without a colon: nothing to apply.
                continue
            key, value = key.strip(), value.strip()
            if key.lower() == 'status':
                # CGI reports the HTTP status as a "Status: <code> <reason>"
                # header; it must become the real response status, otherwise
                # every backend error is answered with 200.
                code, _, reason = value.partition(' ')
                if code.isdigit():
                    response.set_status(int(code), reason.strip() or None)
                continue
            response.headers[key] = value

    # Read the response from git http-backend and send it back to the client
    async def read_from_git(stdout, response):
        headers_received = False
        buffer = b""
        while True:
            chunk = await stdout.read(4096)
            if not chunk:
                break
            if headers_received:
                # Send body to the client
                await response.write(chunk)
                continue
            # Accumulate output until the blank line ending the header block.
            buffer += chunk
            header_end = buffer.find(b'\r\n\r\n')
            if header_end == -1:
                continue
            header_data = buffer[:header_end].decode('utf-8', errors='replace')
            body = buffer[header_end + 4:]
            apply_cgi_headers(header_data, response)
            await response.prepare(request)
            await response.write(body)
            headers_received = True
            buffer = b""
        if not headers_received:
            # If no headers were sent, send what we have
            await response.prepare(request)
            await response.write(buffer)
        await response.write_eof()

    # Create a StreamResponse to send data back to the client
    response = web.StreamResponse()
    # Run the read and write tasks concurrently
    await asyncio.gather(
        write_to_git(proc.stdin),
        read_from_git(proc.stdout, response),
    )
    # Wait for the subprocess to finish
    await proc.wait()

    # Handle push events (git-receive-pack)
    print(f"path_info: {path_info}")
    repo_dir_name = Path(path_info).parent.name
    if path_info.endswith("git-receive-pack") and repo_dir_name:
        repo_path = Path(GIT_PROJECT_ROOT, repo_dir_name)
        repo_name = repo_dir_name[:-4] if repo_dir_name.endswith(".git") else repo_dir_name
        git_index = atproto_index.entries["vcs"].entries["git"]
        atproto_index_create(git_index, repo_name)
        repo_index = git_index.entries[repo_name]
        for internal_file in list_git_internal_files(repo_path):
            repo_file_path = str(internal_file.relative_to(repo_path))
            print(f"Updated internal file in {repo_name}: {repo_file_path}")
            if repo_file_path in repo_index.entries:
                # atproto_index_create() would return without posting for a
                # known key, so skip building the archive for it.
                continue
            # Create zip archive of the file and embed it after a PNG
            png_zip_data = create_png_with_zip(create_zip_of_files([internal_file]))
            # The digest of the raw file content is sent as the image alt text
            hash_instance = hashlib.new(hash_alg)
            hash_instance.update(internal_file.read_bytes())
            data_as_image_hash = hash_instance.hexdigest()
            atproto_index_create(
                repo_index,
                repo_file_path,
                data_as_image=png_zip_data,
                data_as_image_hash=f"{hash_alg}:{data_as_image_hash}",
            )
    return response
# Set up the application
# A single catch-all route hands every method and path to the git handler.
app = web.Application()
app.router.add_route("*", "/{path:.*}", handle_git_backend_request)
if __name__ == "__main__":
    import shutil
    import subprocess

    # Ensure there is a bare Git repository for testing
    test_repo_path = os.path.join(GIT_PROJECT_ROOT, "my-repo.git")
    if not os.path.exists(test_repo_path):
        os.makedirs(GIT_PROJECT_ROOT, exist_ok=True)
        # Pass the path as a separate argument instead of formatting it into
        # a shell string, so spaces or shell metacharacters in the
        # user-supplied directory cannot break or alter the command.
        subprocess.run(["git", "init", "--bare", test_repo_path], check=True)
        # Drop the hooks directory of the fresh repository; a missing
        # directory is not an error.
        shutil.rmtree(os.path.join(test_repo_path, "hooks"), ignore_errors=True)
        print(f"Initialized bare repository at {test_repo_path}")
    # Start the server
    # NOTE(review): this listens on all interfaces and the handler performs
    # no authentication — confirm this is only used on trusted networks.
    web.run_app(app, host="0.0.0.0", port=8080)