From b8d2b263b9b46c1d15408a710bc1bee72eef2d95 Mon Sep 17 00:00:00 2001 From: mrT23 Date: Thu, 5 Sep 2024 07:42:50 +0300 Subject: [PATCH 1/4] feat: enhance GitHub polling with synchronous comment processing and improved logging and bug fixing --- pr_agent/servers/github_polling.py | 189 +++++++++++++++++++++++------ 1 file changed, 150 insertions(+), 39 deletions(-) diff --git a/pr_agent/servers/github_polling.py b/pr_agent/servers/github_polling.py index 64f4dea9d..73b171a78 100644 --- a/pr_agent/servers/github_polling.py +++ b/pr_agent/servers/github_polling.py @@ -1,7 +1,10 @@ import asyncio +import multiprocessing +from collections import deque import traceback from datetime import datetime, timezone - +import time +import requests import aiohttp from pr_agent.agent.pr_agent import PRAgent @@ -24,6 +27,106 @@ def now() -> str: now_utc = now_utc.replace("+00:00", "Z") return now_utc +async def async_handle_request(pr_url, rest_of_comment, comment_id, git_provider): + agent = PRAgent() + success = await agent.handle_request( + pr_url, + rest_of_comment, + notify=lambda: git_provider.add_eyes_reaction(comment_id) + ) + return success + +def run_handle_request(pr_url, rest_of_comment, comment_id, git_provider): + return asyncio.run(async_handle_request(pr_url, rest_of_comment, comment_id, git_provider)) + + +def process_comment_sync(pr_url, rest_of_comment, comment_id): + try: + # Run the async handle_request in a separate function + git_provider = get_git_provider()(pr_url=pr_url) + success = run_handle_request(pr_url, rest_of_comment, comment_id, git_provider) + except Exception as e: + get_logger().error(f"Error processing comment: {e}", artifact={"traceback": traceback.format_exc()}) + + +async def process_comment(pr_url, rest_of_comment, comment_id): + try: + git_provider = get_git_provider()(pr_url=pr_url) + git_provider.set_pr(pr_url) + agent = PRAgent() + success = await agent.handle_request( + pr_url, + rest_of_comment, + notify=lambda: 
git_provider.add_eyes_reaction(comment_id) + ) + get_logger().info(f"Finished processing comment for PR: {pr_url}") + except Exception as e: + get_logger().error(f"Error processing comment: {e}", artifact={"traceback": traceback.format_exc()}) + +async def is_valid_notification(notification, headers, handled_ids, session, user_id): + try: + if 'reason' in notification and notification['reason'] == 'mention': + if 'subject' in notification and notification['subject']['type'] == 'PullRequest': + pr_url = notification['subject']['url'] + latest_comment = notification['subject']['latest_comment_url'] + if not latest_comment or not isinstance(latest_comment, str): + get_logger().debug(f"not latest_comment, but its ok") + # continue + async with session.get(latest_comment, headers=headers) as comment_response: + check_prev_comments = False + if comment_response.status == 200: + comment = await comment_response.json() + if 'id' in comment: + if comment['id'] in handled_ids: + get_logger().debug(f"comment['id'] in handled_ids") + return False, handled_ids + else: + handled_ids.add(comment['id']) + if 'user' in comment and 'login' in comment['user']: + if comment['user']['login'] == user_id: + get_logger().debug(f"comment['user']['login'] == user_id") + check_prev_comments = True + comment_body = comment.get('body', '') + if not comment_body: + get_logger().debug(f"no comment_body") + check_prev_comments = True + commenter_github_user = comment['user']['login'] \ + if 'user' in comment else '' + get_logger().info(f"Polling, pr_url: {pr_url}", + artifact={"comment": comment_body}) + user_tag = "@" + user_id + if user_tag not in comment_body: + get_logger().debug(f"user_tag not in comment_body") + check_prev_comments = True + + if not check_prev_comments: + return True, handled_ids, comment, comment_body, pr_url, user_tag + else: # we could not find the user tag in the latest comment. 
Check previous comments + # get all comments in the PR + requests_url = f"{pr_url}/comments".replace("pulls", "issues") + comments_response = requests.get(requests_url, headers=headers) + comments = comments_response.json()[::-1] + max_comment_to_scan = 4 + for comment in comments[:max_comment_to_scan]: + if 'user' in comment and 'login' in comment['user']: + if comment['user']['login'] == user_id: + continue + comment_body = comment.get('body', '') + if not comment_body: + continue + if user_tag in comment_body: + get_logger().info("found user tag in previous comments") + return True, handled_ids, comment, comment_body, pr_url, user_tag + + get_logger().error(f"Failed to fetch comments for PR: {pr_url}") + return False, handled_ids + + return False, handled_ids + except Exception as e: + get_logger().error(f"Error processing notification: {e}", artifact={"traceback": traceback.format_exc()}) + return False, handled_ids + + async def polling_loop(): """ @@ -34,7 +137,6 @@ async def polling_loop(): last_modified = [None] git_provider = get_git_provider()() user_id = git_provider.get_user_id() - agent = PRAgent() get_settings().set("CONFIG.PUBLISH_OUTPUT_PROGRESS", False) get_settings().set("pr_description.publish_description_as_comment", True) @@ -53,7 +155,8 @@ async def polling_loop(): async with aiohttp.ClientSession() as session: while True: try: - await asyncio.sleep(5) + await asyncio.sleep(3) + get_logger().info("Polling for notifications") headers = { "Accept": "application/vnd.github.v3+json", "Authorization": f"Bearer {token}" @@ -74,43 +177,38 @@ async def polling_loop(): notifications = await response.json() if not notifications: continue + get_logger().info(f"Received {len(notifications)} notifications") + task_queue = deque() for notification in notifications: + # mark notification as read + await mark_notification_as_read(headers, notification, session) + handled_ids.add(notification['id']) - if 'reason' in notification and notification['reason'] == 
'mention': - if 'subject' in notification and notification['subject']['type'] == 'PullRequest': - pr_url = notification['subject']['url'] - latest_comment = notification['subject']['latest_comment_url'] - if not latest_comment or not isinstance(latest_comment, str): - continue - async with session.get(latest_comment, headers=headers) as comment_response: - if comment_response.status == 200: - comment = await comment_response.json() - if 'id' in comment: - if comment['id'] in handled_ids: - continue - else: - handled_ids.add(comment['id']) - if 'user' in comment and 'login' in comment['user']: - if comment['user']['login'] == user_id: - continue - comment_body = comment.get('body', '') - if not comment_body: - continue - commenter_github_user = comment['user']['login'] \ - if 'user' in comment else '' - get_logger().info(f"Polling, pr_url: {pr_url}", - artifact={"comment": comment_body}) - user_tag = "@" + user_id - if user_tag not in comment_body: - continue - rest_of_comment = comment_body.split(user_tag)[1].strip() - comment_id = comment['id'] - git_provider.set_pr(pr_url) - success = await agent.handle_request(pr_url, rest_of_comment, - notify=lambda: git_provider.add_eyes_reaction( - comment_id)) # noqa E501 - if not success: - git_provider.set_pr(pr_url) + output = await is_valid_notification(notification, headers, handled_ids, session, user_id) + if output[0]: + _, handled_ids, comment, comment_body, pr_url, user_tag = output + rest_of_comment = comment_body.split(user_tag)[1].strip() + comment_id = comment['id'] + + # Add to the task queue + get_logger().info( + f"Adding comment processing to task queue for PR, {pr_url}, comment_body: {comment_body}") + task_queue.append((process_comment_sync, (pr_url, rest_of_comment, comment_id))) + get_logger().info(f"Queued comment processing for PR: {pr_url}") + else: + get_logger().debug(f"Skipping comment processing for PR: {pr_url}") + + if task_queue: + processes = [] + for func, args in task_queue: # Create 
parallel tasks + p = multiprocessing.Process(target=func, args=args) + processes.append(p) + p.start() + task_queue.clear() + + # Dont wait for all processes to complete. Move on to the next iteration + # for p in processes: + # p.join() elif response.status != 304: print(f"Failed to fetch notifications. Status code: {response.status}") @@ -120,5 +218,18 @@ async def polling_loop(): artifact={"traceback": traceback.format_exc()}) +async def mark_notification_as_read(headers, notification, session): + async with session.patch( + f"https://api.github.com/notifications/threads/{notification['id']}", + headers=headers) as mark_read_response: + if mark_read_response.status != 205: + get_logger().error( + f"Failed to mark notification as read. Status code: {mark_read_response.status}") + + if __name__ == '__main__': - asyncio.run(polling_loop()) \ No newline at end of file + asyncio.run(polling_loop()) + +# # Example usage +# task_queue = deque([lambda: print("Task executed1"), lambda: print("Task executed2")]) +# asyncio.run(background_task_manager(task_queue)) \ No newline at end of file From 19048ee70558503aca82ebebf648677a230894ec Mon Sep 17 00:00:00 2001 From: mrT23 Date: Thu, 5 Sep 2024 16:53:31 +0300 Subject: [PATCH 2/4] feat: enhance GitHub polling with synchronous comment processing and improved logging --- pr_agent/servers/github_polling.py | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/pr_agent/servers/github_polling.py b/pr_agent/servers/github_polling.py index 73b171a78..805eae512 100644 --- a/pr_agent/servers/github_polling.py +++ b/pr_agent/servers/github_polling.py @@ -16,6 +16,15 @@ NOTIFICATION_URL = "https://api.github.com/notifications" +async def mark_notification_as_read(headers, notification, session): + async with session.patch( + f"https://api.github.com/notifications/threads/{notification['id']}", + headers=headers) as mark_read_response: + if mark_read_response.status != 205: + get_logger().error( + 
f"Failed to mark notification as read. Status code: {mark_read_response.status}") + + def now() -> str: """ Get the current UTC time in ISO 8601 format. @@ -218,18 +227,5 @@ async def polling_loop(): artifact={"traceback": traceback.format_exc()}) -async def mark_notification_as_read(headers, notification, session): - async with session.patch( - f"https://api.github.com/notifications/threads/{notification['id']}", - headers=headers) as mark_read_response: - if mark_read_response.status != 205: - get_logger().error( - f"Failed to mark notification as read. Status code: {mark_read_response.status}") - - if __name__ == '__main__': asyncio.run(polling_loop()) - -# # Example usage -# task_queue = deque([lambda: print("Task executed1"), lambda: print("Task executed2")]) -# asyncio.run(background_task_manager(task_queue)) \ No newline at end of file From f0d780c7ece7c1e504837cd7ee737292cfbfbf03 Mon Sep 17 00:00:00 2001 From: mrT23 Date: Thu, 5 Sep 2024 16:55:10 +0300 Subject: [PATCH 3/4] feat: enhance GitHub polling with synchronous comment processing and improved logging --- pr_agent/servers/github_polling.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pr_agent/servers/github_polling.py b/pr_agent/servers/github_polling.py index 805eae512..14379083b 100644 --- a/pr_agent/servers/github_polling.py +++ b/pr_agent/servers/github_polling.py @@ -164,8 +164,7 @@ async def polling_loop(): async with aiohttp.ClientSession() as session: while True: try: - await asyncio.sleep(3) - get_logger().info("Polling for notifications") + await asyncio.sleep(5) headers = { "Accept": "application/vnd.github.v3+json", "Authorization": f"Bearer {token}" From 85754d2d79aa144341e947079e94ab9aa99dccf8 Mon Sep 17 00:00:00 2001 From: mrT23 Date: Thu, 5 Sep 2024 16:57:10 +0300 Subject: [PATCH 4/4] feat: enhance GitHub polling with synchronous comment processing and improved logging --- pr_agent/servers/github_polling.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 
deletion(-) diff --git a/pr_agent/servers/github_polling.py b/pr_agent/servers/github_polling.py index 14379083b..954f8c79f 100644 --- a/pr_agent/servers/github_polling.py +++ b/pr_agent/servers/github_polling.py @@ -206,12 +206,17 @@ async def polling_loop(): else: get_logger().debug(f"Skipping comment processing for PR: {pr_url}") + max_allowed_parallel_tasks = 10 if task_queue: processes = [] - for func, args in task_queue: # Create parallel tasks + for i, (func, args) in enumerate(task_queue): # Create parallel tasks p = multiprocessing.Process(target=func, args=args) processes.append(p) p.start() + if i > max_allowed_parallel_tasks: + get_logger().error( + f"Dropping {len(task_queue) - max_allowed_parallel_tasks} tasks from polling session") + break task_queue.clear() # Dont wait for all processes to complete. Move on to the next iteration