From 5ef861950811d554fc5d18412bab7eea47f5cdf4 Mon Sep 17 00:00:00 2001 From: Justin Merrell Date: Wed, 22 Nov 2023 17:26:55 -0500 Subject: [PATCH 1/7] fix: worker not retry get_job --- runpod/serverless/modules/rp_job.py | 8 ++++++-- runpod/serverless/modules/rp_scale.py | 5 ++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/runpod/serverless/modules/rp_job.py b/runpod/serverless/modules/rp_job.py index 3498bacb..a2b2a1a8 100644 --- a/runpod/serverless/modules/rp_job.py +++ b/runpod/serverless/modules/rp_job.py @@ -70,7 +70,7 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any] continue next_job = await response.json() - log.debug(f"Request Received | {next_job}") + log.debug("Request Received", {next_job}) # Check if the job is valid job_id = next_job.get("id", None) @@ -87,7 +87,11 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any] next_job = None except Exception as err: # pylint: disable=broad-except - log.error(f"Error while getting job: {err}") + err_type = type(err).__name__ + err_message = str(err) + err_traceback = traceback.format_exc() + log.error(f"Failed to get job, error type: {err_type}, error message: {err_message}") + log.error(f"Traceback: {err_traceback}") if next_job is None: log.debug("No job available, waiting for the next one.") diff --git a/runpod/serverless/modules/rp_scale.py b/runpod/serverless/modules/rp_scale.py index 39b4efa3..fe7a9199 100644 --- a/runpod/serverless/modules/rp_scale.py +++ b/runpod/serverless/modules/rp_scale.py @@ -13,6 +13,7 @@ log = RunPodLogger() job_list = Jobs() + class JobScaler(): """ A class for automatically retrieving new jobs from the server and processing them concurrently. @@ -100,7 +101,7 @@ async def get_jobs(self, session): break for _ in range(self.num_concurrent_get_job_requests): - job = await get_job(session, retry=False) + job = await get_job(session) self.job_history.append(1 if job else 0) if job: yield job @@ -128,8 +129,6 @@ async def get_jobs(self, session): f"{self.num_concurrent_get_job_requests}." ) - - def upscale_rate(self) -> None: """ Upscale the job retrieval rate by adjusting the number of concurrent requests. From 4c5f2987c681b42719a125473c9b4d5eb2bc50d9 Mon Sep 17 00:00:00 2001 From: Justin Merrell Date: Wed, 22 Nov 2023 17:52:11 -0500 Subject: [PATCH 2/7] fix: job test --- runpod/serverless/modules/rp_job.py | 8 ++++---- tests/test_serverless/test_modules/test_job.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/runpod/serverless/modules/rp_job.py b/runpod/serverless/modules/rp_job.py index a2b2a1a8..cb36a074 100644 --- a/runpod/serverless/modules/rp_job.py +++ b/runpod/serverless/modules/rp_job.py @@ -53,19 +53,19 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any] async with session.get(_job_get_url()) as response: if response.status == 204: log.debug("No content, no job to process.") - if not retry: + if retry is False: return None continue if response.status == 400: log.debug("Received 400 status, expected when FlashBoot is enabled.") - if not retry: + if retry is False: return None continue if response.status != 200: log.error(f"Failed to get job, status code: {response.status}") - if not retry: + if retry is False: return None continue @@ -95,7 +95,7 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any] if next_job is None: log.debug("No job available, waiting for the next one.") - if not retry: + if retry is False: return None log.debug("Confirmed valid request.", next_job['id']) diff --git a/tests/test_serverless/test_modules/test_job.py b/tests/test_serverless/test_modules/test_job.py index 588bfd76..1de1b4d7 100644 --- a/tests/test_serverless/test_modules/test_job.py +++ b/tests/test_serverless/test_modules/test_job.py @@ -155,7 +155,7 @@ async def test_get_job_exception(self): job = await rp_job.get_job(mock_session_exception, retry=False) assert job is None - assert mock_log.error.call_count == 1 + assert mock_log.error.call_count == 2 class TestRunJob(IsolatedAsyncioTestCase): From 55d968873f79a2547992e327c3a203f0cbd8999f Mon Sep 17 00:00:00 2001 From: Justin Merrell Date: Wed, 22 Nov 2023 18:04:59 -0500 Subject: [PATCH 3/7] Update rp_job.py --- runpod/serverless/modules/rp_job.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/runpod/serverless/modules/rp_job.py b/runpod/serverless/modules/rp_job.py index cb36a074..ee11c8eb 100644 --- a/runpod/serverless/modules/rp_job.py +++ b/runpod/serverless/modules/rp_job.py @@ -76,6 +76,9 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any] job_id = next_job.get("id", None) job_input = next_job.get("input", None) + print(f"job_id: {job_id}") + print(f"job_input: {job_input}") + if None in [job_id, job_input]: missing_fields = [] if job_id is None: From 65ff4a5415d73a0f820f7321418f2c578b5e8532 Mon Sep 17 00:00:00 2001 From: Justin Merrell Date: Wed, 22 Nov 2023 18:19:28 -0500 Subject: [PATCH 4/7] Update rp_job.py --- runpod/serverless/modules/rp_job.py | 42 ++++++++++++++--------------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/runpod/serverless/modules/rp_job.py b/runpod/serverless/modules/rp_job.py index ee11c8eb..b0c8528d 100644 --- a/runpod/serverless/modules/rp_job.py +++ b/runpod/serverless/modules/rp_job.py @@ -54,40 +54,37 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any] if response.status == 204: log.debug("No content, no job to process.") if retry is False: - return None + break continue if response.status == 400: log.debug("Received 400 status, expected when FlashBoot is enabled.") if retry is False: - return None + break continue if response.status != 200: log.error(f"Failed to get job, status code: {response.status}") if retry is False: - return None + break continue next_job = await response.json() log.debug("Request Received", {next_job}) - # Check if the job is valid - job_id = next_job.get("id", None) - job_input = next_job.get("input", None) + # Check if the job is valid + job_id = next_job.get("id", None) + job_input = next_job.get("input", None) - print(f"job_id: {job_id}") - print(f"job_input: {job_input}") + if None in [job_id, job_input]: + missing_fields = [] + if job_id is None: + missing_fields.append("id") + if job_input is None: + missing_fields.append("input") - if None in [job_id, job_input]: - missing_fields = [] - if job_id is None: - missing_fields.append("id") - if job_input is None: - missing_fields.append("input") - - log.error(f"Job has missing field(s): {', '.join(missing_fields)}.") - next_job = None + log.error(f"Job has missing field(s): {', '.join(missing_fields)}.") + next_job = None except Exception as err: # pylint: disable=broad-except err_type = type(err).__name__ @@ -99,15 +96,16 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any] if next_job is None: log.debug("No job available, waiting for the next one.") if retry is False: - return None - - log.debug("Confirmed valid request.", next_job['id']) + break + else: + log.debug("Confirmed valid request.", next_job['id']) - if next_job: job_list.add_job(next_job["id"]) log.debug("Request ID added.", next_job['id']) - return next_job + return next_job + + return None async def run_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]: From 5e6e92b801473b59b1e332d9f4485ba4f997ba63 Mon Sep 17 00:00:00 2001 From: Justin Merrell Date: Wed, 22 Nov 2023 18:43:55 -0500 Subject: [PATCH 5/7] Update rp_job.py --- runpod/serverless/modules/rp_job.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/runpod/serverless/modules/rp_job.py b/runpod/serverless/modules/rp_job.py index b0c8528d..9db22b3c 100644 --- a/runpod/serverless/modules/rp_job.py +++ b/runpod/serverless/modules/rp_job.py @@ -69,7 +69,7 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any] break continue - next_job = await response.json() + received_request = await response.json() log.debug("Request Received", {next_job}) # Check if the job is valid @@ -85,6 +85,8 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any] log.error(f"Job has missing field(s): {', '.join(missing_fields)}.") next_job = None + else: + next_job = received_request except Exception as err: # pylint: disable=broad-except err_type = type(err).__name__ From e34b9e08fc0e415361abbe6bb35790b036b198ca Mon Sep 17 00:00:00 2001 From: Justin Merrell Date: Wed, 22 Nov 2023 18:44:59 -0500 Subject: [PATCH 6/7] Update rp_job.py --- runpod/serverless/modules/rp_job.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/runpod/serverless/modules/rp_job.py b/runpod/serverless/modules/rp_job.py index 9db22b3c..9a73bbce 100644 --- a/runpod/serverless/modules/rp_job.py +++ b/runpod/serverless/modules/rp_job.py @@ -73,8 +73,8 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any] log.debug("Request Received", {next_job}) # Check if the job is valid - job_id = next_job.get("id", None) - job_input = next_job.get("input", None) + job_id = received_request.get("id", None) + job_input = received_request.get("input", None) if None in [job_id, job_input]: missing_fields = [] @@ -84,7 +84,6 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any] missing_fields.append("input") log.error(f"Job has missing field(s): {', '.join(missing_fields)}.") - next_job = None else: next_job = received_request From 9a3bcd9bc39917201ab9682de5e90f9d2449304b Mon Sep 17 00:00:00 2001 From: Justin Merrell Date: Wed, 22 Nov 2023 18:56:52 -0500 Subject: [PATCH 7/7] Update rp_job.py --- runpod/serverless/modules/rp_job.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/runpod/serverless/modules/rp_job.py b/runpod/serverless/modules/rp_job.py index 9a73bbce..c5181a31 100644 --- a/runpod/serverless/modules/rp_job.py +++ b/runpod/serverless/modules/rp_job.py @@ -8,6 +8,7 @@ import os import json +import asyncio import traceback from aiohttp import ClientSession @@ -98,6 +99,8 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any] log.debug("No job available, waiting for the next one.") if retry is False: break + + await asyncio.sleep(1) else: log.debug("Confirmed valid request.", next_job['id'])