Skip to content

Commit 34b0a5f

Browse files
Merge pull request #230 from runpod/job-take-bug
Job take bug
2 parents 863ad22 + 9a3bcd9 commit 34b0a5f

File tree

3 files changed

+37
-29
lines changed

3 files changed

+37
-29
lines changed

runpod/serverless/modules/rp_job.py

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import os
1010
import json
11+
import asyncio
1112
import traceback
1213
from aiohttp import ClientSession
1314

@@ -53,54 +54,62 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any]
5354
async with session.get(_job_get_url()) as response:
5455
if response.status == 204:
5556
log.debug("No content, no job to process.")
56-
if not retry:
57-
return None
57+
if retry is False:
58+
break
5859
continue
5960

6061
if response.status == 400:
6162
log.debug("Received 400 status, expected when FlashBoot is enabled.")
62-
if not retry:
63-
return None
63+
if retry is False:
64+
break
6465
continue
6566

6667
if response.status != 200:
6768
log.error(f"Failed to get job, status code: {response.status}")
68-
if not retry:
69-
return None
69+
if retry is False:
70+
break
7071
continue
7172

72-
next_job = await response.json()
73-
log.debug(f"Request Received | {next_job}")
73+
received_request = await response.json()
74+
log.debug("Request Received", {next_job})
7475

75-
# Check if the job is valid
76-
job_id = next_job.get("id", None)
77-
job_input = next_job.get("input", None)
76+
# Check if the job is valid
77+
job_id = received_request.get("id", None)
78+
job_input = received_request.get("input", None)
7879

79-
if None in [job_id, job_input]:
80-
missing_fields = []
81-
if job_id is None:
82-
missing_fields.append("id")
83-
if job_input is None:
84-
missing_fields.append("input")
80+
if None in [job_id, job_input]:
81+
missing_fields = []
82+
if job_id is None:
83+
missing_fields.append("id")
84+
if job_input is None:
85+
missing_fields.append("input")
8586

86-
log.error(f"Job has missing field(s): {', '.join(missing_fields)}.")
87-
next_job = None
87+
log.error(f"Job has missing field(s): {', '.join(missing_fields)}.")
88+
else:
89+
next_job = received_request
8890

8991
except Exception as err: # pylint: disable=broad-except
90-
log.error(f"Error while getting job: {err}")
92+
err_type = type(err).__name__
93+
err_message = str(err)
94+
err_traceback = traceback.format_exc()
95+
log.error(f"Failed to get job, error type: {err_type}, error message: {err_message}")
96+
log.error(f"Traceback: {err_traceback}")
9197

9298
if next_job is None:
9399
log.debug("No job available, waiting for the next one.")
94-
if not retry:
95-
return None
100+
if retry is False:
101+
break
96102

97-
log.debug("Confirmed valid request.", next_job['id'])
103+
await asyncio.sleep(1)
104+
else:
105+
log.debug("Confirmed valid request.", next_job['id'])
98106

99-
if next_job:
100107
job_list.add_job(next_job["id"])
101108
log.debug("Request ID added.", next_job['id'])
102109

103-
return next_job
110+
return next_job
111+
112+
return None
104113

105114

106115
async def run_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]:

runpod/serverless/modules/rp_scale.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
log = RunPodLogger()
1414
job_list = Jobs()
1515

16+
1617
class JobScaler():
1718
"""
1819
A class for automatically retrieving new jobs from the server and processing them concurrently.
@@ -100,7 +101,7 @@ async def get_jobs(self, session):
100101
break
101102

102103
for _ in range(self.num_concurrent_get_job_requests):
103-
job = await get_job(session, retry=False)
104+
job = await get_job(session)
104105
self.job_history.append(1 if job else 0)
105106
if job:
106107
yield job
@@ -128,8 +129,6 @@ async def get_jobs(self, session):
128129
f"{self.num_concurrent_get_job_requests}."
129130
)
130131

131-
132-
133132
def upscale_rate(self) -> None:
134133
"""
135134
Upscale the job retrieval rate by adjusting the number of concurrent requests.

tests/test_serverless/test_modules/test_job.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ async def test_get_job_exception(self):
155155
job = await rp_job.get_job(mock_session_exception, retry=False)
156156

157157
assert job is None
158-
assert mock_log.error.call_count == 1
158+
assert mock_log.error.call_count == 2
159159

160160

161161
class TestRunJob(IsolatedAsyncioTestCase):

0 commit comments

Comments
 (0)