-
Notifications
You must be signed in to change notification settings - Fork 15.7k
/
github.py
858 lines (773 loc) Β· 31.6 KB
/
github.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
"""Util that calls GitHub."""
from __future__ import annotations
import json
from typing import TYPE_CHECKING, Any, Dict, List, Optional
import requests
from langchain_core.utils import get_from_dict_or_env
from pydantic import BaseModel, ConfigDict, model_validator
if TYPE_CHECKING:
from github.Issue import Issue
from github.PullRequest import PullRequest
def _import_tiktoken() -> Any:
    """Import and return the optional ``tiktoken`` module.

    The import is deferred to call time so the module can be loaded without
    ``tiktoken`` installed.

    Returns:
        The imported ``tiktoken`` module.

    Raises:
        ImportError: If ``tiktoken`` cannot be imported. The underlying
            import failure is attached as the cause.
    """
    try:
        import tiktoken
    except ImportError as e:
        # Chain the original failure so the real reason (missing package,
        # broken binary wheel, ...) stays visible in the traceback.
        raise ImportError(
            "tiktoken is not installed. "
            "Please install it with `pip install tiktoken`"
        ) from e
    return tiktoken
class GitHubAPIWrapper(BaseModel):
    """Wrapper for GitHub API."""

    # Authenticated client; populated by `validate_environment` from the
    # app installation.
    github: Any = None  #: :meta private:
    # Repository handle obtained from the client for `github_repository`;
    # populated by `validate_environment`.
    github_repo_instance: Any = None  #: :meta private:
    # Repository identifier; falls back to the GITHUB_REPOSITORY env var.
    github_repository: Optional[str] = None
    # GitHub App id; falls back to the GITHUB_APP_ID env var.
    github_app_id: Optional[str] = None
    # Private key, or a path to a file containing it; falls back to the
    # GITHUB_APP_PRIVATE_KEY env var.
    github_app_private_key: Optional[str] = None
    # Branch used as `ref`/`branch` for reads and commits; defaults to the
    # repository's default branch (ACTIVE_BRANCH env var).
    active_branch: Optional[str] = None
    # Branch pull requests are opened against; defaults to the repository's
    # default branch (GITHUB_BASE_BRANCH env var).
    github_base_branch: Optional[str] = None

    # Reject any field that is not declared above.
    model_config = ConfigDict(
        extra="forbid",
    )
@model_validator(mode="before")
@classmethod
def validate_environment(cls, values: Dict) -> Any:
    """Validate that api key and python package exists in environment.

    Resolves the repository, app id and private key through
    ``get_from_dict_or_env`` (keys ``github_repository``, ``github_app_id``,
    ``github_app_private_key``; env vars ``GITHUB_REPOSITORY``,
    ``GITHUB_APP_ID``, ``GITHUB_APP_PRIVATE_KEY``), authenticates as the
    GitHub App, takes its first installation and stores the resulting client
    and repository handle back into ``values``.

    Parameters:
        values(Dict): Raw constructor values, before field validation

    Returns:
        Dict: ``values`` with ``github``, ``github_repo_instance``,
            ``github_repository``, ``github_app_id``,
            ``github_app_private_key``, ``active_branch`` and
            ``github_base_branch`` filled in

    Raises:
        ImportError: If PyGithub is not installed.
        ValueError: If the app has no installation, or the installation
            lookup rejects the supplied parameters.
    """
    github_repository = get_from_dict_or_env(
        values, "github_repository", "GITHUB_REPOSITORY"
    )
    github_app_id = get_from_dict_or_env(values, "github_app_id", "GITHUB_APP_ID")
    github_app_private_key = get_from_dict_or_env(
        values, "github_app_private_key", "GITHUB_APP_PRIVATE_KEY"
    )

    try:
        from github import Auth, GithubIntegration
    except ImportError as e:
        raise ImportError(
            "PyGithub is not installed. "
            "Please install it with `pip install PyGithub`"
        ) from e

    try:
        # interpret the key as a file path
        # fallback to interpreting as the key itself
        with open(github_app_private_key, "r") as f:
            private_key = f.read()
    except (OSError, ValueError):
        # OSError: not an existing/readable path (including over-long names).
        # ValueError: the text cannot be a path at all (e.g. embedded NUL) or
        # the file is not decodable text.
        private_key = github_app_private_key

    auth = Auth.AppAuth(
        github_app_id,
        private_key,
    )
    gi = GithubIntegration(auth=auth)

    not_installed_message = (
        f"Please make sure to install the created github app with id "
        f"{github_app_id} on the repo: {github_repository}. "
        "More instructions can be found at "
        "https://docs.github.com/en/apps/using-"
        "github-apps/installing-your-own-github-app"
    )
    installations = gi.get_installations()
    if not installations:
        raise ValueError(not_installed_message)
    try:
        # NOTE(review): the result appears to be lazily paginated and may be
        # truthy even when empty, so the index lookup is guarded as well.
        installation = installations[0]
    except IndexError as e:
        raise ValueError(not_installed_message) from e
    except ValueError as e:
        raise ValueError(
            "Please make sure to give correct github parameters "
            f"Error message: {e}"
        ) from e

    # create a GitHub instance:
    g = installation.get_github_for_installation()
    repo = g.get_repo(github_repository)

    # Both branches default to the repository's default branch.
    github_base_branch = get_from_dict_or_env(
        values,
        "github_base_branch",
        "GITHUB_BASE_BRANCH",
        default=repo.default_branch,
    )
    active_branch = get_from_dict_or_env(
        values,
        "active_branch",
        "ACTIVE_BRANCH",
        default=repo.default_branch,
    )

    values["github"] = g
    values["github_repo_instance"] = repo
    values["github_repository"] = github_repository
    values["github_app_id"] = github_app_id
    values["github_app_private_key"] = github_app_private_key
    values["active_branch"] = active_branch
    values["github_base_branch"] = github_base_branch

    return values
def parse_issues(self, issues: List[Issue]) -> List[dict]:
    """
    Summarises each Issue as a small dictionary

    Parameters:
        issues(List[Issue]): A list of Github Issue objects

    Returns:
        List[dict]: One dictionary per issue with its ``title`` and
            ``number``, plus ``opened_by`` when the issue has a user
            with a login
    """
    summaries = []
    for item in issues:
        summary = {"title": item.title, "number": item.number}
        # Only record the author when one is attached to the issue.
        login = item.user.login if item.user else None
        if login is not None:
            summary["opened_by"] = login
        summaries.append(summary)
    return summaries
def parse_pull_requests(self, pull_requests: List[PullRequest]) -> List[dict]:
    """
    Summarises each PullRequest as a small dictionary

    Parameters:
        pull_requests(List[PullRequest]): A list of Github PullRequest objects

    Returns:
        List[dict]: One dictionary per pull request with its ``title`` and
            ``number``, and its ``commits`` and ``comments`` values
            converted to strings
    """
    return [
        {
            "title": pull.title,
            "number": pull.number,
            "commits": str(pull.commits),
            "comments": str(pull.comments),
        }
        for pull in pull_requests
    ]
def get_issues(self) -> str:
    """
    Reports the repo's open issues, leaving out pull requests

    Returns:
        str: A plaintext report with the issue count followed by the
            parsed issues, or a fixed message when there are none.
    """
    open_items = self.github_repo_instance.get_issues(state="open")
    # Pull requests come back from the issues listing too; drop them.
    only_issues = []
    for candidate in open_items:
        if not candidate.pull_request:
            only_issues.append(candidate)
    if not only_issues:
        return "No open issues available"
    summaries = self.parse_issues(only_issues)
    return f"Found {len(summaries)} issues:\n{summaries}"
def list_open_pull_requests(self) -> str:
    """
    Reports the repo's open pull requests

    Returns:
        str: A plaintext report with the PR count followed by the
            parsed pull requests, or a fixed message when there are none.
    """
    open_pulls = self.github_repo_instance.get_pulls(state="open")
    if not (open_pulls.totalCount > 0):
        return "No open pull requests available"
    summaries = self.parse_pull_requests(open_pulls)
    return f"Found {len(summaries)} pull requests:\n{summaries}"
def list_files_in_main_branch(self) -> str:
    """
    Fetches all files in the main (base) branch of the repo.

    Every directory level is read with ``ref=self.github_base_branch`` so
    the listing never mixes in files from the active branch.

    Returns:
        str: A plaintext report containing the paths and names of the files,
            a fixed message when there are none, or the error text if the
            listing fails.
    """
    repo = self.github_repo_instance
    base_ref = self.github_base_branch

    def walk(path: str) -> List[str]:
        # Depth-first listing of `path`, pinned to the base branch.
        found: List[str] = []
        for content in repo.get_contents(path, ref=base_ref):
            if content.type == "dir":
                found.extend(walk(content.path))
            else:
                found.append(content.path)
        return found

    try:
        files = walk("")
    except Exception as e:
        # Errors are reported as text rather than raised.
        return str(e)
    if files:
        files_str = "\n".join(files)
        return f"Found {len(files)} files in the main branch:\n{files_str}"
    return "No files found in the main branch"
def set_active_branch(self, branch_name: str) -> str:
    """Equivalent to `git checkout branch_name` for this Agent.

    Looks the name up among the repo's branches and, when present, records
    it as ``self.active_branch``.

    Returns a confirmation message, or an Error (as a string) listing the
    existing branches if branch doesn't exist.
    """
    known_branches = []
    for ref in self.github_repo_instance.get_branches():
        known_branches.append(ref.name)
    if branch_name not in known_branches:
        return (
            f"Error {branch_name} does not exist,"
            f"in repo with current branches: {str(known_branches)}"
        )
    self.active_branch = branch_name
    return f"Switched to branch `{branch_name}`"
def list_branches_in_repo(self) -> str:
    """
    Lists the names of all branches in the repository.

    Returns:
        str: A plaintext report with the branch count and one name per
            line, a fixed message when there are none, or the error text
            if the lookup fails.
    """
    try:
        names = []
        for ref in self.github_repo_instance.get_branches():
            names.append(ref.name)
        if not names:
            return "No branches found in the repository"
        listing = "\n".join(names)
        return f"Found {len(names)} branches in the repository:" f"\n{listing}"
    except Exception as e:
        return str(e)
def create_branch(self, proposed_branch_name: str) -> str:
    """
    Create a new branch, and set it as the active bot branch.
    Equivalent to `git switch -c proposed_branch_name`
    If the proposed branch already exists, we append _v1 then _v2...
    until a unique name is found.

    The new branch starts from the head commit of the repository's
    default branch.

    Parameters:
        proposed_branch_name(str): Preferred name for the new branch

    Returns:
        str: A plaintext success message, or a failure message when all
            1000 candidate names are already taken.

    Raises:
        Exception: If branch creation fails for any reason other than the
            name already existing; the GitHub error is attached as the cause.
    """
    from github import GithubException

    max_attempts = 1000
    repo = self.github_repo_instance
    base_branch = repo.get_branch(repo.default_branch)
    new_branch_name = proposed_branch_name
    for attempt in range(1, max_attempts + 1):
        try:
            repo.create_git_ref(
                ref=f"refs/heads/{new_branch_name}", sha=base_branch.commit.sha
            )
        except GithubException as e:
            # The error payload is not guaranteed to carry a "message" key;
            # read it defensively so the real failure is not masked.
            data = e.data if isinstance(e.data, dict) else {}
            if e.status == 422 and "Reference already exists" in str(
                data.get("message", "")
            ):
                # Name taken: retry with the next _vN suffix.
                new_branch_name = f"{proposed_branch_name}_v{attempt}"
                continue
            # Handle any other exceptions
            print(f"Failed to create branch. Error: {e}")  # noqa: T201
            raise Exception(
                "Unable to create branch name from proposed_branch_name: "
                f"{proposed_branch_name}"
            ) from e
        self.active_branch = new_branch_name
        return (
            f"Branch '{new_branch_name}' "
            "created successfully, and set as current active branch."
        )
    return (
        "Unable to create branch. "
        "At least 1000 branches exist with names derived from "
        f"proposed_branch_name: `{proposed_branch_name}`"
    )
def list_files_in_bot_branch(self) -> str:
    """
    Lists every file in the active branch of the repo,
    the branch the bot uses to make changes.

    Returns:
        str: A plaintext list containing the filepaths in the branch,
            a fixed message when there are none, or an ``Error: ...``
            string if the listing fails.
    """
    branch_files: List[str] = []
    try:
        top_level = self.github_repo_instance.get_contents(
            "", ref=self.active_branch
        )
        for entry in top_level:
            if entry.type == "dir":
                # Descend into sub-directories through the shared helper.
                branch_files += self._list_files(entry.path)
            else:
                branch_files += [entry.path]
        if not branch_files:
            return f"No files found in branch: `{self.active_branch}`"
        listing = "\n".join(branch_files)
        return (
            f"Found {len(branch_files)} files in branch `{self.active_branch}`:\n"
            f"{listing}"
        )
    except Exception as e:
        return f"Error: {e}"
def get_files_from_directory(self, directory_path: str) -> str:
    """
    Lists, recursively, the files under a directory in the repo.

    Parameters:
        directory_path (str): Path to the directory

    Returns:
        str: The string form of the list of file paths, or an error
            message carrying the GitHub status code and message.
    """
    from github import GithubException

    try:
        found = self._list_files(directory_path)
    except GithubException as err:
        return f"Error: status code {err.status}, {err.message}"
    return str(found)
def _list_files(self, directory_path: str) -> List[str]:
    """
    Collects, depth-first, the paths of all files under a directory,
    read from ``self.active_branch``.

    Parameters:
        directory_path (str): Path to the directory

    Returns:
        List[str]: File paths in the order they are encountered
    """
    collected: List[str] = []
    entries = self.github_repo_instance.get_contents(
        directory_path, ref=self.active_branch
    )
    for entry in entries:
        if entry.type != "dir":
            collected.append(entry.path)
            continue
        # Directories are expanded in place, keeping listing order.
        collected.extend(self._list_files(entry.path))
    return collected
def get_issue(self, issue_number: int) -> Dict[str, Any]:
    """
    Fetches a specific issue and its first 10 comments

    Parameters:
        issue_number(int): The number for the github issue

    Returns:
        dict: A dictionary containing the issue's number, title,
            body, comments as a string (at most 10), and the username
            of the user who opened the issue (as a string)
    """
    max_comments = 10
    issue = self.github_repo_instance.get_issue(number=issue_number)
    # Request the comment listing once and page through that one object.
    paginated_comments = issue.get_comments()
    comments: List[dict] = []
    page = 0
    while len(comments) < max_comments:
        comments_page = paginated_comments.get_page(page)
        if len(comments_page) == 0:
            break
        # Take only as many comments as are still needed to reach the cap.
        for comment in comments_page[: max_comments - len(comments)]:
            comments.append(
                {
                    "body": comment.body,
                    # A comment may have no user attached.
                    "user": comment.user.login if comment.user else None,
                }
            )
        page += 1

    opened_by = None
    if issue.user and issue.user.login:
        opened_by = issue.user.login

    return {
        "number": issue_number,
        "title": issue.title,
        "body": issue.body,
        "comments": str(comments),
        "opened_by": str(opened_by),
    }
def list_pull_request_files(self, pr_number: int) -> List[Dict[str, Any]]:
    """Fetches the full text of files in a PR, within a 3k token budget.

    For every changed file the contents metadata is requested first, then
    the raw file at its ``download_url``. A file is included only while the
    running token total stays below the budget; files that would exceed it,
    and files whose download fails, are skipped (failures are printed).

    # TODO: Enhancement to summarize files with ctags if they're getting long.

    Args:
        pr_number(int): The number of the pull request on Github

    Returns:
        List[Dict[str, Any]]: One dictionary per included file with the
            keys ``filename``, ``contents``, ``additions`` and ``deletions``
    """
    tiktoken = _import_tiktoken()
    MAX_TOKENS_FOR_FILES = 3_000
    # Without a timeout a stalled connection would block the call forever.
    REQUEST_TIMEOUT_SECONDS = 30
    # Build the encoder once instead of once per file.
    encoding = tiktoken.get_encoding("cl100k_base")
    pr_files: List[Dict[str, Any]] = []
    pr = self.github_repo_instance.get_pull(number=int(pr_number))
    # Request the file listing once and page through that one object.
    paginated_files = pr.get_files()
    total_tokens = 0
    page = 0
    while True:  # or while (total_tokens + tiktoken()) < MAX_TOKENS_FOR_FILES:
        files_page = paginated_files.get_page(page)
        if len(files_page) == 0:
            break
        for file in files_page:
            try:
                file_metadata_response = requests.get(
                    file.contents_url, timeout=REQUEST_TIMEOUT_SECONDS
                )
                if file_metadata_response.status_code == 200:
                    download_url = json.loads(file_metadata_response.text)[
                        "download_url"
                    ]
                else:
                    print(f"Failed to download file: {file.contents_url}, skipping")  # noqa: T201
                    continue

                file_content_response = requests.get(
                    download_url, timeout=REQUEST_TIMEOUT_SECONDS
                )
                if file_content_response.status_code == 200:
                    # Save the content as a UTF-8 string
                    file_content = file_content_response.text
                else:
                    print(  # noqa: T201
                        "Failed downloading file content "
                        f"(Error {file_content_response.status_code}). Skipping"
                    )
                    continue

                # Count the file name and the key labels along with the body.
                file_tokens = len(
                    encoding.encode(
                        file_content + file.filename + "file_name file_contents"
                    )
                )
                if (total_tokens + file_tokens) < MAX_TOKENS_FOR_FILES:
                    pr_files.append(
                        {
                            "filename": file.filename,
                            "contents": file_content,
                            "additions": file.additions,
                            "deletions": file.deletions,
                        }
                    )
                    total_tokens += file_tokens
            except Exception as e:
                # Best effort: one unreadable file must not abort the listing.
                print(f"Error when reading files from a PR on github. {e}")  # noqa: T201
        page += 1
    return pr_files
def get_pull_request(self, pr_number: int) -> Dict[str, Any]:
    """
    Fetches a specific pull request with its first 10 comments and
    first 10 commit messages, limited by a 2,000 token budget.

    The title, number and body are each added only if they fit in the
    remaining budget. Comments and commits are then collected one by one
    until 10 have been taken, the pages run out, or the next item would
    exceed the budget. Each item is counted against the budget exactly once.

    Parameters:
        pr_number(int): The number for the Github pull

    Returns:
        dict: A dictionary containing the pull's title, number and body
            (when they fit), and its comments and commits as strings
    """
    max_tokens = 2_000
    max_items = 10
    pull = self.github_repo_instance.get_pull(number=pr_number)
    # Build the encoder once instead of once per measured string.
    encoding = _import_tiktoken().get_encoding("cl100k_base")
    total_tokens = 0

    def get_tokens(text: str) -> int:
        return len(encoding.encode(text))

    def add_to_dict(data_dict: Dict[str, Any], key: str, value: str) -> None:
        # Store `value` only when it fits in what is left of the budget.
        nonlocal total_tokens
        tokens = get_tokens(value)
        if total_tokens + tokens <= max_tokens:
            data_dict[key] = value
            total_tokens += tokens

    def collect(paginated: Any, render: Any) -> List[str]:
        # Gather up to `max_items` rendered entries while the budget allows.
        nonlocal total_tokens
        items: List[str] = []
        page = 0
        while len(items) < max_items:
            batch = paginated.get_page(page)
            if len(batch) == 0:
                break
            for raw in batch[: max_items - len(items)]:
                text = render(raw)
                tokens = get_tokens(text)
                if total_tokens + tokens > max_tokens:
                    # Budget exhausted: stop without fetching further pages.
                    return items
                items.append(text)
                total_tokens += tokens
            page += 1
        return items

    response_dict: Dict[str, str] = {}
    add_to_dict(response_dict, "title", pull.title)
    add_to_dict(response_dict, "number", str(pr_number))
    add_to_dict(response_dict, "body", pull.body if pull.body else "")

    # The items were already charged to the budget inside `collect`, so the
    # joined lists are stored directly rather than being counted again.
    comments = collect(
        pull.get_issue_comments(),
        lambda comment: str(
            {
                "body": comment.body,
                "user": comment.user.login if comment.user else None,
            }
        ),
    )
    response_dict["comments"] = str(comments)

    commits = collect(
        pull.get_commits(),
        lambda commit: str({"message": commit.commit.message}),
    )
    response_dict["commits"] = str(commits)
    return response_dict
def create_pull_request(self, pr_query: str) -> str:
    """
    Makes a pull request from the bot's branch to the base branch

    Parameters:
        pr_query(str): a string which contains the PR title
            and the PR body. The title is the first line
            in the string, and the body are the rest of the string.
            For example, "Updated README\\nmade changes to add info".
            A blank line between title and body is also accepted.

    Returns:
        str: A success or failure message
    """
    if self.github_base_branch == self.active_branch:
        return (
            "Cannot make a pull request because\n"
            "commits are already in the main or master branch."
        )
    try:
        # Split on the first newline; slicing by a fixed offset would eat
        # the first character of the body for the single-newline form.
        title, _, body = pr_query.partition("\n")
        if body.startswith("\n"):
            # Tolerate the "title, blank line, body" form as well.
            body = body[1:]
        pr = self.github_repo_instance.create_pull(
            title=title,
            body=body,
            head=self.active_branch,
            base=self.github_base_branch,
        )
        return f"Successfully created PR number {pr.number}"
    except Exception as e:
        return "Unable to make pull request due to error:\n" + str(e)
def comment_on_issue(self, comment_query: str) -> str:
    """
    Adds a comment to a github issue

    Parameters:
        comment_query(str): a string which contains the issue number,
            two newlines, and the comment.
            for example: "1\\n\\nWorking on it now"
            adds the comment "working on it now" to issue 1

    Returns:
        str: A success or failure message
    """
    # Split on the separator itself; deriving the offset from the parsed
    # number breaks for inputs such as "007" or " 12 ".
    number_text, _, comment = comment_query.partition("\n\n")
    try:
        issue_number = int(number_text)
    except ValueError:
        return (
            "Unable to make comment due to error:\n"
            f"Invalid issue number: {number_text!r}"
        )
    try:
        issue = self.github_repo_instance.get_issue(number=issue_number)
        issue.create_comment(comment)
        return "Commented on issue " + str(issue_number)
    except Exception as e:
        return "Unable to make comment due to error:\n" + str(e)
def create_file(self, file_query: str) -> str:
    """
    Creates a new file on the Github repo

    Parameters:
        file_query(str): a string which contains the file path
            and the file contents. The file path is the first line
            in the string, and the contents are the rest of the string.
            For example, "hello_world.md\\n# Hello World!".
            A blank line between path and contents is also accepted.

    Returns:
        str: A success or failure message
    """
    if self.active_branch == self.github_base_branch:
        return (
            "You're attempting to commit directly to the "
            f"{self.github_base_branch} branch, which is protected. "
            "Please create a new branch and try again."
        )

    # Split on the first newline; slicing by a fixed offset would eat the
    # first character of the contents for the single-newline form.
    file_path, _, file_contents = file_query.partition("\n")
    if file_contents.startswith("\n"):
        # Tolerate the "path, blank line, contents" form as well.
        file_contents = file_contents[1:]
    try:
        try:
            file = self.github_repo_instance.get_contents(
                file_path, ref=self.active_branch
            )
            if file:
                return (
                    f"File already exists at `{file_path}` "
                    f"on branch `{self.active_branch}`. You must use "
                    "`update_file` to modify it."
                )
        except Exception:
            # expected behavior, file shouldn't exist yet; any other lookup
            # problem will resurface from the create call below
            pass

        self.github_repo_instance.create_file(
            path=file_path,
            message="Create " + file_path,
            content=file_contents,
            branch=self.active_branch,
        )
        return "Created file " + file_path
    except Exception as e:
        return "Unable to make file due to error:\n" + str(e)
def read_file(self, file_path: str) -> str:
    """
    Reads a file from this agent's branch, defined by self.active_branch,
    which supports PR branches.

    Parameters:
        file_path(str): the file path

    Returns:
        str: The file content decoded as UTF-8, or an error message
            if it could not be read
    """
    try:
        blob = self.github_repo_instance.get_contents(
            file_path, ref=self.active_branch
        )
        raw_bytes = blob.decoded_content
        return raw_bytes.decode("utf-8")
    except Exception as e:
        return (
            f"File not found `{file_path}` on branch"
            f"`{self.active_branch}`. Error: {str(e)}"
        )
def update_file(self, file_query: str) -> str:
    """
    Updates a file with new content.

    Every occurrence of the OLD text in the file is replaced by the NEW
    text, and the result is committed to ``self.active_branch``.

    Parameters:
        file_query(str): Contains the file path and the file contents.
            The old file contents is wrapped in OLD <<<< and >>>> OLD
            The new file contents is wrapped in NEW <<<< and >>>> NEW
            For example:
            /test/hello.txt
            OLD <<<<
            Hello Earth!
            >>>> OLD
            NEW <<<<
            Hello Mars!
            >>>> NEW

    Returns:
        A success or failure message
    """
    if self.active_branch == self.github_base_branch:
        return (
            "You're attempting to commit directly to the "
            f"{self.github_base_branch} branch, which is protected. "
            "Please create a new branch and try again."
        )
    try:
        file_path: str = file_query.split("\n")[0]
        old_file_contents = (
            file_query.split("OLD <<<<")[1].split(">>>> OLD")[0].strip()
        )
        new_file_contents = (
            file_query.split("NEW <<<<")[1].split(">>>> NEW")[0].strip()
        )
        if not old_file_contents:
            # Replacing the empty string would insert the new text between
            # every character of the file.
            return (
                "File content was not updated because the OLD block is empty. "
                "It may be helpful to use the read_file action to get "
                "the current file contents."
            )

        # Fetch once: the same object supplies both the content and the sha,
        # and a missing file surfaces as an error instead of being mistaken
        # for file content.
        current_file = self.github_repo_instance.get_contents(
            file_path, ref=self.active_branch
        )
        file_content = current_file.decoded_content.decode("utf-8")
        updated_file_content = file_content.replace(
            old_file_contents, new_file_contents
        )

        if file_content == updated_file_content:
            return (
                "File content was not updated because old content was not found. "
                "It may be helpful to use the read_file action to get "
                "the current file contents."
            )

        self.github_repo_instance.update_file(
            path=file_path,
            message="Update " + str(file_path),
            content=updated_file_content,
            branch=self.active_branch,
            sha=current_file.sha,
        )
        return "Updated file " + str(file_path)
    except Exception as e:
        return "Unable to update file due to error:\n" + str(e)
def delete_file(self, file_path: str) -> str:
    """
    Deletes a file from the repo

    The file is removed from ``self.active_branch``; deleting while the
    active branch is the base branch is refused.

    Parameters:
        file_path(str): Where the file is

    Returns:
        str: Success or failure message
    """
    if self.active_branch == self.github_base_branch:
        return (
            "You're attempting to commit directly to the "
            f"{self.github_base_branch} branch, which is protected. "
            "Please create a new branch and try again."
        )
    try:
        # The delete call needs the sha of the file as it currently exists
        # on the active branch.
        current_file = self.github_repo_instance.get_contents(
            file_path, ref=self.active_branch
        )
        self.github_repo_instance.delete_file(
            path=file_path,
            message="Delete " + file_path,
            branch=self.active_branch,
            sha=current_file.sha,
        )
        return "Deleted file " + file_path
    except Exception as e:
        return "Unable to delete file due to error:\n" + str(e)
def search_issues_and_prs(self, query: str) -> str:
    """
    Runs an issue/pull-request search scoped to the repository.

    Parameters:
        query(str): The search query

    Returns:
        str: A header line followed by the title, number and state of
            up to the first 5 matches, one per line
    """
    matches = self.github.search_issues(query, repo=self.github_repository)
    shown = min(5, matches.totalCount)
    lines = [f"Top {shown} results:"]
    lines.extend(
        f"Title: {hit.title}, Number: {hit.number}, State: {hit.state}"
        for hit in matches[:shown]
    )
    return "\n".join(lines)
def search_code(self, query: str) -> str:
    """
    Runs a code search scoped to the repository.
    # Todo: limit total tokens returned...

    Each reported file is read from ``self.active_branch``.

    Parameters:
        query(str): The search query

    Returns:
        str: A string containing, at most, the top 5 search results
            (path and full file contents), or a fixed message when
            nothing matched
    """
    hits = self.github.search_code(query=query, repo=self.github_repository)
    if hits.totalCount == 0:
        return "0 results found."
    limit = min(5, hits.totalCount)
    sections = [f"Showing top {limit} of {hits.totalCount} results:"]
    for position, hit in enumerate(hits):
        if position >= limit:
            break
        # Get the file content using the PyGithub get_contents method
        blob = self.github_repo_instance.get_contents(
            hit.path, ref=self.active_branch
        )
        text = blob.decoded_content.decode()
        sections.append(
            f"Filepath: `{hit.path}`\nFile contents: " f"{text}\n<END OF FILE>"
        )
    return "\n".join(sections)
def create_review_request(self, reviewer_username: str) -> str:
    """
    Requests a review on the first open pull request whose head branch
    is the current active_branch.

    Parameters:
        reviewer_username(str): The username of the person who is being requested

    Returns:
        str: A message confirming the creation of the review request,
            or explaining why it could not be created
    """
    open_pulls = self.github_repo_instance.get_pulls(state="open", sort="created")
    # find PR against active_branch
    target = None
    for candidate in open_pulls:
        if candidate.head.ref == self.active_branch:
            target = candidate
            break

    if target is None:
        return (
            "No open pull request found for the "
            f"current branch `{self.active_branch}`"
        )

    try:
        target.create_review_request(reviewers=[reviewer_username])
        return (
            f"Review request created for user {reviewer_username} "
            f"on PR #{target.number}"
        )
    except Exception as e:
        return f"Failed to create a review request with error {e}"
def run(self, mode: str, query: str) -> str:
    """
    Dispatches to the method named by ``mode``.

    Parameters:
        mode(str): Name of the action to run
        query(str): Argument for the action. Converted with ``int`` for
            the number-based lookups, passed through unchanged for the
            text-based actions, and ignored by the listing actions

    Returns:
        str: The action's result; the number-based lookups are returned
            JSON-encoded

    Raises:
        ValueError: If ``mode`` is not a recognised action
    """
    # Every mode shares its name with the method that implements it.
    number_lookup_modes = (
        "get_issue",
        "get_pull_request",
        "list_pull_request_files",
    )
    no_argument_modes = (
        "get_issues",
        "list_open_pull_requests",
        "list_files_in_main_branch",
        "list_files_in_bot_branch",
        "list_branches_in_repo",
    )
    text_argument_modes = (
        "comment_on_issue",
        "create_file",
        "create_pull_request",
        "read_file",
        "update_file",
        "delete_file",
        "set_active_branch",
        "create_branch",
        "get_files_from_directory",
        "search_issues_and_prs",
        "search_code",
        "create_review_request",
    )

    if mode in number_lookup_modes:
        return json.dumps(getattr(self, mode)(int(query)))
    if mode in no_argument_modes:
        return getattr(self, mode)()
    if mode in text_argument_modes:
        return getattr(self, mode)(query)
    raise ValueError("Invalid mode" + mode)